Posted by @Narayan-007:
Hi, I am using ROS2 Humble Ubuntu 22.04 and trying to integrate my navigation-2 enabled physical robot, Originbot, with the RMF software. However, I am facing some difficulties in doing so. When I launch and run all the necessary files, everything seems to be working ok except for my fleet_adapter.py. How do I go about fixing it? I am getting 2 main errors. One of them is the following below:
Coordinate transformation error: 40041472014.35689
RMF to Robot transform:
rotation:0.1669748345397551
scale:22436.581919543834
trans:[-144220.278841231, 26674.508565925025]
Robot to RMF transform:
rotation:-0.16697483453975506
scale:6.903932766499679e-06
trans:[5.659233432489587, -5.09914031887319]
Another error is that the RMF system is not able to identify the physical robot, meaning it does not recognise whether the robot is there or not. I have launched and run the following codes (an adaptation of the code from the Robot Fleet Management in TheConstruct, specifically unit 6):
rmf_to_nav.py:
#! /usr/bin/env python3
from tf_transformations import quaternion_from_euler
from tf_transformations import euler_from_quaternion
from rclpy.qos import ReliabilityPolicy, QoSProfile, HistoryPolicy, DurabilityPolicy
from rmf_fleet_msgs.msg import PathRequest
from rmf_fleet_msgs.msg import Location
from sensor_msgs.msg import BatteryState
import rclpy
from rclpy.action import ActionClient
from rclpy.node import Node
from nav2_msgs.action import NavigateToPose
from geometry_msgs.msg import PoseStamped, Pose, Quaternion
from geometry_msgs.msg import PoseWithCovarianceStamped
from rclpy.executors import MultiThreadedExecutor
from rclpy.callback_groups import MutuallyExclusiveCallbackGroup
from rmf_fleet_msgs.msg import RobotState, RobotMode
import rmf_fleet_msgs
"""
NavigateToPose.action
# Error codes
# Note: The expected priority order of the errors should match the message order
uint16 NONE=0
# goal definition
geometry_msgs/PoseStamped pose
string behavior_tree
---
# result definition
std_msgs/Empty result
uint16 error_code
---
# feedback definition
geometry_msgs/PoseStamped current_pose
builtin_interfaces/Duration navigation_time
builtin_interfaces/Duration estimated_time_remaining
int16 number_of_recoveries
float32 distance_remaining
"""
class RmfToNav(Node):
def __init__(self, name="originbot_1", timer_period=0.1, timer_robot_state_pub_period=0.5):
super().__init__('nav_to_pose_action_client')
self.moving_to_waypoint = False
self.pos1 = True
self.robot_path_request_list = []
self.current_task_id = ""
self.current_location = Location()
self.current_level_name = "L1"
self.current_obey_approach_speed_limit = False
self.current_approach_speed_limit = 0.0
self.current_waypoint_location = None
self.current_battery_percent = 100.0
self.translation_x = 0.0
self.translation_y = 0.0
self._timer_period = timer_period
self._state_pub_period = timer_robot_state_pub_period
# Load parameters
self.declare_parameter('barista_name', "originbot_1")
self.declare_parameter('fleet_name', "originbot")
self.getting_params()
action_server_name = "/"+self.robot_name+"/navigate_to_pose"
self._action_client = ActionClient(
self, NavigateToPose, action_server_name)
self.group3 = MutuallyExclusiveCallbackGroup()
self.timer = self.create_timer(
self._timer_period,
self.timer_callback,
callback_group=self.group3)
self.init_subscribers()
self.init_publishers()
self.get_logger().info('Client ready...')
def getting_params(self):
self.robot_name = self.get_parameter(
'barista_name').get_parameter_value().string_value
self.get_logger().info("############### robot_name ==="+str(self.robot_name))
self.fleet_name = self.get_parameter(
'fleet_name').get_parameter_value().string_value
self.get_logger().info("############### fleet_name ==="+str(self.fleet_name))
self.get_logger().info("############### translation_x ==="+str(self.translation_x))
self.get_logger().info("############### translation_y ==="+str(self.translation_y))
def init_subscribers(self):
self.group1 = MutuallyExclusiveCallbackGroup()
self.group2 = MutuallyExclusiveCallbackGroup()
self.group_amcl = MutuallyExclusiveCallbackGroup()
# Init Subscribers
battery_state_topic = "/" + self.robot_name + "/" + 'battery_state'
self.create_subscription(
BatteryState,
battery_state_topic,
self.bat_state_listener_callback,
QoSProfile(depth=1, reliability=ReliabilityPolicy.BEST_EFFORT),
callback_group=self.group1)
robot_request_topic = "/robot_path_requests"
self.create_subscription(
PathRequest,
robot_request_topic,
self.robot_req_listener_callback,
QoSProfile(depth=1, reliability=ReliabilityPolicy.RELIABLE),
callback_group=self.group2)
amcl_pose_topic = "/" + self.robot_name + "/amcl_pose"
self.create_subscription(
PoseWithCovarianceStamped,
amcl_pose_topic,
self.amcl_pose_listener_callback,
QoSProfile(depth=1,
reliability=ReliabilityPolicy.RELIABLE,
history=HistoryPolicy.KEEP_LAST,
durability=DurabilityPolicy.TRANSIENT_LOCAL
),
callback_group=self.group_amcl)
self.get_logger().info("Create sub for amcl ="+str(amcl_pose_topic))
def amcl_pose_listener_callback(self, msg):
"""
### geometry_msgs/msg/PoseWithCovarianceStamped
# This expresses an estimated pose with a reference coordinate frame and timestamp
std_msgs/Header header
builtin_interfaces/Time stamp
int32 sec
uint32 nanosec
string frame_id
PoseWithCovariance pose
Pose pose
Point position
float64 x
float64 y
float64 z
Quaternion orientation
float64 x 0
float64 y 0
float64 z 0
float64 w 1
float64[36] covariance
"""
self.current_location = self.pose_to_location(msg.pose.pose)
self.get_logger().info("Updated Robot POSE ="+str(self.current_location))
def pose_to_location(self, in_pose):
"""
Pose pose
Point position
float64 x
float64 y
float64 z
Quaternion orientation
float64 x 0
float64 y 0
float64 z 0
float64 w 1
float64[36] covariance
"""
"""
location:
t:
sec: 40
nanosec: 576999999
x: 9.214224815368652
y: -9.836607933044434
yaw: 0.0066444906406104565
obey_approach_speed_limit: false
approach_speed_limit: 0.0
level_name: L1
index: 0
"""
out_location = Location()
try:
# TODO: COMMENTES TO GO STEP BY STEP
out_location.x = in_pose.position.x - self.translation_x
out_location.y = in_pose.position.y - self.translation_y
except Exception as ex:
self.get_logger().error('EXCEPTION='+str(ex))
q = (in_pose.orientation.x, in_pose.orientation.y,
in_pose.orientation.z, in_pose.orientation.w)
roll, pitch, yaw = euler_from_quaternion(q)
out_location.yaw = yaw
out_location.obey_approach_speed_limit = self.current_obey_approach_speed_limit
out_location.approach_speed_limit = self.current_approach_speed_limit
out_location.level_name = self.current_level_name
return out_location
def init_publishers(self):
self.robot_state_msg = RobotState()
self.robot_request_seq = 0
name_robot_state_topic = "/robot_state"
self.robot_state_msg_pub = self.create_publisher(
RobotState, name_robot_state_topic, 1)
self.group4 = MutuallyExclusiveCallbackGroup()
self.timer_state_pub = self.create_timer(
self._state_pub_period,
self.robot_state_pub_callback,
callback_group=self.group3)
def robot_state_pub_callback(self):
"""
Robot State def: https://github.com/open-rmf/rmf_internal_msgs/blob/main/rmf_fleet_msgs/msg/RobotState.msg
string name
string model
# task_id is copied in from the most recent Request message,
# such as ModeRequest, DestinationRequest, or PathRequest
string task_id
# The sequence number of this message. Every new message should increment the
# sequence number by 1.
uint64 seq
RobotMode mode
float32 battery_percent
Location location
Location[] path
# RobotMode.msg: https://github.com/open-rmf/rmf_internal_msgs/blob/main/rmf_fleet_msgs/msg/RobotMode.msg
uint32 mode
uint32 MODE_IDLE=0
uint32 MODE_CHARGING=1
uint32 MODE_MOVING=2
uint32 MODE_PAUSED=3
uint32 MODE_WAITING=4
uint32 MODE_EMERGENCY=5
uint32 MODE_GOING_HOME=6
uint32 MODE_DOCKING=7
# Use this when a command received from the fleet adapter
# has a problem and needs to be recomputed.
uint32 MODE_ADAPTER_ERROR=8
uint32 MODE_CLEANING=9
uint64 mode_request_id
# How to use enums?
rmf_fleet_msgs.msg.RobotMode.MODE_IDLE
rmf_fleet_msgs.msg.RobotMode.MODE_MOVING
"""
new_robot_state = RobotState()
# We get the latest data
new_robot_state.name = self.robot_name
new_robot_state.model = self.fleet_name
new_robot_state.task_id = self.current_task_id
new_robot_state.seq = self.robot_request_seq
self.robot_request_seq += 1
# Robot Mode, if moving or not for the moment
# TODO: Add charging mode when detecting charge
new_mode = RobotMode()
if self.moving_to_waypoint:
new_mode.mode = rmf_fleet_msgs.msg.RobotMode.MODE_MOVING
else:
new_mode.mode = rmf_fleet_msgs.msg.RobotMode.MODE_IDLE
new_robot_state.mode = new_mode
# Battery percent
new_robot_state.battery_percent = self.current_battery_percent
# Robot Current Pose, where the robot think it is
new_robot_state.location = self.current_location
# Current Waypoint that is being followed.
# Empty if robot is IDDLE
if self.current_waypoint_location:
new_robot_state.path.append(self.current_waypoint_location)
else:
new_robot_state.path = []
self.robot_state_msg_pub.publish(new_robot_state)
def robot_req_listener_callback(self, msg):
self.get_logger().info("Evaluating if Request if for us " +
str(self.fleet_name)+","+self.robot_name)
if msg.fleet_name == self.fleet_name and msg.robot_name == self.robot_name:
self.get_logger().info("I received New Robot Path Request: "+str(msg.path))
self.robot_path_request_list = msg.path
self.current_task_id = msg.task_id
self.get_logger().info("NEW WAYPOINTS TO FOLLOW RECIEVED!")
else:
self.get_logger().info("Robot Request not for this robot")
self.get_logger().info("msg.fleet_name="+str(msg.fleet_name) +
"//"+"self.fleet_name="+str(self.fleet_name))
self.get_logger().info("msg.robot_name="+str(msg.robot_name) +
"//"+"self.robot_name="+str(self.robot_name))
def bat_state_listener_callback(self, msg):
"""
We multiply by 100 because teh custom adapter reads in 100%
not in 1 percent
"""
self.current_battery_percent = msg.percentage * 100.0
self.get_logger().debug('I received as Battery State: "%s"' %
str(self.current_battery_percent))
def send_goal(self, new_pose):
self.get_logger().info('Send Goal Start...')
goal_msg = NavigateToPose.Goal()
goal_msg.pose = PoseStamped()
goal_msg.pose.header.stamp = self.get_clock().now().to_msg()
goal_msg.pose.header.frame_id = "map"
goal_msg.pose.pose = new_pose
self.get_logger().info('Waiting for server...')
self._action_client.wait_for_server()
self.get_logger().info('Waiting for server...DONE')
self.moving_to_waypoint = True
self._send_goal_future = self._action_client.send_goal_async(
goal_msg, feedback_callback=self.feedback_callback)
self._send_goal_future.add_done_callback(self.goal_response_callback)
self.get_logger().info('Send Goal Done...')
def goal_response_callback(self, future):
goal_handle = future.result()
if not goal_handle.accepted:
self.get_logger().info('Goal rejected :(')
self.moving_to_waypoint = False
return
self.get_logger().info('Goal accepted :)')
self._get_result_future = goal_handle.get_result_async()
self._get_result_future.add_done_callback(self.get_result_callback)
def get_result_callback(self, future):
result = future.result().result
self.get_logger().info('Result: {0}'.format(result))
self.moving_to_waypoint = False
self.clean_locations()
def clean_locations(self):
self.current_waypoint_location = None
def feedback_callback(self, feedback_msg):
feedback = feedback_msg.feedback
# self.get_logger().info(
# 'Received feedback: {0}'.format(feedback.current_pose.pose.position))
def timer_callback(self):
# And if we are currently processing one
if not self.moving_to_waypoint:
# We check if we have waypoints to be sent
if len(self.robot_path_request_list) > 0:
# We send the next waypoint, an dremove it from the list
new_pose = Pose()
self.current_waypoint_location = self.robot_path_request_list.pop(
0)
# Location ALWAYS is in RMF coordinates
# BUT TH EPOSE WE HAVE TO SEND HAS TO BE IN MAP NAV COORDINATES
try:
new_pose.position.x = self.current_waypoint_location.x + self.translation_x
new_pose.position.y = self.current_waypoint_location.y + self.translation_y
except Exception as ex:
self.get_logger().error('EXCEPTION2='+str(ex))
new_pose.orientation = self.yaw_to_quat_msg(
self.current_waypoint_location.yaw)
self.current_obey_approach_speed_limit = self.current_waypoint_location.obey_approach_speed_limit
self.current_approach_speed_limit = self.current_waypoint_location.approach_speed_limit
self.current_level_name = self.current_waypoint_location.level_name
# self.get_logger().info("SENDING WAYPOINT...")
self.send_goal(new_pose)
else:
pass
# self.get_logger().info("NO WAYPOINTS...")
else:
self.get_logger().info("WAITING FOR GOAL TO BE REACHED....")
def yaw_to_quat_msg(self, yaw):
quaternion_v = quaternion_from_euler(0, 0, yaw)
q_msg = Quaternion()
q_msg.x = quaternion_v[0]
q_msg.y = quaternion_v[1]
q_msg.z = quaternion_v[2]
q_msg.w = quaternion_v[3]
return q_msg
def main(args=None):
rclpy.init(args=args)
try:
action_client = RmfToNav()
num_threads = 6
executor = MultiThreadedExecutor(num_threads=num_threads)
executor.add_node(action_client)
try:
executor.spin()
finally:
executor.shutdown()
action_client.destroy_node()
finally:
rclpy.shutdown()
if __name__ == '__main__':
main()
robot_api_server_originbot.py:
#! /usr/bin/env python3
import sys
import math
import threading
import rclpy
import uvicorn
from fastapi import FastAPI
from rclpy.node import Node
from rclpy.qos import qos_profile_system_default
from rclpy.qos import ReliabilityPolicy, QoSProfile
from rmf_fleet_msgs.msg import RobotState
from rmf_fleet_msgs.msg import PathRequest
from rmf_fleet_msgs.msg import Location
from std_msgs.msg import String
from pydantic import BaseModel
from typing import Optional
from rmf_fleet_msgs.msg import RobotMode
app = FastAPI(title="ROS2 Robot Rest API",
description="This is the API for a ROS2 Fleet of robots using RMF",
version="1.0.0",)
# CLASSES FOR THE API RETURN AND REQUEST
class Response(BaseModel):
data: Optional[dict] = None
success: bool
msg: str
class Request(BaseModel):
map_name: str
task: Optional[str] = None
destination: Optional[dict] = None
data: Optional[dict] = None
speed_limit: Optional[float] = None
#######
###
# MAIN CLASS
class SimpleRobotApiServer(Node):
def __init__(self, fleet_name, robot_in_fleet_list, linear_vel, angular_vel, timer_period=0.1):
print("Init Start Complete Robot Api server for "+str(fleet_name))
self._fleet_name = fleet_name
self._robot_in_fleet_list = robot_in_fleet_list
self._linear_vel = linear_vel
self._angular_vel = angular_vel
self._timer_update_period = timer_period
super().__init__(self._fleet_name+'_simple_robot_api_server_node')
# INIT Section
self.init_vars()
self.init_subscribers()
self.init_publishers()
################################
# APP REST API METHOD DEFINITION
################################
# Check Connection
@app.get('/'+self._fleet_name+'/check_connection/',
response_model=Response)
async def check_connection():
"""
Return True if connection to the robot API server is successful:
"""
message = "Connection to fleet "+self._fleet_name+" OK"
data = {'success': True, 'msg': message}
return data
@app.get('/'+self._fleet_name+'/status/',
response_model=Response)
async def status(robot_name: Optional[str] = None):
data = {'data': {},
'success': False,
'msg': ''}
state = self.robots_fleet.get(robot_name)
if state is None:
return data
else:
data['data'] = self.get_robot_state(state, robot_name)
data['success'] = True
return data
@app.post('/'+self._fleet_name+'/navigate/',
response_model=Response)
async def navigate(robot_name: str, dest: Request):
data = {'success': False, 'msg': ''}
if robot_name not in self.robots_fleet:
data = {'success': False, 'msg': "Robot " +
str(robot_name)+" not in fleet "+str(self._fleet_name)}
return data
elif len(dest.destination) < 1:
data = {'success': False, 'msg': 'Destination empty=' +
str(dest.destination)+"="}
return data
else:
target_x = dest.destination['x']
target_y = dest.destination['y']
target_yaw = dest.destination['yaw']
result_send_path_request = self.send_path_request(
target_x, target_y, target_yaw, robot_name)
data['success'] = result_send_path_request
data['msg'] = "Teleport Nav Goal sent"
return data
@app.post('/'+self._fleet_name+'/start_task/',
response_model=Response)
async def start_process(robot_name: str, task: Request):
data = {'success': False, 'msg': ''}
if robot_name not in self.robots_fleet:
data['msg'] = "Robot " + \
str(robot_name)+" not in fleet "+str(self._fleet_name)
return data
elif len(task.task) < 1:
data['msg'] = 'Task empty=' + str(task.task)+"="
return data
else:
process_name = task.task
if "pick" in process_name:
self.get_logger().info('Starting pick task..')
self.publish_task_done(
object_picked_name="cube_rubish", robot_name=robot_name)
self.get_logger().info('Starting pick task..DONE')
else:
self.get_logger().info('Process not supported by RobotHWI ='+str(process_name))
self.get_logger().info('Process Done...')
data['success'] = True
return data
@app.get('/'+self._fleet_name+'/stop_robot/',
response_model=Response)
async def stop(robot_name: str):
data = {'success': False, 'msg': ''}
if robot_name not in self.robots_fleet:
data['msg'] = "Robot " + \
str(robot_name)+" not in fleet "+str(self._fleet_name)
return data
self.send_stop_request(robot_name)
data['success'] = True
return data
#################################
self.get_logger().info('Init DONE')
def init_vars(self):
"""
We initialise all the variables that we will use to store the state
and commands issued
"""
# We create the var robots_fleet, to store their current state
none_list = len(self._robot_in_fleet_list) * [None]
self.robots_fleet = dict(zip(self._robot_in_fleet_list, none_list))
# Init Fleet Vehicles Traits
self.vehicle_traits_linear_nominal_velocity = self._linear_vel
self.vehicle_traits_rotational_nominal_velocity = self._angular_vel
# We init the dictionary that states if robot has a pending task
# Example: {"box_bot_1":None,"box_bot_2":32}, box_bot_1 has nothing pending
# box_bot_2 has 32 task pending.
none_list = len(self._robot_in_fleet_list) * [None]
self.robot_pending_tasks = dict(
zip(self._robot_in_fleet_list, none_list))
# We need a list of each of the task id's given to the robots
# The reason is taht we can't repeat task ids in each robot.
task_id_list = len(self._robot_in_fleet_list) * [0]
self.robot_next_task_id = dict(
zip(self._robot_in_fleet_list, task_id_list))
# We need to initialise also the data from estimated duration and completed task
dest_duration_list = len(self._robot_in_fleet_list) * [0.0]
self.robot_dest_duration = dict(
zip(self._robot_in_fleet_list, dest_duration_list))
def init_subscribers(self):
# Init Subscribers
robot_state_topic = "/robot_state"
self.create_subscription(
RobotState,
robot_state_topic,
self.robot_state_callback,
QoSProfile(depth=10, reliability=ReliabilityPolicy.RELIABLE))
self.get_logger().info("init_subscribers DONE")
def robot_state_callback(self, msg):
if msg.name in self.robots_fleet:
# We save the state of the robot
self.robots_fleet[msg.name] = msg
# self.get_logger().info(str(msg.name)+"==robot_state_callback, update pos=" +
# str(self.robots_fleet[msg.name].location)+", bat="+str(self.robots_fleet[msg.name].battery_percent))
self.update_pending_tasks(msg.name, msg.mode.mode)
else:
self.get_logger().error(str(msg.name)+"==robot_state_callback, Robot Not in fleet==" +
str(msg.name)+","+str(self.robots_fleet))
def update_pending_tasks(self, robot_name, robot_mode):
pending_task = self.robot_pending_tasks[robot_name]
self.get_logger().info(str(robot_name)+",pending=>"+str(pending_task) +
"=== robot_mode=>"+str(robot_mode)+"<==")
# We check if the robot is Iddle, which means that it has finished that task
if robot_mode == RobotMode.MODE_IDLE:
# We remove that task form the list of that robot
self.robot_pending_tasks[robot_name] = None
# We set duration to 0.0
self.robot_dest_duration[robot_name] = 0.0
else:
self.get_logger().info(str(robot_name)+",MOVING STILL")
def robot_completed_request(self, robot_name):
return self.robot_pending_tasks[robot_name] is None
def init_publishers(self):
self.robot_request_seq = 0
task_done_topic = "/task_done"
self.task_done_pub = self.create_publisher(
String, task_done_topic, 1)
self.path_pub = self.create_publisher(
PathRequest,
'/robot_path_requests',
qos_profile=qos_profile_system_default)
def get_robot_state(self, state, robot_name):
"""
We format from the following message: RobotState
"""
data = {}
position = [state.location.x, state.location.y]
angle = state.location.yaw
data['robot_name'] = robot_name
data['map_name'] = state.location.level_name
data['position'] =\
{'x': position[0], 'y': position[1], 'yaw': angle}
data['battery'] = state.battery_percent
# This is hardcoded because its teleporting
data['destination_arrival_duration'] = self.robot_dest_duration[robot_name]
data['completed_nav_request'] = self.robot_completed_request(
robot_name)
# TODO: We supose all the tasks are finished inmediatelly when executed
data['completed_task_request'] = True
return data
def publish_task_done(self, object_picked_name, robot_name):
task_done_msg = String()
task_done_msg.data = str(robot_name) + \
">>Task Done: "+str(object_picked_name)
self.task_done_pub.publish(task_done_msg)
def send_path_request(self, in_x, in_y, in_yaw, robot_name, map_name="L1", speed_limit=100.0):
#######
target_x = in_x
target_y = in_y
target_yaw = in_yaw
target_map = map_name
target_speed_limit = speed_limit
t = self.get_clock().now().to_msg()
path_request = PathRequest()
robot_state = self.robots_fleet[robot_name]
cur_x = robot_state.location.x
cur_y = robot_state.location.y
cur_yaw = robot_state.location.yaw
cur_loc = robot_state.location
path_request.path.append(cur_loc)
# We calculate the estimated duration of this path
# Based on current_location and destination_location
disp = self.disp([target_x, target_y], [cur_x, cur_y])
duration = int(disp/self.vehicle_traits_linear_nominal_velocity) +\
int(abs(abs(cur_yaw) - abs(target_yaw)) /
self.vehicle_traits_rotational_nominal_velocity)
# We store this duration in dict
self.robot_dest_duration[robot_name] = duration
t.sec = t.sec + duration
target_loc = Location()
target_loc.t = t
target_loc.x = target_x
target_loc.y = target_y
target_loc.yaw = target_yaw
target_loc.level_name = target_map
if target_speed_limit > 0:
target_loc.obey_approach_speed_limit = True
target_loc.approach_speed_limit = target_speed_limit
path_request.fleet_name = self._fleet_name
path_request.robot_name = robot_name
path_request.path.append(target_loc)
# We set a New CMD_id
cmd_id = self.generate_task_robot(robot_name)
path_request.task_id = str(cmd_id)
self.path_pub.publish(path_request)
self.get_logger().info(
f'Sending path request for {robot_name}: {cmd_id}')
return True
def send_stop_request(self, robot_name):
robot_state = self.robots_fleet[robot_name]
path_request = PathRequest()
path_request.fleet_name = self._fleet_name
path_request.robot_name = robot_name
path_request.path = []
# Appending the current location twice will effectively tell the
# robot to stop
path_request.path.append(robot_state.location)
path_request.path.append(robot_state.location)
# We set the cmd_id
cmd_id = self.generate_task_robot(robot_name)
path_request.task_id = str(cmd_id)
self.path_pub.publish(path_request)
self.get_logger().info(
f'Sending stop request for {robot_name}: {cmd_id}')
return True
def generate_task_robot(self, robot_name):
"""
We generate the task id
We assign it to the pensidng task id of the robot
We Generate the next one
"""
next_id = self.robot_next_task_id[robot_name]
self.robot_pending_tasks[robot_name] = next_id
self.robot_next_task_id[robot_name] += 1
return self.robot_pending_tasks[robot_name]
def disp(self, A, B):
"""
Disparity Between two 2D points
"""
return math.sqrt((A[0]-B[0])**2 + (A[1]-B[1])**2)
def main(argv=None):
# Init rclpy and adapter
rclpy.init(args=argv)
fleet_name = "originbot"
robots_in_fleet_list = ["originbot_1"]
linear_vel = 0.5
angular_vel = 0.6
robot_api_server_obj = SimpleRobotApiServer(
fleet_name, robots_in_fleet_list, linear_vel, angular_vel)
spin_thread = threading.Thread(
target=rclpy.spin, args=(robot_api_server_obj,))
spin_thread.start()
uvicorn.run(app,
host='127.0.0.1',
port='8000',
log_level='warning')
if __name__ == '__main__':
main(sys.argv)
RobotClientAPI.py:
# Copyright 2021 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
'''
The RobotAPI class is a wrapper for API calls to the robot. Here users
are expected to fill up the implementations of functions which will be used
by the RobotCommandHandle. For example, if your robot has a REST API, you
will need to make http request calls to the appropriate endpoints within
these functions.
'''
import requests
from urllib.error import HTTPError
class RobotAPI:
# The constructor below accepts parameters typically required to submit
# http requests. Users should modify the constructor as per the
# requirements of their robot's API
def __init__(self, prefix: str, user: str, password: str):
self.prefix = prefix
self.user = user
self.password = password
self.timeout = 5.0
self.debug = False
def check_connection(self):
''' Return True if connection to the robot API server is successful'''
if self.data() is None:
return False
return True
def position(self, robot_name: str):
''' Return [x, y, theta] expressed in the robot's coordinate frame or
None if any errors are encountered'''
response = self.data(robot_name)
if response is not None:
if self.debug:
print(f'Response: {response}')
if not response['success']:
print(f'Response for {robot_name} was not successful')
return None
position = response['data']['position']
x = position['x']
y = position['y']
angle = position['yaw']
return [x, y, angle]
print(f'No response received for {robot_name}')
return None
def navigate(self,
robot_name: str,
cmd_id: int,
pose,
map_name: str,
speed_limit=0.0):
''' Request the robot to navigate to pose:[x,y,theta] where x, y and
and theta are in the robot's coordinate convention. This function
should return True if the robot has accepted the request,
else False'''
assert(len(pose) > 2)
url = self.prefix +\
f'/open-rmf/rmf_demos_fm/navigate/?robot_name={robot_name}' \
f'&cmd_id={cmd_id}'
data = {} # data fields: task, map_name, destination{}, data{}
data['map_name'] = map_name
data['destination'] = {'x': pose[0], 'y': pose[1], 'yaw': pose[2]}
data['speed_limit'] = speed_limit
try:
response = requests.post(url, timeout=self.timeout, json=data)
response.raise_for_status()
if self.debug:
print(f'Response: {response.json()}')
return response.json()['success']
except HTTPError as http_err:
print(f'HTTP error: {http_err}')
except Exception as err:
print(f'Other error: {err}')
return False
def start_process(self,
robot_name: str,
cmd_id: int,
process: str,
map_name: str):
''' Request the robot to begin a process. This is specific to the robot
and the use case. For example, load/unload a cart for Deliverybot
or begin cleaning a zone for a cleaning robot.
Return True if the robot has accepted the request, else False'''
url = self.prefix +\
f"/open-rmf/rmf_demos_fm/start_task?robot_name={robot_name}" \
f"&cmd_id={cmd_id}"
# data fields: task, map_name, destination{}, data{}
data = {'task': process, 'map_name': map_name}
try:
response = requests.post(url, timeout=self.timeout, json=data)
response.raise_for_status()
if self.debug:
print(f'Response: {response.json()}')
return response.json()['success']
except HTTPError as http_err:
print(f'HTTP error: {http_err}')
except Exception as err:
print(f'Other error: {err}')
return False
def stop(self, robot_name: str, cmd_id: int):
''' Command the robot to stop.
Return True if robot has successfully stopped. Else False'''
url = self.prefix +\
f'/open-rmf/rmf_demos_fm/stop_robot?robot_name={robot_name}' \
f'&cmd_id={cmd_id}'
try:
response = requests.get(url, self.timeout)
response.raise_for_status()
if self.debug:
print(f'Response: {response.json()}')
return response.json()['success']
except HTTPError as http_err:
print(f'HTTP error: {http_err}')
except Exception as err:
print(f'Other error: {err}')
return False
def navigation_remaining_duration(self, robot_name: str, cmd_id: int):
''' Return the number of seconds remaining for the robot to reach its
destination'''
response = self.data(robot_name)
if response is None:
return None
if response is not None:
arrival = response['data'].get('destination_arrival')
if arrival is not None:
if arrival.get('cmd_id') != cmd_id:
return None
return arrival.get('duration')
else:
return None
else:
return None
def navigation_completed(self, robot_name: str, cmd_id: int):
''' Return True if the last request the robot successfully completed
matches cmd_id. Else False.'''
response = self.data(robot_name)
if response is not None:
data = response.get('data')
if data is not None:
return data['last_completed_request'] == cmd_id
return False
def process_completed(self, robot_name: str, cmd_id: int):
''' Return True if the robot has successfully completed its previous
process request. Else False.'''
return self.navigation_completed(robot_name, cmd_id)
def battery_soc(self, robot_name: str):
''' Return the state of charge of the robot as a value between 0.0
and 1.0. Else return None if any errors are encountered'''
response = self.data(robot_name)
if response is not None:
return response['data']['battery']/100.0
else:
return None
def requires_replan(self, robot_name: str):
'''Return whether the robot needs RMF to replan'''
response = self.data(robot_name)
if response is not None:
return response['data'].get('replan', False)
return False
def toggle_action(self, robot_name: str, toggle: bool):
'''Request to toggle the robot's mode_teleop parameter.
Return True if the toggle request is successful'''
url = self.prefix +\
f"/open-rmf/rmf_demos_fm/toggle_action?robot_name={robot_name}"
data = {'toggle': toggle}
try:
response = requests.post(url, timeout=self.timeout, json=data)
response.raise_for_status()
if self.debug:
print(f'Response: {response.json()}')
return response.json()['success']
except HTTPError as http_err:
print(f'HTTP error: {http_err}')
except Exception as err:
print(f'Other error: {err}')
return False
def data(self, robot_name=None):
if robot_name is None:
url = self.prefix + f'/open-rmf/rmf_demos_fm/status/'
else:
url = self.prefix +\
f'/docs#/default/{robot_name}/status'
try:
response = requests.get(url, timeout=self.timeout)
response.raise_for_status()
if self.debug:
print(f'Response: {response.json()}')
return response.json()
except HTTPError as http_err:
print(f'HTTP error: {http_err}')
except Exception as err:
print(f'Other error: {err}')
return None
RobotCommandHandle.py:
# Copyright 2021 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import rclpy
import rclpy.node
from rclpy.duration import Duration
from rclpy.qos import QoSProfile
from rclpy.qos import QoSHistoryPolicy as History
from rclpy.qos import QoSDurabilityPolicy as Durability
from rclpy.qos import QoSReliabilityPolicy as Reliability
from rclpy.qos import qos_profile_system_default
import rmf_adapter as adpt
import rmf_adapter.plan as plan
import rmf_adapter.schedule as schedule
from rmf_fleet_msgs.msg import DockSummary, ModeRequest
import numpy as np
import threading
import math
import enum
import time
from datetime import timedelta
# States for RobotCommandHandle's state machine used when guiding robot along
# a new path
class RobotState(enum.IntEnum):
IDLE = 0
MOVING = 1
# Custom wrapper for Plan::Waypoint. We use this to modify position of
# waypoints to prevent backtracking
class PlanWaypoint:
def __init__(self, index, wp: plan.Waypoint):
# the index of the Plan::Waypoint in the waypoints in follow_new_path
self.index = index
self.position = wp.position
self.time = wp.time
self.graph_index = wp.graph_index
self.approach_lanes = wp.approach_lanes
class RobotCommandHandle(adpt.RobotCommandHandle):
def __init__(self,
name,
fleet_name,
config,
node,
graph,
vehicle_traits,
map_name,
start,
position,
charger_waypoint,
update_frequency,
lane_merge_distance,
adapter,
api):
adpt.RobotCommandHandle.__init__(self)
self.debug = False
self.name = name
self.fleet_name = 'fishbot'
self.config = config
self.node = node
self.graph = graph
self.vehicle_traits = vehicle_traits
self.map_name = map_name
# Get the index of the charger waypoint
waypoint = self.graph.find_waypoint(charger_waypoint)
assert waypoint, f"Charger waypoint {charger_waypoint} \
does not exist in the navigation graph"
self.charger_waypoint_index = waypoint.index
self.update_frequency = update_frequency
self.lane_merge_distance = lane_merge_distance
self.update_handle = None # RobotUpdateHandle
self.battery_soc = 1.0
self.api = api
self.position = position # (x,y,theta) in RMF crs (meters,radians)
self.initialized = False
self.state = RobotState.IDLE
self.dock_name = ""
# TODO(YV): Remove self.adapter. This is only being used for time point
# comparison with Plan::Waypoint::time
self.adapter = adapter
self.action_execution = None
self.requested_waypoints = [] # RMF Plan waypoints
self.remaining_waypoints = []
self.docks = {}
# RMF location trackers
self.last_known_lane_index = None
self.last_known_waypoint_index = None
self.last_replan_time = None
# if robot is waiting at a waypoint. This is a Graph::Waypoint index
self.on_waypoint = None
# if robot is travelling on a lane. This is a Graph::Lane index
self.on_lane = None
self.target_waypoint = None # this is a Plan::Waypoint
# The graph index of the waypoint the robot is currently docking into
self.dock_waypoint_index = None
# The graph index of the waypoint the robot starts or ends an action
self.action_waypoint_index = None
self.current_cmd_id = 0
self.started_action = False
# Threading variables
self._lock = threading.Lock()
self._follow_path_thread = None
self._quit_path_event = threading.Event()
self._dock_thread = None
self._quit_dock_event = threading.Event()
self._stopping_thread = None
self._quit_stopping_event = threading.Event()
self.node.get_logger().info(
f"The robot is starting at: [{self.position[0]:.2f}, "
f"{self.position[1]:.2f}, {self.position[2]:.2f}]")
# Update tracking variables
if start.lane is not None: # If the robot is on a lane
self.last_known_lane_index = start.lane
self.on_lane = start.lane
self.last_known_waypoint_index = start.waypoint
else: # Otherwise, the robot is on a waypoint
self.last_known_waypoint_index = start.waypoint
self.on_waypoint = start.waypoint
transient_qos = QoSProfile(
history=History.KEEP_LAST,
depth=1,
reliability=Reliability.RELIABLE,
durability=Durability.TRANSIENT_LOCAL)
self.node.create_subscription(
DockSummary,
'dock_summary',
self.dock_summary_cb,
qos_profile=transient_qos)
self.node.create_subscription(
ModeRequest,
'action_execution_notice',
self.mode_request_cb,
qos_profile=qos_profile_system_default)
self.update_thread = threading.Thread(target=self.update)
self.update_thread.start()
self.initialized = True
def next_cmd_id(self):
self.current_cmd_id = self.current_cmd_id + 1
if self.debug:
print(f'Issuing cmd_id for {self.name}: {self.current_cmd_id}')
return self.current_cmd_id
def sleep_for(self, seconds):
goal_time =\
self.node.get_clock().now() + Duration(nanoseconds=1e9*seconds)
while (self.node.get_clock().now() <= goal_time):
time.sleep(0.001)
def wait_on(self, event: threading.Event, seconds):
goal_time = (
self.node.get_clock().now() + Duration(nanoseconds=1e9*seconds)
)
while self.node.get_clock().now() <= goal_time:
if event.wait(0.001):
return True
return False
def clear(self):
self.requested_waypoints = []
self.remaining_waypoints = []
self.state = RobotState.IDLE
def interrupt(self):
if self.debug:
print(
f'Interrupting {self.name} '
f'(latest cmd_id is {self.current_cmd_id})'
)
self._quit_dock_event.set()
self._quit_path_event.set()
self._quit_stopping_event.set()
if self._follow_path_thread is not None:
if self._follow_path_thread.is_alive():
self._follow_path_thread.join()
if self._dock_thread is not None:
if self._dock_thread.is_alive():
self._dock_thread.join()
if self._stopping_thread is not None:
if self._stopping_thread.is_alive():
self._stopping_thread.join()
def stop(self):
if self.debug:
plan_id = self.update_handle.unstable_current_plan_id()
print(f'stop for {self.name} with PlanId {plan_id}')
self.interrupt()
# Stop the robot. Tracking variables should remain unchanged.
with self._lock:
self._quit_stopping_event.clear()
def _stop():
while not self._quit_stopping_event.is_set():
self.node.get_logger().info(
f"Requesting {self.name} to stop..."
)
if self.api.stop(self.name, self.next_cmd_id()):
break
self._quit_stopping_event.wait(0.1)
self._stopping_thread = threading.Thread(target=_stop)
self._stopping_thread.start()
def replan(self):
if self.update_handle is not None:
now = self.adapter.now()
if self.last_replan_time is not None:
# TODO(MXG): Make the 15s replan cooldown configurable
if now - self.last_replan_time < timedelta(seconds=15.0):
return
self.last_replan_time = now
self.update_handle.replan()
self.node.get_logger().info(
f'Requesting replan for {self.name} because of an obstacle'
)
def follow_new_path(
self,
waypoints,
next_arrival_estimator,
path_finished_callback):
if self.debug:
plan_id = self.update_handle.unstable_current_plan_id()
print(f'follow_new_path for {self.name} with PlanId {plan_id}')
self.interrupt()
with self._lock:
self._follow_path_thread = None
self._quit_path_event.clear()
self.clear()
self.node.get_logger().info(f"Received new path for {self.name}")
self.remaining_waypoints = self.filter_waypoints(waypoints)
assert next_arrival_estimator is not None
assert path_finished_callback is not None
def _follow_path():
target_pose = None
path_index = 0
while self.remaining_waypoints \
or self.state == RobotState.MOVING:
# Save the current_cmd_id before checking if we need to
# abort. We should always be told to abort before the
# current_cmd_id gets modified, so whatever the value of
# current_cmd_id is before being told to abort will be the
# value that we want. If we are saving the wrong value
# here, then the next thing we will be told to do is abort.
cmd_id = self.current_cmd_id
# Check if we need to abort
if self._quit_path_event.is_set():
self.node.get_logger().info(
f"[{self.name}] aborting path request"
)
return
# State machine
if self.state == RobotState.IDLE or target_pose is None:
# Assign the next waypoint
self.target_waypoint = self.remaining_waypoints[0]
path_index = self.remaining_waypoints[0].index
# Move robot to next waypoint
target_pose = self.target_waypoint.position
[x, y] = target_pose[:2]
theta = target_pose[2]
speed_limit = \
self.get_speed_limit(self.target_waypoint)
response = self.api.navigate(
self.name,
self.next_cmd_id(),
[x, y, theta],
self.map_name,
speed_limit
)
if response:
self.remaining_waypoints = \
self.remaining_waypoints[1:]
self.state = RobotState.MOVING
else:
self.node.get_logger().info(
f"Robot {self.name} failed to request "
f"navigation to "
f"[{x:.0f}, {y:.0f}, {theta:.0f}]."
f"Retrying...")
self._quit_path_event.wait(0.1)
elif self.state == RobotState.MOVING:
if self.api.requires_replan(self.name):
self.replan()
if self._quit_path_event.wait(0.1):
return
# Check if we have reached the target
with self._lock:
if self.api.navigation_completed(
self.name, cmd_id):
self.node.get_logger().info(
f"Robot [{self.name}] has reached the "
f"destination for cmd_id {cmd_id}"
)
self.state = RobotState.IDLE
graph_index = self.target_waypoint.graph_index
if graph_index is not None:
self.on_waypoint = graph_index
self.last_known_waypoint_index = \
graph_index
else:
self.on_waypoint = None # still on a lane
else:
# Update the lane the robot is on
lane = self.get_current_lane()
if lane is not None:
self.on_waypoint = None
self.on_lane = lane
else:
# The robot may either be on the previous
# waypoint or the target one
if self.target_waypoint.graph_index is \
not None \
and self.dist(
self.position, target_pose) < 0.5:
self.on_waypoint =\
self.target_waypoint.graph_index
elif self.last_known_waypoint_index is \
not None and self.dist(
self.position,
self.graph.get_waypoint(
self.last_known_waypoint_index
).location) < 0.5:
self.on_waypoint =\
self.last_known_waypoint_index
else:
# update_off_grid()
self.on_lane = None
self.on_waypoint = None
duration = self.api.navigation_remaining_duration(
self.name, cmd_id
)
if path_index is not None and duration is not None:
next_arrival_estimator(
path_index,
timedelta(seconds=duration)
)
if (not self.remaining_waypoints) \
and self.state == RobotState.IDLE:
path_finished_callback()
self.node.get_logger().info(
f"Robot {self.name} has successfully navigated along "
f"requested path."
)
self._follow_path_thread = threading.Thread(
target=_follow_path)
self._follow_path_thread.start()
def dock(
self,
dock_name,
docking_finished_callback):
''' Docking is very specific to each application. Hence, the user will
need to customize this function accordingly. In this example, we
assume the dock_name is the same as the name of the waypoints that
the robot is trying to dock into. We then call api.start_process()
to initiate the robot specific process. This could be to start a
cleaning process or load/unload a cart for delivery.
'''
self._quit_dock_event.clear()
if self._dock_thread is not None:
self._dock_thread.join()
self.dock_name = dock_name
assert docking_finished_callback is not None
self.docking_finished_callback = docking_finished_callback
# Get the waypoint that the robot is trying to dock into
dock_waypoint = self.graph.find_waypoint(self.dock_name)
assert(dock_waypoint)
self.dock_waypoint_index = dock_waypoint.index
def _dock():
# Request the robot to start the relevant process
self.node.get_logger().info(
f"Requesting robot {self.name} to dock at {self.dock_name}")
self.api.start_process(self.name, self.dock_name, self.map_name)
with self._lock:
self.on_waypoint = None
self.on_lane = None
self.sleep_for(0.1)
while (not self.api.process_completed(self.name)):
# Check if we need to abort
if self._quit_dock_event.is_set():
self.node.get_logger().info("Aborting docking")
return
self.node.get_logger().info("Robot is processing Task...")
self.sleep_for(0.1)
with self._lock:
self.on_waypoint = self.dock_waypoint_index
self.dock_waypoint_index = None
self.docking_finished_callback()
self.node.get_logger().info(">>>>>>> Task completed")
self._dock_thread = threading.Thread(target=_dock)
self._dock_thread.start()
def get_position(self):
''' This helper function returns the live position of the robot in the
RMF coordinate frame'''
position = self.api.position(self.name)
if position is not None:
x, y = [position[0], position[1]]
theta = position[2]
# Wrap theta between [-pi, pi]. Else arrival estimate will
# assume robot has to do full rotations and delay the schedule
if theta > np.pi:
theta = theta - (2 * np.pi)
if theta < -np.pi:
theta = (2 * np.pi) + theta
return [x, y, theta]
else:
self.node.get_logger().error(
"Unable to retrieve position from robot.")
return self.position
def get_battery_soc(self):
battery_soc = self.api.battery_soc(self.name)
if battery_soc is not None:
return battery_soc
else:
self.node.get_logger().error(
"Unable to retrieve battery data from robot.")
return self.battery_soc
def update(self):
while rclpy.ok():
self.position = self.get_position()
self.battery_soc = self.get_battery_soc()
if self.update_handle is not None:
self.update_state()
sleep_duration = float(1.0/self.update_frequency)
self.sleep_for(sleep_duration)
def update_state(self):
self.update_handle.update_battery_soc(self.battery_soc)
# Update position
with self._lock:
if (self.on_waypoint is not None): # if robot is on a waypoint
self.update_handle.update_current_waypoint(
self.on_waypoint, self.position[2])
elif (self.on_lane is not None): # if robot is on a lane
# We only keep track of the forward lane of the robot.
# However, when calling this update it is recommended to also
# pass in the reverse lane so that the planner does not assume
# the robot can only head forwards. This would be helpful when
# the robot is still rotating on a waypoint.
forward_lane = self.graph.get_lane(self.on_lane)
entry_index = forward_lane.entry.waypoint_index
exit_index = forward_lane.exit.waypoint_index
reverse_lane = self.graph.lane_from(exit_index, entry_index)
lane_indices = [self.on_lane]
if reverse_lane is not None: # Unidirectional graph
lane_indices.append(reverse_lane.index)
self.update_handle.update_current_lanes(
self.position, lane_indices)
elif (self.dock_waypoint_index is not None):
self.update_handle.update_off_grid_position(
self.position, self.dock_waypoint_index)
# if robot is performing an action
elif (self.action_execution is not None):
if not self.started_action:
self.started_action = True
self.api.toggle_action(self.name, self.started_action)
self.update_handle.update_off_grid_position(
self.position, self.action_waypoint_index)
# if robot is merging into a waypoint
elif (self.target_waypoint is not None and
self.target_waypoint.graph_index is not None):
self.update_handle.update_off_grid_position(
self.position, self.target_waypoint.graph_index)
else: # if robot is lost
self.update_handle.update_lost_position(
self.map_name,
self.position,
max_merge_lane_distance=self.lane_merge_distance
)
def get_current_lane(self):
def projection(current_position,
target_position,
lane_entry,
lane_exit):
px, py, _ = current_position
p = np.array([px, py])
t = np.array(target_position)
entry = np.array(lane_entry)
exit = np.array(lane_exit)
return np.dot(p - t, exit - entry)
if self.target_waypoint is None:
return None
approach_lanes = self.target_waypoint.approach_lanes
# Spin on the spot
if approach_lanes is None or len(approach_lanes) == 0:
return None
# Determine which lane the robot is currently on
for lane_index in approach_lanes:
lane = self.graph.get_lane(lane_index)
p0 = self.graph.get_waypoint(lane.entry.waypoint_index).location
p1 = self.graph.get_waypoint(lane.exit.waypoint_index).location
p = self.position
before_lane = projection(p, p0, p0, p1) < 0.0
after_lane = projection(p, p1, p0, p1) >= 0.0
if not before_lane and not after_lane: # The robot is on this lane
return lane_index
return None
def dist(self, A, B):
''' Euclidian distance between A(x,y) and B(x,y)'''
assert(len(A) > 1)
assert(len(B) > 1)
return math.sqrt((A[0] - B[0])**2 + (A[1] - B[1])**2)
def get_speed_limit(self, target_waypoint):
approach_lane_limit = np.inf
approach_lanes = target_waypoint.approach_lanes
for lane_index in approach_lanes:
lane = self.graph.get_lane(lane_index)
lane_limit = lane.properties.speed_limit
if lane_limit is not None:
if lane_limit < approach_lane_limit:
approach_lane_limit = lane_limit
return approach_lane_limit if approach_lane_limit != np.inf else 0.0
def filter_waypoints(self, wps: list):
''' Return filtered PlanWaypoints'''
assert(len(wps) > 0)
p = np.array([self.position[0], self.position[1]])
waypoints = []
for i in range(len(wps)):
waypoints.append(PlanWaypoint(i, wps[i]))
# If the robot is already in the middle of two waypoints, then we can
# truncate all the waypoints that come before it.
begin_at_index = 0
for i in reversed(range(len(waypoints)-1)):
i0 = i
i1 = i+1
p0 = waypoints[i0].position
p0 = np.array([p0[0], p0[1]])
p1 = waypoints[i1].position
p1 = np.array([p1[0], p1[1]])
dp_lane = p1 - p0
lane_length = np.linalg.norm(dp_lane)
if lane_length < 1e-3:
continue
n_lane = dp_lane/lane_length
p_l = p - p0
p_l_proj = np.dot(p_l, n_lane)
if lane_length < p_l_proj:
# Check if the robot's position is close enough to the lane
# endpoint to merge it
if np.linalg.norm(p - p1) <= self.lane_merge_distance:
begin_at_index = i1
break
# Otherwise, continue to the next lane because the robot is not
# between the lane endpoints
continue
if p_l_proj < 0.0:
# Check if the robot's position is close enough to the lane
# start point to merge it
if np.linalg.norm(p - p0) <= self.lane_merge_distance:
begin_at_index = i0
break
# Otherwise, continue to the next lane because the robot is not
# between the lane endpoints
continue
lane_dist = np.linalg.norm(p_l - p_l_proj*n_lane)
if lane_dist <= self.lane_merge_distance:
begin_at_index = i1
break
if begin_at_index > 0:
del waypoints[:begin_at_index]
return waypoints
def complete_robot_action(self):
with self._lock:
if self.action_execution is None:
return
self.action_execution.finished()
self.action_execution = None
self.started_action = False
self.api.toggle_action(self.name, self.started_action)
self.node.get_logger().info(f"Robot {self.name} has completed the"
f" action it was performing")
def newly_closed_lanes(self, closed_lanes):
need_to_replan = False
current_lane = self.get_current_lane()
if self.target_waypoint is not None and \
self.target_waypoint.approach_lanes is not None:
for lane_idx in self.target_waypoint.approach_lanes:
if lane_idx in closed_lanes:
need_to_replan = True
# The robot is currently on a lane that has been closed.
# We take this to mean that the robot needs to reverse.
if lane_idx == current_lane:
lane = self.graph.get_lane(current_lane)
return_waypoint = lane.entry.waypoint_index
reverse_lane = \
self.graph.lane_from(lane.entry.waypoint_index,
lane.exit.waypoint_index)
with self._lock:
if reverse_lane:
# Update current lane to reverse back to
# start of the lane
self.on_lane = reverse_lane.index
else:
# Update current position and waypoint index
# to return to
self.target_waypoint = return_waypoint
if not need_to_replan and self.target_waypoint is not None:
# Check if the remainder of the current plan has been invalidated
# by the lane closure
for wp in self.remaining_waypoints:
for lane in wp.approach_lanes:
if lane in closed_lanes:
need_to_replan = True
break
if need_to_replan:
break
if need_to_replan:
self.update_handle.replan()
def dock_summary_cb(self, msg):
for fleet in msg.docks:
if(fleet.fleet_name == self.fleet_name):
for dock in fleet.params:
self.docks[dock.start] = dock.path
def mode_request_cb(self, msg):
if not msg.fleet_name or msg.fleet_name != self.fleet_name or\
not msg.robot_name or msg.robot_name != self.name:
return
if msg.mode.mode == RobotState.IDLE:
self.complete_robot_action()
fleet_adapter.py:
# Copyright 2021 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
import argparse
import yaml
import nudged
import time
import threading
import rclpy
import rclpy.node
from rclpy.parameter import Parameter
import rmf_adapter as adpt
import rmf_adapter.vehicletraits as traits
import rmf_adapter.battery as battery
import rmf_adapter.geometry as geometry
import rmf_adapter.graph as graph
import rmf_adapter.plan as plan
from rmf_task_msgs.msg import TaskProfile, TaskType
from functools import partial
from .RobotCommandHandle import RobotCommandHandle
from .RobotClientAPI import RobotAPI
# ------------------------------------------------------------------------------
# Helper functions
# ------------------------------------------------------------------------------
def initialize_fleet(config_yaml, nav_graph_path, node, use_sim_time, server_uri):
# Profile and traits
fleet_config = config_yaml['rmf_fleet']
profile = traits.Profile(geometry.make_final_convex_circle(
fleet_config['profile']['footprint']),
geometry.make_final_convex_circle(fleet_config['profile']['vicinity']))
vehicle_traits = traits.VehicleTraits(
linear=traits.Limits(*fleet_config['limits']['linear']),
angular=traits.Limits(*fleet_config['limits']['angular']),
profile=profile)
vehicle_traits.differential.reversible = fleet_config['reversible']
# Battery system
voltage = fleet_config['battery_system']['voltage']
capacity = fleet_config['battery_system']['capacity']
charging_current = fleet_config['battery_system']['charging_current']
battery_sys = battery.BatterySystem.make(
voltage, capacity, charging_current)
# Mechanical system
mass = fleet_config['mechanical_system']['mass']
moment = fleet_config['mechanical_system']['moment_of_inertia']
friction = fleet_config['mechanical_system']['friction_coefficient']
mech_sys = battery.MechanicalSystem.make(mass, moment, friction)
# Power systems
ambient_power_sys = battery.PowerSystem.make(
fleet_config['ambient_system']['power'])
tool_power_sys = battery.PowerSystem.make(
fleet_config['tool_system']['power'])
# Power sinks
motion_sink = battery.SimpleMotionPowerSink(battery_sys, mech_sys)
ambient_sink = battery.SimpleDevicePowerSink(
battery_sys, ambient_power_sys)
tool_sink = battery.SimpleDevicePowerSink(battery_sys, tool_power_sys)
nav_graph = graph.parse_graph(nav_graph_path, vehicle_traits)
# Adapter
fleet_name = fleet_config['name']
adapter = adpt.Adapter.make(f'{fleet_name}_fleet_adapter')
if use_sim_time:
adapter.node.use_sim_time()
assert adapter, ("Unable to initialize fleet adapter. Please ensure "
"RMF Schedule Node is running")
adapter.start()
time.sleep(1.0)
fleet_handle = adapter.add_fleet(fleet_name, vehicle_traits, nav_graph, server_uri)
if not fleet_config['publish_fleet_state']:
fleet_handle.fleet_state_publish_period(None)
# Account for battery drain
drain_battery = fleet_config['account_for_battery_drain']
recharge_threshold = fleet_config['recharge_threshold']
recharge_soc = fleet_config['recharge_soc']
finishing_request = fleet_config['task_capabilities']['finishing_request']
node.get_logger().info(f"Finishing request: [{finishing_request}]")
# Set task planner params
ok = fleet_handle.set_task_planner_params(
battery_sys,
motion_sink,
ambient_sink,
tool_sink,
recharge_threshold,
recharge_soc,
drain_battery,
finishing_request)
assert ok, ("Unable to set task planner params")
task_capabilities = []
if fleet_config['task_capabilities']['loop']:
node.get_logger().info(
f"Fleet [{fleet_name}] is configured to perform Loop tasks")
task_capabilities.append(TaskType.TYPE_LOOP)
if fleet_config['task_capabilities']['delivery']:
node.get_logger().info(
f"Fleet [{fleet_name}] is configured to perform Delivery tasks")
task_capabilities.append(TaskType.TYPE_DELIVERY)
if fleet_config['task_capabilities']['clean']:
node.get_logger().info(
f"Fleet [{fleet_name}] is configured to perform Clean tasks")
task_capabilities.append(TaskType.TYPE_CLEAN)
# Callable for validating requests that this fleet can accommodate
def _task_request_check(task_capabilities, msg: TaskProfile):
if msg.description.task_type in task_capabilities:
return True
else:
return False
fleet_handle.accept_task_requests(
partial(_task_request_check, task_capabilities))
# Transforms
rmf_coordinates = config_yaml['reference_coordinates']['rmf']
robot_coordinates = config_yaml['reference_coordinates']['robot']
transforms = {
'rmf_to_robot': nudged.estimate(rmf_coordinates, robot_coordinates),
'robot_to_rmf': nudged.estimate(robot_coordinates, rmf_coordinates)}
transforms['orientation_offset'] = \
transforms['rmf_to_robot'].get_rotation()
mse = nudged.estimate_error(transforms['rmf_to_robot'],
rmf_coordinates,
robot_coordinates)
print(f"Coordinate transformation error: {mse}")
print("RMF to Robot transform:")
print(f" rotation:{transforms['rmf_to_robot'].get_rotation()}")
print(f" scale:{transforms['rmf_to_robot'].get_scale()}")
print(f" trans:{transforms['rmf_to_robot'].get_translation()}")
print("Robot to RMF transform:")
print(f" rotation:{transforms['robot_to_rmf'].get_rotation()}")
print(f" scale:{transforms['robot_to_rmf'].get_scale()}")
print(f" trans:{transforms['robot_to_rmf'].get_translation()}")
def _updater_inserter(cmd_handle, update_handle):
"""Insert a RobotUpdateHandle."""
cmd_handle.update_handle = update_handle
# Initialize robot API for this fleet
api = RobotAPI(
fleet_config['fleet_manager']['prefix'],
fleet_config['fleet_manager']['user'],
fleet_config['fleet_manager']['password'])
# Initialize robots for this fleet
missing_robots = config_yaml['robots']
def _add_fleet_robots():
robots = {}
while len(missing_robots) > 0:
time.sleep(0.2)
for robot_name in list(missing_robots.keys()):
node.get_logger().debug(f"Connecting to robot: {robot_name}")
position = api.position(robot_name)
if position is None:
continue
if len(position) > 2:
node.get_logger().info(f"Initializing robot: {robot_name}")
robots_config = config_yaml['robots'][robot_name]
rmf_config = robots_config['rmf_config']
robot_config = robots_config['robot_config']
initial_waypoint = rmf_config['start']['waypoint']
initial_orientation = rmf_config['start']['orientation']
starts = []
time_now = adapter.now()
if (initial_waypoint is not None) and\
(initial_orientation is not None):
node.get_logger().info(
f"Using provided initial waypoint "
"[{initial_waypoint}] "
f"and orientation [{initial_orientation:.2f}] to "
f"initialize starts for robot [{robot_name}]")
# Get the waypoint index for initial_waypoint
initial_waypoint_index = nav_graph.find_waypoint(
initial_waypoint).index
starts = [plan.Start(time_now,
initial_waypoint_index,
initial_orientation)]
else:
node.get_logger().info(
f"Running compute_plan_starts for robot: "
"{robot_name}")
starts = plan.compute_plan_starts(
nav_graph,
rmf_config['start']['map_name'],
position,
time_now)
if starts is None or len(starts) == 0:
node.get_logger().error(
f"Unable to determine StartSet for {robot_name}")
continue
robot = RobotCommandHandle(
name=robot_name,
fleet_name=fleet_name,
config=robot_config,
node=node,
graph=nav_graph,
vehicle_traits=vehicle_traits,
transforms=transforms,
map_name=rmf_config['start']['map_name'],
start=starts[0],
position=position,
charger_waypoint=rmf_config['charger']['waypoint'],
update_frequency=rmf_config.get(
'robot_state_update_frequency', 1),
adapter=adapter,
api=api)
if robot.initialized:
robots[robot_name] = robot
# Add robot to fleet
fleet_handle.add_robot(robot,
robot_name,
profile,
[starts[0]],
partial(_updater_inserter,
robot))
node.get_logger().info(
f"Successfully added new robot: {robot_name}")
else:
node.get_logger().error(
f"Failed to initialize robot: {robot_name}")
del missing_robots[robot_name]
else:
pass
node.get_logger().debug(
f"{robot_name} not found, trying again...")
return
add_robots = threading.Thread(target=_add_fleet_robots, args=())
add_robots.start()
return adapter
# ------------------------------------------------------------------------------
# Main
# ------------------------------------------------------------------------------
def main(argv=sys.argv):
# Init rclpy and adapter
rclpy.init(args=argv)
adpt.init_rclcpp()
args_without_ros = rclpy.utilities.remove_ros_args(argv)
parser = argparse.ArgumentParser(
prog="fleet_adapter",
description="Configure and spin up the fleet adapter")
parser.add_argument("-c", "--config_file", type=str, required=True,
help="Path to the config.yaml file")
parser.add_argument("-n", "--nav_graph", type=str, required=True,
help="Path to the nav_graph for this fleet adapter")
parser.add_argument("-s", "--server_uri", type=str, required=False, default="",
help="URI of the api server to transmit state and task information.")
parser.add_argument("--use_sim_time", action="store_true",
help='Use sim time, default: false')
args = parser.parse_args(args_without_ros[1:])
print(f"Starting fleet adapter...")
config_path = args.config_file
nav_graph_path = args.nav_graph
# Load config and nav graph yamls
with open(config_path, "r") as f:
config_yaml = yaml.safe_load(f)
# ROS 2 node for the command handle
fleet_name = config_yaml['rmf_fleet']['name']
node = rclpy.node.Node(f'{fleet_name}_command_handle')
# Enable sim time for testing offline
if args.use_sim_time:
param = Parameter("use_sim_time", Parameter.Type.BOOL, True)
node.set_parameters([param])
if args.server_uri == "":
server_uri = None
else:
server_uri = args.server_uri
adapter = initialize_fleet(
config_yaml,
nav_graph_path,
node,
args.use_sim_time,
server_uri)
# Create executor for the command handle node
rclpy_executor = rclpy.executors.SingleThreadedExecutor()
rclpy_executor.add_node(node)
# Start the fleet adapter
rclpy_executor.spin()
# Shutdown
node.destroy_node()
rclpy_executor.shutdown()
rclpy.shutdown()
if __name__ == '__main__':
main(sys.argv)
originbot_config.yaml:
# FLEET CONFIG =================================================================
# RMF Fleet parameters
rmf_fleet:
name: "originbot"
fleet_manager:
prefix: "http://127.0.0.1:8000"
user: "some_user"
password: "some_password"
limits:
linear: [0.7, 0.5] # velocity, acceleration
angular: [0.4, 1.0] # velocity, acceleration
profile: # Robot profile is modelled as a circle
footprint: 1.0 # radius in m
vicinity: 1.2 # radius in m
reversible: False # whether robots in this fleet can reverse
# TODO Update battery parameters with actual specs
battery_system:
voltage: 12.0 # V
capacity: 20.0 # Ahr
charging_current: 10.0 # A
mechanical_system:
mass: 70.0 # kg
moment_of_inertia: 40.0 #kgm^2
friction_coefficient: 0.22
ambient_system:
power: 100.0 # W
tool_system:
power: 0.0 # W
recharge_threshold: 0.20 # Battery level below which robots in this fleet will not operate
recharge_soc: 1.0 # Battery level to which robots in this fleet should be charged up to during recharging tasks
publish_fleet_state: 10.0 # Publish frequency for fleet state, ensure that it is same as robot_state_update_frequency
account_for_battery_drain: True
task_capabilities: # Specify the types of RMF Tasks that robots in this fleet are capable of performing
loop: True
delivery: True
clean: False
finishing_request: "park" # [park, charge, nothing]
# Originbot CONFIG =================================================================
robots:
# Here the user is expected to append the configuration for each robot in the
# fleet.
# Configuration for originbot_0
# originbot_0:
# robot_config:
# max_delay: 15.0 # allowed seconds of delay of the current itinerary before it gets interrupted and replanned
# rmf_config:
# robot_state_update_frequency: 10.0
# start:
# map_name: "L1"
# waypoint: ""
# orientation: 0.0 # radians
# charger:
# waypoint: ""
# Configuration for originbot_1
originbot_1:
robot_config:
max_delay: 15.0 # allowed seconds of delay of the current itinerary before it gets interrupted and replanned
rmf_config:
robot_state_update_frequency: 10.0
start:
map_name: "L1"
waypoint: "originbot_spawn"
orientation: 0.0 # radians
charger:
waypoint: "originbot_spawn"
# TRANSFORM CONFIG =============================================================
# For computing transforms between Robot and RMF coordinate systems
# If its a 1:1 correspondance just place the same numbers
reference_coordinates:
rmf: [[10.2837, -5.6326], # Right corner
[5.3339, -0.3840], # Top Corner
[0.2560, -5.2059], # Left Corner
[4.8645, -9.8143], # Bottom Corner
[3.7977, -4.3524], # Our seats
[3.9684, -6.5713], # Circular Robot Base
[6.2300, -3.4990], # Table leg on the right
[6.8274, -6.2300], # Bottom Right Arm
[8.5769, -8.9182]] # Island
robot: [[8.04443, -1.89213], # Right corner
[2.2344, 4.35452], # Top Corner
[-3.61153, -1.37832], # Left Corner
[1.50039, -692627], # Bottom Corner
[0.462738, -0.356078], # Our seats
[0.684927, -2.9862], # Circular Robot Base
[3.30796, 0.658291], # Table leg on the right
[4.03551, -2.54359], # Bottom Right Arm
[5.95516, -5.64273]] # Island
adapters_rmf_only.launch.xml:
<?xml version='1.0' ?>
<launch>
<arg name="use_sim_time" default="true"/>
<arg name="failover_mode" default="false"/>
<arg name="map_name" default="lab07" description="Name of the rmf_gazebo map to simulate" />
<!-- Common launch -->
<include file="$(find-pkg-share rmf_demos)/common.launch.xml">
<arg name="use_sim_time" value="$(var use_sim_time)"/>
<arg name="viz_config_file" value ="$(find-pkg-share rmf_gazebo)/rviz_config/my_environment_test_2.rviz"/>
<arg name="config_file" value="$(find-pkg-share physical_rmf)/rmf_config/$(var map_name).building.yaml"/>
<arg name="dashboard_config_file" value="$(find-pkg-share physical_rmf)/dashboard_config/dashboard_config_physical.json"/>
</include>
</launch>
These are the commands that I used to launch and run the files:
ros2 run physical_rmf rmf_to_nav.launch.xml
export RMW_IMPLEMENTATION=rmw_cyclonedds_cpp
ros2 launch physical_rmf adapters_rmf_only.launch.xml
export RMW_IMPLEMENTATION=rmw_cyclonedds_cpp
ros2 run physical_api robot_api_server_originbot
export RMW_IMPLEMENTATION=rmw_cyclonedds_cpp
export NAV_GRAPH=~/rmf_ws/install/physical_rmf/share/physical_rmf/maps/lab07/nav_graphs/1.yaml
export CONFIG_FILE=~/rmf_ws/src/physical_rmf/adapter_config/originbot_config.yaml
ros2 run physical_api fleet_adapter -c $CONFIG_FILE -n $NAV_GRAPH (x3 export)