"""
Code imported from SVOM
Created on 29 mai 2019
@author: michel
"""
import json
import re
import urllib
import ssl
from collections import OrderedDict
from mivot_validator.utils import logger
from mivot_validator.utils.json_encoder import MyEncoder
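# NOTE(review): a stray "[docs]" link marker stood here (presumably residue from a rendered documentation page); it is not code.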
class DictUtils:
    """
    Static helper class implementing convenient operations on dictionaries.

    All methods are static; the class is only used as a namespace.
    """

    @staticmethod
    def get_required_value(dictionary, key):
        """
        Return dictionary[key] if available and raise an exception otherwise.

        :param dictionary: dictionary
        :type dictionary: Python Dict
        :param key: key of the searched value
        :type key: string
        :return: the value attached to the key
        :rtype: any supported Dict value
        :raises Exception: if the dictionary is None/empty or the key is missing
        """
        # An empty dict is rejected here as well as None (falsy check).
        if not dictionary:
            raise Exception("Cannot get any value for None dict")
        if key in dictionary:
            return dictionary[key]
        raise Exception(f"missing key: {key}")

    @staticmethod
    def get_fatal_value(dictionary, key):
        """
        Return dictionary[key] if available and raise an exception otherwise.

        Despite its name, this method does not trigger a system exit: it
        behaves exactly like :meth:`get_required_value`.

        :param dictionary: dictionary
        :type dictionary: Python Dict
        :param key: key of the searched value
        :type key: string
        :return: the value attached to the key
        :rtype: any supported Dict value
        :raises Exception: if the dictionary is None/empty or the key is missing
        """
        return DictUtils.get_required_value(dictionary, key)

    @staticmethod
    def get_optional_value(dictionary, key, null=None):
        """
        Return dictionary[key] if available or a null value otherwise.

        :param dictionary: dictionary
        :type dictionary: Python Dict
        :param key: key of the searched value
        :type key: string
        :param null: value returned when the key is
                     not available (None by default)
        :type null: any
        :return: the value attached to the key or the null value
        :rtype: any supported Dict value
        """
        if dictionary and key in dictionary:
            return dictionary[key]
        return null

    @staticmethod
    def read_dict_from_file(filename, fatal=False):
        """
        Read a JSON dictionary from filename.

        Key order of the file is preserved (OrderedDict).

        :param filename: filename
        :type filename: string
        :param fatal: if True, any failure is re-raised as an Exception;
                      otherwise the error is logged and None is returned
        :type fatal: boolean
        :return: the Dict extracted from the file, or None on failure
        :rtype: Python Dict
        :raises Exception: if something goes wrong and fatal is True
        """
        try:
            logger.debug("Reading json from %s", filename)
            with open(filename, "r", encoding="utf-8") as file:
                return json.load(file, object_pairs_hook=OrderedDict)
        except Exception as exception:
            if fatal is True:
                # Chain the cause so the real error is not lost.
                raise Exception(f"reading {filename}") from exception
            logger.error("%s reading %s", exception, filename)
            return None

    @staticmethod
    def write_dict_from_file(dictionary, filename, fatal=False):
        """
        Write the dictionary as JSON (indented, keys sorted) in filename.

        :param dictionary: dictionary to be serialized
        :type dictionary: Python Dict
        :param filename: filename
        :type filename: string
        :param fatal: if True, any failure is re-raised as an Exception;
                      otherwise the error is only logged
        :type fatal: boolean
        :raises Exception: if something goes wrong and fatal is True
        """
        try:
            logger.debug("Writing json in %s", filename)
            with open(filename, "w", encoding="utf-8") as file:
                file.write(json.dumps(dictionary, indent=2, sort_keys=True))
        except Exception as exception:
            if fatal is True:
                # Chain the cause so the real error is not lost.
                raise Exception(f"writing {filename}") from exception
            logger.error("%s writing %s", exception, filename)

    @staticmethod
    def read_dict_from_url(url, fatal=False):
        """
        Read a JSON dictionary from url.

        :param url: url
        :type url: string
        :param fatal: if True, any failure is re-raised as an Exception;
                      otherwise the error is logged and None is returned
        :type fatal: boolean
        :return: the Dict extracted from the response, or None on failure
        :rtype: Python Dict
        :raises Exception: if something goes wrong and fatal is True
        """
        # "import urllib" alone does not guarantee that the "request"
        # submodule is loaded; import it explicitly where it is used.
        import urllib.request

        try:
            logger.debug("Reading json from %s", url)
            # SECURITY NOTE(review): certificate verification is disabled
            # here (kept for backward compatibility); the connection is
            # open to man-in-the-middle attacks.
            context = ssl._create_unverified_context()
            # The context manager closes the HTTP response in every case.
            with urllib.request.urlopen(url, context=context) as open_url:
                if open_url.getcode() == 200:
                    return json.loads(open_url.read().decode("utf-8"))
                raise Exception(f"{url} return code {open_url.getcode()}")
        except Exception as exception:
            if fatal is True:
                # Chain the cause so the real error is not lost.
                raise Exception(f"reading {url}") from exception
            logger.error("%s reading %s", exception, url)
            return None

    @staticmethod
    def get_pretty_json(dictionnary):
        """
        Serialize the dictionary as indented JSON using MyEncoder.

        Keys are not sorted.

        :param dictionnary: object to be serialized
        :type dictionnary: Python Dict
        :return: A pretty string representation of the dictionary
        :rtype: string
        """
        return json.dumps(dictionnary, indent=2, cls=MyEncoder)

    @staticmethod
    def print_pretty_json(dictionnary):
        """
        Print out the pretty string representation of the dictionary.

        :param dictionnary: object to be printed
        :type dictionnary: Python Dict
        """
        print(DictUtils.get_pretty_json(dictionnary))

    @staticmethod
    def get_permanent_string(text):
        """
        Make the string independent of a specific run context.

        It is used to format objects in a way they can be checked in unit
        tests: date values are replaced with DATE and the seq_id with SEQID.
        Note that the seq_id patterns consume everything up to the end of
        the line.

        :param text: string to be formatted
        :type text: string
        :return: the formatted string
        :rtype: string
        """
        replaced = re.sub(
            "svom_mxt_proto_any_.*", "svom_mxt_proto_any_SEQID", text
        )
        replaced = re.sub("svom_test_.*", "svom_test_any_SEQID", replaced)
        return re.sub(
            "[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}",
            "DATE",
            replaced,
        )

    @staticmethod
    def get_permanent_object(dico):
        """
        Make the object content independent of a specific run context.

        Strings (and bytes, decoded as UTF-8) are processed with
        :meth:`get_permanent_string`; lists and dicts are processed
        recursively and modified IN PLACE; any other object is returned
        unchanged.

        :param dico: object to be formatted
        :type dico: anything
        :return: the formatted object (the same instance for list and dict)
        """
        if isinstance(dico, str):
            return DictUtils.get_permanent_string(dico)
        if isinstance(dico, bytes):
            return DictUtils.get_permanent_string(dico.decode("utf-8"))
        if isinstance(dico, list):
            for idx, item in enumerate(dico):
                dico[idx] = DictUtils.get_permanent_object(item)
            return dico
        if isinstance(dico, dict):
            # Only values of existing keys are reassigned, so the dict
            # size does not change while iterating.
            for key, value in dico.items():
                dico[key] = DictUtils.get_permanent_object(value)
            return dico
        return dico

    @staticmethod
    def get_prefixed_keys_dict(dictionnary, prefix):
        """
        Return a copy of the dict with all keys prefixed with "<prefix>_".

        Works only at root level, no recursion.

        :param dictionnary: dict on which key prefixes must be applied
        :type dictionnary: dict
        :param prefix: prefix to be applied to the keys
        :type prefix: string
        :return: the prefixed dict
        :rtype: dict
        """
        return {
            prefix + "_" + key: value for key, value in dictionnary.items()
        }

    @staticmethod
    def find_item_by_key(dictionnary, key):
        """
        Look for the first item attached to key, searching nested
        dictionaries and lists depth-first.

        :param dictionnary: structure to be explored
        :type dictionnary: dict or list
        :param key: searched key
        :type key: string
        :return: a list holding the first value found, or an empty list
        :rtype: list
        """
        result = []
        DictUtils._find_item_by_key(dictionnary, key, result)
        return result

    @staticmethod
    def _find_item_by_key(dictionnary, key, result):
        """
        Recursive worker of :meth:`find_item_by_key`.

        :param dictionnary: structure to be explored
        :type dictionnary: dict or list (anything else is ignored)
        :param key: searched key
        :type key: string
        :param result: accumulator receiving the first value found
        :type result: list
        """
        if len(result) > 0:
            return
        if isinstance(dictionnary, dict):
            if key in dictionnary:
                result.append(dictionnary[key])
                # First match wins: no need to visit the children.
                return
            children = dictionnary.values()
        elif isinstance(dictionnary, list):
            children = dictionnary
        else:
            return
        for child in children:
            DictUtils._find_item_by_key(child, key, result)
            if result:
                return