Watch sync refactor #18976

Merged · 7 commits · Dec 2, 2024
server/storage/mvcc/kv_test.go (2 changes: 1 addition & 1 deletion)
@@ -757,7 +757,7 @@ func TestKVSnapshot(t *testing.T) {

func TestWatchableKVWatch(t *testing.T) {
	b, _ := betesting.NewDefaultTmpBackend(t)
-	s := WatchableKV(newWatchableStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}))
+	s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
	defer cleanup(s, b)

	w := s.NewWatchStream()
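The explicit WatchableKV(...) conversion is dropped here because New now returns the concrete *watchableStore, and the next file adds a compile-time assertion that the concrete type satisfies WatchableKV. As a rough illustration of that property, not part of this PR (TestNewSatisfiesWatchableKV is a hypothetical name; it reuses only helpers that already appear in this diff):

func TestNewSatisfiesWatchableKV(t *testing.T) {
	b, _ := betesting.NewDefaultTmpBackend(t)
	s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
	defer cleanup(s, b)

	// This assignment compiles only because *watchableStore implements
	// WatchableKV, so callers no longer need an explicit conversion.
	var kv WatchableKV = s
	_ = kv
}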
server/storage/mvcc/watchable_store.go (13 changes: 8 additions & 5 deletions)
@@ -70,12 +70,18 @@ type watchableStore struct {
	wg sync.WaitGroup
}

+var _ WatchableKV = (*watchableStore)(nil)
+
// cancelFunc updates unsynced and synced maps when running
// cancel operations.
type cancelFunc func()

-func New(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg StoreConfig) WatchableKV {
-	return newWatchableStore(lg, b, le, cfg)
+func New(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg StoreConfig) *watchableStore {
+	s := newWatchableStore(lg, b, le, cfg)
+	s.wg.Add(2)
+	go s.syncWatchersLoop()
+	go s.syncVictimsLoop()
+	return s
}

func newWatchableStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg StoreConfig) *watchableStore {
@@ -95,9 +101,6 @@ func newWatchableStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg StoreConfig) *watchableStore {
		// use this store as the deleter so revokes trigger watch events
		s.le.SetRangeDeleter(func() lease.TxnDelete { return s.Write(traceutil.TODO()) })
	}
-	s.wg.Add(2)
-	go s.syncWatchersLoop()
-	go s.syncVictimsLoop()
	return s
}

Review comment from a project member on the new "var _ WatchableKV = (*watchableStore)(nil)" line: "Nice!"
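Two Go idioms carry this refactor: a blank-identifier assignment that makes the compiler verify an interface is satisfied, and an exported constructor that wraps an unexported one so that only the exported path starts background goroutines. Below is a self-contained sketch of both patterns; the Service and worker names are illustrative stand-ins, not etcd's types.

package main

import "sync"

// Service stands in for WatchableKV in this sketch.
type Service interface {
	Close()
}

type worker struct {
	stopc chan struct{}
	wg    sync.WaitGroup
}

// Compile-time assertion: the build fails if *worker stops satisfying Service.
var _ Service = (*worker)(nil)

// newWorker only allocates state and starts no goroutines, so tests and
// benchmarks can exercise the bare struct without background activity.
func newWorker() *worker {
	return &worker{stopc: make(chan struct{})}
}

// NewWorker is the exported constructor: it builds the struct and then starts
// the background loop, mirroring how New now wraps newWatchableStore.
func NewWorker() *worker {
	w := newWorker()
	w.wg.Add(1)
	go w.loop()
	return w
}

// loop is a placeholder for the periodic sync work done by the real loops.
func (w *worker) loop() {
	defer w.wg.Done()
	<-w.stopc
}

// Close stops the loop and waits for it to exit.
func (w *worker) Close() {
	close(w.stopc)
	w.wg.Wait()
}

func main() {
	w := NewWorker()
	w.Close()
}

Returning the concrete *worker (rather than the Service interface) keeps concrete methods reachable for callers that need them, while the var _ line still guarantees the interface contract, which is the same trade-off made by changing New's return type.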
server/storage/mvcc/watchable_store_bench_test.go (22 changes: 4 additions & 18 deletions)
@@ -78,7 +78,7 @@ func BenchmarkWatchableStoreWatchPutUnsync(b *testing.B) {

func benchmarkWatchableStoreWatchPut(b *testing.B, synced bool) {
	be, _ := betesting.NewDefaultTmpBackend(b)
-	s := newWatchableStore(zaptest.NewLogger(b), be, &lease.FakeLessor{}, StoreConfig{})
+	s := New(zaptest.NewLogger(b), be, &lease.FakeLessor{}, StoreConfig{})
	defer cleanup(s, be)

	k := []byte("testkey")
@@ -122,21 +122,7 @@ func benchmarkWatchableStoreWatchPut(b *testing.B, synced bool) {
// we should put to simulate the real-world use cases.
func BenchmarkWatchableStoreUnsyncedCancel(b *testing.B) {
	be, _ := betesting.NewDefaultTmpBackend(b)
-	s := NewStore(zaptest.NewLogger(b), be, &lease.FakeLessor{}, StoreConfig{})
-
-	// manually create watchableStore instead of newWatchableStore
-	// because newWatchableStore periodically calls syncWatchersLoop
-	// method to sync watchers in unsynced map. We want to keep watchers
-	// in unsynced for this benchmark.
-	ws := &watchableStore{
-		store:    s,
-		unsynced: newWatcherGroup(),
-
-		// to make the test not crash from assigning to nil map.
-		// 'synced' doesn't get populated in this test.
-		synced: newWatcherGroup(),
-		stopc:  make(chan struct{}),
-	}
+	ws := newWatchableStore(zaptest.NewLogger(b), be, &lease.FakeLessor{}, StoreConfig{})

	defer cleanup(ws, be)

@@ -146,7 +132,7 @@ func BenchmarkWatchableStoreUnsyncedCancel(b *testing.B) {
	// and force watchers to be in unsynced.
	testKey := []byte("foo")
	testValue := []byte("bar")
-	s.Put(testKey, testValue, lease.NoLease)
+	ws.Put(testKey, testValue, lease.NoLease)

	w := ws.NewWatchStream()
	defer w.Close()
@@ -178,7 +164,7 @@

func BenchmarkWatchableStoreSyncedCancel(b *testing.B) {
	be, _ := betesting.NewDefaultTmpBackend(b)
-	s := newWatchableStore(zaptest.NewLogger(b), be, &lease.FakeLessor{}, StoreConfig{})
+	s := New(zaptest.NewLogger(b), be, &lease.FakeLessor{}, StoreConfig{})

	defer cleanup(s, be)

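With the sync loops moved into New, the hand-rolled watchableStore literal in BenchmarkWatchableStoreUnsyncedCancel is no longer needed: calling newWatchableStore directly yields a store whose watchers stay unsynced because no sync loop runs, while benchmarks that want synced behaviour call New. A hedged sketch of that split, using only identifiers that appear in the diff above (the benchmark names and bodies here are illustrative placeholders, not code from this PR):

// Sketch only: betesting, zaptest, lease, StoreConfig, and cleanup come from
// the surrounding mvcc test package shown in the diff.
func BenchmarkUnsyncedSetupSketch(b *testing.B) {
	be, _ := betesting.NewDefaultTmpBackend(b)
	// newWatchableStore starts no background loops, so watchers registered
	// against this store remain in the unsynced group.
	ws := newWatchableStore(zaptest.NewLogger(b), be, &lease.FakeLessor{}, StoreConfig{})
	defer cleanup(ws, be)
}

func BenchmarkSyncedSetupSketch(b *testing.B) {
	be, _ := betesting.NewDefaultTmpBackend(b)
	// New also starts syncWatchersLoop and syncVictimsLoop, so watchers are
	// moved to the synced group once they catch up with the store revision.
	s := New(zaptest.NewLogger(b), be, &lease.FakeLessor{}, StoreConfig{})
	defer cleanup(s, be)
}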