diff --git a/tests/integration/test_filter.py b/tests/integration/test_filter.py
index f036dd4..c2a0a23 100644
--- a/tests/integration/test_filter.py
+++ b/tests/integration/test_filter.py
@@ -7,6 +7,7 @@ from urllib.request import Request, urlopen
 
 import pytest
 
 import vcr
+from vcr.filters import brotli
 
 from ..assertions import assert_cassette_has_one_response, assert_is_json_bytes
@@ -138,6 +139,22 @@ def test_decompress_deflate(tmpdir, httpbin):
         assert_is_json_bytes(decoded_response)
 
 
+def test_decompress_brotli(tmpdir, httpbin):
+    if brotli is None:
+        # XXX: this is never true, because brotlipy is installed with "httpbin"
+        pytest.skip("Brotli is not installed")
+
+    url = httpbin.url + "/brotli"
+    request = Request(url, headers={"Accept-Encoding": ["gzip, deflate, br"]})
+    cass_file = str(tmpdir.join("brotli_response.yaml"))
+    with vcr.use_cassette(cass_file, decode_compressed_response=True):
+        urlopen(request)
+    with vcr.use_cassette(cass_file) as cass:
+        decoded_response = urlopen(url).read()
+        assert_cassette_has_one_response(cass)
+        assert_is_json_bytes(decoded_response)
+
+
 def test_decompress_regular(tmpdir, httpbin):
     """Test that it doesn't try to decompress content that isn't compressed"""
     url = httpbin.url + "/get"
diff --git a/vcr/filters.py b/vcr/filters.py
index 7f33155..efb9b29 100644
--- a/vcr/filters.py
+++ b/vcr/filters.py
@@ -6,6 +6,20 @@ from urllib.parse import urlencode, urlparse, urlunparse
 
 from .util import CaseInsensitiveDict
 
+try:
+    # This supports both brotli & brotlipy packages
+    import brotli
+except ImportError:
+    try:
+        import brotlicffi as brotli
+    except ImportError:
+        brotli = None
+
+
+AVAILABLE_DECOMPRESSORS = {"gzip", "deflate"}
+if brotli is not None:
+    AVAILABLE_DECOMPRESSORS.add("br")
+
 
 def replace_headers(request, replacements):
     """Replace headers in request according to replacements.
@@ -136,15 +150,16 @@ def remove_post_data_parameters(request, post_data_parameters_to_remove):
 
 def decode_response(response):
     """
-    If the response is compressed with gzip or deflate:
+    If the response is compressed with any supported compression (gzip,
+    deflate, br if available):
       1. decompress the response body
       2. delete the content-encoding header
       3. update content-length header to decompressed length
     """
 
-    def is_compressed(headers):
+    def is_decompressable(headers):
         encoding = headers.get("content-encoding", [])
-        return encoding and encoding[0] in ("gzip", "deflate")
+        return encoding and encoding[0] in AVAILABLE_DECOMPRESSORS
 
     def decompress_body(body, encoding):
         """Returns decompressed body according to encoding using zlib.
@@ -157,17 +172,23 @@ def decode_response(response):
                 return zlib.decompress(body, zlib.MAX_WBITS | 16)
             except zlib.error:
                 return body  # assumes that the data was already decompressed
-        else:  # encoding == 'deflate'
+        elif encoding == 'deflate':
             try:
                 return zlib.decompress(body)
             except zlib.error:
                 return body  # assumes that the data was already decompressed
+        else:  # encoding == 'br'
+            try:
+                return brotli.decompress(body)
+            except brotli.error:
+                return body  # assumes that the data was already decompressed
+
 
     # Deepcopy here in case `headers` contain objects that could
     # be mutated by a shallow copy and corrupt the real response.
     response = copy.deepcopy(response)
     headers = CaseInsensitiveDict(response["headers"])
-    if is_compressed(headers):
+    if is_decompressable(headers):
         encoding = headers["content-encoding"][0]
         headers["content-encoding"].remove(encoding)
         if not headers["content-encoding"]: