← Back to team overview

launchpad-reviewers team mailing list archive

[Merge] ~pappacena/turnip:celery-test-fixture into turnip:master


Thiago F. Pappacena has proposed merging ~pappacena/turnip:celery-test-fixture into turnip:master.

Commit message:
Celery test fixture

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)

For more details, see:

Celery test helpers were originally implemented in https://code.launchpad.net/~pappacena/turnip/+git/turnip/+merge/387691. I'm extracting them so they can be merged before that feature lands, which unblocks the development of other features that depend on Celery.
Your team Launchpad code reviewers is requested to review the proposed merge of ~pappacena/turnip:celery-test-fixture into turnip:master.
diff --git a/turnip/api/store.py b/turnip/api/store.py
index f4df7e9..9387c4c 100644
--- a/turnip/api/store.py
+++ b/turnip/api/store.py
@@ -114,6 +114,34 @@ def write_alternates(repo_path, alternate_repo_paths):
 object_dir_re = re.compile(r'\A[0-9a-f][0-9a-f]\Z')
+def copy_ref(from_root, from_ref, to_root, to_ref=None):
+    """Copy a single ref from one git repository to another.
+    If `to_ref` is None, the destination ref gets the same name as
+    `from_ref`.
+    This is implemented by running the git client's "git fetch" command
+    inside the destination repository, since it's way easier than trying
+    to copy the refs, commits and objects manually using pygit.
+    Note that the refspec is passed without a leading "+", so git itself
+    decides whether an already existing `to_ref` may be updated.
+    :param from_root: The root directory of the source git repository.
+    :param from_ref: The source ref name.
+    :param to_root: The root directory of the destination git repository.
+    :param to_ref: The destination ref name. If None, copy_ref will use the
+                   source ref name.
+    :raises GitError: if "git fetch" exits with a non-zero status. The
+                      message carries whatever git wrote to stderr.
+    """
+    if to_ref is None:
+        to_ref = from_ref
+    cmd = [
+        b'git', b'fetch', b'--no-tags',
+        from_root, b'%s:%s' % (from_ref, to_ref)
+    ]
+    proc = subprocess.Popen(
+        cmd, cwd=to_root, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+    # Drain both pipes while waiting for git to finish. Calling wait()
+    # with unread PIPEs can deadlock once the child fills a pipe buffer.
+    _, stderr = proc.communicate()
+    if proc.returncode != 0:
+        raise GitError("Error copying refs: %s" % stderr)
 def copy_refs(from_root, to_root):
     """Copy refs from one .git directory to another.
diff --git a/turnip/api/tests/test_store.py b/turnip/api/tests/test_store.py
index 02ed3fd..45be9ad 100644
--- a/turnip/api/tests/test_store.py
+++ b/turnip/api/tests/test_store.py
@@ -95,8 +95,9 @@ class InitTestCase(TestCase):
     def makeOrig(self):
         self.orig_path = os.path.join(self.repo_store, 'orig/')
-        orig = RepoFactory(
-            self.orig_path, num_branches=3, num_commits=2, num_tags=2).build()
+        self.orig_factory = RepoFactory(
+            self.orig_path, num_branches=3, num_commits=2, num_tags=2)
+        orig = self.orig_factory.build()
         self.orig_refs = {}
         for ref in orig.references.objects:
             obj = orig[ref.target]
@@ -356,3 +357,51 @@ class InitTestCase(TestCase):
             self.orig_refs, os.path.join(to_path, 'turnip-subordinate'))
             packed_refs, os.path.join(too_path, 'turnip-subordinate'))
+    def test_copy_ref(self):
+        """copy_ref copies one ref, and the objects behind it, across repos."""
+        self.makeOrig()
+        source_path = self.orig_path
+        source_repo = self.orig_factory.repo
+        source_ref = 'refs/heads/new-branch'
+        target_ref = 'refs/merge/123'
+        # Branch off master's tip in the source repository and put one
+        # commit on top of it.
+        tip = source_repo.references['refs/heads/master'].target.hex
+        source_repo.create_branch('new-branch', source_repo[tip])
+        commit_oid = self.orig_factory.add_commit(
+            b'foobar file content', 'foobar.txt', parents=[tip],
+            ref=source_ref)
+        blob_id = source_repo[commit_oid].tree[0].id
+        # The destination repository must start out without any ref.
+        target_path = os.path.join(self.repo_store, 'to/')
+        store.init_repo(target_path, clone_from=source_path)
+        target_repo = pygit2.Repository(target_path)
+        self.assertEqual([], target_repo.references.objects)
+        def assert_copied(expected_blob_id, expected_content):
+            # Exactly one ref, named target_ref, pointing at the same
+            # object as the source ref, with the blob readable locally.
+            copied = target_repo.references.objects
+            self.assertEqual(1, len(copied))
+            self.assertEqual(target_ref, copied[0].name)
+            self.assertEqual(
+                source_repo.references[source_ref].target,
+                target_repo.references[target_ref].target)
+            self.assertEqual(
+                expected_content, target_repo[expected_blob_id].data)
+        store.copy_ref(source_path, source_ref, target_path, target_ref)
+        assert_copied(blob_id, b'foobar file content')
+        # Updating and copying again should work.
+        commit_oid = self.orig_factory.add_commit(
+            b'changed foobar content', 'foobar.txt', parents=[commit_oid],
+            ref=source_ref)
+        blob_id = source_repo[commit_oid].tree[0].id
+        store.copy_ref(source_path, source_ref, target_path, target_ref)
+        assert_copied(blob_id, b'changed foobar content')
diff --git a/turnip/tests/__init__.py b/turnip/tests/__init__.py
index 494f3fd..775ba63 100644
--- a/turnip/tests/__init__.py
+++ b/turnip/tests/__init__.py
@@ -8,6 +8,7 @@ from __future__ import (
 from turnip.tests.logging import setupLogger
+from turnip.tests.tasks import setupCelery
diff --git a/turnip/tests/tasks.py b/turnip/tests/tasks.py
new file mode 100644
index 0000000..658cce5
--- /dev/null
+++ b/turnip/tests/tasks.py
@@ -0,0 +1,98 @@
+# Copyright 2020 Canonical Ltd.  This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+import atexit
+from datetime import datetime, timedelta
+import os
+import subprocess
+import sys
+import time
+from testtools.testcase import fixtures
+from turnip.config import config
+from turnip.tasks import app
+# Broker URL shared by setupCelery() and by the worker subprocess that
+# CeleryWorkerFixture starts (handed over through its --broker option).
+BROKER_URL = 'pyamqp://guest@localhost/turnip-test-vhost'
+def setupCelery():
+    """Point the turnip celery app at the test broker (BROKER_URL)."""
+    overrides = {'broker_url': BROKER_URL}
+    app.conf.update(**overrides)
+class CeleryWorkerFixture(fixtures.Fixture):
+    """Celery worker fixture for tests.
+    This fixture starts a celery worker when it is set up and stops it
+    when it is cleaned up. Keep in mind that the worker runs in a separate
+    new process, so state that only exists in the test process (mock
+    patches, for example) will be lost.
+    The running worker is tracked in a class attribute, so at most one
+    worker process is managed at a time, whatever the number of fixture
+    instances.
+    """
+    # Popen handle of the running worker, shared by every instance of the
+    # fixture; None while no worker is running.
+    _worker_proc = None
+    def __init__(self, loglevel="error", force_restart=True, env=None):
+        """
+        Build a celery worker for test cases.
+        :param loglevel: Which log level to use for the worker.
+        :param force_restart: If True and a celery worker is already running,
+            stop it. If False, do not restart if another worker is
+            already running.
+        :param env: The environment variables to be used when creating
+            the worker. They take precedence over the variables derived
+            from the turnip configuration.
+        """
+        super(CeleryWorkerFixture, self).__init__()
+        self.force_restart = force_restart
+        self.loglevel = loglevel
+        self.env = env
+    def startCeleryWorker(self):
+        """Start a celery worker for integration tests.
+        Does nothing when a worker is already running and `force_restart`
+        is False.
+        """
+        if self.force_restart:
+            self.stopCeleryWorker()
+        if CeleryWorkerFixture._worker_proc is not None:
+            return
+        bin_path = os.path.dirname(sys.executable)
+        celery = os.path.join(bin_path, 'celery')
+        turnip_path = os.path.join(os.path.dirname(__file__), '..')
+        cmd = [
+            celery, 'worker', '-A', 'tasks', '--quiet',
+            '--pool=gevent',
+            '--concurrency=20',
+            '--broker=%s' % BROKER_URL,
+            '--loglevel=%s' % self.loglevel]
+        # Send to the subprocess, as env variables, the same configurations we
+        # are currently using. The environment is built from scratch: the
+        # variables of the current process are not inherited.
+        proc_env = {'PYTHONPATH': turnip_path}
+        for k in config.defaults:
+            proc_env[k.upper()] = str(config.get(k))
+        proc_env.update(self.env or {})
+        CeleryWorkerFixture._worker_proc = subprocess.Popen(cmd, env=proc_env)
+        # Safety net, so the worker does not outlive the test run.
+        atexit.register(self.stopCeleryWorker)
+    def stopCeleryWorker(self):
+        """Kill the running worker, if any, and purge the task queue."""
+        worker_proc = CeleryWorkerFixture._worker_proc
+        if worker_proc:
+            worker_proc.kill()
+            worker_proc.wait()
+        CeleryWorkerFixture._worker_proc = None
+        # Cleanup the queue.
+        app.control.purge()
+    def waitUntil(self, seconds, callable, *args, **kwargs):
+        """Poll callable(*args, **kwargs) until it returns a true value.
+        The callable is tried every 0.2 seconds.
+        :param seconds: How long to keep trying, in seconds.
+        :param callable: The function to poll.
+        :raises AttributeError: if no call returned a true value within
+            `seconds` seconds.
+        """
+        deadline = datetime.now() + timedelta(seconds=seconds)
+        while datetime.now() < deadline:
+            if callable(*args, **kwargs):
+                return
+            time.sleep(0.2)
+        # func_name only exists on Python 2 functions. __name__ works on
+        # Python 2 and 3, and repr() covers callables that have no name at
+        # all (functools.partial objects, for instance).
+        name = getattr(callable, '__name__', repr(callable))
+        raise AttributeError(
+            "%s(*%s, **%s) never returned True after %s seconds" %
+            (name, args, kwargs, seconds))
+    def _setUp(self):
+        self.startCeleryWorker()
+        # Fixture.cleanUp() only runs callbacks registered with
+        # addCleanup(); it does not look for a _cleanup() method.
+        self.addCleanup(self.stopCeleryWorker)
+    def _cleanup(self):
+        """Stop the worker. Kept for callers that invoke it directly."""
+        self.stopCeleryWorker()