mirror of
https://github.com/kevin1024/vcrpy.git
synced 2025-12-08 16:53:23 +00:00
Compare commits
30 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
ce3d7270ea | ||
|
|
39d696bc49 | ||
|
|
ce94fd72fd | ||
|
|
a66f462dcd | ||
|
|
03c22d79dd | ||
|
|
5ce67dc023 | ||
|
|
07774ae6dd | ||
|
|
e1c7eb1ec5 | ||
|
|
7f958246e0 | ||
|
|
c8299103fb | ||
|
|
98603541d6 | ||
|
|
6b8d4643e8 | ||
|
|
b55834e929 | ||
|
|
7264780960 | ||
|
|
0f2695f240 | ||
|
|
65254b4969 | ||
|
|
6005420409 | ||
|
|
c5eca93edc | ||
|
|
b688dd362d | ||
|
|
28379e9000 | ||
|
|
b7af8bae71 | ||
|
|
7a4c11bf94 | ||
|
|
1478ce82fd | ||
|
|
9073cf137e | ||
|
|
53f5cd24d6 | ||
|
|
cf744dca00 | ||
|
|
51f0f1bacd | ||
|
|
3e247a2efb | ||
|
|
762b761d0c | ||
|
|
348cc8fdfe |
180
README.md
180
README.md
@@ -1,5 +1,7 @@
|
||||
#VCR.py
|
||||
|
||||

|
||||
|
||||
This is a Python version of [Ruby's VCR library](https://github.com/myronmarston/vcr).
|
||||
|
||||
[](http://travis-ci.org/kevin1024/vcrpy)
|
||||
@@ -38,6 +40,98 @@ pass, even if you are offline, or iana.org goes down for maintenance) and
|
||||
accurate (the response will contain the same headers and body you get from a
|
||||
real request).
|
||||
|
||||
## Configuration
|
||||
|
||||
If you don't like VCR's defaults, you can set options by instantiating a
|
||||
VCR class and setting the options on it.
|
||||
|
||||
```python
|
||||
|
||||
import vcr
|
||||
|
||||
my_vcr = vcr.VCR(
|
||||
serializer = 'json',
|
||||
cassette_library_dir = 'fixtures/cassettes',
|
||||
record_mode = 'once',
|
||||
match_on = ['url', 'method'],
|
||||
)
|
||||
|
||||
with my_vcr.use_cassette('test.json'):
|
||||
# your http code here
|
||||
```
|
||||
|
||||
Otherwise, you can override options each time you use a cassette.
|
||||
|
||||
```python
|
||||
with vcr.use_cassette('test.yml', serializer='json', record_mode='once'):
|
||||
# your http code here
|
||||
```
|
||||
|
||||
Note: Per-cassette overrides take precedence over the global config.
|
||||
|
||||
## Request matching
|
||||
|
||||
Request matching is configurable and allows you to change which requests
|
||||
VCR considers identical. The default behavior is `['url', method']`
|
||||
which means that requests with both the same URL and method (ie POST or
|
||||
GET) are considered identical.
|
||||
|
||||
This can be configured by changing the `match_on` setting.
|
||||
|
||||
The following options are available :
|
||||
|
||||
* method (for example, POST or GET)
|
||||
* url (the full URL, including the protocol)
|
||||
* host (the hostname of the server receiving the request)
|
||||
* path (excluding the hostname)
|
||||
* body (the entire request body)
|
||||
* headers (the headers of the request)
|
||||
|
||||
If these options don't work for you, you can also register your own
|
||||
request matcher. This is described in the Advanced section of this
|
||||
README.
|
||||
|
||||
## Record Modes
|
||||
VCR supports 4 record modes (with the same behavior as Ruby's VCR):
|
||||
|
||||
### once
|
||||
|
||||
* Replay previously recorded interactions.
|
||||
* Record new interactions if there is no cassette file.
|
||||
* Cause an error to be raised for new requests if there is a cassette file.
|
||||
|
||||
It is similar to the :new_episodes record mode, but will prevent new,
|
||||
unexpected requests from being made (i.e. because the request URI
|
||||
changed).
|
||||
|
||||
once is the default record mode, used when you do not set one.
|
||||
|
||||
### new_episodes
|
||||
|
||||
* Record new interactions.
|
||||
* Replay previously recorded interactions.
|
||||
It is similar to the once record mode, but will always record new
|
||||
interactions, even if you have an existing recorded one that is similar,
|
||||
but not identical.
|
||||
|
||||
This was the default behavior in versions < 0.3.0
|
||||
|
||||
### none
|
||||
|
||||
* Replay previously recorded interactions.
|
||||
* Cause an error to be raised for any new requests.
|
||||
This is useful when your code makes potentially dangerous
|
||||
HTTP requests. The none record mode guarantees that no
|
||||
new HTTP requests will be made.
|
||||
|
||||
### all
|
||||
|
||||
* Record new interactions.
|
||||
* Never replay previously recorded interactions.
|
||||
This can be temporarily used to force VCR to re-record
|
||||
a cassette (i.e. to ensure the responses are not out of date)
|
||||
or can be used when you simply want to log all HTTP requests.
|
||||
|
||||
## Advanced Features
|
||||
|
||||
If you want, VCR.py can return information about the cassette it is
|
||||
@@ -82,7 +176,75 @@ The Request object has the following properties
|
||||
* `protocol`: The protocol used to make the request (http or https)
|
||||
* `body`: The body of the request, usually empty except for POST / PUT / etc
|
||||
|
||||
## Register your own serializer
|
||||
|
||||
Don't like JSON or YAML? That's OK, VCR.py can serialize to any format
|
||||
you would like. Create your own module or class instance with 2 methods:
|
||||
|
||||
* `def deserialize(cassette_string)`
|
||||
* `def serialize(cassette_dict)`
|
||||
|
||||
Finally, register your class with VCR to use your
|
||||
new serializer.
|
||||
|
||||
```python
|
||||
import vcr
|
||||
|
||||
BogoSerializer(object):
|
||||
"""
|
||||
Must implement serialize() and deserialize() methods
|
||||
"""
|
||||
pass
|
||||
|
||||
my_vcr = vcr.VCR()
|
||||
my_vcr.register_serializer('bogo', BogoSerializer())
|
||||
|
||||
with my_vcr.use_cassette('test.bogo', serializer='bogo'):
|
||||
# your http here
|
||||
|
||||
# After you register, you can set the default serializer to your new serializer
|
||||
|
||||
my_vcr.serializer = 'bogo'
|
||||
|
||||
with my_vcr.use_cassette('test.bogo'):
|
||||
# your http here
|
||||
|
||||
```
|
||||
|
||||
## Register your own request matcher
|
||||
|
||||
Create your own method with the following signature
|
||||
|
||||
```python
|
||||
def my_matcher(r1, r2):
|
||||
```
|
||||
|
||||
Your method receives the two requests and must return True if they
|
||||
match, False if they don't.
|
||||
|
||||
Finally, register your method with VCR to use your
|
||||
new request matcher.
|
||||
|
||||
```python
|
||||
import vcr
|
||||
|
||||
def jurassic_matcher(r1, r2):
|
||||
return r1.url == r2.url and 'JURASSIC PARK' in r1.body
|
||||
|
||||
my_vcr = vcr.VCR()
|
||||
my_vcr.register_matcher('jurassic', jurassic_matcher)
|
||||
|
||||
with my_vcr.use_cassette('test.yml', match_on=['jurassic']):
|
||||
# your http here
|
||||
|
||||
# After you register, you can set the default match_on to use your new matcher
|
||||
|
||||
my_vcr.match_on = ['jurassic']
|
||||
|
||||
with my_vcr.use_cassette('test.yml'):
|
||||
# your http here
|
||||
|
||||
```
|
||||
|
||||
##Installation
|
||||
|
||||
@@ -97,6 +259,24 @@ This library is a work in progress, so the API might change on you.
|
||||
There are probably some [bugs](https://github.com/kevin1024/vcrpy/issues?labels=bug&page=1&state=open) floating around too.
|
||||
|
||||
##Changelog
|
||||
* 0.3.0: *Backwards incompatible release* - Added support for record
|
||||
modes, and changed the default recording behavior to the "once" record
|
||||
mode. Please see the documentation on record modes for more. Added
|
||||
support for custom request matching, and changed the default request
|
||||
matching behavior to match only on the URL and method. Also,
|
||||
improved the httplib mocking to add support for the `HTTPConnection.send()`
|
||||
method. This means that requests won't actually be sent until the
|
||||
response is read, since I need to record the entire request in order
|
||||
to match up the appropriate response. I don't think this should cause
|
||||
any issues unless you are sending requests without ever loading the
|
||||
response (which none of the standard httplib wrappers do, as far as I
|
||||
know. Thanks to @fatuhoku for some of the ideas and the motivation
|
||||
behind this release.
|
||||
* 0.2.1: Fixed missing modules in setup.py
|
||||
* 0.2.0: Added configuration API, which lets you configure some settings
|
||||
on VCR (see the README). Also, VCR no longer saves cassettes if they
|
||||
haven't changed at all and supports JSON as well as YAML
|
||||
(thanks @sirpengi). Added amazing new skeumorphic logo, thanks @hairarrow.
|
||||
* 0.1.0: *backwards incompatible release - delete your old cassette files*:
|
||||
This release adds the ability to access the cassette to make assertions
|
||||
on it, as well as a major code refactor thanks to @dlecocq. It also
|
||||
|
||||
19
setup.py
19
setup.py
@@ -19,23 +19,30 @@ class PyTest(TestCommand):
|
||||
sys.exit(errno)
|
||||
|
||||
setup(name='vcrpy',
|
||||
version='0.1.0',
|
||||
version='0.3.0',
|
||||
description="A Python port of Ruby's VCR to make mocking HTTP easier",
|
||||
author='Kevin McCarthy',
|
||||
author_email='me@kevinmccarthy.org',
|
||||
url='https://github.com/kevin1024/vcrpy',
|
||||
packages=[
|
||||
packages = [
|
||||
'vcr',
|
||||
'vcr.stubs'],
|
||||
'vcr.stubs',
|
||||
'vcr.compat',
|
||||
'vcr.persisters',
|
||||
'vcr.serializers',
|
||||
],
|
||||
package_dir={
|
||||
'vcr': 'vcr',
|
||||
'vcr.stubs': 'vcr/stubs'},
|
||||
'vcr.stubs': 'vcr/stubs',
|
||||
'vcr.compat': 'vcr/compat',
|
||||
'vcr.persisters': 'vcr/persisters',
|
||||
},
|
||||
install_requires=['PyYAML'],
|
||||
license='MIT',
|
||||
tests_require=['pytest'],
|
||||
tests_require=['pytest','mock'],
|
||||
cmdclass={'test': PyTest},
|
||||
classifiers=[
|
||||
'Development Status :: 3 - Alpha',
|
||||
'Development Status :: 4 - Beta',
|
||||
'Environment :: Console',
|
||||
'Intended Audience :: Developers',
|
||||
'Programming Language :: Python',
|
||||
|
||||
@@ -2,6 +2,7 @@ def assert_cassette_empty(cass):
|
||||
assert len(cass) == 0
|
||||
assert cass.play_count == 0
|
||||
|
||||
|
||||
def assert_cassette_has_one_response(cass):
|
||||
assert len(cass) == 1
|
||||
assert cass.play_count == 1
|
||||
|
||||
@@ -1,18 +0,0 @@
|
||||
# coding=utf-8
|
||||
|
||||
import os
|
||||
import shutil
|
||||
import unittest
|
||||
|
||||
|
||||
class TestVCR(unittest.TestCase):
|
||||
fixtures = os.path.join('does', 'not', 'exist')
|
||||
|
||||
def tearDown(self):
|
||||
# Remove the fixtures if they exist
|
||||
if os.path.exists(self.fixtures):
|
||||
shutil.rmtree(self.fixtures)
|
||||
|
||||
def fixture(self, *names):
|
||||
'''Return a path to the provided fixture'''
|
||||
return os.path.join(self.fixtures, *names)
|
||||
@@ -8,17 +8,19 @@ import urllib2
|
||||
# Internal imports
|
||||
import vcr
|
||||
|
||||
|
||||
def test_nonexistent_directory(tmpdir):
|
||||
'''If we load a cassette in a nonexistent directory, it can save ok'''
|
||||
# Check to make sure directory doesnt exist
|
||||
assert not os.path.exists(str(tmpdir.join('nonexistent')))
|
||||
|
||||
# Run VCR to create dir and cassette file
|
||||
with vcr.use_cassette(str(tmpdir.join('nonexistent','cassette.yml'))):
|
||||
with vcr.use_cassette(str(tmpdir.join('nonexistent', 'cassette.yml'))):
|
||||
urllib2.urlopen('http://httpbin.org/').read()
|
||||
|
||||
# This should have made the file and the directory
|
||||
assert os.path.exists(str(tmpdir.join('nonexistent','cassette.yml')))
|
||||
assert os.path.exists(str(tmpdir.join('nonexistent', 'cassette.yml')))
|
||||
|
||||
|
||||
def test_unpatch(tmpdir):
|
||||
'''Ensure that our cassette gets unpatched when we're done'''
|
||||
@@ -30,10 +32,70 @@ def test_unpatch(tmpdir):
|
||||
urllib2.urlopen('http://httpbin.org/').read()
|
||||
assert cass.play_count == 0
|
||||
|
||||
|
||||
def test_basic_use(tmpdir):
|
||||
'''
|
||||
Copied from the docs
|
||||
Copied from the docs
|
||||
'''
|
||||
with vcr.use_cassette('fixtures/vcr_cassettes/synopsis.yaml'):
|
||||
response = urllib2.urlopen('http://www.iana.org/domains/reserved').read()
|
||||
response = urllib2.urlopen(
|
||||
'http://www.iana.org/domains/reserved'
|
||||
).read()
|
||||
assert 'Example domains' in response
|
||||
|
||||
|
||||
def test_basic_json_use(tmpdir):
|
||||
'''
|
||||
Ensure you can load a json serialized cassette
|
||||
'''
|
||||
test_fixture = 'fixtures/vcr_cassettes/synopsis.json'
|
||||
with vcr.use_cassette(test_fixture, serializer='json'):
|
||||
response = urllib2.urlopen('http://httpbin.org/').read()
|
||||
assert 'difficult sometimes' in response
|
||||
|
||||
|
||||
def test_patched_content(tmpdir):
|
||||
'''
|
||||
Ensure that what you pull from a cassette is what came from the
|
||||
request
|
||||
'''
|
||||
with vcr.use_cassette(str(tmpdir.join('synopsis.yaml'))) as cass:
|
||||
response = urllib2.urlopen('http://httpbin.org/').read()
|
||||
assert cass.play_count == 0
|
||||
|
||||
with vcr.use_cassette(str(tmpdir.join('synopsis.yaml'))) as cass:
|
||||
response2 = urllib2.urlopen('http://httpbin.org/').read()
|
||||
assert cass.play_count == 1
|
||||
cass._save(force=True)
|
||||
|
||||
with vcr.use_cassette(str(tmpdir.join('synopsis.yaml'))) as cass:
|
||||
response3 = urllib2.urlopen('http://httpbin.org/').read()
|
||||
assert cass.play_count == 1
|
||||
|
||||
assert response == response2
|
||||
assert response2 == response3
|
||||
|
||||
|
||||
def test_patched_content_json(tmpdir):
|
||||
'''
|
||||
Ensure that what you pull from a json cassette is what came from the
|
||||
request
|
||||
'''
|
||||
|
||||
testfile = str(tmpdir.join('synopsis.json'))
|
||||
|
||||
with vcr.use_cassette(testfile) as cass:
|
||||
response = urllib2.urlopen('http://httpbin.org/').read()
|
||||
assert cass.play_count == 0
|
||||
|
||||
with vcr.use_cassette(testfile) as cass:
|
||||
response2 = urllib2.urlopen('http://httpbin.org/').read()
|
||||
assert cass.play_count == 1
|
||||
cass._save(force=True)
|
||||
|
||||
with vcr.use_cassette(testfile) as cass:
|
||||
response3 = urllib2.urlopen('http://httpbin.org/').read()
|
||||
assert cass.play_count == 1
|
||||
|
||||
assert response == response2
|
||||
assert response2 == response3
|
||||
|
||||
36
tests/integration/test_config.py
Normal file
36
tests/integration/test_config.py
Normal file
@@ -0,0 +1,36 @@
|
||||
import os
|
||||
import json
|
||||
import urllib2
|
||||
import vcr
|
||||
|
||||
|
||||
def test_set_serializer_default_config(tmpdir):
|
||||
my_vcr = vcr.VCR(serializer='json')
|
||||
|
||||
with my_vcr.use_cassette(str(tmpdir.join('test.json'))):
|
||||
assert my_vcr.serializer == 'json'
|
||||
urllib2.urlopen('http://httpbin.org/get')
|
||||
|
||||
with open(str(tmpdir.join('test.json'))) as f:
|
||||
assert json.loads(f.read())
|
||||
|
||||
|
||||
def test_default_set_cassette_library_dir(tmpdir):
|
||||
my_vcr = vcr.VCR(cassette_library_dir=str(tmpdir.join('subdir')))
|
||||
|
||||
with my_vcr.use_cassette('test.json'):
|
||||
urllib2.urlopen('http://httpbin.org/get')
|
||||
|
||||
assert os.path.exists(str(tmpdir.join('subdir').join('test.json')))
|
||||
|
||||
|
||||
def test_override_set_cassette_library_dir(tmpdir):
|
||||
my_vcr = vcr.VCR(cassette_library_dir=str(tmpdir.join('subdir')))
|
||||
|
||||
cld = str(tmpdir.join('subdir2'))
|
||||
|
||||
with my_vcr.use_cassette('test.json', cassette_library_dir=cld):
|
||||
urllib2.urlopen('http://httpbin.org/get')
|
||||
|
||||
assert os.path.exists(str(tmpdir.join('subdir2').join('test.json')))
|
||||
assert not os.path.exists(str(tmpdir.join('subdir').join('test.json')))
|
||||
55
tests/integration/test_disksaver.py
Normal file
55
tests/integration/test_disksaver.py
Normal file
@@ -0,0 +1,55 @@
|
||||
'''Basic tests about save behavior'''
|
||||
# coding=utf-8
|
||||
|
||||
# External imports
|
||||
import os
|
||||
import urllib2
|
||||
import time
|
||||
|
||||
# Internal imports
|
||||
import vcr
|
||||
|
||||
|
||||
def test_disk_saver_nowrite(tmpdir):
|
||||
'''
|
||||
Ensure that when you close a cassette without changing it it doesn't
|
||||
rewrite the file
|
||||
'''
|
||||
fname = str(tmpdir.join('synopsis.yaml'))
|
||||
with vcr.use_cassette(fname) as cass:
|
||||
urllib2.urlopen('http://www.iana.org/domains/reserved').read()
|
||||
assert cass.play_count == 0
|
||||
last_mod = os.path.getmtime(fname)
|
||||
|
||||
with vcr.use_cassette(fname) as cass:
|
||||
urllib2.urlopen('http://www.iana.org/domains/reserved').read()
|
||||
assert cass.play_count == 1
|
||||
assert cass.dirty is False
|
||||
last_mod2 = os.path.getmtime(fname)
|
||||
|
||||
assert last_mod == last_mod2
|
||||
|
||||
|
||||
def test_disk_saver_write(tmpdir):
|
||||
'''
|
||||
Ensure that when you close a cassette after changing it it does
|
||||
rewrite the file
|
||||
'''
|
||||
fname = str(tmpdir.join('synopsis.yaml'))
|
||||
with vcr.use_cassette(fname) as cass:
|
||||
urllib2.urlopen('http://www.iana.org/domains/reserved').read()
|
||||
assert cass.play_count == 0
|
||||
last_mod = os.path.getmtime(fname)
|
||||
|
||||
# Make sure at least 1 second passes, otherwise sometimes
|
||||
# the mtime doesn't change
|
||||
time.sleep(1)
|
||||
|
||||
with vcr.use_cassette(fname, record_mode='any') as cass:
|
||||
urllib2.urlopen('http://www.iana.org/domains/reserved').read()
|
||||
urllib2.urlopen('http://httpbin.org/').read()
|
||||
assert cass.play_count == 1
|
||||
assert cass.dirty
|
||||
last_mod2 = os.path.getmtime(fname)
|
||||
|
||||
assert last_mod != last_mod2
|
||||
86
tests/integration/test_record_mode.py
Normal file
86
tests/integration/test_record_mode.py
Normal file
@@ -0,0 +1,86 @@
|
||||
import os
|
||||
import urllib2
|
||||
import pytest
|
||||
import vcr
|
||||
|
||||
|
||||
def test_once_record_mode(tmpdir):
|
||||
testfile = str(tmpdir.join('recordmode.yml'))
|
||||
with vcr.use_cassette(testfile, record_mode="once"):
|
||||
# cassette file doesn't exist, so create.
|
||||
response = urllib2.urlopen('http://httpbin.org/').read()
|
||||
|
||||
with vcr.use_cassette(testfile, record_mode="once") as cass:
|
||||
# make the same request again
|
||||
response = urllib2.urlopen('http://httpbin.org/').read()
|
||||
|
||||
# the first time, it's played from the cassette.
|
||||
# but, try to access something else from the same cassette, and an
|
||||
# exception is raised.
|
||||
with pytest.raises(Exception):
|
||||
response = urllib2.urlopen('http://httpbin.org/get').read()
|
||||
|
||||
|
||||
def test_new_episodes_record_mode(tmpdir):
|
||||
testfile = str(tmpdir.join('recordmode.yml'))
|
||||
|
||||
with vcr.use_cassette(testfile, record_mode="new_episodes"):
|
||||
# cassette file doesn't exist, so create.
|
||||
response = urllib2.urlopen('http://httpbin.org/').read()
|
||||
|
||||
with vcr.use_cassette(testfile, record_mode="new_episodes") as cass:
|
||||
# make the same request again
|
||||
response = urllib2.urlopen('http://httpbin.org/').read()
|
||||
|
||||
# in the "new_episodes" record mode, we can add more requests to
|
||||
# a cassette without repurcussions.
|
||||
response = urllib2.urlopen('http://httpbin.org/get').read()
|
||||
|
||||
# the first interaction was not re-recorded, but the second was added
|
||||
assert cass.play_count == 1
|
||||
|
||||
|
||||
def test_all_record_mode(tmpdir):
|
||||
testfile = str(tmpdir.join('recordmode.yml'))
|
||||
|
||||
with vcr.use_cassette(testfile, record_mode="all"):
|
||||
# cassette file doesn't exist, so create.
|
||||
response = urllib2.urlopen('http://httpbin.org/').read()
|
||||
|
||||
with vcr.use_cassette(testfile, record_mode="all") as cass:
|
||||
# make the same request again
|
||||
response = urllib2.urlopen('http://httpbin.org/').read()
|
||||
|
||||
# in the "all" record mode, we can add more requests to
|
||||
# a cassette without repurcussions.
|
||||
response = urllib2.urlopen('http://httpbin.org/get').read()
|
||||
|
||||
# The cassette was never actually played, even though it existed.
|
||||
# that's because, in "all" mode, the requests all go directly to
|
||||
# the source and bypass the cassette.
|
||||
assert cass.play_count == 0
|
||||
|
||||
|
||||
def test_none_record_mode(tmpdir):
|
||||
# Cassette file doesn't exist, yet we are trying to make a request.
|
||||
# raise hell.
|
||||
testfile = str(tmpdir.join('recordmode.yml'))
|
||||
with vcr.use_cassette(testfile, record_mode="none"):
|
||||
with pytest.raises(Exception):
|
||||
response = urllib2.urlopen('http://httpbin.org/').read()
|
||||
|
||||
|
||||
def test_none_record_mode_with_existing_cassette(tmpdir):
|
||||
# create a cassette file
|
||||
testfile = str(tmpdir.join('recordmode.yml'))
|
||||
|
||||
with vcr.use_cassette(testfile, record_mode="all"):
|
||||
response = urllib2.urlopen('http://httpbin.org/').read()
|
||||
|
||||
# play from cassette file
|
||||
with vcr.use_cassette(testfile, record_mode="none") as cass:
|
||||
response = urllib2.urlopen('http://httpbin.org/').read()
|
||||
assert cass.play_count == 1
|
||||
# but if I try to hit the net, raise an exception.
|
||||
with pytest.raises(Exception):
|
||||
response = urllib2.urlopen('http://httpbin.org/get').read()
|
||||
32
tests/integration/test_register_matcher.py
Normal file
32
tests/integration/test_register_matcher.py
Normal file
@@ -0,0 +1,32 @@
|
||||
import urllib2
|
||||
import vcr
|
||||
|
||||
|
||||
def true_matcher(r1, r2):
|
||||
return True
|
||||
|
||||
|
||||
def false_matcher(r1, r2):
|
||||
return False
|
||||
|
||||
|
||||
def test_registered_serializer_true_matcher(tmpdir):
|
||||
my_vcr = vcr.VCR()
|
||||
my_vcr.register_matcher('true', true_matcher)
|
||||
testfile = str(tmpdir.join('test.yml'))
|
||||
with my_vcr.use_cassette(testfile, match_on=['true']) as cass:
|
||||
# These 2 different urls are stored as the same request
|
||||
urllib2.urlopen('http://httpbin.org/')
|
||||
urllib2.urlopen('https://httpbin.org/get')
|
||||
assert len(cass) == 1
|
||||
|
||||
|
||||
def test_registered_serializer_false_matcher(tmpdir):
|
||||
my_vcr = vcr.VCR()
|
||||
my_vcr.register_matcher('false', false_matcher)
|
||||
testfile = str(tmpdir.join('test.yml'))
|
||||
with my_vcr.use_cassette(testfile, match_on=['false']) as cass:
|
||||
# These 2 different urls are stored as different requests
|
||||
urllib2.urlopen('http://httpbin.org/')
|
||||
urllib2.urlopen('https://httpbin.org/get')
|
||||
assert len(cass) == 2
|
||||
35
tests/integration/test_register_serializer.py
Normal file
35
tests/integration/test_register_serializer.py
Normal file
@@ -0,0 +1,35 @@
|
||||
import urllib2
|
||||
import vcr
|
||||
|
||||
|
||||
class MockSerializer(object):
|
||||
def __init__(self):
|
||||
self.serialize_count = 0
|
||||
self.deserialize_count = 0
|
||||
self.load_args = None
|
||||
|
||||
def deserialize(self, cassette_string):
|
||||
self.serialize_count += 1
|
||||
self.cassette_string = cassette_string
|
||||
return ([], [])
|
||||
|
||||
def serialize(self, cassette_dict):
|
||||
self.deserialize_count += 1
|
||||
return ""
|
||||
|
||||
|
||||
def test_registered_serializer(tmpdir):
|
||||
ms = MockSerializer()
|
||||
my_vcr = vcr.VCR()
|
||||
my_vcr.register_serializer('mock', ms)
|
||||
tmpdir.join('test.mock').write('test_data')
|
||||
with my_vcr.use_cassette(str(tmpdir.join('test.mock')), serializer='mock'):
|
||||
urllib2.urlopen('http://httpbin.org/')
|
||||
# Serializer deserialized once
|
||||
assert ms.serialize_count == 1
|
||||
# and serialized the test data string
|
||||
assert ms.cassette_string == 'test_data'
|
||||
# and hasn't serialized yet
|
||||
assert ms.deserialize_count == 0
|
||||
|
||||
assert ms.serialize_count == 1
|
||||
@@ -1,12 +1,11 @@
|
||||
import urllib2
|
||||
import vcr
|
||||
|
||||
|
||||
def test_recorded_request_url_with_redirected_request(tmpdir):
|
||||
with vcr.use_cassette(str(tmpdir.join('test.yml'))) as cass:
|
||||
assert len(cass) == 0
|
||||
urllib2.urlopen('http://google.com')
|
||||
print cass.requests
|
||||
print cass.requests[0]
|
||||
assert cass.requests[0].url == 'http://google.com'
|
||||
assert cass.requests[1].url == 'http://www.google.com/'
|
||||
assert len(cass) == 2
|
||||
urllib2.urlopen('http://httpbin.org/redirect/3')
|
||||
assert cass.requests[0].url == 'http://httpbin.org/redirect/3'
|
||||
assert cass.requests[3].url == 'http://httpbin.org/get'
|
||||
assert len(cass) == 4
|
||||
|
||||
@@ -2,18 +2,14 @@
|
||||
|
||||
# coding=utf-8
|
||||
|
||||
# Internal imports
|
||||
|
||||
import os
|
||||
import pytest
|
||||
|
||||
import vcr
|
||||
|
||||
from assertions import assert_cassette_empty, assert_cassette_has_one_response
|
||||
|
||||
requests = pytest.importorskip("requests")
|
||||
|
||||
@pytest.fixture(params=["https","http"])
|
||||
|
||||
@pytest.fixture(params=["https", "http"])
|
||||
def scheme(request):
|
||||
"""
|
||||
Fixture that returns both http and https
|
||||
@@ -31,6 +27,7 @@ def test_status_code(scheme, tmpdir):
|
||||
# Ensure that we've now cached a single response
|
||||
assert_cassette_has_one_response(cass)
|
||||
|
||||
|
||||
def test_headers(scheme, tmpdir):
|
||||
'''Ensure that we can read the headers back'''
|
||||
url = scheme + '://httpbin.org/'
|
||||
@@ -41,6 +38,7 @@ def test_headers(scheme, tmpdir):
|
||||
# Ensure that we've now cached a single response
|
||||
assert_cassette_has_one_response(cass)
|
||||
|
||||
|
||||
def test_body(tmpdir, scheme):
|
||||
'''Ensure the responses are all identical enough'''
|
||||
url = scheme + '://httpbin.org/bytes/1024'
|
||||
@@ -51,6 +49,7 @@ def test_body(tmpdir, scheme):
|
||||
# Ensure that we've now cached a single response
|
||||
assert_cassette_has_one_response(cass)
|
||||
|
||||
|
||||
def test_auth(tmpdir, scheme):
|
||||
'''Ensure that we can handle basic auth'''
|
||||
auth = ('user', 'passwd')
|
||||
@@ -65,6 +64,7 @@ def test_auth(tmpdir, scheme):
|
||||
# Ensure that we've now cached a single response
|
||||
assert_cassette_has_one_response(cass)
|
||||
|
||||
|
||||
def test_auth_failed(tmpdir, scheme):
|
||||
'''Ensure that we can save failed auth statuses'''
|
||||
auth = ('user', 'wrongwrongwrong')
|
||||
@@ -79,21 +79,25 @@ def test_auth_failed(tmpdir, scheme):
|
||||
# Ensure that we've now cached a single response
|
||||
assert_cassette_has_one_response(cass)
|
||||
|
||||
|
||||
def test_post(tmpdir, scheme):
|
||||
'''Ensure that we can post and cache the results'''
|
||||
data = {'key1': 'value1', 'key2': 'value2'}
|
||||
url = scheme + '://httpbin.org/post'
|
||||
with vcr.use_cassette(str(tmpdir.join('redirect.yaml'))) as cass:
|
||||
with vcr.use_cassette(str(tmpdir.join('requests.yaml'))) as cass:
|
||||
# Ensure that this is empty to begin with
|
||||
assert_cassette_empty(cass)
|
||||
assert requests.post(url, data).content == requests.post(url, data).content
|
||||
req1 = requests.post(url, data).content
|
||||
req2 = requests.post(url, data).content
|
||||
assert req1 == req2
|
||||
# Ensure that we've now cached a single response
|
||||
assert_cassette_has_one_response(cass)
|
||||
|
||||
|
||||
def test_redirects(tmpdir, scheme):
|
||||
'''Ensure that we can handle redirects'''
|
||||
url = scheme + '://httpbin.org/redirect-to?url=bytes/1024'
|
||||
with vcr.use_cassette(str(tmpdir.join('redirect.yaml'))) as cass:
|
||||
with vcr.use_cassette(str(tmpdir.join('requests.yaml'))) as cass:
|
||||
# Ensure that this is empty to begin with
|
||||
assert_cassette_empty(cass)
|
||||
assert requests.get(url).content == requests.get(url).content
|
||||
@@ -102,6 +106,7 @@ def test_redirects(tmpdir, scheme):
|
||||
assert len(cass) == 2
|
||||
assert cass.play_count == 2
|
||||
|
||||
|
||||
def test_cross_scheme(tmpdir, scheme):
|
||||
'''Ensure that requests between schemes are treated separately'''
|
||||
# First fetch a url under http, and then again under https and then
|
||||
@@ -112,4 +117,3 @@ def test_cross_scheme(tmpdir, scheme):
|
||||
requests.get('http://httpbin.org/')
|
||||
assert cass.play_count == 0
|
||||
assert len(cass) == 2
|
||||
|
||||
|
||||
@@ -12,13 +12,15 @@ import vcr
|
||||
|
||||
from assertions import assert_cassette_empty, assert_cassette_has_one_response
|
||||
|
||||
@pytest.fixture(params=["https","http"])
|
||||
|
||||
@pytest.fixture(params=["https", "http"])
|
||||
def scheme(request):
|
||||
"""
|
||||
Fixture that returns both http and https
|
||||
"""
|
||||
return request.param
|
||||
|
||||
|
||||
def test_response_code(scheme, tmpdir):
|
||||
'''Ensure we can read a response code from a fetch'''
|
||||
url = scheme + '://httpbin.org/'
|
||||
@@ -29,6 +31,7 @@ def test_response_code(scheme, tmpdir):
|
||||
# Ensure that we've now cached a single response
|
||||
assert_cassette_has_one_response(cass)
|
||||
|
||||
|
||||
def test_random_body(scheme, tmpdir):
|
||||
'''Ensure we can read the content, and that it's served from cache'''
|
||||
url = scheme + '://httpbin.org/bytes/1024'
|
||||
@@ -39,16 +42,20 @@ def test_random_body(scheme, tmpdir):
|
||||
# Ensure that we've now cached a single response
|
||||
assert_cassette_has_one_response(cass)
|
||||
|
||||
|
||||
def test_response_headers(scheme, tmpdir):
|
||||
'''Ensure we can get information from the response'''
|
||||
url = scheme + '://httpbin.org/'
|
||||
with vcr.use_cassette(str(tmpdir.join('headers.yaml'))) as cass:
|
||||
# Ensure that this is empty to begin with
|
||||
assert_cassette_empty(cass)
|
||||
assert urllib2.urlopen(url).info().items() == urllib2.urlopen(url).info().items()
|
||||
open1 = urllib2.urlopen(url).info().items()
|
||||
open2 = urllib2.urlopen(url).info().items()
|
||||
assert open1 == open2
|
||||
# Ensure that we've now cached a single response
|
||||
assert_cassette_has_one_response(cass)
|
||||
|
||||
|
||||
def test_multiple_requests(scheme, tmpdir):
|
||||
'''Ensure that we can cache multiple requests'''
|
||||
urls = [
|
||||
@@ -65,6 +72,7 @@ def test_multiple_requests(scheme, tmpdir):
|
||||
assert len(cass) == index + 1
|
||||
assert cass.play_count == index + 1
|
||||
|
||||
|
||||
def test_get_data(scheme, tmpdir):
|
||||
'''Ensure that it works with query data'''
|
||||
data = urlencode({'some': 1, 'data': 'here'})
|
||||
@@ -72,13 +80,14 @@ def test_get_data(scheme, tmpdir):
|
||||
with vcr.use_cassette(str(tmpdir.join('get_data.yaml'))) as cass:
|
||||
# Ensure that this is empty to begin with
|
||||
assert_cassette_empty(cass)
|
||||
res1 = urllib2.urlopen(url).read()
|
||||
res1 = urllib2.urlopen(url).read()
|
||||
res2 = urllib2.urlopen(url).read()
|
||||
assert res1 == res2
|
||||
# Ensure that we've now cached a single response
|
||||
assert len(cass) == 1
|
||||
assert cass.play_count == 1
|
||||
|
||||
|
||||
def test_post_data(scheme, tmpdir):
|
||||
'''Ensure that it works when posting data'''
|
||||
data = urlencode({'some': 1, 'data': 'here'})
|
||||
@@ -86,12 +95,13 @@ def test_post_data(scheme, tmpdir):
|
||||
with vcr.use_cassette(str(tmpdir.join('post_data.yaml'))) as cass:
|
||||
# Ensure that this is empty to begin with
|
||||
assert_cassette_empty(cass)
|
||||
res1 = urllib2.urlopen(url, data).read()
|
||||
res1 = urllib2.urlopen(url, data).read()
|
||||
res2 = urllib2.urlopen(url, data).read()
|
||||
assert res1 == res2
|
||||
# Ensure that we've now cached a single response
|
||||
assert_cassette_has_one_response(cass)
|
||||
|
||||
|
||||
def test_post_unicode_data(scheme, tmpdir):
|
||||
'''Ensure that it works when posting unicode data'''
|
||||
data = urlencode({'snowman': u'☃'.encode('utf-8')})
|
||||
@@ -99,12 +109,13 @@ def test_post_unicode_data(scheme, tmpdir):
|
||||
with vcr.use_cassette(str(tmpdir.join('post_data.yaml'))) as cass:
|
||||
# Ensure that this is empty to begin with
|
||||
assert_cassette_empty(cass)
|
||||
res1 = urllib2.urlopen(url, data).read()
|
||||
res1 = urllib2.urlopen(url, data).read()
|
||||
res2 = urllib2.urlopen(url, data).read()
|
||||
assert res1 == res2
|
||||
# Ensure that we've now cached a single response
|
||||
assert_cassette_has_one_response(cass)
|
||||
|
||||
|
||||
def test_cross_scheme(tmpdir):
|
||||
'''Ensure that requests between schemes are treated separately'''
|
||||
# First fetch a url under https, and then again under https and then
|
||||
|
||||
@@ -3,6 +3,9 @@ requests = pytest.importorskip("requests")
|
||||
|
||||
import vcr
|
||||
|
||||
import httplib
|
||||
|
||||
|
||||
def test_domain_redirect():
|
||||
'''Ensure that redirects across domains are considered unique'''
|
||||
# In this example, seomoz.org redirects to moz.com, and if those
|
||||
@@ -15,3 +18,30 @@ def test_domain_redirect():
|
||||
# redirect, and a second for the actual fetch
|
||||
assert len(cass) == 2
|
||||
|
||||
|
||||
def test_flickr_multipart_upload():
|
||||
"""
|
||||
The python-flickr-api project does a multipart
|
||||
upload that confuses vcrpy
|
||||
"""
|
||||
def _pretend_to_be_flickr_library():
|
||||
content_type, body = "text/plain", "HELLO WORLD"
|
||||
h = httplib.HTTPConnection("httpbin.org")
|
||||
headers = {
|
||||
"Content-Type": content_type,
|
||||
"content-length": str(len(body))
|
||||
}
|
||||
h.request("POST", "/post/", headers=headers)
|
||||
h.send(body)
|
||||
r = h.getresponse()
|
||||
data = r.read()
|
||||
h.close()
|
||||
|
||||
with vcr.use_cassette('fixtures/vcr_cassettes/flickr.json') as cass:
|
||||
_pretend_to_be_flickr_library()
|
||||
assert len(cass) == 1
|
||||
|
||||
with vcr.use_cassette('fixtures/vcr_cassettes/flickr.json') as cass:
|
||||
assert len(cass) == 1
|
||||
_pretend_to_be_flickr_library()
|
||||
assert cass.play_count == 1
|
||||
|
||||
@@ -1,25 +1,30 @@
|
||||
import pytest
|
||||
import yaml
|
||||
import mock
|
||||
from vcr.cassette import Cassette
|
||||
|
||||
|
||||
def test_cassette_load(tmpdir):
|
||||
a_file = tmpdir.join('test_cassette.yml')
|
||||
a_file.write(yaml.dump([
|
||||
{'request':'foo', 'response':'bar'}
|
||||
{'request': 'foo', 'response': 'bar'}
|
||||
]))
|
||||
a_cassette = Cassette.load(str(a_file))
|
||||
assert len(a_cassette) == 1
|
||||
|
||||
|
||||
def test_cassette_not_played():
|
||||
a = Cassette('test')
|
||||
assert not a.play_count
|
||||
|
||||
|
||||
def test_cassette_played():
|
||||
a = Cassette('test')
|
||||
a.mark_played('foo')
|
||||
a.mark_played('foo')
|
||||
assert a.play_count == 2
|
||||
|
||||
|
||||
def test_cassette_play_counter():
|
||||
a = Cassette('test')
|
||||
a.mark_played('foo')
|
||||
@@ -27,28 +32,39 @@ def test_cassette_play_counter():
|
||||
assert a.play_counts['foo'] == 1
|
||||
assert a.play_counts['bar'] == 1
|
||||
|
||||
|
||||
def test_cassette_append():
|
||||
a = Cassette('test')
|
||||
a.append('foo', 'bar')
|
||||
assert a.requests == ['foo']
|
||||
assert a.responses == ['bar']
|
||||
|
||||
|
||||
def test_cassette_len():
|
||||
a = Cassette('test')
|
||||
a.append('foo','bar')
|
||||
a.append('foo2','bar2')
|
||||
a.append('foo', 'bar')
|
||||
a.append('foo2', 'bar2')
|
||||
assert len(a) == 2
|
||||
|
||||
|
||||
def _mock_requests_match(request1, request2, matchers):
|
||||
return request1 == request2
|
||||
|
||||
@mock.patch('vcr.cassette.requests_match', _mock_requests_match)
|
||||
def test_cassette_contains():
|
||||
a = Cassette('test')
|
||||
a.append('foo','bar')
|
||||
a.append('foo', 'bar')
|
||||
assert 'foo' in a
|
||||
|
||||
|
||||
@mock.patch('vcr.cassette.requests_match', _mock_requests_match)
|
||||
def test_cassette_response_of():
|
||||
a = Cassette('test')
|
||||
a.append('foo','bar')
|
||||
a.append('foo', 'bar')
|
||||
assert a.response_of('foo') == 'bar'
|
||||
|
||||
|
||||
@mock.patch('vcr.cassette.requests_match', _mock_requests_match)
|
||||
def test_cassette_get_missing_response():
|
||||
a = Cassette('test')
|
||||
with pytest.raises(KeyError):
|
||||
|
||||
@@ -1,9 +1,11 @@
|
||||
from vcr.request import Request
|
||||
|
||||
|
||||
def test_url():
|
||||
req = Request('http','www.google.com',80,'GET','/','',{})
|
||||
req = Request('http', 'www.google.com', 80, 'GET', '/', '', {})
|
||||
assert req.url == 'http://www.google.com/'
|
||||
|
||||
|
||||
def test_str():
|
||||
req = Request('http','www.google.com',80,'GET','/','',{})
|
||||
req = Request('http', 'www.google.com', 80, 'GET', '/', '', {})
|
||||
str(req) == '<Request (GET) http://www.google.com>'
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
# Import Cassette to make it available at the top level
|
||||
from .cassette import Cassette
|
||||
from config import VCR
|
||||
|
||||
default_vcr = VCR()
|
||||
|
||||
|
||||
# Also, make a 'load' function available
|
||||
def use_cassette(path):
|
||||
return Cassette.load(path)
|
||||
def use_cassette(path, **kwargs):
|
||||
return default_vcr.use_cassette(path, **kwargs)
|
||||
|
||||
103
vcr/cassette.py
103
vcr/cassette.py
@@ -1,7 +1,5 @@
|
||||
'''The container for recorded requests and responses'''
|
||||
|
||||
import os
|
||||
import tempfile
|
||||
try:
|
||||
from collections import Counter, OrderedDict
|
||||
except ImportError:
|
||||
@@ -11,25 +9,34 @@ except ImportError:
|
||||
# Internal imports
|
||||
from .patch import install, reset
|
||||
from .persist import load_cassette, save_cassette
|
||||
from .serializers import yamlserializer
|
||||
from .matchers import requests_match, url, method
|
||||
|
||||
|
||||
class Cassette(object):
|
||||
'''A container for recorded requests and responses'''
|
||||
|
||||
@classmethod
|
||||
def load(cls, path):
|
||||
def load(cls, path, **kwargs):
|
||||
'''Load in the cassette stored at the provided path'''
|
||||
new_cassette = cls(path)
|
||||
try:
|
||||
requests, responses = load_cassette(path)
|
||||
for request, response in zip(requests, responses):
|
||||
new_cassette.append(request, response)
|
||||
except IOError:
|
||||
pass
|
||||
new_cassette = cls(path, **kwargs)
|
||||
new_cassette._load()
|
||||
return new_cassette
|
||||
|
||||
def __init__(self, path):
|
||||
def __init__(self,
|
||||
path,
|
||||
serializer=yamlserializer,
|
||||
record_mode='once',
|
||||
match_on=[url, method]):
|
||||
self._path = path
|
||||
self.data = OrderedDict()
|
||||
self._serializer = serializer
|
||||
self._match_on = match_on
|
||||
|
||||
# self.data is the list of (req, resp) tuples
|
||||
self.data = []
|
||||
self.play_counts = Counter()
|
||||
self.dirty = False
|
||||
self.record_mode = record_mode
|
||||
|
||||
@property
|
||||
def play_count(self):
|
||||
@@ -37,11 +44,25 @@ class Cassette(object):
|
||||
|
||||
@property
|
||||
def requests(self):
|
||||
return self.data.keys()
|
||||
return [request for (request, response) in self.data]
|
||||
|
||||
@property
|
||||
def responses(self):
|
||||
return self.data.values()
|
||||
return [response for (request, response) in self.data]
|
||||
|
||||
@property
|
||||
def rewound(self):
|
||||
"""
|
||||
If the cassette has already been recorded in another session, and has
|
||||
been loaded again fresh from disk, it has been "rewound". This means
|
||||
that it should be write-only, depending on the record mode specified
|
||||
"""
|
||||
return not self.dirty and self.play_count
|
||||
|
||||
@property
|
||||
def write_protected(self):
|
||||
return self.rewound and self.record_mode == 'once' or \
|
||||
self.record_mode == 'none'
|
||||
|
||||
def mark_played(self, request):
|
||||
'''
|
||||
@@ -51,17 +72,54 @@ class Cassette(object):
|
||||
|
||||
def append(self, request, response):
|
||||
'''Add a request, response pair to this cassette'''
|
||||
self.data[request] = response
|
||||
self.data.append((request, response))
|
||||
self.dirty = True
|
||||
|
||||
def response_of(self, request):
|
||||
'''Find the response corresponding to a request'''
|
||||
return self.data[request]
|
||||
'''
|
||||
Find the response corresponding to a request
|
||||
|
||||
def _save(self):
|
||||
save_cassette(self._path, self.requests, self.responses)
|
||||
'''
|
||||
responses = []
|
||||
for stored_request, response in self.data:
|
||||
if requests_match(request, stored_request, self._match_on):
|
||||
responses.append(response)
|
||||
index = self.play_counts[request]
|
||||
try:
|
||||
return responses[index]
|
||||
except IndexError:
|
||||
# I decided that a KeyError is the best exception to raise
|
||||
# if the cassette doesn't contain the request asked for.
|
||||
raise KeyError
|
||||
|
||||
def _as_dict(self):
|
||||
return {"requests": self.requests, "responses": self.responses}
|
||||
|
||||
def _save(self, force=False):
|
||||
if force or self.dirty:
|
||||
save_cassette(
|
||||
self._path,
|
||||
self._as_dict(),
|
||||
serializer=self._serializer
|
||||
)
|
||||
self.dirty = False
|
||||
|
||||
def _load(self):
|
||||
try:
|
||||
requests, responses = load_cassette(
|
||||
self._path,
|
||||
serializer=self._serializer
|
||||
)
|
||||
for request, response in zip(requests, responses):
|
||||
self.append(request, response)
|
||||
self.dirty = False
|
||||
except IOError:
|
||||
pass
|
||||
|
||||
def __str__(self):
|
||||
return "<Cassette containing {0} recorded response(s)>".format(len(self))
|
||||
return "<Cassette containing {0} recorded response(s)>".format(
|
||||
len(self)
|
||||
)
|
||||
|
||||
def __len__(self):
|
||||
'''Return the number of request,response pairs stored in here'''
|
||||
@@ -69,7 +127,10 @@ class Cassette(object):
|
||||
|
||||
def __contains__(self, request):
|
||||
'''Return whether or not a request has been stored'''
|
||||
return request in self.data
|
||||
for stored_request, response in self.data:
|
||||
if requests_match(stored_request, request, self._match_on):
|
||||
return True
|
||||
return False
|
||||
|
||||
def __enter__(self):
|
||||
'''Patch the fetching libraries we know about'''
|
||||
|
||||
72
vcr/config.py
Normal file
72
vcr/config.py
Normal file
@@ -0,0 +1,72 @@
|
||||
import os
|
||||
from .cassette import Cassette
|
||||
from .serializers import yamlserializer, jsonserializer
|
||||
from .matchers import method, url, host, path, headers, body
|
||||
|
||||
|
||||
class VCR(object):
|
||||
def __init__(self,
|
||||
serializer='yaml',
|
||||
cassette_library_dir=None,
|
||||
record_mode="once"):
|
||||
self.serializer = serializer
|
||||
self.match_on = ['url', 'method']
|
||||
self.cassette_library_dir = cassette_library_dir
|
||||
self.serializers = {
|
||||
'yaml': yamlserializer,
|
||||
'json': jsonserializer,
|
||||
}
|
||||
self.matchers = {
|
||||
'method': method,
|
||||
'url': url,
|
||||
'host': host,
|
||||
'path': path,
|
||||
'headers': headers,
|
||||
'body': body,
|
||||
}
|
||||
self.record_mode = record_mode
|
||||
|
||||
def _get_serializer(self, serializer_name):
|
||||
try:
|
||||
serializer = self.serializers[serializer_name]
|
||||
except KeyError:
|
||||
print "Serializer {0} doesn't exist or isn't registered".format(
|
||||
serializer_name
|
||||
)
|
||||
raise KeyError
|
||||
return serializer
|
||||
|
||||
def _get_matchers(self, matcher_names):
|
||||
try:
|
||||
matchers = [self.matchers[m] for m in matcher_names]
|
||||
except KeyError:
|
||||
print "Matcher {0} doesn't exist or isn't registered".format(
|
||||
matcher_name
|
||||
)
|
||||
raise KeyError
|
||||
return matchers
|
||||
|
||||
def use_cassette(self, path, **kwargs):
|
||||
serializer_name = kwargs.get('serializer', self.serializer)
|
||||
matcher_names = kwargs.get('match_on', self.match_on)
|
||||
cassette_library_dir = kwargs.get(
|
||||
'cassette_library_dir',
|
||||
self.cassette_library_dir
|
||||
)
|
||||
|
||||
if cassette_library_dir:
|
||||
path = os.path.join(cassette_library_dir, path)
|
||||
|
||||
merged_config = {
|
||||
"serializer": self._get_serializer(serializer_name),
|
||||
"match_on": self._get_matchers(matcher_names),
|
||||
"record_mode": kwargs.get('record_mode', self.record_mode),
|
||||
}
|
||||
|
||||
return Cassette.load(path, **merged_config)
|
||||
|
||||
def register_serializer(self, name, serializer):
|
||||
self.serializers[name] = serializer
|
||||
|
||||
def register_matcher(self, name, matcher):
|
||||
self.matchers[name] = matcher
|
||||
26
vcr/matchers.py
Normal file
26
vcr/matchers.py
Normal file
@@ -0,0 +1,26 @@
|
||||
def method(r1, r2):
|
||||
return r1.method == r2.method
|
||||
|
||||
|
||||
def url(r1, r2):
|
||||
return r1.url == r2.url
|
||||
|
||||
|
||||
def host(r1, r2):
|
||||
return r1.host == r2.host
|
||||
|
||||
|
||||
def path(r1, r2):
|
||||
return r1.path == r2.path
|
||||
|
||||
|
||||
def body(r1, r2):
|
||||
return r1.body == r2.body
|
||||
|
||||
|
||||
def headers(r1, r2):
|
||||
return r1.headers == r2.headers
|
||||
|
||||
|
||||
def requests_match(r1, r2, matchers):
|
||||
return all(m(r1, r2) for m in matchers)
|
||||
@@ -25,7 +25,11 @@ except ImportError: # pragma: no cover
|
||||
|
||||
|
||||
def install(cassette):
|
||||
'''Install a cassette in lieu of actuall fetching'''
|
||||
"""
|
||||
Patch all the HTTPConnections references we can find!
|
||||
This replaces the actual HTTPConnection with a VCRHTTPConnection
|
||||
object which knows how to save to / read from cassettes
|
||||
"""
|
||||
httplib.HTTPConnection = httplib.HTTP._connection_class = VCRHTTPConnection
|
||||
httplib.HTTPSConnection = httplib.HTTPS._connection_class = (
|
||||
VCRHTTPSConnection)
|
||||
@@ -59,7 +63,7 @@ def reset():
|
||||
'''Undo all the patching'''
|
||||
httplib.HTTPConnection = httplib.HTTP._connection_class = _HTTPConnection
|
||||
httplib.HTTPSConnection = httplib.HTTPS._connection_class = \
|
||||
_HTTPSConnection
|
||||
_HTTPSConnection
|
||||
try:
|
||||
import requests.packages.urllib3.connectionpool as cpool
|
||||
cpool.VerifiedHTTPSConnection = _VerifiedHTTPSConnection
|
||||
|
||||
@@ -1,44 +1,11 @@
|
||||
import tempfile
|
||||
import os
|
||||
import yaml
|
||||
from .persisters.filesystem import FilesystemPersister
|
||||
|
||||
# Use the libYAML versions if possible
|
||||
try:
|
||||
from yaml import CLoader as Loader, CDumper as Dumper
|
||||
except ImportError:
|
||||
from yaml import Loader, Dumper
|
||||
|
||||
def _serialize_cassette(requests, responses):
|
||||
'''Return a serializable version of the cassette'''
|
||||
return ([{
|
||||
'request': request,
|
||||
'response': response,
|
||||
} for request, response in zip(requests, responses)])
|
||||
def load_cassette(cassette_path, serializer):
|
||||
with open(cassette_path) as f:
|
||||
return serializer.deserialize(f.read())
|
||||
|
||||
def _deserialize_cassette(data):
|
||||
requests = [r['request'] for r in data]
|
||||
responses = [r['response'] for r in data]
|
||||
return requests, responses
|
||||
|
||||
def _secure_write(path, contents):
|
||||
"""
|
||||
We'll overwrite the old version securely by writing out a temporary
|
||||
version and then moving it to replace the old version
|
||||
"""
|
||||
dirname, filename = os.path.split(path)
|
||||
fd, name = tempfile.mkstemp(dir=dirname, prefix=filename)
|
||||
with os.fdopen(fd, 'w') as fout:
|
||||
fout.write(contents)
|
||||
os.rename(name, path)
|
||||
|
||||
def load_cassette(cassette_path):
|
||||
data = yaml.load(open(cassette_path), Loader=Loader)
|
||||
return _deserialize_cassette(data)
|
||||
|
||||
def save_cassette(cassette_path, requests, responses):
|
||||
dirname, filename = os.path.split(cassette_path)
|
||||
if dirname and not os.path.exists(dirname):
|
||||
os.makedirs(dirname)
|
||||
data = _serialize_cassette(requests, responses)
|
||||
data = yaml.dump(data, Dumper=Dumper)
|
||||
_secure_write(cassette_path, data)
|
||||
def save_cassette(cassette_path, cassette_dict, serializer):
|
||||
data = serializer.serialize(cassette_dict)
|
||||
FilesystemPersister.write(cassette_path, data)
|
||||
|
||||
0
vcr/persisters/__init__.py
Normal file
0
vcr/persisters/__init__.py
Normal file
23
vcr/persisters/filesystem.py
Normal file
23
vcr/persisters/filesystem.py
Normal file
@@ -0,0 +1,23 @@
|
||||
import tempfile
|
||||
import os
|
||||
|
||||
|
||||
class FilesystemPersister(object):
|
||||
@classmethod
|
||||
def _secure_write(cls, path, contents):
|
||||
"""
|
||||
We'll overwrite the old version securely by writing out a temporary
|
||||
version and then moving it to replace the old version
|
||||
"""
|
||||
dirname, filename = os.path.split(path)
|
||||
fd, name = tempfile.mkstemp(dir=dirname, prefix=filename)
|
||||
with os.fdopen(fd, 'w') as fout:
|
||||
fout.write(contents)
|
||||
os.rename(name, path)
|
||||
|
||||
@classmethod
|
||||
def write(cls, cassette_path, data):
|
||||
dirname, filename = os.path.split(cassette_path)
|
||||
if dirname and not os.path.exists(dirname):
|
||||
os.makedirs(dirname)
|
||||
cls._secure_write(cassette_path, data)
|
||||
@@ -7,7 +7,7 @@ class Request(object):
|
||||
self.method = method
|
||||
self.path = path
|
||||
self.body = body
|
||||
# make haders a frozenset so it will be hashable
|
||||
# make headers a frozenset so it will be hashable
|
||||
self.headers = frozenset(headers.items())
|
||||
|
||||
@property
|
||||
@@ -15,7 +15,14 @@ class Request(object):
|
||||
return "{0}://{1}{2}".format(self.protocol, self.host, self.path)
|
||||
|
||||
def __key(self):
|
||||
return (self.host, self.port, self.method, self.path, self.body, self.headers)
|
||||
return (
|
||||
self.host,
|
||||
self.port,
|
||||
self.method,
|
||||
self.path,
|
||||
self.body,
|
||||
self.headers
|
||||
)
|
||||
|
||||
def __hash__(self):
|
||||
return hash(self.__key())
|
||||
@@ -28,3 +35,18 @@ class Request(object):
|
||||
|
||||
def __repr__(self):
|
||||
return self.__str__()
|
||||
|
||||
def _to_dict(self):
|
||||
return {
|
||||
'protocol': self.protocol,
|
||||
'host': self.host,
|
||||
'port': self.port,
|
||||
'method': self.method,
|
||||
'path': self.path,
|
||||
'body': self.body,
|
||||
'headers': self.headers,
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def _from_dict(cls, dct):
|
||||
return Request(**dct)
|
||||
|
||||
0
vcr/serializers/__init__.py
Normal file
0
vcr/serializers/__init__.py
Normal file
34
vcr/serializers/jsonserializer.py
Normal file
34
vcr/serializers/jsonserializer.py
Normal file
@@ -0,0 +1,34 @@
|
||||
from vcr.request import Request
|
||||
try:
|
||||
import simplejson as json
|
||||
except ImportError:
|
||||
import json
|
||||
|
||||
|
||||
def _json_default(obj):
|
||||
if isinstance(obj, frozenset):
|
||||
return dict(obj)
|
||||
return obj
|
||||
|
||||
|
||||
def _fix_response_unicode(d):
|
||||
d['body']['string'] = d['body']['string'].encode('utf-8')
|
||||
return d
|
||||
|
||||
|
||||
def deserialize(cassette_string):
|
||||
data = json.loads(cassette_string)
|
||||
requests = [Request._from_dict(r['request']) for r in data]
|
||||
responses = [_fix_response_unicode(r['response']) for r in data]
|
||||
return requests, responses
|
||||
|
||||
|
||||
def serialize(cassette_dict):
|
||||
data = ([{
|
||||
'request': request._to_dict(),
|
||||
'response': response,
|
||||
} for request, response in zip(
|
||||
cassette_dict['requests'],
|
||||
cassette_dict['responses']
|
||||
)])
|
||||
return json.dumps(data, indent=4, default=_json_default)
|
||||
25
vcr/serializers/yamlserializer.py
Normal file
25
vcr/serializers/yamlserializer.py
Normal file
@@ -0,0 +1,25 @@
|
||||
import yaml
|
||||
|
||||
# Use the libYAML versions if possible
|
||||
try:
|
||||
from yaml import CLoader as Loader, CDumper as Dumper
|
||||
except ImportError:
|
||||
from yaml import Loader, Dumper
|
||||
|
||||
|
||||
def deserialize(cassette_string):
|
||||
data = yaml.load(cassette_string, Loader=Loader)
|
||||
requests = [r['request'] for r in data]
|
||||
responses = [r['response'] for r in data]
|
||||
return requests, responses
|
||||
|
||||
|
||||
def serialize(cassette_dict):
|
||||
data = ([{
|
||||
'request': request,
|
||||
'response': response,
|
||||
} for request, response in zip(
|
||||
cassette_dict['requests'],
|
||||
cassette_dict['responses']
|
||||
)])
|
||||
return yaml.dump(data, Dumper=Dumper)
|
||||
@@ -18,7 +18,7 @@ class VCRHTTPResponse(object):
|
||||
self._content = StringIO(self.recorded_response['body']['string'])
|
||||
|
||||
# We are skipping the header parsing (they have already been parsed
|
||||
# at this point) and directly adding the headers to the header
|
||||
# at this point) and directly adding the headers to the header
|
||||
# container, so just pass an empty StringIO.
|
||||
self.msg = HTTPMessage(StringIO(''))
|
||||
|
||||
@@ -55,40 +55,125 @@ class VCRConnectionMixin:
|
||||
def request(self, method, url, body=None, headers=None):
|
||||
'''Persist the request metadata in self._vcr_request'''
|
||||
self._vcr_request = Request(
|
||||
protocol = self._protocol,
|
||||
host = self.host,
|
||||
port = self.port,
|
||||
method = method,
|
||||
path = url,
|
||||
body = body,
|
||||
headers = headers or {}
|
||||
protocol=self._protocol,
|
||||
host=self.host,
|
||||
port=self.port,
|
||||
method=method,
|
||||
path=url,
|
||||
body=body,
|
||||
headers=headers or {}
|
||||
)
|
||||
|
||||
# Check if we have a cassette set, and if we have a response saved.
|
||||
# If so, there's no need to keep processing and we can bail
|
||||
if self.cassette and self._vcr_request in self.cassette:
|
||||
return
|
||||
# Note: The request may not actually be finished at this point, so
|
||||
# I'm not sending the actual request until getresponse(). This
|
||||
# allows me to compare the entire length of the response to see if it
|
||||
# exists in the cassette.
|
||||
|
||||
# Otherwise, we should submit the request
|
||||
self._baseclass.request(
|
||||
self, method, url, body=body, headers=headers or {})
|
||||
def send(self, data):
|
||||
'''
|
||||
This method is called after request(), to add additional data to the
|
||||
body of the request. So if that happens, let's just append the data
|
||||
onto the most recent request in the cassette.
|
||||
'''
|
||||
self._vcr_request.body = (self._vcr_request.body or '') + data
|
||||
|
||||
def _send_request(self, method, url, body, headers):
|
||||
"""
|
||||
Coppy+pasted from python stdlib 2.6 source because it
|
||||
has a call to self.send() which I have overridden
|
||||
#stdlibproblems #fml
|
||||
"""
|
||||
header_names = dict.fromkeys([k.lower() for k in headers])
|
||||
skips = {}
|
||||
if 'host' in header_names:
|
||||
skips['skip_host'] = 1
|
||||
if 'accept-encoding' in header_names:
|
||||
skips['skip_accept_encoding'] = 1
|
||||
|
||||
self.putrequest(method, url, **skips)
|
||||
|
||||
if body and ('content-length' not in header_names):
|
||||
thelen = None
|
||||
try:
|
||||
thelen = str(len(body))
|
||||
except TypeError, te:
|
||||
# If this is a file-like object, try to
|
||||
# fstat its file descriptor
|
||||
import os
|
||||
try:
|
||||
thelen = str(os.fstat(body.fileno()).st_size)
|
||||
except (AttributeError, OSError):
|
||||
# Don't send a length if this failed
|
||||
if self.debuglevel > 0:
|
||||
print "Cannot stat!!"
|
||||
|
||||
if thelen is not None:
|
||||
self.putheader('Content-Length', thelen)
|
||||
for hdr, value in headers.iteritems():
|
||||
self.putheader(hdr, value)
|
||||
self.endheaders()
|
||||
|
||||
if body:
|
||||
self._baseclass.send(self, body)
|
||||
|
||||
def _send_output(self, message_body=None):
|
||||
"""
|
||||
Copy-and-pasted from httplib, just so I can modify the self.send()
|
||||
calls to call the superclass's send(), since I had to override the
|
||||
send() behavior, since send() is both an external and internal
|
||||
httplib API.
|
||||
"""
|
||||
self._buffer.extend(("", ""))
|
||||
msg = "\r\n".join(self._buffer)
|
||||
del self._buffer[:]
|
||||
# If msg and message_body are sent in a single send() call,
|
||||
# it will avoid performance problems caused by the interaction
|
||||
# between delayed ack and the Nagle algorithm.
|
||||
if isinstance(message_body, str):
|
||||
msg += message_body
|
||||
message_body = None
|
||||
self._baseclass.send(self, msg)
|
||||
if message_body is not None:
|
||||
#message_body was not a string (i.e. it is a file) and
|
||||
#we must run the risk of Nagle
|
||||
self._baseclass.send(self, message_body)
|
||||
|
||||
def getresponse(self, _=False):
|
||||
'''Retrieve a the response'''
|
||||
# Check to see if the cassette has a response for this request. If so,
|
||||
# then return it
|
||||
if self._vcr_request in self.cassette:
|
||||
if self._vcr_request in self.cassette and \
|
||||
self.cassette.record_mode != "all":
|
||||
response = self.cassette.response_of(self._vcr_request)
|
||||
# Alert the cassette to the fact that we've served another
|
||||
# response for the provided requests
|
||||
self.cassette.mark_played(self._vcr_request)
|
||||
return VCRHTTPResponse(response)
|
||||
else:
|
||||
# Otherwise, we made an actual request, and should return the response
|
||||
# we got from the actual connection
|
||||
response = HTTPConnection.getresponse(self)
|
||||
if self.cassette.write_protected:
|
||||
raise Exception("cassette is write protected")
|
||||
|
||||
# Otherwise, we should send the request, then get the response
|
||||
# and return it.
|
||||
|
||||
# make the request
|
||||
self._baseclass.request(
|
||||
self,
|
||||
method=self._vcr_request.method,
|
||||
url=self._vcr_request.url,
|
||||
body=self._vcr_request.body,
|
||||
headers=dict(self._vcr_request.headers or {})
|
||||
)
|
||||
|
||||
# get the response
|
||||
response = self._baseclass.getresponse(self)
|
||||
|
||||
# put the response into the cassette
|
||||
response = {
|
||||
'status': {'code': response.status, 'message': response.reason},
|
||||
'status': {
|
||||
'code': response.status,
|
||||
'message': response.reason
|
||||
},
|
||||
'headers': dict(response.getheaders()),
|
||||
'body': {'string': response.read()},
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user