pymorse

A Python interface to control MORSE, the robotics simulator.

The pymorse library exposes MORSE services and data stream with a friendly Python API.

It uses underneath the MORSE socket API.

Usage

Creating a connection to the simulator

  • Import the pymorse module
  • Create a context with the with statement:
import pymorse

with pymorse.Morse() as simu:
    # ...
    pass

The context manager will take care of properly closing the connection to the simulator. You can also directly create an instance of the pymorse.Morse class, passing the host and/or port of the simulator (defaults to localhost:4000). In this case, you must call pymorse.Morse.close() before leaving.

  • Once created, the context generates proxies for every robots in the scene, and every sensors and actuators for each robot.

First example

Let consider MORSE has been started with the following simulation script:

from morse.builder import *

# Add a robot with a position sensor and a motion controller
r2d2 = ATRV()

pose = Pose()
pose.add_interface('socket')
r2d2.append(pose)

motion = Waypoint()
motion.add_interface('socket')
r2d2.append(motion)


# Environment
env = Environment('land-1/trees')

The following Python program sends a destination to the robot, and prints in background its pose:

import pymorse

def print_pos(pose):
    print("I'm currently at %s" % pose)

with pymorse.Morse() as simu:

    # subscribes to updates from the Pose sensor by passing a callback
    simu.r2d2.pose.subscribe(print_pos)

    # sends a destination
    simu.r2d2.motion.publish({'x' : 10.0, 'y': 5.0, 'z': 0.0,
                              'tolerance' : 0.5,
                              'speed' : 1.0})

    # Leave a couple of millisec to the simulator to start the action
    simu.sleep(0.1)

    # waits until we reach the target
    while simu.r2d2.motion.get_status() != "Arrived":
        simu.sleep(0.5)

    print("Here we are!")

Note

Note that here we use pymorse.Morse.sleep() instead of standard time.sleep, the former allowing to consider the simulation time while the second use ‘system’ time.

Data stream manipulation

Every component (sensor or actuator) exposes a datastream interface, either read-only (for sensors) or write-only (for actuators)

They are accessible by their names, as defined in the simulation script (cf example above).

Streams are Python pymorse.Stream objects. It offers several methods to read data:

  • pymorse.Stream.get(): blocks until new data are available and returns them.
  • pymorse.Stream.last(): returns the last/the n last (if an integer argument is passed) records received, or none/less, if none/less have been received.
  • pymorse.Stream.subscribe() (and pymorse.Stream.unsubscribe()): this method is called with a callback that is triggered everytime incoming data is received.

These methods are demonstrated in the example below:

import pymorse

def printer(data):
    print("Incoming data! " + str(data))

with pymorse.Morse("localhost", 4000) as simu:

    try:
        # Get the 'Pose' sensor datastream
        pose = simu.r2d2.pose

        # Blocks until something is available
        print(pose.get())

        # Asynchronous read: the following line do not block.
        pose.subscribe(printer)

        # Read for 10 sec
        simu.sleep(10)

    except pymorse.MorseServerError as mse:
        print('Oups! An error occured!')
        print(mse)

Writing on actuator’s datastreams is achieved with the pymorse.Stream.publish() method, as illustrated in the first example above.

The data format is always formatted as a JSON dictionary (which means that, currently, binary data like images are not supported). The documentation page of each component specify the exact content of the dictionary.

Services

Some components export RPC services. Please refer to the components’ documentation for details.

These services can be accessed from pymorse, and mostly look like regular methods:

import pymorse

with pymorse.Morse() as simu:

    # Invokes the get_status service from the 'Waypoints' actuator
    print(simu.r2d2.motion.get_status())

However, these call are asynchronous: a call to simu.r2d2.motion.get_status() does not block, and returns instead a future. See the concurrent.futures API to learn more about futures.

Non-blocking call are useful for long lasting services, like in the example below:

import pymorse

def done(evt):
    print("We have reached our destination!")

with pymorse.Morse() as simu:

    # Start the motion. It may take several seconds before finishing
    # The line below is however non-blocking
    goto_action = simu.r2d2.motion.goto(2.5, 0, 0)

    # Register a callback to know when the action is done
    goto_action.add_done_callback(done)

    print("Am I currently moving? %s" % goto_action.running())

    while goto_action.running():
        simu.sleep(0.5)

Use the cancel method on the future returned by the RPC call to abort the service.

To actually wait for a result, call the result method on the future:

import pymorse

with pymorse.Morse() as simu:

    status = simu.r2d2.motion.get_status().result()

    if status == "Arrived":
        print("Here we are")

However, for certain common cases (printing or comparing the return value), the result() method is automatically called. Thus, the previous code sample can be rewritten:

import pymorse

with pymorse.Morse() as simu:

    if simu.r2d2.motion.get_status() == "Arrived":
        print("Here we are")

Simulator control

Several services are available to control the overall behaviour of the simulator.

The whole list of such services is here: Supervision services.

For instance, you can stop the simulator (MORSE will quit) with pymorse.quit(), and reset it with pymorse.Morse.reset().

These methods are demonstrated in the example below:

import pymorse

with pymorse.Morse("localhost", 4000) as simu:

    try:
        print(simu.robots)
        simu.quit()

    except pymorse.MorseServerError as mse:
        print('Oups! An error occured!')
        print(mse)

Logging

This library use the standard Python logging mechanism. You can retrieve pymorse log messages through the “pymorse” logger.

The complete example below shows how to retrieve the logger and how to configure it to print debug messages on the console.

import logging
import pymorse

logger = logging.getLogger("pymorse")

console = logging.StreamHandler()
console.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(asctime)-15s %(name)s: %(levelname)s - %(message)s')
console.setFormatter(formatter)

logger.addHandler(console)

with pymorse.Morse("localhost", 4000) as simu:

    try:
        print(simu.robots)

        simu.quit()

    except pymorse.MorseServerError as mse:
        print('Oups! An error occured!')
        print(mse)
class Component(morse, name, fqn, stream=None, port=None, services=[])[source]

Bases: object

close()[source]
lazy_init()[source]
class Morse(host='localhost', port=4000)[source]

Bases: object

activate(cmpnt)[source]
cancel(service_id)[source]

Send a cancelation request for an existing (running) service.

If the service is not running or does not exist, the request is ignored.

close(cancel_async_services=False, wait_publishers=True)[source]
deactivate(cmpnt)[source]
get_publisher_streams()[source]
get_stream_port(stream)[source]
initialize_api()[source]

This method asks MORSE for the scene structure, and dynamically creates corresponding objects in ‘self’.

is_up()[source]
poll_thread = None
quit()[source]
reset()[source]
rpc(component, service, *args)[source]

Calls a service from the simulator.

The call will block until a response from the simulator is received.

Parameters:
  • component – the component that expose the service (like a robot name)
  • service – the name of the service
  • args... – (variadic) each service parameter, as a separate argument
rpc_t(timeout, component, service, *args)[source]
sleep(time)[source]

Wait for time second.

Time may be a float. Contrary to time.sleep, this method consider the simulated time.

spin()[source]
streams()[source]
time()[source]

Return the simulated time, in seconds, since Epoch

The precision of the value depends on the underlying python precision of time, and the frequency of simulator

exception MorseServiceError[source]

Bases: Exception

Morse Service Exception thrown when unknown error

exception MorseServiceFailed[source]

Bases: Exception

Morse Service Exception thrown when failed error

exception MorseServicePreempted[source]

Bases: Exception

Morse Service Exception thrown when preempted error

class ResponseCallback(req_id)[source]

Bases: object

callback(res_bytes)[source]
cancel_all()[source]
class Robot(morse, name, fqn, services=[])[source]

Bases: dict, pymorse.pymorse.Component

normalize_name(name)[source]

Normalize Blender names to get valid Python identifiers

parse_response(raw)[source]
rpc_get_result(response)[source]