Source code for morse.middleware.pocolibs.overlays.viam_overlay
import logging; logger = logging.getLogger("morse." + __name__)
from morse.core.services import service, async_service, interruptible
from morse.core.overlay import MorseOverlay
from morse.core import status
from morse.middleware.pocolibs_datastream import DummyPoster
class ViamModule(MorseOverlay):
    """Overlay exposing a "viam" service interface on top of a MORSE object.

    Only ``Acquire`` and ``Stop`` do any work. Every other service accepts
    arbitrary arguments and does nothing.

    NOTE(review): the no-op services presumably mirror the requests of the
    real viam module so that existing clients can call them unchanged --
    confirm against the client side.
    """

    def __init__(self, overlaid_object):
        """Wrap *overlaid_object* and create the ``viamCntrl`` dummy poster.

        :param overlaid_object: the object this overlay forwards to; it must
            provide the ``capture`` method used by :meth:`Acquire`.
        """
        # Call the constructor of the parent class
        MorseOverlay.__init__(self, overlaid_object)
        self._cntrl = DummyPoster("viamCntrl")
        # Parameters of the most recent Acquire request, echoed back by
        # Acquire_cb. Defined here so the callback never hits a missing
        # attribute if it runs before any Acquire call.
        self._bench = None
        self._n = None

    def Acquire_cb(self, answer):
        """Rewrite the result of a finished capture for the caller.

        :param answer: two-item sequence ``(status, result)``; the result
            part is discarded.
        :return: ``(status, [bench, n])`` where ``bench`` and ``n`` are the
            values recorded by the last :meth:`Acquire` call (both ``None``
            if :meth:`Acquire` was never called).
        """
        # Local name chosen so it does not shadow the imported `status` module.
        capture_status, _ = answer
        return capture_status, [self._bench, self._n]

    @service
    def Init(self, *args):
        """Accept any arguments and do nothing."""

    @service
    def Reset(self, *args):
        """Accept any arguments and do nothing."""

    @service
    def DriverLoad(self, *args):
        """Accept any arguments and do nothing."""

    @service
    def BusPrint(self, *args):
        """Accept any arguments and do nothing."""

    @service
    def BankCreate(self, *args):
        """Accept any arguments and do nothing."""

    @service
    def CameraCreate(self, *args):
        """Accept any arguments and do nothing."""

    @service
    def BankAddCamera(self, *args):
        """Accept any arguments and do nothing."""

    @service
    def PushGeoFilter(self, *args):
        """Accept any arguments and do nothing."""

    @service
    def UpdateGeoFilter(self, *args):
        """Accept any arguments and do nothing."""

    @service
    def PushLumFilter(self, *args):
        """Accept any arguments and do nothing."""

    @service
    def UpdateLumFilter(self, *args):
        """Accept any arguments and do nothing."""

    @service
    def PushColorFilter(self, *args):
        """Accept any arguments and do nothing."""

    @service
    def UpdateColorFilter(self, *args):
        """Accept any arguments and do nothing."""

    @service
    def PushImProcFilter(self, *args):
        """Accept any arguments and do nothing."""

    @service
    def UpdateImProcFilter(self, *args):
        """Accept any arguments and do nothing."""

    @service
    def PushMiscFilter(self, *args):
        """Accept any arguments and do nothing."""

    @service
    def UpdateMiscFilter(self, *args):
        """Accept any arguments and do nothing."""

    @service
    def FilterList(self, *args):
        """Accept any arguments and do nothing."""

    @service
    def FilterGetCap(self, *args):
        """Accept any arguments and do nothing."""

    @service
    def CameraListHWModes(self, *args):
        """Accept any arguments and do nothing."""

    @service
    def CameraSetHWMode(self, *args):
        """Accept any arguments and do nothing."""

    @service
    def Display(self, *args):
        """Accept any arguments and do nothing."""

    @service
    def Calibrate(self, *args):
        """Accept any arguments and do nothing."""

    @service
    def CalibrationIO(self, *args):
        """Accept any arguments and do nothing."""

    @service
    def Save(self, *args):
        """Accept any arguments and do nothing."""

    @interruptible
    @async_service
    def Acquire(self, bench, n):
        """Start a capture on the overlaid object.

        :param bench: stored unchanged and echoed back in the final answer.
        :param n: number of captures; converted with ``int()``. ``0`` is
            forwarded to ``capture`` as ``-1``.
        :raises ValueError: if *n* cannot be converted to ``int``.

        NOTE(review): ``-1`` presumably means "capture until interrupted" on
        the overlaid object -- confirm against its ``capture`` service.
        """
        requested = int(n)
        # Remember the request as the caller expressed it; Acquire_cb reports
        # these values, not the -1 substituted below.
        self._n = requested
        self._bench = bench
        capture_count = -1 if requested == 0 else requested
        self.overlaid_object.capture(
            self.chain_callback(self.Acquire_cb), capture_count)

    @async_service
    def Stop(self, bench):
        """Complete immediately with ``status.SUCCESS``.

        :param bench: accepted for interface compatibility; not used.

        NOTE(review): no capture is cancelled explicitly here; presumably
        issuing this request is what pre-empts a running ``Acquire`` (which
        is marked ``@interruptible``) -- confirm.
        """
        self.completed(status.SUCCESS)

    def name(self):
        """Return the name of this overlay, ``"viam"``."""
        return "viam"