1
0
mirror of https://github.com/kevin1024/vcrpy.git synced 2025-12-08 16:53:23 +00:00

Compare commits

...

12 Commits

Author SHA1 Message Date
Matthias (~talfus-laddus)
d5ba702a1b only log message if response is appended
Closes: https://github.com/kevin1024/vcrpy/issues/685
2025-11-19 13:53:06 -03:00
Seow Alex
952994b365 Patch httpcore instead of httpx 2025-11-19 13:44:40 -03:00
Jair Henrique
e2f3240835 Fixes some tornado tests 2025-11-19 12:18:05 -03:00
Jair Henrique
bb690833bc Enables SIM ruff lint 2025-11-19 12:18:05 -03:00
Jair Henrique
73eed94c47 Drops Python 3.9 support 2025-11-19 12:18:05 -03:00
dependabot[bot]
a23fe0333a build(deps): bump actions/setup-python from 5 to 6
Bumps [actions/setup-python](https://github.com/actions/setup-python) from 5 to 6.
- [Release notes](https://github.com/actions/setup-python/releases)
- [Commits](https://github.com/actions/setup-python/compare/v5...v6)

---
updated-dependencies:
- dependency-name: actions/setup-python
  dependency-version: '6'
  dependency-type: direct:production
  update-type: version-update:semver-major
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-11-19 11:14:40 -03:00
dependabot[bot]
bb743861b6 build(deps): bump astral-sh/setup-uv from 6 to 7
Bumps [astral-sh/setup-uv](https://github.com/astral-sh/setup-uv) from 6 to 7.
- [Release notes](https://github.com/astral-sh/setup-uv/releases)
- [Commits](https://github.com/astral-sh/setup-uv/compare/v6...v7)

---
updated-dependencies:
- dependency-name: astral-sh/setup-uv
  dependency-version: '7'
  dependency-type: direct:production
  update-type: version-update:semver-major
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-11-19 06:57:48 -03:00
dependabot[bot]
ac70eaa17f build(deps): bump astral-sh/setup-uv from 5 to 6
Bumps [astral-sh/setup-uv](https://github.com/astral-sh/setup-uv) from 5 to 6.
- [Release notes](https://github.com/astral-sh/setup-uv/releases)
- [Commits](https://github.com/astral-sh/setup-uv/compare/v5...v6)

---
updated-dependencies:
- dependency-name: astral-sh/setup-uv
  dependency-version: '6'
  dependency-type: direct:production
  update-type: version-update:semver-major
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-09-11 16:57:56 -03:00
dependabot[bot]
d50f3385a6 build(deps): bump actions/checkout from 4 to 5
Bumps [actions/checkout](https://github.com/actions/checkout) from 4 to 5.
- [Release notes](https://github.com/actions/checkout/releases)
- [Changelog](https://github.com/actions/checkout/blob/main/CHANGELOG.md)
- [Commits](https://github.com/actions/checkout/compare/v4...v5)

---
updated-dependencies:
- dependency-name: actions/checkout
  dependency-version: '5'
  dependency-type: direct:production
  update-type: version-update:semver-major
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-08-26 09:37:05 -03:00
Jair Henrique
14db4de224 Use uv on CI 2025-08-26 09:35:26 -03:00
Sebastian Pipping
2c4df79498 Merge pull request #917 from kevin1024/precommit-autoupdate
pre-commit: Autoupdate
2025-08-01 19:17:52 +02:00
pre-commit
1456673cb4 pre-commit: Autoupdate 2025-08-01 16:06:54 +00:00
33 changed files with 413 additions and 554 deletions

View File

@@ -17,6 +17,6 @@ jobs:
steps:
- name: Checkout
uses: actions/checkout@v4
uses: actions/checkout@v5
- name: Codespell
uses: codespell-project/actions-codespell@v2

View File

@@ -10,8 +10,8 @@ jobs:
runs-on: ubuntu-24.04
steps:
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
- uses: actions/checkout@v5
- uses: actions/setup-python@v6
with:
python-version: "3.12"

View File

@@ -6,7 +6,7 @@ on:
- master
pull_request:
schedule:
- cron: '0 16 * * 5' # Every Friday 4pm
- cron: "0 16 * * 5" # Every Friday 4pm
workflow_dispatch:
jobs:
@@ -16,40 +16,28 @@ jobs:
fail-fast: false
matrix:
python-version:
- "3.9"
- "3.10"
- "3.11"
- "3.12"
- "3.13"
- "pypy-3.9"
- "pypy-3.10"
urllib3-requirement:
- "urllib3>=2"
- "urllib3<2"
exclude:
- python-version: "3.9"
urllib3-requirement: "urllib3>=2"
- python-version: "pypy-3.9"
urllib3-requirement: "urllib3>=2"
- python-version: "pypy-3.10"
urllib3-requirement: "urllib3>=2"
- "pypy-3.11"
steps:
- uses: actions/checkout@v4
- uses: actions/checkout@v5
- name: Install uv
uses: astral-sh/setup-uv@v7
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v5
uses: actions/setup-python@v6
with:
python-version: ${{ matrix.python-version }}
cache: pip
allow-prereleases: true
- name: Install project dependencies
run: |
pip install --upgrade pip setuptools
pip install codecov '.[tests]' '${{ matrix.urllib3-requirement }}'
pip check
uv pip install --system --upgrade pip setuptools
uv pip install --system codecov '.[tests]'
uv pip check
- name: Allow creation of user namespaces (e.g. to the unshare command)
run: |

View File

@@ -18,10 +18,10 @@ jobs:
name: Detect outdated pre-commit hooks
runs-on: ubuntu-24.04
steps:
- uses: actions/checkout@v4
- uses: actions/checkout@v5
- name: Set up Python 3.12
uses: actions/setup-python@v5
uses: actions/setup-python@v6
with:
python-version: 3.12

View File

@@ -13,8 +13,8 @@ jobs:
name: Run pre-commit
runs-on: ubuntu-24.04
steps:
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
- uses: actions/checkout@v5
- uses: actions/setup-python@v6
with:
python-version: 3.12
- uses: pre-commit/action@v3.0.1

View File

@@ -3,7 +3,7 @@
repos:
- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.11.2
rev: v0.14.5
hooks:
- id: ruff
args: ["--output-format=full"]

View File

@@ -7,6 +7,10 @@ For a full list of triaged issues, bugs and PRs and what release they are target
All help in providing PRs to close out bug issues is appreciated. Even if that is providing a repo that fully replicates issues. We have very generous contributors that have added these to bug issues which meant another contributor picked up the bug and closed it out.
- Unreleased
- Drop support for Python 3.9
- Drop support for urllib3 < 2
- 7.0.0
- Drop support for python 3.8 (major version bump) - thanks @jairhenrique
- Various linting and test fixes - thanks @jairhenrique

View File

@@ -22,6 +22,7 @@ The following HTTP libraries are supported:
- ``urllib2``
- ``urllib3``
- ``httpx``
- ``httpcore``
Speed
-----

View File

@@ -2,14 +2,15 @@
skip = '.git,*.pdf,*.svg,.tox'
ignore-regex = "\\\\[fnrstv]"
[tool.pytest.ini_options]
[tool.pytest]
addopts = ["--strict-config", "--strict-markers"]
asyncio_default_fixture_loop_scope = "function"
asyncio_default_fixture_loop_scope = "session"
asyncio_default_test_loop_scope = "session"
markers = ["online"]
[tool.ruff]
line-length = 110
target-version = "py39"
target-version = "py310"
[tool.ruff.lint]
select = [
@@ -24,6 +25,7 @@ select = [
"RUF", # Ruff-specific rules
"UP", # pyupgrade
"W", # pycodestyle warning
"SIM",
]
[tool.ruff.lint.isort]

View File

@@ -3,10 +3,11 @@
import codecs
import os
import re
from pathlib import Path
from setuptools import find_packages, setup
long_description = open("README.rst").read()
long_description = Path("README.rst").read_text()
here = os.path.abspath(os.path.dirname(__file__))
@@ -29,42 +30,27 @@ def find_version(*file_paths):
install_requires = [
"PyYAML",
"wrapt",
"yarl",
# Support for urllib3 >=2 needs CPython >=3.10
# so we need to block urllib3 >=2 for Python <3.10 and PyPy for now.
# Note that vcrpy would work fine without any urllib3 around,
# so this block and the dependency can be dropped at some point
# in the future. For more Details:
# https://github.com/kevin1024/vcrpy/pull/699#issuecomment-1551439663
"urllib3 <2; python_version <'3.10'",
# https://github.com/kevin1024/vcrpy/pull/775#issuecomment-1847849962
"urllib3 <2; platform_python_implementation =='PyPy'",
# Workaround for Poetry with CPython >= 3.10, problem description at:
# https://github.com/kevin1024/vcrpy/pull/826
"urllib3; platform_python_implementation !='PyPy' and python_version >='3.10'",
]
extras_require = {
"tests": [
"aiohttp",
"boto3",
"cryptography",
"httpbin",
"httpcore",
"httplib2",
"httpx",
"pycurl; platform_python_implementation !='PyPy'",
"pytest",
"pytest-aiohttp",
"pytest-asyncio",
"pytest-cov",
"pytest-httpbin",
"pytest",
"requests>=2.22.0",
"tornado",
"urllib3",
# Needed to un-break httpbin 0.7.0. For httpbin >=0.7.1 and after,
# this pin and the dependency itself can be removed, provided
# that the related bug in httpbin has been fixed:
# https://github.com/kevin1024/vcrpy/issues/645#issuecomment-1562489489
# https://github.com/postmanlabs/httpbin/issues/673
# https://github.com/postmanlabs/httpbin/pull/674
"Werkzeug==2.0.3",
"werkzeug==2.0.3",
],
}
@@ -78,7 +64,7 @@ setup(
author_email="me@kevinmccarthy.org",
url="https://github.com/kevin1024/vcrpy",
packages=find_packages(exclude=["tests*"]),
python_requires=">=3.9",
python_requires=">=3.10",
install_requires=install_requires,
license="MIT",
extras_require=extras_require,
@@ -89,7 +75,6 @@ setup(
"Intended Audience :: Developers",
"Programming Language :: Python",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",

View File

@@ -193,9 +193,11 @@ def test_params_same_url_distinct_params(tmpdir, httpbin):
assert cassette.play_count == 1
other_params = {"other": "params"}
with vcr.use_cassette(str(tmpdir.join("get.yaml"))) as cassette:
with pytest.raises(vcr.errors.CannotOverwriteExistingCassetteException):
get(url, output="text", params=other_params)
with (
vcr.use_cassette(str(tmpdir.join("get.yaml"))) as cassette,
pytest.raises(vcr.errors.CannotOverwriteExistingCassetteException),
):
get(url, output="text", params=other_params)
@pytest.mark.online
@@ -264,12 +266,6 @@ def test_aiohttp_test_client_json(aiohttp_client, tmpdir):
assert cassette.play_count == 1
def test_cleanup_from_pytest_asyncio():
# work around https://github.com/pytest-dev/pytest-asyncio/issues/724
asyncio.get_event_loop().close()
asyncio.set_event_loop(None)
@pytest.mark.online
def test_redirect(tmpdir, httpbin):
url = httpbin.url + "/redirect/2"

View File

@@ -62,9 +62,8 @@ def test_override_match_on(tmpdir, httpbin):
def test_missing_matcher():
my_vcr = vcr.VCR()
my_vcr.register_matcher("awesome", object)
with pytest.raises(KeyError):
with my_vcr.use_cassette("test.yaml", match_on=["notawesome"]):
pass
with pytest.raises(KeyError), my_vcr.use_cassette("test.yaml", match_on=["notawesome"]):
pass
@pytest.mark.online
@@ -81,9 +80,8 @@ def test_dont_record_on_exception(tmpdir, httpbin):
assert not os.path.exists(str(tmpdir.join("dontsave.yml")))
# Make sure context decorator has the same behavior
with pytest.raises(AssertionError):
with my_vcr.use_cassette(str(tmpdir.join("dontsave2.yml"))):
assert b"Not in content" in urlopen(httpbin.url).read()
with pytest.raises(AssertionError), my_vcr.use_cassette(str(tmpdir.join("dontsave2.yml"))):
assert b"Not in content" in urlopen(httpbin.url).read()
assert not os.path.exists(str(tmpdir.join("dontsave2.yml")))

View File

@@ -60,9 +60,8 @@ class DoSyncRequest(BaseDoRequest):
return b"".join(response.iter_bytes())
# Use one-time context and dispose of the client afterwards
with self:
with self.client.stream(*args, **kwargs) as response:
return b"".join(response.iter_bytes())
with self, self.client.stream(*args, **kwargs) as response:
return b"".join(response.iter_bytes())
class DoAsyncRequest(BaseDoRequest):
@@ -195,9 +194,11 @@ def test_params_same_url_distinct_params(tmpdir, httpbin, do_request):
assert cassette.play_count == 1
params = {"other": "params"}
with vcr.use_cassette(str(tmpdir.join("get.yaml"))) as cassette:
with pytest.raises(vcr.errors.CannotOverwriteExistingCassetteException):
do_request()("GET", url, params=params, headers=headers)
with (
vcr.use_cassette(str(tmpdir.join("get.yaml"))) as cassette,
pytest.raises(vcr.errors.CannotOverwriteExistingCassetteException),
):
do_request()("GET", url, params=params, headers=headers)
@pytest.mark.online

View File

@@ -51,9 +51,11 @@ def test_matchers(httpbin, httpbin_secure, cassette, matcher, matching_uri, not_
assert cass.play_count == 1
# play cassette with not matching on uri, it should fail
with pytest.raises(vcr.errors.CannotOverwriteExistingCassetteException):
with vcr.use_cassette(cassette, match_on=[matcher]) as cass:
urlopen(not_matching_uri)
with (
pytest.raises(vcr.errors.CannotOverwriteExistingCassetteException),
vcr.use_cassette(cassette, match_on=[matcher]) as cass,
):
urlopen(not_matching_uri)
def test_method_matcher(cassette, httpbin, httpbin_secure):
@@ -65,10 +67,12 @@ def test_method_matcher(cassette, httpbin, httpbin_secure):
assert cass.play_count == 1
# should fail if method does not match
with pytest.raises(vcr.errors.CannotOverwriteExistingCassetteException):
with vcr.use_cassette(cassette, match_on=["method"]) as cass:
# is a POST request
urlopen(default_uri, data=b"")
with (
pytest.raises(vcr.errors.CannotOverwriteExistingCassetteException),
vcr.use_cassette(cassette, match_on=["method"]) as cass,
):
# is a POST request
urlopen(default_uri, data=b"")
@pytest.mark.parametrize(
@@ -98,14 +102,12 @@ def test_default_matcher_matches(cassette, uri, httpbin, httpbin_secure):
)
def test_default_matcher_does_not_match(cassette, uri, httpbin, httpbin_secure):
uri = _replace_httpbin(uri, httpbin, httpbin_secure)
with pytest.raises(vcr.errors.CannotOverwriteExistingCassetteException):
with vcr.use_cassette(cassette):
urlopen(uri)
with pytest.raises(vcr.errors.CannotOverwriteExistingCassetteException), vcr.use_cassette(cassette):
urlopen(uri)
def test_default_matcher_does_not_match_on_method(cassette, httpbin, httpbin_secure):
default_uri = _replace_httpbin(DEFAULT_URI, httpbin, httpbin_secure)
with pytest.raises(vcr.errors.CannotOverwriteExistingCassetteException):
with vcr.use_cassette(cassette):
# is a POST request
urlopen(default_uri, data=b"")
with pytest.raises(vcr.errors.CannotOverwriteExistingCassetteException), vcr.use_cassette(cassette):
# is a POST request
urlopen(default_uri, data=b"")

View File

@@ -124,9 +124,11 @@ def test_none_record_mode(tmpdir, httpbin):
# Cassette file doesn't exist, yet we are trying to make a request.
# raise hell.
testfile = str(tmpdir.join("recordmode.yml"))
with vcr.use_cassette(testfile, record_mode=vcr.mode.NONE):
with pytest.raises(CannotOverwriteExistingCassetteException):
urlopen(httpbin.url).read()
with (
vcr.use_cassette(testfile, record_mode=vcr.mode.NONE),
pytest.raises(CannotOverwriteExistingCassetteException),
):
urlopen(httpbin.url).read()
def test_none_record_mode_with_existing_cassette(tmpdir, httpbin):

View File

@@ -83,6 +83,5 @@ def test_load_cassette_persister_exception_handling(tmpdir, httpbin):
with my_vcr.use_cassette("bad/encoding") as cass:
assert len(cass) == 0
with pytest.raises(ValueError):
with my_vcr.use_cassette("bad/buggy") as cass:
pass
with pytest.raises(ValueError), my_vcr.use_cassette("bad/buggy") as cass:
pass

View File

@@ -66,7 +66,7 @@ def test_original_decoded_response_is_not_modified(tmpdir, httpbin):
# Assert that we do not modify the original response while appending
# to the cassette.
assert "gzip" == inside.headers["content-encoding"]
assert inside.headers["content-encoding"] == "gzip"
# They should effectively be the same response.
inside_headers = (h for h in inside.headers.items() if h[0].lower() != "date")
@@ -122,7 +122,7 @@ def test_original_response_is_not_modified_by_before_filter(tmpdir, httpbin):
# Furthermore, the responses should be identical.
inside_body = json.loads(inside.read())
outside_body = json.loads(outside.read())
assert not inside_body[field_to_scrub] == replacement
assert inside_body[field_to_scrub] != replacement
assert inside_body[field_to_scrub] == outside_body[field_to_scrub]
# Ensure that when a cassette exists, the scrubbed response is returned.

View File

@@ -4,6 +4,8 @@ import asyncio
import functools
import inspect
import json
import os
import ssl
import pytest
@@ -36,23 +38,23 @@ def gen_test(func):
return wrapper
@pytest.fixture(params=["https", "http"])
def scheme(request):
"""Fixture that returns both http and https."""
return request.param
@pytest.fixture(params=["simple", "curl", "default"])
def get_client(request):
ca_bundle_path = os.environ.get("REQUESTS_CA_BUNDLE")
ssl_ctx = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
ssl_ctx.load_verify_locations(cafile=ca_bundle_path)
ssl_ctx.verify_mode = ssl.CERT_REQUIRED
if request.param == "simple":
from tornado import simple_httpclient as simple
return lambda: simple.SimpleAsyncHTTPClient()
elif request.param == "curl":
return lambda: simple.SimpleAsyncHTTPClient(defaults={"ssl_options": ssl_ctx})
if request.param == "curl":
curl = pytest.importorskip("tornado.curl_httpclient")
return lambda: curl.CurlAsyncHTTPClient()
else:
return lambda: http.AsyncHTTPClient()
return lambda: curl.CurlAsyncHTTPClient(defaults={"ca_certs": ca_bundle_path})
return lambda: http.AsyncHTTPClient(defaults={"ssl_options": ssl_ctx})
def get(client, url, **kwargs):
@@ -71,42 +73,42 @@ def post(client, url, data=None, **kwargs):
@pytest.mark.online
@gen_test
def test_status_code(get_client, scheme, tmpdir):
def test_status_code(get_client, tmpdir, httpbin_both):
"""Ensure that we can read the status code"""
url = scheme + "://httpbin.org/"
url = httpbin_both.url
with vcr.use_cassette(str(tmpdir.join("atts.yaml"))):
status_code = (yield get(get_client(), url)).code
with vcr.use_cassette(str(tmpdir.join("atts.yaml"))) as cass:
assert status_code == (yield get(get_client(), url)).code
assert 1 == cass.play_count
assert cass.play_count == 1
@pytest.mark.online
@gen_test
def test_headers(get_client, scheme, tmpdir):
def test_headers(get_client, httpbin_both, tmpdir):
"""Ensure that we can read the headers back"""
url = scheme + "://httpbin.org/"
url = httpbin_both.url
with vcr.use_cassette(str(tmpdir.join("headers.yaml"))):
headers = (yield get(get_client(), url)).headers
with vcr.use_cassette(str(tmpdir.join("headers.yaml"))) as cass:
assert headers == (yield get(get_client(), url)).headers
assert 1 == cass.play_count
assert cass.play_count == 1
@pytest.mark.online
@gen_test
def test_body(get_client, tmpdir, scheme):
def test_body(get_client, tmpdir, httpbin_both):
"""Ensure the responses are all identical enough"""
url = scheme + "://httpbin.org/bytes/1024"
url = httpbin_both.url + "/bytes/1024"
with vcr.use_cassette(str(tmpdir.join("body.yaml"))):
content = (yield get(get_client(), url)).body
with vcr.use_cassette(str(tmpdir.join("body.yaml"))) as cass:
assert content == (yield get(get_client(), url)).body
assert 1 == cass.play_count
assert cass.play_count == 1
@gen_test
@@ -119,15 +121,15 @@ def test_effective_url(get_client, tmpdir, httpbin):
with vcr.use_cassette(str(tmpdir.join("url.yaml"))) as cass:
assert effective_url == (yield get(get_client(), url)).effective_url
assert 1 == cass.play_count
assert cass.play_count == 1
@pytest.mark.online
@gen_test
def test_auth(get_client, tmpdir, scheme):
def test_auth(get_client, tmpdir, httpbin_both):
"""Ensure that we can handle basic auth"""
auth = ("user", "passwd")
url = scheme + "://httpbin.org/basic-auth/user/passwd"
url = httpbin_both.url + "/basic-auth/user/passwd"
with vcr.use_cassette(str(tmpdir.join("auth.yaml"))):
one = yield get(get_client(), url, auth_username=auth[0], auth_password=auth[1])
@@ -135,15 +137,15 @@ def test_auth(get_client, tmpdir, scheme):
two = yield get(get_client(), url, auth_username=auth[0], auth_password=auth[1])
assert one.body == two.body
assert one.code == two.code
assert 1 == cass.play_count
assert cass.play_count == 1
@pytest.mark.online
@gen_test
def test_auth_failed(get_client, tmpdir, scheme):
def test_auth_failed(get_client, tmpdir, httpbin_both):
"""Ensure that we can save failed auth statuses"""
auth = ("user", "wrongwrongwrong")
url = scheme + "://httpbin.org/basic-auth/user/passwd"
url = httpbin_both.url + "/basic-auth/user/passwd"
with vcr.use_cassette(str(tmpdir.join("auth-failed.yaml"))) as cass:
# Ensure that this is empty to begin with
assert_cassette_empty(cass)
@@ -159,15 +161,15 @@ def test_auth_failed(get_client, tmpdir, scheme):
assert exc_info.value.code == 401
assert one.body == two.body
assert one.code == two.code == 401
assert 1 == cass.play_count
assert cass.play_count == 1
@pytest.mark.online
@gen_test
def test_post(get_client, tmpdir, scheme):
def test_post(get_client, tmpdir, httpbin_both):
"""Ensure that we can post and cache the results"""
data = {"key1": "value1", "key2": "value2"}
url = scheme + "://httpbin.org/post"
url = httpbin_both.url + "/post"
with vcr.use_cassette(str(tmpdir.join("requests.yaml"))):
req1 = (yield post(get_client(), url, data)).body
@@ -175,7 +177,7 @@ def test_post(get_client, tmpdir, scheme):
req2 = (yield post(get_client(), url, data)).body
assert req1 == req2
assert 1 == cass.play_count
assert cass.play_count == 1
@gen_test
@@ -192,32 +194,36 @@ def test_redirects(get_client, tmpdir, httpbin):
@pytest.mark.online
@gen_test
def test_cross_scheme(get_client, tmpdir, scheme):
def test_cross_scheme(get_client, tmpdir, httpbin, httpbin_secure):
"""Ensure that requests between schemes are treated separately"""
# First fetch a url under http, and then again under https and then
# ensure that we haven't served anything out of cache, and we have two
# requests / response pairs in the cassette
url = httpbin.url
url_secure = httpbin_secure.url
with vcr.use_cassette(str(tmpdir.join("cross_scheme.yaml"))) as cass:
yield get(get_client(), "https://httpbin.org/")
yield get(get_client(), "http://httpbin.org/")
yield get(get_client(), url)
yield get(get_client(), url_secure)
assert cass.play_count == 0
assert len(cass) == 2
# Then repeat the same requests and ensure both were replayed.
with vcr.use_cassette(str(tmpdir.join("cross_scheme.yaml"))) as cass:
yield get(get_client(), "https://httpbin.org/")
yield get(get_client(), "http://httpbin.org/")
yield get(get_client(), url)
yield get(get_client(), url_secure)
assert cass.play_count == 2
@pytest.mark.online
@gen_test
def test_gzip(get_client, tmpdir, scheme):
def test_gzip(get_client, tmpdir, httpbin_both):
"""
Ensure that httpclient is able to automatically decompress the response
body
"""
url = scheme + "://httpbin.org/gzip"
url = httpbin_both + "/gzip"
# use_gzip was renamed to decompress_response in 4.0
kwargs = {}
@@ -233,24 +239,26 @@ def test_gzip(get_client, tmpdir, scheme):
with vcr.use_cassette(str(tmpdir.join("gzip.yaml"))) as cass:
response = yield get(get_client(), url, **kwargs)
assert_is_json_bytes(response.body)
assert 1 == cass.play_count
assert cass.play_count == 1
@pytest.mark.online
@gen_test
def test_https_with_cert_validation_disabled(get_client, tmpdir):
def test_https_with_cert_validation_disabled(get_client, tmpdir, httpbin_secure):
cass_path = str(tmpdir.join("cert_validation_disabled.yaml"))
url = httpbin_secure.url
with vcr.use_cassette(cass_path):
yield get(get_client(), "https://httpbin.org", validate_cert=False)
yield get(get_client(), url, validate_cert=False)
with vcr.use_cassette(cass_path) as cass:
yield get(get_client(), "https://httpbin.org", validate_cert=False)
assert 1 == cass.play_count
yield get(get_client(), url, validate_cert=False)
assert cass.play_count == 1
@gen_test
def test_unsupported_features_raises_in_future(get_client, tmpdir):
def test_unsupported_features_raises_in_future(get_client, tmpdir, httpbin):
"""Ensure that the exception for an AsyncHTTPClient feature not being
supported is raised inside the future."""
@@ -258,7 +266,7 @@ def test_unsupported_features_raises_in_future(get_client, tmpdir):
raise AssertionError("Did not expect to be called.")
with vcr.use_cassette(str(tmpdir.join("invalid.yaml"))):
future = get(get_client(), "http://httpbin.org", streaming_callback=callback)
future = get(get_client(), httpbin.url, streaming_callback=callback)
with pytest.raises(Exception) as excinfo:
yield future
@@ -292,15 +300,17 @@ def test_unsupported_features_raise_error_disabled(get_client, tmpdir):
@pytest.mark.online
@gen_test
def test_cannot_overwrite_cassette_raises_in_future(get_client, tmpdir):
def test_cannot_overwrite_cassette_raises_in_future(get_client, tmpdir, httpbin):
"""Ensure that CannotOverwriteExistingCassetteException is raised inside
the future."""
with vcr.use_cassette(str(tmpdir.join("overwrite.yaml"))):
yield get(get_client(), "http://httpbin.org/get")
url = httpbin.url
with vcr.use_cassette(str(tmpdir.join("overwrite.yaml"))):
future = get(get_client(), "http://httpbin.org/headers")
yield get(get_client(), url + "/get")
with vcr.use_cassette(str(tmpdir.join("overwrite.yaml"))):
future = get(get_client(), url + "/headers")
with pytest.raises(CannotOverwriteExistingCassetteException):
yield future
@@ -312,15 +322,17 @@ def test_cannot_overwrite_cassette_raises_in_future(get_client, tmpdir):
reason="raise_error only ignores HTTPErrors due to response code",
)
@gen_test
def test_cannot_overwrite_cassette_raise_error_disabled(get_client, tmpdir):
def test_cannot_overwrite_cassette_raise_error_disabled(get_client, tmpdir, httpbin):
"""Ensure that CannotOverwriteExistingCassetteException is not raised if
raise_error=False in the fetch() call."""
with vcr.use_cassette(str(tmpdir.join("overwrite.yaml"))):
yield get(get_client(), "http://httpbin.org/get", raise_error=False)
url = httpbin.url
with vcr.use_cassette(str(tmpdir.join("overwrite.yaml"))):
response = yield get(get_client(), "http://httpbin.org/headers", raise_error=False)
yield get(get_client(), url + "/get", raise_error=False)
with vcr.use_cassette(str(tmpdir.join("overwrite.yaml"))):
response = yield get(get_client(), url + "/headers", raise_error=False)
assert isinstance(response.error, CannotOverwriteExistingCassetteException)
@@ -348,46 +360,51 @@ def test_tornado_exception_can_be_caught(get_client):
@pytest.mark.online
@gen_test
def test_existing_references_get_patched(tmpdir):
def test_existing_references_get_patched(tmpdir, httpbin):
from tornado.httpclient import AsyncHTTPClient
url = httpbin.url + "/get"
with vcr.use_cassette(str(tmpdir.join("data.yaml"))):
client = AsyncHTTPClient()
yield get(client, "http://httpbin.org/get")
yield get(client, url)
with vcr.use_cassette(str(tmpdir.join("data.yaml"))) as cass:
yield get(client, "http://httpbin.org/get")
yield get(client, url)
assert cass.play_count == 1
@pytest.mark.online
@gen_test
def test_existing_instances_get_patched(get_client, tmpdir):
def test_existing_instances_get_patched(get_client, tmpdir, httpbin):
"""Ensure that existing instances of AsyncHTTPClient get patched upon
entering VCR context."""
url = httpbin.url + "/get"
client = get_client()
with vcr.use_cassette(str(tmpdir.join("data.yaml"))):
yield get(client, "http://httpbin.org/get")
yield get(client, url)
with vcr.use_cassette(str(tmpdir.join("data.yaml"))) as cass:
yield get(client, "http://httpbin.org/get")
yield get(client, url)
assert cass.play_count == 1
@pytest.mark.online
@gen_test
def test_request_time_is_set(get_client, tmpdir):
def test_request_time_is_set(get_client, tmpdir, httpbin):
"""Ensures that the request_time on HTTPResponses is set."""
url = httpbin.url + "/get"
with vcr.use_cassette(str(tmpdir.join("data.yaml"))):
client = get_client()
response = yield get(client, "http://httpbin.org/get")
response = yield get(client, url)
assert response.request_time is not None
with vcr.use_cassette(str(tmpdir.join("data.yaml"))) as cass:
client = get_client()
response = yield get(client, "http://httpbin.org/get")
response = yield get(client, url)
assert response.request_time is not None
assert cass.play_count == 1

View File

@@ -1,147 +0,0 @@
"""Integration tests with urllib2"""
import ssl
from urllib.parse import urlencode
from urllib.request import urlopen
import pytest_httpbin.certs
from pytest import mark
# Internal imports
import vcr
from ..assertions import assert_cassette_has_one_response
def urlopen_with_cafile(*args, **kwargs):
context = ssl.create_default_context(cafile=pytest_httpbin.certs.where())
context.check_hostname = False
kwargs["context"] = context
try:
return urlopen(*args, **kwargs)
except TypeError:
# python2/pypi don't let us override this
del kwargs["cafile"]
return urlopen(*args, **kwargs)
def test_response_code(httpbin_both, tmpdir):
"""Ensure we can read a response code from a fetch"""
url = httpbin_both.url
with vcr.use_cassette(str(tmpdir.join("atts.yaml"))):
code = urlopen_with_cafile(url).getcode()
with vcr.use_cassette(str(tmpdir.join("atts.yaml"))):
assert code == urlopen_with_cafile(url).getcode()
def test_random_body(httpbin_both, tmpdir):
"""Ensure we can read the content, and that it's served from cache"""
url = httpbin_both.url + "/bytes/1024"
with vcr.use_cassette(str(tmpdir.join("body.yaml"))):
body = urlopen_with_cafile(url).read()
with vcr.use_cassette(str(tmpdir.join("body.yaml"))):
assert body == urlopen_with_cafile(url).read()
def test_response_headers(httpbin_both, tmpdir):
"""Ensure we can get information from the response"""
url = httpbin_both.url
with vcr.use_cassette(str(tmpdir.join("headers.yaml"))):
open1 = urlopen_with_cafile(url).info().items()
with vcr.use_cassette(str(tmpdir.join("headers.yaml"))):
open2 = urlopen_with_cafile(url).info().items()
assert sorted(open1) == sorted(open2)
@mark.online
def test_effective_url(tmpdir, httpbin):
"""Ensure that the effective_url is captured"""
url = httpbin.url + "/redirect-to?url=.%2F&status_code=301"
with vcr.use_cassette(str(tmpdir.join("headers.yaml"))):
effective_url = urlopen_with_cafile(url).geturl()
assert effective_url == httpbin.url + "/"
with vcr.use_cassette(str(tmpdir.join("headers.yaml"))):
assert effective_url == urlopen_with_cafile(url).geturl()
def test_multiple_requests(httpbin_both, tmpdir):
"""Ensure that we can cache multiple requests"""
urls = [httpbin_both.url, httpbin_both.url, httpbin_both.url + "/get", httpbin_both.url + "/bytes/1024"]
with vcr.use_cassette(str(tmpdir.join("multiple.yaml"))) as cass:
[urlopen_with_cafile(url) for url in urls]
assert len(cass) == len(urls)
def test_get_data(httpbin_both, tmpdir):
"""Ensure that it works with query data"""
data = urlencode({"some": 1, "data": "here"})
url = httpbin_both.url + "/get?" + data
with vcr.use_cassette(str(tmpdir.join("get_data.yaml"))):
res1 = urlopen_with_cafile(url).read()
with vcr.use_cassette(str(tmpdir.join("get_data.yaml"))):
res2 = urlopen_with_cafile(url).read()
assert res1 == res2
def test_post_data(httpbin_both, tmpdir):
"""Ensure that it works when posting data"""
data = urlencode({"some": 1, "data": "here"}).encode("utf-8")
url = httpbin_both.url + "/post"
with vcr.use_cassette(str(tmpdir.join("post_data.yaml"))):
res1 = urlopen_with_cafile(url, data).read()
with vcr.use_cassette(str(tmpdir.join("post_data.yaml"))) as cass:
res2 = urlopen_with_cafile(url, data).read()
assert len(cass) == 1
assert res1 == res2
assert_cassette_has_one_response(cass)
def test_post_unicode_data(httpbin_both, tmpdir):
"""Ensure that it works when posting unicode data"""
data = urlencode({"snowman": "".encode()}).encode("utf-8")
url = httpbin_both.url + "/post"
with vcr.use_cassette(str(tmpdir.join("post_data.yaml"))):
res1 = urlopen_with_cafile(url, data).read()
with vcr.use_cassette(str(tmpdir.join("post_data.yaml"))) as cass:
res2 = urlopen_with_cafile(url, data).read()
assert len(cass) == 1
assert res1 == res2
assert_cassette_has_one_response(cass)
def test_cross_scheme(tmpdir, httpbin_secure, httpbin):
"""Ensure that requests between schemes are treated separately"""
# First fetch a url under https, and then again under https and then
# ensure that we haven't served anything out of cache, and we have two
# requests / response pairs in the cassette
with vcr.use_cassette(str(tmpdir.join("cross_scheme.yaml"))) as cass:
urlopen_with_cafile(httpbin_secure.url)
urlopen_with_cafile(httpbin.url)
assert len(cass) == 2
assert cass.play_count == 0
def test_decorator(httpbin_both, tmpdir):
"""Test the decorator version of VCR.py"""
url = httpbin_both.url
@vcr.use_cassette(str(tmpdir.join("atts.yaml")))
def inner1():
return urlopen_with_cafile(url).getcode()
@vcr.use_cassette(str(tmpdir.join("atts.yaml")))
def inner2():
return urlopen_with_cafile(url).getcode()
assert inner1() == inner2()

View File

@@ -62,13 +62,12 @@ def test_flickr_should_respond_with_200(tmpdir):
def test_cookies(tmpdir, httpbin):
testfile = str(tmpdir.join("cookies.yml"))
with vcr.use_cassette(testfile):
with requests.Session() as s:
s.get(httpbin.url + "/cookies/set?k1=v1&k2=v2")
assert s.cookies.keys() == ["k1", "k2"]
with vcr.use_cassette(testfile), requests.Session() as s:
s.get(httpbin.url + "/cookies/set?k1=v1&k2=v2")
assert s.cookies.keys() == ["k1", "k2"]
r2 = s.get(httpbin.url + "/cookies")
assert sorted(r2.json()["cookies"].keys()) == ["k1", "k2"]
r2 = s.get(httpbin.url + "/cookies")
assert sorted(r2.json()["cookies"].keys()) == ["k1", "k2"]
@pytest.mark.online

View File

@@ -227,9 +227,11 @@ def test_nesting_cassette_context_managers(*args):
assert_get_response_body_is("first_response")
# Make sure a second cassette can supersede the first
with Cassette.use(path="test") as second_cassette:
with mock.patch.object(second_cassette, "play_response", return_value=second_response):
assert_get_response_body_is("second_response")
with (
Cassette.use(path="test") as second_cassette,
mock.patch.object(second_cassette, "play_response", return_value=second_response),
):
assert_get_response_body_is("second_response")
# Now the first cassette should be back in effect
assert_get_response_body_is("first_response")

View File

@@ -8,15 +8,13 @@ from vcr.serializers import compat, jsonserializer, yamlserializer
def test_deserialize_old_yaml_cassette():
with open("tests/fixtures/migration/old_cassette.yaml") as f:
with pytest.raises(ValueError):
deserialize(f.read(), yamlserializer)
with open("tests/fixtures/migration/old_cassette.yaml") as f, pytest.raises(ValueError):
deserialize(f.read(), yamlserializer)
def test_deserialize_old_json_cassette():
with open("tests/fixtures/migration/old_cassette.json") as f:
with pytest.raises(ValueError):
deserialize(f.read(), jsonserializer)
with open("tests/fixtures/migration/old_cassette.json") as f, pytest.raises(ValueError):
deserialize(f.read(), jsonserializer)
def test_deserialize_new_yaml_cassette():
@@ -76,7 +74,7 @@ def test_deserialize_py2py3_yaml_cassette(tmpdir, req_body, expect):
cfile = tmpdir.join("test_cassette.yaml")
cfile.write(REQBODY_TEMPLATE.format(req_body=req_body))
with open(str(cfile)) as f:
(requests, responses) = deserialize(f.read(), yamlserializer)
(requests, _) = deserialize(f.read(), yamlserializer)
assert requests[0].body == expect

View File

@@ -178,7 +178,7 @@ def test_testcase_playback(tmpdir):
return str(cassette_dir)
test = run_testcase(MyTest)[0][0]
assert b"illustrative examples" in test.response
assert b"Example Domain" in test.response
assert len(test.cassette.requests) == 1
assert test.cassette.play_count == 0
@@ -186,7 +186,7 @@ def test_testcase_playback(tmpdir):
test2 = run_testcase(MyTest)[0][0]
assert test.cassette is not test2.cassette
assert b"illustrative examples" in test.response
assert b"Example Domain" in test.response
assert len(test2.cassette.requests) == 1
assert test2.cassette.play_count == 1

View File

@@ -225,10 +225,10 @@ class Cassette:
def append(self, request, response):
"""Add a request, response pair to this cassette"""
log.info("Appending request %s and response %s", request, response)
request = self._before_record_request(request)
if not request:
return
log.info("Appending request %s and response %s", request, response)
# Deepcopy is here because mutation of `response` will corrupt the
# real response.
response = copy.deepcopy(response)
@@ -359,7 +359,7 @@ class Cassette:
def _load(self):
try:
requests, responses = self._persister.load_cassette(self._path, serializer=self._serializer)
for request, response in zip(requests, responses):
for request, response in zip(requests, responses, strict=False):
self.append(request, response)
self._old_interactions.append((request, response))
self.dirty = False

View File

@@ -162,7 +162,7 @@ def _get_transformers(request):
def requests_match(r1, r2, matchers):
successes, failures = get_matchers_results(r1, r2, matchers)
_, failures = get_matchers_results(r1, r2, matchers)
if failures:
log.debug(f"Requests {r1} and {r2} differ.\nFailure details:\n{failures}")
return len(failures) == 0

View File

@@ -92,12 +92,12 @@ else:
try:
import httpx
import httpcore
except ImportError: # pragma: no cover
pass
else:
_HttpxSyncClient_send_single_request = httpx.Client._send_single_request
_HttpxAsyncClient_send_single_request = httpx.AsyncClient._send_single_request
_HttpcoreConnectionPool_handle_request = httpcore.ConnectionPool.handle_request
_HttpcoreAsyncConnectionPool_handle_async_request = httpcore.AsyncConnectionPool.handle_async_request
class CassettePatcherBuilder:
@@ -121,7 +121,7 @@ class CassettePatcherBuilder:
self._httplib2(),
self._tornado(),
self._aiohttp(),
self._httpx(),
self._httpcore(),
self._build_patchers_from_mock_triples(self._cassette.custom_patches),
)
@@ -304,19 +304,22 @@ class CassettePatcherBuilder:
yield client.ClientSession, "_request", new_request
@_build_patchers_from_mock_triples_decorator
def _httpx(self):
def _httpcore(self):
try:
import httpx
import httpcore
except ImportError: # pragma: no cover
return
else:
from .stubs.httpx_stubs import async_vcr_send, sync_vcr_send
from .stubs.httpcore_stubs import vcr_handle_async_request, vcr_handle_request
new_async_client_send = async_vcr_send(self._cassette, _HttpxAsyncClient_send_single_request)
yield httpx.AsyncClient, "_send_single_request", new_async_client_send
new_handle_async_request = vcr_handle_async_request(
self._cassette,
_HttpcoreAsyncConnectionPool_handle_async_request,
)
yield httpcore.AsyncConnectionPool, "handle_async_request", new_handle_async_request
new_sync_client_send = sync_vcr_send(self._cassette, _HttpxSyncClient_send_single_request)
yield httpx.Client, "_send_single_request", new_sync_client_send
new_handle_request = vcr_handle_request(self._cassette, _HttpcoreConnectionPool_handle_request)
yield httpcore.ConnectionPool, "handle_request", new_handle_request
def _urllib3_patchers(self, cpool, conn, stubs):
http_connection_remover = ConnectionRemover(

View File

@@ -1,5 +1,6 @@
import logging
import warnings
from contextlib import suppress
from io import BytesIO
from urllib.parse import parse_qsl, urlparse
@@ -80,10 +81,9 @@ class Request:
def port(self):
port = self.parsed_uri.port
if port is None:
try:
with suppress(KeyError):
port = {"https": 443, "http": 80}[self.parsed_uri.scheme]
except KeyError:
pass
return port
@property

View File

@@ -53,7 +53,7 @@ def serialize(cassette_dict, serializer):
"request": compat.convert_to_unicode(request._to_dict()),
"response": compat.convert_to_unicode(response),
}
for request, response in zip(cassette_dict["requests"], cassette_dict["responses"])
for request, response in zip(cassette_dict["requests"], cassette_dict["responses"], strict=False)
]
data = {"version": CASSETTE_FORMAT_VERSION, "interactions": interactions}
return serializer.serialize(data)

View File

@@ -1,6 +1,7 @@
"""Stubs for patching HTTP and HTTPS requests"""
import logging
from contextlib import suppress
from http.client import HTTPConnection, HTTPResponse, HTTPSConnection
from io import BytesIO
@@ -77,7 +78,7 @@ class VCRHTTPResponse(HTTPResponse):
# libraries trying to process a chunked response. By removing the
# transfer-encoding: chunked header, this should cause the downstream
# libraries to process this as a non-chunked response.
te_key = [h for h in headers.keys() if h.upper() == "TRANSFER-ENCODING"]
te_key = [h for h in headers if h.upper() == "TRANSFER-ENCODING"]
if te_key:
del headers[te_key[0]]
self.headers = self.msg = parse_headers(headers)
@@ -370,12 +371,8 @@ class VCRConnection:
TODO: Separately setting the attribute on the two instances is not
ideal. We should switch to a proxying implementation.
"""
try:
with suppress(AttributeError):
setattr(self.real_connection, name, value)
except AttributeError:
# raised if real_connection has not been set yet, such as when
# we're setting the real_connection itself for the first time
pass
super().__setattr__(name, value)

View File

@@ -6,7 +6,6 @@ import json
import logging
from collections.abc import Mapping
from http.cookies import CookieError, Morsel, SimpleCookie
from typing import Union
from aiohttp import ClientConnectionError, ClientResponse, CookieJar, RequestInfo, hdrs, streams
from aiohttp.helpers import strip_auth_from_url
@@ -230,7 +229,7 @@ def _build_cookie_header(session, cookies, cookie_header, url):
return c.output(header="", sep=";").strip()
def _build_url_with_params(url_str: str, params: Mapping[str, Union[str, int, float]]) -> URL:
def _build_url_with_params(url_str: str, params: Mapping[str, str | int | float]) -> URL:
# This code is basically a copy&paste of aiohttp.
# https://github.com/aio-libs/aiohttp/blob/master/aiohttp/client_reqrep.py#L225
url = URL(url_str)

215
vcr/stubs/httpcore_stubs.py Normal file
View File

@@ -0,0 +1,215 @@
import asyncio
import functools
import logging
from collections import defaultdict
from collections.abc import AsyncIterable, Iterable
from httpcore import Response
from httpcore._models import ByteStream
from vcr.errors import CannotOverwriteExistingCassetteException
from vcr.filters import decode_response
from vcr.request import Request as VcrRequest
from vcr.serializers.compat import convert_body_to_bytes
_logger = logging.getLogger(__name__)
async def _convert_byte_stream(stream):
if isinstance(stream, Iterable):
return list(stream)
if isinstance(stream, AsyncIterable):
return [part async for part in stream]
raise TypeError(
f"_convert_byte_stream: stream must be Iterable or AsyncIterable, got {type(stream).__name__}",
)
def _serialize_headers(real_response):
"""
Some headers can appear multiple times, like "Set-Cookie".
Therefore serialize every header key to a list of values.
"""
headers = defaultdict(list)
for name, value in real_response.headers:
headers[name.decode("ascii")].append(value.decode("ascii"))
return dict(headers)
async def _serialize_response(real_response):
# The reason_phrase may not exist
try:
reason_phrase = real_response.extensions["reason_phrase"].decode("ascii")
except KeyError:
reason_phrase = None
# Reading the response stream consumes the iterator, so we need to restore it afterwards
content = b"".join(await _convert_byte_stream(real_response.stream))
real_response.stream = ByteStream(content)
return {
"status": {"code": real_response.status, "message": reason_phrase},
"headers": _serialize_headers(real_response),
"body": {"string": content},
}
def _deserialize_headers(headers):
"""
httpcore accepts headers as list of tuples of header key and value.
"""
return [
(name.encode("ascii"), value.encode("ascii")) for name, values in headers.items() for value in values
]
def _deserialize_response(vcr_response):
# Cassette format generated for HTTPX requests by older versions of
# vcrpy. We restructure the content to resemble what a regular
# cassette looks like.
if "status_code" in vcr_response:
vcr_response = decode_response(
convert_body_to_bytes(
{
"headers": vcr_response["headers"],
"body": {"string": vcr_response["content"]},
"status": {"code": vcr_response["status_code"]},
},
),
)
extensions = None
else:
extensions = (
{"reason_phrase": vcr_response["status"]["message"].encode("ascii")}
if vcr_response["status"]["message"]
else None
)
return Response(
vcr_response["status"]["code"],
headers=_deserialize_headers(vcr_response["headers"]),
content=vcr_response["body"]["string"],
extensions=extensions,
)
async def _make_vcr_request(real_request):
# Reading the request stream consumes the iterator, so we need to restore it afterwards
body = b"".join(await _convert_byte_stream(real_request.stream))
real_request.stream = ByteStream(body)
uri = bytes(real_request.url).decode("ascii")
# As per HTTPX: If there are multiple headers with the same key, then we concatenate them with commas
headers = defaultdict(list)
for name, value in real_request.headers:
headers[name.decode("ascii")].append(value.decode("ascii"))
headers = {name: ", ".join(values) for name, values in headers.items()}
return VcrRequest(real_request.method.decode("ascii"), uri, body, headers)
async def _vcr_request(cassette, real_request):
vcr_request = await _make_vcr_request(real_request)
if cassette.can_play_response_for(vcr_request):
return vcr_request, _play_responses(cassette, vcr_request)
if cassette.write_protected and cassette.filter_request(vcr_request):
raise CannotOverwriteExistingCassetteException(
cassette=cassette,
failed_request=vcr_request,
)
_logger.info("%s not in cassette, sending to real server", vcr_request)
return vcr_request, None
async def _record_responses(cassette, vcr_request, real_response):
cassette.append(vcr_request, await _serialize_response(real_response))
def _play_responses(cassette, vcr_request):
vcr_response = cassette.play_response(vcr_request)
real_response = _deserialize_response(vcr_response)
return real_response
async def _vcr_handle_async_request(
cassette,
real_handle_async_request,
self,
real_request,
):
vcr_request, vcr_response = await _vcr_request(cassette, real_request)
if vcr_response:
return vcr_response
real_response = await real_handle_async_request(self, real_request)
await _record_responses(cassette, vcr_request, real_response)
return real_response
def vcr_handle_async_request(cassette, real_handle_async_request):
@functools.wraps(real_handle_async_request)
def _inner_handle_async_request(self, real_request):
return _vcr_handle_async_request(
cassette,
real_handle_async_request,
self,
real_request,
)
return _inner_handle_async_request
def _run_async_function(sync_func, *args, **kwargs):
"""
Safely run an asynchronous function from a synchronous context.
Handles both cases:
- An event loop is already running.
- No event loop exists yet.
"""
try:
asyncio.get_running_loop()
except RuntimeError:
return asyncio.run(sync_func(*args, **kwargs))
else:
# If inside a running loop, create a task and wait for it
return asyncio.ensure_future(sync_func(*args, **kwargs))
def _vcr_handle_request(cassette, real_handle_request, self, real_request):
vcr_request, vcr_response = _run_async_function(
_vcr_request,
cassette,
real_request,
)
if vcr_response:
return vcr_response
real_response = real_handle_request(self, real_request)
_run_async_function(_record_responses, cassette, vcr_request, real_response)
return real_response
def vcr_handle_request(cassette, real_handle_request):
@functools.wraps(real_handle_request)
def _inner_handle_request(self, real_request):
return _vcr_handle_request(cassette, real_handle_request, self, real_request)
return _inner_handle_request

View File

@@ -1,202 +0,0 @@
import asyncio
import functools
import inspect
import logging
from unittest.mock import MagicMock, patch
import httpx
from vcr.errors import CannotOverwriteExistingCassetteException
from vcr.filters import decode_response
from vcr.request import Request as VcrRequest
from vcr.serializers.compat import convert_body_to_bytes
_httpx_signature = inspect.signature(httpx.Client.request)
try:
HTTPX_REDIRECT_PARAM = _httpx_signature.parameters["follow_redirects"]
except KeyError:
HTTPX_REDIRECT_PARAM = _httpx_signature.parameters["allow_redirects"]
_logger = logging.getLogger(__name__)
def _transform_headers(httpx_response):
"""
Some headers can appear multiple times, like "Set-Cookie".
Therefore transform to every header key to list of values.
"""
out = {}
for key, var in httpx_response.headers.raw:
decoded_key = key.decode("utf-8")
out.setdefault(decoded_key, [])
out[decoded_key].append(var.decode("utf-8"))
return out
async def _to_serialized_response(resp, aread):
# The content shouldn't already have been read in by HTTPX.
assert not hasattr(resp, "_decoder")
# Retrieve the content, but without decoding it.
with patch.dict(resp.headers, {"Content-Encoding": ""}):
if aread:
await resp.aread()
else:
resp.read()
result = {
"status": {"code": resp.status_code, "message": resp.reason_phrase},
"headers": _transform_headers(resp),
"body": {"string": resp.content},
}
# As the content wasn't decoded, we restore the response to a state which
# will be capable of decoding the content for the consumer.
del resp._decoder
resp._content = resp._get_content_decoder().decode(resp.content)
return result
def _from_serialized_headers(headers):
"""
httpx accepts headers as list of tuples of header key and value.
"""
header_list = []
for key, values in headers.items():
for v in values:
header_list.append((key, v))
return header_list
@patch("httpx.Response.close", MagicMock())
@patch("httpx.Response.read", MagicMock())
def _from_serialized_response(request, serialized_response, history=None):
# Cassette format generated for HTTPX requests by older versions of
# vcrpy. We restructure the content to resemble what a regular
# cassette looks like.
if "status_code" in serialized_response:
serialized_response = decode_response(
convert_body_to_bytes(
{
"headers": serialized_response["headers"],
"body": {"string": serialized_response["content"]},
"status": {"code": serialized_response["status_code"]},
},
),
)
extensions = None
else:
extensions = {"reason_phrase": serialized_response["status"]["message"].encode()}
response = httpx.Response(
status_code=serialized_response["status"]["code"],
request=request,
headers=_from_serialized_headers(serialized_response["headers"]),
content=serialized_response["body"]["string"],
history=history or [],
extensions=extensions,
)
return response
def _make_vcr_request(httpx_request, **kwargs):
body = httpx_request.read().decode("utf-8")
uri = str(httpx_request.url)
headers = dict(httpx_request.headers)
return VcrRequest(httpx_request.method, uri, body, headers)
def _shared_vcr_send(cassette, real_send, *args, **kwargs):
real_request = args[1]
vcr_request = _make_vcr_request(real_request, **kwargs)
if cassette.can_play_response_for(vcr_request):
return vcr_request, _play_responses(cassette, real_request, vcr_request, args[0], kwargs)
if cassette.write_protected and cassette.filter_request(vcr_request):
raise CannotOverwriteExistingCassetteException(cassette=cassette, failed_request=vcr_request)
_logger.info("%s not in cassette, sending to real server", vcr_request)
return vcr_request, None
async def _record_responses(cassette, vcr_request, real_response, aread):
for past_real_response in real_response.history:
past_vcr_request = _make_vcr_request(past_real_response.request)
cassette.append(past_vcr_request, await _to_serialized_response(past_real_response, aread))
if real_response.history:
# If there was a redirection keep we want the request which will hold the
# final redirect value
vcr_request = _make_vcr_request(real_response.request)
cassette.append(vcr_request, await _to_serialized_response(real_response, aread))
return real_response
def _play_responses(cassette, request, vcr_request, client, kwargs):
vcr_response = cassette.play_response(vcr_request)
response = _from_serialized_response(request, vcr_response)
return response
async def _async_vcr_send(cassette, real_send, *args, **kwargs):
vcr_request, response = _shared_vcr_send(cassette, real_send, *args, **kwargs)
if response:
# add cookies from response to session cookie store
args[0].cookies.extract_cookies(response)
return response
real_response = await real_send(*args, **kwargs)
await _record_responses(cassette, vcr_request, real_response, aread=True)
return real_response
def async_vcr_send(cassette, real_send):
@functools.wraps(real_send)
def _inner_send(*args, **kwargs):
return _async_vcr_send(cassette, real_send, *args, **kwargs)
return _inner_send
def _run_async_function(sync_func, *args, **kwargs):
"""
Safely run an asynchronous function from a synchronous context.
Handles both cases:
- An event loop is already running.
- No event loop exists yet.
"""
try:
asyncio.get_running_loop()
except RuntimeError:
return asyncio.run(sync_func(*args, **kwargs))
else:
# If inside a running loop, create a task and wait for it
return asyncio.ensure_future(sync_func(*args, **kwargs))
def _sync_vcr_send(cassette, real_send, *args, **kwargs):
vcr_request, response = _shared_vcr_send(cassette, real_send, *args, **kwargs)
if response:
# add cookies from response to session cookie store
args[0].cookies.extract_cookies(response)
return response
real_response = real_send(*args, **kwargs)
_run_async_function(_record_responses, cassette, vcr_request, real_response, aread=False)
return real_response
def sync_vcr_send(cassette, real_send):
@functools.wraps(real_send)
def _inner_send(*args, **kwargs):
return _sync_vcr_send(cassette, real_send, *args, **kwargs)
return _inner_send

View File

@@ -74,7 +74,7 @@ def vcr_fetch_impl(cassette, real_fetch_impl):
return callback(response)
def new_callback(response):
headers = [(k, response.headers.get_list(k)) for k in response.headers.keys()]
headers = [(k, response.headers.get_list(k)) for k in response.headers]
vcr_response = {
"status": {"code": response.code, "message": response.reason},