1
0
mirror of https://github.com/kevin1024/vcrpy.git synced 2025-12-10 01:25:34 +00:00

Cassettes now store multiple requests

This commit is contained in:
Kevin McCarthy
2012-06-30 15:10:20 -10:00
parent 66e7ce9572
commit 2576878f2e
6 changed files with 59 additions and 17 deletions

View File

@@ -42,8 +42,8 @@ Ruby's VCR are not compatible with VCR.py. The API is similar but VCR.py
doesn't have nearly as many features. doesn't have nearly as many features.
##Known Issues ##Known Issues
* Only works with the first HTTP request made in each use_cassette block. That's because I haven't gotten around to implementing multiple requests yet. This means anything that does a 301 / 302 won't work.
* Probably only works with urllib2. Hey, doesn't [requests](http://docs.python-requests.org/en/latest/index.html) use urllib2? Maybe that works too then. * Probably only works with urllib2. Hey, doesn't [requests](http://docs.python-requests.org/en/latest/index.html) use urllib2? Maybe that works too then.
* Loading urls that do a 301/302 redirect don't work
##Similar libraries in Python ##Similar libraries in Python
Neither of these really implement the API I want, but I have cribbed some code Neither of these really implement the API I want, but I have cribbed some code

17
test.py
View File

@@ -1,6 +1,7 @@
import os import os
import unittest import unittest
import vcr import vcr
from vcr.cassette import Cassette
import urllib2 import urllib2
TEST_CASSETTE_FILE = 'test/test_req.yaml' TEST_CASSETTE_FILE = 'test/test_req.yaml'
@@ -48,7 +49,7 @@ class TestHttps(unittest.TestCase):
os.remove(TEST_CASSETTE_FILE) os.remove(TEST_CASSETTE_FILE)
except OSError: except OSError:
pass pass
def test_response_code(self): def test_response_code(self):
code = urllib2.urlopen('https://api.twitter.com/1/legal/tos.json').getcode() code = urllib2.urlopen('https://api.twitter.com/1/legal/tos.json').getcode()
with vcr.use_cassette(TEST_CASSETTE_FILE): with vcr.use_cassette(TEST_CASSETTE_FILE):
@@ -65,7 +66,19 @@ class TestHttps(unittest.TestCase):
with vcr.use_cassette(TEST_CASSETTE_FILE): with vcr.use_cassette(TEST_CASSETTE_FILE):
headers = urllib2.urlopen('https://api.twitter.com/1/legal/tos.json').info().items() headers = urllib2.urlopen('https://api.twitter.com/1/legal/tos.json').info().items()
self.assertEqual(headers, urllib2.urlopen('https://api.twitter.com/1/legal/tos.json').info().items()) self.assertEqual(headers, urllib2.urlopen('https://api.twitter.com/1/legal/tos.json').info().items())
class TestCassette(unittest.TestCase):
def test_serialize_cassette(self):
c1 = Cassette()
c1.requests = ['a','b','c']
c1.responses = ['d','e','f']
ser = c1.serialize()
c2 = Cassette(ser)
self.assertEqual(c1.requests,c2.requests)
self.assertEqual(c1.responses,c2.responses)
if __name__ == '__main__': if __name__ == '__main__':
unittest.main() unittest.main()

View File

@@ -1,16 +1,27 @@
import yaml
class Cassette(object): class Cassette(object):
def __init__(self): def __init__(self, ser_cassette=None):
self.requests = [] self.requests = []
self.responses = [] self.responses = []
if ser_cassette:
self._unserialize(ser_cassette)
def serialize(self): def serialize(self):
return yaml.dump([{ return ([{
'request': req, 'request': req,
'response': res, 'response': res,
} for req,res in zip(self.requests,self.responses)]) } for req, res in zip(self.requests, self.responses)])
def _unserialize(self, source):
self.requests, self.responses = [r['request'] for r in source], [r['response'] for r in source]
def get_request(self, match):
try:
return self.requests[self.requests.index(match)]
except ValueError:
return None
def get_response(self, match):
try:
return self.responses[self.requests.index(match)]
except ValueError:
return None

View File

@@ -1,10 +1,13 @@
import os import os
import yaml import yaml
from .cassette import Cassette
def load_cassette(cassette_path): def load_cassette(cassette_path):
try: try:
return yaml.load(open(cassette_path)) pc = yaml.load(open(cassette_path))
cassette = Cassette(pc)
return cassette
except IOError: except IOError:
return None return None
@@ -14,4 +17,4 @@ def save_cassette(cassette_path, cassette):
if not os.path.exists(dirname): if not os.path.exists(dirname):
os.makedirs(dirname) os.makedirs(dirname)
with open(cassette_path, 'wc') as cassette_file: with open(cassette_path, 'wc') as cassette_file:
cassette_file.write(cassette.serialize()) cassette_file.write(yaml.dump(cassette.serialize()))

View File

@@ -12,14 +12,15 @@ def install(cassette_path):
httplib.HTTPConnection._vcr_cassette_path = cassette_path httplib.HTTPConnection._vcr_cassette_path = cassette_path
httplib.HTTPSConnection._vcr_cassette_path = cassette_path httplib.HTTPSConnection._vcr_cassette_path = cassette_path
def reset(): def reset():
httplib.HTTPConnection = httplib.HTTP._connection_class = _HTTPConnection httplib.HTTPConnection = httplib.HTTP._connection_class = _HTTPConnection
httplib.HTTPSConnection = httplib.HTTPS._connection_class = \ httplib.HTTPSConnection = httplib.HTTPS._connection_class = \
_HTTPSConnection _HTTPSConnection
@contextmanager @contextmanager
def use_cassette(cassette_path): def use_cassette(cassette_path):
install(cassette_path) install(cassette_path)
yield yield
reset() reset()

View File

@@ -28,9 +28,23 @@ class VCRHTTPConnection(HTTPConnection):
def _save_cassette(self): def _save_cassette(self):
save_cassette(self._vcr_cassette_path, self._cassette) save_cassette(self._vcr_cassette_path, self._cassette)
def request(self, method, url, body=None, headers={}): def _load_old_response(self):
old_cassette = load_cassette(self._vcr_cassette_path) old_cassette = load_cassette(self._vcr_cassette_path)
if old_cassette: if old_cassette:
return old_cassette.get_response(self._vcr)
def request(self, method, url, body=None, headers={}):
"""
Persist the request metadata in self._vcr
"""
self._vcr = {
'method': method,
'url': url,
'body': body,
'headers': headers,
}
old_cassette = load_cassette(self._vcr_cassette_path)
if old_cassette and old_cassette.get_request(self._vcr):
return return
self._cassette.requests.append(dict( self._cassette.requests.append(dict(
method=method, method=method,
@@ -41,8 +55,8 @@ class VCRHTTPConnection(HTTPConnection):
return HTTPConnection.request(self, method, url, body=body, headers=headers) return HTTPConnection.request(self, method, url, body=body, headers=headers)
def getresponse(self, buffering=False): def getresponse(self, buffering=False):
old_cassette = load_cassette(self._vcr_cassette_path) old_response = self._load_old_response()
if not old_cassette: if not old_response:
response = HTTPConnection.getresponse(self) response = HTTPConnection.getresponse(self)
self._cassette.responses.append({ self._cassette.responses.append({
'status': {'code': response.status, 'message': response.reason}, 'status': {'code': response.status, 'message': response.reason},
@@ -50,8 +64,8 @@ class VCRHTTPConnection(HTTPConnection):
'body': {'string': response.read()}, 'body': {'string': response.read()},
}) })
self._save_cassette() self._save_cassette()
old_cassette = load_cassette(self._vcr_cassette_path) old_response = self._load_old_response()
return VCRHTTPResponse(old_cassette[0]['response']) return VCRHTTPResponse(old_response)
class VCRHTTPSConnection(VCRHTTPConnection): class VCRHTTPSConnection(VCRHTTPConnection):