Skip to content

Commit

Permalink
Merge pull request #16129 from serathius/robustness-fix-patch-txn-onf…
Browse files Browse the repository at this point in the history
…ailure

Robustness fix patch txn onfailure
  • Loading branch information
serathius committed Jun 24, 2023
2 parents 9ff3e66 + 1f6e110 commit 31b20ef
Show file tree
Hide file tree
Showing 3 changed files with 389 additions and 31 deletions.
47 changes: 41 additions & 6 deletions tests/robustness/validate/patch_history.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,34 @@ import (
"go.etcd.io/etcd/tests/v3/robustness/traffic"
)

func patchedOperationHistory(reports []traffic.ClientReport) []porcupine.Operation {
allOperations := operations(reports)
uniqueEvents := uniqueWatchEvents(reports)
return patchOperationsWithWatchEvents(allOperations, uniqueEvents)
}

func operations(reports []traffic.ClientReport) []porcupine.Operation {
var ops []porcupine.Operation
for _, r := range reports {
ops = append(ops, r.OperationHistory.Operations()...)
}
return ops
}

func uniqueWatchEvents(reports []traffic.ClientReport) map[model.Event]traffic.TimedWatchEvent {
persisted := map[model.Event]traffic.TimedWatchEvent{}
for _, r := range reports {
for _, resp := range r.Watch {
for _, event := range resp.Events {
persisted[event.Event] = traffic.TimedWatchEvent{Time: resp.Time, WatchEvent: event}
}
}
}
return persisted
}

func patchOperationsWithWatchEvents(operations []porcupine.Operation, watchEvents map[model.Event]traffic.TimedWatchEvent) []porcupine.Operation {

newOperations := make([]porcupine.Operation, 0, len(operations))
lastObservedOperation := lastOperationObservedInWatch(operations, watchEvents)

Expand All @@ -41,8 +68,8 @@ func patchOperationsWithWatchEvents(operations []porcupine.Operation, watchEvent
newOperations = append(newOperations, op)
continue
}
if hasNonUniqueWriteOperation(request.Txn) && !hasUniqueWriteOperation(request.Txn) {
// Leave operation as it is as we cannot match non-unique operations to watch events.
if !canBeDiscarded(request.Txn) {
// Leave operation as it is as we cannot discard it.
newOperations = append(newOperations, op)
continue
}
Expand Down Expand Up @@ -84,17 +111,25 @@ func matchWatchEvent(request *model.TxnRequest, watchEvents map[model.Event]traf
return nil
}

func hasNonUniqueWriteOperation(request *model.TxnRequest) bool {
for _, etcdOp := range request.OperationsOnSuccess {
func canBeDiscarded(request *model.TxnRequest) bool {
return operationsCanBeDiscarded(request.OperationsOnSuccess) && operationsCanBeDiscarded(request.OperationsOnFailure)
}

func operationsCanBeDiscarded(ops []model.EtcdOperation) bool {
return hasUniqueWriteOperation(ops) || !hasWriteOperation(ops)
}

func hasWriteOperation(ops []model.EtcdOperation) bool {
for _, etcdOp := range ops {
if etcdOp.Type == model.PutOperation || etcdOp.Type == model.DeleteOperation {
return true
}
}
return false
}

func hasUniqueWriteOperation(request *model.TxnRequest) bool {
for _, etcdOp := range request.OperationsOnSuccess {
func hasUniqueWriteOperation(ops []model.EtcdOperation) bool {
for _, etcdOp := range ops {
if etcdOp.Type == model.PutOperation {
return true
}
Expand Down
Loading

0 comments on commit 31b20ef

Please sign in to comment.