← Back to team overview

launchpad-reviewers team mailing list archive

[Merge] lp:~abentley/lazr.jobrunner/add-missing-jobs into lp:lazr.jobrunner

 

Aaron Bentley has proposed merging lp:~abentley/lazr.jobrunner/add-missing-jobs into lp:lazr.jobrunner.

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)

For more details, see:
https://code.launchpad.net/~abentley/lazr.jobrunner/add-missing-jobs/+merge/105680

This branch adds a list_queued function.

It works by draining the queue, not acknowledging the messages, and closing the connection.  This restores the messages, allowing them to be consumed by another consumer.

It is implemented via a drain_queues function, which is also useful for test cases that need to clean up after themselves.

It bumps the version number to 0.6, so that the Launchpad-side code can request
it.
-- 
https://code.launchpad.net/~abentley/lazr.jobrunner/add-missing-jobs/+merge/105680
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~abentley/lazr.jobrunner/add-missing-jobs into lp:lazr.jobrunner.
=== modified file 'NEWS.txt'
--- NEWS.txt	2012-05-11 14:35:54 +0000
+++ NEWS.txt	2012-05-14 15:49:20 +0000
@@ -1,6 +1,11 @@
 News
 ====
 
+0.6
+___
+
+* Support list_queued for celery tasks.
+
 0.5
 ___
 

=== modified file 'setup.py'
--- setup.py	2012-05-11 14:35:54 +0000
+++ setup.py	2012-05-14 15:49:20 +0000
@@ -22,7 +22,7 @@
 NEWS = open(os.path.join(here, 'NEWS.txt')).read()
 
 
-version = '0.5'
+version = '0.6'
 
 install_requires = [
     # List your project dependencies here.
@@ -40,10 +40,10 @@
       # Get strings from http://pypi.python.org/pypi?%3Aaction=list_classifiers
     ],
     keywords='',
-    author='',
-    author_email='',
-    url='',
-    license='',
+    author='Launchpad Developers',
+    author_email='launchpad-dev@xxxxxxxxxxxxxxxxxxx',
+    url='https://launchpad.net/lazr.jobrunner',
+    license='GPL v3',
     packages=find_packages('src'),
     package_dir = {'': 'src'},
     namespace_packages = ['lazr'],

=== modified file 'src/lazr/jobrunner/celerytask.py'
--- src/lazr/jobrunner/celerytask.py	2012-04-11 14:55:53 +0000
+++ src/lazr/jobrunner/celerytask.py	2012-05-14 15:49:20 +0000
@@ -17,8 +17,12 @@
 __metaclass__ = type
 
 
+from functools import partial
+from socket import timeout
+
 from celery.task import Task
-from functools import partial
+from kombu import Consumer, Exchange, Queue
+
 from lazr.jobrunner.jobrunner import (
     JobRunner,
     LeaseHeld,
@@ -59,3 +63,52 @@
 
     def reQueue(self, job_id, fallback_queue):
         self.apply_async(args=(job_id, ), queue=fallback_queue)
+
+
+def list_queued(app, queue_names):
+    """List the queued messages as task/args tuples for a given app.
+
+    :param app: The app to list queues for (affects backend, Queue type,
+        etc.).
+    :param queue_names: Names of the queues to list.
+    """
+    listings = []
+
+    def add_listing(body, message):
+        listings.append((body['task'], body['args']))
+
+    drain_queues(app, queue_names, callbacks=[add_listing], retain=True)
+    return listings
+
+
+def drain_queues(app, queue_names, callbacks=None, retain=False):
+    """Drain the messages from queues.
+
+    :param app: The app to drain queues for (affects backend, Queue type,
+        etc.).
+    :param queue_names: Names of the queues to drain.
+    :param callbacks: Optional list of callbacks to call on each message.
+        Callback must accept (body, message) as parameters.
+    :param retain: After this operation, retain the messages in the queue.
+    """
+    if callbacks is None:
+        callbacks = [lambda x, y: None]
+    bindings = []
+    router = app.amqp.Router()
+    for queue_name in queue_names:
+        destination = router.expand_destination(queue_name)
+        exchange = Exchange(destination['exchange'])
+        queue = Queue(queue_name, exchange=exchange)
+        bindings.append(queue)
+    with app.broker_connection() as connection:
+        # The meaning of no_ack appears to be inverted.
+        # See: https://github.com/ask/kombu/issues/126
+        consumer = Consumer(
+            connection, bindings, callbacks=callbacks, no_ack=not retain)
+        with consumer:
+            try:
+                # Timeout of 0 causes error: [Errno 11] Resource temporarily
+                # unavailable
+                connection.drain_events(timeout=.1 ** 100)
+            except timeout:
+                pass

=== modified file 'src/lazr/jobrunner/tests/test_celerytask.py'
--- src/lazr/jobrunner/tests/test_celerytask.py	2012-05-10 14:18:11 +0000
+++ src/lazr/jobrunner/tests/test_celerytask.py	2012-05-14 15:49:20 +0000
@@ -37,7 +37,11 @@
 
 from celery.exceptions import SoftTimeLimitExceeded
 
-from lazr.jobrunner.celerytask import RunJob
+from lazr.jobrunner.celerytask import (
+    drain_queues,
+    list_queued,
+    RunJob,
+    )
 from lazr.jobrunner.jobrunner import (
     JobStatus,
     )
@@ -441,3 +445,34 @@
                 job_output)
 
         self.assertEqual(JobStatus.FAILED, job.status)
+
+
+class TestListQueues(TestCase):
+    """Tests for list_queued.
+
+    These tests deliberately do not use a celeryd, because we want to ensure
+    that the messages are retained so that they can be listed.
+    """
+
+    queue = 'steve'
+
+    def queue_job(self):
+        RunFileJob.apply_async(args=(10, ), queue=self.queue)
+        self.addCleanup(drain_queues, RunFileJob.app, [self.queue])
+
+    def test_list_queued(self):
+        """When a job is queued, it is listed."""
+        self.queue_job()
+        tasks = list_queued(RunFileJob.app, [self.queue])
+        self.assertEqual(('run_file_job', (10,)), tasks[0])
+
+    def test_list_queued_twice(self):
+        """Listing a job does not remove it from the queue."""
+        self.queue_job()
+        list_queued(RunFileJob.app, [self.queue])
+        tasks = list_queued(RunFileJob.app, [self.queue])
+        self.assertEqual(('run_file_job', (10,)), tasks[0])
+
+    def test_no_tasks(self):
+        """When no jobs are queued, the listing is empty."""
+        self.assertEqual([], list_queued(RunFileJob.app, [self.queue]))

=== modified file 'src/lazr/jobrunner/version.txt'
--- src/lazr/jobrunner/version.txt	2012-05-11 14:35:54 +0000
+++ src/lazr/jobrunner/version.txt	2012-05-14 15:49:20 +0000
@@ -1,1 +1,1 @@
-0.5
+0.6