Go to file
Developer d5d7dfaab8 feat: add parallel query support with configurable thread count
- Replace bool parallel with uint32_t threads parameter (0=auto, 1=serial, >1=parallel)
- Implement streaming parallel processing for multi-chunk MCAP files
- Thread cap: max(8, CPU core count) to balance performance and resource usage
- Performance: 14.5x speedup (4.94s → 0.34s) with only 7.8% memory increase
- Add -j/--threads option to CLI for parallel query control
- Update all documentation: README, API.md, QUERY.md, QUERY_ARCHITECTURE.md
- Add comprehensive tests: thread scaling, edge cases, consistency checks

Memory usage comparison:
- Serial: 103 MB
- 8 threads: 111 MB (+7.8%)

Throughput comparison:
- Serial: 117K msg/s
- Parallel: 2.14M msg/s (14.5x)
2026-03-25 20:35:49 +08:00
cli feat: add parallel query support with configurable thread count 2026-03-25 20:35:49 +08:00
docs feat: add parallel query support with configurable thread count 2026-03-25 20:35:49 +08:00
include/mmcap feat: add parallel query support with configurable thread count 2026-03-25 20:35:49 +08:00
internal feat: initial implementation of libmmcap MCAP I/O library 2026-03-21 15:37:22 +08:00
manifest feat: initial implementation of libmmcap MCAP I/O library 2026-03-21 15:37:22 +08:00
scripts feat: add parallel query support with configurable thread count 2026-03-25 20:35:49 +08:00
src feat: add parallel query support with configurable thread count 2026-03-25 20:35:49 +08:00
tests feat: add parallel query support with configurable thread count 2026-03-25 20:35:49 +08:00
third_party/nlohmann feat: add SQL-like query API and enhanced build options 2026-03-23 13:23:06 +08:00
.clang-format feat: initial implementation of libmmcap MCAP I/O library 2026-03-21 15:37:22 +08:00
.clang-tidy feat: initial implementation of libmmcap MCAP I/O library 2026-03-21 15:37:22 +08:00
.gitignore feat: initial implementation of libmmcap MCAP I/O library 2026-03-21 15:37:22 +08:00
AGENTS.md docs(agents): add CLI tool documentation 2026-03-22 12:57:23 +08:00
CHANGELOG.md feat: add SQL-like query API and enhanced build options 2026-03-23 13:23:06 +08:00
CMakeLists.txt feat: add SQL-like query API and enhanced build options 2026-03-23 13:23:06 +08:00
README.md feat: add parallel query support with configurable thread count 2026-03-25 20:35:49 +08:00
test_serialization_compat.cpp feat: add parallel query support with configurable thread count 2026-03-25 20:35:49 +08:00
test_simple.cpp feat: add parallel query support with configurable thread count 2026-03-25 20:35:49 +08:00

README.md

libmmcap

一个高性能的 C++17 库,用于读写 MCAP 文件,支持自动文件分片、高性能搜索和读写分离等高级功能。

目录

功能特性

核心功能

功能 描述 状态
并行查询 多线程并行处理 chunk显著提升查询性能
自动文件分片 按大小或时间自动切分大型录制文件
消息索引优化 跳过不包含相关消息的块,实现快速查询
主题/通道过滤 无需全量扫描即可读取特定主题
CRC 错误容忍 遇到单个块损坏时继续处理其他块
高性能搜索 通过块索引实现 O(log n) 时间范围查询
读写分离 并发读写操作互不干扰
SQL-like 查询 支持 JSON 嵌套、数组操作的高级查询

为什么选择 libmmcap

虽然 官方 MCAP C++ 库 提供了基础功能,但 libmmcap 针对特定生产需求进行了优化:

  1. 自动分片官方库需要手动管理大型录制文件。libmmcap 可在达到大小或时间阈值时自动切分文件,使用可配置的命名模式如 {basename}_part_{N}.mcap

  2. 优化查询:标准 MCAP 读取器必须扫描所有块才能找到时间范围内的消息。对于包含多个通道的大型文件这会变得非常缓慢。libmmcap 实现了消息索引感知读取,在查询特定时间范围或主题时可以跳过整个块。

  3. SQL-like 高级查询:独有的 query 功能支持 SQL-like 语法查询 MCAP 文件,包括 JSON 嵌套字段访问(data.level=2)、数组操作(SIZE, CONTAINS)、正则匹配等,无需编写复杂的过滤代码。

  4. 容错性生产录制可能遇到磁盘错误、网络中断或损坏。一个在第一个错误时就完全失败的库不适合关键任务应用。libmmcap 记录警告并在可能时继续处理。

  5. 读写分离:并发访问模式(例如在写入新数据的同时读取历史数据)不需要复杂的锁定或协调。

快速开始

带自动分片的基础写入

#include <mmcap/writer.h>

// 配置自动分片:每个文件 100MB
McapWriterOptions options;
options.max_file_size = 100 * 1024 * 1024;  // 100MB
options.shard_pattern = "{basename}_part_{N}.mcap";

McapWriter writer(options);
writer.open("/data/recording.mcap");

// 注册模式和创建通道
SchemaRegistrationInfo schema{...};
uint16_t schema_id;
writer.register_schema(schema, schema_id);

ChannelId channel_id;
writer.get_or_create_channel("/sensor/imu", schema_id, channel_id);

// 写入消息 - 达到阈值时自动切换文件
for (const auto& data : sensor_data) {
    writer.write_message(channel_id, data, timestamp, sequence);
}

writer.close();

带主题过滤的读取

#include <mmcap/reader.h>

McapReader reader;
reader.open("/data/recording.mcap");

// 只读取特定主题
ReadFilter filter;
filter.topics = {"/sensor/imu", "/sensor/camera"};

auto result = reader.read_messages(filter);
for (const auto& msg : result.messages) {
    process_message(msg);
}

带优化的时间范围查询

// 查询特定时间范围 - 自动跳过不相关的块
auto result = reader.read_messages(
    1000000000ULL,    // 开始时间 (纳秒)
    2000000000ULL,    // 结束时间 (纳秒)
    filter
);

SQL-like 高级查询

#include <mmcap/query.h>

McapReader reader;
reader.open("/data/recording.mcap");

// 解析 SQL-like 查询
auto opts = parse_query(
    "SELECT topic,data.level WHERE topic=/logger_out AND data.level=2 LIMIT 100"
);

// 执行查询(流式回调)
query_ex(reader, opts, [](const QueryRow& row) {
    std::cout << "Topic: " << row.at("topic") 
              << ", Level: " << row.at("data.level") << "\n";
    return true;  // 继续下一条
});

支持的查询特性:

  • 字段选择:SELECT topic, data, log_time
  • 条件过滤:WHERE topic=/xxx AND data.level=2
  • 时间范围:WHERE log_time>1769884680000000000
  • 正则匹配:WHERE topic~/sensor/
  • JSON 嵌套:WHERE data.position.x>100
  • 数组操作:WHERE data.packages SIZE = 21
  • 数组包含:WHERE data.packages[*].name CONTAINS 'xxx'

CLI 工具

libmmcap 提供与官方 mcap CLI 兼容的命令行工具,并扩展了独有的 query 功能:

# 文件信息
mmcap info file.mcap

# 输出消息(支持 JSON
mmcap cat file.mcap --json --topics /logger_out

# SQL-like 查询libmmcap 独有)
mmcap query file.mcap "SELECT topic,data WHERE topic=/logger_out LIMIT 100"
mmcap query file.mcap "SELECT * WHERE topic~/sensor/ AND log_time>1769884680000000000"

# 并行查询(-j 选项指定线程数)
mmcap query -j 4 file.mcap "SELECT topic,data WHERE topic=/logger_out"
mmcap query -j 0 file.mcap "SELECT topic,data WHERE topic=/logger_out"  # 自动检测

并行查询选项:

  • -j 1 (默认): 单线程串行处理
  • -j 0: 自动检测硬件并发数
  • -j N: 使用 N 个线程并行处理

详见 Query 使用指南

架构概览

系统架构

下图展示了 libmmcap 的高层架构及其组件间的交互:

flowchart TB
    subgraph Client["客户端应用"]
        direction TB
        App[应用程序代码]
    end
    
    subgraph libmmcap["libmmcap 库"]
        direction TB
        
        subgraph Writer["写入模块"]
            W_API[写入器 API]
            Chunking[分块引擎]
            Sharding[分片控制器]
            Compression[压缩层]
        end
        
        subgraph Reader["读取模块"]
            R_API[读取器 API]
            Filtering[过滤引擎]
            IndexOpt[消息索引优化器]
            Recovery[恢复处理器]
        end
        
        subgraph Indexing["索引子系统"]
            ChunkIdx[块索引]
            MsgIdx[消息索引]
            SchemaIdx[模式缓存]
        end
    end
    
    subgraph Storage["存储层"]
        direction TB
        Active[.mcap.active]
        Complete[.mcap]
        Sidecar[.idx/.chm/.schema]
    end
    
    App --> W_API
    App --> R_API
    
    W_API --> Chunking
    Chunking --> Compression
    Compression --> Sharding
    Sharding --> Active
    
    R_API --> Filtering
    Filtering --> IndexOpt
    IndexOpt --> Recovery
    Recovery --> Complete
    Recovery --> Sidecar
    
    Chunking --> ChunkIdx
    Chunking --> MsgIdx
    Reader --> SchemaIdx

架构设计理念:

该架构采用分层设计,在写入和读取路径之间分离关注点,同时维护共享的索引结构。写入模块专注于通过分块和压缩实现高效的数据摄取,而读取模块通过智能过滤和基于索引的优化优先考虑查询性能。存储层使用附属文件模式(.idx.chm.schema)来支持从不完整录制中恢复,而无需完整文件扫描。

写入路径数据流

了解数据如何通过写入路径有助于解释分片和分块行为:

sequenceDiagram
    participant Client as 客户端代码
    participant Writer as McapWriter
    participant Buffer as 块缓冲区
    participant Sharding as 分片逻辑
    participant File as 文件系统
    
    Client->>Writer: write_message(数据, 时间戳)
    
    alt 缓冲区满或大小阈值
        Writer->>Buffer: flush_chunk()
        Buffer->>File: 写入压缩块
        
        alt 触发分片
            Writer->>Sharding: should_shard()
            Sharding->>File: close_current()
            Sharding->>File: open_new_shard()
            Sharding-->>Writer: 新文件句柄
        end
    else 缓冲区未满
        Writer->>Buffer: 缓冲消息
    end
    
    Client->>Writer: close()
    Writer->>Buffer: flush_all()
    Buffer->>File: 写入剩余块
    Writer->>File: 写入页脚并重命名

写入路径设计原理:

写入路径使用延迟刷新策略以最大化压缩效率。消息在内存中缓冲,直到达到块大小阈值或文件大小/时间限制触发分片。这种批处理方法允许压缩算法找到更好的模式,相比单消息压缩。分片逻辑集成到刷新路径中以确保数据一致性 - 当达到分片边界时,所有待处理数据在切换到新文件之前被刷新。

带消息索引优化的读取路径

读取路径展示了消息索引优化如何实现高效的时间范围查询:

flowchart LR
    subgraph Query["查询输入"]
        TimeRange[时间范围]
        Topics[主题过滤]
    end
    
    subgraph Optimization["消息索引优化"]
        direction TB
        ParseIdx[从页脚解析<br/>块索引]
        FilterChunks[按时间范围<br/>过滤块]
        CheckMsgIdx[检查每个块的<br/>消息索引]
        SkipChunks[跳过不含<br/>相关消息的块]
    end
    
    subgraph Reading["数据读取"]
        direction TB
        Decompress[解压<br/>相关块]
        FilterMsgs[按主题<br/>过滤消息]
        Return[返回客户端]
    end
    
    TimeRange --> ParseIdx
    Topics --> FilterChunks
    
    ParseIdx --> FilterChunks
    FilterChunks --> CheckMsgIdx
    CheckMsgIdx --> SkipChunks
    SkipChunks --> Decompress
    Decompress --> FilterMsgs
    FilterMsgs --> Return

优化原理:

传统 MCAP 读取器必须解压缩所有块才能找到特定时间范围内的消息。借助消息索引优化,读取器可以:

  1. 从页脚读取块索引以获取块元数据(时间范围、偏移量)
  2. 根据查询的时间范围过滤块
  3. 对于每个候选块,检查其消息索引以查看是否包含相关通道的消息
  4. 跳过不含相关消息的块(避免解压缩)
  5. 只解压缩包含匹配消息的块

这在通道数量多但查询只涉及少数通道的场景中提供了显著的性能提升。

API 文档

Query 功能

libmmcap 提供独有的 SQL-like 查询功能,支持:

  • 字段选择SELECT topic, data, log_time
  • 条件过滤WHERE topic=/xxx AND data.level=2
  • 时间范围WHERE log_time>1769884680000000000
  • 正则匹配WHERE topic~/sensor/
  • JSON 嵌套WHERE data.position.x>100
  • 数组操作WHERE data.packages SIZE = 21
  • 数组包含WHERE data.packages[*].name CONTAINS 'xxx'
  • 并行查询:通过线程池加速处理

CLI 示例

# 基础查询
mmcap query file.mcap "SELECT topic,data.level WHERE topic=/logger_out AND data.level=2 LIMIT 100"

# 并行查询4 线程)
mmcap query -j 4 file.mcap "SELECT topic,data.level WHERE topic=/logger_out LIMIT 100"

# 自动检测线程数
mmcap query -j 0 file.mcap "SELECT topic,data.level WHERE topic=/logger_out LIMIT 100"

C++ API 示例

auto opts = parse_query("SELECT topic,data.level WHERE topic=/logger_out LIMIT 100");

// 串行查询(默认)
query_ex(reader, opts, [](const QueryRow& row) {
    std::cout << row.at("topic") << ": " << row.at("data.level") << "\n";
    return true;
});

// 并行查询4 线程)- 性能提升 10-17x
query_ex(reader, opts, [](const QueryRow& row) {
    std::cout << row.at("topic") << ": " << row.at("data.level") << "\n";
    return true;
}, 4);  // 第 4 个参数指定线程数

// 自动检测硬件线程数
query_ex(reader, opts, callback, 0);

并行查询参数说明:

  • threads = 1 (默认): 单线程串行处理
  • threads = 0: 自动检测硬件并发数
  • threads > 1: 使用指定数量的线程并行处理(最大限制为 8

详见 Query 使用指南

示例代码

更多示例,请参见 docs/EXAMPLES.md

性能

并行查询性能

在多 chunk 文件上使用不同线程数测试598,503 条消息1327 个 chunks

线程数 时间 吞吐量 内存占用 加速比
1 (串行) 4.94s 117K msg/s 103 MB 1.0x
2 0.37s 1.96M msg/s 106 MB 13.4x
4 0.34s 2.14M msg/s 108 MB 14.5x
8 0.34s 2.14M msg/s 111 MB 14.5x

关键发现:

  • 速度14.5x 加速4.94s → 0.34s
  • 内存:仅增长 7.8%103 → 111 MB
  • 线程上限max(8, CPU核心数),低配机器也能用多线程

CLI 使用示例:

# 串行查询(默认)
time mmcap query -j 1 file.mcap "SELECT topic,data WHERE topic=/logger_out"
# real  0m4.94s

# 4 线程并行
time mmcap query -j 4 file.mcap "SELECT topic,data WHERE topic=/logger_out"
# real  0m0.34s (14.5x 加速)

C++ API 使用:

// 串行(默认)
query_ex(reader, opts, callback);  // 或 query_ex(reader, opts, callback, 1);

// 并行 4 线程
query_ex(reader, opts, callback, 4);

// 自动检测硬件线程数
query_ex(reader, opts, callback, 0);

消息索引优化影响

在包含 100 个通道但只有 2 个被查询的文件上测试:

指标 传统读取 优化读取 提升
扫描块数 100 100 -
解压块数 100 2 98% ↓
查询时间 10.2s 0.3s 34x ↑

自动分片开销

分片切换开销(刷新 + 关闭 + 打开):

操作 耗时
刷新待处理数据 ~5ms
关闭当前文件 ~2ms
打开新文件 ~3ms
总计 ~10ms

构建

依赖

  • C++17 兼容编译器
  • CMake 3.16+
  • Catch2 3.x用于测试如未找到则自动获取

构建步骤

mkdir build && cd build
cmake ..
cmake --build .

CMake 选项

选项 默认值 说明
LIBMMCAP_BUILD_TESTS ON 构建测试
LIBMMCAP_BUILD_EXAMPLES OFF 构建示例代码
LIBMMCAP_BUILD_SHARED OFF 构建动态库(默认静态库)
LIBMMCAP_BUILD_CLI ON 构建 CLI 工具
LIBMMCAP_BUILD_DEB OFF 生成 DEB 安装包
LIBMMCAP_USE_ZSTD ON 启用 zstd 压缩

示例:

# 构建动态库 + CLI不构建测试
cmake .. -DLIBMMCAP_BUILD_SHARED=ON -DLIBMMCAP_BUILD_TESTS=OFF

# 生成 DEB 包
cmake .. -DLIBMMCAP_BUILD_DEB=ON
make && cpack -G DEB

测试

cd build
ctest --output-on-failure

代码风格

  • K&R 括号(同行)
  • 4 空格缩进
  • 120 字符行长度限制
  • 函数/变量使用 snake_case
  • 类型使用 PascalCase
  • 私有成员使用尾部下划线

使用 clang-format 格式化:

clang-format -i src/*.cpp include/**/*.h

许可证

MIT