目标:使用 Python 创建并运行一个发布者和订阅者节点。
教程级别:初学者
时间:20 分钟
目录
背景
先决条件
任务
1. 创建一个包
2. 编写发布者节点
3. 编写订阅者节点
4. 构建并运行
摘要
下一步
相关内容
背景
在本教程中,您将创建节点,这些节点通过主题相互传递字符串消息形式的信息。这里使用的示例是一个简单的“说话者”和“监听者”系统;一个节点发布数据,另一个节点订阅主题以便接收该数据。
这些示例中使用的代码可以在这里找到。
先决条件
在之前的教程中,您学习了如何创建工作区和创建包。
建议具备基础的 Python 理解,但不是完全必需的。
任务
1. 创建一个包
打开一个新的终端并且初始化您的 ROS 2 安装,这样 ros2
命令就会生效。
导航到在之前教程中创建的 ros2_ws
目录。
请记住,包应该在 src
目录中创建,而不是在工作区的根目录中。因此,请导航到 ros2_ws/src
,然后运行包创建命令:
cxy@ubuntu2404-cxy:~/ros2_ws/src$ ros2 pkg create --build-type ament_python --license Apache-2.0 py_pubsub
going to create a new package
package name: py_pubsub
destination directory: /home/cxy/ros2_ws/src
package format: 3
version: 0.0.0
description: TODO: Package description
maintainer: ['cxy <cxy@todo.todo>']
licenses: ['Apache-2.0']
build type: ament_python
dependencies: []
creating folder ./py_pubsub
creating ./py_pubsub/package.xml
creating source folder
creating folder ./py_pubsub/py_pubsub
creating ./py_pubsub/setup.py
creating ./py_pubsub/setup.cfg
creating folder ./py_pubsub/resource
creating ./py_pubsub/resource/py_pubsub
creating ./py_pubsub/py_pubsub/__init__.py
creating folder ./py_pubsub/test
creating ./py_pubsub/test/test_copyright.py
creating ./py_pubsub/test/test_flake8.py
creating ./py_pubsub/test/test_pep257.py
您的终端将返回一条消息,确认您的包 py_pubsub
及其所有必要的文件和文件夹已创建。
2. 编写发布者节点
导航到 ros2_ws/src/py_pubsub/py_pubsub
。请记住,这个目录是一个 Python 包,它的名称与其嵌套的 ROS 2 包的名称相同。
下载示例说话者代码,请输入以下命令:
cxy@ubuntu2404-cxy:~/ros2_ws/src/py_pubsub/py_pubsub$ wget https://raw.githubusercontent.com/ros2/examples/jazzy/rclpy/topics/minimal_publisher/examples_rclpy_minimal_publisher/publisher_member_function.py
--2024-07-06 11:23:35-- https://raw.githubusercontent.com/ros2/examples/jazzy/rclpy/topics/minimal_publisher/examples_rclpy_minimal_publisher/publisher_member_function.py
正在连接 127.0.0.1:2334... 已连接。
已发出 Proxy 请求,正在等待回应... 200 OK
长度:1576 (1.5K) [text/plain]
正在保存至: ‘publisher_member_function.py’
publisher_member_fu 100%[===================>] 1.54K --.-KB/s 用时 0s
2024-07-06 11:23:37 (6.48 MB/s) - 已保存 ‘publisher_member_function.py’ [1576/1576])
现在将有一个名为 publisher_member_function.py
的新文件与 __init__.py
相邻。
使用您喜欢的文本编辑器打开文件。
# 版权 2016 开源机器人基金会,公司。
#
# 根据 Apache 许可证,版本 2.0(“许可证”)许可;
# 除非符合许可证,否则不得使用此文件。
# 您可以在以下位置获取许可证副本:
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# 除非适用法律要求或书面同意,否则在许可证下分发的软件
# 将按“原样”基础分发,无任何明示或暗示的保证或条件。
# 请参阅许可证以了解管理权限和
# 许可证下的限制。
import rclpy # 导入 ROS 客户端库
from rclpy.node import Node # 从 rclpy.node 模块导入 Node 类
from std_msgs.msg import String # 从 std_msgs.msg 模块导入 String 消息类型
class MinimalPublisher(Node): # 定义一个名为 MinimalPublisher 的类,它继承自 Node 类
def __init__(self): # 定义类的初始化函数
super().__init__('minimal_publisher') # 调用父类(Node)的初始化函数,并设置节点名称为 'minimal_publisher'
self.publisher_ = self.create_publisher(String, 'topic', 10) # 创建一个发布者,消息类型为 String,主题名为 'topic',队列长度为 10
timer_period = 0.5 # 设置定时器的时间间隔,单位为秒
self.timer = self.create_timer(timer_period, self.timer_callback) # 创建一个定时器,每隔 timer_period 秒就调用一次 self.timer_callback 函数
self.i = 0 # 初始化一个变量 i,用于记录发布的消息数量
def timer_callback(self): # 定义定时器回调函数
msg = String() # 创建一个 String 类型的消息对象
msg.data = 'Hello World: %d' % self.i # 设置消息的内容
self.publisher_.publish(msg) # 发布消息
self.get_logger().info('Publishing: "%s"' % msg.data) # 在终端上打印一条日志信息
self.i += 1 # 每发布一条消息,就将 i 的值加 1
def main(args=None): # 定义主函数
rclpy.init(args=args) # 初始化 ROS 客户端库
minimal_publisher = MinimalPublisher() # 创建一个 MinimalPublisher 类的实例
rclpy.spin(minimal_publisher) # 进入循环,等待消息到来
# 显式销毁节点
# (可选 - 否则当垃圾收集器销毁节点对象时
# 它将自动完成)
minimal_publisher.destroy_node() # 销毁节点
rclpy.shutdown() # 关闭 ROS 客户端库
if __name__ == '__main__': # 如果直接运行这个文件(而不是 import 这个文件)
main() # 调用主函数
检查代码 2.1
源代码中的第一行在注释后导入 rclpy
,以便可以使用其 Node
类。
import rclpy
from rclpy.node import Node
下一条语句导入了节点用来构建其在主题上传递的数据的内置字符串消息类型。
from std_msgs.msg import String
这些行代表节点的依赖项。请记住,依赖项需要添加到 package.xml
中,在下一节中您将执行此操作。
接下来,创建了 MinimalPublisher
类,它继承自(或是 Node
的子类)。
class MinimalPublisher(Node):
以下是类构造函数的定义。 super().__init__
调用 Node
类的构造函数,并为其提供节点名称,在本例中为 minimal_publisher
。
create_publisher
声明该节点发布 std_msgs.msg
模块导入的类型为 String
的消息,通过名为 topic
的主题,且“队列大小”为 10。队列大小是必需的 QoS(服务质量)设置,如果订阅者接收消息不够快,它会限制排队消息的数量。
接下来,创建了一个定时器,每 0.5 秒执行一次回调。 self.i
是回调中使用的计数器。
def __init__(self):
super().__init__('minimal_publisher')
self.publisher_ = self.create_publisher(String, 'topic', 10)
timer_period = 0.5 # seconds
self.timer = self.create_timer(timer_period, self.timer_callback)
self.i = 0
timer_callback
创建一条附加了计数器值的消息,并使用 get_logger().info
发布到控制台。
def timer_callback(self):
msg = String()
msg.data = 'Hello World: %d' % self.i
self.publisher_.publish(msg)
self.get_logger().info('Publishing: "%s"' % msg.data)
self.i += 1
最后,主函数被定义。
def main(args=None):
rclpy.init(args=args)
minimal_publisher = MinimalPublisher()
rclpy.spin(minimal_publisher)
# Destroy the node explicitly
# (optional - otherwise it will be done automatically
# when the garbage collector destroys the node object)
minimal_publisher.destroy_node()
rclpy.shutdown()
首先初始化 rclpy
库,然后创建节点,再对节点进行“旋转”,以便调用其回调函数。
2.2 添加依赖项
返回到 ros2_ws/src/py_pubsub
目录, setup.py
、 setup.cfg
和 package.xml
文件已为您创建。
使用您的文本编辑器打开 package.xml
。
如前所述教程中提到的,确保填写 <description>
、 <maintainer>
和 <license>
标签:
<description>Examples of minimal publisher/subscriber using rclpy</description>
<maintainer email="you@email.com">Your Name</maintainer>
<license>Apache-2.0</license>
在上面的行之后,添加以下依赖项,对应于您节点的导入语句:
<exec_depend>rclpy</exec_depend>
<exec_depend>std_msgs</exec_depend>
此声明表示当代码执行时,包需要 rclpy
和 std_msgs
。
<?xml version="1.0"?> <!-- XML 文件的版本 -->
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?> <!-- XML 文件的模式定义 -->
<package format="3"> <!-- 定义一个 ROS 包,格式为 3 -->
<name>py_pubsub</name> <!-- 包的名称 -->
<version>0.0.0</version> <!-- 包的版本号 -->
<description>Examples of minimal publisher/subscriber using rclpy</description> <!-- 包的描述 -->
<maintainer email="cxy@126.com">cxy</maintainer> <!-- 包的维护者和他的电子邮件地址 -->
<license>Apache-2.0</license> <!-- 包的许可证 -->
<exec_depend>rclpy</exec_depend> <!-- 运行此包需要的依赖包 -->
<exec_depend>std_msgs</exec_depend> <!-- 运行此包需要的依赖包 -->
<test_depend>ament_copyright</test_depend> <!-- 测试此包需要的依赖包 -->
<test_depend>ament_flake8</test_depend> <!-- 测试此包需要的依赖包 -->
<test_depend>ament_pep257</test_depend> <!-- 测试此包需要的依赖包 -->
<test_depend>python3-pytest</test_depend> <!-- 测试此包需要的依赖包 -->
<export> <!-- 导出的信息 -->
<build_type>ament_python</build_type> <!-- 构建类型 -->
</export>
</package>
确保保存文件。
2.3 添加一个入口点
打开 setup.py
文件。再次,将 maintainer
、 maintainer_email
、 description
和 license
字段与您的 package.xml
匹配:
maintainer='YourName',
maintainer_email='you@email.com',
description='Examples of minimal publisher/subscriber using rclpy',
license='Apache-2.0',
在 entry_points
字段的 console_scripts
括号内添加以下行:
entry_points={
'console_scripts': [
'talker = py_pubsub.publisher_member_function:main',
],
},
别忘了保存。
2.4 检查 setup.cfg
setup.cfg
文件的内容应该会像这样自动正确填充:
[develop]
script_dir=$base/lib/py_pubsub
[install]
install_scripts=$base/lib/py_pubsub
这只是告诉 setuptools 将你的可执行文件放在 lib
,因为 ros2 run
会在那里查找它们。
您现在可以构建您的包,源本地设置文件,并运行它,但让我们先创建订阅者节点,这样您就可以看到完整系统的运作。
3. 编写订阅者节点
返回 ros2_ws/src/py_pubsub/py_pubsub
以创建下一个节点。在您的终端输入以下代码:
cxy@ubuntu2404-cxy:~/ros2_ws/src/py_pubsub/py_pubsub$ wget https://raw.githubusercontent.com/ros2/examples/jazzy/rclpy/topics/minimal_subscriber/examples_rclpy_minimal_subscriber/subscriber_member_function.py
--2024-07-06 11:46:47-- https://raw.githubusercontent.com/ros2/examples/jazzy/rclpy/topics/minimal_subscriber/examples_rclpy_minimal_subscriber/subscriber_member_function.py
正在连接 127.0.0.1:2334... 已连接。
已发出 Proxy 请求,正在等待回应... 200 OK
长度: 1469 (1.4K) [text/plain]
正在保存至: ‘subscriber_member_function.py’
subscriber_member_f 100%[===================>] 1.43K --.-KB/s 用时 0s
2024-07-06 11:46:49 (11.7 MB/s) - 已保存 ‘subscriber_member_function.py’ [1469/1469])
现在目录中应该有这些文件:
3.1 检查代码
打开 subscriber_member_function.py
使用你的文本编辑器。
# 版权 2016 开源机器人基金会,公司。
#
# 根据 Apache 许可证,版本 2.0(“许可证”)许可;
# 除非符合许可证,否则不得使用此文件。
# 您可以在以下位置获取许可证副本:
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# 除非适用法律要求或书面同意,否则在许可证下分发的软件
# 将按“原样”基础分发,无任何明示或暗示的保证或条件。
# 请参阅许可证以了解管理权限和
# 许可证下的限制。
import rclpy # 导入 ROS 客户端库
from rclpy.node import Node # 从 rclpy.node 模块导入 Node 类
from std_msgs.msg import String # 从 std_msgs.msg 模块导入 String 消息类型
class MinimalSubscriber(Node): # 定义一个名为 MinimalSubscriber 的类,它继承自 Node 类
def __init__(self): # 定义类的初始化函数
super().__init__('minimal_subscriber') # 调用父类(Node)的初始化函数,并设置节点名称为 'minimal_subscriber'
self.subscription = self.create_subscription(
String,
'topic',
self.listener_callback,
10) # 创建一个订阅者,消息类型为 String,主题名为 'topic',队列长度为 10,当收到消息时调用 self.listener_callback 函数
self.subscription # 防止未使用变量警告
def listener_callback(self, msg): # 定义监听回调函数
self.get_logger().info('I heard: "%s"' % msg.data) # 在终端上打印一条日志信息,内容为收到的消息
def main(args=None): # 定义主函数
rclpy.init(args=args) # 初始化 ROS 客户端库
minimal_subscriber = MinimalSubscriber() # 创建一个 MinimalSubscriber 类的实例
rclpy.spin(minimal_subscriber) # 进入循环,等待消息到来
# 显式销毁节点
# (可选 - 否则当垃圾收集器销毁节点对象时
# 它将自动完成)
minimal_subscriber.destroy_node() # 销毁节点
rclpy.shutdown() # 关闭 ROS 客户端库
if __name__ == '__main__': # 如果直接运行这个文件(而不是 import 这个文件)
main() # 调用主函数
订阅者节点的代码与发布者的几乎相同。构造函数使用与发布者相同的参数创建一个订阅者。回想一下主题教程中提到的,发布者和订阅者使用的主题名称和消息类型必须匹配,才能使它们能够通信。
self.subscription = self.create_subscription(
String,
'topic',
self.listener_callback,
10)
订阅者的构造函数和回调不包括任何定时器定义,因为它不需要定时器。一旦收到消息,它的回调就会被调用。
回调定义只是将一个信息消息和它收到的数据一起打印到控制台。回想一下发布者定义 msg.data = 'Hello World: %d' % self.i
def listener_callback(self, msg):
self.get_logger().info('I heard: "%s"' % msg.data)
main
的定义几乎完全相同,只是将发布者的创建和旋转替换为订阅者。
minimal_subscriber = MinimalSubscriber()
rclpy.spin(minimal_subscriber)
由于此节点与发布者具有相同的依赖项,因此无需向 package.xml
添加任何新内容。 setup.cfg
文件也可以保持不变。
3.2 添加一个入口点
重新打开 setup.py
,并在发布者入口点下面添加订阅者节点的入口点。 entry_points
字段现在应该如下所示:
entry_points={
'console_scripts': [
'talker = py_pubsub.publisher_member_function:main',
'listener = py_pubsub.subscriber_member_function:main',
],
},
请确保保存文件,然后您的发布/订阅系统应该就绪。
from setuptools import find_packages, setup # 导入 setuptools 库的 find_packages 和 setup 函数
package_name = 'py_pubsub' # 定义包名
setup( # 调用 setup 函数来配置包
name=package_name, # 包名
version='0.0.0', # 包的版本号
packages=find_packages(exclude=['test']), # 使用 find_packages 函数查找包,排除名为 'test' 的包
data_files=[ # 数据文件列表
('share/ament_index/resource_index/packages',
['resource/' + package_name]), # 第一个数据文件的路径和文件名
('share/' + package_name, ['package.xml']), # 第二个数据文件的路径和文件名
],
install_requires=['setuptools'], # 安装此包需要的其他包
zip_safe=True, # 此包可以安全地从 zip 文件中导入
maintainer='cxy', # 维护者的名字
maintainer_email='cxy@126.com', # 维护者的电子邮件地址
description='Examples of minimal publisher/subscriber using rclpy', # 包的描述
license='Apache-2.0', # 包的许可证
tests_require=['pytest'], # 测试此包需要的其他包
entry_points={ # 入口点
'console_scripts': [ # 控制台脚本入口点
'talker = py_pubsub.publisher_member_function:main', # 当运行 'talker' 命令时,会调用 'py_pubsub.publisher_member_function' 模块的 'main' 函数
'listener = py_pubsub.subscriber_member_function:main', # 当运行 'listener' 命令时,会调用 'py_pubsub.subscriber_member_function' 模块的 'main' 函数
],
},
)
4. 构建并运行
您可能已经在 ROS 2 系统中安装了 rclpy
和 std_msgs
包。在构建之前,在工作空间的根目录( ros2_ws
)运行 rosdep
以检查缺失的依赖项是一个好习惯:
rosdep install -i --from-path src --rosdistro jazzy -y
仍在您的工作区的根目录中, ros2_ws
,构建您的新包:
cxy@ubuntu2404-cxy:~/ros2_ws$ colcon build --packages-select py_pubsub
Starting >>> py_pubsub
Finished <<< py_pubsub [3.03s]
Summary: 1 package finished [11.2s]
打开一个新的终端,导航到 ros2_ws
,并且导入设置文件:
source install/setup.bash
现在运行 talker 节点:
ros2 run py_pubsub talker
终端应该每 0.5 秒发布一次信息消息,如下:
cxy@ubuntu2404-cxy:~/ros2_ws$ source install/setup.bash
cxy@ubuntu2404-cxy:~/ros2_ws$ ros2 run py_pubsub talker
[INFO] [1720238366.729769667] [minimal_publisher]: Publishing: "Hello World: 0"
[INFO] [1720238367.169861013] [minimal_publisher]: Publishing: "Hello World: 1"
[INFO] [1720238367.669947277] [minimal_publisher]: Publishing: "Hello World: 2"
[INFO] [1720238368.170122810] [minimal_publisher]: Publishing: "Hello World: 3"
[INFO] [1720238368.670373114] [minimal_publisher]: Publishing: "Hello World: 4"
打开另一个终端,再次从 ros2_ws
内部加载设置文件,然后启动监听节点:
cxy@ubuntu2404-cxy:~/ros2_ws$ source install/setup.bash
cxy@ubuntu2404-cxy:~/ros2_ws$
cxy@ubuntu2404-cxy:~/ros2_ws$ ros2 run py_pubsub listener
[INFO] [1720238372.201524573] [minimal_subscriber]: I heard: "Hello World: 11"
[INFO] [1720238372.671306025] [minimal_subscriber]: I heard: "Hello World: 12"
[INFO] [1720238373.169087107] [minimal_subscriber]: I heard: "Hello World: 13"
[INFO] [1720238373.671070379] [minimal_subscriber]: I heard: "Hello World: 14"
[INFO] [1720238374.171012765] [minimal_subscriber]: I heard: "Hello World: 15"
监听器将开始向控制台打印消息,从发布者当时的消息计数开始,如下:
在每个终端输入 Ctrl+C
以停止节点旋转。
摘要
您创建了两个节点,用于通过主题发布和订阅数据。在运行它们之前,您已将它们的依赖项和入口点添加到包配置文件中。
下一步
接下来,您将使用服务/客户端模型创建另一个简单的 ROS 2 包。同样,您可以选择用 C++ https://docs.ros.org/en/jazzy/Tutorials/Beginner-Client-Libraries/Writing-A-Simple-Cpp-Service-And-Client.html 或 Python https://docs.ros.org/en/jazzy/Tutorials/Beginner-Client-Libraries/Writing-A-Simple-Py-Service-And-Client.html 编写。
相关内容
有几种方法可以在 Python 中编写发布者和订阅者;请查看 ros2/examples 仓库中的 minimal_publisher
和 minimal_subscriber
包。https://github.com/ros2/examples/tree/jazzy/rclpy/topics