import os from hydroparse import HydroMessageType, HydroField, HydroEnum, HydroEnumEntry def to_camel_case(text: str): return "".join(x.capitalize() for x in text.lower().split("_")) def indent_block(text: str, level: int, indent_width=4): indent = " " * indent_width * level return text.replace("\n", "\n" + indent) def generate_hydrolink_msg(out_dir): in_dir = os.path.dirname(os.path.realpath(__file__)) infilepath = os.path.join(in_dir, "hydrolink_msg.py") outfilepath = os.path.join(out_dir, "hydrolink_msg.py") with open(infilepath, "r") as fin: with open(outfilepath, "w") as fout: fout.write(fin.read()) def generate_enums(out_dir, enums: list[HydroEnum]): with open(os.path.join(out_dir, "hydrolink_enums.py"), "w") as f: f.write("""from dataclasses import dataclass @dataclass(frozen=True) class EnumEntry: description: str value: int """) for enum in enums: f.write(f"""@dataclass(frozen=True) class {enum.name}: {indent_block("\n".join(f"{entry.name} = EnumEntry(value={int(entry.value)}, description='''{entry.description}''')" for entry in enum.entries), level=1)} """) def generate_hydrolink(out_dir, msgs): with open(os.path.join(out_dir, "hydro.py"), "w") as f: f.write(f"""import struct from .hydrolink_msg import HydrolinkHeader, HydrolinkMessage, CrcXmodem from .hydrolink_msg_ids import {", ".join(f"HYDROLINK_MSG_ID_{msg.name}" for msg in msgs)} {"\n".join(f"from .hydrolink_{msg.name_lower}_message import Hydrolink{to_camel_case(msg.name)}Message" for msg in msgs)} import hydrolink_enums as ENUMS MSG_ID_TO_CLASS_MAP = {{ {indent_block("\n".join(f"HYDROLINK_MSG_ID_{msg.name}: Hydrolink{to_camel_case(msg.name)}Message," for msg in msgs), 1)} }} crc_unpacker = struct.Struct("!H") class HydrolinkError(Exception): pass class HydrolinkCrcError(HydrolinkError): pass class HydrolinkMessageLengthError(HydrolinkError): pass def validate_message(msg_buffer): crc_length = 2 n_overhead = crc_length + HydrolinkHeader.length() n_buffer = len(msg_buffer) if n_buffer < n_overhead: err = f"Message buffer length ({{n_buffer}} < minimal packet size {{n_overhead}})" raise HydrolinkMessageLengthError(err) header = HydrolinkHeader.from_buffer(msg_buffer) msg_class = MSG_ID_TO_CLASS_MAP[header.msg_id] if n_buffer != n_overhead + header.payload_length: err = f"buffer length ({{n_buffer}} - {{n_overhead}} != {{header.payload_length}}) and declared payload length do not match!" raise HydrolinkMessageLengthError(err) crc_engine = CrcXmodem() print(f"Computing crc over {{[int(x) for x in msg_buffer[:-crc_length]]}}") crc_engine.accumulate(msg_buffer[:-crc_length]) crc_engine.accumulate(struct.pack("B", msg_class.crc_extra)) crc_computed = crc_engine.crc (crc_transmitted,) = crc_unpacker.unpack(msg_buffer[-crc_length:]) if crc_computed != crc_transmitted: err = f"Computed crc ({{crc_computed}} does not match transmitted one ({{crc_transmitted}}))" raise HydrolinkCrcError(err) return msg_class.from_buffer(msg_buffer) """) def generate_message_ids(out_dir, msgs: list[HydroMessageType]): with open(os.path.join(out_dir, "hydrolink_msg_ids.py"), "w") as f: for msg in msgs: f.write(f""" HYDROLINK_MSG_ID_{msg.name.upper()} = {msg.id}""") def generate_message(out_dir, msg: HydroMessageType): filename = f"hydrolink_{msg.name_lower}_message.py" with open(os.path.join(out_dir, filename), "w") as f: f.write(f""" from .hydrolink_msg import HydrolinkMessage, HydrolinkHeader import struct class Hydrolink{to_camel_case(msg.name_lower)}Message(HydrolinkMessage): id = {msg.id} name = "HYDROLINK_{msg.name}_MSG" fieldnames = {msg.fieldnames} ordered_fieldnames = {msg.ordered_fieldnames} crc_extra = {msg.crc_extra} unpacker = struct.Struct('{msg.fmtstr}') length = {msg.wire_length} def __init__(self, {", ".join(msg.ordered_fieldnames)}): super().__init__(msg_id={msg.id}, name=self.name) {indent_block("\n".join(f"self.{fieldname} = {fieldname}" for fieldname in msg.fieldnames), 2)} self._fieldnames = self.fieldnames def pack(self, src_id, dst_id): header = HydrolinkHeader(msg_id=self.id, dst_id=dst_id, src_id=src_id) packed_payload = self.unpacker.pack({", ".join("self." + field.name for field in msg.ordered_fields)}) return HydrolinkMessage.pack(self, header=header, crc_extra={msg.crc_extra}, payload=packed_payload) @classmethod def unpack_fields(cls, msg_buffer): crc_length = 2 return cls.unpacker.unpack(msg_buffer[HydrolinkHeader.length():-crc_length]) @classmethod def from_buffer(cls, msg_buffer): header = HydrolinkHeader.from_buffer(msg_buffer) return cls(*cls.unpack_fields(msg_buffer)) """) def format_str(field: HydroField): # maps field types to format strings according to the 'struct' module map = { "float": "f", "double": "d", "char": "c", "int8_t": "b", "uint8_t": "B", "int16_t": "h", "uint16_t": "H", "int32_t": "i", "uint32_t": "I", "int64_t": "q", "uint64_t": "Q", } if field.array_length > 0: if field.type == "char": return str(field.array_length) + "s" return str(field.array_length) + map[field.type] return map[field.type] def generate(xmls, out_dir): msgs = [] for x in xmls: msgs.extend(x.messages) generate_message_ids(out_dir, msgs) generate_hydrolink(out_dir, msgs) msg: HydroMessageType for msg in msgs: msg.fmtstr = "!" for field in msg.ordered_fields: msg.fmtstr += format_str(field) generate_message(out_dir, msg) generate_hydrolink_msg(out_dir) enums = [] for x in xmls: enums.extend(x.enums) generate_enums(out_dir, enums) # generate_protocol(xmls[0], out_dir) # generate_enums_h(out_dir, xmls[0].enums)