launchpad-reviewers team mailing list archive
-
launchpad-reviewers team
-
Mailing list archive
-
Message #06604
[Merge] lp:~abentley/launchpad/upgrade-all-2 into lp:launchpad
Aaron Bentley has proposed merging lp:~abentley/launchpad/upgrade-all-2 into lp:launchpad with lp:~abentley/launchpad/upgrade-all as a prerequisite.
Requested reviews:
Launchpad code reviewers (launchpad-reviewers)
Related bugs:
Bug #922741 in Launchpad itself: "AttributeError: 'BzrBranch7' object has no attribute 'get_config_stack'"
https://bugs.launchpad.net/launchpad/+bug/922741
For more details, see:
https://code.launchpad.net/~abentley/launchpad/upgrade-all-2/+merge/96247
= Summary =
ec2 scripts are not proper Launchpad scripts because they depend on pqm-submit, which Launchpad does not provide.
== Proposed fix ==
Add the ec2 test/land scripts to lp-dev-tools and remove them from the Launchpad tree.
== Pre-implementation notes ==
Discussed with deryck
== Implementation details ==
None
== Tests ==
None
== Demo and Q/A ==
None
= Launchpad lint =
Checking for conflicts and issues in changed files.
Linting changed files:
lib/lp/code/bzr.py
lib/lp/codehosting/tests/test_upgrade.py
lib/lp/codehosting/vfs/branchfs.py
scripts/upgrade_all_branches.py
lib/lp/codehosting/bzrutils.py
lib/lp/codehosting/scripts/tests/test_upgrade_all_branches.py
lib/lp/codehosting/upgrade.py
lib/lp/testing/__init__.py
lib/lp/codehosting/vfs/tests/test_branchfs.py
lib/lp_sitecustomize.py
lib/lp/services/config/__init__.py
--
https://code.launchpad.net/~abentley/launchpad/upgrade-all-2/+merge/96247
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~abentley/launchpad/upgrade-all-2 into lp:launchpad.
=== removed file 'lib/devscripts/autoland.py'
--- lib/devscripts/autoland.py 2012-02-09 13:08:47 +0000
+++ lib/devscripts/autoland.py 1970-01-01 00:00:00 +0000
@@ -1,351 +0,0 @@
-"""Land an approved merge proposal."""
-
-from bzrlib.errors import BzrCommandError
-from launchpadlib.launchpad import Launchpad
-from launchpadlib.uris import (
- DEV_SERVICE_ROOT,
- EDGE_SERVICE_ROOT,
- LPNET_SERVICE_ROOT,
- STAGING_SERVICE_ROOT,
- )
-from lazr.uri import URI
-
-
-class MissingReviewError(Exception):
- """Raised when we try to get a review message without enough reviewers."""
-
-
-class MissingBugsError(Exception):
- """Merge proposal has no linked bugs and no [no-qa] tag."""
-
-
-class MissingBugsIncrementalError(Exception):
- """Merge proposal has the [incr] tag but no linked bugs."""
-
-
-class LaunchpadBranchLander:
-
- name = 'launchpad-branch-lander'
-
- def __init__(self, launchpad):
- self._launchpad = launchpad
-
- @classmethod
- def load(cls, service_root='production'):
- # XXX: JonathanLange 2009-09-24: No unit tests.
- # XXX: JonathanLange 2009-09-24 bug=435813: If cached data invalid,
- # there's no easy way to delete it and try again.
- launchpad = Launchpad.login_with(cls.name, service_root)
- return cls(launchpad)
-
- def load_merge_proposal(self, mp_url):
- """Get the merge proposal object for the 'mp_url'."""
- # XXX: JonathanLange 2009-09-24: No unit tests.
- web_mp_uri = URI(mp_url)
- api_mp_uri = self._launchpad._root_uri.append(
- web_mp_uri.path.lstrip('/'))
- return MergeProposal(self._launchpad.load(str(api_mp_uri)))
-
- def get_lp_branch(self, branch):
- """Get the launchpadlib branch based on a bzr branch."""
- # First try the public branch.
- branch_url = branch.get_public_branch()
- if branch_url:
- lp_branch = self._launchpad.branches.getByUrl(
- url=branch_url)
- if lp_branch is not None:
- return lp_branch
- # If that didn't work try the push location.
- branch_url = branch.get_push_location()
- if branch_url:
- lp_branch = self._launchpad.branches.getByUrl(
- url=branch_url)
- if lp_branch is not None:
- return lp_branch
- raise BzrCommandError(
- "No public branch could be found. Please re-run and specify "
- "the URL for the merge proposal.")
-
- def get_merge_proposal_from_branch(self, branch):
- """Get the merge proposal from the branch."""
-
- lp_branch = self.get_lp_branch(branch)
- proposals = [
- mp for mp in lp_branch.landing_targets
- if mp.queue_status in ('Needs review', 'Approved')]
- if len(proposals) == 0:
- raise BzrCommandError(
- "The public branch has no open source merge proposals. "
- "You must have a merge proposal before attempting to "
- "land the branch.")
- elif len(proposals) > 1:
- raise BzrCommandError(
- "The public branch has multiple open source merge "
- "proposals. You must provide the URL to the one you wish "
- "to use.")
- return MergeProposal(proposals[0])
-
-
-class MergeProposal:
- """Wrapper around launchpadlib `IBranchMergeProposal` for landing."""
-
- def __init__(self, mp):
- """Construct a merge proposal.
-
- :param mp: A launchpadlib `IBranchMergeProposal`.
- """
- self._mp = mp
- self._launchpad = mp._root
-
- @property
- def source_branch(self):
- """The push URL of the source branch."""
- return str(self._get_push_url(self._mp.source_branch))
-
- @property
- def target_branch(self):
- """The push URL of the target branch."""
- return str(self._get_push_url(self._mp.target_branch))
-
- @property
- def commit_message(self):
- """The commit message specified on the merge proposal."""
- return self._mp.commit_message
-
- @property
- def is_approved(self):
- """Is this merge proposal approved for landing."""
- return self._mp.queue_status == 'Approved'
-
- def get_stakeholder_emails(self):
- """Return a collection of people who should know about branch landing.
-
- Used to determine who to email with the ec2 test results.
-
- :return: A set of `IPerson`s.
- """
- # XXX: JonathanLange 2009-09-24: No unit tests.
- emails = set(
- map(get_email,
- [self._mp.source_branch.owner, self._launchpad.me]))
- if None in emails:
- emails.remove(None)
- return emails
-
- def get_reviews(self):
- """Return a dictionary of all Approved reviews.
-
- Used to determine who has actually approved a branch for landing. The
- key of the dictionary is the type of review, and the value is the list
- of people who have voted Approve with that type.
-
- Common types include 'code', 'db', 'ui' and of course `None`.
- """
- reviews = {}
- for vote in self._mp.votes:
- comment = vote.comment
- if comment is None or comment.vote != "Approve":
- continue
- reviewers = reviews.setdefault(vote.review_type, [])
- reviewers.append(vote.reviewer)
- if self.is_approved and not reviews:
- reviews[None] = [self._mp.reviewer]
- return reviews
-
- def get_bugs(self):
- """Return a collection of bugs linked to the source branch."""
- return self._mp.source_branch.linked_bugs
-
- def _get_push_url(self, branch):
- """Return the push URL for 'branch'.
-
- This function is a work-around for Launchpad's lack of exposing the
- branch's push URL.
-
- :param branch: A launchpadlib `IBranch`.
- """
- # XXX: JonathanLange 2009-09-24: No unit tests.
- host = get_bazaar_host(str(self._launchpad._root_uri))
- # XXX: JonathanLange 2009-09-24 bug=435790: lazr.uri allows a path
- # without a leading '/' and then doesn't insert a '/' in the final
- # URL. Do it ourselves.
- return URI(scheme='bzr+ssh', host=host, path='/' + branch.unique_name)
-
- def build_commit_message(self, commit_text, testfix=False, no_qa=False,
- incremental=False, rollback=None):
- """Get the Launchpad-style commit message for a merge proposal."""
- reviews = self.get_reviews()
- bugs = self.get_bugs()
-
- tags = [
- get_testfix_clause(testfix),
- get_reviewer_clause(reviews),
- get_bugs_clause(bugs),
- get_qa_clause(bugs, no_qa,
- incremental, rollback=rollback),
- ]
-
- # Make sure we don't add duplicated tags to commit_text.
- commit_tags = tags[:]
- for tag in tags:
- if tag in commit_text:
- commit_tags.remove(tag)
-
- if commit_tags:
- return '%s %s' % (''.join(commit_tags), commit_text)
- else:
- return commit_text
-
- def set_commit_message(self, commit_message):
- """Set the Launchpad-style commit message for a merge proposal."""
- self._mp.commit_message = commit_message
- self._mp.lp_save()
-
-
-def get_testfix_clause(testfix=False):
- """Get the testfix clause."""
- if testfix:
- testfix_clause = '[testfix]'
- else:
- testfix_clause = ''
- return testfix_clause
-
-
-def get_qa_clause(bugs, no_qa=False, incremental=False, rollback=None):
- """Check the no-qa and incremental options, getting the qa clause.
-
- The qa clause will always be or no-qa, or incremental, or no-qa and
- incremental, or a revno for the rollback clause, or no tags.
-
- See https://dev.launchpad.net/QAProcessContinuousRollouts for detailed
- explanation of each clause.
- """
- qa_clause = ""
-
- if not bugs and not no_qa and not incremental and not rollback:
- raise MissingBugsError
-
- if incremental and not bugs:
- raise MissingBugsIncrementalError
-
- if no_qa and incremental:
- qa_clause = '[no-qa][incr]'
- elif incremental:
- qa_clause = '[incr]'
- elif no_qa:
- qa_clause = '[no-qa]'
- elif rollback:
- qa_clause = '[rollback=%d]' % rollback
- else:
- qa_clause = ''
-
- return qa_clause
-
-
-def get_email(person):
- """Get the preferred email address for 'person'."""
- email_object = person.preferred_email_address
- if email_object is None:
- return None # A team most likely.
- return email_object.email
-
-
-def get_bugs_clause(bugs):
- """Return the bugs clause of a commit message.
-
- :param bugs: A collection of `IBug` objects.
- :return: A string of the form "[bug=A,B,C]".
- """
- if not bugs:
- return ''
- bug_ids = []
- for bug in bugs:
- for task in bug.bug_tasks:
- if (task.bug_target_name == 'launchpad'
- and task.status not in ['Fix Committed', 'Fix Released']):
- bug_ids.append(str(bug.id))
- break
- if not bug_ids:
- return ''
- return '[bug=%s]' % ','.join(bug_ids)
-
-
-def get_reviewer_handle(reviewer):
- """Get the handle for 'reviewer'.
-
- The handles of reviewers are included in the commit message for Launchpad
- changes. Historically, these handles have been the IRC nicks. Thus, if
- 'reviewer' has an IRC nickname for Freenode, we use that. Otherwise we use
- their Launchpad username.
-
- :param reviewer: A launchpadlib `IPerson` object.
- :return: unicode text.
- """
- irc_handles = reviewer.irc_nicknames
- for handle in irc_handles:
- if handle.network == 'irc.freenode.net':
- return handle.nickname
- return reviewer.name
-
-
-def _comma_separated_names(things):
- """Return a string of comma-separated names of 'things'.
-
- The list is sorted before being joined.
- """
- return ','.join(sorted(thing.name for thing in things))
-
-
-def get_reviewer_clause(reviewers):
- """Get the reviewer section of a commit message, given the reviewers.
-
- :param reviewers: A dict mapping review types to lists of reviewers, as
- returned by 'get_reviews'.
- :return: A string like u'[r=foo,bar][ui=plop]'.
- """
- # If no review type is specified it is assumed to be a code review.
- code_reviewers = reviewers.get(None, [])
- ui_reviewers = []
- rc_reviewers = []
- for review_type, reviewer in reviewers.items():
- if review_type is None:
- continue
- if review_type == '':
- code_reviewers.extend(reviewer)
- if 'code' in review_type or 'db' in review_type:
- code_reviewers.extend(reviewer)
- if 'ui' in review_type:
- ui_reviewers.extend(reviewer)
- if 'release-critical' in review_type:
- rc_reviewers.extend(reviewer)
- if not code_reviewers:
- raise MissingReviewError("Need approved votes in order to land.")
- if ui_reviewers:
- ui_clause = '[ui=%s]' % _comma_separated_names(ui_reviewers)
- else:
- ui_clause = ''
- if rc_reviewers:
- rc_clause = (
- '[release-critical=%s]' % _comma_separated_names(rc_reviewers))
- else:
- rc_clause = ''
- return '%s[r=%s]%s' % (
- rc_clause, _comma_separated_names(code_reviewers), ui_clause)
-
-
-def get_bazaar_host(api_root):
- """Get the Bazaar service for the given API root."""
- # XXX: JonathanLange 2009-09-24 bug=435803: This is only needed because
- # Launchpad doesn't expose the push URL for branches.
- if api_root.startswith(EDGE_SERVICE_ROOT):
- return 'bazaar.launchpad.net'
- elif api_root.startswith(DEV_SERVICE_ROOT):
- return 'bazaar.launchpad.dev'
- elif api_root.startswith(STAGING_SERVICE_ROOT):
- return 'bazaar.staging.launchpad.net'
- elif api_root.startswith(LPNET_SERVICE_ROOT):
- return 'bazaar.launchpad.net'
- else:
- raise ValueError(
- 'Cannot determine Bazaar host. "%s" not a recognized Launchpad '
- 'API root.' % (api_root,))
=== removed directory 'lib/devscripts/ec2test'
=== removed file 'lib/devscripts/ec2test/__init__.py'
--- lib/devscripts/ec2test/__init__.py 2012-01-01 03:03:28 +0000
+++ lib/devscripts/ec2test/__init__.py 1970-01-01 00:00:00 +0000
@@ -1,27 +0,0 @@
-# Copyright 2009 Canonical Ltd. This software is licensed under the
-# GNU Affero General Public License version 3 (see the file LICENSE).
-
-"""Run the Launchpad tests in Amazon's Elastic Compute Cloud (EC2)."""
-
-__metaclass__ = type
-
-__all__ = []
-
-from bzrlib.plugin import load_plugins
-
-
-load_plugins()
-import paramiko
-
-#############################################################################
-# Try to guide users past support problems we've encountered before
-if not paramiko.__version__.startswith('1.7.4'):
- raise RuntimeError('Your version of paramiko (%s) is not supported. '
- 'Please use 1.7.4.' % (paramiko.__version__,))
-# maybe add similar check for bzrlib?
-# End
-#############################################################################
-
-import warnings
-warnings.filterwarnings(
- "ignore", category=DeprecationWarning, module="boto")
=== removed file 'lib/devscripts/ec2test/account.py'
--- lib/devscripts/ec2test/account.py 2012-01-25 15:27:01 +0000
+++ lib/devscripts/ec2test/account.py 1970-01-01 00:00:00 +0000
@@ -1,230 +0,0 @@
-# Copyright 2009-2011 Canonical Ltd. This software is licensed under the
-# GNU Affero General Public License version 3 (see the file LICENSE).
-
-"""A representation of an Amazon Web Services account."""
-
-__metaclass__ = type
-__all__ = [
- 'EC2Account',
- 'VALID_AMI_OWNERS',
- ]
-
-from collections import defaultdict
-import cStringIO
-from datetime import datetime
-from operator import itemgetter
-import re
-import sys
-import urllib
-
-from boto.exception import EC2ResponseError
-import paramiko
-
-from devscripts.ec2test.session import EC2SessionName
-
-
-VALID_AMI_OWNERS = {
- # Amazon account number: name/nickname (only for logging).
- '255383312499': 'gary',
- '559320013529': 'flacoste',
- '038531743404': 'jelmer',
- '444667466231': 'allenap',
- '441991801793': 'gmb',
- '005470753809': 'bigjools',
- '967591634984': 'jtv',
- '507541322704': 'sinzui',
- '424228475252': 'wallyworld',
- '292290876294': 'stevenk',
- '259696152397': 'bac',
- '873925794399': 'wgrant',
- '957911449157': 'mbp',
- '340983519589': 'stub',
- # ...anyone else want in on the fun?
- }
-
-AUTH_FAILURE_MESSAGE = """\
-POSSIBLE CAUSES OF ERROR:
-- Did you sign up for EC2?
-- Did you put a credit card number in your AWS account?
-Please double-check before reporting a problem.
-"""
-
-
-def get_ip():
- """Uses AWS checkip to obtain this machine's IP address.
-
- Consults an external website to determine the public IP address of this
- machine.
-
- :return: This machine's net-visible IP address as a string.
- """
- return urllib.urlopen('http://checkip.amazonaws.com').read().strip()
-
-
-class EC2Account:
- """An EC2 account.
-
- You can use this to manage security groups, keys and images for an EC2
- account.
- """
-
- # Used to find pre-configured Amazon images.
- _image_match = re.compile(
- r'launchpad-ec2test(\d+)/image.manifest.xml$').match
-
- def __init__(self, name, connection):
- """Construct an EC2 instance.
-
- :param name: ???
- :param connection: An open boto ec2 connection.
- """
- self.name = name
- self.conn = connection
-
- def log(self, msg):
- """Log a message on stdout, flushing afterwards."""
- # XXX: JonathanLange 2009-05-31 bug=383076: Copied from EC2TestRunner.
- # Should change EC2Account to take a logger and use that instead of
- # writing to stdout.
- sys.stdout.write(msg)
- sys.stdout.flush()
-
- def _find_expired_artifacts(self, artifacts):
- now = datetime.utcnow()
- for artifact in artifacts:
- session_name = EC2SessionName(artifact.name)
- if (session_name in (self.name, self.name.base) or (
- session_name.base == self.name.base and
- session_name.expires is not None and
- session_name.expires < now)):
- yield artifact
-
- def acquire_security_group(self, demo_networks=None):
- """Get a security group with the appropriate configuration.
-
- "Appropriate" means configured to allow this machine to connect via
- SSH, HTTP and HTTPS.
-
- The name of the security group is the `EC2Account.name` attribute.
-
- :return: A boto security group.
- """
- if demo_networks is None:
- demo_networks = []
- # Create the security group.
- security_group = self.conn.create_security_group(
- self.name, 'Authorization to access the test runner instance.')
- # Authorize SSH and HTTP.
- ip = get_ip()
- security_group.authorize('tcp', 22, 22, '%s/32' % ip)
- security_group.authorize('tcp', 80, 80, '%s/32' % ip)
- security_group.authorize('tcp', 443, 443, '%s/32' % ip)
- for network in demo_networks:
- # Add missing netmask info for single ips.
- if '/' not in network:
- network += '/32'
- security_group.authorize('tcp', 80, 80, network)
- security_group.authorize('tcp', 443, 443, network)
- return security_group
-
- def delete_previous_security_groups(self):
- """Delete previously used security groups, if found."""
- expired_groups = self._find_expired_artifacts(
- self.conn.get_all_security_groups())
- for group in expired_groups:
- try:
- group.delete()
- except EC2ResponseError, e:
- if e.code != 'InvalidGroup.InUse':
- raise
- self.log('Cannot delete; security group '
- '%r in use.\n' % group.name)
- else:
- self.log('Deleted security group %r.\n' % group.name)
-
- def acquire_private_key(self):
- """Create & return a new key pair for the test runner."""
- key_pair = self.conn.create_key_pair(self.name)
- return paramiko.RSAKey.from_private_key(
- cStringIO.StringIO(key_pair.material.encode('ascii')))
-
- def delete_previous_key_pairs(self):
- """Delete previously used keypairs, if found."""
- expired_key_pairs = self._find_expired_artifacts(
- self.conn.get_all_key_pairs())
- for key_pair in expired_key_pairs:
- try:
- key_pair.delete()
- except EC2ResponseError, e:
- if e.code != 'InvalidKeyPair.NotFound':
- if e.code == 'AuthFailure':
- # Inserted because of previous support issue.
- self.log(AUTH_FAILURE_MESSAGE)
- raise
- self.log('Cannot delete; key pair not '
- 'found %r\n' % key_pair.name)
- else:
- self.log('Deleted key pair %r.\n' % key_pair.name)
-
- def collect_garbage(self):
- """Remove any old keys and security groups."""
- self.delete_previous_security_groups()
- self.delete_previous_key_pairs()
-
- def find_images(self):
- # We are trying to find an image that has a location that matches a
- # regex (see definition of _image_match, above). Part of that regex is
- # expected to be an integer with the semantics of a revision number.
- # The image location with the highest revision number is the one that
- # should be chosen. Because AWS does not guarantee that two images
- # cannot share a location string, we need to make sure that the search
- # result for this image is unique, or throw an error because the
- # choice of image is ambiguous.
- results = defaultdict(list)
-
- # Find the images with the highest revision numbers and locations that
- # match the regex.
- images = self.conn.get_all_images(owners=tuple(VALID_AMI_OWNERS))
- for image in images:
- match = self._image_match(image.location)
- if match is not None:
- revision = int(match.group(1))
- results[revision].append(image)
-
- return sorted(results.iteritems(), key=itemgetter(0), reverse=True)
-
- def acquire_image(self, machine_id):
- """Get the image.
-
- If 'machine_id' is None, then return the image with location that
- matches `EC2Account._image_match` and has the highest revision number
- (where revision number is the 'NN' in 'launchpad-ec2testNN').
-
- Otherwise, just return the image with the given 'machine_id'.
-
- :raise ValueError: if there is more than one image with the same
- location string.
-
- :raise RuntimeError: if we cannot find a test-runner image.
-
- :return: A boto image.
- """
- if machine_id is not None:
- # This may raise an exception. The user specified a machine_id, so
- # they can deal with it.
- return self.conn.get_image(machine_id)
-
- images_by_revision = self.find_images()
- if len(images_by_revision) == 0:
- raise RuntimeError(
- "You don't have access to a test-runner image.\n"
- "Request access and try again.\n")
-
- revision, images = images_by_revision[0]
- if len(images) > 1:
- raise ValueError(
- 'More than one image of revision %d found: %r' % (
- revision, images))
-
- self.log('Using machine image version %d\n' % revision)
- return images[0]
=== removed file 'lib/devscripts/ec2test/builtins.py'
--- lib/devscripts/ec2test/builtins.py 2012-01-01 03:03:28 +0000
+++ lib/devscripts/ec2test/builtins.py 1970-01-01 00:00:00 +0000
@@ -1,861 +0,0 @@
-# Copyright 2009-2011 Canonical Ltd. This software is licensed under the
-# GNU Affero General Public License version 3 (see the file LICENSE).
-
-"""The command classes for the 'ec2' utility."""
-
-__metaclass__ = type
-__all__ = []
-
-from datetime import (
- datetime,
- timedelta,
- )
-import os
-import pdb
-import socket
-
-from bzrlib.bzrdir import BzrDir
-from bzrlib.commands import Command
-from bzrlib.errors import (
- BzrCommandError,
- ConnectionError,
- NoSuchFile,
- )
-from bzrlib.help import help_commands
-from bzrlib.option import (
- ListOption,
- Option,
- )
-from bzrlib.trace import is_verbose
-from bzrlib.transport import get_transport
-from pytz import UTC
-import simplejson
-
-from devscripts import get_launchpad_root
-from devscripts.ec2test.account import VALID_AMI_OWNERS
-from devscripts.ec2test.credentials import EC2Credentials
-from devscripts.ec2test.instance import (
- AVAILABLE_INSTANCE_TYPES,
- DEFAULT_INSTANCE_TYPE,
- DEFAULT_REGION,
- EC2Instance,
- )
-from devscripts.ec2test.session import EC2SessionName
-from devscripts.ec2test.testrunner import (
- EC2TestRunner,
- TRUNK_BRANCH,
- )
-
-# Options accepted by more than one command.
-
-# Branches is a complicated option that lets the user specify which branches
-# to use in the sourcecode directory. Most of the complexity is still in
-# EC2TestRunner.__init__, which probably isn't ideal.
-branch_option = ListOption(
- 'branch', type=str, short_name='b', argname='BRANCH',
- help=('Branches to include in this run in sourcecode. '
- 'If the argument is only the project name, the trunk will be '
- 'used (e.g., ``-b launchpadlib``). If you want to use a '
- 'specific branch, if it is on launchpad, you can usually '
- 'simply specify it instead (e.g., '
- '``-b lp:~username/launchpadlib/branchname``). If this does '
- 'not appear to work, or if the desired branch is not on '
- 'launchpad, specify the project name and then the branch '
- 'after an equals sign (e.g., '
- '``-b launchpadlib=lp:~username/launchpadlib/branchname``). '
- 'Branches for multiple projects may be specified with '
- 'multiple instances of this option. '
- 'You may also use this option to specify the branch of launchpad '
- 'into which your branch may be merged. This defaults to %s. '
- 'Because typically the important branches of launchpad are owned '
- 'by the launchpad-pqm user, you can shorten this to only the '
- 'branch name, if desired, and the launchpad-pqm user will be '
- 'assumed. For instance, if you specify '
- '``-b launchpad=db-devel`` then this is equivalent to '
- '``-b lp:~launchpad-pqm/launchpad/db-devel``, or the even longer'
- '``-b launchpad=lp:~launchpad-pqm/launchpad/db-devel``.'
- % (TRUNK_BRANCH,)))
-
-
-machine_id_option = Option(
- 'machine', short_name='m', type=str,
- help=('The AWS machine identifier (AMI) on which to base this run. '
- 'You should typically only have to supply this if you are '
- 'testing new AWS images. Defaults to trying to find the most '
- 'recent one with an approved owner.'))
-
-
-instance_type_option = Option(
- 'instance', short_name='i',
- type=str,
- param_name='instance_type',
- help=('The AWS instance type on which to base this run. '
- 'Available options are %r. Defaults to `%s`.' %
- (AVAILABLE_INSTANCE_TYPES, DEFAULT_INSTANCE_TYPE)))
-
-
-debug_option = Option(
- 'debug', short_name='d',
- help=('Drop to pdb trace as soon as possible.'))
-
-
-trunk_option = Option(
- 'trunk', short_name='t',
- help=('Run the trunk as the branch, rather than the branch of the '
- 'current working directory.'))
-
-
-include_download_cache_changes_option = Option(
- 'include-download-cache-changes', short_name='c',
- help=('Include any changes in the download cache (added or unknown) '
- 'in the download cache of the test run. Note that, if you have '
- 'any changes in your download cache, trying to submit to pqm '
- 'will always raise an error. Also note that, if you have any '
- 'changes in your download cache, you must explicitly choose to '
- 'include or ignore the changes.'))
-
-
-postmortem_option = Option(
- 'postmortem', short_name='p',
- help=('Drop to interactive prompt after the test and before shutting '
- 'down the instance for postmortem analysis of the EC2 instance '
- 'and/or of this script.'))
-
-
-attached_option = Option(
- 'attached',
- help=("Remain attached, i.e. do not go headless. Implied by --postmortem "
- "and --file."))
-
-
-region_option = Option(
- 'region',
- type=str,
- help=("Name of the AWS region in which to run the instance. "
- "Must be the same as the region holding the image file. "
- "For example, 'us-west-1'."))
-
-
-def filename_type(filename):
- """An option validator for filenames.
-
- :raise: an error if 'filename' is not a file we can write to.
- :return: 'filename' otherwise.
- """
- if filename is None:
- return filename
-
- check_file = filename
- if os.path.exists(check_file):
- if not os.path.isfile(check_file):
- raise BzrCommandError(
- 'file argument %s exists and is not a file' % (filename,))
- else:
- check_file = os.path.dirname(check_file)
- if (not os.path.exists(check_file) or
- not os.path.isdir(check_file)):
- raise BzrCommandError(
- 'file %s cannot be created.' % (filename,))
- if not os.access(check_file, os.W_OK):
- raise BzrCommandError(
- 'you do not have permission to write %s' % (filename,))
- return filename
-
-
-def set_trace_if(enable_debugger=False):
- """If `enable_debugger` is True, drop into the debugger."""
- if enable_debugger:
- pdb.set_trace()
-
-
-class EC2Command(Command):
- """Subclass of `Command` that customizes usage to say 'ec2' not 'bzr'.
-
- When https://bugs.edge.launchpad.net/bzr/+bug/431054 is fixed, we can
- delete this class, or at least make it less of a copy/paste/hack of the
- superclass.
- """
-
- def _usage(self):
- """Return single-line grammar for this command.
-
- Only describes arguments, not options.
- """
- s = 'ec2 ' + self.name() + ' '
- for aname in self.takes_args:
- aname = aname.upper()
- if aname[-1] in ['$', '+']:
- aname = aname[:-1] + '...'
- elif aname[-1] == '?':
- aname = '[' + aname[:-1] + ']'
- elif aname[-1] == '*':
- aname = '[' + aname[:-1] + '...]'
- s += aname + ' '
- s = s[:-1] # remove last space
- return s
-
-
-def _get_branches_and_test_branch(trunk, branch, test_branch):
- """Interpret the command line options to find which branch to test.
-
- :param trunk: The value of the --trunk option.
- :param branch: The value of the --branch options.
- :param test_branch: The value of the TEST_BRANCH argument.
- """
- if trunk:
- if test_branch is not None:
- raise BzrCommandError(
- "Cannot specify both a branch to test and --trunk")
- else:
- test_branch = TRUNK_BRANCH
- else:
- if test_branch is None:
- test_branch = '.'
- branches = [data.split('=', 1) for data in branch]
- return branches, test_branch
-
-
-DEFAULT_TEST_OPTIONS = '--subunit -vvv'
-
-
-class cmd_test(EC2Command):
- """Run the test suite in ec2."""
-
- takes_options = [
- branch_option,
- trunk_option,
- machine_id_option,
- instance_type_option,
- region_option,
- Option(
- 'file', short_name='f', type=filename_type,
- help=('Store abridged test results in FILE.')),
- ListOption(
- 'email', short_name='e', argname='EMAIL', type=str,
- help=('Email address to which results should be mailed. '
- 'Defaults to the email address from `bzr whoami`. May be '
- 'supplied multiple times. `bzr whoami` will be used as '
- 'the From: address.')),
- Option(
- 'noemail', short_name='n',
- help=('Do not try to email results.')),
- Option(
- 'test-options', short_name='o', type=str,
- help=('Test options to pass to the remote test runner. Defaults '
- "to ``-o '-vv'``. For instance, to run specific tests, "
- "you might use ``-o '-vvt my_test_pattern'``.")),
- Option(
- 'submit-pqm-message', short_name='s', type=str, argname="MSG",
- help=(
- 'A pqm message to submit if the test run is successful. If '
- 'provided, you will be asked for your GPG passphrase before '
- 'the test run begins.')),
- Option(
- 'pqm-public-location', type=str,
- help=('The public location for the pqm submit, if a pqm message '
- 'is provided (see --submit-pqm-message). If this is not '
- 'provided, for local branches, bzr configuration is '
- 'consulted; for remote branches, it is assumed that the '
- 'remote branch *is* a public branch.')),
- Option(
- 'pqm-submit-location', type=str,
- help=('The submit location for the pqm submit, if a pqm message '
- 'is provided (see --submit-pqm-message). If this option '
- 'is not provided, the script will look for an explicitly '
- 'specified launchpad branch using the -b/--branch option; '
- 'if that branch was specified and is owned by the '
- 'launchpad-pqm user on launchpad, it is used as the pqm '
- 'submit location. Otherwise, for local branches, bzr '
- 'configuration is consulted; for remote branches, it is '
- 'assumed that the submit branch is %s.'
- % (TRUNK_BRANCH,))),
- Option(
- 'pqm-email', type=str,
- help=(
- 'Specify the email address of the PQM you are submitting to. '
- 'If the branch is local, then the bzr configuration is '
- 'consulted; for remote branches "Launchpad PQM '
- '<launchpad@xxxxxxxxxxxxxxxxx>" is used by default.')),
- postmortem_option,
- attached_option,
- debug_option,
- Option(
- 'open-browser',
- help=('Open the results page in your default browser')),
- include_download_cache_changes_option,
- ]
-
- takes_args = ['test_branch?']
-
- def run(self, test_branch=None, branch=None, trunk=False, machine=None,
- instance_type=DEFAULT_INSTANCE_TYPE,
- file=None, email=None, test_options=DEFAULT_TEST_OPTIONS,
- noemail=False, submit_pqm_message=None, pqm_public_location=None,
- pqm_submit_location=None, pqm_email=None, postmortem=False,
- attached=False, debug=False, open_browser=False,
- region=None,
- include_download_cache_changes=False):
- set_trace_if(debug)
- if branch is None:
- branch = []
- branches, test_branch = _get_branches_and_test_branch(
- trunk, branch, test_branch)
- if (postmortem or file):
- attached = True
- if noemail:
- if email:
- raise BzrCommandError(
- 'May not supply both --no-email and an --email address')
- else:
- if email == []:
- email = True
-
- if not attached and not (email or submit_pqm_message):
- raise BzrCommandError(
- 'You have specified no way to get the results '
- 'of your headless test run.')
-
- if (test_options != DEFAULT_TEST_OPTIONS
- and submit_pqm_message is not None):
- raise BzrCommandError(
- "Submitting to PQM with non-default test options isn't "
- "supported")
-
- session_name = EC2SessionName.make(EC2TestRunner.name)
- instance = EC2Instance.make(session_name, instance_type, machine,
- region=region)
-
- runner = EC2TestRunner(
- test_branch, email=email, file=file,
- test_options=test_options, headless=(not attached),
- branches=branches, pqm_message=submit_pqm_message,
- pqm_public_location=pqm_public_location,
- pqm_submit_location=pqm_submit_location,
- open_browser=open_browser, pqm_email=pqm_email,
- include_download_cache_changes=include_download_cache_changes,
- instance=instance, launchpad_login=instance._launchpad_login,
- timeout=480)
-
- instance.set_up_and_run(postmortem, attached, runner.run_tests)
-
-
-class cmd_land(EC2Command):
- """Land a merge proposal on Launchpad."""
-
- takes_options = [
- debug_option,
- instance_type_option,
- region_option,
- machine_id_option,
- Option('dry-run', help="Just print the equivalent ec2 test command."),
- Option('print-commit', help="Print the full commit message."),
- Option(
- 'testfix',
- help="This is a testfix (tags commit with [testfix])."),
- Option(
- 'no-qa',
- help="Does not require QA (tags commit with [no-qa])."),
- Option(
- 'incremental',
- help="Incremental to other bug fix (tags commit with [incr])."),
- Option(
- 'rollback', type=int,
- help=(
- "Rollback given revision number. (tags commit with "
- "[rollback=revno]).")),
- Option(
- 'commit-text', short_name='s', type=str,
- help=(
- 'A description of the landing, not including reviewer '
- 'metadata etc.')),
- Option(
- 'force',
- help="Land the branch even if the proposal is not approved."),
- attached_option,
- ]
-
- takes_args = ['merge_proposal?']
-
- def _get_landing_command(self, source_url, target_url, commit_message,
- emails, attached):
- """Return the command that would need to be run to submit with ec2."""
- ec2_path = os.path.join(get_launchpad_root(), 'utilities', 'ec2')
- command = [ec2_path, 'test']
- if attached:
- command.extend(['--attached'])
- command.extend(['--email=%s' % email for email in emails])
- # 'ec2 test' has a bug where you cannot pass full URLs to branches to
- # the -b option. It has special logic for 'launchpad' branches, so we
- # piggy back on this to get 'devel' or 'db-devel'.
- target_branch_name = target_url.split('/')[-1]
- command.extend(
- ['-b', 'launchpad=%s' % (target_branch_name), '-s',
- commit_message, str(source_url)])
- return command
-
- def run(self, merge_proposal=None, machine=None,
- instance_type=DEFAULT_INSTANCE_TYPE, postmortem=False,
- debug=False, commit_text=None, dry_run=False, testfix=False,
- no_qa=False, incremental=False, rollback=None, print_commit=False,
- force=False, attached=False,
- region=DEFAULT_REGION,
- ):
- try:
- from devscripts.autoland import (
- LaunchpadBranchLander, MissingReviewError, MissingBugsError,
- MissingBugsIncrementalError)
- except ImportError:
- self.outf.write(
- "***************************************************\n\n"
- "Could not load the autoland module; please ensure\n"
- "that launchpadlib and lazr.uri are installed and\n"
- "found in sys.path/PYTHONPATH.\n\n"
- "Note that these should *not* be installed system-\n"
- "wide because this will break the rest of Launchpad.\n\n"
- "***************************************************\n")
- raise
- set_trace_if(debug)
- if print_commit and dry_run:
- raise BzrCommandError(
- "Cannot specify --print-commit and --dry-run.")
- lander = LaunchpadBranchLander.load()
-
- if merge_proposal is None:
- (tree, bzrbranch, relpath) = (
- BzrDir.open_containing_tree_or_branch('.'))
- mp = lander.get_merge_proposal_from_branch(bzrbranch)
- else:
- mp = lander.load_merge_proposal(merge_proposal)
- if not mp.is_approved:
- if force:
- print "Merge proposal is not approved, landing anyway."
- else:
- raise BzrCommandError(
- "Merge proposal is not approved. Get it approved, or use "
- "--force to land it without approval.")
- if commit_text is None:
- commit_text = mp.commit_message
- if commit_text is None:
- raise BzrCommandError(
- "Commit text not specified. Use --commit-text, or specify a "
- "message on the merge proposal.")
- if rollback and (no_qa or incremental):
- print (
- "--rollback option used. Ignoring --no-qa and --incremental.")
- try:
- commit_message = mp.build_commit_message(
- commit_text, testfix, no_qa, incremental, rollback=rollback)
- except MissingReviewError:
- raise BzrCommandError(
- "Cannot land branches that haven't got approved code "
- "reviews. Get an 'Approved' vote so we can fill in the "
- "[r=REVIEWER] section.")
- except MissingBugsError:
- raise BzrCommandError(
- "Branch doesn't have linked bugs and doesn't have no-qa "
- "option set. Use --no-qa, or link the related bugs to the "
- "branch.")
- except MissingBugsIncrementalError:
- raise BzrCommandError(
- "--incremental option requires bugs linked to the branch. "
- "Link the bugs or remove the --incremental option.")
-
- # Override the commit message in the MP with the commit message built
- # with the proper tags.
- try:
- mp.set_commit_message(commit_message)
- except Exception, e:
- raise BzrCommandError(
- "Unable to set the commit message in the merge proposal.\n"
- "Got: %s" % e)
-
- if print_commit:
- print commit_message
- return
-
- emails = mp.get_stakeholder_emails()
-
- target_branch_name = mp.target_branch.split('/')[-1]
- branches = [('launchpad', target_branch_name)]
-
- landing_command = self._get_landing_command(
- mp.source_branch, mp.target_branch, commit_message,
- emails, attached)
-
- if dry_run:
- print landing_command
- return
-
- session_name = EC2SessionName.make(EC2TestRunner.name)
- instance = EC2Instance.make(
- session_name, instance_type, machine, region=region)
-
- runner = EC2TestRunner(
- mp.source_branch, email=emails,
- headless=(not attached),
- branches=branches, pqm_message=commit_message,
- instance=instance,
- launchpad_login=instance._launchpad_login,
- test_options=DEFAULT_TEST_OPTIONS,
- timeout=480)
-
- instance.set_up_and_run(postmortem, attached, runner.run_tests)
-
-
-class cmd_demo(EC2Command):
- """Start a demo instance of Launchpad.
-
- See https://wiki.canonical.com/Launchpad/EC2Test/ForDemos
- """
-
- takes_options = [
- branch_option,
- trunk_option,
- machine_id_option,
- instance_type_option,
- postmortem_option,
- debug_option,
- include_download_cache_changes_option,
- region_option,
- ListOption(
- 'demo', type=str,
- help="Allow this netmask to connect to the instance."),
- ]
-
- takes_args = ['test_branch?']
-
- def run(self, test_branch=None, branch=None, trunk=False, machine=None,
- instance_type=DEFAULT_INSTANCE_TYPE, debug=False,
- include_download_cache_changes=False, demo=None):
- set_trace_if(debug)
- if branch is None:
- branch = []
- branches, test_branch = _get_branches_and_test_branch(
- trunk, branch, test_branch)
-
- session_name = EC2SessionName.make(EC2TestRunner.name)
- instance = EC2Instance.make(
- session_name, instance_type, machine, demo)
-
- runner = EC2TestRunner(
- test_branch, branches=branches,
- include_download_cache_changes=include_download_cache_changes,
- instance=instance, launchpad_login=instance._launchpad_login)
-
- demo_network_string = '\n'.join(
- ' ' + network for network in demo)
-
- # Wait until the user exits the postmortem session, then kill the
- # instance.
- postmortem = True
- shutdown = True
- instance.set_up_and_run(
- postmortem, shutdown, self.run_server, runner, instance,
- demo_network_string)
-
- def run_server(self, runner, instance, demo_network_string):
- runner.run_demo_server()
- ec2_ip = socket.gethostbyname(instance.hostname)
- print (
- "\n\n"
- "********************** DEMO *************************\n"
- "It may take 20 seconds for the demo server to start up."
- "\nTo demo to other users, you still need to open up\n"
- "network access to the ec2 instance from their IPs by\n"
- "entering command like this in the interactive python\n"
- "interpreter at the end of the setup. "
- "\n self.security_group.authorize("
- "'tcp', 443, 443, '10.0.0.5/32')\n\n"
- "These demo networks have already been granted access on "
- "port 80 and 443:\n" + demo_network_string +
- "\n\nYou also need to edit your /etc/hosts to point\n"
- "launchpad.dev at the ec2 instance's IP like this:\n"
- " " + ec2_ip + " launchpad.dev\n\n"
- "See "
- "<https://wiki.canonical.com/Launchpad/EC2Test/ForDemos>."
- "\n*****************************************************"
- "\n\n")
-
-
-class cmd_update_image(EC2Command):
- """Make a new AMI."""
-
- takes_options = [
- machine_id_option,
- instance_type_option,
- postmortem_option,
- debug_option,
- region_option,
- ListOption(
- 'extra-update-image-command', type=str,
- help=('Run this command (with an ssh agent) on the image before '
- 'running the default update steps. Can be passed more '
- 'than once, the commands will be run in the order '
- 'specified.')),
- Option(
- 'public',
- help=('Remove proprietary code from the sourcecode directory '
- 'before bundling.')),
- ]
-
- takes_args = ['ami_name']
-
- def run(self, ami_name, machine=None, instance_type='m1.large',
- debug=False, postmortem=False, extra_update_image_command=None,
- region=None,
- public=False):
- set_trace_if(debug)
-
- if extra_update_image_command is None:
- extra_update_image_command = []
-
- # These environment variables are passed through ssh connections to
- # fresh Ubuntu images and cause havoc if the locales they refer to are
- # not available. We kill them here to ease bootstrapping, then we
- # later modify the image to prevent sshd from accepting them.
- for variable in ['LANG', 'LC_ALL', 'LC_TIME']:
- os.environ.pop(variable, None)
-
- session_name = EC2SessionName.make(EC2TestRunner.name)
- instance = EC2Instance.make(
- session_name, instance_type, machine,
- region=region)
- instance.check_bundling_prerequisites(ami_name)
- instance.set_up_and_run(
- postmortem, True, self.update_image, instance,
- extra_update_image_command, ami_name, instance._credentials,
- public)
-
- def update_image(self, instance, extra_update_image_command, ami_name,
- credentials, public):
- """Bring the image up to date.
-
- The steps we take are:
-
- * run any commands specified with --extra-update-image-command
- * update sourcecode
- * update the launchpad branch to the tip of the trunk branch.
- * update the copy of the download-cache.
- * bundle the image
-
- :param instance: `EC2Instance` to operate on.
- :param extra_update_image_command: List of commands to run on the
- instance in addition to the usual ones.
- :param ami_name: The name to give the created AMI.
- :param credentials: An `EC2Credentials` object.
- :param public: If true, remove proprietary code from the sourcecode
- directory before bundling.
- """
- # Do NOT accept environment variables via ssh connections.
- user_connection = instance.connect()
- user_connection.perform('sudo apt-get -qqy update')
- user_connection.perform('sudo apt-get -qqy upgrade')
- user_connection.perform(
- 'sudo sed -i "s/^AcceptEnv/#AcceptEnv/" /etc/ssh/sshd_config')
- user_connection.perform(
- 'sudo kill -HUP $(< /var/run/sshd.pid)')
- # Reconnect to ensure that the environment is clean.
- user_connection.reconnect()
- user_connection.perform(
- 'bzr launchpad-login %s' % (instance._launchpad_login,))
- for cmd in extra_update_image_command:
- user_connection.run_with_ssh_agent(cmd)
- user_connection.run_with_ssh_agent(
- 'bzr pull -d /var/launchpad/test ' + TRUNK_BRANCH)
- user_connection.run_with_ssh_agent(
- 'bzr pull -d /var/launchpad/download-cache '
- 'lp:lp-source-dependencies')
- if public:
- update_sourcecode_options = ' --public-only'
- else:
- update_sourcecode_options = ''
- user_connection.run_with_ssh_agent(
- "/var/launchpad/test/utilities/update-sourcecode "
- "/var/launchpad/sourcecode" + update_sourcecode_options)
- user_connection.perform(
- 'rm -rf .ssh/known_hosts .bazaar .bzr.log')
- user_connection.close()
- instance.bundle(ami_name, credentials)
-
-
-class cmd_images(EC2Command):
- """Display all available images.
-
- The first in the list is the default image.
- """
-
- takes_options = [
- region_option,
- ]
-
- def run(self, region=None):
- session_name = EC2SessionName.make(EC2TestRunner.name)
- credentials = EC2Credentials.load_from_file(region_name=region)
- account = credentials.connect(session_name)
- format = "%5s %-12s %-12s %-12s %s\n"
- self.outf.write(
- format % ("Rev", "AMI", "Owner ID", "Owner", "Description"))
- for revision, images in account.find_images():
- for image in images:
- self.outf.write(format % (
- revision, image.id, image.ownerId,
- VALID_AMI_OWNERS.get(image.ownerId, "unknown"),
- image.description or ''))
-
-
-class cmd_kill(EC2Command):
- """Kill one or more running EC2 instances.
-
- You can get the instance id from 'ec2 list'.
- """
-
- takes_options = [
- region_option,
- ]
- takes_args = ['instance_id*']
-
- def run(self, instance_id_list, region=None):
- credentials = EC2Credentials.load_from_file(region_name=region)
- account = credentials.connect('ec2 kill')
- self.outf.write("killing %d instances: " % len(instance_id_list,))
- account.conn.terminate_instances(instance_id_list)
- self.outf.write("done\n")
-
-
-class cmd_list(EC2Command):
- """List all your current EC2 test runs.
-
- If an instance is publishing an 'info.json' file with 'description' and
- 'failed-yet' fields, this command will list that instance, whether it has
- failed the test run and how long it has been up for.
-
- [FAILED] means that the has been a failing test. [OK] means that the test
- run has had no failures yet, it's not a guarantee of a successful run.
- """
-
- aliases = ["ls"]
-
- takes_options = [
- region_option,
- Option('show-urls',
- help="Include more information about each instance"),
- Option('all', short_name='a',
- help="Show all instances, not just ones with ec2test data."),
- ]
-
- def iter_instances(self, account):
- """Iterate through all instances in 'account'."""
- for reservation in account.conn.get_all_instances():
- for instance in reservation.instances:
- yield instance
-
- def get_uptime(self, instance):
- """How long has 'instance' been running?"""
- expected_format = '%Y-%m-%dT%H:%M:%S.000Z'
- launch_time = datetime.strptime(instance.launch_time, expected_format)
- delta = (
- datetime.utcnow().replace(tzinfo=UTC)
- - launch_time.replace(tzinfo=UTC))
- return timedelta(delta.days, delta.seconds) # Round it.
-
- def get_http_url(self, instance):
- hostname = instance.public_dns_name
- if not hostname:
- return
- return 'http://%s/' % (hostname,)
-
- def get_ec2test_info(self, instance):
- """Load the ec2test-specific information published by 'instance'."""
- url = self.get_http_url(instance)
- if url is None:
- return
- try:
- json = get_transport(url).get_bytes('info.json')
- except (ConnectionError, NoSuchFile):
- # Probably not an ec2test instance, or not ready yet.
- return None
- return simplejson.loads(json)
-
- def format_instance(self, instance, data, verbose):
- """Format 'instance' for display.
-
- :param instance: The EC2 instance to display.
- :param data: Launchpad-specific data.
- :param verbose: Whether we want verbose output.
- """
- description = instance.id
- uptime = self.get_uptime(instance)
- if instance.state != 'running':
- current_status = instance.state
- else:
- if data is None:
- current_status = 'unknown '
- else:
- description = data['description']
- if data['failed-yet']:
- current_status = '[FAILED]'
- else:
- current_status = '[OK] '
- output = (
- '%-40s %-10s (up for %s) %10s'
- % (description, current_status, uptime, instance.id))
- if verbose:
- url = self.get_http_url(instance)
- if url is None:
- url = "No web service"
- output += '\n %s' % (url,)
- if instance.state_reason:
- output += (
- '\n transition reason: %s'
- % instance.state_reason.get('message', ''))
- return output
-
- def format_summary(self, by_state):
- return ', '.join(
- ': '.join((state, str(num)))
- for (state, num) in sorted(list(by_state.items())))
-
- def run(self, show_urls=False, all=False, region=None):
- session_name = EC2SessionName.make(EC2TestRunner.name)
- credentials = EC2Credentials.load_from_file(region_name=region)
- account = credentials.connect(session_name)
- instances = list(self.iter_instances(account))
- if len(instances) == 0:
- print "No instances running."
- return
-
- by_state = {}
- for instance in instances:
- by_state[instance.state] = by_state.get(instance.state, 0) + 1
- data = self.get_ec2test_info(instance)
- if data is None and not all:
- continue
- print self.format_instance(
- instance, data, verbose=(show_urls or is_verbose()))
- print 'Summary: %s' % (self.format_summary(by_state),)
-
-
-class cmd_help(EC2Command):
- """Show general help or help for a command."""
-
- aliases = ["?", "--help", "-?", "-h"]
- takes_args = ["topic?"]
-
- def run(self, topic=None):
- """
- Show help for the C{bzrlib.commands.Command} matching C{topic}.
-
- @param topic: Optionally, the name of the topic to show. Default is
- to show some basic usage information.
- """
- if topic is None:
- self.outf.write('Usage: ec2 <command> <options>\n\n')
- self.outf.write('Available commands:\n')
- help_commands(self.outf)
- else:
- command = self.controller._get_command(None, topic)
- if command is None:
- self.outf.write("%s is an unknown command.\n" % (topic,))
- text = command.get_help_text()
- if text:
- self.outf.write(text)
=== removed file 'lib/devscripts/ec2test/controller.py'
--- lib/devscripts/ec2test/controller.py 2012-01-01 03:03:28 +0000
+++ lib/devscripts/ec2test/controller.py 1970-01-01 00:00:00 +0000
@@ -1,181 +0,0 @@
-# This file is incuded almost verbatim from commandant,
-# https://launchpad.net/commandant. The only changes are removing some code
-# we don't use that depends on other parts of commandant. When Launchpad is
-# on Python 2.5 we can include commandant as an egg.
-
-
-# Commandant is a framework for building command-oriented tools.
-# Copyright (C) 2009 Jamshed Kakar.
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation; either version 2 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License along
-# with this program; if not, write to the Free Software Foundation, Inc.,
-# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
-
-"""Infrastructure to run C{bzrlib.commands.Command}s and L{HelpTopic}s."""
-
-import os
-import sys
-
-from bzrlib.commands import (
- Command,
- run_bzr,
- )
-
-
-class CommandRegistry(object):
-
- def __init__(self):
- self._commands = {}
-
- def install_bzrlib_hooks(self):
- """
- Register this controller with C{Command.hooks} so that the controller
- can take advantage of Bazaar's command infrastructure.
-
- L{_list_commands} and L{_get_command} are registered as callbacks for
- the C{list_commands} and C{get_commands} hooks, respectively.
- """
- Command.hooks.install_named_hook(
- "list_commands", self._list_commands, "commandant commands")
- Command.hooks.install_named_hook(
- "get_command", self._get_command, "commandant commands")
-
- def _list_commands(self, names):
- """Hook to find C{bzrlib.commands.Command} names is called by C{bzrlib}.
-
- @param names: A set of C{bzrlib.commands.Command} names to update with
- names from this controller.
- """
- names.update(self._commands.iterkeys())
- return names
-
- def _get_command(self, command, name):
- """
- Hook to get the C{bzrlib.commands.Command} for C{name} is called by
- C{bzrlib}.
-
- @param command: A C{bzrlib.commands.Command}, or C{None}, to be
- returned if a command matching C{name} can't be found.
- @param name: The name of the C{bzrlib.commands.Command} to retrieve.
- @return: The C{bzrlib.commands.Command} from the index or C{command}
- if one isn't available for C{name}.
- """
- try:
- local_command = self._commands[name]()
- except KeyError:
- for cmd in self._commands.itervalues():
- if name in cmd.aliases:
- local_command = cmd()
- break
- else:
- return command
- local_command.controller = self
- return local_command
-
- def register_command(self, name, command_class):
- """Register a C{bzrlib.commands.Command} with this controller.
-
- @param name: The name to register the command with.
- @param command_class: A type object, typically a subclass of
- C{bzrlib.commands.Command} to use when the command is invoked.
- """
- self._commands[name] = command_class
-
- def load_module(self, module):
- """Load C{bzrlib.commands.Command}s and L{HelpTopic}s from C{module}.
-
- Objects found in the module with names that start with C{cmd_} are
- treated as C{bzrlib.commands.Command}s and objects with names that
- start with C{topic_} are treated as L{HelpTopic}s.
- """
- for name in module.__dict__:
- if name.startswith("cmd_"):
- sanitized_name = name[4:].replace("_", "-")
- self.register_command(sanitized_name, module.__dict__[name])
- elif name.startswith("topic_"):
- sanitized_name = name[6:].replace("_", "-")
- self.register_help_topic(sanitized_name, module.__dict__[name])
-
-
-class HelpTopicRegistry(object):
-
- def __init__(self):
- self._help_topics = {}
-
- def register_help_topic(self, name, help_topic_class):
- """Register a C{bzrlib.commands.Command} to this controller.
-
- @param name: The name to register the command with.
- @param command_class: A type object, typically a subclass of
- C{bzrlib.commands.Command} to use when the command is invoked.
- """
- self._help_topics[name] = help_topic_class
-
- def get_help_topic_names(self):
- """Get a C{set} of help topic names."""
- return set(self._help_topics.iterkeys())
-
- def get_help_topic(self, name):
- """
- Get the help topic matching C{name} or C{None} if a match isn't found.
- """
- try:
- help_topic = self._help_topics[name]()
- except KeyError:
- return None
- help_topic.controller = self
- return help_topic
-
-
-
-class CommandExecutionMixin(object):
-
- def run(self, argv):
- """Run the C{bzrlib.commands.Command} specified in C{argv}.
-
- @raise BzrCommandError: Raised if a matching command can't be found.
- """
- run_bzr(argv)
-
-
-
-def import_module(filename, file_path, package_path):
- """Import a module and make it a child of C{commandant_command}.
-
- The module source in C{filename} at C{file_path} is copied to a temporary
- directory, a Python package called C{commandant_command}.
-
- @param filename: The name of the module file.
- @param file_path: The path to the module file.
- @param package_path: The path for the new C{commandant_command} package.
- @return: The new module.
- """
- module_path = os.path.join(package_path, "commandant_command")
- if not os.path.exists(module_path):
- os.mkdir(module_path)
-
- init_path = os.path.join(module_path, "__init__.py")
- open(init_path, "w").close()
-
- source_code = open(file_path, "r").read()
- module_file_path = os.path.join(module_path, filename)
- module_file = open(module_file_path, "w")
- module_file.write(source_code)
- module_file.close()
-
- name = filename[:-3]
- sys.path.append(package_path)
- try:
- return __import__("commandant_command.%s" % (name,), fromlist=[name])
- finally:
- sys.path.pop()
=== removed file 'lib/devscripts/ec2test/credentials.py'
--- lib/devscripts/ec2test/credentials.py 2012-01-01 03:03:28 +0000
+++ lib/devscripts/ec2test/credentials.py 1970-01-01 00:00:00 +0000
@@ -1,76 +0,0 @@
-# Copyright 2009-2011 Canonical Ltd. This software is licensed under the
-# GNU Affero General Public License version 3 (see the file LICENSE).
-
-"""Support for reading Amazon Web Service credentials from '~/.ec2/aws_id'."""
-
-__metaclass__ = type
-__all__ = [
- 'CredentialsError',
- 'EC2Credentials',
- ]
-
-import os
-
-import boto
-import boto.ec2
-from bzrlib.errors import BzrCommandError
-
-from devscripts.ec2test import instance
-from devscripts.ec2test.account import EC2Account
-
-
-class CredentialsError(BzrCommandError):
- """Raised when AWS credentials could not be loaded."""
-
- _fmt = (
- "Please put your aws access key identifier and secret access "
- "key identifier in %(filename)s. (On two lines). %(extra)s")
-
- def __init__(self, filename, extra=None):
- super(CredentialsError, self).__init__(filename=filename, extra=extra)
-
-
-class EC2Credentials:
- """Credentials for logging in to EC2."""
-
- DEFAULT_CREDENTIALS_FILE = '~/.ec2/aws_id'
-
- def __init__(self, identifier, secret, region_name):
- self.identifier = identifier
- self.secret = secret
- self.region_name = region_name or instance.DEFAULT_REGION
-
- @classmethod
- def load_from_file(cls, filename=None, region_name=None):
- """Load the EC2 credentials from 'filename'."""
- if filename is None:
- filename = os.path.expanduser(cls.DEFAULT_CREDENTIALS_FILE)
- try:
- aws_file = open(filename, 'r')
- except (IOError, OSError), e:
- raise CredentialsError(filename, str(e))
- try:
- identifier = aws_file.readline().strip()
- secret = aws_file.readline().strip()
- finally:
- aws_file.close()
- return cls(identifier, secret, region_name)
-
- def connect(self, name):
- """Connect to EC2 with these credentials.
-
- :param name: Arbitrary local name for the object.
- :return: An `EC2Account` connected to EC2 with these credentials.
- """
- conn = boto.ec2.connect_to_region(
- self.region_name,
- aws_access_key_id=self.identifier,
- aws_secret_access_key=self.secret)
- return EC2Account(name, conn)
-
- def connect_s3(self):
- """Connect to S3 with these credentials.
-
- :return: A `boto.s3.connection.S3Connection` with these credentials.
- """
- return boto.connect_s3(self.identifier, self.secret)
=== removed file 'lib/devscripts/ec2test/entrypoint.py'
--- lib/devscripts/ec2test/entrypoint.py 2012-01-01 03:03:28 +0000
+++ lib/devscripts/ec2test/entrypoint.py 1970-01-01 00:00:00 +0000
@@ -1,49 +0,0 @@
-# Copyright 2009 Canonical Ltd. This software is licensed under the
-# GNU Affero General Public License version 3 (see the file LICENSE).
-
-"""The entry point for the 'ec2' utility."""
-
-__metaclass__ = type
-__all__ = [
- 'main',
- ]
-
-import readline
-import rlcompleter
-import sys
-
-import bzrlib
-from bzrlib.errors import BzrCommandError
-
-from devscripts.ec2test import builtins
-from devscripts.ec2test.controller import (
- CommandExecutionMixin,
- CommandRegistry,
- )
-
-# Shut up pyflakes.
-rlcompleter
-
-readline.parse_and_bind('tab: complete')
-
-class EC2CommandController(CommandRegistry, CommandExecutionMixin):
- """The 'ec2' utility registers and executes commands."""
-
-
-def main():
- """The entry point for the 'ec2' script.
-
- We run the specified command, or give help if none was specified.
- """
- with bzrlib.initialize():
- controller = EC2CommandController()
- controller.install_bzrlib_hooks()
- controller.load_module(builtins)
-
- args = sys.argv[1:]
- if not args:
- args = ['help']
- try:
- controller.run(args)
- except BzrCommandError, e:
- sys.exit('ec2: ERROR: ' + str(e))
=== removed file 'lib/devscripts/ec2test/instance.py'
--- lib/devscripts/ec2test/instance.py 2012-02-07 14:53:38 +0000
+++ lib/devscripts/ec2test/instance.py 1970-01-01 00:00:00 +0000
@@ -1,742 +0,0 @@
-# Copyright 2009-2012 Canonical Ltd. This software is licensed under the
-# GNU Affero General Public License version 3 (see the file LICENSE).
-
-"""Code to represent a single machine instance in EC2."""
-
-__metaclass__ = type
-__all__ = [
- 'EC2Instance',
- ]
-
-import code
-from datetime import datetime
-import errno
-import glob
-import os
-import select
-import socket
-import subprocess
-import sys
-import time
-import traceback
-
-from bzrlib.errors import BzrCommandError
-import paramiko
-
-from devscripts.ec2test.session import EC2SessionName
-
-
-DEFAULT_INSTANCE_TYPE = 'm2.xlarge'
-DEFAULT_REGION = 'us-east-1'
-AVAILABLE_INSTANCE_TYPES = (
- 'm1.large', 'm1.xlarge', 'm2.xlarge', 'm2.2xlarge', 'm2.4xlarge',
- 'c1.xlarge', 'cc1.4xlarge', 'cc1.8xlarge')
-
-
-class AcceptAllPolicy:
- """We accept all unknown host keys."""
-
- def missing_host_key(self, client, hostname, key):
- # Normally the console output is supposed to contain the Host key but
- # it doesn't seem to be the case here, so we trust that the host we
- # are connecting to is the correct one.
- pass
-
-
-def get_user_key():
- """Get an SSH key from the agent. Raise an error if no keys were found.
-
- This key will be used to let the user log in (as $USER) to the instance.
- """
- agent = paramiko.Agent()
- keys = agent.get_keys()
- if len(keys) == 0:
- raise BzrCommandError(
- 'You must have an ssh agent running with keys installed that '
- 'will allow the script to access Launchpad and get your '
- 'branch.\n')
-
- # XXX mars 2010-05-07 bug=577118
- # Popping the first key off of the stack can create problems if the person
- # has more than one key in their ssh-agent, but alas, we have no good way
- # to detect the right key to use. See bug 577118 for a workaround.
- return keys[0]
-
-
-# Commands to run to turn a blank image into one usable for the rest of the
-# ec2 functionality. They come in two parts, one set that need to be run as
-# root and another that should be run as the 'ec2test' user.
-# Note that the sources from http://us.ec2.archive.ubuntu.com/ubuntu/ are per
-# instructions described in http://is.gd/g1MIT . When we switch to
-# Eucalyptus, we can dump this.
-
-from_scratch_root = """
-# From 'help set':
-# -x Print commands and their arguments as they are executed.
-# -e Exit immediately if a command exits with a non-zero status.
-set -xe
-
-# They end up as just one stream; this avoids ordering problems.
-exec 2>&1
-
-sed -ie 's/main universe/main universe multiverse/' /etc/apt/sources.list
-
-. /etc/lsb-release
-
-mount -o remount,data=writeback,commit=3600,async,relatime /
-
-cat >> /etc/apt/sources.list << EOF
-deb http://ppa.launchpad.net/launchpad/ubuntu $DISTRIB_CODENAME main
-deb http://ppa.launchpad.net/bzr/ubuntu $DISTRIB_CODENAME main
-EOF
-
-export DEBIAN_FRONTEND=noninteractive
-
-# PPA keys
-apt-key adv --recv-keys --keyserver pool.sks-keyservers.net 2af499cb24ac5f65461405572d1ffb6c0a5174af # launchpad
-apt-key adv --recv-keys --keyserver pool.sks-keyservers.net ece2800bacf028b31ee3657cd702bf6b8c6c1efd # bzr
-
-aptitude update
-
-# Do this first so later things don't complain about locales:
-LANG=C aptitude -y install language-pack-en
-
-aptitude -y full-upgrade
-
-# This next part is cribbed from rocketfuel-setup
-dev_host() {
- sed -i \"s/^127.0.0.88.*$/&\ ${hostname}/\" /etc/hosts
-}
-
-echo 'Adding development hosts on local machine'
-echo '
-# Launchpad virtual domains. This should be on one line.
-127.0.0.88 launchpad.dev
-' >> /etc/hosts
-
-declare -a hostnames
-hostnames=$(cat <<EOF
- answers.launchpad.dev
- api.launchpad.dev
- bazaar-internal.launchpad.dev
- beta.launchpad.dev
- blueprints.launchpad.dev
- bugs.launchpad.dev
- code.launchpad.dev
- feeds.launchpad.dev
- id.launchpad.dev
- keyserver.launchpad.dev
- lists.launchpad.dev
- openid.launchpad.dev
- ppa.launchpad.dev
- private-ppa.launchpad.dev
- testopenid.dev
- translations.launchpad.dev
- xmlrpc-private.launchpad.dev
- xmlrpc.launchpad.dev
-EOF
- )
-
-for hostname in $hostnames; do
- dev_host;
-done
-
-echo '
-127.0.0.99 bazaar.launchpad.dev
-' >> /etc/hosts
-
-# If this is uncommented, postgresql 9.1 will be explicitly installed
-# before other versisons and end up on port 5432, overriding the version
-# of postgresql specified in launchpad-database-dependencies.
-## apt-get -y install postgresql-9.1 postgresql-9.1-debversion postgresql-client-9.1 postgresql-contrib-9.1 postgresql-plpython-9.1 postgresql-server-dev-9.1 postgresql-doc-9.1
-apt-get -y install launchpad-developer-dependencies apache2 apache2-mpm-worker
-
-# Create the ec2test user, give them passwordless sudo.
-adduser --gecos "" --disabled-password ec2test
-echo 'ec2test\tALL=(ALL) NOPASSWD: ALL' >> /etc/sudoers
-
-mkdir /home/ec2test/.ssh
-cat > /home/ec2test/.ssh/config << EOF
-CheckHostIP no
-StrictHostKeyChecking no
-EOF
-
-mkdir /var/launchpad
-chown -R ec2test:ec2test /var/www /var/launchpad /home/ec2test/
-"""
-
-
-from_scratch_ec2test = """
-# From 'help set':
-# -x Print commands and their arguments as they are executed.
-# -e Exit immediately if a command exits with a non-zero status.
-set -xe
-
-# They end up as just one stream; this avoids ordering problems.
-exec 2>&1
-
-bzr launchpad-login %(launchpad-login)s
-bzr init-repo --2a /var/launchpad
-bzr branch lp:~launchpad-pqm/launchpad/devel /var/launchpad/test
-bzr branch --standalone lp:lp-source-dependencies /var/launchpad/download-cache
-mkdir /var/launchpad/sourcecode
-/var/launchpad/test/utilities/update-sourcecode /var/launchpad/sourcecode
-"""
-
-
-postmortem_banner = """\
-Postmortem Console. EC2 instance is not yet dead.
-It will shut down when you exit this prompt (CTRL-D)
-
-Tab-completion is enabled.
-EC2Instance is available as `instance`.
-Also try these:
- http://%(dns)s/current_test.log
- ssh -A ec2test@%(dns)s
-"""
-
-
-class EC2Instance:
- """A single EC2 instance."""
-
- @classmethod
- def make(cls, name, instance_type, machine_id, demo_networks=None,
- credentials=None, region=None):
- """Construct an `EC2Instance`.
-
- :param name: The name to use for the key pair and security group for
- the instance.
- :type name: `EC2SessionName`
- :param instance_type: One of the AVAILABLE_INSTANCE_TYPES.
- :param machine_id: The AMI to use, or None to do the usual regexp
- matching. If you put 'based-on:' before the AMI id, it is assumed
- that the id specifies a blank image that should be made into one
- suitable for the other ec2 functions (see `from_scratch_root` and
- `from_scratch_ec2test` above).
- :param demo_networks: A list of networks to add to the security group
- to allow access to the instance.
- :param credentials: An `EC2Credentials` object.
- :param region: A string region name eg 'us-east-1'.
- """
- # This import breaks in the test environment. Do it here so
- # that unit tests (which don't use this factory) can still
- # import EC2Instance.
- from bzrlib.plugins.launchpad.account import get_lp_login
-
- # XXX JeroenVermeulen 2009-11-27 bug=489073: EC2Credentials
- # imports boto, which isn't necessarily installed in our test
- # environment. Doing the import here so that unit tests (which
- # don't use this factory) can still import EC2Instance.
- from devscripts.ec2test.credentials import EC2Credentials
-
- assert isinstance(name, EC2SessionName)
-
- # We call this here so that it has a chance to complain before the
- # instance is started (which can take some time).
- user_key = get_user_key()
-
- if credentials is None:
- credentials = EC2Credentials.load_from_file(region_name=region)
-
- # Make the EC2 connection.
- account = credentials.connect(name)
-
- # We do this here because it (1) cleans things up and (2) verifies
- # that the account is correctly set up. Both of these are appropriate
- # for initialization.
- #
- # We always recreate the keypairs because there is no way to
- # programmatically retrieve the private key component, unless we
- # generate it.
- account.collect_garbage()
-
- if machine_id and machine_id.startswith('based-on:'):
- from_scratch = True
- machine_id = machine_id[len('based-on:'):]
- else:
- from_scratch = False
-
- # get the image
- image = account.acquire_image(machine_id)
-
- login = get_lp_login()
- if not login:
- raise BzrCommandError(
- 'you must have set your launchpad login in bzr.')
-
- instance = EC2Instance(
- name, image, instance_type, demo_networks, account,
- from_scratch, user_key, login, region)
- instance._credentials = credentials
- return instance
-
- def __init__(self, name, image, instance_type, demo_networks, account,
- from_scratch, user_key, launchpad_login, region):
- self._name = name
- self._image = image
- self._account = account
- self._instance_type = instance_type
- self._demo_networks = demo_networks
- self._boto_instance = None
- self._from_scratch = from_scratch
- self._user_key = user_key
- self._launchpad_login = launchpad_login
- self._region = region
-
- def log(self, msg):
- """Log a message on stdout, flushing afterwards."""
- # XXX: JonathanLange 2009-05-31 bug=383076: Should delete this and use
- # Python logging module instead.
- sys.stdout.write(msg)
- sys.stdout.flush()
-
- def start(self):
- """Start the instance."""
- if self._boto_instance is not None:
- self.log('Instance %s already started' % self._boto_instance.id)
- return
- start = time.time()
- self.private_key = self._account.acquire_private_key()
- self.security_group = self._account.acquire_security_group(
- demo_networks=self._demo_networks)
- reservation = self._image.run(
- key_name=self._name, security_groups=[self._name],
- instance_type=self._instance_type)
- self._boto_instance = reservation.instances[0]
- self.log('Instance %s starting..' % self._boto_instance.id)
- while self._boto_instance.state == 'pending':
- self.log('.')
- time.sleep(5)
- self._boto_instance.update()
- if self._boto_instance.state == 'running':
- self.log(' started on %s\n' % self.hostname)
- elapsed = time.time() - start
- self.log('Started in %d minutes %d seconds\n' %
- (elapsed // 60, elapsed % 60))
- self._output = self._boto_instance.get_console_output()
- self.log(self._output.output)
- self._ec2test_user_has_keys = False
- else:
- raise BzrCommandError(
- "failed to start: %s: %r\n" % (
- self._boto_instance.state,
- self._boto_instance.state_reason,
- ))
-
- def shutdown(self):
- """Shut down the instance."""
- if self._boto_instance is None:
- self.log('no instance created\n')
- return
- self._boto_instance.update()
- if self._boto_instance.state not in ('shutting-down', 'terminated'):
- self.log("terminating %s..." % self._boto_instance)
- self._boto_instance.terminate()
- self._boto_instance.update()
- self.log(" done\n")
- self.log('instance %s\n' % (self._boto_instance.state,))
-
- @property
- def hostname(self):
- if self._boto_instance is None:
- return None
- return self._boto_instance.public_dns_name
-
- def _connect(self, username):
- """Connect to the instance as `username`."""
- ssh = paramiko.SSHClient()
- ssh.set_missing_host_key_policy(AcceptAllPolicy())
- self.log('ssh connect to %s: ' % self.hostname)
- connect_args = {
- 'username': username,
- 'pkey': self.private_key,
- 'allow_agent': False,
- 'look_for_keys': False,
- }
- for count in range(20):
- caught_errors = (
- socket.error,
- paramiko.AuthenticationException,
- EOFError,
- )
- try:
- ssh.connect(self.hostname, **connect_args)
- except caught_errors as e:
- self.log('.')
- not_connected = [
- errno.ECONNREFUSED,
- errno.ETIMEDOUT,
- errno.EHOSTUNREACH,
- ]
- if getattr(e, 'errno', None) not in not_connected:
- self.log('ssh _connect: %r\n' % (e,))
- if count < 9:
- time.sleep(5)
- else:
- raise
- else:
- break
- self.log(' ok!\n')
- return EC2InstanceConnection(self, username, ssh)
-
- def _upload_local_key(self, conn, remote_filename):
- """Upload a key from the local user's agent to `remote_filename`.
-
- The key will be uploaded in a format suitable for
- ~/.ssh/authorized_keys.
- """
- authorized_keys_file = conn.sftp.open(remote_filename, 'w')
- authorized_keys_file.write(
- "%s %s\n" % (
- self._user_key.get_name(), self._user_key.get_base64()))
- authorized_keys_file.close()
-
- def _ensure_ec2test_user_has_keys(self, connection=None):
- """Make sure that we can connect over ssh as the 'ec2test' user.
-
- We add both the key that was used to start the instance (so
- _connect('ec2test') works) and a key from the locally running ssh agent
- (so EC2InstanceConnection.run_with_ssh_agent works).
- """
- if not self._ec2test_user_has_keys:
- if connection is None:
- connection = self._connect('ubuntu')
- our_connection = True
- else:
- our_connection = False
- self._upload_local_key(connection, 'local_key')
- connection.perform(
- 'cat /home/ubuntu/.ssh/authorized_keys local_key '
- '| sudo tee /home/ec2test/.ssh/authorized_keys > /dev/null'
- '&& rm local_key')
- connection.perform('sudo chown -R ec2test:ec2test /home/ec2test/')
- connection.perform('sudo chmod 644 /home/ec2test/.ssh/*')
- if our_connection:
- connection.close()
- self.log(
- 'You can now use ssh -A ec2test@%s to '
- 'log in the instance.\n' % self.hostname)
- self._ec2test_user_has_keys = True
-
- def connect(self):
- """Connect to the instance as a user with passwordless sudo.
-
- This may involve first connecting as root and adding SSH keys to the
- user's account, and in the case of a from scratch image, it will do a
- lot of set up.
- """
- if self._from_scratch:
- ubuntu_connection = self._connect('ubuntu')
- self._upload_local_key(ubuntu_connection, 'local_key')
- ubuntu_connection.perform(
- 'cat local_key >> ~/.ssh/authorized_keys && rm local_key')
- ubuntu_connection.run_script(from_scratch_root, sudo=True)
- self._ensure_ec2test_user_has_keys(ubuntu_connection)
- ubuntu_connection.close()
- conn = self._connect('ec2test')
- conn.run_script(
- from_scratch_ec2test
- % {'launchpad-login': self._launchpad_login})
- self._from_scratch = False
- self.log('done running from_scratch setup\n')
- return conn
- self._ensure_ec2test_user_has_keys()
- return self._connect('ec2test')
-
- def _report_traceback(self):
- """Print traceback."""
- traceback.print_exc()
-
- def set_up_and_run(self, postmortem, shutdown, func, *args, **kw):
- """Start, run `func` and then maybe shut down.
-
- :param config: A dictionary specifying details of how the instance
- should be run:
- :param postmortem: If true, any exceptions will be caught and an
- interactive session run to allow debugging the problem.
- :param shutdown: If true, shut down the instance after `func` and
- postmortem (if any) are completed.
- :param func: A callable that will be called when the instance is
- running and a user account has been set up on it.
- :param args: Passed to `func`.
- :param kw: Passed to `func`.
- """
- # We ignore the value of the 'shutdown' argument and always shut down
- # unless `func` returns normally.
- really_shutdown = True
- retval = None
- try:
- self.start()
- try:
- retval = func(*args, **kw)
- except Exception:
- # When running in postmortem mode, it is really helpful to see
- # if there are any exceptions before it waits in the console
- # (in the finally block), and you can't figure out why it's
- # broken.
- self._report_traceback()
- else:
- really_shutdown = shutdown
- finally:
- try:
- if postmortem:
- console = code.InteractiveConsole(locals())
- console.interact(
- postmortem_banner % {'dns': self.hostname})
- print 'Postmortem console closed.'
- finally:
- if really_shutdown:
- self.shutdown()
- return retval
-
- def _copy_single_file(self, sftp, local_path, remote_dir):
- """Copy `local_path` to `remote_dir` on this instance.
-
- The name in the remote directory will be that of the local file.
-
- :param sftp: A paramiko SFTP object.
- :param local_path: The local path.
- :param remote_dir: The directory on the instance to copy into.
- """
- name = os.path.basename(local_path)
- remote_path = os.path.join(remote_dir, name)
- remote_file = sftp.open(remote_path, 'w')
- remote_file.write(open(local_path).read())
- remote_file.close()
- return remote_path
-
- def copy_key_and_certificate_to_image(self, sftp):
- """Copy the AWS private key and certificate to the image.
-
- :param sftp: A paramiko SFTP object.
- """
- remote_ec2_dir = '/mnt/ec2'
- remote_pk = self._copy_single_file(
- sftp, self.local_pk, remote_ec2_dir)
- remote_cert = self._copy_single_file(
- sftp, self.local_cert, remote_ec2_dir)
- return (remote_pk, remote_cert)
-
- def _check_single_glob_match(self, local_dir, pattern, file_kind):
- """Check that `pattern` matches one file in `local_dir` and return it.
-
- :param local_dir: The local directory to look in.
- :param pattern: The glob pattern to match.
- :param file_kind: The sort of file we're looking for, to be used in
- error messages.
- """
- pattern = os.path.join(local_dir, pattern)
- matches = glob.glob(pattern)
- if len(matches) != 1:
- raise BzrCommandError(
- '%r must match a single %s file' % (pattern, file_kind))
- return matches[0]
-
- def check_bundling_prerequisites(self, name):
- """Check, as best we can, that all the files we need to bundle exist.
- """
- local_ec2_dir = os.path.expanduser('~/.ec2')
- if not os.path.exists(local_ec2_dir):
- raise BzrCommandError(
- "~/.ec2 must exist and contain aws_user, aws_id, a private "
- "key file and a certificate.")
- aws_user_file = os.path.expanduser('~/.ec2/aws_user')
- if not os.path.exists(aws_user_file):
- raise BzrCommandError(
- "~/.ec2/aws_user must exist and contain your numeric AWS id.")
- self.aws_user = open(aws_user_file).read().strip()
- self.local_cert = self._check_single_glob_match(
- local_ec2_dir, 'cert-*.pem', 'certificate')
- self.local_pk = self._check_single_glob_match(
- local_ec2_dir, 'pk-*.pem', 'private key')
- # The bucket `name` needs to exist and be accessible. We create it
- # here to reserve the name. If the bucket already exists and conforms
- # to the above requirements, this is a no-op.
- #
- # The API for region creation is a little quirky: you apparently can't
- # explicitly ask for 'us-east-1'; you must just say '', etc.
- location = self._credentials.region_name
- if location.startswith('us-east'):
- location = ''
- elif location.startswith('eu'):
- location = 'EU'
- self._credentials.connect_s3().create_bucket(
- name, location=location)
-
- def bundle(self, name, credentials):
- """Bundle, upload and register the instance as a new AMI.
-
- :param name: The name-to-be of the new AMI, eg 'launchpad-ec2test500'.
- :param credentials: An `EC2Credentials` object.
- """
- connection = self.connect()
- # See http://is.gd/g1MIT . When we switch to Eucalyptus, we can dump
- # this installation of the ec2-ami-tools.
- connection.perform(
- 'sudo env DEBIAN_FRONTEND=noninteractive '
- 'apt-get -y install ec2-ami-tools')
- connection.perform('rm -f .ssh/authorized_keys')
- connection.perform('sudo mkdir /mnt/ec2')
- connection.perform('sudo chown $USER:$USER /mnt/ec2')
-
- remote_pk, remote_cert = self.copy_key_and_certificate_to_image(
- connection.sftp)
-
- bundle_dir = os.path.join('/mnt', name)
-
- connection.perform('sudo mkdir ' + bundle_dir)
- connection.perform(' '.join([
- 'sudo ec2-bundle-vol',
- '-d %s' % bundle_dir,
- '--batch', # Set batch-mode, which doesn't use prompts.
- '-k %s' % remote_pk,
- '-c %s' % remote_cert,
- '-u %s' % self.aws_user,
- ]))
-
- # Assume that the manifest is 'image.manifest.xml', since "image" is
- # the default prefix.
- manifest = os.path.join(bundle_dir, 'image.manifest.xml')
-
- # Best check that the manifest actually exists though.
- test = 'test -f %s' % manifest
- connection.perform(test)
-
- connection.perform(' '.join([
- 'sudo ec2-upload-bundle',
- '-b %s' % name,
- '-m %s' % manifest,
- '-a %s' % credentials.identifier,
- '-s %s' % credentials.secret,
- ]))
-
- connection.close()
-
- # This is invoked locally.
- mfilename = os.path.basename(manifest)
- manifest_path = os.path.join(name, mfilename)
-
- now = datetime.strftime(datetime.utcnow(), "%Y-%m-%d %H:%M:%S UTC")
- description = "launchpad ec2test created %s by %r on %s" % (
- now,
- os.environ.get('EMAIL', '<unknown>'),
- socket.gethostname())
-
- self.log('registering image: ')
- image_id = credentials.connect('bundle').conn.register_image(
- name=name,
- description=description,
- image_location=manifest_path,
- )
- self.log('ok\n')
- self.log('** new instance: %r\n' % (image_id,))
-
-
-class EC2InstanceConnection:
- """An ssh connection to an `EC2Instance`."""
-
- def __init__(self, instance, username, ssh):
- self._instance = instance
- self._username = username
- self._ssh = ssh
- self._sftp = None
-
- @property
- def sftp(self):
- if self._sftp is None:
- self._sftp = self._ssh.open_sftp()
- if self._sftp is None:
- raise AssertionError("failed to open sftp connection")
- return self._sftp
-
- def perform(self, cmd, ignore_failure=False, out=None, err=None):
- """Perform 'cmd' on server.
-
- :param ignore_failure: If False, raise an error on non-zero exit
- statuses.
- :param out: A stream to write the output of the remote command to.
- :param err: A stream to write the error of the remote command to.
- """
- if out is None:
- out = sys.stdout
- if err is None:
- err = sys.stderr
- self._instance.log(
- '%s@%s$ %s\n'
- % (self._username, self._instance._boto_instance.id, cmd))
- session = self._ssh.get_transport().open_session()
- session.exec_command(cmd)
- session.shutdown_write()
- while 1:
- try:
- select.select([session], [], [], 0.5)
- except (IOError, select.error), e:
- if e.errno == errno.EINTR:
- continue
- if session.recv_ready():
- data = session.recv(4096)
- if data:
- out.write(data)
- out.flush()
- if session.recv_stderr_ready():
- data = session.recv_stderr(4096)
- if data:
- err.write(data)
- err.flush()
- if session.exit_status_ready():
- break
- session.close()
- # XXX: JonathanLange 2009-05-31: If the command is killed by a signal
- # on the remote server, the SSH protocol does not send an exit_status,
- # it instead sends a different message with the number of the signal
- # that killed the process. AIUI, this code will fail confusingly if
- # that happens.
- res = session.recv_exit_status()
- if res and not ignore_failure:
- raise RuntimeError('Command failed: %s' % (cmd,))
- return res
-
- def run_with_ssh_agent(self, cmd, ignore_failure=False):
- """Run 'cmd' in a subprocess.
-
- Use this to run commands that require local SSH credentials. For
- example, getting private branches from Launchpad.
- """
- self._instance.log(
- '%s@%s$ %s\n'
- % (self._username, self._instance._boto_instance.id, cmd))
- call = ['ssh', '-A', self._username + '@' + self._instance.hostname,
- '-o', 'CheckHostIP no',
- '-o', 'StrictHostKeyChecking no',
- '-o', 'UserKnownHostsFile ~/.ec2/known_hosts',
- cmd]
- res = subprocess.call(call)
- if res and not ignore_failure:
- raise RuntimeError('Command failed: %s' % (cmd,))
- return res
-
- def run_script(self, script_text, sudo=False):
- """Upload `script_text` to the instance and run it with bash."""
- script = self.sftp.open('script.sh', 'w')
- script.write(script_text)
- script.close()
- cmd = '/bin/bash script.sh'
- if sudo:
- cmd = 'sudo ' + cmd
- self.run_with_ssh_agent(cmd)
- # At least for mwhudson, the paramiko connection often drops while the
- # script is running. Reconnect just in case.
- self.reconnect()
- self.perform('rm script.sh')
-
- def reconnect(self):
- """Close the connection and reopen it."""
- self.close()
- self._ssh = self._instance._connect(self._username)._ssh
-
- def close(self):
- if self._sftp is not None:
- self._sftp.close()
- self._sftp = None
- self._ssh.close()
- self._ssh = None
=== removed file 'lib/devscripts/ec2test/remote.py'
--- lib/devscripts/ec2test/remote.py 2012-01-01 03:03:28 +0000
+++ lib/devscripts/ec2test/remote.py 1970-01-01 00:00:00 +0000
@@ -1,930 +0,0 @@
-#!/usr/bin/env python
-# Copyright 2009 Canonical Ltd. This software is licensed under the
-# GNU Affero General Public License version 3 (see the file LICENSE).
-
-"""Run tests in a daemon.
-
- * `EC2Runner` handles the daemonization and instance shutdown.
-
- * `Request` knows everything about the test request we're handling (e.g.
- "test merging foo-bar-bug-12345 into db-devel").
-
- * `LaunchpadTester` knows how to actually run the tests and gather the
- results. It uses `SummaryResult` to do so.
-
- * `WebTestLogger` knows how to display the results to the user, and is given
- the responsibility of handling the results that `LaunchpadTester` gathers.
-"""
-
-__metatype__ = type
-
-import datetime
-from email import (
- MIMEMultipart,
- MIMEText,
- )
-from email.mime.application import MIMEApplication
-import errno
-import gzip
-import optparse
-import os
-import pickle
-from StringIO import StringIO
-import subprocess
-import sys
-import tempfile
-import textwrap
-import time
-import traceback
-import unittest
-from xml.sax.saxutils import escape
-
-import bzrlib.branch
-import bzrlib.config
-from bzrlib.email_message import EmailMessage
-import bzrlib.errors
-from bzrlib.smtp_connection import SMTPConnection
-import bzrlib.workingtree
-import simplejson
-import subunit
-from testtools import MultiTestResult
-
-# We need to be able to unpickle objects from bzr-pqm, so make sure we
-# can import it.
-bzrlib.plugin.load_plugins()
-
-
-class NonZeroExitCode(Exception):
- """Raised when the child process exits with a non-zero exit code."""
-
- def __init__(self, retcode):
- super(NonZeroExitCode, self).__init__(
- 'Test process died with exit code %r, but no tests failed.'
- % (retcode,))
-
-
-class SummaryResult(unittest.TestResult):
- """Test result object used to generate the summary."""
-
- double_line = '=' * 70
- single_line = '-' * 70
-
- def __init__(self, output_stream):
- super(SummaryResult, self).__init__()
- self.stream = output_stream
-
- def _formatError(self, flavor, test, error):
- return '\n'.join(
- [self.double_line,
- '%s: %s' % (flavor, test),
- self.single_line,
- error,
- ''])
-
- def addError(self, test, error):
- super(SummaryResult, self).addError(test, error)
- self.stream.write(
- self._formatError(
- 'ERROR', test, self._exc_info_to_string(error, test)))
-
- def addFailure(self, test, error):
- super(SummaryResult, self).addFailure(test, error)
- self.stream.write(
- self._formatError(
- 'FAILURE', test, self._exc_info_to_string(error, test)))
-
- def stopTest(self, test):
- super(SummaryResult, self).stopTest(test)
- # At the very least, we should be sure that a test's output has been
- # completely displayed once it has stopped.
- self.stream.flush()
-
-
-class FailureUpdateResult(unittest.TestResult):
-
- def __init__(self, logger):
- super(FailureUpdateResult, self).__init__()
- self._logger = logger
-
- def addError(self, *args, **kwargs):
- super(FailureUpdateResult, self).addError(*args, **kwargs)
- self._logger.got_failure()
-
- def addFailure(self, *args, **kwargs):
- super(FailureUpdateResult, self).addFailure(*args, **kwargs)
- self._logger.got_failure()
-
-
-class EC2Runner:
- """Runs generic code in an EC2 instance.
-
- Handles daemonization, instance shutdown, and email in the case of
- catastrophic failure.
- """
-
- # XXX: JonathanLange 2010-08-17: EC2Runner needs tests.
-
- # The number of seconds we give this script to clean itself up, and for
- # 'ec2 test --postmortem' to grab control if needed. If we don't give
- # --postmortem enough time to log in via SSH and take control, then this
- # server will begin to shutdown on its own.
- #
- # (FWIW, "grab control" means sending SIGTERM to this script's process id,
- # thus preventing fail-safe shutdown.)
- SHUTDOWN_DELAY = 60
-
- def __init__(self, daemonize, pid_filename, shutdown_when_done,
- smtp_connection=None, emails=None):
- """Make an EC2Runner.
-
- :param daemonize: Whether or not we will daemonize.
- :param pid_filename: The filename to store the pid in.
- :param shutdown_when_done: Whether or not to shut down when the tests
- are done.
- :param smtp_connection: The `SMTPConnection` to use to send email.
- :param emails: The email address(es) to send catastrophic failure
- messages to. If not provided, the error disappears into the ether.
- """
- self._should_daemonize = daemonize
- self._pid_filename = pid_filename
- self._shutdown_when_done = shutdown_when_done
- if smtp_connection is None:
- config = bzrlib.config.GlobalConfig()
- smtp_connection = SMTPConnection(config)
- self._smtp_connection = smtp_connection
- self._emails = emails
- self._daemonized = False
-
- def _daemonize(self):
- """Turn the testrunner into a forked daemon process."""
- # This also writes our pidfile to disk to a specific location. The
- # ec2test.py --postmortem command will look there to find our PID,
- # in order to control this process.
- daemonize(self._pid_filename)
- self._daemonized = True
-
- def _shutdown_instance(self):
- """Shut down this EC2 instance."""
- # Make sure our process is daemonized, and has therefore disconnected
- # the controlling terminal. This also disconnects the ec2test.py SSH
- # connection, thus signalling ec2test.py that it may now try to take
- # control of the server.
- if not self._daemonized:
- # We only want to do this if we haven't already been daemonized.
- # Nesting daemons is bad.
- self._daemonize()
-
- time.sleep(self.SHUTDOWN_DELAY)
-
- # Cancel the running shutdown.
- subprocess.call(['sudo', 'shutdown', '-c'])
-
- # We'll only get here if --postmortem didn't kill us. This is our
- # fail-safe shutdown, in case the user got disconnected or suffered
- # some other mishap that would prevent them from shutting down this
- # server on their own.
- subprocess.call(['sudo', 'shutdown', '-P', 'now'])
-
- def run(self, name, function, *args, **kwargs):
- try:
- if self._should_daemonize:
- print 'Starting %s daemon...' % (name,)
- self._daemonize()
-
- return function(*args, **kwargs)
- except:
- config = bzrlib.config.GlobalConfig()
- # Handle exceptions thrown by the test() or daemonize() methods.
- if self._emails:
- msg = EmailMessage(
- from_address=config.username(),
- to_address=self._emails,
- subject='%s FAILED' % (name,),
- body=traceback.format_exc())
- self._smtp_connection.send_email(msg)
- raise
- finally:
- # When everything is over, if we've been asked to shut down, then
- # make sure we're daemonized, then shutdown. Otherwise, if we're
- # daemonized, just clean up the pidfile.
- if self._shutdown_when_done:
- self._shutdown_instance()
- elif self._daemonized:
- # It would be nice to clean up after ourselves, since we won't
- # be shutting down.
- remove_pidfile(self._pid_filename)
- else:
- # We're not a daemon, and we're not shutting down. The user
- # most likely started this script manually, from a shell
- # running on the instance itself.
- pass
-
-
-class LaunchpadTester:
- """Runs Launchpad tests and gathers their results in a useful way."""
-
- def __init__(self, logger, test_directory, test_options=()):
- """Construct a `LaunchpadTester`.
-
- :param logger: The WebTestLogger to log to.
- :param test_directory: The directory to run the tests in. We expect
- this directory to have a fully-functional checkout of Launchpad
- and its dependent branches.
- :param test_options: A sequence of options to pass to the test runner.
- """
- self._logger = logger
- self._test_directory = test_directory
- self._test_options = ' '.join(test_options)
-
- def build_test_command(self):
- """Return the command that will execute the test suite.
-
- Should return a list of command options suitable for submission to
- subprocess.call()
-
- Subclasses must provide their own implementation of this method.
- """
- command = ['make', 'check']
- if self._test_options:
- command.append('TESTOPTS="%s"' % self._test_options)
- return command
-
- def _spawn_test_process(self):
- """Actually run the tests.
-
- :return: A `subprocess.Popen` object for the test run.
- """
- call = self.build_test_command()
- self._logger.write_line("Running %s" % (call,))
- # bufsize=0 means do not buffer any of the output. We want to
- # display the test output as soon as it is generated.
- return subprocess.Popen(
- call, bufsize=0,
- stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
- cwd=self._test_directory)
-
- def test(self):
- """Run the tests, log the results.
-
- Signals the ec2test process and cleans up the logs once all the tests
- have completed. If necessary, submits the branch to PQM, and mails
- the user the test results.
- """
- self._logger.prepare()
- try:
- popen = self._spawn_test_process()
- result = self._gather_test_output(popen.stdout, self._logger)
- retcode = popen.wait()
- # The process could have an error not indicated by an actual test
- # result nor by a raised exception
- if result.wasSuccessful() and retcode:
- raise NonZeroExitCode(retcode)
- except:
- self._logger.error_in_testrunner(sys.exc_info())
- else:
- self._logger.got_result(result)
-
- def _gather_test_output(self, input_stream, logger):
- """Write the testrunner output to the logs."""
- summary_stream = logger.get_summary_stream()
- summary_result = SummaryResult(summary_stream)
- result = MultiTestResult(
- summary_result,
- FailureUpdateResult(logger))
- subunit_server = subunit.TestProtocolServer(result, summary_stream)
- for line in input_stream:
- subunit_server.lineReceived(line)
- logger.got_line(line)
- summary_stream.flush()
- return summary_result
-
-
-# XXX: Publish a JSON file that includes the relevant details from this
-# request.
-class Request:
- """A request to have a branch tested and maybe landed."""
-
- def __init__(self, branch_url, revno, local_branch_path, sourcecode_path,
- emails=None, pqm_message=None, smtp_connection=None):
- """Construct a `Request`.
-
- :param branch_url: The public URL to the Launchpad branch we are
- testing.
- :param revno: The revision number of the branch we are testing.
- :param local_branch_path: A local path to the Launchpad branch we are
- testing. This must be a branch of Launchpad with a working tree.
- :param sourcecode_path: A local path to the sourcecode dependencies
- directory (normally '$local_branch_path/sourcecode'). This must
- contain up-to-date copies of all of Launchpad's sourcecode
- dependencies.
- :param emails: A list of emails to send the results to. If not
- provided, no emails are sent.
- :param pqm_message: The message to submit to PQM. If not provided, we
- don't submit to PQM.
- :param smtp_connection: The `SMTPConnection` to use to send email.
- """
- self._branch_url = branch_url
- self._revno = revno
- self._local_branch_path = local_branch_path
- self._sourcecode_path = sourcecode_path
- self._emails = emails
- self._pqm_message = pqm_message
- # Used for figuring out how to send emails.
- self._bzr_config = bzrlib.config.GlobalConfig()
- if smtp_connection is None:
- smtp_connection = SMTPConnection(self._bzr_config)
- self._smtp_connection = smtp_connection
-
- def _send_email(self, message):
- """Actually send 'message'."""
- self._smtp_connection.send_email(message)
-
- def _format_test_list(self, header, tests):
- if not tests:
- return []
- tests = [' ' + test.id() for test, error in tests]
- return [header, '-' * len(header)] + tests + ['']
-
- def format_result(self, result, start_time, end_time):
- duration = end_time - start_time
- output = [
- 'Tests started at approximately %s' % start_time,
- ]
- source = self.get_source_details()
- if source:
- output.append('Source: %s r%s' % source)
- target = self.get_target_details()
- if target:
- output.append('Target: %s r%s' % target)
- output.extend([
- '',
- '%s tests run in %s, %s failures, %s errors' % (
- result.testsRun, duration, len(result.failures),
- len(result.errors)),
- '',
- ])
-
- bad_tests = (
- self._format_test_list('Failing tests', result.failures) +
- self._format_test_list('Tests with errors', result.errors))
- output.extend(bad_tests)
-
- if bad_tests:
- full_error_stream = StringIO()
- copy_result = SummaryResult(full_error_stream)
- for test, error in result.failures:
- full_error_stream.write(
- copy_result._formatError('FAILURE', test, error))
- for test, error in result.errors:
- full_error_stream.write(
- copy_result._formatError('ERROR', test, error))
- output.append(full_error_stream.getvalue())
-
- subject = self._get_pqm_subject()
- if subject:
- if result.wasSuccessful():
- output.append('SUBMITTED TO PQM:')
- else:
- output.append('**NOT** submitted to PQM:')
- output.extend([subject, ''])
- output.extend(['(See the attached file for the complete log)', ''])
- return '\n'.join(output)
-
- def get_target_details(self):
- """Return (branch_url, revno) for trunk."""
- branch = bzrlib.branch.Branch.open(self._local_branch_path)
- return branch.get_parent().encode('utf-8'), branch.revno()
-
- def get_source_details(self):
- """Return (branch_url, revno) for the branch we're merging in.
-
- If we're not merging in a branch, but instead just testing a trunk,
- then return None.
- """
- tree = bzrlib.workingtree.WorkingTree.open(self._local_branch_path)
- parent_ids = tree.get_parent_ids()
- if len(parent_ids) < 2:
- return None
- return self._branch_url.encode('utf-8'), self._revno
-
- def _last_segment(self, url):
- """Return the last segment of a URL."""
- return url.strip('/').split('/')[-1]
-
- def get_nick(self):
- """Get the nick of the branch we are testing."""
- details = self.get_source_details()
- if not details:
- details = self.get_target_details()
- url, revno = details
- return self._last_segment(url)
-
- def get_revno(self):
- """Get the revno of the branch we are testing."""
- if self._revno is not None:
- return self._revno
- return bzrlib.branch.Branch.open(self._local_branch_path).revno()
-
- def get_merge_description(self):
- """Get a description of the merge request.
-
- If we're merging a branch, return '$SOURCE_NICK => $TARGET_NICK', if
- we're just running tests for a trunk branch without merging return
- '$TRUNK_NICK'.
- """
- source = self.get_source_details()
- if not source:
- return '%s r%s' % (self.get_nick(), self.get_revno())
- target = self.get_target_details()
- return '%s => %s' % (
- self._last_segment(source[0]), self._last_segment(target[0]))
-
- def get_summary_commit(self):
- """Get a message summarizing the change from the commit log.
-
- Returns the last commit message of the merged branch, or None.
- """
- # XXX: JonathanLange 2010-08-17: I don't actually know why we are
- # using this commit message as a summary message. It's used in the
- # test logs and the EC2 hosted web page.
- branch = bzrlib.branch.Branch.open(self._local_branch_path)
- tree = bzrlib.workingtree.WorkingTree.open(self._local_branch_path)
- parent_ids = tree.get_parent_ids()
- if len(parent_ids) == 1:
- return None
- summary = (
- branch.repository.get_revision(parent_ids[1]).get_summary())
- return summary.encode('utf-8')
-
- def _build_report_email(self, successful, body_text, full_log_gz):
- """Build a MIME email summarizing the test results.
-
- :param successful: True for pass, False for failure.
- :param body_text: The body of the email to send to the requesters.
- :param full_log_gz: A gzip of the full log.
- """
- message = MIMEMultipart.MIMEMultipart()
- message['To'] = ', '.join(self._emails)
- message['From'] = self._bzr_config.username()
- if successful:
- status = 'SUCCESS'
- else:
- status = 'FAILURE'
- subject = 'Test results: %s: %s' % (
- self.get_merge_description(), status)
- message['Subject'] = subject
-
- # Make the body.
- body = MIMEText.MIMEText(body_text, 'plain', 'utf8')
- body['Content-Disposition'] = 'inline'
- message.attach(body)
-
- # Attach the gzipped log.
- zipped_log = MIMEApplication(full_log_gz, 'x-gzip')
- zipped_log.add_header(
- 'Content-Disposition', 'attachment',
- filename='%s-r%s.subunit.gz' % (
- self.get_nick(), self.get_revno()))
- message.attach(zipped_log)
- return message
-
- def send_report_email(self, successful, body_text, full_log_gz):
- """Send an email summarizing the test results.
-
- :param successful: True for pass, False for failure.
- :param body_text: The body of the email to send to the requesters.
- :param full_log_gz: A gzip of the full log.
- """
- message = self._build_report_email(successful, body_text, full_log_gz)
- self._send_email(message)
-
- def iter_dependency_branches(self):
- """Iterate through the Bazaar branches we depend on."""
- for name in sorted(os.listdir(self._sourcecode_path)):
- path = os.path.join(self._sourcecode_path, name)
- if os.path.isdir(path):
- try:
- branch = bzrlib.branch.Branch.open(path)
- except bzrlib.errors.NotBranchError:
- continue
- yield name, branch.get_parent(), branch.revno()
-
- def _get_pqm_subject(self):
- if not self._pqm_message:
- return
- return self._pqm_message.get('Subject')
-
- def submit_to_pqm(self, successful):
- """Submit this request to PQM, if successful & configured to do so."""
- subject = self._get_pqm_subject()
- if subject and successful:
- self._send_email(self._pqm_message)
- return subject
-
- @property
- def wants_email(self):
- """Do the requesters want emails sent to them?"""
- return bool(self._emails)
-
-
-class WebTestLogger:
- """Logs test output to disk and a simple web page.
-
- :ivar successful: Whether the logger has received only successful input up
- until now.
- """
-
- def __init__(self, full_log_filename, summary_filename, index_filename,
- request, echo_to_stdout):
- """Construct a WebTestLogger.
-
- Because this writes an HTML file with links to the summary and full
- logs, you should construct this object with
- `WebTestLogger.make_in_directory`, which guarantees that the files
- are available in the correct locations.
-
- :param full_log_filename: Path to a file that will have the full
- log output written to it. The file will be overwritten.
- :param summary_file: Path to a file that will have a human-readable
- summary written to it. The file will be overwritten.
- :param index_file: Path to a file that will have an HTML page
- written to it. The file will be overwritten.
- :param request: A `Request` object representing the thing that's being
- tested.
- :param echo_to_stdout: Whether or not we should echo output to stdout.
- """
- self._full_log_filename = full_log_filename
- self._summary_filename = summary_filename
- self._index_filename = index_filename
- self._info_json = os.path.join(
- os.path.dirname(index_filename), 'info.json')
- self._request = request
- self._echo_to_stdout = echo_to_stdout
- # Actually set by prepare(), but setting to a dummy value to make
- # testing easier.
- self._start_time = datetime.datetime.utcnow()
- self.successful = True
-
- @classmethod
- def make_in_directory(cls, www_dir, request, echo_to_stdout):
- """Make a logger that logs to specific files in `www_dir`.
-
- :param www_dir: The directory in which to log the files:
- current_test.log, summary.log and index.html. These files
- will be overwritten.
- :param request: A `Request` object representing the thing that's being
- tested.
- :param echo_to_stdout: Whether or not we should echo output to stdout.
- """
- files = [
- os.path.join(www_dir, 'current_test.log'),
- os.path.join(www_dir, 'summary.log'),
- os.path.join(www_dir, 'index.html')]
- files.extend([request, echo_to_stdout])
- return cls(*files)
-
- def error_in_testrunner(self, exc_info):
- """Called when there is a catastrophic error in the test runner."""
- exc_type, exc_value, exc_tb = exc_info
- # XXX: JonathanLange 2010-08-17: This should probably log to the full
- # log as well.
- summary = self.get_summary_stream()
- summary.write('\n\nERROR IN TESTRUNNER\n\n')
- traceback.print_exception(exc_type, exc_value, exc_tb, file=summary)
- summary.flush()
- if self._request.wants_email:
- self._write_to_filename(
- self._summary_filename,
- '\n(See the attached file for the complete log)\n')
- summary = self.get_summary_contents()
- full_log_gz = gzip_data(self.get_full_log_contents())
- self._request.send_report_email(False, summary, full_log_gz)
-
- def get_index_contents(self):
- """Return the contents of the index.html page."""
- return self._get_contents(self._index_filename)
-
- def get_full_log_contents(self):
- """Return the contents of the complete log."""
- return self._get_contents(self._full_log_filename)
-
- def get_summary_contents(self):
- """Return the contents of the summary log."""
- return self._get_contents(self._summary_filename)
-
- def get_summary_stream(self):
- """Return a stream that, when written to, writes to the summary."""
- return open(self._summary_filename, 'a')
-
- def got_line(self, line):
- """Called when we get a line of output from our child processes."""
- self._write_to_filename(self._full_log_filename, line)
- if self._echo_to_stdout:
- sys.stdout.write(line)
- sys.stdout.flush()
-
- def _get_contents(self, filename):
- """Get the full contents of 'filename'."""
- try:
- return open(filename, 'r').read()
- except IOError, e:
- if e.errno == errno.ENOENT:
- return ''
-
- def got_failure(self):
- """Called when we receive word that a test has failed."""
- self.successful = False
- self._dump_json()
-
- def got_result(self, result):
- """The tests are done and the results are known."""
- self._end_time = datetime.datetime.utcnow()
- successful = result.wasSuccessful()
- self._handle_pqm_submission(successful)
- if self._request.wants_email:
- email_text = self._request.format_result(
- result, self._start_time, self._end_time)
- full_log_gz = gzip_data(self.get_full_log_contents())
- self._request.send_report_email(successful, email_text, full_log_gz)
-
- def _handle_pqm_submission(self, successful):
- subject = self._request.submit_to_pqm(successful)
- if not subject:
- return
- self.write_line('')
- self.write_line('')
- if successful:
- self.write_line('SUBMITTED TO PQM:')
- else:
- self.write_line('**NOT** submitted to PQM:')
- self.write_line(subject)
-
- def _write_to_filename(self, filename, msg):
- fd = open(filename, 'a')
- fd.write(msg)
- fd.flush()
- fd.close()
-
- def _write(self, msg):
- """Write to the summary and full log file."""
- self._write_to_filename(self._full_log_filename, msg)
- self._write_to_filename(self._summary_filename, msg)
-
- def write_line(self, msg):
- """Write to the summary and full log file with a newline."""
- self._write(msg + '\n')
-
- def _dump_json(self):
- fd = open(self._info_json, 'w')
- simplejson.dump(
- {'description': self._request.get_merge_description(),
- 'failed-yet': not self.successful,
- }, fd)
- fd.close()
-
- def prepare(self):
- """Prepares the log files on disk.
-
- Writes three log files: the raw output log, the filtered "summary"
- log file, and a HTML index page summarizing the test run paramters.
- """
- self._dump_json()
- # XXX: JonathanLange 2010-07-18: Mostly untested.
- log = self.write_line
-
- # Clear the existing index file.
- index = open(self._index_filename, 'w')
- index.truncate(0)
- index.close()
-
- def add_to_html(html):
- return self._write_to_filename(
- self._index_filename, textwrap.dedent(html))
-
- self._start_time = datetime.datetime.utcnow()
- msg = 'Tests started at approximately %(now)s UTC' % {
- 'now': self._start_time.strftime('%a, %d %b %Y %H:%M:%S')}
- add_to_html('''\
- <html>
- <head>
- <title>Testing</title>
- </head>
- <body>
- <h1>Testing</h1>
- <p>%s</p>
- <ul>
- <li><a href="summary.log">Summary results</a></li>
- <li><a href="current_test.log">Full results</a></li>
- </ul>
- ''' % (msg,))
- log(msg)
-
- add_to_html('''\
- <h2>Branches Tested</h2>
- ''')
-
- # Describe the trunk branch.
- trunk, trunk_revno = self._request.get_target_details()
- msg = '%s, revision %d\n' % (trunk, trunk_revno)
- add_to_html('''\
- <p><strong>%s</strong></p>
- ''' % (escape(msg),))
- log(msg)
-
- branch_details = self._request.get_source_details()
- if not branch_details:
- add_to_html('<p>(no merged branch)</p>\n')
- log('(no merged branch)')
- else:
- branch_name, branch_revno = branch_details
- data = {'name': branch_name,
- 'revno': branch_revno,
- 'commit': self._request.get_summary_commit()}
- msg = ('%(name)s, revision %(revno)d '
- '(commit message: %(commit)s)\n' % data)
- add_to_html('''\
- <p>Merged with<br />%(msg)s</p>
- ''' % {'msg': escape(msg)})
- log("Merged with")
- log(msg)
-
- add_to_html('<dl>\n')
- log('\nDEPENDENCY BRANCHES USED\n')
- for name, branch, revno in self._request.iter_dependency_branches():
- data = {'name': name, 'branch': branch, 'revno': revno}
- log(
- '- %(name)s\n %(branch)s\n %(revno)d\n' % data)
- escaped_data = {'name': escape(name),
- 'branch': escape(branch),
- 'revno': revno}
- add_to_html('''\
- <dt>%(name)s</dt>
- <dd>%(branch)s</dd>
- <dd>%(revno)s</dd>
- ''' % escaped_data)
- add_to_html('''\
- </dl>
- </body>
- </html>''')
- log('\n\nTEST RESULTS FOLLOW\n\n')
-
-
-def daemonize(pid_filename):
-    """Turn the current process into a daemon and record its pid.
-
-    Forks twice (starting a new session in between), lets both intermediate
-    processes exit, writes the pidfile from the surviving process, closes
-    every file descriptor and re-opens descriptors 0, 1 and 2 on os.devnull.
-
-    :param pid_filename: Path of the pidfile to write for the daemon.
-    """
-    # this seems like the sort of thing that ought to be in the
-    # standard library :-/
-    pid = os.fork()
-    if (pid == 0): # Child 1
-        # Start a new session before forking again.
-        os.setsid()
-        pid = os.fork()
-        if (pid == 0): # Child 2, the daemon.
-            pass # lookie, we're ready to do work in the daemon
-        else:
-            # Child 1 exits immediately, leaving only the daemon.
-            os._exit(0)
-    else: # Parent
-        # Make sure the pidfile is written before we exit, so that people
-        # who've chosen to daemonize can quickly rectify their mistake. Since
-        # the daemon might terminate itself very, very quickly, we cannot poll
-        # for the existence of the pidfile. Instead, we just sleep for a
-        # reasonable amount of time.
-        time.sleep(1)
-        os._exit(0)
-
-    # Only the daemon (child 2) reaches this point.
-    # write a pidfile ASAP
-    write_pidfile(pid_filename)
-
-    # Iterate through and close all file descriptors.
-    import resource
-    # Index 1 of the getrlimit() pair is the hard limit.
-    maxfd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
-    assert maxfd != resource.RLIM_INFINITY
-    for fd in range(0, maxfd):
-        try:
-            os.close(fd)
-        except OSError:
-            # we assume fd was closed
-            pass
-    # With every descriptor closed, the next open() gets the lowest free
-    # number, so stdin becomes os.devnull; stdout and stderr then copy it.
-    os.open(os.devnull, os.O_RDWR) # this will be 0
-    os.dup2(0, 1)
-    os.dup2(0, 2)
-
-
-def gunzip_data(data):
- """Decompress 'data'.
-
- :param data: The gzip data to decompress.
- :return: The decompressed data.
- """
- fd, path = tempfile.mkstemp()
- os.write(fd, data)
- os.close(fd)
- try:
- return gzip.open(path, 'r').read()
- finally:
- os.unlink(path)
-
-
-def gzip_data(data):
- """Compress 'data'.
-
- :param data: The data to compress.
- :return: The gzip-compressed data.
- """
- fd, path = tempfile.mkstemp()
- os.close(fd)
- gz = gzip.open(path, 'wb')
- gz.writelines(data)
- gz.close()
- try:
- return open(path).read()
- finally:
- os.unlink(path)
-
-
-def write_pidfile(pid_filename):
- """Write a pidfile for the current process."""
- pid_file = open(pid_filename, "w")
- pid_file.write(str(os.getpid()))
- pid_file.close()
-
-
-def remove_pidfile(pid_filename):
- if os.path.exists(pid_filename):
- os.remove(pid_filename)
-
-
-def parse_options(argv):
- """Make an `optparse.OptionParser` for running the tests remotely.
- """
- parser = optparse.OptionParser(
- usage="%prog [options] [-- test options]",
- description=("Build and run tests for an instance."))
- parser.add_option(
- '-e', '--email', action='append', dest='email', default=None,
- help=('Email address to which results should be mailed. Defaults to '
- 'the email address from `bzr whoami`. May be supplied multiple '
- 'times. `bzr whoami` will be used as the From: address.'))
- parser.add_option(
- '-s', '--submit-pqm-message', dest='pqm_message', default=None,
- help=('A base64-encoded pickle (string) of a pqm message '
- '(bzrib.plugins.pqm.pqm_submit.PQMEmailMessage) to submit if '
- 'the test run is successful.'))
- parser.add_option(
- '--daemon', dest='daemon', default=False,
- action='store_true', help=('Run test in background as daemon.'))
- parser.add_option(
- '--debug', dest='debug', default=False,
- action='store_true',
- help=('Drop to pdb trace as soon as possible.'))
- parser.add_option(
- '--shutdown', dest='shutdown', default=False,
- action='store_true',
- help=('Terminate (shutdown) instance after completion.'))
- parser.add_option(
- '--public-branch', dest='public_branch', default=None,
- help=('The URL of the public branch being tested.'))
- parser.add_option(
- '--public-branch-revno', dest='public_branch_revno',
- type="int", default=None,
- help=('The revision number of the public branch being tested.'))
-
- return parser.parse_args(argv)
-
-
-def main(argv):
-    """Entry point: run the Launchpad tests as described by 'argv'.
-
-    Parses the options, builds the `Request`, `WebTestLogger`, `EC2Runner`
-    and `LaunchpadTester`, then has the runner execute the tester.
-
-    :param argv: The full command line, including the program name.
-    """
-    options, args = parse_options(argv)
-
-    if options.debug:
-        import pdb; pdb.set_trace()
-    if options.pqm_message is not None:
-        # NOTE(review): this unpickles data taken straight from the command
-        # line. pickle.loads can execute arbitrary code, so this is only
-        # safe while the invoker is fully trusted.
-        pqm_message = pickle.loads(
-            options.pqm_message.decode('string-escape').decode('base64'))
-    else:
-        pqm_message = None
-
-    # Locations for Launchpad. These are actually defined by the configuration
-    # of the EC2 image that we use.
-    LAUNCHPAD_DIR = '/var/launchpad'
-    TEST_DIR = os.path.join(LAUNCHPAD_DIR, 'test')
-    SOURCECODE_DIR = os.path.join(TEST_DIR, 'sourcecode')
-
-    pid_filename = os.path.join(LAUNCHPAD_DIR, 'ec2test-remote.pid')
-
-    # The same connection is shared by the request and the runner.
-    smtp_connection = SMTPConnection(bzrlib.config.GlobalConfig())
-
-    request = Request(
-        options.public_branch, options.public_branch_revno, TEST_DIR,
-        SOURCECODE_DIR, options.email, pqm_message, smtp_connection)
-    # Only write to stdout if we are running as the foreground process.
-    echo_to_stdout = not options.daemon
-    logger = WebTestLogger.make_in_directory(
-        '/var/www', request, echo_to_stdout)
-
-    runner = EC2Runner(
-        options.daemon, pid_filename, options.shutdown,
-        smtp_connection, options.email)
-
-    # args[0] is the program name (the whole of sys.argv was parsed), so
-    # only the remaining positional arguments are test options.
-    tester = LaunchpadTester(logger, TEST_DIR, test_options=args[1:])
-    runner.run("Test runner", tester.test)
-
-
-# Run main() only when executed as a script, not when imported.
-if __name__ == '__main__':
-    main(sys.argv)
=== removed file 'lib/devscripts/ec2test/session.py'
--- lib/devscripts/ec2test/session.py 2012-01-01 03:03:28 +0000
+++ lib/devscripts/ec2test/session.py 1970-01-01 00:00:00 +0000
@@ -1,96 +0,0 @@
-# Copyright 2009 Canonical Ltd. This software is licensed under the
-# GNU Affero General Public License version 3 (see the file LICENSE).
-
-"""Code to represent a single session of EC2 use."""
-
-__metaclass__ = type
-__all__ = [
- 'EC2SessionName',
- ]
-
-from datetime import (
- datetime,
- timedelta,
- )
-
-from devscripts.ec2test.utils import (
- find_datetime_string,
- make_datetime_string,
- make_random_string,
- )
-
-
-DEFAULT_LIFETIME = timedelta(hours=6)
-
-
-class EC2SessionName(str):
- """A name for an EC2 session.
-
- This is used when naming key pairs and security groups, so it's
- useful to be unique. However, to aid garbage collection of old key
- pairs and security groups, the name contains a common element and
- an expiry timestamp. The form taken should always be:
-
- <base-name>/<expires-timestamp>/<random-data>
-
- None of the parts should contain forward-slashes, and the
- timestamp should be acceptable input to `find_datetime_string`.
-
- `EC2SessionName.make()` will generate a suitable name given a
- suitable base name.
- """
-
- @classmethod
- def make(cls, base, expires=None):
- """Create an `EC2SessionName`.
-
- This checks that `base` does not contain a forward-slash, and
- provides some convenient functionality for `expires`:
-
- - If `expires` is None, it defaults to now (UTC) plus
- `DEFAULT_LIFETIME`.
-
- - If `expires` is a `datetime`, it is converted to a timestamp
- in the correct form.
-
- - If `expires` is a `timedelta`, it is added to now (UTC) then
- converted to a timestamp.
-
- - Otherwise `expires` is assumed to be a string, so is checked
- for the absense of forward-slashes, and that a correctly
- formed timestamp can be discovered.
-
- """
- assert '/' not in base
- if expires is None:
- expires = DEFAULT_LIFETIME
- if isinstance(expires, timedelta):
- expires = datetime.utcnow() + expires
- if isinstance(expires, datetime):
- expires = make_datetime_string(expires)
- else:
- assert '/' not in expires
- assert find_datetime_string(expires) is not None
- rand = make_random_string(8)
- return cls("%s/%s/%s" % (base, expires, rand))
-
- @property
- def base(self):
- parts = self.split('/')
- if len(parts) != 3:
- return None
- return parts[0]
-
- @property
- def expires(self):
- parts = self.split('/')
- if len(parts) != 3:
- return None
- return find_datetime_string(parts[1])
-
- @property
- def rand(self):
- parts = self.split('/')
- if len(parts) != 3:
- return None
- return parts[2]
=== removed file 'lib/devscripts/ec2test/testrunner.py'
--- lib/devscripts/ec2test/testrunner.py 2012-02-07 14:53:38 +0000
+++ lib/devscripts/ec2test/testrunner.py 1970-01-01 00:00:00 +0000
@@ -1,548 +0,0 @@
-# Copyright 2009-2012 Canonical Ltd. This software is licensed under the
-# GNU Affero General Public License version 3 (see the file LICENSE).
-
-"""Code to actually run the tests in an EC2 instance."""
-
-__metaclass__ = type
-__all__ = [
- 'EC2TestRunner',
- 'TRUNK_BRANCH',
- ]
-
-import os
-import pickle
-import re
-import sys
-
-from bzrlib.branch import Branch
-from bzrlib.bzrdir import BzrDir
-from bzrlib.config import GlobalConfig
-from bzrlib.errors import UncommittedChanges
-from bzrlib.plugins.pqm.pqm_submit import (
- NoPQMSubmissionAddress,
- PQMSubmission,
- )
-
-
-TRUNK_BRANCH = 'bzr+ssh://bazaar.launchpad.net/~launchpad-pqm/launchpad/devel'
-
-
-class UnknownBranchURL(Exception):
- """Raised when we try to parse an unrecognized branch url."""
-
- def __init__(self, branch_url):
- Exception.__init__(
- self,
- "Couldn't parse '%s', not a Launchpad branch." % (branch_url,))
-
-
-def parse_branch_url(branch_url):
- """Given the URL of a branch, return its components in a dict."""
- _lp_match = re.compile(
- r'lp:\~([^/]+)/([^/]+)/([^/]+)$').match
- _bazaar_match = re.compile(
- r'bzr+ssh://bazaar.launchpad.net/\~([^/]+)/([^/]+)/([^/]+)$').match
- match = _lp_match(branch_url)
- if match is None:
- match = _bazaar_match(branch_url)
- if match is None:
- raise UnknownBranchURL(branch_url)
- owner = match.group(1)
- product = match.group(2)
- branch = match.group(3)
- unique_name = '~%s/%s/%s' % (owner, product, branch)
- url = 'bzr+ssh://bazaar.launchpad.net/%s' % (unique_name,)
- return dict(
- owner=owner, product=product, branch=branch, unique_name=unique_name,
- url=url)
-
-
-def normalize_branch_input(data):
- """Given 'data' return a ('dest', 'src') pair.
-
- :param data: One of::
- - a double of (sourcecode_location, branch_url).
- If 'sourcecode_location' is Launchpad, then 'branch_url' can
- also be the name of a branch of launchpad owned by
- launchpad-pqm.
- - a singleton of (branch_url,)
- - a singleton of (sourcecode_location,) where
- sourcecode_location corresponds to a Launchpad upstream
- project as well as a rocketfuel sourcecode location.
- - a string which could populate any of the above singletons.
-
- :return: ('dest', 'src') where 'dest' is the destination
- sourcecode location in the rocketfuel tree and 'src' is the
- URL of the branch to put there. The URL can be either a bzr+ssh
- URL or the name of a branch of launchpad owned by launchpad-pqm.
- """
- # XXX: JonathanLange 2009-06-05: Should convert lp: URL branches to
- # bzr+ssh:// branches.
- if isinstance(data, basestring):
- data = (data,)
- if len(data) == 2:
- # Already in dest, src format.
- return data
- if len(data) != 1:
- raise ValueError(
- 'invalid argument for ``branches`` argument: %r' %
- (data,))
- branch_location = data[0]
- try:
- parsed_url = parse_branch_url(branch_location)
- except UnknownBranchURL:
- return branch_location, 'lp:%s' % (branch_location,)
- return parsed_url['product'], parsed_url['url']
-
-
-def parse_specified_branches(branches):
- """Given 'branches' from the command line, return a sanitized dict.
-
- The dict maps sourcecode locations to branch URLs, according to the
- rules in `normalize_branch_input`.
- """
- return dict(map(normalize_branch_input, branches))
-
-
-class EC2TestRunner:
-
- name = 'ec2-test-runner'
-
- message = image = None
- _running = False
-
- def __init__(self, branch, email=False, file=None, test_options=None,
- headless=False, branches=(),
- pqm_message=None, pqm_public_location=None,
- pqm_submit_location=None,
- open_browser=False, pqm_email=None,
- include_download_cache_changes=None, instance=None,
- launchpad_login=None,
- timeout=None):
- """Create a new EC2TestRunner.
-
- :param timeout: Number of minutes before we force a shutdown. This is
- useful because sometimes the normal instance termination might
- fail.
-
- - original_branch
- - test_options
- - headless
- - include_download_cache_changes
- - download_cache_additions
- - branches (parses, validates)
- - message (after validating PQM submisson)
- - email (after validating email capabilities)
- - image (after connecting to ec2)
- - file
- - timeout
- """
- self.original_branch = branch
- self.test_options = test_options
- self.headless = headless
- self.include_download_cache_changes = include_download_cache_changes
- self.open_browser = open_browser
- self.file = file
- self._launchpad_login = launchpad_login
- self.timeout = timeout
-
- trunk_specified = False
- trunk_branch = TRUNK_BRANCH
-
- # normalize and validate branches
- branches = parse_specified_branches(branches)
- try:
- launchpad_url = branches.pop('launchpad')
- except KeyError:
- # No Launchpad branch specified.
- pass
- else:
- try:
- parsed_url = parse_branch_url(launchpad_url)
- except UnknownBranchURL:
- user = 'launchpad-pqm'
- src = ('bzr+ssh://bazaar.launchpad.net/'
- '~launchpad-pqm/launchpad/%s' % (launchpad_url,))
- else:
- user = parsed_url['owner']
- src = parsed_url['url']
- if user == 'launchpad-pqm':
- trunk_specified = True
- trunk_branch = src
- self._trunk_branch = trunk_branch
- self.branches = branches.items()
-
- # XXX: JonathanLange 2009-05-31: The trunk_specified stuff above and
- # the pqm location stuff below are actually doing the equivalent of
- # preparing a merge directive. Perhaps we can leverage that to make
- # this code simpler.
- self.download_cache_additions = None
- if branch is None:
- config = GlobalConfig()
- if pqm_message is not None:
- raise ValueError('Cannot submit trunk to pqm.')
- else:
- (tree,
- bzrbranch,
- relpath) = BzrDir.open_containing_tree_or_branch(branch)
- config = bzrbranch.get_config()
-
- if pqm_message is not None or tree is not None:
- # if we are going to maybe send a pqm_message, we're going to
- # go down this path. Also, even if we are not but this is a
- # local branch, we're going to use the PQM machinery to make
- # sure that the local branch has been made public, and has all
- # working changes there.
- if tree is None:
- # remote. We will make some assumptions.
- if pqm_public_location is None:
- pqm_public_location = branch
- if pqm_submit_location is None:
- pqm_submit_location = trunk_branch
- elif pqm_submit_location is None and trunk_specified:
- pqm_submit_location = trunk_branch
- # Modified from pqm_submit.py.
- submission = PQMSubmission(
- source_branch=bzrbranch,
- public_location=pqm_public_location,
- message=pqm_message or '',
- submit_location=pqm_submit_location,
- tree=tree)
- if tree is not None:
- # This is the part we want to do whether we're
- # submitting or not:
- submission.check_tree() # Any working changes.
- submission.check_public_branch() # Everything public.
- branch = submission.public_location
- if (include_download_cache_changes is None or
- include_download_cache_changes):
- # We need to get the download cache settings.
- cache_tree, cache_bzrbranch, cache_relpath = (
- BzrDir.open_containing_tree_or_branch(
- os.path.join(
- self.original_branch, 'download-cache')))
- cache_tree.lock_read()
- try:
- cache_basis_tree = cache_tree.basis_tree()
- cache_basis_tree.lock_read()
- try:
- delta = cache_tree.changes_from(
- cache_basis_tree, want_unversioned=True)
- unversioned = [
- un for un in delta.unversioned
- if not cache_tree.is_ignored(un[0])]
- added = delta.added
- self.download_cache_additions = (
- unversioned + added)
- finally:
- cache_basis_tree.unlock()
- finally:
- cache_tree.unlock()
- if pqm_message is not None:
- if self.download_cache_additions:
- raise UncommittedChanges(cache_tree)
- # Get the submission message.
- mail_from = config.get_user_option('pqm_user_email')
- if not mail_from:
- mail_from = config.username()
- mail_from = mail_from.encode('utf8')
- if pqm_email is None:
- if tree is None:
- pqm_email = (
- "Launchpad PQM <launchpad@xxxxxxxxxxxxxxxxx>")
- else:
- pqm_email = config.get_user_option('pqm_email')
- if not pqm_email:
- raise NoPQMSubmissionAddress(bzrbranch)
- mail_to = pqm_email.encode('utf8')
- self.message = submission.to_email(mail_from, mail_to)
- elif (self.download_cache_additions and
- self.include_download_cache_changes is None):
- raise UncommittedChanges(
- cache_tree,
- 'You must select whether to include download cache '
- 'changes (see --include-download-cache-changes and '
- '--ignore-download-cache-changes, -c and -g '
- 'respectively), or '
- 'commit or remove the files in the download-cache.')
- self._branch = branch
-
- if email is not False:
- if email is True:
- email = [config.username()]
- if not email[0]:
- raise ValueError('cannot find your email address.')
- elif isinstance(email, basestring):
- email = [email]
- else:
- tmp = []
- for item in email:
- if not isinstance(item, basestring):
- raise ValueError(
- 'email must be True, False, a string, or a list '
- 'of strings')
- tmp.append(item)
- email = tmp
- else:
- email = None
- self.email = email
-
- # Email configuration.
- if email is not None or pqm_message is not None:
- self._smtp_server = config.get_user_option('smtp_server')
- # Refuse localhost, because there's no SMTP server _on the actual
- # EC2 instance._
- if self._smtp_server is None or self._smtp_server == 'localhost':
- raise ValueError(
- 'To send email, a remotely accessible smtp_server (and '
- 'smtp_username and smtp_password, if necessary) must be '
- 'configured in bzr. See the SMTP server information '
- 'here: https://wiki.canonical.com/EmailSetup .'
- 'This server must be reachable from the EC2 instance.')
- self._smtp_username = config.get_user_option('smtp_username')
- self._smtp_password = config.get_user_option('smtp_password')
- self._from_email = config.username()
- if not self._from_email:
- raise ValueError(
- 'To send email, your bzr email address must be set '
- '(use ``bzr whoami``).')
-
- self._instance = instance
-
- def log(self, msg):
- """Log a message on stdout, flushing afterwards."""
- # XXX: JonathanLange 2009-05-31 bug=383076: This should use Python
- # logging, rather than printing to stdout.
- sys.stdout.write(msg)
- sys.stdout.flush()
-
- def configure_system(self):
- user_connection = self._instance.connect()
- if self.timeout is not None:
- # Activate a fail-safe shutdown just in case something goes
- # really wrong with the server or suite.
- user_connection.perform("sudo shutdown -P +%d &" % self.timeout)
- as_user = user_connection.perform
- as_user(
- "sudo mount "
- "-o remount,data=writeback,commit=3600,async,relatime /")
- for d in ['/tmp', '/var/tmp']:
- as_user(
- "sudo mkdir -p %s && sudo mount -t tmpfs none %s" % (d, d))
- as_user(
- "sudo service postgresql stop"
- "; sudo mv /var/lib/postgresql /tmp/postgresql-tmp"
- "&& sudo mkdir /var/lib/postgresql"
- "&& sudo mount -t tmpfs none /var/lib/postgresql"
- "&& sudo mv /tmp/postgresql-tmp/* /var/lib/postgresql"
- "&& sudo service postgresql start")
- as_user("sudo add-apt-repository ppa:bzr")
- as_user("sudo add-apt-repository ppa:launchpad")
- as_user("sudo aptitude update")
- as_user(
- "sudo DEBIAN_FRONTEND=noninteractive aptitude -y full-upgrade")
- # Set up bazaar.conf with smtp information if necessary
- if self.email or self.message:
- as_user('[ -d .bazaar ] || mkdir .bazaar')
- bazaar_conf_file = user_connection.sftp.open(
- ".bazaar/bazaar.conf", 'w')
- bazaar_conf_file.write(
- 'email = %s\n' % (self._from_email.encode('utf-8'),))
- bazaar_conf_file.write(
- 'smtp_server = %s\n' % (self._smtp_server,))
- if self._smtp_username:
- bazaar_conf_file.write(
- 'smtp_username = %s\n' % (self._smtp_username,))
- if self._smtp_password:
- bazaar_conf_file.write(
- 'smtp_password = %s\n' % (self._smtp_password,))
- bazaar_conf_file.close()
- # Copy remote ec2-remote over
- self.log('Copying remote.py to remote machine.\n')
- user_connection.sftp.put(
- os.path.join(
- os.path.dirname(os.path.realpath(__file__)), 'remote.py'),
- '/var/launchpad/ec2test-remote.py')
- # Set up launchpad login and email
- as_user('bzr launchpad-login %s' % (self._launchpad_login,))
- user_connection.close()
-
- def prepare_tests(self):
- user_connection = self._instance.connect()
- # Clean up the test branch left in the instance image.
- user_connection.perform('rm -rf /var/launchpad/test')
- user_connection.perform(
- 'sudo mkdir /var/launchpad/test && '
- 'sudo mount -t tmpfs none /var/launchpad/test')
- # Get trunk.
- user_connection.run_with_ssh_agent(
- 'bzr branch --use-existing-dir %s /var/launchpad/test'
- % (self._trunk_branch,))
- # Merge the branch in.
- if self._branch is not None:
- user_connection.run_with_ssh_agent(
- 'cd /var/launchpad/test; bzr merge %s' % (self._branch,))
- else:
- self.log('(Testing trunk, so no branch merge.)')
- # get newest sources
- user_connection.run_with_ssh_agent(
- "/var/launchpad/test/utilities/update-sourcecode "
- "/var/launchpad/sourcecode")
- # Get any new sourcecode branches as requested
- for dest, src in self.branches:
- fulldest = os.path.join('/var/launchpad/test/sourcecode', dest)
- user_connection.run_with_ssh_agent(
- 'bzr branch --standalone %s %s' % (src, fulldest))
- # prepare fresh copy of sourcecode and buildout sources for building
- p = user_connection.perform
- p('rm -rf /var/launchpad/tmp'
- '&& mkdir /var/launchpad/tmp '
- '&& sudo mount -t tmpfs none /var/launchpad/tmp')
- p('mv /var/launchpad/sourcecode /var/launchpad/tmp/sourcecode')
- p('mkdir /var/launchpad/tmp/eggs')
- p('mkdir /var/launchpad/tmp/yui')
- user_connection.run_with_ssh_agent(
- 'bzr pull lp:lp-source-dependencies '
- '-d /var/launchpad/download-cache')
- p(
- 'mv /var/launchpad/download-cache '
- '/var/launchpad/tmp/download-cache')
- if (self.include_download_cache_changes and
- self.download_cache_additions):
- root = os.path.realpath(
- os.path.join(self.original_branch, 'download-cache'))
- for info in self.download_cache_additions:
- src = os.path.join(root, info[0])
- self.log('Copying %s to remote machine.\n' % (src,))
- user_connection.sftp.put(
- src,
- os.path.join(
- '/var/launchpad/tmp/download-cache', info[0]))
- p('/var/launchpad/test/utilities/link-external-sourcecode '
- '-p/var/launchpad/tmp -t/var/launchpad/test'),
- # set up database
- p('/var/launchpad/test/utilities/launchpad-database-setup $USER')
- p('mkdir -p /var/tmp/launchpad_mailqueue/cur')
- p('mkdir -p /var/tmp/launchpad_mailqueue/new')
- p('mkdir -p /var/tmp/launchpad_mailqueue/tmp')
- p('chmod -R a-w /var/tmp/launchpad_mailqueue/')
- # close ssh connection
- user_connection.close()
-
- def run_demo_server(self):
- """Turn ec2 instance into a demo server."""
- self.configure_system()
- self.prepare_tests()
- user_connection = self._instance.connect()
- p = user_connection.perform
- p('make -C /var/launchpad/test schema')
- p('mkdir -p /var/tmp/bazaar.launchpad.dev/static')
- p('mkdir -p /var/tmp/bazaar.launchpad.dev/mirrors')
- p('sudo a2enmod proxy > /dev/null')
- p('sudo a2enmod proxy_http > /dev/null')
- p('sudo a2enmod rewrite > /dev/null')
- p('sudo a2enmod ssl > /dev/null')
- p('sudo a2enmod deflate > /dev/null')
- p('sudo a2enmod headers > /dev/null')
- # Install apache config file.
- p('cd /var/launchpad/test/; sudo make install')
- # Use raw string to eliminate the need to escape the backslash.
- # Put eth0's ip address in the /tmp/ip file.
- p(r"ifconfig eth0 | grep 'inet addr' "
- r"| sed -re 's/.*addr:([0-9.]*) .*/\1/' > /tmp/ip")
- # Replace 127.0.0.88 in Launchpad's apache config file with the
- # ip address just stored in the /tmp/ip file. Perl allows for
- # inplace editing unlike sed.
- p('sudo perl -pi -e "s/127.0.0.88/$(cat /tmp/ip)/g" '
- '/etc/apache2/sites-available/local-launchpad')
- # Restart apache.
- p('sudo /etc/init.d/apache2 restart')
- # Build mailman and minified javascript, etc.
- p('cd /var/launchpad/test/; make')
- # Start launchpad in the background.
- p('cd /var/launchpad/test/; make start')
- # close ssh connection
- user_connection.close()
-
- def _build_command(self):
- """Build the command that we'll use to run the tests."""
- # Make sure we activate the failsafe --shutdown feature. This will
- # make the server shut itself down after the test run completes, or
- # if the test harness suffers a critical failure.
- cmd = ['python /var/launchpad/ec2test-remote.py --shutdown']
-
- # Do we want to email the results to the user?
- if self.email:
- for email in self.email:
- cmd.append("--email='%s'" % (
- email.encode('utf8').encode('string-escape'),))
-
- # Do we want to submit the branch to PQM if the tests pass?
- if self.message is not None:
- cmd.append(
- "--submit-pqm-message='%s'" % (
- pickle.dumps(
- self.message).encode(
- 'base64').encode('string-escape'),))
-
- # Do we want to disconnect the terminal once the test run starts?
- if self.headless:
- cmd.append('--daemon')
-
- # Which branch do we want to test?
- if self._branch is not None:
- branch = self._branch
- remote_branch = Branch.open(branch)
- branch_revno = remote_branch.revno()
- else:
- branch = self._trunk_branch
- branch_revno = None
- cmd.append('--public-branch=%s' % branch)
- if branch_revno is not None:
- cmd.append('--public-branch-revno=%d' % branch_revno)
-
- # Add any additional options for ec2test-remote.py
- cmd.extend(['--', self.test_options])
- return ' '.join(cmd)
-
- def run_tests(self):
- self.configure_system()
- self.prepare_tests()
-
- self.log(
- 'Running tests... (output is available on '
- 'http://%s/)\n' % self._instance.hostname)
-
- # Try opening a browser pointed at the current test results.
- if self.open_browser:
- try:
- import webbrowser
- except ImportError:
- self.log("Could not open web browser due to ImportError.")
- else:
- status = webbrowser.open(self._instance.hostname)
- if not status:
- self.log("Could not open web browser.")
-
- # Run the remote script! Our execution will block here until the
- # remote side disconnects from the terminal.
- cmd = self._build_command()
- user_connection = self._instance.connect()
- user_connection.perform(cmd)
- self._running = True
-
- if not self.headless:
- # We ran to completion locally, so we'll be in charge of shutting
- # down the instance, in case the user has requested a postmortem.
- #
- # We only have 60 seconds to do this before the remote test
- # script shuts the server down automatically.
- user_connection.perform(
- 'kill `cat /var/launchpad/ec2test-remote.pid`')
-
- # deliver results as requested
- if self.file:
- self.log(
- 'Writing abridged test results to %s.\n' % self.file)
- user_connection.sftp.get('/var/www/summary.log', self.file)
- user_connection.close()
=== removed directory 'lib/devscripts/ec2test/tests'
=== removed file 'lib/devscripts/ec2test/tests/__init__.py'
--- lib/devscripts/ec2test/tests/__init__.py 2009-10-05 19:20:37 +0000
+++ lib/devscripts/ec2test/tests/__init__.py 1970-01-01 00:00:00 +0000
@@ -1,2 +0,0 @@
-# Copyright 2009 Canonical Ltd. This software is licensed under the
-# GNU Affero General Public License version 3 (see the file LICENSE).
=== removed file 'lib/devscripts/ec2test/tests/remote_daemonization_test.py'
--- lib/devscripts/ec2test/tests/remote_daemonization_test.py 2012-01-01 03:03:28 +0000
+++ lib/devscripts/ec2test/tests/remote_daemonization_test.py 1970-01-01 00:00:00 +0000
@@ -1,53 +0,0 @@
-# Copyright 2010 Canonical Ltd. This software is licensed under the
-# GNU Affero General Public License version 3 (see the file LICENSE).
-
-"""Script executed by test_remote.py to verify daemonization behaviour.
-
-See TestDaemonizationInteraction.
-"""
-
-import os
-import sys
-import traceback
-
-from devscripts.ec2test.remote import (
- EC2Runner,
- WebTestLogger,
- )
-from devscripts.ec2test.tests.test_remote import TestRequest
-
-
-PID_FILENAME = os.path.abspath(sys.argv[1])
-DIRECTORY = os.path.abspath(sys.argv[2])
-LOG_FILENAME = os.path.abspath(sys.argv[3])
-
-
-def make_request():
- """Just make a request."""
- test = TestRequest('test_wants_email')
- test.setUp()
- try:
- return test.make_request()
- finally:
- test.tearDown()
-
-
-def prepare_files(logger):
- try:
- logger.prepare()
- except:
- # If anything in the above fails, we want to be able to find out about
- # it. We can't use stdout or stderr because this is a daemon.
- error_log = open(LOG_FILENAME, 'w')
- traceback.print_exc(file=error_log)
- error_log.close()
-
-
-request = make_request()
-os.mkdir(DIRECTORY)
-logger = WebTestLogger.make_in_directory(DIRECTORY, request, True)
-runner = EC2Runner(
- daemonize=True,
- pid_filename=PID_FILENAME,
- shutdown_when_done=False)
-runner.run("test daemonization interaction", prepare_files, logger)
=== removed file 'lib/devscripts/ec2test/tests/test_ec2instance.py'
--- lib/devscripts/ec2test/tests/test_ec2instance.py 2012-01-01 03:03:28 +0000
+++ lib/devscripts/ec2test/tests/test_ec2instance.py 1970-01-01 00:00:00 +0000
@@ -1,140 +0,0 @@
-# Copyright 2009-2010 Canonical Ltd. This software is licensed under the
-# GNU Affero General Public License version 3 (see the file LICENSE).
-# pylint: disable-msg=E0702
-
-"""Test handling of EC2 machine images."""
-
-__metaclass__ = type
-
-from unittest import TestCase
-
-from devscripts.ec2test.instance import EC2Instance
-from lp.testing.fakemethod import FakeMethod
-
-
-class FakeAccount:
- """Helper for setting up an `EC2Instance` without EC2."""
- acquire_private_key = FakeMethod()
- acquire_security_group = FakeMethod()
-
-
-class FakeOutput:
- """Pretend stdout/stderr output from EC2 instance."""
- output = "Fake output."
-
-
-class FakeBotoInstance:
- """Helper for setting up an `EC2Instance` without EC2."""
- id = 0
- state = 'running'
- public_dns_name = 'fake-instance'
-
- update = FakeMethod()
- stop = FakeMethod()
- get_console_output = FakeOutput
-
-
-class FakeReservation:
- """Helper for setting up an `EC2Instance` without EC2."""
- def __init__(self):
- self.instances = [FakeBotoInstance()]
-
-
-class FakeImage:
- """Helper for setting up an `EC2Instance` without EC2."""
- run = FakeMethod(result=FakeReservation())
-
-
-class FakeFailure(Exception):
- """A pretend failure from the test runner."""
-
-
-class TestEC2Instance(TestCase):
- """Test running of an `EC2Instance` without EC2."""
-
- def _makeInstance(self):
- """Set up an `EC2Instance`, with stubbing where needed.
-
- `EC2Instance.shutdown` is replaced with a `FakeMethod`, so check
- its call_count to see whether it's been invoked.
- """
- session_name = None
- image = FakeImage()
- instance_type = 'c1.xlarge'
- demo_networks = None
- account = FakeAccount()
- from_scratch = None
- user_key = None
- login = None
- region = None
-
- instance = EC2Instance(
- session_name, image, instance_type, demo_networks, account,
- from_scratch, user_key, login,
- region)
-
- instance.shutdown = FakeMethod()
- instance._report_traceback = FakeMethod()
- instance.log = FakeMethod()
-
- return instance
-
- def _runInstance(self, instance, runnee=None, headless=False):
- """Set up and run an `EC2Instance` (but without EC2)."""
- if runnee is None:
- runnee = FakeMethod()
-
- instance.set_up_and_run(False, not headless, runnee)
-
- def test_EC2Instance_test_baseline(self):
- # The EC2 instances we set up have neither started nor been shut
- # down. After running, they have started.
- # Not a very useful test, except it establishes the basic
- # assumptions for the other tests.
- instance = self._makeInstance()
- runnee = FakeMethod()
-
- self.assertEqual(0, runnee.call_count)
- self.assertEqual(0, instance.shutdown.call_count)
-
- self._runInstance(instance, runnee=runnee)
-
- self.assertEqual(1, runnee.call_count)
-
- def test_set_up_and_run_headful(self):
- # A non-headless run executes all tests in the instance, then
- # shuts down.
- instance = self._makeInstance()
-
- self._runInstance(instance, headless=False)
-
- self.assertEqual(1, instance.shutdown.call_count)
-
- def test_set_up_and_run_headless(self):
- # An asynchronous, headless run kicks off the tests on the
- # instance but does not shut it down.
- instance = self._makeInstance()
-
- self._runInstance(instance, headless=True)
-
- self.assertEqual(0, instance.shutdown.call_count)
-
- def test_set_up_and_run_headful_failure(self):
- # If the test runner barfs, the instance swallows the exception
- # and shuts down.
- instance = self._makeInstance()
- runnee = FakeMethod(failure=FakeFailure("Headful barfage."))
-
- self._runInstance(instance, runnee=runnee, headless=False)
-
- self.assertEqual(1, instance.shutdown.call_count)
-
- def test_set_up_and_run_headless_failure(self):
- # If the instance's test runner fails to set up for a headless
- # run, the instance swallows the exception and shuts down.
- instance = self._makeInstance()
- runnee = FakeMethod(failure=FakeFailure("Headless boom."))
-
- self._runInstance(instance, runnee=runnee, headless=True)
-
- self.assertEqual(1, instance.shutdown.call_count)
=== removed file 'lib/devscripts/ec2test/tests/test_remote.py'
--- lib/devscripts/ec2test/tests/test_remote.py 2012-01-01 03:03:28 +0000
+++ lib/devscripts/ec2test/tests/test_remote.py 1970-01-01 00:00:00 +0000
@@ -1,1084 +0,0 @@
-# Copyright 2010 Canonical Ltd. This software is licensed under the
-# GNU Affero General Public License version 3 (see the file LICENSE).
-
-"""Tests for the script run on the remote server."""
-
-__metaclass__ = type
-
-from datetime import (
- datetime,
- timedelta,
- )
-import doctest
-from email.mime.application import MIMEApplication
-from email.mime.text import MIMEText
-import gzip
-from itertools import izip
-import os
-from StringIO import StringIO
-import subprocess
-import sys
-import tempfile
-import time
-import traceback
-import unittest
-
-from bzrlib.config import GlobalConfig
-from bzrlib.tests import TestCaseWithTransport
-import simplejson
-from testtools import (
- TestCase,
- TestResult,
- )
-from testtools.content import Content
-from testtools.content_type import ContentType
-from testtools.matchers import DocTestMatches
-
-from devscripts.ec2test.remote import (
- EC2Runner,
- FailureUpdateResult,
- gunzip_data,
- gzip_data,
- LaunchpadTester,
- remove_pidfile,
- Request,
- SummaryResult,
- WebTestLogger,
- write_pidfile,
- )
-
-
-class LoggingSMTPConnection(object):
- """An SMTPConnection double that logs sent email."""
-
- def __init__(self, log):
- self._log = log
-
- def send_email(self, message):
- self._log.append(message)
-
-
-class RequestHelpers:
-
- def patch(self, obj, name, value):
- orig = getattr(obj, name)
- setattr(obj, name, value)
- self.addCleanup(setattr, obj, name, orig)
- return orig
-
- def make_trunk(self, parent_url='http://example.com/bzr/trunk'):
- """Make a trunk branch suitable for use with `Request`.
-
- `Request` expects to be given a path to a working tree that has a
- branch with a configured parent URL, so this helper returns such a
- working tree.
- """
- nick = parent_url.strip('/').split('/')[-1]
- tree = self.make_branch_and_tree(nick)
- tree.branch.set_parent(parent_url)
- return tree
-
- def make_request(self, branch_url=None, revno=None,
- trunk=None, sourcecode_path=None,
- emails=None, pqm_message=None, emails_sent=None):
- """Make a request to test, specifying only things we care about.
-
- Note that the returned request object will not ever send email, but
- will instead append "sent" emails to the list provided here as
- 'emails_sent'.
- """
- if trunk is None:
- trunk = self.make_trunk()
- if sourcecode_path is None:
- sourcecode_path = self.make_sourcecode(
- [('a', 'http://example.com/bzr/a', 2),
- ('b', 'http://example.com/bzr/b', 3),
- ('c', 'http://example.com/bzr/c', 5)])
- if emails_sent is None:
- emails_sent = []
- smtp_connection = LoggingSMTPConnection(emails_sent)
- request = Request(
- branch_url, revno, trunk.basedir, sourcecode_path, emails,
- pqm_message, smtp_connection)
- return request
-
- def make_sourcecode(self, branches):
- """Make a sourcecode directory with sample branches.
-
- :param branches: A list of (name, parent_url, revno) tuples.
- :return: The path to the sourcecode directory.
- """
- self.build_tree(['sourcecode/'])
- for name, parent_url, revno in branches:
- tree = self.make_branch_and_tree('sourcecode/%s' % (name,))
- tree.branch.set_parent(parent_url)
- for i in range(revno):
- tree.commit(message=str(i))
- return 'sourcecode/'
-
- def make_tester(self, logger=None, test_directory=None, test_options=()):
- if not logger:
- logger = self.make_logger()
- return LaunchpadTester(logger, test_directory, test_options)
-
- def make_logger(self, request=None, echo_to_stdout=False):
- if request is None:
- request = self.make_request()
- return WebTestLogger(
- 'full.log', 'summary.log', 'index.html', request, echo_to_stdout)
-
-
-class TestSummaryResult(TestCase):
- """Tests for `SummaryResult`."""
-
- def makeException(self, factory=None, *args, **kwargs):
- if factory is None:
- factory = RuntimeError
- try:
- raise factory(*args, **kwargs)
- except:
- return sys.exc_info()
-
- def test_formatError(self):
- # SummaryResult._formatError() combines the name of the test, the kind
- # of error and the details of the error in a nicely-formatted way.
- result = SummaryResult(None)
- output = result._formatError('FOO', 'test', 'error')
- expected = '%s\nFOO: test\n%s\nerror\n' % (
- result.double_line, result.single_line)
- self.assertEqual(expected, output)
-
- def test_addError(self):
- # SummaryResult.addError writes the formatted error immediately.
- stream = StringIO()
- test = self
- error = self.makeException()
- result = SummaryResult(stream)
- expected = result._formatError(
- 'ERROR', test, result._exc_info_to_string(error, test))
- result.addError(test, error)
- self.assertEqual(expected, stream.getvalue())
-
- def test_addFailure_does_not_write_immediately(self):
- # SummaryResult.addFailure writes the formatted failure immediately.
- stream = StringIO()
- test = self
- error = self.makeException()
- result = SummaryResult(stream)
- expected = result._formatError(
- 'FAILURE', test, result._exc_info_to_string(error, test))
- result.addFailure(test, error)
- self.assertEqual(expected, stream.getvalue())
-
- def test_stopTest_flushes_stream(self):
- # SummaryResult.stopTest() flushes the stream.
- stream = StringIO()
- flush_calls = []
- stream.flush = lambda: flush_calls.append(None)
- result = SummaryResult(stream)
- result.stopTest(self)
- self.assertEqual(1, len(flush_calls))
-
-
-class TestFailureUpdateResult(TestCaseWithTransport, RequestHelpers):
-
- def makeException(self, factory=None, *args, **kwargs):
- if factory is None:
- factory = RuntimeError
- try:
- raise factory(*args, **kwargs)
- except:
- return sys.exc_info()
-
- def test_addError_is_unsuccessful(self):
- logger = self.make_logger()
- result = FailureUpdateResult(logger)
- result.addError(self, self.makeException())
- self.assertEqual(False, logger.successful)
-
- def test_addFailure_is_unsuccessful(self):
- logger = self.make_logger()
- result = FailureUpdateResult(logger)
- result.addFailure(self, self.makeException(AssertionError))
- self.assertEqual(False, logger.successful)
-
-
-class FakePopen:
- """Fake Popen object so we don't have to spawn processes in tests."""
-
- def __init__(self, output, exit_status):
- self.stdout = StringIO(output)
- self._exit_status = exit_status
-
- def wait(self):
- return self._exit_status
-
-
-class TestLaunchpadTester(TestCaseWithTransport, RequestHelpers):
-
- def test_build_test_command_no_options(self):
- # The LaunchpadTester runs "make check" if given no options.
- tester = self.make_tester()
- command = tester.build_test_command()
- self.assertEqual(['make', 'check'], command)
-
- def test_build_test_command_options(self):
- # The LaunchpadTester runs 'make check TESTOPTS="<options>"' if
- # given options.
- tester = self.make_tester(test_options=('-vvv', '--subunit'))
- command = tester.build_test_command()
- self.assertEqual(
- ['make', 'check', 'TESTOPTS="-vvv --subunit"'], command)
-
- def test_spawn_test_process(self):
- # _spawn_test_process uses subprocess.Popen to run the command
- # returned by build_test_command. stdout & stderr are piped together,
- # the cwd is the test directory specified in the constructor, and the
- # bufsize is zero, meaning "don't buffer".
- popen_calls = []
- self.patch(
- subprocess, 'Popen',
- lambda *args, **kwargs: popen_calls.append((args, kwargs)))
- tester = self.make_tester(test_directory='test-directory')
- tester._spawn_test_process()
- self.assertEqual(
- [((tester.build_test_command(),),
- {'bufsize': 0,
- 'stdout': subprocess.PIPE,
- 'stderr': subprocess.STDOUT,
- 'cwd': 'test-directory'})], popen_calls)
-
- def test_running_test(self):
- # LaunchpadTester.test() runs the test command, and then calls
- # got_result with the result. This test is more of a smoke test to
- # make sure that everything integrates well.
- message = {'Subject': "One Crowded Hour"}
- log = []
- request = self.make_request(pqm_message=message, emails_sent=log)
- logger = self.make_logger(request=request)
- tester = self.make_tester(logger=logger)
- output = "test output\n"
- tester._spawn_test_process = lambda: FakePopen(output, 0)
- tester.test()
- # Message being sent implies got_result thought it got a success.
- self.assertEqual([message], log)
-
- def test_failing_test(self):
- # If LaunchpadTester gets a failing test, then it records that on the
- # logger.
- logger = self.make_logger()
- tester = self.make_tester(logger=logger)
- output = "test: foo\nerror: foo\n"
- tester._spawn_test_process = lambda: FakePopen(output, 0)
- tester.test()
- self.assertEqual(False, logger.successful)
-
- def test_error_in_testrunner(self):
- # Any exception raised within LaunchpadTester.test() is an error in
- # the testrunner. When we detect these, we do three things:
- # 1. Log the error to the logger using error_in_testrunner
- # 2. Call got_result with a False value, indicating test suite
- # failure.
- # 3. Re-raise the error. In the script, this triggers an email.
- message = {'Subject': "One Crowded Hour"}
- log = []
- request = self.make_request(pqm_message=message, emails_sent=log)
- logger = self.make_logger(request=request)
- tester = self.make_tester(logger=logger)
- # Break the test runner deliberately. In production, this is more
- # likely to be a system error than a programming error.
- tester._spawn_test_process = lambda: 1/0
- tester.test()
- # Message not being sent implies got_result thought it got a failure.
- self.assertEqual([], log)
- self.assertIn("ERROR IN TESTRUNNER", logger.get_summary_contents())
- self.assertIn("ZeroDivisionError", logger.get_summary_contents())
-
- def test_nonzero_exit_code(self):
- message = {'Subject': "One Crowded Hour"}
- log = []
- request = self.make_request(pqm_message=message, emails_sent=log)
- logger = self.make_logger(request=request)
- tester = self.make_tester(logger=logger)
- output = "test output\n"
- tester._spawn_test_process = lambda: FakePopen(output, 10)
- tester.test()
- # Message not being sent implies got_result thought it got a failure.
- self.assertEqual([], log)
-
- def test_gather_test_output(self):
- # LaunchpadTester._gather_test_output() summarises the output
- # stream as a TestResult.
- logger = self.make_logger()
- tester = self.make_tester(logger=logger)
- result = tester._gather_test_output(
- ['test: test_failure', 'failure: test_failure',
- 'test: test_success', 'successful: test_success'],
- logger)
- self.assertEquals(2, result.testsRun)
- self.assertEquals(1, len(result.failures))
-
-
-class TestPidfileHelpers(TestCase):
- """Tests for `write_pidfile` and `remove_pidfile`."""
-
- def test_write_pidfile(self):
- fd, path = tempfile.mkstemp()
- self.addCleanup(os.unlink, path)
- os.close(fd)
- write_pidfile(path)
- self.assertEqual(os.getpid(), int(open(path, 'r').read()))
-
- def test_remove_pidfile(self):
- fd, path = tempfile.mkstemp()
- os.close(fd)
- write_pidfile(path)
- remove_pidfile(path)
- self.assertEqual(False, os.path.exists(path))
-
- def test_remove_nonexistent_pidfile(self):
- directory = tempfile.mkdtemp()
- path = os.path.join(directory, 'doesntexist')
- remove_pidfile(path)
- self.assertEqual(False, os.path.exists(path))
-
-
-class TestGzip(TestCase):
- """Tests for gzip helpers."""
-
- def test_gzip_data(self):
- data = 'foobarbaz\n'
- compressed = gzip_data(data)
- fd, path = tempfile.mkstemp()
- os.write(fd, compressed)
- os.close(fd)
- self.assertEqual(data, gzip.open(path, 'r').read())
-
- def test_gunzip_data(self):
- data = 'foobarbaz\n'
- compressed = gzip_data(data)
- self.assertEqual(data, gunzip_data(compressed))
-
-
-class TestRequest(TestCaseWithTransport, RequestHelpers):
- """Tests for `Request`."""
-
- def test_doesnt_want_email(self):
- # If no email addresses were provided, then the user does not want to
- # receive email.
- req = self.make_request()
- self.assertEqual(False, req.wants_email)
-
- def test_wants_email(self):
- # If some email addresses were provided, then the user wants to
- # receive email.
- req = self.make_request(emails=['foo@xxxxxxxxxxx'])
- self.assertEqual(True, req.wants_email)
-
- def test_get_target_details(self):
- parent = 'http://example.com/bzr/branch'
- tree = self.make_trunk(parent)
- req = self.make_request(trunk=tree)
- self.assertEqual(
- (parent, tree.branch.revno()), req.get_target_details())
-
- def test_get_revno_target_only(self):
- # If there's only a target branch, then the revno is the revno of that
- # branch.
- parent = 'http://example.com/bzr/branch'
- tree = self.make_trunk(parent)
- req = self.make_request(trunk=tree)
- self.assertEqual(tree.branch.revno(), req.get_revno())
-
- def test_get_revno_source_and_target(self):
- # If we're merging in a branch, then the revno is the revno of the
- # branch we're merging in.
- tree = self.make_trunk()
- # Fake a merge, giving silly revision ids.
- tree.add_pending_merge('foo', 'bar')
- req = self.make_request(
- branch_url='https://example.com/bzr/thing', revno=42, trunk=tree)
- self.assertEqual(42, req.get_revno())
-
- def test_get_source_details_no_commits(self):
- req = self.make_request(trunk=self.make_trunk())
- self.assertEqual(None, req.get_source_details())
-
- def test_get_source_details_no_merge(self):
- tree = self.make_trunk()
- tree.commit(message='foo')
- req = self.make_request(trunk=tree)
- self.assertEqual(None, req.get_source_details())
-
- def test_get_source_details_merge(self):
- tree = self.make_trunk()
- # Fake a merge, giving silly revision ids.
- tree.add_pending_merge('foo', 'bar')
- req = self.make_request(
- branch_url='https://example.com/bzr/thing', revno=42, trunk=tree)
- self.assertEqual(
- ('https://example.com/bzr/thing', 42), req.get_source_details())
-
- def test_get_nick_trunk_only(self):
- tree = self.make_trunk(parent_url='http://example.com/bzr/db-devel')
- req = self.make_request(trunk=tree)
- self.assertEqual('db-devel', req.get_nick())
-
- def test_get_nick_merge(self):
- tree = self.make_trunk()
- # Fake a merge, giving silly revision ids.
- tree.add_pending_merge('foo', 'bar')
- req = self.make_request(
- branch_url='https://example.com/bzr/thing', revno=42, trunk=tree)
- self.assertEqual('thing', req.get_nick())
-
- def test_get_merge_description_trunk_only(self):
- tree = self.make_trunk(parent_url='http://example.com/bzr/db-devel')
- req = self.make_request(trunk=tree)
- self.assertEqual(
- 'db-devel r%s' % req.get_revno(), req.get_merge_description())
-
- def test_get_merge_description_merge(self):
- tree = self.make_trunk(parent_url='http://example.com/bzr/db-devel/')
- tree.add_pending_merge('foo', 'bar')
- req = self.make_request(
- branch_url='https://example.com/bzr/thing', revno=42, trunk=tree)
- self.assertEqual('thing => db-devel', req.get_merge_description())
-
- def test_get_summary_commit(self):
- # The summary commit message is the last commit message of the branch
- # we're merging in.
- trunk = self.make_trunk()
- trunk.commit(message="a starting point")
- thing_bzrdir = trunk.branch.bzrdir.sprout('thing')
- thing = thing_bzrdir.open_workingtree()
- thing.commit(message="a new thing")
- trunk.merge_from_branch(thing.branch)
- req = self.make_request(
- branch_url='https://example.com/bzr/thing',
- revno=thing.branch.revno(),
- trunk=trunk)
- self.assertEqual("a new thing", req.get_summary_commit())
-
- def test_iter_dependency_branches(self):
- # iter_dependency_branches yields a list of branches in the sourcecode
- # directory, along with their parent URLs and their revnos.
- sourcecode_branches = [
- ('b', 'http://example.com/parent-b', 3),
- ('a', 'http://example.com/parent-a', 2),
- ('c', 'http://example.com/parent-c', 5),
- ]
- sourcecode_path = self.make_sourcecode(sourcecode_branches)
- self.build_tree(
- ['%s/not-a-branch/' % sourcecode_path,
- '%s/just-a-file' % sourcecode_path])
- req = self.make_request(sourcecode_path=sourcecode_path)
- branches = list(req.iter_dependency_branches())
- self.assertEqual(sorted(sourcecode_branches), branches)
-
- def test_submit_to_pqm_no_message(self):
- # If there's no PQM message, then 'submit_to_pqm' returns None.
- req = self.make_request(pqm_message=None)
- subject = req.submit_to_pqm(successful=True)
- self.assertIs(None, subject)
-
- def test_submit_to_pqm_no_message_doesnt_send(self):
- # If there's no PQM message, then 'submit_to_pqm' returns None.
- log = []
- req = self.make_request(pqm_message=None, emails_sent=log)
- req.submit_to_pqm(successful=True)
- self.assertEqual([], log)
-
- def test_submit_to_pqm_unsuccessful(self):
- # submit_to_pqm returns the subject of the PQM mail even if it's
- # handling a failed test run.
- message = {'Subject': 'My PQM message'}
- req = self.make_request(pqm_message=message)
- subject = req.submit_to_pqm(successful=False)
- self.assertIs(message.get('Subject'), subject)
-
- def test_submit_to_pqm_unsuccessful_no_email(self):
- # submit_to_pqm doesn't send any email if the run was unsuccessful.
- message = {'Subject': 'My PQM message'}
- log = []
- req = self.make_request(pqm_message=message, emails_sent=log)
- req.submit_to_pqm(successful=False)
- self.assertEqual([], log)
-
- def test_submit_to_pqm_successful(self):
- # submit_to_pqm returns the subject of the PQM mail.
- message = {'Subject': 'My PQM message'}
- log = []
- req = self.make_request(pqm_message=message, emails_sent=log)
- subject = req.submit_to_pqm(successful=True)
- self.assertIs(message.get('Subject'), subject)
- self.assertEqual([message], log)
-
- def test_report_email_subject_success(self):
- req = self.make_request(emails=['foo@xxxxxxxxxxx'])
- email = req._build_report_email(True, 'foo', 'gobbledygook')
- self.assertEqual(
- 'Test results: %s: SUCCESS' % req.get_merge_description(),
- email['Subject'])
-
- def test_report_email_subject_failure(self):
- req = self.make_request(emails=['foo@xxxxxxxxxxx'])
- email = req._build_report_email(False, 'foo', 'gobbledygook')
- self.assertEqual(
- 'Test results: %s: FAILURE' % req.get_merge_description(),
- email['Subject'])
-
- def test_report_email_recipients(self):
- req = self.make_request(emails=['foo@xxxxxxxxxxx', 'bar@xxxxxxxxxxx'])
- email = req._build_report_email(False, 'foo', 'gobbledygook')
- self.assertEqual('foo@xxxxxxxxxxx, bar@xxxxxxxxxxx', email['To'])
-
- def test_report_email_sender(self):
- req = self.make_request(emails=['foo@xxxxxxxxxxx'])
- email = req._build_report_email(False, 'foo', 'gobbledygook')
- self.assertEqual(GlobalConfig().username(), email['From'])
-
- def test_report_email_body(self):
- req = self.make_request(emails=['foo@xxxxxxxxxxx'])
- email = req._build_report_email(False, 'foo', 'gobbledygook')
- [body, attachment] = email.get_payload()
- self.assertIsInstance(body, MIMEText)
- self.assertEqual('inline', body['Content-Disposition'])
- self.assertIn(
- body['Content-Type'],
- ['text/plain; charset="utf-8"', 'text/plain; charset="utf8"'])
- self.assertEqual("foo", body.get_payload(decode=True))
-
- def test_report_email_attachment(self):
- req = self.make_request(emails=['foo@xxxxxxxxxxx'])
- email = req._build_report_email(False, "foo", "gobbledygook")
- [body, attachment] = email.get_payload()
- self.assertIsInstance(attachment, MIMEApplication)
- self.assertEqual('application/x-gzip', attachment['Content-Type'])
- self.assertEqual(
- 'attachment; filename="%s-r%s.subunit.gz"' % (
- req.get_nick(), req.get_revno()),
- attachment['Content-Disposition'])
- self.assertEqual(
- "gobbledygook", attachment.get_payload(decode=True))
-
- def test_send_report_email_sends_email(self):
- log = []
- req = self.make_request(emails=['foo@xxxxxxxxxxx'], emails_sent=log)
- expected = req._build_report_email(False, "foo", "gobbledygook")
- req.send_report_email(False, "foo", "gobbledygook")
- [observed] = log
- # The standard library sucks. None of the MIME objects have __eq__
- # implementations.
- for expected_part, observed_part in izip(
- expected.walk(), observed.walk()):
- self.assertEqual(type(expected_part), type(observed_part))
- self.assertEqual(expected_part.items(), observed_part.items())
- self.assertEqual(
- expected_part.is_multipart(), observed_part.is_multipart())
- if not expected_part.is_multipart():
- self.assertEqual(
- expected_part.get_payload(), observed_part.get_payload())
-
- def test_format_result_success(self):
-
- class SomeTest(TestCase):
-
- def test_a(self):
- pass
-
- def test_b(self):
- pass
-
- def test_c(self):
- pass
-
- test = unittest.TestSuite(map(SomeTest, ['test_' + x for x in 'abc']))
- result = TestResult()
- test.run(result)
- tree = self.make_trunk()
- # Fake a merge, giving silly revision ids.
- tree.add_pending_merge('foo', 'bar')
- req = self.make_request(
- branch_url='https://example.com/bzr/thing', revno=42, trunk=tree)
- source_branch, source_revno = req.get_source_details()
- target_branch, target_revno = req.get_target_details()
- start_time = datetime.utcnow()
- end_time = start_time + timedelta(hours=1)
- data = {
- 'source_branch': source_branch,
- 'source_revno': source_revno,
- 'target_branch': target_branch,
- 'target_revno': target_revno,
- 'start_time': str(start_time),
- 'duration': str(end_time - start_time),
- 'num_tests': result.testsRun,
- 'num_failures': len(result.failures),
- 'num_errors': len(result.errors),
- }
- result_text = req.format_result(result, start_time, end_time)
- self.assertThat(
- result_text, DocTestMatches("""\
-Tests started at approximately %(start_time)s
-Source: %(source_branch)s r%(source_revno)s
-Target: %(target_branch)s r%(target_revno)s
-<BLANKLINE>
-%(num_tests)s tests run in %(duration)s, %(num_failures)s failures, \
-%(num_errors)s errors
-<BLANKLINE>
-(See the attached file for the complete log)
-""" % data, doctest.REPORT_NDIFF | doctest.ELLIPSIS))
-
- def test_format_result_with_errors(self):
-
- class SomeTest(TestCase):
-
- def test_ok(self):
- pass
-
- def test_fail(self):
- self.fail("oh no")
-
- def test_error(self):
- 1/0
-
- fail_test = SomeTest('test_fail')
- error_test = SomeTest('test_error')
- test = unittest.TestSuite(
- [fail_test, error_test, SomeTest('test_ok')])
- result = TestResult()
- test.run(result)
- tree = self.make_trunk()
- # Fake a merge, giving silly revision ids.
- tree.add_pending_merge('foo', 'bar')
- req = self.make_request(
- branch_url='https://example.com/bzr/thing', revno=42, trunk=tree)
- source_branch, source_revno = req.get_source_details()
- target_branch, target_revno = req.get_target_details()
- start_time = datetime.utcnow()
- end_time = start_time + timedelta(hours=1)
- data = {
- 'source_branch': source_branch,
- 'source_revno': source_revno,
- 'target_branch': target_branch,
- 'target_revno': target_revno,
- 'start_time': str(start_time),
- 'duration': str(end_time - start_time),
- 'fail_id': fail_test.id(),
- 'error_id': error_test.id(),
- 'num_tests': result.testsRun,
- 'num_failures': len(result.failures),
- 'num_errors': len(result.errors),
- }
- result_text = req.format_result(result, start_time, end_time)
- self.assertThat(
- result_text, DocTestMatches("""\
-Tests started at approximately %(start_time)s
-Source: %(source_branch)s r%(source_revno)s
-Target: %(target_branch)s r%(target_revno)s
-<BLANKLINE>
-%(num_tests)s tests run in %(duration)s, %(num_failures)s failures, \
-%(num_errors)s errors
-<BLANKLINE>
-Failing tests
--------------
- %(fail_id)s
-<BLANKLINE>
-Tests with errors
------------------
- %(error_id)s
-<BLANKLINE>
-======================================================================
-FAILURE: test_fail...
-----------------------------------------------------------------------
-Traceback (most recent call last):
-...
-<BLANKLINE>
-======================================================================
-ERROR: test_error...
-----------------------------------------------------------------------
-Traceback (most recent call last):
-...
-<BLANKLINE>
-<BLANKLINE>
-(See the attached file for the complete log)
-""" % data, doctest.REPORT_NDIFF | doctest.ELLIPSIS))
-
-
-class TestWebTestLogger(TestCaseWithTransport, RequestHelpers):
-
- def test_make_in_directory(self):
- # WebTestLogger.make_in_directory constructs a logger that writes to a
- # bunch of specific files in a directory.
- self.build_tree(['www/'])
- request = self.make_request()
- logger = WebTestLogger.make_in_directory('www', request, False)
- # A method on logger that writes to _everything_.
- logger.prepare()
- self.assertEqual(
- logger.get_summary_contents(), open('www/summary.log').read())
- self.assertEqual(
- logger.get_full_log_contents(),
- open('www/current_test.log').read())
- self.assertEqual(
- logger.get_index_contents(), open('www/index.html').read())
-
- def test_initial_full_log(self):
- # Initially, the full log has nothing in it.
- logger = self.make_logger()
- self.assertEqual('', logger.get_full_log_contents())
-
- def test_initial_summary_contents(self):
- # Initially, the summary log has nothing in it.
- logger = self.make_logger()
- self.assertEqual('', logger.get_summary_contents())
-
- def test_initial_json(self):
- self.build_tree(['www/'])
- request = self.make_request()
- logger = WebTestLogger.make_in_directory('www', request, False)
- logger.prepare()
- self.assertEqual(
- {'description': request.get_merge_description(),
- 'failed-yet': False,
- },
- simplejson.loads(open('www/info.json').read()))
-
- def test_initial_success(self):
- # The Logger initially thinks it is successful because there have been
- # no failures yet.
- logger = self.make_logger()
- self.assertEqual(True, logger.successful)
-
- def test_got_failure_changes_success(self):
- # Logger.got_failure() tells the logger it is no longer successful.
- logger = self.make_logger()
- logger.got_failure()
- self.assertEqual(False, logger.successful)
-
- def test_got_failure_updates_json(self):
- # Logger.got_failure() updates JSON so that interested parties can
- # determine that it is unsuccessful.
- self.build_tree(['www/'])
- request = self.make_request()
- logger = WebTestLogger.make_in_directory('www', request, False)
- logger.prepare()
- logger.got_failure()
- self.assertEqual(
- {'description': request.get_merge_description(),
- 'failed-yet': True,
- },
- simplejson.loads(open('www/info.json').read()))
-
- def test_got_line_no_echo(self):
- # got_line forwards the line to the full log, but does not forward to
- # stdout if echo_to_stdout is False.
- stdout = StringIO()
- self.patch(sys, 'stdout', stdout)
- logger = self.make_logger(echo_to_stdout=False)
- logger.got_line("output from script\n")
- self.assertEqual(
- "output from script\n", logger.get_full_log_contents())
- self.assertEqual("", stdout.getvalue())
-
- def test_got_line_echo(self):
- # got_line forwards the line to the full log, and to stdout if
- # echo_to_stdout is True.
- stdout = StringIO()
- self.patch(sys, 'stdout', stdout)
- logger = self.make_logger(echo_to_stdout=True)
- logger.got_line("output from script\n")
- self.assertEqual(
- "output from script\n", logger.get_full_log_contents())
- self.assertEqual("output from script\n", stdout.getvalue())
-
- def test_write_line(self):
- # write_line writes a line to both the full log and the summary log.
- logger = self.make_logger()
- logger.write_line('foo')
- self.assertEqual('foo\n', logger.get_full_log_contents())
- self.assertEqual('foo\n', logger.get_summary_contents())
-
- def test_error_in_testrunner_logs_to_summary(self):
- # error_in_testrunner logs the traceback to the summary log in a very
- # prominent way.
- try:
- 1/0
- except ZeroDivisionError:
- exc_info = sys.exc_info()
- stack = ''.join(traceback.format_exception(*exc_info))
- logger = self.make_logger()
- logger.error_in_testrunner(exc_info)
- self.assertEqual(
- "\n\nERROR IN TESTRUNNER\n\n%s" % (stack,),
- logger.get_summary_contents())
-
- def test_error_in_testrunner_sends_email(self):
- # If email addresses are configurd, error_in_testrunner emails them
- # with the failure and the full log.
- try:
- 1/0
- except ZeroDivisionError:
- exc_info = sys.exc_info()
- log = []
- request = self.make_request(
- emails=['foo@xxxxxxxxxxx'], emails_sent=log)
- logger = self.make_logger(request=request)
- logger.error_in_testrunner(exc_info)
- [email] = log
- self.assertEqual(
- 'Test results: %s: FAILURE' % request.get_merge_description(),
- email['Subject'])
- [body, attachment] = email.get_payload()
- self.assertIsInstance(body, MIMEText)
- self.assertEqual('inline', body['Content-Disposition'])
- self.assertIn(
- body['Content-Type'],
- ['text/plain; charset="utf-8"', 'text/plain; charset="utf8"'])
- self.assertEqual(
- logger.get_summary_contents(), body.get_payload(decode=True))
- self.assertIsInstance(attachment, MIMEApplication)
- self.assertEqual('application/x-gzip', attachment['Content-Type'])
- self.assertEqual(
- 'attachment;',
- attachment['Content-Disposition'][:len('attachment;')])
- self.assertEqual(
- logger.get_full_log_contents(),
- gunzip_data(attachment.get_payload().decode('base64')))
-
-
-class TestEC2Runner(TestCaseWithTransport, RequestHelpers):
-
- def make_ec2runner(self, emails=None, email_log=None):
- if email_log is None:
- email_log = []
- smtp_connection = LoggingSMTPConnection(email_log)
- return EC2Runner(
- False, "who-cares.pid", False, smtp_connection, emails=emails)
-
- def test_run(self):
- # EC2Runner.run() runs the given function, passing through whatever
- # arguments and keyword arguments it has been given.
- calls = []
- runner = self.make_ec2runner()
-
- def function(*args, **kwargs):
- calls.append((args, kwargs))
- runner.run("boring test method", function, "foo", "bar", baz="qux")
- self.assertEqual([(("foo", "bar"), {'baz': 'qux'})], calls)
-
- def test_email_on_failure_no_emails(self):
- # If no emails are specified, then no email is sent on failure.
- log = []
- runner = self.make_ec2runner(email_log=log)
- self.assertRaises(
- ZeroDivisionError, runner.run, "failing method", lambda: 1/0)
- self.assertEqual([], log)
-
- def test_email_on_failure_some_emails(self):
- # If emails *are* specified, then an email is sent on failure.
- log = []
- runner = self.make_ec2runner(
- email_log=log, emails=["foo@xxxxxxxxxxx"])
- self.assertRaises(
- ZeroDivisionError, runner.run, "failing method", lambda: 1/0)
- # XXX: Expect this to fail. Fix the test to be more correct.
- [message] = log
- self.assertEqual('failing method FAILED', message['Subject'])
- self.assertEqual('foo@xxxxxxxxxxx', message['To'])
- self.assertIn('ZeroDivisionError', str(message))
-
- def test_email_with_launchpad_tester_failure(self):
- # LaunchpadTester sends email on catastrophic failure.
- email_log = []
- to_emails = ['foo@xxxxxxxxxxx']
- request = self.make_request(emails=to_emails, emails_sent=email_log)
- logger = self.make_logger(request=request)
- tester = self.make_tester(logger=logger)
- # Deliberately break 'tester'. A likely failure in production is not
- # being able to spawn the child process.
- tester._spawn_test_process = lambda: 1/0
- runner = self.make_ec2runner(emails=to_emails, email_log=email_log)
- runner.run("launchpad tester", tester.test)
- # The primary thing we care about is that email *was* sent.
- self.assertNotEqual([], email_log)
- [tester_msg] = email_log
- self.assertEqual('foo@xxxxxxxxxxx', tester_msg['To'])
- self.assertIn(
- 'ZeroDivisionError',
- tester_msg.get_payload()[0].get_payload(decode=True))
-
-
-class TestDaemonizationInteraction(TestCaseWithTransport, RequestHelpers):
-
- script_file = 'remote_daemonization_test.py'
-
- def run_script(self, script_file, pidfile, directory, logfile):
- path = os.path.join(os.path.dirname(__file__), script_file)
- popen = subprocess.Popen(
- [sys.executable, path, pidfile, directory, logfile],
- stdout=subprocess.PIPE, stderr=subprocess.PIPE)
- stdout, stderr = popen.communicate()
- self.assertEqual((0, '', ''), (popen.returncode, stdout, stderr))
- # Make sure the daemon is finished doing its thing.
- while os.path.exists(pidfile):
- time.sleep(0.01)
- # If anything was written to 'logfile' while the script was running,
- # we want it to appear in our test errors. This way, stack traces in
- # the script are visible to us as test runners.
- if os.path.exists(logfile):
- content_type = ContentType("text", "plain", {"charset": "utf8"})
- content = Content(content_type, open(logfile).read)
- self.addDetail('logfile', content)
-
- def test_daemonization(self):
- # Daemonizing something can do funny things to its behavior. This test
- # runs a script that's very similar to remote.py but only does
- # "logger.prepare".
- pidfile = "%s.pid" % self.id()
- directory = 'www'
- logfile = "%s.log" % self.id()
- self.run_script(self.script_file, pidfile, directory, logfile)
- # Check that the output from the daemonized version matches the output
- # from a normal version. Only checking one of the files, since this is
- # more of a smoke test than a correctness test.
- logger = self.make_logger()
- logger.prepare()
- expected_summary = logger.get_summary_contents()
- observed_summary = open(os.path.join(directory, 'summary.log')).read()
- # The first line contains a timestamp, so we ignore it.
- self.assertEqual(
- expected_summary.splitlines()[1:],
- observed_summary.splitlines()[1:])
-
-
-class TestResultHandling(TestCaseWithTransport, RequestHelpers):
- """Tests for how we handle the result at the end of the test suite."""
-
- def get_body_text(self, email):
- return email.get_payload()[0].get_payload(decode=True)
-
- def make_empty_result(self):
- return TestResult()
-
- def make_successful_result(self):
- result = self.make_empty_result()
- result.startTest(self)
- result.stopTest(self)
- return result
-
- def make_failing_result(self):
- result = self.make_empty_result()
- result.startTest(self)
- try:
- 1/0
- except ZeroDivisionError:
- result.addError(self, sys.exc_info())
- result.stopTest(self)
- return result
-
- def test_success_no_emails(self):
- log = []
- request = self.make_request(emails=[], emails_sent=log)
- logger = self.make_logger(request=request)
- logger.got_result(self.make_successful_result())
- self.assertEqual([], log)
-
- def test_failure_no_emails(self):
- log = []
- request = self.make_request(emails=[], emails_sent=log)
- logger = self.make_logger(request=request)
- logger.got_result(self.make_failing_result())
- self.assertEqual([], log)
-
- def test_submits_to_pqm_on_success(self):
- log = []
- message = {'Subject': 'foo'}
- request = self.make_request(
- emails=[], pqm_message=message, emails_sent=log)
- logger = self.make_logger(request=request)
- logger.got_result(self.make_successful_result())
- self.assertEqual([message], log)
-
- def test_records_pqm_submission_in_email(self):
- log = []
- message = {'Subject': 'foo'}
- request = self.make_request(
- emails=['foo@xxxxxxxxxxx'], pqm_message=message, emails_sent=log)
- logger = self.make_logger(request=request)
- logger.got_result(self.make_successful_result())
- [pqm_message, user_message] = log
- self.assertEqual(message, pqm_message)
- self.assertIn(
- 'SUBMITTED TO PQM:\n%s' % (message['Subject'],),
- self.get_body_text(user_message))
-
- def test_doesnt_submit_to_pqm_on_failure(self):
- log = []
- message = {'Subject': 'foo'}
- request = self.make_request(
- emails=[], pqm_message=message, emails_sent=log)
- logger = self.make_logger(request=request)
- logger.got_result(self.make_failing_result())
- self.assertEqual([], log)
-
- def test_records_non_pqm_submission_in_email(self):
- log = []
- message = {'Subject': 'foo'}
- request = self.make_request(
- emails=['foo@xxxxxxxxxxx'], pqm_message=message, emails_sent=log)
- logger = self.make_logger(request=request)
- logger.got_result(self.make_failing_result())
- [user_message] = log
- self.assertIn(
- '**NOT** submitted to PQM:\n%s' % (message['Subject'],),
- self.get_body_text(user_message))
-
- def test_email_refers_to_attached_log(self):
- log = []
- request = self.make_request(
- emails=['foo@xxxxxxxxxxx'], emails_sent=log)
- logger = self.make_logger(request=request)
- logger.got_result(self.make_failing_result())
- [user_message] = log
- self.assertIn(
- '(See the attached file for the complete log)\n',
- self.get_body_text(user_message))
-
- def test_email_body_is_format_result(self):
- # The body of the email sent to the user is the summary file.
- log = []
- request = self.make_request(
- emails=['foo@xxxxxxxxxxx'], emails_sent=log)
- logger = self.make_logger(request=request)
- result = self.make_failing_result()
- logger.got_result(result)
- [user_message] = log
- self.assertEqual(
- request.format_result(
- result, logger._start_time, logger._end_time),
- self.get_body_text(user_message))
-
- def test_gzip_of_full_log_attached(self):
- # The full log is attached to the email.
- log = []
- request = self.make_request(
- emails=['foo@xxxxxxxxxxx'], emails_sent=log)
- logger = self.make_logger(request=request)
- logger.got_line("output from test process\n")
- logger.got_line("more output\n")
- logger.got_result(self.make_successful_result())
- [user_message] = log
- [body, attachment] = user_message.get_payload()
- self.assertEqual('application/x-gzip', attachment['Content-Type'])
- self.assertEqual(
- 'attachment;',
- attachment['Content-Disposition'][:len('attachment;')])
- attachment_contents = attachment.get_payload().decode('base64')
- uncompressed = gunzip_data(attachment_contents)
- self.assertEqual(
- "output from test process\nmore output\n", uncompressed)
-
-
-def test_suite():
- return unittest.TestLoader().loadTestsFromName(__name__)
=== removed file 'lib/devscripts/ec2test/tests/test_session.py'
--- lib/devscripts/ec2test/tests/test_session.py 2012-01-01 03:03:28 +0000
+++ lib/devscripts/ec2test/tests/test_session.py 1970-01-01 00:00:00 +0000
@@ -1,71 +0,0 @@
-# Copyright 2009 Canonical Ltd. This software is licensed under the
-# GNU Affero General Public License version 3 (see the file LICENSE).
-
-"""Test the session module."""
-
-__metaclass__ = type
-
-from datetime import (
- datetime,
- timedelta,
- )
-import re
-import unittest
-
-from devscripts.ec2test import session
-
-
-class TestEC2SessionName(unittest.TestCase):
- """Tests for EC2SessionName."""
-
- def test_make(self):
- # EC2SessionName.make() is the most convenient way to create
- # valid names.
- name = session.EC2SessionName.make("fred")
- check = re.compile(
- r'^fred/\d{4}-\d{2}-\d{2}-\d{4}/[0-9a-zA-Z]{8}$').match
- self.failIf(check(name) is None, "Did not match %r" % name)
- possible_expires = [
- None, '1986-04-26-0123', timedelta(hours=10),
- datetime(1986, 04, 26, 1, 23)
- ]
- for expires in possible_expires:
- name = session.EC2SessionName.make("fred", expires)
- self.failIf(check(name) is None, "Did not match %r" % name)
-
- def test_properties(self):
- # A valid EC2SessionName has properies to access the three
- # components of its name.
- base = "fred"
- timestamp = datetime(1986, 4, 26, 1, 23)
- timestamp_string = '1986-04-26-0123'
- rand = 'abcdef123456'
- name = session.EC2SessionName(
- "%s/%s/%s" % (base, timestamp_string, rand))
- self.failUnlessEqual(base, name.base)
- self.failUnlessEqual(timestamp, name.expires)
- self.failUnlessEqual(rand, name.rand)
-
- def test_invalid_base(self):
- # If the given base contains a forward-slash, an
- # AssertionError should be raised.
- self.failUnlessRaises(
- AssertionError, session.EC2SessionName.make, "forward/slash")
-
- def test_invalid_timestamp(self):
- # If the given expiry timestamp contains a forward-slash, an
- # AssertionError should be raised.
- self.failUnlessRaises(
- AssertionError, session.EC2SessionName.make, "fred", "/")
- # If the given expiry timestamp does not contain a timestamp
- # in the correct form, an AssertionError should be raised.
- self.failUnlessRaises(
- AssertionError, session.EC2SessionName.make, "fred", "1986.04.26")
-
- def test_form_not_correct(self):
- # If the form of the string is not base/timestamp/rand then
- # the corresponding properties should all return None.
- broken_name = session.EC2SessionName('bob')
- self.failUnless(broken_name.base is None)
- self.failUnless(broken_name.expires is None)
- self.failUnless(broken_name.rand is None)
=== removed file 'lib/devscripts/ec2test/tests/test_utils.py'
--- lib/devscripts/ec2test/tests/test_utils.py 2012-01-01 03:03:28 +0000
+++ lib/devscripts/ec2test/tests/test_utils.py 1970-01-01 00:00:00 +0000
@@ -1,60 +0,0 @@
-# Copyright 2009 Canonical Ltd. This software is licensed under the
-# GNU Affero General Public License version 3 (see the file LICENSE).
-
-"""Test the utils module."""
-
-__metaclass__ = type
-
-from datetime import datetime
-import unittest
-
-from devscripts.ec2test import utils
-
-
-class TestDateTimeUtils(unittest.TestCase):
- """Tests for date/time related utilities."""
-
- example_date = datetime(1986, 4, 26, 1, 23)
- example_date_string = '1986-04-26-0123'
- example_date_text = (
- 'blah blah foo blah 23545 646 ' +
- example_date_string + ' 435 blah')
-
- def test_make_datetime_string(self):
- self.failUnlessEqual(
- self.example_date_string,
- utils.make_datetime_string(self.example_date))
-
- def test_find_datetime_string(self):
- self.failUnlessEqual(
- self.example_date,
- utils.find_datetime_string(self.example_date_string))
- self.failUnlessEqual(
- self.example_date,
- utils.find_datetime_string(self.example_date_text))
-
-
-class TestRandomUtils(unittest.TestCase):
- """Tests for randomness related utilities."""
-
- hex_chars = frozenset('0123456789abcdefABCDEF')
-
- def test_make_random_string(self):
- rand_a = utils.make_random_string()
- rand_b = utils.make_random_string()
- self.failIfEqual(rand_a, rand_b)
- self.failUnlessEqual(32, len(rand_a))
- self.failUnlessEqual(32, len(rand_b))
- self.failUnless(self.hex_chars.issuperset(rand_a))
- self.failUnless(self.hex_chars.issuperset(rand_b))
-
- def test_make_random_string_with_length(self):
- for length in (8, 16, 64):
- rand = utils.make_random_string(length)
- self.failUnlessEqual(length, len(rand))
- self.failUnless(self.hex_chars.issuperset(rand))
-
- def test_make_random_string_with_bad_length(self):
- # length must be a multiple of 2.
- self.failUnlessRaises(
- AssertionError, utils.make_random_string, 15)
=== removed file 'lib/devscripts/ec2test/utils.py'
--- lib/devscripts/ec2test/utils.py 2009-10-05 19:20:37 +0000
+++ lib/devscripts/ec2test/utils.py 1970-01-01 00:00:00 +0000
@@ -1,55 +0,0 @@
-# Copyright 2009 Canonical Ltd. This software is licensed under the
-# GNU Affero General Public License version 3 (see the file LICENSE).
-
-"""General useful stuff."""
-
-__metaclass__ = type
-__all__ = [
- 'find_datetime_string',
- 'make_datetime_string',
- 'make_random_string',
- ]
-
-
-import binascii
-import datetime
-import os
-import re
-
-
-def make_datetime_string(when=None):
- """Generate a simple formatted date and time string.
-
- This is intended to be embedded in text to be later found by
- `find_datetime_string`.
- """
- if when is None:
- when = datetime.datetime.utcnow()
- return when.strftime('%Y-%m-%d-%H%M')
-
-
-re_find_datetime = re.compile(
- r'(\d{4})-(\d{2})-(\d{2})-(\d{2})(\d{2})')
-
-def find_datetime_string(text):
- """Search for a simple date and time in arbitrary text.
-
- The format searched for is %Y-%m-%d-%H%M - the same as produced by
- `make_datetime_string`.
- """
- match = re_find_datetime.search(text)
- if match is None:
- return None
- else:
- return datetime.datetime(
- *(int(part) for part in match.groups()))
-
-
-def make_random_string(length=32):
- """Return a simple random UUID.
-
- The uuid module is only available in Python 2.5 and above, but a
- simple non-RFC-compliant hack here is sufficient.
- """
- assert length % 2 == 0, "length must be a multiple of 2"
- return binascii.hexlify(os.urandom(length/2))
=== removed file 'lib/devscripts/tests/test_autoland.py'
--- lib/devscripts/tests/test_autoland.py 2012-01-01 03:03:28 +0000
+++ lib/devscripts/tests/test_autoland.py 1970-01-01 00:00:00 +0000
@@ -1,515 +0,0 @@
-# Copyright 2009 Canonical Ltd. This software is licensed under the
-# GNU Affero General Public License version 3 (see the file LICENSE).
-
-"""Tests for automatic landing thing."""
-
-__metaclass__ = type
-
-import re
-import unittest
-
-from launchpadlib.launchpad import (
- EDGE_SERVICE_ROOT,
- STAGING_SERVICE_ROOT,
- )
-
-from devscripts.autoland import (
- get_bazaar_host,
- get_bugs_clause,
- get_qa_clause,
- get_reviewer_clause,
- get_reviewer_handle,
- get_testfix_clause,
- MergeProposal,
- MissingBugsError,
- MissingBugsIncrementalError,
- MissingReviewError,
- )
-from lp.testing.fakemethod import FakeMethod
-
-
-class FakeBugTask:
-
- def __init__(self, target_name, status):
- self.bug_target_name = target_name
- self.status = status
-
-
-class FakeBug:
- """Fake launchpadlib Bug object.
-
- Only used for the purposes of testing.
- """
-
- def __init__(self, id, bug_tasks=None):
- self.id = id
- if bug_tasks is None:
- bug_tasks = [FakeBugTask('launchpad', 'Triaged')]
- self.bug_tasks = bug_tasks
-
-
-class FakePerson:
- """Fake launchpadlib Person object.
-
- Only used for the purposes of testing.
- """
-
- def __init__(self, name, irc_handles):
- self.name = name
- self.irc_nicknames = list(irc_handles)
-
-
-class FakeIRC:
- """Fake IRC handle.
-
- Only used for the purposes of testing.
- """
-
- def __init__(self, nickname, network):
- self.nickname = nickname
- self.network = network
-
-
-class FakeLPMergeProposal:
- """Fake launchpadlib MergeProposal object.
-
- Only used for the purposes of testing.
- """
-
- def __init__(self, root=None):
- self._root = root
- self.commit_message = None
-
- def lp_save(self):
- pass
-
-
-class TestPQMRegexAcceptance(unittest.TestCase):
- """Tests if the generated commit message is accepted by PQM regexes."""
-
- def setUp(self):
- # PQM regexes; might need update once in a while
- self.devel_open_re = ("(?is)^\s*(:?\[testfix\])?\[(?:"
- "release-critical=[^\]]+|rs?=[^\]]+)\]")
- self.dbdevel_normal_re = ("(?is)^\s*(:?\[testfix\])?\[(?:"
- "release-critical|rs?=[^\]]+)\]")
-
- self.mp = MergeProposal(FakeLPMergeProposal())
- self.fake_bug = FakeBug(20)
- self.fake_person = FakePerson('foo', [])
- self.mp.get_bugs = FakeMethod([self.fake_bug])
- self.mp.get_reviews = FakeMethod({None : [self.fake_person]})
-
- def assertRegexpMatches(self, text, expected_regexp, msg=None):
- """Fail the test unless the text matches the regular expression.
-
- Method default in Python 2.7. Can be removed as soon as LP goes 2.7.
- """
- if isinstance(expected_regexp, basestring):
- expected_regexp = re.compile(expected_regexp)
- if not expected_regexp.search(text):
- msg = msg or "Regexp didn't match"
- msg = '%s: %r not found in %r' % (msg, expected_regexp.pattern,
- text)
- raise self.failureException(msg)
-
- def _test_commit_message_match(self, incr, no_qa, testfix):
- commit_message = self.mp.build_commit_message("Foobaring the sbrubble.",
- testfix, no_qa, incr)
- self.assertRegexpMatches(commit_message, self.devel_open_re)
- self.assertRegexpMatches(commit_message, self.dbdevel_normal_re)
-
- def test_testfix_match(self):
- self._test_commit_message_match(incr=False, no_qa=False, testfix=True)
-
- def test_regular_match(self):
- self._test_commit_message_match(incr=False, no_qa=False, testfix=False)
-
- def test_noqa_match(self):
- self._test_commit_message_match(incr=False, no_qa=True, testfix=False)
-
- def test_incr_match(self):
- self._test_commit_message_match(incr=True, no_qa=False, testfix=False)
-
-
-class TestBugsClause(unittest.TestCase):
- """Tests for `get_bugs_clause`."""
-
- def test_no_bugs(self):
- # If there are no bugs, then there is no bugs clause.
- bugs_clause = get_bugs_clause([])
- self.assertEqual('', bugs_clause)
-
- def test_one_bug(self):
- # If there's a bug, then the bugs clause is [bug=$ID].
- bug = FakeBug(45)
- bugs_clause = get_bugs_clause([bug])
- self.assertEqual('[bug=45]', bugs_clause)
-
- def test_two_bugs(self):
- # If there are two bugs, then the bugs clause is [bug=$ID,$ID].
- bug1 = FakeBug(20)
- bug2 = FakeBug(45)
- bugs_clause = get_bugs_clause([bug1, bug2])
- self.assertEqual('[bug=20,45]', bugs_clause)
-
- def test_fixed_bugs_are_excluded(self):
- # If a bug is fixed then it is excluded from the bugs clause.
- bug1 = FakeBug(20)
- bug2 = FakeBug(45, bug_tasks=[
- FakeBugTask('fake-project', 'Fix Released')])
- bug3 = FakeBug(67, bug_tasks=[
- FakeBugTask('fake-project', 'Fix Committed')])
- bugs_clause = get_bugs_clause([bug1, bug2, bug3])
- self.assertEqual('[bug=20]', bugs_clause)
-
- def test_bugs_open_on_launchpad_are_included(self):
- # If a bug has been fixed on one target but not in launchpad, then it
- # is included in the bugs clause, because it's relevant to launchpad
- # QA.
- bug = FakeBug(20, bug_tasks=[
- FakeBugTask('fake-project', 'Fix Released'),
- FakeBugTask('launchpad', 'Triaged')])
- bugs_clause = get_bugs_clause([bug])
- self.assertEqual('[bug=20]', bugs_clause)
-
- def test_bugs_fixed_on_launchpad_but_open_in_others_are_excluded(self):
- # If a bug has been fixed in Launchpad but not fixed on a different
- # target, then it is excluded from the bugs clause, since we don't
- # want to QA it.
- bug = FakeBug(20, bug_tasks=[
- FakeBugTask('fake-project', 'Triaged'),
- FakeBugTask('launchpad', 'Fix Released')])
- bugs_clause = get_bugs_clause([bug])
- self.assertEqual('', bugs_clause)
-
-
-class TestGetCommitMessage(unittest.TestCase):
-
- def setUp(self):
- self.mp = MergeProposal(FakeLPMergeProposal())
- self.fake_bug = FakeBug(20)
- self.fake_person = self.makePerson('foo')
-
- def makePerson(self, name):
- return FakePerson(name, [])
-
- def test_commit_with_bugs(self):
- incr = False
- no_qa = False
- testfix = False
-
- self.mp.get_bugs = FakeMethod([self.fake_bug])
- self.mp.get_reviews = FakeMethod({None : [self.fake_person]})
-
- self.assertEqual("[r=foo][bug=20] Foobaring the sbrubble.",
- self.mp.build_commit_message("Foobaring the sbrubble.",
- testfix, no_qa, incr))
-
- def test_commit_no_bugs_no_noqa(self):
- incr = False
- no_qa = False
- testfix = False
-
- self.mp.get_bugs = FakeMethod([])
- self.mp.get_reviews = FakeMethod({None : [self.fake_person]})
-
- self.assertRaises(MissingBugsError, self.mp.build_commit_message,
- testfix, no_qa, incr)
-
- def test_commit_no_bugs_with_noqa(self):
- incr = False
- no_qa = True
- testfix = False
-
- self.mp.get_bugs = FakeMethod([])
- self.mp.get_reviews = FakeMethod({None : [self.fake_person]})
-
- self.assertEqual("[r=foo][no-qa] Foobaring the sbrubble.",
- self.mp.build_commit_message("Foobaring the sbrubble.",
- testfix, no_qa, incr))
-
- def test_commit_bugs_with_noqa(self):
- incr = False
- no_qa = True
- testfix = False
-
- self.mp.get_bugs = FakeMethod([self.fake_bug])
- self.mp.get_reviews = FakeMethod({None : [self.fake_person]})
-
- self.assertEqual(
- "[r=foo][bug=20][no-qa] Foobaring the sbrubble.",
- self.mp.build_commit_message("Foobaring the sbrubble.",
- testfix, no_qa, incr))
-
- def test_commit_bugs_with_incr(self):
- incr = True
- no_qa = False
- testfix = False
-
- self.mp.get_bugs = FakeMethod([self.fake_bug])
- self.mp.get_reviews = FakeMethod({None : [self.fake_person]})
-
- self.assertEqual(
- "[r=foo][bug=20][incr] Foobaring the sbrubble.",
- self.mp.build_commit_message("Foobaring the sbrubble.",
- testfix, no_qa, incr))
-
- def test_commit_no_bugs_with_incr(self):
- incr = True
- no_qa = False
- testfix = False
-
- self.mp.get_bugs = FakeMethod([self.fake_bug])
- self.mp.get_reviews = FakeMethod({None : [self.fake_person]})
-
- self.assertEqual(
- "[r=foo][bug=20][incr] Foobaring the sbrubble.",
- self.mp.build_commit_message("Foobaring the sbrubble.",
- testfix, no_qa, incr))
-
- def test_commit_with_noqa_and_incr(self):
- incr = True
- no_qa = True
- testfix = False
-
- self.mp.get_bugs = FakeMethod([self.fake_bug])
- self.mp.get_reviews = FakeMethod({None : [self.fake_person]})
-
- self.assertEqual(
- "[r=foo][bug=20][no-qa][incr] Foobaring the sbrubble.",
- self.mp.build_commit_message("Foobaring the sbrubble.",
- testfix, no_qa, incr))
-
- def test_commit_with_rollback(self):
- self.mp.get_bugs = FakeMethod([self.fake_bug])
- self.mp.get_reviews = FakeMethod({None : [self.fake_person]})
-
- self.assertEqual(
- "[r=foo][bug=20][rollback=123] Foobaring the sbrubble.",
- self.mp.build_commit_message("Foobaring the sbrubble.",
- rollback=123))
-
- def test_takes_into_account_existing_tags_on_commit_text(self):
- self.mp.get_bugs = FakeMethod([self.fake_bug])
- self.mp.get_reviews = FakeMethod({None : [self.fake_person]})
-
- self.assertEqual(
- "[r=foo][bug=20][rollback=123] Foobaring the sbrubble.",
- self.mp.build_commit_message(
- "[r=foo][bug=20][rollback=123] Foobaring the sbrubble.",
- rollback=123))
-
-
-class TestSetCommitMessage(unittest.TestCase):
-
- def setUp(self):
- self.mp = MergeProposal(FakeLPMergeProposal())
-
- def test_set_commit_message(self):
- commit_message = "Foobaring the sbrubble."
- self.mp.set_commit_message(commit_message)
- self.assertEqual(self.mp._mp.commit_message, commit_message)
-
-
-class TestGetTestfixClause(unittest.TestCase):
- """Tests for `get_testfix_clause`"""
-
- def test_no_testfix(self):
- testfix = False
- self.assertEqual('', get_testfix_clause(testfix))
-
- def test_is_testfix(self):
- testfix = True
- self.assertEqual('[testfix]', get_testfix_clause(testfix))
-
-
-class TestGetQaClause(unittest.TestCase):
- """Tests for `get_qa_clause`"""
-
- def test_no_bugs_no_option_given(self):
- bugs = None
- no_qa = False
- incr = False
- self.assertRaises(MissingBugsError, get_qa_clause, bugs, no_qa,
- incr)
-
- def test_bugs_noqa_option_given(self):
- bug1 = FakeBug(20)
- no_qa = True
- incr = False
- self.assertEqual('[no-qa]',
- get_qa_clause([bug1], no_qa, incr))
-
- def test_no_bugs_noqa_option_given(self):
- bugs = None
- no_qa = True
- incr = False
- self.assertEqual('[no-qa]',
- get_qa_clause(bugs, no_qa, incr))
-
- def test_bugs_no_option_given(self):
- bug1 = FakeBug(20)
- no_qa = False
- incr = False
- self.assertEqual('',
- get_qa_clause([bug1], no_qa, incr))
-
- def test_bugs_incr_option_given(self):
- bug1 = FakeBug(20)
- no_qa = False
- incr = True
- self.assertEqual('[incr]',
- get_qa_clause([bug1], no_qa, incr))
-
- def test_no_bugs_incr_option_given(self):
- bugs = None
- no_qa = False
- incr = True
- self.assertRaises(MissingBugsIncrementalError,
- get_qa_clause, bugs, no_qa, incr)
-
- def test_bugs_incr_and_noqa_option_given(self):
- bug1 = FakeBug(20)
- no_qa = True
- incr = True
- self.assertEqual('[no-qa][incr]',
- get_qa_clause([bug1], no_qa, incr))
-
- def test_rollback_given(self):
- bugs = None
- self.assertEqual('[rollback=123]',
- get_qa_clause(bugs, rollback=123))
-
- def test_rollback_and_noqa_and_incr_given(self):
- bugs = None
- self.assertEqual('[rollback=123]', get_qa_clause(bugs, rollback=123))
-
-
-class TestGetReviewerHandle(unittest.TestCase):
- """Tests for `get_reviewer_handle`."""
-
- def makePerson(self, name, irc_handles):
- return FakePerson(name, irc_handles)
-
- def test_no_irc_nicknames(self):
- # If the person has no IRC nicknames, their reviewer handle is their
- # Launchpad user name.
- person = self.makePerson(name='foo', irc_handles=[])
- self.assertEqual('foo', get_reviewer_handle(person))
-
- def test_freenode_irc_nick_preferred(self):
- # If the person has a Freenode IRC nickname, then that is preferred as
- # their user handle.
- person = self.makePerson(
- name='foo', irc_handles=[FakeIRC('bar', 'irc.freenode.net')])
- self.assertEqual('bar', get_reviewer_handle(person))
-
- def test_non_freenode_nicks_ignored(self):
- # If the person has IRC nicks that aren't freenode, we ignore them.
- person = self.makePerson(
- name='foo', irc_handles=[FakeIRC('bar', 'irc.efnet.net')])
- self.assertEqual('foo', get_reviewer_handle(person))
-
-
-class TestGetReviewerClause(unittest.TestCase):
- """Tests for `get_reviewer_clause`."""
-
- def makePerson(self, name):
- return FakePerson(name, [])
-
- def get_reviewer_clause(self, reviewers):
- return get_reviewer_clause(reviewers)
-
- def test_one_reviewer_no_type(self):
- # It's very common for a merge proposal to be reviewed by one person
- # with no specified type of review. It such cases the review clause is
- # '[r=<person>]'.
- clause = self.get_reviewer_clause({None: [self.makePerson('foo')]})
- self.assertEqual('[r=foo]', clause)
-
- def test_two_reviewers_no_type(self):
- # Branches can have more than one reviewer.
- clause = self.get_reviewer_clause(
- {None: [self.makePerson('foo'), self.makePerson('bar')]})
- self.assertEqual('[r=bar,foo]', clause)
-
- def test_mentat_reviewers(self):
- # A mentat review sometimes is marked like 'ui*'. Due to the
- # unordered nature of dictionaries, the reviewers are sorted before
- # being put into the clause for predictability.
- clause = self.get_reviewer_clause(
- {None: [self.makePerson('foo')],
- 'code*': [self.makePerson('newguy')],
- 'ui': [self.makePerson('beuno')],
- 'ui*': [self.makePerson('bac')]})
- self.assertEqual('[r=foo,newguy][ui=bac,beuno]', clause)
-
- def test_code_reviewer_counts(self):
- # Some people explicitly specify the 'code' type when they do code
- # reviews, these are treated in the same way as reviewers without any
- # given type.
- clause = self.get_reviewer_clause({'code': [self.makePerson('foo')]})
- self.assertEqual('[r=foo]', clause)
-
- def test_release_critical(self):
- # Reviews that are marked as release-critical are included in a
- # separate clause.
- clause = self.get_reviewer_clause(
- {'code': [self.makePerson('foo')],
- 'release-critical': [self.makePerson('bar')]})
- self.assertEqual('[release-critical=bar][r=foo]', clause)
-
- def test_db_reviewer_counts(self):
- # There's no special way of annotating database reviews in Launchpad
- # commit messages, so they are included with the code reviews.
- clause = self.get_reviewer_clause({'db': [self.makePerson('foo')]})
- self.assertEqual('[r=foo]', clause)
-
- def test_ui_reviewers(self):
- # If someone has done a UI review, then that appears in the clause
- # separately from the code reviews.
- clause = self.get_reviewer_clause(
- {'code': [self.makePerson('foo')],
- 'ui': [self.makePerson('bar')],
- })
- self.assertEqual('[r=foo][ui=bar]', clause)
-
- def test_no_reviewers(self):
- # If the merge proposal hasn't been approved by anyone, we cannot
- # generate a valid clause.
- self.assertRaises(MissingReviewError, self.get_reviewer_clause, {})
-
-
-class TestGetBazaarHost(unittest.TestCase):
- """Tests for `get_bazaar_host`."""
-
- def test_dev_service(self):
- # The Bazaar host for the dev service is bazaar.launchpad.dev.
- self.assertEqual(
- 'bazaar.launchpad.dev',
- get_bazaar_host('https://api.launchpad.dev/beta/'))
-
- def test_edge_service(self):
- # The Bazaar host for the edge service is bazaar.launchpad.net, since
- # there's no edge codehosting service.
- self.assertEqual(
- 'bazaar.launchpad.net', get_bazaar_host(EDGE_SERVICE_ROOT))
-
- def test_production_service(self):
- # The Bazaar host for the production service is bazaar.launchpad.net.
- self.assertEqual(
- 'bazaar.launchpad.net',
- get_bazaar_host('https://api.launchpad.net/beta/'))
-
- def test_staging_service(self):
- # The Bazaar host for the staging service is
- # bazaar.staging.launchpad.net.
- self.assertEqual(
- 'bazaar.staging.launchpad.net',
- get_bazaar_host(STAGING_SERVICE_ROOT))
-
- def test_unrecognized_service(self):
- # Any unrecognized URL will raise a ValueError.
- self.assertRaises(
- ValueError, get_bazaar_host, 'https://api.lunchpad.net')
=== modified file 'scripts/upgrade_all_branches.py'
--- scripts/upgrade_all_branches.py 2012-03-06 21:30:30 +0000
+++ scripts/upgrade_all_branches.py 2012-03-06 21:30:30 +0000
@@ -3,8 +3,9 @@
__metaclass__ = type
import _pythonpath
+#quiet pyflakes
+_pythonpath
-import sys
from lp.codehosting.upgrade import Upgrader
from lp.codehosting.bzrutils import server
from lp.codehosting.vfs.branchfs import get_rw_server
=== removed symlink 'utilities/ec2'
=== target was u'../bin/ec2'