213 lines
6.8 KiB
Python
213 lines
6.8 KiB
Python
import os
|
|
from hydroparse import HydroMessageType, HydroField, HydroEnum, HydroEnumEntry
|
|
|
|
|
|
def to_camel_case(text: str):
|
|
return "".join(x.capitalize() for x in text.lower().split("_"))
|
|
|
|
|
|
def indent_block(text: str, level: int, indent_width=4):
|
|
indent = " " * indent_width * level
|
|
return text.replace("\n", "\n" + indent)
|
|
|
|
def generate_hydrolink_msg(out_dir):
|
|
in_dir = os.path.dirname(os.path.realpath(__file__))
|
|
infilepath = os.path.join(in_dir, "hydrolink_msg.py")
|
|
outfilepath = os.path.join(out_dir, "hydrolink_msg.py")
|
|
with open(infilepath, "r") as fin:
|
|
with open(outfilepath, "w") as fout:
|
|
fout.write(fin.read())
|
|
|
|
def generate_enums(out_dir, enums: list[HydroEnum]):
|
|
with open(os.path.join(out_dir, "hydrolink_enums.py"), "w") as f:
|
|
f.write("""from dataclasses import dataclass
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class EnumEntry:
|
|
description: str
|
|
value: int
|
|
|
|
|
|
""")
|
|
for enum in enums:
|
|
f.write(f"""@dataclass(frozen=True)
|
|
class {enum.name}:
|
|
{indent_block("\n".join(f"{entry.name} = EnumEntry(value={int(entry.value)}, description='''{entry.description}''')" for entry in enum.entries), level=1)}
|
|
|
|
|
|
""")
|
|
|
|
|
|
def generate_hydrolink(out_dir, msgs):
|
|
with open(os.path.join(out_dir, "hydro.py"), "w") as f:
|
|
f.write(f"""import struct
|
|
from .hydrolink_msg import HydrolinkHeader, HydrolinkMessage, CrcXmodem
|
|
from .hydrolink_msg_ids import {", ".join(f"HYDROLINK_MSG_ID_{msg.name}" for msg in msgs)}
|
|
{"\n".join(f"from .hydrolink_{msg.name_lower}_message import Hydrolink{to_camel_case(msg.name)}Message" for msg in msgs)}
|
|
from . import hydrolink_enums as ENUMS
|
|
|
|
|
|
def cobs_decode(data):
|
|
output = []
|
|
index = 1
|
|
offset = data[0] - 1
|
|
while index < len(data):
|
|
if offset == 0:
|
|
output.append(0)
|
|
offset = data[index]
|
|
else:
|
|
output.append(data[index])
|
|
index = index + 1
|
|
offset = offset - 1
|
|
return bytearray(output)
|
|
|
|
|
|
def cobs_encode(data):
|
|
output = [0 for i in range(len(data) + 2)]
|
|
dst_index = 1
|
|
zero_offset = 1
|
|
for src_byte in data:
|
|
if src_byte == 0:
|
|
output[dst_index - zero_offset] = zero_offset
|
|
zero_offset = 1
|
|
else:
|
|
output[dst_index] = src_byte
|
|
zero_offset += 1
|
|
dst_index += 1
|
|
|
|
output[dst_index - zero_offset] = zero_offset
|
|
output[dst_index] = 0
|
|
|
|
return output
|
|
|
|
|
|
MSG_ID_TO_CLASS_MAP = {{
|
|
{indent_block("\n".join(f"HYDROLINK_MSG_ID_{msg.name}: Hydrolink{to_camel_case(msg.name)}Message," for msg in msgs), 1)}
|
|
}}
|
|
|
|
crc_unpacker = struct.Struct("!H")
|
|
|
|
class HydrolinkError(Exception):
|
|
pass
|
|
|
|
class HydrolinkCrcError(HydrolinkError):
|
|
pass
|
|
|
|
class HydrolinkMessageLengthError(HydrolinkError):
|
|
pass
|
|
|
|
def validate_message(msg_buffer):
|
|
crc_length = 2
|
|
n_overhead = crc_length + HydrolinkHeader.length()
|
|
n_buffer = len(msg_buffer)
|
|
if n_buffer < n_overhead:
|
|
err = f"Message buffer length ({{n_buffer}} < minimal packet size {{n_overhead}})"
|
|
raise HydrolinkMessageLengthError(err)
|
|
header = HydrolinkHeader.from_buffer(msg_buffer)
|
|
msg_class = MSG_ID_TO_CLASS_MAP[header.msg_id]
|
|
if n_buffer != n_overhead + header.payload_length:
|
|
err = f"buffer length ({{n_buffer}} - {{n_overhead}} != {{header.payload_length}}) and declared payload length do not match!"
|
|
raise HydrolinkMessageLengthError(err)
|
|
crc_engine = CrcXmodem()
|
|
print(f"Computing crc over {{[int(x) for x in msg_buffer[:-crc_length]]}}")
|
|
crc_engine.accumulate(msg_buffer[:-crc_length])
|
|
crc_engine.accumulate(struct.pack("B", msg_class.crc_extra))
|
|
crc_computed = crc_engine.crc
|
|
(crc_transmitted,) = crc_unpacker.unpack(msg_buffer[-crc_length:])
|
|
if crc_computed != crc_transmitted:
|
|
err = f"Computed crc ({{crc_computed}} does not match transmitted one ({{crc_transmitted}}))"
|
|
raise HydrolinkCrcError(err)
|
|
return msg_class.from_buffer(msg_buffer)
|
|
""")
|
|
|
|
|
|
|
|
def generate_message_ids(out_dir, msgs: list[HydroMessageType]):
|
|
with open(os.path.join(out_dir, "hydrolink_msg_ids.py"), "w") as f:
|
|
for msg in msgs:
|
|
f.write(f"""
|
|
HYDROLINK_MSG_ID_{msg.name.upper()} = {msg.id}""")
|
|
|
|
def generate_message(out_dir, msg: HydroMessageType):
|
|
filename = f"hydrolink_{msg.name_lower}_message.py"
|
|
with open(os.path.join(out_dir, filename), "w") as f:
|
|
f.write(f"""
|
|
from .hydrolink_msg import HydrolinkMessage, HydrolinkHeader
|
|
import struct
|
|
|
|
|
|
class Hydrolink{to_camel_case(msg.name_lower)}Message(HydrolinkMessage):
|
|
id = {msg.id}
|
|
name = "HYDROLINK_{msg.name}_MSG"
|
|
fieldnames = {msg.fieldnames}
|
|
ordered_fieldnames = {msg.ordered_fieldnames}
|
|
crc_extra = {msg.crc_extra}
|
|
unpacker = struct.Struct('{msg.fmtstr}')
|
|
length = {msg.wire_length}
|
|
|
|
def __init__(self, {", ".join(msg.ordered_fieldnames)}):
|
|
super().__init__(msg_id={msg.id}, name=self.name)
|
|
{indent_block("\n".join(f"self.{fieldname} = {fieldname}" for fieldname in msg.fieldnames), 2)}
|
|
self._fieldnames = self.fieldnames
|
|
|
|
def pack(self, src_id, dst_id):
|
|
header = HydrolinkHeader(msg_id=self.id, dst_id=dst_id, src_id=src_id)
|
|
packed_payload = self.unpacker.pack({", ".join("self." + field.name for field in msg.ordered_fields)})
|
|
return HydrolinkMessage.pack(self, header=header, crc_extra={msg.crc_extra}, payload=packed_payload)
|
|
|
|
@classmethod
|
|
def unpack_fields(cls, msg_buffer):
|
|
crc_length = 2
|
|
return cls.unpacker.unpack(msg_buffer[HydrolinkHeader.length():-crc_length])
|
|
|
|
@classmethod
|
|
def from_buffer(cls, msg_buffer):
|
|
header = HydrolinkHeader.from_buffer(msg_buffer)
|
|
return cls(*cls.unpack_fields(msg_buffer))
|
|
|
|
|
|
""")
|
|
|
|
|
|
def format_str(field: HydroField):
|
|
# maps field types to format strings according to the 'struct' module
|
|
map = {
|
|
"float": "f",
|
|
"double": "d",
|
|
"char": "c",
|
|
"int8_t": "b",
|
|
"uint8_t": "B",
|
|
"int16_t": "h",
|
|
"uint16_t": "H",
|
|
"int32_t": "i",
|
|
"uint32_t": "I",
|
|
"int64_t": "q",
|
|
"uint64_t": "Q",
|
|
}
|
|
if field.array_length > 0:
|
|
if field.type == "char":
|
|
return str(field.array_length) + "s"
|
|
return str(field.array_length) + map[field.type]
|
|
return map[field.type]
|
|
|
|
|
|
def generate(xmls, out_dir):
|
|
msgs = []
|
|
for x in xmls:
|
|
msgs.extend(x.messages)
|
|
generate_message_ids(out_dir, msgs)
|
|
generate_hydrolink(out_dir, msgs)
|
|
msg: HydroMessageType
|
|
for msg in msgs:
|
|
msg.fmtstr = "!"
|
|
for field in msg.ordered_fields:
|
|
msg.fmtstr += format_str(field)
|
|
generate_message(out_dir, msg)
|
|
generate_hydrolink_msg(out_dir)
|
|
enums = []
|
|
for x in xmls:
|
|
enums.extend(x.enums)
|
|
generate_enums(out_dir, enums)
|
|
# generate_protocol(xmls[0], out_dir)
|
|
# generate_enums_h(out_dir, xmls[0].enums)
|