Skip to content

Commit

Permalink
Merge pull request #16136 from serathius/robustness-watch-validation
Browse files Browse the repository at this point in the history
Robustness watch validation
  • Loading branch information
serathius authored Jun 26, 2023
2 parents 1967b8e + 09c462e commit d3233fe
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 21 deletions.
16 changes: 16 additions & 0 deletions tests/robustness/model/replay.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package model

import (
"fmt"
"strings"
)

func NewReplay(eventHistory []WatchEvent) *EtcdReplay {
Expand Down Expand Up @@ -106,3 +107,18 @@ type Event struct {
Key string
Value ValueOrHash
}

func (e Event) Match(request WatchRequest) bool {
if request.WithPrefix {
return strings.HasPrefix(e.Key, request.Key)
} else {
return e.Key == request.Key
}
}

type WatchRequest struct {
Key string
Revision int64
WithPrefix bool
WithProgressNotify bool
}
13 changes: 3 additions & 10 deletions tests/robustness/traffic/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,17 +46,10 @@ type RecordingClient struct {
}

type WatchOperation struct {
Request WatchRequest
Request model.WatchRequest
Responses []WatchResponse
}

type WatchRequest struct {
Key string
Revision int64
WithPrefix bool
WithProgressNotify bool
}

type WatchResponse struct {
Events []model.WatchEvent
IsProgressNotify bool
Expand Down Expand Up @@ -228,7 +221,7 @@ func (c *RecordingClient) Defragment(ctx context.Context) (*clientv3.DefragmentR
}

func (c *RecordingClient) Watch(ctx context.Context, key string, rev int64, withPrefix bool, withProgressNotify bool) clientv3.WatchChan {
request := WatchRequest{
request := model.WatchRequest{
Key: key,
Revision: rev,
WithPrefix: withPrefix,
Expand All @@ -238,7 +231,7 @@ func (c *RecordingClient) Watch(ctx context.Context, key string, rev int64, with

}

func (c *RecordingClient) watch(ctx context.Context, request WatchRequest) clientv3.WatchChan {
func (c *RecordingClient) watch(ctx context.Context, request model.WatchRequest) clientv3.WatchChan {
ops := []clientv3.OpOption{}
if request.WithPrefix {
ops = append(ops, clientv3.WithPrefix())
Expand Down
2 changes: 0 additions & 2 deletions tests/robustness/validate/operations.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,6 @@ func validateOperationsAndVisualize(t *testing.T, lg *zap.Logger, operations []p
t.Fatalf("Unknown Linearization")
}
lg.Info("Validating serializable operations")
// TODO: Use linearization result instead of event history to get order of events
// This is currently impossible as porcupine doesn't expose operation order created during linearization.
validateSerializableOperations(t, operations, eventHistory)
return visualize
}
Expand Down
70 changes: 61 additions & 9 deletions tests/robustness/validate/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,15 @@ func validateWatch(t *testing.T, cfg Config, reports []traffic.ClientReport) []m
validateOrdered(t, r)
validateUnique(t, cfg.ExpectRevisionUnique, r)
validateAtomic(t, r)
// TODO: Validate Resumable
validateBookmarkable(t, r)
}
// TODO: Use linearization result instead of event history to get order of events
// This is currently impossible as porcupine doesn't expose operation order created during linearization.
eventHistory := mergeWatchEventHistory(t, reports)
// TODO: Validate that each watch report is reliable, not only the longest one.
validateReliable(t, eventHistory)
for _, r := range reports {
validateReliable(t, eventHistory, r)
validateResumable(t, eventHistory, r)
}
return eventHistory
}

Expand Down Expand Up @@ -107,14 +110,61 @@ func validateAtomic(t *testing.T, report traffic.ClientReport) {
}
}

func validateReliable(t *testing.T, events []model.WatchEvent) {
var lastEventRevision int64 = 1
for _, event := range events {
if event.Revision > lastEventRevision && event.Revision != lastEventRevision+1 {
t.Errorf("Broke watch guarantee: Reliable - a sequence of events will never drop any subsequence of events; if there are events ordered in time as a < b < c, then if the watch receives events a and c, it is guaranteed to receive b, missing revisions from range: %d-%d", lastEventRevision, event.Revision)
func validateReliable(t *testing.T, events []model.WatchEvent, report traffic.ClientReport) {
for _, op := range report.Watch {
index := 0
revision := firstRevision(op)
for index < len(events) && events[index].Revision < revision {
index++
}
if index == len(events) {
continue
}
for _, resp := range op.Responses {
for _, event := range resp.Events {
if events[index].Match(op.Request) && events[index] != event {
t.Errorf("Broke watch guarantee: Reliable - a sequence of events will never drop any subsequence of events; if there are events ordered in time as a < b < c, then if the watch receives events a and c, it is guaranteed to receive b, event missing: %+v", events[index])
}
index++
}
}
}
}

func validateResumable(t *testing.T, events []model.WatchEvent, report traffic.ClientReport) {
for _, op := range report.Watch {
index := 0
revision := op.Request.Revision
for index < len(events) && (events[index].Revision < revision || !events[index].Match(op.Request)) {
index++
}
if index == len(events) {
continue
}
firstEvent := firstWatchEvent(op)
// If watch is resumable, first event it gets should the first event that happened after the requested revision.
if firstEvent != nil && events[index] != *firstEvent {
t.Errorf("Resumable - A broken watch can be resumed by establishing a new watch starting after the last revision received in a watch event before the break, so long as the revision is in the history window, watch request: %+v, event missing: %+v", op.Request, events[index])
}
}
}

func firstRevision(op traffic.WatchOperation) int64 {
for _, resp := range op.Responses {
for _, event := range resp.Events {
return event.Revision
}
}
return 0
}

func firstWatchEvent(op traffic.WatchOperation) *model.WatchEvent {
for _, resp := range op.Responses {
for _, event := range resp.Events {
return &event
}
lastEventRevision = event.Revision
}
return nil
}

func mergeWatchEventHistory(t *testing.T, reports []traffic.ClientReport) []model.WatchEvent {
Expand All @@ -135,6 +185,8 @@ func mergeWatchEventHistory(t *testing.T, reports []traffic.ClientReport) []mode
events = append(events, event)
} else {
if prev, found := revisionToEvents[lastRevision]; found {
// This assumes that there are txn that would be observed differently by two watches.
// TODO: Implement merging events from multiple watches about single revision based on operations.
if diff := cmp.Diff(prev.events, events); diff != "" {
t.Errorf("Events between clients %d and %d don't match, revision: %d, diff: %s", prev.clientId, lastClientId, lastRevision, diff)
}
Expand Down

0 comments on commit d3233fe

Please sign in to comment.