Source code for morse.sensors.depth_camera_aggregator

import logging; logger = logging.getLogger("morse." + __name__)
from morse.core.services import async_service
import morse.core.sensor
from morse.core import blenderapi
from functools import partial
from morse.helpers.components import add_data, add_property

[docs]class DepthCameraAggregator(morse.core.sensor.Sensor): _name = "Depth Camera Aggregator Unit" add_data('points', 'none', 'memoryview', "List of 3D points from the depth " "camera. memoryview of a set of float(x,y,z). The data is of size " "``(nb_points * 12)`` bytes (12=3*sizeof(float).") add_data('nb_points', 0, 'int', "the number of points found in the " "points list. It must be inferior to cam_width * cam_height") add_property("master_camera", '', 'master_camera', 'string', "the name of master camera : all the points will be expressed " "in the frame of this camera") def __init__(self, obj, parent=None): """ Constructor method. Receives the reference to the Blender object. The second parameter should be the name of the object's parent. """ logger.info('%s initialization' % obj.name) # Call the constructor of the parent class morse.core.sensor.Sensor.__init__(self, obj, parent) # Name of the camera self.camera_name_list = [] # Instance of the camera, will be filled later because as this point, # child component are not yet initialized self.camera_list = [] self.master = None # Create a list of the cameras attached to this component for child in obj.children: # Skip this object if it is not a component # It is most likely just a geometric shape object try: child['Component_Tag'] except KeyError: continue camera_name = child.name # Store only the name of the camera All data from the camera # can be accessed later by using # blenderapi.persistantstorage().componentDict[camera_name], # which will return the instance of the camera object self.camera_name_list.append(camera_name) self.converter = None self.capturing = True logger.info("Depth Camera Aggregator has %d cameras" % len(self.camera_name_list)) logger.info('Component initialized')
[docs] def capture_completion(self, answer): self._expected_answer-= 1 if self._expected_answer == 0: status, res = answer self.completed(status, res)
[docs] def interrupt(self): for camera in self.camera_list: camera_instance = blenderapi.persistantstorage().componentDict[camera] camera_instance.interrupt()
@async_service def capture(self, n): """ The service takes an integer an argument and dispatch the call to all its individual cameras. The service ends when each camera has terminated its work. :param n: the number of call to each individual camera """ self._expected_answer = len(self.camera_list) for camera in self.camera_list: camera.capture(partial(self.capture_completion), n)
[docs] def sensor_to_robot_position_3d(self): return self.master.sensor_to_robot_position_3d()
[docs] def default_action(self): """ Main function of this component. """ # Initialiazation part if self.camera_name_list != [] and self.converter is None: for camera_name in self.camera_name_list: logger.info("Getting instance for %s", camera_name) self.camera_list.append(blenderapi.persistantstorage().componentDict[camera_name]) logger.info("Getting master for %s", self.master_camera) self.master = blenderapi.persistantstorage().componentDict[self.master_camera] max_nb_points = 0 for camera in self.camera_list: max_nb_points += camera.image_width * camera.image_height * 3 from morse.sensors.depthaggregator import Aggregator self.converter = Aggregator(max_nb_points) # Prepare list to send images = [] for i, camera in enumerate(self.camera_list): transfo = self.master.position_3d.transformation3d_with(camera.position_3d) image = camera.pts if image is not None: images.append((transfo.x, transfo.y, transfo.z, transfo.roll, transfo.pitch, transfo.yaw, image)) pts = self.converter.merge(images) self.local_data['points'] = pts; self.local_data['nb_points'] = int(len(pts) / 12)