1
0
mirror of https://github.com/kevin1024/vcrpy.git synced 2025-12-08 16:53:23 +00:00

Merge branch 'master' into unicode-match-on-body

This commit is contained in:
Martin Valgur
2019-07-01 22:09:57 +03:00
committed by GitHub
36 changed files with 804 additions and 201 deletions

View File

@@ -1,5 +1,4 @@
language: python
sudo: false
before_install: openssl version
env:
global:
@@ -7,21 +6,38 @@ env:
- secure: LBSEg/gMj4u4Hrpo3zs6Y/1mTpd2RtcN49mZIFgTdbJ9IhpiNPqcEt647Lz94F9Eses2x2WbNuKqZKZZReY7QLbEzU1m0nN5jlaKrjcG5NR5clNABfFFyhgc0jBikyS4abAG8jc2efeaTrFuQwdoF4sE8YiVrkiVj2X5Xoi6sBk=
matrix:
- TOX_SUFFIX="flakes"
- TOX_SUFFIX="requests27"
- TOX_SUFFIX="requests"
- TOX_SUFFIX="httplib2"
- TOX_SUFFIX="boto3"
- TOX_SUFFIX="urllib3121"
- TOX_SUFFIX="tornado4"
- TOX_SUFFIX="aiohttp"
matrix:
include:
- env: TOX_SUFFIX="flakes"
python: 3.7
dist: xenial
- env: TOX_SUFFIX="requests"
python: 3.7
dist: xenial
- env: TOX_SUFFIX="httplib2"
python: 3.7
dist: xenial
- env: TOX_SUFFIX="urllib3121"
python: 3.7
dist: xenial
- env: TOX_SUFFIX="tornado4"
python: 3.7
dist: xenial
- env: TOX_SUFFIX="aiohttp"
python: 3.7
dist: xenial
allow_failures:
- env: TOX_SUFFIX="boto3"
- env: TOX_SUFFIX="aiohttp"
python: "pypy3.5-5.9.0"
exclude:
# Only run flakes on a single Python 2.x and a single 3.x
- env: TOX_SUFFIX="flakes"
python: 3.4
- env: TOX_SUFFIX="flakes"
python: 3.5
- env: TOX_SUFFIX="flakes"

View File

@@ -1,4 +1,4 @@
|PyPI| |Python versions| |Build Status| |Waffle Ready| |Gitter|
|PyPI| |Python versions| |Build Status| |Gitter|
VCR.py
======
@@ -41,19 +41,6 @@ VCR.py will detect the absence of a cassette file and once again record
all HTTP interactions, which will update them to correspond to the new
API.
Support
-------
VCR.py works great with the following HTTP clients:
- requests
- aiohttp
- urllib3
- tornado
- urllib2
- boto3
License
=======
@@ -61,13 +48,11 @@ This library uses the MIT license. See `LICENSE.txt <LICENSE.txt>`__ for
more details
.. |PyPI| image:: https://img.shields.io/pypi/v/vcrpy.svg
:target: https://pypi.python.org/pypi/vcrpy-unittest
.. |Python versions| image:: https://img.shields.io/pypi/pyversions/vcrpy-unittest.svg
:target: https://pypi.python.org/pypi/vcrpy-unittest
.. |Build Status| image:: https://secure.travis-ci.org/kevin1024/vcrpy.png?branch=master
:target: https://pypi.python.org/pypi/vcrpy
.. |Python versions| image:: https://img.shields.io/pypi/pyversions/vcrpy.svg
:target: https://pypi.python.org/pypi/vcrpy
.. |Build Status| image:: https://secure.travis-ci.org/kevin1024/vcrpy.svg?branch=master
:target: http://travis-ci.org/kevin1024/vcrpy
.. |Waffle Ready| image:: https://badge.waffle.io/kevin1024/vcrpy.png?label=ready&title=waffle
:target: https://waffle.io/kevin1024/vcrpy
.. |Gitter| image:: https://badges.gitter.im/Join%20Chat.svg
:alt: Join the chat at https://gitter.im/kevin1024/vcrpy
:target: https://gitter.im/kevin1024/vcrpy?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge

View File

@@ -97,8 +97,12 @@ Create your own method with the following signature
def my_matcher(r1, r2):
Your method receives the two requests and must return ``True`` if they
match, ``False`` if they don't.
Your method receives the two requests and can return :
- Use an ``assert`` statement in the matcher, then we have ``None`` if they match, raise an `AssertionError`` if they don't.
- A boolean, ``True`` if they match, ``False`` if they don't.
Note : You should use an ``assert`` statement in order to have feedback when a matcher is failing.
Finally, register your method with VCR to use your new request matcher.
@@ -107,7 +111,7 @@ Finally, register your method with VCR to use your new request matcher.
import vcr
def jurassic_matcher(r1, r2):
return r1.uri == r2.uri and 'JURASSIC PARK' in r1.body
assert r1.uri == r2.uri and 'JURASSIC PARK' in r1.body
my_vcr = vcr.VCR()
my_vcr.register_matcher('jurassic', jurassic_matcher)
@@ -221,24 +225,25 @@ Custom Request filtering
~~~~~~~~~~~~~~~~~~~~~~~~
If none of these covers your request filtering needs, you can register a
callback that will manipulate the HTTP request before adding it to the
cassette. Use the ``before_record_request`` configuration option to so this.
Here is an example that will never record requests to the /login
endpoint.
callback with the ``before_record_request`` configuration option to
manipulate the HTTP request before adding it to the cassette, or return
``None`` to ignore it entirely. Here is an example that will never record
requests to the ``'/login'`` path:
.. code:: python
def before_record_cb(request):
if request.path != '/login':
return request
if request.path == '/login':
return None
return request
my_vcr = vcr.VCR(
before_record_request = before_record_cb,
before_record_request=before_record_cb,
)
with my_vcr.use_cassette('test.yml'):
# your http code here
You can also mutate the response using this callback. For example, you
You can also mutate the request using this callback. For example, you
could remove all query parameters from any requests to the ``'/login'``
path.
@@ -246,7 +251,7 @@ path.
def scrub_login_request(request):
if request.path == '/login':
request.uri, _ = urllib.splitquery(response.uri)
request.uri, _ = urllib.splitquery(request.uri)
return request
my_vcr = vcr.VCR(
@@ -258,9 +263,12 @@ path.
Custom Response Filtering
~~~~~~~~~~~~~~~~~~~~~~~~~
VCR.py also suports response filtering with the
``before_record_response`` keyword argument. It's usage is similar to
that of ``before_record``:
You can also do response filtering with the
``before_record_response`` configuration option. Its usage is
similar to the above ``before_record_request`` - you can
mutate the response, or return ``None`` to avoid recording
the request and response altogether. For example to hide
sensitive data from the request body:
.. code:: python
@@ -302,8 +310,8 @@ in a few ways:
or 0.0.0.0.
- Set the ``ignore_hosts`` configuration option to a list of hosts to
ignore
- Add a ``before_record`` callback that returns None for requests you
want to ignore
- Add a ``before_record_request`` or ``before_record_response`` callback
that returns ``None`` for requests you want to ignore (see above).
Requests that are ignored by VCR will not be saved in a cassette, nor
played back from a cassette. VCR will completely ignore those requests

View File

@@ -1,5 +1,15 @@
Changelog
---------
- 2.0.2 (UNRELEASED) - Drop support to python 3.4
Fix build problems on requests tests (thanks to @dunossauro)
- 2.0.1 - Fix bug when using vcrpy with python 3.4
- 2.0.0 - Support python 3.7 (fix httplib2 and urllib2, thanks @felixonmars)
[#356] Fixes `before_record_response` so the original response isn't changed (thanks @kgraves)
Fix requests stub when using proxy (thanks @samuelfekete @daneoshiga)
(only for aiohttp stub) Drop support to python 3.4 asyncio.coroutine (aiohttp doesn't support python it anymore)
Fix aiohttp stub to work with aiohttp client (thanks @stj)
Fix aiohttp stub to accept content type passed
Improve docs (thanks @adamchainz)
- 1.13.0 - Fix support to latest aiohttp version (3.3.2). Fix content-type bug in aiohttp stub. Save URL with query params properly when using aiohttp.
- 1.12.0 - Fix support to latest aiohttp version (3.2.1), Adapted setup to PEP508, Support binary responses on aiohttp, Dropped support for EOL python versions (2.6 and 3.3)
- 1.11.1 Fix compatibility with newest requests and urllib3 releases

View File

@@ -11,7 +11,10 @@ yourself using `py.test <http://pytest.org/>`__ and
all environments VCR.py supports. The test suite is pretty big and slow,
but you can tell tox to only run specific tests like this::
tox -e py27requests -- -v -k "'test_status_code or test_gzip'"
tox -e {pyNN}-{HTTP_LIBRARY} -- <pytest flags passed through>
tox -e py27-requests -- -v -k "'test_status_code or test_gzip'"
tox -e py37-requests -- -v --last-failed
This will run only tests that look like ``test_status_code`` or
``test_gzip`` in the test suite, and only in the python 2.7 environment
@@ -23,3 +26,24 @@ documentation <https://boto.readthedocs.io/en/latest/getting_started.html>`__
for how to set this up. I have marked the boto tests as optional in
Travis so you don't have to worry about them failing if you submit a
pull request.
Troubleshooting on MacOSX
-------------------------
If you have this kind of error when running tox :
.. code:: python
__main__.ConfigurationError: Curl is configured to use SSL, but we have
not been able to determine which SSL backend it is using. Please see PycURL documentation for how to specify the SSL backend manually.
Then you need to define some environment variables:
.. code:: bash
export PYCURL_SSL_LIBRARY=openssl
export LDFLAGS=-L/usr/local/opt/openssl/lib
export CPPFLAGS=-I/usr/local/opt/openssl/include
Reference : `stackoverflow issue <https://stackoverflow.com/questions/51019622/curl-is-configured-to-use-ssl-but-we-have-not-been-able-to-determine-which-ssl>`__

View File

@@ -9,18 +9,20 @@ with pip::
Compatibility
-------------
VCR.py supports Python 2.7 and 3.4+, and
VCR.py supports Python 2.7 and 3.5+, and
`pypy <http://pypy.org>`__.
The following http libraries are supported:
The following HTTP libraries are supported:
- urllib2
- urllib3
- http.client (python3)
- requests (both 1.x and 2.x versions)
- httplib2
- boto
- Tornado's AsyncHTTPClient
- ``aiohttp``
- ``boto``
- ``boto3``
- ``http.client``
- ``httplib2``
- ``requests`` (both 1.x and 2.x versions)
- ``tornado.httpclient``
- ``urllib2``
- ``urllib3``
Speed
-----

View File

@@ -11,7 +11,7 @@ Usage
assert 'Example domains' in response
Run this test once, and VCR.py will record the HTTP request to
``fixtures/vcr_cassettes/synopsis.yml``. Run it again, and VCR.py will
``fixtures/vcr_cassettes/synopsis.yaml``. Run it again, and VCR.py will
replay the response from iana.org when the http request is made. This
test is now fast (no real HTTP requests are made anymore), deterministic
(the test will continue to pass, even if you are offline, or iana.org
@@ -95,3 +95,9 @@ Unittest Integration
While it's possible to use the context manager or decorator forms with unittest,
there's also a ``VCRTestCase`` provided separately by `vcrpy-unittest
<https://github.com/agriffis/vcrpy-unittest>`__.
Pytest Integration
------------------
A Pytest plugin is available here : `pytest-vcr
<https://github.com/ktosiek/pytest-vcr>`__.

View File

@@ -1,3 +1,7 @@
#!/bin/bash
# https://blog.ionelmc.ro/2015/04/14/tox-tricks-and-patterns/#when-it-inevitably-leads-to-shell-scripts
# If you are getting an INVOCATION ERROR for this script then there is
# a good chance you are running on Windows.
# You can and should use WSL for running tox on Windows when it calls bash scripts.
REQUESTS_CA_BUNDLE=`python -m pytest_httpbin.certs` py.test $*

View File

@@ -28,7 +28,7 @@ install_requires = [
'six>=1.5',
'contextlib2; python_version=="2.7"',
'mock; python_version=="2.7"',
'yarl; python_version>="3.4"',
'yarl; python_version>"3.5"',
]
excluded_packages = ["tests*"]
@@ -37,7 +37,7 @@ if sys.version_info[0] == 2:
setup(
name='vcrpy',
version='1.13.0',
version='2.0.1',
description=(
"Automatically mock your HTTP interactions to simplify and "
"speed up testing"
@@ -47,7 +47,7 @@ setup(
author_email='me@kevinmccarthy.org',
url='https://github.com/kevin1024/vcrpy',
packages=find_packages(exclude=excluded_packages),
python_requires='>=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*',
python_requires='>=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*',
install_requires=install_requires,
license='MIT',
tests_require=['pytest', 'mock', 'pytest-httpbin'],
@@ -59,9 +59,9 @@ setup(
'Programming Language :: Python :: 2',
'Programming Language :: Python :: 2.7',
'Programming Language :: Python :: 3',
'Programming Language :: Python :: 3.4',
'Programming Language :: Python :: 3.5',
'Programming Language :: Python :: 3.6',
'Programming Language :: Python :: 3.7',
'Programming Language :: Python :: Implementation :: CPython',
'Programming Language :: Python :: Implementation :: PyPy',
'Topic :: Software Development :: Testing',

View File

@@ -2,23 +2,42 @@
import asyncio
import aiohttp
from aiohttp.test_utils import TestClient
@asyncio.coroutine
def aiohttp_request(loop, method, url, output='text', encoding='utf-8', content_type=None, **kwargs):
async def aiohttp_request(loop, method, url, output='text', encoding='utf-8', content_type=None, **kwargs):
session = aiohttp.ClientSession(loop=loop)
response_ctx = session.request(method, url, **kwargs)
response = yield from response_ctx.__aenter__()
response = await response_ctx.__aenter__()
if output == 'text':
content = yield from response.text()
content = await response.text()
elif output == 'json':
content_type = content_type or 'application/json'
content = yield from response.json(encoding=encoding, content_type=content_type)
content = await response.json(encoding=encoding, content_type=content_type)
elif output == 'raw':
content = yield from response.read()
content = await response.read()
elif output == 'stream':
content = await response.content.read()
response_ctx._resp.close()
yield from session.close()
await session.close()
return response, content
def aiohttp_app():
async def hello(request):
return aiohttp.web.Response(text='hello')
async def json(request):
return aiohttp.web.json_response({})
async def json_empty_body(request):
return aiohttp.web.json_response()
app = aiohttp.web.Application()
app.router.add_get('/', hello)
app.router.add_get('/json', json)
app.router.add_get('/json/empty', json_empty_body)
return app

View File

@@ -5,7 +5,7 @@ asyncio = pytest.importorskip("asyncio")
aiohttp = pytest.importorskip("aiohttp")
import vcr # noqa: E402
from .aiohttp_utils import aiohttp_request # noqa: E402
from .aiohttp_utils import aiohttp_app, aiohttp_request # noqa: E402
def run_in_loop(fn):
@@ -93,6 +93,18 @@ def test_binary(tmpdir, scheme):
assert cassette.play_count == 1
def test_stream(tmpdir, scheme):
url = scheme + '://httpbin.org/get'
with vcr.use_cassette(str(tmpdir.join('stream.yaml'))):
resp, body = get(url, output='raw') # Do not use stream here, as the stream is exhausted by vcr
with vcr.use_cassette(str(tmpdir.join('stream.yaml'))) as cassette:
cassette_resp, cassette_body = get(url, output='stream')
assert cassette_body == body
assert cassette.play_count == 1
def test_post(tmpdir, scheme):
data = {'key1': 'value1', 'key2': 'value2'}
url = scheme + '://httpbin.org/post'
@@ -154,3 +166,51 @@ def test_params_on_url(tmpdir, scheme):
assert request.url == url
assert cassette_response_json == response_json
assert cassette.play_count == 1
def test_aiohttp_test_client(aiohttp_client, tmpdir):
loop = asyncio.get_event_loop()
app = aiohttp_app()
url = '/'
client = loop.run_until_complete(aiohttp_client(app))
with vcr.use_cassette(str(tmpdir.join('get.yaml'))):
response = loop.run_until_complete(client.get(url))
assert response.status == 200
response_text = loop.run_until_complete(response.text())
assert response_text == 'hello'
response_text = loop.run_until_complete(response.text(errors='replace'))
assert response_text == 'hello'
with vcr.use_cassette(str(tmpdir.join('get.yaml'))) as cassette:
response = loop.run_until_complete(client.get(url))
request = cassette.requests[0]
assert request.url == str(client.make_url(url))
response_text = loop.run_until_complete(response.text())
assert response_text == 'hello'
assert cassette.play_count == 1
def test_aiohttp_test_client_json(aiohttp_client, tmpdir):
loop = asyncio.get_event_loop()
app = aiohttp_app()
url = '/json/empty'
client = loop.run_until_complete(aiohttp_client(app))
with vcr.use_cassette(str(tmpdir.join('get.yaml'))):
response = loop.run_until_complete(client.get(url))
assert response.status == 200
response_json = loop.run_until_complete(response.json())
assert response_json is None
with vcr.use_cassette(str(tmpdir.join('get.yaml'))) as cassette:
response = loop.run_until_complete(client.get(url))
request = cassette.requests[0]
assert request.url == str(client.make_url(url))
response_json = loop.run_until_complete(response.json())
assert response_json is None
assert cassette.play_count == 1

View File

@@ -1,12 +1,12 @@
# -*- coding: utf-8 -*-
'''Integration tests with httplib2'''
# External imports
import sys
from six.moves.urllib_parse import urlencode
import pytest
import pytest_httpbin.certs
# Internal imports
import vcr
from assertions import assert_cassette_has_one_response
@@ -19,7 +19,12 @@ def http():
Returns an httplib2 HTTP instance
with the certificate replaced by the httpbin one.
"""
return httplib2.Http(ca_certs=pytest_httpbin.certs.where())
kwargs = {
'ca_certs': pytest_httpbin.certs.where()
}
if sys.version_info[:2] in [(2, 7), (3, 7)]:
kwargs['disable_ssl_certificate_validation'] = True
return httplib2.Http(**kwargs)
def test_response_code(tmpdir, httpbin_both):

View File

@@ -0,0 +1,60 @@
# -*- coding: utf-8 -*-
'''Test using a proxy.'''
# External imports
import multiprocessing
import pytest
from six.moves import socketserver, SimpleHTTPServer
from six.moves.urllib.request import urlopen
# Internal imports
import vcr
# Conditional imports
requests = pytest.importorskip("requests")
class Proxy(SimpleHTTPServer.SimpleHTTPRequestHandler):
'''
Simple proxy server.
(Inspired by: http://effbot.org/librarybook/simplehttpserver.htm).
'''
def do_GET(self):
upstream_response = urlopen(self.path)
try:
status = upstream_response.status
headers = upstream_response.headers.items()
except AttributeError:
# In Python 2 the response is an addinfourl instance.
status = upstream_response.code
headers = upstream_response.info().items()
self.send_response(status, upstream_response.msg)
for header in headers:
self.send_header(*header)
self.end_headers()
self.copyfile(upstream_response, self.wfile)
@pytest.yield_fixture(scope='session')
def proxy_server():
httpd = socketserver.ThreadingTCPServer(('', 0), Proxy)
proxy_process = multiprocessing.Process(
target=httpd.serve_forever,
)
proxy_process.start()
yield 'http://{}:{}'.format(*httpd.server_address)
proxy_process.terminate()
def test_use_proxy(tmpdir, httpbin, proxy_server):
'''Ensure that it works with a proxy.'''
with vcr.use_cassette(str(tmpdir.join('proxy.yaml'))):
response = requests.get(httpbin.url, proxies={'http': proxy_server})
with vcr.use_cassette(str(tmpdir.join('proxy.yaml'))) as cassette:
cassette_response = requests.get(httpbin.url, proxies={'http': proxy_server})
assert cassette_response.headers == response.headers
assert cassette.play_count == 1

View File

@@ -116,10 +116,10 @@ def test_post_chunked_binary(tmpdir, httpbin):
assert req1 == req2
@pytest.mark.xskip('sys.version_info >= (3, 6)', strict=True, raises=ConnectionError)
@pytest.mark.xskip((3, 5) < sys.version_info < (3, 6) and
platform.python_implementation() == 'CPython',
reason='Fails on CPython 3.5')
@pytest.mark.skipif('sys.version_info >= (3, 6)', strict=True, raises=ConnectionError)
@pytest.mark.skipif((3, 5) < sys.version_info < (3, 6) and
platform.python_implementation() == 'CPython',
reason='Fails on CPython 3.5')
def test_post_chunked_binary_secure(tmpdir, httpbin_secure):
'''Ensure that we can send chunked binary without breaking while trying to concatenate bytes with str.'''
data1 = iter([b'data', b'to', b'send'])

View File

@@ -1,6 +1,7 @@
# -*- coding: utf-8 -*-
'''Integration tests with urllib2'''
import ssl
from six.moves.urllib.request import urlopen
from six.moves.urllib_parse import urlencode
import pytest_httpbin.certs
@@ -12,7 +13,9 @@ from assertions import assert_cassette_has_one_response
def urlopen_with_cafile(*args, **kwargs):
kwargs['cafile'] = pytest_httpbin.certs.where()
context = ssl.create_default_context(cafile=pytest_httpbin.certs.where())
context.check_hostname = False
kwargs['context'] = context
try:
return urlopen(*args, **kwargs)
except TypeError:

View File

@@ -20,7 +20,7 @@ def verify_pool_mgr():
@pytest.fixture(scope='module')
def pool_mgr():
return urllib3.PoolManager()
return urllib3.PoolManager(cert_reqs='CERT_NONE')
def test_status_code(httpbin_both, tmpdir, verify_pool_mgr):

View File

@@ -59,7 +59,7 @@ def test_flickr_multipart_upload(httpbin, tmpdir):
def test_flickr_should_respond_with_200(tmpdir):
testfile = str(tmpdir.join('flickr.yml'))
with vcr.use_cassette(testfile):
r = requests.post("http://api.flickr.com/services/upload")
r = requests.post("https://api.flickr.com/services/upload", verify=False)
assert r.status_code == 200

View File

@@ -133,6 +133,17 @@ def test_cassette_all_played():
assert a.all_played
@mock.patch('vcr.cassette.requests_match', _mock_requests_match)
def test_cassette_rewound():
a = Cassette('test')
a.append('foo', 'bar')
a.play_response('foo')
assert a.all_played
a.rewind()
assert not a.all_played
def test_before_record_response():
before_record_response = mock.Mock(return_value='mutated')
cassette = Cassette('test', before_record_response=before_record_response)
@@ -306,3 +317,51 @@ def test_use_as_decorator_on_generator():
yield 2
assert list(test_function()) == [1, 2]
@mock.patch("vcr.cassette.get_matchers_results")
def test_find_requests_with_most_matches_one_similar_request(mock_get_matchers_results):
mock_get_matchers_results.side_effect = [
(["method"], [("path", "failed : path"), ("query", "failed : query")]),
(["method", "path"], [("query", "failed : query")]),
([], [("method", "failed : method"), ("path", "failed : path"), ("query", "failed : query")]),
]
cassette = Cassette("test")
for request in range(1, 4):
cassette.append(request, 'response')
result = cassette.find_requests_with_most_matches("fake request")
assert result == [(2, ["method", "path"], [("query", "failed : query")])]
@mock.patch("vcr.cassette.get_matchers_results")
def test_find_requests_with_most_matches_no_similar_requests(mock_get_matchers_results):
mock_get_matchers_results.side_effect = [
([], [("path", "failed : path"), ("query", "failed : query")]),
([], [("path", "failed : path"), ("query", "failed : query")]),
([], [("path", "failed : path"), ("query", "failed : query")]),
]
cassette = Cassette("test")
for request in range(1, 4):
cassette.append(request, 'response')
result = cassette.find_requests_with_most_matches("fake request")
assert result == []
@mock.patch("vcr.cassette.get_matchers_results")
def test_find_requests_with_most_matches_many_similar_requests(mock_get_matchers_results):
mock_get_matchers_results.side_effect = [
(["method", "path"], [("query", "failed : query")]),
(["method"], [("path", "failed : path"), ("query", "failed : query")]),
(["method", "path"], [("query", "failed : query")]),
]
cassette = Cassette("test")
for request in range(1, 4):
cassette.append(request, 'response')
result = cassette.find_requests_with_most_matches("fake request")
assert result == [
(1, ["method", "path"], [("query", "failed : query")]),
(3, ["method", "path"], [("query", "failed : query")])
]

View File

@@ -1,4 +1,5 @@
import itertools
from vcr.compat import mock
import pytest
@@ -21,20 +22,22 @@ REQUESTS = {
def assert_matcher(matcher_name):
matcher = getattr(matchers, matcher_name)
for k1, k2 in itertools.permutations(REQUESTS, 2):
matched = matcher(REQUESTS[k1], REQUESTS[k2])
if matcher_name in {k1, k2}:
assert not matched
expecting_assertion_error = matcher_name in {k1, k2}
if expecting_assertion_error:
with pytest.raises(AssertionError):
matcher(REQUESTS[k1], REQUESTS[k2])
else:
assert matched
assert matcher(REQUESTS[k1], REQUESTS[k2]) is None
def test_uri_matcher():
for k1, k2 in itertools.permutations(REQUESTS, 2):
matched = matchers.uri(REQUESTS[k1], REQUESTS[k2])
if {k1, k2} != {'base', 'method'}:
assert not matched
expecting_assertion_error = {k1, k2} != {"base", "method"}
if expecting_assertion_error:
with pytest.raises(AssertionError):
matchers.uri(REQUESTS[k1], REQUESTS[k2])
else:
assert matched
assert matchers.uri(REQUESTS[k1], REQUESTS[k2]) is None
req1_body = (b"<?xml version='1.0'?><methodCall><methodName>test</methodName>"
@@ -107,7 +110,7 @@ req2_body = (b"<?xml version='1.0'?><methodCall><methodName>test</methodName>"
)
])
def test_body_matcher_does_match(r1, r2):
assert matchers.body(r1, r2)
assert matchers.body(r1, r2) is None
@pytest.mark.parametrize("r1, r2", [
@@ -135,25 +138,132 @@ def test_body_matcher_does_match(r1, r2):
)
])
def test_body_match_does_not_match(r1, r2):
assert not matchers.body(r1, r2)
with pytest.raises(AssertionError):
matchers.body(r1, r2)
def test_query_matcher():
req1 = request.Request('GET', 'http://host.com/?a=b&c=d', '', {})
req2 = request.Request('GET', 'http://host.com/?c=d&a=b', '', {})
assert matchers.query(req1, req2)
req1 = request.Request("GET", "http://host.com/?a=b&c=d", "", {})
req2 = request.Request("GET", "http://host.com/?c=d&a=b", "", {})
assert matchers.query(req1, req2) is None
req1 = request.Request('GET', 'http://host.com/?a=b&a=b&c=d', '', {})
req2 = request.Request('GET', 'http://host.com/?a=b&c=d&a=b', '', {})
req3 = request.Request('GET', 'http://host.com/?c=d&a=b&a=b', '', {})
assert matchers.query(req1, req2)
assert matchers.query(req1, req3)
req1 = request.Request("GET", "http://host.com/?a=b&a=b&c=d", "", {})
req2 = request.Request("GET", "http://host.com/?a=b&c=d&a=b", "", {})
req3 = request.Request("GET", "http://host.com/?c=d&a=b&a=b", "", {})
assert matchers.query(req1, req2) is None
assert matchers.query(req1, req3) is None
def test_metchers():
assert_matcher('method')
assert_matcher('scheme')
assert_matcher('host')
assert_matcher('port')
assert_matcher('path')
assert_matcher('query')
def test_matchers():
assert_matcher("method")
assert_matcher("scheme")
assert_matcher("host")
assert_matcher("port")
assert_matcher("path")
assert_matcher("query")
def test_evaluate_matcher_does_match():
def bool_matcher(r1, r2):
return True
def assertion_matcher(r1, r2):
assert 1 == 1
r1, r2 = None, None
for matcher in [bool_matcher, assertion_matcher]:
match, assertion_msg = matchers._evaluate_matcher(matcher, r1, r2)
assert match is True
assert assertion_msg is None
def test_evaluate_matcher_does_not_match():
def bool_matcher(r1, r2):
return False
def assertion_matcher(r1, r2):
# This is like the "assert" statement preventing pytest to recompile it
raise AssertionError()
r1, r2 = None, None
for matcher in [bool_matcher, assertion_matcher]:
match, assertion_msg = matchers._evaluate_matcher(matcher, r1, r2)
assert match is False
assert not assertion_msg
def test_evaluate_matcher_does_not_match_with_assert_message():
def assertion_matcher(r1, r2):
# This is like the "assert" statement preventing pytest to recompile it
raise AssertionError("Failing matcher")
r1, r2 = None, None
match, assertion_msg = matchers._evaluate_matcher(assertion_matcher, r1, r2)
assert match is False
assert assertion_msg == "Failing matcher"
def test_get_assertion_message():
assert matchers.get_assertion_message(None) == ""
assert matchers.get_assertion_message("") == ""
def test_get_assertion_message_with_details():
assertion_msg = "q1=1 != q2=1"
expected = (
"--------------- DETAILS ---------------\n"
"{}\n"
"----------------------------------------\n".format(assertion_msg)
)
assert matchers.get_assertion_message(assertion_msg) == expected
@pytest.mark.parametrize(
"r1, r2, expected_successes, expected_failures",
[
(
request.Request("GET", "http://host.com/p?a=b", "", {}),
request.Request("GET", "http://host.com/p?a=b", "", {}),
["method", "path"],
[],
),
(
request.Request("GET", "http://host.com/p?a=b", "", {}),
request.Request("POST", "http://host.com/p?a=b", "", {}),
["path"],
["method"],
),
(
request.Request("GET", "http://host.com/p?a=b", "", {}),
request.Request("POST", "http://host.com/path?a=b", "", {}),
[],
["method", "path"],
),
],
)
def test_get_matchers_results(r1, r2, expected_successes, expected_failures):
successes, failures = matchers.get_matchers_results(
r1, r2, [matchers.method, matchers.path]
)
assert successes == expected_successes
assert len(failures) == len(expected_failures)
for i, expected_failure in enumerate(expected_failures):
assert failures[i][0] == expected_failure
assert failures[i][1] is not None
@mock.patch("vcr.matchers.get_matchers_results")
@pytest.mark.parametrize(
"successes, failures, expected_match",
[
(["method", "path"], [], True),
(["method"], ["path"], False),
([], ["method", "path"], False),
],
)
def test_requests_match(mock_get_matchers_results, successes, failures, expected_match):
mock_get_matchers_results.return_value = (successes, failures)
r1 = request.Request("GET", "http://host.com/p?a=b", "", {})
r2 = request.Request("GET", "http://host.com/p?a=b", "", {})
match = matchers.requests_match(r1, r2, [matchers.method, matchers.path])
assert match is expected_match

View File

@@ -5,6 +5,12 @@ import yaml
import vcr.migration
# Use the libYAML versions if possible
try:
from yaml import CLoader as Loader
except ImportError:
from yaml import Loader
def test_try_migrate_with_json(tmpdir):
cassette = tmpdir.join('cassette.json').strpath
@@ -22,9 +28,9 @@ def test_try_migrate_with_yaml(tmpdir):
shutil.copy('tests/fixtures/migration/old_cassette.yaml', cassette)
assert vcr.migration.try_migrate(cassette)
with open('tests/fixtures/migration/new_cassette.yaml', 'r') as f:
expected_yaml = yaml.load(f)
expected_yaml = yaml.load(f, Loader=Loader)
with open(cassette, 'r') as f:
actual_yaml = yaml.load(f)
actual_yaml = yaml.load(f, Loader=Loader)
assert actual_yaml == expected_yaml

View File

@@ -3,9 +3,13 @@ import pytest
from vcr.request import Request, HeadersDict
def test_str():
req = Request('GET', 'http://www.google.com/', '', {})
assert str(req) == '<Request (GET) http://www.google.com/>'
@pytest.mark.parametrize("method, uri, expected_str", [
('GET', 'http://www.google.com/', '<Request (GET) http://www.google.com/>'),
('OPTIONS', '*', '<Request (OPTIONS) *>'),
('CONNECT', 'host.some.where:1234', '<Request (CONNECT) host.some.where:1234>')
])
def test_str(method, uri, expected_str):
assert str(Request(method, uri, '', {})) == expected_str
def test_headers():
@@ -29,18 +33,21 @@ def test_add_header_deprecated():
('https://go.com/', 443),
('https://go.com:443/', 443),
('https://go.com:3000/', 3000),
('*', None)
])
def test_port(uri, expected_port):
req = Request('GET', uri, '', {})
assert req.port == expected_port
def test_uri():
req = Request('GET', 'http://go.com/', '', {})
assert req.uri == 'http://go.com/'
req = Request('GET', 'http://go.com:80/', '', {})
assert req.uri == 'http://go.com:80/'
@pytest.mark.parametrize("method, uri", [
('GET', 'http://go.com/'),
('GET', 'http://go.com:80/'),
('CONNECT', 'localhost:1234'),
('OPTIONS', '*')
])
def test_uri(method, uri):
assert Request(method, uri, '', {}).uri == uri
def test_HeadersDict():

View File

@@ -1,4 +1,9 @@
# coding: UTF-8
import io
import unittest
import six
from vcr.stubs import VCRHTTPResponse
@@ -66,3 +71,52 @@ def test_response_headers_should_have_correct_values():
assert response.headers.get('content-length') == "10806"
assert response.headers.get('date') == "Fri, 24 Oct 2014 18:35:37 GMT"
@unittest.skipIf(six.PY2, "Regression test for Python3 only")
def test_response_parses_correctly_and_fp_attribute_error_is_not_thrown():
"""
Regression test for https://github.com/kevin1024/vcrpy/issues/440
:return:
"""
recorded_response = {
"status": {
"message": "OK",
"code": 200
},
"headers": {
"content-length": ["0"],
"server": ["gunicorn/18.0"],
"connection": ["Close"],
"access-control-allow-credentials": ["true"],
"date": ["Fri, 24 Oct 2014 18:35:37 GMT"],
"access-control-allow-origin": ["*"],
"content-type": ["text/html; charset=utf-8"],
},
"body": {
"string": b"\nPMID- 19416910\nOWN - NLM\nSTAT- MEDLINE\nDA - 20090513\nDCOM- "
b"20090622\nLR - "
b"20141209\nIS - 1091-6490 (Electronic)\nIS - 0027-8424 (Linking)\nVI - "
b"106\nIP - "
b"19\nDP - 2009 May 12\nTI - Genetic dissection of histone deacetylase "
b"requirement in "
b"tumor cells.\nPG - 7751-5\nLID - 10.1073/pnas.0903139106 [doi]\nAB - "
b"Histone "
b"deacetylase inhibitors (HDACi) represent a new group of drugs currently\n "
b" being "
b"tested in a wide variety of clinical applications. They are especially\n "
b" effective "
b"in preclinical models of cancer where they show antiproliferative\n "
b"action in many "
b"different types of cancer cells. Recently, the first HDACi was\n "
b"approved for the "
b"treatment of cutaneous T cell lymphomas. Most HDACi currently in\n "
b"clinical "
}
}
vcr_response = VCRHTTPResponse(recorded_response)
handle = io.TextIOWrapper(io.BufferedReader(vcr_response), encoding='utf-8')
handle = iter(handle)
articles = [line for line in handle]
assert len(articles) > 1

View File

@@ -1,5 +1,5 @@
[tox]
envlist = {py27,py35,py36,pypy}-{flakes,requests27,httplib2,urllib3121,tornado4,boto3,aiohttp}
envlist = {py27,py35,py36,py37,pypy}-{flakes,requests,httplib2,urllib3121,tornado4,boto3},{py35,py36,py37}-{aiohttp}
[testenv:flakes]
skipsdist = True
@@ -13,12 +13,13 @@ deps = flake8
commands =
./runtests.sh {posargs}
deps =
Flask<1
Flask
mock
pytest
pytest-httpbin
PyYAML
requests27: requests==2.7.0
ipaddress
requests: requests>=2.22.0
httplib2: httplib2
urllib3121: urllib3==1.21.1
{py27,py35,py36,pypy}-tornado4: tornado>=4,<5
@@ -27,6 +28,7 @@ deps =
boto3: boto3
aiohttp: aiohttp
aiohttp: pytest-asyncio
aiohttp: pytest-aiohttp
[flake8]
max_line_length = 110

View File

@@ -1,7 +1,3 @@
import asyncio
@asyncio.coroutine
def handle_coroutine(vcr, fn):
async def handle_coroutine(vcr, fn): # noqa: E999
with vcr as cassette:
return (yield from fn(cassette)) # noqa: E999
return (await fn(cassette)) # noqa: E999

View File

@@ -8,7 +8,7 @@ import wrapt
from .compat import contextlib
from .errors import UnhandledHTTPRequestError
from .matchers import requests_match, uri, method
from .matchers import requests_match, uri, method, get_matchers_results
from .patch import CassettePatcherBuilder
from .serializers import yamlserializer
from .persisters.filesystem import FilesystemPersister
@@ -16,11 +16,13 @@ from .util import partition_dict
try:
from asyncio import iscoroutinefunction
from ._handle_coroutine import handle_coroutine
except ImportError:
def iscoroutinefunction(*args, **kwargs):
return False
if sys.version_info[:2] >= (3, 5):
from ._handle_coroutine import handle_coroutine
else:
def handle_coroutine(*args, **kwags):
raise NotImplementedError('Not implemented on Python 2')
@@ -136,7 +138,10 @@ class CassetteContextDecorator(object):
except Exception:
to_yield = coroutine.throw(*sys.exc_info())
else:
to_yield = coroutine.send(to_send)
try:
to_yield = coroutine.send(to_send)
except StopIteration:
break
def _handle_function(self, fn):
with self as cassette:
@@ -282,6 +287,41 @@ class Cassette(object):
% (self._path, request)
)
def rewind(self):
self.play_counts = collections.Counter()
def find_requests_with_most_matches(self, request):
"""
Get the most similar request(s) stored in the cassette
of a given request as a list of tuples like this:
- the request object
- the successful matchers as string
- the failed matchers and the related assertion message with the difference details as strings tuple
This is useful when a request failed to be found,
we can get the similar request(s) in order to know what have changed in the request parts.
"""
best_matches = []
request = self._before_record_request(request)
for index, (stored_request, response) in enumerate(self.data):
successes, fails = get_matchers_results(request, stored_request, self._match_on)
best_matches.append((len(successes), stored_request, successes, fails))
best_matches.sort(key=lambda t: t[0], reverse=True)
# Get the first best matches (multiple if equal matches)
final_best_matches = []
previous_nb_success = best_matches[0][0]
for best_match in best_matches:
nb_success = best_match[0]
# Do not keep matches that have 0 successes,
# it means that the request is totally different from
# the ones stored in the cassette
if nb_success < 1 or previous_nb_success != nb_success:
break
previous_nb_success = nb_success
final_best_matches.append(best_match[1:])
return final_best_matches
def _as_dict(self):
return {"requests": self.requests, "responses": self.responses}

View File

@@ -1,5 +1,8 @@
import copy
import collections
try:
from collections import abc as collections_abc # only works on python 3.3+
except ImportError:
import collections as collections_abc
import functools
import inspect
import os
@@ -175,7 +178,7 @@ class VCR(object):
if decode_compressed_response:
filter_functions.append(filters.decode_response)
if before_record_response:
if not isinstance(before_record_response, collections.Iterable):
if not isinstance(before_record_response, collections_abc.Iterable):
before_record_response = (before_record_response,)
filter_functions.extend(before_record_response)
@@ -241,7 +244,7 @@ class VCR(object):
filter_functions.append(self._build_ignore_hosts(hosts_to_ignore))
if before_record_request:
if not isinstance(before_record_request, collections.Iterable):
if not isinstance(before_record_request, collections_abc.Iterable):
before_record_request = (before_record_request,)
filter_functions.extend(before_record_request)

View File

@@ -1,5 +1,32 @@
class CannotOverwriteExistingCassetteException(Exception):
pass
def __init__(self, *args, **kwargs):
self.cassette = kwargs["cassette"]
self.failed_request = kwargs["failed_request"]
message = self._get_message(kwargs["cassette"], kwargs["failed_request"])
super(CannotOverwriteExistingCassetteException, self).__init__(message)
def _get_message(self, cassette, failed_request):
"""Get the final message related to the exception"""
# Get the similar requests in the cassette that
# have match the most with the request.
best_matches = cassette.find_requests_with_most_matches(failed_request)
# Build a comprehensible message to put in the exception.
best_matches_msg = ""
for best_match in best_matches:
request, _, failed_matchers_assertion_msgs = best_match
best_matches_msg += "Similar request found : (%r).\n" % request
for failed_matcher, assertion_msg in failed_matchers_assertion_msgs:
best_matches_msg += "Matcher failed : %s\n" "%s\n" % (
failed_matcher,
assertion_msg,
)
return (
"Can't overwrite existing cassette (%r) in "
"your current record mode (%r).\n"
"No match for the request (%r) was found.\n"
"%s"
% (cassette._path, cassette.record_mode, failed_request, best_matches_msg)
)
class UnhandledHTTPRequestError(KeyError):

View File

@@ -8,35 +8,47 @@ log = logging.getLogger(__name__)
def method(r1, r2):
return r1.method == r2.method
assert r1.method == r2.method, "{} != {}".format(r1.method, r2.method)
def uri(r1, r2):
return r1.uri == r2.uri
assert r1.uri == r2.uri, "{} != {}".format(r1.uri, r2.uri)
def host(r1, r2):
return r1.host == r2.host
assert r1.host == r2.host, "{} != {}".format(r1.host, r2.host)
def scheme(r1, r2):
return r1.scheme == r2.scheme
assert r1.scheme == r2.scheme, "{} != {}".format(r1.scheme, r2.scheme)
def port(r1, r2):
return r1.port == r2.port
assert r1.port == r2.port, "{} != {}".format(r1.port, r2.port)
def path(r1, r2):
return r1.path == r2.path
assert r1.path == r2.path, "{} != {}".format(r1.path, r2.path)
def query(r1, r2):
return r1.query == r2.query
assert r1.query == r2.query, "{} != {}".format(r1.query, r2.query)
def raw_body(r1, r2):
return read_body(r1) == read_body(r2)
assert read_body(r1) == read_body(r2)
def body(r1, r2):
transformer = _get_transformer(r1)
r2_transformer = _get_transformer(r2)
if transformer != r2_transformer:
transformer = _identity
assert transformer(read_body(r1)) == transformer(read_body(r2))
def headers(r1, r2):
assert r1.headers == r2.headers, "{} != {}".format(r1.headers, r2.headers)
def _header_checker(value, header='Content-Type'):
@@ -77,28 +89,67 @@ def _get_transformer(request):
return _identity
def body(r1, r2):
transformer = _get_transformer(r1)
r2_transformer = _get_transformer(r2)
if transformer != r2_transformer:
transformer = _identity
return transformer(read_body(r1)) == transformer(read_body(r2))
def headers(r1, r2):
return r1.headers == r2.headers
def _log_matches(r1, r2, matches):
differences = [m for m in matches if not m[0]]
if differences:
log.debug(
"Requests {} and {} differ according to "
"the following matchers: {}".format(r1, r2, differences)
)
def requests_match(r1, r2, matchers):
matches = [(m(r1, r2), m) for m in matchers]
_log_matches(r1, r2, matches)
return all(m[0] for m in matches)
successes, failures = get_matchers_results(r1, r2, matchers)
if failures:
log.debug(
"Requests {} and {} differ.\n"
"Failure details:\n"
"{}".format(r1, r2, failures)
)
return len(failures) == 0
def _evaluate_matcher(matcher_function, *args):
"""
Evaluate the result of a given matcher as a boolean with an assertion error message if any.
It handles two types of matcher :
- a matcher returning a boolean value.
- a matcher that only makes an assert, returning None or raises an assertion error.
"""
assertion_message = None
try:
match = matcher_function(*args)
match = True if match is None else match
except AssertionError as e:
match = False
assertion_message = str(e)
return match, assertion_message
def get_matchers_results(r1, r2, matchers):
"""
Get the comparison results of two requests as two list.
The first returned list represents the matchers names that passed.
The second list is the failed matchers as a string with failed assertion details if any.
"""
matches_success, matches_fails = [], []
for m in matchers:
matcher_name = m.__name__
match, assertion_message = _evaluate_matcher(m, r1, r2)
if match:
matches_success.append(matcher_name)
else:
assertion_message = get_assertion_message(assertion_message)
matches_fails.append((matcher_name, assertion_message))
return matches_success, matches_fails
def get_assertion_message(assertion_details, **format_options):
"""
Get a detailed message about the failing matcher.
"""
msg = ""
if assertion_details:
separator = format_options.get("separator", "-")
title = format_options.get("title", " DETAILS ")
nb_separator = format_options.get("nb_separator", 40)
first_title_line = (
separator * ((nb_separator - len(title)) // 2)
+ title
+ separator * ((nb_separator - len(title)) // 2)
)
msg += "{}\n{}\n{}\n".format(
first_title_line, str(assertion_details), separator * nb_separator
)
return msg

View File

@@ -68,7 +68,7 @@ def _migrate(data):
for item in data:
req = item['request']
res = item['response']
uri = dict((k, req.pop(k)) for k in PARTS)
uri = {k: req.pop(k) for k in PARTS}
req['uri'] = build_uri(**uri)
# convert headers to dict of lists
headers = req['headers']
@@ -100,7 +100,7 @@ def migrate_json(in_fp, out_fp):
def _list_of_tuples_to_dict(fs):
return dict((k, v) for k, v in fs[0])
return {k: v for k, v in fs[0]}
def _already_migrated(data):
@@ -159,9 +159,9 @@ def main():
for (root, dirs, files) in os.walk(path)
for name in files)
for file_path in files:
migrated = try_migrate(file_path)
status = 'OK' if migrated else 'FAIL'
sys.stderr.write("[{}] {}\n".format(status, file_path))
migrated = try_migrate(file_path)
status = 'OK' if migrated else 'FAIL'
sys.stderr.write("[{}] {}\n".format(status, file_path))
sys.stderr.write("Done.\n")

View File

@@ -58,7 +58,10 @@ class Request(object):
parse_uri = urlparse(self.uri)
port = parse_uri.port
if port is None:
port = {'https': 443, 'http': 80}[parse_uri.scheme]
try:
port = {'https': 443, 'http': 80}[parse_uri.scheme]
except KeyError:
pass
return port
@property
@@ -91,7 +94,7 @@ class Request(object):
'method': self.method,
'uri': self.uri,
'body': self.body,
'headers': dict(((k, [v]) for k, v in self.headers.items())),
'headers': {k: [v] for k, v in self.headers.items()},
}
@classmethod
@@ -112,7 +115,7 @@ class HeadersDict(CaseInsensitiveDict):
In addition, some servers sometimes send the same header more than once,
and httplib *can* deal with this situation.
Futhermore, I wanted to keep the request and response cassette format as
Furthermore, I wanted to keep the request and response cassette format as
similar as possible.
For this reason, in cassettes I keep a dict with lists as keys, but once

View File

@@ -25,5 +25,5 @@ def serialize(cassette_dict):
original.end,
original.args[-1] + error_message
)
except TypeError as original: # py3
except TypeError: # py3
raise TypeError(error_message)

View File

@@ -18,7 +18,7 @@ log = logging.getLogger(__name__)
class VCRFakeSocket(object):
"""
A socket that doesn't do anything!
Used when playing back casssettes, when there
Used when playing back cassettes, when there
is no actual open socket.
"""
@@ -60,9 +60,10 @@ def serialize_headers(response):
class VCRHTTPResponse(HTTPResponse):
"""
Stub reponse class that gets returned instead of a HTTPResponse
Stub response class that gets returned instead of a HTTPResponse
"""
def __init__(self, recorded_response):
self.fp = None
self.recorded_response = recorded_response
self.reason = recorded_response['status']['message']
self.status = self.code = recorded_response['status']['code']
@@ -93,9 +94,30 @@ class VCRHTTPResponse(HTTPResponse):
def read(self, *args, **kwargs):
return self._content.read(*args, **kwargs)
def readall(self):
return self._content.readall()
def readinto(self, *args, **kwargs):
return self._content.readinto(*args, **kwargs)
def readline(self, *args, **kwargs):
return self._content.readline(*args, **kwargs)
def readlines(self, *args, **kwargs):
return self._content.readlines(*args, **kwargs)
def seekable(self):
return self._content.seekable()
def tell(self):
return self._content.tell()
def isatty(self):
return self._content.isatty()
def seek(self, *args, **kwargs):
return self._content.seek(*args, **kwargs)
def close(self):
self._closed = True
return True
@@ -121,6 +143,9 @@ class VCRHTTPResponse(HTTPResponse):
else:
return default
def readable(self):
return self._content.readable()
class VCRConnection(object):
# A reference to the cassette that's currently being patched in
@@ -136,6 +161,9 @@ class VCRConnection(object):
def _uri(self, url):
"""Returns request absolute URI"""
if url and not url.startswith('/'):
# Then this must be a proxy request.
return url
uri = "{}://{}{}{}".format(
self._protocol,
self.real_connection.host,
@@ -168,6 +196,8 @@ class VCRConnection(object):
# allows me to compare the entire length of the response to see if it
# exists in the cassette.
self._sock = VCRFakeSocket()
def putrequest(self, method, url, *args, **kwargs):
"""
httplib gives you more than one way to do it. This is a way
@@ -225,11 +255,8 @@ class VCRConnection(object):
self._vcr_request
):
raise CannotOverwriteExistingCassetteException(
"No match for the request (%r) was found. "
"Can't overwrite existing cassette (%r) in "
"your current record mode (%r)."
% (self._vcr_request, self.cassette._path,
self.cassette.record_mode)
cassette=self.cassette,
failed_request=self._vcr_request
)
# Otherwise, we should send the request, then get the response
@@ -291,11 +318,13 @@ class VCRConnection(object):
with force_reset():
return self.real_connection.connect(*args, **kwargs)
self._sock = VCRFakeSocket()
@property
def sock(self):
if self.real_connection.sock:
return self.real_connection.sock
return VCRFakeSocket()
return self._sock
@sock.setter
def sock(self, value):
@@ -313,6 +342,8 @@ class VCRConnection(object):
with force_reset():
self.real_connection = self._baseclass(*args, **kwargs)
self._sock = None
def __setattr__(self, name, value):
"""
We need to define this because any attributes that are set on the

View File

@@ -5,12 +5,16 @@ import asyncio
import functools
import json
from aiohttp import ClientResponse
from aiohttp import ClientResponse, streams
from yarl import URL
from vcr.request import Request
class MockStream(asyncio.StreamReader, streams.AsyncStreamReaderMixin):
pass
class MockClientResponse(ClientResponse):
def __init__(self, method, url):
super().__init__(
@@ -25,28 +29,33 @@ class MockClientResponse(ClientResponse):
session=None,
)
# TODO: get encoding from header
@asyncio.coroutine
def json(self, *, encoding='utf-8', loads=json.loads, **kwargs): # NOQA: E999
return loads(self._body.decode(encoding))
async def json(self, *, encoding='utf-8', loads=json.loads, **kwargs): # NOQA: E999
stripped = self._body.strip()
if not stripped:
return None
@asyncio.coroutine
def text(self, encoding='utf-8'):
return self._body.decode(encoding)
return loads(stripped.decode(encoding))
@asyncio.coroutine
def read(self):
async def text(self, encoding='utf-8', errors='strict'):
return self._body.decode(encoding, errors=errors)
async def read(self):
return self._body
@asyncio.coroutine
def release(self):
pass
@property
def content(self):
s = MockStream()
s.feed_data(self._body)
s.feed_eof()
return s
def vcr_request(cassette, real_request):
@functools.wraps(real_request)
@asyncio.coroutine
def new_request(self, method, url, **kwargs):
async def new_request(self, method, url, **kwargs):
headers = kwargs.get('headers')
headers = self._prepare_headers(headers)
data = kwargs.get('data')
@@ -82,7 +91,7 @@ def vcr_request(cassette, real_request):
response.close()
return response
response = yield from real_request(self, method, url, **kwargs) # NOQA: E999
response = await real_request(self, method, url, **kwargs) # NOQA: E999
vcr_response = {
'status': {
@@ -90,7 +99,7 @@ def vcr_request(cassette, real_request):
'message': response.reason,
},
'headers': dict(response.headers),
'body': {'string': (yield from response.read())}, # NOQA: E999
'body': {'string': (await response.read())}, # NOQA: E999
'url': response.url,
}
cassette.append(vcr_request, vcr_response)

View File

@@ -40,6 +40,7 @@ class VCRHTTPSConnectionWithTimeout(VCRHTTPSConnection,
'timeout',
'source_address',
'ca_certs',
'disable_ssl_certificate_validation',
}
unknown_keys = set(kwargs.keys()) - safe_keys
safe_kwargs = kwargs.copy()

View File

@@ -75,10 +75,8 @@ def vcr_fetch_impl(cassette, real_fetch_impl):
request,
599,
error=CannotOverwriteExistingCassetteException(
"No match for the request (%r) was found. "
"Can't overwrite existing cassette (%r) in "
"your current record mode (%r)."
% (vcr_request, cassette._path, cassette.record_mode)
cassette=cassette,
failed_request=vcr_request
),
request_time=self.io_loop.time() - request.start_time,
)

View File

@@ -1,13 +1,17 @@
import collections
import types
try:
from collections.abc import Mapping, MutableMapping
except ImportError:
from collections import Mapping, MutableMapping
# Shamelessly stolen from https://github.com/kennethreitz/requests/blob/master/requests/structures.py
class CaseInsensitiveDict(collections.MutableMapping):
class CaseInsensitiveDict(MutableMapping):
"""
A case-insensitive ``dict``-like object.
Implements all methods and operations of
``collections.MutableMapping`` as well as dict's ``copy``. Also
``collections.abc.MutableMapping`` as well as dict's ``copy``. Also
provides ``lower_items``.
All keys are expected to be strings. The structure remembers the
case of the last key to be set, and ``iter(instance)``,
@@ -57,7 +61,7 @@ class CaseInsensitiveDict(collections.MutableMapping):
)
def __eq__(self, other):
if isinstance(other, collections.Mapping):
if isinstance(other, Mapping):
other = CaseInsensitiveDict(other)
else:
return NotImplemented
@@ -114,10 +118,10 @@ def auto_decorate(
)
def __new__(cls, name, bases, attributes_dict):
new_attributes_dict = dict(
(attribute, maybe_decorate(attribute, value))
new_attributes_dict = {
attribute: maybe_decorate(attribute, value)
for attribute, value in attributes_dict.items()
)
}
return super(DecorateAll, cls).__new__(
cls, name, bases, new_attributes_dict
)