DDS是ros2相对于ros1而言最大的变化,话题、服务、动作,他们底层通信的具体实现过程,都是靠DDS来完成的,它相当于是ROS机器人系统中的神经网络。
DDS本质上也是一种通信模型,常见的通信模型如下:
- 第一种,点对点模型,许多客户端连接到一个服务端,每次通信时,通信双方必须建立一条连接。当通信节点增多时,连接数也会增多。而且每个客户端都需要知道服务器的具体地址和所提供的服务,一旦服务器地址发生变化,所有客户端都会受到影响。
- 第二种,Broker模型,针对点对点模型进行了优化,由Broker集中处理所有人的请求,并进一步找到真正能响应该服务的角色。这样客户端就不用关心服务器的具体地址了。不过问题也很明显,Broker作为核心,它的处理速度会影响所有节点的效率,当系统规模增长到一定程度,Broker就会成为整个系统的性能瓶颈。更麻烦是,如果Broker发生异常,可能导致整个系统都无法正常运转。之前的ROS1系统,使用的就是类似这样的架构。
- 第三种,广播模型,所有节点都可以在通道上广播消息,并且节点都可以收到消息。这个模型解决了服务器地址的问题,而且通信双方也不用单独建立连接,但是广播通道上的消息太多了,所有节点都必须关心每条消息,其实很多是和自己没有关系的。
- 第四种,就是以数据为中心的DDS模型了,这种模型与广播模型有些类似,所有节点都可以在DataBus上发布和订阅消息。但它的先进之处在于,通信中包含了很多并行的通路,每个节点可以只关心自己感兴趣的消息,忽略不感兴趣的消息,有点像是一个旋转火锅,各种好吃的都在这个DataBus传送,我们只需要拿自己想吃的就行,其他的和我们没有关系。
DDS的全称是Data Distribution Service,也就是数据分发服务,2004年由对象管理组织OMG发布和维护,是一套专门为实时系统设计的数据分发/订阅标准。DDS强调以数据为中心,可以提供丰富的服务质量策略,以保障数据进行实时、高效、灵活地分发,可满足各种分布式实时通信应用需求。

DDS是一种通信的标准,就像4G、5G一样,既然是标准,那大家都可以按照这个标准来实现对应的功能,所以华为、高通都有很多5G的技术专利,DDS也是一样,能够按照DDS标准实现的通信系统很多,这里每一个红色模块,就是某一企业或组织实现的一种DDS系统。
可用的DDS标准有很多,具体而言,他们肯定都符合基本标准,但还是会有性能上的差别,ROS2的原则就是尽量兼容,让用户根据使用场景选择,比如个人开发,我们选择一个开源版本的DDS就行,如果是工业应用,那可能得选择一个商业授权的版本了。
为了实现对多个DDS的兼容,ROS设计了一个Middleware中间件,也就是一个统一的标准,不管我们用那个DDS,保证上层编程使用的函数接口都是一样的。此时兼容性的问题就转移给了DDS厂商,如果他们想让自己的DDS系统进入ROS生态,就得按照ROS的接口标准,开发一个驱动,也就是这个部分。
无论如何,ROS的宗旨不变,要提高软件代码的复用性,下边DDS任你边,上边的软件没影响。
DDS中的基本结构是Domain,Domain将各个应用程序绑定在一起进行通信,之前配置的那个DOMAIN ID,就是对全局数据空间的分组定义,只有处于同一个DOMAIN小组中的节点才能互相通信。这样可以避免无用数据占用的资源。
DDS中另外一个重要特性就是质量服务策略,QoS。
QoS是一种网络传输策略,应用程序指定所需要的网络传输质量行为,QoS服务实现这种行为要求,尽可能地满足客户对通信质量的需求,可以理解为数据提供者和接收者之间的合约。
具体策略:
- DEADLINE策略,表示通信数据必须要在每次截止时间内完成一次通信;
- HISTORY策略,表示针对历史数据的一个缓存大小;
- RELIABILITY策略,表示数据通信的模式,配置成BEST_EFFORT,就是尽力传输模式,网络情况不好的时候,也要保证数据流畅,此时可能会导致数据丢失,配置成RELIABLE,就是可信赖模式,可以在通信中尽量保证图像的完整性,我们可以根据应用功能场景选择合适的通信模式;
- DURABILITY策略,可以配置针对晚加入的节点,也保证有一定的历史数据发送过去,可以让新节点快速适应系统。
当两个窗口选择的通讯传输模式不一样时,同样不能相互传递数据。
如,若发布者指定qos为reliability且配置为BEST_EFFORT:
ros2 topic pub /chatter std_msgs/msg/Int32 "data: 42" --qos-reliability best_effort
订阅者却指定qos为reliability且配置为RELIABLE:
ros2 topic echo /chatter --qos-reliability reliable
在订阅者处和发布者处都会报一个警告且订阅者无法收到消息,但进程仍旧持续。
订阅者换成同样的qos配置后就可以接收到消息了:
ros2 topic echo /chatter --qos-reliability best_effort
在topic命令后边跟一个”–verbose”参数可以主动查看话题的QoS策略:
ros2 topic info /chatter --verbose
在代码中,可以通过rclpy.qos库函数配置qos配置。
在发布者中,创建话题发布者的类对象时,修改传递参数为qos对象即可:
import rclpy # ROS2 Python接口库
from rclpy.node import Node # ROS2 节点类
from std_msgs.msg import String # 字符串消息类型
from rclpy.qos import QoSProfile, QoSReliabilityPolicy, QoSHistoryPolicy # ROS2 QoS类
"""
创建一个发布者节点
"""
class PublisherNode(Node):
def __init__(self, name):
super().__init__(name) # ROS2节点父类初始化
qos_profile = QoSProfile( # 创建一个QoS原则
# reliability=QoSReliabilityPolicy.BEST_EFFORT,
reliability=QoSReliabilityPolicy.RELIABLE,
history=QoSHistoryPolicy.KEEP_LAST,
depth=1 # 缓存大小
)
self.pub = self.create_publisher(String, "chatter", qos_profile) # 创建发布者对象(消息类型、话题名、QoS原则),在以前的尝试中,这里第三个参数仅仅是缓存队列大小。
self.timer = self.create_timer(0.5, self.timer_callback) # 创建一个定时器(单位为秒的周期,定时执行的回调函数)
def timer_callback(self): # 创建定时器周期执行的回调函数
msg = String() # 创建一个String类型的消息对象
msg.data = 'Hello World' # 填充消息对象中的消息数据
self.pub.publish(msg) # 发布话题消息
self.get_logger().info('Publishing: "%s"' % msg.data)# 输出日志信息,提示已经完成话题发布
def main(args=None): # ROS2节点主入口main函数
rclpy.init(args=args) # ROS2 Python接口初始化
node = PublisherNode("qos_helloworld_pub") # 创建ROS2节点对象并进行初始化
rclpy.spin(node) # 循环等待ROS2退出
node.destroy_node() # 销毁节点对象
rclpy.shutdown() # 关闭ROS2 Python接口
相较于(3)话题学习笔记内代码,唯一的变动就是创造发布者类对象时,用qos对象取代了原本的缓存队列长度参数。
在订阅者中,也是同样的,在订阅者类对象处设置qos对象进行配置:
import rclpy # ROS2 Python接口库
from rclpy.node import Node # ROS2 节点类
from std_msgs.msg import String # ROS2标准定义的String消息
from rclpy.qos import QoSProfile, QoSReliabilityPolicy, QoSHistoryPolicy # ROS2 QoS类
"""
创建一个订阅者节点
"""
class SubscriberNode(Node):
def __init__(self, name):
super().__init__(name) # ROS2节点父类初始化
qos_profile = QoSProfile( # 创建一个QoS原则
# reliability=QoSReliabilityPolicy.BEST_EFFORT,
reliability=QoSReliabilityPolicy.RELIABLE,
history=QoSHistoryPolicy.KEEP_LAST,
depth=1
)
self.sub = self.create_subscription(\
String, "chatter", self.listener_callback, qos_profile) # 创建订阅者对象(消息类型、话题名、订阅者回调函数、QoS原则)
def listener_callback(self, msg): # 创建回调函数,执行收到话题消息后对数据的处理
self.get_logger().info('I heard: "%s"' % msg.data) # 输出日志信息,提示订阅收到的话题消息
def main(args=None): # ROS2节点主入口main函数
rclpy.init(args=args) # ROS2 Python接口初始化
node = SubscriberNode("qos_helloworld_sub") # 创建ROS2节点对象并进行初始化
rclpy.spin(node) # 循环等待ROS2退出
node.destroy_node() # 销毁节点对象
rclpy.shutdown() # 关闭ROS2 Python接口
与发布者一样,创造订阅者类对象时,用qos对象取代了原本的缓存队列长度参数。