1
0
mirror of https://github.com/kevin1024/vcrpy.git synced 2025-12-09 09:13:23 +00:00

Compare commits

..

2 Commits

Author SHA1 Message Date
Sebastian Pipping
f7d3d7a142 Relax Werkzeug==2.0.3 into Werkzeug<3 2023-12-10 23:51:30 +01:00
Sebastian Pipping
85e280bd35 [REVERT ME] Cover pushes even before a pull request 2023-12-10 23:51:30 +01:00
66 changed files with 867 additions and 1362 deletions

View File

@@ -13,10 +13,10 @@ permissions:
jobs:
codespell:
name: Check for spelling errors
runs-on: ubuntu-24.04
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v6
uses: actions/checkout@v4
- name: Codespell
uses: codespell-project/actions-codespell@v2

View File

@@ -7,14 +7,14 @@ on:
jobs:
validate:
runs-on: ubuntu-24.04
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v6
- uses: actions/setup-python@v6
- uses: actions/checkout@v4
- uses: actions/setup-python@v4
with:
python-version: "3.12"
- name: Install build dependencies
run: pip install -r docs/requirements.txt
- name: Rendering HTML documentation

View File

@@ -2,60 +2,40 @@ name: Test
on:
push:
branches:
- master
pull_request:
schedule:
- cron: "0 16 * * 5" # Every Friday 4pm
workflow_dispatch:
jobs:
build:
runs-on: ubuntu-24.04
runs-on: ubuntu-latest
strategy:
fail-fast: false
matrix:
python-version:
- "3.10"
- "3.11"
- "3.12"
- "3.13"
- "pypy-3.11"
python-version: ["3.8", "3.9", "3.10", "3.11", "3.12", "pypy-3.8", "pypy-3.9", "pypy-3.10"]
steps:
- uses: actions/checkout@v6
- name: Install uv
uses: astral-sh/setup-uv@v7
- uses: actions/checkout@v4
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v6
uses: actions/setup-python@v4
with:
python-version: ${{ matrix.python-version }}
allow-prereleases: true
- name: Install project dependencies
run: |
uv pip install --system --upgrade pip setuptools
uv pip install --system codecov '.[tests]'
uv pip check
pip3 install --upgrade pip
pip3 install codecov tox tox-gh-actions
- name: Allow creation of user namespaces (e.g. to the unshare command)
run: |
# .. so that we don't get error:
# unshare: write failed /proc/self/uid_map: Operation not permitted
# Idea from https://github.com/YoYoGames/GameMaker-Bugs/issues/6015#issuecomment-2135552784 .
sudo sysctl kernel.apparmor_restrict_unprivileged_userns=0
- name: Run online tests with tox
run: tox -- -m online
- name: Run online tests
run: ./runtests.sh --cov=./vcr --cov-branch --cov-report=xml --cov-append -m online
- name: Run offline tests with no access to the Internet
- name: Run offline tests with tox with no access to the Internet
run: |
# We're using unshare to take Internet access
# away so that we'll notice whenever some new test
# away from tox so that we'll notice whenever some new test
# is missing @pytest.mark.online decoration in the future
unshare --map-root-user --net -- \
sh -c 'ip link set lo up; ./runtests.sh --cov=./vcr --cov-branch --cov-report=xml --cov-append -m "not online"'
sh -c 'ip link set lo up; tox -- -m "not online"'
- name: Run coverage
run: codecov

View File

@@ -1,62 +0,0 @@
# Copyright (c) 2023 Sebastian Pipping <sebastian@pipping.org>
# Licensed under the MIT license
name: Detect outdated pre-commit hooks
on:
schedule:
- cron: '0 16 * * 5' # Every Friday 4pm
# NOTE: This will drop all permissions from GITHUB_TOKEN except metadata read,
# and then (re)add the ones listed below:
permissions:
contents: write
pull-requests: write
jobs:
pre_commit_detect_outdated:
name: Detect outdated pre-commit hooks
runs-on: ubuntu-24.04
steps:
- uses: actions/checkout@v6
- name: Set up Python 3.12
uses: actions/setup-python@v6
with:
python-version: 3.12
- name: Install pre-commit
run: |-
pip install \
--disable-pip-version-check \
--no-warn-script-location \
--user \
pre-commit
echo "PATH=${HOME}/.local/bin:${PATH}" >> "${GITHUB_ENV}"
- name: Check for outdated hooks
run: |-
pre-commit autoupdate
git diff -- .pre-commit-config.yaml
- name: Create pull request from changes (if any)
id: create-pull-request
uses: peter-evans/create-pull-request@v7
with:
author: 'pre-commit <pre-commit@tools.invalid>'
base: master
body: |-
For your consideration.
:warning: Please **CLOSE AND RE-OPEN** this pull request so that [further workflow runs get triggered](https://github.com/peter-evans/create-pull-request/blob/main/docs/concepts-guidelines.md#triggering-further-workflow-runs) for this pull request.
branch: precommit-autoupdate
commit-message: "pre-commit: Autoupdate"
delete-branch: true
draft: true
labels: enhancement
title: "pre-commit: Autoupdate"
- name: Log pull request URL
if: "${{ steps.create-pull-request.outputs.pull-request-url }}"
run: |
echo "Pull request URL is: ${{ steps.create-pull-request.outputs.pull-request-url }}"

View File

@@ -1,20 +0,0 @@
# Copyright (c) 2023 Sebastian Pipping <sebastian@pipping.org>
# Licensed under the MIT license
name: Run pre-commit
on:
- pull_request
- push
- workflow_dispatch
jobs:
pre-commit:
name: Run pre-commit
runs-on: ubuntu-24.04
steps:
- uses: actions/checkout@v6
- uses: actions/setup-python@v6
with:
python-version: 3.12
- uses: pre-commit/action@v3.0.1

View File

@@ -1,17 +0,0 @@
# Copyright (c) 2023 Sebastian Pipping <sebastian@pipping.org>
# Licensed under the MIT license
repos:
- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.14.6
hooks:
- id: ruff
args: ["--output-format=full"]
- id: ruff-format
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v6.0.0
hooks:
- id: check-merge-conflict
- id: end-of-file-fixer
- id: trailing-whitespace

View File

@@ -7,7 +7,7 @@ version: 2
# Set the version of Python and other tools you might need
build:
os: ubuntu-24.04
os: ubuntu-22.04
tools:
python: "3.12"

View File

@@ -1,5 +1,6 @@
include README.rst
include LICENSE.txt
include tox.ini
recursive-include tests *
recursive-exclude * __pycache__
recursive-exclude * *.py[co]

View File

@@ -4,7 +4,7 @@ VCR.py 📼
###########
|PyPI| |Python versions| |Build Status| |CodeCov| |Gitter|
|PyPI| |Python versions| |Build Status| |CodeCov| |Gitter| |CodeStyleBlack|
----
@@ -70,3 +70,6 @@ more details
.. |CodeCov| image:: https://codecov.io/gh/kevin1024/vcrpy/branch/master/graph/badge.svg
:target: https://codecov.io/gh/kevin1024/vcrpy
:alt: Code Coverage Status
.. |CodeStyleBlack| image:: https://img.shields.io/badge/code%20style-black-000000.svg
:target: https://github.com/psf/black
:alt: Code Style: black

View File

@@ -24,4 +24,4 @@
<stop offset="1" stop-color="#27DDA6"/>
</linearGradient>
</defs>
</svg>
</svg>

Before

Width:  |  Height:  |  Size: 6.2 KiB

After

Width:  |  Height:  |  Size: 6.2 KiB

View File

@@ -16,7 +16,7 @@ a nice addition. Here's an example:
with vcr.use_cassette('fixtures/vcr_cassettes/synopsis.yaml') as cass:
response = urllib2.urlopen('http://www.zombo.com/').read()
# cass should have 1 request inside it
assert len(cass) == 1
assert len(cass) == 1
# the request uri should have been http://www.zombo.com/
assert cass.requests[0].uri == 'http://www.zombo.com/'
@@ -208,7 +208,7 @@ So these two calls are the same:
# original (still works)
vcr = VCR(filter_headers=['authorization'])
# new
vcr = VCR(filter_headers=[('authorization', None)])
@@ -218,7 +218,7 @@ Here are two examples of the new functionality:
# replace with a static value (most common)
vcr = VCR(filter_headers=[('authorization', 'XXXXXX')])
# replace with a callable, for example when testing
# lots of different kinds of authorization.
def replace_auth(key, value, request):
@@ -286,7 +286,7 @@ sensitive data from the response body:
before_record_response=scrub_string(settings.USERNAME, 'username'),
)
with my_vcr.use_cassette('test.yml'):
# your http code here
# your http code here
Decode compressed response
@@ -427,16 +427,3 @@ If you want to save the cassette only when the test succeeds, set the Cassette
# Since there was an exception, the cassette file hasn't been created.
assert not os.path.exists('fixtures/vcr_cassettes/synopsis.yaml')
Drop unused requests
--------------------
Even if any HTTP request is changed or removed from tests, previously recorded
interactions remain in the cassette file. If set the ``drop_unused_requests``
option to ``True``, VCR will not save old HTTP interactions if they are not used.
.. code:: python
my_vcr = VCR(drop_unused_requests=True)
with my_vcr.use_cassette('fixtures/vcr_cassettes/synopsis.yaml'):
... # your HTTP interactions here

View File

@@ -7,42 +7,6 @@ For a full list of triaged issues, bugs and PRs and what release they are target
All help in providing PRs to close out bug issues is appreciated. Even if that is providing a repo that fully replicates issues. We have very generous contributors that have added these to bug issues which meant another contributor picked up the bug and closed it out.
- 8.0.0
- BREAKING: Drop support for Python 3.9 (major version bump) - thanks @jairhenrique
- BREAKING: Drop support for urllib3 < 2 - fixes CVE warnings from urllib3 1.x (#926, #880) - thanks @jairhenrique
- New feature: ``drop_unused_requests`` option to remove unused interactions from cassettes (#763) - thanks @danielnsilva
- Rewrite httpx support to patch httpcore instead of httpx (#943) - thanks @seowalex
- Fixes ``httpx.ResponseNotRead`` exceptions (#832, #834)
- Fixes ``KeyError: 'follow_redirects'`` (#945)
- Adds support for custom httpx transports
- Fix HTTPS proxy handling - proxy address no longer ends up in cassette URIs (#809, #914) - thanks @alga
- Fix ``iscoroutinefunction`` deprecation warning on Python 3.14 - thanks @kloczek
- Only log message if response is appended - thanks @talfus-laddus
- Optimize urllib.parse calls - thanks @Martin-Brunthaler
- Fix CI for Ubuntu 24.04 - thanks @hartwork
- Various CI improvements: migrate to uv, update GitHub Actions - thanks @jairhenrique
- Various linting and test improvements - thanks @jairhenrique and @hartwork
- 7.0.0
- Drop support for python 3.8 (major version bump) - thanks @jairhenrique
- Various linting and test fixes - thanks @jairhenrique
- Bugfix for urllib2>=2.3.0 - missing version_string (#888)
- Bugfix for asyncio.run - thanks @alekeik1
- 6.0.2
- Ensure body is consumed only once (#846) - thanks @sathieu
- Permit urllib3 2.x for non-PyPy Python >=3.10
- Fix typos in test commands - thanks @chuckwondo
- Several test and workflow improvements - thanks @hartwork and @graingert
- 6.0.1
- Bugfix with to Tornado cassette generator (thanks @graingert)
- 6.0.0
- BREAKING: Fix issue with httpx support (thanks @parkerhancock) in #784. NOTE: You may have to recreate some of your cassettes produced in previous releases due to the binary format being saved incorrectly in previous releases
- BREAKING: Drop support for `boto` (vcrpy still supports boto3, but is dropping the deprecated `boto` support in this release. (thanks @jairhenrique)
- Fix compatibility issue with Python 3.12 (thanks @hartwork)
- Drop simplejson (fixes some compatibility issues) (thanks @jairhenrique)
- Run CI on Python 3.12 and PyPy 3.9-3.10 (thanks @mgorny)
- Various linting and docs improvements (thanks @jairhenrique)
- Tornado fixes (thanks @graingert)
- 5.1.0
- Use ruff for linting (instead of current flake8/isort/pyflakes) - thanks @jairhenrique
- Enable rule B (flake8-bugbear) on ruff - thanks @jairhenrique
@@ -323,3 +287,4 @@ All help in providing PRs to close out bug issues is appreciated. Even if that i
- Add support for requests / urllib3
- 0.0.1
- Initial Release

View File

@@ -316,5 +316,5 @@ texinfo_documents = [
# Example configuration for intersphinx: refer to the Python standard library.
intersphinx_mapping = {"python": ("https://docs.python.org/3", None)}
intersphinx_mapping = {"https://docs.python.org/": None}
html_theme = "alabaster"

View File

@@ -24,7 +24,7 @@ So whilst reporting issues are valuable, please consider:
- contributing an issue with a toy repo that replicates the issue.
- contributing PRs is a more valuable donation of your time and effort.
Thanks again for your interest and support in VCRpy.
Thanks again for your interest and support in VCRpy.
We really appreciate it.
@@ -57,7 +57,7 @@ Simply adding these three labels for incoming issues means a lot for maintaining
- Which library does it affect? ``core``, ``aiohttp``, ``requests``, ``urllib3``, ``tornado4``, ``httplib2``
- If it is a bug, is it ``Verified Can Replicate`` or ``Requires Help Replicating``
- Thanking people for raising issues. Feedback is always appreciated.
- Politely asking if they are able to link to an example repo that replicates the issue if they haven't already. Being able to *clone and go* helps the next person and we like that. 😃
- Politely asking if they are able to link to an example repo that replicates the issue if they haven't already. Being able to *clone and go* helps the next person and we like that. 😃
**Maintainer:**
@@ -68,7 +68,7 @@ This involves creating PRs to address bugs and enhancement requests. It also mea
The PR reviewer is a second set of eyes to see if:
- Are there tests covering the code paths added/modified?
- Do the tests and modifications make sense seem appropriate?
- Add specific feedback, even on approvals, why it is accepted. eg "I like how you use a context manager there. 😄 "
- Add specific feedback, even on approvals, why it is accepted. eg "I like how you use a context manager there. 😄 "
- Also make sure they add a line to `docs/changelog.rst` to claim credit for their contribution.
**Release Manager:**
@@ -83,21 +83,39 @@ The PR reviewer is a second set of eyes to see if:
Running VCR's test suite
------------------------
The tests are all run automatically on `Github Actions CI <https://github.com/kevin1024/vcrpy/actions>`__,
but you can also run them yourself using `pytest <http://pytest.org/>`__.
The tests are all run automatically on `Travis
CI <https://travis-ci.org/kevin1024/vcrpy>`__, but you can also run them
yourself using `pytest <http://pytest.org/>`__ and
`Tox <http://tox.testrun.org/>`__.
In order for the boto3 tests to run, you will need an AWS key.
Tox will automatically run them in all environments VCR.py supports if they are available on your `PATH`. Alternatively you can use `tox-pyenv <https://pypi.org/project/tox-pyenv/>`_ with
`pyenv <https://github.com/pyenv/pyenv>`_.
We recommend you read the documentation for each and see the section further below.
The test suite is pretty big and slow, but you can tell tox to only run specific tests like this::
tox -e {pyNN}-{HTTP_LIBRARY} -- <pytest flags passed through>
tox -e py38-requests -- -v -k "'test_status_code or test_gzip'"
tox -e py38-requests -- -v --last-failed
This will run only tests that look like ``test_status_code`` or
``test_gzip`` in the test suite, and only in the python 3.8 environment
that has ``requests`` installed.
Also, in order for the boto3 tests to run, you will need an AWS key.
Refer to the `boto3
documentation <https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/index.html>`__
for how to set this up. I have marked the boto3 tests as optional in
Travis so you don't have to worry about them failing if you submit a
pull request.
Using Pyenv with VCR's test suite
Using PyEnv with VCR's test suite
---------------------------------
Pyenv is a tool for managing multiple installation of python on your system.
See the full documentation at their `github <https://github.com/pyenv/pyenv>`_
PyEnv is a tool for managing multiple installation of python on your system.
See the full documentation at their `github <https://github.com/pyenv/pyenv>`_
but we are also going to use `tox-pyenv <https://pypi.org/project/tox-pyenv/>`_
in this example::
git clone https://github.com/pyenv/pyenv ~/.pyenv
@@ -108,21 +126,27 @@ in this example::
# Setup shim paths
eval "$(pyenv init -)"
# Setup your local system tox tooling
pip3 install tox tox-pyenv
# Install supported versions (at time of writing), this does not activate them
pyenv install 3.12.0 pypy3.10
pyenv install 3.8.0 pypy3.8
# This activates them
pyenv local 3.12.0 pypy3.10
pyenv local 3.8.0 pypy3.8
# Run the whole test suite
pip install .[tests]
./runtests.sh
tox
# Run the whole test suite or just part of it
tox -e lint
tox -e py38-requests
Troubleshooting on MacOSX
-------------------------
If you have this kind of error when running tests :
If you have this kind of error when running tox :
.. code:: python

View File

@@ -9,7 +9,7 @@ with pip::
Compatibility
-------------
VCR.py supports Python 3.9+, and `pypy <http://pypy.org>`__.
VCR.py supports Python 3.8+, and `pypy <http://pypy.org>`__.
The following HTTP libraries are supported:
@@ -22,7 +22,6 @@ The following HTTP libraries are supported:
- ``urllib2``
- ``urllib3``
- ``httpx``
- ``httpcore``
Speed
-----

View File

@@ -1,2 +1,2 @@
sphinx<9
sphinx_rtd_theme==3.0.2
sphinx<8
sphinx_rtd_theme==1.3.0

View File

@@ -1,18 +1,18 @@
[tool.black]
line-length=110
[tool.codespell]
skip = '.git,*.pdf,*.svg,.tox'
ignore-regex = "\\\\[fnrstv]"
#
# ignore-words-list = ''
[tool.pytest]
addopts = ["--strict-config", "--strict-markers"]
asyncio_default_fixture_loop_scope = "session"
asyncio_default_test_loop_scope = "session"
markers = ["online"]
[tool.pytest.ini_options]
markers = [
"online",
]
[tool.ruff]
line-length = 110
target-version = "py310"
[tool.ruff.lint]
select = [
"B", # flake8-bugbear
"C4", # flake8-comprehensions
@@ -25,8 +25,9 @@ select = [
"RUF", # Ruff-specific rules
"UP", # pyupgrade
"W", # pycodestyle warning
"SIM",
]
line-length = 110
target-version = "py38"
[tool.ruff.lint.isort]
known-first-party = ["vcr"]
[tool.ruff.isort]
known-first-party = [ "vcr" ]

View File

@@ -1,5 +1,7 @@
#!/bin/bash
# If you are getting an INVOCATION ERROR for this script then there is a good chance you are running on Windows.
# You can and should use WSL for running tests on Windows when it calls bash scripts.
# https://blog.ionelmc.ro/2015/04/14/tox-tricks-and-patterns/#when-it-inevitably-leads-to-shell-scripts
# If you are getting an INVOCATION ERROR for this script then there is
# a good chance you are running on Windows.
# You can and should use WSL for running tox on Windows when it calls bash scripts.
REQUESTS_CA_BUNDLE=`python3 -m pytest_httpbin.certs` exec pytest "$@"

View File

@@ -3,11 +3,12 @@
import codecs
import os
import re
from pathlib import Path
import sys
from setuptools import find_packages, setup
from setuptools.command.test import test as TestCommand
long_description = Path("README.rst").read_text()
long_description = open("README.rst").read()
here = os.path.abspath(os.path.dirname(__file__))
@@ -27,32 +28,53 @@ def find_version(*file_paths):
raise RuntimeError("Unable to find version string.")
class PyTest(TestCommand):
def finalize_options(self):
TestCommand.finalize_options(self)
self.test_args = []
self.test_suite = True
def run_tests(self):
# import here, cause outside the eggs aren't loaded
import pytest
errno = pytest.main(self.test_args)
sys.exit(errno)
install_requires = [
"PyYAML",
"wrapt",
"yarl",
# Support for urllib3 >=2 needs CPython >=3.10
# so we need to block urllib3 >=2 for Python <3.10 and PyPy for now.
# Note that vcrpy would work fine without any urllib3 around,
# so this block and the dependency can be dropped at some point
# in the future. For more Details:
# https://github.com/kevin1024/vcrpy/pull/699#issuecomment-1551439663
"urllib3 <2; python_version <'3.10'",
# https://github.com/kevin1024/vcrpy/pull/775#issuecomment-1847849962
"urllib3 <2; platform_python_implementation =='PyPy'",
]
extras_require = {
"tests": [
"aiohttp",
"boto3",
"cryptography",
"httpbin",
"httpcore",
"httplib2",
"httpx",
"pycurl; platform_python_implementation !='PyPy'",
"pytest",
"pytest-aiohttp",
"pytest-asyncio",
"pytest-cov",
"pytest-httpbin",
"requests>=2.22.0",
"tornado",
"urllib3",
"werkzeug==2.0.3",
],
}
tests_require = [
"aiohttp",
"boto3",
"httplib2",
"httpx",
"pytest",
"pytest-aiohttp",
"pytest-httpbin",
"requests>=2.16.2",
"tornado",
# Needed to un-break httpbin 0.10.1. For httpbin >=0.10.2,
# this cap and the dependency itself can be removed, provided
# that the related bug in httpbin has has been fixed in a new release:
# https://github.com/psf/httpbin/issues/28
# https://github.com/psf/httpbin/pull/29
# https://github.com/psf/httpbin/pull/37
"Werkzeug<3",
]
setup(
name="vcrpy",
@@ -64,21 +86,21 @@ setup(
author_email="me@kevinmccarthy.org",
url="https://github.com/kevin1024/vcrpy",
packages=find_packages(exclude=["tests*"]),
python_requires=">=3.10",
python_requires=">=3.8",
install_requires=install_requires,
license="MIT",
extras_require=extras_require,
tests_require=extras_require["tests"],
tests_require=tests_require,
classifiers=[
"Development Status :: 5 - Production/Stable",
"Environment :: Console",
"Intended Audience :: Developers",
"Programming Language :: Python",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.8",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
"Programming Language :: Python :: 3.13",
"Programming Language :: Python :: 3 :: Only",
"Programming Language :: Python :: Implementation :: CPython",
"Programming Language :: Python :: Implementation :: PyPy",

View File

View File

@@ -15,9 +15,9 @@
},
"response": {
"status": {
"message": "OK",
"message": "OK",
"code": 200
},
},
"headers": {
"access-control-allow-origin": ["*"],
"content-type": ["application/json"],
@@ -25,7 +25,7 @@
"server": ["gunicorn/0.17.4"],
"content-length": ["32"],
"connection": ["keep-alive"]
},
},
"body": {
"string": "{\n \"origin\": \"217.122.164.194\"\n}"
}

View File

@@ -2,7 +2,7 @@ version: 1
interactions:
- request:
body: null
headers:
headers:
accept: ['*/*']
accept-encoding: ['gzip, deflate, compress']
user-agent: ['python-requests/2.2.1 CPython/2.6.1 Darwin/10.8.0']

View File

@@ -1,31 +1,31 @@
[
{
"request": {
"body": null,
"protocol": "http",
"method": "GET",
"body": null,
"protocol": "http",
"method": "GET",
"headers": {
"accept-encoding": "gzip, deflate, compress",
"accept": "*/*",
"accept-encoding": "gzip, deflate, compress",
"accept": "*/*",
"user-agent": "python-requests/2.2.1 CPython/2.6.1 Darwin/10.8.0"
},
"host": "httpbin.org",
"path": "/ip",
},
"host": "httpbin.org",
"path": "/ip",
"port": 80
},
},
"response": {
"status": {
"message": "OK",
"message": "OK",
"code": 200
},
},
"headers": [
"access-control-allow-origin: *\r\n",
"content-type: application/json\r\n",
"date: Mon, 21 Apr 2014 23:13:40 GMT\r\n",
"server: gunicorn/0.17.4\r\n",
"content-length: 32\r\n",
"access-control-allow-origin: *\r\n",
"content-type: application/json\r\n",
"date: Mon, 21 Apr 2014 23:13:40 GMT\r\n",
"server: gunicorn/0.17.4\r\n",
"content-length: 32\r\n",
"connection: keep-alive\r\n"
],
],
"body": {
"string": "{\n \"origin\": \"217.122.164.194\"\n}"
}

View File

@@ -10,7 +10,7 @@ interactions:
uri: http://seomoz.org/
response:
body: {string: ''}
headers:
headers:
Location: ['http://moz.com/']
Server: ['BigIP']
Connection: ['Keep-Alive']

View File

@@ -5,24 +5,24 @@ import aiohttp
async def aiohttp_request(loop, method, url, output="text", encoding="utf-8", content_type=None, **kwargs):
async with aiohttp.ClientSession(loop=loop) as session:
response_ctx = session.request(method, url, **kwargs)
session = aiohttp.ClientSession(loop=loop)
response_ctx = session.request(method, url, **kwargs)
response = await response_ctx.__aenter__()
if output == "text":
content = await response.text()
elif output == "json":
content_type = content_type or "application/json"
content = await response.json(encoding=encoding, content_type=content_type)
elif output == "raw":
content = await response.read()
elif output == "stream":
content = await response.content.read()
response = await response_ctx.__aenter__()
if output == "text":
content = await response.text()
elif output == "json":
content_type = content_type or "application/json"
content = await response.json(encoding=encoding, content_type=content_type)
elif output == "raw":
content = await response.read()
elif output == "stream":
content = await response.content.read()
response_ctx._resp.close()
await session.close()
response_ctx._resp.close()
await session.close()
return response, content
return response, content
def aiohttp_app():

View File

@@ -1,41 +0,0 @@
interactions:
- request:
body: ''
headers:
accept:
- '*/*'
accept-encoding:
- gzip, deflate, br
connection:
- keep-alive
host:
- httpbin.org
user-agent:
- python-httpx/0.23.0
method: GET
uri: https://httpbin.org/gzip
response:
content: "{\n \"gzipped\": true, \n \"headers\": {\n \"Accept\": \"*/*\",
\n \"Accept-Encoding\": \"gzip, deflate, br\", \n \"Host\": \"httpbin.org\",
\n \"User-Agent\": \"python-httpx/0.23.0\", \n \"X-Amzn-Trace-Id\": \"Root=1-62a62a8d-5f39b5c50c744da821d6ea99\"\n
\ }, \n \"method\": \"GET\", \n \"origin\": \"146.200.25.115\"\n}\n"
headers:
Access-Control-Allow-Credentials:
- 'true'
Access-Control-Allow-Origin:
- '*'
Connection:
- keep-alive
Content-Encoding:
- gzip
Content-Length:
- '230'
Content-Type:
- application/json
Date:
- Sun, 12 Jun 2022 18:03:57 GMT
Server:
- gunicorn/19.9.0
http_version: HTTP/1.1
status_code: 200
version: 1

View File

@@ -1,42 +0,0 @@
interactions:
- request:
body: null
headers:
Accept:
- '*/*'
Accept-Encoding:
- gzip, deflate, br
Connection:
- keep-alive
User-Agent:
- python-requests/2.28.0
method: GET
uri: https://httpbin.org/gzip
response:
body:
string: !!binary |
H4sIAKwrpmIA/z2OSwrCMBCG956izLIkfQSxkl2RogfQA9R2bIM1iUkqaOndnYDIrGa+/zELDB9l
LfYgg5uRwYhtj86DXKDuOrQBJKR5Cuy38kZ3pld6oHu0sqTH29QGZMnVkepgtMYuKKNJcEe0vJ3U
C4mcjI9hpaiygqaUW7ETFYGLR8frAXXE9h1Go7nD54w++FxkYp8VsDJ4IBH6E47NmVzGqUHFkn8g
rJsvp2omYs8AAAA=
headers:
Access-Control-Allow-Credentials:
- 'true'
Access-Control-Allow-Origin:
- '*'
Connection:
- Close
Content-Encoding:
- gzip
Content-Length:
- '182'
Content-Type:
- application/json
Date:
- Sun, 12 Jun 2022 18:08:44 GMT
Server:
- Pytest-HTTPBIN/0.1.0
status:
code: 200
message: great
version: 1

View File

@@ -0,0 +1,16 @@
import os
import ssl
import pytest
@pytest.fixture
def httpbin_ssl_context():
ssl_ca_location = os.environ["REQUESTS_CA_BUNDLE"]
ssl_cert_location = os.environ["REQUESTS_CA_BUNDLE"].replace("cacert.pem", "cert.pem")
ssl_key_location = os.environ["REQUESTS_CA_BUNDLE"].replace("cacert.pem", "key.pem")
ssl_context = ssl.create_default_context(cafile=ssl_ca_location)
ssl_context.load_cert_chain(ssl_cert_location, ssl_key_location)
return ssl_context

View File

@@ -1,11 +1,8 @@
import io
import contextlib
import logging
import ssl
import urllib.parse
import pytest
import pytest_httpbin.certs
import yarl
import vcr
@@ -15,14 +12,12 @@ aiohttp = pytest.importorskip("aiohttp")
from .aiohttp_utils import aiohttp_app, aiohttp_request # noqa: E402
HTTPBIN_SSL_CONTEXT = ssl.create_default_context(cafile=pytest_httpbin.certs.where())
def run_in_loop(fn):
async def wrapper():
return await fn(asyncio.get_running_loop())
return asyncio.run(wrapper())
with contextlib.closing(asyncio.new_event_loop()) as loop:
asyncio.set_event_loop(loop)
task = loop.create_task(fn(loop))
return loop.run_until_complete(task)
def request(method, url, output="text", **kwargs):
@@ -194,11 +189,9 @@ def test_params_same_url_distinct_params(tmpdir, httpbin):
assert cassette.play_count == 1
other_params = {"other": "params"}
with (
vcr.use_cassette(str(tmpdir.join("get.yaml"))) as cassette,
pytest.raises(vcr.errors.CannotOverwriteExistingCassetteException),
):
get(url, output="text", params=other_params)
with vcr.use_cassette(str(tmpdir.join("get.yaml"))) as cassette:
with pytest.raises(vcr.errors.CannotOverwriteExistingCassetteException):
get(url, output="text", params=other_params)
@pytest.mark.online
@@ -340,7 +333,7 @@ def test_double_requests(tmpdir, httpbin):
assert cassette.play_count == 2
def test_cookies(httpbin_both, tmpdir):
def test_cookies(httpbin_both, httpbin_ssl_context, tmpdir):
async def run(loop):
cookies_url = httpbin_both.url + (
"/response-headers?"
@@ -355,12 +348,12 @@ def test_cookies(httpbin_both, tmpdir):
# ------------------------- Record -------------------------- #
with vcr.use_cassette(tmp) as cassette:
async with aiohttp.ClientSession(loop=loop, cookie_jar=aiohttp.CookieJar(unsafe=True)) as session:
cookies_resp = await session.get(cookies_url, ssl=HTTPBIN_SSL_CONTEXT)
cookies_resp = await session.get(cookies_url, ssl=httpbin_ssl_context)
home_resp = await session.get(
home_url,
cookies=req_cookies,
headers=req_headers,
ssl=HTTPBIN_SSL_CONTEXT,
ssl=httpbin_ssl_context,
)
assert cassette.play_count == 0
assert_responses(cookies_resp, home_resp)
@@ -368,12 +361,12 @@ def test_cookies(httpbin_both, tmpdir):
# -------------------------- Play --------------------------- #
with vcr.use_cassette(tmp, record_mode=vcr.mode.NONE) as cassette:
async with aiohttp.ClientSession(loop=loop, cookie_jar=aiohttp.CookieJar(unsafe=True)) as session:
cookies_resp = await session.get(cookies_url, ssl=HTTPBIN_SSL_CONTEXT)
cookies_resp = await session.get(cookies_url, ssl=httpbin_ssl_context)
home_resp = await session.get(
home_url,
cookies=req_cookies,
headers=req_headers,
ssl=HTTPBIN_SSL_CONTEXT,
ssl=httpbin_ssl_context,
)
assert cassette.play_count == 2
assert_responses(cookies_resp, home_resp)
@@ -390,7 +383,7 @@ def test_cookies(httpbin_both, tmpdir):
run_in_loop(run)
def test_cookies_redirect(httpbin_both, tmpdir):
def test_cookies_redirect(httpbin_both, httpbin_ssl_context, tmpdir):
async def run(loop):
# Sets cookie as provided by the query string and redirects
cookies_url = httpbin_both.url + "/cookies/set?Cookie_1=Val_1"
@@ -399,9 +392,9 @@ def test_cookies_redirect(httpbin_both, tmpdir):
# ------------------------- Record -------------------------- #
with vcr.use_cassette(tmp) as cassette:
async with aiohttp.ClientSession(loop=loop, cookie_jar=aiohttp.CookieJar(unsafe=True)) as session:
cookies_resp = await session.get(cookies_url, ssl=HTTPBIN_SSL_CONTEXT)
cookies_resp = await session.get(cookies_url, ssl=httpbin_ssl_context)
assert not cookies_resp.cookies
cookies = session.cookie_jar.filter_cookies(yarl.URL(cookies_url))
cookies = session.cookie_jar.filter_cookies(cookies_url)
assert cookies["Cookie_1"].value == "Val_1"
assert cassette.play_count == 0
@@ -410,9 +403,9 @@ def test_cookies_redirect(httpbin_both, tmpdir):
# -------------------------- Play --------------------------- #
with vcr.use_cassette(tmp, record_mode=vcr.mode.NONE) as cassette:
async with aiohttp.ClientSession(loop=loop, cookie_jar=aiohttp.CookieJar(unsafe=True)) as session:
cookies_resp = await session.get(cookies_url, ssl=HTTPBIN_SSL_CONTEXT)
cookies_resp = await session.get(cookies_url, ssl=httpbin_ssl_context)
assert not cookies_resp.cookies
cookies = session.cookie_jar.filter_cookies(yarl.URL(cookies_url))
cookies = session.cookie_jar.filter_cookies(cookies_url)
assert cookies["Cookie_1"].value == "Val_1"
assert cassette.play_count == 2
@@ -424,9 +417,9 @@ def test_cookies_redirect(httpbin_both, tmpdir):
"Cookie_1=Val_1; Expires=Wed, 21 Oct 2015 07:28:00 GMT",
]
async with aiohttp.ClientSession(loop=loop, cookie_jar=aiohttp.CookieJar(unsafe=True)) as session:
cookies_resp = await session.get(cookies_url, ssl=HTTPBIN_SSL_CONTEXT)
cookies_resp = await session.get(cookies_url, ssl=httpbin_ssl_context)
assert not cookies_resp.cookies
cookies = session.cookie_jar.filter_cookies(yarl.URL(cookies_url))
cookies = session.cookie_jar.filter_cookies(cookies_url)
assert cookies["Cookie_1"].value == "Val_1"
run_in_loop(run)
@@ -463,19 +456,3 @@ def test_filter_query_parameters(tmpdir, httpbin):
cassette_content = f.read()
assert "password" not in cassette_content
assert "secret" not in cassette_content
@pytest.mark.online
def test_use_cassette_with_io(tmpdir, caplog, httpbin):
url = httpbin.url + "/post"
# test without cassettes
data = io.BytesIO(b"hello")
_, response_json = request("POST", url, output="json", data=data)
assert response_json["data"] == "hello"
# test with cassettes
data = io.BytesIO(b"hello")
with vcr.use_cassette(str(tmpdir.join("post.yaml"))):
_, response_json = request("POST", url, output="json", data=data)
assert response_json["data"] == "hello"

View File

@@ -39,7 +39,7 @@ def test_basic_json_use(tmpdir, httpbin):
test_fixture = str(tmpdir.join("synopsis.json"))
with vcr.use_cassette(test_fixture, serializer="json"):
response = urlopen(httpbin.url).read()
assert b"HTTP Request &amp; Response Service" in response
assert b"A simple HTTP Request &amp; Response Service." in response
def test_patched_content(tmpdir, httpbin):

View File

@@ -5,7 +5,6 @@ from urllib.request import urlopen
import pytest
import vcr
from vcr.cassette import Cassette
@pytest.mark.online
@@ -62,8 +61,9 @@ def test_override_match_on(tmpdir, httpbin):
def test_missing_matcher():
my_vcr = vcr.VCR()
my_vcr.register_matcher("awesome", object)
with pytest.raises(KeyError), my_vcr.use_cassette("test.yaml", match_on=["notawesome"]):
pass
with pytest.raises(KeyError):
with my_vcr.use_cassette("test.yaml", match_on=["notawesome"]):
pass
@pytest.mark.online
@@ -80,25 +80,8 @@ def test_dont_record_on_exception(tmpdir, httpbin):
assert not os.path.exists(str(tmpdir.join("dontsave.yml")))
# Make sure context decorator has the same behavior
with pytest.raises(AssertionError), my_vcr.use_cassette(str(tmpdir.join("dontsave2.yml"))):
assert b"Not in content" in urlopen(httpbin.url).read()
with pytest.raises(AssertionError):
with my_vcr.use_cassette(str(tmpdir.join("dontsave2.yml"))):
assert b"Not in content" in urlopen(httpbin.url).read()
assert not os.path.exists(str(tmpdir.join("dontsave2.yml")))
def test_set_drop_unused_requests(tmpdir, httpbin):
my_vcr = vcr.VCR(drop_unused_requests=True)
file = str(tmpdir.join("test.yaml"))
with my_vcr.use_cassette(file):
urlopen(httpbin.url)
urlopen(httpbin.url + "/get")
cassette = Cassette.load(path=file)
assert len(cassette) == 2
with my_vcr.use_cassette(file):
urlopen(httpbin.url)
cassette = Cassette.load(path=file)
assert len(cassette) == 1

View File

@@ -5,11 +5,9 @@ from urllib.parse import urlencode
from urllib.request import Request, urlopen
import pytest
from assertions import assert_cassette_has_one_response, assert_is_json_bytes
import vcr
from vcr.filters import brotli
from ..assertions import assert_cassette_has_one_response, assert_is_json_bytes
def _request_with_auth(url, username, password):
@@ -139,22 +137,6 @@ def test_decompress_deflate(tmpdir, httpbin):
assert_is_json_bytes(decoded_response)
def test_decompress_brotli(tmpdir, httpbin):
if brotli is None:
# XXX: this is never true, because brotlipy is installed with "httpbin"
pytest.skip("Brotli is not installed")
url = httpbin.url + "/brotli"
request = Request(url, headers={"Accept-Encoding": ["gzip, deflate, br"]})
cass_file = str(tmpdir.join("brotli_response.yaml"))
with vcr.use_cassette(cass_file, decode_compressed_response=True):
urlopen(request)
with vcr.use_cassette(cass_file) as cass:
decoded_response = urlopen(url).read()
assert_cassette_has_one_response(cass)
assert_is_json_bytes(decoded_response)
def test_decompress_regular(tmpdir, httpbin):
"""Test that it doesn't try to decompress content that isn't compressed"""
url = httpbin.url + "/get"

View File

@@ -1,14 +1,12 @@
"""Integration tests with httplib2"""
from urllib.parse import urlencode
import pytest
import pytest_httpbin.certs
from assertions import assert_cassette_has_one_response
import vcr
from ..assertions import assert_cassette_has_one_response
httplib2 = pytest.importorskip("httplib2")

View File

@@ -1,11 +1,7 @@
import os
import pytest
import vcr
from ..assertions import assert_is_json_bytes
asyncio = pytest.importorskip("asyncio")
httpx = pytest.importorskip("httpx")
@@ -32,35 +28,24 @@ class DoSyncRequest(BaseDoRequest):
_client_class = httpx.Client
def __enter__(self):
self._client = self._make_client()
return self
def __exit__(self, *args):
self._client.close()
del self._client
pass
@property
def client(self):
try:
return self._client
except AttributeError as e:
raise ValueError('To access sync client, use "with do_request() as client"') from e
except AttributeError:
self._client = self._make_client()
return self._client
def __call__(self, *args, **kwargs):
if hasattr(self, "_client"):
return self.client.request(*args, timeout=60, **kwargs)
# Use one-time context and dispose of the client afterwards
with self:
return self.client.request(*args, timeout=60, **kwargs)
return self.client.request(*args, timeout=60, **kwargs)
def stream(self, *args, **kwargs):
if hasattr(self, "_client"):
with self.client.stream(*args, **kwargs) as response:
return b"".join(response.iter_bytes())
# Use one-time context and dispose of the client afterwards
with self, self.client.stream(*args, **kwargs) as response:
with self.client.stream(*args, **kwargs) as response:
return b"".join(response.iter_bytes())
@@ -194,11 +179,9 @@ def test_params_same_url_distinct_params(tmpdir, httpbin, do_request):
assert cassette.play_count == 1
params = {"other": "params"}
with (
vcr.use_cassette(str(tmpdir.join("get.yaml"))) as cassette,
pytest.raises(vcr.errors.CannotOverwriteExistingCassetteException),
):
do_request()("GET", url, params=params, headers=headers)
with vcr.use_cassette(str(tmpdir.join("get.yaml"))) as cassette:
with pytest.raises(vcr.errors.CannotOverwriteExistingCassetteException):
do_request()("GET", url, params=params, headers=headers)
@pytest.mark.online
@@ -224,6 +207,22 @@ def test_redirect(httpbin, yml, do_request):
assert cassette_response.request.headers.items() == response.request.headers.items()
@pytest.mark.online
def test_work_with_gzipped_data(httpbin, do_request, yml):
url = httpbin.url + "/gzip?foo=bar"
headers = {"accept-encoding": "deflate, gzip"}
with vcr.use_cassette(yml):
do_request(headers=headers)("GET", url)
with vcr.use_cassette(yml) as cassette:
cassette_response = do_request(headers=headers)("GET", url)
assert cassette_response.headers["content-encoding"] == "gzip"
assert cassette_response.read()
assert cassette.play_count == 1
@pytest.mark.online
@pytest.mark.parametrize("url", ["https://github.com/kevin1024/vcrpy/issues/" + str(i) for i in range(3, 6)])
def test_simple_fetching(do_request, yml, url):
@@ -286,77 +285,3 @@ def test_stream(tmpdir, httpbin, do_request):
assert cassette_content == response_content
assert len(cassette_content) == 512
assert cassette.play_count == 1
# Regular cassette formats support the status reason,
# but the old HTTPX cassette format does not.
@pytest.mark.parametrize(
"cassette_name,reason",
[
("requests", "great"),
("httpx_old_format", "OK"),
],
)
def test_load_cassette_format(do_request, cassette_name, reason):
mydir = os.path.dirname(os.path.realpath(__file__))
yml = f"{mydir}/cassettes/gzip_{cassette_name}.yaml"
url = "https://httpbin.org/gzip"
with vcr.use_cassette(yml) as cassette:
cassette_response = do_request()("GET", url)
assert str(cassette_response.request.url) == url
assert cassette.play_count == 1
# Should be able to load up the JSON inside,
# regardless whether the content is the gzipped
# in the cassette or not.
json = cassette_response.json()
assert json["method"] == "GET", json
assert cassette_response.status_code == 200
assert cassette_response.reason_phrase == reason
def test_gzip__decode_compressed_response_false(tmpdir, httpbin, do_request):
"""
Ensure that httpx is able to automatically decompress the response body.
"""
for _ in range(2): # one for recording, one for re-playing
with vcr.use_cassette(str(tmpdir.join("gzip.yaml"))) as cassette:
response = do_request()("GET", httpbin + "/gzip")
assert response.headers["content-encoding"] == "gzip" # i.e. not removed
# The content stored in the cassette should be gzipped.
assert cassette.responses[0]["body"]["string"][:2] == b"\x1f\x8b"
assert_is_json_bytes(response.content) # i.e. uncompressed bytes
def test_gzip__decode_compressed_response_true(do_request, tmpdir, httpbin):
url = httpbin + "/gzip"
expected_response = do_request()("GET", url)
expected_content = expected_response.content
assert expected_response.headers["content-encoding"] == "gzip" # self-test
with vcr.use_cassette(
str(tmpdir.join("decode_compressed.yaml")),
decode_compressed_response=True,
) as cassette:
r = do_request()("GET", url)
assert r.headers["content-encoding"] == "gzip" # i.e. not removed
content_length = r.headers["content-length"]
assert r.content == expected_content
# Has the cassette body been decompressed?
cassette_response_body = cassette.responses[0]["body"]["string"]
assert isinstance(cassette_response_body, str)
# Content should be JSON.
assert cassette_response_body[0:1] == "{"
with vcr.use_cassette(str(tmpdir.join("decode_compressed.yaml")), decode_compressed_response=True):
r = httpx.get(url)
assert "content-encoding" not in r.headers # i.e. removed
assert r.content == expected_content
# As the content is uncompressed, it should have a bigger
# length than the compressed version.
assert r.headers["content-length"] > content_length

View File

@@ -51,11 +51,9 @@ def test_matchers(httpbin, httpbin_secure, cassette, matcher, matching_uri, not_
assert cass.play_count == 1
# play cassette with not matching on uri, it should fail
with (
pytest.raises(vcr.errors.CannotOverwriteExistingCassetteException),
vcr.use_cassette(cassette, match_on=[matcher]) as cass,
):
urlopen(not_matching_uri)
with pytest.raises(vcr.errors.CannotOverwriteExistingCassetteException):
with vcr.use_cassette(cassette, match_on=[matcher]) as cass:
urlopen(not_matching_uri)
def test_method_matcher(cassette, httpbin, httpbin_secure):
@@ -67,12 +65,10 @@ def test_method_matcher(cassette, httpbin, httpbin_secure):
assert cass.play_count == 1
# should fail if method does not match
with (
pytest.raises(vcr.errors.CannotOverwriteExistingCassetteException),
vcr.use_cassette(cassette, match_on=["method"]) as cass,
):
# is a POST request
urlopen(default_uri, data=b"")
with pytest.raises(vcr.errors.CannotOverwriteExistingCassetteException):
with vcr.use_cassette(cassette, match_on=["method"]) as cass:
# is a POST request
urlopen(default_uri, data=b"")
@pytest.mark.parametrize(
@@ -102,12 +98,14 @@ def test_default_matcher_matches(cassette, uri, httpbin, httpbin_secure):
)
def test_default_matcher_does_not_match(cassette, uri, httpbin, httpbin_secure):
uri = _replace_httpbin(uri, httpbin, httpbin_secure)
with pytest.raises(vcr.errors.CannotOverwriteExistingCassetteException), vcr.use_cassette(cassette):
urlopen(uri)
with pytest.raises(vcr.errors.CannotOverwriteExistingCassetteException):
with vcr.use_cassette(cassette):
urlopen(uri)
def test_default_matcher_does_not_match_on_method(cassette, httpbin, httpbin_secure):
default_uri = _replace_httpbin(DEFAULT_URI, httpbin, httpbin_secure)
with pytest.raises(vcr.errors.CannotOverwriteExistingCassetteException), vcr.use_cassette(cassette):
# is a POST request
urlopen(default_uri, data=b"")
with pytest.raises(vcr.errors.CannotOverwriteExistingCassetteException):
with vcr.use_cassette(cassette):
# is a POST request
urlopen(default_uri, data=b"")

View File

@@ -1,6 +1,5 @@
"""Test using a proxy."""
import asyncio
import http.server
import socketserver
import threading
@@ -37,44 +36,15 @@ class Proxy(http.server.SimpleHTTPRequestHandler):
self.end_headers()
self.copyfile(upstream_response, self.wfile)
def do_CONNECT(self):
host, port = self.path.split(":")
asyncio.run(self._tunnel(host, port, self.connection))
async def _tunnel(self, host, port, client_sock):
target_r, target_w = await asyncio.open_connection(host=host, port=port)
self.send_response(http.HTTPStatus.OK)
self.end_headers()
source_r, source_w = await asyncio.open_connection(sock=client_sock)
async def channel(reader, writer):
while True:
data = await reader.read(1024)
if not data:
break
writer.write(data)
await writer.drain()
writer.close()
await writer.wait_closed()
await asyncio.gather(
channel(target_r, source_w),
channel(source_r, target_w),
)
@pytest.fixture(scope="session")
def proxy_server():
with socketserver.ThreadingTCPServer(("", 0), Proxy) as httpd:
proxy_process = threading.Thread(target=httpd.serve_forever)
proxy_process.start()
yield "http://{}:{}".format(*httpd.server_address)
httpd.shutdown()
proxy_process.join()
httpd = socketserver.ThreadingTCPServer(("", 0), Proxy)
proxy_process = threading.Thread(target=httpd.serve_forever)
proxy_process.start()
yield "http://{}:{}".format(*httpd.server_address)
httpd.shutdown()
proxy_process.join()
def test_use_proxy(tmpdir, httpbin, proxy_server):
@@ -82,26 +52,10 @@ def test_use_proxy(tmpdir, httpbin, proxy_server):
with vcr.use_cassette(str(tmpdir.join("proxy.yaml"))):
response = requests.get(httpbin.url, proxies={"http": proxy_server})
with vcr.use_cassette(str(tmpdir.join("proxy.yaml")), mode="none") as cassette:
with vcr.use_cassette(str(tmpdir.join("proxy.yaml")), mode="once") as cassette:
cassette_response = requests.get(httpbin.url, proxies={"http": proxy_server})
for key in set(cassette_response.headers.keys()) & set(response.headers.keys()):
assert cassette_response.headers[key] == response.headers[key]
assert cassette_response.headers == response.headers
assert cassette.play_count == 1
def test_use_https_proxy(tmpdir, httpbin_secure, proxy_server):
"""Ensure that it works with an HTTPS proxy."""
with vcr.use_cassette(str(tmpdir.join("proxy.yaml"))):
response = requests.get(httpbin_secure.url, proxies={"https": proxy_server})
with vcr.use_cassette(str(tmpdir.join("proxy.yaml")), mode="none") as cassette:
cassette_response = requests.get(
httpbin_secure.url,
proxies={"https": proxy_server},
)
assert cassette_response.headers == response.headers
assert cassette.play_count == 1
# The cassette URL points to httpbin, not the proxy
assert cassette.requests[0].url == httpbin_secure.url + "/"

View File

@@ -124,11 +124,9 @@ def test_none_record_mode(tmpdir, httpbin):
# Cassette file doesn't exist, yet we are trying to make a request.
# raise hell.
testfile = str(tmpdir.join("recordmode.yml"))
with (
vcr.use_cassette(testfile, record_mode=vcr.mode.NONE),
pytest.raises(CannotOverwriteExistingCassetteException),
):
urlopen(httpbin.url).read()
with vcr.use_cassette(testfile, record_mode=vcr.mode.NONE):
with pytest.raises(CannotOverwriteExistingCassetteException):
urlopen(httpbin.url).read()
def test_none_record_mode_with_existing_cassette(tmpdir, httpbin):

View File

@@ -66,7 +66,7 @@ def test_load_cassette_with_custom_persister(tmpdir, httpbin):
with my_vcr.use_cassette(test_fixture, serializer="json"):
response = urlopen(httpbin.url).read()
assert b"HTTP Request &amp; Response Service" in response
assert b"A simple HTTP Request &amp; Response Service." in response
def test_load_cassette_persister_exception_handling(tmpdir, httpbin):
@@ -83,5 +83,6 @@ def test_load_cassette_persister_exception_handling(tmpdir, httpbin):
with my_vcr.use_cassette("bad/encoding") as cass:
assert len(cass) == 0
with pytest.raises(ValueError), my_vcr.use_cassette("bad/buggy") as cass:
pass
with pytest.raises(ValueError):
with my_vcr.use_cassette("bad/buggy") as cass:
pass

View File

@@ -1,11 +1,9 @@
"""Test requests' interaction with vcr"""
import pytest
from assertions import assert_cassette_empty, assert_is_json_bytes
import vcr
from ..assertions import assert_cassette_empty, assert_is_json_bytes
requests = pytest.importorskip("requests")
@@ -266,7 +264,7 @@ def test_nested_cassettes_with_session_created_before_nesting(httpbin_both, tmpd
def test_post_file(tmpdir, httpbin_both):
"""Ensure that we handle posting a file."""
url = httpbin_both + "/post"
with vcr.use_cassette(str(tmpdir.join("post_file.yaml"))) as cass, open(".editorconfig", "rb") as f:
with vcr.use_cassette(str(tmpdir.join("post_file.yaml"))) as cass, open("tox.ini", "rb") as f:
original_response = requests.post(url, f).content
# This also tests that we do the right thing with matching the body when they are files.
@@ -274,10 +272,10 @@ def test_post_file(tmpdir, httpbin_both):
str(tmpdir.join("post_file.yaml")),
match_on=("method", "scheme", "host", "port", "path", "query", "body"),
) as cass:
with open(".editorconfig", "rb") as f:
editorconfig = f.read()
assert cass.requests[0].body.read() == editorconfig
with open(".editorconfig", "rb") as f:
with open("tox.ini", "rb") as f:
tox_content = f.read()
assert cass.requests[0].body.read() == tox_content
with open("tox.ini", "rb") as f:
new_response = requests.post(url, f).content
assert original_response == new_response

View File

@@ -2,9 +2,9 @@ import http.client as httplib
import json
import zlib
import vcr
from assertions import assert_is_json_bytes
from ..assertions import assert_is_json_bytes
import vcr
def _headers_are_case_insensitive(host, port):
@@ -66,7 +66,7 @@ def test_original_decoded_response_is_not_modified(tmpdir, httpbin):
# Assert that we do not modify the original response while appending
# to the cassette.
assert inside.headers["content-encoding"] == "gzip"
assert "gzip" == inside.headers["content-encoding"]
# They should effectively be the same response.
inside_headers = (h for h in inside.headers.items() if h[0].lower() != "date")
@@ -122,7 +122,7 @@ def test_original_response_is_not_modified_by_before_filter(tmpdir, httpbin):
# Furthermore, the responses should be identical.
inside_body = json.loads(inside.read())
outside_body = json.loads(outside.read())
assert inside_body[field_to_scrub] != replacement
assert not inside_body[field_to_scrub] == replacement
assert inside_body[field_to_scrub] == outside_body[field_to_scrub]
# Ensure that when a cassette exists, the scrubbed response is returned.

View File

@@ -1,60 +1,32 @@
"""Test requests' interaction with vcr"""
import asyncio
import functools
import inspect
import json
import os
import ssl
import pytest
from assertions import assert_cassette_empty, assert_is_json_bytes
import vcr
from vcr.errors import CannotOverwriteExistingCassetteException
from ..assertions import assert_cassette_empty, assert_is_json_bytes
tornado = pytest.importorskip("tornado")
gen = pytest.importorskip("tornado.gen")
http = pytest.importorskip("tornado.httpclient")
# whether the current version of Tornado supports the raise_error argument for
# fetch().
supports_raise_error = tornado.version_info >= (4,)
raise_error_for_response_code_only = tornado.version_info >= (6,)
def gen_test(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
async def coro():
return await gen.coroutine(func)(*args, **kwargs)
return asyncio.run(coro())
# Patch the signature so pytest can inject fixtures
# we can't use wrapt.decorator because it returns a generator function
wrapper.__signature__ = inspect.signature(func)
return wrapper
@pytest.fixture(params=["simple", "curl", "default"])
def get_client(request):
ca_bundle_path = os.environ.get("REQUESTS_CA_BUNDLE")
ssl_ctx = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
ssl_ctx.load_verify_locations(cafile=ca_bundle_path)
ssl_ctx.verify_mode = ssl.CERT_REQUIRED
if request.param == "simple":
from tornado import simple_httpclient as simple
return lambda: simple.SimpleAsyncHTTPClient(defaults={"ssl_options": ssl_ctx})
if request.param == "curl":
return lambda: simple.SimpleAsyncHTTPClient()
elif request.param == "curl":
curl = pytest.importorskip("tornado.curl_httpclient")
return lambda: curl.CurlAsyncHTTPClient(defaults={"ca_certs": ca_bundle_path})
return lambda: http.AsyncHTTPClient(defaults={"ssl_options": ssl_ctx})
return lambda: curl.CurlAsyncHTTPClient()
else:
return lambda: http.AsyncHTTPClient()
def get(client, url, **kwargs):
@@ -71,47 +43,44 @@ def post(client, url, data=None, **kwargs):
return client.fetch(http.HTTPRequest(url, method="POST", **kwargs))
@pytest.mark.online
@gen_test
def test_status_code(get_client, tmpdir, httpbin_both):
@pytest.mark.gen_test
def test_status_code(get_client, scheme, tmpdir):
"""Ensure that we can read the status code"""
url = httpbin_both.url
url = scheme + "://httpbin.org/"
with vcr.use_cassette(str(tmpdir.join("atts.yaml"))):
status_code = (yield get(get_client(), url)).code
with vcr.use_cassette(str(tmpdir.join("atts.yaml"))) as cass:
assert status_code == (yield get(get_client(), url)).code
assert cass.play_count == 1
assert 1 == cass.play_count
@pytest.mark.online
@gen_test
def test_headers(get_client, httpbin_both, tmpdir):
@pytest.mark.gen_test
def test_headers(get_client, scheme, tmpdir):
"""Ensure that we can read the headers back"""
url = httpbin_both.url
url = scheme + "://httpbin.org/"
with vcr.use_cassette(str(tmpdir.join("headers.yaml"))):
headers = (yield get(get_client(), url)).headers
with vcr.use_cassette(str(tmpdir.join("headers.yaml"))) as cass:
assert headers == (yield get(get_client(), url)).headers
assert cass.play_count == 1
assert 1 == cass.play_count
@pytest.mark.online
@gen_test
def test_body(get_client, tmpdir, httpbin_both):
@pytest.mark.gen_test
def test_body(get_client, tmpdir, scheme):
"""Ensure the responses are all identical enough"""
url = httpbin_both.url + "/bytes/1024"
url = scheme + "://httpbin.org/bytes/1024"
with vcr.use_cassette(str(tmpdir.join("body.yaml"))):
content = (yield get(get_client(), url)).body
with vcr.use_cassette(str(tmpdir.join("body.yaml"))) as cass:
assert content == (yield get(get_client(), url)).body
assert cass.play_count == 1
assert 1 == cass.play_count
@gen_test
@pytest.mark.gen_test
def test_effective_url(get_client, tmpdir, httpbin):
"""Ensure that the effective_url is captured"""
url = httpbin.url + "/redirect/1"
@@ -121,15 +90,14 @@ def test_effective_url(get_client, tmpdir, httpbin):
with vcr.use_cassette(str(tmpdir.join("url.yaml"))) as cass:
assert effective_url == (yield get(get_client(), url)).effective_url
assert cass.play_count == 1
assert 1 == cass.play_count
@pytest.mark.online
@gen_test
def test_auth(get_client, tmpdir, httpbin_both):
@pytest.mark.gen_test
def test_auth(get_client, tmpdir, scheme):
"""Ensure that we can handle basic auth"""
auth = ("user", "passwd")
url = httpbin_both.url + "/basic-auth/user/passwd"
url = scheme + "://httpbin.org/basic-auth/user/passwd"
with vcr.use_cassette(str(tmpdir.join("auth.yaml"))):
one = yield get(get_client(), url, auth_username=auth[0], auth_password=auth[1])
@@ -137,15 +105,14 @@ def test_auth(get_client, tmpdir, httpbin_both):
two = yield get(get_client(), url, auth_username=auth[0], auth_password=auth[1])
assert one.body == two.body
assert one.code == two.code
assert cass.play_count == 1
assert 1 == cass.play_count
@pytest.mark.online
@gen_test
def test_auth_failed(get_client, tmpdir, httpbin_both):
@pytest.mark.gen_test
def test_auth_failed(get_client, tmpdir, scheme):
"""Ensure that we can save failed auth statuses"""
auth = ("user", "wrongwrongwrong")
url = httpbin_both.url + "/basic-auth/user/passwd"
url = scheme + "://httpbin.org/basic-auth/user/passwd"
with vcr.use_cassette(str(tmpdir.join("auth-failed.yaml"))) as cass:
# Ensure that this is empty to begin with
assert_cassette_empty(cass)
@@ -161,15 +128,14 @@ def test_auth_failed(get_client, tmpdir, httpbin_both):
assert exc_info.value.code == 401
assert one.body == two.body
assert one.code == two.code == 401
assert cass.play_count == 1
assert 1 == cass.play_count
@pytest.mark.online
@gen_test
def test_post(get_client, tmpdir, httpbin_both):
@pytest.mark.gen_test
def test_post(get_client, tmpdir, scheme):
"""Ensure that we can post and cache the results"""
data = {"key1": "value1", "key2": "value2"}
url = httpbin_both.url + "/post"
url = scheme + "://httpbin.org/post"
with vcr.use_cassette(str(tmpdir.join("requests.yaml"))):
req1 = (yield post(get_client(), url, data)).body
@@ -177,13 +143,13 @@ def test_post(get_client, tmpdir, httpbin_both):
req2 = (yield post(get_client(), url, data)).body
assert req1 == req2
assert cass.play_count == 1
assert 1 == cass.play_count
@gen_test
def test_redirects(get_client, tmpdir, httpbin):
@pytest.mark.gen_test
def test_redirects(get_client, tmpdir, scheme):
"""Ensure that we can handle redirects"""
url = httpbin + "/redirect-to?url=bytes/1024&status_code=301"
url = scheme + "://mockbin.org/redirect/301?url=bytes/1024"
with vcr.use_cassette(str(tmpdir.join("requests.yaml"))):
content = (yield get(get_client(), url)).body
@@ -192,38 +158,32 @@ def test_redirects(get_client, tmpdir, httpbin):
assert cass.play_count == 1
@pytest.mark.online
@gen_test
def test_cross_scheme(get_client, tmpdir, httpbin, httpbin_secure):
@pytest.mark.gen_test
def test_cross_scheme(get_client, tmpdir, scheme):
"""Ensure that requests between schemes are treated separately"""
# First fetch a url under http, and then again under https and then
# ensure that we haven't served anything out of cache, and we have two
# requests / response pairs in the cassette
url = httpbin.url
url_secure = httpbin_secure.url
with vcr.use_cassette(str(tmpdir.join("cross_scheme.yaml"))) as cass:
yield get(get_client(), url)
yield get(get_client(), url_secure)
yield get(get_client(), "https://httpbin.org/")
yield get(get_client(), "http://httpbin.org/")
assert cass.play_count == 0
assert len(cass) == 2
# Then repeat the same requests and ensure both were replayed.
with vcr.use_cassette(str(tmpdir.join("cross_scheme.yaml"))) as cass:
yield get(get_client(), url)
yield get(get_client(), url_secure)
yield get(get_client(), "https://httpbin.org/")
yield get(get_client(), "http://httpbin.org/")
assert cass.play_count == 2
@pytest.mark.online
@gen_test
def test_gzip(get_client, tmpdir, httpbin_both):
@pytest.mark.gen_test
def test_gzip(get_client, tmpdir, scheme):
"""
Ensure that httpclient is able to automatically decompress the response
body
"""
url = httpbin_both + "/gzip"
url = scheme + "://httpbin.org/gzip"
# use_gzip was renamed to decompress_response in 4.0
kwargs = {}
@@ -239,26 +199,23 @@ def test_gzip(get_client, tmpdir, httpbin_both):
with vcr.use_cassette(str(tmpdir.join("gzip.yaml"))) as cass:
response = yield get(get_client(), url, **kwargs)
assert_is_json_bytes(response.body)
assert cass.play_count == 1
assert 1 == cass.play_count
@pytest.mark.online
@gen_test
def test_https_with_cert_validation_disabled(get_client, tmpdir, httpbin_secure):
@pytest.mark.gen_test
def test_https_with_cert_validation_disabled(get_client, tmpdir):
cass_path = str(tmpdir.join("cert_validation_disabled.yaml"))
url = httpbin_secure.url
with vcr.use_cassette(cass_path):
yield get(get_client(), url, validate_cert=False)
yield get(get_client(), "https://httpbin.org", validate_cert=False)
with vcr.use_cassette(cass_path) as cass:
yield get(get_client(), url, validate_cert=False)
assert cass.play_count == 1
yield get(get_client(), "https://httpbin.org", validate_cert=False)
assert 1 == cass.play_count
@gen_test
def test_unsupported_features_raises_in_future(get_client, tmpdir, httpbin):
@pytest.mark.gen_test
def test_unsupported_features_raises_in_future(get_client, tmpdir):
"""Ensure that the exception for an AsyncHTTPClient feature not being
supported is raised inside the future."""
@@ -266,7 +223,7 @@ def test_unsupported_features_raises_in_future(get_client, tmpdir, httpbin):
raise AssertionError("Did not expect to be called.")
with vcr.use_cassette(str(tmpdir.join("invalid.yaml"))):
future = get(get_client(), httpbin.url, streaming_callback=callback)
future = get(get_client(), "http://httpbin.org", streaming_callback=callback)
with pytest.raises(Exception) as excinfo:
yield future
@@ -275,11 +232,7 @@ def test_unsupported_features_raises_in_future(get_client, tmpdir, httpbin):
@pytest.mark.skipif(not supports_raise_error, reason="raise_error unavailable in tornado <= 3")
@pytest.mark.skipif(
raise_error_for_response_code_only,
reason="raise_error only ignores HTTPErrors due to response code",
)
@gen_test
@pytest.mark.gen_test
def test_unsupported_features_raise_error_disabled(get_client, tmpdir):
"""Ensure that the exception for an AsyncHTTPClient feature not being
supported is not raised if raise_error=False."""
@@ -298,53 +251,44 @@ def test_unsupported_features_raise_error_disabled(get_client, tmpdir):
assert "not yet supported by VCR" in str(response.error)
@pytest.mark.online
@gen_test
def test_cannot_overwrite_cassette_raises_in_future(get_client, tmpdir, httpbin):
@pytest.mark.gen_test
def test_cannot_overwrite_cassette_raises_in_future(get_client, tmpdir):
"""Ensure that CannotOverwriteExistingCassetteException is raised inside
the future."""
url = httpbin.url
with vcr.use_cassette(str(tmpdir.join("overwrite.yaml"))):
yield get(get_client(), "http://httpbin.org/get")
with vcr.use_cassette(str(tmpdir.join("overwrite.yaml"))):
yield get(get_client(), url + "/get")
with vcr.use_cassette(str(tmpdir.join("overwrite.yaml"))):
future = get(get_client(), url + "/headers")
future = get(get_client(), "http://httpbin.org/headers")
with pytest.raises(CannotOverwriteExistingCassetteException):
yield future
@pytest.mark.skipif(not supports_raise_error, reason="raise_error unavailable in tornado <= 3")
@pytest.mark.skipif(
raise_error_for_response_code_only,
reason="raise_error only ignores HTTPErrors due to response code",
)
@gen_test
def test_cannot_overwrite_cassette_raise_error_disabled(get_client, tmpdir, httpbin):
@pytest.mark.gen_test
def test_cannot_overwrite_cassette_raise_error_disabled(get_client, tmpdir):
"""Ensure that CannotOverwriteExistingCassetteException is not raised if
raise_error=False in the fetch() call."""
url = httpbin.url
with vcr.use_cassette(str(tmpdir.join("overwrite.yaml"))):
yield get(get_client(), "http://httpbin.org/get", raise_error=False)
with vcr.use_cassette(str(tmpdir.join("overwrite.yaml"))):
yield get(get_client(), url + "/get", raise_error=False)
with vcr.use_cassette(str(tmpdir.join("overwrite.yaml"))):
response = yield get(get_client(), url + "/headers", raise_error=False)
response = yield get(get_client(), "http://httpbin.org/headers", raise_error=False)
assert isinstance(response.error, CannotOverwriteExistingCassetteException)
@gen_test
@pytest.mark.gen_test
@vcr.use_cassette(path_transformer=vcr.default_vcr.ensure_suffix(".yaml"))
def test_tornado_with_decorator_use_cassette(get_client):
response = yield get_client().fetch(http.HTTPRequest("http://www.google.com/", method="GET"))
assert response.body.decode("utf-8") == "not actually google"
@gen_test
@pytest.mark.gen_test
@vcr.use_cassette(path_transformer=vcr.default_vcr.ensure_suffix(".yaml"))
def test_tornado_exception_can_be_caught(get_client):
try:
@@ -358,53 +302,45 @@ def test_tornado_exception_can_be_caught(get_client):
assert e.code == 404
@pytest.mark.online
@gen_test
def test_existing_references_get_patched(tmpdir, httpbin):
@pytest.mark.gen_test
def test_existing_references_get_patched(tmpdir):
from tornado.httpclient import AsyncHTTPClient
url = httpbin.url + "/get"
with vcr.use_cassette(str(tmpdir.join("data.yaml"))):
client = AsyncHTTPClient()
yield get(client, url)
yield get(client, "http://httpbin.org/get")
with vcr.use_cassette(str(tmpdir.join("data.yaml"))) as cass:
yield get(client, url)
yield get(client, "http://httpbin.org/get")
assert cass.play_count == 1
@pytest.mark.online
@gen_test
def test_existing_instances_get_patched(get_client, tmpdir, httpbin):
@pytest.mark.gen_test
def test_existing_instances_get_patched(get_client, tmpdir):
"""Ensure that existing instances of AsyncHTTPClient get patched upon
entering VCR context."""
url = httpbin.url + "/get"
client = get_client()
with vcr.use_cassette(str(tmpdir.join("data.yaml"))):
yield get(client, url)
yield get(client, "http://httpbin.org/get")
with vcr.use_cassette(str(tmpdir.join("data.yaml"))) as cass:
yield get(client, url)
yield get(client, "http://httpbin.org/get")
assert cass.play_count == 1
@pytest.mark.online
@gen_test
def test_request_time_is_set(get_client, tmpdir, httpbin):
@pytest.mark.gen_test
def test_request_time_is_set(get_client, tmpdir):
"""Ensures that the request_time on HTTPResponses is set."""
url = httpbin.url + "/get"
with vcr.use_cassette(str(tmpdir.join("data.yaml"))):
client = get_client()
response = yield get(client, url)
response = yield get(client, "http://httpbin.org/get")
assert response.request_time is not None
with vcr.use_cassette(str(tmpdir.join("data.yaml"))) as cass:
client = get_client()
response = yield get(client, url)
response = yield get(client, "http://httpbin.org/get")
assert response.request_time is not None
assert cass.play_count == 1

View File

@@ -0,0 +1,146 @@
"""Integration tests with urllib2"""
import ssl
from urllib.parse import urlencode
from urllib.request import urlopen
import pytest_httpbin.certs
from assertions import assert_cassette_has_one_response
from pytest import mark
# Internal imports
import vcr
def urlopen_with_cafile(*args, **kwargs):
context = ssl.create_default_context(cafile=pytest_httpbin.certs.where())
context.check_hostname = False
kwargs["context"] = context
try:
return urlopen(*args, **kwargs)
except TypeError:
# python2/pypi don't let us override this
del kwargs["cafile"]
return urlopen(*args, **kwargs)
def test_response_code(httpbin_both, tmpdir):
"""Ensure we can read a response code from a fetch"""
url = httpbin_both.url
with vcr.use_cassette(str(tmpdir.join("atts.yaml"))):
code = urlopen_with_cafile(url).getcode()
with vcr.use_cassette(str(tmpdir.join("atts.yaml"))):
assert code == urlopen_with_cafile(url).getcode()
def test_random_body(httpbin_both, tmpdir):
"""Ensure we can read the content, and that it's served from cache"""
url = httpbin_both.url + "/bytes/1024"
with vcr.use_cassette(str(tmpdir.join("body.yaml"))):
body = urlopen_with_cafile(url).read()
with vcr.use_cassette(str(tmpdir.join("body.yaml"))):
assert body == urlopen_with_cafile(url).read()
def test_response_headers(httpbin_both, tmpdir):
"""Ensure we can get information from the response"""
url = httpbin_both.url
with vcr.use_cassette(str(tmpdir.join("headers.yaml"))):
open1 = urlopen_with_cafile(url).info().items()
with vcr.use_cassette(str(tmpdir.join("headers.yaml"))):
open2 = urlopen_with_cafile(url).info().items()
assert sorted(open1) == sorted(open2)
@mark.online
def test_effective_url(tmpdir, httpbin):
"""Ensure that the effective_url is captured"""
url = httpbin.url + "/redirect-to?url=.%2F&status_code=301"
with vcr.use_cassette(str(tmpdir.join("headers.yaml"))):
effective_url = urlopen_with_cafile(url).geturl()
assert effective_url == httpbin.url + "/"
with vcr.use_cassette(str(tmpdir.join("headers.yaml"))):
assert effective_url == urlopen_with_cafile(url).geturl()
def test_multiple_requests(httpbin_both, tmpdir):
"""Ensure that we can cache multiple requests"""
urls = [httpbin_both.url, httpbin_both.url, httpbin_both.url + "/get", httpbin_both.url + "/bytes/1024"]
with vcr.use_cassette(str(tmpdir.join("multiple.yaml"))) as cass:
[urlopen_with_cafile(url) for url in urls]
assert len(cass) == len(urls)
def test_get_data(httpbin_both, tmpdir):
"""Ensure that it works with query data"""
data = urlencode({"some": 1, "data": "here"})
url = httpbin_both.url + "/get?" + data
with vcr.use_cassette(str(tmpdir.join("get_data.yaml"))):
res1 = urlopen_with_cafile(url).read()
with vcr.use_cassette(str(tmpdir.join("get_data.yaml"))):
res2 = urlopen_with_cafile(url).read()
assert res1 == res2
def test_post_data(httpbin_both, tmpdir):
"""Ensure that it works when posting data"""
data = urlencode({"some": 1, "data": "here"}).encode("utf-8")
url = httpbin_both.url + "/post"
with vcr.use_cassette(str(tmpdir.join("post_data.yaml"))):
res1 = urlopen_with_cafile(url, data).read()
with vcr.use_cassette(str(tmpdir.join("post_data.yaml"))) as cass:
res2 = urlopen_with_cafile(url, data).read()
assert len(cass) == 1
assert res1 == res2
assert_cassette_has_one_response(cass)
def test_post_unicode_data(httpbin_both, tmpdir):
"""Ensure that it works when posting unicode data"""
data = urlencode({"snowman": "".encode()}).encode("utf-8")
url = httpbin_both.url + "/post"
with vcr.use_cassette(str(tmpdir.join("post_data.yaml"))):
res1 = urlopen_with_cafile(url, data).read()
with vcr.use_cassette(str(tmpdir.join("post_data.yaml"))) as cass:
res2 = urlopen_with_cafile(url, data).read()
assert len(cass) == 1
assert res1 == res2
assert_cassette_has_one_response(cass)
def test_cross_scheme(tmpdir, httpbin_secure, httpbin):
"""Ensure that requests between schemes are treated separately"""
# First fetch a url under https, and then again under https and then
# ensure that we haven't served anything out of cache, and we have two
# requests / response pairs in the cassette
with vcr.use_cassette(str(tmpdir.join("cross_scheme.yaml"))) as cass:
urlopen_with_cafile(httpbin_secure.url)
urlopen_with_cafile(httpbin.url)
assert len(cass) == 2
assert cass.play_count == 0
def test_decorator(httpbin_both, tmpdir):
"""Test the decorator version of VCR.py"""
url = httpbin_both.url
@vcr.use_cassette(str(tmpdir.join("atts.yaml")))
def inner1():
return urlopen_with_cafile(url).getcode()
@vcr.use_cassette(str(tmpdir.join("atts.yaml")))
def inner2():
return urlopen_with_cafile(url).getcode()
assert inner1() == inner2()

View File

@@ -4,13 +4,12 @@
import pytest
import pytest_httpbin
from assertions import assert_cassette_empty, assert_is_json_bytes
import vcr
from vcr.patch import force_reset
from vcr.stubs.compat import get_headers
from ..assertions import assert_cassette_empty, assert_is_json_bytes
urllib3 = pytest.importorskip("urllib3")

View File

@@ -62,7 +62,8 @@ def test_flickr_should_respond_with_200(tmpdir):
def test_cookies(tmpdir, httpbin):
testfile = str(tmpdir.join("cookies.yml"))
with vcr.use_cassette(testfile), requests.Session() as s:
with vcr.use_cassette(testfile):
s = requests.Session()
s.get(httpbin.url + "/cookies/set?k1=v1&k2=v2")
assert s.cookies.keys() == ["k1", "k2"]

View File

@@ -11,7 +11,6 @@ import yaml
from vcr.cassette import Cassette
from vcr.errors import UnhandledHTTPRequestError
from vcr.patch import force_reset
from vcr.request import Request
from vcr.stubs import VCRHTTPSConnection
@@ -227,11 +226,9 @@ def test_nesting_cassette_context_managers(*args):
assert_get_response_body_is("first_response")
# Make sure a second cassette can supersede the first
with (
Cassette.use(path="test") as second_cassette,
mock.patch.object(second_cassette, "play_response", return_value=second_response),
):
assert_get_response_body_is("second_response")
with Cassette.use(path="test") as second_cassette:
with mock.patch.object(second_cassette, "play_response", return_value=second_response):
assert_get_response_body_is("second_response")
# Now the first cassette should be back in effect
assert_get_response_body_is("first_response")
@@ -413,25 +410,3 @@ def test_find_requests_with_most_matches_many_similar_requests(mock_get_matchers
(1, ["method", "path"], [("query", "failed : query")]),
(3, ["method", "path"], [("query", "failed : query")]),
]
def test_used_interactions(tmpdir):
interactions = [
{"request": {"body": "", "uri": "foo1", "method": "GET", "headers": {}}, "response": "bar1"},
{"request": {"body": "", "uri": "foo2", "method": "GET", "headers": {}}, "response": "bar2"},
{"request": {"body": "", "uri": "foo3", "method": "GET", "headers": {}}, "response": "bar3"},
]
file = tmpdir.join("test_cassette.yml")
file.write(yaml.dump({"interactions": [interactions[0], interactions[1]]}))
cassette = Cassette.load(path=str(file))
request = Request._from_dict(interactions[1]["request"])
cassette.play_response(request)
assert len(cassette._played_interactions) < len(cassette._old_interactions)
request = Request._from_dict(interactions[2]["request"])
cassette.append(request, interactions[2]["response"])
assert len(cassette._new_interactions()) == 1
used_interactions = cassette._played_interactions + cassette._new_interactions()
assert len(used_interactions) == 2

View File

@@ -8,13 +8,15 @@ from vcr.serializers import compat, jsonserializer, yamlserializer
def test_deserialize_old_yaml_cassette():
with open("tests/fixtures/migration/old_cassette.yaml") as f, pytest.raises(ValueError):
deserialize(f.read(), yamlserializer)
with open("tests/fixtures/migration/old_cassette.yaml") as f:
with pytest.raises(ValueError):
deserialize(f.read(), yamlserializer)
def test_deserialize_old_json_cassette():
with open("tests/fixtures/migration/old_cassette.json") as f, pytest.raises(ValueError):
deserialize(f.read(), jsonserializer)
with open("tests/fixtures/migration/old_cassette.json") as f:
with pytest.raises(ValueError):
deserialize(f.read(), jsonserializer)
def test_deserialize_new_yaml_cassette():
@@ -74,7 +76,7 @@ def test_deserialize_py2py3_yaml_cassette(tmpdir, req_body, expect):
cfile = tmpdir.join("test_cassette.yaml")
cfile.write(REQBODY_TEMPLATE.format(req_body=req_body))
with open(str(cfile)) as f:
(requests, _) = deserialize(f.read(), yamlserializer)
(requests, responses) = deserialize(f.read(), yamlserializer)
assert requests[0].body == expect

View File

@@ -1,12 +1,8 @@
import contextlib
import http.client as httplib
from io import BytesIO
from tempfile import NamedTemporaryFile
from unittest import mock
from pytest import mark
from vcr import mode, use_cassette
from vcr import mode
from vcr.cassette import Cassette
from vcr.stubs import VCRHTTPSConnection
@@ -20,56 +16,7 @@ class TestVCRConnection:
@mark.online
@mock.patch("vcr.cassette.Cassette.can_play_response_for", return_value=False)
def testing_connect(*args):
with contextlib.closing(VCRHTTPSConnection("www.google.com")) as vcr_connection:
vcr_connection.cassette = Cassette("test", record_mode=mode.ALL)
vcr_connection.real_connection.connect()
assert vcr_connection.real_connection.sock is not None
def test_body_consumed_once_stream(self, tmpdir, httpbin):
self._test_body_consumed_once(
tmpdir,
httpbin,
BytesIO(b"1234567890"),
BytesIO(b"9876543210"),
BytesIO(b"9876543210"),
)
def test_body_consumed_once_iterator(self, tmpdir, httpbin):
self._test_body_consumed_once(
tmpdir,
httpbin,
iter([b"1234567890"]),
iter([b"9876543210"]),
iter([b"9876543210"]),
)
# data2 and data3 should serve the same data, potentially as iterators
def _test_body_consumed_once(
self,
tmpdir,
httpbin,
data1,
data2,
data3,
):
with NamedTemporaryFile(dir=tmpdir, suffix=".yml") as f:
testpath = f.name
# NOTE: ``use_cassette`` is not okay with the file existing
# already. So we using ``.close()`` to not only
# close but also delete the empty file, before we start.
f.close()
host, port = httpbin.host, httpbin.port
match_on = ["method", "uri", "body"]
with use_cassette(testpath, match_on=match_on):
conn1 = httplib.HTTPConnection(host, port)
conn1.request("POST", "/anything", body=data1)
conn1.getresponse()
conn2 = httplib.HTTPConnection(host, port)
conn2.request("POST", "/anything", body=data2)
conn2.getresponse()
with use_cassette(testpath, match_on=match_on) as cass:
conn3 = httplib.HTTPConnection(host, port)
conn3.request("POST", "/anything", body=data3)
conn3.getresponse()
assert cass.play_counts[0] == 0
assert cass.play_counts[1] == 1
vcr_connection = VCRHTTPSConnection("www.google.com")
vcr_connection.cassette = Cassette("test", record_mode=mode.ALL)
vcr_connection.real_connection.connect()
assert vcr_connection.real_connection.sock is not None

View File

@@ -178,7 +178,7 @@ def test_testcase_playback(tmpdir):
return str(cassette_dir)
test = run_testcase(MyTest)[0][0]
assert b"Example Domain" in test.response
assert b"illustrative examples" in test.response
assert len(test.cassette.requests) == 1
assert test.cassette.play_count == 0
@@ -186,7 +186,7 @@ def test_testcase_playback(tmpdir):
test2 = run_testcase(MyTest)[0][0]
assert test.cassette is not test2.cassette
assert b"Example Domain" in test.response
assert b"illustrative examples" in test.response
assert len(test2.cassette.requests) == 1
assert test2.cassette.play_count == 1

View File

@@ -1,33 +0,0 @@
from io import BytesIO, StringIO
import pytest
from vcr import request
from vcr.util import read_body
@pytest.mark.parametrize(
"input_, expected_output",
[
(BytesIO(b"Stream"), b"Stream"),
(StringIO("Stream"), b"Stream"),
(iter(["StringIter"]), b"StringIter"),
(iter(["String", "Iter"]), b"StringIter"),
(iter([b"BytesIter"]), b"BytesIter"),
(iter([b"Bytes", b"Iter"]), b"BytesIter"),
(iter([70, 111, 111]), b"Foo"),
(iter([]), b""),
("String", b"String"),
(b"Bytes", b"Bytes"),
],
)
def test_read_body(input_, expected_output):
r = request.Request("POST", "http://host.com/", input_, {})
assert read_body(r) == expected_output
def test_unsupported_read_body():
r = request.Request("POST", "http://host.com/", iter([[]]), {})
with pytest.raises(ValueError) as excinfo:
assert read_body(r)
assert excinfo.value.args == ("Body type <class 'list'> not supported",)

View File

@@ -372,19 +372,3 @@ def test_path_class_as_cassette():
)
with use_cassette(path):
pass
def test_use_cassette_generator_return():
ret_val = object()
vcr = VCR()
@vcr.use_cassette("test")
def gen():
return ret_val
yield
with pytest.raises(StopIteration) as exc_info:
next(gen())
assert exc_info.value.value is ret_val

87
tox.ini Normal file
View File

@@ -0,0 +1,87 @@
[tox]
skip_missing_interpreters=true
envlist =
cov-clean,
lint,
{py38,py39,py310,py311,py312}-{requests-urllib3-1,httplib2,urllib3-1,tornado4,boto3,aiohttp,httpx},
{py310,py311,py312}-{requests-urllib3-2,urllib3-2},
{pypy3}-{requests-urllib3-1,httplib2,urllib3-1,tornado4,boto3},
#{py310}-httpx019,
cov-report
[gh-actions]
python =
3.8: py38
3.9: py39
3.10: py310, lint
3.11: py311
3.12: py312
pypy-3: pypy3
# Coverage environment tasks: cov-clean and cov-report
# https://pytest-cov.readthedocs.io/en/latest/tox.html
[testenv:cov-clean]
deps = coverage
skip_install=true
commands = coverage erase
[testenv:cov-report]
deps = coverage
skip_install=true
commands =
coverage html
coverage report --fail-under=90
[testenv:lint]
skipsdist = True
commands =
black --version
black --check --diff .
ruff --version
ruff check .
deps =
black
ruff
basepython = python3.10
[testenv]
# Need to use develop install so that paths
# for aggregate code coverage combine
usedevelop=true
commands =
./runtests.sh --cov=./vcr --cov-branch --cov-report=xml --cov-append {posargs}
allowlist_externals =
./runtests.sh
deps =
Werkzeug<3
pytest
pytest-httpbin>=1.0.1
pytest-cov
PyYAML
ipaddress
requests: requests>=2.22.0
httplib2: httplib2
urllib3-1: urllib3<2
urllib3-2: urllib3<3
boto3: boto3
aiohttp: aiohttp
aiohttp: pytest-asyncio
aiohttp: pytest-aiohttp
httpx: httpx
{py38,py39,py310}-{httpx}: httpx
{py38,py39,py310}-{httpx}: pytest-asyncio
httpx: httpx>0.19
httpx019: httpx==0.19
{py38,py39,py310}-{httpx}: pytest-asyncio
depends =
lint,{py38,py39,py310,py311,py312,pypy3}-{requests-urllib3-1,httplib2,urllib3-1,tornado4,boto3},{py310,py311,py312}-{requests-urllib3-2,urllib3-2},{py38,py39,py310,py311,py312}-{aiohttp},{py38,py39,py310,py311,py312}-{httpx}: cov-clean
cov-report: lint,{py38,py39,py310,py311,py312,pypy3}-{requests-urllib3-1,httplib2,urllib3-1,tornado4,boto3},{py310,py311,py312}-{requests-urllib3-2,urllib3-2},{py38,py39,py310,py311,py312}-{aiohttp}
passenv =
AWS_ACCESS_KEY_ID
AWS_DEFAULT_REGION
AWS_SECRET_ACCESS_KEY
setenv =
# workaround for broken C extension in aiohttp
# see: https://github.com/aio-libs/aiohttp/issues/7229
py312: AIOHTTP_NO_EXTENSIONS=1

View File

@@ -4,7 +4,7 @@ from logging import NullHandler
from .config import VCR
from .record_mode import RecordMode as mode # noqa: F401
__version__ = "8.0.0"
__version__ = "5.1.0"
logging.getLogger(__name__).addHandler(NullHandler())

View File

@@ -3,7 +3,8 @@ import contextlib
import copy
import inspect
import logging
from inspect import iscoroutinefunction
import sys
from asyncio import iscoroutinefunction
import wrapt
@@ -125,7 +126,20 @@ class CassetteContextDecorator:
duration of the generator.
"""
with self as cassette:
return (yield from fn(cassette))
coroutine = fn(cassette)
# We don't need to catch StopIteration. The caller (Tornado's
# gen.coroutine, for example) will handle that.
to_yield = next(coroutine)
while True:
try:
to_send = yield to_yield
except Exception:
to_yield = coroutine.throw(*sys.exc_info())
else:
try:
to_yield = coroutine.send(to_send)
except StopIteration:
break
def _handle_function(self, fn):
with self as cassette:
@@ -177,7 +191,6 @@ class Cassette:
custom_patches=(),
inject=False,
allow_playback_repeats=False,
drop_unused_requests=False,
):
self._persister = persister or FilesystemPersister
self._path = path
@@ -190,7 +203,6 @@ class Cassette:
self.record_mode = record_mode
self.custom_patches = custom_patches
self.allow_playback_repeats = allow_playback_repeats
self.drop_unused_requests = drop_unused_requests
# self.data is the list of (req, resp) tuples
self.data = []
@@ -198,10 +210,6 @@ class Cassette:
self.dirty = False
self.rewound = False
# Subsets of self.data to store old and played interactions
self._old_interactions = []
self._played_interactions = []
@property
def play_count(self):
return sum(self.play_counts.values())
@@ -221,14 +229,14 @@ class Cassette:
@property
def write_protected(self):
return (self.rewound and self.record_mode == RecordMode.ONCE) or self.record_mode == RecordMode.NONE
return self.rewound and self.record_mode == RecordMode.ONCE or self.record_mode == RecordMode.NONE
def append(self, request, response):
"""Add a request, response pair to this cassette"""
log.info("Appending request %s and response %s", request, response)
request = self._before_record_request(request)
if not request:
return
log.info("Appending request %s and response %s", request, response)
# Deepcopy is here because mutation of `response` will corrupt the
# real response.
response = copy.deepcopy(response)
@@ -263,7 +271,6 @@ class Cassette:
for index, response in self._responses(request):
if self.play_counts[index] == 0 or self.allow_playback_repeats:
self.play_counts[index] += 1
self._played_interactions.append((request, response))
return response
# The cassette doesn't contain the request asked for.
raise UnhandledHTTPRequestError(
@@ -324,44 +331,19 @@ class Cassette:
return final_best_matches
def _new_interactions(self):
"""List of new HTTP interactions (request/response tuples)"""
new_interactions = []
for request, response in self.data:
if all(
not requests_match(request, old_request, self._match_on)
for old_request, _ in self._old_interactions
):
new_interactions.append((request, response))
return new_interactions
def _as_dict(self):
return {"requests": self.requests, "responses": self.responses}
def _build_used_interactions_dict(self):
interactions = self._played_interactions + self._new_interactions()
cassete_dict = {
"requests": [request for request, _ in interactions],
"responses": [response for _, response in interactions],
}
return cassete_dict
def _save(self, force=False):
if self.drop_unused_requests and len(self._played_interactions) < len(self._old_interactions):
cassete_dict = self._build_used_interactions_dict()
force = True
else:
cassete_dict = self._as_dict()
if force or self.dirty:
self._persister.save_cassette(self._path, cassete_dict, serializer=self._serializer)
self._persister.save_cassette(self._path, self._as_dict(), serializer=self._serializer)
self.dirty = False
def _load(self):
try:
requests, responses = self._persister.load_cassette(self._path, serializer=self._serializer)
for request, response in zip(requests, responses, strict=False):
for request, response in zip(requests, responses):
self.append(request, response)
self._old_interactions.append((request, response))
self.dirty = False
self.rewound = True
except (CassetteDecodeError, CassetteNotFoundError):

View File

@@ -48,7 +48,6 @@ class VCR:
func_path_generator=None,
decode_compressed_response=False,
record_on_exception=True,
drop_unused_requests=False,
):
self.serializer = serializer
self.match_on = match_on
@@ -82,7 +81,6 @@ class VCR:
self.decode_compressed_response = decode_compressed_response
self.record_on_exception = record_on_exception
self._custom_patches = tuple(custom_patches)
self.drop_unused_requests = drop_unused_requests
def _get_serializer(self, serializer_name):
try:
@@ -153,7 +151,6 @@ class VCR:
"func_path_generator": func_path_generator,
"allow_playback_repeats": kwargs.get("allow_playback_repeats", False),
"record_on_exception": record_on_exception,
"drop_unused_requests": kwargs.get("drop_unused_requests", self.drop_unused_requests),
}
path = kwargs.get("path")
if path:

View File

@@ -6,49 +6,6 @@ from urllib.parse import urlencode, urlparse, urlunparse
from .util import CaseInsensitiveDict
try:
# This supports both brotli & brotlipy packages
import brotli
except ImportError:
try:
import brotlicffi as brotli
except ImportError:
brotli = None
def decompress_deflate(body):
try:
return zlib.decompress(body)
except zlib.error:
# Assume the response was already decompressed
return body
def decompress_gzip(body):
# To (de-)compress gzip format, use wbits = zlib.MAX_WBITS | 16.
try:
return zlib.decompress(body, zlib.MAX_WBITS | 16)
except zlib.error:
# Assume the response was already decompressed
return body
AVAILABLE_DECOMPRESSORS = {
"deflate": decompress_deflate,
"gzip": decompress_gzip,
}
if brotli is not None:
def decompress_brotli(body):
try:
return brotli.decompress(body)
except brotli.error:
# Assume the response was already decompressed
return body
AVAILABLE_DECOMPRESSORS["br"] = decompress_brotli
def replace_headers(request, replacements):
"""Replace headers in request according to replacements.
@@ -179,30 +136,45 @@ def remove_post_data_parameters(request, post_data_parameters_to_remove):
def decode_response(response):
"""
If the response is compressed with any supported compression (gzip,
deflate, br if available):
If the response is compressed with gzip or deflate:
1. decompress the response body
2. delete the content-encoding header
3. update content-length header to decompressed length
"""
def is_compressed(headers):
encoding = headers.get("content-encoding", [])
return encoding and encoding[0] in ("gzip", "deflate")
def decompress_body(body, encoding):
"""Returns decompressed body according to encoding using zlib.
to (de-)compress gzip format, use wbits = zlib.MAX_WBITS | 16
"""
if not body:
return ""
if encoding == "gzip":
try:
return zlib.decompress(body, zlib.MAX_WBITS | 16)
except zlib.error:
return body # assumes that the data was already decompressed
else: # encoding == 'deflate'
try:
return zlib.decompress(body)
except zlib.error:
return body # assumes that the data was already decompressed
# Deepcopy here in case `headers` contain objects that could
# be mutated by a shallow copy and corrupt the real response.
response = copy.deepcopy(response)
headers = CaseInsensitiveDict(response["headers"])
content_encoding = headers.get("content-encoding")
if not content_encoding:
return response
decompressor = AVAILABLE_DECOMPRESSORS.get(content_encoding[0])
if not decompressor:
return response
if is_compressed(headers):
encoding = headers["content-encoding"][0]
headers["content-encoding"].remove(encoding)
if not headers["content-encoding"]:
del headers["content-encoding"]
headers["content-encoding"].remove(content_encoding[0])
if not headers["content-encoding"]:
del headers["content-encoding"]
new_body = decompressor(response["body"]["string"])
response["body"]["string"] = new_body
headers["content-length"] = [str(len(new_body))]
response["headers"] = dict(headers)
new_body = decompress_body(response["body"]["string"], encoding)
response["body"]["string"] = new_body
headers["content-length"] = [str(len(new_body))]
response["headers"] = dict(headers)
return response

View File

@@ -3,10 +3,11 @@ import logging
import urllib
import xmlrpc.client
from string import hexdigits
from typing import List, Set
from .util import read_body
_HEXDIG_CODE_POINTS: set[int] = {ord(s.encode("ascii")) for s in hexdigits}
_HEXDIG_CODE_POINTS: Set[int] = {ord(s.encode("ascii")) for s in hexdigits}
log = logging.getLogger(__name__)
@@ -108,7 +109,7 @@ def _dechunk(body):
CHUNK_GAP = b"\r\n"
BODY_LEN: int = len(body)
chunks: list[bytes] = []
chunks: List[bytes] = []
pos: int = 0
while True:
@@ -162,7 +163,7 @@ def _get_transformers(request):
def requests_match(r1, r2, matchers):
_, failures = get_matchers_results(r1, r2, matchers)
successes, failures = get_matchers_results(r1, r2, matchers)
if failures:
log.debug(f"Requests {r1} and {r2} differ.\nFailure details:\n{failures}")
return len(failures) == 0

View File

@@ -1,5 +1,4 @@
"""Utilities for patching in cassettes"""
import contextlib
import functools
import http.client as httplib
@@ -92,12 +91,12 @@ else:
try:
import httpcore
import httpx
except ImportError: # pragma: no cover
pass
else:
_HttpcoreConnectionPool_handle_request = httpcore.ConnectionPool.handle_request
_HttpcoreAsyncConnectionPool_handle_async_request = httpcore.AsyncConnectionPool.handle_async_request
_HttpxSyncClient_send_single_request = httpx.Client._send_single_request
_HttpxAsyncClient_send_single_request = httpx.AsyncClient._send_single_request
class CassettePatcherBuilder:
@@ -121,7 +120,7 @@ class CassettePatcherBuilder:
self._httplib2(),
self._tornado(),
self._aiohttp(),
self._httpcore(),
self._httpx(),
self._build_patchers_from_mock_triples(self._cassette.custom_patches),
)
@@ -261,14 +260,10 @@ class CassettePatcherBuilder:
yield cpool, "HTTPConnectionWithTimeout", VCRHTTPConnectionWithTimeout
yield cpool, "HTTPSConnectionWithTimeout", VCRHTTPSConnectionWithTimeout
yield (
cpool,
"SCHEME_TO_CONNECTION",
{
"http": VCRHTTPConnectionWithTimeout,
"https": VCRHTTPSConnectionWithTimeout,
},
)
yield cpool, "SCHEME_TO_CONNECTION", {
"http": VCRHTTPConnectionWithTimeout,
"https": VCRHTTPSConnectionWithTimeout,
}
@_build_patchers_from_mock_triples_decorator
def _tornado(self):
@@ -304,22 +299,19 @@ class CassettePatcherBuilder:
yield client.ClientSession, "_request", new_request
@_build_patchers_from_mock_triples_decorator
def _httpcore(self):
def _httpx(self):
try:
import httpcore
import httpx
except ImportError: # pragma: no cover
return
else:
from .stubs.httpcore_stubs import vcr_handle_async_request, vcr_handle_request
from .stubs.httpx_stubs import async_vcr_send, sync_vcr_send
new_handle_async_request = vcr_handle_async_request(
self._cassette,
_HttpcoreAsyncConnectionPool_handle_async_request,
)
yield httpcore.AsyncConnectionPool, "handle_async_request", new_handle_async_request
new_async_client_send = async_vcr_send(self._cassette, _HttpxAsyncClient_send_single_request)
yield httpx.AsyncClient, "_send_single_request", new_async_client_send
new_handle_request = vcr_handle_request(self._cassette, _HttpcoreConnectionPool_handle_request)
yield httpcore.ConnectionPool, "handle_request", new_handle_request
new_sync_client_send = sync_vcr_send(self._cassette, _HttpxSyncClient_send_single_request)
yield httpx.Client, "_send_single_request", new_sync_client_send
def _urllib3_patchers(self, cpool, conn, stubs):
http_connection_remover = ConnectionRemover(
@@ -376,6 +368,10 @@ class ConnectionRemover:
if isinstance(connection, self._connection_class):
self._connection_pool_to_connections.setdefault(pool, set()).add(connection)
def remove_connection_to_pool_entry(self, pool, connection):
if isinstance(connection, self._connection_class):
self._connection_pool_to_connections[self._connection_class].remove(connection)
def __enter__(self):
return self
@@ -386,13 +382,10 @@ class ConnectionRemover:
connection = pool.pool.get()
if isinstance(connection, self._connection_class):
connections.remove(connection)
connection.close()
else:
readd_connections.append(connection)
for connection in readd_connections:
pool._put_conn(connection)
for connection in connections:
connection.close()
def reset_patchers():

View File

@@ -1,10 +1,9 @@
import logging
import warnings
from contextlib import suppress
from io import BytesIO
from urllib.parse import parse_qsl, urlparse
from .util import CaseInsensitiveDict, _is_nonsequence_iterator
from .util import CaseInsensitiveDict
log = logging.getLogger(__name__)
@@ -18,30 +17,13 @@ class Request:
self.method = method
self.uri = uri
self._was_file = hasattr(body, "read")
self._was_iter = _is_nonsequence_iterator(body)
if self._was_file:
if hasattr(body, "tell"):
tell = body.tell()
self.body = body.read()
body.seek(tell)
else:
self.body = body.read()
elif self._was_iter:
self.body = list(body)
self.body = body.read()
else:
self.body = body
self.headers = headers
log.debug("Invoking Request %s", self.uri)
@property
def uri(self):
return self._uri
@uri.setter
def uri(self, uri):
self._uri = uri
self.parsed_uri = urlparse(uri)
@property
def headers(self):
return self._headers
@@ -54,11 +36,7 @@ class Request:
@property
def body(self):
if self._was_file:
return BytesIO(self._body)
if self._was_iter:
return iter(self._body)
return self._body
return BytesIO(self._body) if self._was_file else self._body
@body.setter
def body(self, value):
@@ -76,28 +54,30 @@ class Request:
@property
def scheme(self):
return self.parsed_uri.scheme
return urlparse(self.uri).scheme
@property
def host(self):
return self.parsed_uri.hostname
return urlparse(self.uri).hostname
@property
def port(self):
port = self.parsed_uri.port
parse_uri = urlparse(self.uri)
port = parse_uri.port
if port is None:
with suppress(KeyError):
port = {"https": 443, "http": 80}[self.parsed_uri.scheme]
try:
port = {"https": 443, "http": 80}[parse_uri.scheme]
except KeyError:
pass
return port
@property
def path(self):
return self.parsed_uri.path
return urlparse(self.uri).path
@property
def query(self):
q = self.parsed_uri.query
q = urlparse(self.uri).query
return sorted(parse_qsl(q))
# alias for backwards compatibility

View File

@@ -53,7 +53,7 @@ def serialize(cassette_dict, serializer):
"request": compat.convert_to_unicode(request._to_dict()),
"response": compat.convert_to_unicode(response),
}
for request, response in zip(cassette_dict["requests"], cassette_dict["responses"], strict=False)
for request, response in zip(cassette_dict["requests"], cassette_dict["responses"])
]
data = {"version": CASSETTE_FORMAT_VERSION, "interactions": interactions}
return serializer.serialize(data)

View File

@@ -1,7 +1,6 @@
"""Stubs for patching HTTP and HTTPS requests"""
import logging
from contextlib import suppress
from http.client import HTTPConnection, HTTPResponse, HTTPSConnection
from io import BytesIO
@@ -67,7 +66,6 @@ class VCRHTTPResponse(HTTPResponse):
self.reason = recorded_response["status"]["message"]
self.status = self.code = recorded_response["status"]["code"]
self.version = None
self.version_string = None
self._content = BytesIO(self.recorded_response["body"]["string"])
self._closed = False
self._original_response = self # for requests.session.Session cookie extraction
@@ -78,7 +76,7 @@ class VCRHTTPResponse(HTTPResponse):
# libraries trying to process a chunked response. By removing the
# transfer-encoding: chunked header, this should cause the downstream
# libraries to process this as a non-chunked response.
te_key = [h for h in headers if h.upper() == "TRANSFER-ENCODING"]
te_key = [h for h in headers.keys() if h.upper() == "TRANSFER-ENCODING"]
if te_key:
del headers[te_key[0]]
self.headers = self.msg = parse_headers(headers)
@@ -188,34 +186,22 @@ class VCRConnection:
"""
Returns empty string for the default port and ':port' otherwise
"""
port = (
self.real_connection.port
if not self.real_connection._tunnel_host
else self.real_connection._tunnel_port
)
port = self.real_connection.port
default_port = {"https": 443, "http": 80}[self._protocol]
return f":{port}" if port != default_port else ""
def _real_host(self):
"""Returns the request host"""
if self.real_connection._tunnel_host:
# The real connection is to an HTTPS proxy
return self.real_connection._tunnel_host
else:
return self.real_connection.host
def _uri(self, url):
"""Returns request absolute URI"""
if url and not url.startswith("/"):
# Then this must be a proxy request.
return url
uri = f"{self._protocol}://{self._real_host()}{self._port_postfix()}{url}"
uri = f"{self._protocol}://{self.real_connection.host}{self._port_postfix()}{url}"
log.debug("Absolute URI: %s", uri)
return uri
def _url(self, uri):
"""Returns request selector url from absolute URI"""
prefix = f"{self._protocol}://{self._real_host()}{self._port_postfix()}"
prefix = f"{self._protocol}://{self.real_connection.host}{self._port_postfix()}"
return uri.replace(prefix, "", 1)
def request(self, method, url, body=None, headers=None, *args, **kwargs):
@@ -371,8 +357,12 @@ class VCRConnection:
TODO: Separately setting the attribute on the two instances is not
ideal. We should switch to a proxying implementation.
"""
with suppress(AttributeError):
try:
setattr(self.real_connection, name, value)
except AttributeError:
# raised if real_connection has not been set yet, such as when
# we're setting the real_connection itself for the first time
pass
super().__setattr__(name, value)

View File

@@ -1,11 +1,10 @@
"""Stubs for aiohttp HTTP clients"""
import asyncio
import functools
import json
import logging
from collections.abc import Mapping
from http.cookies import CookieError, Morsel, SimpleCookie
from typing import Mapping, Union
from aiohttp import ClientConnectionError, ClientResponse, CookieJar, RequestInfo, hdrs, streams
from aiohttp.helpers import strip_auth_from_url
@@ -229,7 +228,7 @@ def _build_cookie_header(session, cookies, cookie_header, url):
return c.output(header="", sep=";").strip()
def _build_url_with_params(url_str: str, params: Mapping[str, str | int | float]) -> URL:
def _build_url_with_params(url_str: str, params: Mapping[str, Union[str, int, float]]) -> URL:
# This code is basically a copy&paste of aiohttp.
# https://github.com/aio-libs/aiohttp/blob/master/aiohttp/client_reqrep.py#L225
url = URL(url_str)

View File

@@ -1,5 +1,4 @@
"""Stubs for boto3"""
from botocore.awsrequest import AWSHTTPConnection as HTTPConnection
from botocore.awsrequest import AWSHTTPSConnection as VerifiedHTTPSConnection

View File

@@ -1,215 +0,0 @@
import asyncio
import functools
import logging
from collections import defaultdict
from collections.abc import AsyncIterable, Iterable
from httpcore import Response
from httpcore._models import ByteStream
from vcr.errors import CannotOverwriteExistingCassetteException
from vcr.filters import decode_response
from vcr.request import Request as VcrRequest
from vcr.serializers.compat import convert_body_to_bytes
_logger = logging.getLogger(__name__)
async def _convert_byte_stream(stream):
if isinstance(stream, Iterable):
return list(stream)
if isinstance(stream, AsyncIterable):
return [part async for part in stream]
raise TypeError(
f"_convert_byte_stream: stream must be Iterable or AsyncIterable, got {type(stream).__name__}",
)
def _serialize_headers(real_response):
"""
Some headers can appear multiple times, like "Set-Cookie".
Therefore serialize every header key to a list of values.
"""
headers = defaultdict(list)
for name, value in real_response.headers:
headers[name.decode("ascii")].append(value.decode("ascii"))
return dict(headers)
async def _serialize_response(real_response):
# The reason_phrase may not exist
try:
reason_phrase = real_response.extensions["reason_phrase"].decode("ascii")
except KeyError:
reason_phrase = None
# Reading the response stream consumes the iterator, so we need to restore it afterwards
content = b"".join(await _convert_byte_stream(real_response.stream))
real_response.stream = ByteStream(content)
return {
"status": {"code": real_response.status, "message": reason_phrase},
"headers": _serialize_headers(real_response),
"body": {"string": content},
}
def _deserialize_headers(headers):
"""
httpcore accepts headers as list of tuples of header key and value.
"""
return [
(name.encode("ascii"), value.encode("ascii")) for name, values in headers.items() for value in values
]
def _deserialize_response(vcr_response):
# Cassette format generated for HTTPX requests by older versions of
# vcrpy. We restructure the content to resemble what a regular
# cassette looks like.
if "status_code" in vcr_response:
vcr_response = decode_response(
convert_body_to_bytes(
{
"headers": vcr_response["headers"],
"body": {"string": vcr_response["content"]},
"status": {"code": vcr_response["status_code"]},
},
),
)
extensions = None
else:
extensions = (
{"reason_phrase": vcr_response["status"]["message"].encode("ascii")}
if vcr_response["status"]["message"]
else None
)
return Response(
vcr_response["status"]["code"],
headers=_deserialize_headers(vcr_response["headers"]),
content=vcr_response["body"]["string"],
extensions=extensions,
)
async def _make_vcr_request(real_request):
# Reading the request stream consumes the iterator, so we need to restore it afterwards
body = b"".join(await _convert_byte_stream(real_request.stream))
real_request.stream = ByteStream(body)
uri = bytes(real_request.url).decode("ascii")
# As per HTTPX: If there are multiple headers with the same key, then we concatenate them with commas
headers = defaultdict(list)
for name, value in real_request.headers:
headers[name.decode("ascii")].append(value.decode("ascii"))
headers = {name: ", ".join(values) for name, values in headers.items()}
return VcrRequest(real_request.method.decode("ascii"), uri, body, headers)
async def _vcr_request(cassette, real_request):
vcr_request = await _make_vcr_request(real_request)
if cassette.can_play_response_for(vcr_request):
return vcr_request, _play_responses(cassette, vcr_request)
if cassette.write_protected and cassette.filter_request(vcr_request):
raise CannotOverwriteExistingCassetteException(
cassette=cassette,
failed_request=vcr_request,
)
_logger.info("%s not in cassette, sending to real server", vcr_request)
return vcr_request, None
async def _record_responses(cassette, vcr_request, real_response):
cassette.append(vcr_request, await _serialize_response(real_response))
def _play_responses(cassette, vcr_request):
vcr_response = cassette.play_response(vcr_request)
real_response = _deserialize_response(vcr_response)
return real_response
async def _vcr_handle_async_request(
cassette,
real_handle_async_request,
self,
real_request,
):
vcr_request, vcr_response = await _vcr_request(cassette, real_request)
if vcr_response:
return vcr_response
real_response = await real_handle_async_request(self, real_request)
await _record_responses(cassette, vcr_request, real_response)
return real_response
def vcr_handle_async_request(cassette, real_handle_async_request):
@functools.wraps(real_handle_async_request)
def _inner_handle_async_request(self, real_request):
return _vcr_handle_async_request(
cassette,
real_handle_async_request,
self,
real_request,
)
return _inner_handle_async_request
def _run_async_function(sync_func, *args, **kwargs):
"""
Safely run an asynchronous function from a synchronous context.
Handles both cases:
- An event loop is already running.
- No event loop exists yet.
"""
try:
asyncio.get_running_loop()
except RuntimeError:
return asyncio.run(sync_func(*args, **kwargs))
else:
# If inside a running loop, create a task and wait for it
return asyncio.ensure_future(sync_func(*args, **kwargs))
def _vcr_handle_request(cassette, real_handle_request, self, real_request):
vcr_request, vcr_response = _run_async_function(
_vcr_request,
cassette,
real_request,
)
if vcr_response:
return vcr_response
real_response = real_handle_request(self, real_request)
_run_async_function(_record_responses, cassette, vcr_request, real_response)
return real_response
def vcr_handle_request(cassette, real_handle_request):
@functools.wraps(real_handle_request)
def _inner_handle_request(self, real_request):
return _vcr_handle_request(cassette, real_handle_request, self, real_request)
return _inner_handle_request

151
vcr/stubs/httpx_stubs.py Normal file
View File

@@ -0,0 +1,151 @@
import functools
import inspect
import logging
from unittest.mock import MagicMock, patch
import httpx
from vcr.errors import CannotOverwriteExistingCassetteException
from vcr.request import Request as VcrRequest
_httpx_signature = inspect.signature(httpx.Client.request)
try:
HTTPX_REDIRECT_PARAM = _httpx_signature.parameters["follow_redirects"]
except KeyError:
HTTPX_REDIRECT_PARAM = _httpx_signature.parameters["allow_redirects"]
_logger = logging.getLogger(__name__)
def _transform_headers(httpx_response):
"""
Some headers can appear multiple times, like "Set-Cookie".
Therefore transform to every header key to list of values.
"""
out = {}
for key, var in httpx_response.headers.raw:
decoded_key = key.decode("utf-8")
out.setdefault(decoded_key, [])
out[decoded_key].append(var.decode("utf-8"))
return out
def _to_serialized_response(httpx_response):
return {
"status_code": httpx_response.status_code,
"http_version": httpx_response.http_version,
"headers": _transform_headers(httpx_response),
"content": httpx_response.content,
}
def _from_serialized_headers(headers):
"""
httpx accepts headers as list of tuples of header key and value.
"""
header_list = []
for key, values in headers.items():
for v in values:
header_list.append((key, v))
return header_list
@patch("httpx.Response.close", MagicMock())
@patch("httpx.Response.read", MagicMock())
def _from_serialized_response(request, serialized_response, history=None):
content = serialized_response.get("content")
response = httpx.Response(
status_code=serialized_response.get("status_code"),
request=request,
headers=_from_serialized_headers(serialized_response.get("headers")),
content=content,
history=history or [],
)
response._content = content
return response
def _make_vcr_request(httpx_request, **kwargs):
body = httpx_request.read().decode("utf-8")
uri = str(httpx_request.url)
headers = dict(httpx_request.headers)
return VcrRequest(httpx_request.method, uri, body, headers)
def _shared_vcr_send(cassette, real_send, *args, **kwargs):
real_request = args[1]
vcr_request = _make_vcr_request(real_request, **kwargs)
if cassette.can_play_response_for(vcr_request):
return vcr_request, _play_responses(cassette, real_request, vcr_request, args[0], kwargs)
if cassette.write_protected and cassette.filter_request(vcr_request):
raise CannotOverwriteExistingCassetteException(cassette=cassette, failed_request=vcr_request)
_logger.info("%s not in cassette, sending to real server", vcr_request)
return vcr_request, None
def _record_responses(cassette, vcr_request, real_response):
for past_real_response in real_response.history:
past_vcr_request = _make_vcr_request(past_real_response.request)
cassette.append(past_vcr_request, _to_serialized_response(past_real_response))
if real_response.history:
# If there was a redirection keep we want the request which will hold the
# final redirect value
vcr_request = _make_vcr_request(real_response.request)
cassette.append(vcr_request, _to_serialized_response(real_response))
return real_response
def _play_responses(cassette, request, vcr_request, client, kwargs):
vcr_response = cassette.play_response(vcr_request)
response = _from_serialized_response(request, vcr_response)
return response
async def _async_vcr_send(cassette, real_send, *args, **kwargs):
vcr_request, response = _shared_vcr_send(cassette, real_send, *args, **kwargs)
if response:
# add cookies from response to session cookie store
args[0].cookies.extract_cookies(response)
return response
real_response = await real_send(*args, **kwargs)
await real_response.aread()
return _record_responses(cassette, vcr_request, real_response)
def async_vcr_send(cassette, real_send):
@functools.wraps(real_send)
def _inner_send(*args, **kwargs):
return _async_vcr_send(cassette, real_send, *args, **kwargs)
return _inner_send
def _sync_vcr_send(cassette, real_send, *args, **kwargs):
vcr_request, response = _shared_vcr_send(cassette, real_send, *args, **kwargs)
if response:
# add cookies from response to session cookie store
args[0].cookies.extract_cookies(response)
return response
real_response = real_send(*args, **kwargs)
real_response.read()
return _record_responses(cassette, vcr_request, real_response)
def sync_vcr_send(cassette, real_send):
@functools.wraps(real_send)
def _inner_send(*args, **kwargs):
return _sync_vcr_send(cassette, real_send, *args, **kwargs)
return _inner_send

View File

@@ -1,5 +1,4 @@
"""Stubs for tornado HTTP clients"""
import functools
from io import BytesIO
@@ -74,7 +73,7 @@ def vcr_fetch_impl(cassette, real_fetch_impl):
return callback(response)
def new_callback(response):
headers = [(k, response.headers.get_list(k)) for k in response.headers]
headers = [(k, response.headers.get_list(k)) for k in response.headers.keys()]
vcr_response = {
"status": {"code": response.code, "message": response.reason},

View File

@@ -89,28 +89,9 @@ def compose(*functions):
return composed
def _is_nonsequence_iterator(obj):
return hasattr(obj, "__iter__") and not isinstance(
obj,
(bytearray, bytes, dict, list, str),
)
def read_body(request):
if hasattr(request.body, "read"):
return request.body.read()
if _is_nonsequence_iterator(request.body):
body = list(request.body)
if body:
if isinstance(body[0], str):
return "".join(body).encode("utf-8")
elif isinstance(body[0], (bytes, bytearray)):
return b"".join(body)
elif isinstance(body[0], int):
return bytes(body)
else:
raise ValueError(f"Body type {type(body[0])} not supported")
return b""
return request.body