hat.json
JSON Data library
1"""JSON Data library""" 2 3from hat.json.data import (Array, 4 Object, 5 Data, 6 equals, 7 clone, 8 flatten) 9from hat.json.path import (Path, 10 get, 11 set_, 12 remove, 13 Storage) 14from hat.json.encoder import (Format, 15 encode, 16 decode, 17 get_file_format, 18 encode_file, 19 decode_file, 20 encode_stream, 21 decode_stream) 22from hat.json.patch import (diff, 23 patch) 24from hat.json.repository import (SchemaRepository, 25 json_schema_repo) 26from hat.json.validator import (Validator, 27 DefaultValidator, 28 JsonSchemaValidator) 29from hat.json import vt 30 31 32__all__ = ['Array', 33 'Object', 34 'Data', 35 'equals', 36 'clone', 37 'flatten', 38 'Path', 39 'get', 40 'set_', 41 'remove', 42 'Storage', 43 'Format', 44 'encode', 45 'decode', 46 'get_file_format', 47 'encode_file', 48 'decode_file', 49 'encode_stream', 50 'decode_stream', 51 'diff', 52 'patch', 53 'SchemaRepository', 54 'json_schema_repo', 55 'Validator', 56 'DefaultValidator', 57 'JsonSchemaValidator', 58 'vt']
def equals(a: Data,
           b: Data
           ) -> bool:
    """Compare two JSON values using JSON equality rules.

    Unlike the builtin ``==``, a ``bool`` is never equal to a number, so
    `False` differs from `0`/`0.0` and `True` differs from `1`/`1.0`. The
    rule is applied recursively to members of objects and arrays.

    Example::

        assert equals(0, 0.0) is True
        assert equals({'a': 1, 'b': 2}, {'b': 2, 'a': 1}) is True
        assert equals(1, True) is False

    """
    # exactly one side being bool is a mismatch even when ``==`` agrees
    if isinstance(a, bool) != isinstance(b, bool) or a != b:
        return False

    # ``a == b`` holds here, so containers have matching keys/lengths; only
    # nested bool-vs-number mismatches remain to be detected
    if isinstance(a, dict):
        return all(equals(value, b[key]) for key, value in a.items())

    if isinstance(a, list):
        return all(equals(left, right) for left, right in zip(a, b))

    return True
Equality comparison of json serializable data.
Tests for equality of data according to JSON format. Notably, `bool` values are not considered equal to numeric values in any case. This is different from default equality comparison, which considers `False` equal to `0` and `0.0`; and `True` equal to `1` and `1.0`.
Example::
assert equals(0, 0.0) is True
assert equals({'a': 1, 'b': 2}, {'b': 2, 'a': 1}) is True
assert equals(1, True) is False
def clone(data: Data) -> Data:
    """Create a deep copy of JSON data

    Objects and arrays are rebuilt recursively as new ``dict``/``list``
    instances; every other value is returned as is. The result is equal to
    the input.

    Example::

        x = {'a': [1, 2, 3]}
        y = clone(x)
        assert x is not y
        assert x['a'] is not y['a']
        assert equals(x, y)

    """
    if isinstance(data, dict):
        return {key: clone(value) for key, value in data.items()}

    if isinstance(data, list):
        return list(map(clone, data))

    return data
Deep clone data
This function recursively creates new instances of array and object data based on input data. Resulting json data is equal to provided data.
Example::
x = {'a': [1, 2, 3]}
y = clone(x)
assert x is not y
assert x['a'] is not y['a']
assert equals(x, y)
def flatten(data: Data
            ) -> typing.Iterable[Data]:
    """Lazily flatten nested JSON arrays

    Values that are not arrays are yielded unchanged. Arrays are expanded
    recursively, element by element, so empty arrays contribute nothing.
    Objects are never expanded, even when they contain arrays.

    Example::

        data = [1, [], [2], {'a': [3]}]
        result = [1, 2, {'a': [3]}]
        assert list(flatten(data)) == result

    """
    if not isinstance(data, list):
        yield data
        return

    for element in data:
        yield from flatten(element)
def get(data: Data,
        path: Path,
        default: typing.Optional[Data] = None
        ) -> Data:
    """Look up the data element addressed by path

    The path is flattened and followed one segment at a time: string
    segments index objects, integer segments index arrays. `default` is
    returned as soon as a segment can not be followed (wrong container
    type, missing key or index out of range).

    Raises:
        ValueError: path segment is neither ``str`` nor ``int`` (``bool``
            segments are rejected)

    Example::

        data = {'a': [1, 2, [3, 4]]}
        path = ['a', 2, 0]
        assert get(data, path) == 3

        data = [1, 2, 3]
        assert get(data, 0) == 1
        assert get(data, 5) is None
        assert get(data, 5, default=123) == 123

    """
    for segment in flatten(path):
        if isinstance(segment, str):
            if isinstance(data, dict) and segment in data:
                data = data[segment]
                continue
            return default

        # bool is a subclass of int, so it has to be excluded explicitly
        if isinstance(segment, bool) or not isinstance(segment, int):
            raise ValueError('invalid path')

        if not isinstance(data, list):
            return default

        try:
            data = data[segment]
        except IndexError:
            return default

    return data
Get data element referenced by path
Example::
data = {'a': [1, 2, [3, 4]]}
path = ['a', 2, 0]
assert get(data, path) == 3
data = [1, 2, 3]
assert get(data, 0) == 1
assert get(data, 5) is None
assert get(data, 5, default=123) == 123
def set_(data: Data,
         path: Path,
         value: Data
         ) -> Data:
    """Build new data in which the element addressed by path is `value`

    Input data is not modified - containers along the path are copied.
    Where the existing container does not match the path segment type
    (``str`` -> object, ``int`` -> array), a new empty container is used
    instead. Arrays are padded with ``None`` (at the end for non-negative
    indexes, at the beginning for negative ones) when the index is out of
    range. With an empty path, `value` itself is returned.

    Raises:
        ValueError: path segment is neither ``str`` nor ``int`` (``bool``
            segments are rejected)

    Example::

        data = [1, {'a': 2, 'b': 3}, 4]
        path = [1, 'b']
        result = set_(data, path, 5)
        assert result == [1, {'a': 2, 'b': 5}, 4]
        assert result is not data

        data = [1, 2, 3]
        result = set_(data, 4, 4)
        assert result == [1, 2, 3, None, 4]

    """
    # first pass: walk down the path and remember each visited container
    visited = []
    node = data

    for segment in flatten(path):
        container = node

        if isinstance(segment, str):
            node = node.get(segment) if isinstance(node, dict) else None

        elif isinstance(segment, int) and not isinstance(segment, bool):
            try:
                node = node[segment] if isinstance(node, list) else None
            except IndexError:
                node = None

        else:
            raise ValueError('invalid path')

        visited.append((container, segment))

    # second pass: rebuild containers from the innermost one outwards;
    # every segment was already validated as str or int above
    for container, segment in reversed(visited):
        if isinstance(segment, str):
            rebuilt = dict(container) if isinstance(container, dict) else {}

        else:
            rebuilt = list(container) if isinstance(container, list) else []
            size = len(rebuilt)

            if segment >= size:
                rebuilt.extend([None] * (segment - size + 1))

            elif segment < 0 and -segment > size:
                # pad in front so that `segment` refers to the first element
                rebuilt[:0] = [None] * (-segment - size)

        rebuilt[segment] = value
        value = rebuilt

    return value
Create new data by setting data path element value
Example::
data = [1, {'a': 2, 'b': 3}, 4]
path = [1, 'b']
result = set_(data, path, 5)
assert result == [1, {'a': 2, 'b': 5}, 4]
assert result is not data
data = [1, 2, 3]
result = set_(data, 4, 4)
assert result == [1, 2, 3, None, 4]
def remove(data: Data,
           path: Path
           ) -> Data:
    """Build new data without the element addressed by path

    Input data is not modified - containers along the path are copied. If
    the path can not be followed (wrong container type, missing key or
    index out of range), the input data itself is returned. With an empty
    path, ``None`` is returned.

    Raises:
        ValueError: path segment is neither ``str`` nor ``int`` (``bool``
            segments are rejected)

    Example::

        data = [1, {'a': 2, 'b': 3}, 4]
        path = [1, 'b']
        result = remove(data, path)
        assert result == [1, {'a': 2}, 4]
        assert result is not data

        data = [1, 2, 3]
        result = remove(data, 4)
        assert result == [1, 2, 3]

    """
    # first pass: verify the whole path exists and remember the containers
    visited = []
    node = data

    for segment in flatten(path):
        if isinstance(segment, str):
            if not isinstance(node, dict) or segment not in node:
                return data
            child = node[segment]

        elif isinstance(segment, int) and not isinstance(segment, bool):
            if not isinstance(node, list):
                return data
            try:
                child = node[segment]
            except IndexError:
                return data

        else:
            raise ValueError('invalid path')

        visited.append((node, segment))
        node = child

    # second pass: the innermost container drops the element, every outer
    # container gets the rebuilt child; ``None`` marks "nothing rebuilt yet"
    result = None

    for container, segment in reversed(visited):
        if isinstance(segment, str):
            container = dict(container)
        else:
            container = list(container)

        if result is None:
            del container[segment]
        else:
            container[segment] = result

        result = container

    return result
Create new data by removing part of data referenced by path
Example::
data = [1, {'a': 2, 'b': 3}, 4]
path = [1, 'b']
result = remove(data, path)
assert result == [1, {'a': 2}, 4]
assert result is not data
data = [1, 2, 3]
result = remove(data, 4)
assert result == [1, 2, 3]
class Storage:
    """Observable holder of JSON data

    Keeps a single JSON value and exposes path based get/set/remove
    operations on it. `set` and `remove` replace the stored value with the
    result of module level `set_`/`remove` and afterwards notify registered
    change callbacks with the new value.

    """

    def __init__(self, data: Data = None):
        self._change_cbs = util.CallbackRegistry()
        self._data = data

    @property
    def data(self) -> Data:
        """Currently stored data"""
        return self._data

    def register_change_cb(self,
                           cb: typing.Callable[[Data], None]
                           ) -> util.RegisterCallbackHandle:
        """Register callback notified with new data on `set`/`remove`"""
        registry = self._change_cbs
        return registry.register(cb)

    def get(self, path: Path, default: typing.Optional[Data] = None):
        """Get stored data element referenced by path"""
        return get(self._data, path, default)

    def set(self, path: Path, value: Data):
        """Set stored data element referenced by path and notify"""
        self._replace(set_(self._data, path, value))

    def remove(self, path: Path):
        """Remove stored data element referenced by path and notify"""
        self._replace(remove(self._data, path))

    def _replace(self, data):
        # store new state before callbacks run
        self._data = data
        self._change_cbs.notify(self._data)
JSON data storage
Helper class representing observable JSON data state manipulated with path based get/set/remove functions.
209 def register_change_cb(self, 210 cb: typing.Callable[[Data], None] 211 ) -> util.RegisterCallbackHandle: 212 """Register data change callback""" 213 return self._change_cbs.register(cb)
Register data change callback
def get(self, path: Path, default: typing.Optional[Data] = None):
    """Get stored data element referenced by path

    Delegates to the module level ``get`` with the currently stored data.

    """
    current = self._data
    return get(current, path, default)
Get data
class Format(enum.Enum):
    """Supported encoding formats"""

    JSON = 'json'
    YAML = 'yaml'
    TOML = 'toml'
Encoding format
Inherited Members
- enum.Enum
- name
- value
def encode(data: Data,
           format: Format = Format.JSON,
           indent: typing.Optional[int] = None
           ) -> str:
    """Encode JSON data to string.

    In case of TOML format, data must be JSON Object. `indent` is not
    passed to the TOML encoder.

    Args:
        data: JSON data
        format: encoding format
        indent: indentation size

    Raises:
        ValueError: unsupported format

    """
    if format == Format.JSON:
        # allow_nan=False: NaN/Infinity are rejected instead of emitted
        return json.dumps(data, indent=indent, allow_nan=False)

    if format == Format.YAML:
        # use the C dumper when this yaml build provides it
        try:
            dumper = yaml.CSafeDumper
        except AttributeError:
            dumper = yaml.SafeDumper
        encoded = yaml.dump(data, indent=indent, Dumper=dumper)
        return str(encoded)

    if format == Format.TOML:
        return tomli_w.dumps(data)

    raise ValueError('unsupported format')
Encode JSON data.
In case of TOML format, data must be JSON Object.
Arguments:
- data: JSON data
- format: encoding format
- indent: indentation size
def decode(data_str: str,
           format: Format = Format.JSON
           ) -> Data:
    """Decode JSON data from string.

    Args:
        data_str: encoded JSON data
        format: encoding format

    Raises:
        ValueError: unsupported format

    """
    if format == Format.JSON:
        return json.loads(data_str)

    if format == Format.YAML:
        # use the C loader when this yaml build provides it
        try:
            loader = yaml.CSafeLoader
        except AttributeError:
            loader = yaml.SafeLoader
        stream = io.StringIO(data_str)
        return yaml.load(stream, Loader=loader)

    if format == Format.TOML:
        return tomli.loads(data_str)

    raise ValueError('unsupported format')
Decode JSON data.
Arguments:
- data_str: encoded JSON data
- format: encoding format
def get_file_format(path: pathlib.PurePath) -> Format:
    """Detect file format based on path suffix

    Recognized suffixes are '.json', '.yaml', '.yml' and '.toml'
    (compared case sensitively).

    Raises:
        ValueError: suffix is not recognized

    """
    formats = {'.json': Format.JSON,
               '.yaml': Format.YAML,
               '.yml': Format.YAML,
               '.toml': Format.TOML}

    detected = formats.get(path.suffix)
    if detected is None:
        raise ValueError('can not determine format from path suffix')

    return detected
Detect file format based on path suffix
def encode_file(data: Data,
                path: pathlib.PurePath,
                format: typing.Optional[Format] = None,
                indent: typing.Optional[int] = 4):
    """Encode JSON data to file.

    If `format` is ``None``, encoding format is derived from path suffix.

    In case of TOML format, data must be JSON Object.

    Args:
        data: JSON data
        path: file path
        format: encoding format
        indent: indentation size

    """
    if format is None:
        format = get_file_format(path)

    # TOML is written to a binary stream, other formats as UTF-8 text
    if format == Format.TOML:
        mode, encoding = 'wb', None
    else:
        mode, encoding = 'w', 'utf-8'

    with open(path, mode, encoding=encoding) as stream:
        encode_stream(data, stream, format, indent)
Encode JSON data to file.
If `format` is `None`, encoding format is derived from path suffix.
In case of TOML format, data must be JSON Object.
Arguments:
- data: JSON data
- path: file path
- format: encoding format
- indent: indentation size
def decode_file(path: pathlib.PurePath,
                format: typing.Optional[Format] = None
                ) -> Data:
    """Decode JSON data from file.

    If `format` is ``None``, encoding format is derived from path suffix.

    Args:
        path: file path
        format: encoding format

    """
    if format is None:
        format = get_file_format(path)

    # TOML is read from a binary stream, other formats as UTF-8 text
    if format == Format.TOML:
        mode, encoding = 'rb', None
    else:
        mode, encoding = 'r', 'utf-8'

    with open(path, mode, encoding=encoding) as stream:
        return decode_stream(stream, format)
Decode JSON data from file.
If `format` is `None`, encoding format is derived from path suffix.
Arguments:
- path: file path
- format: encoding format
def encode_stream(data: Data,
                  stream: typing.Union[io.TextIOBase, io.RawIOBase],
                  format: Format = Format.JSON,
                  indent: typing.Optional[int] = 4):
    """Encode JSON data to stream.

    In case of TOML format, data must be JSON Object.

    In case of TOML format, `stream` should be `io.RawIOBase`. For
    other formats, `io.TextIOBase` is expected. `indent` is not passed to
    the TOML encoder.

    Args:
        data: JSON data
        stream: output stream
        format: encoding format
        indent: indentation size

    Raises:
        ValueError: unsupported format

    """
    if format == Format.JSON:
        json.dump(data, stream, indent=indent, allow_nan=False)
        return

    if format == Format.YAML:
        # use the C dumper when this yaml build provides it
        try:
            dumper = yaml.CSafeDumper
        except AttributeError:
            dumper = yaml.SafeDumper
        # document is written with explicit start and end markers
        yaml.dump(data, stream, indent=indent, Dumper=dumper,
                  explicit_start=True, explicit_end=True)
        return

    if format == Format.TOML:
        tomli_w.dump(data, stream)
        return

    raise ValueError('unsupported format')
Encode JSON data to stream.
In case of TOML format, data must be JSON Object.
In case of TOML format, `stream` should be `io.RawIOBase`. For other formats, `io.TextIOBase` is expected.
Arguments:
- data: JSON data
- stream: output stream
- format: encoding format
- indent: indentation size
def decode_stream(stream: typing.Union[io.TextIOBase, io.RawIOBase],
                  format: Format = Format.JSON
                  ) -> Data:
    """Decode JSON data from stream.

    In case of TOML format, `stream` should be `io.RawIOBase`. For
    other formats, `io.TextIOBase` is expected.

    Args:
        stream: input stream
        format: encoding format

    Raises:
        ValueError: unsupported format

    """
    if format == Format.JSON:
        decoded = json.load(stream)

    elif format == Format.YAML:
        # use the C loader when this yaml build provides it
        try:
            loader = yaml.CSafeLoader
        except AttributeError:
            loader = yaml.SafeLoader
        decoded = yaml.load(stream, Loader=loader)

    elif format == Format.TOML:
        decoded = tomli.load(stream)

    else:
        raise ValueError('unsupported format')

    return decoded
Decode JSON data from stream.
In case of TOML format, `stream` should be `io.RawIOBase`. For other formats, `io.TextIOBase` is expected.
Arguments:
- stream: input stream
- format: encoding format
def diff(src: Data,
         dst: Data
         ) -> Data:
    """Generate JSON Patch describing the change from `src` to `dst`.

    Example::

        src = [1, {'a': 2}, 3]
        dst = [1, {'a': 4}, 3]
        result = diff(src, dst)
        assert result == [{'op': 'replace', 'path': '/1/a', 'value': 4}]

    """
    json_patch = jsonpatch.JsonPatch.from_diff(src, dst)
    return json_patch.patch
Generate JSON Patch diff.
Example::
src = [1, {'a': 2}, 3]
dst = [1, {'a': 4}, 3]
result = diff(src, dst)
assert result == [{'op': 'replace', 'path': '/1/a', 'value': 4}]
def patch(data: Data,
          diff: Data
          ) -> Data:
    """Apply JSON Patch `diff` to `data` and return the result.

    Example::

        data = [1, {'a': 2}, 3]
        d = [{'op': 'replace', 'path': '/1/a', 'value': 4}]
        result = patch(data, d)
        assert result == [1, {'a': 4}, 3]

    """
    patched = jsonpatch.apply_patch(data, diff)
    return patched
Apply JSON Patch diff.
Example::
data = [1, {'a': 2}, 3]
d = [{'op': 'replace', 'path': '/1/a', 'value': 4}]
result = patch(data, d)
assert result == [1, {'a': 4}, 3]
class SchemaRepository:
    """JSON Schema repository.

    A repository that holds json schemas and enables validation against them.

    Repository can be initialized with multiple arguments, which can be
    instances of ``pathlib.PurePath``, ``Data`` or ``SchemaRepository``.

    If an argument is of type ``pathlib.PurePath``, and path points to file
    with a suffix '.json', '.yml' or '.yaml', json serializable data is decoded
    from the file. Otherwise, it is assumed that path points to a directory,
    which is recursively searched for json and yaml files. All decoded schemas
    are added to the repository. If a schema with the same `id` was previously
    added, an exception is raised.

    If an argument is of type ``Data``, it should be a json serializable data
    representation of a JSON schema. If a schema with the same `id` was
    previously added, an exception is raised.

    If an argument is of type ``SchemaRepository``, its schemas are added to
    the new repository. Previously added schemas with the same `id` are
    replaced. The source repository is not modified by schemas added to the
    new repository afterwards.

    """

    def __init__(self, *args: typing.Union[pathlib.PurePath,
                                           Data,
                                           'SchemaRepository']):
        # validators keyed by validator class; values are weakly referenced
        self._validators = weakref.WeakValueDictionary()
        # {uri scheme: {netloc + path: schema}}
        self._data = {}
        for arg in args:
            if isinstance(arg, pathlib.PurePath):
                self._load_path(arg)
            elif isinstance(arg, SchemaRepository):
                self._load_repository(arg)
            else:
                self._load_schema(arg)

    def get_uri_schemes(self) -> typing.Iterable[str]:
        """Get URI schemes stored in repository"""
        return self._data.keys()

    def get_schema_ids(self,
                       uri_schemes: typing.Optional[typing.Iterable[str]] = None  # NOQA
                       ) -> typing.Iterable[str]:
        """Get schema ids stored in repository

        If `uri_schemes` is ``None``, all schema ids are returned. Otherwise,
        only schema ids that have one of provided URI scheme are returned.
        Ids are generated lazily as ``<scheme>://<netloc + path>``.

        """
        if uri_schemes is None:
            uri_schemes = self._data.keys()

        for uri_scheme in uri_schemes:
            schemas = self._data.get(uri_scheme)
            if not schemas:
                continue

            for path in schemas.keys():
                yield f'{uri_scheme}://{path}'

    def get_schema(self, schema_id: str) -> Data:
        """Get stored schema based on schema id

        Only scheme, netloc and path of `schema_id` are used for lookup.

        Raises:
            KeyError: schema is not stored in repository

        """
        uri = urllib.parse.urlparse(schema_id)
        path = uri.netloc + uri.path
        return self._data[uri.scheme][path]

    def validate(self,
                 schema_id: str,
                 data: Data,
                 validator_cls: typing.Type[Validator] = DefaultValidator):
        """Validate data against JSON schema.

        Args:
            schema_id: JSON schema identifier
            data: data to be validated
            validator_cls: validator implementation

        Raises:
            Exception

        """
        validator = self._validators.get(validator_cls)
        if validator is None:
            validator = validator_cls(self)
            self._validators[validator_cls] = validator

        validator.validate(schema_id, data)

    def to_json(self) -> Data:
        """Export repository content as json serializable data.

        Entire repository content is exported as json serializable data.
        New repository can be created from the exported content by using
        :meth:`SchemaRepository.from_json`.

        Returned object is the repository's internal structure, not a copy.

        """
        return self._data

    @staticmethod
    def from_json(data: typing.Union[pathlib.PurePath,
                                     Data]
                  ) -> 'SchemaRepository':
        """Create new repository from content exported as json serializable
        data.

        Creates a new repository from content of another repository that was
        exported by using :meth:`SchemaRepository.to_json`.

        If `data` is ``pathlib.PurePath``, content is decoded from that file.
        Provided (or decoded) data is used as repository content directly,
        without copying.

        Args:
            data: repository data

        """
        if isinstance(data, pathlib.PurePath):
            data = decode_file(data)
        repo = SchemaRepository()
        repo._data = data
        return repo

    def _load_path(self, path):
        # single schema file, or directory searched recursively per suffix
        json_suffixes = {'.json', '.yaml', '.yml'}
        paths = ([path] if path.suffix in json_suffixes
                 else list(itertools.chain.from_iterable(
                    path.rglob(f'*{i}') for i in json_suffixes)))
        for schema_path in paths:
            schema = decode_file(schema_path)
            self._load_schema(schema)

    def _load_schema(self, schema):
        if '$schema' in schema:
            meta_schema_id = urllib.parse.urldefrag(schema['$schema']).url
            if meta_schema_id not in _meta_schema_ids:
                # drop unknown meta schema reference on a copy, leaving the
                # caller's schema untouched
                schema = dict(schema)
                del schema['$schema']

        uri = urllib.parse.urlparse(schema['id'])
        path = uri.netloc + uri.path
        if uri.scheme not in self._data:
            self._data[uri.scheme] = {}
        if path in self._data[uri.scheme]:
            raise Exception(f"duplicate schema id {uri.scheme}://{path}")
        self._data[uri.scheme][path] = schema

    def _load_repository(self, repo):
        for uri_scheme, schemas in repo._data.items():
            # merge into a dict owned by this repository; storing `schemas`
            # itself would alias the other repository's state, so schemas
            # added here later would leak into `repo`
            self._data.setdefault(uri_scheme, {}).update(schemas)
JSON Schema repository.
A repository that holds json schemas and enables validation against them.
Repository can be initialized with multiple arguments, which can be instances of `pathlib.PurePath`, `Data` or `SchemaRepository`.
If an argument is of type `pathlib.PurePath`, and path points to file with a suffix '.json', '.yml' or '.yaml', json serializable data is decoded from the file. Otherwise, it is assumed that path points to a directory, which is recursively searched for json and yaml files. All decoded schemas are added to the repository. If a schema with the same `id` was previously added, an exception is raised.
If an argument is of type `Data`, it should be a json serializable data representation of a JSON schema. If a schema with the same `id` was previously added, an exception is raised.
If an argument is of type `SchemaRepository`, its schemas are added to the new repository. Previously added schemas with the same `id` are replaced.
def __init__(self, *args: typing.Union[pathlib.PurePath,
                                       Data,
                                       'SchemaRepository']):
    """Create repository and load schemas from all arguments in order

    Paths are loaded from file system, repositories are merged in and
    any other argument is treated as a single schema.

    """
    self._validators = weakref.WeakValueDictionary()
    self._data = {}

    for source in args:
        if isinstance(source, pathlib.PurePath):
            load = self._load_path
        elif isinstance(source, SchemaRepository):
            load = self._load_repository
        else:
            load = self._load_schema
        load(source)
def get_uri_schemes(self) -> typing.Iterable[str]:
    """Get URI schemes of schemas stored in repository

    Result is the key view of the internal scheme mapping.

    """
    schemes = self._data.keys()
    return schemes
Get URI schemes stored in repository
def get_schema_ids(self,
                   uri_schemes: typing.Optional[typing.Iterable[str]] = None  # NOQA
                   ) -> typing.Iterable[str]:
    """Lazily generate ids of schemas stored in repository

    With `uri_schemes` set to ``None``, ids of all stored schemas are
    generated. Otherwise, only ids belonging to one of provided URI
    schemes are generated; schemes without stored schemas are skipped.

    """
    schemes = self._data.keys() if uri_schemes is None else uri_schemes

    for scheme in schemes:
        stored = self._data.get(scheme)
        if stored:
            yield from (f'{scheme}://{path}' for path in stored.keys())
Get schema ids stored in repository
If `uri_schemes` is `None`, all schema ids are returned. Otherwise, only schema ids that have one of the provided URI schemes are returned.
def get_schema(self, schema_id: str) -> Data:
    """Get stored schema based on schema id

    Schema is looked up by URI scheme and by netloc joined with path;
    other URI parts are not used for lookup.

    """
    parsed = urllib.parse.urlparse(schema_id)
    location = parsed.netloc + parsed.path
    schemas = self._data[parsed.scheme]
    return schemas[location]
Get stored schema based on schema id
def validate(self,
             schema_id: str,
             data: Data,
             validator_cls: typing.Type[Validator] = DefaultValidator):
    """Validate data against JSON schema.

    Validator instance is looked up in a weak-valued cache keyed by
    `validator_cls` and created for this repository when not available.

    Args:
        schema_id: JSON schema identifier
        data: data to be validated
        validator_cls: validator implementation

    Raises:
        Exception

    """
    cache = self._validators
    validator = cache.get(validator_cls)

    if validator is None:
        validator = cache[validator_cls] = validator_cls(self)

    validator.validate(schema_id, data)
Validate data against JSON schema.
Arguments:
- schema_id: JSON schema identifier
- data: data to be validated
- validator_cls: validator implementation
Raises:
- Exception
def to_json(self) -> Data:
    """Export repository content as json serializable data.

    Exported content can be passed to
    :meth:`SchemaRepository.from_json` to create a new repository.
    Returned object is the repository's internal structure, not a copy.

    """
    content = self._data
    return content
Export repository content as json serializable data.
Entire repository content is exported as json serializable data. New repository can be created from the exported content by using `SchemaRepository.from_json()`.
@staticmethod
def from_json(data: typing.Union[pathlib.PurePath,
                                 Data]
              ) -> 'SchemaRepository':
    """Create new repository from exported repository content.

    Counterpart of :meth:`SchemaRepository.to_json`. When `data` is
    ``pathlib.PurePath``, content is first decoded from that file.
    Content is used by the new repository directly, without copying.

    Args:
        data: repository data

    """
    is_path = isinstance(data, pathlib.PurePath)
    content = decode_file(data) if is_path else data

    repo = SchemaRepository()
    repo._data = content
    return repo
Create new repository from content exported as json serializable data.
Creates a new repository from content of another repository that was exported by using `SchemaRepository.to_json()`.
Arguments:
- data: repository data
class Validator(typing.Protocol):
    """Interface implemented by JSON Schema validators

    Args:
        repo: repository containing JSON Schemas

    """

    def __init__(self, repo: Repository):
        ...

    def validate(self, schema_id: str, data: Data):
        """Validate data against JSON Schema.

        Implementations signal invalid data by raising an exception.

        Args:
            schema_id: JSON schema identifier
            data: data to be validated

        Raises:
            Exception

        """
        ...
JSON Schema validator interface
Arguments:
- repo: repository containing JSON Schemas
def _no_init_or_replace_init(self, *args, **kwargs):
    """Placeholder ``__init__`` that resolves the real one on first use.

    Raises ``TypeError`` when the instance's class is flagged as a
    protocol. Otherwise, if this function is still installed as the
    class' ``__init__``, the MRO is searched for the first ``__init__``
    defined directly on a base that is not this function; that ``__init__``
    is stored on the class and called with the provided arguments.

    """
    cls = type(self)

    if cls._is_protocol:
        raise TypeError('Protocols cannot be instantiated')

    # Already using a custom `__init__`. No need to calculate correct
    # `__init__` to call. This can lead to RecursionError. See bpo-45121.
    if cls.__init__ is not _no_init_or_replace_init:
        return

    # Initially, `__init__` of a protocol subclass is set to `_no_init_or_replace_init`.
    # The first instantiation of the subclass will call `_no_init_or_replace_init` which
    # searches for a proper new `__init__` in the MRO. The new `__init__`
    # replaces the subclass' old `__init__` (ie `_no_init_or_replace_init`). Subsequent
    # instantiation of the protocol subclass will thus use the new
    # `__init__` and no longer call `_no_init_or_replace_init`.
    for base in cls.__mro__:
        # look only at `__init__` defined directly on `base`
        init = base.__dict__.get('__init__', _no_init_or_replace_init)
        if init is not _no_init_or_replace_init:
            cls.__init__ = init
            break
    else:
        # should not happen
        cls.__init__ = object.__init__

    cls.__init__(self, *args, **kwargs)
39 def validate(self, schema_id: str, data: Data): 40 """Validate data against JSON Schema. 41 42 Args: 43 schema_id: JSON schema identifier 44 data: data to be validated 45 46 Raises: 47 Exception 48 49 """
Validate data against JSON Schema.
Arguments:
- schema_id: JSON schema identifier
- data: data to be validated
Raises:
- Exception
class JsonSchemaValidator:
    """Validator based on the ``jsonschema`` package

    Args:
        repo: repository containing JSON Schemas

    """

    def __init__(self, repo: Repository):
        self._repo = repo

    def validate(self, schema_id: str, data: Data):
        """Validate data against JSON Schema.

        Schema is taken from the repository; URI fragment of `schema_id`
        selects the part of that schema used for validation. References
        with URI schemes known to the repository are resolved through the
        repository.

        Args:
            schema_id: JSON schema identifier
            data: data to be validated

        """
        uri = urllib.parse.urlparse(schema_id)
        location = uri.netloc + uri.path

        base_uri = f'{uri.scheme}://{location}'
        referrer = self._repo.get_schema(schema_id)
        handlers = {scheme: self._repo.get_schema
                    for scheme in self._repo.get_uri_schemes()}
        resolver = jsonschema.RefResolver(base_uri=base_uri,
                                          referrer=referrer,
                                          handlers=handlers)

        schema = resolver.resolve_fragment(resolver.referrer, uri.fragment)
        jsonschema.validate(instance=data,
                            schema=schema,
                            resolver=resolver)
def validate(self, schema_id: str, data: Data):
    """Validate data against JSON Schema.

    Schema is taken from the repository; URI fragment of `schema_id`
    selects the part of that schema used for validation. References with
    URI schemes known to the repository are resolved through the
    repository.

    Args:
        schema_id: JSON schema identifier
        data: data to be validated

    """
    uri = urllib.parse.urlparse(schema_id)
    location = uri.netloc + uri.path

    base_uri = f'{uri.scheme}://{location}'
    referrer = self._repo.get_schema(schema_id)
    handlers = {scheme: self._repo.get_schema
                for scheme in self._repo.get_uri_schemes()}
    resolver = jsonschema.RefResolver(base_uri=base_uri,
                                      referrer=referrer,
                                      handlers=handlers)

    schema = resolver.resolve_fragment(resolver.referrer, uri.fragment)
    jsonschema.validate(instance=data,
                        schema=schema,
                        resolver=resolver)