# Intro to MLOps using ZenML

## üåç Overview

This repository is a minimalistic MLOps project intended as a starting point to learn how to put ML workflows in production. It features: 

- A feature engineering pipeline that loads data and prepares it for training.
- A training pipeline that loads the preprocessed dataset and trains a model.
- A batch inference pipeline that runs predictions on the trained model with new data.

Follow along this notebook to understand how you can use ZenML to productionalize your ML workflows!

<img src="_assets/pipeline_overview.png" width="50%" alt="Pipelines Overview">

## Run on Colab

You can use Google Colab to see ZenML in action, no signup / installation
required!

[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](
https://colab.research.google.com/github/zenml-io/zenml/blob/main/examples/quickstart/quickstart.ipynb)

# üë∂ Step 0. Install Requirements

Let's install ZenML to get started. First we'll install the latest version of
ZenML as well as the `sklearn` integration of ZenML:

In [None]:
!pip install "zenml[server]"

In [None]:
from zenml.environment import Environment

if Environment.in_google_colab():
    # Install Cloudflare Tunnel binary
    !wget -q https://github.com/cloudflare/cloudflared/releases/latest/download/cloudflared-linux-amd64.deb && dpkg -i cloudflared-linux-amd64.deb

    # Pull required modules from this example
    !git clone -b main https://github.com/zenml-io/zenml
    !cp -r zenml/examples/quickstart/* .
    !rm -rf zenml


In [None]:
!zenml integration install sklearn -y

import IPython
IPython.Application.instance().kernel.do_shutdown(restart=True)

Please wait for the installation to complete before running subsequent cells. At
the end of the installation, the notebook kernel will automatically restart.

Optional: If you are using [ZenML Cloud](https://zenml.io/cloud), execute the following cell with your tenant URL. Otherwise ignore.

In [None]:
zenml_server_url = "PLEASE_UPDATE_ME"  # in the form "https://URL_TO_SERVER"

!zenml connect --url $zenml_server_url

In [None]:
# Initialize ZenML and set the default stack
!zenml init

!zenml stack set default

In [None]:
# Do the imports at the top
from typing_extensions import Annotated
from sklearn.datasets import load_breast_cancer

import random
import pandas as pd
from zenml import step, ExternalArtifact, pipeline, ModelVersion, get_step_context
from zenml.client import Client
from zenml.logger import get_logger
from uuid import UUID

from typing import Optional, List

from zenml import pipeline

from steps import (
    data_loader,
    data_preprocessor,
    data_splitter,
    model_evaluator,
    inference_preprocessor
)

from zenml.logger import get_logger

logger = get_logger(__name__)

# Initialize the ZenML client to fetch objects from the ZenML Server
client = Client()

## ü•á Step 1: Load your data and execute feature engineering

We'll start off by importing our data. In this quickstart we'll be working with
[the Breast Cancer](https://archive.ics.uci.edu/dataset/17/breast+cancer+wisconsin+diagnostic) dataset
which is publicly available on the UCI Machine Learning Repository. The task is a classification
problem, to predict whether a patient is diagnosed with breast cancer or not.

When you're getting started with a machine learning problem you'll want to do
something similar to this: import your data and get it in the right shape for
your training. ZenML mostly gets out of your way when you're writing your Python
code, as you'll see from the following cell.

<img src=".assets/feature_engineering_pipeline.png" width="50%" alt="Feature engineering pipeline" />

In [None]:
@step
def data_loader_simplified(
    random_state: int, is_inference: bool = False, target: str = "target"
) -> Annotated[pd.DataFrame, "dataset"]:  # We name the dataset 
    """Dataset reader step."""
    dataset = load_breast_cancer(as_frame=True)
    inference_size = int(len(dataset.target) * 0.05)
    dataset: pd.DataFrame = dataset.frame
    inference_subset = dataset.sample(inference_size, random_state=random_state)
    if is_inference:
        dataset = inference_subset
        dataset.drop(columns=target, inplace=True)
    else:
        dataset.drop(inference_subset.index, inplace=True)
    dataset.reset_index(drop=True, inplace=True)
    logger.info(f"Dataset with {len(dataset)} records loaded!")
    return dataset


The whole function is decorated with the `@step` decorator, which
tells ZenML to track this function as a step in the pipeline. This means that
ZenML will automatically version, track, and cache the data that is produced by
this function as an `artifact`. This is a very powerful feature, as it means that you can
reproduce your data at any point in the future, even if the original data source
changes or disappears. 

Note the use of the `typing` module's `Annotated` type hint in the output of the
step. We're using this to give a name to the output of the step, which will make
it possible to access it via a keyword later on.

You'll also notice that we have included type hints for the outputs
to the function. These are not only useful for anyone reading your code, but
help ZenML process your data in a way appropriate to the specific data types.

ZenML is built in a way that allows you to experiment with your data and build
your pipelines as you work, so if you want to call this function to see how it
works, you can just call it directly. Here we take a look at the first few rows
of your training dataset.

In [None]:
df = data_loader_simplified(random_state=42)
df.head()

Everything looks as we'd expect and the values are all in the right format ü•≥.

We're now at the point where can bring this step (and some others) together into a single
pipeline, the top-level organising entity for code in ZenML. Creating such a pipeline is
as simple as adding a `@pipeline` decorator to a function. This specific
pipeline doesn't return a value, but that option is available to you if you need.

In [None]:
@pipeline
def feature_engineering(
    test_size: float = 0.3,
    drop_na: Optional[bool] = None,
    normalize: Optional[bool] = None,
    drop_columns: Optional[List[str]] = None,
    target: Optional[str] = "target",
    random_state: int = 17
):
    """Feature engineering pipeline."""
    # Link all the steps together by calling them and passing the output
    # of one step as the input of the next step.
    raw_data = data_loader(random_state=random_state, target=target)
    dataset_trn, dataset_tst = data_splitter(
        dataset=raw_data,
        test_size=test_size,
    )
    dataset_trn, dataset_tst, _ = data_preprocessor(
        dataset_trn=dataset_trn,
        dataset_tst=dataset_tst,
        drop_na=drop_na,
        normalize=normalize,
        drop_columns=drop_columns,
        target=target,
        random_state=random_state,
    )

We're ready to run the pipeline now, which we can do just as with the step - by calling the
pipeline function itself:

In [None]:
feature_engineering()

Let's run this again with a slightly different test size, to create more datasets:

In [None]:
feature_engineering(test_size=0.25)

Notice the second time around, the data loader step was **cached**, while the rest of the pipeline was rerun. 
This is because ZenML automatically determined that nothing had changed in the data loader step, 
so it didn't need to rerun it.

Let's run this again with a slightly different test size and random state, to disable the cache and to create more datasets:

In [None]:
feature_engineering(test_size=0.25, random_state=104)

At this point you might be interested to view your pipeline runs in the ZenML
Dashboard. In case you are not using a hosted instance of ZenML, you can spin this up by executing the next cell. This will start a
server which you can access by clicking on the link that appears in the output
of the cell.

Log into the Dashboard using default credentials (username 'default' and
password left blank). From there you can inspect the pipeline or the specific
pipeline run.


In [None]:
from zenml.environment import Environment
from zenml.zen_stores.rest_zen_store import RestZenStore


if not isinstance(client.zen_store, RestZenStore):
    # Only spin up a local Dashboard in case you aren't already connected to a remote server
    if Environment.in_google_colab():
        # run ZenML through a cloudflare tunnel to get a public endpoint
        !zenml up --port 8237 & cloudflared tunnel --url http://localhost:8237
    else:
        !zenml up

We can also fetch the pipeline from the server and view the results directly in the notebook:

In [None]:
client = Client()
run = client.get_pipeline("feature_engineering").last_run
print(run.name)

We can also see the data artifacts that were produced by the last step of the pipeline:

In [None]:
run.steps["data_preprocessor"].outputs

In [None]:
# Read one of the datasets. This is the one with a 0.25 test split
run.steps["data_preprocessor"].outputs["dataset_trn"].load()

We can also get the artifacts directly. Each time you create a new pipeline run, a new `artifact version` is created.

You can fetch these artifact and their versions using the `client`: 

In [None]:
# Get artifact version from our run
dataset_trn_artifact_version_via_run = run.steps["data_preprocessor"].outputs["dataset_trn"] 

# Get latest version from client directly
dataset_trn_artifact_version = client.get_artifact_version("dataset_trn")

# This should be true if our run is the latest run and no artifact has been produced
#  in the intervening time
dataset_trn_artifact_version_via_run.id == dataset_trn_artifact_version.id

In [None]:
# Fetch the rest of the artifacts
dataset_tst_artifact_version = client.get_artifact_version("dataset_tst")
preprocessing_pipeline_artifact_version = client.get_artifact_version("preprocess_pipeline")

If you started with a fresh install, then you would have two versions corresponding
to the two pipelines that we ran above. We can even load a artifact version in memory:   

In [None]:
# Load an artifact to verify you can fetch it
dataset_trn_artifact_version.load()

We'll use these artifacts from above in our next pipeline

# ‚åö Step 2: Training pipeline

Now that we have our data it makes sense to train some models to get a sense of
how difficult the task is. The Breast Cancer dataset is sufficiently large and complex 
that it's unlikely we'll be able to train a model that behaves perfectly since the problem 
is inherently complex, but we can get a sense of what a reasonable baseline looks like.

We'll start with two simple models, a SGD Classifier and a Random Forest
Classifier, both batteries-included from `sklearn`. We'll train them both on the
same data and then compare their performance.

<img src=".assets/training_pipeline.png" width="50%" alt="Training pipeline">

In [None]:
import pandas as pd
from sklearn.base import ClassifierMixin
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import SGDClassifier
from typing_extensions import Annotated
from zenml import ArtifactConfig, step
from zenml.logger import get_logger

logger = get_logger(__name__)


@step
def model_trainer(
    dataset_trn: pd.DataFrame,
    model_type: str = "sgd",
) -> Annotated[ClassifierMixin, ArtifactConfig(name="sklearn_classifier", is_model_artifact=True)]:
    """Configure and train a model on the training dataset."""
    target = "target"
    if model_type == "sgd":
        model = SGDClassifier()
    elif model_type == "rf":
        model = RandomForestClassifier()
    else:
        raise ValueError(f"Unknown model type {model_type}")   

    logger.info(f"Training model {model}...")

    model.fit(
        dataset_trn.drop(columns=[target]),
        dataset_trn[target],
    )
    return model


Our two training steps both return different kinds of `sklearn` classifier
models, so we use the generic `ClassifierMixin` type hint for the return type.

ZenML allows you to load any version of any dataset that is tracked by the framework
directly into a pipeline using the `ExternalArtifact` interface. This is very convenient
in this case, as we'd like to send our preprocessed dataset from the older pipeline directly
into the training pipeline.

In [None]:
@pipeline
def training(
    train_dataset_id: Optional[UUID] = None,
    test_dataset_id: Optional[UUID] = None,
    model_type: str = "sgd",
    min_train_accuracy: float = 0.0,
    min_test_accuracy: float = 0.0,
):
    """Model training pipeline.""" 
    if train_dataset_id is None or test_dataset_id is None:
        # If we dont pass the IDs, this will run the feature engineering pipeline   
        dataset_trn, dataset_tst = feature_engineering()
    else:
        # Load the datasets from an older pipeline
        dataset_trn = ExternalArtifact(id=train_dataset_id)
        dataset_tst = ExternalArtifact(id=test_dataset_id) 

    trained_model = model_trainer(
        dataset_trn=dataset_trn,
        model_type=model_type,
    )

    model_evaluator(
        model=trained_model,
        dataset_trn=dataset_trn,
        dataset_tst=dataset_tst,
        min_train_accuracy=min_train_accuracy,
        min_test_accuracy=min_test_accuracy,
    )

The end goal of this quick baseline evaluation is to understand which of the two
models performs better. We'll use the `evaluator` step to compare the two
models. This step takes in the model from the trainer step, and computes its score
over the testing set.

In [None]:
# Use a random forest model with the chosen datasets.
# We need to pass the ID's of the datasets into the function
training(
    model_type="rf",
    train_dataset_id=dataset_trn_artifact_version.id,
    test_dataset_id=dataset_tst_artifact_version.id
)

rf_run = client.get_pipeline("training").last_run

In [None]:
# Use a SGD classifier
sgd_run = training(
    model_type="sgd",
    train_dataset_id=dataset_trn_artifact_version.id,
    test_dataset_id=dataset_tst_artifact_version.id
)

sgd_run = client.get_pipeline("training").last_run

You can see from the logs already how our model training went: the
`RandomForestClassifier` performed considerably better than the `SGDClassifier`.
We can use the ZenML `Client` to verify this:

In [None]:
# The evaluator returns a float value with the accuracy
rf_run.steps["model_evaluator"].output.load() > sgd_run.steps["model_evaluator"].output.load()

# üíØ Step 3: Associating a model with your pipeline

You can see it is relatively easy to train ML models using ZenML pipelines. But it can be somewhat clunky to track
all the models produced as you develop your experiments and use-cases. Luckily, ZenML offers a *Model Control Plane*,
which is a central register of all your ML models.

You can easily create a ZenML `Model` and associate it with your pipelines using the `ModelVersion` object:

In [None]:
pipeline_settings = {}

# Lets add some metadata to the model to make it identifiable
pipeline_settings["model_version"] = ModelVersion(
    name="breast_cancer_classifier",
    license="Apache 2.0",
    description="A breast cancer classifier",
    tags=["breast_cancer", "classifier"],
)

In [None]:
# Let's train the SGD model and set the version name to "sgd"
pipeline_settings["model_version"].version = "sgd"

# the `with_options` method allows us to pass in pipeline settings
#  and returns a configured pipeline
training_configured = training.with_options(**pipeline_settings)

# We can now run this as usual
training_configured(
    model_type="sgd",
    train_dataset_id=dataset_trn_artifact_version.id,
    test_dataset_id=dataset_tst_artifact_version.id
)

In [None]:
# Let's train the RF model and set the version name to "rf"
pipeline_settings["model_version"].version = "rf"

# the `with_options` method allows us to pass in pipeline settings
#  and returns a configured pipeline
training_configured = training.with_options(**pipeline_settings)

# Let's run it again to make sure we have two versions
training_configured(
    model_type="rf",
    train_dataset_id=dataset_trn_artifact_version.id,
    test_dataset_id=dataset_tst_artifact_version.id
)

This time, running both pipelines has created two associated **model versions**.
You can list your ZenML model and their versions as follows:

In [None]:
zenml_model = client.get_model("breast_cancer_classifier")
print(zenml_model)

print(f"Model {zenml_model.name} has {len(zenml_model.versions)} versions")

zenml_model.versions[0].version, zenml_model.versions[1].version

The interesting part is that ZenML went ahead and linked all artifacts produced by the
pipelines to that model version, including the two pickle files that represent our
SGD and RandomForest classifier. We can see all artifacts directly from the model
version object:

In [None]:
# Let's load the RF version
rf_zenml_model_version = client.get_model_version("breast_cancer_classifier", "rf")

# We can now load our classifier directly as well
random_forest_classifier = rf_zenml_model_version.get_artifact("sklearn_classifier").load()

random_forest_classifier

If you are a [ZenML Cloud](https://zenml.io/cloud) user, you can see all of this visualized in the dashboard:

<img src=".assets/cloud_mcp_screenshot.png" width="70%" alt="Model Control Plane">

There is a lot more you can do with ZenML models, including the ability to
track metrics by adding metadata to it, or having them persist in a model
registry. However, these topics can be explored more in the
[ZenML docs](https://docs.zenml.io).

For now, we will use the ZenML model control plane to promote our best
model to `production`. You can do this by simply setting the `stage` of
your chosen model version to the `production` tag.

In [None]:
# Set our best classifier to production
rf_zenml_model_version.set_stage("production", force=True)

Of course, normally one would only promote the model by comparing to all other model
versions and doing some other tests. But that's a bit more advanced use-case. See the
[e2e_batch example](https://github.com/zenml-io/zenml/tree/main/examples/e2e) to get
more insight into that sort of flow!

<img src=".assets/cloud_mcp.png" width="60%" alt="Model Control Plane">

Once the model is promoted, we can now consume the right model version in our
batch inference pipeline directly. Let's see how that works.

# ü´Ö Step 4: Consuming the model in production

The batch inference pipeline simply takes the model marked as `production` and runs inference on it
with `live data`. The critical step here is the `inference_predict` step, where we load the model in memory
and generate predictions:

<img src=".assets/inference_pipeline.png" width="45%" alt="Inference pipeline">

In [None]:
@step
def inference_predict(dataset_inf: pd.DataFrame) -> Annotated[pd.Series, "predictions"]:
    """Predictions step"""
    # Get the model_version
    model_version = get_step_context().model_version

    # run prediction from memory
    predictor = model_version.load_artifact("sklearn_classifier")
    predictions = predictor.predict(dataset_inf)

    predictions = pd.Series(predictions, name="predicted")

    return predictions


Apart from the loading the model, we must also load the preprocessing pipeline that we ran in feature engineering,
so that we can do the exact steps that we did on training time, in inference time. Let's bring it all together:

In [None]:
@pipeline
def inference(preprocess_pipeline_id: UUID):
    """Model batch inference pipeline"""
    # random_state = client.get_artifact_version(id=preprocess_pipeline_id).metadata["random_state"].value
    # target = client.get_artifact_version(id=preprocess_pipeline_id).run_metadata['target'].value
    random_state = 42
    target = "target"

    df_inference = data_loader(
        random_state=random_state, is_inference=True
    )
    df_inference = inference_preprocessor(
        dataset_inf=df_inference,
        # We use the preprocess pipeline from the feature engineering pipeline
        preprocess_pipeline=ExternalArtifact(id=preprocess_pipeline_id),
        target=target,
    )
    inference_predict(
        dataset_inf=df_inference,
    )


The way to load the right model is to pass in the `production` stage into the `ModelVersion` config this time.
This will ensure to always load the production model, decoupled from all other pipelines:

In [None]:
pipeline_settings = {"enable_cache": False}

# Lets add some metadata to the model to make it identifiable
pipeline_settings["model_version"] = ModelVersion(
    name="breast_cancer_classifier",
    version="production", # We can pass in the stage name here!
    license="Apache 2.0",
    description="A breast cancer classifier",
    tags=["breast_cancer", "classifier"],
)

In [None]:
# the `with_options` method allows us to pass in pipeline settings
#  and returns a configured pipeline
inference_configured = inference.with_options(**pipeline_settings)

# Let's run it again to make sure we have two versions
# We need to pass in the ID of the preprocessing done in the feature engineering pipeline
# in order to avoid training-serving skew
inference_configured(
    preprocess_pipeline_id=preprocessing_pipeline_artifact_version.id
)

ZenML automatically links all artifacts to the `production` model version as well, including the predictions
that were returned in the pipeline. This completes the MLOps loop of training to inference:

In [None]:
# Fetch production model
production_model_version = client.get_model_version("breast_cancer_classifier", "production")

# Get the predictions artifact
production_model_version.get_artifact("predictions").load()

You can also see all predictions ever created as a complete history in the dashboard:

<img src=".assets/cloud_mcp_predictions.png" width="70%" alt="Model Control Plane">

## Congratulations!

You're a legit MLOps engineer now! You trained two models, evaluated them against
a test set, registered the best one with the ZenML model control plane,
and served some predictions. You also learned how to iterate on your models and
data by using some of the ZenML utility abstractions. You saw how to view your
artifacts and models via the client as well as the ZenML Dashboard.

## Further exploration

This was just the tip of the iceberg of what ZenML can do; check out the [**docs**](https://docs.zenml.io/) to learn more
about the capabilities of ZenML. For example, you might want to:

- [Deploy ZenML](https://docs.zenml.io/user-guide/production-guide/connect-deployed-zenml) to collaborate with your colleagues.
- Run the same pipeline on a [cloud MLOps stack in production](https://docs.zenml.io/user-guide/production-guide/cloud-stack).
- Track your metrics in an experiment tracker like [MLflow](https://docs.zenml.io/stacks-and-components/component-guide/experiment-trackers/mlflow).

## What next?

* If you have questions or feedback... join our [**Slack Community**](https://zenml.io/slack) and become part of the ZenML family!
* If you want to quickly get started with ZenML, check out the [ZenML Cloud](https://zenml.io/cloud).