Skip to content

Commit

Permalink
fix: improve logging and synchronization for NtN TxSubmission
Browse files Browse the repository at this point in the history
Fixes #204
Fixes #205
  • Loading branch information
agaffney committed Jul 8, 2024
1 parent fd748e7 commit 0d2a9cf
Showing 1 changed file with 34 additions and 21 deletions.
55 changes: 34 additions & 21 deletions internal/tx/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"math/big"
"net"
"net/http"
"sync"
"time"

"github.com/Salvionied/apollo"
Expand Down Expand Up @@ -49,6 +50,7 @@ import (
var ntnTxBytes []byte
var ntnTxHash [32]byte
var ntnSentTx bool
var ntnMutex sync.Mutex
var doneChan chan any

func SendTx(blockData any, nonce [16]byte) error {
Expand Down Expand Up @@ -412,6 +414,8 @@ func submitTx(txRawBytes []byte) (string, error) {
}

func submitTxNtN(txRawBytes []byte) (string, error) {
ntnMutex.Lock()
defer ntnMutex.Unlock()
cfg := config.GetConfig()
logger := logging.GetLogger()

Expand All @@ -433,19 +437,13 @@ func submitTxNtN(txRawBytes []byte) (string, error) {
ntnTxHash = blake2b.Sum256(txBody)

// Create connection
conn := createClientConnection(cfg.Submit.Address)
errorChan := make(chan error)
// Capture errors
go func() {
err, ok := <-errorChan
if ok {
panic(fmt.Errorf("async: %s", err))
}
}()
conn, err := createClientConnection(cfg.Submit.Address)
if err != nil {
return "", err
}
o, err := ouroboros.New(
ouroboros.WithConnection(conn),
ouroboros.WithNetworkMagic(cfg.NetworkMagic),
ouroboros.WithErrorChan(errorChan),
ouroboros.WithNodeToNode(true),
ouroboros.WithKeepAlive(true),
ouroboros.WithTxSubmissionConfig(
Expand All @@ -458,13 +456,24 @@ func submitTxNtN(txRawBytes []byte) (string, error) {
if err != nil {
return "", err
}

// Start txSubmission loop
// Capture errors
doneChan = make(chan any)
go func() {
err, ok := <-o.ErrorChan()

Check failure on line 462 in internal/tx/tx.go

View workflow job for this annotation

GitHub Actions / lint

SA4006: this value of `err` is never used (staticcheck)
if ok {
select {

Check failure on line 464 in internal/tx/tx.go

View workflow job for this annotation

GitHub Actions / lint

S1000: should use a simple channel send/receive instead of `select` with a single case (gosimple)
case <-doneChan:
return
}
close(doneChan)

Check failure on line 468 in internal/tx/tx.go

View workflow job for this annotation

GitHub Actions / lint

unreachable: unreachable code (govet)
logger.Errorf("async error submitting TX via NtN: %s", err)
}
}()
// Start txSubmission loop
o.TxSubmission().Client.Init()
<-doneChan
// Sleep 2s to allow time for TX to enter remote mempool before closing connection
time.Sleep(2 * time.Second)
// Sleep 1s to allow time for TX to enter remote mempool before closing connection
time.Sleep(1 * time.Second)

if err := o.Close(); err != nil {
return "", fmt.Errorf("failed to close connection: %s", err)
Expand Down Expand Up @@ -515,8 +524,7 @@ func submitTxApi(txRawBytes []byte) (string, error) {
}
}

func createClientConnection(nodeAddress string) net.Conn {
logger := logging.GetLogger()
func createClientConnection(nodeAddress string) (net.Conn, error) {
var err error
var conn net.Conn
var dialProto string
Expand All @@ -526,10 +534,9 @@ func createClientConnection(nodeAddress string) net.Conn {

conn, err = net.Dial(dialProto, dialAddress)
if err != nil {
logger.Errorf("connection failed: %s", err)
panic(err)
return nil, fmt.Errorf("connection failed: %s", err)
}
return conn
return conn, nil
}

func handleRequestTxIds(
Expand All @@ -538,10 +545,16 @@ func handleRequestTxIds(
ack uint16,
req uint16,
) ([]txsubmission.TxIdAndSize, error) {
// Shutdown if we've already sent the TX
if ntnSentTx {
// Terrible syncronization hack for shutdown
select {
case <-doneChan:
return nil, nil
default:
}
close(doneChan)
time.Sleep(5 * time.Second)
// This prevents creating an async error while waiting for shutdown
time.Sleep(2 * time.Second)
return nil, nil
}
ret := []txsubmission.TxIdAndSize{
Expand Down

0 comments on commit 0d2a9cf

Please sign in to comment.