Source code for morse.middleware.pocolibs.overlays.rflex_overlay
import logging; logger = logging.getLogger("morse." + __name__)
from morse.core.services import service, async_service, interruptible
from morse.core.overlay import MorseOverlay
from morse.core import status
from morse.middleware.pocolibs.actuators.genpos import GenPosPoster
from morse.middleware.pocolibs_datastream import PosterNotFound, DummyPoster
[docs]class RflexModule(MorseOverlay):
def __init__(self, overlaid_object):
# Call the constructor of the parent class
MorseOverlay.__init__(self, overlaid_object)
self._clean_track = False
self._cntrl = DummyPoster("rflexCntrl")
[docs] def interrupt(self):
self.overlaid_object.stop()
if self._clean_track:
self.overlaid_object.input_functions.pop()
MorseOverlay.interrupt(self)
[docs] @service
def InitClient(self, *args):
pass
[docs] @service
def EndClient(self, *args):
pass
[docs] @service
def SetWdogRef(self, *args):
pass
[docs] @service
def GetWdogRef(self, *args):
pass
[docs] @service
def SetMode(self, mode):
self._mode = mode
[docs] @service
def GetMode(self):
return status.SUCCESS, self._mode
[docs] @service
def PomTagging(self, *args):
pass
[docs] @service
def SetPos(self, *args):
pass
[docs] @service
def SetPosFromMEPoster(self, *args):
pass
[docs] @service
def SetVarparams(self, *args):
pass
@async_service
def Stop(self, *args):
self.overlaid_object.stop()
del self.overlaid_object.input_functions[:]
self.completed(status.SUCCESS)
@async_service
def TrackEnd(self, *args):
self.overlaid_object.stop()
self.completed(status.SUCCESS)
@async_service
def GotoSpeed(self, numRef, updatedPeriod, v, vt, w, *args):
self.overlaid_object.set_speed(float(v), float(w))
self.completed(status.SUCCESS)
@interruptible
@async_service
def TrackSpeedStart(self, poster_name):
try:
poster = GenPosPoster(self.overlaid_object,
{ 'poster' : poster_name, 'delay' : False })
except PosterNotFound:
return self.completed(status.FAILED, ["POSTER_NOT_FOUND"])
self._clean_track = True
self.overlaid_object.input_functions.append(poster.default)
[docs] @service
def GetGeoConfig(self, *args):
pass
[docs] @service
def SetGeoConfig(self, *args):
pass
[docs] @service
def SonarOn(self, *args):
pass
[docs] @service
def SonarOff(self, *args):
pass
[docs] @service
def BrakeOn(self, *args):
pass
[docs] @service
def BrakeOff(self, *args):
pass
[docs] @service
def GetJoystick(self, *args):
pass
[docs] @service
def MonitorBattery(self, *args):
pass
[docs] @service
def Gyro(self, *args):
pass
[docs] @service
def GetGyro(self, *args):
pass
[docs] @service
def SetOdometryMethod(self, *args):
pass
[docs] @service
def Log(self, *args):
pass
[docs] @service
def StopLog(self, *args):
pass
[docs] def name(self):
return "rflex"