Skip to content

Commit

Permalink
Merge pull request #26 from cloudstruct/feature/protocol-refactor
Browse files Browse the repository at this point in the history
Refactor protocol state machine
  • Loading branch information
agaffney authored Feb 27, 2022
2 parents 662f679 + 38748ad commit d6ee6ff
Show file tree
Hide file tree
Showing 7 changed files with 336 additions and 198 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ A Go client implementation of the Cardano Ouroboros network protocol

This is loosely based on the [official Haskell implementation](https://github.com/input-output-hk/ouroboros-network)

NOTE: this library is under heavily development, and the interface should not be considered stable until it reaches `v1.0.0`

## Implementation status

The Ouroboros protocol consists of a simple multiplexer protocol and various mini-protocols that run on top of it.
Expand Down
91 changes: 56 additions & 35 deletions protocol/blockfetch/blockfetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,51 @@ var (
STATE_DONE = protocol.NewState(4, "Done")
)

var stateMap = protocol.StateMap{
STATE_IDLE: protocol.StateMapEntry{
Agency: protocol.AGENCY_CLIENT,
Transitions: []protocol.StateTransition{
{
MsgType: MESSAGE_TYPE_REQUEST_RANGE,
NewState: STATE_BUSY,
},
{
MsgType: MESSAGE_TYPE_CLIENT_DONE,
NewState: STATE_DONE,
},
},
},
STATE_BUSY: protocol.StateMapEntry{
Agency: protocol.AGENCY_SERVER,
Transitions: []protocol.StateTransition{
{
MsgType: MESSAGE_TYPE_START_BATCH,
NewState: STATE_STREAMING,
},
{
MsgType: MESSAGE_TYPE_NO_BLOCKS,
NewState: STATE_IDLE,
},
},
},
STATE_STREAMING: protocol.StateMapEntry{
Agency: protocol.AGENCY_SERVER,
Transitions: []protocol.StateTransition{
{
MsgType: MESSAGE_TYPE_BLOCK,
NewState: STATE_STREAMING,
},
{
MsgType: MESSAGE_TYPE_BATCH_DONE,
NewState: STATE_IDLE,
},
},
},
STATE_DONE: protocol.StateMapEntry{
Agency: protocol.AGENCY_NONE,
},
}

type BlockFetch struct {
proto *protocol.Protocol
callbackConfig *BlockFetchCallbackConfig
Expand All @@ -42,9 +87,17 @@ func New(m *muxer.Muxer, errorChan chan error, callbackConfig *BlockFetchCallbac
b := &BlockFetch{
callbackConfig: callbackConfig,
}
b.proto = protocol.New(PROTOCOL_NAME, PROTOCOL_ID, m, errorChan, b.messageHandler, NewMsgFromCbor)
// Set initial state
b.proto.SetState(STATE_IDLE)
protoConfig := protocol.ProtocolConfig{
Name: PROTOCOL_NAME,
ProtocolId: PROTOCOL_ID,
Muxer: m,
ErrorChan: errorChan,
MessageHandlerFunc: b.messageHandler,
MessageFromCborFunc: NewMsgFromCbor,
StateMap: stateMap,
InitialState: STATE_IDLE,
}
b.proto = protocol.New(protoConfig)
return b
}

Expand All @@ -66,57 +119,32 @@ func (b *BlockFetch) messageHandler(msg protocol.Message) error {
}

func (b *BlockFetch) RequestRange(start []interface{}, end []interface{}) error {
if err := b.proto.LockState([]protocol.State{STATE_IDLE}); err != nil {
return fmt.Errorf("%s: RequestRange: protocol not in expected state", PROTOCOL_NAME)
}
msg := newMsgRequestRange(start, end)
// Unlock and change state when we're done
defer b.proto.UnlockState(STATE_BUSY)
// Send request
return b.proto.SendMessage(msg, false)
}

func (b *BlockFetch) ClientDone() error {
if err := b.proto.LockState([]protocol.State{STATE_IDLE}); err != nil {
return fmt.Errorf("%s: ClientDone: protocol not in expected state", PROTOCOL_NAME)
}
msg := newMsgClientDone()
// Unlock and change state when we're done
defer b.proto.UnlockState(STATE_BUSY)
// Send request
return b.proto.SendMessage(msg, false)
}

func (b *BlockFetch) handleStartBatch() error {
if err := b.proto.LockState([]protocol.State{STATE_BUSY}); err != nil {
return fmt.Errorf("received block-fetch StartBatch message when protocol not in expected state")
}
if b.callbackConfig.StartBatchFunc == nil {
return fmt.Errorf("received block-fetch StartBatch message but no callback function is defined")
}
// Unlock and change state when we're done
defer b.proto.UnlockState(STATE_STREAMING)
// Call the user callback function
return b.callbackConfig.StartBatchFunc()
}

func (b *BlockFetch) handleNoBlocks() error {
if err := b.proto.LockState([]protocol.State{STATE_BUSY}); err != nil {
return fmt.Errorf("received block-fetch NoBlocks message when protocol not in expected state")
}
if b.callbackConfig.NoBlocksFunc == nil {
return fmt.Errorf("received block-fetch NoBlocks message but no callback function is defined")
}
// Unlock and change state when we're done
defer b.proto.UnlockState(STATE_IDLE)
// Call the user callback function
return b.callbackConfig.NoBlocksFunc()
}

func (b *BlockFetch) handleBlock(msgGeneric protocol.Message) error {
if err := b.proto.LockState([]protocol.State{STATE_STREAMING}); err != nil {
return fmt.Errorf("received block-fetch Block message when protocol not in expected state")
}
if b.callbackConfig.BlockFunc == nil {
return fmt.Errorf("received block-fetch Block message but no callback function is defined")
}
Expand All @@ -130,21 +158,14 @@ func (b *BlockFetch) handleBlock(msgGeneric protocol.Message) error {
if err != nil {
return err
}
// Unlock and change state when we're done
defer b.proto.UnlockState(STATE_STREAMING)
// Call the user callback function
return b.callbackConfig.BlockFunc(wrapBlock.Type, blk)
}

func (b *BlockFetch) handleBatchDone() error {
if err := b.proto.LockState([]protocol.State{STATE_STREAMING}); err != nil {
return fmt.Errorf("received block-fetch BatchDone message when protocol not in expected state")
}
if b.callbackConfig.BatchDoneFunc == nil {
return fmt.Errorf("received block-fetch BatchDone message but no callback function is defined")
}
// Unlock and change state when we're done
defer b.proto.UnlockState(STATE_IDLE)
// Call the user callback function
return b.callbackConfig.BatchDoneFunc()
}
125 changes: 77 additions & 48 deletions protocol/chainsync/chainsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,72 @@ var (
STATE_DONE = protocol.NewState(5, "Done")
)

var stateMap = protocol.StateMap{
STATE_IDLE: protocol.StateMapEntry{
Agency: protocol.AGENCY_CLIENT,
Transitions: []protocol.StateTransition{
{
MsgType: MESSAGE_TYPE_REQUEST_NEXT,
NewState: STATE_CAN_AWAIT,
},
{
MsgType: MESSAGE_TYPE_FIND_INTERSECT,
NewState: STATE_INTERSECT,
},
{
MsgType: MESSAGE_TYPE_DONE,
NewState: STATE_DONE,
},
},
},
STATE_CAN_AWAIT: protocol.StateMapEntry{
Agency: protocol.AGENCY_SERVER,
Transitions: []protocol.StateTransition{
{
MsgType: MESSAGE_TYPE_AWAIT_REPLY,
NewState: STATE_MUST_REPLY,
},
{
MsgType: MESSAGE_TYPE_ROLL_FORWARD,
NewState: STATE_IDLE,
},
{
MsgType: MESSAGE_TYPE_ROLL_BACKWARD,
NewState: STATE_IDLE,
},
},
},
STATE_INTERSECT: protocol.StateMapEntry{
Agency: protocol.AGENCY_SERVER,
Transitions: []protocol.StateTransition{
{
MsgType: MESSAGE_TYPE_INTERSECT_FOUND,
NewState: STATE_IDLE,
},
{
MsgType: MESSAGE_TYPE_INTERSECT_NOT_FOUND,
NewState: STATE_IDLE,
},
},
},
STATE_MUST_REPLY: protocol.StateMapEntry{
Agency: protocol.AGENCY_SERVER,
Transitions: []protocol.StateTransition{
{
MsgType: MESSAGE_TYPE_ROLL_FORWARD,
NewState: STATE_IDLE,
},
{
MsgType: MESSAGE_TYPE_ROLL_BACKWARD,
NewState: STATE_IDLE,
},
},
},
STATE_DONE: protocol.StateMapEntry{
Agency: protocol.AGENCY_NONE,
},
}

type ChainSync struct {
proto *protocol.Protocol
nodeToNode bool
Expand Down Expand Up @@ -56,9 +122,17 @@ func New(m *muxer.Muxer, errorChan chan error, nodeToNode bool, callbackConfig *
nodeToNode: nodeToNode,
callbackConfig: callbackConfig,
}
c.proto = protocol.New(PROTOCOL_NAME, protocolId, m, errorChan, c.messageHandler, c.NewMsgFromCbor)
// Set initial state
c.proto.SetState(STATE_IDLE)
protoConfig := protocol.ProtocolConfig{
Name: PROTOCOL_NAME,
ProtocolId: protocolId,
Muxer: m,
ErrorChan: errorChan,
MessageHandlerFunc: c.messageHandler,
MessageFromCborFunc: c.NewMsgFromCbor,
StateMap: stateMap,
InitialState: STATE_IDLE,
}
c.proto = protocol.New(protoConfig)
return c
}

Expand All @@ -84,45 +158,24 @@ func (c *ChainSync) messageHandler(msg protocol.Message) error {
}

func (c *ChainSync) RequestNext() error {
if err := c.proto.LockState([]protocol.State{STATE_IDLE}); err != nil {
return fmt.Errorf("%s: RequestNext: protocol not in expected state", PROTOCOL_NAME)
}
// Create our request
msg := newMsgRequestNext()
// Unlock and change state when we're done
defer c.proto.UnlockState(STATE_CAN_AWAIT)
// Send request
return c.proto.SendMessage(msg, false)
}

func (c *ChainSync) FindIntersect(points []interface{}) error {
if err := c.proto.LockState([]protocol.State{STATE_IDLE}); err != nil {
return fmt.Errorf("%s: FindIntersect: protocol not in expected state", PROTOCOL_NAME)
}
msg := newMsgFindIntersect(points)
// Unlock and change state when we're done
defer c.proto.UnlockState(STATE_INTERSECT)
// Send request
return c.proto.SendMessage(msg, false)
}

func (c *ChainSync) handleAwaitReply() error {
if err := c.proto.LockState([]protocol.State{STATE_CAN_AWAIT}); err != nil {
return fmt.Errorf("received chain-sync AwaitReply message when protocol not in expected state")
}
if c.callbackConfig.AwaitReplyFunc == nil {
return fmt.Errorf("received chain-sync AwaitReply message but no callback function is defined")
}
// Unlock and change state when we're done
defer c.proto.UnlockState(STATE_MUST_REPLY)
// Call the user callback function
return c.callbackConfig.AwaitReplyFunc()
}

func (c *ChainSync) handleRollForward(msgGeneric protocol.Message) error {
if err := c.proto.LockState([]protocol.State{STATE_CAN_AWAIT, STATE_MUST_REPLY}); err != nil {
return fmt.Errorf("received chain-sync RollForward message when protocol not in expected state")
}
if c.callbackConfig.RollForwardFunc == nil {
return fmt.Errorf("received chain-sync RollForward message but no callback function is defined")
}
Expand Down Expand Up @@ -163,8 +216,6 @@ func (c *ChainSync) handleRollForward(msgGeneric protocol.Message) error {
return err
}
}
// Unlock and change state when we're done
defer c.proto.UnlockState(STATE_IDLE)
// Call the user callback function
return c.callbackConfig.RollForwardFunc(blockType, blockHeader)
} else {
Expand All @@ -178,64 +229,42 @@ func (c *ChainSync) handleRollForward(msgGeneric protocol.Message) error {
if err != nil {
return err
}
// Unlock and change state when we're done
defer c.proto.UnlockState(STATE_IDLE)
// Call the user callback function
return c.callbackConfig.RollForwardFunc(wrapBlock.Type, blk)
}
}

func (c *ChainSync) handleRollBackward(msgGeneric protocol.Message) error {
if err := c.proto.LockState([]protocol.State{STATE_CAN_AWAIT, STATE_MUST_REPLY}); err != nil {
return fmt.Errorf("received chain-sync RollBackward message when protocol not in expected state")
}
if c.callbackConfig.RollBackwardFunc == nil {
return fmt.Errorf("received chain-sync RollBackward message but no callback function is defined")
}
msg := msgGeneric.(*msgRollBackward)
// Unlock and change state when we're done
defer c.proto.UnlockState(STATE_IDLE)
// Call the user callback function
return c.callbackConfig.RollBackwardFunc(msg.Point, msg.Tip)
}

func (c *ChainSync) handleIntersectFound(msgGeneric protocol.Message) error {
if err := c.proto.LockState([]protocol.State{STATE_INTERSECT}); err != nil {
return fmt.Errorf("received chain-sync IntersectFound message when protocol not in expected state")
}
if c.callbackConfig.IntersectFoundFunc == nil {
return fmt.Errorf("received chain-sync IntersectFound message but no callback function is defined")
}
msg := msgGeneric.(*msgIntersectFound)
// Unlock and change state when we're done
defer c.proto.UnlockState(STATE_IDLE)
// Call the user callback function
return c.callbackConfig.IntersectFoundFunc(msg.Point, msg.Tip)
}

func (c *ChainSync) handleIntersectNotFound(msgGeneric protocol.Message) error {
if err := c.proto.LockState([]protocol.State{STATE_INTERSECT}); err != nil {
return fmt.Errorf("received chain-sync IntersectNotFound message when protocol not in expected state")
}
if c.callbackConfig.IntersectNotFoundFunc == nil {
return fmt.Errorf("received chain-sync IntersectNotFound message but no callback function is defined")
}
msg := msgGeneric.(*msgIntersectNotFound)
// Unlock and change state when we're done
defer c.proto.UnlockState(STATE_IDLE)
// Call the user callback function
return c.callbackConfig.IntersectNotFoundFunc(msg.Tip)
}

func (c *ChainSync) handleDone() error {
if err := c.proto.LockState([]protocol.State{STATE_IDLE}); err != nil {
return fmt.Errorf("received chain-sync Done message when protocol not in expected state")
}
if c.callbackConfig.DoneFunc == nil {
return fmt.Errorf("received chain-sync Done message but no callback function is defined")
}
// Unlock and change state when we're done
defer c.proto.UnlockState(STATE_DONE)
// Call the user callback function
return c.callbackConfig.DoneFunc()
}
Loading

0 comments on commit d6ee6ff

Please sign in to comment.