Source code for morse.middleware.mavlink.abstract_mavlink

from morse.middleware import AbstractDatastream
import logging; logger = logging.getLogger("morse." + __name__)

[docs]class classproperty(object): def __init__(self, fget): self.fget = fget def __get__(self, owner_self, owner_cls): return self.fget(owner_cls)
[docs]class MavlinkDatastream(AbstractDatastream): @classproperty def _type_url(cls): return "https://pixhawk.ethz.ch/mavlink/#" + cls._type_name
[docs] def setup(self, conn_manager, mav, boot_time): self._conn = conn_manager.get(self.kwargs['device']) self._boot_time = boot_time self._mav = mav
[docs]class MavlinkSensor(MavlinkDatastream):
[docs] def initialize(self): self._mavlink_client = None self._boot_time = 0 self._mav = None self._msg = None
""" common messages requires time in ms since boot, so provide an helper function to compute that """
[docs] def time_since_boot(self): return int((self.data['timestamp'] - self._boot_time) * 1000)
""" The make_msg is a method which must be overrided The goal of the method is to set with something sensible the self._msg field """
[docs] def make_msg(self): pass
[docs] def default(self, ci = 'unused'): self.make_msg() self._conn.write(self._msg.pack(self._mav))
[docs]class MavlinkActuator(MavlinkDatastream):
[docs] def initialize(self): self._conn = None self._msg = None
[docs] def process_msg(self): """ The process_msg must be overriden. It should convert mavlink format into the internal Morse format. The last mavlink message is stored in self._msg. """ pass
[docs] def default(self, ci = 'unused'): last_msg = None missed = -1 self._msg = self._conn.recv_match(type=self._type_name, blocking=False) while self._msg: last_msg = self._msg self._msg = self._conn.recv_msg() missed += 1 if not last_msg: return False self._msg = last_msg if missed != 0: logger.warning('Dropped %d messages' % missed) logger.debug('Received %s' % self._msg) self.process_msg() return True