Source code for morse.middleware.pocolibs.overlays.viam_overlay

import logging; logger = logging.getLogger("morse." + __name__)
from import service, async_service, interruptible
from morse.core.overlay import MorseOverlay
from morse.core import status
from morse.middleware.pocolibs_datastream import DummyPoster

[docs]class ViamModule(MorseOverlay): def __init__(self, overlaid_object): # Call the constructor of the parent class MorseOverlay.__init__(self, overlaid_object) self._cntrl = DummyPoster("viamCntrl")
[docs] def Acquire_cb(self, answer): status, res = answer return status, [ self._bench, self._n]
[docs] @service def Init(self, *args): pass
[docs] @service def Configure(self, *args): pass
[docs] @service def Reset(self, *args): pass
[docs] @service def DriverLoad(self, *args): pass
[docs] @service def BusPrint(self, *args): pass
[docs] @service def BankCreate(self, *args): pass
[docs] @service def CameraCreate(self, *args): pass
[docs] @service def BankAddCamera(self, *args): pass
[docs] @service def PushFormatFilter(self, *args): pass
[docs] @service def UpdateFormatFilter(self, *args): pass
[docs] @service def PushGeoFilter(self, *args): pass
[docs] @service def UpdateGeoFilter(self, *args): pass
[docs] @service def PushLumFilter(self, *args): pass
[docs] @service def UpdateLumFilter(self, *args): pass
[docs] @service def PushColorFilter(self, *args): pass
[docs] @service def UpdateColorFilter(self, *args): pass
[docs] @service def PushImProcFilter(self, *args): pass
[docs] @service def UpdateImProcFilter(self, *args): pass
[docs] @service def PushMiscFilter(self, *args): pass
[docs] @service def UpdateMiscFilter(self, *args): pass
[docs] @service def FilterList(self, *args): pass
[docs] @service def FilterGetCap(self, *args): pass
[docs] @service def CameraListHWModes(self, *args): pass
[docs] @service def CameraSetHWMode(self, *args): pass
[docs] @service def Display(self, *args): pass
[docs] @service def Calibrate(self, *args): pass
[docs] @service def CalibrationIO(self, *args): pass
[docs] @service def Save(self, *args): pass
@interruptible @async_service def Acquire(self, bench, n): n = int(n) self._n = n self._bench = bench if n == 0: n = -1 self.overlaid_object.capture(self.chain_callback(self.Acquire_cb), n) @async_service def Stop(self, bench): self.completed(status.SUCCESS)
[docs] def name(self): return "viam"