import logging; logger = logging.getLogger("morse." + __name__)
from morse.core.services import service
from morse.core import status, blenderapi, mathutils
from morse.blender.main import reset_objects as main_reset, close_all as main_close, quit as main_terminate
from morse.core.abstractobject import AbstractObject
from morse.core.exceptions import *
import json
[docs]def get_structured_children_of(blender_object):
""" Returns a nested dictionary of the given objects children, recursively.
The retun format is as follows:
{blender_object.name: [children_dictionary, position, orientation]}
where children_dictionary is another of the same format, but with the keys
being the children of blender_object. This continues down the entire tree
structure.
:param KX_GameObject blender_object: The Blender object to return children
for.
"""
children = blender_object.children
orientation = blender_object.worldOrientation.to_quaternion()
position = blender_object.worldPosition
structure = { blender_object.name: [{},
(position.x, position.y, position.z),
(orientation.x, orientation.y,
orientation.z, orientation.w)
]
}
for c in children:
structure[blender_object.name][0].update(
get_structured_children_of(c) )
return structure
[docs]def get_obj_by_name(name):
"""
Return object in the scene associated to :param name:
If it does not exists, throw a MorseRPCInvokationError
"""
scene = blenderapi.scene()
if name not in scene.objects:
raise MorseRPCInvokationError(
"Object '%s' does not appear in the scene." % name)
return scene.objects[name]
[docs]class Supervision(AbstractObject):
def __init__(self):
AbstractObject.__init__(self)
[docs] def name(self):
return "simulation"
[docs] @service
def list_robots(self):
""" Return a list of the robots in the current scenario
Uses the list generated during the initialisation of the scenario
"""
return [obj.name for obj in blenderapi.persistantstorage().robotDict.keys()]
[docs] @service
def reset_objects(self):
""" Restore all simulation objects to their original position
Upon receiving the request using sockets, call the
'reset_objects' function located in morse/blender/main.py
"""
contr = blenderapi.controller()
main_reset(contr)
return "Objects restored to initial position"
[docs] @service
def quit(self):
""" Cleanly quit the simulation
"""
contr = blenderapi.controller()
main_close(contr)
main_terminate(contr)
[docs] @service
def terminate(self):
""" Terminate the simulation (no finalization done!)
"""
contr = blenderapi.controller()
main_terminate(contr)
[docs] @service
def activate(self, component_name):
""" Enable the functionality of the component specified
"""
try:
blenderapi.persistantstorage().componentDict[component_name]._active = True
except KeyError as detail:
logger.warn("Component %s not found. Can't activate" % detail)
raise MorseRPCTypeError("Component %s not found. Can't activate" % detail)
[docs] @service
def deactivate(self, component_name):
""" Stop the specified component from calling its default_action method
"""
try:
blenderapi.persistantstorage().componentDict[component_name]._active = False
except KeyError as detail:
logger.warn("Component %s not found. Can't deactivate" % detail)
raise MorseRPCTypeError("Component %s not found. Can't deactivate" % detail)
[docs] @service
def suspend_dynamics(self):
""" Suspends physics for all object in the scene.
"""
scene = blenderapi.scene()
for object in scene.objects:
object.suspendDynamics()
return "Physics is suspended"
[docs] @service
def restore_dynamics(self):
""" Resumes physics for all object in the scene.
"""
scene = blenderapi.scene()
for object in scene.objects:
object.restoreDynamics()
return "Physics is resumed"
[docs] @service
def details(self):
"""Returns a structure containing all possible details
about the simulation currently running, including
the list of robots, the list of services and datastreams,
the list of middleware in use, etc.
"""
simu = blenderapi.persistantstorage()
details = {}
# Retrieves the list of services and associated middlewares
services = {}
services_iface = {}
for n, i in simu.morse_services.request_managers().items():
services.update(i.services())
for cmpt in i.services():
services_iface.setdefault(cmpt, []).append(n)
def cmptdetails(c):
c = simu.componentDict[c.name]
cmpt = {"type": type(c).__name__,}
if c.name() in services:
cmpt["services"] = services[c.name()]
cmpt["service_interfaces"] = services_iface[c.name()]
if c.name() in simu.datastreams:
streams = simu.datastreams[c.name()]
cmpt["stream_interfaces"] = [(stream[0], stream[2]) for stream in streams]
return cmpt
def robotdetails(r):
robot = {"name": r.name(),
"type": type(r).__name__,
"components": {c.name:cmptdetails(c) for c in r.components},
}
if r.name() in services:
robot["services"] = services[r.name()]
robot["services_interfaces"] = services_iface[r.name()]
return robot
for n, i in simu.stream_managers.items():
pass
details['robots'] = [robotdetails(r) for n, r in simu.robotDict.items()]
details['environment'] = blenderapi.getssr()['environment_file']
return details
[docs] @service
def set_log_level(self, component, level):
"""
Allow to change the logger level of a specific component
:param string component: the name of the logger you want to modify
:param string level: the desired level of logging
"""
my_logger = logging.getLogger('morse.' + component)
try:
my_logger.setLevel(level)
except ValueError as exn:
raise MorseRPCInvokationError(str(exn))
[docs] @service
def get_scene_objects(self):
""" Returns a hierarchial dictonary structure of all objects in the scene
along with their positions and orientations, formated as a Python string
representation.
The structure:
{object_name: [dict_of_children, position_tuple, quaternion_tuple],
object_name: [dict_of_children, position_tuple, quaternion_tuple],
...}
"""
scene = blenderapi.scene()
# Special Morse items to remove from the list
remove_items = ['Scene_Script_Holder', 'CameraFP', '__default__cam__',
'MORSE.Properties', '__morse_dt_analyser']
top_levelers = [o for o in scene.objects
if o.parent is None and
not o.name in remove_items]
objects = {}
for obj in top_levelers:
objects.update(get_structured_children_of(obj))
return objects
[docs] @service
def set_object_visibility(self, object_name, visible, do_children):
""" Set the visibility of an object in the simulation.
Note: The object will still have physics and dynamics despite being invisible.
:param string object_name: The name of the object to change visibility of.
:param visible boolean: Make the object visible(True) or invisible(False)
:param do_children boolean: If True then the visibility of all children of
object_name is also set."""
blender_object = get_obj_by_name(object_name)
blender_object.setVisible(visible, do_children)
return visible
[docs] @service
def set_object_dynamics(self, object_name, state):
""" Enable or disable the dynamics for an individual object.
Note: When turning on dynamics, the object will continue with the velocities
it had when it was turned off.
:param string object_name: The name of the object to change.
:param state boolean: Turn on dynamics(True), or off (False)
"""
blender_object = get_obj_by_name(object_name)
if state:
blender_object.restoreDynamics()
else:
blender_object.suspendDynamics()
return state
[docs] @service
def set_camarafp_far_clip(self, far_clip):
""" Set the CamaraFP (MORSE' environment camera) far clip distance
:param far_clip: The camera's far clip distance.
:type far_clip: float
"""
if far_clip > 0:
blender_object = get_obj_by_name('CameraFP')
blender_object.far = far_clip
return far_clip
return 0
[docs] @service
def set_camarafp_position(self, position):
""" Set the CamaraFP (MORSE' environment camera) world position. [x, y, z]
:param position: The camera's world position. [x, y, z].
:type position: list(float)
"""
blender_object = get_obj_by_name('CameraFP')
blender_object.worldPosition = position
return position
[docs] @service
def get_camarafp_position(self):
""" Get the CamaraFP (MORSE' environment camera) world position. [x, y, z]
:returns: The camera's world position. [x, y, z].
"""
blender_object = get_obj_by_name('CameraFP')
return blender_object.worldPosition
[docs] @service
def set_camarafp_projection_matrix(self, projection_matrix):
""" Set the CamaraFP (MORSE' environment camera) 4x4 projection matrix
:param projection_matrix: The camera's 4x4 projection matrix.
:type projection_matrix: 4x4 Matrix [[float]]
"""
try:
blender_object = get_obj_by_name('CameraFP')
blender_object.projection_matrix = mathutils.Matrix(projection_matrix)
return projection_matrix
except SystemError: # if the matrix is not 4x4 numpy raises a SystemError
raise MorseRPCInvokationError( "The Matrix must be 4x4 [[float]]" )
[docs] @service
def get_camarafp_projection_matrix(self):
""" Get the CamaraFP (MORSE' environment camera) 4x4 projection matrix
"""
blender_object = get_obj_by_name('CameraFP')
return [list(vec) for vec in blender_object.projection_matrix]
[docs] @service
def set_object_position(self, object_name, position, orientation = None):
""" Set the position (and optionally orientation of an object in
the simulation. [x, y, z]
:param position: The objects's world position. [x, y, z].
:type position: list(float)
:param orientation: (optional) The object's world
orientation [roll pitch yaw] in radians
:type: orientation: list(float)
"""
blender_object = get_obj_by_name(object_name)
blender_object.worldPosition = position
if orientation:
blender_object.worldOrientation = orientation