mirror of
https://github.com/kevin1024/vcrpy.git
synced 2025-12-09 09:13:23 +00:00
Enable brotli decompression if it is available
This commit is contained in:
@@ -7,6 +7,7 @@ from urllib.request import Request, urlopen
|
|||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
import vcr
|
import vcr
|
||||||
|
from vcr.filters import brotli
|
||||||
|
|
||||||
from ..assertions import assert_cassette_has_one_response, assert_is_json_bytes
|
from ..assertions import assert_cassette_has_one_response, assert_is_json_bytes
|
||||||
|
|
||||||
@@ -138,6 +139,22 @@ def test_decompress_deflate(tmpdir, httpbin):
|
|||||||
assert_is_json_bytes(decoded_response)
|
assert_is_json_bytes(decoded_response)
|
||||||
|
|
||||||
|
|
||||||
|
def test_decompress_brotli(tmpdir, httpbin):
|
||||||
|
if brotli is None:
|
||||||
|
# XXX: this is never true, because brotlipy is installed with "httpbin"
|
||||||
|
pytest.skip("Brotli is not installed")
|
||||||
|
|
||||||
|
url = httpbin.url + "/brotli"
|
||||||
|
request = Request(url, headers={"Accept-Encoding": ["gzip, deflate, br"]})
|
||||||
|
cass_file = str(tmpdir.join("brotli_response.yaml"))
|
||||||
|
with vcr.use_cassette(cass_file, decode_compressed_response=True):
|
||||||
|
urlopen(request)
|
||||||
|
with vcr.use_cassette(cass_file) as cass:
|
||||||
|
decoded_response = urlopen(url).read()
|
||||||
|
assert_cassette_has_one_response(cass)
|
||||||
|
assert_is_json_bytes(decoded_response)
|
||||||
|
|
||||||
|
|
||||||
def test_decompress_regular(tmpdir, httpbin):
|
def test_decompress_regular(tmpdir, httpbin):
|
||||||
"""Test that it doesn't try to decompress content that isn't compressed"""
|
"""Test that it doesn't try to decompress content that isn't compressed"""
|
||||||
url = httpbin.url + "/get"
|
url = httpbin.url + "/get"
|
||||||
|
|||||||
@@ -6,6 +6,20 @@ from urllib.parse import urlencode, urlparse, urlunparse
|
|||||||
|
|
||||||
from .util import CaseInsensitiveDict
|
from .util import CaseInsensitiveDict
|
||||||
|
|
||||||
|
try:
|
||||||
|
# This supports both brotli & brotlipy packages
|
||||||
|
import brotli
|
||||||
|
except ImportError:
|
||||||
|
try:
|
||||||
|
import brotlicffi as brotli
|
||||||
|
except ImportError:
|
||||||
|
brotli = None
|
||||||
|
|
||||||
|
|
||||||
|
AVAILABLE_DECOMPRESSORS = {"gzip", "deflate"}
|
||||||
|
if brotli is not None:
|
||||||
|
AVAILABLE_DECOMPRESSORS.add("br")
|
||||||
|
|
||||||
|
|
||||||
def replace_headers(request, replacements):
|
def replace_headers(request, replacements):
|
||||||
"""Replace headers in request according to replacements.
|
"""Replace headers in request according to replacements.
|
||||||
@@ -136,15 +150,16 @@ def remove_post_data_parameters(request, post_data_parameters_to_remove):
|
|||||||
|
|
||||||
def decode_response(response):
|
def decode_response(response):
|
||||||
"""
|
"""
|
||||||
If the response is compressed with gzip or deflate:
|
If the response is compressed with any supported compression (gzip,
|
||||||
|
deflate, br if available):
|
||||||
1. decompress the response body
|
1. decompress the response body
|
||||||
2. delete the content-encoding header
|
2. delete the content-encoding header
|
||||||
3. update content-length header to decompressed length
|
3. update content-length header to decompressed length
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def is_compressed(headers):
|
def is_decompressable(headers):
|
||||||
encoding = headers.get("content-encoding", [])
|
encoding = headers.get("content-encoding", [])
|
||||||
return encoding and encoding[0] in ("gzip", "deflate")
|
return encoding and encoding[0] in AVAILABLE_DECOMPRESSORS
|
||||||
|
|
||||||
def decompress_body(body, encoding):
|
def decompress_body(body, encoding):
|
||||||
"""Returns decompressed body according to encoding using zlib.
|
"""Returns decompressed body according to encoding using zlib.
|
||||||
@@ -157,17 +172,23 @@ def decode_response(response):
|
|||||||
return zlib.decompress(body, zlib.MAX_WBITS | 16)
|
return zlib.decompress(body, zlib.MAX_WBITS | 16)
|
||||||
except zlib.error:
|
except zlib.error:
|
||||||
return body # assumes that the data was already decompressed
|
return body # assumes that the data was already decompressed
|
||||||
else: # encoding == 'deflate'
|
elif encoding == 'deflate':
|
||||||
try:
|
try:
|
||||||
return zlib.decompress(body)
|
return zlib.decompress(body)
|
||||||
except zlib.error:
|
except zlib.error:
|
||||||
return body # assumes that the data was already decompressed
|
return body # assumes that the data was already decompressed
|
||||||
|
else: # encoding == 'br'
|
||||||
|
try:
|
||||||
|
return brotli.decompress(body)
|
||||||
|
except brotli.error:
|
||||||
|
return body # assumes that the data was already decompressed
|
||||||
|
|
||||||
|
|
||||||
# Deepcopy here in case `headers` contain objects that could
|
# Deepcopy here in case `headers` contain objects that could
|
||||||
# be mutated by a shallow copy and corrupt the real response.
|
# be mutated by a shallow copy and corrupt the real response.
|
||||||
response = copy.deepcopy(response)
|
response = copy.deepcopy(response)
|
||||||
headers = CaseInsensitiveDict(response["headers"])
|
headers = CaseInsensitiveDict(response["headers"])
|
||||||
if is_compressed(headers):
|
if is_decompressable(headers):
|
||||||
encoding = headers["content-encoding"][0]
|
encoding = headers["content-encoding"][0]
|
||||||
headers["content-encoding"].remove(encoding)
|
headers["content-encoding"].remove(encoding)
|
||||||
if not headers["content-encoding"]:
|
if not headers["content-encoding"]:
|
||||||
|
|||||||
Reference in New Issue
Block a user