From afbc1cc50c224db410c47829f88c531802948d09 Mon Sep 17 00:00:00 2001 From: Chris Gianelloni Date: Wed, 18 Oct 2023 09:28:18 -0400 Subject: [PATCH] feat: support context on events and update block Signed-off-by: Chris Gianelloni --- event/event.go | 4 +++- go.mod | 4 ++-- go.sum | 4 ++-- input/chainsync/block.go | 42 ++++++++++++++++++++++++++++++------ input/chainsync/chainsync.go | 12 +++++------ output/notify/notify.go | 9 ++++++-- output/push/push.go | 9 ++++++-- output/webhook/webhook.go | 26 ++++++++++++++++++---- 8 files changed, 85 insertions(+), 25 deletions(-) diff --git a/event/event.go b/event/event.go index 788d1ec..bf65868 100644 --- a/event/event.go +++ b/event/event.go @@ -21,13 +21,15 @@ import ( type Event struct { Type string `json:"type"` Timestamp time.Time `json:"timestamp"` + Context interface{} `json:"context,omitempty"` Payload interface{} `json:"payload"` } -func New(eventType string, timestamp time.Time, payload interface{}) Event { +func New(eventType string, timestamp time.Time, context, payload interface{}) Event { return Event{ Type: eventType, Timestamp: timestamp, + Context: context, Payload: payload, } } diff --git a/go.mod b/go.mod index 7afe6b0..ee22b66 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,8 @@ module github.com/blinklabs-io/snek go 1.20 require ( - github.com/blinklabs-io/gouroboros v0.56.0 + github.com/blinklabs-io/gouroboros v0.57.0 + github.com/fxamacker/cbor/v2 v2.5.0 github.com/gen2brain/beeep v0.0.0-20230602101333-f384c29b62dd github.com/gin-gonic/gin v1.9.1 github.com/kelseyhightower/envconfig v1.4.0 @@ -27,7 +28,6 @@ require ( github.com/chenzhuoyu/base64x v0.0.0-20230717121745-296ad89f973d // indirect github.com/chenzhuoyu/iasm v0.9.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect - github.com/fxamacker/cbor/v2 v2.5.0 // indirect github.com/gabriel-vasile/mimetype v1.4.3 // indirect github.com/gin-contrib/sse v0.1.0 // indirect github.com/go-openapi/jsonpointer v0.20.0 // indirect diff --git a/go.sum b/go.sum index 3f50a29..e994527 100644 --- a/go.sum +++ b/go.sum @@ -4,8 +4,8 @@ cloud.google.com/go/compute/metadata v0.2.3 h1:mg4jlk7mCAj6xXp9UJ4fjI9VUI5rubuGB cloud.google.com/go/compute/metadata v0.2.3/go.mod h1:VAV5nSsACxMJvgaAuX6Pk2AawlZn8kiOGuCv6gTkwuA= github.com/KyleBanks/depth v1.2.1 h1:5h8fQADFrWtarTdtDudMmGsC7GPbOAu6RVB3ffsVFHc= github.com/KyleBanks/depth v1.2.1/go.mod h1:jzSb9d0L43HxTQfT+oSA1EEp2q+ne2uh6XgeJcm8brE= -github.com/blinklabs-io/gouroboros v0.56.0 h1:AdWB6SkHD5fn4wKU+/2BFsT9xae3W+nZ189LxZw/mF8= -github.com/blinklabs-io/gouroboros v0.56.0/go.mod h1:D5YJka8EyVmiXNMbRvjH23H9lNMLA4+qSlNNC/j7R0k= +github.com/blinklabs-io/gouroboros v0.57.0 h1:k5Y706vvYAGM3bCtEhh6WRHGrvS3S6n1MT9vNLFAe/E= +github.com/blinklabs-io/gouroboros v0.57.0/go.mod h1:D5YJka8EyVmiXNMbRvjH23H9lNMLA4+qSlNNC/j7R0k= github.com/bytedance/sonic v1.5.0/go.mod h1:ED5hyg4y6t3/9Ku1R6dU/4KyJ48DZ4jPhfY1O2AihPM= github.com/bytedance/sonic v1.10.0-rc/go.mod h1:ElCzW+ufi8qKqNW0FY314xriJhyJhuoJ3gFZdAHF7NM= github.com/bytedance/sonic v1.10.2 h1:GQebETVBxYB7JGWJtLBi07OVzWwt+8dWA00gEVW2ZFE= diff --git a/input/chainsync/block.go b/input/chainsync/block.go index 6c20dc1..27c486e 100644 --- a/input/chainsync/block.go +++ b/input/chainsync/block.go @@ -15,22 +15,52 @@ package chainsync import ( + "fmt" + "github.com/blinklabs-io/gouroboros/ledger" + "github.com/fxamacker/cbor/v2" ) -type BlockEvent struct { +type BlockContext struct { BlockNumber uint64 `json:"blockNumber"` - BlockHash string `json:"blockHash"` SlotNumber uint64 `json:"slotNumber"` - BlockCbor byteSliceJsonHex `json:"blockCbor,omitempty"` } -func NewBlockEvent(block ledger.Block, includeCbor bool) BlockEvent { - evt := BlockEvent{ +type BlockEvent struct { + BlockBodySize uint64 `json:"blockBodySize"` + IssuerVkey string `json:"issuerVkey"` + BlockHash string `json:"blockHash"` + BlockCbor byteSliceJsonHex `json:"blockCbor,omitempty"` +} + +func NewBlockContext(block ledger.Block) BlockContext { + ctx := BlockContext{ + BlockNumber: block.BlockNumber(), + SlotNumber: block.SlotNumber(), + } + return ctx +} + +func NewBlockHeaderContext(block ledger.BlockHeader) BlockContext { + ctx := BlockContext{ BlockNumber: block.BlockNumber(), - BlockHash: block.Hash(), SlotNumber: block.SlotNumber(), } + return ctx +} + +func NewBlockEvent(block ledger.Block, includeCbor bool) BlockEvent { + keyCbor, err := cbor.Marshal(block.IssuerVkey()) + if err != nil { + panic(err) + } + // iss := ledger.NewBlake2b256(block.IssuerVkey()) + evt := BlockEvent{ + BlockBodySize: block.BlockBodySize(), + BlockHash: block.Hash(), + IssuerVkey: fmt.Sprintf("%x", keyCbor), + // IssuerVkey: iss.String(), + } if includeCbor { evt.BlockCbor = block.Cbor() } diff --git a/input/chainsync/chainsync.go b/input/chainsync/chainsync.go index efdf050..9d7e1a4 100644 --- a/input/chainsync/chainsync.go +++ b/input/chainsync/chainsync.go @@ -200,7 +200,7 @@ func (c *ChainSync) setupConnection() error { } func (c *ChainSync) handleRollBackward(point ocommon.Point, tip ochainsync.Tip) error { - evt := event.New("chainsync.rollback", time.Now(), NewRollbackEvent(point)) + evt := event.New("chainsync.rollback", time.Now(), nil, NewRollbackEvent(point)) c.eventChan <- evt return nil } @@ -208,7 +208,7 @@ func (c *ChainSync) handleRollBackward(point ocommon.Point, tip ochainsync.Tip) func (c *ChainSync) handleRollForward(blockType uint, blockData interface{}, tip ochainsync.Tip) error { switch v := blockData.(type) { case ledger.Block: - evt := event.New("chainsync.block", time.Now(), NewBlockEvent(v, c.includeCbor)) + evt := event.New("chainsync.block", time.Now(), NewBlockContext(v), NewBlockEvent(v, c.includeCbor)) c.eventChan <- evt c.updateStatus(v.SlotNumber(), v.BlockNumber(), v.Hash(), tip.Point.Slot, hex.EncodeToString(tip.Point.Hash)) case ledger.BlockHeader: @@ -218,10 +218,10 @@ func (c *ChainSync) handleRollForward(blockType uint, blockData interface{}, tip if err != nil { return err } - blockEvt := event.New("chainsync.block", time.Now(), NewBlockEvent(block, c.includeCbor)) + blockEvt := event.New("chainsync.block", time.Now(), NewBlockHeaderContext(v), NewBlockEvent(block, c.includeCbor)) c.eventChan <- blockEvt for _, transaction := range block.Transactions() { - txEvt := event.New("chainsync.transaction", time.Now(), NewTransactionEvent(block, transaction, c.includeCbor)) + txEvt := event.New("chainsync.transaction", time.Now(), nil, NewTransactionEvent(block, transaction, c.includeCbor)) c.eventChan <- txEvt } c.updateStatus(v.SlotNumber(), v.BlockNumber(), v.Hash(), tip.Point.Slot, hex.EncodeToString(tip.Point.Hash)) @@ -230,10 +230,10 @@ func (c *ChainSync) handleRollForward(blockType uint, blockData interface{}, tip } func (c *ChainSync) handleBlockFetchBlock(block ledger.Block) error { - blockEvt := event.New("chainsync.block", time.Now(), NewBlockEvent(block, c.includeCbor)) + blockEvt := event.New("chainsync.block", time.Now(), nil, NewBlockEvent(block, c.includeCbor)) c.eventChan <- blockEvt for _, transaction := range block.Transactions() { - txEvt := event.New("chainsync.transaction", time.Now(), NewTransactionEvent(block, transaction, c.includeCbor)) + txEvt := event.New("chainsync.transaction", time.Now(), nil, NewTransactionEvent(block, transaction, c.includeCbor)) c.eventChan <- txEvt } c.updateStatus(block.SlotNumber(), block.BlockNumber(), block.Hash(), c.bulkRangeEnd.Slot, hex.EncodeToString(c.bulkRangeEnd.Hash)) diff --git a/output/notify/notify.go b/output/notify/notify.go index d74dbda..2f7e793 100644 --- a/output/notify/notify.go +++ b/output/notify/notify.go @@ -55,13 +55,18 @@ func (n *NotifyOutput) Start() error { if payload == nil { panic(fmt.Errorf("ERROR: %v", payload)) } + context := evt.Context + if context == nil { + panic(fmt.Errorf("ERROR: %v", context)) + } be := payload.(chainsync.BlockEvent) + bc := context.(chainsync.BlockContext) err := beeep.Notify( n.title, fmt.Sprintf("New Block!\nBlockNumber: %d, SlotNumber: %d\nHash: %s", - be.BlockNumber, - be.SlotNumber, + bc.BlockNumber, + bc.SlotNumber, be.BlockHash, ), "assets/snek-icon.png", diff --git a/output/push/push.go b/output/push/push.go index 94a5104..9719b49 100644 --- a/output/push/push.go +++ b/output/push/push.go @@ -83,12 +83,17 @@ func (p *PushOutput) Start() error { if payload == nil { panic(fmt.Errorf("ERROR: %v", payload)) } + context := evt.Context + if context == nil { + panic(fmt.Errorf("ERROR: %v", context)) + } be := payload.(chainsync.BlockEvent) + bc := context.(chainsync.BlockContext) fmt.Println("Snek") fmt.Printf("New Block!\nBlockNumber: %d, SlotNumber: %d\nHash: %s", - be.BlockNumber, - be.SlotNumber, + bc.BlockNumber, + bc.SlotNumber, be.BlockHash, ) diff --git a/output/webhook/webhook.go b/output/webhook/webhook.go index df8c5c8..9d8ab90 100644 --- a/output/webhook/webhook.go +++ b/output/webhook/webhook.go @@ -25,6 +25,8 @@ import ( "net/http" "time" + // cbor "github.com/fxamacker/cbor/v2" + "github.com/blinklabs-io/snek/event" "github.com/blinklabs-io/snek/input/chainsync" "github.com/blinklabs-io/snek/internal/logging" @@ -68,10 +70,16 @@ func (w *WebhookOutput) Start() error { if payload == nil { panic(fmt.Errorf("ERROR: %v", payload)) } + context := evt.Context switch evt.Type { case "chainsync.block": + if context == nil { + panic(fmt.Errorf("ERROR: %v", context)) + } be := payload.(chainsync.BlockEvent) + bc := context.(chainsync.BlockContext) evt.Payload = be + evt.Context = bc case "chainsync.rollback": re := payload.(chainsync.RollbackEvent) evt.Payload = re @@ -97,7 +105,7 @@ func basicAuth(username, password string) string { return "Basic " + base64.StdEncoding.EncodeToString([]byte(auth)) } -func formatPayload(e *event.Event, format string) []byte { +func formatWebhook(e *event.Event, format string) []byte { var data []byte var err error switch format { @@ -109,19 +117,29 @@ func formatPayload(e *event.Event, format string) []byte { switch e.Type { case "chainsync.block": be := e.Payload.(chainsync.BlockEvent) + bc := e.Context.(chainsync.BlockContext) + // keyCbor, err := cbor.Marshal(be.IssuerVkey) + // if err != nil { + // panic(err) + // } dme.Title = "New Cardano Block" dmefs = append(dmefs, &DiscordMessageEmbedField{ Name: "Block Number", - Value: fmt.Sprintf("%d", be.BlockNumber), + Value: fmt.Sprintf("%d", bc.BlockNumber), }) dmefs = append(dmefs, &DiscordMessageEmbedField{ Name: "Slot Number", - Value: fmt.Sprintf("%d", be.SlotNumber), + Value: fmt.Sprintf("%d", bc.SlotNumber), }) dmefs = append(dmefs, &DiscordMessageEmbedField{ Name: "Block Hash", Value: be.BlockHash, }) + // TODO: get the pool identifier from be.IssuerVkey + // dmefs = append(dmefs, &DiscordMessageEmbedField{ + // Name: "Issuer Vkey", + // Value: fmt.Sprintf("%x", keyCbor), + // }) // TODO: fix this URL for different networks dme.URL = fmt.Sprintf("https://cexplorer.io/block/%s", be.BlockHash) case "chainsync.rollback": @@ -199,7 +217,7 @@ type DiscordMessageEmbedField struct { func (w *WebhookOutput) SendWebhook(e *event.Event) error { logger := logging.GetLogger() logger.Infof("sending event %s to %s", e.Type, w.url) - data := formatPayload(e, w.format) + data := formatWebhook(e, w.format) // Setup request ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel()