← Back to team overview

launchpad-reviewers team mailing list archive

[Merge] lp:~lifeless/python-oops-datedir-repo/less-rsync into lp:python-oops-datedir-repo

 

Robert Collins has proposed merging lp:~lifeless/python-oops-datedir-repo/less-rsync into lp:python-oops-datedir-repo.

Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)
Related bugs:
  Bug #884551 in Python OOPS Date-dir repository: "RSync is needed to extract reports from a DateDirRepo"
  https://bugs.launchpad.net/python-oops-datedir-repo/+bug/884551

For more details, see:
https://code.launchpad.net/~lifeless/python-oops-datedir-repo/less-rsync/+merge/80861

This adds the necessary support to have the datedir repo on each server scanned and uploaded to amqp (or any other publisher) in the future. The glue code for this will go in a separate project to avoid inappropriate dependencies between the repo and the amqp publisher.
-- 
https://code.launchpad.net/~lifeless/python-oops-datedir-repo/less-rsync/+merge/80861
Your team Launchpad code reviewers is requested to review the proposed merge of lp:~lifeless/python-oops-datedir-repo/less-rsync into lp:python-oops-datedir-repo.
=== modified file 'NEWS'
--- NEWS	2011-10-10 02:50:20 +0000
+++ NEWS	2011-11-01 02:13:23 +0000
@@ -6,6 +6,18 @@
 NEXT
 ----
 
+0.0.10
+------
+
+* New files are written to $name.tmp and then renamed to $name, allowing
+  readers to detect whether the file was finished or not.
+  (Robert Collins)
+
+* DateDirRepo.republish(publisher) can be used to treat a DateDirRepo as the
+  source of reports for feeding into a different publisher. This will remove
+  reports that are successfully republished.
+  (Robert Collins, #884551)
+
 0.0.9
 -----
 

=== modified file 'oops_datedir_repo/__init__.py'
--- oops_datedir_repo/__init__.py	2011-10-10 02:51:42 +0000
+++ oops_datedir_repo/__init__.py	2011-11-01 02:13:23 +0000
@@ -25,7 +25,7 @@
 # established at this point, and setup.py will use a version of next-$(revno).
 # If the releaselevel is 'final', then the tarball will be major.minor.micro.
 # Otherwise it is major.minor.micro~$(revno).
-__version__ = (0, 0, 9, 'beta', 0)
+__version__ = (0, 0, 10, 'beta', 0)
 
 __all__ = [
     'DateDirRepo',

=== modified file 'oops_datedir_repo/repository.py'
--- oops_datedir_repo/repository.py	2011-10-13 02:14:59 +0000
+++ oops_datedir_repo/repository.py	2011-11-01 02:13:23 +0000
@@ -23,12 +23,14 @@
     ]
 
 import datetime
+from functools  import partial
 from hashlib import md5
 import os.path
 import stat
 
 from pytz import utc
 
+import serializer
 import serializer_bson
 import serializer_rfc822
 from uniquefileallocator import UniqueFileAllocator
@@ -78,6 +80,10 @@
     def publish(self, report, now=None):
         """Write the report to disk.
 
+        The report is written to a temporary file, and then renamed to its
+        final location. Programs concurrently reading from a DateDirRepo 
+        should ignore files ending in .tmp.
+
         :param now: The datetime to use as the current time.  Will be
             determined if not supplied.  Useful for testing.
         """
@@ -100,7 +106,8 @@
         if self.inherit_id:
             oopsid = report.get('id') or oopsid
         report['id'] = oopsid
-        self.serializer.write(report, open(filename, 'wb'))
+        self.serializer.write(report, open(filename + '.tmp', 'wb'))
+        os.rename(filename + '.tmp', filename)
         if self.stash_path:
             original_report['datedir_repo_filepath'] = filename
         # Set file permission to: rw-r--r-- (so that reports from
@@ -110,3 +117,57 @@
             stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH)
         os.chmod(filename, wanted_permission)
         return report['id']
+
+    def republish(self, publisher):
+        """Republish the contents of the DateDirRepo to another publisher.
+        
+        This makes it easy to treat a DateDirRepo as a backing store in message
+        queue environments: if the message queue is down, flush to the
+        DateDirRepo, then later pick the OOPSes up and send them to the message
+        queue environment.
+
+        For instance:
+
+          >>> repo = DateDirRepo('.')
+          >>> repo.publish({'some':'report'})
+          >>> queue = []
+          >>> def queue_publisher(report):
+          ...     queue.append(report)
+          ...     return report['id']
+          >>> repo.republish(queue_publisher)
+
+        Will scan the disk and send the single found report to queue_publisher,
+        deleting the report afterwards.
+
+        Empty datedir directories are automatically cleaned up, as are stale
+        .tmp files.
+
+        If the publisher returns None, signalling that it did not publish the
+        report, then the report is not deleted from disk.
+        """
+        two_days = datetime.timedelta(2)
+        now = datetime.date.today()
+        old = now - two_days
+        for dirname in os.listdir(self.root):
+            try:
+                y, m, d = dirname.split('-')
+            except ValueError:
+                # Not a datedir
+                continue
+            date = datetime.date(int(y),int(m),int(d))
+            prune = date < old
+            dirpath = os.path.join(self.root, dirname)
+            files = os.listdir(dirpath)
+            if not files and prune:
+                # Cleanup no longer needed directory.
+                os.rmdir(dirpath)
+            for candidate in map(partial(os.path.join, dirpath), files):
+                if candidate.endswith('.tmp'):
+                    if prune:
+                        os.unlink(candidate)
+                    continue
+                with file(candidate, 'rb') as report_file:
+                    report = serializer.read(report_file)
+                oopsid = publisher(report)
+                if oopsid:
+                    os.unlink(candidate)

=== modified file 'oops_datedir_repo/tests/test_repository.py'
--- oops_datedir_repo/tests/test_repository.py	2011-10-13 02:14:59 +0000
+++ oops_datedir_repo/tests/test_repository.py	2011-11-01 02:13:23 +0000
@@ -157,3 +157,81 @@
         with open(expected_path, 'rb') as fp:
             self.assertEqual(expected_disk_report, bson.loads(fp.read()))
 
+    def test_republish_not_published(self):
+        # If an OOPS being republished is not republished, it is preserved on
+        # disk.
+        repo = DateDirRepo(self.useFixture(TempDir()).path)
+        now = datetime.datetime(2006, 04, 01, 00, 30, 00, tzinfo=utc)
+        report = {'time': now}
+        repo.publish(report, now)
+        dir = repo.root + '/2006-04-01/'
+        files = os.listdir(dir)
+        expected_path = dir + files[0]
+        oopses = []
+        # append() returns None
+        publisher = oopses.append
+        repo.republish(publisher)
+        self.assertTrue(os.path.isfile(expected_path))
+        self.assertEqual(1, len(oopses))
+
+    def test_republish_ignores_current_dot_tmp_files(self):
+        # .tmp files are in-progress writes and not to be touched.
+        repo = DateDirRepo(self.useFixture(TempDir()).path, stash_path=True)
+        report = {}
+        repo.publish(report)
+        finished_path = report['datedir_repo_filepath']
+        inprogress_path = finished_path + '.tmp'
+        # Move the file to a temp path, simulating an in-progress write.
+        os.rename(finished_path, inprogress_path)
+        oopses = []
+        publisher = oopses.append
+        repo.republish(publisher)
+        self.assertTrue(os.path.isfile(inprogress_path))
+        self.assertEqual([], oopses)
+
+    def test_republish_republishes_and_removes(self):
+        # When a report is republished it is then removed from disk.
+        repo = DateDirRepo(self.useFixture(TempDir()).path, stash_path=True)
+        report = {}
+        repo.publish(report)
+        finished_path = report['datedir_repo_filepath']
+        oopses = []
+        def publish(report):
+            oopses.append(report)
+            return report['id']
+        repo.republish(publish)
+        self.assertFalse(os.path.isfile(finished_path))
+        self.assertEqual(1, len(oopses))
+
+    def test_republish_cleans_empty_old_directories(self):
+        # An empty old datedir directory cannot get new reports in it, so gets
+        # cleaned up to keep the worker efficient.
+        repo = DateDirRepo(self.useFixture(TempDir()).path)
+        os.mkdir(repo.root + '/2006-04-12')
+        repo.republish([].append)
+        self.assertFalse(os.path.exists(repo.root + '/2006-04-12'))
+
+    def test_republish_removes_old_dot_tmp_files(self):
+        # A .tmp file in a datedir more than two days old is probably never
+        # going to get renamed into place, so we just unlink it.
+        repo = DateDirRepo(self.useFixture(TempDir()).path)
+        now = datetime.datetime(2006, 04, 01, 00, 30, 00, tzinfo=utc)
+        report = {'time': now}
+        repo.publish(report, now)
+        dir = repo.root + '/2006-04-01/'
+        files = os.listdir(dir)
+        finished_path = dir + files[0]
+        inprogress_path = finished_path + '.tmp'
+        os.rename(finished_path, inprogress_path)
+        oopses = []
+        publisher = oopses.append
+        repo.republish(publisher)
+        self.assertFalse(os.path.isfile(inprogress_path))
+        self.assertEqual([], oopses)
+
+    def test_republish_no_error_non_datedir(self):
+        # The presence of a non-datedir directory in a datedir repo doesn't
+        # break things.
+        repo = DateDirRepo(self.useFixture(TempDir()).path)
+        os.mkdir(repo.root + '/foo')
+        repo.republish([].append)

=== modified file 'setup.py'
--- setup.py	2011-10-10 02:51:42 +0000
+++ setup.py	2011-11-01 02:13:23 +0000
@@ -22,7 +22,7 @@
 description = file(os.path.join(os.path.dirname(__file__), 'README'), 'rb').read()
 
 setup(name="oops_datedir_repo",
-      version="0.0.9",
+      version="0.0.10",
       description="OOPS disk serialisation and repository management.",
       long_description=description,
       maintainer="Launchpad Developers",