Dataset Pipeline

Data processing pipeline execution.

A pipeline-based data processing class for climate data workflows.

The Dataset class serves as a central orchestrator that coordinates data access, parameter validation, and sequential processing steps. It implements a fluent interface pattern allowing method chaining for building complex data workflows.

Attributes:

data_access : DataCatalog or UNSET
    The data catalog instance used for retrieving raw data from various sources.
parameter_validator : ParameterValidator or UNSET
    The parameter validator instance used for validating query parameters.
processing_pipeline : list of DataProcessor or UNSET
    A list of processing steps to be executed sequentially on the data.

Methods:

execute
    Execute the complete data processing pipeline and return the result.
with_param_validator
    Set the parameter validator for the dataset (method chaining).
with_catalog
    Set the data catalog for the dataset (method chaining).
with_processing_step
    Add a processing step to the pipeline (method chaining).

Raises:

TypeError
    If provided components don't match expected types.
AttributeError
    If provided components lack required methods.
ValueError
    If required components are missing during execution.
RuntimeError
    If the processing pipeline encounters execution errors.

Notes
  • Processing steps are executed in the order they are added to the pipeline
  • The context dictionary is passed through all processing steps and may be modified
  • Steps that require data access can set needs_catalog = True to receive the data accessor
  • Validation failures return None rather than raising exceptions
  • All components (validator, catalog, processors) must implement their respective interfaces
See Also

DataCatalog : Interface for data access components
ParameterValidator : Interface for parameter validation components
DataProcessor : Interface for data processing components
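
The fluent interface can be sketched with a minimal stand-in class (illustrative only; `MiniDataset` and the string placeholders are not the real climakitae classes):

```python
class MiniDataset:
    """Minimal sketch of the fluent-interface pattern used by Dataset."""

    def __init__(self):
        self.catalog = None
        self.validator = None
        self.pipeline = []

    def with_catalog(self, catalog):
        self.catalog = catalog
        return self  # returning self is what enables method chaining

    def with_param_validator(self, validator):
        self.validator = validator
        return self

    def with_processing_step(self, step):
        self.pipeline.append(step)
        return self


ds = (
    MiniDataset()
    .with_catalog("catalog")
    .with_param_validator("validator")
    .with_processing_step("step_1")
    .with_processing_step("step_2")
)
print(ds.pipeline)  # steps are retained in insertion order
```

Each `with_*` method mutates the instance and returns it, so an arbitrarily long chain still refers to the same object.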

Initialize the Dataset class.

Attributes:

data_access : DataCatalog or UNSET
    The data catalog instance used for retrieving raw data from various sources.
parameter_validator : ParameterValidator or UNSET
    The parameter validator instance used for validating query parameters.
processing_pipeline : list of DataProcessor or UNSET
    A list of processing steps to be executed sequentially on the data.

Source code in climakitae/new_core/dataset.py
def __init__(self):
    """Initialize the Dataset class.

    Attributes
    ----------
    data_access : DataCatalog or UNSET
        The data catalog instance used for retrieving raw data from various sources.
    parameter_validator : ParameterValidator or UNSET
        The parameter validator instance used for validating query parameters.
    processing_pipeline : list of DataProcessor or UNSET
        A list of processing steps to be executed sequentially on the data.

    """
    self.data_access = UNSET
    self.parameter_validator = UNSET
    self.processing_pipeline = UNSET  # list of processing steps
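
The `UNSET` sentinel lets the class distinguish "never configured" from a legitimate `None` value. A minimal sketch of that pattern (hypothetical definition; climakitae provides its own `UNSET`):

```python
class _Unset:
    """Sentinel type: a unique object compared with ``is``, never equal to None."""

    def __repr__(self):
        return "UNSET"


UNSET = _Unset()

validator = UNSET
# Identity check, mirroring ``if self.parameter_validator is not UNSET`` in execute()
print(validator is UNSET)  # True until a with_* method replaces it
```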

execute(parameters=UNSET)

Execute the dataset processing pipeline.

Parameters:

parameters : Dict[str, Any], optional
    Parameters to pass to the processing pipeline. Defaults to UNSET.

Returns:

xr.Dataset
    Result of the processing pipeline, or None if parameter validation fails.

Source code in climakitae/new_core/dataset.py
def execute(self, parameters: Dict[str, Any] = UNSET) -> xr.Dataset:
    """Execute the dataset processing pipeline.

    Parameters
    ----------
    parameters : Dict[str, Any], optional
        Parameters to pass to the processing pipeline

    Returns
    -------
    xr.Dataset
        Result of the processing pipeline

    """
    logger.info("Executing dataset processing pipeline")
    logger.debug("Pipeline parameters: %s", parameters)

    # Initialize context with parameters
    context = parameters.copy() if parameters is not UNSET else {}

    # Validate parameters if validator is set
    valid_query = UNSET
    if self.parameter_validator is not UNSET:
        logger.debug("Validating parameters")
        if not isinstance(self.parameter_validator, ParameterValidator):
            logger.error("Invalid parameter validator type")
            raise TypeError(
                "Parameter validator must be an instance of ParameterValidator."
            )
        valid_query = self.parameter_validator.is_valid_query(context)
        if valid_query is None:
            logger.error("Parameter validation failed")
            return None  # return None if validation fails
        logger.info("Parameter validation successful")

    # Check if data access is properly configured
    if self.data_access is UNSET:
        logger.error("Data accessor not configured")
        raise ValueError("Data accessor is not configured.")

    # Extract catalog_key from context for thread-safe data access
    catalog_key = context.get("_catalog_key")
    if catalog_key is None:
        logger.error("No catalog_key found in context")
        raise ValueError(
            "catalog_key must be provided in the query context. "
            "This is typically set by DatasetFactory.create_dataset()."
        )

    # Initialize the processing result - will be updated through pipeline steps
    logger.debug(
        "Retrieving data from data accessor with catalog_key=%s", catalog_key
    )
    current_result = self.data_access.get_data(valid_query, catalog_key=catalog_key)
    logger.info("Data retrieved successfully")

    # Check if we have a processing pipeline
    if self.processing_pipeline is UNSET or not self.processing_pipeline:
        logger.info("No processing pipeline defined, returning raw data")
        # If no pipeline is defined, just return the raw data from data_access
        return current_result

    # Execute each step in the pipeline in sequence
    logger.info("Executing %d processing steps", len(self.processing_pipeline))

    try:
        for i, step in enumerate(self.processing_pipeline, 1):
            step_name = getattr(step, "name", type(step).__name__)
            logger.debug(
                "Executing processing step %d/%d: %s",
                i,
                len(self.processing_pipeline),
                step_name,
            )

            # Some steps might need access to the data_access component
            # steps that need it should define and set `needs_catalog = True`
            # in their __init__ method
            if getattr(step, "needs_catalog", False):
                logger.debug("Step requires data catalog access")
                step.set_data_accessor(self.data_access)

            # Execute the current step
            # context is updated in place by the step
            logger.debug("BEFORE:\n%s", current_result)
            current_result = step.execute(current_result, context)
            logger.debug("AFTER:\n%s", current_result)

            if current_result is None:
                logger.warning(
                    "Processing step %s returned None",
                    step_name,
                )
            logger.debug("Processing step %d completed successfully", i)

        logger.info("All processing steps completed successfully")
        return current_result

    except Exception as e:
        # Get detailed traceback information
        tb_info = traceback.format_exc()
        # Log the traceback for debugging
        logger.error(
            "Exception during pipeline execution: %s", str(e), exc_info=True
        )
        logger.debug("Exception traceback:\n%s", tb_info)
        raise RuntimeError(f"Error in processing pipeline: {str(e)}") from e
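
The pipeline loop above can be sketched in isolation. `ScaleStep` and `run_pipeline` below are illustrative stand-ins (not the real `DataProcessor` interface): a step that sets `needs_catalog = True` receives the accessor before running, and the result and shared context are threaded through each step in order.

```python
class ScaleStep:
    """Hypothetical step: multiplies each value by a factor."""

    needs_catalog = True  # signals that set_data_accessor() should be called

    def __init__(self, factor):
        self.factor = factor
        self.accessor = None

    def set_data_accessor(self, accessor):
        self.accessor = accessor

    def execute(self, data, context):
        context["scaled_by"] = self.factor  # context is mutated in place
        return [x * self.factor for x in data]


def run_pipeline(steps, data, context, accessor):
    # Mirrors the loop in execute(): inject the accessor when requested,
    # then pass the running result and context through each step in order.
    for step in steps:
        if getattr(step, "needs_catalog", False):
            step.set_data_accessor(accessor)
        data = step.execute(data, context)
    return data


ctx = {}
result = run_pipeline([ScaleStep(2), ScaleStep(3)], [1, 2], ctx, accessor="catalog")
print(result)  # each step's output feeds the next step's input
```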

with_param_validator(parameter_validator)

Set a new parameter validator.

Parameters:

parameter_validator : ParameterValidator
    Parameter validator to set for the dataset. Required.

Returns:

Dataset
    The current instance of Dataset, allowing method chaining.

Raises:

TypeError
    If the parameter validator is not an instance of ParameterValidator.

Source code in climakitae/new_core/dataset.py
def with_param_validator(
    self, parameter_validator: ParameterValidator
) -> "Dataset":
    """Set a new parameter validator.

    Parameters
    ----------
    parameter_validator : ParameterValidator
        Parameter validator to set for the dataset.

    Returns
    -------
    Dataset
        The current instance of Dataset allowing method chaining.

    Raises
    ------
    TypeError
        If the parameter validator is not an instance of ParameterValidator.

    """
    if not parameter_validator:
        logger.warning(
            "No parameter validator provided. This may lead to unvalidated queries.",
        )
    if not isinstance(parameter_validator, ParameterValidator):
        raise TypeError(
            "Parameter validator must be an instance of ParameterValidator."
        )
    self.parameter_validator = parameter_validator
    return self

with_catalog(catalog)

Set a new data catalog.

Parameters:

catalog : DataCatalog
    Data catalog to set for the dataset. Required.

Returns:

Dataset
    The current instance of Dataset, allowing method chaining.

Raises:

TypeError
    If the catalog is not an instance of DataCatalog, or if its 'get_data' method is not callable.
AttributeError
    If the catalog does not have a 'get_data' method.

Source code in climakitae/new_core/dataset.py
def with_catalog(self, catalog: DataCatalog) -> "Dataset":
    """Set a new data catalog.

    Parameters
    ----------
    catalog : DataCatalog
        Data catalog to set for the dataset.

    Returns
    -------
    Dataset
        The current instance of Dataset allowing method chaining.

    Raises
    ------
    TypeError
        If the catalog is not an instance of DataCatalog.
    AttributeError
        If the catalog does not have a 'get_data' method.
    TypeError
        If the 'get_data' method is not callable.

    """
    if not isinstance(catalog, DataCatalog):
        raise TypeError("Data catalog must be an instance of DataCatalog.")
    if not hasattr(catalog, "get_data"):
        raise AttributeError(
            "Data catalog must have a 'get_data' method to retrieve data."
        )
    if not callable(getattr(catalog, "get_data")):
        raise TypeError("'get_data' method in data catalog must be callable.")
    self.data_access = catalog
    return self

with_processing_step(step)

Add a new processing step to the pipeline.

Parameters:

step : DataProcessor
    Processing step to add to the pipeline. Must have 'execute', 'update_context', and 'set_data_accessor' methods.

Returns:

Dataset
    The current instance of Dataset, allowing method chaining.

Raises:

TypeError
    If the step is not an instance of DataProcessor.
AttributeError
    If the step lacks a callable 'execute', 'update_context', or 'set_data_accessor' method.

Source code in climakitae/new_core/dataset.py
def with_processing_step(self, step: DataProcessor) -> "Dataset":
    """Add a new processing step to the pipeline.

    Parameters
    ----------
    step : DataProcessor
        Processing step to add to the pipeline. Must have 'execute' and 'update_context' methods.

    Returns
    -------
    Dataset
        The current instance of Dataset allowing method chaining.

    Raises
    ------
    TypeError
        If the step is not an instance of DataProcessor.
    AttributeError
        If the step does not have 'execute', 'update_context', or 'set_data_accessor' methods.

    """
    if not isinstance(step, DataProcessor):
        raise TypeError("Processing step must be an instance of DataProcessor.")

    if not hasattr(step, "execute") or not callable(getattr(step, "execute")):
        raise AttributeError("Processing step must have an 'execute' method.")

    if not hasattr(step, "update_context") or not callable(
        getattr(step, "update_context")
    ):
        raise AttributeError(
            "Processing step must have an 'update_context' method."
        )

    if not hasattr(step, "set_data_accessor") or not callable(
        getattr(step, "set_data_accessor")
    ):
        raise AttributeError(
            "Processing step must have a 'set_data_accessor' method."
        )

    if self.processing_pipeline is UNSET:
        self.processing_pipeline = []

    self.processing_pipeline.append(step)
    return self
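
The duck-typing contract that with_processing_step enforces (beyond the isinstance test) can be sketched as a standalone check. `check_step_contract`, `GoodStep`, and `BadStep` are hypothetical names for illustration:

```python
REQUIRED_METHODS = ("execute", "update_context", "set_data_accessor")


def check_step_contract(step):
    """Return the names of required methods the step is missing or that
    are not callable (a sketch of the hasattr/callable checks above)."""
    return [
        name
        for name in REQUIRED_METHODS
        if not callable(getattr(step, name, None))
    ]


class GoodStep:
    def execute(self, data, context):
        return data

    def update_context(self, context):
        return context

    def set_data_accessor(self, accessor):
        pass


class BadStep:
    def execute(self, data, context):
        return data


print(check_step_contract(GoodStep()))  # []
print(check_step_contract(BadStep()))   # the two missing method names
```

In the real method, a non-empty result corresponds to an AttributeError being raised before the step is appended to the pipeline.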