mirror of
https://github.com/kevin1024/vcrpy.git
synced 2025-12-08 16:53:23 +00:00
Apply PR feedback
This commit is contained in:
@@ -16,9 +16,38 @@ except ImportError:
|
|||||||
brotli = None
|
brotli = None
|
||||||
|
|
||||||
|
|
||||||
AVAILABLE_DECOMPRESSORS = {"gzip", "deflate"}
|
def decompress_deflate(body):
|
||||||
|
try:
|
||||||
|
return zlib.decompress(body)
|
||||||
|
except zlib.error:
|
||||||
|
# Assume the response was already decompressed
|
||||||
|
return body
|
||||||
|
|
||||||
|
|
||||||
|
def decompress_gzip(body):
|
||||||
|
# To (de-)compress gzip format, use wbits = zlib.MAX_WBITS | 16.
|
||||||
|
try:
|
||||||
|
return zlib.decompress(body, zlib.MAX_WBITS | 16)
|
||||||
|
except zlib.error:
|
||||||
|
# Assume the response was already decompressed
|
||||||
|
return body
|
||||||
|
|
||||||
|
|
||||||
|
AVAILABLE_DECOMPRESSORS = {
|
||||||
|
"deflate": decompress_deflate,
|
||||||
|
"gzip": decompress_gzip,
|
||||||
|
}
|
||||||
|
|
||||||
if brotli is not None:
|
if brotli is not None:
|
||||||
AVAILABLE_DECOMPRESSORS.add("br")
|
|
||||||
|
def decompress_brotli(body):
|
||||||
|
try:
|
||||||
|
return brotli.decompress(body)
|
||||||
|
except brotli.error:
|
||||||
|
# Assume the response was already decompressed
|
||||||
|
return body
|
||||||
|
|
||||||
|
AVAILABLE_DECOMPRESSORS["br"] = decompress_brotli
|
||||||
|
|
||||||
|
|
||||||
def replace_headers(request, replacements):
|
def replace_headers(request, replacements):
|
||||||
@@ -157,45 +186,23 @@ def decode_response(response):
|
|||||||
3. update content-length header to decompressed length
|
3. update content-length header to decompressed length
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def is_decompressable(headers):
|
|
||||||
encoding = headers.get("content-encoding", [])
|
|
||||||
return encoding and encoding[0] in AVAILABLE_DECOMPRESSORS
|
|
||||||
|
|
||||||
def decompress_body(body, encoding):
|
|
||||||
"""Returns decompressed body according to encoding using zlib.
|
|
||||||
to (de-)compress gzip format, use wbits = zlib.MAX_WBITS | 16
|
|
||||||
"""
|
|
||||||
if not body:
|
|
||||||
return ""
|
|
||||||
if encoding == "gzip":
|
|
||||||
try:
|
|
||||||
return zlib.decompress(body, zlib.MAX_WBITS | 16)
|
|
||||||
except zlib.error:
|
|
||||||
return body # assumes that the data was already decompressed
|
|
||||||
elif encoding == 'deflate':
|
|
||||||
try:
|
|
||||||
return zlib.decompress(body)
|
|
||||||
except zlib.error:
|
|
||||||
return body # assumes that the data was already decompressed
|
|
||||||
else: # encoding == 'br'
|
|
||||||
try:
|
|
||||||
return brotli.decompress(body)
|
|
||||||
except brotli.error:
|
|
||||||
return body # assumes that the data was already decompressed
|
|
||||||
|
|
||||||
|
|
||||||
# Deepcopy here in case `headers` contain objects that could
|
# Deepcopy here in case `headers` contain objects that could
|
||||||
# be mutated by a shallow copy and corrupt the real response.
|
# be mutated by a shallow copy and corrupt the real response.
|
||||||
response = copy.deepcopy(response)
|
response = copy.deepcopy(response)
|
||||||
headers = CaseInsensitiveDict(response["headers"])
|
headers = CaseInsensitiveDict(response["headers"])
|
||||||
if is_decompressable(headers):
|
content_encoding = headers.get("content-encoding")
|
||||||
encoding = headers["content-encoding"][0]
|
if not content_encoding:
|
||||||
headers["content-encoding"].remove(encoding)
|
return response
|
||||||
if not headers["content-encoding"]:
|
decompressor = AVAILABLE_DECOMPRESSORS.get(content_encoding[0])
|
||||||
del headers["content-encoding"]
|
if not decompressor:
|
||||||
|
return response
|
||||||
|
|
||||||
new_body = decompress_body(response["body"]["string"], encoding)
|
headers["content-encoding"].remove(content_encoding[0])
|
||||||
response["body"]["string"] = new_body
|
if not headers["content-encoding"]:
|
||||||
headers["content-length"] = [str(len(new_body))]
|
del headers["content-encoding"]
|
||||||
response["headers"] = dict(headers)
|
|
||||||
|
new_body = decompressor(response["body"]["string"])
|
||||||
|
response["body"]["string"] = new_body
|
||||||
|
headers["content-length"] = [str(len(new_body))]
|
||||||
|
response["headers"] = dict(headers)
|
||||||
return response
|
return response
|
||||||
|
|||||||
Reference in New Issue
Block a user