0%

OpenDDS QoS 策略完整指南

OpenDDS QoS 策略完整指南

概述

Quality of Service (QoS) 策略是 OpenDDS 中控制实体行为的关键机制。本指南基于 OpenDDS 官方文档和源代码分析,提供完整的 QoS 策略分类、适用性分析和配置指南。 ## QoS 策略完整分类表

1. 可靠性相关策略

策略名称 适用实体 默认值 可修改性 关联影响 功能描述
Reliability QoS Topic, DataWriter, DataReader DataWriter: RELIABLE
其他: BEST_EFFORT
不可变 影响关联 控制数据传输的可靠性(尽力而为 vs 可靠)
History QoS Topic, DataWriter, DataReader KEEP_LAST, depth=1 不可变 不影响关联 控制实例历史记录的保留策略
Resource Limits QoS Topic, DataWriter, DataReader 无限制 不可变 不影响关联 限制资源使用(最大样本数、实例数等)

2. 实时性相关策略

策略名称 适用实体 默认值 可修改性 关联影响 功能描述
Deadline QoS Topic, DataWriter, DataReader 无限期 可变 影响关联 设置数据发布/订阅的截止期限
Latency Budget QoS Topic, DataWriter, DataReader 零延迟 可变 影响关联 指定延迟预算,用于监控目的
Transport Priority QoS Topic, DataWriter 0 不可变 不影响关联 控制传输优先级(线程和 DiffServ)
Time Based Filter QoS DataReader 零间隔 可变 不影响关联 基于时间间隔过滤数据更新

3. 持久性相关策略

策略名称 适用实体 默认值 可修改性 关联影响 功能描述
Durability QoS Topic, DataWriter, DataReader VOLATILE 不可变 影响关联 控制数据的持久性级别
Durability Service QoS Topic, DataWriter 零清理延迟 不可变 不影响关联 配置持久性服务的缓存参数
Lifespan QoS Topic, DataWriter 无限期 可变 不影响关联 设置样本的过期时间

4. 发现和关联相关策略

策略名称 适用实体 默认值 可修改性 关联影响 功能描述
Liveliness QoS Topic, DataWriter, DataReader AUTOMATIC, 无限期 不可变 影响关联 控制活跃性检测机制
Partition QoS Publisher, Subscriber 空序列 可变 影响关联 创建逻辑分区进行隔离
Ownership QoS Topic, DataWriter, DataReader SHARED 不可变 影响关联 控制实例的所有权模式
Ownership Strength QoS DataWriter 0 可变 不影响关联 设置独占所有权时的强度值

5. 数据管理相关策略

策略名称 适用实体 默认值 可修改性 关联影响 功能描述
Destination Order QoS Topic, DataWriter, DataReader BY_RECEPTION_TIMESTAMP 不可变 影响关联 控制样本的排序方式
Presentation QoS Publisher, Subscriber INSTANCE, false, false 不可变 影响关联 控制数据表示的顺序和范围
Writer Data Lifecycle QoS DataWriter true 可变 不影响关联 控制写入器实例的生命周期
Reader Data Lifecycle QoS DataReader 无限期 可变 不影响关联 控制读取器资源的自动清理

6. 应用数据相关策略

策略名称 适用实体 默认值 可修改性 关联影响 功能描述
User Data QoS DomainParticipant, DataWriter, DataReader 空序列 可变 不影响关联 附加用户自定义数据到实体
Topic Data QoS Topic 空序列 可变 不影响关联 附加用户自定义数据到主题
Group Data QoS Publisher, Subscriber 空序列 可变 不影响关联 附加用户自定义数据到组

7. 配置相关策略

策略名称 适用实体 默认值 可修改性 关联影响 功能描述
Entity Factory QoS DomainParticipantFactory, DomainParticipant, Publisher, Subscriber true 可变 不影响关联 控制实体创建时是否自动启用
Property QoS DomainParticipant 空序列 可变 间接影响 包含键值对属性(主要用于安全)

8. 类型系统相关策略

策略名称 适用实体 默认值 可修改性 关联影响 功能描述
Data Representation QoS DataWriter, DataReader 传输相关 不可变 影响关联 定义数据编码表示方式
Type Consistency Enforcement QoS DataReader ALLOW_TYPE_COERCION 不可变 影响关联 控制类型一致性强制规则

实体适用性矩阵

QoS 策略 DomainParticipant Publisher Subscriber Topic DataWriter DataReader
User Data
Group Data
Topic Data
Partition
Durability
Durability Service
Deadline
Latency Budget
Liveliness
Reliability
Destination Order
History
Resource Limits
Transport Priority
Lifespan
Ownership
Ownership Strength
Presentation
Time Based Filter
Writer Data Lifecycle
Reader Data Lifecycle
Entity Factory
Property
Data Representation
Type Consistency Enforcement

典型应用场景和配置指南

1. 实时控制系统

特点:低延迟、高优先级、严格的时间要求

推荐配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// 数据写入器配置
DDS::DataWriterQos dw_qos;
pub->get_default_datawriter_qos(dw_qos);

// 低延迟配置
dw_qos.reliability.kind = DDS::BEST_EFFORT_RELIABILITY_QOS; // 优先考虑延迟
dw_qos.history.kind = DDS::KEEP_LAST_HISTORY_QOS;
dw_qos.history.depth = 1; // 只保留最新数据

// 实时性要求
dw_qos.deadline.period.sec = 0;
dw_qos.deadline.period.nanosec = 50000000; // 50ms 截止期限

dw_qos.transport_priority.value = 10; // 高传输优先级

// 数据读取器配置
DDS::DataReaderQos dr_qos;
sub->get_default_datareader_qos(dr_qos);

dr_qos.time_based_filter.minimum_separation.sec = 0;
dr_qos.time_based_filter.minimum_separation.nanosec = 20000000; // 20ms 过滤间隔

适用场景: - 机器人控制 - 自动驾驶系统 - 工业自动化 - 游戏服务器

2. 可靠数据传输系统

特点:数据完整性、不丢失、可恢复

推荐配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// 数据写入器配置
DDS::DataWriterQos dw_qos;
pub->get_default_datawriter_qos(dw_qos);

// 高可靠性配置
dw_qos.reliability.kind = DDS::RELIABLE_RELIABILITY_QOS;
dw_qos.reliability.max_blocking_time.sec = 10; // 10秒阻塞时间

// 历史记录配置
dw_qos.history.kind = DDS::KEEP_ALL_HISTORY_QOS; // 保留所有历史

// 资源限制
dw_qos.resource_limits.max_samples = 1000;
dw_qos.resource_limits.max_instances = 100;
dw_qos.resource_limits.max_samples_per_instance = 100;

// 持久性配置
dw_qos.durability.kind = DDS::TRANSIENT_LOCAL_DURABILITY_QOS; // 本地持久化

// 数据读取器配置
DDS::DataReaderQos dr_qos;
sub->get_default_datareader_qos(dr_qos);

dr_qos.reader_data_lifecycle.autopurge_nowriter_samples_delay.sec = 30; // 30秒后清理

适用场景: - 金融交易系统 - 医疗数据记录 - 日志收集系统 - 配置管理系统

3. 分布式缓存系统

特点:数据持久化、缓存管理、生命周期控制

推荐配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// 数据写入器配置
DDS::DataWriterQos dw_qos;
pub->get_default_datawriter_qos(dw_qos);

// 持久性配置
dw_qos.durability.kind = DDS::TRANSIENT_DURABILITY_QOS; // 进程级持久化

// 持久性服务配置
dw_qos.durability_service.history_kind = DDS::KEEP_LAST_HISTORY_QOS;
dw_qos.durability_service.history_depth = 100;
dw_qos.durability_service.max_samples = 10000;
dw_qos.durability_service.max_instances = 1000;
dw_qos.durability_service.max_samples_per_instance = 100;

// 生命周期配置
dw_qos.lifespan.duration.sec = 3600; // 1小时过期
dw_qos.lifespan.duration.nanosec = 0;

// 所有权配置(用于故障转移)
dw_qos.ownership.kind = DDS::EXCLUSIVE_OWNERSHIP_QOS;
dw_qos.ownership_strength.value = 1; // 所有权强度

适用场景: - 分布式数据库缓存 - 会话状态管理 - 配置服务器 - 服务发现注册表

4. 容错和高可用系统

特点:故障检测、自动恢复、负载均衡

推荐配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// 活跃性配置
DDS::DataWriterQos dw_qos;
pub->get_default_datawriter_qos(dw_qos);

dw_qos.liveliness.kind = DDS::MANUAL_BY_PARTICIPANT_LIVELINESS_QOS;
dw_qos.liveliness.lease_duration.sec = 5; // 5秒租约期限

// 所有权配置
dw_qos.ownership.kind = DDS::EXCLUSIVE_OWNERSHIP_QOS;
dw_qos.ownership_strength.value = 100; // 高所有权强度

// 截止期限监控
dw_qos.deadline.period.sec = 2;
dw_qos.deadline.period.nanosec = 0; // 2秒截止期限

// 数据读取器配置
DDS::DataReaderQos dr_qos;
sub->get_default_datareader_qos(dr_qos);

// 读取器数据生命周期
dr_qos.reader_data_lifecycle.autopurge_nowriter_samples_delay.sec = 10; // 10秒清理
dr_qos.reader_data_lifecycle.autopurge_disposed_samples_delay.sec = 5; // 5秒清理

适用场景: - 主备切换系统 - 负载均衡集群 - 监控告警系统 - 服务网格

5. 大数据量流处理系统

特点:高吞吐量、内存管理、数据过滤

推荐配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// 数据写入器配置
DDS::DataWriterQos dw_qos;
pub->get_default_datawriter_qos(dw_qos);

// 资源限制配置
dw_qos.resource_limits.max_samples = 10000;
dw_qos.resource_limits.max_instances = 1000;
dw_qos.resource_limits.max_samples_per_instance = 100;

// 历史记录配置
dw_qos.history.kind = DDS::KEEP_LAST_HISTORY_QOS;
dw_qos.history.depth = 10; // 保留最近10个样本

// 目标顺序配置
dw_qos.destination_order.kind = DDS::BY_RECEPTION_TIMESTAMP_DESTINATIONORDER_QOS;

// 数据读取器配置
DDS::DataReaderQos dr_qos;
sub->get_default_datareader_qos(dr_qos);

// 基于时间的过滤
dr_qos.time_based_filter.minimum_separation.sec = 0;
dr_qos.time_based_filter.minimum_separation.nanosec = 100000000; // 100ms 过滤间隔

// 分区配置(用于负载均衡)
DDS::PublisherQos pub_qos;
participant->get_default_publisher_qos(pub_qos);
pub_qos.partition.name.length(1);
pub_qos.partition.name[0] = CORBA::string_dup("high_throughput");

适用场景: - 物联网数据采集 - 日志分析系统 - 实时监控 - 市场数据流

详细配置示例

1. 完整的发布者配置示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
// 创建域参与者
DDS::DomainParticipantFactory_var dpf = TheParticipantFactoryWithArgs(argc, argv);
DDS::DomainParticipant_var participant =
dpf->create_participant(domainId, PARTICIPANT_QOS_DEFAULT, 0, 0);

// 配置发布者 QoS
DDS::PublisherQos pub_qos;
participant->get_default_publisher_qos(pub_qos);

// 分区配置
pub_qos.partition.name.length(2);
pub_qos.partition.name[0] = CORBA::string_dup("sensors");
pub_qos.partition.name[1] = CORBA::string_dup("control");

// 组数据配置
pub_qos.group_data.value.length(16);
for (int i = 0; i < 16; ++i) {
pub_qos.group_data.value[i] = i; // 示例数据
}

// 表示配置
pub_qos.presentation.access_scope = DDS::TOPIC_PRESENTATION_QOS;
pub_qos.presentation.coherent_access = true;
pub_qos.presentation.ordered_access = true;

// 创建发布者
DDS::Publisher_var publisher = participant->create_publisher(pub_qos, 0, 0);

// 配置数据写入器 QoS
DDS::DataWriterQos dw_qos;
publisher->get_default_datawriter_qos(dw_qos);

// 可靠性配置
dw_qos.reliability.kind = DDS::RELIABLE_RELIABILITY_QOS;
dw_qos.reliability.max_blocking_time.sec = 5;

// 历史记录配置
dw_qos.history.kind = DDS::KEEP_ALL_HISTORY_QOS;

// 资源限制
dw_qos.resource_limits.max_samples = 1000;
dw_qos.resource_limits.max_instances = 100;
dw_qos.resource_limits.max_samples_per_instance = 10;

// 活跃性配置
dw_qos.liveliness.kind = DDS::AUTOMATIC_LIVELINESS_QOS;
dw_qos.liveliness.lease_duration.sec = 10;

// 截止期限
dw_qos.deadline.period.sec = 1;

// 持久性
dw_qos.durability.kind = DDS::TRANSIENT_LOCAL_DURABILITY_QOS;

// 目标顺序
dw_qos.destination_order.kind = DDS::BY_SOURCE_TIMESTAMP_DESTINATIONORDER_QOS;

// 用户数据
dw_qos.user_data.value.length(32);
for (int i = 0; i < 32; ++i) {
dw_qos.user_data.value[i] = i % 256;
}

// 创建数据写入器
DDS::DataWriter_var writer = publisher->create_datawriter(topic, dw_qos, 0, 0);

2. 完整的订阅者配置示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
// 创建订阅者
DDS::SubscriberQos sub_qos;
participant->get_default_subscriber_qos(sub_qos);

// 分区配置(必须与发布者匹配才能接收数据)
sub_qos.partition.name.length(1);
sub_qos.partition.name[0] = CORBA::string_dup("sensors");

DDS::Subscriber_var subscriber = participant->create_subscriber(sub_qos, 0, 0);

// 配置数据读取器 QoS
DDS::DataReaderQos dr_qos;
subscriber->get_default_datareader_qos(dr_qos);

// 可靠性配置(必须与写入器兼容)
dr_qos.reliability.kind = DDS::RELIABLE_RELIABILITY_QOS;

// 历史记录配置
dr_qos.history.kind = DDS::KEEP_ALL_HISTORY_QOS;

// 资源限制
dr_qos.resource_limits.max_samples = 1000;
dr_qos.resource_limits.max_instances = 100;
dr_qos.resource_limits.max_samples_per_instance = 10;

// 活跃性配置
dr_qos.liveliness.kind = DDS::AUTOMATIC_LIVELINESS_QOS;
dr_qos.liveliness.lease_duration.sec = 10;

// 截止期限(必须大于或等于写入器的截止期限)
dr_qos.deadline.period.sec = 2;

// 持久性
dr_qos.durability.kind = DDS::TRANSIENT_LOCAL_DURABILITY_QOS;

// 目标顺序
dr_qos.destination_order.kind = DDS::BY_SOURCE_TIMESTAMP_DESTINATIONORDER_QOS;

// 基于时间的过滤
dr_qos.time_based_filter.minimum_separation.sec = 0;
dr_qos.time_based_filter.minimum_separation.nanosec = 100000000; // 100ms

// 读取器数据生命周期
dr_qos.reader_data_lifecycle.autopurge_nowriter_samples_delay.sec = 30;
dr_qos.reader_data_lifecycle.autopurge_disposed_samples_delay.sec = 15;

// 用户数据
dr_qos.user_data.value.length(16);
for (int i = 0; i < 16; ++i) {
dr_qos.user_data.value[i] = (i + 1) % 256;
}

// 创建监听器
class DataReaderListener : public DDS::DataReaderListener {
public:
virtual void on_data_available(DDS::DataReader_ptr reader) {
// 处理数据
}

virtual void on_requested_deadline_missed(
DDS::DataReader_ptr reader,
const DDS::RequestedDeadlineMissedStatus& status) {
std::cout << "Deadline missed: " << status.total_count << std::endl;
}

virtual void on_liveliness_changed(
DDS::DataReader_ptr reader,
const DDS::LivelinessChangedStatus& status) {
std::cout << "Liveliness changed: "
<< status.alive_count << " alive, "
<< status.not_alive_count << " not alive" << std::endl;
}
};

// 创建带监听器的数据读取器
DDS::DataReaderListener_var listener(new DataReaderListener());
DDS::StatusMask mask = DDS::DATA_AVAILABLE_STATUS |
DDS::REQUESTED_DEADLINE_MISSED_STATUS |
DDS::LIVELINESS_CHANGED_STATUS;

DDS::DataReader_var reader = subscriber->create_datareader(
topic, dr_qos, listener, mask);

OpenDDS 特有扩展和限制

1. 传输优先级策略的限制

OpenDDS 特有行为: - 仅对 TCP 和 UDP 传输实现 - 不可变(与 DDS 规范不同) - 映射到线程优先级和 DiffServ 代码点

配置示例

1
dw_qos.transport_priority.value = 10;  // 0-63,值越高优先级越高

2. 数据表示策略的默认值

OpenDDS 特有默认值: - RTPS/UDP 传输:DataWriter 默认 XCDR2,DataReader 默认接受 XCDR 和 XCDR2 - 其他传输:默认使用 UNALIGNED_CDR

配置示例

1
2
3
4
// 显式配置数据表示
dw_qos.representation.value.length(2);
dw_qos.representation.value[0] = DDS::XCDR2_DATA_REPRESENTATION;
dw_qos.representation.value[1] = DDS::XCDR_DATA_REPRESENTATION;

3. 类型一致性强制策略的支持

OpenDDS 限制: - 仅支持 ignore_member_names 成员 - 仅在使用 RTPS 发现时支持 - 其他成员应保持默认值

配置示例

1
dr_qos.type_consistency.ignore_member_names = true;  // 仅按成员 ID 匹配

4. 安全相关的属性策略

OpenDDS 扩展: - 主要用于 DDS 安全配置 - 可以通过属性配置安全凭证和策略

配置示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
DDS::DomainParticipantQos dp_qos;
dpf->get_default_participant_qos(dp_qos);

dp_qos.property.value.length(3);
dp_qos.property.value[0].name = "dds.sec.auth.identity_ca";
dp_qos.property.value[0].value = "file:identity_ca.pem";
dp_qos.property.value[0].propagate = false;

dp_qos.property.value[1].name = "dds.sec.auth.private_key";
dp_qos.property.value[1].value = "file:private_key.pem";
dp_qos.property.value[1].propagate = false;

dp_qos.property.value[2].name = "dds.sec.auth.identity_certificate";
dp_qos.property.value[2].value = "file:identity_certificate.pem";
dp_qos.property.value[2].propagate = false;

最佳实践总结

1. 性能优化建议

内存管理: - 根据数据量设置合理的 Resource Limits - 使用 KEEP_LAST 历史策略并设置适当的深度 - 配置 Reader Data Lifecycle 自动清理过期数据

网络优化: - 使用 Partition 进行逻辑隔离,减少不必要的网络流量 - 配置 Transport Priority 对关键数据进行优先级处理 - 考虑使用共享内存传输进行本地通信

延迟优化: - 实时系统使用 BEST_EFFORT 可靠性 - 配置适当的 Deadline 进行时间监控 - 使用 Time Based Filter 减少数据处理频率

2. 可靠性设计建议

数据完整性: - 关键系统使用 RELIABLE 可靠性 - 配置适当的 max_blocking_time 避免无限阻塞 - 使用 KEEP_ALL 历史策略确保数据不丢失

容错设计: - 配置 Liveliness 检测进行故障发现 - 使用 EXCLUSIVE_OWNERSHIPOwnership Strength 实现主备切换 - 设置适当的 Durability 级别确保数据可恢复

监控和告警: - 实现监听器处理 on_requested_deadline_missed - 监控 on_liveliness_changed 进行健康检查 - 处理 on_sample_rejected 了解数据丢失情况

3. 配置管理建议

代码组织: - 创建 QoS 配置工厂函数,便于重用 - 使用配置文件管理不同环境的 QoS 设置 - 实现 QoS 配置验证,确保兼容性

测试策略: - 测试不同 QoS 配置下的性能表现 - 验证 QoS 兼容性规则 - 测试故障恢复场景

文档和维护: - 记录 QoS 配置决策和原因 - 定期审查和优化 QoS 设置 - 建立 QoS 配置变更管理流程

QoS 兼容性规则

1. 关联建立的基本规则

数据写入器(DataWriter)和数据读取器(DataReader)要建立关联,必须满足以下兼容性规则:

QoS 策略 兼容性规则 说明
Liveliness Writer.kind ≥ Reader.kind
Writer.lease_duration ≤ Reader.lease_duration
写入器必须提供不低于读取器要求的活跃性级别
Reliability Writer.kind ≥ Reader.kind 可靠读取器只能关联可靠写入器
Durability Writer.kind ≥ Reader.kind 写入器必须提供不低于读取器要求的持久性级别
Deadline Reader.period ≥ Writer.period 读取器截止期限必须大于等于写入器截止期限
Latency Budget Writer.duration ≥ Reader.duration 写入器延迟预算必须大于等于读取器延迟预算
Destination Order Writer.kind ≥ Reader.kind 写入器排序方式必须不低于读取器要求
Ownership Writer.kind == Reader.kind 所有权模式必须相同
Partition 至少有一个分区名称匹配 使用通配符进行匹配
Presentation Writer.access_scope ≥ Reader.access_scope
Writer.coherent_access ≤ Reader.coherent_access
Writer.ordered_access ≤ Reader.ordered_access
复杂的多层兼容性规则

2. 兼容性检查工具

创建 QoS 兼容性检查函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
bool check_qos_compatibility(
const DDS::DataWriterQos& writer_qos,
const DDS::DataReaderQos& reader_qos) {

// 检查可靠性
if (reader_qos.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS &&
writer_qos.reliability.kind != DDS::RELIABLE_RELIABILITY_QOS) {
return false;
}

// 检查持久性
if (reader_qos.durability.kind > writer_qos.durability.kind) {
return false;
}

// 检查活跃性
if (reader_qos.liveliness.kind > writer_qos.liveliness.kind) {
return false;
}

if (writer_qos.liveliness.lease_duration.sec >
reader_qos.liveliness.lease_duration.sec) {
return false;
}

// 检查截止期限
if (reader_qos.deadline.period.sec < writer_qos.deadline.period.sec) {
return false;
}

// 检查所有权
if (reader_qos.ownership.kind != writer_qos.ownership.kind) {
return false;
}

return true;
}

实用配置模板

1. 实时传感器数据模板

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// 实时传感器数据配置
DDS::DataWriterQos create_sensor_writer_qos(DDS::Publisher_ptr publisher) {
DDS::DataWriterQos qos;
publisher->get_default_datawriter_qos(qos);

// 低延迟配置
qos.reliability.kind = DDS::BEST_EFFORT_RELIABILITY_QOS;
qos.history.kind = DDS::KEEP_LAST_HISTORY_QOS;
qos.history.depth = 1;

// 实时性要求
qos.deadline.period.sec = 0;
qos.deadline.period.nanosec = 100000000; // 100ms

qos.transport_priority.value = 5;

// 资源限制
qos.resource_limits.max_samples = 100;
qos.resource_limits.max_instances = 50;
qos.resource_limits.max_samples_per_instance = 2;

return qos;
}

DDS::DataReaderQos create_sensor_reader_qos(DDS::Subscriber_ptr subscriber) {
DDS::DataReaderQos qos;
subscriber->get_default_datareader_qos(qos);

// 兼容性配置
qos.reliability.kind = DDS::BEST_EFFORT_RELIABILITY_QOS;
qos.deadline.period.sec = 0;
qos.deadline.period.nanosec = 200000000; // 200ms(大于写入器)

// 数据过滤
qos.time_based_filter.minimum_separation.sec = 0;
qos.time_based_filter.minimum_separation.nanosec = 50000000; // 50ms

return qos;
}

2. 可靠命令控制模板

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// 可靠命令控制配置
DDS::DataWriterQos create_command_writer_qos(DDS::Publisher_ptr publisher) {
DDS::DataWriterQos qos;
publisher->get_default_datawriter_qos(qos);

// 高可靠性配置
qos.reliability.kind = DDS::RELIABLE_RELIABILITY_QOS;
qos.reliability.max_blocking_time.sec = 5;

qos.history.kind = DDS::KEEP_ALL_HISTORY_QOS;

// 持久性配置
qos.durability.kind = DDS::TRANSIENT_LOCAL_DURABILITY_QOS;

// 资源限制
qos.resource_limits.max_samples = 1000;
qos.resource_limits.max_instances = 100;
qos.resource_limits.max_samples_per_instance = 10;

// 所有权配置(用于主备切换)
qos.ownership.kind = DDS::EXCLUSIVE_OWNERSHIP_QOS;
qos.ownership_strength.value = 100;

return qos;
}

DDS::DataReaderQos create_command_reader_qos(DDS::Subscriber_ptr subscriber) {
DDS::DataReaderQos qos;
subscriber->get_default_datareader_qos(qos);

// 兼容性配置
qos.reliability.kind = DDS::RELIABLE_RELIABILITY_QOS;
qos.durability.kind = DDS::TRANSIENT_LOCAL_DURABILITY_QOS;
qos.ownership.kind = DDS::EXCLUSIVE_OWNERSHIP_QOS;

// 数据生命周期管理
qos.reader_data_lifecycle.autopurge_nowriter_samples_delay.sec = 30;
qos.reader_data_lifecycle.autopurge_disposed_samples_delay.sec = 15;

return qos;
}

3. 配置管理工厂类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
class QosFactory {
public:
enum Profile {
REAL_TIME_SENSOR,
RELIABLE_COMMAND,
DISTRIBUTED_CACHE,
HIGH_AVAILABILITY,
BIG_DATA_STREAM
};

static DDS::DataWriterQos create_writer_qos(
DDS::Publisher_ptr publisher,
Profile profile) {

DDS::DataWriterQos qos;
publisher->get_default_datawriter_qos(qos);

switch (profile) {
case REAL_TIME_SENSOR:
qos.reliability.kind = DDS::BEST_EFFORT_RELIABILITY_QOS;
qos.history.kind = DDS::KEEP_LAST_HISTORY_QOS;
qos.history.depth = 1;
qos.deadline.period.nanosec = 100000000;
break;

case RELIABLE_COMMAND:
qos.reliability.kind = DDS::RELIABLE_RELIABILITY_QOS;
qos.reliability.max_blocking_time.sec = 5;
qos.history.kind = DDS::KEEP_ALL_HISTORY_QOS;
qos.durability.kind = DDS::TRANSIENT_LOCAL_DURABILITY_QOS;
break;

case DISTRIBUTED_CACHE:
qos.durability.kind = DDS::TRANSIENT_DURABILITY_QOS;
qos.lifespan.duration.sec = 3600;
qos.durability_service.history_depth = 100;
break;

case HIGH_AVAILABILITY:
qos.liveliness.kind = DDS::MANUAL_BY_PARTICIPANT_LIVELINESS_QOS;
qos.liveliness.lease_duration.sec = 5;
qos.ownership.kind = DDS::EXCLUSIVE_OWNERSHIP_QOS;
qos.ownership_strength.value = 100;
break;

case BIG_DATA_STREAM:
qos.resource_limits.max_samples = 10000;
qos.resource_limits.max_instances = 1000;
qos.resource_limits.max_samples_per_instance = 100;
qos.history.kind = DDS::KEEP_LAST_HISTORY_QOS;
qos.history.depth = 10;
break;
}

return qos;
}

static DDS::DataReaderQos create_reader_qos(
DDS::Subscriber_ptr subscriber,
Profile profile) {

DDS::DataReaderQos qos;
subscriber->get_default_datareader_qos(qos);

switch (profile) {
case REAL_TIME_SENSOR:
qos.reliability.kind = DDS::BEST_EFFORT_RELIABILITY_QOS;
qos.time_based_filter.minimum_separation.nanosec = 50000000;
break;

case RELIABLE_COMMAND:
qos.reliability.kind = DDS::RELIABLE_RELIABILITY_QOS;
qos.reader_data_lifecycle.autopurge_nowriter_samples_delay.sec = 30;
break;

case DISTRIBUTED_CACHE:
// 使用默认配置
break;

case HIGH_AVAILABILITY:
qos.liveliness.kind = DDS::MANUAL_BY_PARTICIPANT_LIVELINESS_QOS;
qos.ownership.kind = DDS::EXCLUSIVE_OWNERSHIP_QOS;
break;

case BIG_DATA_STREAM:
qos.time_based_filter.minimum_separation.nanosec = 100000000;
break;
}

return qos;
}
};

故障排除指南

1. 常见 QoS 问题

问题1:数据写入器和读取器无法关联

可能原因: - QoS 不兼容(最常见) - 分区不匹配 - 主题类型不匹配

解决方案

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 检查 QoS 兼容性
std::cout << "Writer Reliability: " << writer_qos.reliability.kind << std::endl;
std::cout << "Reader Reliability: " << reader_qos.reliability.kind << std::endl;
std::cout << "Writer Durability: " << writer_qos.durability.kind << std::endl;
std::cout << "Reader Durability: " << reader_qos.durability.kind << std::endl;

// 实现监听器检测不兼容的 QoS
class IncompatibleQosListener : public DDS::DataReaderListener {
virtual void on_requested_incompatible_qos(
DDS::DataReader_ptr reader,
const DDS::RequestedIncompatibleQosStatus& status) {

std::cout << "Incompatible QoS detected!" << std::endl;
for (CORBA::ULong i = 0; i < status.policies.length(); ++i) {
std::cout << "Policy ID: " << status.policies[i].policy_id
<< ", Count: " << status.policies[i].count << std::endl;
}
}
};

问题2:数据丢失或延迟过高

可能原因: - 资源限制过小 - 网络拥堵 - QoS 配置不当

解决方案

1
2
3
4
5
6
7
8
9
// 调整资源限制
dw_qos.resource_limits.max_samples = 1000; // 增加缓存大小
dw_qos.resource_limits.max_samples_per_instance = 100;

// 调整可靠性设置
dw_qos.reliability.max_blocking_time.sec = 10; // 增加阻塞时间

// 使用传输优先级
dw_qos.transport_priority.value = 10;

问题3:内存使用持续增长

可能原因: - 未配置 Reader Data Lifecycle - Resource Limits 设置过大 - 数据未及时清理

解决方案

1
2
3
4
5
6
7
8
// 配置自动清理
dr_qos.reader_data_lifecycle.autopurge_nowriter_samples_delay.sec = 30;
dr_qos.reader_data_lifecycle.autopurge_disposed_samples_delay.sec = 15;

// 设置合理的资源限制
dr_qos.resource_limits.max_samples = 1000;
dr_qos.resource_limits.max_instances = 100;
dr_qos.resource_limits.max_samples_per_instance = 10;

2. 调试和监控

启用调试输出

1
2
3
4
5
6
7
8
9
// 设置 OpenDDS 调试级别
OpenDDS::DCPS::set_DCPS_debug_level(5);

// 启用传输调试
TheTransportRegistry->set_log_level(3);

// 环境变量方式
// export DCPSDebugLevel=5
// export ACE_LOG_MSG=/tmp/opendds.log

实现监控监听器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
class MonitoringListener : public DDS::DataReaderListener {
virtual void on_sample_rejected(
DDS::DataReader_ptr reader,
const DDS::SampleRejectedStatus& status) {

std::cout << "Samples rejected: " << status.total_count
<< ", Reason: " << status.last_reason << std::endl;
}

virtual void on_sample_lost(
DDS::DataReader_ptr reader,
const DDS::SampleLostStatus& status) {

std::cout << "Samples lost: " << status.total_count << std::endl;
}

virtual void on_subscription_matched(
DDS::DataReader_ptr reader,
const DDS::SubscriptionMatchedStatus& status) {

std::cout << "Subscription matched: "
<< status.current_count << " current, "
<< status.total_count << " total" << std::endl;
}
};

总结

OpenDDS 的 QoS 策略提供了强大的机制来控制分布式系统的行为。通过合理配置 QoS,可以实现:

  1. 性能优化:根据应用需求平衡延迟和可靠性
  2. 资源管理:控制内存和网络资源的使用
  3. 容错设计:实现故障检测和自动恢复
  4. 数据管理:控制数据的生命周期和持久性
  5. 系统隔离:使用分区进行逻辑隔离

关键建议: - 从简单配置开始,根据需要逐步调整 - 测试不同 QoS 配置下的系统行为 - 实现监控和告警机制 - 记录配置决策和变更历史 - 定期审查和优化 QoS 设置

通过本指南,您应该能够: 1. 理解 OpenDDS 支持的所有 QoS 策略 2. 为不同应用场景选择合适的 QoS 配置 3. 避免常见的 QoS 兼容性问题 4. 实现高效的分布式系统设计

OpenDDS 的 QoS 系统虽然复杂,但提供了极大的灵活性。通过深入理解和合理应用,可以构建出高性能、可靠、可维护的分布式系统。