hat.json

JSON Data library

 1"""JSON Data library"""
 2
 3from hat.json.data import (Array,
 4                           Object,
 5                           Data,
 6                           equals,
 7                           clone,
 8                           flatten)
 9from hat.json.path import (Path,
10                           get,
11                           set_,
12                           remove,
13                           Storage)
14from hat.json.encoder import (Format,
15                              encode,
16                              decode,
17                              get_file_format,
18                              encode_file,
19                              decode_file,
20                              encode_stream,
21                              decode_stream)
22from hat.json.patch import (diff,
23                            patch)
24from hat.json.repository import (SchemaRepository,
25                                 json_schema_repo)
26from hat.json.validator import (Validator,
27                                DefaultValidator,
28                                JsonSchemaValidator)
29from hat.json import vt
30
31
# Public API of the `hat.json` package, re-exported from its submodules.
__all__ = ['Array',
           'Object',
           'Data',
           'equals',
           'clone',
           'flatten',
           'Path',
           'get',
           'set_',
           'remove',
           'Storage',
           'Format',
           'encode',
           'decode',
           'get_file_format',
           'encode_file',
           'decode_file',
           'encode_stream',
           'decode_stream',
           'diff',
           'patch',
           'SchemaRepository',
           'json_schema_repo',
           'Validator',
           'DefaultValidator',
           'JsonSchemaValidator',
           'vt']
Array, Object and Data — type aliases describing JSON-serializable values
(the documentation generator renders the underlying type variables as
`~Array`, `~Object`, `~Data`).
def equals(a: Data,
           b: Data
           ) -> bool:
    """Equality comparison of json serializable data.

    Tests for equality of data according to JSON format. Notably, ``bool``
    values are not considered equal to numeric values in any case. This is
    different from default equality comparison, which considers `False`
    equal to `0` and `0.0`; and `True` equal to `1` and `1.0`.

    Example::

        assert equals(0, 0.0) is True
        assert equals({'a': 1, 'b': 2}, {'b': 2, 'a': 1}) is True
        assert equals(1, True) is False

    """
    # bool is a subclass of int, so `1 == True` under default comparison;
    # reject mixed bool/number pairs before the generic equality test
    a_is_bool = isinstance(a, bool)
    b_is_bool = isinstance(b, bool)
    if a_is_bool is not b_is_bool:
        return False

    if a != b:
        return False

    # `a == b` holds here; recurse to catch nested bool/number mixes
    if isinstance(a, dict):
        return all(equals(value, b[key]) for key, value in a.items())

    if isinstance(a, list):
        return all(equals(x, y) for x, y in zip(a, b))

    return True

Equality comparison of json serializable data.

Tests for equality of data according to JSON format. Notably, bool values are not considered equal to numeric values in any case. This is different from default equality comparison, which considers False equal to 0 and 0.0; and True equal to 1 and 1.0.

Example::

assert equals(0, 0.0) is True
assert equals({'a': 1, 'b': 2}, {'b': 2, 'a': 1}) is True
assert equals(1, True) is False
def clone(data: Data) -> Data:
    """Deep clone data

    This function recursively creates new instances of array and object data
    based on input data. Resulting json data is equal to provided data.

    Example::

        x = {'a': [1, 2, 3]}
        y = clone(x)
        assert x is not y
        assert x['a'] is not y['a']
        assert equals(x, y)

    """
    # only containers need copying; scalars are immutable and shared as-is
    if isinstance(data, dict):
        return {key: clone(value) for key, value in data.items()}

    if isinstance(data, list):
        return [clone(item) for item in data]

    return data

Deep clone data

This function recursively creates new instances of array and object data based on input data. Resulting json data is equal to provided data.

Example::

x = {'a': [1, 2, 3]}
y = clone(x)
assert x is not y
assert x['a'] is not y['a']
assert equals(x, y)
def flatten(data: Data
            ) -> typing.Iterable[Data]:
    """Flatten JSON data

    If `data` is array, this generator recursively yields result of `flatten`
    call with each element of input list. For other `Data` types, input data is
    yielded.

    Example::

        data = [1, [], [2], {'a': [3]}]
        result = [1, 2, {'a': [3]}]
        assert list(flatten(data)) == result

    """
    # non-list data (including dicts) is a leaf and yielded unchanged
    if not isinstance(data, list):
        yield data
        return

    for item in data:
        yield from flatten(item)

Flatten JSON data

If data is array, this generator recursively yields result of flatten call with each element of input list. For other Data types, input data is yielded.

Example::

data = [1, [], [2], {'a': [3]}]
result = [1, 2, {'a': [3]}]
assert list(flatten(data)) == result
Path = ~Path
def get(data: Data,
        path: Path,
        default: typing.Optional[Data] = None
        ) -> Data:
    """Get data element referenced by path

    Example::

        data = {'a': [1, 2, [3, 4]]}
        path = ['a', 2, 0]
        assert get(data, path) == 3

        data = [1, 2, 3]
        assert get(data, 0) == 1
        assert get(data, 5) is None
        assert get(data, 5, default=123) == 123

    """
    # inlined copy of `flatten`: path components may be arbitrarily nested
    def path_items(p):
        if isinstance(p, list):
            for sub in p:
                yield from path_items(sub)
        else:
            yield p

    for key in path_items(path):
        if isinstance(key, str):
            if not (isinstance(data, dict) and key in data):
                return default
            data = data[key]
            continue

        # bool is excluded explicitly because it subclasses int
        if isinstance(key, int) and not isinstance(key, bool):
            if not isinstance(data, list):
                return default
            try:
                data = data[key]
            except IndexError:
                return default
            continue

        raise ValueError('invalid path')

    return data

Get data element referenced by path

Example::

data = {'a': [1, 2, [3, 4]]}
path = ['a', 2, 0]
assert get(data, path) == 3

data = [1, 2, 3]
assert get(data, 0) == 1
assert get(data, 5) is None
assert get(data, 5, default=123) == 123
def set_(data: Data,
         path: Path,
         value: Data
         ) -> Data:
    """Create new data by setting data path element value

    Example::

        data = [1, {'a': 2, 'b': 3}, 4]
        path = [1, 'b']
        result = set_(data, path, 5)
        assert result == [1, {'a': 2, 'b': 5}, 4]
        assert result is not data

        data = [1, 2, 3]
        result = set_(data, 4, 4)
        assert result == [1, 2, 3, None, 4]

    """
    # inlined copy of `flatten`: path components may be arbitrarily nested
    def path_items(p):
        if isinstance(p, list):
            for sub in p:
                yield from path_items(sub)
        else:
            yield p

    # walk down the path, remembering (container, key) at each level
    trail = []
    node = data
    for key in path_items(path):
        trail.append((node, key))

        if isinstance(key, str):
            node = node.get(key) if isinstance(node, dict) else None

        elif isinstance(key, int) and not isinstance(key, bool):
            try:
                node = node[key] if isinstance(node, list) else None
            except IndexError:
                node = None

        else:
            raise ValueError('invalid path')

    # rebuild from the deepest level up, shallow-copying each container so
    # the original data is never mutated
    for parent, key in reversed(trail):
        if isinstance(key, str):
            parent = dict(parent) if isinstance(parent, dict) else {}
            parent[key] = value

        elif isinstance(key, int) and not isinstance(key, bool):
            if not isinstance(parent, list):
                parent = []

            if key >= len(parent):
                # extend with None so index `key` exists
                parent = parent + [None] * (key - len(parent) + 1)
            elif key < 0 and -key > len(parent):
                # prepend with None so negative index `key` exists
                parent = [None] * (-key - len(parent)) + parent
            else:
                parent = list(parent)

            parent[key] = value

        else:
            raise ValueError('invalid path')

        value = parent

    return value

Create new data by setting data path element value

Example::

data = [1, {'a': 2, 'b': 3}, 4]
path = [1, 'b']
result = set_(data, path, 5)
assert result == [1, {'a': 2, 'b': 5}, 4]
assert result is not data

data = [1, 2, 3]
result = set_(data, 4, 4)
assert result == [1, 2, 3, None, 4]
def remove(data: Data,
           path: Path
           ) -> Data:
    """Create new data by removing part of data referenced by path

    Example::

        data = [1, {'a': 2, 'b': 3}, 4]
        path = [1, 'b']
        result = remove(data, path)
        assert result == [1, {'a': 2}, 4]
        assert result is not data

        data = [1, 2, 3]
        result = remove(data, 4)
        assert result == [1, 2, 3]

    """
    # inlined copy of `flatten`: path components may be arbitrarily nested
    def path_items(p):
        if isinstance(p, list):
            for sub in p:
                yield from path_items(sub)
        else:
            yield p

    # if the path does not resolve, the input is returned unchanged
    result = data
    trail = []
    node = data

    for key in path_items(path):
        if isinstance(key, str):
            if not (isinstance(node, dict) and key in node):
                return result
            trail.append((node, key))
            node = node[key]

        elif isinstance(key, int) and not isinstance(key, bool):
            if not isinstance(node, list):
                return result
            try:
                child = node[key]
            except IndexError:
                return result
            trail.append((node, key))
            node = child

        else:
            raise ValueError('invalid path')

    # rebuild from the deepest level up; the deepest copy has the target
    # element deleted, shallower copies just reference the rebuilt child
    result = None
    for parent, key in reversed(trail):
        parent = dict(parent) if isinstance(key, str) else list(parent)

        if result is None:
            del parent[key]
        else:
            parent[key] = result

        result = parent

    return result

Create new data by removing part of data referenced by path

Example::

data = [1, {'a': 2, 'b': 3}, 4]
path = [1, 'b']
result = remove(data, path)
assert result == [1, {'a': 2}, 4]
assert result is not data

data = [1, 2, 3]
result = remove(data, 4)
assert result == [1, 2, 3]
class Storage:
    """JSON data storage

    Helper class representing observable JSON data state manipulated with
    path based get/set/remove functions.

    """

    def __init__(self, data: Data = None):
        # current JSON state; replaced (never mutated) on each change
        self._data = data
        # registry of callbacks notified with the new state on every change
        self._change_cbs = util.CallbackRegistry()

    @property
    def data(self) -> Data:
        """Data"""
        return self._data

    def register_change_cb(self,
                           cb: typing.Callable[[Data], None]
                           ) -> util.RegisterCallbackHandle:
        """Register data change callback"""
        return self._change_cbs.register(cb)

    def get(self, path: Path) -> Data:
        """Get data element referenced by `path`"""
        return get(self._data, path)

    def set(self, path: Path, value: Data):
        """Set data referenced by `path` and notify change callbacks"""
        self._data = set_(self._data, path, value)
        self._change_cbs.notify(self._data)

    def remove(self, path: Path):
        """Remove data referenced by `path` and notify change callbacks"""
        self._data = remove(self._data, path)
        self._change_cbs.notify(self._data)

JSON data storage

Helper class representing observable JSON data state manipulated with path based get/set/remove functions.

Storage(data: ~Data = None)
200    def __init__(self, data: Data = None):
201        self._data = data
202        self._change_cbs = util.CallbackRegistry()
data: ~Data

Data

def register_change_cb(self, cb: Callable[[~Data], NoneType]) -> hat.util.RegisterCallbackHandle:
209    def register_change_cb(self,
210                           cb: typing.Callable[[Data], None]
211                           ) -> util.RegisterCallbackHandle:
212        """Register data change callback"""
213        return self._change_cbs.register(cb)

Register data change callback

def get(self, path: Union[int, str, List[~Path]]):
215    def get(self, path: Path):
216        """Get data"""
217        return get(self._data, path)

Get data

def set(self, path: Union[int, str, List[~Path]], value: ~Data):
219    def set(self, path: Path, value: Data):
220        """Set data"""
221        self._data = set_(self._data, path, value)
222        self._change_cbs.notify(self._data)

Set data

def remove(self, path: Union[int, str, List[~Path]]):
224    def remove(self, path: Path):
225        """Remove data"""
226        self._data = remove(self._data, path)
227        self._change_cbs.notify(self._data)

Remove data

class Format(enum.Enum):
    """Encoding format

    Serialization formats supported by `encode`/`decode` and their
    file/stream variants.

    """
    JSON = 'json'
    YAML = 'yaml'
    TOML = 'toml'

Encoding format

Inherited Members
enum.Enum
name
value
def encode(data: Data,
           format: Format = Format.JSON,
           indent: typing.Optional[int] = None
           ) -> str:
    """Encode JSON data.

    In case of TOML format, data must be JSON Object.

    Args:
        data: JSON data
        format: encoding format
        indent: indentation size

    """
    if format == Format.JSON:
        return json.dumps(data, indent=indent)

    if format == Format.YAML:
        # prefer the C-accelerated dumper when PyYAML was built with libyaml
        dumper = getattr(yaml, 'CSafeDumper', yaml.SafeDumper)
        return str(yaml.dump(data, indent=indent, Dumper=dumper))

    if format == Format.TOML:
        return tomli_w.dumps(data)

    raise ValueError('unsupported format')

Encode JSON data.

In case of TOML format, data must be JSON Object.

Args
  • data: JSON data
  • format: encoding format
  • indent: indentation size
def decode(data_str: str,
           format: Format = Format.JSON
           ) -> Data:
    """Decode JSON data.

    Args:
        data_str: encoded JSON data
        format: encoding format

    """
    if format == Format.JSON:
        return json.loads(data_str)

    if format == Format.YAML:
        # prefer the C-accelerated loader when PyYAML was built with libyaml
        loader = getattr(yaml, 'CSafeLoader', yaml.SafeLoader)
        return yaml.load(io.StringIO(data_str), Loader=loader)

    if format == Format.TOML:
        return tomli.loads(data_str)

    raise ValueError('unsupported format')

Decode JSON data.

Args
  • data_str: encoded JSON data
  • format: encoding format
def get_file_format(path: pathlib.PurePath) -> Format:
    """Detect file format based on path suffix

    Raises:
        ValueError: suffix is not one of '.json', '.yaml', '.yml', '.toml'

    """
    suffix_formats = {'.json': Format.JSON,
                      '.yaml': Format.YAML,
                      '.yml': Format.YAML,
                      '.toml': Format.TOML}

    try:
        return suffix_formats[path.suffix]

    except KeyError:
        raise ValueError('can not determine format from path suffix') from None

Detect file format based on path suffix

def encode_file(data: Data,
                path: pathlib.PurePath,
                format: typing.Optional[Format] = None,
                indent: typing.Optional[int] = 4):
    """Encode JSON data to file.

    If `format` is ``None``, encoding format is derived from path suffix.

    In case of TOML format, data must be JSON Object.

    Args:
        data: JSON data
        path: file path
        format: encoding format
        indent: indentation size

    """
    if format is None:
        format = get_file_format(path)

    # TOML is written as binary; text formats use explicit UTF-8
    if format == Format.TOML:
        flags, encoding = 'wb', None
    else:
        flags, encoding = 'w', 'utf-8'

    with open(path, flags, encoding=encoding) as f:
        encode_stream(data, f, format, indent)

Encode JSON data to file.

If format is None, encoding format is derived from path suffix.

In case of TOML format, data must be JSON Object.

Args
  • data: JSON data
  • path: file path
  • format: encoding format
  • indent: indentation size
def decode_file(path: pathlib.PurePath,
                format: typing.Optional[Format] = None
                ) -> Data:
    """Decode JSON data from file.

    If `format` is ``None``, encoding format is derived from path suffix.

    Args:
        path: file path
        format: encoding format

    """
    if format is None:
        format = get_file_format(path)

    # TOML is read as binary; text formats use explicit UTF-8
    if format == Format.TOML:
        flags, encoding = 'rb', None
    else:
        flags, encoding = 'r', 'utf-8'

    with open(path, flags, encoding=encoding) as f:
        return decode_stream(f, format)

Decode JSON data from file.

If format is None, encoding format is derived from path suffix.

Args
  • path: file path
  • format: encoding format
def encode_stream(data: Data,
                  stream: typing.Union[io.TextIOBase, io.RawIOBase],
                  format: Format = Format.JSON,
                  indent: typing.Optional[int] = 4):
    """Encode JSON data to stream.

    In case of TOML format, data must be JSON Object.

    In case of TOML format, `stream` should be `io.RawIOBase`. For
    other formats, `io.TextIOBase` is expected.

    Args:
        data: JSON data
        stream: output stream
        format: encoding format
        indent: indentation size

    """
    if format == Format.JSON:
        json.dump(data, stream, indent=indent)
        return

    if format == Format.YAML:
        # prefer the C-accelerated dumper when PyYAML was built with libyaml
        dumper = getattr(yaml, 'CSafeDumper', yaml.SafeDumper)
        yaml.dump(data, stream, indent=indent, Dumper=dumper,
                  explicit_start=True, explicit_end=True)
        return

    if format == Format.TOML:
        tomli_w.dump(data, stream)
        return

    raise ValueError('unsupported format')

Encode JSON data to stream.

In case of TOML format, data must be JSON Object.

In case of TOML format, stream should be io.RawIOBase. For other formats, io.TextIOBase is expected.

Args
  • data: JSON data
  • stream: output stream
  • format: encoding format
  • indent: indentation size
def decode_stream(stream: typing.Union[io.TextIOBase, io.RawIOBase],
                  format: Format = Format.JSON
                  ) -> Data:
    """Decode JSON data from stream.

    In case of TOML format, `stream` should be `io.RawIOBase`. For
    other formats, `io.TextIOBase` is expected.

    Args:
        stream: input stream
        format: encoding format

    """
    if format == Format.JSON:
        return json.load(stream)

    if format == Format.YAML:
        # prefer the C-accelerated loader when PyYAML was built with libyaml
        loader = getattr(yaml, 'CSafeLoader', yaml.SafeLoader)
        return yaml.load(stream, Loader=loader)

    if format == Format.TOML:
        return tomli.load(stream)

    raise ValueError('unsupported format')

Decode JSON data from stream.

In case of TOML format, stream should be io.RawIOBase. For other formats, io.TextIOBase is expected.

Args
  • stream: input stream
  • format: encoding format
def diff(src: Data,
         dst: Data
         ) -> Data:
    """Generate JSON Patch diff.

    Example::

        src = [1, {'a': 2}, 3]
        dst = [1, {'a': 4}, 3]
        result = diff(src, dst)
        assert result == [{'op': 'replace', 'path': '/1/a', 'value': 4}]

    """
    # delegate diff computation to the jsonpatch library (JSON Patch,
    # RFC 6902); `.patch` exposes the diff as json serializable data
    json_patch = jsonpatch.JsonPatch.from_diff(src, dst)
    return json_patch.patch

Generate JSON Patch diff.

Example::

src = [1, {'a': 2}, 3]
dst = [1, {'a': 4}, 3]
result = diff(src, dst)
assert result == [{'op': 'replace', 'path': '/1/a', 'value': 4}]
def patch(data: Data,
          diff: Data
          ) -> Data:
    """Apply JSON Patch diff.

    Example::

        data = [1, {'a': 2}, 3]
        d = [{'op': 'replace', 'path': '/1/a', 'value': 4}]
        result = patch(data, d)
        assert result == [1, {'a': 4}, 3]

    """
    # jsonpatch applies the diff without mutating the input data
    patched = jsonpatch.apply_patch(data, diff)
    return patched

Apply JSON Patch diff.

Example::

data = [1, {'a': 2}, 3]
d = [{'op': 'replace', 'path': '/1/a', 'value': 4}]
result = patch(data, d)
assert result == [1, {'a': 4}, 3]
class SchemaRepository:
    """JSON Schema repository.

    A repository that holds json schemas and enables validation against them.

    Repository can be initialized with multiple arguments, which can be
    instances of ``pathlib.PurePath``, ``Data`` or ``SchemaRepository``.

    If an argument is of type ``pathlib.PurePath``, and path points to file
    with a suffix '.json', '.yml' or '.yaml', json serializable data is decoded
    from the file. Otherwise, it is assumed that path points to a directory,
    which is recursively searched for json and yaml files. All decoded schemas
    are added to the repository. If a schema with the same `id` was previously
    added, an exception is raised.

    If an argument is of type ``Data``, it should be a json serializable data
    representation of a JSON schema. If a schema with the same `id` was
    previously added, an exception is raised.

    If an argument is of type ``SchemaRepository``, its schemas are added to
    the new repository. Previously added schemas with the same `id` are
    replaced.

    """

    def __init__(self, *args: typing.Union[pathlib.PurePath,
                                           Data,
                                           'SchemaRepository']):
        # validator instances cached per validator class; weak values allow
        # unused validators to be garbage collected
        self._validators = weakref.WeakValueDictionary()
        # schemas stored as {uri_scheme: {netloc+path: schema}}
        self._data = {}
        for arg in args:
            if isinstance(arg, pathlib.PurePath):
                self._load_path(arg)
            elif isinstance(arg, SchemaRepository):
                self._load_repository(arg)
            else:
                self._load_schema(arg)

    def get_uri_schemes(self) -> typing.Iterable[str]:
        """Get URI schemes stored in repository"""
        return self._data.keys()

    def get_schema_ids(self,
                       uri_schemes: typing.Optional[typing.Iterable[str]] = None  # NOQA
                       ) -> typing.Iterable[str]:
        """Get schema ids stored in repository

        If `uri_schemes` is ``None``, all schema ids are returned. Otherwise,
        only schema ids that have one of provided URI scheme are returned.

        """
        if uri_schemes is None:
            uri_schemes = self._data.keys()

        for uri_scheme in uri_schemes:
            schemas = self._data.get(uri_scheme)
            if not schemas:
                continue

            for path in schemas.keys():
                yield f'{uri_scheme}://{path}'

    def get_schema(self, schema_id: str) -> Data:
        """Get stored schema based on schema id

        Raises `KeyError` if the schema id is not stored in the repository.

        """
        uri = urllib.parse.urlparse(schema_id)
        path = uri.netloc + uri.path
        return self._data[uri.scheme][path]

    def validate(self,
                 schema_id: str,
                 data: Data,
                 validator_cls: typing.Type[Validator] = DefaultValidator):
        """Validate data against JSON schema.

        Args:
            schema_id: JSON schema identifier
            data: data to be validated
            validator_cls: validator implementation

        Raises:
            Exception

        """
        # lazily create and cache one validator instance per validator class
        validator = self._validators.get(validator_cls)
        if validator is None:
            validator = validator_cls(self)
            self._validators[validator_cls] = validator

        validator.validate(schema_id, data)

    def to_json(self) -> Data:
        """Export repository content as json serializable data.

        Entire repository content is exported as json serializable data.
        New repository can be created from the exported content by using
        :meth:`SchemaRepository.from_json`.

        """
        return self._data

    @staticmethod
    def from_json(data: typing.Union[pathlib.PurePath,
                                     Data]
                  ) -> 'SchemaRepository':
        """Create new repository from content exported as json serializable
        data.

        Creates a new repository from content of another repository that was
        exported by using :meth:`SchemaRepository.to_json`.

        Args:
            data: repository data

        """
        if isinstance(data, pathlib.PurePath):
            data = decode_file(data)
        repo = SchemaRepository()
        # NOTE(review): exported data is adopted directly (not copied), so the
        # new repository shares state with the exported content
        repo._data = data
        return repo

    def _load_path(self, path):
        # a file path with a recognized suffix is loaded directly; any other
        # path is treated as a directory and searched recursively
        json_suffixes = {'.json', '.yaml', '.yml'}
        paths = ([path] if path.suffix in json_suffixes
                 else list(itertools.chain.from_iterable(
                    path.rglob(f'*{i}') for i in json_suffixes)))
        for path in paths:
            schema = decode_file(path)
            self._load_schema(schema)

    def _load_schema(self, schema):
        # drop '$schema' if it references a meta schema unknown to the
        # repository (assumes module-level `_meta_schema_ids` — not visible
        # in this excerpt)
        if '$schema' in schema:
            meta_schema_id = urllib.parse.urldefrag(schema['$schema']).url
            if meta_schema_id not in _meta_schema_ids:
                schema = dict(schema)
                del schema['$schema']

        uri = urllib.parse.urlparse(schema['id'])
        path = uri.netloc + uri.path
        if uri.scheme not in self._data:
            self._data[uri.scheme] = {}
        if path in self._data[uri.scheme]:
            raise Exception(f"duplicate schema id {uri.scheme}://{path}")
        self._data[uri.scheme][path] = schema

    def _load_repository(self, repo):
        # merge another repository's schemas; same-id schemas are replaced
        for k, v in repo._data.items():
            if k not in self._data:
                self._data[k] = v
            else:
                self._data[k].update(v)

JSON Schema repository.

A repository that holds json schemas and enables validation against them.

Repository can be initialized with multiple arguments, which can be instances of pathlib.PurePath, Data or SchemaRepository.

If an argument is of type pathlib.PurePath, and path points to file with a suffix '.json', '.yml' or '.yaml', json serializable data is decoded from the file. Otherwise, it is assumed that path points to a directory, which is recursively searched for json and yaml files. All decoded schemas are added to the repository. If a schema with the same id was previously added, an exception is raised.

If an argument is of type Data, it should be a json serializable data representation of a JSON schema. If a schema with the same id was previously added, an exception is raised.

If an argument is of type SchemaRepository, its schemas are added to the new repository. Previously added schemas with the same id are replaced.

SchemaRepository( *args: Union[pathlib.PurePath, ~Data, hat.json.SchemaRepository])
42    def __init__(self, *args: typing.Union[pathlib.PurePath,
43                                           Data,
44                                           'SchemaRepository']):
45        self._validators = weakref.WeakValueDictionary()
46        self._data = {}
47        for arg in args:
48            if isinstance(arg, pathlib.PurePath):
49                self._load_path(arg)
50            elif isinstance(arg, SchemaRepository):
51                self._load_repository(arg)
52            else:
53                self._load_schema(arg)
def get_uri_schemes(self) -> Iterable[str]:
55    def get_uri_schemes(self) -> typing.Iterable[str]:
56        """Get URI schemes stored in repository"""
57        return self._data.keys()

Get URI schemes stored in repository

def get_schema_ids(self, uri_schemes: Optional[Iterable[str]] = None) -> Iterable[str]:
59    def get_schema_ids(self,
60                       uri_schemes: typing.Optional[typing.Iterable[str]] = None  # NOQA
61                       ) -> typing.Iterable[str]:
62        """Get schema ids stored in repository
63
64        If `uri_schemes` is ``None``, all schema ids are returned. Otherwise,
65        only schema ids that have one of provided URI scheme are returned.
66
67        """
68        if uri_schemes is None:
69            uri_schemes = self._data.keys()
70
71        for uri_scheme in uri_schemes:
72            schemas = self._data.get(uri_scheme)
73            if not schemas:
74                continue
75
76            for path in schemas.keys():
77                yield f'{uri_scheme}://{path}'

Get schema ids stored in repository

If uri_schemes is None, all schema ids are returned. Otherwise, only schema ids that have one of provided URI scheme are returned.

def get_schema(self, schema_id: str) -> ~Data:
79    def get_schema(self, schema_id: str) -> Data:
80        """Get stored schema based on schema id"""
81        uri = urllib.parse.urlparse(schema_id)
82        path = uri.netloc + uri.path
83        return self._data[uri.scheme][path]

Get stored schema based on schema id

def validate( self, schema_id: str, data: ~Data, validator_cls: Type[hat.json.Validator] = <class 'hat.json.JsonSchemaValidator'>):
 85    def validate(self,
 86                 schema_id: str,
 87                 data: Data,
 88                 validator_cls: typing.Type[Validator] = DefaultValidator):
 89        """Validate data against JSON schema.
 90
 91        Args:
 92            schema_id: JSON schema identifier
 93            data: data to be validated
 94            validator_cls: validator implementation
 95
 96        Raises:
 97            Exception
 98
 99        """
100        validator = self._validators.get(validator_cls)
101        if validator is None:
102            validator = validator_cls(self)
103            self._validators[validator_cls] = validator
104
105        validator.validate(schema_id, data)

Validate data against JSON schema.

Args
  • schema_id: JSON schema identifier
  • data: data to be validated
  • validator_cls: validator implementation
Raises
  • Exception
def to_json(self) -> ~Data:
107    def to_json(self) -> Data:
108        """Export repository content as json serializable data.
109
110        Entire repository content is exported as json serializable data.
111        New repository can be created from the exported content by using
112        :meth:`SchemaRepository.from_json`.
113
114        """
115        return self._data

Export repository content as json serializable data.

Entire repository content is exported as json serializable data. New repository can be created from the exported content by using SchemaRepository.from_json.

@staticmethod
def from_json( data: Union[pathlib.PurePath, ~Data]) -> hat.json.SchemaRepository:
117    @staticmethod
118    def from_json(data: typing.Union[pathlib.PurePath,
119                                     Data]
120                  ) -> 'SchemaRepository':
121        """Create new repository from content exported as json serializable
122        data.
123
124        Creates a new repository from content of another repository that was
125        exported by using :meth:`SchemaRepository.to_json`.
126
127        Args:
128            data: repository data
129
130        """
131        if isinstance(data, pathlib.PurePath):
132            data = decode_file(data)
133        repo = SchemaRepository()
134        repo._data = data
135        return repo

Create new repository from content exported as json serializable data.

Creates a new repository from content of another repository that was exported by using SchemaRepository.to_json.

Args
  • data: repository data
json_schema_repo = <hat.json.SchemaRepository object>
class Validator(typing.Protocol):
class Validator(typing.Protocol):
    """Structural interface implemented by JSON Schema validators.

    Args:
        repo: repository containing JSON Schemas

    """

    def __init__(self, repo: Repository):
        ...

    def validate(self, schema_id: str, data: Data):
        """Validate data against the JSON Schema identified by `schema_id`.

        Args:
            schema_id: JSON schema identifier
            data: data to be validated

        Raises:
            Exception

        """

JSON Schema validator interface

Args
  • repo: repository containing JSON Schemas
Validator(*args, **kwargs)
1431def _no_init_or_replace_init(self, *args, **kwargs):
1432    cls = type(self)
1433
1434    if cls._is_protocol:
1435        raise TypeError('Protocols cannot be instantiated')
1436
1437    # Already using a custom `__init__`. No need to calculate correct
1438    # `__init__` to call. This can lead to RecursionError. See bpo-45121.
1439    if cls.__init__ is not _no_init_or_replace_init:
1440        return
1441
1442    # Initially, `__init__` of a protocol subclass is set to `_no_init_or_replace_init`.
1443    # The first instantiation of the subclass will call `_no_init_or_replace_init` which
1444    # searches for a proper new `__init__` in the MRO. The new `__init__`
1445    # replaces the subclass' old `__init__` (ie `_no_init_or_replace_init`). Subsequent
1446    # instantiation of the protocol subclass will thus use the new
1447    # `__init__` and no longer call `_no_init_or_replace_init`.
1448    for base in cls.__mro__:
1449        init = base.__dict__.get('__init__', _no_init_or_replace_init)
1450        if init is not _no_init_or_replace_init:
1451            cls.__init__ = init
1452            break
1453    else:
1454        # should not happen
1455        cls.__init__ = object.__init__
1456
1457    cls.__init__(self, *args, **kwargs)
def validate(self, schema_id: str, data: ~Data):
39    def validate(self, schema_id: str, data: Data):
40        """Validate data against JSON Schema.
41
42        Args:
43            schema_id: JSON schema identifier
44            data: data to be validated
45
46        Raises:
47            Exception
48
49        """

Validate data against JSON Schema.

Args
  • schema_id: JSON schema identifier
  • data: data to be validated
Raises
  • Exception
class DefaultValidator (alias of JsonSchemaValidator):
class JsonSchemaValidator:
    """Validator implementation backed by the `jsonschema` package."""

    def __init__(self, repo: Repository):
        self._repo = repo

    def validate(self, schema_id: str, data: Data):
        parsed = urllib.parse.urlparse(schema_id)
        location = parsed.netloc + parsed.path
        referrer = self._repo.get_schema(schema_id)
        # resolve schema references through the repository for every
        # URI scheme it knows about
        handlers = {scheme: self._repo.get_schema
                    for scheme in self._repo.get_uri_schemes()}
        resolver = jsonschema.RefResolver(
            base_uri=f'{parsed.scheme}://{location}',
            referrer=referrer,
            handlers=handlers)
        schema = resolver.resolve_fragment(referrer, parsed.fragment)
        jsonschema.validate(instance=data,
                            schema=schema,
                            resolver=resolver)
DefaultValidator(repo: hat.json.validator.common.Repository)
12    def __init__(self, repo: Repository):
13        self._repo = repo
def validate(self, schema_id: str, data: ~Data):
15    def validate(self, schema_id: str, data: Data):
16        uri = urllib.parse.urlparse(schema_id)
17        path = uri.netloc + uri.path
18        resolver = jsonschema.RefResolver(
19            base_uri=f'{uri.scheme}://{path}',
20            referrer=self._repo.get_schema(schema_id),
21            handlers={i: self._repo.get_schema
22                      for i in self._repo.get_uri_schemes()})
23        jsonschema.validate(
24            instance=data,
25            schema=resolver.resolve_fragment(resolver.referrer, uri.fragment),
26            resolver=resolver)
class JsonSchemaValidator:
class JsonSchemaValidator:
    """Validator implementation based on the `jsonschema` package."""

    def __init__(self, repo: Repository):
        self._repo = repo

    def validate(self, schema_id: str, data: Data):
        parsed = urllib.parse.urlparse(schema_id)
        location = parsed.netloc + parsed.path
        referrer = self._repo.get_schema(schema_id)
        resolver = jsonschema.RefResolver(
            base_uri=f'{parsed.scheme}://{location}',
            referrer=referrer,
            handlers={scheme: self._repo.get_schema
                      for scheme in self._repo.get_uri_schemes()})
        schema = resolver.resolve_fragment(referrer, parsed.fragment)
        jsonschema.validate(instance=data,
                            schema=schema,
                            resolver=resolver)
JsonSchemaValidator(repo: hat.json.validator.common.Repository)
12    def __init__(self, repo: Repository):
13        self._repo = repo
def validate(self, schema_id: str, data: ~Data):
15    def validate(self, schema_id: str, data: Data):
16        uri = urllib.parse.urlparse(schema_id)
17        path = uri.netloc + uri.path
18        resolver = jsonschema.RefResolver(
19            base_uri=f'{uri.scheme}://{path}',
20            referrer=self._repo.get_schema(schema_id),
21            handlers={i: self._repo.get_schema
22                      for i in self._repo.get_uri_schemes()})
23        jsonschema.validate(
24            instance=data,
25            schema=resolver.resolve_fragment(resolver.referrer, uri.fragment),
26            resolver=resolver)