- Replace bool parallel with uint32_t threads parameter (0=auto, 1=serial, >1=parallel) - Implement streaming parallel processing for multi-chunk MCAP files - Thread cap: max(8, CPU core count) to balance performance and resource usage - Performance: 14.5x speedup (4.94s → 0.34s) with only 7.8% memory increase - Add -j/--threads option to CLI for parallel query control - Update all documentation: README, API.md, QUERY.md, QUERY_ARCHITECTURE.md - Add comprehensive tests: thread scaling, edge cases, consistency checks Memory usage comparison: - Serial: 103 MB - 8 threads: 111 MB (+7.8%) Throughput comparison: - Serial: 117K msg/s - Parallel: 2.14M msg/s (14.5x) |
||
|---|---|---|
| cli | ||
| docs | ||
| include/mmcap | ||
| internal | ||
| manifest | ||
| scripts | ||
| src | ||
| tests | ||
| third_party/nlohmann | ||
| .clang-format | ||
| .clang-tidy | ||
| .gitignore | ||
| AGENTS.md | ||
| CHANGELOG.md | ||
| CMakeLists.txt | ||
| README.md | ||
| test_serialization_compat.cpp | ||
| test_simple.cpp | ||
README.md
libmmcap
一个高性能的 C++17 库,用于读写 MCAP 文件,支持自动文件分片、高性能搜索和读写分离等高级功能。
目录
功能特性
核心功能
| 功能 | 描述 | 状态 |
|---|---|---|
| 并行查询 | 多线程并行处理 chunk,显著提升查询性能 | ✅ |
| 自动文件分片 | 按大小或时间自动切分大型录制文件 | ✅ |
| 消息索引优化 | 跳过不包含相关消息的块,实现快速查询 | ✅ |
| 主题/通道过滤 | 无需全量扫描即可读取特定主题 | ✅ |
| CRC 错误容忍 | 遇到单个块损坏时继续处理其他块 | ✅ |
| 高性能搜索 | 通过块索引实现 O(log n) 时间范围查询 | ✅ |
| 读写分离 | 并发读写操作互不干扰 | ✅ |
| SQL-like 查询 | 支持 JSON 嵌套、数组操作的高级查询 | ✅ |
为什么选择 libmmcap?
虽然 官方 MCAP C++ 库 提供了基础功能,但 libmmcap 针对特定生产需求进行了优化:
-
自动分片:官方库需要手动管理大型录制文件。libmmcap 可在达到大小或时间阈值时自动切分文件,使用可配置的命名模式如
{basename}_part_{N}.mcap。 -
优化查询:标准 MCAP 读取器必须扫描所有块才能找到时间范围内的消息。对于包含多个通道的大型文件,这会变得非常缓慢。libmmcap 实现了消息索引感知读取,在查询特定时间范围或主题时可以跳过整个块。
-
SQL-like 高级查询:独有的
query功能支持 SQL-like 语法查询 MCAP 文件,包括 JSON 嵌套字段访问(data.level=2)、数组操作(SIZE,CONTAINS)、正则匹配等,无需编写复杂的过滤代码。 -
容错性:生产录制可能遇到磁盘错误、网络中断或损坏。一个在第一个错误时就完全失败的库不适合关键任务应用。libmmcap 记录警告并在可能时继续处理。
-
读写分离:并发访问模式(例如在写入新数据的同时读取历史数据)不需要复杂的锁定或协调。
快速开始
带自动分片的基础写入
#include <mmcap/writer.h>
// 配置自动分片:每个文件 100MB
McapWriterOptions options;
options.max_file_size = 100 * 1024 * 1024; // 100MB
options.shard_pattern = "{basename}_part_{N}.mcap";
McapWriter writer(options);
writer.open("/data/recording.mcap");
// 注册模式和创建通道
SchemaRegistrationInfo schema{...};
uint16_t schema_id;
writer.register_schema(schema, schema_id);
ChannelId channel_id;
writer.get_or_create_channel("/sensor/imu", schema_id, channel_id);
// 写入消息 - 达到阈值时自动切换文件
for (const auto& data : sensor_data) {
writer.write_message(channel_id, data, timestamp, sequence);
}
writer.close();
带主题过滤的读取
#include <mmcap/reader.h>
McapReader reader;
reader.open("/data/recording.mcap");
// 只读取特定主题
ReadFilter filter;
filter.topics = {"/sensor/imu", "/sensor/camera"};
auto result = reader.read_messages(filter);
for (const auto& msg : result.messages) {
process_message(msg);
}
带优化的时间范围查询
// 查询特定时间范围 - 自动跳过不相关的块
auto result = reader.read_messages(
1000000000ULL, // 开始时间 (纳秒)
2000000000ULL, // 结束时间 (纳秒)
filter
);
SQL-like 高级查询
#include <mmcap/query.h>
McapReader reader;
reader.open("/data/recording.mcap");
// 解析 SQL-like 查询
auto opts = parse_query(
"SELECT topic,data.level WHERE topic=/logger_out AND data.level=2 LIMIT 100"
);
// 执行查询(流式回调)
query_ex(reader, opts, [](const QueryRow& row) {
std::cout << "Topic: " << row.at("topic")
<< ", Level: " << row.at("data.level") << "\n";
return true; // 继续下一条
});
支持的查询特性:
- 字段选择:
SELECT topic, data, log_time - 条件过滤:
WHERE topic=/xxx AND data.level=2 - 时间范围:
WHERE log_time>1769884680000000000 - 正则匹配:
WHERE topic~/sensor/ - JSON 嵌套:
WHERE data.position.x>100 - 数组操作:
WHERE data.packages SIZE = 21 - 数组包含:
WHERE data.packages[*].name CONTAINS 'xxx'
CLI 工具
libmmcap 提供与官方 mcap CLI 兼容的命令行工具,并扩展了独有的 query 功能:
# 文件信息
mmcap info file.mcap
# 输出消息(支持 JSON)
mmcap cat file.mcap --json --topics /logger_out
# SQL-like 查询(libmmcap 独有)
mmcap query file.mcap "SELECT topic,data WHERE topic=/logger_out LIMIT 100"
mmcap query file.mcap "SELECT * WHERE topic~/sensor/ AND log_time>1769884680000000000"
# 并行查询(-j 选项指定线程数)
mmcap query -j 4 file.mcap "SELECT topic,data WHERE topic=/logger_out"
mmcap query -j 0 file.mcap "SELECT topic,data WHERE topic=/logger_out" # 自动检测
并行查询选项:
-j 1(默认): 单线程串行处理-j 0: 自动检测硬件并发数-j N: 使用 N 个线程并行处理
详见 Query 使用指南。
架构概览
系统架构
下图展示了 libmmcap 的高层架构及其组件间的交互:
flowchart TB
subgraph Client["客户端应用"]
direction TB
App[应用程序代码]
end
subgraph libmmcap["libmmcap 库"]
direction TB
subgraph Writer["写入模块"]
W_API[写入器 API]
Chunking[分块引擎]
Sharding[分片控制器]
Compression[压缩层]
end
subgraph Reader["读取模块"]
R_API[读取器 API]
Filtering[过滤引擎]
IndexOpt[消息索引优化器]
Recovery[恢复处理器]
end
subgraph Indexing["索引子系统"]
ChunkIdx[块索引]
MsgIdx[消息索引]
SchemaIdx[模式缓存]
end
end
subgraph Storage["存储层"]
direction TB
Active[.mcap.active]
Complete[.mcap]
Sidecar[.idx/.chm/.schema]
end
App --> W_API
App --> R_API
W_API --> Chunking
Chunking --> Compression
Compression --> Sharding
Sharding --> Active
R_API --> Filtering
Filtering --> IndexOpt
IndexOpt --> Recovery
Recovery --> Complete
Recovery --> Sidecar
Chunking --> ChunkIdx
Chunking --> MsgIdx
Reader --> SchemaIdx
架构设计理念:
该架构采用分层设计,在写入和读取路径之间分离关注点,同时维护共享的索引结构。写入模块专注于通过分块和压缩实现高效的数据摄取,而读取模块通过智能过滤和基于索引的优化优先考虑查询性能。存储层使用附属文件模式(.idx、.chm、.schema)来支持从不完整录制中恢复,而无需完整文件扫描。
写入路径数据流
了解数据如何通过写入路径有助于解释分片和分块行为:
sequenceDiagram
participant Client as 客户端代码
participant Writer as McapWriter
participant Buffer as 块缓冲区
participant Sharding as 分片逻辑
participant File as 文件系统
Client->>Writer: write_message(数据, 时间戳)
alt 缓冲区满或大小阈值
Writer->>Buffer: flush_chunk()
Buffer->>File: 写入压缩块
alt 触发分片
Writer->>Sharding: should_shard()
Sharding->>File: close_current()
Sharding->>File: open_new_shard()
Sharding-->>Writer: 新文件句柄
end
else 缓冲区未满
Writer->>Buffer: 缓冲消息
end
Client->>Writer: close()
Writer->>Buffer: flush_all()
Buffer->>File: 写入剩余块
Writer->>File: 写入页脚并重命名
写入路径设计原理:
写入路径使用延迟刷新策略以最大化压缩效率。消息在内存中缓冲,直到达到块大小阈值或文件大小/时间限制触发分片。这种批处理方法允许压缩算法找到更好的模式,相比单消息压缩。分片逻辑集成到刷新路径中以确保数据一致性 - 当达到分片边界时,所有待处理数据在切换到新文件之前被刷新。
带消息索引优化的读取路径
读取路径展示了消息索引优化如何实现高效的时间范围查询:
flowchart LR
subgraph Query["查询输入"]
TimeRange[时间范围]
Topics[主题过滤]
end
subgraph Optimization["消息索引优化"]
direction TB
ParseIdx[从页脚解析<br/>块索引]
FilterChunks[按时间范围<br/>过滤块]
CheckMsgIdx[检查每个块的<br/>消息索引]
SkipChunks[跳过不含<br/>相关消息的块]
end
subgraph Reading["数据读取"]
direction TB
Decompress[解压<br/>相关块]
FilterMsgs[按主题<br/>过滤消息]
Return[返回客户端]
end
TimeRange --> ParseIdx
Topics --> FilterChunks
ParseIdx --> FilterChunks
FilterChunks --> CheckMsgIdx
CheckMsgIdx --> SkipChunks
SkipChunks --> Decompress
Decompress --> FilterMsgs
FilterMsgs --> Return
优化原理:
传统 MCAP 读取器必须解压缩所有块才能找到特定时间范围内的消息。借助消息索引优化,读取器可以:
- 从页脚读取块索引以获取块元数据(时间范围、偏移量)
- 根据查询的时间范围过滤块
- 对于每个候选块,检查其消息索引以查看是否包含相关通道的消息
- 跳过不含相关消息的块(避免解压缩)
- 只解压缩包含匹配消息的块
这在通道数量多但查询只涉及少数通道的场景中提供了显著的性能提升。
API 文档
- API 参考 - 完整的 C++ API 文档,包括写入器、读取器、过滤器和查询 API
- Query 使用指南 - SQL-like 查询语法详细说明和示例
- Query 架构设计 - Query 功能实现原理和架构
- 查询测试报告 - 性能测试结果和边界验证
Query 功能
libmmcap 提供独有的 SQL-like 查询功能,支持:
- 字段选择:
SELECT topic, data, log_time - 条件过滤:
WHERE topic=/xxx AND data.level=2 - 时间范围:
WHERE log_time>1769884680000000000 - 正则匹配:
WHERE topic~/sensor/ - JSON 嵌套:
WHERE data.position.x>100 - 数组操作:
WHERE data.packages SIZE = 21 - 数组包含:
WHERE data.packages[*].name CONTAINS 'xxx' - 并行查询:通过线程池加速处理
CLI 示例:
# 基础查询
mmcap query file.mcap "SELECT topic,data.level WHERE topic=/logger_out AND data.level=2 LIMIT 100"
# 并行查询(4 线程)
mmcap query -j 4 file.mcap "SELECT topic,data.level WHERE topic=/logger_out LIMIT 100"
# 自动检测线程数
mmcap query -j 0 file.mcap "SELECT topic,data.level WHERE topic=/logger_out LIMIT 100"
C++ API 示例:
auto opts = parse_query("SELECT topic,data.level WHERE topic=/logger_out LIMIT 100");
// 串行查询(默认)
query_ex(reader, opts, [](const QueryRow& row) {
std::cout << row.at("topic") << ": " << row.at("data.level") << "\n";
return true;
});
// 并行查询(4 线程)- 性能提升 10-17x
query_ex(reader, opts, [](const QueryRow& row) {
std::cout << row.at("topic") << ": " << row.at("data.level") << "\n";
return true;
}, 4); // 第 4 个参数指定线程数
// 自动检测硬件线程数
query_ex(reader, opts, callback, 0);
并行查询参数说明:
threads = 1(默认): 单线程串行处理threads = 0: 自动检测硬件并发数threads > 1: 使用指定数量的线程并行处理(最大限制为 8)
详见 Query 使用指南。
示例代码
更多示例,请参见 docs/EXAMPLES.md。
性能
并行查询性能
在多 chunk 文件上使用不同线程数测试(598,503 条消息,1327 个 chunks):
| 线程数 | 时间 | 吞吐量 | 内存占用 | 加速比 |
|---|---|---|---|---|
| 1 (串行) | 4.94s | 117K msg/s | 103 MB | 1.0x |
| 2 | 0.37s | 1.96M msg/s | 106 MB | 13.4x |
| 4 | 0.34s | 2.14M msg/s | 108 MB | 14.5x |
| 8 | 0.34s | 2.14M msg/s | 111 MB | 14.5x |
关键发现:
- 速度:14.5x 加速(4.94s → 0.34s)
- 内存:仅增长 7.8%(103 → 111 MB)
- 线程上限:
max(8, CPU核心数),低配机器也能用多线程
CLI 使用示例:
# 串行查询(默认)
time mmcap query -j 1 file.mcap "SELECT topic,data WHERE topic=/logger_out"
# real 0m4.94s
# 4 线程并行
time mmcap query -j 4 file.mcap "SELECT topic,data WHERE topic=/logger_out"
# real 0m0.34s (14.5x 加速)
C++ API 使用:
// 串行(默认)
query_ex(reader, opts, callback); // 或 query_ex(reader, opts, callback, 1);
// 并行 4 线程
query_ex(reader, opts, callback, 4);
// 自动检测硬件线程数
query_ex(reader, opts, callback, 0);
消息索引优化影响
在包含 100 个通道但只有 2 个被查询的文件上测试:
| 指标 | 传统读取 | 优化读取 | 提升 |
|---|---|---|---|
| 扫描块数 | 100 | 100 | - |
| 解压块数 | 100 | 2 | 98% ↓ |
| 查询时间 | 10.2s | 0.3s | 34x ↑ |
自动分片开销
分片切换开销(刷新 + 关闭 + 打开):
| 操作 | 耗时 |
|---|---|
| 刷新待处理数据 | ~5ms |
| 关闭当前文件 | ~2ms |
| 打开新文件 | ~3ms |
| 总计 | ~10ms |
构建
依赖
- C++17 兼容编译器
- CMake 3.16+
- Catch2 3.x(用于测试,如未找到则自动获取)
构建步骤
mkdir build && cd build
cmake ..
cmake --build .
CMake 选项
| 选项 | 默认值 | 说明 |
|---|---|---|
LIBMMCAP_BUILD_TESTS |
ON | 构建测试 |
LIBMMCAP_BUILD_EXAMPLES |
OFF | 构建示例代码 |
LIBMMCAP_BUILD_SHARED |
OFF | 构建动态库(默认静态库) |
LIBMMCAP_BUILD_CLI |
ON | 构建 CLI 工具 |
LIBMMCAP_BUILD_DEB |
OFF | 生成 DEB 安装包 |
LIBMMCAP_USE_ZSTD |
ON | 启用 zstd 压缩 |
示例:
# 构建动态库 + CLI,不构建测试
cmake .. -DLIBMMCAP_BUILD_SHARED=ON -DLIBMMCAP_BUILD_TESTS=OFF
# 生成 DEB 包
cmake .. -DLIBMMCAP_BUILD_DEB=ON
make && cpack -G DEB
测试
cd build
ctest --output-on-failure
代码风格
- K&R 括号(同行)
- 4 空格缩进
- 120 字符行长度限制
- 函数/变量使用 snake_case
- 类型使用 PascalCase
- 私有成员使用尾部下划线
使用 clang-format 格式化:
clang-format -i src/*.cpp include/**/*.h
许可证
MIT