Skip to content

Commit

Permalink
Added 'live' mode
Browse files Browse the repository at this point in the history
  • Loading branch information
Maxwell Dulin authored and Maxwell Dulin committed Sep 26, 2024
1 parent 73b795c commit 6fac2f9
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 49 deletions.
83 changes: 36 additions & 47 deletions node/cmd/transfer-verifier/transfer-verifier-sui.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ package transferverifier
import (
// "bytes"
"context"
"crypto/rand"
"encoding/binary"
"encoding/json"
"fmt"
Expand All @@ -27,7 +26,6 @@ import (
ipfslog "github.com/ipfs/go-log/v2"
"github.com/spf13/cobra"
"go.uber.org/zap"
"nhooyr.io/websocket"

"github.com/wormhole-foundation/wormhole/sdk/vaa"
)
Expand Down Expand Up @@ -348,7 +346,7 @@ func init() {
// envStr = TransferVerifierCmd.Flags().String("env", "", `environment (may be "testnet" or "mainnet")`)

// TODO - fix the flag handling
suiRPC = *TransferVerifierCmdSui.Flags().String("suiRPC", "https://rpc.ankr.com/sui/22fe735acb187df41c2e84b758d081aa48b31e69cce2dee73951b5bbfb88b403", "Sui RPC url")
suiRPC = *TransferVerifierCmdSui.Flags().String("suiRPC", "<RPC HERE>", "Sui RPC url")
logLevel = TransferVerifierCmdSui.Flags().String("logLevel", "info", "Logging level (debug, info, warn, error, dpanic, panic, fatal)")
suiCoreContract = *TransferVerifierCmdSui.Flags().String("suiCoreContract", "0x5306f64e312b581766351c07af79c72fcb1cd25147157fdc2f8ad76de9a3fb6a", "Event to listen to in Sui")
suiTokenBridgeEmitter = *TransferVerifierCmdSui.Flags().String("suiTokenBridgeEmitter", "0xccceeb29348f71bdd22ffef43a2a19c1f5b5e17c5cca5411529120182672ade5", "Token bridge emitter on Sui. Tied to the token bridge package.")
Expand Down Expand Up @@ -381,64 +379,55 @@ func runTransferVerifierSui(cmd *cobra.Command, args []string) {
//processDigest(*logger)

// Process ALL of the incoming ones
processAllEvents(*logger)
//processEventsLive(*logger)
//processAllEvents(*logger)
processEventsLive(*logger)
}

// https://github.com/wormhole-foundation/wormhole/blob/e297d96d101857f98e0fbba10168b6dc7b55d9c0/node/pkg/watchers/sui/watcher.go#L449
func processEventsLive(logger zap.Logger) {
nBig, _ := rand.Int(rand.Reader, big.NewInt(27))
subId := nBig.Int64()
cursor := "null"
prevFirstDigest := ""

ctx := context.Background()
ws, _, err := websocket.Dial(ctx, "wss://rpc.ankr.com/sui/ws/22fe735acb187df41c2e84b758d081aa48b31e69cce2dee73951b5bbfb88b403", nil)
if err != nil {
logger.Error("couldn't connect to websocket Sui")
return
}
defer ws.Close(websocket.StatusNormalClosure, "")
for true {
time.Sleep(10 * time.Second) // Sleep a little bit to let things get processed

subscription := fmt.Sprintf(`{"jsonrpc":"2.0", "id": %d, "method": "suix_subscribeEvent", "params": {"All" : [{"MoveEventType": "%s"}, {"Package":"0x5306f64e312b581766351c07af79c72fcb1cd25147157fdc2f8ad76de9a3fb6a"},]}}`, subId, suiMoveEventType)
queryEventsCmd := fmt.Sprintf(`{"jsonrpc":"2.0", "id": 1, "method": "suix_queryEvents", "params": [{ "MoveEventType": "%s" }, %s, %d, %t]}`,
suiMoveEventType, cursor, 10, true)

err = ws.Write(ctx, websocket.MessageText, []byte(subscription))
if err != nil {
logger.Error("couldn't create subscription with websocket")
}
res, err := suiQueryEvents(suiRPC, queryEventsCmd)

_, p, err := ws.Read(ctx)
if err != nil {
logger.Error("Failed to read websocket response to event subscription", zap.Error(err))
}
if err != nil {
logger.Error(fmt.Sprintf("suiQueryEvents failed: %s", err))
return
}
//cursor = fmt.Sprintf(`{"txDigest":"%s", "eventSeq":"%s"}`, res.Result.NextCursor.TxDigest, res.Result.NextCursor.EventSeq)

var subRes map[string]any
err = json.Unmarshal(p, &subRes)
if err != nil {
logger.Error("Failed to unmarshal req in subscription request", zap.Error(err))
return
}
logger.Debug("Unmarshalled json", zap.Any("subRes", subRes))
actualResult := subRes["result"]
logger.Debug("actualResult", zap.Any("res", actualResult))
if len(res.Result.Data) == 0 { // Empty query
continue
}

if actualResult == nil {
logger.Error("Failed to request filter in subscription request", zap.Error(err))
return
}
txDigestCurrent := res.Result.Data[0].ID.TxDigest
if prevFirstDigest == *txDigestCurrent { // No new data
logger.Info(fmt.Sprintf("No new events for hash %s", *txDigestCurrent))
continue
}

for {
select {
case <-ctx.Done():
logger.Error("sui_data_pump context done")
entries := res.Result.Data

default:
_, msg, err := ws.Read(ctx)
//var res SuiEventMsg
for _, entry := range entries {

if prevFirstDigest == *entry.ID.TxDigest { // Already seen the TX. Don't need to process again. Should be sequential so we can leave this loop.
break
}
fmt.Println("============================================")
logger.Info("", zap.String("Hash", *entry.ID.TxDigest))
err = processIncomingEvent(*entry.ID.TxDigest, logger)
if err != nil {
continue
logger.Error(fmt.Sprintf("Unable to process event: %s", err.Error()))
}
//err = json.Unmarshal(msg, &res)

fmt.Println(string(msg[:]))
}

prevFirstDigest = *txDigestCurrent
}
}

Expand Down
3 changes: 1 addition & 2 deletions scripts/sui-transfer-verifier.sh
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@ TOKEN_BRIDGE_CONTRACT="0x26efee2b51c911237888e5dc6702868abca3c7ac12c53f76ef8eba0

TOKEN_BRIDGE_EMITTER="0xccceeb29348f71bdd22ffef43a2a19c1f5b5e17c5cca5411529120182672ade5"

# RPC="${ALCHEMY_RPC}"
RPC=https://rpc.ankr.com/sui/22fe735acb187df41c2e84b758d081aa48b31e69cce2dee73951b5bbfb88b403
RPC=<RPC_HERE>

LOG_LEVEL="info"

Expand Down

0 comments on commit 6fac2f9

Please sign in to comment.