Building MLOps on AWS (Part 2)

Suman Gautam
Feb 20, 2023

Machine Learning orchestration with AWS StepFunctions

Photo by Ludde Lorentz on Unsplash

This tutorial continues the series on building a Machine Learning workflow with Amazon SageMaker and other AWS services. In part 1, I mostly discussed how to perform simple data preprocessing, training and inference using Amazon SageMaker. That is enough if you are testing your Machine Learning model on your local machine or a single AWS instance. In the real world, we need to perform these kinds of tasks in a repeatable fashion (often in real time or at a fixed frequency). In such cases, an automation pipeline becomes necessary. AWS offers multiple options for this, e.g., SageMaker Pipelines and StepFunctions. In this tutorial, I will focus on how to incorporate SageMaker Machine Learning steps within AWS StepFunctions to create a simple orchestration pipeline.

We will jump right into the AWS StepFunctions workflow, but if you need a refresher on general ML implementation in SageMaker, I have included a link to part 1 of this tutorial at the end of this page.

Our code will run in a Python kernel and, of course, Jupyter!

Install necessary libraries

import sys
!{sys.executable} -m pip install --upgrade pip
!{sys.executable} -m pip install --upgrade stepfunctions

One of the many headaches of using different libraries is running into version mismatches, and you will very likely get bogged down in that mire at some point. One of the earliest errors you may encounter involves boto3 and botocore. If you get an error about a version conflict with botocore or awscli, do something like the following. (Do not copy the version numbers shown below; check which versions your own notebook needs!)

!pip uninstall botocore==1.24.19 --yes
!pip install botocore==1.27.67

# Make sure to install a suitable version of awscli as well

!pip uninstall awscli --yes
!pip install --upgrade awscli

Restart the kernel!

If you don’t restart your kernel, your version update won’t work. It took me many hours (perhaps days) to figure this out.

You can restart it from within the Jupyter notebook:

import IPython
IPython.Application.instance().kernel.do_shutdown(True)
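
Once the kernel is back, it can be worth confirming which versions are actually active before going further. The snippet below is a minimal sketch using only the standard library; the helper name installed_versions is my own, and the list of packages is just the set mentioned in this tutorial.

```python
from importlib import metadata


def installed_versions(packages):
    """Return {package: version or None} for the given distribution names."""
    versions = {}
    for name in packages:
        try:
            versions[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            # The package is not installed in the environment behind this kernel.
            versions[name] = None
    return versions


if __name__ == "__main__":
    packages = ["boto3", "botocore", "awscli", "sagemaker", "stepfunctions"]
    for name, version in installed_versions(packages).items():
        print("{}: {}".format(name, version or "not installed"))
```

If the printed botocore version is still the old one, the kernel most likely was not restarted after the install.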

Import StepFunctions related libraries

import io
import logging
import os
import random
import time
import uuid
#from stepfunctions.steps import *
import stepfunctions
from stepfunctions import steps
from stepfunctions.inputs import ExecutionInput
from stepfunctions.steps import (
    Chain,
    ChoiceRule,
    ModelStep,
    ProcessingStep,
    TrainingStep,
    TransformStep,
)
from stepfunctions.workflow import Workflow
from stepfunctions.template import TrainingPipeline
from stepfunctions.template.utils import replace_parameters_with_jsonpath
stepfunctions.set_stream_logger(level=logging.INFO)

SageMaker related libraries

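import boto3
import re
import json
from time import gmtime, strftime
import numpy as np
import pandas as pd
import sagemaker
from sagemaker import get_execution_role
from sagemaker.amazon.amazon_estimator import image_uris
# Classes used in the ingestion and processing steps below
from sagemaker.network import NetworkConfig
from sagemaker.processing import ProcessingInput, ProcessingOutput, ScriptProcessor
from sagemaker.sklearn.processing import SKLearnProcessor
sess = sagemaker.Session()
region = boto3.session.Session().region_name
bucket = "<your bucket>"
prefix = "<folder to go inside the bucket>"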

Define roles:

You can use sagemaker.get_execution_role() if you are running inside a SageMaker notebook instance:

# Replace with the role ARN if not running in a SageMaker notebook
sagemaker_execution_role = sagemaker.get_execution_role()

Copy your AmazonSageMaker-StepFunctionsWorkflowExecutionRole ARN and paste it into workflow_execution_role, which looks like:

workflow_execution_role = '<arn:aws:iam::0123456789:role/AmazonSageMaker-StepFunctionsWorkflowExecutionRole...>'
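
A pasted ARN with a stray bracket or a truncated account ID tends to surface only later as a confusing permissions error, so a quick shape check can save time. This is a rough sketch: the helper name looks_like_role_arn is hypothetical, and the pattern only checks the general form of an IAM role ARN (partition, 12-digit account ID, role path and name), not whether the role exists.

```python
import re

# General shape of an IAM role ARN: partition, 12-digit account ID, role path/name.
ROLE_ARN_PATTERN = re.compile(
    r"arn:(aws|aws-cn|aws-us-gov):iam::\d{12}:role/[\w+=,.@/-]+"
)


def looks_like_role_arn(arn):
    """Return True if the string has the general shape of an IAM role ARN."""
    return ROLE_ARN_PATTERN.fullmatch(arn) is not None
```

For example, looks_like_role_arn(workflow_execution_role) returns False while the placeholder above (with its angle brackets) is still in place.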

Docker Images:

In the part 1 tutorial, I discussed in detail how to create and use a custom Docker container for ML in SageMaker. We will continue to use the Docker images created there. Since we are using custom Docker images for everything, define them here:

preprocessing_image = '< preprocessing container>'
training_image = '< training container>'
inference_image = '< inference container>'

You can have a single docker container to include all the preprocessing, training and inference scripts. In my case, I combined training and inference into one container and have a separate container for preprocessing.

Workflow

Our current workflow is a simple linear workflow that performs automated data ingestion through to model inference on batch data:

data ingestion --> processing --> training --> inference 

In AWS StepFunctions, we can chain these steps in a workflow. To keep the naming of each of these steps consistent, we should use a structure defined in an ExecutionInput schema:

Execution Inputs:

execution_input = ExecutionInput(
    schema={
        "IngestionJobName": str,
        "PreprocessingJobName": str,
        "TrainingJobName": str,
        "ModelName": str,
        "InferenceJobName": str,
    }
)

The keys inside the quotation marks are placeholders: their values are supplied when the workflow is executed, and you pass execution_input["<key>"] to the corresponding step’s job_name (or model_name) parameter.
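
To make the idea of a schema concrete, here is a plain-Python illustration of checking a dictionary of inputs against the same keys and types. It is only a sketch of the concept and is independent of whatever validation the stepfunctions SDK performs itself; the function name check_execution_inputs is hypothetical.

```python
EXPECTED_SCHEMA = {
    "IngestionJobName": str,
    "PreprocessingJobName": str,
    "TrainingJobName": str,
    "ModelName": str,
    "InferenceJobName": str,
}


def check_execution_inputs(schema, inputs):
    """Return a list of human-readable problems; an empty list means the inputs fit."""
    problems = []
    for key in sorted(set(schema) - set(inputs)):
        problems.append("missing key: {}".format(key))
    for key in sorted(set(inputs) - set(schema)):
        problems.append("unexpected key: {}".format(key))
    for key in sorted(set(schema) & set(inputs)):
        if not isinstance(inputs[key], schema[key]):
            problems.append(
                "{} should be of type {}".format(key, schema[key].__name__)
            )
    return problems
```

Running this on the dictionary you plan to pass at execution time catches a misspelled key before any SageMaker job is started.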

Ingestion Step

If you work in an organization that enforces data privacy controls, you may have to set up a network configuration for your SageMaker instance. In my case, the process looked like the following:

# These parameters should be provided to you by your admin
security_group_ids = ["<.....>"]
subnets = ["<......>"]

network_config = NetworkConfig(
    security_group_ids=security_group_ids,
    subnets=subnets,
    enable_network_isolation=False,
)
# Note: this configuration is passed via the 'network_config' parameter
# of ScriptProcessor()

Next, we will use the SageMaker ScriptProcessor to run custom Python code that ingests data from our source (in our case, Snowflake).

# Step-1a: Create a SageMaker ScriptProcessor input/output
ingestion_inputs = [
    ProcessingInput(
        source='s3://{}/{}'.format(bucket, '.../.../ingest.py'),
        destination='/opt/ml/processing/input',
        input_name='input',
    )
]

ingestion_outputs = [
    ProcessingOutput(
        source='/opt/ml/processing/output/train',
        destination="s3://{}/{}".format(bucket, "data"),
        output_name='output',
    )
]

# Step-1b: Create a SageMaker ScriptProcessor instance
ingestor = ScriptProcessor(
    command=['python3'],
    image_uri=preprocessing_image,
    role=sagemaker_execution_role,
    instance_count=1,
    instance_type="ml.t3.medium",
    # network_config=network_config
)

# Step-1c: Create Ingestion Step
data_ingestion_step = ProcessingStep(
    "<...sagemaker-ingestion-step...>",
    processor=ingestor,
    job_name=execution_input["IngestionJobName"],
    inputs=ingestion_inputs,
    outputs=ingestion_outputs,
    # Make sure this entrypoint location matches the ProcessingInput destination
    container_entrypoint=["python3", "/opt/ml/processing/input/ingest.py"],
)
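
The entrypoint path and the ProcessingInput destination have to agree, and a mismatch only shows up once the job fails inside the container. The sketch below derives the expected in-container path from the two values, assuming the source points at a single S3 object that is copied into the destination folder under its own file name; the helper name container_script_path and the example bucket are hypothetical.

```python
import posixpath
from urllib.parse import urlparse


def container_script_path(s3_source, destination):
    """Path where a single S3 object is expected to appear inside the container."""
    key = urlparse(s3_source).path  # e.g. "/code/ingest.py"
    filename = posixpath.basename(key)
    if not filename:
        raise ValueError("source must point to a file, not a prefix: " + s3_source)
    return posixpath.join(destination, filename)


# Hypothetical bucket and key, for illustration only
entrypoint = [
    "python3",
    container_script_path("s3://my-bucket/code/ingest.py", "/opt/ml/processing/input"),
]
```

Building container_entrypoint this way keeps the two locations in sync if you later move the script or change the destination.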

Processing Step

# Step-2a: Prepare input data and code
# Using Script processing
# Note: input_data_path and output_data_path are S3 URIs that you need to define beforehand
SCRIPT_PREPROCESSING_LOCATION = "container/sub-folder/preprocessing.py"
input_code = sess.upload_data(
    SCRIPT_PREPROCESSING_LOCATION,
    bucket='<your bucket name here>',
    key_prefix="<folder inside the bucket>",
)
inputs = [
    ProcessingInput(
        source=input_data_path,
        destination="/opt/ml/processing/input",
        input_name="input_data",
    ),
    ProcessingInput(
        source=input_code,
        destination="/opt/ml/processing/input/code",
        input_name="code",
    ),
]
outputs = [
    ProcessingOutput(
        output_name="train_data",
        source="/opt/ml/processing/output/train",
        destination="{}/{}".format(output_data_path, "train_data"),
    ),
    ProcessingOutput(
        output_name="test_data",
        source="/opt/ml/processing/output/test",
        destination="{}/{}".format(output_data_path, "test_data"),
    ),
]

# Step-2b: Create a SageMaker ScriptProcessor instance
script_processor = ScriptProcessor(
    command=['python3'],
    image_uri=preprocessing_image,
    role=sagemaker_execution_role,
    instance_count=1,
    instance_type="ml.m5.xlarge",
)

# Step-2c: Create Processing Step
processing_step = ProcessingStep(
    "<...sagemaker-processing-step...>",
    processor=script_processor,
    job_name=execution_input["PreprocessingJobName"],
    inputs=inputs,
    outputs=outputs,
    container_arguments=["--train-test-split-ratio", "0.2"],
    container_entrypoint=["python3", "/opt/ml/processing/input/code/preprocessing.py"],
)
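
The container_arguments above are passed to preprocessing.py as ordinary command-line arguments. I have not shown my preprocessing script here, so the following is only a sketch of how such a script could read the ratio and split its rows; the function names and the fixed random seed are assumptions for illustration.

```python
import argparse
import random


def parse_args(argv=None):
    """Read the split ratio passed through container_arguments."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--train-test-split-ratio", type=float, default=0.2)
    return parser.parse_args(argv)


def split_rows(rows, test_ratio, seed=0):
    """Shuffle rows and split them into (train, test) lists."""
    if not 0.0 < test_ratio < 1.0:
        raise ValueError("test_ratio must be strictly between 0 and 1")
    shuffled = list(rows)
    random.Random(seed).shuffle(shuffled)
    n_test = int(round(len(shuffled) * test_ratio))
    return shuffled[n_test:], shuffled[:n_test]
```

Inside the real script, the two lists would then be written under /opt/ml/processing/output/train and /opt/ml/processing/output/test, which are the source folders of the ProcessingOutput entries defined above.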

If you want to use SKLearn processing, you need to add a text file named ‘requirements.txt’ that lists the libraries to be installed:

pandas==1.3.4 
scikit-learn==1.0.2
nltk==3.6.5

You will also have to add the following lines of code to your preprocessing.py file. Make sure the /opt/ml path matches the destination you define for the requirements input of the SKLearnProcessor step.

# Update the preprocessing.py file
import sys
import subprocess
subprocess.check_call([
    sys.executable, "-m", "pip", "install", "-r",
    "/opt/ml/processing/input/code/extra_libraries/requirements.txt",
])
# ... rest of preprocessing.py ...

Then you define the steps similar to the one with ScriptProcessor:

# Step-2a: Prepare input data and code
# Using SKLearn processing

inputs = [
    ProcessingInput(
        source=input_data_path,
        destination="/opt/ml/processing/input",
        input_name="input_data",
    ),
    ProcessingInput(
        source='s3://{}/{}'.format(bucket, 'sklearn_processing/code/preprocessing.py'),
        destination="/opt/ml/processing/input/code",
        input_name="code",
    ),
    ProcessingInput(
        source='s3://{}/{}'.format(bucket, 'sklearn_processing/extra_libraries/'),
        destination="/opt/ml/processing/input/code/extra_libraries",
        input_name="requirements",
    ),
]

outputs = [
    ProcessingOutput(
        output_name="train_data",
        source="/opt/ml/processing/output/train",
        destination="{}/{}".format(output_data_path, "train_data"),
    ),
    ProcessingOutput(
        output_name="test_data",
        source="/opt/ml/processing/output/test",
        destination="{}/{}".format(output_data_path, "test_data"),
    ),
]

# Step-2b: Create a SageMaker SKLearnProcessor instance
sklearn_processor = SKLearnProcessor(
    framework_version="0.20.0",
    role=sagemaker_execution_role,
    instance_type="ml.m5.2xlarge",
    instance_count=1,
)

# Step-2c: Create Processing Step
processing_step = ProcessingStep(
    "<...sagemaker-processing-step...>",
    processor=sklearn_processor,
    job_name=execution_input["PreprocessingJobName"],
    inputs=inputs,
    outputs=outputs,
    container_arguments=["--train-test-split-ratio", "0.2"],
    container_entrypoint=["python3", "/opt/ml/processing/input/code/preprocessing.py"],
)

Training Step

# Step-3a: Define inputs for training
preprocessed_training_data_path = '<s3 bucket>/<train_data>/'
preprocessed_test_data_path = '<s3 bucket>/<test_data>/'

# Step-3b: Create a SageMaker Estimator instance
model_estimator = sagemaker.estimator.Estimator(
    image_uri=training_image,  # custom training image defined above
    role=sagemaker_execution_role,
    instance_count=1,
    instance_type="ml.m5.xlarge",
    sagemaker_session=sess,
    output_path="s3://{}/output".format(bucket),
    hyperparameters={},  # fill in your model's hyperparameters
)

# Step-3c: Create Training Step
train_step = TrainingStep(
    '<...sagemaker-training-step...>',
    estimator=model_estimator,
    data={"train": sagemaker.TrainingInput(preprocessed_training_data_path, content_type="text/csv")},
    job_name=execution_input['TrainingJobName'],
    wait_for_completion=True,
)

Inference Step (with Training Step)

# Step-4a: Create Model step
model_step = ModelStep(
    "<...sagemaker-model-step...>",
    model=train_step.get_expected_model(model_name=None),
    model_name=execution_input["ModelName"],
)

# Step-4b: Prepare input datasets for the inference
batch_input = '<s3:bucket>/<folder>/test_data.csv'
transform_output_folder = "batch-transform-output"
batch_output_path = "s3://{}/{}".format(bucket, transform_output_folder)

# Step-4c: Create a SageMaker Transformer instance
model_transformer = model_estimator.transformer(
    instance_count=1,
    instance_type="ml.m5.large",
    output_path=batch_output_path,
    # assemble_with="Line",
    # accept="text/csv",
)

# Step-4d: Create Transform Step
transform_step = steps.TransformStep(
    "<...sagemaker-transform-step...>",
    transformer=model_transformer,
    job_name=execution_input["InferenceJobName"],
    model_name=execution_input["ModelName"],
    data=batch_input,
    content_type="text/csv",
    split_type="Line",
)

Inference Step (Without training step)

If the training has already been done in the past and your focus is to perform batch inference from the existing model artifacts then your ‘model step’ should look like this:

# Step-4a: Create a Model step from existing artifacts
model_artifacts = "<s3://.../output/model.tar.gz>"

model_step = ModelStep(
    "<...sagemaker-model-step...>",
    model=sagemaker.model.Model(
        image_uri=inference_image,  # image that contains the inference code
        model_data=model_artifacts,
        role=sagemaker_execution_role,
    ),
    model_name=execution_input["ModelName"],
)

Then continue with steps 4b, 4c and 4d.

Putting it all together

Our final step is to chain together all the steps defined above. It looks like the following:

# Chain ingestion, preprocessing, training, model creation and batch transform
workflow_graph = Chain(
    [data_ingestion_step, processing_step, train_step, model_step, transform_step]
)
# Next, we define the workflow
branching_workflow = Workflow(
    name="<...MyWorkflow-processing-to-batch-transform...>",
    definition=workflow_graph,
    role=workflow_execution_role,
)
branching_workflow.create()
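
Under the hood, a chain like this becomes an Amazon States Language definition in which each state names the next one and the last state ends the execution. The sketch below builds such a skeleton by hand to show the shape; the state names are hypothetical, the Resource strings are the Step Functions service integrations for SageMaker as I understand them, and the real definition generated by the SDK also carries a Parameters block for every state.

```python
import json


def linear_state_machine(states):
    """Build a States Language skeleton that runs (name, resource) pairs in order."""
    if not states:
        raise ValueError("at least one state is required")
    definition = {"StartAt": states[0][0], "States": {}}
    for index, (name, resource) in enumerate(states):
        state = {"Type": "Task", "Resource": resource}
        if index + 1 < len(states):
            state["Next"] = states[index + 1][0]  # hand over to the following state
        else:
            state["End"] = True  # the last state terminates the execution
        definition["States"][name] = state
    return definition


definition = linear_state_machine([
    ("Ingestion", "arn:aws:states:::sagemaker:createProcessingJob.sync"),
    ("Preprocessing", "arn:aws:states:::sagemaker:createProcessingJob.sync"),
    ("Training", "arn:aws:states:::sagemaker:createTrainingJob.sync"),
    ("CreateModel", "arn:aws:states:::sagemaker:createModel"),
    ("BatchTransform", "arn:aws:states:::sagemaker:createTransformJob.sync"),
])
print(json.dumps(definition, indent=2))
```

Comparing this skeleton with the definition shown in the StepFunctions console is a quick way to confirm that the steps are wired in the order you intended.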

If the job fails, you will need to delete the state machine before you can re-run the job, or simply change the workflow name. In the Jupyter notebook, you can delete the latest workflow with:

branching_workflow.delete()

Execution:

Executing the workflow starts the state machine, which runs each step in turn.

# Execute the workflow
# Generate unique names for each of the steps using uuid's hex attribute

workflow_execution = branching_workflow.execute(
    inputs={
        "IngestionJobName": "ingesting-{}".format(uuid.uuid1().hex),
        "PreprocessingJobName": "sm-preprocessing-{}".format(uuid.uuid1().hex),
        "TrainingJobName": "sm-training-{}".format(uuid.uuid1().hex),
        "ModelName": "sm-model-{}".format(uuid.uuid1().hex),
        "InferenceJobName": "sm-inference-{}".format(uuid.uuid1().hex),
    }
)
execution_output = workflow_execution.get_output(wait=True)
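
SageMaker job and model names are limited to 63 characters made of letters, digits and hyphens, so a long prefix plus a 32-character hex suffix can overflow. The helper below is a small sketch that generates a name the same way as above and fails early if it would be rejected; the function name unique_name is hypothetical.

```python
import re
import uuid

MAX_NAME_LENGTH = 63
# Letters and digits, optionally separated by hyphens; no leading or trailing hyphen.
NAME_PATTERN = re.compile(r"[a-zA-Z0-9](-*[a-zA-Z0-9])*")


def unique_name(prefix):
    """Return '<prefix>-<uuid hex>' or raise if SageMaker would reject the name."""
    name = "{}-{}".format(prefix, uuid.uuid1().hex)
    if len(name) > MAX_NAME_LENGTH:
        raise ValueError("name is longer than 63 characters: " + name)
    if not NAME_PATTERN.fullmatch(name):
        raise ValueError("name contains characters SageMaker does not allow: " + name)
    return name
```

For example, "TrainingJobName": unique_name("sm-training") can replace the inline format call, and an underscore in the prefix raises an error in the notebook instead of failing the execution later.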

To render the workflow progress in the notebook (this does not work in JupyterLab):

workflow_execution.render_progress()

From here, you can go to the AWS StepFunctions management console to monitor the workflow execution.

After this comes the usual model evaluation!

The next step is to put all of this together in an automated pipeline. In part 3 of this tutorial series, I will discuss AWS services such as Lambda and EventBridge to automate weekly data ingestion, preprocessing, training and inference.

You can find the part 1 tutorial and other reference materials below:

Thank you for reading!
