推理
torchpipe.pipe
初始化
torchpipe.pipe 的初始化接口为
import torchpipe as tp
models = tp.pipe(config: Union[Dict[str, str] | Dict[str, Dict[str, str]] | str])
- 从字典
- 从双层字典
- 从toml文件
class torchpipe.pipe(config: Dict[str, str])
config: Dict[str, str]
: 配置参数会传入途径的后端进行解析。特定的后端可能会对配置进行参数展开.
config = {"backend":"DecodeMat"};
class torchpipe.pipe(config: Dict[str, Dict[str, str]])
用于多个节点配置:
{节点名: Dict[str, str]}
. 节点名不可为global
或者空。
config: Dict[str, Dict[str, str]]
: 配置参数会传入途径的后端进行解析。特定的后端可能会对配置进行参数展开. 可通过global
设置全局默认参数。
config = {"decoder":{"backend":"DecodeMat"},
"decoder_gpu":{"backend":"SyncTensor[DecodeTensor]"},
"global":{"instance_num":"2"}};
class torchpipe.pipe(config: str)
从文件解析参数。
config: str
: 以.toml
为后缀的toml格式文件。
# 值为str,或者可以转化为str,如 int, float. 不接受布尔值。
instance_num=2
[decoder]
backend="DecodeMat"
[decoder_gpu]
backend="SyncTensor[DecodeTensor]"
前向
torchpipe.pipe 的前向接口为
class torchpipe.pipe
def __call__(self, data: Dict[str, Any] | List[Dict[str, Any]]) -> None
线程安全的前向计算
- 单个数据
- 多个数据
def __call__(self, data: Dict[str, Any]) -> None
def __call__(self, data: List[Dict[str, Any]]) -> None
当其中一个数据计算过程抛出异常时,所有结果都不可用。当部分结果没有'result'时,其他结果仍然可用。
一般来讲,这个接口不建议用在除了在python中一次性处理超大量数据以外的其他场景。
批量处理数据的替代方式
- 计算流水线保持简单,通过线程池的方式去处理多输入:每一个
self.executor =ThreadPoolExecutor(max_workers=15)
# forward pass
future_tasks = self.executor.map(self.forward_for_single_input, data)forward_for_single_input
函数定义了数据从输入到最终结果的多个计算步骤。 - 对于涉及到检测等上下文分裂的情景,可以通过MapReduce进行调度。
系统保留键值
定义 | 备注 | |
---|---|---|
TASK_DATA_KEY | data | 用于输入之一键值 |
TASK_RESULT_KEY | result | 用于输出之一键值 |
TASK_CONTEXT_KEY | context | 用于全局共享上下文语法糖 |
TASK_EVENT_KEY | event | |
"_*" | 所有以下划线开始的字符串 | |
TASK_NODE_NAME_KEY | node_name | |
"global" | 目前用于表示全局设置 | |
"default" | ||
"TASK_*_KEY" | 以TASK_ 开头以_KEY 结束的字符串 |