Source code for morse.middleware.pocolibs.overlays.rflex_overlay

import logging; logger = logging.getLogger("morse." + __name__)
from morse.core.services import service, async_service, interruptible
from morse.core.overlay import MorseOverlay
from morse.core import status
from morse.middleware.pocolibs.actuators.genpos import GenPosPoster
from morse.middleware.pocolibs_datastream import PosterNotFound, DummyPoster

[docs]class RflexModule(MorseOverlay): def __init__(self, overlaid_object): # Call the constructor of the parent class MorseOverlay.__init__(self, overlaid_object) self._clean_track = False self._cntrl = DummyPoster("rflexCntrl")
[docs] def interrupt(self): self.overlaid_object.stop() if self._clean_track: self.overlaid_object.input_functions.pop() MorseOverlay.interrupt(self)
@service
[docs] def InitClient(self, *args): pass
@service
[docs] def EndClient(self, *args): pass
@service
[docs] def SetWdogRef(self, *args): pass
@service
[docs] def GetWdogRef(self, *args): pass
@service
[docs] def SetMode(self, mode): self._mode = mode
@service
[docs] def GetMode(self): return status.SUCCESS, self._mode
@service
[docs] def PomTagging(self, *args): pass
@service
[docs] def SetPos(self, *args): pass
@service
[docs] def SetPosFromMEPoster(self, *args): pass
@service
[docs] def SetVarparams(self, *args): pass
@async_service
[docs] def Stop(self, *args): self.overlaid_object.stop() del self.overlaid_object.input_functions[:] self.completed(status.SUCCESS)
@async_service
[docs] def TrackEnd(self, *args): self.overlaid_object.stop() self.completed(status.SUCCESS)
@async_service
[docs] def GotoSpeed(self, numRef, updatedPeriod, v, vt, w, *args): self.overlaid_object.set_speed(float(v), float(w)) self.completed(status.SUCCESS)
@interruptible @async_service
[docs] def TrackSpeedStart(self, poster_name): try: poster = GenPosPoster(self.overlaid_object, { 'poster' : poster_name, 'delay' : False }) except PosterNotFound: return self.completed(status.FAILED, ["POSTER_NOT_FOUND"]) self._clean_track = True self.overlaid_object.input_functions.append(poster.default)
@service
[docs] def GetGeoConfig(self, *args): pass
@service
[docs] def SetGeoConfig(self, *args): pass
@service
[docs] def SonarOn(self, *args): pass
@service
[docs] def SonarOff(self, *args): pass
@service
[docs] def BrakeOn(self, *args): pass
@service
[docs] def BrakeOff(self, *args): pass
@service
[docs] def GetJoystick(self, *args): pass
@service
[docs] def MonitorBattery(self, *args): pass
@service
[docs] def Gyro(self, *args): pass
@service
[docs] def GetGyro(self, *args): pass
@service
[docs] def SetOdometryMethod(self, *args): pass
@service
[docs] def Log(self, *args): pass
@service
[docs] def StopLog(self, *args): pass
[docs] def name(self): return "rflex"