Source code for morse.middleware.ros_request_manager

import logging; logger = logging.getLogger("morse." + __name__)

import re
import threading
from functools import partial

from morse.core import status, services
from morse.core.request_manager import RequestManager

try:
    import rospy
    import actionlib_msgs
except ImportError as ie:
    raise ImportError("Could not import some ROS modules."
                      " Check your ROS configuration is ok. Details:\n" + str(ie))

[docs]def ros_timer(callable_obj, frequency): # Shamelessly stolen from actionlib/action_server.py rate = rospy.Rate(frequency) rospy.logdebug("Starting timer") while not rospy.is_shutdown(): try: rate.sleep() callable_obj() except rospy.exceptions.ROSInterruptException: rospy.logdebug("Sleep interrupted")
[docs]class RosAction(object): """ Implements a minimal action state machine. See http://www.ros.org/wiki/actionlib/DetailedDescription for the possible states. """ def __init__(self, manager, component, action, rostype): self.manager = manager self._pending_goals = {} self.goal_lock = threading.Lock() self.component = component self._action = action # robot.001.sensor.001 = robot001.sensor001 name = re.sub(r'\.([0-9]+)', r'\1', component) self.name = name.replace(".", "/") + "/" + action # Retrieves the types of the goal msg, feedback msg and result # msg (check roslib.message.Message for details) rosinstance = rostype() types = [type(getattr(rosinstance, t)) for t in rosinstance.__slots__] self.result_type = types[1] self.feedback_type = types[2] # read the frequency with which to publish status from the parameter server # (taken from actionlib/action_server.py) self.status_frequency = rospy.get_param(self.name + "/status_frequency", 5.0) queue_size = max(1, self.status_frequency) # Create the 5 topics required by an action server self.goal_topic = rospy.Subscriber(self.name + "/goal", types[0], self.on_goal) self.result_topic = rospy.Publisher(self.name + "/result", types[1], queue_size=queue_size) self.feedback_topic = rospy.Publisher(self.name + "/feedback", types[2], queue_size=queue_size) self.cancel_topic = rospy.Subscriber(self.name + "/cancel", actionlib_msgs.msg.GoalID, self.on_cancel) self.status_topic = rospy.Publisher(self.name + "/status", actionlib_msgs.msg.GoalStatusArray, queue_size=queue_size) status_list_timeout = rospy.get_param(self.name + "/status_list_timeout", 5.0) self.status_list_timeout = rospy.Duration(status_list_timeout) self.status_timer = threading.Thread(None, ros_timer, None, (self._publish_status,self.status_frequency) ) self.status_timer.start()
[docs] def setstatus(self, id, status): with self.goal_lock: goal_id = self._pending_goals[id]['goal_id'] goal_status = actionlib_msgs.msg.GoalStatus(goal_id = goal_id, status = status) self._pending_goals[id]['status'] = goal_status
[docs] def getstatus(self, id): with self.goal_lock: return self._pending_goals[id]['status'].status
[docs] def set_internal_id(self, id, morse_id): with self.goal_lock: self._pending_goals[id]['morse_id'] = morse_id
[docs] def manage_internal_id(self, morse_id): """ Check if this ROS action manager manages the given internal request ID. This is used by RosRequestManager to dispatch the service 'on completion' event. """ return self.get_id_from_internal_id(morse_id) is not None
[docs] def get_id_from_internal_id(self, morse_id): with self.goal_lock: for id, goal in self._pending_goals.items(): if goal.setdefault('morse_id') == morse_id: return id
[docs] def on_goal(self, goal): logger.info("Got a new goal for ROS action " + self.name) # Workaround for encoding issues (-> goal_id.id comes as bytes, # and must be converted to string before being reserialized) id = actionlib_msgs.msg.GoalID(id = goal.goal_id.id, stamp = goal.goal_id.stamp) with self.goal_lock: self._pending_goals[id.id] = {'goal_id': id, 'status': None} self.setstatus(id.id, actionlib_msgs.msg.GoalStatus.PENDING) is_sync, morse_id = self.manager.on_incoming_request(self.component, self._action, [goal.goal]) # is_sync should be only True for ROS services! if is_sync: # TODO: clean terminated goals 'after a few seconds' (as # said by actionlib doc) self.setstatus(id.id, actionlib_msgs.msg.GoalStatus.REJECTED) logger.error("Internal error: This ROS action is bound to a " "synchronous MORSE service! ({})".format(component_name + '.' + service_name)) self.set_internal_id(id.id, morse_id) self.setstatus(id.id, actionlib_msgs.msg.GoalStatus.ACTIVE) logger.debug("Started action. GoalID=" + id.id + " MORSE ID=" + str(morse_id))
[docs] def on_cancel(self, goal_id): logger.info("Got a cancel request for ROS action " + self.name) current_status = None try: current_status = self.getstatus(goal_id.id) except KeyError: # Unknown GoalID...? skipping this cancel request. logger.info("I can not find any goal matching this cancel request. Skipping it.") return if current_status == actionlib_msgs.msg.GoalStatus.PENDING: self.setstatus(goal_id.id, actionlib_msgs.msg.GoalStatus.RECALLING) else: #current status = ACTIVE (or smth else...) self.setstatus(goal_id.id, actionlib_msgs.msg.GoalStatus.PREEMPTING) self.manager.abort_request(self._pending_goals[goal_id.id]['morse_id'])
[docs] def on_result(self, morse_id, state, result): logger.info("Got a result for action " + self.name + ": " + str(state) + " " + str(result)) id = self.get_id_from_internal_id(morse_id) if state == status.PREEMPTED: logger.info("The action " + self.name + " has been preempted. " "Reporting it to ROS system.") self.setstatus(id, actionlib_msgs.msg.GoalStatus.PREEMPTED) if state == status.FAILED: logger.info("The action " + self.name + " has been aborted. " "Reporting it to ROS system.") self.setstatus(id, actionlib_msgs.msg.GoalStatus.ABORTED) if state == status.SUCCESS: logger.debug("The action " + self.name + " has succeeded. Reporting" " it to ROS system and publishing results.") self.setstatus(id, actionlib_msgs.msg.GoalStatus.SUCCEEDED) self.publish_result(id, result)
[docs] def publish_result(self, goal_id, result): goal_status = None with self.goal_lock: goal_status = self._pending_goals[goal_id]['status'] res = self.result_type(status=goal_status, result=result) self.result_topic.publish(res)
def _publish_status(self): """ This private method is called asynchronously to update the status of pending goals. """ status_array = actionlib_msgs.msg.GoalStatusArray() with self.goal_lock: for goal in self._pending_goals.values(): if goal['status']: status_array.status_list.append(goal['status']) status_array.header.stamp = rospy.Time.now() self.status_topic.publish(status_array)
[docs]class RosRequestManager(RequestManager): def __init__(self): RequestManager.__init__(self) self._services = {} self._actions = [] def __str__(self): return "ROS Request Manager"
[docs] def initialization(self): # Init MORSE-node in ROS rospy.init_node('morse', disable_signals=True) logger.info("ROS node 'morse' successfully created.") return True
[docs] def finalization(self): # do not shutdown ros since we cant restart it later on (see rospy). return True
[docs] def register_ros_action(self, method, component_name, service_name): rostype = None try: rostype = method._ros_action_type # Is it a ROS action? logger.info(component_name + "." + service_name + " is a ROS action of type " + str(rostype)) except AttributeError: logger.info(component_name + "." + service_name + " has no ROS-specific action type. Skipping it.") return False self._actions.append(RosAction(self, component_name, service_name, rostype)) logger.info("Created new ROS action server for {}.{}".format( component_name, service_name)) return True
[docs] def register_ros_service(self, method, component_name, service_name): rostype = None try: rostype = method._ros_service_type # Is it a ROS service? logger.info(component_name + "." + service_name + " is a ROS service of type " + str(rostype)) except AttributeError: logger.info(component_name + "." + service_name + " has no ROS-specific service type. Skipping it.") return False cb = self.add_ros_handler(component_name, service_name) # robot.001.sensor.001 = robot001.sensor001 name = re.sub(r'\.([0-9]+)', r'\1', component_name) _service_name = name.replace(".", "/") + "/" + service_name try: s = rospy.Service(_service_name, rostype, cb) logger.info("Created new ROS service for %s.%s" % \ (component_name, service_name)) except Exception as e: logger.warning("Could not initiate rospy.Service\n" + str(e)) return True
[docs] def post_registration(self, component_name, service_name, is_async): """ We create here ROS services (for *synchronous* services) and ROS actions (for *asynchronous* services). In order not to interfere with neither ROS own service wrapping mechanism or MORSE service invokation mechanisms, we create here, 'on the fly', handlers for each service exposed through ROS. ROS requires type for the services/actions. Those can be set with the :py:meth:`ros_action` and :py:meth:`ros_service` decorators. If none is set, the action/service is discarded. """ rostype = None # TODO: I access here an internal member of the parent class to # retrieve the ROS type, if set. _services should probably be # 'officially' exposed method, is_async = self._services[(component_name, service_name)] if is_async: return self.register_ros_action(method, component_name, service_name) else: return self.register_ros_service(method, component_name, service_name)
[docs] def add_ros_handler(self, component_name, service_name): """ Dynamically creates custom ROS->MORSE dispatchers for ROS *services* only. ROS actions are dealt with in the :py:class:`RosAction` class. """ def innermethod(request): # TODO: when this will be actually called? If ROS service # management lives in its own thread, it can be basically # called at any time, which is dangerous. # The request type is generated from the .srv definition. # roslib.message.Message defines the method __getstate__() # the retrieve a 'flat' view of all parameters. args = request.__getstate__() logger.info("ROS->MORSE dispatcher for " + service_name + \ " got incoming request: " + str(args)) is_sync, value = self.on_incoming_request(component_name, service_name, args) # is_sync should be always True for ROS services! if not is_sync: logger.error("Internal error: This ROS " "service is bound to an asynchronous MORSE" " service! ({})".format(component_name + '.' + service_name)) raise rospy.service.ServiceException("MORSE Internal error! Check MORSE logs for details.") state, result = value if state == status.SUCCESS: # Here, we 'hope' that the return value of the MORSE # service is a valid dataset to construct the # ServiceResponse expected by ROS return result else: # failure! raise rospy.service.ServiceException(result) innermethod.__doc__ = "This method is invoked directly by MORSE when " +\ "a new service request comes in. This handler simply redispatch it to " +\ "MORSE own service invokation system.\n\n" +\ "The method takes a ROS ServiceRequest as unique parameter, and must " +\ "return a ROS ServiceResponse" innermethod.__name__ = "_" + service_name + "_ros_handler" logger.debug("Created new ROS->MORSE dispatcher for service {}.{}".format( component_name, service_name)) return innermethod
[docs] def on_service_completion(self, request_id, result): # First, figure out which 'ROSAction' manages this request id: manager = None for action in self._actions: if action.manage_internal_id(request_id): manager = action break if manager is None: logger.error("A ROS action call has been lost! Nobody manage request " + request_id) # Then, dispatch the 'on completion' event. status, value = result manager.on_result(request_id, status, value)
[docs] def main(self): pass
[docs]def ros_action(fn = None, type = None, name = None): """ The @ros_action decorator. This decorator is very similar to the standard :py:meth:`morse.core.services.async_service` decorator. It sets a class method to be a asynchronous service, exposed as a ROS action of type `type`. This decorator can only be used on methods in classes inheriting from :py:class:`morse.core.object.Object`. :param callable fn: [automatically set by Python to point to the decorated function] :param class type: you MUST set this parameter to define the type of the ROS action. :param string name: by default, the name of the service is the name of the method. You can override it by setting the 'name' argument. Your ROS action will appear as /component_instance/<name>/... """ if not type: logger.error("You must provide a valid ROS action type when using the " + \ "@ros_action decorator, e.g. @ros_action(type=MyRosAction). Action ignored.") return if not hasattr(fn, "__call__"): return partial(ros_action, type = type, name = name) fn._ros_action_type = type return services.service(fn, component = None, name = name, asynchronous = True)
[docs]def ros_service(fn = None, type = None, component = None, name = None): """ The @ros_service decorator. This decorator is very similar to the standard :py:meth:`morse.core.services.service` decorator. It sets a free function or class method to be a (synchronous) service, exposed as a ROS service of type `type`. This decorator works both with free function and for methods in classes inheriting from :py:class:`morse.core.object.Object`. In the former case, you must specify a component (your service will belong to this namespace), in the later case, it is automatically set to the name of the corresponding MORSE component. :param callable fn: [automatically set by Python to point to the decorated function] :param class type: you MUST set this parameter to define the type of the ROS action. :param string component: you MUST set this parameter to define the name of the component which export the service ONLY for free functions. Cf explanation above. :param string name: by default, the name of the service is the name of the method. You can override it by setting the 'name' argument. Your ROS service will appear as /<name> """ if not type: logger.error("You must provide a valid ROS action type when using the " + \ "@ros_action decorator, e.g. @ros_action(type=MyRosAction). Service ignored.") return if not hasattr(fn, "__call__"): return partial(ros_service, type = type, component = component, name = name) fn._ros_service_type = type return services.service(fn, component = component, name = name, asynchronous = False)