morse.core package¶
Submodules¶
morse.core.abstractobject module¶
-
class
AbstractObject
[source]¶ Bases:
object
An abstract object. All components in MORSE (either physical components like actuators or sensors or pseudo components like component overlays) inherit from AbstractObject.
This abstract class provides all RPC service-related mechanics.
-
completed
(status, result=None)[source]¶ This method must be invoked by the component when a service completes.
Calling this method triggers the notification of task completion to the client.
Parameters: - status (morse.core.status) – status (success, failure…) of the task
- result – results of the service, if any (may be any valid Python object)
-
interrupt
()[source]¶ This method is automatically invoked by the component when a service is interrupted, basically to notify to the client that the task has been interrupted.
It can be overriden in each component to provide a true interrupt, for exemple resseting the speed command to 0.0.
If you override it, do not forget to call
self.completed
withstatus.PREEMPTED
.
-
on_completion
= None¶ When a task is considered ‘completed’ (the semantic of ‘completed’ is left to each component), the default_action method is expected to call this callback (if not None) with the task status (from
core.status.*
) + optional return value as a tuple.For instance:
self.on_completion((status.FAILED, "Couldn't reach the target")) self.on_completion((status.SUCCESS, {'x':1.0, 'y':0.54})) self.on_completion((status.SUCCESS))
-
morse.core.actuator module¶
morse.core.ansistrm module¶
-
class
ColorizingStreamHandler
(scheme=None)[source]¶ Bases:
logging.StreamHandler
-
bright_scheme
= {10: (None, 'blue', False, False), 20: (None, 'white', False, False), 23: (None, 'green', False, False), 25: (None, 'green', True, False), 30: (None, 'yellow', False, False), 40: (None, 'red', False, False), 50: ('red', 'white', True, False)}¶
-
color_map
= {'black': 0, 'blue': 4, 'cyan': 6, 'green': 2, 'magenta': 5, 'red': 1, 'white': 7, 'yellow': 3}¶
-
csi
= '\x1b['¶
-
dark_scheme
= {10: (None, 'blue', False, False), 20: (None, 'black', False, False), 23: (None, 'green', False, False), 25: (None, 'green', True, False), 30: (None, 'yellow', False, False), 40: (None, 'red', False, False), 50: ('red', 'black', True, False)}¶
-
is_tty
¶
-
mono_scheme
= {10: (None, None, False, False), 20: (None, None, False, False), 23: (None, None, False, False), 25: (None, None, False, False), 30: (None, None, False, False), 40: (None, None, False, False), 50: (None, None, False, False)}¶
-
reset
= '\x1b[0m'¶
-
xmas_scheme
= {10: ('red', 'yellow', False, True), 20: ('red', 'white', False, True), 23: ('red', 'white', False, True), 25: ('red', 'yellow', False, True), 30: ('red', 'yellow', False, True), 40: ('red', 'yellow', False, True), 50: ('red', 'white', False, True)}¶
-
morse.core.blenderapi module¶
This module wraps the calls to the Blender Python API. This is intended for all the cases we need to run MORSE code outside Blender (mostly for documentation generation purposes).
morse.core.datastream module¶
morse.core.exceptions module¶
-
exception
MorseBuilderBadSyntaxError
(value)[source]¶ Bases:
morse.core.exceptions.MorseBuilderError
Morse Error caused by a mistyped method or object name in Builder.
-
exception
MorseBuilderError
(value)[source]¶ Bases:
morse.core.exceptions.MorseError
Morse Error caused by the Builder API.
-
exception
MorseBuilderNoComponentError
(value)[source]¶ Bases:
morse.core.exceptions.MorseBuilderError
Morse Error caused by a wrong component in Builder.
-
exception
MorseBuilderUnexportableError
(value)[source]¶ Bases:
morse.core.exceptions.MorseBuilderError
Morse Error caused by a call to add_{stream, service, interface} on an unexportable component
-
exception
MorseEnvironmentError
(value)[source]¶ Bases:
morse.core.exceptions.MorseError
Morse Error triggered while manipulating MORSE environments (typically, wrong permissions on a file or inexistant environment).
-
exception
MorseMiddlewareError
(value)[source]¶ Bases:
morse.core.exceptions.MorseError
Morse Error caused by a Middleware.
-
exception
MorseMultinodeError
(value)[source]¶ Bases:
morse.core.exceptions.MorseError
Morse Error caused by a Multinode configuration.
-
exception
MorseServiceError
(value)[source]¶ Bases:
morse.core.exceptions.MorseError
Morse Error caused by a Service.
morse.core.external_object module¶
-
class
ExternalObject
(obj, parent=None)[source]¶ Bases:
morse.core.object.Object
Basic Class for all External Object
Provides common attributes.
morse.core.mathutils module¶
This module wraps the calls to the Blender ‘mathutils’ API. This is intended for all the cases we need to run MORSE code outside Blender (mostly for documentation generation purposes).
morse.core.morse_time module¶
This module deals with the time management in Morse, providing several possible implementations.
- At the moment, it provides two implementations:
- best effort (i.e. try to simulate at real-time, by dropping frame). The simulation is less acurate, because the physical steps are not really constant.
- fixed simulation step. Compute all physical / logical step. The simulation will be more precise, but the simulation time will differ from computer clock time.
-
class
BestEffortStrategy
(relative_time)[source]¶ Bases:
object
-
mean
¶
-
real_time
¶
-
update
()[source]¶ The exact physical time elapsed between two logic call is hard to guess. In the nominal case, it is easy, as long as you have one logical step per render step. In other case, it depends on logic_max_step, physics_max_step, and “complex” internal logic.
So, instead of guessing it, observe it. Assuming the physical engine is perfect, put a solid at one meter by sec on x axis, and observe its displacement between two frame. We have:
dx = vx * dtwhere vw = 1.0. So we have dx = dt.
Modern version of Blender (>= 2.77) provides the method bge.logic.getFrameTime() so if this information is available, just use it.
-
-
class
TimeStrategies
[source]¶ Bases:
object
-
BestEffort
= 0¶
-
FixedSimulationStep
= 1¶
-
internal_mapping
= {0: {'impl': <class 'morse.core.morse_time.BestEffortStrategy'>, 'python_repr': b'TimeStrategies.BestEffort', 'human_repr': 'Best Effort'}, 1: {'impl': <class 'morse.core.morse_time.FixedSimulationStepStrategy'>, 'python_repr': b'TimeStrategies.FixedSimulationStep', 'human_repr': 'Fixed Simulation Step'}}¶
-
morse.core.multinode module¶
morse.core.object module¶
-
class
Object
(obj, parent=None)[source]¶ Bases:
morse.core.abstractobject.AbstractObject
Basic Class for all 3D objects (components) used in the simulation. Provides common attributes.
-
action
()[source]¶ Call the regular action function of the component.
Can be redefined in some of the subclases (sensor and actuator).
-
default_action
()[source]¶ Base action performed by any object.
This method should be implemented by all subclasses that will be instanced (GPS, v_Omega, ATRV, etc.).
-
fetch_properties
()[source]¶ Returns the “_properties” of a component :return: a dictionary of the field “_properties”
-
frequency
¶ Frequency of the object action in Hz (float).
-
get_configurations
()[source]¶ Returns the configurations of a component (parsed from the properties).
Returns: a dictionary of the current component’s configurations
-
get_properties
()[source]¶ Returns the properties of a component.
Returns: a dictionary of the current component’s properties
-
in_zones
(name=None, type=None)[source]¶ Determine which zone(s) contain(s) current object
If a :param name: is precised, check only if this specific zone contains the position If a :param type: is precised, only search in the zone of this type.
-
initialize_local_data
()[source]¶ Creates and initializes ‘local data’ fields, according to the current component abstraction level.
-
periodic_call
()[source]¶ Return true if the component must be called on this loop call, False otherwise
-
set_property
(prop_name, prop_val)[source]¶ Modify one property on a component
Parameters: prop_name – the name of the property to modify (as shown the documentation) :param prop_val: the new value of the property. Note that there is no checking about the type of the value so be careful
Returns: nothing
-
morse.core.overlay module¶
-
class
MorseOverlay
(overlaid_object)[source]¶ Bases:
morse.core.abstractobject.AbstractObject
This class allows to define ‘overlay’. An ‘overlay’ is a pseudo component that masks a MORSE default component behind a custom facet (with for instance custom signatures for services, ports, etc.).
This is especially useful when integrating MORSE into an existing robotic architecture where components have custom services/ports/topics whose signature does not match MORSE defaults.
As of MORSE 0.4, only services can currently be overlaid.
-
chain_callback
(fn=None)[source]¶ When calling a component asynchronous service from an overlay, a callback (used to notify the client upon service completion) must be passed through. This callback does not usually appear in the service signature since it is added by the
@async_service
decorator.Under normal circumstances, you must use this method as callback.
For instance, assume a
Dummy
component and an overlayMyDummy
:class Dummy(MorseObject): @async_service def dummy_service(self, arg1): # Here, dummy_service has a callback parameter added # by the decorator pass class MyDummy(MorseAbstractobject): @async_service def mydummy_service(self, arg1): # [...do smthg useful] # We call the overlaid asynchronous service # 'dummy_service' by passing a special callback # returned by 'self.chain_callback()' self.overlaid_object.dummy_service(self.chain_callback(), arg1)
chain_callback
takes a functor as an optional parameter. This functor is called after the (overlaid) service completion, but just before notifying the simulator client.It can be used for output formatting for instance.
The functor must take one single parameter (a tuple
(status, result)
) and must as well return a tuple(status, result)
.class MyDummy(MorseAbstractobject): def mydummy_on_completion(self, result): # This functor - here a simple function - simply # format the result output. # It could do anything else. status, value = result return (status, " . ".join(value)) @async_service def mydummy_service(self, arg1): self.overlaid_object.dummy_service(self.chain_callback(self.mydummy_on_completion), arg1)
Parameters: fn – a functor to be called on service completion, before notifying the clients. Must have the following signature: (status, result) fn((status, result))
-
morse.core.request_manager module¶
-
class
RequestManager
[source]¶ Bases:
object
Basic Class for all request dispatchers, i.e., classes that implement a request service.
- A request service offers typically 2 things:
- the ability for a component (a robot, a sensor or the simulator as a whole) to expose a RPC method (typically for remote configuration or debug),
- an interface with a specific middleware to serialize the RPC call and communicate with the outside world.
Components can register such a service with the ‘register_service’ method. Please check its documentation for details.
To implement a concrete RequestManager (for a new middleware, for instance), the following methods must be overloaded:
initialization()
: perform here middleware specific initializationfinalization()
: perform here middleware specific finalizationpost_registration()
: put here all middleware specific code that must be executed when a new service is registered.on_service_completion()
: this method is called when a ‘long term’ request completes. You should implement here a way to notify your clients.main()
: this method is called at each step of the simulation. You should read there incoming requests and write back results.
When a new request arrives, you must pass it to
on_incoming_request()
that dispatch or invoke properly the request.Subclasses are also expected to overload the special
__str__()
method to provide middleware specific names.-
abort_request
(request_id)[source]¶ This method will interrupt a running asynchronous service, uniquely described by its request_id
-
finalization
()[source]¶ This method is meant to be overloaded by middlewares to perform specific finalizations.
Must return True is the finalization is successful, False in other cases.
-
initialization
()[source]¶ This method is meant to be overloaded by middlewares to perform specific initializations.
Must return True is the initialization is successful, False in other cases.
-
main
()[source]¶ This is the main method of the RequestManagerClass: it reads external incoming requests, dispatch them through the
on_incoming_request()
method, and write back answers.Subclasses are expected to overload this method.
-
on_incoming_request
(component, service, params)[source]¶ This method handles incoming requests: it figures out who registered the service, checks if the service returns immediately or must be started and only later checked for termination, invokes the service, and returns the service result (for service that returns immediately).
If something goes wrong while trying to call the method, a
morse.core.exceptions.MorseRPCInvokationError
is raised.If everything goes well, the method return a tuple:
(True, return_value)
or(False, request_id)
. The first item tells if the service is a synchronous (short-term) service (value isTrue
) or an asynchronous service (False
).For asynchronous services, the returned request id should allow to track the completion of the service. Upon completion,
on_service_completion()
is invoked.
-
on_service_completion
(request_id, result)[source]¶ This method is called when a asynchronous request completes.
Subclasses are expected to overload this method with code to notify the original request emitter.
Parameters: - request_id (uuid) – the request id, as return by
on_incoming_request()
when processing an asynchronous request - result – the service execution result.
- request_id (uuid) – the request id, as return by
-
post_registration
(component_name, service_name, is_async)[source]¶ This method is meant to be overloaded by middlewares that have specific initializations to do when a new service is exposed.
Parameters: - component_name (string) – name of the component that declare this service
- service_name (string) – Name of the service (if not overloaded in the @service decorator, should be the Python function name that implement the service)
- is_async (boolean) – If true, means that the service is asynchronous.
Returns: True if the registration succeeded.
Return type: boolean
-
process
()[source]¶ This method is the one actually called from the MORSE main loop.
It simply updates the list of pending requests (if any) and calls the main processing method.
-
register_async_service
(component_name, callback, service_name=None)[source]¶ Allows a component to register an asynchronous RPC method.
A asynchronous method can last for several cycles without blocking the simulator. The callback method must take as first parameter a callable that must be used to set the results of the service upon completion.
For example, consider the following sample of asynchronous service:
def complex_computation(result_setter, param1, param2): do_computation_step() #should stay short, but can last several simulation steps if computation_done: result_setter(computation_results) request_manager.register_async_service("computer", complex_computation)
As soon as the ‘result_setter’ is called with the results of the service, the clients of this service are notified via their middlewares.
See
register_service()
for detailed documentation of parameters.
-
register_service
(component_name, callback, service_name=None, asynchronous=False)[source]¶ Allows a component to register a synchronous RPC method that is made publicly available to the outside world.
Parameters: - component_name (string) – name of the component that declare this service
- callback (callable) – the method name to invoke on incoming request. If service_name is not defined, it will also be used as the public name of the service. If asynchronous is false (synchronous service), the method is expected to return immediately. In this case, its return value is immediately send back to the original caller.
- asynchronous (boolean) – if true, the service is asynchronous: it can last for
several cycles without blocking the communication interface.
See
register_async_service()
for details. - service_name – if defined, service_name is used as public name for this RPC method.
morse.core.robot module¶
-
class
Robot
(obj, parent=None)[source]¶ Bases:
morse.core.object.Object
Basic Class for all robots
Inherits from the base object class.
-
apply_speed
(kind, linear_speed, angular_speed)[source]¶ Apply speed parameter to the robot
Parameters: kind (string) – the kind of control to apply. Can be one of [‘Position’, ‘Velocity’]. :param list linear_speed: the list of linear speed to apply, for each axis, in m/s. :param list angular_speed: the list of angular speed to apply, for each axis, in rad/s.
-
morse.core.sensor module¶
-
class
Sensor
(obj, parent=None)[source]¶ Bases:
morse.core.object.Object
Basic Class for all sensors
Inherits from the base object class.
morse.core.services module¶
-
class
MorseServices
(impls=None)[source]¶ Bases:
object
-
add_request_manager
(classpath)[source]¶ Initializes and adds a new request manager from its name.
Parameters: classpath (string) – the name (and path) of the Python class that implements the RequestManager interface (eg: ‘morse.middleware.socket_request_manager.SocketRequestManager’, ‘morse.middleware.yarp_request_manager.YarpRequestManager’,…). Returns: True if the request manager has been successfully loaded.
-
register_request_manager_mapping
(component, request_manager)[source]¶ Adds a mapping between a component and a request manager: all services exposed by this component are handled by this request manager.
A component may have 0, 1 or more request managers. if more than one, each request manager can independently invoke the service.
Parameters: - component (string) – the name of the component that use request_manager as request manager.
- request_manager (string) – the classpath of the request manager (eg: ‘morse.middleware.socket_request_manager.SocketRequestManager’, ‘morse.middleware.yarp_request_manager.YarpRequestManager’,…).
-
-
async_service
(fn=None, component=None, name=None)[source]¶ The @async_service decorator.
Refer to the @service decorator for most of the doc.
Asynchronous service specifics:
- The function that is decorated is expected to simply start the service, and immediately return.
- If the service can not be started, the function must throw a
MorseRPCInvokationError
with a error message explaining why the initialization failed.
-
do_service_registration
(fn, component_name=None, service_name=None, asynchronous=False, request_managers=None)[source]¶
-
interruptible
(fn)[source]¶ The @interruptible decorator.
Use this decorator to set an (asynchronous) service to be interruptible.
If MORSE receives a request for a new service while an interruptible service is running, the running service is preempted (the requester receives a
morse.core.status.PREEMPTED
status), and the new one is started.See also
noninterruptible()
decorator.
-
noninterruptible
(fn)[source]¶ The @noninterruptible decorator.
Use this decorator to set an (asynchronous) service to be non interruptible.
If MORSE receives a request for a new service while a non interruptible service is running, a failure message is returned to the requester.
See also
interruptible()
decorator.
-
service
(fn=None, component=None, name=None, asynchronous=False)[source]¶ The @service decorator.
This decorator can be used to automagically register a service in MORSE. Simply decorate the method you want to export as a RPC service with @service and MORSE automatically add and register it with the right middleware (depending on what is specified in the simulation configuration file).
This decorator works both with free function and for methods in classes inheriting from
morse.core.object.Object
. In the former case, you must specify a component (your service will belong to this namespace), in the later case, it is automatically set to the name of the corresponding MORSE component.Parameters: - fn (callable) – [automatically set by Python to point to the decorated function]
- component (string) – you MUST set this parameter to define the name of the component which export the service ONLY for free functions. Cf explanation above.
- name (string) – by default, the name of the service is the name of the method. You can override it by setting the ‘name’ argument.
- asynchronous (boolean) – if set to True (default value when using
@async_service), a new ‘callback’ parameter is added to the method.
This callback is used to notify the service initiator that the service
completed. The callback does not need to be build manually:
morse.core.request_manager.RequestManager.on_incoming_request()
takes care of it.
morse.core.status module¶
-
FAILED
= 'FAILED'¶ The request failed.
-
PREEMPTED
= 'PREEMPTED'¶ The request was interupted and replaced by another one
-
SUCCESS
= 'SUCCESS'¶ The request was successfully achieved
morse.core.wheeled_robot module¶
-
class
PhysicsAckermannRobot
(obj, parent=None)[source]¶ Bases:
morse.core.wheeled_robot.PhysicsWheelRobot
Base class for mobile robots following the Ackermann steering principle
This base class handles the simulation of the physical interactions between Ackermann-like vehicle and the ground. It assumes the vehicle has 4 wheels.
To ensure proper (v, w) enforcement, the model relies on some internal PID controller, hence the different properties.
-
apply_vw_wheels
(vx, vw)[source]¶ Apply (v, w) on the parent robot.
We cannot rely on the theoric ackermann model due to important friction generation by front wheel. So, use a simple PID to guarantee the constraints
-
attach_front_wheel_to_body
(wheel, parent)[source]¶ Attaches the wheel to the given parent using a 6DOF constraint
Set the wheel positions relative to the robot in case the chassis was moved by the builder script or manually in blender
-
-
class
PhysicsDifferentialRobot
(obj, parent=None)[source]¶ Bases:
morse.core.wheeled_robot.PhysicsWheelRobot
Base class for mobile robots using a differential drive motion model.
This base class handles the simulation of the physical interactions between differential-drive robots and the ground.
-
class
PhysicsWheelRobot
(obj, parent)[source]¶ Bases:
morse.core.robot.Robot
Abstract base class for robots with wheels that turn as the robot moves. The wheels must be children of the robot in the Blender file.
morse.core.zone module¶
-
class
Zone
(obj)[source]¶ Bases:
object
Creates a named zone in the 3D environment. This can be used by components via the :py:class:morse.core.ZoneManagerto trigger specific behaviours when the component is inside the zone.
The main method is the contains one, which allow to decide if a point is part of a zone (and so potentially to trigger some behaviour).
Note that for simplicity, we currently assume the zone axis follows the world axis.
-
class
ZoneManager
[source]¶ Bases:
object
Handle the different zones, allowing to search for them ‘efficiently’ (at least in an abstract way)
-
contains
(obj_or_pos, name=None, type=None)[source]¶ Determine which zone(s) contain(s) the position pos
If :param obj_or_pos: is a :py:class:morse.core.object.Object, consider the position of the object. Otherwise, assume it is a position. If a :param name: is precised, check only if this specific zone contains the position If a :param type: is precised, only search in the zone of this type.
The method returns the list of zones containing the position, considering the previous limitation
-
is_in
(obj_or_pos, name=None, type=None)[source]¶ Determine if obj_or_pos is in a zone
If :param obj_or_pos: is a :py:class:morse.core.object.Object, consider the position of the object. Otherwise, assume it is a position. If a :param name: is precised, check only if this specific zone contains the position If a :param type: is precised, only search in the zone of this type.
-