Skip to main content

测试

验证阶段

性能测试API暂不稳定。需要收集更多意见。

此部分的API在0.3.1b1版本有了破坏性变更;

torchpipe.utils.test 集成了多客户端/多线程性能测试和并发推理工具。 通常情况下,我们推荐功能更强大的locust,然而,如果你只需要一个简单的脚本,去方便的测本地函数和远程函数调用,并给出测试报告,或者并发的调用函数并处理结果,可以用此工具。

locusttorchpipe.utils.test
基于事件驱动,可以支持更高的并发数, 可分布式基于本地线程,并发数有限,不支持分布式
拥有web端模式
支持扩展支持扩展
locust -f target.pypython函数

测试

torchpipe.utils.test.test_from_raw_file

def test_from_raw_file(forward_function: 
Callable[[List[tuple[str, bytes]]]] |
List[Callable[[List[tuple[str, bytes]]]]],
file_dir: str,
num_clients=10,
batch_size=1,
total_number=10000,
num_preload=1000,
recursive=True,
ext=[".jpg", '.JPG', '.jpeg', '.JPEG'])

读取给定文件路径下的指定后缀名的文件,并发送入用户给定的forward_function中。

参数
  • forward_function - 前向函数, 函数参数为List[Tuple[str, bytes]],代表batchsize[(id,文件数据)]。测试程序将反复执行此函数,并记录此函数的延迟相关数据。 也可以为多个此类函数(List[Callable]), 此时长度需要为num_clients.
  • file_dir - 文件路径
  • num_clients - 客户端数目/并发路数
  • batch_size 每次请求的数据batch大小, 代表执行一次forward_function所消耗的测试数据量
  • total_number 总共的数据量。仅当num_preload>0时有效。
  • num_preload 预读取文件数目。当num_preload<=0时,将遍历所有文件;当num_preload>0时,将预先读取这个数量的数据,其他数据将被舍弃。当0<num_preload<total_number时,会循环送入数据。
  • recursive 是否递归读取目录下的文件
  • ext - 文件后缀
示例
from typing import List,Tuple
import torchpipe as tp
from torchpipe.utils.test import test_from_raw_file

model = tp.pipe({"backend":"DecodeMat","instance_num":10})

def forward_function(files : List[Tuple[str,bytes]]):
inputs = []
for file_path, file_bytes in files:
input = {"data": file_bytes}
inputs.append(input)
model(inputs)

for input in inputs:
result = input["result"]
# 如果需要保存结果,可将(img_path, result)保存在全局Queue中

## 预读取1000张图片测试速度
test_from_raw_file(forward_function, os.path.join("assets/norm_jpg/"),
num_clients=10, batch_size=1,total_number=10000)
## 遍历全部数据
test_from_raw_file(forward_function, os.path.join("assets/norm_jpg/"),
num_clients=10, batch_size=1,num_preload=0)
Result
ProjectValue
tool's version20230421.0
num_clients10
total_number10000
throughput::qps1642.34
throughput::avg6.09(ms)
latency::TP505.17
latency::TP907.56
latency::TP9921.88
latency::avg6.08
-50,-40,-20,-10,-124.82,25.84,29.37,30.6,50.34

torchpipe.utils.test.throughput

从0.3.2rc2版本开始生效

def throughput(args_dict_or_str, num_clients=10, total_number=10000)

给定模型参数,测试该模型在torchpipe的吞吐,torchpipe的默认配置max=4,instance=2,模型默认采用tensorrt加速。

参数
  • args_dict_or_str - 测试模型参数,支持dict和str两种类型,dict类型参数请参考TensorrtTensor算子参数进行设置,必须包含关键key:model。 str类型参数为模型的路径,支持.onnx和.trt两种类型。dict参数的好处在于可以控制更多的参数,比如precisionmax_batch_size等。str则均采用默认值。
  • num_clients - 客户端数目/并发路数,默认为10。
  • total_number 总共的数据量,默认为10000。
示例代码
import torchpipe
import torch
import os

## test throughput
onnx_path = os.path.join("/tmp", f"resnet50.onnx")
precision = "fp16"
dict_args = {
'model' : onnx_path,
'precision' : precision,
}

# type1: 采用dict方式
result = torchpipe.utils.test.throughput(dict_args, num_clients=5, total_number=5000)

# type2: 采用str方式
result = torchpipe.utils.test.throughput(onnx_path, num_clients=5, total_number=5000)

torchpipe.utils.test.test_function

def test_function(forward_function:Union[Callable,List[Callable]], 
num_clients=10,
batch_size=1,
total_number=10000)

重复执行用户给定的forward_function函数。

参数
  • forward_function - 前向函数。测试程序将反复执行此函数,并记录此函数的延迟相关数据。 也可以为List[Function], 此时其长度需要为num_clients.
  • num_clients - 客户端数目/并发路数
  • batch_size 每次请求的数据batch大小, 代表执行一次forward_function所消耗的测试数据量
  • total_number 总共的数据量。
示例
import torchpipe as tp
from torchpipe.utils.test import test_function

model = tp.pipe({"backend":"DecodeMat","instance_num":10})

file_path = "assets/norm_jpg/dog.jpg"
with open(file_path, 'rb') as f:
file_bytes=f.read()

def run():
input = {"data": file_bytes}
model(input)
assert(input["result"].shape==(576, 768, 3))

torchpipe.utils.test.test_function(run, num_clients=5, batch_size=1,total_number=1000)

torchpipe.utils.test.test

def torchpipe.utils.test.test(sample :Union[Sampler, List[Sampler]],
total_number=10000):

模拟多个客户端执行sample指定的计算并统计计算结果。

参数
  • sample - 客户端数据生成器,它的个数就代表客户端的数量,需要定义为以下类的子类:
class torchpipe.utils.test.Sampler:
def __init__(self):
pass
def __call__(self, start_index: int) -> None:
raise NotImplementedError

def batchsize(self):
return 1
  • total_number - 将计算的总数据量。系统每执行一次Sampler().__call__(start_index), 数据量为Sampler().batchsize()。调用时满足0 <= start_index <= total_number - Sampler.batchsize().

torchpipe.utils.test.RandomSampler

从传入的数据(List)中随机选取batch_size的数据。需要实现其forward函数作为真正的计算部分。

class torchpipe.utils.test.RandomSampler(Sampler):
def __init__(self, data_source: List, batch_size=1):
super().__init__()
self.data_source = data_source
self.batch_size = batch_size

assert(0 < len(self.data_source))
for i in range(batch_size):
if len(self.data_source) < batch_size:
self.data_source.append(self.data_source[i])


def __call__(self, start_index: int):
data = random.sample(self.data_source, self.batch_size)
self.forward(data)

def forward(self, data: List):
raise RuntimeError("Requires users to implement this function")

def batchsize(self):
return self.batch_size
示例
def test_function_from_preload(forward_function, 
file_dir:str,
num_clients=10,
batch_size=1,
total_number=10000,
recursive=True,
ext=[".jpg", '.JPG', '.jpeg', '.JPEG']):
from torchpipe.utils.test.Sampler import RandomSampler, preload
data = preload(file_dir=file_dir, recursive=recursive, max_number=1000, ext=ext)
assert(len(data) > 0)

forwards = [RandomSampler(data, batch_size) for i in range(num_clients)]
for m in forwards:
m.forward = forward_function
torchpipe.utils.test.test(forwards, total_number)

torchpipe.utils.test.LoopSampler

对传入的数据(List)全量按顺序的送入计算。与SequentialSampler不同的是,允许start_index > len(self.data) - self.batch_size, 也就是允许total_number > len(self.data).


class LoopSampler(Sampler):
def __init__(self, data: List, batch_size=1):
super().__init__()
self.data = data
self.batch_size = batch_size
assert(len(data) >= batch_size)
self.length = len(data)
for i in range(batch_size):
self.data.append(data[i])

def __call__(self, start_index: int) -> None:
start_index = start_index%(self.length)
data = self.data[start_index: start_index+self.batch_size]
self.forward(data)

def batchsize(self):
return self.batch_size

def forward(self, data: List):
raise RuntimeError("Requires users to implement this function")

例子

示例:遍历目录下的文件

以下演示了如何将文件夹下的所有目标文件遍历一遍并获取结果。

import torchpipe
class FileSampler(torchpipe.utils.test.LoopSampler):
def __init__(self, data: List, batch_size=1):
super.__init__(data, batch_size)
self.local_result = {}

def forward(self, data: List):
raw_bytes = []
for file_path in data:
with open(file_path, "rb") as f:
raw_bytes.append((file_path, f.read()))
self.handle_data(raw_bytes)

def handle_data(self, raw_bytes):
raise RuntimeError("Requires users to implement this function")

def test_all_files(file_dir:str, num_clients=10, batch_size = 1,
ext=[".jpg", '.JPG', '.jpeg', '.JPEG']):

files = [x for x in os.listdir(file_dir) if os.path.splitext(x)[-1] in ext]
files = [os.path.join(file_dir, x) for x in files]



forwards = [FileSampler(files, i, batch_size) for i in range(num_clients)]

torchpipe.utils.test.test(forwards, len(files))

用户可以把结果临时保存在self.local_result中,最终进行汇总,也可以在运行时实时校验数据。

示例:thrift

首先,根据实际使用的thrift协议定义其数据前向函数:

import torchpipe
class ThriftSampler(torchpipe.utils.test.FileSampler):
def __init__(self, data: List, host, port,batch_size=1, ):
super.__init__(data, batch_size)
self.local_result = {}

from serve import InferenceService
from serve.ttypes import InferenceParams
self.InferenceParams = InferenceParams

from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol

self.transport = TSocket.TSocket(host, port)
self.transport = TTransport.TBufferedTransport(self.transport)
self.protocol = TBinaryProtocol.TBinaryProtocol(self.transport)

self.client = InferenceService.Client(self.protocol)

# Connect!
self.transport.open()
self.client.ping()

def handle_data(self, raw_bytes: List[Tuple[str, bytes]]):
return self.client.infer_batch([self.InferenceParams(*x) for x in raw_bytes])

def __del__(self):
self.transport.close()

如果进行速度测试,可以预读取图片:

# for test speed
def test_function_from_preload():
files = [x for x in os.listdir(file_dir) if os.path.splitext(x)[-1] in ext]
files = [os.path.join(file_dir, x) for x in files]

forwards = [ThriftSampler(files, i, batch_size) for i in range(num_clients)]
forward_methods = [x.handle_data for x in forwards]

torchpipe.uitls.test.test_from_raw_file(forward_methods, os.path.join("assets/norm_jpg/"),
num_clients=10, batch_size=1,total_number=10000)

否则,可以考虑全量数据测试:

def test_all_files(file_dir:str, num_clients=10, batch_size = 1,
ext=[".jpg", '.JPG', '.jpeg', '.JPEG']):

files = [x for x in os.listdir(file_dir) if os.path.splitext(x)[-1] in ext]
files = [os.path.join(file_dir, x) for x in files]

forwards = [ThriftSampler(files, i, batch_size) for i in range(num_clients)]

torchpipe.utils.test.test(forwards, len(files))


不同batchsize的客户端

此处例子中,我们使用十个客户端,客户端每次请求的数据量分别为1,2,3,...10。我们验证了在这种情况下结果的一致性。 通常来讲,用户可遍历一个目录中的所有数据,并多次重复发送,以验证结果的稳定性和一致性。

tensorrt模型推理结果不一致

如果模型本身batchsize>1,tensorrt推理结果可能会有少量差异(输入数据的batchsize不同时,模型选取的优化方法可能不同);如果batchsize==1,这种差异不应该存在,然而重新生成模型后,推理结果仍然有可能变化。