Skip to content

Commit

Permalink
Merge pull request #106 from blinklabs-io/feat/event-context
Browse files Browse the repository at this point in the history
feat: support context on events and update block
  • Loading branch information
wolf31o2 authored Oct 18, 2023
2 parents 9657d44 + afbc1cc commit ccc07f3
Show file tree
Hide file tree
Showing 8 changed files with 85 additions and 25 deletions.
4 changes: 3 additions & 1 deletion event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,15 @@ import (
type Event struct {
Type string `json:"type"`
Timestamp time.Time `json:"timestamp"`
Context interface{} `json:"context,omitempty"`
Payload interface{} `json:"payload"`
}

func New(eventType string, timestamp time.Time, payload interface{}) Event {
func New(eventType string, timestamp time.Time, context, payload interface{}) Event {
return Event{
Type: eventType,
Timestamp: timestamp,
Context: context,
Payload: payload,
}
}
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ module github.com/blinklabs-io/snek
go 1.20

require (
github.com/blinklabs-io/gouroboros v0.56.0
github.com/blinklabs-io/gouroboros v0.57.0
github.com/fxamacker/cbor/v2 v2.5.0
github.com/gen2brain/beeep v0.0.0-20230602101333-f384c29b62dd
github.com/gin-gonic/gin v1.9.1
github.com/kelseyhightower/envconfig v1.4.0
Expand All @@ -27,7 +28,6 @@ require (
github.com/chenzhuoyu/base64x v0.0.0-20230717121745-296ad89f973d // indirect
github.com/chenzhuoyu/iasm v0.9.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/fxamacker/cbor/v2 v2.5.0 // indirect
github.com/gabriel-vasile/mimetype v1.4.3 // indirect
github.com/gin-contrib/sse v0.1.0 // indirect
github.com/go-openapi/jsonpointer v0.20.0 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ cloud.google.com/go/compute/metadata v0.2.3 h1:mg4jlk7mCAj6xXp9UJ4fjI9VUI5rubuGB
cloud.google.com/go/compute/metadata v0.2.3/go.mod h1:VAV5nSsACxMJvgaAuX6Pk2AawlZn8kiOGuCv6gTkwuA=
github.com/KyleBanks/depth v1.2.1 h1:5h8fQADFrWtarTdtDudMmGsC7GPbOAu6RVB3ffsVFHc=
github.com/KyleBanks/depth v1.2.1/go.mod h1:jzSb9d0L43HxTQfT+oSA1EEp2q+ne2uh6XgeJcm8brE=
github.com/blinklabs-io/gouroboros v0.56.0 h1:AdWB6SkHD5fn4wKU+/2BFsT9xae3W+nZ189LxZw/mF8=
github.com/blinklabs-io/gouroboros v0.56.0/go.mod h1:D5YJka8EyVmiXNMbRvjH23H9lNMLA4+qSlNNC/j7R0k=
github.com/blinklabs-io/gouroboros v0.57.0 h1:k5Y706vvYAGM3bCtEhh6WRHGrvS3S6n1MT9vNLFAe/E=
github.com/blinklabs-io/gouroboros v0.57.0/go.mod h1:D5YJka8EyVmiXNMbRvjH23H9lNMLA4+qSlNNC/j7R0k=
github.com/bytedance/sonic v1.5.0/go.mod h1:ED5hyg4y6t3/9Ku1R6dU/4KyJ48DZ4jPhfY1O2AihPM=
github.com/bytedance/sonic v1.10.0-rc/go.mod h1:ElCzW+ufi8qKqNW0FY314xriJhyJhuoJ3gFZdAHF7NM=
github.com/bytedance/sonic v1.10.2 h1:GQebETVBxYB7JGWJtLBi07OVzWwt+8dWA00gEVW2ZFE=
Expand Down
42 changes: 36 additions & 6 deletions input/chainsync/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,52 @@
package chainsync

import (
"fmt"

"github.com/blinklabs-io/gouroboros/ledger"
"github.com/fxamacker/cbor/v2"
)

type BlockEvent struct {
type BlockContext struct {
BlockNumber uint64 `json:"blockNumber"`
BlockHash string `json:"blockHash"`
SlotNumber uint64 `json:"slotNumber"`
BlockCbor byteSliceJsonHex `json:"blockCbor,omitempty"`
}

func NewBlockEvent(block ledger.Block, includeCbor bool) BlockEvent {
evt := BlockEvent{
type BlockEvent struct {
BlockBodySize uint64 `json:"blockBodySize"`
IssuerVkey string `json:"issuerVkey"`
BlockHash string `json:"blockHash"`
BlockCbor byteSliceJsonHex `json:"blockCbor,omitempty"`
}

func NewBlockContext(block ledger.Block) BlockContext {
ctx := BlockContext{
BlockNumber: block.BlockNumber(),
SlotNumber: block.SlotNumber(),
}
return ctx
}

func NewBlockHeaderContext(block ledger.BlockHeader) BlockContext {
ctx := BlockContext{
BlockNumber: block.BlockNumber(),
BlockHash: block.Hash(),
SlotNumber: block.SlotNumber(),
}
return ctx
}

func NewBlockEvent(block ledger.Block, includeCbor bool) BlockEvent {
keyCbor, err := cbor.Marshal(block.IssuerVkey())
if err != nil {
panic(err)
}
// iss := ledger.NewBlake2b256(block.IssuerVkey())
evt := BlockEvent{
BlockBodySize: block.BlockBodySize(),
BlockHash: block.Hash(),
IssuerVkey: fmt.Sprintf("%x", keyCbor),
// IssuerVkey: iss.String(),
}
if includeCbor {
evt.BlockCbor = block.Cbor()
}
Expand Down
12 changes: 6 additions & 6 deletions input/chainsync/chainsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,15 +200,15 @@ func (c *ChainSync) setupConnection() error {
}

func (c *ChainSync) handleRollBackward(point ocommon.Point, tip ochainsync.Tip) error {
evt := event.New("chainsync.rollback", time.Now(), NewRollbackEvent(point))
evt := event.New("chainsync.rollback", time.Now(), nil, NewRollbackEvent(point))
c.eventChan <- evt
return nil
}

func (c *ChainSync) handleRollForward(blockType uint, blockData interface{}, tip ochainsync.Tip) error {
switch v := blockData.(type) {
case ledger.Block:
evt := event.New("chainsync.block", time.Now(), NewBlockEvent(v, c.includeCbor))
evt := event.New("chainsync.block", time.Now(), NewBlockContext(v), NewBlockEvent(v, c.includeCbor))
c.eventChan <- evt
c.updateStatus(v.SlotNumber(), v.BlockNumber(), v.Hash(), tip.Point.Slot, hex.EncodeToString(tip.Point.Hash))
case ledger.BlockHeader:
Expand All @@ -218,10 +218,10 @@ func (c *ChainSync) handleRollForward(blockType uint, blockData interface{}, tip
if err != nil {
return err
}
blockEvt := event.New("chainsync.block", time.Now(), NewBlockEvent(block, c.includeCbor))
blockEvt := event.New("chainsync.block", time.Now(), NewBlockHeaderContext(v), NewBlockEvent(block, c.includeCbor))
c.eventChan <- blockEvt
for _, transaction := range block.Transactions() {
txEvt := event.New("chainsync.transaction", time.Now(), NewTransactionEvent(block, transaction, c.includeCbor))
txEvt := event.New("chainsync.transaction", time.Now(), nil, NewTransactionEvent(block, transaction, c.includeCbor))
c.eventChan <- txEvt
}
c.updateStatus(v.SlotNumber(), v.BlockNumber(), v.Hash(), tip.Point.Slot, hex.EncodeToString(tip.Point.Hash))
Expand All @@ -230,10 +230,10 @@ func (c *ChainSync) handleRollForward(blockType uint, blockData interface{}, tip
}

func (c *ChainSync) handleBlockFetchBlock(block ledger.Block) error {
blockEvt := event.New("chainsync.block", time.Now(), NewBlockEvent(block, c.includeCbor))
blockEvt := event.New("chainsync.block", time.Now(), nil, NewBlockEvent(block, c.includeCbor))
c.eventChan <- blockEvt
for _, transaction := range block.Transactions() {
txEvt := event.New("chainsync.transaction", time.Now(), NewTransactionEvent(block, transaction, c.includeCbor))
txEvt := event.New("chainsync.transaction", time.Now(), nil, NewTransactionEvent(block, transaction, c.includeCbor))
c.eventChan <- txEvt
}
c.updateStatus(block.SlotNumber(), block.BlockNumber(), block.Hash(), c.bulkRangeEnd.Slot, hex.EncodeToString(c.bulkRangeEnd.Hash))
Expand Down
9 changes: 7 additions & 2 deletions output/notify/notify.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,18 @@ func (n *NotifyOutput) Start() error {
if payload == nil {
panic(fmt.Errorf("ERROR: %v", payload))
}
context := evt.Context
if context == nil {
panic(fmt.Errorf("ERROR: %v", context))
}

be := payload.(chainsync.BlockEvent)
bc := context.(chainsync.BlockContext)
err := beeep.Notify(
n.title,
fmt.Sprintf("New Block!\nBlockNumber: %d, SlotNumber: %d\nHash: %s",
be.BlockNumber,
be.SlotNumber,
bc.BlockNumber,
bc.SlotNumber,
be.BlockHash,
),
"assets/snek-icon.png",
Expand Down
9 changes: 7 additions & 2 deletions output/push/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,17 @@ func (p *PushOutput) Start() error {
if payload == nil {
panic(fmt.Errorf("ERROR: %v", payload))
}
context := evt.Context
if context == nil {
panic(fmt.Errorf("ERROR: %v", context))
}

be := payload.(chainsync.BlockEvent)
bc := context.(chainsync.BlockContext)
fmt.Println("Snek")
fmt.Printf("New Block!\nBlockNumber: %d, SlotNumber: %d\nHash: %s",
be.BlockNumber,
be.SlotNumber,
bc.BlockNumber,
bc.SlotNumber,
be.BlockHash,
)

Expand Down
26 changes: 22 additions & 4 deletions output/webhook/webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
"net/http"
"time"

// cbor "github.com/fxamacker/cbor/v2"

"github.com/blinklabs-io/snek/event"
"github.com/blinklabs-io/snek/input/chainsync"
"github.com/blinklabs-io/snek/internal/logging"
Expand Down Expand Up @@ -68,10 +70,16 @@ func (w *WebhookOutput) Start() error {
if payload == nil {
panic(fmt.Errorf("ERROR: %v", payload))
}
context := evt.Context
switch evt.Type {
case "chainsync.block":
if context == nil {
panic(fmt.Errorf("ERROR: %v", context))
}
be := payload.(chainsync.BlockEvent)
bc := context.(chainsync.BlockContext)
evt.Payload = be
evt.Context = bc
case "chainsync.rollback":
re := payload.(chainsync.RollbackEvent)
evt.Payload = re
Expand All @@ -97,7 +105,7 @@ func basicAuth(username, password string) string {
return "Basic " + base64.StdEncoding.EncodeToString([]byte(auth))
}

func formatPayload(e *event.Event, format string) []byte {
func formatWebhook(e *event.Event, format string) []byte {
var data []byte
var err error
switch format {
Expand All @@ -109,19 +117,29 @@ func formatPayload(e *event.Event, format string) []byte {
switch e.Type {
case "chainsync.block":
be := e.Payload.(chainsync.BlockEvent)
bc := e.Context.(chainsync.BlockContext)
// keyCbor, err := cbor.Marshal(be.IssuerVkey)
// if err != nil {
// panic(err)
// }
dme.Title = "New Cardano Block"
dmefs = append(dmefs, &DiscordMessageEmbedField{
Name: "Block Number",
Value: fmt.Sprintf("%d", be.BlockNumber),
Value: fmt.Sprintf("%d", bc.BlockNumber),
})
dmefs = append(dmefs, &DiscordMessageEmbedField{
Name: "Slot Number",
Value: fmt.Sprintf("%d", be.SlotNumber),
Value: fmt.Sprintf("%d", bc.SlotNumber),
})
dmefs = append(dmefs, &DiscordMessageEmbedField{
Name: "Block Hash",
Value: be.BlockHash,
})
// TODO: get the pool identifier from be.IssuerVkey
// dmefs = append(dmefs, &DiscordMessageEmbedField{
// Name: "Issuer Vkey",
// Value: fmt.Sprintf("%x", keyCbor),
// })
// TODO: fix this URL for different networks
dme.URL = fmt.Sprintf("https://cexplorer.io/block/%s", be.BlockHash)
case "chainsync.rollback":
Expand Down Expand Up @@ -199,7 +217,7 @@ type DiscordMessageEmbedField struct {
func (w *WebhookOutput) SendWebhook(e *event.Event) error {
logger := logging.GetLogger()
logger.Infof("sending event %s to %s", e.Type, w.url)
data := formatPayload(e, w.format)
data := formatWebhook(e, w.format)
// Setup request
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
Expand Down

0 comments on commit ccc07f3

Please sign in to comment.