Skip to content

Explore Module

High-level analysis and visualization tools for climate data exploration.

Overview

The climakitae.explore module provides user-friendly classes and functions for climate data analysis, including: - Warming level analysis — Analyze data by global warming thresholds - Time series visualization — Plot multi-model ensemble trends - Uncertainty quantification — CMIP6 ensemble analysis - Threshold analysis — Climate threshold and exceedance analysis - Typical meteorological year — TMY analysis for engineering applications - Vulnerability assessment — Climate vulnerability analysis tools

Warming Levels

A container for all of the warming levels-related functionality: - A pared-down Select panel, under "choose_data" - a "calculate" step where most of the waiting occurs - an optional "visualize" panel, as an instance of WarmingLevelVisualize - postage stamps from visualize "main" tab are accessible via "gwl_snapshots" - data sliced around gwl window retrieved from "sliced_data"

Source code in climakitae/explore/warming.py
def __init__(self):
    self.wl_params = WarmingLevelChoose()
    # self.warming_levels = ["0.8", "1.2", "1.5", "2.0", "3.0", "4.0"]
    self.warming_levels = _check_available_warming_levels()
    self.gwl_times = None  # Placeholder for the warming level times
    self.catalog_data = None  # Placeholder for the catalog data

find_warming_slice(level, gwl_times)

Find the warming slice data for the current level from the catalog data.

Parameters:

Name Type Description Default
level str

The warming level to find the slice for.

required
gwl_times DataFrame

The DataFrame containing the warming level times.

required

Returns:

Type Description
DataArray

The warming slice data for the specified level.

Source code in climakitae/explore/warming.py
def find_warming_slice(self, level: str, gwl_times: pd.DataFrame) -> xr.DataArray:
    """
    Find the warming slice data for the current level from the catalog data.

    Parameters
    ----------
    level: str
        The warming level to find the slice for.
    gwl_times: pd.DataFrame
        The DataFrame containing the warming level times.

    Returns
    -------
    xr.DataArray
        The warming slice data for the specified level.
    """
    warming_data = self.catalog_data.groupby("all_sims").map(
        get_sliced_data,
        level=level,
        years=gwl_times,
        months=self.wl_params.months,
        window=self.wl_params.window,
        anom=self.wl_params.anom,
    )
    warming_data = warming_data.expand_dims({"warming_level": [level]})
    warming_data = warming_data.assign_attrs(
        window=self.wl_params.window, months=self.wl_params.months
    )

    # Cleaning data
    warming_data = clean_warm_data(warming_data)

    # Relabeling `all_sims` dimension
    new_warm_data = warming_data.drop_vars("all_sims")
    new_warm_data["all_sims"] = relabel_axis(warming_data["all_sims"])
    return new_warm_data

calculate()

Calculate the warming levels for the selected parameters.

This function retrieves the data from the catalog, slices it according to the warming levels, and stores the results in the sliced_data and gwl_snapshots attributes.

Source code in climakitae/explore/warming.py
def calculate(self):
    """
    Calculate the warming levels for the selected parameters.

    This function retrieves the data from the catalog, slices it according to the
    warming levels, and stores the results in the `sliced_data` and `gwl_snapshots`
    attributes.
    """
    # manually reset to all SSPs, in case it was inadvertently changed by
    # temporarily have ['Dynamical','Statistical'] for downscaling_method
    self.wl_params.scenario_ssp = SSPS

    # Postage data and anomalies
    self.catalog_data = self.wl_params.retrieve()
    self.catalog_data = self.catalog_data.stack(all_sims=["simulation", "scenario"])

    # Dropping invalid simulations that come up from stacking scenarios and simulations together
    self.catalog_data = _drop_invalid_sims(self.catalog_data, self.wl_params)

    if self.wl_params.anom == "Yes":
        self.gwl_times = read_csv_file(GWL_1981_2010_FILE, index_col=[0, 1, 2])
    else:
        self.gwl_times = read_csv_file(GWL_1850_1900_FILE, index_col=[0, 1, 2])
    self.gwl_times = self.gwl_times.dropna(how="all")
    self.catalog_data = clean_list(self.catalog_data, self.gwl_times)

    self.sliced_data = {}
    self.gwl_snapshots = {}
    for level in tqdm(
        self.wl_params.warming_levels, desc="Computing each warming level"
    ):
        warm_slice = self.find_warming_slice(level, self.gwl_times)
        if self.wl_params.load_data:
            warm_slice = load(warm_slice, progress_bar=True)

        # Add GWL snapshots
        self.gwl_snapshots[level] = warm_slice.mean(dim="time", skipna=True)

        # Renaming time dimension for warming slice once "time" is all computed on
        freq_strs = {"monthly": "months", "daily": "days", "hourly": "hours"}
        warm_slice = warm_slice.rename(
            {"time": f"{freq_strs[warm_slice.frequency]}_from_center"}
        )
        self.sliced_data[level] = warm_slice

    self.gwl_snapshots = xr.concat(self.gwl_snapshots.values(), dim="warming_level")

Time Series Visualization

Holds the instance of TimeSeriesParameters that is used for the following purposes: 1) to display a panel that previews various time-series transforms (explore), and 2) to save the transform represented by the current state of that preview into a new variable (output_current).

Parameters:

Name Type Description Default
data DataArray

Time series array with no spatial coordinates.

required

Attributes:

Name Type Description
choices TimeSeriesParameters

Param object containing time series data and analysis parameters.

Source code in climakitae/explore/timeseries.py
def __init__(self, data: xr.DataArray):
    if (
        type(data) != xr.core.dataarray.DataArray
    ):  # Data is NOT in the form of xr.DataArray
        raise ValueError(
            "Please pass an xarray DataArray (e.g. as output by DataParameters.retrieve())."
        )
    else:
        raise_error = False
        error_message = ""
        if "lat" in data.coords:  # Data is NOT area averaged
            raise_error = True
            error_message += "Please pass a timeseries (area average)."
        if not any(
            ["Historical + " in v for v in data.scenario.values]
        ):  # Append historical = False
            if raise_error == True:
                error_message += "\n"
            else:
                raise_error = True
            error_message += (
                "Please append the historical period in your data retrieval."
            )
        if raise_error:  # If any errors
            raise ValueError(error_message)

    self.choices = TimeSeriesParameters(data)

output_current()

Output the current attributes of the class to a DataArray object. Allows the data to be easily accessed by the user after modifying the attributes directly in the explore panel, for example.

Returns:

Type Description
DataArray
Source code in climakitae/explore/timeseries.py
def output_current(self) -> xr.DataArray:
    """Output the current attributes of the class to a DataArray object.
    Allows the data to be easily accessed by the user after modifying the attributes directly in the explore panel, for example.

    Returns
    -------
    xr.DataArray

    """
    to_output = self.choices.transform_data()
    attrs_to_add = dict(self.choices.param.values())
    to_output = _update_attrs(to_output, attrs_to_add)
    return to_output

Uncertainty Analysis (CMIP Ensemble Optimization)

A class for holding relevant data options for cmip preprocessing

Attributes:

Name Type Description
variable str

variable name, cf-compliant (or cmip6 variable name)

area_subset str

geographic boundary name (states/counties)

location str

geographic area name (name of county/state)

timescale str

frequency of data

area_average bool

average computed across domain

Methods:

Name Description
_cmip_clip

CMIP6-specific subsetting

Source code in climakitae/explore/uncertainty.py
def __init__(
    self,
    variable: str = "tas",  ## set up for temp uncertainty notebook
    area_subset: str = "states",
    location: str = "California",
    timescale: str = "monthly",
    area_average: bool = True,
) -> None:
    self.variable = variable
    self.area_subset = area_subset
    self.location = location
    self.timescale = timescale
    self.area_average = area_average

Typical Meteorological Year

Encapsulate the code needed to generate Typical Meteorological Year (TMY) files.

Uses WRF hourly data to produce TMYs. User provides the start and end years along with location to generate file.

How to set location: The location can either be provided as latitude and longitude coordinates or as the name of a HadISD station in California. Do not set latitude and longitude if using a HadISD station. If latitude and longitude are set along with a custom value for station_name (NOT a HadISD station), the custom station name will be used in file headers where appropriate.

How to set time period: The time period can either be set with a time approach using start_year and end_year or with a warming level approach using warming_level. If the warming level approach is used, a 30-year period is obtained centered around the given warming level and the start and end years are taken for that warming level.

Parameters:

Name Type Description Default
start_year str

Initial year of TMY period (time approach)

UNSET
end_year str

Final year of TMY period (time approach)

UNSET
warming_level float | int

Desired warming level (warming level approach)

UNSET
station_name str

Long name of desired station

UNSET
latitude float | int(optional)

Latitude for TMY data if station_name not set

UNSET
longitude float | int(optional)

Longitude for TMY data if station_name not set

UNSET
verbose bool

True to increase verbosity

True

Attributes:

Name Type Description
start_year str

Initial year of TMY period

end_year str

Final year of TMY period

warming_level float | int

Warming level value

lat_range tuple

Pair of latitudes that bracket latitude

lon_range tuple

Pair of longitudes that bracket longitude

simulations list[str]

List of included simulations

scenario list[str]

List of scenarios

vars_and_units dict[str, str]

Dictionary of all required variables and units

verbose bool

True to increase verbosity

cdf_climatology Dataset

CDF climatology data

cdf_monthly Dataset

CDF monthly data by model

weighted_fs_sum Dataset

Weighted F-S statistic results

top_months DataFrame

Table of top months by model

all_vars Dataset

All loaded variables for TMY

tmy_data_to_export dict[Dataframe]

Dictionary of TMY results by simulation

_skip_last bool

Internal flag to track last year for warming level approach

Source code in climakitae/explore/typical_meteorological_year.py
def __init__(
    self,
    start_year: int = UNSET,
    end_year: int = UNSET,
    warming_level: float | int = UNSET,
    station_name: str = UNSET,
    latitude: float | int = UNSET,
    longitude: float | int = UNSET,
    verbose: bool = True,
):

    # Here we go through a few different ways to get the TMY location
    match latitude, longitude, station_name:
        # UNSET will match to object type
        # Case 1: All variables set
        case float() | int(), float() | int(), str():
            if is_HadISD(station_name):
                raise ValueError(
                    "Do not set `latitude` and `longitude` when using a HadISD station for `station_name`. Change `station_name` value if using custom location."
                )
            else:
                print(
                    f"Initializing TMY object for custom location: {latitude} N, {longitude} W with name '{station_name}'."
                )
                self._set_loc_from_lat_lon(latitude, longitude)
                self.stn_name = station_name
        # Case 2: lat/lon provided, no station_name string
        case float() | int(), float() | int(), object():
            print(
                f"Initializing TMY object for custom location: {latitude} N, {longitude} W."
            )
            self._set_loc_from_lat_lon(latitude, longitude)
        # Case 3: station name provided, lat/lon not numeric
        case object(), object(), str():
            print(f"Initializing TMY object for {station_name}.")
            self._set_loc_from_stn_name(station_name)
        # Last case: something else provided
        case _:
            raise ValueError(
                "No valid station name or latitude and longitude provided."
            )
    # Time variables
    match start_year, end_year, warming_level:
        # User set all options - bad
        case float() | int(), float() | int(), float() | int():
            raise ValueError(
                "Variables `start_year` and `end_year` cannot be paired with `warming_level`. Set either `start_year` and `end_year` OR `warming_level."
            )
        # Some other combo - unset variable will be saved as UNSET
        case _:
            self.start_year = start_year
            self.end_year = end_year
            self.warming_level = warming_level
            if isinstance(self.warming_level, int):
                self.warming_level = float(self.warming_level)
    # Whether to drop the last month as a possible match
    self._skip_last = False
    if self.warming_level:
        # True for warming levels because final hours get lost to UTC conversion
        self._skip_last = True
    # Ranges used in get_data to pull smaller dataset without warnings
    self.lat_range = (self.stn_lat - 0.1, self.stn_lat + 0.1)
    self.lon_range = (self.stn_lon - 0.1, self.stn_lon + 0.1)
    # These 4 simulations have the solar variables needed
    self.simulations = [
        "WRF_EC-Earth3_r1i1p1f1",
        "WRF_MPI-ESM1-2-HR_r3i1p1f1",
        "WRF_TaiESM1_r1i1p1f1",
        "WRF_MIROC6_r1i1p1f1",
    ]
    # Data only available for these scenarios
    self.scenario = ["Historical Climate", "SSP 3-7.0"]
    # Raw catalog variables to fetch (variable_id → display name)
    # These are fetched directly from the catalog in their native units.
    self._raw_vars = {
        "t2": "Air Temperature at 2m",
        "q2": "Water Vapor Mixing Ratio at 2m",
        "psfc": "Surface Pressure",
        "u10": "u10",
        "v10": "v10",
        "swdnb": "Instantaneous downwelling shortwave flux at bottom",
        "swddni": "Shortwave surface downward direct normal irradiance",
        "swddif": "Shortwave surface downward diffuse irradiance",
        "lwdnb": "Instantaneous downwelling longwave flux at bottom",
    }
    # Full set of TMY variables (including derived) with desired units.
    # Used for display name references throughout the rest of the TMY code.
    self.vars_and_units = {
        "Air Temperature at 2m": "degC",
        "Dew point temperature": "degC",
        "Relative humidity": "[0 to 100]",
        "Instantaneous downwelling shortwave flux at bottom": "W/m2",
        "Shortwave surface downward direct normal irradiance": "W/m2",
        "Shortwave surface downward diffuse irradiance": "W/m2",
        "Instantaneous downwelling longwave flux at bottom": "W/m2",
        "Wind speed at 10m": "m s-1",
        "Wind direction at 10m": "degrees",
        "Surface Pressure": "Pa",
        "Water Vapor Mixing Ratio at 2m": "g kg-1",
    }
    self.verbose = verbose
    # These will get set later in analysis
    self.cdf_climatology = UNSET
    self.cdf_monthly = UNSET
    self.weighted_fs_sum = UNSET
    self.top_months = UNSET
    self.all_vars = UNSET
    self.tmy_data_to_export = UNSET

load_all_variables()

Load hourly TMY variables and derive daily statistics for CDF/F-S.

Fetches hourly raw variables via ClimateData for the 8760 profile assembly, then derives ALL daily statistics from the hourly data in local time. This matches the original TMY code's approach and avoids two problems with fetching daily catalog variables directly:

  1. UTC vs local time: Catalog daily variables are pre-aggregated over UTC day boundaries, which differ from local-time days by the station's UTC offset (e.g., 8 hours for California).
  2. Non-determinism: With a Dask distributed client active, lazy reductions (.resample().sum()) can produce slightly different floating-point results on each run due to non-deterministic task ordering. Computing hourly data eagerly to numpy before resampling guarantees deterministic daily statistics.
Source code in climakitae/explore/typical_meteorological_year.py
def load_all_variables(self):
    """Load hourly TMY variables and derive daily statistics for CDF/F-S.

    Fetches hourly raw variables via ClimateData for the 8760 profile
    assembly, then derives ALL daily statistics from the hourly data
    in local time.  This matches the original TMY code's approach and
    avoids two problems with fetching daily catalog variables directly:

    1. **UTC vs local time**: Catalog daily variables are pre-aggregated
       over UTC day boundaries, which differ from local-time days by the
       station's UTC offset (e.g., 8 hours for California).
    2. **Non-determinism**: With a Dask distributed client active, lazy
       reductions (``.resample().sum()``) can produce slightly different
       floating-point results on each run due to non-deterministic task
       ordering.  Computing hourly data eagerly to numpy before
       resampling guarantees deterministic daily statistics.
    """
    print("Loading data from catalog via ClimateData.")

    def _fetch_and_clean(variable_id, table_id="1hr"):
        """Fetch a single variable and clean coordinate cruft."""
        self._vprint(f"  Fetching {variable_id} ({table_id})...")
        da = self._fetch_raw_variable(variable_id, table_id=table_id)
        return da.squeeze().drop_vars(
            [
                "lakemask",
                "landmask",
                "x",
                "y",
                "Lambert_Conformal",
                "centered_year",
            ],
            errors="ignore",
        )

    # --- Hourly variables (needed for 8760 profile assembly) ---
    # First hourly fetch must be synchronous for warming level (sets year range)
    hourly_var_ids = list(self._raw_vars.keys())
    if self.warming_level is not UNSET:
        self._vprint(f"  Getting {hourly_var_ids[0]} (sets year range)...")
        # Fetch raw first to capture centered_year before cleaning drops it
        raw_first = self._fetch_raw_variable(hourly_var_ids[0], table_id="1hr")
        if "centered_year" in raw_first.coords:
            self._sim_centered_years = dict(
                zip(
                    raw_first.simulation.values,
                    raw_first.centered_year.values.ravel(),
                )
            )
        first_var = raw_first.squeeze().drop_vars(
            [
                "lakemask",
                "landmask",
                "x",
                "y",
                "Lambert_Conformal",
                "centered_year",
            ],
            errors="ignore",
        )
        first_var.name = self._raw_vars[hourly_var_ids[0]]
        remaining_hourly = hourly_var_ids[1:]
    else:
        first_var = None
        remaining_hourly = hourly_var_ids

    # --- Daily variables (needed for CDF/F-S analysis) ---
    # Derive daily stats from hourly data in local time.
    # This matches the original code's behavior (hourly → local time →
    # daily resample) and avoids using catalog daily variables which are
    # pre-aggregated over UTC day boundaries (different 24-hour window).
    # We also compute hourly data eagerly here so all downstream
    # operations use deterministic numpy math, avoiding non-deterministic
    # floating-point reduction ordering from the dask distributed scheduler.

    # Fetch remaining hourly variables in parallel
    self._vprint("  Loading hourly variables in parallel...")

    def _fetch_hourly(vid):
        da = _fetch_and_clean(vid, "1hr")
        da.name = self._raw_vars[vid]
        return ("hourly", vid, da)

    with ThreadPoolExecutor(max_workers=4) as executor:
        hourly_futures = [
            executor.submit(_fetch_hourly, vid) for vid in remaining_hourly
        ]
        all_results = [f.result() for f in hourly_futures]

    # Separate hourly results
    hourly_list = [r[2] for r in all_results]
    if first_var is not None:
        hourly_list = [first_var] + hourly_list

    # --- Build hourly dataset for 8760 profile ---
    self._vprint("  Merging raw hourly data.")
    raw_ds = xr.merge(hourly_list)

    # Compute hourly derived variables
    self._vprint("  Computing hourly derived variables.")
    t2_degc = raw_ds["Air Temperature at 2m"] - 273.15
    t2_degc.attrs["units"] = "degC"

    q2_gkg = raw_ds["Water Vapor Mixing Ratio at 2m"] * 1000
    q2_gkg.attrs["units"] = "g kg-1"

    psfc_hpa = raw_ds["Surface Pressure"] / 100.0
    psfc_hpa.attrs["units"] = "hPa"

    rh = compute_relative_humidity(
        pressure=psfc_hpa,
        temperature=t2_degc,
        mixing_ratio=q2_gkg,
    )
    rh.name = "Relative humidity"
    rh.attrs["units"] = "[0 to 100]"

    dew_point_k = compute_dewpointtemp(
        temperature=raw_ds["Air Temperature at 2m"],
        rel_hum=rh,
    )
    dew_point = dew_point_k - 273.15
    dew_point.name = "Dew point temperature"
    dew_point.attrs["units"] = "degC"

    wind_speed = compute_wind_mag(u10=raw_ds["u10"], v10=raw_ds["v10"])
    wind_speed.name = "Wind speed at 10m"

    wind_dir = compute_wind_dir(u10=raw_ds["u10"], v10=raw_ds["v10"])
    wind_dir.name = "Wind direction at 10m"

    t2_out = t2_degc.copy()
    t2_out.name = "Air Temperature at 2m"
    t2_out.attrs["units"] = "degC"

    q2_out = q2_gkg.copy()
    q2_out.name = "Water Vapor Mixing Ratio at 2m"

    derived_list = [t2_out, dew_point, rh, wind_speed, wind_dir, q2_out]
    keep_vars = [
        "Instantaneous downwelling shortwave flux at bottom",
        "Shortwave surface downward direct normal irradiance",
        "Shortwave surface downward diffuse irradiance",
        "Instantaneous downwelling longwave flux at bottom",
        "Surface Pressure",
    ]
    kept_from_raw = [raw_ds[v] for v in keep_vars]
    hourly_ds = xr.merge(derived_list + kept_from_raw)
    self._hourly_data = hourly_ds

    # --- Build daily dataset for CDF/F-S analysis ---
    # Compute hourly data eagerly so all daily resampling uses
    # deterministic numpy math (not affected by dask scheduler ordering).
    # For a single grid cell this is small (~30yr × 8760hr × 4sims).
    self._vprint("  Computing hourly data for daily resampling...")
    with ProgressBar():
        hourly_computed = hourly_ds.compute()

    self._vprint("  Resampling hourly data to daily statistics...")
    # Temperature: daily max, min, mean (in degC, already converted above)
    daily_tmax = hourly_computed["Air Temperature at 2m"].resample(time="1D").max()
    daily_tmax.name = "Daily max air temperature"
    daily_tmax.attrs["frequency"] = "daily"

    daily_tmin = hourly_computed["Air Temperature at 2m"].resample(time="1D").min()
    daily_tmin.name = "Daily min air temperature"
    daily_tmin.attrs["frequency"] = "daily"

    daily_tmean = (
        hourly_computed["Air Temperature at 2m"].resample(time="1D").mean()
    )
    daily_tmean.name = "Daily mean air temperature"
    daily_tmean.attrs["frequency"] = "daily"

    # Dew point: daily max, min, mean
    daily_dp_max = (
        hourly_computed["Dew point temperature"].resample(time="1D").max()
    )
    daily_dp_max.name = "Daily max dewpoint temperature"
    daily_dp_max.attrs["frequency"] = "daily"

    daily_dp_min = (
        hourly_computed["Dew point temperature"].resample(time="1D").min()
    )
    daily_dp_min.name = "Daily min dewpoint temperature"
    daily_dp_min.attrs["frequency"] = "daily"

    daily_dp_mean = (
        hourly_computed["Dew point temperature"].resample(time="1D").mean()
    )
    daily_dp_mean.name = "Daily mean dewpoint temperature"
    daily_dp_mean.attrs["frequency"] = "daily"

    # Wind speed: daily max, mean
    daily_ws_max = hourly_computed["Wind speed at 10m"].resample(time="1D").max()
    daily_ws_max.name = "Daily max wind speed"
    daily_ws_max.attrs["frequency"] = "daily"

    daily_ws_mean = hourly_computed["Wind speed at 10m"].resample(time="1D").mean()
    daily_ws_mean.name = "Daily mean wind speed"
    daily_ws_mean.attrs["frequency"] = "daily"

    # Radiation: daily sums
    ghi_sum = (
        hourly_computed["Instantaneous downwelling shortwave flux at bottom"]
        .resample(time="1D")
        .sum()
    )
    ghi_sum.name = "Global horizontal irradiance"
    ghi_sum.attrs["frequency"] = "daily"

    dni_sum = (
        hourly_computed["Shortwave surface downward direct normal irradiance"]
        .resample(time="1D")
        .sum()
    )
    dni_sum.name = "Direct normal irradiance"
    dni_sum.attrs["frequency"] = "daily"

    daily_arrays = [
        daily_tmax,
        daily_tmin,
        daily_tmean,
        daily_dp_max,
        daily_dp_min,
        daily_dp_mean,
        daily_ws_max,
        daily_ws_mean,
        ghi_sum,
        dni_sum,
    ]

    self.all_vars = xr.merge(daily_arrays)
    self._vprint("  Daily statistics ready.")

set_cdf_climatology()

Calculate the long-term climatology for each index for each month so we can establish the baseline pattern.

Source code in climakitae/explore/typical_meteorological_year.py
def set_cdf_climatology(self):
    """Calculate the long-term climatology for each index for each month so
    we can establish the baseline pattern.
    """
    if self.all_vars is UNSET:
        self.load_all_variables()
    self._vprint("Calculating CDF climatology.")
    self.cdf_climatology = get_cdf(self.all_vars)

set_cdf_monthly()

Get CDF for each month and variable.

Source code in climakitae/explore/typical_meteorological_year.py
def set_cdf_monthly(self):
    """Get CDF for each month and variable."""
    if self.all_vars is UNSET:
        self.load_all_variables()
    self._vprint("Calculating monthly CDF.")
    self.cdf_monthly = get_cdf_monthly(self.all_vars)
    # Remove the years for the Pinatubo eruption
    self.cdf_monthly = remove_pinatubo_years(self.cdf_monthly)

set_weighted_statistic()

Calculate the weighted F-S statistic.

Source code in climakitae/explore/typical_meteorological_year.py
def set_weighted_statistic(self):
    """Calculate the weighted F-S statistic."""
    if self.cdf_climatology is UNSET:
        self.set_cdf_climatology()
    if self.cdf_monthly is UNSET:
        self.set_cdf_monthly()
    self._vprint("Calculating weighted F-S statistic.")
    self.weighted_fs_sum = compute_weighted_fs_sum(
        self.cdf_climatology, self.cdf_monthly
    )

set_top_months()

Calculate top months dataframe.

Source code in climakitae/explore/typical_meteorological_year.py
def set_top_months(self):
    """Calculate top months dataframe."""
    # Pass the weighted F-S sum data for simplicity
    if self.weighted_fs_sum is UNSET:
        self.set_weighted_statistic()
    self._vprint("Finding top months (lowest F-S statistic)")
    self.top_months = get_top_months(
        self.weighted_fs_sum, skip_last=self._skip_last
    )

show_tmy_data_to_export(simulation)

Show line plots of TMY data for single model.

Parameters:

Name Type Description Default
simulation str

Simulation to display.

required
Source code in climakitae/explore/typical_meteorological_year.py
def show_tmy_data_to_export(self, simulation: str):
    """Show line plots of TMY data for single model.

    Parameters
    ----------
    simulation: str
        Simulation to display.

    """
    if self.tmy_data_to_export is UNSET:
        print("No TMY data generated.")
        print("Please run TMY.generate_tmy() to create TMY data for viewing.")
        return

    # WVMR not in final TMY dataframe
    fig_y = list(self.vars_and_units.keys())
    fig_y.remove("Water Vapor Mixing Ratio at 2m")

    self.tmy_data_to_export[simulation].plot(
        x="time",
        y=fig_y,
        title=f"Typical Meteorological Year ({simulation})",
        subplots=True,
        figsize=(10, 8),
        legend=True,
    )

run_tmy_analysis()

Generate typical meteorological year data.

Output will be a list of dataframes per simulation. Print statements throughout the function indicate progress.

Notes

Results are saved to the class variable tmy_data_to_export.

Source code in climakitae/explore/typical_meteorological_year.py
def run_tmy_analysis(self):
    """Generate typical meteorological year data.

    Output will be a list of dataframes per simulation.
    Print statements throughout the function indicate progress.

    Notes
    -----
    Results are saved to the class variable `tmy_data_to_export`.
    """
    print("Assembling TMY data to export.")

    self._vprint("  STEP 1: Computing hourly data")

    # Use cached hourly data instead of re-downloading
    if not hasattr(self, "_hourly_data") or self._hourly_data is None:
        # Fallback: load from catalog if run_tmy_analysis called standalone
        self.load_all_variables()
    with ProgressBar():
        all_vars_ds = self._hourly_data.compute()

    # Construct TMY
    self._vprint(
        "\n  STEP 2: Calculating Typical Meteorological Year per model simulation\n  Progress bar shows code looping through each month in the year.\n"
    )

    tmy_data_to_export = self._make_8760_tables(
        all_vars_ds, self.top_months
    )  # Return dict of TMY by simulation

    self._vprint("  Smoothing data at transitions between months.")
    self._vprint("  Dropping water vapor mixing ratio.")
    # Smooth transition hours
    for sim in tmy_data_to_export:
        tmy_data_to_export[sim] = tmy_data_to_export[sim].reset_index()
        tmy_data_to_export[sim] = self._smooth_month_transition_hours(
            tmy_data_to_export[sim]
        )

        # Mixing ratio was only needed for smoothing relative humidity,
        # so it can be dropped now.
        tmy_data_to_export[sim] = tmy_data_to_export[sim].drop(
            columns="Water Vapor Mixing Ratio at 2m"
        )

        # Add metadata columns needed by EPW header writer.
        # The new-core pipeline squeezes these dimensions, so they must be
        # re-attached before export.
        if self.warming_level is not UNSET:
            tmy_data_to_export[sim]["warming_level"] = self.warming_level
        else:
            tmy_data_to_export[sim]["scenario"] = "historical+ssp370"

    self.tmy_data_to_export = tmy_data_to_export
    self._vprint("TMY analysis complete.")

export_tmy_data(extension='epw')

Write TMY data to EPW file.

Parameters:

Name Type Description Default
extension str

Desired file extension ('tmy','epw', or 'csv')

'epw'
Source code in climakitae/explore/typical_meteorological_year.py
def export_tmy_data(self, extension: str = "epw"):
    """Write TMY data to EPW file.

    Parameters
    ----------
    extension: str
        Desired file extension ('tmy','epw', or 'csv')

    """
    print("Exporting TMY to file.")
    for sim, _ in self.tmy_data_to_export.items():
        # Get right year range
        if self.warming_level is UNSET:
            years = (self.start_year, self.end_year)
            clean_sim = sim
        else:
            centered_year = int(self._sim_centered_years[sim])
            year1 = centered_year - 15
            year2 = centered_year + 14
            years = (year1, year2)
            # Append warming level descriptor to simulation name.
            # Legacy sim names no longer contain "_historical+ssp370"
            # after the new-core migration, so we append directly.
            wl_label = match_str_to_wl(self.warming_level)
            clean_sim = f"{sim}_{wl_label}"
        # Attach centered_year so CSV export can include it
        if self.warming_level is not UNSET:
            self.tmy_data_to_export[sim]["centered_year"] = centered_year
        clean_stn_name = (
            self.stn_name.replace(" ", "_").replace("(", "").replace(")", "")
        )
        filename = f"TMY_{clean_stn_name}_{clean_sim}".lower()
        write_tmy_file(
            filename,
            self.tmy_data_to_export[sim],
            years,
            self.stn_name,
            self.stn_code,
            self.stn_lat,
            self.stn_lon,
            self.stn_state,
            file_ext=extension,
        )

get_candidate_months()

Run CDF functions to get top candidates.

This function can be used to view the candidate months without running the entire TMY workflow.

Source code in climakitae/explore/typical_meteorological_year.py
def get_candidate_months(self):
    """Run CDF functions to get top candidates.

    This function can be used to view the candidate months
    without running the entire TMY workflow.
    """
    self._vprint("Getting top months for TMY.")
    self.set_cdf_climatology()
    self.set_cdf_monthly()
    self.set_weighted_statistic()
    self.set_top_months()

generate_tmy()

Run the whole TMY workflow.

Source code in climakitae/explore/typical_meteorological_year.py
def generate_tmy(self):
    """Run the whole TMY workflow."""
    # This runs the whole workflow at once
    print("Running TMY workflow.")
    self.load_all_variables()
    self.get_candidate_months()
    self.run_tmy_analysis()
    self.export_tmy_data()

Climate Vulnerability Assessment

Bases: Parameterized

Climate Analysis and Vulnerability Assessment Parameters Class.

This class defines and validates parameters for climate vulnerability analysis, supporting various climate variables, metrics, and analysis approaches.

Attributes:

Name Type Description
input_locations DataFrame

Input coordinates that must have 'lat' and 'lon' columns with numeric data.

time_start_year int, default=1981

Start year for the analysis period. Must be between 1900 and 2100.

time_end_year int, default=2010

End year for the analysis period. Must be between 1900 and 2100.

units str, default="Celsius"

Units for temperature measurement.

variable str, default="Air Temperature at 2m"

Climate variable to analyze. Options include "Air Temperature at 2m", "Precipitation (total)", "NOAA Heat Index", "Effective Temperature".

metric_calc str, default="max"

Statistical metric calculation method. Options: "min", "max", "mean", "median".

percentile float or None, default=None

Percentile value for calculation. Must be between 0 and 100 if specified.

heat_idx_threshold float or None, default=None

Threshold value for heat index calculations.

one_in_x int, float, list or None, default=None

Return period(s) for extreme event analysis (e.g., 1-in-100 year event). Can be a single value or list of values.

season str, default="all"

Season to analyze. Options: "summer", "winter", "all".

downscaling_method str, default="Dynamical"

Climate model downscaling method. Options: "Dynamical", "Statistical".

approach str, default="Time"

Analysis approach. Options: "Time" (time period), "Warming Level" (temperature target).

warming_level float, default=1.0

Global warming level in degrees Celsius. Must be between 0 and 7.

wrf_bias_adjust bool, default=True

Whether to apply bias adjustment to WRF model data.

historical_data str, default="Historical Climate"

Type of historical data. Options: "Historical Climate", "Historical Reconstruction".

ssp_data list, default=["SSP 3-7.0"]

Shared Socioeconomic Pathway scenarios to use. Options: "SSP 2-4.5", "SSP 3-7.0", "SSP 5-8.5".

export_method str, default="both"

Data export method. Options: "raw", "calculate", "both", "None".

separate_files bool, default=False

Whether to save climate variables in separate files.

file_format str, default="NetCDF"

Output file format.

batch_mode bool, default=False

Whether to run in batch processing mode.

distr str, default="gev"

Statistical distribution for extreme value analysis. Options: "gev" (Generalized Extreme Value), "genpareto" (Generalized Pareto), "gamma".

duration tuple, default=UNSET

Length of extreme event, specified as (4, 'hour'). Only implemented for hourly data.

groupby tuple, default=(1, "day")

Group over which to look for max occurrence, specified as (1, 'day'). Only 'day' groupings supported.

grouped_duration tuple, default=UNSET

Length of event after grouping, specified as (5, 'day'). Requires groupby to be set.

file_name str

Base name for output files, no extension (e.g., "output", not "output.nc").

Methods:

Name Description
validate_params

Validate all parameters for consistency and compatibility.

get_names

Generate standardized names and metadata for data processing. If parameter validation fails. Common validation errors include: - Missing or non-numeric lat/lon columns in input_locations - Invalid time range (start year > end year) - Incompatible parameter combinations (e.g., multiple threshold parameters) - Unsupported variable-downscaling method combinations - Invalid approach-data type combinations

Notes

The class enforces several validation rules: - Only one threshold parameter (heat_idx_threshold, percentile, or one_in_x) can be specified - Historical Reconstruction data requires time-based approach and end year <= 2022 - NOAA Heat Index and Effective Temperature cannot use Statistical downscaling - Dynamical downscaling with time-based approach only supports SSP 3-7.0 - duration only supports "hour" units; groupby and grouped_duration only support "day" units

Examples:

>>> import pandas as pd
>>> locations = pd.DataFrame({'lat': [34.05, 36.17], 'lon': [-118.25, -115.14]})
>>> params = CavaParams(
...     input_locations=locations,
...     time_start_year=2020,
...     time_end_year=2050,
...     variable="Air Temperature at 2m",
...     percentile=95,
...     metric_calc="max"
... )
Source code in climakitae/explore/vulnerability.py
def __init__(self, **params):
    super().__init__(**params)
    self.validate_params()

validate_params()

Validate the parameters for vulnerability analysis.

Returns:

Type Description
None

Raises:

Type Description
ValueError

If any validation check fails. The error message lists all validation failures found during the validation process.

Notes

The method validates the following conditions: - Input locations DataFrame contains required 'lat' and 'lon' columns with numeric data types (float64 or int64) - Time range validity (start year must be <= end year) - Historical reconstruction data constraints: - End year must be <= 2022 - Only time-based approach is supported - Mutual exclusivity of threshold parameters (only one of heat_idx_threshold, percentile, or one_in_x can be specified) - At least one threshold parameter must be specified - Metric calculation ('min' or 'max') compatibility with percentile calculations - Variable and downscaling method compatibility (NOAA Heat Index and Effective Temperature cannot use Statistical downscaling) - SSP data constraints for WRF/Dynamical downscaling (only SSP 3-7.0 allowed for time-based approach)

Side Effects

Converts self.one_in_x to a list if it's not None and not already a list.

Source code in climakitae/explore/vulnerability.py
def validate_params(self):
    """
    Validate the parameters for vulnerability analysis.

    Returns
    -------
    None

    Raises
    ------
    ValueError
        If any validation check fails. The error message lists all validation
        failures found during the validation process.

    Notes
    -----
    The method validates the following conditions:
    - Input locations DataFrame contains required 'lat' and 'lon' columns with
      numeric data types (float64 or int64)
    - Time range validity (start year must be <= end year)
    - Historical reconstruction data constraints:
        - End year must be <= 2022
        - Only time-based approach is supported
    - Mutual exclusivity of threshold parameters (only one of heat_idx_threshold,
      percentile, or one_in_x can be specified)
    - At least one threshold parameter must be specified
    - Metric calculation ('min' or 'max') compatibility with percentile calculations
    - Variable and downscaling method compatibility (NOAA Heat Index and Effective
      Temperature cannot use Statistical downscaling)
    - SSP data constraints for WRF/Dynamical downscaling (only SSP 3-7.0 allowed
      for time-based approach)

    Side Effects
    ------------
    Converts self.one_in_x to a list if it's not None and not already a list.
    """

    errors = []

    # Change `one_in_x` type before param validation and arg generation
    one_in_x = self.one_in_x
    if one_in_x is not None:
        if not isinstance(one_in_x, list):
            self.one_in_x = [one_in_x]
        # Keep as list, don't convert to numpy array here

    # Check if input_locations is a DataFrame with lat/lon columns
    if isinstance(self.input_locations, pd.DataFrame):
        if (
            "lat" not in self.input_locations.columns
            or "lon" not in self.input_locations.columns
        ):
            errors.append("Input coordinates must have `lat` and `lon` columns.")
        elif not pd.api.types.is_numeric_dtype(
            self.input_locations["lat"]
        ) or not pd.api.types.is_numeric_dtype(self.input_locations["lon"]):
            errors.append("Input lat/lon columns must be float64 or int64 types.")

    # Validating that time start year < time end year
    if self.time_start_year > self.time_end_year:
        errors.append(
            "Start year must come before, or be the same year as, the end year."
        )

    # Validate year range for historical reconstruction
    if (
        self.historical_data == "Historical Reconstruction"
        and self.time_end_year > 2022
    ):
        errors.append(
            "End year for Historical Reconstruction data must be 2022 or earlier."
        )

    # Validate time approach with Historical Reconstruction
    if (
        self.historical_data == "Historical Reconstruction"
        and self.approach != "Time"
    ):
        errors.append(
            "Historical Reconstruction data can only be retrieved using a time-based approach."
        )

    # Validating that only one of heat_idx_threshold, percentile, or one_in_x is passed
    if (
        (self.heat_idx_threshold is not None and self.percentile is not None)
        or (self.heat_idx_threshold is not None and self.one_in_x is not None)
        or (self.percentile is not None and self.one_in_x is not None)
    ):
        errors.append(
            "Only one of heat_idx_threshold, one_in_x, and percentile can be non-None."
        )

    # Check that not all 3 customizable metric arguments are none.
    if (
        self.heat_idx_threshold is None
        and self.percentile is None
        and self.one_in_x is None
    ):
        errors.append(
            "`heat_idx_threshold`, `percentile`, and `one_in_x` cannot all be None."
        )

    # Make sure the metric is 'min' or 'max' if percentile is passed in
    if self.percentile is not None and self.metric_calc not in ["min", "max"]:
        errors.append(
            "Metric calculation must be 'min' or 'max' for percentile calculations."
        )

    # Validating variable and downscaling method combo
    if (
        self.variable in ["NOAA Heat Index", "Effective Temperature"]
        and self.downscaling_method == "Statistical"
    ):
        errors.append(
            f"`{self.variable}` cannot be used with Statistical downscaling method."
        )

    # Only allow for SSP 3-7.0 for WRF data
    if self.downscaling_method == "Dynamical" and self.approach == "Time":
        if len(self.ssp_data) > 1 or self.ssp_data[0] != "SSP 3-7.0":
            errors.append(
                "Can only use SSP 3-7.0 data for a time-based approach using WRF data."
            )

    # Validate duration, groupby, and grouped_duration params
    if self.duration is not UNSET:
        if (
            not isinstance(self.duration, tuple)
            or len(self.duration) != 2
            or self.duration[1] != "hour"
        ):
            errors.append(
                "`duration` must be a tuple with unit 'hour', e.g. (4, 'hour')."
            )

    if self.groupby is not UNSET:
        if (
            not isinstance(self.groupby, tuple)
            or len(self.groupby) != 2
            or self.groupby[1] != "day"
        ):
            errors.append(
                "`groupby` must be a tuple with unit 'day', e.g. (1, 'day')."
            )

    if self.grouped_duration is not UNSET:
        if self.groupby is UNSET:
            errors.append("To use `grouped_duration`, `groupby` must also be set.")
        if (
            not isinstance(self.grouped_duration, tuple)
            or len(self.grouped_duration) != 2
            or self.grouped_duration[1] != "day"
        ):
            errors.append(
                "`grouped_duration` must be a tuple with unit 'day', e.g. (5, 'day')."
            )

    # Raise errors if any
    if errors:
        raise ValueError("Parameter validation errors:\n\n" + "\n".join(errors))

get_names()

Generate names and metadata for climate data processing.

Returns:

Type Description
dict

A dictionary containing the following keys: ssp_selected : str or list The SSP (Shared Socioeconomic Pathway) data selected for analysis. variable : str The climate variable being analyzed, potentially adjusted based on downscaling method and metric calculation. variable_type : str Type of variable, either "Variable" for raw climate variables or "Derived Index" for calculated indices. var_name : str Human-readable description of the calculation being performed, including variable, metric, time period/warming level, and season. raw_name : str Standardized name for raw data storage. calc_name : str Standardized name for calculated data storage.

Notes

The function handles three main calculation types based on instance attributes: - Percentile calculations (when self.percentile is not None) - Heat index threshold calculations (when self.heat_idx_threshold is not None) - Return period calculations (when self.one_in_x is not None) For LOCA2 statistical downscaling with air temperature, the variable name is adjusted based on whether maximum or minimum metric calculation is selected. The approach can be either "Time" (using start/end years) or "Warming Level" (using a specific warming level in degrees Celsius).

Raises:

Type Description
ValueError

If metric_calc is not "max" or "min" for LOCA2 air temperature data.

ValueError

If approach is not "Time" or "Warming Level".

ValueError

If an unsupported variable is used for 1-in-X calculations.

Source code in climakitae/explore/vulnerability.py
def get_names(self):
    """
    Generate names and metadata for climate data processing.

    Returns
    -------
    dict
        A dictionary containing the following keys:
        ssp_selected : str or list
            The SSP (Shared Socioeconomic Pathway) data selected for analysis.
        variable : str
            The climate variable being analyzed, potentially adjusted based on
            downscaling method and metric calculation.
        variable_type : str
            Type of variable, either "Variable" for raw climate variables or
            "Derived Index" for calculated indices.
        var_name : str
            Human-readable description of the calculation being performed,
            including variable, metric, time period/warming level, and season.
        raw_name : str
            Standardized name for raw data storage.
        calc_name : str
            Standardized name for calculated data storage.

    Notes
    -----
    The function handles three main calculation types based on instance attributes:
    - Percentile calculations (when self.percentile is not None)
    - Heat index threshold calculations (when self.heat_idx_threshold is not None)
    - Return period calculations (when self.one_in_x is not None)
    For LOCA2 statistical downscaling with air temperature, the variable name is
    adjusted based on whether maximum or minimum metric calculation is selected.
    The approach can be either "Time" (using start/end years) or "Warming Level"
    (using a specific warming level in degrees Celsius).

    Raises
    ------
    ValueError
        If metric_calc is not "max" or "min" for LOCA2 air temperature data.
    ValueError
        If approach is not "Time" or "Warming Level".
    ValueError
        If an unsupported variable is used for 1-in-X calculations.
    """

    # Re-assigning `variable` in case it needs to be re-instantiated later.
    variable = self.variable

    # Determine variable type
    if variable in ["Air Temperature at 2m", "Precipitation (total)"]:
        variable_type = "Variable"
    else:
        variable_type = "Derived Index"

    # Adjust variable name for LOCA2 data
    if (
        variable == "Air Temperature at 2m"
        and self.downscaling_method == "Statistical"
    ):
        match self.metric_calc:
            case "max":
                variable = "Maximum air temperature at 2m"
            case "min":
                variable = "Minimum air temperature at 2m"
            case _:
                raise ValueError('metric_calc needs to be either "max" or "min"')

    # Reducing potential decimal places for warming level
    self.warming_level = round(self.warming_level, 3)

    # Adding time period or warming level to filename
    match self.approach:
        case "Time":
            approach_str = f"{self.time_start_year}_to_{self.time_end_year}"
            approach_var_name = f"from {approach_str}"
        case "Warming Level":
            if float(self.warming_level).is_integer():
                wl_str = str(int(self.warming_level))
            else:
                wl_str = str(self.warming_level).replace(".", "pt")

            approach_str = f"{wl_str}degreeWL"
            approach_var_name = f"for Warming Level {self.warming_level}°C"
        case _:
            raise ValueError(
                'approach needs to be either "Time" or "Warming Level"'
            )
    # Retrieving the name of the calculation about to occur
    if self.percentile is not None:

        def ordinal(n):
            """Find ordinal name for a number (1 -> 1st, 2 -> 2nd, 3 -> 3rd)"""
            return f"{n}{('th' if 11 <= n % 100 <= 13 else {1: 'st', 2: 'nd', 3: 'rd'}.get(n % 10, 'th'))}"

        var_name = f"{ordinal(self.percentile)} percentile of Daily {self.metric_calc.capitalize()} of {variable} for {self.season} seasons {approach_var_name}"
        raw_name = "likely_seasonal_temperature_raw_data"
        calc_name = (
            f"likely_seasonal_{self.season}_{self.metric_calc}_{approach_str}"
        )

    elif self.heat_idx_threshold is not None:
        var_name = f"Days per year with {self.metric_calc.capitalize()} Daily {variable} above a heat index threshold of {self.heat_idx_threshold} {self.units} for {self.season} seasons {approach_var_name}"
        raw_name = "heat_index_raw_data"
        calc_name = f"heat_index_thresh{self.heat_idx_threshold}_{approach_str}"

    elif self.one_in_x is not None:

        event_parts = []
        if self.groupby is not UNSET:
            event_parts.append(f"groupby={self.groupby}")
        if self.duration is not UNSET:
            event_parts.append(f"duration={self.duration}")
        if self.grouped_duration is not UNSET:
            event_parts.append(f"grouped_duration={self.grouped_duration}")
        event_str = f" {', '.join(event_parts)}," if event_parts else ""

        x_vals = ", ".join([f"1-in-{x}" for x in self.one_in_x])
        var_name = f"{x_vals} year,{event_str} {self.metric_calc.capitalize()} {variable} for {self.season} seasons {approach_var_name}"

        # Making names for raw and calculated data
        x_vals_str = "-".join(map(str, self.one_in_x))

        match variable:
            case "Precipitation (total)":
                raw_name = f"one_in_{x_vals_str}_precipitation_raw_data"
                calc_name = f"one_in_{x_vals_str}_precipitation_{approach_str}"
            case (
                "Air Temperature at 2m"
                | "Maximum air temperature at 2m"
                | "Minimum air temperature at 2m"
            ):
                raw_name = f"one_in_{x_vals_str}_temperature_raw_data"
                calc_name = f"one_in_{x_vals_str}_temperature_{approach_str}"
            case "NOAA Heat Index" | "Effective Temperature":
                raw_name = f"one_in_{x_vals_str}_derived_index_raw_data"
                calc_name = f"one_in_{x_vals_str}_derived_index_{approach_str}"
            case _:
                raise ValueError(
                    f'variable "{variable}" is not supported for 1-in-X calculations'
                )

    return {
        "ssp_selected": self.ssp_data,
        "variable": variable,
        "variable_type": variable_type,
        "var_name": var_name,
        "raw_name": raw_name,
        "calc_name": calc_name,
    }

Threshold Tools

Helper functions for performing analyses related to thresholds

calculate_ess(data, nlags=UNSET)

Function for calculating the effective sample size (ESS) of the provided data using the autocorrelation of the data.

Parameters:

Name Type Description Default
data DataArray

Input array is assumed to be timeseries data with potential autocorrelation.

required
nlags int

Number of lags to use in the autocorrelation function, defaults to the length of the timeseries.

UNSET

Returns:

Type Description
DataArray

Effective sample size. Returned as a DataArray object so it can be utilized by xr.groupby and xr.resample.

References

Zwiers, F. W., and H. von Storch, 1995: Taking Serial Correlation into Account in Tests of the Mean. J. Climate, 8, 336–351, https://doi.org/10.1175/1520-0442(1995)008<0336:TSCIAI>2.0.CO;2.

Source code in climakitae/explore/threshold_tools.py
def calculate_ess(data: xr.DataArray, nlags: int = UNSET) -> xr.DataArray:
    """Function for calculating the effective sample size (ESS) of the provided data
    using the autocorrelation of the data.

    Parameters
    ----------
    data : xr.DataArray
        Input array is assumed to be timeseries data with potential autocorrelation.
    nlags : int, optional
        Number of lags to use in the autocorrelation function, defaults to the length of
        the timeseries.

    Returns
    -------
    xr.DataArray
        Effective sample size.
        Returned as a DataArray object so it can be utilized by xr.groupby and xr.resample.

    References
    ----------
    Zwiers, F. W., and H. von Storch, 1995: Taking Serial Correlation into Account in Tests of the Mean. J. Climate, 8, 336–351, https://doi.org/10.1175/1520-0442(1995)008<0336:TSCIAI>2.0.CO;2.

    """
    n = len(data)
    if nlags is UNSET:
        nlags = n
    acf = sm.tsa.stattools.acf(data, nlags=nlags, fft=True)
    sums = 0
    for k in range(1, len(acf)):
        sums = sums + (n - k) * acf[k] / n
    ess = n / (1 + 2 * sums)
    return xr.DataArray(ess, name="ess")

get_block_maxima(da_series, extremes_type='max', duration=UNSET, groupby=UNSET, grouped_duration=UNSET, check_ess=True, block_size=1, rolling_agg='sustained')

Convert a timeseries into block extrema (maxima or minima), defaulting to annual block size.

Resamples the input array by taking the most extreme value in each block. Optional arguments duration, groupby, and grouped_duration define the type of compound event to extract extrema from, corresponding to the event types used in get_exceedance_count.

Parameters:

Name Type Description Default
da_series DataArray

Input timeseries from which block extrema are extracted.

required
extremes_type (max, min)

Whether to extract block maxima or minima. Default is "max".

"max"
duration tuple[int, str]

Length of the extreme event, specified as (n, unit) where unit is "hour". For example, (4, "hour") finds events lasting at least 4 hours. Only supported for hourly data.

UNSET
groupby tuple[int, str]

Temporal grouping applied before computing block extrema, specified as (n, unit) where unit is "day". For example, (1, "day") aggregates to daily values. Most meaningful when combined with grouped_duration.

UNSET
grouped_duration tuple[int, str]

Rolling window applied after groupby, specified as (n, unit) where unit is "day". For example, (5, "day") finds events spanning at least 5 consecutive grouped periods. Requires groupby.

UNSET
check_ess bool

If True, computes the effective sample size (ESS) within each block and emits a warning if the average ESS is below 25. Set to False to suppress this check. Default is True.

True
block_size int

Block size in years over which to take the extremum. Default is 1.

1
rolling_agg (sustained, cumulative, average)

Aggregation method applied during rolling windows (duration, grouped_duration) and groupby resamples (groupby): - "sustained": the extreme value that is maintained throughout the entire window (rolling min for max events, rolling max for min events). Use this when the event intensity must hold for the full duration. Example: the minimum temperature floor of a 21-day heatwave. - "cumulative": the total accumulated value over the window (rolling sum). Use this for events defined by an accumulated total. Example: total precipitation of a 3-day heavy rainfall event. - "average": the mean value over the window (rolling mean). Use this for events characterized by average intensity. Example: average precipitation rate of a short high-intensity burst. Default is "sustained".

"sustained"

Returns:

Type Description
DataArray

Block extrema series with the same non-time dimensions as da_series. The time coordinate reflects the end of each block (annual or block_size-year periods). Metadata describing the event configuration is stored in .attrs.

Raises:

Type Description
ValueError

If rolling_agg is not one of "sustained", "cumulative", or "average".

ValueError

If extremes_type is not "max" or "min".

ValueError

If duration is provided but the data frequency is not hourly or the unit is not "hour".

ValueError

If groupby is provided but the unit is not "day".

ValueError

If grouped_duration is provided without groupby, or the unit is not "day".

ValueError

If all block values are NaN (i.e. the input contains no valid observations for this variable).

Source code in climakitae/explore/threshold_tools.py
def get_block_maxima(
    da_series: xr.DataArray,
    extremes_type: str = "max",
    duration: tuple[int, str] = UNSET,
    groupby: tuple[int, str] = UNSET,
    grouped_duration: tuple[int, str] = UNSET,
    check_ess: bool = True,
    block_size: int = 1,
    rolling_agg: str = "sustained",
) -> xr.DataArray:
    """Convert a timeseries into block extrema (maxima or minima), defaulting to annual block size.

    Resamples the input array by taking the most extreme value in each block.
    Optional arguments ``duration``, ``groupby``, and ``grouped_duration`` define the
    type of compound event to extract extrema from, corresponding to the event types
    used in ``get_exceedance_count``.

    Parameters
    ----------
    da_series : xr.DataArray
        Input timeseries from which block extrema are extracted.
    extremes_type : {"max", "min"}, optional
        Whether to extract block maxima or minima. Default is ``"max"``.
    duration : tuple[int, str], optional
        Length of the extreme event, specified as ``(n, unit)`` where unit is
        ``"hour"``. For example, ``(4, "hour")`` finds events lasting at least
        4 hours. Only supported for hourly data.
    groupby : tuple[int, str], optional
        Temporal grouping applied before computing block extrema, specified as
        ``(n, unit)`` where unit is ``"day"``. For example, ``(1, "day")``
        aggregates to daily values. Most meaningful when combined with
        ``grouped_duration``.
    grouped_duration : tuple[int, str], optional
        Rolling window applied after ``groupby``, specified as ``(n, unit)``
        where unit is ``"day"``. For example, ``(5, "day")`` finds events
        spanning at least 5 consecutive grouped periods. Requires ``groupby``.
    check_ess : bool, optional
        If ``True``, computes the effective sample size (ESS) within each block
        and emits a warning if the average ESS is below 25. Set to ``False`` to
        suppress this check. Default is ``True``.
    block_size : int, optional
        Block size in years over which to take the extremum. Default is ``1``.
    rolling_agg : {"sustained", "cumulative", "average"}, optional
        Aggregation method applied during rolling windows (``duration``,
        ``grouped_duration``) and groupby resamples (``groupby``):
            - ``"sustained"``: the extreme value that is maintained throughout the
              entire window (rolling min for max events, rolling max for min events).
              Use this when the event intensity must hold for the full duration.
              *Example*: the minimum temperature floor of a 21-day heatwave.
            - ``"cumulative"``: the total accumulated value over the window (rolling
              sum). Use this for events defined by an accumulated total.
              *Example*: total precipitation of a 3-day heavy rainfall event.
            - ``"average"``: the mean value over the window (rolling mean). Use this
              for events characterized by average intensity.
              *Example*: average precipitation rate of a short high-intensity burst.
        Default is ``"sustained"``.

    Returns
    -------
    xr.DataArray
        Block extrema series with the same non-time dimensions as ``da_series``.
        The time coordinate reflects the end of each block (annual or
        ``block_size``-year periods). Metadata describing the event configuration
        is stored in ``.attrs``.

    Raises
    ------
    ValueError
        If ``rolling_agg`` is not one of ``"sustained"``, ``"cumulative"``, or
        ``"average"``.
    ValueError
        If ``extremes_type`` is not ``"max"`` or ``"min"``.
    ValueError
        If ``duration`` is provided but the data frequency is not hourly or the
        unit is not ``"hour"``.
    ValueError
        If ``groupby`` is provided but the unit is not ``"day"``.
    ValueError
        If ``grouped_duration`` is provided without ``groupby``, or the unit is
        not ``"day"``.
    ValueError
        If all block values are NaN (i.e. the input contains no valid
        observations for this variable).
    """
    valid_rolling_aggs = ["sustained", "cumulative", "average"]
    if rolling_agg not in valid_rolling_aggs:
        raise ValueError(
            f"invalid rolling_agg. expected one of the following: {valid_rolling_aggs}"
        )

    extremes_types = ["max", "min"]  # valid user options
    if extremes_type not in extremes_types:
        raise ValueError(
            "invalid extremes type. expected one of the following: %s" % extremes_types
        )

    if duration is not UNSET:
        # In this case, user is interested in extreme events lasting at least
        # as long as the length of `duration`.
        dur_len, dur_type = duration
        if dur_type != "hour" or da_series.frequency not in ["1hr", "hourly"]:
            raise ValueError(
                "Current specifications not implemented. `duration` options only implemented for `hour` frequency."
            )

        # First identify the min (max) value for each window of length `duration`
        if rolling_agg == "sustained":
            match extremes_type:
                case "max":
                    # In the case of "max" events, need to first identify the minimum value
                    # in each window of the specified duration
                    da_series = da_series.rolling(time=dur_len, center=False).min(
                        "time"
                    )
                case "min":
                    da_series = da_series.rolling(time=dur_len, center=False).max(
                        "time"
                    )
                case _:
                    raise ValueError('extremes_type needs to be either "max" or "min"')
        elif rolling_agg == "cumulative":
            da_series = da_series.rolling(time=dur_len, center=False).sum("time")
        elif rolling_agg == "average":
            da_series = da_series.rolling(time=dur_len, center=False).mean("time")

    if groupby is not UNSET:
        # In this case, select the max (min) in each group. (This option is
        # really only meaningful when coupled with the `grouped_duration` option.)
        group_len, group_type = groupby
        if group_type != "day":
            raise ValueError(
                "`groupby` specifications only implemented for 'day' groupings."
            )

        # select the max (min) in each group
        if rolling_agg == "sustained":
            match extremes_type:
                case "max":
                    da_series = da_series.resample(
                        time=f"{group_len}D", label="left"
                    ).max()
                case "min":
                    da_series = da_series.resample(
                        time=f"{group_len}D", label="left"
                    ).min()
                case _:
                    raise ValueError('extremes_type needs to be either "max" or "min"')
        elif rolling_agg == "cumulative":
            da_series = da_series.resample(time=f"{group_len}D", label="left").sum()
        elif rolling_agg == "average":
            da_series = da_series.resample(time=f"{group_len}D", label="left").mean()

    if grouped_duration is not UNSET:
        if groupby is UNSET:
            raise ValueError(
                "To use `grouped_duration` option, must first use groupby."
            )
        # In this case, identify the min (max) value of the grouped values for
        # each window of length `grouped_duration``. Must be in `days`.
        dur2_len, dur2_type = grouped_duration
        if dur2_type != "day":
            raise ValueError(
                "`grouped_duration` specification must be in days. example: `grouped_duration = (3, 'day')`."
            )

        # Rechunk time dimension if dask-backed to ensure chunks are large enough
        # for the rolling window. After groupby resampling, chunks may be smaller
        # than the rolling window size, causing a ValueError.
        if hasattr(da_series.data, "chunks"):
            da_series = da_series.chunk(time=-1)

        # Now select the min (max) from the duration period
        if rolling_agg == "sustained":
            match extremes_type:
                case "max":
                    da_series = da_series.rolling(time=dur2_len, center=False).min(
                        "time"
                    )
                case "min":
                    da_series = da_series.rolling(time=dur2_len, center=False).max(
                        "time"
                    )
                case _:
                    raise ValueError('extremes_type needs to be either "max" or "min"')
        elif rolling_agg == "cumulative":
            da_series = da_series.rolling(time=dur2_len, center=False).sum("time")
        elif rolling_agg == "average":
            da_series = da_series.rolling(time=dur2_len, center=False).mean("time")

    # Now select the most extreme value for each block in the series
    match extremes_type:
        case "max":
            bms = da_series.resample(time=f"{block_size}YE").max(keep_attrs=True)
            bms.attrs["extremes type"] = "maxima"
        case "min":
            bms = da_series.resample(time=f"{block_size}YE").min(keep_attrs=True)
            bms.attrs["extremes type"] = "minima"
        case _:
            raise ValueError('extremes_type needs to be either "max" or "min"')

    # Calculate the effective sample size of the computed event type in all blocks
    # Check the average value to ensure that it's above threshold ESS
    if check_ess:
        average_ess = -1
        if "x" in da_series.dims and "y" in da_series.dims:
            # Case for data with spatial dimensions (gridded)
            average_ess = _calc_average_ess_gridded_data(da_series, block_size)

        elif da_series.dims == ("time",):
            # Case for timeseries data (no spatial dimensions)
            average_ess = _calc_average_ess_timeseries_data(da_series, block_size)

        else:
            print(
                f"WARNING: the effective sample size can only be checked for timeseries or spatial data. You provided data with the following dimensions: {da_series.dims}."
            )

        if average_ess < 25 and average_ess > 0:
            print(
                f"WARNING: The average effective sample size in your data is {round(average_ess, 2)} per block, which is lower than a standard target of around 25. This may result in biased estimates of extreme value distributions when calculating return values, periods, and probabilities from this data. Consider using a longer block size to increase the effective sample size in each block of data."
            )

    # Common attributes
    bms = bms.assign_attrs(
        {
            "duration": duration,
            "groupby": groupby,
            "grouped_duration": grouped_duration,
            "rolling_agg": rolling_agg,
            "extreme_value_extraction_method": "block maxima",
            "block_size": f"{block_size} year",
            "timeseries_type": f"block {extremes_type} series",
        }
    )

    # Checking for null values within the blocks. This primarily occurs when values are filtered out before this function.
    # A good example of this is when trying to find 1-in-X events with precipitation. Because we filter out small noise of precip events,
    # (i.e. 10e-10), this results in a DataArray being passed into this function that has many fewer observed counts than the actual retrieve
    # climate data has. In some cases, there can even be years where no precipitation occurs. In these cases, we want to make sure to drop those
    # time blocks from the returned blocks below, hence dropping NaN values along the time dimension. This way, they do not break other functions
    # that rely on `get_block_maxima`, like `get_ks_stat` or `get_return_value`
    if bms.isnull().sum() > 0:

        # This checks if ALL of the values from `bms` are null, or if there are 0 events that occur (i.e. no precipitation counts within the DataArray).
        if bms.isnull().sum().item() == bms.size:
            raise ValueError(
                "ERROR: The given `da_series` does not include any recorded values for this variable, and we cannot create block maximums off of an empty DataArray. Please check your input data and filters to ensure that there are observed values for this variable in the given DataArray."
            )
        else:
            # Determine which non-time dims exist for detecting all-NaN time steps
            non_time_dims = [d for d in bms.dims if d != "time"]

            if non_time_dims:
                # Drop time steps where ALL values across non-time dims are NaN
                all_nan_times = bms.isnull().all(dim=non_time_dims)
            else:
                # Timeseries data (only time dim)
                all_nan_times = bms.isnull()

            n_dropped = int(all_nan_times.sum().item())
            if n_dropped > 0:
                bms = bms.where(~all_nan_times, drop=True)
                print(
                    f"Dropping {n_dropped} block maxima NaN time steps from"
                    f"{f' {bms.name}' if bms.name else ''} DataArray."
                )

    return bms

get_ks_stat(bms, distr='gev', multiple_points=True)

Function to perform kstest on input DataArray

Creates a dataset of ks test d-statistics and p-values from an inputed maximum series.

Parameters:

Name Type Description Default
bms DataArray

Block maximum series, can be output from the function get_block_maxima()

required
distr str

name of distribution to use

'gev'
multiple_points boolean

Whether or not the data contains multiple points (has x, y dimensions)

True

Returns:

Type Description
Dataset
Source code in climakitae/explore/threshold_tools.py
def get_ks_stat(
    bms: xr.DataArray, distr: str = "gev", multiple_points: bool = True
) -> xr.Dataset:
    """Function to perform kstest on input DataArray

    Creates a dataset of ks test d-statistics and p-values from an inputed
    maximum series.

    Parameters
    ----------
    bms : xarray.DataArray
        Block maximum series, can be output from the function get_block_maxima()
    distr : str
        name of distribution to use
    multiple_points : boolean
        Whether or not the data contains multiple points (has x, y dimensions)

    Returns
    -------
    xarray.Dataset

    """
    distr_func = _get_distr_func(distr)
    bms_attributes = bms.attrs

    if multiple_points:
        bms = (
            bms.stack(allpoints=["y", "x"])
            .dropna(dim="allpoints")
            .squeeze()
            .groupby("allpoints")
        )

    def ks_stat(bms):
        parameters, fitted_distr = _get_fitted_distr(bms, distr, distr_func)

        match distr:
            case "gev":
                cdf = "genextreme"
                args = (parameters["c"], parameters["loc"], parameters["scale"])
            case "gumbel":
                cdf = "gumbel_r"
                args = (parameters["loc"], parameters["scale"])
            case "weibull":
                cdf = "weibull_min"
                args = (parameters["c"], parameters["loc"], parameters["scale"])
            case "pearson3":
                cdf = "pearson3"
                args = (parameters["skew"], parameters["loc"], parameters["scale"])
            case "genpareto":
                cdf = "genpareto"
                args = (parameters["c"], parameters["loc"], parameters["scale"])
            case "gamma":
                cdf = "gamma"
                args = (parameters["a"], parameters["loc"], parameters["scale"])
            case _:
                raise ValueError(
                    'invalid distribution type. expected one of the following: ["gev", "gumbel", "weibull", "pearson3", "genpareto", "gamma"]'
                )

        try:
            ks = stats.kstest(bms, cdf, args=args)
            d_statistic = ks[0]
            p_value = ks[1]
        except (ValueError, ZeroDivisionError):
            d_statistic = np.nan
            p_value = np.nan

        return d_statistic, p_value

    d_statistic, p_value = xr.apply_ufunc(
        ks_stat,
        bms,
        input_core_dims=[["time"]],
        exclude_dims=set(("time",)),
        vectorize=True,
        output_core_dims=[[], []],
    )

    d_statistic = d_statistic.rename("d_statistic")
    new_ds = d_statistic.to_dataset()
    new_ds["p_value"] = p_value

    if multiple_points:
        new_ds = new_ds.unstack("allpoints")

    new_ds["d_statistic"].attrs["stat test"] = "KS test"
    new_ds["p_value"].attrs["stat test"] = "KS test"
    new_ds.attrs = bms_attributes
    new_ds.attrs["distribution"] = "{}".format(str(distr))
    new_ds["p_value"].attrs["units"] = UNSET
    new_ds["d_statistic"].attrs["units"] = UNSET
    return new_ds

get_return_value(bms, return_period=10, distr='gev', bootstrap_runs=100, conf_int_lower_bound=2.5, conf_int_upper_bound=97.5, multiple_points=True, extremes_type='max', dropna_time=False, dim_to_fit='time')

Creates xarray Dataset with return values and confidence intervals from maximum series.

Parameters:

Name Type Description Default
bms DataArray

Block maximum series, can be output from the function get_block_maxima()

required
return_period float

The recurrence interval (in years) for which to calculate the return value

10
distr str

The type of extreme value distribution to fit

'gev'
bootstrap_runs int

Number of bootstrap samples

100
conf_int_lower_bound float

Confidence interval lower bound

2.5
conf_int_upper_bound float

Confidence interval upper bound

97.5
multiple_points boolean

Whether or not the data contains multiple points (has x, y dimensions)

True
extremes_type str

Whether to compute max ('max') or min ('min') extremes, by default 'max'.

'max'
dropna_time bool

Whether to drop NaNs along the time axis

False
dim_to_fit str

Name of the dimension that the distribution is going to be fit on.

'time'

Returns:

Type Description
Dataset

Dataset with return values and confidence intervals

Notes

This function calls _get_return_variable, which will attempt to use the block_size attribute from the BMS to get the block size. If the block_size attribute is not found, an default of 1 will be used. The block size is used to get the annual probability of the desired event when the block size is > 1 year.

Source code in climakitae/explore/threshold_tools.py
def get_return_value(
    bms: xr.DataArray,
    return_period: float = 10,
    distr: str = "gev",
    bootstrap_runs: int = 100,
    conf_int_lower_bound: float = 2.5,
    conf_int_upper_bound: float = 97.5,
    multiple_points: bool = True,
    extremes_type: str = "max",
    dropna_time: bool = False,
    dim_to_fit: str = "time",
) -> xr.Dataset:
    """Creates xarray Dataset with return values and confidence intervals from maximum series.

    Parameters
    ----------
    bms : xarray.DataArray
        Block maximum series, can be output from the function get_block_maxima()
    return_period : float
        The recurrence interval (in years) for which to calculate the return value
    distr : str
        The type of extreme value distribution to fit
    bootstrap_runs : int
        Number of bootstrap samples
    conf_int_lower_bound : float
        Confidence interval lower bound
    conf_int_upper_bound : float
        Confidence interval upper bound
    multiple_points : boolean
        Whether or not the data contains multiple points (has x, y dimensions)
    extremes_type : str, optional
        Whether to compute max ('max') or min ('min') extremes, by default 'max'.
    dropna_time: boolean
        Whether to drop NaNs along the time axis
    dim_to_fit : str
        Name of the dimension that the distribution is going to be fit on.

    Returns
    -------
    xarray.Dataset
        Dataset with return values and confidence intervals

    Notes
    -----
    This function calls _get_return_variable, which will attempt to use the block_size
    attribute from the BMS to get the block size. If the block_size attribute is not found,
    an default of 1 will be used. The block size is used to get the annual probability of
    the desired event when the block size is > 1 year.
    """
    return _get_return_variable(
        bms,
        "return_value",
        return_period,
        distr,
        bootstrap_runs,
        conf_int_lower_bound,
        conf_int_upper_bound,
        multiple_points,
        extremes_type,
        dropna_time,
        dim_to_fit,
    )

get_return_prob(bms, threshold, distr='gev', bootstrap_runs=100, conf_int_lower_bound=2.5, conf_int_upper_bound=97.5, multiple_points=True, extremes_type='max', dropna_time=False, dim_to_fit='time')

Creates xarray Dataset with return probabilities and confidence intervals from maximum series.

Parameters:

Name Type Description Default
bms DataArray

Block maximum series, can be output from the function get_block_maxima()

required
threshold float

The threshold value for which to calculate the probability of exceedance

required
distr str

The type of extreme value distribution to fit

'gev'
bootstrap_runs int

Number of bootstrap samples

100
conf_int_lower_bound float

Confidence interval lower bound

2.5
conf_int_upper_bound float

Confidence interval upper bound

97.5
multiple_points boolean

Whether or not the data contains multiple points (has x, y dimensions)

True
extremes_type str

Whether to compute max ('max') or min ('min') extremes, by default 'max'.

'max'
dropna_time bool

Whether to drop NaNs along the time axis

False
dim_to_fit str

Name of the dimension that the distribution is going to be fit on.

'time'

Returns:

Type Description
Dataset

Dataset with return probabilities and confidence intervals

Notes

This function calls _get_return_variable, which will attempt to use the block_size attribute from the BMS to get the block size. If the block_size attribute is not found, an default of 1 will be used. The block size is used to get the annual probability of the desired event when the block size is > 1 year.

Source code in climakitae/explore/threshold_tools.py
def get_return_prob(
    bms: xr.DataArray,
    threshold: float,
    distr: str = "gev",
    bootstrap_runs: int = 100,
    conf_int_lower_bound: float = 2.5,
    conf_int_upper_bound: float = 97.5,
    multiple_points: bool = True,
    extremes_type: str = "max",
    dropna_time: bool = False,
    dim_to_fit: str = "time",
) -> xr.Dataset:
    """Creates xarray Dataset with return probabilities and confidence intervals from maximum series.

    Parameters
    ----------
    bms : xarray.DataArray
        Block maximum series, can be output from the function get_block_maxima()
    threshold : float
        The threshold value for which to calculate the probability of exceedance
    distr : str
        The type of extreme value distribution to fit
    bootstrap_runs : int
        Number of bootstrap samples
    conf_int_lower_bound : float
        Confidence interval lower bound
    conf_int_upper_bound : float
        Confidence interval upper bound
    multiple_points : boolean
        Whether or not the data contains multiple points (has x, y dimensions)
    extremes_type : str, optional
        Whether to compute max ('max') or min ('min') extremes, by default 'max'.
    dropna_time: boolean
        Whether to drop NaNs along the time axis
    dim_to_fit : str
        Name of the dimension that the distribution is going to be fit on.

    Returns
    -------
    xarray.Dataset
        Dataset with return probabilities and confidence intervals

    Notes
    -----
    This function calls _get_return_variable, which will attempt to use the block_size
    attribute from the BMS to get the block size. If the block_size attribute is not found,
    an default of 1 will be used. The block size is used to get the annual probability of
    the desired event when the block size is > 1 year.

    """
    return _get_return_variable(
        bms,
        "return_prob",
        threshold,
        distr,
        bootstrap_runs,
        conf_int_lower_bound,
        conf_int_upper_bound,
        multiple_points,
        extremes_type,
        dropna_time=dropna_time,
        dim_to_fit=dim_to_fit,
    )

get_return_period(bms, return_value, distr='gev', bootstrap_runs=100, conf_int_lower_bound=2.5, conf_int_upper_bound=97.5, multiple_points=True, extremes_type='max', dropna_time=False, dim_to_fit='time')

Creates xarray Dataset with return periods and confidence intervals from maximum series.

Parameters:

Name Type Description Default
bms DataArray

Block maximum series, can be output from the function get_block_maxima()

required
return_value float

The threshold value for which to calculate the return period of occurance

required
distr str

The type of extreme value distribution to fit

'gev'
bootstrap_runs int

Number of bootstrap samples

100
conf_int_lower_bound float

Confidence interval lower bound

2.5
conf_int_upper_bound float

Confidence interval upper bound

97.5
multiple_points boolean

Whether or not the data contains multiple points (has x, y dimensions)

True
extremes_type str

Whether to compute max ('max') or min ('min') extremes, by default 'max'.

'max'
dropna_time bool

Whether to drop NaNs along the time axis

False
dim_to_fit str

Name of the dimension that the distribution is going to be fit on.

'time'

Returns:

Type Description
Dataset

Dataset with return periods and confidence intervals

Notes

This function calls _get_return_variable, which will attempt to use the block_size attribute from the BMS to get the block size. If the block_size attribute is not found, an default of 1 will be used. The block size is used to get the annual probability of the desired event when the block size is > 1 year.

Source code in climakitae/explore/threshold_tools.py
def get_return_period(
    bms: xr.DataArray,
    return_value: float,
    distr: str = "gev",
    bootstrap_runs: int = 100,
    conf_int_lower_bound: float = 2.5,
    conf_int_upper_bound: float = 97.5,
    multiple_points: bool = True,
    extremes_type: str = "max",
    dropna_time: bool = False,
    dim_to_fit: str = "time",
) -> xr.Dataset:
    """Creates xarray Dataset with return periods and confidence intervals from maximum series.

    Parameters
    ----------
    bms : xarray.DataArray
        Block maximum series, can be output from the function get_block_maxima()
    return_value : float
        The threshold value for which to calculate the return period of occurance
    distr : str
        The type of extreme value distribution to fit
    bootstrap_runs : int
        Number of bootstrap samples
    conf_int_lower_bound : float
        Confidence interval lower bound
    conf_int_upper_bound : float
        Confidence interval upper bound
    multiple_points : boolean
        Whether or not the data contains multiple points (has x, y dimensions)
    extremes_type : str, optional
        Whether to compute max ('max') or min ('min') extremes, by default 'max'.
    dropna_time: boolean
        Whether to drop NaNs along the time axis
    dim_to_fit : str
        Name of the dimension that the distribution is going to be fit on.

    Returns
    -------
    xarray.Dataset
        Dataset with return periods and confidence intervals

    Notes
    -----
    This function calls _get_return_variable, which will attempt to use the block_size
    attribute from the BMS to get the block size. If the block_size attribute is not found,
    an default of 1 will be used. The block size is used to get the annual probability of
    the desired event when the block size is > 1 year.

    """
    return _get_return_variable(
        bms,
        "return_period",
        return_value,
        distr,
        bootstrap_runs,
        conf_int_lower_bound,
        conf_int_upper_bound,
        multiple_points,
        extremes_type,
        dropna_time=dropna_time,
        dim_to_fit=dim_to_fit,
    )

get_exceedance_count(da, threshold_value, duration1=UNSET, period=(1, 'year'), threshold_direction='above', duration2=UNSET, groupby=UNSET, smoothing=UNSET)

Calculate the number of occurances of exceeding the specified threshold within each period.

Returns an xarray.DataArray with the same coordinates as the input data except for the time dimension, which will be collapsed to one value per period (equal to the number of event occurances in each period).

Parameters:

Name Type Description Default
da DataArray

array of some climate variable. Can have multiple scenarios, simulations, or x and y coordinates.

required
threshold_value float

value against which to test exceedance

required
duration1 tuple[int, str]

length of exceedance in order to qualify as an event (before grouping)

UNSET
period tuple[int, str]

amount of time across which to sum the number of occurances, default is (1, "year"). Specified as a tuple: (x, time) where x is an integer, and time is one of: ["day", "month", "year"]

(1, 'year')
threshold_direction str

either "above" or "below", default is above.

'above'
duration2 tuple[int, str]

length of exceedance in order to qualify as an event (after grouping)

UNSET
groupby tuple[int, str]

see examples for explanation. Typical grouping could be (1, "day")

UNSET
smoothing int

option to average the result across multiple periods with a rolling average; value is either UNSET or the number of timesteps to use as the window size

UNSET

Returns:

Type Description
DataArray
Source code in climakitae/explore/threshold_tools.py
def get_exceedance_count(
    da: xr.DataArray,
    threshold_value: float,
    duration1: tuple[int, str] = UNSET,
    period: tuple[int, str] = (1, "year"),
    threshold_direction="above",
    duration2: tuple[int, str] = UNSET,
    groupby: tuple[int, str] = UNSET,
    smoothing: int = UNSET,
) -> xr.DataArray:
    """Calculate the number of occurances of exceeding the specified threshold
    within each period.

    Returns an xarray.DataArray with the same coordinates as the input data except for
    the time dimension, which will be collapsed to one value per period (equal
    to the number of event occurances in each period).

    Parameters
    ----------
    da : xarray.DataArray
        array of some climate variable. Can have multiple
        scenarios, simulations, or x and y coordinates.
    threshold_value : float
        value against which to test exceedance
    duration1 : tuple[int, str]
        length of exceedance in order to qualify as an event (before grouping)
    period : tuple[int, str]
        amount of time across which to sum the number of occurances,
        default is (1, "year"). Specified as a tuple: (x, time) where x is an
        integer, and time is one of: ["day", "month", "year"]
    threshold_direction : str
        either "above" or "below", default is above.
    duration2 : tuple[int, str]
        length of exceedance in order to qualify as an event (after grouping)
    groupby : tuple[int, str]
        see examples for explanation. Typical grouping could be (1, "day")
    smoothing : int
        option to average the result across multiple periods with a
        rolling average; value is either UNSET or the number of timesteps to use
        as the window size

    Returns
    -------
    xarray.DataArray

    """
    # --------- Type check arguments -------------------------------------------

    # Check compatibility of periods, durations, and groupbys
    if _is_greater(duration1, groupby):
        raise ValueError(
            "Incompatible `group` and `duration1` specification. Duration1 must be shorter than group."
        )
    if _is_greater(groupby, duration2):
        raise ValueError(
            "Incompatible `group` and `duration2` specification. Duration2 must be longer than group."
        )
    if _is_greater(groupby, period):
        raise ValueError(
            "Incompatible `group` and `period` specification. Group must be longer than period."
        )
    if _is_greater(duration2, period):
        raise ValueError(
            "Incompatible `duration` and `period` specification. Period must be longer than duration."
        )

    # Check compatibility of specifications with the data frequency (hourly, daily, or monthly)
    freq = (
        (1, "hour")
        if da.frequency == "hourly"
        else ((1, "day") if da.frequency == "daily" else (1, "month"))
    )
    if _is_greater(freq, groupby):
        raise ValueError(
            "Incompatible `group` specification: cannot be less than data frequency."
        )
    if _is_greater(freq, duration2):
        raise ValueError(
            "Incompatible `duration` specification: cannot be less than data frequency."
        )
    if _is_greater(freq, period):
        raise ValueError(
            "Incompatible `period` specification: cannot be less than data frequency."
        )

    # --------- Calculate occurances -------------------------------------------

    events_da = _get_exceedance_events(
        da, threshold_value, threshold_direction, duration1, groupby
    )

    # --------- Apply specified duration requirement ---------------------------

    if duration2 is not UNSET:
        dur_len, dur_type = duration2

        if (
            groupby is not UNSET
            and groupby[1] == dur_type
            or groupby is UNSET
            and freq[1] == dur_type
        ):
            window_size = dur_len
        else:
            raise ValueError(
                "Duration options for time types (i.e. hour, day) that are different than group or frequency not yet implemented"
            )

        # The "min" operation will return 0 if any time in the window is not an
        # event, which is the behavior we want. It will only return 1 for True
        # if all values in the duration window are 1.
        events_da = events_da.rolling(time=window_size, center=False).min("time")

    # --------- Sum occurances across each period ------------------------------

    period_len, period_type = period
    period_indexer = _get_freq_string(period_type)
    exceedance_count = events_da.resample(
        time=f"{period_len}{period_indexer}", label="left"
    ).sum()

    # Optional smoothing
    if smoothing is not UNSET:
        exceedance_count = exceedance_count.rolling(time=smoothing, center=True).mean(
            "time"
        )

    # --------- Set new attributes for the counts DataArray --------------------
    exceedance_count.attrs["variable_name"] = da.name
    exceedance_count.attrs["variable_units"] = exceedance_count.units
    exceedance_count.attrs["period"] = period
    exceedance_count.attrs["duration1"] = duration1
    exceedance_count.attrs["group"] = groupby
    exceedance_count.attrs["duration2"] = duration2
    exceedance_count.attrs["threshold_value"] = threshold_value
    exceedance_count.attrs["threshold_direction"] = threshold_direction
    exceedance_count.attrs["units"] = _exceedance_count_name(exceedance_count)

    # Set name (for plotting, this will be the y-axis label)
    exceedance_count.name = "Count"

    return exceedance_count

exceedance_plot_title(exceedance_count)

Function to build title for exceedance plots

Helper function for making the title for exceedance plots.

Parameters:

Name Type Description Default
exceedance_count DataArray
required

Returns:

Type Description
string

Examples:

'Air Temperatue at 2m: events above 35C'
'Preciptation (total): events below 10mm'
Source code in climakitae/explore/threshold_tools.py
def exceedance_plot_title(exceedance_count: xr.DataArray) -> str:
    """Function to build title for exceedance plots

    Helper function for making the title for exceedance plots.

    Parameters
    ----------
    exceedance_count : xarray.DataArray

    Returns
    -------
    string

    Examples
    --------
        'Air Temperatue at 2m: events above 35C'
        'Preciptation (total): events below 10mm'

    """
    return f"{exceedance_count.variable_name}: events {exceedance_count.threshold_direction} {exceedance_count.threshold_value}{exceedance_count.variable_units}"

exceedance_plot_subtitle(exceedance_count)

Function of build exceedance plot subtitle

Helper function for making the subtile for exceedance plots.

Parameters:

Name Type Description Default
exceedance_count DataArray
required

Returns:

Type Description
string

Examples:

'Number of hours per year'
'Number of 4-hour events per 3-months'
'Number of days per year with conditions lasting at least 4-hours'
Source code in climakitae/explore/threshold_tools.py
def exceedance_plot_subtitle(exceedance_count: xr.DataArray) -> str:
    """Function of build exceedance plot subtitle

    Helper function for making the subtile for exceedance plots.

    Parameters
    ----------
    exceedance_count : xarray.DataArray

    Returns
    -------
    string

    Examples
    --------
        'Number of hours per year'
        'Number of 4-hour events per 3-months'
        'Number of days per year with conditions lasting at least 4-hours'

    """
    if exceedance_count.duration2 != exceedance_count.duration1:
        dur_len, dur_type = exceedance_count.duration1
        _s = "" if dur_len == 1 else "s"
        dur_str = f" with conditions lasting at least {dur_len} {dur_type}{_s}"
    else:
        dur_str = ""

    if exceedance_count.duration2 != exceedance_count.group:
        grp_len, grp_type = exceedance_count.group
        if grp_len == 1:
            grp_str = f" each {grp_type}"
        else:
            grp_str = f" every {grp_len} {grp_type}s"
    else:
        grp_str = ""

    per_len, per_type = exceedance_count.period
    if per_len == 1:
        period_str = f" each {per_type}"
    else:
        period_str = f" per {per_len}-{per_type} period"

    _subtitle = (
        _exceedance_count_name(exceedance_count) + period_str + dur_str + grp_str
    )
    return _subtitle

Thresholds

get_threshold_data(selections)

This function pulls data from the catalog and reads it into memory

Arguments

selections : DataParameters object holding user's selections

Returns:

Name Type Description
data DataArray

data to use for creating postage stamp data

Source code in climakitae/explore/thresholds.py
def get_threshold_data(selections: DataParameters) -> xr.DataArray:
    """This function pulls data from the catalog and reads it into memory

    Arguments
    ---------
    selections : DataParameters
        object holding user's selections

    Returns
    -------
    data : xr.DataArray
        data to use for creating postage stamp data

    """
    # Read data from catalog
    data = read_catalog_from_select(selections)
    data = data.compute()  # Read into memory
    return data

Vulnerability Tables

create_vul_table(example_loc, percentile, heat_idx_threshold, one_in_x)

Creates a vulnerability assessment table and exports the table to CSV.

Source code in climakitae/explore/vulnerability_table.py
def create_vul_table(example_loc, percentile, heat_idx_threshold, one_in_x):
    """Creates a vulnerability assessment table and exports the table to CSV."""
    # Create empty df and instantiate variables
    df = pd.DataFrame(columns=table_vars.keys())
    lat, lon = example_loc.lat.item(), example_loc.lon.item()
    months_map = {"winter": [12, 1, 2], "summer": [6, 7, 8], "all": np.arange(1, 13)}
    time_periods = [
        (1981, 2010),
        (2021, 2050),
        (2031, 2060),
        (2061, 2090),
    ]

    # Create each column in the table, which is the historical period (1980-2010) and each WL (1.5, 2.0, 3.0, 4.0).
    for time_period in time_periods:

        metrics = []

        # Calculate each variable in the table
        for key in table_vars:
            params = table_vars[key]

            # Suppress outputs of `cava_data` function
            if suppress_output:

                with contextlib.redirect_stdout(io.StringIO()):

                    data = cava_data(
                        example_loc,
                        variable=params["variable"],
                        approach="time",
                        downscaling_method="Dynamical",
                        time_start_year=time_period[0],
                        time_end_year=time_period[1],
                        historical_data="Historical Climate",  # or "historical reconstruction"
                        ssp_data=["SSP3-7.0"],
                        metric_calc=params["metric_calc"],
                        heat_idx_threshold=params["heat_idx_threshold"]
                        and heat_idx_threshold,  # Heat index
                        one_in_x=params["one_in_x"]
                        and one_in_x,  # Thresholds tools freq. counts
                        percentile=params["percentile"] and percentile,
                        season=params["season"],
                        units="degF",
                        wrf_bias_adjust=True,
                        export_method="None",  # off-ramp, full calculate, both
                        separate_files=True,  # Toggle to determine whether or not the user wants to separate climate variable information into separate files
                        file_format="NetCDF",
                    )

            # Retrieve data and average across simulation dimension
            val = data["calc_data"][0].mean(dim="simulation").item()

            # Add val to metrics to be added into row
            metrics.append(val)

        # Create dictionary of values to be input into DataFrame
        df.loc[str(time_period)] = pd.Series(dict(zip(table_vars.keys(), metrics)))

    # Make slight modifications to DataFrame
    # df = df.T.rename(columns={"0.0": "Hist. Period (1980-2010)"})
    df = df.T

    # Write out dataframe
    df.to_csv(f"final_table_{example_loc.index.item()}.csv", index=True)

    return df