Skip to content

Commit

Permalink
Merge pull request #387 from alcionai/multiple-contents
Browse files Browse the repository at this point in the history
Allow manifest compaction to generate multiple content pieces
  • Loading branch information
ashmrtn authored Sep 5, 2024
2 parents 4147115 + 3f0966d commit c745927
Show file tree
Hide file tree
Showing 2 changed files with 132 additions and 15 deletions.
79 changes: 64 additions & 15 deletions repo/manifest/committed_manifest_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"github.com/kopia/kopia/repo/content/index"
)

const maxManifestsPerContent = 1000000

// committedManifestManager manages committed manifest entries stored in 'm' contents.
type committedManifestManager struct {
b contentManager
Expand Down Expand Up @@ -87,7 +89,25 @@ func (m *committedManifestManager) commitEntries(ctx context.Context, entries ma
m.lock()
defer m.unlock()

return m.writeEntriesLocked(ctx, entries)
return m.writeEntriesLocked(ctx, entries, false)
}

// writeContentChunk encodes data as JSON, gzip-compresses it into buf and
// stores the resulting bytes as one content piece under ContentPrefix.
//
// The function hands buf.Bytes() to the content manager and never resets buf
// itself, so whatever buf already holds on entry is part of what gets written;
// callers are expected to pass an empty buffer.
//
// It returns the ID of the written content, or content.EmptyID together with
// a wrapped error when the content manager rejects the write. Encoding and
// compression failures are routed through mustSucceed (presumably a panic on
// non-nil error — confirm against its definition).
func (m *committedManifestManager) writeContentChunk(
	ctx context.Context,
	data any,
	buf *gather.WriteBuffer,
) (content.ID, error) {
	compressor := gzip.NewWriter(buf)
	encoder := json.NewEncoder(compressor)

	mustSucceed(encoder.Encode(data))
	mustSucceed(compressor.Flush())
	mustSucceed(compressor.Close())

	// The payload is gzipped above, and the content layer is asked not to
	// compress it again.
	id, err := m.b.WriteContent(ctx, buf.Bytes(), ContentPrefix, content.NoCompression)
	if err == nil {
		return id, nil
	}

	return content.EmptyID, errors.Wrap(err, "unable to write content")
}

// writeEntriesLocked writes entries in the provided map as manifest contents
Expand All @@ -98,38 +118,67 @@ func (m *committedManifestManager) commitEntries(ctx context.Context, entries ma
// the lock via commitEntries()) and to compact existing committed entries during compaction
// where the lock is already being held.
// +checklocks:m.cmmu
// NOTE(review): this listing is a rendered diff whose +/- markers were
// stripped, so removed and added lines are interleaved. The single-line
// signature directly below appears to be the removed (pre-change) one; the
// multi-line signature after it, which adds isCompaction, appears to be its
// replacement. Lines flagged "pre-change" further down are the same kind of
// residue — confirm against the actual file before relying on them.
func (m *committedManifestManager) writeEntriesLocked(ctx context.Context, entries map[ID]*manifestEntry) (map[content.ID]bool, error) {
// Post-change version: accumulates entries into man and writes them out via
// writeContentChunk. When isCompaction is false everything goes into a single
// content piece; when it is true a piece is written each time man reaches
// maxManifestsPerContent entries. Returns the set of content IDs written by
// this call, or (nil, nil) when entries is empty.
func (m *committedManifestManager) writeEntriesLocked(
ctx context.Context,
entries map[ID]*manifestEntry,
isCompaction bool,
) (map[content.ID]bool, error) {
// Nothing to write: no content is created and no state is touched.
if len(entries) == 0 {
return nil, nil
}

// NOTE(review): pre-change declaration of man (diff residue).
man := manifest{}
var (
buf gather.WriteBuffer
man manifest
newlyCommitted = map[content.ID]bool{}
)

defer buf.Close()

for _, e := range entries {
man.Entries = append(man.Entries, e)

// Still write all entries out in a single content piece if we're not
// compacting things.
if !isCompaction || len(man.Entries) < maxManifestsPerContent {
continue
}

// Chunk is full: write it out as its own content piece.
contentID, err := m.writeContentChunk(ctx, man, &buf)
if err != nil {
return nil, errors.Wrap(err, "writing manifest data")
}

// Truncate to length zero (keeping the slice) and record the new content ID.
man.Entries = man.Entries[:0]
newlyCommitted[contentID] = true

// Presumably empties buf so the next chunk starts from a clean buffer —
// confirm against gather.WriteBuffer.
buf.Reset()
}

// NOTE(review): pre-change buffer setup (diff residue); the post-change code
// declares buf in the var block above.
var buf gather.WriteBuffer
defer buf.Close()
// Write out any remaining manifest entries.
if len(man.Entries) > 0 {
contentID, err := m.writeContentChunk(ctx, man, &buf)
if err != nil {
return nil, errors.Wrap(err, "writing final manifest data")
}

// NOTE(review): pre-change inline gzip/JSON encoding (diff residue); the same
// four steps now appear in writeContentChunk.
gz := gzip.NewWriter(&buf)
mustSucceed(json.NewEncoder(gz).Encode(man))
mustSucceed(gz.Flush())
mustSucceed(gz.Close())
man.Entries = man.Entries[:0]
newlyCommitted[contentID] = true

// NOTE(review): pre-change direct WriteContent call and its error return
// (diff residue).
contentID, err := m.b.WriteContent(ctx, buf.Bytes(), ContentPrefix, content.NoCompression)
if err != nil {
return nil, errors.Wrap(err, "unable to write content")
buf.Reset()
}

// Only update internal data if we successfully wrote all manifest contents.
// Each entry is recorded in m.committedEntries and removed from the caller's
// map, so entries is empty once this loop finishes.
for _, e := range entries {
m.committedEntries[e.ID] = e
delete(entries, e.ID)
}

// NOTE(review): pre-change single-ID bookkeeping (diff residue); the loop
// below records every ID written by this call.
m.committedContentIDs[contentID] = true
for contentID := range newlyCommitted {
m.committedContentIDs[contentID] = true
}

// NOTE(review): the first return is the pre-change one (diff residue).
return map[content.ID]bool{contentID: true}, nil
return newlyCommitted, nil
}

// +checklocks:m.cmmu
Expand Down Expand Up @@ -260,7 +309,7 @@ func (m *committedManifestManager) compactLocked(ctx context.Context) error {
tmp[k] = v
}

written, err := m.writeEntriesLocked(ctx, tmp)
written, err := m.writeEntriesLocked(ctx, tmp, true)
if err != nil {
return err
}
Expand Down
68 changes: 68 additions & 0 deletions repo/manifest/manifest_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -495,3 +495,71 @@ func getManifestContentCount(ctx context.Context, t *testing.T, mgr *Manager) in

return foundContents
}

func TestWriteManyManifests(t *testing.T) {
ctx := testlogging.Context(t)
data := blobtesting.DataMap{}
item1 := map[string]int{"foo": 1, "bar": 2}
labels1 := map[string]string{"type": "item", "color": "red"}
numManifests := maxManifestsPerContent + 5

mgr := newManagerForTesting(ctx, t, data, ManagerOptions{})

for i := 0; i < numManifests; i++ {
addAndVerify(ctx, t, mgr, labels1, item1)
}

require.NoError(t, mgr.Flush(ctx))
require.NoError(t, mgr.b.Flush(ctx))

// Should only have a single content piece since this wasn't compaction.
foundContents := getManifestContentCount(ctx, t, mgr)
assert.Equal(t, 1, foundContents)

mans, err := mgr.Find(ctx, map[string]string{"color": "red"})
assert.NoError(t, err)
assert.Len(t, mans, numManifests)
}

// TestCompactManyManifests builds up maxManifestsPerContent+5 manifests spread
// over seven content pieces (one bulk flush plus six single-manifest flushes),
// runs compaction, and expects the result to be split into exactly two content
// pieces with every manifest still discoverable.
func TestCompactManyManifests(t *testing.T) {
	ctx := testlogging.Context(t)
	mgr := newManagerForTesting(ctx, t, blobtesting.DataMap{}, ManagerOptions{})

	payload := map[string]int{"foo": 1, "bar": 2}
	labels := map[string]string{"type": "item", "color": "red"}

	// flushAll pushes pending manifests and then the underlying contents.
	flushAll := func() {
		require.NoError(t, mgr.Flush(ctx))
		require.NoError(t, mgr.b.Flush(ctx))
	}

	for added := 0; added < maxManifestsPerContent-1; added++ {
		addAndVerify(ctx, t, mgr, labels, payload)
	}

	flushAll()

	// One regular flush yields one content piece regardless of entry count.
	pieces := getManifestContentCount(ctx, t, mgr)
	assert.Equal(t, 1, pieces)

	// Flush after every single add so each manifest gets its own content piece;
	// that makes it observable that compaction removed the old pieces.
	for round := 0; round < 6; round++ {
		addAndVerify(ctx, t, mgr, labels, payload)
		flushAll()
	}

	pieces = getManifestContentCount(ctx, t, mgr)
	assert.Equal(t, 7, pieces)

	// Compaction has more than maxManifestsPerContent entries to rewrite, so it
	// must emit more than one content piece.
	require.NoError(t, mgr.Compact(ctx), "compacting manifests")

	pieces = getManifestContentCount(ctx, t, mgr)
	assert.Equal(t, 2, pieces)

	found, err := mgr.Find(ctx, map[string]string{"color": "red"})
	assert.NoError(t, err)
	assert.Len(t, found, maxManifestsPerContent+5)
}

0 comments on commit c745927

Please sign in to comment.