hat.json.patch
JSON Patch
1"""JSON Patch""" 2 3import typing 4 5from hat.json.data import Data, equals 6 7 8_Pointer: typing.TypeAlias = list[str] 9 10 11def diff(src: Data, 12 dst: Data 13 ) -> Data: 14 """Generate JSON Patch diff. 15 16 Example:: 17 18 src = [1, {'a': 2}, 3] 19 dst = [1, {'a': 4}, 3] 20 result = diff(src, dst) 21 assert result == [{'op': 'replace', 'path': '/1/a', 'value': 4}] 22 23 """ 24 return list(_diff(src, dst)) 25 26 27def _diff(src: Data, 28 dst: Data, 29 pointer: _Pointer = []) -> typing.Iterable[Data]: 30 31 if _shallow_equals(src, dst): 32 return 33 34 if isinstance(src, list) and isinstance(dst, list): 35 36 if not src and not dst: 37 return 38 39 if not src or not dst: 40 yield {'op': 'replace', 41 'path': _format_pointer(pointer), 42 'value': dst} 43 return 44 45 if len(src) == len(dst): 46 for i in range(len(src)): 47 yield from _diff(src[i], dst[i], [*pointer, str(i)]) 48 49 elif len(src) > len(dst): 50 dst_i = 0 51 to_remove = len(src) - len(dst) 52 53 for src_i in range(len(src)): 54 55 if dst_i < len(dst) and _shallow_equals(src[src_i], 56 dst[dst_i]): 57 dst_i += 1 58 59 elif to_remove > 0: 60 yield {'op': 'remove', 61 'path': _format_pointer([*pointer, str(dst_i)])} 62 to_remove -= 1 63 64 else: 65 yield from _diff(src[src_i], dst[dst_i], 66 [*pointer, str(dst_i)]) 67 dst_i += 1 68 69 else: 70 src_i = 0 71 to_add = len(dst) - len(src) 72 73 for dst_i in range(len(dst)): 74 75 if src_i < len(src) and _shallow_equals(src[src_i], 76 dst[dst_i]): 77 src_i += 1 78 79 elif to_add > 0: 80 yield {'op': 'add', 81 'path': _format_pointer([*pointer, str(dst_i)]), 82 'value': dst[dst_i]} 83 to_add -= 1 84 85 else: 86 yield from _diff(src[src_i], dst[dst_i], 87 [*pointer, str(dst_i)]) 88 src_i += 1 89 90 elif isinstance(src, dict) and isinstance(dst, dict): 91 92 if not src and not dst: 93 return 94 95 if not src or not dst: 96 yield {'op': 'replace', 97 'path': _format_pointer(pointer), 98 'value': dst} 99 return 100 101 for k in src: 102 103 if k not in dst: 104 yield {'op': 'remove', 'path': _format_pointer([*pointer, k])} 105 106 for k in dst: 107 108 if k not in src: 109 yield {'op': 'add', 110 'path': _format_pointer([*pointer, k]), 111 'value': dst[k]} 112 113 else: 114 yield from _diff(src[k], dst[k], [*pointer, k]) 115 116 else: 117 yield {'op': 'replace', 118 'path': _format_pointer(pointer), 119 'value': dst} 120 121 122def patch(data: Data, 123 diff: Data 124 ) -> Data: 125 """Apply JSON Patch diff. 126 127 Example:: 128 129 data = [1, {'a': 2}, 3] 130 d = [{'op': 'replace', 'path': '/1/a', 'value': 4}] 131 result = patch(data, d) 132 assert result == [1, {'a': 4}, 3] 133 134 """ 135 for op in diff: 136 data = _apply_op(data, op) 137 138 return data 139 140 141def _apply_op(data: Data, op: Data) -> Data: 142 143 if op['op'] == 'add': 144 path = _parse_pointer(op['path']) 145 return _add(data, path, op['value']) 146 147 if op['op'] == 'remove': 148 path = _parse_pointer(op['path']) 149 return _remove(data, path) 150 151 if op['op'] == 'replace': 152 path = _parse_pointer(op['path']) 153 return _replace(data, path, op['value']) 154 155 if op['op'] == 'move': 156 from_path = _parse_pointer(op['from']) 157 to_path = _parse_pointer(op['path']) 158 return _move(data, from_path, to_path) 159 160 if op['op'] == 'copy': 161 from_path = _parse_pointer(op['from']) 162 to_path = _parse_pointer(op['path']) 163 return _copy(data, from_path, to_path) 164 165 if op['op'] == 'test': 166 path = _parse_pointer(op['path']) 167 return _test(data, path, op['value']) 168 169 raise ValueError('unsupported operation') 170 171 172def _add(data: Data, path: _Pointer, value: Data) -> Data: 173 174 if not path: 175 return value 176 177 key, *rest = path 178 179 if isinstance(data, list): 180 if rest: 181 idx = int(key) 182 if not 0 <= idx < len(data): 183 raise ValueError('invalid array index') 184 185 return [*data[:idx], _add(data[idx], rest, value), *data[idx+1:]] 186 187 else: 188 if key == '-': 189 return [*data, value] 190 191 idx = int(key) 192 if not 0 <= idx <= len(data): 193 raise ValueError('invalid array index') 194 195 return [*data[:idx], value, *data[idx:]] 196 197 if isinstance(data, dict): 198 if rest: 199 if key not in data: 200 raise ValueError('invalid object key') 201 202 return {**data, key: _add(data[key], rest, value)} 203 204 else: 205 return {**data, key: value} 206 207 raise ValueError('invalid data type') 208 209 210def _remove(data: Data, path: _Pointer) -> Data: 211 212 if not path: 213 return None 214 215 key, *rest = path 216 217 if isinstance(data, list): 218 idx = int(key) 219 if not 0 <= idx < len(data): 220 raise ValueError('invalid array index') 221 222 if rest: 223 return [*data[:idx], _remove(data[idx], rest), *data[idx+1:]] 224 else: 225 return [*data[:idx], *data[idx+1:]] 226 227 if isinstance(data, dict): 228 if key not in data: 229 raise ValueError('invalid object key') 230 231 if rest: 232 return {**data, key: _remove(data[key], rest)} 233 else: 234 return {k: v for k, v in data.items() if k != key} 235 236 237def _replace(data: Data, path: _Pointer, value: Data) -> Data: 238 239 if not path: 240 return value 241 242 key, *rest = path 243 244 if isinstance(data, list): 245 idx = int(key) 246 if not 0 <= idx < len(data): 247 raise ValueError('invalid array index') 248 249 if rest: 250 return [*data[:idx], _replace(data[idx], rest, value), 251 *data[idx+1:]] 252 else: 253 return [*data[:idx], value, *data[idx+1:]] 254 255 if isinstance(data, dict): 256 if key not in data: 257 raise ValueError('invalid object key') 258 259 if rest: 260 return {**data, key: _replace(data[key], rest, value)} 261 else: 262 return {**data, key: value} 263 264 265def _move(data: Data, from_path: _Pointer, to_path: _Pointer) -> Data: 266 if len(to_path) > len(from_path) and from_path == to_path[:len(from_path)]: 267 raise ValueError("path can't be child of from") 268 269 value = _get(data, from_path) 270 return _add(_remove(data, from_path), to_path, value) 271 272 273def _copy(data: Data, from_path: _Pointer, to_path: _Pointer) -> Data: 274 value = _get(data, from_path) 275 return _add(data, to_path, value) 276 277 278def _test(data: Data, path: _Pointer, value: Data) -> Data: 279 if not equals(value, _get(data, path)): 280 raise ValueError('invalid value') 281 282 283def _get(data: Data, path: _Pointer) -> Data: 284 285 if not path: 286 return data 287 288 key, *rest = path 289 290 if isinstance(data, list): 291 idx = int(key) 292 if not 0 <= idx < len(data): 293 raise ValueError('invalid array index') 294 295 return _get(data[idx], rest) 296 297 if isinstance(data, dict): 298 if key not in data: 299 raise ValueError('invalid object key') 300 301 return _get(data[key], rest) 302 303 raise ValueError('invalid data type') 304 305 306def _format_pointer(pointer: _Pointer) -> str: 307 if not pointer: 308 return '' 309 310 return '/' + '/'.join(_escape_pointer_segment(i) for i in pointer) 311 312 313def _parse_pointer(pointer: str) -> _Pointer: 314 if pointer == '': 315 return [] 316 317 segments = pointer.split('/') 318 if segments[0] != '': 319 raise ValueError('invalid pointer') 320 321 return [_unescape_pointer_segment(i) for i in segments[1:]] 322 323 324def _escape_pointer_segment(segment: str) -> str: 325 return segment.replace('~', '~0').replace('/', '~1') 326 327 328def _unescape_pointer_segment(segment: str) -> str: 329 return segment.replace('~1', '/').replace('~0', '~') 330 331 332def _shallow_equals(a: Data, b: Data): 333 if isinstance(a, (list, dict)) or isinstance(b, (list, dict)): 334 return a is b 335 336 return equals(a, b)
def
diff( src: Union[NoneType, bool, int, float, str, List[Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]]], Dict[str, Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]]]], dst: Union[NoneType, bool, int, float, str, List[Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]]], Dict[str, Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]]]]) -> Union[NoneType, bool, int, float, str, List[Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]]], Dict[str, Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]]]]:
12def diff(src: Data, 13 dst: Data 14 ) -> Data: 15 """Generate JSON Patch diff. 16 17 Example:: 18 19 src = [1, {'a': 2}, 3] 20 dst = [1, {'a': 4}, 3] 21 result = diff(src, dst) 22 assert result == [{'op': 'replace', 'path': '/1/a', 'value': 4}] 23 24 """ 25 return list(_diff(src, dst))
Generate JSON Patch diff.
Example::
src = [1, {'a': 2}, 3]
dst = [1, {'a': 4}, 3]
result = diff(src, dst)
assert result == [{'op': 'replace', 'path': '/1/a', 'value': 4}]
def
patch( data: Union[NoneType, bool, int, float, str, List[Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]]], Dict[str, Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]]]], diff: Union[NoneType, bool, int, float, str, List[Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]]], Dict[str, Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]]]]) -> Union[NoneType, bool, int, float, str, List[Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]]], Dict[str, Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]]]]:
123def patch(data: Data, 124 diff: Data 125 ) -> Data: 126 """Apply JSON Patch diff. 127 128 Example:: 129 130 data = [1, {'a': 2}, 3] 131 d = [{'op': 'replace', 'path': '/1/a', 'value': 4}] 132 result = patch(data, d) 133 assert result == [1, {'a': 4}, 3] 134 135 """ 136 for op in diff: 137 data = _apply_op(data, op) 138 139 return data
Apply JSON Patch diff.
Example::
data = [1, {'a': 2}, 3]
d = [{'op': 'replace', 'path': '/1/a', 'value': 4}]
result = patch(data, d)
assert result == [1, {'a': 4}, 3]