{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"id": "081d5616",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"\u001b[1;35mNumExpr defaulting to 8 threads.\u001b[0m\n",
"\u001b[?25l\u001b[2;36mFound existing ZenML repository at path \u001b[0m\n",
"\u001b[2;32m'/home/apenner/PycharmProjects/template-starter/template'\u001b[0m\u001b[2;36m.\u001b[0m\n",
"\u001b[2;32m⠋\u001b[0m\u001b[2;36m Initializing ZenML repository at \u001b[0m\n",
"\u001b[2;36m/home/apenner/PycharmProjects/template-starter/template.\u001b[0m\n",
"\u001b[2K\u001b[1A\u001b[2K\u001b[1A\u001b[2K\u001b[32m⠋\u001b[0m Initializing ZenML repository at \n",
"/home/apenner/PycharmProjects/template-starter/template.\n",
"\n",
"\u001b[1A\u001b[2K\u001b[1A\u001b[2K\u001b[1A\u001b[2K\u001b[1;35mNumExpr defaulting to 8 threads.\u001b[0m\n",
"\u001b[2K\u001b[2;36mActive repository stack set to: \u001b[0m\u001b[2;32m'default'\u001b[0m.\n",
"\u001b[2K\u001b[32m⠙\u001b[0m Setting the repository active stack to 'default'...t'...\u001b[0m\n",
"\u001b[1A\u001b[2K"
]
}
],
"source": [
"!zenml init\n",
"!zenml stack set default"
]
},
{
"cell_type": "code",
"execution_count": 2,
"id": "79f775f2",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"\u001b[1;35mNumExpr defaulting to 8 threads.\u001b[0m\n"
]
}
],
"source": [
"# Do the imports at the top\n",
"\n",
"import random\n",
"from zenml import ExternalArtifact, pipeline \n",
"from zenml.client import Client\n",
"from zenml.logger import get_logger\n",
"from uuid import UUID\n",
"\n",
"import os\n",
"from typing import Optional, List\n",
"\n",
"from zenml import pipeline\n",
"\n",
"from steps import (\n",
" data_loader,\n",
" data_preprocessor,\n",
" data_splitter,\n",
" model_evaluator,\n",
" model_trainer,\n",
" inference_predict,\n",
" inference_preprocessor\n",
")\n",
"\n",
"logger = get_logger(__name__)\n",
"\n",
"client = Client()"
]
},
{
"cell_type": "code",
"execution_count": 3,
"id": "b50a9537",
"metadata": {},
"outputs": [],
"source": [
"@pipeline\n",
"def feature_engineering(\n",
" test_size: float = 0.2,\n",
" drop_na: Optional[bool] = None,\n",
" normalize: Optional[bool] = None,\n",
" drop_columns: Optional[List[str]] = None,\n",
" target: Optional[str] = \"target\",\n",
"):\n",
" \"\"\"\n",
" Feature engineering pipeline.\n",
"\n",
" This is a pipeline that loads the data, processes it and splits\n",
" it into train and test sets.\n",
"\n",
" Args:\n",
" test_size: Size of holdout set for training 0.0..1.0\n",
" drop_na: If `True` NA values will be removed from dataset\n",
" normalize: If `True` dataset will be normalized with MinMaxScaler\n",
" drop_columns: List of columns to drop from dataset\n",
" target: Name of target column in dataset\n",
" \"\"\"\n",
" ### ADD YOUR OWN CODE HERE - THIS IS JUST AN EXAMPLE ###\n",
" # Link all the steps together by calling them and passing the output\n",
" # of one step as the input of the next step.\n",
" raw_data = data_loader(random_state=random.randint(0, 100), target=target)\n",
" dataset_trn, dataset_tst = data_splitter(\n",
" dataset=raw_data,\n",
" test_size=test_size,\n",
" )\n",
" dataset_trn, dataset_tst, _ = data_preprocessor(\n",
" dataset_trn=dataset_trn,\n",
" dataset_tst=dataset_tst,\n",
" drop_na=drop_na,\n",
" normalize=normalize,\n",
" drop_columns=drop_columns,\n",
" target=target,\n",
" )\n",
" \n",
" return dataset_trn, dataset_tst"
]
},
{
"cell_type": "code",
"execution_count": 4,
"id": "bc5feef4-7016-420e-9af9-2e87ff666f74",
"metadata": {},
"outputs": [],
"source": [
"pipeline_args = {}\n",
"pipeline_args[\"config_path\"] = os.path.join(\"configs\", \"feature_engineering.yaml\")\n",
"fe_p_configured = feature_engineering.with_options(**pipeline_args)"
]
},
{
"cell_type": "code",
"execution_count": 5,
"id": "75cf3740-b2d8-4c4b-b91b-dc1637000880",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"\u001b[1;35mInitiating a new run for the pipeline: \u001b[0m\u001b[1;36mfeature_engineering\u001b[1;35m.\u001b[0m\n",
"\u001b[1;35mReusing registered version: \u001b[0m\u001b[1;36m(version: 1)\u001b[1;35m.\u001b[0m\n",
"\u001b[1;35mNew model version \u001b[0m\u001b[1;36m34\u001b[1;35m was created.\u001b[0m\n",
"\u001b[1;35mExecuting a new run.\u001b[0m\n",
"\u001b[1;35mUsing user: \u001b[0m\u001b[1;36malexej@zenml.io\u001b[1;35m\u001b[0m\n",
"\u001b[1;35mUsing stack: \u001b[0m\u001b[1;36mdefault\u001b[1;35m\u001b[0m\n",
"\u001b[1;35m artifact_store: \u001b[0m\u001b[1;36mdefault\u001b[1;35m\u001b[0m\n",
"\u001b[1;35m orchestrator: \u001b[0m\u001b[1;36mdefault\u001b[1;35m\u001b[0m\n",
"\u001b[1;35mStep \u001b[0m\u001b[1;36mdata_loader\u001b[1;35m has started.\u001b[0m\n",
"\u001b[1;35mDataset with 541 records loaded!\u001b[0m\n",
"\u001b[1;35mStep \u001b[0m\u001b[1;36mdata_loader\u001b[1;35m has finished in \u001b[0m\u001b[1;36m6.777s\u001b[1;35m.\u001b[0m\n",
"\u001b[1;35mStep \u001b[0m\u001b[1;36mdata_splitter\u001b[1;35m has started.\u001b[0m\n",
"\u001b[1;35mStep \u001b[0m\u001b[1;36mdata_splitter\u001b[1;35m has finished in \u001b[0m\u001b[1;36m11.345s\u001b[1;35m.\u001b[0m\n",
"\u001b[1;35mStep \u001b[0m\u001b[1;36mdata_preprocessor\u001b[1;35m has started.\u001b[0m\n",
"\u001b[1;35mStep \u001b[0m\u001b[1;36mdata_preprocessor\u001b[1;35m has finished in \u001b[0m\u001b[1;36m14.866s\u001b[1;35m.\u001b[0m\n",
"\u001b[1;35mRun \u001b[0m\u001b[1;36mfeature_engineering-2023_12_06-09_08_46_821042\u001b[1;35m has finished in \u001b[0m\u001b[1;36m36.198s\u001b[1;35m.\u001b[0m\n",
"\u001b[1;35mDashboard URL: https://1cf18d95-zenml.cloudinfra.zenml.io/workspaces/default/pipelines/52874ade-f314-45ab-b9bf-e95fb29290b8/runs/9d9e49b1-d78f-478b-991e-da87b0560512/dag\u001b[0m\n"
]
}
],
"source": [
"latest_run = fe_p_configured()"
]
},
{
"cell_type": "code",
"execution_count": 6,
"id": "69ade540",
"metadata": {},
"outputs": [],
"source": [
"@pipeline\n",
"def training(\n",
" train_dataset_id: Optional[UUID] = None,\n",
" test_dataset_id: Optional[UUID] = None,\n",
" min_train_accuracy: float = 0.0,\n",
" min_test_accuracy: float = 0.0,\n",
"):\n",
" \"\"\"\n",
" Model training pipeline.\n",
"\n",
" This is a pipeline that loads the data, processes it and splits\n",
" it into train and test sets, then search for best hyperparameters,\n",
" trains and evaluates a model.\n",
"\n",
" Args:\n",
" test_size: Size of holdout set for training 0.0..1.0\n",
" drop_na: If `True` NA values will be removed from dataset\n",
" normalize: If `True` dataset will be normalized with MinMaxScaler\n",
" drop_columns: List of columns to drop from dataset\n",
" \"\"\"\n",
" ### ADD YOUR OWN CODE HERE - THIS IS JUST AN EXAMPLE ###\n",
" # Link all the steps together by calling them and passing the output\n",
" # of one step as the input of the next step.\n",
" \n",
" # Execute Feature Engineering Pipeline\n",
" if train_dataset_id is None or test_dataset_id is None:\n",
" dataset_trn, dataset_tst = feature_engineering()\n",
" else:\n",
" dataset_trn = ExternalArtifact(id=train_dataset_id)\n",
" dataset_tst = ExternalArtifact(id=test_dataset_id)\n",
" \n",
" model = model_trainer(\n",
" dataset_trn=dataset_trn,\n",
" )\n",
"\n",
" model_evaluator(\n",
" model=model,\n",
" dataset_trn=dataset_trn,\n",
" dataset_tst=dataset_tst,\n",
" min_train_accuracy=min_train_accuracy,\n",
" min_test_accuracy=min_test_accuracy,\n",
" )\n"
]
},
{
"cell_type": "code",
"execution_count": 7,
"id": "5b1f78df",
"metadata": {},
"outputs": [],
"source": [
"pipeline_args = {}\n",
"pipeline_args[\"config_path\"] = os.path.join(\"configs\", \"training.yaml\")\n",
"fe_t_configured = training.with_options(**pipeline_args)"
]
},
{
"cell_type": "code",
"execution_count": 8,
"id": "acf306a5",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"\u001b[1;35mInitiating a new run for the pipeline: \u001b[0m\u001b[1;36mtraining\u001b[1;35m.\u001b[0m\n",
"\u001b[1;35mRegistered new version: \u001b[0m\u001b[1;36m(version 2)\u001b[1;35m.\u001b[0m\n",
"\u001b[1;35mNew model version \u001b[0m\u001b[1;36m35\u001b[1;35m was created.\u001b[0m\n",
"\u001b[1;35mExecuting a new run.\u001b[0m\n",
"\u001b[1;35mUsing user: \u001b[0m\u001b[1;36malexej@zenml.io\u001b[1;35m\u001b[0m\n",
"\u001b[1;35mUsing stack: \u001b[0m\u001b[1;36mdefault\u001b[1;35m\u001b[0m\n",
"\u001b[1;35m artifact_store: \u001b[0m\u001b[1;36mdefault\u001b[1;35m\u001b[0m\n",
"\u001b[1;35m orchestrator: \u001b[0m\u001b[1;36mdefault\u001b[1;35m\u001b[0m\n",
"\u001b[1;35mStep \u001b[0m\u001b[1;36mdata_loader\u001b[1;35m has started.\u001b[0m\n",
"\u001b[1;35mDataset with 541 records loaded!\u001b[0m\n",
"\u001b[1;35mStep \u001b[0m\u001b[1;36mdata_loader\u001b[1;35m has finished in \u001b[0m\u001b[1;36m7.368s\u001b[1;35m.\u001b[0m\n",
"\u001b[1;35mStep \u001b[0m\u001b[1;36mdata_splitter\u001b[1;35m has started.\u001b[0m\n",
"\u001b[1;35mStep \u001b[0m\u001b[1;36mdata_splitter\u001b[1;35m has finished in \u001b[0m\u001b[1;36m11.009s\u001b[1;35m.\u001b[0m\n",
"\u001b[1;35mStep \u001b[0m\u001b[1;36mdata_preprocessor\u001b[1;35m has started.\u001b[0m\n",
"\u001b[1;35mStep \u001b[0m\u001b[1;36mdata_preprocessor\u001b[1;35m has finished in \u001b[0m\u001b[1;36m14.134s\u001b[1;35m.\u001b[0m\n",
"\u001b[1;35mCaching \u001b[0m\u001b[1;36mdisabled\u001b[1;35m explicitly for \u001b[0m\u001b[1;36mmodel_trainer\u001b[1;35m.\u001b[0m\n",
"\u001b[1;35mStep \u001b[0m\u001b[1;36mmodel_trainer\u001b[1;35m has started.\u001b[0m\n",
"\u001b[1;35mTraining model DecisionTreeClassifier()...\u001b[0m\n",
"\u001b[1;35mStep \u001b[0m\u001b[1;36mmodel_trainer\u001b[1;35m has finished in \u001b[0m\u001b[1;36m7.035s\u001b[1;35m.\u001b[0m\n",
"\u001b[1;35mStep \u001b[0m\u001b[1;36mmodel_evaluator\u001b[1;35m has started.\u001b[0m\n",
"\u001b[1;35mTrain accuracy=100.00%\u001b[0m\n",
"\u001b[1;35mTest accuracy=92.66%\u001b[0m\n",
"\u001b[1;35mImplicitly linking artifact \u001b[0m\u001b[1;36moutput\u001b[1;35m to model \u001b[0m\u001b[1;36mbreast_cancer_classifier\u001b[1;35m version \u001b[0m\u001b[1;36m35\u001b[1;35m.\u001b[0m\n",
"\u001b[1;35mStep \u001b[0m\u001b[1;36mmodel_evaluator\u001b[1;35m has finished in \u001b[0m\u001b[1;36m6.050s\u001b[1;35m.\u001b[0m\n",
"\u001b[1;35mRun \u001b[0m\u001b[1;36mtraining-2023_12_06-09_09_41_413455\u001b[1;35m has finished in \u001b[0m\u001b[1;36m51.278s\u001b[1;35m.\u001b[0m\n",
"\u001b[1;35mDashboard URL: https://1cf18d95-zenml.cloudinfra.zenml.io/workspaces/default/pipelines/787c6360-4499-4e2e-8d50-edaaa3956a6f/runs/2a335b9c-bb8e-425c-80e2-0a6cc0ffe56a/dag\u001b[0m\n"
]
}
],
"source": [
"fe_t_configured()"
]
},
{
"cell_type": "code",
"execution_count": 9,
"id": "ad6aa280",
"metadata": {},
"outputs": [],
"source": [
"from typing import Optional\n",
"\n",
"import pandas as pd\n",
"from typing_extensions import Annotated\n",
"\n",
"from zenml import get_step_context, step\n",
"from zenml.logger import get_logger\n",
"\n",
"logger = get_logger(__name__)\n",
"\n",
"\n",
"@step\n",
"def inference_predict(\n",
" dataset_inf: pd.DataFrame,\n",
") -> Annotated[pd.Series, \"predictions\"]:\n",
" \"\"\"Predictions step.\n",
"\n",
" This is an example of a predictions step that takes the data in and returns\n",
" predicted values.\n",
"\n",
" This step is parameterized, which allows you to configure the step\n",
" independently of the step code, before running it in a pipeline.\n",
" In this example, the step can be configured to use different input data.\n",
" See the documentation for more information:\n",
"\n",
" https://docs.zenml.io/user-guide/advanced-guide/configure-steps-pipelines\n",
"\n",
" Args:\n",
" dataset_inf: The inference dataset.\n",
"\n",
" Returns:\n",
" The predictions as pandas series\n",
" \"\"\"\n",
" ### ADD YOUR OWN CODE HERE - THIS IS JUST AN EXAMPLE ###\n",
" model_version = get_step_context().model_version\n",
"\n",
" print(model_version)\n",
"\n",
" # run prediction from memory\n",
" predictor = model_version.load_artifact(\"model\")\n",
" predictions = predictor.predict(dataset_inf)\n",
"\n",
" print(predictions)\n",
" predictions = pd.Series(predictions, name=\"predicted\")\n",
" ### YOUR CODE ENDS HERE ###\n",
"\n",
" return predictions\n"
]
},
{
"cell_type": "code",
"execution_count": 10,
"id": "517ad39d",
"metadata": {},
"outputs": [],
"source": [
"@pipeline\n",
"def batch_inference():\n",
" \"\"\"\n",
" Model batch inference pipeline.\n",
"\n",
" This is a pipeline that loads the inference data, processes\n",
" it, analyze for data drift and run inference.\n",
" \"\"\"\n",
" ### ADD YOUR OWN CODE HERE - THIS IS JUST AN EXAMPLE ###\n",
" # Link all the steps together by calling them and passing the output\n",
" # of one step as the input of the next step.\n",
" ########## ETL stage ##########\n",
" random_state = client.get_artifact(\"dataset\").run_metadata[\"random_state\"].value\n",
" target = client.get_artifact(\"dataset_trn\").run_metadata['target'].value\n",
" df_inference = data_loader(\n",
" random_state=random_state, is_inference=True\n",
" )\n",
" df_inference = inference_preprocessor(\n",
" dataset_inf=df_inference,\n",
" preprocess_pipeline=ExternalArtifact(name=\"preprocess_pipeline\"),\n",
" target=target,\n",
" )\n",
" inference_predict(\n",
" dataset_inf=df_inference,\n",
" )\n"
]
},
{
"cell_type": "code",
"execution_count": 11,
"id": "f0d9ebb6",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"\u001b[1;35m\u001b[0m\u001b[1;36mversion\u001b[1;35m \u001b[0m\u001b[1;36mproduction\u001b[1;35m matches one of the possible \u001b[0m\u001b[1;36mModelStages\u001b[1;35m and will be fetched using stage.\u001b[0m\n"
]
}
],
"source": [
"pipeline_args = {}\n",
"pipeline_args[\"config_path\"] = os.path.join(\"configs\", \"inference.yaml\")\n",
"fe_b_configured = batch_inference.with_options(**pipeline_args)"
]
},
{
"cell_type": "code",
"execution_count": 13,
"id": "9901c6d0",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"\u001b[33mUsing an external artifact as step input currently invalidates caching for the step and all downstream steps. Future releases will introduce hashing of artifacts which will improve this behavior.\u001b[0m\n",
"\u001b[1;35mInitiating a new run for the pipeline: \u001b[0m\u001b[1;36mbatch_inference\u001b[1;35m.\u001b[0m\n",
"\u001b[1;35mReusing registered version: \u001b[0m\u001b[1;36m(version: 1)\u001b[1;35m.\u001b[0m\n",
"\u001b[1;35mExecuting a new run.\u001b[0m\n",
"\u001b[1;35mUsing user: \u001b[0m\u001b[1;36malexej@zenml.io\u001b[1;35m\u001b[0m\n",
"\u001b[1;35mUsing stack: \u001b[0m\u001b[1;36mdefault\u001b[1;35m\u001b[0m\n",
"\u001b[1;35m artifact_store: \u001b[0m\u001b[1;36mdefault\u001b[1;35m\u001b[0m\n",
"\u001b[1;35m orchestrator: \u001b[0m\u001b[1;36mdefault\u001b[1;35m\u001b[0m\n",
"\u001b[1;35mUsing cached version of \u001b[0m\u001b[1;36mdata_loader\u001b[1;35m.\u001b[0m\n",
"\u001b[1;35mStep \u001b[0m\u001b[1;36mdata_loader\u001b[1;35m has started.\u001b[0m\n",
"\u001b[1;35mStep \u001b[0m\u001b[1;36minference_preprocessor\u001b[1;35m has started.\u001b[0m\n",
"\u001b[1;35mStep \u001b[0m\u001b[1;36minference_preprocessor\u001b[1;35m has finished in \u001b[0m\u001b[1;36m8.661s\u001b[1;35m.\u001b[0m\n",
"\u001b[1;35mStep \u001b[0m\u001b[1;36minference_predict\u001b[1;35m has started.\u001b[0m\n",
"name='breast_cancer_classifier' license='Apache 2.0' description='Classification of Breast Cancer Dataset.' audience=None use_cases=None limitations=None trade_offs=None ethics=None tags=['classification', 'sklearn'] version='production' save_models_to_registry=True suppress_class_validation_warnings=True was_created_in_this_run=False\n",
"\u001b[33mYou specified both an ID as well as a version of the artifacts. Ignoring the version and fetching the artifacts by ID.\u001b[0m\n",
"\u001b[33mYour artifact was materialized under Python version 'unknown' but you are currently using '3.9.13'. This might cause unexpected behavior since pickle is not reproducible across Python versions. Attempting to load anyway...\u001b[0m\n",
"\u001b[33mCould not import Azure service connector: No module named 'azure.identity'.\u001b[0m\n",
"[1 0 0 1 1 0 0 0 0 1 1 0 1 0 1 0 1 1 1 0 0 1 0 1 1 1 1 1]\n",
"\u001b[1;35mStep \u001b[0m\u001b[1;36minference_predict\u001b[1;35m has finished in \u001b[0m\u001b[1;36m18.218s\u001b[1;35m.\u001b[0m\n",
"\u001b[1;35mRun \u001b[0m\u001b[1;36mbatch_inference-2023_12_06-09_11_29_924914\u001b[1;35m has finished in \u001b[0m\u001b[1;36m32.726s\u001b[1;35m.\u001b[0m\n",
"\u001b[1;35mDashboard URL: https://1cf18d95-zenml.cloudinfra.zenml.io/workspaces/default/pipelines/2979acb2-c862-480a-8f50-a2be4c76a8a2/runs/7886e370-b05a-4205-931e-e4994fabd897/dag\u001b[0m\n"
]
}
],
"source": [
"fe_b_configured()"
]
},
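{
"cell_type": "markdown",
"id": "7f3a2b1c-5d6e-4f8a-9b0c-1d2e3f4a5b6c",
"metadata": {},
"source": [
"After the run, the named `predictions` output can be fetched back from the artifact store. A minimal sketch using the `client` created above; it assumes the batch inference run completed on this stack and that artifact responses expose `load()`, as in the `outputs[...].load()` example later in this notebook:"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "8a4b3c2d-6e7f-4a9b-0c1d-2e3f4a5b6c7d",
"metadata": {},
"outputs": [],
"source": [
"# Fetch the latest \"predictions\" artifact (named via Annotated in the\n",
"# inference_predict step above) and load it back as a pandas Series.\n",
"predictions = client.get_artifact(\"predictions\").load()\n",
"print(predictions.head())"
]
},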
{
"cell_type": "markdown",
"id": "51690802-31a7-4e6d-9f88-e6457c6c4a96",
"metadata": {},
"source": [
"# Huggingface Model to Sagemaker Endpoint: Automating MLOps with ZenML\n",
"Deploying Huggingface models to AWS Sagemaker endpoints typically only requires a few lines of code. However, there's a growing demand to not just deploy, but to seamlessly automate the entire flow from training to production with comprehensive lineage tracking. ZenML adeptly fills this niche, providing an end-to-end MLOps solution for Huggingface users wishing to deploy to Sagemaker. Below, we’ll walk through the architecture that ZenML employs to bring a Huggingface model into production with AWS Sagemaker. Of course all of this can be adapted to not just Sagemaker, but any other model deployment service like GCP Vertex or Azure ML Platform.\n",
"\n",
"This blog post showcases one way of using ZenML pipelines to achieve this:\n",
"\n",
"- Create and version a dataset in a feature_engineering_pipeline.\n",
"- Train/Finetune a BERT-based Sentiment Analysis NLP model and push to Huggingface Hub in a training_pipeline.\n",
"- Promote this model to Production by comparing to previous models in a promotion_pipeline.\n",
"- Deploy the model at the Production Stage to a AWS Sagemaker endpoint with a deployment_pipeline.\n",
"\n",
""
]
},
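{
"cell_type": "markdown",
"id": "9b5c4d3e-7f8a-4b0c-1d2e-3f4a5b6c7d8e",
"metadata": {},
"source": [
"For reference, here is what those \"few lines of code\" look like with the plain `sagemaker` SDK, outside of ZenML. This is a minimal sketch: the model id, framework versions, and instance type are illustrative assumptions, not values used by the pipelines below."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "0c6d5e4f-8a9b-4c1d-2e3f-4a5b6c7d8e9f",
"metadata": {},
"outputs": [],
"source": [
"# Minimal sketch: deploy a Huggingface Hub model to a Sagemaker endpoint\n",
"# with the plain sagemaker SDK. HF_MODEL_ID, the framework versions, and\n",
"# the instance type are illustrative assumptions.\n",
"import sagemaker\n",
"from sagemaker.huggingface import HuggingFaceModel\n",
"\n",
"role = sagemaker.get_execution_role()  # requires an active Sagemaker session\n",
"\n",
"huggingface_model = HuggingFaceModel(\n",
" env={\n",
" \"HF_MODEL_ID\": \"distilbert-base-uncased-finetuned-sst-2-english\",\n",
" \"HF_TASK\": \"text-classification\",\n",
" },\n",
" role=role,\n",
" transformers_version=\"4.26\",\n",
" pytorch_version=\"1.13\",\n",
" py_version=\"py39\",\n",
")\n",
"\n",
"predictor = huggingface_model.deploy(\n",
" initial_instance_count=1,\n",
" instance_type=\"ml.m5.xlarge\",\n",
")\n",
"print(predictor.predict({\"inputs\": \"I love this airline!\"}))"
]
},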
{
"cell_type": "code",
"execution_count": null,
"id": "500e3c24-b105-4a69-b2fc-e0ce1f1c1d46",
"metadata": {},
"outputs": [],
"source": [
"# Do the imports at the top\n",
"\n",
"import numpy as np\n",
"from datasets import DatasetDict, load_dataset\n",
"from typing_extensions import Annotated\n",
"from zenml import step\n",
"from zenml.logger import get_logger\n",
"\n",
"import os\n",
"from typing import Optional\n",
"from datetime import datetime as dt\n",
"\n",
"from zenml import pipeline\n",
"from zenml.model import ModelConfig\n",
"\n",
"from steps import (\n",
" data_loader,\n",
" notify_on_failure,\n",
" tokenization_step,\n",
" tokenizer_loader,\n",
" generate_reference_and_comparison_datasets,\n",
")\n",
"from zenml.integrations.evidently.metrics import EvidentlyMetricConfig\n",
"from zenml.integrations.evidently.steps import (\n",
" EvidentlyColumnMapping,\n",
" evidently_report_step,\n",
")\n",
"\n",
"from pipelines import (\n",
" sentinment_analysis_deploy_pipeline,\n",
" sentinment_analysis_promote_pipeline,\n",
" sentinment_analysis_training_pipeline,\n",
")\n",
"\n",
"logger = get_logger(__name__)"
]
},
{
"cell_type": "markdown",
"id": "fc77b660-e206-46b1-a924-407e797a8f47",
"metadata": {},
"source": [
"# 🍳Breaking it down\n",
"\n",
"\n",
"\n"
]
},
{
"cell_type": "markdown",
"id": "31edaf46-6981-42be-99b7-9bdd91c160d5",
"metadata": {},
"source": [
"## 👶 Step 1: Start with feature engineering\n",
"\n",
"Automated feature engineering forms the foundation of this MLOps workflow. Thats why the first pipeline is the feature engineering pipeline. This pipeline loads some data from Huggingface and uses a base tokenizer to create a tokenized dataset. The data loader step is a simple Python function that returns a Huggingface dataloader object:"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "35de0e4c-b6f8-4b68-927a-f40e4130dc93",
"metadata": {},
"outputs": [],
"source": [
"@step\n",
"def data_loader() -> Annotated[DatasetDict, \"dataset\"]:\n",
" logger.info(f\"Loading dataset airline_reviews... \")\n",
" hf_dataset = load_dataset(\"Shayanvsf/US_Airline_Sentiment\")\n",
" hf_dataset = hf_dataset.rename_column(\"airline_sentiment\", \"label\")\n",
" hf_dataset = hf_dataset.remove_columns(\n",
" [\"airline_sentiment_confidence\", \"negativereason_confidence\"]\n",
" )\n",
" return hf_dataset"
]
},
{
"cell_type": "markdown",
"id": "49e4462c-1e64-48d3-bae7-76696a958646",
"metadata": {},
"source": [
"Notice that you can give each dataset a name with Python’s Annotated object. The DatasetDict is a native Huggingface dataset which ZenML knows how to persist through steps. This flow ensures reproducibility and version control for every dataset iteration.\n",
"\n",
"Also notice this is a simple Python function, that can be called with the `entrypoint` wrapper:"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "18144a6b-c266-453d-82c8-b5d6aa1be0aa",
"metadata": {},
"outputs": [],
"source": [
"hf_dataset = data_loader.entrypoint()\n",
"print(hf_dataset)"
]
},
{
"cell_type": "markdown",
"id": "31330d3c-044f-4912-8d36-74146f48cecf",
"metadata": {},
"source": [
"Now we put this a full feature engineering pipeline. Each run of the feature engineering pipeline produces a new dataset to use for the training pipeline. ZenML versions this data as it flows through the pipeline.\n",
"\n",
""
]
},
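{
"cell_type": "markdown",
"id": "c1d2e3f4-a5b6-4c7d-8e9f-0a1b2c3d4e5f",
"metadata": {},
"source": [
"As a quick sanity check, you can fetch the latest `dataset` artifact back from the artifact store by name. A minimal sketch using the same `Client` API as elsewhere in this notebook; it assumes the pipeline has already run at least once on the active stack:"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "d2e3f4a5-b6c7-4d8e-9f0a-1b2c3d4e5f6a",
"metadata": {},
"outputs": [],
"source": [
"# Fetch the latest \"dataset\" artifact (named via Annotated in the data\n",
"# loader step above). Assumes at least one prior run on this stack.\n",
"from zenml.client import Client\n",
"\n",
"client = Client()\n",
"dataset_artifact = client.get_artifact(\"dataset\")\n",
"print(dataset_artifact.id)"
]
},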
{
"cell_type": "markdown",
"id": "9511bd84-1e97-42db-9b75-06285cc6904c",
"metadata": {},
"source": [
"### Set your stack"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "76f3a7e7-0d85-43b3-9e9f-4c7f20ea65e6",
"metadata": {},
"outputs": [],
"source": [
"!zenml stack describe hf-sagemaker-local"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "04b0bf69-70c6-4408-b18c-95df9e030c0c",
"metadata": {},
"outputs": [],
"source": [
"!zenml stack set hf-sagemaker-local"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "de5398a4-a9ec-42d6-bbd6-390244c52d13",
"metadata": {},
"outputs": [],
"source": [
"!zenml stack get"
]
},
{
"cell_type": "markdown",
"id": "152f718d-70c2-4a29-a73e-37db85675cb8",
"metadata": {},
"source": [
"### Run the pipeline"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "7ca6c41e-e4b3-46d2-8264-9a453ac9aa3c",
"metadata": {
"scrolled": true
},
"outputs": [],
"source": [
"@pipeline(on_failure=notify_on_failure)\n",
"def sentinment_analysis_feature_engineering_pipeline(\n",
" lower_case: Optional[bool] = True,\n",
" padding: Optional[str] = \"max_length\",\n",
" max_seq_length: Optional[int] = 128,\n",
" text_column: Optional[str] = \"text\",\n",
" label_column: Optional[str] = \"label\",\n",
"):\n",
" # Link all the steps together by calling them and passing the output\n",
" # of one step as the input of the next step.\n",
"\n",
" ########## Load Dataset stage ##########\n",
" dataset = data_loader()\n",
"\n",
" ########## Data Quality stage ##########\n",
" reference_dataset, comparison_dataset = generate_reference_and_comparison_datasets(\n",
" dataset\n",
" )\n",
" text_data_report = evidently_report_step.with_options(\n",
" parameters=dict(\n",
" column_mapping=EvidentlyColumnMapping(\n",
" target=\"label\",\n",
" text_features=[\"text\"],\n",
" ),\n",
" metrics=[\n",
" EvidentlyMetricConfig.metric(\"DataQualityPreset\"),\n",
" EvidentlyMetricConfig.metric(\n",
" \"TextOverviewPreset\", column_name=\"text\"\n",
" ),\n",
" ],\n",
" # We need to download the NLTK data for the TextOverviewPreset\n",
" download_nltk_data=True,\n",
" ),\n",
" )\n",
" text_data_report(reference_dataset, comparison_dataset)\n",
"\n",
" ########## Tokenization stage ##########\n",
" tokenizer = tokenizer_loader(lower_case=lower_case)\n",
" tokenized_data = tokenization_step(\n",
" dataset=dataset,\n",
" tokenizer=tokenizer,\n",
" padding=padding,\n",
" max_seq_length=max_seq_length,\n",
" text_column=text_column,\n",
" label_column=label_column,\n",
" )\n",
" return tokenizer, tokenized_data"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "3c8a5be7-ebaa-41c4-ac23-4afc6e7e06aa",
"metadata": {},
"outputs": [],
"source": [
"# Run a pipeline with the required parameters. \n",
"no_cache: bool = True\n",
"zenml_model_name: str = \"distil_bert_sentiment_analysis\"\n",
"max_seq_length = 512\n",
"\n",
"# This executes all steps in the pipeline in the correct order using the orchestrator\n",
"# stack component that is configured in your active ZenML stack.\n",
"model_config = ModelConfig(\n",
" name=zenml_model_name,\n",
" license=\"Apache 2.0\",\n",
" description=\"Show case Model Control Plane.\",\n",
" create_new_model_version=True,\n",
" delete_new_version_on_failure=True,\n",
" tags=[\"sentiment_analysis\", \"huggingface\"],\n",
")\n",
"\n",
"pipeline_args = {}\n",
"\n",
"if no_cache:\n",
" pipeline_args[\"enable_cache\"] = False\n",
"\n",
"# Execute Feature Engineering Pipeline\n",
"pipeline_args[\"model_config\"] = model_config\n",
"pipeline_args[\"config_path\"] = os.path.join(\"configs\", \"feature_engineering_config.yaml\")\n",
"run_args_feature = {\n",
" \"max_seq_length\": max_seq_length,\n",
"}\n",
"pipeline_args[\n",
" \"run_name\"\n",
"] = f\"sentinment_analysis_feature_engineering_pipeline_run_{dt.now().strftime('%Y_%m_%d_%H_%M_%S')}\"\n",
"p = sentinment_analysis_feature_engineering_pipeline.with_options(**pipeline_args)\n",
"p(**run_args_feature)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "0e7c1ea2-64fe-478a-9963-17c7b7f62110",
"metadata": {},
"outputs": [],
"source": [
"from zenml.client import Client\n",
"from IPython.display import display, HTML\n",
"\n",
"client = Client()\n",
"# CHANGE THIS TO THE LATEST RUN ID\n",
"latest_run = client.get_pipeline_run(\"sentinment_analysis_feature_engineering_pipeline_run_2023_11_21_10_55_56\")\n",
"html = latest_run.steps[\"evidently_report_step\"].outputs['report_html'].load()\n",
"display(HTML(html))"
]
},
{
"cell_type": "markdown",
"id": "78ab8771-4421-4975-a3d5-12892a56b805",
"metadata": {},
"source": [
"## 💪 Step 2: Train the model with Huggingface Hub as the model registry\n",
" "
]
},
{
"cell_type": "markdown",
"id": "2843efa8-32b6-4b13-ac85-33c99cc94e3e",
"metadata": {},
"source": [
"Once the feature engineering pipeline has run a few times, we have many datasets to choose from. We can feed our desired one into a function that trains the model on the data. Thanks to the ZenML Huggingface integration, this data is loaded directly from the ZenML artifact store.\n",
"\n",
"\n",
"\n",
"On the left side, we see our local MLOps stack, which defines our infrastructure and tooling we are using for this particular pipeline. ZenML makes it easy to run on a local stack on your development machine, or switch out the stack to run on a AWS Kubeflow-based stack (if you want to scale up).\n",
"\n",
"On the right side is the new kid on the block - the ZenML Model Control Plane. The Model Control Plane is a new feature in ZenML that allows users to have a complete overview of their machine learning models. It allows teams to consolidate all artifacts related to their ML models into one place, and manage its lifecycle easily as you can see from this view from the ZenML Cloud:"
]
},
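{
"cell_type": "markdown",
"id": "e3f4a5b6-c7d8-4e9f-0a1b-2c3d4e5f6a7b",
"metadata": {},
"source": [
"If your ZenML version ships the Model Control Plane CLI, you can also list the registered models from the command line (shown here as an assumption about the CLI, not a required step):"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "f4a5b6c7-d8e9-4f0a-1b2c-3d4e5f6a7b8c",
"metadata": {},
"outputs": [],
"source": [
"# Assumes a ZenML version that includes the `zenml model` CLI group\n",
"!zenml model list"
]
},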
{
"cell_type": "code",
"execution_count": null,
"id": "4c99b20f-8e3b-4119-86e9-33dd1395470a",
"metadata": {},
"outputs": [],
"source": [
"pipeline_args[\"config_path\"] = os.path.join(\"configs\", \"trainer_config.yaml\")\n",
"\n",
"pipeline_args[\"enable_cache\"] = True\n",
"\n",
"run_args_train = {\n",
" \"num_epochs\": 1,\n",
" \"train_batch_size\": 64,\n",
" \"eval_batch_size\": 64,\n",
" \"learning_rate\": 2e-4,\n",
" \"weight_decay\": 0.01,\n",
" \"max_seq_length\": 512,\n",
"}\n",
"\n",
"# Use versioned artifacts from the last step\n",
"# run_args_train[\"dataset_artifact_id\"] = latest_run.steps['tokenization_step'].output.id\n",
"# run_args_train[\"tokenizer_artifact_id\"] = latest_run.steps['tokenizer_loader'].output.id\n",
"\n",
"# Configure the model\n",
"pipeline_args[\"model_config\"] = model_config\n",
"\n",
"pipeline_args[\n",
" \"run_name\"\n",
"] = f\"sentinment_analysis_training_run_{dt.now().strftime('%Y_%m_%d_%H_%M_%S')}\""
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "96592299-0090-4d2a-962e-6ca232c1fb75",
"metadata": {},
"outputs": [],
"source": [
"sentinment_analysis_training_pipeline.with_options(**pipeline_args)(\n",
" **run_args_train\n",
")"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "e24e29de-6d1b-41da-9ab2-ca2b32f1f540",
"metadata": {},
"outputs": [],
"source": [
"### Check out a new stack\n",
"!zenml stack describe hf-sagemaker-airflow"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "7c9a5bee-8465-4d41-888a-093f1f6a2ef1",
"metadata": {},
"outputs": [],
"source": [
"### Change the stack\n",
"!zenml stack set hf-sagemaker-airflow"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "d3772c50-1c90-4ffc-8394-c9cfca16cc53",
"metadata": {},
"outputs": [],
"source": [
"sentinment_analysis_training_pipeline.with_options(**pipeline_args)(\n",
" **run_args_train\n",
")"
]
},
{
"cell_type": "markdown",
"id": "be79f454-a45d-4f5f-aa93-330d52069124",
"metadata": {},
"source": [
"## 🫅 Step 3: Promote the model to production\n"
]
},
{
"cell_type": "markdown",
"id": "5a09b432-7a66-473e-bdb6-ffdca730498b",
"metadata": {},
"source": [
"Following training, the automated promotion pipeline evaluates models against predefined metrics, identifying and marking the most performant one as 'Production ready'. This is another common use case for the Model Control Plane; we store the relevant metrics there to access them easily later.\n",
"\n",
""
]
},
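{
"cell_type": "markdown",
"id": "a5b6c7d8-e9f0-4a1b-2c3d-4e5f6a7b8c9d",
"metadata": {},
"source": [
"The comparison itself boils down to a simple rule. Here is an illustrative, ZenML-free sketch of the decision the promotion step makes; the metric values below are hypothetical stand-ins for what the pipeline records in the Model Control Plane:"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "b6c7d8e9-f0a1-4b2c-3d4e-5f6a7b8c9d0e",
"metadata": {},
"outputs": [],
"source": [
"# Illustrative promotion rule: promote the candidate only if it beats the\n",
"# current production model on the stored evaluation metric. The numbers\n",
"# are hypothetical stand-ins.\n",
"def should_promote(candidate_accuracy: float, production_accuracy: float) -> bool:\n",
" return candidate_accuracy > production_accuracy\n",
"\n",
"print(should_promote(candidate_accuracy=0.93, production_accuracy=0.91))"
]
},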
{
"cell_type": "code",
"execution_count": null,
"id": "5bac7ae5-70d0-449c-929c-e175c3062f2d",
"metadata": {},
"outputs": [],
"source": [
"!zenml stack set hf-sagemaker-local"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "170c9ef6-4e6f-4e50-ac37-e05bef8570ea",
"metadata": {},
"outputs": [],
"source": [
"run_args_promoting = {}\n",
"model_config = ModelConfig(name=zenml_model_name)\n",
"pipeline_args[\"config_path\"] = os.path.join(\"configs\", \"promoting_config.yaml\")\n",
"\n",
"pipeline_args[\"model_config\"] = model_config\n",
"\n",
"pipeline_args[\n",
" \"run_name\"\n",
"] = f\"sentinment_analysis_promoting_pipeline_run_{dt.now().strftime('%Y_%m_%d_%H_%M_%S')}\""
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "e6df11e2-4591-4186-a8f8-243f9c4d1e3d",
"metadata": {},
"outputs": [],
"source": [
"sentinment_analysis_promote_pipeline.with_options(**pipeline_args)(\n",
" **run_args_promoting\n",
")"
]
},
{
"cell_type": "markdown",
"id": "6efc4968-35fd-42e3-ba62-d8e1557aa0d6",
"metadata": {},
"source": [
"## 💯 Step 4: Deploy the model to AWS Sagemaker Endpoints\n"
]
},
{
"cell_type": "markdown",
"id": "577aff86-bde9-48d4-9b52-209cfed9fd4e",
"metadata": {},
"source": [
"This is the final step to automate the deployment of the slated production model to a Sagemaker endpoint. The deployment pipelines handles the complexities of AWS interactions and ensures that the model, along with its full history and context, is transitioned into a live environment ready for use. Here again we use the Model Control Plane interface to query the Huggingface revision and use that information to push to Huggingface Hub.\n",
"\n",
"\n"
]
},
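{
"cell_type": "markdown",
"id": "c7d8e9f0-a1b2-4c3d-8e5f-6a7b8c9d0e1f",
"metadata": {},
"source": [
"Looking up a Hub revision is plain `huggingface_hub` usage. A minimal sketch with an illustrative repo id; the actual pipeline resolves the repository and revision from the Model Control Plane:"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "d8e9f0a1-b2c3-4d4e-9f6a-7b8c9d0e1f2a",
"metadata": {},
"outputs": [],
"source": [
"# Minimal sketch: resolve the current commit revision of a Hub repo so a\n",
"# deployment step can pin exactly those weights. The repo id is an\n",
"# illustrative assumption.\n",
"from huggingface_hub import HfApi\n",
"\n",
"api = HfApi()\n",
"info = api.model_info(\"your-org/sentiment-analysis-model\")\n",
"print(info.sha)  # commit hash that a deployment step could pin"
]
},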
{
"cell_type": "code",
"execution_count": null,
"id": "1513ab5f-de05-4344-9d2c-fedbfbd21ef0",
"metadata": {},
"outputs": [],
"source": [
"!zenml stack set hf-sagemaker-local"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "606fdb3c-4eca-4d32-bccb-280743d15528",
"metadata": {},
"outputs": [],
"source": [
"pipeline_args[\"config_path\"] = os.path.join(\"configs\", \"deploying_config.yaml\")\n",
"\n",
"# Deploying pipeline has new ZenML model config\n",
"model_config = ModelConfig(\n",
" name=zenml_model_name,\n",
" version=ModelStages.PRODUCTION,\n",
")\n",
"pipeline_args[\"model_config\"] = model_config\n",
"pipeline_args[\"enable_cache\"] = False\n",
"run_args_deploying = {}\n",
"pipeline_args[\n",
" \"run_name\"\n",
"] = f\"sentinment_analysis_deploy_pipeline_run_{dt.now().strftime('%Y_%m_%d_%H_%M_%S')}\""
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "87f1f982-ab96-4207-8e7e-e318473587e9",
"metadata": {},
"outputs": [],
"source": [
"sentinment_analysis_deploy_pipeline.with_options(**pipeline_args)(\n",
" **run_args_deploying\n",
")"
]
},
{
"cell_type": "markdown",
"id": "594ee4fc-f102-4b99-bdc3-2f1670c87679",
"metadata": {},
"source": [
"ZenML builds upon the straightforward deployment capability of Huggingface models to AWS Sagemaker, and transforms it into a sophisticated, repeatable, and transparent MLOps workflow. It takes charge of the intricate steps necessary for modern ML systems, ensuring that software engineering leads can focus on iteration and innovation rather than operational intricacies.\n",
"\n",
"To delve deeper into each stage, refer to the comprehensive guide on GitHub[: zenml-io/zenml-huggingface-sagemak](https://github.com/zenml-io/zenml-huggingface-sagemaker)er. Additionally[, this YouTube playli](https://www.youtube.com/watch?v=Q1EH2H8Akgo&list=PLhNrLW_IWplw6dBbmGcL828-atJMu3CwF)st provides a detailed visual walkthrough of the entire pipeline: Huggingface to Sagemaker ZenML tutorial.\n",
"\n",
"Interested in standardizing your MLOps workflows? ZenML Cloud is now available to all - get a managed ZenML server with important features such as RBAC and pipeline trigge[rs. Book a ](https://zenml.io/book-a-demo)demo with us now to learn how you can create your own MLOps pipelines today."
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.9.13"
}
},
"nbformat": 4,
"nbformat_minor": 5
}