Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Watch Progress Request #212

Merged
merged 17 commits into from
Jan 13, 2025
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
little nits
louiseschmidtgen committed Jan 13, 2025

Unverified

This commit is not signed, but one or more authors requires that any commit attributed to them is signed.
commit 83cd6b490ff26eb8ebe56326b4daf5d047f69174
28 changes: 8 additions & 20 deletions pkg/kine/server/watch.go
Original file line number Diff line number Diff line change
@@ -41,7 +41,7 @@ func (s *KVServerBridge) Watch(ws etcdserverpb.Watch_WatchServer) error {
}
if cr := msg.GetCancelRequest(); cr != nil {
logrus.Tracef("WATCH CANCEL REQ id=%d", cr.WatchId)
w.Cancel(cr.WatchId, nil, ws.Context())
w.Cancel(cr.WatchId, nil)
}
if pr := msg.GetProgressRequest(); pr != nil {
w.Progress(ws.Context())
@@ -51,11 +51,7 @@ func (s *KVServerBridge) Watch(ws etcdserverpb.Watch_WatchServer) error {

// pollProgressNotify periodically sends progress notifications to all watchers.
func (w *watcher) pollProgressNotify(ctx context.Context, interval time.Duration) {
ch := make(chan struct{}, 1)

go func() {
defer close(ch)

tick := time.NewTicker(interval)
defer tick.Stop()

@@ -64,15 +60,8 @@ func (w *watcher) pollProgressNotify(ctx context.Context, interval time.Duration
case <-ctx.Done():
return
case <-tick.C:
// Skip this tick if ProgressIfSynced is still running.
select {
case ch <- struct{}{}:
if err := w.ProgressIfSynced(ctx); err != nil {
logrus.Errorf("Failed to send progress notification: %v", err)
}
<-ch
default:
logrus.Warn("Skipping progress notification: still busy.")
if err := w.ProgressIfSynced(ctx); err != nil {
logrus.Errorf("Failed to send progress notification: %v", err)
}
}
}
@@ -122,14 +111,14 @@ func (w *watcher) Start(ctx context.Context, r *etcdserverpb.WatchCreateRequest)
Created: true,
WatchId: id,
}); err != nil {
w.Cancel(id, err, ctx)
w.Cancel(id, err)
return
}

watchCh, err := w.backend.Watch(ctx, key, startRevision)
if err != nil {
logrus.Errorf("Failed to start watch: %v", err)
w.Cancel(id, err, ctx)
w.Cancel(id, err)
return
}

@@ -184,7 +173,7 @@ func (w *watcher) Start(ctx context.Context, r *etcdserverpb.WatchCreateRequest)
}
logrus.Tracef("WATCH SEND id=%d, key=%s, revision=%d, events=%d, size=%d, reads=%d", id, key, revision, len(wr.Events), wr.Size(), reads)
if err := w.server.Send(wr); err != nil {
w.Cancel(id, err, ctx)
w.Cancel(id, err)
}
}
}
@@ -216,7 +205,7 @@ func toEvent(event *Event) *mvccpb.Event {
return e
}

func (w *watcher) Cancel(watchID int64, err error, ctx context.Context) {
func (w *watcher) Cancel(watchID int64, err error) {
w.Lock()
if progressCh, ok := w.progress[watchID]; ok {
close(progressCh)
@@ -236,10 +225,9 @@ func (w *watcher) Cancel(watchID int64, err error, ctx context.Context) {
if err == ErrCompacted {
// the requested start revision is compacted. Pass the current and and compact
// revision to the client via the cancel response, along with the correct error message.
compactRev, revision, err = w.backend.GetCompactRevision(ctx)
compactRev, revision, err = w.backend.GetCompactRevision(w.server.Context())
if err != nil {
logrus.Errorf("Failed to get compact and current revision for cancel response: %v", err)
compactRev = 0
}
}
}
10 changes: 5 additions & 5 deletions pkg/kine/sqllog/sqllog.go
Original file line number Diff line number Diff line change
@@ -193,7 +193,7 @@ func (s *SQLLog) DoCompact(ctx context.Context) (err error) {
// small batches. Given that this logic runs every second,
// on regime it should take usually just a couple batches
// to keep the pace.
start, target, err := s.GetCompactRevision(ctx)
start, target, err := s.d.GetCompactRevision(ctx)
if err != nil {
return err
}
@@ -238,7 +238,7 @@ func (s *SQLLog) After(ctx context.Context, prefix string, revision, limit int64
attribute.Int64("limit", limit),
)

compactRevision, currentRevision, err := s.GetCompactRevision(ctx)
compactRevision, currentRevision, err := s.d.GetCompactRevision(ctx)
if err != nil {
return 0, nil, err
}
@@ -275,7 +275,7 @@ func (s *SQLLog) List(ctx context.Context, prefix, startKey string, limit, revis
attribute.Int64("revision", revision),
)

compactRevision, currentRevision, err := s.GetCompactRevision(ctx)
compactRevision, currentRevision, err := s.d.GetCompactRevision(ctx)
if err != nil {
return 0, nil, err
}
@@ -445,7 +445,7 @@ func (s *SQLLog) startWatch(ctx context.Context) (chan []*server.Event, error) {
return nil, err
}

pollStart, _, err := s.GetCompactRevision(ctx)
pollStart, _, err := s.d.GetCompactRevision(ctx)
if err != nil {
return nil, err
}
@@ -603,7 +603,7 @@ func (s *SQLLog) Count(ctx context.Context, prefix, startKey string, revision in
attribute.Int64("revision", revision),
)

compactRevision, currentRevision, err := s.GetCompactRevision(ctx)
compactRevision, currentRevision, err := s.d.GetCompactRevision(ctx)
if err != nil {
return 0, 0, err
}