From 7a79e853af19a107ebae01a96bd31a2bfbb84f42 Mon Sep 17 00:00:00 2001 From: Aurora Gaffney Date: Mon, 8 Jul 2024 15:53:11 -0500 Subject: [PATCH] feat: improve logging and synchronization for NtN TxSubmission Fixes #204 Fixes #205 --- internal/tx/tx.go | 55 +++++++++++++++++++++++++++++------------------ 1 file changed, 34 insertions(+), 21 deletions(-) diff --git a/internal/tx/tx.go b/internal/tx/tx.go index 4e6652c..61b97fa 100644 --- a/internal/tx/tx.go +++ b/internal/tx/tx.go @@ -22,6 +22,7 @@ import ( "math/big" "net" "net/http" + "sync" "time" "github.com/Salvionied/apollo" @@ -49,6 +50,7 @@ import ( var ntnTxBytes []byte var ntnTxHash [32]byte var ntnSentTx bool +var ntnMutex sync.Mutex var doneChan chan any func SendTx(blockData any, nonce [16]byte) error { @@ -412,6 +414,8 @@ func submitTx(txRawBytes []byte) (string, error) { } func submitTxNtN(txRawBytes []byte) (string, error) { + ntnMutex.Lock() + defer ntnMutex.Unlock() cfg := config.GetConfig() logger := logging.GetLogger() @@ -433,19 +437,13 @@ func submitTxNtN(txRawBytes []byte) (string, error) { ntnTxHash = blake2b.Sum256(txBody) // Create connection - conn := createClientConnection(cfg.Submit.Address) - errorChan := make(chan error) - // Capture errors - go func() { - err, ok := <-errorChan - if ok { - panic(fmt.Errorf("async: %s", err)) - } - }() + conn, err := createClientConnection(cfg.Submit.Address) + if err != nil { + return "", err + } o, err := ouroboros.New( ouroboros.WithConnection(conn), ouroboros.WithNetworkMagic(cfg.NetworkMagic), - ouroboros.WithErrorChan(errorChan), ouroboros.WithNodeToNode(true), ouroboros.WithKeepAlive(true), ouroboros.WithTxSubmissionConfig( @@ -458,13 +456,24 @@ func submitTxNtN(txRawBytes []byte) (string, error) { if err != nil { return "", err } - - // Start txSubmission loop + // Capture errors doneChan = make(chan any) + go func() { + err, ok := <-o.ErrorChan() + if ok { + select { + case <-doneChan: + return + } + close(doneChan) + logger.Errorf("async error submitting TX via NtN: %s", err) + } + }() + // Start txSubmission loop o.TxSubmission().Client.Init() <-doneChan - // Sleep 2s to allow time for TX to enter remote mempool before closing connection - time.Sleep(2 * time.Second) + // Sleep 1s to allow time for TX to enter remote mempool before closing connection + time.Sleep(1 * time.Second) if err := o.Close(); err != nil { return "", fmt.Errorf("failed to close connection: %s", err) @@ -515,8 +524,7 @@ func submitTxApi(txRawBytes []byte) (string, error) { } } -func createClientConnection(nodeAddress string) net.Conn { - logger := logging.GetLogger() +func createClientConnection(nodeAddress string) (net.Conn, error) { var err error var conn net.Conn var dialProto string @@ -526,10 +534,9 @@ func createClientConnection(nodeAddress string) net.Conn { conn, err = net.Dial(dialProto, dialAddress) if err != nil { - logger.Errorf("connection failed: %s", err) - panic(err) + return nil, fmt.Errorf("connection failed: %s", err) } - return conn + return conn, nil } func handleRequestTxIds( @@ -538,10 +545,16 @@ func handleRequestTxIds( ack uint16, req uint16, ) ([]txsubmission.TxIdAndSize, error) { + // Shutdown if we've already sent the TX if ntnSentTx { - // Terrible syncronization hack for shutdown + select { + case <-doneChan: + return nil, nil + default: + } close(doneChan) - time.Sleep(5 * time.Second) + // This prevents creating an async error while waiting for shutdown + time.Sleep(2 * time.Second) return nil, nil } ret := []txsubmission.TxIdAndSize{