Source code for openglider.jsonify
import json
import re
import time
import datetime
import openglider.jsonify.migration
from openglider.utils import recursive_getattr
__ALL__ = ["dumps", "dump", "loads", "load"]
# Main json-export routine.
# Maybe at some point it can become necessary to de-reference classes with _module also,
# because of same-name-elements....
# For the time given, we're alright
datetime_format = "%d.%m.%Y %H:%M"
datetime_format_regex = re.compile(r"^\d{2}\.\d{2}\.\d{4} \d{2}:\d{2}$")
[docs]
class Encoder(json.JSONEncoder):
[docs]
def default(self, obj):
if obj.__class__.__module__ == "numpy":
return obj.tolist()
elif isinstance(obj, datetime.datetime):
return obj.strftime(datetime_format)
elif hasattr(obj, "__json__"):
type_str = str(obj.__class__)
module = obj.__class__.__module__
type_regex = "<class '{}\\.(.*)'>".format(module.replace(".", "\\."))
class_name = re.match(type_regex, type_str).group(1)
return {"_type": class_name, "_module": module, "data": obj.__json__()}
else:
return super(Encoder, self).default(obj)
[docs]
def get_element(_module, _name):
for rex in openglider.config["json_forbidden_modules"]:
if re.match(rex, _module):
raise Exception
elif re.match(rex, _name):
raise Exception
for rex in openglider.config["json_allowed_modules"]:
match = re.match(rex, _module)
if match:
fromlist = [str(w) for w in _module.split(".")]
module = __import__(_module, fromlist=fromlist)
obj = recursive_getattr(module, _name)
return obj
[docs]
def object_hook(dct):
"""
Return the de-serialized object
"""
for key, value in dct.items():
if isinstance(value, str) and datetime_format_regex.match(value):
dct[key] = datetime.datetime.strptime(value, datetime_format)
if "_type" in dct and "_module" in dct:
obj = get_element(dct["_module"], dct["_type"])
try:
# use the __from_json__ function if present. __init__ otherwise
deserializer = getattr(obj, "__from_json__", obj)
return deserializer(**dct["data"])
except TypeError as e:
raise TypeError(
"{} in element: {} ({})".format(e, dct["_type"], dct["_module"])
)
else:
return dct
[docs]
def dumps(obj, add_meta=True):
if add_meta:
obj = add_metadata(obj)
return json.dumps(obj, cls=Encoder)
[docs]
def dump(obj, fp, add_meta=True):
if add_meta:
obj = add_metadata(obj)
return json.dump(obj, fp, cls=Encoder, indent=4)
[docs]
def loads(obj):
try:
return json.loads(obj, object_hook=object_hook)
except:
data = openglider.jsonify.migration.migrate(obj)
return loads(data)
[docs]
def load(fp):
return json.load(fp, object_hook=object_hook)