[Merge] ~cjwatson/launchpad:stormify-queuedepth-queries into launchpad:master
Colin Watson has proposed merging ~cjwatson/launchpad:stormify-queuedepth-queries into launchpad:master.
Commit message:
Convert lp.buildmaster.queuedepth queries to Storm expressions
Requested reviews:
Launchpad code reviewers (launchpad-reviewers)
For more details, see:
https://code.launchpad.net/~cjwatson/launchpad/+git/launchpad/+merge/446807
We generally prefer to avoid constructing SQL queries by pasting strings together. This will also make it a little easier to convert `Processor` to Storm.
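As a condensed illustration of what that looks like (lifted from the `get_free_builders_count` change below, with the processor and virtualization conditions omitted), the WHERE conditions become a list of Storm expressions that can be appended to and passed straight to `find`:

    from storm.expr import Is, IsNot, Not, Select

    from lp.buildmaster.model.builder import Builder
    from lp.buildmaster.model.buildqueue import BuildQueue
    from lp.services.database.interfaces import IStore

    # Conditions are composed as Storm expressions instead of pasted SQL
    # fragments; optional conditions are simply appended to the list.
    clauses = [
        Is(Builder._builderok, True),
        Is(Builder.manual, False),
        # Exclude builders that currently have a job assigned.
        Not(
            Builder.id.is_in(
                Select(
                    BuildQueue.builder_id,
                    where=IsNot(BuildQueue.builder_id, None),
                )
            )
        ),
    ]
    free_builders = IStore(Builder).find(Builder.id, *clauses).count()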
Most of the transformations are reasonably obvious; I switched from `EXTRACT(EPOCH FROM ...)` to `date_part('epoch', ...)` because the latter can be expressed more conveniently in Storm, but https://www.postgresql.org/docs/10/functions-datetime.html says that they're equivalent.
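For illustration only (this aggregate query isn't part of the branch; it just shows the spelling), the `date_part` form is an ordinary function call and so fits Storm's generic `Func` expression:

    from storm.expr import Func, Sum

    from lp.buildmaster.model.buildqueue import BuildQueue
    from lp.services.database.interfaces import IStore

    # EXTRACT(EPOCH FROM ...) uses special syntax and would need raw SQL;
    # date_part('epoch', ...) is a plain function call, so Func can build it.
    total_seconds = (
        IStore(BuildQueue)
        .find(Func("date_part", "epoch", Sum(BuildQueue.estimated_duration)))
        .one()
    )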
--
Your team Launchpad code reviewers is requested to review the proposed merge of ~cjwatson/launchpad:stormify-queuedepth-queries into launchpad:master.
diff --git a/lib/lp/buildmaster/queuedepth.py b/lib/lp/buildmaster/queuedepth.py
index 05adfa1..354d07e 100644
--- a/lib/lp/buildmaster/queuedepth.py
+++ b/lib/lp/buildmaster/queuedepth.py
@@ -8,13 +8,26 @@ __all__ = [
from collections import defaultdict
from datetime import datetime, timedelta, timezone
-from storm.expr import Count
+from storm.databases.postgres import Case
+from storm.expr import (
+ And,
+ Cast,
+ Count,
+ Desc,
+ Func,
+ Is,
+ IsNot,
+ Min,
+ Not,
+ Or,
+ Select,
+ Sum,
+)
from lp.buildmaster.enums import BuildQueueStatus
from lp.buildmaster.model.builder import Builder, BuilderProcessor
from lp.buildmaster.model.buildqueue import BuildQueue
from lp.services.database.interfaces import IStore
-from lp.services.database.sqlbase import sqlvalues
def get_builder_data():
@@ -56,26 +69,29 @@ def get_builder_data():
def get_free_builders_count(processor, virtualized):
"""How many builders capable of running jobs for the given processor
and virtualization combination are idle/free at present?"""
- query = """
- SELECT COUNT(id) FROM builder
- WHERE
- builderok = TRUE AND manual = FALSE
- AND id NOT IN (
- SELECT builder FROM BuildQueue WHERE builder IS NOT NULL)
- AND virtualized = %s
- """ % sqlvalues(
- virtualized
- )
+ clauses = [
+ Is(Builder._builderok, True),
+ Is(Builder.manual, False),
+ Not(
+ Builder.id.is_in(
+ Select(
+ BuildQueue.builder_id,
+ where=IsNot(BuildQueue.builder_id, None),
+ )
+ )
+ ),
+ Is(Builder.virtualized, virtualized),
+ ]
if processor is not None:
- query += """
- AND id IN (
- SELECT builder FROM BuilderProcessor WHERE processor = %s)
- """ % sqlvalues(
- processor
+ clauses.append(
+ Builder.id.is_in(
+ Select(
+ BuilderProcessor.builder_id,
+ where=(BuilderProcessor.processor == processor),
+ )
+ )
)
- result_set = IStore(BuildQueue).execute(query)
- free_builders = result_set.get_one()[0]
- return free_builders
+ return IStore(Builder).find(Builder.id, *clauses).count()
def get_head_job_platform(bq):
@@ -89,18 +105,15 @@ def get_head_job_platform(bq):
platform or None if the JOI is the head job.
"""
my_platform = (getattr(bq.processor, "id", None), bq.virtualized)
- query = """
- SELECT
- processor,
- virtualized
- FROM BuildQueue
- WHERE
- """
- query += get_pending_jobs_clauses(bq)
- query += """
- ORDER BY lastscore DESC, id LIMIT 1
- """
- result = IStore(BuildQueue).execute(query).get_one()
+ result = (
+ IStore(BuildQueue)
+ .find(
+ (BuildQueue.processor_id, BuildQueue.virtualized),
+ *get_pending_jobs_clauses(bq),
+ )
+ .order_by(Desc(BuildQueue.lastscore), BuildQueue.id)
+ .first()
+ )
return my_platform if result is None else result
@@ -132,84 +145,77 @@ def estimate_time_to_next_builder(bq, now=None):
head_job_processor, head_job_virtualized = head_job_platform
now = now or datetime.now(timezone.utc)
- delay_query = """
- SELECT MIN(
- CASE WHEN
- EXTRACT(EPOCH FROM
- (BuildQueue.estimated_duration -
- (((%s AT TIME ZONE 'UTC') - BuildQueue.date_started)))) >= 0
- THEN
- EXTRACT(EPOCH FROM
- (BuildQueue.estimated_duration -
- (((%s AT TIME ZONE 'UTC') - BuildQueue.date_started))))
- ELSE
- -- Assume that jobs that have overdrawn their estimated
- -- duration time budget will complete within 2 minutes.
- -- This is a wild guess but has worked well so far.
- --
- -- Please note that this is entirely innocuous i.e. if our
- -- guess is off nothing bad will happen but our estimate will
- -- not be as good as it could be.
- 120
- END)
- FROM
- BuildQueue, Builder
- WHERE
- BuildQueue.builder = Builder.id
- AND Builder.manual = False
- AND Builder.builderok = True
- AND BuildQueue.status = %s
- AND Builder.virtualized = %s
- """ % sqlvalues(
- now, now, BuildQueueStatus.RUNNING, head_job_virtualized
- )
-
+ delay_clauses = [
+ BuildQueue.builder_id == Builder.id,
+ Is(Builder.manual, False),
+ Is(Builder._builderok, True),
+ BuildQueue.status == BuildQueueStatus.RUNNING,
+ Is(Builder.virtualized, head_job_virtualized),
+ ]
if head_job_processor is not None:
# Only look at builders with specific processor types.
- delay_query += """
- AND Builder.id IN (
- SELECT builder FROM BuilderProcessor WHERE processor = %s)
- """ % sqlvalues(
- head_job_processor
+ delay_clauses.append(
+ Builder.id.is_in(
+ Select(
+ BuilderProcessor.builder_id,
+ where=(BuilderProcessor.processor == head_job_processor),
+ )
+ )
+ )
+ estimated_remaining = Func(
+ "date_part",
+ "epoch",
+ BuildQueue.date_started + BuildQueue.estimated_duration - now,
+ )
+ head_job_delay = (
+ IStore(BuildQueue)
+ .find(
+ Min(
+ Case(
+ cases=((estimated_remaining >= 0, estimated_remaining),),
+ # Assume that jobs that have overdrawn their estimated
+ # duration time budget will complete within 2 minutes.
+ # This is a wild guess but has worked well so far.
+ #
+ # Please note that this is entirely innocuous, i.e. if
+ # our guess is off nothing bad will happen but our
+ # estimate will not be as good as it could be.
+ default=120,
+ )
+ ),
+ *delay_clauses,
)
+ .one()
+ )
- result_set = IStore(BuildQueue).execute(delay_query)
- head_job_delay = result_set.get_one()[0]
return 0 if head_job_delay is None else int(head_job_delay)
def get_pending_jobs_clauses(bq):
- """WHERE clauses for pending job queries, used for dipatch time
- estimation."""
- clauses = """
- BuildQueue.status = %s
- AND (
- -- The score must be either above my score or the
- -- job must be older than me in cases where the
- -- score is equal.
- BuildQueue.lastscore > %s OR
- (BuildQueue.lastscore = %s AND BuildQueue.id < %s))
- AND buildqueue.virtualized = %s
- """ % sqlvalues(
- BuildQueueStatus.WAITING,
- bq.lastscore,
- bq.lastscore,
- bq.id,
- bq.virtualized,
- )
- processor_clause = """
- AND (
- -- The processor values either match or the candidate
- -- job is processor-independent.
- buildqueue.processor = %s OR
- buildqueue.processor IS NULL)
- """ % sqlvalues(
- bq.processor
- )
+ """Storm clauses for pending job queries, used for dispatch time
+ estimation.
+ """
+ clauses = [
+ BuildQueue.status == BuildQueueStatus.WAITING,
+ # Either the score must be above my score, or the job must be older
+ # than me in cases where the score is equal.
+ Or(
+ BuildQueue.lastscore > bq.lastscore,
+ And(BuildQueue.lastscore == bq.lastscore, BuildQueue.id < bq.id),
+ ),
+ Is(BuildQueue.virtualized, bq.virtualized),
+ ]
# We don't care about processors if the estimation is for a
# processor-independent job.
if bq.processor is not None:
- clauses += processor_clause
+ # Either the processor values match, or the candidate job is
+ # processor-independent.
+ clauses.append(
+ Or(
+ BuildQueue.processor == bq.processor,
+ Is(BuildQueue.processor_id, None),
+ )
+ )
return clauses
@@ -247,23 +253,26 @@ def estimate_job_delay(bq, builder_stats):
return a == b
my_platform = (getattr(bq.processor, "id", None), bq.virtualized)
- query = """
- SELECT
- BuildQueue.processor,
- BuildQueue.virtualized,
- COUNT(BuildQueue.id),
- CAST(EXTRACT(
- EPOCH FROM
- SUM(BuildQueue.estimated_duration)) AS INTEGER)
- FROM BuildQueue
- WHERE
- """
- query += get_pending_jobs_clauses(bq)
- query += """
- GROUP BY BuildQueue.processor, BuildQueue.virtualized
- """
-
- delays_by_platform = IStore(BuildQueue).execute(query).get_all()
+ delays_by_platform = (
+ IStore(BuildQueue)
+ .find(
+ (
+ BuildQueue.processor_id,
+ BuildQueue.virtualized,
+ Count(BuildQueue.id),
+ Cast(
+ Func(
+ "date_part",
+ "epoch",
+ Sum(BuildQueue.estimated_duration),
+ ),
+ "integer",
+ ),
+ ),
+ *get_pending_jobs_clauses(bq),
+ )
+ .group_by(BuildQueue.processor_id, BuildQueue.virtualized)
+ )
# This will be used to capture per-platform delay totals.
delays = defaultdict(int)