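Copyright notice: this is an original article by the blogger and may not be reproduced without the blogger's permission. For questions, add WeChat: lp9628 (mention CSDN). https://blog.csdn.net/u014365862/article/details/81009551
MachineLP's GitHub (follows welcome): https://github.com/MachineLP
(1) Clone the repository locally: git clone https://github.com/tensorflow/serving.git
(2) cd into the serving directory.
(3) pip install tensorflow-serving-api
(4) Run: python tensorflow_serving/example/mnist_saved_model.py ./mnist_model (downloading the data requires access to overseas websites)
The code of mnist_saved_model.py follows; it trains the model and saves the model together with its variables.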
from __future__ import print_function
import os
import sys
# This is a placeholder for a Google-internal import.
import tensorflow as tf
import mnist_input_data
tf.app.flags.DEFINE_integer('training_iteration', 1000,
                            'number of training iterations.')
tf.app.flags.DEFINE_integer('model_version', 1, 'version number of the model.')
tf.app.flags.DEFINE_string('work_dir', './tmp', 'Working directory.')
FLAGS = tf.app.flags.FLAGS


def main(_):
  if len(sys.argv) < 2 or sys.argv[-1].startswith('-'):
    print('Usage: mnist_export.py [--training_iteration=x] '
          '[--model_version=y] export_dir')
    sys.exit(-1)
  if FLAGS.training_iteration <= 0:
    print('Please specify a positive value for training iteration.')
    sys.exit(-1)
  if FLAGS.model_version <= 0:
    print('Please specify a positive value for version number.')
    sys.exit(-1)

  # Train model
  print('Training model...')
  mnist = mnist_input_data.read_data_sets(FLAGS.work_dir, one_hot=True)
  sess = tf.InteractiveSession()
  serialized_tf_example = tf.placeholder(tf.string, name='tf_example')
  feature_configs = {'x': tf.FixedLenFeature(shape=[784], dtype=tf.float32),}
  tf_example = tf.parse_example(serialized_tf_example, feature_configs)
  x = tf.identity(tf_example['x'], name='x')  # use tf.identity() to assign name
  y_ = tf.placeholder('float', shape=[None, 10])
  w = tf.Variable(tf.zeros([784, 10]))
  b = tf.Variable(tf.zeros([10]))
  sess.run(tf.global_variables_initializer())
  y = tf.nn.softmax(tf.matmul(x, w) + b, name='y')
  cross_entropy = -tf.reduce_sum(y_ * tf.log(y))
  train_step = tf.train.GradientDescentOptimizer(0.01).minimize(cross_entropy)
  values, indices = tf.nn.top_k(y, 10)
  table = tf.contrib.lookup.index_to_string_table_from_tensor(
      tf.constant([str(i) for i in range(10)]))
  prediction_classes = table.lookup(tf.to_int64(indices))
  for _ in range(FLAGS.training_iteration):
    batch = mnist.train.next_batch(50)
    train_step.run(feed_dict={x: batch[0], y_: batch[1]})
  correct_prediction = tf.equal(tf.argmax(y, 1), tf.argmax(y_, 1))
  accuracy = tf.reduce_mean(tf.cast(correct_prediction, 'float'))
  print('training accuracy %g' % sess.run(
      accuracy, feed_dict={
          x: mnist.test.images,
          y_: mnist.test.labels
      }))
  print('Done training!')

  # Export model
  # WARNING(break-tutorial-inline-code): The following code snippet is
  # in-lined in tutorials, please update tutorial documents accordingly
  # whenever code changes.
  export_path_base = sys.argv[-1]
  export_path = os.path.join(
      tf.compat.as_bytes(export_path_base),
      tf.compat.as_bytes(str(FLAGS.model_version)))
  print('Exporting trained model to', export_path)
  builder = tf.saved_model.builder.SavedModelBuilder(export_path)

  # Build the signature_def_map.
  classification_inputs = tf.saved_model.utils.build_tensor_info(
      serialized_tf_example)
  classification_outputs_classes = tf.saved_model.utils.build_tensor_info(
      prediction_classes)
  classification_outputs_scores = tf.saved_model.utils.build_tensor_info(values)

  classification_signature = (
      tf.saved_model.signature_def_utils.build_signature_def(
          inputs={
              tf.saved_model.signature_constants.CLASSIFY_INPUTS:
                  classification_inputs
          },
          outputs={
              tf.saved_model.signature_constants.CLASSIFY_OUTPUT_CLASSES:
                  classification_outputs_classes,
              tf.saved_model.signature_constants.CLASSIFY_OUTPUT_SCORES:
                  classification_outputs_scores
          },
          method_name=tf.saved_model.signature_constants.CLASSIFY_METHOD_NAME))

  tensor_info_x = tf.saved_model.utils.build_tensor_info(x)
  tensor_info_y = tf.saved_model.utils.build_tensor_info(y)

  prediction_signature = (
      tf.saved_model.signature_def_utils.build_signature_def(
          inputs={'images': tensor_info_x},
          outputs={'scores': tensor_info_y},
          method_name=tf.saved_model.signature_constants.PREDICT_METHOD_NAME))

  legacy_init_op = tf.group(tf.tables_initializer(), name='legacy_init_op')
  builder.add_meta_graph_and_variables(
      sess, [tf.saved_model.tag_constants.SERVING],
      signature_def_map={
          'predict_images':
              prediction_signature,
          tf.saved_model.signature_constants.DEFAULT_SERVING_SIGNATURE_DEF_KEY:
              classification_signature,
      },
      legacy_init_op=legacy_init_op)

  builder.save()

  print('Done exporting!')


if __name__ == '__main__':
  tf.app.run()
(5) Build the model server and start the service:
bazel build -c opt //tensorflow_serving/model_servers:tensorflow_model_server
bazel-bin/tensorflow_serving/model_servers/tensorflow_model_server --port=9000 --model_name=mnist --model_base_path=/Users/liupeng/Desktop/tf_serving/serving/mnist_model/ (use an absolute path)
Output: [screenshot of the server startup log]
(6) Test the service:
python tensorflow_serving/example/mnist_client.py --num_tests=1000 --server=localhost:9000
Result: [screenshot of the client output]
The code of mnist_client.py, which calls the service:
from __future__ import print_function
import sys
import threading
# This is a placeholder for a Google-internal import.
from grpc.beta import implementations
import numpy
import tensorflow as tf
from tensorflow_serving.apis import predict_pb2
from tensorflow_serving.apis import prediction_service_pb2
import mnist_input_data
tf.app.flags.DEFINE_integer('concurrency', 1,
                            'maximum number of concurrent inference requests')
tf.app.flags.DEFINE_integer('num_tests', 100, 'Number of test images')
tf.app.flags.DEFINE_string('server', '', 'PredictionService host:port')
tf.app.flags.DEFINE_string('work_dir', '/tmp', 'Working directory. ')
FLAGS = tf.app.flags.FLAGS


class _ResultCounter(object):
  """Counter for the prediction results."""

  def __init__(self, num_tests, concurrency):
    self._num_tests = num_tests
    self._concurrency = concurrency
    self._error = 0
    self._done = 0
    self._active = 0
    self._condition = threading.Condition()

  def inc_error(self):
    with self._condition:
      self._error += 1

  def inc_done(self):
    with self._condition:
      self._done += 1
      self._condition.notify()

  def dec_active(self):
    with self._condition:
      self._active -= 1
      self._condition.notify()

  def get_error_rate(self):
    with self._condition:
      while self._done != self._num_tests:
        self._condition.wait()
      return self._error / float(self._num_tests)

  def throttle(self):
    with self._condition:
      while self._active == self._concurrency:
        self._condition.wait()
      self._active += 1


def _create_rpc_callback(label, result_counter):
  """Creates RPC callback function.

  Args:
    label: The correct label for the predicted example.
    result_counter: Counter for the prediction result.
  Returns:
    The callback function.
  """
  def _callback(result_future):
    """Callback function.

    Calculates the statistics for the prediction result.

    Args:
      result_future: Result future of the RPC.
    """
    exception = result_future.exception()
    if exception:
      result_counter.inc_error()
      print(exception)
    else:
      sys.stdout.write('.')
      sys.stdout.flush()
      response = numpy.array(
          result_future.result().outputs['scores'].float_val)
      prediction = numpy.argmax(response)
      if label != prediction:
        result_counter.inc_error()
    result_counter.inc_done()
    result_counter.dec_active()
  return _callback


def do_inference(hostport, work_dir, concurrency, num_tests):
  """Tests PredictionService with concurrent requests.

  Args:
    hostport: Host:port address of the PredictionService.
    work_dir: The full path of working directory for test data set.
    concurrency: Maximum number of concurrent requests.
    num_tests: Number of test images to use.

  Returns:
    The classification error rate.

  Raises:
    IOError: An error occurred processing test data set.
  """
  test_data_set = mnist_input_data.read_data_sets(work_dir).test
  host, port = hostport.split(':')
  channel = implementations.insecure_channel(host, int(port))
  stub = prediction_service_pb2.beta_create_PredictionService_stub(channel)
  result_counter = _ResultCounter(num_tests, concurrency)
  for _ in range(num_tests):
    request = predict_pb2.PredictRequest()
    request.model_spec.name = 'mnist'
    request.model_spec.signature_name = 'predict_images'
    image, label = test_data_set.next_batch(1)
    request.inputs['images'].CopyFrom(
        tf.contrib.util.make_tensor_proto(image[0], shape=[1, image[0].size]))
    result_counter.throttle()
    result_future = stub.Predict.future(request, 5.0)  # 5 seconds
    result_future.add_done_callback(
        _create_rpc_callback(label[0], result_counter))
  return result_counter.get_error_rate()


def main(_):
  if FLAGS.num_tests > 10000:
    print('num_tests should not be greater than 10k')
    return
  if not FLAGS.server:
    print('please specify server host:port')
    return
  error_rate = do_inference(FLAGS.server, FLAGS.work_dir,
                            FLAGS.concurrency, FLAGS.num_tests)
  print('\nInference error rate: %s%%' % (error_rate * 100))


if __name__ == '__main__':
  tf.app.run()
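The _ResultCounter class in the client caps the number of in-flight requests with a threading.Condition: throttle() blocks while the active count equals the concurrency limit, and dec_active() wakes it up when a request finishes. The following is a minimal standalone sketch of that pattern using only the standard library. The names Throttle, acquire, release, fake_request and run are hypothetical, and a thread pool with a short sleep stands in for the asynchronous RPC, so this is an illustration of the idea rather than the client's actual code.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


class Throttle(object):
    """Caps in-flight work with a condition variable (hypothetical helper)."""

    def __init__(self, limit):
        self._limit = limit
        self._active = 0
        self.peak = 0  # highest number of simultaneously active requests seen
        self._condition = threading.Condition()

    def acquire(self):
        # Block the submitting thread while the limit is reached.
        with self._condition:
            while self._active >= self._limit:
                self._condition.wait()
            self._active += 1
            self.peak = max(self.peak, self._active)

    def release(self):
        # Called when a request completes; wakes a waiting submitter.
        with self._condition:
            self._active -= 1
            self._condition.notify()


def fake_request(i):
    """Stand-in for stub.Predict.future(...); sleeps instead of calling a server."""
    time.sleep(0.01)
    return i


def run(num_requests, limit):
    throttle = Throttle(limit)
    results = []
    with ThreadPoolExecutor(max_workers=8) as pool:
        for i in range(num_requests):
            throttle.acquire()
            future = pool.submit(fake_request, i)
            # The done-callback plays the role of _callback in mnist_client.py.
            future.add_done_callback(
                lambda f: (results.append(f.result()), throttle.release()))
    # Leaving the with-block waits for all workers, so every callback has run.
    return throttle.peak, sorted(results)


peak, results = run(20, 3)
print(peak <= 3, len(results))
```

Even though the pool has eight workers, the submitting loop never lets more than three requests be active at once, which is the effect of the --concurrency flag in the client.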
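The deep learning framework TensorFlow is popular in academia and is also very widely used in industry. Most of the TensorFlow usage we see day to day is experimental Python code, which cannot be used directly in an online production product. This article introduces a simple way to publish a TensorFlow model.
What are the ways to use a TensorFlow model in an industrial product?
(1) Use the saved TensorFlow model directly through TensorFlow's C++/Java/Node.js API: similar to Caffe, suitable for desktop software.
(2) Put the Python code that uses TensorFlow directly into a web application such as Flask and expose a RESTful interface: easy to implement and debug, but not very efficient, not well suited to high-load scenarios, and lacking features such as version management and hot model updates.
(3) Host the TensorFlow model in TensorFlow Serving, which provides an RPC or RESTful service: easy to set up, efficient, with built-in version management and hot model updates, and well suited to large-scale online services.
This article covers approach (3): the simplest way to publish a TensorFlow model to TensorFlow Serving.
TensorFlow Serving requires the model to be saved in a specific format (SavedModel), which the following code produces: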
# coding=utf-8
import tensorflow as tf

# Model version number
model_version = 1

# Define the model
x = tf.placeholder(tf.float32, shape=[None, 4], name="x")
y = tf.layers.dense(x, 10, activation=tf.nn.softmax)

with tf.Session() as sess:
    # Initialize the variables
    sess.run(tf.global_variables_initializer())
    # Model training is omitted here
    # ......
    # Save the trained model to "model/<version number>"
    tf.saved_model.simple_save(
        session=sess,
        export_dir="model/{}".format(model_version),
        inputs={"x": x},
        outputs={"y": y}
    )
Understanding tf.saved_model.simple_save:
# The simplest way to save a model is the tf.saved_model.simple_save function:
tf.saved_model.simple_save(sess,
                           "./model",
                           inputs={"myInput": x},
                           outputs={"myOutput": y})
# This saves the model in the ./model directory.
# You can also use the more verbose form
# (SavedModelBuilder requires that the export directory does not exist yet):
builder = tf.saved_model.builder.SavedModelBuilder("./model")
signature = tf.saved_model.signature_def_utils.predict_signature_def(
    inputs={'myInput': x},
    outputs={'myOutput': y})
builder.add_meta_graph_and_variables(
    sess=sess,
    tags=[tf.saved_model.tag_constants.SERVING],
    signature_def_map={'predict': signature})
builder.save()
# simple_save assigns a default tag, "serve", which is also available as the constant tag_constants.SERVING.
# Loading
# The loading process is similar across languages; Python is used as the example here:
import numpy as np
# Assumed import: the TF 1.x tutorial module that provides input_data.
from tensorflow.examples.tutorials.mnist import input_data

mnist = input_data.read_data_sets("MNIST_data/", one_hot=True)
with tf.Session(graph=tf.Graph()) as sess:
    tf.saved_model.loader.load(sess, ["serve"], "./model")
    # get_tensor_by_name looks up graph tensor names, not signature keys, so this
    # assumes the tensors were created with name="myInput" and name="myOutput".
    x = sess.graph.get_tensor_by_name('myInput:0')
    y = sess.graph.get_tensor_by_name('myOutput:0')
    batch_xs, batch_ys = mnist.test.next_batch(1)
    scores = sess.run(y,
                      feed_dict={x: batch_xs})
    print("predict: %d, actual: %d" % (np.argmax(scores, 1)[0], np.argmax(batch_ys, 1)[0]))
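In the conversion code above (the listing that saves to "model/<version number>"), everything except the last statement is ordinary TensorFlow code: defining the model, entering a Session, training the model, and so on. The last statement uses tf.saved_model.simple_save to save the model as a SavedModel. Note that the model is saved in the "model/<version number>" folder rather than directly in the "model" folder, because TensorFlow Serving requires a version directory one level below the model directory, which it uses for version management, hot updates, and so on.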
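As a rough illustration of that layout, the sketch below lists the version directories under a model directory and checks that each one contains what a SavedModel export normally produces (a saved_model.pb file plus a variables/ subdirectory). The helper names list_versions and make_fake_export are hypothetical, and the demonstration builds an empty stand-in tree in a temporary directory instead of using a real export.

```python
import os
import tempfile

# Typical layout after exporting version 1:
#   model/
#     1/
#       saved_model.pb
#       variables/
#         variables.data-00000-of-00001
#         variables.index


def list_versions(model_dir):
    """Return the numeric version subdirectories that look like SavedModel exports."""
    versions = []
    for name in os.listdir(model_dir):
        path = os.path.join(model_dir, name)
        if not (name.isdigit() and os.path.isdir(path)):
            continue  # only purely numeric directory names count as versions here
        has_graph = os.path.isfile(os.path.join(path, "saved_model.pb"))
        has_vars = os.path.isdir(os.path.join(path, "variables"))
        if has_graph and has_vars:
            versions.append(int(name))
    return sorted(versions)


def make_fake_export(model_dir, version):
    """Create an empty stand-in for model/<version>/ (for demonstration only)."""
    path = os.path.join(model_dir, str(version))
    os.makedirs(os.path.join(path, "variables"))
    open(os.path.join(path, "saved_model.pb"), "wb").close()
    return path


demo_dir = tempfile.mkdtemp()
make_fake_export(demo_dir, 1)
make_fake_export(demo_dir, 2)
os.makedirs(os.path.join(demo_dir, "scratch"))  # ignored: not a version number
print(list_versions(demo_dir))
```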
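Installing TensorFlow Serving
Method 1: install with apt-get
On Ubuntu or Debian (Bash on Windows 10 also works), TensorFlow Serving can be installed with apt-get. First add the package source with the following command: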
echo "deb [arch=amd64] http://storage.googleapis.com/tensorflow-serving-apt stable tensorflow-model-server tensorflow-model-server-universal" | sudo tee /etc/apt/sources.list.d/tensorflow-serving.list && \
curl https://storage.googleapis.com/tensorflow-serving-apt/tensorflow-serving.release.pub.gpg | sudo apt-key add -
Once the source has been added, install directly with apt-get:
apt-get update && apt-get install tensorflow-model-server
Method 2: install with Docker
TensorFlow Serving provides an official Docker image, which can be pulled with a single command:
docker pull tensorflow/serving
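Publishing the model to TensorFlow Serving:
The steps below assume TensorFlow Serving was installed on the local machine with apt-get. Docker users need to mount or copy the model into the container and follow the steps using the paths inside the container.
The following single command starts TensorFlow Serving and publishes the model saved earlier. Note that the model path here is the path of the "model" directory, not of the "model/<version number>" directory, because TensorFlow Serving treats the model path as a directory that holds multiple versions of the model.
tensorflow_model_server --port=8500 --rest_api_port=8501 --model_name=<model_name> --model_base_path=<path_to_model_directory>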
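To make the path rule concrete, here is a small sketch that derives the value for --model_base_path from a versioned export directory such as model/1 and assembles the command line. The function name build_server_command and the model name mymodel are hypothetical. Only the four flags from the command above are used, and the path is made absolute, following the advice given with the bazel-built server earlier in this post.

```python
import os


def build_server_command(export_dir, model_name, port=8500, rest_api_port=8501):
    """Build the tensorflow_model_server argument list for a versioned export.

    export_dir is the versioned directory written at save time, e.g. "model/1".
    """
    export_dir = os.path.abspath(export_dir)
    version = os.path.basename(export_dir)
    if not version.isdigit():
        raise ValueError("expected a numeric version directory, got %r" % version)
    base_path = os.path.dirname(export_dir)  # the parent directory holds all versions
    return [
        "tensorflow_model_server",
        "--port=%d" % port,
        "--rest_api_port=%d" % rest_api_port,
        "--model_name=%s" % model_name,
        "--model_base_path=%s" % base_path,
    ]


cmd = build_server_command("model/1", "mymodel")
print(" ".join(cmd))
```

The resulting list can be passed to subprocess.Popen to launch the server from a script.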
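Clients can call TensorFlow Serving through either gRPC or REST. Here we use the REST approach. The command above sets the REST port to 8501, so we can check the status of the service with curl (the last path segment is the model name passed to --model_name; model in this example):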
curl http://localhost:8501/v1/models/model
Result:
{
  "model_version_status": [
    {
      "version": "1",
      "state": "AVAILABLE",
      "status": {
        "error_code": "OK",
        "error_message": ""
      }
    }
  ]
}
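A client can make the same check programmatically. The sketch below parses a status response of the shape shown above and reports which versions are in the AVAILABLE state. The helper name available_versions is hypothetical, and the JSON literal is copied from the output above rather than fetched from a live server.

```python
import json

STATUS_JSON = """
{
  "model_version_status": [
    {
      "version": "1",
      "state": "AVAILABLE",
      "status": {"error_code": "OK", "error_message": ""}
    }
  ]
}
"""


def available_versions(status_text):
    """Return the version numbers whose state is AVAILABLE, in ascending order."""
    status = json.loads(status_text)
    return sorted(
        int(entry["version"])
        for entry in status.get("model_version_status", [])
        if entry.get("state") == "AVAILABLE"
    )


print(available_versions(STATUS_JSON))
```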
Next we use curl to send the input x=[1.1, 1.2, 0.8, 1.3] to TensorFlow Serving and obtain the predicted output y:
curl -d '{"instances": [[1.1,1.2,0.8,1.3]]}' -X POST http://localhost:8501/v1/models/<model_name>:predict
The server returns:
{
  "predictions": [[0.0649088, 0.0974758, 0.0456831, 0.297224, 0.152209, 0.0177431, 0.104193, 0.0450511, 0.13074, 0.044771]]
}
Our model successfully output y=[0.0649088, 0.0974758, 0.0456831, 0.297224, 0.152209, 0.0177431, 0.104193, 0.0450511, 0.13074, 0.044771]
Here we used the curl command. In real projects, HTTP libraries such as requests (Python) or OkHttp (Java) can call TensorFlow Serving in much the same way to get the model's predictions.
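As an illustration, the following sketch sends the same request from Python 3. To stay dependency-free it uses the standard library's urllib instead of requests. The function names build_predict_request and predict are hypothetical, and the host, port and model name (mymodel) are assumptions that must match how the server was started.

```python
import json
from urllib import request as urlrequest


def build_predict_request(model_name, instances, host="localhost", port=8501):
    """Return (url, body) for a REST predict call using the "instances" format."""
    url = "http://%s:%d/v1/models/%s:predict" % (host, port, model_name)
    body = json.dumps({"instances": instances}).encode("utf-8")
    return url, body


def predict(model_name, instances, host="localhost", port=8501, timeout=5.0):
    """POST the instances to TensorFlow Serving and return the "predictions" list."""
    url, body = build_predict_request(model_name, instances, host, port)
    req = urlrequest.Request(
        url, data=body, headers={"Content-Type": "application/json"})
    with urlrequest.urlopen(req, timeout=timeout) as resp:
        return json.loads(resp.read().decode("utf-8"))["predictions"]


url, body = build_predict_request("mymodel", [[1.1, 1.2, 0.8, 1.3]])
print(url)
print(body.decode("utf-8"))
# With a server running, predict("mymodel", [[1.1, 1.2, 0.8, 1.3]]) sends the
# request and returns the parsed "predictions" field of the response.
```

Building the URL and body separately from sending them keeps the request format easy to inspect without a running server.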
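Version management and hot model updates
Earlier we saved the model in "model/1", where 1 is the model's version number. If our algorithm engineers develop a better model, there is no need to restart TensorFlow Serving; we only need to publish the new model to "model/<new version number>", for example "model/2". TensorFlow Serving will then publish the new version automatically, and clients can request the API for the new version.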
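One way to pick the directory for a new export is to scan the existing version directories and add one to the largest number. The sketch below does that with the standard library only. next_export_dir is a hypothetical helper, and the demonstration uses an empty directory in a temporary location in place of a real export.

```python
import os
import tempfile


def next_export_dir(model_dir):
    """Return model_dir/<n+1>, where n is the largest existing numeric version."""
    existing = [
        int(name) for name in os.listdir(model_dir)
        if name.isdigit() and os.path.isdir(os.path.join(model_dir, name))
    ]
    # Start at version 1 when no version directory exists yet.
    next_version = max(existing) + 1 if existing else 1
    return os.path.join(model_dir, str(next_version))


model_dir = tempfile.mkdtemp()
os.makedirs(os.path.join(model_dir, "1"))  # stands in for the first export
new_dir = next_export_dir(model_dir)
print(os.path.basename(new_dir))
```

The returned path can be passed as export_dir to tf.saved_model.simple_save. Over REST, a specific version can be addressed at /v1/models/<model_name>/versions/<version>:predict.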
How do you serve a .pb file with TensorFlow Serving? It needs to be converted first; see: https://blog.csdn.net/mouxiaoqiu/article/details/81220222
For further reference, see "Building and deploying a Wide & Deep model with TensorFlow Serving": https://juejin.im/entry/59fc393f51882546b15bdec2