1
0
mirror of https://github.com/kevin1024/vcrpy.git synced 2025-12-08 16:53:23 +00:00

Merge pull request #723 from kevin1024/issue-719-compression-urllib3-v2

Make decompression robust towards already decompressed input (arguably fixes #719)
This commit is contained in:
Sebastian Pipping
2023-06-22 15:45:10 +02:00
committed by GitHub
3 changed files with 59 additions and 9 deletions

View File

@@ -167,20 +167,40 @@ def test_cross_scheme(tmpdir, httpbin_secure, httpbin):
assert len(cass) == 2
def test_gzip(tmpdir, httpbin_both):
def test_gzip__decode_compressed_response_false(tmpdir, httpbin_both):
"""
Ensure that requests (actually urllib3) is able to automatically decompress
the response body
"""
for _ in range(2): # one for recording, one for re-playing
with vcr.use_cassette(str(tmpdir.join("gzip.yaml"))):
response = requests.get(httpbin_both + "/gzip")
assert response.headers["content-encoding"] == "gzip" # i.e. not removed
assert_is_json(response.content) # i.e. uncompressed bytes
def test_gzip__decode_compressed_response_true(tmpdir, httpbin_both):
url = httpbin_both + "/gzip"
response = requests.get(url)
with vcr.use_cassette(str(tmpdir.join("gzip.yaml"))):
response = requests.get(url)
assert_is_json(response.content)
expected_response = requests.get(url)
expected_content = expected_response.content
assert expected_response.headers["content-encoding"] == "gzip" # self-test
with vcr.use_cassette(str(tmpdir.join("gzip.yaml"))):
assert_is_json(response.content)
with vcr.use_cassette(
str(tmpdir.join("decode_compressed.yaml")), decode_compressed_response=True
) as cassette:
r = requests.get(url)
assert r.headers["content-encoding"] == "gzip" # i.e. not removed
assert r.content == expected_content
# Has the cassette body been decompressed?
cassette_response_body = cassette.responses[0]["body"]["string"]
assert isinstance(cassette_response_body, str)
with vcr.use_cassette(str(tmpdir.join("decode_compressed.yaml")), decode_compressed_response=True):
r = requests.get(url)
assert "content-encoding" not in r.headers # i.e. removed
assert r.content == expected_content
def test_session_and_connection_close(tmpdir, httpbin):

View File

@@ -298,6 +298,18 @@ def test_decode_response_deflate():
assert decoded_response["headers"]["content-length"] == [str(len(body))]
def test_decode_response_deflate_already_decompressed():
body = b"deflate message"
gzip_response = {
"body": {"string": body},
"headers": {
"content-encoding": ["deflate"],
},
}
decoded_response = decode_response(gzip_response)
assert decoded_response["body"]["string"] == body
def test_decode_response_gzip():
body = b"gzip message"
@@ -325,3 +337,15 @@ def test_decode_response_gzip():
decoded_response = decode_response(gzip_response)
assert decoded_response["body"]["string"] == body
assert decoded_response["headers"]["content-length"] == [str(len(body))]
def test_decode_response_gzip_already_decompressed():
body = b"gzip message"
gzip_response = {
"body": {"string": body},
"headers": {
"content-encoding": ["gzip"],
},
}
decoded_response = decode_response(gzip_response)
assert decoded_response["body"]["string"] == body

View File

@@ -153,9 +153,15 @@ def decode_response(response):
if not body:
return ""
if encoding == "gzip":
return zlib.decompress(body, zlib.MAX_WBITS | 16)
try:
return zlib.decompress(body, zlib.MAX_WBITS | 16)
except zlib.error:
return body # assumes that the data was already decompressed
else: # encoding == 'deflate'
return zlib.decompress(body)
try:
return zlib.decompress(body)
except zlib.error:
return body # assumes that the data was already decompressed
# Deepcopy here in case `headers` contain objects that could
# be mutated by a shallow copy and corrupt the real response.