diff --git a/tests/integration/test_httpx.py b/tests/integration/test_httpx.py index d3883e2..e508f6d 100644 --- a/tests/integration/test_httpx.py +++ b/tests/integration/test_httpx.py @@ -5,6 +5,7 @@ import vcr asyncio = pytest.importorskip("asyncio") httpx = pytest.importorskip("httpx") +from ..assertions import assert_is_json_bytes @pytest.fixture(params=["https", "http"]) def scheme(request): @@ -207,22 +208,6 @@ def test_redirect(httpbin, yml, do_request): assert cassette_response.request.headers.items() == response.request.headers.items() -@pytest.mark.online -def test_work_with_gzipped_data(httpbin, do_request, yml): - url = httpbin.url + "/gzip?foo=bar" - headers = {"accept-encoding": "deflate, gzip"} - - with vcr.use_cassette(yml): - do_request(headers=headers)("GET", url) - - with vcr.use_cassette(yml) as cassette: - cassette_response = do_request(headers=headers)("GET", url) - - # Show we can read the content of the cassette. - assert cassette_response.json()['gzipped'] == True - assert cassette.play_count == 1 - - @pytest.mark.online @pytest.mark.parametrize("url", ["https://github.com/kevin1024/vcrpy/issues/" + str(i) for i in range(3, 6)]) def test_simple_fetching(do_request, yml, url): @@ -296,7 +281,7 @@ def test_stream(tmpdir, httpbin, do_request): ("httpx_old_format", "OK"), ], ) -def test_load_gzipped(do_request, cassette_name, reason): +def test_load_cassette_format(do_request, cassette_name, reason): mydir = os.path.dirname(os.path.realpath(__file__)) yml = f"{mydir}/cassettes/gzip_{cassette_name}.yaml" url = "https://httpbin.org/gzip" @@ -314,3 +299,48 @@ def test_load_gzipped(do_request, cassette_name, reason): assert cassette_response.status_code == 200 assert cassette_response.reason_phrase == reason + +def test_gzip__decode_compressed_response_false(tmpdir, httpbin, do_request): + """ + Ensure that httpx is able to automatically decompress the response body. + """ + for _ in range(2): # one for recording, one for re-playing + with vcr.use_cassette(str(tmpdir.join("gzip.yaml"))) as cassette: + response = do_request()("GET", httpbin + "/gzip") + assert response.headers["content-encoding"] == "gzip" # i.e. not removed + # The content stored in the cassette should be gzipped. + assert cassette.responses[0]["body"]["string"][:2] == b"\x1f\x8b" + assert_is_json_bytes(response.content) # i.e. uncompressed bytes + + +def test_gzip__decode_compressed_response_true(do_request, tmpdir, httpbin): + url = httpbin + "/gzip" + + expected_response = do_request()("GET", url) + expected_content = expected_response.content + assert expected_response.headers["content-encoding"] == "gzip" # self-test + + with vcr.use_cassette( + str(tmpdir.join("decode_compressed.yaml")), + decode_compressed_response=True, + ) as cassette: + r = do_request()("GET", url) + assert r.headers["content-encoding"] == "gzip" # i.e. not removed + content_length = r.headers["content-length"] + assert r.content == expected_content + + # Has the cassette body been decompressed? + cassette_response_body = cassette.responses[0]["body"]["string"] + assert isinstance(cassette_response_body, str) + + # Content should be JSON. + assert cassette_response_body[0:1] == "{" + + with vcr.use_cassette(str(tmpdir.join("decode_compressed.yaml")), decode_compressed_response=True): + r = httpx.get(url) + assert "content-encoding" not in r.headers # i.e. removed + assert r.content == expected_content + + # As the content is uncompressed, it should have a bigger + # length than the compressed version. + assert r.headers["content-length"] > content_length diff --git a/vcr/stubs/httpx_stubs.py b/vcr/stubs/httpx_stubs.py index 477cf95..8fc63e7 100644 --- a/vcr/stubs/httpx_stubs.py +++ b/vcr/stubs/httpx_stubs.py @@ -38,17 +38,27 @@ def _transform_headers(httpx_response): async def _to_serialized_response(resp, aread): - if aread: - await resp.aread() - else: - resp.read() + # The content shouldn't already have been read in by HTTPX. + assert not hasattr(resp, "_decoder") - return { + # Retrieve the content, but without decoding it. + with patch.dict(resp.headers, {"Content-Encoding": ""}): + if aread: + await resp.aread() + else: + resp.read() + + result = { "status": dict(code=resp.status_code, message=resp.reason_phrase), "headers": _transform_headers(resp), "body": {"string": resp.content}, } + # As the content wasn't decoded, we restore the response to a state which + # will be capable of decoding the content for the consumer. + del resp._decoder + resp._content = resp._get_content_decoder().decode(resp.content) + return result def _from_serialized_headers(headers): """