Source code for middlewares.yarp.yarp_datastream_testing

#! /usr/bin/env python
"""
This script tests some of the base functionalities of MORSE.
"""

import sys
import math
import subprocess
import yarp
from morse.testing.testing import MorseTestCase
from pymorse import Morse

# Include this import to be able to use your test file as a regular 
# builder script, ie, usable with: 'morse [run|exec] base_testing.py
try:
    from morse.builder import *
except ImportError:
    pass

[docs]def send_speed_(s, v, w): b = s.prepare() b.clear() b.addDouble(v) b.addDouble(w) s.write(True)
[docs]def send_speed(s, morse, v, w, t): send_speed_(s, v, w) morse.sleep(t) send_speed_(s, 0.0, 0.0)
[docs]class YARP_MW_Test(MorseTestCase):
[docs] def setUpMw(self): print("Launching yarpserver3") try: self.yarp_process = subprocess.Popen(['yarpserver3']) except OSError as ose: testlogger.error("Error while launching yarpserver3 ! Check you can run it from command-line\n") raise ose
[docs] def tearDownMw(self): self.yarp_process.terminate()
[docs] def setUpEnv(self): """ Defines the test scenario, using the Builder API. """ robot = ATRV() pose = Pose() pose.translate(z=-0.10) robot.append(pose) pose.add_stream('yarp') motion = MotionVW() robot.append(motion) motion.add_stream('yarp') env = Environment('empty', fastmode = True) env.add_service('socket')
[docs] def test_vw_controller(self): """ This test is guaranteed to be started only when the simulator is ready. """ pass yarp.Network.init() pose_stream = yarp.BufferedPortBottle() pose_stream.open("/morse/test/pose/in") yarp.Network.connect("/morse/robot/pose/out", "/morse/test/pose/in") cmd_stream = yarp.BufferedPortBottle() cmd_stream.open("/morse/test/vw/out") yarp.Network.connect("/morse/test/vw/out", "/morse/robot/motion/in") with Morse() as morse: # Read the start position, it must be (0.0, 0.0, 0.0) precision = 0.20 pose = pose_stream.read() for i in range(1, 7): self.assertAlmostEqual(pose.get(i).asDouble(), 0.0, delta = precision) send_speed(cmd_stream, morse, 1.0, 0.0, 2.0) pose = pose_stream.read() self.assertAlmostEqual(pose.get(1).asDouble(), 2.0, delta = precision) self.assertAlmostEqual(pose.get(2).asDouble(), 0.0, delta = precision) self.assertAlmostEqual(pose.get(3).asDouble(), 0.0, delta = precision) self.assertAlmostEqual(pose.get(4).asDouble(), 0.0, delta = precision) self.assertAlmostEqual(pose.get(5).asDouble(), 0.0, delta = precision) self.assertAlmostEqual(pose.get(6).asDouble(), 0.0, delta = precision) send_speed(cmd_stream, morse, -1.0, 0.0, 2.0) pose = pose_stream.read() for i in range(1, 7): self.assertAlmostEqual(pose.get(i).asDouble(), 0.0, delta = precision) send_speed(cmd_stream, morse, 1.0, -math.pi/4.0, 2.0) pose = pose_stream.read() self.assertAlmostEqual(pose.get(1).asDouble(), 4.0 / math.pi, delta = precision) self.assertAlmostEqual(pose.get(2).asDouble(), -4.0 / math.pi , delta = precision) self.assertAlmostEqual(pose.get(3).asDouble(), 0.0, delta = precision) self.assertAlmostEqual(pose.get(4).asDouble(), -math.pi/2.0, delta = precision) self.assertAlmostEqual(pose.get(5).asDouble(), 0.0, delta = precision) self.assertAlmostEqual(pose.get(6).asDouble(), 0.0, delta = precision) send_speed(cmd_stream, morse, 0.5, -math.pi/8.0, 12.0) pose = pose_stream.read() for i in range(1, 7): self.assertAlmostEqual(pose.get(i).asDouble(), 0.0, delta = precision) send_speed(cmd_stream, morse, -2.0, math.pi/2.0, 3.0) pose = pose_stream.read() self.assertAlmostEqual(pose.get(1).asDouble(), 4.0 / math.pi, delta = precision) self.assertAlmostEqual(pose.get(2).asDouble(), -4.0 / math.pi , delta = precision) self.assertAlmostEqual(pose.get(3).asDouble(), 0.0, delta = precision) self.assertAlmostEqual(pose.get(4).asDouble(), -math.pi/2.0, delta = precision) self.assertAlmostEqual(pose.get(5).asDouble(), 0.0, delta = precision) self.assertAlmostEqual(pose.get(6).asDouble(), 0.0, delta = precision) yarp.Network.fini() ########################## Run these tests ##########################
if __name__ == "__main__": from morse.testing.testing import main main(YARP_MW_Test)