1
0
mirror of https://github.com/kevin1024/vcrpy.git synced 2025-12-08 16:53:23 +00:00

Enable brotli decompression if it is available (#620)

* Enable brotli decompression if it is available

* Apply PR feedback
This commit is contained in:
immerrr again
2025-12-05 22:50:36 +01:00
committed by GitHub
parent 3f78330c1e
commit b28316ab10
2 changed files with 76 additions and 31 deletions

View File

@@ -6,6 +6,49 @@ from urllib.parse import urlencode, urlparse, urlunparse
from .util import CaseInsensitiveDict
try:
# This supports both brotli & brotlipy packages
import brotli
except ImportError:
try:
import brotlicffi as brotli
except ImportError:
brotli = None
def decompress_deflate(body):
try:
return zlib.decompress(body)
except zlib.error:
# Assume the response was already decompressed
return body
def decompress_gzip(body):
# To (de-)compress gzip format, use wbits = zlib.MAX_WBITS | 16.
try:
return zlib.decompress(body, zlib.MAX_WBITS | 16)
except zlib.error:
# Assume the response was already decompressed
return body
AVAILABLE_DECOMPRESSORS = {
"deflate": decompress_deflate,
"gzip": decompress_gzip,
}
if brotli is not None:
def decompress_brotli(body):
try:
return brotli.decompress(body)
except brotli.error:
# Assume the response was already decompressed
return body
AVAILABLE_DECOMPRESSORS["br"] = decompress_brotli
def replace_headers(request, replacements):
"""Replace headers in request according to replacements.
@@ -136,45 +179,30 @@ def remove_post_data_parameters(request, post_data_parameters_to_remove):
def decode_response(response):
"""
If the response is compressed with gzip or deflate:
If the response is compressed with any supported compression (gzip,
deflate, br if available):
1. decompress the response body
2. delete the content-encoding header
3. update content-length header to decompressed length
"""
def is_compressed(headers):
encoding = headers.get("content-encoding", [])
return encoding and encoding[0] in ("gzip", "deflate")
def decompress_body(body, encoding):
"""Returns decompressed body according to encoding using zlib.
to (de-)compress gzip format, use wbits = zlib.MAX_WBITS | 16
"""
if not body:
return ""
if encoding == "gzip":
try:
return zlib.decompress(body, zlib.MAX_WBITS | 16)
except zlib.error:
return body # assumes that the data was already decompressed
else: # encoding == 'deflate'
try:
return zlib.decompress(body)
except zlib.error:
return body # assumes that the data was already decompressed
# Deepcopy here in case `headers` contain objects that could
# be mutated by a shallow copy and corrupt the real response.
response = copy.deepcopy(response)
headers = CaseInsensitiveDict(response["headers"])
if is_compressed(headers):
encoding = headers["content-encoding"][0]
headers["content-encoding"].remove(encoding)
if not headers["content-encoding"]:
del headers["content-encoding"]
content_encoding = headers.get("content-encoding")
if not content_encoding:
return response
decompressor = AVAILABLE_DECOMPRESSORS.get(content_encoding[0])
if not decompressor:
return response
new_body = decompress_body(response["body"]["string"], encoding)
response["body"]["string"] = new_body
headers["content-length"] = [str(len(new_body))]
response["headers"] = dict(headers)
headers["content-encoding"].remove(content_encoding[0])
if not headers["content-encoding"]:
del headers["content-encoding"]
new_body = decompressor(response["body"]["string"])
response["body"]["string"] = new_body
headers["content-length"] = [str(len(new_body))]
response["headers"] = dict(headers)
return response