mirror of https://github.com/kevin1024/vcrpy.git synced 2025-12-08 16:53:23 +00:00

Compare commits


1 Commit

Author       SHA1         Message                  Date
pre-commit   c7b5a53307   pre-commit: Autoupdate   2025-11-21 16:06:32 +00:00
11 changed files with 41 additions and 120 deletions

View File

@@ -17,6 +17,6 @@ jobs:
     steps:
       - name: Checkout
-        uses: actions/checkout@v6
+        uses: actions/checkout@v5
       - name: Codespell
         uses: codespell-project/actions-codespell@v2

View File

@@ -10,7 +10,7 @@ jobs:
     runs-on: ubuntu-24.04
     steps:
-      - uses: actions/checkout@v6
+      - uses: actions/checkout@v5
       - uses: actions/setup-python@v6
         with:
           python-version: "3.12"

View File

@@ -23,7 +23,7 @@ jobs:
- "pypy-3.11"
steps:
- uses: actions/checkout@v6
- uses: actions/checkout@v5
- name: Install uv
uses: astral-sh/setup-uv@v7

View File

@@ -18,7 +18,7 @@ jobs:
     name: Detect outdated pre-commit hooks
     runs-on: ubuntu-24.04
     steps:
-      - uses: actions/checkout@v6
+      - uses: actions/checkout@v5
       - name: Set up Python 3.12
         uses: actions/setup-python@v6

View File

@@ -13,7 +13,7 @@ jobs:
     name: Run pre-commit
     runs-on: ubuntu-24.04
     steps:
-      - uses: actions/checkout@v6
+      - uses: actions/checkout@v5
       - uses: actions/setup-python@v6
         with:
           python-version: 3.12

View File

@@ -7,21 +7,9 @@ For a full list of triaged issues, bugs and PRs and what release they are target
 All help in providing PRs to close out bug issues is appreciated. Even if that is providing a repo that fully replicates issues. We have very generous contributors that have added these to bug issues which meant another contributor picked up the bug and closed it out.
 
-- 8.0.0
-   - BREAKING: Drop support for Python 3.9 (major version bump) - thanks @jairhenrique
-   - BREAKING: Drop support for urllib3 < 2 - fixes CVE warnings from urllib3 1.x (#926, #880) - thanks @jairhenrique
-   - New feature: ``drop_unused_requests`` option to remove unused interactions from cassettes (#763) - thanks @danielnsilva
-   - Rewrite httpx support to patch httpcore instead of httpx (#943) - thanks @seowalex
-   - Fixes ``httpx.ResponseNotRead`` exceptions (#832, #834)
-   - Fixes ``KeyError: 'follow_redirects'`` (#945)
-   - Adds support for custom httpx transports
-   - Fix HTTPS proxy handling - proxy address no longer ends up in cassette URIs (#809, #914) - thanks @alga
-   - Fix ``iscoroutinefunction`` deprecation warning on Python 3.14 - thanks @kloczek
-   - Only log message if response is appended - thanks @talfus-laddus
-   - Optimize urllib.parse calls - thanks @Martin-Brunthaler
-   - Fix CI for Ubuntu 24.04 - thanks @hartwork
-   - Various CI improvements: migrate to uv, update GitHub Actions - thanks @jairhenrique
-   - Various linting and test improvements - thanks @jairhenrique and @hartwork
+- Unreleased
+   - Drop support for Python 3.9
+   - Drop support for urllib3 < 2
 - 7.0.0
    - Drop support for python 3.8 (major version bump) - thanks @jairhenrique
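
The ``drop_unused_requests`` entry above names a new 8.0.0 cassette option (#763). A minimal sketch of how it might be switched on, assuming the option is accepted as a keyword argument on vcr.VCR() like other cassette settings (an assumption on our part; this diff only shows the changelog text):

    import vcr

    # Assumed usage: drop_unused_requests is the option named in the changelog
    # entry above; passing it to VCR() directly is an assumption, not shown here.
    my_vcr = vcr.VCR(drop_unused_requests=True)

    with my_vcr.use_cassette("fixtures/example.yaml"):
        # Previously recorded interactions that are not replayed inside this
        # block would be dropped when the cassette is rewritten.
        ...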

View File

@@ -1,4 +1,3 @@
-import io
 import logging
 import ssl
 import urllib.parse
@@ -463,19 +462,3 @@ def test_filter_query_parameters(tmpdir, httpbin):
         cassette_content = f.read()
     assert "password" not in cassette_content
     assert "secret" not in cassette_content
-
-
-@pytest.mark.online
-def test_use_cassette_with_io(tmpdir, caplog, httpbin):
-    url = httpbin.url + "/post"
-
-    # test without cassettes
-    data = io.BytesIO(b"hello")
-    _, response_json = request("POST", url, output="json", data=data)
-    assert response_json["data"] == "hello"
-
-    # test with cassettes
-    data = io.BytesIO(b"hello")
-    with vcr.use_cassette(str(tmpdir.join("post.yaml"))):
-        _, response_json = request("POST", url, output="json", data=data)
-        assert response_json["data"] == "hello"

View File

@@ -7,7 +7,6 @@ from urllib.request import Request, urlopen
 import pytest
 
 import vcr
-from vcr.filters import brotli
 
 from ..assertions import assert_cassette_has_one_response, assert_is_json_bytes
@@ -139,22 +138,6 @@ def test_decompress_deflate(tmpdir, httpbin):
         assert_is_json_bytes(decoded_response)
-
-
-def test_decompress_brotli(tmpdir, httpbin):
-    if brotli is None:
-        # XXX: this is never true, because brotlipy is installed with "httpbin"
-        pytest.skip("Brotli is not installed")
-    url = httpbin.url + "/brotli"
-    request = Request(url, headers={"Accept-Encoding": ["gzip, deflate, br"]})
-    cass_file = str(tmpdir.join("brotli_response.yaml"))
-    with vcr.use_cassette(cass_file, decode_compressed_response=True):
-        urlopen(request)
-    with vcr.use_cassette(cass_file) as cass:
-        decoded_response = urlopen(url).read()
-        assert_cassette_has_one_response(cass)
-        assert_is_json_bytes(decoded_response)
 
 
 def test_decompress_regular(tmpdir, httpbin):
     """Test that it doesn't try to decompress content that isn't compressed"""
     url = httpbin.url + "/get"

View File

@@ -4,7 +4,7 @@ from logging import NullHandler
 from .config import VCR
 from .record_mode import RecordMode as mode  # noqa: F401
 
-__version__ = "8.0.0"
+__version__ = "7.0.0"
 
 logging.getLogger(__name__).addHandler(NullHandler())

View File

@@ -6,49 +6,6 @@ from urllib.parse import urlencode, urlparse, urlunparse
 from .util import CaseInsensitiveDict
 
-try:
-    # This supports both brotli & brotlipy packages
-    import brotli
-except ImportError:
-    try:
-        import brotlicffi as brotli
-    except ImportError:
-        brotli = None
-
-
-def decompress_deflate(body):
-    try:
-        return zlib.decompress(body)
-    except zlib.error:
-        # Assume the response was already decompressed
-        return body
-
-
-def decompress_gzip(body):
-    # To (de-)compress gzip format, use wbits = zlib.MAX_WBITS | 16.
-    try:
-        return zlib.decompress(body, zlib.MAX_WBITS | 16)
-    except zlib.error:
-        # Assume the response was already decompressed
-        return body
-
-
-AVAILABLE_DECOMPRESSORS = {
-    "deflate": decompress_deflate,
-    "gzip": decompress_gzip,
-}
-
-if brotli is not None:
-
-    def decompress_brotli(body):
-        try:
-            return brotli.decompress(body)
-        except brotli.error:
-            # Assume the response was already decompressed
-            return body
-
-    AVAILABLE_DECOMPRESSORS["br"] = decompress_brotli
-
 
 def replace_headers(request, replacements):
     """Replace headers in request according to replacements.
@@ -179,30 +136,45 @@ def remove_post_data_parameters(request, post_data_parameters_to_remove):
 def decode_response(response):
     """
-    If the response is compressed with any supported compression (gzip,
-    deflate, br if available):
+    If the response is compressed with gzip or deflate:
 
     1. decompress the response body
     2. delete the content-encoding header
     3. update content-length header to decompressed length
     """
+
+    def is_compressed(headers):
+        encoding = headers.get("content-encoding", [])
+        return encoding and encoding[0] in ("gzip", "deflate")
+
+    def decompress_body(body, encoding):
+        """Returns decompressed body according to encoding using zlib.
+        to (de-)compress gzip format, use wbits = zlib.MAX_WBITS | 16
+        """
+        if not body:
+            return ""
+        if encoding == "gzip":
+            try:
+                return zlib.decompress(body, zlib.MAX_WBITS | 16)
+            except zlib.error:
+                return body  # assumes that the data was already decompressed
+        else:  # encoding == 'deflate'
+            try:
+                return zlib.decompress(body)
+            except zlib.error:
+                return body  # assumes that the data was already decompressed
+
     # Deepcopy here in case `headers` contain objects that could
     # be mutated by a shallow copy and corrupt the real response.
     response = copy.deepcopy(response)
     headers = CaseInsensitiveDict(response["headers"])
-    content_encoding = headers.get("content-encoding")
-    if not content_encoding:
-        return response
-    decompressor = AVAILABLE_DECOMPRESSORS.get(content_encoding[0])
-    if not decompressor:
-        return response
-    headers["content-encoding"].remove(content_encoding[0])
-    if not headers["content-encoding"]:
-        del headers["content-encoding"]
-    new_body = decompressor(response["body"]["string"])
-    response["body"]["string"] = new_body
-    headers["content-length"] = [str(len(new_body))]
-    response["headers"] = dict(headers)
+    if is_compressed(headers):
+        encoding = headers["content-encoding"][0]
+        headers["content-encoding"].remove(encoding)
+        if not headers["content-encoding"]:
+            del headers["content-encoding"]
+        new_body = decompress_body(response["body"]["string"], encoding)
+        response["body"]["string"] = new_body
+        headers["content-length"] = [str(len(new_body))]
+        response["headers"] = dict(headers)
     return response
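
Both the removed and the added implementation of decode_response above lean on the same zlib detail: gzip-framed data decompresses when zlib is given wbits = zlib.MAX_WBITS | 16, while zlib-wrapped deflate data uses the defaults. A small standalone illustration of that behaviour (not part of the diff):

    import gzip
    import zlib

    original = b'{"hello": "world"}'

    # Gzip framing: MAX_WBITS | 16 tells zlib to expect a gzip header and trailer.
    assert zlib.decompress(gzip.compress(original), zlib.MAX_WBITS | 16) == original

    # Plain zlib-wrapped deflate decompresses with the default wbits.
    assert zlib.decompress(zlib.compress(original)) == original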

View File

@@ -20,12 +20,7 @@ class Request:
         self._was_file = hasattr(body, "read")
         self._was_iter = _is_nonsequence_iterator(body)
         if self._was_file:
-            if hasattr(body, "tell"):
-                tell = body.tell()
-                self.body = body.read()
-                body.seek(tell)
-            else:
-                self.body = body.read()
+            self.body = body.read()
         elif self._was_iter:
             self.body = list(body)
         else:
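
The tell/seek branch on the removed side of this hunk reads a file-like request body for recording while restoring its read position, so the body can still be consumed afterwards (compare the removed test_use_cassette_with_io above). A standalone sketch of that pattern, using a hypothetical capture_body helper rather than vcrpy's actual API:

    import io

    def capture_body(body):
        # Read a file-like body without losing the caller's read position.
        if hasattr(body, "tell"):
            position = body.tell()
            data = body.read()
            body.seek(position)  # restore so the body can be read again later
            return data
        return body.read()

    payload = io.BytesIO(b"hello")
    assert capture_body(payload) == b"hello"
    assert payload.read() == b"hello"  # the stream is still readable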