import struct class CrcXmodem: def __init__(self): self.crc = 0 def reset(self): self.crc = 0 def accumulate(self, data: bytearray): for b in data: self.crc = self.crc ^ ((b << 8) & 0xFFFF) for _ in range(8): if self.crc & 0x8000: self.crc = ((self.crc << 1) & 0xFFFF) ^ 0x1021 else: self.crc = (self.crc << 1) & 0xFFFF def accumulate_str(self, data: str): try: bytesarray = bytearray(data) except TypeError: bytesarray = bytearray(data.encode()) self.accumulate(bytesarray) def to_string(s): try: return s.decode("utf-8") except Exception: pass try: s2 = s.encode("utf-8", "ignore") x = "%s" % s2 return s2 except Exception: pass r = "" try: for c in s: r2 = r + c r2 = r2.encode("ascii", "ignore") x = "%s" % r2 # noqa: F841 r = r2 except Exception: pass return r + "_XXX" class HydrolinkHeader: packer = struct.Struct("!BBBB") def __init__(self, msg_id=0, payload_length=0, dst_id=0, src_id=0): self.payload_length = payload_length self.msg_id = msg_id self.src_id = src_id self.dst_id = dst_id def pack(self): return self.packer.pack(self.msg_id, self.payload_length, self.dst_id, self.src_id) def unpack(self, msg_buffer: bytes): x = self.packer.unpack(msg_buffer[0:self.length()]) self.msg_id = x[0] self.payload_length = x[1] self.dst_id = x[2] self.src_id = x[3] return self @classmethod def from_buffer(cls, msg_buffer: bytes): return cls().unpack(msg_buffer) @staticmethod def length(): return 4 class HydrolinkMessage: def __init__(self, msg_id, name): self._header = HydrolinkHeader(msg_id=msg_id) self._payload = None self._msg_buffer = None self._crc = None self._fieldnames = [] self._type = name @staticmethod def crc_from_buffer(msg_buffer: bytes): return struct.unpack("!H", msg_buffer[-2:]) def format_attr(self, field): raw_attr = getattr(self, field) if isinstance(raw_attr, bytes): raw_attr = to_string(raw_attr).rstrip("\\00") return raw_attr def get_header(self): return self._header def get_payload(self): return self._payload def get_msg_buffer(self): return self._msg_buffer def get_crc(self): return self._crc def get_fieldnames(self): return self._fieldnames def get_type(self): return self._type def get_msg_id(self): return self._header.msg_id def get_dst_id(self): return self._header.dst_id def get_src_id(self): return self._header.src_id def __str__(self): ret = f"{self._type}" + " {" for name in self._fieldnames: value = self.format_attr(name) ret += f"{name}: {value}, " ret = ret[0:-2] + "}" return ret def __eq__(self, other): if other is None: return False if self.get_type() != other.get_type(): return False if self.get_src_id() != other.get_src_id(): return False if self.get_dst_id() != other.get_dst_id(): return False for name in self._fieldnames: if self.format_attr(name) != other.format_attr(name): return False return True def __ne__(self, other): return not self.__eq__(other) def to_dict(self): d = {} d["type"] = self._type for name in self._fieldnames: d[name] = self.format_attr(name) return d def pack(self, header: HydrolinkHeader, crc_extra, payload): self._payload = payload self._header = header self._header.payload_length = len(self._payload) self._msg_buffer = self._header.pack() + self._payload crc = CrcXmodem() crc.accumulate(self._msg_buffer) crc.accumulate(struct.pack("B", crc_extra)) self._crc = crc.crc self._msg_buffer += struct.pack("!H", self._crc) return self._msg_buffer def __getitem(self, key): if self._instances is None: raise IndexError() if key not in self._instances: raise KeyError() return self._instances[key]