Source code for morse.middleware.hla.abstract_hla

import logging; logger = logging.getLogger("morse." + __name__)
from morse.middleware import AbstractDatastream

[docs]class AbstractHLAOutput(AbstractDatastream):
[docs] def initialize(self): self.amb = self.kwargs['__hla_node'].morse_ambassador self._hla_name = self.component_instance.robot_parent.name() self._obj = None
[docs] def register_object(self, handle): self._obj = self.amb.register_object(handle, self._hla_name)
[docs] def publish_attributes(self, obj_handle, attr_handles): self.amb.publish_attributes(self._hla_name, obj_handle, attr_handles)
[docs] def update_attribute(self, to_send): if not self._obj: self._obj = self.amb.get_object(self._hla_name) if not self._obj: logger.warning("Cannot find reference for object %s : don't update its attribute" % self._hla_name) else: self.amb.update_attribute(self._obj, to_send)
[docs] def finalize(self): self.amb.delete_object(self._hla_name)
[docs]class AbstractHLAInput(AbstractDatastream):
[docs] def initialize(self): self.amb = self.kwargs['__hla_node'].morse_ambassador self._amb = self.kwargs['__hla_node'].morse_ambassador self._hla_name = self.component_instance.robot_parent.name() self._obj_handle = None
[docs] def suscribe_attributes(self, obj_handle, attr_handles): self._obj_handle = obj_handle self.amb.suscribe_attributes(self._hla_name, obj_handle, attr_handles)
[docs] def get_attributes(self): return self.amb.get_attributes(self._hla_name)
[docs] def hla_name(self): return self._hla_name
[docs] def finalize(self): self.amb.unsuscribe_attributes(self._obj_handle)