Source code for morse.middleware.ros.static_tf
import logging; logger = logging.getLogger("morse." + __name__)
from morse.middleware.ros import ROSPublisherTF
from morse.middleware.ros.tfMessage import tfMessage
import rospy
[docs]class StaticTFPublisher(ROSPublisherTF):
""" Publish the (static) transform between robot and this sensor/actuator with 1Hz. """
last_ts = 0
child_frame_id = None
parent_frame_id = None
[docs] def initialize(self):
ROSPublisherTF.initialize(self)
self.child_frame_id = self.kwargs.get('child_frame_id', self.frame_id)
self.parent_frame_id = self.kwargs.get('parent_frame_id', 'base_link')
logger.info("Initialized the ROS static TF publisher with frame_id '%s' " + \
"and child_frame_id '%s'", self.parent_frame_id, self.child_frame_id)
[docs] def finalize(self):
ROSPublisherTF.finalize(self)
[docs] def default(self, ci='unused'):
# publish with 1Hz
if self.data['timestamp'] > self.last_ts + 1.0:
# date timestamp forward, just like for static transform publisher
ts = rospy.Time.from_sec(self.data['timestamp'] + 1.0)
self.send_transform_robot(ts, self.child_frame_id, self.parent_frame_id)
self.last_ts = self.data['timestamp']
[docs]class StaticTF2Publisher(StaticTFPublisher):
""" Publish the (static) transform between robot and this sensor/actuator, using a latched
topic (TF2 convention).
"""
topic_tf_static = None
init_tr = False
[docs] def initialize(self):
StaticTFPublisher.initialize(self)
if not StaticTF2Publisher.topic_tf_static:
StaticTF2Publisher.topic_tf_static = \
rospy.Publisher("/tf_static", tfMessage, queue_size=1, latch=True)
[docs] def default(self, ci='unused'):
if not self.init_tr and ('valid' not in self.data or self.data['valid']):
ts = rospy.Time.from_sec(0)
self.send_transform_robot(ts, self.child_frame_id, self.parent_frame_id)
self.init_tr = True
# TF publish method
[docs] def publish_tf(self, message):
""" Publish the TF data on the rostopic
"""
StaticTF2Publisher.topic_tf_static.publish(message)