Source code for morse.multinode.hla

import logging; logger = logging.getLogger("morse." + __name__)
import os

from morse.core import blenderapi, mathutils
from morse.core.exceptions import MorseMultinodeError
from morse.core.multinode import SimulationNodeClass
from morse.middleware.hla_datastream import MorseBaseAmbassador, HLABaseNode

try:
    import hla.rti as rti
    import hla.omt as fom
except (ImportError, SyntaxError):
    logger.error("No HLA binding found or imported!")
    raise MorseMultinodeError("'hla' python not found, or has syntax errors")

"""
Defines the 'MorseVector' type that will be transfered on the HLA federation.
"""
MorseVector = fom.HLAfixedArray("MorseVector", fom.HLAfloat32LE, 3)

[docs]class MorseAmbassador(MorseBaseAmbassador): """ The Federate Ambassador of the MORSE node. """ def __init__(self, rtia, federation, time_regulation, timestep, lookahead): MorseBaseAmbassador.__init__(self, rtia, federation, time_regulation, timestep, lookahead) logger.debug("MorseAmbassador created.")
[docs] def initialize(self): try: out_robot = self.object_handle("Robot") self.out_position = self.attribute_handle("position", out_robot) self.out_orientation = self.attribute_handle("orientation", out_robot) except rti.NameNotFound: logger.error("'Robot' (or attributes) not declared in FOM." + \ "Your '.fed' file may not be up-to-date.") return False self._rtia.publishObjectClass(out_robot, [self.out_position, self.out_orientation]) robot_dict = blenderapi.persistantstorage().robotDict for obj in robot_dict.keys(): self.register_object(out_robot, obj.name) logger.info( "Pose of robot %s will be published on the %s federation.", obj.name, self.federation) in_robot = self.object_handle("Robot") self.in_position = self.attribute_handle("position", in_robot) self.in_orientation = self.attribute_handle("orientation", in_robot) self.suscribe_attributes(obj.name, in_robot, [self.in_position, self.in_orientation])
[docs] def discoverObjectInstance(self, object, objectclass, name): logger.info( "Robot %s will have its pose reflected in the current node...", name)
[docs] def reflectAttributeValues(self, object, attributes, tag, order, transport, time=None, retraction=None): scene = blenderapi.scene() obj_name = self._rtia.getObjectInstanceName(object) logger.debug("RAV %s", obj_name) try: obj = scene.objects[obj_name] if self.in_position in attributes: pos, offset = MorseVector.unpack(attributes[self.in_position]) # Update the positions of the robots obj.worldPosition = pos if self.in_orientation in attributes: ori, offset = MorseVector.unpack(attributes[self.in_orientation]) # Update the orientations of the robots obj.worldOrientation = mathutils.Euler(ori).to_matrix() except KeyError as detail: logger.debug("Robot %s not found in this simulation scenario," + \ "but present in another node. Ignoring it!", obj_name)
[docs]class HLANode(SimulationNodeClass): """ Implements multinode simulation using HLA. """ time_sync = False fom = "morse.fed" federation = "MORSE"
[docs] def initialize(self): logger.info("Initializing HLA node.") if os.getenv("CERTI_HTTP_PROXY") is None: os.environ["CERTI_HTTP_PROXY"] = "" os.environ["CERTI_HOST"] = str(self.host) os.environ["CERTI_TCP_PORT"] = str(self.port) self.node = HLABaseNode(MorseAmbassador, self.fom, self.node_name, self.federation, None, None, self.time_sync, 1.0, 1.0) self.node.morse_ambassador.initialize()
[docs] def finalize(self): """ Close all open HLA connections. """ del self.node
[docs] def synchronize(self): if not self.node: return scene = blenderapi.scene() for obj in self.node.morse_ambassador.registred_objects.values(): obj_name = self.node.rtia.getObjectInstanceName(obj) obj_pos = scene.objects[obj_name].worldPosition.to_tuple() obj_ori = scene.objects[obj_name].worldOrientation.to_euler() hla_att = { self.node.morse_ambassador.out_position: MorseVector.pack([obj_pos[0], obj_pos[1], obj_pos[2]]), self.node.morse_ambassador.out_orientation: MorseVector.pack([obj_ori.x, obj_ori.y, obj_ori.z])} self.node.morse_ambassador.update_attribute(obj, hla_att) self.node.morse_ambassador.advance_time()