# Source code for morse.middleware.pocolibs.overlays.viam_overlay

import logging; logger = logging.getLogger("morse." + __name__)
from morse.core.services import service, async_service, interruptible
from morse.core.overlay import MorseOverlay
from morse.core import status
from morse.middleware.pocolibs_datastream import DummyPoster

class ViamModule(MorseOverlay):
    """Overlay exposing a "viam"-named service interface on a component.

    Most requests are accepted and ignored (empty bodies); only
    ``Acquire`` and ``Stop`` do any work.  ``Acquire`` forwards to the
    ``capture`` method of the overlaid object.
    """

    def __init__(self, overlaid_object):
        """Initialise the overlay around *overlaid_object*.

        :param overlaid_object: the component wrapped by this overlay;
            ``Acquire`` calls its ``capture`` method.
        """
        # Call the constructor of the parent class
        MorseOverlay.__init__(self, overlaid_object)
        # NOTE(review): presumably stands in for the control poster a real
        # viam module would export -- confirm against DummyPoster.
        self._cntrl = DummyPoster("viamCntrl")

    def Acquire_cb(self, answer):
        """Rewrite the answer of the overlaid ``capture`` call.

        :param answer: a two-element ``(status, result)`` sequence.
        :return: a ``(status, [bench, n])`` tuple, where ``bench`` and
            ``n`` are the values recorded by the last ``Acquire`` call.
            The original result is discarded.
        """
        # Use names that do not shadow the imported ``status`` module.
        answer_status, _unused_result = answer
        return answer_status, [self._bench, self._n]

    # ------------------------------------------------------------------
    # The services below accept any positional arguments and do nothing.
    # NOTE(review): presumably they exist so that clients written against
    # the real viam module can issue these requests unchanged -- confirm.
    # ------------------------------------------------------------------

    @service
    def Init(self, *args):
        pass

    @service
    def Configure(self, *args):
        pass

    @service
    def Reset(self, *args):
        pass

    @service
    def DriverLoad(self, *args):
        pass

    @service
    def BusPrint(self, *args):
        pass

    @service
    def BankCreate(self, *args):
        pass

    @service
    def CameraCreate(self, *args):
        pass

    @service
    def BankAddCamera(self, *args):
        pass

    @service
    def PushFormatFilter(self, *args):
        pass

    @service
    def UpdateFormatFilter(self, *args):
        pass

    @service
    def PushGeoFilter(self, *args):
        pass

    @service
    def UpdateGeoFilter(self, *args):
        pass

    @service
    def PushLumFilter(self, *args):
        pass

    @service
    def UpdateLumFilter(self, *args):
        pass

    @service
    def PushColorFilter(self, *args):
        pass

    @service
    def UpdateColorFilter(self, *args):
        pass

    @service
    def PushImProcFilter(self, *args):
        pass

    @service
    def UpdateImProcFilter(self, *args):
        pass

    @service
    def PushMiscFilter(self, *args):
        pass

    @service
    def UpdateMiscFilter(self, *args):
        pass

    @service
    def FilterList(self, *args):
        pass

    @service
    def FilterGetCap(self, *args):
        pass

    @service
    def CameraListHWModes(self, *args):
        pass

    @service
    def CameraSetHWMode(self, *args):
        pass

    @service
    def Display(self, *args):
        pass

    @service
    def Calibrate(self, *args):
        pass

    @service
    def CalibrationIO(self, *args):
        pass

    @service
    def Save(self, *args):
        pass

    @interruptible
    @async_service
    def Acquire(self, bench, n):
        """Start a capture on the overlaid object.

        :param bench: value recorded and echoed back by ``Acquire_cb``;
            not otherwise used here.
        :param n: number of captures, converted with ``int()``.  A value
            of ``0`` is passed to ``capture`` as ``-1``.
        """
        n = int(n)
        # Remember the request parameters as received (before the 0 -> -1
        # substitution) so that Acquire_cb can report them.
        self._n = n
        self._bench = bench
        if n == 0:
            # NOTE(review): -1 presumably means "capture without limit"
            # for the overlaid object -- confirm against its capture().
            n = -1
        self.overlaid_object.capture(self.chain_callback(self.Acquire_cb), n)

    @async_service
    def Stop(self, bench):
        """Immediately complete this request with ``status.SUCCESS``.

        :param bench: accepted but unused.
        """
        self.completed(status.SUCCESS)

    def name(self):
        """Return the name of this overlay, ``"viam"``."""
        return "viam"