1
0
mirror of https://github.com/kevin1024/vcrpy.git synced 2025-12-09 17:15:35 +00:00

Compare commits

..

1 Commits

Author SHA1 Message Date
Luiz Menezes
4ce937978e Add pytest-xdist 2018-05-06 19:23:56 -03:00
49 changed files with 350 additions and 1443 deletions

View File

@@ -1,7 +0,0 @@
coverage:
status:
project:
default:
target: 75
# Allow 0% coverage regression
threshold: 0

3
.gitignore vendored
View File

@@ -1,13 +1,10 @@
*.pyc *.pyc
.tox .tox
.cache .cache
.pytest_cache/
build/ build/
dist/ dist/
*.egg/ *.egg/
.coverage .coverage
coverage.xml
htmlcov/
*.egg-info/ *.egg-info/
pytestdebug.log pytestdebug.log

View File

@@ -1,42 +1,25 @@
language: python language: python
sudo: false
before_install: openssl version before_install: openssl version
env: env:
# global: global:
# - secure: AifoKzwhjV94cmcQZrdQmqRu/9rkZZvWpwBv1daeAQpLOKFPGsOm3D+x2cSw9+iCfkgDZDfqQVv1kCaFVxTll8v8jTq5SJdqEY0NmGWbj/UkNtShh609oRDsuzLxAEwtVKYjf/h8K2BRea+bl1tGkwZ2vtmYS6dxNlAijjWOfds= - secure: AifoKzwhjV94cmcQZrdQmqRu/9rkZZvWpwBv1daeAQpLOKFPGsOm3D+x2cSw9+iCfkgDZDfqQVv1kCaFVxTll8v8jTq5SJdqEY0NmGWbj/UkNtShh609oRDsuzLxAEwtVKYjf/h8K2BRea+bl1tGkwZ2vtmYS6dxNlAijjWOfds=
# - secure: LBSEg/gMj4u4Hrpo3zs6Y/1mTpd2RtcN49mZIFgTdbJ9IhpiNPqcEt647Lz94F9Eses2x2WbNuKqZKZZReY7QLbEzU1m0nN5jlaKrjcG5NR5clNABfFFyhgc0jBikyS4abAG8jc2efeaTrFuQwdoF4sE8YiVrkiVj2X5Xoi6sBk= - secure: LBSEg/gMj4u4Hrpo3zs6Y/1mTpd2RtcN49mZIFgTdbJ9IhpiNPqcEt647Lz94F9Eses2x2WbNuKqZKZZReY7QLbEzU1m0nN5jlaKrjcG5NR5clNABfFFyhgc0jBikyS4abAG8jc2efeaTrFuQwdoF4sE8YiVrkiVj2X5Xoi6sBk=
matrix: matrix:
- TOX_SUFFIX="flakes" - TOX_SUFFIX="flakes"
- TOX_SUFFIX="requests" - TOX_SUFFIX="requests27"
- TOX_SUFFIX="httplib2" - TOX_SUFFIX="httplib2"
- TOX_SUFFIX="boto3" - TOX_SUFFIX="boto3"
- TOX_SUFFIX="urllib3" - TOX_SUFFIX="urllib3121"
- TOX_SUFFIX="tornado4" - TOX_SUFFIX="tornado4"
- TOX_SUFFIX="aiohttp" - TOX_SUFFIX="aiohttp"
matrix: matrix:
include:
- env: TOX_SUFFIX="flakes"
python: 3.7
dist: xenial
- env: TOX_SUFFIX="requests"
python: 3.7
dist: xenial
- env: TOX_SUFFIX="httplib2"
python: 3.7
dist: xenial
- env: TOX_SUFFIX="urllib3"
python: 3.7
dist: xenial
- env: TOX_SUFFIX="tornado4"
python: 3.7
dist: xenial
- env: TOX_SUFFIX="aiohttp"
python: 3.7
dist: xenial
allow_failures: allow_failures:
- env: TOX_SUFFIX="aiohttp" - env: TOX_SUFFIX="boto3"
python: "pypy3.5-5.9.0"
exclude: exclude:
# Only run flakes on a single Python 2.x and a single 3.x # Only run flakes on a single Python 2.x and a single 3.x
- env: TOX_SUFFIX="flakes"
python: 3.4
- env: TOX_SUFFIX="flakes" - env: TOX_SUFFIX="flakes"
python: 3.5 python: 3.5
- env: TOX_SUFFIX="flakes" - env: TOX_SUFFIX="flakes"
@@ -54,9 +37,7 @@ python:
- pypy - pypy
- "pypy3.5-5.9.0" - "pypy3.5-5.9.0"
install: install:
- pip install tox-travis codecov - pip install tox-travis
- if [[ $TOX_SUFFIX != 'flakes' ]]; then python setup.py install ; fi - if [[ $TOX_SUFFIX != 'flakes' ]]; then python setup.py install ; fi
script: script:
- tox -e "${TOX_SUFFIX}" - tox -e "${TOX_SUFFIX}"
after_success:
- codecov

View File

@@ -1,4 +1,4 @@
|PyPI| |Python versions| |Build Status| |CodeCov| |Gitter| |PyPI| |Python versions| |Build Status| |Waffle Ready| |Gitter|
VCR.py VCR.py
====== ======
@@ -41,6 +41,19 @@ VCR.py will detect the absence of a cassette file and once again record
all HTTP interactions, which will update them to correspond to the new all HTTP interactions, which will update them to correspond to the new
API. API.
Support
-------
VCR.py works great with the following HTTP clients:
- requests
- aiohttp
- urllib3
- tornado
- urllib2
- boto3
License License
======= =======
@@ -48,15 +61,13 @@ This library uses the MIT license. See `LICENSE.txt <LICENSE.txt>`__ for
more details more details
.. |PyPI| image:: https://img.shields.io/pypi/v/vcrpy.svg .. |PyPI| image:: https://img.shields.io/pypi/v/vcrpy.svg
:target: https://pypi.python.org/pypi/vcrpy :target: https://pypi.python.org/pypi/vcrpy-unittest
.. |Python versions| image:: https://img.shields.io/pypi/pyversions/vcrpy.svg .. |Python versions| image:: https://img.shields.io/pypi/pyversions/vcrpy-unittest.svg
:target: https://pypi.python.org/pypi/vcrpy :target: https://pypi.python.org/pypi/vcrpy-unittest
.. |Build Status| image:: https://secure.travis-ci.org/kevin1024/vcrpy.svg?branch=master .. |Build Status| image:: https://secure.travis-ci.org/kevin1024/vcrpy.png?branch=master
:target: http://travis-ci.org/kevin1024/vcrpy :target: http://travis-ci.org/kevin1024/vcrpy
.. |Waffle Ready| image:: https://badge.waffle.io/kevin1024/vcrpy.png?label=ready&title=waffle
:target: https://waffle.io/kevin1024/vcrpy
.. |Gitter| image:: https://badges.gitter.im/Join%20Chat.svg .. |Gitter| image:: https://badges.gitter.im/Join%20Chat.svg
:alt: Join the chat at https://gitter.im/kevin1024/vcrpy :alt: Join the chat at https://gitter.im/kevin1024/vcrpy
:target: https://gitter.im/kevin1024/vcrpy?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge :target: https://gitter.im/kevin1024/vcrpy?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge
.. |CodeCov| image:: https://codecov.io/gh/kevin1024/vcrpy/branch/master/graph/badge.svg
:target: https://codecov.io/gh/kevin1024/vcrpy
:alt: Code Coverage Status

View File

@@ -97,12 +97,8 @@ Create your own method with the following signature
def my_matcher(r1, r2): def my_matcher(r1, r2):
Your method receives the two requests and can return : Your method receives the two requests and must return ``True`` if they
match, ``False`` if they don't.
- Use an ``assert`` statement in the matcher, then we have ``None`` if they match, raise an `AssertionError`` if they don't.
- A boolean, ``True`` if they match, ``False`` if they don't.
Note : You should use an ``assert`` statement in order to have feedback when a matcher is failing.
Finally, register your method with VCR to use your new request matcher. Finally, register your method with VCR to use your new request matcher.
@@ -111,7 +107,7 @@ Finally, register your method with VCR to use your new request matcher.
import vcr import vcr
def jurassic_matcher(r1, r2): def jurassic_matcher(r1, r2):
assert r1.uri == r2.uri and 'JURASSIC PARK' in r1.body return r1.uri == r2.uri and 'JURASSIC PARK' in r1.body
my_vcr = vcr.VCR() my_vcr = vcr.VCR()
my_vcr.register_matcher('jurassic', jurassic_matcher) my_vcr.register_matcher('jurassic', jurassic_matcher)
@@ -225,25 +221,24 @@ Custom Request filtering
~~~~~~~~~~~~~~~~~~~~~~~~ ~~~~~~~~~~~~~~~~~~~~~~~~
If none of these covers your request filtering needs, you can register a If none of these covers your request filtering needs, you can register a
callback with the ``before_record_request`` configuration option to callback that will manipulate the HTTP request before adding it to the
manipulate the HTTP request before adding it to the cassette, or return cassette. Use the ``before_record_request`` configuration option to so this.
``None`` to ignore it entirely. Here is an example that will never record Here is an example that will never record requests to the /login
requests to the ``'/login'`` path: endpoint.
.. code:: python .. code:: python
def before_record_cb(request): def before_record_cb(request):
if request.path == '/login': if request.path != '/login':
return None return request
return request
my_vcr = vcr.VCR( my_vcr = vcr.VCR(
before_record_request=before_record_cb, before_record_request = before_record_cb,
) )
with my_vcr.use_cassette('test.yml'): with my_vcr.use_cassette('test.yml'):
# your http code here # your http code here
You can also mutate the request using this callback. For example, you You can also mutate the response using this callback. For example, you
could remove all query parameters from any requests to the ``'/login'`` could remove all query parameters from any requests to the ``'/login'``
path. path.
@@ -251,7 +246,7 @@ path.
def scrub_login_request(request): def scrub_login_request(request):
if request.path == '/login': if request.path == '/login':
request.uri, _ = urllib.splitquery(request.uri) request.uri, _ = urllib.splitquery(response.uri)
return request return request
my_vcr = vcr.VCR( my_vcr = vcr.VCR(
@@ -263,12 +258,9 @@ path.
Custom Response Filtering Custom Response Filtering
~~~~~~~~~~~~~~~~~~~~~~~~~ ~~~~~~~~~~~~~~~~~~~~~~~~~
You can also do response filtering with the VCR.py also suports response filtering with the
``before_record_response`` configuration option. Its usage is ``before_record_response`` keyword argument. It's usage is similar to
similar to the above ``before_record_request`` - you can that of ``before_record``:
mutate the response, or return ``None`` to avoid recording
the request and response altogether. For example to hide
sensitive data from the request body:
.. code:: python .. code:: python
@@ -310,8 +302,8 @@ in a few ways:
or 0.0.0.0. or 0.0.0.0.
- Set the ``ignore_hosts`` configuration option to a list of hosts to - Set the ``ignore_hosts`` configuration option to a list of hosts to
ignore ignore
- Add a ``before_record_request`` or ``before_record_response`` callback - Add a ``before_record`` callback that returns None for requests you
that returns ``None`` for requests you want to ignore (see above). want to ignore
Requests that are ignored by VCR will not be saved in a cassette, nor Requests that are ignored by VCR will not be saved in a cassette, nor
played back from a cassette. VCR will completely ignore those requests played back from a cassette. VCR will completely ignore those requests
@@ -372,16 +364,3 @@ cassette names, use ``VCR.ensure_suffix`` as follows:
@my_vcr.use_cassette @my_vcr.use_cassette
def my_test_function(): def my_test_function():
Rewind Cassette
---------------
VCR.py allows to rewind a cassette in order to replay it inside the same function/test.
.. code:: python
with vcr.use_cassette('fixtures/vcr_cassettes/synopsis.yaml') as cass:
response = urllib2.urlopen('http://www.zombo.com/').read()
assert cass.all_played
a.rewind()
assert not cass.all_played

View File

@@ -1,27 +1,5 @@
Changelog Changelog
--------- ---------
- 2.1.1 (UNRELEASED)
- 2.1.0 - Add a `rewind` method to reset a cassette (thanks @khamidou)
New error message with more details on why the cassette failed to play a request (thanks @arthurHamon2, @neozenith)
Handle connect tunnel URI (thanks @jeking3)
Add code coverage to the project (thanks @neozenith)
Drop support to python 3.4
Add deprecation warning on python 2.7, next major release will drop python 2.7 support
Fix build problems on requests tests (thanks to @dunossauro)
Fix matching on 'body' failing when Unicode symbols are present in them (thanks @valgur)
Fix bugs on aiohttp integration (thanks @graingert, @steinnes, @stj, @lamenezes, @lmazuel)
Fix Biopython incompatibility (thanks @rishab121)
Fix Boto3 integration (thanks @1oglop1, @arthurHamon2)
- 2.0.1 - Fix bug when using vcrpy with python 3.4
- 2.0.0 - Support python 3.7 (fix httplib2 and urllib2, thanks @felixonmars)
[#356] Fixes `before_record_response` so the original response isn't changed (thanks @kgraves)
Fix requests stub when using proxy (thanks @samuelfekete @daneoshiga)
(only for aiohttp stub) Drop support to python 3.4 asyncio.coroutine (aiohttp doesn't support python it anymore)
Fix aiohttp stub to work with aiohttp client (thanks @stj)
Fix aiohttp stub to accept content type passed
Improve docs (thanks @adamchainz)
- 1.13.0 - Fix support to latest aiohttp version (3.3.2). Fix content-type bug in aiohttp stub. Save URL with query params properly when using aiohttp.
- 1.12.0 - Fix support to latest aiohttp version (3.2.1), Adapted setup to PEP508, Support binary responses on aiohttp, Dropped support for EOL python versions (2.6 and 3.3)
- 1.11.1 Fix compatibility with newest requests and urllib3 releases - 1.11.1 Fix compatibility with newest requests and urllib3 releases
- 1.11.0 Allow injection of persistence methods + bugfixes (thanks @j-funk and @IvanMalison), - 1.11.0 Allow injection of persistence methods + bugfixes (thanks @j-funk and @IvanMalison),
Support python 3.6 + CI tests (thanks @derekbekoe and @graingert), Support python 3.6 + CI tests (thanks @derekbekoe and @graingert),

View File

@@ -11,10 +11,7 @@ yourself using `py.test <http://pytest.org/>`__ and
all environments VCR.py supports. The test suite is pretty big and slow, all environments VCR.py supports. The test suite is pretty big and slow,
but you can tell tox to only run specific tests like this:: but you can tell tox to only run specific tests like this::
tox -e {pyNN}-{HTTP_LIBRARY} -- <pytest flags passed through> tox -e py27requests -- -v -k "'test_status_code or test_gzip'"
tox -e py27-requests -- -v -k "'test_status_code or test_gzip'"
tox -e py37-requests -- -v --last-failed
This will run only tests that look like ``test_status_code`` or This will run only tests that look like ``test_status_code`` or
``test_gzip`` in the test suite, and only in the python 2.7 environment ``test_gzip`` in the test suite, and only in the python 2.7 environment
@@ -26,24 +23,3 @@ documentation <https://boto.readthedocs.io/en/latest/getting_started.html>`__
for how to set this up. I have marked the boto tests as optional in for how to set this up. I have marked the boto tests as optional in
Travis so you don't have to worry about them failing if you submit a Travis so you don't have to worry about them failing if you submit a
pull request. pull request.
Troubleshooting on MacOSX
-------------------------
If you have this kind of error when running tox :
.. code:: python
__main__.ConfigurationError: Curl is configured to use SSL, but we have
not been able to determine which SSL backend it is using. Please see PycURL documentation for how to specify the SSL backend manually.
Then you need to define some environment variables:
.. code:: bash
export PYCURL_SSL_LIBRARY=openssl
export LDFLAGS=-L/usr/local/opt/openssl/lib
export CPPFLAGS=-I/usr/local/opt/openssl/include
Reference : `stackoverflow issue <https://stackoverflow.com/questions/51019622/curl-is-configured-to-use-ssl-but-we-have-not-been-able-to-determine-which-ssl>`__

View File

@@ -29,29 +29,3 @@ The second time, you will see::
If you set the loglevel to DEBUG, you will also get information about If you set the loglevel to DEBUG, you will also get information about
which matchers didn't match. This can help you with debugging custom which matchers didn't match. This can help you with debugging custom
matchers. matchers.
CannotOverwriteExistingCassetteException
========================================
When a request failed to be found in an existing cassette,
VCR.py tries to get the request(s) that may be similar to the one being searched.
The goal is to see which matcher(s) failed and understand what part of the failed request may have changed.
It can return multiple similar requests with :
- the matchers that have succeeded
- the matchers that have failed
- for each failed matchers, why it has failed with an assertion message
CannotOverwriteExistingCassetteException message example :
.. code:: python
CannotOverwriteExistingCassetteException: Can't overwrite existing cassette ('cassette.yaml') in your current record mode ('once').
No match for the request (<Request (GET) https://www.googleapis.com/?alt=json&maxResults=200>) was found.
Found 1 similar requests with 1 different matchers :
1 - (<Request (GET) https://www.googleapis.com/?alt=json&maxResults=500>).
Matchers succeeded : ['method', 'scheme', 'host', 'port', 'path']
Matchers failed :
query - assertion failure :
[('alt', 'json'), ('maxResults', '200')] != [('alt', 'json'), ('maxResults', '500')]

View File

@@ -9,20 +9,18 @@ with pip::
Compatibility Compatibility
------------- -------------
VCR.py supports Python 2.7 and 3.5+, and VCR.py supports Python 2.7 and 3.4+, and
`pypy <http://pypy.org>`__. `pypy <http://pypy.org>`__.
The following HTTP libraries are supported: The following http libraries are supported:
- ``aiohttp`` - urllib2
- ``boto`` - urllib3
- ``boto3`` - http.client (python3)
- ``http.client`` - requests (both 1.x and 2.x versions)
- ``httplib2`` - httplib2
- ``requests`` (both 1.x and 2.x versions) - boto
- ``tornado.httpclient`` - Tornado's AsyncHTTPClient
- ``urllib2``
- ``urllib3``
Speed Speed
----- -----

View File

@@ -11,7 +11,7 @@ Usage
assert 'Example domains' in response assert 'Example domains' in response
Run this test once, and VCR.py will record the HTTP request to Run this test once, and VCR.py will record the HTTP request to
``fixtures/vcr_cassettes/synopsis.yaml``. Run it again, and VCR.py will ``fixtures/vcr_cassettes/synopsis.yml``. Run it again, and VCR.py will
replay the response from iana.org when the http request is made. This replay the response from iana.org when the http request is made. This
test is now fast (no real HTTP requests are made anymore), deterministic test is now fast (no real HTTP requests are made anymore), deterministic
(the test will continue to pass, even if you are offline, or iana.org (the test will continue to pass, even if you are offline, or iana.org
@@ -95,9 +95,3 @@ Unittest Integration
While it's possible to use the context manager or decorator forms with unittest, While it's possible to use the context manager or decorator forms with unittest,
there's also a ``VCRTestCase`` provided separately by `vcrpy-unittest there's also a ``VCRTestCase`` provided separately by `vcrpy-unittest
<https://github.com/agriffis/vcrpy-unittest>`__. <https://github.com/agriffis/vcrpy-unittest>`__.
Pytest Integration
------------------
A Pytest plugin is available here : `pytest-vcr
<https://github.com/ktosiek/pytest-vcr>`__.

View File

@@ -1,7 +1,3 @@
#!/bin/bash #!/bin/bash
# https://blog.ionelmc.ro/2015/04/14/tox-tricks-and-patterns/#when-it-inevitably-leads-to-shell-scripts
# If you are getting an INVOCATION ERROR for this script then there is
# a good chance you are running on Windows.
# You can and should use WSL for running tox on Windows when it calls bash scripts.
REQUESTS_CA_BUNDLE=`python -m pytest_httpbin.certs` py.test $* REQUESTS_CA_BUNDLE=`python -m pytest_httpbin.certs` py.test $*

View File

@@ -28,7 +28,7 @@ install_requires = [
'six>=1.5', 'six>=1.5',
'contextlib2; python_version=="2.7"', 'contextlib2; python_version=="2.7"',
'mock; python_version=="2.7"', 'mock; python_version=="2.7"',
'yarl; python_version>="3.5"', 'yarl; python_version>="3.4"',
] ]
excluded_packages = ["tests*"] excluded_packages = ["tests*"]
@@ -37,7 +37,7 @@ if sys.version_info[0] == 2:
setup( setup(
name='vcrpy', name='vcrpy',
version='2.1.0', version='1.11.1',
description=( description=(
"Automatically mock your HTTP interactions to simplify and " "Automatically mock your HTTP interactions to simplify and "
"speed up testing" "speed up testing"
@@ -47,21 +47,21 @@ setup(
author_email='me@kevinmccarthy.org', author_email='me@kevinmccarthy.org',
url='https://github.com/kevin1024/vcrpy', url='https://github.com/kevin1024/vcrpy',
packages=find_packages(exclude=excluded_packages), packages=find_packages(exclude=excluded_packages),
python_requires='>=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*', python_requires='>=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*',
install_requires=install_requires, install_requires=install_requires,
license='MIT', license='MIT',
tests_require=['pytest', 'mock', 'pytest-httpbin'], tests_require=['pytest', 'mock', 'pytest-httpbin'],
classifiers=[ classifiers=[
'Development Status :: 5 - Production/Stable', 'Development Status :: 4 - Beta',
'Environment :: Console', 'Environment :: Console',
'Intended Audience :: Developers', 'Intended Audience :: Developers',
'Programming Language :: Python', 'Programming Language :: Python',
'Programming Language :: Python :: 2', 'Programming Language :: Python :: 2',
'Programming Language :: Python :: 2.7', 'Programming Language :: Python :: 2.7',
'Programming Language :: Python :: 3', 'Programming Language :: Python :: 3',
'Programming Language :: Python :: 3.4',
'Programming Language :: Python :: 3.5', 'Programming Language :: Python :: 3.5',
'Programming Language :: Python :: 3.6', 'Programming Language :: Python :: 3.6',
'Programming Language :: Python :: 3.7',
'Programming Language :: Python :: Implementation :: CPython', 'Programming Language :: Python :: Implementation :: CPython',
'Programming Language :: Python :: Implementation :: PyPy', 'Programming Language :: Python :: Implementation :: PyPy',
'Topic :: Software Development :: Testing', 'Topic :: Software Development :: Testing',

View File

@@ -9,7 +9,7 @@ interactions:
method: GET method: GET
uri: http://httpbin.org/ip uri: http://httpbin.org/ip
response: response:
body: {string: "{\n \"origin\": \"217.122.164.194\"\n}"} body: {string: !!python/unicode "{\n \"origin\": \"217.122.164.194\"\n}"}
headers: headers:
access-control-allow-origin: ['*'] access-control-allow-origin: ['*']
content-type: [application/json] content-type: [application/json]

View File

@@ -1,43 +1,15 @@
# flake8: noqa
import asyncio import asyncio
import aiohttp import aiohttp
from aiohttp.test_utils import TestClient
async def aiohttp_request(loop, method, url, output='text', encoding='utf-8', content_type=None, **kwargs): @asyncio.coroutine
session = aiohttp.ClientSession(loop=loop) def aiohttp_request(loop, method, url, output='text', **kwargs):
response_ctx = session.request(method, url, **kwargs) with aiohttp.ClientSession(loop=loop) as session:
response = yield from session.request(method, url, **kwargs) # NOQA: E999
response = await response_ctx.__aenter__() if output == 'text':
if output == 'text': content = yield from response.text() # NOQA: E999
content = await response.text() elif output == 'json':
elif output == 'json': content = yield from response.json() # NOQA: E999
content_type = content_type or 'application/json' elif output == 'raw':
content = await response.json(encoding=encoding, content_type=content_type) content = yield from response.read() # NOQA: E999
elif output == 'raw': return response, content
content = await response.read()
elif output == 'stream':
content = await response.content.read()
response_ctx._resp.close()
await session.close()
return response, content
def aiohttp_app():
async def hello(request):
return aiohttp.web.Response(text='hello')
async def json(request):
return aiohttp.web.json_response({})
async def json_empty_body(request):
return aiohttp.web.json_response()
app = aiohttp.web.Application()
app.router.add_get('/', hello)
app.router.add_get('/json', json)
app.router.add_get('/json/empty', json_empty_body)
return app

View File

@@ -0,0 +1,13 @@
import aiohttp
import pytest
import vcr
@vcr.use_cassette()
@pytest.mark.asyncio
async def test_http(): # noqa: E999
async with aiohttp.ClientSession() as session:
url = 'https://httpbin.org/get'
params = {'ham': 'spam'}
resp = await session.get(url, params=params) # noqa: E999
assert (await resp.json())['args'] == {'ham': 'spam'} # noqa: E999

View File

@@ -1,12 +1,18 @@
import contextlib
import logging
import pytest import pytest
asyncio = pytest.importorskip("asyncio")
aiohttp = pytest.importorskip("aiohttp") aiohttp = pytest.importorskip("aiohttp")
import asyncio # noqa: E402
import contextlib # noqa: E402
import pytest # noqa: E402
import vcr # noqa: E402 import vcr # noqa: E402
from .aiohttp_utils import aiohttp_app, aiohttp_request # noqa: E402
from .aiohttp_utils import aiohttp_request # noqa: E402
try:
from .async_def import test_http # noqa: F401
except SyntaxError:
pass
def run_in_loop(fn): def run_in_loop(fn):
@@ -48,32 +54,14 @@ def test_status(tmpdir, scheme):
assert cassette.play_count == 1 assert cassette.play_count == 1
@pytest.mark.parametrize("auth", [None, aiohttp.BasicAuth("vcrpy", "test")]) def test_headers(tmpdir, scheme):
def test_headers(tmpdir, scheme, auth):
url = scheme + '://httpbin.org' url = scheme + '://httpbin.org'
with vcr.use_cassette(str(tmpdir.join('headers.yaml'))): with vcr.use_cassette(str(tmpdir.join('headers.yaml'))):
response, _ = get(url, auth=auth) response, _ = get(url)
with vcr.use_cassette(str(tmpdir.join('headers.yaml'))) as cassette: with vcr.use_cassette(str(tmpdir.join('headers.yaml'))) as cassette:
if auth is not None:
request = cassette.requests[0]
assert "AUTHORIZATION" in request.headers
cassette_response, _ = get(url, auth=auth)
assert dict(cassette_response.headers) == dict(response.headers)
assert cassette.play_count == 1
assert 'istr' not in cassette.data[0]
assert 'yarl.URL' not in cassette.data[0]
def test_case_insensitive_headers(tmpdir, scheme):
url = scheme + '://httpbin.org'
with vcr.use_cassette(str(tmpdir.join('whatever.yaml'))):
_, _ = get(url)
with vcr.use_cassette(str(tmpdir.join('whatever.yaml'))) as cassette:
cassette_response, _ = get(url) cassette_response, _ = get(url)
assert "Content-Type" in cassette_response.headers assert cassette_response.headers == response.headers
assert "content-type" in cassette_response.headers
assert cassette.play_count == 1 assert cassette.play_count == 1
@@ -90,13 +78,11 @@ def test_text(tmpdir, scheme):
def test_json(tmpdir, scheme): def test_json(tmpdir, scheme):
url = scheme + '://httpbin.org/get' url = scheme + '://httpbin.org/get'
headers = {'Content-Type': 'application/json'}
with vcr.use_cassette(str(tmpdir.join('json.yaml'))): with vcr.use_cassette(str(tmpdir.join('json.yaml'))):
_, response_json = get(url, output='json', headers=headers) _, response_json = get(url, output='json')
with vcr.use_cassette(str(tmpdir.join('json.yaml'))) as cassette: with vcr.use_cassette(str(tmpdir.join('json.yaml'))) as cassette:
_, cassette_response_json = get(url, output='json', headers=headers) _, cassette_response_json = get(url, output='json')
assert cassette_response_json == response_json assert cassette_response_json == response_json
assert cassette.play_count == 1 assert cassette.play_count == 1
@@ -112,68 +98,38 @@ def test_binary(tmpdir, scheme):
assert cassette.play_count == 1 assert cassette.play_count == 1
def test_stream(tmpdir, scheme): def test_post(tmpdir, scheme):
url = scheme + '://httpbin.org/get'
with vcr.use_cassette(str(tmpdir.join('stream.yaml'))):
resp, body = get(url, output='raw') # Do not use stream here, as the stream is exhausted by vcr
with vcr.use_cassette(str(tmpdir.join('stream.yaml'))) as cassette:
cassette_resp, cassette_body = get(url, output='stream')
assert cassette_body == body
assert cassette.play_count == 1
@pytest.mark.parametrize('body', ['data', 'json'])
def test_post(tmpdir, scheme, body, caplog):
caplog.set_level(logging.INFO)
data = {'key1': 'value1', 'key2': 'value2'} data = {'key1': 'value1', 'key2': 'value2'}
url = scheme + '://httpbin.org/post' url = scheme + '://httpbin.org/post'
with vcr.use_cassette(str(tmpdir.join('post.yaml'))): with vcr.use_cassette(str(tmpdir.join('post.yaml'))):
_, response_json = post(url, **{body: data}) _, response_json = post(url, data=data)
with vcr.use_cassette(str(tmpdir.join('post.yaml'))) as cassette: with vcr.use_cassette(str(tmpdir.join('post.yaml'))) as cassette:
request = cassette.requests[0] _, cassette_response_json = post(url, data=data)
assert request.body == data
_, cassette_response_json = post(url, **{body: data})
assert cassette_response_json == response_json assert cassette_response_json == response_json
assert cassette.play_count == 1 assert cassette.play_count == 1
assert next(
(
log
for log in caplog.records
if log.getMessage()
== '<Request (POST) {}> not in cassette, sending to real server'.format(url)
),
None,
), 'Log message not found.'
def test_params(tmpdir, scheme): def test_params(tmpdir, scheme):
url = scheme + '://httpbin.org/get' url = scheme + '://httpbin.org/get'
headers = {'Content-Type': 'application/json'}
params = {'a': 1, 'b': False, 'c': 'c'} params = {'a': 1, 'b': False, 'c': 'c'}
with vcr.use_cassette(str(tmpdir.join('get.yaml'))) as cassette:
_, response_json = get(url, output='json', params=params)
with vcr.use_cassette(str(tmpdir.join('get.yaml'))) as cassette: with vcr.use_cassette(str(tmpdir.join('get.yaml'))) as cassette:
_, response_json = get(url, output='json', params=params, headers=headers) _, cassette_response_json = get(url, output='json', params=params)
with vcr.use_cassette(str(tmpdir.join('get.yaml'))) as cassette:
_, cassette_response_json = get(url, output='json', params=params, headers=headers)
assert cassette_response_json == response_json assert cassette_response_json == response_json
assert cassette.play_count == 1 assert cassette.play_count == 1
def test_params_same_url_distinct_params(tmpdir, scheme): def test_params_same_url_distinct_params(tmpdir, scheme):
url = scheme + '://httpbin.org/get' url = scheme + '://httpbin.org/get'
headers = {'Content-Type': 'application/json'}
params = {'a': 1, 'b': False, 'c': 'c'} params = {'a': 1, 'b': False, 'c': 'c'}
with vcr.use_cassette(str(tmpdir.join('get.yaml'))) as cassette:
_, response_json = get(url, output='json', params=params)
with vcr.use_cassette(str(tmpdir.join('get.yaml'))) as cassette: with vcr.use_cassette(str(tmpdir.join('get.yaml'))) as cassette:
_, response_json = get(url, output='json', params=params, headers=headers) _, cassette_response_json = get(url, output='json', params=params)
with vcr.use_cassette(str(tmpdir.join('get.yaml'))) as cassette:
_, cassette_response_json = get(url, output='json', params=params, headers=headers)
assert cassette_response_json == response_json assert cassette_response_json == response_json
assert cassette.play_count == 1 assert cassette.play_count == 1
@@ -182,83 +138,3 @@ def test_params_same_url_distinct_params(tmpdir, scheme):
response, cassette_response_text = get(url, output='text', params=other_params) response, cassette_response_text = get(url, output='text', params=other_params)
assert 'No match for the request' in cassette_response_text assert 'No match for the request' in cassette_response_text
assert response.status == 599 assert response.status == 599
def test_params_on_url(tmpdir, scheme):
url = scheme + '://httpbin.org/get?a=1&b=foo'
headers = {'Content-Type': 'application/json'}
with vcr.use_cassette(str(tmpdir.join('get.yaml'))) as cassette:
_, response_json = get(url, output='json', headers=headers)
request = cassette.requests[0]
assert request.url == url
with vcr.use_cassette(str(tmpdir.join('get.yaml'))) as cassette:
_, cassette_response_json = get(url, output='json', headers=headers)
request = cassette.requests[0]
assert request.url == url
assert cassette_response_json == response_json
assert cassette.play_count == 1
def test_aiohttp_test_client(aiohttp_client, tmpdir):
loop = asyncio.get_event_loop()
app = aiohttp_app()
url = '/'
client = loop.run_until_complete(aiohttp_client(app))
with vcr.use_cassette(str(tmpdir.join('get.yaml'))):
response = loop.run_until_complete(client.get(url))
assert response.status == 200
response_text = loop.run_until_complete(response.text())
assert response_text == 'hello'
response_text = loop.run_until_complete(response.text(errors='replace'))
assert response_text == 'hello'
with vcr.use_cassette(str(tmpdir.join('get.yaml'))) as cassette:
response = loop.run_until_complete(client.get(url))
request = cassette.requests[0]
assert request.url == str(client.make_url(url))
response_text = loop.run_until_complete(response.text())
assert response_text == 'hello'
assert cassette.play_count == 1
def test_aiohttp_test_client_json(aiohttp_client, tmpdir):
loop = asyncio.get_event_loop()
app = aiohttp_app()
url = '/json/empty'
client = loop.run_until_complete(aiohttp_client(app))
with vcr.use_cassette(str(tmpdir.join('get.yaml'))):
response = loop.run_until_complete(client.get(url))
assert response.status == 200
response_json = loop.run_until_complete(response.json())
assert response_json is None
with vcr.use_cassette(str(tmpdir.join('get.yaml'))) as cassette:
response = loop.run_until_complete(client.get(url))
request = cassette.requests[0]
assert request.url == str(client.make_url(url))
response_json = loop.run_until_complete(response.json())
assert response_json is None
assert cassette.play_count == 1
def test_redirect(aiohttp_client, tmpdir):
url = 'https://httpbin.org/redirect/2'
with vcr.use_cassette(str(tmpdir.join('redirect.yaml'))):
response, _ = get(url)
with vcr.use_cassette(str(tmpdir.join('redirect.yaml'))) as cassette:
cassette_response, _ = get(url)
assert cassette_response.status == response.status
assert len(cassette_response.history) == len(response.history)
assert len(cassette) == 3
assert cassette.play_count == 3

View File

@@ -1,60 +1,15 @@
import pytest import pytest
import os
boto3 = pytest.importorskip("boto3") boto3 = pytest.importorskip("boto3")
import boto3 # NOQA import boto3 # NOQA
import botocore # NOQA
import vcr # NOQA import vcr # NOQA
try: bucket = 'boto3-demo-1337' # a bucket you can access
from botocore import awsrequest # NOQA key = 'test/my_test.txt' # key with r+w access
content = 'hello world i am a string' # content to put in the test file
botocore_awsrequest = True
except ImportError:
botocore_awsrequest = False
# skip tests if boto does not use vendored requests anymore
# https://github.com/boto/botocore/pull/1495
boto3_skip_vendored_requests = pytest.mark.skipif(
botocore_awsrequest,
reason='botocore version {ver} does not use vendored requests anymore.'.format(
ver=botocore.__version__))
boto3_skip_awsrequest = pytest.mark.skipif(
not botocore_awsrequest,
reason='botocore version {ver} still uses vendored requests.'.format(
ver=botocore.__version__))
IAM_USER_NAME = "vcrpy"
@pytest.fixture def test_boto_stubs(tmpdir):
def iam_client():
def _iam_client(boto3_session=None):
if boto3_session is None:
boto3_session = boto3.Session(
aws_access_key_id=os.environ.get('AWS_ACCESS_KEY_ID', "default"),
aws_secret_access_key=os.environ.get('AWS_SECRET_ACCESS_KEY', "default"),
aws_session_token=None,
region_name=os.environ.get('AWS_DEFAULT_REGION', "default"),
)
return boto3_session.client('iam')
return _iam_client
@pytest.fixture
def get_user(iam_client):
def _get_user(client=None, user_name=IAM_USER_NAME):
if client is None:
# Default client set with fixture `iam_client`
client = iam_client()
return client.get_user(UserName=user_name)
return _get_user
@boto3_skip_vendored_requests
def test_boto_vendored_stubs(tmpdir):
with vcr.use_cassette(str(tmpdir.join('boto3-stubs.yml'))): with vcr.use_cassette(str(tmpdir.join('boto3-stubs.yml'))):
# Perform the imports within the patched context so that # Perform the imports within the patched context so that
# HTTPConnection, VerifiedHTTPSConnection refers to the patched version. # HTTPConnection, VerifiedHTTPSConnection refers to the patched version.
@@ -68,50 +23,45 @@ def test_boto_vendored_stubs(tmpdir):
VerifiedHTTPSConnection('hostname.does.not.matter') VerifiedHTTPSConnection('hostname.does.not.matter')
@pytest.mark.skipif( def test_boto3_without_vcr():
os.environ.get("TRAVIS_PULL_REQUEST") != "false", s3_resource = boto3.resource('s3')
reason="Encrypted Environment Variables from Travis Repository Settings" b = s3_resource.Bucket(bucket)
" are disabled on PRs from forks. " b.put_object(Key=key, Body=content)
"https://docs.travis-ci.com/user/pull-requests/#pull-requests-and-security-restrictions"
)
def test_boto_medium_difficulty(tmpdir, get_user):
# retrieve content to check it
o = s3_resource.Object(bucket, key).get()
# decode for python3
assert content == o['Body'].read().decode('utf-8')
def test_boto_medium_difficulty(tmpdir):
s3_resource = boto3.resource('s3')
b = s3_resource.Bucket(bucket)
with vcr.use_cassette(str(tmpdir.join('boto3-medium.yml'))): with vcr.use_cassette(str(tmpdir.join('boto3-medium.yml'))):
response = get_user() b.put_object(Key=key, Body=content)
assert response['User']['UserName'] == IAM_USER_NAME o = s3_resource.Object(bucket, key).get()
assert content == o['Body'].read().decode('utf-8')
with vcr.use_cassette(str(tmpdir.join('boto3-medium.yml'))) as cass: with vcr.use_cassette(str(tmpdir.join('boto3-medium.yml'))) as cass:
response = get_user() b.put_object(Key=key, Body=content)
assert response['User']['UserName'] == IAM_USER_NAME o = s3_resource.Object(bucket, key).get()
assert content == o['Body'].read().decode('utf-8')
assert cass.all_played assert cass.all_played
@pytest.mark.skipif( def test_boto_hardcore_mode(tmpdir):
os.environ.get("TRAVIS_PULL_REQUEST") != "false",
reason="Encrypted Environment Variables from Travis Repository Settings"
" are disabled on PRs from forks. "
"https://docs.travis-ci.com/user/pull-requests/#pull-requests-and-security-restrictions"
)
def test_boto_hardcore_mode(tmpdir, iam_client, get_user):
with vcr.use_cassette(str(tmpdir.join('boto3-hardcore.yml'))): with vcr.use_cassette(str(tmpdir.join('boto3-hardcore.yml'))):
ses = boto3.Session( s3_resource = boto3.resource('s3')
aws_access_key_id=os.environ.get('AWS_ACCESS_KEY_ID'), b = s3_resource.Bucket(bucket)
aws_secret_access_key=os.environ.get('AWS_SECRET_ACCESS_KEY'), b.put_object(Key=key, Body=content)
region_name=os.environ.get('AWS_DEFAULT_REGION'), o = s3_resource.Object(bucket, key).get()
) assert content == o['Body'].read().decode('utf-8')
client = iam_client(ses)
response = get_user(client=client)
assert response['User']['UserName'] == IAM_USER_NAME
with vcr.use_cassette(str(tmpdir.join('boto3-hardcore.yml'))) as cass: with vcr.use_cassette(str(tmpdir.join('boto3-hardcore.yml'))) as cass:
ses = boto3.Session( s3_resource = boto3.resource('s3')
aws_access_key_id=os.environ.get('AWS_ACCESS_KEY_ID'), b = s3_resource.Bucket(bucket)
aws_secret_access_key=os.environ.get('AWS_SECRET_ACCESS_KEY'), b.put_object(Key=key, Body=content)
aws_session_token=None, o = s3_resource.Object(bucket, key).get()
region_name=os.environ.get('AWS_DEFAULT_REGION'), assert content == o['Body'].read().decode('utf-8')
)
client = iam_client(ses)
response = get_user(client=client)
assert response['User']['UserName'] == IAM_USER_NAME
assert cass.all_played assert cass.all_played

View File

@@ -1,12 +1,12 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
'''Integration tests with httplib2''' '''Integration tests with httplib2'''
import sys # External imports
from six.moves.urllib_parse import urlencode from six.moves.urllib_parse import urlencode
import pytest import pytest
import pytest_httpbin.certs import pytest_httpbin.certs
# Internal imports
import vcr import vcr
from assertions import assert_cassette_has_one_response from assertions import assert_cassette_has_one_response
@@ -19,12 +19,7 @@ def http():
Returns an httplib2 HTTP instance Returns an httplib2 HTTP instance
with the certificate replaced by the httpbin one. with the certificate replaced by the httpbin one.
""" """
kwargs = { return httplib2.Http(ca_certs=pytest_httpbin.certs.where())
'ca_certs': pytest_httpbin.certs.where()
}
if sys.version_info[:2] in [(2, 7), (3, 7)]:
kwargs['disable_ssl_certificate_validation'] = True
return httplib2.Http(**kwargs)
def test_response_code(tmpdir, httpbin_both): def test_response_code(tmpdir, httpbin_both):

View File

@@ -1,60 +0,0 @@
# -*- coding: utf-8 -*-
'''Test using a proxy.'''
# External imports
import multiprocessing
import pytest
from six.moves import socketserver, SimpleHTTPServer
from six.moves.urllib.request import urlopen
# Internal imports
import vcr
# Conditional imports
requests = pytest.importorskip("requests")
class Proxy(SimpleHTTPServer.SimpleHTTPRequestHandler):
'''
Simple proxy server.
(Inspired by: http://effbot.org/librarybook/simplehttpserver.htm).
'''
def do_GET(self):
upstream_response = urlopen(self.path)
try:
status = upstream_response.status
headers = upstream_response.headers.items()
except AttributeError:
# In Python 2 the response is an addinfourl instance.
status = upstream_response.code
headers = upstream_response.info().items()
self.send_response(status, upstream_response.msg)
for header in headers:
self.send_header(*header)
self.end_headers()
self.copyfile(upstream_response, self.wfile)
@pytest.yield_fixture(scope='session')
def proxy_server():
httpd = socketserver.ThreadingTCPServer(('', 0), Proxy)
proxy_process = multiprocessing.Process(
target=httpd.serve_forever,
)
proxy_process.start()
yield 'http://{}:{}'.format(*httpd.server_address)
proxy_process.terminate()
def test_use_proxy(tmpdir, httpbin, proxy_server):
'''Ensure that it works with a proxy.'''
with vcr.use_cassette(str(tmpdir.join('proxy.yaml'))):
response = requests.get(httpbin.url, proxies={'http': proxy_server})
with vcr.use_cassette(str(tmpdir.join('proxy.yaml'))) as cassette:
cassette_response = requests.get(httpbin.url, proxies={'http': proxy_server})
assert cassette_response.headers == response.headers
assert cassette.play_count == 1

View File

@@ -116,10 +116,10 @@ def test_post_chunked_binary(tmpdir, httpbin):
assert req1 == req2 assert req1 == req2
@pytest.mark.skipif('sys.version_info >= (3, 6)', strict=True, raises=ConnectionError) @pytest.mark.xfail('sys.version_info >= (3, 6)', strict=True, raises=ConnectionError)
@pytest.mark.skipif((3, 5) < sys.version_info < (3, 6) and @pytest.mark.xfail((3, 5) < sys.version_info < (3, 6) and
platform.python_implementation() == 'CPython', platform.python_implementation() == 'CPython',
reason='Fails on CPython 3.5') reason='Fails on CPython 3.5')
def test_post_chunked_binary_secure(tmpdir, httpbin_secure): def test_post_chunked_binary_secure(tmpdir, httpbin_secure):
'''Ensure that we can send chunked binary without breaking while trying to concatenate bytes with str.''' '''Ensure that we can send chunked binary without breaking while trying to concatenate bytes with str.'''
data1 = iter([b'data', b'to', b'send']) data1 = iter([b'data', b'to', b'send'])
@@ -254,7 +254,7 @@ def test_nested_cassettes_with_session_created_before_nesting(httpbin_both, tmpd
def test_post_file(tmpdir, httpbin_both): def test_post_file(tmpdir, httpbin_both):
'''Ensure that we handle posting a file.''' '''Ensure that we handle posting a file.'''
url = httpbin_both + '/post' url = httpbin_both + '/post'
with vcr.use_cassette(str(tmpdir.join('post_file.yaml'))) as cass, open('tox.ini', 'rb') as f: with vcr.use_cassette(str(tmpdir.join('post_file.yaml'))) as cass, open('tox.ini') as f:
original_response = requests.post(url, f).content original_response = requests.post(url, f).content
# This also tests that we do the right thing with matching the body when they are files. # This also tests that we do the right thing with matching the body when they are files.
@@ -282,17 +282,3 @@ def test_filter_post_params(tmpdir, httpbin_both):
requests.post(url, data={'key': 'value'}) requests.post(url, data={'key': 'value'})
with vcr.use_cassette(cass_loc, filter_post_data_parameters=['key']) as cass: with vcr.use_cassette(cass_loc, filter_post_data_parameters=['key']) as cass:
assert b'key=value' not in cass.requests[0].body assert b'key=value' not in cass.requests[0].body
def test_post_unicode_match_on_body(tmpdir, httpbin_both):
'''Ensure that matching on POST body that contains Unicode characters works.'''
data = {'key1': 'value1', '●‿●': '٩(●̮̮̃•̃)۶'}
url = httpbin_both + '/post'
with vcr.use_cassette(str(tmpdir.join('requests.yaml')), additional_matchers=('body',)):
req1 = requests.post(url, data).content
with vcr.use_cassette(str(tmpdir.join('requests.yaml')), additional_matchers=('body',)):
req2 = requests.post(url, data).content
assert req1 == req2

View File

@@ -1,6 +1,5 @@
import vcr import vcr
import zlib import zlib
import json
import six.moves.http_client as httplib import six.moves.http_client as httplib
from assertions import assert_is_json from assertions import assert_is_json
@@ -84,50 +83,3 @@ def test_original_decoded_response_is_not_modified(tmpdir, httpbin):
assert 'content-encoding' not in inside.headers assert 'content-encoding' not in inside.headers
assert_is_json(inside.read()) assert_is_json(inside.read())
def _make_before_record_response(fields, replacement='[REDACTED]'):
def before_record_response(response):
string_body = response['body']['string'].decode('utf8')
body = json.loads(string_body)
for field in fields:
if field in body:
body[field] = replacement
response['body']['string'] = json.dumps(body).encode()
return response
return before_record_response
def test_original_response_is_not_modified_by_before_filter(tmpdir, httpbin):
testfile = str(tmpdir.join('sensitive_data_scrubbed_response.yml'))
host, port = httpbin.host, httpbin.port
field_to_scrub = 'url'
replacement = '[YOU_CANT_HAVE_THE_MANGO]'
conn = httplib.HTTPConnection(host, port)
conn.request('GET', '/get')
outside = conn.getresponse()
callback = _make_before_record_response([field_to_scrub], replacement)
with vcr.use_cassette(testfile, before_record_response=callback):
conn = httplib.HTTPConnection(host, port)
conn.request('GET', '/get')
inside = conn.getresponse()
# The scrubbed field should be the same, because no cassette existed.
# Furthermore, the responses should be identical.
inside_body = json.loads(inside.read().decode('utf-8'))
outside_body = json.loads(outside.read().decode('utf-8'))
assert not inside_body[field_to_scrub] == replacement
assert inside_body[field_to_scrub] == outside_body[field_to_scrub]
# Ensure that when a cassette exists, the scrubbed response is returned.
with vcr.use_cassette(testfile, before_record_response=callback):
conn = httplib.HTTPConnection(host, port)
conn.request('GET', '/get')
inside = conn.getresponse()
inside_body = json.loads(inside.read().decode('utf-8'))
assert inside_body[field_to_scrub] == replacement

View File

@@ -1,7 +1,6 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
'''Integration tests with urllib2''' '''Integration tests with urllib2'''
import ssl
from six.moves.urllib.request import urlopen from six.moves.urllib.request import urlopen
from six.moves.urllib_parse import urlencode from six.moves.urllib_parse import urlencode
import pytest_httpbin.certs import pytest_httpbin.certs
@@ -13,9 +12,7 @@ from assertions import assert_cassette_has_one_response
def urlopen_with_cafile(*args, **kwargs): def urlopen_with_cafile(*args, **kwargs):
context = ssl.create_default_context(cafile=pytest_httpbin.certs.where()) kwargs['cafile'] = pytest_httpbin.certs.where()
context.check_hostname = False
kwargs['context'] = context
try: try:
return urlopen(*args, **kwargs) return urlopen(*args, **kwargs)
except TypeError: except TypeError:

View File

@@ -20,7 +20,7 @@ def verify_pool_mgr():
@pytest.fixture(scope='module') @pytest.fixture(scope='module')
def pool_mgr(): def pool_mgr():
return urllib3.PoolManager(cert_reqs='CERT_NONE') return urllib3.PoolManager()
def test_status_code(httpbin_both, tmpdir, verify_pool_mgr): def test_status_code(httpbin_both, tmpdir, verify_pool_mgr):

View File

@@ -59,7 +59,7 @@ def test_flickr_multipart_upload(httpbin, tmpdir):
def test_flickr_should_respond_with_200(tmpdir): def test_flickr_should_respond_with_200(tmpdir):
testfile = str(tmpdir.join('flickr.yml')) testfile = str(tmpdir.join('flickr.yml'))
with vcr.use_cassette(testfile): with vcr.use_cassette(testfile):
r = requests.post("https://api.flickr.com/services/upload", verify=False) r = requests.post("http://api.flickr.com/services/upload")
assert r.status_code == 200 assert r.status_code == 200
@@ -81,23 +81,16 @@ def test_amazon_doctype(tmpdir):
assert 'html' in r.text assert 'html' in r.text
def start_rpc_server(q):
httpd = xmlrpc_server.SimpleXMLRPCServer(('127.0.0.1', 0))
httpd.register_function(pow)
q.put('http://{}:{}'.format(*httpd.server_address))
httpd.serve_forever()
@pytest.yield_fixture(scope='session') @pytest.yield_fixture(scope='session')
def rpc_server(): def rpc_server():
q = multiprocessing.Queue() httpd = xmlrpc_server.SimpleXMLRPCServer(('', 0))
httpd.register_function(pow)
proxy_process = multiprocessing.Process( proxy_process = multiprocessing.Process(
target=start_rpc_server, target=httpd.serve_forever,
args=(q,)
) )
try: try:
proxy_process.start() proxy_process.start()
yield q.get() yield 'http://{}:{}'.format(*httpd.server_address)
finally: finally:
proxy_process.terminate() proxy_process.terminate()

View File

@@ -133,17 +133,6 @@ def test_cassette_all_played():
assert a.all_played assert a.all_played
@mock.patch('vcr.cassette.requests_match', _mock_requests_match)
def test_cassette_rewound():
a = Cassette('test')
a.append('foo', 'bar')
a.play_response('foo')
assert a.all_played
a.rewind()
assert not a.all_played
def test_before_record_response(): def test_before_record_response():
before_record_response = mock.Mock(return_value='mutated') before_record_response = mock.Mock(return_value='mutated')
cassette = Cassette('test', before_record_response=before_record_response) cassette = Cassette('test', before_record_response=before_record_response)
@@ -317,51 +306,3 @@ def test_use_as_decorator_on_generator():
yield 2 yield 2
assert list(test_function()) == [1, 2] assert list(test_function()) == [1, 2]
@mock.patch("vcr.cassette.get_matchers_results")
def test_find_requests_with_most_matches_one_similar_request(mock_get_matchers_results):
mock_get_matchers_results.side_effect = [
(["method"], [("path", "failed : path"), ("query", "failed : query")]),
(["method", "path"], [("query", "failed : query")]),
([], [("method", "failed : method"), ("path", "failed : path"), ("query", "failed : query")]),
]
cassette = Cassette("test")
for request in range(1, 4):
cassette.append(request, 'response')
result = cassette.find_requests_with_most_matches("fake request")
assert result == [(2, ["method", "path"], [("query", "failed : query")])]
@mock.patch("vcr.cassette.get_matchers_results")
def test_find_requests_with_most_matches_no_similar_requests(mock_get_matchers_results):
mock_get_matchers_results.side_effect = [
([], [("path", "failed : path"), ("query", "failed : query")]),
([], [("path", "failed : path"), ("query", "failed : query")]),
([], [("path", "failed : path"), ("query", "failed : query")]),
]
cassette = Cassette("test")
for request in range(1, 4):
cassette.append(request, 'response')
result = cassette.find_requests_with_most_matches("fake request")
assert result == []
@mock.patch("vcr.cassette.get_matchers_results")
def test_find_requests_with_most_matches_many_similar_requests(mock_get_matchers_results):
mock_get_matchers_results.side_effect = [
(["method", "path"], [("query", "failed : query")]),
(["method"], [("path", "failed : path"), ("query", "failed : query")]),
(["method", "path"], [("query", "failed : query")]),
]
cassette = Cassette("test")
for request in range(1, 4):
cassette.append(request, 'response')
result = cassette.find_requests_with_most_matches("fake request")
assert result == [
(1, ["method", "path"], [("query", "failed : query")]),
(3, ["method", "path"], [("query", "failed : query")])
]

View File

@@ -1,73 +0,0 @@
import pytest
from vcr.compat import mock
from vcr import errors
from vcr.cassette import Cassette
@mock.patch("vcr.cassette.Cassette.find_requests_with_most_matches")
@pytest.mark.parametrize(
"most_matches, expected_message",
[
# No request match found
(
[],
"No similar requests, that have not been played, found."
),
# One matcher failed
(
[("similar request", ["method", "path"], [("query", "failed : query")])],
"Found 1 similar requests with 1 different matcher(s) :\n"
"\n1 - ('similar request').\n"
"Matchers succeeded : ['method', 'path']\n"
"Matchers failed :\n"
"query - assertion failure :\n"
"failed : query\n"
),
# Multiple failed matchers
(
[("similar request", ["method"], [("query", "failed : query"), ("path", "failed : path")])],
"Found 1 similar requests with 2 different matcher(s) :\n"
"\n1 - ('similar request').\n"
"Matchers succeeded : ['method']\n"
"Matchers failed :\n"
"query - assertion failure :\n"
"failed : query\n"
"path - assertion failure :\n"
"failed : path\n"
),
# Multiple similar requests
(
[
("similar request", ["method"], [("query", "failed : query")]),
("similar request 2", ["method"], [("query", "failed : query 2")])
],
"Found 2 similar requests with 1 different matcher(s) :\n"
"\n1 - ('similar request').\n"
"Matchers succeeded : ['method']\n"
"Matchers failed :\n"
"query - assertion failure :\n"
"failed : query\n"
"\n2 - ('similar request 2').\n"
"Matchers succeeded : ['method']\n"
"Matchers failed :\n"
"query - assertion failure :\n"
"failed : query 2\n"
),
]
)
def test_CannotOverwriteExistingCassetteException_get_message(
mock_find_requests_with_most_matches, most_matches, expected_message):
mock_find_requests_with_most_matches.return_value = most_matches
cassette = Cassette("path")
failed_request = "request"
exception_message = errors.CannotOverwriteExistingCassetteException._get_message(
cassette, "request"
)
expected = "Can't overwrite existing cassette (%r) in your current record mode (%r).\n" \
"No match for the request (%r) was found.\n" \
"%s" % (cassette._path, cassette.record_mode, failed_request, expected_message)
assert exception_message == expected

View File

@@ -1,5 +1,4 @@
import itertools import itertools
from vcr.compat import mock
import pytest import pytest
@@ -22,22 +21,20 @@ REQUESTS = {
def assert_matcher(matcher_name): def assert_matcher(matcher_name):
matcher = getattr(matchers, matcher_name) matcher = getattr(matchers, matcher_name)
for k1, k2 in itertools.permutations(REQUESTS, 2): for k1, k2 in itertools.permutations(REQUESTS, 2):
expecting_assertion_error = matcher_name in {k1, k2} matched = matcher(REQUESTS[k1], REQUESTS[k2])
if expecting_assertion_error: if matcher_name in {k1, k2}:
with pytest.raises(AssertionError): assert not matched
matcher(REQUESTS[k1], REQUESTS[k2])
else: else:
assert matcher(REQUESTS[k1], REQUESTS[k2]) is None assert matched
def test_uri_matcher(): def test_uri_matcher():
for k1, k2 in itertools.permutations(REQUESTS, 2): for k1, k2 in itertools.permutations(REQUESTS, 2):
expecting_assertion_error = {k1, k2} != {"base", "method"} matched = matchers.uri(REQUESTS[k1], REQUESTS[k2])
if expecting_assertion_error: if {k1, k2} != {'base', 'method'}:
with pytest.raises(AssertionError): assert not matched
matchers.uri(REQUESTS[k1], REQUESTS[k2])
else: else:
assert matchers.uri(REQUESTS[k1], REQUESTS[k2]) is None assert matched
req1_body = (b"<?xml version='1.0'?><methodCall><methodName>test</methodName>" req1_body = (b"<?xml version='1.0'?><methodCall><methodName>test</methodName>"
@@ -110,7 +107,7 @@ req2_body = (b"<?xml version='1.0'?><methodCall><methodName>test</methodName>"
) )
]) ])
def test_body_matcher_does_match(r1, r2): def test_body_matcher_does_match(r1, r2):
assert matchers.body(r1, r2) is None assert matchers.body(r1, r2)
@pytest.mark.parametrize("r1, r2", [ @pytest.mark.parametrize("r1, r2", [
@@ -138,128 +135,25 @@ def test_body_matcher_does_match(r1, r2):
) )
]) ])
def test_body_match_does_not_match(r1, r2): def test_body_match_does_not_match(r1, r2):
with pytest.raises(AssertionError): assert not matchers.body(r1, r2)
matchers.body(r1, r2)
def test_query_matcher(): def test_query_matcher():
req1 = request.Request("GET", "http://host.com/?a=b&c=d", "", {}) req1 = request.Request('GET', 'http://host.com/?a=b&c=d', '', {})
req2 = request.Request("GET", "http://host.com/?c=d&a=b", "", {}) req2 = request.Request('GET', 'http://host.com/?c=d&a=b', '', {})
assert matchers.query(req1, req2) is None assert matchers.query(req1, req2)
req1 = request.Request("GET", "http://host.com/?a=b&a=b&c=d", "", {}) req1 = request.Request('GET', 'http://host.com/?a=b&a=b&c=d', '', {})
req2 = request.Request("GET", "http://host.com/?a=b&c=d&a=b", "", {}) req2 = request.Request('GET', 'http://host.com/?a=b&c=d&a=b', '', {})
req3 = request.Request("GET", "http://host.com/?c=d&a=b&a=b", "", {}) req3 = request.Request('GET', 'http://host.com/?c=d&a=b&a=b', '', {})
assert matchers.query(req1, req2) is None assert matchers.query(req1, req2)
assert matchers.query(req1, req3) is None assert matchers.query(req1, req3)
def test_matchers(): def test_metchers():
assert_matcher("method") assert_matcher('method')
assert_matcher("scheme") assert_matcher('scheme')
assert_matcher("host") assert_matcher('host')
assert_matcher("port") assert_matcher('port')
assert_matcher("path") assert_matcher('path')
assert_matcher("query") assert_matcher('query')
def test_evaluate_matcher_does_match():
def bool_matcher(r1, r2):
return True
def assertion_matcher(r1, r2):
assert 1 == 1
r1, r2 = None, None
for matcher in [bool_matcher, assertion_matcher]:
match, assertion_msg = matchers._evaluate_matcher(matcher, r1, r2)
assert match is True
assert assertion_msg is None
def test_evaluate_matcher_does_not_match():
def bool_matcher(r1, r2):
return False
def assertion_matcher(r1, r2):
# This is like the "assert" statement preventing pytest to recompile it
raise AssertionError()
r1, r2 = None, None
for matcher in [bool_matcher, assertion_matcher]:
match, assertion_msg = matchers._evaluate_matcher(matcher, r1, r2)
assert match is False
assert not assertion_msg
def test_evaluate_matcher_does_not_match_with_assert_message():
def assertion_matcher(r1, r2):
# This is like the "assert" statement preventing pytest to recompile it
raise AssertionError("Failing matcher")
r1, r2 = None, None
match, assertion_msg = matchers._evaluate_matcher(assertion_matcher, r1, r2)
assert match is False
assert assertion_msg == "Failing matcher"
def test_get_assertion_message():
assert matchers.get_assertion_message(None) is None
assert matchers.get_assertion_message("") == ""
def test_get_assertion_message_with_details():
assertion_msg = "q1=1 != q2=1"
expected = assertion_msg
assert matchers.get_assertion_message(assertion_msg) == expected
@pytest.mark.parametrize(
"r1, r2, expected_successes, expected_failures",
[
(
request.Request("GET", "http://host.com/p?a=b", "", {}),
request.Request("GET", "http://host.com/p?a=b", "", {}),
["method", "path"],
[],
),
(
request.Request("GET", "http://host.com/p?a=b", "", {}),
request.Request("POST", "http://host.com/p?a=b", "", {}),
["path"],
["method"],
),
(
request.Request("GET", "http://host.com/p?a=b", "", {}),
request.Request("POST", "http://host.com/path?a=b", "", {}),
[],
["method", "path"],
),
],
)
def test_get_matchers_results(r1, r2, expected_successes, expected_failures):
successes, failures = matchers.get_matchers_results(
r1, r2, [matchers.method, matchers.path]
)
assert successes == expected_successes
assert len(failures) == len(expected_failures)
for i, expected_failure in enumerate(expected_failures):
assert failures[i][0] == expected_failure
assert failures[i][1] is not None
@mock.patch("vcr.matchers.get_matchers_results")
@pytest.mark.parametrize(
"successes, failures, expected_match",
[
(["method", "path"], [], True),
(["method"], ["path"], False),
([], ["method", "path"], False),
],
)
def test_requests_match(mock_get_matchers_results, successes, failures, expected_match):
mock_get_matchers_results.return_value = (successes, failures)
r1 = request.Request("GET", "http://host.com/p?a=b", "", {})
r2 = request.Request("GET", "http://host.com/p?a=b", "", {})
match = matchers.requests_match(r1, r2, [matchers.method, matchers.path])
assert match is expected_match

View File

@@ -5,12 +5,6 @@ import yaml
import vcr.migration import vcr.migration
# Use the libYAML versions if possible
try:
from yaml import CLoader as Loader
except ImportError:
from yaml import Loader
def test_try_migrate_with_json(tmpdir): def test_try_migrate_with_json(tmpdir):
cassette = tmpdir.join('cassette.json').strpath cassette = tmpdir.join('cassette.json').strpath
@@ -28,9 +22,9 @@ def test_try_migrate_with_yaml(tmpdir):
shutil.copy('tests/fixtures/migration/old_cassette.yaml', cassette) shutil.copy('tests/fixtures/migration/old_cassette.yaml', cassette)
assert vcr.migration.try_migrate(cassette) assert vcr.migration.try_migrate(cassette)
with open('tests/fixtures/migration/new_cassette.yaml', 'r') as f: with open('tests/fixtures/migration/new_cassette.yaml', 'r') as f:
expected_yaml = yaml.load(f, Loader=Loader) expected_yaml = yaml.load(f)
with open(cassette, 'r') as f: with open(cassette, 'r') as f:
actual_yaml = yaml.load(f, Loader=Loader) actual_yaml = yaml.load(f)
assert actual_yaml == expected_yaml assert actual_yaml == expected_yaml

View File

@@ -3,13 +3,9 @@ import pytest
from vcr.request import Request, HeadersDict from vcr.request import Request, HeadersDict
@pytest.mark.parametrize("method, uri, expected_str", [ def test_str():
('GET', 'http://www.google.com/', '<Request (GET) http://www.google.com/>'), req = Request('GET', 'http://www.google.com/', '', {})
('OPTIONS', '*', '<Request (OPTIONS) *>'), str(req) == '<Request (GET) http://www.google.com/>'
('CONNECT', 'host.some.where:1234', '<Request (CONNECT) host.some.where:1234>')
])
def test_str(method, uri, expected_str):
assert str(Request(method, uri, '', {})) == expected_str
def test_headers(): def test_headers():
@@ -33,21 +29,18 @@ def test_add_header_deprecated():
('https://go.com/', 443), ('https://go.com/', 443),
('https://go.com:443/', 443), ('https://go.com:443/', 443),
('https://go.com:3000/', 3000), ('https://go.com:3000/', 3000),
('*', None)
]) ])
def test_port(uri, expected_port): def test_port(uri, expected_port):
req = Request('GET', uri, '', {}) req = Request('GET', uri, '', {})
assert req.port == expected_port assert req.port == expected_port
@pytest.mark.parametrize("method, uri", [ def test_uri():
('GET', 'http://go.com/'), req = Request('GET', 'http://go.com/', '', {})
('GET', 'http://go.com:80/'), assert req.uri == 'http://go.com/'
('CONNECT', 'localhost:1234'),
('OPTIONS', '*') req = Request('GET', 'http://go.com:80/', '', {})
]) assert req.uri == 'http://go.com:80/'
def test_uri(method, uri):
assert Request(method, uri, '', {}).uri == uri
def test_HeadersDict(): def test_HeadersDict():

View File

@@ -1,9 +1,4 @@
# coding: UTF-8 # coding: UTF-8
import io
import unittest
import six
from vcr.stubs import VCRHTTPResponse from vcr.stubs import VCRHTTPResponse
@@ -71,52 +66,3 @@ def test_response_headers_should_have_correct_values():
assert response.headers.get('content-length') == "10806" assert response.headers.get('content-length') == "10806"
assert response.headers.get('date') == "Fri, 24 Oct 2014 18:35:37 GMT" assert response.headers.get('date') == "Fri, 24 Oct 2014 18:35:37 GMT"
@unittest.skipIf(six.PY2, "Regression test for Python3 only")
def test_response_parses_correctly_and_fp_attribute_error_is_not_thrown():
"""
Regression test for https://github.com/kevin1024/vcrpy/issues/440
:return:
"""
recorded_response = {
"status": {
"message": "OK",
"code": 200
},
"headers": {
"content-length": ["0"],
"server": ["gunicorn/18.0"],
"connection": ["Close"],
"access-control-allow-credentials": ["true"],
"date": ["Fri, 24 Oct 2014 18:35:37 GMT"],
"access-control-allow-origin": ["*"],
"content-type": ["text/html; charset=utf-8"],
},
"body": {
"string": b"\nPMID- 19416910\nOWN - NLM\nSTAT- MEDLINE\nDA - 20090513\nDCOM- "
b"20090622\nLR - "
b"20141209\nIS - 1091-6490 (Electronic)\nIS - 0027-8424 (Linking)\nVI - "
b"106\nIP - "
b"19\nDP - 2009 May 12\nTI - Genetic dissection of histone deacetylase "
b"requirement in "
b"tumor cells.\nPG - 7751-5\nLID - 10.1073/pnas.0903139106 [doi]\nAB - "
b"Histone "
b"deacetylase inhibitors (HDACi) represent a new group of drugs currently\n "
b" being "
b"tested in a wide variety of clinical applications. They are especially\n "
b" effective "
b"in preclinical models of cancer where they show antiproliferative\n "
b"action in many "
b"different types of cancer cells. Recently, the first HDACi was\n "
b"approved for the "
b"treatment of cutaneous T cell lymphomas. Most HDACi currently in\n "
b"clinical "
}
}
vcr_response = VCRHTTPResponse(recorded_response)
handle = io.TextIOWrapper(io.BufferedReader(vcr_response), encoding='utf-8')
handle = iter(handle)
articles = [line for line in handle]
assert len(articles) > 1

View File

@@ -147,7 +147,7 @@ def test_vcr_path_transformer():
# and it should still work with cassette_library_dir # and it should still work with cassette_library_dir
vcr = VCR(cassette_library_dir='/foo') vcr = VCR(cassette_library_dir='/foo')
with vcr.use_cassette('test') as cassette: with vcr.use_cassette('test') as cassette:
assert os.path.abspath(cassette._path) == os.path.abspath('/foo/test') assert cassette._path == '/foo/test'
@pytest.fixture @pytest.fixture

View File

@@ -1,16 +0,0 @@
import sys
def test_vcr_import_deprecation(recwarn):
if 'vcr' in sys.modules:
# Remove imported module entry if already loaded in another test
del sys.modules['vcr']
import vcr # noqa: F401
if sys.version_info[0] == 2:
assert len(recwarn) == 1
assert issubclass(recwarn[0].category, DeprecationWarning)
else:
assert len(recwarn) == 0

43
tox.ini
View File

@@ -1,21 +1,5 @@
[tox] [tox]
skip_missing_interpreters=true envlist = {py27,py35,py36,pypy}-{flakes,requests27,httplib2,urllib3121,tornado4,boto3,aiohttp}
envlist = cov-clean,{py27,py35,py36,py37,pypy}-{flakes,requests,httplib2,urllib3,tornado4,boto3},{py35,py36,py37}-{aiohttp},cov-report
# Coverage environment tasks: cov-clean and cov-report
# https://pytest-cov.readthedocs.io/en/latest/tox.html
[testenv:cov-clean]
deps = coverage
skip_install=true
commands = coverage erase
[testenv:cov-report]
deps = coverage
skip_install=true
commands =
coverage html
coverage report --fail-under=75
[testenv:flakes] [testenv:flakes]
skipsdist = True skipsdist = True
@@ -26,37 +10,24 @@ commands =
deps = flake8 deps = flake8
[testenv] [testenv]
# Need to use develop install so that paths
# for aggregate code coverage combine
usedevelop=true
commands = commands =
./runtests.sh --cov=./vcr --cov-branch --cov-report=xml --cov-append {posargs} ./runtests.sh -n 4 {posargs}
deps = deps =
Flask Flask<1
mock mock
pytest pytest
pytest-httpbin pytest-httpbin
pytest-cov pytest-xdist
PyYAML PyYAML
ipaddress requests27: requests==2.7.0
requests: requests>=2.22.0
httplib2: httplib2 httplib2: httplib2
urllib3: urllib3 urllib3121: urllib3==1.21.1
{py27,py35,py36,pypy}-tornado4: tornado>=4,<5 {py27,py35,py36,pypy}-tornado4: tornado>=4,<5
{py27,py35,py36,pypy}-tornado4: pytest-tornado {py27,py35,py36,pypy}-tornado4: pytest-tornado
{py27,py35,py36}-tornado4: pycurl {py27,py35,py36}-tornado4: pycurl
boto3: boto3 boto3: boto3
boto3: urllib3 aiohttp: aiohttp<3
aiohttp: aiohttp
aiohttp: pytest-asyncio aiohttp: pytest-asyncio
aiohttp: pytest-aiohttp
depends =
{py27,py35,py36,py37,pypy}-{flakes,requests,httplib2,urllib3,tornado4,boto3},{py35,py36,py37}-{aiohttp}: cov-clean
cov-report: {py27,py35,py36,py37,pypy}-{flakes,requests,httplib2,urllib3,tornado4,boto3},{py35,py36,py37}-{aiohttp}
passenv =
AWS_ACCESS_KEY_ID
AWS_DEFAULT_REGION
AWS_SECRET_ACCESS_KEY
[flake8] [flake8]
max_line_length = 110 max_line_length = 110

View File

@@ -1,6 +1,4 @@
import logging import logging
import warnings
import sys
from .config import VCR from .config import VCR
# Set default logging handler to avoid "No handler found" warnings. # Set default logging handler to avoid "No handler found" warnings.
@@ -11,11 +9,6 @@ except ImportError:
def emit(self, record): def emit(self, record):
pass pass
if sys.version_info[0] == 2:
warnings.warn(
"Python 2.x support of vcrpy is deprecated and will be removed in an upcoming major release.",
DeprecationWarning
)
logging.getLogger(__name__).addHandler(NullHandler()) logging.getLogger(__name__).addHandler(NullHandler())

View File

@@ -1,3 +1,7 @@
async def handle_coroutine(vcr, fn): # noqa: E999 import asyncio
@asyncio.coroutine
def handle_coroutine(vcr, fn):
with vcr as cassette: with vcr as cassette:
return (await fn(cassette)) # noqa: E999 return (yield from fn(cassette)) # noqa: E999

View File

@@ -1,5 +1,4 @@
import collections import collections
import copy
import sys import sys
import inspect import inspect
import logging import logging
@@ -8,7 +7,7 @@ import wrapt
from .compat import contextlib from .compat import contextlib
from .errors import UnhandledHTTPRequestError from .errors import UnhandledHTTPRequestError
from .matchers import requests_match, uri, method, get_matchers_results from .matchers import requests_match, uri, method
from .patch import CassettePatcherBuilder from .patch import CassettePatcherBuilder
from .serializers import yamlserializer from .serializers import yamlserializer
from .persisters.filesystem import FilesystemPersister from .persisters.filesystem import FilesystemPersister
@@ -16,13 +15,11 @@ from .util import partition_dict
try: try:
from asyncio import iscoroutinefunction from asyncio import iscoroutinefunction
from ._handle_coroutine import handle_coroutine
except ImportError: except ImportError:
def iscoroutinefunction(*args, **kwargs): def iscoroutinefunction(*args, **kwargs):
return False return False
if sys.version_info[:2] >= (3, 5):
from ._handle_coroutine import handle_coroutine
else:
def handle_coroutine(*args, **kwags): def handle_coroutine(*args, **kwags):
raise NotImplementedError('Not implemented on Python 2') raise NotImplementedError('Not implemented on Python 2')
@@ -138,10 +135,7 @@ class CassetteContextDecorator(object):
except Exception: except Exception:
to_yield = coroutine.throw(*sys.exc_info()) to_yield = coroutine.throw(*sys.exc_info())
else: else:
try: to_yield = coroutine.send(to_send)
to_yield = coroutine.send(to_send)
except StopIteration:
break
def _handle_function(self, fn): def _handle_function(self, fn):
with self as cassette: with self as cassette:
@@ -190,7 +184,6 @@ class Cassette(object):
self._serializer = serializer or yamlserializer self._serializer = serializer or yamlserializer
self._match_on = match_on self._match_on = match_on
self._before_record_request = before_record_request or (lambda x: x) self._before_record_request = before_record_request or (lambda x: x)
log.info(self._before_record_request)
self._before_record_response = before_record_response or (lambda x: x) self._before_record_response = before_record_response or (lambda x: x)
self.inject = inject self.inject = inject
self.record_mode = record_mode self.record_mode = record_mode
@@ -226,13 +219,9 @@ class Cassette(object):
def append(self, request, response): def append(self, request, response):
"""Add a request, response pair to this cassette""" """Add a request, response pair to this cassette"""
log.info("Appending request %s and response %s", request, response)
request = self._before_record_request(request) request = self._before_record_request(request)
if not request: if not request:
return return
# Deepcopy is here because mutation of `response` will corrupt the
# real response.
response = copy.deepcopy(response)
response = self._before_record_response(response) response = self._before_record_response(response)
if response is None: if response is None:
return return
@@ -289,45 +278,6 @@ class Cassette(object):
% (self._path, request) % (self._path, request)
) )
def rewind(self):
self.play_counts = collections.Counter()
def find_requests_with_most_matches(self, request):
"""
Get the most similar request(s) stored in the cassette
of a given request as a list of tuples like this:
- the request object
- the successful matchers as string
- the failed matchers and the related assertion message with the difference details as strings tuple
This is useful when a request failed to be found,
we can get the similar request(s) in order to know what have changed in the request parts.
"""
best_matches = []
request = self._before_record_request(request)
for index, (stored_request, response) in enumerate(self.data):
successes, fails = get_matchers_results(request, stored_request, self._match_on)
best_matches.append((len(successes), stored_request, successes, fails))
best_matches.sort(key=lambda t: t[0], reverse=True)
# Get the first best matches (multiple if equal matches)
final_best_matches = []
if not best_matches:
return final_best_matches
previous_nb_success = best_matches[0][0]
for best_match in best_matches:
nb_success = best_match[0]
# Do not keep matches that have 0 successes,
# it means that the request is totally different from
# the ones stored in the cassette
if nb_success < 1 or previous_nb_success != nb_success:
break
previous_nb_success = nb_success
final_best_matches.append(best_match[1:])
return final_best_matches
def _as_dict(self): def _as_dict(self):
return {"requests": self.requests, "responses": self.responses} return {"requests": self.requests, "responses": self.responses}

View File

@@ -1,8 +1,5 @@
import copy import copy
try: import collections
from collections import abc as collections_abc # only works on python 3.3+
except ImportError:
import collections as collections_abc
import functools import functools
import inspect import inspect
import os import os
@@ -178,7 +175,7 @@ class VCR(object):
if decode_compressed_response: if decode_compressed_response:
filter_functions.append(filters.decode_response) filter_functions.append(filters.decode_response)
if before_record_response: if before_record_response:
if not isinstance(before_record_response, collections_abc.Iterable): if not isinstance(before_record_response, collections.Iterable):
before_record_response = (before_record_response,) before_record_response = (before_record_response,)
filter_functions.extend(before_record_response) filter_functions.extend(before_record_response)
@@ -244,7 +241,7 @@ class VCR(object):
filter_functions.append(self._build_ignore_hosts(hosts_to_ignore)) filter_functions.append(self._build_ignore_hosts(hosts_to_ignore))
if before_record_request: if before_record_request:
if not isinstance(before_record_request, collections_abc.Iterable): if not isinstance(before_record_request, collections.Iterable):
before_record_request = (before_record_request,) before_record_request = (before_record_request,)
filter_functions.extend(before_record_request) filter_functions.extend(before_record_request)

View File

@@ -1,38 +1,5 @@
class CannotOverwriteExistingCassetteException(Exception): class CannotOverwriteExistingCassetteException(Exception):
def __init__(self, *args, **kwargs): pass
self.cassette = kwargs["cassette"]
self.failed_request = kwargs["failed_request"]
message = self._get_message(kwargs["cassette"], kwargs["failed_request"])
super(CannotOverwriteExistingCassetteException, self).__init__(message)
@staticmethod
def _get_message(cassette, failed_request):
"""Get the final message related to the exception"""
# Get the similar requests in the cassette that
# have match the most with the request.
best_matches = cassette.find_requests_with_most_matches(failed_request)
if best_matches:
# Build a comprehensible message to put in the exception.
best_matches_msg = "Found {} similar requests with {} different matcher(s) :\n".format(
len(best_matches), len(best_matches[0][2]))
for idx, best_match in enumerate(best_matches, start=1):
request, succeeded_matchers, failed_matchers_assertion_msgs = best_match
best_matches_msg += "\n%s - (%r).\n" \
"Matchers succeeded : %s\n" \
"Matchers failed :\n" % (idx, request, succeeded_matchers)
for failed_matcher, assertion_msg in failed_matchers_assertion_msgs:
best_matches_msg += "%s - assertion failure :\n" \
"%s\n" % (failed_matcher, assertion_msg)
else:
best_matches_msg = "No similar requests, that have not been played, found."
return (
"Can't overwrite existing cassette (%r) in "
"your current record mode (%r).\n"
"No match for the request (%r) was found.\n"
"%s"
% (cassette._path, cassette.record_mode, failed_request, best_matches_msg)
)
class UnhandledHTTPRequestError(KeyError): class UnhandledHTTPRequestError(KeyError):

View File

@@ -8,47 +8,35 @@ log = logging.getLogger(__name__)
def method(r1, r2): def method(r1, r2):
assert r1.method == r2.method, "{} != {}".format(r1.method, r2.method) return r1.method == r2.method
def uri(r1, r2): def uri(r1, r2):
assert r1.uri == r2.uri, "{} != {}".format(r1.uri, r2.uri) return r1.uri == r2.uri
def host(r1, r2): def host(r1, r2):
assert r1.host == r2.host, "{} != {}".format(r1.host, r2.host) return r1.host == r2.host
def scheme(r1, r2): def scheme(r1, r2):
assert r1.scheme == r2.scheme, "{} != {}".format(r1.scheme, r2.scheme) return r1.scheme == r2.scheme
def port(r1, r2): def port(r1, r2):
assert r1.port == r2.port, "{} != {}".format(r1.port, r2.port) return r1.port == r2.port
def path(r1, r2): def path(r1, r2):
assert r1.path == r2.path, "{} != {}".format(r1.path, r2.path) return r1.path == r2.path
def query(r1, r2): def query(r1, r2):
assert r1.query == r2.query, "{} != {}".format(r1.query, r2.query) return r1.query == r2.query
def raw_body(r1, r2): def raw_body(r1, r2):
assert read_body(r1) == read_body(r2) return read_body(r1) == read_body(r2)
def body(r1, r2):
transformer = _get_transformer(r1)
r2_transformer = _get_transformer(r2)
if transformer != r2_transformer:
transformer = _identity
assert transformer(read_body(r1)) == transformer(read_body(r2))
def headers(r1, r2):
assert r1.headers == r2.headers, "{} != {}".format(r1.headers, r2.headers)
def _header_checker(value, header='Content-Type'): def _header_checker(value, header='Content-Type'):
@@ -68,12 +56,9 @@ def _transform_json(body):
_xml_header_checker = _header_checker('text/xml') _xml_header_checker = _header_checker('text/xml')
_xmlrpc_header_checker = _header_checker('xmlrpc', header='User-Agent') _xmlrpc_header_checker = _header_checker('xmlrpc', header='User-Agent')
_checker_transformer_pairs = ( _checker_transformer_pairs = (
(_header_checker('application/x-www-form-urlencoded'), (_header_checker('application/x-www-form-urlencoded'), urllib.parse.parse_qs),
lambda body: urllib.parse.parse_qs(body.decode('ascii'))), (_header_checker('application/json'), _transform_json),
(_header_checker('application/json'), (lambda request: _xml_header_checker(request) and _xmlrpc_header_checker(request), xmlrpc_client.loads),
_transform_json),
(lambda request: _xml_header_checker(request) and _xmlrpc_header_checker(request),
xmlrpc_client.loads),
) )
@@ -89,54 +74,28 @@ def _get_transformer(request):
return _identity return _identity
def requests_match(r1, r2, matchers): def body(r1, r2):
successes, failures = get_matchers_results(r1, r2, matchers) transformer = _get_transformer(r1)
if failures: r2_transformer = _get_transformer(r2)
if transformer != r2_transformer:
transformer = _identity
return transformer(read_body(r1)) == transformer(read_body(r2))
def headers(r1, r2):
return r1.headers == r2.headers
def _log_matches(r1, r2, matches):
differences = [m for m in matches if not m[0]]
if differences:
log.debug( log.debug(
"Requests {} and {} differ.\n" "Requests {} and {} differ according to "
"Failure details:\n" "the following matchers: {}".format(r1, r2, differences)
"{}".format(r1, r2, failures)
) )
return len(failures) == 0
def _evaluate_matcher(matcher_function, *args): def requests_match(r1, r2, matchers):
""" matches = [(m(r1, r2), m) for m in matchers]
Evaluate the result of a given matcher as a boolean with an assertion error message if any. _log_matches(r1, r2, matches)
It handles two types of matcher : return all(m[0] for m in matches)
- a matcher returning a boolean value.
- a matcher that only makes an assert, returning None or raises an assertion error.
"""
assertion_message = None
try:
match = matcher_function(*args)
match = True if match is None else match
except AssertionError as e:
match = False
assertion_message = str(e)
return match, assertion_message
def get_matchers_results(r1, r2, matchers):
"""
Get the comparison results of two requests as two list.
The first returned list represents the matchers names that passed.
The second list is the failed matchers as a string with failed assertion details if any.
"""
matches_success, matches_fails = [], []
for m in matchers:
matcher_name = m.__name__
match, assertion_message = _evaluate_matcher(m, r1, r2)
if match:
matches_success.append(matcher_name)
else:
assertion_message = get_assertion_message(assertion_message)
matches_fails.append((matcher_name, assertion_message))
return matches_success, matches_fails
def get_assertion_message(assertion_details):
"""
Get a detailed message about the failing matcher.
"""
return assertion_details

View File

@@ -68,7 +68,7 @@ def _migrate(data):
for item in data: for item in data:
req = item['request'] req = item['request']
res = item['response'] res = item['response']
uri = {k: req.pop(k) for k in PARTS} uri = dict((k, req.pop(k)) for k in PARTS)
req['uri'] = build_uri(**uri) req['uri'] = build_uri(**uri)
# convert headers to dict of lists # convert headers to dict of lists
headers = req['headers'] headers = req['headers']
@@ -100,7 +100,7 @@ def migrate_json(in_fp, out_fp):
def _list_of_tuples_to_dict(fs): def _list_of_tuples_to_dict(fs):
return {k: v for k, v in fs[0]} return dict((k, v) for k, v in fs[0])
def _already_migrated(data): def _already_migrated(data):
@@ -159,9 +159,9 @@ def main():
for (root, dirs, files) in os.walk(path) for (root, dirs, files) in os.walk(path)
for name in files) for name in files)
for file_path in files: for file_path in files:
migrated = try_migrate(file_path) migrated = try_migrate(file_path)
status = 'OK' if migrated else 'FAIL' status = 'OK' if migrated else 'FAIL'
sys.stderr.write("[{}] {}\n".format(status, file_path)) sys.stderr.write("[{}] {}\n".format(status, file_path))
sys.stderr.write("Done.\n") sys.stderr.write("Done.\n")

View File

@@ -6,29 +6,21 @@ from .compat import contextlib, mock
from .stubs import VCRHTTPConnection, VCRHTTPSConnection from .stubs import VCRHTTPConnection, VCRHTTPSConnection
from six.moves import http_client as httplib from six.moves import http_client as httplib
import logging
log = logging.getLogger(__name__)
# Save some of the original types for the purposes of unpatching # Save some of the original types for the purposes of unpatching
_HTTPConnection = httplib.HTTPConnection _HTTPConnection = httplib.HTTPConnection
_HTTPSConnection = httplib.HTTPSConnection _HTTPSConnection = httplib.HTTPSConnection
# Try to save the original types for boto3 # Try to save the original types for boto3
try: try:
from botocore.awsrequest import AWSHTTPSConnection, AWSHTTPConnection import botocore.vendored.requests.packages.urllib3.connectionpool as cpool
except ImportError: except ImportError: # pragma: no cover
try: pass
import botocore.vendored.requests.packages.urllib3.connectionpool as cpool
except ImportError: # pragma: no cover
pass
else:
_Boto3VerifiedHTTPSConnection = cpool.VerifiedHTTPSConnection
_cpoolBoto3HTTPConnection = cpool.HTTPConnection
_cpoolBoto3HTTPSConnection = cpool.HTTPSConnection
else: else:
_Boto3VerifiedHTTPSConnection = AWSHTTPSConnection _Boto3VerifiedHTTPSConnection = cpool.VerifiedHTTPSConnection
_cpoolBoto3HTTPConnection = AWSHTTPConnection _cpoolBoto3HTTPConnection = cpool.HTTPConnection
_cpoolBoto3HTTPSConnection = AWSHTTPSConnection _cpoolBoto3HTTPSConnection = cpool.HTTPSConnection
cpool = None cpool = None
# Try to save the original types for urllib3 # Try to save the original types for urllib3
@@ -52,6 +44,7 @@ else:
_cpoolHTTPConnection = cpool.HTTPConnection _cpoolHTTPConnection = cpool.HTTPConnection
_cpoolHTTPSConnection = cpool.HTTPSConnection _cpoolHTTPSConnection = cpool.HTTPSConnection
# Try to save the original types for httplib2 # Try to save the original types for httplib2
try: try:
import httplib2 import httplib2
@@ -62,6 +55,7 @@ else:
_HTTPSConnectionWithTimeout = httplib2.HTTPSConnectionWithTimeout _HTTPSConnectionWithTimeout = httplib2.HTTPSConnectionWithTimeout
_SCHEME_TO_CONNECTION = httplib2.SCHEME_TO_CONNECTION _SCHEME_TO_CONNECTION = httplib2.SCHEME_TO_CONNECTION
# Try to save the original types for boto # Try to save the original types for boto
try: try:
import boto.https_connection import boto.https_connection
@@ -70,6 +64,7 @@ except ImportError: # pragma: no cover
else: else:
_CertValidatingHTTPSConnection = boto.https_connection.CertValidatingHTTPSConnection _CertValidatingHTTPSConnection = boto.https_connection.CertValidatingHTTPSConnection
# Try to save the original types for Tornado # Try to save the original types for Tornado
try: try:
import tornado.simple_httpclient import tornado.simple_httpclient
@@ -79,6 +74,7 @@ else:
_SimpleAsyncHTTPClient_fetch_impl = \ _SimpleAsyncHTTPClient_fetch_impl = \
tornado.simple_httpclient.SimpleAsyncHTTPClient.fetch_impl tornado.simple_httpclient.SimpleAsyncHTTPClient.fetch_impl
try: try:
import tornado.curl_httpclient import tornado.curl_httpclient
except ImportError: # pragma: no cover except ImportError: # pragma: no cover
@@ -103,7 +99,6 @@ class CassettePatcherBuilder(object):
return self._build_patchers_from_mock_triples( return self._build_patchers_from_mock_triples(
function(self, *args, **kwargs) function(self, *args, **kwargs)
) )
return wrapped return wrapped
def __init__(self, cassette): def __init__(self, cassette):
@@ -189,26 +184,13 @@ class CassettePatcherBuilder(object):
return () return ()
return self._urllib3_patchers(cpool, requests_stubs) return self._urllib3_patchers(cpool, requests_stubs)
@_build_patchers_from_mock_triples_decorator
def _boto3(self): def _boto3(self):
try: try:
# botocore using awsrequest import botocore.vendored.requests.packages.urllib3.connectionpool as cpool
import botocore.awsrequest as cpool
except ImportError: # pragma: no cover except ImportError: # pragma: no cover
try: return ()
# botocore using vendored requests from .stubs import boto3_stubs
import botocore.vendored.requests.packages.urllib3.connectionpool as cpool return self._urllib3_patchers(cpool, boto3_stubs)
except ImportError: # pragma: no cover
pass
else:
from .stubs import boto3_stubs
yield self._urllib3_patchers(cpool, boto3_stubs)
else:
from .stubs import boto3_stubs
log.debug("Patching boto3 cpool with %s", cpool)
yield cpool.AWSHTTPConnectionPool, 'ConnectionCls', boto3_stubs.VCRRequestsHTTPConnection
yield cpool.AWSHTTPSConnectionPool, 'ConnectionCls', boto3_stubs.VCRRequestsHTTPSConnection
def _patched_get_conn(self, connection_pool_class, connection_class_getter): def _patched_get_conn(self, connection_pool_class, connection_class_getter):
get_conn = connection_pool_class._get_conn get_conn = connection_pool_class._get_conn
@@ -425,36 +407,22 @@ def reset_patchers():
yield mock.patch.object(cpool.HTTPSConnectionPool, 'ConnectionCls', _cpoolHTTPSConnection) yield mock.patch.object(cpool.HTTPSConnectionPool, 'ConnectionCls', _cpoolHTTPSConnection)
try: try:
# unpatch botocore with awsrequest import botocore.vendored.requests.packages.urllib3.connectionpool as cpool
import botocore.awsrequest as cpool
except ImportError: # pragma: no cover except ImportError: # pragma: no cover
try: pass
# unpatch botocore with vendored requests
import botocore.vendored.requests.packages.urllib3.connectionpool as cpool
except ImportError: # pragma: no cover
pass
else:
# unpatch requests v1.x
yield mock.patch.object(cpool, 'VerifiedHTTPSConnection', _Boto3VerifiedHTTPSConnection)
yield mock.patch.object(cpool, 'HTTPConnection', _cpoolBoto3HTTPConnection)
# unpatch requests v2.x
if hasattr(cpool.HTTPConnectionPool, 'ConnectionCls'):
yield mock.patch.object(cpool.HTTPConnectionPool, 'ConnectionCls',
_cpoolBoto3HTTPConnection)
yield mock.patch.object(cpool.HTTPSConnectionPool, 'ConnectionCls',
_cpoolBoto3HTTPSConnection)
if hasattr(cpool, 'HTTPSConnection'):
yield mock.patch.object(cpool, 'HTTPSConnection', _cpoolBoto3HTTPSConnection)
else: else:
if hasattr(cpool.AWSHTTPConnectionPool, 'ConnectionCls'): # unpatch requests v1.x
yield mock.patch.object(cpool.AWSHTTPConnectionPool, 'ConnectionCls', yield mock.patch.object(cpool, 'VerifiedHTTPSConnection', _Boto3VerifiedHTTPSConnection)
yield mock.patch.object(cpool, 'HTTPConnection', _cpoolBoto3HTTPConnection)
# unpatch requests v2.x
if hasattr(cpool.HTTPConnectionPool, 'ConnectionCls'):
yield mock.patch.object(cpool.HTTPConnectionPool, 'ConnectionCls',
_cpoolBoto3HTTPConnection) _cpoolBoto3HTTPConnection)
yield mock.patch.object(cpool.AWSHTTPSConnectionPool, 'ConnectionCls', yield mock.patch.object(cpool.HTTPSConnectionPool, 'ConnectionCls',
_cpoolBoto3HTTPSConnection) _cpoolBoto3HTTPSConnection)
if hasattr(cpool, 'AWSHTTPSConnection'): if hasattr(cpool, 'HTTPSConnection'):
yield mock.patch.object(cpool, 'AWSHTTPSConnection', _cpoolBoto3HTTPSConnection) yield mock.patch.object(cpool, 'HTTPSConnection', _cpoolBoto3HTTPSConnection)
try: try:
import httplib2 as cpool import httplib2 as cpool

View File

@@ -2,9 +2,6 @@ import warnings
from six import BytesIO, text_type from six import BytesIO, text_type
from six.moves.urllib.parse import urlparse, parse_qsl from six.moves.urllib.parse import urlparse, parse_qsl
from .util import CaseInsensitiveDict from .util import CaseInsensitiveDict
import logging
log = logging.getLogger(__name__)
class Request(object): class Request(object):
@@ -21,7 +18,6 @@ class Request(object):
else: else:
self.body = body self.body = body
self.headers = headers self.headers = headers
log.debug("Invoking Request %s", self.uri)
@property @property
def headers(self): def headers(self):
@@ -62,10 +58,7 @@ class Request(object):
parse_uri = urlparse(self.uri) parse_uri = urlparse(self.uri)
port = parse_uri.port port = parse_uri.port
if port is None: if port is None:
try: port = {'https': 443, 'http': 80}[parse_uri.scheme]
port = {'https': 443, 'http': 80}[parse_uri.scheme]
except KeyError:
pass
return port return port
@property @property
@@ -98,7 +91,7 @@ class Request(object):
'method': self.method, 'method': self.method,
'uri': self.uri, 'uri': self.uri,
'body': self.body, 'body': self.body,
'headers': {k: [v] for k, v in self.headers.items()}, 'headers': dict(((k, [v]) for k, v in self.headers.items())),
} }
@classmethod @classmethod
@@ -119,7 +112,7 @@ class HeadersDict(CaseInsensitiveDict):
In addition, some servers sometimes send the same header more than once, In addition, some servers sometimes send the same header more than once,
and httplib *can* deal with this situation. and httplib *can* deal with this situation.
Furthermore, I wanted to keep the request and response cassette format as Futhermore, I wanted to keep the request and response cassette format as
similar as possible. similar as possible.
For this reason, in cassettes I keep a dict with lists as keys, but once For this reason, in cassettes I keep a dict with lists as keys, but once

View File

@@ -25,5 +25,5 @@ def serialize(cassette_dict):
original.end, original.end,
original.args[-1] + error_message original.args[-1] + error_message
) )
except TypeError: # py3 except TypeError as original: # py3
raise TypeError(error_message) raise TypeError(error_message)

View File

@@ -18,7 +18,7 @@ log = logging.getLogger(__name__)
class VCRFakeSocket(object): class VCRFakeSocket(object):
""" """
A socket that doesn't do anything! A socket that doesn't do anything!
Used when playing back cassettes, when there Used when playing back casssettes, when there
is no actual open socket. is no actual open socket.
""" """
@@ -60,10 +60,9 @@ def serialize_headers(response):
class VCRHTTPResponse(HTTPResponse): class VCRHTTPResponse(HTTPResponse):
""" """
Stub response class that gets returned instead of a HTTPResponse Stub reponse class that gets returned instead of a HTTPResponse
""" """
def __init__(self, recorded_response): def __init__(self, recorded_response):
self.fp = None
self.recorded_response = recorded_response self.recorded_response = recorded_response
self.reason = recorded_response['status']['message'] self.reason = recorded_response['status']['message']
self.status = self.code = recorded_response['status']['code'] self.status = self.code = recorded_response['status']['code']
@@ -94,30 +93,9 @@ class VCRHTTPResponse(HTTPResponse):
def read(self, *args, **kwargs): def read(self, *args, **kwargs):
return self._content.read(*args, **kwargs) return self._content.read(*args, **kwargs)
def readall(self):
return self._content.readall()
def readinto(self, *args, **kwargs):
return self._content.readinto(*args, **kwargs)
def readline(self, *args, **kwargs): def readline(self, *args, **kwargs):
return self._content.readline(*args, **kwargs) return self._content.readline(*args, **kwargs)
def readlines(self, *args, **kwargs):
return self._content.readlines(*args, **kwargs)
def seekable(self):
return self._content.seekable()
def tell(self):
return self._content.tell()
def isatty(self):
return self._content.isatty()
def seek(self, *args, **kwargs):
return self._content.seek(*args, **kwargs)
def close(self): def close(self):
self._closed = True self._closed = True
return True return True
@@ -143,9 +121,6 @@ class VCRHTTPResponse(HTTPResponse):
else: else:
return default return default
def readable(self):
return self._content.readable()
class VCRConnection(object): class VCRConnection(object):
# A reference to the cassette that's currently being patched in # A reference to the cassette that's currently being patched in
@@ -161,16 +136,12 @@ class VCRConnection(object):
def _uri(self, url): def _uri(self, url):
"""Returns request absolute URI""" """Returns request absolute URI"""
if url and not url.startswith('/'):
# Then this must be a proxy request.
return url
uri = "{}://{}{}{}".format( uri = "{}://{}{}{}".format(
self._protocol, self._protocol,
self.real_connection.host, self.real_connection.host,
self._port_postfix(), self._port_postfix(),
url, url,
) )
log.debug("Absolute URI: %s", uri)
return uri return uri
def _url(self, uri): def _url(self, uri):
@@ -197,8 +168,6 @@ class VCRConnection(object):
# allows me to compare the entire length of the response to see if it # allows me to compare the entire length of the response to see if it
# exists in the cassette. # exists in the cassette.
self._sock = VCRFakeSocket()
def putrequest(self, method, url, *args, **kwargs): def putrequest(self, method, url, *args, **kwargs):
""" """
httplib gives you more than one way to do it. This is a way httplib gives you more than one way to do it. This is a way
@@ -256,8 +225,11 @@ class VCRConnection(object):
self._vcr_request self._vcr_request
): ):
raise CannotOverwriteExistingCassetteException( raise CannotOverwriteExistingCassetteException(
cassette=self.cassette, "No match for the request (%r) was found. "
failed_request=self._vcr_request "Can't overwrite existing cassette (%r) in "
"your current record mode (%r)."
% (self._vcr_request, self.cassette._path,
self.cassette.record_mode)
) )
# Otherwise, we should send the request, then get the response # Otherwise, we should send the request, then get the response
@@ -319,13 +291,11 @@ class VCRConnection(object):
with force_reset(): with force_reset():
return self.real_connection.connect(*args, **kwargs) return self.real_connection.connect(*args, **kwargs)
self._sock = VCRFakeSocket()
@property @property
def sock(self): def sock(self):
if self.real_connection.sock: if self.real_connection.sock:
return self.real_connection.sock return self.real_connection.sock
return self._sock return VCRFakeSocket()
@sock.setter @sock.setter
def sock(self, value): def sock(self, value):
@@ -343,8 +313,6 @@ class VCRConnection(object):
with force_reset(): with force_reset():
self.real_connection = self._baseclass(*args, **kwargs) self.real_connection = self._baseclass(*args, **kwargs)
self._sock = None
def __setattr__(self, name, value): def __setattr__(self, name, value):
""" """
We need to define this because any attributes that are set on the We need to define this because any attributes that are set on the

View File

@@ -3,130 +3,59 @@ from __future__ import absolute_import
import asyncio import asyncio
import functools import functools
import logging
import json import json
from aiohttp import ClientResponse, streams from aiohttp import ClientResponse
from multidict import CIMultiDict, CIMultiDictProxy
from yarl import URL from yarl import URL
from vcr.request import Request from vcr.request import Request
log = logging.getLogger(__name__)
class MockStream(asyncio.StreamReader, streams.AsyncStreamReaderMixin):
pass
class MockClientResponse(ClientResponse): class MockClientResponse(ClientResponse):
def __init__(self, method, url): # TODO: get encoding from header
super().__init__( @asyncio.coroutine
method=method, def json(self, *, encoding='utf-8', loads=json.loads): # NOQA: E999
url=url, return loads(self.content.decode(encoding))
writer=None,
continue100=None,
timer=None,
request_info=None,
traces=None,
loop=asyncio.get_event_loop(),
session=None,
)
async def json(self, *, encoding='utf-8', loads=json.loads, **kwargs): # NOQA: E999 @asyncio.coroutine
stripped = self._body.strip() def text(self, encoding='utf-8'):
if not stripped: return self.content.decode(encoding)
return None
return loads(stripped.decode(encoding)) @asyncio.coroutine
def read(self):
async def text(self, encoding='utf-8', errors='strict'): return self.content
return self._body.decode(encoding, errors=errors)
async def read(self):
return self._body
@asyncio.coroutine
def release(self): def release(self):
pass pass
@property
def content(self):
s = MockStream()
s.feed_data(self._body)
s.feed_eof()
return s
def build_response(vcr_request, vcr_response, history):
response = MockClientResponse(vcr_request.method, URL(vcr_response.get('url')))
response.status = vcr_response['status']['code']
response._body = vcr_response['body'].get('string', b'')
response.reason = vcr_response['status']['message']
response._headers = CIMultiDictProxy(CIMultiDict(vcr_response['headers']))
response._history = tuple(history)
response.close()
return response
def play_responses(cassette, vcr_request):
history = []
vcr_response = cassette.play_response(vcr_request)
response = build_response(vcr_request, vcr_response, history)
while cassette.can_play_response_for(vcr_request):
history.append(response)
vcr_response = cassette.play_response(vcr_request)
response = build_response(vcr_request, vcr_response, history)
return response
async def record_response(cassette, vcr_request, response, past=False):
body = {} if past else {'string': (await response.read())}
headers = {str(key): value for key, value in response.headers.items()}
vcr_response = {
'status': {
'code': response.status,
'message': response.reason,
},
'headers': headers,
'body': body, # NOQA: E999
'url': str(response.url),
}
cassette.append(vcr_request, vcr_response)
async def record_responses(cassette, vcr_request, response):
for past_response in response.history:
await record_response(cassette, vcr_request, past_response, past=True)
await record_response(cassette, vcr_request, response)
def vcr_request(cassette, real_request): def vcr_request(cassette, real_request):
@functools.wraps(real_request) @functools.wraps(real_request)
async def new_request(self, method, url, **kwargs): @asyncio.coroutine
def new_request(self, method, url, **kwargs):
headers = kwargs.get('headers') headers = kwargs.get('headers')
auth = kwargs.get('auth')
headers = self._prepare_headers(headers) headers = self._prepare_headers(headers)
data = kwargs.get('data', kwargs.get('json')) data = kwargs.get('data')
params = kwargs.get('params') params = kwargs.get('params')
if auth is not None:
headers['AUTHORIZATION'] = auth.encode()
request_url = URL(url)
if params: if params:
for k, v in params.items(): for k, v in params.items():
params[k] = str(v) params[k] = str(v)
request_url = URL(url).with_query(params)
request_url = URL(url).with_query(params)
vcr_request = Request(method, str(request_url), data, headers) vcr_request = Request(method, str(request_url), data, headers)
if cassette.can_play_response_for(vcr_request): if cassette.can_play_response_for(vcr_request):
return play_responses(cassette, vcr_request) vcr_response = cassette.play_response(vcr_request)
response = MockClientResponse(method, URL(vcr_response.get('url')))
response.status = vcr_response['status']['code']
response.content = vcr_response['body']['string']
response.reason = vcr_response['status']['message']
response.headers = vcr_response['headers']
response.close()
return response
if cassette.write_protected and cassette.filter_request(vcr_request): if cassette.write_protected and cassette.filter_request(vcr_request):
response = MockClientResponse(method, URL(url)) response = MockClientResponse(method, URL(url))
@@ -134,14 +63,23 @@ def vcr_request(cassette, real_request):
msg = ("No match for the request {!r} was found. Can't overwrite " msg = ("No match for the request {!r} was found. Can't overwrite "
"existing cassette {!r} in your current record mode {!r}.") "existing cassette {!r} in your current record mode {!r}.")
msg = msg.format(vcr_request, cassette._path, cassette.record_mode) msg = msg.format(vcr_request, cassette._path, cassette.record_mode)
response._body = msg.encode() response.content = msg.encode()
response.close() response.close()
return response return response
log.info('%s not in cassette, sending to real server', vcr_request) response = yield from real_request(self, method, url, **kwargs) # NOQA: E999
vcr_response = {
'status': {
'code': response.status,
'message': response.reason,
},
'headers': dict(response.headers),
'body': {'string': (yield from response.read())}, # NOQA: E999
'url': response.url,
}
cassette.append(vcr_request, vcr_response)
response = await real_request(self, method, url, **kwargs) # NOQA: E999
await record_responses(cassette, vcr_request, response)
return response return response
return new_request return new_request

View File

@@ -1,22 +1,11 @@
"""Stubs for boto3""" '''Stubs for boto3'''
import six
try:
# boto using awsrequest
from botocore.awsrequest import AWSHTTPConnection as HTTPConnection
from botocore.awsrequest import AWSHTTPSConnection as VerifiedHTTPSConnection
except ImportError: # pragma: nocover
# boto using vendored requests
# urllib3 defines its own HTTPConnection classes, which boto3 goes ahead and assumes
# you're using. It includes some polyfills for newer features missing in older pythons.
try:
from urllib3.connectionpool import HTTPConnection, VerifiedHTTPSConnection
except ImportError: # pragma: nocover
from requests.packages.urllib3.connectionpool import HTTPConnection, VerifiedHTTPSConnection
from botocore.vendored.requests.packages.urllib3.connectionpool import HTTPConnection, VerifiedHTTPSConnection
from ..stubs import VCRHTTPConnection, VCRHTTPSConnection from ..stubs import VCRHTTPConnection, VCRHTTPSConnection
# urllib3 defines its own HTTPConnection classes, which boto3 goes ahead and assumes
# you're using. It includes some polyfills for newer features missing in older pythons.
class VCRRequestsHTTPConnection(VCRHTTPConnection, HTTPConnection): class VCRRequestsHTTPConnection(VCRHTTPConnection, HTTPConnection):
_baseclass = HTTPConnection _baseclass = HTTPConnection
@@ -24,20 +13,3 @@ class VCRRequestsHTTPConnection(VCRHTTPConnection, HTTPConnection):
class VCRRequestsHTTPSConnection(VCRHTTPSConnection, VerifiedHTTPSConnection): class VCRRequestsHTTPSConnection(VCRHTTPSConnection, VerifiedHTTPSConnection):
_baseclass = VerifiedHTTPSConnection _baseclass = VerifiedHTTPSConnection
def __init__(self, *args, **kwargs):
if six.PY3:
kwargs.pop('strict', None) # apparently this is gone in py3
# need to temporarily reset here because the real connection
# inherits from the thing that we are mocking out. Take out
# the reset if you want to see what I mean :)
from vcr.patch import force_reset
with force_reset():
self.real_connection = self._baseclass(*args, **kwargs)
# Make sure to set those attributes as it seems `AWSHTTPConnection` does not
# set them, making the connection to fail !
self.real_connection.assert_hostname = kwargs.get("assert_hostname", False)
self.real_connection.cert_reqs = kwargs.get("cert_reqs", 'CERT_NONE')
self._sock = None

View File

@@ -40,7 +40,6 @@ class VCRHTTPSConnectionWithTimeout(VCRHTTPSConnection,
'timeout', 'timeout',
'source_address', 'source_address',
'ca_certs', 'ca_certs',
'disable_ssl_certificate_validation',
} }
unknown_keys = set(kwargs.keys()) - safe_keys unknown_keys = set(kwargs.keys()) - safe_keys
safe_kwargs = kwargs.copy() safe_kwargs = kwargs.copy()

View File

@@ -75,8 +75,10 @@ def vcr_fetch_impl(cassette, real_fetch_impl):
request, request,
599, 599,
error=CannotOverwriteExistingCassetteException( error=CannotOverwriteExistingCassetteException(
cassette=cassette, "No match for the request (%r) was found. "
failed_request=vcr_request "Can't overwrite existing cassette (%r) in "
"your current record mode (%r)."
% (vcr_request, cassette._path, cassette.record_mode)
), ),
request_time=self.io_loop.time() - request.start_time, request_time=self.io_loop.time() - request.start_time,
) )

View File

@@ -1,17 +1,13 @@
import collections
import types import types
try:
from collections.abc import Mapping, MutableMapping
except ImportError:
from collections import Mapping, MutableMapping
# Shamelessly stolen from https://github.com/kennethreitz/requests/blob/master/requests/structures.py # Shamelessly stolen from https://github.com/kennethreitz/requests/blob/master/requests/structures.py
class CaseInsensitiveDict(MutableMapping): class CaseInsensitiveDict(collections.MutableMapping):
""" """
A case-insensitive ``dict``-like object. A case-insensitive ``dict``-like object.
Implements all methods and operations of Implements all methods and operations of
``collections.abc.MutableMapping`` as well as dict's ``copy``. Also ``collections.MutableMapping`` as well as dict's ``copy``. Also
provides ``lower_items``. provides ``lower_items``.
All keys are expected to be strings. The structure remembers the All keys are expected to be strings. The structure remembers the
case of the last key to be set, and ``iter(instance)``, case of the last key to be set, and ``iter(instance)``,
@@ -61,7 +57,7 @@ class CaseInsensitiveDict(MutableMapping):
) )
def __eq__(self, other): def __eq__(self, other):
if isinstance(other, Mapping): if isinstance(other, collections.Mapping):
other = CaseInsensitiveDict(other) other = CaseInsensitiveDict(other)
else: else:
return NotImplemented return NotImplemented
@@ -118,10 +114,10 @@ def auto_decorate(
) )
def __new__(cls, name, bases, attributes_dict): def __new__(cls, name, bases, attributes_dict):
new_attributes_dict = { new_attributes_dict = dict(
attribute: maybe_decorate(attribute, value) (attribute, maybe_decorate(attribute, value))
for attribute, value in attributes_dict.items() for attribute, value in attributes_dict.items()
} )
return super(DecorateAll, cls).__new__( return super(DecorateAll, cls).__new__(
cls, name, bases, new_attributes_dict cls, name, bases, new_attributes_dict
) )