launchpad-reviewers team mailing list archive
-
launchpad-reviewers team
-
Mailing list archive
-
Message #24486
[Merge] ~cjwatson/launchpad:stormify-codeimportjob into launchpad:master
Colin Watson has proposed merging ~cjwatson/launchpad:stormify-codeimportjob into launchpad:master.
Commit message:
Convert CodeImportJob to Storm
Requested reviews:
Launchpad code reviewers (launchpad-reviewers)
For more details, see:
https://code.launchpad.net/~cjwatson/launchpad/+git/launchpad/+merge/380981
We need to take a little care around the XML-RPC scheduler: although XML-RPC strings are Unicode, Python 2's xmlrpclib unmarshals them as plain str when they contain only ASCII characters, so the scheduler needs to convert them back to Unicode.
--
Your team Launchpad code reviewers is requested to review the proposed merge of ~cjwatson/launchpad:stormify-codeimportjob into launchpad:master.
diff --git a/lib/lp/code/model/codeimport.py b/lib/lp/code/model/codeimport.py
index a92ee7d..4162632 100644
--- a/lib/lp/code/model/codeimport.py
+++ b/lib/lp/code/model/codeimport.py
@@ -1,4 +1,4 @@
-# Copyright 2009-2016 Canonical Ltd. This software is licensed under the
+# Copyright 2009-2020 Canonical Ltd. This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).
"""Database classes including and related to CodeImport."""
@@ -34,6 +34,7 @@ from storm.references import Reference
from zope.component import getUtility
from zope.event import notify
from zope.interface import implementer
+from zope.security.proxy import removeSecurityProxy
from lp.app.errors import NotFoundError
from lp.code.enums import (
@@ -158,7 +159,7 @@ class CodeImport(SQLBase):
seconds = default_interval_dict.get(self.rcs_type, 21600)
return timedelta(seconds=seconds)
- import_job = Reference("<primary key>", "CodeImportJob.code_importID",
+ import_job = Reference("<primary key>", "CodeImportJob.code_import_id",
on_remote=True)
def getImportDetailsForDisplay(self):
@@ -347,9 +348,8 @@ class CodeImportSet:
def delete(self, code_import):
"""See `ICodeImportSet`."""
- from lp.code.model.codeimportjob import CodeImportJob
if code_import.import_job is not None:
- CodeImportJob.delete(code_import.import_job.id)
+ removeSecurityProxy(code_import.import_job).destroySelf()
CodeImport.delete(code_import.id)
def get(self, id):
diff --git a/lib/lp/code/model/codeimportjob.py b/lib/lp/code/model/codeimportjob.py
index 702c93c..a758adc 100644
--- a/lib/lp/code/model/codeimportjob.py
+++ b/lib/lp/code/model/codeimportjob.py
@@ -1,8 +1,10 @@
-# Copyright 2009-2019 Canonical Ltd. This software is licensed under the
+# Copyright 2009-2020 Canonical Ltd. This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).
"""Database classes for the CodeImportJob table."""
+from __future__ import absolute_import, print_function, unicode_literals
+
__metaclass__ = type
__all__ = [
'CodeImportJob',
@@ -10,13 +12,17 @@ __all__ = [
'CodeImportJobWorkflow',
]
-import datetime
-
-from sqlobject import (
- ForeignKey,
- IntCol,
- SQLObjectNotFound,
- StringCol,
+from datetime import timedelta
+
+import pytz
+from storm.expr import Cast
+from storm.locals import (
+ DateTime,
+ Desc,
+ Int,
+ Reference,
+ Store,
+ Unicode,
)
from zope.component import getUtility
from zope.interface import implementer
@@ -50,13 +56,10 @@ from lp.code.model.codeimportresult import CodeImportResult
from lp.registry.interfaces.person import validate_public_person
from lp.services.config import config
from lp.services.database.constants import UTC_NOW
-from lp.services.database.datetimecol import UtcDateTimeCol
-from lp.services.database.enumcol import EnumCol
+from lp.services.database.enumcol import DBEnum
from lp.services.database.interfaces import IStore
-from lp.services.database.sqlbase import (
- SQLBase,
- sqlvalues,
- )
+from lp.services.database.sqlbase import get_transaction_timestamp
+from lp.services.database.stormbase import StormBase
from lp.services.macaroons.interfaces import (
BadMacaroonContext,
IMacaroonIssuer,
@@ -66,53 +69,48 @@ from lp.services.macaroons.model import MacaroonIssuerBase
@implementer(ICodeImportJob)
-class CodeImportJob(SQLBase):
+class CodeImportJob(StormBase):
"""See `ICodeImportJob`."""
- date_created = UtcDateTimeCol(notNull=True, default=UTC_NOW)
+ __storm_table__ = 'CodeImportJob'
- code_import = ForeignKey(
- dbName='code_import', foreignKey='CodeImport', notNull=True)
+ id = Int(primary=True)
- machine = ForeignKey(
- dbName='machine', foreignKey='CodeImportMachine',
- notNull=False, default=None)
+ date_created = DateTime(tzinfo=pytz.UTC, allow_none=False, default=UTC_NOW)
- date_due = UtcDateTimeCol(notNull=True)
+ code_import_id = Int(name='code_import', allow_none=False)
+ code_import = Reference(code_import_id, 'CodeImport.id')
- state = EnumCol(
- enum=CodeImportJobState, notNull=True,
+ machine_id = Int(name='machine', allow_none=True, default=None)
+ machine = Reference(machine_id, 'CodeImportMachine.id')
+
+ date_due = DateTime(tzinfo=pytz.UTC, allow_none=False)
+
+ state = DBEnum(
+ enum=CodeImportJobState, allow_none=False,
default=CodeImportJobState.PENDING)
- requesting_user = ForeignKey(
- dbName='requesting_user', foreignKey='Person',
- storm_validator=validate_public_person,
- notNull=False, default=None)
+ requesting_user_id = Int(
+ name='requesting_user', allow_none=True,
+ validator=validate_public_person, default=None)
+ requesting_user = Reference(requesting_user_id, 'Person.id')
+
+ ordering = Int(allow_none=True, default=None)
- ordering = IntCol(notNull=False, default=None)
+ heartbeat = DateTime(tzinfo=pytz.UTC, allow_none=True, default=None)
- heartbeat = UtcDateTimeCol(notNull=False, default=None)
+ logtail = Unicode(allow_none=True, default=None)
- logtail = StringCol(notNull=False, default=None)
+ date_started = DateTime(tzinfo=pytz.UTC, allow_none=True, default=None)
- date_started = UtcDateTimeCol(notNull=False, default=None)
+ def __init__(self, code_import, date_due):
+ super(CodeImportJob, self).__init__()
+ self.code_import = code_import
+ self.date_due = date_due
def isOverdue(self):
"""See `ICodeImportJob`."""
- # SQLObject offers no easy way to compare a timestamp to UTC_NOW, so
- # we must use trickery here.
-
- # First we flush any pending update to self to ensure that the
- # following database query will give the correct result even if
- # date_due was modified in this transaction.
- self.syncUpdate()
-
- # Then, we try to find a CodeImportJob object with the id of self, and
- # a date_due of now or past. If we find one, this means self is
- # overdue.
- import_job = CodeImportJob.selectOne(
- "id = %s AND date_due <= %s" % sqlvalues(self.id, UTC_NOW))
- return import_job is not None
+ return self.date_due <= get_transaction_timestamp(Store.of(self))
def makeWorkerArguments(self):
"""See `ICodeImportJob`."""
@@ -169,6 +167,9 @@ class CodeImportJob(SQLBase):
result.append(macaroon.serialize())
return result
+ def destroySelf(self):
+ Store.of(self).remove(self)
+
@implementer(ICodeImportJobSet, ICodeImportJobSetPublic)
class CodeImportJobSet(object):
@@ -179,10 +180,7 @@ class CodeImportJobSet(object):
def getById(self, id):
"""See `ICodeImportJobSet`."""
- try:
- return CodeImportJob.get(id)
- except SQLObjectNotFound:
- return None
+ return IStore(CodeImportJob).get(CodeImportJob, id)
def getJobForMachine(self, hostname, worker_limit):
"""See `ICodeImportJobSet`."""
@@ -195,12 +193,12 @@ class CodeImportJobSet(object):
hostname, CodeImportMachineState.ONLINE)
elif not machine.shouldLookForJob(worker_limit):
return None
- job = CodeImportJob.selectOne(
- """id IN (SELECT id FROM CodeImportJob
- WHERE date_due <= %s AND state = %s
- ORDER BY requesting_user IS NULL, date_due
- LIMIT 1)"""
- % sqlvalues(UTC_NOW, CodeImportJobState.PENDING))
+ job = IStore(CodeImportJob).find(
+ CodeImportJob,
+ CodeImportJob.date_due <= UTC_NOW,
+ CodeImportJob.state == CodeImportJobState.PENDING).order_by(
+ CodeImportJob.requesting_user == None,
+ CodeImportJob.date_due).first()
if job is not None:
job_workflow.startJob(job, machine)
return job
@@ -209,11 +207,12 @@ class CodeImportJobSet(object):
def getReclaimableJobs(self):
"""See `ICodeImportJobSet`."""
+ interval = config.codeimportworker.maximum_heartbeat_interval
return IStore(CodeImportJob).find(
CodeImportJob,
- "state = %s and heartbeat < %s + '-%s seconds'"
- % sqlvalues(CodeImportJobState.RUNNING, UTC_NOW,
- config.codeimportworker.maximum_heartbeat_interval))
+ CodeImportJob.state == CodeImportJobState.RUNNING,
+ CodeImportJob.heartbeat < (
+ UTC_NOW - Cast(timedelta(seconds=interval), 'interval')))
@implementer(ICodeImportJobWorkflow)
@@ -233,16 +232,18 @@ class CodeImportJobWorkflow:
interval = code_import.effective_update_interval
job = CodeImportJob(code_import=code_import, date_due=UTC_NOW)
+ IStore(CodeImportJob).add(job)
# Find the most recent CodeImportResult for this CodeImport. We
# sort by date_created because we do not have an index on
# date_job_started in the database, and that should give the same
# sort order.
- most_recent_result_list = list(CodeImportResult.selectBy(
- code_import=code_import).orderBy(['-date_created']).limit(1))
+ most_recent_result = IStore(CodeImportResult).find(
+ CodeImportResult,
+ CodeImportResult.code_import == code_import).order_by(
+ Desc(CodeImportResult.date_created)).first()
- if len(most_recent_result_list) != 0:
- [most_recent_result] = most_recent_result_list
+ if most_recent_result is not None:
date_due = most_recent_result.date_job_started + interval
job.date_due = max(job.date_due, date_due)
@@ -301,7 +302,7 @@ class CodeImportJobWorkflow:
naked_job = removeSecurityProxy(import_job)
naked_job.date_started = UTC_NOW
naked_job.heartbeat = UTC_NOW
- naked_job.logtail = u''
+ naked_job.logtail = ''
naked_job.machine = machine
naked_job.state = CodeImportJobState.RUNNING
getUtility(ICodeImportEventSet).newStart(
@@ -365,7 +366,7 @@ class CodeImportJobWorkflow:
code_import.updateFromData(
dict(review_status=CodeImportReviewStatus.FAILING), None)
elif status == CodeImportResultStatus.SUCCESS_PARTIAL:
- interval = datetime.timedelta(0)
+ interval = timedelta(0)
elif failure_count > 0:
interval = (code_import.effective_update_interval *
(2 ** (failure_count - 1)))
@@ -406,7 +407,7 @@ class CodeImportJobWorkflow:
import_job, CodeImportResultStatus.RECLAIMED, None)
# 3)
if code_import.review_status == CodeImportReviewStatus.REVIEWED:
- self.newJob(code_import, datetime.timedelta(0))
+ self.newJob(code_import, timedelta(0))
# 4)
getUtility(ICodeImportEventSet).newReclaim(
code_import, machine, job_id)
diff --git a/lib/lp/code/model/codeimportmachine.py b/lib/lp/code/model/codeimportmachine.py
index 4dfaeb7..1983adc 100644
--- a/lib/lp/code/model/codeimportmachine.py
+++ b/lib/lp/code/model/codeimportmachine.py
@@ -1,4 +1,4 @@
-# Copyright 2009 Canonical Ltd. This software is licensed under the
+# Copyright 2009-2020 Canonical Ltd. This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).
"""Database classes including and related to CodeImportMachine."""
@@ -14,6 +14,7 @@ from sqlobject import (
SQLMultipleJoin,
StringCol,
)
+from storm.locals import ReferenceSet
from zope.component import getUtility
from zope.interface import implementer
@@ -48,9 +49,9 @@ class CodeImportMachine(SQLBase):
default=CodeImportMachineState.OFFLINE)
heartbeat = UtcDateTimeCol(notNull=False)
- current_jobs = SQLMultipleJoin(
- 'CodeImportJob', joinColumn='machine',
- orderBy=['date_started', 'id'])
+ current_jobs = ReferenceSet(
+ '<primary key>', 'CodeImportJob.machine_id',
+ order_by=('CodeImportJob.date_started', 'CodeImportJob.id'))
events = SQLMultipleJoin(
'CodeImportEvent', joinColumn='machine',
diff --git a/lib/lp/code/model/tests/test_codeimport.py b/lib/lp/code/model/tests/test_codeimport.py
index a57f79e..7b5166a 100644
--- a/lib/lp/code/model/tests/test_codeimport.py
+++ b/lib/lp/code/model/tests/test_codeimport.py
@@ -1,4 +1,4 @@
-# Copyright 2009-2017 Canonical Ltd. This software is licensed under the
+# Copyright 2009-2020 Canonical Ltd. This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).
"""Unit tests for methods of CodeImport and CodeImportSet."""
@@ -47,6 +47,7 @@ from lp.code.model.codeimportresult import CodeImportResult
from lp.code.tests.codeimporthelpers import make_running_import
from lp.code.tests.helpers import GitHostingFixture
from lp.registry.interfaces.person import IPersonSet
+from lp.services.database.interfaces import IStore
from lp.testing import (
login,
login_person,
@@ -377,7 +378,7 @@ class TestCodeImportStatusUpdate(TestCodeImportBase):
self.import_operator = getUtility(IPersonSet).getByEmail(
'david.allouche@xxxxxxxxxxxxx')
# Remove existing jobs.
- for job in CodeImportJob.select():
+ for job in IStore(CodeImportJob).find(CodeImportJob):
job.destroySelf()
def makeApprovedImportWithPendingJob(self):
diff --git a/lib/lp/code/model/tests/test_codeimportjob.py b/lib/lp/code/model/tests/test_codeimportjob.py
index 68499dd..c099944 100644
--- a/lib/lp/code/model/tests/test_codeimportjob.py
+++ b/lib/lp/code/model/tests/test_codeimportjob.py
@@ -1,4 +1,4 @@
-# Copyright 2009-2019 Canonical Ltd. This software is licensed under the
+# Copyright 2009-2020 Canonical Ltd. This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).
"""Unit tests for CodeImportJob and CodeImportJobWorkflow."""
@@ -237,7 +237,7 @@ class TestCodeImportJobSetGetJobForMachine(TestCaseWithFactory):
# the sample data and set up some objects.
super(TestCodeImportJobSetGetJobForMachine, self).setUp()
login_for_code_imports()
- for job in CodeImportJob.select():
+ for job in IStore(CodeImportJob).find(CodeImportJob):
job.destroySelf()
self.machine = self.factory.makeCodeImportMachine(set_online=True)
@@ -351,7 +351,7 @@ class ReclaimableJobTests(TestCaseWithFactory):
def setUp(self):
super(ReclaimableJobTests, self).setUp()
login_for_code_imports()
- for job in CodeImportJob.select():
+ for job in IStore(CodeImportJob).find(CodeImportJob):
job.destroySelf()
def makeJobWithHeartbeatInPast(self, seconds_in_past):
diff --git a/lib/lp/code/xmlrpc/codeimportscheduler.py b/lib/lp/code/xmlrpc/codeimportscheduler.py
index 1cf05cb..35ba346 100644
--- a/lib/lp/code/xmlrpc/codeimportscheduler.py
+++ b/lib/lp/code/xmlrpc/codeimportscheduler.py
@@ -1,4 +1,4 @@
-# Copyright 2009-2018 Canonical Ltd. This software is licensed under the
+# Copyright 2009-2020 Canonical Ltd. This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).
"""The code import scheduler XML-RPC API."""
@@ -8,6 +8,7 @@ __all__ = [
'CodeImportSchedulerAPI',
]
+import six
from zope.component import getUtility
from zope.interface import implementer
from zope.security.proxy import removeSecurityProxy
@@ -35,7 +36,7 @@ class CodeImportSchedulerAPI(LaunchpadXMLRPCView):
def getJobForMachine(self, hostname, worker_limit):
"""See `ICodeImportScheduler`."""
job = getUtility(ICodeImportJobSet).getJobForMachine(
- hostname, worker_limit)
+ six.ensure_text(hostname), worker_limit)
if job is not None:
return job.id
else:
@@ -58,11 +59,13 @@ class CodeImportSchedulerAPI(LaunchpadXMLRPCView):
def updateHeartbeat(self, job_id, log_tail):
"""See `ICodeImportScheduler`."""
- return self._updateHeartbeat(job_id, log_tail)
+ return self._updateHeartbeat(job_id, six.ensure_text(log_tail))
def finishJobID(self, job_id, status_name, log_file_alias_url):
"""See `ICodeImportScheduler`."""
- return self._finishJobID(job_id, status_name, log_file_alias_url)
+ return self._finishJobID(
+ job_id, six.ensure_text(status_name),
+ six.ensure_text(log_file_alias_url))
@return_fault
def _getImportDataForJobID(self, job_id):
diff --git a/lib/lp/code/xmlrpc/tests/test_codeimportscheduler.py b/lib/lp/code/xmlrpc/tests/test_codeimportscheduler.py
index 60ad9ce..91ceae3 100644
--- a/lib/lp/code/xmlrpc/tests/test_codeimportscheduler.py
+++ b/lib/lp/code/xmlrpc/tests/test_codeimportscheduler.py
@@ -1,4 +1,4 @@
-# Copyright 2010-2018 Canonical Ltd. This software is licensed under the
+# Copyright 2010-2020 Canonical Ltd. This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).
"""Test for the methods of `ICodeImportScheduler`."""
@@ -15,6 +15,7 @@ from lp.code.model.codeimportjob import CodeImportJob
from lp.code.tests.codeimporthelpers import make_running_import
from lp.code.xmlrpc.codeimportscheduler import CodeImportSchedulerAPI
from lp.services.database.constants import UTC_NOW
+from lp.services.database.interfaces import IStore
from lp.services.webapp import canonical_url
from lp.testing import (
run_with_login,
@@ -32,7 +33,7 @@ class TestCodeImportSchedulerAPI(TestCaseWithFactory):
TestCaseWithFactory.setUp(self)
self.api = CodeImportSchedulerAPI(None, None)
self.machine = self.factory.makeCodeImportMachine(set_online=True)
- for job in CodeImportJob.select():
+ for job in IStore(CodeImportJob).find(CodeImportJob):
job.destroySelf()
def makeCodeImportJob(self, running):
@@ -84,7 +85,7 @@ class TestCodeImportSchedulerAPI(TestCaseWithFactory):
def test_updateHeartbeat(self):
# updateHeartbeat calls the updateHeartbeat job workflow method.
code_import_job = self.makeCodeImportJob(running=True)
- log_tail = self.factory.getUniqueString()
+ log_tail = self.factory.getUniqueUnicode()
self.api.updateHeartbeat(code_import_job.id, log_tail)
self.assertSqlAttributeEqualsDate(
code_import_job, 'heartbeat', UTC_NOW)
diff --git a/lib/lp/codehosting/codeimport/tests/test_workermonitor.py b/lib/lp/codehosting/codeimport/tests/test_workermonitor.py
index 047cbc9..d2422cb 100644
--- a/lib/lp/codehosting/codeimport/tests/test_workermonitor.py
+++ b/lib/lp/codehosting/codeimport/tests/test_workermonitor.py
@@ -1,4 +1,4 @@
-# Copyright 2009-2018 Canonical Ltd. This software is licensed under the
+# Copyright 2009-2020 Canonical Ltd. This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).
"""Tests for the CodeImportWorkerMonitor and related classes."""
@@ -72,6 +72,7 @@ from lp.services.config.fixture import (
ConfigFixture,
ConfigUseFixture,
)
+from lp.services.database.interfaces import IStore
from lp.services.log.logger import BufferLogger
from lp.services.twistedsupport import suppress_stderr
from lp.services.twistedsupport.tests.test_processmonitor import (
@@ -669,7 +670,7 @@ class TestWorkerMonitorRunNoProcess(TestCase):
def nuke_codeimport_sample_data():
"""Delete all the sample data that might interfere with tests."""
- for job in CodeImportJob.select():
+ for job in IStore(CodeImportJob).find(CodeImportJob):
job.destroySelf()
for code_import in CodeImport.select():
code_import.destroySelf()
@@ -796,7 +797,7 @@ class TestWorkerMonitorIntegration(TestCaseInTempDir, TestCase):
code_import.updateFromData(
{'review_status': CodeImportReviewStatus.REVIEWED},
self.factory.makePerson())
- job = getUtility(ICodeImportJobSet).getJobForMachine('machine', 10)
+ job = getUtility(ICodeImportJobSet).getJobForMachine(u'machine', 10)
self.assertEqual(code_import, job.code_import)
source_details = CodeImportSourceDetails.fromArguments(
removeSecurityProxy(job.makeWorkerArguments()))
diff --git a/lib/lp/codehosting/codeimport/workermonitor.py b/lib/lp/codehosting/codeimport/workermonitor.py
index 767f0e3..2a2035a 100644
--- a/lib/lp/codehosting/codeimport/workermonitor.py
+++ b/lib/lp/codehosting/codeimport/workermonitor.py
@@ -1,4 +1,4 @@
-# Copyright 2009-2018 Canonical Ltd. This software is licensed under the
+# Copyright 2009-2020 Canonical Ltd. This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).
"""Code to talk to the database about what the worker script is doing."""
@@ -10,6 +10,7 @@ __all__ = []
import os
import tempfile
+import six
from twisted.internet import (
defer,
error,
@@ -57,7 +58,7 @@ class CodeImportWorkerMonitorProtocol(ProcessMonitorProtocolWithTimeout):
self, deferred, clock=clock,
timeout=config.codeimport.worker_inactivity_timeout)
self.worker_monitor = worker_monitor
- self._tail = ''
+ self._tail = b''
self._log_file = log_file
self._looping_call = task.LoopingCall(self._updateHeartbeat)
self._looping_call.clock = self._clock
@@ -91,7 +92,7 @@ class CodeImportWorkerMonitorProtocol(ProcessMonitorProtocolWithTimeout):
"""
self.resetTimeout()
self._log_file.write(data)
- self._tail = '\n'.join((self._tail + data).split('\n')[-5:])
+ self._tail = b'\n'.join((self._tail + data).split(b'\n')[-5:])
errReceived = outReceived
@@ -195,8 +196,12 @@ class CodeImportWorkerMonitor:
def updateHeartbeat(self, tail):
"""Call the updateHeartbeat method for the job we are working on."""
self._logger.debug("Updating heartbeat.")
+ # The log tail is really bytes, but it's stored in the database as a
+ # text column, so it's easiest to convert it to text now; passing
+ # text over XML-RPC requires less boilerplate than bytes anyway.
deferred = self.codeimport_endpoint.callRemote(
- 'updateHeartbeat', self._job_id, tail)
+ 'updateHeartbeat', self._job_id,
+ six.ensure_text(tail, errors='replace'))
return deferred.addErrback(self._trap_nosuchcodeimportjob)
def _createLibrarianFileAlias(self, name, size, file, contentType):