1
0
mirror of https://github.com/kevin1024/vcrpy.git synced 2025-12-09 01:03:24 +00:00

Merge pull request #196 from agriffis/remove-replace

Enable header replacement rather than removal
This commit is contained in:
Aron Griffis
2015-11-07 19:01:00 -05:00
5 changed files with 323 additions and 92 deletions

View File

@@ -163,6 +163,39 @@ of post data parameters to filter.
with my_vcr.use_cassette('test.yml', filter_post_data_parameters=['client_secret']):
requests.post('http://api.com/postdata', data={'api_key': 'secretstring'})
Advanced use of filter_headers, filter_query_parameters and filter_post_data_parameters
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
In all of the above cases, it's also possible to pass a list of ``(key, value)``
tuples where the value can be any of the following:
* A new value to replace the original value.
* ``None`` to remove the key/value pair. (Same as passing a simple key string.)
* A callable that returns a new value or ``None``.
So these two calls are the same:
.. code:: python
# original (still works)
vcr = VCR(filter_headers=['authorization'])
# new
vcr = VCR(filter_headers=[('authorization', None)])
Here are two examples of the new functionality:
.. code:: python
# replace with a static value (most common)
vcr = VCR(filter_headers=[('authorization', 'XXXXXX')])
# replace with a callable, for example when testing
# lots of different kinds of authorization.
def replace_auth(key, value, request):
auth_type = value.split(' ', 1)[0]
return '{} {}'.format(auth_type, 'XXXXXX')
Custom Request filtering
~~~~~~~~~~~~~~~~~~~~~~~~

View File

@@ -1,48 +1,145 @@
from vcr.filters import (
remove_headers,
remove_query_parameters,
remove_post_data_parameters
remove_headers, replace_headers,
remove_query_parameters, replace_query_parameters,
remove_post_data_parameters, replace_post_data_parameters,
)
from vcr.compat import mock
from vcr.request import Request
import json
def test_replace_headers():
# This tests all of:
# 1. keeping a header
# 2. removing a header
# 3. replacing a header
# 4. replacing a header using a callable
# 5. removing a header using a callable
# 6. replacing a header that doesn't exist
headers = {
'one': ['keep'],
'two': ['lose'],
'three': ['change'],
'four': ['shout'],
'five': ['whisper'],
}
request = Request('GET', 'http://google.com', '', headers)
replace_headers(request, [
('two', None),
('three', 'tada'),
('four', lambda key, value, request: value.upper()),
('five', lambda key, value, request: None),
('six', 'doesntexist'),
])
assert request.headers == {
'one': 'keep',
'three': 'tada',
'four': 'SHOUT',
}
def test_replace_headers_empty():
headers = {'hello': 'goodbye', 'secret': 'header'}
request = Request('GET', 'http://google.com', '', headers)
replace_headers(request, [])
assert request.headers == headers
def test_replace_headers_callable():
# This goes beyond test_replace_headers() to ensure that the callable
# receives the expected arguments.
headers = {'hey': 'there'}
request = Request('GET', 'http://google.com', '', headers)
callme = mock.Mock(return_value='ho')
replace_headers(request, [('hey', callme)])
assert request.headers == {'hey': 'ho'}
assert callme.call_args == ((), {'request': request,
'key': 'hey',
'value': 'there'})
def test_remove_headers():
# Test the backward-compatible API wrapper.
headers = {'hello': ['goodbye'], 'secret': ['header']}
request = Request('GET', 'http://google.com', '', headers)
remove_headers(request, ['secret'])
assert request.headers == {'hello': 'goodbye'}
def test_remove_headers_empty():
headers = {'hello': 'goodbye', 'secret': 'header'}
request = Request('GET', 'http://google.com', '', headers)
remove_headers(request, [])
assert request.headers == headers
def test_replace_query_parameters():
# This tests all of:
# 1. keeping a parameter
# 2. removing a parameter
# 3. replacing a parameter
# 4. replacing a parameter using a callable
# 5. removing a parameter using a callable
# 6. replacing a parameter that doesn't exist
uri = 'http://g.com/?one=keep&two=lose&three=change&four=shout&five=whisper'
request = Request('GET', uri, '', {})
replace_query_parameters(request, [
('two', None),
('three', 'tada'),
('four', lambda key, value, request: value.upper()),
('five', lambda key, value, request: None),
('six', 'doesntexist'),
])
assert request.query == [
('four', 'SHOUT'),
('one', 'keep'),
('three', 'tada'),
]
def test_remove_all_query_parameters():
uri = 'http://g.com/?q=cowboys&w=1'
request = Request('GET', uri, '', {})
replace_query_parameters(request, [('w', None), ('q', None)])
assert request.uri == 'http://g.com/'
def test_replace_query_parameters_callable():
# This goes beyond test_replace_query_parameters() to ensure that the
# callable receives the expected arguments.
uri = 'http://g.com/?hey=there'
request = Request('GET', uri, '', {})
callme = mock.Mock(return_value='ho')
replace_query_parameters(request, [('hey', callme)])
assert request.uri == 'http://g.com/?hey=ho'
assert callme.call_args == ((), {'request': request,
'key': 'hey',
'value': 'there'})
def test_remove_query_parameters():
# Test the backward-compatible API wrapper.
uri = 'http://g.com/?q=cowboys&w=1'
request = Request('GET', uri, '', {})
remove_query_parameters(request, ['w'])
assert request.uri == 'http://g.com/?q=cowboys'
def test_remove_all_query_parameters():
uri = 'http://g.com/?q=cowboys&w=1'
request = Request('GET', uri, '', {})
remove_query_parameters(request, ['w', 'q'])
assert request.uri == 'http://g.com/'
def test_remove_nonexistent_query_parameters():
uri = 'http://g.com/'
request = Request('GET', uri, '', {})
remove_query_parameters(request, ['w', 'q'])
assert request.uri == 'http://g.com/'
def test_replace_post_data_parameters():
# This tests all of:
# 1. keeping a parameter
# 2. removing a parameter
# 3. replacing a parameter
# 4. replacing a parameter using a callable
# 5. removing a parameter using a callable
# 6. replacing a parameter that doesn't exist
body = b'one=keep&two=lose&three=change&four=shout&five=whisper'
request = Request('POST', 'http://google.com', body, {})
replace_post_data_parameters(request, [
('two', None),
('three', 'tada'),
('four', lambda key, value, request: value.upper()),
('five', lambda key, value, request: None),
('six', 'doesntexist'),
])
assert request.body == b'one=keep&three=tada&four=SHOUT'
def test_remove_post_data_parameters():
# Test the backward-compatible API wrapper.
body = b'id=secret&foo=bar'
request = Request('POST', 'http://google.com', body, {})
remove_post_data_parameters(request, ['id'])
@@ -52,25 +149,42 @@ def test_remove_post_data_parameters():
def test_preserve_multiple_post_data_parameters():
body = b'id=secret&foo=bar&foo=baz'
request = Request('POST', 'http://google.com', body, {})
remove_post_data_parameters(request, ['id'])
replace_post_data_parameters(request, [('id', None)])
assert request.body == b'foo=bar&foo=baz'
def test_remove_all_post_data_parameters():
body = b'id=secret&foo=bar'
request = Request('POST', 'http://google.com', body, {})
remove_post_data_parameters(request, ['id', 'foo'])
replace_post_data_parameters(request, [('id', None), ('foo', None)])
assert request.body == b''
def test_remove_nonexistent_post_data_parameters():
body = b''
def test_replace_json_post_data_parameters():
# This tests all of:
# 1. keeping a parameter
# 2. removing a parameter
# 3. replacing a parameter
# 4. replacing a parameter using a callable
# 5. removing a parameter using a callable
# 6. replacing a parameter that doesn't exist
body = b'{"one": "keep", "two": "lose", "three": "change", "four": "shout", "five": "whisper"}'
request = Request('POST', 'http://google.com', body, {})
remove_post_data_parameters(request, ['id'])
assert request.body == b''
request.headers['Content-Type'] = 'application/json'
replace_post_data_parameters(request, [
('two', None),
('three', 'tada'),
('four', lambda key, value, request: value.upper()),
('five', lambda key, value, request: None),
('six', 'doesntexist'),
])
request_data = json.loads(request.body.decode('utf-8'))
expected_data = json.loads('{"one": "keep", "three": "tada", "four": "SHOUT"}')
assert request_data == expected_data
def test_remove_json_post_data_parameters():
# Test the backward-compatible API wrapper.
body = b'{"id": "secret", "foo": "bar", "baz": "qux"}'
request = Request('POST', 'http://google.com', body, {})
request.headers['Content-Type'] = 'application/json'
@@ -84,13 +198,5 @@ def test_remove_all_json_post_data_parameters():
body = b'{"id": "secret", "foo": "bar"}'
request = Request('POST', 'http://google.com', body, {})
request.headers['Content-Type'] = 'application/json'
remove_post_data_parameters(request, ['id', 'foo'])
assert request.body == b'{}'
def test_remove_nonexistent_json_post_data_parameters():
body = b'{}'
request = Request('POST', 'http://google.com', body, {})
request.headers['Content-Type'] = 'application/json'
remove_post_data_parameters(request, ['id'])
replace_post_data_parameters(request, [('id', None), ('foo', None)])
assert request.body == b'{}'

View File

@@ -47,31 +47,44 @@ def test_vcr_before_record_request_params():
if request.path != '/get':
return request
test_vcr = VCR(filter_headers=('cookie',), before_record_request=before_record_cb,
test_vcr = VCR(filter_headers=('cookie', ('bert', 'ernie')),
before_record_request=before_record_cb,
ignore_hosts=('www.test.com',), ignore_localhost=True,
filter_query_parameters=('foo',))
filter_query_parameters=('foo', ('tom', 'jerry')),
filter_post_data_parameters=('posted', ('no', 'trespassing')))
with test_vcr.use_cassette('test') as cassette:
assert cassette.filter_request(Request('GET', base_path + 'get', '', {})) is None
assert cassette.filter_request(Request('GET', base_path + 'get2', '', {})) is not None
# Test explicit before_record_cb
request_get = Request('GET', base_path + 'get', '', {})
assert cassette.filter_request(request_get) is None
request = Request('GET', base_path + 'get2', '', {})
assert cassette.filter_request(request) is not None
assert cassette.filter_request(Request('GET', base_path + '?foo=bar', '', {})).query == []
assert cassette.filter_request(
Request('GET', base_path + '?foo=bar', '',
{'cookie': 'test', 'other': 'fun'})).headers == {'other': 'fun'}
assert cassette.filter_request(
Request(
'GET', base_path + '?foo=bar', '',
{'cookie': 'test', 'other': 'fun'}
)
).headers == {'other': 'fun'}
# Test filter_query_parameters
request = Request('GET', base_path + '?foo=bar', '', {})
assert cassette.filter_request(request).query == []
request = Request('GET', base_path + '?tom=nobody', '', {})
assert cassette.filter_request(request).query == [('tom', 'jerry')]
assert cassette.filter_request(Request('GET', 'http://www.test.com' + '?foo=bar', '',
{'cookie': 'test', 'other': 'fun'})) is None
# Test filter_headers
request = Request('GET', base_path + '?foo=bar', '',
{'cookie': 'test', 'other': 'fun', 'bert': 'nobody'})
assert (cassette.filter_request(request).headers ==
{'other': 'fun', 'bert': 'ernie'})
# Test ignore_hosts
request = Request('GET', 'http://www.test.com' + '?foo=bar', '',
{'cookie': 'test', 'other': 'fun'})
assert cassette.filter_request(request) is None
# Test ignore_localhost
request = Request('GET', 'http://localhost:8000' + '?foo=bar', '',
{'cookie': 'test', 'other': 'fun'})
assert cassette.filter_request(request) is None
with test_vcr.use_cassette('test', before_record_request=None) as cassette:
# Test that before_record can be overwritten with
assert cassette.filter_request(Request('GET', base_path + 'get', '', {})) is not None
# Test that before_record can be overwritten in context manager.
assert cassette.filter_request(request_get) is not None
def test_vcr_before_record_response_iterable():

View File

@@ -199,22 +199,28 @@ class VCR(object):
'ignore_localhost', self.ignore_localhost
)
if filter_headers:
replacements = [h if isinstance(h, tuple) else (h, None)
for h in filter_headers]
filter_functions.append(
functools.partial(
filters.remove_headers,
headers_to_remove=filter_headers
filters.replace_headers,
replacements=replacements,
)
)
if filter_query_parameters:
replacements = [p if isinstance(p, tuple) else (p, None)
for p in filter_query_parameters]
filter_functions.append(functools.partial(
filters.remove_query_parameters,
query_parameters_to_remove=filter_query_parameters
filters.replace_query_parameters,
replacements=replacements,
))
if filter_post_data_parameters:
replacements = [p if isinstance(p, tuple) else (p, None)
for p in filter_post_data_parameters]
filter_functions.append(
functools.partial(
filters.remove_post_data_parameters,
post_data_parameters_to_remove=filter_post_data_parameters
filters.replace_post_data_parameters,
replacements=replacements,
)
)

View File

@@ -2,48 +2,121 @@ from six import BytesIO, text_type
from six.moves.urllib.parse import urlparse, urlencode, urlunparse
import json
from .compat import collections
def remove_headers(request, headers_to_remove):
def replace_headers(request, replacements):
"""
Replace headers in request according to replacements. The replacements
should be a list of (key, value) pairs where the value can be any of:
1. A simple replacement string value.
2. None to remove the given header.
3. A callable which accepts (key, value, request) and returns a string
value or None.
"""
new_headers = request.headers.copy()
for k in headers_to_remove:
for k, rv in replacements:
if k in new_headers:
del new_headers[k]
ov = new_headers.pop(k)
if callable(rv):
rv = rv(key=k, value=ov, request=request)
if rv is not None:
new_headers[k] = rv
request.headers = new_headers
return request
def remove_query_parameters(request, query_parameters_to_remove):
def remove_headers(request, headers_to_remove):
"""
Wrap replace_headers() for API backward compatibility.
"""
replacements = [(k, None) for k in headers_to_remove]
return replace_headers(request, replacements)
def replace_query_parameters(request, replacements):
"""
Replace query parameters in request according to replacements. The
replacements should be a list of (key, value) pairs where the value can be
any of:
1. A simple replacement string value.
2. None to remove the given header.
3. A callable which accepts (key, value, request) and returns a string
value or None.
"""
query = request.query
new_query = [(k, v) for (k, v) in query
if k not in query_parameters_to_remove]
if len(new_query) != len(query):
uri_parts = list(urlparse(request.uri))
uri_parts[4] = urlencode(new_query)
request.uri = urlunparse(uri_parts)
new_query = []
replacements = dict(replacements)
for k, ov in query:
if k not in replacements:
new_query.append((k, ov))
else:
rv = replacements[k]
if callable(rv):
rv = rv(key=k, value=ov, request=request)
if rv is not None:
new_query.append((k, rv))
uri_parts = list(urlparse(request.uri))
uri_parts[4] = urlencode(new_query)
request.uri = urlunparse(uri_parts)
return request
def remove_query_parameters(request, query_parameters_to_remove):
"""
Wrap replace_query_parameters() for API backward compatibility.
"""
replacements = [(k, None) for k in query_parameters_to_remove]
return replace_query_parameters(request, replacements)
def replace_post_data_parameters(request, replacements):
"""
Replace post data in request--either form data or json--according to
replacements. The replacements should be a list of (key, value) pairs where
the value can be any of:
1. A simple replacement string value.
2. None to remove the given header.
3. A callable which accepts (key, value, request) and returns a string
value or None.
"""
replacements = dict(replacements)
if request.method == 'POST' and not isinstance(request.body, BytesIO):
if request.headers.get('Content-Type') == 'application/json':
json_data = json.loads(request.body.decode('utf-8'))
for k, rv in replacements.items():
if k in json_data:
ov = json_data.pop(k)
if callable(rv):
rv = rv(key=k, value=ov, request=request)
if rv is not None:
json_data[k] = rv
request.body = json.dumps(json_data).encode('utf-8')
else:
if isinstance(request.body, text_type):
request.body = request.body.encode('utf-8')
splits = [p.partition(b'=') for p in request.body.split(b'&')]
new_splits = []
for k, sep, ov in splits:
if sep is None:
new_splits.append((k, sep, ov))
else:
rk = k.decode('utf-8')
if rk not in replacements:
new_splits.append((k, sep, ov))
else:
rv = replacements[rk]
if callable(rv):
rv = rv(key=rk, value=ov.decode('utf-8'),
request=request)
if rv is not None:
new_splits.append((k, sep, rv.encode('utf-8')))
request.body = b'&'.join(k if sep is None else b''.join([k, sep, v])
for k, sep, v in new_splits)
return request
def remove_post_data_parameters(request, post_data_parameters_to_remove):
if request.method == 'POST' and not isinstance(request.body, BytesIO):
if request.headers.get('Content-Type') == 'application/json':
json_data = json.loads(request.body.decode('utf-8'))
for k in list(json_data.keys()):
if k in post_data_parameters_to_remove:
del json_data[k]
request.body = json.dumps(json_data).encode('utf-8')
else:
post_data = collections.OrderedDict()
if isinstance(request.body, text_type):
request.body = request.body.encode('utf-8')
for k, sep, v in (p.partition(b'=') for p in request.body.split(b'&')):
if k in post_data:
post_data[k].append(v)
elif len(k) > 0 and k.decode('utf-8') not in post_data_parameters_to_remove:
post_data[k] = [v]
request.body = b'&'.join(
b'='.join([k, v])
for k, vals in post_data.items() for v in vals)
return request
"""
Wrap replace_post_data_parameters() for API backward compatibility.
"""
replacements = [(k, None) for k in post_data_parameters_to_remove]
return replace_post_data_parameters(request, replacements)