Skip to content

Commit

Permalink
multinode: remove generic result type
Browse files Browse the repository at this point in the history
  • Loading branch information
jmank88 committed Jan 22, 2025
1 parent f73e2f8 commit efd7093
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 107 deletions.
93 changes: 46 additions & 47 deletions multinode/transaction_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,55 +23,55 @@ var (
}, []string{"network", "chainId", "invariant"})
)

type SendTxResult interface {
Code() SendTxReturnCode
Error() error
type sendTxResult struct {
code SendTxReturnCode
error error
}

const sendTxQuorum = 0.7

// SendTxRPCClient defines the interface of an RPC that TransactionSender uses to broadcast transactions.
type SendTxRPCClient[TX any, RESULT SendTxResult] interface {
type SendTxRPCClient[TX any] interface {
// Errors returned by SendTransaction should include the name or another unique identifier of the RPC.
SendTransaction(ctx context.Context, tx TX) RESULT
SendTransaction(ctx context.Context, tx TX) (SendTxReturnCode, error)
}

func NewTransactionSender[TX any, RESULT SendTxResult, CHAIN_ID ID, RPC SendTxRPCClient[TX, RESULT]](
func NewTransactionSender[TX any, CHAIN_ID ID, RPC SendTxRPCClient[TX]](
lggr logger.Logger,
chainID CHAIN_ID,
chainFamily string,
multiNode *MultiNode[CHAIN_ID, RPC],
newResult func(err error) RESULT,
classifyErr func(err error) SendTxReturnCode,
sendTxSoftTimeout time.Duration,
) *TransactionSender[TX, RESULT, CHAIN_ID, RPC] {
) *TransactionSender[TX, CHAIN_ID, RPC] {
if sendTxSoftTimeout == 0 {
sendTxSoftTimeout = QueryTimeout / 2
}
return &TransactionSender[TX, RESULT, CHAIN_ID, RPC]{
return &TransactionSender[TX, CHAIN_ID, RPC]{
chainID: chainID,
chainFamily: chainFamily,
lggr: logger.Sugared(lggr).Named("TransactionSender").With("chainID", chainID.String()),
multiNode: multiNode,
newResult: newResult,
classifyErr: classifyErr,
sendTxSoftTimeout: sendTxSoftTimeout,
chStop: make(services.StopChan),
}
}

type TransactionSender[TX any, RESULT SendTxResult, CHAIN_ID ID, RPC SendTxRPCClient[TX, RESULT]] struct {
type TransactionSender[TX any, CHAIN_ID ID, RPC SendTxRPCClient[TX]] struct {
services.StateMachine
chainID CHAIN_ID
chainFamily string
lggr logger.SugaredLogger
multiNode *MultiNode[CHAIN_ID, RPC]
newResult func(err error) RESULT
classifyErr func(err error) SendTxReturnCode
sendTxSoftTimeout time.Duration // defines max waiting time from first response til responses evaluation

wg sync.WaitGroup // waits for all reporting goroutines to finish
chStop services.StopChan
}

func (txSender *TransactionSender[TX, RESULT, CHAIN_ID, RPC]) Name() string {
func (txSender *TransactionSender[TX, CHAIN_ID, RPC]) Name() string {
return txSender.lggr.Name()
}

Expand All @@ -93,17 +93,16 @@ func (txSender *TransactionSender[TX, RESULT, CHAIN_ID, RPC]) Name() string {
// * If there is at least one terminal error - returns terminal error
// * If there are both a success and a terminal error - returns success and reports an invariant violation
// * Otherwise, returns an arbitrary (effectively random) one of the errors.
func (txSender *TransactionSender[TX, RESULT, CHAIN_ID, RPC]) SendTransaction(ctx context.Context, tx TX) RESULT {
var result RESULT
func (txSender *TransactionSender[TX, CHAIN_ID, RPC]) SendTransaction(ctx context.Context, tx TX) (code SendTxReturnCode, err error) {
ctx, cancel := txSender.chStop.Ctx(ctx)
defer cancel()
if !txSender.IfStarted(func() {
txResults := make(chan RESULT)
txResultsToReport := make(chan RESULT)
txResults := make(chan sendTxResult)
txResultsToReport := make(chan sendTxResult)
primaryNodeWg := sync.WaitGroup{}

healthyNodesNum := 0
err := txSender.multiNode.DoAll(ctx, func(ctx context.Context, rpc RPC, isSendOnly bool) {
err = txSender.multiNode.DoAll(ctx, func(ctx context.Context, rpc RPC, isSendOnly bool) {
if isSendOnly {
txSender.wg.Add(1)
go func(ctx context.Context) {
Expand Down Expand Up @@ -151,45 +150,44 @@ func (txSender *TransactionSender[TX, RESULT, CHAIN_ID, RPC]) SendTransaction(ct
close(txResults)
}()

if err != nil {
result = txSender.newResult(err)
return
if err == nil && healthyNodesNum == 0 {
err = ErrNodeError
}

if healthyNodesNum == 0 {
result = txSender.newResult(ErrNodeError)
if err != nil {
code = txSender.classifyErr(err)
return
}

txSender.wg.Add(1)
go txSender.reportSendTxAnomalies(tx, txResultsToReport)

result = txSender.collectTxResults(ctx, tx, healthyNodesNum, txResults)
code, err = txSender.collectTxResults(ctx, tx, healthyNodesNum, txResults)
}) {
result = txSender.newResult(errors.New("TransactionSender not started"))
err = errors.New("TransactionSender not started")
code = txSender.classifyErr(err)
}

return result
return
}

func (txSender *TransactionSender[TX, RESULT, CHAIN_ID, RPC]) broadcastTxAsync(ctx context.Context, rpc RPC, tx TX) RESULT {
func (txSender *TransactionSender[TX, CHAIN_ID, RPC]) broadcastTxAsync(ctx context.Context, rpc RPC, tx TX) sendTxResult {
// broadcast is a background job, so always detach from caller's cancellation
ctx, cancel := txSender.chStop.Ctx(context.WithoutCancel(ctx))
defer cancel()
result := rpc.SendTransaction(ctx, tx)
txSender.lggr.Debugw("Node sent transaction", "tx", tx, "err", result.Error())
if !slices.Contains(sendTxSuccessfulCodes, result.Code()) && ctx.Err() == nil {
txSender.lggr.Warnw("RPC returned error", "tx", tx, "err", result.Error())
code, err := rpc.SendTransaction(ctx, tx)
txSender.lggr.Debugw("Node sent transaction", "tx", tx, "err", err)
if !slices.Contains(sendTxSuccessfulCodes, code) && ctx.Err() == nil {
txSender.lggr.Warnw("RPC returned error", "tx", tx, "err", err)
}
return result
return sendTxResult{code: code, error: err}
}

func (txSender *TransactionSender[TX, RESULT, CHAIN_ID, RPC]) reportSendTxAnomalies(tx TX, txResults <-chan RESULT) {
func (txSender *TransactionSender[TX, CHAIN_ID, RPC]) reportSendTxAnomalies(tx TX, txResults <-chan sendTxResult) {
defer txSender.wg.Done()
resultsByCode := sendTxResults[RESULT]{}
resultsByCode := sendTxResults{}
// txResults eventually will be closed
for txResult := range txResults {
resultsByCode[txResult.Code()] = append(resultsByCode[txResult.Code()], txResult)
resultsByCode[txResult.code] = append(resultsByCode[txResult.code], txResult)
}

select {
Expand All @@ -201,16 +199,16 @@ func (txSender *TransactionSender[TX, RESULT, CHAIN_ID, RPC]) reportSendTxAnomal
default:
}

_, criticalErr := aggregateTxResults[RESULT](resultsByCode)
_, criticalErr := aggregateTxResults(resultsByCode)
if criticalErr != nil {
txSender.lggr.Criticalw("observed invariant violation on SendTransaction", "tx", tx, "resultsByCode", resultsByCode, "err", criticalErr)
PromMultiNodeInvariantViolations.WithLabelValues(txSender.chainFamily, txSender.chainID.String(), criticalErr.Error()).Inc()
}
}

type sendTxResults[RESULT any] map[SendTxReturnCode][]RESULT
type sendTxResults map[SendTxReturnCode][]sendTxResult

func aggregateTxResults[RESULT any](resultsByCode sendTxResults[RESULT]) (result RESULT, criticalErr error) {
func aggregateTxResults(resultsByCode sendTxResults) (result sendTxResult, criticalErr error) {
severeErrors, hasSevereErrors := findFirstIn(resultsByCode, sendTxSevereErrors)
successResults, hasSuccess := findFirstIn(resultsByCode, sendTxSuccessfulCodes)
if hasSuccess {
Expand Down Expand Up @@ -239,21 +237,22 @@ func aggregateTxResults[RESULT any](resultsByCode sendTxResults[RESULT]) (result
return result, criticalErr
}

func (txSender *TransactionSender[TX, RESULT, CHAIN_ID, RPC]) collectTxResults(ctx context.Context, tx TX, healthyNodesNum int, txResults <-chan RESULT) RESULT {
func (txSender *TransactionSender[TX, CHAIN_ID, RPC]) collectTxResults(ctx context.Context, tx TX, healthyNodesNum int, txResults <-chan sendTxResult) (SendTxReturnCode, error) {
requiredResults := int(math.Ceil(float64(healthyNodesNum) * sendTxQuorum))
errorsByCode := sendTxResults[RESULT]{}
errorsByCode := sendTxResults{}
var softTimeoutChan <-chan time.Time
var resultsCount int
loop:
for {
select {
case <-ctx.Done():
txSender.lggr.Debugw("Failed to collect of the results before context was done", "tx", tx, "errorsByCode", errorsByCode)
return txSender.newResult(ctx.Err())
err := ctx.Err()
return txSender.classifyErr(err), err
case r := <-txResults:
errorsByCode[r.Code()] = append(errorsByCode[r.Code()], r)
errorsByCode[r.code] = append(errorsByCode[r.code], r)
resultsCount++
if slices.Contains(sendTxSuccessfulCodes, r.Code()) || resultsCount >= requiredResults {
if slices.Contains(sendTxSuccessfulCodes, r.code) || resultsCount >= requiredResults {
break loop
}
case <-softTimeoutChan:
Expand All @@ -273,16 +272,16 @@ loop:
// ignore critical error as it's reported in reportSendTxAnomalies
result, _ := aggregateTxResults(errorsByCode)
txSender.lggr.Debugw("Collected results", "errorsByCode", errorsByCode, "result", result)
return result
return result.code, result.error
}

func (txSender *TransactionSender[TX, RESULT, CHAIN_ID, RPC]) Start(ctx context.Context) error {
func (txSender *TransactionSender[TX, CHAIN_ID, RPC]) Start(ctx context.Context) error {
return txSender.StartOnce("TransactionSender", func() error {
return nil
})
}

func (txSender *TransactionSender[TX, RESULT, CHAIN_ID, RPC]) Close() error {
func (txSender *TransactionSender[TX, CHAIN_ID, RPC]) Close() error {
return txSender.StopOnce("TransactionSender", func() error {
txSender.lggr.Debug("Closing TransactionSender")
close(txSender.chStop)
Expand Down
Loading

0 comments on commit efd7093

Please sign in to comment.