1
0
mirror of https://github.com/kevin1024/vcrpy.git synced 2025-12-08 16:53:23 +00:00

Compare commits

...

51 Commits

Author SHA1 Message Date
Kevin McCarthy
34d5384318 bump version to v5.0.0 2023-06-26 12:54:39 -05:00
Sebastian Pipping
ad1010d0f8 Merge pull request #695 from kevin1024/drop37
Drop support for Python 3.7 (after 2023-06-27)
2023-06-26 18:32:42 +02:00
Amos Ng
d99593bcd3 Split persister errors into CassetteNotFoundError and CassetteDecodeError (#681) 2023-06-26 18:27:35 +02:00
Sebastian Pipping
8c03c37df4 Merge pull request #725 from kevin1024/make-assert-is-json-less-misleading
assertions.py: Fix mis-leading `assert_is_json`
2023-06-26 17:43:28 +02:00
Jair Henrique
b827cbe2da Drop support to python 3.7 2023-06-26 11:46:20 -03:00
Kevin McCarthy
92ca5a102c fix misspelled word 2023-06-26 09:22:16 -05:00
Kevin McCarthy
d2281ab646 version bump to v4.4.0 2023-06-26 09:17:41 -05:00
Sebastian Pipping
f21c8f0224 assertions.py: Fix mis-leading assert_is_json
Parameter name "a_string" was mistaken and function
name "assert_is_json" was less clear than ideal,
given that it explicitly needs bytes unlike json.loads .
2023-06-24 15:59:23 +02:00
Sebastian Pipping
8b97fd6551 Merge pull request #644 from neliseiska/replace_assert_with_raise
Replace `assert` with `raise AssertionError`
2023-06-22 22:29:13 +02:00
Sebastian Pipping
29e42211d7 Merge pull request #722 from kevin1024/run-online-tests-only-once
main.yml: Run online tests only once (to save runtime)
2023-06-22 15:46:04 +02:00
Sebastian Pipping
6e511b67fd Merge pull request #723 from kevin1024/issue-719-compression-urllib3-v2
Make decompression robust towards already decompressed input (arguably fixes #719)
2023-06-22 15:45:10 +02:00
Sebastian Pipping
9b6cb1ce23 Merge pull request #721 from kevin1024/issue-714-response-raw-stream-urllib3-v2
Make response.raw.stream() work for urllib3 v2 (fixes #714)
2023-06-22 15:44:29 +02:00
Sebastian Pipping
6a12bd1511 test_requests.py: Cover response.raw.stream() 2023-06-21 14:52:13 +02:00
Sebastian Pipping
3411bedc06 Make response.raw.stream() work for urllib3 v2 2023-06-21 14:52:13 +02:00
Sebastian Pipping
438a65426b filters.py: Make decompression robust towards decompressed input 2023-06-21 02:28:36 +02:00
Sebastian Pipping
8c6b1fdf38 test_requests.py: Extend coverage of gzip response
.. with regard to:
- not crashing with decode_compressed_response==True
- expected cassette content for body string
- expected response content, i.e. proper decompression
2023-06-21 02:28:36 +02:00
Sebastian Pipping
15e9f1868c main.yml: Run online tests only once
Online tests are tests that need access to the Internet
to pass (and hence have @pytest.marker.online decoration).
2023-06-21 00:38:58 +02:00
Sebastian Pipping
7eb235cd9c Merge pull request #720 from kevin1024/use-python3-command
Use python3 (and pip3) command
2023-06-19 15:07:23 +02:00
Sebastian Pipping
d2f2731481 Replace command "pip" with "pip3" 2023-06-18 23:08:17 +02:00
Sebastian Pipping
b2a895cb89 Replace command "python" by "python3" 2023-06-18 23:07:04 +02:00
Sebastian Pipping
ffb2f44236 Merge pull request #718 from kevin1024/enforce-online-marker-completeness
Make CI enforce that all online tests are marked with @pytest.mark.online
2023-06-18 23:03:48 +02:00
Sebastian Pipping
d66392a3fb main.yml: Enforce that use of @pytest.mark.online remains complete 2023-06-18 21:33:30 +02:00
Sebastian Pipping
b9cab239a7 runtests.sh: Fix variable quoting + add exec 2023-06-18 17:40:49 +02:00
Sebastian Pipping
276a41d9b6 Merge pull request #674 from jspricke/pytest.mark.online
Mark tests with @pytest.mark.online that need access to the Internet
2023-06-18 17:34:16 +02:00
Jochen Sprickerhof
7007e944ae pytest.mark.online tests that need internet 2023-06-18 16:52:51 +02:00
Sebastian Pipping
bd112a2385 docs/usage.rst: Fix assertions
Symptom was:
> Traceback (most recent call last):
>   File "/tmp/tmp.kJAKlLngAX/foo.py", line 6, in <module>
>     assert 'Example domains' in response
> TypeError: a bytes-like object is required, not 'str'
2023-06-18 11:14:44 -03:00
Sebastian Pipping
42848285a0 docs/usage.rst: Fix urllib import
Symptom was:
> Traceback (most recent call last):
>   File "/tmp/tmp.kJAKlLngAX/foo.py", line 5, in <module>
>     response = urllib.request.urlopen('http://www.iana.org/domains/reserved').read()
> AttributeError: module 'urllib' has no attribute 'request'
2023-06-18 11:14:44 -03:00
Sebastian Pipping
e3aae34ef7 Merge pull request #713 from mghantous/mg/read1
VCRHTTPResponse Not Working with Biopython 1.81
2023-06-12 13:08:00 +02:00
Sebastian Pipping
f4316d2dae Merge pull request #712 from kevin1024/integrate-vcrpy-unittest
Integrate vcrpy-unittest (alternative to #709)
2023-06-08 18:09:22 +02:00
Sebastian Pipping
d613a814d3 vcr/unittest: Simplify file layout
.. and make "from vcr.unittest import VCRTestCase" work again.
2023-06-08 16:28:34 +02:00
Sebastian Pipping
ce234e503f docs/usage.rst: Drop needless self-reference 2023-06-08 16:28:34 +02:00
Sebastian Pipping
3527d25ce8 vcr/unittest: Simplify super(C, self) in class C to super()
.. for Python 3
2023-06-08 16:28:34 +02:00
Sebastian Pipping
dedb7ec403 Resolve needless inheritence from object (Python 3) 2023-06-08 16:28:34 +02:00
Sebastian Pipping
59263d6025 vcr/unittest: Resolve needless inheritence from object 2023-06-08 16:28:34 +02:00
Sebastian Pipping
2842cabec6 vcr/unittest: Remove unused logger 2023-06-08 16:28:34 +02:00
Sebastian Pipping
ad650a7ee1 vcr/unittest: Apply black formatting 2023-06-08 16:28:34 +02:00
Sebastian Pipping
9232915885 docs/usage.rst: Break up a long line 2023-06-08 16:28:34 +02:00
Sebastian Pipping
cbb540029f docs/usage.rst: Adapt documentation to new code location 2023-06-08 16:28:34 +02:00
Sebastian Pipping
bf30d9a5e5 vcr/unittest: Fix test test_get_vcr_with_matcher
Matcher needs attribute __name__ for function vcr.matchers.get_matchers_results .
2023-06-08 16:28:34 +02:00
Sebastian Pipping
f06f71ece4 vcr/unittest: Stop disguising MagicMock as Mock 2023-06-08 16:28:34 +02:00
Sebastian Pipping
1070d417b3 vcr/unittest: Apply 2to3 2023-06-08 16:28:34 +02:00
Sebastian Pipping
46726a9a61 vcr/unittest: Fix import of VCRTestCase in tests 2023-06-08 16:28:34 +02:00
Sebastian Pipping
87db8e69ff vcr/unittest: Use unitest.mock rather than mock of PyPI 2023-06-08 16:28:34 +02:00
Sebastian Pipping
52701ebca4 vcr/unittest: Make import to vcrpy relative 2023-06-08 16:28:34 +02:00
Sebastian Pipping
69679dc3fc vcr/unittest: Drop forward imports
.. to resolve import ambiguity.
2023-06-08 16:28:34 +02:00
Sebastian Pipping
c13f33b1e0 Add unmodified vcrpy-unittest code
Source commit is a2fd7625fde1ea15c8982759b07007aef40424b3.
License is MIT just like vcrpy.
2023-06-08 16:28:34 +02:00
Matt Ghantous
5476dd010c Casting to BufferedReader no longer needed in test 2023-06-05 23:47:47 -04:00
Matt Ghantous
0add77d5ae Add read1 method to VCRHTTPResponse 2023-06-05 23:20:52 -04:00
Yaroslav Halchenko
96a6e91def Codespell: action + config (#704) 2023-06-05 16:56:55 +02:00
Abram Clark
3b41f0ede3 Fix for #174 to prevent filters from corrupting request 2023-05-27 09:40:53 -03:00
Haapalainen, Jonne
a79356cf5f Replace assert with raise AssertionError
Assert seems to behave badly in some cases and should not be used for
handling runtime errors.

see for example:
https://medium.com/@jadhavmanoj/python-what-is-raise-and-assert-statement-c3908697bc62
https://github.com/emre/notes/blob/master/python/when-to-use-assert.md
2022-05-05 11:21:27 +03:00
51 changed files with 690 additions and 161 deletions

22
.github/workflows/codespell.yml vendored Normal file
View File

@@ -0,0 +1,22 @@
---
name: Codespell
on:
push:
branches: [master]
pull_request:
branches: [master]
permissions:
contents: read
jobs:
codespell:
name: Check for spelling errors
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v3
- name: Codespell
uses: codespell-project/actions-codespell@v2

View File

@@ -13,7 +13,7 @@ jobs:
strategy:
fail-fast: false
matrix:
python-version: ["3.7", "3.8", "3.9", "3.10", "3.11", "pypy-3.8"]
python-version: ["3.8", "3.9", "3.10", "3.11", "pypy-3.8"]
steps:
- uses: actions/checkout@v3.5.2
@@ -25,11 +25,19 @@ jobs:
- name: Install project dependencies
run: |
pip install --upgrade pip
pip install codecov tox tox-gh-actions
pip3 install --upgrade pip
pip3 install codecov tox tox-gh-actions
- name: Run tests with tox
run: tox
- name: Run online tests with tox
run: tox -- -m online
- name: Run offline tests with tox with no access to the Internet
run: |
# We're using unshare to take Internet access
# away from tox so that we'll notice whenever some new test
# is missing @pytest.mark.online decoration in the future
unshare --map-root-user --net -- \
sh -c 'ip link set lo up; tox -- -m "not online"'
- name: Run coverage
run: codecov

View File

@@ -71,7 +71,7 @@ Finally, register your class with VCR to use your new serializer.
import vcr
class BogoSerializer(object):
class BogoSerializer:
"""
Must implement serialize() and deserialize() methods
"""
@@ -136,7 +136,8 @@ Create your own persistence class, see the example below:
Your custom persister must implement both ``load_cassette`` and ``save_cassette``
methods. The ``load_cassette`` method must return a deserialized cassette or raise
``ValueError`` if no cassette is found.
either ``CassetteNotFoundError`` if no cassette is found, or ``CassetteDecodeError``
if the cassette cannot be successfully deserialized.
Once the persister class is defined, register with VCR like so...

View File

@@ -7,6 +7,20 @@ For a full list of triaged issues, bugs and PRs and what release they are target
All help in providing PRs to close out bug issues is appreciated. Even if that is providing a repo that fully replicates issues. We have very generous contributors that have added these to bug issues which meant another contributor picked up the bug and closed it out.
- 5.0.0
- BREAKING CHANGE: Drop support for Python 3.7. 3.7 is EOL as of 6/27/23 Thanks @jairhenrique
- BREAKING CHANGE: Custom Cassette persisters no longer catch ValueError. If you have implemented a custom persister (has anyone implemented a custom persister? Let us know!) then you will need to throw a CassetteNotFoundError when unable to find a cassette. See #681 for discussion and reason for this change. Thanks @amosjyng for the PR and the review from @hartwork
- 4.4.0
- HUGE thanks to @hartwork for all the work done on this release!
- Bring vcr/unittest in to vcrpy as a full feature of vcr instead of a separate library. Big thanks to @hartwork for doing this and to @agriffis for originally creating the library
- Make decompression robust towards already decompressed input (thanks @hartwork)
- Bugfix: Add read1 method (fixes compatibility with biopython), thanks @mghantous
- Bugfix: Prevent filters from corrupting request (thanks @abramclark)
- Bugfix: Add support for `response.raw.stream()` to fix urllib v2 compat
- Bugfix: Replace `assert` with `raise AssertionError`: fixes support for `PYTHONOPTIMIZE=1`
- Add pytest.mark.online to run test suite offline, thanks @jspricke
- use python3 and pip3 binaries to ease debian packaging (thanks @hartwork)
- Add codespell (thanks @mghantous)
- 4.3.1
- Support urllib3 v1 and v2. NOTE: there is an issue running urllib3 v2 on
Python older than 3.10, so this is currently blocked in the requirements.

View File

@@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
#
# vcrpy documentation build configuration file, created by
# sphinx-quickstart on Sun Sep 13 11:18:00 2015.

View File

@@ -74,7 +74,7 @@ The PR reviewer is a second set of eyes to see if:
**Release Manager:**
- Ensure CI is passing.
- Create a release on github and tag it with the changelog release notes.
- ``python setup.py build sdist bdist_wheel``
- ``python3 setup.py build sdist bdist_wheel``
- ``twine upload dist/*``
- Go to ReadTheDocs build page and trigger a build https://readthedocs.org/projects/vcrpy/builds/
@@ -96,11 +96,11 @@ The test suite is pretty big and slow, but you can tell tox to only run specific
tox -e {pyNN}-{HTTP_LIBRARY} -- <pytest flags passed through>
tox -e py37-requests -- -v -k "'test_status_code or test_gzip'"
tox -e py37-requests -- -v --last-failed
tox -e py38-requests -- -v -k "'test_status_code or test_gzip'"
tox -e py38-requests -- -v --last-failed
This will run only tests that look like ``test_status_code`` or
``test_gzip`` in the test suite, and only in the python 3.7 environment
``test_gzip`` in the test suite, and only in the python 3.8 environment
that has ``requests`` installed.
Also, in order for the boto tests to run, you will need an AWS key.
@@ -127,20 +127,20 @@ in this example::
eval "$(pyenv init -)"
# Setup your local system tox tooling
pip install tox tox-pyenv
pip3 install tox tox-pyenv
# Install supported versions (at time of writing), this does not activate them
pyenv install 3.7.5 3.8.0 pypy3.8
pyenv install 3.8.0 pypy3.8
# This activates them
pyenv local 3.7.5 3.8.0 pypy3.8
pyenv local 3.8.0 pypy3.8
# Run the whole test suite
tox
# Run the whole test suite or just part of it
tox -e lint
tox -e py37-requests
tox -e py38-requests
Troubleshooting on MacOSX

View File

@@ -4,12 +4,12 @@ Installation
VCR.py is a package on `PyPI <https://pypi.python.org>`__, so you can install
with pip::
pip install vcrpy
pip3 install vcrpy
Compatibility
-------------
VCR.py supports Python 3.7+, and `pypy <http://pypy.org>`__.
VCR.py supports Python 3.8+, and `pypy <http://pypy.org>`__.
The following HTTP libraries are supported:
@@ -35,7 +35,7 @@ rebuilding pyyaml.
1. Test if pyyaml is built with libyaml. This should work::
python -c 'from yaml import CLoader'
python3 -c 'from yaml import CLoader'
2. Install libyaml according to your Linux distribution, or using `Homebrew
<http://mxcl.github.com/homebrew/>`__ on Mac::
@@ -46,8 +46,8 @@ rebuilding pyyaml.
3. Rebuild pyyaml with libyaml::
pip uninstall pyyaml
pip --no-cache-dir install pyyaml
pip3 uninstall pyyaml
pip3 --no-cache-dir install pyyaml
Upgrade
-------
@@ -61,7 +61,7 @@ is to simply delete your cassettes and re-record all of them. VCR.py
also provides a migration script that attempts to upgrade your 0.x
cassettes to the new 1.x format. To use it, run the following command::
python -m vcr.migration PATH
python3 -m vcr.migration PATH
The PATH can be either a path to the directory with cassettes or the
path to a single cassette.

View File

@@ -4,11 +4,11 @@ Usage
.. code:: python
import vcr
import urllib
import urllib.request
with vcr.use_cassette('fixtures/vcr_cassettes/synopsis.yaml'):
response = urllib.request.urlopen('http://www.iana.org/domains/reserved').read()
assert 'Example domains' in response
assert b'Example domains' in response
Run this test once, and VCR.py will record the HTTP request to
``fixtures/vcr_cassettes/synopsis.yaml``. Run it again, and VCR.py will
@@ -26,7 +26,7 @@ look like this:
@vcr.use_cassette('fixtures/vcr_cassettes/synopsis.yaml')
def test_iana():
response = urllib.request.urlopen('http://www.iana.org/domains/reserved').read()
assert 'Example domains' in response
assert b'Example domains' in response
When using the decorator version of ``use_cassette``, it is possible to
omit the path to the cassette file.
@@ -36,7 +36,7 @@ omit the path to the cassette file.
@vcr.use_cassette()
def test_iana():
response = urllib.request.urlopen('http://www.iana.org/domains/reserved').read()
assert 'Example domains' in response
assert b'Example domains' in response
In this case, the cassette file will be given the same name as the test
function, and it will be placed in the same directory as the file in
@@ -92,9 +92,73 @@ all
Unittest Integration
--------------------
While it's possible to use the context manager or decorator forms with unittest,
there's also a ``VCRTestCase`` provided separately by `vcrpy-unittest
<https://github.com/agriffis/vcrpy-unittest>`__.
Inherit from ``VCRTestCase`` for automatic recording and playback of HTTP
interactions.
.. code:: python
from vcr.unittest import VCRTestCase
import requests
class MyTestCase(VCRTestCase):
def test_something(self):
response = requests.get('http://example.com')
Similar to how VCR.py returns the cassette from the context manager,
``VCRTestCase`` makes the cassette available as ``self.cassette``:
.. code:: python
self.assertEqual(len(self.cassette), 1)
self.assertEqual(self.cassette.requests[0].uri, 'http://example.com')
By default cassettes will be placed in the ``cassettes`` subdirectory next to the
test, named according to the test class and method. For example, the above test
would read from and write to ``cassettes/MyTestCase.test_something.yaml``
The configuration can be modified by overriding methods on your subclass:
``_get_vcr_kwargs``, ``_get_cassette_library_dir`` and ``_get_cassette_name``.
To modify the ``VCR`` object after instantiation, for example to add a matcher,
you can hook on ``_get_vcr``, for example:
.. code:: python
class MyTestCase(VCRTestCase):
def _get_vcr(self, **kwargs):
myvcr = super(MyTestCase, self)._get_vcr(**kwargs)
myvcr.register_matcher('mymatcher', mymatcher)
myvcr.match_on = ['mymatcher']
return myvcr
See
`the source
<https://github.com/kevin1024/vcrpy/blob/master/vcr/unittest.py>`__
for the default implementations of these methods.
If you implement a ``setUp`` method on your test class then make sure to call
the parent version ``super().setUp()`` in your own in order to continue getting
the cassettes produced.
VCRMixin
~~~~~~~~
In case inheriting from ``VCRTestCase`` is difficult because of an existing
class hierarchy containing tests in the base classes, inherit from ``VCRMixin``
instead.
.. code:: python
from vcr.unittest import VCRMixin
import requests
import unittest
class MyTestMixin(VCRMixin):
def test_something(self):
response = requests.get(self.url)
class MyTestCase(MyTestMixin, unittest.TestCase):
url = 'http://example.com'
Pytest Integration
------------------

View File

@@ -7,3 +7,14 @@ known_first_party = "vcrpy"
multi_line_output = 3
use_parentheses = true
include_trailing_comma = true
[tool.codespell]
skip = '.git,*.pdf,*.svg,.tox'
ignore-regex = "\\\\[fnrstv]"
#
# ignore-words-list = ''
[tool.pytest.ini_options]
markers = [
"online",
]

View File

@@ -4,4 +4,4 @@
# If you are getting an INVOCATION ERROR for this script then there is
# a good chance you are running on Windows.
# You can and should use WSL for running tox on Windows when it calls bash scripts.
REQUESTS_CA_BUNDLE=`python -m pytest_httpbin.certs` pytest $*
REQUESTS_CA_BUNDLE=`python3 -m pytest_httpbin.certs` exec pytest "$@"

View File

@@ -8,7 +8,7 @@ import sys
from setuptools import find_packages, setup
from setuptools.command.test import test as TestCommand
long_description = open("README.rst", "r").read()
long_description = open("README.rst").read()
here = os.path.abspath(os.path.dirname(__file__))
@@ -85,7 +85,7 @@ setup(
author_email="me@kevinmccarthy.org",
url="https://github.com/kevin1024/vcrpy",
packages=find_packages(exclude=["tests*"]),
python_requires=">=3.7",
python_requires=">=3.8",
install_requires=install_requires,
license="MIT",
tests_require=tests_require,
@@ -95,7 +95,6 @@ setup(
"Intended Audience :: Developers",
"Programming Language :: Python",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.7",
"Programming Language :: Python :: 3.8",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",

View File

@@ -11,9 +11,10 @@ def assert_cassette_has_one_response(cass):
assert cass.play_count == 1
def assert_is_json(a_string):
def assert_is_json_bytes(b: bytes):
assert isinstance(b, bytes)
try:
json.loads(a_string.decode("utf-8"))
json.loads(b.decode("utf-8"))
except Exception:
assert False
assert True

View File

@@ -34,6 +34,7 @@ def post(url, output="text", **kwargs):
return request("POST", url, output="text", **kwargs)
@pytest.mark.online
def test_status(tmpdir, mockbin_request_url):
url = mockbin_request_url
@@ -46,6 +47,7 @@ def test_status(tmpdir, mockbin_request_url):
assert cassette.play_count == 1
@pytest.mark.online
@pytest.mark.parametrize("auth", [None, aiohttp.BasicAuth("vcrpy", "test")])
def test_headers(tmpdir, auth, mockbin_request_url):
url = mockbin_request_url
@@ -63,6 +65,7 @@ def test_headers(tmpdir, auth, mockbin_request_url):
assert "yarl.URL" not in cassette.data[0]
@pytest.mark.online
def test_case_insensitive_headers(tmpdir, mockbin_request_url):
url = mockbin_request_url
@@ -76,6 +79,7 @@ def test_case_insensitive_headers(tmpdir, mockbin_request_url):
assert cassette.play_count == 1
@pytest.mark.online
def test_text(tmpdir, mockbin_request_url):
url = mockbin_request_url
@@ -88,6 +92,7 @@ def test_text(tmpdir, mockbin_request_url):
assert cassette.play_count == 1
@pytest.mark.online
def test_json(tmpdir, mockbin_request_url):
url = mockbin_request_url
headers = {"Content-Type": "application/json"}
@@ -101,6 +106,7 @@ def test_json(tmpdir, mockbin_request_url):
assert cassette.play_count == 1
@pytest.mark.online
def test_binary(tmpdir, mockbin_request_url):
url = mockbin_request_url + "/image/png"
with vcr.use_cassette(str(tmpdir.join("binary.yaml"))):
@@ -112,6 +118,7 @@ def test_binary(tmpdir, mockbin_request_url):
assert cassette.play_count == 1
@pytest.mark.online
def test_stream(tmpdir, mockbin_request_url):
url = mockbin_request_url
@@ -124,6 +131,7 @@ def test_stream(tmpdir, mockbin_request_url):
assert cassette.play_count == 1
@pytest.mark.online
@pytest.mark.parametrize("body", ["data", "json"])
def test_post(tmpdir, body, caplog, mockbin_request_url):
caplog.set_level(logging.INFO)
@@ -143,12 +151,13 @@ def test_post(tmpdir, body, caplog, mockbin_request_url):
(
log
for log in caplog.records
if log.getMessage() == "<Request (POST) {}> not in cassette, sending to real server".format(url)
if log.getMessage() == f"<Request (POST) {url}> not in cassette, sending to real server"
),
None,
), "Log message not found."
@pytest.mark.online
def test_params(tmpdir, mockbin_request_url):
url = mockbin_request_url + "?d=d"
headers = {"Content-Type": "application/json"}
@@ -164,6 +173,7 @@ def test_params(tmpdir, mockbin_request_url):
assert cassette.play_count == 1
@pytest.mark.online
def test_params_same_url_distinct_params(tmpdir, mockbin_request_url):
url = mockbin_request_url
headers = {"Content-Type": "application/json"}
@@ -183,6 +193,7 @@ def test_params_same_url_distinct_params(tmpdir, mockbin_request_url):
get(url, output="text", params=other_params)
@pytest.mark.online
def test_params_on_url(tmpdir, mockbin_request_url):
url = mockbin_request_url + "?a=1&b=foo"
headers = {"Content-Type": "application/json"}
@@ -248,6 +259,7 @@ def test_aiohttp_test_client_json(aiohttp_client, tmpdir):
assert cassette.play_count == 1
@pytest.mark.online
def test_redirect(tmpdir, mockbin):
url = mockbin + "/redirect/302/2"
@@ -272,6 +284,7 @@ def test_redirect(tmpdir, mockbin):
assert cassette_response.request_info.real_url == response.request_info.real_url
@pytest.mark.online
def test_not_modified(tmpdir, mockbin):
"""It doesn't try to redirect on 304"""
url = mockbin + "/status/304"
@@ -289,6 +302,7 @@ def test_not_modified(tmpdir, mockbin):
assert cassette.play_count == 1
@pytest.mark.online
def test_double_requests(tmpdir, mockbin_request_url):
"""We should capture, record, and replay all requests and response chains,
even if there are duplicate ones.
@@ -404,6 +418,7 @@ def test_cookies_redirect(httpbin_both, httpbin_ssl_context, tmpdir):
run_in_loop(run)
@pytest.mark.online
def test_not_allow_redirects(tmpdir, mockbin):
url = mockbin + "/redirect/308/5"
path = str(tmpdir.join("redirects.yaml"))

View File

@@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
"""Basic tests for cassettes"""
# External imports

View File

@@ -20,12 +20,12 @@ except ImportError:
# https://github.com/boto/botocore/pull/1495
boto3_skip_vendored_requests = pytest.mark.skipif(
botocore_awsrequest,
reason="botocore version {ver} does not use vendored requests anymore.".format(ver=botocore.__version__),
reason=f"botocore version {botocore.__version__} does not use vendored requests anymore.",
)
boto3_skip_awsrequest = pytest.mark.skipif(
not botocore_awsrequest,
reason="botocore version {ver} still uses vendored requests.".format(ver=botocore.__version__),
reason=f"botocore version {botocore.__version__} still uses vendored requests.",
)
IAM_USER_NAME = "vcrpy"

View File

@@ -7,6 +7,7 @@ import pytest
import vcr
@pytest.mark.online
def test_set_serializer_default_config(tmpdir, mockbin_request_url):
my_vcr = vcr.VCR(serializer="json")
@@ -20,6 +21,7 @@ def test_set_serializer_default_config(tmpdir, mockbin_request_url):
assert json.loads(file_content)
@pytest.mark.online
def test_default_set_cassette_library_dir(tmpdir, mockbin_request_url):
my_vcr = vcr.VCR(cassette_library_dir=str(tmpdir.join("subdir")))
@@ -29,6 +31,7 @@ def test_default_set_cassette_library_dir(tmpdir, mockbin_request_url):
assert os.path.exists(str(tmpdir.join("subdir").join("test.json")))
@pytest.mark.online
def test_override_set_cassette_library_dir(tmpdir, mockbin_request_url):
my_vcr = vcr.VCR(cassette_library_dir=str(tmpdir.join("subdir")))
@@ -41,6 +44,7 @@ def test_override_set_cassette_library_dir(tmpdir, mockbin_request_url):
assert not os.path.exists(str(tmpdir.join("subdir").join("test.json")))
@pytest.mark.online
def test_override_match_on(tmpdir, mockbin_request_url):
my_vcr = vcr.VCR(match_on=["method"])
@@ -62,6 +66,7 @@ def test_missing_matcher():
pass
@pytest.mark.online
def test_dont_record_on_exception(tmpdir, mockbin_request_url):
my_vcr = vcr.VCR(record_on_exception=False)

View File

@@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
"""Basic tests about save behavior"""
# External imports
@@ -6,10 +5,13 @@ import os
import time
from urllib.request import urlopen
import pytest
# Internal imports
import vcr
@pytest.mark.online
def test_disk_saver_nowrite(tmpdir, mockbin_request_url):
"""
Ensure that when you close a cassette without changing it it doesn't
@@ -30,6 +32,7 @@ def test_disk_saver_nowrite(tmpdir, mockbin_request_url):
assert last_mod == last_mod2
@pytest.mark.online
def test_disk_saver_write(tmpdir, mockbin_request_url):
"""
Ensure that when you close a cassette after changing it it does

View File

@@ -5,7 +5,7 @@ from urllib.parse import urlencode
from urllib.request import Request, urlopen
import pytest
from assertions import assert_cassette_has_one_response, assert_is_json
from assertions import assert_cassette_has_one_response, assert_is_json_bytes
import vcr
@@ -105,7 +105,7 @@ def test_decompress_gzip(tmpdir, httpbin):
with vcr.use_cassette(cass_file) as cass:
decoded_response = urlopen(url).read()
assert_cassette_has_one_response(cass)
assert_is_json(decoded_response)
assert_is_json_bytes(decoded_response)
def test_decomptess_empty_body(tmpdir, httpbin):
@@ -129,7 +129,7 @@ def test_decompress_deflate(tmpdir, httpbin):
with vcr.use_cassette(cass_file) as cass:
decoded_response = urlopen(url).read()
assert_cassette_has_one_response(cass)
assert_is_json(decoded_response)
assert_is_json_bytes(decoded_response)
def test_decompress_regular(tmpdir, httpbin):
@@ -141,4 +141,25 @@ def test_decompress_regular(tmpdir, httpbin):
with vcr.use_cassette(cass_file) as cass:
resp = urlopen(url).read()
assert_cassette_has_one_response(cass)
assert_is_json(resp)
assert_is_json_bytes(resp)
def test_before_record_request_corruption(tmpdir, httpbin):
"""Modifying request in before_record_request should not affect outgoing request"""
def before_record(request):
request.headers.clear()
request.body = b""
return request
req = Request(
httpbin.url + "/post",
data=urlencode({"test": "exists"}).encode(),
headers={"X-Test": "exists"},
)
cass_file = str(tmpdir.join("modified_response.yaml"))
with vcr.use_cassette(cass_file, before_record_request=before_record):
resp = json.loads(urlopen(req).read())
assert resp["headers"]["X-Test"] == "exists"
assert resp["form"]["test"] == "exists"

View File

@@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
"""Integration tests with httplib2"""
from urllib.parse import urlencode
@@ -56,6 +55,7 @@ def test_response_headers(tmpdir, httpbin_both):
assert set(headers) == set(resp.items())
@pytest.mark.online
def test_effective_url(tmpdir):
"""Ensure that the effective_url is captured"""
url = "http://mockbin.org/redirect/301"

View File

@@ -87,6 +87,7 @@ def yml(tmpdir, request):
return str(tmpdir.join(request.function.__name__ + ".yaml"))
@pytest.mark.online
def test_status(tmpdir, mockbin, do_request):
url = mockbin
@@ -99,6 +100,7 @@ def test_status(tmpdir, mockbin, do_request):
assert cassette.play_count == 1
@pytest.mark.online
def test_case_insensitive_headers(tmpdir, mockbin, do_request):
url = mockbin
@@ -112,6 +114,7 @@ def test_case_insensitive_headers(tmpdir, mockbin, do_request):
assert cassette.play_count == 1
@pytest.mark.online
def test_content(tmpdir, mockbin, do_request):
url = mockbin
@@ -124,6 +127,7 @@ def test_content(tmpdir, mockbin, do_request):
assert cassette.play_count == 1
@pytest.mark.online
def test_json(tmpdir, mockbin, do_request):
url = mockbin + "/request"
@@ -138,6 +142,7 @@ def test_json(tmpdir, mockbin, do_request):
assert cassette.play_count == 1
@pytest.mark.online
def test_params_same_url_distinct_params(tmpdir, mockbin, do_request):
url = mockbin + "/request"
headers = {"Content-Type": "application/json"}
@@ -158,6 +163,7 @@ def test_params_same_url_distinct_params(tmpdir, mockbin, do_request):
do_request()("GET", url, params=params, headers=headers)
@pytest.mark.online
def test_redirect(mockbin, yml, do_request):
url = mockbin + "/redirect/303/2"
@@ -184,6 +190,7 @@ def test_redirect(mockbin, yml, do_request):
}
@pytest.mark.online
def test_work_with_gzipped_data(mockbin, do_request, yml):
url = mockbin + "/gzip?foo=bar"
headers = {"accept-encoding": "deflate, gzip"}
@@ -199,6 +206,7 @@ def test_work_with_gzipped_data(mockbin, do_request, yml):
assert cassette.play_count == 1
@pytest.mark.online
@pytest.mark.parametrize("url", ["https://github.com/kevin1024/vcrpy/issues/" + str(i) for i in range(3, 6)])
def test_simple_fetching(do_request, yml, url):
with vcr.use_cassette(yml):
@@ -231,6 +239,7 @@ def test_behind_proxy(do_request):
assert cassette_response.request.url == response.request.url
@pytest.mark.online
def test_cookies(tmpdir, mockbin, do_request):
def client_cookies(client):
return [c for c in client.client.cookies]
@@ -268,6 +277,7 @@ def test_cookies(tmpdir, mockbin, do_request):
assert client_cookies(new_client) == ["k1", "k2"]
@pytest.mark.online
def test_relative_redirects(tmpdir, scheme, do_request, mockbin):
redirect_kwargs = {HTTPX_REDIRECT_PARAM.name: True}
@@ -286,6 +296,7 @@ def test_relative_redirects(tmpdir, scheme, do_request, mockbin):
assert cassette.play_count == 3
@pytest.mark.online
def test_redirect_wo_allow_redirects(do_request, mockbin, yml):
url = mockbin + "/redirect/308/5"

View File

@@ -28,9 +28,9 @@ def test_ignore_localhost(tmpdir, httpbin):
with overridden_dns({"httpbin.org": "127.0.0.1"}):
cass_file = str(tmpdir.join("filter_qs.yaml"))
with vcr.use_cassette(cass_file, ignore_localhost=True) as cass:
urlopen("http://localhost:{}/".format(httpbin.port))
urlopen(f"http://localhost:{httpbin.port}/")
assert len(cass) == 0
urlopen("http://httpbin.org:{}/".format(httpbin.port))
urlopen(f"http://httpbin.org:{httpbin.port}/")
assert len(cass) == 1
@@ -38,9 +38,9 @@ def test_ignore_httpbin(tmpdir, httpbin):
with overridden_dns({"httpbin.org": "127.0.0.1"}):
cass_file = str(tmpdir.join("filter_qs.yaml"))
with vcr.use_cassette(cass_file, ignore_hosts=["httpbin.org"]) as cass:
urlopen("http://httpbin.org:{}/".format(httpbin.port))
urlopen(f"http://httpbin.org:{httpbin.port}/")
assert len(cass) == 0
urlopen("http://localhost:{}/".format(httpbin.port))
urlopen(f"http://localhost:{httpbin.port}/")
assert len(cass) == 1
@@ -48,8 +48,8 @@ def test_ignore_localhost_and_httpbin(tmpdir, httpbin):
with overridden_dns({"httpbin.org": "127.0.0.1"}):
cass_file = str(tmpdir.join("filter_qs.yaml"))
with vcr.use_cassette(cass_file, ignore_hosts=["httpbin.org"], ignore_localhost=True) as cass:
urlopen("http://httpbin.org:{}".format(httpbin.port))
urlopen("http://localhost:{}".format(httpbin.port))
urlopen(f"http://httpbin.org:{httpbin.port}")
urlopen(f"http://localhost:{httpbin.port}")
assert len(cass) == 0
@@ -57,12 +57,12 @@ def test_ignore_localhost_twice(tmpdir, httpbin):
with overridden_dns({"httpbin.org": "127.0.0.1"}):
cass_file = str(tmpdir.join("filter_qs.yaml"))
with vcr.use_cassette(cass_file, ignore_localhost=True) as cass:
urlopen("http://localhost:{}".format(httpbin.port))
urlopen(f"http://localhost:{httpbin.port}")
assert len(cass) == 0
urlopen("http://httpbin.org:{}".format(httpbin.port))
urlopen(f"http://httpbin.org:{httpbin.port}")
assert len(cass) == 1
with vcr.use_cassette(cass_file, ignore_localhost=True) as cass:
assert len(cass) == 1
urlopen("http://localhost:{}".format(httpbin.port))
urlopen("http://httpbin.org:{}".format(httpbin.port))
urlopen(f"http://localhost:{httpbin.port}")
urlopen(f"http://httpbin.org:{httpbin.port}")
assert len(cass) == 1

View File

@@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
"""Test using a proxy."""
import http.server

View File

@@ -1,5 +1,7 @@
from urllib.request import urlopen
import pytest
import vcr
@@ -11,6 +13,7 @@ def false_matcher(r1, r2):
return False
@pytest.mark.online
def test_registered_true_matcher(tmpdir, mockbin_request_url):
my_vcr = vcr.VCR()
my_vcr.register_matcher("true", true_matcher)
@@ -26,6 +29,7 @@ def test_registered_true_matcher(tmpdir, mockbin_request_url):
urlopen(mockbin_request_url)
@pytest.mark.online
def test_registered_false_matcher(tmpdir, mockbin_request_url):
my_vcr = vcr.VCR()
my_vcr.register_matcher("false", false_matcher)

View File

@@ -1,16 +1,17 @@
# -*- coding: utf-8 -*-
"""Tests for cassettes with custom persistence"""
# External imports
import os
from urllib.request import urlopen
import pytest
# Internal imports
import vcr
from vcr.persisters.filesystem import FilesystemPersister
from vcr.persisters.filesystem import CassetteDecodeError, CassetteNotFoundError, FilesystemPersister
class CustomFilesystemPersister(object):
class CustomFilesystemPersister:
"""Behaves just like default FilesystemPersister but adds .test extension
to the cassette file"""
@@ -25,6 +26,19 @@ class CustomFilesystemPersister(object):
FilesystemPersister.save_cassette(cassette_path, cassette_dict, serializer)
class BadPersister(FilesystemPersister):
"""A bad persister that raises different errors."""
@staticmethod
def load_cassette(cassette_path, serializer):
if "nonexistent" in cassette_path:
raise CassetteNotFoundError()
elif "encoding" in cassette_path:
raise CassetteDecodeError()
else:
raise ValueError("buggy persister")
def test_save_cassette_with_custom_persister(tmpdir, httpbin):
"""Ensure you can save a cassette using custom persister"""
my_vcr = vcr.VCR()
@@ -53,3 +67,22 @@ def test_load_cassette_with_custom_persister(tmpdir, httpbin):
with my_vcr.use_cassette(test_fixture, serializer="json"):
response = urlopen(httpbin.url).read()
assert b"difficult sometimes" in response
def test_load_cassette_persister_exception_handling(tmpdir, httpbin):
"""
Ensure expected errors from persister are swallowed while unexpected ones
are passed up the call stack.
"""
my_vcr = vcr.VCR()
my_vcr.register_persister(BadPersister)
with my_vcr.use_cassette("bad/nonexistent") as cass:
assert len(cass) == 0
with my_vcr.use_cassette("bad/encoding") as cass:
assert len(cass) == 0
with pytest.raises(ValueError):
with my_vcr.use_cassette("bad/buggy") as cass:
pass

View File

@@ -1,6 +1,6 @@
"""Test requests' interaction with vcr"""
import pytest
from assertions import assert_cassette_empty, assert_is_json
from assertions import assert_cassette_empty, assert_is_json_bytes
import vcr
@@ -114,22 +114,6 @@ def test_post_chunked_binary(tmpdir, httpbin):
assert req1 == req2
@pytest.mark.skipif("sys.version_info >= (3, 6)", strict=True, raises=ConnectionError)
def test_post_chunked_binary_secure(tmpdir, httpbin_secure):
"""Ensure that we can send chunked binary without breaking while trying to concatenate bytes with str."""
data1 = iter([b"data", b"to", b"send"])
data2 = iter([b"data", b"to", b"send"])
url = httpbin_secure.url + "/post"
with vcr.use_cassette(str(tmpdir.join("requests.yaml"))):
req1 = requests.post(url, data1).content
print(req1)
with vcr.use_cassette(str(tmpdir.join("requests.yaml"))):
req2 = requests.post(url, data2).content
assert req1 == req2
def test_redirects(tmpdir, httpbin_both):
"""Ensure that we can handle redirects"""
url = httpbin_both + "/redirect-to?url=bytes/1024"
@@ -144,6 +128,17 @@ def test_redirects(tmpdir, httpbin_both):
assert cass.play_count == 2
def test_raw_stream(tmpdir, httpbin):
expected_response = requests.get(httpbin.url, stream=True)
expected_content = b"".join(expected_response.raw.stream())
for _ in range(2): # one for recording, one for cassette reply
with vcr.use_cassette(str(tmpdir.join("raw_stream.yaml"))):
actual_response = requests.get(httpbin.url, stream=True)
actual_content = b"".join(actual_response.raw.stream())
assert actual_content == expected_content
def test_cross_scheme(tmpdir, httpbin_secure, httpbin):
"""Ensure that requests between schemes are treated separately"""
# First fetch a url under http, and then again under https and then
@@ -156,20 +151,40 @@ def test_cross_scheme(tmpdir, httpbin_secure, httpbin):
assert len(cass) == 2
def test_gzip(tmpdir, httpbin_both):
def test_gzip__decode_compressed_response_false(tmpdir, httpbin_both):
"""
Ensure that requests (actually urllib3) is able to automatically decompress
the response body
"""
for _ in range(2): # one for recording, one for re-playing
with vcr.use_cassette(str(tmpdir.join("gzip.yaml"))):
response = requests.get(httpbin_both + "/gzip")
assert response.headers["content-encoding"] == "gzip" # i.e. not removed
assert_is_json_bytes(response.content) # i.e. uncompressed bytes
def test_gzip__decode_compressed_response_true(tmpdir, httpbin_both):
url = httpbin_both + "/gzip"
response = requests.get(url)
with vcr.use_cassette(str(tmpdir.join("gzip.yaml"))):
response = requests.get(url)
assert_is_json(response.content)
expected_response = requests.get(url)
expected_content = expected_response.content
assert expected_response.headers["content-encoding"] == "gzip" # self-test
with vcr.use_cassette(str(tmpdir.join("gzip.yaml"))):
assert_is_json(response.content)
with vcr.use_cassette(
str(tmpdir.join("decode_compressed.yaml")), decode_compressed_response=True
) as cassette:
r = requests.get(url)
assert r.headers["content-encoding"] == "gzip" # i.e. not removed
assert r.content == expected_content
# Has the cassette body been decompressed?
cassette_response_body = cassette.responses[0]["body"]["string"]
assert isinstance(cassette_response_body, str)
with vcr.use_cassette(str(tmpdir.join("decode_compressed.yaml")), decode_compressed_response=True):
r = requests.get(url)
assert "content-encoding" not in r.headers # i.e. removed
assert r.content == expected_content
def test_session_and_connection_close(tmpdir, httpbin):

View File

@@ -2,7 +2,7 @@ import http.client as httplib
import json
import zlib
from assertions import assert_is_json
from assertions import assert_is_json_bytes
import vcr
@@ -84,7 +84,7 @@ def test_original_decoded_response_is_not_modified(tmpdir, httpbin):
inside = conn.getresponse()
assert "content-encoding" not in inside.headers
assert_is_json(inside.read())
assert_is_json_bytes(inside.read())
def _make_before_record_response(fields, replacement="[REDACTED]"):

View File

@@ -1,10 +1,9 @@
# -*- coding: utf-8 -*-
"""Test requests' interaction with vcr"""
import json
import pytest
from assertions import assert_cassette_empty, assert_is_json
from assertions import assert_cassette_empty, assert_is_json_bytes
import vcr
from vcr.errors import CannotOverwriteExistingCassetteException
@@ -195,11 +194,11 @@ def test_gzip(get_client, tmpdir, scheme):
with vcr.use_cassette(str(tmpdir.join("gzip.yaml"))):
response = yield get(get_client(), url, **kwargs)
assert_is_json(response.body)
assert_is_json_bytes(response.body)
with vcr.use_cassette(str(tmpdir.join("gzip.yaml"))) as cass:
response = yield get(get_client(), url, **kwargs)
assert_is_json(response.body)
assert_is_json_bytes(response.body)
assert 1 == cass.play_count

View File

@@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
"""Integration tests with urllib2"""
import ssl
@@ -7,6 +6,7 @@ from urllib.request import urlopen
import pytest_httpbin.certs
from assertions import assert_cassette_has_one_response
from pytest import mark
# Internal imports
import vcr
@@ -56,6 +56,7 @@ def test_response_headers(httpbin_both, tmpdir):
assert sorted(open1) == sorted(open2)
@mark.online
def test_effective_url(tmpdir):
"""Ensure that the effective_url is captured"""
url = "http://mockbin.org/redirect/301"

View File

@@ -4,7 +4,7 @@
import pytest
import pytest_httpbin
from assertions import assert_cassette_empty, assert_is_json
from assertions import assert_cassette_empty, assert_is_json_bytes
import vcr
from vcr.patch import force_reset
@@ -97,6 +97,7 @@ def test_post(tmpdir, httpbin_both, verify_pool_mgr):
assert req1 == req2
@pytest.mark.online
def test_redirects(tmpdir, verify_pool_mgr):
"""Ensure that we can handle redirects"""
url = "http://mockbin.org/redirect/301"
@@ -135,10 +136,10 @@ def test_gzip(tmpdir, httpbin_both, verify_pool_mgr):
with vcr.use_cassette(str(tmpdir.join("gzip.yaml"))):
response = verify_pool_mgr.request("GET", url)
assert_is_json(response.data)
assert_is_json_bytes(response.data)
with vcr.use_cassette(str(tmpdir.join("gzip.yaml"))):
assert_is_json(response.data)
assert_is_json_bytes(response.data)
def test_https_with_cert_validation_disabled(tmpdir, httpbin_secure, pool_mgr):

View File

@@ -52,6 +52,7 @@ def test_flickr_multipart_upload(httpbin, tmpdir):
assert cass.play_count == 1
@pytest.mark.online
def test_flickr_should_respond_with_200(tmpdir):
testfile = str(tmpdir.join("flickr.yml"))
with vcr.use_cassette(testfile):
@@ -70,6 +71,7 @@ def test_cookies(tmpdir, httpbin):
assert sorted(r2.json()["cookies"].keys()) == ["k1", "k2"]
@pytest.mark.online
def test_amazon_doctype(tmpdir):
# amazon gzips its homepage. For some reason, in requests 2.7, it's not
# getting gunzipped.

View File

@@ -29,6 +29,19 @@ def test_cassette_load(tmpdir):
assert len(a_cassette) == 1
def test_cassette_load_nonexistent():
a_cassette = Cassette.load(path="something/nonexistent.yml")
assert len(a_cassette) == 0
def test_cassette_load_invalid_encoding(tmpdir):
a_file = tmpdir.join("invalid_encoding.yml")
with open(a_file, "wb") as fd:
fd.write(b"\xda")
a_cassette = Cassette.load(path=str(a_file))
assert len(a_cassette) == 0
def test_cassette_not_played():
a = Cassette("test")
assert not a.play_count

View File

@@ -298,6 +298,18 @@ def test_decode_response_deflate():
assert decoded_response["headers"]["content-length"] == [str(len(body))]
def test_decode_response_deflate_already_decompressed():
body = b"deflate message"
gzip_response = {
"body": {"string": body},
"headers": {
"content-encoding": ["deflate"],
},
}
decoded_response = decode_response(gzip_response)
assert decoded_response["body"]["string"] == body
def test_decode_response_gzip():
body = b"gzip message"
@@ -325,3 +337,15 @@ def test_decode_response_gzip():
decoded_response = decode_response(gzip_response)
assert decoded_response["body"]["string"] == body
assert decoded_response["headers"]["content-length"] == [str(len(body))]
def test_decode_response_gzip_already_decompressed():
body = b"gzip message"
gzip_response = {
"body": {"string": body},
"headers": {
"content-encoding": ["gzip"],
},
}
decoded_response = decode_response(gzip_response)
assert decoded_response["body"]["string"] == body

View File

@@ -17,9 +17,9 @@ def test_try_migrate_with_json(tmpdir):
cassette = tmpdir.join("cassette.json").strpath
shutil.copy("tests/fixtures/migration/old_cassette.json", cassette)
assert vcr.migration.try_migrate(cassette)
with open("tests/fixtures/migration/new_cassette.json", "r") as f:
with open("tests/fixtures/migration/new_cassette.json") as f:
expected_json = json.load(f)
with open(cassette, "r") as f:
with open(cassette) as f:
actual_json = json.load(f)
assert actual_json == expected_json
@@ -28,9 +28,9 @@ def test_try_migrate_with_yaml(tmpdir):
cassette = tmpdir.join("cassette.yaml").strpath
shutil.copy("tests/fixtures/migration/old_cassette.yaml", cassette)
assert vcr.migration.try_migrate(cassette)
with open("tests/fixtures/migration/new_cassette.yaml", "r") as f:
with open("tests/fixtures/migration/new_cassette.yaml") as f:
expected_yaml = yaml.load(f, Loader=Loader)
with open(cassette, "r") as f:
with open(cassette) as f:
actual_yaml = yaml.load(f, Loader=Loader)
assert actual_yaml == expected_yaml

View File

@@ -1,4 +1,3 @@
# coding: UTF-8
import io
from vcr.stubs import VCRHTTPResponse
@@ -93,7 +92,7 @@ def test_response_parses_correctly_and_fp_attribute_error_is_not_thrown():
},
}
vcr_response = VCRHTTPResponse(recorded_response)
handle = io.TextIOWrapper(io.BufferedReader(vcr_response), encoding="utf-8")
handle = io.TextIOWrapper(vcr_response, encoding="utf-8")
handle = iter(handle)
articles = [line for line in handle]
assert len(articles) > 1

View File

@@ -1,4 +1,3 @@
# -*- encoding: utf-8 -*-
from unittest import mock
import pytest
@@ -9,24 +8,24 @@ from vcr.serializers import compat, jsonserializer, yamlserializer
def test_deserialize_old_yaml_cassette():
with open("tests/fixtures/migration/old_cassette.yaml", "r") as f:
with open("tests/fixtures/migration/old_cassette.yaml") as f:
with pytest.raises(ValueError):
deserialize(f.read(), yamlserializer)
def test_deserialize_old_json_cassette():
with open("tests/fixtures/migration/old_cassette.json", "r") as f:
with open("tests/fixtures/migration/old_cassette.json") as f:
with pytest.raises(ValueError):
deserialize(f.read(), jsonserializer)
def test_deserialize_new_yaml_cassette():
with open("tests/fixtures/migration/new_cassette.yaml", "r") as f:
with open("tests/fixtures/migration/new_cassette.yaml") as f:
deserialize(f.read(), yamlserializer)
def test_deserialize_new_json_cassette():
with open("tests/fixtures/migration/new_cassette.json", "r") as f:
with open("tests/fixtures/migration/new_cassette.json") as f:
deserialize(f.read(), jsonserializer)

View File

@@ -1,5 +1,7 @@
from unittest import mock
from pytest import mark
from vcr import mode
from vcr.cassette import Cassette
from vcr.stubs import VCRHTTPSConnection
@@ -11,6 +13,7 @@ class TestVCRConnection:
vcr_connection.ssl_version = "example_ssl_version"
assert vcr_connection.real_connection.ssl_version == "example_ssl_version"
@mark.online
@mock.patch("vcr.cassette.Cassette.can_play_response_for", return_value=False)
def testing_connect(*args):
vcr_connection = VCRHTTPSConnection("www.google.com")

199
tests/unit/test_unittest.py Normal file
View File

@@ -0,0 +1,199 @@
import os
from unittest import TextTestRunner, defaultTestLoader
from unittest.mock import MagicMock
from urllib.request import urlopen
import pytest
from vcr.unittest import VCRTestCase
def test_defaults():
class MyTest(VCRTestCase):
def test_foo(self):
pass
test = run_testcase(MyTest)[0][0]
expected_path = os.path.join(os.path.dirname(__file__), "cassettes")
expected_name = "MyTest.test_foo.yaml"
assert os.path.dirname(test.cassette._path) == expected_path
assert os.path.basename(test.cassette._path) == expected_name
def test_disabled():
# Baseline vcr_enabled = True
class MyTest(VCRTestCase):
def test_foo(self):
pass
test = run_testcase(MyTest)[0][0]
assert hasattr(test, "cassette")
# Test vcr_enabled = False
class MyTest(VCRTestCase):
vcr_enabled = False
def test_foo(self):
pass
test = run_testcase(MyTest)[0][0]
assert not hasattr(test, "cassette")
def test_cassette_library_dir():
class MyTest(VCRTestCase):
def test_foo(self):
pass
def _get_cassette_library_dir(self):
return "/testing"
test = run_testcase(MyTest)[0][0]
assert test.cassette._path.startswith("/testing/")
def test_cassette_name():
class MyTest(VCRTestCase):
def test_foo(self):
pass
def _get_cassette_name(self):
return "my-custom-name"
test = run_testcase(MyTest)[0][0]
assert os.path.basename(test.cassette._path) == "my-custom-name"
def test_vcr_kwargs_overridden():
class MyTest(VCRTestCase):
def test_foo(self):
pass
def _get_vcr_kwargs(self):
kwargs = super()._get_vcr_kwargs()
kwargs["record_mode"] = "new_episodes"
return kwargs
test = run_testcase(MyTest)[0][0]
assert test.cassette.record_mode == "new_episodes"
def test_vcr_kwargs_passed():
class MyTest(VCRTestCase):
def test_foo(self):
pass
def _get_vcr_kwargs(self):
return super()._get_vcr_kwargs(
record_mode="new_episodes",
)
test = run_testcase(MyTest)[0][0]
assert test.cassette.record_mode == "new_episodes"
def test_vcr_kwargs_cassette_dir():
# Test that _get_cassette_library_dir applies if cassette_library_dir
# is absent from vcr kwargs.
class MyTest(VCRTestCase):
def test_foo(self):
pass
def _get_vcr_kwargs(self):
return dict(
record_mode="new_episodes",
)
_get_cassette_library_dir = MagicMock(return_value="/testing")
test = run_testcase(MyTest)[0][0]
assert test.cassette._path.startswith("/testing/")
assert test._get_cassette_library_dir.call_count == 1
# Test that _get_cassette_library_dir is ignored if cassette_library_dir
# is present in vcr kwargs.
class MyTest(VCRTestCase):
def test_foo(self):
pass
def _get_vcr_kwargs(self):
return dict(
cassette_library_dir="/testing",
)
_get_cassette_library_dir = MagicMock(return_value="/ignored")
test = run_testcase(MyTest)[0][0]
assert test.cassette._path.startswith("/testing/")
assert test._get_cassette_library_dir.call_count == 0
@pytest.mark.online
def test_get_vcr_with_matcher(tmpdir):
cassette_dir = tmpdir.mkdir("cassettes")
assert len(cassette_dir.listdir()) == 0
mock_matcher = MagicMock(return_value=True, __name__="MockMatcher")
class MyTest(VCRTestCase):
def test_foo(self):
self.response = urlopen("http://example.com").read()
def _get_vcr(self):
myvcr = super()._get_vcr()
myvcr.register_matcher("mymatcher", mock_matcher)
myvcr.match_on = ["mymatcher"]
return myvcr
def _get_cassette_library_dir(self):
return str(cassette_dir)
# First run to fill cassette.
test = run_testcase(MyTest)[0][0]
assert len(test.cassette.requests) == 1
assert not mock_matcher.called # nothing in cassette
# Second run to call matcher.
test = run_testcase(MyTest)[0][0]
assert len(test.cassette.requests) == 1
assert mock_matcher.called
assert (
repr(mock_matcher.mock_calls[0])
== "call(<Request (GET) http://example.com>, <Request (GET) http://example.com>)"
)
@pytest.mark.online
def test_testcase_playback(tmpdir):
cassette_dir = tmpdir.mkdir("cassettes")
assert len(cassette_dir.listdir()) == 0
# First test actually reads from the web.
class MyTest(VCRTestCase):
def test_foo(self):
self.response = urlopen("http://example.com").read()
def _get_cassette_library_dir(self):
return str(cassette_dir)
test = run_testcase(MyTest)[0][0]
assert b"illustrative examples" in test.response
assert len(test.cassette.requests) == 1
assert test.cassette.play_count == 0
# Second test reads from cassette.
test2 = run_testcase(MyTest)[0][0]
assert test.cassette is not test2.cassette
assert b"illustrative examples" in test.response
assert len(test2.cassette.requests) == 1
assert test2.cassette.play_count == 1
def run_testcase(testcase_class):
"""Run all the tests in a TestCase and return them."""
suite = defaultTestLoader.loadTestsFromTestCase(testcase_class)
tests = list(suite._tests)
result = TextTestRunner().run(suite)
return tests, result

18
tox.ini
View File

@@ -3,7 +3,7 @@ skip_missing_interpreters=true
envlist =
cov-clean,
lint,
{py37,py38,py39,py310,py311}-{requests-urllib3-1,httplib2,urllib3-1,tornado4,boto3,aiohttp,httpx},
{py38,py39,py310,py311}-{requests-urllib3-1,httplib2,urllib3-1,tornado4,boto3,aiohttp,httpx},
{py310,py311}-{requests-urllib3-2,urllib3-2},
{pypy3}-{requests-urllib3-1,httplib2,urllib3-1,tornado4,boto3},
{py310}-httpx019,
@@ -12,7 +12,6 @@ envlist =
[gh-actions]
python =
3.7: py37
3.8: py38
3.9: py39
3.10: py310, lint
@@ -66,9 +65,9 @@ deps =
# In other circumstances, we might want to generate a PDF or an ebook
commands =
sphinx-build -W -b html -d {envtmpdir}/doctrees . {envtmpdir}/html
# We use Python 3.7. Tox sometimes tries to autodetect it based on the name of
# We use Python 3.8. Tox sometimes tries to autodetect it based on the name of
# the testenv, but "docs" does not give useful clues so we have to be explicit.
basepython = python3.7
basepython = python3.8
[testenv]
# Need to use develop install so that paths
@@ -94,15 +93,14 @@ deps =
aiohttp: pytest-asyncio
aiohttp: pytest-aiohttp
httpx: httpx
{py37,py38,py39,py310}-{httpx}: httpx
{py37,py38,py39,py310}-{httpx}: pytest-asyncio
{py38,py39,py310}-{httpx}: httpx
{py38,py39,py310}-{httpx}: pytest-asyncio
httpx: httpx>0.19
# httpx==0.19 is the latest version that supports allow_redirects, newer versions use follow_redirects
httpx019: httpx==0.19
{py37,py38,py39,py310}-{httpx}: pytest-asyncio
{py38,py39,py310}-{httpx}: pytest-asyncio
depends =
lint,{py37,py38,py39,py310,py311,pypy3}-{requests-urllib3-1,httplib2,urllib3-1,tornado4,boto3},{py310,py311}-{requests-urllib3-2,urllib3-2},{py37,py38,py39,py310,py311}-{aiohttp},{py37,py38,py39,py310,py311}-{httpx}: cov-clean
cov-report: lint,{py37,py38,py39,py310,py311,pypy3}-{requests-urllib3-1,httplib2,urllib3-1,tornado4,boto3},{py310,py311}-{requests-urllib3-2,urllib3-2},{py37,py38,py39,py310,py311}-{aiohttp}
lint,{py38,py39,py310,py311,pypy3}-{requests-urllib3-1,httplib2,urllib3-1,tornado4,boto3},{py310,py311}-{requests-urllib3-2,urllib3-2},{py38,py39,py310,py311}-{aiohttp},{py38,py39,py310,py311}-{httpx}: cov-clean
cov-report: lint,{py38,py39,py310,py311,pypy3}-{requests-urllib3-1,httplib2,urllib3-1,tornado4,boto3},{py310,py311}-{requests-urllib3-2,urllib3-2},{py38,py39,py310,py311}-{aiohttp}
passenv =
AWS_ACCESS_KEY_ID
AWS_DEFAULT_REGION

View File

@@ -4,7 +4,7 @@ from logging import NullHandler
from .config import VCR
from .record_mode import RecordMode as mode # noqa import is not used in this file
__version__ = "4.3.1"
__version__ = "5.0.0"
logging.getLogger(__name__).addHandler(NullHandler())

View File

@@ -11,7 +11,7 @@ from ._handle_coroutine import handle_coroutine
from .errors import UnhandledHTTPRequestError
from .matchers import get_matchers_results, method, requests_match, uri
from .patch import CassettePatcherBuilder
from .persisters.filesystem import FilesystemPersister
from .persisters.filesystem import CassetteDecodeError, CassetteNotFoundError, FilesystemPersister
from .record_mode import RecordMode
from .serializers import yamlserializer
from .util import partition_dict
@@ -280,7 +280,7 @@ class Cassette:
return response
# The cassette doesn't contain the request asked for.
raise UnhandledHTTPRequestError(
"The cassette (%r) doesn't contain the request (%r) asked for" % (self._path, request)
f"The cassette ({self._path!r}) doesn't contain the request ({request!r}) asked for"
)
def responses_of(self, request):
@@ -295,7 +295,7 @@ class Cassette:
return responses
# The cassette doesn't contain the request asked for.
raise UnhandledHTTPRequestError(
"The cassette (%r) doesn't contain the request (%r) asked for" % (self._path, request)
f"The cassette ({self._path!r}) doesn't contain the request ({request!r}) asked for"
)
def rewind(self):
@@ -352,11 +352,11 @@ class Cassette:
self.append(request, response)
self.dirty = False
self.rewound = True
except ValueError:
except (CassetteDecodeError, CassetteNotFoundError):
pass
def __str__(self):
return "<Cassette containing {} recorded response(s)>".format(len(self))
return f"<Cassette containing {len(self)} recorded response(s)>"
def __len__(self):
"""Return the number of request,response pairs stored in here"""

View File

@@ -88,7 +88,7 @@ class VCR:
try:
serializer = self.serializers[serializer_name]
except KeyError:
raise KeyError("Serializer {} doesn't exist or isn't registered".format(serializer_name))
raise KeyError(f"Serializer {serializer_name} doesn't exist or isn't registered")
return serializer
def _get_matchers(self, matcher_names):
@@ -97,7 +97,7 @@ class VCR:
for m in matcher_names:
matchers.append(self.matchers[m])
except KeyError:
raise KeyError("Matcher {} doesn't exist or isn't registered".format(m))
raise KeyError(f"Matcher {m} doesn't exist or isn't registered")
return matchers
def use_cassette(self, path=None, **kwargs):
@@ -219,7 +219,7 @@ class VCR:
filter_functions.extend(before_record_request)
def before_record_request(request):
request = copy.copy(request)
request = copy.deepcopy(request)
for function in filter_functions:
if request is None:
break

View File

@@ -153,9 +153,15 @@ def decode_response(response):
if not body:
return ""
if encoding == "gzip":
return zlib.decompress(body, zlib.MAX_WBITS | 16)
try:
return zlib.decompress(body, zlib.MAX_WBITS | 16)
except zlib.error:
return body # assumes that the data was already decompressed
else: # encoding == 'deflate'
return zlib.decompress(body)
try:
return zlib.decompress(body)
except zlib.error:
return body # assumes that the data was already decompressed
# Deepcopy here in case `headers` contain objects that could
# be mutated by a shallow copy and corrupt the real response.

View File

@@ -9,35 +9,43 @@ log = logging.getLogger(__name__)
def method(r1, r2):
assert r1.method == r2.method, "{} != {}".format(r1.method, r2.method)
if r1.method != r2.method:
raise AssertionError(f"{r1.method} != {r2.method}")
def uri(r1, r2):
assert r1.uri == r2.uri, "{} != {}".format(r1.uri, r2.uri)
if r1.uri != r2.uri:
raise AssertionError(f"{r1.uri} != {r2.uri}")
def host(r1, r2):
assert r1.host == r2.host, "{} != {}".format(r1.host, r2.host)
if r1.host != r2.host:
raise AssertionError(f"{r1.host} != {r2.host}")
def scheme(r1, r2):
assert r1.scheme == r2.scheme, "{} != {}".format(r1.scheme, r2.scheme)
if r1.scheme != r2.scheme:
raise AssertionError(f"{r1.scheme} != {r2.scheme}")
def port(r1, r2):
assert r1.port == r2.port, "{} != {}".format(r1.port, r2.port)
if r1.port != r2.port:
raise AssertionError(f"{r1.port} != {r2.port}")
def path(r1, r2):
assert r1.path == r2.path, "{} != {}".format(r1.path, r2.path)
if r1.path != r2.path:
raise AssertionError(f"{r1.path} != {r2.path}")
def query(r1, r2):
assert r1.query == r2.query, "{} != {}".format(r1.query, r2.query)
if r1.query != r2.query:
raise AssertionError(f"{r1.query} != {r2.query}")
def raw_body(r1, r2):
assert read_body(r1) == read_body(r2)
if read_body(r1) != read_body(r2):
raise AssertionError
def body(r1, r2):
@@ -45,11 +53,13 @@ def body(r1, r2):
r2_transformer = _get_transformer(r2)
if transformer != r2_transformer:
transformer = _identity
assert transformer(read_body(r1)) == transformer(read_body(r2))
if transformer(read_body(r1)) != transformer(read_body(r2)):
raise AssertionError
def headers(r1, r2):
assert r1.headers == r2.headers, "{} != {}".format(r1.headers, r2.headers)
if r1.headers != r2.headers:
raise AssertionError(f"{r1.headers} != {r2.headers}")
def _header_checker(value, header="Content-Type"):
@@ -97,7 +107,7 @@ def _get_transformer(request):
def requests_match(r1, r2, matchers):
successes, failures = get_matchers_results(r1, r2, matchers)
if failures:
log.debug("Requests {} and {} differ.\n" "Failure details:\n" "{}".format(r1, r2, failures))
log.debug(f"Requests {r1} and {r2} differ.\nFailure details:\n{failures}")
return len(failures) == 0

View File

@@ -7,7 +7,7 @@ It merges and deletes the request obsolete keys (protocol, host, port, path)
into new 'uri' key.
Usage::
python -m vcr.migration PATH
python3 -m vcr.migration PATH
The PATH can be path to the directory with cassettes or cassette itself
"""
@@ -55,7 +55,7 @@ def build_uri(**parts):
port = parts["port"]
scheme = parts["protocol"]
default_port = {"https": 443, "http": 80}[scheme]
parts["port"] = ":{}".format(port) if port != default_port else ""
parts["port"] = f":{port}" if port != default_port else ""
return "{protocol}://{host}{port}{path}".format(**parts)
@@ -118,7 +118,7 @@ def migrate(file_path, migration_fn):
# because we assume that original files can be reverted
# we will try to copy the content. (os.rename not needed)
with tempfile.TemporaryFile(mode="w+") as out_fp:
with open(file_path, "r") as in_fp:
with open(file_path) as in_fp:
if not migration_fn(in_fp, out_fp):
return False
with open(file_path, "w") as in_fp:
@@ -138,7 +138,7 @@ def try_migrate(path):
def main():
if len(sys.argv) != 2:
raise SystemExit(
"Please provide path to cassettes directory or file. " "Usage: python -m vcr.migration PATH"
"Please provide path to cassettes directory or file. " "Usage: python3 -m vcr.migration PATH"
)
path = sys.argv[1]
@@ -150,7 +150,7 @@ def main():
for file_path in files:
migrated = try_migrate(file_path)
status = "OK" if migrated else "FAIL"
sys.stderr.write("[{}] {}\n".format(status, file_path))
sys.stderr.write(f"[{status}] {file_path}\n")
sys.stderr.write("Done.\n")

View File

@@ -186,9 +186,7 @@ class CassettePatcherBuilder:
bases = (base_class,)
if not issubclass(base_class, object): # Check for old style class
bases += (object,)
return type(
"{}{}".format(base_class.__name__, self._cassette._path), bases, dict(cassette=self._cassette)
)
return type(f"{base_class.__name__}{self._cassette._path}", bases, dict(cassette=self._cassette))
@_build_patchers_from_mock_triples_decorator
def _httplib(self):

View File

@@ -5,17 +5,25 @@ from pathlib import Path
from ..serialize import deserialize, serialize
class CassetteNotFoundError(FileNotFoundError):
pass
class CassetteDecodeError(ValueError):
pass
class FilesystemPersister:
@classmethod
def load_cassette(cls, cassette_path, serializer):
cassette_path = Path(cassette_path) # if cassette path is already Path this is no operation
if not cassette_path.is_file():
raise ValueError("Cassette not found.")
raise CassetteNotFoundError()
try:
with cassette_path.open() as f:
data = f.read()
except UnicodeEncodeError as err:
raise ValueError("Can't read Cassette, Encoding is broken") from err
except UnicodeDecodeError as err:
raise CassetteDecodeError("Can't read Cassette, Encoding is broken") from err
return deserialize(data, serializer)

View File

@@ -90,7 +90,7 @@ class Request:
return self.scheme
def __str__(self):
return "<Request ({}) {}>".format(self.method, self.uri)
return f"<Request ({self.method}) {self.uri}>"
def __repr__(self):
return self.__str__()

View File

@@ -93,6 +93,9 @@ class VCRHTTPResponse(HTTPResponse):
def read(self, *args, **kwargs):
return self._content.read(*args, **kwargs)
def read1(self, *args, **kwargs):
return self._content.read1(*args, **kwargs)
def readall(self):
return self._content.readall()
@@ -167,6 +170,13 @@ class VCRHTTPResponse(HTTPResponse):
def drain_conn(self):
pass
def stream(self, amt=65536, decode_content=None):
while True:
b = self._content.read(amt)
yield b
if not b:
break
class VCRConnection:
# A reference to the cassette that's currently being patched in
@@ -178,26 +188,26 @@ class VCRConnection:
"""
port = self.real_connection.port
default_port = {"https": 443, "http": 80}[self._protocol]
return ":{}".format(port) if port != default_port else ""
return f":{port}" if port != default_port else ""
def _uri(self, url):
"""Returns request absolute URI"""
if url and not url.startswith("/"):
# Then this must be a proxy request.
return url
uri = "{}://{}{}{}".format(self._protocol, self.real_connection.host, self._port_postfix(), url)
uri = f"{self._protocol}://{self.real_connection.host}{self._port_postfix()}{url}"
log.debug("Absolute URI: %s", uri)
return uri
def _url(self, uri):
"""Returns request selector url from absolute URI"""
prefix = "{}://{}{}".format(self._protocol, self.real_connection.host, self._port_postfix())
prefix = f"{self._protocol}://{self.real_connection.host}{self._port_postfix()}"
return uri.replace(prefix, "", 1)
def request(self, method, url, body=None, headers=None, *args, **kwargs):
"""Persist the request metadata in self._vcr_request"""
self._vcr_request = Request(method=method, uri=self._uri(url), body=body, headers=headers or {})
log.debug("Got {}".format(self._vcr_request))
log.debug(f"Got {self._vcr_request}")
# Note: The request may not actually be finished at this point, so
# I'm not sending the actual request until getresponse(). This
@@ -213,7 +223,7 @@ class VCRConnection:
of putheader() calls.
"""
self._vcr_request = Request(method=method, uri=self._uri(url), body="", headers={})
log.debug("Got {}".format(self._vcr_request))
log.debug(f"Got {self._vcr_request}")
def putheader(self, header, *values):
self._vcr_request.headers[header] = values
@@ -245,7 +255,7 @@ class VCRConnection:
# Check to see if the cassette has a response for this request. If so,
# then return it
if self.cassette.can_play_response_for(self._vcr_request):
log.info("Playing response for {} from cassette".format(self._vcr_request))
log.info(f"Playing response for {self._vcr_request} from cassette")
response = self.cassette.play_response(self._vcr_request)
return VCRHTTPResponse(response)
else:
@@ -257,7 +267,7 @@ class VCRConnection:
# Otherwise, we should send the request, then get the response
# and return it.
log.info("{} not in cassette, sending to real server".format(self._vcr_request))
log.info(f"{self._vcr_request} not in cassette, sending to real server")
# This is imported here to avoid circular import.
# TODO(@IvanMalison): Refactor to allow normal import.
from vcr.patch import force_reset

View File

@@ -261,7 +261,7 @@ def vcr_request(cassette, real_request):
vcr_request = Request(method, str(request_url), data, _serialize_headers(headers))
if cassette.can_play_response_for(vcr_request):
log.info("Playing response for {} from cassette".format(vcr_request))
log.info(f"Playing response for {vcr_request} from cassette")
response = play_responses(cassette, vcr_request, kwargs)
for redirect in response.history:
self._cookie_jar.update_cookies(redirect.cookies, redirect.url)

39
vcr/unittest.py Normal file
View File

@@ -0,0 +1,39 @@
import inspect
import os
import unittest
from .config import VCR
class VCRMixin:
"""A TestCase mixin that provides VCR integration."""
vcr_enabled = True
def setUp(self):
super().setUp()
if self.vcr_enabled:
kwargs = self._get_vcr_kwargs()
myvcr = self._get_vcr(**kwargs)
cm = myvcr.use_cassette(self._get_cassette_name())
self.cassette = cm.__enter__()
self.addCleanup(cm.__exit__, None, None, None)
def _get_vcr(self, **kwargs):
if "cassette_library_dir" not in kwargs:
kwargs["cassette_library_dir"] = self._get_cassette_library_dir()
return VCR(**kwargs)
def _get_vcr_kwargs(self, **kwargs):
return kwargs
def _get_cassette_library_dir(self):
testdir = os.path.dirname(inspect.getfile(self.__class__))
return os.path.join(testdir, "cassettes")
def _get_cassette_name(self):
return f"{self.__class__.__name__}.{self._testMethodName}.yaml"
class VCRTestCase(VCRMixin, unittest.TestCase):
pass

View File

@@ -1,9 +1,5 @@
import types
try:
from collections.abc import Mapping, MutableMapping
except ImportError:
from collections import Mapping, MutableMapping
from collections.abc import Mapping, MutableMapping
# Shamelessly stolen from https://github.com/kennethreitz/requests/blob/master/requests/structures.py