hat.json
JSON Data library
1"""JSON Data library""" 2 3from hat.json.data import (Array, 4 Object, 5 Data, 6 equals, 7 clone, 8 flatten) 9from hat.json.path import (Path, 10 get, 11 set_, 12 remove, 13 Storage) 14from hat.json.encoder import (Format, 15 encode, 16 decode, 17 get_file_format, 18 encode_file, 19 decode_file, 20 encode_stream, 21 decode_stream, 22 read_conf) 23from hat.json.patch import (diff, 24 patch) 25from hat.json.repository import (SchemaRepository, 26 json_schema_repo) 27from hat.json.validator import (Validator, 28 DefaultValidator, 29 JsonSchemaValidator) 30from hat.json import vt 31 32 33__all__ = ['Array', 34 'Object', 35 'Data', 36 'equals', 37 'clone', 38 'flatten', 39 'Path', 40 'get', 41 'set_', 42 'remove', 43 'Storage', 44 'Format', 45 'encode', 46 'decode', 47 'get_file_format', 48 'encode_file', 49 'decode_file', 50 'encode_stream', 51 'decode_stream', 52 'read_conf', 53 'diff', 54 'patch', 55 'SchemaRepository', 56 'json_schema_repo', 57 'Validator', 58 'DefaultValidator', 59 'JsonSchemaValidator', 60 'vt']
def equals(a: Data,
           b: Data
           ) -> bool:
    """Equality comparison of json serializable data.

    Tests for equality of data according to JSON format. Notably, ``bool``
    values are not considered equal to numeric values in any case. This is
    different from default equality comparison, which considers `False`
    equal to `0` and `0.0`; and `True` equal to `1` and `1.0`.

    Example::

        assert equals(0, 0.0) is True
        assert equals({'a': 1, 'b': 2}, {'b': 2, 'a': 1}) is True
        assert equals(1, True) is False

    """
    if a is None:
        return b is None

    if isinstance(a, bool):
        return isinstance(b, bool) and a == b

    if isinstance(a, (int, float)):
        return (isinstance(b, (int, float)) and
                not isinstance(b, bool) and
                a == b)

    if isinstance(a, str):
        return isinstance(b, str) and a == b

    if isinstance(a, list):
        return (isinstance(b, list) and
                len(a) == len(b) and
                all(equals(i, j) for i, j in zip(a, b)))

    if isinstance(a, dict):
        return (isinstance(b, dict) and
                len(a) == len(b) and
                all(equals(a[key], b[key]) for key in a.keys()))

    raise TypeError('invalid json type')
def clone(data: Data) -> Data:
    """Deep clone data

    This function recursively creates new instances of array and object data
    based on input data. Resulting json data is equal to provided data.

    Example::

        x = {'a': [1, 2, 3]}
        y = clone(x)
        assert x is not y
        assert x['a'] is not y['a']
        assert equals(x, y)

    """
    if isinstance(data, list):
        return [clone(i) for i in data]

    if isinstance(data, dict):
        return {k: clone(v) for k, v in data.items()}

    return data
def flatten(data: Data) -> Iterable[Data]:
    """Flatten JSON data

    If `data` is array, this generator recursively yields result of `flatten`
    call with each element of input list. For other `Data` types, input data is
    yielded.

    Example::

        data = [1, [], [2], {'a': [3]}]
        result = [1, 2, {'a': [3]}]
        assert list(flatten(data)) == result

    """
    if isinstance(data, list):
        for i in data:
            yield from flatten(i)

    else:
        yield data
def get(data: Data,
        path: Path,
        default: Data | None = None
        ) -> Data:
    """Get data element referenced by path

    Example::

        data = {'a': [1, 2, [3, 4]]}
        path = ['a', 2, 0]
        assert get(data, path) == 3

        data = [1, 2, 3]
        assert get(data, 0) == 1
        assert get(data, 5) is None
        assert get(data, 5, default=123) == 123

    """
    for i in flatten(path):
        if isinstance(i, str):
            if not isinstance(data, dict) or i not in data:
                return default
            data = data[i]

        elif isinstance(i, int) and not isinstance(i, bool):
            if not isinstance(data, list):
                return default
            try:
                data = data[i]
            except IndexError:
                return default

        else:
            raise ValueError('invalid path')

    return data
def set_(data: Data,
         path: Path,
         value: Data
         ) -> Data:
    """Create new data by setting data path element value

    Example::

        data = [1, {'a': 2, 'b': 3}, 4]
        path = [1, 'b']
        result = set_(data, path, 5)
        assert result == [1, {'a': 2, 'b': 5}, 4]
        assert result is not data

        data = [1, 2, 3]
        result = set_(data, 4, 4)
        assert result == [1, 2, 3, None, 4]

    """
    parents = collections.deque()

    for i in flatten(path):
        parent = data

        if isinstance(i, str):
            data = data.get(i) if isinstance(data, dict) else None

        elif isinstance(i, int) and not isinstance(i, bool):
            try:
                data = data[i] if isinstance(data, list) else None
            except IndexError:
                data = None

        else:
            raise ValueError('invalid path')

        parents.append((parent, i))

    while parents:
        parent, i = parents.pop()

        if isinstance(i, str):
            parent = dict(parent) if isinstance(parent, dict) else {}
            parent[i] = value

        elif isinstance(i, int) and not isinstance(i, bool):
            if not isinstance(parent, list):
                parent = []

            if i >= len(parent):
                parent = [*parent,
                          *itertools.repeat(None, i - len(parent) + 1)]

            elif i < 0 and (-i) > len(parent):
                parent = [*itertools.repeat(None, (-i) - len(parent)),
                          *parent]

            else:
                parent = list(parent)

            parent[i] = value

        else:
            raise ValueError('invalid path')

        value = parent

    return value
def remove(data: Data,
           path: Path
           ) -> Data:
    """Create new data by removing part of data referenced by path

    Example::

        data = [1, {'a': 2, 'b': 3}, 4]
        path = [1, 'b']
        result = remove(data, path)
        assert result == [1, {'a': 2}, 4]
        assert result is not data

        data = [1, 2, 3]
        result = remove(data, 4)
        assert result == [1, 2, 3]

    """
    result = data
    parents = collections.deque()

    for i in flatten(path):
        parent = data

        if isinstance(i, str):
            if not isinstance(data, dict) or i not in data:
                return result
            data = data[i]

        elif isinstance(i, int) and not isinstance(i, bool):
            if not isinstance(data, list):
                return result
            try:
                data = data[i]
            except IndexError:
                return result

        else:
            raise ValueError('invalid path')

        parents.append((parent, i))

    result = None

    while parents:
        parent, i = parents.pop()

        if isinstance(i, str):
            parent = dict(parent)

        elif isinstance(i, int) and not isinstance(i, bool):
            parent = list(parent)

        else:
            raise ValueError('invalid path')

        if result is None:
            del parent[i]

        else:
            parent[i] = result

        result = parent

    return result
class Storage:
    """JSON data storage

    Helper class representing observable JSON data state manipulated with
    path based get/set/remove functions.

    """

    def __init__(self, data: Data = None):
        self._data = data
        self._change_cbs = util.CallbackRegistry()

    @property
    def data(self) -> Data:
        """Data"""
        return self._data

    def register_change_cb(self,
                           cb: Callable[[Data], None]
                           ) -> util.RegisterCallbackHandle:
        """Register data change callback"""
        return self._change_cbs.register(cb)

    def get(self, path: Path, default: Data | None = None):
        """Get data"""
        return get(self._data, path, default)

    def set(self, path: Path, value: Data):
        """Set data"""
        self._data = set_(self._data, path, value)
        self._change_cbs.notify(self._data)

    def remove(self, path: Path):
        """Remove data"""
        self._data = remove(self._data, path)
        self._change_cbs.notify(self._data)
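The class carries no usage example, so here is a minimal sketch of how
``Storage`` can be driven, assuming ``hat.json`` is importable; the callback
registry itself comes from ``hat.util``::

    from hat import json

    storage = json.Storage({'counters': {}})
    changes = []

    # the registered callback receives the whole new data state on each change
    storage.register_change_cb(changes.append)

    storage.set(['counters', 'requests'], 1)
    storage.set(['counters', 'requests'], 2)
    storage.remove(['counters', 'requests'])

    assert storage.get(['counters']) == {}
    assert len(changes) == 3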
class Format(enum.Enum):
    """Encoding format"""
    JSON = 'json'
    YAML = 'yaml'
    TOML = 'toml'
def encode(data: Data,
           format: Format = Format.JSON,
           indent: int | None = None
           ) -> str:
    """Encode JSON data.

    In case of TOML format, data must be JSON Object.

    Args:
        data: JSON data
        format: encoding format
        indent: indentation size

    """
    if format == Format.JSON:
        return json.dumps(data, indent=indent, allow_nan=False)

    if format == Format.YAML:
        dumper = (yaml.CSafeDumper if hasattr(yaml, 'CSafeDumper')
                  else yaml.SafeDumper)
        return str(yaml.dump(data, indent=indent, Dumper=dumper))

    if format == Format.TOML:
        return tomli_w.dumps(data)

    raise ValueError('unsupported format')
def decode(data_str: str,
           format: Format = Format.JSON
           ) -> Data:
    """Decode JSON data.

    Args:
        data_str: encoded JSON data
        format: encoding format

    """
    if format == Format.JSON:
        return json.loads(data_str)

    if format == Format.YAML:
        loader = (yaml.CSafeLoader if hasattr(yaml, 'CSafeLoader')
                  else yaml.SafeLoader)
        return yaml.load(io.StringIO(data_str), Loader=loader)

    if format == Format.TOML:
        return toml.loads(data_str)

    raise ValueError('unsupported format')
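Neither docstring shows a round trip, so a short sketch follows; it assumes
the optional YAML/TOML dependencies are installed when those formats are
used::

    from hat import json

    data = {'name': 'hat', 'ports': [23023, 23024]}

    text = json.encode(data, json.Format.JSON, indent=2)
    assert json.equals(json.decode(text, json.Format.JSON), data)

    # TOML requires the top level value to be an Object
    toml_text = json.encode(data, json.Format.TOML)
    assert json.equals(json.decode(toml_text, json.Format.TOML), data)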
def get_file_format(path: pathlib.PurePath) -> Format:
    """Detect file format based on path suffix"""
    if path.suffix == '.json':
        return Format.JSON

    if path.suffix in ('.yaml', '.yml'):
        return Format.YAML

    if path.suffix == '.toml':
        return Format.TOML

    raise ValueError('can not determine format from path suffix')
def encode_file(data: Data,
                path: pathlib.PurePath,
                format: Format | None = None,
                indent: int | None = 4):
    """Encode JSON data to file.

    If `format` is ``None``, encoding format is derived from path suffix.

    In case of TOML format, data must be JSON Object.

    Args:
        data: JSON data
        path: file path
        format: encoding format
        indent: indentation size

    """
    if format is None:
        format = get_file_format(path)

    flags = 'w' if format != Format.TOML else 'wb'
    encoding = 'utf-8' if format != Format.TOML else None

    with open(path, flags, encoding=encoding) as f:
        encode_stream(data, f, format, indent)
def decode_file(path: pathlib.PurePath,
                format: Format | None = None
                ) -> Data:
    """Decode JSON data from file.

    If `format` is ``None``, encoding format is derived from path suffix.

    Args:
        path: file path
        format: encoding format

    """
    if format is None:
        format = get_file_format(path)

    flags = 'r' if format != Format.TOML else 'rb'
    encoding = 'utf-8' if format != Format.TOML else None

    with open(path, flags, encoding=encoding) as f:
        return decode_stream(f, format)
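A small sketch of the file round trip; the temporary directory and file name
are purely illustrative::

    import pathlib
    import tempfile

    from hat import json

    data = {'log': {'level': 'INFO'}}

    with tempfile.TemporaryDirectory() as tmpdir:
        path = pathlib.Path(tmpdir) / 'config.json'

        # encoding format is derived from the '.json' suffix
        json.encode_file(data, path)
        assert json.equals(json.decode_file(path), data)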
def encode_stream(data: Data,
                  stream: io.TextIOBase | io.RawIOBase,
                  format: Format = Format.JSON,
                  indent: int | None = 4):
    """Encode JSON data to stream.

    In case of TOML format, data must be JSON Object.

    In case of TOML format, `stream` should be `io.RawIOBase`. For
    other formats, `io.TextIOBase` is expected.

    Args:
        data: JSON data
        stream: output stream
        format: encoding format
        indent: indentation size

    """
    if format == Format.JSON:
        json.dump(data, stream, indent=indent, allow_nan=False)

    elif format == Format.YAML:
        dumper = (yaml.CSafeDumper if hasattr(yaml, 'CSafeDumper')
                  else yaml.SafeDumper)
        yaml.dump(data, stream, indent=indent, Dumper=dumper,
                  explicit_start=True, explicit_end=True)

    elif format == Format.TOML:
        tomli_w.dump(data, stream)

    else:
        raise ValueError('unsupported format')
def decode_stream(stream: io.TextIOBase | io.RawIOBase,
                  format: Format = Format.JSON
                  ) -> Data:
    """Decode JSON data from stream.

    In case of TOML format, `stream` should be `io.RawIOBase`. For
    other formats, `io.TextIOBase` is expected.

    Args:
        stream: input stream
        format: encoding format

    """
    if format == Format.JSON:
        return json.load(stream)

    if format == Format.YAML:
        loader = (yaml.CSafeLoader if hasattr(yaml, 'CSafeLoader')
                  else yaml.SafeLoader)
        return yaml.load(stream, Loader=loader)

    if format == Format.TOML:
        return toml.load(stream)

    raise ValueError('unsupported format')
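For in-memory use, any text stream works for the JSON and YAML formats; a
sketch with ``io.StringIO``::

    import io

    from hat import json

    data = [1, 2, {'a': None}]

    stream = io.StringIO()
    json.encode_stream(data, stream, json.Format.JSON)

    stream.seek(0)
    assert json.equals(json.decode_stream(stream, json.Format.JSON), data)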
def read_conf(path: pathlib.Path | None,
              default_path: pathlib.Path | None = None,
              default_suffixes: list[str] = ['.yaml', '.yml', '.toml', '.json'],  # NOQA
              stdio_path: pathlib.Path | None = pathlib.Path('-')
              ) -> Data:
    """Read configuration formatted as JSON data"""
    if stdio_path and path == stdio_path:
        return decode_stream(sys.stdin)

    if path:
        return decode_file(path)

    if not default_path:
        raise Exception('invalid configuration path')

    for suffix in default_suffixes:
        path = default_path.with_suffix(suffix)
        if path.exists():
            break

    return decode_file(path)
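A sketch of the lookup behaviour, assuming a configuration file exists next
to the chosen default path; file names are illustrative::

    import pathlib
    import tempfile

    from hat import json

    with tempfile.TemporaryDirectory() as tmpdir:
        conf_path = pathlib.Path(tmpdir) / 'server.json'
        json.encode_file({'port': 23023}, conf_path)

        # an explicit path wins; passing '-' would read from stdin instead
        assert json.read_conf(conf_path) == {'port': 23023}

        # with path=None the default path is probed with each default suffix
        conf = json.read_conf(None,
                              default_path=pathlib.Path(tmpdir) / 'server')
        assert conf == {'port': 23023}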
def diff(src: Data,
         dst: Data
         ) -> Data:
    """Generate JSON Patch diff.

    Example::

        src = [1, {'a': 2}, 3]
        dst = [1, {'a': 4}, 3]
        result = diff(src, dst)
        assert result == [{'op': 'replace', 'path': '/1/a', 'value': 4}]

    """
    return jsonpatch.JsonPatch.from_diff(src, dst).patch
def patch(data: Data,
          diff: Data
          ) -> Data:
    """Apply JSON Patch diff.

    Example::

        data = [1, {'a': 2}, 3]
        d = [{'op': 'replace', 'path': '/1/a', 'value': 4}]
        result = patch(data, d)
        assert result == [1, {'a': 4}, 3]

    """
    return jsonpatch.apply_patch(data, diff)
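The two functions compose into a round trip: a diff generated from ``src``
to ``dst`` transforms ``src`` into ``dst`` when applied::

    from hat import json

    src = {'devices': [{'id': 1, 'enabled': True}]}
    dst = {'devices': [{'id': 1, 'enabled': False}, {'id': 2}]}

    d = json.diff(src, dst)
    assert json.equals(json.patch(src, d), dst)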
class SchemaRepository:
    """JSON Schema repository.

    A repository that holds json schemas and enables validation against them.

    Repository can be initialized with multiple arguments, which can be
    instances of ``pathlib.PurePath``, ``Data`` or ``SchemaRepository``.

    If an argument is of type ``pathlib.PurePath``, and path points to file
    with a suffix '.json', '.yml' or '.yaml', json serializable data is decoded
    from the file. Otherwise, it is assumed that path points to a directory,
    which is recursively searched for json and yaml files. All decoded schemas
    are added to the repository. If a schema with the same `id` was previously
    added, an exception is raised.

    If an argument is of type ``Data``, it should be a json serializable data
    representation of a JSON schema. If a schema with the same `id` was
    previously added, an exception is raised.

    If an argument is of type ``SchemaRepository``, its schemas are added to
    the new repository. Previously added schemas with the same `id` are
    replaced.

    """

    def __init__(self, *args: typing.Union[pathlib.PurePath,
                                           Data,
                                           'SchemaRepository']):
        self._validators = weakref.WeakValueDictionary()
        self._data = collections.defaultdict(dict)

        for arg in args:
            if isinstance(arg, pathlib.PurePath):
                self._load_path(arg)

            elif isinstance(arg, SchemaRepository):
                self._load_repository(arg)

            else:
                self._load_schema(arg)

    def get_uri_schemes(self) -> Iterable[str]:
        """Get URI schemes stored in repository"""
        return self._data.keys()

    def get_schema_ids(self,
                       uri_schemes: Iterable[str] | None = None
                       ) -> Iterable[str]:
        """Get schema ids stored in repository

        If `uri_schemes` is ``None``, all schema ids are returned. Otherwise,
        only schema ids that have one of provided URI scheme are returned.

        """
        if uri_schemes is None:
            uri_schemes = self._data.keys()

        for uri_scheme in uri_schemes:
            schemas = self._data.get(uri_scheme)
            if not schemas:
                continue

            for path in schemas.keys():
                yield f'{uri_scheme}://{path}'

    def get_schema(self, schema_id: str) -> Data:
        """Get stored schema based on schema id"""
        uri = urllib.parse.urlparse(schema_id)
        path = uri.netloc + uri.path
        return self._data[uri.scheme][path]

    def validate(self,
                 schema_id: str,
                 data: Data,
                 validator_cls: typing.Type[Validator] = DefaultValidator):
        """Validate data against JSON schema.

        Args:
            schema_id: JSON schema identifier
            data: data to be validated
            validator_cls: validator implementation

        Raises:
            Exception

        """
        validator = self._validators.get(validator_cls)
        if validator is None:
            validator = validator_cls(self)
            self._validators[validator_cls] = validator

        validator.validate(schema_id, data)

    def to_json(self) -> Data:
        """Export repository content as json serializable data.

        Entire repository content is exported as json serializable data.
        New repository can be created from the exported content by using
        :meth:`SchemaRepository.from_json`.

        """
        return self._data

    @staticmethod
    def from_json(data: pathlib.PurePath | Data
                  ) -> 'SchemaRepository':
        """Create new repository from content exported as json serializable
        data.

        Creates a new repository from content of another repository that was
        exported by using :meth:`SchemaRepository.to_json`.

        Args:
            data: repository data

        """
        if isinstance(data, pathlib.PurePath):
            data = decode_file(data)

        repo = SchemaRepository()
        repo._data.update(data)
        return repo

    def _load_path(self, path):
        json_suffixes = {'.json', '.yaml', '.yml'}
        paths = ([path] if path.suffix in json_suffixes
                 else list(itertools.chain.from_iterable(
                     path.rglob(f'*{i}') for i in json_suffixes)))
        for path in paths:
            schema = decode_file(path)
            self._load_schema(schema)

    def _load_schema(self, schema):
        if '$schema' in schema:
            meta_schema_id = urllib.parse.urldefrag(schema['$schema']).url
            if meta_schema_id not in _meta_schema_ids:
                schema = dict(schema)
                del schema['$schema']

        schema_id = schema.get('$id')
        if not schema_id:
            schema_id = schema.get('id')
        if not schema_id:
            raise Exception('invalid schema id')

        uri = urllib.parse.urlparse(schema_id)
        path = uri.netloc + uri.path
        if path in self._data[uri.scheme]:
            raise Exception(f"duplicate schema id {uri.scheme}://{path}")

        self._data[uri.scheme][path] = schema

    def _load_repository(self, repo):
        for k, v in repo._data.items():
            self._data[k].update(v)
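No usage example accompanies the class, so here is a hedged sketch: the
``example://`` URI scheme and schema id are invented for illustration, and
``validate`` is expected to raise on non-conforming data (the exact exception
type depends on the validator implementation)::

    from hat import json

    person_schema = {
        '$schema': 'https://json-schema.org/draft/2020-12/schema',
        '$id': 'example://schemas/person.yaml',
        'type': 'object',
        'required': ['name'],
        'properties': {'name': {'type': 'string'}}}

    repo = json.SchemaRepository(person_schema)

    assert 'example' in repo.get_uri_schemes()
    assert 'example://schemas/person.yaml' in repo.get_schema_ids()

    # raises if data does not conform to the referenced schema
    repo.validate('example://schemas/person.yaml', {'name': 'John'})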
class Validator(typing.Protocol):
    """JSON Schema validator interface

    Args:
        repo: repository containing JSON Schemas

    """

    def __init__(self, repo: Repository):
        ...

    def validate(self, schema_id: str, data: Data):
        """Validate data against JSON Schema.

        Args:
            schema_id: JSON schema identifier
            data: data to be validated

        Raises:
            Exception

        """
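Since ``Validator`` is a ``typing.Protocol``, any class with this constructor
and method satisfies it and can be passed as ``validator_cls`` to
``SchemaRepository.validate``. A minimal sketch follows; its checking logic
is a placeholder for illustration, not the library's::

    class TypeOnlyValidator:
        """Toy validator that only checks the schema's top level 'type'"""

        def __init__(self, repo):
            self._repo = repo

        def validate(self, schema_id, data):
            schema = self._repo.get_schema(schema_id)
            if schema.get('type') == 'object' and not isinstance(data, dict):
                raise Exception(f'{schema_id}: expected object')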
class JsonSchemaValidator:

    def __init__(self, repo: Repository):
        self._repo = repo
        self._registry = referencing.Registry(retrieve=self._retrieve)

    def validate(self, schema_id: str, data: Data):
        jsonschema.validate(instance=data,
                            schema={'$ref': schema_id},
                            registry=self._registry)

    def _retrieve(self, uri):
        try:
            schema = self._repo.get_schema(uri)

        except Exception:
            raise referencing.exceptions.NoSuchResource(uri)

        if '$schema' not in schema:
            schema = {
                **schema,
                '$schema': "https://json-schema.org/draft/2020-12/schema"}

        return referencing.Resource.from_contents(schema)
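A brief sketch of selecting this validator explicitly when validating through
a repository; whether it is also ``DefaultValidator`` is not shown here, so
the keyword argument may be redundant in practice (the schema id is
illustrative)::

    from hat import json

    schema = {'$id': 'example://schemas/number.yaml', 'type': 'number'}
    repo = json.SchemaRepository(schema)

    repo.validate('example://schemas/number.yaml',
                  123,
                  validator_cls=json.JsonSchemaValidator)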