import xml.parsers.expat
import os

from .hydro_message_type import HydroMessageType
from .hydro_field import HydroField
from .hydro_enum import HydroEnum, HydroEnumEntry


class HydroXml:
    """Parse a hydrolink XML definition file into messages and enums.

    The file is parsed with expat while the instance is constructed.
    Afterwards the instance exposes:

    * ``messages`` -- the ``HydroMessageType`` objects, in document order
    * ``enums`` -- the ``HydroEnum`` objects, in document order
    * ``message_lengths`` -- ``msg.id`` -> ``msg.wire_length``
    * ``message_names`` -- ``msg.id`` -> ``msg.name``
    * ``largest_payload`` -- the largest ``wire_length`` seen (0 if there
      are no messages)
    """

    def __init__(self, filename):
        """Parse *filename* and build the per-message lookup tables.

        Args:
            filename: Path of the XML definition file to read.

        Raises:
            Exception: A required attribute is missing on an element.
            xml.parsers.expat.ExpatError: The file is not well-formed XML.
            OSError: The file cannot be opened.
        """
        self.filename = filename
        self.basename = os.path.basename(filename)
        # NOTE: the attribute name (including its spelling) is kept as is,
        # because code outside this file may read it.
        self.basenamer_upper = self.basename.upper()
        self.messages: list[HydroMessageType] = []
        self.enums: list[HydroEnum] = []
        # Stack of the element names that are currently open; joined with
        # "." it forms the path the handlers dispatch on.
        self.in_element_list = []

        # expat decodes the bytes itself, hence the binary mode.
        with open(filename, "rb") as f:
            self.parser = xml.parsers.expat.ParserCreate()
            self.parser.StartElementHandler = self._start_element
            self.parser.EndElementHandler = self._end_element
            self.parser.CharacterDataHandler = self._element_text
            self.parser.ParseFile(f)

        self.message_lengths = {}
        self.message_names = {}
        self.largest_payload = 0

        msg: HydroMessageType
        for msg in self.messages:
            print(msg.name)
            # wire_length is read only after the field properties have
            # been updated.
            msg.update_all_field_properties()
            key = msg.id
            self.message_lengths[key] = msg.wire_length
            self.message_names[key] = msg.name
            if msg.wire_length > self.largest_payload:
                self.largest_payload = msg.wire_length

    def _check_attributes(self, attributes, check, where):
        """Ensure every name in *check* is a key of *attributes*.

        Args:
            attributes: Attribute mapping of the element being opened.
            check: Iterable of attribute names that must be present.
            where: Element kind, used only in the error message.

        Raises:
            Exception: On the first missing attribute; the message names
                the file and the parser's current line number.
        """
        for c in check:
            if c not in attributes:
                raise Exception(
                    f"Missing required {where} '{c}' attribute at {self.filename}:{self.parser.CurrentLineNumber}"
                )

    def _start_element(self, name, attributes):
        """expat start-tag handler: create the object for the new element.

        Args:
            name: Tag name of the element being opened.
            attributes: Mapping of the element's attributes.

        Raises:
            Exception: A required attribute is missing.
        """
        self.in_element_list.append(name)
        in_element = ".".join(self.in_element_list)

        if in_element == "hydrolink.messages.message":
            # make sure at least the attributes 'name' and 'id' are defined
            self._check_attributes(attributes, ["name", "id"], "message")
            self.messages.append(
                HydroMessageType(
                    attributes["name"], attributes["id"], self.parser.CurrentLineNumber
                )
            )
        elif in_element == "hydrolink.messages.message.field":
            # make sure at least the 'name' and 'type' attributes are set
            # for a field
            self._check_attributes(attributes, ["name", "type"], "field")
            units = attributes.get("units", "")
            if units:
                # A non-empty unit is wrapped in square brackets.
                units = f"[{units}]"
            print_format = attributes.get("print_format", None)
            new_field = HydroField(
                name=attributes["name"],
                type=attributes["type"],
                print_format=print_format,
                xml=self,
                units=units,
            )
            self.messages[-1].add_field(new_field)
        elif in_element == "hydrolink.enums.enum":
            # make sure that the name of the enum is defined
            self._check_attributes(attributes, ["name"], "enum")
            # Only the exact string 'true' marks the enum as a bitmask.
            bitmask = attributes.get("bitmask") == "true"
            self.enums.append(
                HydroEnum(
                    attributes["name"], self.parser.CurrentLineNumber, bitmask=bitmask
                )
            )
        elif in_element == "hydrolink.enums.enum.entry":
            # Same check as for the other elements, so that a nameless
            # entry is reported with file and line instead of surfacing
            # as a bare KeyError.
            self._check_attributes(attributes, ["name"], "entry")
            enum = self.enums[-1]
            if "value" in attributes:
                # SECURITY: eval() executes whatever expression the XML
                # file contains. Only parse definition files you trust.
                # Left in place because existing files may rely on
                # expressions here -- TODO confirm and restrict.
                value = eval(attributes["value"])
                autovalue = False
            else:
                # No explicit value: continue after the highest value
                # recorded for this enum so far.
                value = enum.highest_value + 1
                autovalue = True
            # Track the lowest and highest value of the enum.
            if enum.start_value is None or value < enum.start_value:
                enum.start_value = value
            if value > enum.highest_value:
                enum.highest_value = value
            enum.entries.append(
                HydroEnumEntry(attributes["name"], value, auto=autovalue)
            )

    def _end_element(self, name):
        """expat end-tag handler: leave the innermost open element.

        Args:
            name: Tag name of the element being closed (unused).
        """
        self.in_element_list.pop()

    def _element_text(self, text):
        """expat character-data handler: collect description text.

        The text is appended (``+=``) to the ``description`` of the object
        created most recently for the current element path; text found at
        any other path is ignored.

        Args:
            text: The chunk of character data delivered by expat.
        """
        in_element = ".".join(self.in_element_list)
        if in_element == "hydrolink.messages.message.description":
            self.messages[-1].description += text
        elif in_element == "hydrolink.messages.message.field":
            # The text content of a <field> element is its description.
            self.messages[-1].fields[-1].description += text
        elif in_element == "hydrolink.enums.enum.description":
            self.enums[-1].description += text
        elif in_element == "hydrolink.enums.enum.entry.description":
            self.enums[-1].entries[-1].description += text