1
0
mirror of https://github.com/kevin1024/vcrpy.git synced 2025-12-11 01:53:01 +00:00

Compare commits

..

1 Commits

Author SHA1 Message Date
Luiz Menezes
1b474c0510 Fix deprecation warnings 2019-04-06 00:37:36 -03:00
43 changed files with 252 additions and 1114 deletions

View File

@@ -1,7 +0,0 @@
coverage:
status:
project:
default:
target: 75
# Allow 0% coverage regression
threshold: 0

2
.gitignore vendored
View File

@@ -6,8 +6,6 @@ build/
dist/ dist/
*.egg/ *.egg/
.coverage .coverage
coverage.xml
htmlcov/
*.egg-info/ *.egg-info/
pytestdebug.log pytestdebug.log

View File

@@ -1,15 +1,16 @@
language: python language: python
sudo: false
before_install: openssl version before_install: openssl version
env: env:
# global: global:
# - secure: AifoKzwhjV94cmcQZrdQmqRu/9rkZZvWpwBv1daeAQpLOKFPGsOm3D+x2cSw9+iCfkgDZDfqQVv1kCaFVxTll8v8jTq5SJdqEY0NmGWbj/UkNtShh609oRDsuzLxAEwtVKYjf/h8K2BRea+bl1tGkwZ2vtmYS6dxNlAijjWOfds= - secure: AifoKzwhjV94cmcQZrdQmqRu/9rkZZvWpwBv1daeAQpLOKFPGsOm3D+x2cSw9+iCfkgDZDfqQVv1kCaFVxTll8v8jTq5SJdqEY0NmGWbj/UkNtShh609oRDsuzLxAEwtVKYjf/h8K2BRea+bl1tGkwZ2vtmYS6dxNlAijjWOfds=
# - secure: LBSEg/gMj4u4Hrpo3zs6Y/1mTpd2RtcN49mZIFgTdbJ9IhpiNPqcEt647Lz94F9Eses2x2WbNuKqZKZZReY7QLbEzU1m0nN5jlaKrjcG5NR5clNABfFFyhgc0jBikyS4abAG8jc2efeaTrFuQwdoF4sE8YiVrkiVj2X5Xoi6sBk= - secure: LBSEg/gMj4u4Hrpo3zs6Y/1mTpd2RtcN49mZIFgTdbJ9IhpiNPqcEt647Lz94F9Eses2x2WbNuKqZKZZReY7QLbEzU1m0nN5jlaKrjcG5NR5clNABfFFyhgc0jBikyS4abAG8jc2efeaTrFuQwdoF4sE8YiVrkiVj2X5Xoi6sBk=
matrix: matrix:
- TOX_SUFFIX="flakes" - TOX_SUFFIX="flakes"
- TOX_SUFFIX="requests" - TOX_SUFFIX="requests27"
- TOX_SUFFIX="httplib2" - TOX_SUFFIX="httplib2"
- TOX_SUFFIX="boto3" - TOX_SUFFIX="boto3"
- TOX_SUFFIX="urllib3" - TOX_SUFFIX="urllib3121"
- TOX_SUFFIX="tornado4" - TOX_SUFFIX="tornado4"
- TOX_SUFFIX="aiohttp" - TOX_SUFFIX="aiohttp"
matrix: matrix:
@@ -17,26 +18,37 @@ matrix:
- env: TOX_SUFFIX="flakes" - env: TOX_SUFFIX="flakes"
python: 3.7 python: 3.7
dist: xenial dist: xenial
- env: TOX_SUFFIX="requests" sudo: true
- env: TOX_SUFFIX="requests27"
python: 3.7 python: 3.7
dist: xenial dist: xenial
sudo: true
- env: TOX_SUFFIX="httplib2" - env: TOX_SUFFIX="httplib2"
python: 3.7 python: 3.7
dist: xenial dist: xenial
- env: TOX_SUFFIX="urllib3" sudo: true
- env: TOX_SUFFIX="urllib3121"
python: 3.7 python: 3.7
dist: xenial dist: xenial
sudo: true
- env: TOX_SUFFIX="tornado4" - env: TOX_SUFFIX="tornado4"
python: 3.7 python: 3.7
dist: xenial dist: xenial
sudo: true
- env: TOX_SUFFIX="aiohttp" - env: TOX_SUFFIX="aiohttp"
python: 3.7 python: 3.7
dist: xenial dist: xenial
sudo: true
allow_failures: allow_failures:
- env: TOX_SUFFIX="boto3"
- env: TOX_SUFFIX="aiohttp" - env: TOX_SUFFIX="aiohttp"
python: "pypy3.5-5.9.0" python: "pypy3.5-5.9.0"
- env: TOX_SUFFIX="aiohttp"
python: 3.4
exclude: exclude:
# Only run flakes on a single Python 2.x and a single 3.x # Only run flakes on a single Python 2.x and a single 3.x
- env: TOX_SUFFIX="flakes"
python: 3.4
- env: TOX_SUFFIX="flakes" - env: TOX_SUFFIX="flakes"
python: 3.5 python: 3.5
- env: TOX_SUFFIX="flakes" - env: TOX_SUFFIX="flakes"
@@ -49,14 +61,13 @@ matrix:
python: pypy python: pypy
python: python:
- 2.7 - 2.7
- 3.4
- 3.5 - 3.5
- 3.6 - 3.6
- pypy - pypy
- "pypy3.5-5.9.0" - "pypy3.5-5.9.0"
install: install:
- pip install tox-travis codecov - pip install tox-travis
- if [[ $TOX_SUFFIX != 'flakes' ]]; then python setup.py install ; fi - if [[ $TOX_SUFFIX != 'flakes' ]]; then python setup.py install ; fi
script: script:
- tox -e "${TOX_SUFFIX}" - tox -e "${TOX_SUFFIX}"
after_success:
- codecov

View File

@@ -1,4 +1,4 @@
|PyPI| |Python versions| |Build Status| |CodeCov| |Gitter| |PyPI| |Python versions| |Build Status| |Waffle Ready| |Gitter|
VCR.py VCR.py
====== ======
@@ -53,10 +53,8 @@ more details
:target: https://pypi.python.org/pypi/vcrpy :target: https://pypi.python.org/pypi/vcrpy
.. |Build Status| image:: https://secure.travis-ci.org/kevin1024/vcrpy.svg?branch=master .. |Build Status| image:: https://secure.travis-ci.org/kevin1024/vcrpy.svg?branch=master
:target: http://travis-ci.org/kevin1024/vcrpy :target: http://travis-ci.org/kevin1024/vcrpy
.. |Waffle Ready| image:: https://badge.waffle.io/kevin1024/vcrpy.svg?label=ready&title=waffle
:target: https://waffle.io/kevin1024/vcrpy
.. |Gitter| image:: https://badges.gitter.im/Join%20Chat.svg .. |Gitter| image:: https://badges.gitter.im/Join%20Chat.svg
:alt: Join the chat at https://gitter.im/kevin1024/vcrpy :alt: Join the chat at https://gitter.im/kevin1024/vcrpy
:target: https://gitter.im/kevin1024/vcrpy?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge :target: https://gitter.im/kevin1024/vcrpy?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge
.. |CodeCov| image:: https://codecov.io/gh/kevin1024/vcrpy/branch/master/graph/badge.svg
:target: https://codecov.io/gh/kevin1024/vcrpy
:alt: Code Coverage Status

View File

@@ -97,12 +97,8 @@ Create your own method with the following signature
def my_matcher(r1, r2): def my_matcher(r1, r2):
Your method receives the two requests and can return : Your method receives the two requests and must return ``True`` if they
match, ``False`` if they don't.
- Use an ``assert`` statement in the matcher, then we have ``None`` if they match, raise an `AssertionError`` if they don't.
- A boolean, ``True`` if they match, ``False`` if they don't.
Note : You should use an ``assert`` statement in order to have feedback when a matcher is failing.
Finally, register your method with VCR to use your new request matcher. Finally, register your method with VCR to use your new request matcher.
@@ -111,7 +107,7 @@ Finally, register your method with VCR to use your new request matcher.
import vcr import vcr
def jurassic_matcher(r1, r2): def jurassic_matcher(r1, r2):
assert r1.uri == r2.uri and 'JURASSIC PARK' in r1.body return r1.uri == r2.uri and 'JURASSIC PARK' in r1.body
my_vcr = vcr.VCR() my_vcr = vcr.VCR()
my_vcr.register_matcher('jurassic', jurassic_matcher) my_vcr.register_matcher('jurassic', jurassic_matcher)
@@ -372,16 +368,3 @@ cassette names, use ``VCR.ensure_suffix`` as follows:
@my_vcr.use_cassette @my_vcr.use_cassette
def my_test_function(): def my_test_function():
Rewind Cassette
---------------
VCR.py allows to rewind a cassette in order to replay it inside the same function/test.
.. code:: python
with vcr.use_cassette('fixtures/vcr_cassettes/synopsis.yaml') as cass:
response = urllib2.urlopen('http://www.zombo.com/').read()
assert cass.all_played
a.rewind()
assert not cass.all_played

View File

@@ -1,17 +1,5 @@
Changelog Changelog
--------- ---------
- 2.1.1 (UNRELEASED)
- 2.1.0 - Add a `rewind` method to reset a cassette (thanks @khamidou)
New error message with more details on why the cassette failed to play a request (thanks @arthurHamon2, @neozenith)
Handle connect tunnel URI (thanks @jeking3)
Add code coverage to the project (thanks @neozenith)
Drop support to python 3.4
Add deprecation warning on python 2.7, next major release will drop python 2.7 support
Fix build problems on requests tests (thanks to @dunossauro)
Fix matching on 'body' failing when Unicode symbols are present in them (thanks @valgur)
Fix bugs on aiohttp integration (thanks @graingert, @steinnes, @stj, @lamenezes, @lmazuel)
Fix Biopython incompatibility (thanks @rishab121)
Fix Boto3 integration (thanks @1oglop1, @arthurHamon2)
- 2.0.1 - Fix bug when using vcrpy with python 3.4 - 2.0.1 - Fix bug when using vcrpy with python 3.4
- 2.0.0 - Support python 3.7 (fix httplib2 and urllib2, thanks @felixonmars) - 2.0.0 - Support python 3.7 (fix httplib2 and urllib2, thanks @felixonmars)
[#356] Fixes `before_record_response` so the original response isn't changed (thanks @kgraves) [#356] Fixes `before_record_response` so the original response isn't changed (thanks @kgraves)

View File

@@ -11,10 +11,7 @@ yourself using `py.test <http://pytest.org/>`__ and
all environments VCR.py supports. The test suite is pretty big and slow, all environments VCR.py supports. The test suite is pretty big and slow,
but you can tell tox to only run specific tests like this:: but you can tell tox to only run specific tests like this::
tox -e {pyNN}-{HTTP_LIBRARY} -- <pytest flags passed through> tox -e py27requests -- -v -k "'test_status_code or test_gzip'"
tox -e py27-requests -- -v -k "'test_status_code or test_gzip'"
tox -e py37-requests -- -v --last-failed
This will run only tests that look like ``test_status_code`` or This will run only tests that look like ``test_status_code`` or
``test_gzip`` in the test suite, and only in the python 2.7 environment ``test_gzip`` in the test suite, and only in the python 2.7 environment
@@ -26,24 +23,3 @@ documentation <https://boto.readthedocs.io/en/latest/getting_started.html>`__
for how to set this up. I have marked the boto tests as optional in for how to set this up. I have marked the boto tests as optional in
Travis so you don't have to worry about them failing if you submit a Travis so you don't have to worry about them failing if you submit a
pull request. pull request.
Troubleshooting on MacOSX
-------------------------
If you have this kind of error when running tox :
.. code:: python
__main__.ConfigurationError: Curl is configured to use SSL, but we have
not been able to determine which SSL backend it is using. Please see PycURL documentation for how to specify the SSL backend manually.
Then you need to define some environment variables:
.. code:: bash
export PYCURL_SSL_LIBRARY=openssl
export LDFLAGS=-L/usr/local/opt/openssl/lib
export CPPFLAGS=-I/usr/local/opt/openssl/include
Reference : `stackoverflow issue <https://stackoverflow.com/questions/51019622/curl-is-configured-to-use-ssl-but-we-have-not-been-able-to-determine-which-ssl>`__

View File

@@ -29,29 +29,3 @@ The second time, you will see::
If you set the loglevel to DEBUG, you will also get information about If you set the loglevel to DEBUG, you will also get information about
which matchers didn't match. This can help you with debugging custom which matchers didn't match. This can help you with debugging custom
matchers. matchers.
CannotOverwriteExistingCassetteException
========================================
When a request failed to be found in an existing cassette,
VCR.py tries to get the request(s) that may be similar to the one being searched.
The goal is to see which matcher(s) failed and understand what part of the failed request may have changed.
It can return multiple similar requests with :
- the matchers that have succeeded
- the matchers that have failed
- for each failed matchers, why it has failed with an assertion message
CannotOverwriteExistingCassetteException message example :
.. code:: python
CannotOverwriteExistingCassetteException: Can't overwrite existing cassette ('cassette.yaml') in your current record mode ('once').
No match for the request (<Request (GET) https://www.googleapis.com/?alt=json&maxResults=200>) was found.
Found 1 similar requests with 1 different matchers :
1 - (<Request (GET) https://www.googleapis.com/?alt=json&maxResults=500>).
Matchers succeeded : ['method', 'scheme', 'host', 'port', 'path']
Matchers failed :
query - assertion failure :
[('alt', 'json'), ('maxResults', '200')] != [('alt', 'json'), ('maxResults', '500')]

View File

@@ -9,7 +9,7 @@ with pip::
Compatibility Compatibility
------------- -------------
VCR.py supports Python 2.7 and 3.5+, and VCR.py supports Python 2.7 and 3.4+, and
`pypy <http://pypy.org>`__. `pypy <http://pypy.org>`__.
The following HTTP libraries are supported: The following HTTP libraries are supported:

View File

@@ -11,7 +11,7 @@ Usage
assert 'Example domains' in response assert 'Example domains' in response
Run this test once, and VCR.py will record the HTTP request to Run this test once, and VCR.py will record the HTTP request to
``fixtures/vcr_cassettes/synopsis.yaml``. Run it again, and VCR.py will ``fixtures/vcr_cassettes/synopsis.yml``. Run it again, and VCR.py will
replay the response from iana.org when the http request is made. This replay the response from iana.org when the http request is made. This
test is now fast (no real HTTP requests are made anymore), deterministic test is now fast (no real HTTP requests are made anymore), deterministic
(the test will continue to pass, even if you are offline, or iana.org (the test will continue to pass, even if you are offline, or iana.org
@@ -95,9 +95,3 @@ Unittest Integration
While it's possible to use the context manager or decorator forms with unittest, While it's possible to use the context manager or decorator forms with unittest,
there's also a ``VCRTestCase`` provided separately by `vcrpy-unittest there's also a ``VCRTestCase`` provided separately by `vcrpy-unittest
<https://github.com/agriffis/vcrpy-unittest>`__. <https://github.com/agriffis/vcrpy-unittest>`__.
Pytest Integration
------------------
A Pytest plugin is available here : `pytest-vcr
<https://github.com/ktosiek/pytest-vcr>`__.

View File

@@ -1,7 +1,3 @@
#!/bin/bash #!/bin/bash
# https://blog.ionelmc.ro/2015/04/14/tox-tricks-and-patterns/#when-it-inevitably-leads-to-shell-scripts
# If you are getting an INVOCATION ERROR for this script then there is
# a good chance you are running on Windows.
# You can and should use WSL for running tox on Windows when it calls bash scripts.
REQUESTS_CA_BUNDLE=`python -m pytest_httpbin.certs` py.test $* REQUESTS_CA_BUNDLE=`python -m pytest_httpbin.certs` py.test $*

View File

@@ -28,7 +28,9 @@ install_requires = [
'six>=1.5', 'six>=1.5',
'contextlib2; python_version=="2.7"', 'contextlib2; python_version=="2.7"',
'mock; python_version=="2.7"', 'mock; python_version=="2.7"',
'yarl; python_version>="3.5"', 'yarl; python_version>"3.4"',
'yarl<1.0.0; python_version=="3.4"',
'multidict<4.0.0,>=2.0; python_version=="3.4"'
] ]
excluded_packages = ["tests*"] excluded_packages = ["tests*"]
@@ -37,7 +39,7 @@ if sys.version_info[0] == 2:
setup( setup(
name='vcrpy', name='vcrpy',
version='2.1.0', version='2.0.1',
description=( description=(
"Automatically mock your HTTP interactions to simplify and " "Automatically mock your HTTP interactions to simplify and "
"speed up testing" "speed up testing"
@@ -47,18 +49,19 @@ setup(
author_email='me@kevinmccarthy.org', author_email='me@kevinmccarthy.org',
url='https://github.com/kevin1024/vcrpy', url='https://github.com/kevin1024/vcrpy',
packages=find_packages(exclude=excluded_packages), packages=find_packages(exclude=excluded_packages),
python_requires='>=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*', python_requires='>=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*',
install_requires=install_requires, install_requires=install_requires,
license='MIT', license='MIT',
tests_require=['pytest', 'mock', 'pytest-httpbin'], tests_require=['pytest', 'mock', 'pytest-httpbin'],
classifiers=[ classifiers=[
'Development Status :: 5 - Production/Stable', 'Development Status :: 4 - Beta',
'Environment :: Console', 'Environment :: Console',
'Intended Audience :: Developers', 'Intended Audience :: Developers',
'Programming Language :: Python', 'Programming Language :: Python',
'Programming Language :: Python :: 2', 'Programming Language :: Python :: 2',
'Programming Language :: Python :: 2.7', 'Programming Language :: Python :: 2.7',
'Programming Language :: Python :: 3', 'Programming Language :: Python :: 3',
'Programming Language :: Python :: 3.4',
'Programming Language :: Python :: 3.5', 'Programming Language :: Python :: 3.5',
'Programming Language :: Python :: 3.6', 'Programming Language :: Python :: 3.6',
'Programming Language :: Python :: 3.7', 'Programming Language :: Python :: 3.7',

View File

@@ -9,7 +9,7 @@ interactions:
method: GET method: GET
uri: http://httpbin.org/ip uri: http://httpbin.org/ip
response: response:
body: {string: "{\n \"origin\": \"217.122.164.194\"\n}"} body: {string: !!python/unicode "{\n \"origin\": \"217.122.164.194\"\n}"}
headers: headers:
access-control-allow-origin: ['*'] access-control-allow-origin: ['*']
content-type: [application/json] content-type: [application/json]

View File

@@ -17,8 +17,6 @@ async def aiohttp_request(loop, method, url, output='text', encoding='utf-8', co
content = await response.json(encoding=encoding, content_type=content_type) content = await response.json(encoding=encoding, content_type=content_type)
elif output == 'raw': elif output == 'raw':
content = await response.read() content = await response.read()
elif output == 'stream':
content = await response.content.read()
response_ctx._resp.close() response_ctx._resp.close()
await session.close() await session.close()
@@ -30,14 +28,6 @@ def aiohttp_app():
async def hello(request): async def hello(request):
return aiohttp.web.Response(text='hello') return aiohttp.web.Response(text='hello')
async def json(request):
return aiohttp.web.json_response({})
async def json_empty_body(request):
return aiohttp.web.json_response()
app = aiohttp.web.Application() app = aiohttp.web.Application()
app.router.add_get('/', hello) app.router.add_get('/', hello)
app.router.add_get('/json', json)
app.router.add_get('/json/empty', json_empty_body)
return app return app

View File

@@ -1,5 +1,4 @@
import contextlib import contextlib
import logging
import pytest import pytest
asyncio = pytest.importorskip("asyncio") asyncio = pytest.importorskip("asyncio")
@@ -48,32 +47,14 @@ def test_status(tmpdir, scheme):
assert cassette.play_count == 1 assert cassette.play_count == 1
@pytest.mark.parametrize("auth", [None, aiohttp.BasicAuth("vcrpy", "test")]) def test_headers(tmpdir, scheme):
def test_headers(tmpdir, scheme, auth):
url = scheme + '://httpbin.org' url = scheme + '://httpbin.org'
with vcr.use_cassette(str(tmpdir.join('headers.yaml'))): with vcr.use_cassette(str(tmpdir.join('headers.yaml'))):
response, _ = get(url, auth=auth) response, _ = get(url)
with vcr.use_cassette(str(tmpdir.join('headers.yaml'))) as cassette: with vcr.use_cassette(str(tmpdir.join('headers.yaml'))) as cassette:
if auth is not None:
request = cassette.requests[0]
assert "AUTHORIZATION" in request.headers
cassette_response, _ = get(url, auth=auth)
assert dict(cassette_response.headers) == dict(response.headers)
assert cassette.play_count == 1
assert 'istr' not in cassette.data[0]
assert 'yarl.URL' not in cassette.data[0]
def test_case_insensitive_headers(tmpdir, scheme):
url = scheme + '://httpbin.org'
with vcr.use_cassette(str(tmpdir.join('whatever.yaml'))):
_, _ = get(url)
with vcr.use_cassette(str(tmpdir.join('whatever.yaml'))) as cassette:
cassette_response, _ = get(url) cassette_response, _ = get(url)
assert "Content-Type" in cassette_response.headers assert cassette_response.headers == response.headers
assert "content-type" in cassette_response.headers
assert cassette.play_count == 1 assert cassette.play_count == 1
@@ -112,43 +93,17 @@ def test_binary(tmpdir, scheme):
assert cassette.play_count == 1 assert cassette.play_count == 1
def test_stream(tmpdir, scheme): def test_post(tmpdir, scheme):
url = scheme + '://httpbin.org/get'
with vcr.use_cassette(str(tmpdir.join('stream.yaml'))):
resp, body = get(url, output='raw') # Do not use stream here, as the stream is exhausted by vcr
with vcr.use_cassette(str(tmpdir.join('stream.yaml'))) as cassette:
cassette_resp, cassette_body = get(url, output='stream')
assert cassette_body == body
assert cassette.play_count == 1
@pytest.mark.parametrize('body', ['data', 'json'])
def test_post(tmpdir, scheme, body, caplog):
caplog.set_level(logging.INFO)
data = {'key1': 'value1', 'key2': 'value2'} data = {'key1': 'value1', 'key2': 'value2'}
url = scheme + '://httpbin.org/post' url = scheme + '://httpbin.org/post'
with vcr.use_cassette(str(tmpdir.join('post.yaml'))): with vcr.use_cassette(str(tmpdir.join('post.yaml'))):
_, response_json = post(url, **{body: data}) _, response_json = post(url, data=data)
with vcr.use_cassette(str(tmpdir.join('post.yaml'))) as cassette: with vcr.use_cassette(str(tmpdir.join('post.yaml'))) as cassette:
request = cassette.requests[0] _, cassette_response_json = post(url, data=data)
assert request.body == data
_, cassette_response_json = post(url, **{body: data})
assert cassette_response_json == response_json assert cassette_response_json == response_json
assert cassette.play_count == 1 assert cassette.play_count == 1
assert next(
(
log
for log in caplog.records
if log.getMessage()
== '<Request (POST) {}> not in cassette, sending to real server'.format(url)
),
None,
), 'Log message not found.'
def test_params(tmpdir, scheme): def test_params(tmpdir, scheme):
url = scheme + '://httpbin.org/get' url = scheme + '://httpbin.org/get'
@@ -224,41 +179,3 @@ def test_aiohttp_test_client(aiohttp_client, tmpdir):
response_text = loop.run_until_complete(response.text()) response_text = loop.run_until_complete(response.text())
assert response_text == 'hello' assert response_text == 'hello'
assert cassette.play_count == 1 assert cassette.play_count == 1
def test_aiohttp_test_client_json(aiohttp_client, tmpdir):
loop = asyncio.get_event_loop()
app = aiohttp_app()
url = '/json/empty'
client = loop.run_until_complete(aiohttp_client(app))
with vcr.use_cassette(str(tmpdir.join('get.yaml'))):
response = loop.run_until_complete(client.get(url))
assert response.status == 200
response_json = loop.run_until_complete(response.json())
assert response_json is None
with vcr.use_cassette(str(tmpdir.join('get.yaml'))) as cassette:
response = loop.run_until_complete(client.get(url))
request = cassette.requests[0]
assert request.url == str(client.make_url(url))
response_json = loop.run_until_complete(response.json())
assert response_json is None
assert cassette.play_count == 1
def test_redirect(aiohttp_client, tmpdir):
url = 'https://httpbin.org/redirect/2'
with vcr.use_cassette(str(tmpdir.join('redirect.yaml'))):
response, _ = get(url)
with vcr.use_cassette(str(tmpdir.join('redirect.yaml'))) as cassette:
cassette_response, _ = get(url)
assert cassette_response.status == response.status
assert len(cassette_response.history) == len(response.history)
assert len(cassette) == 3
assert cassette.play_count == 3

View File

@@ -1,60 +1,15 @@
import pytest import pytest
import os
boto3 = pytest.importorskip("boto3") boto3 = pytest.importorskip("boto3")
import boto3 # NOQA import boto3 # NOQA
import botocore # NOQA
import vcr # NOQA import vcr # NOQA
try: bucket = 'boto3-demo-1337' # a bucket you can access
from botocore import awsrequest # NOQA key = 'test/my_test.txt' # key with r+w access
content = 'hello world i am a string' # content to put in the test file
botocore_awsrequest = True
except ImportError:
botocore_awsrequest = False
# skip tests if boto does not use vendored requests anymore
# https://github.com/boto/botocore/pull/1495
boto3_skip_vendored_requests = pytest.mark.skipif(
botocore_awsrequest,
reason='botocore version {ver} does not use vendored requests anymore.'.format(
ver=botocore.__version__))
boto3_skip_awsrequest = pytest.mark.skipif(
not botocore_awsrequest,
reason='botocore version {ver} still uses vendored requests.'.format(
ver=botocore.__version__))
IAM_USER_NAME = "vcrpy"
@pytest.fixture def test_boto_stubs(tmpdir):
def iam_client():
def _iam_client(boto3_session=None):
if boto3_session is None:
boto3_session = boto3.Session(
aws_access_key_id=os.environ.get('AWS_ACCESS_KEY_ID', "default"),
aws_secret_access_key=os.environ.get('AWS_SECRET_ACCESS_KEY', "default"),
aws_session_token=None,
region_name=os.environ.get('AWS_DEFAULT_REGION', "default"),
)
return boto3_session.client('iam')
return _iam_client
@pytest.fixture
def get_user(iam_client):
def _get_user(client=None, user_name=IAM_USER_NAME):
if client is None:
# Default client set with fixture `iam_client`
client = iam_client()
return client.get_user(UserName=user_name)
return _get_user
@boto3_skip_vendored_requests
def test_boto_vendored_stubs(tmpdir):
with vcr.use_cassette(str(tmpdir.join('boto3-stubs.yml'))): with vcr.use_cassette(str(tmpdir.join('boto3-stubs.yml'))):
# Perform the imports within the patched context so that # Perform the imports within the patched context so that
# HTTPConnection, VerifiedHTTPSConnection refers to the patched version. # HTTPConnection, VerifiedHTTPSConnection refers to the patched version.
@@ -68,50 +23,45 @@ def test_boto_vendored_stubs(tmpdir):
VerifiedHTTPSConnection('hostname.does.not.matter') VerifiedHTTPSConnection('hostname.does.not.matter')
@pytest.mark.skipif( def test_boto3_without_vcr():
os.environ.get("TRAVIS_PULL_REQUEST") != "false", s3_resource = boto3.resource('s3')
reason="Encrypted Environment Variables from Travis Repository Settings" b = s3_resource.Bucket(bucket)
" are disabled on PRs from forks. " b.put_object(Key=key, Body=content)
"https://docs.travis-ci.com/user/pull-requests/#pull-requests-and-security-restrictions"
)
def test_boto_medium_difficulty(tmpdir, get_user):
# retrieve content to check it
o = s3_resource.Object(bucket, key).get()
# decode for python3
assert content == o['Body'].read().decode('utf-8')
def test_boto_medium_difficulty(tmpdir):
s3_resource = boto3.resource('s3')
b = s3_resource.Bucket(bucket)
with vcr.use_cassette(str(tmpdir.join('boto3-medium.yml'))): with vcr.use_cassette(str(tmpdir.join('boto3-medium.yml'))):
response = get_user() b.put_object(Key=key, Body=content)
assert response['User']['UserName'] == IAM_USER_NAME o = s3_resource.Object(bucket, key).get()
assert content == o['Body'].read().decode('utf-8')
with vcr.use_cassette(str(tmpdir.join('boto3-medium.yml'))) as cass: with vcr.use_cassette(str(tmpdir.join('boto3-medium.yml'))) as cass:
response = get_user() b.put_object(Key=key, Body=content)
assert response['User']['UserName'] == IAM_USER_NAME o = s3_resource.Object(bucket, key).get()
assert content == o['Body'].read().decode('utf-8')
assert cass.all_played assert cass.all_played
@pytest.mark.skipif( def test_boto_hardcore_mode(tmpdir):
os.environ.get("TRAVIS_PULL_REQUEST") != "false",
reason="Encrypted Environment Variables from Travis Repository Settings"
" are disabled on PRs from forks. "
"https://docs.travis-ci.com/user/pull-requests/#pull-requests-and-security-restrictions"
)
def test_boto_hardcore_mode(tmpdir, iam_client, get_user):
with vcr.use_cassette(str(tmpdir.join('boto3-hardcore.yml'))): with vcr.use_cassette(str(tmpdir.join('boto3-hardcore.yml'))):
ses = boto3.Session( s3_resource = boto3.resource('s3')
aws_access_key_id=os.environ.get('AWS_ACCESS_KEY_ID'), b = s3_resource.Bucket(bucket)
aws_secret_access_key=os.environ.get('AWS_SECRET_ACCESS_KEY'), b.put_object(Key=key, Body=content)
region_name=os.environ.get('AWS_DEFAULT_REGION'), o = s3_resource.Object(bucket, key).get()
) assert content == o['Body'].read().decode('utf-8')
client = iam_client(ses)
response = get_user(client=client)
assert response['User']['UserName'] == IAM_USER_NAME
with vcr.use_cassette(str(tmpdir.join('boto3-hardcore.yml'))) as cass: with vcr.use_cassette(str(tmpdir.join('boto3-hardcore.yml'))) as cass:
ses = boto3.Session( s3_resource = boto3.resource('s3')
aws_access_key_id=os.environ.get('AWS_ACCESS_KEY_ID'), b = s3_resource.Bucket(bucket)
aws_secret_access_key=os.environ.get('AWS_SECRET_ACCESS_KEY'), b.put_object(Key=key, Body=content)
aws_session_token=None, o = s3_resource.Object(bucket, key).get()
region_name=os.environ.get('AWS_DEFAULT_REGION'), assert content == o['Body'].read().decode('utf-8')
)
client = iam_client(ses)
response = get_user(client=client)
assert response['User']['UserName'] == IAM_USER_NAME
assert cass.all_played assert cass.all_played

View File

@@ -22,7 +22,7 @@ def http():
kwargs = { kwargs = {
'ca_certs': pytest_httpbin.certs.where() 'ca_certs': pytest_httpbin.certs.where()
} }
if sys.version_info[:2] in [(2, 7), (3, 7)]: if sys.version_info[:2] == (3, 7):
kwargs['disable_ssl_certificate_validation'] = True kwargs['disable_ssl_certificate_validation'] = True
return httplib2.Http(**kwargs) return httplib2.Http(**kwargs)

View File

@@ -44,7 +44,7 @@ def proxy_server():
target=httpd.serve_forever, target=httpd.serve_forever,
) )
proxy_process.start() proxy_process.start()
yield 'http://{}:{}'.format(*httpd.server_address) yield 'http://{0}:{1}'.format(*httpd.server_address)
proxy_process.terminate() proxy_process.terminate()

View File

@@ -116,10 +116,10 @@ def test_post_chunked_binary(tmpdir, httpbin):
assert req1 == req2 assert req1 == req2
@pytest.mark.skipif('sys.version_info >= (3, 6)', strict=True, raises=ConnectionError) @pytest.mark.xskip('sys.version_info >= (3, 6)', strict=True, raises=ConnectionError)
@pytest.mark.skipif((3, 5) < sys.version_info < (3, 6) and @pytest.mark.xskip((3, 5) < sys.version_info < (3, 6) and
platform.python_implementation() == 'CPython', platform.python_implementation() == 'CPython',
reason='Fails on CPython 3.5') reason='Fails on CPython 3.5')
def test_post_chunked_binary_secure(tmpdir, httpbin_secure): def test_post_chunked_binary_secure(tmpdir, httpbin_secure):
'''Ensure that we can send chunked binary without breaking while trying to concatenate bytes with str.''' '''Ensure that we can send chunked binary without breaking while trying to concatenate bytes with str.'''
data1 = iter([b'data', b'to', b'send']) data1 = iter([b'data', b'to', b'send'])
@@ -254,7 +254,7 @@ def test_nested_cassettes_with_session_created_before_nesting(httpbin_both, tmpd
def test_post_file(tmpdir, httpbin_both): def test_post_file(tmpdir, httpbin_both):
'''Ensure that we handle posting a file.''' '''Ensure that we handle posting a file.'''
url = httpbin_both + '/post' url = httpbin_both + '/post'
with vcr.use_cassette(str(tmpdir.join('post_file.yaml'))) as cass, open('tox.ini', 'rb') as f: with vcr.use_cassette(str(tmpdir.join('post_file.yaml'))) as cass, open('tox.ini') as f:
original_response = requests.post(url, f).content original_response = requests.post(url, f).content
# This also tests that we do the right thing with matching the body when they are files. # This also tests that we do the right thing with matching the body when they are files.
@@ -282,17 +282,3 @@ def test_filter_post_params(tmpdir, httpbin_both):
requests.post(url, data={'key': 'value'}) requests.post(url, data={'key': 'value'})
with vcr.use_cassette(cass_loc, filter_post_data_parameters=['key']) as cass: with vcr.use_cassette(cass_loc, filter_post_data_parameters=['key']) as cass:
assert b'key=value' not in cass.requests[0].body assert b'key=value' not in cass.requests[0].body
def test_post_unicode_match_on_body(tmpdir, httpbin_both):
'''Ensure that matching on POST body that contains Unicode characters works.'''
data = {'key1': 'value1', '●‿●': '٩(●̮̮̃•̃)۶'}
url = httpbin_both + '/post'
with vcr.use_cassette(str(tmpdir.join('requests.yaml')), additional_matchers=('body',)):
req1 = requests.post(url, data).content
with vcr.use_cassette(str(tmpdir.join('requests.yaml')), additional_matchers=('body',)):
req2 = requests.post(url, data).content
assert req1 == req2

View File

@@ -20,7 +20,7 @@ def verify_pool_mgr():
@pytest.fixture(scope='module') @pytest.fixture(scope='module')
def pool_mgr(): def pool_mgr():
return urllib3.PoolManager(cert_reqs='CERT_NONE') return urllib3.PoolManager()
def test_status_code(httpbin_both, tmpdir, verify_pool_mgr): def test_status_code(httpbin_both, tmpdir, verify_pool_mgr):

View File

@@ -59,7 +59,7 @@ def test_flickr_multipart_upload(httpbin, tmpdir):
def test_flickr_should_respond_with_200(tmpdir): def test_flickr_should_respond_with_200(tmpdir):
testfile = str(tmpdir.join('flickr.yml')) testfile = str(tmpdir.join('flickr.yml'))
with vcr.use_cassette(testfile): with vcr.use_cassette(testfile):
r = requests.post("https://api.flickr.com/services/upload", verify=False) r = requests.post("http://api.flickr.com/services/upload")
assert r.status_code == 200 assert r.status_code == 200
@@ -81,23 +81,16 @@ def test_amazon_doctype(tmpdir):
assert 'html' in r.text assert 'html' in r.text
def start_rpc_server(q):
httpd = xmlrpc_server.SimpleXMLRPCServer(('127.0.0.1', 0))
httpd.register_function(pow)
q.put('http://{}:{}'.format(*httpd.server_address))
httpd.serve_forever()
@pytest.yield_fixture(scope='session') @pytest.yield_fixture(scope='session')
def rpc_server(): def rpc_server():
q = multiprocessing.Queue() httpd = xmlrpc_server.SimpleXMLRPCServer(('', 0))
httpd.register_function(pow)
proxy_process = multiprocessing.Process( proxy_process = multiprocessing.Process(
target=start_rpc_server, target=httpd.serve_forever,
args=(q,)
) )
try: try:
proxy_process.start() proxy_process.start()
yield q.get() yield 'http://{}:{}'.format(*httpd.server_address)
finally: finally:
proxy_process.terminate() proxy_process.terminate()

View File

@@ -133,17 +133,6 @@ def test_cassette_all_played():
assert a.all_played assert a.all_played
@mock.patch('vcr.cassette.requests_match', _mock_requests_match)
def test_cassette_rewound():
a = Cassette('test')
a.append('foo', 'bar')
a.play_response('foo')
assert a.all_played
a.rewind()
assert not a.all_played
def test_before_record_response(): def test_before_record_response():
before_record_response = mock.Mock(return_value='mutated') before_record_response = mock.Mock(return_value='mutated')
cassette = Cassette('test', before_record_response=before_record_response) cassette = Cassette('test', before_record_response=before_record_response)
@@ -317,51 +306,3 @@ def test_use_as_decorator_on_generator():
yield 2 yield 2
assert list(test_function()) == [1, 2] assert list(test_function()) == [1, 2]
@mock.patch("vcr.cassette.get_matchers_results")
def test_find_requests_with_most_matches_one_similar_request(mock_get_matchers_results):
mock_get_matchers_results.side_effect = [
(["method"], [("path", "failed : path"), ("query", "failed : query")]),
(["method", "path"], [("query", "failed : query")]),
([], [("method", "failed : method"), ("path", "failed : path"), ("query", "failed : query")]),
]
cassette = Cassette("test")
for request in range(1, 4):
cassette.append(request, 'response')
result = cassette.find_requests_with_most_matches("fake request")
assert result == [(2, ["method", "path"], [("query", "failed : query")])]
@mock.patch("vcr.cassette.get_matchers_results")
def test_find_requests_with_most_matches_no_similar_requests(mock_get_matchers_results):
mock_get_matchers_results.side_effect = [
([], [("path", "failed : path"), ("query", "failed : query")]),
([], [("path", "failed : path"), ("query", "failed : query")]),
([], [("path", "failed : path"), ("query", "failed : query")]),
]
cassette = Cassette("test")
for request in range(1, 4):
cassette.append(request, 'response')
result = cassette.find_requests_with_most_matches("fake request")
assert result == []
@mock.patch("vcr.cassette.get_matchers_results")
def test_find_requests_with_most_matches_many_similar_requests(mock_get_matchers_results):
mock_get_matchers_results.side_effect = [
(["method", "path"], [("query", "failed : query")]),
(["method"], [("path", "failed : path"), ("query", "failed : query")]),
(["method", "path"], [("query", "failed : query")]),
]
cassette = Cassette("test")
for request in range(1, 4):
cassette.append(request, 'response')
result = cassette.find_requests_with_most_matches("fake request")
assert result == [
(1, ["method", "path"], [("query", "failed : query")]),
(3, ["method", "path"], [("query", "failed : query")])
]

View File

@@ -1,73 +0,0 @@
import pytest
from vcr.compat import mock
from vcr import errors
from vcr.cassette import Cassette
@mock.patch("vcr.cassette.Cassette.find_requests_with_most_matches")
@pytest.mark.parametrize(
"most_matches, expected_message",
[
# No request match found
(
[],
"No similar requests, that have not been played, found."
),
# One matcher failed
(
[("similar request", ["method", "path"], [("query", "failed : query")])],
"Found 1 similar requests with 1 different matcher(s) :\n"
"\n1 - ('similar request').\n"
"Matchers succeeded : ['method', 'path']\n"
"Matchers failed :\n"
"query - assertion failure :\n"
"failed : query\n"
),
# Multiple failed matchers
(
[("similar request", ["method"], [("query", "failed : query"), ("path", "failed : path")])],
"Found 1 similar requests with 2 different matcher(s) :\n"
"\n1 - ('similar request').\n"
"Matchers succeeded : ['method']\n"
"Matchers failed :\n"
"query - assertion failure :\n"
"failed : query\n"
"path - assertion failure :\n"
"failed : path\n"
),
# Multiple similar requests
(
[
("similar request", ["method"], [("query", "failed : query")]),
("similar request 2", ["method"], [("query", "failed : query 2")])
],
"Found 2 similar requests with 1 different matcher(s) :\n"
"\n1 - ('similar request').\n"
"Matchers succeeded : ['method']\n"
"Matchers failed :\n"
"query - assertion failure :\n"
"failed : query\n"
"\n2 - ('similar request 2').\n"
"Matchers succeeded : ['method']\n"
"Matchers failed :\n"
"query - assertion failure :\n"
"failed : query 2\n"
),
]
)
def test_CannotOverwriteExistingCassetteException_get_message(
mock_find_requests_with_most_matches, most_matches, expected_message):
mock_find_requests_with_most_matches.return_value = most_matches
cassette = Cassette("path")
failed_request = "request"
exception_message = errors.CannotOverwriteExistingCassetteException._get_message(
cassette, "request"
)
expected = "Can't overwrite existing cassette (%r) in your current record mode (%r).\n" \
"No match for the request (%r) was found.\n" \
"%s" % (cassette._path, cassette.record_mode, failed_request, expected_message)
assert exception_message == expected

View File

@@ -1,5 +1,4 @@
import itertools import itertools
from vcr.compat import mock
import pytest import pytest
@@ -22,22 +21,20 @@ REQUESTS = {
def assert_matcher(matcher_name): def assert_matcher(matcher_name):
matcher = getattr(matchers, matcher_name) matcher = getattr(matchers, matcher_name)
for k1, k2 in itertools.permutations(REQUESTS, 2): for k1, k2 in itertools.permutations(REQUESTS, 2):
expecting_assertion_error = matcher_name in {k1, k2} matched = matcher(REQUESTS[k1], REQUESTS[k2])
if expecting_assertion_error: if matcher_name in {k1, k2}:
with pytest.raises(AssertionError): assert not matched
matcher(REQUESTS[k1], REQUESTS[k2])
else: else:
assert matcher(REQUESTS[k1], REQUESTS[k2]) is None assert matched
def test_uri_matcher(): def test_uri_matcher():
for k1, k2 in itertools.permutations(REQUESTS, 2): for k1, k2 in itertools.permutations(REQUESTS, 2):
expecting_assertion_error = {k1, k2} != {"base", "method"} matched = matchers.uri(REQUESTS[k1], REQUESTS[k2])
if expecting_assertion_error: if {k1, k2} != {'base', 'method'}:
with pytest.raises(AssertionError): assert not matched
matchers.uri(REQUESTS[k1], REQUESTS[k2])
else: else:
assert matchers.uri(REQUESTS[k1], REQUESTS[k2]) is None assert matched
req1_body = (b"<?xml version='1.0'?><methodCall><methodName>test</methodName>" req1_body = (b"<?xml version='1.0'?><methodCall><methodName>test</methodName>"
@@ -110,7 +107,7 @@ req2_body = (b"<?xml version='1.0'?><methodCall><methodName>test</methodName>"
) )
]) ])
def test_body_matcher_does_match(r1, r2): def test_body_matcher_does_match(r1, r2):
assert matchers.body(r1, r2) is None assert matchers.body(r1, r2)
@pytest.mark.parametrize("r1, r2", [ @pytest.mark.parametrize("r1, r2", [
@@ -138,128 +135,25 @@ def test_body_matcher_does_match(r1, r2):
) )
]) ])
def test_body_match_does_not_match(r1, r2): def test_body_match_does_not_match(r1, r2):
with pytest.raises(AssertionError): assert not matchers.body(r1, r2)
matchers.body(r1, r2)
def test_query_matcher(): def test_query_matcher():
req1 = request.Request("GET", "http://host.com/?a=b&c=d", "", {}) req1 = request.Request('GET', 'http://host.com/?a=b&c=d', '', {})
req2 = request.Request("GET", "http://host.com/?c=d&a=b", "", {}) req2 = request.Request('GET', 'http://host.com/?c=d&a=b', '', {})
assert matchers.query(req1, req2) is None assert matchers.query(req1, req2)
req1 = request.Request("GET", "http://host.com/?a=b&a=b&c=d", "", {}) req1 = request.Request('GET', 'http://host.com/?a=b&a=b&c=d', '', {})
req2 = request.Request("GET", "http://host.com/?a=b&c=d&a=b", "", {}) req2 = request.Request('GET', 'http://host.com/?a=b&c=d&a=b', '', {})
req3 = request.Request("GET", "http://host.com/?c=d&a=b&a=b", "", {}) req3 = request.Request('GET', 'http://host.com/?c=d&a=b&a=b', '', {})
assert matchers.query(req1, req2) is None assert matchers.query(req1, req2)
assert matchers.query(req1, req3) is None assert matchers.query(req1, req3)
def test_matchers(): def test_metchers():
assert_matcher("method") assert_matcher('method')
assert_matcher("scheme") assert_matcher('scheme')
assert_matcher("host") assert_matcher('host')
assert_matcher("port") assert_matcher('port')
assert_matcher("path") assert_matcher('path')
assert_matcher("query") assert_matcher('query')
def test_evaluate_matcher_does_match():
def bool_matcher(r1, r2):
return True
def assertion_matcher(r1, r2):
assert 1 == 1
r1, r2 = None, None
for matcher in [bool_matcher, assertion_matcher]:
match, assertion_msg = matchers._evaluate_matcher(matcher, r1, r2)
assert match is True
assert assertion_msg is None
def test_evaluate_matcher_does_not_match():
def bool_matcher(r1, r2):
return False
def assertion_matcher(r1, r2):
# This is like the "assert" statement preventing pytest to recompile it
raise AssertionError()
r1, r2 = None, None
for matcher in [bool_matcher, assertion_matcher]:
match, assertion_msg = matchers._evaluate_matcher(matcher, r1, r2)
assert match is False
assert not assertion_msg
def test_evaluate_matcher_does_not_match_with_assert_message():
def assertion_matcher(r1, r2):
# This is like the "assert" statement preventing pytest to recompile it
raise AssertionError("Failing matcher")
r1, r2 = None, None
match, assertion_msg = matchers._evaluate_matcher(assertion_matcher, r1, r2)
assert match is False
assert assertion_msg == "Failing matcher"
def test_get_assertion_message():
assert matchers.get_assertion_message(None) is None
assert matchers.get_assertion_message("") == ""
def test_get_assertion_message_with_details():
assertion_msg = "q1=1 != q2=1"
expected = assertion_msg
assert matchers.get_assertion_message(assertion_msg) == expected
@pytest.mark.parametrize(
"r1, r2, expected_successes, expected_failures",
[
(
request.Request("GET", "http://host.com/p?a=b", "", {}),
request.Request("GET", "http://host.com/p?a=b", "", {}),
["method", "path"],
[],
),
(
request.Request("GET", "http://host.com/p?a=b", "", {}),
request.Request("POST", "http://host.com/p?a=b", "", {}),
["path"],
["method"],
),
(
request.Request("GET", "http://host.com/p?a=b", "", {}),
request.Request("POST", "http://host.com/path?a=b", "", {}),
[],
["method", "path"],
),
],
)
def test_get_matchers_results(r1, r2, expected_successes, expected_failures):
successes, failures = matchers.get_matchers_results(
r1, r2, [matchers.method, matchers.path]
)
assert successes == expected_successes
assert len(failures) == len(expected_failures)
for i, expected_failure in enumerate(expected_failures):
assert failures[i][0] == expected_failure
assert failures[i][1] is not None
@mock.patch("vcr.matchers.get_matchers_results")
@pytest.mark.parametrize(
"successes, failures, expected_match",
[
(["method", "path"], [], True),
(["method"], ["path"], False),
([], ["method", "path"], False),
],
)
def test_requests_match(mock_get_matchers_results, successes, failures, expected_match):
mock_get_matchers_results.return_value = (successes, failures)
r1 = request.Request("GET", "http://host.com/p?a=b", "", {})
r2 = request.Request("GET", "http://host.com/p?a=b", "", {})
match = matchers.requests_match(r1, r2, [matchers.method, matchers.path])
assert match is expected_match

View File

@@ -5,12 +5,6 @@ import yaml
import vcr.migration import vcr.migration
# Use the libYAML versions if possible
try:
from yaml import CLoader as Loader
except ImportError:
from yaml import Loader
def test_try_migrate_with_json(tmpdir): def test_try_migrate_with_json(tmpdir):
cassette = tmpdir.join('cassette.json').strpath cassette = tmpdir.join('cassette.json').strpath
@@ -28,9 +22,9 @@ def test_try_migrate_with_yaml(tmpdir):
shutil.copy('tests/fixtures/migration/old_cassette.yaml', cassette) shutil.copy('tests/fixtures/migration/old_cassette.yaml', cassette)
assert vcr.migration.try_migrate(cassette) assert vcr.migration.try_migrate(cassette)
with open('tests/fixtures/migration/new_cassette.yaml', 'r') as f: with open('tests/fixtures/migration/new_cassette.yaml', 'r') as f:
expected_yaml = yaml.load(f, Loader=Loader) expected_yaml = yaml.load(f, Loader=yaml.FullLoader)
with open(cassette, 'r') as f: with open(cassette, 'r') as f:
actual_yaml = yaml.load(f, Loader=Loader) actual_yaml = yaml.load(f, Loader=yaml.FullLoader)
assert actual_yaml == expected_yaml assert actual_yaml == expected_yaml

View File

@@ -3,13 +3,9 @@ import pytest
from vcr.request import Request, HeadersDict from vcr.request import Request, HeadersDict
@pytest.mark.parametrize("method, uri, expected_str", [ def test_str():
('GET', 'http://www.google.com/', '<Request (GET) http://www.google.com/>'), req = Request('GET', 'http://www.google.com/', '', {})
('OPTIONS', '*', '<Request (OPTIONS) *>'), str(req) == '<Request (GET) http://www.google.com/>'
('CONNECT', 'host.some.where:1234', '<Request (CONNECT) host.some.where:1234>')
])
def test_str(method, uri, expected_str):
assert str(Request(method, uri, '', {})) == expected_str
def test_headers(): def test_headers():
@@ -33,21 +29,18 @@ def test_add_header_deprecated():
('https://go.com/', 443), ('https://go.com/', 443),
('https://go.com:443/', 443), ('https://go.com:443/', 443),
('https://go.com:3000/', 3000), ('https://go.com:3000/', 3000),
('*', None)
]) ])
def test_port(uri, expected_port): def test_port(uri, expected_port):
req = Request('GET', uri, '', {}) req = Request('GET', uri, '', {})
assert req.port == expected_port assert req.port == expected_port
@pytest.mark.parametrize("method, uri", [ def test_uri():
('GET', 'http://go.com/'), req = Request('GET', 'http://go.com/', '', {})
('GET', 'http://go.com:80/'), assert req.uri == 'http://go.com/'
('CONNECT', 'localhost:1234'),
('OPTIONS', '*') req = Request('GET', 'http://go.com:80/', '', {})
]) assert req.uri == 'http://go.com:80/'
def test_uri(method, uri):
assert Request(method, uri, '', {}).uri == uri
def test_HeadersDict(): def test_HeadersDict():

View File

@@ -1,9 +1,4 @@
# coding: UTF-8 # coding: UTF-8
import io
import unittest
import six
from vcr.stubs import VCRHTTPResponse from vcr.stubs import VCRHTTPResponse
@@ -71,52 +66,3 @@ def test_response_headers_should_have_correct_values():
assert response.headers.get('content-length') == "10806" assert response.headers.get('content-length') == "10806"
assert response.headers.get('date') == "Fri, 24 Oct 2014 18:35:37 GMT" assert response.headers.get('date') == "Fri, 24 Oct 2014 18:35:37 GMT"
@unittest.skipIf(six.PY2, "Regression test for Python3 only")
def test_response_parses_correctly_and_fp_attribute_error_is_not_thrown():
"""
Regression test for https://github.com/kevin1024/vcrpy/issues/440
:return:
"""
recorded_response = {
"status": {
"message": "OK",
"code": 200
},
"headers": {
"content-length": ["0"],
"server": ["gunicorn/18.0"],
"connection": ["Close"],
"access-control-allow-credentials": ["true"],
"date": ["Fri, 24 Oct 2014 18:35:37 GMT"],
"access-control-allow-origin": ["*"],
"content-type": ["text/html; charset=utf-8"],
},
"body": {
"string": b"\nPMID- 19416910\nOWN - NLM\nSTAT- MEDLINE\nDA - 20090513\nDCOM- "
b"20090622\nLR - "
b"20141209\nIS - 1091-6490 (Electronic)\nIS - 0027-8424 (Linking)\nVI - "
b"106\nIP - "
b"19\nDP - 2009 May 12\nTI - Genetic dissection of histone deacetylase "
b"requirement in "
b"tumor cells.\nPG - 7751-5\nLID - 10.1073/pnas.0903139106 [doi]\nAB - "
b"Histone "
b"deacetylase inhibitors (HDACi) represent a new group of drugs currently\n "
b" being "
b"tested in a wide variety of clinical applications. They are especially\n "
b" effective "
b"in preclinical models of cancer where they show antiproliferative\n "
b"action in many "
b"different types of cancer cells. Recently, the first HDACi was\n "
b"approved for the "
b"treatment of cutaneous T cell lymphomas. Most HDACi currently in\n "
b"clinical "
}
}
vcr_response = VCRHTTPResponse(recorded_response)
handle = io.TextIOWrapper(io.BufferedReader(vcr_response), encoding='utf-8')
handle = iter(handle)
articles = [line for line in handle]
assert len(articles) > 1

View File

@@ -147,7 +147,7 @@ def test_vcr_path_transformer():
# and it should still work with cassette_library_dir # and it should still work with cassette_library_dir
vcr = VCR(cassette_library_dir='/foo') vcr = VCR(cassette_library_dir='/foo')
with vcr.use_cassette('test') as cassette: with vcr.use_cassette('test') as cassette:
assert os.path.abspath(cassette._path) == os.path.abspath('/foo/test') assert cassette._path == '/foo/test'
@pytest.fixture @pytest.fixture

View File

@@ -1,16 +0,0 @@
import sys
def test_vcr_import_deprecation(recwarn):
if 'vcr' in sys.modules:
# Remove imported module entry if already loaded in another test
del sys.modules['vcr']
import vcr # noqa: F401
if sys.version_info[0] == 2:
assert len(recwarn) == 1
assert issubclass(recwarn[0].category, DeprecationWarning)
else:
assert len(recwarn) == 0

39
tox.ini
View File

@@ -1,21 +1,5 @@
[tox] [tox]
skip_missing_interpreters=true envlist = {py27,py34,py35,py36,py37,pypy}-{flakes,requests27,httplib2,urllib3121,tornado4,boto3,aiohttp}
envlist = cov-clean,{py27,py35,py36,py37,pypy}-{flakes,requests,httplib2,urllib3,tornado4,boto3},{py35,py36,py37}-{aiohttp},cov-report
# Coverage environment tasks: cov-clean and cov-report
# https://pytest-cov.readthedocs.io/en/latest/tox.html
[testenv:cov-clean]
deps = coverage
skip_install=true
commands = coverage erase
[testenv:cov-report]
deps = coverage
skip_install=true
commands =
coverage html
coverage report --fail-under=75
[testenv:flakes] [testenv:flakes]
skipsdist = True skipsdist = True
@@ -26,37 +10,24 @@ commands =
deps = flake8 deps = flake8
[testenv] [testenv]
# Need to use develop install so that paths
# for aggregate code coverage combine
usedevelop=true
commands = commands =
./runtests.sh --cov=./vcr --cov-branch --cov-report=xml --cov-append {posargs} ./runtests.sh {posargs}
deps = deps =
Flask Flask<1
mock mock
pytest pytest
pytest-httpbin pytest-httpbin
pytest-cov
PyYAML PyYAML
ipaddress requests27: requests==2.7.0
requests: requests>=2.22.0
httplib2: httplib2 httplib2: httplib2
urllib3: urllib3 urllib3121: urllib3==1.21.1
{py27,py35,py36,pypy}-tornado4: tornado>=4,<5 {py27,py35,py36,pypy}-tornado4: tornado>=4,<5
{py27,py35,py36,pypy}-tornado4: pytest-tornado {py27,py35,py36,pypy}-tornado4: pytest-tornado
{py27,py35,py36}-tornado4: pycurl {py27,py35,py36}-tornado4: pycurl
boto3: boto3 boto3: boto3
boto3: urllib3
aiohttp: aiohttp aiohttp: aiohttp
aiohttp: pytest-asyncio aiohttp: pytest-asyncio
aiohttp: pytest-aiohttp aiohttp: pytest-aiohttp
depends =
{py27,py35,py36,py37,pypy}-{flakes,requests,httplib2,urllib3,tornado4,boto3},{py35,py36,py37}-{aiohttp}: cov-clean
cov-report: {py27,py35,py36,py37,pypy}-{flakes,requests,httplib2,urllib3,tornado4,boto3},{py35,py36,py37}-{aiohttp}
passenv =
AWS_ACCESS_KEY_ID
AWS_DEFAULT_REGION
AWS_SECRET_ACCESS_KEY
[flake8] [flake8]
max_line_length = 110 max_line_length = 110

View File

@@ -1,6 +1,4 @@
import logging import logging
import warnings
import sys
from .config import VCR from .config import VCR
# Set default logging handler to avoid "No handler found" warnings. # Set default logging handler to avoid "No handler found" warnings.
@@ -11,11 +9,6 @@ except ImportError:
def emit(self, record): def emit(self, record):
pass pass
if sys.version_info[0] == 2:
warnings.warn(
"Python 2.x support of vcrpy is deprecated and will be removed in an upcoming major release.",
DeprecationWarning
)
logging.getLogger(__name__).addHandler(NullHandler()) logging.getLogger(__name__).addHandler(NullHandler())

View File

@@ -8,7 +8,7 @@ import wrapt
from .compat import contextlib from .compat import contextlib
from .errors import UnhandledHTTPRequestError from .errors import UnhandledHTTPRequestError
from .matchers import requests_match, uri, method, get_matchers_results from .matchers import requests_match, uri, method
from .patch import CassettePatcherBuilder from .patch import CassettePatcherBuilder
from .serializers import yamlserializer from .serializers import yamlserializer
from .persisters.filesystem import FilesystemPersister from .persisters.filesystem import FilesystemPersister
@@ -190,7 +190,6 @@ class Cassette(object):
self._serializer = serializer or yamlserializer self._serializer = serializer or yamlserializer
self._match_on = match_on self._match_on = match_on
self._before_record_request = before_record_request or (lambda x: x) self._before_record_request = before_record_request or (lambda x: x)
log.info(self._before_record_request)
self._before_record_response = before_record_response or (lambda x: x) self._before_record_response = before_record_response or (lambda x: x)
self.inject = inject self.inject = inject
self.record_mode = record_mode self.record_mode = record_mode
@@ -226,7 +225,6 @@ class Cassette(object):
def append(self, request, response): def append(self, request, response):
"""Add a request, response pair to this cassette""" """Add a request, response pair to this cassette"""
log.info("Appending request %s and response %s", request, response)
request = self._before_record_request(request) request = self._before_record_request(request)
if not request: if not request:
return return
@@ -289,45 +287,6 @@ class Cassette(object):
% (self._path, request) % (self._path, request)
) )
def rewind(self):
self.play_counts = collections.Counter()
def find_requests_with_most_matches(self, request):
"""
Get the most similar request(s) stored in the cassette
of a given request as a list of tuples like this:
- the request object
- the successful matchers as string
- the failed matchers and the related assertion message with the difference details as strings tuple
This is useful when a request failed to be found,
we can get the similar request(s) in order to know what have changed in the request parts.
"""
best_matches = []
request = self._before_record_request(request)
for index, (stored_request, response) in enumerate(self.data):
successes, fails = get_matchers_results(request, stored_request, self._match_on)
best_matches.append((len(successes), stored_request, successes, fails))
best_matches.sort(key=lambda t: t[0], reverse=True)
# Get the first best matches (multiple if equal matches)
final_best_matches = []
if not best_matches:
return final_best_matches
previous_nb_success = best_matches[0][0]
for best_match in best_matches:
nb_success = best_match[0]
# Do not keep matches that have 0 successes,
# it means that the request is totally different from
# the ones stored in the cassette
if nb_success < 1 or previous_nb_success != nb_success:
break
previous_nb_success = nb_success
final_best_matches.append(best_match[1:])
return final_best_matches
def _as_dict(self): def _as_dict(self):
return {"requests": self.requests, "responses": self.responses} return {"requests": self.requests, "responses": self.responses}

View File

@@ -1,8 +1,4 @@
import copy import copy
try:
from collections import abc as collections_abc # only works on python 3.3+
except ImportError:
import collections as collections_abc
import functools import functools
import inspect import inspect
import os import os
@@ -17,6 +13,11 @@ from .util import compose, auto_decorate
from . import matchers from . import matchers
from . import filters from . import filters
try:
from collections.abc import Iterable
except ImportError:
from collections import Iterable
class VCR(object): class VCR(object):
@@ -178,7 +179,7 @@ class VCR(object):
if decode_compressed_response: if decode_compressed_response:
filter_functions.append(filters.decode_response) filter_functions.append(filters.decode_response)
if before_record_response: if before_record_response:
if not isinstance(before_record_response, collections_abc.Iterable): if not isinstance(before_record_response, Iterable):
before_record_response = (before_record_response,) before_record_response = (before_record_response,)
filter_functions.extend(before_record_response) filter_functions.extend(before_record_response)
@@ -244,7 +245,7 @@ class VCR(object):
filter_functions.append(self._build_ignore_hosts(hosts_to_ignore)) filter_functions.append(self._build_ignore_hosts(hosts_to_ignore))
if before_record_request: if before_record_request:
if not isinstance(before_record_request, collections_abc.Iterable): if not isinstance(before_record_request, Iterable):
before_record_request = (before_record_request,) before_record_request = (before_record_request,)
filter_functions.extend(before_record_request) filter_functions.extend(before_record_request)

View File

@@ -1,38 +1,5 @@
class CannotOverwriteExistingCassetteException(Exception): class CannotOverwriteExistingCassetteException(Exception):
def __init__(self, *args, **kwargs): pass
self.cassette = kwargs["cassette"]
self.failed_request = kwargs["failed_request"]
message = self._get_message(kwargs["cassette"], kwargs["failed_request"])
super(CannotOverwriteExistingCassetteException, self).__init__(message)
@staticmethod
def _get_message(cassette, failed_request):
"""Get the final message related to the exception"""
# Get the similar requests in the cassette that
# have match the most with the request.
best_matches = cassette.find_requests_with_most_matches(failed_request)
if best_matches:
# Build a comprehensible message to put in the exception.
best_matches_msg = "Found {} similar requests with {} different matcher(s) :\n".format(
len(best_matches), len(best_matches[0][2]))
for idx, best_match in enumerate(best_matches, start=1):
request, succeeded_matchers, failed_matchers_assertion_msgs = best_match
best_matches_msg += "\n%s - (%r).\n" \
"Matchers succeeded : %s\n" \
"Matchers failed :\n" % (idx, request, succeeded_matchers)
for failed_matcher, assertion_msg in failed_matchers_assertion_msgs:
best_matches_msg += "%s - assertion failure :\n" \
"%s\n" % (failed_matcher, assertion_msg)
else:
best_matches_msg = "No similar requests, that have not been played, found."
return (
"Can't overwrite existing cassette (%r) in "
"your current record mode (%r).\n"
"No match for the request (%r) was found.\n"
"%s"
% (cassette._path, cassette.record_mode, failed_request, best_matches_msg)
)
class UnhandledHTTPRequestError(KeyError): class UnhandledHTTPRequestError(KeyError):

View File

@@ -8,47 +8,35 @@ log = logging.getLogger(__name__)
def method(r1, r2): def method(r1, r2):
assert r1.method == r2.method, "{} != {}".format(r1.method, r2.method) return r1.method == r2.method
def uri(r1, r2): def uri(r1, r2):
assert r1.uri == r2.uri, "{} != {}".format(r1.uri, r2.uri) return r1.uri == r2.uri
def host(r1, r2): def host(r1, r2):
assert r1.host == r2.host, "{} != {}".format(r1.host, r2.host) return r1.host == r2.host
def scheme(r1, r2): def scheme(r1, r2):
assert r1.scheme == r2.scheme, "{} != {}".format(r1.scheme, r2.scheme) return r1.scheme == r2.scheme
def port(r1, r2): def port(r1, r2):
assert r1.port == r2.port, "{} != {}".format(r1.port, r2.port) return r1.port == r2.port
def path(r1, r2): def path(r1, r2):
assert r1.path == r2.path, "{} != {}".format(r1.path, r2.path) return r1.path == r2.path
def query(r1, r2): def query(r1, r2):
assert r1.query == r2.query, "{} != {}".format(r1.query, r2.query) return r1.query == r2.query
def raw_body(r1, r2): def raw_body(r1, r2):
assert read_body(r1) == read_body(r2) return read_body(r1) == read_body(r2)
def body(r1, r2):
transformer = _get_transformer(r1)
r2_transformer = _get_transformer(r2)
if transformer != r2_transformer:
transformer = _identity
assert transformer(read_body(r1)) == transformer(read_body(r2))
def headers(r1, r2):
assert r1.headers == r2.headers, "{} != {}".format(r1.headers, r2.headers)
def _header_checker(value, header='Content-Type'): def _header_checker(value, header='Content-Type'):
@@ -68,12 +56,9 @@ def _transform_json(body):
_xml_header_checker = _header_checker('text/xml') _xml_header_checker = _header_checker('text/xml')
_xmlrpc_header_checker = _header_checker('xmlrpc', header='User-Agent') _xmlrpc_header_checker = _header_checker('xmlrpc', header='User-Agent')
_checker_transformer_pairs = ( _checker_transformer_pairs = (
(_header_checker('application/x-www-form-urlencoded'), (_header_checker('application/x-www-form-urlencoded'), urllib.parse.parse_qs),
lambda body: urllib.parse.parse_qs(body.decode('ascii'))), (_header_checker('application/json'), _transform_json),
(_header_checker('application/json'), (lambda request: _xml_header_checker(request) and _xmlrpc_header_checker(request), xmlrpc_client.loads),
_transform_json),
(lambda request: _xml_header_checker(request) and _xmlrpc_header_checker(request),
xmlrpc_client.loads),
) )
@@ -89,54 +74,28 @@ def _get_transformer(request):
return _identity return _identity
def requests_match(r1, r2, matchers): def body(r1, r2):
successes, failures = get_matchers_results(r1, r2, matchers) transformer = _get_transformer(r1)
if failures: r2_transformer = _get_transformer(r2)
if transformer != r2_transformer:
transformer = _identity
return transformer(read_body(r1)) == transformer(read_body(r2))
def headers(r1, r2):
return r1.headers == r2.headers
def _log_matches(r1, r2, matches):
differences = [m for m in matches if not m[0]]
if differences:
log.debug( log.debug(
"Requests {} and {} differ.\n" "Requests {} and {} differ according to "
"Failure details:\n" "the following matchers: {}".format(r1, r2, differences)
"{}".format(r1, r2, failures)
) )
return len(failures) == 0
def _evaluate_matcher(matcher_function, *args): def requests_match(r1, r2, matchers):
""" matches = [(m(r1, r2), m) for m in matchers]
Evaluate the result of a given matcher as a boolean with an assertion error message if any. _log_matches(r1, r2, matches)
It handles two types of matcher : return all(m[0] for m in matches)
- a matcher returning a boolean value.
- a matcher that only makes an assert, returning None or raises an assertion error.
"""
assertion_message = None
try:
match = matcher_function(*args)
match = True if match is None else match
except AssertionError as e:
match = False
assertion_message = str(e)
return match, assertion_message
def get_matchers_results(r1, r2, matchers):
"""
Get the comparison results of two requests as two list.
The first returned list represents the matchers names that passed.
The second list is the failed matchers as a string with failed assertion details if any.
"""
matches_success, matches_fails = [], []
for m in matchers:
matcher_name = m.__name__
match, assertion_message = _evaluate_matcher(m, r1, r2)
if match:
matches_success.append(matcher_name)
else:
assertion_message = get_assertion_message(assertion_message)
matches_fails.append((matcher_name, assertion_message))
return matches_success, matches_fails
def get_assertion_message(assertion_details):
"""
Get a detailed message about the failing matcher.
"""
return assertion_details

View File

@@ -68,7 +68,7 @@ def _migrate(data):
for item in data: for item in data:
req = item['request'] req = item['request']
res = item['response'] res = item['response']
uri = {k: req.pop(k) for k in PARTS} uri = dict((k, req.pop(k)) for k in PARTS)
req['uri'] = build_uri(**uri) req['uri'] = build_uri(**uri)
# convert headers to dict of lists # convert headers to dict of lists
headers = req['headers'] headers = req['headers']
@@ -100,7 +100,7 @@ def migrate_json(in_fp, out_fp):
def _list_of_tuples_to_dict(fs): def _list_of_tuples_to_dict(fs):
return {k: v for k, v in fs[0]} return dict((k, v) for k, v in fs[0])
def _already_migrated(data): def _already_migrated(data):
@@ -159,9 +159,9 @@ def main():
for (root, dirs, files) in os.walk(path) for (root, dirs, files) in os.walk(path)
for name in files) for name in files)
for file_path in files: for file_path in files:
migrated = try_migrate(file_path) migrated = try_migrate(file_path)
status = 'OK' if migrated else 'FAIL' status = 'OK' if migrated else 'FAIL'
sys.stderr.write("[{}] {}\n".format(status, file_path)) sys.stderr.write("[{}] {}\n".format(status, file_path))
sys.stderr.write("Done.\n") sys.stderr.write("Done.\n")

View File

@@ -6,29 +6,21 @@ from .compat import contextlib, mock
from .stubs import VCRHTTPConnection, VCRHTTPSConnection from .stubs import VCRHTTPConnection, VCRHTTPSConnection
from six.moves import http_client as httplib from six.moves import http_client as httplib
import logging
log = logging.getLogger(__name__)
# Save some of the original types for the purposes of unpatching # Save some of the original types for the purposes of unpatching
_HTTPConnection = httplib.HTTPConnection _HTTPConnection = httplib.HTTPConnection
_HTTPSConnection = httplib.HTTPSConnection _HTTPSConnection = httplib.HTTPSConnection
# Try to save the original types for boto3 # Try to save the original types for boto3
try: try:
from botocore.awsrequest import AWSHTTPSConnection, AWSHTTPConnection import botocore.vendored.requests.packages.urllib3.connectionpool as cpool
except ImportError: except ImportError: # pragma: no cover
try: pass
import botocore.vendored.requests.packages.urllib3.connectionpool as cpool
except ImportError: # pragma: no cover
pass
else:
_Boto3VerifiedHTTPSConnection = cpool.VerifiedHTTPSConnection
_cpoolBoto3HTTPConnection = cpool.HTTPConnection
_cpoolBoto3HTTPSConnection = cpool.HTTPSConnection
else: else:
_Boto3VerifiedHTTPSConnection = AWSHTTPSConnection _Boto3VerifiedHTTPSConnection = cpool.VerifiedHTTPSConnection
_cpoolBoto3HTTPConnection = AWSHTTPConnection _cpoolBoto3HTTPConnection = cpool.HTTPConnection
_cpoolBoto3HTTPSConnection = AWSHTTPSConnection _cpoolBoto3HTTPSConnection = cpool.HTTPSConnection
cpool = None cpool = None
# Try to save the original types for urllib3 # Try to save the original types for urllib3
@@ -52,6 +44,7 @@ else:
_cpoolHTTPConnection = cpool.HTTPConnection _cpoolHTTPConnection = cpool.HTTPConnection
_cpoolHTTPSConnection = cpool.HTTPSConnection _cpoolHTTPSConnection = cpool.HTTPSConnection
# Try to save the original types for httplib2 # Try to save the original types for httplib2
try: try:
import httplib2 import httplib2
@@ -62,6 +55,7 @@ else:
_HTTPSConnectionWithTimeout = httplib2.HTTPSConnectionWithTimeout _HTTPSConnectionWithTimeout = httplib2.HTTPSConnectionWithTimeout
_SCHEME_TO_CONNECTION = httplib2.SCHEME_TO_CONNECTION _SCHEME_TO_CONNECTION = httplib2.SCHEME_TO_CONNECTION
# Try to save the original types for boto # Try to save the original types for boto
try: try:
import boto.https_connection import boto.https_connection
@@ -70,6 +64,7 @@ except ImportError: # pragma: no cover
else: else:
_CertValidatingHTTPSConnection = boto.https_connection.CertValidatingHTTPSConnection _CertValidatingHTTPSConnection = boto.https_connection.CertValidatingHTTPSConnection
# Try to save the original types for Tornado # Try to save the original types for Tornado
try: try:
import tornado.simple_httpclient import tornado.simple_httpclient
@@ -79,6 +74,7 @@ else:
_SimpleAsyncHTTPClient_fetch_impl = \ _SimpleAsyncHTTPClient_fetch_impl = \
tornado.simple_httpclient.SimpleAsyncHTTPClient.fetch_impl tornado.simple_httpclient.SimpleAsyncHTTPClient.fetch_impl
try: try:
import tornado.curl_httpclient import tornado.curl_httpclient
except ImportError: # pragma: no cover except ImportError: # pragma: no cover
@@ -103,7 +99,6 @@ class CassettePatcherBuilder(object):
return self._build_patchers_from_mock_triples( return self._build_patchers_from_mock_triples(
function(self, *args, **kwargs) function(self, *args, **kwargs)
) )
return wrapped return wrapped
def __init__(self, cassette): def __init__(self, cassette):
@@ -189,26 +184,13 @@ class CassettePatcherBuilder(object):
return () return ()
return self._urllib3_patchers(cpool, requests_stubs) return self._urllib3_patchers(cpool, requests_stubs)
@_build_patchers_from_mock_triples_decorator
def _boto3(self): def _boto3(self):
try: try:
# botocore using awsrequest import botocore.vendored.requests.packages.urllib3.connectionpool as cpool
import botocore.awsrequest as cpool
except ImportError: # pragma: no cover except ImportError: # pragma: no cover
try: return ()
# botocore using vendored requests from .stubs import boto3_stubs
import botocore.vendored.requests.packages.urllib3.connectionpool as cpool return self._urllib3_patchers(cpool, boto3_stubs)
except ImportError: # pragma: no cover
pass
else:
from .stubs import boto3_stubs
yield self._urllib3_patchers(cpool, boto3_stubs)
else:
from .stubs import boto3_stubs
log.debug("Patching boto3 cpool with %s", cpool)
yield cpool.AWSHTTPConnectionPool, 'ConnectionCls', boto3_stubs.VCRRequestsHTTPConnection
yield cpool.AWSHTTPSConnectionPool, 'ConnectionCls', boto3_stubs.VCRRequestsHTTPSConnection
def _patched_get_conn(self, connection_pool_class, connection_class_getter): def _patched_get_conn(self, connection_pool_class, connection_class_getter):
get_conn = connection_pool_class._get_conn get_conn = connection_pool_class._get_conn
@@ -425,36 +407,22 @@ def reset_patchers():
yield mock.patch.object(cpool.HTTPSConnectionPool, 'ConnectionCls', _cpoolHTTPSConnection) yield mock.patch.object(cpool.HTTPSConnectionPool, 'ConnectionCls', _cpoolHTTPSConnection)
try: try:
# unpatch botocore with awsrequest import botocore.vendored.requests.packages.urllib3.connectionpool as cpool
import botocore.awsrequest as cpool
except ImportError: # pragma: no cover except ImportError: # pragma: no cover
try: pass
# unpatch botocore with vendored requests
import botocore.vendored.requests.packages.urllib3.connectionpool as cpool
except ImportError: # pragma: no cover
pass
else:
# unpatch requests v1.x
yield mock.patch.object(cpool, 'VerifiedHTTPSConnection', _Boto3VerifiedHTTPSConnection)
yield mock.patch.object(cpool, 'HTTPConnection', _cpoolBoto3HTTPConnection)
# unpatch requests v2.x
if hasattr(cpool.HTTPConnectionPool, 'ConnectionCls'):
yield mock.patch.object(cpool.HTTPConnectionPool, 'ConnectionCls',
_cpoolBoto3HTTPConnection)
yield mock.patch.object(cpool.HTTPSConnectionPool, 'ConnectionCls',
_cpoolBoto3HTTPSConnection)
if hasattr(cpool, 'HTTPSConnection'):
yield mock.patch.object(cpool, 'HTTPSConnection', _cpoolBoto3HTTPSConnection)
else: else:
if hasattr(cpool.AWSHTTPConnectionPool, 'ConnectionCls'): # unpatch requests v1.x
yield mock.patch.object(cpool.AWSHTTPConnectionPool, 'ConnectionCls', yield mock.patch.object(cpool, 'VerifiedHTTPSConnection', _Boto3VerifiedHTTPSConnection)
yield mock.patch.object(cpool, 'HTTPConnection', _cpoolBoto3HTTPConnection)
# unpatch requests v2.x
if hasattr(cpool.HTTPConnectionPool, 'ConnectionCls'):
yield mock.patch.object(cpool.HTTPConnectionPool, 'ConnectionCls',
_cpoolBoto3HTTPConnection) _cpoolBoto3HTTPConnection)
yield mock.patch.object(cpool.AWSHTTPSConnectionPool, 'ConnectionCls', yield mock.patch.object(cpool.HTTPSConnectionPool, 'ConnectionCls',
_cpoolBoto3HTTPSConnection) _cpoolBoto3HTTPSConnection)
if hasattr(cpool, 'AWSHTTPSConnection'): if hasattr(cpool, 'HTTPSConnection'):
yield mock.patch.object(cpool, 'AWSHTTPSConnection', _cpoolBoto3HTTPSConnection) yield mock.patch.object(cpool, 'HTTPSConnection', _cpoolBoto3HTTPSConnection)
try: try:
import httplib2 as cpool import httplib2 as cpool

View File

@@ -2,9 +2,6 @@ import warnings
from six import BytesIO, text_type from six import BytesIO, text_type
from six.moves.urllib.parse import urlparse, parse_qsl from six.moves.urllib.parse import urlparse, parse_qsl
from .util import CaseInsensitiveDict from .util import CaseInsensitiveDict
import logging
log = logging.getLogger(__name__)
class Request(object): class Request(object):
@@ -21,7 +18,6 @@ class Request(object):
else: else:
self.body = body self.body = body
self.headers = headers self.headers = headers
log.debug("Invoking Request %s", self.uri)
@property @property
def headers(self): def headers(self):
@@ -62,10 +58,7 @@ class Request(object):
parse_uri = urlparse(self.uri) parse_uri = urlparse(self.uri)
port = parse_uri.port port = parse_uri.port
if port is None: if port is None:
try: port = {'https': 443, 'http': 80}[parse_uri.scheme]
port = {'https': 443, 'http': 80}[parse_uri.scheme]
except KeyError:
pass
return port return port
@property @property
@@ -98,7 +91,7 @@ class Request(object):
'method': self.method, 'method': self.method,
'uri': self.uri, 'uri': self.uri,
'body': self.body, 'body': self.body,
'headers': {k: [v] for k, v in self.headers.items()}, 'headers': dict(((k, [v]) for k, v in self.headers.items())),
} }
@classmethod @classmethod

View File

@@ -60,10 +60,9 @@ def serialize_headers(response):
class VCRHTTPResponse(HTTPResponse): class VCRHTTPResponse(HTTPResponse):
""" """
Stub response class that gets returned instead of a HTTPResponse Stub reponse class that gets returned instead of a HTTPResponse
""" """
def __init__(self, recorded_response): def __init__(self, recorded_response):
self.fp = None
self.recorded_response = recorded_response self.recorded_response = recorded_response
self.reason = recorded_response['status']['message'] self.reason = recorded_response['status']['message']
self.status = self.code = recorded_response['status']['code'] self.status = self.code = recorded_response['status']['code']
@@ -94,30 +93,9 @@ class VCRHTTPResponse(HTTPResponse):
def read(self, *args, **kwargs): def read(self, *args, **kwargs):
return self._content.read(*args, **kwargs) return self._content.read(*args, **kwargs)
def readall(self):
return self._content.readall()
def readinto(self, *args, **kwargs):
return self._content.readinto(*args, **kwargs)
def readline(self, *args, **kwargs): def readline(self, *args, **kwargs):
return self._content.readline(*args, **kwargs) return self._content.readline(*args, **kwargs)
def readlines(self, *args, **kwargs):
return self._content.readlines(*args, **kwargs)
def seekable(self):
return self._content.seekable()
def tell(self):
return self._content.tell()
def isatty(self):
return self._content.isatty()
def seek(self, *args, **kwargs):
return self._content.seek(*args, **kwargs)
def close(self): def close(self):
self._closed = True self._closed = True
return True return True
@@ -143,9 +121,6 @@ class VCRHTTPResponse(HTTPResponse):
else: else:
return default return default
def readable(self):
return self._content.readable()
class VCRConnection(object): class VCRConnection(object):
# A reference to the cassette that's currently being patched in # A reference to the cassette that's currently being patched in
@@ -164,13 +139,12 @@ class VCRConnection(object):
if url and not url.startswith('/'): if url and not url.startswith('/'):
# Then this must be a proxy request. # Then this must be a proxy request.
return url return url
uri = "{}://{}{}{}".format( uri = "{0}://{1}{2}{3}".format(
self._protocol, self._protocol,
self.real_connection.host, self.real_connection.host,
self._port_postfix(), self._port_postfix(),
url, url,
) )
log.debug("Absolute URI: %s", uri)
return uri return uri
def _url(self, uri): def _url(self, uri):
@@ -256,8 +230,11 @@ class VCRConnection(object):
self._vcr_request self._vcr_request
): ):
raise CannotOverwriteExistingCassetteException( raise CannotOverwriteExistingCassetteException(
cassette=self.cassette, "No match for the request (%r) was found. "
failed_request=self._vcr_request "Can't overwrite existing cassette (%r) in "
"your current record mode (%r)."
% (self._vcr_request, self.cassette._path,
self.cassette.record_mode)
) )
# Otherwise, we should send the request, then get the response # Otherwise, we should send the request, then get the response

View File

@@ -3,21 +3,13 @@ from __future__ import absolute_import
import asyncio import asyncio
import functools import functools
import logging
import json import json
from aiohttp import ClientResponse, streams from aiohttp import ClientResponse
from multidict import CIMultiDict, CIMultiDictProxy
from yarl import URL from yarl import URL
from vcr.request import Request from vcr.request import Request
log = logging.getLogger(__name__)
class MockStream(asyncio.StreamReader, streams.AsyncStreamReaderMixin):
pass
class MockClientResponse(ClientResponse): class MockClientResponse(ClientResponse):
def __init__(self, method, url): def __init__(self, method, url):
@@ -34,11 +26,7 @@ class MockClientResponse(ClientResponse):
) )
async def json(self, *, encoding='utf-8', loads=json.loads, **kwargs): # NOQA: E999 async def json(self, *, encoding='utf-8', loads=json.loads, **kwargs): # NOQA: E999
stripped = self._body.strip() return loads(self._body.decode(encoding))
if not stripped:
return None
return loads(stripped.decode(encoding))
async def text(self, encoding='utf-8', errors='strict'): async def text(self, encoding='utf-8', errors='strict'):
return self._body.decode(encoding, errors=errors) return self._body.decode(encoding, errors=errors)
@@ -49,74 +37,15 @@ class MockClientResponse(ClientResponse):
def release(self): def release(self):
pass pass
@property
def content(self):
s = MockStream()
s.feed_data(self._body)
s.feed_eof()
return s
def build_response(vcr_request, vcr_response, history):
response = MockClientResponse(vcr_request.method, URL(vcr_response.get('url')))
response.status = vcr_response['status']['code']
response._body = vcr_response['body'].get('string', b'')
response.reason = vcr_response['status']['message']
response._headers = CIMultiDictProxy(CIMultiDict(vcr_response['headers']))
response._history = tuple(history)
response.close()
return response
def play_responses(cassette, vcr_request):
history = []
vcr_response = cassette.play_response(vcr_request)
response = build_response(vcr_request, vcr_response, history)
while cassette.can_play_response_for(vcr_request):
history.append(response)
vcr_response = cassette.play_response(vcr_request)
response = build_response(vcr_request, vcr_response, history)
return response
async def record_response(cassette, vcr_request, response, past=False):
body = {} if past else {'string': (await response.read())}
headers = {str(key): value for key, value in response.headers.items()}
vcr_response = {
'status': {
'code': response.status,
'message': response.reason,
},
'headers': headers,
'body': body, # NOQA: E999
'url': str(response.url),
}
cassette.append(vcr_request, vcr_response)
async def record_responses(cassette, vcr_request, response):
for past_response in response.history:
await record_response(cassette, vcr_request, past_response, past=True)
await record_response(cassette, vcr_request, response)
def vcr_request(cassette, real_request): def vcr_request(cassette, real_request):
@functools.wraps(real_request) @functools.wraps(real_request)
async def new_request(self, method, url, **kwargs): async def new_request(self, method, url, **kwargs):
headers = kwargs.get('headers') headers = kwargs.get('headers')
auth = kwargs.get('auth')
headers = self._prepare_headers(headers) headers = self._prepare_headers(headers)
data = kwargs.get('data', kwargs.get('json')) data = kwargs.get('data')
params = kwargs.get('params') params = kwargs.get('params')
if auth is not None:
headers['AUTHORIZATION'] = auth.encode()
request_url = URL(url) request_url = URL(url)
if params: if params:
for k, v in params.items(): for k, v in params.items():
@@ -126,7 +55,16 @@ def vcr_request(cassette, real_request):
vcr_request = Request(method, str(request_url), data, headers) vcr_request = Request(method, str(request_url), data, headers)
if cassette.can_play_response_for(vcr_request): if cassette.can_play_response_for(vcr_request):
return play_responses(cassette, vcr_request) vcr_response = cassette.play_response(vcr_request)
response = MockClientResponse(method, URL(vcr_response.get('url')))
response.status = vcr_response['status']['code']
response._body = vcr_response['body']['string']
response.reason = vcr_response['status']['message']
response._headers = vcr_response['headers']
response.close()
return response
if cassette.write_protected and cassette.filter_request(vcr_request): if cassette.write_protected and cassette.filter_request(vcr_request):
response = MockClientResponse(method, URL(url)) response = MockClientResponse(method, URL(url))
@@ -138,10 +76,19 @@ def vcr_request(cassette, real_request):
response.close() response.close()
return response return response
log.info('%s not in cassette, sending to real server', vcr_request)
response = await real_request(self, method, url, **kwargs) # NOQA: E999 response = await real_request(self, method, url, **kwargs) # NOQA: E999
await record_responses(cassette, vcr_request, response)
vcr_response = {
'status': {
'code': response.status,
'message': response.reason,
},
'headers': dict(response.headers),
'body': {'string': (await response.read())}, # NOQA: E999
'url': response.url,
}
cassette.append(vcr_request, vcr_response)
return response return response
return new_request return new_request

View File

@@ -1,22 +1,11 @@
"""Stubs for boto3""" '''Stubs for boto3'''
import six
try:
# boto using awsrequest
from botocore.awsrequest import AWSHTTPConnection as HTTPConnection
from botocore.awsrequest import AWSHTTPSConnection as VerifiedHTTPSConnection
except ImportError: # pragma: nocover
# boto using vendored requests
# urllib3 defines its own HTTPConnection classes, which boto3 goes ahead and assumes
# you're using. It includes some polyfills for newer features missing in older pythons.
try:
from urllib3.connectionpool import HTTPConnection, VerifiedHTTPSConnection
except ImportError: # pragma: nocover
from requests.packages.urllib3.connectionpool import HTTPConnection, VerifiedHTTPSConnection
from botocore.vendored.requests.packages.urllib3.connectionpool import HTTPConnection, VerifiedHTTPSConnection
from ..stubs import VCRHTTPConnection, VCRHTTPSConnection from ..stubs import VCRHTTPConnection, VCRHTTPSConnection
# urllib3 defines its own HTTPConnection classes, which boto3 goes ahead and assumes
# you're using. It includes some polyfills for newer features missing in older pythons.
class VCRRequestsHTTPConnection(VCRHTTPConnection, HTTPConnection): class VCRRequestsHTTPConnection(VCRHTTPConnection, HTTPConnection):
_baseclass = HTTPConnection _baseclass = HTTPConnection
@@ -24,20 +13,3 @@ class VCRRequestsHTTPConnection(VCRHTTPConnection, HTTPConnection):
class VCRRequestsHTTPSConnection(VCRHTTPSConnection, VerifiedHTTPSConnection): class VCRRequestsHTTPSConnection(VCRHTTPSConnection, VerifiedHTTPSConnection):
_baseclass = VerifiedHTTPSConnection _baseclass = VerifiedHTTPSConnection
def __init__(self, *args, **kwargs):
if six.PY3:
kwargs.pop('strict', None) # apparently this is gone in py3
# need to temporarily reset here because the real connection
# inherits from the thing that we are mocking out. Take out
# the reset if you want to see what I mean :)
from vcr.patch import force_reset
with force_reset():
self.real_connection = self._baseclass(*args, **kwargs)
# Make sure to set those attributes as it seems `AWSHTTPConnection` does not
# set them, making the connection to fail !
self.real_connection.assert_hostname = kwargs.get("assert_hostname", False)
self.real_connection.cert_reqs = kwargs.get("cert_reqs", 'CERT_NONE')
self._sock = None

View File

@@ -75,8 +75,10 @@ def vcr_fetch_impl(cassette, real_fetch_impl):
request, request,
599, 599,
error=CannotOverwriteExistingCassetteException( error=CannotOverwriteExistingCassetteException(
cassette=cassette, "No match for the request (%r) was found. "
failed_request=vcr_request "Can't overwrite existing cassette (%r) in "
"your current record mode (%r)."
% (vcr_request, cassette._path, cassette.record_mode)
), ),
request_time=self.io_loop.time() - request.start_time, request_time=self.io_loop.time() - request.start_time,
) )

View File

@@ -118,10 +118,10 @@ def auto_decorate(
) )
def __new__(cls, name, bases, attributes_dict): def __new__(cls, name, bases, attributes_dict):
new_attributes_dict = { new_attributes_dict = dict(
attribute: maybe_decorate(attribute, value) (attribute, maybe_decorate(attribute, value))
for attribute, value in attributes_dict.items() for attribute, value in attributes_dict.items()
} )
return super(DecorateAll, cls).__new__( return super(DecorateAll, cls).__new__(
cls, name, bases, new_attributes_dict cls, name, bases, new_attributes_dict
) )