← Back to team overview

dulwich-users team mailing list archive

[PATCH 06/33] pack: Add a DeltaChainIterator for faster iteration of PackData.

 

From: Dave Borowitz <dborowitz@xxxxxxxxxx>

The existing pack iteration code uses an LRU size-bounded cache to store
the results of previous lookups. This works for some applications, but
in the general case of iterating over a large pack, some objects
(sometimes very many) will have to be inflated multiple times.

This commit adds a DeltaChainIterator class that iterates over a pack by
inflating each object exactly once and immediately following the chain
of deltas against that object.

For small packs where the entire inflated contents of the pack fit in
memory, this is somewhat slower than the current Dulwich implementation.
For larger packs such as linux-2.6.git and an aggressively-packed
git.git, this implementation is roughly 2-3x slower than C git and more
than 4x faster than the current implementation.

Change-Id: I7783e691f80f191de3515b47fc5b14709a46624f
---
 NEWS                       |    5 +
 dulwich/pack.py            |  107 +++++++++++++++++
 dulwich/tests/test_pack.py |  276 ++++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 388 insertions(+), 0 deletions(-)

diff --git a/NEWS b/NEWS
index a990242..c4f5f0b 100644
--- a/NEWS
+++ b/NEWS
@@ -1,5 +1,10 @@
 0.8.0	UNRELEASED
 
+ FEATURES
+
+  * New DeltaChainIterator abstract class for quickly inflating/iterating all
+    objects in a pack. (Dave Borowitz)
+
  BUG FIXES
 
   * Avoid storing all objects in memory when writing pack.
diff --git a/dulwich/pack.py b/dulwich/pack.py
index a94caf3..60eb306 100644
--- a/dulwich/pack.py
+++ b/dulwich/pack.py
@@ -1064,6 +1064,113 @@ class ThinPackData(PackData):
             raise KeyError([sha_to_hex(h) for h in postponed.keys()])
 
 
+class DeltaChainIterator(object):
+    """Abstract iterator over pack data based on delta chains.
+
+    Each object in the pack is guaranteed to be inflated exactly once,
+    regardless of how many objects reference it as a delta base. As a result,
+    memory usage is proportional to the length of the longest delta chain.
+
+    Subclasses override _result to define the result type of the iterator.
+    """
+
+    _compute_crc32 = False
+
+    def __init__(self, file_obj, resolve_ext_ref=None):
+        self._file = file_obj
+        self._resolve_ext_ref = resolve_ext_ref
+        self._pending_ofs = defaultdict(list)
+        self._pending_ref = defaultdict(list)
+        self._full_ofs = []
+        self._shas = {}
+
+    @classmethod
+    def for_pack_data(cls, pack_data):
+        walker = cls(None)
+        walker.set_pack_data(pack_data)
+        for offset, type_num, obj, _ in pack_data.iterobjects():
+            walker.record(offset, type_num, obj)
+        return walker
+
+    def record(self, offset, type_num, uncomp):
+        if type_num == OFS_DELTA:
+            delta_offset, _ = uncomp
+            base_offset = offset - delta_offset
+            self._pending_ofs[base_offset].append(offset)
+        elif type_num == REF_DELTA:
+            base_sha, _ = uncomp
+            self._pending_ref[base_sha].append(offset)
+        else:
+            self._full_ofs.append((offset, type_num))
+
+    def set_pack_data(self, pack_data):
+        self._file = pack_data._file
+        if isinstance(pack_data, ThinPackData):
+            self._resolve_ext_ref = pack_data.resolve_ext_ref
+
+    def _walk_all_chains(self):
+        for offset, type_num in self._full_ofs:
+            for result in self._follow_chain(offset, type_num, None):
+                yield result
+        for result in self._walk_ref_chains():
+            yield result
+        assert not self._pending_ofs
+
+    def _ensure_no_pending(self):
+        if self._pending_ref:
+            raise KeyError([sha_to_hex(s) for s in self._pending_ref])
+
+    def _walk_ref_chains(self):
+        if not self._resolve_ext_ref:
+            self._ensure_no_pending()
+            return
+
+        for base_sha, pending in sorted(self._pending_ref.iteritems()):
+            try:
+                type_num, chunks = self._resolve_ext_ref(base_sha)
+            except KeyError:
+                # Not an external ref, but may depend on one. Either it will get
+                # popped via a _follow_chain call, or we will raise an error
+                # below.
+                continue
+            self._pending_ref.pop(base_sha)
+            for new_offset in pending:
+                for result in self._follow_chain(new_offset, type_num, chunks):
+                    yield result
+
+        self._ensure_no_pending()
+
+    def _result(self, offset, type_num, chunks, sha, crc32):
+        raise NotImplementedError
+
+    def _resolve_object(self, offset, base_type_num, base_chunks):
+        self._file.seek(offset)
+        type_num, obj, _, crc32, _ = unpack_object(
+          self._file.read, compute_crc32=self._compute_crc32)
+        if base_chunks is None:
+            assert type_num == base_type_num
+            chunks = obj
+        else:
+            assert type_num in DELTA_TYPES
+            _, delta_chunks = obj
+            chunks = apply_delta(base_chunks, delta_chunks)
+        sha = obj_sha(base_type_num, chunks)
+        return chunks, sha, crc32
+
+    def _follow_chain(self, offset, base_type_num, base_chunks):
+        # Unlike PackData.get_object_at, there is no need to cache offsets as
+        # this approach by design inflates each object exactly once.
+        chunks, sha, crc32 = self._resolve_object(offset, base_type_num,
+                                                  base_chunks)
+        yield self._result(offset, base_type_num, chunks, sha, crc32)
+
+        pending = chain(self._pending_ofs.pop(offset, []),
+                        self._pending_ref.pop(sha, []))
+        for new_offset in pending:
+            for result in self._follow_chain(new_offset, base_type_num, chunks):
+                yield result
+
+
 class SHA1Reader(object):
     """Wrapper around a file-like object that remembers the SHA1 of its data."""
 
diff --git a/dulwich/tests/test_pack.py b/dulwich/tests/test_pack.py
index f1ec1fe..573b39f 100644
--- a/dulwich/tests/test_pack.py
+++ b/dulwich/tests/test_pack.py
@@ -32,15 +32,24 @@ from dulwich.errors import (
 from dulwich.file import (
     GitFile,
     )
+from dulwich.object_store import (
+    MemoryObjectStore,
+    )
 from dulwich.objects import (
     Blob,
     hex_to_sha,
     sha_to_hex,
+    Commit,
     Tree,
+    Blob,
     )
 from dulwich.pack import (
+    OFS_DELTA,
+    REF_DELTA,
+    DELTA_TYPES,
     MemoryPackIndex,
     Pack,
+    obj_sha,
     PackData,
     ThinPackData,
     apply_delta,
@@ -51,11 +60,17 @@ from dulwich.pack import (
     write_pack_header,
     write_pack_index_v1,
     write_pack_index_v2,
+    SHA1Writer,
+    write_pack_object,
     write_pack,
+    DeltaChainIterator,
     )
 from dulwich.tests import (
     TestCase,
     )
+from utils import (
+    make_object,
+    )
 
 pack1_sha = 'bc63ddad95e7321ee734ea11a7a62d314e0d7481'
 
@@ -567,3 +582,264 @@ class DeltifyTests(TestCase):
             (b2.type_num, b2.sha().digest(), b1.sha().digest(), delta)
             ],
             list(deltify_pack_objects([(b1, ""), (b2, "")])))
+
+
+class TestPackIterator(DeltaChainIterator):
+
+    _compute_crc32 = True
+
+    def __init__(self, pack_data):
+        super(TestPackIterator, self).__init__(pack_data)
+        self._unpacked = set()
+
+    def _result(self, offset, type_num, chunks, sha, crc32):
+        return offset, type_num, ''.join(chunks), sha, crc32
+
+    def _resolve_object(self, offset, base_type_num, base_chunks):
+        assert offset not in self._unpacked, ('Attempted to re-inflate offset '
+                                              '%i' % offset)
+        self._unpacked.add(offset)
+        return super(TestPackIterator, self)._resolve_object(
+          offset, base_type_num, base_chunks)
+
+
+
+class DeltaChainIteratorTests(TestCase):
+
+    def setUp(self):
+        self.store = MemoryObjectStore()
+        self.fetched = set()
+
+    def store_blobs(self, blobs_data):
+        blobs = []
+        for data in blobs_data:
+            blob = make_object(Blob, data=data)
+            blobs.append(blob)
+            self.store.add_object(blob)
+        return blobs
+
+    def write_pack_data(self, objects_spec):
+        """Write test pack data from a concise spec.
+
+        :param objects_spec: A list of (type_num, obj). For non-delta types, obj
+            is the string of that object's data.
+
+            For delta types, obj is a tuple of (base_index, data), where
+            base_index is the index in objects_spec of the base for that delta,
+            and data is the full, non-deltified data for that object.
+            (Offsets/refs and deltas are computed within this function.)
+
+        :return: A tuple of (f, entries), where f is a file-like object pointed
+            at the beginning of a pack with the requested data, and entries is a
+            list of tuples of:
+              (offset, type num, data, sha, CRC32)
+            These tuples match the result format from TestPackIterator, and are
+            returned in the order specified by objects_spec.
+        """
+        f = StringIO()
+        sf = SHA1Writer(f)
+        num_objects = len(objects_spec)
+        write_pack_header(sf, num_objects)
+
+        full_objects = {}
+        offsets = {}
+        crc32s = {}
+
+        while len(full_objects) < num_objects:
+            for i, (type_num, data) in enumerate(objects_spec):
+                if type_num not in DELTA_TYPES:
+                    full_objects[i] = (type_num, data,
+                                       obj_sha(type_num, [data]))
+                    continue
+                base, data = data
+                if isinstance(base, int):
+                    if base not in full_objects:
+                        continue
+                    base_type_num, _, _ = full_objects[base]
+                else:
+                    base_type_num, _ = self.store.get_raw(base)
+                full_objects[i] = (base_type_num, data,
+                                   obj_sha(base_type_num, [data]))
+
+        for i, (type_num, obj) in enumerate(objects_spec):
+            offset = f.tell()
+            if type_num == OFS_DELTA:
+                base_index, data = obj
+                base = offset - offsets[base_index]
+                _, base_data, _ = full_objects[base_index]
+                obj = (base, create_delta(base_data, data))
+            elif type_num == REF_DELTA:
+                base_ref, data = obj
+                if isinstance(base_ref, int):
+                    _, base_data, base = full_objects[base_ref]
+                else:
+                    base_type_num, base_data = self.store.get_raw(base_ref)
+                    base = obj_sha(base_type_num, base_data)
+                obj = (base, create_delta(base_data, data))
+
+            crc32 = write_pack_object(sf, type_num, obj)
+            offsets[i] = offset
+            crc32s[i] = crc32
+
+        expected = []
+        for i in xrange(num_objects):
+            type_num, data, sha = full_objects[i]
+            expected.append((offsets[i], type_num, data, sha, crc32s[i]))
+
+        sf.write_sha()
+        f.seek(0)
+        return f, expected
+
+    def get_raw_no_repeat(self, bin_sha):
+        """Wrapper around store.get_raw that doesn't allow repeat lookups."""
+        hex_sha = sha_to_hex(bin_sha)
+        self.assertFalse(hex_sha in self.fetched,
+                         'Attempted to re-fetch object %s' % hex_sha)
+        self.fetched.add(hex_sha)
+        return self.store.get_raw(hex_sha)
+
+    def make_pack_iter(self, f, thin=None):
+        if thin is None:
+            thin = bool(list(self.store))
+        if thin:
+            data = ThinPackData(self.get_raw_no_repeat, 'test.pack', file=f)
+        else:
+            data = PackData('test.pack', file=f)
+        return TestPackIterator.for_pack_data(data)
+
+    def assertEntriesMatch(self, expected_indexes, entries, pack_iter):
+        expected = [entries[i] for i in expected_indexes]
+        self.assertEqual(expected, list(pack_iter._walk_all_chains()))
+
+    def test_no_deltas(self):
+        f, entries = self.write_pack_data([
+          (Commit.type_num, 'commit'),
+          (Blob.type_num, 'blob'),
+          (Tree.type_num, 'tree'),
+          ])
+        self.assertEntriesMatch([0, 1, 2], entries, self.make_pack_iter(f))
+
+    def test_ofs_deltas(self):
+        f, entries = self.write_pack_data([
+          (Blob.type_num, 'blob'),
+          (OFS_DELTA, (0, 'blob1')),
+          (OFS_DELTA, (0, 'blob2')),
+          ])
+        self.assertEntriesMatch([0, 1, 2], entries, self.make_pack_iter(f))
+
+    def test_ofs_deltas_chain(self):
+        f, entries = self.write_pack_data([
+          (Blob.type_num, 'blob'),
+          (OFS_DELTA, (0, 'blob1')),
+          (OFS_DELTA, (1, 'blob2')),
+          ])
+        self.assertEntriesMatch([0, 1, 2], entries, self.make_pack_iter(f))
+
+    def test_ref_deltas(self):
+        f, entries = self.write_pack_data([
+          (REF_DELTA, (1, 'blob1')),
+          (Blob.type_num, ('blob')),
+          (REF_DELTA, (1, 'blob2')),
+          ])
+        self.assertEntriesMatch([1, 0, 2], entries, self.make_pack_iter(f))
+
+    def test_ref_deltas_chain(self):
+        f, entries = self.write_pack_data([
+          (REF_DELTA, (2, 'blob1')),
+          (Blob.type_num, ('blob')),
+          (REF_DELTA, (1, 'blob2')),
+          ])
+        self.assertEntriesMatch([1, 2, 0], entries, self.make_pack_iter(f))
+
+    def test_ofs_and_ref_deltas(self):
+        # Deltas depending on this offset are popped before deltas depending
+        # on this ref.
+        f, entries = self.write_pack_data([
+          (REF_DELTA, (1, 'blob1')),
+          (Blob.type_num, ('blob')),
+          (OFS_DELTA, (1, 'blob2')),
+          ])
+        self.assertEntriesMatch([1, 2, 0], entries, self.make_pack_iter(f))
+
+    def test_mixed_chain(self):
+        f, entries = self.write_pack_data([
+          (Blob.type_num, 'blob'),
+          (REF_DELTA, (2, 'blob2')),
+          (OFS_DELTA, (0, 'blob1')),
+          (OFS_DELTA, (1, 'blob3')),
+          (OFS_DELTA, (0, 'bob')),
+          ])
+        self.assertEntriesMatch([0, 2, 1, 3, 4], entries,
+                                self.make_pack_iter(f))
+
+    def test_long_chain(self):
+        n = 100
+        objects_spec = [(Blob.type_num, 'blob')]
+        for i in xrange(n):
+            objects_spec.append((OFS_DELTA, (i, 'blob%i' % i)))
+        f, entries = self.write_pack_data(objects_spec)
+        self.assertEntriesMatch(xrange(n + 1), entries, self.make_pack_iter(f))
+
+    def test_branchy_chain(self):
+        n = 100
+        objects_spec = [(Blob.type_num, 'blob')]
+        for i in xrange(n):
+            objects_spec.append((OFS_DELTA, (0, 'blob%i' % i)))
+        f, entries = self.write_pack_data(objects_spec)
+        self.assertEntriesMatch(xrange(n + 1), entries, self.make_pack_iter(f))
+
+    def test_ext_ref(self):
+        blob, = self.store_blobs(['blob'])
+        f, entries = self.write_pack_data([(REF_DELTA, (blob.id, 'blob1'))])
+        self.assertEntriesMatch([0], entries, self.make_pack_iter(f))
+
+    def test_ext_ref_chain(self):
+        blob, = self.store_blobs(['blob'])
+        f, entries = self.write_pack_data([
+          (REF_DELTA, (1, 'blob2')),
+          (REF_DELTA, (blob.id, 'blob1')),
+          ])
+        self.assertEntriesMatch([1, 0], entries, self.make_pack_iter(f))
+
+    def test_ext_ref_multiple_times(self):
+        blob, = self.store_blobs(['blob'])
+        f, entries = self.write_pack_data([
+          (REF_DELTA, (blob.id, 'blob1')),
+          (REF_DELTA, (blob.id, 'blob2')),
+          ])
+        self.assertEntriesMatch([0, 1], entries, self.make_pack_iter(f))
+
+    def test_multiple_ext_refs(self):
+        b1, b2 = self.store_blobs(['foo', 'bar'])
+        f, entries = self.write_pack_data([
+          (REF_DELTA, (b1.id, 'foo1')),
+          (REF_DELTA, (b2.id, 'bar2')),
+          ])
+        self.assertEntriesMatch([0, 1], entries, self.make_pack_iter(f))
+
+    def test_bad_ext_ref_non_thin_pack(self):
+        blob, = self.store_blobs(['blob'])
+        f, entries = self.write_pack_data([(REF_DELTA, (blob.id, 'blob1'))])
+        pack_iter = self.make_pack_iter(f, thin=False)
+        try:
+            list(pack_iter._walk_all_chains())
+            self.fail()
+        except KeyError, e:
+            self.assertEqual(([blob.id],), e.args)
+
+    def test_bad_ext_ref_thin_pack(self):
+        b1, b2, b3 = self.store_blobs(['foo', 'bar', 'baz'])
+        f, entries = self.write_pack_data([
+          (REF_DELTA, (1, 'foo99')),
+          (REF_DELTA, (b1.id, 'foo1')),
+          (REF_DELTA, (b2.id, 'bar2')),
+          (REF_DELTA, (b3.id, 'baz3')),
+          ])
+        del self.store[b2.id]
+        del self.store[b3.id]
+        pack_iter = self.make_pack_iter(f)
+        try:
+            list(pack_iter._walk_all_chains())
+            self.fail()
+        except KeyError, e:
+            self.assertEqual((sorted([b2.id, b3.id]),), e.args)
-- 
1.7.3.1



References