Skip to content

Commit

Permalink
kgo-repeater: add additional error handling for txn to inc stability
Browse files Browse the repository at this point in the history
  • Loading branch information
ballard26 committed Dec 1, 2022
1 parent 3c01706 commit 18aca5e
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 0 deletions.
23 changes: 23 additions & 0 deletions pkg/worker/repeater/repeater_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"bytes"
"context"
"encoding/binary"
"errors"
"fmt"
"math/rand"
_ "net/http/pprof"
Expand All @@ -29,6 +30,7 @@ import (

"github.com/google/uuid"
log "github.com/sirupsen/logrus"
"github.com/twmb/franz-go/pkg/kerr"
"github.com/twmb/franz-go/pkg/kgo"

"github.com/redpanda-data/kgo-verifier/pkg/util"
Expand Down Expand Up @@ -381,6 +383,14 @@ loop:

if v.transactionsEnabled {
if _, err := v.transactionSTM.BeforeMessageSent(); err != nil {
if errors.Is(err, kerr.OperationNotAttempted) {
// Try to recover this producer by rolling back the transaction.
err = v.transactionSTM.TryRollbackTransaction()
if err == nil {
continue
}
}

log.Errorf("Produce error; transaction failure: %v", err)
break loop
}
Expand Down Expand Up @@ -430,6 +440,18 @@ loop:
// consumer needs logic to handle the unexpected token
log.Debugf("Produce %s acked %d on partition %d offset %d", v.config.workerCfg.Name, token, r.Partition, r.Offset)
if err != nil {
// For transactions an INVALID_TXN_STATE is encountered often while restarting nodes
// Try to be tolerant of this error.
// TODO: Is there a way to avoid this?
if v.transactionsEnabled && (errors.Is(err, kerr.OperationNotAttempted) || errors.Is(err, kerr.InvalidTxnState)) {
err = v.transactionSTM.TryRollbackTransaction()
if err == nil {
v.pending <- token
ackWait.Done()
return
}
}

// On produce error, we drop the token: we rely on producer errors
// being rare and/or a background Tuner re-injecting fresh tokens
log.Errorf("Produce %s error, dropped token %d: %v", v.config.workerCfg.Name, token, err)
Expand All @@ -440,6 +462,7 @@ loop:
v.globalStats.Ack_latency.Update(ackLatency.Microseconds())
v.totalProduced += 1
}

ackWait.Done()
}

Expand Down
19 changes: 19 additions & 0 deletions pkg/worker/transaction_stm.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,25 @@ func (t *TransactionSTM) TryEndTransaction() error {
return nil
}

// If an OPERATION_NOT_ATTEMPTED error occurs on the producer it
// can sometimes recover by rolling back the current transaction.
func (t *TransactionSTM) TryRollbackTransaction() error {
if err := t.client.AbortBufferedRecords(t.ctx); err != nil {
log.Errorf("Error aborting buffered records: %v", err)
return err
}
if err := t.client.EndTransaction(t.ctx, kgo.TryAbort); err != nil {
log.Errorf("Error rolling back transaction: %v", err)
return err
}

log.Debugf("Rolled back a transaction; currentMgsProduced = %d aborted = %t", t.currentMgsProduced, t.abortedTransaction)

t.currentMgsProduced = 0
t.activeTransaction = false
return nil
}

// Returns true iff a new transaction was started and/or a current
// transaction ended. This is to notify any producers that control
// markers will be added to a partition's log.
Expand Down

0 comments on commit 18aca5e

Please sign in to comment.