1
0
mirror of https://github.com/kevin1024/vcrpy.git synced 2025-12-08 16:53:23 +00:00

Filter Sensitive Data From Requests

Add the ability to filter out sensitive data, using one of three
methods: from headers, from a query string, and by using a custom
callback to modify the request.

Closes #67
This commit is contained in:
Kevin McCarthy
2014-04-22 21:35:16 -10:00
parent f317800cb7
commit e6fdc735e4
6 changed files with 234 additions and 16 deletions

View File

@@ -255,6 +255,49 @@ with my_vcr.use_cassette('test.yml'):
```
## Filter sensitive data from the request
If you are checking your cassettes into source control, and are using some form
of authentication in your tests, you can filter out that information so it won't
appear in your cassette files. There are a few ways to do this:
### Filter information from HTTP Headers
Use the `filter_headers` configuration option with a list of headers to filter.
```python
with my_vcr.use_cassette('test.yml', filter_headers=['authorization']):
# sensitive HTTP request goes here
```
### Filter information from HTTP querystring
Use the `filter_query_parameters` configuration option with a list of query
parameters to filter.
```python
with my_vcr.use_cassette('test.yml', filter_query_parameters=['api_key']):
requests.get('http://api.com/getdata?api_key=secretstring')
```
### Custom request filtering
If neither of these covers your use case, you can register a callback that will
manipulate the HTTP request before adding it to the cassette. Use the
`before_record` configuration option to do this. Here is an
example that will never record requests to the /login endpoint.
```python
def before_record_cb(request):
if request.path != '/login':
return request
my_vcr = vcr.VCR(
before_record = before_record_cb,
)
with my_vcr.use_cassette('test.yml'):
# your http code here
```
## Installation
VCR.py is a package on PyPI, so you can `pip install vcrpy` (first you may need
@@ -321,10 +364,11 @@ matchers didn't match. This can help you with debugging custom matchers.
## Changelog
* 1.0.0 (in development) - Bump supported Python3 version to 3.4, fix some
bugs with Boto support (thanks @marusich), fix error with URL field
capitalization in README (thanks @simon-weber), added some log messages
to help with debugging.
* 1.0.0 (in development) - Add support for filtering sensitive data from
requests, bump supported Python3 version to 3.4, fix some bugs with Boto
support (thanks @marusich), fix error with URL field capitalization in
README (thanks @simon-weber), added some log messages to help with
debugging.
* 0.7.0: VCR.py now supports Python 3! (thanks @asundg) Also I refactored
the stub connections quite a bit to add support for the putrequest and
putheader calls. This version also adds support for httplib2 (thanks

View File

@@ -0,0 +1,68 @@
import base64
import pytest
from six.moves.urllib.request import urlopen, Request
from six.moves.urllib.error import HTTPError
import vcr
def _request_with_auth(url, username, password):
    """Open ``url`` with an HTTP Basic ``Authorization`` header attached.

    The header value is ``Basic `` followed by the base64 encoding of
    ``username:password`` (both ASCII-encoded). Returns whatever ``urlopen``
    returns for the prepared request.
    """
    req = Request(url)
    credentials = b':'.join(
        [username.encode('ascii'), password.encode('ascii')]
    )
    token = base64.b64encode(credentials)
    req.add_header(b"Authorization", b"Basic " + token)
    return urlopen(req)
def _find_header(cassette, header):
for request in cassette.requests:
for k, v in request.headers:
if header.lower() == k.lower():
return True
return False
def test_filter_basic_auth(tmpdir):
    """Record and replay basic-auth requests with the auth header filtered.

    Two requests are made against the same URL, one with a wrong password
    (expected to fail with HTTP 401) and one with the right password
    (expected 200). The same two requests are then repeated against the
    cassette written by the first pass, and the cassette is checked to hold
    two entries, none of which carries an ``Authorization`` header.
    """
    url = 'http://httpbin.org/basic-auth/user/passwd'
    cass_file = str(tmpdir.join('basic_auth_filter.yaml'))
    my_vcr = vcr.VCR(match_on=['url', 'method', 'headers'])
    # 2 requests, one with auth failure and one with auth success
    with my_vcr.use_cassette(cass_file, filter_headers=['authorization']):
        # The failing call raises before anything can be bound to a local,
        # so the status code has to be read off the captured exception.
        with pytest.raises(HTTPError) as excinfo:
            _request_with_auth(url, 'user', 'wrongpasswd')
        assert excinfo.value.code == 401
        resp = _request_with_auth(url, 'user', 'passwd')
        assert resp.getcode() == 200
    # make same 2 requests, this time both served from cassette.
    with my_vcr.use_cassette(cass_file, filter_headers=['authorization']) as cass:
        with pytest.raises(HTTPError) as excinfo:
            _request_with_auth(url, 'user', 'wrongpasswd')
        assert excinfo.value.code == 401
        resp = _request_with_auth(url, 'user', 'passwd')
        assert resp.getcode() == 200
        # authorization header should not have been recorded
        assert not _find_header(cass, 'authorization')
        assert len(cass) == 2
def test_filter_querystring(tmpdir):
    """A filtered query parameter must not show up in the recorded URL.

    The same URL is fetched twice with ``filter_query_parameters=['foo']``
    against one cassette file; on the second pass the first request held by
    the cassette is checked for the absence of ``foo`` in its URL.
    """
    target = 'http://httpbin.org/?foo=bar'
    cassette_path = str(tmpdir.join('filter_qs.yaml'))
    filtered_params = ['foo']

    # First pass: populate the cassette file.
    with vcr.use_cassette(cassette_path, filter_query_parameters=filtered_params):
        urlopen(target)

    # Second pass: same request against the existing cassette file.
    with vcr.use_cassette(cassette_path, filter_query_parameters=filtered_params) as cass:
        urlopen(target)
        first_recorded = cass.requests[0]
        assert 'foo' not in first_recorded.url
def test_filter_callback(tmpdir):
    """A ``before_record`` callback returning nothing suppresses recording.

    The callback only hands back requests whose path is not ``/get``; the
    single request made here targets ``/get``, so the cassette is expected
    to stay empty.
    """
    def before_record_cb(request):
        # Falling off the end returns None for '/get' requests.
        if request.path != '/get':
            return request

    url = 'http://httpbin.org/get'
    cass_file = str(tmpdir.join('basic_auth_filter.yaml'))
    my_vcr = vcr.VCR(before_record=before_record_cb)
    with my_vcr.use_cassette(cass_file, filter_headers=['authorization']) as cass:
        urlopen(url)
        recorded_count = len(cass)
        assert recorded_count == 0

View File

@@ -0,0 +1,28 @@
import mock
from vcr.filters import _remove_headers, _remove_query_parameters
from vcr.request import Request
def test_remove_headers():
    """A header named in the removal list is dropped; the other is kept."""
    original_headers = [('hello', 'goodbye'), ('secret', 'header')]
    request = mock.Mock(headers=original_headers)
    filtered = _remove_headers(request, ['secret'])
    expected = frozenset([('hello', 'goodbye')])
    assert filtered.headers == expected
def test_remove_headers_empty():
    """An empty removal list leaves every header in place."""
    original_headers = [('hello', 'goodbye'), ('secret', 'header')]
    request = mock.Mock(headers=original_headers)
    filtered = _remove_headers(request, [])
    expected = frozenset([('hello', 'goodbye'), ('secret', 'header')])
    assert filtered.headers == expected
def test_remove_query_parameters():
    """Only the listed query parameter is stripped from the path."""
    request = mock.Mock(url='http://g.com/?q=cowboys&w=1')
    filtered = _remove_query_parameters(request, ['w'])
    assert filtered.path == '/?q=cowboys'
def test_remove_all_query_parameters():
    """Removing every parameter leaves a bare path with no '?'."""
    request = mock.Mock(url='http://g.com/?q=cowboys&w=1')
    filtered = _remove_query_parameters(request, ['w', 'q'])
    assert filtered.path == '/'
def test_remove_nonexistent_query_parameters():
    """Asking to remove parameters the URL lacks yields the plain path."""
    request = mock.Mock(url='http://g.com/')
    filtered = _remove_query_parameters(request, ['w', 'q'])
    assert filtered.path == '/'

View File

@@ -11,6 +11,7 @@ from contextdecorator import ContextDecorator
# Internal imports
from .patch import install, reset
from .persist import load_cassette, save_cassette
from .filters import filter_request
from .serializers import yamlserializer
from .matchers import requests_match, url, method
from .errors import UnhandledHTTPRequestError
@@ -30,10 +31,17 @@ class Cassette(ContextDecorator):
path,
serializer=yamlserializer,
record_mode='once',
match_on=[url, method]):
match_on=[url, method],
filter_headers=[],
filter_query_parameters=[],
before_record=None,
):
self._path = path
self._serializer = serializer
self._match_on = match_on
self._filter_headers = filter_headers
self._filter_query_parameters = filter_query_parameters
self._before_record = before_record
# self.data is the list of (req, resp) tuples
self.data = []
@@ -61,16 +69,40 @@ class Cassette(ContextDecorator):
def append(self, request, response):
    '''
    Record a (request, response) pair on this cassette.

    The request is first run through filter_request using this
    cassette's header filter, query-parameter filter and before_record
    callback. A falsy result means the pair is not stored; otherwise
    the filtered request is stored and the cassette is marked dirty.
    '''
    filtered = filter_request(
        request=request,
        filter_headers=self._filter_headers,
        filter_query_parameters=self._filter_query_parameters,
        before_record=self._before_record,
    )
    if filtered:
        self.data.append((filtered, response))
        self.dirty = True
def _responses(self, request):
    """
    internal API: generator over ``(index, response)`` for every stored
    pair whose request matches ``request``.

    The incoming request is passed through filter_request with this
    cassette's filter settings before matching; when the filtered
    result is falsy nothing is yielded.
    """
    candidate = filter_request(
        request=request,
        filter_headers=self._filter_headers,
        filter_query_parameters=self._filter_query_parameters,
        before_record=self._before_record,
    )
    if candidate:
        for position, pair in enumerate(self.data):
            recorded_request, recorded_response = pair
            if requests_match(candidate, recorded_request, self._match_on):
                yield position, recorded_response
def play_response(self, request):
'''
Get the response corresponding to a request, but only if it
hasn't been played back before, and mark it as played
'''
for index, (stored_request, response) in enumerate(self.data):
if requests_match(request, stored_request, self._match_on):
for index, response in self._responses(request):
if self.play_counts[index] == 0:
self.play_counts[index] += 1
return response
@@ -86,9 +118,7 @@ class Cassette(ContextDecorator):
This function isn't actually used by VCR internally, but is
provided as an external API.
'''
responses = \
[resp for req, resp in self.data if
requests_match(req, request, self._match_on)]
responses = [response for index, response in self._responses(request)]
if responses:
return responses
@@ -134,8 +164,7 @@ class Cassette(ContextDecorator):
def __contains__(self, request):
'''Return whether or not a request has been stored'''
for stored_request, response in self.data:
if requests_match(stored_request, request, self._match_on):
for response in self._responses(request):
return True
return False

View File

@@ -10,6 +10,9 @@ class VCR(object):
cassette_library_dir=None,
record_mode="once",
match_on=['url', 'method'],
filter_headers=[],
filter_query_parameters=[],
before_record=None,
):
self.serializer = serializer
self.match_on = match_on
@@ -27,6 +30,9 @@ class VCR(object):
'body': body,
}
self.record_mode = record_mode
self.filter_headers = filter_headers
self.filter_query_parameters = filter_query_parameters
self.before_record = before_record
def _get_serializer(self, serializer_name):
try:
@@ -65,6 +71,9 @@ class VCR(object):
"serializer": self._get_serializer(serializer_name),
"match_on": self._get_matchers(matcher_names),
"record_mode": kwargs.get('record_mode', self.record_mode),
"filter_headers": kwargs.get('filter_headers', self.filter_headers),
"filter_query_parameters": kwargs.get('filter_query_parameters', self.filter_query_parameters),
"before_record": kwargs.get("before_record", self.before_record),
}
return Cassette.load(path, **merged_config)

40
vcr/filters.py Normal file
View File

@@ -0,0 +1,40 @@
from six.moves.urllib.parse import urlparse, parse_qsl, urlunparse, urlencode
import copy
def _remove_headers(request, headers_to_remove):
out = []
for k, v in request.headers:
if k.lower() not in [h.lower() for h in headers_to_remove]:
out.append((k, v))
request.headers = frozenset(out)
return request
def _remove_query_parameters(request, query_parameters_to_remove):
if not hasattr(request, 'path' or not query_parameters_to_remote):
return request
url = urlparse(request.url)
q = parse_qsl(url.query)
q = [(k, v) for k, v in q if k not in query_parameters_to_remove]
if q:
request.path = url.path + '?' + urlencode(q)
else:
request.path = url.path
return request
def filter_request(
    request,
    filter_headers,
    filter_query_parameters,
    before_record
):
    """Return a filtered shallow copy of ``request``.

    The filters run in a fixed order on the copy: header removal (only when
    the copy has a ``headers`` attribute and ``filter_headers`` is truthy),
    query-parameter removal (when ``filter_query_parameters`` is truthy),
    and finally the ``before_record`` callback (when truthy), whose return
    value becomes the result. The object passed in is never rebound or
    reassigned here; all work happens on the copy.
    """
    filtered = copy.copy(request)  # work on a copy, not the caller's object
    wants_header_filter = hasattr(filtered, 'headers') and filter_headers
    if wants_header_filter:
        filtered = _remove_headers(filtered, filter_headers)
    if filter_query_parameters:
        filtered = _remove_query_parameters(filtered, filter_query_parameters)
    return before_record(filtered) if before_record else filtered