From 588ea34510377328205dcf114026cbe9b37c9422 Mon Sep 17 00:00:00 2001 From: Thies Lennart Alff Date: Wed, 19 Feb 2025 23:27:24 +0100 Subject: [PATCH] initial commit --- .gitignore | 3 + __init__.py | 0 gen_c/__init__.py | 0 gen_c/gen_enums.py | 23 ++++ gen_c/gen_messages.py | 82 +++++++++++++ gen_c/hydrolink_gen_c.py | 183 +++++++++++++++++++++++++++++ gen_python/hydrolink_gen_python.py | 179 ++++++++++++++++++++++++++++ gen_python/hydrolink_msg.py | 177 ++++++++++++++++++++++++++++ hydrolink_def.xml | 46 ++++++++ hydrolink_gen.py | 30 +++++ hydroparse/__init__.py | 14 +++ hydroparse/hydro_enum.py | 15 +++ hydroparse/hydro_field.py | 62 ++++++++++ hydroparse/hydro_message_type.py | 110 +++++++++++++++++ hydroparse/hydro_xml.py | 99 ++++++++++++++++ hydroparse/type_lengths.py | 13 ++ 16 files changed, 1036 insertions(+) create mode 100644 .gitignore create mode 100644 __init__.py create mode 100644 gen_c/__init__.py create mode 100644 gen_c/gen_enums.py create mode 100644 gen_c/gen_messages.py create mode 100644 gen_c/hydrolink_gen_c.py create mode 100644 gen_python/hydrolink_gen_python.py create mode 100644 gen_python/hydrolink_msg.py create mode 100644 hydrolink_def.xml create mode 100644 hydrolink_gen.py create mode 100644 hydroparse/__init__.py create mode 100644 hydroparse/hydro_enum.py create mode 100644 hydroparse/hydro_field.py create mode 100644 hydroparse/hydro_message_type.py create mode 100644 hydroparse/hydro_xml.py create mode 100644 hydroparse/type_lengths.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..46a8189 --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +__pycache__ +*.pyc +/generated/ diff --git a/__init__.py b/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/gen_c/__init__.py b/gen_c/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/gen_c/gen_enums.py b/gen_c/gen_enums.py new file mode 100644 index 0000000..7ef3d7d --- /dev/null +++ b/gen_c/gen_enums.py @@ -0,0 +1,23 @@ +from hydroparse import HydroEnum, HydroEnumEntry +import os + + +def generate_enums_h(dir, enums: list[HydroEnum]): + with open(os.path.join(dir, "hydrolink_enums.h"), "w") as f: + for enum in enums: + f.write(f"""#pragma once +typedef enum {enum.name} {{ +{enum_entries(enum)} +}}{enum.name}; + """) + + +def enum_entries(enum: HydroEnum): + array = [] + entry: HydroEnumEntry + for entry in enum.entries: + array.append( + f"{enum.name}_{entry.name} = {entry.value}, /// {entry.description}" + ) + array.append(f"{enum.name}_ENUM_END = {enum.highest_value + 1}") + return "\n".join(array) diff --git a/gen_c/gen_messages.py b/gen_c/gen_messages.py new file mode 100644 index 0000000..a44a829 --- /dev/null +++ b/gen_c/gen_messages.py @@ -0,0 +1,82 @@ +from hydroparse import HydroMessageType +import os + +NS_PREFIX_LOWER = "hydrolink_" +GENERIC_MSG_NAME = f"{NS_PREFIX_LOWER}msg" +GENERIC_MSG_STRUCT = f"{GENERIC_MSG_NAME}_s" +GENERIC_MSG_TYPE = f"{GENERIC_MSG_NAME}_t" +MSG_ID_PREFIX = "HYDROLINK_MSG_ID_" + + +def generate_message_h(dir, m: HydroMessageType): + msg_name = f"{NS_PREFIX_LOWER}msg_{m.name_lower}" + msg_type = f"{msg_name}_t" + msg_struct = f"{msg_type}_s" + + def decode_payload_str(m: HydroMessageType): + array = [] + for field in m.ordered_fields: + array.append( + f" {m.name_lower}->{field.name} = {msg_name}_get_{field.name}(packet);" + ) + return "\n".join(array) + + struct_fields = [] + + for i, field in enumerate(m.ordered_fields): + if field.units: + units = f"[{field.units}]" + else: + units = "" + if field.array_length: + struct_fields.append( + f"{field.type} {field.name}[{field.array_length}]; /// {units} {field.description}" + ) + else: + struct_fields.append( + f"{field.type} {field.name}; /// {units} {field.description}" + ) + struct_fields_str = "\n".join(struct_fields) + + encode_payload = [] + wire_offset = 0 + for i, field in enumerate(m.ordered_fields): + if field.array_length: + raise Exception("Arrays currently not supported!") + else: + typename = field.type + encode_payload.append( + f" hydrolink_put_{typename}(HYDROLINK_PAYLOAD_NON_CONST(packet), {field.wire_offset}, msg->{field.name});" + ) + wire_offset += field.wire_length + encode_payload_str = "\n".join(encode_payload) + with open(os.path.join(dir, f"hydrolink_msg_{m.name_lower}.h"), "w") as f: + f.write(f"""#pragma once +#include "protocol.h" +#define {MSG_ID_PREFIX}{m.name} {m.id} +#define {MSG_ID_PREFIX}{m.name}_LEN {m.wire_length} +#define {MSG_ID_PREFIX}{m.id}_LEN {m.wire_length} +#define {MSG_ID_PREFIX}{m.name}_CRC_EXTRA {m.crc_extra} + +typedef struct {msg_struct} {{ +{struct_fields_str} +}} {msg_type}; + +void {NS_PREFIX_LOWER}{m.name_lower}_msg_encode(uint8_t src_id, uint8_t dst_id, const {msg_type} *msg, {GENERIC_MSG_TYPE} *packet) {{ + packet->id = {MSG_ID_PREFIX}{m.name}; + packet->src_id = src_id; + packet->dst_id = dst_id; + packet->length = {MSG_ID_PREFIX}{m.id}_LEN + HYDROLINK_NON_PAYLOAD_LEN; +{encode_payload_str} +}} + """) + wire_offset = 0 + for field in m.ordered_fields: + f.write(f""" +static inline {field.type} {msg_name}_get_{field.name}(const {GENERIC_MSG_TYPE} *packet) {{ + return HYDROLINK_RETURN_{field.type}(packet, {field.wire_offset}); +}}""") + f.write(f""" +static inline void {NS_PREFIX_LOWER}msg{m.name_lower}_decode(const {GENERIC_MSG_TYPE} *packet, {msg_type} *{m.name_lower}) {{ +{decode_payload_str(m)} +}}""") diff --git a/gen_c/hydrolink_gen_c.py b/gen_c/hydrolink_gen_c.py new file mode 100644 index 0000000..7682e22 --- /dev/null +++ b/gen_c/hydrolink_gen_c.py @@ -0,0 +1,183 @@ +from hydroparse import TYPE_LENGTHS +from .gen_messages import generate_message_h +from .gen_enums import generate_enums_h +import os + +NS_PREFIX_LOWER = "hydrolink_" +GENERIC_MSG_NAME = f"{NS_PREFIX_LOWER}msg" +GENERIC_MSG_STRUCT = f"{GENERIC_MSG_NAME}_s" +GENERIC_MSG_TYPE = f"{GENERIC_MSG_NAME}_t" +MSG_ID_PREFIX = "HYDROLINK_MSG_ID_" + +def generate_main_header(msgs, dir): + with open(os.path.join(dir, "hydrolink.h"), "w") as f: + f.write(f"""#pragma once +#include "protocol.h" +{"\n".join(f"#include \"{GENERIC_MSG_NAME}_{msg.name_lower}.h\"" for msg in msgs)} +""") + +def generate_protocol(xml, dir): + put_defines = [] + for typename in TYPE_LENGTHS: + length = TYPE_LENGTHS[typename] + if length == 1: + put_defines.append( + f"#define {NS_PREFIX_LOWER}put_{typename}(buf, wire_offset, b) buf[wire_offset] = ({typename})b" + ) + else: + put_defines.append( + f"#define {NS_PREFIX_LOWER}put_{typename}(buf, wire_offset, b) byte_swap_{length}(&buf[wire_offset], (const char *)&b)" + ) + + put_defines_str = "\n".join(put_defines) + with open(os.path.join(dir, "protocol.h"), "w") as f: + f.write(f""" +#pragma once + +#include +#define HYDROLINK_HEADER_LEN 4 +#define HYDROLINK_CRC_LEN 2 +#define HYDROLINK_NON_PAYLOAD_LEN (HYDROLINK_HEADER_LEN + HYDROLINK_CRC_LEN) +#define HYDROLINK_MAX_PAYLOAD_LEN {xml.largest_payload} +#define HYDROLINK_MAX_MSG_LEN (HYDROLINK_MAX_PAYLOAD_LEN + HYDROLINK_NON_PAYLOAD_LEN) + +struct {GENERIC_MSG_STRUCT}; + +typedef struct {GENERIC_MSG_STRUCT} {GENERIC_MSG_TYPE}; + +struct {GENERIC_MSG_STRUCT} {{ + uint8_t id; + uint8_t payload_length; + uint8_t dst_id; + uint8_t src_id; + uint8_t payload[HYDROLINK_MAX_PAYLOAD_LEN]; + uint16_t crc; +}}; + +#define HYDROLINK_PAYLOAD(msg) ((const char *)(&((msg)->payload[0]))) +#define HYDROLINK_PAYLOAD_NON_CONST(msg) ((char *)(&((msg)->payload[0]))) + +{put_defines_str} + +inline void byte_swap_2(char *dst, const char *src) {{ + dst[0] = src[1]; + dst[1] = src[0]; +}} + +inline void byte_swap_4(char *dst, const char *src) {{ + dst[0] = src[3]; + dst[1] = src[2]; + dst[2] = src[1]; + dst[3] = src[0]; +}} + +inline void byte_swap_8(char *dst, const char *src) {{ + dst[0] = src[7]; + dst[1] = src[6]; + dst[2] = src[5]; + dst[3] = src[4]; + dst[4] = src[3]; + dst[5] = src[2]; + dst[6] = src[1]; + dst[7] = src[0]; +}} + """) + f.write(f""" +#define HYDROLINK_MSG_RETURN_TYPE(TYPE, SIZE) \\ +static inline TYPE HYDROLINK_RETURN_## TYPE(const {GENERIC_MSG_TYPE} *msg, uint8_t offset) {{\\ + TYPE ret; \\ + byte_swap_## SIZE((char*)&ret, &HYDROLINK_PAYLOAD(msg)[offset]); \\ + return ret; \\ +}} + """) + for typename in TYPE_LENGTHS: + length = TYPE_LENGTHS[typename] + if not length > 1: + f.write(f""" +#define HYDROLINK_RETURN_{typename}(msg, wire_offset) ({typename})HYDROLINK_PAYLOAD(msg)[wire_offset]""") + else: + f.write(f""" +HYDROLINK_MSG_RETURN_TYPE({typename}, {length}) +""") + f.write(""" +inline void crc_xmodem_init(uint16_t *crc) { *crc = 0; } + +inline void crc_xmodem_accumulate(uint8_t data, uint16_t *crc) { + *crc = *crc ^ ((uint16_t)data << 8); + for (uint8_t j = 0; j < 8; j++) { + if (*crc & 0x8000) { + *crc = (*crc << 1) ^ 0x1021; + } else { + *crc <<= 1; + } + } +} + +inline uint16_t crc_xmodem_calculate(const uint8_t *data, uint8_t length) { + uint16_t crc; + crc_xmodem_init(&crc); + for (uint8_t i = 0; i < length; i++) { + crc_xmodem_accumulate(*data, &crc); + } + return crc; +} +""") + f.write(f""" +void hydrolink_fill_header_and_crc({GENERIC_MSG_TYPE} *msg, uint8_t msg_id, uint8_t payload_length, uint8_t dst_id, uint8_t src_id, uint8_t crc_extra) {{ + msg->id = msg_id; + msg->payload_length = payload_length; + msg->dst_id = dst_id; + msg->src_id = src_id; + uint16_t crc; + crc_xmodem_init(&crc); + for(int i = 0; i < msg->payload_length; ++i) {{ + crc_xmodem_accumulate(msg->payload[i], &crc); + }} + crc_xmodem_accumulate(crc_extra, &crc); + msg->crc = crc; +}} + +uint8_t hydrolink_serialize_message({GENERIC_MSG_TYPE} *msg, uint8_t *buffer) {{ + buffer[0] = msg->id; + buffer[1] = msg->payload_length; + buffer[2] = msg->dst_id; + buffer[3] = msg->src_id; + uint8_t *p = &buffer[4]; + for(int i =0; ipayload_length;++i) {{ + *p++ = msg->payload[i]; + }} + *p++ = (uint8_t)(msg->crc >> 8); + *p++ = (uint8_t)(msg->crc & 0xff); + return HYDROLINK_NON_PAYLOAD_LEN + msg->payload_length; +}} + +uint8_t hydrolink_deserialize_header({GENERIC_MSG_TYPE} *msg, uint8_t *buffer, uint8_t buffer_length) {{ + if (buffer_length < HYDROLINK_NON_PAYLOAD_LEN) {{ + return 0; + }} + msg->id = buffer[0]; + msg->payload_length = buffer[1]; + msg->dst_id = buffer[2]; + msg->src_id = buffer[3]; + msg->crc = buffer[buffer_length-1] | (((uint16_t)buffer[buffer_length-2]) << 8); + if (msg->payload_length + HYDROLINK_NON_PAYLOAD_LEN != buffer_length) {{ + return 0; + }} + uint8_t *p = &buffer[HYDROLINK_HEADER_LEN]; + for (int i = 0; i < msg->payload_length; ++i) {{ + msg->payload[i] = *p++; + }} + return msg->payload_length + HYDROLINK_NON_PAYLOAD_LEN; +}} +""") + + +def generate(xmls, out_dir): + msgs = [] + for x in xmls: + msgs.extend(x.messages) + for msg in msgs: + generate_message_h(os.path.join(out_dir), msg) + generate_main_header(msgs, out_dir) + generate_protocol(xmls[0], out_dir) + generate_enums_h(out_dir, xmls[0].enums) diff --git a/gen_python/hydrolink_gen_python.py b/gen_python/hydrolink_gen_python.py new file mode 100644 index 0000000..4816f6a --- /dev/null +++ b/gen_python/hydrolink_gen_python.py @@ -0,0 +1,179 @@ +import os +from hydroparse import HydroMessageType, HydroField, HydroEnum, HydroEnumEntry + + +def to_camel_case(text: str): + return "".join(x.capitalize() for x in text.lower().split("_")) + + +def indent_block(text: str, level: int, indent_width=4): + indent = " " * indent_width * level + return text.replace("\n", "\n" + indent) + +def generate_hydrolink_msg(out_dir): + in_dir = os.path.dirname(os.path.realpath(__file__)) + infilepath = os.path.join(in_dir, "hydrolink_msg.py") + outfilepath = os.path.join(out_dir, "hydrolink_msg.py") + with open(infilepath, "r") as fin: + with open(outfilepath, "w") as fout: + fout.write(fin.read()) + +def generate_enums(out_dir, enums: list[HydroEnum]): + with open(os.path.join(out_dir, "hydrolink_enums.py"), "w") as f: + f.write("""from dataclasses import dataclass + + +@dataclass(frozen=True) +class EnumEntry: + description: str + value: int + + +""") + for enum in enums: + f.write(f"""@dataclass(frozen=True) +class {enum.name}: + {indent_block("\n".join(f"{entry.name} = EnumEntry(value={int(entry.value)}, description='''{entry.description}''')" for entry in enum.entries), level=1)} + + +""") + + +def generate_hydrolink(out_dir, msgs): + with open(os.path.join(out_dir, "hydro.py"), "w") as f: + f.write(f"""import struct +from .hydrolink_msg import HydrolinkHeader, HydrolinkMessage, CrcXmodem +from .hydrolink_msg_ids import {", ".join(f"HYDROLINK_MSG_ID_{msg.name}" for msg in msgs)} +{"\n".join(f"from .hydrolink_{msg.name_lower}_message import Hydrolink{to_camel_case(msg.name)}Message" for msg in msgs)} +import hydrolink_enums as ENUMS + + +MSG_ID_TO_CLASS_MAP = {{ + {indent_block("\n".join(f"HYDROLINK_MSG_ID_{msg.name}: Hydrolink{to_camel_case(msg.name)}Message," for msg in msgs), 1)} +}} + +crc_unpacker = struct.Struct("!H") + +class HydrolinkError(Exception): + pass + +class HydrolinkCrcError(HydrolinkError): + pass + +class HydrolinkMessageLengthError(HydrolinkError): + pass + +def validate_message(msg_buffer): + crc_length = 2 + n_overhead = crc_length + HydrolinkHeader.length() + n_buffer = len(msg_buffer) + if n_buffer < n_overhead: + err = f"Message buffer length ({{n_buffer}} < minimal packet size {{n_overhead}})" + raise HydrolinkMessageLengthError(err) + header = HydrolinkHeader.from_buffer(msg_buffer) + msg_class = MSG_ID_TO_CLASS_MAP[header.msg_id] + if n_buffer != n_overhead + header.payload_length: + err = f"buffer length ({{n_buffer}} - {{n_overhead}} != {{header.payload_length}}) and declared payload length do not match!" + raise HydrolinkMessageLengthError(err) + crc_engine = CrcXmodem() + print(f"Computing crc over {{[int(x) for x in msg_buffer[:-crc_length]]}}") + crc_engine.accumulate(msg_buffer[:-crc_length]) + crc_engine.accumulate(struct.pack("B", msg_class.crc_extra)) + crc_computed = crc_engine.crc + (crc_transmitted,) = crc_unpacker.unpack(msg_buffer[-crc_length:]) + if crc_computed != crc_transmitted: + err = f"Computed crc ({{crc_computed}} does not match transmitted one ({{crc_transmitted}}))" + raise HydrolinkCrcError(err) + return msg_class.from_buffer(msg_buffer) +""") + + + +def generate_message_ids(out_dir, msgs: list[HydroMessageType]): + with open(os.path.join(out_dir, "hydrolink_msg_ids.py"), "w") as f: + for msg in msgs: + f.write(f""" +HYDROLINK_MSG_ID_{msg.name.upper()} = {msg.id}""") + +def generate_message(out_dir, msg: HydroMessageType): + filename = f"hydrolink_{msg.name_lower}_message.py" + with open(os.path.join(out_dir, filename), "w") as f: + f.write(f""" +from .hydrolink_msg import HydrolinkMessage, HydrolinkHeader +import struct + + +class Hydrolink{to_camel_case(msg.name_lower)}Message(HydrolinkMessage): + id = {msg.id} + name = "HYDROLINK_{msg.name}_MSG" + fieldnames = {msg.fieldnames} + ordered_fieldnames = {msg.ordered_fieldnames} + crc_extra = {msg.crc_extra} + unpacker = struct.Struct('{msg.fmtstr}') + length = {msg.wire_length} + + def __init__(self, {", ".join(msg.ordered_fieldnames)}): + super().__init__(msg_id={msg.id}, name=self.name) + {indent_block("\n".join(f"self.{fieldname} = {fieldname}" for fieldname in msg.fieldnames), 2)} + self._fieldnames = self.fieldnames + + def pack(self, src_id, dst_id): + header = HydrolinkHeader(msg_id=self.id, dst_id=dst_id, src_id=src_id) + packed_payload = self.unpacker.pack({", ".join("self." + field.name for field in msg.ordered_fields)}) + return HydrolinkMessage.pack(self, header=header, crc_extra={msg.crc_extra}, payload=packed_payload) + + @classmethod + def unpack_fields(cls, msg_buffer): + crc_length = 2 + return cls.unpacker.unpack(msg_buffer[HydrolinkHeader.length():-crc_length]) + + @classmethod + def from_buffer(cls, msg_buffer): + header = HydrolinkHeader.from_buffer(msg_buffer) + return cls(*cls.unpack_fields(msg_buffer)) + + +""") + + +def format_str(field: HydroField): + # maps field types to format strings according to the 'struct' module + map = { + "float": "f", + "double": "d", + "char": "c", + "int8_t": "b", + "uint8_t": "B", + "int16_t": "h", + "uint16_t": "H", + "int32_t": "i", + "uint32_t": "I", + "int64_t": "q", + "uint64_t": "Q", + } + if field.array_length > 0: + if field.type == "char": + return str(field.array_length) + "s" + return str(field.array_length) + map[field.type] + return map[field.type] + + +def generate(xmls, out_dir): + msgs = [] + for x in xmls: + msgs.extend(x.messages) + generate_message_ids(out_dir, msgs) + generate_hydrolink(out_dir, msgs) + msg: HydroMessageType + for msg in msgs: + msg.fmtstr = "!" + for field in msg.ordered_fields: + msg.fmtstr += format_str(field) + generate_message(out_dir, msg) + generate_hydrolink_msg(out_dir) + enums = [] + for x in xmls: + enums.extend(x.enums) + generate_enums(out_dir, enums) + # generate_protocol(xmls[0], out_dir) + # generate_enums_h(out_dir, xmls[0].enums) diff --git a/gen_python/hydrolink_msg.py b/gen_python/hydrolink_msg.py new file mode 100644 index 0000000..bf1c779 --- /dev/null +++ b/gen_python/hydrolink_msg.py @@ -0,0 +1,177 @@ +import struct + + +class CrcXmodem: + def __init__(self): + self.crc = 0 + + def reset(self): + self.crc = 0 + + def accumulate(self, data: bytearray): + for b in data: + self.crc = self.crc ^ ((b << 8) & 0xFFFF) + for _ in range(8): + if self.crc & 0x8000: + self.crc = ((self.crc << 1) & 0xFFFF) ^ 0x1021 + else: + self.crc = (self.crc << 1) & 0xFFFF + + def accumulate_str(self, data: str): + try: + bytesarray = bytearray(data) + except TypeError: + bytesarray = bytearray(data.encode()) + self.accumulate(bytesarray) + + +def to_string(s): + try: + return s.decode("utf-8") + except Exception: + pass + try: + s2 = s.encode("utf-8", "ignore") + x = "%s" % s2 + return s2 + except Exception: + pass + r = "" + try: + for c in s: + r2 = r + c + r2 = r2.encode("ascii", "ignore") + x = "%s" % r2 # noqa: F841 + r = r2 + except Exception: + pass + return r + "_XXX" + + +class HydrolinkHeader: + packer = struct.Struct("!BBBB") + + def __init__(self, msg_id=0, payload_length=0, dst_id=0, src_id=0): + self.payload_length = payload_length + self.msg_id = msg_id + self.src_id = src_id + self.dst_id = dst_id + + def pack(self): + return self.packer.pack(self.msg_id, self.payload_length, self.dst_id, self.src_id) + + def unpack(self, msg_buffer: bytes): + x = self.packer.unpack(msg_buffer[0:self.length()]) + self.msg_id = x[0] + self.payload_length = x[1] + self.dst_id = x[2] + self.src_id = x[3] + return self + + @classmethod + def from_buffer(cls, msg_buffer: bytes): + return cls().unpack(msg_buffer) + + @staticmethod + def length(): + return 4 + + +class HydrolinkMessage: + def __init__(self, msg_id, name): + self._header = HydrolinkHeader(msg_id=msg_id) + self._payload = None + self._msg_buffer = None + self._crc = None + self._fieldnames = [] + self._type = name + + @staticmethod + def crc_from_buffer(msg_buffer: bytes): + return struct.unpack("!H", msg_buffer[-2:]) + + def format_attr(self, field): + raw_attr = getattr(self, field) + if isinstance(raw_attr, bytes): + raw_attr = to_string(raw_attr).rstrip("\\00") + return raw_attr + + def get_header(self): + return self._header + + def get_payload(self): + return self._payload + + def get_msg_buffer(self): + return self._msg_buffer + + def get_crc(self): + return self._crc + + def get_fieldnames(self): + return self._fieldnames + + def get_type(self): + return self._type + + def get_msg_id(self): + return self._header.msg_id + + def get_dst_id(self): + return self._header.dst_id + + def get_src_id(self): + return self._header.src_id + + def __str__(self): + ret = f"{self._type}" + " {" + for name in self._fieldnames: + value = self.format_attr(name) + ret += f"{name}: {value}, " + ret = ret[0:-2] + "}" + return ret + + def __eq__(self, other): + if other is None: + return False + if self.get_type() != other.get_type(): + return False + if self.get_src_id() != other.get_src_id(): + return False + if self.get_dst_id() != other.get_dst_id(): + return False + for name in self._fieldnames: + if self.format_attr(name) != other.format_attr(name): + return False + return True + + def __ne__(self, other): + return not self.__eq__(other) + + def to_dict(self): + d = {} + d["type"] = self._type + for name in self._fieldnames: + d[name] = self.format_attr(name) + return d + + def pack(self, header: HydrolinkHeader, crc_extra, payload): + self._payload = payload + self._header = header + self._header.payload_length = len(self._payload) + self._msg_buffer = self._header.pack() + self._payload + crc = CrcXmodem() + print(f"Computing crc over {[int(x) for x in self._msg_buffer]}") + crc.accumulate(self._msg_buffer) + crc.accumulate(struct.pack("B", crc_extra)) + self._crc = crc.crc + print(f"Computed crc: {self._crc}") + self._msg_buffer += struct.pack("!H", self._crc) + return self._msg_buffer + + def __getitem(self, key): + if self._instances is None: + raise IndexError() + if key not in self._instances: + raise KeyError() + return self._instances[key] diff --git a/hydrolink_def.xml b/hydrolink_def.xml new file mode 100644 index 0000000..085025b --- /dev/null +++ b/hydrolink_def.xml @@ -0,0 +1,46 @@ + + + + + Command for getting/setting the state of a switch + + + Command for getting/setting the value of a PWM channel + + + + + + + The received command was unknown or unhandled. + + + + + + + + + + + + + Command defined as in CMD_ID enum. + + + + + + + + + + + + + + + + + + diff --git a/hydrolink_gen.py b/hydrolink_gen.py new file mode 100644 index 0000000..e100564 --- /dev/null +++ b/hydrolink_gen.py @@ -0,0 +1,30 @@ +from gen_c import hydrolink_gen_c +from gen_python import hydrolink_gen_python +import hydroparse + +import pathlib +import os + + +def generate_c(): + this_file_dir = os.path.dirname(os.path.realpath(__file__)) + xml_path = os.path.join(this_file_dir, "hydrolink_def.xml") + m = hydroparse.HydroXml(xml_path) + + path = pathlib.Path(this_file_dir) + path = path / "generated" / "c" / "include" / "hydrolink" + path.mkdir(parents=True, exist_ok=True) + hydrolink_gen_c.generate([m], str(path)) + + path = pathlib.Path(this_file_dir) + path = path / "generated" / "python" / "hydrolink" + path.mkdir(parents=True, exist_ok=True) + hydrolink_gen_python.generate([m], str(path)) + + +def main(): + generate_c() + + +if __name__ == "__main__": + main() diff --git a/hydroparse/__init__.py b/hydroparse/__init__.py new file mode 100644 index 0000000..3872e15 --- /dev/null +++ b/hydroparse/__init__.py @@ -0,0 +1,14 @@ +from .hydro_field import HydroField +from .hydro_message_type import HydroMessageType +from .hydro_xml import HydroXml +from .type_lengths import TYPE_LENGTHS +from .hydro_enum import HydroEnumEntry, HydroEnum + +__all__ = [ + "HydroField", + "HydroMessageType", + "HydroXml", + "TYPE_LENGTHS", + "HydroEnumEntry", + "HydroEnum", +] diff --git a/hydroparse/hydro_enum.py b/hydroparse/hydro_enum.py new file mode 100644 index 0000000..11b8ff6 --- /dev/null +++ b/hydroparse/hydro_enum.py @@ -0,0 +1,15 @@ +class HydroEnumEntry: + def __init__(self, name, value, description='', auto=False): + self.name = name + self.value = value + self.description = description + +class HydroEnum: + def __init__(self, name, linenumber, description='', bitmask=False): + self.name = name + self.linenumber = linenumber + self.description = description + self.start_value = None + self.highest_value = 0 + self.bitmask = bitmask + self.entries: list[HydroEnumEntry] = [] diff --git a/hydroparse/hydro_field.py b/hydroparse/hydro_field.py new file mode 100644 index 0000000..f24300a --- /dev/null +++ b/hydroparse/hydro_field.py @@ -0,0 +1,62 @@ +import re +from .type_lengths import TYPE_LENGTHS + +class HydroField: + def __init__( + self, + name, + type, + print_format, + xml, + description="", + enum="", + display="", + units="", + ): + self.name = name + self.name_upper = name.upper() + + self.description = description + self.array_length = 0 + self.enum = enum + self.display = display + self.units = units + self.print_format = print_format + self.wire_offset = 0 + self._parse_type(type) + + def _parse_type(self, type): + if self._type_is_array(type): + self._parse_array(type) + else: + self._parse_non_array(type) + + def _type_is_array(self, field_type): + return bool("[" in field_type or "]" in field_type) + + def _parse_array(self, field_type): + m = re.search(r"([a-zA-Z0-9]+?)\[(0-9+)\]", field_type) + if not m: + raise Exception(f"Could not parse type: '{field_type}'") + field_type = m.group(1) + length = m.group(2) + if field_type in TYPE_LENGTHS: + self.type = field_type + elif f"{field_type}_t" in TYPE_LENGTHS: + self.type = field_type + "_t" + else: + raise Exception(f"Unknown type: '{field_type}'") + self.type_length = TYPE_LENGTHS[self.type] + self.array_length = length + self.wire_length = self.array_length * self.type_length + + def _parse_non_array(self, field_type): + if field_type in TYPE_LENGTHS: + self.type = field_type + elif f"{field_type}_t" in TYPE_LENGTHS: + self.type = field_type + "_t" + else: + raise Exception(f"Unknown type: '{field_type}'") + self.type_length = TYPE_LENGTHS[self.type] + self.array_length = 0 + self.wire_length = self.type_length diff --git a/hydroparse/hydro_message_type.py b/hydroparse/hydro_message_type.py new file mode 100644 index 0000000..6bb9959 --- /dev/null +++ b/hydroparse/hydro_message_type.py @@ -0,0 +1,110 @@ +import operator +from .hydro_field import HydroField + + +class CrcXmodem: + def __init__(self): + self.crc = 0 + + def reset(self): + self.crc = 0 + + def accumulate(self, data: bytearray): + for b in data: + self.crc = self.crc ^ ((b << 8) & 0xFFFF) + for _ in range(8): + if self.crc & 0x8000: + self.crc = ((self.crc << 1) & 0xFFFF) ^ 0x1021 + else: + self.crc = (self.crc << 1) & 0xFFFF + + def accumulate_str(self, data: str): + try: + bytesarray = bytearray(data) + except TypeError: + bytesarray = bytearray(data.encode()) + self.accumulate(bytesarray) + + +class HydroMessageType: + def __init__(self, name, id, linenumber, description=""): + self.name = name + self.name_lower = name.lower() + self.linenumber = linenumber + self.id = int(id) + self.description = description + self.crc_extra = 0 + # field entries of the message definition + self.fields: list[HydroField] = [] + # for non array fields the length is 1. else the length of the array + self.fieldlengths = [] + # data type of the corresponding field + self.fieldtypes = [] + self.ordered_fieldtypes = [] + # names of the fields + self.fieldnames = [] + self.ordered_fieldnames = [] + # length of the data that gets actually transmitted + self.wire_length = 0 + self.field_offsets = {} + + def _update_crc_extra(self): + crc = CrcXmodem() + crc.accumulate_str(self.name + " ") + field: HydroField + for field in self.ordered_fields: + crc.accumulate_str(field.name + " ") + crc.accumulate_str(field.type + " ") + if field.array_length: + crc.accumulate([field.array_length]) + self.crc_extra = (crc.crc & 0xFF) ^ (crc.crc >> 8) + print(f"CRC_EXTRA: {self.crc_extra}") + + def _sort_fields(self): + # order fields by length (then by name) in descending order + self.ordered_fields: list[HydroField] = sorted( + self.fields, key=operator.attrgetter("type_length", "name"), reverse=True + ) + print(f"Ordered fields: {self.ordered_fields}") + + def add_field(self, field: HydroField): + print(f"Adding field: {field.name}") + self.fields.append(field) + + def _update_fieldnames(self): + self.fieldnames = [field.name for field in self.fields] + + def _update_fieldlengths(self): + self.fieldlengths = [] + for field in self.fields: + length = field.array_length + if not length: + self.fieldlengths.append(1) + elif length > 1 and field.type == "char": + # for strings keep one bye extra (null terminated) + self.fieldlengths.append(1) + else: + self.fieldlengths.append(length) + + def _update_fieldtypes(self): + self.fieldtypes = [field.type for field in self.fields] + + def _update_field_offsets(self): + self.field_offsets = {} + self.wire_length = 0 + self.ordered_fieldnames = [] + self.ordered_fieldtypes = [] + for i, field in enumerate(self.ordered_fields): + field.wire_offset = self.wire_length + self.field_offsets[field.name] = field.wire_offset + self.wire_length += field.wire_length + self.ordered_fieldnames.append(field.name) + self.ordered_fieldtypes.append(field.type) + + def update_all_field_properties(self): + self._update_fieldlengths() + self._update_fieldnames() + self._update_fieldtypes() + self._sort_fields() + self._update_field_offsets() + self._update_crc_extra() diff --git a/hydroparse/hydro_xml.py b/hydroparse/hydro_xml.py new file mode 100644 index 0000000..0caa75d --- /dev/null +++ b/hydroparse/hydro_xml.py @@ -0,0 +1,99 @@ +import xml.parsers.expat +import os +from .hydro_message_type import HydroMessageType +from .hydro_field import HydroField +from .hydro_enum import HydroEnum, HydroEnumEntry + + +class HydroXml: + def __init__(self, filename): + self.filename = filename + self.basename = os.path.basename(filename) + self.basenamer_upper = self.basename.upper() + self.messages: list[HydroMessageType] = [] + self.enums: list[HydroEnum] = [] + self.in_element_list = [] + with open(filename, "rb") as f: + self.parser = xml.parsers.expat.ParserCreate() + self.parser.StartElementHandler = self._start_element + self.parser.EndElementHandler = self._end_element + self.parser.CharacterDataHandler = self._element_text + self.parser.ParseFile(f) + self.message_lengths = {} + self.message_names = {} + self.largest_payload = 0 + + msg: HydroMessageType + for msg in self.messages: + print(msg.name) + msg.update_all_field_properties() + key = msg.id + self.message_lengths[key] = msg.wire_length + self.message_names[key] = msg.name + if msg.wire_length > self.largest_payload: + self.largest_payload = msg.wire_length + + def _check_attributes(self, attributes, check, where): + for c in check: + if c not in attributes: + raise Exception( + f"Expected missing {where} '{c}' attribute at {self.filename}:{self.parser.CurrentLineNumber}" + ) + + def _start_element(self, name, attributes): + self.in_element_list.append(name) + in_element = ".".join(self.in_element_list) + if in_element == "hydrolink.messages.message": + # make sure at least the attributes 'name' and 'id' are defined + self._check_attributes(attributes, ["name", "id"], "message") + self.messages.append( + HydroMessageType( + attributes["name"], attributes["id"], self.parser.CurrentLineNumber + ) + ) + elif in_element == "hydrolink.messages.message.field": + # makre sure at least the 'name' and 'type' attribute are set for a field + self._check_attributes(attributes, ["name", "type"], "field") + units = attributes.get("units", "") + if units: + units = f"[{units}]" + print_format = attributes.get("print_format", None) + new_field = HydroField( + name=attributes["name"], + type=attributes["type"], + print_format=print_format, + xml=self, + units=units, + ) + self.messages[-1].add_field(new_field) + elif in_element == "hydrolink.enums.enum": + # make sure that the name of the enum is defined + self._check_attributes(attributes, ['name'], 'enum') + bitmask = 'bitmask' in attributes and attributes['bitmask'] == 'true' + self.enums.append(HydroEnum(attributes['name'], self.parser.CurrentLineNumber, bitmask=bitmask)) + elif in_element == "hydrolink.enums.enum.entry": + if 'value' in attributes: + value = eval(attributes["value"]) + autovalue = False + else: + value = self.enums[-1].highest_value + 1 + autovalue = True + if (self.enums[-1].start_value is None or value < self.enums[-1].start_value): + self.enums[-1].start_value = value + if (value > self.enums[-1].highest_value): + self.enums[-1].highest_value = value + self.enums[-1].entries.append(HydroEnumEntry(attributes['name'], value, auto=autovalue)) + + def _end_element(self, name): + self.in_element_list.pop() + + def _element_text(self, text): + in_element = ".".join(self.in_element_list) + if in_element == "hydrolink.messages.message.description": + self.messages[-1].description += text + elif in_element == "hydrolink.messages.message.field": + self.messages[-1].fields[-1].description += text + elif in_element == "hydrolink.enums.enum.description": + self.enums[-1].description += text + elif in_element == "hydrolink.enums.enum.entry.description": + self.enums[-1].entries[-1].description += text diff --git a/hydroparse/type_lengths.py b/hydroparse/type_lengths.py new file mode 100644 index 0000000..3163ea7 --- /dev/null +++ b/hydroparse/type_lengths.py @@ -0,0 +1,13 @@ +TYPE_LENGTHS = { + "float": 4, + "double": 8, + "char": 1, + "int8_t": 1, + "uint8_t": 1, + "int16_t": 2, + "uint16_t": 2, + "int32_t": 4, + "uint32_t": 4, + "int64_t": 8, + "uint64_t": 8, +}