mirror of
https://github.com/kevin1024/vcrpy.git
synced 2025-12-08 16:53:23 +00:00
Compare commits
2 Commits
b122b5c701
...
enable-dec
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
83e360e99f | ||
|
|
878a032283 |
@@ -7,6 +7,7 @@ from urllib.request import Request, urlopen
|
||||
import pytest
|
||||
|
||||
import vcr
|
||||
from vcr.filters import brotli
|
||||
|
||||
from ..assertions import assert_cassette_has_one_response, assert_is_json_bytes
|
||||
|
||||
@@ -138,6 +139,22 @@ def test_decompress_deflate(tmpdir, httpbin):
|
||||
assert_is_json_bytes(decoded_response)
|
||||
|
||||
|
||||
def test_decompress_brotli(tmpdir, httpbin):
|
||||
if brotli is None:
|
||||
# XXX: this is never true, because brotlipy is installed with "httpbin"
|
||||
pytest.skip("Brotli is not installed")
|
||||
|
||||
url = httpbin.url + "/brotli"
|
||||
request = Request(url, headers={"Accept-Encoding": ["gzip, deflate, br"]})
|
||||
cass_file = str(tmpdir.join("brotli_response.yaml"))
|
||||
with vcr.use_cassette(cass_file, decode_compressed_response=True):
|
||||
urlopen(request)
|
||||
with vcr.use_cassette(cass_file) as cass:
|
||||
decoded_response = urlopen(url).read()
|
||||
assert_cassette_has_one_response(cass)
|
||||
assert_is_json_bytes(decoded_response)
|
||||
|
||||
|
||||
def test_decompress_regular(tmpdir, httpbin):
|
||||
"""Test that it doesn't try to decompress content that isn't compressed"""
|
||||
url = httpbin.url + "/get"
|
||||
|
||||
@@ -6,6 +6,49 @@ from urllib.parse import urlencode, urlparse, urlunparse
|
||||
|
||||
from .util import CaseInsensitiveDict
|
||||
|
||||
try:
|
||||
# This supports both brotli & brotlipy packages
|
||||
import brotli
|
||||
except ImportError:
|
||||
try:
|
||||
import brotlicffi as brotli
|
||||
except ImportError:
|
||||
brotli = None
|
||||
|
||||
|
||||
def decompress_deflate(body):
|
||||
try:
|
||||
return zlib.decompress(body)
|
||||
except zlib.error:
|
||||
# Assume the response was already decompressed
|
||||
return body
|
||||
|
||||
|
||||
def decompress_gzip(body):
|
||||
# To (de-)compress gzip format, use wbits = zlib.MAX_WBITS | 16.
|
||||
try:
|
||||
return zlib.decompress(body, zlib.MAX_WBITS | 16)
|
||||
except zlib.error:
|
||||
# Assume the response was already decompressed
|
||||
return body
|
||||
|
||||
|
||||
AVAILABLE_DECOMPRESSORS = {
|
||||
"deflate": decompress_deflate,
|
||||
"gzip": decompress_gzip,
|
||||
}
|
||||
|
||||
if brotli is not None:
|
||||
|
||||
def decompress_brotli(body):
|
||||
try:
|
||||
return brotli.decompress(body)
|
||||
except brotli.error:
|
||||
# Assume the response was already decompressed
|
||||
return body
|
||||
|
||||
AVAILABLE_DECOMPRESSORS["br"] = decompress_brotli
|
||||
|
||||
|
||||
def replace_headers(request, replacements):
|
||||
"""Replace headers in request according to replacements.
|
||||
@@ -136,45 +179,30 @@ def remove_post_data_parameters(request, post_data_parameters_to_remove):
|
||||
|
||||
def decode_response(response):
|
||||
"""
|
||||
If the response is compressed with gzip or deflate:
|
||||
If the response is compressed with any supported compression (gzip,
|
||||
deflate, br if available):
|
||||
1. decompress the response body
|
||||
2. delete the content-encoding header
|
||||
3. update content-length header to decompressed length
|
||||
"""
|
||||
|
||||
def is_compressed(headers):
|
||||
encoding = headers.get("content-encoding", [])
|
||||
return encoding and encoding[0] in ("gzip", "deflate")
|
||||
|
||||
def decompress_body(body, encoding):
|
||||
"""Returns decompressed body according to encoding using zlib.
|
||||
to (de-)compress gzip format, use wbits = zlib.MAX_WBITS | 16
|
||||
"""
|
||||
if not body:
|
||||
return ""
|
||||
if encoding == "gzip":
|
||||
try:
|
||||
return zlib.decompress(body, zlib.MAX_WBITS | 16)
|
||||
except zlib.error:
|
||||
return body # assumes that the data was already decompressed
|
||||
else: # encoding == 'deflate'
|
||||
try:
|
||||
return zlib.decompress(body)
|
||||
except zlib.error:
|
||||
return body # assumes that the data was already decompressed
|
||||
|
||||
# Deepcopy here in case `headers` contain objects that could
|
||||
# be mutated by a shallow copy and corrupt the real response.
|
||||
response = copy.deepcopy(response)
|
||||
headers = CaseInsensitiveDict(response["headers"])
|
||||
if is_compressed(headers):
|
||||
encoding = headers["content-encoding"][0]
|
||||
headers["content-encoding"].remove(encoding)
|
||||
if not headers["content-encoding"]:
|
||||
del headers["content-encoding"]
|
||||
content_encoding = headers.get("content-encoding")
|
||||
if not content_encoding:
|
||||
return response
|
||||
decompressor = AVAILABLE_DECOMPRESSORS.get(content_encoding[0])
|
||||
if not decompressor:
|
||||
return response
|
||||
|
||||
new_body = decompress_body(response["body"]["string"], encoding)
|
||||
response["body"]["string"] = new_body
|
||||
headers["content-length"] = [str(len(new_body))]
|
||||
response["headers"] = dict(headers)
|
||||
headers["content-encoding"].remove(content_encoding[0])
|
||||
if not headers["content-encoding"]:
|
||||
del headers["content-encoding"]
|
||||
|
||||
new_body = decompressor(response["body"]["string"])
|
||||
response["body"]["string"] = new_body
|
||||
headers["content-length"] = [str(len(new_body))]
|
||||
response["headers"] = dict(headers)
|
||||
return response
|
||||
|
||||
Reference in New Issue
Block a user