Source code for pipelinex.mlflow_on_kedro.hooks.mlflow.mlflow_basic_logger

import time
from datetime import datetime, timedelta
from importlib.util import find_spec
from logging import getLogger
from typing import Any, Dict, List, Optional, Union  # NOQA

from kedro.io import DataCatalog
from kedro.pipeline import Pipeline

from .mlflow_utils import (
    hook_impl,
    mlflow_end_run,
    mlflow_log_metrics,
    mlflow_log_params,
    mlflow_start_run,
)

log = getLogger(__name__)


[docs]def get_timestamp(dt=None, offset_hours=0, fmt="%Y-%m-%dT%H:%M:%S"): dt = dt or datetime.now() return (dt + timedelta(hours=offset_hours)).strftime(fmt)
[docs]def get_timestamp_int(dt=None, offset_hours=0): return int(get_timestamp(dt=dt, offset_hours=offset_hours, fmt="%Y%m%d%H%M%S"))
[docs]def get_timestamps(dt=None, offset_hours=0): dt = dt or datetime.now() timestamp = get_timestamp(dt, offset_hours=offset_hours) timestamp_int = get_timestamp_int(dt, offset_hours=offset_hours) return timestamp, timestamp_int
[docs]class MLflowBasicLoggerHook: """Configures and logs duration time for the pipeline to MLflow"""
[docs] def __init__( self, uri: str = None, experiment_name: str = None, artifact_location: str = None, run_name: str = None, run_id: str = None, nested: bool = False, tags: Optional[Dict[str, Any]] = None, offset_hours: float = 0, enable_logging_time_begin: bool = True, enable_logging_time_end: bool = True, enable_logging_time: bool = True, logging_kedro_run_params: Union[List[str], str] = [], enable_mlflow: bool = True, ): """ Args: uri: The MLflow tracking server URI. `uri` arg fed to: https://www.mlflow.org/docs/latest/python_api/mlflow.html#mlflow.set_tracking_uri experiment_name: The experiment name. `name` arg fed to: https://www.mlflow.org/docs/latest/python_api/mlflow.html#mlflow.create_experiment artifact_location: `artifact_location` arg fed to: https://www.mlflow.org/docs/latest/python_api/mlflow.html#mlflow.create_experiment run_name: Shown as 'Run Name' in MLflow UI. run_id: An existing MLflow experiment run UUID instead of letting MLflow create a new run under the experiment_name. `run_id` arg fed to: https://www.mlflow.org/docs/latest/python_api/mlflow.html#mlflow.start_run nested: `nested` arg fed to: https://www.mlflow.org/docs/latest/python_api/mlflow.html#mlflow.start_run tags: `tags` arg fed to: https://www.mlflow.org/docs/latest/python_api/mlflow.html#mlflow.start_run offset_hours: The offset hour (e.g. 0 for UTC+00:00) to log in MLflow. 0 in default. enable_logging_time_begin: Enable logging the time the Kedro pipeline began. True in default. enable_logging_time_end: Enable logging the time the Kedro pipeline ended. True in default. enable_logging_time: Enable logging the time duration the Kedro pipeline ran. True in default. logging_kedro_run_params: List of Kedro Run Params to log to MLflow or "__ALL__" to log all. [] (Empty) in default. enable_mlflow: Enable configuring and logging to MLflow. """ self.enable_mlflow = find_spec("mlflow") and enable_mlflow self.uri = uri self.experiment_name = experiment_name self.artifact_location = artifact_location self.run_name = run_name self.offset_hours = offset_hours self.enable_logging_time_begin = enable_logging_time_begin self.enable_logging_time_end = enable_logging_time_end self.enable_logging_time = enable_logging_time self.logging_kedro_run_params = logging_kedro_run_params
[docs] @hook_impl def after_catalog_created(self): mlflow_start_run( uri=self.uri, experiment_name=self.experiment_name, artifact_location=self.artifact_location, run_name=self.run_name, enable_mlflow=self.enable_mlflow, )
[docs] @hook_impl def before_pipeline_run( self, run_params: Dict[str, Any], pipeline: Pipeline, catalog: DataCatalog ): if self.logging_kedro_run_params: run_params_renamed = { ("___" + k): v for (k, v) in run_params.items() if ( isinstance(self.logging_kedro_run_params, str) and self.logging_kedro_run_params == "__ALL__" ) or k in self.logging_kedro_run_params } mlflow_log_params(run_params_renamed, enable_mlflow=self.enable_mlflow) if self.enable_logging_time_begin: timestamp, timestamp_int = get_timestamps(offset_hours=self.offset_hours) time_dict = {"__time_begin": timestamp} time_int_dict = {"__time_begin": timestamp_int} mlflow_log_params(time_dict, enable_mlflow=self.enable_mlflow) mlflow_log_metrics(time_int_dict, enable_mlflow=self.enable_mlflow) if self.enable_logging_time: self._time_begin = time.time()
[docs] @hook_impl def after_pipeline_run( self, run_params: Dict[str, Any], pipeline: Pipeline, catalog: DataCatalog ): if self.enable_logging_time_end: timestamp, timestamp_int = get_timestamps(offset_hours=self.offset_hours) time_dict = {"__time_end": timestamp} time_int_dict = {"__time_end": timestamp_int} mlflow_log_params( time_dict, enable_mlflow=self.enable_mlflow, ) mlflow_log_metrics( time_int_dict, enable_mlflow=self.enable_mlflow, ) if self.enable_logging_time: self._time_end = time.time() self._time = self._time_end - self._time_begin time_dict = {"__time": self._time} mlflow_log_metrics(time_dict, enable_mlflow=self.enable_mlflow) mlflow_end_run(self.enable_mlflow)