← Back to team overview

launchpad-reviewers team mailing list archive

[Merge] lp:~abentley/launchpad/celery-everywhere-9 into lp:launchpad

 

Aaron Bentley has proposed merging lp:~abentley/launchpad/celery-everywhere-9 into lp:launchpad with lp:~abentley/launchpad/celery-everywhere-8 as a prerequisite.

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)

For more details, see:
https://code.launchpad.net/~abentley/launchpad/celery-everywhere-9/+merge/103723

= Summary =
Support running QuestionJob and PackageCopyJob via Celery.

== Pre-implementation notes ==
None

== LOC Rationale ==
Part of a resourced arc that will remove code.

== Implementation details ==
Clean up test set up for both sets of tests.  Add makeDerived methods to support UniversalJobSource, and QuestionJob and PackageCopyJob support to UniversalJobSource.

Use EnumeratedSubclass as the metatype for PackageCopyDerived, to support makeDerived method.

Add config members to jobs to support UniversalJobSource, which uses this to determine database user.

Extract simplified test setup.

Add TestViaCelery test cases for both classes.

== Tests ==
bin/test -vm 'test_questionjob|test_packagecopyjob'

== Demo and Q/A ==
None

= Launchpad lint =

Checking for conflicts and issues in changed files.

Linting changed files:
  lib/lp/registry/tests/test_membership_notification_job.py
  lib/lp/soyuz/model/packagecopyjob.py
  lib/lp/soyuz/tests/test_packagecopyjob.py
  lib/lp/answers/model/questionjob.py
  lib/lp/registry/model/persontransferjob.py
  lib/lp/registry/tests/test_person_merge_job.py
  lib/lp/services/job/model/job.py
  lib/lp/answers/tests/test_questionjob.py
-- 
https://code.launchpad.net/~abentley/launchpad/celery-everywhere-9/+merge/103723
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~abentley/launchpad/celery-everywhere-9 into lp:launchpad.
=== modified file 'lib/lp/answers/model/questionjob.py'
--- lib/lp/answers/model/questionjob.py	2011-12-30 06:14:56 +0000
+++ lib/lp/answers/model/questionjob.py	2012-04-26 15:57:16 +0000
@@ -1,4 +1,4 @@
-# Copyright 2011 Canonical Ltd.  This software is licensed under the
+# Copyright 2011-2012 Canonical Ltd.  This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 """Job classes related to QuestionJob."""
@@ -94,6 +94,11 @@
         """See `IQuestionJob`."""
         return simplejson.loads(self._json_data)
 
+    def makeDerived(self):
+        if self.job_type != QuestionJobType.EMAIL:
+            raise ValueError('Unsupported Job type')
+        return QuestionEmailJob(self)
+
 
 class QuestionEmailJob(BaseRunnableJob):
     """Intermediate class for deriving from QuestionJob."""
@@ -101,6 +106,7 @@
     delegates(IQuestionJob)
     implements(IQuestionEmailJob)
     classProvides(IQuestionEmailJobSource)
+    config = config.IQuestionEmailJobSource
 
     def __init__(self, job):
         self.context = job
@@ -119,7 +125,9 @@
             }
         job = QuestionJob(
             question=question, job_type=cls.class_job_type, metadata=metadata)
-        return cls(job)
+        derived = cls(job)
+        derived.celeryRunOnCommit()
+        return derived
 
     @classmethod
     def iterReady(cls):

=== modified file 'lib/lp/answers/tests/test_questionjob.py'
--- lib/lp/answers/tests/test_questionjob.py	2012-01-01 02:58:52 +0000
+++ lib/lp/answers/tests/test_questionjob.py	2012-04-26 15:57:16 +0000
@@ -26,7 +26,12 @@
     QuestionJob,
     )
 from lp.services.database.lpstorm import IStore
+from lp.services.features.testing import FeatureFixture
 from lp.services.job.interfaces.job import JobStatus
+from lp.services.job.tests import (
+    block_on_job,
+    pop_remote_notifications,
+    )
 from lp.services.log.logger import BufferLogger
 from lp.services.mail import stub
 from lp.services.mail.sendmail import (
@@ -40,7 +45,10 @@
     run_script,
     TestCaseWithFactory,
     )
-from lp.testing.layers import DatabaseFunctionalLayer
+from lp.testing.layers import (
+    CeleryJobLayer,
+    DatabaseFunctionalLayer,
+    )
 
 
 class QuestionJobTestCase(TestCaseWithFactory):
@@ -70,31 +78,45 @@
             repr(question_job))
 
 
+def make_user_subject_body_headers(factory):
+    user = factory.makePerson()
+    subject = 'email subject'
+    body = 'email body'
+    headers = {'X-Launchpad-Question': 'question metadata'}
+    return user, subject, body, headers
+
+
+def make_question_job(factory, recipient_set=QuestionRecipientSet.SUBSCRIBER,
+                      body=None, question=None, user=None):
+    if question is None:
+        question = factory.makeQuestion()
+    default_user, subject, default_body, headers = (
+        make_user_subject_body_headers(factory))
+    if body is None:
+        body = default_body
+    if user is None:
+        user = default_user
+    contact = factory.makePerson()
+    with person_logged_in(contact):
+        lang_set = getUtility(ILanguageSet)
+        contact.addLanguage(lang_set['en'])
+        question.target.addAnswerContact(contact, contact)
+    return QuestionEmailJob.create(
+        question, user, recipient_set,
+        subject, body, headers)
+
+
 class QuestionEmailJobTestCase(TestCaseWithFactory):
     """Test case for QuestionEmailJob class."""
 
     layer = DatabaseFunctionalLayer
 
-    def makeUserSubjectBodyHeaders(self):
-        user = self.factory.makePerson()
-        subject = 'email subject'
-        body = 'email body'
-        headers = {'X-Launchpad-Question': 'question metadata'}
-        return user, subject, body, headers
-
-    def addAnswerContact(self, question):
-        contact = self.factory.makePerson()
-        with person_logged_in(contact):
-            lang_set = getUtility(ILanguageSet)
-            contact.addLanguage(lang_set['en'])
-            question.target.addAnswerContact(contact, contact)
-        return contact
-
     def test_create(self):
         # The create class method converts the extra job arguments
         # to metadata.
         question = self.factory.makeQuestion()
-        user, subject, body, headers = self.makeUserSubjectBodyHeaders()
+        user, subject, body, headers = make_user_subject_body_headers(
+            self.factory)
         job = QuestionEmailJob.create(
             question, user, QuestionRecipientSet.SUBSCRIBER,
             subject, body, headers)
@@ -124,16 +146,10 @@
             title='title', description='description', owner=asker,
             language=getUtility(ILanguageSet)['en'],
             product=product, distribution=None, sourcepackagename=None)
-        user, subject, ignore, headers = self.makeUserSubjectBodyHeaders()
-        job_1 = QuestionEmailJob.create(
-            question, user, QuestionRecipientSet.SUBSCRIBER,
-            subject, 'one', headers)
-        job_2 = QuestionEmailJob.create(
-            question, user, QuestionRecipientSet.SUBSCRIBER,
-            subject, 'two', headers)
-        job_3 = QuestionEmailJob.create(
-            question, user, QuestionRecipientSet.SUBSCRIBER,
-            subject, 'three', headers)
+        job_1 = make_question_job(self.factory, question=question, body='one')
+        job_2 = make_question_job(self.factory, question=question, body='two')
+        job_3 = make_question_job(
+            self.factory, question=question, body='three')
         job_1.start()
         job_1.complete()
         job_ids = [job.id for job in QuestionEmailJob.iterReady()]
@@ -141,35 +157,27 @@
 
     def test_user(self):
         # The user property matches the user passed to create().
-        question = self.factory.makeQuestion()
-        user, subject, body, headers = self.makeUserSubjectBodyHeaders()
-        job = QuestionEmailJob.create(
-            question, user, QuestionRecipientSet.SUBSCRIBER,
-            subject, body, headers)
+        user = self.factory.makePerson()
+        job = make_question_job(self.factory, user=user)
         self.assertEqual(user, job.user)
 
     def test_subject(self):
         # The subject property matches the subject passed to create().
-        question = self.factory.makeQuestion()
-        user, subject, body, headers = self.makeUserSubjectBodyHeaders()
-        job = QuestionEmailJob.create(
-            question, user, QuestionRecipientSet.SUBSCRIBER,
-            subject, body, headers)
+        body = 'email body'
+        job = make_question_job(self.factory, body=body)
         self.assertEqual(body, job.body)
 
     def test_body(self):
         # The body property matches the body passed to create().
-        question = self.factory.makeQuestion()
-        user, subject, body, headers = self.makeUserSubjectBodyHeaders()
-        job = QuestionEmailJob.create(
-            question, user, QuestionRecipientSet.SUBSCRIBER,
-            subject, body, headers)
+        body = 'email body'
+        job = make_question_job(self.factory, body=body)
         self.assertEqual(body, job.body)
 
     def test_headers(self):
         # The headers property matches the headers passed to create().
         question = self.factory.makeQuestion()
-        user, subject, body, headers = self.makeUserSubjectBodyHeaders()
+        user, subject, body, headers = make_user_subject_body_headers(
+            self.factory)
         job = QuestionEmailJob.create(
             question, user, QuestionRecipientSet.SUBSCRIBER,
             subject, body, headers)
@@ -178,7 +186,8 @@
     def test_from_address(self):
         # The from_address is the question with the user displayname.
         question = self.factory.makeQuestion()
-        user, subject, body, headers = self.makeUserSubjectBodyHeaders()
+        user, subject, body, headers = make_user_subject_body_headers(
+            self.factory)
         job = QuestionEmailJob.create(
             question, user, QuestionRecipientSet.SUBSCRIBER,
             subject, body, headers)
@@ -189,17 +198,14 @@
 
     def test_log_name(self):
         # The log_name property matches the class name.
-        question = self.factory.makeQuestion()
-        user, subject, body, headers = self.makeUserSubjectBodyHeaders()
-        job = QuestionEmailJob.create(
-            question, user, QuestionRecipientSet.SUBSCRIBER,
-            subject, body, headers)
+        job = make_question_job(self.factory)
         self.assertEqual(job.__class__.__name__, job.log_name)
 
     def test_getOopsVars(self):
         # The getOopsVars() method adds the question and user to the vars.
         question = self.factory.makeQuestion()
-        user, subject, body, headers = self.makeUserSubjectBodyHeaders()
+        user, subject, body, headers = make_user_subject_body_headers(
+            self.factory)
         job = QuestionEmailJob.create(
             question, user, QuestionRecipientSet.SUBSCRIBER,
             subject, body, headers)
@@ -209,104 +215,68 @@
 
     def test_getErrorRecipients(self):
         # The getErrorRecipients method matches the user.
-        question = self.factory.makeQuestion()
-        user, subject, body, headers = self.makeUserSubjectBodyHeaders()
-        job = QuestionEmailJob.create(
-            question, user, QuestionRecipientSet.SUBSCRIBER,
-            subject, body, headers)
+        job = make_question_job(self.factory)
         self.assertEqual(
             [format_address_for_person(job.user)], job.getErrorRecipients())
 
     def test_recipients_asker(self):
         # The recipients property contains the question owner.
-        question = self.factory.makeQuestion()
-        self.addAnswerContact(question)
-        user, subject, body, headers = self.makeUserSubjectBodyHeaders()
-        job = QuestionEmailJob.create(
-            question, user, QuestionRecipientSet.ASKER,
-            subject, body, headers)
+        job = make_question_job(self.factory, QuestionRecipientSet.ASKER)
         recipients = [
             person for email, person in job.recipients.getRecipientPersons()]
         self.assertEqual(1, len(recipients))
-        self.assertEqual(question.owner, recipients[0])
+        self.assertEqual(job.question.owner, recipients[0])
 
     def test_recipients_subscriber(self):
         # The recipients property matches the question recipients,
         # excluding the question owner.
-        question = self.factory.makeQuestion()
-        self.addAnswerContact(question)
-        user, subject, body, headers = self.makeUserSubjectBodyHeaders()
-        job = QuestionEmailJob.create(
-            question, user, QuestionRecipientSet.SUBSCRIBER,
-            subject, body, headers)
+        job = make_question_job(self.factory)
         recipients = [
             person for email, person in job.recipients.getRecipientPersons()]
-        self.assertFalse(question.owner in recipients)
+        self.assertFalse(job.question.owner in recipients)
         question_recipients = [
-            person
-            for em, person in question.getRecipients().getRecipientPersons()
-            if person != question.owner]
+            person for em, person in
+            job.question.getRecipients().getRecipientPersons()
+            if person != job.question.owner]
         self.assertContentEqual(
             question_recipients, recipients)
 
     def test_recipients_asker_subscriber(self):
         # The recipients property matches the question recipients.
-        question = self.factory.makeQuestion()
-        self.addAnswerContact(question)
-        user, subject, body, headers = self.makeUserSubjectBodyHeaders()
-        job = QuestionEmailJob.create(
-            question, user, QuestionRecipientSet.ASKER_SUBSCRIBER,
-            subject, body, headers)
+        job = make_question_job(
+            self.factory, QuestionRecipientSet.ASKER_SUBSCRIBER)
         self.assertContentEqual(
-            question.getRecipients().getRecipientPersons(),
+            job.question.getRecipients().getRecipientPersons(),
             job.recipients.getRecipientPersons())
 
     def test_recipients_contact(self):
         # The recipients property matches the question target answer contacts.
-        question = self.factory.makeQuestion()
-        self.addAnswerContact(question)
-        user, subject, body, headers = self.makeUserSubjectBodyHeaders()
-        job = QuestionEmailJob.create(
-            question, user, QuestionRecipientSet.CONTACT,
-            subject, body, headers)
+        job = make_question_job(
+            self.factory, QuestionRecipientSet.CONTACT)
         recipients = [
             person for email, person in job.recipients.getRecipientPersons()]
         self.assertContentEqual(
-            question.target.getAnswerContactRecipients(None),
+            job.question.target.getAnswerContactRecipients(None),
             recipients)
 
     def test_buildBody_with_separator(self):
         # A body with a separator is preserved.
-        question = self.factory.makeQuestion()
-        user, subject, body, headers = self.makeUserSubjectBodyHeaders()
-        body = 'body\n-- '
-        job = QuestionEmailJob.create(
-            question, user, QuestionRecipientSet.SUBSCRIBER,
-            subject, body, headers)
+        job = make_question_job(self.factory, body='body\n-- ')
         formatted_body = job.buildBody('rationale')
         self.assertEqual(
             'body\n-- \nrationale', formatted_body)
 
     def test_buildBody_without_separator(self):
         # A separator will added to body if one is not present.
-        question = self.factory.makeQuestion()
-        user, subject, body, headers = self.makeUserSubjectBodyHeaders()
-        body = 'body -- mdash'
-        job = QuestionEmailJob.create(
-            question, user, QuestionRecipientSet.SUBSCRIBER,
-            subject, body, headers)
+        job = make_question_job(self.factory, body='body -- mdash')
         formatted_body = job.buildBody('rationale')
         self.assertEqual(
             'body -- mdash\n-- \nrationale', formatted_body)
 
     def test_buildBody_wrapping(self):
         # The rationale is wrapped and added to the body.
-        question = self.factory.makeQuestion()
-        user, subject, body, headers = self.makeUserSubjectBodyHeaders()
         body = 'body\n-- '
-        job = QuestionEmailJob.create(
-            question, user, QuestionRecipientSet.SUBSCRIBER,
-            subject, body, headers)
+        job = make_question_job(self.factory, body=body)
         rationale_1 = (
             'You received this email because you are indirectly subscribed '
             'to this')
@@ -319,42 +289,36 @@
 
     def test_run(self):
         # The email is sent to all the recipients.
-        question = self.factory.makeQuestion()
-        self.addAnswerContact(question)
-        user, subject, body, headers = self.makeUserSubjectBodyHeaders()
-        job = QuestionEmailJob.create(
-            question, user, QuestionRecipientSet.ASKER_SUBSCRIBER,
-            subject, body, headers)
+        job = make_question_job(
+            self.factory, QuestionRecipientSet.ASKER_SUBSCRIBER)
         logger = BufferLogger()
         with log.use(logger):
             job.run()
         self.assertEqual(
             ["DEBUG QuestionEmailJob will send email for question %s." %
-             question.id,
+             job.question.id,
              "DEBUG QuestionEmailJob has sent email for question %s." %
-             question.id],
+             job.question.id],
             logger.getLogBuffer().splitlines())
         transaction.commit()
         self.assertEqual(2, len(stub.test_emails))
 
     def test_run_cronscript(self):
         # The cronscript is configured: schema-lazr.conf and security.cfg.
-        question = self.factory.makeQuestion()
+        job = make_question_job(
+            self.factory, QuestionRecipientSet.ASKER_SUBSCRIBER)
+        question = job.question
         with person_logged_in(question.target.owner):
             question.linkBug(self.factory.makeBug(product=question.target))
             question.linkFAQ(
                 question.target.owner,
                 self.factory.makeFAQ(target=question.target),
                 'test FAQ link')
-        self.addAnswerContact(question)
-        user, subject, body, headers = self.makeUserSubjectBodyHeaders()
+        user = job.user
         with person_logged_in(user):
             lang_set = getUtility(ILanguageSet)
             user.addLanguage(lang_set['en'])
             question.target.addAnswerContact(user, user)
-        job = QuestionEmailJob.create(
-            question, user, QuestionRecipientSet.ASKER_SUBSCRIBER,
-            subject, body, headers)
         transaction.commit()
 
         out, err, exit_code = run_script(
@@ -372,3 +336,24 @@
             'Cound not find "%s" in err log:\n%s.' % (message, err))
         IStore(job.job).invalidate()
         self.assertEqual(JobStatus.COMPLETED, job.job.status)
+
+
+class TestViaCelery(TestCaseWithFactory):
+
+    layer = CeleryJobLayer
+
+    def test_run(self):
+        # The email is sent to all the recipients.
+        # Create the question before turning on the feature flag to avoid
+        # running two jobs via Celery.
+        question = self.factory.makeQuestion()
+        self.useFixture(FeatureFixture({
+            'jobs.celery.enabled_classes': 'QuestionEmailJob',
+        }))
+        make_question_job(
+            self.factory, QuestionRecipientSet.ASKER_SUBSCRIBER,
+            question=question)
+        with block_on_job(self):
+            transaction.commit()
+        transaction.commit()
+        self.assertEqual(2, len(pop_remote_notifications()))

=== modified file 'lib/lp/services/job/model/job.py'
--- lib/lp/services/job/model/job.py	2012-04-26 15:57:11 +0000
+++ lib/lp/services/job/model/job.py	2012-04-26 15:57:16 +0000
@@ -277,7 +277,9 @@
             BranchMergeProposalJob,
             )
         from lp.registry.model.persontransferjob import PersonTransferJob
+        from lp.answers.model.questionjob import QuestionJob
         from lp.soyuz.model.distributionjob import DistributionJob
+        from lp.soyuz.model.packagecopyjob import PackageCopyJob
         from lp.translations.model.pofilestatsjob import POFileStatsJob
         from lp.translations.model.translationsharingjob import (
             TranslationSharingJob,
@@ -287,7 +289,8 @@
 
         for baseclass in [
             ApportJob, BranchJob, BranchMergeProposalJob, DistributionJob,
-            PersonTransferJob, POFileStatsJob, TranslationSharingJob,
+            PackageCopyJob, PersonTransferJob, POFileStatsJob, QuestionJob,
+            TranslationSharingJob,
             ]:
             derived, base_class, store = cls._getDerived(job_id, baseclass)
             if derived is not None:

=== modified file 'lib/lp/soyuz/model/packagecopyjob.py'
--- lib/lp/soyuz/model/packagecopyjob.py	2012-03-16 11:09:03 +0000
+++ lib/lp/soyuz/model/packagecopyjob.py	2012-04-26 15:57:16 +0000
@@ -40,6 +40,7 @@
 from lp.registry.interfaces.pocket import PackagePublishingPocket
 from lp.registry.interfaces.sourcepackagename import ISourcePackageNameSet
 from lp.registry.model.distroseries import DistroSeries
+from lp.services.config import config
 from lp.services.database import bulk
 from lp.services.database.decoratedresultset import DecoratedResultSet
 from lp.services.database.enumcol import EnumCol
@@ -51,7 +52,10 @@
 from lp.services.job.interfaces.job import (
     JobStatus,
     )
-from lp.services.job.model.job import Job
+from lp.services.job.model.job import (
+    EnumeratedSubclass,
+    Job,
+    )
 from lp.services.job.runner import BaseRunnableJob
 from lp.soyuz.adapters.overrides import (
     FromExistingOverridePolicy,
@@ -170,10 +174,15 @@
         """See `IPackageCopyJob`."""
         return self.metadata.get("section_override")
 
+    def makeDerived(self):
+        return PackageCopyJobDerived.makeSubclass(self)
+
 
 class PackageCopyJobDerived(BaseRunnableJob):
     """Abstract class for deriving from PackageCopyJob."""
 
+    __metaclass__ = EnumeratedSubclass
+
     delegates(IPackageCopyJob)
 
     def __init__(self, job):
@@ -235,6 +244,7 @@
 
     class_job_type = PackageCopyJobType.PLAIN
     classProvides(IPlainPackageCopyJobSource)
+    config = config.IPlainPackageCopyJobSource
 
     @classmethod
     def _makeMetadata(cls, target_pocket, package_version,
@@ -272,7 +282,9 @@
             metadata=metadata,
             requester=requester)
         IMasterStore(PackageCopyJob).add(job)
-        return cls(job)
+        derived = cls(job)
+        derived.celeryRunOnCommit()
+        return derived
 
     @classmethod
     def _composeJobInsertionTuple(cls, target_distroseries, copy_policy,

=== modified file 'lib/lp/soyuz/tests/test_packagecopyjob.py'
--- lib/lp/soyuz/tests/test_packagecopyjob.py	2012-03-16 11:09:03 +0000
+++ lib/lp/soyuz/tests/test_packagecopyjob.py	2012-04-26 15:57:16 +0000
@@ -1,4 +1,4 @@
-# Copyright 2010-2011 Canonical Ltd.  This software is licensed under the
+# Copyright 2010-2012 Canonical Ltd.  This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
 """Tests for sync package jobs."""
@@ -27,6 +27,10 @@
 from lp.services.job.interfaces.job import (
     JobStatus,
     )
+from lp.services.job.tests import (
+    block_on_job,
+    pop_remote_notifications,
+    )
 from lp.services.webapp.testing import verifyObject
 from lp.soyuz.adapters.overrides import SourceOverride
 from lp.soyuz.enums import (
@@ -56,6 +60,7 @@
 from lp.soyuz.model.queue import PackageUpload
 from lp.soyuz.tests.test_publishing import SoyuzTestPublisher
 from lp.testing import (
+    celebrity_logged_in,
     person_logged_in,
     run_script,
     TestCaseWithFactory,
@@ -63,6 +68,7 @@
 from lp.testing.dbuser import switch_dbuser
 from lp.testing.fakemethod import FakeMethod
 from lp.testing.layers import (
+    CeleryJobLayer,
     LaunchpadFunctionalLayer,
     LaunchpadZopelessLayer,
     ZopelessDatabaseLayer,
@@ -78,6 +84,48 @@
         DistroSeriesDifferenceComment.distro_series_difference == dsd)
 
 
+def create_proper_job(factory):
+    """Create a job that will complete successfully."""
+    publisher = SoyuzTestPublisher()
+    with celebrity_logged_in('admin'):
+        publisher.prepareBreezyAutotest()
+    distroseries = publisher.breezy_autotest
+
+    # Synchronise from breezy-autotest to a brand new distro derived
+    # from breezy.
+    breezy_archive = factory.makeArchive(
+        distroseries.distribution, purpose=ArchivePurpose.PRIMARY)
+    dsp = factory.makeDistroSeriesParent(parent_series=distroseries)
+    target_series = dsp.derived_series
+    target_archive = factory.makeArchive(
+        target_series.distribution, purpose=ArchivePurpose.PRIMARY)
+    getUtility(ISourcePackageFormatSelectionSet).add(
+        target_series, SourcePackageFormat.FORMAT_1_0)
+
+    publisher.getPubSource(
+        distroseries=distroseries, sourcename="libc",
+        version="2.8-1", status=PackagePublishingStatus.PUBLISHED,
+        archive=breezy_archive)
+    # The target archive needs ancestry so the package is
+    # auto-accepted.
+    publisher.getPubSource(
+        distroseries=target_series, sourcename="libc",
+        version="2.8-0", status=PackagePublishingStatus.PUBLISHED,
+        archive=target_archive)
+
+    source = getUtility(IPlainPackageCopyJobSource)
+    requester = factory.makePerson()
+    with person_logged_in(target_archive.owner):
+        target_archive.newComponentUploader(requester, "main")
+    return source.create(
+        package_name="libc",
+        source_archive=breezy_archive, target_archive=target_archive,
+        target_distroseries=target_series,
+        target_pocket=PackagePublishingPocket.RELEASE,
+        package_version="2.8-1", include_binaries=False,
+        requester=requester)
+
+
 class LocalTestHelper:
     """Put test helpers that want to be in the test classes here."""
 
@@ -355,62 +403,26 @@
         # Turn on DSD jobs.
         self.useFixture(FeatureFixture({FEATURE_FLAG_ENABLE_MODULE: 'on'}))
 
-        publisher = SoyuzTestPublisher()
-        publisher.prepareBreezyAutotest()
-        distroseries = publisher.breezy_autotest
-
-        # Synchronise from breezy-autotest to a brand new distro derived
-        # from breezy.
-        breezy_archive = self.factory.makeArchive(
-            distroseries.distribution, purpose=ArchivePurpose.PRIMARY)
-        dsp = self.factory.makeDistroSeriesParent(parent_series=distroseries)
-        target_series = dsp.derived_series
-        target_archive = self.factory.makeArchive(
-            target_series.distribution, purpose=ArchivePurpose.PRIMARY)
-        getUtility(ISourcePackageFormatSelectionSet).add(
-            target_series, SourcePackageFormat.FORMAT_1_0)
-
-        publisher.getPubSource(
-            distroseries=distroseries, sourcename="libc",
-            version="2.8-1", status=PackagePublishingStatus.PUBLISHED,
-            archive=breezy_archive)
-        # The target archive needs ancestry so the package is
-        # auto-accepted.
-        publisher.getPubSource(
-            distroseries=target_series, sourcename="libc",
-            version="2.8-0", status=PackagePublishingStatus.PUBLISHED,
-            archive=target_archive)
-
-        source = getUtility(IPlainPackageCopyJobSource)
-        requester = self.factory.makePerson()
-        with person_logged_in(target_archive.owner):
-            target_archive.newComponentUploader(requester, "main")
-        job = source.create(
-            package_name="libc",
-            source_archive=breezy_archive, target_archive=target_archive,
-            target_distroseries=target_series,
-            target_pocket=PackagePublishingPocket.RELEASE,
-            package_version="2.8-1", include_binaries=False,
-            requester=requester)
+        job = create_proper_job(self.factory)
         self.assertEqual("libc", job.package_name)
         self.assertEqual("2.8-1", job.package_version)
 
         switch_dbuser(self.dbuser)
-        job.run()
-
-        published_sources = target_archive.getPublishedSources(
-            name="libc", version="2.8-1")
-        self.assertIsNot(None, published_sources.any())
-
-        # The copy should have sent an email too. (see
-        # soyuz/scripts/tests/test_copypackage.py for detailed
-        # notification tests)
-        emails = pop_notifications()
-        self.assertTrue(len(emails) > 0)
-
         # Switch back to a db user that has permission to clean up
         # featureflag.
-        switch_dbuser('launchpad_main')
+        self.addCleanup(switch_dbuser, 'launchpad_main')
+        pop_notifications()
+        job.run()
+
+        published_sources = job.target_archive.getPublishedSources(
+            name="libc", version="2.8-1")
+        self.assertIsNot(None, published_sources.any())
+
+        # The copy should have sent an email too. (see
+        # soyuz/scripts/tests/test_copypackage.py for detailed
+        # notification tests)
+        emails = pop_notifications()
+        self.assertEqual(len(emails), 1)
 
     def test_getOopsVars(self):
         distroseries = self.factory.makeDistroSeries()
@@ -1234,6 +1246,37 @@
         self.assertEqual(PackageUploadStatus.REJECTED, pu.status)
 
 
+class TestViaCelery(TestCaseWithFactory):
+    """PackageCopyJob runs under Celery."""
+
+    layer = CeleryJobLayer
+
+    def test_run(self):
+        # A proper test run synchronizes packages.
+        # Turn on DSD jobs.
+        self.useFixture(FeatureFixture({
+            FEATURE_FLAG_ENABLE_MODULE: 'on',
+            'jobs.celery.enabled_classes': 'PlainPackageCopyJob',
+        }))
+
+        job = create_proper_job(self.factory)
+        self.assertEqual("libc", job.package_name)
+        self.assertEqual("2.8-1", job.package_version)
+
+        with block_on_job(self):
+            transaction.commit()
+
+        published_sources = job.target_archive.getPublishedSources(
+            name="libc", version="2.8-1")
+        self.assertIsNot(None, published_sources.any())
+
+        # The copy should have sent an email too. (see
+        # soyuz/scripts/tests/test_copypackage.py for detailed
+        # notification tests)
+        emails = pop_remote_notifications()
+        self.assertEqual(1, len(emails))
+
+
 class TestPlainPackageCopyJobPermissions(TestCaseWithFactory):
 
     layer = LaunchpadFunctionalLayer


Follow ups