diff --git a/vcr/matchers.py b/vcr/matchers.py index f397e72..70e7891 100644 --- a/vcr/matchers.py +++ b/vcr/matchers.py @@ -2,9 +2,13 @@ import json import logging import urllib import xmlrpc.client +from string import hexdigits +from typing import List, Set from .util import read_body +_HEXDIG_CODE_POINTS: Set[int] = {ord(s.encode("ascii")) for s in hexdigits} + log = logging.getLogger(__name__) @@ -78,6 +82,62 @@ def _header_checker(value, header="Content-Type"): return checker +def _dechunk(body): + if isinstance(body, str): + body = body.encode("utf-8") + elif isinstance(body, bytearray): + body = bytes(body) + elif hasattr(body, "__iter__"): + body = list(body) + if body: + if isinstance(body[0], str): + body = ("".join(body)).encode("utf-8") + elif isinstance(body[0], bytes): + body = b"".join(body) + elif isinstance(body[0], int): + body = bytes(body) + else: + raise ValueError(f"Body chunk type {type(body[0])} not supported") + else: + body = None + + if not isinstance(body, bytes): + return body + + # Now decode chunked data format (https://en.wikipedia.org/wiki/Chunked_transfer_encoding) + # Example input: b"45\r\n<69 bytes>\r\n0\r\n\r\n" where int(b"45", 16) == 69. + CHUNK_GAP = b"\r\n" + BODY_LEN: int = len(body) + + chunks: List[bytes] = [] + pos: int = 0 + + while True: + for i in range(pos, BODY_LEN): + if body[i] not in _HEXDIG_CODE_POINTS: + break + + if i == 0 or body[i : i + len(CHUNK_GAP)] != CHUNK_GAP: + if pos == 0: + return body # i.e. assume non-chunk data + raise ValueError("Malformed chunked data") + + size_bytes = int(body[pos:i], 16) + if size_bytes == 0: # i.e. well-formed ending + return b"".join(chunks) + + chunk_data_first = i + len(CHUNK_GAP) + chunk_data_after_last = chunk_data_first + size_bytes + + if body[chunk_data_after_last : chunk_data_after_last + len(CHUNK_GAP)] != CHUNK_GAP: + raise ValueError("Malformed chunked data") + + chunk_data = body[chunk_data_first:chunk_data_after_last] + chunks.append(chunk_data) + + pos = chunk_data_after_last + len(CHUNK_GAP) + + def _transform_json(body): # Request body is always a byte string, but json.loads() wants a text # string. RFC 7159 says the default encoding is UTF-8 (although UTF-16 @@ -89,6 +149,7 @@ def _transform_json(body): _xml_header_checker = _header_checker("text/xml") _xmlrpc_header_checker = _header_checker("xmlrpc", header="User-Agent") _checker_transformer_pairs = ( + (_header_checker("chunked", header="Transfer-Encoding"), _dechunk), ( _header_checker("application/x-www-form-urlencoded"), lambda body: urllib.parse.parse_qs(body.decode("ascii")),