0%

OpenDDS 学习使用指南

OpenDDS 学习使用指南

基于 OpenDDS 项目源代码和文档的全面学习路径

OpenDDS 概述

什么是 OpenDDS?

OpenDDS 是一个开源的 C++ 实现,实现了对象管理组(OMG)的”实时系统数据分发服务(DDS)“规范。它是一个基于发布-订阅和分布式缓存模型的分布式应用程序开发框架。

关键特性

  • 跨平台:基于 ACE 抽象层,提供平台可移植性
  • 多语言支持:C++ 实现,提供 Java 绑定
  • 标准兼容:实现 DDS 1.4 规范和相关标准
  • 安全支持:支持 DDS Security 规范
  • 类型系统:支持 XTypes 规范
  • 多种传输协议:TCP/IP、UDP/IP、IP 多播、RTPS over UDP/IP、共享内存

项目结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
OpenDDS/
├── dds/ # 核心 DDS 实现
│ ├── DCPS/ # 数据分发服务核心
│ ├── InfoRepo/ # 信息仓库实现
│ ├── security/ # 安全功能
│ └── RTPS/ # RTPS 协议实现
├── docs/ # 文档
│ ├── devguide/ # 开发指南
│ ├── design/ # 设计文档
│ └── history/ # 历史文档
├── examples/ # 示例代码
│ └── DCPS/ # DCPS 示例
├── tests/ # 测试套件
├── tools/ # 工具
│ ├── monitor/ # 监控工具
│ ├── dissector/ # Wireshark 解析器
│ └── modeling/ # 建模工具
└── performance-tests/ # 性能测试

核心架构和概念

DDS 基本概念

1. 域(Domain)

  • 分布式系统中的基本分区单位
  • 每个实体属于一个域,只能与同一域中的其他实体交互
  • 通过整数标识符识别

2. 域参与者(Domain Participant)

  • 应用程序在特定域中交互的入口点
  • 工厂对象,用于创建主题、发布者、订阅者等

3. 主题(Topic)

  • 发布者和订阅者之间交互的基本手段
  • 每个主题在域内有唯一名称
  • 支持多对多通信

4. 数据类型(Data Type)

  • 使用 OMG IDL(接口描述语言)定义
  • 支持静态类型和动态类型
  • 可以定义键(Key)字段

5. 发布-订阅模型

  • 发布者(Publisher):创建数据写入器
  • 数据写入器(DataWriter):发布数据样本
  • 订阅者(Subscriber):创建数据读取器
  • 数据读取器(DataReader):接收数据样本

OpenDDS 特定概念

1. 信息仓库(InfoRepo)

  • 集中式发现服务
  • 管理域参与者的注册和发现
  • 支持持久化配置

2. RTPS 发现

  • 基于 RTPS 协议的分布式发现
  • 无需中央服务器
  • 支持互操作性

3. 传输协议

  • TCP/IP:可靠的有连接传输
  • UDP/IP:无连接传输
  • IP 多播:一对多传输
  • RTPS over UDP:标准互操作性协议
  • 共享内存:高性能本地通信

安装和构建

系统要求

必需依赖

  1. ACE/TAO:抽象层和 IDL 编译
  2. Perl:配置脚本和构建工具
  3. C++ 编译器:支持 C++11 或更高版本

可选依赖

  • Xerces-C++:XML 解析
  • OpenSSL:安全功能
  • Qt5:图形界面工具

构建方法

方法1:使用 configure 脚本(传统方法)

1
2
3
4
5
6
7
8
9
# 克隆仓库
git clone https://github.com/OpenDDS/OpenDDS.git
cd OpenDDS

# 运行配置脚本
./configure

# 构建
make

方法2:使用 CMake(推荐)

1
2
3
4
5
6
7
8
# 创建构建目录
mkdir build && cd build

# 配置
cmake .. -DCMAKE_BUILD_TYPE=Release

# 构建
cmake --build .

方法3:使用 Docker(快速开始)

1
2
3
4
5
# 拉取 Docker 镜像
docker pull objectcomputing/opendds

# 运行容器
docker run -it objectcomputing/opendds

平台支持

操作系统

  • Linux(Red Hat, CentOS, Fedora, Debian, Ubuntu, openSUSE)
  • Windows(7, 10, Server 2012+)
  • macOS
  • 嵌入式系统(Android, iOS, VxWorks, LynxOS)

编译器

  • GCC 4.8.5+
  • Clang 3.4+
  • Microsoft Visual C++ 2010+

快速开始

HelloWorld 示例

1. 定义数据类型(IDL)

1
2
3
4
5
6
7
8
9
10
11
// HelloWorld.idl
module HelloWorld {
const long HELLO_WORLD_DOMAIN = 111;

@topic
struct Message {
string value;
};

const string MESSAGE_TOPIC_NAME = "Message";
};

2. 生成类型支持代码

1
2
# 使用 opendds_idl 工具
opendds_idl HelloWorld.idl

3. 发布者实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
// publisher.cpp
#include "HelloWorldTypeSupportImpl.h"
#include <dds/DCPS/Service_Participant.h>

int main(int argc, char* argv[]) {
// 初始化 DDS
DDS::DomainParticipantFactory_var dpf = TheParticipantFactoryWithArgs(argc, argv);

// 创建域参与者
DDS::DomainParticipant_var participant =
dpf->create_participant(HelloWorld::HELLO_WORLD_DOMAIN,
PARTICIPANT_QOS_DEFAULT,
0, 0);

// 注册类型
HelloWorld::MessageTypeSupport_var ts = new HelloWorld::MessageTypeSupportImpl();
ts->register_type(participant, "");

// 创建主题
DDS::Topic_var topic = participant->create_topic(
HelloWorld::MESSAGE_TOPIC_NAME,
ts->get_type_name(),
TOPIC_QOS_DEFAULT, 0, 0);

// 创建发布者和数据写入器
DDS::Publisher_var publisher = participant->create_publisher(
PUBLISHER_QOS_DEFAULT, 0, 0);

DDS::DataWriter_var writer = publisher->create_datawriter(
topic, DATAWRITER_QOS_DEFAULT, 0, 0);

HelloWorld::MessageDataWriter_var message_writer =
HelloWorld::MessageDataWriter::_narrow(writer);

// 发布消息
HelloWorld::Message message;
message.value = "Hello, OpenDDS!";

message_writer->write(message, DDS::HANDLE_NIL);

// 清理
participant->delete_contained_entities();
dpf->delete_participant(participant);

return 0;
}

4. 订阅者实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
// subscriber.cpp
#include "HelloWorldTypeSupportImpl.h"
#include <dds/DCPS/Service_Participant.h>
#include <iostream>

class MessageListener : public DDS::DataReaderListener {
public:
virtual void on_data_available(DDS::DataReader_ptr reader) {
HelloWorld::MessageDataReader_var message_reader =
HelloWorld::MessageDataReader::_narrow(reader);

HelloWorld::Message message;
DDS::SampleInfo info;

while (message_reader->take_next_sample(message, info) == DDS::RETCODE_OK) {
if (info.valid_data) {
std::cout << "Received: " << message.value << std::endl;
}
}
}

// 其他回调方法...
};

int main(int argc, char* argv[]) {
// 初始化(与发布者类似)
DDS::DomainParticipantFactory_var dpf = TheParticipantFactoryWithArgs(argc, argv);
DDS::DomainParticipant_var participant =
dpf->create_participant(HelloWorld::HELLO_WORLD_DOMAIN,
PARTICIPANT_QOS_DEFAULT, 0, 0);

// 注册类型和创建主题(与发布者类似)

// 创建订阅者和数据读取器
DDS::Subscriber_var subscriber = participant->create_subscriber(
SUBSCRIBER_QOS_DEFAULT, 0, 0);

DDS::DataReader_var reader = subscriber->create_datareader(
topic, DATAREADER_QOS_DEFAULT, new MessageListener(), 0);

// 等待消息
std::cout << "Waiting for messages..." << std::endl;
ACE_OS::sleep(10);

// 清理
participant->delete_contained_entities();
dpf->delete_participant(participant);

return 0;
}

5. 构建和运行

1
2
3
4
5
6
7
8
9
10
11
12
# 构建
opendds_idl HelloWorld.idl
g++ -I. -I$DDS_ROOT -I$ACE_ROOT publisher.cpp *.cpp -lOpenDDS_Dcps -lACE

# 运行信息仓库(如果需要)
$DDS_ROOT/bin/DCPSInfoRepo -ORBEndpoint iiop://localhost:12345

# 运行订阅者
./subscriber -DCPSInfoRepo corbaloc::localhost:12345/DCPSInfoRepo

# 运行发布者
./publisher -DCPSInfoRepo corbaloc::localhost:12345/DCPSInfoRepo

核心编程模型

1. 应用程序生命周期

初始化阶段

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 1. 初始化服务参与者
DDS::DomainParticipantFactory_var dpf = TheParticipantFactoryWithArgs(argc, argv);

// 2. 创建域参与者
DDS::DomainParticipant_var participant =
dpf->create_participant(domainId, PARTICIPANT_QOS_DEFAULT, 0, 0);

// 3. 注册类型
TypeSupport_var ts = new TypeSupportImpl();
ts->register_type(participant, "");

// 4. 创建主题
DDS::Topic_var topic = participant->create_topic(
topicName, typeName, TOPIC_QOS_DEFAULT, 0, 0);

发布者配置

1
2
3
4
5
6
7
8
9
10
// 1. 创建发布者
DDS::Publisher_var publisher = participant->create_publisher(
PUBLISHER_QOS_DEFAULT, 0, 0);

// 2. 创建数据写入器
DDS::DataWriter_var writer = publisher->create_datawriter(
topic, DATAWRITER_QOS_DEFAULT, 0, 0);

// 3. 窄化到具体类型
MessageDataWriter_var message_writer = MessageDataWriter::_narrow(writer);

订阅者配置

1
2
3
4
5
6
7
8
9
10
// 1. 创建订阅者
DDS::Subscriber_var subscriber = participant->create_subscriber(
SUBSCRIBER_QOS_DEFAULT, 0, 0);

// 2. 创建数据读取器
DDS::DataReader_var reader = subscriber->create_datareader(
topic, DATAREADER_QOS_DEFAULT, listener, mask);

// 3. 窄化到具体类型
MessageDataReader_var message_reader = MessageDataReader::_narrow(reader);

清理阶段

1
2
3
4
5
6
7
8
// 1. 删除包含的实体
participant->delete_contained_entities();

// 2. 删除参与者
dpf->delete_participant(participant);

// 3. 关闭服务
TheServiceParticipant->shutdown();

2. 数据发布和订阅

发布数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 创建数据样本
Message message;
message.id = 1;
message.text = "Sample message";

// 发布数据
DDS::ReturnCode_t ret = message_writer->write(message, DDS::HANDLE_NIL);

// 带实例句柄的发布
DDS::InstanceHandle_t handle = message_writer->register_instance(message);
ret = message_writer->write(message, handle);

// 注销实例
message_writer->unregister_instance(message, handle);

订阅数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// 轮询方式读取
MessageSeq messages;
DDS::SampleInfoSeq infos;
DDS::ReturnCode_t ret = message_reader->read(messages, infos, 10,
DDS::ANY_SAMPLE_STATE,
DDS::ANY_VIEW_STATE,
DDS::ANY_INSTANCE_STATE);

// 逐个读取
Message message;
DDS::SampleInfo info;
while (message_reader->take_next_sample(message, info) == DDS::RETCODE_OK) {
if (info.valid_data) {
// 处理有效数据
}
}

// 条件读取
DDS::ReadCondition_var condition = message_reader->create_readcondition(
DDS::ANY_SAMPLE_STATE, DDS::ANY_VIEW_STATE, DDS::ANY_INSTANCE_STATE);

3. 监听器模式

创建监听器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
class CustomListener : public DDS::DataReaderListener {
public:
virtual void on_data_available(DDS::DataReader_ptr reader) {
// 处理数据可用
}

virtual void on_requested_deadline_missed(
DDS::DataReader_ptr reader,
const DDS::RequestedDeadlineMissedStatus& status) {
// 处理截止期限错过
}

virtual void on_requested_incompatible_qos(
DDS::DataReader_ptr reader,
const DDS::RequestedIncompatibleQosStatus& status) {
// 处理不兼容的 QoS
}

virtual void on_sample_rejected(
DDS::DataReader_ptr reader,
const DDS::SampleRejectedStatus& status) {
// 处理样本被拒绝
}

virtual void on_liveliness_changed(
DDS::DataReader_ptr reader,
const DDS::LivelinessChangedStatus& status) {
// 处理活跃性变化
}

virtual void on_subscription_matched(
DDS::DataReader_ptr reader,
const DDS::SubscriptionMatchedStatus& status) {
// 处理订阅匹配
}

virtual void on_sample_lost(
DDS::DataReader_ptr reader,
const DDS::SampleLostStatus& status) {
// 处理样本丢失
}
};

使用监听器

1
2
3
4
5
6
7
8
9
10
11
// 创建监听器实例
DDS::DataReaderListener_var listener(new CustomListener());

// 设置监听器掩码
DDS::StatusMask mask = DDS::DATA_AVAILABLE_STATUS |
DDS::SUBSCRIPTION_MATCHED_STATUS |
DDS::SAMPLE_REJECTED_STATUS;

// 创建带监听器的数据读取器
DDS::DataReader_var reader = subscriber->create_datareader(
topic, DATAREADER_QOS_DEFAULT, listener, mask);

4. 等待集模式

创建等待集

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// 创建等待集
DDS::WaitSet_var wait_set = new DDS::WaitSet();

// 创建读取条件
DDS::ReadCondition_var read_condition = message_reader->create_readcondition(
DDS::NOT_READ_SAMPLE_STATE,
DDS::NEW_VIEW_STATE,
DDS::ALIVE_INSTANCE_STATE);

// 附加条件到等待集
wait_set->attach_condition(read_condition);

// 等待条件
DDS::ConditionSeq active_conditions;
DDS::Duration_t timeout = {10, 0}; // 10秒超时

DDS::ReturnCode_t ret = wait_set->wait(active_conditions, timeout);

// 处理激活的条件
for (CORBA::ULong i = 0; i < active_conditions.length(); ++i) {
if (active_conditions[i] == read_condition) {
// 读取数据
MessageSeq messages;
DDS::SampleInfoSeq infos;
message_reader->read_w_condition(messages, infos,
DDS::LENGTH_UNLIMITED,
read_condition);
}
}

// 清理
wait_set->detach_condition(read_condition);
message_reader->delete_readcondition(read_condition);

高级特性

1. QoS 策略配置

常用 QoS 策略

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// 配置数据写入器 QoS
DDS::DataWriterQos dw_qos;
publisher->get_default_datawriter_qos(dw_qos);

// 可靠性
dw_qos.reliability.kind = DDS::RELIABLE_RELIABILITY_QOS;
dw_qos.reliability.max_blocking_time.sec = 5;
dw_qos.reliability.max_blocking_time.nanosec = 0;

// 持久性
dw_qos.durability.kind = DDS::TRANSIENT_LOCAL_DURABILITY_QOS;

// 截止期限
dw_qos.deadline.period.sec = 1;
dw_qos.deadline.period.nanosec = 0;

// 活跃性
dw_qos.liveliness.kind = DDS::AUTOMATIC_LIVELINESS_QOS;
dw_qos.liveliness.lease_duration.sec = 5;
dw_qos.liveliness.lease_duration.nanosec = 0;

// 创建带自定义 QoS 的数据写入器
DDS::DataWriter_var writer = publisher->create_datawriter(
topic, dw_qos, 0, 0);

配置数据读取器 QoS

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// 配置数据读取器 QoS
DDS::DataReaderQos dr_qos;
subscriber->get_default_datareader_qos(dr_qos);

// 历史记录
dr_qos.history.kind = DDS::KEEP_LAST_HISTORY_QOS;
dr_qos.history.depth = 10;

// 资源限制
dr_qos.resource_limits.max_samples = 100;
dr_qos.resource_limits.max_instances = 5;
dr_qos.resource_limits.max_samples_per_instance = 20;

// 时间过滤
dr_qos.time_based_filter.minimum_separation.sec = 0;
dr_qos.time_based_filter.minimum_separation.nanosec = 100000000; // 100ms

// 创建带自定义 QoS 的数据读取器
DDS::DataReader_var reader = subscriber->create_datareader(
topic, dr_qos, listener, mask);

2. 发现机制

信息仓库发现

1
2
3
4
5
// 通过命令行参数指定
// -DCPSInfoRepo corbaloc::localhost:12345/DCPSInfoRepo

// 通过 API 配置
TheServiceParticipant->set_repo_ior("corbaloc::localhost:12345/DCPSInfoRepo");

RTPS 发现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 在配置文件中指定
// rtps.ini
[common]
DCPSGlobalTransportConfig=$file
DCPSDefaultDiscovery=RTPS

[domain/0]
DiscoveryConfig=rtps_disc

[rtps_disc]
Discovery=rtps.discovery
ResendPeriod=30

// 通过 API 配置
TheServiceParticipant->set_default_discovery("RTPS");

静态发现

1
2
3
4
5
6
7
8
9
10
11
// 在配置文件中定义静态端点
[static_discovery/participant1]
participant_id=1
participant_domain=0
participant_qos=participant_qos_profile

[static_discovery/participant1/publication/pub1]
topic_name=MyTopic
type_name=MyType
writer_id=1
writer_qos=datawriter_qos_profile

3. 传输配置

TCP 传输

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// 创建 TCP 传输实例
OpenDDS::DCPS::TransportInst_rch tcp_inst =
TheTransportRegistry->create_inst("tcp1", "tcp");

OpenDDS::DCPS::TcpInst* tcp =
dynamic_cast<OpenDDS::DCPS::TcpInst*>(tcp_inst.in());

// 配置 TCP 参数
tcp->local_address("127.0.0.1:12345");
tcp->enable_nagle_algorithm(false);
tcp->conn_retry_initial_delay(1000);
tcp->conn_retry_backoff_multiplier(2);
tcp->conn_retry_attempts(5);

// 创建传输配置
OpenDDS::DCPS::TransportConfig_rch config =
TheTransportRegistry->create_config("tcp_config");
config->instances_.push_back(tcp_inst);

// 设置为默认配置
TheTransportRegistry->global_config(config);

UDP 传输

1
2
3
4
5
6
7
8
9
// 创建 UDP 传输实例
OpenDDS::DCPS::TransportInst_rch udp_inst =
TheTransportRegistry->create_inst("udp1", "udp");

OpenDDS::DCPS::UdpInst* udp =
dynamic_cast<OpenDDS::DCPS::UdpInst*>(udp_inst.in());

// 配置 UDP 参数
udp->local_address("127.0.0.1:12346");

多播传输

1
2
3
4
5
6
7
8
9
10
11
// 创建多播传输实例
OpenDDS::DCPS::TransportInst_rch multicast_inst =
TheTransportRegistry->create_inst("multicast1", "multicast");

OpenDDS::DCPS::MulticastInst* multicast =
dynamic_cast<OpenDDS::DCPS::MulticastInst*>(multicast_inst.in());

// 配置多播参数
multicast->group_address("239.255.0.1:12347");
multicast->local_address("127.0.0.1:0");
multicast->ttl(1);

4. 安全功能

启用 DDS 安全

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// 加载安全插件
TheServiceParticipant->set_security(true);

// 配置身份验证
DDS::PropertySeq properties;
properties.length(3);

properties[0].name = "dds.sec.auth.identity_ca";
properties[0].value = "file:identity_ca.pem";

properties[1].name = "dds.sec.auth.private_key";
properties[1].value = "file:private_key.pem";

properties[2].name = "dds.sec.auth.identity_certificate";
properties[2].value = "file:identity_certificate.pem";

// 创建带安全配置的参与者
DDS::DomainParticipantQos participant_qos;
dpf->get_default_participant_qos(participant_qos);
participant_qos.property.value = properties;

DDS::DomainParticipant_var participant =
dpf->create_participant(domainId, participant_qos, 0, 0);

权限控制

1
2
3
4
5
6
7
8
9
10
11
12
13
// 配置访问控制
properties.length(2);

properties[0].name = "dds.sec.access.permissions_ca";
properties[0].value = "file:permissions_ca.pem";

properties[1].name = "dds.sec.access.governance";
properties[1].value = "file:governance.xml";

// 配置权限文档
properties.length(1);
properties[0].name = "dds.sec.access.permissions";
properties[0].value = "file:permissions.xml";

5. 动态类型和 XTypes

使用动态类型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 创建动态类型支持
DDS::DynamicTypeSupport_var type_support =
new DDS::DynamicTypeSupport(dynamic_type);

// 注册动态类型
type_support->register_type(participant, type_name);

// 创建动态数据
DDS::DynamicData_var dynamic_data = type_support->create_data();

// 设置字段值
DDS::ReturnCode_t ret = dynamic_data->set_string_value(
0, // member_id
"Hello, Dynamic World!");

// 发布动态数据
DDS::DynamicDataWriter_var dynamic_writer =
DDS::DynamicDataWriter::_narrow(writer);
ret = dynamic_writer->write(*dynamic_data, DDS::HANDLE_NIL);

读取动态数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// 创建动态数据读取器
DDS::DynamicDataReader_var dynamic_reader =
DDS::DynamicDataReader::_narrow(reader);

// 读取动态数据
DDS::DynamicDataSeq data_seq;
DDS::SampleInfoSeq info_seq;
ret = dynamic_reader->read(data_seq, info_seq,
DDS::LENGTH_UNLIMITED,
DDS::ANY_SAMPLE_STATE,
DDS::ANY_VIEW_STATE,
DDS::ANY_INSTANCE_STATE);

// 访问字段值
for (CORBA::ULong i = 0; i < data_seq.length(); ++i) {
if (info_seq[i].valid_data) {
CORBA::String_var value;
ret = data_seq[i]->get_string_value(value, 0);
if (ret == DDS::RETCODE_OK) {
std::cout << "Received: " << value.in() << std::endl;
}
}
}

工具和实用程序

1. 监控工具

OpenDDS Monitor

1
2
3
4
5
# 启动监控工具
$DDS_ROOT/tools/monitor/monitor

# 连接到信息仓库
# 在 GUI 中输入信息仓库的 IOR

Excel RTD

1
2
3
4
# 注册 RTD 服务器
regsvr32 $DDS_ROOT/lib/OpenDDS_RTD.dll

# 在 Excel 中加载 OpenDDS.xlam 插件

2. 诊断工具

opendds_idl

1
2
3
4
5
# 生成类型支持代码
opendds_idl MyTypes.idl

# 生成带特定选项的代码
opendds_idl --no-default-nested --language C++11 MyTypes.idl

DCPSInfoRepo

1
2
3
4
5
6
7
8
# 启动信息仓库
DCPSInfoRepo -ORBEndpoint iiop://localhost:12345 -d domain_ids

# 持久化配置
DCPSInfoRepo -ORBEndpoint iiop://localhost:12345 -d domain_ids -f persistence_file

# 查看帮助
DCPSInfoRepo -h

3. 配置管理

配置文件示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# common.ini
[common]
DCPSGlobalTransportConfig=TheTransportConfig
DCPSDefaultDiscovery=TheDiscoveryConfig

[domain/0]
DiscoveryConfig=TheDiscoveryConfig

[transport/the_transport_config]
transports=tcp1

[transport/tcp1]
transport_type=tcp
local_address=127.0.0.1:0
enable_nagle_algorithm=false

[rtps_discovery/the_discovery_config]
Discovery=rtps.discovery
ResendPeriod=30

使用配置文件

1
2
3
4
5
// 通过命令行参数
// -DCPSConfigFile common.ini

// 通过 API
TheServiceParticipant->load_configuration("common.ini");

最佳实践

1. 性能优化

内存管理

1
2
3
4
5
// 使用智能指针管理 DDS 对象
DDS::DomainParticipant_var participant; // 自动引用计数

// 及时清理资源
participant->delete_contained_entities();

批量操作

1
2
3
4
5
6
7
8
9
10
// 批量写入数据
MessageSeq messages(10);
DDS::SampleInfoSeq infos(10);

for (int i = 0; i < 10; ++i) {
messages[i].id = i;
messages[i].text = CORBA::string_dup("Message");
}

DDS::ReturnCode_t ret = message_writer->write(messages, infos);

QoS 调优

1
2
3
4
// 根据应用需求调整 QoS
// 实时应用:使用 BEST_EFFORT 可靠性,减少延迟
// 关键应用:使用 RELIABLE 可靠性,确保数据传递
// 大数据量:调整资源限制,防止内存溢出

2. 错误处理

检查返回码

1
2
3
4
5
6
DDS::ReturnCode_t ret = message_writer->write(message, DDS::HANDLE_NIL);

if (ret != DDS::RETCODE_OK) {
std::cerr << "Write failed: " << ret << std::endl;
// 适当的错误处理
}

异常处理

1
2
3
4
5
6
7
8
9
10
try {
DDS::DomainParticipant_var participant =
dpf->create_participant(domainId, PARTICIPANT_QOS_DEFAULT, 0, 0);
} catch (const CORBA::Exception& e) {
std::cerr << "CORBA exception: " << e << std::endl;
} catch (const std::exception& e) {
std::cerr << "Standard exception: " << e.what() << std::endl;
} catch (...) {
std::cerr << "Unknown exception" << std::endl;
}

3. 调试技巧

启用调试输出

1
2
3
4
5
// 设置调试级别
OpenDDS::DCPS::set_DCPS_debug_level(1); // 1-10,数字越大输出越详细

// 启用传输调试
TheTransportRegistry->set_log_level(1);

使用日志文件

1
2
3
# 设置环境变量
export ACE_LOG_MSG=/tmp/opendds.log
export DCPSDebugLevel=5

故障排除

常见问题

1. 连接问题

1
2
3
4
5
6
问题:无法连接到信息仓库
解决:
1. 检查信息仓库是否运行:ps aux | grep DCPSInfoRepo
2. 检查端口是否被占用:netstat -an | grep 12345
3. 检查防火墙设置
4. 验证 IOR 字符串格式

2. 发现失败

1
2
3
4
5
6
问题:发布者和订阅者无法发现彼此
解决:
1. 确保使用相同的域 ID
2. 检查发现配置(InfoRepo 或 RTPS)
3. 验证网络配置
4. 检查 QoS 兼容性

3. 内存泄漏

1
2
3
4
5
6
问题:内存使用持续增长
解决:
1. 确保正确调用 delete_contained_entities()
2. 检查循环引用
3. 使用内存分析工具(valgrind)
4. 调整资源限制 QoS

4. 性能问题

1
2
3
4
5
6
问题:延迟高或吞吐量低
解决:
1. 调整传输配置(TCP_NODELAY)
2. 优化 QoS 设置
3. 使用批量操作
4. 考虑使用共享内存传输

调试命令

1
2
3
4
5
6
7
8
9
10
11
12
13
# 查看 DDS 进程
ps aux | grep -E "(publisher|subscriber|DCPSInfoRepo)"

# 查看网络连接
netstat -an | grep -E "(12345|7400)"

# 查看日志文件
tail -f /tmp/opendds.log

# 使用 GDB 调试
gdb ./publisher
(gdb) break main
(gdb) run -DCPSConfigFile config.ini

进一步学习资源

官方文档

  1. 在线文档:https://opendds.readthedocs.io/
  2. 开发者指南docs/devguide/ 目录
  3. 设计文档docs/design/ 目录
  4. API 文档:使用 Doxygen 生成

示例代码

  1. HelloWorldtests/DCPS/HelloWorld/ - 最简单的示例
  2. Messengertests/DCPS/Messenger/ - 经典示例
  3. Shapes Demoexamples/DCPS/ishapes/ - 图形化示例
  4. StockQuoterexamples/DCPS/IntroductionToOpenDDS/ - 完整示例

社区资源

  1. GitHub 仓库:https://github.com/OpenDDS/OpenDDS
  2. 讨论区:GitHub Discussions
  3. 邮件列表:opendds-users@googlegroups.com
  4. Stack Overflow:使用 [opendds] 标签

相关标准

  1. DDS 规范:OMG DDS 1.4
  2. RTPS 规范:DDSI-RTPS 2.3
  3. DDS Security:DDS Security 1.1
  4. XTypes:DDS XTypes 1.3

进阶主题

  1. 自定义传输:实现自定义传输插件
  2. 协议扩展:扩展 RTPS 协议
  3. 工具开发:开发监控和诊断工具
  4. 集成测试:编写全面的测试套件

总结

OpenDDS 是一个功能强大、标准兼容的 DDS 实现,适用于构建高性能、可靠的分布式系统。通过本指南,你应该能够:

  1. 理解 OpenDDS 的核心概念和架构
  2. 安装和配置 OpenDDS 开发环境
  3. 编写基本的发布-订阅应用程序
  4. 配置 QoS 策略以满足不同需求
  5. 使用高级特性如安全、动态类型等
  6. 诊断和解决常见问题

建议的学习路径: 1. 从 HelloWorld 示例开始,理解基本概念 2. 尝试 Messenger 示例,学习完整的工作流程 3. 研究 QoS 配置,理解不同策略的影响 4. 探索高级特性,根据项目需求选择使用 5. 参与社区,学习最佳实践和最新发展

OpenDDS 的学习曲线可能较陡,但一旦掌握,你将能够构建出高性能、可靠的分布式系统。祝学习顺利!