Source code for morse.middleware.yarp.yarp_json

import logging; logger = logging.getLogger("morse." + __name__)
import json
from collections import OrderedDict
from morse.middleware.yarp_datastream import YarpPublisher, YarpReader
from morse.middleware.socket_datastream import MorseEncoder


[docs]class YarpJsonPublisher(YarpPublisher): _type_name = "json encoded data in yarp::bottle"
[docs] def encode(self, bottle): js = json.dumps(self.data, cls=MorseEncoder) bottle.addString(js)
[docs]class YarpJsonReader(YarpReader):
[docs] def default(self, ci): message_bottle = self.port.read(False) if message_bottle is not None: message_data = message_bottle.get(0).toString() # Deserialise the data directly into a temporary ordered dictionary json_dict = json.loads(message_data, object_pairs_hook=OrderedDict) # Fill the component's 'local_data' dictionary, # while also casting to the correct data types for variable, data in self.data.items(): if isinstance(data, int): self.data[variable] = int(json_dict[variable]) elif isinstance(data, float): self.data[variable] = float(json_dict[variable]) elif isinstance(data, str): self.data[variable] = json_dict[variable] else: logger.error("Unknown data type at 'read_json_data'")
[docs]class YarpJsonWaypointReader(YarpReader):
[docs] def default(self, ci): """ Read a destination waypoint in the format used for ROSACE The structure read by JSON is expected to have two properties: point: structure with 'x', 'y', 'z' radius: the tolerance around those points """ message_bottle = self.port.read(False) if message_bottle is not None: message_data = message_bottle.get(0).toString() # Deserialise the data directly into a temporary ordered dictionary json_dict = json.loads(message_data, object_pairs_hook=OrderedDict) point = json_dict['point'] tolerance = json_dict['radius'] self.data['x'] = float(point['x']) self.data['y'] = float(point['y']) self.data['z'] = float(point['z']) self.data['tolerance'] = float(tolerance) self.data['speed'] = 3.0