Skip to content

Commit

Permalink
[DDC | Storage Cluster | Delivery set 17] (Subscribe storage node to …
Browse files Browse the repository at this point in the history
…change topology blockchain event)

Read events from the blockchain seens last block in case websocket was disconnected
  • Loading branch information
Max-Levitskiy committed Jul 3, 2023
1 parent 1327c08 commit 6b16a2f
Show file tree
Hide file tree
Showing 10 changed files with 724 additions and 57 deletions.
166 changes: 109 additions & 57 deletions contract/pkg/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ import (
"bytes"
"context"
"github.com/cerebellum-network/cere-ddc-sdk-go/contract/pkg/sdktypes"
"github.com/cerebellum-network/cere-ddc-sdk-go/contract/pkg/subscription"
"github.com/cerebellum-network/cere-ddc-sdk-go/contract/pkg/utils"
"math"
"os/signal"
"reflect"
"sync"
Expand All @@ -23,13 +25,20 @@ const (
CERE = 10_000_000_000
)

var (
chainSubscriptionFactory = subscription.NewChainFactory()
watchdogFactory = subscription.NewWatchdogFactory()
watchdogTimeout = time.Minute
)

type (
blockchainClient struct {
*gsrpc.SubstrateAPI
eventContractAccount types.AccountID
eventDispatcher map[types.Hash]sdktypes.ContractEventDispatchEntry
eventContextCancel context.CancelFunc
eventContextCancel []context.CancelFunc
connectMutex sync.Mutex
eventDecoder subscription.EventDecoder
}
)

Expand All @@ -41,6 +50,7 @@ func CreateBlockchainClient(apiUrl string) sdktypes.BlockchainClient {

return &blockchainClient{
SubstrateAPI: substrateAPI,
eventDecoder: subscription.NewEventDecoder(),
}
}

Expand Down Expand Up @@ -69,33 +79,71 @@ func (b *blockchainClient) listenContractEvents() error {
return err
}

sub, err := b.RPC.State.SubscribeStorageRaw([]types.StorageKey{key})
s, err := b.RPC.State.SubscribeStorageRaw([]types.StorageKey{key})
if err != nil {
return err
}
b.processChainSubscription(chainSubscriptionFactory.NewChainSubscription(s), key, meta)
return nil
}

func (b *blockchainClient) processChainSubscription(sub subscription.ChainSubscription, key types.StorageKey, meta *types.Metadata) {
ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
b.eventContextCancel = cancel
watchdog := time.NewTicker(time.Minute)
b.eventContextCancel = append(b.eventContextCancel, cancel)
watchdog := watchdogFactory.NewWatchdog(watchdogTimeout)
eventArrived := true
var lastEventBlock types.BlockNumber
var lastEventBlockHash types.Hash
go func() {
defer sub.Unsubscribe()
for {
select {
case <-ctx.Done():
log.Info("Chain subscription context done")
return

case <-watchdog.C:
case <-watchdog.C():
if !eventArrived {
log.WithField("block", lastEventBlockHash.Hex()).Warn("Watchdog event timeout")
// read missed blocks
lastEventBlock, err := b.RPC.Chain.GetBlock(lastEventBlockHash)
if err != nil {
log.WithError(err).Warn("Error fetching block")
break
}
lastEventBlockNumber := lastEventBlock.Block.Header.Number
headerLatest, err := b.RPC.Chain.GetHeaderLatest()
if headerLatest.Number > lastEventBlockNumber {
for i := lastEventBlockNumber + 1; i <= headerLatest.Number; i++ {
missedBlock, err := b.RPC.Chain.GetBlockHash(uint64(i))
if err != nil {
log.Println(err)
continue
}
storageData, err := b.RPC.State.GetStorageRaw(key, missedBlock)
if err != nil {
log.WithError(err).Error("Error fetching storage data")
continue
}
events, err := b.eventDecoder.DecodeEvents(*storageData, meta)
if err != nil {
log.WithError(err).Error("Error parsing events")
continue
}

b.processEvents(events, missedBlock)
lastEventBlockHash = missedBlock
}
}

// try to resubscribe
s, err := b.RPC.State.SubscribeStorageRaw([]types.StorageKey{key})
if err != nil {
log.WithError(err).Warn("Watchdog resubscribtion failed")
break
}
log.Info("Watchdog event resubscribed")
sub.Unsubscribe()
sub = s
sub = chainSubscriptionFactory.NewChainSubscription(s)
}
eventArrived = false

Expand All @@ -108,13 +156,7 @@ func (b *blockchainClient) listenContractEvents() error {
break
}
eventArrived = true
block, err := b.RPC.Chain.GetBlock(evt.Block)
if err != nil {
log.WithError(err).Warn("Error fetching block")
break
}
lastEventBlock = block.Block.Header.Number
print(lastEventBlock)
lastEventBlockHash = evt.Block

// parse all events for this block
for _, chng := range evt.Changes {
Expand All @@ -123,54 +165,58 @@ func (b *blockchainClient) listenContractEvents() error {
continue
}

events := types.EventRecords{}
err = types.EventRecordsRaw(chng.StorageData).DecodeEventRecords(meta, &events)
storageData := chng.StorageData
events, err := b.eventDecoder.DecodeEvents(storageData, meta)
if err != nil {
log.WithError(err).Warnf("Error parsing event %x", chng.StorageData[:])
log.WithError(err).Warnf("Error parsing event %x", storageData[:])
continue
}

for _, e := range events.Contracts_ContractEmitted {
if !b.eventContractAccount.Equal(&e.Contract) {
continue
}

// Identify the event by matching one of its topics against known signatures. The topics are sorted so
// the needed one may be in the arbitrary position.
var dispatchEntry sdktypes.ContractEventDispatchEntry
found := false
for _, topic := range e.Topics {
dispatchEntry, found = b.eventDispatcher[topic]
if found {
break
}
}
if !found {
log.WithField("block", evt.Block.Hex()).
Warnf("Unknown event emitted by our contract: %x", e.Data[:16])
continue
}

if dispatchEntry.Handler == nil {
log.WithField("block", evt.Block.Hex()).WithField("event", dispatchEntry.ArgumentType.Name()).
Debug("Event unhandeled")
continue
}
args := reflect.New(dispatchEntry.ArgumentType).Interface()
if err := codec.Decode(e.Data[1:], args); err != nil {
log.WithError(err).WithField("block", evt.Block.Hex()).
WithField("event", dispatchEntry.ArgumentType.Name()).
Errorf("Cannot decode event data %x", e.Data)
}
log.WithField("block", evt.Block.Hex()).WithField("event", dispatchEntry.ArgumentType.Name()).
Debugf("Event args: %x", e.Data)
dispatchEntry.Handler(args)
}
b.processEvents(events, evt.Block)
}
}
}
}()
return nil
}

func (b *blockchainClient) processEvents(events *types.EventRecords, blockHash types.Hash) {
for _, e := range events.Contracts_ContractEmitted {
if !b.eventContractAccount.Equal(&e.Contract) {
continue
}

// Identify the event by matching one of its topics against known signatures. The topics are sorted so
// the needed one may be in the arbitrary position.
var dispatchEntry sdktypes.ContractEventDispatchEntry
found := false
for _, topic := range e.Topics {
dispatchEntry, found = b.eventDispatcher[topic]
if found {
break
}
}
if !found {

log.WithField("block", blockHash.Hex()).
Warnf("Unknown event emitted by our contract: %x", e.Data[:uint32(math.Min(16, float64(len(e.Data))))])
continue
}

if dispatchEntry.Handler == nil {
log.WithField("block", blockHash.Hex()).WithField("event", dispatchEntry.ArgumentType.Name()).
Debug("Event unhandeled")
continue
}
args := reflect.New(dispatchEntry.ArgumentType).Interface()
if err := codec.Decode(e.Data[1:], args); err != nil {
log.WithError(err).WithField("block", blockHash.Hex()).
WithField("event", dispatchEntry.ArgumentType.Name()).
Errorf("Cannot decode event data %x", e.Data)
}
log.WithField("block", blockHash.Hex()).WithField("event", dispatchEntry.ArgumentType.Name()).
Debugf("Event args: %x", e.Data)
dispatchEntry.Handler(args)
}
}

func (b *blockchainClient) CallToReadEncoded(contractAddressSS58 string, fromAddress string, method []byte, args ...interface{}) (string, error) {
Expand Down Expand Up @@ -410,9 +456,8 @@ func (b *blockchainClient) reconnect() error {
return nil
}

if b.eventContextCancel != nil {
b.eventContextCancel()
}
b.unsubscribeAll()

substrateAPI, err := gsrpc.NewSubstrateAPI(b.Client.URL())
if err != nil {
log.WithError(err).Warningf("Blockchain client can't reconnect to %s", b.Client.URL())
Expand All @@ -428,3 +473,10 @@ func (b *blockchainClient) reconnect() error {

return nil
}

func (b *blockchainClient) unsubscribeAll() {
for _, c := range b.eventContextCancel {
c()
}
b.eventContextCancel = nil
}
Loading

0 comments on commit 6b16a2f

Please sign in to comment.