Source code for morse.core.datastream

import logging; logger = logging.getLogger("morse." + __name__)
# Modules necessary to dynamically add methods to Middleware subclasses
import os
import sys
import re
import types

from abc import ABCMeta, abstractmethod

from morse.core.sensor import Sensor
from morse.core.actuator import Actuator
from morse.middleware import AbstractDatastream
from morse.helpers.loading import create_instance


[docs]def register_datastream(classpath, component, direction, args): datastream = create_instance(classpath, component, args) # Check that datastream implements AbstractDatastream if not isinstance(datastream, AbstractDatastream): logger.warning("%s should implement morse.middleware.AbstractDatastream"%classpath) if direction == 'OUT': component.output_functions.append(datastream.default) else: component.input_functions.append(datastream.default) # from morse.core.abstractobject.AbstractObject component.del_functions.append(datastream.finalize) return datastream
[docs]class DatastreamManager(object): """ Basic class for all middlewares that export a datastream interface (ie, all of them) Provides common attributes. """ # Make this an abstract class __metaclass__ = ABCMeta def __init__(self, args, kwargs): pass
[docs] def finalize(self): """ Destructor method. """ logger.info("Closing datastream interface <%s>." % self.__class__.__name__)
[docs] def register_component(self, component_name, component_instance, mw_data): datastream_classpath = mw_data[1] # aka. function name direction = mw_data[2] datastream_args = None if len(mw_data) > 3: datastream_args = mw_data[3] # aka. kwargs, a dictonnary of args # Create a socket server for this component return register_datastream(datastream_classpath, component_instance, direction, datastream_args)
[docs] def action(self): """ Main action, called each simulation round """ pass