1
0
mirror of https://github.com/kevin1024/vcrpy.git synced 2025-12-08 16:53:23 +00:00

Compare commits

...

123 Commits

Author SHA1 Message Date
Kevin McCarthy
5cfb005b48 bump version 2015-05-05 22:22:37 -10:00
Kevin McCarthy
4ade547779 python3 uses capital headers sometimes, let's do a case-agnostic header removal 2015-05-05 21:59:48 -10:00
Kevin McCarthy
dc8eedf555 we dont actually need lxml for tests 2015-05-05 21:59:31 -10:00
Kevin McCarthy
5b9b6cd8b5 dont try to load chunked responses from cassettes, we already unchunked everything 2015-05-05 21:32:32 -10:00
Kevin McCarthy
856c38479a add failing test for requests2.7/gzip issue 2015-05-05 20:49:12 -10:00
Kevin McCarthy
52496cd091 really add requests 2.7 to tox 2015-05-05 19:50:20 -10:00
Kevin McCarthy
bc26ce877a add requests 2.7 to tox and travis 2015-05-05 19:44:19 -10:00
Kevin McCarthy
8db0d245a5 bump version 2015-04-11 11:21:31 -10:00
Kevin McCarthy
47544b08fe Merge pull request #148 from ralphbean/master
Ship extra bits with the pypi tarball.
2015-04-11 11:18:57 -10:00
Ralph Bean
4e560fc8db Ship extra bits with the pypi tarball.
This change should make new tarballs uploaded to pypi include various
nice pieces:

- The README
- The LICENSE
- The tests

The text of the license actually specifies that the full text must be
distributed with all copies of the software.  So, you need it to be in
compliance with the MIT license.

The README is just nice to have, and the tests are particularly nice for
my use case.  I am packaging vcrpy for inclusion in the Fedora linux
distribution and:

- We like to use the tarball from pypi because it is the same source
  distribution that everyone else is using.
- We like to run the tests before we build the rpm in our build system
  to make sure nothing crazy is going on.

Of course, we can use the tarball for the source and then do a second
step to clone the source and get the tests.  But, this is more work than
we like if we can just get the tests added to the tarball.  Other
distributions (like Debian) like this too.
2015-04-11 14:53:48 -04:00
Ivan Malison
8bb3c6beee v1.4.0 2015-04-02 12:20:16 -07:00
Ivan Malison
df3ad5f35c remove compat.py in favor of backport_collections. 2015-04-02 10:32:34 -07:00
Ivan Malison
e8a6a7a49f add backport_collections, tweaks to setup.py. 2015-04-02 10:23:22 -07:00
Ivan Malison
881138cb8d inject_cassette fallout. 2015-04-01 17:46:35 -07:00
Ivan Malison
639dba6f7a Write test for #145 that checks behavior of with_current_defaults. 2015-04-01 17:30:05 -07:00
Ivan Malison
b9bdc6401d inject_cassette kwarg. 2015-04-01 17:30:05 -07:00
Ivan Malison
3ca5529d26 Touch ups. 2015-04-01 15:38:59 -07:00
Sam Stavinoha
e3f2bc8369 fix with_current_defaults causing TypeError
The from_args() method in cassette.py was
throwing a TypeError when calling

    use_cassette(..., with_current_defaults=True)
    ...
    TypeError: from_args() takes exactly 3 arguments (4 given)

The path was then being passed to use() twice.
2015-03-31 21:08:26 -05:00
Edward Stone
fc4e985ee9 fallback to compat OrderedDict if collections.OrderedDict unavailable 2015-03-31 13:12:13 -07:00
Edward Stone
9038bc9066 fix docs for post data filter 2015-03-31 13:12:13 -07:00
Edward Stone
0def349420 Add ability to filter post data parameters 2015-03-31 13:12:13 -07:00
Ivan Malison
0dd7b05990 Get rid of all the constructor parameters that were removed in 0871c3b87c 2015-03-31 13:03:11 -07:00
Ivan Malison
630088599f Update copyright. 2015-03-31 13:02:59 -07:00
Ivan Malison
870ab276c4 Possible fix for #140. 2015-03-25 13:01:55 -07:00
Ivan 'Goat' Malison
779f3b0474 Merge pull request #141 from IvanMalison/post_files_through_requests
Add support for posting files through requests. closes #121
2015-03-24 16:43:13 -07:00
Ivan Malison
b948ed4857 Fix python3 support for requests file uploads. 2015-03-24 15:41:14 -07:00
Ivan Malison
c43e618635 Add mention of urllib3 support in readme. 2015-03-24 14:24:47 -07:00
Ivan Malison
5bd40a447a Add setter to body on vcr's request. 2015-03-24 14:11:16 -07:00
Ivan Malison
4b4be7f661 Don't use 2.7+ style ',' separated with. 2015-03-24 14:11:16 -07:00
Ivan Malison
6602a449b1 Add support for posting files through requests. closes #121. Possibly #134. 2015-03-24 14:11:16 -07:00
Ivan Malison
7cd7264034 fix tox/urllib3 stuff. 2015-03-24 14:10:14 -07:00
Ivan Malison
e9c690b9e7 Version 1.3.0. 2015-03-23 18:10:26 -07:00
Ivan Malison
bba5df2fbb clarifying comment in patch.py. 2015-03-23 18:00:23 -07:00
Ivan Malison
39c3b15e02 unused imports. 2015-03-23 17:56:46 -07:00
Ivan Malison
c87e6d6f6a Clarifying comments in patch.py. 2015-03-23 17:55:49 -07:00
Ivan Malison
5ab77e22db Use suggested emacs style coding statements (see https://www.python.org/dev/peps/pep-0263/). 2015-03-23 17:55:49 -07:00
Ivan 'Goat' Malison
ec6f27bbad Merge pull request #138 from aisch/patch-and-test-urllib3
update urllib3 patch/stub to be same as used for requests and add tests
2015-03-23 17:47:42 -07:00
aisch
8930c97ff7 rm unused imports 2015-03-23 13:56:48 -07:00
aisch
e6b43a0374 rename urllib3 patch method and rm unused imports from tests 2015-03-23 13:43:30 -07:00
aisch
63ec95be06 update urllib3 patch/stub to be same as used for requests and add tests 2015-03-23 12:12:49 -07:00
Kevin McCarthy
84c45b2742 Merge pull request #136 from abhinav/https-port-fix
Fix default port for HTTPS
2015-02-24 09:12:49 -10:00
Abhinav Gupta
87a25e9ab0 Fix httplib2 integration test. 2015-02-24 00:10:08 -08:00
Abhinav Gupta
2473bdb77a Fix default port for HTTPS. 2015-02-23 23:37:04 -08:00
Ivan 'Goat' Malison
32831d4151 Merge pull request #135 from RomuloOliveira/patch-1
Fix missing quotes on Custom Response Filtering
2015-01-28 12:02:57 -08:00
Rômulo Oliveira
4991d6f1c8 Fix missing quotes on Custom Response Filtering
Missing quotes are bad
2015-01-28 11:34:47 -02:00
Ivan Malison
14ef1e87f7 Add custom_patches section to README.md 2015-01-08 14:02:41 -08:00
Ivan 'Goat' Malison
fb14739cc1 Merge pull request #133 from IvanMalison/custom_patches
Custom patches
2015-01-08 11:08:55 -08:00
Ivan Malison
a7c7e4e279 Bump version to 1.2.0 2015-01-08 10:56:39 -08:00
Ivan Malison
c0a22df7ed Add ability to add custom patches to vcr and cassettes. 2015-01-08 10:54:27 -08:00
Ivan Malison
83aed99058 Bump vesrsion to 1.1.4, add to release notes. 2014-12-26 05:26:24 -05:00
Ivan Malison
e1f65bcbdc Add force reset around calls to actual connection from stubs, to ensure
compatibility with version of httplib/urlib2 in python 2.7.9. Closes #130.
2014-12-26 05:10:20 -05:00
Kevin McCarthy
5301149bd8 Merge pull request #128 from gazpachoking/patch-1
Update changelog to note requests 2.5 support
2014-12-09 08:55:52 -10:00
Chase Sterling
0297fcdde7 Update changelog to note requests 2.5 support 2014-12-09 13:26:46 -05:00
Kevin McCarthy
9480954c33 update release notes 2014-12-08 17:10:35 -10:00
Kevin McCarthy
8432ad32f1 Merge pull request #127 from gazpachoking/1.1.3
Version bump to v1.1.3
2014-12-08 17:09:07 -10:00
Chase Sterling
fabef3d988 Version bump to v1.1.3 2014-12-08 21:43:01 -05:00
Ivan 'Goat' Malison
da45f46b2d Merge pull request #125 from gazpachoking/pool_is_none
Fix crash with requests 2.5 where connectionpool was None
2014-12-08 13:20:36 -08:00
Ivan 'Goat' Malison
562a0ebadc Merge pull request #126 from gazpachoking/116
Play back requests requests on windows. fix #116
2014-12-08 12:29:34 -08:00
Chase Sterling
ef8ba6d51b Add requests 2.5 to testing list in .travis.yml and tox.ini 2014-12-08 14:40:55 -05:00
Chase Sterling
f6aa6eac84 Play back requests requests on windows. fix #116 2014-12-08 14:28:48 -05:00
Chase Sterling
821e148752 Fix crash with requests 2.5 where connectionpool was None 2014-12-07 13:49:23 -05:00
Ivan Malison
7306205b8a Improve test_new_episodes_record_mode_two_times test. 2014-11-21 17:15:15 -08:00
Nithin Reddy
2a128893cc Adds a test to ensure that the cassette created with "new_episodes" has different expected behavior when opened with "once". 2014-11-21 09:47:28 -08:00
Nithin Reddy
5162d183e5 Fixes #123. When attempting to replay the same request twice using record_mode="new_episodes", vcr.py raises UnhandledHTTPRequestError. 2014-11-20 19:07:21 -08:00
Ivan Malison
9d52c3ed42 Remove warning message caused by lack of is_verified property on HTTPSConnection stub. 2014-11-13 16:32:38 -08:00
Ivan 'Goat' Malison
0e37759175 Merge pull request #118 from rtaboada/fix-response-stub-headers-field
Create headers field in VCRHTTPResponse. Fixes #117.
2014-11-03 04:07:12 -08:00
Ivan 'Goat' Malison
78c6258ba3 Merge pull request #119 from telaviv/make_boto_tests_pass_again
test_boto_stubs passes again.
2014-10-31 00:14:45 -07:00
Shawn Krisman
b047336690 test_boto_stubs passes again. 2014-10-30 16:08:17 -07:00
Rodrigo Taboada
c955a5ea88 String in request body should be bytes. 2014-10-24 18:30:32 -02:00
Rodrigo Taboada
5423d99f5a Tests for VCRHTTPResponse headers field. 2014-10-24 17:40:51 -02:00
Rodrigo Taboada
a71c15f398 Create headers field in VCRHTTPResponse. Fixes #117. 2014-10-24 16:37:12 -02:00
Ivan Malison
6e049ba7a1 version bump to v1.1.2 2014-10-08 12:11:53 -07:00
Ivan Malison
916e7839e5 Actually use pytest.raises in test. 2014-10-07 13:45:09 -07:00
Ivan Malison
99692a92d2 Handle unicode error in json serialize properly. 2014-10-07 13:21:47 -07:00
Ivan Malison
a9a68ba44b Random tweaks. 2014-10-05 18:37:01 -07:00
Ivan Malison
e9f35db405 Remove .travis.yml changes. 2014-10-05 16:42:46 -07:00
Ivan Malison
7193407a07 Remove ipdb because it causes python below 2.6 to blow up. 2014-10-03 01:40:02 -07:00
Ivan Malison
c3427ae3a2 Fix pip install of tox in travis. 2014-10-02 15:48:29 -07:00
Ivan Malison
3a46a6f210 travis through tox. 2014-10-02 15:26:22 -07:00
Ivan Malison
163181844b Refactor tox.ini using new 1.8 features. 2014-10-02 14:57:53 -07:00
Ivan Malison
2c6f072d11 better logging when matches aren't working. 2014-09-25 04:49:00 -07:00
Ivan Malison
361ed82a10 Bump version to 1.1.1 2014-09-22 19:22:52 -07:00
Ivan Malison
0871c3b87c Remove instance variables for filter_headers, filter_query_params, ignore_localhost and ignore_hosts. These still exist on the VCR object, but they are automatically translated into a filter function when passed to the cassette. 2014-09-22 17:57:22 -07:00
Ivan 'Goat' Malison
d484dee50f Merge pull request #110 from IvanMalison/use_cassette_decorator_pytest_compatibility
Fix use_cassette decorator in python 2 by using wrapt.decorator
2014-09-22 17:02:23 -07:00
Ivan Malison
b046ee4bb1 Fix use_cassette decorator in python 2 by using wrapt.decorator. Add wrapt as dependency. 2014-09-22 16:40:09 -07:00
Ivan 'Goat' Malison
3dea853482 Merge pull request #108 from IvanMalison/fix_CassetteContextDecorator_nesting_issues
Fix cassette context decorator nesting issues
2014-09-21 05:18:41 -07:00
Ivan Malison
113c95f971 Bump setup.py version to v1.1.0. minor tweaks to readme. 2014-09-21 05:14:30 -07:00
Ivan Malison
a2c947dc48 Fix last bit of of #109. 2014-09-21 05:06:28 -07:00
Ivan Malison
757ad9c836 Revert "Remove ConnectionRemover class that tried to get rid of vcr connections in ConnectionPools."
This reverts commit dc249b0965.

Conflicts:
	vcr/patch.py
2014-09-20 11:59:25 -07:00
Ivan Malison
18e5898ec4 Return a tuple from the _request function on CassettePatcherBuilder even if import fails. Make _recursively_apply_get_cassette_subclass actually work with dictionaries. 2014-09-20 11:28:59 -07:00
Ivan Malison
83211a1887 Make changes from b1cdd50e9b compatible with requests1.x; Update Readme.md with description of before_record_response 2014-09-20 11:05:25 -07:00
Ivan Malison
dc249b0965 Remove ConnectionRemover class that tried to get rid of vcr connections in ConnectionPools. 2014-09-19 19:02:59 -07:00
Ivan Malison
121ed79172 Mark bad test xfail. 2014-09-19 17:10:19 -07:00
Ivan Malison
b1cdd50e9b Fix some of the issues from #109 2014-09-19 17:06:53 -07:00
Ivan Malison
1018867838 Revert "Fixed issue in test_nested_context_managers_with_session_created_before_first_nesting. by using a single class and patching cassette on that class. Not a great solution :\"
This reverts commit 2bf23b2cdf.
2014-09-19 14:32:21 -07:00
Ivan Malison
b6e96020c1 Use {[testenv]deps}, instead of repeating testing requirements. Write another failing test for #109 2014-09-19 14:31:49 -07:00
Ivan Malison
8947f0fc5c Add failing test for session still being attached to cassette after context is gone. 2014-09-18 17:03:13 -07:00
Ivan Malison
2bf23b2cdf Fixed issue in test_nested_context_managers_with_session_created_before_first_nesting. by using a single class and patching cassette on that class. Not a great solution :\ 2014-09-18 17:01:48 -07:00
Ivan Malison
58fcb2b453 Add test that fails because of the fact that a new class is used for each cassette context instead of replacing the cassette on the existing mock connection. 2014-09-18 16:17:48 -07:00
Ivan Malison
0c19acd74f Use contextdecorator from contextlib2. add logging for entering context. 2014-09-18 15:25:42 -07:00
Ivan Malison
4868a63876 Refactor build_patchers into class. Fix issue with patching non existent attribute with hasattr. 2014-09-18 14:42:56 -07:00
Ivan Malison
e1e08c7a2c hasattr check for requests 2.0 use cassette added type for httplib2 dictionary patch. 2014-09-18 08:02:50 -07:00
Ivan Malison
5edc58f10c Check for old style class when building subclass. 2014-09-18 07:10:52 -07:00
Ivan Malison
2193008150 Python version agnostic way of getting the next item in the generator. 2014-09-18 05:59:03 -07:00
Ivan Malison
958aac3af3 Use mock for patching http connection objects. 2014-09-18 05:32:55 -07:00
Ivan Malison
9a564586a4 Failing tests for nested context decoration. 2014-09-18 03:46:39 -07:00
Ivan Malison
643a4c91ee Change use_cassette to pass a function to CassetteContextDecorator so that changes to the default settings on the vcr properly propogate. 2014-09-18 02:52:44 -07:00
Ivan Malison
472cc3bffe use_cassette -> CassetteContextDecorator 2014-09-17 23:22:43 -07:00
Ivan Malison
8db46002a3 Fix failure in test_local_host resulting from attempting to add tuple to list. 2014-09-17 21:48:28 -07:00
Ivan Malison
a08c90c5d6 Revert "Add global toggle to use_cassette."
This reverts commit 366e2b75bb.

Conflicts:
	tests/unit/test_cassettes.py
2014-09-17 21:42:25 -07:00
Ivan Malison
8e01426056 Change default paramters to VCR from lists to tuples. 2014-09-17 19:29:02 -07:00
Ivan Malison
9a4f5f23a4 Add before_record_response to Cassette and VCR. 2014-09-17 04:10:05 -07:00
Ivan Malison
366e2b75bb Add global toggle to use_cassette. 2014-09-17 01:28:54 -07:00
Ivan Malison
0cfe63ef6e Bump version number for new use_cassette_decorator. 2014-09-16 23:53:50 -07:00
Ivan Malison
cb05f4163c Add use_cassette class so functinos that are decorated with use_cassette can be called multiple times. 2014-09-16 23:45:05 -07:00
Kevin McCarthy
20057a6815 Merge pull request #101 from IvanMalison/minor_cleanup
Clean up __init__.py .
2014-09-15 19:10:40 -10:00
Ivan Malison
0d313502b8 Clean up __init__.py . 2014-09-15 21:10:24 -07:00
Kevin McCarthy
d9c2b4b25d Merge pull request #100 from IvanMalison/immutable_keyword_defaults
Make defaults of Cassette constructor immutable. Clean up whitespace. Re...
2014-09-12 14:01:55 -10:00
Ivan Malison
640681138a Make defaults of Cassette constructor immutable. Clean up whitespace. Remove unused import. 2014-09-12 02:16:32 -07:00
Kevin McCarthy
a02bbbab2b Merge pull request #97 from matt-thomson/multiple-headers
Fix serialization/deserialization of multiple headers with the same value
2014-09-07 10:06:26 -10:00
Matt Thomson
f719f90e63 Fix multiple header behaviour.
Join multiple header values together, rather than losing duplicates with a
dict.
2014-09-06 17:15:15 +01:00
Matt Thomson
3c410b5f9d Don't write header values multiple times.
On Python 3, response.msg.keys() contains the same value multiple times if
there are multiple headers with the same value.  Work around this by
converting to a set before iterating over it.
2014-09-06 16:57:12 +01:00
Matt Thomson
7a5795a547 Add test to demonstrate Python 3 multiple headers bug. 2014-09-06 16:32:29 +01:00
41 changed files with 1448 additions and 1046 deletions

2
.gitignore vendored
View File

@@ -6,3 +6,5 @@ dist/
.coverage
*.egg-info/
pytestdebug.log
fixtures/

View File

@@ -8,9 +8,14 @@ env:
- WITH_LIB="requests2.2"
- WITH_LIB="requests2.3"
- WITH_LIB="requests2.4"
- WITH_LIB="requests2.5"
- WITH_LIB="requests2.7"
- WITH_LIB="requests1.x"
- WITH_LIB="httplib2"
- WITH_LIB="boto"
- WITH_LIB="urllib31.7"
- WITH_LIB="urllib31.9"
- WITH_LIB="urllib31.10"
matrix:
allow_failures:
- env: WITH_LIB="boto"
@@ -33,6 +38,11 @@ install:
- if [ $WITH_LIB = "requests2.2" ] ; then pip install requests==2.2.1; fi
- if [ $WITH_LIB = "requests2.3" ] ; then pip install requests==2.3.0; fi
- if [ $WITH_LIB = "requests2.4" ] ; then pip install requests==2.4.0; fi
- if [ $WITH_LIB = "requests2.5" ] ; then pip install requests==2.5.0; fi
- if [ $WITH_LIB = "requests2.7" ] ; then pip install requests==2.7.0; fi
- if [ $WITH_LIB = "httplib2" ] ; then pip install httplib2; fi
- if [ $WITH_LIB = "boto" ] ; then pip install boto; fi
- if [ $WITH_LIB = "urllib31.7" ] ; then pip install certifi urllib3==1.7.1; fi
- if [ $WITH_LIB = "urllib31.9" ] ; then pip install certifi urllib3==1.9.1; fi
- if [ $WITH_LIB = "urllib31.10" ] ; then pip install certifi urllib3==1.10.2; fi
script: python setup.py test

View File

@@ -1,4 +1,4 @@
Copyright (c) 2012-2014 Kevin McCarthy
Copyright (c) 2012-2015 Kevin McCarthy
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

4
MANIFEST.in Normal file
View File

@@ -0,0 +1,4 @@
include README.md
include LICENSE.txt
include tox.ini
recursive-include tests *

114
README.md
View File

@@ -2,7 +2,7 @@
![vcr.py](https://raw.github.com/kevin1024/vcrpy/master/vcr.png)
This is a Python version of [Ruby's VCR library](https://github.com/myronmarston/vcr).
This is a Python version of [Ruby's VCR library](https://github.com/vcr/vcr).
[![Build Status](https://secure.travis-ci.org/kevin1024/vcrpy.png?branch=master)](http://travis-ci.org/kevin1024/vcrpy)
[![Stories in Ready](https://badge.waffle.io/kevin1024/vcrpy.png?label=ready&title=Ready)](https://waffle.io/kevin1024/vcrpy)
@@ -24,6 +24,7 @@ VCR.py supports Python 2.6 and 2.7, 3.3, 3.4, and [pypy](http://pypy.org).
The following http libraries are supported:
* urllib2
* urllib3
* http.client (python3)
* requests (both 1.x and 2.x versions)
* httplib2
@@ -176,13 +177,13 @@ with vcr.use_cassette('fixtures/vcr_cassettes/synopsis.yaml') as cass:
The `Cassette` object exposes the following properties which I consider part of
the API. The fields are as follows:
* `requests`: A list of vcr.Request objects containing the requests made while
this cassette was being used, ordered by the order that the request was made.
* `requests`: A list of vcr.Request objects corresponding to the http requests
that were made during the recording of the cassette. The requests appear in the
order that they were originally processed.
* `responses`: A list of the responses made.
* `play_count`: The number of times this cassette has had a response played
back
* `all_played`: A boolean indicates whether all the responses have been
played back
* `play_count`: The number of times this cassette has played back a response.
* `all_played`: A boolean indicating whether all the responses have been
played back.
* `responses_of(request)`: Access the responses that match a given request
The `Request` object has the following properties:
@@ -215,7 +216,7 @@ Finally, register your class with VCR to use your new serializer.
```python
import vcr
BogoSerializer(object):
class BogoSerializer(object):
"""
Must implement serialize() and deserialize() methods
"""
@@ -293,12 +294,21 @@ with my_vcr.use_cassette('test.yml', filter_query_parameters=['api_key']):
requests.get('http://api.com/getdata?api_key=secretstring')
```
### Custom request filtering
### Filter information from HTTP post data
Use the `filter_post_data_parameters` configuration option with a list of post data
parameters to filter.
If neither of these covers your use case, you can register a callback that will
manipulate the HTTP request before adding it to the cassette. Use the
`before_record` configuration option to so this. Here is an
example that will never record requests to the /login endpoint.
```python
with my_vcr.use_cassette('test.yml', filter_post_data_parameters=['client_secret']):
requests.post('http://api.com/postdata', data={'api_key': 'secretstring'})
```
### Custom Request filtering
If none of these covers your request filtering needs, you can register a callback
that will manipulate the HTTP request before adding it to the cassette. Use the
`before_record` configuration option to so this. Here is an example that will
never record requests to the /login endpoint.
```python
def before_record_cb(request):
@@ -312,6 +322,40 @@ with my_vcr.use_cassette('test.yml'):
# your http code here
```
You can also mutate the response using this callback. For example, you could
remove all query parameters from any requests to the `'/login'` path.
```python
def scrub_login_request(request):
if request.path == '/login':
request.uri, _ = urllib.splitquery(response.uri)
return request
my_vcr = vcr.VCR(
before_record=scrub_login_request,
)
with my_vcr.use_cassette('test.yml'):
# your http code here
```
### Custom Response Filtering
VCR.py also suports response filtering with the `before_record_response` keyword
argument. It's usage is similar to that of `before_record`:
```python
def scrub_string(string, replacement=''):
def before_record_reponse(response):
return response['body']['string'] = response['body']['string'].replace(string, replacement)
return scrub_string
my_vcr = vcr.VCR(
before_record=scrub_string(settings.USERNAME, 'username'),
)
with my_vcr.use_cassette('test.yml'):
# your http code here
```
## Ignore requests
If you would like to completely ignore certain requests, you can do it in a
@@ -328,6 +372,23 @@ back from a cassette. VCR will completely ignore those requests as if it
didn't notice them at all, and they will continue to hit the server as if VCR
were not there.
## Custom Patches
If you use a custom `HTTPConnection` class, or otherwise make http
requests in a way that requires additional patching, you can use the
`custom_patches` keyword argument of the `VCR` and `Cassette` objects
to patch those objects whenever a cassette's context is entered. To
patch a custom version of `HTTPConnection` you can do something like
this:
```
import where_the_custom_https_connection_lives
from vcr.stubs import VCRHTTPSConnection
my_vcr = config.VCR(custom_patches=((where_the_custom_https_connection_lives, 'CustomHTTPSConnection', VCRHTTPSConnection),))
@my_vcr.use_cassette(...)
```
## Installation
VCR.py is a package on PyPI, so you can `pip install vcrpy` (first you may need
@@ -335,7 +396,7 @@ to `brew install libyaml` [[Homebrew](http://mxcl.github.com/homebrew/)])
## Ruby VCR compatibility
I'm not trying to match the format of the Ruby VCR YAML files. Cassettes
VCR.py does not aim to match the format of the Ruby VCR YAML files. Cassettes
generated by Ruby's VCR are not compatible with VCR.py.
## Running VCR's test suite
@@ -356,7 +417,7 @@ installed.
Also, in order for the boto tests to run, you will need an AWS key. Refer to
the [boto
documentation](http://boto.readthedocs.org/en/latest/getting_started.html) for
how to set this up. I have marked the boto tests as optional in Travis so you
how to set this up. I have marked the boto tests as optional in Travis so you
don't have to worry about them failing if you submit a pull request.
@@ -423,6 +484,29 @@ API in version 1.0.x
## Changelog
* 1.4.2 Fix a bug caused by requests 2.7 and chunked transfer encoding
* 1.4.1 Include README, tests, LICENSE in package. Thanks @ralphbean.
* 1.4.0 Filter post data parameters (thanks @eadmundo), support for
posting files through requests, inject_cassette kwarg to access
cassette from `use_cassette` decorated function,
`with_current_defaults` actually works (thanks @samstav).
* 1.3.0 Fix/add support for urllib3 (thanks @aisch), fix default
port for https (thanks @abhinav).
* 1.2.0 Add custom_patches argument to VCR/Cassette objects to allow
users to stub custom classes when cassettes become active.
* 1.1.4 Add force reset around calls to actual connection from stubs, to ensure
compatibility with the version of httplib/urlib2 in python 2.7.9.
* 1.1.3 Fix python3 headers field (thanks @rtaboada), fix boto test (thanks
@telaviv), fix new_episodes record mode (thanks @jashugan), fix Windows
connectionpool stub bug (thanks @gazpachoking), add support for requests 2.5
* 1.1.2 Add urllib==1.7.1 support. Make json serialize error handling correct
Improve logging of match failures.
* 1.1.1 Use function signature preserving `wrapt.decorator` to write the
decorator version of use_cassette in order to ensure compatibility with
py.test fixtures and python 2. Move all request filtering into the
`before_record_callable`.
* 1.1.0 Add `before_record_response`. Fix several bugs related to the context
management of cassettes.
* 1.0.3: Fix an issue with requests 2.4 and make sure case sensitivity is
consistent across python versions
* 1.0.2: Fix an issue with requests 2.3

View File

@@ -1,7 +1,7 @@
#!/usr/bin/env python
import sys
from setuptools import setup
from setuptools import setup, find_packages
from setuptools.command.test import test as TestCommand
@@ -20,7 +20,7 @@ class PyTest(TestCommand):
setup(
name='vcrpy',
version='1.0.3',
version='1.4.2',
description=(
"Automatically mock your HTTP interactions to simplify and "
"speed up testing"
@@ -28,20 +28,9 @@ setup(
author='Kevin McCarthy',
author_email='me@kevinmccarthy.org',
url='https://github.com/kevin1024/vcrpy',
packages=[
'vcr',
'vcr.stubs',
'vcr.compat',
'vcr.persisters',
'vcr.serializers',
],
package_dir={
'vcr': 'vcr',
'vcr.stubs': 'vcr/stubs',
'vcr.compat': 'vcr/compat',
'vcr.persisters': 'vcr/persisters',
},
install_requires=['PyYAML', 'contextdecorator', 'six'],
packages=find_packages(exclude=("tests*",)),
install_requires=['PyYAML', 'mock', 'six', 'contextlib2',
'wrapt', 'backport_collections'],
license='MIT',
tests_require=['pytest', 'mock', 'pytest-localserver'],
cmdclass={'test': PyTest},
@@ -54,5 +43,5 @@ setup(
'Topic :: Software Development :: Testing',
'Topic :: Internet :: WWW/HTTP',
'License :: OSI Approved :: MIT License',
],
]
)

View File

@@ -1,5 +1,5 @@
'''Basic tests about cassettes'''
# coding=utf-8
# -*- coding: utf-8 -*-
'''Basic tests for cassettes'''
# External imports
import os

View File

@@ -14,7 +14,7 @@ def test_boto_stubs(tmpdir):
from boto.https_connection import CertValidatingHTTPSConnection
from vcr.stubs.boto_stubs import VCRCertValidatingHTTPSConnection
# Prove that the class was patched by the stub and that we can instantiate it.
assert CertValidatingHTTPSConnection is VCRCertValidatingHTTPSConnection
assert issubclass(CertValidatingHTTPSConnection, VCRCertValidatingHTTPSConnection)
CertValidatingHTTPSConnection('hostname.does.not.matter')
def test_boto_without_vcr():

View File

@@ -1,5 +1,5 @@
# -*- coding: utf-8 -*-
'''Basic tests about save behavior'''
# coding=utf-8
# External imports
import os

View File

@@ -1,6 +1,7 @@
import base64
import pytest
from six.moves.urllib.request import urlopen, Request
from six.moves.urllib.parse import urlencode
from six.moves.urllib.error import HTTPError
import vcr
@@ -54,15 +55,30 @@ def test_filter_querystring(tmpdir):
urlopen(url)
assert 'foo' not in cass.requests[0].url
def test_filter_post_data(tmpdir):
url = 'http://httpbin.org/post'
data = urlencode({'id': 'secret', 'foo': 'bar'}).encode('utf-8')
cass_file = str(tmpdir.join('filter_pd.yaml'))
with vcr.use_cassette(cass_file, filter_post_data_parameters=['id']):
urlopen(url, data)
with vcr.use_cassette(cass_file, filter_post_data_parameters=['id']) as cass:
assert b'id=secret' not in cass.requests[0].body
def test_filter_callback(tmpdir):
url = 'http://httpbin.org/get'
cass_file = str(tmpdir.join('basic_auth_filter.yaml'))
def before_record_cb(request):
if request.path != '/get':
return request
my_vcr = vcr.VCR(
before_record = before_record_cb,
)
# Test the legacy keyword.
my_vcr = vcr.VCR(before_record=before_record_cb)
with my_vcr.use_cassette(cass_file, filter_headers=['authorization']) as cass:
urlopen(url)
assert len(cass) == 0
my_vcr = vcr.VCR(before_record_request=before_record_cb)
with my_vcr.use_cassette(cass_file, filter_headers=['authorization']) as cass:
urlopen(url)
assert len(cass) == 0

View File

@@ -1,5 +1,5 @@
# -*- coding: utf-8 -*-
'''Integration tests with httplib2'''
# coding=utf-8
# External imports
from six.moves.urllib_parse import urlencode
@@ -54,7 +54,7 @@ def test_response_headers(scheme, tmpdir):
with vcr.use_cassette(str(tmpdir.join('headers.yaml'))) as cass:
resp, _ = httplib2.Http().request(url)
assert headers == resp.items()
assert set(headers) == set(resp.items())
def test_multiple_requests(scheme, tmpdir):

View File

@@ -72,6 +72,31 @@ def test_new_episodes_record_mode(tmpdir):
assert len(cass.responses) == 2
def test_new_episodes_record_mode_two_times(tmpdir):
testfile = str(tmpdir.join('recordmode.yml'))
url = 'http://httpbin.org/bytes/1024'
with vcr.use_cassette(testfile, record_mode="new_episodes"):
# cassette file doesn't exist, so create.
original_first_response = urlopen(url).read()
with vcr.use_cassette(testfile, record_mode="new_episodes"):
# make the same request again
assert urlopen(url).read() == original_first_response
# in the "new_episodes" record mode, we can add the same request
# to the cassette without repercussions
original_second_response = urlopen(url).read()
with vcr.use_cassette(testfile, record_mode="once"):
# make the same request again
assert urlopen(url).read() == original_first_response
assert urlopen(url).read() == original_second_response
# now that we are back in once mode, this should raise
# an error.
with pytest.raises(Exception):
urlopen(url).read()
def test_all_record_mode(tmpdir):
testfile = str(tmpdir.join('recordmode.yml'))

View File

@@ -9,3 +9,14 @@ def test_recorded_request_uri_with_redirected_request(tmpdir):
assert cass.requests[0].uri == 'http://httpbin.org/redirect/3'
assert cass.requests[3].uri == 'http://httpbin.org/get'
assert len(cass) == 4
def test_records_multiple_header_values(tmpdir, httpserver):
httpserver.serve_content('Hello!', headers=[('foo', 'bar'), ('foo', 'baz')])
with vcr.use_cassette(str(tmpdir.join('test.yml'))) as cass:
assert len(cass) == 0
urlopen(httpserver.url)
assert len(cass) == 1
assert cass.responses[0]['headers']['foo'] == ['bar', 'baz']

View File

@@ -1,53 +1,47 @@
# -*- coding: utf-8 -*-
'''Test requests' interaction with vcr'''
# coding=utf-8
import os
import pytest
import vcr
from assertions import (
assert_cassette_empty,
assert_cassette_has_one_response,
assert_is_json
)
from assertions import assert_cassette_empty, assert_is_json
requests = pytest.importorskip("requests")
@pytest.fixture(params=["https", "http"])
def scheme(request):
"""
Fixture that returns both http and https
"""
'''Fixture that returns both http and https.'''
return request.param
def test_status_code(scheme, tmpdir):
'''Ensure that we can read the status code'''
url = scheme + '://httpbin.org/'
with vcr.use_cassette(str(tmpdir.join('atts.yaml'))) as cass:
with vcr.use_cassette(str(tmpdir.join('atts.yaml'))):
status_code = requests.get(url).status_code
with vcr.use_cassette(str(tmpdir.join('atts.yaml'))) as cass:
with vcr.use_cassette(str(tmpdir.join('atts.yaml'))):
assert status_code == requests.get(url).status_code
def test_headers(scheme, tmpdir):
'''Ensure that we can read the headers back'''
url = scheme + '://httpbin.org/'
with vcr.use_cassette(str(tmpdir.join('headers.yaml'))) as cass:
with vcr.use_cassette(str(tmpdir.join('headers.yaml'))):
headers = requests.get(url).headers
with vcr.use_cassette(str(tmpdir.join('headers.yaml'))) as cass:
with vcr.use_cassette(str(tmpdir.join('headers.yaml'))):
assert headers == requests.get(url).headers
def test_body(tmpdir, scheme):
'''Ensure the responses are all identical enough'''
url = scheme + '://httpbin.org/bytes/1024'
with vcr.use_cassette(str(tmpdir.join('body.yaml'))) as cass:
with vcr.use_cassette(str(tmpdir.join('body.yaml'))):
content = requests.get(url).content
with vcr.use_cassette(str(tmpdir.join('body.yaml'))) as cass:
with vcr.use_cassette(str(tmpdir.join('body.yaml'))):
assert content == requests.get(url).content
@@ -55,10 +49,10 @@ def test_auth(tmpdir, scheme):
'''Ensure that we can handle basic auth'''
auth = ('user', 'passwd')
url = scheme + '://httpbin.org/basic-auth/user/passwd'
with vcr.use_cassette(str(tmpdir.join('auth.yaml'))) as cass:
with vcr.use_cassette(str(tmpdir.join('auth.yaml'))):
one = requests.get(url, auth=auth)
with vcr.use_cassette(str(tmpdir.join('auth.yaml'))) as cass:
with vcr.use_cassette(str(tmpdir.join('auth.yaml'))):
two = requests.get(url, auth=auth)
assert one.content == two.content
assert one.status_code == two.status_code
@@ -81,10 +75,10 @@ def test_post(tmpdir, scheme):
'''Ensure that we can post and cache the results'''
data = {'key1': 'value1', 'key2': 'value2'}
url = scheme + '://httpbin.org/post'
with vcr.use_cassette(str(tmpdir.join('requests.yaml'))) as cass:
with vcr.use_cassette(str(tmpdir.join('requests.yaml'))):
req1 = requests.post(url, data).content
with vcr.use_cassette(str(tmpdir.join('requests.yaml'))) as cass:
with vcr.use_cassette(str(tmpdir.join('requests.yaml'))):
req2 = requests.post(url, data).content
assert req1 == req2
@@ -93,7 +87,7 @@ def test_post(tmpdir, scheme):
def test_redirects(tmpdir, scheme):
'''Ensure that we can handle redirects'''
url = scheme + '://httpbin.org/redirect-to?url=bytes/1024'
with vcr.use_cassette(str(tmpdir.join('requests.yaml'))) as cass:
with vcr.use_cassette(str(tmpdir.join('requests.yaml'))):
content = requests.get(url).content
with vcr.use_cassette(str(tmpdir.join('requests.yaml'))) as cass:
@@ -124,11 +118,11 @@ def test_gzip(tmpdir, scheme):
url = scheme + '://httpbin.org/gzip'
response = requests.get(url)
with vcr.use_cassette(str(tmpdir.join('gzip.yaml'))) as cass:
with vcr.use_cassette(str(tmpdir.join('gzip.yaml'))):
response = requests.get(url)
assert_is_json(response.content)
with vcr.use_cassette(str(tmpdir.join('gzip.yaml'))) as cass:
with vcr.use_cassette(str(tmpdir.join('gzip.yaml'))):
assert_is_json(response.content)
@@ -143,9 +137,83 @@ def test_session_and_connection_close(tmpdir, scheme):
with vcr.use_cassette(str(tmpdir.join('session_connection_closed.yaml'))):
session = requests.session()
resp = session.get('http://httpbin.org/get', headers={'Connection': 'close'})
resp = session.get('http://httpbin.org/get', headers={'Connection': 'close'})
session.get('http://httpbin.org/get', headers={'Connection': 'close'})
session.get('http://httpbin.org/get', headers={'Connection': 'close'})
def test_https_with_cert_validation_disabled(tmpdir):
with vcr.use_cassette(str(tmpdir.join('cert_validation_disabled.yaml'))):
requests.get('https://httpbin.org', verify=False)
def test_session_can_make_requests_after_requests_unpatched(tmpdir):
with vcr.use_cassette(str(tmpdir.join('test_session_after_unpatched.yaml'))):
session = requests.session()
session.get('http://httpbin.org/get')
with vcr.use_cassette(str(tmpdir.join('test_session_after_unpatched.yaml'))):
session = requests.session()
session.get('http://httpbin.org/get')
session.get('http://httpbin.org/status/200')
def test_session_created_before_use_cassette_is_patched(tmpdir, scheme):
url = scheme + '://httpbin.org/bytes/1024'
# Record arbitrary, random data to the cassette
with vcr.use_cassette(str(tmpdir.join('session_created_outside.yaml'))):
session = requests.session()
body = session.get(url).content
# Create a session outside of any cassette context manager
session = requests.session()
# Make a request to make sure that a connectionpool is instantiated
session.get(scheme + '://httpbin.org/get')
with vcr.use_cassette(str(tmpdir.join('session_created_outside.yaml'))):
# These should only be the same if the patching succeeded.
assert session.get(url).content == body
def test_nested_cassettes_with_session_created_before_nesting(scheme, tmpdir):
'''
This tests ensures that a session that was created while one cassette was
active is patched to the use the responses of a second cassette when it
is enabled.
'''
url = scheme + '://httpbin.org/bytes/1024'
with vcr.use_cassette(str(tmpdir.join('first_nested.yaml'))):
session = requests.session()
first_body = session.get(url).content
with vcr.use_cassette(str(tmpdir.join('second_nested.yaml'))):
second_body = session.get(url).content
third_body = requests.get(url).content
with vcr.use_cassette(str(tmpdir.join('second_nested.yaml'))):
session = requests.session()
assert session.get(url).content == second_body
with vcr.use_cassette(str(tmpdir.join('first_nested.yaml'))):
assert session.get(url).content == first_body
assert session.get(url).content == third_body
# Make sure that the session can now get content normally.
session.get('http://www.reddit.com')
def test_post_file(tmpdir, scheme):
'''Ensure that we handle posting a file.'''
url = scheme + '://httpbin.org/post'
with vcr.use_cassette(str(tmpdir.join('post_file.yaml'))) as cass:
# Don't use 2.7+ only style ',' separated with here because we support python 2.6
with open('tox.ini') as f:
original_response = requests.post(url, f).content
# This also tests that we do the right thing with matching the body when they are files.
with vcr.use_cassette(str(tmpdir.join('post_file.yaml')),
match_on=('method', 'scheme', 'host', 'port', 'path', 'query', 'body')) as cass:
with open('tox.ini', 'rb') as f:
tox_content = f.read()
assert cass.requests[0].body.read() == tox_content
with open('tox.ini', 'rb') as f:
new_response = requests.post(url, f).content
assert original_response == new_response

View File

@@ -24,3 +24,19 @@ def test_case_insensitivity(tmpdir):
# behavior should be the same both inside and outside
assert outside == inside == inside2
def _multiple_header_value(httpserver):
conn = httplib.HTTPConnection('%s:%s' % httpserver.server_address)
conn.request('GET', "/")
r = conn.getresponse()
return r.getheader('foo')
def test_multiple_headers(tmpdir, httpserver):
testfile = str(tmpdir.join('multiple_headers.yaml'))
httpserver.serve_content('Hello!', headers=[('foo', 'bar'), ('foo', 'baz')])
outside = _multiple_header_value(httpserver)
with vcr.use_cassette(testfile):
inside = _multiple_header_value(httpserver)
assert outside == inside

View File

@@ -1,8 +1,5 @@
# -*- coding: utf-8 -*-
'''Integration tests with urllib2'''
# coding=utf-8
# External imports
import os
import pytest
from six.moves.urllib.request import urlopen
@@ -11,7 +8,7 @@ from six.moves.urllib_parse import urlencode
# Internal imports
import vcr
from assertions import assert_cassette_empty, assert_cassette_has_one_response
from assertions import assert_cassette_has_one_response
@pytest.fixture(params=["https", "http"])

View File

@@ -0,0 +1,148 @@
'''Integration tests with urllib3'''
# coding=utf-8
import pytest
import vcr
from assertions import assert_cassette_empty, assert_is_json
certifi = pytest.importorskip("certifi")
urllib3 = pytest.importorskip("urllib3")
@pytest.fixture(params=["https", "http"])
def scheme(request):
"""
Fixture that returns both http and https
"""
return request.param
@pytest.fixture(scope='module')
def verify_pool_mgr():
return urllib3.PoolManager(
cert_reqs='CERT_REQUIRED', # Force certificate check.
ca_certs=certifi.where()
)
@pytest.fixture(scope='module')
def pool_mgr():
return urllib3.PoolManager()
def test_status_code(scheme, tmpdir, verify_pool_mgr):
'''Ensure that we can read the status code'''
url = scheme + '://httpbin.org/'
with vcr.use_cassette(str(tmpdir.join('atts.yaml'))):
status_code = verify_pool_mgr.request('GET', url).status
with vcr.use_cassette(str(tmpdir.join('atts.yaml'))):
assert status_code == verify_pool_mgr.request('GET', url).status
def test_headers(scheme, tmpdir, verify_pool_mgr):
'''Ensure that we can read the headers back'''
url = scheme + '://httpbin.org/'
with vcr.use_cassette(str(tmpdir.join('headers.yaml'))):
headers = verify_pool_mgr.request('GET', url).headers
with vcr.use_cassette(str(tmpdir.join('headers.yaml'))):
assert headers == verify_pool_mgr.request('GET', url).headers
def test_body(tmpdir, scheme, verify_pool_mgr):
'''Ensure the responses are all identical enough'''
url = scheme + '://httpbin.org/bytes/1024'
with vcr.use_cassette(str(tmpdir.join('body.yaml'))):
content = verify_pool_mgr.request('GET', url).data
with vcr.use_cassette(str(tmpdir.join('body.yaml'))):
assert content == verify_pool_mgr.request('GET', url).data
def test_auth(tmpdir, scheme, verify_pool_mgr):
'''Ensure that we can handle basic auth'''
auth = ('user', 'passwd')
headers = urllib3.util.make_headers(basic_auth='{0}:{1}'.format(*auth))
url = scheme + '://httpbin.org/basic-auth/user/passwd'
with vcr.use_cassette(str(tmpdir.join('auth.yaml'))):
one = verify_pool_mgr.request('GET', url, headers=headers)
with vcr.use_cassette(str(tmpdir.join('auth.yaml'))):
two = verify_pool_mgr.request('GET', url, headers=headers)
assert one.data == two.data
assert one.status == two.status
def test_auth_failed(tmpdir, scheme, verify_pool_mgr):
'''Ensure that we can save failed auth statuses'''
auth = ('user', 'wrongwrongwrong')
headers = urllib3.util.make_headers(basic_auth='{0}:{1}'.format(*auth))
url = scheme + '://httpbin.org/basic-auth/user/passwd'
with vcr.use_cassette(str(tmpdir.join('auth-failed.yaml'))) as cass:
# Ensure that this is empty to begin with
assert_cassette_empty(cass)
one = verify_pool_mgr.request('GET', url, headers=headers)
two = verify_pool_mgr.request('GET', url, headers=headers)
assert one.data == two.data
assert one.status == two.status == 401
def test_post(tmpdir, scheme, verify_pool_mgr):
'''Ensure that we can post and cache the results'''
data = {'key1': 'value1', 'key2': 'value2'}
url = scheme + '://httpbin.org/post'
with vcr.use_cassette(str(tmpdir.join('verify_pool_mgr.yaml'))):
req1 = verify_pool_mgr.request('POST', url, data).data
with vcr.use_cassette(str(tmpdir.join('verify_pool_mgr.yaml'))):
req2 = verify_pool_mgr.request('POST', url, data).data
assert req1 == req2
def test_redirects(tmpdir, scheme, verify_pool_mgr):
'''Ensure that we can handle redirects'''
url = scheme + '://httpbin.org/redirect-to?url=bytes/1024'
with vcr.use_cassette(str(tmpdir.join('verify_pool_mgr.yaml'))):
content = verify_pool_mgr.request('GET', url).data
with vcr.use_cassette(str(tmpdir.join('verify_pool_mgr.yaml'))) as cass:
assert content == verify_pool_mgr.request('GET', url).data
# Ensure that we've now cached *two* responses. One for the redirect
# and one for the final fetch
assert len(cass) == 2
assert cass.play_count == 2
def test_cross_scheme(tmpdir, scheme, verify_pool_mgr):
'''Ensure that requests between schemes are treated separately'''
# First fetch a url under http, and then again under https and then
# ensure that we haven't served anything out of cache, and we have two
# requests / response pairs in the cassette
with vcr.use_cassette(str(tmpdir.join('cross_scheme.yaml'))) as cass:
verify_pool_mgr.request('GET', 'https://httpbin.org/')
verify_pool_mgr.request('GET', 'http://httpbin.org/')
assert cass.play_count == 0
assert len(cass) == 2
def test_gzip(tmpdir, scheme, verify_pool_mgr):
'''
Ensure that requests (actually urllib3) is able to automatically decompress
the response body
'''
url = scheme + '://httpbin.org/gzip'
response = verify_pool_mgr.request('GET', url)
with vcr.use_cassette(str(tmpdir.join('gzip.yaml'))):
response = verify_pool_mgr.request('GET', url)
assert_is_json(response.data)
with vcr.use_cassette(str(tmpdir.join('gzip.yaml'))):
assert_is_json(response.data)
def test_https_with_cert_validation_disabled(tmpdir, pool_mgr):
with vcr.use_cassette(str(tmpdir.join('cert_validation_disabled.yaml'))):
pool_mgr.request('GET', 'https://httpbin.org')

View File

@@ -64,3 +64,11 @@ def test_cookies(tmpdir):
r1 = s.get("http://httpbin.org/cookies/set?k1=v1&k2=v2")
r2 = s.get("http://httpbin.org/cookies")
assert len(r2.json()['cookies']) == 2
def test_amazon_doctype(tmpdir):
# amazon gzips its homepage. For some reason, in requests 2.7, it's not
# getting gunzipped.
with vcr.use_cassette(str(tmpdir.join('amz.yml'))):
r = requests.get('http://www.amazon.com')
assert 'html' in r.text

View File

@@ -1,8 +1,16 @@
import copy
from six.moves import http_client as httplib
import contextlib2
import mock
import pytest
import yaml
import mock
from vcr.cassette import Cassette
from vcr.errors import UnhandledHTTPRequestError
from vcr.patch import force_reset
from vcr.stubs import VCRHTTPSConnection
def test_cassette_load(tmpdir):
@@ -68,6 +76,46 @@ def test_cassette_cant_read_same_request_twice():
a.play_response('foo')
def make_get_request():
conn = httplib.HTTPConnection("www.python.org")
conn.request("GET", "/index.html")
return conn.getresponse()
@mock.patch('vcr.cassette.requests_match', return_value=True)
@mock.patch('vcr.cassette.load_cassette', lambda *args, **kwargs: (('foo',), (mock.MagicMock(),)))
@mock.patch('vcr.cassette.Cassette.can_play_response_for', return_value=True)
@mock.patch('vcr.stubs.VCRHTTPResponse')
def test_function_decorated_with_use_cassette_can_be_invoked_multiple_times(*args):
decorated_function = Cassette.use('test')(make_get_request)
for i in range(2):
decorated_function()
def test_arg_getter_functionality():
arg_getter = mock.Mock(return_value=('test', {}))
context_decorator = Cassette.use_arg_getter(arg_getter)
with context_decorator as cassette:
assert cassette._path == 'test'
arg_getter.return_value = ('other', {})
with context_decorator as cassette:
assert cassette._path == 'other'
arg_getter.return_value = ('', {'filter_headers': ('header_name',)})
@context_decorator
def function():
pass
with mock.patch.object(Cassette, 'load', return_value=mock.MagicMock(inject=False)) as cassette_load:
function()
cassette_load.assert_called_once_with(arg_getter.return_value[0],
**arg_getter.return_value[1])
def test_cassette_not_all_played():
a = Cassette('test')
a.append('foo', 'bar')
@@ -80,3 +128,76 @@ def test_cassette_all_played():
a.append('foo', 'bar')
a.play_response('foo')
assert a.all_played
def test_before_record_response():
before_record_response = mock.Mock(return_value='mutated')
cassette = Cassette('test', before_record_response=before_record_response)
cassette.append('req', 'res')
before_record_response.assert_called_once_with('res')
assert cassette.responses[0] == 'mutated'
def assert_get_response_body_is(value):
conn = httplib.HTTPConnection("www.python.org")
conn.request("GET", "/index.html")
assert conn.getresponse().read().decode('utf8') == value
@mock.patch('vcr.cassette.requests_match', _mock_requests_match)
@mock.patch('vcr.cassette.Cassette.can_play_response_for', return_value=True)
@mock.patch('vcr.cassette.Cassette._save', return_value=True)
def test_nesting_cassette_context_managers(*args):
first_response = {'body': {'string': b'first_response'}, 'headers': {},
'status': {'message': 'm', 'code': 200}}
second_response = copy.deepcopy(first_response)
second_response['body']['string'] = b'second_response'
with contextlib2.ExitStack() as exit_stack:
first_cassette = exit_stack.enter_context(Cassette.use('test'))
exit_stack.enter_context(mock.patch.object(first_cassette, 'play_response',
return_value=first_response))
assert_get_response_body_is('first_response')
# Make sure a second cassette can supercede the first
with Cassette.use('test') as second_cassette:
with mock.patch.object(second_cassette, 'play_response', return_value=second_response):
assert_get_response_body_is('second_response')
# Now the first cassette should be back in effect
assert_get_response_body_is('first_response')
def test_nesting_context_managers_by_checking_references_of_http_connection():
original = httplib.HTTPConnection
with Cassette.use('test'):
first_cassette_HTTPConnection = httplib.HTTPConnection
with Cassette.use('test'):
second_cassette_HTTPConnection = httplib.HTTPConnection
assert second_cassette_HTTPConnection is not first_cassette_HTTPConnection
with Cassette.use('test'):
assert httplib.HTTPConnection is not second_cassette_HTTPConnection
with force_reset():
assert httplib.HTTPConnection is original
assert httplib.HTTPConnection is second_cassette_HTTPConnection
assert httplib.HTTPConnection is first_cassette_HTTPConnection
def test_custom_patchers():
class Test(object):
attribute = None
with Cassette.use('custom_patches', custom_patches=((Test, 'attribute', VCRHTTPSConnection),)):
assert issubclass(Test.attribute, VCRHTTPSConnection)
assert VCRHTTPSConnection is not Test.attribute
old_attribute = Test.attribute
with Cassette.use('custom_patches', custom_patches=((Test, 'attribute', VCRHTTPSConnection),)):
assert issubclass(Test.attribute, VCRHTTPSConnection)
assert VCRHTTPSConnection is not Test.attribute
assert Test.attribute is not old_attribute
assert issubclass(Test.attribute, VCRHTTPSConnection)
assert VCRHTTPSConnection is not Test.attribute
assert Test.attribute is old_attribute

View File

@@ -1,37 +1,69 @@
from vcr.filters import _remove_headers, _remove_query_parameters
from vcr.filters import (
remove_headers,
remove_query_parameters,
remove_post_data_parameters
)
from vcr.request import Request
def test_remove_headers():
headers = {'hello': ['goodbye'], 'secret': ['header']}
request = Request('GET', 'http://google.com', '', headers)
_remove_headers(request, ['secret'])
remove_headers(request, ['secret'])
assert request.headers == {'hello': 'goodbye'}
def test_remove_headers_empty():
headers = {'hello': 'goodbye', 'secret': 'header'}
request = Request('GET', 'http://google.com', '', headers)
_remove_headers(request, [])
remove_headers(request, [])
assert request.headers == headers
def test_remove_query_parameters():
uri = 'http://g.com/?q=cowboys&w=1'
request = Request('GET', uri, '', {})
_remove_query_parameters(request, ['w'])
remove_query_parameters(request, ['w'])
assert request.uri == 'http://g.com/?q=cowboys'
def test_remove_all_query_parameters():
uri = 'http://g.com/?q=cowboys&w=1'
request = Request('GET', uri, '', {})
_remove_query_parameters(request, ['w', 'q'])
remove_query_parameters(request, ['w', 'q'])
assert request.uri == 'http://g.com/'
def test_remove_nonexistent_query_parameters():
uri = 'http://g.com/'
request = Request('GET', uri, '', {})
_remove_query_parameters(request, ['w', 'q'])
remove_query_parameters(request, ['w', 'q'])
assert request.uri == 'http://g.com/'
def test_remove_post_data_parameters():
body = b'id=secret&foo=bar'
request = Request('POST', 'http://google.com', body, {})
remove_post_data_parameters(request, ['id'])
assert request.body == b'foo=bar'
def test_preserve_multiple_post_data_parameters():
body = b'id=secret&foo=bar&foo=baz'
request = Request('POST', 'http://google.com', body, {})
remove_post_data_parameters(request, ['id'])
assert request.body == b'foo=bar&foo=baz'
def test_remove_all_post_data_parameters():
body = b'id=secret&foo=bar'
request = Request('POST', 'http://google.com', body, {})
remove_post_data_parameters(request, ['id', 'foo'])
assert request.body == b''
def test_remove_nonexistent_post_data_parameters():
body = b''
request = Request('POST', 'http://google.com', body, {})
remove_post_data_parameters(request, ['id'])
assert request.body == b''

View File

@@ -21,8 +21,8 @@ def test_headers():
('http://go.com/', 80),
('http://go.com:80/', 80),
('http://go.com:3000/', 3000),
('https://go.com/', 433),
('https://go.com:433/', 433),
('https://go.com/', 443),
('https://go.com:443/', 443),
('https://go.com:3000/', 3000),
])
def test_port(uri, expected_port):

View File

@@ -0,0 +1,68 @@
# coding: UTF-8
from vcr.stubs import VCRHTTPResponse
def test_response_should_have_headers_field():
recorded_response = {
"status": {
"message": "OK",
"code": 200
},
"headers": {
"content-length": ["0"],
"server": ["gunicorn/18.0"],
"connection": ["Close"],
"access-control-allow-credentials": ["true"],
"date": ["Fri, 24 Oct 2014 18:35:37 GMT"],
"access-control-allow-origin": ["*"],
"content-type": ["text/html; charset=utf-8"],
},
"body": {
"string": b""
}
}
response = VCRHTTPResponse(recorded_response)
assert response.headers is not None
def test_response_headers_should_be_equal_to_msg():
recorded_response = {
"status": {
"message": b"OK",
"code": 200
},
"headers": {
"content-length": ["0"],
"server": ["gunicorn/18.0"],
"connection": ["Close"],
"content-type": ["text/html; charset=utf-8"],
},
"body": {
"string": b""
}
}
response = VCRHTTPResponse(recorded_response)
assert response.headers == response.msg
def test_response_headers_should_have_correct_values():
recorded_response = {
"status": {
"message": "OK",
"code": 200
},
"headers": {
"content-length": ["10806"],
"date": ["Fri, 24 Oct 2014 18:35:37 GMT"],
"content-type": ["text/html; charset=utf-8"],
},
"body": {
"string": b""
}
}
response = VCRHTTPResponse(recorded_response)
assert response.headers.get('content-length') == "10806"
assert response.headers.get('date') == "Fri, 24 Oct 2014 18:35:37 GMT"

View File

@@ -1,21 +1,35 @@
import mock
import pytest
from vcr.serialize import deserialize
from vcr.serializers import yamlserializer, jsonserializer
def test_deserialize_old_yaml_cassette():
with open('tests/fixtures/migration/old_cassette.yaml', 'r') as f:
with pytest.raises(ValueError):
deserialize(f.read(), yamlserializer)
def test_deserialize_old_json_cassette():
with open('tests/fixtures/migration/old_cassette.json', 'r') as f:
with pytest.raises(ValueError):
deserialize(f.read(), jsonserializer)
def test_deserialize_new_yaml_cassette():
with open('tests/fixtures/migration/new_cassette.yaml', 'r') as f:
deserialize(f.read(), yamlserializer)
def test_deserialize_new_json_cassette():
with open('tests/fixtures/migration/new_cassette.json', 'r') as f:
deserialize(f.read(), jsonserializer)
@mock.patch.object(jsonserializer.json, 'dumps',
side_effect=UnicodeDecodeError('utf-8', b'unicode error in serialization',
0, 10, 'blew up'))
def test_serialize_constructs_UnicodeDecodeError(mock_dumps):
with pytest.raises(UnicodeDecodeError):
jsonserializer.serialize({})

130
tests/unit/test_vcr.py Normal file
View File

@@ -0,0 +1,130 @@
import mock
import pytest
from vcr import VCR, use_cassette
from vcr.request import Request
from vcr.stubs import VCRHTTPSConnection
def test_vcr_use_cassette():
record_mode = mock.Mock()
test_vcr = VCR(record_mode=record_mode)
with mock.patch('vcr.cassette.Cassette.load', return_value=mock.MagicMock(inject=False)) as mock_cassette_load:
@test_vcr.use_cassette('test')
def function():
pass
assert mock_cassette_load.call_count == 0
function()
assert mock_cassette_load.call_args[1]['record_mode'] is record_mode
# Make sure that calls to function now use cassettes with the
# new filter_header_settings
test_vcr.record_mode = mock.Mock()
function()
assert mock_cassette_load.call_args[1]['record_mode'] == test_vcr.record_mode
# Ensure that explicitly provided arguments still supercede
# those on the vcr.
new_record_mode = mock.Mock()
with test_vcr.use_cassette('test', record_mode=new_record_mode) as cassette:
assert cassette.record_mode == new_record_mode
def test_vcr_before_record_request_params():
base_path = 'http://httpbin.org/'
def before_record_cb(request):
if request.path != '/get':
return request
test_vcr = VCR(filter_headers=('cookie',), before_record_request=before_record_cb,
ignore_hosts=('www.test.com',), ignore_localhost=True,
filter_query_parameters=('foo',))
with test_vcr.use_cassette('test') as cassette:
assert cassette.filter_request(Request('GET', base_path + 'get', '', {})) is None
assert cassette.filter_request(Request('GET', base_path + 'get2', '', {})) is not None
assert cassette.filter_request(Request('GET', base_path + '?foo=bar', '', {})).query == []
assert cassette.filter_request(
Request('GET', base_path + '?foo=bar', '',
{'cookie': 'test', 'other': 'fun'})).headers == {'other': 'fun'}
assert cassette.filter_request(Request('GET', base_path + '?foo=bar', '',
{'cookie': 'test', 'other': 'fun'})).headers == {'other': 'fun'}
assert cassette.filter_request(Request('GET', 'http://www.test.com' + '?foo=bar', '',
{'cookie': 'test', 'other': 'fun'})) is None
with test_vcr.use_cassette('test', before_record_request=None) as cassette:
# Test that before_record can be overwritten with
assert cassette.filter_request(Request('GET', base_path + 'get', '', {})) is not None
@pytest.fixture
def random_fixture():
return 1
@use_cassette('test')
def test_fixtures_with_use_cassette(random_fixture):
# Applying a decorator to a test function that requests features can cause
# problems if the decorator does not preserve the signature of the original
# test function.
# This test ensures that use_cassette preserves the signature of
# the original test function, and thus that use_cassette is
# compatible with py.test fixtures. It is admittedly a bit strange
# because the test would never even run if the relevant feature
# were broken.
pass
def test_custom_patchers():
class Test(object):
attribute = None
attribute2 = None
test_vcr = VCR(custom_patches=((Test, 'attribute', VCRHTTPSConnection),))
with test_vcr.use_cassette('custom_patches'):
assert issubclass(Test.attribute, VCRHTTPSConnection)
assert VCRHTTPSConnection is not Test.attribute
with test_vcr.use_cassette('custom_patches', custom_patches=((Test, 'attribute2', VCRHTTPSConnection),)):
assert issubclass(Test.attribute, VCRHTTPSConnection)
assert VCRHTTPSConnection is not Test.attribute
assert Test.attribute is Test.attribute2
def test_inject_cassette():
vcr = VCR(inject_cassette=True)
@vcr.use_cassette('test', record_mode='once')
def with_cassette_injected(cassette):
assert cassette.record_mode == 'once'
@vcr.use_cassette('test', record_mode='once', inject_cassette=False)
def without_cassette_injected():
pass
with_cassette_injected()
without_cassette_injected()
def test_with_current_defaults():
vcr = VCR(inject_cassette=True, record_mode='once')
@vcr.use_cassette('test', with_current_defaults=False)
def changing_defaults(cassette, checks):
checks(cassette)
@vcr.use_cassette('test', with_current_defaults=True)
def current_defaults(cassette, checks):
checks(cassette)
def assert_record_mode_once(cassette):
assert cassette.record_mode == 'once'
def assert_record_mode_all(cassette):
assert cassette.record_mode == 'all'
changing_defaults(assert_record_mode_once)
current_defaults(assert_record_mode_once)
vcr.record_mode = 'all'
changing_defaults(assert_record_mode_all)
current_defaults(assert_record_mode_once)

268
tox.ini
View File

@@ -1,259 +1,29 @@
# Tox (http://tox.testrun.org/) is a tool for running tests
# in multiple virtualenvs. This configuration file will run the
# test suite on all supported python versions. To use it, "pip install tox"
# and then run "tox" from this directory.
[tox]
envlist =
py26,
py27,
py33,
py34,
pypy,
py26requests24,
py27requests24,
py34requests24,
pypyrequests24,
py26requests23,
py27requests23,
py34requests23,
pypyrequests23,
py26requests22,
py27requests22,
py34requests22,
pypyrequests22,
py26requests1,
py27requests1,
py33requests1,
pypyrequests1,
py26httplib2,
py27httplib2,
py33httplib2,
py34httplib2,
pypyhttplib2,
envlist = {py26,py27,py33,py34,pypy}-{requests27,requests26,requests25,requests24,requests23,requests22,requests1,httplib2,urllib317,urllib319,urllib3110,boto}
[testenv]
commands =
py.test {posargs}
basepython =
py26: python2.6
py27: python2.7
py33: python3.3
py34: python3.4
pypy: pypy
deps =
mock
pytest
pytest-localserver
PyYAML
[testenv:py26requests1]
basepython = python2.6
deps =
mock
pytest
pytest-localserver
PyYAML
requests==1.2.3
[testenv:py27requests1]
basepython = python2.7
deps =
mock
pytest
pytest-localserver
PyYAML
requests==1.2.3
[testenv:py33requests1]
basepython = python3.3
deps =
mock
pytest
pytest-localserver
PyYAML
requests==1.2.3
[testenv:pypyrequests1]
basepython = pypy
deps =
mock
pytest
pytest-localserver
PyYAML
requests==1.2.3
[testenv:py26requests24]
basepython = python2.6
deps =
mock
pytest
pytest-localserver
PyYAML
requests==2.4.0
[testenv:py27requests24]
basepython = python2.7
deps =
mock
pytest
pytest-localserver
PyYAML
requests==2.4.0
[testenv:py33requests24]
basepython = python3.4
deps =
mock
pytest
pytest-localserver
PyYAML
requests==2.4.0
[testenv:py34requests24]
basepython = python3.4
deps =
mock
pytest
pytest-localserver
PyYAML
requests==2.4.0
[testenv:pypyrequests24]
basepython = pypy
deps =
mock
pytest
pytest-localserver
PyYAML
requests==2.4.0
[testenv:py26requests23]
basepython = python2.6
deps =
mock
pytest
pytest-localserver
PyYAML
requests==2.3.0
[testenv:py27requests23]
basepython = python2.7
deps =
mock
pytest
pytest-localserver
PyYAML
requests==2.3.0
[testenv:py33requests23]
basepython = python3.4
deps =
mock
pytest
pytest-localserver
PyYAML
requests==2.3.0
[testenv:py34requests23]
basepython = python3.4
deps =
mock
pytest
pytest-localserver
PyYAML
requests==2.3.0
[testenv:pypyrequests23]
basepython = pypy
deps =
mock
pytest
pytest-localserver
PyYAML
requests==2.3.0
[testenv:py26requests22]
basepython = python2.6
deps =
mock
pytest
pytest-localserver
PyYAML
requests==2.2.1
[testenv:py27requests22]
basepython = python2.7
deps =
mock
pytest
pytest-localserver
PyYAML
requests==2.2.1
[testenv:py33requests22]
basepython = python3.4
deps =
mock
pytest
pytest-localserver
PyYAML
requests==2.2.1
[testenv:py34requests22]
basepython = python3.4
deps =
mock
pytest
pytest-localserver
PyYAML
requests==2.2.1
[testenv:pypyrequests22]
basepython = pypy
deps =
mock
pytest
pytest-localserver
PyYAML
requests==2.2.1
[testenv:py26httplib2]
basepython = python2.6
deps =
mock
pytest
pytest-localserver
PyYAML
httplib2
[testenv:py27httplib2]
basepython = python2.7
deps =
mock
pytest
pytest-localserver
PyYAML
httplib2
[testenv:py33httplib2]
basepython = python3.4
deps =
mock
pytest
pytest-localserver
PyYAML
httplib2
[testenv:py34httplib2]
basepython = python3.4
deps =
mock
pytest
pytest-localserver
PyYAML
httplib2
[testenv:pypyhttplib2]
basepython = pypy
deps =
mock
pytest
pytest-localserver
PyYAML
httplib2
requests1: requests==1.2.3
requests27: requests==2.7.0
requests22: requests==2.6.0
requests25: requests==2.5.0
requests24: requests==2.4.0
requests23: requests==2.3.0
requests22: requests==2.2.1
httplib2: httplib2
urllib317: urllib3==1.7.1
urllib319: urllib3==1.9.1
urllib3110: urllib3==1.10.2
boto: boto

View File

@@ -2,7 +2,6 @@ import logging
from .config import VCR
# Set default logging handler to avoid "No handler found" warnings.
import logging
try: # Python 2.7+
from logging import NullHandler
except ImportError:
@@ -10,10 +9,9 @@ except ImportError:
def emit(self, record):
pass
logging.getLogger(__name__).addHandler(NullHandler())
default_vcr = VCR()
def use_cassette(path, **kwargs):
return default_vcr.use_cassette(path, **kwargs)
use_cassette = default_vcr.use_cassette

View File

@@ -1,60 +1,110 @@
'''The container for recorded requests and responses'''
"""The container for recorded requests and responses"""
import logging
import contextlib2
import wrapt
try:
from collections import Counter
except ImportError:
from .compat.counter import Counter
from contextdecorator import ContextDecorator
from backport_collections import Counter
# Internal imports
from .patch import install, reset
from .patch import CassettePatcherBuilder
from .persist import load_cassette, save_cassette
from .filters import filter_request
from .serializers import yamlserializer
from .matchers import requests_match, uri, method
from .errors import UnhandledHTTPRequestError
class Cassette(ContextDecorator):
'''A container for recorded requests and responses'''
log = logging.getLogger(__name__)
class CassetteContextDecorator(object):
"""Context manager/decorator that handles installing the cassette and
removing cassettes.
This class defers the creation of a new cassette instance until the point at
which it is installed by context manager or decorator. The fact that a new
cassette is used with each application prevents the state of any cassette
from interfering with another.
"""
@classmethod
def from_args(cls, cassette_class, path, **kwargs):
return cls(cassette_class, lambda: (path, kwargs))
def __init__(self, cls, args_getter):
self.cls = cls
self._args_getter = args_getter
self.__finish = None
def _patch_generator(self, cassette):
with contextlib2.ExitStack() as exit_stack:
for patcher in CassettePatcherBuilder(cassette).build():
exit_stack.enter_context(patcher)
log.debug('Entered context for cassette at {0}.'.format(cassette._path))
yield cassette
log.debug('Exiting context for cassette at {0}.'.format(cassette._path))
# TODO(@IvanMalison): Hmmm. it kind of feels like this should be
# somewhere else.
cassette._save()
def __enter__(self):
assert self.__finish is None, "Cassette already open."
path, kwargs = self._args_getter()
self.__finish = self._patch_generator(self.cls.load(path, **kwargs))
return next(self.__finish)
def __exit__(self, *args):
next(self.__finish, None)
self.__finish = None
@wrapt.decorator
def __call__(self, function, instance, args, kwargs):
with self as cassette:
if cassette.inject:
return function(cassette, *args, **kwargs)
else:
return function(*args, **kwargs)
class Cassette(object):
"""A container for recorded requests and responses"""
@classmethod
def load(cls, path, **kwargs):
'''Load in the cassette stored at the provided path'''
"""Instantiate and load the cassette stored at the specified path."""
new_cassette = cls(path, **kwargs)
new_cassette._load()
return new_cassette
def __init__(self,
path,
serializer=yamlserializer,
record_mode='once',
match_on=[uri, method],
filter_headers=[],
filter_query_parameters=[],
before_record=None,
ignore_hosts=[],
ignore_localhost=[],
):
@classmethod
def use_arg_getter(cls, arg_getter):
return CassetteContextDecorator(cls, arg_getter)
@classmethod
def use(cls, *args, **kwargs):
return CassetteContextDecorator.from_args(cls, *args, **kwargs)
def __init__(self, path, serializer=yamlserializer, record_mode='once',
match_on=(uri, method), before_record_request=None,
before_record_response=None, custom_patches=(),
inject=False):
self._path = path
self._serializer = serializer
self._match_on = match_on
self._filter_headers = filter_headers
self._filter_query_parameters = filter_query_parameters
self._before_record = before_record
self._ignore_hosts = ignore_hosts
if ignore_localhost:
self._ignore_hosts = list(set(
self._ignore_hosts + ['localhost', '0.0.0.0', '127.0.0.1']
))
self._before_record_request = before_record_request or (lambda x: x)
self._before_record_response = before_record_response or (lambda x: x)
self.inject = inject
self.record_mode = record_mode
self.custom_patches = custom_patches
# self.data is the list of (req, resp) tuples
self.data = []
self.play_counts = Counter()
self.dirty = False
self.rewound = False
self.record_mode = record_mode
@property
def play_count(self):
@@ -62,9 +112,7 @@ class Cassette(ContextDecorator):
@property
def all_played(self):
"""
Returns True if all responses have been played, False otherwise.
"""
"""Returns True if all responses have been played, False otherwise."""
return self.play_count == len(self)
@property
@@ -80,46 +128,40 @@ class Cassette(ContextDecorator):
return self.rewound and self.record_mode == 'once' or \
self.record_mode == 'none'
def _filter_request(self, request):
return filter_request(
request=request,
filter_headers=self._filter_headers,
filter_query_parameters=self._filter_query_parameters,
before_record=self._before_record,
ignore_hosts=self._ignore_hosts
)
def append(self, request, response):
'''Add a request, response pair to this cassette'''
request = self._filter_request(request)
"""Add a request, response pair to this cassette"""
request = self._before_record_request(request)
if not request:
return
if self._before_record_response:
response = self._before_record_response(response)
self.data.append((request, response))
self.dirty = True
def filter_request(self, request):
return self._before_record_request(request)
def _responses(self, request):
"""
internal API, returns an iterator with all responses matching
the request.
"""
request = self._filter_request(request)
if not request:
return
request = self._before_record_request(request)
for index, (stored_request, response) in enumerate(self.data):
if requests_match(request, stored_request, self._match_on):
yield index, response
def can_play_response_for(self, request):
request = self._filter_request(request)
request = self._before_record_request(request)
return request and request in self and \
self.record_mode != 'all' and \
self.rewound
def play_response(self, request):
'''
"""
Get the response corresponding to a request, but only if it
hasn't been played back before, and mark it as played
'''
"""
for index, response in self._responses(request):
if self.play_counts[index] == 0:
self.play_counts[index] += 1
@@ -131,11 +173,11 @@ class Cassette(ContextDecorator):
)
def responses_of(self, request):
'''
"""
Find the responses corresponding to a request.
This function isn't actually used by VCR internally, but is
provided as an external API.
'''
"""
responses = [response for index, response in self._responses(request)]
if responses:
@@ -177,20 +219,12 @@ class Cassette(ContextDecorator):
)
def __len__(self):
'''Return the number of request,response pairs stored in here'''
"""Return the number of request,response pairs stored in here"""
return len(self.data)
def __contains__(self, request):
'''Return whether or not a request has been stored'''
for response in self._responses(request):
return True
"""Return whether or not a request has been stored"""
for index, response in self._responses(request):
if self.play_counts[index] == 0:
return True
return False
def __enter__(self):
'''Patch the fetching libraries we know about'''
install(self)
return self
def __exit__(self, typ, value, traceback):
self._save()
reset()

View File

View File

@@ -1,194 +0,0 @@
from __future__ import print_function
from operator import itemgetter
from heapq import nlargest
from itertools import repeat, ifilter
# From http://code.activestate.com/recipes/576611-counter-class/
# Backported for python 2.6 support
class Counter(dict):
'''Dict subclass for counting hashable objects. Sometimes called a bag
or multiset. Elements are stored as dictionary keys and their counts
are stored as dictionary values.
>>> Counter('zyzygy')
Counter({'y': 3, 'z': 2, 'g': 1})
'''
def __init__(self, iterable=None, **kwds):
'''Create a new, empty Counter object. And if given, count elements
from an input iterable. Or, initialize the count from another mapping
of elements to their counts.
>>> c = Counter() # a new, empty counter
>>> c = Counter('gallahad') # a new counter from an iterable
>>> c = Counter({'a': 4, 'b': 2}) # a new counter from a mapping
>>> c = Counter(a=4, b=2) # a new counter from keyword args
'''
self.update(iterable, **kwds)
def __missing__(self, key):
return 0
def most_common(self, n=None):
'''List the n most common elements and their counts from the most
common to the least. If n is None, then list all element counts.
>>> Counter('abracadabra').most_common(3)
[('a', 5), ('r', 2), ('b', 2)]
'''
if n is None:
return sorted(self.iteritems(), key=itemgetter(1), reverse=True)
return nlargest(n, self.iteritems(), key=itemgetter(1))
def elements(self):
'''Iterator over elements repeating each as many times as its count.
>>> c = Counter('ABCABC')
>>> sorted(c.elements())
['A', 'A', 'B', 'B', 'C', 'C']
If an element's count has been set to zero or is a negative number,
elements() will ignore it.
'''
for elem, count in self.iteritems():
for _ in repeat(None, count):
yield elem
# Override dict methods where the meaning changes for Counter objects.
@classmethod
def fromkeys(cls, iterable, v=None):
raise NotImplementedError(
'Counter.fromkeys() is undefined. Use Counter(iterable) instead.')
def update(self, iterable=None, **kwds):
'''Like dict.update() but add counts instead of replacing them.
Source can be an iterable, a dictionary, or another Counter instance.
>>> c = Counter('which')
>>> c.update('witch') # add elements from another iterable
>>> d = Counter('watch')
>>> c.update(d) # add elements from another counter
>>> c['h'] # four 'h' in which, witch, and watch
4
'''
if iterable is not None:
if hasattr(iterable, 'iteritems'):
if self:
self_get = self.get
for elem, count in iterable.iteritems():
self[elem] = self_get(elem, 0) + count
else:
dict.update(self, iterable) # fast path when counter is empty
else:
self_get = self.get
for elem in iterable:
self[elem] = self_get(elem, 0) + 1
if kwds:
self.update(kwds)
def copy(self):
'Like dict.copy() but returns a Counter instance instead of a dict.'
return Counter(self)
def __delitem__(self, elem):
'Like dict.__delitem__() but does not raise KeyError for missing values.'
if elem in self:
dict.__delitem__(self, elem)
def __repr__(self):
if not self:
return '%s()' % self.__class__.__name__
items = ', '.join(map('%r: %r'.__mod__, self.most_common()))
return '%s({%s})' % (self.__class__.__name__, items)
# Multiset-style mathematical operations discussed in:
# Knuth TAOCP Volume II section 4.6.3 exercise 19
# and at http://en.wikipedia.org/wiki/Multiset
#
# Outputs guaranteed to only include positive counts.
#
# To strip negative and zero counts, add-in an empty counter:
# c += Counter()
def __add__(self, other):
'''Add counts from two counters.
>>> Counter('abbb') + Counter('bcc')
Counter({'b': 4, 'c': 2, 'a': 1})
'''
if not isinstance(other, Counter):
return NotImplemented
result = Counter()
for elem in set(self) | set(other):
newcount = self[elem] + other[elem]
if newcount > 0:
result[elem] = newcount
return result
def __sub__(self, other):
''' Subtract count, but keep only results with positive counts.
>>> Counter('abbbc') - Counter('bccd')
Counter({'b': 2, 'a': 1})
'''
if not isinstance(other, Counter):
return NotImplemented
result = Counter()
for elem in set(self) | set(other):
newcount = self[elem] - other[elem]
if newcount > 0:
result[elem] = newcount
return result
def __or__(self, other):
'''Union is the maximum of value in either of the input counters.
>>> Counter('abbb') | Counter('bcc')
Counter({'b': 3, 'c': 2, 'a': 1})
'''
if not isinstance(other, Counter):
return NotImplemented
_max = max
result = Counter()
for elem in set(self) | set(other):
newcount = _max(self[elem], other[elem])
if newcount > 0:
result[elem] = newcount
return result
def __and__(self, other):
''' Intersection is the minimum of corresponding counts.
>>> Counter('abbb') & Counter('bcc')
Counter({'b': 1})
'''
if not isinstance(other, Counter):
return NotImplemented
_min = min
result = Counter()
if len(self) < len(other):
self, other = other, self
for elem in ifilter(self.__contains__, other):
newcount = _min(self[elem], other[elem])
if newcount > 0:
result[elem] = newcount
return result
if __name__ == '__main__':
import doctest
print(doctest.testmod())

View File

@@ -1,258 +0,0 @@
# Backport of OrderedDict() class that runs on Python 2.4, 2.5, 2.6, 2.7 and pypy.
# Passes Python2.7's test suite and incorporates all the latest updates.
try:
from thread import get_ident as _get_ident
except ImportError:
from dummy_thread import get_ident as _get_ident
try:
from _abcoll import KeysView, ValuesView, ItemsView
except ImportError:
pass
class OrderedDict(dict):
'Dictionary that remembers insertion order'
# An inherited dict maps keys to values.
# The inherited dict provides __getitem__, __len__, __contains__, and get.
# The remaining methods are order-aware.
# Big-O running times for all methods are the same as for regular dictionaries.
# The internal self.__map dictionary maps keys to links in a doubly linked list.
# The circular doubly linked list starts and ends with a sentinel element.
# The sentinel element never gets deleted (this simplifies the algorithm).
# Each link is stored as a list of length three: [PREV, NEXT, KEY].
def __init__(self, *args, **kwds):
'''Initialize an ordered dictionary. Signature is the same as for
regular dictionaries, but keyword arguments are not recommended
because their insertion order is arbitrary.
'''
if len(args) > 1:
raise TypeError('expected at most 1 arguments, got %d' % len(args))
try:
self.__root
except AttributeError:
self.__root = root = [] # sentinel node
root[:] = [root, root, None]
self.__map = {}
self.__update(*args, **kwds)
def __setitem__(self, key, value, dict_setitem=dict.__setitem__):
'od.__setitem__(i, y) <==> od[i]=y'
# Setting a new item creates a new link which goes at the end of the linked
# list, and the inherited dictionary is updated with the new key/value pair.
if key not in self:
root = self.__root
last = root[0]
last[1] = root[0] = self.__map[key] = [last, root, key]
dict_setitem(self, key, value)
def __delitem__(self, key, dict_delitem=dict.__delitem__):
'od.__delitem__(y) <==> del od[y]'
# Deleting an existing item uses self.__map to find the link which is
# then removed by updating the links in the predecessor and successor nodes.
dict_delitem(self, key)
link_prev, link_next, key = self.__map.pop(key)
link_prev[1] = link_next
link_next[0] = link_prev
def __iter__(self):
'od.__iter__() <==> iter(od)'
root = self.__root
curr = root[1]
while curr is not root:
yield curr[2]
curr = curr[1]
def __reversed__(self):
'od.__reversed__() <==> reversed(od)'
root = self.__root
curr = root[0]
while curr is not root:
yield curr[2]
curr = curr[0]
def clear(self):
'od.clear() -> None. Remove all items from od.'
try:
for node in self.__map.itervalues():
del node[:]
root = self.__root
root[:] = [root, root, None]
self.__map.clear()
except AttributeError:
pass
dict.clear(self)
def popitem(self, last=True):
'''od.popitem() -> (k, v), return and remove a (key, value) pair.
Pairs are returned in LIFO order if last is true or FIFO order if false.
'''
if not self:
raise KeyError('dictionary is empty')
root = self.__root
if last:
link = root[0]
link_prev = link[0]
link_prev[1] = root
root[0] = link_prev
else:
link = root[1]
link_next = link[1]
root[1] = link_next
link_next[0] = root
key = link[2]
del self.__map[key]
value = dict.pop(self, key)
return key, value
# -- the following methods do not depend on the internal structure --
def keys(self):
'od.keys() -> list of keys in od'
return list(self)
def values(self):
'od.values() -> list of values in od'
return [self[key] for key in self]
def items(self):
'od.items() -> list of (key, value) pairs in od'
return [(key, self[key]) for key in self]
def iterkeys(self):
'od.iterkeys() -> an iterator over the keys in od'
return iter(self)
def itervalues(self):
'od.itervalues -> an iterator over the values in od'
for k in self:
yield self[k]
def iteritems(self):
'od.iteritems -> an iterator over the (key, value) items in od'
for k in self:
yield (k, self[k])
def update(*args, **kwds):
'''od.update(E, **F) -> None. Update od from dict/iterable E and F.
If E is a dict instance, does: for k in E: od[k] = E[k]
If E has a .keys() method, does: for k in E.keys(): od[k] = E[k]
Or if E is an iterable of items, does: for k, v in E: od[k] = v
In either case, this is followed by: for k, v in F.items(): od[k] = v
'''
if len(args) > 2:
raise TypeError('update() takes at most 2 positional '
'arguments (%d given)' % (len(args),))
elif not args:
raise TypeError('update() takes at least 1 argument (0 given)')
self = args[0]
# Make progressively weaker assumptions about "other"
other = ()
if len(args) == 2:
other = args[1]
if isinstance(other, dict):
for key in other:
self[key] = other[key]
elif hasattr(other, 'keys'):
for key in other.keys():
self[key] = other[key]
else:
for key, value in other:
self[key] = value
for key, value in kwds.items():
self[key] = value
__update = update # let subclasses override update without breaking __init__
__marker = object()
def pop(self, key, default=__marker):
'''od.pop(k[,d]) -> v, remove specified key and return the corresponding value.
If key is not found, d is returned if given, otherwise KeyError is raised.
'''
if key in self:
result = self[key]
del self[key]
return result
if default is self.__marker:
raise KeyError(key)
return default
def setdefault(self, key, default=None):
'od.setdefault(k[,d]) -> od.get(k,d), also set od[k]=d if k not in od'
if key in self:
return self[key]
self[key] = default
return default
def __repr__(self, _repr_running={}):
'od.__repr__() <==> repr(od)'
call_key = id(self), _get_ident()
if call_key in _repr_running:
return '...'
_repr_running[call_key] = 1
try:
if not self:
return '%s()' % (self.__class__.__name__,)
return '%s(%r)' % (self.__class__.__name__, self.items())
finally:
del _repr_running[call_key]
def __reduce__(self):
'Return state information for pickling'
items = [[k, self[k]] for k in self]
inst_dict = vars(self).copy()
for k in vars(OrderedDict()):
inst_dict.pop(k, None)
if inst_dict:
return (self.__class__, (items,), inst_dict)
return self.__class__, (items,)
def copy(self):
'od.copy() -> a shallow copy of od'
return self.__class__(self)
@classmethod
def fromkeys(cls, iterable, value=None):
'''OD.fromkeys(S[, v]) -> New ordered dictionary with keys from S
and values equal to v (which defaults to None).
'''
d = cls()
for key in iterable:
d[key] = value
return d
def __eq__(self, other):
'''od.__eq__(y) <==> od==y. Comparison to another OD is order-sensitive
while comparison to a regular mapping is order-insensitive.
'''
if isinstance(other, OrderedDict):
return len(self)==len(other) and self.items() == other.items()
return dict.__eq__(self, other)
def __ne__(self, other):
return not self == other
# -- the following methods are only used in Python 2.7 --
def viewkeys(self):
"od.viewkeys() -> a set-like object providing a view on od's keys"
return KeysView(self)
def viewvalues(self):
"od.viewvalues() -> an object providing a view on od's values"
return ValuesView(self)
def viewitems(self):
"od.viewitems() -> a set-like object providing a view on od's items"
return ItemsView(self)

View File

@@ -1,28 +1,23 @@
import collections
import copy
import functools
import os
from .cassette import Cassette
from .serializers import yamlserializer, jsonserializer
from . import matchers
from . import filters
class VCR(object):
def __init__(self,
serializer='yaml',
cassette_library_dir=None,
record_mode="once",
filter_headers=[],
filter_query_parameters=[],
before_record=None,
match_on=[
'method',
'scheme',
'host',
'port',
'path',
'query',
],
ignore_hosts=[],
ignore_localhost=False,
):
def __init__(self, serializer='yaml', cassette_library_dir=None,
record_mode="once", filter_headers=(), ignore_localhost=False,
custom_patches=(), filter_query_parameters=(),
filter_post_data_parameters=(), before_record_request=None,
before_record_response=None, ignore_hosts=(),
match_on=('method', 'scheme', 'host', 'port', 'path', 'query'),
before_record=None, inject_cassette=False):
self.serializer = serializer
self.match_on = match_on
self.cassette_library_dir = cassette_library_dir
@@ -45,9 +40,13 @@ class VCR(object):
self.record_mode = record_mode
self.filter_headers = filter_headers
self.filter_query_parameters = filter_query_parameters
self.before_record = before_record
self.filter_post_data_parameters = filter_post_data_parameters
self.before_record_request = before_record_request or before_record
self.before_record_response = before_record_response
self.ignore_hosts = ignore_hosts
self.ignore_localhost = ignore_localhost
self.inject_cassette = inject_cassette
self._custom_patches = tuple(custom_patches)
def _get_serializer(self, serializer_name):
try:
@@ -66,44 +65,125 @@ class VCR(object):
matchers.append(self.matchers[m])
except KeyError:
raise KeyError(
"Matcher {0} doesn't exist or isn't registered".format(
m)
"Matcher {0} doesn't exist or isn't registered".format(m)
)
return matchers
def use_cassette(self, path, **kwargs):
def use_cassette(self, path, with_current_defaults=False, **kwargs):
if with_current_defaults:
path, config = self.get_path_and_merged_config(path, **kwargs)
return Cassette.use(path, **config)
# This is made a function that evaluates every time a cassette
# is made so that changes that are made to this VCR instance
# that occur AFTER the `use_cassette` decorator is applied
# still affect subsequent calls to the decorated function.
args_getter = functools.partial(self.get_path_and_merged_config,
path, **kwargs)
return Cassette.use_arg_getter(args_getter)
def get_path_and_merged_config(self, path, **kwargs):
serializer_name = kwargs.get('serializer', self.serializer)
matcher_names = kwargs.get('match_on', self.match_on)
cassette_library_dir = kwargs.get(
'cassette_library_dir',
self.cassette_library_dir
)
if cassette_library_dir:
path = os.path.join(cassette_library_dir, path)
merged_config = {
"serializer": self._get_serializer(serializer_name),
"match_on": self._get_matchers(matcher_names),
"record_mode": kwargs.get('record_mode', self.record_mode),
"filter_headers": kwargs.get(
'filter_headers', self.filter_headers
'serializer': self._get_serializer(serializer_name),
'match_on': self._get_matchers(matcher_names),
'record_mode': kwargs.get('record_mode', self.record_mode),
'before_record_request': self._build_before_record_request(kwargs),
'before_record_response': self._build_before_record_response(
kwargs
),
"filter_query_parameters": kwargs.get(
'filter_query_parameters', self.filter_query_parameters
),
"before_record": kwargs.get(
"before_record", self.before_record
),
"ignore_hosts": kwargs.get(
'ignore_hosts', self.ignore_hosts
),
"ignore_localhost": kwargs.get(
'ignore_localhost', self.ignore_localhost
'custom_patches': self._custom_patches + kwargs.get(
'custom_patches', ()
),
'inject': kwargs.get('inject_cassette', self.inject_cassette)
}
return path, merged_config
return Cassette.load(path, **merged_config)
def _build_before_record_response(self, options):
before_record_response = options.get(
'before_record_response', self.before_record_response
)
filter_functions = []
if before_record_response and not isinstance(before_record_response,
collections.Iterable):
before_record_response = (before_record_response,)
for function in before_record_response:
filter_functions.append(function)
def before_record_response(response):
for function in filter_functions:
if response is None:
break
response = function(response)
return response
return before_record_response
def _build_before_record_request(self, options):
filter_functions = []
filter_headers = options.get(
'filter_headers', self.filter_headers
)
filter_query_parameters = options.get(
'filter_query_parameters', self.filter_query_parameters
)
filter_post_data_parameters = options.get(
'filter_post_data_parameters', self.filter_post_data_parameters
)
before_record_request = options.get(
"before_record_request", options.get("before_record", self.before_record_request)
)
ignore_hosts = options.get(
'ignore_hosts', self.ignore_hosts
)
ignore_localhost = options.get(
'ignore_localhost', self.ignore_localhost
)
if filter_headers:
filter_functions.append(functools.partial(filters.remove_headers,
headers_to_remove=filter_headers))
if filter_query_parameters:
filter_functions.append(functools.partial(filters.remove_query_parameters,
query_parameters_to_remove=filter_query_parameters))
if filter_post_data_parameters:
filter_functions.append(functools.partial(filters.remove_post_data_parameters,
post_data_parameters_to_remove=filter_post_data_parameters))
hosts_to_ignore = list(ignore_hosts)
if ignore_localhost:
hosts_to_ignore.extend(('localhost', '0.0.0.0', '127.0.0.1'))
if hosts_to_ignore:
hosts_to_ignore = set(hosts_to_ignore)
filter_functions.append(self._build_ignore_hosts(hosts_to_ignore))
if before_record_request:
if not isinstance(before_record_request, collections.Iterable):
before_record_request = (before_record_request,)
for function in before_record_request:
filter_functions.append(function)
def before_record_request(request):
request = copy.copy(request)
for function in filter_functions:
if request is None:
break
request = function(request)
return request
return before_record_request
@staticmethod
def _build_ignore_hosts(hosts_to_ignore):
def filter_ignored_hosts(request):
if hasattr(request, 'host') and request.host in hosts_to_ignore:
return
return request
return filter_ignored_hosts
def register_serializer(self, name, serializer):
self.serializers[name] = serializer

View File

@@ -1,8 +1,13 @@
from six import BytesIO
from six.moves.urllib.parse import urlparse, urlencode, urlunparse
try:
from collections import OrderedDict
except ImportError:
from backport_collections import OrderedDict
import copy
def _remove_headers(request, headers_to_remove):
def remove_headers(request, headers_to_remove):
headers = copy.copy(request.headers)
headers_to_remove = [h.lower() for h in headers_to_remove]
keys = [k for k in headers if k.lower() in headers_to_remove]
@@ -13,7 +18,7 @@ def _remove_headers(request, headers_to_remove):
return request
def _remove_query_parameters(request, query_parameters_to_remove):
def remove_query_parameters(request, query_parameters_to_remove):
query = request.query
new_query = [(k, v) for (k, v) in query
if k not in query_parameters_to_remove]
@@ -24,20 +29,15 @@ def _remove_query_parameters(request, query_parameters_to_remove):
return request
def filter_request(
request,
filter_headers,
filter_query_parameters,
before_record,
ignore_hosts
):
request = copy.copy(request) # don't mutate request object
if hasattr(request, 'headers') and filter_headers:
request = _remove_headers(request, filter_headers)
if hasattr(request, 'host') and request.host in ignore_hosts:
return None
if filter_query_parameters:
request = _remove_query_parameters(request, filter_query_parameters)
if before_record:
request = before_record(request)
def remove_post_data_parameters(request, post_data_parameters_to_remove):
if request.method == 'POST' and not isinstance(request.body, BytesIO):
post_data = OrderedDict()
for k, sep, v in [p.partition(b'=') for p in request.body.split(b'&')]:
if k in post_data:
post_data[k].append(v)
elif len(k) > 0 and k.decode('utf-8') not in post_data_parameters_to_remove:
post_data[k] = [v]
request.body = b'&'.join(
b'='.join([k, v])
for k, vals in post_data.items() for v in vals)
return request

View File

@@ -31,6 +31,8 @@ def query(r1, r2):
def body(r1, r2):
if hasattr(r1.body, 'read') and hasattr(r2.body, 'read'):
return r1.body.read() == r2.body.read()
return r1.body == r2.body
@@ -38,16 +40,16 @@ def headers(r1, r2):
return r1.headers == r2.headers
def _log_matches(matches):
def _log_matches(r1, r2, matches):
differences = [m for m in matches if not m[0]]
if differences:
log.debug(
'Requests differ according to the following matchers: ' +
str(differences)
"Requests {0} and {1} differ according to "
"the following matchers: {2}".format(r1, r2, differences)
)
def requests_match(r1, r2, matchers):
matches = [(m(r1, r2), m) for m in matchers]
_log_matches(matches)
_log_matches(r1, r2, matches)
return all([m[0] for m in matches])

View File

@@ -58,7 +58,7 @@ PARTS = [
def build_uri(**parts):
port = parts['port']
scheme = parts['protocol']
default_port = {'https': 433, 'http': 80}[scheme]
default_port = {'https': 443, 'http': 80}[scheme]
parts['port'] = ':{0}'.format(port) if port != default_port else ''
return "{protocol}://{host}{port}{path}".format(**parts)

View File

@@ -1,4 +1,9 @@
'''Utilities for patching in cassettes'''
import functools
import itertools
import contextlib2
import mock
from .stubs import VCRHTTPConnection, VCRHTTPSConnection
from six.moves import http_client as httplib
@@ -8,139 +13,319 @@ from six.moves import http_client as httplib
_HTTPConnection = httplib.HTTPConnection
_HTTPSConnection = httplib.HTTPSConnection
# Try to save the original types for requests
try:
# Try to save the original types for requests
import requests.packages.urllib3.connectionpool as cpool
except ImportError: # pragma: no cover
pass
else:
_VerifiedHTTPSConnection = cpool.VerifiedHTTPSConnection
_cpoolHTTPConnection = cpool.HTTPConnection
_cpoolHTTPSConnection = cpool.HTTPSConnection
except ImportError: # pragma: no cover
pass
# Try to save the original types for urllib3
try:
# Try to save the original types for urllib3
import urllib3
_VerifiedHTTPSConnection = urllib3.connectionpool.VerifiedHTTPSConnection
except ImportError: # pragma: no cover
pass
else:
_VerifiedHTTPSConnection = urllib3.connectionpool.VerifiedHTTPSConnection
# Try to save the original types for httplib2
try:
# Try to save the original types for httplib2
import httplib2
except ImportError: # pragma: no cover
pass
else:
_HTTPConnectionWithTimeout = httplib2.HTTPConnectionWithTimeout
_HTTPSConnectionWithTimeout = httplib2.HTTPSConnectionWithTimeout
_SCHEME_TO_CONNECTION = httplib2.SCHEME_TO_CONNECTION
except ImportError: # pragma: no cover
pass
# Try to save the original types for boto
try:
# Try to save the original types for boto
import boto.https_connection
_CertValidatingHTTPSConnection = \
boto.https_connection.CertValidatingHTTPSConnection
except ImportError: # pragma: no cover
pass
else:
_CertValidatingHTTPSConnection = boto.https_connection.CertValidatingHTTPSConnection
def install(cassette):
"""
Patch all the HTTPConnections references we can find!
This replaces the actual HTTPConnection with a VCRHTTPConnection
object which knows how to save to / read from cassettes
"""
httplib.HTTPConnection = VCRHTTPConnection
httplib.HTTPSConnection = VCRHTTPSConnection
httplib.HTTPConnection.cassette = cassette
httplib.HTTPSConnection.cassette = cassette
class CassettePatcherBuilder(object):
# patch requests v1.x
def _build_patchers_from_mock_triples_decorator(function):
@functools.wraps(function)
def wrapped(self, *args, **kwargs):
return self._build_patchers_from_mock_triples(
function(self, *args, **kwargs)
)
return wrapped
def __init__(self, cassette):
self._cassette = cassette
self._class_to_cassette_subclass = {}
def build(self):
return itertools.chain(
self._httplib(), self._requests(), self._urllib3(), self._httplib2(),
self._boto(), self._build_patchers_from_mock_triples(
self._cassette.custom_patches
)
)
def _build_patchers_from_mock_triples(self, mock_triples):
for args in mock_triples:
patcher = self._build_patcher(*args)
if patcher:
yield patcher
def _build_patcher(self, obj, patched_attribute, replacement_class):
if not hasattr(obj, patched_attribute):
return
return mock.patch.object(obj, patched_attribute,
self._recursively_apply_get_cassette_subclass(
replacement_class))
def _recursively_apply_get_cassette_subclass(self, replacement_dict_or_obj):
"""One of the subtleties of this class is that it does not directly
replace HTTPSConnection with `VCRRequestsHTTPSConnection`, but a
subclass of the aforementioned class that has the `cassette`
class attribute assigned to `self._cassette`. This behavior is
necessary to properly support nested cassette contexts.
This function exists to ensure that we use the same class
object (reference) to patch everything that replaces
VCRRequestHTTP[S]Connection, but that we can talk about
patching them with the raw references instead, and without
worrying about exactly where the subclass with the relevant
value for `cassette` is first created.
The function is recursive because it looks in to dictionaries
and replaces class values at any depth with the subclass
described in the previous paragraph.
"""
if isinstance(replacement_dict_or_obj, dict):
for key, replacement_obj in replacement_dict_or_obj.items():
replacement_obj = self._recursively_apply_get_cassette_subclass(
replacement_obj)
replacement_dict_or_obj[key] = replacement_obj
return replacement_dict_or_obj
if hasattr(replacement_dict_or_obj, 'cassette'):
replacement_dict_or_obj = self._get_cassette_subclass(
replacement_dict_or_obj)
return replacement_dict_or_obj
def _get_cassette_subclass(self, klass):
if klass.cassette is not None:
return klass
if klass not in self._class_to_cassette_subclass:
subclass = self._build_cassette_subclass(klass)
self._class_to_cassette_subclass[klass] = subclass
return self._class_to_cassette_subclass[klass]
def _build_cassette_subclass(self, base_class):
bases = (base_class,)
if not issubclass(base_class, object): # Check for old style class
bases += (object,)
return type('{0}{1}'.format(base_class.__name__, self._cassette._path),
bases, dict(cassette=self._cassette))
@_build_patchers_from_mock_triples_decorator
def _httplib(self):
yield httplib, 'HTTPConnection', VCRHTTPConnection
yield httplib, 'HTTPSConnection', VCRHTTPSConnection
def _requests(self):
try:
import requests.packages.urllib3.connectionpool as cpool
except ImportError: # pragma: no cover
return ()
from .stubs import requests_stubs
return self._urllib3_patchers(cpool, requests_stubs)
def _patched_get_conn(self, connection_pool_class, connection_class_getter):
get_conn = connection_pool_class._get_conn
@functools.wraps(get_conn)
def patched_get_conn(pool, timeout=None):
connection = get_conn(pool, timeout)
connection_class = pool.ConnectionCls if hasattr(pool, 'ConnectionCls') \
else connection_class_getter()
# We need to make sure that we are actually providing a
# patched version of the connection class. This might not
# always be the case because the pool keeps previously
# used connections (which might actually be of a different
# class) around. This while loop will terminate because
# eventually the pool will run out of connections.
while not isinstance(connection, connection_class):
connection = get_conn(pool, timeout)
return connection
return patched_get_conn
def _patched_new_conn(self, connection_pool_class, connection_remover):
new_conn = connection_pool_class._new_conn
@functools.wraps(new_conn)
def patched_new_conn(pool):
new_connection = new_conn(pool)
connection_remover.add_connection_to_pool_entry(pool, new_connection)
return new_connection
return patched_new_conn
def _urllib3(self):
try:
import urllib3.connectionpool as cpool
except ImportError: # pragma: no cover
return ()
from .stubs import urllib3_stubs
return self._urllib3_patchers(cpool, urllib3_stubs)
@_build_patchers_from_mock_triples_decorator
def _httplib2(self):
try:
import httplib2 as cpool
except ImportError: # pragma: no cover
pass
else:
from .stubs.httplib2_stubs import VCRHTTPConnectionWithTimeout
from .stubs.httplib2_stubs import VCRHTTPSConnectionWithTimeout
yield cpool, 'HTTPConnectionWithTimeout', VCRHTTPConnectionWithTimeout
yield cpool, 'HTTPSConnectionWithTimeout', VCRHTTPSConnectionWithTimeout
yield cpool, 'SCHEME_TO_CONNECTION', {'http': VCRHTTPConnectionWithTimeout,
'https': VCRHTTPSConnectionWithTimeout}
@_build_patchers_from_mock_triples_decorator
def _boto(self):
try:
import boto.https_connection as cpool
except ImportError: # pragma: no cover
pass
else:
from .stubs.boto_stubs import VCRCertValidatingHTTPSConnection
yield cpool, 'CertValidatingHTTPSConnection', VCRCertValidatingHTTPSConnection
def _urllib3_patchers(self, cpool, stubs):
http_connection_remover = ConnectionRemover(
self._get_cassette_subclass(stubs.VCRRequestsHTTPConnection)
)
https_connection_remover = ConnectionRemover(
self._get_cassette_subclass(stubs.VCRRequestsHTTPSConnection)
)
mock_triples = (
(cpool, 'VerifiedHTTPSConnection', stubs.VCRRequestsHTTPSConnection),
(cpool, 'VerifiedHTTPSConnection', stubs.VCRRequestsHTTPSConnection),
(cpool, 'HTTPConnection', stubs.VCRRequestsHTTPConnection),
(cpool, 'HTTPSConnection', stubs.VCRRequestsHTTPSConnection),
(cpool, 'is_connection_dropped', mock.Mock(return_value=False)), # Needed on Windows only
(cpool.HTTPConnectionPool, 'ConnectionCls', stubs.VCRRequestsHTTPConnection),
(cpool.HTTPSConnectionPool, 'ConnectionCls', stubs.VCRRequestsHTTPSConnection),
)
# These handle making sure that sessions only use the
# connections of the appropriate type.
mock_triples += ((cpool.HTTPConnectionPool, '_get_conn',
self._patched_get_conn(cpool.HTTPConnectionPool,
lambda : cpool.HTTPConnection)),
(cpool.HTTPSConnectionPool, '_get_conn',
self._patched_get_conn(cpool.HTTPSConnectionPool,
lambda : cpool.HTTPSConnection)),
(cpool.HTTPConnectionPool, '_new_conn',
self._patched_new_conn(cpool.HTTPConnectionPool,
http_connection_remover)),
(cpool.HTTPSConnectionPool, '_new_conn',
self._patched_new_conn(cpool.HTTPSConnectionPool,
https_connection_remover)))
return itertools.chain(self._build_patchers_from_mock_triples(mock_triples),
(http_connection_remover, https_connection_remover))
class ConnectionRemover(object):
def __init__(self, connection_class):
self._connection_class = connection_class
self._connection_pool_to_connections = {}
def add_connection_to_pool_entry(self, pool, connection):
if isinstance(connection, self._connection_class):
self._connection_pool_to_connections.setdefault(pool, set()).add(connection)
def remove_connection_to_pool_entry(self, pool, connection):
if isinstance(connection, self._connection_class):
self._connection_pool_to_connections[self._connection_class].remove(connection)
def __enter__(self):
return self
def __exit__(self, *args):
for pool, connections in self._connection_pool_to_connections.items():
readd_connections = []
while pool.pool and not pool.pool.empty() and connections:
connection = pool.pool.get()
if isinstance(connection, self._connection_class):
connections.remove(connection)
else:
readd_connections.append(connection)
for connection in readd_connections:
pool._put_conn(connection)
def reset_patchers():
yield mock.patch.object(httplib, 'HTTPConnection', _HTTPConnection)
yield mock.patch.object(httplib, 'HTTPSConnection', _HTTPSConnection)
try:
import requests.packages.urllib3.connectionpool as cpool
from .stubs.requests_stubs import VCRRequestsHTTPConnection, VCRRequestsHTTPSConnection
cpool.VerifiedHTTPSConnection = VCRRequestsHTTPSConnection
cpool.HTTPConnection = VCRRequestsHTTPConnection
cpool.VerifiedHTTPSConnection.cassette = cassette
cpool.HTTPConnection = VCRHTTPConnection
cpool.HTTPConnection.cassette = cassette
# patch requests v2.x
cpool.HTTPConnectionPool.ConnectionCls = VCRRequestsHTTPConnection
cpool.HTTPConnectionPool.cassette = cassette
cpool.HTTPSConnectionPool.ConnectionCls = VCRRequestsHTTPSConnection
cpool.HTTPSConnectionPool.cassette = cassette
except ImportError: # pragma: no cover
pass
# patch urllib3
try:
import urllib3.connectionpool as cpool
from .stubs.urllib3_stubs import VCRVerifiedHTTPSConnection
cpool.VerifiedHTTPSConnection = VCRVerifiedHTTPSConnection
cpool.VerifiedHTTPSConnection.cassette = cassette
cpool.HTTPConnection = VCRHTTPConnection
cpool.HTTPConnection.cassette = cassette
except ImportError: # pragma: no cover
pass
# patch httplib2
try:
import httplib2 as cpool
from .stubs.httplib2_stubs import VCRHTTPConnectionWithTimeout
from .stubs.httplib2_stubs import VCRHTTPSConnectionWithTimeout
cpool.HTTPConnectionWithTimeout = VCRHTTPConnectionWithTimeout
cpool.HTTPSConnectionWithTimeout = VCRHTTPSConnectionWithTimeout
cpool.SCHEME_TO_CONNECTION = {
'http': VCRHTTPConnectionWithTimeout,
'https': VCRHTTPSConnectionWithTimeout
}
except ImportError: # pragma: no cover
pass
# patch boto
try:
import boto.https_connection as cpool
from .stubs.boto_stubs import VCRCertValidatingHTTPSConnection
cpool.CertValidatingHTTPSConnection = VCRCertValidatingHTTPSConnection
cpool.CertValidatingHTTPSConnection.cassette = cassette
except ImportError: # pragma: no cover
pass
def reset():
'''Undo all the patching'''
httplib.HTTPConnection = _HTTPConnection
httplib.HTTPSConnection = _HTTPSConnection
try:
import requests.packages.urllib3.connectionpool as cpool
else:
# unpatch requests v1.x
cpool.VerifiedHTTPSConnection = _VerifiedHTTPSConnection
cpool.HTTPConnection = _cpoolHTTPConnection
yield mock.patch.object(cpool, 'VerifiedHTTPSConnection', _VerifiedHTTPSConnection)
yield mock.patch.object(cpool, 'HTTPConnection', _cpoolHTTPConnection)
# unpatch requests v2.x
cpool.HTTPConnectionPool.ConnectionCls = _cpoolHTTPConnection
cpool.HTTPSConnection = _cpoolHTTPSConnection
cpool.HTTPSConnectionPool.ConnectionCls = _cpoolHTTPSConnection
except ImportError: # pragma: no cover
pass
if hasattr(cpool.HTTPConnectionPool, 'ConnectionCls'):
yield mock.patch.object(cpool.HTTPConnectionPool, 'ConnectionCls',
_cpoolHTTPConnection)
yield mock.patch.object(cpool.HTTPSConnectionPool, 'ConnectionCls',
_cpoolHTTPSConnection)
if hasattr(cpool, 'HTTPSConnection'):
yield mock.patch.object(cpool, 'HTTPSConnection', _cpoolHTTPSConnection)
try:
import urllib3.connectionpool as cpool
cpool.VerifiedHTTPSConnection = _VerifiedHTTPSConnection
cpool.HTTPConnection = _HTTPConnection
cpool.HTTPSConnection = _HTTPSConnection
cpool.HTTPConnectionPool.ConnectionCls = _HTTPConnection
cpool.HTTPSConnectionPool.ConnectionCls = _HTTPSConnection
except ImportError: # pragma: no cover
pass
else:
yield mock.patch.object(cpool, 'VerifiedHTTPSConnection', _VerifiedHTTPSConnection)
yield mock.patch.object(cpool, 'HTTPConnection', _HTTPConnection)
yield mock.patch.object(cpool, 'HTTPSConnection', _HTTPSConnection)
if hasattr(cpool.HTTPConnectionPool, 'ConnectionCls'):
yield mock.patch.object(cpool.HTTPConnectionPool, 'ConnectionCls', _HTTPConnection)
yield mock.patch.object(cpool.HTTPSConnectionPool, 'ConnectionCls', _HTTPSConnection)
try:
import httplib2 as cpool
cpool.HTTPConnectionWithTimeout = _HTTPConnectionWithTimeout
cpool.HTTPSConnectionWithTimeout = _HTTPSConnectionWithTimeout
cpool.SCHEME_TO_CONNECTION = _SCHEME_TO_CONNECTION
except ImportError: # pragma: no cover
pass
else:
yield mock.patch.object(cpool, 'HTTPConnectionWithTimeout', _HTTPConnectionWithTimeout)
yield mock.patch.object(cpool, 'HTTPSConnectionWithTimeout', _HTTPSConnectionWithTimeout)
yield mock.patch.object(cpool, 'SCHEME_TO_CONNECTION', _SCHEME_TO_CONNECTION)
try:
import boto.https_connection as cpool
cpool.CertValidatingHTTPSConnection = _CertValidatingHTTPSConnection
except ImportError: # pragma: no cover
pass
else:
yield mock.patch.object(cpool, 'CertValidatingHTTPSConnection',
_CertValidatingHTTPSConnection)
@contextlib2.contextmanager
def force_reset():
with contextlib2.ExitStack() as exit_stack:
for patcher in reset_patchers():
exit_stack.enter_context(patcher)
yield

View File

@@ -1,4 +1,3 @@
import tempfile
import os

View File

@@ -1,3 +1,4 @@
from six import BytesIO, binary_type
from six.moves.urllib.parse import urlparse, parse_qsl
@@ -26,11 +27,25 @@ class Request(object):
def __init__(self, method, uri, body, headers):
self.method = method
self.uri = uri
self.body = body
self._was_file = hasattr(body, 'read')
if self._was_file:
self._body = body.read()
if not isinstance(self._body, binary_type):
self._body = self._body.encode('utf-8')
else:
self._body = body
self.headers = {}
for key in headers:
self.add_header(key, headers[key])
@property
def body(self):
return BytesIO(self._body) if self._was_file else self._body
@body.setter
def body(self, value):
self._body = value
def add_header(self, key, value):
# see class docstring for an explanation
if isinstance(value, (tuple, list)):
@@ -51,7 +66,7 @@ class Request(object):
parse_uri = urlparse(self.uri)
port = parse_uri.port
if port is None:
port = {'https': 433, 'http': 80}[parse_uri.scheme]
port = {'https': 443, 'http': 80}[parse_uri.scheme]
return port
@property

View File

@@ -11,10 +11,14 @@ def deserialize(cassette_string):
def serialize(cassette_dict):
try:
return json.dumps(cassette_dict, indent=4)
except UnicodeDecodeError:
except UnicodeDecodeError as original:
raise UnicodeDecodeError(
"Error serializing cassette to JSON. ",
"Does this HTTP interaction contain binary data? ",
"If so, use a different serializer (like the yaml serializer) ",
"for this request"
original.encoding,
b"Error serializing cassette to JSON",
original.start,
original.end,
original.args[-1] +
("Does this HTTP interaction contain binary data? "
"If so, use a different serializer (like the yaml serializer) "
"for this request?")
)

View File

@@ -57,7 +57,7 @@ def parse_headers(header_list):
def serialize_headers(response):
out = {}
for key, values in compat.get_headers(response):
for key, values in compat.get_headers(response.msg):
out.setdefault(key, [])
out[key].extend(values)
return out
@@ -76,7 +76,15 @@ class VCRHTTPResponse(HTTPResponse):
self._closed = False
headers = self.recorded_response['headers']
self.msg = parse_headers(headers)
# Since we are loading a response that has already been serialized, our
# response is no longer chunked. That means we don't want any
# libraries trying to process a chunked response. By removing the
# transfer-encoding: chunked header, this should cause the downstream
# libraries to process this as a non-chunked response.
te_key = [h for h in headers.keys() if h.upper() == 'TRANSFER-ENCODING']
if te_key:
del headers[te_key[0]]
self.headers = self.msg = parse_headers(headers)
self.length = compat.get_header(self.msg, 'content-length') or None
@@ -108,14 +116,18 @@ class VCRHTTPResponse(HTTPResponse):
def getheaders(self):
message = parse_headers(self.recorded_response['headers'])
return compat.get_header_items(message)
return list(compat.get_header_items(message))
def getheader(self, header, default=None):
headers = dict(((k.lower(), v) for k, v in self.getheaders()))
return headers.get(header.lower(), default)
values = [v for (k, v) in self.getheaders() if k.lower() == header.lower()]
if values:
return ', '.join(values)
else:
return default
class VCRConnection:
class VCRConnection(object):
# A reference to the cassette that's currently being patched in
cassette = None
@@ -124,7 +136,7 @@ class VCRConnection:
Returns empty string for the default port and ':port' otherwise
"""
port = self.real_connection.port
default_port = {'https': 433, 'http': 80}[self._protocol]
default_port = {'https': 443, 'http': 80}[self._protocol]
return ':{0}'.format(port) if port != default_port else ''
def _uri(self, url):
@@ -200,8 +212,8 @@ class VCRConnection:
"""
pass
def getresponse(self, _=False):
'''Retrieve a the response'''
def getresponse(self, _=False, **kwargs):
'''Retrieve the response'''
# Check to see if the cassette has a response for this request. If so,
# then return it
if self.cassette.can_play_response_for(self._vcr_request):
@@ -213,11 +225,15 @@ class VCRConnection:
response = self.cassette.play_response(self._vcr_request)
return VCRHTTPResponse(response)
else:
if self.cassette.write_protected and self.cassette._filter_request(self._vcr_request):
if self.cassette.write_protected and self.cassette.filter_request(
self._vcr_request
):
raise CannotOverwriteExistingCassetteException(
"No match for the request (%r) was found. "
"Can't overwrite existing cassette (%r) in "
"your current record mode (%r)."
% (self.cassette._path, self.cassette.record_mode)
% (self._vcr_request, self.cassette._path,
self.cassette.record_mode)
)
# Otherwise, we should send the request, then get the response
@@ -228,12 +244,16 @@ class VCRConnection:
self._vcr_request
)
)
self.real_connection.request(
method=self._vcr_request.method,
url=self._url(self._vcr_request.uri),
body=self._vcr_request.body,
headers=self._vcr_request.headers,
)
# This is imported here to avoid circular import.
# TODO(@IvanMalison): Refactor to allow normal import.
from vcr.patch import force_reset
with force_reset():
self.real_connection.request(
method=self._vcr_request.method,
url=self._url(self._vcr_request.uri),
body=self._vcr_request.body,
headers=self._vcr_request.headers,
)
# get the response
response = self.real_connection.getresponse()
@@ -291,10 +311,9 @@ class VCRConnection:
# need to temporarily reset here because the real connection
# inherits from the thing that we are mocking out. Take out
# the reset if you want to see what I mean :)
from vcr.patch import install, reset
reset()
self.real_connection = self._baseclass(*args, **kwargs)
install(self.cassette)
from vcr.patch import force_reset
with force_reset():
self.real_connection = self._baseclass(*args, **kwargs)
class VCRHTTPConnection(VCRConnection):
@@ -307,3 +326,4 @@ class VCRHTTPSConnection(VCRConnection):
'''A Mocked class for HTTPS requests'''
_baseclass = HTTPSConnection
_protocol = 'https'
is_verified = True

View File

@@ -21,18 +21,17 @@ def get_header(message, name):
def get_header_items(message):
if six.PY3:
return dict(message._headers).items()
else:
return message.dict.items()
for (key, values) in get_headers(message):
for value in values:
yield key, value
def get_headers(response):
for key in response.msg.keys():
def get_headers(message):
for key in set(message.keys()):
if six.PY3:
yield key, response.msg.get_all(key)
yield key, message.get_all(key)
else:
yield key, response.msg.getheaders(key)
yield key, message.getheaders(key)
def get_httpmessage(headers):

View File

@@ -1,8 +1,13 @@
'''Stubs for urllib3'''
from urllib3.connectionpool import VerifiedHTTPSConnection
from ..stubs import VCRHTTPSConnection
from urllib3.connectionpool import HTTPConnection, VerifiedHTTPSConnection
from ..stubs import VCRHTTPConnection, VCRHTTPSConnection
# urllib3 defines its own HTTPConnection classes. It includes some polyfills
# for newer features missing in older pythons.
class VCRVerifiedHTTPSConnection(VCRHTTPSConnection, VerifiedHTTPSConnection):
class VCRRequestsHTTPConnection(VCRHTTPConnection, HTTPConnection):
_baseclass = HTTPConnection
class VCRRequestsHTTPSConnection(VCRHTTPSConnection, VerifiedHTTPSConnection):
_baseclass = VerifiedHTTPSConnection