动作也是ros2的一种通信机制,目的就是便于对机器人某一完整行为的流程进行管理。
动作就是在下令让机器人完成什么动作时及时反馈,告诉客户端执行到哪一步了,并且让客户端随时可以下令取消动作。
动作和服务类似,使用的也是客户端和服务器模型,客户端发送动作的目标,想让机器人干什么,服务器端执行动作过程, 控制机器人达到运动的目标,同时周期反馈动作执行过程中的状态。
因此,与服务一样,服务器唯一,而客户端不唯一。同时属于同步通信机制。通过.action文件定义通信接口数据结构。
拆开了细分,动作其实是由三个其他通信模块组合而成的,其中两个是服务,一个是话题,其原理结构图如下:

机器人作为服务器端,用户的操作设备为客户端。由客户端发送一条动作执行命令后,服务器端会给出回应表示接收到了命令并开始执行,期间通过话题持续发布自身动作的状态参数,而客户端订阅这个话题,时时获取机器人的状态参数。动作执行完毕后,还会向客户端反馈已完成的信息。
动作虽然是基于话题和服务实现的,但在实际使用中,并不会直接使用话题和服务的编程方法,而是有一套针对动作特性封装好的编程接口。
在命令行窗口内,可以直接指定节点发送指定类型的action,以海龟为例,在命令行直接操控海龟旋转可以使用一下指令:
ros2 action send_goal /turtle1/rotate_absolute turtlesim/action/RotateAbsolute "{theta: -1.57}" --feedback
ros2的工具包action,其中一个命令send_goal。指定的第一个参数是发送的节点,第二个参数是消息类型文件,最后是消息内容。对于海龟而言,theta的冒号后面还要有一个空格。–feedback是用于设置是否要展示每次话题发布的内容。
一个基础.action的实例如下,用来模拟控制机器人旋转动作:
bool enable # 定义动作的目标,表示动作开始的指令
---
bool finish # 定义动作的结果,表示是否成功执行
---
int32 state # 定义动作的反馈,表示当前执行到的位置
包含三个部分:
- 第一块是动作的目标,enable为true时,表示开始运动;
- 第二块是动作的执行结果,finish为true,表示动作执行完成;
- 第三块是动作的周期反馈,表示当前机器人旋转到的角度。
三个部分隶属一个大的结构体(类),具体内部又由三个其他结构体属性,分别为goal、result、feedback。
客户端发送给一个动作目标,服务器控制机器人开始运动,并周期反馈,结束后反馈结束信息。
同理,完成定义后,还需要在功能包的CMakeLists.txt中配置编译选项,让编译器在编译过程中,根据接口定义,自动生成不同语言的代码。
在服务器端和客户端,同样需要按python规则从指定的库导入对应的类名。
一个服务器端示例如下:
import time
import rclpy # ROS2 Python接口库
from rclpy.node import Node # ROS2 节点类
from rclpy.action import ActionServer # ROS2 动作服务器类
from learning_interface.action import MoveCircle # 自定义的圆周运动接口
class MoveCircleActionServer(Node):
def __init__(self, name):
super().__init__(name) # ROS2节点父类初始化
self._action_server = ActionServer( # 创建动作服务器(接口类型、动作名、回调函数)
self,
MoveCircle,
'move_circle',
self.execute_callback)
def execute_callback(self, goal_handle): # 执行收到动作目标之后的处理函数
self.get_logger().info('Moving circle...')
feedback_msg = MoveCircle.Feedback() # 创建一个动作反馈信息的消息,本质上是个话题
for i in range(0, 360, 30): # 从0到360度,执行圆周运动,并周期反馈信息
feedback_msg.state = i # 创建反馈信息,表示当前执行到的角度
self.get_logger().info('Publishing feedback: %d' % feedback_msg.state)
goal_handle.publish_feedback(feedback_msg) # 发布反馈信息
time.sleep(0.5)
goal_handle.succeed() # 动作执行成功
result = MoveCircle.Result() # 创建结果消息
result.finish = True
return result # 反馈最终动作执行的结果
def main(args=None): # ROS2节点主入口main函数
rclpy.init(args=args) # ROS2 Python接口初始化
node = MoveCircleActionServer("action_move_server") # 创建ROS2节点对象并进行初始化
rclpy.spin(node) # 循环等待ROS2退出
node.destroy_node() # 销毁节点对象
rclpy.shutdown() # 关闭ROS2 Python接口
此处服务器端并没有去获取客户端传递来的参数具体是什么,收到消息后执行回调函数就开始模拟运动。其实应该也依旧可以尝试在回调函数里通过MoveCircle.Goal()的方法获取客户端传递来的数据。(已验证过可行)
对应的客户端实例如下:
import rclpy # ROS2 Python接口库
from rclpy.node import Node # ROS2 节点类
from rclpy.action import ActionClient # ROS2 动作客户端类
from learning_interface.action import MoveCircle # 自定义的圆周运动接口
class MoveCircleActionClient(Node):
def __init__(self, name):
super().__init__(name) # ROS2节点父类初始化
self._action_client = ActionClient( # 创建动作客户端(接口类型、动作名)
self, MoveCircle, 'move_circle')
def send_goal(self, enable): # 创建一个发送动作目标的函数
goal_msg = MoveCircle.Goal() # 创建一个动作目标的消息
goal_msg.enable = enable # 设置动作目标为使能,希望机器人开始运动
self._action_client.wait_for_server() # 等待动作的服务器端启动
self._send_goal_future = self._action_client.send_goal_async(# 异步方式发送动作的目标
goal_msg, # 动作目标
feedback_callback=self.feedback_callback) # 处理周期反馈消息的回调函数
self._send_goal_future.add_done_callback(self.goal_response_callback) # 设置一个服务器收到目标之后反馈时的回调函数
def goal_response_callback(self, future): # 创建一个服务器收到目标之后反馈时的回调函数
goal_handle = future.result() # 接收动作的结果
if not goal_handle.accepted: # 如果动作被拒绝执行
self.get_logger().info('Goal rejected :(')
return
self.get_logger().info('Goal accepted :)') # 动作被顺利执行
self._get_result_future = goal_handle.get_result_async() # 异步获取动作最终执行的结果反馈
self._get_result_future.add_done_callback(self.get_result_callback) # 设置一个收到最终结果的回调函数
def get_result_callback(self, future): # 创建一个收到最终结果的回调函数
result = future.result().result # 读取动作执行的结果
self.get_logger().info('Result: {%d}' % result.finish) # 日志输出执行结果
def feedback_callback(self, feedback_msg): # 创建处理周期反馈消息的回调函数
feedback = feedback_msg.feedback # 读取反馈的数据
self.get_logger().info('Received feedback: {%d}' % feedback.state)
def main(args=None): # ROS2节点主入口main函数
rclpy.init(args=args) # ROS2 Python接口初始化
node = MoveCircleActionClient("action_move_client") # 创建ROS2节点对象并进行初始化
node.send_goal(True) # 发送动作目标
rclpy.spin(node) # 循环等待ROS2退出
node.destroy_node() # 销毁节点对象
rclpy.shutdown() # 关闭ROS2 Python接口
同样的导入方式获取action的类。
客户端设置了三个回调函数,分别为:
- goal_response_callback:目标发送给服务器端后,服务器端会反馈一个消息表示收到了信息。当客户端接收到目标后的返回信息发回客户端后调用的函数。这里的示例中,先判断返回消息是否符合预期要求,没问题后输出日志并异步等待执行,执行完后再调用回调函数“get_result_callback”。
- get_result_callback:机器人执行完整个完整动作后,最终返回结果时,执行该函数。此处示例仅仅只是在日志里展示最终结果而已。
- feedback_callback:机器人动作执行期间持续不断的通过话题反馈消息,期间每次反馈的消息被客户端收到都会执行一次本函数,用于监控机器人的运动状态。