import os

# Cap the maximum size of free GPU-memory blocks the CUDA caching allocator may split,
# to reduce fragmentation.
os.environ['PYTORCH_CUDA_ALLOC_CONF'] = 'max_split_size_mb:32'
# Point the Hugging Face cache directories at a local work/ directory.
os.environ['TRANSFORMERS_CACHE'] = os.path.dirname(os.path.abspath(__file__)) + "/work/"
os.environ['HF_MODULES_CACHE'] = os.path.dirname(os.path.abspath(__file__)) + "/work/"

import json

# triton_python_backend_utils is available in every Triton Python model. You
# need to use this module to create inference requests and responses. It also
# contains some utility functions for extracting information from model_config
# and converting Triton input/output types to numpy types.
import triton_python_backend_utils as pb_utils
import sys
import gc
import time
import logging
import torch
from transformers import AutoTokenizer, AutoModel, AutoModelForCausalLM
import numpy as np

gc.collect()
torch.cuda.empty_cache()

logging.basicConfig(format='%(asctime)s - %(filename)s[line:%(lineno)d] - %(levelname)s: %(message)s',
                    level=logging.INFO)


class TritonPythonModel:
    """Your Python model must use the same class name. Every Python model
    that is created must have "TritonPythonModel" as the class name.
    """

    def initialize(self, args):
        """`initialize` is called only once when the model is being loaded.
        Implementing `initialize` function is optional. This function allows
        the model to initialize any state associated with this model.

        Parameters
        ----------
        args : dict
          Both keys and values are strings. The dictionary keys and values are:
          * model_config: A JSON string containing the model configuration
          * model_instance_kind: A string containing model instance kind
          * model_instance_device_id: A string containing model instance device ID
          * model_repository: Model repository path
          * model_version: Model version
          * model_name: Model name
        """
        # You must parse model_config. JSON string is not parsed here.
        self.model_config = json.loads(args['model_config'])

        output_response_config = pb_utils.get_output_config_by_name(self.model_config, "response")
        output_history_config = pb_utils.get_output_config_by_name(self.model_config, "history")

        # Convert Triton types to numpy types.
        self.output_response_dtype = pb_utils.triton_string_to_numpy(output_response_config['data_type'])
        self.output_history_dtype = pb_utils.triton_string_to_numpy(output_history_config['data_type'])

        ChatGLM_path = os.path.dirname(os.path.abspath(__file__)) + "/chatglm3-6b-32k"
        self.tokenizer = AutoTokenizer.from_pretrained(ChatGLM_path, trust_remote_code=True)
        # Note: the .to('cuda:' + args['model_instance_device_id']) below is essential. It pins
        # this instance to its assigned GPU; without it the model either spreads across all GPUs
        # or piles onto a single one, and both cause problems.
        model = AutoModelForCausalLM.from_pretrained(ChatGLM_path,
                                                     torch_dtype=torch.float16,
                                                     trust_remote_code=True).half().to('cuda:' + args['model_instance_device_id'])
        self.model = model.eval()
        logging.info("model init success")

    def execute(self, requests):
        """`execute` MUST be implemented in every Python model. `execute`
        function receives a list of pb_utils.InferenceRequest as the only
        argument. This function is called when an inference request is made
        for this model. Depending on the batching configuration (e.g. Dynamic
        Batching) used, `requests` may contain multiple requests. Every
        Python model must create one pb_utils.InferenceResponse for every
        pb_utils.InferenceRequest in `requests`. If there is an error, you can
        set the error argument when creating a pb_utils.InferenceResponse.

        Parameters
        ----------
        requests : list
          A list of pb_utils.InferenceRequest

        Returns
        -------
        list
          A list of pb_utils.InferenceResponse. The length of this list must
          be the same as `requests`
        """
        output_response_dtype = self.output_response_dtype
        output_history_dtype = self.output_history_dtype
        # output_dtype = self.output_dtype

        responses = []
        # Every Python backend must iterate over every one of the requests
        # and create a pb_utils.InferenceResponse for each of them.
        for request in requests:
            prompt = pb_utils.get_input_tensor_by_name(request, "prompt").as_numpy()[0]
            prompt = prompt.decode('utf-8')
            history_origin = pb_utils.get_input_tensor_by_name(request, "history").as_numpy()
            if len(history_origin) > 0:
                history = np.array([item.decode('utf-8') for item in history_origin]).reshape((-1, 2)).tolist()
            else:
                history = []
            temperature = pb_utils.get_input_tensor_by_name(request, "temperature").as_numpy()[0]
            temperature = float(temperature.decode('utf-8'))
            max_token = pb_utils.get_input_tensor_by_name(request, "max_token").as_numpy()[0]
            max_token = int(max_token.decode('utf-8'))
            history_len = pb_utils.get_input_tensor_by_name(request, "history_len").as_numpy()[0]
            history_len = int(history_len.decode('utf-8'))

            # Log the incoming request parameters.
            in_log_info = {
                "in_prompt": prompt,
                "in_history": history,
                "in_temperature": temperature,
                "in_max_token": max_token,
                "in_history_len": history_len
            }
            logging.info(in_log_info)

            response, history = self.model.chat(self.tokenizer,
                                                prompt,
                                                history=history[-history_len:] if history_len > 0 else [],
                                                max_length=max_token,
                                                temperature=temperature)

            # Log the generated output.
            out_log_info = {
                "out_response": response,
                "out_history": history
            }
            logging.info(out_log_info)

            response = np.array(response)
            history = np.array(history)

            response_output_tensor = pb_utils.Tensor("response", response.astype(self.output_response_dtype))
            history_output_tensor = pb_utils.Tensor("history", history.astype(self.output_history_dtype))

            final_inference_response = pb_utils.InferenceResponse(
                output_tensors=[response_output_tensor, history_output_tensor])
            responses.append(final_inference_response)
            # Create InferenceResponse. You can set an error here in case
            # there was a problem with handling this inference request.
            # Below is an example of how you can set errors in inference
            # response:
            #
            # pb_utils.InferenceResponse(
            #     output_tensors=..., TritonError("An error occurred"))

        # You should return a list of pb_utils.InferenceResponse. Length
        # of this list must match the length of `requests` list.
        return responses

    def finalize(self):
        """`finalize` is called only once when the model is being unloaded.
        Implementing `finalize` function is OPTIONAL. This function allows
        the model to perform any necessary clean ups before exit.
        """
        print('Cleaning up...')
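
# --- Illustration (not part of model.py): layout of the "history" input. ---
# The client sends history as a flat TYPE_STRING tensor of alternating user/assistant
# turns; reshape((-1, 2)) in the code above groups it back into (query, answer) pairs.
# The example strings below are placeholders; only the tensor name matches the code.
import numpy as np

history_origin = np.array([b"Hi", b"Hello! How can I help?", b"Tell me a joke", b"Sure..."], dtype=object)
history = np.array([item.decode('utf-8') for item in history_origin]).reshape((-1, 2)).tolist()
print(history)  # [['Hi', 'Hello! How can I help?'], ['Tell me a joke', 'Sure...']]
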
import os
import json

# triton_python_backend_utils is available in every Triton Python model. You
# need to use this module to create inference requests and responses. It also
# contains some utility functions for extracting information from model_config
# and converting Triton input/output types to numpy types.
import triton_python_backend_utils as pb_utils
import sys
import gc
import time
import logging
import torch
from transformers import AutoTokenizer, AutoModel, AutoModelForCausalLM
import numpy as np
# Use vLLM for accelerated inference.
from vllm import LLM, SamplingParams

# Force a garbage-collection pass to release as much memory as possible up front
# and reduce the peak memory footprint.
gc.collect()
# This deployment only runs on GPU 4, so emptying the CUDA cache here is unnecessary.
# torch.cuda.empty_cache()

logging.basicConfig(format='%(asctime)s - %(filename)s[line:%(lineno)d] - %(levelname)s: %(message)s',
                    level=logging.INFO)


class TritonPythonModel:
    """Your Python model must use the same class name. Every Python model
    that is created must have "TritonPythonModel" as the class name.
    """

    def initialize(self, args):
        """`initialize` is called only once when the model is being loaded.
        Implementing `initialize` function is optional. This function allows
        the model to initialize any state associated with this model.

        Parameters
        ----------
        args : dict
          Both keys and values are strings. The dictionary keys and values are:
          * model_config: A JSON string containing the model configuration
          * model_instance_kind: A string containing model instance kind
          * model_instance_device_id: A string containing model instance device ID
          * model_repository: Model repository path
          * model_version: Model version
          * model_name: Model name
        """
        # You must parse model_config. JSON string is not parsed here.
        self.model_config = json.loads(args['model_config'])

        output_response_config = pb_utils.get_output_config_by_name(self.model_config, "response")
        output_history_config = pb_utils.get_output_config_by_name(self.model_config, "history")

        # Convert Triton types to numpy types.
        self.output_response_dtype = pb_utils.triton_string_to_numpy(output_response_config['data_type'])
        self.output_history_dtype = pb_utils.triton_string_to_numpy(output_history_config['data_type'])

        # Load the model itself.
        ChatGLM_path = os.path.dirname(os.path.abspath(__file__)) + "/glm-4-9b-chat"
        # self.tokenizer = AutoTokenizer.from_pretrained(ChatGLM_path, trust_remote_code=True)
        # With the transformers backend, .to('cuda:' + args['model_instance_device_id']) pinned the
        # instance to its assigned GPU; without it the model would spread across all GPUs or pile
        # onto a single one.
        # vLLM initialization.
        max_model_len, tp_size = 20400, 1
        self.model = LLM(model=ChatGLM_path,
                         tokenizer=ChatGLM_path,
                         tensor_parallel_size=tp_size,
                         dtype='float16',
                         max_model_len=max_model_len,
                         enforce_eager=True,
                         trust_remote_code=True)
        # The instance still needs to run on the designated GPU.
        # self.model = model.eval()
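        # Note (assumption): the LLM() constructor takes no per-instance GPU index the way
        # .to('cuda:N') did, so pinning this instance to one GPU is typically done by setting
        # CUDA_VISIBLE_DEVICES (e.g. from args['model_instance_device_id']) before the LLM
        # object above is constructed.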
        logging.info("model init success")

    def execute(self, requests):
        """`execute` MUST be implemented in every Python model. `execute`
        function receives a list of pb_utils.InferenceRequest as the only
        argument. This function is called when an inference request is made
        for this model. Depending on the batching configuration (e.g. Dynamic
        Batching) used, `requests` may contain multiple requests. Every
        Python model must create one pb_utils.InferenceResponse for every
        pb_utils.InferenceRequest in `requests`. If there is an error, you can
        set the error argument when creating a pb_utils.InferenceResponse.

        Parameters
        ----------
        requests : list
          A list of pb_utils.InferenceRequest

        Returns
        -------
        list
          A list of pb_utils.InferenceResponse. The length of this list must
          be the same as `requests`
        """
        output_response_dtype = self.output_response_dtype
        output_history_dtype = self.output_history_dtype
        # output_dtype = self.output_dtype

        responses = []
        # Every Python backend must iterate over every one of the requests
        # and create a pb_utils.InferenceResponse for each of them.
        for request in requests:
            prompt = pb_utils.get_input_tensor_by_name(request, "prompt").as_numpy()[0]
            prompt = prompt.decode('utf-8')
            history_origin = pb_utils.get_input_tensor_by_name(request, "history").as_numpy()
            if len(history_origin) > 0:
                history = np.array([item.decode('utf-8') for item in history_origin]).reshape((-1, 2)).tolist()
            else:
                history = []
            temperature = pb_utils.get_input_tensor_by_name(request, "temperature").as_numpy()[0]
            temperature = float(temperature.decode('utf-8'))
            max_token = pb_utils.get_input_tensor_by_name(request, "max_token").as_numpy()[0]
            max_token = int(max_token.decode('utf-8'))
            history_len = pb_utils.get_input_tensor_by_name(request, "history_len").as_numpy()[0]
            history_len = int(history_len.decode('utf-8'))

            # Log the incoming request parameters.
            in_log_info = {
                "in_prompt": prompt,
                "in_history": history,
                "in_temperature": temperature,
                "in_max_token": max_token,
                "in_history_len": history_len
            }
            logging.info(in_log_info)
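
            # --- Generation step (sketch, assumption): the original listing jumps from logging the
            # inputs straight to logging `response`, so the vLLM call is reconstructed here along
            # the lines of the GLM-4 vLLM chat example: build a messages list from the history
            # window, render it with the model's chat template, and generate with SamplingParams.
            # The stop token ids below come from the GLM-4 example and should be verified against
            # the tokenizer config of the deployed checkpoint.
            messages = []
            for user_turn, assistant_turn in (history[-history_len:] if history_len > 0 else []):
                messages.append({"role": "user", "content": user_turn})
                messages.append({"role": "assistant", "content": assistant_turn})
            messages.append({"role": "user", "content": prompt})

            chat_prompt = self.model.get_tokenizer().apply_chat_template(
                messages, tokenize=False, add_generation_prompt=True)
            sampling_params = SamplingParams(temperature=temperature,
                                             max_tokens=max_token,
                                             stop_token_ids=[151329, 151336, 151338])
            outputs = self.model.generate([chat_prompt], sampling_params)
            response = outputs[0].outputs[0].text
            # Keep the conversation history updated for logging, mirroring the old model.chat() behaviour.
            history = history + [[prompt, response]]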

            # Log the generated output.
            out_log_info = {
                "out_response": response,
                "out_history": history
            }
            logging.info(out_log_info)

            response = np.array(response)
            # history = np.array(history)

            response_output_tensor = pb_utils.Tensor("response", response.astype(self.output_response_dtype))
            # history_output_tensor = pb_utils.Tensor("history", history.astype(self.output_history_dtype))

            # final_inference_response = pb_utils.InferenceResponse(
            #     output_tensors=[response_output_tensor, history_output_tensor])
            final_inference_response = pb_utils.InferenceResponse(output_tensors=[response_output_tensor])
            responses.append(final_inference_response)
            # Create InferenceResponse. You can set an error here in case
            # there was a problem with handling this inference request.
            # Below is an example of how you can set errors in inference
            # response:
            #
            # pb_utils.InferenceResponse(
            #     output_tensors=..., TritonError("An error occurred"))

        # You should return a list of pb_utils.InferenceResponse. Length
        # of this list must match the length of `requests` list.
        return responses

    def finalize(self):
        """`finalize` is called only once when the model is being unloaded.
        Implementing `finalize` function is OPTIONAL. This function allows
        the model to perform any necessary clean ups before exit.
        """
        print('Cleaning up...')
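
# --- Client-side sketch (not part of model.py; names are assumptions): querying this model over
# Triton's HTTP endpoint with tritonclient. All five inputs are sent as 1-D TYPE_STRING (BYTES)
# tensors, matching the decode('utf-8') handling above, and only the "response" output is
# requested since the "history" output was dropped. The model name "chatglm" and the URL are
# placeholders; shapes assume the config declares the inputs without an extra batch dimension.
import numpy as np
import tritonclient.http as httpclient

client = httpclient.InferenceServerClient(url="localhost:8000")

def make_input(name, values):
    # Wrap a list of strings as a 1-D BYTES input tensor.
    data = np.array([v.encode('utf-8') for v in values], dtype=object)
    tensor = httpclient.InferInput(name, list(data.shape), "BYTES")
    tensor.set_data_from_numpy(data)
    return tensor

inputs = [
    make_input("prompt", ["What can you do?"]),
    make_input("history", ["Hi", "Hello! How can I help?"]),  # flat alternating user/assistant turns
    make_input("temperature", ["0.8"]),
    make_input("max_token", ["1024"]),
    make_input("history_len", ["5"]),
]
outputs = [httpclient.InferRequestedOutput("response")]

result = client.infer("chatglm", inputs=inputs, outputs=outputs)
print(result.as_numpy("response"))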