1
0
mirror of https://github.com/kevin1024/vcrpy.git synced 2025-12-08 16:53:23 +00:00

Compare commits

...

35 Commits

Author SHA1 Message Date
Ivan Malison
8b59d73f25 v1.7.0 2015-08-02 13:36:57 -07:00
Ivan 'Goat' Malison
eb394b90d9 Merge pull request #181 from coagulant/fix-readme-custom-response
Fix example for custom response filtering in docs
2015-08-01 06:06:34 -07:00
Ilya Baryshev
14931dd47a Fix example for custom response filtering in docs 2015-08-01 11:10:51 +03:00
Ivan Malison
89cdda86d1 fix generator test. 2015-07-30 14:22:16 -07:00
Ivan 'Goat' Malison
ad48d71897 Merge pull request #180 from abhinav/master
Fix exception catching in coroutines.
2015-07-30 14:21:44 -07:00
Abhinav Gupta
946ce17a97 Fix exception catching in coroutines. 2015-07-30 14:13:58 -07:00
Ivan Malison
4d438dac75 Fix tornado python3 tests. 2015-07-30 04:19:17 -07:00
Ivan Malison
a234ad6b12 Fix all the tests in python 3 2015-07-30 03:39:04 -07:00
Ivan Malison
1d000ac652 Fix all the failing tests 2015-07-30 02:08:42 -07:00
Ivan Malison
21c176ee1e Make cassette active for duration of coroutine/generator
Closes #177.
2015-07-30 01:47:29 -07:00
Ivan 'Goat' Malison
4fb5bef8e1 Merge pull request #179 from graingert/support-ancient-PyPA-tools
Support distribute, Fixes #178
2015-07-29 23:22:16 -07:00
Thomas Grainger
9717596e2c Support distribute, Fixes #178 2015-07-29 22:19:45 +01:00
Ivan 'Goat' Malison
1660cc3a9f Merge pull request #176 from charlax/patch-1
Make setup example pep8 compliant
2015-07-27 10:37:42 -07:00
Charles-Axel Dein
4beb023204 Make setup example pep8 compliant
Pretty minor doc change.
2015-07-27 18:22:51 +02:00
Ivan 'Goat' Malison
72eb5345d6 Merge pull request #175 from gward/issue163-v2
Fix for #163, take 2.
2015-07-26 02:40:27 -07:00
Greg Ward
fe7d193d1a Add several more test cases for issue #163. 2015-07-16 14:49:48 -04:00
Greg Ward
09b7ccf561 Ensure that request bodies are always bytes, not text (fixes #163).
It shouldn't matter whether the request body comes from a file or a
string, or whether it is passed to the Request constructor or assigned
later. It should always be stored internally as bytes.
2015-07-16 14:36:26 -04:00
Kevin McCarthy
a4a80b431b Merge pull request #173 from graingert/patch-2
Fix before_record_reponse doc
2015-07-16 07:28:25 -10:00
Thomas Grainger
025a3b422d Fix before_record_reponse doc 2015-07-16 15:13:19 +01:00
Kevin McCarthy
bb05b2fcf7 Merge pull request #172 from abhinav/patch-1
Add Tornado to list of supported libraries
2015-07-15 10:31:51 -10:00
Abhinav Gupta
f77ef81877 Add Tornado to list of supported libraries 2015-07-15 12:43:36 -07:00
Ivan Malison
80ece7750f v1.6.1 2015-07-15 00:25:56 -07:00
Ivan Malison
8a86d75dc5 Merge remote-tracking branch 'upstream/master' into improved_body_matcher 2015-07-15 00:10:37 -07:00
Ivan Malison
33a4fb98c6 Update unit tests for body matcher. Simplified logic. 2015-07-15 00:01:31 -07:00
Diaoul
a046697567 Add a read_body helper function 2015-07-15 01:16:10 +02:00
Diaoul
c0286dfd97 Add body matcher unit tests 2015-07-11 23:22:42 +02:00
Diaoul
cc9af1d5fb Use CaseInsensitiveDict in body matcher 2015-07-11 23:18:45 +02:00
Ivan 'Goat' Malison
5f8407a8a1 Merge pull request #170 from graingert/manual-conditional-requirements-for-old-pip
Support conditional requirements in old versions of pip
2015-07-07 16:09:46 -07:00
Thomas Grainger
c789c82c1d Support conditional requirements in old versions of pip 2015-07-07 11:28:49 +01:00
Ivan 'Goat' Malison
16b5b77bcd Merge pull request #168 from graingert/patch-1
Fix RST parse errors generated by pandoc
2015-07-05 12:43:43 -07:00
Thomas Grainger
0a093786ed Fix RST parse errors generated by pandoc 2015-07-05 12:14:01 +01:00
Diaoul
3986caf182 Use Content-Type based approach for body matcher
When converting objects to body, dicts and sets order can change
resulting in a different but same body. This fixes the issue by
comparing the enclosed data in the body rather than the body itself
while still allowing raw body matching with the raw_body matcher.
2015-07-04 19:21:14 +02:00
Ivan 'Goat' Malison
cc6c26646c Merge pull request #165 from abhinav/master
[Tornado] Fix unsupported features exception not being raised.
2015-07-03 16:04:49 -07:00
Abhinav Gupta
3846a4ccef [Tornado] Fix unsupported features exception not being raised.
Add tests for that exception being raisd correctly and for
CannotOverwriteCassetteException.
2015-07-03 12:34:57 -07:00
Ivan Malison
aae4ae255b README spacing fix. 2015-07-03 10:26:52 -07:00
14 changed files with 628 additions and 55 deletions

View File

@@ -5,6 +5,7 @@ VCR.py
:alt: vcr.py
vcr.py
This is a Python version of `Ruby's VCR
library <https://github.com/vcr/vcr>`__.
@@ -49,6 +50,7 @@ The following http libraries are supported:
- requests (both 1.x and 2.x versions)
- httplib2
- boto
- Tornado's AsyncHTTPClient
Usage
-----
@@ -107,10 +109,10 @@ If you don't like VCR's defaults, you can set options by instantiating a
import vcr
my_vcr = vcr.VCR(
serializer = 'json',
cassette_library_dir = 'fixtures/cassettes',
record_mode = 'once',
match_on = ['uri', 'method'],
serializer='json',
cassette_library_dir='fixtures/cassettes',
record_mode='once',
match_on=['uri', 'method'],
)
with my_vcr.use_cassette('test.json'):
@@ -144,7 +146,9 @@ The following options are available :
- port (the port of the server receiving the request)
- path (the path of the request)
- query (the query string of the request)
- body (the entire request body)
- raw\_body (the entire request body as is)
- body (the entire request body unmarshalled by content-type
i.e. xmlrpc, json, form-urlencoded, falling back on raw\_body)
- headers (the headers of the request)
Backwards compatible matchers:
@@ -413,12 +417,13 @@ that of ``before_record``:
.. code:: python
def scrub_string(string, replacement=''):
def before_record_reponse(response):
return response['body']['string'] = response['body']['string'].replace(string, replacement)
return scrub_string
def before_record_response(response):
response['body']['string'] = response['body']['string'].replace(string, replacement)
return response
return before_record_response
my_vcr = vcr.VCR(
before_record=scrub_string(settings.USERNAME, 'username'),
before_record_response=scrub_string(settings.USERNAME, 'username'),
)
with my_vcr.use_cassette('test.yml'):
# your http code here
@@ -603,7 +608,15 @@ new API in version 1.0.x
Changelog
---------
- 1.6.0 [#120] Tornado support thanks (thanks @abhinav), [#147] packaging fixes
- 1.7.0 [#177] Properly support coroutine/generator decoration. [#178]
Support distribute (thanks @graingert). [#163] Make compatibility
between python2 and python3 recorded cassettes more robust (thanks
@gward).
- 1.6.1 [#169] Support conditional requirements in old versions of
pip, Fix RST parse errors generated by pandoc, [Tornado] Fix
unsupported features exception not being raised, [#166]
content-aware body matcher.
- 1.6.0 [#120] Tornado support (thanks @abhinav), [#147] packaging fixes
(thanks @graingert), [#158] allow filtering post params in requests
(thanks @MrJohz), [#140] add xmlrpclib support (thanks @Diaoul).
- 1.5.2 Fix crash when cassette path contains cassette library

View File

@@ -1,8 +1,11 @@
#!/usr/bin/env python
import sys
import logging
from setuptools import setup, find_packages
from setuptools.command.test import test as TestCommand
import pkg_resources
long_description = open('README.rst', 'r').read()
@@ -20,9 +23,33 @@ class PyTest(TestCommand):
sys.exit(errno)
install_requires=['PyYAML', 'wrapt', 'six>=1.5']
extras_require = {
':python_version in "2.4, 2.5, 2.6"':
['contextlib2', 'backport_collections', 'mock'],
':python_version in "2.7, 3.1, 3.2"': ['contextlib2', 'mock'],
}
try:
if 'bdist_wheel' not in sys.argv:
for key, value in extras_require.items():
if key.startswith(':') and pkg_resources.evaluate_marker(key[1:]):
install_requires.extend(value)
except Exception:
logging.getLogger(__name__).exception(
'Something went wrong calculating platform specific dependencies, so '
"you're getting them all!"
)
for key, value in extras_require.items():
if key.startswith(':'):
install_requires.extend(value)
setup(
name='vcrpy',
version='1.6.0',
version='1.7.0',
description=(
"Automatically mock your HTTP interactions to simplify and "
"speed up testing"
@@ -32,12 +59,8 @@ setup(
author_email='me@kevinmccarthy.org',
url='https://github.com/kevin1024/vcrpy',
packages=find_packages(exclude=("tests*",)),
install_requires=['PyYAML', 'wrapt', 'six>=1.5'],
extras_require = {
':python_version in "2.4, 2.5, 2.6"':
['contextlib2', 'backport_collections', 'mock'],
':python_version in "2.7, 3.1, 3.2"': ['contextlib2', 'mock'],
},
install_requires=install_requires,
extras_require=extras_require,
license='MIT',
tests_require=['pytest', 'mock', 'pytest-localserver'],
cmdclass={'test': PyTest},

View File

@@ -5,6 +5,7 @@ import json
import pytest
import vcr
from vcr.errors import CannotOverwriteExistingCassetteException
from assertions import assert_cassette_empty, assert_is_json
@@ -203,3 +204,97 @@ def test_https_with_cert_validation_disabled(get_client, tmpdir):
with vcr.use_cassette(cass_path) as cass:
yield get(get_client(), 'https://httpbin.org', validate_cert=False)
assert 1 == cass.play_count
@pytest.mark.gen_test
def test_unsupported_features_raises_in_future(get_client, tmpdir):
'''Ensure that the exception for an AsyncHTTPClient feature not being
supported is raised inside the future.'''
def callback(chunk):
assert False, "Did not expect to be called."
with vcr.use_cassette(str(tmpdir.join('invalid.yaml'))):
future = get(
get_client(), 'http://httpbin.org', streaming_callback=callback
)
with pytest.raises(Exception) as excinfo:
yield future
assert "not yet supported by VCR" in str(excinfo)
@pytest.mark.gen_test
def test_unsupported_features_raise_error_disabled(get_client, tmpdir):
'''Ensure that the exception for an AsyncHTTPClient feature not being
supported is not raised if raise_error=False.'''
def callback(chunk):
assert False, "Did not expect to be called."
with vcr.use_cassette(str(tmpdir.join('invalid.yaml'))):
response = yield get(
get_client(),
'http://httpbin.org',
streaming_callback=callback,
raise_error=False,
)
assert "not yet supported by VCR" in str(response.error)
@pytest.mark.gen_test
def test_cannot_overwrite_cassette_raises_in_future(get_client, tmpdir):
'''Ensure that CannotOverwriteExistingCassetteException is raised inside
the future.'''
with vcr.use_cassette(str(tmpdir.join('overwrite.yaml'))):
yield get(get_client(), 'http://httpbin.org/get')
with vcr.use_cassette(str(tmpdir.join('overwrite.yaml'))):
future = get(get_client(), 'http://httpbin.org/headers')
with pytest.raises(CannotOverwriteExistingCassetteException):
yield future
@pytest.mark.gen_test
def test_cannot_overwrite_cassette_raise_error_disabled(get_client, tmpdir):
'''Ensure that CannotOverwriteExistingCassetteException is not raised if
raise_error=False in the fetch() call.'''
with vcr.use_cassette(str(tmpdir.join('overwrite.yaml'))):
yield get(
get_client(), 'http://httpbin.org/get', raise_error=False
)
with vcr.use_cassette(str(tmpdir.join('overwrite.yaml'))):
response = yield get(
get_client(), 'http://httpbin.org/headers', raise_error=False
)
assert isinstance(response.error, CannotOverwriteExistingCassetteException)
@pytest.mark.gen_test
@vcr.use_cassette(path_transformer=vcr.default_vcr.ensure_suffix('.yaml'))
def test_tornado_with_decorator_use_cassette(get_client):
response = yield get_client().fetch(
http.HTTPRequest('http://www.google.com/', method='GET')
)
assert response.body.decode('utf-8') == "not actually google"
@pytest.mark.gen_test
@vcr.use_cassette(path_transformer=vcr.default_vcr.ensure_suffix('.yaml'))
def test_tornado_exception_can_be_caught(get_client):
try:
yield get(get_client(), 'http://httpbin.org/status/500')
except http.HTTPError as e:
assert e.code == 500
try:
yield get(get_client(), 'http://httpbin.org/status/404')
except http.HTTPError as e:
assert e.code == 404

View File

@@ -0,0 +1,62 @@
interactions:
- request:
body: null
headers: {}
method: GET
uri: http://httpbin.org/status/500
response:
body: {string: !!python/unicode ''}
headers:
- !!python/tuple
- Content-Length
- ['0']
- !!python/tuple
- Server
- [nginx]
- !!python/tuple
- Connection
- [close]
- !!python/tuple
- Access-Control-Allow-Credentials
- ['true']
- !!python/tuple
- Date
- ['Thu, 30 Jul 2015 17:32:39 GMT']
- !!python/tuple
- Access-Control-Allow-Origin
- ['*']
- !!python/tuple
- Content-Type
- [text/html; charset=utf-8]
status: {code: 500, message: INTERNAL SERVER ERROR}
- request:
body: null
headers: {}
method: GET
uri: http://httpbin.org/status/404
response:
body: {string: !!python/unicode ''}
headers:
- !!python/tuple
- Content-Length
- ['0']
- !!python/tuple
- Server
- [nginx]
- !!python/tuple
- Connection
- [close]
- !!python/tuple
- Access-Control-Allow-Credentials
- ['true']
- !!python/tuple
- Date
- ['Thu, 30 Jul 2015 17:32:39 GMT']
- !!python/tuple
- Access-Control-Allow-Origin
- ['*']
- !!python/tuple
- Content-Type
- [text/html; charset=utf-8]
status: {code: 404, message: NOT FOUND}
version: 1

View File

@@ -0,0 +1,53 @@
interactions:
- request:
body: null
headers: {}
method: GET
uri: http://www.google.com/
response:
body: {string: !!python/unicode 'not actually google'}
headers:
- !!python/tuple
- Expires
- ['-1']
- !!python/tuple
- Connection
- [close]
- !!python/tuple
- P3p
- ['CP="This is not a P3P policy! See http://www.google.com/support/accounts/bin/answer.py?hl=en&answer=151657
for more info."']
- !!python/tuple
- Alternate-Protocol
- ['80:quic,p=0']
- !!python/tuple
- Accept-Ranges
- [none]
- !!python/tuple
- X-Xss-Protection
- [1; mode=block]
- !!python/tuple
- Vary
- [Accept-Encoding]
- !!python/tuple
- Date
- ['Thu, 30 Jul 2015 08:41:40 GMT']
- !!python/tuple
- Cache-Control
- ['private, max-age=0']
- !!python/tuple
- Content-Type
- [text/html; charset=ISO-8859-1]
- !!python/tuple
- Set-Cookie
- ['PREF=ID=1111111111111111:FF=0:TM=1438245700:LM=1438245700:V=1:S=GAzVO0ALebSpC_cJ;
expires=Sat, 29-Jul-2017 08:41:40 GMT; path=/; domain=.google.com', 'NID=69=Br7oRAwgmKoK__HC6FEnuxglTFDmFxqP6Md63lKhzW1w6WkDbp3U90CDxnUKvDP6wJH8yxY5Lk5ZnFf66Q1B0d4OsYoKgq0vjfBAYXuCIAWtOuGZEOsFXanXs7pt2Mjx;
expires=Fri, 29-Jan-2016 08:41:40 GMT; path=/; domain=.google.com; HttpOnly']
- !!python/tuple
- X-Frame-Options
- [SAMEORIGIN]
- !!python/tuple
- Server
- [gws]
status: {code: 200, message: OK}
version: 1

View File

@@ -253,3 +253,37 @@ def test_func_path_generator():
def function_name(cassette):
assert cassette._path == os.path.join(os.path.dirname(__file__), 'function_name')
function_name()
def test_use_as_decorator_on_coroutine():
original_http_connetion = httplib.HTTPConnection
@Cassette.use(inject=True)
def test_function(cassette):
assert httplib.HTTPConnection.cassette is cassette
assert httplib.HTTPConnection is not original_http_connetion
value = yield 1
assert value == 1
assert httplib.HTTPConnection.cassette is cassette
assert httplib.HTTPConnection is not original_http_connetion
value = yield 2
assert value == 2
coroutine = test_function()
value = next(coroutine)
while True:
try:
value = coroutine.send(value)
except StopIteration:
break
def test_use_as_decorator_on_generator():
original_http_connetion = httplib.HTTPConnection
@Cassette.use(inject=True)
def test_function(cassette):
assert httplib.HTTPConnection.cassette is cassette
assert httplib.HTTPConnection is not original_http_connetion
yield 1
assert httplib.HTTPConnection.cassette is cassette
assert httplib.HTTPConnection is not original_http_connetion
yield 2
assert list(test_function()) == [1, 2]

View File

@@ -1,5 +1,7 @@
import itertools
import pytest
from vcr import matchers
from vcr import request
@@ -35,6 +37,107 @@ def test_uri_matcher():
assert matched
req1_body = (b"<?xml version='1.0'?><methodCall><methodName>test</methodName>"
b"<params><param><value><array><data><value><struct>"
b"<member><name>a</name><value><string>1</string></value></member>"
b"<member><name>b</name><value><string>2</string></value></member>"
b"</struct></value></data></array></value></param></params></methodCall>")
req2_body = (b"<?xml version='1.0'?><methodCall><methodName>test</methodName>"
b"<params><param><value><array><data><value><struct>"
b"<member><name>b</name><value><string>2</string></value></member>"
b"<member><name>a</name><value><string>1</string></value></member>"
b"</struct></value></data></array></value></param></params></methodCall>")
@pytest.mark.parametrize("r1, r2", [
(
request.Request('POST', 'http://host.com/', '123', {}),
request.Request('POST', 'http://another-host.com/',
'123', {'Some-Header': 'value'})
),
(
request.Request('POST', 'http://host.com/', 'a=1&b=2',
{'Content-Type': 'application/x-www-form-urlencoded'}),
request.Request('POST', 'http://host.com/', 'b=2&a=1',
{'Content-Type': 'application/x-www-form-urlencoded'})
),
(
request.Request('POST', 'http://host.com/', '123', {}),
request.Request('POST', 'http://another-host.com/', '123', {'Some-Header': 'value'})
),
(
request.Request(
'POST', 'http://host.com/', 'a=1&b=2',
{'Content-Type': 'application/x-www-form-urlencoded'}
),
request.Request(
'POST', 'http://host.com/', 'b=2&a=1',
{'Content-Type': 'application/x-www-form-urlencoded'}
)
),
(
request.Request(
'POST', 'http://host.com/', '{"a": 1, "b": 2}',
{'Content-Type': 'application/json'}
),
request.Request(
'POST', 'http://host.com/', '{"b": 2, "a": 1}',
{'content-type': 'application/json'}
)
),
(
request.Request(
'POST', 'http://host.com/', req1_body,
{'User-Agent': 'xmlrpclib', 'Content-Type': 'text/xml'}
),
request.Request(
'POST', 'http://host.com/', req2_body,
{'user-agent': 'somexmlrpc', 'content-type': 'text/xml'}
)
),
(
request.Request(
'POST', 'http://host.com/',
'{"a": 1, "b": 2}', {'Content-Type': 'application/json'}
),
request.Request(
'POST', 'http://host.com/',
'{"b": 2, "a": 1}', {'content-type': 'application/json'}
)
)
])
def test_body_matcher_does_match(r1, r2):
assert matchers.body(r1, r2)
@pytest.mark.parametrize("r1, r2", [
(
request.Request('POST', 'http://host.com/', '{"a": 1, "b": 2}', {}),
request.Request('POST', 'http://host.com/', '{"b": 2, "a": 1}', {}),
),
(
request.Request(
'POST', 'http://host.com/',
'{"a": 1, "b": 3}', {'Content-Type': 'application/json'}
),
request.Request(
'POST', 'http://host.com/',
'{"b": 2, "a": 1}', {'content-type': 'application/json'}
)
),
(
request.Request(
'POST', 'http://host.com/', req1_body, {'Content-Type': 'text/xml'}
),
request.Request(
'POST', 'http://host.com/', req2_body, {'content-type': 'text/xml'}
)
)
])
def test_body_match_does_not_match(r1, r2):
assert not matchers.body(r1, r2)
def test_query_matcher():
req1 = request.Request('GET', 'http://host.com/?a=b&c=d', '', {})
req2 = request.Request('GET', 'http://host.com/?c=d&a=b', '', {})

View File

@@ -1,3 +1,4 @@
# -*- encoding: utf-8 -*-
import pytest
from vcr.compat import mock
@@ -27,6 +28,55 @@ def test_deserialize_new_json_cassette():
deserialize(f.read(), jsonserializer)
REQBODY_TEMPLATE = u'''\
interactions:
- request:
body: {req_body}
headers:
Content-Type: [application/x-www-form-urlencoded]
Host: [httpbin.org]
method: POST
uri: http://httpbin.org/post
response:
body: {{string: ""}}
headers:
content-length: ['0']
content-type: [application/json]
status: {{code: 200, message: OK}}
'''
# A cassette generated under Python 2 stores the request body as a string,
# but the same cassette generated under Python 3 stores it as "!!binary".
# Make sure we accept both forms, regardless of whether we're running under
# Python 2 or 3.
@pytest.mark.parametrize("req_body, expect", [
# Cassette written under Python 2 (pure ASCII body)
('x=5&y=2', b'x=5&y=2'),
# Cassette written under Python 3 (pure ASCII body)
('!!binary |\n eD01Jnk9Mg==', b'x=5&y=2'),
# Request body has non-ASCII chars (x=föo&y=2), encoded in UTF-8.
('!!python/str "x=f\\xF6o&y=2"', b'x=f\xc3\xb6o&y=2'),
('!!binary |\n eD1mw7ZvJnk9Mg==', b'x=f\xc3\xb6o&y=2'),
# Same request body, this time encoded in UTF-16. In this case, we
# write the same YAML file under both Python 2 and 3, so there's only
# one test case here.
('!!binary |\n //54AD0AZgD2AG8AJgB5AD0AMgA=',
b'\xff\xfex\x00=\x00f\x00\xf6\x00o\x00&\x00y\x00=\x002\x00'),
# Same again, this time encoded in ISO-8859-1.
('!!binary |\n eD1m9m8meT0y', b'x=f\xf6o&y=2'),
])
def test_deserialize_py2py3_yaml_cassette(tmpdir, req_body, expect):
cfile = tmpdir.join('test_cassette.yaml')
cfile.write(REQBODY_TEMPLATE.format(req_body=req_body))
with open(str(cfile)) as f:
(requests, responses) = deserialize(f.read(), yamlserializer)
assert requests[0].body == expect
@mock.patch.object(jsonserializer.json, 'dumps',
side_effect=UnicodeDecodeError('utf-8', b'unicode error in serialization',
0, 10, 'blew up'))

View File

@@ -1,11 +1,9 @@
"""The container for recorded requests and responses"""
import functools
import sys
import inspect
import logging
import wrapt
# Internal imports
from .compat import contextlib, collections
from .errors import UnhandledHTTPRequestError
from .matchers import requests_match, uri, method
@@ -50,14 +48,6 @@ class CassetteContextDecorator(object):
# somewhere else.
cassette._save()
@classmethod
def key_predicate(cls, key, value):
return key in cls._non_cassette_arguments
@classmethod
def _split_keys(cls, kwargs):
return partition_dict(cls.key_predicate, kwargs)
def __enter__(self):
# This assertion is here to prevent the dangerous behavior
# that would result from forgetting about a __finish before
@@ -68,7 +58,10 @@ class CassetteContextDecorator(object):
# with context_decorator:
# pass
assert self.__finish is None, "Cassette already open."
other_kwargs, cassette_kwargs = self._split_keys(self._args_getter())
other_kwargs, cassette_kwargs = partition_dict(
lambda key, _: key in self._non_cassette_arguments,
self._args_getter()
)
if 'path_transformer' in other_kwargs:
transformer = other_kwargs['path_transformer']
cassette_kwargs['path'] = transformer(cassette_kwargs['path'])
@@ -84,27 +77,53 @@ class CassetteContextDecorator(object):
# This awkward cloning thing is done to ensure that decorated
# functions are reentrant. This is required for thread
# safety and the correct operation of recursive functions.
args_getter = self._build_args_getter_for_decorator(
function, self._args_getter
)
clone = type(self)(self.cls, args_getter)
with clone as cassette:
if cassette.inject:
return function(cassette, *args, **kwargs)
else:
return function(*args, **kwargs)
args_getter = self._build_args_getter_for_decorator(function)
return type(self)(self.cls, args_getter)._execute_function(function, args, kwargs)
def _execute_function(self, function, args, kwargs):
if inspect.isgeneratorfunction(function):
handler = self._handle_coroutine
else:
handler = self._handle_function
return handler(function, args, kwargs)
def _handle_coroutine(self, function, args, kwargs):
"""Wraps a coroutine so that we're inside the cassette context for the
duration of the coroutine.
"""
with self as cassette:
coroutine = self.__handle_function(cassette, function, args, kwargs)
# We don't need to catch StopIteration. The caller (Tornado's
# gen.coroutine, for example) will handle that.
to_yield = next(coroutine)
while True:
try:
to_send = yield to_yield
except Exception:
to_yield = coroutine.throw(*sys.exc_info())
else:
to_yield = coroutine.send(to_send)
def __handle_function(self, cassette, function, args, kwargs):
if cassette.inject:
return function(cassette, *args, **kwargs)
else:
return function(*args, **kwargs)
def _handle_function(self, function, args, kwargs):
with self as cassette:
self.__handle_function(cassette, function, args, kwargs)
@staticmethod
def get_function_name(function):
return function.__name__
@classmethod
def _build_args_getter_for_decorator(cls, function, args_getter):
def _build_args_getter_for_decorator(self, function):
def new_args_getter():
kwargs = args_getter()
kwargs = self._args_getter()
if 'path' not in kwargs:
name_generator = (kwargs.get('func_path_generator') or
cls.get_function_name)
self.get_function_name)
path = name_generator(function)
kwargs['path'] = path
return kwargs

View File

@@ -47,6 +47,7 @@ class VCR(object):
'path': matchers.path,
'query': matchers.query,
'headers': matchers.headers,
'raw_body': matchers.raw_body,
'body': matchers.body,
}
self.record_mode = record_mode
@@ -106,7 +107,7 @@ class VCR(object):
matcher_names = kwargs.get('match_on', self.match_on)
path_transformer = kwargs.get(
'path_transformer',
self.path_transformer
self.path_transformer or self.ensure_suffix('.yaml')
)
func_path_generator = kwargs.get(
'func_path_generator',

View File

@@ -1,3 +1,6 @@
import json
from six.moves import urllib, xmlrpc_client
from .util import CaseInsensitiveDict, read_body
import logging
log = logging.getLogger(__name__)
@@ -30,10 +33,50 @@ def query(r1, r2):
return r1.query == r2.query
def raw_body(r1, r2):
return read_body(r1) == read_body(r2)
def _header_checker(value, header='Content-Type'):
def checker(headers):
return value in headers.get(header, '').lower()
return checker
def _transform_json(body):
# Request body is always a byte string, but json.loads() wants a text
# string. RFC 7159 says the default encoding is UTF-8 (although UTF-16
# and UTF-32 are also allowed: hmmmmm).
return json.loads(body.decode('utf-8'))
_xml_header_checker = _header_checker('text/xml')
_xmlrpc_header_checker = _header_checker('xmlrpc', header='User-Agent')
_checker_transformer_pairs = (
(_header_checker('application/x-www-form-urlencoded'), urllib.parse.parse_qs),
(_header_checker('application/json'), _transform_json),
(lambda request: _xml_header_checker(request) and _xmlrpc_header_checker(request), xmlrpc_client.loads),
)
def _identity(x):
return x
def _get_transformer(request):
headers = CaseInsensitiveDict(request.headers)
for checker, transformer in _checker_transformer_pairs:
if checker(headers): return transformer
else:
return _identity
def body(r1, r2):
if hasattr(r1.body, 'read') and hasattr(r2.body, 'read'):
return r1.body.read() == r2.body.read()
return r1.body == r2.body
transformer = _get_transformer(r1)
r2_transformer = _get_transformer(r2)
if transformer != r2_transformer:
transformer = _identity
return transformer(read_body(r1)) == transformer(read_body(r2))
def headers(r1, r2):

View File

@@ -1,4 +1,4 @@
from six import BytesIO, binary_type
from six import BytesIO, text_type
from six.moves.urllib.parse import urlparse, parse_qsl
@@ -29,11 +29,9 @@ class Request(object):
self.uri = uri
self._was_file = hasattr(body, 'read')
if self._was_file:
self._body = body.read()
if not isinstance(self._body, binary_type):
self._body = self._body.encode('utf-8')
self.body = body.read()
else:
self._body = body
self.body = body
self.headers = {}
for key in headers:
self.add_header(key, headers[key])
@@ -44,6 +42,8 @@ class Request(object):
@body.setter
def body(self, value):
if isinstance(value, text_type):
value = value.encode('utf-8')
self._body = value
def add_header(self, key, value):

View File

@@ -65,6 +65,7 @@ class _VCRAsyncClient(object):
"request outside a VCR.py context." % repr(request)
),
)
return callback(response)
vcr_request = Request(
request.method,
@@ -90,7 +91,7 @@ class _VCRAsyncClient(object):
headers=headers,
buffer=BytesIO(vcr_response['body']['string']),
)
callback(response)
return callback(response)
else:
if self.cassette.write_protected and self.cassette.filter_request(
vcr_request
@@ -106,7 +107,7 @@ class _VCRAsyncClient(object):
self.cassette.record_mode)
),
)
callback(response)
return callback(response)
def new_callback(response):
headers = [
@@ -123,7 +124,7 @@ class _VCRAsyncClient(object):
'body': {'string': response.body},
}
self.cassette.append(vcr_request, vcr_response)
callback(response)
return callback(response)
from vcr.patch import force_reset
with force_reset():

View File

@@ -1,3 +1,74 @@
import collections
# Shamelessly stolen from https://github.com/kennethreitz/requests/blob/master/requests/structures.py
class CaseInsensitiveDict(collections.MutableMapping):
"""
A case-insensitive ``dict``-like object.
Implements all methods and operations of
``collections.MutableMapping`` as well as dict's ``copy``. Also
provides ``lower_items``.
All keys are expected to be strings. The structure remembers the
case of the last key to be set, and ``iter(instance)``,
``keys()``, ``items()``, ``iterkeys()``, and ``iteritems()``
will contain case-sensitive keys. However, querying and contains
testing is case insensitive::
cid = CaseInsensitiveDict()
cid['Accept'] = 'application/json'
cid['aCCEPT'] == 'application/json' # True
list(cid) == ['Accept'] # True
For example, ``headers['content-encoding']`` will return the
value of a ``'Content-Encoding'`` response header, regardless
of how the header name was originally stored.
If the constructor, ``.update``, or equality comparison
operations are given keys that have equal ``.lower()``s, the
behavior is undefined.
"""
def __init__(self, data=None, **kwargs):
self._store = dict()
if data is None:
data = {}
self.update(data, **kwargs)
def __setitem__(self, key, value):
# Use the lowercased key for lookups, but store the actual
# key alongside the value.
self._store[key.lower()] = (key, value)
def __getitem__(self, key):
return self._store[key.lower()][1]
def __delitem__(self, key):
del self._store[key.lower()]
def __iter__(self):
return (casedkey for casedkey, mappedvalue in self._store.values())
def __len__(self):
return len(self._store)
def lower_items(self):
"""Like iteritems(), but with all lowercase keys."""
return (
(lowerkey, keyval[1])
for (lowerkey, keyval)
in self._store.items()
)
def __eq__(self, other):
if isinstance(other, collections.Mapping):
other = CaseInsensitiveDict(other)
else:
return NotImplemented
# Compare insensitively
return dict(self.lower_items()) == dict(other.lower_items())
# Copy is required
def copy(self):
return CaseInsensitiveDict(self._store.values())
def __repr__(self):
return str(dict(self.items()))
def partition_dict(predicate, dictionary):
true_dict = {}
false_dict = {}
@@ -14,3 +85,8 @@ def compose(*functions):
res = function(res)
return res
return composed
def read_body(request):
if hasattr(request.body, 'read'):
return request.body.read()
return request.body