Data Access Layer

Data catalog and boundary access.

DataCatalog Singleton

Bases: dict

Thread-safe singleton for managing catalog connections to climate data sources.

This class implements a thread-safe singleton pattern and inherits from dict to provide a unified interface for accessing multiple climate data catalogs. It manages connections to boundary, renewables, and general climate datasets through intake and intake-esm catalogs, offering convenient properties and methods for data querying and retrieval.

The class automatically initializes connections to predefined catalogs and supports dynamic addition of new catalogs.

Thread Safety

This class is thread-safe. The singleton instance is protected by a lock during creation, and the get_data() method accepts the catalog key as a parameter rather than storing it as mutable state, allowing concurrent queries from multiple threads.

Properties

data : intake_esm.core.esm_datastore
    Access to the main climate data catalog.
boundary : intake.catalog.Catalog
    Access to the boundary conditions catalog.
boundaries : Boundaries
    Access to the lazy-loading boundaries data manager.
renewables : intake_esm.core.esm_datastore
    Access to the renewables data catalog.
hdp : intake_esm.core.esm_datastore
    Access to the HDP data catalog.

Methods:

set_catalog
    Add a new catalog to the collection.
get_data
    Retrieve data from the specified catalog using query parameters.
resolve_catalog_key
    Resolve and validate a catalog key, returning the closest match if needed.

Notes

This class implements the singleton pattern, ensuring only one instance exists throughout the application lifecycle. Multiple calls to DataCatalog() will return the same instance.

The class automatically handles catalog initialization and provides sensible defaults when invalid catalog keys are specified.
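For example, repeated construction returns the same object (a minimal sketch; the import path follows the source listing below):

>>> from climakitae.new_core.data_access.data_access import DataCatalog
>>> a = DataCatalog()
>>> b = DataCatalog()
>>> a is b
True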

Examples:

Thread-safe concurrent usage:

>>> from concurrent.futures import ThreadPoolExecutor
>>> catalog = DataCatalog()
>>> def fetch_data(params):
...     query, catalog_key = params
...     return catalog.get_data(query, catalog_key=catalog_key)
>>> with ThreadPoolExecutor(max_workers=4) as executor:
...     results = list(executor.map(fetch_data, queries_and_keys))

Initialize the DataCatalog instance.

This method sets up the catalog connections and initializes internal state. It only runs once due to the singleton pattern implementation.

The derived variable registry is attached to catalogs that support it, enabling users to query derived variables directly.

Source code in climakitae/new_core/data_access/data_access.py
def __init__(self) -> None:
    """Initialize the DataCatalog instance.

    This method sets up the catalog connections and initializes internal
    state. It only runs once due to the singleton pattern implementation.

    The derived variable registry is attached to catalogs that support it,
    enabling users to query derived variables directly.

    """
    if not getattr(self, "_initialized", False):
        super().__init__()

        # Get the derived variable registry (lazy import to avoid circular imports)
        from climakitae.new_core.derived_variables import get_registry

        self._derived_registry = get_registry()

        # Open catalogs with derived variable registry attached
        # Note: Only attach registry to catalogs with compatible schemas.
        # The HDP catalog uses station-based schema (station_id) rather than
        # gridded variable schema (variable_id), so skip registry attachment.
        self[CATALOG_CADCAT] = intake.open_esm_datastore(
            DATA_CATALOG_URL, registry=self._derived_registry
        )
        self[CATALOG_BOUNDARY] = intake.open_catalog(BOUNDARY_CATALOG_URL)
        try:
            self[CATALOG_REN_ENERGY_GEN] = intake.open_esm_datastore(
                RENEWABLES_CATALOG_URL, registry=self._derived_registry
            )
        except Exception as e:
            logger.warning(
                "Failed to load renewables catalog: %s. Renewables data will be unavailable.",
                e,
            )
            self[CATALOG_REN_ENERGY_GEN] = None
        # HDP catalog has different schema - no derived variable support
        try:
            self[CATALOG_HDP] = intake.open_esm_datastore(HDP_CATALOG_URL)
        except Exception as e:
            logger.warning(
                "Failed to load HDP catalog: %s. HDP data will be unavailable.", e
            )
            self[CATALOG_HDP] = None

        self.catalog_df = self.merge_catalogs()
        stations_df = pd.read_csv(HADISD_STATIONS_URL)
        # Convert string columns to object dtype to avoid StringDtype issues in pandas 2.2+
        for col in stations_df.select_dtypes(include=["string", "object"]).columns:
            if col not in ["LON_X", "LAT_Y"]:
                stations_df[col] = stations_df[col].astype("object")
        self["stations"] = gpd.GeoDataFrame(
            stations_df,
            crs="EPSG:4326",
            geometry=gpd.points_from_xy(stations_df.LON_X, stations_df.LAT_Y),
        )

        self._initialized = True
        # Initialize boundaries with lazy loading
        self._boundaries = UNSET
        self._boundaries_lock = threading.Lock()
        self.available_boundaries = UNSET

data property

Access data catalog.

Returns:

esm_datastore
    The main climate data catalog.

boundary property

Access boundary catalog.

Returns:

Catalog
    The boundary conditions catalog.

renewables property

Access renewables catalog.

Returns:

esm_datastore
    The renewables data catalog.

hdp property

Access historical data platform (histwxstns) catalog.

Returns:

esm_datastore
    The histwxstns data catalog.

boundaries property

Access boundaries data with lazy loading (thread-safe).

Returns:

Boundaries
    The lazy-loading boundaries data manager.

derived_registry property

Access the derived variable registry.

The registry contains definitions for derived variables that can be computed from source variables during data loading.

Returns:

DerivedVariableRegistry
    The intake-esm derived variable registry attached to the catalogs.

Examples:

>>> catalog = DataCatalog()
>>> print(catalog.derived_registry)
DerivedVariableRegistry({'wind_speed_10m': ..., 'heat_index': ...})

__new__()

Override new to implement thread-safe singleton pattern.

Uses double-checked locking to ensure thread-safe singleton creation while minimizing lock contention after initialization.

Returns:

DataCatalog
    The singleton instance of DataCatalog.

Source code in climakitae/new_core/data_access/data_access.py
def __new__(cls) -> "DataCatalog":
    """Override __new__ to implement thread-safe singleton pattern.

    Uses double-checked locking to ensure thread-safe singleton creation
    while minimizing lock contention after initialization.

    Returns
    -------
    DataCatalog
        The singleton instance of DataCatalog.

    """
    # Fast path: if already initialized, return without lock
    if cls._instance is not UNSET:
        return cls._instance

    # Slow path: acquire lock for initialization
    with cls._lock:
        # Double-check after acquiring lock
        if cls._instance is UNSET:
            cls._instance = super(DataCatalog, cls).__new__(cls)
            cls._instance._initialized = False
    return cls._instance

merge_catalogs()

Merge the AE intake catalogs into a single DataFrame.

This method combines the AE data catalogs into a unified DataFrame for easier searching and querying across all available datasets.

Returns:

DataFrame
    A DataFrame containing the merged data from AE catalogs with an additional 'catalog' column identifying the source catalog.

Source code in climakitae/new_core/data_access/data_access.py
def merge_catalogs(self) -> pd.DataFrame:
    """Merge the AE intake catalogs into a single DataFrame.

    This method combines the AE data catalogs into a unified
    DataFrame for easier searching and querying across all available datasets.

    Returns
    -------
    pd.DataFrame
        A DataFrame containing the merged data from AE catalogs with an
        additional 'catalog' column identifying the source catalog.

    """
    data_df = self.data.df
    data_df["catalog"] = CATALOG_CADCAT

    dfs = [data_df]
    if self[CATALOG_REN_ENERGY_GEN] is not None:
        ren_df = self.renewables.df
        ren_df["catalog"] = CATALOG_REN_ENERGY_GEN
        dfs.append(ren_df)
    if self[CATALOG_HDP] is not None:
        hdp_df = self.hdp.df
        hdp_df["catalog"] = CATALOG_HDP
        dfs.append(hdp_df)

    ret = pd.concat(dfs, ignore_index=True)

    return ret
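The merged frame can be filtered like any pandas DataFrame. A minimal sketch (assuming the 'cadcat' catalog key used elsewhere in these examples):

>>> catalog = DataCatalog()
>>> df = catalog.catalog_df
>>> # each row carries a 'catalog' column naming its source catalog
>>> cadcat_rows = df[df["catalog"] == "cadcat"]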

resolve_catalog_key(key)

Resolve and validate a catalog key.

This method validates the provided catalog key and attempts to find the closest match if the exact key is not found. This is a pure function that does not modify any instance state, making it thread-safe.

Parameters:

key : str, required
    Key of the catalog to resolve. Should be one of the available catalog keys.

Returns:

str or None
    The resolved catalog key if valid or a close match is found, None if no valid key can be determined.

Warns:

UserWarning
    If the catalog key is not found and suggestions are provided.

Examples:

>>> catalog = DataCatalog()
>>> resolved = catalog.resolve_catalog_key("cadcat")
>>> resolved
'cadcat'
Source code in climakitae/new_core/data_access/data_access.py
def resolve_catalog_key(self, key: str) -> Optional[str]:
    """Resolve and validate a catalog key.

    This method validates the provided catalog key and attempts to find
    the closest match if the exact key is not found. This is a pure function
    that does not modify any instance state, making it thread-safe.

    Parameters
    ----------
    key : str
        Key of the catalog to resolve. Should be one of the available catalog keys.

    Returns
    -------
    str or None
        The resolved catalog key if valid or a close match is found,
        None if no valid key can be determined.

    Warns
    -----
    UserWarning
        If the catalog key is not found and suggestions are provided.

    Examples
    --------
    >>> catalog = DataCatalog()
    >>> resolved = catalog.resolve_catalog_key("cadcat")
    >>> resolved
    'cadcat'

    """
    if key in self:
        return key

    logger.warning(
        "\n\nCatalog key '%s' not found."
        "\nAttempting to find intended catalog key.\n\n",
        key,
    )
    logger.info("Available catalog keys: %s", list(self.keys()))
    closest = _get_closest_options(key, list(self.keys()))
    if not closest:
        logger.warning(
            "No catalog found for '%s'. Available options: %s",
            key,
            list(self.keys()),
        )
        return None

    match len(closest):
        case 0:
            logger.warning(
                "No catalog found for '%s'. Available options: %s",
                key,
                list(self.keys()),
            )
            return None
        case 1:
            logger.warning(
                "\n\nUsing closest match '%s' for catalog '%s'.",
                closest[0],
                key,
            )
            return closest[0]
        case _:
            logger.warning(
                "Multiple closest matches found for '%s': %s. "
                "Please specify a more precise key.",
                key,
                closest,
            )
            return None
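A misspelled key shows the fallback behavior (a sketch; it assumes the fuzzy matcher finds exactly one close option, in which case the match is returned and a warning is logged):

>>> catalog = DataCatalog()
>>> catalog.resolve_catalog_key("cadcatt")  # single close match -> returned with a warning
'cadcat'
>>> catalog.resolve_catalog_key("nonsense")  # no close match -> returns None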

set_catalog_key(key)

Set the catalog key (DEPRECATED - use resolve_catalog_key instead).

Deprecated since version 1.5.0: This method stores mutable state on the singleton, which is not thread-safe. Use resolve_catalog_key() and pass the key directly to get_data() instead.

Parameters:

key : str, required
    Key of the catalog to set.

Returns:

DataCatalog
    The current instance (for backward compatibility).

Source code in climakitae/new_core/data_access/data_access.py
def set_catalog_key(self, key: str) -> "DataCatalog":
    """Set the catalog key (DEPRECATED - use resolve_catalog_key instead).

    .. deprecated:: 1.5.0
        This method stores mutable state on the singleton which is not
        thread-safe. Use :meth:`resolve_catalog_key` and pass the key
        directly to :meth:`get_data` instead.

    Parameters
    ----------
    key : str
        Key of the catalog to set.

    Returns
    -------
    DataCatalog
        The current instance (for backward compatibility).

    """
    import warnings

    warnings.warn(
        "set_catalog_key() is deprecated and not thread-safe. "
        "Use resolve_catalog_key() and pass catalog_key to get_data() instead.",
        DeprecationWarning,
        stacklevel=2,
    )
    self._catalog_key = self.resolve_catalog_key(key)
    return self
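The recommended replacement pattern resolves the key once and passes it explicitly (a sketch; the query fields mirror the get_data example below):

>>> catalog = DataCatalog()
>>> key = catalog.resolve_catalog_key("cadcat")
>>> if key is not None:
...     data = catalog.get_data({"variable_id": "tas"}, catalog_key=key)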

set_catalog(name, catalog)

Set a named catalog.

Parameters:

name : str, required
    Name of the catalog to set.
catalog : str, required
    URL or path to the catalog file.

Returns:

DataCatalog
    The current instance of DataCatalog, allowing method chaining.

Source code in climakitae/new_core/data_access/data_access.py
def set_catalog(self, name: str, catalog: str) -> "DataCatalog":
    """Set a named catalog.

    Parameters
    ----------
    name : str
        Name of the catalog to set.
    catalog : str
        URL or path to the catalog file.

    Returns
    -------
    DataCatalog
        The current instance of DataCatalog allowing method chaining.

    """
    self[name] = intake.open_esm_datastore(catalog)
    return self
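For example, registering an additional intake-esm datastore (a sketch; the URL is a hypothetical placeholder):

>>> catalog = DataCatalog()
>>> _ = catalog.set_catalog("my_catalog", "https://example.com/my-catalog.json")  # hypothetical URL
>>> "my_catalog" in catalog
True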

get_data(query, catalog_key=None)

Get data from the specified catalog (thread-safe).

This method queries the specified catalog using the provided parameters and returns the matching datasets as a dictionary. The catalog_key is passed as a parameter rather than stored as instance state, making this method safe to call from multiple threads simultaneously.

Parameters:

query : dict, required
    Query parameters for filtering data. The available parameters depend on the catalog and may include items like 'variable', 'scenario', 'model', etc.
catalog_key : str, optional (default None)
    The key identifying which catalog to query. If not provided, falls back to the deprecated instance attribute (for backward compatibility).

Returns:

dict[str, Dataset]
    The requested dataset(s) from the catalog, keyed by dataset identifiers.

Raises:

ValueError
    If no catalog_key is provided and no default is available.

Examples:

>>> catalog = DataCatalog()
>>> query = {"variable_id": "tas", "experiment_id": "historical"}
>>> data = catalog.get_data(query, catalog_key="cadcat")
Source code in climakitae/new_core/data_access/data_access.py
def get_data(
    self, query: Dict[str, Any], catalog_key: Optional[str] = None
) -> Dict[str, xr.Dataset]:
    """Get data from the specified catalog (thread-safe).

    This method queries the specified catalog using the provided parameters
    and returns the matching datasets as a dictionary. The catalog_key is
    passed as a parameter rather than stored as instance state, making this
    method safe to call from multiple threads simultaneously.

    Parameters
    ----------
    query : dict
        Query parameters for filtering data. The available parameters
        depend on the catalog and may include items like 'variable',
        'scenario', 'model', etc.
    catalog_key : str, optional
        The key identifying which catalog to query. If not provided,
        falls back to the deprecated instance attribute (for backward
        compatibility).

    Returns
    -------
    dict[str, xr.Dataset]
        The requested dataset(s) from the catalog, keyed by dataset identifiers.

    Raises
    ------
    ValueError
        If no catalog_key is provided and no default is available.

    Examples
    --------
    >>> catalog = DataCatalog()
    >>> query = {"variable_id": "tas", "experiment_id": "historical"}
    >>> data = catalog.get_data(query, catalog_key="cadcat")

    """
    # Use provided catalog_key, fall back to deprecated instance attr
    effective_key = catalog_key
    if effective_key is None:
        effective_key = getattr(self, "_catalog_key", None)
    if effective_key is None:
        raise ValueError(
            "catalog_key must be provided. Use resolve_catalog_key() to "
            "validate the key before calling get_data()."
        )

    logger.info("Querying %s catalog", effective_key)
    logger.debug("Query parameters: %s", query)

    # Strip internal metadata keys that shouldn't be passed to catalog search
    # These are used internally for derived variable handling
    internal_keys = {"_derived_variable", "_source_variables", "_catalog_key"}
    search_query = {k: v for k, v in query.items() if k not in internal_keys}

    logger.debug("Querying %s catalog with query: %s", effective_key, search_query)

    logger.debug("Executing catalog search")

    # Check if a distributed client is active - if so, force synchronous scheduler
    # during data loading to prevent intake_esm from sending open_dataset tasks
    # to the cluster (workers may not have data access). The data remains lazy
    # (dask arrays) - we're only forcing the metadata/catalog operations to run locally.
    scheduler_override = None
    try:
        from dask.distributed import get_client

        client = get_client()
        if client.status == "running":
            scheduler_override = "synchronous"
            logger.debug(
                "Distributed client detected, using synchronous scheduler for data loading"
            )
    except (ImportError, ValueError):
        # No distributed client active, use default scheduler
        pass

    with dask.config.set(scheduler=scheduler_override):
        result = (
            self[effective_key]
            .search(**search_query)
            .to_dataset_dict(
                # Use consolidated=None for compatibility with both Zarr v2 and v3.
                # - True: requires consolidated metadata (fails on Zarr v3 without it)
                # - False: always reads metadata from individual arrays
                # - None: uses consolidated if available, falls back to individual reads
                zarr_kwargs={"consolidated": None},
                storage_options={"anon": True},
                progressbar=False,
            )
        )
    logger.info("Retrieved %d dataset(s) from catalog", len(result))
    logger.debug("Retrieved datasets: %s", list(result.keys()))

    # For HDP data, rename station coordinate to station_id for consistency
    if effective_key == CATALOG_HDP:
        for key in result:
            result[key] = result[key].rename({"station": "station_id"})
            logger.debug("Renamed station → station_id for dataset %s", key)

    # Apply derived variable computation if requested. If the intake-esm
    # registry was attached it may already have computed the derived
    # variable during `to_dataset_dict`. Avoid blindly re-applying the
    # derived function to prevent double-computation and accidental
    # removal of source variables.
    derived_var = query.get("_derived_variable")
    if derived_var:
        # Check if derived variable was already computed. The variable may
        # exist under the registered name OR the user's function may have
        # created a variable with a different name. We detect this by
        # checking if dependencies are missing (indicating the function ran).
        source_vars_from_query = query.get("_source_variables") or []

        def _should_skip_application(ds, derived_name, source_vars):
            """Determine if derived variable application should be skipped.

            Returns True if:
            - The derived variable already exists by exact name, OR
            - ALL source dependencies are missing (indicating drop_dependencies=True
              ran and cleaned them up after computation)

            Returns False if some but not all source vars are missing and the
            derived variable is also absent — this means the catalog returned an
            incomplete dataset (e.g. u10/v10 don't exist at the requested
            table_id), not that the function already ran.
            """
            # Exact match - derived variable exists
            if derived_name in ds.data_vars:
                return True, "exact_match"

            if source_vars:
                missing_deps = [v for v in source_vars if v not in ds.data_vars]
                if missing_deps:
                    # All deps gone + derived var absent → function ran and dropped them
                    if len(missing_deps) == len(source_vars):
                        return True, f"missing_deps:{missing_deps}"
                    # Partial deps missing + derived var absent → incomplete retrieval
                    logger.warning(
                        "Derived variable '%s' cannot be computed: source variables "
                        "%s were not retrieved from the catalog. Check that all "
                        "required variables exist at the requested table_id/resolution.",
                        derived_name,
                        missing_deps,
                    )
                    return False, f"incomplete_retrieval:{missing_deps}"

            return False, None

        try:
            skip_reasons = {}
            for key, ds in result.items():
                should_skip, reason = _should_skip_application(
                    ds, derived_var, source_vars_from_query
                )
                skip_reasons[key] = (should_skip, reason)

            all_skip = all(skip for skip, _ in skip_reasons.values())
        except Exception:
            all_skip = False
            skip_reasons = {}

        if all_skip:
            # Log detailed reason for skipping
            for key, (_, reason) in skip_reasons.items():
                if reason == "exact_match":
                    logger.debug(
                        "Derived variable '%s' already exists in dataset %s",
                        derived_var,
                        key,
                    )
                elif reason and reason.startswith("missing_deps"):
                    # Warn if variable name doesn't match but function ran
                    logger.debug(
                        "Derived variable '%s' not found by name in dataset %s, "
                        "but dependencies are missing (%s) - assuming function already ran. "
                        "Consider naming your output variable to match the registered name.",
                        derived_var,
                        key,
                        reason,
                    )
            logger.debug(
                "Skipping derived variable '%s' re-application for all datasets",
                derived_var,
            )
        else:
            result = self._apply_derived_variable(result, derived_var)

        # Post-retrieval fallback: ensure derived variable has spatial metadata
        # (CRS or grid_mapping). Intake-esm or the wrapped function may not
        # always successfully copy CRS metadata (name mismatches or upstream
        # behavior). If the derived variable exists but lacks CRS/grid_mapping,
        # attempt to copy metadata from source variables listed in the query
        # or from the registry metadata as a fallback.
        try:
            from climakitae.new_core.derived_variables.registry import (
                preserve_spatial_metadata,
                get_derived_variable_info,
            )

            source_vars_from_query = query.get("_source_variables") or []

            for key, ds in list(result.items()):
                if derived_var not in ds.data_vars:
                    # nothing to do for this dataset
                    continue

                da = ds[derived_var]
                has_crs = False
                try:
                    if hasattr(da, "rio") and da.rio.crs is not None:
                        has_crs = True
                except Exception:
                    has_crs = False

                has_grid_mapping = bool(da.attrs.get("grid_mapping"))

                if has_crs or has_grid_mapping:
                    # metadata present
                    continue

                # Determine candidate source variables
                candidates = (
                    list(source_vars_from_query) if source_vars_from_query else []
                )
                if not candidates:
                    info = get_derived_variable_info(derived_var)
                    if info:
                        candidates = list(info.depends_on)

                # Pick the first source var present in the dataset
                source_var = None
                for sv in candidates:
                    if sv in ds:
                        source_var = sv
                        break

                if source_var:
                    try:
                        preserve_spatial_metadata(ds, derived_var, source_var)
                        logger.debug(
                            "Post-retrieval: preserved metadata for '%s' in dataset %s from '%s'",
                            derived_var,
                            key,
                            source_var,
                        )
                    except Exception as e:
                        logger.debug(
                            "Post-retrieval: failed to preserve metadata for '%s' in dataset %s: %s",
                            derived_var,
                            key,
                            e,
                        )
                else:
                    logger.debug(
                        "Post-retrieval: no source variable available to preserve metadata for '%s' in dataset %s",
                        derived_var,
                        key,
                    )
        except Exception:
            # Be defensive: failures here should not bring down data retrieval.
            logger.debug(
                "Derived-variable post-retrieval metadata fallback failed",
                exc_info=True,
            )

    return result
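Putting the pieces together, a typical retrieval flow validates the key first and then inspects the returned mapping (a sketch reusing the query from the example above):

>>> catalog = DataCatalog()
>>> key = catalog.resolve_catalog_key("cadcat")
>>> datasets = catalog.get_data(
...     {"variable_id": "tas", "experiment_id": "historical"}, catalog_key=key
... )
>>> for name, ds in datasets.items():
...     print(name, list(ds.data_vars))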

list_clip_boundaries()

List all available boundary options for clipping operations.

This method populates the available_boundaries attribute with a dictionary of boundary categories and their available options. It's a convenience method that provides direct access to boundary options without needing to instantiate a Clip processor.

Notes

After calling this method, the available boundaries can be accessed via the available_boundaries attribute.

Examples:

>>> catalog = DataCatalog()
>>> catalog.list_clip_boundaries()
>>> print(catalog.available_boundaries["states"])
['AZ', 'CA', 'CO', 'ID', 'MT', 'NV', 'NM', 'OR', 'UT', 'WA', 'WY']
Source code in climakitae/new_core/data_access/data_access.py
def list_clip_boundaries(self) -> dict[str, list[str]]:
    """List all available boundary options for clipping operations.

    This method populates the `available_boundaries` attribute with a
    dictionary of boundary categories and their available options. It's a
    convenience method that provides direct access to boundary options
    without needing to instantiate a Clip processor.

    Notes
    -----
    After calling this method, the available boundaries can be accessed
    via the `available_boundaries` attribute.

    Examples
    --------
    >>> catalog = DataCatalog()
    >>> catalog.list_clip_boundaries()
    >>> print(catalog.available_boundaries["states"])
    ['AZ', 'CA', 'CO', 'ID', 'MT', 'NV', 'NM', 'OR', 'UT', 'WA', 'WY']

    """
    boundary_dict = self.boundaries.boundary_dict()

    # Create a clean dictionary with boundary categories and their available options
    self.available_boundaries = {}

    for category, lookups in boundary_dict.items():
        # Skip special categories that don't represent actual boundary data
        if category in ["none", "lat/lon"]:
            continue

        # Convert keys to a sorted list for better presentation
        boundary_keys = sorted(list(lookups.keys()))
        self.available_boundaries[category] = boundary_keys

    return self.available_boundaries

print_clip_boundaries()

Print all available boundary options for clipping in a user-friendly format.

This method provides a nicely formatted output showing all boundary categories and their available options for clipping operations. The output is formatted to be readable and includes summarized counts for categories with many options.

Examples:

>>> catalog = DataCatalog()
>>> catalog.print_clip_boundaries()
Available Boundary Options for Clipping:
========================================

states:
  - AZ, CA, CO, ID, MT
    ... and 6 more options

Source code in climakitae/new_core/data_access/data_access.py
def print_clip_boundaries(self) -> None:
    """Print all available boundary options for clipping in a user-friendly format.

    This method provides a nicely formatted output showing all boundary
    categories and their available options for clipping operations. The
    output is formatted to be readable and includes summarized counts for
    categories with many options.

    Examples
    --------
    >>> catalog = DataCatalog()
    >>> catalog.print_clip_boundaries()
    Available Boundary Options for Clipping:
    ========================================

    states:
      - AZ, CA, CO, ID, MT
        ... and 6 more options

    """
    try:
        self.list_clip_boundaries()
    except Exception as e:
        logger.error("Error accessing boundary data: %s", e, exc_info=True)
        return

    logger.info("Available Boundary Options for Clipping:")
    logger.info("%s", "=" * 40)
    logger.info("")

    for category, boundary_list in self.available_boundaries.items():
        logger.info("%s:", category)

        # Format the list nicely - wrap long lists
        if len(boundary_list) <= 5:
            # For short lists, show all on one line
            logger.info("  - %s", ", ".join(boundary_list))
        else:
            # For longer lists, show first few and count
            displayed = boundary_list[:5]
            remaining = len(boundary_list) - 5
            logger.info("  - %s", ", ".join(displayed))
            if remaining > 0:
                logger.info("    ... and %d more options", remaining)

reset()

Reset the DataCatalog instance to its initial state.

This method clears any deprecated mutable state and resets the instance to its original state. The catalogs themselves remain loaded and available.

Note: With thread-safe design, there is minimal mutable state to reset. This method is maintained for backward compatibility.

Source code in climakitae/new_core/data_access/data_access.py
def reset(self) -> None:
    """Reset the DataCatalog instance to its initial state.

    This method clears any deprecated mutable state and resets the instance
    to its original state. The catalogs themselves remain loaded and available.

    Note: With thread-safe design, there is minimal mutable state to reset.
    This method is maintained for backward compatibility.

    """
    # Clear deprecated _catalog_key if it exists (backward compatibility)
    if hasattr(self, "_catalog_key"):
        self._catalog_key = None

Boundaries

Lazy-loading geospatial polygon data manager for ClimakitAE.

This class provides efficient access to various boundary datasets stored in S3 parquet catalogs. Data is loaded only when first accessed, improving memory usage and initialization performance. All lookup dictionaries are cached to avoid recomputation.

The class supports geographic subsetting for climate data analysis by providing access to various administrative and utility boundaries in California and the western United States. All data access is optimized for memory efficiency through lazy loading and intelligent caching.

Parameters:

boundary_catalog : Catalog, required
    Intake catalog instance for accessing boundary parquet files from S3.

Attributes:

_cat : Catalog
    Reference to the boundary catalog instance used for data access.

Properties

_us_states : pd.DataFrame
    US western states with names, abbreviations, and geometries (lazy-loaded).
_ca_counties : pd.DataFrame
    California counties with names and geometries, sorted alphabetically (lazy-loaded).
_ca_watersheds : pd.DataFrame
    California HUC8 watersheds with names and geometries, sorted alphabetically (lazy-loaded).
_ca_utilities : pd.DataFrame
    California electric utilities (IOUs and POUs) with names and geometries (lazy-loaded).
_ca_forecast_zones : pd.DataFrame
    California electricity demand forecast zones with processed names (lazy-loaded).
_ca_electric_balancing_areas : pd.DataFrame
    Electric balancing authority areas with filtered geometries (lazy-loaded).
_ca_census_tracts : pd.DataFrame
    California census tracts with geometries (lazy-loaded).

Methods:

boundary_dict
    Return dictionary of all boundary lookup dictionaries for UI population.
preload_all
    Preload all boundary data for performance-critical scenarios.
clear_cache
    Clear all cached data and lookup dictionaries to free memory.
validate_catalog
    Validate that required catalog entries exist and are accessible.
get_memory_usage
    Get detailed memory usage information for loaded boundary datasets.
load
    Deprecated method kept for backward compatibility; use preload_all() instead.

Examples:

Basic usage with lazy loading:

>>> import intake
>>> catalog = intake.open_catalog('boundaries.yaml')
>>> boundaries = Boundaries(catalog)
>>>
>>> # Data loads automatically when accessed
>>> counties = boundaries._ca_counties
>>> watersheds = boundaries._ca_watersheds

Getting boundary options for UI components:

>>> boundary_options = boundaries.boundary_dict()
>>> state_options = boundary_options['states']
>>> county_options = boundary_options['CA counties']

Performance optimization:

>>> # Preload all data if you know you'll need it
>>> boundaries.preload_all()
>>>
>>> # Check memory usage
>>> usage = boundaries.get_memory_usage()
>>> print(f"Total memory: {usage['total_human']}")

Memory management:

>>> # Clear cache to free memory
>>> boundaries.clear_cache()
>>>
>>> # Data will be reloaded on next access
>>> counties = boundaries._ca_counties
Notes
  • All boundary data is cached after first access for performance
  • The class automatically validates catalog structure on initialization
  • Processing includes sorting, filtering, and name standardization
  • Memory usage can be monitored and managed through provided methods
  • Western states are ordered according to WESTERN_STATES_LIST constant
  • Utilities are ordered with priority utilities first, then alphabetically

Initialize the Boundaries class with a boundary catalog.

Sets up the lazy-loading infrastructure and validates the catalog structure to ensure all required boundary datasets are available. No data is loaded during initialization - it's loaded on first access.

Parameters:

boundary_catalog : Catalog, required
    Intake catalog instance for accessing boundary parquet files. Must contain entries for: 'states', 'counties', 'huc8', 'utilities', 'dfz', and 'eba'.

Raises:

ValueError
    If the catalog is missing required entries.

Examples:

>>> import intake
>>> catalog = intake.open_catalog('s3://bucket/boundaries.yaml')
>>> boundaries = Boundaries(catalog)
Source code in climakitae/new_core/data_access/boundaries.py
def __init__(self, boundary_catalog: intake.catalog.Catalog):
    """Initialize the Boundaries class with a boundary catalog.

    Sets up the lazy-loading infrastructure and validates the catalog
    structure to ensure all required boundary datasets are available.
    No data is loaded during initialization - it's loaded on first access.

    Parameters
    ----------
    boundary_catalog : intake.catalog.Catalog
        Intake catalog instance for accessing boundary parquet files.
        Must contain entries for: 'states', 'counties', 'huc8',
        'utilities', 'dfz', and 'eba'.

    Raises
    ------
    ValueError
        If the catalog is missing required entries

    Examples
    --------
    >>> import intake
    >>> catalog = intake.open_catalog('s3://bucket/boundaries.yaml')
    >>> boundaries = Boundaries(catalog)

    """
    self._cat = boundary_catalog

    # Private storage for lazy-loaded DataFrames
    self.__us_states: Optional[pd.DataFrame] = None
    self.__ca_counties: Optional[pd.DataFrame] = None
    self.__ca_watersheds: Optional[pd.DataFrame] = None
    self.__ca_utilities: Optional[pd.DataFrame] = None
    self.__ca_forecast_zones: Optional[pd.DataFrame] = None
    self.__ca_electric_balancing_areas: Optional[pd.DataFrame] = None
    self.__ca_census_tracts: Optional[pd.DataFrame] = None

    # Cache for lookup dictionaries
    self._lookup_cache: Dict[str, Dict[str, int]] = {}

    # Validate catalog on initialization
    self.validate_catalog()

validate_catalog()

Validate that required catalog entries exist and are accessible.

Checks for the presence of all required boundary datasets in the catalog. This ensures that the boundary data can be loaded when requested by the user.

Raises:

ValueError
    If any required catalog entries are missing. The error message will list all missing entries.

Notes

Required catalog entries:
- 'states': US state boundaries
- 'counties': California county boundaries
- 'huc8': California watershed boundaries (HUC8 level)
- 'utilities': California electric utility boundaries
- 'dfz': California demand forecast zones
- 'eba': Electric balancing authority areas

Source code in climakitae/new_core/data_access/boundaries.py
def validate_catalog(self) -> None:
    """Validate that required catalog entries exist and are accessible.

    Checks for the presence of all required boundary datasets in the
    catalog. This ensures that the boundary data can be loaded when
    requested by the user.

    Raises
    ------
    ValueError
        If any required catalog entries are missing. The error message
        will list all missing entries.

    Notes
    -----
    Required catalog entries:
    - 'states': US state boundaries
    - 'counties': California county boundaries
    - 'huc8': California watershed boundaries (HUC8 level)
    - 'utilities': California electric utility boundaries
    - 'dfz': California demand forecast zones
    - 'eba': Electric balancing authority areas
    """
    required_entries = ["states", "counties", "huc8", "utilities", "dfz", "eba"]
    missing = [entry for entry in required_entries if not hasattr(self._cat, entry)]
    if missing:
        raise ValueError(f"Missing required catalog entries: {missing}")
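Because validate_catalog() runs during __init__, a catalog with missing entries fails at construction time (a sketch; incomplete_catalog is a hypothetical catalog missing the 'eba' entry):

>>> try:
...     boundaries = Boundaries(incomplete_catalog)  # hypothetical catalog
... except ValueError as e:
...     print(e)
Missing required catalog entries: ['eba']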

boundary_dict()

Return dictionary of all boundary lookup dictionaries for UI population.

Creates a comprehensive dictionary of all available boundary datasets with their corresponding lookup dictionaries. This is primarily used to populate user interface components that allow boundary selection for geographic subsetting of climate data.

The returned dictionary maps boundary category names to lookup dictionaries that map specific boundary names to their DataFrame indices. This enables efficient boundary selection and data subsetting operations.

Returns:

Dict[str, Dict[str, int]]
    Nested dictionary structure:
    - Outer keys: boundary category names (e.g., 'states', 'CA counties')
    - Inner dictionaries: map boundary names to DataFrame indices

    Available categories:
    - 'none': No geographic subsetting
    - 'lat/lon': Custom coordinate-based selection
    - 'states': Western US states
    - 'CA counties': California counties (alphabetical)
    - 'CA watersheds': California HUC8 watersheds (alphabetical)
    - 'CA Electric Load Serving Entities (IOU & POU)': Electric utilities
    - 'CA Electricity Demand Forecast Zones': Forecast zones
    - 'CA Electric Balancing Authority Areas': Balancing areas
    - 'CA Census Tracts': California census tracts

Examples:

>>> boundaries = Boundaries(catalog)
>>> boundary_options = boundaries.boundary_dict()
>>>
>>> # Get available states
>>> states = boundary_options['states']
>>> print(states.keys())  # ['CA', 'OR', 'WA', ...]
>>>
>>> # Get available counties
>>> counties = boundary_options['CA counties']
>>> alameda_idx = counties['Alameda']
>>>
>>> # Use in UI dropdown population
>>> for category, options in boundary_options.items():
>>>     populate_dropdown(category, options.keys())
Notes
  • Lookup dictionaries are cached for performance
  • Western states follow ordering in WESTERN_STATES_LIST
  • Utilities are ordered with priority utilities first
  • All other boundaries are sorted alphabetically
Source code in climakitae/new_core/data_access/boundaries.py
def boundary_dict(self) -> Dict[str, Dict[str, int]]:
    """Return dictionary of all boundary lookup dictionaries for UI population.

    Creates a comprehensive dictionary of all available boundary datasets
    with their corresponding lookup dictionaries. This is primarily used
    to populate user interface components that allow boundary selection
    for geographic subsetting of climate data.

    The returned dictionary maps boundary category names to lookup dictionaries
    that map specific boundary names to their DataFrame indices. This enables
    efficient boundary selection and data subsetting operations.

    Returns
    -------
    Dict[str, Dict[str, int]]
        Nested dictionary structure:
        - Outer keys: boundary category names (e.g., 'states', 'CA counties')
        - Inner dictionaries: map boundary names to DataFrame indices

        Available categories:
        - 'none': No geographic subsetting
        - 'lat/lon': Custom coordinate-based selection
        - 'states': Western US states
        - 'CA counties': California counties (alphabetical)
        - 'CA watersheds': California HUC8 watersheds (alphabetical)
        - 'CA Electric Load Serving Entities (IOU & POU)': Electric utilities
        - 'CA Electricity Demand Forecast Zones': Forecast zones
        - 'CA Electric Balancing Authority Areas': Balancing areas

    Examples
    --------
    >>> boundaries = Boundaries(catalog)
    >>> boundary_options = boundaries.boundary_dict()
    >>>
    >>> # Get available states
    >>> states = boundary_options['states']
    >>> print(states.keys())  # ['CA', 'OR', 'WA', ...]
    >>>
    >>> # Get available counties
    >>> counties = boundary_options['CA counties']
    >>> alameda_idx = counties['Alameda']
    >>>
    >>> # Use in UI dropdown population
    >>> for category, options in boundary_options.items():
    >>>     populate_dropdown(category, options.keys())

    Notes
    -----
    - Lookup dictionaries are cached for performance
    - Western states follow ordering in WESTERN_STATES_LIST
    - Utilities are ordered with priority utilities first
    - All other boundaries are sorted alphabetically

    """
    return {
        "none": {"entire domain": 0},
        "lat/lon": {"coordinate selection": 0},
        "states": self._get_us_states(),
        "CA counties": self._get_ca_counties(),
        "CA watersheds": self._get_ca_watersheds(),
        "CA Electric Load Serving Entities (IOU & POU)": self._get_ious_pous(),
        "CA Electricity Demand Forecast Zones": self._get_forecast_zones(),
        "CA Electric Balancing Authority Areas": self._get_electric_balancing_areas(),
        "CA Census Tracts": self._get_ca_census_tracts(),
    }

load()

Preload all boundary data (deprecated - data loads automatically when accessed).

This method is kept for backward compatibility. Data now loads automatically when first accessed through the property system.

Deprecated since version 1.5.0: Use preload_all() for explicit preloading, or simply access data normally for automatic lazy loading.

Source code in climakitae/new_core/data_access/boundaries.py
def load(self) -> None:
    """Preload all boundary data (deprecated - data loads automatically when accessed).

    This method is kept for backward compatibility. Data now loads automatically
    when first accessed through the property system.

    Deprecated
    ----------
    1.5.0
        Use ``preload_all()`` instead for explicit preloading, or simply
        access data normally for automatic lazy loading.

    """
    logger.warning(
        "The load() method is deprecated. Data now loads automatically when accessed. "
        "Use preload_all() for explicit preloading."
    )
    self.preload_all()

preload_all()

Preload all boundary data for performance-critical scenarios.

Forces immediate loading of all boundary datasets and builds all lookup caches. This eliminates lazy loading delays for subsequent data access operations, making it ideal for performance-critical scenarios or when you know all boundary data will be needed.

The method loads all seven boundary datasets:
- US western states
- California counties
- California watersheds
- California utilities
- California forecast zones
- California electric balancing areas
- California census tracts

And builds all corresponding lookup dictionaries for fast boundary selection operations.

Examples:

>>> boundaries = Boundaries(catalog)
>>>
>>> # Preload for performance-critical batch processing
>>> boundaries.preload_all()
>>>
>>> # All subsequent access is now immediate
>>> for county in boundaries._ca_counties.itertuples():
>>>     process_county_data(county)
Notes
  • Increases initial memory usage but eliminates loading delays
  • Useful for batch processing or repeated boundary access
  • Data remains cached until clear_cache() is called
  • Memory usage can be monitored with get_memory_usage()
Source code in climakitae/new_core/data_access/boundaries.py
def preload_all(self) -> None:
    """Preload all boundary data for performance-critical scenarios.

    Forces immediate loading of all boundary datasets and builds all
    lookup caches. This eliminates lazy loading delays for subsequent
    data access operations, making it ideal for performance-critical
    scenarios or when you know all boundary data will be needed.

    The method loads all seven boundary datasets:
    - US western states
    - California counties
    - California watersheds
    - California utilities
    - California forecast zones
    - California electric balancing areas
    - California census tracts

    And builds all corresponding lookup dictionaries for fast boundary
    selection operations.

    Examples
    --------
    >>> boundaries = Boundaries(catalog)
    >>>
    >>> # Preload for performance-critical batch processing
    >>> boundaries.preload_all()
    >>>
    >>> # All subsequent access is now immediate
    >>> for county in boundaries._ca_counties.itertuples():
    >>>     process_county_data(county)

    Notes
    -----
    - Increases initial memory usage but eliminates loading delays
    - Useful for batch processing or repeated boundary access
    - Data remains cached until clear_cache() is called
    - Memory usage can be monitored with get_memory_usage()

    """
    # Force loading of all properties
    _ = (
        self._us_states,
        self._ca_counties,
        self._ca_watersheds,
        self._ca_utilities,
        self._ca_forecast_zones,
        self._ca_electric_balancing_areas,
        self._ca_census_tracts,
    )

    # Build all lookup caches
    _ = (
        self._get_us_states(),
        self._get_ca_counties(),
        self._get_ca_watersheds(),
        self._get_forecast_zones(),
        self._get_ious_pous(),
        self._get_electric_balancing_areas(),
        self._get_ca_census_tracts(),
    )

clear_cache()

Clear all cached data and lookup dictionaries to free memory.

Removes all loaded boundary DataFrames and lookup dictionaries from memory, returning the Boundaries instance to its initial state. Data will be reloaded on next access through the lazy loading mechanism.

This is useful for:
- Memory management in long-running applications
- Forcing fresh data loads after catalog updates
- Resetting state during testing or debugging

Examples:

>>> boundaries = Boundaries(catalog)
>>> boundaries.preload_all()
>>> usage_before = boundaries.get_memory_usage()
>>> print(f"Memory before: {usage_before['total_human']}")
>>>
>>> boundaries.clear_cache()
>>> usage_after = boundaries.get_memory_usage()
>>> print(f"Memory after: {usage_after['total_human']}")  # Much lower
>>>
>>> # Data loads again on next access
>>> counties = boundaries._ca_counties  # Triggers reload
Notes
  • All subsequent data access will trigger fresh loads from catalog
  • Lookup dictionaries will be rebuilt as needed
  • Does not affect the underlying catalog or data sources
  • Memory savings are immediate and substantial for loaded datasets
Source code in climakitae/new_core/data_access/boundaries.py
def clear_cache(self) -> None:
    """Clear all cached data and lookup dictionaries to free memory.

    Removes all loaded boundary DataFrames and lookup dictionaries from
    memory, returning the Boundaries instance to its initial state. Data
    will be reloaded on next access through the lazy loading mechanism.

    This is useful for:
    - Memory management in long-running applications
    - Forcing fresh data loads after catalog updates
    - Resetting state during testing or debugging

    Examples
    --------
    >>> boundaries = Boundaries(catalog)
    >>> boundaries.preload_all()
    >>> usage_before = boundaries.get_memory_usage()
    >>> print(f"Memory before: {usage_before['total_human']}")
    >>>
    >>> boundaries.clear_cache()
    >>> usage_after = boundaries.get_memory_usage()
    >>> print(f"Memory after: {usage_after['total_human']}")  # Much lower
    >>>
    >>> # Data loads again on next access
    >>> counties = boundaries._ca_counties  # Triggers reload

    Notes
    -----
    - All subsequent data access will trigger fresh loads from catalog
    - Lookup dictionaries will be rebuilt as needed
    - Does not affect the underlying catalog or data sources
    - Memory savings are immediate and substantial for loaded datasets

    """
    # Clear raw DataFrames
    self.__us_states = None
    self.__ca_counties = None
    self.__ca_watersheds = None
    self.__ca_utilities = None
    self.__ca_forecast_zones = None
    self.__ca_electric_balancing_areas = None
    self.__ca_census_tracts = None

    # Clear lookup cache
    self._lookup_cache.clear()

get_memory_usage()

Get detailed memory usage information for loaded boundary datasets.

Analyzes memory consumption of all loaded boundary DataFrames and provides both detailed per-dataset usage and summary statistics. Useful for memory monitoring and optimization decisions.

Returns:

Dict[str, Union[int, str]]
    Comprehensive memory usage information.

    Per-dataset usage (bytes):
    - 'us_states': Memory used by US states DataFrame (0 if not loaded)
    - 'ca_counties': Memory used by CA counties DataFrame (0 if not loaded)
    - 'ca_watersheds': Memory used by CA watersheds DataFrame (0 if not loaded)
    - 'ca_utilities': Memory used by CA utilities DataFrame (0 if not loaded)
    - 'ca_forecast_zones': Memory used by forecast zones DataFrame (0 if not loaded)
    - 'ca_electric_balancing_areas': Memory used by balancing areas DataFrame (0 if not loaded)
    - 'ca_census_tracts': Memory used by census tracts DataFrame (0 if not loaded)

    Summary statistics:
    - 'total_bytes': Total memory usage in bytes
    - 'total_human': Human-readable total memory usage (e.g., '15.2 MB')
    - 'loaded_datasets': Count of currently loaded datasets
    - 'cached_lookups': Count of cached lookup dictionaries

Examples:

>>> boundaries = Boundaries(catalog)
>>> boundaries.preload_all()
>>> usage = boundaries.get_memory_usage()
>>>
>>> print(f"Total memory: {usage['total_human']}")
>>> print(f"Loaded datasets: {usage['loaded_datasets']}/6")
>>> print(f"Largest dataset: {max(usage['us_states'], usage['ca_counties'])}")
>>>
>>> # Check if specific dataset is loaded
>>> if usage['ca_counties'] > 0:
>>>     print("Counties data is loaded")
>>> # Monitor memory before/after operations
>>> usage_before = boundaries.get_memory_usage()
>>> boundaries.clear_cache()
>>> usage_after = boundaries.get_memory_usage()
>>> saved = usage_before['total_bytes'] - usage_after['total_bytes']
>>> print(f"Memory freed: {boundaries._format_bytes(saved)}")
Notes
  • Memory usage includes deep analysis of DataFrame contents
  • Unloaded datasets report 0 bytes usage
  • Lookup dictionary cache usage is counted separately
  • Total includes all loaded DataFrames but not lookup dictionaries
Source code in climakitae/new_core/data_access/boundaries.py
def get_memory_usage(self) -> Dict[str, Union[int, str]]:
    """Get detailed memory usage information for loaded boundary datasets.

    Analyzes memory consumption of all loaded boundary DataFrames and
    provides both detailed per-dataset usage and summary statistics.
    Useful for memory monitoring and optimization decisions.

    Returns
    -------
    Dict[str, Union[int, str]]
        Comprehensive memory usage information:

        Per-dataset usage (bytes):
        - 'us_states': Memory used by US states DataFrame (0 if not loaded)
        - 'ca_counties': Memory used by CA counties DataFrame (0 if not loaded)
        - 'ca_watersheds': Memory used by CA watersheds DataFrame (0 if not loaded)
        - 'ca_utilities': Memory used by CA utilities DataFrame (0 if not loaded)
        - 'ca_forecast_zones': Memory used by forecast zones DataFrame (0 if not loaded)
        - 'ca_electric_balancing_areas': Memory used by balancing areas DataFrame (0 if not loaded)
        - 'ca_census_tracts': Memory used by census tracts DataFrame (0 if not loaded)

        Summary statistics:
        - 'total_bytes': Total memory usage in bytes
        - 'total_human': Human-readable total memory usage (e.g., '15.2 MB')
        - 'loaded_datasets': Count of currently loaded datasets
        - 'cached_lookups': Count of cached lookup dictionaries

    Examples
    --------
    >>> boundaries = Boundaries(catalog)
    >>> boundaries.preload_all()
    >>> usage = boundaries.get_memory_usage()
    >>>
    >>> print(f"Total memory: {usage['total_human']}")
    >>> print(f"Loaded datasets: {usage['loaded_datasets']}/6")
    >>> print(f"Largest dataset: {max(usage['us_states'], usage['ca_counties'])}")
    >>>
    >>> # Check if specific dataset is loaded
    >>> if usage['ca_counties'] > 0:
    >>>     print("Counties data is loaded")

    >>> # Monitor memory before/after operations
    >>> usage_before = boundaries.get_memory_usage()
    >>> boundaries.clear_cache()
    >>> usage_after = boundaries.get_memory_usage()
    >>> saved = usage_before['total_bytes'] - usage_after['total_bytes']
    >>> print(f"Memory freed: {boundaries._format_bytes(saved)}")

    Notes
    -----
    - Memory usage includes deep analysis of DataFrame contents
    - Unloaded datasets report 0 bytes usage
    - Lookup dictionary cache usage is counted separately
    - Total includes all loaded DataFrames but not lookup dictionaries

    """
    usage = {}
    total_bytes = 0

    datasets = {
        "us_states": self.__us_states,
        "ca_counties": self.__ca_counties,
        "ca_watersheds": self.__ca_watersheds,
        "ca_utilities": self.__ca_utilities,
        "ca_forecast_zones": self.__ca_forecast_zones,
        "ca_electric_balancing_areas": self.__ca_electric_balancing_areas,
        "ca_census_tracts": self.__ca_census_tracts,
    }

    for name, df in datasets.items():
        if df is not None:
            memory_bytes = df.memory_usage(deep=True).sum()
            usage[name] = memory_bytes
            total_bytes += memory_bytes
        else:
            usage[name] = 0

    usage["total_bytes"] = total_bytes
    usage["total_human"] = self._format_bytes(total_bytes)
    usage["loaded_datasets"] = len(
        [df for df in datasets.values() if df is not None]
    )
    usage["cached_lookups"] = len(self._lookup_cache)

    return usage