## Introduction to Kedro
### Why the unified data interface framework is needed
Machine Learning projects involve loading and saving various kinds of data in various ways, such as:
- files in a local/network file system, Hadoop Distributed File System (HDFS), Amazon S3, or Google Cloud Storage
  - e.g. CSV, JSON, YAML, pickle, images, models, etc.
- databases
  - PostgreSQL, MySQL, etc.
- Spark
- REST APIs (HTTP(S) requests)
During the experimentation/prototyping phase, many Machine Learning Engineers mix data loading/saving code and data transformation code in the same Python module or Jupyter notebook, and suffer later on because:
- During experimentation/prototyping, we often want to save the intermediate data after each transformation.
- In production environments, we often want to skip saving data to minimize latency and storage space.
- To benchmark performance or troubleshoot, we often want to switch the data source.
  - e.g. read image files from local storage, or download images through a REST API
The proposed solution is the unified data interface.
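A minimal sketch of the idea, assuming a hypothetical interface where every data set exposes `load()` and `save()` (the classes below are illustrative, not Kedro's API). The transformation only talks to data set objects, so saving an intermediate result to a file while prototyping, or keeping it in memory in production, depends only on which object you pass in.

```python
import csv
import tempfile
from pathlib import Path


class MemoryDataSet:
    """Hypothetical in-memory data set: nothing is written to disk."""

    def __init__(self, data=None):
        self._data = data

    def load(self):
        return self._data

    def save(self, data):
        self._data = data


class CSVFileDataSet:
    """Hypothetical CSV-backed data set using only the standard library."""

    def __init__(self, filepath):
        self._filepath = Path(filepath)

    def load(self):
        with self._filepath.open(newline="") as f:
            return list(csv.DictReader(f))

    def save(self, rows):
        self._filepath.parent.mkdir(parents=True, exist_ok=True)
        with self._filepath.open("w", newline="") as f:
            writer = csv.DictWriter(f, fieldnames=list(rows[0]))
            writer.writeheader()
            writer.writerows(rows)


def add_fare_band(rows):
    # The transformation knows nothing about where the data lives.
    return [{**r, "FareBand": "high" if float(r["Fare"]) > 30 else "low"} for r in rows]


def run(source, intermediate):
    result = add_fare_band(source.load())
    intermediate.save(result)  # a file while prototyping, memory in production
    return intermediate.load()


rows = [{"Fare": "7.25"}, {"Fare": "71.28"}]
tmp = Path(tempfile.mkdtemp())
# Prototyping: keep the intermediate result on disk for inspection.
on_disk = run(MemoryDataSet(rows), CSVFileDataSet(tmp / "fare_band.csv"))
# Production: skip writing the intermediate result.
in_memory = run(MemoryDataSet(rows), MemoryDataSet())
```

Both runs produce the same rows; only the object passed as `intermediate` changed.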
Here is a simple demo example to predict survival on the [Titanic](https://www.kaggle.com/c/titanic/data).
(Figure: the demo pipeline visualized by Kedro-viz)
Common code to define the tasks/operations/transformations:
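```python
# Define tasks
def train_model(model, df, cols_features, col_target):
    # Train a model here, e.g. fit a scikit-learn estimator on the feature columns
    model.fit(df[cols_features], df[col_target])
    return model


def run_inference(model, df, cols_features):
    # Run inference here, e.g. add the predictions as a new column
    # (the column name "pred" is illustrative)
    df = df.copy()
    df["pred"] = model.predict(df[cols_features])
    return df
```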
It is notable that you do _not_ need to add any Kedro-related code here to use Kedro later on.
Furthermore, you do _not_ need to add any MLflow-related code here to use MLflow later on, as Kedro hooks provided by PipelineX can handle it behind the scenes.
This enables you to keep your pipelines for experimentation, prototyping, and benchmarking production-ready.
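The reason the task functions can stay free of framework code is that the runner, not the task, calls any extra logic such as timing or experiment tracking. A minimal sketch of that idea follows. `TimingHook` and `run_task` are hypothetical, and although the method names echo Kedro's `before_node_run`/`after_node_run` hook specifications, the signatures here are simplified and this is not how Kedro or PipelineX implement hooks.

```python
import time


def train_model(model, rows, cols_features, col_target):
    # Stand-in task: plain function with no logging or tracking code inside.
    return model


class TimingHook:
    """Hypothetical hook that records how long each task takes."""

    def __init__(self):
        self.records = {}
        self._start = {}

    def before_node_run(self, name):
        self._start[name] = time.perf_counter()

    def after_node_run(self, name, output):
        self.records[name] = time.perf_counter() - self._start.pop(name)


def run_task(name, func, args, hooks=()):
    # The runner calls the hooks around the task, so the task itself stays unchanged.
    for hook in hooks:
        hook.before_node_run(name)
    output = func(*args)
    for hook in hooks:
        hook.after_node_run(name, output)
    return output


hook = TimingHook()
model = run_task(
    "train_model",
    train_model,
    ("dummy-model", [], ["Pclass"], "Survived"),
    hooks=[hook],
)
```

Swapping `TimingHook` for, say, a tracking hook would change what gets recorded without touching `train_model`.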
1. Plain code:
```python
# Configure: can be written in a config file (YAML, JSON, etc.)
train_data_filepath = "data/input/train.csv"
train_data_load_args = {"float_precision": "high"}
test_data_filepath = "data/input/test.csv"
test_data_load_args = {"float_precision": "high"}
pred_data_filepath = "data/load/pred.csv"
pred_data_save_args = {"index": False, "float_format": "%.16e"}
model_kind = "LogisticRegression"
model_params_dict = {
    "C": 1.23456,
    "max_iter": 987,
    "random_state": 42,
}
cols_features = [
    "Pclass",  # The passenger's ticket class
    "Parch",  # Number of parents / children aboard the Titanic
]
col_target = "Survived"  # Column used as the target: whether the passenger survived or not

# Run tasks (train_model and run_inference are defined above)
import pandas as pd

if model_kind == "LogisticRegression":
    from sklearn.linear_model import LogisticRegression

    model = LogisticRegression(**model_params_dict)
train_df = pd.read_csv(train_data_filepath, **train_data_load_args)
model = train_model(model, train_df, cols_features, col_target)
test_df = pd.read_csv(test_data_filepath, **test_data_load_args)
pred_df = run_inference(model, test_df, cols_features)
pred_df.to_csv(pred_data_filepath, **pred_data_save_args)
```
2. Following the data interface framework (objects with `_load` and `_save` methods) proposed by [Kedro](https://github.com/quantumblacklabs/kedro) and supported by PipelineX:
```python
# Define a data interface: better ones such as "CSVDataSet" are provided by Kedro
import pandas as pd
from pathlib import Path


class CSVDataSet:
    def __init__(self, filepath, load_args=None, save_args=None):
        self._filepath = filepath
        self._load_args = {}
        self._load_args.update(load_args or {})
        self._save_args = {"index": False}
        self._save_args.update(save_args or {})

    def _load(self) -> pd.DataFrame:
        return pd.read_csv(self._filepath, **self._load_args)

    def _save(self, data: pd.DataFrame) -> None:
        save_path = Path(self._filepath)
        # Create the output directory if it does not exist yet
        save_path.parent.mkdir(parents=True, exist_ok=True)
        data.to_csv(str(save_path), **self._save_args)


# Configure data interface: can be written in a catalog config file using Kedro
train_dataset = CSVDataSet(
    filepath="data/input/train.csv",
    load_args={"float_precision": "high"},
    # save_args={"float_format": "%.16e"},  # You can set save_args for future use
)
test_dataset = CSVDataSet(
    filepath="data/input/test.csv",
    load_args={"float_precision": "high"},
    # save_args={"float_format": "%.16e"},  # You can set save_args for future use
)
pred_dataset = CSVDataSet(
    filepath="data/load/pred.csv",
    # load_args={"float_precision": "high"},  # You can set load_args for future use
    save_args={"float_format": "%.16e"},
)
model_kind = "LogisticRegression"
model_params_dict = {
    "C": 1.23456,
    "max_iter": 987,
    "random_state": 42,
}
cols_features = [
    "Pclass",  # The passenger's ticket class
    "Parch",  # Number of parents / children aboard the Titanic
]
col_target = "Survived"  # Column used as the target: whether the passenger survived or not

# Run tasks: can be configured as a pipeline using Kedro
# and can be written in a parameters config file using PipelineX
if model_kind == "LogisticRegression":
    from sklearn.linear_model import LogisticRegression

    model = LogisticRegression(**model_params_dict)
train_df = train_dataset._load()
model = train_model(model, train_df, cols_features, col_target)
test_df = test_dataset._load()
pred_df = run_inference(model, test_df, cols_features)
pred_dataset._save(pred_df)
```
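Kedro's `AbstractDataSet` exposes public `load`/`save` methods that wrap the `_load`/`_save` methods a subclass implements, adding logging and error handling. A minimal sketch of that pattern follows; `BaseDataSet`, `DataSetError` and `InMemoryDataSet` are hypothetical stand-ins defined in the block (not imports from Kedro), so it runs with the standard library only.

```python
import logging

logger = logging.getLogger(__name__)


class DataSetError(Exception):
    """Hypothetical error type raised when loading or saving fails."""


class BaseDataSet:
    """Hypothetical base class: subclasses implement only _load and _save."""

    def load(self):
        logger.debug("Loading %s", self)
        try:
            return self._load()
        except Exception as exc:
            # Wrap any backend error in one error type, keeping the original as the cause.
            raise DataSetError(f"Failed while loading data from {self}: {exc}") from exc

    def save(self, data):
        logger.debug("Saving %s", self)
        try:
            self._save(data)
        except Exception as exc:
            raise DataSetError(f"Failed while saving data to {self}: {exc}") from exc

    def _load(self):
        raise NotImplementedError

    def _save(self, data):
        raise NotImplementedError


class InMemoryDataSet(BaseDataSet):
    def __init__(self):
        self._data = None

    def _load(self):
        if self._data is None:
            raise ValueError("no data saved yet")
        return self._data

    def _save(self, data):
        self._data = data


ds = InMemoryDataSet()
ds.save([1, 2, 3])
loaded = ds.load()

try:
    InMemoryDataSet().load()
    load_error = None
except DataSetError as exc:
    load_error = exc
```

Callers only ever see `DataSetError`, whatever storage backend a subclass wraps.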
Just following the data interface framework is somewhat beneficial in the long run, but not enough on its own.
Let's see what Kedro and PipelineX can do.
### Kedro overview
Kedro is a Python package to develop pipelines consisting of:
- data interface sets (data loading/saving wrappers, called "DataSets", that follow the unified data interface framework) such as:
  - [`pandas.CSVDataSet`](https://kedro.readthedocs.io/en/stable/kedro.extras.datasets.pandas.CSVDataSet.html#kedro.extras.datasets.pandas.CSVDataSet): a CSV file in local or cloud storage (Amazon S3, Google Cloud Storage) utilizing [filesystem_spec (`fsspec`)](https://github.com/intake/filesystem_spec)
  - [`pickle.PickleDataSet`](https://kedro.readthedocs.io/en/latest/kedro.extras.datasets.pickle.PickleDataSet.html): a pickle file in local or cloud storage (Amazon S3, Google Cloud Storage) utilizing [filesystem_spec (`fsspec`)](https://github.com/intake/filesystem_spec)
  - [`pandas.SQLTableDataSet`](https://kedro.readthedocs.io/en/stable/kedro.extras.datasets.pandas.SQLTableDataSet.html#kedro.extras.datasets.pandas.SQLTableDataSet): a table in an SQL database supported by [SQLAlchemy](https://www.sqlalchemy.org/features.html)
  - [data interface sets for Spark, Google BigQuery, Feather, HDF, Parquet, Matplotlib, NetworkX, Excel, and more provided by Kedro](https://kedro.readthedocs.io/en/stable/kedro.extras.datasets.html#data-sets)
  - custom data interface sets provided by Kedro users
- tasks/operations/transformations (called "Nodes") provided by Kedro users, such as:
  - data pre-processing
  - training a model
  - inference using a model
- inter-task dependencies provided by Kedro users
Kedro pipelines can be run sequentially or in parallel.
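Kedro derives the inter-task dependencies from the data set names that each node consumes and produces. A minimal sketch of that idea with the standard library's `graphlib` (Python 3.9+); the `nodes` dict is a hypothetical stand-in for Kedro `node` objects (the preprocessing nodes are made up for illustration), and this is not Kedro's runner code. Each batch contains nodes whose inputs are all available, so a parallel runner could execute a batch concurrently, while a sequential runner executes the nodes one by one.

```python
from graphlib import TopologicalSorter

# Hypothetical nodes: name -> (input data set names, output data set names)
nodes = {
    "preprocess_train": (["train_raw"], ["train_df"]),
    "preprocess_test": (["test_raw"], ["test_df"]),
    "train_model": (["train_df"], ["model"]),
    "run_inference": (["model", "test_df"], ["pred_df"]),
}

# Map each data set name to the node that produces it.
producer = {out: name for name, (_, outs) in nodes.items() for out in outs}

# A node depends on the producers of its inputs; inputs nobody produces come from the catalog.
graph = {
    name: {producer[i] for i in ins if i in producer}
    for name, (ins, _) in nodes.items()
}

sorter = TopologicalSorter(graph)
sorter.prepare()
batches = []
while sorter.is_active():
    ready = sorted(sorter.get_ready())  # nodes in one batch could run in parallel
    batches.append(ready)
    sorter.done(*ready)
```

Here `batches` is `[["preprocess_test", "preprocess_train"], ["train_model"], ["run_inference"]]`: the two preprocessing nodes are independent, while `run_inference` must wait for both `model` and `test_df`.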
Regarding Kedro, please see:
- [Kedro's documentation](https://kedro.readthedocs.io/en/stable/)
- [YouTube playlist: Writing Data Pipelines with Kedro](https://www.youtube.com/playlist?list=PLTU89LAWKRwEdiDKeMOU2ye6yU9Qd4MRo)
- [Python Packages for Pipeline/Workflow](https://github.com/Minyus/Python_Packages_for_Pipeline_Workflow)
Here is a simple example Kedro project.
```yaml
# catalog.yml
train_df:
  type: pandas.CSVDataSet # short for kedro.extras.datasets.pandas.CSVDataSet
  filepath: data/input/train.csv
  load_args:
    float_precision: high
  # save_args: # You can set save_args for future use
  #   float_format: "%.16e"
test_df:
  type: pandas.CSVDataSet # short for kedro.extras.datasets.pandas.CSVDataSet
  filepath: data/input/test.csv
  load_args:
    float_precision: high
  # save_args: # You can set save_args for future use
  #   float_format: "%.16e"
pred_df:
  type: pandas.CSVDataSet # short for kedro.extras.datasets.pandas.CSVDataSet
  filepath: data/load/pred.csv
  # load_args: # You can set load_args for future use
  #   float_precision: high
  save_args:
    float_format: "%.16e"
```
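Conceptually, each catalog entry names a data set class (`type`) plus the keyword arguments used to construct it. A minimal sketch of that mapping, with the parsed YAML written as a plain dict and a hypothetical `REGISTRY` and `build_catalog` helper; in a real project, Kedro's `DataCatalog.from_config` performs this step and also handles things the sketch omits, such as resolving full class paths and credentials.

```python
class CSVDataSet:
    """Hypothetical stand-in that only records its constructor arguments."""

    def __init__(self, filepath, load_args=None, save_args=None):
        self.filepath = filepath
        self.load_args = dict(load_args or {})
        self.save_args = dict(save_args or {})


# Hypothetical registry from the short type names used in catalog.yml to classes.
REGISTRY = {"pandas.CSVDataSet": CSVDataSet}

# The parsed catalog.yml, written as a Python dict for a dependency-free sketch.
catalog_config = {
    "train_df": {
        "type": "pandas.CSVDataSet",
        "filepath": "data/input/train.csv",
        "load_args": {"float_precision": "high"},
    },
    "pred_df": {
        "type": "pandas.CSVDataSet",
        "filepath": "data/load/pred.csv",
        "save_args": {"float_format": "%.16e"},
    },
}


def build_catalog(config, registry):
    catalog = {}
    for name, entry in config.items():
        kwargs = dict(entry)
        cls = registry[kwargs.pop("type")]  # pick the class by its type name
        catalog[name] = cls(**kwargs)  # remaining keys become constructor arguments
    return catalog


catalog = build_catalog(catalog_config, REGISTRY)
```

Changing where `pred_df` is stored then means editing one catalog entry, not the pipeline code.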
```yaml
# parameters.yml
model:
  !!python/object:sklearn.linear_model.LogisticRegression
  C: 1.23456
  max_iter: 987
  random_state: 42
cols_features: # Columns used as features in the Titanic data table
  - Pclass # The passenger's ticket class
  - Parch # Number of parents / children aboard the Titanic
col_target: Survived # Column used as the target: whether the passenger survived or not
```
```python
# pipeline.py
from kedro.pipeline import Pipeline, node

from my_module import train_model, run_inference


def create_pipeline(**kwargs):
    return Pipeline(
        [
            node(
                func=train_model,
                inputs=["params:model", "train_df", "params:cols_features", "params:col_target"],
                outputs="model",
            ),
            node(
                func=run_inference,
                inputs=["model", "test_df", "params:cols_features"],
                outputs="pred_df",
            ),
        ]
    )
```
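```python
# run.py
from pathlib import Path

from kedro.runner import SequentialRunner

# Set up ProjectContext here: in Kedro 0.16-era projects it is a KedroContext
# subclass defined in the project template generated by `kedro new`
context = ProjectContext(project_path=Path.cwd())
context.run(pipeline_name="__default__", runner=SequentialRunner())
```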
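Conceptually, a sequential run walks the nodes in dependency order, looks up each node's inputs (parameters or data sets), calls its function, and stores the output under its name for later nodes. A minimal sketch with hypothetical stand-ins for the two Titanic tasks (a majority-class "model" instead of scikit-learn, lists of dicts instead of DataFrames); Kedro's `SequentialRunner` additionally loads and saves through the catalog.

```python
def train_model(model, rows, cols_features, col_target):
    # Hypothetical stand-in "model": the most frequent value of the target column.
    targets = [r[col_target] for r in rows]
    return max(set(targets), key=targets.count)


def run_inference(model, rows, cols_features):
    # Predict the majority value for every row, keeping only the feature columns.
    return [{**{c: r[c] for c in cols_features}, "pred": model} for r in rows]


# Nodes in dependency order: (function, input names, output name).
pipeline = [
    (train_model, ["params:model", "train_df", "params:cols_features", "params:col_target"], "model"),
    (run_inference, ["model", "test_df", "params:cols_features"], "pred_df"),
]

# In-memory store holding parameters and loaded data sets.
store = {
    "params:model": None,  # unused by the stand-in train_model
    "params:cols_features": ["Pclass", "Parch"],
    "params:col_target": "Survived",
    "train_df": [
        {"Pclass": 3, "Parch": 0, "Survived": 0},
        {"Pclass": 1, "Parch": 0, "Survived": 1},
        {"Pclass": 3, "Parch": 1, "Survived": 0},
    ],
    "test_df": [{"Pclass": 2, "Parch": 0}],
}

for func, input_names, output_name in pipeline:
    # Resolve inputs by name, call the task, and store its output by name.
    store[output_name] = func(*(store[name] for name in input_names))
```

After the loop, `store["model"]` is `0` and `store["pred_df"]` holds one prediction per test row.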
Kedro pipelines can be visualized using [kedro-viz](https://github.com/quantumblacklabs/kedro-viz).
Kedro pipelines can be productionized using:
- [kedro-airflow](https://github.com/quantumblacklabs/kedro-airflow): converts a Kedro pipeline into Airflow Python operators
- [kedro-docker](https://github.com/quantumblacklabs/kedro-docker): builds a Docker image that can run a Kedro pipeline
- [kedro-argo](https://github.com/nraw/kedro-argo): converts a Kedro pipeline into an Argo (backend of Kubeflow) pipeline