# Source code for morse.middleware.pocolibs.overlays.rflex_overlay

import logging; logger = logging.getLogger("morse." + __name__)
from morse.core.services import service, async_service, interruptible
from morse.core.overlay import MorseOverlay
from morse.core import status
from morse.middleware.pocolibs.actuators.genpos import GenPosPoster
from morse.middleware.pocolibs_datastream import PosterNotFound, DummyPoster

class RflexModule(MorseOverlay):
    """Overlay exposing an "rflex"-named service interface on a component.

    Most services are no-op stubs that accept any arguments. The services
    that act on the overlaid object are ``Stop``, ``TrackEnd``,
    ``GotoSpeed`` and ``TrackSpeedStart``; they rely on the overlaid
    object providing ``stop()``, ``set_speed(v, w)`` and an
    ``input_functions`` list.
    """

    def __init__(self, overlaid_object):
        """Wrap ``overlaid_object`` and initialise the overlay state."""
        # Call the constructor of the parent class
        MorseOverlay.__init__(self, overlaid_object)

        # True while a callback appended by TrackSpeedStart is still
        # expected to sit at the end of overlaid_object.input_functions.
        self._clean_track = False
        # Initialised here so GetMode does not raise AttributeError when
        # it is called before SetMode.
        self._mode = None
        # DummyPoster created under the name "rflexCntrl"; it is not read
        # anywhere else in this class.
        self._cntrl = DummyPoster("rflexCntrl")

    def interrupt(self):
        """Stop the overlaid object and undo a pending speed tracking.

        The callback appended by ``TrackSpeedStart`` is removed at most
        once: the flag is cleared before popping so that a repeated
        interruption cannot pop an unrelated input function (or fail on
        an empty list).
        """
        self.overlaid_object.stop()
        if self._clean_track:
            self._clean_track = False
            self.overlaid_object.input_functions.pop()
        MorseOverlay.interrupt(self)

    @service
    def InitClient(self, *args):
        """No-op: accepts any arguments and does nothing."""
        pass

    @service
    def EndClient(self, *args):
        """No-op: accepts any arguments and does nothing."""
        pass

    @service
    def SetWdogRef(self, *args):
        """No-op: accepts any arguments and does nothing."""
        pass

    @service
    def GetWdogRef(self, *args):
        """No-op: accepts any arguments and does nothing."""
        pass

    @service
    def SetMode(self, mode):
        """Store ``mode`` so that ``GetMode`` can return it later."""
        self._mode = mode

    @service
    def GetMode(self):
        """Return ``(status.SUCCESS, mode)``.

        ``mode`` is the value last given to ``SetMode``, or ``None`` if
        ``SetMode`` has not been called yet.
        """
        return status.SUCCESS, self._mode

    @service
    def PomTagging(self, *args):
        """No-op: accepts any arguments and does nothing."""
        pass

    @service
    def SetPos(self, *args):
        """No-op: accepts any arguments and does nothing."""
        pass

    @service
    def SetPosFromMEPoster(self, *args):
        """No-op: accepts any arguments and does nothing."""
        pass

    @service
    def SetVarparams(self, *args):
        """No-op: accepts any arguments and does nothing."""
        pass

    @async_service
    def Stop(self, *args):
        """Stop the overlaid object and remove all its input functions."""
        self.overlaid_object.stop()
        del self.overlaid_object.input_functions[:]
        # The tracking callback (if any) was removed with everything
        # else, so interrupt() must not try to pop it again.
        self._clean_track = False
        self.completed(status.SUCCESS)

    @async_service
    def TrackEnd(self, *args):
        """Stop the overlaid object and report success."""
        self.overlaid_object.stop()
        self.completed(status.SUCCESS)

    @async_service
    def GotoSpeed(self, numRef, updatedPeriod, v, vt, w, *args):
        """Apply the speeds ``v`` and ``w`` to the overlaid object.

        ``v`` and ``w`` are converted with ``float()`` and passed to
        ``set_speed``; ``numRef``, ``updatedPeriod``, ``vt`` and any
        extra arguments are accepted but ignored.
        """
        self.overlaid_object.set_speed(float(v), float(w))
        self.completed(status.SUCCESS)

    @interruptible
    @async_service
    def TrackSpeedStart(self, poster_name):
        """Start feeding the overlaid object from the poster ``poster_name``.

        A ``GenPosPoster`` is created for the poster and its ``default``
        method is appended to the overlaid object's input functions. If
        the poster cannot be found, the service completes with
        ``status.FAILED`` and ``["POSTER_NOT_FOUND"]``. On success this
        method itself does not call ``completed``.
        """
        try:
            poster = GenPosPoster(self.overlaid_object,
                                  {'poster': poster_name, 'delay': False})
        except PosterNotFound:
            return self.completed(status.FAILED, ["POSTER_NOT_FOUND"])

        self._clean_track = True
        self.overlaid_object.input_functions.append(poster.default)

    @service
    def GetGeoConfig(self, *args):
        """No-op: accepts any arguments and does nothing."""
        pass

    @service
    def SetGeoConfig(self, *args):
        """No-op: accepts any arguments and does nothing."""
        pass

    @service
    def SonarOn(self, *args):
        """No-op: accepts any arguments and does nothing."""
        pass

    @service
    def SonarOff(self, *args):
        """No-op: accepts any arguments and does nothing."""
        pass

    @service
    def BrakeOn(self, *args):
        """No-op: accepts any arguments and does nothing."""
        pass

    @service
    def BrakeOff(self, *args):
        """No-op: accepts any arguments and does nothing."""
        pass

    @service
    def GetJoystick(self, *args):
        """No-op: accepts any arguments and does nothing."""
        pass

    @service
    def MonitorBattery(self, *args):
        """No-op: accepts any arguments and does nothing."""
        pass

    @service
    def Gyro(self, *args):
        """No-op: accepts any arguments and does nothing."""
        pass

    @service
    def GetGyro(self, *args):
        """No-op: accepts any arguments and does nothing."""
        pass

    @service
    def SetOdometryMethod(self, *args):
        """No-op: accepts any arguments and does nothing."""
        pass

    @service
    def Log(self, *args):
        """No-op: accepts any arguments and does nothing."""
        pass

    @service
    def StopLog(self, *args):
        """No-op: accepts any arguments and does nothing."""
        pass

    def name(self):
        """Return the name of this overlay, ``"rflex"``."""
        return "rflex"