Architecture & Internals
This guide explains the internal architecture of climakitae's new_core module for developers and contributors. It covers system design, component responsibilities, key patterns, and how to extend the system.
Audience
This is the contributor / extender reference. If you only want to use the ClimateData interface, start with Core Concepts or the How-To Guides.
On this page
- System Overview — pipeline architecture and data flow
- Core Components — ClimateData, DatasetFactory, Dataset, DataCatalog, registries
- Key Design Patterns — lazy evaluation, fluent builder, processor priorities, history tracking
- Extending the System — adding processors, validators, catalogs
- Threading & Concurrency — per-thread instances, catalog singleton safety
- Internal APIs — registry inspection helpers
- Testing New Components — unit / integration patterns
- Common Pitfalls
- Resources
System Overview
The new_core module uses a pipeline architecture with layered responsibilities:
┌─────────────────────────────────────────────────────────────┐
│ ClimateData (user_interface.py) │
│ Fluent API: .catalog("cadcat").variable("tas").get() │
└────────────────┬────────────────────────────────────────────┘
│ Creates
▼
┌─────────────────────────────────────────────────────────────┐
│ DatasetFactory (dataset_factory.py) │
│ - Resolves catalog + validates parameters │
│ - Creates configured Dataset object │
└────────────────┬────────────────────────────────────────────┘
│ Instantiates
▼
┌─────────────────────────────────────────────────────────────┐
│ Dataset.execute() (dataset.py) │
│ - Validates query against catalog validators │
│ - Retrieves data from DataCatalog │
│ - Runs processors in priority order │
│ - Returns lazy xarray.Dataset │
└────────────────┬────────────────────────────────────────────┘
┌───────┴───────┬──────────────┬──────────────┐
│ │ │ │
▼ ▼ ▼ ▼
┌────────┐ ┌──────────────┐ ┌──────────┐ ┌────────────┐
│ Query │ │ Validators │ │Processors│ │ DataAccess │
│ Params │ │ (Registry) │ │(Registry)│ │ (Singleton)│
└────────┘ └──────────────┘ └──────────┘ └────────────┘
│
┌─────────┴─────────┐
│ │
┌─────┴──────┐ ┌────────┴───────┐
│ Catalog │ │ Processor │
│ Validators │ │ Validators │
└────────────┘ └────────────────┘
Data Flow
- User calls ClimateData: Builds fluent query chain (catalog, variable, experiment, etc.)
- .get() triggers DatasetFactory: Factory resolves catalog type and creates Dataset instance
- Dataset validates: Runs catalog-specific validator to normalize/validate parameters
- Dataset retrieves data: Queries DataCatalog singleton to fetch data from intake
- Processors execute in priority order: Transform data (clip, time_slice, export, etc.)
- Returns lazy xarray: Dask-backed Dataset, computation deferred until explicit .compute()
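Put end to end, the flow looks like the following minimal sketch. Parameter values mirror the examples used later on this page; it is assumed the returned Dataset exposes the variable under the requested name.
from climakitae.new_core.user_interface import ClimateData

# 1. Build the fluent query; .get() then runs factory -> validation ->
#    data access -> processors and returns a lazy, dask-backed Dataset.
data = (
    ClimateData()
    .catalog("cadcat")
    .activity_id("WRF")
    .variable("t2max")
    .experiment_id("ssp245")
    .table_id("mon")
    .grid_label("d03")
    .processes({"clip": "Los Angeles"})
    .get()
)

# 6. Nothing is read or computed until .compute() is called explicitly.
monthly_mean = data["t2max"].mean(dim="time").compute()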
Core Components
1. ClimateData (User Interface)
File: climakitae/new_core/user_interface.py (~1200 lines)
Responsibility: Fluent API entry point for users. All setter methods chain and return self.
Key Methods:
cd = ClimateData(verbosity=0)
cd.catalog("cadcat") # Set catalog
cd.activity_id("WRF") # Set downscaling method
cd.variable("t2max") # Set variable
cd.experiment_id("ssp245") # Set scenario
cd.table_id("mon") # Set temporal resolution
cd.grid_label("d03") # Set spatial resolution
cd.processes({"clip": "Los Angeles"}) # Set processors
cd.show_variable_options() # Discover available variables
data = cd.get() # Execute query, return xarray
Internal Contract:
- All parameter setters must return self
- Query is reset after .get() is called
- Parameter values stored internally, passed to DatasetFactory on .get()
- Discovery methods (show_*_options()) delegate to DatasetFactory
2. DatasetFactory
File: climakitae/new_core/dataset_factory.py (~600 lines)
Responsibility: Catalog resolution and Dataset construction.
Key Methods:
factory = DatasetFactory()
# Resolve catalog by name (with fuzzy matching)
catalog_obj = factory.get_catalog("cadcat")
# Get all valid options for a parameter
variables = factory.get_variable_options("cadcat", "WRF")
institutions = factory.get_institution_id_options("cadcat")
# Create configured Dataset for a query
query = {"catalog": "cadcat", "variable_id": "t2max", ...}
dataset = factory.create_dataset(query)
Internal Logic:
1. Receives query dict from ClimateData
2. Resolves catalog key (name → catalog type)
3. Selects appropriate catalog validator (e.g., CadcatParamValidator)
4. Creates Dataset instance with validator and default processors
Key Property: Catalog Mapping
- cadcat → main climate data (LOCA2, WRF)
- renewable energy generation → wind/solar capacity factors
- hdp → historical data platform (weather stations)
3. Dataset (Pipeline Executor)
File: climakitae/new_core/dataset.py (~380 lines)
Responsibility: Execute the data processing pipeline.
Key Method:
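The snippet below is a minimal usage sketch, assuming the execute(query, context) call shown in the Context Dict example further down and an illustrative query dict.
from climakitae.new_core.dataset_factory import DatasetFactory

factory = DatasetFactory()
query = {"catalog": "cadcat", "variable_id": "t2max", "experiment_id": "ssp245"}
dataset = factory.create_dataset(query)

context = {}
result = dataset.execute(query, context=context)  # lazy xarray.Dataset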
Pipeline Stages:
1. Validation: Run catalog-specific validator
   - Normalizes parameter names
   - Validates required parameters
   - Suggests alternatives for typos
   - Returns validated query or None
2. Data Access: Query DataCatalog for data
   - Resolves intake catalog
   - Executes intake-esm query
   - Returns xarray.Dataset with dask arrays (lazy)
3. Processing: Run registered processors in priority order
   - Each processor receives data + context dict
   - Processors transform data while preserving laziness
   - Processors update context with metadata
   - Execution order determined by priority value
4. Return: xarray.Dataset (lazy, ready for .compute())
Context Dict: Processors store metadata in context[_NEW_ATTRS_KEY]
context = {}
result = dataset.execute(query, context=context)
# context now contains metadata from all processors
4. DataCatalog (Data Access Layer)
File: climakitae/new_core/data_access/data_access.py (~620 lines)
Responsibility: Thread-safe singleton managing all catalog connections.
Key Properties:
catalog = DataCatalog()
catalog.data # intake_esm for main climate data
catalog.boundary # intake for geographic boundaries
catalog.renewables # intake_esm for renewable energy
catalog.hdp # intake_esm for historical data platform
catalog.catalog_df # Merged DataFrame of all ESM catalogs
catalog.boundaries # Lazy-loading Boundaries manager
Key Methods:
# Get data with explicit catalog key (thread-safe)
data = catalog.get_data(query_dict, catalog_key="cadcat")
# Resolve catalog key with fuzzy matching
key = catalog.resolve_catalog_key("cadcat")
# List available boundaries
boundaries = catalog.list_clip_boundaries()
# Get station metadata
stations = catalog.get_stations()
Thread-Safety Pattern:
# ✅ Thread-safe: Pass catalog_key as parameter
def get_data(query, catalog_key="cadcat"):
# Lookup is atomic, no state mutation
return self._catalogs[catalog_key].search(**query).to_dask()
# ❌ Not thread-safe: State stored on singleton
self._current_key = catalog_key # DON'T DO THIS
Connection Management:
- Catalogs loaded lazily on first access
- intake-esm catalogs cached in _catalogs dict
- Boundaries loaded on-demand via lazy property
- Thread lock ensures atomic initialization
5. ParameterValidator (Registry Pattern)
File: climakitae/new_core/param_validation/abc_param_validation.py (~550 lines)
Responsibility: Validate and normalize query parameters.
Two Registry Types:
Catalog Validators
Validate parameters for entire data catalogs (one per catalog type):
@register_catalog_validator("cadcat")
class CadcatParamValidator(ParameterValidator):
def is_valid_query(self, query):
# Catalog-specific validation
if "variable_id" not in query:
return None # Invalid
return self._validate_and_normalize(query)
Processor Validators
Validate parameters for specific processors:
@register_processor_validator("clip")
def validate_clip(value, **kwargs):
# Processor-specific validation
if not isinstance(value, (str, tuple, list)):
return False
return True
Validation Flow:
1. Dataset calls validator.is_valid_query(query)
2. Validator checks required parameters
3. Validator searches catalog_df for matching datasets
4. Validator calls processor validators for each processor
5. Returns validated query (normalized keys) or None if invalid
Key Methods:
# Load and cache catalog dataframe
validator.load_catalog_df()
# Find matching catalog entries
matches = validator.find_catalog_entries({"variable_id": "t2max"})
# Get default processors for this query
processors = validator.get_default_processors(query)
# Suggest alternatives for typos
suggestions = validator._get_closest_options("tasxxx", "variable_id", n=3)
6. DataProcessor (Registry Pattern)
File: climakitae/new_core/processors/abc_data_processor.py (~150 lines)
Responsibility: Transform data while preserving lazy evaluation.
Required Methods:
@register_processor("my_processor", priority=80)
class MyProcessor(DataProcessor):
def __init__(self, value):
self.value = value
self.name = "my_processor"
self.needs_catalog = False # True if needs DataCatalog access
def execute(self, result, context) -> xr.Dataset:
"""Transform data. Preserve lazy evaluation with dask."""
# ✅ CORRECT: Return new object, don't mutate
return result.sel(lat=slice(33, 35))
def update_context(self, context):
"""Record metadata about processing step."""
if _NEW_ATTRS_KEY not in context:
context[_NEW_ATTRS_KEY] = {}
context[_NEW_ATTRS_KEY]["my_processor"] = "Applied successfully"
def set_data_accessor(self, catalog):
"""Optional: Receive DataCatalog reference if needs_catalog=True."""
self._catalog = catalog
Priority guidelines (real values used by the shipped processors):
0–10 : Pre-processing (filter_unadjusted_models=0, drop_leap_days=1,
convert_units=5, warming_level=10) — catalog refinement & GWL subset
50–70 : Combination + correction + spatial
(concat=50, bias_adjust_model_to_station=60, clip=65,
convert_to_local_time=70)
150–7500 : Temporal subsetting & metric computation
(time_slice=150, metric_calc=7500)
9998–9999: Finalization (update_attributes=9998, export=9999)
When registering a custom processor, pick a priority that puts it in the phase that matches its intent. See the Processors index for the full registry.
Key Rules:
- No in-place mutation: Always return new objects
- Preserve laziness: Don't call .load() or .compute()
- Handle edge cases: Return data with warnings instead of raising
- Update context: Document what the processor did
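As a tiny illustration of the first two rules (generic xarray inside a hypothetical processor, not one of the shipped processors; the variable name is just an example):
import xarray as xr

class ThresholdSketch:
    def execute(self, result: xr.Dataset, context) -> xr.Dataset:
        # CORRECT: extends the dask graph and returns a new object
        return result.where(result["t2max"] > 300)

        # INCORRECT: mutates the input and forces eager computation
        # result["t2max"].values[:] = 0
        # return result.load()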
Registry Access:
from climakitae.new_core.processors.abc_data_processor import _PROCESSOR_REGISTRY
# Inspect registered processors
for key, (cls, priority) in sorted(_PROCESSOR_REGISTRY.items(),
key=lambda x: x[1][1]):
print(f"{key}: {cls.__name__} (priority={priority})")
Key Design Patterns
Understanding Processors: Spatial Subsetting
The Clip processor is the primary tool for spatial subsetting. It extracts data for specific regions while maintaining data integrity and lazy evaluation.
Key Characteristics:
- Supports 5 input modes: named boundaries, points, bounding boxes, weather stations, shapefiles
- Preserves coordinate systems and projections
- Works with lazy dask arrays (no premature loading)
- Returns masked or clipped Dataset depending on mode
Efficiency Principle: Always apply spatial subsetting EARLY in your query chain, before aggregation or computation, to minimize data movement.
# ✅ EFFICIENT: Clip first, then aggregate
data = (cd
.variable("tasmax")
.processes({
"clip": "Los Angeles",
"time_slice": ("2030", "2060")
})
.get())
mean_temp = data["tasmax"].mean(dim=["lat", "lon"]).compute()
# ❌ INEFFICIENT: Load all data then subset
data = cd.variable("tasmax").get()
clipped = data.sel(lat=slice(33.5, 35), lon=slice(-119, -117))
mean_temp = clipped["tasmax"].mean().compute()
See Processor: Clip for detailed API reference.
Understanding Processors: Bias Correction
WRF model output can be bias-corrected using historical weather station observations to improve local accuracy.
Purpose: Adjust systematic model bias while preserving projected climate trends using Quantile Delta Mapping (QDM).
Current Scope:
- ✅ WRF data only (not LOCA2)
- ✅ Hourly temperature (t2) only
- ✅ HadISD weather stations (~600 globally, ~200 western US)
When to Use:
- Local impact assessment where historical accuracy matters
- Building/infrastructure design requiring site-specific bias correction
- When observation-corrected distribution is important
When NOT to Use:
- Regional/state-level planning (raw model suitable for large areas)
- LOCA2 data (already bias-corrected during downscaling)
- Other variables/resolutions (not yet supported)
See Processor: Bias Adjust Model to Station for detailed usage.
Understanding Processors: Data Export Pipeline
The Export processor writes climate data to disk in multiple formats optimized for different use cases.
Supported Formats:
- NetCDF: Standard scientific format with CF conventions (default)
- Zarr: Cloud-optimized chunked storage for large datasets
- CSV: Tabular format for time series and spreadsheet analysis
- GeoTIFF: Raster format compatible with GIS software (QGIS, ArcGIS)
Key Features:
- Handles gridded datasets, multi-point extractions, and point collections
- Optional location-based filenames (e.g., data_34.05N_118.25W.nc)
- S3 cloud storage support (Zarr format only)
- Export method options: "data", "skip_existing", "none"
- Automatic format inference or explicit specification
Efficiency:
- Export should be the LAST processor (priority 9999)
- Processors build the computation graph; export writes the results
- For large datasets, prefer Zarr for cloud storage or incremental processing
# Simple NetCDF export
data = (cd
.variable("tasmax")
.processes({
"time_slice": ("2030-01-01", "2060-12-31"),
"clip": "Los Angeles",
"export": {
"filename": "la_temperature",
"file_format": "NetCDF"
}
})
.get())
# Cloud-optimized Zarr export
data = (cd
.variable("pr")
.processes({
"export": {
"filename": "precipitation_data",
"file_format": "Zarr",
"mode": "s3" # Store on S3
}
})
.get())
See Processor: Export for complete API reference.
Understanding Processors: Context Metadata
Each processor updates a context dictionary to track what transformations were applied.
Purpose: Maintain metadata about processing steps for reproducibility and debugging.
How It Works:
from climakitae.core.constants import _NEW_ATTRS_KEY
class MyProcessor(DataProcessor):
def update_context(self, context):
"""Record metadata about this processing step."""
if _NEW_ATTRS_KEY not in context:
context[_NEW_ATTRS_KEY] = {}
context[_NEW_ATTRS_KEY]["my_processor"] = "Applied filter > 100"
# Access processing history
result, context = dataset.execute_with_context(query)
for step, description in context[_NEW_ATTRS_KEY].items():
print(f"{step}: {description}")
Benefits:
- Track all processing steps applied to data
- Enable reproducible analysis workflows
- Debug unexpected data anomalies
- Document data provenance
1. Fluent Interface (Method Chaining)
All parameter setters return self to enable chaining:
# ✅ CORRECT: Chain multiple setters before .get()
data = (ClimateData()
.catalog("cadcat")
.activity_id("WRF")
.variable("t2max")
.get())
# ❌ INCORRECT: State resets after .get()
cd = ClimateData()
d1 = cd.variable("tasmax").get()
d2 = cd.variable("pr").get() # Lost context - create new instance
Implementation:
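A minimal sketch of the pattern (illustrative only, not the actual user_interface.py code):
class FluentSketch:
    def __init__(self):
        self._query = {}

    def variable(self, value):
        self._query["variable_id"] = value
        return self                            # returning self enables chaining

    def get(self):
        query, self._query = self._query, {}   # query resets after .get()
        return query                           # real code hands the query to DatasetFactory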
2. Registry + Decorator Pattern
Components register at import time using decorators:
# Processor registration
@register_processor("clip", priority=65)
class Clip(DataProcessor):
...
# Validator registration
@register_catalog_validator("cadcat")
class CadcatValidator(ParameterValidator):
...
# Processor parameter validator
@register_processor_validator("warming_level")
def validate_warming_level(value, **kwargs):
...
Registry Storage:
_PROCESSOR_REGISTRY = {} # {key: (class, priority)}
_CATALOG_VALIDATOR_REGISTRY = {} # {catalog: validator_class}
_PROCESSOR_VALIDATOR_REGISTRY = {} # {processor_key: validator_fn}
Discovery:
# List all registered processors
for key in _PROCESSOR_REGISTRY:
print(key)
# Get processor class and priority
cls, priority = _PROCESSOR_REGISTRY["clip"]
3. Singleton Pattern (Thread-Safe)
DataCatalog uses double-checked locking:
class DataCatalog(dict):
_instance = UNSET
_lock = threading.Lock()
def __new__(cls):
# Fast path (no lock)
if cls._instance is not UNSET:
return cls._instance
# Slow path (with lock) - only for first initialization
with cls._lock:
if cls._instance is UNSET:
cls._instance = super().__new__(cls)
cls._instance._initialized = False
return cls._instance
Thread-Safety Contract: - Pass mutable state as parameters, not stored on singleton - Each thread can safely call methods concurrently - Catalog connections are immutable after initialization
4. Lazy Evaluation (Dask Integration)
All operations return dask-backed xarray objects:
# ✅ CORRECT: Computation deferred
result = dataset.execute(query) # Returns lazy xarray.Dataset
final = result.mean(dim='time').compute() # Triggers computation
# ❌ INCORRECT: Eager loading
result = dataset.execute(query).load() # Loads entire dataset!
final = result.mean(dim='time') # Already computed
Why Lazy Evaluation Matters:
- Climate datasets are huge (100GB+ common)
- .load() exhausts memory
- Operations build a computation graph
- .compute() only evaluates what's needed
- Subset data spatially BEFORE aggregating to reduce computation
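Continuing the snippet above, you can confirm the laziness by inspecting the dask chunks before requesting any values (variable name is illustrative):
result = dataset.execute(query)             # lazy xarray.Dataset
print(result["t2max"].chunks)               # chunk layout only, no data loaded
subset = result["t2max"].sel(time="2030")   # still lazy
print(subset.mean().compute())              # only this subset is evaluated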
Extending the System
Adding a New Processor
Steps:
1. Create processor file in climakitae/new_core/processors/:

   # my_processor.py
   from climakitae.new_core.processors.abc_data_processor import (
       DataProcessor,
       register_processor,
   )
   from climakitae.core.constants import _NEW_ATTRS_KEY

   @register_processor("my_key", priority=80)
   class MyProcessor(DataProcessor):
       def __init__(self, value):
           self.value = value
           self.name = "my_key"
           self.needs_catalog = False

       def execute(self, result, context):
           # Transform data
           return result.where(result > self.value)

       def update_context(self, context):
           if _NEW_ATTRS_KEY not in context:
               context[_NEW_ATTRS_KEY] = {}
           context[_NEW_ATTRS_KEY][self.name] = f"Filtered > {self.value}"

       def set_data_accessor(self, catalog):
           if self.needs_catalog:
               self._catalog = catalog

2. Create validator (if processor has parameters)
3. Add tests
4. Import in module: Ensure the processor is imported in climakitae/new_core/processors/__init__.py
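Steps 2 and 4 above omit their code. The following is a minimal, hypothetical sketch of each; the validator logic, module names, and the import path of register_processor_validator are placeholders rather than the shipped code.
# Step 2 (sketch): register a parameter validator for the new processor.
from climakitae.new_core.param_validation.abc_param_validation import (
    register_processor_validator,
)

@register_processor_validator("my_key")
def validate_my_key(value, **kwargs):
    return isinstance(value, (int, float))

# Step 4 (sketch): import the module in climakitae/new_core/processors/__init__.py
# so the @register_processor decorator runs at import time.
from climakitae.new_core.processors import my_processor  # noqa: F401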
Adding a New Catalog Validator
Steps:
1. Create validator in climakitae/new_core/param_validation/:

   # my_catalog_param_validator.py
   from climakitae.new_core.param_validation.abc_param_validation import (
       ParameterValidator,
       register_catalog_validator,
   )

   @register_catalog_validator("my_catalog")
   class MyCatalogValidator(ParameterValidator):
       def is_valid_query(self, query):
           # Validate required parameters
           if "required_param" not in query:
               return None
           # Validate and normalize
           return self._validate_and_normalize(query)

2. Register catalog in DatasetFactory
3. Add tests
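Step 3 above omits its code as well. A minimal unit-test sketch, following the tests-mirror-source layout described under Testing New Components and assuming a no-argument constructor:
# tests/new_core/param_validation/test_my_catalog_param_validator.py (sketch)
from climakitae.new_core.param_validation.my_catalog_param_validator import (
    MyCatalogValidator,
)

def test_missing_required_param_returns_none():
    validator = MyCatalogValidator()
    # Per the is_valid_query contract above, a query missing
    # required_param is rejected with None.
    assert validator.is_valid_query({"unrelated": "value"}) is None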
Adding a New Data Catalog
Steps:
1. Add URL constant in climakitae/core/paths.py
2. Initialize in DataCatalog
3. Add lazy property
4. Register in DatasetFactory
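The code for these steps is omitted above. A hedged sketch of steps 1 and 3 follows; the constant, class, and property names are hypothetical placeholders, and it assumes intake-esm's open_esm_datastore entry point.
# Step 1 (sketch): URL constant in climakitae/core/paths.py (name is hypothetical)
MY_CATALOG_URL = "https://example.com/my-catalog.json"

# Step 3 (sketch): lazy property that opens the catalog on first access
# and reuses the cached connection, mirroring the Connection Management notes.
import intake

class DataCatalogSketch:
    def __init__(self):
        self._catalogs = {}

    @property
    def my_catalog(self):
        if "my_catalog" not in self._catalogs:
            self._catalogs["my_catalog"] = intake.open_esm_datastore(MY_CATALOG_URL)
        return self._catalogs["my_catalog"]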
Threading & Concurrency
Thread-Safe Data Queries
import concurrent.futures
from climakitae.new_core.user_interface import ClimateData
# ✅ CORRECT: Create new ClimateData instance per thread
def fetch_scenario(scenario):
cd = ClimateData(verbosity=-1) # New instance
return (cd
.catalog("cadcat")
.activity_id("WRF")
.experiment_id(scenario)
.get())
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
futures = {
executor.submit(fetch_scenario, s): s
for s in ["historical", "ssp245", "ssp370"]
}
results = {futures[f]: f.result() for f in concurrent.futures.as_completed(futures)}
# ❌ INCORRECT: Sharing ClimateData across threads
cd = ClimateData()
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
futures = [executor.submit(cd.experiment_id(s).get) for s in scenarios]
# Race condition: experiment_id() overwrites shared state
DataCatalog Thread Safety
The DataCatalog singleton is thread-safe by design:
# ✅ Thread-safe: Multiple threads accessing same catalog
def query_data(scenario):
catalog = DataCatalog() # Same instance for all threads
return catalog.get_data(
{"variable_id": "t2max", "experiment_id": scenario},
catalog_key="cadcat" # Passed as parameter, not stored
)
# Each thread can safely call without locking
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
futures = [executor.submit(query_data, s) for s in scenarios]
results = [f.result() for f in futures]
Internal APIs
Accessing Registries Programmatically
Processors:
from climakitae.new_core.processors.abc_data_processor import _PROCESSOR_REGISTRY
# List all processors in priority order
for key, (cls, priority) in sorted(
_PROCESSOR_REGISTRY.items(),
key=lambda x: x[1][1] # Sort by priority
):
print(f"{priority:4d}: {key:20s} ({cls.__name__})")
Validators:
from climakitae.new_core.param_validation.abc_param_validation import (
_CATALOG_VALIDATOR_REGISTRY, _PROCESSOR_VALIDATOR_REGISTRY
)
# List all catalog validators
for catalog_key, validator_class in _CATALOG_VALIDATOR_REGISTRY.items():
print(f"Catalog: {catalog_key:30s} → {validator_class.__name__}")
# List all processor validators
for processor_key, validator_fn in _PROCESSOR_VALIDATOR_REGISTRY.items():
print(f"Processor: {processor_key:30s} → {validator_fn.__name__}")
Context Dictionary
Processors communicate via the context dict:
result = dataset.execute(query, context={})
# After execution, context contains metadata:
# {
# "climate_data_attributes": {
# "clip": "Clipped to bounding box (34.0, 35.0)",
# "time_slice": "Subset to 2015-01-01 to 2015-12-31",
# ...
# }
# }
Access in processor:
from climakitae.core.constants import _NEW_ATTRS_KEY
def execute(self, result, context):
if _NEW_ATTRS_KEY not in context:
context[_NEW_ATTRS_KEY] = {}
context[_NEW_ATTRS_KEY]["my_processor"] = "Processed"
return result
Testing New Components
Test Structure
Tests mirror source structure:
- Source: climakitae/new_core/processors/clip.py
- Tests: tests/new_core/processors/test_clip.py
Mocking Strategy
Patch at import location (not definition):
# ✅ CORRECT: Patch where imported
@patch("climakitae.new_core.user_interface.DatasetFactory")
def test_init(self, mock_factory):
...
# ❌ INCORRECT: Patch at definition
@patch("climakitae.new_core.dataset_factory.DatasetFactory")
Test Data Fixtures
Use tests/conftest.py fixtures for realistic datasets:
def test_my_processor(test_data_2022_monthly_45km):
"""Test processor with sample xarray dataset."""
processor = MyProcessor(value=100)
result = processor.execute(test_data_2022_monthly_45km, {})
assert result is not None
Common Pitfalls
| Pitfall | Problem | Solution |
|---|---|---|
| In-place mutation | Breaks lazy evaluation | Return new objects: return data.where(...) not data.values[:] = ... |
| Eager loading | OOM errors on large datasets | Never call .load() or .compute() in processors |
| Shared mutable state | Race conditions in threads | Pass parameters, don't store on singleton |
| Query reset after .get() | Lost context in loops | Create new ClimateData() per query |
| Wrong patch location | Mocks don't intercept calls | Patch import location: "module.ClassName" |
| Forgetting registration | Components not discovered | Use @register_* decorators at class definition |
| Context not updated | No metadata about processing | Update context[_NEW_ATTRS_KEY] in processors |
| Priority conflicts | Wrong execution order | Pick a priority that places the processor in the correct phase (see Processors index). Lower numbers run first. |
Resources
- Registry Pattern: climakitae/new_core/processors/abc_data_processor.py
- Processor Template: climakitae/new_core/processors/template.py
- ClimateData: climakitae/new_core/user_interface.py
- Tests: tests/new_core/