1
0
mirror of https://github.com/kevin1024/vcrpy.git synced 2025-12-10 01:25:34 +00:00

Merge pull request #234 from tobiowo/decode-compressed-response

Decode compressed response option
This commit is contained in:
Ivan 'Goat' Malison
2015-12-31 10:18:34 -08:00
5 changed files with 171 additions and 1 deletions

View File

@@ -5,6 +5,7 @@ from six.moves.urllib.parse import urlencode
from six.moves.urllib.error import HTTPError
import vcr
import json
from assertions import assert_cassette_has_one_response, assert_is_json
def _request_with_auth(url, username, password):
@@ -93,3 +94,39 @@ def test_filter_callback(tmpdir):
with my_vcr.use_cassette(cass_file, filter_headers=['authorization']) as cass:
urlopen(url)
assert len(cass) == 0
def test_decompress_gzip(tmpdir):
url = 'http://httpbin.org/gzip'
request = Request(url, headers={'Accept-Encoding': ['gzip, deflate']})
cass_file = str(tmpdir.join('gzip_response.yaml'))
with vcr.use_cassette(cass_file, decode_compressed_response=True):
urlopen(request)
with vcr.use_cassette(cass_file) as cass:
decoded_response = urlopen(url).read()
assert_cassette_has_one_response(cass)
assert_is_json(decoded_response)
def test_decompress_deflate(tmpdir):
url = 'http://httpbin.org/deflate'
request = Request(url, headers={'Accept-Encoding': ['gzip, deflate']})
cass_file = str(tmpdir.join('deflate_response.yaml'))
with vcr.use_cassette(cass_file, decode_compressed_response=True):
urlopen(request)
with vcr.use_cassette(cass_file) as cass:
decoded_response = urlopen(url).read()
assert_cassette_has_one_response(cass)
assert_is_json(decoded_response)
def test_decompress_regular(tmpdir):
"""Test that it doesn't try to decompress content that isn't compressed"""
url = 'http://httpbin.org/get'
cass_file = str(tmpdir.join('noncompressed_response.yaml'))
with vcr.use_cassette(cass_file, decode_compressed_response=True):
urlopen(url)
with vcr.use_cassette(cass_file) as cass:
resp = urlopen(url).read()
assert_cassette_has_one_response(cass)
assert_is_json(resp)

View File

@@ -1,11 +1,15 @@
from six import BytesIO
from vcr.filters import (
remove_headers, replace_headers,
remove_query_parameters, replace_query_parameters,
remove_post_data_parameters, replace_post_data_parameters,
decode_response
)
from vcr.compat import mock
from vcr.request import Request
import gzip
import json
import zlib
def test_replace_headers():
@@ -200,3 +204,71 @@ def test_remove_all_json_post_data_parameters():
request.headers['Content-Type'] = 'application/json'
replace_post_data_parameters(request, [('id', None), ('foo', None)])
assert request.body == b'{}'
def test_decode_response_uncompressed():
recorded_response = {
"status": {
"message": "OK",
"code": 200
},
"headers": {
"content-length": ["10806"],
"date": ["Fri, 24 Oct 2014 18:35:37 GMT"],
"content-type": ["text/html; charset=utf-8"],
},
"body": {
"string": b""
}
}
assert decode_response(recorded_response) == recorded_response
def test_decode_response_deflate():
body = b'deflate message'
deflate_response = {
'body': {'string': zlib.compress(body)},
'headers': {
'access-control-allow-credentials': ['true'],
'access-control-allow-origin': ['*'],
'connection': ['keep-alive'],
'content-encoding': ['deflate'],
'content-length': ['177'],
'content-type': ['application/json'],
'date': ['Wed, 02 Dec 2015 19:44:32 GMT'],
'server': ['nginx']
},
'status': {'code': 200, 'message': 'OK'}
}
decoded_response = decode_response(deflate_response)
assert decoded_response['body']['string'] == body
assert decoded_response['headers']['content-length'] == [str(len(body))]
def test_decode_response_gzip():
body = b'gzip message'
buf = BytesIO()
f = gzip.GzipFile('a', fileobj=buf, mode='wb')
f.write(body)
f.close()
compressed_body = buf.getvalue()
buf.close()
gzip_response = {
'body': {'string': compressed_body},
'headers': {
'access-control-allow-credentials': ['true'],
'access-control-allow-origin': ['*'],
'connection': ['keep-alive'],
'content-encoding': ['gzip'],
'content-length': ['177'],
'content-type': ['application/json'],
'date': ['Wed, 02 Dec 2015 19:44:32 GMT'],
'server': ['nginx']
},
'status': {'code': 200, 'message': 'OK'}
}
decoded_response = decode_response(gzip_response)
assert decoded_response['body']['string'] == body
assert decoded_response['headers']['content-length'] == [str(len(body))]