1
0
mirror of https://github.com/kevin1024/vcrpy.git synced 2025-12-08 16:53:23 +00:00

Compare commits

...

21 Commits

Author SHA1 Message Date
Ivan Malison
8bb3c6beee v1.4.0 2015-04-02 12:20:16 -07:00
Ivan Malison
df3ad5f35c remove compat.py in favor of backport_collections. 2015-04-02 10:32:34 -07:00
Ivan Malison
e8a6a7a49f add backport_collections, tweaks to setup.py. 2015-04-02 10:23:22 -07:00
Ivan Malison
881138cb8d inject_cassette fallout. 2015-04-01 17:46:35 -07:00
Ivan Malison
639dba6f7a Write test for #145 that checks behavior of with_current_defaults. 2015-04-01 17:30:05 -07:00
Ivan Malison
b9bdc6401d inject_cassette kwarg. 2015-04-01 17:30:05 -07:00
Ivan Malison
3ca5529d26 Touch ups. 2015-04-01 15:38:59 -07:00
Sam Stavinoha
e3f2bc8369 fix with_current_defaults causing TypeError
The from_args() method in cassette.py was
throwing a TypeError when calling

    use_cassette(..., with_current_defaults=True)
    ...
    TypeError: from_args() takes exactly 3 arguments (4 given)

The path was then being passed to use() twice.
2015-03-31 21:08:26 -05:00
Edward Stone
fc4e985ee9 fallback to compat OrderedDict if collections.OrderedDict unavailable 2015-03-31 13:12:13 -07:00
Edward Stone
9038bc9066 fix docs for post data filter 2015-03-31 13:12:13 -07:00
Edward Stone
0def349420 Add ability to filter post data parameters 2015-03-31 13:12:13 -07:00
Ivan Malison
0dd7b05990 Get rid of all the constructor parameters that were removed in 0871c3b87c 2015-03-31 13:03:11 -07:00
Ivan Malison
630088599f Update copyright. 2015-03-31 13:02:59 -07:00
Ivan Malison
870ab276c4 Possible fix for #140. 2015-03-25 13:01:55 -07:00
Ivan 'Goat' Malison
779f3b0474 Merge pull request #141 from IvanMalison/post_files_through_requests
Add support for posting files through requests. closes #121
2015-03-24 16:43:13 -07:00
Ivan Malison
b948ed4857 Fix python3 support for requests file uploads. 2015-03-24 15:41:14 -07:00
Ivan Malison
c43e618635 Add mention of urllib3 support in readme. 2015-03-24 14:24:47 -07:00
Ivan Malison
5bd40a447a Add setter to body on vcr's request. 2015-03-24 14:11:16 -07:00
Ivan Malison
4b4be7f661 Don't use 2.7+ style ',' separated with. 2015-03-24 14:11:16 -07:00
Ivan Malison
6602a449b1 Add support for posting files through requests. closes #121. Possibly #134. 2015-03-24 14:11:16 -07:00
Ivan Malison
7cd7264034 fix tox/urllib3 stuff. 2015-03-24 14:10:14 -07:00
18 changed files with 209 additions and 502 deletions

View File

@@ -1,4 +1,4 @@
Copyright (c) 2012-2014 Kevin McCarthy
Copyright (c) 2012-2015 Kevin McCarthy
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

View File

@@ -24,6 +24,7 @@ VCR.py supports Python 2.6 and 2.7, 3.3, 3.4, and [pypy](http://pypy.org).
The following http libraries are supported:
* urllib2
* urllib3
* http.client (python3)
* requests (both 1.x and 2.x versions)
* httplib2
@@ -293,9 +294,18 @@ with my_vcr.use_cassette('test.yml', filter_query_parameters=['api_key']):
requests.get('http://api.com/getdata?api_key=secretstring')
```
### Filter information from HTTP post data
Use the `filter_post_data_parameters` configuration option with a list of post data
parameters to filter.
```python
with my_vcr.use_cassette('test.yml', filter_post_data_parameters=['client_secret']):
requests.post('http://api.com/postdata', data={'api_key': 'secretstring'})
```
### Custom Request filtering
If neither of these covers your request filtering needs, you can register a callback
If none of these covers your request filtering needs, you can register a callback
that will manipulate the HTTP request before adding it to the cassette. Use the
`before_record` configuration option to so this. Here is an example that will
never record requests to the /login endpoint.
@@ -474,6 +484,10 @@ API in version 1.0.x
## Changelog
* 1.4.0 Filter post data parameters (thanks @eadmundo), support for
posting files through requests, inject_cassette kwarg to access
cassette from `use_cassette` decorated function,
`with_current_defaults` actually works (thanks @samstav).
* 1.3.0 Fix/add support for urllib3 (thanks @aisch), fix default
port for https (thanks @abhinav).
* 1.2.0 Add custom_patches argument to VCR/Cassette objects to allow

View File

@@ -1,7 +1,7 @@
##!/usr/bin/env python
#!/usr/bin/env python
import sys
from setuptools import setup
from setuptools import setup, find_packages
from setuptools.command.test import test as TestCommand
@@ -20,7 +20,7 @@ class PyTest(TestCommand):
setup(
name='vcrpy',
version='1.3.0',
version='1.4.0',
description=(
"Automatically mock your HTTP interactions to simplify and "
"speed up testing"
@@ -28,20 +28,9 @@ setup(
author='Kevin McCarthy',
author_email='me@kevinmccarthy.org',
url='https://github.com/kevin1024/vcrpy',
packages=[
'vcr',
'vcr.stubs',
'vcr.compat',
'vcr.persisters',
'vcr.serializers',
],
package_dir={
'vcr': 'vcr',
'vcr.stubs': 'vcr/stubs',
'vcr.compat': 'vcr/compat',
'vcr.persisters': 'vcr/persisters',
},
install_requires=['PyYAML', 'mock', 'six', 'contextlib2', 'wrapt'],
packages=find_packages(exclude=("tests*",)),
install_requires=['PyYAML', 'mock', 'six', 'contextlib2',
'wrapt', 'backport_collections'],
license='MIT',
tests_require=['pytest', 'mock', 'pytest-localserver'],
cmdclass={'test': PyTest},
@@ -54,5 +43,5 @@ setup(
'Topic :: Software Development :: Testing',
'Topic :: Internet :: WWW/HTTP',
'License :: OSI Approved :: MIT License',
],
]
)

View File

@@ -1,6 +1,7 @@
import base64
import pytest
from six.moves.urllib.request import urlopen, Request
from six.moves.urllib.parse import urlencode
from six.moves.urllib.error import HTTPError
import vcr
@@ -55,6 +56,16 @@ def test_filter_querystring(tmpdir):
assert 'foo' not in cass.requests[0].url
def test_filter_post_data(tmpdir):
url = 'http://httpbin.org/post'
data = urlencode({'id': 'secret', 'foo': 'bar'}).encode('utf-8')
cass_file = str(tmpdir.join('filter_pd.yaml'))
with vcr.use_cassette(cass_file, filter_post_data_parameters=['id']):
urlopen(url, data)
with vcr.use_cassette(cass_file, filter_post_data_parameters=['id']) as cass:
assert b'id=secret' not in cass.requests[0].body
def test_filter_callback(tmpdir):
url = 'http://httpbin.org/get'
cass_file = str(tmpdir.join('basic_auth_filter.yaml'))

View File

@@ -199,3 +199,21 @@ def test_nested_cassettes_with_session_created_before_nesting(scheme, tmpdir):
# Make sure that the session can now get content normally.
session.get('http://www.reddit.com')
def test_post_file(tmpdir, scheme):
'''Ensure that we handle posting a file.'''
url = scheme + '://httpbin.org/post'
with vcr.use_cassette(str(tmpdir.join('post_file.yaml'))) as cass:
# Don't use 2.7+ only style ',' separated with here because we support python 2.6
with open('tox.ini') as f:
original_response = requests.post(url, f).content
# This also tests that we do the right thing with matching the body when they are files.
with vcr.use_cassette(str(tmpdir.join('post_file.yaml')),
match_on=('method', 'scheme', 'host', 'port', 'path', 'query', 'body')) as cass:
with open('tox.ini', 'rb') as f:
tox_content = f.read()
assert cass.requests[0].body.read() == tox_content
with open('tox.ini', 'rb') as f:
new_response = requests.post(url, f).content
assert original_response == new_response

View File

@@ -110,7 +110,7 @@ def test_arg_getter_functionality():
def function():
pass
with mock.patch.object(Cassette, 'load') as cassette_load:
with mock.patch.object(Cassette, 'load', return_value=mock.MagicMock(inject=False)) as cassette_load:
function()
cassette_load.assert_called_once_with(arg_getter.return_value[0],
**arg_getter.return_value[1])

View File

@@ -1,4 +1,8 @@
from vcr.filters import remove_headers, remove_query_parameters
from vcr.filters import (
remove_headers,
remove_query_parameters,
remove_post_data_parameters
)
from vcr.request import Request
@@ -35,3 +39,31 @@ def test_remove_nonexistent_query_parameters():
request = Request('GET', uri, '', {})
remove_query_parameters(request, ['w', 'q'])
assert request.uri == 'http://g.com/'
def test_remove_post_data_parameters():
body = b'id=secret&foo=bar'
request = Request('POST', 'http://google.com', body, {})
remove_post_data_parameters(request, ['id'])
assert request.body == b'foo=bar'
def test_preserve_multiple_post_data_parameters():
body = b'id=secret&foo=bar&foo=baz'
request = Request('POST', 'http://google.com', body, {})
remove_post_data_parameters(request, ['id'])
assert request.body == b'foo=bar&foo=baz'
def test_remove_all_post_data_parameters():
body = b'id=secret&foo=bar'
request = Request('POST', 'http://google.com', body, {})
remove_post_data_parameters(request, ['id', 'foo'])
assert request.body == b''
def test_remove_nonexistent_post_data_parameters():
body = b''
request = Request('POST', 'http://google.com', body, {})
remove_post_data_parameters(request, ['id'])
assert request.body == b''

View File

@@ -9,7 +9,7 @@ from vcr.stubs import VCRHTTPSConnection
def test_vcr_use_cassette():
record_mode = mock.Mock()
test_vcr = VCR(record_mode=record_mode)
with mock.patch('vcr.cassette.Cassette.load') as mock_cassette_load:
with mock.patch('vcr.cassette.Cassette.load', return_value=mock.MagicMock(inject=False)) as mock_cassette_load:
@test_vcr.use_cassette('test')
def function():
pass
@@ -70,10 +70,11 @@ def test_fixtures_with_use_cassette(random_fixture):
# problems if the decorator does not preserve the signature of the original
# test function.
# This test ensures that use_cassette preserves the signature of the original
# test function, and thus that use_cassette is compatible with py.test
# fixtures. It is admittedly a bit strange because the test would never even
# run if the relevant feature were broken.
# This test ensures that use_cassette preserves the signature of
# the original test function, and thus that use_cassette is
# compatible with py.test fixtures. It is admittedly a bit strange
# because the test would never even run if the relevant feature
# were broken.
pass
@@ -90,3 +91,40 @@ def test_custom_patchers():
assert issubclass(Test.attribute, VCRHTTPSConnection)
assert VCRHTTPSConnection is not Test.attribute
assert Test.attribute is Test.attribute2
def test_inject_cassette():
vcr = VCR(inject_cassette=True)
@vcr.use_cassette('test', record_mode='once')
def with_cassette_injected(cassette):
assert cassette.record_mode == 'once'
@vcr.use_cassette('test', record_mode='once', inject_cassette=False)
def without_cassette_injected():
pass
with_cassette_injected()
without_cassette_injected()
def test_with_current_defaults():
vcr = VCR(inject_cassette=True, record_mode='once')
@vcr.use_cassette('test', with_current_defaults=False)
def changing_defaults(cassette, checks):
checks(cassette)
@vcr.use_cassette('test', with_current_defaults=True)
def current_defaults(cassette, checks):
checks(cassette)
def assert_record_mode_once(cassette):
assert cassette.record_mode == 'once'
def assert_record_mode_all(cassette):
assert cassette.record_mode == 'all'
changing_defaults(assert_record_mode_once)
current_defaults(assert_record_mode_once)
vcr.record_mode = 'all'
changing_defaults(assert_record_mode_all)
current_defaults(assert_record_mode_once)

View File

@@ -1,5 +1,5 @@
[tox]
envlist = {py26,py27,py33,py34,pypy}-{requests25,requests24,requests23,requests22,requests1,httplib2,urllib3,boto}
envlist = {py26,py27,py33,py34,pypy}-{requests25,requests24,requests23,requests22,requests1,httplib2,urllib317,urllib319,urllib3110,boto}
[testenv]
commands =

View File

@@ -6,7 +6,7 @@ import wrapt
try:
from collections import Counter
except ImportError:
from .compat.counter import Counter
from backport_collections import Counter
# Internal imports
from .patch import CassettePatcherBuilder
@@ -61,8 +61,11 @@ class CassetteContextDecorator(object):
@wrapt.decorator
def __call__(self, function, instance, args, kwargs):
with self:
return function(*args, **kwargs)
with self as cassette:
if cassette.inject:
return function(cassette, *args, **kwargs)
else:
return function(*args, **kwargs)
class Cassette(object):
@@ -84,23 +87,24 @@ class Cassette(object):
return CassetteContextDecorator.from_args(cls, *args, **kwargs)
def __init__(self, path, serializer=yamlserializer, record_mode='once',
match_on=(uri, method), filter_headers=(),
filter_query_parameters=(), before_record_request=None,
before_record_response=None, ignore_hosts=(),
ignore_localhost=(), custom_patches=()):
match_on=(uri, method), before_record_request=None,
before_record_response=None, custom_patches=(),
inject=False):
self._path = path
self._serializer = serializer
self._match_on = match_on
self._before_record_request = before_record_request or (lambda x: x)
self._before_record_response = before_record_response or (lambda x: x)
self.inject = inject
self.record_mode = record_mode
self.custom_patches = custom_patches
# self.data is the list of (req, resp) tuples
self.data = []
self.play_counts = Counter()
self.dirty = False
self.rewound = False
self.record_mode = record_mode
self.custom_patches = custom_patches
@property
def play_count(self):

View File

View File

@@ -1,194 +0,0 @@
from __future__ import print_function
from operator import itemgetter
from heapq import nlargest
from itertools import repeat, ifilter
# From http://code.activestate.com/recipes/576611-counter-class/
# Backported for python 2.6 support
class Counter(dict):
'''Dict subclass for counting hashable objects. Sometimes called a bag
or multiset. Elements are stored as dictionary keys and their counts
are stored as dictionary values.
>>> Counter('zyzygy')
Counter({'y': 3, 'z': 2, 'g': 1})
'''
def __init__(self, iterable=None, **kwds):
'''Create a new, empty Counter object. And if given, count elements
from an input iterable. Or, initialize the count from another mapping
of elements to their counts.
>>> c = Counter() # a new, empty counter
>>> c = Counter('gallahad') # a new counter from an iterable
>>> c = Counter({'a': 4, 'b': 2}) # a new counter from a mapping
>>> c = Counter(a=4, b=2) # a new counter from keyword args
'''
self.update(iterable, **kwds)
def __missing__(self, key):
return 0
def most_common(self, n=None):
'''List the n most common elements and their counts from the most
common to the least. If n is None, then list all element counts.
>>> Counter('abracadabra').most_common(3)
[('a', 5), ('r', 2), ('b', 2)]
'''
if n is None:
return sorted(self.iteritems(), key=itemgetter(1), reverse=True)
return nlargest(n, self.iteritems(), key=itemgetter(1))
def elements(self):
'''Iterator over elements repeating each as many times as its count.
>>> c = Counter('ABCABC')
>>> sorted(c.elements())
['A', 'A', 'B', 'B', 'C', 'C']
If an element's count has been set to zero or is a negative number,
elements() will ignore it.
'''
for elem, count in self.iteritems():
for _ in repeat(None, count):
yield elem
# Override dict methods where the meaning changes for Counter objects.
@classmethod
def fromkeys(cls, iterable, v=None):
raise NotImplementedError(
'Counter.fromkeys() is undefined. Use Counter(iterable) instead.')
def update(self, iterable=None, **kwds):
'''Like dict.update() but add counts instead of replacing them.
Source can be an iterable, a dictionary, or another Counter instance.
>>> c = Counter('which')
>>> c.update('witch') # add elements from another iterable
>>> d = Counter('watch')
>>> c.update(d) # add elements from another counter
>>> c['h'] # four 'h' in which, witch, and watch
4
'''
if iterable is not None:
if hasattr(iterable, 'iteritems'):
if self:
self_get = self.get
for elem, count in iterable.iteritems():
self[elem] = self_get(elem, 0) + count
else:
dict.update(self, iterable) # fast path when counter is empty
else:
self_get = self.get
for elem in iterable:
self[elem] = self_get(elem, 0) + 1
if kwds:
self.update(kwds)
def copy(self):
'Like dict.copy() but returns a Counter instance instead of a dict.'
return Counter(self)
def __delitem__(self, elem):
'Like dict.__delitem__() but does not raise KeyError for missing values.'
if elem in self:
dict.__delitem__(self, elem)
def __repr__(self):
if not self:
return '%s()' % self.__class__.__name__
items = ', '.join(map('%r: %r'.__mod__, self.most_common()))
return '%s({%s})' % (self.__class__.__name__, items)
# Multiset-style mathematical operations discussed in:
# Knuth TAOCP Volume II section 4.6.3 exercise 19
# and at http://en.wikipedia.org/wiki/Multiset
#
# Outputs guaranteed to only include positive counts.
#
# To strip negative and zero counts, add-in an empty counter:
# c += Counter()
def __add__(self, other):
'''Add counts from two counters.
>>> Counter('abbb') + Counter('bcc')
Counter({'b': 4, 'c': 2, 'a': 1})
'''
if not isinstance(other, Counter):
return NotImplemented
result = Counter()
for elem in set(self) | set(other):
newcount = self[elem] + other[elem]
if newcount > 0:
result[elem] = newcount
return result
def __sub__(self, other):
''' Subtract count, but keep only results with positive counts.
>>> Counter('abbbc') - Counter('bccd')
Counter({'b': 2, 'a': 1})
'''
if not isinstance(other, Counter):
return NotImplemented
result = Counter()
for elem in set(self) | set(other):
newcount = self[elem] - other[elem]
if newcount > 0:
result[elem] = newcount
return result
def __or__(self, other):
'''Union is the maximum of value in either of the input counters.
>>> Counter('abbb') | Counter('bcc')
Counter({'b': 3, 'c': 2, 'a': 1})
'''
if not isinstance(other, Counter):
return NotImplemented
_max = max
result = Counter()
for elem in set(self) | set(other):
newcount = _max(self[elem], other[elem])
if newcount > 0:
result[elem] = newcount
return result
def __and__(self, other):
''' Intersection is the minimum of corresponding counts.
>>> Counter('abbb') & Counter('bcc')
Counter({'b': 1})
'''
if not isinstance(other, Counter):
return NotImplemented
_min = min
result = Counter()
if len(self) < len(other):
self, other = other, self
for elem in ifilter(self.__contains__, other):
newcount = _min(self[elem], other[elem])
if newcount > 0:
result[elem] = newcount
return result
if __name__ == '__main__':
import doctest
print(doctest.testmod())

View File

@@ -1,258 +0,0 @@
# Backport of OrderedDict() class that runs on Python 2.4, 2.5, 2.6, 2.7 and pypy.
# Passes Python2.7's test suite and incorporates all the latest updates.
try:
from thread import get_ident as _get_ident
except ImportError:
from dummy_thread import get_ident as _get_ident
try:
from _abcoll import KeysView, ValuesView, ItemsView
except ImportError:
pass
class OrderedDict(dict):
'Dictionary that remembers insertion order'
# An inherited dict maps keys to values.
# The inherited dict provides __getitem__, __len__, __contains__, and get.
# The remaining methods are order-aware.
# Big-O running times for all methods are the same as for regular dictionaries.
# The internal self.__map dictionary maps keys to links in a doubly linked list.
# The circular doubly linked list starts and ends with a sentinel element.
# The sentinel element never gets deleted (this simplifies the algorithm).
# Each link is stored as a list of length three: [PREV, NEXT, KEY].
def __init__(self, *args, **kwds):
'''Initialize an ordered dictionary. Signature is the same as for
regular dictionaries, but keyword arguments are not recommended
because their insertion order is arbitrary.
'''
if len(args) > 1:
raise TypeError('expected at most 1 arguments, got %d' % len(args))
try:
self.__root
except AttributeError:
self.__root = root = [] # sentinel node
root[:] = [root, root, None]
self.__map = {}
self.__update(*args, **kwds)
def __setitem__(self, key, value, dict_setitem=dict.__setitem__):
'od.__setitem__(i, y) <==> od[i]=y'
# Setting a new item creates a new link which goes at the end of the linked
# list, and the inherited dictionary is updated with the new key/value pair.
if key not in self:
root = self.__root
last = root[0]
last[1] = root[0] = self.__map[key] = [last, root, key]
dict_setitem(self, key, value)
def __delitem__(self, key, dict_delitem=dict.__delitem__):
'od.__delitem__(y) <==> del od[y]'
# Deleting an existing item uses self.__map to find the link which is
# then removed by updating the links in the predecessor and successor nodes.
dict_delitem(self, key)
link_prev, link_next, key = self.__map.pop(key)
link_prev[1] = link_next
link_next[0] = link_prev
def __iter__(self):
'od.__iter__() <==> iter(od)'
root = self.__root
curr = root[1]
while curr is not root:
yield curr[2]
curr = curr[1]
def __reversed__(self):
'od.__reversed__() <==> reversed(od)'
root = self.__root
curr = root[0]
while curr is not root:
yield curr[2]
curr = curr[0]
def clear(self):
'od.clear() -> None. Remove all items from od.'
try:
for node in self.__map.itervalues():
del node[:]
root = self.__root
root[:] = [root, root, None]
self.__map.clear()
except AttributeError:
pass
dict.clear(self)
def popitem(self, last=True):
'''od.popitem() -> (k, v), return and remove a (key, value) pair.
Pairs are returned in LIFO order if last is true or FIFO order if false.
'''
if not self:
raise KeyError('dictionary is empty')
root = self.__root
if last:
link = root[0]
link_prev = link[0]
link_prev[1] = root
root[0] = link_prev
else:
link = root[1]
link_next = link[1]
root[1] = link_next
link_next[0] = root
key = link[2]
del self.__map[key]
value = dict.pop(self, key)
return key, value
# -- the following methods do not depend on the internal structure --
def keys(self):
'od.keys() -> list of keys in od'
return list(self)
def values(self):
'od.values() -> list of values in od'
return [self[key] for key in self]
def items(self):
'od.items() -> list of (key, value) pairs in od'
return [(key, self[key]) for key in self]
def iterkeys(self):
'od.iterkeys() -> an iterator over the keys in od'
return iter(self)
def itervalues(self):
'od.itervalues -> an iterator over the values in od'
for k in self:
yield self[k]
def iteritems(self):
'od.iteritems -> an iterator over the (key, value) items in od'
for k in self:
yield (k, self[k])
def update(*args, **kwds):
'''od.update(E, **F) -> None. Update od from dict/iterable E and F.
If E is a dict instance, does: for k in E: od[k] = E[k]
If E has a .keys() method, does: for k in E.keys(): od[k] = E[k]
Or if E is an iterable of items, does: for k, v in E: od[k] = v
In either case, this is followed by: for k, v in F.items(): od[k] = v
'''
if len(args) > 2:
raise TypeError('update() takes at most 2 positional '
'arguments (%d given)' % (len(args),))
elif not args:
raise TypeError('update() takes at least 1 argument (0 given)')
self = args[0]
# Make progressively weaker assumptions about "other"
other = ()
if len(args) == 2:
other = args[1]
if isinstance(other, dict):
for key in other:
self[key] = other[key]
elif hasattr(other, 'keys'):
for key in other.keys():
self[key] = other[key]
else:
for key, value in other:
self[key] = value
for key, value in kwds.items():
self[key] = value
__update = update # let subclasses override update without breaking __init__
__marker = object()
def pop(self, key, default=__marker):
'''od.pop(k[,d]) -> v, remove specified key and return the corresponding value.
If key is not found, d is returned if given, otherwise KeyError is raised.
'''
if key in self:
result = self[key]
del self[key]
return result
if default is self.__marker:
raise KeyError(key)
return default
def setdefault(self, key, default=None):
'od.setdefault(k[,d]) -> od.get(k,d), also set od[k]=d if k not in od'
if key in self:
return self[key]
self[key] = default
return default
def __repr__(self, _repr_running={}):
'od.__repr__() <==> repr(od)'
call_key = id(self), _get_ident()
if call_key in _repr_running:
return '...'
_repr_running[call_key] = 1
try:
if not self:
return '%s()' % (self.__class__.__name__,)
return '%s(%r)' % (self.__class__.__name__, self.items())
finally:
del _repr_running[call_key]
def __reduce__(self):
'Return state information for pickling'
items = [[k, self[k]] for k in self]
inst_dict = vars(self).copy()
for k in vars(OrderedDict()):
inst_dict.pop(k, None)
if inst_dict:
return (self.__class__, (items,), inst_dict)
return self.__class__, (items,)
def copy(self):
'od.copy() -> a shallow copy of od'
return self.__class__(self)
@classmethod
def fromkeys(cls, iterable, value=None):
'''OD.fromkeys(S[, v]) -> New ordered dictionary with keys from S
and values equal to v (which defaults to None).
'''
d = cls()
for key in iterable:
d[key] = value
return d
def __eq__(self, other):
'''od.__eq__(y) <==> od==y. Comparison to another OD is order-sensitive
while comparison to a regular mapping is order-insensitive.
'''
if isinstance(other, OrderedDict):
return len(self)==len(other) and self.items() == other.items()
return dict.__eq__(self, other)
def __ne__(self, other):
return not self == other
# -- the following methods are only used in Python 2.7 --
def viewkeys(self):
"od.viewkeys() -> a set-like object providing a view on od's keys"
return KeysView(self)
def viewvalues(self):
"od.viewvalues() -> an object providing a view on od's values"
return ValuesView(self)
def viewitems(self):
"od.viewitems() -> a set-like object providing a view on od's items"
return ItemsView(self)

View File

@@ -12,11 +12,12 @@ from . import filters
class VCR(object):
def __init__(self, serializer='yaml', cassette_library_dir=None,
record_mode="once", filter_headers=(), custom_patches=(),
filter_query_parameters=(), before_record_request=None,
record_mode="once", filter_headers=(), ignore_localhost=False,
custom_patches=(), filter_query_parameters=(),
filter_post_data_parameters=(), before_record_request=None,
before_record_response=None, ignore_hosts=(),
match_on=('method', 'scheme', 'host', 'port', 'path', 'query',),
ignore_localhost=False, before_record=None):
match_on=('method', 'scheme', 'host', 'port', 'path', 'query'),
before_record=None, inject_cassette=False):
self.serializer = serializer
self.match_on = match_on
self.cassette_library_dir = cassette_library_dir
@@ -39,10 +40,12 @@ class VCR(object):
self.record_mode = record_mode
self.filter_headers = filter_headers
self.filter_query_parameters = filter_query_parameters
self.filter_post_data_parameters = filter_post_data_parameters
self.before_record_request = before_record_request or before_record
self.before_record_response = before_record_response
self.ignore_hosts = ignore_hosts
self.ignore_localhost = ignore_localhost
self.inject_cassette = inject_cassette
self._custom_patches = tuple(custom_patches)
def _get_serializer(self, serializer_name):
@@ -68,11 +71,14 @@ class VCR(object):
def use_cassette(self, path, with_current_defaults=False, **kwargs):
if with_current_defaults:
return Cassette.use(path, self.get_path_and_merged_config(path, **kwargs))
# This is made a function that evaluates every time a cassette is made so that
# changes that are made to this VCR instance that occur AFTER the use_cassette
# decorator is applied still affect subsequent calls to the decorated function.
args_getter = functools.partial(self.get_path_and_merged_config, path, **kwargs)
path, config = self.get_path_and_merged_config(path, **kwargs)
return Cassette.use(path, **config)
# This is made a function that evaluates every time a cassette
# is made so that changes that are made to this VCR instance
# that occur AFTER the `use_cassette` decorator is applied
# still affect subsequent calls to the decorated function.
args_getter = functools.partial(self.get_path_and_merged_config,
path, **kwargs)
return Cassette.use_arg_getter(args_getter)
def get_path_and_merged_config(self, path, **kwargs):
@@ -90,8 +96,13 @@ class VCR(object):
'match_on': self._get_matchers(matcher_names),
'record_mode': kwargs.get('record_mode', self.record_mode),
'before_record_request': self._build_before_record_request(kwargs),
'before_record_response': self._build_before_record_response(kwargs),
'custom_patches': self._custom_patches + kwargs.get('custom_patches', ())
'before_record_response': self._build_before_record_response(
kwargs
),
'custom_patches': self._custom_patches + kwargs.get(
'custom_patches', ()
),
'inject': kwargs.get('inject_cassette', self.inject_cassette)
}
return path, merged_config
@@ -121,6 +132,9 @@ class VCR(object):
filter_query_parameters = options.get(
'filter_query_parameters', self.filter_query_parameters
)
filter_post_data_parameters = options.get(
'filter_post_data_parameters', self.filter_post_data_parameters
)
before_record_request = options.get(
"before_record_request", options.get("before_record", self.before_record_request)
)
@@ -136,6 +150,9 @@ class VCR(object):
if filter_query_parameters:
filter_functions.append(functools.partial(filters.remove_query_parameters,
query_parameters_to_remove=filter_query_parameters))
if filter_post_data_parameters:
filter_functions.append(functools.partial(filters.remove_post_data_parameters,
post_data_parameters_to_remove=filter_post_data_parameters))
hosts_to_ignore = list(ignore_hosts)
if ignore_localhost:

View File

@@ -1,4 +1,9 @@
from six import BytesIO
from six.moves.urllib.parse import urlparse, urlencode, urlunparse
try:
from collections import OrderedDict
except ImportError:
from backport_collections import OrderedDict
import copy
@@ -22,3 +27,17 @@ def remove_query_parameters(request, query_parameters_to_remove):
uri_parts[4] = urlencode(new_query)
request.uri = urlunparse(uri_parts)
return request
def remove_post_data_parameters(request, post_data_parameters_to_remove):
if request.method == 'POST' and not isinstance(request.body, BytesIO):
post_data = OrderedDict()
for k, sep, v in [p.partition(b'=') for p in request.body.split(b'&')]:
if k in post_data:
post_data[k].append(v)
elif len(k) > 0 and k.decode('utf-8') not in post_data_parameters_to_remove:
post_data[k] = [v]
request.body = b'&'.join(
b'='.join([k, v])
for k, vals in post_data.items() for v in vals)
return request

View File

@@ -31,6 +31,8 @@ def query(r1, r2):
def body(r1, r2):
if hasattr(r1.body, 'read') and hasattr(r2.body, 'read'):
return r1.body.read() == r2.body.read()
return r1.body == r2.body

View File

@@ -1,3 +1,4 @@
from six import BytesIO, binary_type
from six.moves.urllib.parse import urlparse, parse_qsl
@@ -26,11 +27,25 @@ class Request(object):
def __init__(self, method, uri, body, headers):
self.method = method
self.uri = uri
self.body = body
self._was_file = hasattr(body, 'read')
if self._was_file:
self._body = body.read()
if not isinstance(self._body, binary_type):
self._body = self._body.encode('utf-8')
else:
self._body = body
self.headers = {}
for key in headers:
self.add_header(key, headers[key])
@property
def body(self):
return BytesIO(self._body) if self._was_file else self._body
@body.setter
def body(self, value):
self._body = value
def add_header(self, key, value):
# see class docstring for an explanation
if isinstance(value, (tuple, list)):

View File

@@ -204,7 +204,7 @@ class VCRConnection(object):
"""
pass
def getresponse(self, _=False):
def getresponse(self, _=False, **kwargs):
'''Retrieve the response'''
# Check to see if the cassette has a response for this request. If so,
# then return it