mirror of
https://github.com/kevin1024/vcrpy.git
synced 2025-12-09 17:15:35 +00:00
Compare commits
51 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
34d5384318 | ||
|
|
ad1010d0f8 | ||
|
|
d99593bcd3 | ||
|
|
8c03c37df4 | ||
|
|
b827cbe2da | ||
|
|
92ca5a102c | ||
|
|
d2281ab646 | ||
|
|
f21c8f0224 | ||
|
|
8b97fd6551 | ||
|
|
29e42211d7 | ||
|
|
6e511b67fd | ||
|
|
9b6cb1ce23 | ||
|
|
6a12bd1511 | ||
|
|
3411bedc06 | ||
|
|
438a65426b | ||
|
|
8c6b1fdf38 | ||
|
|
15e9f1868c | ||
|
|
7eb235cd9c | ||
|
|
d2f2731481 | ||
|
|
b2a895cb89 | ||
|
|
ffb2f44236 | ||
|
|
d66392a3fb | ||
|
|
b9cab239a7 | ||
|
|
276a41d9b6 | ||
|
|
7007e944ae | ||
|
|
bd112a2385 | ||
|
|
42848285a0 | ||
|
|
e3aae34ef7 | ||
|
|
f4316d2dae | ||
|
|
d613a814d3 | ||
|
|
ce234e503f | ||
|
|
3527d25ce8 | ||
|
|
dedb7ec403 | ||
|
|
59263d6025 | ||
|
|
2842cabec6 | ||
|
|
ad650a7ee1 | ||
|
|
9232915885 | ||
|
|
cbb540029f | ||
|
|
bf30d9a5e5 | ||
|
|
f06f71ece4 | ||
|
|
1070d417b3 | ||
|
|
46726a9a61 | ||
|
|
87db8e69ff | ||
|
|
52701ebca4 | ||
|
|
69679dc3fc | ||
|
|
c13f33b1e0 | ||
|
|
5476dd010c | ||
|
|
0add77d5ae | ||
|
|
96a6e91def | ||
|
|
3b41f0ede3 | ||
|
|
a79356cf5f |
22
.github/workflows/codespell.yml
vendored
Normal file
22
.github/workflows/codespell.yml
vendored
Normal file
@@ -0,0 +1,22 @@
|
||||
---
|
||||
name: Codespell
|
||||
|
||||
on:
|
||||
push:
|
||||
branches: [master]
|
||||
pull_request:
|
||||
branches: [master]
|
||||
|
||||
permissions:
|
||||
contents: read
|
||||
|
||||
jobs:
|
||||
codespell:
|
||||
name: Check for spelling errors
|
||||
runs-on: ubuntu-latest
|
||||
|
||||
steps:
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v3
|
||||
- name: Codespell
|
||||
uses: codespell-project/actions-codespell@v2
|
||||
18
.github/workflows/main.yml
vendored
18
.github/workflows/main.yml
vendored
@@ -13,7 +13,7 @@ jobs:
|
||||
strategy:
|
||||
fail-fast: false
|
||||
matrix:
|
||||
python-version: ["3.7", "3.8", "3.9", "3.10", "3.11", "pypy-3.8"]
|
||||
python-version: ["3.8", "3.9", "3.10", "3.11", "pypy-3.8"]
|
||||
|
||||
steps:
|
||||
- uses: actions/checkout@v3.5.2
|
||||
@@ -25,11 +25,19 @@ jobs:
|
||||
|
||||
- name: Install project dependencies
|
||||
run: |
|
||||
pip install --upgrade pip
|
||||
pip install codecov tox tox-gh-actions
|
||||
pip3 install --upgrade pip
|
||||
pip3 install codecov tox tox-gh-actions
|
||||
|
||||
- name: Run tests with tox
|
||||
run: tox
|
||||
- name: Run online tests with tox
|
||||
run: tox -- -m online
|
||||
|
||||
- name: Run offline tests with tox with no access to the Internet
|
||||
run: |
|
||||
# We're using unshare to take Internet access
|
||||
# away from tox so that we'll notice whenever some new test
|
||||
# is missing @pytest.mark.online decoration in the future
|
||||
unshare --map-root-user --net -- \
|
||||
sh -c 'ip link set lo up; tox -- -m "not online"'
|
||||
|
||||
- name: Run coverage
|
||||
run: codecov
|
||||
|
||||
@@ -71,7 +71,7 @@ Finally, register your class with VCR to use your new serializer.
|
||||
|
||||
import vcr
|
||||
|
||||
class BogoSerializer(object):
|
||||
class BogoSerializer:
|
||||
"""
|
||||
Must implement serialize() and deserialize() methods
|
||||
"""
|
||||
@@ -136,7 +136,8 @@ Create your own persistence class, see the example below:
|
||||
|
||||
Your custom persister must implement both ``load_cassette`` and ``save_cassette``
|
||||
methods. The ``load_cassette`` method must return a deserialized cassette or raise
|
||||
``ValueError`` if no cassette is found.
|
||||
either ``CassetteNotFoundError`` if no cassette is found, or ``CassetteDecodeError``
|
||||
if the cassette cannot be successfully deserialized.
|
||||
|
||||
Once the persister class is defined, register with VCR like so...
|
||||
|
||||
|
||||
@@ -7,6 +7,20 @@ For a full list of triaged issues, bugs and PRs and what release they are target
|
||||
|
||||
All help in providing PRs to close out bug issues is appreciated. Even if that is providing a repo that fully replicates issues. We have very generous contributors that have added these to bug issues which meant another contributor picked up the bug and closed it out.
|
||||
|
||||
- 5.0.0
|
||||
- BREAKING CHANGE: Drop support for Python 3.7. 3.7 is EOL as of 6/27/23 Thanks @jairhenrique
|
||||
- BREAKING CHANGE: Custom Cassette persisters no longer catch ValueError. If you have implemented a custom persister (has anyone implemented a custom persister? Let us know!) then you will need to throw a CassetteNotFoundError when unable to find a cassette. See #681 for discussion and reason for this change. Thanks @amosjyng for the PR and the review from @hartwork
|
||||
- 4.4.0
|
||||
- HUGE thanks to @hartwork for all the work done on this release!
|
||||
- Bring vcr/unittest in to vcrpy as a full feature of vcr instead of a separate library. Big thanks to @hartwork for doing this and to @agriffis for originally creating the library
|
||||
- Make decompression robust towards already decompressed input (thanks @hartwork)
|
||||
- Bugfix: Add read1 method (fixes compatibility with biopython), thanks @mghantous
|
||||
- Bugfix: Prevent filters from corrupting request (thanks @abramclark)
|
||||
- Bugfix: Add support for `response.raw.stream()` to fix urllib v2 compat
|
||||
- Bugfix: Replace `assert` with `raise AssertionError`: fixes support for `PYTHONOPTIMIZE=1`
|
||||
- Add pytest.mark.online to run test suite offline, thanks @jspricke
|
||||
- use python3 and pip3 binaries to ease debian packaging (thanks @hartwork)
|
||||
- Add codespell (thanks @mghantous)
|
||||
- 4.3.1
|
||||
- Support urllib3 v1 and v2. NOTE: there is an issue running urllib3 v2 on
|
||||
Python older than 3.10, so this is currently blocked in the requirements.
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
#
|
||||
# vcrpy documentation build configuration file, created by
|
||||
# sphinx-quickstart on Sun Sep 13 11:18:00 2015.
|
||||
|
||||
@@ -74,7 +74,7 @@ The PR reviewer is a second set of eyes to see if:
|
||||
**Release Manager:**
|
||||
- Ensure CI is passing.
|
||||
- Create a release on github and tag it with the changelog release notes.
|
||||
- ``python setup.py build sdist bdist_wheel``
|
||||
- ``python3 setup.py build sdist bdist_wheel``
|
||||
- ``twine upload dist/*``
|
||||
- Go to ReadTheDocs build page and trigger a build https://readthedocs.org/projects/vcrpy/builds/
|
||||
|
||||
@@ -96,11 +96,11 @@ The test suite is pretty big and slow, but you can tell tox to only run specific
|
||||
|
||||
tox -e {pyNN}-{HTTP_LIBRARY} -- <pytest flags passed through>
|
||||
|
||||
tox -e py37-requests -- -v -k "'test_status_code or test_gzip'"
|
||||
tox -e py37-requests -- -v --last-failed
|
||||
tox -e py38-requests -- -v -k "'test_status_code or test_gzip'"
|
||||
tox -e py38-requests -- -v --last-failed
|
||||
|
||||
This will run only tests that look like ``test_status_code`` or
|
||||
``test_gzip`` in the test suite, and only in the python 3.7 environment
|
||||
``test_gzip`` in the test suite, and only in the python 3.8 environment
|
||||
that has ``requests`` installed.
|
||||
|
||||
Also, in order for the boto tests to run, you will need an AWS key.
|
||||
@@ -127,20 +127,20 @@ in this example::
|
||||
eval "$(pyenv init -)"
|
||||
|
||||
# Setup your local system tox tooling
|
||||
pip install tox tox-pyenv
|
||||
pip3 install tox tox-pyenv
|
||||
|
||||
# Install supported versions (at time of writing), this does not activate them
|
||||
pyenv install 3.7.5 3.8.0 pypy3.8
|
||||
pyenv install 3.8.0 pypy3.8
|
||||
|
||||
# This activates them
|
||||
pyenv local 3.7.5 3.8.0 pypy3.8
|
||||
pyenv local 3.8.0 pypy3.8
|
||||
|
||||
# Run the whole test suite
|
||||
tox
|
||||
|
||||
# Run the whole test suite or just part of it
|
||||
tox -e lint
|
||||
tox -e py37-requests
|
||||
tox -e py38-requests
|
||||
|
||||
|
||||
Troubleshooting on MacOSX
|
||||
|
||||
@@ -4,12 +4,12 @@ Installation
|
||||
VCR.py is a package on `PyPI <https://pypi.python.org>`__, so you can install
|
||||
with pip::
|
||||
|
||||
pip install vcrpy
|
||||
pip3 install vcrpy
|
||||
|
||||
Compatibility
|
||||
-------------
|
||||
|
||||
VCR.py supports Python 3.7+, and `pypy <http://pypy.org>`__.
|
||||
VCR.py supports Python 3.8+, and `pypy <http://pypy.org>`__.
|
||||
|
||||
The following HTTP libraries are supported:
|
||||
|
||||
@@ -35,7 +35,7 @@ rebuilding pyyaml.
|
||||
|
||||
1. Test if pyyaml is built with libyaml. This should work::
|
||||
|
||||
python -c 'from yaml import CLoader'
|
||||
python3 -c 'from yaml import CLoader'
|
||||
|
||||
2. Install libyaml according to your Linux distribution, or using `Homebrew
|
||||
<http://mxcl.github.com/homebrew/>`__ on Mac::
|
||||
@@ -46,8 +46,8 @@ rebuilding pyyaml.
|
||||
|
||||
3. Rebuild pyyaml with libyaml::
|
||||
|
||||
pip uninstall pyyaml
|
||||
pip --no-cache-dir install pyyaml
|
||||
pip3 uninstall pyyaml
|
||||
pip3 --no-cache-dir install pyyaml
|
||||
|
||||
Upgrade
|
||||
-------
|
||||
@@ -61,7 +61,7 @@ is to simply delete your cassettes and re-record all of them. VCR.py
|
||||
also provides a migration script that attempts to upgrade your 0.x
|
||||
cassettes to the new 1.x format. To use it, run the following command::
|
||||
|
||||
python -m vcr.migration PATH
|
||||
python3 -m vcr.migration PATH
|
||||
|
||||
The PATH can be either a path to the directory with cassettes or the
|
||||
path to a single cassette.
|
||||
|
||||
@@ -4,11 +4,11 @@ Usage
|
||||
.. code:: python
|
||||
|
||||
import vcr
|
||||
import urllib
|
||||
import urllib.request
|
||||
|
||||
with vcr.use_cassette('fixtures/vcr_cassettes/synopsis.yaml'):
|
||||
response = urllib.request.urlopen('http://www.iana.org/domains/reserved').read()
|
||||
assert 'Example domains' in response
|
||||
assert b'Example domains' in response
|
||||
|
||||
Run this test once, and VCR.py will record the HTTP request to
|
||||
``fixtures/vcr_cassettes/synopsis.yaml``. Run it again, and VCR.py will
|
||||
@@ -26,7 +26,7 @@ look like this:
|
||||
@vcr.use_cassette('fixtures/vcr_cassettes/synopsis.yaml')
|
||||
def test_iana():
|
||||
response = urllib.request.urlopen('http://www.iana.org/domains/reserved').read()
|
||||
assert 'Example domains' in response
|
||||
assert b'Example domains' in response
|
||||
|
||||
When using the decorator version of ``use_cassette``, it is possible to
|
||||
omit the path to the cassette file.
|
||||
@@ -36,7 +36,7 @@ omit the path to the cassette file.
|
||||
@vcr.use_cassette()
|
||||
def test_iana():
|
||||
response = urllib.request.urlopen('http://www.iana.org/domains/reserved').read()
|
||||
assert 'Example domains' in response
|
||||
assert b'Example domains' in response
|
||||
|
||||
In this case, the cassette file will be given the same name as the test
|
||||
function, and it will be placed in the same directory as the file in
|
||||
@@ -92,9 +92,73 @@ all
|
||||
Unittest Integration
|
||||
--------------------
|
||||
|
||||
While it's possible to use the context manager or decorator forms with unittest,
|
||||
there's also a ``VCRTestCase`` provided separately by `vcrpy-unittest
|
||||
<https://github.com/agriffis/vcrpy-unittest>`__.
|
||||
Inherit from ``VCRTestCase`` for automatic recording and playback of HTTP
|
||||
interactions.
|
||||
|
||||
.. code:: python
|
||||
|
||||
from vcr.unittest import VCRTestCase
|
||||
import requests
|
||||
|
||||
class MyTestCase(VCRTestCase):
|
||||
def test_something(self):
|
||||
response = requests.get('http://example.com')
|
||||
|
||||
Similar to how VCR.py returns the cassette from the context manager,
|
||||
``VCRTestCase`` makes the cassette available as ``self.cassette``:
|
||||
|
||||
.. code:: python
|
||||
|
||||
self.assertEqual(len(self.cassette), 1)
|
||||
self.assertEqual(self.cassette.requests[0].uri, 'http://example.com')
|
||||
|
||||
By default cassettes will be placed in the ``cassettes`` subdirectory next to the
|
||||
test, named according to the test class and method. For example, the above test
|
||||
would read from and write to ``cassettes/MyTestCase.test_something.yaml``
|
||||
|
||||
The configuration can be modified by overriding methods on your subclass:
|
||||
``_get_vcr_kwargs``, ``_get_cassette_library_dir`` and ``_get_cassette_name``.
|
||||
To modify the ``VCR`` object after instantiation, for example to add a matcher,
|
||||
you can hook on ``_get_vcr``, for example:
|
||||
|
||||
.. code:: python
|
||||
|
||||
class MyTestCase(VCRTestCase):
|
||||
def _get_vcr(self, **kwargs):
|
||||
myvcr = super(MyTestCase, self)._get_vcr(**kwargs)
|
||||
myvcr.register_matcher('mymatcher', mymatcher)
|
||||
myvcr.match_on = ['mymatcher']
|
||||
return myvcr
|
||||
|
||||
See
|
||||
`the source
|
||||
<https://github.com/kevin1024/vcrpy/blob/master/vcr/unittest.py>`__
|
||||
for the default implementations of these methods.
|
||||
|
||||
If you implement a ``setUp`` method on your test class then make sure to call
|
||||
the parent version ``super().setUp()`` in your own in order to continue getting
|
||||
the cassettes produced.
|
||||
|
||||
VCRMixin
|
||||
~~~~~~~~
|
||||
|
||||
In case inheriting from ``VCRTestCase`` is difficult because of an existing
|
||||
class hierarchy containing tests in the base classes, inherit from ``VCRMixin``
|
||||
instead.
|
||||
|
||||
.. code:: python
|
||||
|
||||
from vcr.unittest import VCRMixin
|
||||
import requests
|
||||
import unittest
|
||||
|
||||
class MyTestMixin(VCRMixin):
|
||||
def test_something(self):
|
||||
response = requests.get(self.url)
|
||||
|
||||
class MyTestCase(MyTestMixin, unittest.TestCase):
|
||||
url = 'http://example.com'
|
||||
|
||||
|
||||
Pytest Integration
|
||||
------------------
|
||||
|
||||
@@ -7,3 +7,14 @@ known_first_party = "vcrpy"
|
||||
multi_line_output = 3
|
||||
use_parentheses = true
|
||||
include_trailing_comma = true
|
||||
|
||||
[tool.codespell]
|
||||
skip = '.git,*.pdf,*.svg,.tox'
|
||||
ignore-regex = "\\\\[fnrstv]"
|
||||
#
|
||||
# ignore-words-list = ''
|
||||
|
||||
[tool.pytest.ini_options]
|
||||
markers = [
|
||||
"online",
|
||||
]
|
||||
|
||||
@@ -4,4 +4,4 @@
|
||||
# If you are getting an INVOCATION ERROR for this script then there is
|
||||
# a good chance you are running on Windows.
|
||||
# You can and should use WSL for running tox on Windows when it calls bash scripts.
|
||||
REQUESTS_CA_BUNDLE=`python -m pytest_httpbin.certs` pytest $*
|
||||
REQUESTS_CA_BUNDLE=`python3 -m pytest_httpbin.certs` exec pytest "$@"
|
||||
|
||||
5
setup.py
5
setup.py
@@ -8,7 +8,7 @@ import sys
|
||||
from setuptools import find_packages, setup
|
||||
from setuptools.command.test import test as TestCommand
|
||||
|
||||
long_description = open("README.rst", "r").read()
|
||||
long_description = open("README.rst").read()
|
||||
here = os.path.abspath(os.path.dirname(__file__))
|
||||
|
||||
|
||||
@@ -85,7 +85,7 @@ setup(
|
||||
author_email="me@kevinmccarthy.org",
|
||||
url="https://github.com/kevin1024/vcrpy",
|
||||
packages=find_packages(exclude=["tests*"]),
|
||||
python_requires=">=3.7",
|
||||
python_requires=">=3.8",
|
||||
install_requires=install_requires,
|
||||
license="MIT",
|
||||
tests_require=tests_require,
|
||||
@@ -95,7 +95,6 @@ setup(
|
||||
"Intended Audience :: Developers",
|
||||
"Programming Language :: Python",
|
||||
"Programming Language :: Python :: 3",
|
||||
"Programming Language :: Python :: 3.7",
|
||||
"Programming Language :: Python :: 3.8",
|
||||
"Programming Language :: Python :: 3.9",
|
||||
"Programming Language :: Python :: 3.10",
|
||||
|
||||
@@ -11,9 +11,10 @@ def assert_cassette_has_one_response(cass):
|
||||
assert cass.play_count == 1
|
||||
|
||||
|
||||
def assert_is_json(a_string):
|
||||
def assert_is_json_bytes(b: bytes):
|
||||
assert isinstance(b, bytes)
|
||||
try:
|
||||
json.loads(a_string.decode("utf-8"))
|
||||
json.loads(b.decode("utf-8"))
|
||||
except Exception:
|
||||
assert False
|
||||
assert True
|
||||
|
||||
@@ -34,6 +34,7 @@ def post(url, output="text", **kwargs):
|
||||
return request("POST", url, output="text", **kwargs)
|
||||
|
||||
|
||||
@pytest.mark.online
|
||||
def test_status(tmpdir, mockbin_request_url):
|
||||
url = mockbin_request_url
|
||||
|
||||
@@ -46,6 +47,7 @@ def test_status(tmpdir, mockbin_request_url):
|
||||
assert cassette.play_count == 1
|
||||
|
||||
|
||||
@pytest.mark.online
|
||||
@pytest.mark.parametrize("auth", [None, aiohttp.BasicAuth("vcrpy", "test")])
|
||||
def test_headers(tmpdir, auth, mockbin_request_url):
|
||||
url = mockbin_request_url
|
||||
@@ -63,6 +65,7 @@ def test_headers(tmpdir, auth, mockbin_request_url):
|
||||
assert "yarl.URL" not in cassette.data[0]
|
||||
|
||||
|
||||
@pytest.mark.online
|
||||
def test_case_insensitive_headers(tmpdir, mockbin_request_url):
|
||||
url = mockbin_request_url
|
||||
|
||||
@@ -76,6 +79,7 @@ def test_case_insensitive_headers(tmpdir, mockbin_request_url):
|
||||
assert cassette.play_count == 1
|
||||
|
||||
|
||||
@pytest.mark.online
|
||||
def test_text(tmpdir, mockbin_request_url):
|
||||
url = mockbin_request_url
|
||||
|
||||
@@ -88,6 +92,7 @@ def test_text(tmpdir, mockbin_request_url):
|
||||
assert cassette.play_count == 1
|
||||
|
||||
|
||||
@pytest.mark.online
|
||||
def test_json(tmpdir, mockbin_request_url):
|
||||
url = mockbin_request_url
|
||||
headers = {"Content-Type": "application/json"}
|
||||
@@ -101,6 +106,7 @@ def test_json(tmpdir, mockbin_request_url):
|
||||
assert cassette.play_count == 1
|
||||
|
||||
|
||||
@pytest.mark.online
|
||||
def test_binary(tmpdir, mockbin_request_url):
|
||||
url = mockbin_request_url + "/image/png"
|
||||
with vcr.use_cassette(str(tmpdir.join("binary.yaml"))):
|
||||
@@ -112,6 +118,7 @@ def test_binary(tmpdir, mockbin_request_url):
|
||||
assert cassette.play_count == 1
|
||||
|
||||
|
||||
@pytest.mark.online
|
||||
def test_stream(tmpdir, mockbin_request_url):
|
||||
url = mockbin_request_url
|
||||
|
||||
@@ -124,6 +131,7 @@ def test_stream(tmpdir, mockbin_request_url):
|
||||
assert cassette.play_count == 1
|
||||
|
||||
|
||||
@pytest.mark.online
|
||||
@pytest.mark.parametrize("body", ["data", "json"])
|
||||
def test_post(tmpdir, body, caplog, mockbin_request_url):
|
||||
caplog.set_level(logging.INFO)
|
||||
@@ -143,12 +151,13 @@ def test_post(tmpdir, body, caplog, mockbin_request_url):
|
||||
(
|
||||
log
|
||||
for log in caplog.records
|
||||
if log.getMessage() == "<Request (POST) {}> not in cassette, sending to real server".format(url)
|
||||
if log.getMessage() == f"<Request (POST) {url}> not in cassette, sending to real server"
|
||||
),
|
||||
None,
|
||||
), "Log message not found."
|
||||
|
||||
|
||||
@pytest.mark.online
|
||||
def test_params(tmpdir, mockbin_request_url):
|
||||
url = mockbin_request_url + "?d=d"
|
||||
headers = {"Content-Type": "application/json"}
|
||||
@@ -164,6 +173,7 @@ def test_params(tmpdir, mockbin_request_url):
|
||||
assert cassette.play_count == 1
|
||||
|
||||
|
||||
@pytest.mark.online
|
||||
def test_params_same_url_distinct_params(tmpdir, mockbin_request_url):
|
||||
url = mockbin_request_url
|
||||
headers = {"Content-Type": "application/json"}
|
||||
@@ -183,6 +193,7 @@ def test_params_same_url_distinct_params(tmpdir, mockbin_request_url):
|
||||
get(url, output="text", params=other_params)
|
||||
|
||||
|
||||
@pytest.mark.online
|
||||
def test_params_on_url(tmpdir, mockbin_request_url):
|
||||
url = mockbin_request_url + "?a=1&b=foo"
|
||||
headers = {"Content-Type": "application/json"}
|
||||
@@ -248,6 +259,7 @@ def test_aiohttp_test_client_json(aiohttp_client, tmpdir):
|
||||
assert cassette.play_count == 1
|
||||
|
||||
|
||||
@pytest.mark.online
|
||||
def test_redirect(tmpdir, mockbin):
|
||||
url = mockbin + "/redirect/302/2"
|
||||
|
||||
@@ -272,6 +284,7 @@ def test_redirect(tmpdir, mockbin):
|
||||
assert cassette_response.request_info.real_url == response.request_info.real_url
|
||||
|
||||
|
||||
@pytest.mark.online
|
||||
def test_not_modified(tmpdir, mockbin):
|
||||
"""It doesn't try to redirect on 304"""
|
||||
url = mockbin + "/status/304"
|
||||
@@ -289,6 +302,7 @@ def test_not_modified(tmpdir, mockbin):
|
||||
assert cassette.play_count == 1
|
||||
|
||||
|
||||
@pytest.mark.online
|
||||
def test_double_requests(tmpdir, mockbin_request_url):
|
||||
"""We should capture, record, and replay all requests and response chains,
|
||||
even if there are duplicate ones.
|
||||
@@ -404,6 +418,7 @@ def test_cookies_redirect(httpbin_both, httpbin_ssl_context, tmpdir):
|
||||
run_in_loop(run)
|
||||
|
||||
|
||||
@pytest.mark.online
|
||||
def test_not_allow_redirects(tmpdir, mockbin):
|
||||
url = mockbin + "/redirect/308/5"
|
||||
path = str(tmpdir.join("redirects.yaml"))
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""Basic tests for cassettes"""
|
||||
|
||||
# External imports
|
||||
|
||||
@@ -20,12 +20,12 @@ except ImportError:
|
||||
# https://github.com/boto/botocore/pull/1495
|
||||
boto3_skip_vendored_requests = pytest.mark.skipif(
|
||||
botocore_awsrequest,
|
||||
reason="botocore version {ver} does not use vendored requests anymore.".format(ver=botocore.__version__),
|
||||
reason=f"botocore version {botocore.__version__} does not use vendored requests anymore.",
|
||||
)
|
||||
|
||||
boto3_skip_awsrequest = pytest.mark.skipif(
|
||||
not botocore_awsrequest,
|
||||
reason="botocore version {ver} still uses vendored requests.".format(ver=botocore.__version__),
|
||||
reason=f"botocore version {botocore.__version__} still uses vendored requests.",
|
||||
)
|
||||
|
||||
IAM_USER_NAME = "vcrpy"
|
||||
|
||||
@@ -7,6 +7,7 @@ import pytest
|
||||
import vcr
|
||||
|
||||
|
||||
@pytest.mark.online
|
||||
def test_set_serializer_default_config(tmpdir, mockbin_request_url):
|
||||
my_vcr = vcr.VCR(serializer="json")
|
||||
|
||||
@@ -20,6 +21,7 @@ def test_set_serializer_default_config(tmpdir, mockbin_request_url):
|
||||
assert json.loads(file_content)
|
||||
|
||||
|
||||
@pytest.mark.online
|
||||
def test_default_set_cassette_library_dir(tmpdir, mockbin_request_url):
|
||||
my_vcr = vcr.VCR(cassette_library_dir=str(tmpdir.join("subdir")))
|
||||
|
||||
@@ -29,6 +31,7 @@ def test_default_set_cassette_library_dir(tmpdir, mockbin_request_url):
|
||||
assert os.path.exists(str(tmpdir.join("subdir").join("test.json")))
|
||||
|
||||
|
||||
@pytest.mark.online
|
||||
def test_override_set_cassette_library_dir(tmpdir, mockbin_request_url):
|
||||
my_vcr = vcr.VCR(cassette_library_dir=str(tmpdir.join("subdir")))
|
||||
|
||||
@@ -41,6 +44,7 @@ def test_override_set_cassette_library_dir(tmpdir, mockbin_request_url):
|
||||
assert not os.path.exists(str(tmpdir.join("subdir").join("test.json")))
|
||||
|
||||
|
||||
@pytest.mark.online
|
||||
def test_override_match_on(tmpdir, mockbin_request_url):
|
||||
my_vcr = vcr.VCR(match_on=["method"])
|
||||
|
||||
@@ -62,6 +66,7 @@ def test_missing_matcher():
|
||||
pass
|
||||
|
||||
|
||||
@pytest.mark.online
|
||||
def test_dont_record_on_exception(tmpdir, mockbin_request_url):
|
||||
my_vcr = vcr.VCR(record_on_exception=False)
|
||||
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""Basic tests about save behavior"""
|
||||
|
||||
# External imports
|
||||
@@ -6,10 +5,13 @@ import os
|
||||
import time
|
||||
from urllib.request import urlopen
|
||||
|
||||
import pytest
|
||||
|
||||
# Internal imports
|
||||
import vcr
|
||||
|
||||
|
||||
@pytest.mark.online
|
||||
def test_disk_saver_nowrite(tmpdir, mockbin_request_url):
|
||||
"""
|
||||
Ensure that when you close a cassette without changing it it doesn't
|
||||
@@ -30,6 +32,7 @@ def test_disk_saver_nowrite(tmpdir, mockbin_request_url):
|
||||
assert last_mod == last_mod2
|
||||
|
||||
|
||||
@pytest.mark.online
|
||||
def test_disk_saver_write(tmpdir, mockbin_request_url):
|
||||
"""
|
||||
Ensure that when you close a cassette after changing it it does
|
||||
|
||||
@@ -5,7 +5,7 @@ from urllib.parse import urlencode
|
||||
from urllib.request import Request, urlopen
|
||||
|
||||
import pytest
|
||||
from assertions import assert_cassette_has_one_response, assert_is_json
|
||||
from assertions import assert_cassette_has_one_response, assert_is_json_bytes
|
||||
|
||||
import vcr
|
||||
|
||||
@@ -105,7 +105,7 @@ def test_decompress_gzip(tmpdir, httpbin):
|
||||
with vcr.use_cassette(cass_file) as cass:
|
||||
decoded_response = urlopen(url).read()
|
||||
assert_cassette_has_one_response(cass)
|
||||
assert_is_json(decoded_response)
|
||||
assert_is_json_bytes(decoded_response)
|
||||
|
||||
|
||||
def test_decomptess_empty_body(tmpdir, httpbin):
|
||||
@@ -129,7 +129,7 @@ def test_decompress_deflate(tmpdir, httpbin):
|
||||
with vcr.use_cassette(cass_file) as cass:
|
||||
decoded_response = urlopen(url).read()
|
||||
assert_cassette_has_one_response(cass)
|
||||
assert_is_json(decoded_response)
|
||||
assert_is_json_bytes(decoded_response)
|
||||
|
||||
|
||||
def test_decompress_regular(tmpdir, httpbin):
|
||||
@@ -141,4 +141,25 @@ def test_decompress_regular(tmpdir, httpbin):
|
||||
with vcr.use_cassette(cass_file) as cass:
|
||||
resp = urlopen(url).read()
|
||||
assert_cassette_has_one_response(cass)
|
||||
assert_is_json(resp)
|
||||
assert_is_json_bytes(resp)
|
||||
|
||||
|
||||
def test_before_record_request_corruption(tmpdir, httpbin):
|
||||
"""Modifying request in before_record_request should not affect outgoing request"""
|
||||
|
||||
def before_record(request):
|
||||
request.headers.clear()
|
||||
request.body = b""
|
||||
return request
|
||||
|
||||
req = Request(
|
||||
httpbin.url + "/post",
|
||||
data=urlencode({"test": "exists"}).encode(),
|
||||
headers={"X-Test": "exists"},
|
||||
)
|
||||
cass_file = str(tmpdir.join("modified_response.yaml"))
|
||||
with vcr.use_cassette(cass_file, before_record_request=before_record):
|
||||
resp = json.loads(urlopen(req).read())
|
||||
|
||||
assert resp["headers"]["X-Test"] == "exists"
|
||||
assert resp["form"]["test"] == "exists"
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""Integration tests with httplib2"""
|
||||
from urllib.parse import urlencode
|
||||
|
||||
@@ -56,6 +55,7 @@ def test_response_headers(tmpdir, httpbin_both):
|
||||
assert set(headers) == set(resp.items())
|
||||
|
||||
|
||||
@pytest.mark.online
|
||||
def test_effective_url(tmpdir):
|
||||
"""Ensure that the effective_url is captured"""
|
||||
url = "http://mockbin.org/redirect/301"
|
||||
|
||||
@@ -87,6 +87,7 @@ def yml(tmpdir, request):
|
||||
return str(tmpdir.join(request.function.__name__ + ".yaml"))
|
||||
|
||||
|
||||
@pytest.mark.online
|
||||
def test_status(tmpdir, mockbin, do_request):
|
||||
url = mockbin
|
||||
|
||||
@@ -99,6 +100,7 @@ def test_status(tmpdir, mockbin, do_request):
|
||||
assert cassette.play_count == 1
|
||||
|
||||
|
||||
@pytest.mark.online
|
||||
def test_case_insensitive_headers(tmpdir, mockbin, do_request):
|
||||
url = mockbin
|
||||
|
||||
@@ -112,6 +114,7 @@ def test_case_insensitive_headers(tmpdir, mockbin, do_request):
|
||||
assert cassette.play_count == 1
|
||||
|
||||
|
||||
@pytest.mark.online
|
||||
def test_content(tmpdir, mockbin, do_request):
|
||||
url = mockbin
|
||||
|
||||
@@ -124,6 +127,7 @@ def test_content(tmpdir, mockbin, do_request):
|
||||
assert cassette.play_count == 1
|
||||
|
||||
|
||||
@pytest.mark.online
|
||||
def test_json(tmpdir, mockbin, do_request):
|
||||
url = mockbin + "/request"
|
||||
|
||||
@@ -138,6 +142,7 @@ def test_json(tmpdir, mockbin, do_request):
|
||||
assert cassette.play_count == 1
|
||||
|
||||
|
||||
@pytest.mark.online
|
||||
def test_params_same_url_distinct_params(tmpdir, mockbin, do_request):
|
||||
url = mockbin + "/request"
|
||||
headers = {"Content-Type": "application/json"}
|
||||
@@ -158,6 +163,7 @@ def test_params_same_url_distinct_params(tmpdir, mockbin, do_request):
|
||||
do_request()("GET", url, params=params, headers=headers)
|
||||
|
||||
|
||||
@pytest.mark.online
|
||||
def test_redirect(mockbin, yml, do_request):
|
||||
url = mockbin + "/redirect/303/2"
|
||||
|
||||
@@ -184,6 +190,7 @@ def test_redirect(mockbin, yml, do_request):
|
||||
}
|
||||
|
||||
|
||||
@pytest.mark.online
|
||||
def test_work_with_gzipped_data(mockbin, do_request, yml):
|
||||
url = mockbin + "/gzip?foo=bar"
|
||||
headers = {"accept-encoding": "deflate, gzip"}
|
||||
@@ -199,6 +206,7 @@ def test_work_with_gzipped_data(mockbin, do_request, yml):
|
||||
assert cassette.play_count == 1
|
||||
|
||||
|
||||
@pytest.mark.online
|
||||
@pytest.mark.parametrize("url", ["https://github.com/kevin1024/vcrpy/issues/" + str(i) for i in range(3, 6)])
|
||||
def test_simple_fetching(do_request, yml, url):
|
||||
with vcr.use_cassette(yml):
|
||||
@@ -231,6 +239,7 @@ def test_behind_proxy(do_request):
|
||||
assert cassette_response.request.url == response.request.url
|
||||
|
||||
|
||||
@pytest.mark.online
|
||||
def test_cookies(tmpdir, mockbin, do_request):
|
||||
def client_cookies(client):
|
||||
return [c for c in client.client.cookies]
|
||||
@@ -268,6 +277,7 @@ def test_cookies(tmpdir, mockbin, do_request):
|
||||
assert client_cookies(new_client) == ["k1", "k2"]
|
||||
|
||||
|
||||
@pytest.mark.online
|
||||
def test_relative_redirects(tmpdir, scheme, do_request, mockbin):
|
||||
redirect_kwargs = {HTTPX_REDIRECT_PARAM.name: True}
|
||||
|
||||
@@ -286,6 +296,7 @@ def test_relative_redirects(tmpdir, scheme, do_request, mockbin):
|
||||
assert cassette.play_count == 3
|
||||
|
||||
|
||||
@pytest.mark.online
|
||||
def test_redirect_wo_allow_redirects(do_request, mockbin, yml):
|
||||
url = mockbin + "/redirect/308/5"
|
||||
|
||||
|
||||
@@ -28,9 +28,9 @@ def test_ignore_localhost(tmpdir, httpbin):
|
||||
with overridden_dns({"httpbin.org": "127.0.0.1"}):
|
||||
cass_file = str(tmpdir.join("filter_qs.yaml"))
|
||||
with vcr.use_cassette(cass_file, ignore_localhost=True) as cass:
|
||||
urlopen("http://localhost:{}/".format(httpbin.port))
|
||||
urlopen(f"http://localhost:{httpbin.port}/")
|
||||
assert len(cass) == 0
|
||||
urlopen("http://httpbin.org:{}/".format(httpbin.port))
|
||||
urlopen(f"http://httpbin.org:{httpbin.port}/")
|
||||
assert len(cass) == 1
|
||||
|
||||
|
||||
@@ -38,9 +38,9 @@ def test_ignore_httpbin(tmpdir, httpbin):
|
||||
with overridden_dns({"httpbin.org": "127.0.0.1"}):
|
||||
cass_file = str(tmpdir.join("filter_qs.yaml"))
|
||||
with vcr.use_cassette(cass_file, ignore_hosts=["httpbin.org"]) as cass:
|
||||
urlopen("http://httpbin.org:{}/".format(httpbin.port))
|
||||
urlopen(f"http://httpbin.org:{httpbin.port}/")
|
||||
assert len(cass) == 0
|
||||
urlopen("http://localhost:{}/".format(httpbin.port))
|
||||
urlopen(f"http://localhost:{httpbin.port}/")
|
||||
assert len(cass) == 1
|
||||
|
||||
|
||||
@@ -48,8 +48,8 @@ def test_ignore_localhost_and_httpbin(tmpdir, httpbin):
|
||||
with overridden_dns({"httpbin.org": "127.0.0.1"}):
|
||||
cass_file = str(tmpdir.join("filter_qs.yaml"))
|
||||
with vcr.use_cassette(cass_file, ignore_hosts=["httpbin.org"], ignore_localhost=True) as cass:
|
||||
urlopen("http://httpbin.org:{}".format(httpbin.port))
|
||||
urlopen("http://localhost:{}".format(httpbin.port))
|
||||
urlopen(f"http://httpbin.org:{httpbin.port}")
|
||||
urlopen(f"http://localhost:{httpbin.port}")
|
||||
assert len(cass) == 0
|
||||
|
||||
|
||||
@@ -57,12 +57,12 @@ def test_ignore_localhost_twice(tmpdir, httpbin):
|
||||
with overridden_dns({"httpbin.org": "127.0.0.1"}):
|
||||
cass_file = str(tmpdir.join("filter_qs.yaml"))
|
||||
with vcr.use_cassette(cass_file, ignore_localhost=True) as cass:
|
||||
urlopen("http://localhost:{}".format(httpbin.port))
|
||||
urlopen(f"http://localhost:{httpbin.port}")
|
||||
assert len(cass) == 0
|
||||
urlopen("http://httpbin.org:{}".format(httpbin.port))
|
||||
urlopen(f"http://httpbin.org:{httpbin.port}")
|
||||
assert len(cass) == 1
|
||||
with vcr.use_cassette(cass_file, ignore_localhost=True) as cass:
|
||||
assert len(cass) == 1
|
||||
urlopen("http://localhost:{}".format(httpbin.port))
|
||||
urlopen("http://httpbin.org:{}".format(httpbin.port))
|
||||
urlopen(f"http://localhost:{httpbin.port}")
|
||||
urlopen(f"http://httpbin.org:{httpbin.port}")
|
||||
assert len(cass) == 1
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""Test using a proxy."""
|
||||
|
||||
import http.server
|
||||
|
||||
@@ -1,5 +1,7 @@
|
||||
from urllib.request import urlopen
|
||||
|
||||
import pytest
|
||||
|
||||
import vcr
|
||||
|
||||
|
||||
@@ -11,6 +13,7 @@ def false_matcher(r1, r2):
|
||||
return False
|
||||
|
||||
|
||||
@pytest.mark.online
|
||||
def test_registered_true_matcher(tmpdir, mockbin_request_url):
|
||||
my_vcr = vcr.VCR()
|
||||
my_vcr.register_matcher("true", true_matcher)
|
||||
@@ -26,6 +29,7 @@ def test_registered_true_matcher(tmpdir, mockbin_request_url):
|
||||
urlopen(mockbin_request_url)
|
||||
|
||||
|
||||
@pytest.mark.online
|
||||
def test_registered_false_matcher(tmpdir, mockbin_request_url):
|
||||
my_vcr = vcr.VCR()
|
||||
my_vcr.register_matcher("false", false_matcher)
|
||||
|
||||
@@ -1,16 +1,17 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""Tests for cassettes with custom persistence"""
|
||||
|
||||
# External imports
|
||||
import os
|
||||
from urllib.request import urlopen
|
||||
|
||||
import pytest
|
||||
|
||||
# Internal imports
|
||||
import vcr
|
||||
from vcr.persisters.filesystem import FilesystemPersister
|
||||
from vcr.persisters.filesystem import CassetteDecodeError, CassetteNotFoundError, FilesystemPersister
|
||||
|
||||
|
||||
class CustomFilesystemPersister(object):
|
||||
class CustomFilesystemPersister:
|
||||
"""Behaves just like default FilesystemPersister but adds .test extension
|
||||
to the cassette file"""
|
||||
|
||||
@@ -25,6 +26,19 @@ class CustomFilesystemPersister(object):
|
||||
FilesystemPersister.save_cassette(cassette_path, cassette_dict, serializer)
|
||||
|
||||
|
||||
class BadPersister(FilesystemPersister):
|
||||
"""A bad persister that raises different errors."""
|
||||
|
||||
@staticmethod
|
||||
def load_cassette(cassette_path, serializer):
|
||||
if "nonexistent" in cassette_path:
|
||||
raise CassetteNotFoundError()
|
||||
elif "encoding" in cassette_path:
|
||||
raise CassetteDecodeError()
|
||||
else:
|
||||
raise ValueError("buggy persister")
|
||||
|
||||
|
||||
def test_save_cassette_with_custom_persister(tmpdir, httpbin):
|
||||
"""Ensure you can save a cassette using custom persister"""
|
||||
my_vcr = vcr.VCR()
|
||||
@@ -53,3 +67,22 @@ def test_load_cassette_with_custom_persister(tmpdir, httpbin):
|
||||
with my_vcr.use_cassette(test_fixture, serializer="json"):
|
||||
response = urlopen(httpbin.url).read()
|
||||
assert b"difficult sometimes" in response
|
||||
|
||||
|
||||
def test_load_cassette_persister_exception_handling(tmpdir, httpbin):
|
||||
"""
|
||||
Ensure expected errors from persister are swallowed while unexpected ones
|
||||
are passed up the call stack.
|
||||
"""
|
||||
my_vcr = vcr.VCR()
|
||||
my_vcr.register_persister(BadPersister)
|
||||
|
||||
with my_vcr.use_cassette("bad/nonexistent") as cass:
|
||||
assert len(cass) == 0
|
||||
|
||||
with my_vcr.use_cassette("bad/encoding") as cass:
|
||||
assert len(cass) == 0
|
||||
|
||||
with pytest.raises(ValueError):
|
||||
with my_vcr.use_cassette("bad/buggy") as cass:
|
||||
pass
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
"""Test requests' interaction with vcr"""
|
||||
import pytest
|
||||
from assertions import assert_cassette_empty, assert_is_json
|
||||
from assertions import assert_cassette_empty, assert_is_json_bytes
|
||||
|
||||
import vcr
|
||||
|
||||
@@ -114,22 +114,6 @@ def test_post_chunked_binary(tmpdir, httpbin):
|
||||
assert req1 == req2
|
||||
|
||||
|
||||
@pytest.mark.skipif("sys.version_info >= (3, 6)", strict=True, raises=ConnectionError)
|
||||
def test_post_chunked_binary_secure(tmpdir, httpbin_secure):
|
||||
"""Ensure that we can send chunked binary without breaking while trying to concatenate bytes with str."""
|
||||
data1 = iter([b"data", b"to", b"send"])
|
||||
data2 = iter([b"data", b"to", b"send"])
|
||||
url = httpbin_secure.url + "/post"
|
||||
with vcr.use_cassette(str(tmpdir.join("requests.yaml"))):
|
||||
req1 = requests.post(url, data1).content
|
||||
print(req1)
|
||||
|
||||
with vcr.use_cassette(str(tmpdir.join("requests.yaml"))):
|
||||
req2 = requests.post(url, data2).content
|
||||
|
||||
assert req1 == req2
|
||||
|
||||
|
||||
def test_redirects(tmpdir, httpbin_both):
|
||||
"""Ensure that we can handle redirects"""
|
||||
url = httpbin_both + "/redirect-to?url=bytes/1024"
|
||||
@@ -144,6 +128,17 @@ def test_redirects(tmpdir, httpbin_both):
|
||||
assert cass.play_count == 2
|
||||
|
||||
|
||||
def test_raw_stream(tmpdir, httpbin):
|
||||
expected_response = requests.get(httpbin.url, stream=True)
|
||||
expected_content = b"".join(expected_response.raw.stream())
|
||||
|
||||
for _ in range(2): # one for recording, one for cassette reply
|
||||
with vcr.use_cassette(str(tmpdir.join("raw_stream.yaml"))):
|
||||
actual_response = requests.get(httpbin.url, stream=True)
|
||||
actual_content = b"".join(actual_response.raw.stream())
|
||||
assert actual_content == expected_content
|
||||
|
||||
|
||||
def test_cross_scheme(tmpdir, httpbin_secure, httpbin):
|
||||
"""Ensure that requests between schemes are treated separately"""
|
||||
# First fetch a url under http, and then again under https and then
|
||||
@@ -156,20 +151,40 @@ def test_cross_scheme(tmpdir, httpbin_secure, httpbin):
|
||||
assert len(cass) == 2
|
||||
|
||||
|
||||
def test_gzip(tmpdir, httpbin_both):
|
||||
def test_gzip__decode_compressed_response_false(tmpdir, httpbin_both):
|
||||
"""
|
||||
Ensure that requests (actually urllib3) is able to automatically decompress
|
||||
the response body
|
||||
"""
|
||||
for _ in range(2): # one for recording, one for re-playing
|
||||
with vcr.use_cassette(str(tmpdir.join("gzip.yaml"))):
|
||||
response = requests.get(httpbin_both + "/gzip")
|
||||
assert response.headers["content-encoding"] == "gzip" # i.e. not removed
|
||||
assert_is_json_bytes(response.content) # i.e. uncompressed bytes
|
||||
|
||||
|
||||
def test_gzip__decode_compressed_response_true(tmpdir, httpbin_both):
|
||||
url = httpbin_both + "/gzip"
|
||||
response = requests.get(url)
|
||||
|
||||
with vcr.use_cassette(str(tmpdir.join("gzip.yaml"))):
|
||||
response = requests.get(url)
|
||||
assert_is_json(response.content)
|
||||
expected_response = requests.get(url)
|
||||
expected_content = expected_response.content
|
||||
assert expected_response.headers["content-encoding"] == "gzip" # self-test
|
||||
|
||||
with vcr.use_cassette(str(tmpdir.join("gzip.yaml"))):
|
||||
assert_is_json(response.content)
|
||||
with vcr.use_cassette(
|
||||
str(tmpdir.join("decode_compressed.yaml")), decode_compressed_response=True
|
||||
) as cassette:
|
||||
r = requests.get(url)
|
||||
assert r.headers["content-encoding"] == "gzip" # i.e. not removed
|
||||
assert r.content == expected_content
|
||||
|
||||
# Has the cassette body been decompressed?
|
||||
cassette_response_body = cassette.responses[0]["body"]["string"]
|
||||
assert isinstance(cassette_response_body, str)
|
||||
|
||||
with vcr.use_cassette(str(tmpdir.join("decode_compressed.yaml")), decode_compressed_response=True):
|
||||
r = requests.get(url)
|
||||
assert "content-encoding" not in r.headers # i.e. removed
|
||||
assert r.content == expected_content
|
||||
|
||||
|
||||
def test_session_and_connection_close(tmpdir, httpbin):
|
||||
|
||||
@@ -2,7 +2,7 @@ import http.client as httplib
|
||||
import json
|
||||
import zlib
|
||||
|
||||
from assertions import assert_is_json
|
||||
from assertions import assert_is_json_bytes
|
||||
|
||||
import vcr
|
||||
|
||||
@@ -84,7 +84,7 @@ def test_original_decoded_response_is_not_modified(tmpdir, httpbin):
|
||||
inside = conn.getresponse()
|
||||
|
||||
assert "content-encoding" not in inside.headers
|
||||
assert_is_json(inside.read())
|
||||
assert_is_json_bytes(inside.read())
|
||||
|
||||
|
||||
def _make_before_record_response(fields, replacement="[REDACTED]"):
|
||||
|
||||
@@ -1,10 +1,9 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""Test requests' interaction with vcr"""
|
||||
|
||||
import json
|
||||
|
||||
import pytest
|
||||
from assertions import assert_cassette_empty, assert_is_json
|
||||
from assertions import assert_cassette_empty, assert_is_json_bytes
|
||||
|
||||
import vcr
|
||||
from vcr.errors import CannotOverwriteExistingCassetteException
|
||||
@@ -195,11 +194,11 @@ def test_gzip(get_client, tmpdir, scheme):
|
||||
|
||||
with vcr.use_cassette(str(tmpdir.join("gzip.yaml"))):
|
||||
response = yield get(get_client(), url, **kwargs)
|
||||
assert_is_json(response.body)
|
||||
assert_is_json_bytes(response.body)
|
||||
|
||||
with vcr.use_cassette(str(tmpdir.join("gzip.yaml"))) as cass:
|
||||
response = yield get(get_client(), url, **kwargs)
|
||||
assert_is_json(response.body)
|
||||
assert_is_json_bytes(response.body)
|
||||
assert 1 == cass.play_count
|
||||
|
||||
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""Integration tests with urllib2"""
|
||||
|
||||
import ssl
|
||||
@@ -7,6 +6,7 @@ from urllib.request import urlopen
|
||||
|
||||
import pytest_httpbin.certs
|
||||
from assertions import assert_cassette_has_one_response
|
||||
from pytest import mark
|
||||
|
||||
# Internal imports
|
||||
import vcr
|
||||
@@ -56,6 +56,7 @@ def test_response_headers(httpbin_both, tmpdir):
|
||||
assert sorted(open1) == sorted(open2)
|
||||
|
||||
|
||||
@mark.online
|
||||
def test_effective_url(tmpdir):
|
||||
"""Ensure that the effective_url is captured"""
|
||||
url = "http://mockbin.org/redirect/301"
|
||||
|
||||
@@ -4,7 +4,7 @@
|
||||
|
||||
import pytest
|
||||
import pytest_httpbin
|
||||
from assertions import assert_cassette_empty, assert_is_json
|
||||
from assertions import assert_cassette_empty, assert_is_json_bytes
|
||||
|
||||
import vcr
|
||||
from vcr.patch import force_reset
|
||||
@@ -97,6 +97,7 @@ def test_post(tmpdir, httpbin_both, verify_pool_mgr):
|
||||
assert req1 == req2
|
||||
|
||||
|
||||
@pytest.mark.online
|
||||
def test_redirects(tmpdir, verify_pool_mgr):
|
||||
"""Ensure that we can handle redirects"""
|
||||
url = "http://mockbin.org/redirect/301"
|
||||
@@ -135,10 +136,10 @@ def test_gzip(tmpdir, httpbin_both, verify_pool_mgr):
|
||||
|
||||
with vcr.use_cassette(str(tmpdir.join("gzip.yaml"))):
|
||||
response = verify_pool_mgr.request("GET", url)
|
||||
assert_is_json(response.data)
|
||||
assert_is_json_bytes(response.data)
|
||||
|
||||
with vcr.use_cassette(str(tmpdir.join("gzip.yaml"))):
|
||||
assert_is_json(response.data)
|
||||
assert_is_json_bytes(response.data)
|
||||
|
||||
|
||||
def test_https_with_cert_validation_disabled(tmpdir, httpbin_secure, pool_mgr):
|
||||
|
||||
@@ -52,6 +52,7 @@ def test_flickr_multipart_upload(httpbin, tmpdir):
|
||||
assert cass.play_count == 1
|
||||
|
||||
|
||||
@pytest.mark.online
|
||||
def test_flickr_should_respond_with_200(tmpdir):
|
||||
testfile = str(tmpdir.join("flickr.yml"))
|
||||
with vcr.use_cassette(testfile):
|
||||
@@ -70,6 +71,7 @@ def test_cookies(tmpdir, httpbin):
|
||||
assert sorted(r2.json()["cookies"].keys()) == ["k1", "k2"]
|
||||
|
||||
|
||||
@pytest.mark.online
|
||||
def test_amazon_doctype(tmpdir):
|
||||
# amazon gzips its homepage. For some reason, in requests 2.7, it's not
|
||||
# getting gunzipped.
|
||||
|
||||
@@ -29,6 +29,19 @@ def test_cassette_load(tmpdir):
|
||||
assert len(a_cassette) == 1
|
||||
|
||||
|
||||
def test_cassette_load_nonexistent():
|
||||
a_cassette = Cassette.load(path="something/nonexistent.yml")
|
||||
assert len(a_cassette) == 0
|
||||
|
||||
|
||||
def test_cassette_load_invalid_encoding(tmpdir):
|
||||
a_file = tmpdir.join("invalid_encoding.yml")
|
||||
with open(a_file, "wb") as fd:
|
||||
fd.write(b"\xda")
|
||||
a_cassette = Cassette.load(path=str(a_file))
|
||||
assert len(a_cassette) == 0
|
||||
|
||||
|
||||
def test_cassette_not_played():
|
||||
a = Cassette("test")
|
||||
assert not a.play_count
|
||||
|
||||
@@ -298,6 +298,18 @@ def test_decode_response_deflate():
|
||||
assert decoded_response["headers"]["content-length"] == [str(len(body))]
|
||||
|
||||
|
||||
def test_decode_response_deflate_already_decompressed():
|
||||
body = b"deflate message"
|
||||
gzip_response = {
|
||||
"body": {"string": body},
|
||||
"headers": {
|
||||
"content-encoding": ["deflate"],
|
||||
},
|
||||
}
|
||||
decoded_response = decode_response(gzip_response)
|
||||
assert decoded_response["body"]["string"] == body
|
||||
|
||||
|
||||
def test_decode_response_gzip():
|
||||
body = b"gzip message"
|
||||
|
||||
@@ -325,3 +337,15 @@ def test_decode_response_gzip():
|
||||
decoded_response = decode_response(gzip_response)
|
||||
assert decoded_response["body"]["string"] == body
|
||||
assert decoded_response["headers"]["content-length"] == [str(len(body))]
|
||||
|
||||
|
||||
def test_decode_response_gzip_already_decompressed():
|
||||
body = b"gzip message"
|
||||
gzip_response = {
|
||||
"body": {"string": body},
|
||||
"headers": {
|
||||
"content-encoding": ["gzip"],
|
||||
},
|
||||
}
|
||||
decoded_response = decode_response(gzip_response)
|
||||
assert decoded_response["body"]["string"] == body
|
||||
|
||||
@@ -17,9 +17,9 @@ def test_try_migrate_with_json(tmpdir):
|
||||
cassette = tmpdir.join("cassette.json").strpath
|
||||
shutil.copy("tests/fixtures/migration/old_cassette.json", cassette)
|
||||
assert vcr.migration.try_migrate(cassette)
|
||||
with open("tests/fixtures/migration/new_cassette.json", "r") as f:
|
||||
with open("tests/fixtures/migration/new_cassette.json") as f:
|
||||
expected_json = json.load(f)
|
||||
with open(cassette, "r") as f:
|
||||
with open(cassette) as f:
|
||||
actual_json = json.load(f)
|
||||
assert actual_json == expected_json
|
||||
|
||||
@@ -28,9 +28,9 @@ def test_try_migrate_with_yaml(tmpdir):
|
||||
cassette = tmpdir.join("cassette.yaml").strpath
|
||||
shutil.copy("tests/fixtures/migration/old_cassette.yaml", cassette)
|
||||
assert vcr.migration.try_migrate(cassette)
|
||||
with open("tests/fixtures/migration/new_cassette.yaml", "r") as f:
|
||||
with open("tests/fixtures/migration/new_cassette.yaml") as f:
|
||||
expected_yaml = yaml.load(f, Loader=Loader)
|
||||
with open(cassette, "r") as f:
|
||||
with open(cassette) as f:
|
||||
actual_yaml = yaml.load(f, Loader=Loader)
|
||||
assert actual_yaml == expected_yaml
|
||||
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
# coding: UTF-8
|
||||
import io
|
||||
|
||||
from vcr.stubs import VCRHTTPResponse
|
||||
@@ -93,7 +92,7 @@ def test_response_parses_correctly_and_fp_attribute_error_is_not_thrown():
|
||||
},
|
||||
}
|
||||
vcr_response = VCRHTTPResponse(recorded_response)
|
||||
handle = io.TextIOWrapper(io.BufferedReader(vcr_response), encoding="utf-8")
|
||||
handle = io.TextIOWrapper(vcr_response, encoding="utf-8")
|
||||
handle = iter(handle)
|
||||
articles = [line for line in handle]
|
||||
assert len(articles) > 1
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
# -*- encoding: utf-8 -*-
|
||||
from unittest import mock
|
||||
|
||||
import pytest
|
||||
@@ -9,24 +8,24 @@ from vcr.serializers import compat, jsonserializer, yamlserializer
|
||||
|
||||
|
||||
def test_deserialize_old_yaml_cassette():
|
||||
with open("tests/fixtures/migration/old_cassette.yaml", "r") as f:
|
||||
with open("tests/fixtures/migration/old_cassette.yaml") as f:
|
||||
with pytest.raises(ValueError):
|
||||
deserialize(f.read(), yamlserializer)
|
||||
|
||||
|
||||
def test_deserialize_old_json_cassette():
|
||||
with open("tests/fixtures/migration/old_cassette.json", "r") as f:
|
||||
with open("tests/fixtures/migration/old_cassette.json") as f:
|
||||
with pytest.raises(ValueError):
|
||||
deserialize(f.read(), jsonserializer)
|
||||
|
||||
|
||||
def test_deserialize_new_yaml_cassette():
|
||||
with open("tests/fixtures/migration/new_cassette.yaml", "r") as f:
|
||||
with open("tests/fixtures/migration/new_cassette.yaml") as f:
|
||||
deserialize(f.read(), yamlserializer)
|
||||
|
||||
|
||||
def test_deserialize_new_json_cassette():
|
||||
with open("tests/fixtures/migration/new_cassette.json", "r") as f:
|
||||
with open("tests/fixtures/migration/new_cassette.json") as f:
|
||||
deserialize(f.read(), jsonserializer)
|
||||
|
||||
|
||||
|
||||
@@ -1,5 +1,7 @@
|
||||
from unittest import mock
|
||||
|
||||
from pytest import mark
|
||||
|
||||
from vcr import mode
|
||||
from vcr.cassette import Cassette
|
||||
from vcr.stubs import VCRHTTPSConnection
|
||||
@@ -11,6 +13,7 @@ class TestVCRConnection:
|
||||
vcr_connection.ssl_version = "example_ssl_version"
|
||||
assert vcr_connection.real_connection.ssl_version == "example_ssl_version"
|
||||
|
||||
@mark.online
|
||||
@mock.patch("vcr.cassette.Cassette.can_play_response_for", return_value=False)
|
||||
def testing_connect(*args):
|
||||
vcr_connection = VCRHTTPSConnection("www.google.com")
|
||||
|
||||
199
tests/unit/test_unittest.py
Normal file
199
tests/unit/test_unittest.py
Normal file
@@ -0,0 +1,199 @@
|
||||
import os
|
||||
from unittest import TextTestRunner, defaultTestLoader
|
||||
from unittest.mock import MagicMock
|
||||
from urllib.request import urlopen
|
||||
|
||||
import pytest
|
||||
|
||||
from vcr.unittest import VCRTestCase
|
||||
|
||||
|
||||
def test_defaults():
|
||||
class MyTest(VCRTestCase):
|
||||
def test_foo(self):
|
||||
pass
|
||||
|
||||
test = run_testcase(MyTest)[0][0]
|
||||
expected_path = os.path.join(os.path.dirname(__file__), "cassettes")
|
||||
expected_name = "MyTest.test_foo.yaml"
|
||||
assert os.path.dirname(test.cassette._path) == expected_path
|
||||
assert os.path.basename(test.cassette._path) == expected_name
|
||||
|
||||
|
||||
def test_disabled():
|
||||
# Baseline vcr_enabled = True
|
||||
class MyTest(VCRTestCase):
|
||||
def test_foo(self):
|
||||
pass
|
||||
|
||||
test = run_testcase(MyTest)[0][0]
|
||||
assert hasattr(test, "cassette")
|
||||
|
||||
# Test vcr_enabled = False
|
||||
class MyTest(VCRTestCase):
|
||||
vcr_enabled = False
|
||||
|
||||
def test_foo(self):
|
||||
pass
|
||||
|
||||
test = run_testcase(MyTest)[0][0]
|
||||
assert not hasattr(test, "cassette")
|
||||
|
||||
|
||||
def test_cassette_library_dir():
|
||||
class MyTest(VCRTestCase):
|
||||
def test_foo(self):
|
||||
pass
|
||||
|
||||
def _get_cassette_library_dir(self):
|
||||
return "/testing"
|
||||
|
||||
test = run_testcase(MyTest)[0][0]
|
||||
assert test.cassette._path.startswith("/testing/")
|
||||
|
||||
|
||||
def test_cassette_name():
|
||||
class MyTest(VCRTestCase):
|
||||
def test_foo(self):
|
||||
pass
|
||||
|
||||
def _get_cassette_name(self):
|
||||
return "my-custom-name"
|
||||
|
||||
test = run_testcase(MyTest)[0][0]
|
||||
assert os.path.basename(test.cassette._path) == "my-custom-name"
|
||||
|
||||
|
||||
def test_vcr_kwargs_overridden():
|
||||
class MyTest(VCRTestCase):
|
||||
def test_foo(self):
|
||||
pass
|
||||
|
||||
def _get_vcr_kwargs(self):
|
||||
kwargs = super()._get_vcr_kwargs()
|
||||
kwargs["record_mode"] = "new_episodes"
|
||||
return kwargs
|
||||
|
||||
test = run_testcase(MyTest)[0][0]
|
||||
assert test.cassette.record_mode == "new_episodes"
|
||||
|
||||
|
||||
def test_vcr_kwargs_passed():
|
||||
class MyTest(VCRTestCase):
|
||||
def test_foo(self):
|
||||
pass
|
||||
|
||||
def _get_vcr_kwargs(self):
|
||||
return super()._get_vcr_kwargs(
|
||||
record_mode="new_episodes",
|
||||
)
|
||||
|
||||
test = run_testcase(MyTest)[0][0]
|
||||
assert test.cassette.record_mode == "new_episodes"
|
||||
|
||||
|
||||
def test_vcr_kwargs_cassette_dir():
|
||||
# Test that _get_cassette_library_dir applies if cassette_library_dir
|
||||
# is absent from vcr kwargs.
|
||||
class MyTest(VCRTestCase):
|
||||
def test_foo(self):
|
||||
pass
|
||||
|
||||
def _get_vcr_kwargs(self):
|
||||
return dict(
|
||||
record_mode="new_episodes",
|
||||
)
|
||||
|
||||
_get_cassette_library_dir = MagicMock(return_value="/testing")
|
||||
|
||||
test = run_testcase(MyTest)[0][0]
|
||||
assert test.cassette._path.startswith("/testing/")
|
||||
assert test._get_cassette_library_dir.call_count == 1
|
||||
|
||||
# Test that _get_cassette_library_dir is ignored if cassette_library_dir
|
||||
# is present in vcr kwargs.
|
||||
class MyTest(VCRTestCase):
|
||||
def test_foo(self):
|
||||
pass
|
||||
|
||||
def _get_vcr_kwargs(self):
|
||||
return dict(
|
||||
cassette_library_dir="/testing",
|
||||
)
|
||||
|
||||
_get_cassette_library_dir = MagicMock(return_value="/ignored")
|
||||
|
||||
test = run_testcase(MyTest)[0][0]
|
||||
assert test.cassette._path.startswith("/testing/")
|
||||
assert test._get_cassette_library_dir.call_count == 0
|
||||
|
||||
|
||||
@pytest.mark.online
|
||||
def test_get_vcr_with_matcher(tmpdir):
|
||||
cassette_dir = tmpdir.mkdir("cassettes")
|
||||
assert len(cassette_dir.listdir()) == 0
|
||||
|
||||
mock_matcher = MagicMock(return_value=True, __name__="MockMatcher")
|
||||
|
||||
class MyTest(VCRTestCase):
|
||||
def test_foo(self):
|
||||
self.response = urlopen("http://example.com").read()
|
||||
|
||||
def _get_vcr(self):
|
||||
myvcr = super()._get_vcr()
|
||||
myvcr.register_matcher("mymatcher", mock_matcher)
|
||||
myvcr.match_on = ["mymatcher"]
|
||||
return myvcr
|
||||
|
||||
def _get_cassette_library_dir(self):
|
||||
return str(cassette_dir)
|
||||
|
||||
# First run to fill cassette.
|
||||
test = run_testcase(MyTest)[0][0]
|
||||
assert len(test.cassette.requests) == 1
|
||||
assert not mock_matcher.called # nothing in cassette
|
||||
|
||||
# Second run to call matcher.
|
||||
test = run_testcase(MyTest)[0][0]
|
||||
assert len(test.cassette.requests) == 1
|
||||
assert mock_matcher.called
|
||||
assert (
|
||||
repr(mock_matcher.mock_calls[0])
|
||||
== "call(<Request (GET) http://example.com>, <Request (GET) http://example.com>)"
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.online
|
||||
def test_testcase_playback(tmpdir):
|
||||
cassette_dir = tmpdir.mkdir("cassettes")
|
||||
assert len(cassette_dir.listdir()) == 0
|
||||
|
||||
# First test actually reads from the web.
|
||||
|
||||
class MyTest(VCRTestCase):
|
||||
def test_foo(self):
|
||||
self.response = urlopen("http://example.com").read()
|
||||
|
||||
def _get_cassette_library_dir(self):
|
||||
return str(cassette_dir)
|
||||
|
||||
test = run_testcase(MyTest)[0][0]
|
||||
assert b"illustrative examples" in test.response
|
||||
assert len(test.cassette.requests) == 1
|
||||
assert test.cassette.play_count == 0
|
||||
|
||||
# Second test reads from cassette.
|
||||
|
||||
test2 = run_testcase(MyTest)[0][0]
|
||||
assert test.cassette is not test2.cassette
|
||||
assert b"illustrative examples" in test.response
|
||||
assert len(test2.cassette.requests) == 1
|
||||
assert test2.cassette.play_count == 1
|
||||
|
||||
|
||||
def run_testcase(testcase_class):
|
||||
"""Run all the tests in a TestCase and return them."""
|
||||
suite = defaultTestLoader.loadTestsFromTestCase(testcase_class)
|
||||
tests = list(suite._tests)
|
||||
result = TextTestRunner().run(suite)
|
||||
return tests, result
|
||||
18
tox.ini
18
tox.ini
@@ -3,7 +3,7 @@ skip_missing_interpreters=true
|
||||
envlist =
|
||||
cov-clean,
|
||||
lint,
|
||||
{py37,py38,py39,py310,py311}-{requests-urllib3-1,httplib2,urllib3-1,tornado4,boto3,aiohttp,httpx},
|
||||
{py38,py39,py310,py311}-{requests-urllib3-1,httplib2,urllib3-1,tornado4,boto3,aiohttp,httpx},
|
||||
{py310,py311}-{requests-urllib3-2,urllib3-2},
|
||||
{pypy3}-{requests-urllib3-1,httplib2,urllib3-1,tornado4,boto3},
|
||||
{py310}-httpx019,
|
||||
@@ -12,7 +12,6 @@ envlist =
|
||||
|
||||
[gh-actions]
|
||||
python =
|
||||
3.7: py37
|
||||
3.8: py38
|
||||
3.9: py39
|
||||
3.10: py310, lint
|
||||
@@ -66,9 +65,9 @@ deps =
|
||||
# In other circumstances, we might want to generate a PDF or an ebook
|
||||
commands =
|
||||
sphinx-build -W -b html -d {envtmpdir}/doctrees . {envtmpdir}/html
|
||||
# We use Python 3.7. Tox sometimes tries to autodetect it based on the name of
|
||||
# We use Python 3.8. Tox sometimes tries to autodetect it based on the name of
|
||||
# the testenv, but "docs" does not give useful clues so we have to be explicit.
|
||||
basepython = python3.7
|
||||
basepython = python3.8
|
||||
|
||||
[testenv]
|
||||
# Need to use develop install so that paths
|
||||
@@ -94,15 +93,14 @@ deps =
|
||||
aiohttp: pytest-asyncio
|
||||
aiohttp: pytest-aiohttp
|
||||
httpx: httpx
|
||||
{py37,py38,py39,py310}-{httpx}: httpx
|
||||
{py37,py38,py39,py310}-{httpx}: pytest-asyncio
|
||||
{py38,py39,py310}-{httpx}: httpx
|
||||
{py38,py39,py310}-{httpx}: pytest-asyncio
|
||||
httpx: httpx>0.19
|
||||
# httpx==0.19 is the latest version that supports allow_redirects, newer versions use follow_redirects
|
||||
httpx019: httpx==0.19
|
||||
{py37,py38,py39,py310}-{httpx}: pytest-asyncio
|
||||
{py38,py39,py310}-{httpx}: pytest-asyncio
|
||||
depends =
|
||||
lint,{py37,py38,py39,py310,py311,pypy3}-{requests-urllib3-1,httplib2,urllib3-1,tornado4,boto3},{py310,py311}-{requests-urllib3-2,urllib3-2},{py37,py38,py39,py310,py311}-{aiohttp},{py37,py38,py39,py310,py311}-{httpx}: cov-clean
|
||||
cov-report: lint,{py37,py38,py39,py310,py311,pypy3}-{requests-urllib3-1,httplib2,urllib3-1,tornado4,boto3},{py310,py311}-{requests-urllib3-2,urllib3-2},{py37,py38,py39,py310,py311}-{aiohttp}
|
||||
lint,{py38,py39,py310,py311,pypy3}-{requests-urllib3-1,httplib2,urllib3-1,tornado4,boto3},{py310,py311}-{requests-urllib3-2,urllib3-2},{py38,py39,py310,py311}-{aiohttp},{py38,py39,py310,py311}-{httpx}: cov-clean
|
||||
cov-report: lint,{py38,py39,py310,py311,pypy3}-{requests-urllib3-1,httplib2,urllib3-1,tornado4,boto3},{py310,py311}-{requests-urllib3-2,urllib3-2},{py38,py39,py310,py311}-{aiohttp}
|
||||
passenv =
|
||||
AWS_ACCESS_KEY_ID
|
||||
AWS_DEFAULT_REGION
|
||||
|
||||
@@ -4,7 +4,7 @@ from logging import NullHandler
|
||||
from .config import VCR
|
||||
from .record_mode import RecordMode as mode # noqa import is not used in this file
|
||||
|
||||
__version__ = "4.3.1"
|
||||
__version__ = "5.0.0"
|
||||
|
||||
logging.getLogger(__name__).addHandler(NullHandler())
|
||||
|
||||
|
||||
@@ -11,7 +11,7 @@ from ._handle_coroutine import handle_coroutine
|
||||
from .errors import UnhandledHTTPRequestError
|
||||
from .matchers import get_matchers_results, method, requests_match, uri
|
||||
from .patch import CassettePatcherBuilder
|
||||
from .persisters.filesystem import FilesystemPersister
|
||||
from .persisters.filesystem import CassetteDecodeError, CassetteNotFoundError, FilesystemPersister
|
||||
from .record_mode import RecordMode
|
||||
from .serializers import yamlserializer
|
||||
from .util import partition_dict
|
||||
@@ -280,7 +280,7 @@ class Cassette:
|
||||
return response
|
||||
# The cassette doesn't contain the request asked for.
|
||||
raise UnhandledHTTPRequestError(
|
||||
"The cassette (%r) doesn't contain the request (%r) asked for" % (self._path, request)
|
||||
f"The cassette ({self._path!r}) doesn't contain the request ({request!r}) asked for"
|
||||
)
|
||||
|
||||
def responses_of(self, request):
|
||||
@@ -295,7 +295,7 @@ class Cassette:
|
||||
return responses
|
||||
# The cassette doesn't contain the request asked for.
|
||||
raise UnhandledHTTPRequestError(
|
||||
"The cassette (%r) doesn't contain the request (%r) asked for" % (self._path, request)
|
||||
f"The cassette ({self._path!r}) doesn't contain the request ({request!r}) asked for"
|
||||
)
|
||||
|
||||
def rewind(self):
|
||||
@@ -352,11 +352,11 @@ class Cassette:
|
||||
self.append(request, response)
|
||||
self.dirty = False
|
||||
self.rewound = True
|
||||
except ValueError:
|
||||
except (CassetteDecodeError, CassetteNotFoundError):
|
||||
pass
|
||||
|
||||
def __str__(self):
|
||||
return "<Cassette containing {} recorded response(s)>".format(len(self))
|
||||
return f"<Cassette containing {len(self)} recorded response(s)>"
|
||||
|
||||
def __len__(self):
|
||||
"""Return the number of request,response pairs stored in here"""
|
||||
|
||||
@@ -88,7 +88,7 @@ class VCR:
|
||||
try:
|
||||
serializer = self.serializers[serializer_name]
|
||||
except KeyError:
|
||||
raise KeyError("Serializer {} doesn't exist or isn't registered".format(serializer_name))
|
||||
raise KeyError(f"Serializer {serializer_name} doesn't exist or isn't registered")
|
||||
return serializer
|
||||
|
||||
def _get_matchers(self, matcher_names):
|
||||
@@ -97,7 +97,7 @@ class VCR:
|
||||
for m in matcher_names:
|
||||
matchers.append(self.matchers[m])
|
||||
except KeyError:
|
||||
raise KeyError("Matcher {} doesn't exist or isn't registered".format(m))
|
||||
raise KeyError(f"Matcher {m} doesn't exist or isn't registered")
|
||||
return matchers
|
||||
|
||||
def use_cassette(self, path=None, **kwargs):
|
||||
@@ -219,7 +219,7 @@ class VCR:
|
||||
filter_functions.extend(before_record_request)
|
||||
|
||||
def before_record_request(request):
|
||||
request = copy.copy(request)
|
||||
request = copy.deepcopy(request)
|
||||
for function in filter_functions:
|
||||
if request is None:
|
||||
break
|
||||
|
||||
@@ -153,9 +153,15 @@ def decode_response(response):
|
||||
if not body:
|
||||
return ""
|
||||
if encoding == "gzip":
|
||||
try:
|
||||
return zlib.decompress(body, zlib.MAX_WBITS | 16)
|
||||
except zlib.error:
|
||||
return body # assumes that the data was already decompressed
|
||||
else: # encoding == 'deflate'
|
||||
try:
|
||||
return zlib.decompress(body)
|
||||
except zlib.error:
|
||||
return body # assumes that the data was already decompressed
|
||||
|
||||
# Deepcopy here in case `headers` contain objects that could
|
||||
# be mutated by a shallow copy and corrupt the real response.
|
||||
|
||||
@@ -9,35 +9,43 @@ log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def method(r1, r2):
|
||||
assert r1.method == r2.method, "{} != {}".format(r1.method, r2.method)
|
||||
if r1.method != r2.method:
|
||||
raise AssertionError(f"{r1.method} != {r2.method}")
|
||||
|
||||
|
||||
def uri(r1, r2):
|
||||
assert r1.uri == r2.uri, "{} != {}".format(r1.uri, r2.uri)
|
||||
if r1.uri != r2.uri:
|
||||
raise AssertionError(f"{r1.uri} != {r2.uri}")
|
||||
|
||||
|
||||
def host(r1, r2):
|
||||
assert r1.host == r2.host, "{} != {}".format(r1.host, r2.host)
|
||||
if r1.host != r2.host:
|
||||
raise AssertionError(f"{r1.host} != {r2.host}")
|
||||
|
||||
|
||||
def scheme(r1, r2):
|
||||
assert r1.scheme == r2.scheme, "{} != {}".format(r1.scheme, r2.scheme)
|
||||
if r1.scheme != r2.scheme:
|
||||
raise AssertionError(f"{r1.scheme} != {r2.scheme}")
|
||||
|
||||
|
||||
def port(r1, r2):
|
||||
assert r1.port == r2.port, "{} != {}".format(r1.port, r2.port)
|
||||
if r1.port != r2.port:
|
||||
raise AssertionError(f"{r1.port} != {r2.port}")
|
||||
|
||||
|
||||
def path(r1, r2):
|
||||
assert r1.path == r2.path, "{} != {}".format(r1.path, r2.path)
|
||||
if r1.path != r2.path:
|
||||
raise AssertionError(f"{r1.path} != {r2.path}")
|
||||
|
||||
|
||||
def query(r1, r2):
|
||||
assert r1.query == r2.query, "{} != {}".format(r1.query, r2.query)
|
||||
if r1.query != r2.query:
|
||||
raise AssertionError(f"{r1.query} != {r2.query}")
|
||||
|
||||
|
||||
def raw_body(r1, r2):
|
||||
assert read_body(r1) == read_body(r2)
|
||||
if read_body(r1) != read_body(r2):
|
||||
raise AssertionError
|
||||
|
||||
|
||||
def body(r1, r2):
|
||||
@@ -45,11 +53,13 @@ def body(r1, r2):
|
||||
r2_transformer = _get_transformer(r2)
|
||||
if transformer != r2_transformer:
|
||||
transformer = _identity
|
||||
assert transformer(read_body(r1)) == transformer(read_body(r2))
|
||||
if transformer(read_body(r1)) != transformer(read_body(r2)):
|
||||
raise AssertionError
|
||||
|
||||
|
||||
def headers(r1, r2):
|
||||
assert r1.headers == r2.headers, "{} != {}".format(r1.headers, r2.headers)
|
||||
if r1.headers != r2.headers:
|
||||
raise AssertionError(f"{r1.headers} != {r2.headers}")
|
||||
|
||||
|
||||
def _header_checker(value, header="Content-Type"):
|
||||
@@ -97,7 +107,7 @@ def _get_transformer(request):
|
||||
def requests_match(r1, r2, matchers):
|
||||
successes, failures = get_matchers_results(r1, r2, matchers)
|
||||
if failures:
|
||||
log.debug("Requests {} and {} differ.\n" "Failure details:\n" "{}".format(r1, r2, failures))
|
||||
log.debug(f"Requests {r1} and {r2} differ.\nFailure details:\n{failures}")
|
||||
return len(failures) == 0
|
||||
|
||||
|
||||
|
||||
@@ -7,7 +7,7 @@ It merges and deletes the request obsolete keys (protocol, host, port, path)
|
||||
into new 'uri' key.
|
||||
Usage::
|
||||
|
||||
python -m vcr.migration PATH
|
||||
python3 -m vcr.migration PATH
|
||||
|
||||
The PATH can be path to the directory with cassettes or cassette itself
|
||||
"""
|
||||
@@ -55,7 +55,7 @@ def build_uri(**parts):
|
||||
port = parts["port"]
|
||||
scheme = parts["protocol"]
|
||||
default_port = {"https": 443, "http": 80}[scheme]
|
||||
parts["port"] = ":{}".format(port) if port != default_port else ""
|
||||
parts["port"] = f":{port}" if port != default_port else ""
|
||||
return "{protocol}://{host}{port}{path}".format(**parts)
|
||||
|
||||
|
||||
@@ -118,7 +118,7 @@ def migrate(file_path, migration_fn):
|
||||
# because we assume that original files can be reverted
|
||||
# we will try to copy the content. (os.rename not needed)
|
||||
with tempfile.TemporaryFile(mode="w+") as out_fp:
|
||||
with open(file_path, "r") as in_fp:
|
||||
with open(file_path) as in_fp:
|
||||
if not migration_fn(in_fp, out_fp):
|
||||
return False
|
||||
with open(file_path, "w") as in_fp:
|
||||
@@ -138,7 +138,7 @@ def try_migrate(path):
|
||||
def main():
|
||||
if len(sys.argv) != 2:
|
||||
raise SystemExit(
|
||||
"Please provide path to cassettes directory or file. " "Usage: python -m vcr.migration PATH"
|
||||
"Please provide path to cassettes directory or file. " "Usage: python3 -m vcr.migration PATH"
|
||||
)
|
||||
|
||||
path = sys.argv[1]
|
||||
@@ -150,7 +150,7 @@ def main():
|
||||
for file_path in files:
|
||||
migrated = try_migrate(file_path)
|
||||
status = "OK" if migrated else "FAIL"
|
||||
sys.stderr.write("[{}] {}\n".format(status, file_path))
|
||||
sys.stderr.write(f"[{status}] {file_path}\n")
|
||||
sys.stderr.write("Done.\n")
|
||||
|
||||
|
||||
|
||||
@@ -186,9 +186,7 @@ class CassettePatcherBuilder:
|
||||
bases = (base_class,)
|
||||
if not issubclass(base_class, object): # Check for old style class
|
||||
bases += (object,)
|
||||
return type(
|
||||
"{}{}".format(base_class.__name__, self._cassette._path), bases, dict(cassette=self._cassette)
|
||||
)
|
||||
return type(f"{base_class.__name__}{self._cassette._path}", bases, dict(cassette=self._cassette))
|
||||
|
||||
@_build_patchers_from_mock_triples_decorator
|
||||
def _httplib(self):
|
||||
|
||||
@@ -5,17 +5,25 @@ from pathlib import Path
|
||||
from ..serialize import deserialize, serialize
|
||||
|
||||
|
||||
class CassetteNotFoundError(FileNotFoundError):
|
||||
pass
|
||||
|
||||
|
||||
class CassetteDecodeError(ValueError):
|
||||
pass
|
||||
|
||||
|
||||
class FilesystemPersister:
|
||||
@classmethod
|
||||
def load_cassette(cls, cassette_path, serializer):
|
||||
cassette_path = Path(cassette_path) # if cassette path is already Path this is no operation
|
||||
if not cassette_path.is_file():
|
||||
raise ValueError("Cassette not found.")
|
||||
raise CassetteNotFoundError()
|
||||
try:
|
||||
with cassette_path.open() as f:
|
||||
data = f.read()
|
||||
except UnicodeEncodeError as err:
|
||||
raise ValueError("Can't read Cassette, Encoding is broken") from err
|
||||
except UnicodeDecodeError as err:
|
||||
raise CassetteDecodeError("Can't read Cassette, Encoding is broken") from err
|
||||
|
||||
return deserialize(data, serializer)
|
||||
|
||||
|
||||
@@ -90,7 +90,7 @@ class Request:
|
||||
return self.scheme
|
||||
|
||||
def __str__(self):
|
||||
return "<Request ({}) {}>".format(self.method, self.uri)
|
||||
return f"<Request ({self.method}) {self.uri}>"
|
||||
|
||||
def __repr__(self):
|
||||
return self.__str__()
|
||||
|
||||
@@ -93,6 +93,9 @@ class VCRHTTPResponse(HTTPResponse):
|
||||
def read(self, *args, **kwargs):
|
||||
return self._content.read(*args, **kwargs)
|
||||
|
||||
def read1(self, *args, **kwargs):
|
||||
return self._content.read1(*args, **kwargs)
|
||||
|
||||
def readall(self):
|
||||
return self._content.readall()
|
||||
|
||||
@@ -167,6 +170,13 @@ class VCRHTTPResponse(HTTPResponse):
|
||||
def drain_conn(self):
|
||||
pass
|
||||
|
||||
def stream(self, amt=65536, decode_content=None):
|
||||
while True:
|
||||
b = self._content.read(amt)
|
||||
yield b
|
||||
if not b:
|
||||
break
|
||||
|
||||
|
||||
class VCRConnection:
|
||||
# A reference to the cassette that's currently being patched in
|
||||
@@ -178,26 +188,26 @@ class VCRConnection:
|
||||
"""
|
||||
port = self.real_connection.port
|
||||
default_port = {"https": 443, "http": 80}[self._protocol]
|
||||
return ":{}".format(port) if port != default_port else ""
|
||||
return f":{port}" if port != default_port else ""
|
||||
|
||||
def _uri(self, url):
|
||||
"""Returns request absolute URI"""
|
||||
if url and not url.startswith("/"):
|
||||
# Then this must be a proxy request.
|
||||
return url
|
||||
uri = "{}://{}{}{}".format(self._protocol, self.real_connection.host, self._port_postfix(), url)
|
||||
uri = f"{self._protocol}://{self.real_connection.host}{self._port_postfix()}{url}"
|
||||
log.debug("Absolute URI: %s", uri)
|
||||
return uri
|
||||
|
||||
def _url(self, uri):
|
||||
"""Returns request selector url from absolute URI"""
|
||||
prefix = "{}://{}{}".format(self._protocol, self.real_connection.host, self._port_postfix())
|
||||
prefix = f"{self._protocol}://{self.real_connection.host}{self._port_postfix()}"
|
||||
return uri.replace(prefix, "", 1)
|
||||
|
||||
def request(self, method, url, body=None, headers=None, *args, **kwargs):
|
||||
"""Persist the request metadata in self._vcr_request"""
|
||||
self._vcr_request = Request(method=method, uri=self._uri(url), body=body, headers=headers or {})
|
||||
log.debug("Got {}".format(self._vcr_request))
|
||||
log.debug(f"Got {self._vcr_request}")
|
||||
|
||||
# Note: The request may not actually be finished at this point, so
|
||||
# I'm not sending the actual request until getresponse(). This
|
||||
@@ -213,7 +223,7 @@ class VCRConnection:
|
||||
of putheader() calls.
|
||||
"""
|
||||
self._vcr_request = Request(method=method, uri=self._uri(url), body="", headers={})
|
||||
log.debug("Got {}".format(self._vcr_request))
|
||||
log.debug(f"Got {self._vcr_request}")
|
||||
|
||||
def putheader(self, header, *values):
|
||||
self._vcr_request.headers[header] = values
|
||||
@@ -245,7 +255,7 @@ class VCRConnection:
|
||||
# Check to see if the cassette has a response for this request. If so,
|
||||
# then return it
|
||||
if self.cassette.can_play_response_for(self._vcr_request):
|
||||
log.info("Playing response for {} from cassette".format(self._vcr_request))
|
||||
log.info(f"Playing response for {self._vcr_request} from cassette")
|
||||
response = self.cassette.play_response(self._vcr_request)
|
||||
return VCRHTTPResponse(response)
|
||||
else:
|
||||
@@ -257,7 +267,7 @@ class VCRConnection:
|
||||
# Otherwise, we should send the request, then get the response
|
||||
# and return it.
|
||||
|
||||
log.info("{} not in cassette, sending to real server".format(self._vcr_request))
|
||||
log.info(f"{self._vcr_request} not in cassette, sending to real server")
|
||||
# This is imported here to avoid circular import.
|
||||
# TODO(@IvanMalison): Refactor to allow normal import.
|
||||
from vcr.patch import force_reset
|
||||
|
||||
@@ -261,7 +261,7 @@ def vcr_request(cassette, real_request):
|
||||
vcr_request = Request(method, str(request_url), data, _serialize_headers(headers))
|
||||
|
||||
if cassette.can_play_response_for(vcr_request):
|
||||
log.info("Playing response for {} from cassette".format(vcr_request))
|
||||
log.info(f"Playing response for {vcr_request} from cassette")
|
||||
response = play_responses(cassette, vcr_request, kwargs)
|
||||
for redirect in response.history:
|
||||
self._cookie_jar.update_cookies(redirect.cookies, redirect.url)
|
||||
|
||||
39
vcr/unittest.py
Normal file
39
vcr/unittest.py
Normal file
@@ -0,0 +1,39 @@
|
||||
import inspect
|
||||
import os
|
||||
import unittest
|
||||
|
||||
from .config import VCR
|
||||
|
||||
|
||||
class VCRMixin:
|
||||
"""A TestCase mixin that provides VCR integration."""
|
||||
|
||||
vcr_enabled = True
|
||||
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
if self.vcr_enabled:
|
||||
kwargs = self._get_vcr_kwargs()
|
||||
myvcr = self._get_vcr(**kwargs)
|
||||
cm = myvcr.use_cassette(self._get_cassette_name())
|
||||
self.cassette = cm.__enter__()
|
||||
self.addCleanup(cm.__exit__, None, None, None)
|
||||
|
||||
def _get_vcr(self, **kwargs):
|
||||
if "cassette_library_dir" not in kwargs:
|
||||
kwargs["cassette_library_dir"] = self._get_cassette_library_dir()
|
||||
return VCR(**kwargs)
|
||||
|
||||
def _get_vcr_kwargs(self, **kwargs):
|
||||
return kwargs
|
||||
|
||||
def _get_cassette_library_dir(self):
|
||||
testdir = os.path.dirname(inspect.getfile(self.__class__))
|
||||
return os.path.join(testdir, "cassettes")
|
||||
|
||||
def _get_cassette_name(self):
|
||||
return f"{self.__class__.__name__}.{self._testMethodName}.yaml"
|
||||
|
||||
|
||||
class VCRTestCase(VCRMixin, unittest.TestCase):
|
||||
pass
|
||||
@@ -1,9 +1,5 @@
|
||||
import types
|
||||
|
||||
try:
|
||||
from collections.abc import Mapping, MutableMapping
|
||||
except ImportError:
|
||||
from collections import Mapping, MutableMapping
|
||||
from collections.abc import Mapping, MutableMapping
|
||||
|
||||
|
||||
# Shamelessly stolen from https://github.com/kennethreitz/requests/blob/master/requests/structures.py
|
||||
|
||||
Reference in New Issue
Block a user