99 lines
4.3 KiB
Python
99 lines
4.3 KiB
Python
import xml.parsers.expat
|
|
import os
|
|
from .hydro_message_type import HydroMessageType
|
|
from .hydro_field import HydroField
|
|
from .hydro_enum import HydroEnum, HydroEnumEntry
|
|
|
|
|
|
class HydroXml:
    """Parse a hydrolink XML definition file into message and enum objects.

    The whole file is parsed with expat while the object is constructed.
    Elements are recognised by their dotted path from the document root:

    * ``hydrolink.messages.message``        -> new ``HydroMessageType``
    * ``hydrolink.messages.message.field``  -> new ``HydroField`` on the last message
    * ``hydrolink.enums.enum``              -> new ``HydroEnum``
    * ``hydrolink.enums.enum.entry``        -> new ``HydroEnumEntry`` on the last enum

    Attributes:
        filename: Path of the XML file exactly as passed in.
        basename: ``os.path.basename(filename)``.
        basenamer_upper: Upper-cased ``basename``. The spelling of this
            attribute name is kept as-is because it is publicly visible.
        messages: Messages in document order.
        enums: Enums in document order.
        in_element_list: Stack of the element names currently open.
        parser: The expat parser used for the file.
        message_lengths: Maps ``msg.id`` to ``msg.wire_length``.
        message_names: Maps ``msg.id`` to ``msg.name``.
        largest_payload: Largest ``wire_length`` of any message (0 if none).
    """

    def __init__(self, filename):
        """Parse ``filename`` and compute the per-message lookup tables.

        Args:
            filename: Path of the XML definition file to read.

        Raises:
            OSError: If the file cannot be opened.
            xml.parsers.expat.ExpatError: If the file is not well-formed XML.
            Exception: If a required attribute is missing on an element.
        """
        self.filename = filename
        self.basename = os.path.basename(filename)
        self.basenamer_upper = self.basename.upper()
        self.messages: list[HydroMessageType] = []
        self.enums: list[HydroEnum] = []
        self.in_element_list = []

        # expat wants a binary file object; it handles the encoding itself.
        with open(filename, "rb") as f:
            self.parser = xml.parsers.expat.ParserCreate()
            self.parser.StartElementHandler = self._start_element
            self.parser.EndElementHandler = self._end_element
            self.parser.CharacterDataHandler = self._element_text
            self.parser.ParseFile(f)

        self.message_lengths = {}
        self.message_names = {}
        self.largest_payload = 0

        msg: HydroMessageType
        for msg in self.messages:
            # Output kept from the original implementation: one line per message.
            print(msg.name)
            # Must run before wire_length is read below.
            msg.update_all_field_properties()
            key = msg.id
            self.message_lengths[key] = msg.wire_length
            self.message_names[key] = msg.name
            self.largest_payload = max(self.largest_payload, msg.wire_length)

    def _check_attributes(self, attributes, check, where):
        """Raise if any attribute name in ``check`` is absent from ``attributes``.

        Args:
            attributes: Mapping of attribute name to value for the element.
            check: Iterable of attribute names that must be present.
            where: Element kind used in the error message (e.g. ``"message"``).

        Raises:
            Exception: Naming the first missing attribute together with the
                file name and the parser's current line number.
        """
        for c in check:
            if c not in attributes:
                raise Exception(
                    f"Missing required {where} '{c}' attribute at {self.filename}:{self.parser.CurrentLineNumber}"
                )

    def _start_element(self, name, attributes):
        """expat start-tag handler: create the object matching the element path.

        Args:
            name: Tag name of the element being opened.
            attributes: Mapping of attribute name to value for the element.

        Raises:
            Exception: If a required attribute is missing (see
                ``_check_attributes``).
        """
        self.in_element_list.append(name)
        in_element = ".".join(self.in_element_list)

        if in_element == "hydrolink.messages.message":
            # make sure at least the attributes 'name' and 'id' are defined
            self._check_attributes(attributes, ["name", "id"], "message")
            self.messages.append(
                HydroMessageType(
                    attributes["name"], attributes["id"], self.parser.CurrentLineNumber
                )
            )
        elif in_element == "hydrolink.messages.message.field":
            # make sure at least the 'name' and 'type' attributes are set for a field
            self._check_attributes(attributes, ["name", "type"], "field")
            units = attributes.get("units", "")
            if units:
                # Non-empty units are stored wrapped in square brackets.
                units = f"[{units}]"
            new_field = HydroField(
                name=attributes["name"],
                type=attributes["type"],
                print_format=attributes.get("print_format", None),
                xml=self,
                units=units,
            )
            self.messages[-1].add_field(new_field)
        elif in_element == "hydrolink.enums.enum":
            # make sure that the name of the enum is defined
            self._check_attributes(attributes, ["name"], "enum")
            # Only the exact string 'true' enables bitmask mode.
            bitmask = attributes.get("bitmask") == "true"
            self.enums.append(
                HydroEnum(attributes["name"], self.parser.CurrentLineNumber, bitmask=bitmask)
            )
        elif in_element == "hydrolink.enums.enum.entry":
            # Validate up front, like the other element kinds, so a missing
            # 'name' is reported with file/line context and before the enum's
            # start/highest values are touched.
            self._check_attributes(attributes, ["name"], "entry")
            enum = self.enums[-1]
            if "value" in attributes:
                # SECURITY NOTE(review): eval() executes arbitrary Python taken
                # from the XML file. Only parse definition files you trust, or
                # replace this with a restricted integer-expression parser.
                value = eval(attributes["value"])
                autovalue = False
            else:
                # No explicit value: continue after the highest value so far.
                value = enum.highest_value + 1
                autovalue = True
            if enum.start_value is None or value < enum.start_value:
                enum.start_value = value
            if value > enum.highest_value:
                enum.highest_value = value
            enum.entries.append(HydroEnumEntry(attributes["name"], value, auto=autovalue))

    def _end_element(self, name):
        """expat end-tag handler: drop the innermost open element from the stack."""
        self.in_element_list.pop()

    def _element_text(self, text):
        """expat character-data handler: append text to the matching description.

        expat may deliver the text of one element in several chunks, hence
        the ``+=`` accumulation. Text outside the recognised paths is ignored.

        Args:
            text: Chunk of character data inside the current element.
        """
        in_element = ".".join(self.in_element_list)
        if in_element == "hydrolink.messages.message.description":
            self.messages[-1].description += text
        elif in_element == "hydrolink.messages.message.field":
            # A field's own text content is its description.
            self.messages[-1].fields[-1].description += text
        elif in_element == "hydrolink.enums.enum.description":
            self.enums[-1].description += text
        elif in_element == "hydrolink.enums.enum.entry.description":
            self.enums[-1].entries[-1].description += text
|