Source code for mivot_validator.instance_checking.instance_snippet_builder

"""
Created on 21 Apr 2023

use the snippet_builder to build a concrete MIVOT view of
the class model_name:class_name of the model
serialized in provided generic MIVOT snippet

@author: julien abid
"""

import os

from lxml import etree

from mivot_validator.utils.dmtype_utils import DmtypeUtils
from mivot_validator.instance_checking.snippet_builder import Builder
from mivot_validator.utils.xml_utils import XmlUtils
from mivot_validator.instance_checking.instance_checker import InstanceChecker


[docs] class BColors: """ Color codes for terminal output """ GRAY = "\033[37m" OKBLUE = "\033[94m" OKCYAN = "\033[96m" OKGREEN = "\033[92m" WARNING = "\033[93m" RED = "\033[31m" ENDC = "\033[0m" BOLD = "\033[1m" UNDERLINE = "\033[4m"
[docs] def setup_elements(graph, dmtype, abstract_list): """ Sets up the elements of the graph """ res = graph[dmtype] for k, v in graph.items(): for i in range(len(v)): if k != dmtype and v[i] in res: res[res.index(v[i])] = [v[i], f"{k}/{v[i]}"] for el in res: if isinstance(el, list): if el[0] in abstract_list: res.remove(el) if el in abstract_list or el == "mango:AssociatedMangoInstance": res.remove(el) return res
[docs] def add_value(dict_obj, key, value): """ Adds a key-value pair to the dictionary. If the key already exists in the dictionary, it will associate multiple values with that key instead of overwritting its value """ if key not in dict_obj: dict_obj[key] = value elif isinstance(dict_obj[key], list): dict_obj[key].append(value) else: dict_obj[key] = [dict_obj[key], value]
[docs] def remove_value(dict_obj, key, value): """ Removes a value from a key that has multiple values associated with it. If the key only has one value associated with it, it will remove the key from the dictionary """ if key not in dict_obj: return if isinstance(dict_obj[key], list): dict_obj[key].remove(value) if len(dict_obj[key]) == 1: dict_obj[key] = dict_obj[key][0] if len(dict_obj[key]) == 0: del dict_obj[key] else: del dict_obj[key]
[docs] class InstanceSnippetBuilder: """ Build a concrete MIVOT view of the class model_name:class_name of the model serialized in provided generic MIVOT snippet """ def __init__(self, vodmlid, output_name, session, concrete_list=None): """ :vodmlid: id of the class from which the concrete instance will be built :output_name: file name of the generated mivot instance :concrete_list: list of abstract classes [{"dmtype", "dmrole", "context", "class"}, ...] """ self.vodmlid = vodmlid self.xml_file = None self.output_dir = session.tmp_data_path self.output_name = output_name self.buffer = "" self.build_file = self.xml_file self.added_model = [] self.dmrole = None self.dmroles = {} self.dmtype = None self.concrete_list = concrete_list self.inheritance_graph = {} self.abstract_classes = [] self.collections = [] self.mapping_block = None self.session = session self._build_reference_snippet() def _build_reference_snippet(self): """ Extract the requested instance from the model no concrete class here, it will be used as basis for building the output """ model, dmtype = DmtypeUtils.split_dmtype(self.vodmlid) generic = Builder(model, dmtype, self.session) generic.build() self.xml_file = generic.outputname self.build_file = self.xml_file
[docs] def build(self): """ Build the concrete MIVOT snippet """ self.mapping_block = XmlUtils.xmltree_from_file(self.build_file) if not os.path.exists("../tmp_snippets/temp"): os.makedirs("../tmp_snippets/temp") if not os.path.exists(self.output_dir): os.makedirs(self.output_dir) parent_key = None open_count = 0 actual_collection = None lines = [] with open(self.build_file, "r", encoding="utf-8") as file: for line in file: if "left blank" not in line: self.buffer += line if "<COLLECTION" in line: self.collections.append(self.get_dm_role(line)) actual_collection = ( self.collections[-1] if len(self.collections) > 0 else None ) if "</COLLECTION" in line: self.collections.pop() actual_collection = ( self.collections[-1] if len(self.collections) > 0 else None ) if "</INSTANCE" in line: open_count -= 1 if open_count == 0: parent_key = None if "<INSTANCE" in line: if "/>" not in line: open_count += 1 if parent_key is None: parent_key = self.get_dm_type(line) if self.get_dm_type(line).split(":")[0] not in self.added_model: self.inheritance_graph.update( **InstanceChecker._build_inheritence_graph( self.session.get_vodml( self.get_dm_type(line).split(":")[0] ) ) ) self.add_abstract_classes(self.get_dm_type(line).split(":")[0]) self.added_model.append(self.get_dm_type(line).split(":")[0]) if self.get_dm_type(line) in self.abstract_classes: previous_line = lines[-1] if len(lines) > 0 else "" if "<COLLECTION" in previous_line: if ( actual_collection != "mango:Property.associatedProperties" and actual_collection == self.collections[-1] ): instance_count = 0 self.dmrole = self.get_dm_role(line) self.dmtype = self.get_dm_type(line) while self.ask_for_collection( self.collections[-1], instance_count, parent_key ): instance_count += 1 print( f"{BColors.OKCYAN}{BColors.UNDERLINE}" f"List of possible concrete classes :{BColors.ENDC}" ) print( f"{BColors.OKCYAN}DMTYPE: {BColors.BOLD}" f"{self.get_dm_type(line)}{BColors.ENDC}" ) print( f"{BColors.OKCYAN}CONTEXT: {BColors.BOLD}" f"{parent_key}{BColors.ENDC}" ) print( f"{BColors.OKCYAN}DMROLE: {BColors.BOLD}" f"{self.get_dm_role(line)}{BColors.ENDC}" ) choice = self.populate_choices( self.inheritance_graph, parent_key, ) file = None if choice != "None": add_value(self.dmroles, choice, "") file = self.get_instance( choice.split(":")[0], choice.split(":")[1] ) if file is not None: self.buffer = self.buffer.replace(line, "") self.build_file = file self.build() self.build_file = self.xml_file elif ( actual_collection == "mango:Property.associatedProperties" ): self.buffer = self.buffer.replace( line, "<!-- PUT HERE REFERENCES TO OTHER INSTANCE YOU " "WANT TO " "ASSOCIATE OR REMOVE THIS COLLECTION -->\n", ) else: self.dmrole = self.get_dm_role(line) self.dmtype = self.get_dm_type(line) print( f"{BColors.OKCYAN}{BColors.UNDERLINE}" f"List of possible concrete classes :{BColors.ENDC}" ) print( f"{BColors.OKCYAN}DMTYPE: {BColors.BOLD}" f"{self.get_dm_type(line)}{BColors.ENDC}" ) print( f"{BColors.OKCYAN}CONTEXT: {BColors.BOLD}" f"{parent_key}{BColors.ENDC}" ) print( f"{BColors.OKCYAN}DMROLE: {BColors.BOLD}" f"{self.dmrole}{BColors.ENDC}" ) choice = self.populate_choices( self.inheritance_graph, parent_key, ) file = None if choice != "None": if self.dmrole is not None: add_value(self.dmroles, choice, self.dmrole) else: add_value(self.dmroles, choice, "") file = self.get_instance( choice.split(":")[0], choice.split(":")[1] ) else: self.buffer = self.buffer.replace(line, "") if file is not None: self.buffer = self.buffer.replace(line, "") self.build_file = file self.build() self.build_file = self.xml_file if "left blank" not in line: lines.append(line)
[docs] def output_result(self): """ Write the concrete MIVOT snippet in the output directory """ if not self.output_name.endswith(".xml"): self.output_name += ".xml" output_file = os.path.join(self.output_dir, os.path.basename(self.output_name)) result = etree.fromstring(self.buffer) XmlUtils.xmltree_to_file(result, output_file) self.clean(output_file) self.insert_dm_roles(output_file, self.dmroles) if os.path.exists("../tmp_snippets/temp"): os.system("rm -rf ../tmp_snippets/temp") print( f"{BColors.OKGREEN}Concrete MIVOT snippet for " f"{BColors.BOLD}{os.path.basename(self.xml_file)} stored in " f"{BColors.BOLD}{output_file}{BColors.ENDC}" )
[docs] def ask_for_collection(self, actual_collection, instance_count, parent_key): """ Ask the user if he wants to add another property in the collection :param actual_collection: the context of the property :param instance_count: the number of instance in the collection :param parent_key: the parent key of the property """ if self.concrete_list is None: print( f"{BColors.OKCYAN} Do you want to add an Instance " f"in this collection ( {actual_collection} ) " f"for {parent_key}? \n " f"ACTUAL NUMBER OF INSTANCE : {instance_count} \n " f"(y/n){BColors.ENDC}" ) choice = input() if choice == "y": state = True elif choice == "n": state = False else: print(f"{BColors.WARNING}Please enter a valid choice{BColors.ENDC}") return self.ask_for_collection( actual_collection, instance_count, parent_key ) return state for cc_dict in self.concrete_list: if self.dmrole == cc_dict["dmrole"] and self.dmtype == cc_dict["dmtype"]: return True return False
[docs] @staticmethod def remove_instance(xml_file, dmtype): """ Remove the instance of a class containing given dmtype and all its children """ with open(xml_file, "r", encoding="utf-8") as file: buffer = "" counter = 0 is_dmtype = False for line in file: if counter == 0: buffer += line if f'dmtype="{dmtype}"' in line: is_dmtype = True if "<INSTANCE" in line and "/>" not in line and is_dmtype: counter += 1 if "</INSTANCE" in line: counter -= 1 with open(xml_file, "w", encoding="utf-8") as file: file.write(buffer)
[docs] @staticmethod def clean(xml_file): """ Remove all empty collection in the xml file """ to_exclude = [] with open(xml_file, "r", encoding="utf-8") as file: previous_line = "" counter = 0 for line in file: if ( InstanceSnippetBuilder.get_dm_role(previous_line) != "mango:Property.associatedProperties" ): if ( "</COLLECTION>" in line and "<COLLECTION" in previous_line or "<INSTANCE" in previous_line and "/>" in previous_line ): if "<INSTANCE" in previous_line and "/>" in previous_line: to_exclude.append(counter - 2) to_exclude.append(counter - 1) to_exclude.append(counter) previous_line = line counter += 1 buffer = "" with open(xml_file, "r", encoding="utf-8") as file: counter = 0 for line in file: if counter not in to_exclude: buffer += line counter += 1 with open(xml_file, "w", encoding="utf-8") as file: file.write(buffer)
[docs] def insert_dm_roles(self, xml_file, dmroles): """ Insert the dmroles in the concrete MIVOT snippet """ with open(xml_file, "r", encoding="utf-8") as file: buffer = "" for line in file: if 'dmrole=""' in line: if self.get_dm_type(line) in list(dmroles.keys()): if len(dmroles) > 0: if isinstance(list(dmroles.values())[0], list): dmrole = list(dmroles.values())[0][0] remove_value( dmroles, list(dmroles.keys())[0], list(dmroles.values())[0][0], ) else: dmrole = list(dmroles.values())[0] remove_value( dmroles, list(dmroles.keys())[0], list(dmroles.values())[0], ) else: dmrole = "" line = line.replace('dmrole=""', f'dmrole="{dmrole}"') buffer += line with open(xml_file, "w", encoding="utf-8") as file: file.write(buffer)
[docs] def get_instance(self, model_name, class_name): """ Get the instance of the class class_name of the model model_name :return: the instance path """ builder = Builder(model_name, class_name, self.session) builder.build() return builder.outputname
[docs] @staticmethod def get_dm_role(line): """ Get the dmrole from the line :return: the dmrole """ dmrole = "" if "dmrole" in line: dmrole = line.split("dmrole")[1].split('"')[1] return dmrole
[docs] @staticmethod def get_dm_type(line): """ Get the dmtype from the line :return: the dmtype """ dmtype = "" if "dmtype" in line: dmtype = line.split("dmtype")[1].split('"')[1] return dmtype
[docs] def add_abstract_classes(self, model): """ Add the abstract classes to the list """ for i in ["objectType", "dataType"]: xml_tree = XmlUtils.xmltree_from_file(self.session.get_vodml(model)).xpath( f".//{i}" ) for ele in xml_tree: if ( "abstract" in ele.attrib and ele.attrib.get("abstract").lower() == "true" ): for tags in ele.getchildren(): if ( tags.tag == "vodml-id" and tags.text not in self.abstract_classes ): self.abstract_classes.append(f"{model}:{tags.text}") continue
[docs] def populate_choices(self, els, parent_key): """ Make an input with choices from the list """ min_occurs = 1 if len(self.dmrole) > 0: to_check = self.dmrole.split(":")[1] xml_tree = XmlUtils.xmltree_from_file( self.session.get_vodml(parent_key.split(":")[0]) ).xpath(".//objectType/attribute") else: to_check = self.dmtype.split(":")[1] xml_tree = XmlUtils.xmltree_from_file( self.session.get_vodml(self.dmtype.split(":")[0]) ).xpath(".//objectType") for ele in xml_tree: for tags in ele.getchildren(): if tags.tag == "vodml-id" and tags.text == to_check: ext = ele.xpath(".//multiplicity/minOccurs")[0] min_occurs = int(ext.text) if ext.text is not None else 1 elements = setup_elements(els, self.dmtype, self.abstract_classes) if self.concrete_list is not None and len(self.concrete_list) > 0: cc_dict = ( self.concrete_list[0] if isinstance(self.concrete_list, list) else self.concrete_list ) print(f"{BColors.BOLD}{BColors.OKBLUE}INFORMATIONS GIVEN: {BColors.ENDC}") print(f'{BColors.OKBLUE}dmrole: {cc_dict["dmrole"]}{BColors.ENDC}') print(f'{BColors.OKBLUE}dmtype: {cc_dict["dmtype"]}{BColors.ENDC}') print(f'{BColors.OKBLUE}context: {cc_dict["context"]}{BColors.ENDC}') print(f'{BColors.OKBLUE}class: {cc_dict["class"]}{BColors.ENDC}') if ( self.dmrole == cc_dict["dmrole"] and self.dmtype == cc_dict["dmtype"] and parent_key == cc_dict["context"] ): for element in elements: if isinstance(element, list): if cc_dict["class"] in element[0]: print("Found concrete class: " + cc_dict["class"]) self.concrete_list.pop(0) return cc_dict["class"] elif cc_dict["class"] == element: print("Found concrete class: " + cc_dict["class"]) self.concrete_list.pop(0) return cc_dict["class"] print( f'{BColors.WARNING}{cc_dict["class"]}' f" is an invalid proposition " f"for {self.dmtype} (for {self.dmrole}).\n{BColors.ENDC}" ) else: if min_occurs == 0: print( f"{BColors.WARNING}{self.dmrole} is optional, " f"skipping... {BColors.ENDC}" ) return "None" if self.dmrole != cc_dict["dmrole"]: print( f'{BColors.WARNING}{cc_dict["dmrole"]} ' f"is an invalid dmrole for class {self.dmtype}" f" in parent class {parent_key}. " f"Actual dmrole: {self.dmrole}\n{BColors.ENDC}" ) if self.dmtype != cc_dict["dmtype"]: print( f'{BColors.WARNING}{cc_dict["dmtype"]} ' f"is an invalid dmtype for dmrole {self.dmrole}" f" in parent class {parent_key}. " f"Actual dmtype: {self.dmtype}\n{BColors.ENDC}" ) if parent_key != cc_dict["context"]: print( f'{BColors.WARNING}{cc_dict["context"]} ' f"is an invalid parent class for dmrole {self.dmrole}" f" and dmtype {self.dmtype}. " f"Actual parent class: {parent_key}\n{BColors.ENDC}" ) self.concrete_list.pop(0) if min_occurs == 0: if "None" not in elements: elements.append("None") print(f"{BColors.OKBLUE}Please choose from the list below : {BColors.ENDC}") for i, element in enumerate(elements): if isinstance(element, list): print(f"{BColors.GRAY}{str(i)} : {element[1]}{BColors.ENDC}") else: print(f"{BColors.GRAY}{str(i)} : {element}{BColors.ENDC}") choice = input("Your choice : ") if choice.isdigit() and int(choice) < len(elements): if isinstance(elements[int(choice)], list): return elements[int(choice)][0] return elements[int(choice)] print(f"{BColors.WARNING}Wrong choice, please try again.{BColors.ENDC}") return self.populate_choices(els, parent_key)