diff --git a/pkg/cmd/drt-run/config/drt-chaos.yaml b/pkg/cmd/drt-run/config/drt-chaos.yaml index 0f6a35ecd753..35f4ea144ad3 100644 --- a/pkg/cmd/drt-run/config/drt-chaos.yaml +++ b/pkg/cmd/drt-run/config/drt-chaos.yaml @@ -4,36 +4,36 @@ workloads: steps: - command: run args: - - --warehouses 12000 - - --active-warehouses 1700 - - --db cct_tpcc + - --warehouses=12000 + - --active-warehouses=1700 + - --db=cct_tpcc - --secure - - --ramp 10m - - --display-every 5s - - --duration 12h - - --prometheus-port 2112 - - --user cct_tpcc_user + - --ramp=10m + - --display-every=5s + - --duration=12h + - --prometheus-port=2112 + - --user=cct_tpcc_user - --tolerate-errors - - --password tpcc + - --password=tpcc - name: kv-main kind: kv steps: - command: run args: - - --concurrency 8 - - --histograms kv/stats.json - - --db kv - - --splits 500 - - --read-percent 50 - - --cycle-length 100000 - - --min-block-bytes 100 - - --max-block-bytes 1000 - - --max-rate 120 + - --concurrency=8 + - --histograms=kv/stats.json + - --db=kv + - --splits=500 + - --read-percent=50 + - --cycle-length=100000 + - --min-block-bytes=100 + - --max-block-bytes=1000 + - --max-rate=120 - --secure - - --prometheus-port 2114 - - --ramp 10m - - --display-every 5s - - --duration 12h + - --prometheus-port=2114 + - --ramp=10m + - --display-every=5s + - --duration=12h - --tolerate-errors - --enum operations: diff --git a/pkg/cmd/drt-run/event.go b/pkg/cmd/drt-run/event.go index c80346edfd7c..e2f4fec7ea4a 100644 --- a/pkg/cmd/drt-run/event.go +++ b/pkg/cmd/drt-run/event.go @@ -119,16 +119,18 @@ func (l *eventLogger) logWorkloadEvent(ev Event, workloadIdx int) { ev.Source = SourceWorkload we := &l.workloadEvents[workloadIdx] - we.mu.Lock() - idx := we.mu.startIdx - we.mu.startIdx = (we.mu.startIdx + 1) % perWorkloadEventRetention - if we.mu.numEvents != len(we.events) { - we.mu.numEvents++ - } - we.mu.Unlock() - + var idx int + func() { + we.mu.Lock() + defer we.mu.Unlock() + idx = we.mu.startIdx + we.mu.startIdx = (we.mu.startIdx + 1) % perWorkloadEventRetention + if we.mu.numEvents != len(we.events) { + we.mu.numEvents++ + } + }() we.events[idx] = ev - io.WriteString(l.outputFile, ev.String()+"\n") + _, _ = io.WriteString(l.outputFile, ev.String()+"\n") } // logOperationEvent logs an event in the operation log. Up to operationEventRetention @@ -137,16 +139,18 @@ func (l *eventLogger) logOperationEvent(ev Event) { ev.Timestamp = timeutil.Now() ev.Source = SourceOperation - l.mu.Lock() - idx := l.mu.operationStartIdx - l.mu.operationStartIdx = (l.mu.operationStartIdx + 1) % operationEventRetention - if l.mu.operationEvents != len(l.operationEvents) { - l.mu.operationEvents++ - } - l.mu.Unlock() - + var idx int + func() { + l.mu.Lock() + defer l.mu.Unlock() + idx = l.mu.operationStartIdx + l.mu.operationStartIdx = (l.mu.operationStartIdx + 1) % operationEventRetention + if l.mu.operationEvents != len(l.operationEvents) { + l.mu.operationEvents++ + } + }() l.operationEvents[idx] = ev - io.WriteString(l.outputFile, ev.String()+"\n") + _, _ = io.WriteString(l.outputFile, ev.String()+"\n") } // getWorkloadEvents returns workload events for a given workload worker. Up to diff --git a/pkg/cmd/drt-run/http.go b/pkg/cmd/drt-run/http.go index 07390ade57a7..b9b577acd473 100644 --- a/pkg/cmd/drt-run/http.go +++ b/pkg/cmd/drt-run/http.go @@ -100,7 +100,7 @@ func (h *httpHandler) serve(rw http.ResponseWriter, req *http.Request) { fmt.Fprintf(rw, "

Configuration


\n") fmt.Fprintf(rw, "
")
 	encoder := yaml.NewEncoder(rw)
-	encoder.Encode(h.w.config)
+	_ = encoder.Encode(h.w.config)
 	fmt.Fprintf(rw, "
") fmt.Fprintf(rw, "\n") diff --git a/pkg/cmd/drt-run/main.go b/pkg/cmd/drt-run/main.go index a01b7a24b700..ad702d2268bf 100644 --- a/pkg/cmd/drt-run/main.go +++ b/pkg/cmd/drt-run/main.go @@ -91,7 +91,7 @@ func runDRT(configFile string) (retErr error) { o: or, eventL: eventL, } - hh.startHTTPServer(8080, "localhost") + _ = hh.startHTTPServer(8080, "localhost") or.Run(ctx) return nil diff --git a/pkg/cmd/drt-run/operations.go b/pkg/cmd/drt-run/operations.go index c0b044564ad1..bcf33c994013 100644 --- a/pkg/cmd/drt-run/operations.go +++ b/pkg/cmd/drt-run/operations.go @@ -108,54 +108,57 @@ func (r *opsRunner) pickOperation( setIdx := rng.Intn(len(r.specs)) opSpecsSet := r.specs[setIdx] opSpec := &opSpecsSet[rng.Intn(len(opSpecsSet))] - r.mu.Lock() - if r.mu.lockOutOperations { - r.mu.completed.Wait() - r.mu.Unlock() - continue - } + shouldContinue := func() bool { + r.mu.Lock() + defer r.mu.Unlock() + if r.mu.lockOutOperations { + r.mu.completed.Wait() + return true + } - lastRun := r.mu.lastRun[opSpec.Name] - eligibleForNextRun := lastRun.Add(r.config.Operations.Sets[setIdx].Cadence) + lastRun := r.mu.lastRun[opSpec.Name] + eligibleForNextRun := lastRun.Add(r.config.Operations.Sets[setIdx].Cadence) - if timeutil.Now().Compare(eligibleForNextRun) < 0 { - // Find another operation to run. - r.mu.completed.Wait() - r.mu.Unlock() - continue - } - // Ratchet lastRun forward. - r.mu.lastRun[opSpec.Name] = timeutil.Now() - r.mu.runningOperations[workerIdx] = opSpec.Name - - // See what level of isolation this operation requires - // from other operations. - switch opSpec.CanRunConcurrently { - case registry.OperationCanRunConcurrently: - // Nothing to do. - case registry.OperationCannotRunConcurrently: - r.mu.lockOutOperations = true - fallthrough - case registry.OperationCannotRunConcurrentlyWithItself: - for otherOpsRunning := true; otherOpsRunning; { - otherOpsRunning = false - for i := range r.mu.runningOperations { - if i == workerIdx { - continue + if timeutil.Now().Compare(eligibleForNextRun) < 0 { + // Find another operation to run. + r.mu.completed.Wait() + return true + } + // Ratchet lastRun forward. + r.mu.lastRun[opSpec.Name] = timeutil.Now() + r.mu.runningOperations[workerIdx] = opSpec.Name + + // See what level of isolation this operation requires + // from other operations. + switch opSpec.CanRunConcurrently { + case registry.OperationCanRunConcurrently: + // Nothing to do. + case registry.OperationCannotRunConcurrently: + r.mu.lockOutOperations = true + fallthrough + case registry.OperationCannotRunConcurrentlyWithItself: + for otherOpsRunning := true; otherOpsRunning; { + otherOpsRunning = false + for i := range r.mu.runningOperations { + if i == workerIdx { + continue + } + if r.mu.runningOperations[i] != "" && + (opSpec.CanRunConcurrently != registry.OperationCannotRunConcurrentlyWithItself || r.mu.runningOperations[i] == opSpec.Name) { + otherOpsRunning = true + break + } } - if r.mu.runningOperations[i] != "" && - (opSpec.CanRunConcurrently != registry.OperationCannotRunConcurrentlyWithItself || r.mu.runningOperations[i] == opSpec.Name) { - otherOpsRunning = true - break + if otherOpsRunning { + r.mu.completed.Wait() } } - if otherOpsRunning { - r.mu.completed.Wait() - } } + return false + }() + if shouldContinue { + continue } - - r.mu.Unlock() return opSpec } } @@ -215,7 +218,7 @@ func (r *opsRunner) runOperation( }) return } - cmd.Start() + _ = cmd.Start() wg.Add(2) // Spin up goroutines to read stdout and stderr, and pipe them // into the event logger. @@ -255,12 +258,12 @@ func (r *opsRunner) runOperation( }) r.mu.Lock() + defer r.mu.Unlock() r.mu.runningOperations[workerIdx] = "" if opSpec.CanRunConcurrently == registry.OperationCannotRunConcurrently { r.mu.lockOutOperations = false } r.mu.completed.Broadcast() - r.mu.Unlock() } // runWorker manages the infinite loop for one operation runner worker. diff --git a/pkg/cmd/drt-run/workloads.go b/pkg/cmd/drt-run/workloads.go index 475474541a51..ddff7e3d17e6 100644 --- a/pkg/cmd/drt-run/workloads.go +++ b/pkg/cmd/drt-run/workloads.go @@ -103,7 +103,7 @@ func (w *workloadRunner) runWorkloadStep( } }() - cmd.Start() + _ = cmd.Start() if err := cmd.Wait(); err != nil { w.errChan <- err return diff --git a/pkg/cmd/roachtest/operations/node_kill.go b/pkg/cmd/roachtest/operations/node_kill.go index 1ed89b716936..ffc4e6cdbdfe 100644 --- a/pkg/cmd/roachtest/operations/node_kill.go +++ b/pkg/cmd/roachtest/operations/node_kill.go @@ -103,7 +103,7 @@ func registerNodeKill(r registry.Registry) { Owner: registry.OwnerServer, Timeout: 15 * time.Minute, CompatibleClouds: registry.AllClouds, - CanRunConcurrently: registry.OperationCannotRunConcurrentlyWithItself, + CanRunConcurrently: registry.OperationCannotRunConcurrently, Dependencies: []registry.OperationDependency{registry.OperationRequiresZeroUnderreplicatedRanges}, Run: nodeKillRunner(9 /* signal */, true /* drain */), }) @@ -112,7 +112,7 @@ func registerNodeKill(r registry.Registry) { Owner: registry.OwnerServer, Timeout: 10 * time.Minute, CompatibleClouds: registry.AllClouds, - CanRunConcurrently: registry.OperationCannotRunConcurrentlyWithItself, + CanRunConcurrently: registry.OperationCannotRunConcurrently, Dependencies: []registry.OperationDependency{registry.OperationRequiresZeroUnderreplicatedRanges}, Run: nodeKillRunner(9 /* signal */, false /* drain */), }) @@ -121,7 +121,7 @@ func registerNodeKill(r registry.Registry) { Owner: registry.OwnerServer, Timeout: 15 * time.Minute, CompatibleClouds: registry.AllClouds, - CanRunConcurrently: registry.OperationCannotRunConcurrentlyWithItself, + CanRunConcurrently: registry.OperationCannotRunConcurrently, Dependencies: []registry.OperationDependency{registry.OperationRequiresZeroUnderreplicatedRanges}, Run: nodeKillRunner(15 /* signal */, true /* drain */), })