# hydrolink/gen_python/hydrolink_gen_python.py
#
# (repository viewer metadata: 213 lines, 6.8 KiB, Python)

import os
from hydroparse import HydroMessageType, HydroField, HydroEnum, HydroEnumEntry
def to_camel_case(text: str):
    """Turn an underscore-separated identifier into CamelCase.

    The text is lower-cased first, split on ``_``, and every piece is
    capitalized before the pieces are concatenated again, so ``"GPS_RAW"``
    and ``"gps_raw"`` both yield ``"GpsRaw"``.
    """
    pieces = text.lower().split("_")
    return "".join(map(str.capitalize, pieces))
def indent_block(text: str, level: int, indent_width=4):
    """Indent every line of *text* except the first.

    Each newline in *text* is followed by ``indent_width * level`` spaces.
    The first line is left untouched; the caller is expected to place the
    result at an already-indented position.
    """
    padding = (" " * indent_width) * level
    return ("\n" + padding).join(text.split("\n"))
def generate_hydrolink_msg(out_dir):
    """Copy ``hydrolink_msg.py`` from this script's directory into *out_dir*.

    The source file is located next to the generator itself (resolved via
    ``__file__``); the copy keeps the same file name.
    """
    generator_dir = os.path.dirname(os.path.realpath(__file__))
    source_path = os.path.join(generator_dir, "hydrolink_msg.py")
    target_path = os.path.join(out_dir, "hydrolink_msg.py")
    with open(source_path, "r") as src, open(target_path, "w") as dst:
        dst.write(src.read())
def generate_enums(out_dir, enums: list[HydroEnum]):
    """Write ``hydrolink_enums.py`` into *out_dir*.

    The emitted module defines a frozen ``EnumEntry`` dataclass
    (``description``, ``value``) followed by one class per enum whose class
    attributes are ``EnumEntry`` instances named after the enum entries.

    Args:
        out_dir: Existing directory that receives the generated file.
        enums: Enum descriptions; ``name`` and ``entries`` are read from each
            enum, and ``name``, ``value`` and ``description`` from each entry.
    """
    chunks = [
        "from dataclasses import dataclass\n"
        "\n"
        "\n"
        "@dataclass(frozen=True)\n"
        "class EnumEntry:\n"
        "    description: str\n"
        "    value: int\n"
    ]
    for enum in enums:
        # repr() produces a correctly escaped literal, so quotes, backslashes
        # and line breaks in a description can neither break the generated
        # source nor be altered by indentation.
        body_lines = [
            f"    {entry.name} = EnumEntry(value={int(entry.value)}, "
            f"description={str(entry.description)!r})"
            for entry in enum.entries
        ]
        if not body_lines:
            # A class statement needs at least one statement in its body.
            body_lines = ["    pass"]
        chunks.append(
            "\n\n@dataclass(frozen=True)\n"
            f"class {enum.name}:\n" + "\n".join(body_lines) + "\n"
        )

    # Python source files are UTF-8 by default; do not depend on the locale.
    target = os.path.join(out_dir, "hydrolink_enums.py")
    with open(target, "w", encoding="utf-8") as f:
        f.write("".join(chunks))
def generate_hydrolink(out_dir, msgs):
    """Write ``hydro.py``, the entry module of the generated package.

    The emitted module imports every message id constant and message class,
    and defines ``cobs_decode`` / ``cobs_encode``, the ``MSG_ID_TO_CLASS_MAP``
    lookup table, the ``HydrolinkError`` exception family and
    ``validate_message``.

    Args:
        out_dir: Existing directory that receives ``hydro.py``.
        msgs: Message descriptions; ``name`` and ``name_lower`` are read from
            each one.

    NOTE(review): the emitted ``validate_message`` prints the bytes it is
    about to checksum on every call. That looks like leftover debugging
    output, but it is kept here so runtime behaviour does not change silently.
    """
    msgs = list(msgs)  # iterated several times below

    # Use the same spelling generate_message_ids() writes for the constants.
    id_names = [f"HYDROLINK_MSG_ID_{msg.name.upper()}" for msg in msgs]
    class_names = [f"Hydrolink{to_camel_case(msg.name)}Message" for msg in msgs]

    # These fragments are assembled outside the template: a backslash inside
    # an f-string replacement field is only accepted from Python 3.12 on.
    # With no messages the id import is dropped entirely, because
    # "from x import" followed by nothing is a syntax error.
    id_import_line = (
        f"from .hydrolink_msg_ids import {', '.join(id_names)}" if id_names else ""
    )
    class_imports = "\n".join(
        f"from .hydrolink_{msg.name_lower}_message import {class_name}"
        for msg, class_name in zip(msgs, class_names)
    )
    map_entries = indent_block(
        "\n".join(
            f"{id_name}: {class_name},"
            for id_name, class_name in zip(id_names, class_names)
        ),
        1,
    )

    source = f"""import struct
from .hydrolink_msg import HydrolinkHeader, HydrolinkMessage, CrcXmodem
{id_import_line}
{class_imports}
from . import hydrolink_enums as ENUMS


def cobs_decode(data):
    output = []
    index = 1
    offset = data[0] - 1
    while index < len(data):
        if offset == 0:
            output.append(0)
            offset = data[index]
        else:
            output.append(data[index])
        index = index + 1
        offset = offset - 1
    return bytearray(output)


def cobs_encode(data):
    output = [0 for i in range(len(data) + 2)]
    dst_index = 1
    zero_offset = 1
    for src_byte in data:
        if src_byte == 0:
            output[dst_index - zero_offset] = zero_offset
            zero_offset = 1
        else:
            output[dst_index] = src_byte
            zero_offset += 1
        dst_index += 1
    output[dst_index - zero_offset] = zero_offset
    output[dst_index] = 0
    return output


MSG_ID_TO_CLASS_MAP = {{
    {map_entries}
}}

crc_unpacker = struct.Struct("!H")


class HydrolinkError(Exception):
    pass


class HydrolinkCrcError(HydrolinkError):
    pass


class HydrolinkMessageLengthError(HydrolinkError):
    pass


def validate_message(msg_buffer):
    crc_length = 2
    n_overhead = crc_length + HydrolinkHeader.length()
    n_buffer = len(msg_buffer)
    if n_buffer < n_overhead:
        err = f"Message buffer length ({{n_buffer}} < minimal packet size {{n_overhead}})"
        raise HydrolinkMessageLengthError(err)
    header = HydrolinkHeader.from_buffer(msg_buffer)
    msg_class = MSG_ID_TO_CLASS_MAP[header.msg_id]
    if n_buffer != n_overhead + header.payload_length:
        err = f"buffer length ({{n_buffer}} - {{n_overhead}} != {{header.payload_length}}) and declared payload length do not match!"
        raise HydrolinkMessageLengthError(err)
    crc_engine = CrcXmodem()
    print(f"Computing crc over {{[int(x) for x in msg_buffer[:-crc_length]]}}")
    crc_engine.accumulate(msg_buffer[:-crc_length])
    crc_engine.accumulate(struct.pack("B", msg_class.crc_extra))
    crc_computed = crc_engine.crc
    (crc_transmitted,) = crc_unpacker.unpack(msg_buffer[-crc_length:])
    if crc_computed != crc_transmitted:
        err = f"Computed crc ({{crc_computed}} does not match transmitted one ({{crc_transmitted}}))"
        raise HydrolinkCrcError(err)
    return msg_class.from_buffer(msg_buffer)
"""

    with open(os.path.join(out_dir, "hydro.py"), "w", encoding="utf-8") as f:
        f.write(source)
def generate_message_ids(out_dir, msgs: list[HydroMessageType]):
    """Write ``hydrolink_msg_ids.py`` with one id constant per message.

    Every message contributes a line ``HYDROLINK_MSG_ID_<NAME> = <id>`` where
    ``<NAME>`` is the upper-cased message name. Each line is preceded by a
    newline, so the file starts with an empty line and has no trailing
    newline.
    """
    target = os.path.join(out_dir, "hydrolink_msg_ids.py")
    with open(target, "w") as f:
        f.writelines(
            f"\nHYDROLINK_MSG_ID_{msg.name.upper()} = {msg.id}" for msg in msgs
        )
def generate_message(out_dir, msg: HydroMessageType):
    """Write ``hydrolink_<name_lower>_message.py`` containing the class for *msg*.

    The emitted ``Hydrolink<Name>Message`` class carries the message metadata
    as class attributes, stores one attribute per field, and provides
    ``pack``, ``unpack_fields`` and ``from_buffer``.

    Numeric array fields (``array_length > 1`` and not ``char``) are described
    to ``struct`` with a repeat count, which consumes and produces one value
    per element. The emitted ``pack`` therefore spreads such an attribute with
    ``*``, and ``from_buffer`` regroups the matching slice of unpacked values
    into a list, so each field still maps to a single ``__init__`` argument.
    ``char`` arrays use the ``s`` format and stay a single value.

    Args:
        out_dir: Existing directory that receives the generated file.
        msg: Message description. ``msg.fmtstr`` must already be set
            (``generate`` does this before calling).
    """
    filename = f"hydrolink_{msg.name_lower}_message.py"
    class_name = f"Hydrolink{to_camel_case(msg.name_lower)}Message"

    pack_args = []
    ctor_args = []
    slot = 0  # position in the flat tuple returned by struct unpack
    for field in msg.ordered_fields:
        count = int(field.array_length)
        if count > 1 and field.type != "char":
            pack_args.append(f"*self.{field.name}")
            ctor_args.append(f"list(values[{slot}:{slot + count}])")
            slot += count
        else:
            pack_args.append(f"self.{field.name}")
            ctor_args.append(f"values[{slot}]")
            slot += 1

    init_params = ", ".join(msg.ordered_fieldnames)
    # indent_block leaves the first line alone; the template supplies it.
    field_assignments = indent_block(
        "\n".join(f"self.{fieldname} = {fieldname}" for fieldname in msg.fieldnames),
        2,
    )

    source = f"""
from .hydrolink_msg import HydrolinkMessage, HydrolinkHeader
import struct


class {class_name}(HydrolinkMessage):
    id = {msg.id}
    name = "HYDROLINK_{msg.name}_MSG"
    fieldnames = {msg.fieldnames}
    ordered_fieldnames = {msg.ordered_fieldnames}
    crc_extra = {msg.crc_extra}
    unpacker = struct.Struct('{msg.fmtstr}')
    length = {msg.wire_length}

    def __init__(self, {init_params}):
        super().__init__(msg_id={msg.id}, name=self.name)
        {field_assignments}
        self._fieldnames = self.fieldnames

    def pack(self, src_id, dst_id):
        header = HydrolinkHeader(msg_id=self.id, dst_id=dst_id, src_id=src_id)
        packed_payload = self.unpacker.pack({", ".join(pack_args)})
        return HydrolinkMessage.pack(self, header=header, crc_extra={msg.crc_extra}, payload=packed_payload)

    @classmethod
    def unpack_fields(cls, msg_buffer):
        crc_length = 2
        return cls.unpacker.unpack(msg_buffer[HydrolinkHeader.length():-crc_length])

    @classmethod
    def from_buffer(cls, msg_buffer):
        header = HydrolinkHeader.from_buffer(msg_buffer)
        values = cls.unpack_fields(msg_buffer)
        return cls({", ".join(ctor_args)})
"""

    with open(os.path.join(out_dir, filename), "w", encoding="utf-8") as f:
        f.write(source)
def format_str(field: HydroField):
    """Return the ``struct`` format fragment for a single field.

    Scalars map to their one-letter ``struct`` code. A field with a positive
    ``array_length`` gets that length as a repeat-count prefix; ``char``
    arrays use the ``s`` (bytes) code instead of ``c``.

    Raises:
        KeyError: if ``field.type`` is not one of the known type names (not
            raised for ``char`` arrays, which need no lookup).
    """
    # Field type name -> format character of the 'struct' module.
    struct_codes = {
        "float": "f",
        "double": "d",
        "char": "c",
        "int8_t": "b",
        "uint8_t": "B",
        "int16_t": "h",
        "uint16_t": "H",
        "int32_t": "i",
        "uint32_t": "I",
        "int64_t": "q",
        "uint64_t": "Q",
    }
    count = field.array_length
    if not count > 0:
        return struct_codes[field.type]
    code = "s" if field.type == "char" else struct_codes[field.type]
    return str(count) + code
def generate(xmls, out_dir):
    """Generate the complete Python package for the parsed definitions.

    Messages from all *xmls* are gathered first. Then, in this order, the id
    module, ``hydro.py``, one module per message, the copied
    ``hydrolink_msg.py`` and finally the enum module are written to *out_dir*.

    Each message gets a ``fmtstr`` attribute assigned here: ``"!"`` followed
    by the ``format_str`` fragment of every field in ``ordered_fields``.
    """
    msgs = [message for xml in xmls for message in xml.messages]

    generate_message_ids(out_dir, msgs)
    generate_hydrolink(out_dir, msgs)

    msg: HydroMessageType
    for msg in msgs:
        # generate_message() reads msg.fmtstr, so it must be set first.
        msg.fmtstr = "!" + "".join(format_str(field) for field in msg.ordered_fields)
        generate_message(out_dir, msg)

    generate_hydrolink_msg(out_dir)

    enums = [enum for xml in xmls for enum in xml.enums]
    generate_enums(out_dir, enums)
    # Disabled in the original; neither function is defined in the visible file:
    # generate_protocol(xmls[0], out_dir)
    # generate_enums_h(out_dir, xmls[0].enums)