"""
Created on 5 Jan 2022
@author: laurentmichel
"""
import sys
import re
from copy import deepcopy
from lxml import etree
from mivot_validator.instance_checking import logger
from mivot_validator.instance_checking.xml_interpreter.exceptions import (
MappingException,
NotImplementedException
)
from mivot_validator.instance_checking.\
xml_interpreter.annotation_seeker import AnnotationSeeker
from mivot_validator.instance_checking.\
xml_interpreter.resource_seeker import ResourceSeeker
from mivot_validator.instance_checking.\
xml_interpreter.table_iterator import TableIterator
from mivot_validator.instance_checking.\
xml_interpreter.static_reference_resolver import StaticReferenceResolver
from mivot_validator.instance_checking.\
xml_interpreter.dynamic_reference import DynamicReference
from mivot_validator.instance_checking.xml_interpreter.join_operator import (
JoinOperator)
from mivot_validator.utils.xml_utils import XmlUtils
[docs]
class ModelViewer:
"""
ModelViewer is a PyVO table wrapper aiming at providing
a model view on VOTable data read with usual tools
Standard usage applied to data rows
.. code-block:: python
votable = parse(votable_path)
for resource in votable.resources:
model_viewer = ModelViewer(resource)
model_viewer.connect_table("results")
while True:
data_row = model_viewer.get_next_row()
if data_row is None:
break
model_view = model_viewer.get_model_view()
json_model_view = model_viewer.get_json_model_view()
break
Standard usage applied to global instances
.. code-block:: python
votable = parse(votable_path)
for resource in votable.resources:
model_viewer = ModelViewer(resource)
time_series = model_viewer.get_globals_instance("cube:TimeSeries")
"""
def __init__(self, resource, votable_path=None):
"""
Constructor
votable_path is a workaround allowing to extract
the annotation block outside of astropy
:param resource: VOTable resource
:type resource: astropy.Resource
"""
self._resource = resource
self._assert_resource_is_result()
self._annotation_seeker = None
self._resource_seeker = ResourceSeeker(self._resource)
self._connected_table = None
self._connected_tableref = None
self._current_data_row = None
# when the search object is in GLOBALS
self._globals_instance = None
self._last_row = None
self._templates = None
self._joins = {}
self._dyn_references = {}
self._extract_mapping_block(votable_path=votable_path)
@property
def annotation_seeker(self):
"""
Return an API to search various components in the XML mapping block
"""
return self._annotation_seeker
@property
def resource_seeker(self):
"""
Return an API to search various components in the VOTabel resource
"""
return self._resource_seeker
@property
def connected_table(self):
return self._connected_table
@property
def connected_table_ref(self):
return self._connected_table_ref
@property
def current_data_row(self):
self._assert_table_is_connected()
return self._current_data_row
[docs]
def get_table_ids(self):
"""
return a list of the table located just below self.resource
"""
return self.resource_seeker.get_table_ids()
[docs]
def get_globals_models(self):
"""
Collection types are GLOBALS/COLLECTION/INSTANCE@dmtype:
used for collections of static objects
:return : The dmtypes of all the top level INSTANCE/COLLECTION
of GLOBALS
:rtype: {'COLLECTION': [dmtpyes], 'INSTANCE': [dmtypes]}
"""
retour = {}
retour["COLLECTION"] = (
self._annotation_seeker.get_globals_collection_dmtypes()
)
retour["INSTANCE"] = (
self._annotation_seeker.get_globals_instance_dmtypes()
)
return retour
[docs]
def get_templates_models(self):
"""
COLLECTION not implemented yet
:return : The dmtypes (except ivoa:ANY) of all INSTANCE/COLLECTION
of all TEMPLATES
:rtype: {'tableref: {'COLLECTIONS': [dmtpyes],
'INSTANCE': [dmtypes]}, ...}
"""
retour = {}
gni = self._annotation_seeker.get_instance_dmtypes()["TEMPLATES"]
for tid, tmplids in gni.items():
retour[tid] = {"COLLECTIONS": [], "INSTANCE": tmplids}
return retour
[docs]
def get_globals_instance(self, dmtype, resolve_ref=True):
"""
The a model view on the GLOBALS object (INSTANCE or COLLECTION)
with @dmtype=dmtype
"""
globals_models = self.get_globals_models()
found = False
retour = []
for globals_type in globals_models["COLLECTION"]:
if globals_type == dmtype:
found = True
# We process only one instance for now
global_type = self.annotation_seeker.get_instance_by_dmtype(
globals_type
)
self._globals_instance = global_type["GLOBALS"][0]
self._squash_globals_join_and_references()
globals_instance_copy = deepcopy(self._globals_instance)
if resolve_ref is True:
StaticReferenceResolver.resolve(
self._annotation_seeker, None, globals_instance_copy
)
for join_tag, join in self._joins.items():
logger.info("resolve join %s", join_tag)
join_operator = JoinOperator(self,
self._connected_tableref,
join)
join_operator._set_filter()
join_operator._set_foreign_instance()
join_operator.get_matching_data(None)
ref_element = globals_instance_copy.xpath(
"//" + join_tag)[0]
ref_host = ref_element.getparent()
for cpart in join_operator.get_matching_model_view(
resolve_ref=resolve_ref
):
ref_host.append(deepcopy(cpart))
# Drop the reference
ref_host.remove(ref_element)
retour.append(globals_instance_copy)
if found is True:
return retour
for globals_type in globals_models["INSTANCE"]:
if globals_type == dmtype:
raise NotImplementedException(
"GLOBALS/INSTANCE access not implemented yet"
)
raise NotImplementedException(f"no {dmtype} type found in GLOBALS")
[docs]
def connect_table(self, tableref):
"""
Iterate over the table identified by tableref
Required to browse table data.
Connect to the first table if tableref is None
"""
self._connected_tableref = tableref
self._connected_table = self._resource_seeker.get_table(tableref)
if self.connected_table is None:
raise MappingException(f"Cannot find table {tableref} in VOTable")
logger.debug("table %s found in VOTable", tableref)
self._templates = deepcopy(
self.annotation_seeker.get_templates_block(tableref))
if self._templates is None:
raise MappingException(f"Cannot find TEMPLATES {tableref} ")
logger.debug("TEMPLATES %s found ", tableref)
self.table_iterator = TableIterator(tableref,
self.connected_table.to_table())
self._squash_join_and_references()
self._set_column_indices()
self._set_column_units()
[docs]
def get_next_row(self):
"""
Return the next data row of the connected table
"""
self._assert_table_is_connected()
self._current_data_row = self.table_iterator._get_next_row()
return self._current_data_row
[docs]
def rewind(self):
"""
Rewind the table iterator of the connected table
"""
self._assert_table_is_connected()
self.table_iterator._rewind()
[docs]
def get_model_view(self, resolve_ref=True):
"""
return a XML model view of the last read row
"""
if self._current_data_row is None:
print("no data row: Cannot continue the process")
sys.exit(1)
self._assert_table_is_connected()
templates_copy = deepcopy(self._templates)
if resolve_ref is True:
while (
StaticReferenceResolver.resolve(
self._annotation_seeker,
self._connected_tableref, templates_copy
)
> 0
):
pass
# Make sure the instances of the resolved
# references have both indexes and unit attribute
XmlUtils.set_column_indices(
templates_copy,
self._resource_seeker.get_id_index_mapping(
self._connected_tableref),
)
XmlUtils.set_column_units(
templates_copy,
self._resource_seeker.get_id_unit_mapping(
self._connected_tableref),
)
for ele in templates_copy.xpath("//ATTRIBUTE"):
ref = ele.get("ref")
if ref is not None and ref != "NotSet":
index = ele.attrib["index"]
ele.attrib["value"] = str(self._current_data_row[int(index)])
for dref_tag, dref in self._dyn_references.items():
logger.info("resolve reference %s", dref_tag)
dyn_resolver = DynamicReference(
self, dref_tag, self._connected_tableref, dref
)
dyn_resolver._set_mode()
ref_target = dyn_resolver.get_target_instance(
self._current_data_row)
ref_element = templates_copy.xpath("//" + dref_tag)[0]
ref_host = ref_element.getparent()
ref_target_copy = deepcopy(ref_target)
# Set the reference role to the copied instance
ref_target_copy.attrib["dmrole"] = ref_element.get("dmrole")
# Insert the referenced object
ref_host.append(ref_target_copy)
# Drop the reference
ref_host.remove(ref_element)
for join_tag, join in self._joins.items():
logger.info("resolve join %s", join_tag)
join_operator = JoinOperator(self, self._connected_tableref, join)
join_operator._set_filter()
join_operator._set_foreign_instance()
join_operator.get_matching_data(self._current_data_row)
ref_element = templates_copy.xpath("//" + join_tag)[0]
ref_host = ref_element.getparent()
for cpart in join_operator.get_matching_model_view(
resolve_ref=resolve_ref):
ref_host.append(deepcopy(cpart))
# Drop the reference
ref_host.remove(ref_element)
return templates_copy
[docs]
def get_model_component_by_type(self, searched_dmtype):
"""
return the list of the xml instances with @dmtype=searched_ type
from the model view of the current data row
Return a {} if no matching dmtype was found
"""
self._assert_table_is_connected()
retour = []
model_view = self.get_model_view(resolve_ref=True)
for ele in model_view.xpath(
f'.//INSTANCE[@dmtype="{searched_dmtype}"]'):
retour.append(deepcopy(ele))
return retour
[docs]
def get_model_component_by_role(self, searched_dmrole):
"""
return the list of the xml instances with
dmrole=searched_role from the model view
of the current data row
Return a [] if no matching dmrole was found
"""
self._assert_table_is_connected()
retour = []
model_view = self.get_model_view(resolve_ref=True)
for ele in model_view.xpath(
f'.//INSTANCE[@dmrole="{searched_dmrole}"]'):
retour.append(deepcopy(ele))
return retour
[docs]
def get_declared_models(self):
"""
return the list of declared models ({name: url, ...}
"""
return self._annotation_seeker._declared_models
"""
Private methods
"""
def _assert_table_is_connected(self):
assert (
self._connected_table is not None
), "Operation failed: no connected data table"
def _assert_resource_is_result(self):
assert (
self._resource.type == "results"
), "ModelViewer must be set on a Resource with type=results"
def _extract_mapping_block(self, votable_path=None):
"""
String extraction must be replaced with astropy.Resource.model_mapping
when available
"""
logger.info("extract vodml block from %s", votable_path)
with open(votable_path, encoding="utf-8") as xml_file:
content = xml_file.read()
start = content.index("<VODML")
if start == -1:
raise MappingException("Cannot find mapping block")
content = content[start:]
stop_pattern = "</VODML>"
stop = content.index(stop_pattern) + len(stop_pattern)
content = content[:stop]
content = re.sub("xmlns=[\"'].*[\"']", "", content)
self._annotation_seeker = AnnotationSeeker(
etree.fromstring(content)
)
logger.info("VODML found")
def _squash_join_and_references(self):
"""
Remove both JOINs and REFERENCEs from the templates and
store them in to be resolved later on
This avoid to have the model view polluted with elements
that are not in the model
"""
for ele in self._templates.xpath(
"//*[starts-with(name(), 'REFERENCE_')]"
):
if ele.get("sourceref") is not None:
self._dyn_references = {ele.tag: deepcopy(ele)}
for child in list(ele):
ele.remove(child)
for ele in self._templates.xpath("//*[starts-with(name(), 'JOIN')]"):
self._joins = {ele.tag: deepcopy(ele)}
for child in list(ele):
ele.remove(child)
def _squash_globals_join_and_references(self):
"""
Remove both JOINs and REFERENCEs from the templates and store them
in to be resolved later on
This avoid to have the model view polluted with elements
that are not in the model
TODO: merge with the former method
"""
for ele in self._globals_instance.xpath(
"//*[starts-with(name(), 'JOIN')]"):
self._joins = {ele.tag: deepcopy(ele)}
for child in list(ele):
ele.remove(child)
def _set_column_indices(self):
"""
add column ranks to attribute having a ref.
Using ranks allow to identify columns even numpy raw
have been serialised as []
"""
index_map = self._resource_seeker.get_id_index_mapping(
self._connected_tableref
)
XmlUtils.set_column_indices(self._templates, index_map)
def _set_column_units(self):
"""
add field unit to attribute having a ref.
Used for performing unit conversions
"""
unit_map = self._resource_seeker.get_id_unit_mapping(
self._connected_tableref
)
XmlUtils.set_column_units(self._templates, unit_map)