hat.json.patch

JSON Patch

  1"""JSON Patch"""
  2
  3import typing
  4
  5from hat.json.data import Data, equals
  6
  7
  8_Pointer: typing.TypeAlias = list[str]
  9
 10
 11def diff(src: Data,
 12         dst: Data
 13         ) -> Data:
 14    """Generate JSON Patch diff.
 15
 16    Example::
 17
 18        src = [1, {'a': 2}, 3]
 19        dst = [1, {'a': 4}, 3]
 20        result = diff(src, dst)
 21        assert result == [{'op': 'replace', 'path': '/1/a', 'value': 4}]
 22
 23    """
 24    return list(_diff(src, dst))
 25
 26
 27def _diff(src: Data,
 28          dst: Data,
 29          pointer: _Pointer = []) -> typing.Iterable[Data]:
 30
 31    if _shallow_equals(src, dst):
 32        return
 33
 34    if isinstance(src, list) and isinstance(dst, list):
 35
 36        if not src and not dst:
 37            return
 38
 39        if not src or not dst:
 40            yield {'op': 'replace',
 41                   'path': _format_pointer(pointer),
 42                   'value': dst}
 43            return
 44
 45        if len(src) == len(dst):
 46            for i in range(len(src)):
 47                yield from _diff(src[i], dst[i], [*pointer, str(i)])
 48
 49        elif len(src) > len(dst):
 50            dst_i = 0
 51            to_remove = len(src) - len(dst)
 52
 53            for src_i in range(len(src)):
 54
 55                if dst_i < len(dst) and _shallow_equals(src[src_i],
 56                                                        dst[dst_i]):
 57                    dst_i += 1
 58
 59                elif to_remove > 0:
 60                    yield {'op': 'remove',
 61                           'path': _format_pointer([*pointer, str(dst_i)])}
 62                    to_remove -= 1
 63
 64                else:
 65                    yield from _diff(src[src_i], dst[dst_i],
 66                                     [*pointer, str(dst_i)])
 67                    dst_i += 1
 68
 69        else:
 70            src_i = 0
 71            to_add = len(dst) - len(src)
 72
 73            for dst_i in range(len(dst)):
 74
 75                if src_i < len(src) and _shallow_equals(src[src_i],
 76                                                        dst[dst_i]):
 77                    src_i += 1
 78
 79                elif to_add > 0:
 80                    yield {'op': 'add',
 81                           'path': _format_pointer([*pointer, str(dst_i)]),
 82                           'value': dst[dst_i]}
 83                    to_add -= 1
 84
 85                else:
 86                    yield from _diff(src[src_i], dst[dst_i],
 87                                     [*pointer, str(dst_i)])
 88                    src_i += 1
 89
 90    elif isinstance(src, dict) and isinstance(dst, dict):
 91
 92        if not src and not dst:
 93            return
 94
 95        if not src or not dst:
 96            yield {'op': 'replace',
 97                   'path': _format_pointer(pointer),
 98                   'value': dst}
 99            return
100
101        for k in src:
102
103            if k not in dst:
104                yield {'op': 'remove', 'path': _format_pointer([*pointer, k])}
105
106        for k in dst:
107
108            if k not in src:
109                yield {'op': 'add',
110                       'path': _format_pointer([*pointer, k]),
111                       'value': dst[k]}
112
113            else:
114                yield from _diff(src[k], dst[k], [*pointer, k])
115
116    else:
117        yield {'op': 'replace',
118               'path': _format_pointer(pointer),
119               'value': dst}
120
121
122def patch(data: Data,
123          diff: Data
124          ) -> Data:
125    """Apply JSON Patch diff.
126
127    Example::
128
129        data = [1, {'a': 2}, 3]
130        d = [{'op': 'replace', 'path': '/1/a', 'value': 4}]
131        result = patch(data, d)
132        assert result == [1, {'a': 4}, 3]
133
134    """
135    for op in diff:
136        data = _apply_op(data, op)
137
138    return data
139
140
141def _apply_op(data: Data, op: Data) -> Data:
142
143    if op['op'] == 'add':
144        path = _parse_pointer(op['path'])
145        return _add(data, path, op['value'])
146
147    if op['op'] == 'remove':
148        path = _parse_pointer(op['path'])
149        return _remove(data, path)
150
151    if op['op'] == 'replace':
152        path = _parse_pointer(op['path'])
153        return _replace(data, path, op['value'])
154
155    if op['op'] == 'move':
156        from_path = _parse_pointer(op['from'])
157        to_path = _parse_pointer(op['path'])
158        return _move(data, from_path, to_path)
159
160    if op['op'] == 'copy':
161        from_path = _parse_pointer(op['from'])
162        to_path = _parse_pointer(op['path'])
163        return _copy(data, from_path, to_path)
164
165    if op['op'] == 'test':
166        path = _parse_pointer(op['path'])
167        return _test(data, path, op['value'])
168
169    raise ValueError('unsupported operation')
170
171
172def _add(data: Data, path: _Pointer, value: Data) -> Data:
173
174    if not path:
175        return value
176
177    key, *rest = path
178
179    if isinstance(data, list):
180        if rest:
181            idx = int(key)
182            if not 0 <= idx < len(data):
183                raise ValueError('invalid array index')
184
185            return [*data[:idx], _add(data[idx], rest, value), *data[idx+1:]]
186
187        else:
188            if key == '-':
189                return [*data, value]
190
191            idx = int(key)
192            if not 0 <= idx <= len(data):
193                raise ValueError('invalid array index')
194
195            return [*data[:idx], value, *data[idx:]]
196
197    if isinstance(data, dict):
198        if rest:
199            if key not in data:
200                raise ValueError('invalid object key')
201
202            return {**data, key: _add(data[key], rest, value)}
203
204        else:
205            return {**data, key: value}
206
207    raise ValueError('invalid data type')
208
209
210def _remove(data: Data, path: _Pointer) -> Data:
211
212    if not path:
213        return None
214
215    key, *rest = path
216
217    if isinstance(data, list):
218        idx = int(key)
219        if not 0 <= idx < len(data):
220            raise ValueError('invalid array index')
221
222        if rest:
223            return [*data[:idx], _remove(data[idx], rest), *data[idx+1:]]
224        else:
225            return [*data[:idx], *data[idx+1:]]
226
227    if isinstance(data, dict):
228        if key not in data:
229            raise ValueError('invalid object key')
230
231        if rest:
232            return {**data, key: _remove(data[key], rest)}
233        else:
234            return {k: v for k, v in data.items() if k != key}
235
236
237def _replace(data: Data, path: _Pointer, value: Data) -> Data:
238
239    if not path:
240        return value
241
242    key, *rest = path
243
244    if isinstance(data, list):
245        idx = int(key)
246        if not 0 <= idx < len(data):
247            raise ValueError('invalid array index')
248
249        if rest:
250            return [*data[:idx], _replace(data[idx], rest, value),
251                    *data[idx+1:]]
252        else:
253            return [*data[:idx], value, *data[idx+1:]]
254
255    if isinstance(data, dict):
256        if key not in data:
257            raise ValueError('invalid object key')
258
259        if rest:
260            return {**data, key: _replace(data[key], rest, value)}
261        else:
262            return {**data, key: value}
263
264
265def _move(data: Data, from_path: _Pointer, to_path: _Pointer) -> Data:
266    if len(to_path) > len(from_path) and from_path == to_path[:len(from_path)]:
267        raise ValueError("path can't be child of from")
268
269    value = _get(data, from_path)
270    return _add(_remove(data, from_path), to_path, value)
271
272
273def _copy(data: Data, from_path: _Pointer, to_path: _Pointer) -> Data:
274    value = _get(data, from_path)
275    return _add(data, to_path, value)
276
277
278def _test(data: Data, path: _Pointer, value: Data) -> Data:
279    if not equals(value, _get(data, path)):
280        raise ValueError('invalid value')
281
282
283def _get(data: Data, path: _Pointer) -> Data:
284
285    if not path:
286        return data
287
288    key, *rest = path
289
290    if isinstance(data, list):
291        idx = int(key)
292        if not 0 <= idx < len(data):
293            raise ValueError('invalid array index')
294
295        return _get(data[idx], rest)
296
297    if isinstance(data, dict):
298        if key not in data:
299            raise ValueError('invalid object key')
300
301        return _get(data[key], rest)
302
303    raise ValueError('invalid data type')
304
305
306def _format_pointer(pointer: _Pointer) -> str:
307    if not pointer:
308        return ''
309
310    return '/' + '/'.join(_escape_pointer_segment(i) for i in pointer)
311
312
313def _parse_pointer(pointer: str) -> _Pointer:
314    if pointer == '':
315        return []
316
317    segments = pointer.split('/')
318    if segments[0] != '':
319        raise ValueError('invalid pointer')
320
321    return [_unescape_pointer_segment(i) for i in segments[1:]]
322
323
324def _escape_pointer_segment(segment: str) -> str:
325    return segment.replace('~', '~0').replace('/', '~1')
326
327
328def _unescape_pointer_segment(segment: str) -> str:
329    return segment.replace('~1', '/').replace('~0', '~')
330
331
332def _shallow_equals(a: Data, b: Data):
333    if isinstance(a, (list, dict)) or isinstance(b, (list, dict)):
334        return a is b
335
336    return equals(a, b)
def diff( src: Union[NoneType, bool, int, float, str, List[Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]]], Dict[str, Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]]]], dst: Union[NoneType, bool, int, float, str, List[Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]]], Dict[str, Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]]]]) -> Union[NoneType, bool, int, float, str, List[Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]]], Dict[str, Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]]]]:
12def diff(src: Data,
13         dst: Data
14         ) -> Data:
15    """Generate JSON Patch diff.
16
17    Example::
18
19        src = [1, {'a': 2}, 3]
20        dst = [1, {'a': 4}, 3]
21        result = diff(src, dst)
22        assert result == [{'op': 'replace', 'path': '/1/a', 'value': 4}]
23
24    """
25    return list(_diff(src, dst))

Generate JSON Patch diff.

Example::

src = [1, {'a': 2}, 3]
dst = [1, {'a': 4}, 3]
result = diff(src, dst)
assert result == [{'op': 'replace', 'path': '/1/a', 'value': 4}]
def patch( data: Union[NoneType, bool, int, float, str, List[Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]]], Dict[str, Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]]]], diff: Union[NoneType, bool, int, float, str, List[Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]]], Dict[str, Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]]]]) -> Union[NoneType, bool, int, float, str, List[Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]]], Dict[str, Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]]]]:
123def patch(data: Data,
124          diff: Data
125          ) -> Data:
126    """Apply JSON Patch diff.
127
128    Example::
129
130        data = [1, {'a': 2}, 3]
131        d = [{'op': 'replace', 'path': '/1/a', 'value': 4}]
132        result = patch(data, d)
133        assert result == [1, {'a': 4}, 3]
134
135    """
136    for op in diff:
137        data = _apply_op(data, op)
138
139    return data

Apply JSON Patch diff.

Example::

data = [1, {'a': 2}, 3]
d = [{'op': 'replace', 'path': '/1/a', 'value': 4}]
result = patch(data, d)
assert result == [1, {'a': 4}, 3]