mirror of
https://github.com/kevin1024/vcrpy.git
synced 2025-12-09 01:03:24 +00:00
Compare commits
24 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
07774ae6dd | ||
|
|
e1c7eb1ec5 | ||
|
|
7f958246e0 | ||
|
|
c8299103fb | ||
|
|
98603541d6 | ||
|
|
6b8d4643e8 | ||
|
|
b55834e929 | ||
|
|
7264780960 | ||
|
|
0f2695f240 | ||
|
|
65254b4969 | ||
|
|
6005420409 | ||
|
|
c5eca93edc | ||
|
|
b688dd362d | ||
|
|
28379e9000 | ||
|
|
b7af8bae71 | ||
|
|
7a4c11bf94 | ||
|
|
1478ce82fd | ||
|
|
9073cf137e | ||
|
|
53f5cd24d6 | ||
|
|
cf744dca00 | ||
|
|
51f0f1bacd | ||
|
|
3e247a2efb | ||
|
|
762b761d0c | ||
|
|
348cc8fdfe |
67
README.md
67
README.md
@@ -1,5 +1,7 @@
|
||||
#VCR.py
|
||||
|
||||

|
||||
|
||||
This is a Python version of [Ruby's VCR library](https://github.com/myronmarston/vcr).
|
||||
|
||||
[](http://travis-ci.org/kevin1024/vcrpy)
|
||||
@@ -38,6 +40,33 @@ pass, even if you are offline, or iana.org goes down for maintenance) and
|
||||
accurate (the response will contain the same headers and body you get from a
|
||||
real request).
|
||||
|
||||
## Configuration
|
||||
|
||||
If you don't like VCR's defaults, you can set options by instantiating a
|
||||
VCR class and setting the options on it.
|
||||
|
||||
```python
|
||||
|
||||
import vcr
|
||||
|
||||
my_vcr = vcr.VCR(
|
||||
serializer = 'json',
|
||||
cassette_library_dir = 'fixtures/cassettes',
|
||||
)
|
||||
|
||||
with my_vcr.use_cassette('test.json'):
|
||||
# your http code here
|
||||
```
|
||||
|
||||
Otherwise, you can override options each time you use a cassette.
|
||||
|
||||
```python
|
||||
with vcr.use_cassette('test.yml', serializer='json'):
|
||||
# your http code here
|
||||
```
|
||||
|
||||
Note: Per-cassette overrides take precedence over the global config.
|
||||
|
||||
## Advanced Features
|
||||
|
||||
If you want, VCR.py can return information about the cassette it is
|
||||
@@ -82,7 +111,40 @@ The Request object has the following properties
|
||||
* `protocol`: The protocol used to make the request (http or https)
|
||||
* `body`: The body of the request, usually empty except for POST / PUT / etc
|
||||
|
||||
## Register your own serializer
|
||||
|
||||
Don't like JSON or YAML? That's OK, VCR.py can serialize to any format
|
||||
you would like. Create your own module or class instance with 2 methods:
|
||||
|
||||
* `def deserialize(cassette_string)`
|
||||
* `def serialize(cassette_dict)`
|
||||
|
||||
Finally, register your class with VCR to use your
|
||||
new serializer.
|
||||
|
||||
```
|
||||
import vcr
|
||||
|
||||
BogoSerializer(object):
|
||||
"""
|
||||
Must implement serialize() and deserialize() methods
|
||||
"""
|
||||
pass
|
||||
|
||||
my_vcr = vcr.VCR()
|
||||
my_vcr.register_serializer('bogo', BogoSerializer())
|
||||
|
||||
with my_vcr.use_cassette('test.bogo', serializer='bogo'):
|
||||
# your http here
|
||||
|
||||
# After you register, you can set the default serializer to your new serializer
|
||||
|
||||
my_vcr.serializer = 'bogo'
|
||||
|
||||
with my_vcr.use_cassette('test.bogo'):
|
||||
# your http here
|
||||
|
||||
```
|
||||
|
||||
##Installation
|
||||
|
||||
@@ -97,6 +159,11 @@ This library is a work in progress, so the API might change on you.
|
||||
There are probably some [bugs](https://github.com/kevin1024/vcrpy/issues?labels=bug&page=1&state=open) floating around too.
|
||||
|
||||
##Changelog
|
||||
* 0.2.1: Fixed missing modules in setup.py
|
||||
* 0.2.0: Added configuration API, which lets you configure some settings
|
||||
on VCR (see the README). Also, VCR no longer saves cassettes if they
|
||||
haven't changed at all and supports JSON as well as YAML
|
||||
(thanks @sirpengi). Added amazing new skeumorphic logo, thanks @hairarrow.
|
||||
* 0.1.0: *backwards incompatible release - delete your old cassette files*:
|
||||
This release adds the ability to access the cassette to make assertions
|
||||
on it, as well as a major code refactor thanks to @dlecocq. It also
|
||||
|
||||
15
setup.py
15
setup.py
@@ -19,17 +19,24 @@ class PyTest(TestCommand):
|
||||
sys.exit(errno)
|
||||
|
||||
setup(name='vcrpy',
|
||||
version='0.1.0',
|
||||
version='0.2.1',
|
||||
description="A Python port of Ruby's VCR to make mocking HTTP easier",
|
||||
author='Kevin McCarthy',
|
||||
author_email='me@kevinmccarthy.org',
|
||||
url='https://github.com/kevin1024/vcrpy',
|
||||
packages=[
|
||||
packages = [
|
||||
'vcr',
|
||||
'vcr.stubs'],
|
||||
'vcr.stubs',
|
||||
'vcr.compat',
|
||||
'vcr.persisters',
|
||||
'vcr.serializers',
|
||||
],
|
||||
package_dir={
|
||||
'vcr': 'vcr',
|
||||
'vcr.stubs': 'vcr/stubs'},
|
||||
'vcr.stubs': 'vcr/stubs',
|
||||
'vcr.compat': 'vcr/compat',
|
||||
'vcr.persisters': 'vcr/persisters',
|
||||
},
|
||||
install_requires=['PyYAML'],
|
||||
license='MIT',
|
||||
tests_require=['pytest'],
|
||||
|
||||
@@ -2,6 +2,7 @@ def assert_cassette_empty(cass):
|
||||
assert len(cass) == 0
|
||||
assert cass.play_count == 0
|
||||
|
||||
|
||||
def assert_cassette_has_one_response(cass):
|
||||
assert len(cass) == 1
|
||||
assert cass.play_count == 1
|
||||
|
||||
@@ -1,18 +0,0 @@
|
||||
# coding=utf-8
|
||||
|
||||
import os
|
||||
import shutil
|
||||
import unittest
|
||||
|
||||
|
||||
class TestVCR(unittest.TestCase):
|
||||
fixtures = os.path.join('does', 'not', 'exist')
|
||||
|
||||
def tearDown(self):
|
||||
# Remove the fixtures if they exist
|
||||
if os.path.exists(self.fixtures):
|
||||
shutil.rmtree(self.fixtures)
|
||||
|
||||
def fixture(self, *names):
|
||||
'''Return a path to the provided fixture'''
|
||||
return os.path.join(self.fixtures, *names)
|
||||
@@ -8,17 +8,19 @@ import urllib2
|
||||
# Internal imports
|
||||
import vcr
|
||||
|
||||
|
||||
def test_nonexistent_directory(tmpdir):
|
||||
'''If we load a cassette in a nonexistent directory, it can save ok'''
|
||||
# Check to make sure directory doesnt exist
|
||||
assert not os.path.exists(str(tmpdir.join('nonexistent')))
|
||||
|
||||
# Run VCR to create dir and cassette file
|
||||
with vcr.use_cassette(str(tmpdir.join('nonexistent','cassette.yml'))):
|
||||
with vcr.use_cassette(str(tmpdir.join('nonexistent', 'cassette.yml'))):
|
||||
urllib2.urlopen('http://httpbin.org/').read()
|
||||
|
||||
# This should have made the file and the directory
|
||||
assert os.path.exists(str(tmpdir.join('nonexistent','cassette.yml')))
|
||||
assert os.path.exists(str(tmpdir.join('nonexistent', 'cassette.yml')))
|
||||
|
||||
|
||||
def test_unpatch(tmpdir):
|
||||
'''Ensure that our cassette gets unpatched when we're done'''
|
||||
@@ -30,10 +32,70 @@ def test_unpatch(tmpdir):
|
||||
urllib2.urlopen('http://httpbin.org/').read()
|
||||
assert cass.play_count == 0
|
||||
|
||||
|
||||
def test_basic_use(tmpdir):
|
||||
'''
|
||||
Copied from the docs
|
||||
Copied from the docs
|
||||
'''
|
||||
with vcr.use_cassette('fixtures/vcr_cassettes/synopsis.yaml'):
|
||||
response = urllib2.urlopen('http://www.iana.org/domains/reserved').read()
|
||||
response = urllib2.urlopen(
|
||||
'http://www.iana.org/domains/reserved'
|
||||
).read()
|
||||
assert 'Example domains' in response
|
||||
|
||||
|
||||
def test_basic_json_use(tmpdir):
|
||||
'''
|
||||
Ensure you can load a json serialized cassette
|
||||
'''
|
||||
test_fixture = 'fixtures/vcr_cassettes/synopsis.json'
|
||||
with vcr.use_cassette(test_fixture, serializer='json'):
|
||||
response = urllib2.urlopen('http://httpbin.org/').read()
|
||||
assert 'difficult sometimes' in response
|
||||
|
||||
|
||||
def test_patched_content(tmpdir):
|
||||
'''
|
||||
Ensure that what you pull from a cassette is what came from the
|
||||
request
|
||||
'''
|
||||
with vcr.use_cassette(str(tmpdir.join('synopsis.yaml'))) as cass:
|
||||
response = urllib2.urlopen('http://httpbin.org/').read()
|
||||
assert cass.play_count == 0
|
||||
|
||||
with vcr.use_cassette(str(tmpdir.join('synopsis.yaml'))) as cass:
|
||||
response2 = urllib2.urlopen('http://httpbin.org/').read()
|
||||
assert cass.play_count == 1
|
||||
cass._save(force=True)
|
||||
|
||||
with vcr.use_cassette(str(tmpdir.join('synopsis.yaml'))) as cass:
|
||||
response3 = urllib2.urlopen('http://httpbin.org/').read()
|
||||
assert cass.play_count == 1
|
||||
|
||||
assert response == response2
|
||||
assert response2 == response3
|
||||
|
||||
|
||||
def test_patched_content_json(tmpdir):
|
||||
'''
|
||||
Ensure that what you pull from a json cassette is what came from the
|
||||
request
|
||||
'''
|
||||
|
||||
testfile = str(tmpdir.join('synopsis.json'))
|
||||
|
||||
with vcr.use_cassette(testfile) as cass:
|
||||
response = urllib2.urlopen('http://httpbin.org/').read()
|
||||
assert cass.play_count == 0
|
||||
|
||||
with vcr.use_cassette(testfile) as cass:
|
||||
response2 = urllib2.urlopen('http://httpbin.org/').read()
|
||||
assert cass.play_count == 1
|
||||
cass._save(force=True)
|
||||
|
||||
with vcr.use_cassette(testfile) as cass:
|
||||
response3 = urllib2.urlopen('http://httpbin.org/').read()
|
||||
assert cass.play_count == 1
|
||||
|
||||
assert response == response2
|
||||
assert response2 == response3
|
||||
|
||||
36
tests/integration/test_config.py
Normal file
36
tests/integration/test_config.py
Normal file
@@ -0,0 +1,36 @@
|
||||
import os
|
||||
import json
|
||||
import urllib2
|
||||
import vcr
|
||||
|
||||
|
||||
def test_set_serializer_default_config(tmpdir):
|
||||
my_vcr = vcr.VCR(serializer='json')
|
||||
|
||||
with my_vcr.use_cassette(str(tmpdir.join('test.json'))):
|
||||
assert my_vcr.serializer == 'json'
|
||||
urllib2.urlopen('http://httpbin.org/get')
|
||||
|
||||
with open(str(tmpdir.join('test.json'))) as f:
|
||||
assert json.loads(f.read())
|
||||
|
||||
|
||||
def test_default_set_cassette_library_dir(tmpdir):
|
||||
my_vcr = vcr.VCR(cassette_library_dir=str(tmpdir.join('subdir')))
|
||||
|
||||
with my_vcr.use_cassette('test.json'):
|
||||
urllib2.urlopen('http://httpbin.org/get')
|
||||
|
||||
assert os.path.exists(str(tmpdir.join('subdir').join('test.json')))
|
||||
|
||||
|
||||
def test_override_set_cassette_library_dir(tmpdir):
|
||||
my_vcr = vcr.VCR(cassette_library_dir=str(tmpdir.join('subdir')))
|
||||
|
||||
cld = str(tmpdir.join('subdir2'))
|
||||
|
||||
with my_vcr.use_cassette('test.json', cassette_library_dir=cld):
|
||||
urllib2.urlopen('http://httpbin.org/get')
|
||||
|
||||
assert os.path.exists(str(tmpdir.join('subdir2').join('test.json')))
|
||||
assert not os.path.exists(str(tmpdir.join('subdir').join('test.json')))
|
||||
55
tests/integration/test_disksaver.py
Normal file
55
tests/integration/test_disksaver.py
Normal file
@@ -0,0 +1,55 @@
|
||||
'''Basic tests about save behavior'''
|
||||
# coding=utf-8
|
||||
|
||||
# External imports
|
||||
import os
|
||||
import urllib2
|
||||
import time
|
||||
|
||||
# Internal imports
|
||||
import vcr
|
||||
|
||||
|
||||
def test_disk_saver_nowrite(tmpdir):
|
||||
'''
|
||||
Ensure that when you close a cassette without changing it it doesn't
|
||||
rewrite the file
|
||||
'''
|
||||
fname = str(tmpdir.join('synopsis.yaml'))
|
||||
with vcr.use_cassette(fname) as cass:
|
||||
urllib2.urlopen('http://www.iana.org/domains/reserved').read()
|
||||
assert cass.play_count == 0
|
||||
last_mod = os.path.getmtime(fname)
|
||||
|
||||
with vcr.use_cassette(fname) as cass:
|
||||
urllib2.urlopen('http://www.iana.org/domains/reserved').read()
|
||||
assert cass.play_count == 1
|
||||
assert cass.dirty is False
|
||||
last_mod2 = os.path.getmtime(fname)
|
||||
|
||||
assert last_mod == last_mod2
|
||||
|
||||
|
||||
def test_disk_saver_write(tmpdir):
|
||||
'''
|
||||
Ensure that when you close a cassette after changing it it does
|
||||
rewrite the file
|
||||
'''
|
||||
fname = str(tmpdir.join('synopsis.yaml'))
|
||||
with vcr.use_cassette(fname) as cass:
|
||||
urllib2.urlopen('http://www.iana.org/domains/reserved').read()
|
||||
assert cass.play_count == 0
|
||||
last_mod = os.path.getmtime(fname)
|
||||
|
||||
# Make sure at least 1 second passes, otherwise sometimes
|
||||
# the mtime doesn't change
|
||||
time.sleep(1)
|
||||
|
||||
with vcr.use_cassette(fname) as cass:
|
||||
urllib2.urlopen('http://www.iana.org/domains/reserved').read()
|
||||
urllib2.urlopen('http://httpbin.org/').read()
|
||||
assert cass.play_count == 1
|
||||
assert cass.dirty
|
||||
last_mod2 = os.path.getmtime(fname)
|
||||
|
||||
assert last_mod != last_mod2
|
||||
35
tests/integration/test_register_serializer.py
Normal file
35
tests/integration/test_register_serializer.py
Normal file
@@ -0,0 +1,35 @@
|
||||
import urllib2
|
||||
import vcr
|
||||
|
||||
|
||||
class MockSerializer(object):
|
||||
def __init__(self):
|
||||
self.serialize_count = 0
|
||||
self.deserialize_count = 0
|
||||
self.load_args = None
|
||||
|
||||
def deserialize(self, cassette_string):
|
||||
self.serialize_count += 1
|
||||
self.cassette_string = cassette_string
|
||||
return ([], [])
|
||||
|
||||
def serialize(self, cassette_dict):
|
||||
self.deserialize_count += 1
|
||||
return ""
|
||||
|
||||
|
||||
def test_registered_serializer(tmpdir):
|
||||
ms = MockSerializer()
|
||||
my_vcr = vcr.VCR()
|
||||
my_vcr.register_serializer('mock', ms)
|
||||
tmpdir.join('test.mock').write('test_data')
|
||||
with my_vcr.use_cassette(str(tmpdir.join('test.mock')), serializer='mock'):
|
||||
urllib2.urlopen('http://httpbin.org/')
|
||||
# Serializer deserialized once
|
||||
assert ms.serialize_count == 1
|
||||
# and serialized the test data string
|
||||
assert ms.cassette_string == 'test_data'
|
||||
# and hasn't serialized yet
|
||||
assert ms.deserialize_count == 0
|
||||
|
||||
assert ms.serialize_count == 1
|
||||
@@ -1,12 +1,11 @@
|
||||
import urllib2
|
||||
import vcr
|
||||
|
||||
|
||||
def test_recorded_request_url_with_redirected_request(tmpdir):
|
||||
with vcr.use_cassette(str(tmpdir.join('test.yml'))) as cass:
|
||||
assert len(cass) == 0
|
||||
urllib2.urlopen('http://google.com')
|
||||
print cass.requests
|
||||
print cass.requests[0]
|
||||
assert cass.requests[0].url == 'http://google.com'
|
||||
assert cass.requests[1].url == 'http://www.google.com/'
|
||||
assert len(cass) == 2
|
||||
urllib2.urlopen('http://httpbin.org/redirect/3')
|
||||
assert cass.requests[0].url == 'http://httpbin.org/redirect/3'
|
||||
assert cass.requests[3].url == 'http://httpbin.org/get'
|
||||
assert len(cass) == 4
|
||||
|
||||
@@ -2,18 +2,14 @@
|
||||
|
||||
# coding=utf-8
|
||||
|
||||
# Internal imports
|
||||
|
||||
import os
|
||||
import pytest
|
||||
|
||||
import vcr
|
||||
|
||||
from assertions import assert_cassette_empty, assert_cassette_has_one_response
|
||||
|
||||
requests = pytest.importorskip("requests")
|
||||
|
||||
@pytest.fixture(params=["https","http"])
|
||||
|
||||
@pytest.fixture(params=["https", "http"])
|
||||
def scheme(request):
|
||||
"""
|
||||
Fixture that returns both http and https
|
||||
@@ -31,6 +27,7 @@ def test_status_code(scheme, tmpdir):
|
||||
# Ensure that we've now cached a single response
|
||||
assert_cassette_has_one_response(cass)
|
||||
|
||||
|
||||
def test_headers(scheme, tmpdir):
|
||||
'''Ensure that we can read the headers back'''
|
||||
url = scheme + '://httpbin.org/'
|
||||
@@ -41,6 +38,7 @@ def test_headers(scheme, tmpdir):
|
||||
# Ensure that we've now cached a single response
|
||||
assert_cassette_has_one_response(cass)
|
||||
|
||||
|
||||
def test_body(tmpdir, scheme):
|
||||
'''Ensure the responses are all identical enough'''
|
||||
url = scheme + '://httpbin.org/bytes/1024'
|
||||
@@ -51,6 +49,7 @@ def test_body(tmpdir, scheme):
|
||||
# Ensure that we've now cached a single response
|
||||
assert_cassette_has_one_response(cass)
|
||||
|
||||
|
||||
def test_auth(tmpdir, scheme):
|
||||
'''Ensure that we can handle basic auth'''
|
||||
auth = ('user', 'passwd')
|
||||
@@ -65,6 +64,7 @@ def test_auth(tmpdir, scheme):
|
||||
# Ensure that we've now cached a single response
|
||||
assert_cassette_has_one_response(cass)
|
||||
|
||||
|
||||
def test_auth_failed(tmpdir, scheme):
|
||||
'''Ensure that we can save failed auth statuses'''
|
||||
auth = ('user', 'wrongwrongwrong')
|
||||
@@ -79,6 +79,7 @@ def test_auth_failed(tmpdir, scheme):
|
||||
# Ensure that we've now cached a single response
|
||||
assert_cassette_has_one_response(cass)
|
||||
|
||||
|
||||
def test_post(tmpdir, scheme):
|
||||
'''Ensure that we can post and cache the results'''
|
||||
data = {'key1': 'value1', 'key2': 'value2'}
|
||||
@@ -86,10 +87,13 @@ def test_post(tmpdir, scheme):
|
||||
with vcr.use_cassette(str(tmpdir.join('redirect.yaml'))) as cass:
|
||||
# Ensure that this is empty to begin with
|
||||
assert_cassette_empty(cass)
|
||||
assert requests.post(url, data).content == requests.post(url, data).content
|
||||
req1 = requests.post(url, data).content
|
||||
req2 = requests.post(url, data).content
|
||||
assert req1 == req2
|
||||
# Ensure that we've now cached a single response
|
||||
assert_cassette_has_one_response(cass)
|
||||
|
||||
|
||||
def test_redirects(tmpdir, scheme):
|
||||
'''Ensure that we can handle redirects'''
|
||||
url = scheme + '://httpbin.org/redirect-to?url=bytes/1024'
|
||||
@@ -102,6 +106,7 @@ def test_redirects(tmpdir, scheme):
|
||||
assert len(cass) == 2
|
||||
assert cass.play_count == 2
|
||||
|
||||
|
||||
def test_cross_scheme(tmpdir, scheme):
|
||||
'''Ensure that requests between schemes are treated separately'''
|
||||
# First fetch a url under http, and then again under https and then
|
||||
@@ -112,4 +117,3 @@ def test_cross_scheme(tmpdir, scheme):
|
||||
requests.get('http://httpbin.org/')
|
||||
assert cass.play_count == 0
|
||||
assert len(cass) == 2
|
||||
|
||||
|
||||
@@ -12,13 +12,15 @@ import vcr
|
||||
|
||||
from assertions import assert_cassette_empty, assert_cassette_has_one_response
|
||||
|
||||
@pytest.fixture(params=["https","http"])
|
||||
|
||||
@pytest.fixture(params=["https", "http"])
|
||||
def scheme(request):
|
||||
"""
|
||||
Fixture that returns both http and https
|
||||
"""
|
||||
return request.param
|
||||
|
||||
|
||||
def test_response_code(scheme, tmpdir):
|
||||
'''Ensure we can read a response code from a fetch'''
|
||||
url = scheme + '://httpbin.org/'
|
||||
@@ -29,6 +31,7 @@ def test_response_code(scheme, tmpdir):
|
||||
# Ensure that we've now cached a single response
|
||||
assert_cassette_has_one_response(cass)
|
||||
|
||||
|
||||
def test_random_body(scheme, tmpdir):
|
||||
'''Ensure we can read the content, and that it's served from cache'''
|
||||
url = scheme + '://httpbin.org/bytes/1024'
|
||||
@@ -39,16 +42,20 @@ def test_random_body(scheme, tmpdir):
|
||||
# Ensure that we've now cached a single response
|
||||
assert_cassette_has_one_response(cass)
|
||||
|
||||
|
||||
def test_response_headers(scheme, tmpdir):
|
||||
'''Ensure we can get information from the response'''
|
||||
url = scheme + '://httpbin.org/'
|
||||
with vcr.use_cassette(str(tmpdir.join('headers.yaml'))) as cass:
|
||||
# Ensure that this is empty to begin with
|
||||
assert_cassette_empty(cass)
|
||||
assert urllib2.urlopen(url).info().items() == urllib2.urlopen(url).info().items()
|
||||
open1 = urllib2.urlopen(url).info().items()
|
||||
open2 = urllib2.urlopen(url).info().items()
|
||||
assert open1 == open2
|
||||
# Ensure that we've now cached a single response
|
||||
assert_cassette_has_one_response(cass)
|
||||
|
||||
|
||||
def test_multiple_requests(scheme, tmpdir):
|
||||
'''Ensure that we can cache multiple requests'''
|
||||
urls = [
|
||||
@@ -65,6 +72,7 @@ def test_multiple_requests(scheme, tmpdir):
|
||||
assert len(cass) == index + 1
|
||||
assert cass.play_count == index + 1
|
||||
|
||||
|
||||
def test_get_data(scheme, tmpdir):
|
||||
'''Ensure that it works with query data'''
|
||||
data = urlencode({'some': 1, 'data': 'here'})
|
||||
@@ -72,13 +80,14 @@ def test_get_data(scheme, tmpdir):
|
||||
with vcr.use_cassette(str(tmpdir.join('get_data.yaml'))) as cass:
|
||||
# Ensure that this is empty to begin with
|
||||
assert_cassette_empty(cass)
|
||||
res1 = urllib2.urlopen(url).read()
|
||||
res1 = urllib2.urlopen(url).read()
|
||||
res2 = urllib2.urlopen(url).read()
|
||||
assert res1 == res2
|
||||
# Ensure that we've now cached a single response
|
||||
assert len(cass) == 1
|
||||
assert cass.play_count == 1
|
||||
|
||||
|
||||
def test_post_data(scheme, tmpdir):
|
||||
'''Ensure that it works when posting data'''
|
||||
data = urlencode({'some': 1, 'data': 'here'})
|
||||
@@ -86,12 +95,13 @@ def test_post_data(scheme, tmpdir):
|
||||
with vcr.use_cassette(str(tmpdir.join('post_data.yaml'))) as cass:
|
||||
# Ensure that this is empty to begin with
|
||||
assert_cassette_empty(cass)
|
||||
res1 = urllib2.urlopen(url, data).read()
|
||||
res1 = urllib2.urlopen(url, data).read()
|
||||
res2 = urllib2.urlopen(url, data).read()
|
||||
assert res1 == res2
|
||||
# Ensure that we've now cached a single response
|
||||
assert_cassette_has_one_response(cass)
|
||||
|
||||
|
||||
def test_post_unicode_data(scheme, tmpdir):
|
||||
'''Ensure that it works when posting unicode data'''
|
||||
data = urlencode({'snowman': u'☃'.encode('utf-8')})
|
||||
@@ -99,12 +109,13 @@ def test_post_unicode_data(scheme, tmpdir):
|
||||
with vcr.use_cassette(str(tmpdir.join('post_data.yaml'))) as cass:
|
||||
# Ensure that this is empty to begin with
|
||||
assert_cassette_empty(cass)
|
||||
res1 = urllib2.urlopen(url, data).read()
|
||||
res1 = urllib2.urlopen(url, data).read()
|
||||
res2 = urllib2.urlopen(url, data).read()
|
||||
assert res1 == res2
|
||||
# Ensure that we've now cached a single response
|
||||
assert_cassette_has_one_response(cass)
|
||||
|
||||
|
||||
def test_cross_scheme(tmpdir):
|
||||
'''Ensure that requests between schemes are treated separately'''
|
||||
# First fetch a url under https, and then again under https and then
|
||||
|
||||
@@ -3,6 +3,7 @@ requests = pytest.importorskip("requests")
|
||||
|
||||
import vcr
|
||||
|
||||
|
||||
def test_domain_redirect():
|
||||
'''Ensure that redirects across domains are considered unique'''
|
||||
# In this example, seomoz.org redirects to moz.com, and if those
|
||||
@@ -14,4 +15,3 @@ def test_domain_redirect():
|
||||
# Ensure that we've now served two responses. One for the original
|
||||
# redirect, and a second for the actual fetch
|
||||
assert len(cass) == 2
|
||||
|
||||
|
||||
@@ -2,24 +2,28 @@ import pytest
|
||||
import yaml
|
||||
from vcr.cassette import Cassette
|
||||
|
||||
|
||||
def test_cassette_load(tmpdir):
|
||||
a_file = tmpdir.join('test_cassette.yml')
|
||||
a_file.write(yaml.dump([
|
||||
{'request':'foo', 'response':'bar'}
|
||||
{'request': 'foo', 'response': 'bar'}
|
||||
]))
|
||||
a_cassette = Cassette.load(str(a_file))
|
||||
assert len(a_cassette) == 1
|
||||
|
||||
|
||||
def test_cassette_not_played():
|
||||
a = Cassette('test')
|
||||
assert not a.play_count
|
||||
|
||||
|
||||
def test_cassette_played():
|
||||
a = Cassette('test')
|
||||
a.mark_played('foo')
|
||||
a.mark_played('foo')
|
||||
assert a.play_count == 2
|
||||
|
||||
|
||||
def test_cassette_play_counter():
|
||||
a = Cassette('test')
|
||||
a.mark_played('foo')
|
||||
@@ -27,28 +31,33 @@ def test_cassette_play_counter():
|
||||
assert a.play_counts['foo'] == 1
|
||||
assert a.play_counts['bar'] == 1
|
||||
|
||||
|
||||
def test_cassette_append():
|
||||
a = Cassette('test')
|
||||
a.append('foo', 'bar')
|
||||
assert a.requests == ['foo']
|
||||
assert a.responses == ['bar']
|
||||
|
||||
|
||||
def test_cassette_len():
|
||||
a = Cassette('test')
|
||||
a.append('foo','bar')
|
||||
a.append('foo2','bar2')
|
||||
a.append('foo', 'bar')
|
||||
a.append('foo2', 'bar2')
|
||||
assert len(a) == 2
|
||||
|
||||
|
||||
def test_cassette_contains():
|
||||
a = Cassette('test')
|
||||
a.append('foo','bar')
|
||||
a.append('foo', 'bar')
|
||||
assert 'foo' in a
|
||||
|
||||
|
||||
def test_cassette_response_of():
|
||||
a = Cassette('test')
|
||||
a.append('foo','bar')
|
||||
a.append('foo', 'bar')
|
||||
assert a.response_of('foo') == 'bar'
|
||||
|
||||
|
||||
def test_cassette_get_missing_response():
|
||||
a = Cassette('test')
|
||||
with pytest.raises(KeyError):
|
||||
|
||||
@@ -1,9 +1,11 @@
|
||||
from vcr.request import Request
|
||||
|
||||
|
||||
def test_url():
|
||||
req = Request('http','www.google.com',80,'GET','/','',{})
|
||||
req = Request('http', 'www.google.com', 80, 'GET', '/', '', {})
|
||||
assert req.url == 'http://www.google.com/'
|
||||
|
||||
|
||||
def test_str():
|
||||
req = Request('http','www.google.com',80,'GET','/','',{})
|
||||
req = Request('http', 'www.google.com', 80, 'GET', '/', '', {})
|
||||
str(req) == '<Request (GET) http://www.google.com>'
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
# Import Cassette to make it available at the top level
|
||||
from .cassette import Cassette
|
||||
from config import VCR
|
||||
|
||||
default_vcr = VCR()
|
||||
|
||||
|
||||
# Also, make a 'load' function available
|
||||
def use_cassette(path):
|
||||
return Cassette.load(path)
|
||||
def use_cassette(path, **kwargs):
|
||||
return default_vcr.use_cassette(path, **kwargs)
|
||||
|
||||
@@ -1,7 +1,5 @@
|
||||
'''The container for recorded requests and responses'''
|
||||
|
||||
import os
|
||||
import tempfile
|
||||
try:
|
||||
from collections import Counter, OrderedDict
|
||||
except ImportError:
|
||||
@@ -11,25 +9,24 @@ except ImportError:
|
||||
# Internal imports
|
||||
from .patch import install, reset
|
||||
from .persist import load_cassette, save_cassette
|
||||
from .serializers import yamlserializer
|
||||
|
||||
|
||||
class Cassette(object):
|
||||
'''A container for recorded requests and responses'''
|
||||
@classmethod
|
||||
def load(cls, path):
|
||||
def load(cls, path, **kwargs):
|
||||
'''Load in the cassette stored at the provided path'''
|
||||
new_cassette = cls(path)
|
||||
try:
|
||||
requests, responses = load_cassette(path)
|
||||
for request, response in zip(requests, responses):
|
||||
new_cassette.append(request, response)
|
||||
except IOError:
|
||||
pass
|
||||
new_cassette = cls(path, **kwargs)
|
||||
new_cassette._load()
|
||||
return new_cassette
|
||||
|
||||
def __init__(self, path):
|
||||
def __init__(self, path, serializer=yamlserializer):
|
||||
self._path = path
|
||||
self._serializer = serializer
|
||||
self.data = OrderedDict()
|
||||
self.play_counts = Counter()
|
||||
self.dirty = False
|
||||
|
||||
@property
|
||||
def play_count(self):
|
||||
@@ -52,16 +49,40 @@ class Cassette(object):
|
||||
def append(self, request, response):
|
||||
'''Add a request, response pair to this cassette'''
|
||||
self.data[request] = response
|
||||
self.dirty = True
|
||||
|
||||
def response_of(self, request):
|
||||
'''Find the response corresponding to a request'''
|
||||
return self.data[request]
|
||||
|
||||
def _save(self):
|
||||
save_cassette(self._path, self.requests, self.responses)
|
||||
def _as_dict(self):
|
||||
return {"requests": self.requests, "responses": self.responses}
|
||||
|
||||
def _save(self, force=False):
|
||||
if force or self.dirty:
|
||||
save_cassette(
|
||||
self._path,
|
||||
self._as_dict(),
|
||||
serializer=self._serializer
|
||||
)
|
||||
self.dirty = False
|
||||
|
||||
def _load(self):
|
||||
try:
|
||||
requests, responses = load_cassette(
|
||||
self._path,
|
||||
serializer=self._serializer
|
||||
)
|
||||
for request, response in zip(requests, responses):
|
||||
self.append(request, response)
|
||||
self.dirty = False
|
||||
except IOError:
|
||||
pass
|
||||
|
||||
def __str__(self):
|
||||
return "<Cassette containing {0} recorded response(s)>".format(len(self))
|
||||
return "<Cassette containing {0} recorded response(s)>".format(
|
||||
len(self)
|
||||
)
|
||||
|
||||
def __len__(self):
|
||||
'''Return the number of request,response pairs stored in here'''
|
||||
|
||||
42
vcr/config.py
Normal file
42
vcr/config.py
Normal file
@@ -0,0 +1,42 @@
|
||||
import os
|
||||
from .cassette import Cassette
|
||||
from .serializers import yamlserializer, jsonserializer
|
||||
|
||||
|
||||
class VCR(object):
|
||||
def __init__(self, serializer='yaml', cassette_library_dir=None):
|
||||
self.serializer = serializer
|
||||
self.cassette_library_dir = cassette_library_dir
|
||||
self.serializers = {
|
||||
'yaml': yamlserializer,
|
||||
'json': jsonserializer,
|
||||
}
|
||||
|
||||
def _get_serializer(self, serializer_name):
|
||||
try:
|
||||
serializer = self.serializers[serializer_name]
|
||||
except KeyError:
|
||||
print "Serializer {0} doesn't exist or isn't registered".format(
|
||||
serializer_name
|
||||
)
|
||||
raise KeyError
|
||||
return serializer
|
||||
|
||||
def use_cassette(self, path, **kwargs):
|
||||
serializer_name = kwargs.get('serializer', self.serializer)
|
||||
cassette_library_dir = kwargs.get(
|
||||
'cassette_library_dir',
|
||||
self.cassette_library_dir
|
||||
)
|
||||
|
||||
if cassette_library_dir:
|
||||
path = os.path.join(cassette_library_dir, path)
|
||||
|
||||
merged_config = {
|
||||
"serializer": self._get_serializer(serializer_name),
|
||||
}
|
||||
|
||||
return Cassette.load(path, **merged_config)
|
||||
|
||||
def register_serializer(self, name, serializer):
|
||||
self.serializers[name] = serializer
|
||||
@@ -59,7 +59,7 @@ def reset():
|
||||
'''Undo all the patching'''
|
||||
httplib.HTTPConnection = httplib.HTTP._connection_class = _HTTPConnection
|
||||
httplib.HTTPSConnection = httplib.HTTPS._connection_class = \
|
||||
_HTTPSConnection
|
||||
_HTTPSConnection
|
||||
try:
|
||||
import requests.packages.urllib3.connectionpool as cpool
|
||||
cpool.VerifiedHTTPSConnection = _VerifiedHTTPSConnection
|
||||
|
||||
@@ -1,44 +1,11 @@
|
||||
import tempfile
|
||||
import os
|
||||
import yaml
|
||||
from .persisters.filesystem import FilesystemPersister
|
||||
|
||||
# Use the libYAML versions if possible
|
||||
try:
|
||||
from yaml import CLoader as Loader, CDumper as Dumper
|
||||
except ImportError:
|
||||
from yaml import Loader, Dumper
|
||||
|
||||
def _serialize_cassette(requests, responses):
|
||||
'''Return a serializable version of the cassette'''
|
||||
return ([{
|
||||
'request': request,
|
||||
'response': response,
|
||||
} for request, response in zip(requests, responses)])
|
||||
def load_cassette(cassette_path, serializer):
|
||||
with open(cassette_path) as f:
|
||||
return serializer.deserialize(f.read())
|
||||
|
||||
def _deserialize_cassette(data):
|
||||
requests = [r['request'] for r in data]
|
||||
responses = [r['response'] for r in data]
|
||||
return requests, responses
|
||||
|
||||
def _secure_write(path, contents):
|
||||
"""
|
||||
We'll overwrite the old version securely by writing out a temporary
|
||||
version and then moving it to replace the old version
|
||||
"""
|
||||
dirname, filename = os.path.split(path)
|
||||
fd, name = tempfile.mkstemp(dir=dirname, prefix=filename)
|
||||
with os.fdopen(fd, 'w') as fout:
|
||||
fout.write(contents)
|
||||
os.rename(name, path)
|
||||
|
||||
def load_cassette(cassette_path):
|
||||
data = yaml.load(open(cassette_path), Loader=Loader)
|
||||
return _deserialize_cassette(data)
|
||||
|
||||
def save_cassette(cassette_path, requests, responses):
|
||||
dirname, filename = os.path.split(cassette_path)
|
||||
if dirname and not os.path.exists(dirname):
|
||||
os.makedirs(dirname)
|
||||
data = _serialize_cassette(requests, responses)
|
||||
data = yaml.dump(data, Dumper=Dumper)
|
||||
_secure_write(cassette_path, data)
|
||||
def save_cassette(cassette_path, cassette_dict, serializer):
|
||||
data = serializer.serialize(cassette_dict)
|
||||
FilesystemPersister.write(cassette_path, data)
|
||||
|
||||
0
vcr/persisters/__init__.py
Normal file
0
vcr/persisters/__init__.py
Normal file
23
vcr/persisters/filesystem.py
Normal file
23
vcr/persisters/filesystem.py
Normal file
@@ -0,0 +1,23 @@
|
||||
import tempfile
|
||||
import os
|
||||
|
||||
|
||||
class FilesystemPersister(object):
|
||||
@classmethod
|
||||
def _secure_write(cls, path, contents):
|
||||
"""
|
||||
We'll overwrite the old version securely by writing out a temporary
|
||||
version and then moving it to replace the old version
|
||||
"""
|
||||
dirname, filename = os.path.split(path)
|
||||
fd, name = tempfile.mkstemp(dir=dirname, prefix=filename)
|
||||
with os.fdopen(fd, 'w') as fout:
|
||||
fout.write(contents)
|
||||
os.rename(name, path)
|
||||
|
||||
@classmethod
|
||||
def write(cls, cassette_path, data):
|
||||
dirname, filename = os.path.split(cassette_path)
|
||||
if dirname and not os.path.exists(dirname):
|
||||
os.makedirs(dirname)
|
||||
cls._secure_write(cassette_path, data)
|
||||
@@ -7,7 +7,7 @@ class Request(object):
|
||||
self.method = method
|
||||
self.path = path
|
||||
self.body = body
|
||||
# make haders a frozenset so it will be hashable
|
||||
# make headers a frozenset so it will be hashable
|
||||
self.headers = frozenset(headers.items())
|
||||
|
||||
@property
|
||||
@@ -15,7 +15,14 @@ class Request(object):
|
||||
return "{0}://{1}{2}".format(self.protocol, self.host, self.path)
|
||||
|
||||
def __key(self):
|
||||
return (self.host, self.port, self.method, self.path, self.body, self.headers)
|
||||
return (
|
||||
self.host,
|
||||
self.port,
|
||||
self.method,
|
||||
self.path,
|
||||
self.body,
|
||||
self.headers
|
||||
)
|
||||
|
||||
def __hash__(self):
|
||||
return hash(self.__key())
|
||||
@@ -28,3 +35,18 @@ class Request(object):
|
||||
|
||||
def __repr__(self):
|
||||
return self.__str__()
|
||||
|
||||
def _to_dict(self):
|
||||
return {
|
||||
'protocol': self.protocol,
|
||||
'host': self.host,
|
||||
'port': self.port,
|
||||
'method': self.method,
|
||||
'path': self.path,
|
||||
'body': self.body,
|
||||
'headers': self.headers,
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def _from_dict(cls, dct):
|
||||
return Request(**dct)
|
||||
|
||||
0
vcr/serializers/__init__.py
Normal file
0
vcr/serializers/__init__.py
Normal file
34
vcr/serializers/jsonserializer.py
Normal file
34
vcr/serializers/jsonserializer.py
Normal file
@@ -0,0 +1,34 @@
|
||||
from vcr.request import Request
|
||||
try:
|
||||
import simplejson as json
|
||||
except ImportError:
|
||||
import json
|
||||
|
||||
|
||||
def _json_default(obj):
|
||||
if isinstance(obj, frozenset):
|
||||
return dict(obj)
|
||||
return obj
|
||||
|
||||
|
||||
def _fix_response_unicode(d):
|
||||
d['body']['string'] = d['body']['string'].encode('utf-8')
|
||||
return d
|
||||
|
||||
|
||||
def deserialize(cassette_string):
|
||||
data = json.loads(cassette_string)
|
||||
requests = [Request._from_dict(r['request']) for r in data]
|
||||
responses = [_fix_response_unicode(r['response']) for r in data]
|
||||
return requests, responses
|
||||
|
||||
|
||||
def serialize(cassette_dict):
|
||||
data = ([{
|
||||
'request': request._to_dict(),
|
||||
'response': response,
|
||||
} for request, response in zip(
|
||||
cassette_dict['requests'],
|
||||
cassette_dict['responses']
|
||||
)])
|
||||
return json.dumps(data, indent=4, default=_json_default)
|
||||
25
vcr/serializers/yamlserializer.py
Normal file
25
vcr/serializers/yamlserializer.py
Normal file
@@ -0,0 +1,25 @@
|
||||
import yaml
|
||||
|
||||
# Use the libYAML versions if possible
|
||||
try:
|
||||
from yaml import CLoader as Loader, CDumper as Dumper
|
||||
except ImportError:
|
||||
from yaml import Loader, Dumper
|
||||
|
||||
|
||||
def deserialize(cassette_string):
|
||||
data = yaml.load(cassette_string, Loader=Loader)
|
||||
requests = [r['request'] for r in data]
|
||||
responses = [r['response'] for r in data]
|
||||
return requests, responses
|
||||
|
||||
|
||||
def serialize(cassette_dict):
|
||||
data = ([{
|
||||
'request': request,
|
||||
'response': response,
|
||||
} for request, response in zip(
|
||||
cassette_dict['requests'],
|
||||
cassette_dict['responses']
|
||||
)])
|
||||
return yaml.dump(data, Dumper=Dumper)
|
||||
@@ -18,7 +18,7 @@ class VCRHTTPResponse(object):
|
||||
self._content = StringIO(self.recorded_response['body']['string'])
|
||||
|
||||
# We are skipping the header parsing (they have already been parsed
|
||||
# at this point) and directly adding the headers to the header
|
||||
# at this point) and directly adding the headers to the header
|
||||
# container, so just pass an empty StringIO.
|
||||
self.msg = HTTPMessage(StringIO(''))
|
||||
|
||||
@@ -55,13 +55,13 @@ class VCRConnectionMixin:
|
||||
def request(self, method, url, body=None, headers=None):
|
||||
'''Persist the request metadata in self._vcr_request'''
|
||||
self._vcr_request = Request(
|
||||
protocol = self._protocol,
|
||||
host = self.host,
|
||||
port = self.port,
|
||||
method = method,
|
||||
path = url,
|
||||
body = body,
|
||||
headers = headers or {}
|
||||
protocol=self._protocol,
|
||||
host=self.host,
|
||||
port=self.port,
|
||||
method=method,
|
||||
path=url,
|
||||
body=body,
|
||||
headers=headers or {}
|
||||
)
|
||||
|
||||
# Check if we have a cassette set, and if we have a response saved.
|
||||
@@ -84,11 +84,14 @@ class VCRConnectionMixin:
|
||||
self.cassette.mark_played(self._vcr_request)
|
||||
return VCRHTTPResponse(response)
|
||||
else:
|
||||
# Otherwise, we made an actual request, and should return the response
|
||||
# we got from the actual connection
|
||||
# Otherwise, we made an actual request, and should return the
|
||||
# response we got from the actual connection
|
||||
response = HTTPConnection.getresponse(self)
|
||||
response = {
|
||||
'status': {'code': response.status, 'message': response.reason},
|
||||
'status': {
|
||||
'code': response.status,
|
||||
'message': response.reason
|
||||
},
|
||||
'headers': dict(response.getheaders()),
|
||||
'body': {'string': response.read()},
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user