The performance testing API is currently unstable. We need to collect more feedback. This particular API underwent breaking changes in version 0.3.1b1.
torchpipe.utils.test is a performance testing and concurrent inference tool with built-in multi-client (multi-threaded) support. In general, we recommend the more powerful Locust for performance testing. However, if you only need a simple script to conveniently test local and remote function calls, generate test reports, or perform concurrent function invocations and result processing, this tool is sufficient.
locust | torchpipe.utils.test |
Event-driven, supports higher concurrency, distributed | Based on local threads, limited concurrency, does not support distributed |
Has a web interface | None |
Supports extensions | Supports extensions |
locust -f target.py | Python functions |
def test_from_raw_file(forward_function:
                           Callable[[List[tuple[str, bytes]]]] |
                           List[Callable[[List[tuple[str, bytes]]]]],
                       file_dir: str,
                       ext=[".jpg", '.JPG', '.jpeg', '.JPEG'])
Read the files with the given extensions from the specified directory and pass them to the user-specified forward_function.
- forward_function - Forward function that takes one parameter of type List[Tuple[str, bytes]], a batch of batch_size items of the form [(id, file data)]. The testing program repeatedly executes this function and records latency-related data. It can also be a list of such functions (List[Callable]), in which case its length should be equal to num_clients.
- file_dir - File path
- num_clients - Number of clients/concurrent channels
- batch_size - Batch size of each request, i.e. the amount of test data consumed by executing forward_function once.
- total_number - Total amount of data. Only valid when num_preload > 0.
- num_preload - Number of preloaded files. When num_preload <= 0, all files are traversed; when num_preload > 0, this many files are preloaded and the remaining files are discarded. When 0 < num_preload < total_number, the preloaded data is fed into the pipeline repeatedly.
- recursive - Whether to recursively read files under the directory
- ext - File extension
import os
from typing import List, Tuple

import torchpipe as tp
from torchpipe.utils.test import test_from_raw_file

model = tp.pipe({"backend": "DecodeMat", "instance_num": 10})

def forward_function(files: List[Tuple[str, bytes]]):
    inputs = []
    for file_path, file_bytes in files:
        input = {"data": file_bytes}
        inputs.append(input)
    # Run the whole batch; each output is read back from the "result" key.
    model(inputs)
    for input in inputs:
        result = input["result"]
        # If you need to save the results, you can save (img_path, result) in a global Queue.

## preload 1000 images to test speed
test_from_raw_file(forward_function, os.path.join("assets/norm_jpg/"),
                   num_clients=10, batch_size=1, total_number=10000)

## Iterate through all the data
test_from_raw_file(forward_function, os.path.join("assets/norm_jpg/"),
                   num_clients=10, batch_size=1, num_preload=0)
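The comment in forward_function suggests saving (img_path, result) pairs in a global Queue. A minimal sketch of that pattern follows. It makes several assumptions: `fake_model` is a hypothetical stand-in for the torchpipe model call so that the snippet runs without torchpipe, `collect` is an illustrative helper, and the plain threads only imitate concurrent clients.

```python
import queue
import threading
from typing import Dict, List, Tuple

# Thread-safe container shared by all clients.
results = queue.Queue()


def fake_model(inputs: List[Dict]) -> None:
    """Hypothetical stand-in for model(inputs): stores a 'result' in each dict."""
    for item in inputs:
        item["result"] = len(item["data"])


def forward_function(files: List[Tuple[str, bytes]]) -> None:
    inputs = [{"data": file_bytes} for _, file_bytes in files]
    fake_model(inputs)
    # Pair every output with the id of the file it came from.
    for (file_path, _), item in zip(files, inputs):
        results.put((file_path, item["result"]))


def collect() -> Dict[str, int]:
    """Drain the queue once all clients have finished."""
    collected = {}
    while not results.empty():
        path, result = results.get()
        collected[path] = result
    return collected


# Imitate four concurrent clients, each sending one single-file request.
threads = [
    threading.Thread(target=forward_function,
                     args=([(f"img_{i}.jpg", b"x" * (i + 1))],))
    for i in range(4)
]
for t in threads:
    t.start()
for t in threads:
    t.join()
collected = collect()
```

Draining the queue only after every client has joined keeps the collection step free of races.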
Project | Value |
tool's version | 20230421.0 |
num_clients | 10 |
total_number | 10000 |
throughput::qps | 1642.34 |
throughput::avg | 6.09(ms) |
latency::TP50 | 5.17 |
latency::TP90 | 7.56 |
latency::TP99 | 21.88 |
latency::avg | 6.08 |
-50,-40,-20,-10,-1 | 24.82,25.84,29.37,30.6,50.34 |
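The rows of the report can be related to raw per-request latencies roughly as sketched below. This is an illustration under assumptions, not the tool's actual implementation: the nearest-rank percentile rule and the `percentile` and `summarize` helpers are hypothetical. The formula for throughput::avg is inferred from the report above, where 10 clients / 1642.34 qps is about 6.09 ms.

```python
from typing import Dict, List


def percentile(sorted_ms: List[float], p: int) -> float:
    """Nearest-rank percentile on an ascending list (assumed rule)."""
    n = len(sorted_ms)
    rank = max(1, -(-p * n // 100))  # integer ceiling of p * n / 100
    return sorted_ms[rank - 1]


def summarize(latencies_ms: List[float], elapsed_s: float,
              num_clients: int) -> Dict[str, float]:
    ordered = sorted(latencies_ms)
    qps = len(ordered) / elapsed_s
    return {
        "throughput::qps": qps,
        # Average time per request as seen by one client, in milliseconds.
        "throughput::avg": 1000.0 * num_clients / qps,
        "latency::TP50": percentile(ordered, 50),
        "latency::TP90": percentile(ordered, 90),
        "latency::TP99": percentile(ordered, 99),
        "latency::avg": sum(ordered) / len(ordered),
    }


latencies = [float(i) for i in range(1, 101)]  # 1.0 ms to 100.0 ms
report = summarize(latencies, elapsed_s=0.5, num_clients=2)
```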
Effective from version 0.3.2rc2 onwards
def throughput(args_dict_or_str, num_clients=10, total_number=10000)
Given the model parameters, test the throughput of the model using torchpipe with the default configuration of max=4 and instance=2. The model will be accelerated using TensorRT by default.
- args_dict_or_str - Model parameters for testing; both dict and str types are supported. For dict parameters, refer to the TensorrtTensor operator parameters for configuration; the dict must include the key model. For str parameters, the value should be the path to the model, in either .onnx or .trt format. A dict allows more control over additional parameters such as precision, whereas a str uses the default values.
- num_clients - Number of clients or concurrent channels, default is 10.
- total_number - Total amount of data, default is 10000.
import torchpipe
import torch
import os

## test throughput
onnx_path = os.path.join("/tmp", "resnet50.onnx")
precision = "fp16"

dict_args = {
    'model': onnx_path,
    'precision': precision,
}

# type1: use dict type
result = torchpipe.utils.test.throughput(dict_args, num_clients=5, total_number=5000)

# type2: use str type
result = torchpipe.utils.test.throughput(onnx_path, num_clients=5, total_number=5000)
def test_function(forward_function:Union[Callable,List[Callable]],
Repeatedly execute the forward_function provided by the user.
- forward_function - Forward function; the testing program repeatedly executes this function and records latency-related data. It can also be a list of functions, in which case its length should be equal to num_clients.
- num_clients - Number of clients/concurrent channels
- batch_size - Batch size of each request, i.e. the amount of test data consumed by executing forward_function once.
- total_number - Total amount of data.
import torchpipe as tp
from torchpipe.utils.test import test_function

model = tp.pipe({"backend": "DecodeMat", "instance_num": 10})

file_path = "assets/norm_jpg/dog.jpg"
with open(file_path, 'rb') as f:
    file_bytes = f.read()

def run():
    input = {"data": file_bytes}
    # Run inference; the output is read back from the "result" key.
    model(input)
    assert(input["result"].shape == (576, 768, 3))

test_function(run, num_clients=5, batch_size=1, total_number=1000)
def torchpipe.utils.test.test(sample :Union[Sampler, List[Sampler]],
Simulate multiple clients executing the computation specified by sample and collect the computation results.
- sample - Client Data Generator, the number of which represents the quantity of clients. It needs to be defined as a subclass of the following classes:
class torchpipe.utils.test.Sampler:
    def __init__(self):
        pass

    def __call__(self, start_index: int) -> None:
        raise NotImplementedError

    def batchsize(self):
        return 1
- total_number - Total amount of data to test. Each time the system executes __call__(start_index), the data size consumed is Sampler().batchsize(). Each call should satisfy 0 <= start_index <= total_number - Sampler.batchsize().
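To illustrate the start_index contract, here is a minimal sketch of how a driver might hand out indices to several clients. It is not torchpipe's implementation: `run_clients`, the shared counter, and `RecordingSampler` are hypothetical, and a local `Sampler` class stands in for torchpipe.utils.test.Sampler so that the snippet runs on its own.

```python
import threading
from typing import List


class Sampler:
    """Local stand-in mirroring the Sampler interface described here."""

    def __call__(self, start_index: int) -> None:
        raise NotImplementedError

    def batchsize(self) -> int:
        return 1


class RecordingSampler(Sampler):
    """Hypothetical client that only records the indices it receives."""

    def __init__(self, batch_size: int = 1):
        self.batch_size = batch_size
        self.seen: List[int] = []

    def __call__(self, start_index: int) -> None:
        self.seen.append(start_index)

    def batchsize(self) -> int:
        return self.batch_size


def run_clients(samplers: List[Sampler], total_number: int) -> None:
    """Run one thread per sampler until total_number items are consumed."""
    lock = threading.Lock()
    next_index = 0

    def worker(sampler: Sampler) -> None:
        nonlocal next_index
        while True:
            with lock:
                start = next_index
                # Stop once a full batch no longer fits, which keeps
                # 0 <= start_index <= total_number - batchsize().
                if start > total_number - sampler.batchsize():
                    return
                next_index += sampler.batchsize()
            sampler(start)

    threads = [threading.Thread(target=worker, args=(s,)) for s in samplers]
    for t in threads:
        t.start()
    for t in threads:
        t.join()


clients = [RecordingSampler(batch_size=2) for _ in range(3)]
run_clients(clients, total_number=10)
all_starts = sorted(i for c in clients for i in c.seen)
```

Which client receives which index depends on thread scheduling, so only the combined set of indices is deterministic.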
To randomly select batch_size items from the given input list, use RandomSampler and implement its forward function as the actual computation part.
class torchpipe.utils.test.RandomSampler(Sampler):
    def __init__(self, data_source: List, batch_size=1):
        self.data_source = data_source
        self.batch_size = batch_size
        assert(0 < len(self.data_source))
        # Pad by repeating items until at least batch_size are available,
        # since random.sample cannot draw more items than the list holds.
        for i in range(batch_size):
            if len(self.data_source) < batch_size:
                self.data_source.append(self.data_source[i])

    def __call__(self, start_index: int):
        data = random.sample(self.data_source, self.batch_size)
        self.forward(data)

    def forward(self, data: List):
        raise RuntimeError("Requires users to implement this function")

    def batchsize(self):
        return self.batch_size
def test_function_from_preload(forward_function, file_dir, num_clients, batch_size,
                               total_number, recursive,
                               ext=[".jpg", '.JPG', '.jpeg', '.JPEG']):
    import torchpipe
    from torchpipe.utils.test.Sampler import RandomSampler, preload

    data = preload(file_dir=file_dir, recursive=recursive, max_number=1000, ext=ext)
    assert(len(data) > 0)

    # One sampler per client, all sharing the same forward function.
    forwards = [RandomSampler(data, batch_size) for i in range(num_clients)]
    for m in forwards:
        m.forward = forward_function

    torchpipe.utils.test.test(forwards, total_number)
The data from the input list is fed into the computation in sequential order. Unlike SequentialSampler, it allows start_index > len(self.data) - self.batch_size, which means that total_number > len(self.data) is allowed.
class LoopSampler(Sampler):
    def __init__(self, data: List, batch_size=1):
        self.data = data
        self.batch_size = batch_size
        assert(len(data) >= batch_size)
        self.length = len(data)
        # Repeat the first batch_size items at the end so that a slice
        # starting near the end of the list still yields a full batch.
        for i in range(batch_size):
            self.data.append(self.data[i])

    def __call__(self, start_index: int) -> None:
        start_index = start_index % (self.length)
        data = self.data[start_index: start_index + self.batch_size]
        self.forward(data)

    def batchsize(self):
        return self.batch_size

    def forward(self, data: List):
        raise RuntimeError("Requires users to implement this function")
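The wrap-around behaviour can be checked with a self-contained sketch. The class below copies the LoopSampler logic into a local `DemoLoopSampler` (a hypothetical name, used so that the snippet runs without torchpipe). It assumes that the list is padded with its first batch_size items, so a slice near the end never comes up short, and it records batches instead of running a model.

```python
from typing import List


class DemoLoopSampler:
    def __init__(self, data: List, batch_size: int = 1):
        assert len(data) >= batch_size
        self.length = len(data)
        self.batch_size = batch_size
        # Padded copy: the first batch_size items are repeated at the end.
        self.data = list(data) + list(data[:batch_size])
        self.batches: List[List] = []

    def __call__(self, start_index: int) -> None:
        # Map any index back into the original range before slicing.
        start_index = start_index % self.length
        self.forward(self.data[start_index:start_index + self.batch_size])

    def forward(self, data: List) -> None:
        # Record each batch instead of running a model.
        self.batches.append(data)


sampler = DemoLoopSampler(["a", "b", "c"], batch_size=2)
for start in range(0, 8, 2):  # total_number = 8 exceeds len(data) = 3
    sampler(start)
```

Every request receives a full batch even though total_number is larger than the list, which is the property that makes repeated feeding possible.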
To iterate through files in a directory:
The following demonstrates how to iterate through all target files in a folder and obtain the results.
import os
from typing import List

import torchpipe

class FileSampler(torchpipe.utils.test.LoopSampler):
    def __init__(self, data: List, batch_size=1):
        super().__init__(data, batch_size)
        self.local_result = {}

    def forward(self, data: List):
        raw_bytes = []
        for file_path in data:
            with open(file_path, "rb") as f:
                raw_bytes.append((file_path, f.read()))
        # Hand the (path, bytes) pairs to the user-defined handler.
        self.handle_data(raw_bytes)

    def handle_data(self, raw_bytes):
        raise RuntimeError("Requires users to implement this function")

def test_all_files(file_dir: str, num_clients=10, batch_size=1,
                   ext=[".jpg", '.JPG', '.jpeg', '.JPEG']):
    files = [x for x in os.listdir(file_dir) if os.path.splitext(x)[-1] in ext]
    files = [os.path.join(file_dir, x) for x in files]

    forwards = [FileSampler(files, batch_size) for i in range(num_clients)]

    torchpipe.utils.test.test(forwards, len(files))
Users can temporarily store the results in self.local_result and then consolidate them at the end, or they can validate the data in real time while running.
Example: thrift
First, define the data forwarding function based on the actual use of the Thrift protocol:
from typing import List, Tuple

import torchpipe

class ThriftSampler(torchpipe.utils.test.FileSampler):
    def __init__(self, data: List, host, port, batch_size=1):
        super().__init__(data, batch_size)
        self.local_result = {}

        from serve import InferenceService
        from serve.ttypes import InferenceParams
        self.InferenceParams = InferenceParams

        from thrift.transport import TSocket
        from thrift.transport import TTransport
        from thrift.protocol import TBinaryProtocol

        self.transport = TSocket.TSocket(host, port)
        self.transport = TTransport.TBufferedTransport(self.transport)
        self.protocol = TBinaryProtocol.TBinaryProtocol(self.transport)
        self.client = InferenceService.Client(self.protocol)

        # Connect!
        self.transport.open()

    def handle_data(self, raw_bytes: List[Tuple[str, bytes]]):
        return self.client.infer_batch([self.InferenceParams(*x) for x in raw_bytes])

    def __del__(self):
        # Close the connection when the sampler is destroyed.
        self.transport.close()
To conduct speed tests, you can pre-load the images:
# for test speed
def test_function_from_preload(file_dir: str, host, port, num_clients=10, batch_size=1,
                               ext=[".jpg", '.JPG', '.jpeg', '.JPEG']):
    files = [x for x in os.listdir(file_dir) if os.path.splitext(x)[-1] in ext]
    files = [os.path.join(file_dir, x) for x in files]

    forwards = [ThriftSampler(files, host, port, batch_size) for i in range(num_clients)]
    forward_methods = [x.handle_data for x in forwards]

    torchpipe.utils.test.test_from_raw_file(forward_methods, file_dir,
                                            num_clients=num_clients, batch_size=batch_size,
                                            total_number=10000)
Otherwise, you can consider conducting a full data test:
def test_all_files(file_dir: str, host, port, num_clients=10, batch_size=1,
                   ext=[".jpg", '.JPG', '.jpeg', '.JPEG']):
    files = [x for x in os.listdir(file_dir) if os.path.splitext(x)[-1] in ext]
    files = [os.path.join(file_dir, x) for x in files]

    forwards = [ThriftSampler(files, host, port, batch_size) for i in range(num_clients)]

    torchpipe.utils.test.test(forwards, len(files))
Clients with Different Batch Sizes
In the example provided here, we use ten clients, each requesting different amounts of data per request, ranging from 1 to 10. We validate the consistency of the results in this case.
Typically, users can iterate through all the data in a directory and repeatedly send requests to verify the stability and consistency of the results.
If the model itself has a batch size larger than 1, TensorRT inference results may differ slightly, because the optimization methods chosen for the model may differ across input batch sizes. If the batch size is 1, such differences should not exist. Note, however, that regenerating the model can still change the inference results.
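A consistency check with differently sized clients might look like the following sketch. Everything in it is illustrative: `fake_infer` is a hypothetical deterministic stand-in for the real model or service call, and plain threads replace torchpipe.utils.test.test so that the snippet runs on its own.

```python
import threading
from typing import List

data = [f"file_{i}".encode() for i in range(20)]


def fake_infer(batch: List[bytes]) -> List[int]:
    """Hypothetical stand-in for inference: one result per input item."""
    return [sum(item) for item in batch]


# Reference results computed one item at a time (batch size 1).
reference = {item: fake_infer([item])[0] for item in data}
mismatches: List[bytes] = []
lock = threading.Lock()


def client(batch_size: int) -> None:
    """Walk over all data in requests of batch_size and compare to the reference."""
    for start in range(0, len(data), batch_size):
        batch = data[start:start + batch_size]
        for item, result in zip(batch, fake_infer(batch)):
            if result != reference[item]:
                with lock:
                    mismatches.append(item)


# Ten clients requesting 1, 2, ..., 10 items per request.
threads = [threading.Thread(target=client, args=(n,)) for n in range(1, 11)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

With a real TensorRT model, a tolerance-based comparison may be more appropriate than exact equality, given the batch-size-dependent differences described above.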