常用的ML模型(建立MLOps模型API)
在本系列文章中,我们将引导您完成将CI/CD应用于AI任务的过程。最后,您将获得一个满足GoogleMLOps成熟度模型第2级要求的功能管道。我们假设您对Python,深度学习,Docker,DevOps和Flask有所了解。
在上一篇文章中,我们讨论了MLCI/CD管道中的单元测试步骤。在这一篇中,我们将构建模型API以支持预测服务。
下图显示了我们在项目流程中的位置。
代码文件的结构如下:
本文中的大多数代码与上一篇代码几乎相同,因此我们仅关注它们之间的区别。
在此存储库中找到完整的代码,因为下面显示的摘录是精简版本。
task.py:协调容器中程序执行的task.py文件如下所示:
import datetime
import os
import sys
import threading
import time

import cv2
import jsonpickle
import numpy as np
import requests
import tensorflow as tf
from flask import flash, Flask, Response, request, jsonify
from google.cloud import storage
from tensorflow.keras.models import load_model

import data_utils
import email_notifications
# IMPORTANT
# If you're running this container locally and you want to access the API via local browser use http://172.17.0.2:5000/

# Starting flask app
app = Flask(__name__)

# General variables declaration: the model artifact name and the GCS
# bucket used as the model registry.
model_name = 'best_model.hdf5'
bucket_name = 'automatictrainingcicd-aiplatform'

# Module-level placeholder for the Keras model; it is populated by the
# background initialization job (see before_first_request) before the
# first prediction is served.
model = None
@app.before_first_request
def before_first_request():
    """Start model loading in a background thread before the first request.

    Running the load in a thread keeps the HTTP worker responsive while
    the model artifact is fetched from the GCS model registry.
    """
    def initialize_job():
        # One-shot job: configure TF device placement, pull the production
        # model from GCS and load it into the module-level `model` global.
        if len(tf.config.experimental.list_physical_devices('GPU')) > 0:
            # Allow ops without a GPU kernel to fall back to CPU, and log
            # device placement for debugging inside the container.
            tf.config.set_soft_device_placement(True)
            tf.debugging.set_log_device_placement(True)
        global model
        # Checking if there's any model saved at the registry on GCS.
        # NOTE(review): data_utils.previous_model appears to return a
        # (status, error) tuple where status is True/False/None — confirm.
        model_gcs = data_utils.previous_model(bucket_name, model_name)
        # If a model exists at prod, download it and load it for the API.
        if model_gcs[0] == True:
            # NOTE(review): model_gcs is reassigned here, so the False/None
            # checks below inspect the *load* result, not the existence
            # check — preserved from the original flow; verify intent.
            model_gcs = data_utils.load_model(bucket_name, model_name)
            if model_gcs[0] == True:
                try:
                    model = load_model(model_name)
                except Exception as e:
                    email_notifications.exception('Something went wrong trying to load production model. Exception: ' + str(e))
                    sys.exit(1)
            else:
                email_notifications.exception('Something went wrong when trying to load production model. Exception: ' + str(model_gcs[1]))
                sys.exit(1)
        if model_gcs[0] == False:
            email_notifications.send_update('There are no artifacts at model registry. Check GCP for more information.')
            sys.exit(1)
        if model_gcs[0] == None:
            email_notifications.exception('Something went wrong when trying to check if production model exists. Exception: ' + str(model_gcs[1]) + '. Aborting execution.')
            sys.exit(1)
    thread = threading.Thread(target=initialize_job)
    thread.start()
@app.route('/init', methods=['GET', 'POST'])
def init():
    """Liveness endpoint.

    Hit by the self-initialization loop to trigger `before_first_request`
    (and thus model loading) and to confirm that the API is up.
    """
    message = {'message': 'API initialized.'}
    response = jsonpickle.encode(message)
    return Response(response=response, status=200, mimetype="application/json")
@app.route('/', methods=['POST'])
def index():
    """Prediction endpoint.

    Expects the raw bytes of a 128x128 RGB uint8 image in the POST body
    and returns a JSON message containing the predicted class index.
    On failure it e-mails the exception and returns {'message': 'Error'}.
    """
    if request.method == 'POST':
        try:
            # Converting the raw request bytes to a uint8 array.
            # np.frombuffer replaces the deprecated np.fromstring.
            image = np.frombuffer(request.data, np.uint8)
            image = image.reshape((128, 128, 3))
            # Add a batch dimension and cast to the dtype the model expects.
            image = [image]
            image = np.array(image)
            image = image.astype(np.float16)
            result = model.predict(image)
            result = np.argmax(result)
            message = {'message': '{}'.format(str(result))}
            json_response = jsonify(message)
            return json_response
        except Exception as e:
            message = {'message': 'Error'}
            json_response = jsonify(message)
            email_notifications.exception('Something went wrong when trying to make prediction via Production API. Exception: ' + str(e) + '. Aborting execution.')
            return json_response
    else:
        # Unreachable while methods=['POST'], kept as a defensive guard.
        message = {'message': 'Error. Please use this API in a proper manner.'}
        json_response = jsonify(message)
        return json_response
def self_initialize():
    """Poll the local /init endpoint until the API answers.

    Runs in a background thread so that Flask's `before_first_request`
    hook (which loads the model) fires without waiting for an external
    client to make the first call.
    """
    def initialization():
        global started
        started = False
        while started == False:
            try:
                server_response = requests.get('http://127.0.0.1:5000/init')
                if server_response.status_code == 200:
                    print('API has started successfully, quitting initialization job.')
                    started = True
            except Exception:
                # Server not up yet (connection refused); keep retrying.
                print('API has not started. Still attempting to initialize it.')
            time.sleep(3)
    thread = threading.Thread(target=initialization)
    thread.start()
if __name__ == '__main__':
    # Kick off the self-initialization poller, then serve on all
    # interfaces so the API is reachable from outside the container.
    self_initialize()
    app.run(host='0.0.0.0', debug=True, threaded=True)
data_utils.py
data_utils.py文件与之前的版本相比,仅在从生产注册表加载模型的部分有所不同。差异如下:
将 status=storage.Blob(bucket=bucket, name='{}/{}'.format('testing', model_filename)).exists(storage_client) 替换为 status=storage.Blob(bucket=bucket, name='{}/{}'.format('production', model_filename)).exists(storage_client)
将 blob1=bucket.blob('{}/{}'.format('testing', model_filename)) 替换为 blob1=bucket.blob('{}/{}'.format('production', model_filename))
Dockerfile:在我们的Dockerfile中,将
RUN git clone https://github.com/sergiovirahonda/AutomaticTraining-UnitTesting.git
替换为
RUN git clone https://github.com/sergiovirahonda/AutomaticTraining-PredictionAPI.git
在本地构建和运行容器后,应该可以通过POST请求在http://172.17.0.2:5000/上获得功能齐全的预测服务。