File size: 7,195 Bytes
c73381c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ec48b04
 
 
 
 
 
c73381c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ec48b04
c73381c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
7f1aa39
c73381c
 
 
 
 
 
 
 
 
 
7f1aa39
 
 
 
 
 
 
 
 
 
 
c73381c
 
 
ec48b04
 
 
 
c73381c
 
 
 
 
 
 
 
 
7f1aa39
c73381c
 
 
7f1aa39
c73381c
 
 
 
 
 
7f1aa39
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c73381c
 
 
 
 
ec48b04
 
 
 
c73381c
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
# {% include 'templates/license_header' %}

import os
from typing import Optional

import click
from pipelines import (
    feature_engineering,
    inference,
    breast_cancer_training,
    breast_cancer_deployment_pipeline
)
from zenml.client import Client
from zenml.logger import get_logger

# Module-level logger obtained from ZenML's logging helper, named after this module.
logger = get_logger(__name__)


@click.command(
    help="""
ZenML Starter project CLI v0.0.1.

Run the ZenML starter project with basic options.

Examples:

  \b
  # Run the feature engineering pipeline
    python run.py --feature-pipeline

  \b
  # Run the training pipeline
    python run.py --training-pipeline

  \b
  # Run the training pipeline with versioned artifacts
    python run.py --training-pipeline --train-dataset-version-name=1 --test-dataset-version-name=1

  \b
  # Run the inference pipeline
    python run.py --inference-pipeline

"""
)
@click.option(
    "--train-dataset-name",
    default="dataset_trn",
    type=click.STRING,
    help="The name of the train dataset produced by feature engineering.",
)
@click.option(
    "--train-dataset-version-name",
    default=None,
    type=click.STRING,
    help="Version of the train dataset produced by feature engineering. "
    "If not specified, a new version will be created.",
)
@click.option(
    "--test-dataset-name",
    default="dataset_tst",
    type=click.STRING,
    help="The name of the test dataset produced by feature engineering.",
)
@click.option(
    "--test-dataset-version-name",
    default=None,
    type=click.STRING,
    help="Version of the test dataset produced by feature engineering. "
    "If not specified, a new version will be created.",
)
@click.option(
    "--config",
    default=None,
    type=click.STRING,
    help="The name of the config",
)
@click.option(
    "--feature-pipeline",
    is_flag=True,
    default=False,
    help="Whether to run the pipeline that creates the dataset.",
)
@click.option(
    "--training-pipeline",
    is_flag=True,
    default=False,
    help="Whether to run the pipeline that trains the model.",
)
@click.option(
    "--inference-pipeline",
    is_flag=True,
    default=False,
    help="Whether to run the pipeline that performs inference.",
)
@click.option(
    "--deployment-pipeline",
    is_flag=True,
    default=False,
    help="Whether to run the pipeline that deploys the model.",
)
def main(
    train_dataset_name: str = "dataset_trn",
    train_dataset_version_name: Optional[str] = None,
    test_dataset_name: str = "dataset_tst",
    test_dataset_version_name: Optional[str] = None,
    config: Optional[str] = None,
    feature_pipeline: bool = False,
    training_pipeline: bool = False,
    inference_pipeline: bool = False,
    deployment_pipeline: bool = False,
) -> None:
    """Main entry point for the pipeline execution.

    This entrypoint is where everything comes together:

      * configuring pipeline with the required parameters
        (some of which may come from command line arguments, but most
        of which comes from the YAML config files)
      * launching the pipeline

    Each requested pipeline is run in a fixed order: feature engineering,
    training, inference, deployment. Several flags may be combined in one
    invocation.

    Args:
        train_dataset_name: Artifact name of the train dataset.
        train_dataset_version_name: Artifact version of the train dataset to
            train on. Must be given together with
            ``test_dataset_version_name``.
        test_dataset_name: Artifact name of the test dataset.
        test_dataset_version_name: Artifact version of the test dataset to
            train on. Must be given together with
            ``train_dataset_version_name``.
        config: File name of a YAML file inside the ``configs`` folder next
            to this script. When given, it replaces the default config of
            the training and inference pipelines.
        feature_pipeline: Run the feature engineering pipeline.
        training_pipeline: Run the training pipeline.
        inference_pipeline: Run the inference pipeline.
        deployment_pipeline: Run the deployment pipeline.

    Raises:
        click.UsageError: If only one of the two dataset version names is
            supplied for the training pipeline.
    """
    config_folder = os.path.join(
        os.path.dirname(os.path.realpath(__file__)),
        "configs",
    )
    client = Client()

    # Execute Feature Engineering Pipeline
    if feature_pipeline:
        pipeline_args = {
            "config_path": os.path.join(
                config_folder, "feature_engineering.yaml"
            )
        }
        feature_engineering.with_options(**pipeline_args)()
        logger.info("Feature Engineering pipeline finished successfully!")
        # No version is passed here, so the client picks the version itself
        # (presumably the latest one - confirm against the ZenML client docs).
        train_dataset_artifact = client.get_artifact_version(
            train_dataset_name
        )
        test_dataset_artifact = client.get_artifact_version(test_dataset_name)
        logger.info(
            "The latest feature engineering pipeline produced the following "
            f"artifacts: \n\n1. Train Dataset - Name: {train_dataset_name}, "
            f"Version Name: {train_dataset_artifact.version} \n2. Test Dataset: "
            f"Name: {test_dataset_name}, Version Name: {test_dataset_artifact.version}"
        )

    # Execute Training Pipeline
    if training_pipeline:
        pipeline_args = {
            "config_path": os.path.join(
                config_folder, "training.yaml" if config is None else config
            )
        }
        run_args_train = {}

        # If a dataset version name is specified, use versioned artifacts
        if train_dataset_version_name or test_dataset_version_name:
            # However, both train and test dataset versions must be specified.
            # Raised explicitly (instead of `assert`) so the check survives
            # `python -O` and the user gets a proper CLI error message.
            if (
                train_dataset_version_name is None
                or test_dataset_version_name is None
            ):
                raise click.UsageError(
                    "Both --train-dataset-version-name and "
                    "--test-dataset-version-name must be specified together."
                )
            train_dataset_artifact = client.get_artifact_version(
                train_dataset_name, train_dataset_version_name
            )
            test_dataset_artifact = client.get_artifact_version(
                test_dataset_name, test_dataset_version_name
            )
            # Use versioned artifacts
            run_args_train["train_dataset_id"] = train_dataset_artifact.id
            run_args_train["test_dataset_id"] = test_dataset_artifact.id

            from zenml.config import DockerSettings

            docker_settings = DockerSettings(
                requirements=[
                    "pyarrow",
                    "scikit-learn==1.1.1",
                ],
            )

            # Extend (do not replace) the pipeline options so the config file
            # selected above is still honoured.
            pipeline_args["enable_cache"] = True
            pipeline_args["settings"] = {
                "docker": docker_settings,
                # Request an M5 Large instance from the SageMaker step
                # operator. The step object is not available in this module,
                # so the settings are attached at pipeline level.
                # NOTE(review): selecting the step operator for the trainer
                # step still has to happen on the step itself or in the YAML
                # config - confirm this matches the intended setup.
                "step_operator.sagemaker": {
                    "estimator_args": {"instance_type": "ml.m5.large"}
                },
            }

        breast_cancer_training.with_options(**pipeline_args)(**run_args_train)
        logger.info("Training pipeline finished successfully!")

    # Execute Inference Pipeline
    if inference_pipeline:
        pipeline_args = {
            "config_path": os.path.join(
                config_folder, "inference.yaml" if config is None else config
            )
        }
        inference.with_options(**pipeline_args)()
        logger.info("Inference pipeline finished successfully!")

    # Execute Deployment Pipeline
    if deployment_pipeline:
        pipeline_args = {
            "config_path": os.path.join(config_folder, "deployment.yaml")
        }
        breast_cancer_deployment_pipeline.with_options(**pipeline_args)()
        logger.info("Deployment pipeline finished successfully!")

# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()