pipelinex.flex_kedro.pipeline package

Submodules

pipelinex.flex_kedro.pipeline.pipeline module

class pipelinex.flex_kedro.pipeline.pipeline.FlexiblePipeline(nodes, *, parameters_in_inputs=False, module='', decorator=[], **kwargs)

Bases: kedro.pipeline.pipeline.Pipeline

__init__(nodes, *, parameters_in_inputs=False, module='', decorator=[], **kwargs)

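Initialise Pipeline with a list of Node instances. This docstring is inherited from kedro.pipeline.pipeline.Pipeline, so the FlexiblePipeline-specific arguments (parameters_in_inputs, module, decorator) are not described below.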

Parameters:
  • nodes – The iterable of nodes the Pipeline will be made of. If you provide pipelines among the list of nodes, those pipelines will be expanded and all their nodes will become part of this new pipeline.

  • tags – Optional set of tags to be applied to all the pipeline nodes.
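The node-expansion and tagging behaviour described above can be modelled in a few lines of plain Python. This is a simplified sketch, not Kedro's implementation: `SketchNode`, `SketchPipeline` and `flatten` are hypothetical names, and tags are merged as plain sets.

```python
from dataclasses import dataclass, field
from typing import AbstractSet, Iterable, List, Union


@dataclass(frozen=True)
class SketchNode:
    """Hypothetical stand-in for a Kedro Node: a name plus a set of tags."""
    name: str
    tags: frozenset = frozenset()


@dataclass
class SketchPipeline:
    """Hypothetical stand-in for a Kedro Pipeline: an ordered list of nodes."""
    nodes: List[SketchNode] = field(default_factory=list)


def flatten(
    items: Iterable[Union[SketchNode, SketchPipeline]],
    tags: AbstractSet[str] = frozenset(),
) -> List[SketchNode]:
    """Expand any pipelines in items into their nodes, adding tags to each node."""
    result: List[SketchNode] = []
    for item in items:
        if isinstance(item, SketchPipeline):
            # A nested pipeline contributes all of its nodes.
            result.extend(flatten(item.nodes, tags))
        else:
            # Keep the node's own tags and add the pipeline-wide ones.
            result.append(SketchNode(item.name, item.tags | frozenset(tags)))
    return result


inner = SketchPipeline([SketchNode("a"), SketchNode("b", frozenset({"x"}))])
nodes = flatten([inner, SketchNode("c")], tags={"t"})
print([(n.name, sorted(n.tags)) for n in nodes])
# → [('a', ['t']), ('b', ['t', 'x']), ('c', ['t'])]
```

Nested pipelines disappear into a flat node list, which is why node names and outputs must be unique across the whole expanded pipeline.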

Raises:
  • ValueError – When an empty list of nodes is provided, or when not all nodes have unique names.

  • CircularDependencyError – When visiting all the nodes is not possible due to the existence of a circular dependency.

  • OutputNotUniqueError – When multiple Node instances produce the same output.

  • ConfirmNotUniqueError – When multiple Node instances attempt to confirm the same dataset.
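The checks in the Raises list can be approximated as below. This is a hypothetical sketch under simplifying assumptions: nodes are `(name, inputs, outputs)` tuples, the two exception classes are local stand-ins rather than Kedro's own, `confirms` (and therefore ConfirmNotUniqueError) is not modelled, and the cycle check relies on the standard library's `graphlib` (Python 3.9+).

```python
from collections import Counter
from graphlib import CycleError, TopologicalSorter
from typing import Dict, List, Sequence, Tuple

# Hypothetical, simplified node description: (name, inputs, outputs).
NodeSpec = Tuple[str, Sequence[str], Sequence[str]]


class OutputNotUniqueError(Exception):
    """Local stand-in for Kedro's exception of the same name."""


class CircularDependencyError(Exception):
    """Local stand-in for Kedro's exception of the same name."""


def validate(nodes: List[NodeSpec]) -> List[str]:
    """Apply checks modelled on the Raises list; return node names in run order."""
    if not nodes:
        raise ValueError("an empty list of nodes was provided")
    counts = Counter(name for name, _, _ in nodes)
    duplicates = sorted(name for name, count in counts.items() if count > 1)
    if duplicates:
        raise ValueError(f"node names are not unique: {duplicates}")

    # Map every dataset to the single node allowed to produce it.
    producer: Dict[str, str] = {}
    for name, _, outputs in nodes:
        for out in outputs:
            if out in producer:
                raise OutputNotUniqueError(
                    f"{out!r} is produced by both {producer[out]!r} and {name!r}"
                )
            producer[out] = name

    # Each node depends on the producers of its inputs; external inputs
    # (not produced by any node) add no dependency.
    graph = {
        name: {producer[i] for i in inputs if i in producer}
        for name, inputs, _ in nodes
    }
    try:
        return list(TopologicalSorter(graph).static_order())
    except CycleError as exc:
        raise CircularDependencyError(str(exc)) from exc


order = validate([
    ("second", ["third_ds"], ["fourth_ds"]),
    ("first", ["first_ds", "second_ds"], ["third_ds"]),
])
print(order)  # → ['first', 'second']
```

The run order is derived from data dependencies rather than list position, which is why the nodes can be listed in any order.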

Example:

from kedro.pipeline import Pipeline
from kedro.pipeline import node

# In the following scenario, first_ds and second_ds
# are datasets provided by the data catalog (io). The pipeline
# passes these datasets to first_node and provides its result
# to second_node as input.

def first_node(first_ds, second_ds):
    return dict(third_ds=first_ds+second_ds)

def second_node(third_ds):
    return third_ds

pipeline = Pipeline([
    # first_node returns a dict, so its outputs must also be a dict
    # mapping each returned key to a dataset name.
    node(first_node, ['first_ds', 'second_ds'], dict(third_ds='third_ds')),
    node(second_node, dict(third_ds='third_ds'), 'fourth_ds')])

# describe() returns a string, so print it to see the summary.
print(pipeline.describe())
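FlexiblePipeline's signature adds `parameters_in_inputs`, `module` and `decorator`, which the inherited docstring does not describe. The sketch below illustrates one plausible reading of `module` and `decorator` only, assuming that `module` is a prefix for resolving functions given as dotted-path strings and that `decorator` is a list of decorators wrapped around every node function. Both semantics, and the helpers `resolve_func`, `apply_decorators` and `log_call`, are assumptions for illustration, not PipelineX's actual code.

```python
import importlib
from functools import reduce, wraps
from typing import Callable, List, Union

calls: List[str] = []  # records calls made through the example decorator


def resolve_func(func: Union[str, Callable], module: str = "") -> Callable:
    """Hypothetical: return func itself, or import it from a dotted path.

    A non-empty ``module`` is treated as a prefix, so ("sqrt", "math")
    resolves to math.sqrt; a bare name falls back to the builtins module.
    """
    if callable(func):
        return func
    path = f"{module}.{func}" if module else func
    module_name, _, attr = path.rpartition(".")
    return getattr(importlib.import_module(module_name or "builtins"), attr)


def apply_decorators(func: Callable, decorators: List[Callable]) -> Callable:
    """Wrap func with each decorator in list order (the first is innermost)."""
    return reduce(lambda wrapped, deco: deco(wrapped), decorators, func)


def log_call(func: Callable) -> Callable:
    """Example decorator that appends the wrapped function's name to calls."""
    @wraps(func)
    def wrapper(*args, **kwargs):
        calls.append(func.__name__)
        return func(*args, **kwargs)
    return wrapper


sqrt = apply_decorators(resolve_func("sqrt", module="math"), [log_call])
print(sqrt(16.0), calls)  # → 4.0 ['sqrt']
```

Applying decorators once, when the pipeline is built, would let a cross-cutting concern such as logging or timing reach every node without editing each node function.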

pipelinex.flex_kedro.pipeline.sub_pipeline module

class pipelinex.flex_kedro.pipeline.sub_pipeline.SubPipeline(inputs=None, outputs=None, func=None, module='', decorator=None, intermediate_node_name_fmt='{}__{:03d}', **kwargs)

Bases: kedro.pipeline.pipeline.Pipeline

__init__(inputs=None, outputs=None, func=None, module='', decorator=None, intermediate_node_name_fmt='{}__{:03d}', **kwargs)

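Initialise Pipeline with a list of Node instances. This docstring is inherited from kedro.pipeline.pipeline.Pipeline: the nodes and tags parameters below belong to the base class, and SubPipeline's own arguments (inputs, outputs, func, module, decorator, intermediate_node_name_fmt) are not described here.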

Parameters:
  • nodes – The iterable of nodes the Pipeline will be made of. If you provide pipelines among the list of nodes, those pipelines will be expanded and all their nodes will become part of this new pipeline.

  • tags – Optional set of tags to be applied to all the pipeline nodes.

Raises:
  • ValueError – When an empty list of nodes is provided, or when not all nodes have unique names.

  • CircularDependencyError – When visiting all the nodes is not possible due to the existence of a circular dependency.

  • OutputNotUniqueError – When multiple Node instances produce the same output.

  • ConfirmNotUniqueError – When multiple Node instances attempt to confirm the same dataset.
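The default `intermediate_node_name_fmt='{}__{:03d}'` is a `str.format` pattern with one free field and one zero-padded, three-digit integer field, so `'{}__{:03d}'.format('clean', 1)` gives `'clean__001'`. The sketch below assumes, without confirmation from this page, that when `func` is a sequence SubPipeline chains the functions into consecutive nodes from `inputs` to `outputs` and names each intermediate dataset by formatting this pattern with the output name and a step index starting at 1. `chain_names` and `run_chain` are hypothetical helpers, and a plain dict stands in for the data catalog.

```python
from typing import Any, Callable, Dict, List, Sequence, Tuple


def chain_names(
    inputs: str, outputs: str, n_funcs: int, fmt: str = "{}__{:03d}"
) -> List[Tuple[str, str]]:
    """Return the (input, output) dataset names for n_funcs chained steps.

    Hypothetical naming rule: the i-th intermediate dataset is
    fmt.format(outputs, i), counting from 1.
    """
    if n_funcs < 1:
        raise ValueError("at least one function is required")
    names = [inputs]
    names += [fmt.format(outputs, i) for i in range(1, n_funcs)]
    names.append(outputs)
    return list(zip(names[:-1], names[1:]))


def run_chain(
    funcs: Sequence[Callable[[Any], Any]], value: Any, inputs: str, outputs: str
) -> Dict[str, Any]:
    """Run funcs in series, storing every dataset in a dict 'catalog'."""
    catalog = {inputs: value}
    for func, (src, dst) in zip(funcs, chain_names(inputs, outputs, len(funcs))):
        catalog[dst] = func(catalog[src])
    return catalog


catalog = run_chain([str.strip, str.lower, str.split], "  Hello World ", "raw", "clean")
print(catalog)
# → {'raw': '  Hello World ', 'clean__001': 'Hello World', 'clean__002': 'hello world', 'clean': ['hello', 'world']}
```

Deriving intermediate names from the final output name keeps them unique per sub-pipeline, which matters because the base class rejects duplicate outputs.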
