Source code for kappa_sdk.kw.model.model_parser

import io
from xml.etree.ElementTree import ElementTree, Element, fromstring, register_namespace
from typing import Optional, Any, Dict, List, cast, Union
from .model_parameter import ModelParameter
from ..parser_exception import ParserException
from .improve_target import ImproveTarget


[docs] class ModelParser: """ Performs various operations with XML representation of KW models """ __ns = {'AA': 'KWKA_XML_Export', 'KWKAModel': 'KWKA_XML_Model', 'KWKA_XML_NLR': "KWKA_XML_NLR", "KWKANUMWELLPAR": "KWKA_NUM_WELL_PAR"} def __init__(self, xml_string: str) -> None: register_namespace('AA', 'KWKA_XML_Export') register_namespace('KWKAModel', 'KWKA_XML_Model') register_namespace('AB', 'http://www.w3.org/2001/XMLSchema-instance') register_namespace('KWKA_XML_NLR', "KWKA_XML_NLR") register_namespace("KWKANUMWELLPAR", "KWKA_NUM_WELL_PAR") register_namespace('ns', 'KWKA_XML_Model') self.__tree = ElementTree(fromstring(xml_string)) self.__root = self.__tree.getroot() def __find_parameter_in_element(self, element: Element, namespace_prefix: str, parameter_name: str) -> Element: parameter = element.find(f"{namespace_prefix}:{parameter_name}", self.__ns) if parameter is None: raise ParserException(f"Cannot find {self.__ns[namespace_prefix]}:{parameter_name} in the xml document") return parameter def __get_model_element(self, analysis_id: str) -> Element: models_container_element = self.__find_parameter_in_element(self.__root, 'AA', 'Models') for model_element in models_container_element.findall('AA:ModelPerProject', self.__ns): model_element_find = self.__find_parameter_in_element(model_element, 'KWKAModel', 'AnalysisID') if model_element_find.text == analysis_id: try: model_element = self.__find_parameter_in_element(model_element, 'KWKAModel', "NumericalModel") self.__is_numerical = True except ParserException: self.__is_numerical = False return model_element else: raise ParserException('Document does not contain model for analysis <{}>'.format(analysis_id)) def __get_model_definition_element(self, analysis_id: str, definition_category_name: str, parameter_name: str, sub_parameter_name: Optional[str] = None) -> \ Optional[Element]: model_element = self.__get_model_element(analysis_id) for parameter_element in model_element.findall('KWKAModel:{}'.format(definition_category_name), self.__ns): if sub_parameter_name is None: return parameter_element.findall('KWKAModel:{}'.format(parameter_name), self.__ns)[0] else: for subparameter_element in parameter_element.findall('KWKAModel:{}'.format(parameter_name), self.__ns): try: return subparameter_element.findall('KWKAModel:{}'.format(sub_parameter_name), self.__ns)[0] except IndexError: return subparameter_element.findall('KWKA_XML_NLR:{}'.format(sub_parameter_name), self.__ns)[0] else: raise ParserException("Cannot find the model definition in the XML representation of the KW model") def __get_parameter_element(self, model_element: Element, parameter_name: str, conditions: Optional[Dict[Any, Any]]) -> \ Optional[Element]: matching_parameter_elements = list() parameter_elements = model_element.findall('KWKAModel:NumModelParameters', self.__ns) if self.__is_numerical \ else model_element.findall('KWKAModel:ModelParameter', self.__ns) for parameter_element in parameter_elements: type_attr = parameter_element.attrib['{http://www.w3.org/2001/XMLSchema-instance}type'] if type_attr == parameter_name or (':' in type_attr and type_attr.split(':')[1] == parameter_name): matching_parameter_elements.append(parameter_element) if len(matching_parameter_elements) == 0: return None if conditions is None: return matching_parameter_elements[0].find('KWKAModel:Value', self.__ns) else: for parameter_element in matching_parameter_elements: matching_conditions = False for attribute_name, attribute_value in conditions.items(): sub_element = parameter_element.find('KWKAModel:{}'.format(attribute_name), self.__ns) if sub_element is not None: if sub_element.text == attribute_value: matching_conditions = True else: matching_conditions = False break if matching_conditions: return parameter_element return None def __get_parameter_value_element(self, model_element: Element, parameter_name: str, conditions: Optional[Dict[Any, Any]]) -> Element: parameter_element = self.__get_parameter_element(model_element, parameter_name, conditions) if parameter_element is not None: return self.__find_parameter_in_element(parameter_element, 'KWKAModel', 'Value') raise ParserException('Model does not contain parameter <{}> that matches given conditions'.format(parameter_name)) def get_parameter_value(self, analysis_id: str, xml_parameter_locator: ModelParameter) -> str: model_element = self.__get_model_element(analysis_id) parameter_element = self.__get_parameter_value_element(model_element, xml_parameter_locator.parameter_type, xml_parameter_locator.conditions) return str(parameter_element.text) def set_parameter_value(self, analysis_id: str, xml_parameter_locator: ModelParameter, value: str) -> None: model_element = self.__get_model_element(analysis_id) parameter_element = self.__get_parameter_value_element(model_element, xml_parameter_locator.parameter_type, xml_parameter_locator.conditions) parameter_element.text = str(value) def add_model_description_element(self, analysis_id: str, parameter_name: str, sub_parameter_name: str, parameter_value: str, position: int = 0) -> None: parameter_element = self.__get_model_definition_element(analysis_id, 'ModelDescription', parameter_name) element = Element(sub_parameter_name) element.text = parameter_value if parameter_element is None: raise ParserException( 'Model does not contain "ModelDescription" with parameter <{}> with sub-parameter <{}>'.format(parameter_name, sub_parameter_name)) parameter_element.insert(int(position), element) def add_improve_parameter(self, analysis_id: str, parameter_name: str, parameter_value: str, position: int = 0) -> None: model_element = self.__get_model_element(analysis_id) parameter_element = model_element.findall('KWKAModel:ImproveParameters', self.__ns)[0] element = Element(parameter_name) if element is None: raise ParserException('Model does not contain "KWKAModel:ImproveParameters" with parameter <{}>'.format(parameter_name)) element.text = parameter_value parameter_element.insert(int(position), element) def get_model_description_value(self, analysis_id: str, parameter_name: str, sub_parameter_name: Optional[str] = None) -> str: parameter_element = self.__get_model_definition_element(analysis_id, 'ModelDescription', parameter_name, sub_parameter_name) if parameter_element is None: raise ParserException('Model does not contain "ModelDescription" with parameter <{}>'.format(parameter_name)) return str(parameter_element.text) def set_model_description_value(self, analysis_id: str, parameter_name: str, sub_parameter_name: Optional[str], value: str) -> None: parameter_element = self.__get_model_definition_element(analysis_id, 'ModelDescription', parameter_name, sub_parameter_name) if parameter_element is None: raise ParserException( 'Model does not contain "ModelDescription" with parameter <{}> with sub-parameter <{}>'.format(parameter_name, sub_parameter_name)) parameter_element.text = str(value) def get_improve_parameter_value(self, analysis_id: str, parameter_name: str, sub_parameter_name: Optional[str] = None) -> Optional[str]: parameter_element = self.__get_model_definition_element(analysis_id, 'ImproveParameters', parameter_name, sub_parameter_name) if parameter_element is None: raise ParserException( 'Model does not contain "ImproveParameters" with parameter <{}> with sub-parameter <{}>'.format(parameter_name, sub_parameter_name)) return str(parameter_element.text) def set_improve_parameter_value(self, analysis_id: str, parameter_name: str, sub_parameter_name: str, value: Union[bool, str]) -> None: parameter_element = self.__get_model_definition_element(analysis_id, 'ImproveParameters', parameter_name, sub_parameter_name) if parameter_element is None: raise ParserException( 'Model does not contain "ImproveParameters" with parameter <{}> with sub-parameter <{}>'.format(parameter_name, sub_parameter_name)) parameter_element.text = str(value).lower() def set_model_parameter_improve_value(self, analysis_id: str, parameter: ModelParameter, improve_parameter_name: str, value: str) -> None: model_element = self.__get_model_element(analysis_id) parameter_element = self.__get_parameter_element(model_element, parameter.parameter_type, parameter.conditions) if parameter_element is None: raise ParserException(f"Cannot find the parameter {parameter.parameter_type} with conditions {parameter.conditions} in the model") improve_element = self.__find_parameter_in_element(parameter_element, 'KWKAModel', 'Improve') improve_parameter_element = self.__find_parameter_in_element(improve_element, "KWKAModel", f"{improve_parameter_name}") improve_parameter_element.text = str(value).lower() def get_improve_parameter_targets(self, analysis_id: str) -> List[ImproveTarget]: improve_targets_elements = self.__get_improve_parameter_target_elements(analysis_id) improve_target_list = list() for parameter_element in improve_targets_elements: target_type_value = cast(str, self.__find_parameter_in_element(parameter_element, 'KWKAModel', 'Type').text) is_selected_value = bool(self.__find_parameter_in_element(parameter_element, 'KWKAModel', 'IsSelected').text) global_weight_value = cast(float, self.__find_parameter_in_element(parameter_element, 'KWKAModel', 'GlobalWeight').text) improve_target_list.append(ImproveTarget(target_type_value, global_weight_value, is_selected_value)) return improve_target_list def set_improve_parameter_targets(self, analysis_id: str, improve_parameter_targets: List[ImproveTarget]) -> None: improve_targets_elements = self.__get_improve_parameter_target_elements(analysis_id) for target in improve_parameter_targets: for parameter_element in improve_targets_elements: type_element = self.__find_parameter_in_element(parameter_element, 'KWKAModel', 'Type') if target.target_type == type_element.text: self.__find_parameter_in_element(parameter_element, 'KWKAModel', 'IsSelected').text = str(target.is_selected).lower() self.__find_parameter_in_element(parameter_element, 'KWKAModel', 'GlobalWeight').text = str(target.global_weight) break else: raise ValueError(f"ImproveTargets Type {target.target_type} not found in the xml document") def set_improve_dt_start(self, analysis_id: str, dt_start: float) -> None: model_element = self.__get_model_element(analysis_id) improve_parameters_element = self.__find_parameter_in_element(model_element, 'KWKAModel', 'ImproveParameters') improve_parameter_element = self.__find_parameter_in_element(improve_parameters_element, "KWKAModel", "ElapsedDtStart") improve_parameter_element.text = str(dt_start).lower() def __get_improve_parameter_target_elements(self, analysis_id: str) -> List[Element]: model_element = self.__get_model_element(analysis_id) improve_parameter_element = self.__find_parameter_in_element(model_element, 'KWKAModel', 'ImproveParameters') return improve_parameter_element.findall('KWKAModel:{}'.format('ImproveTargets'), self.__ns) def export(self) -> str: stream = io.StringIO() self.__tree.write(stream, encoding='unicode', method='xml', xml_declaration=True) return stream.getvalue() def include_or_exclude_parameter_for_improve(self, analysis_id: str, parameter_name: str, is_included: bool, conditions: Optional[Dict[Any, Any]]) -> None: model_element = self.__get_model_element(analysis_id) parameter_element = self.__get_parameter_element(model_element, parameter_name, conditions) if parameter_element is not None: improve_el = self.__find_parameter_in_element(parameter_element, 'KWKAModel', 'Improve') include_el = self.__find_parameter_in_element(improve_el, 'KWKAModel', 'Include') include_el.text = "true" if is_included else "false" def remove_analysis(self, analysis_id: str) -> None: models_container_element = self.__find_parameter_in_element(self.__root, 'AA', 'Models') for model_element in models_container_element.findall('AA:ModelPerProject', self.__ns): model_element_find = self.__find_parameter_in_element(model_element, 'KWKAModel', 'AnalysisID') if model_element_find.text == analysis_id: models_container_element.remove(model_element) break def get_metadata(self, analysis_id: str) -> Dict[str, str]: models_container_element = self.__find_parameter_in_element(self.__root, 'AA', 'Models') for model_element in models_container_element.findall('AA:ModelPerProject', self.__ns): model_element_find = self.__find_parameter_in_element(model_element, 'KWKAModel', 'AnalysisID') if model_element_find.text == analysis_id: break else: raise ParserException('Document does not contain model for analysis <{}>'.format(analysis_id)) metadata_dict: Dict[str, str] = dict() metadata_element = self.__find_parameter_in_element(model_element, 'KWKAModel', 'ModelMetaData') for metadata in metadata_element: child_id = None child_name = None for metadata_parameter in metadata: if "id" in metadata_parameter.tag.lower(): child_id = str(metadata_parameter.text) elif "name" in metadata_parameter.tag.lower(): child_name = str(metadata_parameter.text) if child_id is not None and child_name is not None: metadata_dict[child_name] = child_id return metadata_dict def remove_model_description(self, analyses_id: List[str]) -> None: for analysis_id in analyses_id: model_element = self.__get_model_element(analysis_id) model_description = "NumModelDescription" if self.__is_numerical else 'ModelDescription' model_description_element = self.__find_parameter_in_element(model_element, 'KWKAModel', model_description) model_element.remove(model_description_element)