MLOps Continuous Delivery with Model Unit Testing





In this article, we develop a model unit testing container.
In this series of articles, we’ll walk you through the process of applying CI/CD to AI tasks. You’ll end up with a functional pipeline that meets the requirements of level 2 in the Google MLOps Maturity Model. We assume that you have some familiarity with Python, deep learning, Docker, DevOps, and Flask.
In the previous articles of this series, we explained how to continuously integrate model changes and how to continuously train our model when new data is gathered. In this article, we’ll test the trained model in an environment that mimics production: we’ll load the model saved in the testing registry, expose it via a clone of the model API, and run tests against it. You are welcome to add your own tests at this stage. The diagram below shows where we are in the project process.
The code file structure is as follows:
Get the original code from its repository.
data_utils.py
The data_utils.py file contains the functions that check whether a model exists in the testing model registry (the testing/ folder of the GCS bucket) and, if so, download it to the container’s /root directory:
import datetime
from google.cloud import storage
import pandas as pd
import numpy as np
import os
import cv2
import sys

def previous_model(bucket_name, model_filename):
    # Checks whether testing/<model_filename> exists in the bucket.
    # Returns (True/False, None), or (None, exception) if the check itself fails.
    try:
        storage_client = storage.Client()  # if running on GCP
        bucket = storage_client.bucket(bucket_name)
        status = storage.Blob(bucket=bucket, name='{}/{}'.format('testing', model_filename)).exists(storage_client)
        return status, None
    except Exception as e:
        # str(e) is required: concatenating a str and an Exception raises TypeError
        print('Something went wrong when trying to check if previous model exists in GCS bucket. Exception: ' + str(e), flush=True)
        return None, e

def load_model(bucket_name, model_filename):
    # Downloads testing/<model_filename> to /root/<model_filename>.
    # Returns (True, None) on success, or (False, exception) on failure.
    try:
        storage_client = storage.Client()  # if running on GCP
        bucket = storage_client.bucket(bucket_name)
        blob1 = bucket.blob('{}/{}'.format('testing', model_filename))
        blob1.download_to_filename('/root/' + str(model_filename))
        return True, None
    except Exception as e:
        print('Something went wrong when trying to load previous model from GCS bucket. Exception: ' + str(e), flush=True)
        return False, e
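Both helpers return a (status, error) tuple instead of raising: the status is True or False when the call succeeded and None when it failed, with the exception in the second slot. The sketch below shows one way a caller could dispatch on that convention. The describe_registry_check name and its messages are hypothetical and are not part of the original repository; it does not call GCS itself.

```python
from typing import Optional, Tuple

# Hypothetical helper: interprets the (status, error) tuple returned by
# data_utils.previous_model(). It only inspects the tuple; no GCS calls.
def describe_registry_check(result: Tuple[Optional[bool], Optional[Exception]]) -> str:
    status, error = result
    if status is True:
        return "model found in testing registry"
    if status is False:
        return "no model in testing registry"
    # status is None: the check itself failed. str() is needed because
    # concatenating a str with an Exception raises TypeError.
    return "registry check failed: " + str(error)


if __name__ == "__main__":
    print(describe_registry_check((True, None)))
    print(describe_registry_check((None, RuntimeError("timeout"))))
```

Testing the status with `is True` / `is False` keeps the three outcomes distinct, since None must not be mistaken for "no model found".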
email_notifications.py
The email_notifications.py file handles the notifications sent to the product owner about successful or failed code execution:
import smtplib
import os

# Email variables definition
sender = 'example@gmail.com'
receiver = ['svirahonda@gmail.com']  # replace this with the owner's email address
smtp_provider = 'smtp.gmail.com'  # replace this with your SMTP provider
smtp_port = 587
smtp_account = 'example@gmail.com'
smtp_password = 'your_password'

def send_update(message):
    # Sends a status update (testing passed or failed) to the owner
    message = 'Subject: {}\n\n{}'.format('An automatic unit testing has ended recently.', message)
    try:
        server = smtplib.SMTP(smtp_provider, smtp_port)
        server.starttls()  # upgrade the connection to TLS before logging in
        server.login(smtp_account, smtp_password)
        server.sendmail(sender, receiver, message)
        server.quit()
    except Exception as e:
        print('Something went wrong. Unable to send email.', flush=True)
        print('Exception: ', e)

def exception(e_message):
    # Sends an error report when the testing API itself fails
    try:
        message = 'Subject: {}\n\n{}'.format('Something went wrong with the testing API.', e_message)
        server = smtplib.SMTP(smtp_provider, smtp_port)
        server.starttls()
        server.login(smtp_account, smtp_password)
        server.sendmail(sender, receiver, message)
        server.quit()
    except Exception as e:
        print('Something went wrong. Unable to send email.', flush=True)
        print('Exception: ', e)
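The module above formats the message by hand and keeps the password in the source file. As a sketch of an alternative, assuming Python 3.6 or later, the standard library’s email.message.EmailMessage can build the headers, and smtplib.SMTP can be used as a context manager so the connection is closed even if login fails. The function names and the SMTP_PASSWORD environment variable are hypothetical, not part of the original repository.

```python
import os
import smtplib
from email.message import EmailMessage
from typing import List

# Hypothetical helper: builds the notification with EmailMessage instead of
# hand-formatting a "Subject: ...\n\n..." string.
def build_notification(subject: str, body: str, sender: str, receivers: List[str]) -> EmailMessage:
    msg = EmailMessage()
    msg["Subject"] = subject
    msg["From"] = sender
    msg["To"] = ", ".join(receivers)
    msg.set_content(body)
    return msg


def send_notification(msg: EmailMessage, host: str, port: int, account: str) -> None:
    # Assumption: the password comes from a hypothetical SMTP_PASSWORD
    # environment variable rather than being hard-coded.
    password = os.environ["SMTP_PASSWORD"]
    # The context manager sends QUIT and closes the socket on exit.
    with smtplib.SMTP(host, port) as server:
        server.starttls()
        server.login(account, password)
        server.send_message(msg)


if __name__ == "__main__":
    m = build_notification("Unit testing finished", "Testing stage has ended successfully.",
                           "example@gmail.com", ["owner@example.com"])
    print(m["Subject"])
```

Only build_notification runs in the usage example; send_notification needs a reachable SMTP server and real credentials.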
task.py
The task.py file drives the container’s execution. It orchestrates the Flask application’s startup and shutdown, model loading, model testing, and email notifications:
import tensorflow as tf
from tensorflow.keras.models import load_model
import jsonpickle
import data_utils, email_notifications
import sys
import os
from google.cloud import storage
import datetime
import numpy as np
import cv2
from flask import flash, Flask, Response, request, jsonify
import threading
import requests
import time

# IMPORTANT
# If you're running this container locally and you want to access the API via local browser, use http://172.17.0.2:5000/

# Starting flask app
app = Flask(__name__)

# general variables declaration
model_name = 'best_model.hdf5'
bucket_name = 'automatictrainingcicd-aiplatform'
class_names = ['Normal', 'Viral Pneumonia', 'COVID-19']
headers = {'content-type': 'application/octet-stream'}  # the body is raw pixel bytes, not a PNG file
api = 'http://127.0.0.1:5000/'  # self app
model = None  # set by initialize_job() once the model has been downloaded

# Note: before_first_request was deprecated in Flask 2.2 and removed in Flask 2.3
@app.before_first_request
def before_first_request():
    def initialize_job():
        # This runs in a worker thread, where sys.exit() would only end the thread.
        # os._exit() terminates the whole process so the container actually stops.
        if len(tf.config.experimental.list_physical_devices('GPU')) > 0:
            tf.config.set_soft_device_placement(True)
            tf.debugging.set_log_device_placement(True)
        global model
        # Checking if there's any model saved at testing on GCS
        model_gcs = data_utils.previous_model(bucket_name, model_name)
        # If any model exists at testing, load it, test it on data and use it on the API
        if model_gcs[0] == True:
            model_gcs = data_utils.load_model(bucket_name, model_name)
            if model_gcs[0] == True:
                try:
                    model = load_model(model_name)
                except Exception as e:
                    email_notifications.exception('Something went wrong trying to test old /testing model. Exception: ' + str(e))
                    os._exit(1)
            else:
                email_notifications.exception('Something went wrong when trying to load old /testing model. Exception: ' + str(model_gcs[1]))
                os._exit(1)
        if model_gcs[0] == False:
            email_notifications.send_update('There are no artifacts at model registry. Check GCP for more information.')
            os._exit(1)
        if model_gcs[0] == None:
            email_notifications.exception('Something went wrong when trying to check if old testing model exists. Exception: ' + str(model_gcs[1]) + '. Aborting automatic testing.')
            os._exit(1)
        api_test()
    thread = threading.Thread(target=initialize_job)
    thread.start()

@app.route('/init', methods=['GET', 'POST'])
def init():
    message = {'message': 'API initialized.'}
    response = jsonpickle.encode(message)
    return Response(response=response, status=200, mimetype="application/json")

@app.route('/', methods=['POST'])
def index():
    if request.method == 'POST':
        try:
            # Converting the raw request body (128x128x3 uint8 pixels) back into an image array
            image = np.frombuffer(request.data, np.uint8)
            image = image.reshape((128, 128, 3))
            image = [image]
            image = np.array(image)  # adds the batch dimension: (1, 128, 128, 3)
            image = image.astype(np.float16)
            result = model.predict(image)
            result = np.argmax(result)  # index into class_names
            message = {'message': '{}'.format(str(result))}
            json_response = jsonify(message)
            return json_response
        except Exception as e:
            message = {'message': 'Error: ' + str(e)}
            json_response = jsonify(message)
            email_notifications.exception('Something went wrong when trying to make prediction via testing API. Exception: ' + str(e) + '. Aborting automatic testing.')
            return json_response
    else:
        message = {'message': 'Error. Please use this API in a proper manner.'}
        json_response = jsonify(message)
        return json_response

def self_initialize():
    # Polls /init until the server answers; that first request triggers before_first_request
    def initialization():
        global started
        started = False
        while started == False:
            try:
                server_response = requests.get('http://127.0.0.1:5000/init')
                if server_response.status_code == 200:
                    started = True
            except requests.exceptions.RequestException:
                pass  # server not up yet
            time.sleep(3)
    thread = threading.Thread(target=initialization)
    thread.start()

def api_test():
    try:
        image = cv2.imread('TEST_IMAGE.jpg')
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        image = cv2.resize(image, (128, 128))
        result = requests.post(api, data=image.tobytes(), headers=headers)
        result = result.json()
        prediction = int(result['message'])
        # The test passes only if TEST_IMAGE.jpg is classified as class 1 ('Viral Pneumonia')
        if prediction == 1:
            email_notifications.send_update('Testing stage has ended successfully. Shutting down container. Check the GCP logs for more information.')
            os._exit(0)
        else:
            email_notifications.send_update('Testing stage has crashed. Check the GCP logs for more information.')
            os._exit(1)
    except Exception as e:
        email_notifications.exception('Testing stage crashed with an exception: ' + str(e) + '. Check the GCP logs for more information.')
        os._exit(1)

if __name__ == '__main__':
    self_initialize()
    app.run(host='0.0.0.0', debug=True, threaded=True)
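The testing API and api_test() share an implicit contract: the request body is a 128x128 RGB image sent as raw uint8 bytes, with no header describing its shape or dtype. The sketch below isolates that round trip so it can be checked without a running server or a model. encode_image, decode_batch, and IMAGE_SHAPE are hypothetical names; the float16 cast mirrors the one in index().

```python
import numpy as np

# Shape both sides must agree on, since the payload carries no metadata.
IMAGE_SHAPE = (128, 128, 3)


def encode_image(image: np.ndarray) -> bytes:
    # Client side: what api_test() does with image.tobytes().
    if image.shape != IMAGE_SHAPE or image.dtype != np.uint8:
        raise ValueError("expected a uint8 array of shape {}".format(IMAGE_SHAPE))
    return image.tobytes()


def decode_batch(payload: bytes) -> np.ndarray:
    # Server side: rebuild the array, then add the batch dimension
    # that model.predict() expects.
    image = np.frombuffer(payload, dtype=np.uint8).reshape(IMAGE_SHAPE)
    return np.expand_dims(image, axis=0).astype(np.float16)


if __name__ == "__main__":
    original = np.random.randint(0, 256, size=IMAGE_SHAPE, dtype=np.uint8)
    batch = decode_batch(encode_image(original))
    print(batch.shape)  # (1, 128, 128, 3)
```

Because the bytes carry no shape information, a change to the model’s input size would have to be made in both places at once; sending an encoded image file instead would remove that coupling at the cost of decoding it on the server.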
Dockerfile
Our Dockerfile defines how the container is built:
FROM gcr.io/deeplearning-platform-release/tf2-cpu.2-0
WORKDIR /root

# task.py relies on before_first_request, which was removed in Flask 2.3
RUN pip install pandas numpy google-cloud-storage scikit-learn opencv-python "Flask<2.3" jsonpickle
RUN apt-get update; apt-get install git -y; apt-get install -y libgl1-mesa-dev

# The downloaded random bytes differ on every build, which invalidates the layer cache
# here so that the git clone below always fetches the latest code
ADD "https://www.random.org/cgi-bin/randbyte?nbytes=10&format=h" skipcache
RUN git clone https://github.com/sergiovirahonda/AutomaticTraining-UnitTesting.git

RUN mv /root/AutomaticTraining-UnitTesting/data_utils.py /root
RUN mv /root/AutomaticTraining-UnitTesting/task.py /root
RUN mv /root/AutomaticTraining-UnitTesting/email_notifications.py /root
RUN mv /root/AutomaticTraining-UnitTesting/TEST_IMAGE.jpg /root

EXPOSE 5000
ENTRYPOINT ["python","task.py"]
Once you’ve built and run the container locally, you’ll have a functional model unit tester. It verifies that the model about to be deployed to production can be loaded from the testing registry, serves predictions through the API without errors, and classifies the bundled test image (TEST_IMAGE.jpg) as expected.
Feel free to include additional tests in this job. Usually, such tests are business-case-dependent.
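As an example of such an additional test, a hypothetical gate could send a small labelled set of images through the testing API and require a minimum accuracy before the model is promoted. The sketch below covers only the scoring step. The function name, the 0.9 default threshold, and the existence of a labelled hold-out set are assumptions; the predictions would be the integer class indices that the testing API returns in its message field.

```python
from typing import Sequence

# Same order as class_names in task.py; the API returns indices into this list.
CLASS_NAMES = ['Normal', 'Viral Pneumonia', 'COVID-19']


def passes_accuracy_gate(predictions: Sequence[int], labels: Sequence[int],
                         min_accuracy: float = 0.9) -> bool:
    # Hypothetical gate: True if the fraction of correct predictions
    # reaches min_accuracy.
    if not labels or len(predictions) != len(labels):
        raise ValueError("predictions and labels must be non-empty and the same length")
    for p in predictions:
        # Reject indices the model cannot legitimately produce.
        if not 0 <= p < len(CLASS_NAMES):
            raise ValueError("prediction {} is not a valid class index".format(p))
    correct = sum(1 for p, y in zip(predictions, labels) if p == y)
    return correct / len(labels) >= min_accuracy


if __name__ == "__main__":
    # 3 of 4 correct, so this prints True with a 0.75 threshold
    print(passes_accuracy_gate([0, 1, 2, 1], [0, 1, 2, 2], min_accuracy=0.75))
```

In api_test(), the single `prediction == 1` check could then be replaced by collecting one prediction per labelled image and calling this gate, with the threshold chosen per business case.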
Next Steps
In the next article, we’ll build an API that will load our model from the production registry to enable the prediction service described in the Google MLOps Maturity Model. Stay tuned!