Skip to content

Commit

Permalink
fix(repository): Disable manifest manager compaction when in read-onl…
Browse files Browse the repository at this point in the history
…y mode (kopia#3226)

* Thread ReadOnly option to manifest manager

Don't allow attempting to compact manifests if the repo was opened
read-only.

* Add test for disabling compaction in readonly mode

* Allow content manager to say if it's readonly

This can be leveraged by higher layers to determine if they should
attempt operations such as manifest compaction.

* Rework manifest compaction checks

Use whether the content manager is in read-only mode to help determine
if manifests should be compacted or not.
  • Loading branch information
ashmrtn authored Aug 22, 2023
1 parent 17c7e45 commit af08a41
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 2 deletions.
6 changes: 6 additions & 0 deletions repo/content/committed_read_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,12 @@ type SharedManager struct {
metricsStruct
}

// IsReadOnly returns whether this instance of the SharedManager only supports
// reads or if it also supports mutations to the index.
func (sm *SharedManager) IsReadOnly() bool {
return sm.st.IsReadOnly()
}

// LoadIndexBlob return index information loaded from the specified blob.
func (sm *SharedManager) LoadIndexBlob(ctx context.Context, ibid blob.ID, d *gather.WriteBuffer) ([]Info, error) {
err := sm.st.GetBlob(ctx, ibid, 0, -1, d)
Expand Down
4 changes: 3 additions & 1 deletion repo/manifest/committed_manifest_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,9 @@ func (m *committedManifestManager) compact(ctx context.Context) error {
func (m *committedManifestManager) maybeCompactLocked(ctx context.Context) error {
m.verifyLocked()

if len(m.committedContentIDs) < autoCompactionContentCount {
// Don't attempt to compact manifests if the repo was opened in read only mode
// since we'll just end up failing.
if m.b.IsReadOnly() || len(m.committedContentIDs) < autoCompactionContentCount {
return nil
}

Expand Down
1 change: 1 addition & 0 deletions repo/manifest/manifest_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ type contentManager interface {
DisableIndexFlush(ctx context.Context)
EnableIndexFlush(ctx context.Context)
Flush(ctx context.Context) error
IsReadOnly() bool
}

// ID is a unique identifier of a single manifest.
Expand Down
47 changes: 46 additions & 1 deletion repo/manifest/manifest_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,13 @@ import (
"time"

"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/kopia/kopia/internal/blobtesting"
"github.com/kopia/kopia/internal/testlogging"
"github.com/kopia/kopia/internal/testutil"
"github.com/kopia/kopia/repo/blob/readonly"
"github.com/kopia/kopia/repo/content"
"github.com/kopia/kopia/repo/content/index"
"github.com/kopia/kopia/repo/encryption"
Expand Down Expand Up @@ -306,11 +308,15 @@ func sortIDs(s []ID) {
})
}

func newManagerForTesting(ctx context.Context, t *testing.T, data blobtesting.DataMap) *Manager {
func newContentManagerForTesting(ctx context.Context, t *testing.T, data blobtesting.DataMap, readOnly bool) contentManager {
t.Helper()

st := blobtesting.NewMapStorage(data, nil, nil)

if readOnly {
st = readonly.NewWrapper(st)
}

fop, err := format.NewFormattingOptionsProvider(&format.ContentFormat{
Hash: hashing.DefaultAlgorithm,
Encryption: encryption.DefaultAlgorithm,
Expand All @@ -327,6 +333,14 @@ func newManagerForTesting(ctx context.Context, t *testing.T, data blobtesting.Da

t.Cleanup(func() { bm.CloseShared(ctx) })

return bm
}

func newManagerForTesting(ctx context.Context, t *testing.T, data blobtesting.DataMap) *Manager {
t.Helper()

bm := newContentManagerForTesting(ctx, t, data, false)

mm, err := NewManager(ctx, bm, ManagerOptions{}, nil)
require.NoError(t, err)

Expand Down Expand Up @@ -381,3 +395,34 @@ func TestManifestAutoCompaction(t *testing.T) {
require.NoError(t, mgr.b.Flush(ctx))
}
}

func TestManifestAutoCompactionWithReadOnly(t *testing.T) {
ctx := testlogging.Context(t)
data := blobtesting.DataMap{}

bm := newContentManagerForTesting(ctx, t, data, false)

mgr, err := NewManager(ctx, bm, ManagerOptions{}, nil)
require.NoError(t, err, "getting initial manifest manager")

for i := 0; i < 100; i++ {
item1 := map[string]int{"foo": 1, "bar": 2}
labels1 := map[string]string{"type": "item", "color": "red"}

_, err = mgr.Put(ctx, labels1, item1)
require.NoError(t, err, "adding item to manifest manager")

require.NoError(t, mgr.Flush(ctx))
require.NoError(t, mgr.b.Flush(ctx))
}

// Opening another instance of the manager should cause the manifest manager
// to attempt to compact things.
bm = newContentManagerForTesting(ctx, t, data, true)

mgr, err = NewManager(ctx, bm, ManagerOptions{}, nil)
require.NoError(t, err, "getting other instance of manifest manager")

_, err = mgr.Find(ctx, map[string]string{"color": "red"})
assert.NoError(t, err, "forcing reload of manifest manager")
}

0 comments on commit af08a41

Please sign in to comment.