hat.json.patch

JSON Patch

  1"""JSON Patch"""
  2
  3import typing
  4
  5import jsonpatch
  6
  7from hat.json.data import Data, equals
  8
  9
 10_Pointer: typing.TypeAlias = list[str]
 11
 12
 13def diff(src: Data,
 14         dst: Data
 15         ) -> Data:
 16    """Generate JSON Patch diff.
 17
 18    Example::
 19
 20        src = [1, {'a': 2}, 3]
 21        dst = [1, {'a': 4}, 3]
 22        result = diff(src, dst)
 23        assert result == [{'op': 'replace', 'path': '/1/a', 'value': 4}]
 24
 25    """
 26    return jsonpatch.JsonPatch.from_diff(src, dst).patch
 27
 28
 29def patch(data: Data,
 30          diff: Data
 31          ) -> Data:
 32    """Apply JSON Patch diff.
 33
 34    Example::
 35
 36        data = [1, {'a': 2}, 3]
 37        d = [{'op': 'replace', 'path': '/1/a', 'value': 4}]
 38        result = patch(data, d)
 39        assert result == [1, {'a': 4}, 3]
 40
 41    """
 42    for op in diff:
 43        data = _apply_op(data, op)
 44
 45    return data
 46
 47
 48def _apply_op(data: Data, op: Data) -> Data:
 49
 50    if op['op'] == 'add':
 51        path = _parse_pointer(op['path'])
 52        return _add(data, path, op['value'])
 53
 54    if op['op'] == 'remove':
 55        path = _parse_pointer(op['path'])
 56        return _remove(data, path)
 57
 58    if op['op'] == 'replace':
 59        path = _parse_pointer(op['path'])
 60        return _replace(data, path, op['value'])
 61
 62    if op['op'] == 'move':
 63        from_path = _parse_pointer(op['from'])
 64        to_path = _parse_pointer(op['path'])
 65        return _move(data, from_path, to_path)
 66
 67    if op['op'] == 'copy':
 68        from_path = _parse_pointer(op['from'])
 69        to_path = _parse_pointer(op['path'])
 70        return _copy(data, from_path, to_path)
 71
 72    if op['op'] == 'test':
 73        path = _parse_pointer(op['path'])
 74        return _test(data, path, op['value'])
 75
 76    raise ValueError('unsupported operation')
 77
 78
 79def _add(data: Data, path: _Pointer, value: Data) -> Data:
 80
 81    if not path:
 82        return value
 83
 84    key, *rest = path
 85
 86    if isinstance(data, list):
 87        if rest:
 88            idx = int(key)
 89            if not 0 <= idx < len(data):
 90                raise ValueError('invalid array index')
 91
 92            return [*data[:idx], _add(data[idx], rest, value), *data[idx+1:]]
 93
 94        else:
 95            if key == '-':
 96                return [*data, value]
 97
 98            idx = int(key)
 99            if not 0 <= idx <= len(data):
100                raise ValueError('invalid array index')
101
102            return [*data[:idx], value, *data[idx:]]
103
104    if isinstance(data, dict):
105        if rest:
106            if key not in data:
107                raise ValueError('invalid object key')
108
109            return {**data, key: _add(data[key], rest, value)}
110
111        else:
112            return {**data, key: value}
113
114    raise ValueError('invalid data type')
115
116
117def _remove(data: Data, path: _Pointer) -> Data:
118
119    if not path:
120        return None
121
122    key, *rest = path
123
124    if isinstance(data, list):
125        idx = int(key)
126        if not 0 <= idx < len(data):
127            raise ValueError('invalid array index')
128
129        if rest:
130            return [*data[:idx], _remove(data[idx], rest), *data[idx+1:]]
131        else:
132            return [*data[:idx], *data[idx+1:]]
133
134    if isinstance(data, dict):
135        if key not in data:
136            raise ValueError('invalid object key')
137
138        if rest:
139            return {**data, key: _remove(data[key], rest)}
140        else:
141            return {k: v for k, v in data.items() if k != key}
142
143
144def _replace(data: Data, path: _Pointer, value: Data) -> Data:
145
146    if not path:
147        return value
148
149    key, *rest = path
150
151    if isinstance(data, list):
152        idx = int(key)
153        if not 0 <= idx < len(data):
154            raise ValueError('invalid array index')
155
156        if rest:
157            return [*data[:idx], _replace(data[idx], rest, value),
158                    *data[idx+1:]]
159        else:
160            return [*data[:idx], value, *data[idx+1:]]
161
162    if isinstance(data, dict):
163        if key not in data:
164            raise ValueError('invalid object key')
165
166        if rest:
167            return {**data, key: _replace(data[key], rest, value)}
168        else:
169            return {**data, key: value}
170
171
172def _move(data: Data, from_path: _Pointer, to_path: _Pointer) -> Data:
173    if len(to_path) > len(from_path) and from_path == to_path[:len(from_path)]:
174        raise ValueError("path can't be child of from")
175
176    value = _get(data, from_path)
177    return _add(_remove(data, from_path), to_path, value)
178
179
180def _copy(data: Data, from_path: _Pointer, to_path: _Pointer) -> Data:
181    value = _get(data, from_path)
182    return _add(data, to_path, value)
183
184
185def _test(data: Data, path: _Pointer, value: Data) -> Data:
186    if not equals(value, _get(data, path)):
187        raise ValueError('invalid value')
188
189
190def _get(data: Data, path: _Pointer) -> Data:
191
192    if not path:
193        return data
194
195    key, *rest = path
196
197    if isinstance(data, list):
198        idx = int(key)
199        if not 0 <= idx < len(data):
200            raise ValueError('invalid array index')
201
202        return _get(data[idx], rest)
203
204    if isinstance(data, dict):
205        if key not in data:
206            raise ValueError('invalid object key')
207
208        return _get(data[key], rest)
209
210    raise ValueError('invalid data type')
211
212
213def _parse_pointer(pointer: str) -> _Pointer:
214    if pointer == '':
215        return []
216
217    segments = pointer.split('/')
218    if segments[0] != '':
219        raise ValueError('invalid pointer')
220
221    return [_unescape_pointer_segment(i) for i in segments[1:]]
222
223
224def _unescape_pointer_segment(segment: str) -> str:
225    return segment.replace('~1', '/').replace('~0', '~')
226
227
228# check upstream changes in jsonpatch and validate performance impact
229
230# def _monkeypatch_jsonpatch():
231#     """Monkeypatch jsonpatch.
232
233#     Patch incorrect value comparison between ``bool`` and numeric values when
234#     diffing json serializable data.
235
236#     Comparing `False` to `0` or `0.0`; and `True` to `1` or `1.0` incorrectly
237#     results in no change.
238
239#     """
240#     def _compare_values(self, path, key, src, dst):
241
242#         if isinstance(src, jsonpatch.MutableMapping) and \
243#                 isinstance(dst, jsonpatch.MutableMapping):
244#             self._compare_dicts(jsonpatch._path_join(path, key), src, dst)
245
246#         elif isinstance(src, jsonpatch.MutableSequence) and \
247#                 isinstance(dst, jsonpatch.MutableSequence):
248#             self._compare_lists(jsonpatch._path_join(path, key), src, dst)
249
250#         elif isinstance(src, bool) == isinstance(dst, bool) and src == dst:
251#             pass
252
253#         else:
254#             self._item_replaced(path, key, dst)
255
256#     jsonpatch.DiffBuilder._compare_values = _compare_values
257
258
259# _monkeypatch_jsonpatch()
def diff( src: Union[NoneType, bool, int, float, str, List[Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]]], Dict[str, Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]]]], dst: Union[NoneType, bool, int, float, str, List[Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]]], Dict[str, Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]]]]) -> Union[NoneType, bool, int, float, str, List[Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]]], Dict[str, Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]]]]:
14def diff(src: Data,
15         dst: Data
16         ) -> Data:
17    """Generate JSON Patch diff.
18
19    Example::
20
21        src = [1, {'a': 2}, 3]
22        dst = [1, {'a': 4}, 3]
23        result = diff(src, dst)
24        assert result == [{'op': 'replace', 'path': '/1/a', 'value': 4}]
25
26    """
27    return jsonpatch.JsonPatch.from_diff(src, dst).patch

Generate JSON Patch diff.

Example::

src = [1, {'a': 2}, 3]
dst = [1, {'a': 4}, 3]
result = diff(src, dst)
assert result == [{'op': 'replace', 'path': '/1/a', 'value': 4}]
def patch( data: Union[NoneType, bool, int, float, str, List[Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]]], Dict[str, Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]]]], diff: Union[NoneType, bool, int, float, str, List[Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]]], Dict[str, Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]]]]) -> Union[NoneType, bool, int, float, str, List[Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]]], Dict[str, Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]]]]:
30def patch(data: Data,
31          diff: Data
32          ) -> Data:
33    """Apply JSON Patch diff.
34
35    Example::
36
37        data = [1, {'a': 2}, 3]
38        d = [{'op': 'replace', 'path': '/1/a', 'value': 4}]
39        result = patch(data, d)
40        assert result == [1, {'a': 4}, 3]
41
42    """
43    for op in diff:
44        data = _apply_op(data, op)
45
46    return data

Apply JSON Patch diff.

Example::

data = [1, {'a': 2}, 3]
d = [{'op': 'replace', 'path': '/1/a', 'value': 4}]
result = patch(data, d)
assert result == [1, {'a': 4}, 3]