mirror of
https://github.com/kevin1024/vcrpy.git
synced 2025-12-08 16:53:23 +00:00
Format project with black (#467)
Format with line length 110 to match flake8 make black part of linting check Update travis spec for updated black requirements Add diff output for black on failure update changelog
This commit is contained in:
@@ -83,9 +83,9 @@ def replace_post_data_parameters(request, replacements):
|
||||
value or None.
|
||||
"""
|
||||
replacements = dict(replacements)
|
||||
if request.method == 'POST' and not isinstance(request.body, BytesIO):
|
||||
if request.headers.get('Content-Type') == 'application/json':
|
||||
json_data = json.loads(request.body.decode('utf-8'))
|
||||
if request.method == "POST" and not isinstance(request.body, BytesIO):
|
||||
if request.headers.get("Content-Type") == "application/json":
|
||||
json_data = json.loads(request.body.decode("utf-8"))
|
||||
for k, rv in replacements.items():
|
||||
if k in json_data:
|
||||
ov = json_data.pop(k)
|
||||
@@ -93,28 +93,26 @@ def replace_post_data_parameters(request, replacements):
|
||||
rv = rv(key=k, value=ov, request=request)
|
||||
if rv is not None:
|
||||
json_data[k] = rv
|
||||
request.body = json.dumps(json_data).encode('utf-8')
|
||||
request.body = json.dumps(json_data).encode("utf-8")
|
||||
else:
|
||||
if isinstance(request.body, text_type):
|
||||
request.body = request.body.encode('utf-8')
|
||||
splits = [p.partition(b'=') for p in request.body.split(b'&')]
|
||||
request.body = request.body.encode("utf-8")
|
||||
splits = [p.partition(b"=") for p in request.body.split(b"&")]
|
||||
new_splits = []
|
||||
for k, sep, ov in splits:
|
||||
if sep is None:
|
||||
new_splits.append((k, sep, ov))
|
||||
else:
|
||||
rk = k.decode('utf-8')
|
||||
rk = k.decode("utf-8")
|
||||
if rk not in replacements:
|
||||
new_splits.append((k, sep, ov))
|
||||
else:
|
||||
rv = replacements[rk]
|
||||
if callable(rv):
|
||||
rv = rv(key=rk, value=ov.decode('utf-8'),
|
||||
request=request)
|
||||
rv = rv(key=rk, value=ov.decode("utf-8"), request=request)
|
||||
if rv is not None:
|
||||
new_splits.append((k, sep, rv.encode('utf-8')))
|
||||
request.body = b'&'.join(k if sep is None else b''.join([k, sep, v])
|
||||
for k, sep, v in new_splits)
|
||||
new_splits.append((k, sep, rv.encode("utf-8")))
|
||||
request.body = b"&".join(k if sep is None else b"".join([k, sep, v]) for k, sep, v in new_splits)
|
||||
return request
|
||||
|
||||
|
||||
@@ -133,15 +131,16 @@ def decode_response(response):
|
||||
2. delete the content-encoding header
|
||||
3. update content-length header to decompressed length
|
||||
"""
|
||||
|
||||
def is_compressed(headers):
|
||||
encoding = headers.get('content-encoding', [])
|
||||
return encoding and encoding[0] in ('gzip', 'deflate')
|
||||
encoding = headers.get("content-encoding", [])
|
||||
return encoding and encoding[0] in ("gzip", "deflate")
|
||||
|
||||
def decompress_body(body, encoding):
|
||||
"""Returns decompressed body according to encoding using zlib.
|
||||
to (de-)compress gzip format, use wbits = zlib.MAX_WBITS | 16
|
||||
"""
|
||||
if encoding == 'gzip':
|
||||
if encoding == "gzip":
|
||||
return zlib.decompress(body, zlib.MAX_WBITS | 16)
|
||||
else: # encoding == 'deflate'
|
||||
return zlib.decompress(body)
|
||||
@@ -149,15 +148,15 @@ def decode_response(response):
|
||||
# Deepcopy here in case `headers` contain objects that could
|
||||
# be mutated by a shallow copy and corrupt the real response.
|
||||
response = copy.deepcopy(response)
|
||||
headers = CaseInsensitiveDict(response['headers'])
|
||||
headers = CaseInsensitiveDict(response["headers"])
|
||||
if is_compressed(headers):
|
||||
encoding = headers['content-encoding'][0]
|
||||
headers['content-encoding'].remove(encoding)
|
||||
if not headers['content-encoding']:
|
||||
del headers['content-encoding']
|
||||
encoding = headers["content-encoding"][0]
|
||||
headers["content-encoding"].remove(encoding)
|
||||
if not headers["content-encoding"]:
|
||||
del headers["content-encoding"]
|
||||
|
||||
new_body = decompress_body(response['body']['string'], encoding)
|
||||
response['body']['string'] = new_body
|
||||
headers['content-length'] = [str(len(new_body))]
|
||||
response['headers'] = dict(headers)
|
||||
new_body = decompress_body(response["body"]["string"], encoding)
|
||||
response["body"]["string"] = new_body
|
||||
headers["content-length"] = [str(len(new_body))]
|
||||
response["headers"] = dict(headers)
|
||||
return response
|
||||
|
||||
Reference in New Issue
Block a user