txaws-dev team mailing list archive
-
txaws-dev team
-
Mailing list archive
-
Message #00090
[Merge] lp:~dreamhosters/txaws/920309-fix-ca-certs into lp:txaws
Duncan McGreggor has proposed merging lp:~dreamhosters/txaws/920309-fix-ca-certs into lp:txaws.
Requested reviews:
txAWS Technical List (txaws-tech)
Related bugs:
Bug #920309 in txAWS: "test_ssl_hostname_verification unit test error"
https://bugs.launchpad.net/txaws/+bug/920309
For more details, see:
https://code.launchpad.net/~dreamhosters/txaws/920309-fix-ca-certs/+merge/90325
--
https://code.launchpad.net/~dreamhosters/txaws/920309-fix-ca-certs/+merge/90325
Your team txAWS Technical List is requested to review the proposed merge of lp:~dreamhosters/txaws/920309-fix-ca-certs into lp:txaws.
=== modified file 'txaws/client/ssl.py'
--- txaws/client/ssl.py 2011-11-29 18:42:09 +0000
+++ txaws/client/ssl.py 2012-01-26 20:29:23 +0000
@@ -1,16 +1,28 @@
from glob import glob
import os
import re
+import sys
from OpenSSL import SSL
from OpenSSL.crypto import load_certificate, FILETYPE_PEM
from twisted.internet.ssl import CertificateOptions
+from txaws import exception
+
__all__ = ["VerifyingContextFactory", "get_ca_certs"]
+# Multiple defaults are supported; just add more paths, separated by colons.
+if sys.platform == "darwin":
+ DEFAULT_CERTS_PATH = "/System/Library/OpenSSL/certs/:"
+# XXX Windows users can file a bug to add theirs, since we don't know what
+# the right path is
+else:
+ DEFAULT_CERTS_PATH = "/etc/ssl/certs/:"
+
+
class VerifyingContextFactory(CertificateOptions):
"""
A SSL context factory to pass to C{connectSSL} to check for hostname
@@ -71,22 +83,37 @@
return context
-def get_ca_certs(files="/etc/ssl/certs/*.pem"):
- """Retrieve a list of CAs pointed by C{files}."""
- certificateAuthorityMap = {}
- for certFileName in glob(files):
- # There might be some dead symlinks in there, so let's make sure it's
- # real.
- if not os.path.exists(certFileName):
- continue
- certFile = open(certFileName)
- data = certFile.read()
- certFile.close()
- x509 = load_certificate(FILETYPE_PEM, data)
- digest = x509.digest("sha1")
- # Now, de-duplicate in case the same cert has multiple names.
- certificateAuthorityMap[digest] = x509
- return certificateAuthorityMap.values()
+def get_ca_certs():
+ """
+ Retrieve a list of CAs from the .pem files found on the certs path.
+
+ In order to find .pem files, this function checks first for presence of the
+ CERTS_PATH environment variable that should point to a directory containing
+ cert files. In the absence of this variable, the module-level
+ DEFAULT_CERTS_PATH will be used instead.
+
+ Note that both of these variables can have multiple paths in them, just
+ like the familiar PATH environment variable (separated by colons).
+ """
+ cert_paths = os.getenv("CERTS_PATH", DEFAULT_CERTS_PATH).split(":")
+ certificate_authority_map = {}
+ for path in cert_paths:
+ for cert_file_name in glob(os.path.join(path, "*.pem")):
+ # There might be some dead symlinks in there, so let's make sure
+ # it's real.
+ if not os.path.exists(cert_file_name):
+ continue
+ cert_file = open(cert_file_name)
+ data = cert_file.read()
+ cert_file.close()
+ x509 = load_certificate(FILETYPE_PEM, data)
+ digest = x509.digest("sha1")
+ # Now, de-duplicate in case the same cert has multiple names.
+ certificate_authority_map[digest] = x509
+ values = certificate_authority_map.values()
+ if len(values) == 0:
+ raise exception.CertsNotFoundError("Could not find any .pem files.")
+ return values
_ca_certs = None
=== renamed file 'txaws/client/tests/test_client.py' => 'txaws/client/tests/test_base.py'
--- txaws/client/tests/test_client.py 2011-11-29 18:47:00 +0000
+++ txaws/client/tests/test_base.py 2012-01-26 20:29:23 +0000
@@ -1,38 +1,22 @@
import os
-from OpenSSL.crypto import load_certificate, FILETYPE_PEM
-from OpenSSL.SSL import Error as SSLError
-from OpenSSL.version import __version__ as pyopenssl_version
-
from twisted.internet import reactor
-from twisted.internet.ssl import DefaultOpenSSLContextFactory
from twisted.internet.error import ConnectionRefusedError
from twisted.protocols.policies import WrappingFactory
from twisted.python import log
from twisted.python.filepath import FilePath
from twisted.python.failure import Failure
+from twisted.test.test_sslverify import makeCertificate
from twisted.web import server, static
from twisted.web.client import HTTPClientFactory
from twisted.web.error import Error as TwistedWebError
+from txaws.client import ssl
from txaws.client.base import BaseClient, BaseQuery, error_wrapper
-from txaws.client.ssl import VerifyingContextFactory
from txaws.service import AWSServiceEndpoint
from txaws.testing.base import TXAWSTestCase
-def sibpath(path):
- return os.path.join(os.path.dirname(__file__), path)
-
-
-PRIVKEY = sibpath("private.ssl")
-PUBKEY = sibpath("public.ssl")
-BADPRIVKEY = sibpath("badprivate.ssl")
-BADPUBKEY = sibpath("badpublic.ssl")
-PRIVSANKEY = sibpath("private_san.ssl")
-PUBSANKEY = sibpath("public_san.ssl")
-
-
class ErrorWrapperTestCase(TXAWSTestCase):
def test_204_no_content(self):
@@ -168,6 +152,9 @@
d.addCallback(query.get_response_headers)
return d.addCallback(check_results)
+ # XXX for systems that don't have certs in DEFAULT_CERTS_PATH, this test
+ # will fail; instead, let's create some certs in a temp directory and set
+ # DEFAULT_CERTS_PATH to point there.
def test_ssl_hostname_verification(self):
"""
If the endpoint passed to L{BaseQuery} has C{ssl_hostname_verification}
@@ -183,6 +170,8 @@
def connectSSL(self, host, port, client, factory):
self.connects.append((host, port, client, factory))
+ certs = makeCertificate(O="Test Certificate", CN="something")[1]
+ self.patch(ssl, "_ca_certs", certs)
fake_reactor = FakeReactor()
endpoint = AWSServiceEndpoint(ssl_hostname_verification=True)
query = BaseQuery("an action", "creds", endpoint, fake_reactor)
@@ -190,112 +179,6 @@
[(host, port, client, factory)] = fake_reactor.connects
self.assertEqual("example.com", host)
self.assertEqual(443, port)
- self.assertTrue(isinstance(factory, VerifyingContextFactory))
+ self.assertTrue(isinstance(factory, ssl.VerifyingContextFactory))
self.assertEqual("example.com", factory.host)
self.assertNotEqual([], factory.caCerts)
-
-
-class BaseQuerySSLTestCase(TXAWSTestCase):
-
- def setUp(self):
- self.cleanupServerConnections = 0
- name = self.mktemp()
- os.mkdir(name)
- FilePath(name).child("file").setContent("0123456789")
- r = static.File(name)
- self.site = server.Site(r, timeout=None)
- self.wrapper = WrappingFactory(self.site)
- from txaws.client import ssl
- pub_key = file(PUBKEY)
- pub_key_data = pub_key.read()
- pub_key.close()
- pub_key_san = file(PUBSANKEY)
- pub_key_san_data = pub_key_san.read()
- pub_key_san.close()
- ssl._ca_certs = [load_certificate(FILETYPE_PEM, pub_key_data),
- load_certificate(FILETYPE_PEM, pub_key_san_data)]
-
- def tearDown(self):
- from txaws.client import ssl
- ssl._ca_certs = None
- # If the test indicated it might leave some server-side connections
- # around, clean them up.
- connections = self.wrapper.protocols.keys()
- # If there are fewer server-side connections than requested,
- # that's okay. Some might have noticed that the client closed
- # the connection and cleaned up after themselves.
- for n in range(min(len(connections), self.cleanupServerConnections)):
- proto = connections.pop()
- log.msg("Closing %r" % (proto,))
- proto.transport.loseConnection()
- if connections:
- log.msg("Some left-over connections; this test is probably buggy.")
- return self.port.stopListening()
-
- def _get_url(self, path):
- return "https://localhost:%d/%s" % (self.portno, path)
-
- def test_ssl_verification_positive(self):
- """
- The L{VerifyingContextFactory} properly allows to connect to the
- endpoint if the certificates match.
- """
- context_factory = DefaultOpenSSLContextFactory(PRIVKEY, PUBKEY)
- self.port = reactor.listenSSL(
- 0, self.site, context_factory, interface="127.0.0.1")
- self.portno = self.port.getHost().port
-
- endpoint = AWSServiceEndpoint(ssl_hostname_verification=True)
- query = BaseQuery("an action", "creds", endpoint)
- d = query.get_page(self._get_url("file"))
- return d.addCallback(self.assertEquals, "0123456789")
-
- def test_ssl_verification_negative(self):
- """
- The L{VerifyingContextFactory} fails with a SSL error the certificates
- can't be checked.
- """
- context_factory = DefaultOpenSSLContextFactory(BADPRIVKEY, BADPUBKEY)
- self.port = reactor.listenSSL(
- 0, self.site, context_factory, interface="127.0.0.1")
- self.portno = self.port.getHost().port
-
- endpoint = AWSServiceEndpoint(ssl_hostname_verification=True)
- query = BaseQuery("an action", "creds", endpoint)
- d = query.get_page(self._get_url("file"))
- return self.assertFailure(d, SSLError)
-
- def test_ssl_verification_bypassed(self):
- """
- L{BaseQuery} doesn't use L{VerifyingContextFactory}
- if C{ssl_hostname_verification} is C{False}, thus allowing to connect
- to non-secure endpoints.
- """
- context_factory = DefaultOpenSSLContextFactory(BADPRIVKEY, BADPUBKEY)
- self.port = reactor.listenSSL(
- 0, self.site, context_factory, interface="127.0.0.1")
- self.portno = self.port.getHost().port
-
- endpoint = AWSServiceEndpoint(ssl_hostname_verification=False)
- query = BaseQuery("an action", "creds", endpoint)
- d = query.get_page(self._get_url("file"))
- return d.addCallback(self.assertEquals, "0123456789")
-
- def test_ssl_subject_alt_name(self):
- """
- L{VerifyingContextFactory} supports checking C{subjectAltName} in the
- certificate if it's available.
- """
- context_factory = DefaultOpenSSLContextFactory(PRIVSANKEY, PUBSANKEY)
- self.port = reactor.listenSSL(
- 0, self.site, context_factory, interface="127.0.0.1")
- self.portno = self.port.getHost().port
-
- endpoint = AWSServiceEndpoint(ssl_hostname_verification=True)
- query = BaseQuery("an action", "creds", endpoint)
- d = query.get_page("https://127.0.0.1:%d/file" % (self.portno,))
- return d.addCallback(self.assertEquals, "0123456789")
-
- if pyopenssl_version < "0.12":
- test_ssl_subject_alt_name.skip = (
- "subjectAltName not supported by older PyOpenSSL")
=== added file 'txaws/client/tests/test_ssl.py'
--- txaws/client/tests/test_ssl.py 1970-01-01 00:00:00 +0000
+++ txaws/client/tests/test_ssl.py 2012-01-26 20:29:23 +0000
@@ -0,0 +1,199 @@
+import os
+import tempfile
+
+from OpenSSL.crypto import dump_certificate, load_certificate, FILETYPE_PEM
+from OpenSSL.SSL import Error as SSLError
+from OpenSSL.version import __version__ as pyopenssl_version
+
+from twisted.internet import reactor
+from twisted.internet.ssl import DefaultOpenSSLContextFactory
+from twisted.protocols.policies import WrappingFactory
+from twisted.python import log
+from twisted.python.filepath import FilePath
+from twisted.test.test_sslverify import makeCertificate
+from twisted.web import server, static
+
+from txaws import exception
+from txaws.client import ssl
+from txaws.client.base import BaseQuery
+from txaws.service import AWSServiceEndpoint
+from txaws.testing.base import TXAWSTestCase
+
+
+def sibpath(path):
+ return os.path.join(os.path.dirname(__file__), path)
+
+
+PRIVKEY = sibpath("private.ssl")
+PUBKEY = sibpath("public.ssl")
+BADPRIVKEY = sibpath("badprivate.ssl")
+BADPUBKEY = sibpath("badpublic.ssl")
+PRIVSANKEY = sibpath("private_san.ssl")
+PUBSANKEY = sibpath("public_san.ssl")
+
+
+class BaseQuerySSLTestCase(TXAWSTestCase):
+
+ def setUp(self):
+ self.cleanupServerConnections = 0
+ name = self.mktemp()
+ os.mkdir(name)
+ FilePath(name).child("file").setContent("0123456789")
+ r = static.File(name)
+ self.site = server.Site(r, timeout=None)
+ self.wrapper = WrappingFactory(self.site)
+ pub_key = file(PUBKEY)
+ pub_key_data = pub_key.read()
+ pub_key.close()
+ pub_key_san = file(PUBSANKEY)
+ pub_key_san_data = pub_key_san.read()
+ pub_key_san.close()
+ ssl._ca_certs = [load_certificate(FILETYPE_PEM, pub_key_data),
+ load_certificate(FILETYPE_PEM, pub_key_san_data)]
+
+ def tearDown(self):
+ ssl._ca_certs = None
+ # If the test indicated it might leave some server-side connections
+ # around, clean them up.
+ connections = self.wrapper.protocols.keys()
+ # If there are fewer server-side connections than requested,
+ # that's okay. Some might have noticed that the client closed
+ # the connection and cleaned up after themselves.
+ for n in range(min(len(connections), self.cleanupServerConnections)):
+ proto = connections.pop()
+ log.msg("Closing %r" % (proto,))
+ proto.transport.loseConnection()
+ if connections:
+ log.msg("Some left-over connections; this test is probably buggy.")
+ return self.port.stopListening()
+
+ def _get_url(self, path):
+ return "https://localhost:%d/%s" % (self.portno, path)
+
+ def test_ssl_verification_positive(self):
+ """
+ The L{VerifyingContextFactory} properly allows to connect to the
+ endpoint if the certificates match.
+ """
+ context_factory = DefaultOpenSSLContextFactory(PRIVKEY, PUBKEY)
+ self.port = reactor.listenSSL(
+ 0, self.site, context_factory, interface="127.0.0.1")
+ self.portno = self.port.getHost().port
+
+ endpoint = AWSServiceEndpoint(ssl_hostname_verification=True)
+ query = BaseQuery("an action", "creds", endpoint)
+ d = query.get_page(self._get_url("file"))
+ return d.addCallback(self.assertEquals, "0123456789")
+
+ def test_ssl_verification_negative(self):
+ """
+ The L{VerifyingContextFactory} fails with an SSL error if the
+ certificates can't be checked.
+ """
+ context_factory = DefaultOpenSSLContextFactory(BADPRIVKEY, BADPUBKEY)
+ self.port = reactor.listenSSL(
+ 0, self.site, context_factory, interface="127.0.0.1")
+ self.portno = self.port.getHost().port
+
+ endpoint = AWSServiceEndpoint(ssl_hostname_verification=True)
+ query = BaseQuery("an action", "creds", endpoint)
+ d = query.get_page(self._get_url("file"))
+ return self.assertFailure(d, SSLError)
+
+ def test_ssl_verification_bypassed(self):
+ """
+ L{BaseQuery} doesn't use L{VerifyingContextFactory}
+ if C{ssl_hostname_verification} is C{False}, thus allowing to connect
+ to non-secure endpoints.
+ """
+ context_factory = DefaultOpenSSLContextFactory(BADPRIVKEY, BADPUBKEY)
+ self.port = reactor.listenSSL(
+ 0, self.site, context_factory, interface="127.0.0.1")
+ self.portno = self.port.getHost().port
+
+ endpoint = AWSServiceEndpoint(ssl_hostname_verification=False)
+ query = BaseQuery("an action", "creds", endpoint)
+ d = query.get_page(self._get_url("file"))
+ return d.addCallback(self.assertEquals, "0123456789")
+
+ def test_ssl_subject_alt_name(self):
+ """
+ L{VerifyingContextFactory} supports checking C{subjectAltName} in the
+ certificate if it's available.
+ """
+ context_factory = DefaultOpenSSLContextFactory(PRIVSANKEY, PUBSANKEY)
+ self.port = reactor.listenSSL(
+ 0, self.site, context_factory, interface="127.0.0.1")
+ self.portno = self.port.getHost().port
+
+ endpoint = AWSServiceEndpoint(ssl_hostname_verification=True)
+ query = BaseQuery("an action", "creds", endpoint)
+ d = query.get_page("https://127.0.0.1:%d/file" % (self.portno,))
+ return d.addCallback(self.assertEquals, "0123456789")
+
+ if pyopenssl_version < "0.12":
+ test_ssl_subject_alt_name.skip = (
+ "subjectAltName not supported by older PyOpenSSL")
+
+
+class CertsFilesTestCase(TXAWSTestCase):
+
+ def setUp(self):
+ super(CertsFilesTestCase, self).setUp()
+ # set up temp dir with no certs
+ self.no_certs_dir = tempfile.mkdtemp()
+ # create certs
+ cert1 = makeCertificate(O="Server Certificate 1", CN="cn1")
+ cert2 = makeCertificate(O="Server Certificate 2", CN="cn2")
+ cert3 = makeCertificate(O="Server Certificate 3", CN="cn3")
+ # set up temp dir with one cert
+ self.one_cert_dir = tempfile.mkdtemp()
+ self.cert1 = self._write_pem(cert1, self.one_cert_dir, "cert1.pem")
+ # set up temp dir with two certs
+ self.two_certs_dir = tempfile.mkdtemp()
+ self.cert2 = self._write_pem(cert2, self.two_certs_dir, "cert2.pem")
+ self.cert3 = self._write_pem(cert3, self.two_certs_dir, "cert3.pem")
+
+ def tearDown(self):
+ super(CertsFilesTestCase, self).tearDown()
+ os.unlink(self.cert1)
+ os.unlink(self.cert2)
+ os.unlink(self.cert3)
+ os.removedirs(self.no_certs_dir)
+ os.removedirs(self.one_cert_dir)
+ os.removedirs(self.two_certs_dir)
+
+ def _write_pem(self, cert, dir, filename):
+ data = dump_certificate(FILETYPE_PEM, cert[1])
+ full_path = os.path.join(dir, filename)
+ fh = open(full_path, "w")
+ fh.write(data)
+ fh.close()
+ return full_path
+
+ def test_get_ca_certs_no_certs(self):
+ os.environ["CERTS_PATH"] = self.no_certs_dir
+ self.patch(ssl, "DEFAULT_CERTS_PATH", self.no_certs_dir)
+ self.assertRaises(exception.CertsNotFoundError, ssl.get_ca_certs)
+
+ def test_get_ca_certs_with_default_path(self):
+ self.patch(ssl, "DEFAULT_CERTS_PATH", self.two_certs_dir)
+ certs = ssl.get_ca_certs()
+ self.assertEqual(len(certs), 2)
+
+ def test_get_ca_certs_with_env_path(self):
+ os.environ["CERTS_PATH"] = self.one_cert_dir
+ certs = ssl.get_ca_certs()
+ self.assertEqual(len(certs), 1)
+
+ def test_get_ca_certs_multiple_paths(self):
+ os.environ["CERTS_PATH"] = "%s:%s" % (
+ self.one_cert_dir, self.two_certs_dir)
+ certs = ssl.get_ca_certs()
+ self.assertEqual(len(certs), 3)
+
+ def test_get_ca_certs_one_empty_path(self):
+ os.environ["CERTS_PATH"] = "%s:%s" % (
+ self.no_certs_dir, self.one_cert_dir)
+ certs = ssl.get_ca_certs()
+ self.assertEqual(len(certs), 1)
=== modified file 'txaws/exception.py'
--- txaws/exception.py 2012-01-23 00:48:29 +0000
+++ txaws/exception.py 2012-01-26 20:29:23 +0000
@@ -126,3 +126,9 @@
"""
txAWS was unable to parse the server response.
"""
+
+
+class CertsNotFoundError(Exception):
+ """
+ txAWS was not able to find any SSL certificates.
+ """
Follow ups