import logging
#testrunnerlogger = logging.getLogger("test.runner")
testlogger = logging.getLogger("morsetesting.general")
import sys, os, platform
from abc import ABCMeta, abstractmethod
import unittest
import inspect
import tempfile
from time import sleep
import threading # Used to be able to timeout when waiting for Blender initialization
import subprocess
import signal
from morse.testing.exceptions import MorseTestingError
from morse.core.morse_time import TimeStrategies
BLENDER_INITIALIZATION_TIMEOUT = 15 # seconds
MODE_INDEX = 0
CURRENT_TIME_MODE = None
ALL_TIME_MODES = None
INITIALIZED_LOGGER = False
[docs]class MorseTestRunner(unittest.TextTestRunner):
[docs] def setup_logging(self):
global INITIALIZED_LOGGER
logger = logging.getLogger('morsetesting')
logger.setLevel(logging.DEBUG)
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
formatter = logging.Formatter('[%(asctime)s (%(levelname)s)] %(message)s')
ch.setFormatter(formatter)
logger.addHandler(ch)
INITIALIZED_LOGGER = True
[docs] def run(self, suite):
global INITIALIZED_LOGGER
if not INITIALIZED_LOGGER:
self.setup_logging()
return unittest.TextTestRunner.run(self, suite)
[docs]def follow(file):
""" Really emulate tail -f
See http://stackoverflow.com/questions/1475950/tail-f-in-python-with-no-time-sleep
for a detailled discussion on the subject
"""
while True:
line = file.readline()
if not line:
sleep(0.1) # Sleep briefly
continue
yield line
[docs]class MorseSwitchTimeMode(unittest.TestCase):
[docs] def test_switch(self):
global ALL_TIME_MODES
global CURRENT_TIME_MODE
global MODE_INDEX
CURRENT_TIME_MODE = ALL_TIME_MODES[MODE_INDEX]
MODE_INDEX += 1
[docs]class MorseTestCase(unittest.TestCase):
# Make this an abstract class
__metaclass__ = ABCMeta
[docs] def setUpMw(self):
""" This method can be overloaded by subclasses to define
environment setup, before the launching of the Morse environment
pass
"""
pass
def _checkMorseException(self):
""" Check in the Morse output if some python error happens"""
with open(self.logfile_name) as log:
lines = follow(log)
for line in lines:
# Python Error Case
if "[ERROR][MORSE]" in line:
testlogger.error(line.strip())
testlogger.error("Exception detected in Morse execution : "
"see %s for details."
" Exiting the current test !" % self.logfile_name)
os.kill(os.getpid(), signal.SIGINT)
return
# End of simulation, exit the thread
if "EXITING SIMULATION" in line:
return
[docs] def setUp(self):
testlogger.info("Starting test " + self.id() + " in " + TimeStrategies.human_repr(CURRENT_TIME_MODE))
self.logfile_name = self.__class__.__name__ + ".log"
# Wait for a second
# to wait for ports open in previous tests to be closed
sleep(1)
self.morse_initialized = False
self.setUpMw()
self.startmorse(self)
self.t = threading.Thread(target=self._checkMorseException)
self.t.start()
[docs] def tearDownMw(self):
""" This method can be overloaded by subclasses to clean up
environment setup
"""
pass
[docs] def tearDown(self):
self.stopmorse()
self.tearDownMw()
self.logfile.close() # force to flush
self.t.join()
[docs] @abstractmethod
def setUpEnv(self):
""" This method must be overloaded by subclasses to define a
simulation environment.
The code must follow the :doc:`Builder API <morse/dev/builder>`
convention (without the import of the `morsebuilder` module which
is automatically added).
"""
pass
[docs] def wait_initialization(self):
""" Wait until Morse is initialized """
testlogger.info("Waiting for MORSE to initialize... (timeout: %s sec)" % \
BLENDER_INITIALIZATION_TIMEOUT)
with open(self.logfile_name) as log:
lines = follow(log)
for line in lines:
if ("[ERROR][MORSE]" in line) or ("INITIALIZATION ERROR" in line):
testlogger.error("Error during MORSE initialization! Check "
"the log file.")
return
if "SCENE INITIALIZED" in line:
self.morse_initialized = True
return
[docs] def run(self, result=None):
""" Overwrite unittest.TestCase::run
Detect KeyBoardInterrupt exception , due to user or a SIGINIT In
particular, it can happen if we detect an exception in the Morse
execution. In this case, clean up correctly the environnement.
"""
try:
return unittest.TestCase.run(self, result)
except KeyboardInterrupt as e:
self.tearDownMw()
if self.pid:
os.kill(self.pid, signal.SIGKILL)
if result:
result.addError(self, sys.exc_info())
def _extract_pid(self):
""" Extract the pid from the log file.
We can not simply rely on Popen.subprocess.pid because we need the PID of the
Blender process itself, not the (Python) MORSE process.
"""
with open(self.logfile_name) as log:
for line in log:
if "PID" in line:
words = line.split()
return int(words[-1])
[docs] def startmorse(self, test_case):
""" This starts MORSE in a new process, passing the script itself as parameter (to
build the scene via the Builder API).
"""
temp_builder_script = self.generate_builder_script(test_case)
try:
original_script_name = os.path.abspath(inspect.stack()[-1][1])
try:
prefix = os.environ['MORSE_ROOT']
except KeyError:
prefix=""
if prefix == "":
cmd = 'morse'
elif platform.system() == 'Windows':
cmd = os.path.join(prefix.strip('\"').strip('\''), "bin", "morserun.py")
else:
cmd = prefix + "/bin/morse"
self.logfile = open(self.logfile_name, 'w')
if platform.system() == 'Windows':
self.morse_process = subprocess.Popen(['python', cmd, 'run', temp_builder_script], stdout=self.logfile, stderr=subprocess.STDOUT)
else:
self.morse_process = subprocess.Popen([cmd, 'run', temp_builder_script], stdout=self.logfile, stderr=subprocess.STDOUT)
except OSError as ose:
testlogger.error("Error while launching MORSE! Check you can run it from command-line\n" + \
" and if you use the $MORSE_ROOT env variable, check it points to a correct " + \
" place!")
raise ose
t = threading.Thread(target=self.wait_initialization)
t.start()
t.join(BLENDER_INITIALIZATION_TIMEOUT)
if self.morse_initialized:
self.pid = self._extract_pid()
testlogger.info("MORSE successfully initialized with PID %s" % self.pid)
else:
self.morse_process.terminate()
raise MorseTestingError("MORSE did not start successfully! Check %s "
"for details." % self.logfile_name)
[docs] def stopmorse(self):
""" Cleanly stop MORSE
"""
import socket
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
sock.connect(("localhost", 4000))
sock.send(b"id1 simulation quit\n")
except (socket.error, KeyboardInterrupt):
sock.close()
sock = None
testlogger.info("MORSE crashed")
if sock:
sock.close()
with open(self.logfile_name) as log:
lines = follow(log)
for line in lines:
if "EXITING SIMULATION" in line:
return
if self.pid:
os.kill(self.pid, signal.SIGKILL)
testlogger.info("MORSE stopped")
[docs] def generate_builder_script(self, test_case):
tmp_name = ""
# We need to generate a temp builder file in case of running
# several test cases with different environment:
# Blender must be restarted and called again with the right
# environment.
with tempfile.NamedTemporaryFile(delete = False) as tmp:
tmp.write(b"from morse.builder import *\n")
tmp.write(b"from morse.builder.actuators import *\n")
tmp.write(b"from morse.builder.sensors import *\n")
tmp.write(b"from morse.builder.blenderobjects import *\n")
tmp.write(b"class MyEnv():\n")
tmp.write(inspect.getsource(test_case.setUpEnv).encode())
tmp.write(b" env.set_time_strategy(")
tmp.write(TimeStrategies.python_repr(CURRENT_TIME_MODE))
tmp.write(b")\n")
tmp.write(b"MyEnv().setUpEnv()\n")
tmp.flush()
tmp_name = tmp.name
testlogger.info("Created a temporary builder file for test-case " +\
test_case.__class__.__name__)
return tmp_name
[docs]class MorseMoveTestCase(MorseTestCase):
""" This subclass of MorseTestCase can be used to check for moving
actuator, basically by testing a complete pose
This class assumes lot of stuff to work properly:
- the tested robot is called 'robot'
- the pose sensor is called 'robot.pose'
- if assertAlmostEqualPositionThenFix, it assumes there is a
- a 'robot.motion' actuator (basically what we test)
- a 'robot.teleport' actuator (to move to a specific
situation)
"""
[docs] def assertAlmostEqualPosition(self, morse, tested_pos, precision):
"""
Test against a position, presented as an array of 6 double (x,
y, z, yaw, pitch, roll)
"""
pose = morse.robot.pose.get()
self.assertAlmostEqual(pose['x'], tested_pos[0], delta=precision)
self.assertAlmostEqual(pose['y'], tested_pos[1], delta=precision)
self.assertAlmostEqual(pose['z'], tested_pos[2], delta=precision)
self.assertAlmostEqual(pose['yaw'], tested_pos[3], delta=precision)
self.assertAlmostEqual(pose['pitch'], tested_pos[4], delta=precision)
self.assertAlmostEqual(pose['roll'], tested_pos[5], delta=precision)
[docs] def assertAlmostEqualPositionThenFix(self, morse, tested_pos, precision):
"""
Test against a position, presented as an array of 6 double (x,
y, z, yaw, pitch, roll). When done, move the robot to this
specific position. It allows to reduce the cumulated error
between the different part of the test
"""
self.assertAlmostEqualPosition(morse, tested_pos, precision)
morse.deactivate('robot.motion')
morse.activate('robot.teleport')
# +0.01 on z to be a bit higher than the ground
morse.robot.teleport.publish(
{'x': tested_pos[0],
'y': tested_pos[1],
'z': tested_pos[2] + 0.01,
'yaw': tested_pos[3],
'pitch': tested_pos[4],
'roll': tested_pos[5]})
morse.sleep(0.1)
morse.deactivate('robot.teleport')
morse.activate('robot.motion')
morse.sleep(0.1)
[docs]class MorseBuilderFailureTestCase(MorseTestCase):
""" This subclass of MorseTestCase can be used to test MORSE handles
properly ill-constructed Builder scripts.
It will *fail* if the Blender Game Engine get started.
"""
# Make this an abstract class
__metaclass__ = ABCMeta
[docs] def wait_initialization(self):
""" Wait until Morse is initialized """
testlogger.info("Waiting for MORSE to parse the scene... (timeout: %s sec)" % \
BLENDER_INITIALIZATION_TIMEOUT)
# we assume we will correctly detect Builder script issue, so wait_initialization
# 'succeed'.
self.morse_initialized = True
with open(self.logfile_name) as log:
lines = follow(log)
for line in lines:
if "Blender Game Engine Started" in line:
testlogger.error("Blender Game Engine started!"
" This is not expected."
" See %s for details." % self.logfile_name)
os.kill(os.getpid(), signal.SIGINT)
return
elif "[ERROR][MORSE]" in line:
# use 'info' since we suppose to get this error
testlogger.info("MORSE initialization error: %s"%line.strip())
return
def _checkMorseException(self):
return
[docs]def main(*args, **kwargs):
test_cases = args
time_modes = kwargs.get('time_modes', [TimeStrategies.BestEffort, TimeStrategies.FixedSimulationStep])
import sys
if sys.argv[0].endswith('blender'):
# If we arrive here from within MORSE, we have probably run
# morse [exec|run] my_test.py
# If this case, simply build the environment based on the
# setUpEnv of the first test.
for test_class in test_cases:
test_class().setUpEnv()
return
import unittest
suite = unittest.TestSuite()
loader = unittest.TestLoader()
global ALL_TIME_MODES
ALL_TIME_MODES = time_modes
tests = None
switch = loader.loadTestsFromTestCase(MorseSwitchTimeMode)
for test_class in test_cases:
tests = loader.loadTestsFromTestCase(test_class)
for time_mode in time_modes:
suite.addTests(switch)
suite.addTests(tests)
sys.exit(not MorseTestRunner().run(suite).wasSuccessful())