前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >【机器人】ROS1工程案例:服务和动作

【机器人】ROS1工程案例:服务和动作

作者头像
杨丝儿
发布2022-02-28 15:20:56
2250
发布2022-02-28 15:20:56
举报
文章被收录于专栏:杨丝儿的小站杨丝儿的小站

我们这一部分模拟一个低能的阶乘计算器,来比较Service和Action的差异。

碎碎念:Hello米娜桑,这里是英国留学中的杨丝儿。我的博客技术点集中在机器人、人工智能可解释性、数学、物理等等,感兴趣地点个关注吧,持续高质量输出中。

⭐️案例目标

说明以下问题

  • ROS中message,action,service的定义方式大同小异。
  • 在消息作为一等公民的ROS软件中,灵活使用面向对象思想。
  • 文件的命名问题,很多文件的命名是很有讲究的,节点名也是。
  • ROS使用大量多线程并行的思想,需要理解并行与并发、python的主锁以及阻塞和上锁的区别。

⭐️准备工作

  • 基础步骤参见:【机器人】ROS程序框架:架构部分
  • 创建factorial_calculator包

⭐️服务部分

✨定义文件

创建一个名为FactorialService.srv,定义输入和输出

代码语言:javascript
复制
uint32 num_in
--- 
uint32 num_out

取消package.xml中的注释

添加信息到CMakeLists.txt

代码语言:javascript
复制
find_package(catkin REQUIRED COMPONENTS
  rospy
  std_msgs # 新增
  message_generation # 新增
)
代码语言:javascript
复制
# Generate services in the 'srv' folder
add_service_files(
  FILES
  FactorialService.srv # 新增
)

取消CMakeLists.txt中的注释

使用$catkin_make编译

✨使用服务

客户端factorial_service_client.py代码

代码语言:javascript
复制
#!/usr/bin/env python3

import rospy
from factorial_calculator.srv import FactorialService 

class FactorialServiceClient():
    def __init__(self):
        rospy.init_node('service_client_factorial')
        rospy.wait_for_service('service/server/factorial/service')
        self.factorise = rospy.ServiceProxy('service/server/factorial/service', FactorialService)
    
    def call(self,num):
        return self.factorise(num).num_out


if __name__ == '__main__':
    client = FactorialServiceClient()
    while input_num := input("Input a num: "):
        if not input_num.isdigit():
            continue
        input_num = int(input_num)
        result = client.call(input_num)
        print(f"Result is: {result}")

服务端factorial_service_server.py代码

代码语言:javascript
复制
#!/usr/bin/env python3

import rospy
from factorial_calculator.srv import FactorialService,FactorialServiceResponse

class FactorialServiceServer():
    def __init__(self):
        rospy.init_node('service_server_factorial')

    def factorise(self,num):
        result = 1
        for i in range(1,num+1):
            result *= i
            rospy.sleep(0.2)
            print(result)
        return result
    
    def run(self):
        service = rospy.Service('service/server/factorial/service', FactorialService, lambda x : FactorialServiceResponse(self.factorise(x.num_in)))
        rospy.spin()
        
if __name__ == '__main__':
    server = FactorialServiceServer()
    server.run()

✨效果

⭐️动作部分

✨定义文件

创建一个名为FactorialAction.action

代码语言:javascript
复制
uint32 num_in
---
uint32 num_process
--- 
uint32 num_out

package.xml添加信息

代码语言:javascript
复制
<build_depend>actionlib_msgs</build_depend>
<exec_depend>actionlib_msgs</exec_depend>

CMakeLists.txt

代码语言:javascript
复制
# Generate actions in the 'action' folder
add_action_files(
  FILES
  FactorialAction.action # 新增
)

添加actionlib_msgs依赖 find_package(catkin REQUIRED COMPONENTS rospy std_msgs actionlib_msgs # 新增 message_generation ) generate_messages( DEPENDENCIES std_msgs # Or other packages containing msgs actionlib_msgs # 新增 ) catkin_package( # INCLUDE_DIRS include # LIBRARIES factorial_calculator # CATKIN_DEPENDS rospy # DEPENDS system_lib actionlib_msgs # 新增 )

$catkin_make

✨使用服务

客户端factorial_action_client.py代码

代码语言:javascript
复制
#!/usr/bin/env python3

import rospy

import actionlib
from factorial_calculator.msg import FactorialActionAction,FactorialActionGoal,FactorialActionResult

class FactorialActionClient():
    def __init__(self):
        # 初始化节点
        rospy.init_node('action_client_factorial')
        
        # 定义动作客户端
        self.client = actionlib.SimpleActionClient('action/server/factorial/action',
                                                   FactorialActionAction)
        self.client.wait_for_server()
          
    def feedback_callback(self,feedback):
        if int(feedback)>100000:
            self.client.cancel_goal()
        print('Feedback:', feedback)
    
    def call(self,num):
        # 定义goal
        goal = FactorialActionGoal()
        goal.num_goal = num
        
        # 发送goal
        self.client.send_goal(goal, feedback_cb=lambda x : self.feedback_callback(x.num_feedback))
        
        # 阻塞等待结果,一般是不会这样使用的
        # 这是多线程中的阻塞步骤
        self.client.wait_for_result()
        return self.client.get_result().num_result


if __name__ == '__main__':
    client = FactorialActionClient()

    # 循环读取终端输入并上传动作服务器执行。
    while input_num := input("Input a num: "):
        if not input_num.isdigit():
            continue
        input_num = int(input_num)
        result = client.call(input_num)
        print(f"Result is: {result}")

服务端factorial_action_server.py代码

代码语言:javascript
复制
#!/usr/bin/env python3

import rospy

import actionlib
from factorial_calculator.msg import FactorialActionAction,FactorialActionGoal,FactorialActionResult,FactorialActionFeedback

class FactorialActionServer():
    def __init__(self):
        rospy.init_node('action_server_factorial')
        self.server = actionlib.SimpleActionServer('action/server/factorial/action', 
                                            FactorialActionAction,
                                            lambda x : self.factorise(x.num_goal),
                                            auto_start=False)

    def factorise(self,num):
        result = 1
        
        if num > 14:
            self.server.set_aborted(FactorialActionResult(result),'[aborted]')
            return
        
        for i in range(1,num+1):
            
            if self.server.is_preempt_requested():
                self.server.set_preempted(FactorialActionResult(result),'[preempted]')
                
            result *= i
            rospy.sleep(0.2)
            self.server.publish_feedback(FactorialActionFeedback(result))
            
        self.server.set_succeeded(FactorialActionResult(result))
        return 
    
    def run(self):
        self.server.start()
        rospy.spin()
        
if __name__ == '__main__':
    server = FactorialActionServer()
    server.run()

✨效果

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2021-12-17,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • ⭐️案例目标
  • ⭐️准备工作
  • ⭐️服务部分
    • ✨定义文件
      • ✨使用服务
        • ✨效果
        • ⭐️动作部分
          • ✨定义文件
            • ✨使用服务
              • ✨效果
              领券
              问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档