launchpad-reviewers team mailing list archive
-
launchpad-reviewers team
-
Mailing list archive
-
Message #26942
[Merge] ~cjwatson/launchpad:remove-htpasswd-generation into launchpad:master
Colin Watson has proposed merging ~cjwatson/launchpad:remove-htpasswd-generation into launchpad:master with ~cjwatson/launchpad:archive-auth-inactive-person as a prerequisite.
Commit message:
Remove .htaccess and .htpasswd generation
Requested reviews:
Launchpad code reviewers (launchpad-reviewers)
For more details, see:
https://code.launchpad.net/~cjwatson/launchpad/+git/launchpad/+merge/401610
We now handle private PPA authorization dynamically instead.
The generate-ppa-htaccess script remains in place for now, since it still handles things like sending email to people when their subscriptions are cancelled.
--
Your team Launchpad code reviewers is requested to review the proposed merge of ~cjwatson/launchpad:remove-htpasswd-generation into launchpad:master.
diff --git a/lib/lp/archivepublisher/htaccess.py b/lib/lp/archivepublisher/htaccess.py
deleted file mode 100644
index 613cfde..0000000
--- a/lib/lp/archivepublisher/htaccess.py
+++ /dev/null
@@ -1,124 +0,0 @@
-#!/usr/bin/python2
-#
-# Copyright 2010-2017 Canonical Ltd. This software is licensed under the
-# GNU Affero General Public License version 3 (see the file LICENSE).
-
-"""Writing of htaccess and htpasswd files."""
-
-__metaclass__ = type
-
-__all__ = [
- 'htpasswd_credentials_for_archive',
- 'write_htaccess',
- 'write_htpasswd',
- ]
-
-import base64
-import crypt
-import os
-
-from lp.registry.model.person import Person
-from lp.services.database.interfaces import IStore
-from lp.soyuz.model.archiveauthtoken import ArchiveAuthToken
-
-
-HTACCESS_TEMPLATE = """
-AuthType Basic
-AuthName "Token Required"
-AuthUserFile %(path)s/.htpasswd
-Require valid-user
-"""
-
-BUILDD_USER_NAME = "buildd"
-
-
-def write_htaccess(htaccess_filename, distroot):
- """Write a htaccess file for a private archive.
-
- :param htaccess_filename: Filename of the htaccess file.
- :param distroot: Archive root path
- """
- interpolations = {"path": distroot}
- file = open(htaccess_filename, "w")
- try:
- file.write(HTACCESS_TEMPLATE % interpolations)
- finally:
- file.close()
-
-
-def write_htpasswd(filename, users):
- """Write out a new htpasswd file.
-
- :param filename: The file to create.
- :param users: Iterable over (user, password, salt) tuples.
- """
- if os.path.isfile(filename):
- os.remove(filename)
-
- file = open(filename, "a")
- try:
- for user, password, salt in users:
- encrypted = crypt.crypt(password, salt)
- file.write("%s:%s\n" % (user, encrypted))
- finally:
- file.close()
-
-
-# XXX cjwatson 2017-10-09: This whole mechanism of writing password files to
-# disk (as opposed to e.g. using a WSGI authentication provider that checks
-# passwords against the database) is terrible, but as long as we're using it
-# we should use something like bcrypt rather than DES-based crypt.
-def make_salt(s):
- """Produce a salt from an input string.
-
- This ensures that salts are drawn from the correct alphabet
- ([./a-zA-Z0-9]).
- """
- # As long as the input string is at least one character long, there will
- # be no padding within the first two characters.
- return base64.b64encode(
- (s or " ").encode("UTF-8"), altchars=b"./")[:2].decode("ASCII")
-
-
-def htpasswd_credentials_for_archive(archive):
- """Return credentials for an archive for use with write_htpasswd.
-
- :param archive: An `IArchive` (must be private)
- :return: Iterable of tuples with (user, password, salt) for use with
- write_htpasswd.
- """
- assert archive.private, "Archive %r must be private" % archive
-
- tokens = IStore(ArchiveAuthToken).find(
- (ArchiveAuthToken.person_id, ArchiveAuthToken.name,
- ArchiveAuthToken.token),
- ArchiveAuthToken.archive == archive,
- ArchiveAuthToken.date_deactivated == None)
- # We iterate tokens more than once - materialise it.
- tokens = list(tokens)
-
- # Preload map with person ID to person name.
- person_ids = {token[0] for token in tokens}
- names = dict(
- IStore(Person).find(
- (Person.id, Person.name), Person.id.is_in(person_ids)))
-
- # Format the user field by combining the token list with the person list
- # (when token has person_id) or prepending a '+' (for named tokens).
- output = []
- for person_id, token_name, token in tokens:
- if token_name:
- # A named auth token.
- output.append(('+' + token_name, token, make_salt(token_name)))
- else:
- # A subscription auth token.
- output.append(
- (names[person_id], token, make_salt(names[person_id])))
-
- # The first .htpasswd entry is the buildd_secret.
- yield (BUILDD_USER_NAME, archive.buildd_secret, BUILDD_USER_NAME[:2])
-
- # Iterate over tokens and write the appropriate htpasswd entries for them.
- # Sort by name/person ID so the file can be compared later.
- for user, password, salt in sorted(output):
- yield (user, password, salt)
diff --git a/lib/lp/archivepublisher/publishing.py b/lib/lp/archivepublisher/publishing.py
index 55614f9..b87c7ce 100644
--- a/lib/lp/archivepublisher/publishing.py
+++ b/lib/lp/archivepublisher/publishing.py
@@ -50,17 +50,14 @@ from lp.archivepublisher import HARDCODED_COMPONENT_ORDER
from lp.archivepublisher.config import getPubConfig
from lp.archivepublisher.diskpool import DiskPool
from lp.archivepublisher.domination import Dominator
-from lp.archivepublisher.htaccess import (
- htpasswd_credentials_for_archive,
- write_htaccess,
- write_htpasswd,
- )
from lp.archivepublisher.indices import (
build_binary_stanza_fields,
build_source_stanza_fields,
build_translations_stanza_fields,
)
-from lp.archivepublisher.interfaces.archivegpgsigningkey import ISignableArchive
+from lp.archivepublisher.interfaces.archivegpgsigningkey import (
+ ISignableArchive,
+ )
from lp.archivepublisher.model.ftparchive import FTPArchiveHandler
from lp.archivepublisher.utils import (
get_ppa_reference,
@@ -166,27 +163,6 @@ def _getDiskPool(pubconf, log):
return dp
-def _setupHtaccess(archive, pubconf, log):
- """Setup .htaccess/.htpasswd files for an archive.
- """
- if not archive.private:
- # FIXME: JRV 20101108 leftover .htaccess and .htpasswd files
- # should be removed when support for making existing 3PA's public
- # is added; bug=376072
- return
-
- htaccess_path = os.path.join(pubconf.archiveroot, ".htaccess")
- htpasswd_path = os.path.join(pubconf.archiveroot, ".htpasswd")
- # After the initial htaccess/htpasswd files
- # are created generate_ppa_htaccess is responsible for
- # updating the tokens.
- if not os.path.exists(htaccess_path):
- log.debug("Writing htaccess file.")
- write_htaccess(htaccess_path, pubconf.archiveroot)
- passwords = htpasswd_credentials_for_archive(archive)
- write_htpasswd(htpasswd_path, passwords)
-
-
def getPublisher(archive, allowed_suites, log, distsroot=None):
"""Return an initialized Publisher instance for the given context.
@@ -472,7 +448,6 @@ class Publisher(object):
def setupArchiveDirs(self):
self.log.debug("Setting up archive directories.")
self._config.setupArchiveDirs()
- _setupHtaccess(self.archive, self._config, self.log)
def isDirty(self, distroseries, pocket):
"""True if a publication has happened in this release and pocket."""
diff --git a/lib/lp/archivepublisher/scripts/generate_ppa_htaccess.py b/lib/lp/archivepublisher/scripts/generate_ppa_htaccess.py
index a272540..26e8db8 100644
--- a/lib/lp/archivepublisher/scripts/generate_ppa_htaccess.py
+++ b/lib/lp/archivepublisher/scripts/generate_ppa_htaccess.py
@@ -3,22 +3,10 @@
# Copyright 2009-2011 Canonical Ltd. This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).
-from datetime import (
- datetime,
- timedelta,
- )
-import filecmp
-import os
-import tempfile
+from datetime import datetime
import pytz
-from lp.archivepublisher.config import getPubConfig
-from lp.archivepublisher.htaccess import (
- htpasswd_credentials_for_archive,
- write_htaccess,
- write_htpasswd,
- )
from lp.registry.model.teammembership import TeamParticipation
from lp.services.config import config
from lp.services.database.interfaces import IStore
@@ -30,23 +18,19 @@ from lp.services.mail.sendmail import (
)
from lp.services.scripts.base import LaunchpadCronScript
from lp.services.webapp import canonical_url
-from lp.soyuz.enums import (
- ArchiveStatus,
- ArchiveSubscriberStatus,
- )
-from lp.soyuz.model.archive import Archive
+from lp.soyuz.enums import ArchiveSubscriberStatus
from lp.soyuz.model.archiveauthtoken import ArchiveAuthToken
from lp.soyuz.model.archivesubscriber import ArchiveSubscriber
-# These PPAs should never have their htaccess/pwd files touched.
-BLACKLISTED_PPAS = {
- 'ubuntuone': ['ppa'],
- }
-
class HtaccessTokenGenerator(LaunchpadCronScript):
- """Helper class for generating .htaccess files for private PPAs."""
- blacklist = BLACKLISTED_PPAS
+ """Expire archive subscriptions and deactivate invalid tokens."""
+
+ # XXX cjwatson 2021-04-21: This script and class are now misnamed, as we
+ # no longer generate .htaccess or .htpasswd files, but instead check
+ # archive authentication dynamically. We can remove this script once we
+ # stop running it on production and move its remaining functions
+ # elsewhere (probably garbo).
def add_my_options(self):
"""Add script command line options."""
@@ -60,68 +44,6 @@ class HtaccessTokenGenerator(LaunchpadCronScript):
dest="no_deactivation", default=False,
help="If set, tokens are not deactivated.")
- def ensureHtaccess(self, ppa):
- """Generate a .htaccess for `ppa`."""
- if self.options.dryrun:
- return
-
- # The publisher Config object does not have an
- # interface, so we need to remove the security wrapper.
- pub_config = getPubConfig(ppa)
- htaccess_filename = os.path.join(pub_config.archiveroot, ".htaccess")
- if not os.path.exists(htaccess_filename):
- # It's not there, so create it.
- if not os.path.exists(pub_config.archiveroot):
- os.makedirs(pub_config.archiveroot)
- write_htaccess(htaccess_filename, pub_config.archiveroot)
- self.logger.debug("Created .htaccess for %s" % ppa.displayname)
-
- def generateHtpasswd(self, ppa):
- """Generate a htpasswd file for `ppa`s `tokens`.
-
- :param ppa: The context PPA (an `IArchive`).
- :return: The filename of the htpasswd file that was generated.
- """
- # Create a temporary file that will be a new .htpasswd.
- pub_config = getPubConfig(ppa)
- if not os.path.exists(pub_config.temproot):
- os.makedirs(pub_config.temproot)
- fd, temp_filename = tempfile.mkstemp(dir=pub_config.temproot)
- os.close(fd)
-
- write_htpasswd(temp_filename, htpasswd_credentials_for_archive(ppa))
-
- return temp_filename
-
- def replaceUpdatedHtpasswd(self, ppa, temp_htpasswd_file):
- """Compare the new and the old htpasswd and replace if changed.
-
- :return: True if the file was replaced.
- """
- try:
- if self.options.dryrun:
- return False
-
- # The publisher Config object does not have an
- # interface, so we need to remove the security wrapper.
- pub_config = getPubConfig(ppa)
- if not os.path.exists(pub_config.archiveroot):
- os.makedirs(pub_config.archiveroot)
- htpasswd_filename = os.path.join(
- pub_config.archiveroot, ".htpasswd")
-
- if (not os.path.isfile(htpasswd_filename) or
- not filecmp.cmp(htpasswd_filename, temp_htpasswd_file)):
- # Atomically replace the old file or create a new file.
- os.rename(temp_htpasswd_file, htpasswd_filename)
- self.logger.debug("Replaced htpasswd for %s" % ppa.displayname)
- return True
-
- return False
- finally:
- if os.path.exists(temp_htpasswd_file):
- os.unlink(temp_htpasswd_file)
-
def sendCancellationEmail(self, token):
"""Send an email to the person whose subscription was cancelled."""
if token.archive.suppress_subscription_notifications:
@@ -220,8 +142,7 @@ class HtaccessTokenGenerator(LaunchpadCronScript):
:param send_email: Whether to send a cancellation email to the owner
of the token. This defaults to False to speed up the test
suite.
- :return: the set of ppas affected by token deactivations so that we
- can later update their htpasswd files.
+ :return: the set of ppas affected by token deactivations.
"""
invalid_tokens = self._getInvalidTokens()
return self.deactivateTokens(invalid_tokens, send_email=send_email)
@@ -249,129 +170,13 @@ class HtaccessTokenGenerator(LaunchpadCronScript):
self.logger.info(
"Expired subscriptions: %s" % ", ".join(subscription_names))
- def getTimeToSyncFrom(self):
- """Return the time we'll synchronize from.
-
- Any new PPAs or tokens created since this time will be used to
- generate passwords.
- """
- # NTP is running on our servers and therefore we can assume
- # only minimal skew, we include a fudge-factor of 1s so that
- # even the minimal skew cannot demonstrate bug 627608.
- last_activity = self.get_last_activity()
- if not last_activity:
- return
- return last_activity.date_started - timedelta(seconds=1)
-
- def getNewTokens(self, since=None):
- """Return result set of new tokens created since the given time."""
- store = IStore(ArchiveAuthToken)
- extra_expr = []
- if since:
- extra_expr = [ArchiveAuthToken.date_created >= since]
- new_ppa_tokens = store.find(
- ArchiveAuthToken,
- ArchiveAuthToken.date_deactivated == None,
- *extra_expr)
- return new_ppa_tokens
-
- def getDeactivatedNamedTokens(self, since=None):
- """Return result set of named tokens deactivated since given time."""
- now = datetime.now(pytz.UTC)
-
- store = IStore(ArchiveAuthToken)
- extra_expr = []
- if since:
- extra_expr = [ArchiveAuthToken.date_deactivated >= since]
- tokens = store.find(
- ArchiveAuthToken,
- ArchiveAuthToken.name != None,
- ArchiveAuthToken.date_deactivated != None,
- ArchiveAuthToken.date_deactivated <= now,
- *extra_expr)
- return tokens
-
- def getNewPrivatePPAs(self, since=None):
- """Return the recently created private PPAs."""
- store = IStore(Archive)
- extra_expr = []
- if since:
- extra_expr = [Archive.date_created >= since]
- return store.find(
- Archive, Archive._private == True, *extra_expr)
-
def main(self):
"""Script entry point."""
self.logger.info('Starting the PPA .htaccess generation')
self.expireSubscriptions()
affected_ppas = self.deactivateInvalidTokens(send_email=True)
- current_ppa_count = len(affected_ppas)
- self.logger.debug(
- '%s PPAs with deactivated tokens' % current_ppa_count)
-
- last_success = self.getTimeToSyncFrom()
-
- # Include ppas with named tokens deactivated since last time we ran.
- num_tokens = 0
- for token in self.getDeactivatedNamedTokens(since=last_success):
- affected_ppas.add(token.archive)
- num_tokens += 1
-
- new_ppa_count = len(affected_ppas)
- self.logger.debug(
- "%s deactivated named tokens since last run, %s PPAs affected"
- % (num_tokens, new_ppa_count - current_ppa_count))
- current_ppa_count = new_ppa_count
-
- # In addition to the ppas that are affected by deactivated
- # tokens, we also want to include any ppas that have tokens
- # created since the last time we ran.
- num_tokens = 0
- for token in self.getNewTokens(since=last_success):
- affected_ppas.add(token.archive)
- num_tokens += 1
-
- new_ppa_count = len(affected_ppas)
- self.logger.debug(
- "%s new tokens since last run, %s PPAs affected"
- % (num_tokens, new_ppa_count - current_ppa_count))
- current_ppa_count = new_ppa_count
-
- affected_ppas.update(self.getNewPrivatePPAs(since=last_success))
- new_ppa_count = len(affected_ppas)
self.logger.debug(
- "%s new private PPAs since last run"
- % (new_ppa_count - current_ppa_count))
-
- self.logger.debug('%s PPAs require updating' % new_ppa_count)
- for ppa in affected_ppas:
- # If this PPA is blacklisted, do not touch its htaccess/pwd
- # files.
- blacklisted_ppa_names_for_owner = self.blacklist.get(
- ppa.owner.name, [])
- if ppa.name in blacklisted_ppa_names_for_owner:
- self.logger.info(
- "Skipping htaccess updates for blacklisted PPA "
- " '%s' owned by %s.",
- ppa.name,
- ppa.owner.displayname)
- continue
- elif ppa.status == ArchiveStatus.DELETED or ppa.enabled is False:
- self.logger.info(
- "Skipping htaccess updates for deleted or disabled PPA "
- " '%s' owned by %s.",
- ppa.name,
- ppa.owner.displayname)
- continue
-
- self.ensureHtaccess(ppa)
- htpasswd_write_start = datetime.now()
- temp_htpasswd = self.generateHtpasswd(ppa)
- self.replaceUpdatedHtpasswd(ppa, temp_htpasswd)
- htpasswd_write_duration = datetime.now() - htpasswd_write_start
- self.logger.debug(
- "Wrote htpasswd for '%s': %ss"
- % (ppa.name, htpasswd_write_duration.total_seconds()))
+ '%s PPAs with deactivated tokens' % len(affected_ppas))
if self.options.no_deactivation or self.options.dryrun:
self.logger.info('Dry run, so not committing transaction.')
diff --git a/lib/lp/archivepublisher/tests/test_generate_ppa_htaccess.py b/lib/lp/archivepublisher/tests/test_generate_ppa_htaccess.py
index f11dba1..472b7bf 100644
--- a/lib/lp/archivepublisher/tests/test_generate_ppa_htaccess.py
+++ b/lib/lp/archivepublisher/tests/test_generate_ppa_htaccess.py
@@ -5,7 +5,6 @@
from __future__ import absolute_import, print_function, unicode_literals
-import crypt
from datetime import (
datetime,
timedelta,
@@ -13,20 +12,10 @@ from datetime import (
import os
import subprocess
import sys
-import tempfile
import pytz
-from testtools.matchers import (
- AllMatch,
- FileContains,
- FileExists,
- Not,
- )
-import transaction
from zope.component import getUtility
-from zope.security.proxy import removeSecurityProxy
-from lp.archivepublisher.config import getPubConfig
from lp.archivepublisher.scripts.generate_ppa_htaccess import (
HtaccessTokenGenerator,
)
@@ -36,16 +25,7 @@ from lp.registry.interfaces.teammembership import TeamMembershipStatus
from lp.services.config import config
from lp.services.features.testing import FeatureFixture
from lp.services.log.logger import BufferLogger
-from lp.services.osutils import (
- ensure_directory_exists,
- remove_if_exists,
- write_file,
- )
-from lp.services.scripts.interfaces.scriptactivity import IScriptActivitySet
-from lp.soyuz.enums import (
- ArchiveStatus,
- ArchiveSubscriberStatus,
- )
+from lp.soyuz.enums import ArchiveSubscriberStatus
from lp.soyuz.interfaces.archive import NAMED_AUTH_TOKEN_FEATURE_FLAG
from lp.testing import TestCaseWithFactory
from lp.testing.dbuser import (
@@ -102,102 +82,6 @@ class TestPPAHtaccessTokenGeneration(TestCaseWithFactory):
stdout, stderr = process.communicate()
return process.returncode, stdout, stderr
- def testEnsureHtaccess(self):
- """Ensure that the .htaccess file is generated correctly."""
- # The publisher Config object does not have an interface, so we
- # need to remove the security wrapper.
- pub_config = getPubConfig(self.ppa)
-
- filename = os.path.join(pub_config.archiveroot, ".htaccess")
- remove_if_exists(filename)
- script = self.getScript()
- script.ensureHtaccess(self.ppa)
- self.addCleanup(remove_if_exists, filename)
-
- contents = [
- "",
- "AuthType Basic",
- "AuthName \"Token Required\"",
- "AuthUserFile %s/.htpasswd" % pub_config.archiveroot,
- "Require valid-user",
- "",
- ]
- self.assertThat(filename, FileContains('\n'.join(contents)))
-
- def testGenerateHtpasswd(self):
- """Given some `ArchiveAuthToken`s, test generating htpasswd."""
- # Make some subscriptions and tokens.
- tokens = []
- for name in ['name12', 'name16']:
- person = getUtility(IPersonSet).getByName(name)
- self.ppa.newSubscription(person, self.ppa.owner)
- tokens.append(self.ppa.newAuthToken(person))
- token_usernames = [token.person.name for token in tokens]
-
- # Generate the passwd file.
- script = self.getScript()
- filename = script.generateHtpasswd(self.ppa)
- self.addCleanup(remove_if_exists, filename)
-
- # It should be a temp file on the same filesystem as the target
- # file, so os.rename() won't explode. temproot is relied on
- # elsewhere for this same purpose, so it should be safe.
- pub_config = getPubConfig(self.ppa)
- self.assertEqual(pub_config.temproot, os.path.dirname(filename))
-
- # Read it back in.
- file_contents = [
- line.strip().split(':', 1) for line in open(filename, 'r')]
-
- # First entry is buildd secret, rest are from tokens.
- usernames = list(list(zip(*file_contents))[0])
- self.assertEqual(['buildd'] + token_usernames, usernames)
-
- # We can re-encrypt the buildd_secret and it should match the
- # one in the .htpasswd file.
- password = file_contents[0][1]
- encrypted_secret = crypt.crypt(self.ppa.buildd_secret, password)
- self.assertEqual(encrypted_secret, password)
-
- def testReplaceUpdatedHtpasswd(self):
- """Test that the htpasswd file is only replaced if it changes."""
- FILE_CONTENT = b"Kneel before Zod!"
- # The publisher Config object does not have an interface, so we
- # need to remove the security wrapper.
- pub_config = getPubConfig(self.ppa)
- filename = os.path.join(pub_config.archiveroot, ".htpasswd")
-
- # Write out a dummy .htpasswd
- ensure_directory_exists(pub_config.archiveroot)
- write_file(filename, FILE_CONTENT)
-
- # Write the same contents in a temp file.
- def write_tempfile():
- fd, temp_filename = tempfile.mkstemp(dir=pub_config.archiveroot)
- file = os.fdopen(fd, "wb")
- file.write(FILE_CONTENT)
- file.close()
- return temp_filename
-
- # Replacement should not happen.
- temp_filename = write_tempfile()
- script = self.getScript()
- self.assertTrue(os.path.exists(temp_filename))
- self.assertFalse(
- script.replaceUpdatedHtpasswd(self.ppa, temp_filename))
- self.assertFalse(os.path.exists(temp_filename))
-
- # Writing a different .htpasswd should see it get replaced.
- write_file(filename, b"Come to me, son of Jor-El!")
-
- temp_filename = write_tempfile()
- self.assertTrue(os.path.exists(temp_filename))
- self.assertTrue(
- script.replaceUpdatedHtpasswd(self.ppa, temp_filename))
- self.assertFalse(os.path.exists(temp_filename))
-
- os.remove(filename)
-
def assertDeactivated(self, token):
"""Helper function to test token deactivation state."""
return self.assertNotEqual(token.date_deactivated, None)
@@ -341,15 +225,6 @@ class TestPPAHtaccessTokenGeneration(TestCaseWithFactory):
self.layer.txn.commit()
return (sub1, sub2), (token1, token2, token3)
- def ensureNoFiles(self):
- """Ensure the .ht* files don't already exist."""
- pub_config = getPubConfig(self.ppa)
- htaccess = os.path.join(pub_config.archiveroot, ".htaccess")
- htpasswd = os.path.join(pub_config.archiveroot, ".htpasswd")
- remove_if_exists(htaccess)
- remove_if_exists(htpasswd)
- return htaccess, htpasswd
-
def testSubscriptionExpiry(self):
"""Ensure subscriptions' statuses are set to EXPIRED properly."""
subs, tokens = self.setupDummyTokens()
@@ -369,51 +244,6 @@ class TestPPAHtaccessTokenGeneration(TestCaseWithFactory):
self.assertEqual(subs[0].status, ArchiveSubscriberStatus.EXPIRED)
self.assertEqual(subs[1].status, ArchiveSubscriberStatus.CURRENT)
- def testBasicOperation(self):
- """Invoke the actual script and make sure it generates some files."""
- self.setupDummyTokens()
- htaccess, htpasswd = self.ensureNoFiles()
-
- # Call the script and check that we have a .htaccess and a
- # .htpasswd.
- return_code, stdout, stderr = self.runScript()
- self.assertEqual(
- return_code, 0, "Got a bad return code of %s\nOutput:\n%s" %
- (return_code, stderr))
- self.assertThat([htaccess, htpasswd], AllMatch(FileExists()))
- os.remove(htaccess)
- os.remove(htpasswd)
-
- def testBasicOperation_with_named_tokens(self):
- """Invoke the actual script and make sure it generates some files."""
- token1 = self.ppa.newNamedAuthToken("tokenname1")
- token2 = self.ppa.newNamedAuthToken("tokenname2")
- token3 = self.ppa.newNamedAuthToken("tokenname3")
- token3.deactivate()
-
- # Call the script and check that we have a .htaccess and a .htpasswd.
- htaccess, htpasswd = self.ensureNoFiles()
- script = self.getScript()
- script.main()
- self.assertThat([htaccess, htpasswd], AllMatch(FileExists()))
- with open(htpasswd) as htpasswd_file:
- contents = htpasswd_file.read()
- self.assertIn('+' + token1.name, contents)
- self.assertIn('+' + token2.name, contents)
- self.assertNotIn('+' + token3.name, contents)
-
- # Deactivate a named token and verify it is removed from .htpasswd.
- token2.deactivate()
- script.main()
- self.assertThat([htaccess, htpasswd], AllMatch(FileExists()))
- with open(htpasswd) as htpasswd_file:
- contents = htpasswd_file.read()
- self.assertIn('+' + token1.name, contents)
- self.assertNotIn('+' + token2.name, contents)
- self.assertNotIn('+' + token3.name, contents)
- os.remove(htaccess)
- os.remove(htpasswd)
-
def _setupOptionsData(self):
"""Setup test data for option testing."""
subs, tokens = self.setupDummyTokens()
@@ -427,13 +257,9 @@ class TestPPAHtaccessTokenGeneration(TestCaseWithFactory):
"""Test that the dryrun and no-deactivation option works."""
subs, tokens = self._setupOptionsData()
- htaccess, htpasswd = self.ensureNoFiles()
script = self.getScript(test_args=["--dry-run"])
script.main()
- # Assert no files were written.
- self.assertThat([htaccess, htpasswd], AllMatch(Not(FileExists())))
-
# Assert that the cancelled subscription did not cause the token
# to get deactivated.
self.assertNotDeactivated(tokens[0])
@@ -448,65 +274,6 @@ class TestPPAHtaccessTokenGeneration(TestCaseWithFactory):
script.main()
self.assertDeactivated(tokens[0])
- def testBlacklistingPPAs(self):
- """Test that the htaccess for blacklisted PPAs are not touched."""
- subs, tokens = self.setupDummyTokens()
- htaccess, htpasswd = self.ensureNoFiles()
-
- # Setup the first subscription so that it is due to be expired.
- now = datetime.now(pytz.UTC)
- subs[0].date_expires = now - timedelta(minutes=3)
- self.assertEqual(subs[0].status, ArchiveSubscriberStatus.CURRENT)
-
- script = self.getScript()
- script.blacklist = {'joe': ['my_other_ppa', 'myppa', 'and_another']}
- script.main()
-
- # The tokens will still be deactivated, and subscriptions expired.
- self.assertDeactivated(tokens[0])
- self.assertEqual(subs[0].status, ArchiveSubscriberStatus.EXPIRED)
- # But the htaccess is not touched.
- self.assertThat([htaccess, htpasswd], AllMatch(Not(FileExists())))
-
- def testSkippingOfDisabledPPAs(self):
- """Test that the htaccess for disabled PPAs are not touched."""
- subs, tokens = self.setupDummyTokens()
- htaccess, htpasswd = self.ensureNoFiles()
-
- # Setup subscription so that htaccess/htpasswd is pending generation.
- now = datetime.now(pytz.UTC)
- subs[0].date_expires = now + timedelta(minutes=3)
- self.assertEqual(subs[0].status, ArchiveSubscriberStatus.CURRENT)
-
- # Set the PPA as disabled.
- self.ppa.disable()
- self.assertFalse(self.ppa.enabled)
-
- script = self.getScript()
- script.main()
-
- # The htaccess and htpasswd files should not be generated.
- self.assertThat([htaccess, htpasswd], AllMatch(Not(FileExists())))
-
- def testSkippingOfDeletedPPAs(self):
- """Test that the htaccess for deleted PPAs are not touched."""
- subs, tokens = self.setupDummyTokens()
- htaccess, htpasswd = self.ensureNoFiles()
-
- # Setup subscription so that htaccess/htpasswd is pending generation.
- now = datetime.now(pytz.UTC)
- subs[0].date_expires = now + timedelta(minutes=3)
- self.assertEqual(subs[0].status, ArchiveSubscriberStatus.CURRENT)
-
- # Set the PPA as deleted.
- self.ppa.status = ArchiveStatus.DELETED
-
- script = self.getScript()
- script.main()
-
- # The htaccess and htpasswd files should not be generated.
- self.assertThat([htaccess, htpasswd], AllMatch(Not(FileExists())))
-
def testSendingCancellationEmail(self):
"""Test that when a token is deactivated, its user gets an email.
@@ -568,120 +335,3 @@ class TestPPAHtaccessTokenGeneration(TestCaseWithFactory):
script.sendCancellationEmail(token)
self.assertEmailQueueLength(0)
-
- def test_getTimeToSyncFrom(self):
- # Sync from 1s before previous start to catch anything made during the
- # last script run, and to handle NTP clock skew.
- now = datetime.now(pytz.UTC)
- script_start_time = now - timedelta(seconds=2)
- script_end_time = now
-
- getUtility(IScriptActivitySet).recordSuccess(
- self.SCRIPT_NAME, script_start_time, script_end_time)
- script = self.getScript()
- self.assertEqual(
- script_start_time - timedelta(seconds=1),
- script.getTimeToSyncFrom())
-
- def test_getNewPrivatePPAs_no_previous_run(self):
- # All private PPAs are returned if there was no previous run.
- # This happens even if they have no tokens.
-
- # Create a public PPA that should not be in the list.
- self.factory.makeArchive(private=False)
-
- script = self.getScript()
- self.assertContentEqual([self.ppa], script.getNewPrivatePPAs())
-
- def test_getNewPrivatePPAs_only_those_since_last_run(self):
- # Only private PPAs created since the last run are returned.
- # This happens even if they have no tokens.
- last_start = datetime.now(pytz.UTC) - timedelta(seconds=90)
- before_last_start = last_start - timedelta(seconds=30)
- removeSecurityProxy(self.ppa).date_created = before_last_start
-
- # Create a new PPA that should show up.
- new_ppa = self.factory.makeArchive(private=True)
-
- script = self.getScript()
- new_ppas = script.getNewPrivatePPAs(since=last_start)
- self.assertContentEqual([new_ppa], new_ppas)
-
- def test_getNewTokens_no_previous_run(self):
- """All valid tokens returned if there is no record of previous run."""
- tokens = self.setupDummyTokens()[1]
-
- # If there is no record of the script running previously, all
- # valid tokens are returned.
- script = self.getScript()
- self.assertContentEqual(tokens, script.getNewTokens())
-
- def test_getNewTokens_only_those_since_last_run(self):
- """Only tokens created since the last run are returned."""
- last_start = datetime.now(pytz.UTC) - timedelta(seconds=90)
- before_last_start = last_start - timedelta(seconds=30)
-
- tokens = self.setupDummyTokens()[1]
- # This token will not be included.
- removeSecurityProxy(tokens[0]).date_created = before_last_start
-
- script = self.getScript()
- new_tokens = script.getNewTokens(since=last_start)
- self.assertContentEqual(tokens[1:], new_tokens)
-
- def test_getNewTokens_only_active_tokens(self):
- """Only active tokens are returned."""
- tokens = self.setupDummyTokens()[1]
- tokens[0].deactivate()
-
- script = self.getScript()
- self.assertContentEqual(tokens[1:], script.getNewTokens())
-
- def test_getDeactivatedNamedTokens_no_previous_run(self):
- """All deactivated named tokens returned if there is no record
- of previous run."""
- last_start = datetime.now(pytz.UTC) - timedelta(seconds=90)
- before_last_start = last_start - timedelta(seconds=30)
-
- self.ppa.newNamedAuthToken("tokenname1")
- token2 = self.ppa.newNamedAuthToken("tokenname2")
- token2.deactivate()
- token3 = self.ppa.newNamedAuthToken("tokenname3")
- token3.date_deactivated = before_last_start
-
- script = self.getScript()
- self.assertContentEqual(
- [token2, token3], script.getDeactivatedNamedTokens())
-
- def test_getDeactivatedNamedTokens_only_those_since_last_run(self):
- """Only named tokens deactivated since last run are returned."""
- last_start = datetime.now(pytz.UTC) - timedelta(seconds=90)
- before_last_start = last_start - timedelta(seconds=30)
- tomorrow = datetime.now(pytz.UTC) + timedelta(days=1)
-
- self.ppa.newNamedAuthToken("tokenname1")
- token2 = self.ppa.newNamedAuthToken("tokenname2")
- token2.deactivate()
- token3 = self.ppa.newNamedAuthToken("tokenname3")
- token3.date_deactivated = before_last_start
- token4 = self.ppa.newNamedAuthToken("tokenname4")
- token4.date_deactivated = tomorrow
-
- script = self.getScript()
- self.assertContentEqual(
- [token2], script.getDeactivatedNamedTokens(last_start))
-
- def test_processes_PPAs_without_subscription(self):
- # A .htaccess file is written for Private PPAs even if they don't have
- # any subscriptions.
- htaccess, htpasswd = self.ensureNoFiles()
- transaction.commit()
-
- # Call the script and check that we have a .htaccess and a .htpasswd.
- return_code, stdout, stderr = self.runScript()
- self.assertEqual(
- return_code, 0, "Got a bad return code of %s\nOutput:\n%s" %
- (return_code, stderr))
- self.assertThat([htaccess, htpasswd], AllMatch(FileExists()))
- os.remove(htaccess)
- os.remove(htpasswd)
diff --git a/lib/lp/archivepublisher/tests/test_htaccess.py b/lib/lp/archivepublisher/tests/test_htaccess.py
deleted file mode 100644
index d435a2d..0000000
--- a/lib/lp/archivepublisher/tests/test_htaccess.py
+++ /dev/null
@@ -1,139 +0,0 @@
-# Copyright 2009-2018 Canonical Ltd. This software is licensed under the
-# GNU Affero General Public License version 3 (see the file LICENSE).
-
-"""Test htaccess/htpasswd file generation. """
-
-from __future__ import absolute_import, print_function, unicode_literals
-
-import os
-import tempfile
-
-from zope.component import getUtility
-
-from lp.archivepublisher.htaccess import (
- htpasswd_credentials_for_archive,
- write_htaccess,
- write_htpasswd,
- )
-from lp.registry.interfaces.distribution import IDistributionSet
-from lp.registry.interfaces.person import IPersonSet
-from lp.services.features.testing import FeatureFixture
-from lp.soyuz.interfaces.archive import NAMED_AUTH_TOKEN_FEATURE_FLAG
-from lp.testing import TestCaseWithFactory
-from lp.testing.layers import LaunchpadZopelessLayer
-
-
-class TestHtpasswdGeneration(TestCaseWithFactory):
- """Test htpasswd generation."""
-
- layer = LaunchpadZopelessLayer
-
- def setUp(self):
- super(TestHtpasswdGeneration, self).setUp()
- self.owner = self.factory.makePerson(
- name="joe", displayname="Joe Smith")
- self.ppa = self.factory.makeArchive(
- owner=self.owner, name="myppa", private=True)
-
- # "Ubuntu" doesn't have a proper publisher config but Ubuntutest
- # does, so override the PPA's distro here.
- ubuntutest = getUtility(IDistributionSet)['ubuntutest']
- self.ppa.distribution = ubuntutest
-
- # Enable named auth tokens.
- self.useFixture(FeatureFixture({NAMED_AUTH_TOKEN_FEATURE_FLAG: "on"}))
-
- def test_write_htpasswd(self):
- """Test that writing the .htpasswd file works properly."""
- fd, filename = tempfile.mkstemp()
- os.close(fd)
-
- TEST_PASSWORD = "password"
- TEST_PASSWORD2 = "passwor2"
-
- # We provide a constant salt to the crypt function so that we
- # can test the encrypted result.
- SALT = "XX"
-
- user1 = ("user", TEST_PASSWORD, SALT)
- user2 = ("user2", TEST_PASSWORD2, SALT)
- list_of_users = [user1]
- list_of_users.append(user2)
-
- write_htpasswd(filename, list_of_users)
-
- expected_contents = [
- "user:XXq2wKiyI43A2",
- "user2:XXaQB8b5Gtwi.",
- ]
-
- file = open(filename, "r")
- file_contents = file.read().splitlines()
- file.close()
- os.remove(filename)
-
- self.assertEqual(expected_contents, file_contents)
-
- def test_write_htaccess(self):
- # write_access can write a correct htaccess file.
- fd, filename = tempfile.mkstemp()
- os.close(fd)
-
- write_htaccess(filename, "/some/distroot")
- self.assertTrue(
- os.path.isfile(filename),
- "%s is not present when it should be" % filename)
- self.addCleanup(os.remove, filename)
-
- contents = [
- "",
- "AuthType Basic",
- "AuthName \"Token Required\"",
- "AuthUserFile /some/distroot/.htpasswd",
- "Require valid-user",
- ]
-
- file = open(filename, "r")
- file_contents = file.read().splitlines()
- file.close()
-
- self.assertEqual(contents, file_contents)
-
- def test_credentials_for_archive_empty(self):
- # If there are no ArchiveAuthTokens for an archive just
- # the buildd secret is returned.
- self.ppa.buildd_secret = "sekr1t"
- self.assertEqual(
- [("buildd", "sekr1t", "bu")],
- list(htpasswd_credentials_for_archive(self.ppa)))
-
- def test_credentials_for_archive(self):
- # ArchiveAuthTokens for an archive are returned by
- # credentials_for_archive.
- self.ppa.buildd_secret = "geheim"
- name12 = getUtility(IPersonSet).getByName("name12")
- name16 = getUtility(IPersonSet).getByName("name16")
- hyphenated = self.factory.makePerson(name="a-b-c")
- self.ppa.newSubscription(name12, self.ppa.owner)
- self.ppa.newSubscription(name16, self.ppa.owner)
- self.ppa.newSubscription(hyphenated, self.ppa.owner)
- first_created_token = self.ppa.newAuthToken(name16)
- second_created_token = self.ppa.newAuthToken(name12)
- third_created_token = self.ppa.newAuthToken(hyphenated)
- named_token_20 = self.ppa.newNamedAuthToken("name20", as_dict=False)
- named_token_14 = self.ppa.newNamedAuthToken("name14", as_dict=False)
- named_token_99 = self.ppa.newNamedAuthToken("name99", as_dict=False)
- named_token_99.deactivate()
-
- expected_credentials = [
- ("buildd", "geheim", "bu"),
- ("+name14", named_token_14.token, "bm"),
- ("+name20", named_token_20.token, "bm"),
- ("a-b-c", third_created_token.token, "YS"),
- ("name12", second_created_token.token, "bm"),
- ("name16", first_created_token.token, "bm"),
- ]
- credentials = list(htpasswd_credentials_for_archive(self.ppa))
-
- # Use assertEqual instead of assertContentEqual to verify order.
- self.assertEqual(expected_credentials, credentials)
diff --git a/lib/lp/archivepublisher/tests/test_publisher.py b/lib/lp/archivepublisher/tests/test_publisher.py
index 67ec904..0c8ff87 100644
--- a/lib/lp/archivepublisher/tests/test_publisher.py
+++ b/lib/lp/archivepublisher/tests/test_publisher.py
@@ -12,7 +12,6 @@ from collections import (
defaultdict,
OrderedDict,
)
-import crypt
from datetime import (
datetime,
timedelta,
@@ -2328,44 +2327,6 @@ class TestPublisher(TestPublisherBase):
hoary_pub.requestDeletion(self.ubuntutest.owner)
self._assertPublishesSeriesAlias(publisher, "breezy-autotest")
- def testHtaccessForPrivatePPA(self):
- # A htaccess file is created for new private PPA's.
-
- ppa = self.factory.makeArchive(
- distribution=self.ubuntutest, private=True)
- ppa.buildd_secret = "geheim"
-
- # Set up the publisher for it and publish its repository.
- # setupArchiveDirs is what actually configures the htaccess file.
- getPublisher(ppa, [], self.logger).setupArchiveDirs()
- pubconf = getPubConfig(ppa)
- htaccess_path = os.path.join(pubconf.archiveroot, ".htaccess")
- self.assertTrue(os.path.exists(htaccess_path))
- with open(htaccess_path, 'r') as htaccess_f:
- self.assertEqual(dedent("""
- AuthType Basic
- AuthName "Token Required"
- AuthUserFile %s/.htpasswd
- Require valid-user
- """) % pubconf.archiveroot,
- htaccess_f.read())
-
- htpasswd_path = os.path.join(pubconf.archiveroot, ".htpasswd")
-
- # Read it back in.
- with open(htpasswd_path, "r") as htpasswd_f:
- file_contents = htpasswd_f.readlines()
-
- self.assertEqual(1, len(file_contents))
-
- # The first line should be the buildd_secret.
- [user, password] = file_contents[0].strip().split(":", 1)
- self.assertEqual("buildd", user)
- # We can re-encrypt the buildd_secret and it should match the
- # one in the .htpasswd file.
- encrypted_secret = crypt.crypt(ppa.buildd_secret, password)
- self.assertEqual(encrypted_secret, password)
-
def testWriteSuiteI18n(self):
"""Test i18n/Index writing."""
publisher = Publisher(
diff --git a/lib/lp/registry/scripts/closeaccount.py b/lib/lp/registry/scripts/closeaccount.py
index 27b2eb1..b4e505b 100644
--- a/lib/lp/registry/scripts/closeaccount.py
+++ b/lib/lp/registry/scripts/closeaccount.py
@@ -362,12 +362,9 @@ def close_account(username, log):
# the placeholder person row.
skip.add(('sprintattendance', 'attendee'))
- # generate_ppa_htaccess currently relies on seeing active
- # ArchiveAuthToken rows so that it knows which ones to remove from
- # .htpasswd files on disk in response to the cancellation of the
- # corresponding ArchiveSubscriber rows; but even once PPA authorisation
- # is handled dynamically, we probably still want to have the per-person
- # audit trail here.
+ # PPA authorization is now handled dynamically and checks the
+ # subscriber's account status, so this isn't strictly necessary, but
+ # it's still nice to have the per-person audit trail.
archive_subscriber_ids = set(store.find(
ArchiveSubscriber.id,
ArchiveSubscriber.subscriber_id == person.id,
diff --git a/lib/lp/services/scripts/base.py b/lib/lp/services/scripts/base.py
index bb4490b..66ebdbf 100644
--- a/lib/lp/services/scripts/base.py
+++ b/lib/lp/services/scripts/base.py
@@ -406,10 +406,6 @@ class LaunchpadCronScript(LaunchpadScript):
oops_hdlr = OopsHandler(self.name, logger=self.logger)
logging.getLogger().addHandler(oops_hdlr)
- def get_last_activity(self):
- """Return the last activity, if any."""
- return getUtility(IScriptActivitySet).getLastActivity(self.name)
-
@log_unhandled_exception_and_exit
def record_activity(self, date_started, date_completed):
"""Record the successful completion of the script."""