mirror of
https://github.com/kevin1024/vcrpy.git
synced 2025-12-10 01:25:34 +00:00
support python 3.4
This commit is contained in:
@@ -96,9 +96,10 @@ def test_filter_callback(tmpdir):
|
|||||||
|
|
||||||
def test_decompress_gzip(tmpdir):
|
def test_decompress_gzip(tmpdir):
|
||||||
url = 'http://httpbin.org/gzip'
|
url = 'http://httpbin.org/gzip'
|
||||||
|
request = Request(url, headers={'Accept-Encoding': ['gzip, deflate']})
|
||||||
cass_file = str(tmpdir.join('gzip_response.yaml'))
|
cass_file = str(tmpdir.join('gzip_response.yaml'))
|
||||||
with vcr.use_cassette(cass_file, decode_compressed_response=True):
|
with vcr.use_cassette(cass_file, decode_compressed_response=True):
|
||||||
urlopen(url)
|
urlopen(request)
|
||||||
with vcr.use_cassette(cass_file) as cass:
|
with vcr.use_cassette(cass_file) as cass:
|
||||||
decoded_response = urlopen(url).read()
|
decoded_response = urlopen(url).read()
|
||||||
assert_cassette_has_one_response(cass)
|
assert_cassette_has_one_response(cass)
|
||||||
@@ -107,9 +108,10 @@ def test_decompress_gzip(tmpdir):
|
|||||||
|
|
||||||
def test_decompress_deflate(tmpdir):
|
def test_decompress_deflate(tmpdir):
|
||||||
url = 'http://httpbin.org/deflate'
|
url = 'http://httpbin.org/deflate'
|
||||||
|
request = Request(url, headers={'Accept-Encoding': ['gzip, deflate']})
|
||||||
cass_file = str(tmpdir.join('deflate_response.yaml'))
|
cass_file = str(tmpdir.join('deflate_response.yaml'))
|
||||||
with vcr.use_cassette(cass_file, decode_compressed_response=True):
|
with vcr.use_cassette(cass_file, decode_compressed_response=True):
|
||||||
urlopen(url)
|
urlopen(request)
|
||||||
with vcr.use_cassette(cass_file) as cass:
|
with vcr.use_cassette(cass_file) as cass:
|
||||||
decoded_response = urlopen(url).read()
|
decoded_response = urlopen(url).read()
|
||||||
assert_cassette_has_one_response(cass)
|
assert_cassette_has_one_response(cass)
|
||||||
|
|||||||
@@ -1,3 +1,4 @@
|
|||||||
|
from six import BytesIO
|
||||||
from vcr.filters import (
|
from vcr.filters import (
|
||||||
remove_headers, replace_headers,
|
remove_headers, replace_headers,
|
||||||
remove_query_parameters, replace_query_parameters,
|
remove_query_parameters, replace_query_parameters,
|
||||||
@@ -6,7 +7,6 @@ from vcr.filters import (
|
|||||||
)
|
)
|
||||||
from vcr.compat import mock
|
from vcr.compat import mock
|
||||||
from vcr.request import Request
|
from vcr.request import Request
|
||||||
import cStringIO
|
|
||||||
import gzip
|
import gzip
|
||||||
import json
|
import json
|
||||||
import zlib
|
import zlib
|
||||||
@@ -225,7 +225,7 @@ def test_decode_response_uncompressed():
|
|||||||
|
|
||||||
|
|
||||||
def test_decode_response_deflate():
|
def test_decode_response_deflate():
|
||||||
body = 'deflate message'
|
body = b'deflate message'
|
||||||
deflate_response = {
|
deflate_response = {
|
||||||
'body': {'string': zlib.compress(body)},
|
'body': {'string': zlib.compress(body)},
|
||||||
'headers': {
|
'headers': {
|
||||||
@@ -242,14 +242,17 @@ def test_decode_response_deflate():
|
|||||||
}
|
}
|
||||||
decoded_response = decode_response(deflate_response)
|
decoded_response = decode_response(deflate_response)
|
||||||
assert decoded_response['body']['string'] == body
|
assert decoded_response['body']['string'] == body
|
||||||
assert decoded_response['content-length'] == len(body)
|
assert decoded_response['headers']['content-length'] == [str(len(body))]
|
||||||
|
|
||||||
|
|
||||||
def test_decode_response_gzip():
|
def test_decode_response_gzip():
|
||||||
body = 'gzip message'
|
body = b'gzip message'
|
||||||
buf = cStringIO.StringIO()
|
|
||||||
with gzip.GzipFile('a', fileobj=buf, mode='wb') as f:
|
buf = BytesIO()
|
||||||
f.write(body)
|
f = gzip.GzipFile('a', fileobj=buf, mode='wb')
|
||||||
|
f.write(body)
|
||||||
|
f.close()
|
||||||
|
|
||||||
compressed_body = buf.getvalue()
|
compressed_body = buf.getvalue()
|
||||||
buf.close()
|
buf.close()
|
||||||
gzip_response = {
|
gzip_response = {
|
||||||
@@ -268,4 +271,4 @@ def test_decode_response_gzip():
|
|||||||
}
|
}
|
||||||
decoded_response = decode_response(gzip_response)
|
decoded_response = decode_response(gzip_response)
|
||||||
assert decoded_response['body']['string'] == body
|
assert decoded_response['body']['string'] == body
|
||||||
assert decoded_response['content-length'] == len(body)
|
assert decoded_response['headers']['content-length'] == [str(len(body))]
|
||||||
|
|||||||
@@ -1,8 +1,11 @@
|
|||||||
from six import BytesIO, text_type
|
from six import BytesIO, text_type
|
||||||
from six.moves.urllib.parse import urlparse, urlencode, urlunparse
|
from six.moves.urllib.parse import urlparse, urlencode, urlunparse
|
||||||
|
import copy
|
||||||
import json
|
import json
|
||||||
import zlib
|
import zlib
|
||||||
|
|
||||||
|
from .util import CaseInsensitiveDict
|
||||||
|
|
||||||
|
|
||||||
def replace_headers(request, replacements):
|
def replace_headers(request, replacements):
|
||||||
"""
|
"""
|
||||||
@@ -130,8 +133,8 @@ def decode_response(response):
|
|||||||
2. delete the content-encoding header
|
2. delete the content-encoding header
|
||||||
3. update content-length header to decompressed length
|
3. update content-length header to decompressed length
|
||||||
"""
|
"""
|
||||||
def is_compressed(response):
|
def is_compressed(headers):
|
||||||
encoding = response['headers'].get('content-encoding', [])
|
encoding = headers.get('content-encoding', [])
|
||||||
return encoding and encoding[0] in ('gzip', 'deflate')
|
return encoding and encoding[0] in ('gzip', 'deflate')
|
||||||
|
|
||||||
def decompress_body(body, encoding):
|
def decompress_body(body, encoding):
|
||||||
@@ -143,15 +146,16 @@ def decode_response(response):
|
|||||||
else: # encoding == 'deflate'
|
else: # encoding == 'deflate'
|
||||||
return zlib.decompress(body)
|
return zlib.decompress(body)
|
||||||
|
|
||||||
if is_compressed(response):
|
headers = CaseInsensitiveDict(response['headers'])
|
||||||
response = response.copy()
|
if is_compressed(headers):
|
||||||
encoding = response['headers']['content-encoding'][0]
|
response = copy.deepcopy(response)
|
||||||
|
encoding = headers['content-encoding'][0]
|
||||||
response['headers']['content-encoding'].remove(encoding)
|
headers['content-encoding'].remove(encoding)
|
||||||
if not response['headers']['content-encoding']:
|
if not headers['content-encoding']:
|
||||||
del response['headers']['content-encoding']
|
del headers['content-encoding']
|
||||||
|
|
||||||
new_body = decompress_body(response['body']['string'], encoding)
|
new_body = decompress_body(response['body']['string'], encoding)
|
||||||
response['body']['string'] = new_body
|
response['body']['string'] = new_body
|
||||||
response['content-length'] = len(new_body)
|
headers['content-length'] = [str(len(new_body))]
|
||||||
|
response['headers'] = dict(headers)
|
||||||
return response
|
return response
|
||||||
|
|||||||
Reference in New Issue
Block a user