Processor Utilities

Internal utilities for data processor implementation.

Overview

The processor utilities modules provide base classes and helper functions for implementing data processors in the ClimateData interface architecture:
  • Abstract base class — DataProcessor base class and registry
  • Processor utilities — Helper functions for common operations

Data Processor Base Class

Bases: ABC

Abstract base class for data processing.

All data processors should inherit from this class and implement the required methods.

Notes
  • Processors should only store parameters needed for processing, not the data itself.
  • Processors should not throw exceptions; instead, they should return the data and a warning message if needed.
  • All processors should update the context with information about how they modified the data.
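For example, a processor that encounters unusable input might record a warning in the shared context and return the data unchanged rather than raising. A hypothetical sketch of this convention (the dimension check and the "warnings" context key are illustrative, not part of the interface):

def execute(self, result, context):
    # Hypothetical no-exception pattern: record a warning in the shared
    # context and return the data as-is instead of raising.
    if "time" not in result.dims:
        context.setdefault("warnings", []).append(
            "MyProcessor: no 'time' dimension found; data returned unchanged."
        )
        return result
    return result.mean("time")  # placeholder processing step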

Methods:

Name Description
execute

Process the data and return the result.

update_context

Update the context with additional parameters.

set_data_accessor

Set the data accessor for the processor.

execute(result, context) abstractmethod

Process raw data into the required format.

Parameters:

result : Dataset, DataArray, or iterable of these (required)
    Data to be processed.
context : dict (required)
    Parameters for processing the data.

Returns:

Dataset, DataArray, or iterable of these
    Processed data.

Raises:

ValueError
    If the data cannot be processed.

Source code in climakitae/new_core/processors/abc_data_processor.py
@abstractmethod
def execute(
    self,
    result: Union[
        xr.Dataset, xr.DataArray, Iterable[Union[xr.Dataset, xr.DataArray]]
    ],
    context: Dict[str, Any],
) -> Union[xr.Dataset, xr.DataArray, Iterable[Union[xr.Dataset, xr.DataArray]]]:
    """Process raw data into the required format.

    Parameters
    ----------
    result : Dataset, DataArray, or iterable of these
        Data to be processed.
    context : dict
        Parameters for processing the data.

    Returns
    -------
    Dataset, DataArray, or iterable of these
        Processed data.

    Raises
    ------
    ValueError
        If the data cannot be processed.

    """

update_context(context) abstractmethod

Update the context with additional parameters.

Parameters:

context : dict (required)
    Parameters for processing the data.

Returns:

None
Source code in climakitae/new_core/processors/abc_data_processor.py
@abstractmethod
def update_context(self, context: Dict[str, Any]) -> None:
    """Update the context with additional parameters.

    Parameters
    ----------
    context : dict
        Parameters for processing the data.

    Returns
    -------
    None

    """

set_data_accessor(catalog) abstractmethod

Set the data accessor for the processor.

Parameters:

catalog : DataCatalog (required)
    Data catalog for accessing datasets.

Returns:

None
Source code in climakitae/new_core/processors/abc_data_processor.py
@abstractmethod
def set_data_accessor(self, catalog: "DataCatalog") -> None:
    """Set the data accessor for the processor.

    Parameters
    ----------
    catalog : DataCatalog
        Data catalog for accessing datasets.

    Returns
    -------
    None

    """

Processor Registry

Decorator to register a processor class.

Parameters:

key : str, optional (default: UNSET)
    The key to register the processor under. If not provided, a key will be generated from the class name.
priority : int, optional (default: UNSET)
    Priority for the processor. Lower values indicate higher priority.

Returns:

callable
    The decorator function that registers the processor class.

Examples:

@register_processor("my_processor", priority=10)
class MyProcessor(DataProcessor):
    ...

Source code in climakitae/new_core/processors/abc_data_processor.py
def register_processor(
    key: str | object = UNSET, priority: int | object = UNSET
) -> Callable:
    """Decorator to register a processor class.

    Parameters
    ----------
    key : str, optional
        The key to register the processor under. If not provided, a key
        will be generated from the class name.
    priority : int, optional
        Optional priority for the processor. Lower values indicate higher priority.

    Returns
    -------
    callable
        The decorator function that registers the processor class.

    Examples
    --------
    @register_processor("my_processor", priority=10)
    class MyProcessor(DataProcessor):
        ...

    """

    def decorator(cls: Type[DataProcessor]) -> Type[DataProcessor]:
        # If no key is provided, generate one from the class name
        processor_key = (
            key
            if key is not UNSET
            else "".join(
                ["_" + c.lower() if c.isupper() else c for c in cls.__name__]
            ).lstrip("_")
        )
        _PROCESSOR_REGISTRY[processor_key] = (cls, priority)
        return cls

    return decorator
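When no key is given, the generated key is the snake_case form of the class name. A standalone sketch of the conversion used in the decorator above:

name = "MyProcessor"  # any CamelCase class name
key = "".join("_" + c.lower() if c.isupper() else c for c in name).lstrip("_")
print(key)  # my_processor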

Processor Utilities

Utility functions for processing data arrays in climakitae.

is_station_identifier(value)

Check if a string looks like a station identifier.

This function uses heuristics to determine if a string appears to be a weather station identifier based on common patterns.

Parameters:

value : str (required)
    String to check.

Returns:

bool
    True if the value looks like a station code or station name.

Notes

Recognizes two patterns:
1. 4-character codes starting with 'K' (common US airport weather stations).
   Examples: KSAC (Sacramento), KBFL (Bakersfield), KSFO (San Francisco)
2. Strings with parentheses containing a code with 'K'.
   Examples: "Sacramento (KSAC)", "San Francisco International (KSFO)"

Examples:

>>> is_station_identifier("KSAC")
True
>>> is_station_identifier("Sacramento (KSAC)")
True
>>> is_station_identifier("CA")
False
>>> is_station_identifier("Kern County")
False
Source code in climakitae/new_core/processors/processor_utils.py
def is_station_identifier(value: str) -> bool:
    """
    Check if a string looks like a station identifier.

    This function uses heuristics to determine if a string appears to be
    a weather station identifier based on common patterns.

    Parameters
    ----------
    value : str
        String to check

    Returns
    -------
    bool
        True if the value looks like a station code or station name

    Notes
    -----
    Recognizes two patterns:
    1. 4-character codes starting with 'K' (common US airport weather stations)
       Examples: KSAC (Sacramento), KBFL (Bakersfield), KSFO (San Francisco)
    2. Strings with parentheses containing a code with 'K'
       Examples: "Sacramento (KSAC)", "San Francisco International (KSFO)"

    Examples
    --------
    >>> is_station_identifier("KSAC")
    True
    >>> is_station_identifier("Sacramento (KSAC)")
    True
    >>> is_station_identifier("CA")
    False
    >>> is_station_identifier("Kern County")
    False
    """
    # Check if it's a 4-character code starting with 'K' (common US airport codes)
    if len(value) == 4 and value[0].upper() == "K" and value.isalnum():
        return True

    # Check if it contains parentheses with a code (e.g., "Sacramento (KSAC)")
    if "(" in value and ")" in value and "K" in value.upper():
        return True

    return False

find_station_match(station_identifier, stations_df)

Find matching station(s) in the stations DataFrame.

This function centralizes the station matching logic used by both the Clip processor and the clip parameter validator. It tries multiple matching strategies in order of specificity:
1. Exact match on the station ID column
2. Exact match on the station name column
3. Partial match on the station name column

Parameters:

station_identifier : str (required)
    Station identifier to search for (e.g., "KSAC", "Sacramento (KSAC)", "Sacramento").
stations_df : DataFrame (required)
    DataFrame containing station data with columns: ID, station, city, state, LAT_Y, LON_X.

Returns:

DataFrame
    DataFrame containing the matching station(s). May have 0, 1, or multiple rows:
    - Empty (len == 0): no matches found
    - Single row (len == 1): exact match found
    - Multiple rows (len > 1): multiple stations match the identifier

Notes

The caller is responsible for:
- Checking whether stations_df is None or empty before calling
- Handling the different match scenarios (no match, single match, multiple matches)
- Providing appropriate error messages or warnings based on context

Examples:

>>> # For validation (clip_param_validator.py)
>>> match = find_station_match("KSAC", stations_df)
>>> if len(match) == 0:
...     # Handle no match - provide suggestions
>>> elif len(match) > 1:
...     # Handle multiple matches - ask user to be more specific
>>> else:
...     # Valid single match
...     return True
>>> # For coordinate extraction (clip.py)
>>> match = find_station_match("KSAC", stations_df)
>>> if len(match) == 0:
...     # Raise ValueError with suggestions
>>> elif len(match) > 1:
...     # Raise ValueError asking for more specific identifier
>>> else:
...     # Extract coordinates and metadata
...     lat = float(match.iloc[0]["LAT_Y"])
...     lon = float(match.iloc[0]["LON_X"])
Source code in climakitae/new_core/processors/processor_utils.py
def find_station_match(station_identifier: str, stations_df) -> "pd.DataFrame":
    """
    Find matching station(s) in the stations DataFrame.

    This function centralizes the station matching logic used by both the Clip
    processor and the clip parameter validator. It tries multiple matching strategies
    in order of specificity:
    1. Exact match on station ID column
    2. Exact match on station name column
    3. Partial match on station name column

    Parameters
    ----------
    station_identifier : str
        Station identifier to search for (e.g., "KSAC", "Sacramento (KSAC)", "Sacramento")
    stations_df : pd.DataFrame
        DataFrame containing station data with columns: ID, station, city, state, LAT_Y, LON_X

    Returns
    -------
    pd.DataFrame
        DataFrame containing matching station(s). May have 0, 1, or multiple rows:
        - Empty (len=0): No matches found
        - Single row (len=1): Exact match found
        - Multiple rows (len>1): Multiple stations match the identifier

    Notes
    -----
    The caller is responsible for:
    - Checking if stations_df is None or empty before calling
    - Handling the different match scenarios (no match, single match, multiple matches)
    - Providing appropriate error messages or warnings based on context

    Examples
    --------
    >>> # For validation (clip_param_validator.py)
    >>> match = find_station_match("KSAC", stations_df)
    >>> if len(match) == 0:
    ...     # Handle no match - provide suggestions
    >>> elif len(match) > 1:
    ...     # Handle multiple matches - ask user to be more specific
    >>> else:
    ...     # Valid single match
    ...     return True

    >>> # For coordinate extraction (clip.py)
    >>> match = find_station_match("KSAC", stations_df)
    >>> if len(match) == 0:
    ...     # Raise ValueError with suggestions
    >>> elif len(match) > 1:
    ...     # Raise ValueError asking for more specific identifier
    >>> else:
    ...     # Extract coordinates and metadata
    ...     lat = float(match.iloc[0]["LAT_Y"])
    ...     lon = float(match.iloc[0]["LON_X"])
    """
    # Normalize the input
    station_id_upper = station_identifier.upper().strip()

    # Handle empty string after normalization
    if not station_id_upper:
        # Return empty DataFrame with same structure
        return stations_df.iloc[0:0]

    # Try exact match on ID column
    match = stations_df[stations_df["ID"].str.upper() == station_id_upper]

    if len(match) == 0:
        # Try matching on station name (full station column)
        match = stations_df[stations_df["station"].str.upper() == station_id_upper]

    if len(match) == 0:
        # Try partial match on station name
        match = stations_df[
            stations_df["station"].str.upper().str.contains(station_id_upper, na=False)
        ]

    return match
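A minimal sketch with a hypothetical two-row stations table (all column values are illustrative), showing the three matching strategies in action:

import pandas as pd

from climakitae.new_core.processors.processor_utils import find_station_match

stations_df = pd.DataFrame(
    {
        "ID": ["KSAC", "KSFO"],
        "station": ["Sacramento (KSAC)", "San Francisco International (KSFO)"],
        "city": ["Sacramento", "San Francisco"],
        "state": ["CA", "CA"],
        "LAT_Y": [38.51, 37.62],
        "LON_X": [-121.49, -122.37],
    }
)

find_station_match("KSAC", stations_df)               # exact ID match -> 1 row
find_station_match("Sacramento (KSAC)", stations_df)  # exact name match -> 1 row
find_station_match("Francisco", stations_df)          # partial name match -> 1 row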

extend_time_domain(result)

Extend the time domain of the input data to cover 1980-2100.

This function ensures that all SSP scenarios have historical data included in the time series, allowing for proper warming level calculations. It concatenates historical data with SSP data and updates the attributes to those of the SSP data. Historical data is expected to be available in the input dictionary under keys formatted the same as the SSP keys but with "historical" in place of the r"ssp.{3}" pattern (e.g., "ssp245" becomes "historical").

Parameters:

result : Dict[str, Union[Dataset, DataArray]] (required)
    A dictionary containing time-series data with keys representing different scenarios.

Returns:

Dict[str, Union[Dataset, DataArray]]
    The extended time-series data, keyed by scenario.

Notes
  • By construction, this function will drop reanalysis data.
Source code in climakitae/new_core/processors/processor_utils.py
def extend_time_domain(
    result: Dict[str, Union[xr.Dataset, xr.DataArray]],
) -> Dict[str, Union[xr.Dataset, xr.DataArray]]:
    """Extend the time domain of the input data to cover 1980-2100.

    This function ensures that all SSP scenarios have historical data
    included in the time series, allowing for proper warming level calculations.
    This is handled by concatenating historical data with SSP data and updating
    the attributes to that of the SSP data. Historical data is expected to be
    available in the input dictionary with keys formatted the same as SSP keys
    but with "historical" instead of r"ssp.{3}" (e.g., "ssp245" becomes "historical").

    Parameters
    ----------
    result : Dict[str, Union[xr.Dataset, xr.DataArray]]
        A dictionary containing time-series data with keys representing different scenarios.

    Returns
    -------
    Dict[str, Union[xr.Dataset, xr.DataArray]]
        The extended time-series data, keyed by scenario.

    Notes
    -----
    - By construction, this function will drop reanalysis data.

    """
    ret = {}

    # don't run twice, check if historical data was already prepended
    # this is to avoid unnecessary processing if the data has already been extended
    if any(v.attrs.get("historical_prepended", False) for v in result.values()):
        return result  # type: ignore

    logger.info(
        "Prepending historical data to SSP scenarios. This is the default concatenation strategy for retrieved data in climakitae. "
        "To change this behavior, set 'concat': 'sim' in your processes dictionary."
    )

    # Process SSP scenarios by finding and prepending historical data
    for key, data in result.items():
        if "ssp" not in key:
            # drop non-SSP data since historical gets prepended
            continue

        # Find corresponding historical key by replacing SSP pattern with "historical"
        hist_key = re.sub(r"ssp.{3}", "historical", key)

        if hist_key not in result:
            logger.warning(
                f"\n\nNo historical data found for {key} with key {hist_key}. "
                f"\nHistorical data is required for time domain extension. "
                f"\nKeeping original SSP data without historical extension.\n"
            )
            ret[key] = data
            continue

        # Concatenate historical and SSP data along time dimension
        try:
            hist_data = result[hist_key]
            # Use proper xr.concat with explicit typing and optimized settings
            # compat="override" skips expensive coordinate equality checks
            # coords="minimal" avoids duplicating coordinates
            concat_kwargs = {
                "dim": "time",
                "coords": "minimal",
                "compat": "override",
                "join": "outer",
            }
            hist_common = hist_data
            data_common = data

            if "member_id" in data.dims and "member_id" in hist_data.dims:
                # Find common member_ids between historical and SSP data
                common_members = np.intersect1d(
                    hist_data.member_id.values,
                    data.member_id.values,
                )

                # Select only matching members
                hist_common = hist_data.sel(member_id=common_members)
                data_common = data.sel(member_id=common_members)

            # Capture variable-level attrs and spatial coords BEFORE concat
            orig_var_attrs = {}
            orig_spatial_coords = {}
            if isinstance(data, xr.Dataset):
                orig_var_attrs = {var: dict(data[var].attrs) for var in data.data_vars}
                spatial_coord_names = ["Lambert_Conformal", "x", "y"]
                orig_spatial_coords = {
                    name: data.coords[name]
                    for name in spatial_coord_names
                    if name in data.coords
                }

            if isinstance(data, xr.Dataset) and isinstance(hist_common, xr.Dataset):
                extended_data = xr.concat([hist_common, data_common], **concat_kwargs)  # type: ignore
            elif isinstance(data, xr.DataArray) and isinstance(
                hist_common, xr.DataArray
            ):
                extended_data = xr.concat([hist_common, data_common], **concat_kwargs)  # type: ignore
            else:
                # Handle mixed types by converting to same type
                if isinstance(data_common, xr.Dataset):
                    if isinstance(hist_common, xr.DataArray):
                        hist_common = hist_common.to_dataset()
                    extended_data = xr.concat([hist_common, data_common], **concat_kwargs)  # type: ignore
                else:  # data is DataArray
                    if isinstance(hist_common, xr.Dataset):
                        data_common = data_common.to_dataset()
                    extended_data = xr.concat([hist_common, data_common], **concat_kwargs)  # type: ignore

            # Preserve SSP attributes (dataset-level)
            extended_data.attrs.update(data.attrs)

            # Restore variable-level attrs (including grid_mapping)
            if isinstance(extended_data, xr.Dataset):
                for var, var_attrs in orig_var_attrs.items():
                    if var in extended_data.data_vars:
                        extended_data[var].attrs.update(var_attrs)

            # Restore spatial coordinates (Lambert_Conformal, etc.)
            for coord_name, coord_val in orig_spatial_coords.items():
                if coord_name not in extended_data.coords:
                    extended_data = extended_data.assign_coords({coord_name: coord_val})
            # add key attr indicating historical data was prepended
            extended_data.attrs["historical_prepended"] = True
            ret[key] = extended_data

        except (ValueError, TypeError, KeyError, AttributeError) as e:
            logger.warning(
                f"\n\nFailed to concatenate historical and SSP data for {key}: {e}"
                f"\nSince no historical data is available, this data is dropped.\n"
            )

    return ret
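A minimal usage sketch with hypothetical scenario keys and toy monthly data, assuming the input dictionary maps scenario keys to xarray objects as described above:

import numpy as np
import pandas as pd
import xarray as xr

from climakitae.new_core.processors.processor_utils import extend_time_domain

hist = xr.DataArray(
    np.ones(24),
    coords={"time": pd.date_range("1980-01-01", periods=24, freq="MS")},
    dims="time",
)
ssp = xr.DataArray(
    np.ones(24),
    coords={"time": pd.date_range("1982-01-01", periods=24, freq="MS")},
    dims="time",
)

extended = extend_time_domain({"historical": hist, "ssp245": ssp})
# Only the SSP key survives; its time axis now covers both periods and its
# attrs include "historical_prepended": True.
print(list(extended))                    # ['ssp245']
print(extended["ssp245"].sizes["time"])  # 48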

get_station_coordinates(station_identifier, catalog, stations_df=None)

Get lat/lon coordinates and metadata for a station identifier.

This function provides a centralized way to extract station coordinates from the catalog. It's used by both the Clip processor and the StationBiasCorrection processor.

Parameters:

station_identifier : str (required)
    Station code (e.g., "KSAC") or full station name (e.g., "Sacramento (KSAC)").
catalog : DataCatalog (required)
    Data catalog instance for accessing station metadata.
stations_df : DataFrame, optional (default: None)
    Pre-loaded stations DataFrame. If None, it will be loaded from the catalog.

Returns:

tuple[float, float, dict]
    Latitude, longitude, and a station metadata dictionary containing:
    - station_id: Station ID code
    - station_name: Full station name
    - city: City name
    - state: State abbreviation
    - elevation: Elevation value (if available)

Raises:

RuntimeError
    If the catalog is not set or station data is not available.
ValueError
    If the station is not found or multiple matches exist.

Examples:

>>> lat, lon, metadata = get_station_coordinates("KSAC", catalog)
>>> print(f"Sacramento station at ({lat}, {lon})")
>>> print(f"Elevation: {metadata['elevation']}")
Source code in climakitae/new_core/processors/processor_utils.py
def get_station_coordinates(
    station_identifier: str, catalog, stations_df=None
) -> tuple[float, float, dict]:
    """
    Get lat/lon coordinates and metadata for a station identifier.

    This function provides a centralized way to extract station coordinates
    from the catalog. It's used by both the Clip processor and the
    StationBiasCorrection processor.

    Parameters
    ----------
    station_identifier : str
        Station code (e.g., "KSAC") or full station name (e.g., "Sacramento (KSAC)")
    catalog : DataCatalog
        Data catalog instance for accessing station metadata
    stations_df : pd.DataFrame, optional
        Pre-loaded stations DataFrame. If None, will be loaded from catalog.

    Returns
    -------
    tuple[float, float, dict]
        Latitude, longitude, and station metadata dictionary containing:
        - station_id: Station ID code
        - station_name: Full station name
        - city: City name
        - state: State abbreviation
        - elevation: Elevation value (if available)

    Raises
    ------
    RuntimeError
        If catalog is not set or station data is not available
    ValueError
        If station is not found or multiple matches exist

    Examples
    --------
    >>> lat, lon, metadata = get_station_coordinates("KSAC", catalog)
    >>> print(f"Sacramento station at ({lat}, {lon})")
    >>> print(f"Elevation: {metadata['elevation']}")
    """
    from climakitae.core.constants import UNSET
    from climakitae.new_core.data_access.data_access import DataCatalog

    # Validate catalog
    if catalog is UNSET or not isinstance(catalog, DataCatalog):
        raise RuntimeError("DataCatalog is not set. Cannot access station data.")

    # Load stations if not provided
    if stations_df is None:
        stations_df = catalog["stations"]

    if stations_df is None or len(stations_df) == 0:
        raise RuntimeError("Station data is not available in the catalog.")

    # Use the generalized matching logic
    match = find_station_match(station_identifier, stations_df)

    if len(match) == 0:
        # Station not found - provide suggestions
        all_stations = stations_df["ID"].tolist() + stations_df["station"].tolist()
        from climakitae.new_core.param_validation.param_validation_tools import (
            _get_closest_options,
        )

        suggestions = _get_closest_options(station_identifier, all_stations, cutoff=0.5)

        error_msg = f"Station '{station_identifier}' not found in station database."
        if suggestions:
            error_msg += "\n\nDid you mean one of these?\n  - " + "\n  - ".join(
                suggestions[:5]
            )
        error_msg += "\n\nTo see all available stations, use: cd.show_station_options()"

        raise ValueError(error_msg)

    if len(match) > 1:
        # Multiple matches found - show options
        station_list = match[["ID", "station", "city", "state"]].to_string(index=False)
        raise ValueError(
            f"Multiple stations match '{station_identifier}':\n{station_list}\n\n"
            f"Please use a more specific identifier."
        )

    # Extract coordinates and metadata
    station_row = match.iloc[0]
    lat = float(station_row["LAT_Y"])
    lon = float(station_row["LON_X"])

    metadata = {
        "station_id": station_row["ID"],  # 4-letter airport code (e.g., "KSAC")
        "station_id_numeric": station_row.get(
            "station id", None
        ),  # Numeric ID for HadISD files
        "station_name": station_row["station"],
        "city": station_row["city"],
        "state": station_row["state"],
        "elevation": station_row.get("elevation", None),
    }

    return lat, lon, metadata

convert_stations_to_points(station_identifiers, catalog, stations_df=None)

Convert a list of station identifiers to lat/lon coordinates.

This function provides batch conversion of station identifiers to coordinates, used by processors that need to work with multiple stations.

Parameters:

station_identifiers : list[str] (required)
    List of station codes or names.
catalog : DataCatalog (required)
    Data catalog instance for accessing station metadata.
stations_df : DataFrame, optional (default: None)
    Pre-loaded stations DataFrame. If None, it will be loaded from the catalog.

Returns:

tuple[list[tuple[float, float]], list[dict]]
    List of (lat, lon) tuples and a list of metadata dictionaries.

Raises:

ValueError
    If any station is not found or if there are validation errors.

Examples:

>>> stations = ["KSAC", "KSFO", "KLAX"]
>>> points, metadata = convert_stations_to_points(stations, catalog)
>>> for (lat, lon), meta in zip(points, metadata):
...     print(f"{meta['station_name']}: ({lat}, {lon})")
Source code in climakitae/new_core/processors/processor_utils.py
def convert_stations_to_points(
    station_identifiers: list[str], catalog, stations_df=None
) -> tuple[list[tuple[float, float]], list[dict]]:
    """
    Convert a list of station identifiers to lat/lon coordinates.

    This function provides batch conversion of station identifiers to
    coordinates, used by processors that need to work with multiple stations.

    Parameters
    ----------
    station_identifiers : list[str]
        List of station codes or names
    catalog : DataCatalog
        Data catalog instance for accessing station metadata
    stations_df : pd.DataFrame, optional
        Pre-loaded stations DataFrame. If None, will be loaded from catalog.

    Returns
    -------
    tuple[list[tuple[float, float]], list[dict]]
        List of (lat, lon) tuples and list of metadata dictionaries

    Raises
    ------
    ValueError
        If any station is not found or if there are validation errors

    Examples
    --------
    >>> stations = ["KSAC", "KSFO", "KLAX"]
    >>> points, metadata = convert_stations_to_points(stations, catalog)
    >>> for (lat, lon), meta in zip(points, metadata):
    ...     print(f"{meta['station_name']}: ({lat}, {lon})")
    """
    # Load stations once if not provided
    if stations_df is None:
        from climakitae.core.constants import UNSET
        from climakitae.new_core.data_access.data_access import DataCatalog

        if catalog is UNSET or not isinstance(catalog, DataCatalog):
            raise RuntimeError("DataCatalog is not set. Cannot access station data.")
        stations_df = catalog["stations"]

    points = []
    metadata_list = []

    for station_id in station_identifiers:
        try:
            lat, lon, metadata = get_station_coordinates(
                station_id, catalog, stations_df
            )
            points.append((lat, lon))
            metadata_list.append(metadata)
        except ValueError as e:
            # Re-raise with context about which station failed
            raise ValueError(f"Error processing station '{station_id}': {str(e)}")

    return points, metadata_list

Implementation Guide

To create a new processor, follow this template:

from climakitae.new_core.processors.abc_data_processor import DataProcessor, register_processor

@register_processor(key="my_processor", priority=50)
class MyProcessor(DataProcessor):
    """Process climate data in a specific way.

    Parameters
    ----------
    config : dict
        Configuration dictionary with processor-specific parameters
    """

    def __init__(self, config=None):
        self.config = config or {}
        # Validation

    def execute(self, data, context):
        """Execute the processing step.

        Parameters
        ----------
        data : xr.Dataset or xr.DataArray
            Input climate data
        context : dict
            Shared processing context

        Returns
        -------
        xr.Dataset or xr.DataArray
            Processed data with same lazy evaluation properties
        """
        # Processing logic (placeholder: return the data unchanged)
        return data

    def update_context(self, context):
        """Update shared processing context with metadata.

        Parameters
        ----------
        context : dict
            Shared context to update with processor metadata
        """
        # Store processor-specific metadata
        pass

    def set_data_accessor(self, catalog):
        """Configure data accessor if needed.

        Parameters
        ----------
        catalog : DataCatalog
            Data catalog instance for this processor
        """
        pass
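
Once registered, the processor can be looked up and run. A hypothetical usage sketch, assuming the registry stores (class, priority) tuples as shown in the register_processor source above; the config key is illustrative:

import xarray as xr

from climakitae.new_core.processors.abc_data_processor import _PROCESSOR_REGISTRY

cls, priority = _PROCESSOR_REGISTRY["my_processor"]
processor = cls(config={"threshold": 1.5})  # hypothetical config key

context = {}
data = xr.DataArray([1.0, 2.0, 3.0], dims="time", name="tasmax")
result = processor.execute(data, context)
processor.update_context(context)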

See Also