From b28316ab103a3ffc9fc79d6c0fcb556903f0ceb7 Mon Sep 17 00:00:00 2001
From: immerrr again
Date: Fri, 5 Dec 2025 22:50:36 +0100
Subject: [PATCH] Enable brotli decompression if it is available (#620)

* Enable brotli decompression if it is available

* Apply PR feedback
---
 tests/integration/test_filter.py | 17 ++++++
 vcr/filters.py                   | 90 +++++++++++++++++++++-----------
 2 files changed, 76 insertions(+), 31 deletions(-)

diff --git a/tests/integration/test_filter.py b/tests/integration/test_filter.py
index f036dd4..c2a0a23 100644
--- a/tests/integration/test_filter.py
+++ b/tests/integration/test_filter.py
@@ -7,6 +7,7 @@ from urllib.request import Request, urlopen
 import pytest
 
 import vcr
+from vcr.filters import brotli
 
 from ..assertions import assert_cassette_has_one_response, assert_is_json_bytes
 
@@ -138,6 +139,22 @@ def test_decompress_deflate(tmpdir, httpbin):
     assert_is_json_bytes(decoded_response)
 
 
+def test_decompress_brotli(tmpdir, httpbin):
+    if brotli is None:
+        # XXX: this is never true, because brotlipy is installed with "httpbin"
+        pytest.skip("Brotli is not installed")
+
+    url = httpbin.url + "/brotli"
+    request = Request(url, headers={"Accept-Encoding": ["gzip, deflate, br"]})
+    cass_file = str(tmpdir.join("brotli_response.yaml"))
+    with vcr.use_cassette(cass_file, decode_compressed_response=True):
+        urlopen(request)
+    with vcr.use_cassette(cass_file) as cass:
+        decoded_response = urlopen(url).read()
+        assert_cassette_has_one_response(cass)
+    assert_is_json_bytes(decoded_response)
+
+
 def test_decompress_regular(tmpdir, httpbin):
     """Test that it doesn't try to decompress content that isn't compressed"""
     url = httpbin.url + "/get"
diff --git a/vcr/filters.py b/vcr/filters.py
index 7f33155..2f97d09 100644
--- a/vcr/filters.py
+++ b/vcr/filters.py
@@ -6,6 +6,49 @@ from urllib.parse import urlencode, urlparse, urlunparse
 
 from .util import CaseInsensitiveDict
 
+try:
+    # This supports both brotli & brotlipy packages
+    import brotli
+except ImportError:
+    try:
+        import brotlicffi as brotli
+    except ImportError:
+        brotli = None
+
+
+def decompress_deflate(body):
+    try:
+        return zlib.decompress(body)
+    except zlib.error:
+        # Assume the response was already decompressed
+        return body
+
+
+def decompress_gzip(body):
+    # To (de-)compress gzip format, use wbits = zlib.MAX_WBITS | 16.
+    try:
+        return zlib.decompress(body, zlib.MAX_WBITS | 16)
+    except zlib.error:
+        # Assume the response was already decompressed
+        return body
+
+
+AVAILABLE_DECOMPRESSORS = {
+    "deflate": decompress_deflate,
+    "gzip": decompress_gzip,
+}
+
+if brotli is not None:
+
+    def decompress_brotli(body):
+        try:
+            return brotli.decompress(body)
+        except brotli.error:
+            # Assume the response was already decompressed
+            return body
+
+    AVAILABLE_DECOMPRESSORS["br"] = decompress_brotli
+
 
 def replace_headers(request, replacements):
     """Replace headers in request according to replacements.
@@ -136,45 +179,30 @@ def remove_post_data_parameters(request, post_data_parameters_to_remove):
 
 def decode_response(response):
     """
-    If the response is compressed with gzip or deflate:
+    If the response is compressed with any supported compression (gzip,
+    deflate, br if available):
       1. decompress the response body
       2. delete the content-encoding header
       3. update content-length header to decompressed length
     """
 
-    def is_compressed(headers):
-        encoding = headers.get("content-encoding", [])
-        return encoding and encoding[0] in ("gzip", "deflate")
-
-    def decompress_body(body, encoding):
-        """Returns decompressed body according to encoding using zlib.
-        to (de-)compress gzip format, use wbits = zlib.MAX_WBITS | 16
-        """
-        if not body:
-            return ""
-        if encoding == "gzip":
-            try:
-                return zlib.decompress(body, zlib.MAX_WBITS | 16)
-            except zlib.error:
-                return body  # assumes that the data was already decompressed
-        else:  # encoding == 'deflate'
-            try:
-                return zlib.decompress(body)
-            except zlib.error:
-                return body  # assumes that the data was already decompressed
-
     # Deepcopy here in case `headers` contain objects that could
     # be mutated by a shallow copy and corrupt the real response.
     response = copy.deepcopy(response)
     headers = CaseInsensitiveDict(response["headers"])
-    if is_compressed(headers):
-        encoding = headers["content-encoding"][0]
-        headers["content-encoding"].remove(encoding)
-        if not headers["content-encoding"]:
-            del headers["content-encoding"]
+    content_encoding = headers.get("content-encoding")
+    if not content_encoding:
+        return response
+    decompressor = AVAILABLE_DECOMPRESSORS.get(content_encoding[0])
+    if not decompressor:
+        return response
 
-        new_body = decompress_body(response["body"]["string"], encoding)
-        response["body"]["string"] = new_body
-        headers["content-length"] = [str(len(new_body))]
-        response["headers"] = dict(headers)
+    headers["content-encoding"].remove(content_encoding[0])
+    if not headers["content-encoding"]:
+        del headers["content-encoding"]
+
+    new_body = decompressor(response["body"]["string"])
+    response["body"]["string"] = new_body
+    headers["content-length"] = [str(len(new_body))]
+    response["headers"] = dict(headers)
     return response