hat.json
JSON Data library
1"""JSON Data library""" 2 3from hat.json.data import (Array, 4 Object, 5 Data, 6 equals, 7 clone, 8 flatten) 9from hat.json.path import (Path, 10 get, 11 set_, 12 remove, 13 Storage) 14from hat.json.encoder import (Format, 15 encode, 16 decode, 17 get_file_format, 18 encode_file, 19 decode_file, 20 encode_stream, 21 decode_stream) 22from hat.json.patch import (diff, 23 patch) 24from hat.json.repository import (SchemaRepository, 25 json_schema_repo) 26from hat.json.validator import (Validator, 27 DefaultValidator, 28 JsonSchemaValidator) 29from hat.json import vt 30 31 32__all__ = ['Array', 33 'Object', 34 'Data', 35 'equals', 36 'clone', 37 'flatten', 38 'Path', 39 'get', 40 'set_', 41 'remove', 42 'Storage', 43 'Format', 44 'encode', 45 'decode', 46 'get_file_format', 47 'encode_file', 48 'decode_file', 49 'encode_stream', 50 'decode_stream', 51 'diff', 52 'patch', 53 'SchemaRepository', 54 'json_schema_repo', 55 'Validator', 56 'DefaultValidator', 57 'JsonSchemaValidator', 58 'vt']
def equals(a: Data,
           b: Data
           ) -> bool:
    """Equality comparison of json serializable data.

    Tests for equality of data according to JSON format. Notably, ``bool``
    values are never considered equal to numeric values, unlike plain
    ``==`` which treats `False` as `0`/`0.0` and `True` as `1`/`1.0`.

    Example::

        assert equals(0, 0.0) is True
        assert equals({'a': 1, 'b': 2}, {'b': 2, 'a': 1}) is True
        assert equals(1, True) is False

    """
    # exactly one of the two being bool means "not JSON-equal"
    a_is_bool = isinstance(a, bool)
    b_is_bool = isinstance(b, bool)
    if a_is_bool is not b_is_bool:
        return False

    if a != b:
        return False

    # `a == b` already held, so containers share keys/length - recurse to
    # catch nested bool/number mismatches that `==` glosses over
    if isinstance(a, dict):
        return all(equals(value, b[key]) for key, value in a.items())

    if isinstance(a, list):
        return all(equals(x, y) for x, y in zip(a, b))

    return True
Equality comparison of json serializable data.
Tests for equality of data according to JSON format. Notably, bool
values are not considered equal to numeric values in any case. This is
different from default equality comparison, which considers `False` equal to `0` and `0.0`; and `True` equal to `1` and `1.0`.
Example::
assert equals(0, 0.0) is True
assert equals({'a': 1, 'b': 2}, {'b': 2, 'a': 1}) is True
assert equals(1, True) is False
def clone(data: Data) -> Data:
    """Deep clone data

    Recursively builds fresh instances of array and object containers so
    that the result is equal (as json data) to the input while sharing no
    mutable containers with it.

    Example::

        x = {'a': [1, 2, 3]}
        y = clone(x)
        assert x is not y
        assert x['a'] is not y['a']
        assert equals(x, y)

    """
    if isinstance(data, dict):
        return {key: clone(value) for key, value in data.items()}

    if isinstance(data, list):
        return [clone(item) for item in data]

    # scalars (None/bool/int/float/str) are immutable - reuse as-is
    return data
Deep clone data
This function recursively creates new instances of array and object data based on input data. Resulting json data is equal to provided data.
Example::
x = {'a': [1, 2, 3]}
y = clone(x)
assert x is not y
assert x['a'] is not y['a']
assert equals(x, y)
def flatten(data: Data
            ) -> typing.Iterable[Data]:
    """Flatten JSON data

    Arrays are traversed recursively and their leaves yielded one by one;
    any non-array input (including objects) is yielded unchanged.

    Example::

        data = [1, [], [2], {'a': [3]}]
        result = [1, 2, {'a': [3]}]
        assert list(flatten(data)) == result

    """
    if not isinstance(data, list):
        yield data
        return

    for item in data:
        yield from flatten(item)
def get(data: Data,
        path: Path,
        default: Data | None = None
        ) -> Data:
    """Get data element referenced by path

    Returns `default` when the path does not exist in `data`.

    Example::

        data = {'a': [1, 2, [3, 4]]}
        path = ['a', 2, 0]
        assert get(data, path) == 3

        data = [1, 2, 3]
        assert get(data, 0) == 1
        assert get(data, 5) is None
        assert get(data, 5, default=123) == 123

    """
    current = data

    for key in flatten(path):
        if isinstance(key, str):
            if not isinstance(current, dict) or key not in current:
                return default
            current = current[key]
            continue

        # bool is an int subclass - it is rejected as a path element
        if isinstance(key, int) and not isinstance(key, bool):
            if not isinstance(current, list):
                return default
            try:
                current = current[key]
            except IndexError:
                return default
            continue

        raise ValueError('invalid path')

    return current
Get data element referenced by path
Example::
data = {'a': [1, 2, [3, 4]]}
path = ['a', 2, 0]
assert get(data, path) == 3
data = [1, 2, 3]
assert get(data, 0) == 1
assert get(data, 5) is None
assert get(data, 5, default=123) == 123
def set_(data: Data,
         path: Path,
         value: Data
         ) -> Data:
    """Create new data by setting data path element value

    Input `data` is not mutated; copies are made along the path and
    missing list indexes are padded with ``None``.

    Example::

        data = [1, {'a': 2, 'b': 3}, 4]
        path = [1, 'b']
        result = set_(data, path, 5)
        assert result == [1, {'a': 2, 'b': 5}, 4]
        assert result is not data

        data = [1, 2, 3]
        result = set_(data, 4, 4)
        assert result == [1, 2, 3, None, 4]

    """
    # first pass: walk down the path, recording each (container, key)
    # pair so the structure can be rebuilt bottom-up afterwards
    parents = collections.deque()

    for i in flatten(path):
        parent = data

        if isinstance(i, str):
            # descending into an object; None if key/type mismatch
            data = data.get(i) if isinstance(data, dict) else None

        elif isinstance(i, int) and not isinstance(i, bool):
            # bool is excluded explicitly because it subclasses int
            try:
                data = data[i] if isinstance(data, list) else None
            except IndexError:
                data = None

        else:
            raise ValueError('invalid path')

        parents.append((parent, i))

    # second pass: rebuild from the deepest level up - each iteration
    # produces a fresh copy of one container with `value` spliced in,
    # which then becomes the `value` for the next (shallower) level
    while parents:
        parent, i = parents.pop()

        if isinstance(i, str):
            # copy existing object, or start a new one if the original
            # path element was not an object
            parent = dict(parent) if isinstance(parent, dict) else {}
            parent[i] = value

        elif isinstance(i, int) and not isinstance(i, bool):
            if not isinstance(parent, list):
                parent = []

            if i >= len(parent):
                # pad at the end with None so that index i exists
                parent = [*parent,
                          *itertools.repeat(None, i - len(parent) + 1)]

            elif i < 0 and (-i) > len(parent):
                # negative index reaching beyond the start - pad front
                parent = [*itertools.repeat(None, (-i) - len(parent)),
                          *parent]

            else:
                parent = list(parent)

            parent[i] = value

        else:
            raise ValueError('invalid path')

        value = parent

    return value
Create new data by setting data path element value
Example::
data = [1, {'a': 2, 'b': 3}, 4]
path = [1, 'b']
result = set_(data, path, 5)
assert result == [1, {'a': 2, 'b': 5}, 4]
assert result is not data
data = [1, 2, 3]
result = set_(data, 4, 4)
assert result == [1, 2, 3, None, 4]
def remove(data: Data,
           path: Path
           ) -> Data:
    """Create new data by removing part of data referenced by path

    Input `data` is not mutated; when the path does not exist, the input
    is returned unchanged.

    Example::

        data = [1, {'a': 2, 'b': 3}, 4]
        path = [1, 'b']
        result = remove(data, path)
        assert result == [1, {'a': 2}, 4]
        assert result is not data

        data = [1, 2, 3]
        result = remove(data, 4)
        assert result == [1, 2, 3]

    """
    result = data
    # (container, key) pairs recorded while descending, used to rebuild
    # the structure bottom-up after the removal
    parents = collections.deque()

    for i in flatten(path):
        parent = data

        if isinstance(i, str):
            # missing key: nothing to remove, return original data
            if not isinstance(data, dict) or i not in data:
                return result
            data = data[i]

        elif isinstance(i, int) and not isinstance(i, bool):
            # bool is excluded explicitly because it subclasses int
            if not isinstance(data, list):
                return result
            try:
                data = data[i]
            except IndexError:
                return result

        else:
            raise ValueError('invalid path')

        parents.append((parent, i))

    # sentinel: None marks the deepest level, where the element is
    # deleted instead of replaced
    result = None

    while parents:
        parent, i = parents.pop()

        if isinstance(i, str):
            # shallow-copy each container on the path (copy-on-write)
            parent = dict(parent)

        elif isinstance(i, int) and not isinstance(i, bool):
            parent = list(parent)

        else:
            raise ValueError('invalid path')

        if result is None:
            del parent[i]

        else:
            parent[i] = result

        result = parent

    return result
Create new data by removing part of data referenced by path
Example::
data = [1, {'a': 2, 'b': 3}, 4]
path = [1, 'b']
result = remove(data, path)
assert result == [1, {'a': 2}, 4]
assert result is not data
data = [1, 2, 3]
result = remove(data, 4)
assert result == [1, 2, 3]
class Storage:
    """JSON data storage

    Observable wrapper around a single JSON data state, manipulated with
    the path based get/set/remove functions of this module.

    """

    def __init__(self, data: Data = None):
        self._data = data
        self._change_cbs = util.CallbackRegistry()

    @property
    def data(self) -> Data:
        """Current data state"""
        return self._data

    def register_change_cb(self,
                           cb: typing.Callable[[Data], None]
                           ) -> util.RegisterCallbackHandle:
        """Register data change callback"""
        return self._change_cbs.register(cb)

    def get(self, path: Path, default: Data | None = None):
        """Get data element referenced by `path` (see module-level `get`)"""
        return get(self._data, path, default)

    def set(self, path: Path, value: Data):
        """Set `value` at `path` and notify registered change callbacks"""
        new_data = set_(self._data, path, value)
        self._data = new_data
        self._change_cbs.notify(new_data)

    def remove(self, path: Path):
        """Remove element at `path` and notify registered change callbacks"""
        new_data = remove(self._data, path)
        self._data = new_data
        self._change_cbs.notify(new_data)
JSON data storage
Helper class representing observable JSON data state manipulated with path based get/set/remove functions.
Data
208 def register_change_cb(self, 209 cb: typing.Callable[[Data], None] 210 ) -> util.RegisterCallbackHandle: 211 """Register data change callback""" 212 return self._change_cbs.register(cb)
Register data change callback
214 def get(self, path: Path, default: Data | None = None): 215 """Get data""" 216 return get(self._data, path, default)
Get data
218 def set(self, path: Path, value: Data): 219 """Set data""" 220 self._data = set_(self._data, path, value) 221 self._change_cbs.notify(self._data)
Set data
class Format(enum.Enum):
    """Supported encoding formats"""

    JSON = 'json'   # JavaScript Object Notation
    YAML = 'yaml'   # YAML Ain't Markup Language
    TOML = 'toml'   # Tom's Obvious Minimal Language
Encoding format
Inherited Members
- enum.Enum
- name
- value
def encode(data: Data,
           format: Format = Format.JSON,
           indent: int | None = None
           ) -> str:
    """Encode JSON data.

    In case of TOML format, data must be JSON Object.

    Args:
        data: JSON data
        format: encoding format
        indent: indentation size

    """
    if format == Format.TOML:
        return tomli_w.dumps(data)

    if format == Format.YAML:
        # prefer the C-accelerated dumper when libyaml is available
        dumper = getattr(yaml, 'CSafeDumper', yaml.SafeDumper)
        return str(yaml.dump(data, indent=indent, Dumper=dumper))

    if format == Format.JSON:
        return json.dumps(data, indent=indent, allow_nan=False)

    raise ValueError('unsupported format')
Encode JSON data.
In case of TOML format, data must be JSON Object.
Arguments:
- data: JSON data
- format: encoding format
- indent: indentation size
51def decode(data_str: str, 52 format: Format = Format.JSON 53 ) -> Data: 54 """Decode JSON data. 55 56 Args: 57 data_str: encoded JSON data 58 format: encoding format 59 60 """ 61 if format == Format.JSON: 62 return json.loads(data_str) 63 64 if format == Format.YAML: 65 loader = (yaml.CSafeLoader if hasattr(yaml, 'CSafeLoader') 66 else yaml.SafeLoader) 67 return yaml.load(io.StringIO(data_str), Loader=loader) 68 69 if format == Format.TOML: 70 return tomli.loads(data_str) 71 72 raise ValueError('unsupported format')
Decode JSON data.
Arguments:
- data_str: encoded JSON data
- format: encoding format
75def get_file_format(path: pathlib.PurePath) -> Format: 76 """Detect file format based on path suffix""" 77 if path.suffix == '.json': 78 return Format.JSON 79 80 if path.suffix in ('.yaml', '.yml'): 81 return Format.YAML 82 83 if path.suffix == '.toml': 84 return Format.TOML 85 86 raise ValueError('can not determine format from path suffix')
Detect file format based on path suffix
def encode_file(data: Data,
                path: pathlib.PurePath,
                format: Format | None = None,
                indent: int | None = 4):
    """Encode JSON data to file.

    If `format` is ``None``, encoding format is derived from path suffix.

    In case of TOML format, data must be JSON Object.

    Args:
        data: JSON data
        path: file path
        format: encoding format
        indent: indentation size

    """
    fmt = get_file_format(path) if format is None else format

    # TOML writer requires a binary stream; all other formats are utf-8 text
    if fmt == Format.TOML:
        mode, encoding = 'wb', None
    else:
        mode, encoding = 'w', 'utf-8'

    with open(path, mode, encoding=encoding) as f:
        encode_stream(data, f, fmt, indent)
Encode JSON data to file.
If `format` is `None`, encoding format is derived from path suffix.
In case of TOML format, data must be JSON Object.
Arguments:
- data: JSON data
- path: file path
- format: encoding format
- indent: indentation size
def decode_file(path: pathlib.PurePath,
                format: Format | None = None
                ) -> Data:
    """Decode JSON data from file.

    If `format` is ``None``, encoding format is derived from path suffix.

    Args:
        path: file path
        format: encoding format

    """
    fmt = get_file_format(path) if format is None else format

    # TOML reader requires a binary stream; all other formats are utf-8 text
    if fmt == Format.TOML:
        mode, encoding = 'rb', None
    else:
        mode, encoding = 'r', 'utf-8'

    with open(path, mode, encoding=encoding) as f:
        return decode_stream(f, fmt)
Decode JSON data from file.
If `format` is `None`, encoding format is derived from path suffix.
Arguments:
- path: file path
- format: encoding format
def encode_stream(data: Data,
                  stream: io.TextIOBase | io.RawIOBase,
                  format: Format = Format.JSON,
                  indent: int | None = 4):
    """Encode JSON data to stream.

    In case of TOML format, data must be JSON Object and `stream` should
    be `io.RawIOBase`; for other formats `io.TextIOBase` is expected.

    Args:
        data: JSON data
        stream: output stream
        format: encoding format
        indent: indentation size

    """
    if format == Format.JSON:
        json.dump(data, stream, indent=indent, allow_nan=False)
        return

    if format == Format.YAML:
        # prefer the C-accelerated dumper when libyaml is available
        dumper = getattr(yaml, 'CSafeDumper', yaml.SafeDumper)
        yaml.dump(data, stream, indent=indent, Dumper=dumper,
                  explicit_start=True, explicit_end=True)
        return

    if format == Format.TOML:
        tomli_w.dump(data, stream)
        return

    raise ValueError('unsupported format')
Encode JSON data to stream.
In case of TOML format, data must be JSON Object.
In case of TOML format, `stream` should be `io.RawIOBase`. For other formats, `io.TextIOBase` is expected.
Arguments:
- data: JSON data
- stream: output stream
- format: encoding format
- indent: indentation size
def decode_stream(stream: io.TextIOBase | io.RawIOBase,
                  format: Format = Format.JSON
                  ) -> Data:
    """Decode JSON data from stream.

    In case of TOML format, `stream` should be `io.RawIOBase`; for other
    formats `io.TextIOBase` is expected.

    Args:
        stream: input stream
        format: encoding format

    """
    if format == Format.TOML:
        return tomli.load(stream)

    if format == Format.YAML:
        # prefer the C-accelerated loader when libyaml is available
        loader = getattr(yaml, 'CSafeLoader', yaml.SafeLoader)
        return yaml.load(stream, Loader=loader)

    if format == Format.JSON:
        return json.load(stream)

    raise ValueError('unsupported format')
Decode JSON data from stream.
In case of TOML format, `stream` should be `io.RawIOBase`. For other formats, `io.TextIOBase` is expected.
Arguments:
- stream: input stream
- format: encoding format
def diff(src: Data,
         dst: Data
         ) -> Data:
    """Generate JSON Patch diff.

    Example::

        src = [1, {'a': 2}, 3]
        dst = [1, {'a': 4}, 3]
        result = diff(src, dst)
        assert result == [{'op': 'replace', 'path': '/1/a', 'value': 4}]

    """
    # delegate diff calculation to jsonpatch; `.patch` is the json
    # serializable list of patch operations
    json_patch = jsonpatch.JsonPatch.from_diff(src, dst)
    return json_patch.patch
Generate JSON Patch diff.
Example::
src = [1, {'a': 2}, 3]
dst = [1, {'a': 4}, 3]
result = diff(src, dst)
assert result == [{'op': 'replace', 'path': '/1/a', 'value': 4}]
def patch(data: Data,
          diff: Data
          ) -> Data:
    """Apply JSON Patch diff.

    Example::

        data = [1, {'a': 2}, 3]
        d = [{'op': 'replace', 'path': '/1/a', 'value': 4}]
        result = patch(data, d)
        assert result == [1, {'a': 4}, 3]

    """
    patched = jsonpatch.apply_patch(data, diff)
    return patched
Apply JSON Patch diff.
Example::
data = [1, {'a': 2}, 3]
d = [{'op': 'replace', 'path': '/1/a', 'value': 4}]
result = patch(data, d)
assert result == [1, {'a': 4}, 3]
class SchemaRepository:
    """JSON Schema repository.

    A repository that holds json schemas and enables validation against them.

    Repository can be initialized with multiple arguments, which can be
    instances of ``pathlib.PurePath``, ``Data`` or ``SchemaRepository``.

    If an argument is of type ``pathlib.PurePath``, and path points to file
    with a suffix '.json', '.yml' or '.yaml', json serializable data is decoded
    from the file. Otherwise, it is assumed that path points to a directory,
    which is recursively searched for json and yaml files. All decoded schemas
    are added to the repository. If a schema with the same `id` was previously
    added, an exception is raised.

    If an argument is of type ``Data``, it should be a json serializable data
    representation of a JSON schema. If a schema with the same `id` was
    previously added, an exception is raised.

    If an argument is of type ``SchemaRepository``, its schemas are added to
    the new repository. Previously added schemas with the same `id` are
    replaced.

    """

    def __init__(self, *args: typing.Union[pathlib.PurePath,
                                           Data,
                                           'SchemaRepository']):
        # validator instances cached per validator class; weak values so
        # the cache does not keep validators alive on its own
        self._validators = weakref.WeakValueDictionary()
        # schema storage layout: {uri_scheme: {path: schema}}
        self._data = {}
        for arg in args:
            if isinstance(arg, pathlib.PurePath):
                self._load_path(arg)
            elif isinstance(arg, SchemaRepository):
                self._load_repository(arg)
            else:
                self._load_schema(arg)

    def get_uri_schemes(self) -> typing.Iterable[str]:
        """Get URI schemes stored in repository"""
        # NOTE(review): returns a live dict keys view over internal state
        return self._data.keys()

    def get_schema_ids(self,
                       uri_schemes: typing.Iterable[str] | None = None
                       ) -> typing.Iterable[str]:
        """Get schema ids stored in repository

        If `uri_schemes` is ``None``, all schema ids are returned. Otherwise,
        only schema ids that have one of provided URI scheme are returned.

        """
        if uri_schemes is None:
            uri_schemes = self._data.keys()

        for uri_scheme in uri_schemes:
            schemas = self._data.get(uri_scheme)
            if not schemas:
                # unknown schemes are silently skipped
                continue

            for path in schemas.keys():
                yield f'{uri_scheme}://{path}'

    def get_schema(self, schema_id: str) -> Data:
        """Get stored schema based on schema id

        Raises ``KeyError`` when the id is not stored in the repository.

        """
        uri = urllib.parse.urlparse(schema_id)
        path = uri.netloc + uri.path
        return self._data[uri.scheme][path]

    def validate(self,
                 schema_id: str,
                 data: Data,
                 validator_cls: typing.Type[Validator] = DefaultValidator):
        """Validate data against JSON schema.

        Args:
            schema_id: JSON schema identifier
            data: data to be validated
            validator_cls: validator implementation

        Raises:
            Exception

        """
        # lazily create and cache one validator instance per class
        validator = self._validators.get(validator_cls)
        if validator is None:
            validator = validator_cls(self)
            self._validators[validator_cls] = validator

        validator.validate(schema_id, data)

    def to_json(self) -> Data:
        """Export repository content as json serializable data.

        Entire repository content is exported as json serializable data.
        New repository can be created from the exported content by using
        :meth:`SchemaRepository.from_json`.

        """
        # NOTE(review): internal state is returned directly, not copied -
        # mutating the result mutates the repository
        return self._data

    @staticmethod
    def from_json(data: pathlib.PurePath | Data
                  ) -> 'SchemaRepository':
        """Create new repository from content exported as json serializable
        data.

        Creates a new repository from content of another repository that was
        exported by using :meth:`SchemaRepository.to_json`.

        Args:
            data: repository data

        """
        if isinstance(data, pathlib.PurePath):
            data = decode_file(data)
        repo = SchemaRepository()
        # adopt the exported content as-is, bypassing schema loading
        repo._data = data
        return repo

    def _load_path(self, path):
        # file with a recognized suffix is loaded directly; any other
        # path is treated as a directory and searched recursively
        json_suffixes = {'.json', '.yaml', '.yml'}
        paths = ([path] if path.suffix in json_suffixes
                 else list(itertools.chain.from_iterable(
                    path.rglob(f'*{i}') for i in json_suffixes)))
        for path in paths:
            schema = decode_file(path)
            self._load_schema(schema)

    def _load_schema(self, schema):
        # drop '$schema' declarations referencing unsupported meta
        # schemas (_meta_schema_ids is defined elsewhere in this module)
        if '$schema' in schema:
            meta_schema_id = urllib.parse.urldefrag(schema['$schema']).url
            if meta_schema_id not in _meta_schema_ids:
                schema = dict(schema)
                del schema['$schema']

        uri = urllib.parse.urlparse(schema['id'])
        path = uri.netloc + uri.path
        if uri.scheme not in self._data:
            self._data[uri.scheme] = {}
        if path in self._data[uri.scheme]:
            raise Exception(f"duplicate schema id {uri.scheme}://{path}")
        self._data[uri.scheme][path] = schema

    def _load_repository(self, repo):
        # merge another repository's schemas; same-id entries are
        # replaced (update semantics), unlike _load_schema which raises
        for k, v in repo._data.items():
            if k not in self._data:
                self._data[k] = v
            else:
                self._data[k].update(v)
JSON Schema repository.
A repository that holds json schemas and enables validation against them.
Repository can be initialized with multiple arguments, which can be
instances of `pathlib.PurePath`, `Data` or `SchemaRepository`.
If an argument is of type `pathlib.PurePath`, and path points to a file with a suffix '.json', '.yml' or '.yaml', json serializable data is decoded from the file. Otherwise, it is assumed that path points to a directory, which is recursively searched for json and yaml files. All decoded schemas are added to the repository. If a schema with the same `id` was previously added, an exception is raised.
If an argument is of type `Data`, it should be a json serializable data representation of a JSON schema. If a schema with the same `id` was previously added, an exception is raised.
If an argument is of type `SchemaRepository`, its schemas are added to the new repository. Previously added schemas with the same `id` are replaced.
41 def __init__(self, *args: typing.Union[pathlib.PurePath, 42 Data, 43 'SchemaRepository']): 44 self._validators = weakref.WeakValueDictionary() 45 self._data = {} 46 for arg in args: 47 if isinstance(arg, pathlib.PurePath): 48 self._load_path(arg) 49 elif isinstance(arg, SchemaRepository): 50 self._load_repository(arg) 51 else: 52 self._load_schema(arg)
54 def get_uri_schemes(self) -> typing.Iterable[str]: 55 """Get URI schemes stored in repository""" 56 return self._data.keys()
Get URI schemes stored in repository
58 def get_schema_ids(self, 59 uri_schemes: typing.Iterable[str] | None = None 60 ) -> typing.Iterable[str]: 61 """Get schema ids stored in repository 62 63 If `uri_schemes` is ``None``, all schema ids are returned. Otherwise, 64 only schema ids that have one of provided URI scheme are returned. 65 66 """ 67 if uri_schemes is None: 68 uri_schemes = self._data.keys() 69 70 for uri_scheme in uri_schemes: 71 schemas = self._data.get(uri_scheme) 72 if not schemas: 73 continue 74 75 for path in schemas.keys(): 76 yield f'{uri_scheme}://{path}'
Get schema ids stored in repository
If `uri_schemes` is `None`, all schema ids are returned. Otherwise, only schema ids that have one of the provided URI schemes are returned.
78 def get_schema(self, schema_id: str) -> Data: 79 """Get stored schema based on schema id""" 80 uri = urllib.parse.urlparse(schema_id) 81 path = uri.netloc + uri.path 82 return self._data[uri.scheme][path]
Get stored schema based on schema id
84 def validate(self, 85 schema_id: str, 86 data: Data, 87 validator_cls: typing.Type[Validator] = DefaultValidator): 88 """Validate data against JSON schema. 89 90 Args: 91 schema_id: JSON schema identifier 92 data: data to be validated 93 validator_cls: validator implementation 94 95 Raises: 96 Exception 97 98 """ 99 validator = self._validators.get(validator_cls) 100 if validator is None: 101 validator = validator_cls(self) 102 self._validators[validator_cls] = validator 103 104 validator.validate(schema_id, data)
Validate data against JSON schema.
Arguments:
- schema_id: JSON schema identifier
- data: data to be validated
- validator_cls: validator implementation
Raises:
- Exception
106 def to_json(self) -> Data: 107 """Export repository content as json serializable data. 108 109 Entire repository content is exported as json serializable data. 110 New repository can be created from the exported content by using 111 :meth:`SchemaRepository.from_json`. 112 113 """ 114 return self._data
Export repository content as json serializable data.
Entire repository content is exported as json serializable data.
New repository can be created from the exported content by using `SchemaRepository.from_json()`.
116 @staticmethod 117 def from_json(data: pathlib.PurePath | Data 118 ) -> 'SchemaRepository': 119 """Create new repository from content exported as json serializable 120 data. 121 122 Creates a new repository from content of another repository that was 123 exported by using :meth:`SchemaRepository.to_json`. 124 125 Args: 126 data: repository data 127 128 """ 129 if isinstance(data, pathlib.PurePath): 130 data = decode_file(data) 131 repo = SchemaRepository() 132 repo._data = data 133 return repo
Create new repository from content exported as json serializable data.
Creates a new repository from content of another repository that was
exported by using `SchemaRepository.to_json()`.
Arguments:
- data: repository data
class Validator(typing.Protocol):
    """JSON Schema validator interface

    Structural protocol - any class providing this constructor and
    `validate` method is accepted as a validator implementation.

    Args:
        repo: repository containing JSON Schemas

    """

    def __init__(self, repo: Repository):
        ...

    def validate(self, schema_id: str, data: Data):
        """Validate data against JSON Schema.

        Implementations are expected to raise on validation failure.

        Args:
            schema_id: JSON schema identifier
            data: data to be validated

        Raises:
            Exception

        """
JSON Schema validator interface
Arguments:
- repo: repository containing JSON Schemas
def _no_init_or_replace_init(self, *args, **kwargs):
    # NOTE(review): this function is CPython `typing` module machinery
    # (Protocol __init__ handling) surfaced as an inherited member of the
    # Validator protocol - it should stay byte-identical to upstream.
    cls = type(self)

    if cls._is_protocol:
        raise TypeError('Protocols cannot be instantiated')

    # Already using a custom `__init__`. No need to calculate correct
    # `__init__` to call. This can lead to RecursionError. See bpo-45121.
    if cls.__init__ is not _no_init_or_replace_init:
        return

    # Initially, `__init__` of a protocol subclass is set to `_no_init_or_replace_init`.
    # The first instantiation of the subclass will call `_no_init_or_replace_init` which
    # searches for a proper new `__init__` in the MRO. The new `__init__`
    # replaces the subclass' old `__init__` (ie `_no_init_or_replace_init`). Subsequent
    # instantiation of the protocol subclass will thus use the new
    # `__init__` and no longer call `_no_init_or_replace_init`.
    for base in cls.__mro__:
        init = base.__dict__.get('__init__', _no_init_or_replace_init)
        if init is not _no_init_or_replace_init:
            cls.__init__ = init
            break
    else:
        # should not happen
        cls.__init__ = object.__init__

    cls.__init__(self, *args, **kwargs)
39 def validate(self, schema_id: str, data: Data): 40 """Validate data against JSON Schema. 41 42 Args: 43 schema_id: JSON schema identifier 44 data: data to be validated 45 46 Raises: 47 Exception 48 49 """
Validate data against JSON Schema.
Arguments:
- schema_id: JSON schema identifier
- data: data to be validated
Raises:
- Exception
class JsonSchemaValidator:
    """`Validator` implementation backed by the `jsonschema` package"""

    def __init__(self, repo: Repository):
        self._repo = repo

    def validate(self, schema_id: str, data: Data):
        uri = urllib.parse.urlparse(schema_id)
        base_path = uri.netloc + uri.path

        # resolve schema references through the repository, for every
        # URI scheme it knows about
        handlers = {scheme: self._repo.get_schema
                    for scheme in self._repo.get_uri_schemes()}
        referrer = self._repo.get_schema(schema_id)
        resolver = jsonschema.RefResolver(
            base_uri=f'{uri.scheme}://{base_path}',
            referrer=referrer,
            handlers=handlers)

        # the fragment part of schema_id may point inside the schema
        schema = resolver.resolve_fragment(referrer, uri.fragment)
        jsonschema.validate(instance=data,
                            schema=schema,
                            resolver=resolver)
15 def validate(self, schema_id: str, data: Data): 16 uri = urllib.parse.urlparse(schema_id) 17 path = uri.netloc + uri.path 18 resolver = jsonschema.RefResolver( 19 base_uri=f'{uri.scheme}://{path}', 20 referrer=self._repo.get_schema(schema_id), 21 handlers={i: self._repo.get_schema 22 for i in self._repo.get_uri_schemes()}) 23 jsonschema.validate( 24 instance=data, 25 schema=resolver.resolve_fragment(resolver.referrer, uri.fragment), 26 resolver=resolver)