launchpad-reviewers team mailing list archive
-
launchpad-reviewers team
-
Mailing list archive
-
Message #11270
[Merge] lp:~jtv/maas/celery-shutdown into lp:maas
Jeroen T. Vermeulen has proposed merging lp:~jtv/maas/celery-shutdown into lp:maas.
Requested reviews:
MAAS Maintainers (maas-maintainers)
For more details, see:
https://code.launchpad.net/~jtv/maas/celery-shutdown/+merge/121041
Actually, getting the celery shutdown unstuck was a one-liner from Gavin in services/celery/run. In broad terms the work was discussed with him, but the details have expanded a bit. The rest of the branch is all about ensuring that the multiprocessing.Manager used for the cache gets started up (1) in celery's parent process, (2) exactly once, (3) before it forks off workers, (4) explicitly, and (5) if at all possible, nowhere else.
Previously, initialization seemed to rely on celery importing the provisioningserver.cache module before forking, which as far as I can see probably happened as a consequence of provisioningserver.tasks importing items from provisioningserver.auth, which in turn imported from provisioningserver.cache, which initialized the multiprocessing.Manager object as a side effect of the import. Not too reassuring.
Here's how I changed the situation:
* A function in provisioningserver.cache lets you initialize the cache, but only explicitly.
* There's a guard against double-initialization.
* Initialization-as-side-effect is now isolated in a dedicated module, initialize_cache.
* Celery config is told to import this, from the parent daemon, before forking.
* A fixture replaces the manual cache setup/teardown.
* The fixture fakes out the cache's shared dict with a plain old dict.
* Code that uses the cache now imports the cache module, not the cache object, to keep patching simple.
I verified with some debug output that manager initialization happens exactly once, even though every worker process re-imports the settings file. Modules from CELERY_IMPORTS are documented as imports that happen in the daemon before forking.
Perhaps the “initialized” variable in the initializer function is not strictly needed. But it seemed slightly cleaner than deriving state from the current values in _manager/cache.
Jeroen
--
https://code.launchpad.net/~jtv/maas/celery-shutdown/+merge/121041
Your team MAAS Maintainers is requested to review the proposed merge of lp:~jtv/maas/celery-shutdown into lp:maas.
=== modified file 'etc/celeryconfig.py'
--- etc/celeryconfig.py 2012-08-01 13:51:26 +0000
+++ etc/celeryconfig.py 2012-08-23 16:08:23 +0000
@@ -49,8 +49,13 @@
CELERY_IMPORTS = (
+ # Tasks.
"provisioningserver.tasks",
-)
+
+ # This import is needed for its side effect: it initializes the
+ # cache that allows workers to share data.
+ "provisioningserver.initialize_cache",
+ )
CELERY_ACKS_LATE = True
=== modified file 'services/celeryd/run'
--- services/celeryd/run 2012-08-20 15:59:23 +0000
+++ services/celeryd/run 2012-08-23 16:08:23 +0000
@@ -15,4 +15,7 @@
# Run celeryd.
export PYTHONPATH=$PYTHONPATH:`pwd`/src:`pwd`/etc
export CELERY_CONFIG_MODULE=democeleryconfig
-exec celeryd -l INFO
+
+# XXX JeroenVermeulen 2012-08-23, bug=1040529: Kludge around hanging
+# celery shutdown.
+exec fghack "$(type -p celeryd)" -l INFO
=== modified file 'src/maasserver/testing/testcase.py'
--- src/maasserver/testing/testcase.py 2012-08-16 13:43:09 +0000
+++ src/maasserver/testing/testcase.py 2012-08-23 16:08:23 +0000
@@ -21,15 +21,15 @@
from maasserver.testing.factory import factory
from maastesting.celery import CeleryFixture
import maastesting.djangotestcase
-from provisioningserver.cache import cache as pserv_cache
+from maastesting.worker_cache import WorkerCacheFixture
class TestCase(maastesting.djangotestcase.DjangoTestCase):
def setUp(self):
super(TestCase, self).setUp()
+ self.useFixture(WorkerCacheFixture())
self.addCleanup(django_cache.clear)
- self.addCleanup(pserv_cache.clear)
self.celery = self.useFixture(CeleryFixture())
=== modified file 'src/maastesting/testcase.py'
--- src/maastesting/testcase.py 2012-08-06 20:45:00 +0000
+++ src/maastesting/testcase.py 2012-08-23 16:08:23 +0000
@@ -20,6 +20,7 @@
from fixtures import TempDir
from maastesting.factory import factory
from maastesting.scenarios import WithScenarios
+from maastesting.worker_cache import WorkerCacheFixture
from nose.proxy import ResultProxy
from nose.tools import nottest
import testresources
@@ -77,6 +78,7 @@
def setUp(self):
super(TestCase, self).setUp()
+ self.useFixture(WorkerCacheFixture())
self.setUpResources()
def setUpResources(self):
=== added file 'src/maastesting/worker_cache.py'
--- src/maastesting/worker_cache.py 1970-01-01 00:00:00 +0000
+++ src/maastesting/worker_cache.py 2012-08-23 16:08:23 +0000
@@ -0,0 +1,34 @@
+# Copyright 2012 Canonical Ltd. This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+"""Fixture to simulate the cache that worker processes normally share."""
+
+from __future__ import (
+ absolute_import,
+ print_function,
+ unicode_literals,
+ )
+
+__metaclass__ = type
+__all__ = [
+ 'WorkerCacheFixture',
+ ]
+
+from fixtures import Fixture
+from provisioningserver import cache
+
+
+class WorkerCacheFixture(Fixture):
+ """Fake the cache that worker processes share."""
+
+ def setUp(self):
+ super(WorkerCacheFixture, self).setUp()
+ self.old_initialized = cache.initialized
+ self.old_cache = cache.cache
+ cache.cache = cache.Cache({})
+ cache.initalized = True
+
+ def cleanUp(self):
+ cache.cache = self.old_cache
+ cache.initialized = self.old_initialized
+ super(WorkerCacheFixture, self).cleanUp()
=== modified file 'src/provisioningserver/auth.py'
--- src/provisioningserver/auth.py 2012-08-17 05:20:19 +0000
+++ src/provisioningserver/auth.py 2012-08-23 16:08:23 +0000
@@ -19,7 +19,7 @@
]
from apiclient.creds import convert_string_to_tuple
-from provisioningserver.cache import cache
+from provisioningserver import cache
# Cache key for URL to the central MAAS server.
MAAS_URL_CACHE_KEY = 'maas_url'
@@ -33,12 +33,12 @@
def record_maas_url(maas_url):
"""Record the MAAS server URL as sent by the server."""
- cache.set(MAAS_URL_CACHE_KEY, maas_url)
+ cache.cache.set(MAAS_URL_CACHE_KEY, maas_url)
def get_recorded_maas_url():
"""Return the base URL for the MAAS server."""
- return cache.get(MAAS_URL_CACHE_KEY)
+ return cache.cache.get(MAAS_URL_CACHE_KEY)
def record_api_credentials(api_credentials):
@@ -48,7 +48,7 @@
a single string: consumer key, resource token, and resource secret
separated by colons.
"""
- cache.set(API_CREDENTIALS_CACHE_KEY, api_credentials)
+ cache.cache.set(API_CREDENTIALS_CACHE_KEY, api_credentials)
def get_recorded_api_credentials():
@@ -58,7 +58,7 @@
(consumer_key, resource_token, resource_secret) as expected by
:class:`MAASOauth`. Otherwise, None.
"""
- credentials_string = cache.get(API_CREDENTIALS_CACHE_KEY)
+ credentials_string = cache.cache.get(API_CREDENTIALS_CACHE_KEY)
if credentials_string is None:
return None
else:
@@ -67,7 +67,7 @@
def record_nodegroup_name(nodegroup_name):
"""Record the name of the nodegroup we manage, as sent by the server."""
- cache.set(NODEGROUP_NAME_CACHE_KEY, nodegroup_name)
+ cache.cache.set(NODEGROUP_NAME_CACHE_KEY, nodegroup_name)
def get_recorded_nodegroup_name():
@@ -75,4 +75,4 @@
If the server has not sent the name yet, returns None.
"""
- return cache.get(NODEGROUP_NAME_CACHE_KEY)
+ return cache.cache.get(NODEGROUP_NAME_CACHE_KEY)
=== modified file 'src/provisioningserver/cache.py'
--- src/provisioningserver/cache.py 2012-08-14 14:55:13 +0000
+++ src/provisioningserver/cache.py 2012-08-23 16:08:23 +0000
@@ -12,6 +12,7 @@
__metaclass__ = type
__all__ = [
'cache',
+ 'initialize',
]
@@ -34,8 +35,23 @@
self.cache_backend.clear()
-_manager = Manager()
-
-
-# Initialize the process-safe singleton cache.
-cache = Cache(_manager.dict())
+_manager = None
+
+cache = None
+
+initialized = False
+
+
+def initialize():
+ """Initialize cache of shared data between processes.
+
+ This needs to be done exactly once, by the parent process, before it
+    starts forking off workers.
+ """
+ global _manager
+ global cache
+ global initialized
+ if not initialized:
+ _manager = Manager()
+ cache = Cache(_manager.dict())
+ initialized = True
=== modified file 'src/provisioningserver/dhcp/leases.py'
--- src/provisioningserver/dhcp/leases.py 2012-08-20 11:15:44 +0000
+++ src/provisioningserver/dhcp/leases.py 2012-08-23 16:08:23 +0000
@@ -43,12 +43,12 @@
MAASOAuth,
)
from celeryconfig import DHCP_LEASES_FILE
+from provisioningserver import cache
from provisioningserver.auth import (
get_recorded_api_credentials,
get_recorded_maas_url,
get_recorded_nodegroup_name,
)
-from provisioningserver.cache import cache
from provisioningserver.dhcp.leases_parser import parse_leases
from provisioningserver.logging import task_logger
@@ -82,8 +82,8 @@
# These variables are shared between worker threads/processes.
# A bit of inconsistency due to concurrent updates is not a problem,
# but read them both at once here to reduce the scope for trouble.
- previous_leases = cache.get(LEASES_CACHE_KEY)
- previous_leases_time = cache.get(LEASES_TIME_CACHE_KEY)
+ previous_leases = cache.cache.get(LEASES_CACHE_KEY)
+ previous_leases_time = cache.cache.get(LEASES_TIME_CACHE_KEY)
if get_leases_timestamp() == previous_leases_time:
return None
@@ -102,8 +102,8 @@
:param leases: A dict mapping each leased IP address to the MAC address
that it has been assigned to.
"""
- cache.set(LEASES_TIME_CACHE_KEY, last_change)
- cache.set(LEASES_CACHE_KEY, leases)
+ cache.cache.set(LEASES_TIME_CACHE_KEY, last_change)
+ cache.cache.set(LEASES_CACHE_KEY, leases)
def list_missing_items(knowledge):
=== modified file 'src/provisioningserver/dhcp/tests/test_leases.py'
--- src/provisioningserver/dhcp/tests/test_leases.py 2012-08-20 11:15:44 +0000
+++ src/provisioningserver/dhcp/tests/test_leases.py 2012-08-23 16:08:23 +0000
@@ -25,11 +25,11 @@
age_file,
get_write_time,
)
+from provisioningserver import cache
from provisioningserver.auth import (
MAAS_URL_CACHE_KEY,
NODEGROUP_NAME_CACHE_KEY,
)
-from provisioningserver.cache import cache
from provisioningserver.dhcp import leases as leases_module
from provisioningserver.dhcp.leases import (
check_lease_changes,
@@ -53,8 +53,10 @@
leases = factory.make_random_leases()
record_lease_state(time, leases)
self.assertEqual(
- (time, leases),
- (cache.get(LEASES_TIME_CACHE_KEY), cache.get(LEASES_CACHE_KEY)))
+ (time, leases), (
+ cache.cache.get(LEASES_TIME_CACHE_KEY),
+ cache.cache.get(LEASES_CACHE_KEY),
+ ))
class StopExecuting(BaseException):
@@ -111,12 +113,12 @@
def set_nodegroup_name(self):
"""Set the recorded nodegroup name for the duration of this test."""
name = factory.make_name('nodegroup')
- cache.set(NODEGROUP_NAME_CACHE_KEY, name)
+ cache.cache.set(NODEGROUP_NAME_CACHE_KEY, name)
return name
def clear_nodegroup_name(self):
"""Clear the recorded nodegroup name."""
- cache.set(NODEGROUP_NAME_CACHE_KEY, None)
+ cache.cache.set(NODEGROUP_NAME_CACHE_KEY, None)
def set_maas_url(self):
"""Set the recorded MAAS URL for the duration of this test."""
@@ -124,21 +126,21 @@
factory.make_name('host'),
factory.getRandomString(),
)
- cache.set(MAAS_URL_CACHE_KEY, maas_url)
+ cache.cache.set(MAAS_URL_CACHE_KEY, maas_url)
def clear_maas_url(self):
"""Clear the recorded MAAS API URL."""
- cache.set(MAAS_URL_CACHE_KEY, None)
+ cache.cache.set(MAAS_URL_CACHE_KEY, None)
def set_api_credentials(self):
"""Set recorded API credentials for the duration of this test."""
creds_string = ':'.join(
factory.getRandomString() for counter in range(3))
- cache.set('api_credentials', creds_string)
+ cache.cache.set('api_credentials', creds_string)
def clear_api_credentials(self):
"""Clear recorded API credentials."""
- cache.set('api_credentials', None)
+ cache.cache.set('api_credentials', None)
def set_items_needed_for_lease_update(self):
"""Set the recorded items required by `update_leases`."""
@@ -153,8 +155,8 @@
state so that it gets reset at the end of the test. Using this will
prevent recorded lease state from leaking into other tests.
"""
- cache.set(LEASES_TIME_CACHE_KEY, time)
- cache.set(LEASES_CACHE_KEY, leases)
+ cache.cache.set(LEASES_TIME_CACHE_KEY, time)
+ cache.cache.set(LEASES_CACHE_KEY, leases)
def test_record_lease_state_sets_leases_and_timestamp(self):
time = datetime.utcnow()
@@ -162,8 +164,10 @@
self.set_lease_state()
record_lease_state(time, leases)
self.assertEqual(
- (time, leases),
- (cache.get(LEASES_TIME_CACHE_KEY), cache.get(LEASES_CACHE_KEY)))
+ (time, leases), (
+ cache.cache.get(LEASES_TIME_CACHE_KEY),
+ cache.cache.get(LEASES_CACHE_KEY),
+ ))
def test_check_lease_changes_returns_tuple_if_no_state_cached(self):
self.set_lease_state()
=== added file 'src/provisioningserver/initialize_cache.py'
--- src/provisioningserver/initialize_cache.py 1970-01-01 00:00:00 +0000
+++ src/provisioningserver/initialize_cache.py 2012-08-23 16:08:23 +0000
@@ -0,0 +1,22 @@
+# Copyright 2012 Canonical Ltd. This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+"""Module with side effect: import to initialize the inter-worker cache.
+
+This is here merely so as to avoid accidental initialization of the cache.
+Import this module and the cache will be initialized.
+"""
+
+from __future__ import (
+ absolute_import,
+ print_function,
+ unicode_literals,
+ )
+
+__metaclass__ = type
+__all__ = []
+
+from provisioningserver.cache import initialize
+
+
+initialize()
=== modified file 'src/provisioningserver/testing/testcase.py'
--- src/provisioningserver/testing/testcase.py 2012-08-16 13:39:14 +0000
+++ src/provisioningserver/testing/testcase.py 2012-08-23 16:08:23 +0000
@@ -15,11 +15,11 @@
]
from maastesting import testcase
-from provisioningserver.cache import cache as pserv_cache
+from maastesting.worker_cache import WorkerCacheFixture
class PservTestCase(testcase.TestCase):
def setUp(self):
super(PservTestCase, self).setUp()
- self.addCleanup(pserv_cache.clear)
+ self.useFixture(WorkerCacheFixture())
=== modified file 'src/provisioningserver/tests/test_auth.py'
--- src/provisioningserver/tests/test_auth.py 2012-08-17 03:47:07 +0000
+++ src/provisioningserver/tests/test_auth.py 2012-08-23 16:08:23 +0000
@@ -14,8 +14,10 @@
from apiclient.creds import convert_tuple_to_string
from maastesting.factory import factory
-from provisioningserver import auth
-from provisioningserver.cache import cache
+from provisioningserver import (
+ auth,
+ cache,
+ )
from provisioningserver.testing.testcase import PservTestCase
@@ -34,7 +36,7 @@
creds_string = convert_tuple_to_string(make_credentials())
auth.record_api_credentials(creds_string)
self.assertEqual(
- creds_string, cache.get(auth.API_CREDENTIALS_CACHE_KEY))
+ creds_string, cache.cache.get(auth.API_CREDENTIALS_CACHE_KEY))
def test_get_recorded_api_credentials_returns_credentials_as_tuple(self):
creds = make_credentials()
=== modified file 'src/provisioningserver/tests/test_cache.py'
--- src/provisioningserver/tests/test_cache.py 2012-08-16 13:39:14 +0000
+++ src/provisioningserver/tests/test_cache.py 2012-08-23 16:08:23 +0000
@@ -15,22 +15,24 @@
from multiprocessing.managers import DictProxy
from maastesting.factory import factory
-from provisioningserver.cache import cache
+from provisioningserver import cache
from provisioningserver.testing.testcase import PservTestCase
class TestCache(PservTestCase):
- def test_cache_has_initialized_backend(self):
- self.assertIsInstance(cache.cache_backend, DictProxy)
+ def test_initialize_initializes_backend(self):
+ cache.initialize()
+ self.addCleanup(cache._manager.shutdown)
+ self.assertIsInstance(cache.cache.cache_backend, DictProxy)
def test_cache_stores_value(self):
key = factory.getRandomString()
value = factory.getRandomString()
- cache.set(key, value)
- self.assertEqual(value, cache.get(key))
+ cache.cache.set(key, value)
+ self.assertEqual(value, cache.cache.get(key))
def test_cache_clears_cache(self):
- cache.set(factory.getRandomString(), factory.getRandomString())
- cache.clear()
- self.assertEqual(0, len(cache.cache_backend))
+ cache.cache.set(factory.getRandomString(), factory.getRandomString())
+ cache.cache.clear()
+ self.assertEqual(0, len(cache.cache.cache_backend))
=== modified file 'src/provisioningserver/tests/test_tasks.py'
--- src/provisioningserver/tests/test_tasks.py 2012-08-23 08:16:14 +0000
+++ src/provisioningserver/tests/test_tasks.py 2012-08-23 16:08:23 +0000
@@ -27,9 +27,9 @@
from netaddr import IPNetwork
from provisioningserver import (
auth,
+ cache,
tasks,
)
-from provisioningserver.cache import cache
from provisioningserver.dns.config import (
conf,
DNSZoneConfig,
@@ -106,7 +106,7 @@
def test_updates_nodegroup_name(self):
nodegroup_name = factory.make_name('nodegroup')
refresh_secrets(nodegroup_name=nodegroup_name)
- self.assertEqual(nodegroup_name, cache.get('nodegroup_name'))
+ self.assertEqual(nodegroup_name, cache.cache.get('nodegroup_name'))
class TestPowerTasks(PservTestCase):
Follow ups