Source code for morse.middleware.hla_datastream

import logging; logger = logging.getLogger("morse." + __name__)
import hla.rti as rti
import sys

from morse.core.datastream import DatastreamManager
from morse.core import blenderapi

class MorseBaseAmbassador(rti.FederateAmbassador):
    """
    Federate ambassador shared by the MORSE HLA nodes.

    It wraps the RTI ambassador it is given and keeps the bookkeeping
    needed by the rest of the middleware: synchronisation points,
    registered / discovered object instances, cached class and attribute
    handles, subscribed / published attributes and the last attribute
    values reflected by the RTI.
    """

    def __init__(self, rtia, federation, time_sync, timestep, lookahead):
        """
        Store the RTI ambassador and create the (empty) bookkeeping tables.

        :param rtia: RTI ambassador used for every call to the RTI
        :param federation: federation name (only used in log messages here)
        :param time_sync: when true, time advance and attribute updates
            go through the time-managed variants of the RTI services
        :param timestep: amount added to the logical time on each advance
        :param lookahead: value passed as second argument of ``tick``
            while waiting for a time-management callback
        """
        self._rtia = rtia
        self.federation = federation
        self._time_sync = time_sync

        self.synchronisation_points = {}  # label -> bool (True once synchronized)

        self.registred_objects = {}  # name -> obj_handle
        self.registred_class_ref = {}  # obj_handle -> int

        self._object_handles = {}  # string -> obj_handle
        self._attributes_handles = {}  # (obj_handle, string) -> attr_handle

        self._objects_discovered = {}  # obj_name -> obj

        self._attributes_subscribed = {}  # obj_name -> [attr_handle]
        self._attributes_published = {}  # obj_name -> [attr_handle]
        self._attributes_values = {}  # obj_name -> { attr_handle -> value }

        self.timestep = timestep
        self.lookahead = lookahead

    def initialize_time_regulation(self):
        """
        Enable time-constrained and time-regulation modes, then tick the
        RTI until both ``timeConstrainedEnabled`` and
        ``timeRegulationEnabled`` callbacks have been received.
        """
        self.logical_time = self._rtia.queryFederateTime()
        logger.debug("federation %s time %f timestep %f",
                     self.federation, self.logical_time, self.timestep)

        # These flags are flipped by the callbacks defined below.
        self.constraint_enabled = False
        self.regulator_enabled = False
        self.granted = False

        self._rtia.enableTimeConstrained()
        self._rtia.enableTimeRegulation(self.logical_time, self.timestep)
        while not (self.constraint_enabled and self.regulator_enabled):
            self._rtia.tick(0, self.lookahead)

    def advance_time(self):
        """
        Advance the federate by one step.

        With time synchronisation, request ``logical_time + timestep`` and
        tick until ``timeAdvanceGrant`` is received; otherwise simply tick
        the RTI once.
        """
        if self._time_sync:
            self.granted = False
            self._rtia.timeAdvanceRequest(self.logical_time + self.timestep)
            while not self.granted:
                self._rtia.tick(0, self.lookahead)
        else:
            self._rtia.tick()

    def register_sync_point(self, label):
        """ Register the federation synchronisation point ``label``. """
        self._rtia.registerFederationSynchronizationPoint(
            label, "Waiting for other simulators")

    def wait_until_sync(self, label):
        """
        Block (ticking the RTI) until the federation is synchronised on
        ``label``.
        """
        # Make sure that we receive the announce sync point
        while label not in self.synchronisation_points:
            self._rtia.tick()

        self._rtia.synchronizationPointAchieved(label)
        # The entry becomes True in federationSynchronized().
        while not self.synchronisation_points[label]:
            self._rtia.tick()

    def register_object(self, handle, name):
        """
        Register an object instance ``name`` of class ``handle`` and
        remember it.

        :return: the object handle returned by the RTI
        """
        logger.debug("REGISTER object %s => %s", name, handle)
        obj = self._rtia.registerObjectInstance(handle, name)
        self.registred_objects[name] = obj
        return obj

    def delete_object(self, name):
        """
        Delete the object instance previously registered as ``name``.

        :raises KeyError: if ``name`` was not registered through
            :meth:`register_object`
        """
        self._rtia.deleteObjectInstance(self.registred_objects[name], name)
        del self.registred_objects[name]

    def get_object(self, name):
        """
        Return the handle of the object called ``name``, looking first in
        the objects registered locally and then in the discovered ones.

        :return: the object handle, or None if ``name`` is unknown
        """
        if name in self.registred_objects:
            return self.registred_objects[name]
        if name in self._objects_discovered:
            return self._objects_discovered[name]
        return None

    def object_handle(self, name):
        """
        Return the class handle for ``name``, asking the RTI the first
        time and serving later calls from a local cache.
        """
        handle = self._object_handles.get(name, None)
        # Compare against None: a falsy handle (e.g. 0) is a valid cache hit.
        if handle is None:
            handle = self._rtia.getObjectClassHandle(name)
            self._object_handles[name] = handle
        return handle

    def attribute_handle(self, name, obj_handle):
        """
        Return the handle of attribute ``name`` of class ``obj_handle``,
        asking the RTI the first time and caching the answer.
        """
        key = (obj_handle, name)
        handle = self._attributes_handles.get(key, None)
        # Compare against None: a falsy handle (e.g. 0) is a valid cache hit.
        if handle is None:
            handle = self._rtia.getAttributeHandle(name, obj_handle)
            self._attributes_handles[key] = handle
        return handle

    def suscribe_attributes(self, name, obj_handle, attr_handles):
        """
        Add ``attr_handles`` to the attributes tracked for ``name``,
        subscribe to the resulting set on class ``obj_handle`` and
        increment the reference count of that class.
        """
        logger.debug("suscribe_attributes %s %s => %s",
                     name, obj_handle, attr_handles)
        curr_tracked_attr = set(self._attributes_subscribed.get(name, []))
        res = list(curr_tracked_attr.union(attr_handles))
        self._attributes_subscribed[name] = res
        self._rtia.subscribeObjectClassAttributes(obj_handle, res)

        ref_cnt = self.registred_class_ref.get(obj_handle, 0)
        self.registred_class_ref[obj_handle] = ref_cnt + 1
        logger.debug("registred_class_ref %s => %d", obj_handle, ref_cnt + 1)

    def publish_attributes(self, name, obj_handle, attr_handles):
        """
        Add ``attr_handles`` to the attributes tracked as published for
        ``name`` and publish ``attr_handles`` on class ``obj_handle``.
        """
        logger.debug("publish_attributes %s %s", name, attr_handles)
        curr_tracked_attr = set(self._attributes_published.get(name, []))
        res = list(curr_tracked_attr.union(attr_handles))
        self._attributes_published[name] = res
        # NOTE(review): unlike suscribe_attributes, only the new handles are
        # sent to the RTI, not the accumulated set `res` -- confirm whether
        # this asymmetry is intended.
        self._rtia.publishObjectClass(obj_handle, attr_handles)

    def unsuscribe_attributes(self, obj_handle):
        """
        Drop one reference on class ``obj_handle``; unsubscribe from the
        class once nobody references it any more. Unknown handles are
        ignored.
        """
        logger.debug("unsuscribe_attributes %s", obj_handle)
        if obj_handle not in self.registred_class_ref:
            return
        self.registred_class_ref[obj_handle] -= 1
        logger.debug("registred_class_ref %s => %d",
                     obj_handle, self.registred_class_ref[obj_handle])
        if self.registred_class_ref[obj_handle] == 0:
            self._rtia.unsubscribeObjectClass(obj_handle)
            del self.registred_class_ref[obj_handle]

    def get_attributes(self, obj_name):
        """
        Return the ``{attr_handle: value}`` mapping stored for
        ``obj_name``, or None when nothing is stored for that name.
        """
        return self._attributes_values.get(obj_name, None)

    def update_attribute(self, obj_handle, value):
        """
        Send ``value`` as attribute update for ``obj_handle``.

        With time synchronisation the update is stamped with
        ``logical_time + timestep``; otherwise no timestamp is passed.
        """
        logger.debug("update_attributes %s for %s", value, obj_handle)
        if self._time_sync:
            self._rtia.updateAttributeValues(
                obj_handle, value, "morse_update",
                self.logical_time + self.timestep)
        else:
            self._rtia.updateAttributeValues(obj_handle, value, "morse_update")

    # Callbacks for FederateAmbassadors

    def discoverObjectInstance(self, obj, objectclass, name):
        """
        Record the discovered object. If attributes are subscribed for
        ``name``, request their values and create placeholder (None)
        entries; if attributes are published for ``name``, request their
        ownership.
        """
        logger.debug("DISCOVER %s %s %s", name, obj, objectclass)
        self._objects_discovered[name] = obj

        subscribed_attributes = self._attributes_subscribed.get(name, None)
        if subscribed_attributes:
            self._rtia.requestObjectAttributeValueUpdate(
                obj, subscribed_attributes)
            # Every subscribed attribute starts at None until it is reflected.
            default_value = {}
            for attr in subscribed_attributes:
                default_value[attr] = None
            self._attributes_values[name] = default_value

        published_attributes = self._attributes_published.get(name, None)
        if published_attributes:
            logger.debug("attributeOwnershipAcquisition %s %s %s",
                         name, obj, published_attributes)
            self._rtia.attributeOwnershipAcquisition(
                obj, published_attributes, "morse_owner")

    def attributeOwnershipAcquisitionNotification(self, obj, attr):
        """ Log the ownership acquisition notification. """
        obj_name = self._rtia.getObjectInstanceName(obj)
        logger.debug("attributeOwnershipAcquisitionNotification %s %s %s",
                     obj, obj_name, attr)

    def reflectAttributeValues(self, obj, attributes, tag, order, transport,
                               time=None, retraction=None):
        """
        Store the received values for the attributes tracked for ``obj``.

        Attributes without an existing entry are ignored, and so are
        objects for which nothing is tracked. An unknown object only
        produces a warning.
        """
        try:
            obj_name = self._rtia.getObjectInstanceName(obj)
            logger.debug("reflectAttributeValues for %s %s",
                         obj_name, attributes)
            attr_entry = self._attributes_values.get(obj_name, None)
            if not attr_entry:
                return
            # Only values of existing keys are replaced, so the dict size
            # does not change while iterating.
            for key in attr_entry.keys():
                if key in attributes:
                    attr_entry[key] = attributes[key]
        except rti.ObjectNotKnown:
            logger.warning("Receive an RAV for object %s but it is not anymore "
                           "in the simulation" % obj)

    def timeConstrainedEnabled(self, time):
        """ Record that time-constrained mode is enabled at ``time``. """
        logger.debug("Constrained at time %f", time)
        self.logical_time = time
        self.constraint_enabled = True

    def timeRegulationEnabled(self, time):
        """ Record that time regulation is enabled at ``time``. """
        logger.debug("Regulator at time %f", time)
        self.logical_time = time
        self.regulator_enabled = True

    def timeAdvanceGrant(self, time):
        """ Record the granted logical time and release advance_time(). """
        logger.debug("time Advance granted %f", time)
        self.logical_time = time
        self.granted = True

    def announceSynchronizationPoint(self, label, tag):
        """ Mark ``label`` as announced but not yet synchronised. """
        self.synchronisation_points[label] = False

    def federationSynchronized(self, label):
        """ Mark ``label`` as synchronised. """
        self.synchronisation_points[label] = True
class HLABaseNode:
    """
    One HLA federate: owns the RTI ambassador and the federate ambassador
    and drives creation / joining / resigning of the federation.
    """

    def __init__(self, klass, fom, node_name, federation, sync_point,
                 sync_register, time_sync, timestep, lookahead):
        """
        Initializes HLA (connection to RTIg, FOM file, publish robots...)

        :param klass: federate ambassador class; instantiated as
            ``klass(rtia, federation, time_sync, timestep, lookahead)``
        :param fom: FOM / FED file given to ``createFederationExecution``
        :param node_name: federate name used to join the federation
        :param federation: federation name
        :param sync_point: synchronisation point label, or a false value
            to skip synchronisation in :meth:`init_time`
        :param sync_register: whether this node registers the sync point
        :param time_sync: whether time regulation is initialised in
            :meth:`init_time`
        :param timestep: forwarded to ``klass``
        :param lookahead: forwarded to ``klass``

        Any exception raised while talking to the RTI is logged and
        re-raised.
        """
        logger.info("Initializing HLA node.")

        self._federation = federation
        self._sync_point = sync_point
        self._sync_register = sync_register
        self._time_sync = time_sync

        try:
            logger.debug("Creating RTIA...")
            self.rtia = rti.RTIAmbassador()
            logger.debug("RTIA created!")

            try:
                self.rtia.createFederationExecution(federation, fom)
                logger.info("%s federation created", federation)
            except rti.FederationExecutionAlreadyExists:
                # Another federate created it first: nothing to do.
                logger.debug("%s federation already exists", federation)
            except rti.CouldNotOpenFED:
                logger.error("FED file not found! "
                             "Please check that the '.fed' file is in the CERTI "
                             "search path of RTIg.")
                raise
            except rti.ErrorReadingFED:
                logger.error("Error when reading FED file! "
                             "Please check the '.fed' file syntax.")
                raise

            logger.debug("Creating MorseAmbassador...")
            self.morse_ambassador = klass(self.rtia, federation, time_sync,
                                          timestep, lookahead)

            try:
                self.rtia.joinFederationExecution(node_name, federation,
                                                  self.morse_ambassador)
            except rti.FederateAlreadyExecutionMember:
                # Use the parameter: no `node_name` attribute is ever set on
                # self, so reading it here would raise AttributeError and
                # hide the real error.
                logger.error("A Federate with name %s has already registered."
                             " Change the name of your federate or "
                             "check your federation architecture.", node_name)
                raise
            except rti.CouldNotOpenFED:
                logger.error("FED file not found! Please check that the "
                             "'.fed' file is in the CERTI search path.")
                raise
            except rti.ErrorReadingFED:
                logger.error("Error when reading FED file! "
                             "Please check the '.fed' file syntax.")
                raise

            logger.info("HLA middleware initialized.")
        except Exception as error:
            logger.error("Error when connecting to the RTIg: %s. "
                         "Please check your HLA network configuration.", error)
            raise

    def init_time(self):
        """
        Synchronise with the other federates (if a sync point is
        configured) and initialise time regulation (if time
        synchronisation is requested).

        When this node registers the sync point, it blocks on standard
        input until the user confirms that all simulators are ready.
        """
        if self._sync_point:
            if self._sync_register:
                self.morse_ambassador.register_sync_point(self._sync_point)
                print("Press ENTER when all simulators are ready")
                sys.stdin.read(1)

            self.morse_ambassador.wait_until_sync(self._sync_point)

        if self._time_sync:
            self.morse_ambassador.initialize_time_regulation()

    def finalize(self):
        """
        Close all open HLA connections.

        Resigns from the federation, then tries to destroy it; a failure
        to destroy the federation is only logged.
        """
        logger.info("Resigning from the HLA federation")
        if self.morse_ambassador:
            del self.morse_ambassador
            # NOTE(review): nesting reconstructed from a whitespace-collapsed
            # listing -- confirm the resign call belongs under this guard.
            self.rtia.resignFederationExecution(
                rti.ResignAction.DeleteObjectsAndReleaseAttributes)
        try:
            self.rtia.destroyFederationExecution(self._federation)
        except Exception as error:
            # Best effort: presumably fails while other federates are still
            # joined -- verify against the RTI documentation.
            logger.debug("%s federation not destroyed: %s",
                         self._federation, error)
        del self.rtia
class HLADatastreamManager(DatastreamManager):
    """
    External communication through an HLA federation.

    The manager owns a single :class:`HLABaseNode` which is handed to
    every component registered through it.
    """

    def __init__(self, args, kwargs):
        """
        Read the HLA configuration from ``kwargs`` and create the node.

        Required keys: ``fom``, ``name``, ``federation``.
        Optional keys: ``sync_point`` (default None), ``sync_register``
        (default False), ``time_sync`` (default False), ``timestep``
        (default ``1.0 / blenderapi.getfrequency()``), ``lookahead``
        (default: the timestep) and ``stop_time`` (default: infinity).

        :raises KeyError: if one of the required keys is missing
        """
        # Call the constructor of the parent class
        DatastreamManager.__init__(self, args, kwargs)

        self._time_initialized = False

        # Only the mandatory lookups are guarded, so that a KeyError raised
        # deeper in the node construction is not reported as a missing
        # configuration entry.
        try:
            fom = kwargs["fom"]
            node_name = kwargs["name"]
            federation = kwargs["federation"]
        except KeyError as error:
            logger.error("One of [fom, name, federation] attribute is not configured: "
                         "Cannot create HLADatastreamManager (missing key: %s)",
                         error)
            raise

        sync_point = kwargs.get("sync_point", None)
        sync_register = kwargs.get("sync_register", False)
        self.time_sync = kwargs.get("time_sync", False)
        timestep = kwargs.get("timestep", 1.0 / blenderapi.getfrequency())
        lookahead = kwargs.get("lookahead", timestep)
        self.stop_time = kwargs.get("stop_time", float("inf"))

        self.node = HLABaseNode(MorseBaseAmbassador, fom, node_name,
                                federation, sync_point, sync_register,
                                self.time_sync, timestep, lookahead)

    def finalize(self):
        """ Finalize the parent manager, then the HLA node. """
        DatastreamManager.finalize(self)
        self.node.finalize()

    def register_component(self, component_name, component_instance, mw_data):
        """
        Register the component with the parent manager after storing the
        HLA node under the ``'__hla_node'`` key of ``mw_data[3]``.
        """
        mw_data[3]['__hla_node'] = self.node

        DatastreamManager.register_component(self, component_name,
                                             component_instance, mw_data)

    def action(self):
        """
        Run one step of the HLA node.

        The first call initialises synchronisation / time management; the
        following ones advance the federate time. With time
        synchronisation, the simulation is asked to quit once the logical
        time is past ``stop_time``.
        """
        if self._time_initialized:
            self.node.morse_ambassador.advance_time()
        else:
            self.node.init_time()
            self._time_initialized = True

        if self.time_sync and \
           self.stop_time < self.node.morse_ambassador.logical_time:
            blenderapi.persistantstorage().serviceObjectDict["simulation"].quit()