From f6caee8109ada8a6b52afdfffe96416b2933deb8 Mon Sep 17 00:00:00 2001 From: Aurora Gaffney Date: Tue, 1 Aug 2023 16:27:31 -0500 Subject: [PATCH] feat: chainsync input plugin status update callback Fixes #51 --- input/chainsync/chainsync.go | 44 +++++++++++++++++++++++++++--------- input/chainsync/options.go | 8 +++++++ 2 files changed, 41 insertions(+), 11 deletions(-) diff --git a/input/chainsync/chainsync.go b/input/chainsync/chainsync.go index 325943e..e771877 100644 --- a/input/chainsync/chainsync.go +++ b/input/chainsync/chainsync.go @@ -28,25 +28,36 @@ import ( ) type ChainSync struct { - oConn *ouroboros.Connection - network string - networkMagic uint32 - address string - socketPath string - ntcTcp bool - intersectTip bool - intersectPoints []ocommon.Point - includeCbor bool - errorChan chan error - eventChan chan event.Event + oConn *ouroboros.Connection + network string + networkMagic uint32 + address string + socketPath string + ntcTcp bool + intersectTip bool + intersectPoints []ocommon.Point + includeCbor bool + statusUpdateFunc StatusUpdateFunc + status *ChainSyncStatus + errorChan chan error + eventChan chan event.Event } +type ChainSyncStatus struct { + SlotNumber uint64 + BlockNumber uint64 + BlockHash string +} + +type StatusUpdateFunc func(ChainSyncStatus) + // New returns a new ChainSync object with the specified options applied func New(options ...ChainSyncOptionFunc) *ChainSync { c := &ChainSync{ errorChan: make(chan error), eventChan: make(chan event.Event, 10), intersectPoints: []ocommon.Point{}, + status: &ChainSyncStatus{}, } for _, option := range options { option(c) @@ -176,6 +187,7 @@ func (c *ChainSync) handleRollForward(blockType uint, blockData interface{}, tip case ledger.Block: evt := event.New("chainsync.block", time.Now(), NewBlockEvent(v, c.includeCbor)) c.eventChan <- evt + c.updateStatus(v.SlotNumber(), v.BlockNumber(), v.Hash()) case ledger.BlockHeader: blockSlot := v.SlotNumber() blockHash, _ := hex.DecodeString(v.Hash()) @@ -189,6 +201,16 @@ func (c *ChainSync) handleRollForward(blockType uint, blockData interface{}, tip txEvt := event.New("chainsync.transaction", time.Now(), NewTransactionEvent(block, transaction, c.includeCbor)) c.eventChan <- txEvt } + c.updateStatus(v.SlotNumber(), v.BlockNumber(), v.Hash()) } return nil } + +func (c *ChainSync) updateStatus(slotNumber uint64, blockNumber uint64, blockHash string) { + c.status.SlotNumber = slotNumber + c.status.BlockNumber = blockNumber + c.status.BlockHash = blockHash + if c.statusUpdateFunc != nil { + c.statusUpdateFunc(*(c.status)) + } +} diff --git a/input/chainsync/options.go b/input/chainsync/options.go index 5a0ad3f..c8d5ffb 100644 --- a/input/chainsync/options.go +++ b/input/chainsync/options.go @@ -75,3 +75,11 @@ func WithIncludeCbor(includeCbor bool) ChainSyncOptionFunc { c.includeCbor = includeCbor } } + +// WithStatusUpdateFunc specifies a callback function for status updates. This is useful for tracking the chain-sync status +// to be able to resume a sync at a later time, especially when any filtering could prevent you from getting all block update events +func WithStatusUpdateFunc(statusUpdateFunc StatusUpdateFunc) ChainSyncOptionFunc { + return func(c *ChainSync) { + c.statusUpdateFunc = statusUpdateFunc + } +}