1
0
mirror of https://github.com/kevin1024/vcrpy.git synced 2025-12-08 16:53:23 +00:00

Add Python 2.3 support

This commit also adds the 'six' dependency
This commit is contained in:
Åsmund Grammeltvedt
2014-02-02 21:53:12 +01:00
committed by Kevin McCarthy
parent 2385176084
commit a73da71159
25 changed files with 296 additions and 122 deletions

View File

@@ -8,6 +8,7 @@ env:
python:
- 2.6
- 2.7
- 3.3
- pypy
install:
- pip install PyYAML pytest --use-mirrors

View File

@@ -18,7 +18,7 @@ to do is delete your existing cassette files, and run your tests again.
All of the mocked responses will be updated with the new API.
## Compatibility Notes
This should work with Python 2.6 and 2.7, and [pypy](http://pypy.org).
VCR.py officially supports Python 2.6 and 2.7, 3.3, and [pypy](http://pypy.org).
Currently I've only tested this with urllib2, urllib3, and requests. It's known to *NOT WORK* with urllib.

View File

@@ -37,7 +37,7 @@ setup(name='vcrpy',
'vcr.compat': 'vcr/compat',
'vcr.persisters': 'vcr/persisters',
},
install_requires=['PyYAML','contextdecorator'],
install_requires=['PyYAML','contextdecorator','six'],
license='MIT',
tests_require=['pytest','mock'],
cmdclass={'test': PyTest},
@@ -46,6 +46,7 @@ setup(name='vcrpy',
'Environment :: Console',
'Intended Audience :: Developers',
'Programming Language :: Python',
'Programming Language :: Python :: 3',
'Topic :: Software Development :: Testing',
'Topic :: Internet :: WWW/HTTP',
'License :: OSI Approved :: MIT License',

View File

@@ -13,7 +13,7 @@ def assert_cassette_has_one_response(cass):
def assert_is_json(a_string):
try:
json.loads(a_string)
json.loads(a_string.decode('utf-8'))
except Exception:
assert False
assert True

View File

@@ -3,10 +3,10 @@
# External imports
import os
import urllib2
# Internal imports
import vcr
from vcr._compat import urlopen
def test_nonexistent_directory(tmpdir):
@@ -16,7 +16,7 @@ def test_nonexistent_directory(tmpdir):
# Run VCR to create dir and cassette file
with vcr.use_cassette(str(tmpdir.join('nonexistent', 'cassette.yml'))):
urllib2.urlopen('http://httpbin.org/').read()
urlopen('http://httpbin.org/').read()
# This should have made the file and the directory
assert os.path.exists(str(tmpdir.join('nonexistent', 'cassette.yml')))
@@ -25,11 +25,11 @@ def test_nonexistent_directory(tmpdir):
def test_unpatch(tmpdir):
'''Ensure that our cassette gets unpatched when we're done'''
with vcr.use_cassette(str(tmpdir.join('unpatch.yaml'))) as cass:
urllib2.urlopen('http://httpbin.org/').read()
urlopen('http://httpbin.org/').read()
# Make the same request, and assert that we haven't served any more
# requests out of cache
urllib2.urlopen('http://httpbin.org/').read()
urlopen('http://httpbin.org/').read()
assert cass.play_count == 0
@@ -38,10 +38,10 @@ def test_basic_use(tmpdir):
Copied from the docs
'''
with vcr.use_cassette('fixtures/vcr_cassettes/synopsis.yaml'):
response = urllib2.urlopen(
response = urlopen(
'http://www.iana.org/domains/reserved'
).read()
assert 'Example domains' in response
assert b'Example domains' in response
def test_basic_json_use(tmpdir):
@@ -50,8 +50,8 @@ def test_basic_json_use(tmpdir):
'''
test_fixture = 'fixtures/vcr_cassettes/synopsis.json'
with vcr.use_cassette(test_fixture, serializer='json'):
response = urllib2.urlopen('http://httpbin.org/').read()
assert 'difficult sometimes' in response
response = urlopen('http://httpbin.org/').read()
assert b'difficult sometimes' in response
def test_patched_content(tmpdir):
@@ -60,16 +60,16 @@ def test_patched_content(tmpdir):
request
'''
with vcr.use_cassette(str(tmpdir.join('synopsis.yaml'))) as cass:
response = urllib2.urlopen('http://httpbin.org/').read()
response = urlopen('http://httpbin.org/').read()
assert cass.play_count == 0
with vcr.use_cassette(str(tmpdir.join('synopsis.yaml'))) as cass:
response2 = urllib2.urlopen('http://httpbin.org/').read()
response2 = urlopen('http://httpbin.org/').read()
assert cass.play_count == 1
cass._save(force=True)
with vcr.use_cassette(str(tmpdir.join('synopsis.yaml'))) as cass:
response3 = urllib2.urlopen('http://httpbin.org/').read()
response3 = urlopen('http://httpbin.org/').read()
assert cass.play_count == 1
assert response == response2
@@ -85,16 +85,16 @@ def test_patched_content_json(tmpdir):
testfile = str(tmpdir.join('synopsis.json'))
with vcr.use_cassette(testfile) as cass:
response = urllib2.urlopen('http://httpbin.org/').read()
response = urlopen('http://httpbin.org/').read()
assert cass.play_count == 0
with vcr.use_cassette(testfile) as cass:
response2 = urllib2.urlopen('http://httpbin.org/').read()
response2 = urlopen('http://httpbin.org/').read()
assert cass.play_count == 1
cass._save(force=True)
with vcr.use_cassette(testfile) as cass:
response3 = urllib2.urlopen('http://httpbin.org/').read()
response3 = urlopen('http://httpbin.org/').read()
assert cass.play_count == 1
assert response == response2

View File

@@ -1,8 +1,8 @@
import os
import json
import urllib2
import pytest
import vcr
from vcr._compat import urlopen
def test_set_serializer_default_config(tmpdir):
@@ -10,7 +10,7 @@ def test_set_serializer_default_config(tmpdir):
with my_vcr.use_cassette(str(tmpdir.join('test.json'))):
assert my_vcr.serializer == 'json'
urllib2.urlopen('http://httpbin.org/get')
urlopen('http://httpbin.org/get')
with open(str(tmpdir.join('test.json'))) as f:
assert json.loads(f.read())
@@ -20,7 +20,7 @@ def test_default_set_cassette_library_dir(tmpdir):
my_vcr = vcr.VCR(cassette_library_dir=str(tmpdir.join('subdir')))
with my_vcr.use_cassette('test.json'):
urllib2.urlopen('http://httpbin.org/get')
urlopen('http://httpbin.org/get')
assert os.path.exists(str(tmpdir.join('subdir').join('test.json')))
@@ -31,7 +31,7 @@ def test_override_set_cassette_library_dir(tmpdir):
cld = str(tmpdir.join('subdir2'))
with my_vcr.use_cassette('test.json', cassette_library_dir=cld):
urllib2.urlopen('http://httpbin.org/get')
urlopen('http://httpbin.org/get')
assert os.path.exists(str(tmpdir.join('subdir2').join('test.json')))
assert not os.path.exists(str(tmpdir.join('subdir').join('test.json')))
@@ -41,10 +41,10 @@ def test_override_match_on(tmpdir):
my_vcr = vcr.VCR(match_on=['method'])
with my_vcr.use_cassette(str(tmpdir.join('test.json'))):
urllib2.urlopen('http://httpbin.org/')
urlopen('http://httpbin.org/')
with my_vcr.use_cassette(str(tmpdir.join('test.json'))) as cass:
urllib2.urlopen('http://httpbin.org/get')
urlopen('http://httpbin.org/get')
assert len(cass) == 1
assert cass.play_count == 1

View File

@@ -3,11 +3,11 @@
# External imports
import os
import urllib2
import time
# Internal imports
import vcr
from vcr._compat import urlopen
def test_disk_saver_nowrite(tmpdir):
@@ -17,12 +17,12 @@ def test_disk_saver_nowrite(tmpdir):
'''
fname = str(tmpdir.join('synopsis.yaml'))
with vcr.use_cassette(fname) as cass:
urllib2.urlopen('http://www.iana.org/domains/reserved').read()
urlopen('http://www.iana.org/domains/reserved').read()
assert cass.play_count == 0
last_mod = os.path.getmtime(fname)
with vcr.use_cassette(fname) as cass:
urllib2.urlopen('http://www.iana.org/domains/reserved').read()
urlopen('http://www.iana.org/domains/reserved').read()
assert cass.play_count == 1
assert cass.dirty is False
last_mod2 = os.path.getmtime(fname)
@@ -37,7 +37,7 @@ def test_disk_saver_write(tmpdir):
'''
fname = str(tmpdir.join('synopsis.yaml'))
with vcr.use_cassette(fname) as cass:
urllib2.urlopen('http://www.iana.org/domains/reserved').read()
urlopen('http://www.iana.org/domains/reserved').read()
assert cass.play_count == 0
last_mod = os.path.getmtime(fname)
@@ -46,8 +46,8 @@ def test_disk_saver_write(tmpdir):
time.sleep(1)
with vcr.use_cassette(fname, record_mode='any') as cass:
urllib2.urlopen('http://www.iana.org/domains/reserved').read()
urllib2.urlopen('http://httpbin.org/').read()
urlopen('http://www.iana.org/domains/reserved').read()
urlopen('http://httpbin.org/').read()
assert cass.play_count == 1
assert cass.dirty
last_mod2 = os.path.getmtime(fname)

View File

@@ -1,6 +1,6 @@
import pytest
from urllib2 import urlopen
import vcr
from vcr._compat import urlopen
def test_making_extra_request_raises_exception(tmpdir):

View File

@@ -1,46 +1,46 @@
import os
import urllib2
import pytest
import vcr
from vcr._compat import urlopen
def test_once_record_mode(tmpdir):
testfile = str(tmpdir.join('recordmode.yml'))
with vcr.use_cassette(testfile, record_mode="once"):
# cassette file doesn't exist, so create.
response = urllib2.urlopen('http://httpbin.org/').read()
response = urlopen('http://httpbin.org/').read()
with vcr.use_cassette(testfile, record_mode="once") as cass:
# make the same request again
response = urllib2.urlopen('http://httpbin.org/').read()
response = urlopen('http://httpbin.org/').read()
# the first time, it's played from the cassette.
# but, try to access something else from the same cassette, and an
# exception is raised.
with pytest.raises(Exception):
response = urllib2.urlopen('http://httpbin.org/get').read()
response = urlopen('http://httpbin.org/get').read()
def test_once_record_mode_two_times(tmpdir):
testfile = str(tmpdir.join('recordmode.yml'))
with vcr.use_cassette(testfile, record_mode="once"):
# get two of the same file
response1 = urllib2.urlopen('http://httpbin.org/').read()
response2 = urllib2.urlopen('http://httpbin.org/').read()
response1 = urlopen('http://httpbin.org/').read()
response2 = urlopen('http://httpbin.org/').read()
with vcr.use_cassette(testfile, record_mode="once") as cass:
# do it again
response = urllib2.urlopen('http://httpbin.org/').read()
response = urllib2.urlopen('http://httpbin.org/').read()
response = urlopen('http://httpbin.org/').read()
response = urlopen('http://httpbin.org/').read()
def test_once_mode_three_times(tmpdir):
testfile = str(tmpdir.join('recordmode.yml'))
with vcr.use_cassette(testfile, record_mode="once"):
# get three of the same file
response1 = urllib2.urlopen('http://httpbin.org/').read()
response2 = urllib2.urlopen('http://httpbin.org/').read()
response2 = urllib2.urlopen('http://httpbin.org/').read()
response1 = urlopen('http://httpbin.org/').read()
response2 = urlopen('http://httpbin.org/').read()
response2 = urlopen('http://httpbin.org/').read()
def test_new_episodes_record_mode(tmpdir):
@@ -48,15 +48,15 @@ def test_new_episodes_record_mode(tmpdir):
with vcr.use_cassette(testfile, record_mode="new_episodes"):
# cassette file doesn't exist, so create.
response = urllib2.urlopen('http://httpbin.org/').read()
response = urlopen('http://httpbin.org/').read()
with vcr.use_cassette(testfile, record_mode="new_episodes") as cass:
# make the same request again
response = urllib2.urlopen('http://httpbin.org/').read()
response = urlopen('http://httpbin.org/').read()
# in the "new_episodes" record mode, we can add more requests to
# a cassette without repurcussions.
response = urllib2.urlopen('http://httpbin.org/get').read()
response = urlopen('http://httpbin.org/get').read()
# the first interaction was not re-recorded, but the second was added
assert cass.play_count == 1
@@ -67,15 +67,15 @@ def test_all_record_mode(tmpdir):
with vcr.use_cassette(testfile, record_mode="all"):
# cassette file doesn't exist, so create.
response = urllib2.urlopen('http://httpbin.org/').read()
response = urlopen('http://httpbin.org/').read()
with vcr.use_cassette(testfile, record_mode="all") as cass:
# make the same request again
response = urllib2.urlopen('http://httpbin.org/').read()
response = urlopen('http://httpbin.org/').read()
# in the "all" record mode, we can add more requests to
# a cassette without repurcussions.
response = urllib2.urlopen('http://httpbin.org/get').read()
response = urlopen('http://httpbin.org/get').read()
# The cassette was never actually played, even though it existed.
# that's because, in "all" mode, the requests all go directly to
@@ -89,7 +89,7 @@ def test_none_record_mode(tmpdir):
testfile = str(tmpdir.join('recordmode.yml'))
with vcr.use_cassette(testfile, record_mode="none"):
with pytest.raises(Exception):
response = urllib2.urlopen('http://httpbin.org/').read()
response = urlopen('http://httpbin.org/').read()
def test_none_record_mode_with_existing_cassette(tmpdir):
@@ -97,12 +97,12 @@ def test_none_record_mode_with_existing_cassette(tmpdir):
testfile = str(tmpdir.join('recordmode.yml'))
with vcr.use_cassette(testfile, record_mode="all"):
response = urllib2.urlopen('http://httpbin.org/').read()
response = urlopen('http://httpbin.org/').read()
# play from cassette file
with vcr.use_cassette(testfile, record_mode="none") as cass:
response = urllib2.urlopen('http://httpbin.org/').read()
response = urlopen('http://httpbin.org/').read()
assert cass.play_count == 1
# but if I try to hit the net, raise an exception.
with pytest.raises(Exception):
response = urllib2.urlopen('http://httpbin.org/get').read()
response = urlopen('http://httpbin.org/get').read()

View File

@@ -1,5 +1,5 @@
import urllib2
import vcr
from vcr._compat import urlopen
def true_matcher(r1, r2):
@@ -16,13 +16,13 @@ def test_registered_serializer_true_matcher(tmpdir):
testfile = str(tmpdir.join('test.yml'))
with my_vcr.use_cassette(testfile, match_on=['true']) as cass:
# These 2 different urls are stored as the same request
urllib2.urlopen('http://httpbin.org/')
urllib2.urlopen('https://httpbin.org/get')
urlopen('http://httpbin.org/')
urlopen('https://httpbin.org/get')
with my_vcr.use_cassette(testfile, match_on=['true']) as cass:
# I can get the response twice even though I only asked for it once
urllib2.urlopen('http://httpbin.org/get')
urllib2.urlopen('https://httpbin.org/get')
urlopen('http://httpbin.org/get')
urlopen('https://httpbin.org/get')
def test_registered_serializer_false_matcher(tmpdir):
@@ -31,6 +31,6 @@ def test_registered_serializer_false_matcher(tmpdir):
testfile = str(tmpdir.join('test.yml'))
with my_vcr.use_cassette(testfile, match_on=['false']) as cass:
# These 2 different urls are stored as different requests
urllib2.urlopen('http://httpbin.org/')
urllib2.urlopen('https://httpbin.org/get')
urlopen('http://httpbin.org/')
urlopen('https://httpbin.org/get')
assert len(cass) == 2

View File

@@ -1,4 +1,3 @@
import urllib2
import vcr

View File

@@ -1,11 +1,11 @@
import urllib2
import vcr
from vcr._compat import urlopen
def test_recorded_request_url_with_redirected_request(tmpdir):
with vcr.use_cassette(str(tmpdir.join('test.yml'))) as cass:
assert len(cass) == 0
urllib2.urlopen('http://httpbin.org/redirect/3')
urlopen('http://httpbin.org/redirect/3')
assert cass.requests[0].url == 'http://httpbin.org/redirect/3'
assert cass.requests[3].url == 'http://httpbin.org/get'
assert len(cass) == 4

View File

@@ -3,12 +3,12 @@
# External imports
import os
import urllib2
from urllib import urlencode
import pytest
# Internal imports
import vcr
from vcr._compat import urlopen, urlencode
from assertions import assert_cassette_empty, assert_cassette_has_one_response
@@ -25,30 +25,30 @@ def test_response_code(scheme, tmpdir):
'''Ensure we can read a response code from a fetch'''
url = scheme + '://httpbin.org/'
with vcr.use_cassette(str(tmpdir.join('atts.yaml'))) as cass:
code = urllib2.urlopen(url).getcode()
code = urlopen(url).getcode()
with vcr.use_cassette(str(tmpdir.join('atts.yaml'))) as cass:
assert code == urllib2.urlopen(url).getcode()
assert code == urlopen(url).getcode()
def test_random_body(scheme, tmpdir):
'''Ensure we can read the content, and that it's served from cache'''
url = scheme + '://httpbin.org/bytes/1024'
with vcr.use_cassette(str(tmpdir.join('body.yaml'))) as cass:
body = urllib2.urlopen(url).read()
body = urlopen(url).read()
with vcr.use_cassette(str(tmpdir.join('body.yaml'))) as cass:
assert body == urllib2.urlopen(url).read()
assert body == urlopen(url).read()
def test_response_headers(scheme, tmpdir):
'''Ensure we can get information from the response'''
url = scheme + '://httpbin.org/'
with vcr.use_cassette(str(tmpdir.join('headers.yaml'))) as cass:
open1 = urllib2.urlopen(url).info().items()
open1 = urlopen(url).info().items()
with vcr.use_cassette(str(tmpdir.join('headers.yaml'))) as cass:
open2 = urllib2.urlopen(url).info().items()
open2 = urlopen(url).info().items()
assert open1 == open2
@@ -61,7 +61,7 @@ def test_multiple_requests(scheme, tmpdir):
scheme + '://httpbin.org/bytes/1024'
]
with vcr.use_cassette(str(tmpdir.join('multiple.yaml'))) as cass:
map(urllib2.urlopen, urls)
[urlopen(url) for url in urls]
assert len(cass) == len(urls)
@@ -70,23 +70,23 @@ def test_get_data(scheme, tmpdir):
data = urlencode({'some': 1, 'data': 'here'})
url = scheme + '://httpbin.org/get?' + data
with vcr.use_cassette(str(tmpdir.join('get_data.yaml'))) as cass:
res1 = urllib2.urlopen(url).read()
res1 = urlopen(url).read()
with vcr.use_cassette(str(tmpdir.join('get_data.yaml'))) as cass:
res2 = urllib2.urlopen(url).read()
res2 = urlopen(url).read()
assert res1 == res2
def test_post_data(scheme, tmpdir):
'''Ensure that it works when posting data'''
data = urlencode({'some': 1, 'data': 'here'})
data = urlencode({'some': 1, 'data': 'here'}).encode('utf-8')
url = scheme + '://httpbin.org/post'
with vcr.use_cassette(str(tmpdir.join('post_data.yaml'))) as cass:
res1 = urllib2.urlopen(url, data).read()
res1 = urlopen(url, data).read()
with vcr.use_cassette(str(tmpdir.join('post_data.yaml'))) as cass:
res2 = urllib2.urlopen(url, data).read()
res2 = urlopen(url, data).read()
assert res1 == res2
assert_cassette_has_one_response(cass)
@@ -94,12 +94,12 @@ def test_post_data(scheme, tmpdir):
def test_post_unicode_data(scheme, tmpdir):
'''Ensure that it works when posting unicode data'''
data = urlencode({'snowman': u''.encode('utf-8')})
data = urlencode({'snowman': u''.encode('utf-8')}).encode('utf-8')
url = scheme + '://httpbin.org/post'
with vcr.use_cassette(str(tmpdir.join('post_data.yaml'))) as cass:
res1 = urllib2.urlopen(url, data).read()
res1 = urlopen(url, data).read()
with vcr.use_cassette(str(tmpdir.join('post_data.yaml'))) as cass:
res2 = urllib2.urlopen(url, data).read()
res2 = urlopen(url, data).read()
assert res1 == res2
assert_cassette_has_one_response(cass)
@@ -110,8 +110,8 @@ def test_cross_scheme(tmpdir):
# ensure that we haven't served anything out of cache, and we have two
# requests / response pairs in the cassette
with vcr.use_cassette(str(tmpdir.join('cross_scheme.yaml'))) as cass:
urllib2.urlopen('https://httpbin.org/')
urllib2.urlopen('http://httpbin.org/')
urlopen('https://httpbin.org/')
urlopen('http://httpbin.org/')
assert len(cass) == 2
assert cass.play_count == 0
@@ -121,10 +121,10 @@ def test_decorator(scheme, tmpdir):
@vcr.use_cassette(str(tmpdir.join('atts.yaml')))
def inner1():
return urllib2.urlopen(url).getcode()
return urlopen(url).getcode()
@vcr.use_cassette(str(tmpdir.join('atts.yaml')))
def inner2():
return urllib2.urlopen(url).getcode()
return urlopen(url).getcode()
assert inner1() == inner2()

View File

@@ -3,7 +3,10 @@ requests = pytest.importorskip("requests")
import vcr
import httplib
try:
import httplib
except ImportError:
import http.client as httplib
def test_domain_redirect():

18
tox.ini
View File

@@ -4,7 +4,7 @@
# and then run "tox" from this directory.
[tox]
envlist = py26, py27, pypy, py26requests, py27requests, pypyrequests, py26oldrequests, py27oldrequests, pypyoldrequests, py26httplib2, py27httplib2, pypyhttplib2
envlist = py26, py27, py33, pypy, py26requests, py27requests, pypyrequests, py26oldrequests, py27oldrequests, pypyoldrequests, py26httplib2, py27httplib2, pypyhttplib2
[testenv]
commands =
@@ -38,6 +38,14 @@ deps =
PyYAML
requests==1.2.3
[testenv:py33oldrequests]
basepython = python3.3
deps =
mock
pytest
PyYAML
requests==1.2.3
[testenv:py26requests]
basepython = python2.6
deps =
@@ -54,6 +62,14 @@ deps =
PyYAML
requests
[testenv:py33requests]
basepython = python3.3
deps =
mock
pytest
PyYAML
requests
[testenv:pypyrequests]
basepython = pypy
deps =

View File

@@ -1,4 +1,4 @@
from config import VCR
from .config import VCR
default_vcr = VCR()

9
vcr/_compat.py Normal file
View File

@@ -0,0 +1,9 @@
try:
import httplib
from urllib2 import urlopen
from urllib import urlencode
except ImportError:
import http.client as httplib
from urllib.request import urlopen
from urllib.parse import urlencode

View File

@@ -67,7 +67,7 @@ class Cassette(ContextDecorator):
def play_response(self, request):
'''
Get the response corresponding to a request, but only if it
hasn't been played back before, and mark it as playe.d
hasn't been played back before, and mark it as played
'''
for index, (stored_request, response) in enumerate(self.data):
if requests_match(request, stored_request, self._match_on):

View File

@@ -32,18 +32,21 @@ class VCR(object):
try:
serializer = self.serializers[serializer_name]
except KeyError:
print "Serializer {0} doesn't exist or isn't registered".format(
print("Serializer {0} doesn't exist or isn't registered".format(
serializer_name
)
))
raise KeyError
return serializer
def _get_matchers(self, matcher_names):
matchers = []
try:
matchers = [self.matchers[m] for m in matcher_names]
for m in matcher_names:
matchers.append(self.matchers[m])
except KeyError:
raise KeyError(
"Matcher {0} doesn't exist or isn't registered".format(m)
"Matcher {0} doesn't exist or isn't registered".format(
m)
)
return matchers

View File

@@ -1,7 +1,7 @@
'''Utilities for patching in cassettes'''
import httplib
from .stubs import VCRHTTPConnection, VCRHTTPSConnection
from vcr._compat import httplib
# Save some of the original types for the purposes of unpatching
@@ -40,9 +40,8 @@ def install(cassette):
This replaces the actual HTTPConnection with a VCRHTTPConnection
object which knows how to save to / read from cassettes
"""
httplib.HTTPConnection = httplib.HTTP._connection_class = VCRHTTPConnection
httplib.HTTPSConnection = httplib.HTTPS._connection_class = (
VCRHTTPSConnection)
httplib.HTTPConnection = VCRHTTPConnection
httplib.HTTPSConnection = VCRHTTPSConnection
httplib.HTTPConnection.cassette = cassette
httplib.HTTPSConnection.cassette = cassette
@@ -90,9 +89,8 @@ def install(cassette):
def reset():
'''Undo all the patching'''
httplib.HTTPConnection = httplib.HTTP._connection_class = _HTTPConnection
httplib.HTTPSConnection = httplib.HTTPS._connection_class = \
_HTTPSConnection
httplib.HTTPConnection = _HTTPConnection
httplib.HTTPSConnection = _HTTPSConnection
try:
import requests.packages.urllib3.connectionpool as cpool
# unpatch requests v1.x

66
vcr/serializers/compat.py Normal file
View File

@@ -0,0 +1,66 @@
import six
def convert_to_bytes(resp):
resp = convert_headers_to_bytes(resp)
resp = convert_body_to_bytes(resp)
return resp
def convert_to_unicode(resp):
resp = convert_headers_to_unicode(resp)
resp = convert_body_to_unicode(resp)
return resp
def convert_headers_to_bytes(resp):
try:
resp['headers'] = [h.encode('utf-8') for h in resp['headers']]
except (KeyError, TypeError):
pass
return resp
def convert_headers_to_unicode(resp):
try:
resp['headers'] = [h.decode('utf-8') for h in resp['headers']]
except (KeyError, TypeError):
pass
return resp
def convert_body_to_bytes(resp):
"""
If the request body is a string, encode it to bytes (for python3 support)
By default yaml serializes to utf-8 encoded bytestrings.
When this cassette is loaded by python3, it's automatically decoded
into unicode strings. This makes sure that it stays a bytestring, since
that's what all the internal httplib machinery is expecting.
For more info on py3 yaml: http://pyyaml.org/wiki/PyYAMLDocumentation#Python3support
"""
try:
if not isinstance(resp['body']['string'], six.binary_type):
resp['body']['string'] = resp['body']['string'].encode('utf-8')
except (KeyError, TypeError, UnicodeEncodeError):
# The thing we were converting either wasn't a dictionary or didn't have
# the keys we were expecting. Some of the tests just serialize and
# deserialize a string.
# Also, sometimes the thing actually is binary, so if you can't encode
# it, just give up.
pass
return resp
def convert_body_to_unicode(resp):
"""
If the request body is bytes, decode it to a string (for python3 support)
"""
try:
if not isinstance(resp['body']['string'], six.text_type):
resp['body']['string'] = resp['body']['string'].decode('utf-8')
except (KeyError, TypeError, UnicodeDecodeError):
# The thing we were converting either wasn't a dictionary or didn't have
# the keys we were expecting. Some of the tests just serialize and
# deserialize a string.
# Also, sometimes the thing actually is binary, so if you can't decode
# it, just give up.
pass
return resp

View File

@@ -1,4 +1,5 @@
from vcr.request import Request
from . import compat
try:
import simplejson as json
except ImportError:
@@ -11,22 +12,17 @@ def _json_default(obj):
return obj
def _fix_response_unicode(d):
d['body']['string'] = d['body']['string'].encode('utf-8')
return d
def deserialize(cassette_string):
data = json.loads(cassette_string)
requests = [Request._from_dict(r['request']) for r in data]
responses = [_fix_response_unicode(r['response']) for r in data]
responses = [compat.convert_to_bytes(r['response']) for r in data]
return requests, responses
def serialize(cassette_dict):
data = ([{
'request': request._to_dict(),
'response': response,
'response': compat.convert_to_unicode(response),
} for request, response in zip(
cassette_dict['requests'],
cassette_dict['responses']

View File

@@ -1,4 +1,6 @@
import sys
import yaml
from . import compat
# Use the libYAML versions if possible
try:
@@ -6,11 +8,38 @@ try:
except ImportError:
from yaml import Loader, Dumper
"""
Just a general note on the serialization philosophy here:
I prefer cassettes to be human-readable if possible. Yaml serializes
bytestrings to !!binary, which isn't readable, so I would like to serialize to
strings and from strings, which yaml will encode as utf-8 automatically.
All the internal HTTP stuff expects bytestrings, so this whole serialization
process feels backwards.
Serializing: bytestring -> string (yaml persists to utf-8)
Deserializing: string (yaml converts from utf-8) -> bytestring
"""
def _restore_frozenset():
"""
Restore __builtin__.frozenset for cassettes serialized in python2 but
deserialized in python3 and builtins.frozenset for cassettes serialized
in python3 and deserialized in python2
"""
if '__builtin__' not in sys.modules:
import builtins
sys.modules['__builtin__'] = builtins
if 'builtins' not in sys.modules:
sys.modules['builtins'] = sys.modules['__builtin__']
def deserialize(cassette_string):
_restore_frozenset()
data = yaml.load(cassette_string, Loader=Loader)
requests = [r['request'] for r in data]
responses = [r['response'] for r in data]
responses = [compat.convert_to_bytes(r['response']) for r in data]
return requests, responses
@@ -20,6 +49,6 @@ def serialize(cassette_dict):
'response': response,
} for request, response in zip(
cassette_dict['requests'],
cassette_dict['responses']
[compat.convert_to_unicode(r) for r in cassette_dict['responses']],
)])
return yaml.dump(data, Dumper=Dumper)

View File

@@ -1,10 +1,15 @@
'''Stubs for patching HTTP and HTTPS requests'''
from httplib import HTTPConnection, HTTPSConnection, HTTPMessage, HTTPResponse
from cStringIO import StringIO
try:
import http.client
except ImportError:
pass
import six
from six.moves.http_client import HTTPConnection, HTTPSConnection, HTTPMessage, HTTPResponse
from six import BytesIO
from vcr.request import Request
from vcr.errors import CannotOverwriteExistingCassetteException
from . import compat
def parse_headers_backwards_compat(header_dict):
@@ -14,8 +19,8 @@ def parse_headers_backwards_compat(header_dict):
parses the old dictionary-style headers for
backwards-compatability reasons.
"""
msg = HTTPMessage(StringIO(""))
for key, val in header_dict.iteritems():
msg = HTTPMessage(BytesIO(""))
for key, val in header_dict.items():
msg.addheader(key, val)
msg.headers.append("{0}:{1}".format(key, val))
return msg
@@ -24,12 +29,8 @@ def parse_headers_backwards_compat(header_dict):
def parse_headers(header_list):
if isinstance(header_list, dict):
return parse_headers_backwards_compat(header_list)
headers = "".join(header_list) + "\r\n"
msg = HTTPMessage(StringIO(headers))
msg.fp.seek(0)
msg.readheaders()
return msg
headers = b"".join(header_list) + b"\r\n"
return compat.get_httpmessage(headers)
class VCRHTTPResponse(HTTPResponse):
"""
@@ -38,29 +39,38 @@ class VCRHTTPResponse(HTTPResponse):
def __init__(self, recorded_response):
self.recorded_response = recorded_response
self.reason = recorded_response['status']['message']
self.status = recorded_response['status']['code']
self.status = self.code = recorded_response['status']['code']
self.version = None
self._content = StringIO(self.recorded_response['body']['string'])
self._content = BytesIO(self.recorded_response['body']['string'])
self.closed = False
headers = self.recorded_response['headers']
self.msg = parse_headers(headers)
self.length = self.msg.getheader('content-length') or None
self.length = compat.get_header(self.msg, 'content-length') or None
def read(self, *args, **kwargs):
return self._content.read(*args, **kwargs)
def readline(self, *args, **kwargs):
return self._content.readline(*args, **kwargs)
def close(self):
self.closed = True
return True
def getcode(self):
return self.status
def isclosed(self):
return self.closed
def info(self):
return parse_headers(self.recorded_response['headers'])
def getheaders(self):
headers = parse_headers(self.recorded_response['headers'])
return headers.dict.iteritems()
message = parse_headers(self.recorded_response['headers'])
return compat.get_header_items(message)
def getheader(self, header, default=None):
headers = dict(((k, v) for k, v in self.getheaders()))
@@ -166,7 +176,7 @@ class VCRConnection:
'code': response.status,
'message': response.reason
},
'headers': response.msg.headers,
'headers': compat.get_headers(response),
'body': {'string': response.read()},
}
self.cassette.append(self._vcr_request, response)

43
vcr/stubs/compat.py Normal file
View File

@@ -0,0 +1,43 @@
import six
from six import BytesIO
from six.moves.http_client import HTTPMessage
try:
import http.client
except ImportError:
pass
"""
The python3 http.client api moved some stuff around, so this is an abstraction
layer that tries to cope with this move.
"""
def get_header(message, name):
if six.PY3:
return message.getallmatchingheaders(name)
else:
return message.getheader(name)
def get_header_items(message):
if six.PY3:
return dict(message._headers).items()
else:
return message.dict.items()
def get_headers(response):
if six.PY3:
header_list = response.msg._headers
return [b': '.join((k.encode('utf-8'), v.encode('utf-8'))) + b'\r\n'
for k, v in header_list]
else:
return response.msg.headers
def get_httpmessage(headers):
if six.PY3:
return http.client.parse_headers(BytesIO(headers))
msg = HTTPMessage(BytesIO(headers))
msg.fp.seek(0)
msg.readheaders()
return msg