Source code for morse.core.datastream
import logging; logger = logging.getLogger("morse." + __name__)
# Modules necessary to dynamically add methods to Middleware subclasses
import os
import sys
import re
import types
from abc import ABCMeta, abstractmethod
from morse.core.sensor import Sensor
from morse.core.actuator import Actuator
from morse.middleware import AbstractDatastream
from morse.helpers.loading import create_instance
[docs]def register_datastream(classpath, component, direction, args):
datastream = create_instance(classpath, component, args)
# Check that datastream implements AbstractDatastream
if not isinstance(datastream, AbstractDatastream):
logger.warning("%s should implement morse.middleware.AbstractDatastream"%classpath)
if direction == 'OUT':
component.output_functions.append(datastream.default)
else:
component.input_functions.append(datastream.default)
# from morse.core.abstractobject.AbstractObject
component.del_functions.append(datastream.finalize)
return datastream
[docs]class DatastreamManager(object):
""" Basic class for all middlewares that export a datastream interface (ie,
all of them)
Provides common attributes. """
# Make this an abstract class
__metaclass__ = ABCMeta
def __init__(self, args, kwargs):
pass
[docs] def finalize(self):
""" Destructor method. """
logger.info("Closing datastream interface <%s>." % self.__class__.__name__)
[docs] def register_component(self, component_name, component_instance, mw_data):
datastream_classpath = mw_data[1] # aka. function name
direction = mw_data[2]
datastream_args = None
if len(mw_data) > 3:
datastream_args = mw_data[3] # aka. kwargs, a dictonnary of args
# Create a socket server for this component
return register_datastream(datastream_classpath, component_instance,
direction,
datastream_args)
[docs] def action(self):
""" Main action, called each simulation round """
pass