diff --git a/protocol/txsubmission/server.go b/protocol/txsubmission/server.go index 2a0ebeea..23696e87 100644 --- a/protocol/txsubmission/server.go +++ b/protocol/txsubmission/server.go @@ -22,12 +22,18 @@ import ( type Server struct { *protocol.Protocol - config *Config + config *Config + ackCount int + stateDone bool + requestTxIdsResultChan chan []TxIdAndSize + requestTxsResultChan chan []TxBody } func NewServer(protoOptions protocol.ProtocolOptions, cfg *Config) *Server { s := &Server{ - config: cfg, + config: cfg, + requestTxIdsResultChan: make(chan []TxIdAndSize), + requestTxsResultChan: make(chan []TxBody), } protoConfig := protocol.ProtocolConfig{ Name: ProtocolName, @@ -42,6 +48,12 @@ func NewServer(protoOptions protocol.ProtocolOptions, cfg *Config) *Server { InitialState: stateInit, } s.Protocol = protocol.New(protoConfig) + // Start goroutine to cleanup resources on protocol shutdown + go func() { + <-s.Protocol.DoneChan() + close(s.requestTxIdsResultChan) + close(s.requestTxsResultChan) + }() return s } @@ -66,36 +78,57 @@ func (s *Server) messageHandler(msg protocol.Message, isResponse bool) error { return err } -func (s *Server) handleReplyTxIds(msg protocol.Message) error { - if s.config.ReplyTxIdsFunc == nil { - return fmt.Errorf( - "received tx-submission ReplyTxIds message but no callback function is defined", - ) +func (s *Server) RequestTxIds(blocking bool, reqCount int) ([]TxIdAndSize, error) { + if s.stateDone { + return nil, protocol.ProtocolShuttingDownError } + msg := NewMsgRequestTxIds(blocking, uint16(s.ackCount), uint16(reqCount)) + if err := s.SendMessage(msg); err != nil { + return nil, err + } + // Reset ack count + s.ackCount = 0 + // Wait for result + txIds, ok := <-s.requestTxIdsResultChan + if !ok { + return nil, protocol.ProtocolShuttingDownError + } + return txIds, nil +} + +func (s *Server) RequestTxs(txIds []TxId) ([]TxBody, error) { + if s.stateDone { + return nil, protocol.ProtocolShuttingDownError + } + msg := NewMsgRequestTxs(txIds) + if err := s.SendMessage(msg); err != nil { + return nil, err + } + // Wait for result + txs, ok := <-s.requestTxsResultChan + if !ok { + return nil, protocol.ProtocolShuttingDownError + } + // Set the ack count for the next RequestTxIds request based on the number we got for this one + s.ackCount = len(txs) + return txs, nil +} + +func (s *Server) handleReplyTxIds(msg protocol.Message) error { msgReplyTxIds := msg.(*MsgReplyTxIds) - // Call the user callback function - return s.config.ReplyTxIdsFunc(msgReplyTxIds.TxIds) + s.requestTxIdsResultChan <- msgReplyTxIds.TxIds + return nil } func (s *Server) handleReplyTxs(msg protocol.Message) error { - if s.config.ReplyTxsFunc == nil { - return fmt.Errorf( - "received tx-submission ReplyTxs message but no callback function is defined", - ) - } msgReplyTxs := msg.(*MsgReplyTxs) - // Call the user callback function - return s.config.ReplyTxsFunc(msgReplyTxs.Txs) + s.requestTxsResultChan <- msgReplyTxs.Txs + return nil } func (s *Server) handleDone() error { - if s.config.DoneFunc == nil { - return fmt.Errorf( - "received tx-submission Done message but no callback function is defined", - ) - } - // Call the user callback function - return s.config.DoneFunc() + s.stateDone = true + return nil } func (s *Server) handleInit() error { diff --git a/protocol/txsubmission/txsubmission.go b/protocol/txsubmission/txsubmission.go index ef42d606..9dfed881 100644 --- a/protocol/txsubmission/txsubmission.go +++ b/protocol/txsubmission/txsubmission.go @@ -114,20 +114,14 @@ type TxSubmission struct { type Config struct { RequestTxIdsFunc RequestTxIdsFunc - ReplyTxIdsFunc ReplyTxIdsFunc RequestTxsFunc RequestTxsFunc - ReplyTxsFunc ReplyTxsFunc - DoneFunc DoneFunc InitFunc InitFunc IdleTimeout time.Duration } // Callback function types type RequestTxIdsFunc func(bool, uint16, uint16) ([]TxIdAndSize, error) -type ReplyTxIdsFunc func(interface{}) error type RequestTxsFunc func([]TxId) ([]TxBody, error) -type ReplyTxsFunc func(interface{}) error -type DoneFunc func() error type InitFunc func() error func New(protoOptions protocol.ProtocolOptions, cfg *Config) *TxSubmission { @@ -159,30 +153,12 @@ func WithRequestTxIdsFunc( } } -func WithReplyTxIdsFunc(replyTxIdsFunc ReplyTxIdsFunc) TxSubmissionOptionFunc { - return func(c *Config) { - c.ReplyTxIdsFunc = replyTxIdsFunc - } -} - func WithRequestTxsFunc(requestTxsFunc RequestTxsFunc) TxSubmissionOptionFunc { return func(c *Config) { c.RequestTxsFunc = requestTxsFunc } } -func WithReplyTxsFunc(replyTxsFunc ReplyTxsFunc) TxSubmissionOptionFunc { - return func(c *Config) { - c.ReplyTxsFunc = replyTxsFunc - } -} - -func WithDoneFunc(doneFunc DoneFunc) TxSubmissionOptionFunc { - return func(c *Config) { - c.DoneFunc = doneFunc - } -} - func WithInitFunc(initFunc InitFunc) TxSubmissionOptionFunc { return func(c *Config) { c.InitFunc = initFunc