launchpad-reviewers team mailing list archive

[Merge] lp:~wgrant/launchpad/parallel-gc into lp:launchpad

 

William Grant has proposed merging lp:~wgrant/launchpad/parallel-gc into lp:launchpad.

Commit message:
Run librariangc file deletions in parallel to work around Swift latency.

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)

For more details, see:
https://code.launchpad.net/~wgrant/launchpad/parallel-gc/+merge/360461
-- 
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~wgrant/launchpad/parallel-gc into lp:launchpad.
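For reviewers skimming the diff below: the branch moves the per-file deletion into a remove_content() method and drives it from a ten-thread multiprocessing.pool.ThreadPool, so the Swift round trips overlap instead of running one at a time. A minimal, self-contained sketch of that pattern follows; delete_one(), delete_in_parallel() and the path arguments are illustrative stand-ins, not code from the branch:

import errno
import multiprocessing.pool
import os


def delete_one(path):
    # Remove a single file, treating "already gone" as success so the
    # job stays idempotent if it is re-run.
    try:
        os.unlink(path)
    except OSError as e:
        if e.errno != errno.ENOENT:
            raise


def delete_in_parallel(paths, workers=10):
    # Threads are enough here: the work is I/O-bound (disk and HTTP),
    # so the GIL is released while each worker waits.
    pool = multiprocessing.pool.ThreadPool(workers)
    try:
        # map() blocks until every deletion has finished, and re-raises
        # the first worker exception in the calling thread.
        pool.map(delete_one, paths)
    finally:
        pool.close()
        pool.join()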
=== modified file 'lib/lp/services/librarianserver/librariangc.py'
--- lib/lp/services/librarianserver/librariangc.py	2018-05-06 08:52:34 +0000
+++ lib/lp/services/librarianserver/librariangc.py	2018-12-10 10:21:48 +0000
@@ -11,6 +11,7 @@
     )
 import errno
 import hashlib
+import multiprocessing.pool
 import os
 import re
 import sys
@@ -523,6 +524,41 @@
         else:
             return False
 
+    def remove_content(self, content_id):
+        removed = []
+
+        # Remove the file from disk, if it hasn't already been.
+        path = get_file_path(content_id)
+        try:
+            os.unlink(path)
+            removed.append('filesystem')
+        except OSError as e:
+            if e.errno != errno.ENOENT:
+                raise
+
+        # Remove the file from Swift, if it hasn't already been.
+        if self.swift_enabled:
+            container, name = swift.swift_location(content_id)
+            with swift.connection() as swift_connection:
+                try:
+                    swift.quiet_swiftclient(
+                        swift_connection.delete_object, container, name)
+                    removed.append('Swift')
+                except swiftclient.ClientException as x:
+                    if x.http_status != 404:
+                        raise
+
+        if removed:
+            log.debug3(
+                "Deleted %s from %s", content_id, ' & '.join(removed))
+
+        elif config.librarian_server.upstream_host is None:
+            # It is normal to have files in the database that
+            # are not on disk if the Librarian has an upstream
+            # Librarian, such as on staging. Don't annoy the
+            # operator with noise in this case.
+            log.info("%s already deleted", path)
+
     def __call__(self, chunksize):
         chunksize = int(chunksize)
 
@@ -548,40 +584,13 @@
             SELECT content FROM UnreferencedLibraryFileContent
             WHERE id BETWEEN %s AND %s
             """, (self.index, self.index + chunksize - 1))
-        for content_id in (row[0] for row in cur.fetchall()):
-            removed = []
-
-            # Remove the file from disk, if it hasn't already been.
-            path = get_file_path(content_id)
-            try:
-                os.unlink(path)
-                removed.append('filesystem')
-            except OSError as e:
-                if e.errno != errno.ENOENT:
-                    raise
-
-            # Remove the file from Swift, if it hasn't already been.
-            if self.swift_enabled:
-                container, name = swift.swift_location(content_id)
-                with swift.connection() as swift_connection:
-                    try:
-                        swift.quiet_swiftclient(
-                            swift_connection.delete_object, container, name)
-                        removed.append('Swift')
-                    except swiftclient.ClientException as x:
-                        if x.http_status != 404:
-                            raise
-
-            if removed:
-                log.debug3(
-                    "Deleted %s from %s", content_id, ' & '.join(removed))
-
-            elif config.librarian_server.upstream_host is None:
-                # It is normal to have files in the database that
-                # are not on disk if the Librarian has an upstream
-                # Librarian, such as on staging. Don't annoy the
-                # operator with noise in this case.
-                log.info("%s already deleted", path)
+
+        pool = multiprocessing.pool.ThreadPool(10)
+        try:
+            pool.map(self.remove_content, (row[0] for row in cur.fetchall()))
+        finally:
+            pool.close()
+            pool.join()
         self.con.rollback()
 
         self.index += chunksize
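A footnote on the Swift half of remove_content(): the delete stays idempotent because a 404 from Swift is swallowed rather than raised, and each call opens its own connection via swift.connection(), so the worker threads never share a connection object. A standalone sketch of that tolerant delete against the plain python-swiftclient API (the helper name and the container/object arguments are made up for illustration):

import swiftclient


def delete_object_if_present(connection, container, name):
    # Delete an object, treating "already gone" (404) as success so a
    # re-run or a concurrent deletion is harmless.
    try:
        connection.delete_object(container, name)
        return True
    except swiftclient.ClientException as e:
        if e.http_status != 404:
            raise
        return False

Because nothing is shared between calls, pushing a helper like this through ThreadPool.map() needs no extra locking.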

