前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >ROS2中零拷贝实现进程内节点之间的高效通信

ROS2中零拷贝实现进程内节点之间的高效通信

作者头像
点云PCL博主
发布2022-12-27 18:17:15
2K0
发布2022-12-27 18:17:15
举报
文章被收录于专栏:点云PCL点云PCL

背景

ROS应用程序通常由单个“节点”组成,这些节点执行单个的任务,并与系统的其他部分分离。这促进了故障隔离、更快的开发、模块化和代码重用,但往往以性能为代价。在最初开发ROS1之后,对节点的有效组合的需求变得明显,所以开发了Nodelets 。在ROS2中旨在通过解决一些需要节点重构的基本问题来改进节点的设计。

文章主要内容

  • 介绍ros2下实现进程内(intra_process)话题发布和订阅。
  • 在同一进程内的不同节点,可以通过共享指针方式实现内容读取,减少消息的拷贝开销,intra_process对于不同进程间的节点是无法实现零拷贝的。根据subscription的回调类型决定如何分发消息,一对一则零拷贝,一对多会自动拷贝n-1个msg。
  • 对于图像之类数据量比较大的节点间处理的效率和性能将大大提高。

在本demo将重点介绍如何手动组合节点,方法是分别定义节点,但将它们组合在不同的流程布局中,而不更改节点的代码或限制其功能。展示了当使用 std::unique_ptr发布和订阅时,实现进程内发布/订阅连接,可以实现消息的零拷贝传输。

首先,我们来看一下源代码:

代码语言:javascript
复制
#include <chrono>
#include <cinttypes>
#include <cstdio>
#include <memory>
#include <string>
#include <utility>
#include "rclcpp/rclcpp.hpp"
#include "std_msgs/msg/int32.hpp"
using namespace std::chrono_literals;
// Node that produces messages.
struct Producer : public rclcpp::Node
{
  Producer(const std::string & name, const std::string & output)
  : Node(name, rclcpp::NodeOptions().use_intra_process_comms(true))
  {
    // Create a publisher on the output topic.
    pub_ = this->create_publisher<std_msgs::msg::Int32>(output, 10);
    std::weak_ptr<std::remove_pointer<decltype(pub_.get())>::type> captured_pub = pub_;
    // Create a timer which publishes on the output topic at ~1Hz.
    auto callback = [captured_pub]() -> void {
        auto pub_ptr = captured_pub.lock();
        if (!pub_ptr) {
          return;
        }
        static int32_t count = 0;
        std_msgs::msg::Int32::UniquePtr msg(new std_msgs::msg::Int32());
        msg->data = count++;
        printf(
          "Published message with value: %d, and address: 0x%" PRIXPTR "\n", msg->data,
          reinterpret_cast<std::uintptr_t>(msg.get()));
        pub_ptr->publish(std::move(msg));
      };
    timer_ = this->create_wall_timer(1s, callback);
  }

  rclcpp::Publisher<std_msgs::msg::Int32>::SharedPtr pub_;
  rclcpp::TimerBase::SharedPtr timer_;
};
// Node that consumes messages.
struct Consumer : public rclcpp::Node
{
  Consumer(const std::string & name, const std::string & input)
  : Node(name, rclcpp::NodeOptions().use_intra_process_comms(true))
  {
    // Create a subscription on the input topic which prints on receipt of new messages.
    sub_ = this->create_subscription<std_msgs::msg::Int32>(
      input,
      10,
      [](std_msgs::msg::Int32::UniquePtr msg) {
        printf(
          " Received message with value: %d, and address: 0x%" PRIXPTR "\n", msg->data,
          reinterpret_cast<std::uintptr_t>(msg.get()));
      });
  }

  rclcpp::Subscription<std_msgs::msg::Int32>::SharedPtr sub_;
};

int main(int argc, char * argv[])
{
  setvbuf(stdout, NULL, _IONBF, BUFSIZ);
  rclcpp::init(argc, argv);
  rclcpp::executors::SingleThreadedExecutor executor;

  auto producer = std::make_shared<Producer>("producer", "number");
  auto consumer = std::make_shared<Consumer>("consumer", "number");
  executor.add_node(producer);
  executor.add_node(consumer);
  executor.spin();
  rclcpp::shutdown();
  return 0;
}

可以看到Producer 继承自Node且设置了use_intra_process_comms为true,这样可以使用intra_process机制。可以看到Producer 继承自Node且设置了use_intra_process_comms为true,这样可以使用intra_process机制。通过查看主函数可以看到,我们有一个producer和一个consumer节点,将它们添加到一个单线程执行器中,然后调用spin。查看producer结构中“producer”节点的实现,您可以看到我们创建了一个发布“number”主题的发布者和一个定时创建新消息、打印出内存中的地址及其内容值并发布的计时器。“consumer”节点稍微简单一些,可以在consumer结构中看到它的实现,因为它只订阅“number”主题,并打印它接收的消息的地址和值。

producer如期打印出地址和对应的值,而consumer打印出匹配的地址和对应的值。这表明进程内通信确实有效,避免了不必要的复制,至少对于简单的图像是如此的。

通过执行ros2 run intra_process_demo two_node_pipeline可执行文件打印结果如下

消息以大约每秒一条的速度发送。这是因为我们告诉计时器大约每秒发射一次。此外,我们注意到第一条消息(值为0)没有相应的“Received message…”行。这是因为发布/订阅是“best effort”的,没有启用任何类似“锁定”的行为。这意味着,如果发布者在订阅建立之前发布消息,订阅将不会收到该消息。这种竞争条件可能导致前几条消息丢失。在这种情况下,由于它们每秒只出现一次,通常只有第一条消息丢失。最后可以看到具有相同值的“Published message…”和“Received message…”行也具有相同的地址。这表明收到的msg地址与发布msg地址相同,并且不是副本。

这是因为我们正在发布和订阅std::unique_ptrs,它允许消息的所有权在系统中安全移动。当然也可以使用const&和std::shared_ptr发布和订阅,但在这种情况下不会出现零拷贝。

这里说明一下std::unique_ptr和std::shared_ptr的用法和区别

智能指针的作用是:智能指针用于确保当对象不再使用时对象可以被自动删除。

std::unique_ptr :一个std::unique_ptr会指向一个对象且不允许其他指针指向。创建一个unique指针,放入容器(例如map),但是将实际指针返回,赋值到其他object.field。一个被std::unique_ptr指向的对象可以被move到另一个指针指向,指向的对象消亡的时候,容器会自动释放所有资源。因此,std::unique_ptr起到一个管理内存资源的作用,实际的指针可以到处使用。

std::shared_ptr:本身的生命周期比较固定,std::shared_ptr在多个对象之间共享一个指针,这些对象的生命周期动态性比较强,当所有的对象结束时指针被释放。

所以std::shared_ptr是共享内存的指针,std::unique_ptr是不用自己管理内存的指针。

循环零拷贝

这个demo与上一个类似,但不是producer为每个迭代创建一个新消息,这个demo只使用一个消息实例。这是通过创建一个循环,并通过外部节点在回调函数执行器之前来“启动(kicking off)”通信来实现发布msg:

代码语言:javascript
复制
#include <chrono>
#include <cinttypes>
#include <cstdio>
#include <memory>
#include <string>
#include <utility>
#include "rclcpp/rclcpp.hpp"
#include "std_msgs/msg/int32.hpp"

using namespace std::chrono_literals;
// This node receives an Int32, waits 1 second, then increments and sends it.
struct IncrementerPipe : public rclcpp::Node
{
  IncrementerPipe(const std::string & name, const std::string & in, const std::string & out)
  : Node(name, rclcpp::NodeOptions().use_intra_process_comms(true))
  {
    // Create a publisher on the output topic.
    pub = this->create_publisher<std_msgs::msg::Int32>(out, 10);
    std::weak_ptr<std::remove_pointer<decltype(pub.get())>::type> captured_pub = pub;
    // Create a subscription on the input topic.
    sub = this->create_subscription<std_msgs::msg::Int32>(
      in,
      10,
      [captured_pub](std_msgs::msg::Int32::UniquePtr msg) {
        auto pub_ptr = captured_pub.lock();
        if (!pub_ptr) {
          return;
        }
        printf(
          "Received message with value:         %d, and address: 0x%" PRIXPTR "\n", msg->data,
          reinterpret_cast<std::uintptr_t>(msg.get()));
        printf("  sleeping for 1 second...\n");
        if (!rclcpp::sleep_for(1s)) {
          return;    // Return if the sleep failed (e.g. on ctrl-c).
        }
        printf("  done.\n");
        msg->data++;    // Increment the message's data.
        printf(
          "Incrementing and sending with value: %d, and address: 0x%" PRIXPTR "\n", msg->data,
          reinterpret_cast<std::uintptr_t>(msg.get()));
        pub_ptr->publish(std::move(msg));    // Send the message along to the output topic.
      });
  }
  rclcpp::Publisher<std_msgs::msg::Int32>::SharedPtr pub;
  rclcpp::Subscription<std_msgs::msg::Int32>::SharedPtr sub;
};


int main(int argc, char * argv[])
{
  setvbuf(stdout, NULL, _IONBF, BUFSIZ);
  rclcpp::init(argc, argv);
  rclcpp::executors::SingleThreadedExecutor executor;
  // Create a simple loop by connecting the in and out topics of two IncrementerPipe's.
  // The expectation is that the address of the message being passed between them never changes.
  auto pipe1 = std::make_shared<IncrementerPipe>("pipe1", "topic1", "topic2");
  auto pipe2 = std::make_shared<IncrementerPipe>("pipe2", "topic2", "topic1");
  rclcpp::sleep_for(1s);  // Wait for subscriptions to be established to avoid race conditions.
  // Publish the first message (kicking off the cycle).
  std::unique_ptr<std_msgs::msg::Int32> msg(new std_msgs::msg::Int32());
  msg->data = 42;
  printf(
    "Published first message with value:  %d, and address: 0x%" PRIXPTR "\n", msg->data,
    reinterpret_cast<std::uintptr_t>(msg.get()));
  pipe1->pub->publish(std::move(msg));
  executor.add_node(pipe1);
  executor.add_node(pipe2);
  executor.spin();
  rclcpp::shutdown();
  return 0;
}

与前一个实例不同的是这个demo只使用一个节点,用不同的名称和配置实例化了两次。这一行pipe1->pub->pub(msg);启动进程,但从那时起,每个节点在其自己的订阅回调函数中调用publish,在节点之间来回传递消息。这里的期望是节点每秒来回传递一次消息,每次都增加消息的值。因为该消息是作为unique_ptr发布和订阅的,所以在开始时创建的相同消息将持续被使用。运行后打印的结果如下:

代码语言:javascript
复制
Published first message with value:  42, and address: 0x7fd2ce0a2bc0
Received message with value:         42, and address: 0x7fd2ce0a2bc0
  sleeping for 1 second...
  done.
Incrementing and sending with value: 43, and address: 0x7fd2ce0a2bc0
Received message with value:         43, and address: 0x7fd2ce0a2bc0
  sleeping for 1 second...
  done.
Incrementing and sending with value: 44, and address: 0x7fd2ce0a2bc0
Received message with value:         44, and address: 0x7fd2ce0a2bc0
  sleeping for 1 second...
  done.
Incrementing and sending with value: 45, and address: 0x7fd2ce0a2bc0
Received message with value:         45, and address: 0x7fd2ce0a2bc0
  sleeping for 1 second...
  done.
Incrementing and sending with value: 46, and address: 0x7fd2ce0a2bc0
Received message with value:         46, and address: 0x7fd2ce0a2bc0
  sleeping for 1 second...
  done.
Incrementing and sending with value: 47, and address: 0x7fd2ce0a2bc0
Received message with value:         47, and address: 0x7fd2ce0a2bc0
  sleeping for 1 second...
[...]

从这里可以看到每次迭代中不断增加的数字,从42开始……并且在整个过程中它都重复使用同一条消息,并且它的指针地址从不改变,这避免了不必要的复制。

所以接下来当我们的工程中需要传递大量的图片或者点云数据的时候,我们可以使用这种方式实现进程间的高效的通信,接下来我们将实现一个以opencv图像传输的demo,使用OpenCV来捕获图像、标注图像和查看图像。

首先将有一条由三个节点组成的流水线,如下所示:camera_node->watermark_node->image_view_node

camera_node从计算机上的相机设备号读取原始图像,在图像上写入一些信息并将其发布。watermark_node订阅camera_node的输出,并在发布之前添加一些文本信息在图像上。最后,image_view_node订阅watermark_node的输出,在图像上写入更多文本信息,然后使用cv::imshow将其可视化。

在每个节点中,将正在发送的消息或已接收的消息的地址都写到图像中,水印信息和图像可视化节点被设计为修改图像而不复制图像,因此,只要节点处于相同的进程中,并且图相保持在如上所述的流程中,打印在图像上的地址就应该是相同的。运行节点如图

这里可以通过按空格键暂停图像的渲染,然后再次按空格键继续渲染。您也可以按q或ESC退出。如果暂停图像查看器,应该能够比较图像上写入的地址,并查看它们是否相同。

具有两个图像可视化的流程

这个例子有两个图像可视化节点,所有节点仍在同一进程中,但现在应该会显示两个图像可视化窗口。

与上一个实例一样,可以使用空格键暂停渲染,然后再次按空格键继续。这样停止查看更新答应到屏幕上的指针是否有变化。正如您在上面的示例图像中看到的,我们有一个图像,所有指针都相同,然后有另一个图像具有与第一个图像相同的指针,但第二个图像上的最后一个指针不同。要理解为什么会发生这种情况,请从graph’s topology方面思考:

camera_node -> watermark_node -> image_view_node-> image_view_node2

camera_node和watermark_node之间的链接可以使用相同的指针,而无需复制,因为只有一个进程内订阅应该向其传递消息。但是对于watermark_node和两个图像可视化节点之间的链接,关系是一对多的,因此如果图像可视化节点使用unique_ptr回调,则不可能将同一指针的所有权传递给这两个节点。然而,它可以传递给其中一个。哪一个将获得原始指针并没有定义,而只是最后一个被传递。

注意,图像可视化节点未订阅unique_ptr回调。相反使用const shared_ptrs订阅。这意味着系统向两个回调传递相同的shared_ptr。处理第一个进程内订阅时,内部存储的unique_ptr将升级为shared_ptr。每个回调将接收同一消息的共享所有权。

带有进程间可视化的流程

另一件重要的事情是,在进行进程间订阅时,避免进程内零拷贝行为的中断,为了测试这一点,可以运行第一个图像流程示例image_pipeline_all_in_one,然后运行一个独立的image_view_node实例。运行起来的结果如下

很难同时暂停两个图像,因此图像可能不会对齐,但需要注意的是,image_pipeline_all_in_one图像视图显示了每个步骤的相同地址。这意味着即使订阅了外部视图,也会保留进程内零拷贝。您还可以看到,进程间图像视图的前两行文本的进程ID和第三行文本中独立图像查看器的进程是ID不同。

参考文献:

https://docs.ros.org/en/galactic/Tutorials/Demos/Intra-Process-Communication.html

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2022-11-22,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 点云PCL 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
容器服务
腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档