← Back to team overview

launchpad-reviewers team mailing list archive

[Merge] lp:~julian-edwards/maas/beat-scheduler-startup into lp:maas

 

Julian Edwards has proposed merging lp:~julian-edwards/maas/beat-scheduler-startup into lp:maas.

Requested reviews:
  MAAS Maintainers (maas-maintainers)

For more details, see:
https://code.launchpad.net/~julian-edwards/maas/beat-scheduler-startup/+merge/120687

This adds code to create Celery tasks that are run once on startup (if you run celeryd -B or celerybeat).  It works by creating a new scheduler class for the beat framework which customises the first run (which is normally at the *end* of the first period) to run immediately and prevents further runs.

The Celery beat stuff is pretty weird.  I found by trial and error that, with this new Entry/Scheduler, tasks only run once if they are defined in the scheduler's install_default_entries() method!  Putting them in CELERYBEAT_SCHEDULE means they get assigned to the stock Entry class and are run regularly.

This actually works to our advantage because it means we can define other beat tasks in the future if we want.  We can just put all the start-up code in tasks.startup() and everything else works as normal.

We will have to be REALLY careful when defining any of these tasks, though, because they can run on *any* celeryd.  I added an XXX to remind us to update the code when doing the scaling work that adds more workers.
-- 
https://code.launchpad.net/~julian-edwards/maas/beat-scheduler-startup/+merge/120687
Your team MAAS Maintainers is requested to review the proposed merge of lp:~julian-edwards/maas/beat-scheduler-startup into lp:maas.
=== modified file 'etc/celeryconfig.py'
--- etc/celeryconfig.py	2012-08-01 13:51:26 +0000
+++ etc/celeryconfig.py	2012-08-22 02:08:18 +0000
@@ -52,8 +52,15 @@
     "provisioningserver.tasks",
 )
 
+# Acknowledge a task message only after the task has run, rather than
+# when a worker receives it, so that a task is not lost if its worker
+# dies before running it to completion.
 CELERY_ACKS_LATE = True
 
 # Do not store the tasks' return values (aka tombstones);
 # This improves performance.
 CELERY_IGNORE_RESULT = True
+
+# Use the MAAS beat scheduler, which additionally runs the
+# provisioningserver.tasks.startup task once when beat starts.
+CELERYBEAT_SCHEDULER = "provisioningserver.beatscheduler.StartupScheduler"

=== added file 'src/provisioningserver/beatscheduler.py'
--- src/provisioningserver/beatscheduler.py	1970-01-01 00:00:00 +0000
+++ src/provisioningserver/beatscheduler.py	2012-08-22 02:08:18 +0000
@@ -0,0 +1,124 @@
+# Copyright 2012 Canonical Ltd.  This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+"""Celery beat schedulers for MAAS."""
+
+from __future__ import (
+    absolute_import,
+    print_function,
+    unicode_literals,
+    )
+
+__metaclass__ = type
+__all__ = [
+    "ImmediateFirstEntry",
+    "StartupScheduler",
+    ]
+
+from datetime import (
+    datetime,
+    timedelta,
+    )
+
+from celery.beat import (
+    ScheduleEntry,
+    Scheduler,
+    )
+from celery.log import get_default_logger
+
+
+logger = get_default_logger()
+
+
+class ImmediateFirstEntry(ScheduleEntry):
+    """A celery.beat.ScheduleEntry that runs only once at startup.
+
+    The schedule should be defined in the Scheduler that uses this entry
+    in its install_default_entries() method, and that schedule will be the
+    only one to use this Entry.  This means that further schedules can be
+    defined in CELERYBEAT_SCHEDULE and run normally.
+    """
+
+    # Delay (in seconds) that is_due() reports once the entry has run:
+    # the largest timedelta available, i.e. effectively "never again".
+    seconds = timedelta.max.total_seconds()
+
+    def __init__(self, name=None, task=None, last_run_at=None,
+                 total_run_count=None, schedule=None, args=(), kwargs=None,
+                 options=None, relative=False):
+        # kwargs/options default to None rather than {} so that entries
+        # do not all share one mutable default dict.
+        if kwargs is None:
+            kwargs = {}
+        if options is None:
+            options = {}
+        super(ImmediateFirstEntry, self).__init__(
+            name, task, last_run_at, total_run_count, schedule, args, kwargs,
+            options, relative)
+
+    def _default_now(self):
+        # Return None instead of the current time.  NOTE(review): assumes
+        # ScheduleEntry.__init__ falls back to this when last_run_at is not
+        # given, so that a new entry starts out as "not yet run".
+        return None
+
+    def is_due(self):
+        """Return a `(due, seconds_until_next_check)` pair.
+
+        The first call reports the entry as due now and records the
+        current time in `last_run_at`; every later call reports it as not
+        due, with the maximal `seconds` delay.
+        """
+        if self.last_run_at is not None:
+            return False, self.seconds
+        self.last_run_at = datetime.now()
+        return True, 0
+
+    def __repr__(self):
+        return ("<ImmediateFirstEntry: %(name)s %(task)s(*%(args)s, "
+               "**%(kwargs)s) {%(schedule)s}>" % vars(self))
+
+    def __eq__(self, other):
+        # An entry is only ever equal to itself.
+        return other is self
+
+    def __ne__(self, other):
+        # Python 2 does not derive != from __eq__; keep them consistent.
+        return not self.__eq__(other)
+
+
+class StartupScheduler(Scheduler):
+    """A celery.beat.Scheduler that also runs a startup task once.
+
+    provisioningserver.tasks.startup will be run once at startup and never
+    again.
+    """
+    Entry = ImmediateFirstEntry
+
+    def install_default_entries(self, data):
+        """See `Scheduler.install_default_entries`.
+
+        Adds an entry for provisioningserver.tasks.startup, unless `data`
+        already contains one.
+        """
+        task = "provisioningserver.tasks.startup"
+        entries = {}
+        if task not in data:
+            entries[task] = {
+                "task": task,
+                "schedule": 0,
+                # XXX: bug=1039366
+                # Add options to ensure routing to ONLY this celeryd
+                # (running with -B).
+            }
+        self.update_from_dict(entries)
+
+    def remaining_estimate(self, last_run_at):
+        """See `Scheduler.remaining_estimate`.
+
+        Returns `timedelta.max` if `last_run_at` is set (the task has
+        already run), otherwise `timedelta(0)`.
+        """
+        if last_run_at is not None:
+            return timedelta.max
+        return timedelta(0)

=== modified file 'src/provisioningserver/tasks.py'
--- src/provisioningserver/tasks.py	2012-08-20 11:15:44 +0000
+++ src/provisioningserver/tasks.py	2012-08-22 02:08:18 +0000
@@ -305,3 +305,10 @@
 def restart_dhcp_server():
     """Restart the DHCP server."""
     check_call(['sudo', 'service', 'isc-dhcp-server', 'restart'])
+
+
+@task
+def startup():
+    """Do all one-off start-up work; StartupScheduler runs this once."""
+    # TODO: placeholder.  Work added here may run on any celeryd (bug 1039366).
+    pass

=== added file 'src/provisioningserver/tests/test_beatscheduler.py'
--- src/provisioningserver/tests/test_beatscheduler.py	1970-01-01 00:00:00 +0000
+++ src/provisioningserver/tests/test_beatscheduler.py	2012-08-22 02:08:18 +0000
@@ -0,0 +1,82 @@
+# Copyright 2012 Canonical Ltd.  This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+"""Tests for beatscheduler.py"""
+
+from __future__ import (
+    absolute_import,
+    print_function,
+    unicode_literals,
+    )
+
+__metaclass__ = type
+__all__ = []
+
+from datetime import (
+    datetime,
+    timedelta,
+    )
+
+from provisioningserver.beatscheduler import (
+    ImmediateFirstEntry,
+    StartupScheduler,
+    )
+from provisioningserver.testing.testcase import PservTestCase
+
+
+def get_entry(scheduler=None):
+    """Return the startup task's schedule entry from `scheduler`.
+
+    A fresh `StartupScheduler` is used when `scheduler` is not given.
+    """
+    if scheduler is None:
+        scheduler = StartupScheduler()
+    entry = scheduler.schedule["provisioningserver.tasks.startup"]
+    return entry
+
+
+class TestStartupScheduler(PservTestCase):
+
+    def test_install_default_entries_called_from_init(self):
+        entry = get_entry()
+        self.assertEqual(ImmediateFirstEntry, type(entry))
+
+    def test_remaining_estimate_returns_max_time_when_previously_run(self):
+        scheduler = StartupScheduler()
+        result = scheduler.remaining_estimate(datetime.now())
+        self.assertEqual(timedelta.max, result)
+
+    def test_remaining_estimate_returns_zero_when_not_previously_run(self):
+        scheduler = StartupScheduler()
+        result = scheduler.remaining_estimate(None)
+        self.assertEqual(timedelta(0), result)
+
+
+class TestImmediateFirstEntry(PservTestCase):
+
+    def test__default_now_returns_None(self):
+        entry = get_entry()
+        self.assertIs(None, entry._default_now())
+
+    def test_is_due_prevents_more_runs_if_already_run(self):
+        entry = get_entry()
+        entry.last_run_at = datetime.now()
+        self.assertEqual(
+            (False, timedelta.max.total_seconds()),
+            entry.is_due())
+
+    def test_is_due_allows_run_if_not_already_run(self):
+        entry = get_entry()
+        entry.last_run_at = None
+        self.assertEqual(
+            (True, 0),
+            entry.is_due())
+
+    def test_is_due_allows_only_the_first_run(self):
+        entry = get_entry()
+        entry.last_run_at = None
+        self.assertEqual((True, 0), entry.is_due())
+        self.assertIsNot(None, entry.last_run_at)
+        self.assertEqual(
+            (False, timedelta.max.total_seconds()),
+            entry.is_due())


Follow ups