1
0
mirror of https://github.com/kevin1024/vcrpy.git synced 2025-12-09 09:13:23 +00:00

matchers.py: Decode chunked request bodies

This commit is contained in:
Sebastian Pipping
2023-07-06 21:51:44 +02:00
parent e35205c5c8
commit a6b9a070a5

View File

@@ -2,9 +2,13 @@ import json
import logging
import urllib
import xmlrpc.client
from string import hexdigits
from typing import List, Set
from .util import read_body
_HEXDIG_CODE_POINTS: Set[int] = {ord(s.encode("ascii")) for s in hexdigits}
log = logging.getLogger(__name__)
@@ -78,6 +82,62 @@ def _header_checker(value, header="Content-Type"):
return checker
def _dechunk(body):
if isinstance(body, str):
body = body.encode("utf-8")
elif isinstance(body, bytearray):
body = bytes(body)
elif hasattr(body, "__iter__"):
body = list(body)
if body:
if isinstance(body[0], str):
body = ("".join(body)).encode("utf-8")
elif isinstance(body[0], bytes):
body = b"".join(body)
elif isinstance(body[0], int):
body = bytes(body)
else:
raise ValueError(f"Body chunk type {type(body[0])} not supported")
else:
body = None
if not isinstance(body, bytes):
return body
# Now decode chunked data format (https://en.wikipedia.org/wiki/Chunked_transfer_encoding)
# Example input: b"45\r\n<69 bytes>\r\n0\r\n\r\n" where int(b"45", 16) == 69.
CHUNK_GAP = b"\r\n"
BODY_LEN: int = len(body)
chunks: List[bytes] = []
pos: int = 0
while True:
for i in range(pos, BODY_LEN):
if body[i] not in _HEXDIG_CODE_POINTS:
break
if i == 0 or body[i : i + len(CHUNK_GAP)] != CHUNK_GAP:
if pos == 0:
return body # i.e. assume non-chunk data
raise ValueError("Malformed chunked data")
size_bytes = int(body[pos:i], 16)
if size_bytes == 0: # i.e. well-formed ending
return b"".join(chunks)
chunk_data_first = i + len(CHUNK_GAP)
chunk_data_after_last = chunk_data_first + size_bytes
if body[chunk_data_after_last : chunk_data_after_last + len(CHUNK_GAP)] != CHUNK_GAP:
raise ValueError("Malformed chunked data")
chunk_data = body[chunk_data_first:chunk_data_after_last]
chunks.append(chunk_data)
pos = chunk_data_after_last + len(CHUNK_GAP)
def _transform_json(body):
# Request body is always a byte string, but json.loads() wants a text
# string. RFC 7159 says the default encoding is UTF-8 (although UTF-16
@@ -89,6 +149,7 @@ def _transform_json(body):
_xml_header_checker = _header_checker("text/xml")
_xmlrpc_header_checker = _header_checker("xmlrpc", header="User-Agent")
_checker_transformer_pairs = (
(_header_checker("chunked", header="Transfer-Encoding"), _dechunk),
(
_header_checker("application/x-www-form-urlencoded"),
lambda body: urllib.parse.parse_qs(body.decode("ascii")),