"""
Created on 21 Feb 2023
@author: laurentmichel
"""
import os
from mivot_validator.utils.xml_utils import XmlUtils
from mivot_validator.utils.dmtype_utils import DmtypeUtils
from mivot_validator.instance_checking.inheritance_checker import InheritanceChecker
from mivot_validator.instance_checking.snippet_builder import Builder
# ivoa quantity types: they are ignored by the type checks for now
ivoa_types = [
    "ivoa:RealQuantity",
    "ivoa:IntQuantity",
]
# Module-level map, empty at import time
# (InstanceChecker keeps its own class-level map with the same name)
inheritence_tree = {}
class CheckFailedException(Exception):
    """
    Exception raised to report that a check has failed
    """
def raise_check_failed_exception(message, tree_element):
    """
    Print out the faulty XML element (when there is one) and
    raise a CheckFailedException carrying the given message.
    This function never returns normally.

    Parameters
    ----------
    message: string
        Exception message
    tree_element: Element (XML)
        XML element where the error occurred; nothing is printed when None

    Raises
    ------
    CheckFailedException
        always
    """
    failure = CheckFailedException(message)
    if tree_element is not None:
        # show the faulty element before the exception is raised
        XmlUtils.pretty_print(tree_element)
    raise failure
class InstanceChecker:
    """
    API operating the validation of mapped instances against the VODML definition

    - all ATTRIBUTE/COLLECTION/INSTANCE children of the mapped instance must be
      referenced in the VODML with the same dmrole and the same dmtype.
    - The dmtype checking takes into account the inheritance
    - The mapped instances must not necessarily host all the components declared in the VODML
    - All the components hosted by the mapped instances must be compliant with the VODML

    The VODML files are stored locally for the moment
    """

    # Static cache shared by all checks: {super-type: [sub-types]}
    inheritence_tree = {}

    @staticmethod
    def reset():
        """
        Reset the static inheritence tree cache
        Mainly used by tests that run the static class several times in one process
        """
        InstanceChecker.inheritence_tree = {}

    @staticmethod
    def _get_vodml_class_tree(model, dmtype, session):
        """
        Extract from the VODML file the object to be checked
        Store first on disk a VODML representation of the searched object type
        and then works with that XML snippet

        parameters
        ----------
        model: string
            model short name as defined by the VODML
        dmtype: string
            type of the model component to be checked
        session:
            object providing tmp_data_path (folder hosting the snippets)
            and get_vodml(model) (location of the VODML file)

        return
        ------
        The etree serialisation of the XML snippet
        """
        filepath = os.path.join(session.tmp_data_path, f"{model}:{dmtype}")
        # NOTE(review): the replacement applies to the whole path, folder included.
        # Left as is since the snippet builder presumably names its output
        # the same way -- to be confirmed.
        filepath = filepath.replace(":", ".") + ".xml"

        if not os.path.exists(filepath):
            print(f"-> build snippet for class {model}:{dmtype}")
            vodml_filename = session.get_vodml(model)
            builder = Builder(
                model,
                dmtype,
                session,
            )
            # build the XML snippet and store it on disk
            builder.build()
            # NOTE(review): the inheritance graph is only (re)built on a cache miss;
            # snippets already on disk after a reset() leave the graph empty -- confirm
            InstanceChecker._build_inheritence_graph(vodml_filename)
        else:
            print(f"-> snippet for class {dmtype} already in the cache")

        return XmlUtils.xmltree_from_file(filepath)

    @staticmethod
    def _get_vodmlid(vodmlid, model_name):
        """
        Return the vodml-id prefixed with the model name, unless it
        already contains a ':' in which case it is returned as it is.

        parameters
        ----------
        vodmlid: string
            vodml-id with or without model prefix
        model_name: string
            model short name used as prefix when there is none

        return
        ------
        string
        """
        if ":" in vodmlid:
            return f"{vodmlid}"
        return f"{model_name}:{vodmlid}"

    @staticmethod
    def _collect_extends(vodml_tree, type_tag, model_name, graph):
        """
        Add to graph the {superclass: [subclasses]} links declared
        by all the elements named type_tag found in the VODML tree.

        parameters
        ----------
        vodml_tree: etree
            parsed VODML file
        type_tag: string
            name of the elements to look at (primitiveType, dataType, objectType)
        model_name: string
            model short name, used to prefix the sub-class vodml-ids
        graph: dict
            {superclass: [subclasses]} map updated in place
        """
        for ele in vodml_tree.xpath(f".//{type_tag}"):
            sub_class = None
            # iterating over an element yields its direct children
            for child in ele:
                if child.tag == "vodml-id" and child.text:
                    sub_class = f"{model_name}:{child.text}"
            if sub_class is None:
                # no vodml-id: nothing can be registered for that element
                continue
            for ext in ele.xpath("./extends/vodml-ref"):
                subclasses = graph.setdefault(ext.text, [])
                if sub_class not in subclasses:
                    subclasses.append(sub_class)

    @staticmethod
    def _add_indirect_subclasses(graph):
        """
        Propagate the inheritance over all levels (A->B->C->...):
        in term of validation, C extends A as well.
        {A: [B], B: [C], C: [D]} --> {A: [B, C, D], B: [C, D], C: [D]}

        parameters
        ----------
        graph: dict
            {superclass: [subclasses]} map updated in place
        """
        changed = True
        # loop until a full pass adds nothing; it terminates because
        # the set of class names is finite and entries are never duplicated
        while changed:
            changed = False
            for subclasses in graph.values():
                # scan a snapshot since the list is extended on the fly
                for subclass in list(subclasses):
                    for indirect in graph.get(subclass, ()):
                        if indirect not in subclasses:
                            subclasses.append(indirect)
                            changed = True

    @staticmethod
    def _merge_into_cache(graph):
        """
        Merge graph into the static inheritence tree without duplicating
        the sub-classes already registered for a given super-class.

        parameters
        ----------
        graph: dict
            {superclass: [subclasses]} map to be merged
        """
        cache = InstanceChecker.inheritence_tree
        for key, subclasses in graph.items():
            if key not in cache:
                cache[key] = subclasses
            else:
                known = cache[key]
                # a new list is built so that lists handed out earlier are not altered
                cache[key] = known + [sc for sc in subclasses if sc not in known]

    @staticmethod
    def _build_inheritence_graph(vodml_filepath):
        """
        Build a map of the inheritance links.
        This is necessary to resolve cases where the model refer to abstract types
        and the annotation uses concrete types (sub-types)
        The map is merged into the static inheritence tree.

        parameters
        ----------
        vodml_filepath: string
            location of the VODML file to be parsed

        return
        ------
        dict: {superclass: [subclasses]} map built from that VODML file
        """
        vodml_tree = XmlUtils.xmltree_from_file(vodml_filepath)

        model_name = None
        for ele in vodml_tree.xpath("./name"):
            model_name = ele.text
        if model_name is None:
            raise_check_failed_exception(
                f"No model name found in VODML file {vodml_filepath}", None
            )
        print(f" Build inheritence tree for model {model_name}")

        # Build a map superclass : [subclasses]
        # No distinction between objecttypes and datatypes
        # MIVOT does not make any difference
        # the vodml-ids are unique within the scope of the whole model
        graph = {}
        for type_tag in ("primitiveType", "dataType", "objectType"):
            InstanceChecker._collect_extends(vodml_tree, type_tag, model_name, graph)

        InstanceChecker._add_indirect_subclasses(graph)
        InstanceChecker._merge_into_cache(graph)

        cache = InstanceChecker.inheritence_tree
        # ivoa model is not parsed yet....
        if "ivoa:Quantity" not in cache:
            # copy: the cache must not share the module-level list
            cache["ivoa:Quantity"] = list(ivoa_types)
        # Cross model inheritance not supported yet
        if (
            "meas:Measure" in cache
            and "mango:extmeas.PhotometricMeasure" not in cache["meas:Measure"]
        ):
            cache["meas:Measure"].append("mango:extmeas.PhotometricMeasure")
        return graph

    @staticmethod
    def _check_attribute(attribute_etree, vodml_instance):
        """
        checks that the MIVOT representation of the attribute matches the model definition

        parameters
        ----------
        attribute_etree: etree
            MIVOT representation of the attribute
        vodml_instance: etree
            VODML serialization of the class hosting that attribute

        return
        ------
        boolean
        """
        # NOTE(review): inherits_from() is used here whereas _check_collection
        # calls check_inheritance() -- confirm both exist on InheritanceChecker
        checker = InheritanceChecker(InstanceChecker.inheritence_tree)
        actual_role = attribute_etree.get("dmrole")
        actual_type = attribute_etree.get("dmtype")

        for child in vodml_instance.xpath("./ATTRIBUTE"):
            vodml_type = child.get("dmtype")
            if child.get("dmrole") == actual_role and checker.inherits_from(
                actual_type, vodml_type
            ):
                return True
            # Same class name in different models is accepted
            # NOTE(review): this fallback does not look at the dmrole -- confirm intended
            model1, class1 = DmtypeUtils.split_dmtype(vodml_type)
            model2, class2 = DmtypeUtils.split_dmtype(actual_type)
            if model1 != model2 and class1 == class2:
                return True
        return False

    @staticmethod
    def _check_collection(collection_etree, vodml_instance, session):
        """
        checks that the MIVOT representation of the collection matches the model definition

        parameters
        ----------
        collection_etree: etree
            MIVOT representation of the collection
        vodml_instance: etree
            VODML serialization of the class hosting that collection
        session:
            session forwarded to check_instance_validity for the INSTANCE items

        return
        ------
        None; a documented exception (CheckFailedException) in case of failure
        """
        collection_role = collection_etree.get("dmrole")
        items = collection_etree.xpath("./*")
        checker = InheritanceChecker(InstanceChecker.inheritence_tree)

        # Checks that collection items have all the same type
        item_type = ""
        for item in items:
            mivot_item_type = item.get("dmtype")
            if item_type != "" and not checker.check_inheritance(
                mivot_item_type, item_type
            ):
                raise_check_failed_exception(
                    f"Collection with dmrole={collection_role} "
                    f"has items with different dmtypes {mivot_item_type} {item_type}",
                    collection_etree,
                )
            item_type = mivot_item_type

        # check that the mapped collection items have the type defined in the model
        for vodml_child in vodml_instance.xpath("./COLLECTION"):
            print(f'{vodml_child.get("dmrole")} {collection_role}')
            if vodml_child.get("dmrole") != collection_role:
                continue

            # Get the item type as defined by vodml (type of the first child)
            vodml_type = None
            for vodml_item in vodml_child.xpath("./*"):
                vodml_type = vodml_item.get("dmtype")
                break
            # This occurs when the collection is empty or filled with ATTRIBUTE
            # The latter is a bug in the snippet generator
            # TODO: fix it
            if not vodml_type:
                print(f"collection {collection_role} looks empty: no further checking")
                return

            # Compare with the item types as used by mivot
            allowed_subtypes = InstanceChecker.inheritence_tree.get(vodml_type, ())
            for item in items:
                mivot_item_type = item.get("dmtype")
                if (
                    mivot_item_type not in ivoa_types
                    and mivot_item_type != vodml_type
                    and mivot_item_type not in allowed_subtypes
                ):
                    raise_check_failed_exception(
                        f"Collection with dmrole={collection_role} "
                        f"has items with prohibited types ({mivot_item_type}) "
                        f"instead of expected {vodml_type} ",
                        item,
                    )
            # Items that are instances must be valid by themselves
            for item in items:
                if item.tag == "INSTANCE":
                    InstanceChecker.check_instance_validity(item, session)
            return

        # reached only when no VODML collection has the searched role
        raise_check_failed_exception(
            f"No collection with dmrole {collection_role} "
            f"in object type {vodml_instance.getroot().get('dmtype')}",
            collection_etree,
        )

    @staticmethod
    def _check_membership(actual_instance, enclosing_vodml_instance):
        """
        Checks that the MIVOT component is a component of the VODML class

        parameters
        ----------
        actual_instance: etree
            MIVOT instance
        enclosing_vodml_instance: etree
            VODML class supposed to enclose the actual instance

        return
        -------
        None; a documented exception (CheckFailedException) in case of failure
        """
        actual_role = actual_instance.get("dmrole")
        vodml_root = enclosing_vodml_instance.getroot()

        for vodml_instance in vodml_root.xpath("./*"):
            if vodml_instance.get("dmrole") != actual_role:
                continue
            # The first VODML component with the right role settles the check
            actual_type = actual_instance.get("dmtype")
            vodml_type = vodml_instance.get("dmtype")
            if vodml_instance.tag == "REFERENCE":
                print(f"-> found a reference with dmrole={actual_role} and dmtype={vodml_type}: no checking")
                return
            if actual_type == vodml_type:
                return
            # Sort of ad_hoc patch meanwhile ivoa DM is properly supported
            if actual_type == "ivoa:RealQuantity" and vodml_type == "ivoa:Quantity":
                return
            if vodml_type == "ivoa:datetime" and actual_type in [
                "mango:Decimalyear",
                "mango:JulianEpoch",
                "mango:BesselianEpoch",
                "mango:jd",
                "mango:mjd",
                "mango:iso",
            ]:
                return
            if actual_type in InstanceChecker.inheritence_tree.get(vodml_type, ()):
                print(f"-> found that {actual_type} inherits from {vodml_type}")
                return
            raise_check_failed_exception(
                f"Object type {vodml_root.get('dmtype')} "
                f"has no component with dmrole={actual_role} and dmtype={actual_type} "
                f"type should be {vodml_type}",
                vodml_instance,
            )

        raise_check_failed_exception(
            f"dmrole {actual_role} not found in "
            f"object type {vodml_root.get('dmtype')}",
            actual_instance,
        )

    @staticmethod
    def _register_role(child, checked_roles, message):
        """
        Record the dmrole of child in checked_roles.
        Raise a CheckFailedException with the given message
        if that dmrole has already been recorded.
        """
        child_role = child.get("dmrole")
        if child_role in checked_roles:
            raise_check_failed_exception(message, child)
        checked_roles.append(child_role)

    @staticmethod
    def check_instance_validity(instance_etree, session):
        """
        Public method. The only one meant to be used from outside
        Checks that instance_etree is compliant with the model it refers to

        parameters
        ----------
        instance_etree: etree
            MIVOT instance to be checked
        session:
            session giving access to the VODML files and to the snippet folder

        return
        -------
        True; a documented exception (CheckFailedException) in case of failure
        """
        checked_roles = []
        dmtype = instance_etree.get("dmtype")
        if dmtype is None:
            raise_check_failed_exception(
                f"Mising dmtype in \n {XmlUtils.pretty_string(instance_etree)}",
                instance_etree,
            )
        eles = dmtype.split(":")
        if len(eles) < 2:
            # report a proper check failure rather than an IndexError below
            raise_check_failed_exception(
                f"dmtype {dmtype} has no model prefix", instance_etree
            )
        dmrole = instance_etree.get("dmrole")
        print(f"-> check class {eles[0]}:{eles[1]} from role {dmrole if dmrole else 'None'}")
        if eles[0] == "ivoa":
            print("-> IVOA/ see later")
            return True

        vodml_instance = InstanceChecker._get_vodml_class_tree(
            eles[0], eles[1], session
        )

        for child in instance_etree.xpath("./*"):
            child_role = child.get("dmrole")
            child_type = child.get("dmtype")

            if child.tag == "ATTRIBUTE":
                InstanceChecker._check_membership(child, vodml_instance)
                InstanceChecker._register_role(
                    child, checked_roles, f"Duplicated dmrole {child_role}"
                )
                # ivoa:Quantity are complex types that can be serialized as ATTRIBUTE.
                # This is an exception
                if (
                    child_type not in ivoa_types
                    and InstanceChecker._check_attribute(child, vodml_instance) is False
                ):
                    raise_check_failed_exception(
                        f"cannot find attribute with dmrole={child_role} "
                        f"dmtype={child_type} in complex type {dmtype}",
                        child,
                    )
                print(
                    f"VALID: attribute with dmrole={child_role} "
                    f"dmtype={child_type} in complex type {dmtype}"
                )
            elif child.tag == "INSTANCE":
                InstanceChecker._register_role(
                    child,
                    checked_roles,
                    f"Duplicated dmrole {child_role} (dmtype {child_type})",
                )
                # both checks report a failure by raising an exception
                InstanceChecker.check_instance_validity(child, session)
                InstanceChecker._check_membership(child, vodml_instance)
                print(
                    f"VALID: instance with dmrole={child_role} "
                    f"dmtype={child_type} in complex type {dmtype}"
                )
            elif child.tag == "COLLECTION":
                InstanceChecker._register_role(
                    child, checked_roles, f"Duplicated dmrole {child_role}"
                )
                # a failure is reported by an exception
                InstanceChecker._check_collection(child, vodml_instance, session)
                print(
                    f"VALID: collection with dmrole={child_role} "
                    f"in complex type {dmtype}"
                )
            elif child.tag == "REFERENCE":
                # the role is recorded so that a later duplicate can be detected
                InstanceChecker._register_role(
                    child, checked_roles, f"Duplicated dmrole {child_role}"
                )
                print(f"SKIPPED: Reference to instance with dmrole={child_role}")
            else:
                raise_check_failed_exception(f"unsupported tag {child.tag}", child)
        return True