← Back to team overview

launchpad-reviewers team mailing list archive

[Merge] ~pappacena/turnip:celery-repo-creation into turnip:master

 

Thiago F. Pappacena has proposed merging ~pappacena/turnip:celery-repo-creation into turnip:master with ~pappacena/turnip:async-repo-creation as a prerequisite.

Commit message:
Option on POST /repo API call to asynchronously create a repository using celery

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)

For more details, see:
https://code.launchpad.net/~pappacena/turnip/+git/turnip/+merge/387252
-- 
Your team Launchpad code reviewers is requested to review the proposed merge of ~pappacena/turnip:celery-repo-creation into turnip:master.
diff --git a/Makefile b/Makefile
index ad76c8f..f64d409 100644
--- a/Makefile
+++ b/Makefile
@@ -8,6 +8,7 @@ PIP_CACHE = $(CURDIR)/pip-cache
 PYTHON := $(ENV)/bin/python
 PSERVE := $(ENV)/bin/pserve
 FLAKE8 := $(ENV)/bin/flake8
+CELERY := $(ENV)/bin/celery
 PIP := $(ENV)/bin/pip
 VIRTUALENV := virtualenv
 
@@ -61,7 +62,12 @@ endif
 	$(PIP) install $(PIP_ARGS) -c requirements.txt \
 		-e '.[test,deploy]'
 
-test: $(ENV)
+test-bootstrap:
+	-sudo rabbitmqctl delete_vhost test-vhost
+	-sudo rabbitmqctl add_vhost test-vhost
+	-sudo rabbitmqctl set_permissions -p "test-vhost" "guest" ".*" ".*" ".*"
+
+test: $(ENV) test-bootstrap
 	$(PYTHON) -m unittest discover $(ARGS) turnip
 
 clean:
@@ -98,6 +104,26 @@ run-api: $(ENV)
 run-pack: $(ENV)
 	$(PYTHON) turnipserver.py
 
+run-worker: $(ENV)
+	PYTHONPATH="turnip" $(CELERY) -A tasks worker \
+		--loglevel=info \
+		--concurrency=20 \
+		--pool=gevent
+
+run:
+	make run-api &\
+	make run-pack &\
+	make run-worker&\
+	wait;
+
+stop:
+	-pkill -f 'make run-api'
+	-pkill -f 'make run-pack'
+	-pkill -f 'make run-worker'
+	-pkill -f '$(CELERY) -A tasks worker'
+
+
+
 $(PIP_CACHE): $(ENV)
 	mkdir -p $(PIP_CACHE)
 	$(PIP) install $(PIP_ARGS) -d $(PIP_CACHE) \
diff --git a/config.yaml b/config.yaml
index e078c33..99b0e7b 100644
--- a/config.yaml
+++ b/config.yaml
@@ -20,3 +20,4 @@ cgit_secret_path: null
 openid_provider_root: https://testopenid.test/
 site_name: git.launchpad.test
 main_site_root: https://launchpad.test/
+celery_broker: pyamqp://guest@localhost//
diff --git a/requirements.txt b/requirements.txt
index 365f354..29ca5d2 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -4,9 +4,10 @@ attrs==19.3.0
 Automat==20.2.0
 bcrypt==3.1.7
 beautifulsoup4==4.6.3
+celery==4.4.6
 cffi==1.14.0
 constantly==15.1.0
-contextlib2==0.4.0
+contextlib2==0.6.0
 cornice==3.6.1
 cryptography==2.8
 docutils==0.14
@@ -15,6 +16,7 @@ envdir==0.7
 extras==1.0.0
 fixtures==3.0.0
 flake8==2.4.0
+gevent==20.6.2
 gmpy==1.17
 gunicorn==19.3.0
 hyperlink==19.0.0
@@ -31,6 +33,7 @@ Paste==2.0.2
 PasteDeploy==2.1.0
 pbr==5.4.4
 pep8==1.5.7
+psutil==5.7.0
 pyasn1==0.4.8
 pycparser==2.17
 pycrypto==2.6.1
diff --git a/setup.py b/setup.py
index 6eff51e..0c1f5f0 100755
--- a/setup.py
+++ b/setup.py
@@ -18,9 +18,11 @@ with open(os.path.join(here, 'NEWS')) as f:
     README += "\n\n" + f.read()
 
 requires = [
+    'celery',
     'contextlib2',
     'cornice',
     'enum34; python_version < "3.4"',
+    'gevent',
     'lazr.sshserver>=0.1.7',
     'Paste',
     'pygit2>=0.27.4,<0.28.0',
diff --git a/system-dependencies.txt b/system-dependencies.txt
index 845d453..fdcb1d8 100644
--- a/system-dependencies.txt
+++ b/system-dependencies.txt
@@ -6,3 +6,4 @@ libgit2-27
 libssl-dev
 python-dev
 virtualenv
+rabbitmq-server
diff --git a/turnip/api/store.py b/turnip/api/store.py
index 53b75b5..5e8d801 100644
--- a/turnip/api/store.py
+++ b/turnip/api/store.py
@@ -27,6 +27,7 @@ from pygit2 import (
     Repository,
     )
 
+from turnip.tasks import app
 from turnip.enums import BaseEnum
 from turnip.pack.helpers import ensure_config
 
@@ -214,6 +215,7 @@ class AlreadyExistsError(GitError):
         self.path = path
 
 
+@app.task
 def init_repo(repo_path, clone_from=None, clone_refs=False,
               alternate_repo_paths=None, is_bare=True):
     """Initialise a new git repository or clone from existing."""
diff --git a/turnip/api/views.py b/turnip/api/views.py
index 4aa59cb..e83c390 100644
--- a/turnip/api/views.py
+++ b/turnip/api/views.py
@@ -3,7 +3,6 @@
 
 import os
 import re
-from multiprocessing import Pool
 from subprocess import CalledProcessError
 
 from cornice.resource import resource
@@ -13,16 +12,7 @@ import pyramid.httpexceptions as exc
 
 from turnip.config import config
 from turnip.api import store
-
-# Process pool for async execution (like repository creation).
-processing_pool = None
-
-
-def run_in_background(f, *args, **kwargs):
-    global processing_pool
-    if processing_pool is None:
-        processing_pool = Pool(maxtasksperchild=10)
-    return processing_pool.apply_async(f, args=args, kwds=kwargs)
+from turnip.tests.tasks import startCeleryWorker
 
 
 def is_valid_path(repo_store, repo_path):
@@ -64,6 +54,7 @@ class RepoAPI(BaseAPI):
 
     def collection_post(self):
         """Initialise a new git repository, or clone from an existing repo."""
+        startCeleryWorker(loglevel="error")
         json_data = extract_json_data(self.request)
         repo_path = json_data.get('repo_path')
         clone_path = json_data.get('clone_from')
@@ -88,7 +79,7 @@ class RepoAPI(BaseAPI):
             kwargs = dict(
                 repo_path=repo, clone_from=repo_clone, clone_refs=clone_refs)
             if async_run:
-                run_in_background(store.init_repo, **kwargs)
+                store.init_repo.apply_async(kwargs=kwargs)
             else:
                 store.init_repo(**kwargs)
             repo_name = os.path.basename(os.path.normpath(repo))
diff --git a/turnip/tasks.py b/turnip/tasks.py
new file mode 100644
index 0000000..51b5d20
--- /dev/null
+++ b/turnip/tasks.py
@@ -0,0 +1,16 @@
+# Copyright 2020 Canonical Ltd.  This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+from __future__ import print_function, unicode_literals, absolute_import
+
+__all__ = [
+    'app'
+]
+
+from celery import Celery
+
+from turnip.config import config
+
+
+app = Celery('tasks', broker=config.get('celery_broker'))
+app.conf.update(imports=('turnip.api.store', ))
diff --git a/turnip/tests/__init__.py b/turnip/tests/__init__.py
index 494f3fd..6be77e3 100644
--- a/turnip/tests/__init__.py
+++ b/turnip/tests/__init__.py
@@ -8,6 +8,8 @@ from __future__ import (
     )
 
 from turnip.tests.logging import setupLogger
+from turnip.tests.tasks import setupCelery
 
 
 setupLogger()
+setupCelery()
diff --git a/turnip/tests/tasks.py b/turnip/tests/tasks.py
new file mode 100644
index 0000000..0543ce3
--- /dev/null
+++ b/turnip/tests/tasks.py
@@ -0,0 +1,43 @@
+# Copyright 2020 Canonical Ltd.  This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+import os
+import subprocess
+import sys
+
+import atexit
+
+from turnip.tasks import app
+
+BROKER_URL = 'pyamqp://guest@localhost/test-vhost'
+worker_proc = None
+
+
+def setupCelery():
+    app.conf.update(broker_url=BROKER_URL)
+
+
+def startCeleryWorker(loglevel="info"):
+    """Start a celery worker for test.
+
+    :param quiet: If True, do not output celery worker on stdout.
+    """
+    global worker_proc
+    if worker_proc is not None:
+        return
+    bin_path = os.path.dirname(sys.executable)
+    celery = os.path.join(bin_path, 'celery')
+    turnip_path = os.path.join(os.path.dirname(__file__), '..')
+    cmd = [
+        celery, 'worker', '-A', 'tasks', '--quiet',
+        '--pool=gevent',
+        '--concurrency=2',
+        '--broker=%s' % BROKER_URL,
+        '--loglevel=%s' % loglevel]
+    worker_proc = subprocess.Popen(cmd, env={'PYTHONPATH': turnip_path})
+    atexit.register(stopCeleryWorker)
+
+
+def stopCeleryWorker():
+    if worker_proc is not None:
+        worker_proc.kill()

Follow ups