From 83e360e99f44b4bdb4b0297c1ed2dec514059c59 Mon Sep 17 00:00:00 2001 From: immerrr Date: Wed, 30 Apr 2025 09:30:29 +0200 Subject: [PATCH] Apply PR feedback --- vcr/filters.py | 83 +++++++++++++++++++++++++++----------------------- 1 file changed, 45 insertions(+), 38 deletions(-) diff --git a/vcr/filters.py b/vcr/filters.py index efb9b29..2f97d09 100644 --- a/vcr/filters.py +++ b/vcr/filters.py @@ -16,9 +16,38 @@ except ImportError: brotli = None -AVAILABLE_DECOMPRESSORS = {"gzip", "deflate"} +def decompress_deflate(body): + try: + return zlib.decompress(body) + except zlib.error: + # Assume the response was already decompressed + return body + + +def decompress_gzip(body): + # To (de-)compress gzip format, use wbits = zlib.MAX_WBITS | 16. + try: + return zlib.decompress(body, zlib.MAX_WBITS | 16) + except zlib.error: + # Assume the response was already decompressed + return body + + +AVAILABLE_DECOMPRESSORS = { + "deflate": decompress_deflate, + "gzip": decompress_gzip, +} + if brotli is not None: - AVAILABLE_DECOMPRESSORS.add("br") + + def decompress_brotli(body): + try: + return brotli.decompress(body) + except brotli.error: + # Assume the response was already decompressed + return body + + AVAILABLE_DECOMPRESSORS["br"] = decompress_brotli def replace_headers(request, replacements): @@ -157,45 +186,23 @@ def decode_response(response): 3. update content-length header to decompressed length """ - def is_decompressable(headers): - encoding = headers.get("content-encoding", []) - return encoding and encoding[0] in AVAILABLE_DECOMPRESSORS - - def decompress_body(body, encoding): - """Returns decompressed body according to encoding using zlib. - to (de-)compress gzip format, use wbits = zlib.MAX_WBITS | 16 - """ - if not body: - return "" - if encoding == "gzip": - try: - return zlib.decompress(body, zlib.MAX_WBITS | 16) - except zlib.error: - return body # assumes that the data was already decompressed - elif encoding == 'deflate': - try: - return zlib.decompress(body) - except zlib.error: - return body # assumes that the data was already decompressed - else: # encoding == 'br' - try: - return brotli.decompress(body) - except brotli.error: - return body # assumes that the data was already decompressed - - # Deepcopy here in case `headers` contain objects that could # be mutated by a shallow copy and corrupt the real response. response = copy.deepcopy(response) headers = CaseInsensitiveDict(response["headers"]) - if is_decompressable(headers): - encoding = headers["content-encoding"][0] - headers["content-encoding"].remove(encoding) - if not headers["content-encoding"]: - del headers["content-encoding"] + content_encoding = headers.get("content-encoding") + if not content_encoding: + return response + decompressor = AVAILABLE_DECOMPRESSORS.get(content_encoding[0]) + if not decompressor: + return response - new_body = decompress_body(response["body"]["string"], encoding) - response["body"]["string"] = new_body - headers["content-length"] = [str(len(new_body))] - response["headers"] = dict(headers) + headers["content-encoding"].remove(content_encoding[0]) + if not headers["content-encoding"]: + del headers["content-encoding"] + + new_body = decompressor(response["body"]["string"]) + response["body"]["string"] = new_body + headers["content-length"] = [str(len(new_body))] + response["headers"] = dict(headers) return response