关于模型部署可能有很多种概念或者解释。

根据看的一些文献和接触的一些开源工具,进行梳理。

模型部署说到底,就是通信传输,以及平台存储打通,以及任务的定时工作(定时工作可能偏向于调度了)。

 

1. 模型部署的抽象理解

1.1 模型导出

对于新训练完的模型进行通信传输(磁盘io或者网络io),这里可以考虑类似于k8或者hdfs这些大数据或者云计算的工具,将模型存储于分布式环境,

即模型从 内存 ---> 缓存端(hdfs或其他s3等介质中,甚至说是磁盘)

1.2 模型文件传输

系统部署中,需要get到新的模型,因此需要到之前的缓存端去读取

即模型 缓存端 ---> 目标系统端

这种情况下,我们可能理解目标系统需要去缓存端提取model,可能会有通信瓶颈,如目标端和缓存端在两个集群上,需要socket或者scp的传输来实现model提取,也即磁盘io或网络io的方式;

1.3 模型读入

在目标端的深度框架(tf/torch/mxnet...)对model文件读取,目标文件通过一些函数调用或者其他工具,如可以使用pyarrow工具进行传递,读取到model;

1.4 模型预测

根据进行深度框架对应的predict或者run等功能实现预测。这里可以有多种方式,如也可以通过spark的udf来实现模型的预测,将model进行广播之后,在对数据集进行udf的map计算。

 

2. 模型的在线预测与部署

tf一般有三个模型的口子,即session、estimator、keras。

先简单看下三者里面最麻烦的session是如何进行模型输入输出与预测的。

keras与estimator或者pytorch的导出方式相对简单,与sklearn等方法较为接近,可以是h5或者pickle等文件形式。

2.1 session环境模型保存与导入

原生的session包含2种模型输出和导入方式,即

###方式1
#1.1 模型写出文件(ckpt或者pb的保存模式差不多)
#save之后,包括图结构,变量的内容,都会被存入到新创建的 saved_model_dir 目录内
builder = tf.saved_model.builder.SavedModelBuilder(modelpath)
builder.add_meta_graph_and_variables(
        sess=sess,
        tags=['test_saved_model'],
        signature_def_map={signature_key: signature},
        clear_devices=True)
builder.save()
    
#1.2 模型导入
meta_graph_def = tf.saved_model.loader.load(sess1, ['test_saved_model'], saved_model_dir)
signature = meta_graph_def.signature_def
x_tensor_name = signature[signature_key].inputs[input_key].name
y_tensor_name = signature[signature_key].outputs[output_key].name
​
##1.3 模型预测
x = sess1.graph.get_tensor_by_name(x_tensor_name)
y = sess1.graph.get_tensor_by_name(y_tensor_name)
feed_dict = {x: np.ones([1, 10])}
pred = sess1.run(y, feed_dict=feed_dict)
​
##详细步骤参考  https://www.jianshu.com/p/de8ae24d574a
#本文仅为代码片段

 

###方式2
#2.1 模型写出文件(ckpt或者pb的保存模式差不多)
saver1  = tf.train.saver()
saver1.save(sess,'net/my_net.ckpt')
​
#2.2 导入图和ckpt,pb不能再进行训练,ckpt可以进一步炼丹
saver = tf.train.import_meta_graph('./xxx/xxx.ckpt.meta')
saver.restore(sess, './xxx/xxx.ckpt')
​
##2.3 模型预测
inputs = tf.get_default_graph().get_tensor_by_name('inputs:0')
prediction = tf.get_default_graph().get_tensor_by_name('prediction:0')
pred = sess.run(prediction, feed_dict={inputs: xxx}
​
#详细步骤参考 https://www.jianshu.com/p/c9fd5c01715e
#本文仅为代码片段

根据上述代码可以发现:

1)与前面第一章讲的类似,都会有对应的导出过程、模型读入、模型预测这些,唯独缺了模型文件传输这一块;

2)实际场景中,可能存在跨集群或者跨环境等,因而需要有模型文件传输这一环节;

3)在不涉及client端的模型框架不一致或者版本不一致等问题上,基本上第一章节的方法都可以有效应对;

 

3. 跨语言/环境的部署与预测

遇到跨语言或者环境时,可能相对较为尴尬,目前没有相对比较好的方法,都在摸索阶段吧。

本文讲解的环境,可能以python和java之间的跨语言为主,可能涉及scala,但和java差别不大。暂时不涉及C/C++,后续遇到了补上。

3.1 cs架构

3.1.1 flask提供服务

采用cs架构,设置一个服务端一个客户端,主要采用flask的方式进行restful实现。

1)当client触发时,会将数据通过http进行传输,

2)server端对模型预测后,将预测结果数据以json或其他的格式,传到约定的port/xxx/进行交互;

3)client端在约定端口的目录下获得结果;

从而形成了模型的预测,且模型不需要再目标客户端进行部署,在server端进行定时更新即可。

3.1.2 类连接池方式

R或者python,都支持与java进行通信连接;

对于数据量大时,需要考虑数据流,减少数据的一些io。

3.1.2.1 R语言实现连接

首先讲R,R语言用到较多的方式,就是Rserver。

1)在建立好R脚本,并且部署好Rserver环境;

2)在client端则编写好java的代码,代码逻辑是调用了R脚本,实现数据交互和在java中自动开启了Rserver;

3) 连接的开启,主要调用了RConnection,也即类似于会话的功能,可以跨线程乃至进程使用;

面对数据量较大时,需要考虑io问题,代码中减少数据io,尽量在其他逻辑中实现。

library(mlr)
library(xgboost)
library(tensorflow)
library(keras)
​
getSquareSum <- function(x,y){
  m <- x*x + y*y
  print("我们执行了R函数")  
  return(m)
}
## 代码取材于来自于公众号“Python爱好者社区”,文章名为 Java调用R与python
## 可以将上述代码修改成keras或者tf的代码,从而实现深度学习的模型调用,相当于在线调用训练或预测

 

同时构造一个Java2r的Java类,用来调用上面的R脚本,并且实现数据交互和在Java中自动开启Rserve:

import org.rosuda.REngine.REXP;
import org.rosuda.REngine.REXPMismatchException;
import org.rosuda.REngine.Rserve.RConnection;
import org.rosuda.REngine.Rserve.RserveException;
​
​
public class Java2r {
​
    public static void main(String[] args) {
​
        System.out.println(StartRserve.checkLocalRserve());
​
        System.out.println("准备开始Java调用R");
        System.out.println("-----------------------------------------------");
        RConnection rConnection = null;
​
        try {
         rConnection = new RConnection();
         rConnection.eval("source('C:/test.R')");
​
        } catch (RserveException e) {
        e.printStackTrace();
        } // 文件名不能带中文,否则报错:eval failed, request status: error code: 127
        int a = 2;
        int b = 3;
        int c = 4;
        int sum = 0;
​
        try {
        sum = rConnection.eval("getSquareSum(" + a + "," + b + ")").asInteger();
        } catch (Exception e) {
        e.printStackTrace();
        }
​
        System.out.println("the sum = " + sum);
        rConnection.close();
​
        // 调用R代码
        System.out.println("调用R代码");
​
        RConnection rc = null;
        try {
            rc = new RConnection();
        } catch (RserveException e) {
            e.printStackTrace();
        }
        REXP x = null;
        try {
            x = rc.eval("library(xgboost);R.version.string");
        } catch (RserveException e) {
            e.printStackTrace();
        }
        try {
            System.out.println(x.asString());
        } catch (REXPMismatchException e) {
            e.printStackTrace();
        }
        rc.close();
        System.out.println("-----------------------------------------------");
        System.out.println("回到Java");
​
    }
} 
//## 代码取材于来自于公众号“Python爱好者社区”,文章名为 Java调用R与python

 

3.1.2.2 Python实现连接(java调用命令行)

早期对机器学习模型的部署,不少前辈采用shell的方式实现,目前可能用到的比较少了。而,强大的java同样可以使用这种方法来实现python脚本的运行,核心代码就是:

Runtime.getRuntime().exec(args1);

但是,这种方法面对大数据量时,往往会力不从心,python代码中尽量避免数据流,减少数据的io。

首先准备要有一个Python脚本文件,当然与R一样,需要准备这些包,比如:

import sys
import pandas as pd
import numpy as np
import sklearn
import xgboost
import lightgbm
import tensorflow as tf
import keras
​
def my_test(str1,str2,str3,str4):
    return "Python函数运行:java调Python测试:"+str1+str2+str3+str4
    
​
if __name__=="__main__":
    print("脚本名:", sys.argv[0])
​
    my_arg = []
    for i in range(0, len(sys.argv)):
        my_arg.append(sys.argv[i])
    print("Java传入的参数长度为:"+str(len(my_arg)))
    
    result = my_test(my_arg[1],my_arg[2],my_arg[3],my_arg[4])
    print(result)
## 代码取材于来自于公众号“Python爱好者社区”,文章名为 Java调用R与python

其次我们构造一个J2py类用来调用上述Python脚本,并且实现Java数据与Python的交互(动态传参的过程):

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.List;
​
​
public class J2py {
​
    public static void main(String[] args) {
        // 需传入的参数
        String a = "你好", b = "123", c = "徐静", d = "qingdao";
        System.out.println("Java中动态参数已经初始化,准备传参");
        // 设置命令行传入参数
        String[] args1 = new String[] { "python","java\\03_project\\J2py\\src\\my_model.py", a,b, c, d }; 
        //Java数据a,b,c,d传入Python
        Process pr;
        try {
            pr = Runtime.getRuntime().exec(args1); //最核心的函数
​
            BufferedReader in = new BufferedReader(new InputStreamReader(pr.getInputStream(), "gbk"));
            String line;
            List<String> lines = new ArrayList<String>();
​
            System.out.println("-----------------------------------------------");
​
            while ((line = in.readLine()) != null) {
                System.out.println(line);
                lines.add(line); //把Python的print值保存了下来
​
            }
            System.out.println("-------------------------------------------------");
​
            in.close();
​
            pr.waitFor();
        } catch (Exception e) {
            e.printStackTrace();
        }
​
        System.out.println("Java调Python结束");
​
    }
​
}
//## 代码取材于来自于公众号“Python爱好者社区”,文章名为 Java调用R与python

 

现在可能py4j会用的多一些,

3.1.3 thrift

前面的flask通过http的方式,在数据量大的情况下,性能影响会较大,涉及到大数据量的传输可能不现实,因此这里面讲另外一块,通过thrift实现跨语言服务,通过Thrift实现python和java之间的RPC调用。

1)主要是将java端的数据集(或者干脆称之为训练集)丢到首先经过序列化,通过thrift;

#thrift -r --gen cpp student.thrift  //生成C++代码
thrift -r --gen py student.thrift  //生成python代码
thrift -r --gen java student.thrift   //生成java代码

2)

这一部分主要参考了https://blog.csdn.net/huanucas/article/details/90146636

3.2 中间层转换

3.2.1 onnx

传统的ml项目则是通过pmml的方式进行预测,但精度会有所损失,这也是无法避免的问题。

而面对深度框架时候,也存在模型的标准格式,可以兼容不同深度学习框架,如采用onnx的中间格式,tf和torch都会兼容这种格式。

具体的模式中体与第一章中讲解的类似,就是讲导出的模型文件,中间转成了onnx,最后模型读入的时候,再根据实际的深度框架再转回来即可。

详细步骤不具体展开。

3.2.2 thrift

这里面讲另外一块,通过thrift实现跨语言服务。由于前面已经阐述过thrift,因此这里不再阐述。RPC或者说序列化中间格式,往往都会混在一块,很难说清楚。

 

3.2.3 tvm

tensorflow对于终端(android/ios)有tflite的强力支持,也有针对移动端剪枝蒸馏的pocketflow库(腾讯开源,依然导出tflite)。

pytorch在1.3之后添加了对移动端的支持,pytorch模型经onnx转化为tvm模型,可能在使用上没有tf的支持度好。

从而回到主题,谈到模型部署在移动端这种环境,与常规的服务端的部署差别较大,需要特定的虚拟机提供支持,即tvm,严格意义上来说,这种部署方式不称之为是介质或者说容器的,类似于jvm,但又似乎不全对,毕竟也是生成了一些中间的文件。因此暂时放置在这里。

  1. 环境准备----编译安装llvm/tvm/onnx,
    这些基本的必要条件,预编译的文件可能会有报错,因而需要自己手动进行源码下载并编译,我们在大数据场景中,如spark/hive等,也会遇到类似的问题。llvm是一个编译器框架。onnx用conda/pip会出现一些问题,也需从源码下载并编译
  2. pytorch转onnx
    这一块可能资料较多,根据对应的场景,图像还是nlp或者其他的,进行一些转换
  3. onnx转tvm
    tvm是深度学习工作负载部署到硬件的端到端IR(中间表示)堆栈,把深度学习模型分发到各种硬件设备上的、端到端的解决方案。
    因而这里是关键的一步,需要依赖llvm。
    对onnx的转换编译完成,得到gemfield.so / gemfield.json / gemfield.params三个文件,
    gemfield.so 动态库文件
    gemfield.json 使用json结构描述了神经网络结构
    gemfield.params 包含了网络权重参数
import onnx
import numpy as np
import tvm
import tvm.relay as relay
​
##1.载入源onnx文件
onnx_model = onnx.load('gemfield.onnx')   
​
x = np.ones([1,3,256,256])
# arch = "arm64"     
# target =  "llvm -target=%s-linux-android" % arch
target = 'llvm' ##这里的转换设备方向是llvm,后续可以修改为安卓,即前一行代码
input_name = 'gemfield'
shape_dict = {input_name: x.shape}
##2.导入onnx
sym, params = relay.frontend.from_onnx(onnx_model, shape_dict)
##3.计算图构建
with relay.build_config(opt_level=1):
 intrp = relay.build_module.create_executor('graph', sym, tvm.cpu(0), target)
​
dtype = 'float32'
##4.
tvm_output = intrp.evaluate(sym)(tvm.nd.array(x.astype(dtype)), **params).asnumpy()
##5.编译生成目标文件
with relay.build_config(opt_level=2):
 graph, lib, params = relay.build_module.build(sym, target, params=params)
​
libpath = "gemfield.so"
lib.export_library(libpath)
​
graph_json_path = "gemfield.json"
with open(graph_json_path, 'w') as fo:
 fo.write(graph)
​
param_path = "gemfield.params"
with open(param_path, 'wb') as fo:
 fo.write(relay.save_param_dict(params))
##代码源自https://zhuanlan.zhihu.com/p/58995914     大佬Gemfield写的。
4.推断
上述编译出来的gemfield.so通过tvm.module加载。使用gemfield.so和tvm模块进行推断。
即尝试在新的环境中测试生成的文件,是否可以在新环境中跑起来,进行结果的预测推断。
代码可以参考前面地址,这里就不贴了。

tvm参考官方文档。https://docs.tvm.ai/tutorials/frontend/from_onnx.html#sphx-glr-tutorials-frontend-from-onnx-py

对于详细的经过tvm可以参考

https://zhuanlan.zhihu.com/p/108679717

https://zhuanlan.zhihu.com/p/58995914

 

3.3 网络权重传递

目前在tf的部署上,使用frozen pb的方案较多,不大使用tf serving,graph对网络搞不定,使用带宽效率不高

 

3.4 函数与依赖调用

实际上,在传统的机器学习项目中,面对java环境也可以类似的,可以将python或R语言生成的model做成一个jar,后续java进行调用即可。

 

这里以spark调用tf模型举例,在java环境下也适用。

使用spark-scala调用tensorflow2.0训练好的模型,具体可以参考

https://github.com/lyhue1991/eat_tensorflow2_in_30_days/blob/master/6-7%2C%E4%BD%BF%E7%94%A8spark-scala%E8%B0%83%E7%94%A8tensorflow%E6%A8%A1%E5%9E%8B.md

大致思路如下:

1)模型导出

将tf训练完成的model进行保存,如pb格式;

2)spark工程中,引入tensorflow依赖,即后续spark将调用这个jar包,如

<dependency>
    <groupId>org.tensorflow</groupId>
    <artifactId>tensorflow</artifactId>
    <version>1.15.0</version>
</dependency>

3)模型读入

模型通过tf的方法载入,由于spark是分布式的,因而需要进行广播,告知所有节点这个model了;

val spark = SparkSession
    .builder()
    .appName("TfRDD")
    .enableHiveSupport()
    .getOrCreate()
val sc = spark.sparkContext
//在Driver端加载模型
val bundle = tf.SavedModelBundle.load("/Users/model/1","serve")
​
//利用广播将模型发送到excutor上
val broads = sc.broadcast(bundle)
​
## 代码截取于https://github.com/lyhue1991/eat_tensorflow2_in_30_days/blob/master/6-7%2C%E4%BD%BF%E7%94%A8spark-scala%E8%B0%83%E7%94%A8tensorflow%E6%A8%A1%E5%9E%8B.md

4)模型预测(推断)

采用rdd或者sparksql均可,两者基本一致,

即将广播中的model获取

调用model中的session方法,和在tf中一样,

最后run

从而udf的编写完成,或者说是推断完成,这样就可以直接采用spark的withcolumn/selectExpr进行实现。rdd的也类似,只是将方法写在了map函数中。

//方法1,rdd实现
//通过mapPartitions调用模型进行批量推断
val rdd_result = rdd_data.mapPartitions(iter => {
    val arr = iter.toArray
    val model = broads.value
    val sess = model.session()
    val x = tf.Tensor.create(arr)
    val y =  sess.runner().feed("serving_default_inputs:0", x)
             .fetch("StatefulPartitionedCall:0").run().get(0)
​
//将预测结果拷贝到相同shape的Float类型的Array中
    val result = Array.ofDim[Float](y.shape()(0).toInt,y.shape()(1).toInt)
    y.copyTo(result)
    result.iterator  
})
​
//方法2,sparksql实现
//构造预测函数,并将其注册成sparkSQL的udf
val tfpredict = (features:WrappedArray[Float])  => {
            val bund = broads.value
            val sess = bund.session()
            val x = tf.Tensor.create(Array(features.toArray))
            val y =  sess.runner().feed("serving_default_inputs:0", x)
                     .fetch("StatefulPartitionedCall:0").run().get(0)
            val result = Array.ofDim[Float](y.shape()(0).toInt,y.shape()(1).toInt)
            y.copyTo(result)
            val y_pred = result(0)(0)
            y_pred
        }
spark.udf.register("tfpredict",tfpredict)
        
//构造DataFrame数据集,将features放到一列中
val dfdata =sc.parallelize(List(Array(1.0f,2.0f),Array(3.0f,5.0f),Array(7.0f,8.0f))).toDF("features")
​
//调用sparkSQL预测函数,增加一个新的列作为y_preds
val dfresult = dfdata.selectExpr("features","tfpredict(features) as y_preds")
​
## 代码截取于地址与上述一样

 

3.5 Api接口

尽管并不是任何方式都可以完美契合多种语言,但一个接口确实可以帮助解决很多问题,当然背后是不少工程师的心血。

我们常用的spark支持较多的主流的机器学习语言如Python/R/java/scala,H2O工具也是支持这4种语言,也是我工作中最常用的4种语言,当然4种语言各有千秋,其实4者也从前面的介绍中发现,各自有一些连接或者通信方式实现交互。本文主要对Python和R实现Spark的调用进行理解。

3.5.1 py4j

在可能py4j会用的多一些,PySpark 使用了 Py4j开源库,PySpark通过Py4J库与Spark 基于Scala的API进行通信。

Py4J并不特定于PySpark或Spark,Py4J允许任何Python程序与基于JVM的代码进行对话。

PySpark 使用了 Py4j开源库,创建 Python 端的 SparkContext 对象时,实际会启动 JVM,并创建一个 Scala 端的 SparkContext 对象。PySpark满足了对大数据场景的愿望。

关于py4j的使用可以参考官方文档,http://www.py4j.org/py4j_python.html,也是类似于服务端和客户端的操作

pyspark和py4j可以参考 https://www.jianshu.com/p/013fe44422c9?from=timeline&isappinstalled=0

我们在flink的流式在线计算,对模型在线预测,如果是调用python,比如sklearn的算法比如xgb等,可以利用py4j帮助实现;

3.5.2 Sparklyr

类似的R语言也开发了类似的功能,SparkR 和 Sparklyr, 是两个基于Spark的R语言接口,通过简单的语法深度集成到R语言生态中。

Sparklyr 通过拓展程序,graphframes 实现图挖掘,比如Pagerank、LPA等

Sparklyr 通过拓展程序 Rsparkling 实现深度学习,如 Anto-Encoder

SparkR 仅在实时计算上领先于 Sparklyr,在图计算、机器学习、深度学习等领域已经被拉开差距,在大多数场景下,Sparklyr将是一个更好的选择,在不久的将来,Sparklyr也将集成Streaming模块,届时将全面覆盖SparkR功能。

 

上述关于SparkR 和 Sparklyr,都是参考了这篇文章,详细比较可以参考文献https://segmentfault.com/a/1190000013806395?utm_source=tag-newest

 

4.模型部署一些成熟工具的对比

mlflow: 一种机器学习生命周期管理平台,可以快速将机器学习的每个model进行保存,记录实验,同事具备一个前端,支持多语言多框架,将模型保存在本地或者hdfs等,即但需要通过pyarrow作为桥梁,目前是开源的框架,之前还给他们提过一个小小的bug,他们也发现了这个问题,无奈他们最终没有办法修改,api改改太麻烦

https://github.com/mlflow/mlflow

一些使用可以查看https://www.cnblogs.com/CheeseZH/p/11943280.html,或者查看官方文档

 

sagemaker: 是一项完全托管的服务,可以帮助机器学习开发者和数据科学家快速构建、训练和部署模型,没有开源,毕竟是amazon的东西,不过开源的autogulon好像也引用了它;

 

bentoml: An open-source platform for high-performance ML model serving, https://github.com/bentoml/BentoML,开源的框架

操作可以参考https://docs.bentoml.org/en/latest/quickstart.html

三者比较后续补上,目前主要使用mlflow较多

 

 

5.小结

后续还需要补充一些优劣点,路漫漫,不知道为啥,markdown复制过来的格式怎么这么丑

Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐