mirror of
https://github.com/kevin1024/vcrpy.git
synced 2025-12-09 09:13:23 +00:00
tests / docs for decode_compressed_response
This commit is contained in:
@@ -255,6 +255,21 @@ that of ``before_record``:
|
|||||||
with my_vcr.use_cassette('test.yml'):
|
with my_vcr.use_cassette('test.yml'):
|
||||||
# your http code here
|
# your http code here
|
||||||
|
|
||||||
|
|
||||||
|
Decode compressed response
|
||||||
|
---------------------------
|
||||||
|
|
||||||
|
When the ``decode_compressed_response`` keyword argument of a ``VCR`` object
|
||||||
|
is set to True, VCR will decompress "gzip" and "deflate" response bodies
|
||||||
|
before recording. This ensures that these interactions become readable and
|
||||||
|
editable after being serialized.
|
||||||
|
|
||||||
|
.. note::
|
||||||
|
Decompression is done before any other specified `Custom Response Filtering`_.
|
||||||
|
|
||||||
|
This option should be avoided if the actual decompression of response bodies
|
||||||
|
is part of the functionality of the library or app being tested.
|
||||||
|
|
||||||
Ignore requests
|
Ignore requests
|
||||||
---------------
|
---------------
|
||||||
|
|
||||||
|
|||||||
@@ -5,6 +5,7 @@ from six.moves.urllib.parse import urlencode
|
|||||||
from six.moves.urllib.error import HTTPError
|
from six.moves.urllib.error import HTTPError
|
||||||
import vcr
|
import vcr
|
||||||
import json
|
import json
|
||||||
|
from assertions import assert_cassette_has_one_response, assert_is_json
|
||||||
|
|
||||||
|
|
||||||
def _request_with_auth(url, username, password):
|
def _request_with_auth(url, username, password):
|
||||||
@@ -91,3 +92,37 @@ def test_filter_callback(tmpdir):
|
|||||||
with my_vcr.use_cassette(cass_file, filter_headers=['authorization']) as cass:
|
with my_vcr.use_cassette(cass_file, filter_headers=['authorization']) as cass:
|
||||||
urlopen(url)
|
urlopen(url)
|
||||||
assert len(cass) == 0
|
assert len(cass) == 0
|
||||||
|
|
||||||
|
|
||||||
|
def test_decompress_gzip(tmpdir):
|
||||||
|
url = 'http://httpbin.org/gzip'
|
||||||
|
cass_file = str(tmpdir.join('gzip_response.yaml'))
|
||||||
|
with vcr.use_cassette(cass_file, decode_compressed_response=True):
|
||||||
|
urlopen(url)
|
||||||
|
with vcr.use_cassette(cass_file) as cass:
|
||||||
|
decoded_response = urlopen(url).read()
|
||||||
|
assert_cassette_has_one_response(cass)
|
||||||
|
assert_is_json(decoded_response)
|
||||||
|
|
||||||
|
|
||||||
|
def test_decompress_deflate(tmpdir):
|
||||||
|
url = 'http://httpbin.org/deflate'
|
||||||
|
cass_file = str(tmpdir.join('deflate_response.yaml'))
|
||||||
|
with vcr.use_cassette(cass_file, decode_compressed_response=True):
|
||||||
|
urlopen(url)
|
||||||
|
with vcr.use_cassette(cass_file) as cass:
|
||||||
|
decoded_response = urlopen(url).read()
|
||||||
|
assert_cassette_has_one_response(cass)
|
||||||
|
assert_is_json(decoded_response)
|
||||||
|
|
||||||
|
|
||||||
|
def test_decompress_regular(tmpdir):
|
||||||
|
"""Test that it doesn't try to decompress content that isn't compressed"""
|
||||||
|
url = 'http://httpbin.org/get'
|
||||||
|
cass_file = str(tmpdir.join('noncompressed_response.yaml'))
|
||||||
|
with vcr.use_cassette(cass_file, decode_compressed_response=True):
|
||||||
|
urlopen(url)
|
||||||
|
with vcr.use_cassette(cass_file) as cass:
|
||||||
|
resp = urlopen(url).read()
|
||||||
|
assert_cassette_has_one_response(cass)
|
||||||
|
assert_is_json(resp)
|
||||||
|
|||||||
@@ -2,10 +2,14 @@ from vcr.filters import (
|
|||||||
remove_headers, replace_headers,
|
remove_headers, replace_headers,
|
||||||
remove_query_parameters, replace_query_parameters,
|
remove_query_parameters, replace_query_parameters,
|
||||||
remove_post_data_parameters, replace_post_data_parameters,
|
remove_post_data_parameters, replace_post_data_parameters,
|
||||||
|
decode_response
|
||||||
)
|
)
|
||||||
from vcr.compat import mock
|
from vcr.compat import mock
|
||||||
from vcr.request import Request
|
from vcr.request import Request
|
||||||
|
import cStringIO
|
||||||
|
import gzip
|
||||||
import json
|
import json
|
||||||
|
import zlib
|
||||||
|
|
||||||
|
|
||||||
def test_replace_headers():
|
def test_replace_headers():
|
||||||
@@ -200,3 +204,68 @@ def test_remove_all_json_post_data_parameters():
|
|||||||
request.headers['Content-Type'] = 'application/json'
|
request.headers['Content-Type'] = 'application/json'
|
||||||
replace_post_data_parameters(request, [('id', None), ('foo', None)])
|
replace_post_data_parameters(request, [('id', None), ('foo', None)])
|
||||||
assert request.body == b'{}'
|
assert request.body == b'{}'
|
||||||
|
|
||||||
|
|
||||||
|
def test_decode_response_uncompressed():
|
||||||
|
recorded_response = {
|
||||||
|
"status": {
|
||||||
|
"message": "OK",
|
||||||
|
"code": 200
|
||||||
|
},
|
||||||
|
"headers": {
|
||||||
|
"content-length": ["10806"],
|
||||||
|
"date": ["Fri, 24 Oct 2014 18:35:37 GMT"],
|
||||||
|
"content-type": ["text/html; charset=utf-8"],
|
||||||
|
},
|
||||||
|
"body": {
|
||||||
|
"string": b""
|
||||||
|
}
|
||||||
|
}
|
||||||
|
assert decode_response(recorded_response) == recorded_response
|
||||||
|
|
||||||
|
|
||||||
|
def test_decode_response_deflate():
|
||||||
|
body = 'deflate message'
|
||||||
|
deflate_response = {
|
||||||
|
'body': {'string': zlib.compress(body)},
|
||||||
|
'headers': {
|
||||||
|
'access-control-allow-credentials': ['true'],
|
||||||
|
'access-control-allow-origin': ['*'],
|
||||||
|
'connection': ['keep-alive'],
|
||||||
|
'content-encoding': ['deflate'],
|
||||||
|
'content-length': ['177'],
|
||||||
|
'content-type': ['application/json'],
|
||||||
|
'date': ['Wed, 02 Dec 2015 19:44:32 GMT'],
|
||||||
|
'server': ['nginx']
|
||||||
|
},
|
||||||
|
'status': {'code': 200, 'message': 'OK'}
|
||||||
|
}
|
||||||
|
decoded_response = decode_response(deflate_response)
|
||||||
|
assert decoded_response['body']['string'] == body
|
||||||
|
assert decoded_response['content-length'] == len(body)
|
||||||
|
|
||||||
|
|
||||||
|
def test_decode_response_gzip():
|
||||||
|
body = 'gzip message'
|
||||||
|
buf = cStringIO.StringIO()
|
||||||
|
with gzip.GzipFile('a', fileobj=buf, mode='wb') as f:
|
||||||
|
f.write(body)
|
||||||
|
compressed_body = buf.getvalue()
|
||||||
|
buf.close()
|
||||||
|
gzip_response = {
|
||||||
|
'body': {'string': compressed_body},
|
||||||
|
'headers': {
|
||||||
|
'access-control-allow-credentials': ['true'],
|
||||||
|
'access-control-allow-origin': ['*'],
|
||||||
|
'connection': ['keep-alive'],
|
||||||
|
'content-encoding': ['gzip'],
|
||||||
|
'content-length': ['177'],
|
||||||
|
'content-type': ['application/json'],
|
||||||
|
'date': ['Wed, 02 Dec 2015 19:44:32 GMT'],
|
||||||
|
'server': ['nginx']
|
||||||
|
},
|
||||||
|
'status': {'code': 200, 'message': 'OK'}
|
||||||
|
}
|
||||||
|
decoded_response = decode_response(gzip_response)
|
||||||
|
assert decoded_response['body']['string'] == body
|
||||||
|
assert decoded_response['content-length'] == len(body)
|
||||||
|
|||||||
Reference in New Issue
Block a user