hydrolink/gen_python/hydrolink_msg.py

175 lines
4.5 KiB
Python

import struct
class CrcXmodem:
def __init__(self):
self.crc = 0
def reset(self):
self.crc = 0
def accumulate(self, data: bytearray):
for b in data:
self.crc = self.crc ^ ((b << 8) & 0xFFFF)
for _ in range(8):
if self.crc & 0x8000:
self.crc = ((self.crc << 1) & 0xFFFF) ^ 0x1021
else:
self.crc = (self.crc << 1) & 0xFFFF
def accumulate_str(self, data: str):
try:
bytesarray = bytearray(data)
except TypeError:
bytesarray = bytearray(data.encode())
self.accumulate(bytesarray)
def to_string(s):
try:
return s.decode("utf-8")
except Exception:
pass
try:
s2 = s.encode("utf-8", "ignore")
x = "%s" % s2
return s2
except Exception:
pass
r = ""
try:
for c in s:
r2 = r + c
r2 = r2.encode("ascii", "ignore")
x = "%s" % r2 # noqa: F841
r = r2
except Exception:
pass
return r + "_XXX"
class HydrolinkHeader:
packer = struct.Struct("!BBBB")
def __init__(self, msg_id=0, payload_length=0, dst_id=0, src_id=0):
self.payload_length = payload_length
self.msg_id = msg_id
self.src_id = src_id
self.dst_id = dst_id
def pack(self):
return self.packer.pack(self.msg_id, self.payload_length, self.dst_id, self.src_id)
def unpack(self, msg_buffer: bytes):
x = self.packer.unpack(msg_buffer[0:self.length()])
self.msg_id = x[0]
self.payload_length = x[1]
self.dst_id = x[2]
self.src_id = x[3]
return self
@classmethod
def from_buffer(cls, msg_buffer: bytes):
return cls().unpack(msg_buffer)
@staticmethod
def length():
return 4
class HydrolinkMessage:
def __init__(self, msg_id, name):
self._header = HydrolinkHeader(msg_id=msg_id)
self._payload = None
self._msg_buffer = None
self._crc = None
self._fieldnames = []
self._type = name
@staticmethod
def crc_from_buffer(msg_buffer: bytes):
return struct.unpack("!H", msg_buffer[-2:])
def format_attr(self, field):
raw_attr = getattr(self, field)
if isinstance(raw_attr, bytes):
raw_attr = to_string(raw_attr).rstrip("\\00")
return raw_attr
def get_header(self):
return self._header
def get_payload(self):
return self._payload
def get_msg_buffer(self):
return self._msg_buffer
def get_crc(self):
return self._crc
def get_fieldnames(self):
return self._fieldnames
def get_type(self):
return self._type
def get_msg_id(self):
return self._header.msg_id
def get_dst_id(self):
return self._header.dst_id
def get_src_id(self):
return self._header.src_id
def __str__(self):
ret = f"{self._type}" + " {"
for name in self._fieldnames:
value = self.format_attr(name)
ret += f"{name}: {value}, "
ret = ret[0:-2] + "}"
return ret
def __eq__(self, other):
if other is None:
return False
if self.get_type() != other.get_type():
return False
if self.get_src_id() != other.get_src_id():
return False
if self.get_dst_id() != other.get_dst_id():
return False
for name in self._fieldnames:
if self.format_attr(name) != other.format_attr(name):
return False
return True
def __ne__(self, other):
return not self.__eq__(other)
def to_dict(self):
d = {}
d["type"] = self._type
for name in self._fieldnames:
d[name] = self.format_attr(name)
return d
def pack(self, header: HydrolinkHeader, crc_extra, payload):
self._payload = payload
self._header = header
self._header.payload_length = len(self._payload)
self._msg_buffer = self._header.pack() + self._payload
crc = CrcXmodem()
crc.accumulate(self._msg_buffer)
crc.accumulate(struct.pack("B", crc_extra))
self._crc = crc.crc
self._msg_buffer += struct.pack("!H", self._crc)
return self._msg_buffer
def __getitem(self, key):
if self._instances is None:
raise IndexError()
if key not in self._instances:
raise KeyError()
return self._instances[key]