Skip to content

Commit

Permalink
Add explicit checks for assumptions in robustness test validation
Browse files Browse the repository at this point in the history
Signed-off-by: Marek Siarkowicz <[email protected]>
  • Loading branch information
serathius committed Apr 12, 2024
1 parent bfbfee0 commit f8de338
Show file tree
Hide file tree
Showing 2 changed files with 154 additions and 2 deletions.
9 changes: 7 additions & 2 deletions tests/robustness/traffic/traffic.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@ func SimulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2
t.Fatal(err)
}
defer cc.Close()
// Ensure that first operation succeeds
_, err = cc.Put(ctx, "start", "true")
if err != nil {
t.Fatalf("First operation failed, validation requires first operation to succeed, err: %s", err)
}
wg := sync.WaitGroup{}
nonUniqueWriteLimiter := NewConcurrencyLimiter(profile.MaxNonUniqueRequestConcurrency)
for i := 0; i < profile.ClientCount; i++ {
Expand All @@ -85,11 +90,11 @@ func SimulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2
wg.Wait()
endTime := time.Now()

// Ensure that last operation is succeeds
time.Sleep(time.Second)
// Ensure that last operation succeeds
_, err = cc.Put(ctx, "tombstone", "true")
if err != nil {
t.Error(err)
t.Fatalf("Last operation failed, validation requires last operation to succeed, err: %s", err)
}
reports = append(reports, cc.Report())

Expand Down
147 changes: 147 additions & 0 deletions tests/robustness/validate/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ import (

// ValidateAndReturnVisualize returns visualize as porcupine.linearizationInfo used to generate visualization is private.
func ValidateAndReturnVisualize(t *testing.T, lg *zap.Logger, cfg Config, reports []report.ClientReport, timeout time.Duration) (visualize func(basepath string) error) {
err := checkValidationAssumptions(reports)
if err != nil {
t.Fatalf("Broken validation assumptions: %s", err)
}
patchedOperations := patchedOperationHistory(reports)
linearizable, visualize := validateLinearizableOperationsAndVisualize(lg, patchedOperations, timeout)
if linearizable != porcupine.Ok {
Expand All @@ -52,6 +56,149 @@ type Config struct {
ExpectRevisionUnique bool
}

func checkValidationAssumptions(reports []report.ClientReport) error {
err := validatePutOperationUnique(reports)
if err != nil {
return err
}
err = validateEmptyDatabaseAtStart(reports)
if err != nil {
return err
}
err = validateLastOperationAndObservedInWatch(reports)
if err != nil {
return err
}
err = validateObservedAllRevisionsInWatch(reports)
if err != nil {
return err
}
err = validateNonConcurrentClientRequests(reports)
if err != nil {
return err
}
return nil
}

func validatePutOperationUnique(reports []report.ClientReport) error {
type KV struct {
Key string
Value model.ValueOrHash
}
putValue := map[KV]struct{}{}
for _, r := range reports {
for _, op := range r.KeyValue {
request := op.Input.(model.EtcdRequest)
if request.Type != model.Txn {
continue
}
for _, op := range append(request.Txn.OperationsOnSuccess, request.Txn.OperationsOnFailure...) {
if op.Type != model.PutOperation {
continue
}
kv := KV{
Key: op.Put.Key,
Value: op.Put.Value,
}
if _, ok := putValue[kv]; ok {
return fmt.Errorf("non unique put %v, required to patch operation history", kv)
}
putValue[kv] = struct{}{}
}
}
}
return nil
}

func validateEmptyDatabaseAtStart(reports []report.ClientReport) error {
for _, r := range reports {
for _, op := range r.KeyValue {
request := op.Input.(model.EtcdRequest)
response := op.Output.(model.MaybeEtcdResponse)
if response.Revision == 2 && !request.IsRead() {
return nil
}
}
}
return fmt.Errorf("non empty database at start or first write didn't succeed, required by model implementation")
}

func validateLastOperationAndObservedInWatch(reports []report.ClientReport) error {
var lastOperation porcupine.Operation

for _, r := range reports {
for _, op := range r.KeyValue {
if op.Call > lastOperation.Call {
lastOperation = op
}
}
}
lastResponse := lastOperation.Output.(model.MaybeEtcdResponse)
if lastResponse.PartialResponse || lastResponse.Error != "" {
return fmt.Errorf("last operation %v failed, its success is required to validate watch", lastOperation)
}
for _, r := range reports {
for _, watch := range r.Watch {
for _, watchResp := range watch.Responses {
for _, e := range watchResp.Events {
if e.Revision == lastResponse.Revision {
return nil
}
}
}
}
}
return fmt.Errorf("revision from the last operation %d was not observed in watch, required to validate watch", lastResponse.Revision)
}

func validateObservedAllRevisionsInWatch(reports []report.ClientReport) error {
var maxRevision int64
for _, r := range reports {
for _, watch := range r.Watch {
for _, watchResp := range watch.Responses {
for _, e := range watchResp.Events {
if e.Revision > maxRevision {
maxRevision = e.Revision
}
}
}
}
}
observedRevisions := make([]bool, maxRevision+1)
for _, r := range reports {
for _, watch := range r.Watch {
for _, watchResp := range watch.Responses {
for _, e := range watchResp.Events {
observedRevisions[e.Revision] = true
}
}
}
}
for i := 2; i < len(observedRevisions); i++ {
if !observedRevisions[i] {
return fmt.Errorf("didn't observe revision %d in watch, required to patch operation and validate serializable requests", i)
}
}
return nil
}

func validateNonConcurrentClientRequests(reports []report.ClientReport) error {
lastClientRequestReturn := map[int]int64{}
for _, r := range reports {
for _, op := range r.KeyValue {
lastRequest := lastClientRequestReturn[op.ClientId]
if op.Call <= lastRequest {
return fmt.Errorf("client %d has concurrent request, required for operation linearization", op.ClientId)
}
if op.Return <= op.Call {
return fmt.Errorf("operation %v ends before it starts, required for operation linearization", op)
}
lastClientRequestReturn[op.ClientId] = op.Return
}
}
return nil
}

func mergeWatchEventHistory(reports []report.ClientReport) ([]model.PersistedEvent, error) {
type revisionEvents struct {
events []model.PersistedEvent
Expand Down

0 comments on commit f8de338

Please sign in to comment.