Source code for pipelinex.framework.context.flexible_catalog_context

import copy
import logging
from pathlib import Path
from typing import Any, Dict, Optional  # NOQA

from .context import KedroContext
from kedro.io import DataCatalog
from pipelinex import HatchDict

log = logging.getLogger(__name__)


class FlexibleCatalogContext(KedroContext):
    """Convert PipelineX's syntactic sugar to a pure Kedro Catalog."""

    def _create_catalog(  # pylint: disable=no-self-use,too-many-arguments
        self,
        conf_catalog: Dict[str, Any],
        conf_creds: Dict[str, Any],
        save_version: Optional[str] = None,
        journal: Any = None,
        load_versions: Optional[Dict[str, str]] = None,
    ) -> DataCatalog:
        """Instantiate the DataCatalog.

        Datasets with a truthy ``cached`` key are wrapped in ``CachedDataSet``.

        Returns:
            DataCatalog defined in `catalog.yml`.
        """
        conf_catalog = self._format_kedro_catalog(conf_catalog)
        return DataCatalog.from_config(
            conf_catalog,
            conf_creds,
            save_version=save_version,
            load_versions=load_versions,
        )

    def _format_kedro_catalog(self, conf_catalog):
        # Resolve PipelineX's HatchDict syntactic sugar first.
        conf_catalog = HatchDict(conf_catalog).get()

        # An entry named "/" holds default options merged into every dataset.
        default_dict = {}
        if "/" in conf_catalog:
            default_dict = conf_catalog.pop("/")

        # An optional "PIPELINE_JSON_TEXT" entry is converted into a dataset
        # object (TextDataSet by default) and kept on the context for later use.
        if "PIPELINE_JSON_TEXT" in conf_catalog:
            pipeline_json_text_dataset = conf_catalog.pop("PIPELINE_JSON_TEXT")
            assert isinstance(pipeline_json_text_dataset, dict)
            pipeline_json_text_dataset.setdefault(
                "type", "kedro.extras.datasets.text.TextDataSet"
            )
            self._pipeline_json_text_dataset = HatchDict(
                pipeline_json_text_dataset, obj_key="type"
            ).get()

        conf_catalog_processed = {}
        for ds_name, ds_dict_ in conf_catalog.items():
            ds_dict = copy.deepcopy(default_dict)
            if isinstance(ds_dict_, dict):
                ds_dict.update(ds_dict_)
            _check_type(ds_dict)
            ds_name, ds_dict = self._format_kedro_dataset(ds_name, ds_dict)
            conf_catalog_processed[ds_name] = ds_dict

        return conf_catalog_processed

    def _format_kedro_dataset(self, ds_name, ds_dict):
        ds_name, ds_dict = self._set_filepath(ds_name, ds_dict)
        ds_name, ds_dict = self._enable_caching(ds_name, ds_dict)
        return ds_name, ds_dict

    def _set_filepath(self, ds_name, ds_dict):
        # If no location key is given, use the catalog key itself as the
        # filepath and its stem as the dataset name.
        if not any(
            (key in ds_dict)
            for key in ["filepath", "path", "url", "urls", "table_name", "sql"]
        ):
            ds_dict["filepath"] = ds_name
            ds_name = Path(ds_name).stem
        return ds_name, ds_dict

    def _enable_caching(self, ds_name, ds_dict):
        # A truthy "cached" key wraps the dataset in kedro.io.CachedDataSet.
        cached = ds_dict.pop("cached", False)
        if cached and (ds_dict.get("type") != "kedro.io.CachedDataSet"):
            ds_dict = {
                "type": "kedro.io.CachedDataSet",
                "dataset": ds_dict,
            }
        return ds_name, ds_dict
def _check_type(ds_dict):
    # Warn about dataset types that do not follow Kedro's "*DataSet" naming.
    ds_type = ds_dict.get("type")
    if ds_type and (not ds_type.endswith("DataSet")):
        log.warning("type: '%s' does not end with 'DataSet'.", ds_type)
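

# --- Usage sketch (illustrative, not part of the module above) ---------------
# A minimal example of the catalog syntactic sugar this context expands, next to
# the roughly equivalent pure-Kedro catalog produced by `_format_kedro_catalog`.
# The dataset entry, file path, and "pandas.CSVDataSet" type below are
# hypothetical values chosen for illustration.

conf_catalog_sugar = {
    # Defaults under "/" are merged into every other entry.
    "/": {"type": "pandas.CSVDataSet"},
    # No "filepath" key: the catalog key itself becomes the filepath and its
    # stem ("iris") becomes the dataset name; "cached" wraps it in CachedDataSet.
    "data/01_raw/iris.csv": {"cached": True},
}

# Expected expansion (approximately):
conf_catalog_expanded = {
    "iris": {
        "type": "kedro.io.CachedDataSet",
        "dataset": {
            "type": "pandas.CSVDataSet",
            "filepath": "data/01_raw/iris.csv",
        },
    },
}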