Legacy Core Module

The legacy climakitae.core module provides the original function-based interface for climate data access. It is maintained for backward compatibility; new code should use climakitae.new_core.user_interface.ClimateData instead. See Legacy API status for the deprecation timeline.
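For orientation, a minimal legacy-style request might look like the sketch below; the keyword names mirror the DataParameters fields documented on this page, and the values shown are illustrative rather than an exhaustive or authoritative example:

from climakitae.core.data_interface import get_data

# Hypothetical selections -- accepted options depend on the catalog
data = get_data(
    variable="Air Temperature at 2m",
    downscaling_method="Dynamical",
    resolution="9 km",
    timescale="monthly",
)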

Submodule reference

The legacy core is split across several modules. The table below summarizes what each contains and where its reference lives; some modules have dedicated, auto-generated reference pages, and the rest are rendered inline below:

| Module | What's in it | Reference |
| --- | --- | --- |
| climakitae.core.data_interface | DataParameters (param-based query class) and get_data (top-level entrypoint) | Data Interface (Detailed) |
| climakitae.core.boundaries | Boundaries singleton — counties, watersheds, utilities, etc. | Boundaries (Detailed) |
| climakitae.core.data_load | Internal data-loading helpers used by get_data | rendered inline below |
| climakitae.core.data_export | Multi-format export (NetCDF, CSV, Zarr, GeoTIFF) | rendered inline below |
| climakitae.core.constants | UNSET, WARMING_LEVELS, SSPS, _NEW_ATTRS_KEY, model lists | rendered inline below |
| climakitae.core.paths | S3 catalog URLs and file path constants | rendered inline below |

Data Loading

load(xr_da, progress_bar=False)

Read a lazily loaded Dask array into memory for faster access.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| xr_da | DataArray | Dask-backed array to read into memory | required |
| progress_bar | boolean | If True, display a Dask progress bar while computing | False |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| da_computed | DataArray | the input data, loaded into memory |
Source code in climakitae/core/data_load.py
def load(xr_da: xr.DataArray, progress_bar: bool = False) -> xr.DataArray:
    """Read lazily loaded dask array into memory for faster access

    Parameters
    ----------
    xr_da : xr.DataArray
    progress_bar : boolean

    Returns
    -------
    da_computed : xr.DataArray

    """

    # Check if data is already loaded into memory
    if xr_da.chunks is None:
        print("Your data is already loaded into memory")
        return xr_da

    # Get memory information
    avail_mem = psutil.virtual_memory().available  # Available system memory
    xr_data_nbytes = xr_da.nbytes  # Memory of data

    # If loading would leave the system with less than 256MB of free memory,
    # warn the user and ask for confirmation before computing.
    if avail_mem - xr_data_nbytes < 268435456:
        print("Available memory: {0}".format(readable_bytes(avail_mem)))
        print("Total memory of input data: {0}".format(readable_bytes(xr_data_nbytes)))
        warnings.warn(
            "Your input dataset may be too large to read into memory!",
            UserWarning,
            stacklevel=999,
        )
        # take user input on continuing
        proceed = input(
            "If you continue, your system may become unresponsive. Do you want to proceed? (y/n): "
        )
        if proceed.lower() != "y":
            raise MemoryError("Process aborted by user.")

    # Proceed with the compute: either memory is sufficient or the user confirmed
    print(
        "Processing data to read {0} of data into memory... ".format(
            readable_bytes(xr_data_nbytes)
        ),
        end="",
    )
    if progress_bar:
        with ProgressBar():
            print("\r")
            da_computed = xr_da.compute()
    else:
        da_computed = xr_da.compute()
    print("Complete!")
    return da_computed
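In typical use, load is applied to the lazy DataArray returned by retrieval, as in this sketch (selections stands for an existing DataParameters object):

from climakitae.core.data_load import load

da_lazy = selections.retrieve()        # chunked, Dask-backed DataArray
da = load(da_lazy, progress_bar=True)  # computes the chunks into memory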

area_subset_geometry(selections)

Get geometry to perform area subsetting with.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| selections | DataParameters | object holding user's selections | required |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| ds_region | geometry | geometry to use for subsetting |

Source code in climakitae/core/data_load.py
def area_subset_geometry(
    selections: "DataParameters",
) -> list[shapely.geometry.polygon.Polygon] | None:
    """Get geometry to perform area subsetting with.

    Parameters
    ----------
    selections : DataParameters
        object holding user's selections

    Returns
    -------
    ds_region : shapely.geometry
        geometry to use for subsetting

    """

    def _override_area_selections(selections: "DataParameters") -> tuple[str, str]:
        """Account for 'station' special-case
        You need to retrieve the entire domain because the shapefiles will cut out
        the ocean grid cells, but the some station's closest gridcells are the ocean!

        Parameters
        ----------
        selections : DataParameters
            object holding user's selections

        Returns
        -------
        area_subset : str
        cached_area : str

        """
        if selections.data_type == "Stations":
            area_subset = "none"
            cached_area = "entire domain"
        else:
            area_subset = selections.area_subset
            cached_area = selections.cached_area

        return area_subset, cached_area

    def _set_subarea(
        boundary_dataset: GeoDataFrame, shape_indices: list[int]
    ) -> shapely.geometry.base.BaseGeometry:
        # Merge the selected boundary rows into a single shapely geometry
        return boundary_dataset.loc[shape_indices].geometry.union_all()

    def _get_as_shapely(selections: "DataParameters") -> shapely.geometry:
        """Takes the location data, and turns it into a
        shapely box object. Just doing polygons for now. Later other point/station data
        will be available too.

        Parameters
        ----------
        selections : DataParameters
            Data settings (variable, unit, timescale, etc)

        Returns
        -------
        shapely_geom : shapely.geometry

        """
        # Box is formed using the following shape:
        #   shapely.geometry.box(minx, miny, maxx, maxy)
        shapely_geom = box(
            selections.longitude[0],  # minx
            selections.latitude[0],  # miny
            selections.longitude[1],  # maxx
            selections.latitude[1],  # maxy
        )
        return shapely_geom

    area_subset, cached_area = _override_area_selections(selections)

    def _get_shape_indices(
        selections: "DataParameters", area_subset: str, cached_area: str
    ) -> list:
        """Gets the indices of the Boundary parquet file that match the area_subet and cached_area.

        Parameters
        ----------
        selections : DataParameters
            Data settings (variable, unit, timescale, etc)

        area_subset : str
            dataset to use from Boundaries for sub area selection

        cached_area : list of strs
            one or more features from area_subset datasets to use for selection

        Returns
        -------
        list

        """
        shape_indices = list(
            {
                key: selections._geography_choose[area_subset][key]
                for key in cached_area
            }.values()
        )
        return shape_indices

    match area_subset:
        case "lat/lon":
            geom = _get_as_shapely(selections)
            if not geom.is_valid:
                raise ValueError(
                    "Please go back to 'select' and choose" + " a valid lat/lon range."
                )
            ds_region = [geom]
        case "states":
            ds_region = [
                _set_subarea(
                    selections._geographies._us_states,
                    _get_shape_indices(selections, area_subset, cached_area),
                )
            ]
        case "CA counties":
            ds_region = [
                _set_subarea(
                    selections._geographies._ca_counties,
                    _get_shape_indices(selections, area_subset, cached_area),
                )
            ]
        case "CA watersheds":
            ds_region = [
                _set_subarea(
                    selections._geographies._ca_watersheds,
                    _get_shape_indices(selections, area_subset, cached_area),
                )
            ]
        case "CA Electric Load Serving Entities (IOU & POU)":
            ds_region = [
                _set_subarea(
                    selections._geographies._ca_utilities,
                    _get_shape_indices(selections, area_subset, cached_area),
                )
            ]
        case "CA Electricity Demand Forecast Zones":
            ds_region = [
                _set_subarea(
                    selections._geographies._ca_forecast_zones,
                    _get_shape_indices(selections, area_subset, cached_area),
                )
            ]
        case "CA Electric Balancing Authority Areas":
            ds_region = [
                _set_subarea(
                    selections._geographies._ca_electric_balancing_areas,
                    _get_shape_indices(selections, area_subset, cached_area),
                )
            ]
        case _:
            ds_region = None
    return ds_region
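This function normally runs inside the retrieval pipeline, but a direct call looks like the sketch below; the feature name is hypothetical, and valid names come from the Boundaries data:

from climakitae.core.data_load import area_subset_geometry

selections.area_subset = "CA counties"
selections.cached_area = ["Los Angeles County"]   # hypothetical feature
region = area_subset_geometry(selections)         # [merged polygon] or None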

read_catalog_from_select(selections)

The primary data-loading method. Called by DataParameters.retrieve, it returns a DataArray (which can be quite large) containing everything requested by the user, as stored in 'selections'.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| selections | DataParameters | object holding user's selections | required |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| da | DataArray | output data |

Source code in climakitae/core/data_load.py
def read_catalog_from_select(selections: "DataParameters") -> xr.DataArray:
    """The primary and first data loading method, called by
    DataParameters.retrieve, it returns a DataArray (which can be quite large)
    containing everything requested by the user (which is stored in 'selections').

    Parameters
    ----------
    selections : DataParameters
        object holding user's selections

    Returns
    -------
    da : xr.DataArray
        output data

    """

    if selections.approach == "Warming Level":
        selections.time_slice = (1950, 2100)  # Retrieve entire time period

    # Raise appropriate errors for time-based retrieval
    if selections.approach == "Time":
        if (selections.scenario_ssp != []) and (
            "Historical Reconstruction" in selections.scenario_historical
        ):
            raise ValueError(
                "Historical Reconstruction data is not available with SSP data. Please modify your selections and try again."
            )

        # Validate unit selection
        # Returns None if units are valid, raises error if not
        _check_valid_unit_selection(selections)

        # Raise error if no scenarios are selected
        scenario_selections = selections.scenario_ssp + selections.scenario_historical
        if scenario_selections == []:
            raise ValueError("Please select at least one dataset.")

    # Raise error if station data selected, but no station is selected
    if (selections.data_type == "Stations") and (
        selections.stations in [[], ["No stations available at this location"]]
    ):
        raise ValueError(
            "Please select at least one weather station, or retrieve gridded data."
        )

    # For station data, need to expand time slice to ensure the historical period is included
    # At the end, the data will be cut back down to the user's original selection
    if selections.data_type == "Stations":
        original_time_slice = selections.time_slice  # Preserve original user selections
        original_scenario_historical = selections.scenario_historical.copy()
        if "Historical Climate" not in selections.scenario_historical:
            selections.scenario_historical.append("Historical Climate")
        obs_data_bounds = (
            1980,
            2014,
        )  # Bounds of the observational data used in bias-correction
        if original_time_slice[0] > obs_data_bounds[0]:
            selections.time_slice = (obs_data_bounds[0], original_time_slice[1])
        if original_time_slice[1] < obs_data_bounds[1]:
            selections.time_slice = (selections.time_slice[0], obs_data_bounds[1])

    ## ------ Deal with derived variables ------
    orig_var_id_selection = selections.variable_id[0]
    orig_unit_selection = selections.units
    orig_variable_selection = selections.variable

    # Get data attributes beforehand since selections is modified
    data_attrs = _get_data_attributes(selections)
    if "_derived" in orig_var_id_selection:
        match orig_var_id_selection:
            case "wind_speed_derived":  # Hourly
                da = _get_wind_speed_derived(selections)
            case "wind_direction_derived":  # Hourly
                da = _get_wind_dir_derived(selections)
            case "dew_point_derived":  # Monthly/daily
                da = _get_monthly_daily_dewpoint(selections)
            case "dew_point_derived_hrly":  # Hourly
                da = _get_hourly_dewpoint(selections)
            case "rh_derived":  # Hourly
                da = _get_hourly_rh(selections)
            case "q2_derived":  # Hourly
                da = _get_hourly_specific_humidity(selections)
            case "fosberg_index_derived":  # Hourly
                da = _get_fosberg_fire_index(selections)
            case "noaa_heat_index_derived":  # Hourly
                da = _get_noaa_heat_index(selections)
            case "effective_temp_index_derived":
                da = _get_eff_temp(selections)
            case _:  # none of the above
                raise ValueError(
                    "You've encountered a bug. No data available for selected derived variable."
                )

        # ------ Set attributes ------
        # Convert units before copying data attributes
        da = convert_units(da, selected_units=orig_unit_selection)
        da.name = orig_variable_selection  # Set name of DataArray

        # Reset selections to user's original selections
        selections.variable_id = [orig_var_id_selection]
        selections.units = orig_unit_selection

        # Some of the derived variables may be constructed from data that comes from the same institution
        # The dev team hasn't looked into this yet -- opportunity for future improvement
        data_attrs = data_attrs | {"institution": "Multiple"}
        da.attrs = data_attrs

    # Rotate wind vectors
    elif (
        any(x in selections.variable_id for x in ["u10", "v10"])
        and selections.downscaling_method == "Dynamical"
    ):
        if "u10" in selections.variable_id:
            da = _get_Uearth(selections)
        elif "v10" in selections.variable_id:
            da = _get_Vearth(selections)

    # Any other variable... i.e. not an index, derived var, or a WRF wind vector
    else:
        da = _get_data_one_var(selections)

    # Assure that CRS and grid_mapping are in place for all data returned
    if (selections.downscaling_method == "Dynamical") and (
        "Lambert_Conformal" in da.coords
    ):
        da.attrs = da.attrs | {"grid_mapping": "Lambert_Conformal"}
    elif selections.downscaling_method in ["Statistical", "Dynamical+Statistical"]:
        da = da.rio.write_crs("epsg:4326", inplace=True)

    if selections.data_type == "Stations":
        # Bias-correct the station data
        # Preserve attributes from the gridded data (e.g. `location_subset`) which
        # can be lost during the station bias-correction step. Capture them here
        # and re-attach after `_station_apply`.
        try:
            gridded_attrs = dict(da.attrs) if hasattr(da, "attrs") else {}
        except Exception:
            gridded_attrs = {}

        da = _station_apply(selections, da, original_time_slice)

        # Re-attach gridded attributes onto each station variable if they are missing.
        # Do not overwrite any existing station-specific attributes.
        try:
            if isinstance(da, xr.Dataset):
                for var in da.data_vars:
                    for k, v in gridded_attrs.items():
                        if k not in da[var].attrs:
                            da[var].attrs[k] = v
            elif isinstance(da, xr.DataArray):
                for k, v in gridded_attrs.items():
                    if k not in da.attrs:
                        da.attrs[k] = v
        except Exception:
            # If anything goes wrong attaching attributes, proceed without failing
            # the entire retrieval - attribute preservation is best-effort.
            pass

        # Ensure station-specific metadata exists for each returned station variable.
        # Some mapping/execution paths can drop attributes added in the inner
        # bias-correction function; reconstruct them from the station GeoDataFrame
        # when missing so tests and callers can rely on their presence.
        try:
            if isinstance(da, xr.Dataset):
                for var in da.data_vars:
                    attrs = da[var].attrs
                    # Lookup corresponding row in stations GeoDataFrame
                    try:
                        st_row = selections._stations_gdf.loc[
                            selections._stations_gdf["station"] == var
                        ].iloc[0]
                    except Exception:
                        st_row = None

                    # Station coordinates
                    if "station_coordinates" not in attrs:
                        try:
                            if st_row is not None:
                                lat = (
                                    st_row["LAT_Y"]
                                    if "LAT_Y" in st_row
                                    else st_row.get("latitude", None)
                                )
                                lon = (
                                    st_row["LON_X"]
                                    if "LON_X" in st_row
                                    else st_row.get("longitude", None)
                                )
                                if lat is not None and lon is not None:
                                    da[var].attrs["station_coordinates"] = (
                                        float(lat),
                                        float(lon),
                                    )
                        except Exception:
                            pass

                    # Station elevation
                    if "station_elevation" not in attrs:
                        try:
                            if st_row is not None and "elevation" in st_row:
                                elev = st_row["elevation"]
                                # Keep a human-readable string similar to preprocessing
                                da[var].attrs["station_elevation"] = f"{elev} meters"
                        except Exception:
                            pass

                    # Bias adjustment descriptor
                    if "bias_adjustment" not in attrs:
                        try:
                            # best-effort human-readable descriptor
                            da[var].attrs[
                                "bias_adjustment"
                            ] = "QuantileDeltaMapping.adjust(sim, )"
                        except Exception:
                            pass
        except Exception:
            # Best-effort: don't fail retrieval for metadata reconstruction issues
            pass

        # Reset original selections
        if "Historical Climate" not in original_scenario_historical:
            selections.scenario_historical.remove("Historical Climate")
            try:
                da["scenario"] = [
                    x.split("Historical + ")[1] for x in da.scenario.values
                ]
            except Exception:
                # best-effort: if the scenario coordinate isn't present or in
                # unexpected format, ignore and continue
                pass
        selections.time_slice = original_time_slice

    if selections.approach == "Warming Level":
        # Process data object using warming levels approach
        # Dimensions and coordinates will change
        # See function documentation for more information
        da = _apply_warming_levels_approach(da, selections)

        # Reset original selections
        selections.scenario_ssp = ["n/a"]
        selections.scenario_historical = ["n/a"]

    return da
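Callers rarely use this function directly; the supported path is DataParameters.retrieve, which delegates to it:

da = selections.retrieve()                  # public entrypoint
da = read_catalog_from_select(selections)   # equivalent internal call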

Data Export

remove_zarr(filename)

Helper function for removing a Zarr directory structure. Because the Zarr format is a directory tree, it is not easily removed through the JupyterHub GUI; this function simply deletes the entire directory tree.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| filename | str | Output Zarr file name (without file extension, i.e. "my_filename" instead of "my_filename.zarr"). | required |
Source code in climakitae/core/data_export.py
def remove_zarr(filename: str):
    """Remove Zarr directory structure helper function. As Zarr format is a directory
    tree it is not easily removed using JupyterHUB GUI. This function simply deletes
    an entire directory tree.

    Parameters
    ----------
    filename : str
        Output Zarr file name (without file extension, i.e. "my_filename" instead
        of "my_filename.zarr").

    """
    if type(filename) is not str:
        raise Exception(
            (
                "Please pass a string"
                " (any characters surrounded by quotation marks)"
                " for your file name."
            )
        )
    filename = filename.split(".")[0]

    dir_path = filename + ".zarr"

    try:
        shutil.rmtree(dir_path)
        print(f"Zarr dataset '{dir_path}' deleted successfully.")
    except FileNotFoundError:
        print(f"Zarr dataset '{dir_path}' not found.")
    except OSError as e:
        print(f"Error deleting Zarr dataset '{dir_path}': {e}")

export(data, filename='dataexport', format='NetCDF', mode='local')

Save xarray data as NetCDF, Zarr, or CSV in the current working directory, or, for Zarr, optionally stream the export to an AWS S3 scratch bucket and return a download URL. NetCDF can only be written to the HUB user partition if it fits; Zarr can be written either to the HUB user partition or to the S3 scratch bucket, controlled by the mode option.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| data | DataArray or Dataset | Data to export, as output by e.g. DataParameters.retrieve(). | required |
| filename | str | Output file name (without file extension, i.e. "my_filename" instead of "my_filename.nc"). | 'dataexport' |
| format | str | File format ("Zarr", "NetCDF", "CSV"). | 'NetCDF' |
| mode | str | Save location logic for Zarr file ("local", "s3"). | 'local' |

Returns:

| Type | Description |
| --- | --- |
| None | |
Source code in climakitae/core/data_export.py
def export(
    data: xr.DataArray | xr.Dataset,
    filename: str = "dataexport",
    format: str = "NetCDF",
    mode: str = "local",
):
    """Save xarray data as NetCDF, Zarr, or CSV in the current working directory, or if Zarr optionally
    stream the export file to an AWS S3 scratch bucket and give download URL. NetCDF can only be written
    to the HUB user partition if it will fit. Zarr can either be written to the HUB user partition or to
    S3 scratch bucket using the mode option.

    Parameters
    ----------
    data : xr.DataArray | xr.Dataset
        Data to export, as output by e.g. `DataParameters.retrieve()`.
    filename : str, optional
        Output file name (without file extension, i.e. "my_filename" instead
        of "my_filename.nc"). The default is "dataexport".
    format : str, optional
        File format ("Zarr", "NetCDF", "CSV"). The default is "NetCDF".
    mode : str, optional
        Save location logic for Zarr file ("local", "s3"). The default is "local"

    Returns
    -------
    None

    """
    if not isinstance(data, (xr.Dataset, xr.DataArray)):
        raise Exception(
            "Cannot export object of type "
            + str(type(data)).strip("<class >")
            + ". Please pass an Xarray Dataset or DataArray."
        )

    if type(filename) is not str:
        raise Exception(
            (
                "Please pass a string"
                " (any characters surrounded by quotation marks)"
                " for your file name."
            )
        )
    filename = filename.split(".")[0]

    req_format = format.lower()

    if req_format not in ["zarr", "netcdf", "csv"]:
        raise Exception('Please select "Zarr", "NetCDF" or "CSV" as the file format.')

    extension_dict = {"zarr": ".zarr", "netcdf": ".nc", "csv": ".csv.gz"}

    save_name = filename + extension_dict[req_format]

    if (mode == "s3") and (req_format != "zarr"):
        raise Exception('To export to AWS S3 you must use the format="Zarr" option.')

    # now here is where exporting actually begins
    # we will have different functions for each file type
    # to keep things clean-ish
    match req_format:
        case "zarr":
            _export_to_zarr(data, save_name, mode)
        case "netcdf":
            _export_to_netcdf(data, save_name)
        case "csv":
            _export_to_csv(data, save_name)
        case _:
            raise Exception(
                'Please select "Zarr", "NetCDF" or "CSV" as the file format.'
            )
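For example, with data as returned by DataParameters.retrieve() (file names are illustrative):

from climakitae.core.data_export import export

export(data, filename="t2_monthly")                            # NetCDF in the working directory
export(data, filename="t2_monthly", format="Zarr", mode="s3")  # stream Zarr to the S3 scratch bucket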

write_tmy_file(filename_to_export, df, years, location_name, station_code, stn_lat, stn_lon, stn_state, stn_elev=0.0, file_ext='tmy')

Exports TMY data as a .tmy, .epw, or .csv file.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| filename_to_export | str | Filename string, constructed with station name and simulation | required |
| df | DataFrame | Dataframe of TMY data to export | required |
| years | Tuple[int, int] | Tuple containing climatology start and end years | required |
| location_name | str | Location name string, often station name | required |
| station_code | int or str | Station code | required |
| stn_lat | float | Station latitude | required |
| stn_lon | float | Station longitude | required |
| stn_state | str | State of station location | required |
| stn_elev | float | Elevation of station | 0.0 |
| file_ext | str | File extension for export ("tmy", "epw", or "csv") | 'tmy' |

Returns:

| Type | Description |
| --- | --- |
| None | |
Source code in climakitae/core/data_export.py
def write_tmy_file(
    filename_to_export: str,
    df: pd.DataFrame,
    years: Tuple[int, int],
    location_name: str,
    station_code: int | str,
    stn_lat: float,
    stn_lon: float,
    stn_state: str,
    stn_elev: float = 0.0,
    file_ext: str = "tmy",
):
    """Exports TMY data either as .epw or .tmy file

    Parameters
    ----------
    filename_to_export : str
        Filename string, constructed with station name and simulation
    df : pd.DataFrame
        Dataframe of TMY data to export
    years : Tuple[int, int]
        Tuple containing climatology start and end years
    location_name : str
        Location name string, often station name
    station_code : int | str
        Station code; an int is looked up as a HadISD station id, a str is
        treated as a custom code
    stn_lat : float
        Station latitude
    stn_lon : float
        Station longitude
    stn_state : str
        State of station location
    stn_elev : float, optional
        Elevation of station, default is 0.0
    file_ext : str, optional
        File extension for export, default is "tmy"; options are "tmy", "epw", and "csv"

    Returns
    -------
    None

    """
    station_df = pd.read_csv(HADISD_STATIONS_URL)

    # check that data passed is a DataFrame object
    if not isinstance(df, pd.DataFrame):
        raise ValueError(
            "The function requires a pandas DataFrame object as the data input"
        )

    # normalize simulation column name
    if "simulation" in df.columns and "sim" not in df.columns:
        df = df.rename(columns={"simulation": "sim"})

    # size check on TMY dataframe
    df = _tmy_8760_size_check(df)

    # Normalize time format: fix functions in _tmy_8760_size_check may
    # convert time to datetime objects (with seconds).  Re-format to
    # consistent "%Y-%m-%d %H:%M" strings so downstream writers and
    # _tmy_reset_time_for_gwl see a uniform format.
    df["time"] = pd.to_datetime(df["time"]).dt.strftime("%Y-%m-%d %H:%M")

    def _utc_offset_timezone(lat, lon):
        """Based on user input of lat lon, returns the UTC offset for that timezone

        Parameters
        ----------
        lat : float
            latitude of point of interest
        lon : float
            longitude of point of interest

        Returns
        -------
        str

        Modified from:
        https://stackoverflow.com/questions/5537876/get-utc-offset-from-time-zone-name-in-python

        """
        tf = TimezoneFinder()
        tzn = tf.timezone_at(lng=lon, lat=lat)

        time_now = datetime.datetime.now(pytz.timezone(tzn))
        tz_offset = time_now.utcoffset().total_seconds() / 60 / 60

        diff = "{:d}".format(int(tz_offset))

        return diff

    # custom location input handling
    match station_code:
        case str():  # custom code passed; keep the provided code as-is
            state = stn_state
            timezone = _utc_offset_timezone(lon=stn_lon, lat=stn_lat)
            elevation = (
                stn_elev  # default of 0.0 on custom inputs if elevation is not provided
            )

        case int():  # hadisd station code passed
            # look up info
            if station_code in station_df["station id"].values:
                state = station_df.loc[station_df["station id"] == station_code][
                    "state"
                ].values[0]
                elevation = station_df.loc[station_df["station id"] == station_code][
                    "elevation"
                ].values[0]
                station_code = str(station_code)[:6]
                timezone = _utc_offset_timezone(lon=stn_lon, lat=stn_lat)
            else:
                raise ValueError(
                    f"Station code {station_code} not found in the HadISD station list."
                )
        case _:
            raise ValueError("station_code needs to be either str or int")

    def _tmy_header(
        location_name: str,
        station_code: int,
        stn_lat: float,
        stn_lon: float,
        state: str,
        timezone: str,
        elevation: float,
        df: pd.DataFrame,
    ) -> list[str]:
        """Constructs the header for the TMY output file in .tmy format

        Parameters
        ----------
        location_name : str
        station_code : int
        stn_lat : float
        stn_lon : float
        state : str
        timezone : str
        elevation : float
        df : pd.DataFrame

        Returns
        -------
        headers : list[str]

        Source: https://www.nrel.gov/docs/fy08osti/43156.pdf (pg. 3)

        """
        # line 1 - site information
        # line 1: USAF, station name quote delimited, state, time zone, lat, lon, elev (m)
        line_1 = "{0},'{1}',{2},{3},{4},{5},{6},{7}\n".format(
            station_code,
            location_name,
            state,
            timezone,
            stn_lat,
            stn_lon,
            elevation,
            df["sim"].values[0],
        )

        # line 2 - data field name and units, manually setting to ensure matches TMY3 labeling
        line_2 = (
            ",".join(
                [
                    "Air temperature at 2m (degC)",
                    "Dew point temperature at 2m (degC)",
                    "Relative humidity (0-100)",
                    "Instantaneous downwelling shortwave flux at bottom (W/m2)",
                    "Shortwave surface downward direct normal irradiance (W/m2)",
                    "Shortwave surface downward diffuse irradiance (W/m2)",
                    "Instantaneous downwelling longwave flux at bottom (W/m2)",
                    "Wind speed at 10m (m/s)",
                    "Wind direction at 10m (degrees)",
                    "Surface pressure (Pa)",
                ]
            )
            + "\n"
        )

        headers = [line_1, line_2]

        return headers

    def _epw_header(
        location_name: str,
        station_code: int,
        stn_lat: float,
        stn_lon: float,
        state: str,
        timezone: str,
        elevation: float,
        years: Tuple[int, int],
        df: pd.DataFrame,
    ) -> list[str]:
        """Constructs the header for the TMY output file in .epw format

        Parameters
        ----------
        location_name : str
        station_code : int
        stn_lat : float
        stn_lon : float
        state : str
        timezone : str
        elevation : float
        years : Tuple[int, int]
        df : pd.DataFrame

        Returns
        -------
        headers : list[str]

        Source: EnergyPlus Version 23.1.0 Documentation

        """
        # line 1 - location, location name, state, country, weather station number (2 cols), lat, lon, time zone, elevation
        line_1 = f"LOCATION,{location_name.upper()},{state},USA,Custom_{station_code},{station_code},{stn_lat},{stn_lon},{timezone},{elevation}\n"

        # line 2 - design conditions, leave blank for now
        line_2 = "DESIGN CONDITIONS\n"

        # line 3 - typical/extreme periods, leave blank for now
        line_3 = "TYPICAL/EXTREME PERIODS\n"

        # line 4 - ground temperatures, leave blank for now
        line_4 = "GROUND TEMPERATURES\n"

        # line 5 - holidays/daylight savings, leap year (yes/no), daylight savings start, daylight savings end, num of holidays
        line_5 = "HOLIDAYS/DAYLIGHT SAVINGS,No,0,0,0\n"

        if "warming_level" in df.columns:
            warming_level = df["warming_level"].values[0]
            simulation = df["sim"].values[0]
            # line 6 - comments 1, going to include simulation + warming level information here
            line_6 = f"COMMENTS 1,TMY data produced on the Cal-Adapt: Analytics Engine, Warming Level: {warming_level}{degree_sign}C, Simulation: {simulation}\n"
            # line 7 - comments 2, including date range here from which TMY calculated
            line_7 = f"COMMENTS 2,TMY data produced using {warming_level}{degree_sign}C warming level. Year corresponds to index (1-30) in 30-year window centered on warming level. Model years for {warming_level}{degree_sign}C warming level in simulation {simulation} are {years[0]}-{years[1]}\n"
        else:
            # line 6 - comments 1, going to include simulation + scenario information here
            if "scenario" in df.columns:
                # get_data approach has a separate scenario column
                # the scenario is not included in the simulation name
                scenario = df["scenario"].values[0]
                line_6 = f"COMMENTS 1,TMY data produced on the Cal-Adapt: Analytics Engine, Simulation: {df['sim'].values[0]}, Scenario: {scenario}\n"
            else:
                # new core approach does not have a separate scenario column, scenario is included in simulation name
                # scenario information is included in the simulation name
                line_6 = f"COMMENTS 1,TMY data produced on the Cal-Adapt: Analytics Engine, Simulation: {df['sim'].values[0]}\n"
            # line 7 - comments 2, including date range here from which TMY calculated
            line_7 = f"COMMENTS 2,TMY data produced using {years[0]}-{years[1]} climatological period\n"

        # line 8 - data periods, num data periods, num records per hour, data period name, data period start day of week, data period start (Jan 1), data period end (Dec 31)
        line_8 = "DATA PERIODS,1,1,Data,,1/ 1,12/31\n"

        headers = [line_1, line_2, line_3, line_4, line_5, line_6, line_7, line_8]

        return headers

    # typical meteorological year format
    match file_ext:
        case "tmy":
            path_to_file = filename_to_export + ".tmy"

            with open(path_to_file, "w") as f:
                f.writelines(
                    _tmy_header(
                        location_name,
                        station_code,
                        stn_lat,
                        stn_lon,
                        state,
                        timezone,
                        elevation,
                        df,
                    )
                )  # writes required header lines
                # Keep only time + the 10 TMY variables, in the order
                # that matches the line-2 header written by _tmy_header.
                tmy_data_cols = [
                    "time",
                    "Air Temperature at 2m",
                    "Dew point temperature",
                    "Relative humidity",
                    "Instantaneous downwelling shortwave flux at bottom",
                    "Shortwave surface downward direct normal irradiance",
                    "Shortwave surface downward diffuse irradiance",
                    "Instantaneous downwelling longwave flux at bottom",
                    "Wind speed at 10m",
                    "Wind direction at 10m",
                    "Surface Pressure",
                ]
                df = df[tmy_data_cols]
                dfAsString = df.to_csv(sep=",", header=False, index=False)
                f.write(dfAsString)  # writes data in TMY format
            print(
                f"TMY data exported to .tmy format with filename {path_to_file}, with size {len(df)}"
            )
        # energy plus weather format
        case "epw":
            path_to_file = filename_to_export + ".epw"
            with open(path_to_file, "w") as f:
                f.writelines(
                    _epw_header(
                        location_name,
                        station_code,
                        stn_lat,
                        stn_lon,
                        state,
                        timezone,
                        elevation,
                        years,
                        df,
                    )
                )  # writes required header lines
                # WL time change happens in _epw_format_data if needed
                df_string = _epw_format_data(df).to_csv(
                    sep=",", header=False, index=False
                )
                f.write(df_string)  # writes data in EPW format
            print(
                f"TMY data exported to .epw format with filename {filename_to_export}, with size {len(df)}"
            )
        case "csv":
            columns = [
                "index",
                "simulation",
                "time",
                "lat",
                "lon",
                "Air Temperature at 2m",
                "Dew point temperature",
                "Relative humidity",
                "Instantaneous downwelling shortwave flux at bottom",
                "Shortwave surface downward direct normal irradiance",
                "Shortwave surface downward diffuse irradiance",
                "Instantaneous downwelling longwave flux at bottom",
                "Wind speed at 10m",
                "Wind direction at 10m",
                "Surface Pressure",
            ]

            if "warming_level" in df.columns:
                df["centered_year"] = pd.to_numeric(
                    df["centered_year"], downcast="integer"
                )
                # set position of GWL specific columns
                columns.insert(3, "warming_level")
                columns.insert(6, "centered_year")
            else:
                # set order of scenario column
                columns.insert(5, "scenario")
            df = df.rename(columns={"sim": "simulation"})
            df = df[columns]
            path_to_file = filename_to_export + ".csv"
            df.to_csv(path_to_file, index=False)
            print(
                f"TMY data exported to .csv format with filename {filename_to_export}, with size {len(df)}"
            )
        case _:
            print(
                'Please pass either "tmy","epw", or "csv" as a file format for export.'
            )
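A usage sketch follows; the station metadata is hypothetical, and df must carry a time column, a sim/simulation column, and the ten TMY variables named above:

write_tmy_file(
    "sacramento_tmy",           # hypothetical output name (no extension)
    df,
    years=(1990, 2020),
    location_name="Sacramento",
    station_code=724830,        # hypothetical HadISD station id
    stn_lat=38.51,
    stn_lon=-121.49,
    stn_state="CA",
    file_ext="epw",
)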

Constants

This module defines constants used across the codebase, such as UNSET, WARMING_LEVELS, SSPS, _NEW_ATTRS_KEY, and the model lists.
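For example:

from climakitae.core.constants import UNSET, WARMING_LEVELS, SSPS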

Paths

This module defines package-level paths, including S3 catalog URLs and file path constants.
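Its contents can be listed directly, for example:

from climakitae.core import paths

print([name for name in dir(paths) if not name.startswith("_")])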

Migration Note

For new code, use the modern climakitae.new_core interface:

from climakitae.new_core.user_interface import ClimateData

# Note: WRF uses 't2max'; LOCA2 uses 'tasmax'.
data = (ClimateData()
    .catalog("cadcat")
    .activity_id("LOCA2")
    .grid_label("d03")
    .table_id("day")
    .variable("tasmax")
    .get())

See the Legacy → ClimateData migration guide.