← Back to team overview

launchpad-reviewers team mailing list archive

Re: [Merge] lp:~cjwatson/launchpad/dspc-update-garbo into lp:launchpad

 

Review: Approve code



Diff comments:

> 
> === modified file 'lib/lp/scripts/garbo.py'
> --- lib/lp/scripts/garbo.py	2016-05-19 20:58:16 +0000
> +++ lib/lp/scripts/garbo.py	2016-06-14 12:53:41 +0000
> @@ -480,6 +485,105 @@
>          transaction.commit()
>  
>  
> +class PopulateDistributionSourcePackageCache(TunableLoop):
> +    """Populate the DistributionSourcePackageCache table.
> +
> +    Ensure that new source publications have a row in
> +    DistributionSourcePackageCache.
> +    """
> +    maximum_chunk_size = 1000
> +
> +    def __init__(self, log, abort_time=None):
> +        super(PopulateDistributionSourcePackageCache, self).__init__(
> +            log, abort_time)
> +        self.store = IMasterStore(DistributionSourcePackageCache)
> +        # Keep a record of the processed source publication ID so we know
> +        # where the job got up to.
> +        self.last_spph_id = 0
> +        self.job_name = self.__class__.__name__
> +        job_data = load_garbo_job_state(self.job_name)
> +        if job_data:
> +            self.last_spph_id = job_data.get('last_spph_id', 0)
> +
> +    def getPendingUpdates(self):
> +        # Load the latest published source publication data.
> +        origin = [
> +            SourcePackagePublishingHistory,
> +            Join(
> +                SourcePackageRelease,
> +                SourcePackageRelease.id ==
> +                    SourcePackagePublishingHistory.sourcepackagereleaseID),

This join isn't necessary; SPPH has SPN.id.

> +            Join(
> +                SourcePackageName,
> +                SourcePackageName.id ==
> +                    SourcePackageRelease.sourcepackagenameID),
> +            Join(
> +                Archive,
> +                Archive.id == SourcePackagePublishingHistory.archiveID),
> +            ]
> +        rows = self.store.using(*origin).find(
> +            (SourcePackagePublishingHistory.id,
> +             Archive.id,
> +             Archive.distributionID,
> +             SourcePackageName.id,
> +             SourcePackageName.name),
> +            SourcePackagePublishingHistory.status.is_in((
> +                PackagePublishingStatus.PENDING,
> +                PackagePublishingStatus.PUBLISHED)),
> +            SourcePackagePublishingHistory.id > self.last_spph_id)
> +        return rows.order_by(SourcePackagePublishingHistory.id)
> +
> +    def isDone(self):
> +        return self.getPendingUpdates().is_empty()
> +
> +    def __call__(self, chunk_size):
> +        # Create a map of new source publications, keyed on (archive,
> +        # distribution, SPN).
> +        cache_filter_data = []
> +        new_records = {}
> +        for new_publication in self.getPendingUpdates()[:chunk_size]:
> +            (spph_id, archive_id, distribution_id,
> +             spn_id, spn_name) = new_publication
> +            cache_filter_data.append((archive_id, distribution_id, spn_id))
> +            new_records[(archive_id, distribution_id, spn_id)] = spn_name
> +            self.last_spph_id = spph_id
> +
> +        # Gather all the current cached records corresponding to the data in
> +        # the current batch.
> +        existing_records = set()
> +        rows = self.store.find(
> +            DistributionSourcePackageCache,
> +            In(
> +                Row(
> +                    DistributionSourcePackageCache.archiveID,
> +                    DistributionSourcePackageCache.distributionID,
> +                    DistributionSourcePackageCache.sourcepackagenameID),
> +                map(Row, cache_filter_data)))
> +        for dspc in rows:
> +            existing_records.add(
> +                (dspc.archiveID, dspc.distributionID,
> +                 dspc.sourcepackagenameID))
> +
> +        # Bulk-create missing cache rows.
> +        inserts = []
> +        for data in set(new_records) - existing_records:
> +            archive_id, distribution_id, spn_id = data
> +            inserts.append(
> +                (archive_id, distribution_id, spn_id, new_records[data]))
> +        if inserts:
> +            create(
> +                (DistributionSourcePackageCache.archiveID,
> +                 DistributionSourcePackageCache.distributionID,
> +                 DistributionSourcePackageCache.sourcepackagenameID,
> +                 DistributionSourcePackageCache.name),
> +                inserts)
> +
> +        self.store.flush()
> +        save_garbo_job_state(self.job_name, {
> +            'last_spph_id': self.last_spph_id})
> +        transaction.commit()
> +
> +
>  class PopulateLatestPersonSourcePackageReleaseCache(TunableLoop):
>      """Populate the LatestPersonSourcePackageReleaseCache table.
>  


-- 
https://code.launchpad.net/~cjwatson/launchpad/dspc-update-garbo/+merge/297333
Your team Launchpad code reviewers is subscribed to branch lp:launchpad.


References