Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(logs): use slog attributes for protocol clients #763

Merged
merged 1 commit into from
Oct 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 53 additions & 8 deletions protocol/blockfetch/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,11 @@ func NewClient(protoOptions protocol.ProtocolOptions, cfg *Config) *Client {
func (c *Client) Start() {
c.onceStart.Do(func() {
c.Protocol.Logger().
Debug(fmt.Sprintf("%s: starting client protocol for connection %+v", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr))
Debug("starting client protocol",
"component", "network",
"protocol", ProtocolName,
"connection_id", c.callbackContext.ConnectionId.String(),
)
c.Protocol.Start()
// Start goroutine to cleanup resources on protocol shutdown
go func() {
Expand All @@ -97,7 +101,11 @@ func (c *Client) Stop() error {
var err error
c.onceStop.Do(func() {
c.Protocol.Logger().
Debug(fmt.Sprintf("%s: stopping client protocol for connection %+v", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr))
Debug("stopping client protocol",
"component", "network",
"protocol", ProtocolName,
"connection_id", c.callbackContext.ConnectionId.String(),
)
msg := NewMsgClientDone()
err = c.SendMessage(msg)
})
Expand All @@ -107,7 +115,18 @@ func (c *Client) Stop() error {
// GetBlockRange starts an async process to fetch all blocks in the specified range (inclusive)
func (c *Client) GetBlockRange(start common.Point, end common.Point) error {
c.Protocol.Logger().
Debug(fmt.Sprintf("%s: client called GetBlockRange(start: {Slot: %d, Hash: %x}, end: {Slot: %d, Hash: %x})", ProtocolName, start.Slot, start.Hash, end.Slot, end.Hash))
Debug(
fmt.Sprintf("calling GetBlockRange(start: {Slot: %d, Hash: %x}, end: {Slot: %d, Hash: %x})",
start.Slot,
start.Hash,
end.Slot,
end.Hash,
),
"component", "network",
"protocol", ProtocolName,
"role", "client",
"connection_id", c.callbackContext.ConnectionId.String(),
)
c.busyMutex.Lock()
c.blockUseCallback = true
msg := NewMsgRequestRange(start, end)
Expand All @@ -129,7 +148,13 @@ func (c *Client) GetBlockRange(start common.Point, end common.Point) error {
// GetBlock requests and returns a single block specified by the provided point
func (c *Client) GetBlock(point common.Point) (ledger.Block, error) {
c.Protocol.Logger().
Debug(fmt.Sprintf("%s: client called GetBlock(point: {Slot: %d, Hash: %x})", ProtocolName, point.Slot, point.Hash))
Debug(
fmt.Sprintf("calling GetBlock(point: {Slot: %d, Hash: %x})", point.Slot, point.Hash),
"component", "network",
"protocol", ProtocolName,
"role", "client",
"connection_id", c.callbackContext.ConnectionId.String(),
)
c.busyMutex.Lock()
c.blockUseCallback = false
msg := NewMsgRequestRange(point, point)
Expand Down Expand Up @@ -175,22 +200,37 @@ func (c *Client) messageHandler(msg protocol.Message) error {

func (c *Client) handleStartBatch() error {
c.Protocol.Logger().
Debug(fmt.Sprintf("%s: client start batch for %+v", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr))
Debug("starting batch",
"component", "network",
"protocol", ProtocolName,
"role", "client",
"connection_id", c.callbackContext.ConnectionId.String(),
)
c.startBatchResultChan <- nil
return nil
}

func (c *Client) handleNoBlocks() error {
c.Protocol.Logger().
Debug(fmt.Sprintf("%s: client no blocks found for %+v", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr))
Debug("no blocks returned",
"component", "network",
"protocol", ProtocolName,
"role", "client",
"connection_id", c.callbackContext.ConnectionId.String(),
)
err := fmt.Errorf("block(s) not found")
c.startBatchResultChan <- err
return nil
}

func (c *Client) handleBlock(msgGeneric protocol.Message) error {
c.Protocol.Logger().
Debug(fmt.Sprintf("%s: client block found for %+v", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr))
Debug("block returned",
"component", "network",
"protocol", ProtocolName,
"role", "client",
"connection_id", c.callbackContext.ConnectionId.String(),
)
msg := msgGeneric.(*MsgBlock)
// Decode only enough to get the block type value
var wrappedBlock WrappedBlock
Expand All @@ -217,7 +257,12 @@ func (c *Client) handleBlock(msgGeneric protocol.Message) error {

func (c *Client) handleBatchDone() error {
c.Protocol.Logger().
Debug(fmt.Sprintf("%s: client batch done for %+v", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr))
Debug("batch done",
"component", "network",
"protocol", ProtocolName,
"role", "client",
"connection_id", c.callbackContext.ConnectionId.String(),
)
c.busyMutex.Unlock()
return nil
}
148 changes: 132 additions & 16 deletions protocol/chainsync/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,11 @@ func NewClient(
func (c *Client) Start() {
c.onceStart.Do(func() {
c.Protocol.Logger().
Debug(fmt.Sprintf("%s: starting client protocol for connection %+v", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr))
Debug("starting client protocol",
"component", "network",
"protocol", ProtocolName,
"connection_id", c.callbackContext.ConnectionId.String(),
)
c.Protocol.Start()
// Start goroutine to cleanup resources on protocol shutdown
go func() {
Expand All @@ -132,7 +136,11 @@ func (c *Client) Stop() error {
var err error
c.onceStop.Do(func() {
c.Protocol.Logger().
Debug(fmt.Sprintf("%s: stopping client protocol for connection %+v", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr))
Debug("stopping client protocol",
"component", "network",
"protocol", ProtocolName,
"connection_id", c.callbackContext.ConnectionId.String(),
)
c.busyMutex.Lock()
defer c.busyMutex.Unlock()
msg := NewMsgDone()
Expand All @@ -146,7 +154,12 @@ func (c *Client) Stop() error {
// GetCurrentTip returns the current chain tip
func (c *Client) GetCurrentTip() (*Tip, error) {
c.Protocol.Logger().
Debug(fmt.Sprintf("%s: client %+v called GetCurrentTip()", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr))
Debug("calling GetCurrentTip()",
"component", "network",
"protocol", ProtocolName,
"role", "client",
"connection_id", c.callbackContext.ConnectionId.String(),
)
done := atomic.Bool{}
requestResultChan := make(chan Tip, 1)
requestErrorChan := make(chan error, 1)
Expand Down Expand Up @@ -186,13 +199,25 @@ func (c *Client) GetCurrentTip() (*Tip, error) {
waitingForCurrentTipChan = nil
case tip := <-waitingResultChan:
c.Protocol.Logger().
Debug(fmt.Sprintf("%s: returning tip results {Slot: %d, Hash: %x, BlockNumber: %d} to %+v", ProtocolName, tip.Point.Slot, tip.Point.Hash, tip.BlockNumber, c.callbackContext.ConnectionId.RemoteAddr))
Debug(
fmt.Sprintf("received tip results {Slot: %d, Hash: %x, BlockNumber: %d}", tip.Point.Slot, tip.Point.Hash, tip.BlockNumber),
"component", "network",
"protocol", ProtocolName,
"role", "client",
"connection_id", c.callbackContext.ConnectionId.String(),
)
// The result from the other request is ready.
done.Store(true)
return &tip, nil
case tip := <-requestResultChan:
c.Protocol.Logger().
Debug(fmt.Sprintf("%s: returning tip results {Slot: %d, Hash: %x, BlockNumber: %d} to %+v", ProtocolName, tip.Point.Slot, tip.Point.Hash, tip.BlockNumber, c.callbackContext.ConnectionId.RemoteAddr))
Debug(
fmt.Sprintf("received tip results {Slot: %d, Hash: %x, BlockNumber: %d}", tip.Point.Slot, tip.Point.Hash, tip.BlockNumber),
"component", "network",
"protocol", ProtocolName,
"role", "client",
"connection_id", c.callbackContext.ConnectionId.String(),
)
// If waitingForCurrentTipChan is full, the for loop that empties it might finish the
// loop before the select statement that writes to it is triggered. For that reason we
// require requestResultChan here.
Expand All @@ -215,16 +240,49 @@ func (c *Client) GetAvailableBlockRange(
if len(intersectPoints) == 0 {
intersectPoints = []common.Point{common.NewPointOrigin()}
}

// Debug logging
switch len(intersectPoints) {
case 1:
c.Protocol.Logger().
Debug(fmt.Sprintf("%s: client %+v called GetAvailableBlockRange(intersectPoints: []{Slot: %d, Hash: %x})", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr, intersectPoints[0].Slot, intersectPoints[0].Hash))
Debug(
fmt.Sprintf(
"calling GetAvailableBlockRange(intersectPoints: []{Slot: %d, Hash: %x})",
intersectPoints[0].Slot,
intersectPoints[0].Hash,
),
"component", "network",
"protocol", ProtocolName,
"role", "client",
"connection_id", c.callbackContext.ConnectionId.String(),
)
case 2:
c.Protocol.Logger().
Debug(fmt.Sprintf("%s: client %+v called GetAvailableBlockRange(intersectPoints: []{Slot: %d, Hash: %x},{Slot: %d, Hash: %x})", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr, intersectPoints[0].Slot, intersectPoints[0].Hash, intersectPoints[1].Slot, intersectPoints[1].Hash))
Debug(
fmt.Sprintf(
"calling GetAvailableBlockRange(intersectPoints: []{Slot: %d, Hash: %x},{Slot: %d, Hash: %x})",
intersectPoints[0].Slot,
intersectPoints[0].Hash,
intersectPoints[1].Slot,
intersectPoints[1].Hash,
),
"component", "network",
"protocol", ProtocolName,
"role", "client",
"connection_id", c.callbackContext.ConnectionId.String(),
)
default:
c.Protocol.Logger().
Debug(fmt.Sprintf("%s: client %+v called GetAvailableBlockRange(intersectPoints: %+v)", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr, intersectPoints))
Debug(
fmt.Sprintf(
"calling GetAvailableBlockRange(intersectPoints: %+v)",
intersectPoints,
),
"component", "network",
"protocol", ProtocolName,
"role", "client",
"connection_id", c.callbackContext.ConnectionId.String(),
)
}

// Find our chain intersection
Expand Down Expand Up @@ -300,16 +358,49 @@ func (c *Client) Sync(intersectPoints []common.Point) error {
if len(intersectPoints) == 0 {
intersectPoints = []common.Point{common.NewPointOrigin()}
}

// Debug logging
switch len(intersectPoints) {
case 1:
c.Protocol.Logger().
Debug(fmt.Sprintf("%s: client %+v called Sync(intersectPoints: []{Slot: %d, Hash: %x})", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr, intersectPoints[0].Slot, intersectPoints[0].Hash))
Debug(
fmt.Sprintf(
"calling Sync(intersectPoints: []{Slot: %d, Hash: %x})",
intersectPoints[0].Slot,
intersectPoints[0].Hash,
),
"component", "network",
"protocol", ProtocolName,
"role", "client",
"connection_id", c.callbackContext.ConnectionId.String(),
)
case 2:
c.Protocol.Logger().
Debug(fmt.Sprintf("%s: client %+v called Sync(intersectPoints: []{Slot: %d, Hash: %x},{Slot: %d, Hash: %x})", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr, intersectPoints[0].Slot, intersectPoints[0].Hash, intersectPoints[1].Slot, intersectPoints[1].Hash))
Debug(
fmt.Sprintf(
"calling Sync(intersectPoints: []{Slot: %d, Hash: %x},{Slot: %d, Hash: %x})",
intersectPoints[0].Slot,
intersectPoints[0].Hash,
intersectPoints[1].Slot,
intersectPoints[1].Hash,
),
"component", "network",
"protocol", ProtocolName,
"role", "client",
"connection_id", c.callbackContext.ConnectionId.String(),
)
default:
c.Protocol.Logger().
Debug(fmt.Sprintf("%s: client %+v called Sync(intersectPoints: %+v)", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr, intersectPoints))
Debug(
fmt.Sprintf(
"calling Sync(intersectPoints: %+v)",
intersectPoints,
),
"component", "network",
"protocol", ProtocolName,
"role", "client",
"connection_id", c.callbackContext.ConnectionId.String(),
)
}

intersectResultChan, cancel := c.wantIntersectFound()
Expand Down Expand Up @@ -478,13 +569,23 @@ func (c *Client) messageHandler(msg protocol.Message) error {

func (c *Client) handleAwaitReply() error {
c.Protocol.Logger().
Debug(fmt.Sprintf("%s: client await reply for %+v", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr))
Debug("waiting for next reply",
"component", "network",
"protocol", ProtocolName,
"role", "client",
"connection_id", c.callbackContext.ConnectionId.String(),
)
return nil
}

func (c *Client) handleRollForward(msgGeneric protocol.Message) error {
c.Protocol.Logger().
Debug(fmt.Sprintf("%s: client roll forward for %+v", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr))
Debug("roll forward",
"component", "network",
"protocol", ProtocolName,
"role", "client",
"connection_id", c.callbackContext.ConnectionId.String(),
)
firstBlockChan := func() chan<- clientPointResult {
select {
case ch := <-c.wantFirstBlockChan:
Expand Down Expand Up @@ -594,7 +695,12 @@ func (c *Client) handleRollForward(msgGeneric protocol.Message) error {

func (c *Client) handleRollBackward(msg protocol.Message) error {
c.Protocol.Logger().
Debug(fmt.Sprintf("%s: client roll backward for %+v", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr))
Debug("roll backward",
"component", "network",
"protocol", ProtocolName,
"role", "client",
"connection_id", c.callbackContext.ConnectionId.String(),
)
msgRollBackward := msg.(*MsgRollBackward)
c.sendCurrentTip(msgRollBackward.Tip)
if len(c.wantFirstBlockChan) == 0 {
Expand All @@ -621,7 +727,12 @@ func (c *Client) handleRollBackward(msg protocol.Message) error {

func (c *Client) handleIntersectFound(msg protocol.Message) error {
c.Protocol.Logger().
Debug(fmt.Sprintf("%s: client intersect found for %+v", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr))
Debug("chain intersect found",
"component", "network",
"protocol", ProtocolName,
"role", "client",
"connection_id", c.callbackContext.ConnectionId.String(),
)
msgIntersectFound := msg.(*MsgIntersectFound)
c.sendCurrentTip(msgIntersectFound.Tip)

Expand All @@ -635,7 +746,12 @@ func (c *Client) handleIntersectFound(msg protocol.Message) error {

func (c *Client) handleIntersectNotFound(msgGeneric protocol.Message) error {
c.Protocol.Logger().
Debug(fmt.Sprintf("%s: client intersect not found for %+v", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr))
Debug("chain intersect not found",
"component", "network",
"protocol", ProtocolName,
"role", "client",
"connection_id", c.callbackContext.ConnectionId.String(),
)
msgIntersectNotFound := msgGeneric.(*MsgIntersectNotFound)
c.sendCurrentTip(msgIntersectNotFound.Tip)

Expand Down
20 changes: 17 additions & 3 deletions protocol/handshake/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,11 @@ func NewClient(protoOptions protocol.ProtocolOptions, cfg *Config) *Client {
func (c *Client) Start() {
c.onceStart.Do(func() {
c.Protocol.Logger().
Debug(fmt.Sprintf("%s: starting client protocol for connection %+v", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr))
Debug("starting client protocol",
"component", "network",
"protocol", ProtocolName,
"connection_id", c.callbackContext.ConnectionId.String(),
)
c.Protocol.Start()
// Send our ProposeVersions message
msg := NewMsgProposeVersions(c.config.ProtocolVersionMap)
Expand All @@ -97,7 +101,12 @@ func (c *Client) messageHandler(msg protocol.Message) error {

func (c *Client) handleAcceptVersion(msg protocol.Message) error {
c.Protocol.Logger().
Debug(fmt.Sprintf("%s: client accept version for %+v", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr))
Debug("accepted version negotiation",
"component", "network",
"protocol", ProtocolName,
"role", "client",
"connection_id", c.callbackContext.ConnectionId.String(),
)
if c.config.FinishedFunc == nil {
return fmt.Errorf(
"received handshake AcceptVersion message but no callback function is defined",
Expand All @@ -120,7 +129,12 @@ func (c *Client) handleAcceptVersion(msg protocol.Message) error {

func (c *Client) handleRefuse(msgGeneric protocol.Message) error {
c.Protocol.Logger().
Debug(fmt.Sprintf("%s: client refuse for %+v", ProtocolName, c.callbackContext.ConnectionId.RemoteAddr))
Debug("refused handshake",
"component", "network",
"protocol", ProtocolName,
"role", "client",
"connection_id", c.callbackContext.ConnectionId.String(),
)
msg := msgGeneric.(*MsgRefuse)
var err error
switch msg.Reason[0].(uint64) {
Expand Down
Loading