triton + chatglm4 部署

简单介绍

triton的推理架构由两部分组成,一部分由client,一部分由k8s cluster组成。

  • 集群:总体上是一个k8s集群,要不然启动,生命周期维护,弹性扩容,需要一个管理系统会自动化运维,编排。
  • 前端均衡器:前排有Load Balancer负载均衡器,把用户请求分配各个推理容器。推理容器会启动多个,如何分担推理容器。
  • 模型仓库:把模型文件管理起来。
  • 指标监控系统:运维系统正常监控都有。
  • triton推理服务: 启动多个节点,多个模型框架的支持,因为有的模型onnx,有的pytorch,有的tensorflow,有的tensorRT,
    资料:

https://github.com/triton-inference-server/tutorials

trinton的层数

k8s层,管理多个节点

一个triton节点都是一个模型(一个gpu,或多个gpu)

tensorRt(对NN模型的一个加速库)

trinton具备的功能

  • 要支持多模型的框架(tensorflow, pytorch主流,tensorRT,ONNX RT,自定义)
  • 异构支持,cpu,gpu,多gpu
  • 并发模型执行(cpu级别的优化)
  • 支持http,rest,grpc(http2),apis
  • k8s集群的融入和指标监控系统
  • 模型的管理加载更新
  • 重点就是推理服务队列的分发

如何设计triton?

  • (从推理框架解耦)推理从请求到结束的生命周期,当收到一个调用qwen的一个请求,如何调用?比如说写了些python代码(pytorch)去实现推理,但是推理代码到底是由人员开发,还是交给pytorch本身?所以需要做的是要让它和推理框架解耦化。
  • (后端管理)还有常见的后端代理管理。
  • (并发)模型的并发支持,多线程。
  • (队列)请求队列的调度管理
  • (回复管理)推理结束不单是简单的转发,信息整合回复管理,也就是推理结果管理。
  • (GPRC & HTTP)GPRC和HTTP服务
    单发模型场景:据个例子如cv,输入一个图像返回一个字符串

piepline场景:就是过程涉及多个模型的串联

状态模型场景 :比如llm以及其上下文

并发场景

最常见的并发场景1:单一模型多个线程进行推理。

最常见的并发场景2:多模型多线程。

主要triton抓住了3种模型特征,无状态模型,有状态的模型,和集成模型。

几个组件介绍

  • dynamic batching scheduler:先打个batch,再送去dynamic batcher里去,把requests进行grouping到一个batch,提升gpu的吞吐量。
  • streaming inferece request:进来一个片段就要处理,语音片段,同一语音片段会进去同一个线程的batch。
  • 后端解耦:backend api需要和triton进行解耦,需要些custom backend 也就是C API,同时利用到了dynamic batcher之类的。
  • model analyzer: triton一个附加功能,是一个client,对推理请求扫描,扫描延迟和吞吐量,扫描gpu的显存footprints。

基本5个模块(一个简单的例子)

最基本的5个模块:

  • model repository
  • 配置served model
  • 登录triton server
  • 配置集成模型
  • 发送request去triton server

目录结构

在model_repository目录下装4个二级目录(densenet_onnx, inception_graphdef, resenet50)

第三级目录比如,在desenet_onnx目录下放版本号,放config.pbtxt配置,可能还有带有label.txt。

文件格式

版本目录下可以放很多种格式

model.py (python)

dali (model.dali)

openvino (model.xml, model.bin)

custom (model.so)

torch(model.pt)

onnx (model.onnx)

tensorRT (model.plan)

模型版本和版本目录名一致,推理指定版本号找到模型文件。

config文件的作用

config定义了scheduling策略,batchsize之类的,input,output。还有另外的一个label.txt, 是对于分类模型,用来直接转化为字符串。

启动命令nvi

1
tritonserver --model-repository=/triton/models_repository

不想要编译triton,直接下载triton的镜像,在镜像里去运行。
启动后triton模型库加载成功,看到一个model ready,甚至看到了监听0.0.0.0: 8000端口。

使用 python image_client.py -m resnet50_trt -c 1 -b 1 -a ../pics/ 这样执行推理。

config文件编写

  • 指定模型在平台和backend选择。
  • 还有max_batch_size指定模型推理的batch_size, 别超过gpu显存。
  • 输入tensor叫啥,输出tensor叫啥。
    在server tensorRT,onnx,tensorflow不需要指定config文件,但是pytorch必须指定config文件。对于platform和backend如何填写。

指定platform和backend

tensorTR可以指定tensorrt_plan,或者tensorrt。对于 pytorch指定pytorch_libtorch或者pytorch。

20.1.05必须指定backend是

指定input&output

需要指定input的name,data_type,dims, pytroch必须INPUT__0等等。如果支持可变维度,可以把-1设置到可变维度那一栏。max_batch_size为0,这个时候可以使用reshape{shape:[1,3,224,244]}。

指定policy

也可以指定version_policy: {all{}} 所有版本都serve

可以指定version_policy: latest { num_version: {}}, 指定最新的version

也可以指定特定的版本

指定instance_group

这个指定是利用了对同一个模型开启多个instance_excution,并行执行提高模型吞吐。

1
2
3
4
5
6
7
8
9
10
11
12
13
instance_group [
{
count:2
kind: KIND_CPU
}{
count:1
kind:KIND_GPU
gpus:[0]
}{
count:2
kind:KIND_GPU
gpus:[1, 2] //指定的使用gpu
}

指定策略

如果不在config文件里面配策略,不写dynamic_batching,默认使用default scheduling。

默认的batch_size是多少就是多少。

仅对stateless,和streaming的模型,是没有用的,对于dynamic batcher策略。

1
2
3
4
dynamic_batching {
preferred_batch_size:[4,8]
max_queue_delay_microseconds:100
}

拼的batch_size越大,吞吐量越高,但是delay时间越长。
server端对于比较小的请求,拼接成为比较大的batch去推理。

dynamic参数可以设置。preserve_ordering(先进先出),priority_levels(batch的优先级),queuePolicy队列长度(过长的队列会丢弃请求)

  • 对于sequence Batcher,是针对stateful Model, 就在其他资料上说。
  • 对于emsemble batcher,也是同上。
    还可以设置热身策略, 对每个模型的instance进行热身,进入ready模式。

triton server

1
2
3
4
5
6
7
docker run --gpus all -it --rm \
--shm-size=1g \
-p8000:8000 -p8001:8001 -p8002:8002 \ (8000 http, 8001 grpc, 8002 metric)
-v <host_model_repo>:<container_model_repo> \
nvcr.io/nvidia/tritonserver:21.07-py3

$ tritonserver --model-repository <repo>

启动server

1
curl -v <ip>:8000/v2/health/ready

检查server是否ok
常用的triton server的参数:

–log-verbose 0/1 控制日志

–strict-model-config true/false 控制模型的配置文件是否需要

–strict-readiness 模型库全部模型上线才ready

–exit-on-error 只要有一个模型load失败就fail

–http/grpc/metrics/-port 指定三个重要端口

–model-control-model none/poll/explicit 可以手动/自动/卸载更新模型

–repository-poll-secs 检查模型是否更新频率

–load-model 可以指定加载模型卸载模型

–pinned-memory-pool-byte-size 是cpu效率关键

–cuda-memory-pool-byte-size gpu显存设置

–backend-diretory 指定自己动态so库

–repoagent-directory 加密时有用

send request to Triton Server

grpc和http的协议代码区别很小,以grpc为例子。

1
2
3
4
5
6
import tritonclient.http as httpclient
import tritonclient.grpc as grpcclient
//如果使用grpc
triton_client = grpcclient.inferenceServerClient(url=FLAGS.url, verbose=FLAGS.verbose, concurrency=concurrency)
//如果使用http
triton_client = httpclient.inferenceServerClient(url=FLAGS.url, verbose=FLAGS.verbose, concurrency=concurrency)

获取meta_data,进而获取所有的config,进而继续获得各种参数如
max_batch_size, input_name, output_name, c, h, w, format, dtype等等

框架backend设计理论

简单的说,tirton的自定义backend,在目录下实现一个后端文件,那么triton会自动创建triton inference 实例。除了初始化,执行,结束需要用户手动实现。里面的创建该实例,都是由triton去负责的。这是为了适配各种各样的推理后端框架 pytorch, tensorflow等等。

(实践) Triton + chatglm4 + vllm 单机模式

triton server docker

第一步当然是先docker pull triton的镜像。

使用服务器拉取镜像在封闭受控的环境是非常痛苦的,尤其是服务器上存在各种各样的容器的情况下。

配置docker参考这篇:https://zeonzhuang.com/2024/05/14/ai%E5%B7%A5%E7%A8%8B/dify/#more

这里存放了各种triton server的镜像文件。

https://catalog.ngc.nvidia.com/orgs/nvidia/containers/tritonserver

xx.yy-vllm-python-py3 镜像是有triton的同时,支持python框架后端+vllm,这个是我所青睐的

执行命令:

1
docker pull nvcr.io/nvidia/tritonserver:24.05-vllm-python-py3

完成后,可以进入镜像,执行命令测试triton

1
tritonserver --help

正常的方式是使用正确的命令进去镜像

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
docker run -itd (交互,守护模式)
--name chatglmtest (指定名称)
--gpus all (指定gpu)
--shm-size=1g (共享内存)
--ulimit memlock=-1 (接触内存锁限制)
-p 8000:8000 -p 8001:8001 -p 8002:8002 (映射端口)
--net=host (主机网络共享)
-v /home/server/model_repository:/models (映射模型文件)
--ulimit stack=67108864 (指定堆栈区大小)
nvcr.io/nvidia/tritonserver:23.12-py3 (运行指定镜像)

//进行了些修改,由于我运行的是triton+vllm_chatglm4修改了名称
//由于大量8开头端口占用了,所以该了映射为4开头,但是这里偷懒用了主机共享网络不需要映射端口
//对于目录映射,是triton的重头戏,直接映射triton目录
docker run -itd --name triton_vllm_chatglm4 --gpus all --shm-size=1g --ulimit memlock=-1 -p 8000:4000 -p 8001:4001 -p 8002:4002 --net=host -v /home/zzh/triton:/triton --ulimit stack=67108864 nvcr.io/nvidia/tritonserver:24.05-vllm-python-py3

目录结构

对于目录映射这里提一下,triton运行时,需要指定model_repository

  • 一级目录(server目录):这个models_repository下面会有数个模型目录,qwen2,chatglm4,llama3等等
  • 二级目录(单个模型目录):这个目录自由命名,如chatglm4就这样命名
  • 三级目录 (版本目录,config文件)
  • 四级目录(版本目录下的模型权重目录,work目录下的模型推理文件目录, model.py自定义推理框架python后端)
    根据以上的目录架构理论,我在本地构建了一个

triton目录

->model_repository

->chatglm4

->1

-> glm-4-9b-chat

-> work

-> model.py (后端文件, 使用vllm)

->config.pbtxt

进入成功以后,测试tritonserver成功以后,开始对model.py进行修改。

model推理python后端

对于后端推理,为了使用triton,需要完成TritonPythonModel类的开发。该类对象不是由使用者创建,但是初始化,执行,结束都是由使用者编写。

参考对比

这个函数只有模型被load到显存的时候,才会被调用一次,主要用于读取相关启动参数。

可以通过观察2个不同的模型推理文件来进行对比。

https://github.com/THUDM/GLM-4

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
from transformers import AutoTokenizer
from vllm import LLM, SamplingParams

# GLM-4-9B-Chat-1M
# max_model_len, tp_size = 1048576, 4
# 如果遇见 OOM 现象,建议减少max_model_len,或者增加tp_size

max_model_len, tp_size = 131072, 1
model_name = "THUDM/glm-4-9b-chat"
prompt = [{"role": "user", "content": "你好"}]
tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True)



llm = LLM(
model=model_name,
tensor_parallel_size=tp_size,
max_model_len=max_model_len,
trust_remote_code=True,
enforce_eager=True,
# GLM-4-9B-Chat-1M 如果遇见 OOM 现象,建议开启下述参数
# enable_chunked_prefill=True,
# max_num_batched_tokens=8192
)

stop_token_ids = [151329, 151336, 151338]
sampling_params = SamplingParams(temperature=0.95, max_tokens=1024, stop_token_ids=stop_token_ids)
inputs = tokenizer.apply_chat_template(prompt, tokenize=False, add_generation_prompt=True)
outputs = llm.generate(prompts=inputs, sampling_params=sampling_params)
print(outputs[0].outputs[0].text)

也就是vllm封装了LLM类,和SamplingParams类。

  • tokenizer依旧是使用hugging face的库,autoTokenizer去初始化一个tokenizer,然后prompt作为一个数组(元素是dict),会被tokenizer使用apply_chat_template转化成模型可以接受的input。
  • 而这个LLM类会接model_name(仓名或者是本地路径名),ternsor_parallel_size, max_model_len,trust_remote_code, enforce_eager, 创建出一个llm对象,该对象有一个generate函数,就是执行推理的地方,但是该generate函数会接一个prompts,和一个sampling_params作为入参,这个sampling_params就是包含温度,max_tokens, stop_token_ids,所有的heavy job都被vllm给承包了。
    https://blog.csdn.net/cm2010_03_31/article/details/135986638
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
import os
# 设置显存空闲block最大分割阈值
os.environ['PYTORCH_CUDA_ALLOC_CONF'] = 'max_split_size_mb:32'
# 设置work目录
 
os.environ['TRANSFORMERS_CACHE'] = os.path.dirname(os.path.abspath(__file__))+"/work/"
os.environ['HF_MODULES_CACHE'] = os.path.dirname(os.path.abspath(__file__))+"/work/"
 
import json
 
# triton_python_backend_utils is available in every Triton Python model. You
# need to use this module to create inference requests and responses. It also
# contains some utility functions for extracting information from model_config
# and converting Triton input/output types to numpy types.
import triton_python_backend_utils as pb_utils
import sys
import gc
import time
import logging
import torch
from transformers import AutoTokenizer, AutoModel, AutoModelForCausalLM
import numpy as np
 
gc.collect()
torch.cuda.empty_cache()
 
logging.basicConfig(format='%(asctime)s - %(filename)s[line:%(lineno)d] - %(levelname)s: %(message)s',
                    level=logging.INFO)
 
class TritonPythonModel:
    """Your Python model must use the same class name. Every Python model
    that is created must have "TritonPythonModel" as the class name.
    """
 
    def initialize(self, args):
        """`initialize` is called only once when the model is being loaded.
        Implementing `initialize` function is optional. This function allows
        the model to intialize any state associated with this model.
        Parameters
        ----------
        args : dict
          Both keys and values are strings. The dictionary keys and values are:
          * model_config: A JSON string containing the model configuration
          * model_instance_kind: A string containing model instance kind
          * model_instance_device_id: A string containing model instance device ID
          * model_repository: Model repository path
          * model_version: Model version
          * model_name: Model name
        """
        # You must parse model_config. JSON string is not parsed here
        self.model_config = json.loads(args['model_config'])
        
        output_response_config = pb_utils.get_output_config_by_name(self.model_config, "response")
        output_history_config = pb_utils.get_output_config_by_name(self.model_config, "history")
 
        # Convert Triton types to numpy types
        self.output_response_dtype = pb_utils.triton_string_to_numpy(output_response_config['data_type'])
        self.output_history_dtype = pb_utils.triton_string_to_numpy(output_history_config['data_type'])
        
        ChatGLM_path = os.path.dirname(os.path.abspath(__file__))+"/chatglm3-6b-32k"
        self.tokenizer = AutoTokenizer.from_pretrained(ChatGLM_path, trust_remote_code=True)
        #下面to('cuda:'+args['model_instance_device_id'])这里一定要注意,这里是把实例部署到对应的显卡上,如果不写会分散到所有显卡上或者集中到一个显卡上,都会造成问题
        model = AutoModelForCausalLM.from_pretrained(ChatGLM_path,
                                          torch_dtype=torch.float16,               trust_remote_code=True).half().to('cuda:'+args['model_instance_device_id'])
        self.model = model.eval()
        logging.info("model init success")
        
    def execute(self, requests):
        """`execute` MUST be implemented in every Python model. `execute`
        function receives a list of pb_utils.InferenceRequest as the only
        argument. This function is called when an inference request is made
        for this model. Depending on the batching configuration (e.g. Dynamic
        Batching) used, `requests` may contain multiple requests. Every
        Python model, must create one pb_utils.InferenceResponse for every
        pb_utils.InferenceRequest in `requests`. If there is an error, you can
        set the error argument when creating a pb_utils.InferenceResponse
        Parameters
        ----------
        requests : list
          A list of pb_utils.InferenceRequest
        Returns
        -------
        list
          A list of pb_utils.InferenceResponse. The length of this list must
          be the same as `requests`
          
        """
        output_response_dtype = self.output_response_dtype
        output_history_dtype = self.output_history_dtype
 
        # output_dtype = self.output_dtype
        responses = []
        # Every Python backend must iterate over everyone of the requests
        # and create a pb_utils.InferenceResponse for each of them.
        for request in requests:
            prompt = pb_utils.get_input_tensor_by_name(request, "prompt").as_numpy()[0]
            prompt = prompt.decode('utf-8')
            history_origin = pb_utils.get_input_tensor_by_name(request, "history").as_numpy()
            if len(history_origin) > 0:
                history = np.array([item.decode('utf-8') for item in history_origin]).reshape((-1,2)).tolist()
            else:
                history = []
            temperature = pb_utils.get_input_tensor_by_name(request, "temperature").as_numpy()[0]
            temperature = float(temperature.decode('utf-8'))
            max_token = pb_utils.get_input_tensor_by_name(request, "max_token").as_numpy()[0]
            max_token = int(max_token.decode('utf-8'))
            history_len = pb_utils.get_input_tensor_by_name(request, "history_len").as_numpy()[0]
            history_len = int(history_len.decode('utf-8'))
            
            # 日志输出传入信息
            in_log_info = {
                "in_prompt":prompt,
                "in_history":history,
                "in_temperature":temperature,
                "in_max_token":max_token,
                "in_history_len":history_len
                       }
            logging.info(in_log_info)
            response,history = self.model.chat(self.tokenizer,
                                               prompt,
                                               history=history[-history_len:] if history_len > 0 else [],
                                               max_length=max_token,
                                               temperature=temperature)
            # 日志输出处理后的信息
            out_log_info = {
                "out_response":response,
                "out_history":history
                       }
            logging.info(out_log_info)
            response = np.array(response)
            history = np.array(history)
            
            response_output_tensor = pb_utils.Tensor("response",response.astype(self.output_response_dtype))
            history_output_tensor = pb_utils.Tensor("history",history.astype(self.output_history_dtype))
 
            final_inference_response = pb_utils.InferenceResponse(output_tensors=[response_output_tensor,history_output_tensor])
            responses.append(final_inference_response)
            # Create InferenceResponse. You can set an error here in case
            # there was a problem with handling this inference request.
            # Below is an example of how you can set errors in inference
            # response:
            #
            # pb_utils.InferenceResponse(
            #    output_tensors=..., TritonError("An error occured"))
 
        # You should return a list of pb_utils.InferenceResponse. Length
        # of this list must match the length of `requests` list.
        return responses
 
    def finalize(self):
        """`finalize` is called only once when the model is being unloaded.
        Implementing `finalize` function is OPTIONAL. This function allows
        the model to perform any necessary clean ups before exit.
        """
        print('Cleaning up...')
  • 在初始化的时候,这里的代码读取了model_config, 然后使用了pb_utils进而读取了triton的config文件,最重要的两步是吧tokenizer,和llm创建出来后,把llm通过to(‘cuda:’+args[‘model_instance_device_id’])放置在应该的gpu上面去。
  • 而这里的execute函数,会在triton server接收到了pb_util.InferenceRequest数组之后对他们进行处理。然后返回pb_util.InferenceRespond数组作为输出。处理的过程中,需要对每个request进行遍历,构建出重要的四个参数,温度,历史对话,max_token。接着通过llm的接口,得到response的参数后,组装出inferenceResponse。

triton + vllm 代码

在服务器上,使用vscode remote插件连接远程gpu服务器后,将model.py写好部署。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
import os
'''
设置显存空闲block最大分割阈值
max_split_size_mb:32 表示 PyTorch 将显存中空闲块的最大分割阈值设置为 32 MB。
这意味着,当有大于 32 MB 的空闲显存块时,PyTorch 可能会将其分割成更小的块,以便更灵活地分配和管理显存。
'''
os.environ['PYTORCH_CUDA_ALLOC_CONF'] = 'max_split_size_mb:32'

# 设置work目录, 指定transformer库的缓存目录
os.environ['TRANSFORMERS_CACHE'] = os.path.dirname(os.path.abspath(__file__))+"/work/"
os.environ['HF_MODULES_CACHE'] = os.path.dirname(os.path.abspath(__file__))+"/work/"

import json
 
# triton_python_backend_utils is available in every Triton Python model. You
# need to use this module to create inference requests and responses. It also
# contains some utility functions for extracting information from model_config
# and converting Triton input/output types to numpy types.
import triton_python_backend_utils as pb_utils
import sys
import gc
import time
import logging
import torch
from transformers import AutoTokenizer, AutoModel, AutoModelForCausalLM
import numpy as np

# 使用vllm进行加速
from vllm import LLM, SamplingParams

# 强制垃圾回收,可以确保尽可能多地释放内存资源,减少内存使用峰值。
gc.collect()

# 由于我只在4号卡上运行,不需要这。
# torch.cuda.empty_cache()
 
logging.basicConfig(format='%(asctime)s - %(filename)s[line:%(lineno)d] - %(levelname)s: %(message)s',
                    level=logging.INFO)
 
class TritonPythonModel:
    """Your Python model must use the same class name. Every Python model
    that is created must have "TritonPythonModel" as the class name.
    """
 
    def initialize(self, args):
        """`initialize` is called only once when the model is being loaded.
        Implementing `initialize` function is optional. This function allows
        the model to intialize any state associated with this model.
        Parameters
        ----------
        args : dict
          Both keys and values are strings. The dictionary keys and values are:
          * model_config: A JSON string containing the model configuration
          * model_instance_kind: A string containing model instance kind
          * model_instance_device_id: A string containing model instance device ID
          * model_repository: Model repository path
          * model_version: Model version
          * model_name: Model name
        """
        # You must parse model_config. JSON string is not parsed here
        self.model_config = json.loads(args['model_config'])
        
        output_response_config = pb_utils.get_output_config_by_name(self.model_config, "response")
        output_history_config = pb_utils.get_output_config_by_name(self.model_config, "history")
 
        # Convert Triton types to numpy types
        self.output_response_dtype = pb_utils.triton_string_to_numpy(output_response_config['data_type'])
        self.output_history_dtype = pb_utils.triton_string_to_numpy(output_history_config['data_type'])
        
        # 加载模型本身
        ChatGLM_path = os.path.dirname(os.path.abspath(__file__))+"/glm-4-9b-chat"
        #self.tokenizer = AutoTokenizer.from_pretrained(ChatGLM_path, trust_remote_code=True)
        #下面to('cuda:'+args['model_instance_device_id'])把实例部署到对应的显卡上,不写会分散到所有显卡上或者集中到一个显卡上
        # vllm初始化
        max_model_len, tp_size = 20400, 1
        self.model = LLM(model=ChatGLM_path,
                    tokenizer=ChatGLM_path,
                    tensor_parallel_size=tp_size,
                    dtype='float16',
                    max_model_len=max_model_len,
                    enforce_eager=True,
                    trust_remote_code=True)
        # 我需要它在指定的卡上运行
        # self.model = model.eval()

        self.stop_token_ids = [151329, 151336, 151338]
        # samplingParams不需要,post请求会带上这些参数

        logging.info("model init success")
        
    def execute(self, requests):
        """`execute` MUST be implemented in every Python model. `execute`
        function receives a list of pb_utils.InferenceRequest as the only
        argument. This function is called when an inference request is made
        for this model. Depending on the batching configuration (e.g. Dynamic
        Batching) used, `requests` may contain multiple requests. Every
        Python model, must create one pb_utils.InferenceResponse for every
        pb_utils.InferenceRequest in `requests`. If there is an error, you can
        set the error argument when creating a pb_utils.InferenceResponse
        Parameters
        ----------
        requests : list
          A list of pb_utils.InferenceRequest
        Returns
        -------
        list
          A list of pb_utils.InferenceResponse. The length of this list must
          be the same as `requests`
          
        """
        output_response_dtype = self.output_response_dtype
        output_history_dtype = self.output_history_dtype
 
        # output_dtype = self.output_dtype
        responses = []
        # Every Python backend must iterate over everyone of the requests
        # and create a pb_utils.InferenceResponse for each of them.

        for request in requests:
            prompt = pb_utils.get_input_tensor_by_name(request, "prompt").as_numpy()[0]
            prompt = prompt.decode('utf-8')
            history_origin = pb_utils.get_input_tensor_by_name(request, "history").as_numpy()
            if len(history_origin) > 0:
                history = np.array([item.decode('utf-8') for item in history_origin]).reshape((-1,2)).tolist()
            else:
                history = []
            temperature = pb_utils.get_input_tensor_by_name(request, "temperature").as_numpy()[0]
            temperature = float(temperature.decode('utf-8'))
            max_token = pb_utils.get_input_tensor_by_name(request, "max_token").as_numpy()[0]
            max_token = int(max_token.decode('utf-8'))
            history_len = pb_utils.get_input_tensor_by_name(request, "history_len").as_numpy()[0]
            history_len = int(history_len.decode('utf-8'))
            
            # 日志输出传入信息
            in_log_info = {
                "in_prompt":prompt,
                "in_history":history,
                "in_temperature":temperature,
                "in_max_token":max_token,
                "in_history_len":history_len
                       }
            logging.info(in_log_info)

            # 执行推理
            sampling_params = SamplingParams(temperature=temperature, max_tokens=max_token, stop_token_ids=self.stop_token_ids)
            response = self.model.generate(prompts=prompt,sampling_params=sampling_params)

            # 日志输出处理后的信息
            out_log_info = {
                "out_response":response,
                "out_history":history
                       }
            logging.info(out_log_info)
            response = np.array(response)
            # history = np.array(history)
            
            response_output_tensor = pb_utils.Tensor("response",response.astype(self.output_response_dtype))
            # history_output_tensor = pb_utils.Tensor("history",history.astype(self.output_history_dtype))
 
            #  final_inference_response = pb_utils.InferenceResponse(output_tensors=[response_output_tensor,history_output_tensor])
            final_inference_response = pb_utils.InferenceResponse(output_tensors=[response_output_tensor])
            responses.append(final_inference_response)
            # Create InferenceResponse. You can set an error here in case
            # there was a problem with handling this inference request.
            # Below is an example of how you can set errors in inference
            # response:
            #
            # pb_utils.InferenceResponse(
            #    output_tensors=..., TritonError("An error occured"))
 
        # You should return a list of pb_utils.InferenceResponse. Length
        # of this list must match the length of `requests` list.
        return responses
 
    def finalize(self):
        """`finalize` is called only once when the model is being unloaded.
        Implementing `finalize` function is OPTIONAL. This function allows
        the model to perform any necessary clean ups before exit.
        """
        print('Cleaning up...')

常见问题

  • oom问题: 这个时候和指定gpu有关,9b的模型实测跑起来使用float16的话,大概指定pt数目为1(单卡部署),如果还是不够,则需要缩短上下文长度,glm4-9b-chat支持128k,但是内存不构,连40K跑起来都无法装入一整张32GV100, 最后博主缩短到20K(20400)的情况下,才可以运行,大概静态显存是占用23G。
  • gpu指定的问题: 在启动docker镜像的时候如果是其他gpu已经被占用的情况下(博主的情况只能使用1块32G的V100),使用这个docker参数,-gpus=’”device=4”,这里的指定让代码无需去指定。
  • vllm构造问题:vllm一些问题,最好使用vllm的llm接口,同时把tokenizer一并传入。
    1
    2
    3
    4
    5
    6
    7
    8
      max_model_len, tp_size = 20400, 1
      self.model = LLM(model=ChatGLM_path,
                        tokenizer=ChatGLM_path,
                        tensor_parallel_size=tp_size,
                        dtype='float16',
                        max_model_len=max_model_len,
                        enforce_eager=True,
                        trust_remote_code=True)

一些总结和理解

triton的下一层各种backend是可以兼容,比如vllm加速推理。

triton的更上一层是可以用k8s抽象管理的。

参考

https://docs.vllm.ai/en/latest/dev/offline_inference/llm.html#

https://blog.csdn.net/cm2010_03_31/article/details/135986638

https://github.com/THUDM/GLM-4