1
0
mirror of https://github.com/kevin1024/vcrpy.git synced 2025-12-09 17:15:35 +00:00

Compare commits

..

3 Commits

Author SHA1 Message Date
Jair Henrique
3145827198 Fixes some tornado tests 2025-11-19 12:15:02 -03:00
Jair Henrique
93737a0e73 Enables SIM ruff lint 2025-11-19 11:18:52 -03:00
Jair Henrique
ab550d9a7a Drops Python 3.9 support 2025-11-19 11:14:59 -03:00
18 changed files with 257 additions and 354 deletions

View File

@@ -17,6 +17,6 @@ jobs:
steps:
- name: Checkout
uses: actions/checkout@v6
uses: actions/checkout@v5
- name: Codespell
uses: codespell-project/actions-codespell@v2

View File

@@ -10,7 +10,7 @@ jobs:
runs-on: ubuntu-24.04
steps:
- uses: actions/checkout@v6
- uses: actions/checkout@v5
- uses: actions/setup-python@v6
with:
python-version: "3.12"

View File

@@ -23,7 +23,7 @@ jobs:
- "pypy-3.11"
steps:
- uses: actions/checkout@v6
- uses: actions/checkout@v5
- name: Install uv
uses: astral-sh/setup-uv@v7

View File

@@ -18,7 +18,7 @@ jobs:
name: Detect outdated pre-commit hooks
runs-on: ubuntu-24.04
steps:
- uses: actions/checkout@v6
- uses: actions/checkout@v5
- name: Set up Python 3.12
uses: actions/setup-python@v6

View File

@@ -13,7 +13,7 @@ jobs:
name: Run pre-commit
runs-on: ubuntu-24.04
steps:
- uses: actions/checkout@v6
- uses: actions/checkout@v5
- uses: actions/setup-python@v6
with:
python-version: 3.12

View File

@@ -3,14 +3,14 @@
repos:
- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.14.6
rev: v0.14.5
hooks:
- id: ruff
args: ["--output-format=full"]
- id: ruff-format
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v6.0.0
rev: v5.0.0
hooks:
- id: check-merge-conflict
- id: end-of-file-fixer

View File

@@ -7,21 +7,9 @@ For a full list of triaged issues, bugs and PRs and what release they are target
All help in providing PRs to close out bug issues is appreciated. Even if that is providing a repo that fully replicates issues. We have very generous contributors that have added these to bug issues which meant another contributor picked up the bug and closed it out.
- 8.0.0
- BREAKING: Drop support for Python 3.9 (major version bump) - thanks @jairhenrique
- BREAKING: Drop support for urllib3 < 2 - fixes CVE warnings from urllib3 1.x (#926, #880) - thanks @jairhenrique
- New feature: ``drop_unused_requests`` option to remove unused interactions from cassettes (#763) - thanks @danielnsilva
- Rewrite httpx support to patch httpcore instead of httpx (#943) - thanks @seowalex
- Fixes ``httpx.ResponseNotRead`` exceptions (#832, #834)
- Fixes ``KeyError: 'follow_redirects'`` (#945)
- Adds support for custom httpx transports
- Fix HTTPS proxy handling - proxy address no longer ends up in cassette URIs (#809, #914) - thanks @alga
- Fix ``iscoroutinefunction`` deprecation warning on Python 3.14 - thanks @kloczek
- Only log message if response is appended - thanks @talfus-laddus
- Optimize urllib.parse calls - thanks @Martin-Brunthaler
- Fix CI for Ubuntu 24.04 - thanks @hartwork
- Various CI improvements: migrate to uv, update GitHub Actions - thanks @jairhenrique
- Various linting and test improvements - thanks @jairhenrique and @hartwork
- Unreleased
- Drop support for Python 3.9
- Drop support for urllib3 < 2
- 7.0.0
- Drop support for python 3.8 (major version bump) - thanks @jairhenrique

View File

@@ -22,7 +22,6 @@ The following HTTP libraries are supported:
- ``urllib2``
- ``urllib3``
- ``httpx``
- ``httpcore``
Speed
-----

View File

@@ -38,7 +38,6 @@ extras_require = {
"boto3",
"cryptography",
"httpbin",
"httpcore",
"httplib2",
"httpx",
"pycurl; platform_python_implementation !='PyPy'",

View File

@@ -1,4 +1,3 @@
import io
import logging
import ssl
import urllib.parse
@@ -463,19 +462,3 @@ def test_filter_query_parameters(tmpdir, httpbin):
cassette_content = f.read()
assert "password" not in cassette_content
assert "secret" not in cassette_content
@pytest.mark.online
def test_use_cassette_with_io(tmpdir, caplog, httpbin):
url = httpbin.url + "/post"
# test without cassettes
data = io.BytesIO(b"hello")
_, response_json = request("POST", url, output="json", data=data)
assert response_json["data"] == "hello"
# test with cassettes
data = io.BytesIO(b"hello")
with vcr.use_cassette(str(tmpdir.join("post.yaml"))):
_, response_json = request("POST", url, output="json", data=data)
assert response_json["data"] == "hello"

View File

@@ -7,7 +7,6 @@ from urllib.request import Request, urlopen
import pytest
import vcr
from vcr.filters import brotli
from ..assertions import assert_cassette_has_one_response, assert_is_json_bytes
@@ -139,22 +138,6 @@ def test_decompress_deflate(tmpdir, httpbin):
assert_is_json_bytes(decoded_response)
def test_decompress_brotli(tmpdir, httpbin):
if brotli is None:
# XXX: this is never true, because brotlipy is installed with "httpbin"
pytest.skip("Brotli is not installed")
url = httpbin.url + "/brotli"
request = Request(url, headers={"Accept-Encoding": ["gzip, deflate, br"]})
cass_file = str(tmpdir.join("brotli_response.yaml"))
with vcr.use_cassette(cass_file, decode_compressed_response=True):
urlopen(request)
with vcr.use_cassette(cass_file) as cass:
decoded_response = urlopen(url).read()
assert_cassette_has_one_response(cass)
assert_is_json_bytes(decoded_response)
def test_decompress_regular(tmpdir, httpbin):
"""Test that it doesn't try to decompress content that isn't compressed"""
url = httpbin.url + "/get"

View File

@@ -4,7 +4,7 @@ from logging import NullHandler
from .config import VCR
from .record_mode import RecordMode as mode # noqa: F401
__version__ = "8.0.0"
__version__ = "7.0.0"
logging.getLogger(__name__).addHandler(NullHandler())

View File

@@ -225,10 +225,10 @@ class Cassette:
def append(self, request, response):
"""Add a request, response pair to this cassette"""
log.info("Appending request %s and response %s", request, response)
request = self._before_record_request(request)
if not request:
return
log.info("Appending request %s and response %s", request, response)
# Deepcopy is here because mutation of `response` will corrupt the
# real response.
response = copy.deepcopy(response)

View File

@@ -6,49 +6,6 @@ from urllib.parse import urlencode, urlparse, urlunparse
from .util import CaseInsensitiveDict
try:
# This supports both brotli & brotlipy packages
import brotli
except ImportError:
try:
import brotlicffi as brotli
except ImportError:
brotli = None
def decompress_deflate(body):
try:
return zlib.decompress(body)
except zlib.error:
# Assume the response was already decompressed
return body
def decompress_gzip(body):
# To (de-)compress gzip format, use wbits = zlib.MAX_WBITS | 16.
try:
return zlib.decompress(body, zlib.MAX_WBITS | 16)
except zlib.error:
# Assume the response was already decompressed
return body
AVAILABLE_DECOMPRESSORS = {
"deflate": decompress_deflate,
"gzip": decompress_gzip,
}
if brotli is not None:
def decompress_brotli(body):
try:
return brotli.decompress(body)
except brotli.error:
# Assume the response was already decompressed
return body
AVAILABLE_DECOMPRESSORS["br"] = decompress_brotli
def replace_headers(request, replacements):
"""Replace headers in request according to replacements.
@@ -179,29 +136,44 @@ def remove_post_data_parameters(request, post_data_parameters_to_remove):
def decode_response(response):
"""
If the response is compressed with any supported compression (gzip,
deflate, br if available):
If the response is compressed with gzip or deflate:
1. decompress the response body
2. delete the content-encoding header
3. update content-length header to decompressed length
"""
def is_compressed(headers):
encoding = headers.get("content-encoding", [])
return encoding and encoding[0] in ("gzip", "deflate")
def decompress_body(body, encoding):
"""Returns decompressed body according to encoding using zlib.
to (de-)compress gzip format, use wbits = zlib.MAX_WBITS | 16
"""
if not body:
return ""
if encoding == "gzip":
try:
return zlib.decompress(body, zlib.MAX_WBITS | 16)
except zlib.error:
return body # assumes that the data was already decompressed
else: # encoding == 'deflate'
try:
return zlib.decompress(body)
except zlib.error:
return body # assumes that the data was already decompressed
# Deepcopy here in case `headers` contain objects that could
# be mutated by a shallow copy and corrupt the real response.
response = copy.deepcopy(response)
headers = CaseInsensitiveDict(response["headers"])
content_encoding = headers.get("content-encoding")
if not content_encoding:
return response
decompressor = AVAILABLE_DECOMPRESSORS.get(content_encoding[0])
if not decompressor:
return response
headers["content-encoding"].remove(content_encoding[0])
if is_compressed(headers):
encoding = headers["content-encoding"][0]
headers["content-encoding"].remove(encoding)
if not headers["content-encoding"]:
del headers["content-encoding"]
new_body = decompressor(response["body"]["string"])
new_body = decompress_body(response["body"]["string"], encoding)
response["body"]["string"] = new_body
headers["content-length"] = [str(len(new_body))]
response["headers"] = dict(headers)

View File

@@ -92,12 +92,12 @@ else:
try:
import httpcore
import httpx
except ImportError: # pragma: no cover
pass
else:
_HttpcoreConnectionPool_handle_request = httpcore.ConnectionPool.handle_request
_HttpcoreAsyncConnectionPool_handle_async_request = httpcore.AsyncConnectionPool.handle_async_request
_HttpxSyncClient_send_single_request = httpx.Client._send_single_request
_HttpxAsyncClient_send_single_request = httpx.AsyncClient._send_single_request
class CassettePatcherBuilder:
@@ -121,7 +121,7 @@ class CassettePatcherBuilder:
self._httplib2(),
self._tornado(),
self._aiohttp(),
self._httpcore(),
self._httpx(),
self._build_patchers_from_mock_triples(self._cassette.custom_patches),
)
@@ -304,22 +304,19 @@ class CassettePatcherBuilder:
yield client.ClientSession, "_request", new_request
@_build_patchers_from_mock_triples_decorator
def _httpcore(self):
def _httpx(self):
try:
import httpcore
import httpx
except ImportError: # pragma: no cover
return
else:
from .stubs.httpcore_stubs import vcr_handle_async_request, vcr_handle_request
from .stubs.httpx_stubs import async_vcr_send, sync_vcr_send
new_handle_async_request = vcr_handle_async_request(
self._cassette,
_HttpcoreAsyncConnectionPool_handle_async_request,
)
yield httpcore.AsyncConnectionPool, "handle_async_request", new_handle_async_request
new_async_client_send = async_vcr_send(self._cassette, _HttpxAsyncClient_send_single_request)
yield httpx.AsyncClient, "_send_single_request", new_async_client_send
new_handle_request = vcr_handle_request(self._cassette, _HttpcoreConnectionPool_handle_request)
yield httpcore.ConnectionPool, "handle_request", new_handle_request
new_sync_client_send = sync_vcr_send(self._cassette, _HttpxSyncClient_send_single_request)
yield httpx.Client, "_send_single_request", new_sync_client_send
def _urllib3_patchers(self, cpool, conn, stubs):
http_connection_remover = ConnectionRemover(

View File

@@ -20,11 +20,6 @@ class Request:
self._was_file = hasattr(body, "read")
self._was_iter = _is_nonsequence_iterator(body)
if self._was_file:
if hasattr(body, "tell"):
tell = body.tell()
self.body = body.read()
body.seek(tell)
else:
self.body = body.read()
elif self._was_iter:
self.body = list(body)

View File

@@ -1,215 +0,0 @@
import asyncio
import functools
import logging
from collections import defaultdict
from collections.abc import AsyncIterable, Iterable
from httpcore import Response
from httpcore._models import ByteStream
from vcr.errors import CannotOverwriteExistingCassetteException
from vcr.filters import decode_response
from vcr.request import Request as VcrRequest
from vcr.serializers.compat import convert_body_to_bytes
_logger = logging.getLogger(__name__)
async def _convert_byte_stream(stream):
if isinstance(stream, Iterable):
return list(stream)
if isinstance(stream, AsyncIterable):
return [part async for part in stream]
raise TypeError(
f"_convert_byte_stream: stream must be Iterable or AsyncIterable, got {type(stream).__name__}",
)
def _serialize_headers(real_response):
"""
Some headers can appear multiple times, like "Set-Cookie".
Therefore serialize every header key to a list of values.
"""
headers = defaultdict(list)
for name, value in real_response.headers:
headers[name.decode("ascii")].append(value.decode("ascii"))
return dict(headers)
async def _serialize_response(real_response):
# The reason_phrase may not exist
try:
reason_phrase = real_response.extensions["reason_phrase"].decode("ascii")
except KeyError:
reason_phrase = None
# Reading the response stream consumes the iterator, so we need to restore it afterwards
content = b"".join(await _convert_byte_stream(real_response.stream))
real_response.stream = ByteStream(content)
return {
"status": {"code": real_response.status, "message": reason_phrase},
"headers": _serialize_headers(real_response),
"body": {"string": content},
}
def _deserialize_headers(headers):
"""
httpcore accepts headers as list of tuples of header key and value.
"""
return [
(name.encode("ascii"), value.encode("ascii")) for name, values in headers.items() for value in values
]
def _deserialize_response(vcr_response):
# Cassette format generated for HTTPX requests by older versions of
# vcrpy. We restructure the content to resemble what a regular
# cassette looks like.
if "status_code" in vcr_response:
vcr_response = decode_response(
convert_body_to_bytes(
{
"headers": vcr_response["headers"],
"body": {"string": vcr_response["content"]},
"status": {"code": vcr_response["status_code"]},
},
),
)
extensions = None
else:
extensions = (
{"reason_phrase": vcr_response["status"]["message"].encode("ascii")}
if vcr_response["status"]["message"]
else None
)
return Response(
vcr_response["status"]["code"],
headers=_deserialize_headers(vcr_response["headers"]),
content=vcr_response["body"]["string"],
extensions=extensions,
)
async def _make_vcr_request(real_request):
# Reading the request stream consumes the iterator, so we need to restore it afterwards
body = b"".join(await _convert_byte_stream(real_request.stream))
real_request.stream = ByteStream(body)
uri = bytes(real_request.url).decode("ascii")
# As per HTTPX: If there are multiple headers with the same key, then we concatenate them with commas
headers = defaultdict(list)
for name, value in real_request.headers:
headers[name.decode("ascii")].append(value.decode("ascii"))
headers = {name: ", ".join(values) for name, values in headers.items()}
return VcrRequest(real_request.method.decode("ascii"), uri, body, headers)
async def _vcr_request(cassette, real_request):
vcr_request = await _make_vcr_request(real_request)
if cassette.can_play_response_for(vcr_request):
return vcr_request, _play_responses(cassette, vcr_request)
if cassette.write_protected and cassette.filter_request(vcr_request):
raise CannotOverwriteExistingCassetteException(
cassette=cassette,
failed_request=vcr_request,
)
_logger.info("%s not in cassette, sending to real server", vcr_request)
return vcr_request, None
async def _record_responses(cassette, vcr_request, real_response):
cassette.append(vcr_request, await _serialize_response(real_response))
def _play_responses(cassette, vcr_request):
vcr_response = cassette.play_response(vcr_request)
real_response = _deserialize_response(vcr_response)
return real_response
async def _vcr_handle_async_request(
cassette,
real_handle_async_request,
self,
real_request,
):
vcr_request, vcr_response = await _vcr_request(cassette, real_request)
if vcr_response:
return vcr_response
real_response = await real_handle_async_request(self, real_request)
await _record_responses(cassette, vcr_request, real_response)
return real_response
def vcr_handle_async_request(cassette, real_handle_async_request):
@functools.wraps(real_handle_async_request)
def _inner_handle_async_request(self, real_request):
return _vcr_handle_async_request(
cassette,
real_handle_async_request,
self,
real_request,
)
return _inner_handle_async_request
def _run_async_function(sync_func, *args, **kwargs):
"""
Safely run an asynchronous function from a synchronous context.
Handles both cases:
- An event loop is already running.
- No event loop exists yet.
"""
try:
asyncio.get_running_loop()
except RuntimeError:
return asyncio.run(sync_func(*args, **kwargs))
else:
# If inside a running loop, create a task and wait for it
return asyncio.ensure_future(sync_func(*args, **kwargs))
def _vcr_handle_request(cassette, real_handle_request, self, real_request):
vcr_request, vcr_response = _run_async_function(
_vcr_request,
cassette,
real_request,
)
if vcr_response:
return vcr_response
real_response = real_handle_request(self, real_request)
_run_async_function(_record_responses, cassette, vcr_request, real_response)
return real_response
def vcr_handle_request(cassette, real_handle_request):
@functools.wraps(real_handle_request)
def _inner_handle_request(self, real_request):
return _vcr_handle_request(cassette, real_handle_request, self, real_request)
return _inner_handle_request

202
vcr/stubs/httpx_stubs.py Normal file
View File

@@ -0,0 +1,202 @@
import asyncio
import functools
import inspect
import logging
from unittest.mock import MagicMock, patch
import httpx
from vcr.errors import CannotOverwriteExistingCassetteException
from vcr.filters import decode_response
from vcr.request import Request as VcrRequest
from vcr.serializers.compat import convert_body_to_bytes
_httpx_signature = inspect.signature(httpx.Client.request)
try:
HTTPX_REDIRECT_PARAM = _httpx_signature.parameters["follow_redirects"]
except KeyError:
HTTPX_REDIRECT_PARAM = _httpx_signature.parameters["allow_redirects"]
_logger = logging.getLogger(__name__)
def _transform_headers(httpx_response):
"""
Some headers can appear multiple times, like "Set-Cookie".
Therefore transform to every header key to list of values.
"""
out = {}
for key, var in httpx_response.headers.raw:
decoded_key = key.decode("utf-8")
out.setdefault(decoded_key, [])
out[decoded_key].append(var.decode("utf-8"))
return out
async def _to_serialized_response(resp, aread):
# The content shouldn't already have been read in by HTTPX.
assert not hasattr(resp, "_decoder")
# Retrieve the content, but without decoding it.
with patch.dict(resp.headers, {"Content-Encoding": ""}):
if aread:
await resp.aread()
else:
resp.read()
result = {
"status": {"code": resp.status_code, "message": resp.reason_phrase},
"headers": _transform_headers(resp),
"body": {"string": resp.content},
}
# As the content wasn't decoded, we restore the response to a state which
# will be capable of decoding the content for the consumer.
del resp._decoder
resp._content = resp._get_content_decoder().decode(resp.content)
return result
def _from_serialized_headers(headers):
"""
httpx accepts headers as list of tuples of header key and value.
"""
header_list = []
for key, values in headers.items():
for v in values:
header_list.append((key, v))
return header_list
@patch("httpx.Response.close", MagicMock())
@patch("httpx.Response.read", MagicMock())
def _from_serialized_response(request, serialized_response, history=None):
# Cassette format generated for HTTPX requests by older versions of
# vcrpy. We restructure the content to resemble what a regular
# cassette looks like.
if "status_code" in serialized_response:
serialized_response = decode_response(
convert_body_to_bytes(
{
"headers": serialized_response["headers"],
"body": {"string": serialized_response["content"]},
"status": {"code": serialized_response["status_code"]},
},
),
)
extensions = None
else:
extensions = {"reason_phrase": serialized_response["status"]["message"].encode()}
response = httpx.Response(
status_code=serialized_response["status"]["code"],
request=request,
headers=_from_serialized_headers(serialized_response["headers"]),
content=serialized_response["body"]["string"],
history=history or [],
extensions=extensions,
)
return response
def _make_vcr_request(httpx_request, **kwargs):
body = httpx_request.read().decode("utf-8")
uri = str(httpx_request.url)
headers = dict(httpx_request.headers)
return VcrRequest(httpx_request.method, uri, body, headers)
def _shared_vcr_send(cassette, real_send, *args, **kwargs):
real_request = args[1]
vcr_request = _make_vcr_request(real_request, **kwargs)
if cassette.can_play_response_for(vcr_request):
return vcr_request, _play_responses(cassette, real_request, vcr_request, args[0], kwargs)
if cassette.write_protected and cassette.filter_request(vcr_request):
raise CannotOverwriteExistingCassetteException(cassette=cassette, failed_request=vcr_request)
_logger.info("%s not in cassette, sending to real server", vcr_request)
return vcr_request, None
async def _record_responses(cassette, vcr_request, real_response, aread):
for past_real_response in real_response.history:
past_vcr_request = _make_vcr_request(past_real_response.request)
cassette.append(past_vcr_request, await _to_serialized_response(past_real_response, aread))
if real_response.history:
# If there was a redirection keep we want the request which will hold the
# final redirect value
vcr_request = _make_vcr_request(real_response.request)
cassette.append(vcr_request, await _to_serialized_response(real_response, aread))
return real_response
def _play_responses(cassette, request, vcr_request, client, kwargs):
vcr_response = cassette.play_response(vcr_request)
response = _from_serialized_response(request, vcr_response)
return response
async def _async_vcr_send(cassette, real_send, *args, **kwargs):
vcr_request, response = _shared_vcr_send(cassette, real_send, *args, **kwargs)
if response:
# add cookies from response to session cookie store
args[0].cookies.extract_cookies(response)
return response
real_response = await real_send(*args, **kwargs)
await _record_responses(cassette, vcr_request, real_response, aread=True)
return real_response
def async_vcr_send(cassette, real_send):
@functools.wraps(real_send)
def _inner_send(*args, **kwargs):
return _async_vcr_send(cassette, real_send, *args, **kwargs)
return _inner_send
def _run_async_function(sync_func, *args, **kwargs):
"""
Safely run an asynchronous function from a synchronous context.
Handles both cases:
- An event loop is already running.
- No event loop exists yet.
"""
try:
asyncio.get_running_loop()
except RuntimeError:
return asyncio.run(sync_func(*args, **kwargs))
else:
# If inside a running loop, create a task and wait for it
return asyncio.ensure_future(sync_func(*args, **kwargs))
def _sync_vcr_send(cassette, real_send, *args, **kwargs):
vcr_request, response = _shared_vcr_send(cassette, real_send, *args, **kwargs)
if response:
# add cookies from response to session cookie store
args[0].cookies.extract_cookies(response)
return response
real_response = real_send(*args, **kwargs)
_run_async_function(_record_responses, cassette, vcr_request, real_response, aread=False)
return real_response
def sync_vcr_send(cassette, real_send):
@functools.wraps(real_send)
def _inner_send(*args, **kwargs):
return _sync_vcr_send(cassette, real_send, *args, **kwargs)
return _inner_send