1
0
mirror of https://github.com/kevin1024/vcrpy.git synced 2025-12-09 01:03:24 +00:00

Compare commits

..

33 Commits

Author SHA1 Message Date
Kevin McCarthy
d6bded1820 bump version to v5.1.0 2023-07-30 17:11:15 -10:00
Sebastian Pipping
e7c00a4bf9 Merge pull request #739 from kevin1024/issue-734-fix-body-matcher-for-chunked-requests
Fix body matcher for chunked requests (fixes #734)
2023-07-23 23:22:34 +02:00
dependabot[bot]
92dd4d00f7 build(deps): update sphinx requirement from <7 to <8
Updates the requirements on [sphinx](https://github.com/sphinx-doc/sphinx) to permit the latest version.
- [Release notes](https://github.com/sphinx-doc/sphinx/releases)
- [Changelog](https://github.com/sphinx-doc/sphinx/blob/master/CHANGES)
- [Commits](https://github.com/sphinx-doc/sphinx/compare/v0.1.61611...v7.0.1)

---
updated-dependencies:
- dependency-name: sphinx
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>
2023-07-23 18:19:26 -03:00
Jair Henrique
cf3ffcad61 Create action to validate docs 2023-07-23 18:09:49 -03:00
Jair Henrique
3ad462e766 Drop dependabot duplicity check 2023-07-23 18:09:49 -03:00
Jair Henrique
cdab3fcb30 Drop iscoroutinefunction fallback function for unsupported python 2023-07-23 12:30:58 -03:00
Jair Henrique
d3a5f4dd6c Add editorconfig file 2023-07-23 12:03:42 -03:00
Jair Henrique
75c8607fd2 Fix read the docs build 2023-07-20 13:17:15 -03:00
Jair Henrique
8c075c7fb3 Configure read the docs V2 2023-07-20 13:10:07 -03:00
Sebastian Pipping
a045a46bb4 Merge pull request #740 from kevin1024/issue-512-fix-query-param-filter-for-aiohttp
Fix query param filter for aiohttp (fixes #517)
2023-07-19 15:37:54 +02:00
Sebastian Pipping
1d979b078d Merge pull request #743 from charettes/remove-six
Remove unnecessary dependency on six.
2023-07-18 23:11:31 +02:00
Simon Charette
f7d76bd40a Remove unnecessary dependency on six.
Remove the last remaining usag of it in VCR.testcase.
2023-07-16 22:22:34 -04:00
Sebastian Pipping
7e11cfc9e4 Merge pull request #738 from kevin1024/json-loads-py36-plus
Make `json.loads` of Python >=3.6 decode bytes by itself
2023-07-11 17:30:39 +02:00
Sebastian Pipping
c95b7264a2 Merge pull request #735 from quasimik/patch-1
Fix typo in docs
2023-07-10 18:52:30 +02:00
Sebastian Pipping
8ab8e63e04 test_filter.py: Make test_filter_querystring meaner 2023-07-10 16:34:09 +02:00
Sebastian Pipping
d2c1da9ab7 test_aiohttp.py: Cover filter_query_parameters with aiohttp 2023-07-10 16:34:08 +02:00
Sebastian Pipping
8336d66976 aiohttp_stubs.py: Stop leaking unfiltered URL into cassette responses 2023-07-10 16:11:26 +02:00
Sebastian Pipping
e69b10c2e0 test_matchers.py: Briefly cover chunked transfer encoding 2023-07-08 02:25:51 +02:00
Sebastian Pipping
a6b9a070a5 matchers.py: Decode chunked request bodies 2023-07-08 02:25:51 +02:00
Sebastian Pipping
e35205c5c8 matchers.py: Support transforming the request body multiple times 2023-07-08 01:29:19 +02:00
Sebastian Pipping
05f61ea56c Make json.loads of Python >=3.6 decode bytes by itself
Quoting https://docs.python.org/3/library/json.html#json.loads :
> Changed in version 3.6: s can now be of type bytes or bytearray.
> The input encoding should be UTF-8, UTF-16 or UTF-32.
2023-07-07 20:00:57 +02:00
Michael Liu
943cabb14f docs/advanced.rst: fix typo 2023-07-04 16:45:49 +08:00
Jair Henrique
4f70152e7c Enable rule B (flake8-bugbear) on ruff 2023-06-27 17:36:26 -03:00
Jair Henrique
016a394f2c Enable E, W and F linters for ruff 2023-06-26 20:46:09 -03:00
Jair Henrique
6b2fc182c3 Improve string format 2023-06-26 20:46:09 -03:00
Jair Henrique
a77173c002 Use ruff as linter 2023-06-26 20:46:09 -03:00
Kevin McCarthy
34d5384318 bump version to v5.0.0 2023-06-26 12:54:39 -05:00
Sebastian Pipping
ad1010d0f8 Merge pull request #695 from kevin1024/drop37
Drop support for Python 3.7 (after 2023-06-27)
2023-06-26 18:32:42 +02:00
Amos Ng
d99593bcd3 Split persister errors into CassetteNotFoundError and CassetteDecodeError (#681) 2023-06-26 18:27:35 +02:00
Sebastian Pipping
8c03c37df4 Merge pull request #725 from kevin1024/make-assert-is-json-less-misleading
assertions.py: Fix mis-leading `assert_is_json`
2023-06-26 17:43:28 +02:00
Jair Henrique
b827cbe2da Drop support to python 3.7 2023-06-26 11:46:20 -03:00
Kevin McCarthy
92ca5a102c fix misspelled word 2023-06-26 09:22:16 -05:00
Sebastian Pipping
f21c8f0224 assertions.py: Fix mis-leading assert_is_json
Parameter name "a_string" was mistaken and function
name "assert_is_json" was less clear than ideal,
given that it explicitly needs bytes unlike json.loads .
2023-06-24 15:59:23 +02:00
62 changed files with 593 additions and 349 deletions

14
.editorconfig Normal file
View File

@@ -0,0 +1,14 @@
root = true
[*]
indent_style = space
indent_size = 4
charset = utf-8
trim_trailing_whitespace = true
insert_final_newline = true
[Makefile]
indent_style = tab
[*.{yml,yaml}]
indent_size = 2

23
.github/workflows/docs.yml vendored Normal file
View File

@@ -0,0 +1,23 @@
name: Validate docs
on:
push:
paths:
- 'docs/**'
jobs:
validate:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v4
with:
python-version: "3.11"
- name: Install build dependencies
run: pip install -r docs/requirements.txt
- name: Rendering HTML documentation
run: sphinx-build -b html docs/ html
- name: Inspect html rendered
run: cat html/index.html

View File

@@ -13,7 +13,7 @@ jobs:
strategy:
fail-fast: false
matrix:
python-version: ["3.7", "3.8", "3.9", "3.10", "3.11", "pypy-3.8"]
python-version: ["3.8", "3.9", "3.10", "3.11", "pypy-3.8"]
steps:
- uses: actions/checkout@v3.5.2

22
.readthedocs.yaml Normal file
View File

@@ -0,0 +1,22 @@
# .readthedocs.yaml
# Read the Docs configuration file
# See https://docs.readthedocs.io/en/stable/config-file/v2.html for details
# Required
version: 2
# Set the version of Python and other tools you might need
build:
os: ubuntu-22.04
tools:
python: "3.11"
# Build documentation in the docs/ directory with Sphinx
sphinx:
configuration: docs/conf.py
# We recommend specifying your dependencies to enable reproducible builds:
# https://docs.readthedocs.io/en/stable/guides/reproducible-builds.html
python:
install:
- requirements: docs/requirements.txt

View File

@@ -136,7 +136,8 @@ Create your own persistence class, see the example below:
Your custom persister must implement both ``load_cassette`` and ``save_cassette``
methods. The ``load_cassette`` method must return a deserialized cassette or raise
``ValueError`` if no cassette is found.
either ``CassetteNotFoundError`` if no cassette is found, or ``CassetteDecodeError``
if the cassette cannot be successfully deserialized.
Once the persister class is defined, register with VCR like so...
@@ -188,7 +189,7 @@ of post data parameters to filter.
.. code:: python
with my_vcr.use_cassette('test.yml', filter_post_data_parameters=['client_secret']):
with my_vcr.use_cassette('test.yml', filter_post_data_parameters=['api_key']):
requests.post('http://api.com/postdata', data={'api_key': 'secretstring'})
Advanced use of filter_headers, filter_query_parameters and filter_post_data_parameters

View File

@@ -7,11 +7,27 @@ For a full list of triaged issues, bugs and PRs and what release they are target
All help in providing PRs to close out bug issues is appreciated. Even if that is providing a repo that fully replicates issues. We have very generous contributors that have added these to bug issues which meant another contributor picked up the bug and closed it out.
- 5.1.0
- Use ruff for linting (instead of current flake8/isort/pyflakes) - thanks @jairhenrique
- Enable rule B (flake8-bugbear) on ruff - thanks @jairhenrique
- Configure read the docs V2 - thanks @jairhenrique
- Fix typo in docs - thanks @quasimik
- Make json.loads of Python >=3.6 decode bytes by itself - thanks @hartwork
- Fix body matcher for chunked requests (fixes #734) - thanks @hartwork
- Fix query param filter for aiohttp (fixes #517) - thanks @hartwork and @salomvary
- Remove unnecessary dependency on six. - thanks @charettes
- build(deps): update sphinx requirement from <7 to <8 - thanks @jairhenrique
- Add action to validate docs - thanks @jairhenrique
- Add editorconfig file - thanks @jairhenrique
- Drop iscoroutinefunction fallback function for unsupported python thanks @jairhenrique
- 5.0.0
- BREAKING CHANGE: Drop support for Python 3.7. 3.7 is EOL as of 6/27/23 Thanks @jairhenrique
- BREAKING CHANGE: Custom Cassette persisters no longer catch ValueError. If you have implemented a custom persister (has anyone implemented a custom persister? Let us know!) then you will need to throw a CassetteNotFoundError when unable to find a cassette. See #681 for discussion and reason for this change. Thanks @amosjyng for the PR and the review from @hartwork
- 4.4.0
- HUGE thanks to @hartwork for all the work done on this release!
- Bring vcr/unittest in to vcrpy as a full feature of vcr instead of a separate library. Big thanks to @hartwork for doing this and to @agriffis for originally creating the library
- Make decompression robust towards already decompressed input (thanks @hartwork)
- Bugfix: Add read1 method (fixes compatability with biopython), thanks @mghantous
- Bugfix: Add read1 method (fixes compatibility with biopython), thanks @mghantous
- Bugfix: Prevent filters from corrupting request (thanks @abramclark)
- Bugfix: Add support for `response.raw.stream()` to fix urllib v2 compat
- Bugfix: Replace `assert` with `raise AssertionError`: fixes support for `PYTHONOPTIMIZE=1`

View File

@@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
#
# vcrpy documentation build configuration file, created by
# sphinx-quickstart on Sun Sep 13 11:18:00 2015.

View File

@@ -96,11 +96,11 @@ The test suite is pretty big and slow, but you can tell tox to only run specific
tox -e {pyNN}-{HTTP_LIBRARY} -- <pytest flags passed through>
tox -e py37-requests -- -v -k "'test_status_code or test_gzip'"
tox -e py37-requests -- -v --last-failed
tox -e py38-requests -- -v -k "'test_status_code or test_gzip'"
tox -e py38-requests -- -v --last-failed
This will run only tests that look like ``test_status_code`` or
``test_gzip`` in the test suite, and only in the python 3.7 environment
``test_gzip`` in the test suite, and only in the python 3.8 environment
that has ``requests`` installed.
Also, in order for the boto tests to run, you will need an AWS key.
@@ -130,17 +130,17 @@ in this example::
pip3 install tox tox-pyenv
# Install supported versions (at time of writing), this does not activate them
pyenv install 3.7.5 3.8.0 pypy3.8
pyenv install 3.8.0 pypy3.8
# This activates them
pyenv local 3.7.5 3.8.0 pypy3.8
pyenv local 3.8.0 pypy3.8
# Run the whole test suite
tox
# Run the whole test suite or just part of it
tox -e lint
tox -e py37-requests
tox -e py38-requests
Troubleshooting on MacOSX

View File

@@ -9,7 +9,7 @@ with pip::
Compatibility
-------------
VCR.py supports Python 3.7+, and `pypy <http://pypy.org>`__.
VCR.py supports Python 3.8+, and `pypy <http://pypy.org>`__.
The following HTTP libraries are supported:
@@ -18,7 +18,7 @@ The following HTTP libraries are supported:
- ``boto3``
- ``http.client``
- ``httplib2``
- ``requests`` (both 1.x and 2.x versions)
- ``requests`` (>=2.16.2 versions)
- ``tornado.httpclient``
- ``urllib2``
- ``urllib3``

2
docs/requirements.txt Normal file
View File

@@ -0,0 +1,2 @@
sphinx<8
sphinx_rtd_theme==1.2.2

View File

@@ -1,13 +1,6 @@
[tool.black]
line-length=110
[tool.isort]
line_length = 110
known_first_party = "vcrpy"
multi_line_output = 3
use_parentheses = true
include_trailing_comma = true
[tool.codespell]
skip = '.git,*.pdf,*.svg,.tox'
ignore-regex = "\\\\[fnrstv]"
@@ -18,3 +11,23 @@ ignore-regex = "\\\\[fnrstv]"
markers = [
"online",
]
[tool.ruff]
select = [
"B", # flake8-bugbear
"C4", # flake8-comprehensions
"COM", # flake8-commas
"E", # pycodestyle error
"F", # pyflakes
"I", # isort
"ISC", # flake8-implicit-str-concat
"PIE", # flake8-pie
"RUF", # Ruff-specific rules
"UP", # pyupgrade
"W", # pycodestyle warning
]
line-length = 110
target-version = "py38"
[tool.ruff.isort]
known-first-party = [ "vcr" ]

View File

@@ -8,7 +8,7 @@ import sys
from setuptools import find_packages, setup
from setuptools.command.test import test as TestCommand
long_description = open("README.rst", "r").read()
long_description = open("README.rst").read()
here = os.path.abspath(os.path.dirname(__file__))
@@ -45,7 +45,6 @@ class PyTest(TestCommand):
install_requires = [
"PyYAML",
"wrapt",
"six>=1.5",
"yarl",
# Support for urllib3 >=2 needs Python >=3.10
# so we need to block urllib3 >=2 for Python <3.10 for now.
@@ -64,7 +63,7 @@ tests_require = [
"pytest",
"pytest-aiohttp",
"pytest-httpbin",
"requests",
"requests>=2.16.2",
"tornado",
# Needed to un-break httpbin 0.7.0. For httpbin >=0.7.1 and after,
# this pin and the dependency itself can be removed, provided
@@ -85,7 +84,7 @@ setup(
author_email="me@kevinmccarthy.org",
url="https://github.com/kevin1024/vcrpy",
packages=find_packages(exclude=["tests*"]),
python_requires=">=3.7",
python_requires=">=3.8",
install_requires=install_requires,
license="MIT",
tests_require=tests_require,
@@ -95,7 +94,6 @@ setup(
"Intended Audience :: Developers",
"Programming Language :: Python",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.7",
"Programming Language :: Python :: 3.8",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",

View File

@@ -11,9 +11,12 @@ def assert_cassette_has_one_response(cass):
assert cass.play_count == 1
def assert_is_json(a_string):
def assert_is_json_bytes(b: bytes):
assert isinstance(b, bytes)
try:
json.loads(a_string.decode("utf-8"))
except Exception:
assert False
json.loads(b)
except Exception as error:
raise AssertionError() from error
assert True

View File

@@ -4,10 +4,11 @@ import urllib.parse
import pytest
import vcr
asyncio = pytest.importorskip("asyncio")
aiohttp = pytest.importorskip("aiohttp")
import vcr # noqa: E402
from .aiohttp_utils import aiohttp_app, aiohttp_request # noqa: E402
@@ -59,7 +60,7 @@ def test_headers(tmpdir, auth, mockbin_request_url):
request = cassette.requests[0]
assert "AUTHORIZATION" in request.headers
cassette_response, _ = get(url, auth=auth)
assert dict(cassette_response.headers) == dict(response.headers)
assert cassette_response.headers.items() == response.headers.items()
assert cassette.play_count == 1
assert "istr" not in cassette.data[0]
assert "yarl.URL" not in cassette.data[0]
@@ -151,7 +152,7 @@ def test_post(tmpdir, body, caplog, mockbin_request_url):
(
log
for log in caplog.records
if log.getMessage() == "<Request (POST) {}> not in cassette, sending to real server".format(url)
if log.getMessage() == f"<Request (POST) {url}> not in cassette, sending to real server"
),
None,
), "Log message not found."
@@ -278,9 +279,7 @@ def test_redirect(tmpdir, mockbin):
# looking request_info.
assert cassette_response.request_info.url == response.request_info.url
assert cassette_response.request_info.method == response.request_info.method
assert {k: v for k, v in cassette_response.request_info.headers.items()} == {
k: v for k, v in response.request_info.headers.items()
}
assert cassette_response.request_info.headers.items() == response.request_info.headers.items()
assert cassette_response.request_info.real_url == response.request_info.real_url
@@ -351,7 +350,10 @@ def test_cookies(httpbin_both, httpbin_ssl_context, tmpdir):
async with aiohttp.ClientSession(loop=loop, cookie_jar=aiohttp.CookieJar(unsafe=True)) as session:
cookies_resp = await session.get(cookies_url, ssl=httpbin_ssl_context)
home_resp = await session.get(
home_url, cookies=req_cookies, headers=req_headers, ssl=httpbin_ssl_context
home_url,
cookies=req_cookies,
headers=req_headers,
ssl=httpbin_ssl_context,
)
assert cassette.play_count == 0
assert_responses(cookies_resp, home_resp)
@@ -361,7 +363,10 @@ def test_cookies(httpbin_both, httpbin_ssl_context, tmpdir):
async with aiohttp.ClientSession(loop=loop, cookie_jar=aiohttp.CookieJar(unsafe=True)) as session:
cookies_resp = await session.get(cookies_url, ssl=httpbin_ssl_context)
home_resp = await session.get(
home_url, cookies=req_cookies, headers=req_headers, ssl=httpbin_ssl_context
home_url,
cookies=req_cookies,
headers=req_headers,
ssl=httpbin_ssl_context,
)
assert cassette.play_count == 2
assert_responses(cookies_resp, home_resp)
@@ -392,7 +397,8 @@ def test_cookies_redirect(httpbin_both, httpbin_ssl_context, tmpdir):
cookies = session.cookie_jar.filter_cookies(cookies_url)
assert cookies["Cookie_1"].value == "Val_1"
assert cassette.play_count == 0
cassette.requests[1].headers["Cookie"] == "Cookie_1=Val_1"
assert cassette.requests[1].headers["Cookie"] == "Cookie_1=Val_1"
# -------------------------- Play --------------------------- #
with vcr.use_cassette(tmp, record_mode=vcr.mode.NONE) as cassette:
@@ -402,12 +408,13 @@ def test_cookies_redirect(httpbin_both, httpbin_ssl_context, tmpdir):
cookies = session.cookie_jar.filter_cookies(cookies_url)
assert cookies["Cookie_1"].value == "Val_1"
assert cassette.play_count == 2
cassette.requests[1].headers["Cookie"] == "Cookie_1=Val_1"
assert cassette.requests[1].headers["Cookie"] == "Cookie_1=Val_1"
# Assert that it's ignoring expiration date
with vcr.use_cassette(tmp, record_mode=vcr.mode.NONE) as cassette:
cassette.responses[0]["headers"]["set-cookie"] = [
"Cookie_1=Val_1; Expires=Wed, 21 Oct 2015 07:28:00 GMT"
"Cookie_1=Val_1; Expires=Wed, 21 Oct 2015 07:28:00 GMT",
]
async with aiohttp.ClientSession(loop=loop, cookie_jar=aiohttp.CookieJar(unsafe=True)) as session:
cookies_resp = await session.get(cookies_url, ssl=httpbin_ssl_context)
@@ -433,3 +440,19 @@ def test_not_allow_redirects(tmpdir, mockbin):
assert response.url.path == "/redirect/308/5"
assert response.status == 308
assert cassette.play_count == 1
def test_filter_query_parameters(tmpdir, httpbin):
url = httpbin + "?password=secret"
path = str(tmpdir.join("query_param_filter.yaml"))
with vcr.use_cassette(path, filter_query_parameters=["password"]) as cassette:
get(url)
assert "password" not in cassette.requests[0].url
assert "secret" not in cassette.requests[0].url
with open(path) as f:
cassette_content = f.read()
assert "password" not in cassette_content
assert "secret" not in cassette_content

View File

@@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
"""Basic tests for cassettes"""
# External imports

View File

@@ -1,15 +1,15 @@
from configparser import DuplicateSectionError
import pytest
import vcr
boto = pytest.importorskip("boto")
from configparser import DuplicateSectionError # NOQA
import boto # NOQA
import boto.iam # NOQA
from boto.s3.connection import S3Connection # NOQA
from boto.s3.key import Key # NOQA
import vcr # NOQA
import boto # noqa
import boto.iam # noqa
from boto.s3.connection import S3Connection # noqa
from boto.s3.key import Key # noqa
def test_boto_stubs(tmpdir):

View File

@@ -2,15 +2,14 @@ import os
import pytest
import vcr
boto3 = pytest.importorskip("boto3")
import boto3 # NOQA
import botocore # NOQA
import vcr # NOQA
import botocore # noqa
try:
from botocore import awsrequest # NOQA
from botocore import awsrequest # noqa
botocore_awsrequest = True
except ImportError:
@@ -20,12 +19,12 @@ except ImportError:
# https://github.com/boto/botocore/pull/1495
boto3_skip_vendored_requests = pytest.mark.skipif(
botocore_awsrequest,
reason="botocore version {ver} does not use vendored requests anymore.".format(ver=botocore.__version__),
reason=f"botocore version {botocore.__version__} does not use vendored requests anymore.",
)
boto3_skip_awsrequest = pytest.mark.skipif(
not botocore_awsrequest,
reason="botocore version {ver} still uses vendored requests.".format(ver=botocore.__version__),
reason=f"botocore version {botocore.__version__} still uses vendored requests.",
)
IAM_USER_NAME = "vcrpy"

View File

@@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
"""Basic tests about save behavior"""
# External imports

View File

@@ -5,7 +5,7 @@ from urllib.parse import urlencode
from urllib.request import Request, urlopen
import pytest
from assertions import assert_cassette_has_one_response, assert_is_json
from assertions import assert_cassette_has_one_response, assert_is_json_bytes
import vcr
@@ -45,13 +45,18 @@ def test_filter_basic_auth(tmpdir, httpbin):
def test_filter_querystring(tmpdir, httpbin):
url = httpbin.url + "/?foo=bar"
url = httpbin.url + "/?password=secret"
cass_file = str(tmpdir.join("filter_qs.yaml"))
with vcr.use_cassette(cass_file, filter_query_parameters=["foo"]):
with vcr.use_cassette(cass_file, filter_query_parameters=["password"]):
urlopen(url)
with vcr.use_cassette(cass_file, filter_query_parameters=["foo"]) as cass:
with vcr.use_cassette(cass_file, filter_query_parameters=["password"]) as cass:
urlopen(url)
assert "foo" not in cass.requests[0].url
assert "password" not in cass.requests[0].url
assert "secret" not in cass.requests[0].url
with open(cass_file) as f:
cassette_content = f.read()
assert "password" not in cassette_content
assert "secret" not in cassette_content
def test_filter_post_data(tmpdir, httpbin):
@@ -105,7 +110,7 @@ def test_decompress_gzip(tmpdir, httpbin):
with vcr.use_cassette(cass_file) as cass:
decoded_response = urlopen(url).read()
assert_cassette_has_one_response(cass)
assert_is_json(decoded_response)
assert_is_json_bytes(decoded_response)
def test_decomptess_empty_body(tmpdir, httpbin):
@@ -129,7 +134,7 @@ def test_decompress_deflate(tmpdir, httpbin):
with vcr.use_cassette(cass_file) as cass:
decoded_response = urlopen(url).read()
assert_cassette_has_one_response(cass)
assert_is_json(decoded_response)
assert_is_json_bytes(decoded_response)
def test_decompress_regular(tmpdir, httpbin):
@@ -141,7 +146,7 @@ def test_decompress_regular(tmpdir, httpbin):
with vcr.use_cassette(cass_file) as cass:
resp = urlopen(url).read()
assert_cassette_has_one_response(cass)
assert_is_json(resp)
assert_is_json_bytes(resp)
def test_before_record_request_corruption(tmpdir, httpbin):

View File

@@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
"""Integration tests with httplib2"""
from urllib.parse import urlencode

View File

@@ -2,10 +2,11 @@ import os
import pytest
import vcr
asyncio = pytest.importorskip("asyncio")
httpx = pytest.importorskip("httpx")
import vcr # noqa: E402
from vcr.stubs.httpx_stubs import HTTPX_REDIRECT_PARAM # noqa: E402
@@ -65,8 +66,8 @@ class DoAsyncRequest(BaseDoRequest):
def client(self):
try:
return self._client
except AttributeError:
raise ValueError('To access async client, use "with do_request() as client"')
except AttributeError as e:
raise ValueError('To access async client, use "with do_request() as client"') from e
def __call__(self, *args, **kwargs):
if hasattr(self, "_loop"):
@@ -185,9 +186,7 @@ def test_redirect(mockbin, yml, do_request):
# looking request_info.
assert cassette_response.request.url == response.request.url
assert cassette_response.request.method == response.request.method
assert {k: v for k, v in cassette_response.request.headers.items()} == {
k: v for k, v in response.request.headers.items()
}
assert cassette_response.request.headers.items() == response.request.headers.items()
@pytest.mark.online
@@ -242,10 +241,10 @@ def test_behind_proxy(do_request):
@pytest.mark.online
def test_cookies(tmpdir, mockbin, do_request):
def client_cookies(client):
return [c for c in client.client.cookies]
return list(client.client.cookies)
def response_cookies(response):
return [c for c in response.cookies]
return list(response.cookies)
url = mockbin + "/bin/26148652-fe25-4f21-aaf5-689b5b4bf65f"
headers = {"cookie": "k1=v1;k2=v2"}

View File

@@ -28,9 +28,9 @@ def test_ignore_localhost(tmpdir, httpbin):
with overridden_dns({"httpbin.org": "127.0.0.1"}):
cass_file = str(tmpdir.join("filter_qs.yaml"))
with vcr.use_cassette(cass_file, ignore_localhost=True) as cass:
urlopen("http://localhost:{}/".format(httpbin.port))
urlopen(f"http://localhost:{httpbin.port}/")
assert len(cass) == 0
urlopen("http://httpbin.org:{}/".format(httpbin.port))
urlopen(f"http://httpbin.org:{httpbin.port}/")
assert len(cass) == 1
@@ -38,9 +38,9 @@ def test_ignore_httpbin(tmpdir, httpbin):
with overridden_dns({"httpbin.org": "127.0.0.1"}):
cass_file = str(tmpdir.join("filter_qs.yaml"))
with vcr.use_cassette(cass_file, ignore_hosts=["httpbin.org"]) as cass:
urlopen("http://httpbin.org:{}/".format(httpbin.port))
urlopen(f"http://httpbin.org:{httpbin.port}/")
assert len(cass) == 0
urlopen("http://localhost:{}/".format(httpbin.port))
urlopen(f"http://localhost:{httpbin.port}/")
assert len(cass) == 1
@@ -48,8 +48,8 @@ def test_ignore_localhost_and_httpbin(tmpdir, httpbin):
with overridden_dns({"httpbin.org": "127.0.0.1"}):
cass_file = str(tmpdir.join("filter_qs.yaml"))
with vcr.use_cassette(cass_file, ignore_hosts=["httpbin.org"], ignore_localhost=True) as cass:
urlopen("http://httpbin.org:{}".format(httpbin.port))
urlopen("http://localhost:{}".format(httpbin.port))
urlopen(f"http://httpbin.org:{httpbin.port}")
urlopen(f"http://localhost:{httpbin.port}")
assert len(cass) == 0
@@ -57,12 +57,12 @@ def test_ignore_localhost_twice(tmpdir, httpbin):
with overridden_dns({"httpbin.org": "127.0.0.1"}):
cass_file = str(tmpdir.join("filter_qs.yaml"))
with vcr.use_cassette(cass_file, ignore_localhost=True) as cass:
urlopen("http://localhost:{}".format(httpbin.port))
urlopen(f"http://localhost:{httpbin.port}")
assert len(cass) == 0
urlopen("http://httpbin.org:{}".format(httpbin.port))
urlopen(f"http://httpbin.org:{httpbin.port}")
assert len(cass) == 1
with vcr.use_cassette(cass_file, ignore_localhost=True) as cass:
assert len(cass) == 1
urlopen("http://localhost:{}".format(httpbin.port))
urlopen("http://httpbin.org:{}".format(httpbin.port))
urlopen(f"http://localhost:{httpbin.port}")
urlopen(f"http://httpbin.org:{httpbin.port}")
assert len(cass) == 1

View File

@@ -72,7 +72,12 @@ def test_method_matcher(cassette, httpbin, httpbin_secure):
@pytest.mark.parametrize(
"uri", [DEFAULT_URI, "http://httpbin.org/get?p2=q2&p1=q1", "http://httpbin.org/get?p2=q2&p1=q1"]
"uri",
(
DEFAULT_URI,
"http://httpbin.org/get?p2=q2&p1=q1",
"http://httpbin.org/get?p2=q2&p1=q1",
),
)
def test_default_matcher_matches(cassette, uri, httpbin, httpbin_secure):
uri = _replace_httpbin(uri, httpbin, httpbin_secure)

View File

@@ -3,6 +3,7 @@ from urllib.request import urlopen
import pytest
import vcr
from vcr.errors import CannotOverwriteExistingCassetteException
def test_making_extra_request_raises_exception(tmpdir, httpbin):
@@ -18,5 +19,5 @@ def test_making_extra_request_raises_exception(tmpdir, httpbin):
with vcr.use_cassette(str(tmpdir.join("test.json")), match_on=["method"]):
assert urlopen(httpbin.url + "/status/200").getcode() == 200
assert urlopen(httpbin.url + "/status/201").getcode() == 201
with pytest.raises(Exception):
with pytest.raises(CannotOverwriteExistingCassetteException):
urlopen(httpbin.url + "/status/200")

View File

@@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
"""Test using a proxy."""
import http.server

View File

@@ -3,6 +3,7 @@ from urllib.request import urlopen
import pytest
import vcr
from vcr.errors import CannotOverwriteExistingCassetteException
def test_once_record_mode(tmpdir, httpbin):
@@ -18,7 +19,7 @@ def test_once_record_mode(tmpdir, httpbin):
# the first time, it's played from the cassette.
# but, try to access something else from the same cassette, and an
# exception is raised.
with pytest.raises(Exception):
with pytest.raises(CannotOverwriteExistingCassetteException):
urlopen(httpbin.url + "/get").read()
@@ -94,7 +95,7 @@ def test_new_episodes_record_mode_two_times(tmpdir, httpbin):
assert urlopen(url).read() == original_second_response
# now that we are back in once mode, this should raise
# an error.
with pytest.raises(Exception):
with pytest.raises(CannotOverwriteExistingCassetteException):
urlopen(url).read()
@@ -124,7 +125,7 @@ def test_none_record_mode(tmpdir, httpbin):
# raise hell.
testfile = str(tmpdir.join("recordmode.yml"))
with vcr.use_cassette(testfile, record_mode=vcr.mode.NONE):
with pytest.raises(Exception):
with pytest.raises(CannotOverwriteExistingCassetteException):
urlopen(httpbin.url).read()
@@ -140,5 +141,5 @@ def test_none_record_mode_with_existing_cassette(tmpdir, httpbin):
urlopen(httpbin.url).read()
assert cass.play_count == 1
# but if I try to hit the net, raise an exception.
with pytest.raises(Exception):
with pytest.raises(CannotOverwriteExistingCassetteException):
urlopen(httpbin.url + "/get").read()

View File

@@ -1,13 +1,14 @@
# -*- coding: utf-8 -*-
"""Tests for cassettes with custom persistence"""
# External imports
import os
from urllib.request import urlopen
import pytest
# Internal imports
import vcr
from vcr.persisters.filesystem import FilesystemPersister
from vcr.persisters.filesystem import CassetteDecodeError, CassetteNotFoundError, FilesystemPersister
class CustomFilesystemPersister:
@@ -25,6 +26,19 @@ class CustomFilesystemPersister:
FilesystemPersister.save_cassette(cassette_path, cassette_dict, serializer)
class BadPersister(FilesystemPersister):
"""A bad persister that raises different errors."""
@staticmethod
def load_cassette(cassette_path, serializer):
if "nonexistent" in cassette_path:
raise CassetteNotFoundError()
elif "encoding" in cassette_path:
raise CassetteDecodeError()
else:
raise ValueError("buggy persister")
def test_save_cassette_with_custom_persister(tmpdir, httpbin):
"""Ensure you can save a cassette using custom persister"""
my_vcr = vcr.VCR()
@@ -53,3 +67,22 @@ def test_load_cassette_with_custom_persister(tmpdir, httpbin):
with my_vcr.use_cassette(test_fixture, serializer="json"):
response = urlopen(httpbin.url).read()
assert b"difficult sometimes" in response
def test_load_cassette_persister_exception_handling(tmpdir, httpbin):
"""
Ensure expected errors from persister are swallowed while unexpected ones
are passed up the call stack.
"""
my_vcr = vcr.VCR()
my_vcr.register_persister(BadPersister)
with my_vcr.use_cassette("bad/nonexistent") as cass:
assert len(cass) == 0
with my_vcr.use_cassette("bad/encoding") as cass:
assert len(cass) == 0
with pytest.raises(ValueError):
with my_vcr.use_cassette("bad/buggy") as cass:
pass

View File

@@ -1,11 +1,10 @@
"""Test requests' interaction with vcr"""
import pytest
from assertions import assert_cassette_empty, assert_is_json
from assertions import assert_cassette_empty, assert_is_json_bytes
import vcr
requests = pytest.importorskip("requests")
from requests.exceptions import ConnectionError # noqa E402
def test_status_code(httpbin_both, tmpdir):
@@ -114,22 +113,6 @@ def test_post_chunked_binary(tmpdir, httpbin):
assert req1 == req2
@pytest.mark.skipif("sys.version_info >= (3, 6)", strict=True, raises=ConnectionError)
def test_post_chunked_binary_secure(tmpdir, httpbin_secure):
"""Ensure that we can send chunked binary without breaking while trying to concatenate bytes with str."""
data1 = iter([b"data", b"to", b"send"])
data2 = iter([b"data", b"to", b"send"])
url = httpbin_secure.url + "/post"
with vcr.use_cassette(str(tmpdir.join("requests.yaml"))):
req1 = requests.post(url, data1).content
print(req1)
with vcr.use_cassette(str(tmpdir.join("requests.yaml"))):
req2 = requests.post(url, data2).content
assert req1 == req2
def test_redirects(tmpdir, httpbin_both):
"""Ensure that we can handle redirects"""
url = httpbin_both + "/redirect-to?url=bytes/1024"
@@ -176,7 +159,7 @@ def test_gzip__decode_compressed_response_false(tmpdir, httpbin_both):
with vcr.use_cassette(str(tmpdir.join("gzip.yaml"))):
response = requests.get(httpbin_both + "/gzip")
assert response.headers["content-encoding"] == "gzip" # i.e. not removed
assert_is_json(response.content) # i.e. uncompressed bytes
assert_is_json_bytes(response.content) # i.e. uncompressed bytes
def test_gzip__decode_compressed_response_true(tmpdir, httpbin_both):
@@ -187,7 +170,8 @@ def test_gzip__decode_compressed_response_true(tmpdir, httpbin_both):
assert expected_response.headers["content-encoding"] == "gzip" # self-test
with vcr.use_cassette(
str(tmpdir.join("decode_compressed.yaml")), decode_compressed_response=True
str(tmpdir.join("decode_compressed.yaml")),
decode_compressed_response=True,
) as cassette:
r = requests.get(url)
assert r.headers["content-encoding"] == "gzip" # i.e. not removed

View File

@@ -2,7 +2,7 @@ import http.client as httplib
import json
import zlib
from assertions import assert_is_json
from assertions import assert_is_json_bytes
import vcr
@@ -84,7 +84,7 @@ def test_original_decoded_response_is_not_modified(tmpdir, httpbin):
inside = conn.getresponse()
assert "content-encoding" not in inside.headers
assert_is_json(inside.read())
assert_is_json_bytes(inside.read())
def _make_before_record_response(fields, replacement="[REDACTED]"):
@@ -120,8 +120,8 @@ def test_original_response_is_not_modified_by_before_filter(tmpdir, httpbin):
# The scrubbed field should be the same, because no cassette existed.
# Furthermore, the responses should be identical.
inside_body = json.loads(inside.read().decode("utf-8"))
outside_body = json.loads(outside.read().decode("utf-8"))
inside_body = json.loads(inside.read())
outside_body = json.loads(outside.read())
assert not inside_body[field_to_scrub] == replacement
assert inside_body[field_to_scrub] == outside_body[field_to_scrub]
@@ -131,5 +131,5 @@ def test_original_response_is_not_modified_by_before_filter(tmpdir, httpbin):
conn.request("GET", "/get")
inside = conn.getresponse()
inside_body = json.loads(inside.read().decode("utf-8"))
inside_body = json.loads(inside.read())
assert inside_body[field_to_scrub] == replacement

View File

@@ -1,10 +1,9 @@
# -*- coding: utf-8 -*-
"""Test requests' interaction with vcr"""
import json
import pytest
from assertions import assert_cassette_empty, assert_is_json
from assertions import assert_cassette_empty, assert_is_json_bytes
import vcr
from vcr.errors import CannotOverwriteExistingCassetteException
@@ -195,11 +194,11 @@ def test_gzip(get_client, tmpdir, scheme):
with vcr.use_cassette(str(tmpdir.join("gzip.yaml"))):
response = yield get(get_client(), url, **kwargs)
assert_is_json(response.body)
assert_is_json_bytes(response.body)
with vcr.use_cassette(str(tmpdir.join("gzip.yaml"))) as cass:
response = yield get(get_client(), url, **kwargs)
assert_is_json(response.body)
assert_is_json_bytes(response.body)
assert 1 == cass.play_count
@@ -221,7 +220,7 @@ def test_unsupported_features_raises_in_future(get_client, tmpdir):
supported is raised inside the future."""
def callback(chunk):
assert False, "Did not expect to be called."
raise AssertionError("Did not expect to be called.")
with vcr.use_cassette(str(tmpdir.join("invalid.yaml"))):
future = get(get_client(), "http://httpbin.org", streaming_callback=callback)
@@ -239,11 +238,14 @@ def test_unsupported_features_raise_error_disabled(get_client, tmpdir):
supported is not raised if raise_error=False."""
def callback(chunk):
assert False, "Did not expect to be called."
raise AssertionError("Did not expect to be called.")
with vcr.use_cassette(str(tmpdir.join("invalid.yaml"))):
response = yield get(
get_client(), "http://httpbin.org", streaming_callback=callback, raise_error=False
get_client(),
"http://httpbin.org",
streaming_callback=callback,
raise_error=False,
)
assert "not yet supported by VCR" in str(response.error)

View File

@@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
"""Integration tests with urllib2"""
import ssl

View File

@@ -4,7 +4,7 @@
import pytest
import pytest_httpbin
from assertions import assert_cassette_empty, assert_is_json
from assertions import assert_cassette_empty, assert_is_json_bytes
import vcr
from vcr.patch import force_reset
@@ -16,7 +16,8 @@ urllib3 = pytest.importorskip("urllib3")
@pytest.fixture(scope="module")
def verify_pool_mgr():
return urllib3.PoolManager(
cert_reqs="CERT_REQUIRED", ca_certs=pytest_httpbin.certs.where() # Force certificate check.
cert_reqs="CERT_REQUIRED",
ca_certs=pytest_httpbin.certs.where(), # Force certificate check.
)
@@ -136,10 +137,10 @@ def test_gzip(tmpdir, httpbin_both, verify_pool_mgr):
with vcr.use_cassette(str(tmpdir.join("gzip.yaml"))):
response = verify_pool_mgr.request("GET", url)
assert_is_json(response.data)
assert_is_json_bytes(response.data)
with vcr.use_cassette(str(tmpdir.join("gzip.yaml"))):
assert_is_json(response.data)
assert_is_json_bytes(response.data)
def test_https_with_cert_validation_disabled(tmpdir, httpbin_secure, pool_mgr):

View File

@@ -5,9 +5,9 @@ from xmlrpc.server import SimpleXMLRPCServer
import pytest
requests = pytest.importorskip("requests")
import vcr
import vcr # NOQA
requests = pytest.importorskip("requests")
def test_domain_redirect():

View File

@@ -20,15 +20,31 @@ def test_cassette_load(tmpdir):
yaml.dump(
{
"interactions": [
{"request": {"body": "", "uri": "foo", "method": "GET", "headers": {}}, "response": "bar"}
]
}
)
{
"request": {"body": "", "uri": "foo", "method": "GET", "headers": {}},
"response": "bar",
},
],
},
),
)
a_cassette = Cassette.load(path=str(a_file))
assert len(a_cassette) == 1
def test_cassette_load_nonexistent():
a_cassette = Cassette.load(path="something/nonexistent.yml")
assert len(a_cassette) == 0
def test_cassette_load_invalid_encoding(tmpdir):
a_file = tmpdir.join("invalid_encoding.yml")
with open(a_file, "wb") as fd:
fd.write(b"\xda")
a_cassette = Cassette.load(path=str(a_file))
assert len(a_cassette) == 0
def test_cassette_not_played():
a = Cassette("test")
assert not a.play_count
@@ -97,7 +113,7 @@ def make_get_request():
@mock.patch("vcr.stubs.VCRHTTPResponse")
def test_function_decorated_with_use_cassette_can_be_invoked_multiple_times(*args):
decorated_function = Cassette.use(path="test")(make_get_request)
for i in range(4):
for _ in range(4):
decorated_function()
@@ -143,7 +159,7 @@ def test_cassette_allow_playback_repeats():
a = Cassette("test", allow_playback_repeats=True)
a.append("foo", "bar")
a.append("other", "resp")
for x in range(10):
for _ in range(10):
assert a.play_response("foo") == "bar"
assert a.play_count == 10
assert a.all_played is False
@@ -205,7 +221,7 @@ def test_nesting_cassette_context_managers(*args):
with contextlib.ExitStack() as exit_stack:
first_cassette = exit_stack.enter_context(Cassette.use(path="test"))
exit_stack.enter_context(
mock.patch.object(first_cassette, "play_response", return_value=first_response)
mock.patch.object(first_cassette, "play_response", return_value=first_response),
)
assert_get_response_body_is("first_response")

View File

@@ -55,15 +55,18 @@ from vcr.cassette import Cassette
],
)
def test_CannotOverwriteExistingCassetteException_get_message(
mock_find_requests_with_most_matches, most_matches, expected_message
mock_find_requests_with_most_matches,
most_matches,
expected_message,
):
mock_find_requests_with_most_matches.return_value = most_matches
cassette = Cassette("path")
failed_request = "request"
exception_message = errors.CannotOverwriteExistingCassetteException._get_message(cassette, "request")
expected = (
"Can't overwrite existing cassette (%r) in your current record mode (%r).\n"
"No match for the request (%r) was found.\n"
"%s" % (cassette._path, cassette.record_mode, failed_request, expected_message)
f"Can't overwrite existing cassette ({cassette._path!r}) "
f"in your current record mode ({cassette.record_mode!r}).\n"
f"No match for the request ({failed_request!r}) was found.\n"
f"{expected_message}"
)
assert exception_message == expected

View File

@@ -197,7 +197,7 @@ def test_replace_json_post_data_parameters():
("six", "doesntexist"),
],
)
request_data = json.loads(request.body.decode("utf-8"))
request_data = json.loads(request.body)
expected_data = json.loads('{"one": "keep", "three": "tada", "four": "SHOUT"}')
assert request_data == expected_data
@@ -208,8 +208,8 @@ def test_remove_json_post_data_parameters():
request = Request("POST", "http://google.com", body, {})
request.headers["Content-Type"] = "application/json"
remove_post_data_parameters(request, ["id"])
request_body_json = json.loads(request.body.decode("utf-8"))
expected_json = json.loads(b'{"foo": "bar", "baz": "qux"}'.decode("utf-8"))
request_body_json = json.loads(request.body)
expected_json = json.loads(b'{"foo": "bar", "baz": "qux"}')
assert request_body_json == expected_json

View File

@@ -63,6 +63,9 @@ boto3_bytes_headers = {
"Expect": b"100-continue",
"Content-Length": "21",
}
chunked_headers = {
"Transfer-Encoding": "chunked",
}
@pytest.mark.parametrize(
@@ -74,10 +77,16 @@ boto3_bytes_headers = {
),
(
request.Request(
"POST", "http://host.com/", "a=1&b=2", {"Content-Type": "application/x-www-form-urlencoded"}
"POST",
"http://host.com/",
"a=1&b=2",
{"Content-Type": "application/x-www-form-urlencoded"},
),
request.Request(
"POST", "http://host.com/", "b=2&a=1", {"Content-Type": "application/x-www-form-urlencoded"}
"POST",
"http://host.com/",
"b=2&a=1",
{"Content-Type": "application/x-www-form-urlencoded"},
),
),
(
@@ -86,23 +95,38 @@ boto3_bytes_headers = {
),
(
request.Request(
"POST", "http://host.com/", "a=1&b=2", {"Content-Type": "application/x-www-form-urlencoded"}
"POST",
"http://host.com/",
"a=1&b=2",
{"Content-Type": "application/x-www-form-urlencoded"},
),
request.Request(
"POST", "http://host.com/", "b=2&a=1", {"Content-Type": "application/x-www-form-urlencoded"}
"POST",
"http://host.com/",
"b=2&a=1",
{"Content-Type": "application/x-www-form-urlencoded"},
),
),
(
request.Request(
"POST", "http://host.com/", '{"a": 1, "b": 2}', {"Content-Type": "application/json"}
"POST",
"http://host.com/",
'{"a": 1, "b": 2}',
{"Content-Type": "application/json"},
),
request.Request(
"POST", "http://host.com/", '{"b": 2, "a": 1}', {"content-type": "application/json"}
"POST",
"http://host.com/",
'{"b": 2, "a": 1}',
{"content-type": "application/json"},
),
),
(
request.Request(
"POST", "http://host.com/", req1_body, {"User-Agent": "xmlrpclib", "Content-Type": "text/xml"}
"POST",
"http://host.com/",
req1_body,
{"User-Agent": "xmlrpclib", "Content-Type": "text/xml"},
),
request.Request(
"POST",
@@ -113,10 +137,16 @@ boto3_bytes_headers = {
),
(
request.Request(
"POST", "http://host.com/", '{"a": 1, "b": 2}', {"Content-Type": "application/json"}
"POST",
"http://host.com/",
'{"a": 1, "b": 2}',
{"Content-Type": "application/json"},
),
request.Request(
"POST", "http://host.com/", '{"b": 2, "a": 1}', {"content-type": "application/json"}
"POST",
"http://host.com/",
'{"b": 2, "a": 1}',
{"content-type": "application/json"},
),
),
(
@@ -124,6 +154,36 @@ boto3_bytes_headers = {
request.Request("POST", "http://aws.custom.com/", b"123", boto3_bytes_headers),
request.Request("POST", "http://aws.custom.com/", b"123", boto3_bytes_headers),
),
(
# chunked transfer encoding: decoded bytes versus encoded bytes
request.Request("POST", "scheme1://host1.test/", b"123456789_123456", chunked_headers),
request.Request(
"GET",
"scheme2://host2.test/",
b"10\r\n123456789_123456\r\n0\r\n\r\n",
chunked_headers,
),
),
(
# chunked transfer encoding: bytes iterator versus string iterator
request.Request(
"POST",
"scheme1://host1.test/",
iter([b"123456789_", b"123456"]),
chunked_headers,
),
request.Request("GET", "scheme2://host2.test/", iter(["123456789_", "123456"]), chunked_headers),
),
(
# chunked transfer encoding: bytes iterator versus single byte iterator
request.Request(
"POST",
"scheme1://host1.test/",
iter([b"123456789_", b"123456"]),
chunked_headers,
),
request.Request("GET", "scheme2://host2.test/", iter(b"123456789_123456"), chunked_headers),
),
],
)
def test_body_matcher_does_match(r1, r2):
@@ -139,10 +199,16 @@ def test_body_matcher_does_match(r1, r2):
),
(
request.Request(
"POST", "http://host.com/", '{"a": 1, "b": 3}', {"Content-Type": "application/json"}
"POST",
"http://host.com/",
'{"a": 1, "b": 3}',
{"Content-Type": "application/json"},
),
request.Request(
"POST", "http://host.com/", '{"b": 2, "a": 1}', {"content-type": "application/json"}
"POST",
"http://host.com/",
'{"b": 2, "a": 1}',
{"content-type": "application/json"},
),
),
(

View File

@@ -17,9 +17,9 @@ def test_try_migrate_with_json(tmpdir):
cassette = tmpdir.join("cassette.json").strpath
shutil.copy("tests/fixtures/migration/old_cassette.json", cassette)
assert vcr.migration.try_migrate(cassette)
with open("tests/fixtures/migration/new_cassette.json", "r") as f:
with open("tests/fixtures/migration/new_cassette.json") as f:
expected_json = json.load(f)
with open(cassette, "r") as f:
with open(cassette) as f:
actual_json = json.load(f)
assert actual_json == expected_json
@@ -28,9 +28,9 @@ def test_try_migrate_with_yaml(tmpdir):
cassette = tmpdir.join("cassette.yaml").strpath
shutil.copy("tests/fixtures/migration/old_cassette.yaml", cassette)
assert vcr.migration.try_migrate(cassette)
with open("tests/fixtures/migration/new_cassette.yaml", "r") as f:
with open("tests/fixtures/migration/new_cassette.yaml") as f:
expected_yaml = yaml.load(f, Loader=Loader)
with open(cassette, "r") as f:
with open(cassette) as f:
actual_yaml = yaml.load(f, Loader=Loader)
assert actual_yaml == expected_yaml

View File

@@ -1,4 +1,3 @@
# coding: UTF-8
import io
from vcr.stubs import VCRHTTPResponse
@@ -89,11 +88,11 @@ def test_response_parses_correctly_and_fp_attribute_error_is_not_thrown():
b"different types of cancer cells. Recently, the first HDACi was\n "
b"approved for the "
b"treatment of cutaneous T cell lymphomas. Most HDACi currently in\n "
b"clinical "
b"clinical ",
},
}
vcr_response = VCRHTTPResponse(recorded_response)
handle = io.TextIOWrapper(vcr_response, encoding="utf-8")
handle = iter(handle)
articles = [line for line in handle]
articles = list(handle)
assert len(articles) > 1

View File

@@ -1,4 +1,3 @@
# -*- encoding: utf-8 -*-
from unittest import mock
import pytest
@@ -9,24 +8,24 @@ from vcr.serializers import compat, jsonserializer, yamlserializer
def test_deserialize_old_yaml_cassette():
with open("tests/fixtures/migration/old_cassette.yaml", "r") as f:
with open("tests/fixtures/migration/old_cassette.yaml") as f:
with pytest.raises(ValueError):
deserialize(f.read(), yamlserializer)
def test_deserialize_old_json_cassette():
with open("tests/fixtures/migration/old_cassette.json", "r") as f:
with open("tests/fixtures/migration/old_cassette.json") as f:
with pytest.raises(ValueError):
deserialize(f.read(), jsonserializer)
def test_deserialize_new_yaml_cassette():
with open("tests/fixtures/migration/new_cassette.yaml", "r") as f:
with open("tests/fixtures/migration/new_cassette.yaml") as f:
deserialize(f.read(), yamlserializer)
def test_deserialize_new_json_cassette():
with open("tests/fixtures/migration/new_cassette.json", "r") as f:
with open("tests/fixtures/migration/new_cassette.json") as f:
deserialize(f.read(), jsonserializer)

View File

@@ -100,9 +100,9 @@ def test_vcr_kwargs_cassette_dir():
pass
def _get_vcr_kwargs(self):
return dict(
record_mode="new_episodes",
)
return {
"record_mode": "new_episodes",
}
_get_cassette_library_dir = MagicMock(return_value="/testing")
@@ -117,9 +117,9 @@ def test_vcr_kwargs_cassette_dir():
pass
def _get_vcr_kwargs(self):
return dict(
cassette_library_dir="/testing",
)
return {
"cassette_library_dir": "/testing",
}
_get_cassette_library_dir = MagicMock(return_value="/ignored")

View File

@@ -15,7 +15,8 @@ def test_vcr_use_cassette():
record_mode = mock.Mock()
test_vcr = VCR(record_mode=record_mode)
with mock.patch(
"vcr.cassette.Cassette.load", return_value=mock.MagicMock(inject=False)
"vcr.cassette.Cassette.load",
return_value=mock.MagicMock(inject=False),
) as mock_cassette_load:
@test_vcr.use_cassette("test")
@@ -71,16 +72,19 @@ def test_vcr_before_record_request_params():
# Test filter_headers
request = Request(
"GET", base_path + "?foo=bar", "", {"cookie": "test", "other": "fun", "bert": "nobody"}
"GET",
base_path + "?foo=bar",
"",
{"cookie": "test", "other": "fun", "bert": "nobody"},
)
assert cassette.filter_request(request).headers == {"other": "fun", "bert": "ernie"}
# Test ignore_hosts
request = Request("GET", "http://www.test.com" + "?foo=bar", "", {"cookie": "test", "other": "fun"})
request = Request("GET", "http://www.test.com?foo=bar", "", {"cookie": "test", "other": "fun"})
assert cassette.filter_request(request) is None
# Test ignore_localhost
request = Request("GET", "http://localhost:8000" + "?foo=bar", "", {"cookie": "test", "other": "fun"})
request = Request("GET", "http://localhost:8000?foo=bar", "", {"cookie": "test", "other": "fun"})
assert cassette.filter_request(request) is None
with test_vcr.use_cassette("test", before_record_request=None) as cassette:
@@ -259,7 +263,9 @@ def test_cassette_library_dir_with_decoration_and_super_explicit_path():
def test_cassette_library_dir_with_path_transformer():
library_dir = "/library_dir"
vcr = VCR(
inject_cassette=True, cassette_library_dir=library_dir, path_transformer=lambda path: path + ".json"
inject_cassette=True,
cassette_library_dir=library_dir,
path_transformer=lambda path: path + ".json",
)
@vcr.use_cassette()
@@ -362,7 +368,7 @@ del test_dynamically_added
def test_path_class_as_cassette():
path = Path(__file__).parent.parent.joinpath(
"integration/cassettes/test_httpx_test_test_behind_proxy.yml"
"integration/cassettes/test_httpx_test_test_behind_proxy.yml",
)
with use_cassette(path):
pass

View File

@@ -8,8 +8,4 @@ def test_vcr_import_deprecation(recwarn):
import vcr # noqa: F401
if sys.version_info[0] == 2:
assert len(recwarn) == 1
assert issubclass(recwarn[0].category, DeprecationWarning)
else:
assert len(recwarn) == 0
assert len(recwarn) == 0

48
tox.ini
View File

@@ -3,7 +3,7 @@ skip_missing_interpreters=true
envlist =
cov-clean,
lint,
{py37,py38,py39,py310,py311}-{requests-urllib3-1,httplib2,urllib3-1,tornado4,boto3,aiohttp,httpx},
{py38,py39,py310,py311}-{requests-urllib3-1,httplib2,urllib3-1,tornado4,boto3,aiohttp,httpx},
{py310,py311}-{requests-urllib3-2,urllib3-2},
{pypy3}-{requests-urllib3-1,httplib2,urllib3-1,tornado4,boto3},
{py310}-httpx019,
@@ -12,7 +12,6 @@ envlist =
[gh-actions]
python =
3.7: py37
3.8: py38
3.9: py39
3.10: py310, lint
@@ -38,38 +37,13 @@ skipsdist = True
commands =
black --version
black --check --diff .
isort --version
isort . --check --diff
flake8 --version
flake8 --exclude=./docs/conf.py,./.tox/,./venv/
pyflakes ./docs/conf.py
ruff --version
ruff check .
deps =
flake8
black
isort
ruff
basepython = python3.10
[testenv:docs]
# Running sphinx from inside the "docs" directory
# ensures it will not pick up any stray files that might
# get into a virtual environment under the top-level directory
# or other artifacts under build/
changedir = docs
# The only dependency is sphinx
# If we were using extensions packaged separately,
# we would specify them here.
# A better practice is to specify a specific version of sphinx.
deps =
sphinx
sphinx_rtd_theme
# This is the sphinx command to generate HTML.
# In other circumstances, we might want to generate a PDF or an ebook
commands =
sphinx-build -W -b html -d {envtmpdir}/doctrees . {envtmpdir}/html
# We use Python 3.7. Tox sometimes tries to autodetect it based on the name of
# the testenv, but "docs" does not give useful clues so we have to be explicit.
basepython = python3.7
[testenv]
# Need to use develop install so that paths
# for aggregate code coverage combine
@@ -94,19 +68,15 @@ deps =
aiohttp: pytest-asyncio
aiohttp: pytest-aiohttp
httpx: httpx
{py37,py38,py39,py310}-{httpx}: httpx
{py37,py38,py39,py310}-{httpx}: pytest-asyncio
{py38,py39,py310}-{httpx}: httpx
{py38,py39,py310}-{httpx}: pytest-asyncio
httpx: httpx>0.19
# httpx==0.19 is the latest version that supports allow_redirects, newer versions use follow_redirects
httpx019: httpx==0.19
{py37,py38,py39,py310}-{httpx}: pytest-asyncio
{py38,py39,py310}-{httpx}: pytest-asyncio
depends =
lint,{py37,py38,py39,py310,py311,pypy3}-{requests-urllib3-1,httplib2,urllib3-1,tornado4,boto3},{py310,py311}-{requests-urllib3-2,urllib3-2},{py37,py38,py39,py310,py311}-{aiohttp},{py37,py38,py39,py310,py311}-{httpx}: cov-clean
cov-report: lint,{py37,py38,py39,py310,py311,pypy3}-{requests-urllib3-1,httplib2,urllib3-1,tornado4,boto3},{py310,py311}-{requests-urllib3-2,urllib3-2},{py37,py38,py39,py310,py311}-{aiohttp}
lint,{py38,py39,py310,py311,pypy3}-{requests-urllib3-1,httplib2,urllib3-1,tornado4,boto3},{py310,py311}-{requests-urllib3-2,urllib3-2},{py38,py39,py310,py311}-{aiohttp},{py38,py39,py310,py311}-{httpx}: cov-clean
cov-report: lint,{py38,py39,py310,py311,pypy3}-{requests-urllib3-1,httplib2,urllib3-1,tornado4,boto3},{py310,py311}-{requests-urllib3-2,urllib3-2},{py38,py39,py310,py311}-{aiohttp}
passenv =
AWS_ACCESS_KEY_ID
AWS_DEFAULT_REGION
AWS_SECRET_ACCESS_KEY
[flake8]
max_line_length = 110

View File

@@ -2,9 +2,9 @@ import logging
from logging import NullHandler
from .config import VCR
from .record_mode import RecordMode as mode # noqa import is not used in this file
from .record_mode import RecordMode as mode # noqa: F401
__version__ = "4.4.0"
__version__ = "5.1.0"
logging.getLogger(__name__).addHandler(NullHandler())

View File

@@ -1,3 +1,3 @@
async def handle_coroutine(vcr, fn): # noqa: E999
async def handle_coroutine(vcr, fn):
with vcr as cassette:
return await fn(cassette) # noqa: E999
return await fn(cassette)

View File

@@ -4,6 +4,7 @@ import copy
import inspect
import logging
import sys
from asyncio import iscoroutinefunction
import wrapt
@@ -11,19 +12,11 @@ from ._handle_coroutine import handle_coroutine
from .errors import UnhandledHTTPRequestError
from .matchers import get_matchers_results, method, requests_match, uri
from .patch import CassettePatcherBuilder
from .persisters.filesystem import FilesystemPersister
from .persisters.filesystem import CassetteDecodeError, CassetteNotFoundError, FilesystemPersister
from .record_mode import RecordMode
from .serializers import yamlserializer
from .util import partition_dict
try:
from asyncio import iscoroutinefunction
except ImportError:
def iscoroutinefunction(*args, **kwargs):
return False
log = logging.getLogger(__name__)
@@ -81,7 +74,8 @@ class CassetteContextDecorator:
# pass
assert self.__finish is None, "Cassette already open."
other_kwargs, cassette_kwargs = partition_dict(
lambda key, _: key in self._non_cassette_arguments, self._args_getter()
lambda key, _: key in self._non_cassette_arguments,
self._args_getter(),
)
if other_kwargs.get("path_transformer"):
transformer = other_kwargs["path_transformer"]
@@ -280,7 +274,7 @@ class Cassette:
return response
# The cassette doesn't contain the request asked for.
raise UnhandledHTTPRequestError(
"The cassette (%r) doesn't contain the request (%r) asked for" % (self._path, request)
f"The cassette ({self._path!r}) doesn't contain the request ({request!r}) asked for",
)
def responses_of(self, request):
@@ -295,7 +289,7 @@ class Cassette:
return responses
# The cassette doesn't contain the request asked for.
raise UnhandledHTTPRequestError(
"The cassette (%r) doesn't contain the request (%r) asked for" % (self._path, request)
f"The cassette ({self._path!r}) doesn't contain the request ({request!r}) asked for",
)
def rewind(self):
@@ -314,7 +308,7 @@ class Cassette:
"""
best_matches = []
request = self._before_record_request(request)
for index, (stored_request, response) in enumerate(self.data):
for _, (stored_request, _) in enumerate(self.data):
successes, fails = get_matchers_results(request, stored_request, self._match_on)
best_matches.append((len(successes), stored_request, successes, fails))
best_matches.sort(key=lambda t: t[0], reverse=True)
@@ -352,11 +346,11 @@ class Cassette:
self.append(request, response)
self.dirty = False
self.rewound = True
except ValueError:
except (CassetteDecodeError, CassetteNotFoundError):
pass
def __str__(self):
return "<Cassette containing {} recorded response(s)>".format(len(self))
return f"<Cassette containing {len(self)} recorded response(s)>"
def __len__(self):
"""Return the number of request,response pairs stored in here"""
@@ -364,7 +358,7 @@ class Cassette:
def __contains__(self, request):
"""Return whether or not a request has been stored"""
for index, response in self._responses(request):
for index, _ in self._responses(request):
if self.play_counts[index] == 0 or self.allow_playback_repeats:
return True
return False

View File

@@ -6,8 +6,6 @@ import types
from collections import abc as collections_abc
from pathlib import Path
import six
from . import filters, matchers
from .cassette import Cassette
from .persisters.filesystem import FilesystemPersister
@@ -88,7 +86,7 @@ class VCR:
try:
serializer = self.serializers[serializer_name]
except KeyError:
raise KeyError("Serializer {} doesn't exist or isn't registered".format(serializer_name))
raise KeyError(f"Serializer {serializer_name} doesn't exist or isn't registered") from None
return serializer
def _get_matchers(self, matcher_names):
@@ -97,7 +95,7 @@ class VCR:
for m in matcher_names:
matchers.append(self.matchers[m])
except KeyError:
raise KeyError("Matcher {} doesn't exist or isn't registered".format(m))
raise KeyError(f"Matcher {m} doesn't exist or isn't registered") from None
return matchers
def use_cassette(self, path=None, **kwargs):
@@ -162,7 +160,8 @@ class VCR:
def _build_before_record_response(self, options):
before_record_response = options.get("before_record_response", self.before_record_response)
decode_compressed_response = options.get(
"decode_compressed_response", self.decode_compressed_response
"decode_compressed_response",
self.decode_compressed_response,
)
filter_functions = []
if decode_compressed_response:
@@ -186,10 +185,12 @@ class VCR:
filter_headers = options.get("filter_headers", self.filter_headers)
filter_query_parameters = options.get("filter_query_parameters", self.filter_query_parameters)
filter_post_data_parameters = options.get(
"filter_post_data_parameters", self.filter_post_data_parameters
"filter_post_data_parameters",
self.filter_post_data_parameters,
)
before_record_request = options.get(
"before_record_request", options.get("before_record", self.before_record_request)
"before_record_request",
options.get("before_record", self.before_record_request),
)
ignore_hosts = options.get("ignore_hosts", self.ignore_hosts)
ignore_localhost = options.get("ignore_localhost", self.ignore_localhost)
@@ -199,12 +200,12 @@ class VCR:
if filter_query_parameters:
replacements = [p if isinstance(p, tuple) else (p, None) for p in filter_query_parameters]
filter_functions.append(
functools.partial(filters.replace_query_parameters, replacements=replacements)
functools.partial(filters.replace_query_parameters, replacements=replacements),
)
if filter_post_data_parameters:
replacements = [p if isinstance(p, tuple) else (p, None) for p in filter_post_data_parameters]
filter_functions.append(
functools.partial(filters.replace_post_data_parameters, replacements=replacements)
functools.partial(filters.replace_post_data_parameters, replacements=replacements),
)
hosts_to_ignore = set(ignore_hosts)
@@ -253,5 +254,5 @@ class VCR:
def test_case(self, predicate=None):
predicate = predicate or self.is_test_method
# TODO: Remove this reference to `six` in favor of the Python3 equivalent
return six.with_metaclass(auto_decorate(self.use_cassette, predicate))
metaclass = auto_decorate(self.use_cassette, predicate)
return metaclass("temporary_class", (), {})

View File

@@ -13,30 +13,29 @@ class CannotOverwriteExistingCassetteException(Exception):
best_matches = cassette.find_requests_with_most_matches(failed_request)
if best_matches:
# Build a comprehensible message to put in the exception.
best_matches_msg = "Found {} similar requests with {} different matcher(s) :\n".format(
len(best_matches), len(best_matches[0][2])
best_matches_msg = (
f"Found {len(best_matches)} similar requests "
f"with {len(best_matches[0][2])} different matcher(s) :\n"
)
for idx, best_match in enumerate(best_matches, start=1):
request, succeeded_matchers, failed_matchers_assertion_msgs = best_match
best_matches_msg += (
"\n%s - (%r).\n"
"Matchers succeeded : %s\n"
"Matchers failed :\n" % (idx, request, succeeded_matchers)
f"\n{idx} - ({request!r}).\n"
f"Matchers succeeded : {succeeded_matchers}\n"
"Matchers failed :\n"
)
for failed_matcher, assertion_msg in failed_matchers_assertion_msgs:
best_matches_msg += "%s - assertion failure :\n" "%s\n" % (failed_matcher, assertion_msg)
best_matches_msg += f"{failed_matcher} - assertion failure :\n{assertion_msg}\n"
else:
best_matches_msg = "No similar requests, that have not been played, found."
return (
"Can't overwrite existing cassette (%r) in "
"your current record mode (%r).\n"
"No match for the request (%r) was found.\n"
"%s" % (cassette._path, cassette.record_mode, failed_request, best_matches_msg)
f"Can't overwrite existing cassette ({cassette._path!r}) in "
f"your current record mode ({cassette.record_mode!r}).\n"
f"No match for the request ({failed_request!r}) was found.\n"
f"{best_matches_msg}"
)
class UnhandledHTTPRequestError(KeyError):
"""Raised when a cassette does not contain the request we want."""
pass

View File

@@ -95,7 +95,7 @@ def replace_post_data_parameters(request, replacements):
new_body[k] = rv
request.body = new_body
elif request.headers.get("Content-Type") == "application/json":
json_data = json.loads(request.body.decode("utf-8"))
json_data = json.loads(request.body)
for k, rv in replacements.items():
if k in json_data:
ov = json_data.pop(k)

View File

@@ -2,45 +2,49 @@ import json
import logging
import urllib
import xmlrpc.client
from string import hexdigits
from typing import List, Set
from .util import read_body
_HEXDIG_CODE_POINTS: Set[int] = {ord(s.encode("ascii")) for s in hexdigits}
log = logging.getLogger(__name__)
def method(r1, r2):
if r1.method != r2.method:
raise AssertionError("{} != {}".format(r1.method, r2.method))
raise AssertionError(f"{r1.method} != {r2.method}")
def uri(r1, r2):
if r1.uri != r2.uri:
raise AssertionError("{} != {}".format(r1.uri, r2.uri))
raise AssertionError(f"{r1.uri} != {r2.uri}")
def host(r1, r2):
if r1.host != r2.host:
raise AssertionError("{} != {}".format(r1.host, r2.host))
raise AssertionError(f"{r1.host} != {r2.host}")
def scheme(r1, r2):
if r1.scheme != r2.scheme:
raise AssertionError("{} != {}".format(r1.scheme, r2.scheme))
raise AssertionError(f"{r1.scheme} != {r2.scheme}")
def port(r1, r2):
if r1.port != r2.port:
raise AssertionError("{} != {}".format(r1.port, r2.port))
raise AssertionError(f"{r1.port} != {r2.port}")
def path(r1, r2):
if r1.path != r2.path:
raise AssertionError("{} != {}".format(r1.path, r2.path))
raise AssertionError(f"{r1.path} != {r2.path}")
def query(r1, r2):
if r1.query != r2.query:
raise AssertionError("{} != {}".format(r1.query, r2.query))
raise AssertionError(f"{r1.query} != {r2.query}")
def raw_body(r1, r2):
@@ -49,17 +53,23 @@ def raw_body(r1, r2):
def body(r1, r2):
transformer = _get_transformer(r1)
r2_transformer = _get_transformer(r2)
if transformer != r2_transformer:
transformer = _identity
if transformer(read_body(r1)) != transformer(read_body(r2)):
transformers = list(_get_transformers(r1))
if transformers != list(_get_transformers(r2)):
transformers = []
b1 = read_body(r1)
b2 = read_body(r2)
for transform in transformers:
b1 = transform(b1)
b2 = transform(b2)
if b1 != b2:
raise AssertionError
def headers(r1, r2):
if r1.headers != r2.headers:
raise AssertionError("{} != {}".format(r1.headers, r2.headers))
raise AssertionError(f"{r1.headers} != {r2.headers}")
def _header_checker(value, header="Content-Type"):
@@ -72,17 +82,71 @@ def _header_checker(value, header="Content-Type"):
return checker
def _dechunk(body):
if isinstance(body, str):
body = body.encode("utf-8")
elif isinstance(body, bytearray):
body = bytes(body)
elif hasattr(body, "__iter__"):
body = list(body)
if body:
if isinstance(body[0], str):
body = ("".join(body)).encode("utf-8")
elif isinstance(body[0], bytes):
body = b"".join(body)
elif isinstance(body[0], int):
body = bytes(body)
else:
raise ValueError(f"Body chunk type {type(body[0])} not supported")
else:
body = None
if not isinstance(body, bytes):
return body
# Now decode chunked data format (https://en.wikipedia.org/wiki/Chunked_transfer_encoding)
# Example input: b"45\r\n<69 bytes>\r\n0\r\n\r\n" where int(b"45", 16) == 69.
CHUNK_GAP = b"\r\n"
BODY_LEN: int = len(body)
chunks: List[bytes] = []
pos: int = 0
while True:
for i in range(pos, BODY_LEN):
if body[i] not in _HEXDIG_CODE_POINTS:
break
if i == 0 or body[i : i + len(CHUNK_GAP)] != CHUNK_GAP:
if pos == 0:
return body # i.e. assume non-chunk data
raise ValueError("Malformed chunked data")
size_bytes = int(body[pos:i], 16)
if size_bytes == 0: # i.e. well-formed ending
return b"".join(chunks)
chunk_data_first = i + len(CHUNK_GAP)
chunk_data_after_last = chunk_data_first + size_bytes
if body[chunk_data_after_last : chunk_data_after_last + len(CHUNK_GAP)] != CHUNK_GAP:
raise ValueError("Malformed chunked data")
chunk_data = body[chunk_data_first:chunk_data_after_last]
chunks.append(chunk_data)
pos = chunk_data_after_last + len(CHUNK_GAP)
def _transform_json(body):
# Request body is always a byte string, but json.loads() wants a text
# string. RFC 7159 says the default encoding is UTF-8 (although UTF-16
# and UTF-32 are also allowed: hmmmmm).
if body:
return json.loads(body.decode("utf-8"))
return json.loads(body)
_xml_header_checker = _header_checker("text/xml")
_xmlrpc_header_checker = _header_checker("xmlrpc", header="User-Agent")
_checker_transformer_pairs = (
(_header_checker("chunked", header="Transfer-Encoding"), _dechunk),
(
_header_checker("application/x-www-form-urlencoded"),
lambda body: urllib.parse.parse_qs(body.decode("ascii")),
@@ -92,22 +156,16 @@ _checker_transformer_pairs = (
)
def _identity(x):
return x
def _get_transformer(request):
def _get_transformers(request):
for checker, transformer in _checker_transformer_pairs:
if checker(request.headers):
return transformer
else:
return _identity
yield transformer
def requests_match(r1, r2, matchers):
successes, failures = get_matchers_results(r1, r2, matchers)
if failures:
log.debug("Requests {} and {} differ.\n" "Failure details:\n" "{}".format(r1, r2, failures))
log.debug(f"Requests {r1} and {r2} differ.\nFailure details:\n{failures}")
return len(failures) == 0

View File

@@ -55,7 +55,7 @@ def build_uri(**parts):
port = parts["port"]
scheme = parts["protocol"]
default_port = {"https": 443, "http": 80}[scheme]
parts["port"] = ":{}".format(port) if port != default_port else ""
parts["port"] = f":{port}" if port != default_port else ""
return "{protocol}://{host}{port}{path}".format(**parts)
@@ -92,7 +92,7 @@ def migrate_json(in_fp, out_fp):
def _list_of_tuples_to_dict(fs):
return {k: v for k, v in fs[0]}
return dict(fs[0])
def _already_migrated(data):
@@ -118,7 +118,7 @@ def migrate(file_path, migration_fn):
# because we assume that original files can be reverted
# we will try to copy the content. (os.rename not needed)
with tempfile.TemporaryFile(mode="w+") as out_fp:
with open(file_path, "r") as in_fp:
with open(file_path) as in_fp:
if not migration_fn(in_fp, out_fp):
return False
with open(file_path, "w") as in_fp:
@@ -130,7 +130,7 @@ def migrate(file_path, migration_fn):
def try_migrate(path):
if path.endswith(".json"):
return migrate(path, migrate_json)
elif path.endswith(".yaml") or path.endswith(".yml"):
elif path.endswith((".yaml", ".yml")):
return migrate(path, migrate_yml)
return False
@@ -138,7 +138,7 @@ def try_migrate(path):
def main():
if len(sys.argv) != 2:
raise SystemExit(
"Please provide path to cassettes directory or file. " "Usage: python3 -m vcr.migration PATH"
"Please provide path to cassettes directory or file. Usage: python3 -m vcr.migration PATH",
)
path = sys.argv[1]
@@ -150,7 +150,7 @@ def main():
for file_path in files:
migrated = try_migrate(file_path)
status = "OK" if migrated else "FAIL"
sys.stderr.write("[{}] {}\n".format(status, file_path))
sys.stderr.write(f"[{status}] {file_path}\n")
sys.stderr.write("Done.\n")

View File

@@ -24,7 +24,7 @@ except ImportError as e:
else:
raise RuntimeError(
"vcrpy >=4.2.2 and botocore <1.11.0 are not compatible"
"; please upgrade botocore (or downgrade vcrpy)"
"; please upgrade botocore (or downgrade vcrpy)",
) from e
else:
_Boto3VerifiedHTTPSConnection = AWSHTTPSConnection
@@ -53,7 +53,7 @@ else:
if requests.__build__ < 0x021602:
raise RuntimeError(
"vcrpy >=4.2.2 and requests <2.16.2 are not compatible"
"; please upgrade requests (or downgrade vcrpy)"
"; please upgrade requests (or downgrade vcrpy)",
)
@@ -144,7 +144,9 @@ class CassettePatcherBuilder:
return
return mock.patch.object(
obj, patched_attribute, self._recursively_apply_get_cassette_subclass(replacement_class)
obj,
patched_attribute,
self._recursively_apply_get_cassette_subclass(replacement_class),
)
def _recursively_apply_get_cassette_subclass(self, replacement_dict_or_obj):
@@ -186,9 +188,7 @@ class CassettePatcherBuilder:
bases = (base_class,)
if not issubclass(base_class, object): # Check for old style class
bases += (object,)
return type(
"{}{}".format(base_class.__name__, self._cassette._path), bases, dict(cassette=self._cassette)
)
return type(f"{base_class.__name__}{self._cassette._path}", bases, {"cassette": self._cassette})
@_build_patchers_from_mock_triples_decorator
def _httplib(self):
@@ -335,10 +335,10 @@ class CassettePatcherBuilder:
def _urllib3_patchers(self, cpool, conn, stubs):
http_connection_remover = ConnectionRemover(
self._get_cassette_subclass(stubs.VCRRequestsHTTPConnection)
self._get_cassette_subclass(stubs.VCRRequestsHTTPConnection),
)
https_connection_remover = ConnectionRemover(
self._get_cassette_subclass(stubs.VCRRequestsHTTPSConnection)
self._get_cassette_subclass(stubs.VCRRequestsHTTPSConnection),
)
mock_triples = (
(conn, "VerifiedHTTPSConnection", stubs.VCRRequestsHTTPSConnection),

View File

@@ -5,17 +5,25 @@ from pathlib import Path
from ..serialize import deserialize, serialize
class CassetteNotFoundError(FileNotFoundError):
pass
class CassetteDecodeError(ValueError):
pass
class FilesystemPersister:
@classmethod
def load_cassette(cls, cassette_path, serializer):
cassette_path = Path(cassette_path) # if cassette path is already Path this is no operation
if not cassette_path.is_file():
raise ValueError("Cassette not found.")
raise CassetteNotFoundError()
try:
with cassette_path.open() as f:
data = f.read()
except UnicodeEncodeError as err:
raise ValueError("Can't read Cassette, Encoding is broken") from err
except UnicodeDecodeError as err:
raise CassetteDecodeError("Can't read Cassette, Encoding is broken") from err
return deserialize(data, serializer)

View File

@@ -46,8 +46,9 @@ class Request:
def add_header(self, key, value):
warnings.warn(
"Request.add_header is deprecated. " "Please assign to request.headers instead.",
"Request.add_header is deprecated. Please assign to request.headers instead.",
DeprecationWarning,
stacklevel=2,
)
self.headers[key] = value
@@ -90,7 +91,7 @@ class Request:
return self.scheme
def __str__(self):
return "<Request ({}) {}>".format(self.method, self.uri)
return f"<Request ({self.method}) {self.uri}>"
def __repr__(self):
return self.__str__()

View File

@@ -28,7 +28,7 @@ def _warn_about_old_cassette_format():
raise ValueError(
"Your cassette files were generated in an older version "
"of VCR. Delete your cassettes or run the migration script."
"See http://git.io/mHhLBg for more details."
"See http://git.io/mHhLBg for more details.",
)

View File

@@ -17,13 +17,5 @@ def serialize(cassette_dict):
try:
return json.dumps(cassette_dict, indent=4) + "\n"
except UnicodeDecodeError as original: # py2
raise UnicodeDecodeError(
original.encoding,
b"Error serializing cassette to JSON",
original.start,
original.end,
original.args[-1] + error_message,
)
except TypeError: # py3
raise TypeError(error_message)
except TypeError:
raise TypeError(error_message) from None

View File

@@ -188,26 +188,26 @@ class VCRConnection:
"""
port = self.real_connection.port
default_port = {"https": 443, "http": 80}[self._protocol]
return ":{}".format(port) if port != default_port else ""
return f":{port}" if port != default_port else ""
def _uri(self, url):
"""Returns request absolute URI"""
if url and not url.startswith("/"):
# Then this must be a proxy request.
return url
uri = "{}://{}{}{}".format(self._protocol, self.real_connection.host, self._port_postfix(), url)
uri = f"{self._protocol}://{self.real_connection.host}{self._port_postfix()}{url}"
log.debug("Absolute URI: %s", uri)
return uri
def _url(self, uri):
"""Returns request selector url from absolute URI"""
prefix = "{}://{}{}".format(self._protocol, self.real_connection.host, self._port_postfix())
prefix = f"{self._protocol}://{self.real_connection.host}{self._port_postfix()}"
return uri.replace(prefix, "", 1)
def request(self, method, url, body=None, headers=None, *args, **kwargs):
"""Persist the request metadata in self._vcr_request"""
self._vcr_request = Request(method=method, uri=self._uri(url), body=body, headers=headers or {})
log.debug("Got {}".format(self._vcr_request))
log.debug(f"Got {self._vcr_request}")
# Note: The request may not actually be finished at this point, so
# I'm not sending the actual request until getresponse(). This
@@ -223,7 +223,7 @@ class VCRConnection:
of putheader() calls.
"""
self._vcr_request = Request(method=method, uri=self._uri(url), body="", headers={})
log.debug("Got {}".format(self._vcr_request))
log.debug(f"Got {self._vcr_request}")
def putheader(self, header, *values):
self._vcr_request.headers[header] = values
@@ -255,19 +255,20 @@ class VCRConnection:
# Check to see if the cassette has a response for this request. If so,
# then return it
if self.cassette.can_play_response_for(self._vcr_request):
log.info("Playing response for {} from cassette".format(self._vcr_request))
log.info(f"Playing response for {self._vcr_request} from cassette")
response = self.cassette.play_response(self._vcr_request)
return VCRHTTPResponse(response)
else:
if self.cassette.write_protected and self.cassette.filter_request(self._vcr_request):
raise CannotOverwriteExistingCassetteException(
cassette=self.cassette, failed_request=self._vcr_request
cassette=self.cassette,
failed_request=self._vcr_request,
)
# Otherwise, we should send the request, then get the response
# and return it.
log.info("{} not in cassette, sending to real server".format(self._vcr_request))
log.info(f"{self._vcr_request} not in cassette, sending to real server")
# This is imported here to avoid circular import.
# TODO(@IvanMalison): Refactor to allow normal import.
from vcr.patch import force_reset

View File

@@ -35,7 +35,7 @@ class MockClientResponse(ClientResponse):
session=None,
)
async def json(self, *, encoding="utf-8", loads=json.loads, **kwargs): # NOQA: E999
async def json(self, *, encoding="utf-8", loads=json.loads, **kwargs):
stripped = self._body.strip()
if not stripped:
return None
@@ -66,7 +66,7 @@ def build_response(vcr_request, vcr_response, history):
headers=_deserialize_headers(vcr_request.headers),
real_url=URL(vcr_request.url),
)
response = MockClientResponse(vcr_request.method, URL(vcr_response.get("url")), request_info=request_info)
response = MockClientResponse(vcr_request.method, URL(vcr_request.url), request_info=request_info)
response.status = vcr_response["status"]["code"]
response._body = vcr_response["body"].get("string", b"")
response.reason = vcr_response["status"]["message"]
@@ -162,8 +162,7 @@ async def record_response(cassette, vcr_request, response):
vcr_response = {
"status": {"code": response.status, "message": response.reason},
"headers": _serialize_headers(response.headers),
"body": body, # NOQA: E999
"url": str(response.url),
"body": body,
}
cassette.append(vcr_request, vcr_response)
@@ -261,7 +260,7 @@ def vcr_request(cassette, real_request):
vcr_request = Request(method, str(request_url), data, _serialize_headers(headers))
if cassette.can_play_response_for(vcr_request):
log.info("Playing response for {} from cassette".format(vcr_request))
log.info(f"Playing response for {vcr_request} from cassette")
response = play_responses(cassette, vcr_request, kwargs)
for redirect in response.history:
self._cookie_jar.update_cookies(redirect.cookies, redirect.url)
@@ -273,7 +272,7 @@ def vcr_request(cassette, real_request):
log.info("%s not in cassette, sending to real server", vcr_request)
response = await real_request(self, method, url, **kwargs) # NOQA: E999
response = await real_request(self, method, url, **kwargs)
await record_responses(cassette, vcr_request, response)
return response

View File

@@ -29,9 +29,9 @@ def vcr_fetch_impl(cassette, real_fetch_impl):
request,
599,
error=Exception(
"The request (%s) uses AsyncHTTPClient functionality "
f"The request ({request!r}) uses AsyncHTTPClient functionality "
"that is not yet supported by VCR.py. Please make the "
"request outside a VCR.py context." % repr(request)
"request outside a VCR.py context.",
),
request_time=self.io_loop.time() - request.start_time,
)
@@ -65,7 +65,8 @@ def vcr_fetch_impl(cassette, real_fetch_impl):
request,
599,
error=CannotOverwriteExistingCassetteException(
cassette=cassette, failed_request=vcr_request
cassette=cassette,
failed_request=vcr_request,
),
request_time=self.io_loop.time() - request.start_time,
)

View File

@@ -32,7 +32,7 @@ class VCRMixin:
return os.path.join(testdir, "cassettes")
def _get_cassette_name(self):
return "{0}.{1}.yaml".format(self.__class__.__name__, self._testMethodName)
return f"{self.__class__.__name__}.{self._testMethodName}.yaml"
class VCRTestCase(VCRMixin, unittest.TestCase):

View File

@@ -1,9 +1,5 @@
import types
try:
from collections.abc import Mapping, MutableMapping
except ImportError:
from collections import Mapping, MutableMapping
from collections.abc import Mapping, MutableMapping
# Shamelessly stolen from https://github.com/kennethreitz/requests/blob/master/requests/structures.py
@@ -31,7 +27,7 @@ class CaseInsensitiveDict(MutableMapping):
"""
def __init__(self, data=None, **kwargs):
self._store = dict()
self._store = {}
if data is None:
data = {}
self.update(data, **kwargs)