hat.json
JSON Data library
1"""JSON Data library""" 2 3from hat.json.data import (Array, 4 Object, 5 Data, 6 equals, 7 clone, 8 flatten) 9from hat.json.path import (Path, 10 get, 11 set_, 12 remove, 13 Storage) 14from hat.json.encoder import (Format, 15 encode, 16 decode, 17 get_file_format, 18 encode_file, 19 decode_file, 20 encode_stream, 21 decode_stream, 22 read_conf) 23from hat.json.patch import (diff, 24 patch) 25from hat.json.repository import (SchemaRepository, 26 json_schema_repo) 27from hat.json.validator import (Validator, 28 DefaultValidator, 29 JsonSchemaValidator) 30from hat.json import vt 31 32 33__all__ = ['Array', 34 'Object', 35 'Data', 36 'equals', 37 'clone', 38 'flatten', 39 'Path', 40 'get', 41 'set_', 42 'remove', 43 'Storage', 44 'Format', 45 'encode', 46 'decode', 47 'get_file_format', 48 'encode_file', 49 'decode_file', 50 'encode_stream', 51 'decode_stream', 52 'read_conf', 53 'diff', 54 'patch', 55 'SchemaRepository', 56 'json_schema_repo', 57 'Validator', 58 'DefaultValidator', 59 'JsonSchemaValidator', 60 'vt']
17def equals(a: Data, 18 b: Data 19 ) -> bool: 20 """Equality comparison of json serializable data. 21 22 Tests for equality of data according to JSON format. Notably, ``bool`` 23 values are not considered equal to numeric values in any case. This is 24 different from default equality comparison, which considers `False` 25 equal to `0` and `0.0`; and `True` equal to `1` and `1.0`. 26 27 Example:: 28 29 assert equals(0, 0.0) is True 30 assert equals({'a': 1, 'b': 2}, {'b': 2, 'a': 1}) is True 31 assert equals(1, True) is False 32 33 """ 34 if isinstance(a, bool) != isinstance(b, bool): 35 return False 36 if a != b: 37 return False 38 39 if isinstance(a, dict): 40 return all(equals(a[key], b[key]) for key in a) 41 elif isinstance(a, list): 42 return all(equals(i, j) for i, j in zip(a, b)) 43 else: 44 return True
Equality comparison of json serializable data.
Tests for equality of data according to JSON format. Notably, bool
values are not considered equal to numeric values in any case. This is
different from default equality comparison, which considers False
equal to 0
and 0.0
; and True
equal to 1
and 1.0
.
Example::
assert equals(0, 0.0) is True
assert equals({'a': 1, 'b': 2}, {'b': 2, 'a': 1}) is True
assert equals(1, True) is False
47def clone(data: Data) -> Data: 48 """Deep clone data 49 50 This function recursively creates new instances of array and object data 51 based on input data. Resulting json data is equal to provided data. 52 53 Example:: 54 55 x = {'a': [1, 2, 3]} 56 y = clone(x) 57 assert x is not y 58 assert x['a'] is not y['a'] 59 assert equals(x, y) 60 61 """ 62 if isinstance(data, list): 63 return [clone(i) for i in data] 64 65 if isinstance(data, dict): 66 return {k: clone(v) for k, v in data.items()} 67 68 return data
Deep clone data
This function recursively creates new instances of array and object data based on input data. Resulting json data is equal to provided data.
Example::
x = {'a': [1, 2, 3]}
y = clone(x)
assert x is not y
assert x['a'] is not y['a']
assert equals(x, y)
71def flatten(data: Data 72 ) -> typing.Iterable[Data]: 73 """Flatten JSON data 74 75 If `data` is array, this generator recursively yields result of `flatten` 76 call with each element of input list. For other `Data` types, input data is 77 yielded. 78 79 Example:: 80 81 data = [1, [], [2], {'a': [3]}] 82 result = [1, 2, {'a': [3]}] 83 assert list(flatten(data)) == result 84 85 """ 86 if isinstance(data, list): 87 for i in data: 88 yield from flatten(i) 89 else: 90 yield data
16def get(data: Data, 17 path: Path, 18 default: Data | None = None 19 ) -> Data: 20 """Get data element referenced by path 21 22 Example:: 23 24 data = {'a': [1, 2, [3, 4]]} 25 path = ['a', 2, 0] 26 assert get(data, path) == 3 27 28 data = [1, 2, 3] 29 assert get(data, 0) == 1 30 assert get(data, 5) is None 31 assert get(data, 5, default=123) == 123 32 33 """ 34 for i in flatten(path): 35 if isinstance(i, str): 36 if not isinstance(data, dict) or i not in data: 37 return default 38 data = data[i] 39 40 elif isinstance(i, int) and not isinstance(i, bool): 41 if not isinstance(data, list): 42 return default 43 try: 44 data = data[i] 45 except IndexError: 46 return default 47 48 else: 49 raise ValueError('invalid path') 50 51 return data
Get data element referenced by path
Example::
data = {'a': [1, 2, [3, 4]]}
path = ['a', 2, 0]
assert get(data, path) == 3
data = [1, 2, 3]
assert get(data, 0) == 1
assert get(data, 5) is None
assert get(data, 5, default=123) == 123
54def set_(data: Data, 55 path: Path, 56 value: Data 57 ) -> Data: 58 """Create new data by setting data path element value 59 60 Example:: 61 62 data = [1, {'a': 2, 'b': 3}, 4] 63 path = [1, 'b'] 64 result = set_(data, path, 5) 65 assert result == [1, {'a': 2, 'b': 5}, 4] 66 assert result is not data 67 68 data = [1, 2, 3] 69 result = set_(data, 4, 4) 70 assert result == [1, 2, 3, None, 4] 71 72 """ 73 parents = collections.deque() 74 75 for i in flatten(path): 76 parent = data 77 78 if isinstance(i, str): 79 data = data.get(i) if isinstance(data, dict) else None 80 81 elif isinstance(i, int) and not isinstance(i, bool): 82 try: 83 data = data[i] if isinstance(data, list) else None 84 except IndexError: 85 data = None 86 87 else: 88 raise ValueError('invalid path') 89 90 parents.append((parent, i)) 91 92 while parents: 93 parent, i = parents.pop() 94 95 if isinstance(i, str): 96 parent = dict(parent) if isinstance(parent, dict) else {} 97 parent[i] = value 98 99 elif isinstance(i, int) and not isinstance(i, bool): 100 if not isinstance(parent, list): 101 parent = [] 102 103 if i >= len(parent): 104 parent = [*parent, 105 *itertools.repeat(None, i - len(parent) + 1)] 106 107 elif i < 0 and (-i) > len(parent): 108 parent = [*itertools.repeat(None, (-i) - len(parent)), 109 *parent] 110 111 else: 112 parent = list(parent) 113 114 parent[i] = value 115 116 else: 117 raise ValueError('invalid path') 118 119 value = parent 120 121 return value
Create new data by setting data path element value
Example::
data = [1, {'a': 2, 'b': 3}, 4]
path = [1, 'b']
result = set_(data, path, 5)
assert result == [1, {'a': 2, 'b': 5}, 4]
assert result is not data
data = [1, 2, 3]
result = set_(data, 4, 4)
assert result == [1, 2, 3, None, 4]
124def remove(data: Data, 125 path: Path 126 ) -> Data: 127 """Create new data by removing part of data referenced by path 128 129 Example:: 130 131 data = [1, {'a': 2, 'b': 3}, 4] 132 path = [1, 'b'] 133 result = remove(data, path) 134 assert result == [1, {'a': 2}, 4] 135 assert result is not data 136 137 data = [1, 2, 3] 138 result = remove(data, 4) 139 assert result == [1, 2, 3] 140 141 """ 142 result = data 143 parents = collections.deque() 144 145 for i in flatten(path): 146 parent = data 147 148 if isinstance(i, str): 149 if not isinstance(data, dict) or i not in data: 150 return result 151 data = data[i] 152 153 elif isinstance(i, int) and not isinstance(i, bool): 154 if not isinstance(data, list): 155 return result 156 try: 157 data = data[i] 158 except IndexError: 159 return result 160 161 else: 162 raise ValueError('invalid path') 163 164 parents.append((parent, i)) 165 166 result = None 167 168 while parents: 169 parent, i = parents.pop() 170 171 if isinstance(i, str): 172 parent = dict(parent) 173 174 elif isinstance(i, int) and not isinstance(i, bool): 175 parent = list(parent) 176 177 else: 178 raise ValueError('invalid path') 179 180 if result is None: 181 del parent[i] 182 183 else: 184 parent[i] = result 185 186 result = parent 187 188 return result
Create new data by removing part of data referenced by path
Example::
data = [1, {'a': 2, 'b': 3}, 4]
path = [1, 'b']
result = remove(data, path)
assert result == [1, {'a': 2}, 4]
assert result is not data
data = [1, 2, 3]
result = remove(data, 4)
assert result == [1, 2, 3]
191class Storage: 192 """JSON data storage 193 194 Helper class representing observable JSON data state manipulated with 195 path based get/set/remove functions. 196 197 """ 198 199 def __init__(self, data: Data = None): 200 self._data = data 201 self._change_cbs = util.CallbackRegistry() 202 203 @property 204 def data(self) -> Data: 205 """Data""" 206 return self._data 207 208 def register_change_cb(self, 209 cb: typing.Callable[[Data], None] 210 ) -> util.RegisterCallbackHandle: 211 """Register data change callback""" 212 return self._change_cbs.register(cb) 213 214 def get(self, path: Path, default: Data | None = None): 215 """Get data""" 216 return get(self._data, path, default) 217 218 def set(self, path: Path, value: Data): 219 """Set data""" 220 self._data = set_(self._data, path, value) 221 self._change_cbs.notify(self._data) 222 223 def remove(self, path: Path): 224 """Remove data""" 225 self._data = remove(self._data, path) 226 self._change_cbs.notify(self._data)
JSON data storage
Helper class representing observable JSON data state manipulated with path based get/set/remove functions.
Data
208 def register_change_cb(self, 209 cb: typing.Callable[[Data], None] 210 ) -> util.RegisterCallbackHandle: 211 """Register data change callback""" 212 return self._change_cbs.register(cb)
Register data change callback
214 def get(self, path: Path, default: Data | None = None): 215 """Get data""" 216 return get(self._data, path, default)
Get data
218 def set(self, path: Path, value: Data): 219 """Set data""" 220 self._data = set_(self._data, path, value) 221 self._change_cbs.notify(self._data)
Set data
17class Format(enum.Enum): 18 """Encoding format""" 19 JSON = 'json' 20 YAML = 'yaml' 21 TOML = 'toml'
Encoding format
Inherited Members
- enum.Enum
- name
- value
24def encode(data: Data, 25 format: Format = Format.JSON, 26 indent: int | None = None 27 ) -> str: 28 """Encode JSON data. 29 30 In case of TOML format, data must be JSON Object. 31 32 Args: 33 data: JSON data 34 format: encoding format 35 indent: indentation size 36 37 """ 38 if format == Format.JSON: 39 return json.dumps(data, indent=indent, allow_nan=False) 40 41 if format == Format.YAML: 42 dumper = (yaml.CSafeDumper if hasattr(yaml, 'CSafeDumper') 43 else yaml.SafeDumper) 44 return str(yaml.dump(data, indent=indent, Dumper=dumper)) 45 46 if format == Format.TOML: 47 return tomli_w.dumps(data) 48 49 raise ValueError('unsupported format')
Encode JSON data.
In case of TOML format, data must be JSON Object.
Arguments:
- data: JSON data
- format: encoding format
- indent: indentation size
52def decode(data_str: str, 53 format: Format = Format.JSON 54 ) -> Data: 55 """Decode JSON data. 56 57 Args: 58 data_str: encoded JSON data 59 format: encoding format 60 61 """ 62 if format == Format.JSON: 63 return json.loads(data_str) 64 65 if format == Format.YAML: 66 loader = (yaml.CSafeLoader if hasattr(yaml, 'CSafeLoader') 67 else yaml.SafeLoader) 68 return yaml.load(io.StringIO(data_str), Loader=loader) 69 70 if format == Format.TOML: 71 return tomli.loads(data_str) 72 73 raise ValueError('unsupported format')
Decode JSON data.
Arguments:
- data_str: encoded JSON data
- format: encoding format
76def get_file_format(path: pathlib.PurePath) -> Format: 77 """Detect file format based on path suffix""" 78 if path.suffix == '.json': 79 return Format.JSON 80 81 if path.suffix in ('.yaml', '.yml'): 82 return Format.YAML 83 84 if path.suffix == '.toml': 85 return Format.TOML 86 87 raise ValueError('can not determine format from path suffix')
Detect file format based on path suffix
90def encode_file(data: Data, 91 path: pathlib.PurePath, 92 format: Format | None = None, 93 indent: int | None = 4): 94 """Encode JSON data to file. 95 96 If `format` is ``None``, encoding format is derived from path suffix. 97 98 In case of TOML format, data must be JSON Object. 99 100 Args: 101 data: JSON data 102 path: file path 103 format: encoding format 104 indent: indentation size 105 106 """ 107 if format is None: 108 format = get_file_format(path) 109 110 flags = 'w' if format != Format.TOML else 'wb' 111 encoding = 'utf-8' if format != Format.TOML else None 112 113 with open(path, flags, encoding=encoding) as f: 114 encode_stream(data, f, format, indent)
Encode JSON data to file.
If format
is None
, encoding format is derived from path suffix.
In case of TOML format, data must be JSON Object.
Arguments:
- data: JSON data
- path: file path
- format: encoding format
- indent: indentation size
117def decode_file(path: pathlib.PurePath, 118 format: Format | None = None 119 ) -> Data: 120 """Decode JSON data from file. 121 122 If `format` is ``None``, encoding format is derived from path suffix. 123 124 Args: 125 path: file path 126 format: encoding format 127 128 """ 129 if format is None: 130 format = get_file_format(path) 131 132 flags = 'r' if format != Format.TOML else 'rb' 133 encoding = 'utf-8' if format != Format.TOML else None 134 135 with open(path, flags, encoding=encoding) as f: 136 return decode_stream(f, format)
Decode JSON data from file.
If format
is None
, encoding format is derived from path suffix.
Arguments:
- path: file path
- format: encoding format
139def encode_stream(data: Data, 140 stream: io.TextIOBase | io.RawIOBase, 141 format: Format = Format.JSON, 142 indent: int | None = 4): 143 """Encode JSON data to stream. 144 145 In case of TOML format, data must be JSON Object. 146 147 In case of TOML format, `stream` should be `io.RawIOBase`. For 148 other formats, `io.TextIOBase` is expected. 149 150 Args: 151 data: JSON data 152 stream: output stream 153 format: encoding format 154 indent: indentation size 155 156 """ 157 if format == Format.JSON: 158 json.dump(data, stream, indent=indent, allow_nan=False) 159 160 elif format == Format.YAML: 161 dumper = (yaml.CSafeDumper if hasattr(yaml, 'CSafeDumper') 162 else yaml.SafeDumper) 163 yaml.dump(data, stream, indent=indent, Dumper=dumper, 164 explicit_start=True, explicit_end=True) 165 166 elif format == Format.TOML: 167 tomli_w.dump(data, stream) 168 169 else: 170 raise ValueError('unsupported format')
Encode JSON data to stream.
In case of TOML format, data must be JSON Object.
In case of TOML format, stream
should be io.RawIOBase
. For
other formats, io.TextIOBase
is expected.
Arguments:
- data: JSON data
- stream: output stream
- format: encoding format
- indent: indentation size
173def decode_stream(stream: io.TextIOBase | io.RawIOBase, 174 format: Format = Format.JSON 175 ) -> Data: 176 """Decode JSON data from stream. 177 178 In case of TOML format, `stream` should be `io.RawIOBase`. For 179 other formats, `io.TextIOBase` is expected. 180 181 Args: 182 stream: input stream 183 format: encoding format 184 185 """ 186 if format == Format.JSON: 187 return json.load(stream) 188 189 if format == Format.YAML: 190 loader = (yaml.CSafeLoader if hasattr(yaml, 'CSafeLoader') 191 else yaml.SafeLoader) 192 return yaml.load(stream, Loader=loader) 193 194 if format == Format.TOML: 195 return tomli.load(stream) 196 197 raise ValueError('unsupported format')
Decode JSON data from stream.
In case of TOML format, stream
should be io.RawIOBase
. For
other formats, io.TextIOBase
is expected.
Arguments:
- stream: input stream
- format: encoding format
200def read_conf(path: pathlib.Path | None, 201 default_path: pathlib.Path | None = None, 202 default_suffixes: list[str] = ['.yaml', '.yml', '.toml', '.json'], # NOQA 203 stdio_path: pathlib.Path | None = pathlib.Path('-') 204 ) -> Data: 205 """Read configuration formated as JSON data""" 206 if stdio_path and path == stdio_path: 207 return decode_stream(sys.stdin) 208 209 if path: 210 return decode_file(path) 211 212 if not default_path: 213 raise Exception('invalid configuration path') 214 215 for suffix in default_suffixes: 216 path = default_path.with_suffix(suffix) 217 if path.exists(): 218 break 219 220 return decode_file(path)
Read configuration formated as JSON data
9def diff(src: Data, 10 dst: Data 11 ) -> Data: 12 """Generate JSON Patch diff. 13 14 Example:: 15 16 src = [1, {'a': 2}, 3] 17 dst = [1, {'a': 4}, 3] 18 result = diff(src, dst) 19 assert result == [{'op': 'replace', 'path': '/1/a', 'value': 4}] 20 21 """ 22 return jsonpatch.JsonPatch.from_diff(src, dst).patch
Generate JSON Patch diff.
Example::
src = [1, {'a': 2}, 3]
dst = [1, {'a': 4}, 3]
result = diff(src, dst)
assert result == [{'op': 'replace', 'path': '/1/a', 'value': 4}]
25def patch(data: Data, 26 diff: Data 27 ) -> Data: 28 """Apply JSON Patch diff. 29 30 Example:: 31 32 data = [1, {'a': 2}, 3] 33 d = [{'op': 'replace', 'path': '/1/a', 'value': 4}] 34 result = patch(data, d) 35 assert result == [1, {'a': 4}, 3] 36 37 """ 38 return jsonpatch.apply_patch(data, diff)
Apply JSON Patch diff.
Example::
data = [1, {'a': 2}, 3]
d = [{'op': 'replace', 'path': '/1/a', 'value': 4}]
result = patch(data, d)
assert result == [1, {'a': 4}, 3]
17class SchemaRepository: 18 """JSON Schema repository. 19 20 A repository that holds json schemas and enables validation against them. 21 22 Repository can be initialized with multiple arguments, which can be 23 instances of ``pathlib.PurePath``, ``Data`` or ``SchemaRepository``. 24 25 If an argument is of type ``pathlib.PurePath``, and path points to file 26 with a suffix '.json', '.yml' or '.yaml', json serializable data is decoded 27 from the file. Otherwise, it is assumed that path points to a directory, 28 which is recursively searched for json and yaml files. All decoded schemas 29 are added to the repository. If a schema with the same `id` was previously 30 added, an exception is raised. 31 32 If an argument is of type ``Data``, it should be a json serializable data 33 representation of a JSON schema. If a schema with the same `id` was 34 previously added, an exception is raised. 35 36 If an argument is of type ``SchemaRepository``, its schemas are added to 37 the new repository. Previously added schemas with the same `id` are 38 replaced. 39 40 """ 41 42 def __init__(self, *args: typing.Union[pathlib.PurePath, 43 Data, 44 'SchemaRepository']): 45 self._validators = weakref.WeakValueDictionary() 46 self._data = collections.defaultdict(dict) 47 48 for arg in args: 49 if isinstance(arg, pathlib.PurePath): 50 self._load_path(arg) 51 52 elif isinstance(arg, SchemaRepository): 53 self._load_repository(arg) 54 55 else: 56 self._load_schema(arg) 57 58 def get_uri_schemes(self) -> typing.Iterable[str]: 59 """Get URI schemes stored in repository""" 60 return self._data.keys() 61 62 def get_schema_ids(self, 63 uri_schemes: typing.Iterable[str] | None = None 64 ) -> typing.Iterable[str]: 65 """Get schema ids stored in repository 66 67 If `uri_schemes` is ``None``, all schema ids are returned. Otherwise, 68 only schema ids that have one of provided URI scheme are returned. 69 70 """ 71 if uri_schemes is None: 72 uri_schemes = self._data.keys() 73 74 for uri_scheme in uri_schemes: 75 schemas = self._data.get(uri_scheme) 76 if not schemas: 77 continue 78 79 for path in schemas.keys(): 80 yield f'{uri_scheme}://{path}' 81 82 def get_schema(self, schema_id: str) -> Data: 83 """Get stored schema based on schema id""" 84 uri = urllib.parse.urlparse(schema_id) 85 path = uri.netloc + uri.path 86 return self._data[uri.scheme][path] 87 88 def validate(self, 89 schema_id: str, 90 data: Data, 91 validator_cls: typing.Type[Validator] = DefaultValidator): 92 """Validate data against JSON schema. 93 94 Args: 95 schema_id: JSON schema identifier 96 data: data to be validated 97 validator_cls: validator implementation 98 99 Raises: 100 Exception 101 102 """ 103 validator = self._validators.get(validator_cls) 104 if validator is None: 105 validator = validator_cls(self) 106 self._validators[validator_cls] = validator 107 108 validator.validate(schema_id, data) 109 110 def to_json(self) -> Data: 111 """Export repository content as json serializable data. 112 113 Entire repository content is exported as json serializable data. 114 New repository can be created from the exported content by using 115 :meth:`SchemaRepository.from_json`. 116 117 """ 118 return self._data 119 120 @staticmethod 121 def from_json(data: pathlib.PurePath | Data 122 ) -> 'SchemaRepository': 123 """Create new repository from content exported as json serializable 124 data. 125 126 Creates a new repository from content of another repository that was 127 exported by using :meth:`SchemaRepository.to_json`. 128 129 Args: 130 data: repository data 131 132 """ 133 if isinstance(data, pathlib.PurePath): 134 data = decode_file(data) 135 136 repo = SchemaRepository() 137 repo._data.update(data) 138 return repo 139 140 def _load_path(self, path): 141 json_suffixes = {'.json', '.yaml', '.yml'} 142 paths = ([path] if path.suffix in json_suffixes 143 else list(itertools.chain.from_iterable( 144 path.rglob(f'*{i}') for i in json_suffixes))) 145 for path in paths: 146 schema = decode_file(path) 147 self._load_schema(schema) 148 149 def _load_schema(self, schema): 150 if '$schema' in schema: 151 meta_schema_id = urllib.parse.urldefrag(schema['$schema']).url 152 if meta_schema_id not in _meta_schema_ids: 153 schema = dict(schema) 154 del schema['$schema'] 155 156 schema_id = schema.get('$id') 157 if not schema_id: 158 schema_id = schema.get('id') 159 if not schema_id: 160 raise Exception('invalid schema id') 161 162 uri = urllib.parse.urlparse(schema_id) 163 path = uri.netloc + uri.path 164 if path in self._data[uri.scheme]: 165 raise Exception(f"duplicate schema id {uri.scheme}://{path}") 166 167 self._data[uri.scheme][path] = schema 168 169 def _load_repository(self, repo): 170 for k, v in repo._data.items(): 171 self._data[k].update(v)
JSON Schema repository.
A repository that holds json schemas and enables validation against them.
Repository can be initialized with multiple arguments, which can be
instances of pathlib.PurePath
, Data
or SchemaRepository
.
If an argument is of type pathlib.PurePath
, and path points to file
with a suffix '.json', '.yml' or '.yaml', json serializable data is decoded
from the file. Otherwise, it is assumed that path points to a directory,
which is recursively searched for json and yaml files. All decoded schemas
are added to the repository. If a schema with the same id
was previously
added, an exception is raised.
If an argument is of type Data
, it should be a json serializable data
representation of a JSON schema. If a schema with the same id
was
previously added, an exception is raised.
If an argument is of type SchemaRepository
, its schemas are added to
the new repository. Previously added schemas with the same id
are
replaced.
42 def __init__(self, *args: typing.Union[pathlib.PurePath, 43 Data, 44 'SchemaRepository']): 45 self._validators = weakref.WeakValueDictionary() 46 self._data = collections.defaultdict(dict) 47 48 for arg in args: 49 if isinstance(arg, pathlib.PurePath): 50 self._load_path(arg) 51 52 elif isinstance(arg, SchemaRepository): 53 self._load_repository(arg) 54 55 else: 56 self._load_schema(arg)
58 def get_uri_schemes(self) -> typing.Iterable[str]: 59 """Get URI schemes stored in repository""" 60 return self._data.keys()
Get URI schemes stored in repository
62 def get_schema_ids(self, 63 uri_schemes: typing.Iterable[str] | None = None 64 ) -> typing.Iterable[str]: 65 """Get schema ids stored in repository 66 67 If `uri_schemes` is ``None``, all schema ids are returned. Otherwise, 68 only schema ids that have one of provided URI scheme are returned. 69 70 """ 71 if uri_schemes is None: 72 uri_schemes = self._data.keys() 73 74 for uri_scheme in uri_schemes: 75 schemas = self._data.get(uri_scheme) 76 if not schemas: 77 continue 78 79 for path in schemas.keys(): 80 yield f'{uri_scheme}://{path}'
Get schema ids stored in repository
If uri_schemes
is None
, all schema ids are returned. Otherwise,
only schema ids that have one of provided URI scheme are returned.
82 def get_schema(self, schema_id: str) -> Data: 83 """Get stored schema based on schema id""" 84 uri = urllib.parse.urlparse(schema_id) 85 path = uri.netloc + uri.path 86 return self._data[uri.scheme][path]
Get stored schema based on schema id
88 def validate(self, 89 schema_id: str, 90 data: Data, 91 validator_cls: typing.Type[Validator] = DefaultValidator): 92 """Validate data against JSON schema. 93 94 Args: 95 schema_id: JSON schema identifier 96 data: data to be validated 97 validator_cls: validator implementation 98 99 Raises: 100 Exception 101 102 """ 103 validator = self._validators.get(validator_cls) 104 if validator is None: 105 validator = validator_cls(self) 106 self._validators[validator_cls] = validator 107 108 validator.validate(schema_id, data)
Validate data against JSON schema.
Arguments:
- schema_id: JSON schema identifier
- data: data to be validated
- validator_cls: validator implementation
Raises:
- Exception
110 def to_json(self) -> Data: 111 """Export repository content as json serializable data. 112 113 Entire repository content is exported as json serializable data. 114 New repository can be created from the exported content by using 115 :meth:`SchemaRepository.from_json`. 116 117 """ 118 return self._data
Export repository content as json serializable data.
Entire repository content is exported as json serializable data.
New repository can be created from the exported content by using
SchemaRepository.from_json()
.
120 @staticmethod 121 def from_json(data: pathlib.PurePath | Data 122 ) -> 'SchemaRepository': 123 """Create new repository from content exported as json serializable 124 data. 125 126 Creates a new repository from content of another repository that was 127 exported by using :meth:`SchemaRepository.to_json`. 128 129 Args: 130 data: repository data 131 132 """ 133 if isinstance(data, pathlib.PurePath): 134 data = decode_file(data) 135 136 repo = SchemaRepository() 137 repo._data.update(data) 138 return repo
Create new repository from content exported as json serializable data.
Creates a new repository from content of another repository that was
exported by using SchemaRepository.to_json()
.
Arguments:
- data: repository data
28class Validator(typing.Protocol): 29 """JSON Schema validator interface 30 31 Args: 32 repo: repository containing JSON Schemas 33 34 """ 35 36 def __init__(self, repo: Repository): 37 ... 38 39 def validate(self, schema_id: str, data: Data): 40 """Validate data against JSON Schema. 41 42 Args: 43 schema_id: JSON schema identifier 44 data: data to be validated 45 46 Raises: 47 Exception 48 49 """
JSON Schema validator interface
Arguments:
- repo: repository containing JSON Schemas
1431def _no_init_or_replace_init(self, *args, **kwargs): 1432 cls = type(self) 1433 1434 if cls._is_protocol: 1435 raise TypeError('Protocols cannot be instantiated') 1436 1437 # Already using a custom `__init__`. No need to calculate correct 1438 # `__init__` to call. This can lead to RecursionError. See bpo-45121. 1439 if cls.__init__ is not _no_init_or_replace_init: 1440 return 1441 1442 # Initially, `__init__` of a protocol subclass is set to `_no_init_or_replace_init`. 1443 # The first instantiation of the subclass will call `_no_init_or_replace_init` which 1444 # searches for a proper new `__init__` in the MRO. The new `__init__` 1445 # replaces the subclass' old `__init__` (ie `_no_init_or_replace_init`). Subsequent 1446 # instantiation of the protocol subclass will thus use the new 1447 # `__init__` and no longer call `_no_init_or_replace_init`. 1448 for base in cls.__mro__: 1449 init = base.__dict__.get('__init__', _no_init_or_replace_init) 1450 if init is not _no_init_or_replace_init: 1451 cls.__init__ = init 1452 break 1453 else: 1454 # should not happen 1455 cls.__init__ = object.__init__ 1456 1457 cls.__init__(self, *args, **kwargs)
39 def validate(self, schema_id: str, data: Data): 40 """Validate data against JSON Schema. 41 42 Args: 43 schema_id: JSON schema identifier 44 data: data to be validated 45 46 Raises: 47 Exception 48 49 """
Validate data against JSON Schema.
Arguments:
- schema_id: JSON schema identifier
- data: data to be validated
Raises:
- Exception
9class JsonSchemaValidator: 10 11 def __init__(self, repo: Repository): 12 self._repo = repo 13 self._registry = referencing.Registry(retrieve=self._retrieve) 14 15 def validate(self, schema_id: str, data: Data): 16 jsonschema.validate(instance=data, 17 schema={'$ref': schema_id}, 18 registry=self._registry) 19 20 def _retrieve(self, uri): 21 try: 22 schema = self._repo.get_schema(uri) 23 24 except Exception: 25 raise referencing.exceptions.NoSuchResource(uri) 26 27 if '$schema' not in schema: 28 schema = { 29 **schema, 30 '$schema': "https://json-schema.org/draft/2020-12/schema"} 31 32 return referencing.Resource.from_contents(schema)