Source code for morse.core.abstractobject

import logging; logger = logging.getLogger("morse." + __name__)
from abc import ABCMeta, abstractmethod
import morse.core.services
from collections import OrderedDict

from morse.core.exceptions import MorseRPCInvokationError, MorseServiceAlreadyRunningError

[docs]class AbstractObject(object): """ An abstract object. All components in MORSE (either physical components like actuators or sensors or pseudo components like component overlays) inherit from AbstractObject. This abstract class provides all RPC service-related mechanics. """ # Make this an abstract class __metaclass__ = ABCMeta def __init__(self): self.on_completion = None """ When a task is considered 'completed' (the semantic of 'completed' is left to each component), the default_action method is expected to call this callback (if not None) with the task status (from ``core.status.*``) + optional return value as a tuple. For instance:: self.on_completion((status.FAILED, "Couldn't reach the target")) self.on_completion((status.SUCCESS, {'x':1.0, 'y':0.54})) self.on_completion((status.SUCCESS)) """ # Dictionary to store the data used by each component self.local_data = OrderedDict() # Define lists of dynamically added functions self.del_functions = []
[docs] def finalize(self): """ Destructor method. """ logger.info("%s: I'm dying!!" % self.name()) # Call specific functions added to this object for function in self.del_functions: function()
[docs] def register_services(self): """ Register the component services, if any. Methods to register are marked '_morse_service' by the '@service' decorator. """ for fn in [getattr(self, fn) for fn in dir(self) if hasattr(getattr(self, fn), "_morse_service")]: name = fn._morse_service_name if fn._morse_service_name else fn.__name__ morse.core.services.do_service_registration(fn, self.name(), name, fn._morse_service_is_async)
[docs] def completed(self, status, result = None): """ This method must be invoked by the component when a service completes. Calling this method triggers the notification of task completion to the client. :param morse.core.status status: status (success, failure...) of the task :param result: results of the service, if any (may be any valid Python object) """ if self.on_completion: """ In overlays or in a chain of service, it is legal to call again a service in the completion handler. However, to test if there is a running service or not for a service, we test if self.on_completion is None or not. So, we need to make sure that self.on_completion is None when calling the completion handler. To do that, store another reference, clean up the one stored in self.on_completion, and call it through our reference """ fn = self.on_completion self.on_completion = None fn((status, result))
[docs] def interrupt(self): """ This method is automatically invoked by the component when a service is interrupted, basically to notify to the client that the task has been interrupted. It can be overriden in each component to provide a true interrupt, for exemple resseting the speed command to 0.0. If you override it, do not forget to call ``self.completed`` with ``status.PREEMPTED``. """ import morse.core.status self.completed(morse.core.status.PREEMPTED)
[docs] def set_service_callback(self, cb): """ Sets the callback function that is to be invoked when the current request completes. This is automatically set by the @async_service decorator and should not usually be directly called. """ if self.on_completion: # A callback is already registered -> a service is running old_cb = self.on_completion if not hasattr(old_cb.service, "_morse_service_interruptible"): # No policy (interruptible/noninterruptible) set for # the service currently running. We throw an exception # to be caught by the middleware. Up to it to define # the default policy. raise MorseServiceAlreadyRunningError(old_cb.service, "A service (" + \ old_cb.service.__name__ + ") is already running. I do not know what to do.") interruptible = old_cb.service._morse_service_interruptible if interruptible: self.interrupt() else: raise MorseRPCInvokationError("A non-interruptible service (" + old_cb.service.__name__ + ") is already running") self.on_completion = cb
[docs] @abstractmethod def name(self): """ The name of the component, as appearing when exposing services for instance. """ pass
[docs] def print_data(self): """ Print the current data for the component instance. """ for variable, data in self.local_data.items(): res = variable + str(data) + " " logger.info("%s" % res)