import logging; logger = logging.getLogger("morse." + __name__)
import socket
import select
import json
import errno
import time
from morse.core.datastream import DatastreamManager
from morse.helpers.transformation import Transformation3d
from morse.middleware import AbstractDatastream
from morse.core import services
from morse.core.exceptions import MorseRPCInvokationError, MorseMiddlewareError
try:
import mathutils
except ImportError:
# running outside Blender
mathutils = None
[docs]class MorseEncoder(json.JSONEncoder):
[docs] def default(self, obj):
if isinstance(obj, mathutils.Vector):
return obj[:]
if isinstance(obj, mathutils.Matrix):
# obj[:][:] gives list(mathutils.Vector)
return [list(vec) for vec in obj]
if isinstance(obj, mathutils.Quaternion):
return {'x' : obj.x, 'y': obj.y, 'z': obj.z, 'w': obj.w }
if isinstance(obj, mathutils.Euler):
return {'yaw': obj.z, 'pitch': obj.y, 'roll': obj.x }
if isinstance(obj, Transformation3d):
return {'x': obj.x, 'y': obj.y, 'z': obj.z,
'yaw': obj.yaw, 'pitch': obj.pitch, 'roll': obj.roll }
return json.JSONEncoder.default(self, obj)
[docs]class SocketServ(AbstractDatastream):
[docs] def initialize(self):
# List of socket clients
self._client_sockets = []
self._message_size = 4096
self._server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self._server.bind(('', self.kwargs['port']))
self._server.listen(1)
logger.info("Socket Mw Server now listening on port " + str(self.kwargs['port']) + \
" for component " + str(self.component_name) + ".")
[docs] def finalize(self):
""" Terminate the ports used to accept requests """
if self._client_sockets:
logger.info("Closing client sockets...")
for s in self._client_sockets:
s.close()
if self._server:
try:
logger.info("Shutting down connections to server...")
self._server.shutdown(socket.SHUT_RDWR)
except socket.error as err_info:
# ignore exception raised on OSX for closed sockets
if err_info.errno != errno.ENOTCONN:
raise
logger.info("Closing socket server...")
self._server.close()
[docs] def close_socket(self, sock):
try:
self._client_sockets.remove(sock)
sock.close()
except socket.error as error_info:
logger.warning("Socket error catched while closing: " + str(error_info))
del self._server
except:
pass
[docs]class SocketPublisher(SocketServ):
_type_name = "straight JSON serialization"
[docs] def default(self, ci='unused'):
sockets = self._client_sockets + [self._server]
try:
inputready, outputready, _ = select.select(sockets, sockets, [], 0)
except select.error:
pass
except socket.error:
pass
if self._server in inputready:
sock, _ = self._server.accept()
self._client_sockets.append(sock)
if outputready:
message = self.encode()
for o in outputready:
try:
o.send(message)
except socket.error:
self.close_socket(o)
[docs] def encode(self):
js = json.dumps(self.component_instance.local_data, cls=MorseEncoder)
return (js + '\n').encode()
[docs]class SocketReader(SocketServ):
_type_name = "straight JSON deserialization"
[docs] def default(self, ci='unused'):
sockets = self._client_sockets + [self._server]
try:
inputready, outputready, exceptready = select.select(sockets, [], [], 0)
except select.error:
pass
except socket.error:
pass
got_new_information = False
for i in inputready:
if i == self._server:
sock, addr = self._server.accept()
logger.debug("New client connected to %s datastream" % self.component_name)
if self._client_sockets:
logger.warning("More than one client trying to write on %s datastream!!" % self.component_name)
self._client_sockets.append(sock)
else:
try:
buf = []
msg = ""
full_msg = False
while not full_msg:
msg = i.recv(self._message_size).decode()
logger.debug("received msg %s" % msg)
if not msg: # client disconnected
self.close_socket(i)
else:
buf.append(msg)
full_msg = (len(msg) != self._message_size)
if not msg.endswith('\n'):
logger.error("Malformed message on socket datastream "+\
"(no linefeed at the end): <%s>" % msg)
continue
msg = ''.join(buf).rstrip("\n").split("\n")
logger.debug("received msg %s" % msg)
if len(msg)>1:
logger.warning("Messages missed on socket datastream! <%s>" % msg[:-1])
self.component_instance.local_data = self.decode(msg[-1]) # keep only the last msg if we got several in row
got_new_information = True
except socket.error as detail:
self.close_socket(i)
return got_new_information
[docs] def decode(self, msg):
return json.loads(msg)
[docs]class SocketDatastreamManager(DatastreamManager):
""" External communication using sockets. """
def __init__(self, args, kwargs):
""" Initialize the socket connections """
# Call the constructor of the parent class
DatastreamManager.__init__(self, args, kwargs)
self.time_sync = kwargs.get('time_sync', False)
self.sync_port = kwargs.get('sync_port', 6000)
if self.time_sync:
self._init_trigger()
# port -> MorseSocketServ
self._server_dict = {}
# component name (string) -> Port (int)
self._component_nameservice = {}
# Base port
self._base_port = 60000
# Register two special services in the socket service manager:
# TODO To use a new special component instead of 'simulation',
# uncomment the line :-)
# blenderapi.persistantstorage().morse_services.register_request_manager_mapping("streams", "SocketRequestManager")
services.do_service_registration(self.list_streams, 'simulation')
services.do_service_registration(self.get_stream_port, 'simulation')
services.do_service_registration(self.get_all_stream_ports, 'simulation')
def __del__(self):
if self.time_sync:
self._end_trigger()
def _init_trigger(self):
self._sync_client = None
self._sync_server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._sync_server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self._sync_server.bind(('', self.sync_port))
self._sync_server.listen(1)
logger.info("Creating clock synchronisation on port %d" % self.sync_port)
def _wait_trigger(self):
# If there is some client, just wait on it
if self._sync_client:
logger.debug("Waiting trigger")
msg = self._sync_client.recv(2048)
now = time.time()
logger.debug('Synced after %f' % (now - self._last_sync_time))
self._last_sync_time = now
if not msg: #deconnection of client
self._sync_client = None
else:
# Otherwise, we just check if there is some client waiting
# If there is no client, we do not block for the moment to avoid
# weird interaction at the startup
logger.debug("Checking for some client on synchronisation port")
try:
inputready, _, _ = select.select([self._sync_server], [], [], 0)
except select.error:
pass
except socket.error:
pass
if self._sync_server in inputready:
self._sync_client, _ = self._sync_server.accept()
self._last_sync_time = time.time()
def _end_trigger(self):
self._sync_client.close()
self._sync_server.shutdown(socket.SHUT_RDWR)
[docs] def list_streams(self):
""" List all publish streams.
"""
return list(self._component_nameservice.keys())
[docs] def get_stream_port(self, name):
""" Get stream port for stream name.
"""
port = -1
try:
port = self._component_nameservice[name]
except KeyError:
pass
if port < 0:
raise MorseRPCInvokationError("Stream unavailable for component %s" % name)
return port
[docs] def get_all_stream_ports(self):
""" Get stream ports for all streams.
"""
return self._component_nameservice
[docs] def register_component(self, component_name, component_instance, mw_data):
""" Open the port used to communicate by the specified component.
"""
register_success = False
must_inc_base_port = False
kwargs = mw_data[3]
if not 'port' in kwargs:
must_inc_base_port = True
kwargs['port'] = self._base_port
while not register_success:
try:
# Create a socket server for this component
serv = DatastreamManager.register_component(self, component_name,
component_instance, mw_data)
register_success = True
except socket.error as error_info:
if error_info.errno == errno.EADDRINUSE:
kwargs['port'] += 1
if must_inc_base_port:
self._base_port += 1
else:
raise
self._server_dict[kwargs['port']] = serv
self._component_nameservice[component_name] = kwargs['port']
if must_inc_base_port:
self._base_port += 1
[docs] def action(self):
if self.time_sync:
self._wait_trigger()