Source code for morse.middleware.yarp_datastream

import logging; logger = logging.getLogger("morse." + __name__)
import sys
import re
import yarp
import mathutils
from morse.helpers.transformation import Transformation3d
from morse.middleware.abstract_datastream import AbstractDatastream
from morse.core.datastream import *

[docs]class YarpPort(AbstractDatastream):
[docs] def initialize(self, port_creator_fn, is_input): self.is_input = is_input self.port_name = self._get_port_name() self.port = port_creator_fn() if 'topic' in self.kwargs: self.connect2topic(self.kwargs['topic'])
def _get_port_name(self): if 'port' in self.kwargs: return self.kwargs['port'] else: port_name = re.sub(r'\.([0-9]+)', r'\1', self.component_name) # '/robot001/sensor001' return '/morse/' + port_name.replace('.', '/') + \ ('/in' if self.is_input else '/out')
[docs] def connect2topic(self, topic): """ Connect a yarp port to a specific yarp topic. """ if self.is_input: yarp.Network.connect("topic://"+topic, self.port_name) else: yarp.Network.connect(self.port_name, "topic://"+topic)
[docs] def finalize(self): self.port.close()
[docs]class YarpPublisher(YarpPort): _type_name = "yarp::Bottle"
[docs] def initialize(self): YarpPort.initialize(self, yarp.BufferedPortBottle, False)
[docs] def encode_message(self, bottle, data, component_name): """ Prepare the content of the bottle This function can be recursively called in case of list processing """ if isinstance(data, int): bottle.addInt(data) elif isinstance(data, float): bottle.addDouble(data) elif isinstance(data, str): bottle.addString(data) elif isinstance(data, tuple): self.encode_message(bottle, list(data), component_name) elif isinstance(data, list): m_bottle = bottle.addList() for m_data in data: self.encode_message(m_bottle, m_data, component_name) elif isinstance(data, dict): for key, value in data.items(): m_bottle = bottle.addList() self.encode_message(m_bottle, key, component_name) self.encode_message(m_bottle, value, component_name) elif isinstance(data, mathutils.Vector): self.encode_message(bottle, data[:], component_name) elif isinstance(data, mathutils.Matrix): self.encode_message(bottle, data[:], component_name) elif isinstance(data, mathutils.Quaternion): self.encode_message(bottle, [data.x , data.y, data.z, data.w], component_name) elif isinstance(data, mathutils.Euler): self.encode_message(bottle, [data.z , data.y, data.x], component_name) elif isinstance(data, Transformation3d): self.encode_message(bottle, [data.x , data.y, data.z, data.yaw, data.pitch, data.roll], component_name) else: logger.error("Unknown data type in component '%s'" % component_name)
[docs] def default(self, ci): bottle = self.port.prepare() bottle.clear() self.encode(bottle) self.port.write()
[docs] def encode(self, bottle): for data in self.encode_message(bottle, data, self.component_name)
[docs]class YarpReader(YarpPort): _type_name = "yarp::Bottle"
[docs] def initialize(self): YarpPort.initialize(self, yarp.BufferedPortBottle, True)
[docs] def default(self, ci): message_data = if message_data is not None: # Data elements are of type defined in data_types i = 0 for variable, data in if isinstance(data, int): msg_data = message_data.get(i).asInt()[variable] = msg_data logger.debug("read %s as %d" % (variable, msg_data)) elif isinstance(data, float): msg_data = message_data.get(i).asDouble()[variable] = msg_data logger.debug("read %s as %f" % (variable, msg_data)) elif isinstance(data, str): msg_data = message_data.get(i).toString()[variable] = msg_data logger.debug("read %s as %s" % (variable, msg_data)) else: logger.error("Unknown data type at 'read_message', " "with component '%s'" % self.component_name)"DATA: ", data, " | TYPE: ", type(data)) i += 1 return True else: return False
[docs]class YarpDatastreamManager(DatastreamManager): """ Handle communication between Blender and YARP.""" def __init__(self, args, kwargs): """ Initialize the network and connect to the yarp server.""" # Call the constructor of the parent class DatastreamManager.__init__(self, args, kwargs) yarp.Network.init()
[docs] def finalize(self): """ Close all currently opened ports and release the network.""" yarp.Network.fini()'Yarp Mid: ports have been closed.')