From 77199fc51bf513a7f8333658fb48fa75b6f48a9d Mon Sep 17 00:00:00 2001 From: Kirill Pisarev Date: Tue, 29 Aug 2023 18:47:19 +0200 Subject: [PATCH] fix: replaced additional map with TraceID to structure in for monitored transaction --- .../calls/transactor/monitored/monitored.go | 67 ++++++++----------- .../transactor/signAndSend/signAndSend.go | 9 +-- 2 files changed, 29 insertions(+), 47 deletions(-) diff --git a/chains/evm/calls/transactor/monitored/monitored.go b/chains/evm/calls/transactor/monitored/monitored.go index cd5ea203..3679b4c3 100644 --- a/chains/evm/calls/transactor/monitored/monitored.go +++ b/chains/evm/calls/transactor/monitored/monitored.go @@ -29,6 +29,11 @@ type RawTx struct { creationTime time.Time } +type RawTxWithTraceID struct { + RawTx + traceID traceapi.TraceID +} + type MonitoredTransactor struct { txFabric calls.TxFabric gasPriceClient calls.GasPricer @@ -37,8 +42,7 @@ type MonitoredTransactor struct { maxGasPrice *big.Int increasePercentage *big.Int - pendingTxns map[common.Hash]RawTx - pendingTxnsTrace map[common.Hash]traceapi.TraceID + pendingTxns map[common.Hash]RawTxWithTraceID txLock sync.Mutex } @@ -60,30 +64,27 @@ func NewMonitoredTransactor( client: client, gasPriceClient: gasPriceClient, txFabric: txFabric, - pendingTxns: make(map[common.Hash]RawTx), - pendingTxnsTrace: make(map[common.Hash]traceapi.TraceID), + pendingTxns: make(map[common.Hash]RawTxWithTraceID), maxGasPrice: maxGasPrice, increasePercentage: increasePercentage, } } func (t *MonitoredTransactor) Transact(ctx context.Context, to *common.Address, data []byte, opts transactor.TransactOptions) (*common.Hash, error) { - _, span := otel.Tracer("relayer-core").Start(ctx, "relayer.core.EVMListener.ListenToEvents") - + _, span := otel.Tracer("relayer-core").Start(ctx, "relayer.core.evm.transactor.Monitor") + defer span.End() t.client.LockNonce() defer t.client.UnlockNonce() n, err := t.client.UnsafeNonce() if err != nil { span.RecordError(fmt.Errorf("unable to get unsafe nonce with err: %w", err)) - span.End() return &common.Hash{}, err } err = transactor.MergeTransactionOptions(&opts, &transactor.DefaultTransactionOptions) if err != nil { span.RecordError(fmt.Errorf("unable to merge transaction options with err: %w", err)) - span.End() return &common.Hash{}, err } @@ -100,39 +101,41 @@ func (t *MonitoredTransactor) Transact(ctx context.Context, to *common.Address, span.AddEvent("Calculated GasPrice", traceapi.WithAttributes(attribute.String("tx.gp", gp[0].String()))) } - rawTx := RawTx{ - to: to, - nonce: n.Uint64(), - value: opts.Value, - gasLimit: opts.GasLimit, - gasPrice: gp, - data: data, - submitTime: time.Now(), - creationTime: time.Now(), + rawTx := RawTxWithTraceID{ + RawTx{ + to: to, + nonce: n.Uint64(), + value: opts.Value, + gasLimit: opts.GasLimit, + gasPrice: gp, + data: data, + submitTime: time.Now(), + creationTime: time.Now(), + }, + span.SpanContext().TraceID(), } tx, err := t.txFabric(rawTx.nonce, rawTx.to, rawTx.value, rawTx.gasLimit, rawTx.gasPrice, rawTx.data) if err != nil { span.RecordError(fmt.Errorf("unable to call TxFabric with err: %w", err)) - span.End() return &common.Hash{}, err } h, err := t.client.SignAndSendTransaction(context.TODO(), tx) if err != nil { span.RecordError(fmt.Errorf("unable to SignAndSendTransaction with err: %w", err)) - span.End() + span.SetStatus(codes.Error, "unable to SignAndSendTransaction") return &common.Hash{}, err } + span.AddEvent("Executed transaction", traceapi.WithAttributes(attribute.String("tx.hash", h.String()))) t.txLock.Lock() t.pendingTxns[h] = rawTx - t.pendingTxnsTrace[h] = span.SpanContext().TraceID() t.txLock.Unlock() err = t.client.UnsafeIncreaseNonce() if err != nil { span.RecordError(fmt.Errorf("unable to UnsafeIncreaseNonce with err: %w", err)) - span.End() + span.SetStatus(codes.Error, "unable to UnsafeIncreaseNonce") return &common.Hash{}, err } @@ -154,27 +157,17 @@ func (t *MonitoredTransactor) Monitor( case <-ticker.C: { t.txLock.Lock() - pendingTxCopy := make(map[common.Hash]RawTx, len(t.pendingTxns)) + pendingTxCopy := make(map[common.Hash]RawTxWithTraceID, len(t.pendingTxns)) for k, v := range t.pendingTxns { pendingTxCopy[k] = v } - pendingTxTraceIDCopy := make(map[common.Hash]traceapi.TraceID, len(t.pendingTxnsTrace)) - for k, v := range t.pendingTxnsTrace { - pendingTxTraceIDCopy[k] = v - } t.txLock.Unlock() for oldHash, tx := range pendingTxCopy { if time.Since(tx.submitTime) < tooNewTransaction { continue } - tID, ok := pendingTxTraceIDCopy[oldHash] - if ok { - // Creating span context with existing TraceID - spanCtx := traceapi.NewSpanContext(traceapi.SpanContextConfig{TraceID: tID, Remote: true}) - ctx = traceapi.ContextWithSpanContext(ctx, spanCtx) - } - ctx, span := otel.Tracer("relayer-sygma").Start(ctx, "relayer.sygma.evm.transactor.Monitor") + txContextWithSpan, span := otel.Tracer("relayer-core").Start(traceapi.ContextWithSpanContext(ctx, traceapi.NewSpanContext(traceapi.SpanContextConfig{TraceID: tx.traceID})), "relayer.core.evm.transactor.Monitor") logger := log.With().Str("dd.trace_id", span.SpanContext().TraceID().String()).Logger() receipt, err := t.client.TransactionReceipt(context.Background(), oldHash) @@ -191,7 +184,6 @@ func (t *MonitoredTransactor) Monitor( span.End() } delete(t.pendingTxns, oldHash) - delete(t.pendingTxnsTrace, oldHash) continue } @@ -200,11 +192,10 @@ func (t *MonitoredTransactor) Monitor( span.RecordError(fmt.Errorf("transaction has timed out"), traceapi.WithAttributes(attribute.String("tx.hash", oldHash.String()), attribute.Int64("tx.nonce", int64(tx.nonce)))) span.End() delete(t.pendingTxns, oldHash) - delete(t.pendingTxnsTrace, oldHash) continue } - hash, err := t.resendTransaction(ctx, &tx) + hash, err := t.resendTransaction(txContextWithSpan, &tx.RawTx) if err != nil { span.RecordError(fmt.Errorf("error resending transaction %w", err), traceapi.WithAttributes(attribute.String("tx.hash", oldHash.String()), attribute.Int64("tx.nonce", int64(tx.nonce)))) logger.Warn().Uint64("nonce", tx.nonce).Err(err).Msgf("Failed resending transaction %s", oldHash) @@ -214,9 +205,7 @@ func (t *MonitoredTransactor) Monitor( span.End() delete(t.pendingTxns, oldHash) - delete(t.pendingTxnsTrace, oldHash) t.pendingTxns[hash] = tx - t.pendingTxnsTrace[hash] = tID } } } @@ -246,7 +235,7 @@ func (t *MonitoredTransactor) resendTransaction(ctx context.Context, tx *RawTx) // would be 11 (it floors the value). In case the gas price didn't // change it increases it by 1. func (t *MonitoredTransactor) IncreaseGas(ctx context.Context, oldGp []*big.Int) []*big.Int { - _, span := otel.Tracer("relayer-core").Start(ctx, "relayer.sygma.evm.transactor.Monitor.IncreaseGas") + _, span := otel.Tracer("relayer-core").Start(ctx, "relayer.core.evm.transactor.Monitor.IncreaseGas") newGp := make([]*big.Int, len(oldGp)) for i, gp := range oldGp { percentIncreaseValue := new(big.Int).Div(new(big.Int).Mul(gp, t.increasePercentage), big.NewInt(100)) diff --git a/chains/evm/calls/transactor/signAndSend/signAndSend.go b/chains/evm/calls/transactor/signAndSend/signAndSend.go index e353b2dd..b2dd9d78 100644 --- a/chains/evm/calls/transactor/signAndSend/signAndSend.go +++ b/chains/evm/calls/transactor/signAndSend/signAndSend.go @@ -32,12 +32,12 @@ func NewSignAndSendTransactor(txFabric calls.TxFabric, gasPriceClient calls.GasP func (t *signAndSendTransactor) Transact(ctx context.Context, to *common.Address, data []byte, opts transactor.TransactOptions) (*common.Hash, error) { _, span := otel.Tracer("relayer-core").Start(ctx, "relayer.core.Transactor.signAndSendTransactor.Transact") + defer span.End() t.client.LockNonce() n, err := t.client.UnsafeNonce() if err != nil { t.client.UnlockNonce() span.RecordError(fmt.Errorf("unable to get unsafe nonce with err: %w", err)) - span.End() return &common.Hash{}, err } @@ -45,7 +45,6 @@ func (t *signAndSendTransactor) Transact(ctx context.Context, to *common.Address if err != nil { t.client.UnlockNonce() span.RecordError(fmt.Errorf("unable to merge transaction options with err: %w", err)) - span.End() return &common.Hash{}, err } @@ -55,7 +54,6 @@ func (t *signAndSendTransactor) Transact(ctx context.Context, to *common.Address if err != nil { t.client.UnlockNonce() span.RecordError(fmt.Errorf("unable to define gas price with err: %w", err)) - span.End() return &common.Hash{}, err } } @@ -70,7 +68,6 @@ func (t *signAndSendTransactor) Transact(ctx context.Context, to *common.Address if err != nil { t.client.UnlockNonce() span.RecordError(fmt.Errorf("unable to call TxFabric with err: %w", err)) - span.End() return &common.Hash{}, err } @@ -78,7 +75,6 @@ func (t *signAndSendTransactor) Transact(ctx context.Context, to *common.Address if err != nil { t.client.UnlockNonce() span.RecordError(fmt.Errorf("unable to SignAndSendTransaction with err: %w", err)) - span.End() return &common.Hash{}, err } @@ -86,17 +82,14 @@ func (t *signAndSendTransactor) Transact(ctx context.Context, to *common.Address t.client.UnlockNonce() if err != nil { span.RecordError(fmt.Errorf("unable to UnsafeIncreaseNonce with err: %w", err)) - span.End() return &common.Hash{}, err } _, err = t.client.WaitAndReturnTxReceipt(h) if err != nil { span.RecordError(fmt.Errorf("unable to WaitAndReturnTxReceipt with err: %w", err)) - span.End() return &common.Hash{}, err } span.SetStatus(codes.Ok, "Transaction sent") - span.End() return &h, nil }