mirror of
https://github.com/kevin1024/vcrpy.git
synced 2025-12-09 09:13:23 +00:00
Compare commits
44 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d2281ab646 | ||
|
|
8b97fd6551 | ||
|
|
29e42211d7 | ||
|
|
6e511b67fd | ||
|
|
9b6cb1ce23 | ||
|
|
6a12bd1511 | ||
|
|
3411bedc06 | ||
|
|
438a65426b | ||
|
|
8c6b1fdf38 | ||
|
|
15e9f1868c | ||
|
|
7eb235cd9c | ||
|
|
d2f2731481 | ||
|
|
b2a895cb89 | ||
|
|
ffb2f44236 | ||
|
|
d66392a3fb | ||
|
|
b9cab239a7 | ||
|
|
276a41d9b6 | ||
|
|
7007e944ae | ||
|
|
bd112a2385 | ||
|
|
42848285a0 | ||
|
|
e3aae34ef7 | ||
|
|
f4316d2dae | ||
|
|
d613a814d3 | ||
|
|
ce234e503f | ||
|
|
3527d25ce8 | ||
|
|
dedb7ec403 | ||
|
|
59263d6025 | ||
|
|
2842cabec6 | ||
|
|
ad650a7ee1 | ||
|
|
9232915885 | ||
|
|
cbb540029f | ||
|
|
bf30d9a5e5 | ||
|
|
f06f71ece4 | ||
|
|
1070d417b3 | ||
|
|
46726a9a61 | ||
|
|
87db8e69ff | ||
|
|
52701ebca4 | ||
|
|
69679dc3fc | ||
|
|
c13f33b1e0 | ||
|
|
5476dd010c | ||
|
|
0add77d5ae | ||
|
|
96a6e91def | ||
|
|
3b41f0ede3 | ||
|
|
a79356cf5f |
22
.github/workflows/codespell.yml
vendored
Normal file
22
.github/workflows/codespell.yml
vendored
Normal file
@@ -0,0 +1,22 @@
|
||||
---
|
||||
name: Codespell
|
||||
|
||||
on:
|
||||
push:
|
||||
branches: [master]
|
||||
pull_request:
|
||||
branches: [master]
|
||||
|
||||
permissions:
|
||||
contents: read
|
||||
|
||||
jobs:
|
||||
codespell:
|
||||
name: Check for spelling errors
|
||||
runs-on: ubuntu-latest
|
||||
|
||||
steps:
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v3
|
||||
- name: Codespell
|
||||
uses: codespell-project/actions-codespell@v2
|
||||
16
.github/workflows/main.yml
vendored
16
.github/workflows/main.yml
vendored
@@ -25,11 +25,19 @@ jobs:
|
||||
|
||||
- name: Install project dependencies
|
||||
run: |
|
||||
pip install --upgrade pip
|
||||
pip install codecov tox tox-gh-actions
|
||||
pip3 install --upgrade pip
|
||||
pip3 install codecov tox tox-gh-actions
|
||||
|
||||
- name: Run tests with tox
|
||||
run: tox
|
||||
- name: Run online tests with tox
|
||||
run: tox -- -m online
|
||||
|
||||
- name: Run offline tests with tox with no access to the Internet
|
||||
run: |
|
||||
# We're using unshare to take Internet access
|
||||
# away from tox so that we'll notice whenever some new test
|
||||
# is missing @pytest.mark.online decoration in the future
|
||||
unshare --map-root-user --net -- \
|
||||
sh -c 'ip link set lo up; tox -- -m "not online"'
|
||||
|
||||
- name: Run coverage
|
||||
run: codecov
|
||||
|
||||
@@ -71,7 +71,7 @@ Finally, register your class with VCR to use your new serializer.
|
||||
|
||||
import vcr
|
||||
|
||||
class BogoSerializer(object):
|
||||
class BogoSerializer:
|
||||
"""
|
||||
Must implement serialize() and deserialize() methods
|
||||
"""
|
||||
|
||||
@@ -7,6 +7,17 @@ For a full list of triaged issues, bugs and PRs and what release they are target
|
||||
|
||||
All help in providing PRs to close out bug issues is appreciated. Even if that is providing a repo that fully replicates issues. We have very generous contributors that have added these to bug issues which meant another contributor picked up the bug and closed it out.
|
||||
|
||||
- 4.4.0
|
||||
- HUGE thanks to @hartwork for all the work done on this release!
|
||||
- Bring vcr/unittest in to vcrpy as a full feature of vcr instead of a separate library. Big thanks to @hartwork for doing this and to @agriffis for originally creating the library
|
||||
- Make decompression robust towards already decompressed input (thanks @hartwork)
|
||||
- Bugfix: Add read1 method (fixes compatability with biopython), thanks @mghantous
|
||||
- Bugfix: Prevent filters from corrupting request (thanks @abramclark)
|
||||
- Bugfix: Add support for `response.raw.stream()` to fix urllib v2 compat
|
||||
- Bugfix: Replace `assert` with `raise AssertionError`: fixes support for `PYTHONOPTIMIZE=1`
|
||||
- Add pytest.mark.online to run test suite offline, thanks @jspricke
|
||||
- use python3 and pip3 binaries to ease debian packaging (thanks @hartwork)
|
||||
- Add codespell (thanks @mghantous)
|
||||
- 4.3.1
|
||||
- Support urllib3 v1 and v2. NOTE: there is an issue running urllib3 v2 on
|
||||
Python older than 3.10, so this is currently blocked in the requirements.
|
||||
|
||||
@@ -74,7 +74,7 @@ The PR reviewer is a second set of eyes to see if:
|
||||
**Release Manager:**
|
||||
- Ensure CI is passing.
|
||||
- Create a release on github and tag it with the changelog release notes.
|
||||
- ``python setup.py build sdist bdist_wheel``
|
||||
- ``python3 setup.py build sdist bdist_wheel``
|
||||
- ``twine upload dist/*``
|
||||
- Go to ReadTheDocs build page and trigger a build https://readthedocs.org/projects/vcrpy/builds/
|
||||
|
||||
@@ -127,7 +127,7 @@ in this example::
|
||||
eval "$(pyenv init -)"
|
||||
|
||||
# Setup your local system tox tooling
|
||||
pip install tox tox-pyenv
|
||||
pip3 install tox tox-pyenv
|
||||
|
||||
# Install supported versions (at time of writing), this does not activate them
|
||||
pyenv install 3.7.5 3.8.0 pypy3.8
|
||||
|
||||
@@ -4,7 +4,7 @@ Installation
|
||||
VCR.py is a package on `PyPI <https://pypi.python.org>`__, so you can install
|
||||
with pip::
|
||||
|
||||
pip install vcrpy
|
||||
pip3 install vcrpy
|
||||
|
||||
Compatibility
|
||||
-------------
|
||||
@@ -35,7 +35,7 @@ rebuilding pyyaml.
|
||||
|
||||
1. Test if pyyaml is built with libyaml. This should work::
|
||||
|
||||
python -c 'from yaml import CLoader'
|
||||
python3 -c 'from yaml import CLoader'
|
||||
|
||||
2. Install libyaml according to your Linux distribution, or using `Homebrew
|
||||
<http://mxcl.github.com/homebrew/>`__ on Mac::
|
||||
@@ -46,8 +46,8 @@ rebuilding pyyaml.
|
||||
|
||||
3. Rebuild pyyaml with libyaml::
|
||||
|
||||
pip uninstall pyyaml
|
||||
pip --no-cache-dir install pyyaml
|
||||
pip3 uninstall pyyaml
|
||||
pip3 --no-cache-dir install pyyaml
|
||||
|
||||
Upgrade
|
||||
-------
|
||||
@@ -61,7 +61,7 @@ is to simply delete your cassettes and re-record all of them. VCR.py
|
||||
also provides a migration script that attempts to upgrade your 0.x
|
||||
cassettes to the new 1.x format. To use it, run the following command::
|
||||
|
||||
python -m vcr.migration PATH
|
||||
python3 -m vcr.migration PATH
|
||||
|
||||
The PATH can be either a path to the directory with cassettes or the
|
||||
path to a single cassette.
|
||||
|
||||
@@ -4,11 +4,11 @@ Usage
|
||||
.. code:: python
|
||||
|
||||
import vcr
|
||||
import urllib
|
||||
import urllib.request
|
||||
|
||||
with vcr.use_cassette('fixtures/vcr_cassettes/synopsis.yaml'):
|
||||
response = urllib.request.urlopen('http://www.iana.org/domains/reserved').read()
|
||||
assert 'Example domains' in response
|
||||
assert b'Example domains' in response
|
||||
|
||||
Run this test once, and VCR.py will record the HTTP request to
|
||||
``fixtures/vcr_cassettes/synopsis.yaml``. Run it again, and VCR.py will
|
||||
@@ -26,7 +26,7 @@ look like this:
|
||||
@vcr.use_cassette('fixtures/vcr_cassettes/synopsis.yaml')
|
||||
def test_iana():
|
||||
response = urllib.request.urlopen('http://www.iana.org/domains/reserved').read()
|
||||
assert 'Example domains' in response
|
||||
assert b'Example domains' in response
|
||||
|
||||
When using the decorator version of ``use_cassette``, it is possible to
|
||||
omit the path to the cassette file.
|
||||
@@ -36,7 +36,7 @@ omit the path to the cassette file.
|
||||
@vcr.use_cassette()
|
||||
def test_iana():
|
||||
response = urllib.request.urlopen('http://www.iana.org/domains/reserved').read()
|
||||
assert 'Example domains' in response
|
||||
assert b'Example domains' in response
|
||||
|
||||
In this case, the cassette file will be given the same name as the test
|
||||
function, and it will be placed in the same directory as the file in
|
||||
@@ -92,9 +92,73 @@ all
|
||||
Unittest Integration
|
||||
--------------------
|
||||
|
||||
While it's possible to use the context manager or decorator forms with unittest,
|
||||
there's also a ``VCRTestCase`` provided separately by `vcrpy-unittest
|
||||
<https://github.com/agriffis/vcrpy-unittest>`__.
|
||||
Inherit from ``VCRTestCase`` for automatic recording and playback of HTTP
|
||||
interactions.
|
||||
|
||||
.. code:: python
|
||||
|
||||
from vcr.unittest import VCRTestCase
|
||||
import requests
|
||||
|
||||
class MyTestCase(VCRTestCase):
|
||||
def test_something(self):
|
||||
response = requests.get('http://example.com')
|
||||
|
||||
Similar to how VCR.py returns the cassette from the context manager,
|
||||
``VCRTestCase`` makes the cassette available as ``self.cassette``:
|
||||
|
||||
.. code:: python
|
||||
|
||||
self.assertEqual(len(self.cassette), 1)
|
||||
self.assertEqual(self.cassette.requests[0].uri, 'http://example.com')
|
||||
|
||||
By default cassettes will be placed in the ``cassettes`` subdirectory next to the
|
||||
test, named according to the test class and method. For example, the above test
|
||||
would read from and write to ``cassettes/MyTestCase.test_something.yaml``
|
||||
|
||||
The configuration can be modified by overriding methods on your subclass:
|
||||
``_get_vcr_kwargs``, ``_get_cassette_library_dir`` and ``_get_cassette_name``.
|
||||
To modify the ``VCR`` object after instantiation, for example to add a matcher,
|
||||
you can hook on ``_get_vcr``, for example:
|
||||
|
||||
.. code:: python
|
||||
|
||||
class MyTestCase(VCRTestCase):
|
||||
def _get_vcr(self, **kwargs):
|
||||
myvcr = super(MyTestCase, self)._get_vcr(**kwargs)
|
||||
myvcr.register_matcher('mymatcher', mymatcher)
|
||||
myvcr.match_on = ['mymatcher']
|
||||
return myvcr
|
||||
|
||||
See
|
||||
`the source
|
||||
<https://github.com/kevin1024/vcrpy/blob/master/vcr/unittest.py>`__
|
||||
for the default implementations of these methods.
|
||||
|
||||
If you implement a ``setUp`` method on your test class then make sure to call
|
||||
the parent version ``super().setUp()`` in your own in order to continue getting
|
||||
the cassettes produced.
|
||||
|
||||
VCRMixin
|
||||
~~~~~~~~
|
||||
|
||||
In case inheriting from ``VCRTestCase`` is difficult because of an existing
|
||||
class hierarchy containing tests in the base classes, inherit from ``VCRMixin``
|
||||
instead.
|
||||
|
||||
.. code:: python
|
||||
|
||||
from vcr.unittest import VCRMixin
|
||||
import requests
|
||||
import unittest
|
||||
|
||||
class MyTestMixin(VCRMixin):
|
||||
def test_something(self):
|
||||
response = requests.get(self.url)
|
||||
|
||||
class MyTestCase(MyTestMixin, unittest.TestCase):
|
||||
url = 'http://example.com'
|
||||
|
||||
|
||||
Pytest Integration
|
||||
------------------
|
||||
|
||||
@@ -7,3 +7,14 @@ known_first_party = "vcrpy"
|
||||
multi_line_output = 3
|
||||
use_parentheses = true
|
||||
include_trailing_comma = true
|
||||
|
||||
[tool.codespell]
|
||||
skip = '.git,*.pdf,*.svg,.tox'
|
||||
ignore-regex = "\\\\[fnrstv]"
|
||||
#
|
||||
# ignore-words-list = ''
|
||||
|
||||
[tool.pytest.ini_options]
|
||||
markers = [
|
||||
"online",
|
||||
]
|
||||
|
||||
@@ -4,4 +4,4 @@
|
||||
# If you are getting an INVOCATION ERROR for this script then there is
|
||||
# a good chance you are running on Windows.
|
||||
# You can and should use WSL for running tox on Windows when it calls bash scripts.
|
||||
REQUESTS_CA_BUNDLE=`python -m pytest_httpbin.certs` pytest $*
|
||||
REQUESTS_CA_BUNDLE=`python3 -m pytest_httpbin.certs` exec pytest "$@"
|
||||
|
||||
@@ -34,6 +34,7 @@ def post(url, output="text", **kwargs):
|
||||
return request("POST", url, output="text", **kwargs)
|
||||
|
||||
|
||||
@pytest.mark.online
|
||||
def test_status(tmpdir, mockbin_request_url):
|
||||
url = mockbin_request_url
|
||||
|
||||
@@ -46,6 +47,7 @@ def test_status(tmpdir, mockbin_request_url):
|
||||
assert cassette.play_count == 1
|
||||
|
||||
|
||||
@pytest.mark.online
|
||||
@pytest.mark.parametrize("auth", [None, aiohttp.BasicAuth("vcrpy", "test")])
|
||||
def test_headers(tmpdir, auth, mockbin_request_url):
|
||||
url = mockbin_request_url
|
||||
@@ -63,6 +65,7 @@ def test_headers(tmpdir, auth, mockbin_request_url):
|
||||
assert "yarl.URL" not in cassette.data[0]
|
||||
|
||||
|
||||
@pytest.mark.online
|
||||
def test_case_insensitive_headers(tmpdir, mockbin_request_url):
|
||||
url = mockbin_request_url
|
||||
|
||||
@@ -76,6 +79,7 @@ def test_case_insensitive_headers(tmpdir, mockbin_request_url):
|
||||
assert cassette.play_count == 1
|
||||
|
||||
|
||||
@pytest.mark.online
|
||||
def test_text(tmpdir, mockbin_request_url):
|
||||
url = mockbin_request_url
|
||||
|
||||
@@ -88,6 +92,7 @@ def test_text(tmpdir, mockbin_request_url):
|
||||
assert cassette.play_count == 1
|
||||
|
||||
|
||||
@pytest.mark.online
|
||||
def test_json(tmpdir, mockbin_request_url):
|
||||
url = mockbin_request_url
|
||||
headers = {"Content-Type": "application/json"}
|
||||
@@ -101,6 +106,7 @@ def test_json(tmpdir, mockbin_request_url):
|
||||
assert cassette.play_count == 1
|
||||
|
||||
|
||||
@pytest.mark.online
|
||||
def test_binary(tmpdir, mockbin_request_url):
|
||||
url = mockbin_request_url + "/image/png"
|
||||
with vcr.use_cassette(str(tmpdir.join("binary.yaml"))):
|
||||
@@ -112,6 +118,7 @@ def test_binary(tmpdir, mockbin_request_url):
|
||||
assert cassette.play_count == 1
|
||||
|
||||
|
||||
@pytest.mark.online
|
||||
def test_stream(tmpdir, mockbin_request_url):
|
||||
url = mockbin_request_url
|
||||
|
||||
@@ -124,6 +131,7 @@ def test_stream(tmpdir, mockbin_request_url):
|
||||
assert cassette.play_count == 1
|
||||
|
||||
|
||||
@pytest.mark.online
|
||||
@pytest.mark.parametrize("body", ["data", "json"])
|
||||
def test_post(tmpdir, body, caplog, mockbin_request_url):
|
||||
caplog.set_level(logging.INFO)
|
||||
@@ -149,6 +157,7 @@ def test_post(tmpdir, body, caplog, mockbin_request_url):
|
||||
), "Log message not found."
|
||||
|
||||
|
||||
@pytest.mark.online
|
||||
def test_params(tmpdir, mockbin_request_url):
|
||||
url = mockbin_request_url + "?d=d"
|
||||
headers = {"Content-Type": "application/json"}
|
||||
@@ -164,6 +173,7 @@ def test_params(tmpdir, mockbin_request_url):
|
||||
assert cassette.play_count == 1
|
||||
|
||||
|
||||
@pytest.mark.online
|
||||
def test_params_same_url_distinct_params(tmpdir, mockbin_request_url):
|
||||
url = mockbin_request_url
|
||||
headers = {"Content-Type": "application/json"}
|
||||
@@ -183,6 +193,7 @@ def test_params_same_url_distinct_params(tmpdir, mockbin_request_url):
|
||||
get(url, output="text", params=other_params)
|
||||
|
||||
|
||||
@pytest.mark.online
|
||||
def test_params_on_url(tmpdir, mockbin_request_url):
|
||||
url = mockbin_request_url + "?a=1&b=foo"
|
||||
headers = {"Content-Type": "application/json"}
|
||||
@@ -248,6 +259,7 @@ def test_aiohttp_test_client_json(aiohttp_client, tmpdir):
|
||||
assert cassette.play_count == 1
|
||||
|
||||
|
||||
@pytest.mark.online
|
||||
def test_redirect(tmpdir, mockbin):
|
||||
url = mockbin + "/redirect/302/2"
|
||||
|
||||
@@ -272,6 +284,7 @@ def test_redirect(tmpdir, mockbin):
|
||||
assert cassette_response.request_info.real_url == response.request_info.real_url
|
||||
|
||||
|
||||
@pytest.mark.online
|
||||
def test_not_modified(tmpdir, mockbin):
|
||||
"""It doesn't try to redirect on 304"""
|
||||
url = mockbin + "/status/304"
|
||||
@@ -289,6 +302,7 @@ def test_not_modified(tmpdir, mockbin):
|
||||
assert cassette.play_count == 1
|
||||
|
||||
|
||||
@pytest.mark.online
|
||||
def test_double_requests(tmpdir, mockbin_request_url):
|
||||
"""We should capture, record, and replay all requests and response chains,
|
||||
even if there are duplicate ones.
|
||||
@@ -404,6 +418,7 @@ def test_cookies_redirect(httpbin_both, httpbin_ssl_context, tmpdir):
|
||||
run_in_loop(run)
|
||||
|
||||
|
||||
@pytest.mark.online
|
||||
def test_not_allow_redirects(tmpdir, mockbin):
|
||||
url = mockbin + "/redirect/308/5"
|
||||
path = str(tmpdir.join("redirects.yaml"))
|
||||
|
||||
@@ -7,6 +7,7 @@ import pytest
|
||||
import vcr
|
||||
|
||||
|
||||
@pytest.mark.online
|
||||
def test_set_serializer_default_config(tmpdir, mockbin_request_url):
|
||||
my_vcr = vcr.VCR(serializer="json")
|
||||
|
||||
@@ -20,6 +21,7 @@ def test_set_serializer_default_config(tmpdir, mockbin_request_url):
|
||||
assert json.loads(file_content)
|
||||
|
||||
|
||||
@pytest.mark.online
|
||||
def test_default_set_cassette_library_dir(tmpdir, mockbin_request_url):
|
||||
my_vcr = vcr.VCR(cassette_library_dir=str(tmpdir.join("subdir")))
|
||||
|
||||
@@ -29,6 +31,7 @@ def test_default_set_cassette_library_dir(tmpdir, mockbin_request_url):
|
||||
assert os.path.exists(str(tmpdir.join("subdir").join("test.json")))
|
||||
|
||||
|
||||
@pytest.mark.online
|
||||
def test_override_set_cassette_library_dir(tmpdir, mockbin_request_url):
|
||||
my_vcr = vcr.VCR(cassette_library_dir=str(tmpdir.join("subdir")))
|
||||
|
||||
@@ -41,6 +44,7 @@ def test_override_set_cassette_library_dir(tmpdir, mockbin_request_url):
|
||||
assert not os.path.exists(str(tmpdir.join("subdir").join("test.json")))
|
||||
|
||||
|
||||
@pytest.mark.online
|
||||
def test_override_match_on(tmpdir, mockbin_request_url):
|
||||
my_vcr = vcr.VCR(match_on=["method"])
|
||||
|
||||
@@ -62,6 +66,7 @@ def test_missing_matcher():
|
||||
pass
|
||||
|
||||
|
||||
@pytest.mark.online
|
||||
def test_dont_record_on_exception(tmpdir, mockbin_request_url):
|
||||
my_vcr = vcr.VCR(record_on_exception=False)
|
||||
|
||||
|
||||
@@ -6,10 +6,13 @@ import os
|
||||
import time
|
||||
from urllib.request import urlopen
|
||||
|
||||
import pytest
|
||||
|
||||
# Internal imports
|
||||
import vcr
|
||||
|
||||
|
||||
@pytest.mark.online
|
||||
def test_disk_saver_nowrite(tmpdir, mockbin_request_url):
|
||||
"""
|
||||
Ensure that when you close a cassette without changing it it doesn't
|
||||
@@ -30,6 +33,7 @@ def test_disk_saver_nowrite(tmpdir, mockbin_request_url):
|
||||
assert last_mod == last_mod2
|
||||
|
||||
|
||||
@pytest.mark.online
|
||||
def test_disk_saver_write(tmpdir, mockbin_request_url):
|
||||
"""
|
||||
Ensure that when you close a cassette after changing it it does
|
||||
|
||||
@@ -142,3 +142,24 @@ def test_decompress_regular(tmpdir, httpbin):
|
||||
resp = urlopen(url).read()
|
||||
assert_cassette_has_one_response(cass)
|
||||
assert_is_json(resp)
|
||||
|
||||
|
||||
def test_before_record_request_corruption(tmpdir, httpbin):
|
||||
"""Modifying request in before_record_request should not affect outgoing request"""
|
||||
|
||||
def before_record(request):
|
||||
request.headers.clear()
|
||||
request.body = b""
|
||||
return request
|
||||
|
||||
req = Request(
|
||||
httpbin.url + "/post",
|
||||
data=urlencode({"test": "exists"}).encode(),
|
||||
headers={"X-Test": "exists"},
|
||||
)
|
||||
cass_file = str(tmpdir.join("modified_response.yaml"))
|
||||
with vcr.use_cassette(cass_file, before_record_request=before_record):
|
||||
resp = json.loads(urlopen(req).read())
|
||||
|
||||
assert resp["headers"]["X-Test"] == "exists"
|
||||
assert resp["form"]["test"] == "exists"
|
||||
|
||||
@@ -56,6 +56,7 @@ def test_response_headers(tmpdir, httpbin_both):
|
||||
assert set(headers) == set(resp.items())
|
||||
|
||||
|
||||
@pytest.mark.online
|
||||
def test_effective_url(tmpdir):
|
||||
"""Ensure that the effective_url is captured"""
|
||||
url = "http://mockbin.org/redirect/301"
|
||||
|
||||
@@ -87,6 +87,7 @@ def yml(tmpdir, request):
|
||||
return str(tmpdir.join(request.function.__name__ + ".yaml"))
|
||||
|
||||
|
||||
@pytest.mark.online
|
||||
def test_status(tmpdir, mockbin, do_request):
|
||||
url = mockbin
|
||||
|
||||
@@ -99,6 +100,7 @@ def test_status(tmpdir, mockbin, do_request):
|
||||
assert cassette.play_count == 1
|
||||
|
||||
|
||||
@pytest.mark.online
|
||||
def test_case_insensitive_headers(tmpdir, mockbin, do_request):
|
||||
url = mockbin
|
||||
|
||||
@@ -112,6 +114,7 @@ def test_case_insensitive_headers(tmpdir, mockbin, do_request):
|
||||
assert cassette.play_count == 1
|
||||
|
||||
|
||||
@pytest.mark.online
|
||||
def test_content(tmpdir, mockbin, do_request):
|
||||
url = mockbin
|
||||
|
||||
@@ -124,6 +127,7 @@ def test_content(tmpdir, mockbin, do_request):
|
||||
assert cassette.play_count == 1
|
||||
|
||||
|
||||
@pytest.mark.online
|
||||
def test_json(tmpdir, mockbin, do_request):
|
||||
url = mockbin + "/request"
|
||||
|
||||
@@ -138,6 +142,7 @@ def test_json(tmpdir, mockbin, do_request):
|
||||
assert cassette.play_count == 1
|
||||
|
||||
|
||||
@pytest.mark.online
|
||||
def test_params_same_url_distinct_params(tmpdir, mockbin, do_request):
|
||||
url = mockbin + "/request"
|
||||
headers = {"Content-Type": "application/json"}
|
||||
@@ -158,6 +163,7 @@ def test_params_same_url_distinct_params(tmpdir, mockbin, do_request):
|
||||
do_request()("GET", url, params=params, headers=headers)
|
||||
|
||||
|
||||
@pytest.mark.online
|
||||
def test_redirect(mockbin, yml, do_request):
|
||||
url = mockbin + "/redirect/303/2"
|
||||
|
||||
@@ -184,6 +190,7 @@ def test_redirect(mockbin, yml, do_request):
|
||||
}
|
||||
|
||||
|
||||
@pytest.mark.online
|
||||
def test_work_with_gzipped_data(mockbin, do_request, yml):
|
||||
url = mockbin + "/gzip?foo=bar"
|
||||
headers = {"accept-encoding": "deflate, gzip"}
|
||||
@@ -199,6 +206,7 @@ def test_work_with_gzipped_data(mockbin, do_request, yml):
|
||||
assert cassette.play_count == 1
|
||||
|
||||
|
||||
@pytest.mark.online
|
||||
@pytest.mark.parametrize("url", ["https://github.com/kevin1024/vcrpy/issues/" + str(i) for i in range(3, 6)])
|
||||
def test_simple_fetching(do_request, yml, url):
|
||||
with vcr.use_cassette(yml):
|
||||
@@ -231,6 +239,7 @@ def test_behind_proxy(do_request):
|
||||
assert cassette_response.request.url == response.request.url
|
||||
|
||||
|
||||
@pytest.mark.online
|
||||
def test_cookies(tmpdir, mockbin, do_request):
|
||||
def client_cookies(client):
|
||||
return [c for c in client.client.cookies]
|
||||
@@ -268,6 +277,7 @@ def test_cookies(tmpdir, mockbin, do_request):
|
||||
assert client_cookies(new_client) == ["k1", "k2"]
|
||||
|
||||
|
||||
@pytest.mark.online
|
||||
def test_relative_redirects(tmpdir, scheme, do_request, mockbin):
|
||||
redirect_kwargs = {HTTPX_REDIRECT_PARAM.name: True}
|
||||
|
||||
@@ -286,6 +296,7 @@ def test_relative_redirects(tmpdir, scheme, do_request, mockbin):
|
||||
assert cassette.play_count == 3
|
||||
|
||||
|
||||
@pytest.mark.online
|
||||
def test_redirect_wo_allow_redirects(do_request, mockbin, yml):
|
||||
url = mockbin + "/redirect/308/5"
|
||||
|
||||
|
||||
@@ -1,5 +1,7 @@
|
||||
from urllib.request import urlopen
|
||||
|
||||
import pytest
|
||||
|
||||
import vcr
|
||||
|
||||
|
||||
@@ -11,6 +13,7 @@ def false_matcher(r1, r2):
|
||||
return False
|
||||
|
||||
|
||||
@pytest.mark.online
|
||||
def test_registered_true_matcher(tmpdir, mockbin_request_url):
|
||||
my_vcr = vcr.VCR()
|
||||
my_vcr.register_matcher("true", true_matcher)
|
||||
@@ -26,6 +29,7 @@ def test_registered_true_matcher(tmpdir, mockbin_request_url):
|
||||
urlopen(mockbin_request_url)
|
||||
|
||||
|
||||
@pytest.mark.online
|
||||
def test_registered_false_matcher(tmpdir, mockbin_request_url):
|
||||
my_vcr = vcr.VCR()
|
||||
my_vcr.register_matcher("false", false_matcher)
|
||||
|
||||
@@ -10,7 +10,7 @@ import vcr
|
||||
from vcr.persisters.filesystem import FilesystemPersister
|
||||
|
||||
|
||||
class CustomFilesystemPersister(object):
|
||||
class CustomFilesystemPersister:
|
||||
"""Behaves just like default FilesystemPersister but adds .test extension
|
||||
to the cassette file"""
|
||||
|
||||
|
||||
@@ -144,6 +144,17 @@ def test_redirects(tmpdir, httpbin_both):
|
||||
assert cass.play_count == 2
|
||||
|
||||
|
||||
def test_raw_stream(tmpdir, httpbin):
|
||||
expected_response = requests.get(httpbin.url, stream=True)
|
||||
expected_content = b"".join(expected_response.raw.stream())
|
||||
|
||||
for _ in range(2): # one for recording, one for cassette reply
|
||||
with vcr.use_cassette(str(tmpdir.join("raw_stream.yaml"))):
|
||||
actual_response = requests.get(httpbin.url, stream=True)
|
||||
actual_content = b"".join(actual_response.raw.stream())
|
||||
assert actual_content == expected_content
|
||||
|
||||
|
||||
def test_cross_scheme(tmpdir, httpbin_secure, httpbin):
|
||||
"""Ensure that requests between schemes are treated separately"""
|
||||
# First fetch a url under http, and then again under https and then
|
||||
@@ -156,20 +167,40 @@ def test_cross_scheme(tmpdir, httpbin_secure, httpbin):
|
||||
assert len(cass) == 2
|
||||
|
||||
|
||||
def test_gzip(tmpdir, httpbin_both):
|
||||
def test_gzip__decode_compressed_response_false(tmpdir, httpbin_both):
|
||||
"""
|
||||
Ensure that requests (actually urllib3) is able to automatically decompress
|
||||
the response body
|
||||
"""
|
||||
for _ in range(2): # one for recording, one for re-playing
|
||||
with vcr.use_cassette(str(tmpdir.join("gzip.yaml"))):
|
||||
response = requests.get(httpbin_both + "/gzip")
|
||||
assert response.headers["content-encoding"] == "gzip" # i.e. not removed
|
||||
assert_is_json(response.content) # i.e. uncompressed bytes
|
||||
|
||||
|
||||
def test_gzip__decode_compressed_response_true(tmpdir, httpbin_both):
|
||||
url = httpbin_both + "/gzip"
|
||||
response = requests.get(url)
|
||||
|
||||
with vcr.use_cassette(str(tmpdir.join("gzip.yaml"))):
|
||||
response = requests.get(url)
|
||||
assert_is_json(response.content)
|
||||
expected_response = requests.get(url)
|
||||
expected_content = expected_response.content
|
||||
assert expected_response.headers["content-encoding"] == "gzip" # self-test
|
||||
|
||||
with vcr.use_cassette(str(tmpdir.join("gzip.yaml"))):
|
||||
assert_is_json(response.content)
|
||||
with vcr.use_cassette(
|
||||
str(tmpdir.join("decode_compressed.yaml")), decode_compressed_response=True
|
||||
) as cassette:
|
||||
r = requests.get(url)
|
||||
assert r.headers["content-encoding"] == "gzip" # i.e. not removed
|
||||
assert r.content == expected_content
|
||||
|
||||
# Has the cassette body been decompressed?
|
||||
cassette_response_body = cassette.responses[0]["body"]["string"]
|
||||
assert isinstance(cassette_response_body, str)
|
||||
|
||||
with vcr.use_cassette(str(tmpdir.join("decode_compressed.yaml")), decode_compressed_response=True):
|
||||
r = requests.get(url)
|
||||
assert "content-encoding" not in r.headers # i.e. removed
|
||||
assert r.content == expected_content
|
||||
|
||||
|
||||
def test_session_and_connection_close(tmpdir, httpbin):
|
||||
|
||||
@@ -7,6 +7,7 @@ from urllib.request import urlopen
|
||||
|
||||
import pytest_httpbin.certs
|
||||
from assertions import assert_cassette_has_one_response
|
||||
from pytest import mark
|
||||
|
||||
# Internal imports
|
||||
import vcr
|
||||
@@ -56,6 +57,7 @@ def test_response_headers(httpbin_both, tmpdir):
|
||||
assert sorted(open1) == sorted(open2)
|
||||
|
||||
|
||||
@mark.online
|
||||
def test_effective_url(tmpdir):
|
||||
"""Ensure that the effective_url is captured"""
|
||||
url = "http://mockbin.org/redirect/301"
|
||||
|
||||
@@ -97,6 +97,7 @@ def test_post(tmpdir, httpbin_both, verify_pool_mgr):
|
||||
assert req1 == req2
|
||||
|
||||
|
||||
@pytest.mark.online
|
||||
def test_redirects(tmpdir, verify_pool_mgr):
|
||||
"""Ensure that we can handle redirects"""
|
||||
url = "http://mockbin.org/redirect/301"
|
||||
|
||||
@@ -52,6 +52,7 @@ def test_flickr_multipart_upload(httpbin, tmpdir):
|
||||
assert cass.play_count == 1
|
||||
|
||||
|
||||
@pytest.mark.online
|
||||
def test_flickr_should_respond_with_200(tmpdir):
|
||||
testfile = str(tmpdir.join("flickr.yml"))
|
||||
with vcr.use_cassette(testfile):
|
||||
@@ -70,6 +71,7 @@ def test_cookies(tmpdir, httpbin):
|
||||
assert sorted(r2.json()["cookies"].keys()) == ["k1", "k2"]
|
||||
|
||||
|
||||
@pytest.mark.online
|
||||
def test_amazon_doctype(tmpdir):
|
||||
# amazon gzips its homepage. For some reason, in requests 2.7, it's not
|
||||
# getting gunzipped.
|
||||
|
||||
@@ -298,6 +298,18 @@ def test_decode_response_deflate():
|
||||
assert decoded_response["headers"]["content-length"] == [str(len(body))]
|
||||
|
||||
|
||||
def test_decode_response_deflate_already_decompressed():
|
||||
body = b"deflate message"
|
||||
gzip_response = {
|
||||
"body": {"string": body},
|
||||
"headers": {
|
||||
"content-encoding": ["deflate"],
|
||||
},
|
||||
}
|
||||
decoded_response = decode_response(gzip_response)
|
||||
assert decoded_response["body"]["string"] == body
|
||||
|
||||
|
||||
def test_decode_response_gzip():
|
||||
body = b"gzip message"
|
||||
|
||||
@@ -325,3 +337,15 @@ def test_decode_response_gzip():
|
||||
decoded_response = decode_response(gzip_response)
|
||||
assert decoded_response["body"]["string"] == body
|
||||
assert decoded_response["headers"]["content-length"] == [str(len(body))]
|
||||
|
||||
|
||||
def test_decode_response_gzip_already_decompressed():
|
||||
body = b"gzip message"
|
||||
gzip_response = {
|
||||
"body": {"string": body},
|
||||
"headers": {
|
||||
"content-encoding": ["gzip"],
|
||||
},
|
||||
}
|
||||
decoded_response = decode_response(gzip_response)
|
||||
assert decoded_response["body"]["string"] == body
|
||||
|
||||
@@ -93,7 +93,7 @@ def test_response_parses_correctly_and_fp_attribute_error_is_not_thrown():
|
||||
},
|
||||
}
|
||||
vcr_response = VCRHTTPResponse(recorded_response)
|
||||
handle = io.TextIOWrapper(io.BufferedReader(vcr_response), encoding="utf-8")
|
||||
handle = io.TextIOWrapper(vcr_response, encoding="utf-8")
|
||||
handle = iter(handle)
|
||||
articles = [line for line in handle]
|
||||
assert len(articles) > 1
|
||||
|
||||
@@ -1,5 +1,7 @@
|
||||
from unittest import mock
|
||||
|
||||
from pytest import mark
|
||||
|
||||
from vcr import mode
|
||||
from vcr.cassette import Cassette
|
||||
from vcr.stubs import VCRHTTPSConnection
|
||||
@@ -11,6 +13,7 @@ class TestVCRConnection:
|
||||
vcr_connection.ssl_version = "example_ssl_version"
|
||||
assert vcr_connection.real_connection.ssl_version == "example_ssl_version"
|
||||
|
||||
@mark.online
|
||||
@mock.patch("vcr.cassette.Cassette.can_play_response_for", return_value=False)
|
||||
def testing_connect(*args):
|
||||
vcr_connection = VCRHTTPSConnection("www.google.com")
|
||||
|
||||
199
tests/unit/test_unittest.py
Normal file
199
tests/unit/test_unittest.py
Normal file
@@ -0,0 +1,199 @@
|
||||
import os
|
||||
from unittest import TextTestRunner, defaultTestLoader
|
||||
from unittest.mock import MagicMock
|
||||
from urllib.request import urlopen
|
||||
|
||||
import pytest
|
||||
|
||||
from vcr.unittest import VCRTestCase
|
||||
|
||||
|
||||
def test_defaults():
|
||||
class MyTest(VCRTestCase):
|
||||
def test_foo(self):
|
||||
pass
|
||||
|
||||
test = run_testcase(MyTest)[0][0]
|
||||
expected_path = os.path.join(os.path.dirname(__file__), "cassettes")
|
||||
expected_name = "MyTest.test_foo.yaml"
|
||||
assert os.path.dirname(test.cassette._path) == expected_path
|
||||
assert os.path.basename(test.cassette._path) == expected_name
|
||||
|
||||
|
||||
def test_disabled():
|
||||
# Baseline vcr_enabled = True
|
||||
class MyTest(VCRTestCase):
|
||||
def test_foo(self):
|
||||
pass
|
||||
|
||||
test = run_testcase(MyTest)[0][0]
|
||||
assert hasattr(test, "cassette")
|
||||
|
||||
# Test vcr_enabled = False
|
||||
class MyTest(VCRTestCase):
|
||||
vcr_enabled = False
|
||||
|
||||
def test_foo(self):
|
||||
pass
|
||||
|
||||
test = run_testcase(MyTest)[0][0]
|
||||
assert not hasattr(test, "cassette")
|
||||
|
||||
|
||||
def test_cassette_library_dir():
|
||||
class MyTest(VCRTestCase):
|
||||
def test_foo(self):
|
||||
pass
|
||||
|
||||
def _get_cassette_library_dir(self):
|
||||
return "/testing"
|
||||
|
||||
test = run_testcase(MyTest)[0][0]
|
||||
assert test.cassette._path.startswith("/testing/")
|
||||
|
||||
|
||||
def test_cassette_name():
|
||||
class MyTest(VCRTestCase):
|
||||
def test_foo(self):
|
||||
pass
|
||||
|
||||
def _get_cassette_name(self):
|
||||
return "my-custom-name"
|
||||
|
||||
test = run_testcase(MyTest)[0][0]
|
||||
assert os.path.basename(test.cassette._path) == "my-custom-name"
|
||||
|
||||
|
||||
def test_vcr_kwargs_overridden():
|
||||
class MyTest(VCRTestCase):
|
||||
def test_foo(self):
|
||||
pass
|
||||
|
||||
def _get_vcr_kwargs(self):
|
||||
kwargs = super()._get_vcr_kwargs()
|
||||
kwargs["record_mode"] = "new_episodes"
|
||||
return kwargs
|
||||
|
||||
test = run_testcase(MyTest)[0][0]
|
||||
assert test.cassette.record_mode == "new_episodes"
|
||||
|
||||
|
||||
def test_vcr_kwargs_passed():
|
||||
class MyTest(VCRTestCase):
|
||||
def test_foo(self):
|
||||
pass
|
||||
|
||||
def _get_vcr_kwargs(self):
|
||||
return super()._get_vcr_kwargs(
|
||||
record_mode="new_episodes",
|
||||
)
|
||||
|
||||
test = run_testcase(MyTest)[0][0]
|
||||
assert test.cassette.record_mode == "new_episodes"
|
||||
|
||||
|
||||
def test_vcr_kwargs_cassette_dir():
|
||||
# Test that _get_cassette_library_dir applies if cassette_library_dir
|
||||
# is absent from vcr kwargs.
|
||||
class MyTest(VCRTestCase):
|
||||
def test_foo(self):
|
||||
pass
|
||||
|
||||
def _get_vcr_kwargs(self):
|
||||
return dict(
|
||||
record_mode="new_episodes",
|
||||
)
|
||||
|
||||
_get_cassette_library_dir = MagicMock(return_value="/testing")
|
||||
|
||||
test = run_testcase(MyTest)[0][0]
|
||||
assert test.cassette._path.startswith("/testing/")
|
||||
assert test._get_cassette_library_dir.call_count == 1
|
||||
|
||||
# Test that _get_cassette_library_dir is ignored if cassette_library_dir
|
||||
# is present in vcr kwargs.
|
||||
class MyTest(VCRTestCase):
|
||||
def test_foo(self):
|
||||
pass
|
||||
|
||||
def _get_vcr_kwargs(self):
|
||||
return dict(
|
||||
cassette_library_dir="/testing",
|
||||
)
|
||||
|
||||
_get_cassette_library_dir = MagicMock(return_value="/ignored")
|
||||
|
||||
test = run_testcase(MyTest)[0][0]
|
||||
assert test.cassette._path.startswith("/testing/")
|
||||
assert test._get_cassette_library_dir.call_count == 0
|
||||
|
||||
|
||||
@pytest.mark.online
|
||||
def test_get_vcr_with_matcher(tmpdir):
|
||||
cassette_dir = tmpdir.mkdir("cassettes")
|
||||
assert len(cassette_dir.listdir()) == 0
|
||||
|
||||
mock_matcher = MagicMock(return_value=True, __name__="MockMatcher")
|
||||
|
||||
class MyTest(VCRTestCase):
|
||||
def test_foo(self):
|
||||
self.response = urlopen("http://example.com").read()
|
||||
|
||||
def _get_vcr(self):
|
||||
myvcr = super()._get_vcr()
|
||||
myvcr.register_matcher("mymatcher", mock_matcher)
|
||||
myvcr.match_on = ["mymatcher"]
|
||||
return myvcr
|
||||
|
||||
def _get_cassette_library_dir(self):
|
||||
return str(cassette_dir)
|
||||
|
||||
# First run to fill cassette.
|
||||
test = run_testcase(MyTest)[0][0]
|
||||
assert len(test.cassette.requests) == 1
|
||||
assert not mock_matcher.called # nothing in cassette
|
||||
|
||||
# Second run to call matcher.
|
||||
test = run_testcase(MyTest)[0][0]
|
||||
assert len(test.cassette.requests) == 1
|
||||
assert mock_matcher.called
|
||||
assert (
|
||||
repr(mock_matcher.mock_calls[0])
|
||||
== "call(<Request (GET) http://example.com>, <Request (GET) http://example.com>)"
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.online
|
||||
def test_testcase_playback(tmpdir):
|
||||
cassette_dir = tmpdir.mkdir("cassettes")
|
||||
assert len(cassette_dir.listdir()) == 0
|
||||
|
||||
# First test actually reads from the web.
|
||||
|
||||
class MyTest(VCRTestCase):
|
||||
def test_foo(self):
|
||||
self.response = urlopen("http://example.com").read()
|
||||
|
||||
def _get_cassette_library_dir(self):
|
||||
return str(cassette_dir)
|
||||
|
||||
test = run_testcase(MyTest)[0][0]
|
||||
assert b"illustrative examples" in test.response
|
||||
assert len(test.cassette.requests) == 1
|
||||
assert test.cassette.play_count == 0
|
||||
|
||||
# Second test reads from cassette.
|
||||
|
||||
test2 = run_testcase(MyTest)[0][0]
|
||||
assert test.cassette is not test2.cassette
|
||||
assert b"illustrative examples" in test.response
|
||||
assert len(test2.cassette.requests) == 1
|
||||
assert test2.cassette.play_count == 1
|
||||
|
||||
|
||||
def run_testcase(testcase_class):
|
||||
"""Run all the tests in a TestCase and return them."""
|
||||
suite = defaultTestLoader.loadTestsFromTestCase(testcase_class)
|
||||
tests = list(suite._tests)
|
||||
result = TextTestRunner().run(suite)
|
||||
return tests, result
|
||||
@@ -4,7 +4,7 @@ from logging import NullHandler
|
||||
from .config import VCR
|
||||
from .record_mode import RecordMode as mode # noqa import is not used in this file
|
||||
|
||||
__version__ = "4.3.1"
|
||||
__version__ = "4.4.0"
|
||||
|
||||
logging.getLogger(__name__).addHandler(NullHandler())
|
||||
|
||||
|
||||
@@ -219,7 +219,7 @@ class VCR:
|
||||
filter_functions.extend(before_record_request)
|
||||
|
||||
def before_record_request(request):
|
||||
request = copy.copy(request)
|
||||
request = copy.deepcopy(request)
|
||||
for function in filter_functions:
|
||||
if request is None:
|
||||
break
|
||||
|
||||
@@ -153,9 +153,15 @@ def decode_response(response):
|
||||
if not body:
|
||||
return ""
|
||||
if encoding == "gzip":
|
||||
return zlib.decompress(body, zlib.MAX_WBITS | 16)
|
||||
try:
|
||||
return zlib.decompress(body, zlib.MAX_WBITS | 16)
|
||||
except zlib.error:
|
||||
return body # assumes that the data was already decompressed
|
||||
else: # encoding == 'deflate'
|
||||
return zlib.decompress(body)
|
||||
try:
|
||||
return zlib.decompress(body)
|
||||
except zlib.error:
|
||||
return body # assumes that the data was already decompressed
|
||||
|
||||
# Deepcopy here in case `headers` contain objects that could
|
||||
# be mutated by a shallow copy and corrupt the real response.
|
||||
|
||||
@@ -9,35 +9,43 @@ log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def method(r1, r2):
|
||||
assert r1.method == r2.method, "{} != {}".format(r1.method, r2.method)
|
||||
if r1.method != r2.method:
|
||||
raise AssertionError("{} != {}".format(r1.method, r2.method))
|
||||
|
||||
|
||||
def uri(r1, r2):
|
||||
assert r1.uri == r2.uri, "{} != {}".format(r1.uri, r2.uri)
|
||||
if r1.uri != r2.uri:
|
||||
raise AssertionError("{} != {}".format(r1.uri, r2.uri))
|
||||
|
||||
|
||||
def host(r1, r2):
|
||||
assert r1.host == r2.host, "{} != {}".format(r1.host, r2.host)
|
||||
if r1.host != r2.host:
|
||||
raise AssertionError("{} != {}".format(r1.host, r2.host))
|
||||
|
||||
|
||||
def scheme(r1, r2):
|
||||
assert r1.scheme == r2.scheme, "{} != {}".format(r1.scheme, r2.scheme)
|
||||
if r1.scheme != r2.scheme:
|
||||
raise AssertionError("{} != {}".format(r1.scheme, r2.scheme))
|
||||
|
||||
|
||||
def port(r1, r2):
|
||||
assert r1.port == r2.port, "{} != {}".format(r1.port, r2.port)
|
||||
if r1.port != r2.port:
|
||||
raise AssertionError("{} != {}".format(r1.port, r2.port))
|
||||
|
||||
|
||||
def path(r1, r2):
|
||||
assert r1.path == r2.path, "{} != {}".format(r1.path, r2.path)
|
||||
if r1.path != r2.path:
|
||||
raise AssertionError("{} != {}".format(r1.path, r2.path))
|
||||
|
||||
|
||||
def query(r1, r2):
|
||||
assert r1.query == r2.query, "{} != {}".format(r1.query, r2.query)
|
||||
if r1.query != r2.query:
|
||||
raise AssertionError("{} != {}".format(r1.query, r2.query))
|
||||
|
||||
|
||||
def raw_body(r1, r2):
|
||||
assert read_body(r1) == read_body(r2)
|
||||
if read_body(r1) != read_body(r2):
|
||||
raise AssertionError
|
||||
|
||||
|
||||
def body(r1, r2):
|
||||
@@ -45,11 +53,13 @@ def body(r1, r2):
|
||||
r2_transformer = _get_transformer(r2)
|
||||
if transformer != r2_transformer:
|
||||
transformer = _identity
|
||||
assert transformer(read_body(r1)) == transformer(read_body(r2))
|
||||
if transformer(read_body(r1)) != transformer(read_body(r2)):
|
||||
raise AssertionError
|
||||
|
||||
|
||||
def headers(r1, r2):
|
||||
assert r1.headers == r2.headers, "{} != {}".format(r1.headers, r2.headers)
|
||||
if r1.headers != r2.headers:
|
||||
raise AssertionError("{} != {}".format(r1.headers, r2.headers))
|
||||
|
||||
|
||||
def _header_checker(value, header="Content-Type"):
|
||||
|
||||
@@ -7,7 +7,7 @@ It merges and deletes the request obsolete keys (protocol, host, port, path)
|
||||
into new 'uri' key.
|
||||
Usage::
|
||||
|
||||
python -m vcr.migration PATH
|
||||
python3 -m vcr.migration PATH
|
||||
|
||||
The PATH can be path to the directory with cassettes or cassette itself
|
||||
"""
|
||||
@@ -138,7 +138,7 @@ def try_migrate(path):
|
||||
def main():
|
||||
if len(sys.argv) != 2:
|
||||
raise SystemExit(
|
||||
"Please provide path to cassettes directory or file. " "Usage: python -m vcr.migration PATH"
|
||||
"Please provide path to cassettes directory or file. " "Usage: python3 -m vcr.migration PATH"
|
||||
)
|
||||
|
||||
path = sys.argv[1]
|
||||
|
||||
@@ -93,6 +93,9 @@ class VCRHTTPResponse(HTTPResponse):
|
||||
def read(self, *args, **kwargs):
|
||||
return self._content.read(*args, **kwargs)
|
||||
|
||||
def read1(self, *args, **kwargs):
|
||||
return self._content.read1(*args, **kwargs)
|
||||
|
||||
def readall(self):
|
||||
return self._content.readall()
|
||||
|
||||
@@ -167,6 +170,13 @@ class VCRHTTPResponse(HTTPResponse):
|
||||
def drain_conn(self):
|
||||
pass
|
||||
|
||||
def stream(self, amt=65536, decode_content=None):
|
||||
while True:
|
||||
b = self._content.read(amt)
|
||||
yield b
|
||||
if not b:
|
||||
break
|
||||
|
||||
|
||||
class VCRConnection:
|
||||
# A reference to the cassette that's currently being patched in
|
||||
|
||||
39
vcr/unittest.py
Normal file
39
vcr/unittest.py
Normal file
@@ -0,0 +1,39 @@
|
||||
import inspect
|
||||
import os
|
||||
import unittest
|
||||
|
||||
from .config import VCR
|
||||
|
||||
|
||||
class VCRMixin:
|
||||
"""A TestCase mixin that provides VCR integration."""
|
||||
|
||||
vcr_enabled = True
|
||||
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
if self.vcr_enabled:
|
||||
kwargs = self._get_vcr_kwargs()
|
||||
myvcr = self._get_vcr(**kwargs)
|
||||
cm = myvcr.use_cassette(self._get_cassette_name())
|
||||
self.cassette = cm.__enter__()
|
||||
self.addCleanup(cm.__exit__, None, None, None)
|
||||
|
||||
def _get_vcr(self, **kwargs):
|
||||
if "cassette_library_dir" not in kwargs:
|
||||
kwargs["cassette_library_dir"] = self._get_cassette_library_dir()
|
||||
return VCR(**kwargs)
|
||||
|
||||
def _get_vcr_kwargs(self, **kwargs):
|
||||
return kwargs
|
||||
|
||||
def _get_cassette_library_dir(self):
|
||||
testdir = os.path.dirname(inspect.getfile(self.__class__))
|
||||
return os.path.join(testdir, "cassettes")
|
||||
|
||||
def _get_cassette_name(self):
|
||||
return "{0}.{1}.yaml".format(self.__class__.__name__, self._testMethodName)
|
||||
|
||||
|
||||
class VCRTestCase(VCRMixin, unittest.TestCase):
|
||||
pass
|
||||
Reference in New Issue
Block a user