← Back to team overview

txaws-dev team mailing list archive

[Merge] lp:~dreamhosters/txaws/920309-fix-ca-certs into lp:txaws

 

Duncan McGreggor has proposed merging lp:~dreamhosters/txaws/920309-fix-ca-certs into lp:txaws.

Requested reviews:
  txAWS Technical List (txaws-tech)
Related bugs:
  Bug #920309 in txAWS: "test_ssl_hostname_verification unit test error"
  https://bugs.launchpad.net/txaws/+bug/920309

For more details, see:
https://code.launchpad.net/~dreamhosters/txaws/920309-fix-ca-certs/+merge/90325
-- 
https://code.launchpad.net/~dreamhosters/txaws/920309-fix-ca-certs/+merge/90325
Your team txAWS Technical List is requested to review the proposed merge of lp:~dreamhosters/txaws/920309-fix-ca-certs into lp:txaws.
=== modified file 'txaws/client/ssl.py'
--- txaws/client/ssl.py	2011-11-29 18:42:09 +0000
+++ txaws/client/ssl.py	2012-01-26 20:29:23 +0000
@@ -1,16 +1,28 @@
 from glob import glob
 import os
 import re
+import sys
 
 from OpenSSL import SSL
 from OpenSSL.crypto import load_certificate, FILETYPE_PEM
 
 from twisted.internet.ssl import CertificateOptions
 
+from txaws import exception
+
 
 __all__ = ["VerifyingContextFactory", "get_ca_certs"]
 
 
+# Multiple defaults are supported; just add more paths, separated by colons.
+if sys.platform == "darwin":
+    DEFAULT_CERTS_PATH = "/System/Library/OpenSSL/certs/:"
+# XXX Windows users can file a bug to add theirs, since we don't know what
+# the right path is
+else:
+    DEFAULT_CERTS_PATH = "/etc/ssl/certs/:"
+
+
 class VerifyingContextFactory(CertificateOptions):
     """
     A SSL context factory to pass to C{connectSSL} to check for hostname
@@ -71,22 +83,37 @@
         return context
 
 
-def get_ca_certs(files="/etc/ssl/certs/*.pem"):
-    """Retrieve a list of CAs pointed by C{files}."""
-    certificateAuthorityMap = {}
-    for certFileName in glob(files):
-        # There might be some dead symlinks in there, so let's make sure it's
-        # real.
-        if not os.path.exists(certFileName):
-            continue
-        certFile = open(certFileName)
-        data = certFile.read()
-        certFile.close()
-        x509 = load_certificate(FILETYPE_PEM, data)
-        digest = x509.digest("sha1")
-        # Now, de-duplicate in case the same cert has multiple names.
-        certificateAuthorityMap[digest] = x509
-    return certificateAuthorityMap.values()
+def get_ca_certs():
+    """
+    Retrieve a list of CAs found in the configured certificate paths.
+    
+    In order to find .pem files, this function checks first for presence of the
+    CERTS_PATH environment variable that should point to a directory containing
+    cert files. In the absence of this variable, the module-level
+    DEFAULT_CERTS_PATH will be used instead.
+
+    Note that both of these variables can have multiple paths in them, just
+    like the familiar PATH environment variable (separated by colons).
+    """
+    cert_paths = os.getenv("CERTS_PATH", DEFAULT_CERTS_PATH).split(":")
+    certificate_authority_map = {}
+    for path in cert_paths:
+        for cert_file_name in glob(os.path.join(path, "*.pem")):
+            # There might be some dead symlinks in there, so let's make sure
+            # it's real.
+            if not os.path.exists(cert_file_name):
+                continue
+            cert_file = open(cert_file_name)
+            data = cert_file.read()
+            cert_file.close()
+            x509 = load_certificate(FILETYPE_PEM, data)
+            digest = x509.digest("sha1")
+            # Now, de-duplicate in case the same cert has multiple names.
+            certificate_authority_map[digest] = x509
+    values = certificate_authority_map.values()
+    if len(values) == 0:
+        raise exception.CertsNotFoundError("Could not find any .pem files.")
+    return values
 
 
 _ca_certs = None

=== renamed file 'txaws/client/tests/test_client.py' => 'txaws/client/tests/test_base.py'
--- txaws/client/tests/test_client.py	2011-11-29 18:47:00 +0000
+++ txaws/client/tests/test_base.py	2012-01-26 20:29:23 +0000
@@ -1,38 +1,22 @@
 import os
 
-from OpenSSL.crypto import load_certificate, FILETYPE_PEM
-from OpenSSL.SSL import Error as SSLError
-from OpenSSL.version import __version__ as pyopenssl_version
-
 from twisted.internet import reactor
-from twisted.internet.ssl import DefaultOpenSSLContextFactory
 from twisted.internet.error import ConnectionRefusedError
 from twisted.protocols.policies import WrappingFactory
 from twisted.python import log
 from twisted.python.filepath import FilePath
 from twisted.python.failure import Failure
+from twisted.test.test_sslverify import makeCertificate
 from twisted.web import server, static
 from twisted.web.client import HTTPClientFactory
 from twisted.web.error import Error as TwistedWebError
 
+from txaws.client import ssl
 from txaws.client.base import BaseClient, BaseQuery, error_wrapper
-from txaws.client.ssl import VerifyingContextFactory
 from txaws.service import AWSServiceEndpoint
 from txaws.testing.base import TXAWSTestCase
 
 
-def sibpath(path):
-    return os.path.join(os.path.dirname(__file__), path)
-
-
-PRIVKEY = sibpath("private.ssl")
-PUBKEY = sibpath("public.ssl")
-BADPRIVKEY = sibpath("badprivate.ssl")
-BADPUBKEY = sibpath("badpublic.ssl")
-PRIVSANKEY = sibpath("private_san.ssl")
-PUBSANKEY = sibpath("public_san.ssl")
-
-
 class ErrorWrapperTestCase(TXAWSTestCase):
 
     def test_204_no_content(self):
@@ -168,6 +152,9 @@
         d.addCallback(query.get_response_headers)
         return d.addCallback(check_results)
 
+    # XXX for systems that don't have certs in the DEFAULT_CERTS_PATH, this
+    # test will fail; instead, let's create some certs in a temp directory and
+    # set the DEFAULT_CERTS_PATH to point there.
     def test_ssl_hostname_verification(self):
         """
         If the endpoint passed to L{BaseQuery} has C{ssl_hostname_verification}
@@ -183,6 +170,8 @@
             def connectSSL(self, host, port, client, factory):
                 self.connects.append((host, port, client, factory))
 
+        certs = makeCertificate(O="Test Certificate", CN="something")[1]
+        self.patch(ssl, "_ca_certs", certs)
         fake_reactor = FakeReactor()
         endpoint = AWSServiceEndpoint(ssl_hostname_verification=True)
         query = BaseQuery("an action", "creds", endpoint, fake_reactor)
@@ -190,112 +179,6 @@
         [(host, port, client, factory)] = fake_reactor.connects
         self.assertEqual("example.com", host)
         self.assertEqual(443, port)
-        self.assertTrue(isinstance(factory, VerifyingContextFactory))
+        self.assertTrue(isinstance(factory, ssl.VerifyingContextFactory))
         self.assertEqual("example.com", factory.host)
         self.assertNotEqual([], factory.caCerts)
-
-
-class BaseQuerySSLTestCase(TXAWSTestCase):
-
-    def setUp(self):
-        self.cleanupServerConnections = 0
-        name = self.mktemp()
-        os.mkdir(name)
-        FilePath(name).child("file").setContent("0123456789")
-        r = static.File(name)
-        self.site = server.Site(r, timeout=None)
-        self.wrapper = WrappingFactory(self.site)
-        from txaws.client import ssl
-        pub_key = file(PUBKEY)
-        pub_key_data = pub_key.read()
-        pub_key.close()
-        pub_key_san = file(PUBSANKEY)
-        pub_key_san_data = pub_key_san.read()
-        pub_key_san.close()
-        ssl._ca_certs = [load_certificate(FILETYPE_PEM, pub_key_data),
-                         load_certificate(FILETYPE_PEM, pub_key_san_data)]
-
-    def tearDown(self):
-        from txaws.client import ssl
-        ssl._ca_certs = None
-        # If the test indicated it might leave some server-side connections
-        # around, clean them up.
-        connections = self.wrapper.protocols.keys()
-        # If there are fewer server-side connections than requested,
-        # that's okay.  Some might have noticed that the client closed
-        # the connection and cleaned up after themselves.
-        for n in range(min(len(connections), self.cleanupServerConnections)):
-            proto = connections.pop()
-            log.msg("Closing %r" % (proto,))
-            proto.transport.loseConnection()
-        if connections:
-            log.msg("Some left-over connections; this test is probably buggy.")
-        return self.port.stopListening()
-
-    def _get_url(self, path):
-        return "https://localhost:%d/%s" % (self.portno, path)
-
-    def test_ssl_verification_positive(self):
-        """
-        The L{VerifyingContextFactory} properly allows to connect to the
-        endpoint if the certificates match.
-        """
-        context_factory = DefaultOpenSSLContextFactory(PRIVKEY, PUBKEY)
-        self.port = reactor.listenSSL(
-            0, self.site, context_factory, interface="127.0.0.1")
-        self.portno = self.port.getHost().port
-
-        endpoint = AWSServiceEndpoint(ssl_hostname_verification=True)
-        query = BaseQuery("an action", "creds", endpoint)
-        d = query.get_page(self._get_url("file"))
-        return d.addCallback(self.assertEquals, "0123456789")
-
-    def test_ssl_verification_negative(self):
-        """
-        The L{VerifyingContextFactory} fails with a SSL error the certificates
-        can't be checked.
-        """
-        context_factory = DefaultOpenSSLContextFactory(BADPRIVKEY, BADPUBKEY)
-        self.port = reactor.listenSSL(
-            0, self.site, context_factory, interface="127.0.0.1")
-        self.portno = self.port.getHost().port
-
-        endpoint = AWSServiceEndpoint(ssl_hostname_verification=True)
-        query = BaseQuery("an action", "creds", endpoint)
-        d = query.get_page(self._get_url("file"))
-        return self.assertFailure(d, SSLError)
-
-    def test_ssl_verification_bypassed(self):
-        """
-        L{BaseQuery} doesn't use L{VerifyingContextFactory}
-        if C{ssl_hostname_verification} is C{False}, thus allowing to connect
-        to non-secure endpoints.
-        """
-        context_factory = DefaultOpenSSLContextFactory(BADPRIVKEY, BADPUBKEY)
-        self.port = reactor.listenSSL(
-            0, self.site, context_factory, interface="127.0.0.1")
-        self.portno = self.port.getHost().port
-
-        endpoint = AWSServiceEndpoint(ssl_hostname_verification=False)
-        query = BaseQuery("an action", "creds", endpoint)
-        d = query.get_page(self._get_url("file"))
-        return d.addCallback(self.assertEquals, "0123456789")
-
-    def test_ssl_subject_alt_name(self):
-        """
-        L{VerifyingContextFactory} supports checking C{subjectAltName} in the
-        certificate if it's available.
-        """
-        context_factory = DefaultOpenSSLContextFactory(PRIVSANKEY, PUBSANKEY)
-        self.port = reactor.listenSSL(
-            0, self.site, context_factory, interface="127.0.0.1")
-        self.portno = self.port.getHost().port
-
-        endpoint = AWSServiceEndpoint(ssl_hostname_verification=True)
-        query = BaseQuery("an action", "creds", endpoint)
-        d = query.get_page("https://127.0.0.1:%d/file" % (self.portno,))
-        return d.addCallback(self.assertEquals, "0123456789")
-
-    if pyopenssl_version < "0.12":
-        test_ssl_subject_alt_name.skip = (
-            "subjectAltName not supported by older PyOpenSSL")

=== added file 'txaws/client/tests/test_ssl.py'
--- txaws/client/tests/test_ssl.py	1970-01-01 00:00:00 +0000
+++ txaws/client/tests/test_ssl.py	2012-01-26 20:29:23 +0000
@@ -0,0 +1,199 @@
+import os
+import tempfile
+
+from OpenSSL.crypto import dump_certificate, load_certificate, FILETYPE_PEM
+from OpenSSL.SSL import Error as SSLError
+from OpenSSL.version import __version__ as pyopenssl_version
+
+from twisted.internet import reactor
+from twisted.internet.ssl import DefaultOpenSSLContextFactory
+from twisted.protocols.policies import WrappingFactory
+from twisted.python import log
+from twisted.python.filepath import FilePath
+from twisted.test.test_sslverify import makeCertificate
+from twisted.web import server, static
+
+from txaws import exception
+from txaws.client import ssl
+from txaws.client.base import BaseQuery
+from txaws.service import AWSServiceEndpoint
+from txaws.testing.base import TXAWSTestCase
+
+
+def sibpath(path):
+    return os.path.join(os.path.dirname(__file__), path)
+
+
+PRIVKEY = sibpath("private.ssl")
+PUBKEY = sibpath("public.ssl")
+BADPRIVKEY = sibpath("badprivate.ssl")
+BADPUBKEY = sibpath("badpublic.ssl")
+PRIVSANKEY = sibpath("private_san.ssl")
+PUBSANKEY = sibpath("public_san.ssl")
+
+
+class BaseQuerySSLTestCase(TXAWSTestCase):
+
+    def setUp(self):
+        self.cleanupServerConnections = 0
+        name = self.mktemp()
+        os.mkdir(name)
+        FilePath(name).child("file").setContent("0123456789")
+        r = static.File(name)
+        self.site = server.Site(r, timeout=None)
+        self.wrapper = WrappingFactory(self.site)
+        pub_key = file(PUBKEY)
+        pub_key_data = pub_key.read()
+        pub_key.close()
+        pub_key_san = file(PUBSANKEY)
+        pub_key_san_data = pub_key_san.read()
+        pub_key_san.close()
+        ssl._ca_certs = [load_certificate(FILETYPE_PEM, pub_key_data),
+                         load_certificate(FILETYPE_PEM, pub_key_san_data)]
+
+    def tearDown(self):
+        ssl._ca_certs = None
+        # If the test indicated it might leave some server-side connections
+        # around, clean them up.
+        connections = self.wrapper.protocols.keys()
+        # If there are fewer server-side connections than requested,
+        # that's okay.  Some might have noticed that the client closed
+        # the connection and cleaned up after themselves.
+        for n in range(min(len(connections), self.cleanupServerConnections)):
+            proto = connections.pop()
+            log.msg("Closing %r" % (proto,))
+            proto.transport.loseConnection()
+        if connections:
+            log.msg("Some left-over connections; this test is probably buggy.")
+        return self.port.stopListening()
+
+    def _get_url(self, path):
+        return "https://localhost:%d/%s" % (self.portno, path)
+
+    def test_ssl_verification_positive(self):
+        """
+        The L{VerifyingContextFactory} properly allows to connect to the
+        endpoint if the certificates match.
+        """
+        context_factory = DefaultOpenSSLContextFactory(PRIVKEY, PUBKEY)
+        self.port = reactor.listenSSL(
+            0, self.site, context_factory, interface="127.0.0.1")
+        self.portno = self.port.getHost().port
+
+        endpoint = AWSServiceEndpoint(ssl_hostname_verification=True)
+        query = BaseQuery("an action", "creds", endpoint)
+        d = query.get_page(self._get_url("file"))
+        return d.addCallback(self.assertEquals, "0123456789")
+
+    def test_ssl_verification_negative(self):
+        """
+        The L{VerifyingContextFactory} fails with a SSL error if the
+        certificates can't be checked.
+        """
+        context_factory = DefaultOpenSSLContextFactory(BADPRIVKEY, BADPUBKEY)
+        self.port = reactor.listenSSL(
+            0, self.site, context_factory, interface="127.0.0.1")
+        self.portno = self.port.getHost().port
+
+        endpoint = AWSServiceEndpoint(ssl_hostname_verification=True)
+        query = BaseQuery("an action", "creds", endpoint)
+        d = query.get_page(self._get_url("file"))
+        return self.assertFailure(d, SSLError)
+
+    def test_ssl_verification_bypassed(self):
+        """
+        L{BaseQuery} doesn't use L{VerifyingContextFactory}
+        if C{ssl_hostname_verification} is C{False}, thus allowing to connect
+        to non-secure endpoints.
+        """
+        context_factory = DefaultOpenSSLContextFactory(BADPRIVKEY, BADPUBKEY)
+        self.port = reactor.listenSSL(
+            0, self.site, context_factory, interface="127.0.0.1")
+        self.portno = self.port.getHost().port
+
+        endpoint = AWSServiceEndpoint(ssl_hostname_verification=False)
+        query = BaseQuery("an action", "creds", endpoint)
+        d = query.get_page(self._get_url("file"))
+        return d.addCallback(self.assertEquals, "0123456789")
+
+    def test_ssl_subject_alt_name(self):
+        """
+        L{VerifyingContextFactory} supports checking C{subjectAltName} in the
+        certificate if it's available.
+        """
+        context_factory = DefaultOpenSSLContextFactory(PRIVSANKEY, PUBSANKEY)
+        self.port = reactor.listenSSL(
+            0, self.site, context_factory, interface="127.0.0.1")
+        self.portno = self.port.getHost().port
+
+        endpoint = AWSServiceEndpoint(ssl_hostname_verification=True)
+        query = BaseQuery("an action", "creds", endpoint)
+        d = query.get_page("https://127.0.0.1:%d/file" % (self.portno,))
+        return d.addCallback(self.assertEquals, "0123456789")
+
+    if pyopenssl_version < "0.12":
+        test_ssl_subject_alt_name.skip = (
+            "subjectAltName not supported by older PyOpenSSL")
+
+
+class CertsFilesTestCase(TXAWSTestCase):
+
+    def setUp(self):
+        super(CertsFilesTestCase, self).setUp()
+        # set up temp dir with no certs
+        self.no_certs_dir = tempfile.mkdtemp()
+        # create certs
+        cert1 = makeCertificate(O="Server Certificate 1", CN="cn1")
+        cert2 = makeCertificate(O="Server Certificate 2", CN="cn2")
+        cert3 = makeCertificate(O="Server Certificate 3", CN="cn3")
+        # set up temp dir with one cert
+        self.one_cert_dir = tempfile.mkdtemp()
+        self.cert1 = self._write_pem(cert1, self.one_cert_dir, "cert1.pem")
+        # set up temp dir with two certs
+        self.two_certs_dir = tempfile.mkdtemp()
+        self.cert2 = self._write_pem(cert2, self.two_certs_dir, "cert2.pem")
+        self.cert3 = self._write_pem(cert3, self.two_certs_dir, "cert3.pem")
+
+    def tearDown(self):
+        super(CertsFilesTestCase, self).tearDown()
+        os.unlink(self.cert1)
+        os.unlink(self.cert2)
+        os.unlink(self.cert3)
+        os.removedirs(self.no_certs_dir)
+        os.removedirs(self.one_cert_dir)
+        os.removedirs(self.two_certs_dir)
+
+    def _write_pem(self, cert, dir, filename):
+        data = dump_certificate(FILETYPE_PEM, cert[1])
+        full_path = os.path.join(dir, filename)
+        fh = open(full_path, "w")
+        fh.write(data)
+        fh.close()
+        return full_path
+
+    def test_get_ca_certs_no_certs(self):
+        os.environ["CERTS_PATH"] = self.no_certs_dir
+        self.patch(ssl, "DEFAULT_CERTS_PATH", self.no_certs_dir)
+        self.assertRaises(exception.CertsNotFoundError, ssl.get_ca_certs)
+
+    def test_get_ca_certs_with_default_path(self):
+        self.patch(ssl, "DEFAULT_CERTS_PATH", self.two_certs_dir)
+        certs = ssl.get_ca_certs()
+        self.assertEqual(len(certs), 2)
+
+    def test_get_ca_certs_with_env_path(self):
+        os.environ["CERTS_PATH"] = self.one_cert_dir
+        certs = ssl.get_ca_certs()
+        self.assertEqual(len(certs), 1)
+
+    def test_get_ca_certs_multiple_paths(self):
+        os.environ["CERTS_PATH"] = "%s:%s" % (
+            self.one_cert_dir, self.two_certs_dir)
+        certs = ssl.get_ca_certs()
+        self.assertEqual(len(certs), 3)
+
+    def test_get_ca_certs_one_empty_path(self):
+        os.environ["CERTS_PATH"] = "%s:%s" % (
+            self.no_certs_dir, self.one_cert_dir)
+        certs = ssl.get_ca_certs()
+        self.assertEqual(len(certs), 1)

=== modified file 'txaws/exception.py'
--- txaws/exception.py	2012-01-23 00:48:29 +0000
+++ txaws/exception.py	2012-01-26 20:29:23 +0000
@@ -126,3 +126,9 @@
     """
     txAWS was unable to parse the server response.
     """
+
+
+class CertsNotFoundError(Exception):
+    """
+    txAWS was not able to find any SSL certificates.
+    """


Follow ups