Building MLOps on AWS (Part 1)

SageMaker ML workflow using custom Docker Containers

Suman Gautam
15 min read · Nov 21, 2022
Image Source: Daniel Eledut (unsplash)

Taking a Machine Learning (ML) algorithm or workflow built in a Python environment to the AWS cloud involves a complex set of steps, and for a novice data scientist it is not a straightforward procedure. Although there is a lot of documentation online, most of it does not provide a detailed, easy-to-follow guide, which leaves a first-time AWS user without a clear starting point. In particular, much of it seems to be written for readers who already have some background in data engineering. This tutorial series is an effort to combine the different elements of an ML workflow within the AWS environment:

  • Data Ingestion and EDA
  • Data Preprocessing / Feature Engineering in SageMaker
  • Model Training and hyper-parameter optimization with SageMaker
  • Inference with SageMaker Batch Transform
  • Deployment

To implement this, I used the AWS SageMaker service with a custom Docker container to build an automated pipeline. Covering everything on one page would be cumbersome, so in this first part I discuss building the Docker container and using the SageMaker skeleton for training and inference. In Part II of this series, I will discuss how the workflow designed with the SageMaker skeleton can be orchestrated with AWS Step Functions. Finally, in Part III, I will focus on automating these steps with AWS EventBridge and/or an AWS Lambda function.

Before beginning this tutorial, you will need the following permissions in AWS IAM:

  • SageMakerExecutionRole or SageMakerFullAccess
  • AmazonEC2ContainerRegistryFullAccess

Attach the policies to the SageMaker execution role associated with the notebook instance:

Go to IAM -> select Roles -> select AmazonSageMaker-ExecutionPolicy (or whatever policy you have)

Add permissions -> Attach policies -> type the name of a policy listed above in the search bar and attach it

Now let’s set up the Docker container.

Docker Folder Structure:

In JupyterLab, the folder structure should look like this:

|--Container
   |--sub-folder
      |--nginx.conf
      |--predictor.py
      |--preprocessing.py
      |--serve
      |--train
      |--wsgi.py
      |--misc files
   |--build_and_push.sh
   |--Dockerfile
   |--setup.sh

We have a main folder ‘Container’ that contains all the necessary files. Inside it, we have a sub-folder and three files: build_and_push.sh, Dockerfile and setup.sh. You can rename ‘sub-folder’ to whatever you like. Of the three top-level files, only the contents of Dockerfile and build_and_push.sh need to be updated. Inside the ‘sub-folder’, we have several files. You don’t need to change nginx.conf, serve or wsgi.py. The remaining files hold the Python code for your ML workflow. Also note that train has no .py extension; it is a plain text file, so you just need to copy and paste the contents of your training script into it. The contents of all the files are provided below for your convenience.
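As a quick sanity check before building the image, the layout above can be verified from the notebook. The sketch below is only an illustration: the helper name `missing_files` is hypothetical, and `sub-folder` is the placeholder folder name used in this post.

```python
import os

# Files this post expects; "sub-folder" is the placeholder name used above.
EXPECTED = [
    "build_and_push.sh",
    "Dockerfile",
    "setup.sh",
    os.path.join("sub-folder", "nginx.conf"),
    os.path.join("sub-folder", "predictor.py"),
    os.path.join("sub-folder", "preprocessing.py"),
    os.path.join("sub-folder", "serve"),
    os.path.join("sub-folder", "train"),
    os.path.join("sub-folder", "wsgi.py"),
]


def missing_files(root, expected=EXPECTED):
    """Return the expected files that are not present under root."""
    return [p for p in expected if not os.path.isfile(os.path.join(root, p))]
```

Calling `missing_files("Container")` from the directory that holds the main folder should return an empty list once every file is in place.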

Dockerfile:

# Build an image that can do data ingestion from Snowflake and preprocessing in SageMaker
# This is a Python 3 image that uses the nginx, gunicorn, flask stack
# for serving inferences in a stable way.
FROM python:3.7-slim-buster
# FROM ubuntu:18.04
# MAINTAINER Amazon AI <sage-learner@amazon.com>
RUN apt-get -y update && apt-get install -y --no-install-recommends \
wget \
python3-pip \
python3-setuptools \
nginx \
ca-certificates \
&& rm -rf /var/lib/apt/lists/*
# RUN ln -s /usr/bin/python3 /usr/bin/python
# RUN ln -s /usr/bin/pip3 /usr/bin/pip

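# Here we get all Python packages (you may pin versions).
# imbalanced-learn is imported by the example train script later in this post.
RUN pip3 install pandas scikit-learn nltk imbalanced-learn
# Make sure flask and gunicorn are not disabled; the serve script needs them.
RUN pip3 install flask gunicorn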

# Set some environment variables. PYTHONUNBUFFERED keeps Python from buffering our standard
# output stream, which means that logs can be delivered to the user quickly. PYTHONDONTWRITEBYTECODE
# keeps Python from writing the .pyc files which are unnecessary in this case. We also update
# PATH so that the train and serve programs are found when the container is invoked.
ENV PYTHONUNBUFFERED=TRUE
ENV PYTHONDONTWRITEBYTECODE=TRUE
ENV PATH="/opt/program:${PATH}"

# Set up the program in the image
# Also, get familiar with the Docker file system; an example screenshot comes up later
COPY sub-folder /opt/program
WORKDIR /opt/program
RUN chmod +x /opt/program/preprocessing.py
RUN chmod +x /opt/program/predictor.py
RUN chmod +x /opt/program/train
RUN chmod +x /opt/program/serve

The last four lines use chmod to make the scripts executable; add similar lines for any extra Python scripts you are going to execute in your ML workflow. BE CAREFUL when you comment out code here. You may be tempted to comment out a line or two to see what it does, but if you forget to uncomment it later, you will face strange errors with no clue as to where things went wrong.

Next we build a shell script to push the docker container.

build_and_push.sh

#!/bin/bash
# The base name of our container image
algorithm_name=<your algorithm name>
cd <folder>
chmod +x <folder>/preprocessing.py
chmod +x <folder>/predictor.py
chmod +x <folder>/train
chmod +x <folder>/serve
account=$(aws sts get-caller-identity --query Account --output text)
# Get the region defined in the current configuration (default to <your region> if none defined)
region=$(aws configure get region)
region=${region:-<your region>}
fullname="${account}.dkr.ecr.${region}.amazonaws.com/${algorithm_name}:latest"
# If the repository doesn't exist in ECR, create it.
aws ecr describe-repositories --repository-names "${algorithm_name}" > /dev/null 2>&1
if [ $? -ne 0 ]
then
aws ecr create-repository --repository-name "${algorithm_name}" > /dev/null
fi
# Get the login command from ECR and execute it directly
aws ecr get-login-password --region ${region}|docker login --username AWS --password-stdin ${fullname}
# Build the docker image locally with the image name and then push it to ECR
# with the full name.
docker build -t ${algorithm_name} .
docker tag ${algorithm_name} ${fullname}
# Push to ECR, after you've tested locally
docker push ${fullname}
echo ${fullname}

You only need to update the strings inside <...>.

Copy the content of the setup.sh file below; you do not need to change anything here.

setup.sh

#!/bin/bash
sudo -n true
if [ $? -eq 0 ]; then
    echo "The user has root access."
else
    echo "The user does not have root access. Everything required to run the notebook is already installed and setup. We are good to go!"
    exit 0
fi
# Do we have GPU support?
nvidia-smi > /dev/null 2>&1
if [ $? -eq 0 ]; then
    # check if we have nvidia-docker
    NVIDIA_DOCKER=`rpm -qa | grep -c nvidia-docker2`
    if [ $NVIDIA_DOCKER -eq 0 ]; then
        # Install nvidia-docker2
        #sudo pkill -SIGHUP dockerd
        sudo yum -y remove docker
        sudo yum -y install docker-17.09.1ce-1.111.amzn1
        sudo /etc/init.d/docker start
        curl -s -L https://nvidia.github.io/nvidia-docker/amzn1/nvidia-docker.repo | sudo tee /etc/yum.repos.d/nvidia-docker.repo
        sudo yum install -y nvidia-docker2
        sudo cp daemon.json /etc/docker/daemon.json
        sudo pkill -SIGHUP dockerd
        echo "installed nvidia-docker2"
    else
        echo "nvidia-docker2 already installed. We are good to go!"
    fi
fi
# This is common for both GPU and CPU instances
# check if we have docker-compose
docker-compose version >/dev/null 2>&1
if [ $? -ne 0 ]; then
    # install docker compose
    pip install docker-compose
fi
# check if we need to configure our docker interface
SAGEMAKER_NETWORK=`docker network ls | grep -c sagemaker-local`
if [ $SAGEMAKER_NETWORK -eq 0 ]; then
    docker network create --driver bridge sagemaker-local
fi
# Notebook instance Docker networking fixes
RUNNING_ON_NOTEBOOK_INSTANCE=`sudo iptables -S OUTPUT -t nat | grep -c 169.254.0.2`
# Get the Docker Network CIDR and IP for the sagemaker-local docker interface.
SAGEMAKER_INTERFACE=br-`docker network ls | grep sagemaker-local | cut -d' ' -f1`
DOCKER_NET=`ip route | grep $SAGEMAKER_INTERFACE | cut -d" " -f1`
DOCKER_IP=`ip route | grep $SAGEMAKER_INTERFACE | cut -d" " -f12`
# check if both IPTables and the Route Table are OK.
IPTABLES_PATCHED=`sudo iptables -S PREROUTING -t nat | grep -c 169.254.0.2`
ROUTE_TABLE_PATCHED=`sudo ip route show table agent | grep -c $SAGEMAKER_INTERFACE`
if [ $RUNNING_ON_NOTEBOOK_INSTANCE -gt 0 ]; then
    if [ $ROUTE_TABLE_PATCHED -eq 0 ]; then
        # fix routing
        sudo ip route add $DOCKER_NET via $DOCKER_IP dev $SAGEMAKER_INTERFACE table agent
    else
        echo "SageMaker instance route table setup is ok. We are good to go."
    fi
    if [ $IPTABLES_PATCHED -eq 0 ]; then
        sudo iptables -t nat -A PREROUTING -i $SAGEMAKER_INTERFACE -d 169.254.169.254/32 -p tcp -m tcp --dport 80 -j DNAT --to-destination 169.254.0.2:9081
        echo "iptables for Docker setup done"
    else
        echo "SageMaker instance routing for Docker is ok. We are good to go!"
    fi
fi

Now we go inside the ‘sub-folder’ and copy in the contents of the following files:

nginx.conf

worker_processes 1;
daemon off; # Prevent forking
pid /tmp/nginx.pid;
error_log /var/log/nginx/error.log;
events {
  # defaults
}
http {
  include /etc/nginx/mime.types;
  default_type application/octet-stream;
  access_log /var/log/nginx/access.log combined;

  upstream gunicorn {
    server unix:/tmp/gunicorn.sock;
  }
  server {
    listen 8080 deferred;
    client_max_body_size 5m;
    keepalive_timeout 5;
    proxy_read_timeout 1200s;
    location ~ ^/(ping|invocations) {
      proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
      proxy_set_header Host $http_host;
      proxy_redirect off;
      proxy_pass http://gunicorn;
    }
    location / {
      return 404 "{}";
    }
  }
}

If your input payload is too big, you may need to increase ‘client_max_body_size’.

serve

#!/usr/bin/env python
# This file implements the scoring service shell. You don't necessarily need to modify it for various
# algorithms. It starts nginx and gunicorn with the correct configurations and then simply waits until
# gunicorn exits.
#
# The flask server is specified to be the app object in wsgi.py
#
# We set the following parameters:
#
# Parameter                Environment Variable              Default Value
# ---------                --------------------              -------------
# number of workers        MODEL_SERVER_WORKERS              the number of CPU cores
# timeout                  MODEL_SERVER_TIMEOUT              60 seconds
import multiprocessing
import os
import signal
import subprocess
import sys
cpu_count = multiprocessing.cpu_count()
model_server_timeout = os.environ.get('MODEL_SERVER_TIMEOUT', 60)
model_server_workers = int(os.environ.get('MODEL_SERVER_WORKERS', cpu_count))
def sigterm_handler(nginx_pid, gunicorn_pid):
    try:
        os.kill(nginx_pid, signal.SIGQUIT)
    except OSError:
        pass
    try:
        os.kill(gunicorn_pid, signal.SIGTERM)
    except OSError:
        pass
    sys.exit(0)


def start_server():
    print('Starting the inference server with {} workers.'.format(model_server_workers))
    # link the log streams to stdout/err so they will be logged to the container logs
    subprocess.check_call(['ln', '-sf', '/dev/stdout', '/var/log/nginx/access.log'])
    subprocess.check_call(['ln', '-sf', '/dev/stderr', '/var/log/nginx/error.log'])
    nginx = subprocess.Popen(['nginx', '-c', '/opt/program/nginx.conf'])
    gunicorn = subprocess.Popen(['gunicorn',
                                 '--timeout', str(model_server_timeout),
                                 '-k', 'sync',
                                 '-b', 'unix:/tmp/gunicorn.sock',
                                 '-w', str(model_server_workers),
                                 'wsgi:app'])
    signal.signal(signal.SIGTERM, lambda a, b: sigterm_handler(nginx.pid, gunicorn.pid))
    # If either subprocess exits, so do we.
    pids = set([nginx.pid, gunicorn.pid])
    while True:
        pid, _ = os.wait()
        if pid in pids:
            break
    sigterm_handler(nginx.pid, gunicorn.pid)
    print('Inference server exiting')


# The main routine just invokes the start function.
if __name__ == '__main__':
    start_server()

wsgi.py

from predictor import app
# This is just a simple wrapper for gunicorn to find your app.
# If you want to change the algorithm file, simply change "predictor" above to the
# new file.

Executing the Docker Scripts from Jupyter Notebooks:

# Set up the local Docker environment on the notebook instance
!/bin/bash container/setup.sh
# Build the Docker image and push it to AWS ECR
!/bin/bash container/build_and_push.sh

After successful execution, you should get the ECR address of the container, which should look like:

<########>.dkr.ecr.region.amazonaws.com/container_name:latest
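The address follows the same pattern that build_and_push.sh assembles in its `fullname` variable. A minimal sketch of that pattern in Python is shown below; the helper name `ecr_image_uri` and the account ID, region and image name are made-up illustration values, not ones from this project.

```python
def ecr_image_uri(account, region, name, tag="latest"):
    """Compose an ECR image address in the pattern build_and_push.sh uses."""
    return "{}.dkr.ecr.{}.amazonaws.com/{}:{}".format(account, region, name, tag)


# Illustrative values only; substitute your own account ID, region and image name.
image_uri = ecr_image_uri("123456789012", "us-west-2", "container_name")
print(image_uri)
```

Building the string this way keeps the notebook and the shell script in agreement if you later change the image name or tag.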

Now we are ready to execute the ML workflow using SageMaker.

In this section, we will discuss the following three steps: preprocessing, training and inference.

Libraries necessary for the following steps:

import boto3
import re
import json
import os
import numpy as np
import pandas as pd
import sagemaker
from sagemaker import get_execution_role
from sagemaker.processing import ScriptProcessor, ProcessingInput, ProcessingOutput
from sagemaker.sklearn.processing import SKLearnProcessor
sess = sagemaker.Session()
region = boto3.session.Session().region_name
role = get_execution_role()

To define the different SageMaker steps with a Docker container, it is important to understand how files are structured inside the container at execution time. Below is a screenshot from AWS:
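For quick reference, the container paths that the scripts in this post rely on can be collected in one place. This is a sketch limited to the paths that appear in the code of this post, not a complete description of the SageMaker file system; the function names are hypothetical.

```python
import posixpath

PREFIX = "/opt/ml"


def training_paths(channel="train"):
    """Paths used by the 'train' script inside the training container."""
    return {
        "hyperparameters": posixpath.join(PREFIX, "input/config/hyperparameters.json"),
        "input_data": posixpath.join(PREFIX, "input/data", channel),
        "model": posixpath.join(PREFIX, "model"),
        "failure_file": posixpath.join(PREFIX, "output", "failure"),
    }


def processing_paths():
    """Paths used by the processing jobs in this post."""
    return {
        "input": posixpath.join(PREFIX, "processing/input"),
        "code": posixpath.join(PREFIX, "processing/input/code"),
        "output": posixpath.join(PREFIX, "processing/output"),
    }


for name, path in sorted(training_paths().items()):
    print("{:16s} {}".format(name, path))
```

The `channel` argument corresponds to the key passed to `fit()`, which is `"train"` in this post.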

Preprocessing Step:

For this purpose, I am going to discuss how to use the SageMaker ScriptProcessor class for preprocessing. ScriptProcessor is useful when you need custom functions or libraries that are not available within SageMaker. For example, for this exercise I was using the NLTK library for text processing, which was not available in SageMaker.

Alternatively, you can use SKLearnProcessor; however, you then need to add a requirements .txt file listing the libraries that must be installed at runtime.

Using ScriptProcessor:

input_data_path = "<s3 input data location>"
output_data_path = "<s3 location after processing>"
image_uri = "<###.dkr.ecr.region.amazonaws.com/container_name:latest>"
script_processor = ScriptProcessor(
    command=['python3'],
    image_uri=image_uri,
    role=role,
    instance_count=1,
    #instance_type="ml.m5.xlarge",
    instance_type='local',  # local mode to execute quickly
)
script_processor.run(
    code="container/sub-folder/preprocessing.py",
    inputs=[ProcessingInput(source=input_data_path, destination="/opt/ml/processing/input")],
    outputs=[
        ProcessingOutput(output_name="train_data",
                         source="/opt/ml/processing/output/train",
                         destination="{}/{}".format(output_data_path, "train_data")),
        ProcessingOutput(output_name="test_data",
                         source="/opt/ml/processing/output/test",
                         destination="{}/{}".format(output_data_path, "test_data")),
    ],
    arguments=["--train-test-split-ratio", "0.2"])

Upon successful execution, SageMaker will output separate features and labels files for the training data and for the test data.

Note that the source and destination paths in the inputs and outputs sections of the code above are locations inside the Docker container, which are created dynamically at runtime. The input_data_path and output_data_path are the actual locations in the S3 bucket.
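To make the container-side paths concrete, here is a minimal sketch of what a preprocessing.py could look like. It is not the script used for this project: it uses only the standard library, skips the NLTK text cleaning, and assumes a hypothetical input file named data.csv whose last column is the label. The output file names mirror the train_features.csv and train_labels.csv files that the training script later in this post reads.

```python
import argparse
import csv
import os
import random

# Container-side directories; they match the ProcessingInput/ProcessingOutput
# paths passed to script_processor.run().
INPUT_DIR = "/opt/ml/processing/input"
OUTPUT_DIR = "/opt/ml/processing/output"


def split_rows(rows, test_ratio, seed=0):
    """Shuffle rows with a fixed seed and return (train_rows, test_rows)."""
    rows = list(rows)
    random.Random(seed).shuffle(rows)
    n_test = int(round(len(rows) * test_ratio))
    return rows[n_test:], rows[:n_test]


def write_csv(path, header, rows):
    """Write a header line followed by the given rows."""
    with open(path, "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(header)
        writer.writerows(rows)


def run(input_dir, output_dir, test_ratio, input_file="data.csv"):
    """Split one CSV into feature and label files for train and test."""
    # Assumption: the label is the last column of the hypothetical data.csv.
    with open(os.path.join(input_dir, input_file), newline="") as f:
        reader = csv.reader(f)
        header = next(reader)
        rows = list(reader)
    train, test = split_rows(rows, test_ratio)
    for name, part in (("train", train), ("test", test)):
        part_dir = os.path.join(output_dir, name)
        os.makedirs(part_dir, exist_ok=True)
        write_csv(os.path.join(part_dir, name + "_features.csv"),
                  header[:-1], [r[:-1] for r in part])
        write_csv(os.path.join(part_dir, name + "_labels.csv"),
                  header[-1:], [r[-1:] for r in part])
    return len(train), len(test)


# Only runs inside the processing container, where SageMaker has created INPUT_DIR.
if __name__ == "__main__" and os.path.isdir(INPUT_DIR):
    parser = argparse.ArgumentParser()
    parser.add_argument("--train-test-split-ratio", type=float, default=0.2)
    args, _ = parser.parse_known_args()
    run(INPUT_DIR, OUTPUT_DIR, args.train_test_split_ratio)
```

Everything written under /opt/ml/processing/output/train and /opt/ml/processing/output/test is then copied by SageMaker to the S3 destinations given in the ProcessingOutput entries.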

You can examine the output after the successful execution of the preprocessing step as:

preprocessing_job_description = script_processor.jobs[-1].describe()
output_config = preprocessing_job_description["ProcessingOutputConfig"]
for output in output_config["Outputs"]:
    if output["OutputName"] == "train_data":
        preprocessed_training_data_path = output["S3Output"]["S3Uri"]
    if output["OutputName"] == "test_data":
        preprocessed_test_data_path = output["S3Output"]["S3Uri"]
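If you define many output channels, the same lookup can be written as a small helper that returns every output at once. The sketch below works on the dictionary structure used above; the helper name `output_uris` is hypothetical, and the example description is a hand-written stand-in with a made-up bucket rather than a real job response.

```python
def output_uris(job_description):
    """Map each processing output name to its S3 URI."""
    outputs = job_description["ProcessingOutputConfig"]["Outputs"]
    return {o["OutputName"]: o["S3Output"]["S3Uri"] for o in outputs}


# Hand-written stand-in for script_processor.jobs[-1].describe(); the bucket is made up.
example_description = {
    "ProcessingOutputConfig": {
        "Outputs": [
            {"OutputName": "train_data",
             "S3Output": {"S3Uri": "s3://example-bucket/processed/train_data"}},
            {"OutputName": "test_data",
             "S3Output": {"S3Uri": "s3://example-bucket/processed/test_data"}},
        ]
    }
}

uris = output_uris(example_description)
print(uris["train_data"])
```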

Using SKLearnProcessor:

Before using this option, create a folder (e.g. extra_libraries) in which you save a file called ‘requirements.txt’. In this file, list the libraries that need to be installed:

pandas==1.3.4 
scikit-learn==1.0.2
nltk==3.6.5

In the sklearn_processor.run() method, you specify an extra input that uploads requirements.txt to the location where the code is uploaded.

You will also have to add the following lines of code to your preprocessing.py file; make sure the /opt/ml path matches the one you define in the sklearn_processor.run() call.

#update preprocessing.py file
import sys
import subprocess
subprocess.check_call([
sys.executable, "-m", "pip", "install", "-r",
"/opt/ml/processing/input/code/extra_libraries/requirements.txt",
])
.
.
.

SKLearnProcessor:

sklearn_processor = SKLearnProcessor(
    framework_version="0.20.0",
    role=role,
    #instance_type="ml.m5.xlarge",
    instance_type='local',
    instance_count=1,
)
sklearn_processor.run(
    code="container/sub-folder/preprocessing.py",
    inputs=[
        ProcessingInput(source=input_data_path, destination="/opt/ml/processing/input"),
        ProcessingInput(source="extra_libraries/", destination="/opt/ml/processing/input/code/extra_libraries")
    ],
    outputs=[
        ProcessingOutput(output_name="train_data",
                         source="/opt/ml/processing/output/train",
                         destination="{}/{}".format(output_data_path, "train_data")),
        ProcessingOutput(output_name="test_data",
                         source="/opt/ml/processing/output/test",
                         destination="{}/{}".format(output_data_path, "test_data")),
    ],
    arguments=["--train-test-split-ratio", "0.2"],
)

Note: The code files are uploaded to /opt/ml/processing/input/code/... at runtime. One benefit of SKLearnProcessor over ScriptProcessor is that you can keep changing the code inside the .py file and execute it immediately, whereas with ScriptProcessor you have to update the Dockerfile and push the Docker container before you can execute the preprocessing step. And this will take extra minutes!

SageMaker will output feature and label data separately for the train and test sets. You can also define as many ProcessingInput and ProcessingOutput channels as you need inside the sklearn_processor.run() method.

Also, an instance_type of ‘local’ will execute your code faster. Use this mode when testing, perhaps on a smaller slice of data. As your data size grows, you will find yourself using a larger instance type.

Training Step:

We use the SageMaker Estimator class to build our training workflow.

sklearn = sagemaker.estimator.Estimator(
    image_uri="<########>.dkr.ecr.region.amazonaws.com/container_name:latest",
    role=sagemaker.get_execution_role(),
    instance_count=1,
    instance_type="ml.m5.xlarge",
    #instance_type='local', (I think local mode is not supported for training)
    sagemaker_session=sagemaker.Session(),
    output_path="s3://{}/output".format("<bucket>"),
    hyperparameters={
        "clf__max_iter": 1000,
        "clf__C": 1,
        "clf__solver": "saga",
        "clf__class_weight": "balanced",
        "clf__n_jobs": -1,
        "clf__penalty": "l1",
        "clf__tol": 0.001,
        "tfidf__tfidf__max_df": 0.06,
        "tfidf__tfidf__min_df": 5
    },
    # job names may contain only letters, digits and hyphens
    base_job_name='base-train-hp'
)
sklearn.fit({"train": preprocessed_training_data_path})

An example of the training script inside the ‘train’ file:

from __future__ import print_function

import json
import os
import pickle
import sys
import traceback
import argparse

import pandas as pd
import numpy as np

from imblearn.over_sampling import RandomOverSampler
from imblearn.pipeline import Pipeline

from sklearn.compose import ColumnTransformer
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import roc_auc_score, fbeta_score

# These are the paths to where SageMaker mounts interesting things
# in your container.

prefix = '/opt/ml/'

input_path = prefix + 'input/data'
output_path = os.path.join(prefix, 'output')
model_path = os.path.join(prefix, 'model')
param_path = os.path.join(prefix, 'input/config/hyperparameters.json')

# The hyperparameters are supplied through the hyperparameters argument of the
# Estimator, as shown in the previous block.


# This algorithm has a single channel of input data called 'train'.
# Since we run in file mode, the input files are copied to the directory specified here.

channel_name='train'
training_path = os.path.join(input_path, channel_name)


def train_and_save(X_train, y_train, args, model_name_prefix=''):
    '''
    An example of binary classification from text input
    '''
    # Vectorize text
    tfidf = TfidfVectorizer()

    # transform the text column
    column_transformer = ColumnTransformer([('tfidf', tfidf, 'description_clean')],
                                           remainder='passthrough')

    # Random oversampler for unbalanced class
    ros = RandomOverSampler()

    # Create the model object
    model_obj = LogisticRegression()

    # Initialize Pipeline
    pipe = Pipeline([
        ('tfidf', column_transformer),
        ('oversamp', ros),
        ('clf', model_obj)
    ])

    # only set hyperparameters that Pipeline will take
    pipe_params = pipe.get_params()
    for key, val in args.items():
        if key in pipe_params:
            pipe = pipe.set_params(**{key: val})

    # fit model
    pipe.fit(X_train, y_train)

    # score the model
    metric_score = roc_auc_score(y_train, pipe.predict_proba(X_train)[:, 1])
    print('roc_auc: {}'.format(metric_score))

    # save the model
    with open(os.path.join(model_path, '{}_logreg_model.pkl'.format(model_name_prefix)), 'wb') as out:
        pickle.dump(pipe, out)
    # 'category' is optional; it is not among the hyperparameters shown earlier
    print('Training for {} {} complete.'.format(args.get('category', ''), model_name_prefix))


# The function to execute the training.
def train(args, training_path):
    print('Starting the training.')

    try:
        X_train = pd.read_csv('{}/train_features.csv'.format(training_path))
        y_train = pd.read_csv('{}/train_labels.csv'.format(training_path))

        # Set features and/or label. The text column name must match the one
        # used in the ColumnTransformer above ('description_clean' in this example).
        X_train = X_train[['input_column']]
        y_train = y_train['target_column']

        train_and_save(X_train, y_train, args)

    except Exception as e:
        # Write out an error file. This will be returned as the failureReason in the
        # DescribeTrainingJob result.
        trc = traceback.format_exc()
        with open(os.path.join(output_path, 'failure'), 'w') as s:
            s.write('Exception during training: ' + str(e) + '\n' + trc)
        # Printing this causes the exception to be in the training job logs, as well.
        print('Exception during training: ' + str(e) + '\n' + trc, file=sys.stderr)
        # A non-zero exit code causes the training job to be marked as Failed.
        sys.exit(255)

def _decode(obj):
    '''
    Decode the hyperparameters, which are supplied as strings
    '''
    # convert to int or float if possible
    rv = {}
    for k, v in obj.items():
        if isinstance(v, str):
            try:
                rv[k] = int(v)
            except ValueError:
                try:
                    rv[k] = float(v)
                except ValueError:
                    rv[k] = v
        else:
            rv[k] = v
    return rv


if __name__ == '__main__':
    with open(param_path, 'r') as tc:
        args = json.load(tc)

    args = _decode(args)
    train(args, training_path)

    # A zero exit code causes the job to be marked as Succeeded.
    sys.exit(0)
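The hyperparameter names passed to the Estimator follow scikit-learn's `<step>__<parameter>` convention, which is why the training script can hand them straight to `pipe.set_params()`. The sketch below only illustrates how such keys break down by pipeline step; the helper name `group_by_step` is hypothetical and is not part of the training script.

```python
def group_by_step(params):
    """Group '<step>__<param>' keys by their leading pipeline step name."""
    grouped = {}
    for key, value in params.items():
        step, sep, rest = key.partition("__")
        if not sep:
            # No step prefix: keep the key under an empty step name.
            step, rest = "", key
        grouped.setdefault(step, {})[rest] = value
    return grouped


example = {"clf__C": 1, "clf__solver": "saga", "tfidf__tfidf__max_df": 0.06}
grouped = group_by_step(example)
print(grouped)
```

Here `clf__C` targets the `C` parameter of the `clf` step (the logistic regression), while `tfidf__tfidf__max_df` goes one level deeper: the `tfidf` pipeline step is a ColumnTransformer, and inside it the transformer named `tfidf` receives `max_df`.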

Inference Step:

In SageMaker, inference is done in two ways: by creating an endpoint for continuous/real-time prediction, or with batch transform for one-time prediction. For this example, I will only discuss the batch transform option, because my workflow was designed to automate the prediction once a week.

model = sagemaker.model.Model(
    model_data='<s3 bucket model location>/.../model.tar.gz',
    image_uri=image_uri,
    role=role)
transformer = model.transformer(
    instance_count=1,
    instance_type="ml.m5.4xlarge",
    output_path=batch_output_path,
    assemble_with="Line",
    accept="text/csv",
)
transformer.transform(
    batch_input_path,
    content_type="text/csv",
    split_type="Line",
    #join_source="Input",      # to join the prediction with input data
    #input_filter="$[1:]",     # filter the input columns
    #output_filter='$[0,-1]',  # filter the output columns
)
transformer.wait()

SageMaker batch transform should allow you to filter the input data and to associate the output predictions with the input data. In my first attempt this did not work, but I left the options commented out here so you can check whether they work for you.
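As a rough mental model of what the three commented-out options are meant to do, the sketch below emulates them in plain Python on a single CSV row. It does not call SageMaker and reflects only my understanding of the Batch Transform data-processing options; `apply_filters` and the `predict` callback are hypothetical names.

```python
def apply_filters(row, predict):
    """Emulate input_filter='$[1:]', join_source='Input', output_filter='$[0,-1]'."""
    # input_filter="$[1:]": drop the first column (for example an ID) before prediction
    model_input = row[1:]
    prediction = predict(model_input)
    # join_source="Input": append the prediction to the original input row
    joined = row + [prediction]
    # output_filter="$[0,-1]": keep only the first column and the prediction
    return [joined[0], joined[-1]]


# Stand-in "model" that just counts the columns it receives.
result = apply_filters(["id-7", "some text", "more text"], predict=len)
print(result)
```

With these settings the output would pair each record's first column with its prediction, which is convenient when the first column is an identifier that the model should not see.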

When creating the model object, we have to specify the Docker container used for training, otherwise the job will fail. What if you only have a model available but no container image, or the container image has been updated? This is something I still need to research. If you have suggestions, feel free to comment.

The prediction script looks like this:

from __future__ import print_function

import io
import json
import os
import pickle
import signal
import sys
import traceback

import flask
import pandas as pd

prefix = "/opt/ml/"
model_path = os.path.join(prefix, "model")

# A singleton for holding the model. This simply loads the model and holds it.
# It has a predict function that does a prediction based on the model and the input data.


class ScoringService(object):
    model = None  # Where we keep the model when it's loaded

    @classmethod
    def get_model(cls):
        """Get the model object for this instance, loading it if it's not already loaded."""
        if cls.model is None:
            with open(os.path.join(model_path, "_logreg_model.pkl"), "rb") as inp:
                cls.model = pickle.load(inp)
        return cls.model

    @classmethod
    def predict(cls, input):
        """For the input, do the predictions and return them.

        Args:
            input (a pandas dataframe): The data on which to do the predictions. There will be
                one prediction per row in the dataframe"""
        clf = cls.get_model()
        return clf.predict(input)


# The flask app for serving predictions
app = flask.Flask(__name__)


@app.route("/ping", methods=["GET"])
def ping():
    """Determine if the container is working and healthy. In this sample container, we declare
    it healthy if we can load the model successfully."""
    health = ScoringService.get_model() is not None  # You can insert a health check here

    status = 200 if health else 404
    return flask.Response(response="\n", status=status, mimetype="application/json")


@app.route("/invocations", methods=["POST"])
def transformation():
    """Do an inference on a single batch of data. In this sample server, we take data as CSV, convert
    it to a pandas data frame for internal use and then convert the predictions back to CSV (which really
    just means one prediction per line, since there's a single column).
    """
    data = None

    # Convert from CSV to pandas
    if flask.request.content_type == "text/csv":
        data = flask.request.data.decode("utf-8")
        s = io.StringIO(data)
        data = pd.read_csv(s)

    else:
        return flask.Response(
            response="This predictor only supports CSV data", status=415, mimetype="text/plain"
        )

    print("Invoked with {} records".format(data.shape[0]))

    # Do the prediction
    predictions = ScoringService.predict(data)

    # Convert from numpy back to CSV
    out = io.StringIO()
    pd.DataFrame({"results": predictions}).to_csv(out, header=False, index=False)
    result = out.getvalue()

    return flask.Response(response=result, status=200, mimetype="text/csv")

In this exercise, my modeling output consisted of multiple models zipped together into a folder. To see how SageMaker stored the trained model in the S3 bucket, the following Linux commands were useful; you can execute them from the Jupyter notebook.

!aws s3 cp <s3 bucket model location>/.../model.tar.gz <new_location>/model.tar.gz
!tar -xvf <new_location>/model.tar.gz
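If you prefer to stay in Python, the archive can also be inspected without extracting it. This is a small sketch using the standard tarfile module; the helper name `list_model_files` is hypothetical, and it assumes model.tar.gz has already been copied to a local path as in the commands above.

```python
import tarfile


def list_model_files(archive_path):
    """Return the names of the regular files stored in a model.tar.gz archive."""
    with tarfile.open(archive_path, "r:gz") as tar:
        return sorted(m.name for m in tar.getmembers() if m.isfile())
```

For the training script in this post, the listing should include the pickled pipeline that the script saved under /opt/ml/model.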

After this, the usual model evaluation stuff!

The next step is to put these steps together in a pipeline for automation. For that purpose, I will discuss AWS Step Functions in Part II of this tutorial series.

Thank you for reading!
