mirror of
https://github.com/kevin1024/vcrpy.git
synced 2025-12-08 16:53:23 +00:00
Add full support for Boto
Before this change, vcrpy would not work with modules of Boto (e.g., boto.iam) that use Boto's CertValidatingHTTPSConnection to connect to AWS (unless you went through the extra effort of disabling certificate validation during the tests). This change adds support for those modules.
This commit is contained in:
@@ -1,7 +1,10 @@
|
||||
import pytest
|
||||
boto = pytest.importorskip("boto")
|
||||
import boto
|
||||
import boto.iam
|
||||
from boto.s3.connection import S3Connection
|
||||
from boto.s3.key import Key
|
||||
from ConfigParser import DuplicateSectionError
|
||||
import vcr
|
||||
|
||||
def test_boto_stubs(tmpdir):
|
||||
@@ -9,6 +12,9 @@ def test_boto_stubs(tmpdir):
|
||||
# Perform the imports within the patched context so that
|
||||
# CertValidatingHTTPSConnection refers to the patched version.
|
||||
from boto.https_connection import CertValidatingHTTPSConnection
|
||||
from vcr.stubs.boto_stubs import VCRCertValidatingHTTPSConnection
|
||||
# Prove that the class was patched by the stub and that we can instantiate it.
|
||||
assert CertValidatingHTTPSConnection is VCRCertValidatingHTTPSConnection
|
||||
CertValidatingHTTPSConnection('hostname.does.not.matter')
|
||||
|
||||
def test_boto_without_vcr():
|
||||
@@ -46,3 +52,21 @@ def test_boto_hardcore_mode(tmpdir):
|
||||
k = Key(s3_bucket)
|
||||
k.key = 'test.txt'
|
||||
k.set_contents_from_string('hello world i am a string')
|
||||
|
||||
def test_boto_iam(tmpdir):
|
||||
try:
|
||||
boto.config.add_section('Boto')
|
||||
except DuplicateSectionError:
|
||||
pass
|
||||
# Ensure that boto uses HTTPS
|
||||
boto.config.set('Boto', 'is_secure', 'true')
|
||||
# Ensure that boto uses CertValidatingHTTPSConnection
|
||||
boto.config.set('Boto', 'https_validate_certificates', 'true')
|
||||
|
||||
with vcr.use_cassette(str(tmpdir.join('boto-iam.yml'))) as cass:
|
||||
iam_conn = boto.iam.connect_to_region('universal')
|
||||
iam_conn.get_all_users()
|
||||
|
||||
with vcr.use_cassette(str(tmpdir.join('boto-iam.yml'))) as cass:
|
||||
iam_conn = boto.iam.connect_to_region('universal')
|
||||
iam_conn.get_all_users()
|
||||
|
||||
22
vcr/patch.py
22
vcr/patch.py
@@ -33,6 +33,13 @@ try:
|
||||
except ImportError: # pragma: no cover
|
||||
pass
|
||||
|
||||
try:
|
||||
# Try to save the original types for boto
|
||||
import boto.https_connection
|
||||
_CertValidatingHTTPSConnection = boto.https_connection.CertValidatingHTTPSConnection
|
||||
except ImportError: # pragma: no cover
|
||||
pass
|
||||
|
||||
|
||||
def install(cassette):
|
||||
"""
|
||||
@@ -86,6 +93,15 @@ def install(cassette):
|
||||
except ImportError: # pragma: no cover
|
||||
pass
|
||||
|
||||
# patch boto
|
||||
try:
|
||||
import boto.https_connection as cpool
|
||||
from .stubs.boto_stubs import VCRCertValidatingHTTPSConnection
|
||||
cpool.CertValidatingHTTPSConnection = VCRCertValidatingHTTPSConnection
|
||||
cpool.CertValidatingHTTPSConnection.cassette = cassette
|
||||
except ImportError: # pragma: no cover
|
||||
pass
|
||||
|
||||
|
||||
def reset():
|
||||
'''Undo all the patching'''
|
||||
@@ -120,3 +136,9 @@ def reset():
|
||||
cpool.SCHEME_TO_CONNECTION = _SCHEME_TO_CONNECTION
|
||||
except ImportError: # pragma: no cover
|
||||
pass
|
||||
|
||||
try:
|
||||
import boto.https_connection as cpool
|
||||
cpool.CertValidatingHTTPSConnection = _CertValidatingHTTPSConnection
|
||||
except ImportError: # pragma: no cover
|
||||
pass
|
||||
|
||||
8
vcr/stubs/boto_stubs.py
Normal file
8
vcr/stubs/boto_stubs.py
Normal file
@@ -0,0 +1,8 @@
|
||||
'''Stubs for boto'''
|
||||
|
||||
from boto.https_connection import CertValidatingHTTPSConnection
|
||||
from ..stubs import VCRHTTPSConnection
|
||||
|
||||
|
||||
class VCRCertValidatingHTTPSConnection(VCRHTTPSConnection):
|
||||
_baseclass = CertValidatingHTTPSConnection
|
||||
Reference in New Issue
Block a user