pymorse¶
A Python interface to control MORSE, the robotics simulator.
The pymorse
library exposes MORSE services and data stream with a
friendly Python API.
It uses underneath the MORSE socket API.
Usage¶
Creating a connection to the simulator¶
- Import the
pymorse
module - Create a context with the
with
statement:
import pymorse
with pymorse.Morse() as simu:
# ...
pass
The context manager will take care of properly closing the connection to the
simulator. You can also directly create an instance of the
pymorse.Morse
class, passing the host and/or port of the
simulator (defaults to localhost:4000). In this case, you must call
pymorse.Morse.close()
before leaving.
- Once created, the context generates proxies for every robots in the scene, and every sensors and actuators for each robot.
First example¶
Let consider MORSE has been started with the following simulation script:
from morse.builder import *
# Add a robot with a position sensor and a motion controller
r2d2 = ATRV()
pose = Pose()
pose.add_interface('socket')
r2d2.append(pose)
motion = Waypoint()
motion.add_interface('socket')
r2d2.append(motion)
# Environment
env = Environment('land-1/trees')
The following Python program sends a destination to the robot, and prints in background its pose:
import pymorse
def print_pos(pose):
print("I'm currently at %s" % pose)
with pymorse.Morse() as simu:
# subscribes to updates from the Pose sensor by passing a callback
simu.r2d2.pose.subscribe(print_pos)
# sends a destination
simu.r2d2.motion.publish({'x' : 10.0, 'y': 5.0, 'z': 0.0,
'tolerance' : 0.5,
'speed' : 1.0})
# Leave a couple of millisec to the simulator to start the action
simu.sleep(0.1)
# waits until we reach the target
while simu.r2d2.motion.get_status() != "Arrived":
simu.sleep(0.5)
print("Here we are!")
Note
Note that here we use pymorse.Morse.sleep()
instead of standard
time.sleep
, the former allowing to consider the simulation time
while the second use ‘system’ time.
Data stream manipulation¶
Every component (sensor or actuator) exposes a datastream interface, either read-only (for sensors) or write-only (for actuators)
They are accessible by their names, as defined in the simulation script (cf example above).
Streams are Python pymorse.Stream
objects. It offers several
methods to read data:
pymorse.Stream.get()
: blocks until new data are available and returns them.pymorse.Stream.last()
: returns the last/the n last (if an integer argument is passed) records received, or none/less, if none/less have been received.pymorse.Stream.subscribe()
(andpymorse.Stream.unsubscribe()
): this method is called with a callback that is triggered everytime incoming data is received.
These methods are demonstrated in the example below:
import pymorse
def printer(data):
print("Incoming data! " + str(data))
with pymorse.Morse("localhost", 4000) as simu:
try:
# Get the 'Pose' sensor datastream
pose = simu.r2d2.pose
# Blocks until something is available
print(pose.get())
# Asynchronous read: the following line do not block.
pose.subscribe(printer)
# Read for 10 sec
simu.sleep(10)
except pymorse.MorseServerError as mse:
print('Oups! An error occured!')
print(mse)
Writing on actuator’s datastreams is achieved with the
pymorse.Stream.publish()
method, as illustrated in the first example
above.
The data format is always formatted as a JSON dictionary (which means that, currently, binary data like images are not supported). The documentation page of each component specify the exact content of the dictionary.
Services¶
Some components export RPC services. Please refer to the components’ documentation for details.
These services can be accessed from pymorse, and mostly look like regular methods:
import pymorse
with pymorse.Morse() as simu:
# Invokes the get_status service from the 'Waypoints' actuator
print(simu.r2d2.motion.get_status())
However, these call are asynchronous: a call to simu.r2d2.motion.get_status() does not block, and returns instead a future. See the concurrent.futures API to learn more about futures.
Non-blocking call are useful for long lasting services, like in the example below:
import pymorse
def done(evt):
print("We have reached our destination!")
with pymorse.Morse() as simu:
# Start the motion. It may take several seconds before finishing
# The line below is however non-blocking
goto_action = simu.r2d2.motion.goto(2.5, 0, 0)
# Register a callback to know when the action is done
goto_action.add_done_callback(done)
print("Am I currently moving? %s" % goto_action.running())
while goto_action.running():
simu.sleep(0.5)
Use the cancel method on the future returned by the RPC call to abort the service.
To actually wait for a result, call the result method on the future:
import pymorse
with pymorse.Morse() as simu:
status = simu.r2d2.motion.get_status().result()
if status == "Arrived":
print("Here we are")
However, for certain common cases (printing or comparing the return value), the result() method is automatically called. Thus, the previous code sample can be rewritten:
import pymorse
with pymorse.Morse() as simu:
if simu.r2d2.motion.get_status() == "Arrived":
print("Here we are")
Simulator control¶
Several services are available to control the overall behaviour of the simulator.
The whole list of such services is here: Supervision services.
For instance, you can stop the simulator (MORSE will quit) with
pymorse.quit()
, and reset it with pymorse.Morse.reset()
.
These methods are demonstrated in the example below:
import pymorse
with pymorse.Morse("localhost", 4000) as simu:
try:
print(simu.robots)
simu.quit()
except pymorse.MorseServerError as mse:
print('Oups! An error occured!')
print(mse)
Logging¶
This library use the standard Python logging mechanism. You can retrieve pymorse log messages through the “pymorse” logger.
The complete example below shows how to retrieve the logger and how to configure it to print debug messages on the console.
import logging
import pymorse
logger = logging.getLogger("pymorse")
console = logging.StreamHandler()
console.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(asctime)-15s %(name)s: %(levelname)s - %(message)s')
console.setFormatter(formatter)
logger.addHandler(console)
with pymorse.Morse("localhost", 4000) as simu:
try:
print(simu.robots)
simu.quit()
except pymorse.MorseServerError as mse:
print('Oups! An error occured!')
print(mse)
-
class
Morse
(host='localhost', port=4000)[source]¶ Bases:
object
-
cancel
(service_id)[source]¶ Send a cancelation request for an existing (running) service.
If the service is not running or does not exist, the request is ignored.
-
initialize_api
()[source]¶ This method asks MORSE for the scene structure, and dynamically creates corresponding objects in ‘self’.
-
poll_thread
= None¶
-
rpc
(component, service, *args)[source]¶ Calls a service from the simulator.
The call will block until a response from the simulator is received.
Parameters: - component – the component that expose the service (like a robot name)
- service – the name of the service
- args... – (variadic) each service parameter, as a separate argument
-
-
exception
MorseServiceError
[source]¶ Bases:
Exception
Morse Service Exception thrown when unknown error
-
exception
MorseServiceFailed
[source]¶ Bases:
Exception
Morse Service Exception thrown when failed error
-
exception
MorseServicePreempted
[source]¶ Bases:
Exception
Morse Service Exception thrown when preempted error
-
class
Robot
(morse, name, fqn, services=[])[source]¶ Bases:
dict
,pymorse.pymorse.Component