测试
性能测试API暂不稳定。需要收集更多意见。
此部分的API在0.3.1b1版本有了破坏性变更;
torchpipe.utils.test 集成了多客户端/多线程性能测试和并发推理工具。 通常情况下,我们推荐功能更强大的locust,然而,如果你只需要一个简单的脚本,去方便的测本地函数和远程函数调用,并给出测试报告,或者并发的调用函数并处理结果,可以用此工具。
locust | torchpipe.utils.test |
---|---|
基于事件驱动,可以支持更高的并发数, 可分布式 | 基于本地线程,并发数有限,不支持分布式 |
拥有web端模式 | 无 |
支持扩展 | 支持扩展 |
locust -f target.py | python函数 |
测试
torchpipe.utils.test.test_from_raw_file
def test_from_raw_file(forward_function:
Callable[[List[tuple[str, bytes]]]] |
List[Callable[[List[tuple[str, bytes]]]]],
file_dir: str,
num_clients=10,
batch_size=1,
total_number=10000,
num_preload=1000,
recursive=True,
ext=[".jpg", '.JPG', '.jpeg', '.JPEG'])
读取给定文件路径下的指定后缀名的文件,并发送入用户给定的
forward_function
中。
- forward_function - 前向函数, 函数参数为
List[Tuple[str, bytes]]
,代表batchsize[(id,文件数据)]。测试程序将反复执行此函数,并记录此函数的延迟相关数据。 也可以为多个此类函数(List[Callable]
), 此时长度需要为num_clients. - file_dir - 文件路径
- num_clients - 客户端数目/并发路数
- batch_size 每次请求的数据batch大小, 代表执行一次
forward_function
所消耗的测试数据量 - total_number 总共的数据量。仅当
num_preload>0
时有效。 - num_preload 预读取文件数目。当
num_preload<=0
时,将遍历所有文件;当num_preload>0
时,将预先读取这个数量的数据,其他数据将被舍弃。当0<num_preload<total_number
时,会循环送入数据。 - recursive 是否递归读取目录下的文件
- ext - 文件后缀
示例
from typing import List,Tuple
import torchpipe as tp
from torchpipe.utils.test import test_from_raw_file
model = tp.pipe({"backend":"DecodeMat","instance_num":10})
def forward_function(files : List[Tuple[str,bytes]]):
inputs = []
for file_path, file_bytes in files:
input = {"data": file_bytes}
inputs.append(input)
model(inputs)
for input in inputs:
result = input["result"]
# 如果需要保存结果,可将(img_path, result)保存在全局Queue中
## 预读取1000张图片测试速度
test_from_raw_file(forward_function, os.path.join("assets/norm_jpg/"),
num_clients=10, batch_size=1,total_number=10000)
## 遍历全部数据
test_from_raw_file(forward_function, os.path.join("assets/norm_jpg/"),
num_clients=10, batch_size=1,num_preload=0)
Result
Project | Value |
---|---|
tool's version | 20230421.0 |
num_clients | 10 |
total_number | 10000 |
throughput::qps | 1642.34 |
throughput::avg | 6.09(ms) |
latency::TP50 | 5.17 |
latency::TP90 | 7.56 |
latency::TP99 | 21.88 |
latency::avg | 6.08 |
-50,-40,-20,-10,-1 | 24.82,25.84,29.37,30.6,50.34 |
torchpipe.utils.test.throughput
从0.3.2rc2版本开始生效
def throughput(args_dict_or_str, num_clients=10, total_number=10000)
给定模型参数,测试该模型在torchpipe的吞吐,torchpipe的默认配置max=4,instance=2,模型默认采用tensorrt加速。
- args_dict_or_str - 测试模型参数,支持dict和str两种类型,dict类型参数请参考TensorrtTensor算子参数进行设置,必须包含关键key:
model
。 str类型参数为模型的路径,支持.onnx和.trt两种类型。dict参数的好处在于可以控制更多的参数,比如precision
,max_batch_size
等。str则均采用默认值。 - num_clients - 客户端数目/并发路数,默认为10。
- total_number 总共的数据量,默认为10000。
示例代码
import torchpipe
import torch
import os
## test throughput
onnx_path = os.path.join("/tmp", f"resnet50.onnx")
precision = "fp16"
dict_args = {
'model' : onnx_path,
'precision' : precision,
}
# type1: 采用dict方式
result = torchpipe.utils.test.throughput(dict_args, num_clients=5, total_number=5000)
# type2: 采用str方式
result = torchpipe.utils.test.throughput(onnx_path, num_clients=5, total_number=5000)
torchpipe.utils.test.test_function
def test_function(forward_function:Union[Callable,List[Callable]],
num_clients=10,
batch_size=1,
total_number=10000)
重复执行用户给定的
forward_function
函数。
- forward_function - 前向函数。测试程序将反复执行此函数,并记录此函数的延迟相关数据。 也可以为List
[Function]
, 此时其长度需要为num_clients. - num_clients - 客户端数目/并发路数
- batch_size 每次请求的数据batch大小, 代表执行一次
forward_function
所消耗的测试数据量 - total_number 总共的数据量。
示例
import torchpipe as tp
from torchpipe.utils.test import test_function
model = tp.pipe({"backend":"DecodeMat","instance_num":10})
file_path = "assets/norm_jpg/dog.jpg"
with open(file_path, 'rb') as f:
file_bytes=f.read()
def run():
input = {"data": file_bytes}
model(input)
assert(input["result"].shape==(576, 768, 3))
torchpipe.utils.test.test_function(run, num_clients=5, batch_size=1,total_number=1000)
torchpipe.utils.test.test
def torchpipe.utils.test.test(sample :Union[Sampler, List[Sampler]],
total_number=10000):
模拟多个客户端执行
sample
指定的计算并统计计算结果。
- sample - 客户端数据生成器,它的个数就代表客户端的数量,需要定义为以下类的子类:
class torchpipe.utils.test.Sampler:
def __init__(self):
pass
def __call__(self, start_index: int) -> None:
raise NotImplementedError
def batchsize(self):
return 1
- total_number - 将计算的总数据量。系统每执行一次
Sampler().__call__(start_index)
, 数据量为Sampler().batchsize()
。调用时满足0 <= start_index <= total_number - Sampler.batchsize()
.
torchpipe.utils.test.RandomSampler
从传入的数据(List)中随机选取batch_size的数据。需要实现其
forward
函数作为真正的计算部分。
class torchpipe.utils.test.RandomSampler(Sampler):
def __init__(self, data_source: List, batch_size=1):
super().__init__()
self.data_source = data_source
self.batch_size = batch_size
assert(0 < len(self.data_source))
for i in range(batch_size):
if len(self.data_source) < batch_size:
self.data_source.append(self.data_source[i])
def __call__(self, start_index: int):
data = random.sample(self.data_source, self.batch_size)
self.forward(data)
def forward(self, data: List):
raise RuntimeError("Requires users to implement this function")
def batchsize(self):
return self.batch_size
示例
def test_function_from_preload(forward_function,
file_dir:str,
num_clients=10,
batch_size=1,
total_number=10000,
recursive=True,
ext=[".jpg", '.JPG', '.jpeg', '.JPEG']):
from torchpipe.utils.test.Sampler import RandomSampler, preload
data = preload(file_dir=file_dir, recursive=recursive, max_number=1000, ext=ext)
assert(len(data) > 0)
forwards = [RandomSampler(data, batch_size) for i in range(num_clients)]
for m in forwards:
m.forward = forward_function
torchpipe.utils.test.test(forwards, total_number)
torchpipe.utils.test.LoopSampler
对传入的数据(List)全量按顺序的送入计算。与SequentialSampler不同的是,允许
start_index > len(self.data) - self.batch_size
, 也就是允许total_number > len(self.data)
.
class LoopSampler(Sampler):
def __init__(self, data: List, batch_size=1):
super().__init__()
self.data = data
self.batch_size = batch_size
assert(len(data) >= batch_size)
self.length = len(data)
for i in range(batch_size):
self.data.append(data[i])
def __call__(self, start_index: int) -> None:
start_index = start_index%(self.length)
data = self.data[start_index: start_index+self.batch_size]
self.forward(data)
def batchsize(self):
return self.batch_size
def forward(self, data: List):
raise RuntimeError("Requires users to implement this function")
例子
示例:遍历目录下的文件
以下演示了如何将文件夹下的所有目标文件遍历一遍并获取结果。
import torchpipe
class FileSampler(torchpipe.utils.test.LoopSampler):
def __init__(self, data: List, batch_size=1):
super.__init__(data, batch_size)
self.local_result = {}
def forward(self, data: List):
raw_bytes = []
for file_path in data:
with open(file_path, "rb") as f:
raw_bytes.append((file_path, f.read()))
self.handle_data(raw_bytes)
def handle_data(self, raw_bytes):
raise RuntimeError("Requires users to implement this function")
def test_all_files(file_dir:str, num_clients=10, batch_size = 1,
ext=[".jpg", '.JPG', '.jpeg', '.JPEG']):
files = [x for x in os.listdir(file_dir) if os.path.splitext(x)[-1] in ext]
files = [os.path.join(file_dir, x) for x in files]
forwards = [FileSampler(files, i, batch_size) for i in range(num_clients)]
torchpipe.utils.test.test(forwards, len(files))
用户可以把结果临时保存在self.local_result
中,最终进行汇总,也可以在运行时实时校验数据。
示例:thrift
首先,根据实际使用的thrift协议定义其数据前向函数:
import torchpipe
class ThriftSampler(torchpipe.utils.test.FileSampler):
def __init__(self, data: List, host, port,batch_size=1, ):
super.__init__(data, batch_size)
self.local_result = {}
from serve import InferenceService
from serve.ttypes import InferenceParams
self.InferenceParams = InferenceParams
from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol
self.transport = TSocket.TSocket(host, port)
self.transport = TTransport.TBufferedTransport(self.transport)
self.protocol = TBinaryProtocol.TBinaryProtocol(self.transport)
self.client = InferenceService.Client(self.protocol)
# Connect!
self.transport.open()
self.client.ping()
def handle_data(self, raw_bytes: List[Tuple[str, bytes]]):
return self.client.infer_batch([self.InferenceParams(*x) for x in raw_bytes])
def __del__(self):
self.transport.close()
如果进行速度测试,可以预读取图片:
# for test speed
def test_function_from_preload():
files = [x for x in os.listdir(file_dir) if os.path.splitext(x)[-1] in ext]
files = [os.path.join(file_dir, x) for x in files]
forwards = [ThriftSampler(files, i, batch_size) for i in range(num_clients)]
forward_methods = [x.handle_data for x in forwards]
torchpipe.uitls.test.test_from_raw_file(forward_methods, os.path.join("assets/norm_jpg/"),
num_clients=10, batch_size=1,total_number=10000)
否则,可以考虑全量数据测试:
def test_all_files(file_dir:str, num_clients=10, batch_size = 1,
ext=[".jpg", '.JPG', '.jpeg', '.JPEG']):
files = [x for x in os.listdir(file_dir) if os.path.splitext(x)[-1] in ext]
files = [os.path.join(file_dir, x) for x in files]
forwards = [ThriftSampler(files, i, batch_size) for i in range(num_clients)]
torchpipe.utils.test.test(forwards, len(files))
不同batchsize的客户端
在此处例子中,我们使用十个客户端,客户端每次请求的数据量分别为1,2,3,...10。我们验证了在这种情况下结果的一致性。 通常来讲,用户可遍历一个目录中的所有数据,并多次重复发送,以验证结果的稳定性和一致性。
如果模型本身batchsize>1,tensorrt推理结果可能会有少量差异(输入数据的batchsize不同时,模型选取的优化方法可能不同);如果batchsize==1,这种差异不应该存在,然而重新生成模型后,推理结果仍然有可能变化。