Skip to content

Commit

Permalink
Allow persisting multiple content pieces
Browse files Browse the repository at this point in the history
When compacting manifest data, allow persisting multiple content pieces.
This should be alright because flushes on the index are already
disabled, which means all changes should appear atomically.

Uses a max number of manifests per content piece as that's easily
available and can be used as a proxy for total content size to some
extent.
  • Loading branch information
ashmrtn committed Sep 3, 2024
1 parent 4147115 commit 35b8b0f
Showing 1 changed file with 64 additions and 15 deletions.
79 changes: 64 additions & 15 deletions repo/manifest/committed_manifest_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"github.com/kopia/kopia/repo/content/index"
)

const maxManifestsPerContent = 1000000

// committedManifestManager manages committed manifest entries stored in 'm' contents.
type committedManifestManager struct {
b contentManager
Expand Down Expand Up @@ -87,7 +89,25 @@ func (m *committedManifestManager) commitEntries(ctx context.Context, entries ma
m.lock()
defer m.unlock()

return m.writeEntriesLocked(ctx, entries)
return m.writeEntriesLocked(ctx, entries, false)
}

func (m *committedManifestManager) writeContentChunk(
ctx context.Context,
data any,
buf *gather.WriteBuffer,
) (content.ID, error) {
gz := gzip.NewWriter(buf)
mustSucceed(json.NewEncoder(gz).Encode(data))
mustSucceed(gz.Flush())
mustSucceed(gz.Close())

contentID, err := m.b.WriteContent(ctx, buf.Bytes(), ContentPrefix, content.NoCompression)
if err != nil {
return content.EmptyID, errors.Wrap(err, "unable to write content")
}

return contentID, nil
}

// writeEntriesLocked writes entries in the provided map as manifest contents
Expand All @@ -98,38 +118,67 @@ func (m *committedManifestManager) commitEntries(ctx context.Context, entries ma
// the lock via commitEntries()) and to compact existing committed entries during compaction
// where the lock is already being held.
// +checklocks:m.cmmu
func (m *committedManifestManager) writeEntriesLocked(ctx context.Context, entries map[ID]*manifestEntry) (map[content.ID]bool, error) {
func (m *committedManifestManager) writeEntriesLocked(
ctx context.Context,
entries map[ID]*manifestEntry,
isCompaction bool,
) (map[content.ID]bool, error) {
if len(entries) == 0 {
return nil, nil
}

man := manifest{}
var (
buf gather.WriteBuffer
man manifest
newlyCommitted = map[content.ID]bool{}
)

defer buf.Close()

for _, e := range entries {
man.Entries = append(man.Entries, e)

// Still write all entries out in a single content piece if we're not
// compacting things.
if !isCompaction || len(man.Entries) < maxManifestsPerContent {
continue
}

contentID, err := m.writeContentChunk(ctx, man, &buf)
if err != nil {
return nil, errors.Wrap(err, "writing manifest data")
}

man.Entries = man.Entries[:0]
newlyCommitted[contentID] = true

buf.Reset()
}

var buf gather.WriteBuffer
defer buf.Close()
// Write out any remaining manifest entries.
if len(man.Entries) > 0 {
contentID, err := m.writeContentChunk(ctx, man, &buf)
if err != nil {
return nil, errors.Wrap(err, "writing final manifest data")
}

gz := gzip.NewWriter(&buf)
mustSucceed(json.NewEncoder(gz).Encode(man))
mustSucceed(gz.Flush())
mustSucceed(gz.Close())
man.Entries = man.Entries[:0]
newlyCommitted[contentID] = true

contentID, err := m.b.WriteContent(ctx, buf.Bytes(), ContentPrefix, content.NoCompression)
if err != nil {
return nil, errors.Wrap(err, "unable to write content")
buf.Reset()
}

// Only update internal data if we successfully wrote all manifest contents.
for _, e := range entries {
m.committedEntries[e.ID] = e
delete(entries, e.ID)
}

m.committedContentIDs[contentID] = true
for contentID := range newlyCommitted {
m.committedContentIDs[contentID] = true
}

return map[content.ID]bool{contentID: true}, nil
return newlyCommitted, nil
}

// +checklocks:m.cmmu
Expand Down Expand Up @@ -260,7 +309,7 @@ func (m *committedManifestManager) compactLocked(ctx context.Context) error {
tmp[k] = v
}

written, err := m.writeEntriesLocked(ctx, tmp)
written, err := m.writeEntriesLocked(ctx, tmp, true)
if err != nil {
return err
}
Expand Down

0 comments on commit 35b8b0f

Please sign in to comment.