Skip to content

Commit

Permalink
Merge pull request #411 from blinklabs-io/feat/protocol-txsubmission-…
Browse files Browse the repository at this point in the history
…server

feat: finish implementing server side of tx-submission
  • Loading branch information
agaffney authored Oct 27, 2023
2 parents f1cfd4a + 5f7cbf7 commit 2d52b71
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 47 deletions.
79 changes: 56 additions & 23 deletions protocol/txsubmission/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,18 @@ import (

type Server struct {
*protocol.Protocol
config *Config
config *Config
ackCount int
stateDone bool
requestTxIdsResultChan chan []TxIdAndSize
requestTxsResultChan chan []TxBody
}

func NewServer(protoOptions protocol.ProtocolOptions, cfg *Config) *Server {
s := &Server{
config: cfg,
config: cfg,
requestTxIdsResultChan: make(chan []TxIdAndSize),
requestTxsResultChan: make(chan []TxBody),
}
protoConfig := protocol.ProtocolConfig{
Name: ProtocolName,
Expand All @@ -42,6 +48,12 @@ func NewServer(protoOptions protocol.ProtocolOptions, cfg *Config) *Server {
InitialState: stateInit,
}
s.Protocol = protocol.New(protoConfig)
// Start goroutine to cleanup resources on protocol shutdown
go func() {
<-s.Protocol.DoneChan()
close(s.requestTxIdsResultChan)
close(s.requestTxsResultChan)
}()
return s
}

Expand All @@ -66,36 +78,57 @@ func (s *Server) messageHandler(msg protocol.Message, isResponse bool) error {
return err
}

func (s *Server) handleReplyTxIds(msg protocol.Message) error {
if s.config.ReplyTxIdsFunc == nil {
return fmt.Errorf(
"received tx-submission ReplyTxIds message but no callback function is defined",
)
func (s *Server) RequestTxIds(blocking bool, reqCount int) ([]TxIdAndSize, error) {
if s.stateDone {
return nil, protocol.ProtocolShuttingDownError
}
msg := NewMsgRequestTxIds(blocking, uint16(s.ackCount), uint16(reqCount))
if err := s.SendMessage(msg); err != nil {
return nil, err
}
// Reset ack count
s.ackCount = 0
// Wait for result
txIds, ok := <-s.requestTxIdsResultChan
if !ok {
return nil, protocol.ProtocolShuttingDownError
}
return txIds, nil
}

func (s *Server) RequestTxs(txIds []TxId) ([]TxBody, error) {
if s.stateDone {
return nil, protocol.ProtocolShuttingDownError
}
msg := NewMsgRequestTxs(txIds)
if err := s.SendMessage(msg); err != nil {
return nil, err
}
// Wait for result
txs, ok := <-s.requestTxsResultChan
if !ok {
return nil, protocol.ProtocolShuttingDownError
}
// Set the ack count for the next RequestTxIds request based on the number we got for this one
s.ackCount = len(txs)
return txs, nil
}

func (s *Server) handleReplyTxIds(msg protocol.Message) error {
msgReplyTxIds := msg.(*MsgReplyTxIds)
// Call the user callback function
return s.config.ReplyTxIdsFunc(msgReplyTxIds.TxIds)
s.requestTxIdsResultChan <- msgReplyTxIds.TxIds
return nil
}

func (s *Server) handleReplyTxs(msg protocol.Message) error {
if s.config.ReplyTxsFunc == nil {
return fmt.Errorf(
"received tx-submission ReplyTxs message but no callback function is defined",
)
}
msgReplyTxs := msg.(*MsgReplyTxs)
// Call the user callback function
return s.config.ReplyTxsFunc(msgReplyTxs.Txs)
s.requestTxsResultChan <- msgReplyTxs.Txs
return nil
}

func (s *Server) handleDone() error {
if s.config.DoneFunc == nil {
return fmt.Errorf(
"received tx-submission Done message but no callback function is defined",
)
}
// Call the user callback function
return s.config.DoneFunc()
s.stateDone = true
return nil
}

func (s *Server) handleInit() error {
Expand Down
24 changes: 0 additions & 24 deletions protocol/txsubmission/txsubmission.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,20 +114,14 @@ type TxSubmission struct {

type Config struct {
RequestTxIdsFunc RequestTxIdsFunc
ReplyTxIdsFunc ReplyTxIdsFunc
RequestTxsFunc RequestTxsFunc
ReplyTxsFunc ReplyTxsFunc
DoneFunc DoneFunc
InitFunc InitFunc
IdleTimeout time.Duration
}

// Callback function types
type RequestTxIdsFunc func(bool, uint16, uint16) ([]TxIdAndSize, error)
type ReplyTxIdsFunc func(interface{}) error
type RequestTxsFunc func([]TxId) ([]TxBody, error)
type ReplyTxsFunc func(interface{}) error
type DoneFunc func() error
type InitFunc func() error

func New(protoOptions protocol.ProtocolOptions, cfg *Config) *TxSubmission {
Expand Down Expand Up @@ -159,30 +153,12 @@ func WithRequestTxIdsFunc(
}
}

func WithReplyTxIdsFunc(replyTxIdsFunc ReplyTxIdsFunc) TxSubmissionOptionFunc {
return func(c *Config) {
c.ReplyTxIdsFunc = replyTxIdsFunc
}
}

func WithRequestTxsFunc(requestTxsFunc RequestTxsFunc) TxSubmissionOptionFunc {
return func(c *Config) {
c.RequestTxsFunc = requestTxsFunc
}
}

func WithReplyTxsFunc(replyTxsFunc ReplyTxsFunc) TxSubmissionOptionFunc {
return func(c *Config) {
c.ReplyTxsFunc = replyTxsFunc
}
}

func WithDoneFunc(doneFunc DoneFunc) TxSubmissionOptionFunc {
return func(c *Config) {
c.DoneFunc = doneFunc
}
}

func WithInitFunc(initFunc InitFunc) TxSubmissionOptionFunc {
return func(c *Config) {
c.InitFunc = initFunc
Expand Down

0 comments on commit 2d52b71

Please sign in to comment.