Source code for morse.middleware.mavlink_datastream

import logging; logger = logging.getLogger("morse." + __name__)
from morse.core.datastream import DatastreamManager
from morse.core import blenderapi

from pymavlink.dialects.v10 import common as mavlink
from pymavlink.mavutil import mavlink_connection 

[docs]class MavlinkConnManager: def __init__(self, mav): self._manager = {} self._mav = mav
[docs] def get(self, conn): if conn not in self._manager: self._manager[conn] = mavlink_connection(conn) return self._manager[conn]
[docs] def send_hearbeat(self): msg = mavlink.MAVLink_heartbeat_message( mavlink.MAV_TYPE_GENERIC, mavlink.MAV_AUTOPILOT_GENERIC, mavlink.MAV_MODE_TEST_ARMED, 0, mavlink.MAV_STATE_ACTIVE, 3) for conn in self._manager.values(): conn.write(msg.pack(self._mav))
[docs]class MavlinkDatastreamManager(DatastreamManager): """ External communication using Mavlink protocol """ def __init__(self, args, kwargs): """ Initialize the socket connections """ # Call the constructor of the parent class DatastreamManager.__init__(self, args, kwargs) self._mav = mavlink.MAVLink(None) self._conn_manager = MavlinkConnManager(self._mav) self._boot_time = blenderapi.persistantstorage().time.time
[docs] def register_component(self, component_name, component_instance, mw_data): """ Open the port used to communicate by the specified component. """ # Create a socket server for this component datastream = DatastreamManager.register_component(self, component_name, component_instance, mw_data) datastream.setup(self._conn_manager, self._mav, self._boot_time)
[docs] def action(self): self._conn_manager.send_hearbeat()