1
0
mirror of https://github.com/kevin1024/vcrpy.git synced 2025-12-08 16:53:23 +00:00

Compare commits

...

3 Commits

Author SHA1 Message Date
immerrr again
b28316ab10 Enable brotli decompression if it is available (#620)
* Enable brotli decompression if it is available

* Apply PR feedback
2025-12-05 16:50:36 -05:00
Mathieu Virbel
3f78330c1e fix: usage of io-like interface with VCR.py (#906)
* fix: usage of io-like interface with VCR.py

* Update tests/integration/test_aiohttp.py

Co-authored-by: Jair Henrique <jair.henrique@gmail.com>

---------

Co-authored-by: Jair Henrique <jair.henrique@gmail.com>
2025-12-05 15:45:49 -05:00
dependabot[bot]
e8818e5c0b build(deps): bump actions/checkout from 5 to 6 (#955)
Bumps [actions/checkout](https://github.com/actions/checkout) from 5 to 6.
- [Release notes](https://github.com/actions/checkout/releases)
- [Changelog](https://github.com/actions/checkout/blob/main/CHANGELOG.md)
- [Commits](https://github.com/actions/checkout/compare/v5...v6)

---
updated-dependencies:
- dependency-name: actions/checkout
  dependency-version: '6'
  dependency-type: direct:production
  update-type: version-update:semver-major
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2025-12-05 15:31:56 -05:00
9 changed files with 104 additions and 37 deletions

View File

@@ -17,6 +17,6 @@ jobs:
steps: steps:
- name: Checkout - name: Checkout
uses: actions/checkout@v5 uses: actions/checkout@v6
- name: Codespell - name: Codespell
uses: codespell-project/actions-codespell@v2 uses: codespell-project/actions-codespell@v2

View File

@@ -10,7 +10,7 @@ jobs:
runs-on: ubuntu-24.04 runs-on: ubuntu-24.04
steps: steps:
- uses: actions/checkout@v5 - uses: actions/checkout@v6
- uses: actions/setup-python@v6 - uses: actions/setup-python@v6
with: with:
python-version: "3.12" python-version: "3.12"

View File

@@ -23,7 +23,7 @@ jobs:
- "pypy-3.11" - "pypy-3.11"
steps: steps:
- uses: actions/checkout@v5 - uses: actions/checkout@v6
- name: Install uv - name: Install uv
uses: astral-sh/setup-uv@v7 uses: astral-sh/setup-uv@v7

View File

@@ -18,7 +18,7 @@ jobs:
name: Detect outdated pre-commit hooks name: Detect outdated pre-commit hooks
runs-on: ubuntu-24.04 runs-on: ubuntu-24.04
steps: steps:
- uses: actions/checkout@v5 - uses: actions/checkout@v6
- name: Set up Python 3.12 - name: Set up Python 3.12
uses: actions/setup-python@v6 uses: actions/setup-python@v6

View File

@@ -13,7 +13,7 @@ jobs:
name: Run pre-commit name: Run pre-commit
runs-on: ubuntu-24.04 runs-on: ubuntu-24.04
steps: steps:
- uses: actions/checkout@v5 - uses: actions/checkout@v6
- uses: actions/setup-python@v6 - uses: actions/setup-python@v6
with: with:
python-version: 3.12 python-version: 3.12

View File

@@ -1,3 +1,4 @@
import io
import logging import logging
import ssl import ssl
import urllib.parse import urllib.parse
@@ -462,3 +463,19 @@ def test_filter_query_parameters(tmpdir, httpbin):
cassette_content = f.read() cassette_content = f.read()
assert "password" not in cassette_content assert "password" not in cassette_content
assert "secret" not in cassette_content assert "secret" not in cassette_content
@pytest.mark.online
def test_use_cassette_with_io(tmpdir, caplog, httpbin):
url = httpbin.url + "/post"
# test without cassettes
data = io.BytesIO(b"hello")
_, response_json = request("POST", url, output="json", data=data)
assert response_json["data"] == "hello"
# test with cassettes
data = io.BytesIO(b"hello")
with vcr.use_cassette(str(tmpdir.join("post.yaml"))):
_, response_json = request("POST", url, output="json", data=data)
assert response_json["data"] == "hello"

View File

@@ -7,6 +7,7 @@ from urllib.request import Request, urlopen
import pytest import pytest
import vcr import vcr
from vcr.filters import brotli
from ..assertions import assert_cassette_has_one_response, assert_is_json_bytes from ..assertions import assert_cassette_has_one_response, assert_is_json_bytes
@@ -138,6 +139,22 @@ def test_decompress_deflate(tmpdir, httpbin):
assert_is_json_bytes(decoded_response) assert_is_json_bytes(decoded_response)
def test_decompress_brotli(tmpdir, httpbin):
if brotli is None:
# XXX: this is never true, because brotlipy is installed with "httpbin"
pytest.skip("Brotli is not installed")
url = httpbin.url + "/brotli"
request = Request(url, headers={"Accept-Encoding": ["gzip, deflate, br"]})
cass_file = str(tmpdir.join("brotli_response.yaml"))
with vcr.use_cassette(cass_file, decode_compressed_response=True):
urlopen(request)
with vcr.use_cassette(cass_file) as cass:
decoded_response = urlopen(url).read()
assert_cassette_has_one_response(cass)
assert_is_json_bytes(decoded_response)
def test_decompress_regular(tmpdir, httpbin): def test_decompress_regular(tmpdir, httpbin):
"""Test that it doesn't try to decompress content that isn't compressed""" """Test that it doesn't try to decompress content that isn't compressed"""
url = httpbin.url + "/get" url = httpbin.url + "/get"

View File

@@ -6,6 +6,49 @@ from urllib.parse import urlencode, urlparse, urlunparse
from .util import CaseInsensitiveDict from .util import CaseInsensitiveDict
try:
# This supports both brotli & brotlipy packages
import brotli
except ImportError:
try:
import brotlicffi as brotli
except ImportError:
brotli = None
def decompress_deflate(body):
try:
return zlib.decompress(body)
except zlib.error:
# Assume the response was already decompressed
return body
def decompress_gzip(body):
# To (de-)compress gzip format, use wbits = zlib.MAX_WBITS | 16.
try:
return zlib.decompress(body, zlib.MAX_WBITS | 16)
except zlib.error:
# Assume the response was already decompressed
return body
AVAILABLE_DECOMPRESSORS = {
"deflate": decompress_deflate,
"gzip": decompress_gzip,
}
if brotli is not None:
def decompress_brotli(body):
try:
return brotli.decompress(body)
except brotli.error:
# Assume the response was already decompressed
return body
AVAILABLE_DECOMPRESSORS["br"] = decompress_brotli
def replace_headers(request, replacements): def replace_headers(request, replacements):
"""Replace headers in request according to replacements. """Replace headers in request according to replacements.
@@ -136,44 +179,29 @@ def remove_post_data_parameters(request, post_data_parameters_to_remove):
def decode_response(response): def decode_response(response):
""" """
If the response is compressed with gzip or deflate: If the response is compressed with any supported compression (gzip,
deflate, br if available):
1. decompress the response body 1. decompress the response body
2. delete the content-encoding header 2. delete the content-encoding header
3. update content-length header to decompressed length 3. update content-length header to decompressed length
""" """
def is_compressed(headers):
encoding = headers.get("content-encoding", [])
return encoding and encoding[0] in ("gzip", "deflate")
def decompress_body(body, encoding):
"""Returns decompressed body according to encoding using zlib.
to (de-)compress gzip format, use wbits = zlib.MAX_WBITS | 16
"""
if not body:
return ""
if encoding == "gzip":
try:
return zlib.decompress(body, zlib.MAX_WBITS | 16)
except zlib.error:
return body # assumes that the data was already decompressed
else: # encoding == 'deflate'
try:
return zlib.decompress(body)
except zlib.error:
return body # assumes that the data was already decompressed
# Deepcopy here in case `headers` contain objects that could # Deepcopy here in case `headers` contain objects that could
# be mutated by a shallow copy and corrupt the real response. # be mutated by a shallow copy and corrupt the real response.
response = copy.deepcopy(response) response = copy.deepcopy(response)
headers = CaseInsensitiveDict(response["headers"]) headers = CaseInsensitiveDict(response["headers"])
if is_compressed(headers): content_encoding = headers.get("content-encoding")
encoding = headers["content-encoding"][0] if not content_encoding:
headers["content-encoding"].remove(encoding) return response
decompressor = AVAILABLE_DECOMPRESSORS.get(content_encoding[0])
if not decompressor:
return response
headers["content-encoding"].remove(content_encoding[0])
if not headers["content-encoding"]: if not headers["content-encoding"]:
del headers["content-encoding"] del headers["content-encoding"]
new_body = decompress_body(response["body"]["string"], encoding) new_body = decompressor(response["body"]["string"])
response["body"]["string"] = new_body response["body"]["string"] = new_body
headers["content-length"] = [str(len(new_body))] headers["content-length"] = [str(len(new_body))]
response["headers"] = dict(headers) response["headers"] = dict(headers)

View File

@@ -20,6 +20,11 @@ class Request:
self._was_file = hasattr(body, "read") self._was_file = hasattr(body, "read")
self._was_iter = _is_nonsequence_iterator(body) self._was_iter = _is_nonsequence_iterator(body)
if self._was_file: if self._was_file:
if hasattr(body, "tell"):
tell = body.tell()
self.body = body.read()
body.seek(tell)
else:
self.body = body.read() self.body = body.read()
elif self._was_iter: elif self._was_iter:
self.body = list(body) self.body = list(body)