mirror of
https://github.com/kevin1024/vcrpy.git
synced 2025-12-09 01:03:24 +00:00
Compare commits
24 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
07774ae6dd | ||
|
|
e1c7eb1ec5 | ||
|
|
7f958246e0 | ||
|
|
c8299103fb | ||
|
|
98603541d6 | ||
|
|
6b8d4643e8 | ||
|
|
b55834e929 | ||
|
|
7264780960 | ||
|
|
0f2695f240 | ||
|
|
65254b4969 | ||
|
|
6005420409 | ||
|
|
c5eca93edc | ||
|
|
b688dd362d | ||
|
|
28379e9000 | ||
|
|
b7af8bae71 | ||
|
|
7a4c11bf94 | ||
|
|
1478ce82fd | ||
|
|
9073cf137e | ||
|
|
53f5cd24d6 | ||
|
|
cf744dca00 | ||
|
|
51f0f1bacd | ||
|
|
3e247a2efb | ||
|
|
762b761d0c | ||
|
|
348cc8fdfe |
67
README.md
67
README.md
@@ -1,5 +1,7 @@
|
|||||||
#VCR.py
|
#VCR.py
|
||||||
|
|
||||||
|

|
||||||
|
|
||||||
This is a Python version of [Ruby's VCR library](https://github.com/myronmarston/vcr).
|
This is a Python version of [Ruby's VCR library](https://github.com/myronmarston/vcr).
|
||||||
|
|
||||||
[](http://travis-ci.org/kevin1024/vcrpy)
|
[](http://travis-ci.org/kevin1024/vcrpy)
|
||||||
@@ -38,6 +40,33 @@ pass, even if you are offline, or iana.org goes down for maintenance) and
|
|||||||
accurate (the response will contain the same headers and body you get from a
|
accurate (the response will contain the same headers and body you get from a
|
||||||
real request).
|
real request).
|
||||||
|
|
||||||
|
## Configuration
|
||||||
|
|
||||||
|
If you don't like VCR's defaults, you can set options by instantiating a
|
||||||
|
VCR class and setting the options on it.
|
||||||
|
|
||||||
|
```python
|
||||||
|
|
||||||
|
import vcr
|
||||||
|
|
||||||
|
my_vcr = vcr.VCR(
|
||||||
|
serializer = 'json',
|
||||||
|
cassette_library_dir = 'fixtures/cassettes',
|
||||||
|
)
|
||||||
|
|
||||||
|
with my_vcr.use_cassette('test.json'):
|
||||||
|
# your http code here
|
||||||
|
```
|
||||||
|
|
||||||
|
Otherwise, you can override options each time you use a cassette.
|
||||||
|
|
||||||
|
```python
|
||||||
|
with vcr.use_cassette('test.yml', serializer='json'):
|
||||||
|
# your http code here
|
||||||
|
```
|
||||||
|
|
||||||
|
Note: Per-cassette overrides take precedence over the global config.
|
||||||
|
|
||||||
## Advanced Features
|
## Advanced Features
|
||||||
|
|
||||||
If you want, VCR.py can return information about the cassette it is
|
If you want, VCR.py can return information about the cassette it is
|
||||||
@@ -82,7 +111,40 @@ The Request object has the following properties
|
|||||||
* `protocol`: The protocol used to make the request (http or https)
|
* `protocol`: The protocol used to make the request (http or https)
|
||||||
* `body`: The body of the request, usually empty except for POST / PUT / etc
|
* `body`: The body of the request, usually empty except for POST / PUT / etc
|
||||||
|
|
||||||
|
## Register your own serializer
|
||||||
|
|
||||||
|
Don't like JSON or YAML? That's OK, VCR.py can serialize to any format
|
||||||
|
you would like. Create your own module or class instance with 2 methods:
|
||||||
|
|
||||||
|
* `def deserialize(cassette_string)`
|
||||||
|
* `def serialize(cassette_dict)`
|
||||||
|
|
||||||
|
Finally, register your class with VCR to use your
|
||||||
|
new serializer.
|
||||||
|
|
||||||
|
```
|
||||||
|
import vcr
|
||||||
|
|
||||||
|
BogoSerializer(object):
|
||||||
|
"""
|
||||||
|
Must implement serialize() and deserialize() methods
|
||||||
|
"""
|
||||||
|
pass
|
||||||
|
|
||||||
|
my_vcr = vcr.VCR()
|
||||||
|
my_vcr.register_serializer('bogo', BogoSerializer())
|
||||||
|
|
||||||
|
with my_vcr.use_cassette('test.bogo', serializer='bogo'):
|
||||||
|
# your http here
|
||||||
|
|
||||||
|
# After you register, you can set the default serializer to your new serializer
|
||||||
|
|
||||||
|
my_vcr.serializer = 'bogo'
|
||||||
|
|
||||||
|
with my_vcr.use_cassette('test.bogo'):
|
||||||
|
# your http here
|
||||||
|
|
||||||
|
```
|
||||||
|
|
||||||
##Installation
|
##Installation
|
||||||
|
|
||||||
@@ -97,6 +159,11 @@ This library is a work in progress, so the API might change on you.
|
|||||||
There are probably some [bugs](https://github.com/kevin1024/vcrpy/issues?labels=bug&page=1&state=open) floating around too.
|
There are probably some [bugs](https://github.com/kevin1024/vcrpy/issues?labels=bug&page=1&state=open) floating around too.
|
||||||
|
|
||||||
##Changelog
|
##Changelog
|
||||||
|
* 0.2.1: Fixed missing modules in setup.py
|
||||||
|
* 0.2.0: Added configuration API, which lets you configure some settings
|
||||||
|
on VCR (see the README). Also, VCR no longer saves cassettes if they
|
||||||
|
haven't changed at all and supports JSON as well as YAML
|
||||||
|
(thanks @sirpengi). Added amazing new skeumorphic logo, thanks @hairarrow.
|
||||||
* 0.1.0: *backwards incompatible release - delete your old cassette files*:
|
* 0.1.0: *backwards incompatible release - delete your old cassette files*:
|
||||||
This release adds the ability to access the cassette to make assertions
|
This release adds the ability to access the cassette to make assertions
|
||||||
on it, as well as a major code refactor thanks to @dlecocq. It also
|
on it, as well as a major code refactor thanks to @dlecocq. It also
|
||||||
|
|||||||
15
setup.py
15
setup.py
@@ -19,17 +19,24 @@ class PyTest(TestCommand):
|
|||||||
sys.exit(errno)
|
sys.exit(errno)
|
||||||
|
|
||||||
setup(name='vcrpy',
|
setup(name='vcrpy',
|
||||||
version='0.1.0',
|
version='0.2.1',
|
||||||
description="A Python port of Ruby's VCR to make mocking HTTP easier",
|
description="A Python port of Ruby's VCR to make mocking HTTP easier",
|
||||||
author='Kevin McCarthy',
|
author='Kevin McCarthy',
|
||||||
author_email='me@kevinmccarthy.org',
|
author_email='me@kevinmccarthy.org',
|
||||||
url='https://github.com/kevin1024/vcrpy',
|
url='https://github.com/kevin1024/vcrpy',
|
||||||
packages=[
|
packages = [
|
||||||
'vcr',
|
'vcr',
|
||||||
'vcr.stubs'],
|
'vcr.stubs',
|
||||||
|
'vcr.compat',
|
||||||
|
'vcr.persisters',
|
||||||
|
'vcr.serializers',
|
||||||
|
],
|
||||||
package_dir={
|
package_dir={
|
||||||
'vcr': 'vcr',
|
'vcr': 'vcr',
|
||||||
'vcr.stubs': 'vcr/stubs'},
|
'vcr.stubs': 'vcr/stubs',
|
||||||
|
'vcr.compat': 'vcr/compat',
|
||||||
|
'vcr.persisters': 'vcr/persisters',
|
||||||
|
},
|
||||||
install_requires=['PyYAML'],
|
install_requires=['PyYAML'],
|
||||||
license='MIT',
|
license='MIT',
|
||||||
tests_require=['pytest'],
|
tests_require=['pytest'],
|
||||||
|
|||||||
@@ -2,6 +2,7 @@ def assert_cassette_empty(cass):
|
|||||||
assert len(cass) == 0
|
assert len(cass) == 0
|
||||||
assert cass.play_count == 0
|
assert cass.play_count == 0
|
||||||
|
|
||||||
|
|
||||||
def assert_cassette_has_one_response(cass):
|
def assert_cassette_has_one_response(cass):
|
||||||
assert len(cass) == 1
|
assert len(cass) == 1
|
||||||
assert cass.play_count == 1
|
assert cass.play_count == 1
|
||||||
|
|||||||
@@ -1,18 +0,0 @@
|
|||||||
# coding=utf-8
|
|
||||||
|
|
||||||
import os
|
|
||||||
import shutil
|
|
||||||
import unittest
|
|
||||||
|
|
||||||
|
|
||||||
class TestVCR(unittest.TestCase):
|
|
||||||
fixtures = os.path.join('does', 'not', 'exist')
|
|
||||||
|
|
||||||
def tearDown(self):
|
|
||||||
# Remove the fixtures if they exist
|
|
||||||
if os.path.exists(self.fixtures):
|
|
||||||
shutil.rmtree(self.fixtures)
|
|
||||||
|
|
||||||
def fixture(self, *names):
|
|
||||||
'''Return a path to the provided fixture'''
|
|
||||||
return os.path.join(self.fixtures, *names)
|
|
||||||
@@ -8,17 +8,19 @@ import urllib2
|
|||||||
# Internal imports
|
# Internal imports
|
||||||
import vcr
|
import vcr
|
||||||
|
|
||||||
|
|
||||||
def test_nonexistent_directory(tmpdir):
|
def test_nonexistent_directory(tmpdir):
|
||||||
'''If we load a cassette in a nonexistent directory, it can save ok'''
|
'''If we load a cassette in a nonexistent directory, it can save ok'''
|
||||||
# Check to make sure directory doesnt exist
|
# Check to make sure directory doesnt exist
|
||||||
assert not os.path.exists(str(tmpdir.join('nonexistent')))
|
assert not os.path.exists(str(tmpdir.join('nonexistent')))
|
||||||
|
|
||||||
# Run VCR to create dir and cassette file
|
# Run VCR to create dir and cassette file
|
||||||
with vcr.use_cassette(str(tmpdir.join('nonexistent','cassette.yml'))):
|
with vcr.use_cassette(str(tmpdir.join('nonexistent', 'cassette.yml'))):
|
||||||
urllib2.urlopen('http://httpbin.org/').read()
|
urllib2.urlopen('http://httpbin.org/').read()
|
||||||
|
|
||||||
# This should have made the file and the directory
|
# This should have made the file and the directory
|
||||||
assert os.path.exists(str(tmpdir.join('nonexistent','cassette.yml')))
|
assert os.path.exists(str(tmpdir.join('nonexistent', 'cassette.yml')))
|
||||||
|
|
||||||
|
|
||||||
def test_unpatch(tmpdir):
|
def test_unpatch(tmpdir):
|
||||||
'''Ensure that our cassette gets unpatched when we're done'''
|
'''Ensure that our cassette gets unpatched when we're done'''
|
||||||
@@ -30,10 +32,70 @@ def test_unpatch(tmpdir):
|
|||||||
urllib2.urlopen('http://httpbin.org/').read()
|
urllib2.urlopen('http://httpbin.org/').read()
|
||||||
assert cass.play_count == 0
|
assert cass.play_count == 0
|
||||||
|
|
||||||
|
|
||||||
def test_basic_use(tmpdir):
|
def test_basic_use(tmpdir):
|
||||||
'''
|
'''
|
||||||
Copied from the docs
|
Copied from the docs
|
||||||
'''
|
'''
|
||||||
with vcr.use_cassette('fixtures/vcr_cassettes/synopsis.yaml'):
|
with vcr.use_cassette('fixtures/vcr_cassettes/synopsis.yaml'):
|
||||||
response = urllib2.urlopen('http://www.iana.org/domains/reserved').read()
|
response = urllib2.urlopen(
|
||||||
|
'http://www.iana.org/domains/reserved'
|
||||||
|
).read()
|
||||||
assert 'Example domains' in response
|
assert 'Example domains' in response
|
||||||
|
|
||||||
|
|
||||||
|
def test_basic_json_use(tmpdir):
|
||||||
|
'''
|
||||||
|
Ensure you can load a json serialized cassette
|
||||||
|
'''
|
||||||
|
test_fixture = 'fixtures/vcr_cassettes/synopsis.json'
|
||||||
|
with vcr.use_cassette(test_fixture, serializer='json'):
|
||||||
|
response = urllib2.urlopen('http://httpbin.org/').read()
|
||||||
|
assert 'difficult sometimes' in response
|
||||||
|
|
||||||
|
|
||||||
|
def test_patched_content(tmpdir):
|
||||||
|
'''
|
||||||
|
Ensure that what you pull from a cassette is what came from the
|
||||||
|
request
|
||||||
|
'''
|
||||||
|
with vcr.use_cassette(str(tmpdir.join('synopsis.yaml'))) as cass:
|
||||||
|
response = urllib2.urlopen('http://httpbin.org/').read()
|
||||||
|
assert cass.play_count == 0
|
||||||
|
|
||||||
|
with vcr.use_cassette(str(tmpdir.join('synopsis.yaml'))) as cass:
|
||||||
|
response2 = urllib2.urlopen('http://httpbin.org/').read()
|
||||||
|
assert cass.play_count == 1
|
||||||
|
cass._save(force=True)
|
||||||
|
|
||||||
|
with vcr.use_cassette(str(tmpdir.join('synopsis.yaml'))) as cass:
|
||||||
|
response3 = urllib2.urlopen('http://httpbin.org/').read()
|
||||||
|
assert cass.play_count == 1
|
||||||
|
|
||||||
|
assert response == response2
|
||||||
|
assert response2 == response3
|
||||||
|
|
||||||
|
|
||||||
|
def test_patched_content_json(tmpdir):
|
||||||
|
'''
|
||||||
|
Ensure that what you pull from a json cassette is what came from the
|
||||||
|
request
|
||||||
|
'''
|
||||||
|
|
||||||
|
testfile = str(tmpdir.join('synopsis.json'))
|
||||||
|
|
||||||
|
with vcr.use_cassette(testfile) as cass:
|
||||||
|
response = urllib2.urlopen('http://httpbin.org/').read()
|
||||||
|
assert cass.play_count == 0
|
||||||
|
|
||||||
|
with vcr.use_cassette(testfile) as cass:
|
||||||
|
response2 = urllib2.urlopen('http://httpbin.org/').read()
|
||||||
|
assert cass.play_count == 1
|
||||||
|
cass._save(force=True)
|
||||||
|
|
||||||
|
with vcr.use_cassette(testfile) as cass:
|
||||||
|
response3 = urllib2.urlopen('http://httpbin.org/').read()
|
||||||
|
assert cass.play_count == 1
|
||||||
|
|
||||||
|
assert response == response2
|
||||||
|
assert response2 == response3
|
||||||
|
|||||||
36
tests/integration/test_config.py
Normal file
36
tests/integration/test_config.py
Normal file
@@ -0,0 +1,36 @@
|
|||||||
|
import os
|
||||||
|
import json
|
||||||
|
import urllib2
|
||||||
|
import vcr
|
||||||
|
|
||||||
|
|
||||||
|
def test_set_serializer_default_config(tmpdir):
|
||||||
|
my_vcr = vcr.VCR(serializer='json')
|
||||||
|
|
||||||
|
with my_vcr.use_cassette(str(tmpdir.join('test.json'))):
|
||||||
|
assert my_vcr.serializer == 'json'
|
||||||
|
urllib2.urlopen('http://httpbin.org/get')
|
||||||
|
|
||||||
|
with open(str(tmpdir.join('test.json'))) as f:
|
||||||
|
assert json.loads(f.read())
|
||||||
|
|
||||||
|
|
||||||
|
def test_default_set_cassette_library_dir(tmpdir):
|
||||||
|
my_vcr = vcr.VCR(cassette_library_dir=str(tmpdir.join('subdir')))
|
||||||
|
|
||||||
|
with my_vcr.use_cassette('test.json'):
|
||||||
|
urllib2.urlopen('http://httpbin.org/get')
|
||||||
|
|
||||||
|
assert os.path.exists(str(tmpdir.join('subdir').join('test.json')))
|
||||||
|
|
||||||
|
|
||||||
|
def test_override_set_cassette_library_dir(tmpdir):
|
||||||
|
my_vcr = vcr.VCR(cassette_library_dir=str(tmpdir.join('subdir')))
|
||||||
|
|
||||||
|
cld = str(tmpdir.join('subdir2'))
|
||||||
|
|
||||||
|
with my_vcr.use_cassette('test.json', cassette_library_dir=cld):
|
||||||
|
urllib2.urlopen('http://httpbin.org/get')
|
||||||
|
|
||||||
|
assert os.path.exists(str(tmpdir.join('subdir2').join('test.json')))
|
||||||
|
assert not os.path.exists(str(tmpdir.join('subdir').join('test.json')))
|
||||||
55
tests/integration/test_disksaver.py
Normal file
55
tests/integration/test_disksaver.py
Normal file
@@ -0,0 +1,55 @@
|
|||||||
|
'''Basic tests about save behavior'''
|
||||||
|
# coding=utf-8
|
||||||
|
|
||||||
|
# External imports
|
||||||
|
import os
|
||||||
|
import urllib2
|
||||||
|
import time
|
||||||
|
|
||||||
|
# Internal imports
|
||||||
|
import vcr
|
||||||
|
|
||||||
|
|
||||||
|
def test_disk_saver_nowrite(tmpdir):
|
||||||
|
'''
|
||||||
|
Ensure that when you close a cassette without changing it it doesn't
|
||||||
|
rewrite the file
|
||||||
|
'''
|
||||||
|
fname = str(tmpdir.join('synopsis.yaml'))
|
||||||
|
with vcr.use_cassette(fname) as cass:
|
||||||
|
urllib2.urlopen('http://www.iana.org/domains/reserved').read()
|
||||||
|
assert cass.play_count == 0
|
||||||
|
last_mod = os.path.getmtime(fname)
|
||||||
|
|
||||||
|
with vcr.use_cassette(fname) as cass:
|
||||||
|
urllib2.urlopen('http://www.iana.org/domains/reserved').read()
|
||||||
|
assert cass.play_count == 1
|
||||||
|
assert cass.dirty is False
|
||||||
|
last_mod2 = os.path.getmtime(fname)
|
||||||
|
|
||||||
|
assert last_mod == last_mod2
|
||||||
|
|
||||||
|
|
||||||
|
def test_disk_saver_write(tmpdir):
|
||||||
|
'''
|
||||||
|
Ensure that when you close a cassette after changing it it does
|
||||||
|
rewrite the file
|
||||||
|
'''
|
||||||
|
fname = str(tmpdir.join('synopsis.yaml'))
|
||||||
|
with vcr.use_cassette(fname) as cass:
|
||||||
|
urllib2.urlopen('http://www.iana.org/domains/reserved').read()
|
||||||
|
assert cass.play_count == 0
|
||||||
|
last_mod = os.path.getmtime(fname)
|
||||||
|
|
||||||
|
# Make sure at least 1 second passes, otherwise sometimes
|
||||||
|
# the mtime doesn't change
|
||||||
|
time.sleep(1)
|
||||||
|
|
||||||
|
with vcr.use_cassette(fname) as cass:
|
||||||
|
urllib2.urlopen('http://www.iana.org/domains/reserved').read()
|
||||||
|
urllib2.urlopen('http://httpbin.org/').read()
|
||||||
|
assert cass.play_count == 1
|
||||||
|
assert cass.dirty
|
||||||
|
last_mod2 = os.path.getmtime(fname)
|
||||||
|
|
||||||
|
assert last_mod != last_mod2
|
||||||
35
tests/integration/test_register_serializer.py
Normal file
35
tests/integration/test_register_serializer.py
Normal file
@@ -0,0 +1,35 @@
|
|||||||
|
import urllib2
|
||||||
|
import vcr
|
||||||
|
|
||||||
|
|
||||||
|
class MockSerializer(object):
|
||||||
|
def __init__(self):
|
||||||
|
self.serialize_count = 0
|
||||||
|
self.deserialize_count = 0
|
||||||
|
self.load_args = None
|
||||||
|
|
||||||
|
def deserialize(self, cassette_string):
|
||||||
|
self.serialize_count += 1
|
||||||
|
self.cassette_string = cassette_string
|
||||||
|
return ([], [])
|
||||||
|
|
||||||
|
def serialize(self, cassette_dict):
|
||||||
|
self.deserialize_count += 1
|
||||||
|
return ""
|
||||||
|
|
||||||
|
|
||||||
|
def test_registered_serializer(tmpdir):
|
||||||
|
ms = MockSerializer()
|
||||||
|
my_vcr = vcr.VCR()
|
||||||
|
my_vcr.register_serializer('mock', ms)
|
||||||
|
tmpdir.join('test.mock').write('test_data')
|
||||||
|
with my_vcr.use_cassette(str(tmpdir.join('test.mock')), serializer='mock'):
|
||||||
|
urllib2.urlopen('http://httpbin.org/')
|
||||||
|
# Serializer deserialized once
|
||||||
|
assert ms.serialize_count == 1
|
||||||
|
# and serialized the test data string
|
||||||
|
assert ms.cassette_string == 'test_data'
|
||||||
|
# and hasn't serialized yet
|
||||||
|
assert ms.deserialize_count == 0
|
||||||
|
|
||||||
|
assert ms.serialize_count == 1
|
||||||
@@ -1,12 +1,11 @@
|
|||||||
import urllib2
|
import urllib2
|
||||||
import vcr
|
import vcr
|
||||||
|
|
||||||
|
|
||||||
def test_recorded_request_url_with_redirected_request(tmpdir):
|
def test_recorded_request_url_with_redirected_request(tmpdir):
|
||||||
with vcr.use_cassette(str(tmpdir.join('test.yml'))) as cass:
|
with vcr.use_cassette(str(tmpdir.join('test.yml'))) as cass:
|
||||||
assert len(cass) == 0
|
assert len(cass) == 0
|
||||||
urllib2.urlopen('http://google.com')
|
urllib2.urlopen('http://httpbin.org/redirect/3')
|
||||||
print cass.requests
|
assert cass.requests[0].url == 'http://httpbin.org/redirect/3'
|
||||||
print cass.requests[0]
|
assert cass.requests[3].url == 'http://httpbin.org/get'
|
||||||
assert cass.requests[0].url == 'http://google.com'
|
assert len(cass) == 4
|
||||||
assert cass.requests[1].url == 'http://www.google.com/'
|
|
||||||
assert len(cass) == 2
|
|
||||||
|
|||||||
@@ -2,18 +2,14 @@
|
|||||||
|
|
||||||
# coding=utf-8
|
# coding=utf-8
|
||||||
|
|
||||||
# Internal imports
|
|
||||||
|
|
||||||
import os
|
import os
|
||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
import vcr
|
import vcr
|
||||||
|
|
||||||
from assertions import assert_cassette_empty, assert_cassette_has_one_response
|
from assertions import assert_cassette_empty, assert_cassette_has_one_response
|
||||||
|
|
||||||
requests = pytest.importorskip("requests")
|
requests = pytest.importorskip("requests")
|
||||||
|
|
||||||
@pytest.fixture(params=["https","http"])
|
|
||||||
|
@pytest.fixture(params=["https", "http"])
|
||||||
def scheme(request):
|
def scheme(request):
|
||||||
"""
|
"""
|
||||||
Fixture that returns both http and https
|
Fixture that returns both http and https
|
||||||
@@ -31,6 +27,7 @@ def test_status_code(scheme, tmpdir):
|
|||||||
# Ensure that we've now cached a single response
|
# Ensure that we've now cached a single response
|
||||||
assert_cassette_has_one_response(cass)
|
assert_cassette_has_one_response(cass)
|
||||||
|
|
||||||
|
|
||||||
def test_headers(scheme, tmpdir):
|
def test_headers(scheme, tmpdir):
|
||||||
'''Ensure that we can read the headers back'''
|
'''Ensure that we can read the headers back'''
|
||||||
url = scheme + '://httpbin.org/'
|
url = scheme + '://httpbin.org/'
|
||||||
@@ -41,6 +38,7 @@ def test_headers(scheme, tmpdir):
|
|||||||
# Ensure that we've now cached a single response
|
# Ensure that we've now cached a single response
|
||||||
assert_cassette_has_one_response(cass)
|
assert_cassette_has_one_response(cass)
|
||||||
|
|
||||||
|
|
||||||
def test_body(tmpdir, scheme):
|
def test_body(tmpdir, scheme):
|
||||||
'''Ensure the responses are all identical enough'''
|
'''Ensure the responses are all identical enough'''
|
||||||
url = scheme + '://httpbin.org/bytes/1024'
|
url = scheme + '://httpbin.org/bytes/1024'
|
||||||
@@ -51,6 +49,7 @@ def test_body(tmpdir, scheme):
|
|||||||
# Ensure that we've now cached a single response
|
# Ensure that we've now cached a single response
|
||||||
assert_cassette_has_one_response(cass)
|
assert_cassette_has_one_response(cass)
|
||||||
|
|
||||||
|
|
||||||
def test_auth(tmpdir, scheme):
|
def test_auth(tmpdir, scheme):
|
||||||
'''Ensure that we can handle basic auth'''
|
'''Ensure that we can handle basic auth'''
|
||||||
auth = ('user', 'passwd')
|
auth = ('user', 'passwd')
|
||||||
@@ -65,6 +64,7 @@ def test_auth(tmpdir, scheme):
|
|||||||
# Ensure that we've now cached a single response
|
# Ensure that we've now cached a single response
|
||||||
assert_cassette_has_one_response(cass)
|
assert_cassette_has_one_response(cass)
|
||||||
|
|
||||||
|
|
||||||
def test_auth_failed(tmpdir, scheme):
|
def test_auth_failed(tmpdir, scheme):
|
||||||
'''Ensure that we can save failed auth statuses'''
|
'''Ensure that we can save failed auth statuses'''
|
||||||
auth = ('user', 'wrongwrongwrong')
|
auth = ('user', 'wrongwrongwrong')
|
||||||
@@ -79,6 +79,7 @@ def test_auth_failed(tmpdir, scheme):
|
|||||||
# Ensure that we've now cached a single response
|
# Ensure that we've now cached a single response
|
||||||
assert_cassette_has_one_response(cass)
|
assert_cassette_has_one_response(cass)
|
||||||
|
|
||||||
|
|
||||||
def test_post(tmpdir, scheme):
|
def test_post(tmpdir, scheme):
|
||||||
'''Ensure that we can post and cache the results'''
|
'''Ensure that we can post and cache the results'''
|
||||||
data = {'key1': 'value1', 'key2': 'value2'}
|
data = {'key1': 'value1', 'key2': 'value2'}
|
||||||
@@ -86,10 +87,13 @@ def test_post(tmpdir, scheme):
|
|||||||
with vcr.use_cassette(str(tmpdir.join('redirect.yaml'))) as cass:
|
with vcr.use_cassette(str(tmpdir.join('redirect.yaml'))) as cass:
|
||||||
# Ensure that this is empty to begin with
|
# Ensure that this is empty to begin with
|
||||||
assert_cassette_empty(cass)
|
assert_cassette_empty(cass)
|
||||||
assert requests.post(url, data).content == requests.post(url, data).content
|
req1 = requests.post(url, data).content
|
||||||
|
req2 = requests.post(url, data).content
|
||||||
|
assert req1 == req2
|
||||||
# Ensure that we've now cached a single response
|
# Ensure that we've now cached a single response
|
||||||
assert_cassette_has_one_response(cass)
|
assert_cassette_has_one_response(cass)
|
||||||
|
|
||||||
|
|
||||||
def test_redirects(tmpdir, scheme):
|
def test_redirects(tmpdir, scheme):
|
||||||
'''Ensure that we can handle redirects'''
|
'''Ensure that we can handle redirects'''
|
||||||
url = scheme + '://httpbin.org/redirect-to?url=bytes/1024'
|
url = scheme + '://httpbin.org/redirect-to?url=bytes/1024'
|
||||||
@@ -102,6 +106,7 @@ def test_redirects(tmpdir, scheme):
|
|||||||
assert len(cass) == 2
|
assert len(cass) == 2
|
||||||
assert cass.play_count == 2
|
assert cass.play_count == 2
|
||||||
|
|
||||||
|
|
||||||
def test_cross_scheme(tmpdir, scheme):
|
def test_cross_scheme(tmpdir, scheme):
|
||||||
'''Ensure that requests between schemes are treated separately'''
|
'''Ensure that requests between schemes are treated separately'''
|
||||||
# First fetch a url under http, and then again under https and then
|
# First fetch a url under http, and then again under https and then
|
||||||
@@ -112,4 +117,3 @@ def test_cross_scheme(tmpdir, scheme):
|
|||||||
requests.get('http://httpbin.org/')
|
requests.get('http://httpbin.org/')
|
||||||
assert cass.play_count == 0
|
assert cass.play_count == 0
|
||||||
assert len(cass) == 2
|
assert len(cass) == 2
|
||||||
|
|
||||||
|
|||||||
@@ -12,13 +12,15 @@ import vcr
|
|||||||
|
|
||||||
from assertions import assert_cassette_empty, assert_cassette_has_one_response
|
from assertions import assert_cassette_empty, assert_cassette_has_one_response
|
||||||
|
|
||||||
@pytest.fixture(params=["https","http"])
|
|
||||||
|
@pytest.fixture(params=["https", "http"])
|
||||||
def scheme(request):
|
def scheme(request):
|
||||||
"""
|
"""
|
||||||
Fixture that returns both http and https
|
Fixture that returns both http and https
|
||||||
"""
|
"""
|
||||||
return request.param
|
return request.param
|
||||||
|
|
||||||
|
|
||||||
def test_response_code(scheme, tmpdir):
|
def test_response_code(scheme, tmpdir):
|
||||||
'''Ensure we can read a response code from a fetch'''
|
'''Ensure we can read a response code from a fetch'''
|
||||||
url = scheme + '://httpbin.org/'
|
url = scheme + '://httpbin.org/'
|
||||||
@@ -29,6 +31,7 @@ def test_response_code(scheme, tmpdir):
|
|||||||
# Ensure that we've now cached a single response
|
# Ensure that we've now cached a single response
|
||||||
assert_cassette_has_one_response(cass)
|
assert_cassette_has_one_response(cass)
|
||||||
|
|
||||||
|
|
||||||
def test_random_body(scheme, tmpdir):
|
def test_random_body(scheme, tmpdir):
|
||||||
'''Ensure we can read the content, and that it's served from cache'''
|
'''Ensure we can read the content, and that it's served from cache'''
|
||||||
url = scheme + '://httpbin.org/bytes/1024'
|
url = scheme + '://httpbin.org/bytes/1024'
|
||||||
@@ -39,16 +42,20 @@ def test_random_body(scheme, tmpdir):
|
|||||||
# Ensure that we've now cached a single response
|
# Ensure that we've now cached a single response
|
||||||
assert_cassette_has_one_response(cass)
|
assert_cassette_has_one_response(cass)
|
||||||
|
|
||||||
|
|
||||||
def test_response_headers(scheme, tmpdir):
|
def test_response_headers(scheme, tmpdir):
|
||||||
'''Ensure we can get information from the response'''
|
'''Ensure we can get information from the response'''
|
||||||
url = scheme + '://httpbin.org/'
|
url = scheme + '://httpbin.org/'
|
||||||
with vcr.use_cassette(str(tmpdir.join('headers.yaml'))) as cass:
|
with vcr.use_cassette(str(tmpdir.join('headers.yaml'))) as cass:
|
||||||
# Ensure that this is empty to begin with
|
# Ensure that this is empty to begin with
|
||||||
assert_cassette_empty(cass)
|
assert_cassette_empty(cass)
|
||||||
assert urllib2.urlopen(url).info().items() == urllib2.urlopen(url).info().items()
|
open1 = urllib2.urlopen(url).info().items()
|
||||||
|
open2 = urllib2.urlopen(url).info().items()
|
||||||
|
assert open1 == open2
|
||||||
# Ensure that we've now cached a single response
|
# Ensure that we've now cached a single response
|
||||||
assert_cassette_has_one_response(cass)
|
assert_cassette_has_one_response(cass)
|
||||||
|
|
||||||
|
|
||||||
def test_multiple_requests(scheme, tmpdir):
|
def test_multiple_requests(scheme, tmpdir):
|
||||||
'''Ensure that we can cache multiple requests'''
|
'''Ensure that we can cache multiple requests'''
|
||||||
urls = [
|
urls = [
|
||||||
@@ -65,6 +72,7 @@ def test_multiple_requests(scheme, tmpdir):
|
|||||||
assert len(cass) == index + 1
|
assert len(cass) == index + 1
|
||||||
assert cass.play_count == index + 1
|
assert cass.play_count == index + 1
|
||||||
|
|
||||||
|
|
||||||
def test_get_data(scheme, tmpdir):
|
def test_get_data(scheme, tmpdir):
|
||||||
'''Ensure that it works with query data'''
|
'''Ensure that it works with query data'''
|
||||||
data = urlencode({'some': 1, 'data': 'here'})
|
data = urlencode({'some': 1, 'data': 'here'})
|
||||||
@@ -72,13 +80,14 @@ def test_get_data(scheme, tmpdir):
|
|||||||
with vcr.use_cassette(str(tmpdir.join('get_data.yaml'))) as cass:
|
with vcr.use_cassette(str(tmpdir.join('get_data.yaml'))) as cass:
|
||||||
# Ensure that this is empty to begin with
|
# Ensure that this is empty to begin with
|
||||||
assert_cassette_empty(cass)
|
assert_cassette_empty(cass)
|
||||||
res1 = urllib2.urlopen(url).read()
|
res1 = urllib2.urlopen(url).read()
|
||||||
res2 = urllib2.urlopen(url).read()
|
res2 = urllib2.urlopen(url).read()
|
||||||
assert res1 == res2
|
assert res1 == res2
|
||||||
# Ensure that we've now cached a single response
|
# Ensure that we've now cached a single response
|
||||||
assert len(cass) == 1
|
assert len(cass) == 1
|
||||||
assert cass.play_count == 1
|
assert cass.play_count == 1
|
||||||
|
|
||||||
|
|
||||||
def test_post_data(scheme, tmpdir):
|
def test_post_data(scheme, tmpdir):
|
||||||
'''Ensure that it works when posting data'''
|
'''Ensure that it works when posting data'''
|
||||||
data = urlencode({'some': 1, 'data': 'here'})
|
data = urlencode({'some': 1, 'data': 'here'})
|
||||||
@@ -86,12 +95,13 @@ def test_post_data(scheme, tmpdir):
|
|||||||
with vcr.use_cassette(str(tmpdir.join('post_data.yaml'))) as cass:
|
with vcr.use_cassette(str(tmpdir.join('post_data.yaml'))) as cass:
|
||||||
# Ensure that this is empty to begin with
|
# Ensure that this is empty to begin with
|
||||||
assert_cassette_empty(cass)
|
assert_cassette_empty(cass)
|
||||||
res1 = urllib2.urlopen(url, data).read()
|
res1 = urllib2.urlopen(url, data).read()
|
||||||
res2 = urllib2.urlopen(url, data).read()
|
res2 = urllib2.urlopen(url, data).read()
|
||||||
assert res1 == res2
|
assert res1 == res2
|
||||||
# Ensure that we've now cached a single response
|
# Ensure that we've now cached a single response
|
||||||
assert_cassette_has_one_response(cass)
|
assert_cassette_has_one_response(cass)
|
||||||
|
|
||||||
|
|
||||||
def test_post_unicode_data(scheme, tmpdir):
|
def test_post_unicode_data(scheme, tmpdir):
|
||||||
'''Ensure that it works when posting unicode data'''
|
'''Ensure that it works when posting unicode data'''
|
||||||
data = urlencode({'snowman': u'☃'.encode('utf-8')})
|
data = urlencode({'snowman': u'☃'.encode('utf-8')})
|
||||||
@@ -99,12 +109,13 @@ def test_post_unicode_data(scheme, tmpdir):
|
|||||||
with vcr.use_cassette(str(tmpdir.join('post_data.yaml'))) as cass:
|
with vcr.use_cassette(str(tmpdir.join('post_data.yaml'))) as cass:
|
||||||
# Ensure that this is empty to begin with
|
# Ensure that this is empty to begin with
|
||||||
assert_cassette_empty(cass)
|
assert_cassette_empty(cass)
|
||||||
res1 = urllib2.urlopen(url, data).read()
|
res1 = urllib2.urlopen(url, data).read()
|
||||||
res2 = urllib2.urlopen(url, data).read()
|
res2 = urllib2.urlopen(url, data).read()
|
||||||
assert res1 == res2
|
assert res1 == res2
|
||||||
# Ensure that we've now cached a single response
|
# Ensure that we've now cached a single response
|
||||||
assert_cassette_has_one_response(cass)
|
assert_cassette_has_one_response(cass)
|
||||||
|
|
||||||
|
|
||||||
def test_cross_scheme(tmpdir):
|
def test_cross_scheme(tmpdir):
|
||||||
'''Ensure that requests between schemes are treated separately'''
|
'''Ensure that requests between schemes are treated separately'''
|
||||||
# First fetch a url under https, and then again under https and then
|
# First fetch a url under https, and then again under https and then
|
||||||
|
|||||||
@@ -3,6 +3,7 @@ requests = pytest.importorskip("requests")
|
|||||||
|
|
||||||
import vcr
|
import vcr
|
||||||
|
|
||||||
|
|
||||||
def test_domain_redirect():
|
def test_domain_redirect():
|
||||||
'''Ensure that redirects across domains are considered unique'''
|
'''Ensure that redirects across domains are considered unique'''
|
||||||
# In this example, seomoz.org redirects to moz.com, and if those
|
# In this example, seomoz.org redirects to moz.com, and if those
|
||||||
@@ -14,4 +15,3 @@ def test_domain_redirect():
|
|||||||
# Ensure that we've now served two responses. One for the original
|
# Ensure that we've now served two responses. One for the original
|
||||||
# redirect, and a second for the actual fetch
|
# redirect, and a second for the actual fetch
|
||||||
assert len(cass) == 2
|
assert len(cass) == 2
|
||||||
|
|
||||||
|
|||||||
@@ -2,24 +2,28 @@ import pytest
|
|||||||
import yaml
|
import yaml
|
||||||
from vcr.cassette import Cassette
|
from vcr.cassette import Cassette
|
||||||
|
|
||||||
|
|
||||||
def test_cassette_load(tmpdir):
|
def test_cassette_load(tmpdir):
|
||||||
a_file = tmpdir.join('test_cassette.yml')
|
a_file = tmpdir.join('test_cassette.yml')
|
||||||
a_file.write(yaml.dump([
|
a_file.write(yaml.dump([
|
||||||
{'request':'foo', 'response':'bar'}
|
{'request': 'foo', 'response': 'bar'}
|
||||||
]))
|
]))
|
||||||
a_cassette = Cassette.load(str(a_file))
|
a_cassette = Cassette.load(str(a_file))
|
||||||
assert len(a_cassette) == 1
|
assert len(a_cassette) == 1
|
||||||
|
|
||||||
|
|
||||||
def test_cassette_not_played():
|
def test_cassette_not_played():
|
||||||
a = Cassette('test')
|
a = Cassette('test')
|
||||||
assert not a.play_count
|
assert not a.play_count
|
||||||
|
|
||||||
|
|
||||||
def test_cassette_played():
|
def test_cassette_played():
|
||||||
a = Cassette('test')
|
a = Cassette('test')
|
||||||
a.mark_played('foo')
|
a.mark_played('foo')
|
||||||
a.mark_played('foo')
|
a.mark_played('foo')
|
||||||
assert a.play_count == 2
|
assert a.play_count == 2
|
||||||
|
|
||||||
|
|
||||||
def test_cassette_play_counter():
|
def test_cassette_play_counter():
|
||||||
a = Cassette('test')
|
a = Cassette('test')
|
||||||
a.mark_played('foo')
|
a.mark_played('foo')
|
||||||
@@ -27,28 +31,33 @@ def test_cassette_play_counter():
|
|||||||
assert a.play_counts['foo'] == 1
|
assert a.play_counts['foo'] == 1
|
||||||
assert a.play_counts['bar'] == 1
|
assert a.play_counts['bar'] == 1
|
||||||
|
|
||||||
|
|
||||||
def test_cassette_append():
|
def test_cassette_append():
|
||||||
a = Cassette('test')
|
a = Cassette('test')
|
||||||
a.append('foo', 'bar')
|
a.append('foo', 'bar')
|
||||||
assert a.requests == ['foo']
|
assert a.requests == ['foo']
|
||||||
assert a.responses == ['bar']
|
assert a.responses == ['bar']
|
||||||
|
|
||||||
|
|
||||||
def test_cassette_len():
|
def test_cassette_len():
|
||||||
a = Cassette('test')
|
a = Cassette('test')
|
||||||
a.append('foo','bar')
|
a.append('foo', 'bar')
|
||||||
a.append('foo2','bar2')
|
a.append('foo2', 'bar2')
|
||||||
assert len(a) == 2
|
assert len(a) == 2
|
||||||
|
|
||||||
|
|
||||||
def test_cassette_contains():
|
def test_cassette_contains():
|
||||||
a = Cassette('test')
|
a = Cassette('test')
|
||||||
a.append('foo','bar')
|
a.append('foo', 'bar')
|
||||||
assert 'foo' in a
|
assert 'foo' in a
|
||||||
|
|
||||||
|
|
||||||
def test_cassette_response_of():
|
def test_cassette_response_of():
|
||||||
a = Cassette('test')
|
a = Cassette('test')
|
||||||
a.append('foo','bar')
|
a.append('foo', 'bar')
|
||||||
assert a.response_of('foo') == 'bar'
|
assert a.response_of('foo') == 'bar'
|
||||||
|
|
||||||
|
|
||||||
def test_cassette_get_missing_response():
|
def test_cassette_get_missing_response():
|
||||||
a = Cassette('test')
|
a = Cassette('test')
|
||||||
with pytest.raises(KeyError):
|
with pytest.raises(KeyError):
|
||||||
|
|||||||
@@ -1,9 +1,11 @@
|
|||||||
from vcr.request import Request
|
from vcr.request import Request
|
||||||
|
|
||||||
|
|
||||||
def test_url():
|
def test_url():
|
||||||
req = Request('http','www.google.com',80,'GET','/','',{})
|
req = Request('http', 'www.google.com', 80, 'GET', '/', '', {})
|
||||||
assert req.url == 'http://www.google.com/'
|
assert req.url == 'http://www.google.com/'
|
||||||
|
|
||||||
|
|
||||||
def test_str():
|
def test_str():
|
||||||
req = Request('http','www.google.com',80,'GET','/','',{})
|
req = Request('http', 'www.google.com', 80, 'GET', '/', '', {})
|
||||||
str(req) == '<Request (GET) http://www.google.com>'
|
str(req) == '<Request (GET) http://www.google.com>'
|
||||||
|
|||||||
@@ -1,6 +1,8 @@
|
|||||||
# Import Cassette to make it available at the top level
|
from config import VCR
|
||||||
from .cassette import Cassette
|
|
||||||
|
default_vcr = VCR()
|
||||||
|
|
||||||
|
|
||||||
# Also, make a 'load' function available
|
# Also, make a 'load' function available
|
||||||
def use_cassette(path):
|
def use_cassette(path, **kwargs):
|
||||||
return Cassette.load(path)
|
return default_vcr.use_cassette(path, **kwargs)
|
||||||
|
|||||||
@@ -1,7 +1,5 @@
|
|||||||
'''The container for recorded requests and responses'''
|
'''The container for recorded requests and responses'''
|
||||||
|
|
||||||
import os
|
|
||||||
import tempfile
|
|
||||||
try:
|
try:
|
||||||
from collections import Counter, OrderedDict
|
from collections import Counter, OrderedDict
|
||||||
except ImportError:
|
except ImportError:
|
||||||
@@ -11,25 +9,24 @@ except ImportError:
|
|||||||
# Internal imports
|
# Internal imports
|
||||||
from .patch import install, reset
|
from .patch import install, reset
|
||||||
from .persist import load_cassette, save_cassette
|
from .persist import load_cassette, save_cassette
|
||||||
|
from .serializers import yamlserializer
|
||||||
|
|
||||||
|
|
||||||
class Cassette(object):
|
class Cassette(object):
|
||||||
'''A container for recorded requests and responses'''
|
'''A container for recorded requests and responses'''
|
||||||
@classmethod
|
@classmethod
|
||||||
def load(cls, path):
|
def load(cls, path, **kwargs):
|
||||||
'''Load in the cassette stored at the provided path'''
|
'''Load in the cassette stored at the provided path'''
|
||||||
new_cassette = cls(path)
|
new_cassette = cls(path, **kwargs)
|
||||||
try:
|
new_cassette._load()
|
||||||
requests, responses = load_cassette(path)
|
|
||||||
for request, response in zip(requests, responses):
|
|
||||||
new_cassette.append(request, response)
|
|
||||||
except IOError:
|
|
||||||
pass
|
|
||||||
return new_cassette
|
return new_cassette
|
||||||
|
|
||||||
def __init__(self, path):
|
def __init__(self, path, serializer=yamlserializer):
|
||||||
self._path = path
|
self._path = path
|
||||||
|
self._serializer = serializer
|
||||||
self.data = OrderedDict()
|
self.data = OrderedDict()
|
||||||
self.play_counts = Counter()
|
self.play_counts = Counter()
|
||||||
|
self.dirty = False
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def play_count(self):
|
def play_count(self):
|
||||||
@@ -52,16 +49,40 @@ class Cassette(object):
|
|||||||
def append(self, request, response):
|
def append(self, request, response):
|
||||||
'''Add a request, response pair to this cassette'''
|
'''Add a request, response pair to this cassette'''
|
||||||
self.data[request] = response
|
self.data[request] = response
|
||||||
|
self.dirty = True
|
||||||
|
|
||||||
def response_of(self, request):
|
def response_of(self, request):
|
||||||
'''Find the response corresponding to a request'''
|
'''Find the response corresponding to a request'''
|
||||||
return self.data[request]
|
return self.data[request]
|
||||||
|
|
||||||
def _save(self):
|
def _as_dict(self):
|
||||||
save_cassette(self._path, self.requests, self.responses)
|
return {"requests": self.requests, "responses": self.responses}
|
||||||
|
|
||||||
|
def _save(self, force=False):
|
||||||
|
if force or self.dirty:
|
||||||
|
save_cassette(
|
||||||
|
self._path,
|
||||||
|
self._as_dict(),
|
||||||
|
serializer=self._serializer
|
||||||
|
)
|
||||||
|
self.dirty = False
|
||||||
|
|
||||||
|
def _load(self):
|
||||||
|
try:
|
||||||
|
requests, responses = load_cassette(
|
||||||
|
self._path,
|
||||||
|
serializer=self._serializer
|
||||||
|
)
|
||||||
|
for request, response in zip(requests, responses):
|
||||||
|
self.append(request, response)
|
||||||
|
self.dirty = False
|
||||||
|
except IOError:
|
||||||
|
pass
|
||||||
|
|
||||||
def __str__(self):
|
def __str__(self):
|
||||||
return "<Cassette containing {0} recorded response(s)>".format(len(self))
|
return "<Cassette containing {0} recorded response(s)>".format(
|
||||||
|
len(self)
|
||||||
|
)
|
||||||
|
|
||||||
def __len__(self):
|
def __len__(self):
|
||||||
'''Return the number of request,response pairs stored in here'''
|
'''Return the number of request,response pairs stored in here'''
|
||||||
|
|||||||
42
vcr/config.py
Normal file
42
vcr/config.py
Normal file
@@ -0,0 +1,42 @@
|
|||||||
|
import os
|
||||||
|
from .cassette import Cassette
|
||||||
|
from .serializers import yamlserializer, jsonserializer
|
||||||
|
|
||||||
|
|
||||||
|
class VCR(object):
|
||||||
|
def __init__(self, serializer='yaml', cassette_library_dir=None):
|
||||||
|
self.serializer = serializer
|
||||||
|
self.cassette_library_dir = cassette_library_dir
|
||||||
|
self.serializers = {
|
||||||
|
'yaml': yamlserializer,
|
||||||
|
'json': jsonserializer,
|
||||||
|
}
|
||||||
|
|
||||||
|
def _get_serializer(self, serializer_name):
|
||||||
|
try:
|
||||||
|
serializer = self.serializers[serializer_name]
|
||||||
|
except KeyError:
|
||||||
|
print "Serializer {0} doesn't exist or isn't registered".format(
|
||||||
|
serializer_name
|
||||||
|
)
|
||||||
|
raise KeyError
|
||||||
|
return serializer
|
||||||
|
|
||||||
|
def use_cassette(self, path, **kwargs):
|
||||||
|
serializer_name = kwargs.get('serializer', self.serializer)
|
||||||
|
cassette_library_dir = kwargs.get(
|
||||||
|
'cassette_library_dir',
|
||||||
|
self.cassette_library_dir
|
||||||
|
)
|
||||||
|
|
||||||
|
if cassette_library_dir:
|
||||||
|
path = os.path.join(cassette_library_dir, path)
|
||||||
|
|
||||||
|
merged_config = {
|
||||||
|
"serializer": self._get_serializer(serializer_name),
|
||||||
|
}
|
||||||
|
|
||||||
|
return Cassette.load(path, **merged_config)
|
||||||
|
|
||||||
|
def register_serializer(self, name, serializer):
|
||||||
|
self.serializers[name] = serializer
|
||||||
@@ -59,7 +59,7 @@ def reset():
|
|||||||
'''Undo all the patching'''
|
'''Undo all the patching'''
|
||||||
httplib.HTTPConnection = httplib.HTTP._connection_class = _HTTPConnection
|
httplib.HTTPConnection = httplib.HTTP._connection_class = _HTTPConnection
|
||||||
httplib.HTTPSConnection = httplib.HTTPS._connection_class = \
|
httplib.HTTPSConnection = httplib.HTTPS._connection_class = \
|
||||||
_HTTPSConnection
|
_HTTPSConnection
|
||||||
try:
|
try:
|
||||||
import requests.packages.urllib3.connectionpool as cpool
|
import requests.packages.urllib3.connectionpool as cpool
|
||||||
cpool.VerifiedHTTPSConnection = _VerifiedHTTPSConnection
|
cpool.VerifiedHTTPSConnection = _VerifiedHTTPSConnection
|
||||||
|
|||||||
@@ -1,44 +1,11 @@
|
|||||||
import tempfile
|
from .persisters.filesystem import FilesystemPersister
|
||||||
import os
|
|
||||||
import yaml
|
|
||||||
|
|
||||||
# Use the libYAML versions if possible
|
|
||||||
try:
|
|
||||||
from yaml import CLoader as Loader, CDumper as Dumper
|
|
||||||
except ImportError:
|
|
||||||
from yaml import Loader, Dumper
|
|
||||||
|
|
||||||
def _serialize_cassette(requests, responses):
|
def load_cassette(cassette_path, serializer):
|
||||||
'''Return a serializable version of the cassette'''
|
with open(cassette_path) as f:
|
||||||
return ([{
|
return serializer.deserialize(f.read())
|
||||||
'request': request,
|
|
||||||
'response': response,
|
|
||||||
} for request, response in zip(requests, responses)])
|
|
||||||
|
|
||||||
def _deserialize_cassette(data):
|
|
||||||
requests = [r['request'] for r in data]
|
|
||||||
responses = [r['response'] for r in data]
|
|
||||||
return requests, responses
|
|
||||||
|
|
||||||
def _secure_write(path, contents):
|
def save_cassette(cassette_path, cassette_dict, serializer):
|
||||||
"""
|
data = serializer.serialize(cassette_dict)
|
||||||
We'll overwrite the old version securely by writing out a temporary
|
FilesystemPersister.write(cassette_path, data)
|
||||||
version and then moving it to replace the old version
|
|
||||||
"""
|
|
||||||
dirname, filename = os.path.split(path)
|
|
||||||
fd, name = tempfile.mkstemp(dir=dirname, prefix=filename)
|
|
||||||
with os.fdopen(fd, 'w') as fout:
|
|
||||||
fout.write(contents)
|
|
||||||
os.rename(name, path)
|
|
||||||
|
|
||||||
def load_cassette(cassette_path):
|
|
||||||
data = yaml.load(open(cassette_path), Loader=Loader)
|
|
||||||
return _deserialize_cassette(data)
|
|
||||||
|
|
||||||
def save_cassette(cassette_path, requests, responses):
|
|
||||||
dirname, filename = os.path.split(cassette_path)
|
|
||||||
if dirname and not os.path.exists(dirname):
|
|
||||||
os.makedirs(dirname)
|
|
||||||
data = _serialize_cassette(requests, responses)
|
|
||||||
data = yaml.dump(data, Dumper=Dumper)
|
|
||||||
_secure_write(cassette_path, data)
|
|
||||||
|
|||||||
0
vcr/persisters/__init__.py
Normal file
0
vcr/persisters/__init__.py
Normal file
23
vcr/persisters/filesystem.py
Normal file
23
vcr/persisters/filesystem.py
Normal file
@@ -0,0 +1,23 @@
|
|||||||
|
import tempfile
|
||||||
|
import os
|
||||||
|
|
||||||
|
|
||||||
|
class FilesystemPersister(object):
|
||||||
|
@classmethod
|
||||||
|
def _secure_write(cls, path, contents):
|
||||||
|
"""
|
||||||
|
We'll overwrite the old version securely by writing out a temporary
|
||||||
|
version and then moving it to replace the old version
|
||||||
|
"""
|
||||||
|
dirname, filename = os.path.split(path)
|
||||||
|
fd, name = tempfile.mkstemp(dir=dirname, prefix=filename)
|
||||||
|
with os.fdopen(fd, 'w') as fout:
|
||||||
|
fout.write(contents)
|
||||||
|
os.rename(name, path)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def write(cls, cassette_path, data):
|
||||||
|
dirname, filename = os.path.split(cassette_path)
|
||||||
|
if dirname and not os.path.exists(dirname):
|
||||||
|
os.makedirs(dirname)
|
||||||
|
cls._secure_write(cassette_path, data)
|
||||||
@@ -7,7 +7,7 @@ class Request(object):
|
|||||||
self.method = method
|
self.method = method
|
||||||
self.path = path
|
self.path = path
|
||||||
self.body = body
|
self.body = body
|
||||||
# make haders a frozenset so it will be hashable
|
# make headers a frozenset so it will be hashable
|
||||||
self.headers = frozenset(headers.items())
|
self.headers = frozenset(headers.items())
|
||||||
|
|
||||||
@property
|
@property
|
||||||
@@ -15,7 +15,14 @@ class Request(object):
|
|||||||
return "{0}://{1}{2}".format(self.protocol, self.host, self.path)
|
return "{0}://{1}{2}".format(self.protocol, self.host, self.path)
|
||||||
|
|
||||||
def __key(self):
|
def __key(self):
|
||||||
return (self.host, self.port, self.method, self.path, self.body, self.headers)
|
return (
|
||||||
|
self.host,
|
||||||
|
self.port,
|
||||||
|
self.method,
|
||||||
|
self.path,
|
||||||
|
self.body,
|
||||||
|
self.headers
|
||||||
|
)
|
||||||
|
|
||||||
def __hash__(self):
|
def __hash__(self):
|
||||||
return hash(self.__key())
|
return hash(self.__key())
|
||||||
@@ -28,3 +35,18 @@ class Request(object):
|
|||||||
|
|
||||||
def __repr__(self):
|
def __repr__(self):
|
||||||
return self.__str__()
|
return self.__str__()
|
||||||
|
|
||||||
|
def _to_dict(self):
|
||||||
|
return {
|
||||||
|
'protocol': self.protocol,
|
||||||
|
'host': self.host,
|
||||||
|
'port': self.port,
|
||||||
|
'method': self.method,
|
||||||
|
'path': self.path,
|
||||||
|
'body': self.body,
|
||||||
|
'headers': self.headers,
|
||||||
|
}
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def _from_dict(cls, dct):
|
||||||
|
return Request(**dct)
|
||||||
|
|||||||
0
vcr/serializers/__init__.py
Normal file
0
vcr/serializers/__init__.py
Normal file
34
vcr/serializers/jsonserializer.py
Normal file
34
vcr/serializers/jsonserializer.py
Normal file
@@ -0,0 +1,34 @@
|
|||||||
|
from vcr.request import Request
|
||||||
|
try:
|
||||||
|
import simplejson as json
|
||||||
|
except ImportError:
|
||||||
|
import json
|
||||||
|
|
||||||
|
|
||||||
|
def _json_default(obj):
|
||||||
|
if isinstance(obj, frozenset):
|
||||||
|
return dict(obj)
|
||||||
|
return obj
|
||||||
|
|
||||||
|
|
||||||
|
def _fix_response_unicode(d):
|
||||||
|
d['body']['string'] = d['body']['string'].encode('utf-8')
|
||||||
|
return d
|
||||||
|
|
||||||
|
|
||||||
|
def deserialize(cassette_string):
|
||||||
|
data = json.loads(cassette_string)
|
||||||
|
requests = [Request._from_dict(r['request']) for r in data]
|
||||||
|
responses = [_fix_response_unicode(r['response']) for r in data]
|
||||||
|
return requests, responses
|
||||||
|
|
||||||
|
|
||||||
|
def serialize(cassette_dict):
|
||||||
|
data = ([{
|
||||||
|
'request': request._to_dict(),
|
||||||
|
'response': response,
|
||||||
|
} for request, response in zip(
|
||||||
|
cassette_dict['requests'],
|
||||||
|
cassette_dict['responses']
|
||||||
|
)])
|
||||||
|
return json.dumps(data, indent=4, default=_json_default)
|
||||||
25
vcr/serializers/yamlserializer.py
Normal file
25
vcr/serializers/yamlserializer.py
Normal file
@@ -0,0 +1,25 @@
|
|||||||
|
import yaml
|
||||||
|
|
||||||
|
# Use the libYAML versions if possible
|
||||||
|
try:
|
||||||
|
from yaml import CLoader as Loader, CDumper as Dumper
|
||||||
|
except ImportError:
|
||||||
|
from yaml import Loader, Dumper
|
||||||
|
|
||||||
|
|
||||||
|
def deserialize(cassette_string):
|
||||||
|
data = yaml.load(cassette_string, Loader=Loader)
|
||||||
|
requests = [r['request'] for r in data]
|
||||||
|
responses = [r['response'] for r in data]
|
||||||
|
return requests, responses
|
||||||
|
|
||||||
|
|
||||||
|
def serialize(cassette_dict):
|
||||||
|
data = ([{
|
||||||
|
'request': request,
|
||||||
|
'response': response,
|
||||||
|
} for request, response in zip(
|
||||||
|
cassette_dict['requests'],
|
||||||
|
cassette_dict['responses']
|
||||||
|
)])
|
||||||
|
return yaml.dump(data, Dumper=Dumper)
|
||||||
@@ -18,7 +18,7 @@ class VCRHTTPResponse(object):
|
|||||||
self._content = StringIO(self.recorded_response['body']['string'])
|
self._content = StringIO(self.recorded_response['body']['string'])
|
||||||
|
|
||||||
# We are skipping the header parsing (they have already been parsed
|
# We are skipping the header parsing (they have already been parsed
|
||||||
# at this point) and directly adding the headers to the header
|
# at this point) and directly adding the headers to the header
|
||||||
# container, so just pass an empty StringIO.
|
# container, so just pass an empty StringIO.
|
||||||
self.msg = HTTPMessage(StringIO(''))
|
self.msg = HTTPMessage(StringIO(''))
|
||||||
|
|
||||||
@@ -55,13 +55,13 @@ class VCRConnectionMixin:
|
|||||||
def request(self, method, url, body=None, headers=None):
|
def request(self, method, url, body=None, headers=None):
|
||||||
'''Persist the request metadata in self._vcr_request'''
|
'''Persist the request metadata in self._vcr_request'''
|
||||||
self._vcr_request = Request(
|
self._vcr_request = Request(
|
||||||
protocol = self._protocol,
|
protocol=self._protocol,
|
||||||
host = self.host,
|
host=self.host,
|
||||||
port = self.port,
|
port=self.port,
|
||||||
method = method,
|
method=method,
|
||||||
path = url,
|
path=url,
|
||||||
body = body,
|
body=body,
|
||||||
headers = headers or {}
|
headers=headers or {}
|
||||||
)
|
)
|
||||||
|
|
||||||
# Check if we have a cassette set, and if we have a response saved.
|
# Check if we have a cassette set, and if we have a response saved.
|
||||||
@@ -84,11 +84,14 @@ class VCRConnectionMixin:
|
|||||||
self.cassette.mark_played(self._vcr_request)
|
self.cassette.mark_played(self._vcr_request)
|
||||||
return VCRHTTPResponse(response)
|
return VCRHTTPResponse(response)
|
||||||
else:
|
else:
|
||||||
# Otherwise, we made an actual request, and should return the response
|
# Otherwise, we made an actual request, and should return the
|
||||||
# we got from the actual connection
|
# response we got from the actual connection
|
||||||
response = HTTPConnection.getresponse(self)
|
response = HTTPConnection.getresponse(self)
|
||||||
response = {
|
response = {
|
||||||
'status': {'code': response.status, 'message': response.reason},
|
'status': {
|
||||||
|
'code': response.status,
|
||||||
|
'message': response.reason
|
||||||
|
},
|
||||||
'headers': dict(response.getheaders()),
|
'headers': dict(response.getheaders()),
|
||||||
'body': {'string': response.read()},
|
'body': {'string': response.read()},
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user