hat.json.patch
JSON Patch
1"""JSON Patch""" 2 3import typing 4 5import jsonpatch 6 7from hat.json.data import Data, equals 8 9 10_Pointer: typing.TypeAlias = list[str] 11 12 13def diff(src: Data, 14 dst: Data 15 ) -> Data: 16 """Generate JSON Patch diff. 17 18 Example:: 19 20 src = [1, {'a': 2}, 3] 21 dst = [1, {'a': 4}, 3] 22 result = diff(src, dst) 23 assert result == [{'op': 'replace', 'path': '/1/a', 'value': 4}] 24 25 """ 26 return jsonpatch.JsonPatch.from_diff(src, dst).patch 27 28 29def patch(data: Data, 30 diff: Data 31 ) -> Data: 32 """Apply JSON Patch diff. 33 34 Example:: 35 36 data = [1, {'a': 2}, 3] 37 d = [{'op': 'replace', 'path': '/1/a', 'value': 4}] 38 result = patch(data, d) 39 assert result == [1, {'a': 4}, 3] 40 41 """ 42 for op in diff: 43 data = _apply_op(data, op) 44 45 return data 46 47 48def _apply_op(data: Data, op: Data) -> Data: 49 50 if op['op'] == 'add': 51 path = _parse_pointer(op['path']) 52 return _add(data, path, op['value']) 53 54 if op['op'] == 'remove': 55 path = _parse_pointer(op['path']) 56 return _remove(data, path) 57 58 if op['op'] == 'replace': 59 path = _parse_pointer(op['path']) 60 return _replace(data, path, op['value']) 61 62 if op['op'] == 'move': 63 from_path = _parse_pointer(op['from']) 64 to_path = _parse_pointer(op['path']) 65 return _move(data, from_path, to_path) 66 67 if op['op'] == 'copy': 68 from_path = _parse_pointer(op['from']) 69 to_path = _parse_pointer(op['path']) 70 return _copy(data, from_path, to_path) 71 72 if op['op'] == 'test': 73 path = _parse_pointer(op['path']) 74 return _test(data, path, op['value']) 75 76 raise ValueError('unsupported operation') 77 78 79def _add(data: Data, path: _Pointer, value: Data) -> Data: 80 81 if not path: 82 return value 83 84 key, *rest = path 85 86 if isinstance(data, list): 87 if rest: 88 idx = int(key) 89 if not 0 <= idx < len(data): 90 raise ValueError('invalid array index') 91 92 return [*data[:idx], _add(data[idx], rest, value), *data[idx+1:]] 93 94 else: 95 if key == '-': 96 return [*data, value] 97 98 idx = int(key) 99 if not 0 <= idx <= len(data): 100 raise ValueError('invalid array index') 101 102 return [*data[:idx], value, *data[idx:]] 103 104 if isinstance(data, dict): 105 if rest: 106 if key not in data: 107 raise ValueError('invalid object key') 108 109 return {**data, key: _add(data[key], rest, value)} 110 111 else: 112 return {**data, key: value} 113 114 raise ValueError('invalid data type') 115 116 117def _remove(data: Data, path: _Pointer) -> Data: 118 119 if not path: 120 return None 121 122 key, *rest = path 123 124 if isinstance(data, list): 125 idx = int(key) 126 if not 0 <= idx < len(data): 127 raise ValueError('invalid array index') 128 129 if rest: 130 return [*data[:idx], _remove(data[idx], rest), *data[idx+1:]] 131 else: 132 return [*data[:idx], *data[idx+1:]] 133 134 if isinstance(data, dict): 135 if key not in data: 136 raise ValueError('invalid object key') 137 138 if rest: 139 return {**data, key: _remove(data[key], rest)} 140 else: 141 return {k: v for k, v in data.items() if k != key} 142 143 144def _replace(data: Data, path: _Pointer, value: Data) -> Data: 145 146 if not path: 147 return value 148 149 key, *rest = path 150 151 if isinstance(data, list): 152 idx = int(key) 153 if not 0 <= idx < len(data): 154 raise ValueError('invalid array index') 155 156 if rest: 157 return [*data[:idx], _replace(data[idx], rest, value), 158 *data[idx+1:]] 159 else: 160 return [*data[:idx], value, *data[idx+1:]] 161 162 if isinstance(data, dict): 163 if key not in data: 164 raise ValueError('invalid object key') 165 166 if rest: 167 return {**data, key: _replace(data[key], rest, value)} 168 else: 169 return {**data, key: value} 170 171 172def _move(data: Data, from_path: _Pointer, to_path: _Pointer) -> Data: 173 if len(to_path) > len(from_path) and from_path == to_path[:len(from_path)]: 174 raise ValueError("path can't be child of from") 175 176 value = _get(data, from_path) 177 return _add(_remove(data, from_path), to_path, value) 178 179 180def _copy(data: Data, from_path: _Pointer, to_path: _Pointer) -> Data: 181 value = _get(data, from_path) 182 return _add(data, to_path, value) 183 184 185def _test(data: Data, path: _Pointer, value: Data) -> Data: 186 if not equals(value, _get(data, path)): 187 raise ValueError('invalid value') 188 189 190def _get(data: Data, path: _Pointer) -> Data: 191 192 if not path: 193 return data 194 195 key, *rest = path 196 197 if isinstance(data, list): 198 idx = int(key) 199 if not 0 <= idx < len(data): 200 raise ValueError('invalid array index') 201 202 return _get(data[idx], rest) 203 204 if isinstance(data, dict): 205 if key not in data: 206 raise ValueError('invalid object key') 207 208 return _get(data[key], rest) 209 210 raise ValueError('invalid data type') 211 212 213def _parse_pointer(pointer: str) -> _Pointer: 214 if pointer == '': 215 return [] 216 217 segments = pointer.split('/') 218 if segments[0] != '': 219 raise ValueError('invalid pointer') 220 221 return [_unescape_pointer_segment(i) for i in segments[1:]] 222 223 224def _unescape_pointer_segment(segment: str) -> str: 225 return segment.replace('~1', '/').replace('~0', '~') 226 227 228# check upstream changes in jsonpatch and validate performance impact 229 230# def _monkeypatch_jsonpatch(): 231# """Monkeypatch jsonpatch. 232 233# Patch incorrect value comparison between ``bool`` and numeric values when 234# diffing json serializable data. 235 236# Comparing `False` to `0` or `0.0`; and `True` to `1` or `1.0` incorrectly 237# results in no change. 238 239# """ 240# def _compare_values(self, path, key, src, dst): 241 242# if isinstance(src, jsonpatch.MutableMapping) and \ 243# isinstance(dst, jsonpatch.MutableMapping): 244# self._compare_dicts(jsonpatch._path_join(path, key), src, dst) 245 246# elif isinstance(src, jsonpatch.MutableSequence) and \ 247# isinstance(dst, jsonpatch.MutableSequence): 248# self._compare_lists(jsonpatch._path_join(path, key), src, dst) 249 250# elif isinstance(src, bool) == isinstance(dst, bool) and src == dst: 251# pass 252 253# else: 254# self._item_replaced(path, key, dst) 255 256# jsonpatch.DiffBuilder._compare_values = _compare_values 257 258 259# _monkeypatch_jsonpatch()
def
diff( src: Union[NoneType, bool, int, float, str, List[Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]]], Dict[str, Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]]]], dst: Union[NoneType, bool, int, float, str, List[Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]]], Dict[str, Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]]]]) -> Union[NoneType, bool, int, float, str, List[Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]]], Dict[str, Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]]]]:
14def diff(src: Data, 15 dst: Data 16 ) -> Data: 17 """Generate JSON Patch diff. 18 19 Example:: 20 21 src = [1, {'a': 2}, 3] 22 dst = [1, {'a': 4}, 3] 23 result = diff(src, dst) 24 assert result == [{'op': 'replace', 'path': '/1/a', 'value': 4}] 25 26 """ 27 return jsonpatch.JsonPatch.from_diff(src, dst).patch
Generate JSON Patch diff.
Example::
src = [1, {'a': 2}, 3]
dst = [1, {'a': 4}, 3]
result = diff(src, dst)
assert result == [{'op': 'replace', 'path': '/1/a', 'value': 4}]
def
patch( data: Union[NoneType, bool, int, float, str, List[Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]]], Dict[str, Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]]]], diff: Union[NoneType, bool, int, float, str, List[Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]]], Dict[str, Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]]]]) -> Union[NoneType, bool, int, float, str, List[Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]]], Dict[str, Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]]]]:
30def patch(data: Data, 31 diff: Data 32 ) -> Data: 33 """Apply JSON Patch diff. 34 35 Example:: 36 37 data = [1, {'a': 2}, 3] 38 d = [{'op': 'replace', 'path': '/1/a', 'value': 4}] 39 result = patch(data, d) 40 assert result == [1, {'a': 4}, 3] 41 42 """ 43 for op in diff: 44 data = _apply_op(data, op) 45 46 return data
Apply JSON Patch diff.
Example::
data = [1, {'a': 2}, 3]
d = [{'op': 'replace', 'path': '/1/a', 'value': 4}]
result = patch(data, d)
assert result == [1, {'a': 4}, 3]