Test

Verification Stage

The performance testing API is currently unstable; we are still collecting feedback. It underwent breaking changes in version 0.3.1b1.

torchpipe.utils.test is a performance-testing and concurrent-inference tool with built-in multi-client / multi-threaded support. In general, we recommend the more powerful Locust for performance testing. However, if you only need a simple script to test local and remote function calls, generate test reports, or perform concurrent function invocations and process the results, this tool can be used.

| locust | torchpipe.utils.test |
| --- | --- |
| Event-driven; supports higher concurrency and distributed testing | Based on local threads; limited concurrency; no distributed support |
| Has a web interface | None |
| Supports extensions | Supports extensions |
| Launched via `locust -f target.py` | Plain Python functions |

Test

torchpipe.utils.test.test_from_raw_file

def test_from_raw_file(
        forward_function: Callable[[List[Tuple[str, bytes]]], None] |
                          List[Callable[[List[Tuple[str, bytes]]], None]],
        file_dir: str,
        num_clients=10,
        batch_size=1,
        total_number=10000,
        num_preload=1000,
        recursive=True,
        ext=[".jpg", '.JPG', '.jpeg', '.JPEG'])

Reads files with the given extensions from file_dir and passes them to the user-specified forward_function.

Parameters
  • forward_function - Forward function with a single parameter of type List[Tuple[str, bytes]], i.e. a batch of (id, file data) pairs. The testing program repeatedly executes this function and records latency-related data. It can also be a list of such functions (List[Callable]), in which case its length must equal num_clients.
  • file_dir - File path
  • num_clients - Number of clients / concurrent channels
  • batch_size - Batch size of each request, i.e. the amount of test data consumed by one execution of forward_function.
  • total_number - Total amount of data. Only valid when num_preload > 0.
  • num_preload - Number of preloaded files. When num_preload <= 0, all files are traversed; when num_preload > 0, this many files are preloaded and the rest are discarded. When 0 < num_preload < total_number, the preloaded data is fed into the pipeline repeatedly.
  • recursive - Whether to read files under the directory recursively
  • ext - File extensions
Example
import os
from typing import List, Tuple
import torchpipe as tp
from torchpipe.utils.test import test_from_raw_file

model = tp.pipe({"backend": "DecodeMat", "instance_num": 10})

def forward_function(files: List[Tuple[str, bytes]]):
    inputs = []
    for file_path, file_bytes in files:
        input = {"data": file_bytes}
        inputs.append(input)
    model(inputs)

    for input in inputs:
        result = input["result"]
        # If you need to save the results, you can save (file_path, result) in a global Queue.

## preload 1000 images to test speed
test_from_raw_file(forward_function, os.path.join("assets/norm_jpg/"),
                   num_clients=10, batch_size=1, total_number=10000)
## Iterate through all the data
test_from_raw_file(forward_function, os.path.join("assets/norm_jpg/"),
                   num_clients=10, batch_size=1, num_preload=0)
Result
| Project | Value |
| --- | --- |
| tool's version | 20230421.0 |
| num_clients | 10 |
| total_number | 10000 |
| throughput::qps | 1642.34 |
| throughput::avg | 6.09 (ms) |
| latency::TP50 | 5.17 |
| latency::TP90 | 7.56 |
| latency::TP99 | 21.88 |
| latency::avg | 6.08 |
| -50,-40,-20,-10,-1 | 24.82, 25.84, 29.37, 30.6, 50.34 |

torchpipe.utils.test.throughput

Available from version 0.3.2rc2 onwards.

def throughput(args_dict_or_str, num_clients=10, total_number=10000)

Given the model parameters, test the throughput of the model using torchpipe with the default configuration of max=4 and instance=2. The model will be accelerated using TensorRT by default.

Parameters
  • args_dict_or_str - Model parameters for testing, supports both dict and str types. For dict type parameters, please refer to the TensorrtTensor operator parameters for configuration. It must include the key model. For str type parameters, the value should be the path to the model, supporting both .onnx and .trt formats. The advantage of using dict parameters is that it allows for more control over additional parameters, such as precision and max_batch_size. The default values will be used for str type parameters.
  • num_clients - Number of clients or concurrent channels, default is 10.
  • total_number - Total amount of data, default is 10000.
Examples
import torchpipe
import torch
import os

## test throughput
onnx_path = os.path.join("/tmp", "resnet50.onnx")
precision = "fp16"
dict_args = {
    'model': onnx_path,
    'precision': precision,
}

# type 1: use dict type
result = torchpipe.utils.test.throughput(dict_args, num_clients=5, total_number=5000)

# type 2: use str type
result = torchpipe.utils.test.throughput(onnx_path, num_clients=5, total_number=5000)
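
The example above assumes that /tmp/resnet50.onnx already exists. A minimal sketch for producing it (assuming torchvision is installed; the 224x224 input size and dynamic batch axis are illustrative choices, not requirements of throughput):

import os
import torch
import torchvision

## illustrative sketch: export resnet50 to the ONNX path used above
onnx_path = os.path.join("/tmp", "resnet50.onnx")
if not os.path.exists(onnx_path):
    m = torchvision.models.resnet50().eval()
    dummy = torch.randn(1, 3, 224, 224)
    torch.onnx.export(m, dummy, onnx_path,
                      opset_version=17,
                      input_names=["input"], output_names=["output"],
                      dynamic_axes={"input": {0: "batch"}, "output": {0: "batch"}})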

torchpipe.utils.test.test_function

def test_function(forward_function: Union[Callable, List[Callable]],
                  num_clients=10,
                  batch_size=1,
                  total_number=10000)

Repeatedly executes the forward_function provided by the user.

Parameters
  • forward_function - Forward function; the testing program repeatedly executes this function and records latency-related data. It can also be a list of functions (List[Callable]), in which case its length must equal num_clients.
  • num_clients - Number of clients / concurrent channels
  • batch_size - Batch size of each request, i.e. the amount of test data consumed by one execution of forward_function.
  • total_number - Total amount of data.
Examples
import torchpipe as tp
from torchpipe.utils.test import test_function

model = tp.pipe({"backend": "DecodeMat", "instance_num": 10})

file_path = "assets/norm_jpg/dog.jpg"
with open(file_path, 'rb') as f:
    file_bytes = f.read()

def run():
    input = {"data": file_bytes}
    model(input)
    assert(input["result"].shape == (576, 768, 3))

test_function(run, num_clients=5, batch_size=1, total_number=1000)

torchpipe.utils.test.test

def test(sample: Union[Sampler, List[Sampler]],
         total_number=10000)

Simulate multiple clients executing the computation specified by sample and collect the computation results.

Parameters
  • sample - Client data generator; the number of samplers determines the number of clients. It must be defined as a subclass of the following class (a minimal subclass is sketched after this parameter list):
class torchpipe.utils.test.Sampler:
    def __init__(self):
        pass

    def __call__(self, start_index: int) -> None:
        raise NotImplementedError

    def batchsize(self):
        return 1
  • total_number - Total number of items to test. Each time the system executes Sampler().__call__(start_index), the amount of data consumed is Sampler().batchsize(). The call satisfies 0 <= start_index <= total_number - Sampler.batchsize().
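
As a minimal sketch of this interface (the DecodeMat pipe and the dog.jpg test file below are illustrative assumptions, mirroring the earlier examples):

import torchpipe
from torchpipe.utils.test import Sampler, test

model = torchpipe.pipe({"backend": "DecodeMat", "instance_num": 10})
with open("assets/norm_jpg/dog.jpg", "rb") as f:  # illustrative test file
    file_bytes = f.read()

class SingleSampler(Sampler):
    def __call__(self, start_index: int) -> None:
        # one item per call; batchsize() stays at the default of 1
        data = {"data": file_bytes}
        model(data)
        assert "result" in data

# ten clients, 1000 requests in total
test([SingleSampler() for _ in range(10)], total_number=1000)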

torchpipe.utils.test.RandomSampler

Randomly selects batch_size items from the given input list. You need to implement the forward function as the actual computation.

class torchpipe.utils.test.RandomSampler(Sampler):
    def __init__(self, data_source: List, batch_size=1):
        super().__init__()
        self.data_source = data_source
        self.batch_size = batch_size

        assert(0 < len(self.data_source))
        for i in range(batch_size):
            if len(self.data_source) < batch_size:
                self.data_source.append(self.data_source[i])

    def __call__(self, start_index: int):
        data = random.sample(self.data_source, self.batch_size)
        self.forward(data)

    def forward(self, data: List):
        raise RuntimeError("Requires users to implement this function")

    def batchsize(self):
        return self.batch_size
Examples
import torchpipe

def test_function_from_preload(forward_function,
                               file_dir: str,
                               num_clients=10,
                               batch_size=1,
                               total_number=10000,
                               recursive=True,
                               ext=[".jpg", '.JPG', '.jpeg', '.JPEG']):
    from torchpipe.utils.test.Sampler import RandomSampler, preload
    data = preload(file_dir=file_dir, recursive=recursive, max_number=1000, ext=ext)
    assert(len(data) > 0)

    forwards = [RandomSampler(data, batch_size) for i in range(num_clients)]
    for m in forwards:
        m.forward = forward_function
    torchpipe.utils.test.test(forwards, total_number)

torchpipe.utils.test.LoopSampler

The data from the input list is fed into the computation in sequential order. Unlike SequentialSampler, it allows start_index > len(self.data) - self.batch_size, which means total_number > len(self.data) is allowed.


class LoopSampler(Sampler):
    def __init__(self, data: List, batch_size=1):
        super().__init__()
        self.data = data
        self.batch_size = batch_size
        assert(len(data) >= batch_size)
        self.length = len(data)
        for i in range(batch_size):
            self.data.append(data[i])

    def __call__(self, start_index: int) -> None:
        start_index = start_index % self.length
        data = self.data[start_index: start_index + self.batch_size]
        self.forward(data)

    def batchsize(self):
        return self.batch_size

    def forward(self, data: List):
        raise RuntimeError("Requires users to implement this function")

Examples

To iterate through files in a directory:

The following demonstrates how to iterate through all target files in a folder and obtain the results.

import os
from typing import List
import torchpipe

class FileSampler(torchpipe.utils.test.LoopSampler):
    def __init__(self, data: List, batch_size=1):
        super().__init__(data, batch_size)
        self.local_result = {}

    def forward(self, data: List):
        raw_bytes = []
        for file_path in data:
            with open(file_path, "rb") as f:
                raw_bytes.append((file_path, f.read()))
        self.handle_data(raw_bytes)

    def handle_data(self, raw_bytes):
        raise RuntimeError("Requires users to implement this function")

def test_all_files(file_dir: str, num_clients=10, batch_size=1,
                   ext=[".jpg", '.JPG', '.jpeg', '.JPEG']):

    files = [x for x in os.listdir(file_dir) if os.path.splitext(x)[-1] in ext]
    files = [os.path.join(file_dir, x) for x in files]

    forwards = [FileSampler(files, batch_size) for i in range(num_clients)]

    torchpipe.utils.test.test(forwards, len(files))

Users can temporarily store the results in self.local_result and then consolidate them at the end, or they can validate the data in real-time while running.
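
For instance, a hedged sketch of the first approach, reusing FileSampler and the files list from the example above (the DecodeMat pipe is an assumed backend; any local or remote call works the same way):

import torchpipe

model = torchpipe.pipe({"backend": "DecodeMat", "instance_num": 10})  # assumed backend

class DecodeFileSampler(FileSampler):
    def handle_data(self, raw_bytes):
        inputs = [{"data": b} for _, b in raw_bytes]
        model(inputs)
        # keep (file_path, result) per client; consolidate after the test finishes
        for (file_path, _), data in zip(raw_bytes, inputs):
            self.local_result[file_path] = data["result"].shape

forwards = [DecodeFileSampler(files, batch_size=1) for _ in range(10)]
torchpipe.utils.test.test(forwards, len(files))

merged = {}
for f in forwards:
    merged.update(f.local_result)
print(len(merged), "files processed")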

Example: thrift

First, define the data forwarding function based on the actual use of the Thrift protocol:

from typing import List, Tuple
import torchpipe

class ThriftSampler(FileSampler):  # FileSampler as defined above
    def __init__(self, data: List, host, port, batch_size=1):
        super().__init__(data, batch_size)
        self.local_result = {}

        from serve import InferenceService
        from serve.ttypes import InferenceParams
        self.InferenceParams = InferenceParams

        from thrift.transport import TSocket
        from thrift.transport import TTransport
        from thrift.protocol import TBinaryProtocol

        self.transport = TSocket.TSocket(host, port)
        self.transport = TTransport.TBufferedTransport(self.transport)
        self.protocol = TBinaryProtocol.TBinaryProtocol(self.transport)

        self.client = InferenceService.Client(self.protocol)

        # Connect!
        self.transport.open()
        self.client.ping()

    def handle_data(self, raw_bytes: List[Tuple[str, bytes]]):
        return self.client.infer_batch([self.InferenceParams(*x) for x in raw_bytes])

    def __del__(self):
        self.transport.close()

To conduct speed tests, you can pre-load the images:

# for test speed
def test_function_from_preload():
    files = [x for x in os.listdir(file_dir) if os.path.splitext(x)[-1] in ext]
    files = [os.path.join(file_dir, x) for x in files]

    forwards = [ThriftSampler(files, host, port, batch_size) for i in range(num_clients)]
    forward_methods = [x.handle_data for x in forwards]

    torchpipe.utils.test.test_from_raw_file(forward_methods, os.path.join("assets/norm_jpg/"),
                                            num_clients=10, batch_size=1, total_number=10000)

Alternatively, you can run a test over the full dataset:

def test_all_files(file_dir: str, num_clients=10, batch_size=1,
                   ext=[".jpg", '.JPG', '.jpeg', '.JPEG']):

    files = [x for x in os.listdir(file_dir) if os.path.splitext(x)[-1] in ext]
    files = [os.path.join(file_dir, x) for x in files]

    forwards = [ThriftSampler(files, host, port, batch_size) for i in range(num_clients)]

    torchpipe.utils.test.test(forwards, len(files))


Clients with Different Batch Sizes

In the example provided here, we use ten clients, each requesting a different amount of data per request, ranging from 1 to 10, and validate that the results remain consistent.

Typically, users can iterate through all the data in a directory and repeatedly send requests to verify the stability and consistency of the results.
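
A hedged sketch of such a setup, built on the RandomSampler described above (the DecodeMat pipe, the files list, and the shape-based consistency check are illustrative assumptions):

import torchpipe
from torchpipe.utils.test import RandomSampler, test

model = torchpipe.pipe({"backend": "DecodeMat", "instance_num": 10})  # assumed backend
reference = {}  # file_path -> first-seen result shape

class CheckSampler(RandomSampler):
    def forward(self, data):
        inputs = []
        for p in data:
            with open(p, "rb") as f:
                inputs.append({"data": f.read()})
        model(inputs)
        for p, x in zip(data, inputs):
            shape = x["result"].shape
            # every client, whatever its batch size, must see the same result for the same file
            assert reference.setdefault(p, shape) == shape

# ten clients with batch sizes 1..10, drawing from the same file list
clients = [CheckSampler(files, batch_size=b) for b in range(1, 11)]
test(clients, total_number=10000)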

Inconsistent Results for TensorRT Model Inference

If the model's batch size is larger than 1, there may be slight differences in the TensorRT inference results (the optimization tactics TensorRT selects can differ for different input batch sizes). If the batch size is 1, such differences should not exist. However, even after regenerating the model, the inference results may still change.