mirror of
https://github.com/kevin1024/vcrpy.git
synced 2025-12-08 16:53:23 +00:00
pep8 fixes
This commit is contained in:
@@ -2,6 +2,7 @@ def assert_cassette_empty(cass):
|
||||
assert len(cass) == 0
|
||||
assert cass.play_count == 0
|
||||
|
||||
|
||||
def assert_cassette_has_one_response(cass):
|
||||
assert len(cass) == 1
|
||||
assert cass.play_count == 1
|
||||
|
||||
@@ -8,17 +8,19 @@ import urllib2
|
||||
# Internal imports
|
||||
import vcr
|
||||
|
||||
|
||||
def test_nonexistent_directory(tmpdir):
|
||||
'''If we load a cassette in a nonexistent directory, it can save ok'''
|
||||
# Check to make sure directory doesnt exist
|
||||
assert not os.path.exists(str(tmpdir.join('nonexistent')))
|
||||
|
||||
# Run VCR to create dir and cassette file
|
||||
with vcr.use_cassette(str(tmpdir.join('nonexistent','cassette.yml'))):
|
||||
with vcr.use_cassette(str(tmpdir.join('nonexistent', 'cassette.yml'))):
|
||||
urllib2.urlopen('http://httpbin.org/').read()
|
||||
|
||||
# This should have made the file and the directory
|
||||
assert os.path.exists(str(tmpdir.join('nonexistent','cassette.yml')))
|
||||
assert os.path.exists(str(tmpdir.join('nonexistent', 'cassette.yml')))
|
||||
|
||||
|
||||
def test_unpatch(tmpdir):
|
||||
'''Ensure that our cassette gets unpatched when we're done'''
|
||||
@@ -30,51 +32,69 @@ def test_unpatch(tmpdir):
|
||||
urllib2.urlopen('http://httpbin.org/').read()
|
||||
assert cass.play_count == 0
|
||||
|
||||
|
||||
def test_basic_use(tmpdir):
|
||||
'''
|
||||
Copied from the docs
|
||||
Copied from the docs
|
||||
'''
|
||||
with vcr.use_cassette('fixtures/vcr_cassettes/synopsis.yaml'):
|
||||
response = urllib2.urlopen('http://www.iana.org/domains/reserved').read()
|
||||
response = urllib2.urlopen(
|
||||
'http://www.iana.org/domains/reserved'
|
||||
).read()
|
||||
assert 'Example domains' in response
|
||||
|
||||
|
||||
def test_basic_json_use(tmpdir):
|
||||
'''Ensure you can load a json serialized cassette'''
|
||||
with vcr.use_cassette('fixtures/vcr_cassettes/synopsis.json', serializer='json'):
|
||||
response = urllib2.urlopen('http://www.iana.org/domains/reserved').read()
|
||||
assert 'Example domains' in response
|
||||
'''
|
||||
Ensure you can load a json serialized cassette
|
||||
'''
|
||||
test_fixture = 'fixtures/vcr_cassettes/synopsis.json'
|
||||
with vcr.use_cassette(test_fixture, serializer='json'):
|
||||
response = urllib2.urlopen('http://httpbin.org/').read()
|
||||
assert 'difficult sometimes' in response
|
||||
|
||||
|
||||
def test_patched_content(tmpdir):
|
||||
'''Ensure that what you pull from a cassette is what came from the request'''
|
||||
'''
|
||||
Ensure that what you pull from a cassette is what came from the
|
||||
request
|
||||
'''
|
||||
with vcr.use_cassette(str(tmpdir.join('synopsis.yaml'))) as cass:
|
||||
response = urllib2.urlopen('http://www.iana.org/domains/reserved').read()
|
||||
response = urllib2.urlopen('http://httpbin.org/').read()
|
||||
assert cass.play_count == 0
|
||||
|
||||
with vcr.use_cassette(str(tmpdir.join('synopsis.yaml'))) as cass:
|
||||
response2 = urllib2.urlopen('http://www.iana.org/domains/reserved').read()
|
||||
response2 = urllib2.urlopen('http://httpbin.org/').read()
|
||||
assert cass.play_count == 1
|
||||
cass._save(force=True)
|
||||
|
||||
with vcr.use_cassette(str(tmpdir.join('synopsis.yaml'))) as cass:
|
||||
response3 = urllib2.urlopen('http://www.iana.org/domains/reserved').read()
|
||||
response3 = urllib2.urlopen('http://httpbin.org/').read()
|
||||
assert cass.play_count == 1
|
||||
|
||||
assert response == response2
|
||||
assert response2 == response3
|
||||
|
||||
|
||||
def test_patched_content_json(tmpdir):
|
||||
'''Ensure that what you pull from a json cassette is what came from the request'''
|
||||
with vcr.use_cassette(str(tmpdir.join('synopsis.json')), serializer='json') as cass:
|
||||
response = urllib2.urlopen('http://www.iana.org/domains/reserved').read()
|
||||
'''
|
||||
Ensure that what you pull from a json cassette is what came from the
|
||||
request
|
||||
'''
|
||||
|
||||
testfile = str(tmpdir.join('synopsis.json'))
|
||||
|
||||
with vcr.use_cassette(testfile) as cass:
|
||||
response = urllib2.urlopen('http://httpbin.org/').read()
|
||||
assert cass.play_count == 0
|
||||
|
||||
with vcr.use_cassette(str(tmpdir.join('synopsis.json')), serializer='json') as cass:
|
||||
response2 = urllib2.urlopen('http://www.iana.org/domains/reserved').read()
|
||||
with vcr.use_cassette(testfile) as cass:
|
||||
response2 = urllib2.urlopen('http://httpbin.org/').read()
|
||||
assert cass.play_count == 1
|
||||
cass._save(force=True)
|
||||
|
||||
with vcr.use_cassette(str(tmpdir.join('synopsis.json')), serializer='json') as cass:
|
||||
response3 = urllib2.urlopen('http://www.iana.org/domains/reserved').read()
|
||||
with vcr.use_cassette(testfile) as cass:
|
||||
response3 = urllib2.urlopen('http://httpbin.org/').read()
|
||||
assert cass.play_count == 1
|
||||
|
||||
assert response == response2
|
||||
|
||||
@@ -3,6 +3,7 @@ import json
|
||||
import urllib2
|
||||
import vcr
|
||||
|
||||
|
||||
def test_set_serializer_default_config(tmpdir):
|
||||
my_vcr = vcr.VCR(serializer='json')
|
||||
|
||||
@@ -13,6 +14,7 @@ def test_set_serializer_default_config(tmpdir):
|
||||
with open(str(tmpdir.join('test.json'))) as f:
|
||||
assert json.loads(f.read())
|
||||
|
||||
|
||||
def test_default_set_cassette_library_dir(tmpdir):
|
||||
my_vcr = vcr.VCR(cassette_library_dir=str(tmpdir.join('subdir')))
|
||||
|
||||
@@ -21,12 +23,14 @@ def test_default_set_cassette_library_dir(tmpdir):
|
||||
|
||||
assert os.path.exists(str(tmpdir.join('subdir').join('test.json')))
|
||||
|
||||
|
||||
def test_override_set_cassette_library_dir(tmpdir):
|
||||
my_vcr = vcr.VCR(cassette_library_dir=str(tmpdir.join('subdir')))
|
||||
|
||||
with my_vcr.use_cassette('test.json', cassette_library_dir=str(tmpdir.join('subdir2'))):
|
||||
cld = str(tmpdir.join('subdir2'))
|
||||
|
||||
with my_vcr.use_cassette('test.json', cassette_library_dir=cld):
|
||||
urllib2.urlopen('http://httpbin.org/get')
|
||||
|
||||
assert os.path.exists(str(tmpdir.join('subdir2').join('test.json')))
|
||||
assert not os.path.exists(str(tmpdir.join('subdir').join('test.json')))
|
||||
|
||||
|
||||
@@ -9,8 +9,12 @@ import time
|
||||
# Internal imports
|
||||
import vcr
|
||||
|
||||
|
||||
def test_disk_saver_nowrite(tmpdir):
|
||||
'''Ensure that when you close a cassette without changing it it doesn't rewrite the file'''
|
||||
'''
|
||||
Ensure that when you close a cassette without changing it it doesn't
|
||||
rewrite the file
|
||||
'''
|
||||
fname = str(tmpdir.join('synopsis.yaml'))
|
||||
with vcr.use_cassette(fname) as cass:
|
||||
urllib2.urlopen('http://www.iana.org/domains/reserved').read()
|
||||
@@ -20,21 +24,26 @@ def test_disk_saver_nowrite(tmpdir):
|
||||
with vcr.use_cassette(fname) as cass:
|
||||
urllib2.urlopen('http://www.iana.org/domains/reserved').read()
|
||||
assert cass.play_count == 1
|
||||
assert cass.dirty == False
|
||||
assert cass.dirty is False
|
||||
last_mod2 = os.path.getmtime(fname)
|
||||
|
||||
assert last_mod == last_mod2
|
||||
|
||||
|
||||
def test_disk_saver_write(tmpdir):
|
||||
'''Ensure that when you close a cassette after changing it it does rewrite the file'''
|
||||
'''
|
||||
Ensure that when you close a cassette after changing it it does
|
||||
rewrite the file
|
||||
'''
|
||||
fname = str(tmpdir.join('synopsis.yaml'))
|
||||
with vcr.use_cassette(fname) as cass:
|
||||
urllib2.urlopen('http://www.iana.org/domains/reserved').read()
|
||||
assert cass.play_count == 0
|
||||
last_mod = os.path.getmtime(fname)
|
||||
|
||||
time.sleep(1) # Make sure at least 1 second passes, otherwise sometimes
|
||||
# Make sure at least 1 second passes, otherwise sometimes
|
||||
# the mtime doesn't change
|
||||
time.sleep(1)
|
||||
|
||||
with vcr.use_cassette(fname) as cass:
|
||||
urllib2.urlopen('http://www.iana.org/domains/reserved').read()
|
||||
@@ -44,4 +53,3 @@ def test_disk_saver_write(tmpdir):
|
||||
last_mod2 = os.path.getmtime(fname)
|
||||
|
||||
assert last_mod != last_mod2
|
||||
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
import urllib2
|
||||
import vcr
|
||||
|
||||
|
||||
class MockSerializer(object):
|
||||
def __init__(self):
|
||||
self.serialize_count = 0
|
||||
@@ -16,6 +17,7 @@ class MockSerializer(object):
|
||||
self.deserialize_count += 1
|
||||
return ""
|
||||
|
||||
|
||||
def test_registered_serializer(tmpdir):
|
||||
ms = MockSerializer()
|
||||
my_vcr = vcr.VCR()
|
||||
@@ -23,11 +25,11 @@ def test_registered_serializer(tmpdir):
|
||||
tmpdir.join('test.mock').write('test_data')
|
||||
with my_vcr.use_cassette(str(tmpdir.join('test.mock')), serializer='mock'):
|
||||
urllib2.urlopen('http://httpbin.org/')
|
||||
assert ms.serialize_count == 1 # Serializer deserialized once
|
||||
assert ms.cassette_string == 'test_data' # and serialized the test data string
|
||||
assert ms.deserialize_count == 0 # and hasn't serialized yet
|
||||
# Serializer deserialized once
|
||||
assert ms.serialize_count == 1
|
||||
# and serialized the test data string
|
||||
assert ms.cassette_string == 'test_data'
|
||||
# and hasn't serialized yet
|
||||
assert ms.deserialize_count == 0
|
||||
|
||||
assert ms.serialize_count == 1
|
||||
|
||||
|
||||
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
import urllib2
|
||||
import vcr
|
||||
|
||||
|
||||
def test_recorded_request_url_with_redirected_request(tmpdir):
|
||||
with vcr.use_cassette(str(tmpdir.join('test.yml'))) as cass:
|
||||
assert len(cass) == 0
|
||||
|
||||
@@ -2,18 +2,14 @@
|
||||
|
||||
# coding=utf-8
|
||||
|
||||
# Internal imports
|
||||
|
||||
import os
|
||||
import pytest
|
||||
|
||||
import vcr
|
||||
|
||||
from assertions import assert_cassette_empty, assert_cassette_has_one_response
|
||||
|
||||
requests = pytest.importorskip("requests")
|
||||
|
||||
@pytest.fixture(params=["https","http"])
|
||||
|
||||
@pytest.fixture(params=["https", "http"])
|
||||
def scheme(request):
|
||||
"""
|
||||
Fixture that returns both http and https
|
||||
@@ -31,6 +27,7 @@ def test_status_code(scheme, tmpdir):
|
||||
# Ensure that we've now cached a single response
|
||||
assert_cassette_has_one_response(cass)
|
||||
|
||||
|
||||
def test_headers(scheme, tmpdir):
|
||||
'''Ensure that we can read the headers back'''
|
||||
url = scheme + '://httpbin.org/'
|
||||
@@ -41,6 +38,7 @@ def test_headers(scheme, tmpdir):
|
||||
# Ensure that we've now cached a single response
|
||||
assert_cassette_has_one_response(cass)
|
||||
|
||||
|
||||
def test_body(tmpdir, scheme):
|
||||
'''Ensure the responses are all identical enough'''
|
||||
url = scheme + '://httpbin.org/bytes/1024'
|
||||
@@ -51,6 +49,7 @@ def test_body(tmpdir, scheme):
|
||||
# Ensure that we've now cached a single response
|
||||
assert_cassette_has_one_response(cass)
|
||||
|
||||
|
||||
def test_auth(tmpdir, scheme):
|
||||
'''Ensure that we can handle basic auth'''
|
||||
auth = ('user', 'passwd')
|
||||
@@ -65,6 +64,7 @@ def test_auth(tmpdir, scheme):
|
||||
# Ensure that we've now cached a single response
|
||||
assert_cassette_has_one_response(cass)
|
||||
|
||||
|
||||
def test_auth_failed(tmpdir, scheme):
|
||||
'''Ensure that we can save failed auth statuses'''
|
||||
auth = ('user', 'wrongwrongwrong')
|
||||
@@ -79,6 +79,7 @@ def test_auth_failed(tmpdir, scheme):
|
||||
# Ensure that we've now cached a single response
|
||||
assert_cassette_has_one_response(cass)
|
||||
|
||||
|
||||
def test_post(tmpdir, scheme):
|
||||
'''Ensure that we can post and cache the results'''
|
||||
data = {'key1': 'value1', 'key2': 'value2'}
|
||||
@@ -86,10 +87,13 @@ def test_post(tmpdir, scheme):
|
||||
with vcr.use_cassette(str(tmpdir.join('redirect.yaml'))) as cass:
|
||||
# Ensure that this is empty to begin with
|
||||
assert_cassette_empty(cass)
|
||||
assert requests.post(url, data).content == requests.post(url, data).content
|
||||
req1 = requests.post(url, data).content
|
||||
req2 = requests.post(url, data).content
|
||||
assert req1 == req2
|
||||
# Ensure that we've now cached a single response
|
||||
assert_cassette_has_one_response(cass)
|
||||
|
||||
|
||||
def test_redirects(tmpdir, scheme):
|
||||
'''Ensure that we can handle redirects'''
|
||||
url = scheme + '://httpbin.org/redirect-to?url=bytes/1024'
|
||||
@@ -102,6 +106,7 @@ def test_redirects(tmpdir, scheme):
|
||||
assert len(cass) == 2
|
||||
assert cass.play_count == 2
|
||||
|
||||
|
||||
def test_cross_scheme(tmpdir, scheme):
|
||||
'''Ensure that requests between schemes are treated separately'''
|
||||
# First fetch a url under http, and then again under https and then
|
||||
@@ -112,4 +117,3 @@ def test_cross_scheme(tmpdir, scheme):
|
||||
requests.get('http://httpbin.org/')
|
||||
assert cass.play_count == 0
|
||||
assert len(cass) == 2
|
||||
|
||||
|
||||
@@ -12,13 +12,15 @@ import vcr
|
||||
|
||||
from assertions import assert_cassette_empty, assert_cassette_has_one_response
|
||||
|
||||
@pytest.fixture(params=["https","http"])
|
||||
|
||||
@pytest.fixture(params=["https", "http"])
|
||||
def scheme(request):
|
||||
"""
|
||||
Fixture that returns both http and https
|
||||
"""
|
||||
return request.param
|
||||
|
||||
|
||||
def test_response_code(scheme, tmpdir):
|
||||
'''Ensure we can read a response code from a fetch'''
|
||||
url = scheme + '://httpbin.org/'
|
||||
@@ -29,6 +31,7 @@ def test_response_code(scheme, tmpdir):
|
||||
# Ensure that we've now cached a single response
|
||||
assert_cassette_has_one_response(cass)
|
||||
|
||||
|
||||
def test_random_body(scheme, tmpdir):
|
||||
'''Ensure we can read the content, and that it's served from cache'''
|
||||
url = scheme + '://httpbin.org/bytes/1024'
|
||||
@@ -39,16 +42,20 @@ def test_random_body(scheme, tmpdir):
|
||||
# Ensure that we've now cached a single response
|
||||
assert_cassette_has_one_response(cass)
|
||||
|
||||
|
||||
def test_response_headers(scheme, tmpdir):
|
||||
'''Ensure we can get information from the response'''
|
||||
url = scheme + '://httpbin.org/'
|
||||
with vcr.use_cassette(str(tmpdir.join('headers.yaml'))) as cass:
|
||||
# Ensure that this is empty to begin with
|
||||
assert_cassette_empty(cass)
|
||||
assert urllib2.urlopen(url).info().items() == urllib2.urlopen(url).info().items()
|
||||
open1 = urllib2.urlopen(url).info().items()
|
||||
open2 = urllib2.urlopen(url).info().items()
|
||||
assert open1 == open2
|
||||
# Ensure that we've now cached a single response
|
||||
assert_cassette_has_one_response(cass)
|
||||
|
||||
|
||||
def test_multiple_requests(scheme, tmpdir):
|
||||
'''Ensure that we can cache multiple requests'''
|
||||
urls = [
|
||||
@@ -65,6 +72,7 @@ def test_multiple_requests(scheme, tmpdir):
|
||||
assert len(cass) == index + 1
|
||||
assert cass.play_count == index + 1
|
||||
|
||||
|
||||
def test_get_data(scheme, tmpdir):
|
||||
'''Ensure that it works with query data'''
|
||||
data = urlencode({'some': 1, 'data': 'here'})
|
||||
@@ -72,13 +80,14 @@ def test_get_data(scheme, tmpdir):
|
||||
with vcr.use_cassette(str(tmpdir.join('get_data.yaml'))) as cass:
|
||||
# Ensure that this is empty to begin with
|
||||
assert_cassette_empty(cass)
|
||||
res1 = urllib2.urlopen(url).read()
|
||||
res1 = urllib2.urlopen(url).read()
|
||||
res2 = urllib2.urlopen(url).read()
|
||||
assert res1 == res2
|
||||
# Ensure that we've now cached a single response
|
||||
assert len(cass) == 1
|
||||
assert cass.play_count == 1
|
||||
|
||||
|
||||
def test_post_data(scheme, tmpdir):
|
||||
'''Ensure that it works when posting data'''
|
||||
data = urlencode({'some': 1, 'data': 'here'})
|
||||
@@ -86,12 +95,13 @@ def test_post_data(scheme, tmpdir):
|
||||
with vcr.use_cassette(str(tmpdir.join('post_data.yaml'))) as cass:
|
||||
# Ensure that this is empty to begin with
|
||||
assert_cassette_empty(cass)
|
||||
res1 = urllib2.urlopen(url, data).read()
|
||||
res1 = urllib2.urlopen(url, data).read()
|
||||
res2 = urllib2.urlopen(url, data).read()
|
||||
assert res1 == res2
|
||||
# Ensure that we've now cached a single response
|
||||
assert_cassette_has_one_response(cass)
|
||||
|
||||
|
||||
def test_post_unicode_data(scheme, tmpdir):
|
||||
'''Ensure that it works when posting unicode data'''
|
||||
data = urlencode({'snowman': u'☃'.encode('utf-8')})
|
||||
@@ -99,12 +109,13 @@ def test_post_unicode_data(scheme, tmpdir):
|
||||
with vcr.use_cassette(str(tmpdir.join('post_data.yaml'))) as cass:
|
||||
# Ensure that this is empty to begin with
|
||||
assert_cassette_empty(cass)
|
||||
res1 = urllib2.urlopen(url, data).read()
|
||||
res1 = urllib2.urlopen(url, data).read()
|
||||
res2 = urllib2.urlopen(url, data).read()
|
||||
assert res1 == res2
|
||||
# Ensure that we've now cached a single response
|
||||
assert_cassette_has_one_response(cass)
|
||||
|
||||
|
||||
def test_cross_scheme(tmpdir):
|
||||
'''Ensure that requests between schemes are treated separately'''
|
||||
# First fetch a url under https, and then again under https and then
|
||||
|
||||
@@ -3,6 +3,7 @@ requests = pytest.importorskip("requests")
|
||||
|
||||
import vcr
|
||||
|
||||
|
||||
def test_domain_redirect():
|
||||
'''Ensure that redirects across domains are considered unique'''
|
||||
# In this example, seomoz.org redirects to moz.com, and if those
|
||||
@@ -14,4 +15,3 @@ def test_domain_redirect():
|
||||
# Ensure that we've now served two responses. One for the original
|
||||
# redirect, and a second for the actual fetch
|
||||
assert len(cass) == 2
|
||||
|
||||
|
||||
@@ -2,24 +2,28 @@ import pytest
|
||||
import yaml
|
||||
from vcr.cassette import Cassette
|
||||
|
||||
|
||||
def test_cassette_load(tmpdir):
|
||||
a_file = tmpdir.join('test_cassette.yml')
|
||||
a_file.write(yaml.dump([
|
||||
{'request':'foo', 'response':'bar'}
|
||||
{'request': 'foo', 'response': 'bar'}
|
||||
]))
|
||||
a_cassette = Cassette.load(str(a_file))
|
||||
assert len(a_cassette) == 1
|
||||
|
||||
|
||||
def test_cassette_not_played():
|
||||
a = Cassette('test')
|
||||
assert not a.play_count
|
||||
|
||||
|
||||
def test_cassette_played():
|
||||
a = Cassette('test')
|
||||
a.mark_played('foo')
|
||||
a.mark_played('foo')
|
||||
assert a.play_count == 2
|
||||
|
||||
|
||||
def test_cassette_play_counter():
|
||||
a = Cassette('test')
|
||||
a.mark_played('foo')
|
||||
@@ -27,28 +31,33 @@ def test_cassette_play_counter():
|
||||
assert a.play_counts['foo'] == 1
|
||||
assert a.play_counts['bar'] == 1
|
||||
|
||||
|
||||
def test_cassette_append():
|
||||
a = Cassette('test')
|
||||
a.append('foo', 'bar')
|
||||
assert a.requests == ['foo']
|
||||
assert a.responses == ['bar']
|
||||
|
||||
|
||||
def test_cassette_len():
|
||||
a = Cassette('test')
|
||||
a.append('foo','bar')
|
||||
a.append('foo2','bar2')
|
||||
a.append('foo', 'bar')
|
||||
a.append('foo2', 'bar2')
|
||||
assert len(a) == 2
|
||||
|
||||
|
||||
def test_cassette_contains():
|
||||
a = Cassette('test')
|
||||
a.append('foo','bar')
|
||||
a.append('foo', 'bar')
|
||||
assert 'foo' in a
|
||||
|
||||
|
||||
def test_cassette_response_of():
|
||||
a = Cassette('test')
|
||||
a.append('foo','bar')
|
||||
a.append('foo', 'bar')
|
||||
assert a.response_of('foo') == 'bar'
|
||||
|
||||
|
||||
def test_cassette_get_missing_response():
|
||||
a = Cassette('test')
|
||||
with pytest.raises(KeyError):
|
||||
|
||||
@@ -1,9 +1,11 @@
|
||||
from vcr.request import Request
|
||||
|
||||
|
||||
def test_url():
|
||||
req = Request('http','www.google.com',80,'GET','/','',{})
|
||||
req = Request('http', 'www.google.com', 80, 'GET', '/', '', {})
|
||||
assert req.url == 'http://www.google.com/'
|
||||
|
||||
|
||||
def test_str():
|
||||
req = Request('http','www.google.com',80,'GET','/','',{})
|
||||
req = Request('http', 'www.google.com', 80, 'GET', '/', '', {})
|
||||
str(req) == '<Request (GET) http://www.google.com>'
|
||||
|
||||
@@ -2,6 +2,7 @@ from config import VCR
|
||||
|
||||
default_vcr = VCR()
|
||||
|
||||
|
||||
# Also, make a 'load' function available
|
||||
def use_cassette(path, **kwargs):
|
||||
return default_vcr.use_cassette(path, **kwargs)
|
||||
|
||||
@@ -11,6 +11,7 @@ from .patch import install, reset
|
||||
from .persist import load_cassette, save_cassette
|
||||
from .serializers import yamlserializer
|
||||
|
||||
|
||||
class Cassette(object):
|
||||
'''A container for recorded requests and responses'''
|
||||
@classmethod
|
||||
@@ -59,12 +60,19 @@ class Cassette(object):
|
||||
|
||||
def _save(self, force=False):
|
||||
if force or self.dirty:
|
||||
save_cassette(self._path, self._as_dict(), serializer=self._serializer)
|
||||
save_cassette(
|
||||
self._path,
|
||||
self._as_dict(),
|
||||
serializer=self._serializer
|
||||
)
|
||||
self.dirty = False
|
||||
|
||||
def _load(self):
|
||||
try:
|
||||
requests, responses = load_cassette(self._path, serializer=self._serializer)
|
||||
requests, responses = load_cassette(
|
||||
self._path,
|
||||
serializer=self._serializer
|
||||
)
|
||||
for request, response in zip(requests, responses):
|
||||
self.append(request, response)
|
||||
self.dirty = False
|
||||
@@ -72,7 +80,9 @@ class Cassette(object):
|
||||
pass
|
||||
|
||||
def __str__(self):
|
||||
return "<Cassette containing {0} recorded response(s)>".format(len(self))
|
||||
return "<Cassette containing {0} recorded response(s)>".format(
|
||||
len(self)
|
||||
)
|
||||
|
||||
def __len__(self):
|
||||
'''Return the number of request,response pairs stored in here'''
|
||||
|
||||
@@ -2,6 +2,7 @@ import os
|
||||
from .cassette import Cassette
|
||||
from .serializers import yamlserializer, jsonserializer
|
||||
|
||||
|
||||
class VCR(object):
|
||||
def __init__(self, serializer='yaml', cassette_library_dir=None):
|
||||
self.serializer = serializer
|
||||
@@ -15,13 +16,18 @@ class VCR(object):
|
||||
try:
|
||||
serializer = self.serializers[serializer_name]
|
||||
except KeyError:
|
||||
print "Serializer {0} doesn't exist or isn't registered".format(serializer_name)
|
||||
print "Serializer {0} doesn't exist or isn't registered".format(
|
||||
serializer_name
|
||||
)
|
||||
raise KeyError
|
||||
return serializer
|
||||
|
||||
def use_cassette(self, path, **kwargs):
|
||||
serializer_name = kwargs.get('serializer', self.serializer)
|
||||
cassette_library_dir = kwargs.get('cassette_library_dir', self.cassette_library_dir)
|
||||
cassette_library_dir = kwargs.get(
|
||||
'cassette_library_dir',
|
||||
self.cassette_library_dir
|
||||
)
|
||||
|
||||
if cassette_library_dir:
|
||||
path = os.path.join(cassette_library_dir, path)
|
||||
@@ -34,4 +40,3 @@ class VCR(object):
|
||||
|
||||
def register_serializer(self, name, serializer):
|
||||
self.serializers[name] = serializer
|
||||
|
||||
|
||||
@@ -59,7 +59,7 @@ def reset():
|
||||
'''Undo all the patching'''
|
||||
httplib.HTTPConnection = httplib.HTTP._connection_class = _HTTPConnection
|
||||
httplib.HTTPSConnection = httplib.HTTPS._connection_class = \
|
||||
_HTTPSConnection
|
||||
_HTTPSConnection
|
||||
try:
|
||||
import requests.packages.urllib3.connectionpool as cpool
|
||||
cpool.VerifiedHTTPSConnection = _VerifiedHTTPSConnection
|
||||
|
||||
@@ -1,9 +1,11 @@
|
||||
from .persisters.filesystem import FilesystemPersister
|
||||
|
||||
|
||||
def load_cassette(cassette_path, serializer):
|
||||
with open(cassette_path) as f:
|
||||
return serializer.deserialize(f.read())
|
||||
|
||||
|
||||
def save_cassette(cassette_path, cassette_dict, serializer):
|
||||
data = serializer.serialize(cassette_dict)
|
||||
FilesystemPersister.write(cassette_path, data)
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
import tempfile
|
||||
import os
|
||||
|
||||
|
||||
class FilesystemPersister(object):
|
||||
@classmethod
|
||||
def _secure_write(cls, path, contents):
|
||||
|
||||
@@ -15,7 +15,14 @@ class Request(object):
|
||||
return "{0}://{1}{2}".format(self.protocol, self.host, self.path)
|
||||
|
||||
def __key(self):
|
||||
return (self.host, self.port, self.method, self.path, self.body, self.headers)
|
||||
return (
|
||||
self.host,
|
||||
self.port,
|
||||
self.method,
|
||||
self.path,
|
||||
self.body,
|
||||
self.headers
|
||||
)
|
||||
|
||||
def __hash__(self):
|
||||
return hash(self.__key())
|
||||
|
||||
@@ -4,14 +4,17 @@ try:
|
||||
except ImportError:
|
||||
import json
|
||||
|
||||
|
||||
def _json_default(obj):
|
||||
if isinstance(obj, frozenset):
|
||||
return dict(obj)
|
||||
return obj
|
||||
|
||||
|
||||
def _fix_response_unicode(d):
|
||||
d['body']['string'] = d['body']['string'].encode('utf-8')
|
||||
return d
|
||||
d['body']['string'] = d['body']['string'].encode('utf-8')
|
||||
return d
|
||||
|
||||
|
||||
def deserialize(cassette_string):
|
||||
data = json.loads(cassette_string)
|
||||
@@ -19,9 +22,13 @@ def deserialize(cassette_string):
|
||||
responses = [_fix_response_unicode(r['response']) for r in data]
|
||||
return requests, responses
|
||||
|
||||
|
||||
def serialize(cassette_dict):
|
||||
data = ([{
|
||||
'request': request._to_dict(),
|
||||
'response': response,
|
||||
} for request, response in zip(cassette_dict['requests'], cassette_dict['responses'])])
|
||||
} for request, response in zip(
|
||||
cassette_dict['requests'],
|
||||
cassette_dict['responses']
|
||||
)])
|
||||
return json.dumps(data, indent=4, default=_json_default)
|
||||
|
||||
@@ -18,5 +18,8 @@ def serialize(cassette_dict):
|
||||
data = ([{
|
||||
'request': request,
|
||||
'response': response,
|
||||
} for request, response in zip(cassette_dict['requests'], cassette_dict['responses'])])
|
||||
} for request, response in zip(
|
||||
cassette_dict['requests'],
|
||||
cassette_dict['responses']
|
||||
)])
|
||||
return yaml.dump(data, Dumper=Dumper)
|
||||
|
||||
@@ -18,7 +18,7 @@ class VCRHTTPResponse(object):
|
||||
self._content = StringIO(self.recorded_response['body']['string'])
|
||||
|
||||
# We are skipping the header parsing (they have already been parsed
|
||||
# at this point) and directly adding the headers to the header
|
||||
# at this point) and directly adding the headers to the header
|
||||
# container, so just pass an empty StringIO.
|
||||
self.msg = HTTPMessage(StringIO(''))
|
||||
|
||||
@@ -55,13 +55,13 @@ class VCRConnectionMixin:
|
||||
def request(self, method, url, body=None, headers=None):
|
||||
'''Persist the request metadata in self._vcr_request'''
|
||||
self._vcr_request = Request(
|
||||
protocol = self._protocol,
|
||||
host = self.host,
|
||||
port = self.port,
|
||||
method = method,
|
||||
path = url,
|
||||
body = body,
|
||||
headers = headers or {}
|
||||
protocol=self._protocol,
|
||||
host=self.host,
|
||||
port=self.port,
|
||||
method=method,
|
||||
path=url,
|
||||
body=body,
|
||||
headers=headers or {}
|
||||
)
|
||||
|
||||
# Check if we have a cassette set, and if we have a response saved.
|
||||
@@ -84,11 +84,14 @@ class VCRConnectionMixin:
|
||||
self.cassette.mark_played(self._vcr_request)
|
||||
return VCRHTTPResponse(response)
|
||||
else:
|
||||
# Otherwise, we made an actual request, and should return the response
|
||||
# we got from the actual connection
|
||||
# Otherwise, we made an actual request, and should return the
|
||||
# response we got from the actual connection
|
||||
response = HTTPConnection.getresponse(self)
|
||||
response = {
|
||||
'status': {'code': response.status, 'message': response.reason},
|
||||
'status': {
|
||||
'code': response.status,
|
||||
'message': response.reason
|
||||
},
|
||||
'headers': dict(response.getheaders()),
|
||||
'body': {'string': response.read()},
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user