Batch Processing: Multiple Points or Scenarios
Efficiently process many geographic locations or climate scenarios.
Process Multiple Points
import pandas as pd

from climakitae.new_core.user_interface import ClimateData
from climakitae.tools import batch_select

cd = ClimateData()

# Define locations of interest
locations = pd.DataFrame({
    "name": ["LA", "SF", "SD", "Sacramento"],
    "latitude": [34.05, 37.77, 32.72, 38.58],
    "longitude": [-118.25, -122.42, -117.16, -121.49],
})

# Batch process: one query per location
results = batch_select(
    cd,
    locations,
    variable="t2max",
    activity_id="WRF",
    institution_id="UCLA",  # Specify WRF producer
    table_id="mon",
)

# Results: dict[location_name] = xr.Dataset
for name, data in results.items():
    print(f"{name}: {float(data['t2max'].mean().compute()):.1f} K")
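The per-location datasets are easiest to compare once collapsed into a single table. A minimal sketch, continuing from the block above (the results dict and pandas import are reused):

point_means = pd.Series(
    {name: float(data["t2max"].mean().compute()) for name, data in results.items()},
    name="mean_t2max_K",
)
print(point_means.sort_values(ascending=False))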
Process Multiple Scenarios
# Compare warming levels across emissions scenarios
warming_levels = [1.5, 2.0, 3.0]
scenarios = ["ssp245", "ssp370", "ssp585"]
results = {}
for scenario in scenarios:
    scenario_data = {}
    for gwl in warming_levels:
        data = (cd
            .activity_id("WRF")
            .institution_id("UCLA")
            .experiment_id(scenario)
            .variable("tasmax")
            .processes({
                "warming_level": {"warming_levels": [gwl]},
                "clip": "Los Angeles"
            })
            .get())
        scenario_data[gwl] = data["tasmax"].mean(dim=["lat", "lon"]).compute()
    results[scenario] = scenario_data
# Analyze: does the regional response at a fixed warming level depend on the emissions pathway?
# e.g. compare results["ssp585"][2.0] vs results["ssp245"][2.0]
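A quick way to answer that question is to tabulate the scenario means at each warming level. A minimal sketch, assuming the nested results dict built by the loop above:

import pandas as pd

summary = pd.DataFrame(
    {scen: {gwl: float(vals[gwl].mean()) for gwl in warming_levels}
     for scen, vals in results.items()}
)
print(summary)  # rows: warming levels, columns: scenarios
print(summary["ssp585"] - summary["ssp245"])  # pathway effect at fixed warming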
Large Batch with Progress Tracking
from tqdm import tqdm
counties = ["Los Angeles", "San Francisco", "Sacramento", "Fresno", "San Diego"]
results = {}
for county in tqdm(counties, desc="Processing counties"):
    data = (cd
        .variable("tasmax")
        .processes({
            "time_slice": ("2030-01-01", "2060-12-31"),
            "clip": county,
            "warming_level": {"warming_levels": [2.0]}
        })
        .get())
    results[county] = data["tasmax"].mean(dim=["lat", "lon"]).compute()
print("✅ Batch processing complete")
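For long-running batches it is also worth guarding each query so that one failing county does not abort the whole run. A minimal sketch using plain Python exception handling (reusing counties and tqdm from above; no climakitae-specific error types are assumed):

failures = {}
results = {}
for county in tqdm(counties, desc="Processing counties"):
    try:
        data = (cd
            .variable("tasmax")
            .processes({"clip": county})
            .get())
        results[county] = data["tasmax"].mean(dim=["lat", "lon"]).compute()
    except Exception as err:  # Broad on purpose: keep the batch alive
        failures[county] = err

if failures:
    print(f"⚠️ {len(failures)} counties failed: {sorted(failures)}")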
Parallel Batch Processing
from multiprocessing import Pool

from climakitae.new_core.user_interface import ClimateData

def query_county(county_name):
    """Query mean maximum temperature for one county."""
    cd = ClimateData(verbosity=-1)  # Quiet mode; each worker builds its own instance
    data = (cd
        .variable("tasmax")
        .processes({"clip": county_name})
        .get())
    return county_name, data["tasmax"].mean().compute()

counties = ["Los Angeles", "San Francisco", "Sacramento", "Fresno"]

if __name__ == "__main__":  # Guard required so spawned workers can re-import this module
    with Pool(processes=4) as pool:
        results = dict(pool.map(query_county, counties))
    # Results computed in parallel on 4 cores
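Because each worker spends most of its time waiting on S3 I/O rather than on the CPU, a thread pool is often a lighter-weight alternative: it shares one interpreter and needs no __main__ guard. A minimal sketch with the standard library, reusing the query_county helper defined above:

from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor(max_workers=4) as pool:
    results = dict(pool.map(query_county, counties))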
Distributed Computation with Coiled (AWS)
climakitae returns lazy, Dask-backed xarray objects, so any registered Dask scheduler, including a Coiled cluster, takes over automatically when .compute() is called. No climakitae-specific integration is needed.
import coiled

from climakitae.new_core.user_interface import ClimateData

# Spin up a Coiled cluster in us-west-2 (same region as the Cal-Adapt S3 data)
cluster = coiled.Cluster(
    n_workers=10,
    region="us-west-2",
    name="climakitae-batch",
)
client = cluster.get_client()  # Registers the cluster as the default Dask scheduler

# From here, all .compute() calls run on the cluster
cd = ClimateData(verbosity=-1)

results = {}
for scenario in ["historical", "ssp245", "ssp370", "ssp585"]:
    data = (cd
        .catalog("cadcat")
        .activity_id("LOCA2")
        .experiment_id(scenario)
        .variable("tasmax")
        .table_id("mon")
        .grid_label("d03")
        .processes({
            "time_slice": ("2020-01-01", "2060-12-31"),
            "clip": "Los Angeles",
        })
        .get())
    # Computation is dispatched to the Coiled workers
    results[scenario] = data["tasmax"].mean(dim=["lat", "lon"]).compute()

cluster.shutdown()
Region matters
Place your Coiled cluster in us-west-2. Cal-Adapt data lives in S3 us-west-2
buckets, so workers co-located in the same region avoid egress costs and
significantly reduce transfer latency.
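To confirm that work is actually landing on the cluster rather than running locally, the Dask dashboard is the quickest check. Both calls below are standard dask.distributed client methods, applied to the client created above:

print(client.dashboard_link)  # Open in a browser to watch tasks stream in
print(len(client.scheduler_info()["workers"]))  # Should report 10 workers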
Best Practice: Cache Intermediate Results
from pathlib import Path

import xarray as xr

# Store results on disk to avoid re-querying
cache_dir = Path("climate_data_cache")
cache_dir.mkdir(exist_ok=True)

counties = ["Los Angeles", "San Francisco"]
for county in counties:
    slug = county.lower().replace(" ", "_")  # e.g. "los_angeles"
    cache_file = cache_dir / f"{slug}_2030_2060.nc"
    if cache_file.exists():
        # Load from cache
        data = xr.open_dataset(cache_file)
    else:
        # Query and cache
        data = (cd
            .activity_id("WRF")
            .institution_id("UCLA")
            .variable("tasmax")
            .processes({
                "time_slice": ("2030-01-01", "2060-12-31"),
                "clip": county
            })
            .get())
        data.to_netcdf(cache_file)
    print(f"{county}: mean={float(data['tasmax'].mean()):.1f} K")
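Once the cache exists, later analysis can skip climakitae entirely and work straight from the NetCDF files. A minimal sketch that reloads every cached county lazily (it assumes the cache_dir layout written above; chunks={} tells xarray to open each file as a Dask array using the on-disk chunking):

cached = sorted(cache_dir.glob("*.nc"))
datasets = {path.stem: xr.open_dataset(path, chunks={}) for path in cached}
for name, ds in datasets.items():
    print(f"{name}: {float(ds['tasmax'].mean()):.1f} K")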