Skip to content

Legacy Data Loading

The climakitae.core.data_load module holds the internal helpers that assemble and load legacy datasets behind get_data() and DataParameters.retrieve().

Warning

This page documents a legacy support module. It is kept for backward compatibility. New code should use climakitae.new_core.user_interface.ClimateData.

What this module does

  • Builds the intake-esm query from a populated DataParameters object.
  • Streams the matching datasets from S3 as lazily loaded xarray objects.
  • Applies the unit, area-average, and concatenation steps the legacy pipeline expects before returning data to the caller.

These functions are rarely called directly; they are the machinery behind the legacy Data Interface.


Public API

load(xr_da, progress_bar=False)

Read lazily loaded dask array into memory for faster access

Parameters:

Name Type Description Default
xr_da DataArray
required
progress_bar boolean
False

Returns:

Name Type Description
da_computed DataArray
Source code in climakitae/core/data_load.py
def load(xr_da: xr.DataArray, progress_bar: bool = False) -> xr.DataArray:
    """Read lazily loaded dask array into memory for faster access

    Parameters
    ----------
    xr_da : xr.DataArray
    progress_bar : boolean

    Returns
    -------
    da_computed : xr.DataArray

    """

    # Check if data is already loaded into memory
    if xr_da.chunks is None:
        print("Your data is already loaded into memory")
        return xr_da

    # Get memory information
    avail_mem = psutil.virtual_memory().available  # Available system memory
    xr_data_nbytes = xr_da.nbytes  # Memory of data

    # If it will cause the system to have less than 256MB after loading the data, do not allow the compute to proceed.
    if avail_mem - xr_data_nbytes < 268435456:
        print("Available memory: {0}".format(readable_bytes(avail_mem)))
        print("Total memory of input data: {0}".format(readable_bytes(xr_data_nbytes)))
        warnings.warn(
            "Your input dataset may be too large to read into memory!",
            UserWarning,
            stacklevel=999,
        )
        # take user input on continuing
        proceed = input(
            "If you continue, your system may become unresponsive. Do you want to proceed? (y/n): "
        )
        if proceed.lower() != "y":
            raise MemoryError("Process aborted by user.")
    else:
        print(
            "Processing data to read {0} of data into memory... ".format(
                readable_bytes(xr_data_nbytes)
            ),
            end="",
        )
        if progress_bar:
            with ProgressBar():
                print("\r")
                da_computed = xr_da.compute()
        else:
            da_computed = xr_da.compute()
        print("Complete!")
        return da_computed

area_subset_geometry(selections)

Get geometry to perform area subsetting with.

Parameters:

Name Type Description Default
selections DataParameters

object holding user's selections

required

Returns:

Name Type Description
ds_region geometry

geometry to use for subsetting

Source code in climakitae/core/data_load.py
def area_subset_geometry(
    selections: "DataParameters",
) -> list[shapely.geometry.polygon.Polygon] | None:
    """Get geometry to perform area subsetting with.

    Parameters
    ----------
    selections : DataParameters
        object holding user's selections

    Returns
    -------
    ds_region : shapely.geometry
        geometry to use for subsetting

    """

    def _override_area_selections(selections: "DataParameters") -> tuple[str, str]:
        """Account for 'station' special-case
        You need to retrieve the entire domain because the shapefiles will cut out
        the ocean grid cells, but the some station's closest gridcells are the ocean!

        Parameters
        ----------
        selections : DataParameters
            object holding user's selections

        Returns
        -------
        area_subset : str
        cached_area : str

        """
        if selections.data_type == "Stations":
            area_subset = "none"
            cached_area = "entire domain"
        else:
            area_subset = selections.area_subset
            cached_area = selections.cached_area

        return area_subset, cached_area

    def _set_subarea(
        boundary_dataset: Boundaries, shape_indices: list[int]
    ) -> GeoDataFrame:
        return boundary_dataset.loc[shape_indices].geometry.union_all()

    def _get_as_shapely(selections: "DataParameters") -> shapely.geometry:
        """Takes the location data, and turns it into a
        shapely box object. Just doing polygons for now. Later other point/station data
        will be available too.

        Parameters
        ----------
        selections : DataParameters
            Data settings (variable, unit, timescale, etc)

        Returns
        -------
        shapely_geom : shapely.geometry

        """
        # Box is formed using the following shape:
        #   shapely.geometry.box(minx, miny, maxx, maxy)
        shapely_geom = box(
            selections.longitude[0],  # minx
            selections.latitude[0],  # miny
            selections.longitude[1],  # maxx
            selections.latitude[1],  # maxy
        )
        return shapely_geom

    area_subset, cached_area = _override_area_selections(selections)

    def _get_shape_indices(
        selections: "DataParameters", area_subset: str, cached_area: str
    ) -> list:
        """Gets the indices of the Boundary parquet file that match the area_subet and cached_area.

        Parameters
        ----------
        selections : DataParameters
            Data settings (variable, unit, timescale, etc)

        area_subset : str
            dataset to use from Boundaries for sub area selection

        cached_area : list of strs
            one or more features from area_subset datasets to use for selection

        Returns
        -------
        list

        """
        shape_indices = list(
            {
                key: selections._geography_choose[area_subset][key]
                for key in cached_area
            }.values()
        )
        return shape_indices

    match area_subset:
        case "lat/lon":
            geom = _get_as_shapely(selections)
            if not geom.is_valid:
                raise ValueError(
                    "Please go back to 'select' and choose" + " a valid lat/lon range."
                )
            ds_region = [geom]
        case "states":
            ds_region = [
                _set_subarea(
                    selections._geographies._us_states,
                    _get_shape_indices(selections, area_subset, cached_area),
                )
            ]
        case "CA counties":
            ds_region = [
                _set_subarea(
                    selections._geographies._ca_counties,
                    _get_shape_indices(selections, area_subset, cached_area),
                )
            ]
        case "CA watersheds":
            ds_region = [
                _set_subarea(
                    selections._geographies._ca_watersheds,
                    _get_shape_indices(selections, area_subset, cached_area),
                )
            ]
        case "CA Electric Load Serving Entities (IOU & POU)":
            ds_region = [
                _set_subarea(
                    selections._geographies._ca_utilities,
                    _get_shape_indices(selections, area_subset, cached_area),
                )
            ]
        case "CA Electricity Demand Forecast Zones":
            ds_region = [
                _set_subarea(
                    selections._geographies._ca_forecast_zones,
                    _get_shape_indices(selections, area_subset, cached_area),
                )
            ]
        case "CA Electric Balancing Authority Areas":
            ds_region = [
                _set_subarea(
                    selections._geographies._ca_electric_balancing_areas,
                    _get_shape_indices(selections, area_subset, cached_area),
                )
            ]
        case _:
            ds_region = None
    return ds_region

read_catalog_from_select(selections)

The primary and first data loading method, called by DataParameters.retrieve, it returns a DataArray (which can be quite large) containing everything requested by the user (which is stored in 'selections').

Parameters:

Name Type Description Default
selections DataParameters

object holding user's selections

required

Returns:

Name Type Description
da DataArray

output data

Source code in climakitae/core/data_load.py
def read_catalog_from_select(selections: "DataParameters") -> xr.DataArray:
    """The primary and first data loading method, called by
    DataParameters.retrieve, it returns a DataArray (which can be quite large)
    containing everything requested by the user (which is stored in 'selections').

    Parameters
    ----------
    selections : DataParameters
        object holding user's selections

    Returns
    -------
    da : xr.DataArray
        output data

    """

    if selections.approach == "Warming Level":
        selections.time_slice = (1950, 2100)  # Retrieve entire time period

    # Raise appropriate errors for time-based retrieval
    if selections.approach == "Time":
        if (selections.scenario_ssp != []) and (
            "Historical Reconstruction" in selections.scenario_historical
        ):
            raise ValueError(
                "Historical Reconstruction data is not available with SSP data. Please modify your selections and try again."
            )

        # Validate unit selection
        # Returns None if units are valid, raises error if not
        _check_valid_unit_selection(selections)

        # Raise error if no scenarios are selected
        scenario_selections = selections.scenario_ssp + selections.scenario_historical
        if scenario_selections == []:
            raise ValueError("Please select as least one dataset.")

    # Raise error if station data selected, but no station is selected
    if (selections.data_type == "Stations") and (
        selections.stations in [[], ["No stations available at this location"]]
    ):
        raise ValueError(
            "Please select at least one weather station, or retrieve gridded data."
        )

    # For station data, need to expand time slice to ensure the historical period is included
    # At the end, the data will be cut back down to the user's original selection
    if selections.data_type == "Stations":
        original_time_slice = selections.time_slice  # Preserve original user selections
        original_scenario_historical = selections.scenario_historical.copy()
        if "Historical Climate" not in selections.scenario_historical:
            selections.scenario_historical.append("Historical Climate")
        obs_data_bounds = (
            1980,
            2014,
        )  # Bounds of the observational data used in bias-correction
        if original_time_slice[0] > obs_data_bounds[0]:
            selections.time_slice = (obs_data_bounds[0], original_time_slice[1])
        if original_time_slice[1] < obs_data_bounds[1]:
            selections.time_slice = (selections.time_slice[0], obs_data_bounds[1])

    ## ------ Deal with derived variables ------
    orig_var_id_selection = selections.variable_id[0]
    orig_unit_selection = selections.units
    orig_variable_selection = selections.variable

    # Get data attributes beforehand since selections is modified
    data_attrs = _get_data_attributes(selections)
    if "_derived" in orig_var_id_selection:
        match orig_var_id_selection:
            case "wind_speed_derived":  # Hourly
                da = _get_wind_speed_derived(selections)
            case "wind_direction_derived":  # Hourly
                da = _get_wind_dir_derived(selections)
            case "dew_point_derived":  # Monthly/daily
                da = _get_monthly_daily_dewpoint(selections)
            case "dew_point_derived_hrly":  # Hourly
                da = _get_hourly_dewpoint(selections)
            case "rh_derived":  # Hourly
                da = _get_hourly_rh(selections)
            case "q2_derived":  # Hourly
                da = _get_hourly_specific_humidity(selections)
            case "fosberg_index_derived":  # Hourly
                da = _get_fosberg_fire_index(selections)
            case "noaa_heat_index_derived":  # Hourly
                da = _get_noaa_heat_index(selections)
            case "effective_temp_index_derived":
                da = _get_eff_temp(selections)
            case _:  # none of the above
                raise ValueError(
                    "You've encountered a bug. No data available for selected derived variable."
                )

        # ------ Set attributes ------
        # Convert units before copying data attributes
        da = convert_units(da, selected_units=orig_unit_selection)
        da.name = orig_variable_selection  # Set name of DataArray

        # Reset selections to user's original selections
        selections.variable_id = [orig_var_id_selection]
        selections.units = orig_unit_selection

        # Some of the derived variables may be constructed from data that comes from the same institution
        # The dev team hasn't looked into this yet -- opportunity for future improvement
        data_attrs = data_attrs | {"institution": "Multiple"}
        da.attrs = data_attrs

    # Rotate wind vectors
    elif (
        any(x in selections.variable_id for x in ["u10", "v10"])
        and selections.downscaling_method == "Dynamical"
    ):
        if "u10" in selections.variable_id:
            da = _get_Uearth(selections)
        elif "v10" in selections.variable_id:
            da = _get_Vearth(selections)

    # Any other variable... i.e. not an index, derived var, or a WRF wind vector
    else:
        da = _get_data_one_var(selections)

    # Assure that CRS and grid_mapping are in place for all data returned
    if (selections.downscaling_method == "Dynamical") and (
        "Lambert_Conformal" in da.coords
    ):
        da.attrs = da.attrs | {"grid_mapping": "Lambert_Conformal"}
    elif selections.downscaling_method in ["Statistical", "Dynamical+Statistical"]:
        da = da.rio.write_crs("epsg:4326", inplace=True)

    if selections.data_type == "Stations":
        # Bias-correct the station data
        # Preserve attributes from the gridded data (e.g. `location_subset`) which
        # can be lost during the station bias-correction step. Capture them here
        # and re-attach after `_station_apply`.
        try:
            gridded_attrs = dict(da.attrs) if hasattr(da, "attrs") else {}
        except Exception:
            gridded_attrs = {}

        da = _station_apply(selections, da, original_time_slice)

        # Re-attach gridded attributes onto each station variable if they are missing.
        # Do not overwrite any existing station-specific attributes.
        try:
            if isinstance(da, xr.Dataset):
                for var in da.data_vars:
                    for k, v in gridded_attrs.items():
                        if k not in da[var].attrs:
                            da[var].attrs[k] = v
            elif isinstance(da, xr.DataArray):
                for k, v in gridded_attrs.items():
                    if k not in da.attrs:
                        da.attrs[k] = v
        except Exception:
            # If anything goes wrong attaching attributes, proceed without failing
            # the entire retrieval - attribute preservation is best-effort.
            pass

        # Ensure station-specific metadata exists for each returned station variable.
        # Some mapping/execution paths can drop attributes added in the inner
        # bias-correction function; reconstruct them from the station GeoDataFrame
        # when missing so tests and callers can rely on their presence.
        try:
            if isinstance(da, xr.Dataset):
                for var in da.data_vars:
                    attrs = da[var].attrs
                    # Lookup corresponding row in stations GeoDataFrame
                    try:
                        st_row = selections._stations_gdf.loc[
                            selections._stations_gdf["station"] == var
                        ].iloc[0]
                    except Exception:
                        st_row = None

                    # Station coordinates
                    if "station_coordinates" not in attrs:
                        try:
                            if st_row is not None:
                                lat = (
                                    st_row["LAT_Y"]
                                    if "LAT_Y" in st_row
                                    else st_row.get("latitude", None)
                                )
                                lon = (
                                    st_row["LON_X"]
                                    if "LON_X" in st_row
                                    else st_row.get("longitude", None)
                                )
                                if lat is not None and lon is not None:
                                    da[var].attrs["station_coordinates"] = (
                                        float(lat),
                                        float(lon),
                                    )
                        except Exception:
                            pass

                    # Station elevation
                    if "station_elevation" not in attrs:
                        try:
                            if st_row is not None and "elevation" in st_row:
                                elev = st_row["elevation"]
                                # Keep a human-readable string similar to preprocessing
                                da[var].attrs["station_elevation"] = f"{elev} meters"
                        except Exception:
                            pass

                    # Bias adjustment descriptor
                    if "bias_adjustment" not in attrs:
                        try:
                            # best-effort human-readable descriptor
                            da[var].attrs[
                                "bias_adjustment"
                            ] = "QuantileDeltaMapping.adjust(sim, )"
                        except Exception:
                            pass
        except Exception:
            # Best-effort: don't fail retrieval for metadata reconstruction issues
            pass

        # Reset original selections
        if "Historical Climate" not in original_scenario_historical:
            selections.scenario_historical.remove("Historical Climate")
            try:
                da["scenario"] = [
                    x.split("Historical + ")[1] for x in da.scenario.values
                ]
            except Exception:
                # best-effort: if the scenario coordinate isn't present or in
                # unexpected format, ignore and continue
                pass
        selections.time_slice = original_time_slice

    if selections.approach == "Warming Level":
        # Process data object using warming levels approach
        # Dimensions and coordinates will change
        # See function documentation for more information
        da = _apply_warming_levels_approach(da, selections)

        # Reset original selections
        selections.scenario_ssp = ["n/a"]
        selections.scenario_historical = ["n/a"]

    return da