"""
Provide an implementation of MessageBuffer as used by CERTI
"""
import sys
import hla.omt as fom
from functools import partial
class MessageBufferReader:
    """Sequential decoder for a CERTI ``MessageBuffer`` byte stream.

    The constructor consumes the buffer header: one octet holding the
    endianness flag, followed by a 32-bit integer stored in ``size``.  A flag
    whose first byte is zero selects the little-endian ``hla.omt`` types for
    the ``read_*`` decoders; any other value selects the big-endian ones.

    Attributes set by the constructor:

    * ``data`` -- the buffer being decoded,
    * ``index`` -- offset of the next unread byte in ``data``,
    * ``endian`` -- the raw endianness flag read from the header,
    * ``size`` -- the 32-bit integer that follows the flag in the header,
    * ``read_int16``, ``read_int32``, ``read_int64``, ``read_float``,
      ``read_double`` -- zero-argument callables that decode the next value
      and advance ``index``.
    """

    def __init__(self, data):
        """Bind the reader to *data* and decode the header.

        :param data: bytes-like object holding a complete message buffer.
        """
        self.data = data
        self.index = 0
        # NOTE(review): assumes fom.HLAoctet unpacks to an indexable one-byte
        # value (e.g. b'\x00'), as the ``[0]`` lookup below requires.
        self.endian = self.read_octet()
        suffix = "LE" if self.endian[0] == 0 else "BE"

        def decoder(base_name):
            # Bind the endian-specific hla.omt type to the generic reader.
            return partial(self.__read_fom, getattr(fom, base_name + suffix))

        self.read_int16 = decoder("HLAinteger16")
        self.read_int32 = decoder("HLAinteger32")
        self.read_int64 = decoder("HLAinteger64")
        self.read_float = decoder("HLAfloat32")
        self.read_double = decoder("HLAfloat64")
        # The size field is encoded with the endianness announced by the flag,
        # so it can only be read once the decoders above exist.
        self.size = self.read_int32()

    def __read_fom(self, obj):
        """Decode one value of type *obj* at the cursor and advance past it.

        :param obj: object whose ``unpack(buffer)`` returns a
            ``(value, consumed_size)`` pair.
        :return: the decoded value.
        """
        value, off = obj.unpack(self.data[self.index:])
        self.index += off
        return value

    def read_octet(self):
        """Decode and return the next single octet."""
        return self.__read_fom(fom.HLAoctet)

    def read_string(self):
        """Decode a length-prefixed string and return it as ``str``.

        The wire layout is a 32-bit byte count followed by that many octets,
        interpreted as UTF-8 (the encoding ``MessageBufferWriter.add_string``
        uses).

        :raises ValueError: if the announced length is negative, if the buffer
            ends before the announced number of bytes, or if the payload is
            not valid UTF-8 (``UnicodeDecodeError`` is a ``ValueError``).
        """
        length = self.read_int32()
        if length < 0:
            raise ValueError("negative string length: %d" % length)
        end = self.index + length
        # Slice the payload in one go instead of unpacking octet by octet.
        raw = bytes(self.data[self.index:end])
        if len(raw) != length:
            raise ValueError(
                "truncated string: expected %d bytes, got %d"
                % (length, len(raw)))
        self.index = end
        return raw.decode("utf-8")
class MessageBufferWriter:
    """Incremental encoder producing a CERTI ``MessageBuffer`` byte stream.

    Values are accumulated in ``data`` using the host byte order
    (``sys.byteorder``).  :meth:`write` prepends the header -- an endianness
    octet and a 32-bit total size -- and returns the full packet.

    For each of ``octet``, ``int16``, ``int32``, ``int64``, ``float`` and
    ``double`` the constructor installs two one-argument callables:

    * ``add_<type>(value)`` appends the encoded value to ``data``;
    * ``write_<type>(value)`` appends it and returns :meth:`write`'s result.
    """

    # Bytes occupied by the header: 1 endianness octet + 4-byte size field.
    MAGIC_HEADER_SIZE = 5

    def __init__(self):
        """Create an empty buffer and bind the host-endian encoders."""
        self.data = bytes()
        self.little_endian = (sys.byteorder == 'little')
        suffix = "LE" if self.little_endian else "BE"

        # The octet type has no endianness variant.
        self.add_octet = partial(self.__add_data, fom.HLAoctet)
        self.write_octet = partial(self.__write_data, fom.HLAoctet)

        multi_byte_types = (
            ("int16", "HLAinteger16"),
            ("int32", "HLAinteger32"),
            ("int64", "HLAinteger64"),
            ("float", "HLAfloat32"),
            ("double", "HLAfloat64"),
        )
        for short_name, base_name in multi_byte_types:
            wire_type = getattr(fom, base_name + suffix)
            setattr(self, "add_" + short_name,
                    partial(self.__add_data, wire_type))
            setattr(self, "write_" + short_name,
                    partial(self.__write_data, wire_type))

    def __add_data(self, obj, value):
        """Append *value*, encoded with ``obj.pack``, to the payload."""
        self.data += obj.pack(value)

    def __write_data(self, obj, value):
        """Append *value* and return the complete packet built so far."""
        self.__add_data(obj, value)
        return self.write()

    def add_string(self, value):
        """Append *value* as a length-prefixed UTF-8 string.

        The prefix is the number of encoded bytes, not the number of
        characters, so the length always matches the payload that follows and
        every octet handed to ``add_octet`` is exactly one byte long.

        :param value: the ``str`` to encode.
        """
        encoded = value.encode('utf-8')
        self.add_int32(len(encoded))
        for pos in range(len(encoded)):
            # Slice rather than index: indexing bytes yields an int, while
            # the octet encoder is given one-byte bytes objects.
            self.add_octet(encoded[pos:pos + 1])

    def write_string(self, value):
        """Append *value* as a string and return the complete packet."""
        self.add_string(value)
        return self.write()

    def write(self):
        """Return the header followed by everything accumulated in ``data``.

        The header is one octet (``b'\\0'`` for little-endian, ``b'\\1'`` for
        big-endian) and a 32-bit integer holding the payload length plus
        ``MAGIC_HEADER_SIZE``.  ``data`` itself is left untouched.
        """
        size_packet = len(self.data) + self.MAGIC_HEADER_SIZE
        if self.little_endian:
            flag, size_type = b'\0', fom.HLAinteger32LE
        else:
            flag, size_type = b'\1', fom.HLAinteger32BE
        header = fom.HLAoctet.pack(flag)
        header += size_type.pack(size_packet)
        return header + self.data