Source code for middlewares.ros.topics

#! /usr/bin/env python
"""
This script tests some of the base functionalities of MORSE.
"""

import sys
import math
from morse.testing.ros import RosTestCase
from morse.testing.testing import testlogger

import rospy
import std_msgs
import nav_msgs.msg # do not conflict with morse builder
from geometry_msgs.msg import Twist
from time import sleep

# Include this import to be able to use your test file as a regular 
# builder script, ie, usable with: 'morse [run|exec] base_testing.py
try:
    from morse.builder import *
except ImportError:
    pass

[docs]def send_speed(s, v, w, t): msg = Twist() msg.linear.x = v msg.angular.z = w s.publish(msg) sleep(t) msg.linear.x = 0.0 msg.angular.z = 0.0 s.publish(msg)
[docs]class DataStreamTest(RosTestCase):
[docs] def setUpEnv(self): """ Defines the test scenario, using the Builder API. """ robot = ATRV() odometry = Odometry() robot.append(odometry) odometry.add_stream('ros') motion = MotionVW() robot.append(motion) motion.add_stream('ros') env = Environment('empty', fastmode = True) env.add_service('socket')
[docs] def pose_callback(self, data): self.pos = data
[docs] def test_vw_controller(self): # XXX # test the orientation part, but the easy way is to use numpy or # tf, and don't want to add too much dependency for test rospy.init_node('morse_ros_data_stream_test') rospy.Subscriber('/robot/odometry', nav_msgs.msg.Odometry, self.pose_callback) msg = rospy.client.wait_for_message('/robot/odometry', nav_msgs.msg.Odometry, timeout = 10) self.assertIsNotNone(msg) cmd_stream = rospy.Publisher('/robot/motion', Twist) self.assertTrue(hasattr(self, "pos")) precision=0.15 self.assertAlmostEqual(self.pos.pose.pose.position.x, 0.0, delta=precision) self.assertAlmostEqual(self.pos.pose.pose.position.y, 0.0, delta=precision) self.assertAlmostEqual(self.pos.pose.pose.position.z, 0.0, delta=precision) # sleep to make sure that the other peer can read it ... sleep(1) send_speed(cmd_stream, 1.0, 0.0, 2.0) sleep(1) self.assertAlmostEqual(self.pos.pose.pose.position.x, 2.0, delta=precision) self.assertAlmostEqual(self.pos.pose.pose.position.y, 0.0, delta=precision) self.assertAlmostEqual(self.pos.pose.pose.position.z, 0.0, delta=precision) send_speed(cmd_stream, -1.0, 0.0, 2.0) self.assertAlmostEqual(self.pos.pose.pose.position.x, 0.0, delta=precision) self.assertAlmostEqual(self.pos.pose.pose.position.y, 0.0, delta=precision) self.assertAlmostEqual(self.pos.pose.pose.position.z, 0.0, delta=precision) # sleep to make sure that the other peer can read it ... sleep(1) send_speed(cmd_stream, 1.0, -math.pi/4.0, 2.0) sleep(1) self.assertAlmostEqual(self.pos.pose.pose.position.x, 4.0 / math.pi, delta=precision) self.assertAlmostEqual(self.pos.pose.pose.position.y, -4.0 / math.pi, delta=precision) self.assertAlmostEqual(self.pos.pose.pose.position.z, 0.0, delta=precision) # sleep to make sure that the other peer can read it ... sleep(1) send_speed(cmd_stream, 0.5, -math.pi/8.0, 12.0) sleep(1) self.assertAlmostEqual(self.pos.pose.pose.position.x, 0.0, delta=precision) self.assertAlmostEqual(self.pos.pose.pose.position.y, 0.0, delta=precision) self.assertAlmostEqual(self.pos.pose.pose.position.z, 0.0, delta=precision) send_speed(cmd_stream, -2.0, math.pi/2.0, 3.0) self.assertAlmostEqual(self.pos.pose.pose.position.x, 4.0 / math.pi, delta=precision) self.assertAlmostEqual(self.pos.pose.pose.position.y, -4.0 / math.pi, delta=precision) self.assertAlmostEqual(self.pos.pose.pose.position.z, 0.0, delta=precision) ########################## Run these tests ##########################
if __name__ == "__main__": from morse.testing.testing import main main(DataStreamTest, time_modes = [TimeStrategies.BestEffort])