diff --git a/protocol/app/app.go b/protocol/app/app.go index 54df039796..3e5a135fbe 100644 --- a/protocol/app/app.go +++ b/protocol/app/app.go @@ -2024,12 +2024,16 @@ func getFullNodeStreamingManagerFromOptions( logger log.Logger, ) (manager streamingtypes.FullNodeStreamingManager) { if appFlags.GrpcStreamingEnabled { - logger.Info("GRPC streaming is enabled") + logger.Info("GRPC streaming is enabled", log.ModuleKey, "full-node-streaming") + if appFlags.FullNodeStreamingSnapshotInterval > 0 { + logger.Info("Interval snapshots enabled", log.ModuleKey, "full-node-streaming") + } return streaming.NewFullNodeStreamingManager( logger, appFlags.GrpcStreamingFlushIntervalMs, appFlags.GrpcStreamingMaxBatchSize, appFlags.GrpcStreamingMaxChannelBufferSize, + appFlags.FullNodeStreamingSnapshotInterval, ) } return streaming.NewNoopGrpcStreamingManager() diff --git a/protocol/app/flags/flags.go b/protocol/app/flags/flags.go index bdbe34a514..41b13ae826 100644 --- a/protocol/app/flags/flags.go +++ b/protocol/app/flags/flags.go @@ -25,6 +25,7 @@ type Flags struct { GrpcStreamingFlushIntervalMs uint32 GrpcStreamingMaxBatchSize uint32 GrpcStreamingMaxChannelBufferSize uint32 + FullNodeStreamingSnapshotInterval uint32 VEOracleEnabled bool // Slinky Vote Extensions // Optimistic block execution @@ -47,6 +48,7 @@ const ( GrpcStreamingFlushIntervalMs = "grpc-streaming-flush-interval-ms" GrpcStreamingMaxBatchSize = "grpc-streaming-max-batch-size" GrpcStreamingMaxChannelBufferSize = "grpc-streaming-max-channel-buffer-size" + FullNodeStreamingSnapshotInterval = "fns-snapshot-interval" // Slinky VEs enabled VEOracleEnabled = "slinky-vote-extension-oracle-enabled" @@ -66,6 +68,7 @@ const ( DefaultGrpcStreamingFlushIntervalMs = 50 DefaultGrpcStreamingMaxBatchSize = 2000 DefaultGrpcStreamingMaxChannelBufferSize = 2000 + DefaultFullNodeStreamingSnapshotInterval = 0 DefaultVEOracleEnabled = true DefaultOptimisticExecutionEnabled = false @@ -117,6 +120,12 @@ func AddFlagsToCmd(cmd *cobra.Command) { DefaultGrpcStreamingMaxChannelBufferSize, "Maximum per-subscription channel size before grpc streaming cancels a singular subscription", ) + cmd.Flags().Uint32( + FullNodeStreamingSnapshotInterval, + DefaultFullNodeStreamingSnapshotInterval, + "If set to positive number, number of blocks between each periodic snapshot will be sent out. "+ + "Defaults to zero for regular behavior of one initial snapshot.", + ) cmd.Flags().Bool( VEOracleEnabled, DefaultVEOracleEnabled, @@ -155,6 +164,10 @@ func (f *Flags) Validate() error { return fmt.Errorf("grpc streaming channel size must be positive number") } } + if f.FullNodeStreamingSnapshotInterval > 0 && f.FullNodeStreamingSnapshotInterval < 50 { + return fmt.Errorf("full node streaming snapshot interval must be >= 50 blocks or zero") + } + return nil } @@ -178,6 +191,7 @@ func GetFlagValuesFromOptions( GrpcStreamingFlushIntervalMs: DefaultGrpcStreamingFlushIntervalMs, GrpcStreamingMaxBatchSize: DefaultGrpcStreamingMaxBatchSize, GrpcStreamingMaxChannelBufferSize: DefaultGrpcStreamingMaxChannelBufferSize, + FullNodeStreamingSnapshotInterval: DefaultFullNodeStreamingSnapshotInterval, VEOracleEnabled: true, OptimisticExecutionEnabled: DefaultOptimisticExecutionEnabled, @@ -244,6 +258,12 @@ func GetFlagValuesFromOptions( } } + if option := appOpts.Get(FullNodeStreamingSnapshotInterval); option != nil { + if v, err := cast.ToUint32E(option); err == nil { + result.FullNodeStreamingSnapshotInterval = v + } + } + if option := appOpts.Get(VEOracleEnabled); option != nil { if v, err := cast.ToBoolE(option); err == nil { result.VEOracleEnabled = v diff --git a/protocol/app/flags/flags_test.go b/protocol/app/flags/flags_test.go index 8260efe313..84b10ef93e 100644 --- a/protocol/app/flags/flags_test.go +++ b/protocol/app/flags/flags_test.go @@ -38,6 +38,9 @@ func TestAddFlagsToCommand(t *testing.T) { fmt.Sprintf("Has %s flag", flags.GrpcStreamingMaxBatchSize): { flagName: flags.GrpcStreamingMaxBatchSize, }, + fmt.Sprintf("Has %s flag", flags.FullNodeStreamingSnapshotInterval): { + flagName: flags.FullNodeStreamingSnapshotInterval, + }, fmt.Sprintf("Has %s flag", flags.GrpcStreamingMaxChannelBufferSize): { flagName: flags.GrpcStreamingMaxChannelBufferSize, }, @@ -60,12 +63,13 @@ func TestValidate(t *testing.T) { }{ "success (default values)": { flags: flags.Flags{ - NonValidatingFullNode: flags.DefaultNonValidatingFullNode, - DdAgentHost: flags.DefaultDdAgentHost, - DdTraceAgentPort: flags.DefaultDdTraceAgentPort, - GrpcAddress: config.DefaultGRPCAddress, - GrpcEnable: true, - OptimisticExecutionEnabled: false, + NonValidatingFullNode: flags.DefaultNonValidatingFullNode, + DdAgentHost: flags.DefaultDdAgentHost, + DdTraceAgentPort: flags.DefaultDdTraceAgentPort, + GrpcAddress: config.DefaultGRPCAddress, + GrpcEnable: true, + FullNodeStreamingSnapshotInterval: flags.DefaultFullNodeStreamingSnapshotInterval, + OptimisticExecutionEnabled: false, }, }, "success - full node & gRPC disabled": { @@ -145,6 +149,29 @@ func TestValidate(t *testing.T) { }, expectedErr: fmt.Errorf("grpc streaming channel size must be positive number"), }, + "failure - full node streaming enabled with <= 49 snapshot interval": { + flags: flags.Flags{ + NonValidatingFullNode: true, + GrpcEnable: true, + GrpcStreamingEnabled: true, + GrpcStreamingFlushIntervalMs: 100, + GrpcStreamingMaxBatchSize: 2000, + GrpcStreamingMaxChannelBufferSize: 2000, + FullNodeStreamingSnapshotInterval: 49, + }, + expectedErr: fmt.Errorf("full node streaming snapshot interval must be >= 50 blocks or zero"), + }, + "success - full node streaming enabled with 50 snapshot interval": { + flags: flags.Flags{ + NonValidatingFullNode: true, + GrpcEnable: true, + GrpcStreamingEnabled: true, + GrpcStreamingFlushIntervalMs: 100, + GrpcStreamingMaxBatchSize: 2000, + GrpcStreamingMaxChannelBufferSize: 2000, + FullNodeStreamingSnapshotInterval: 50, + }, + }, } for name, tc := range tests { t.Run(name, func(t *testing.T) { @@ -173,6 +200,7 @@ func TestGetFlagValuesFromOptions(t *testing.T) { expectedGrpcStreamingFlushMs uint32 expectedGrpcStreamingBatchSize uint32 expectedGrpcStreamingMaxChannelBufferSize uint32 + expectedFullNodeStreamingSnapshotInterval uint32 expectedOptimisticExecutionEnabled bool }{ "Sets to default if unset": { @@ -185,6 +213,7 @@ func TestGetFlagValuesFromOptions(t *testing.T) { expectedGrpcStreamingFlushMs: 50, expectedGrpcStreamingBatchSize: 2000, expectedGrpcStreamingMaxChannelBufferSize: 2000, + expectedFullNodeStreamingSnapshotInterval: 0, expectedOptimisticExecutionEnabled: false, }, "Sets values from options": { @@ -198,6 +227,7 @@ func TestGetFlagValuesFromOptions(t *testing.T) { flags.GrpcStreamingFlushIntervalMs: uint32(408), flags.GrpcStreamingMaxBatchSize: uint32(650), flags.GrpcStreamingMaxChannelBufferSize: uint32(972), + flags.FullNodeStreamingSnapshotInterval: uint32(123), flags.OptimisticExecutionEnabled: "true", }, expectedNonValidatingFullNodeFlag: true, @@ -209,6 +239,7 @@ func TestGetFlagValuesFromOptions(t *testing.T) { expectedGrpcStreamingFlushMs: 408, expectedGrpcStreamingBatchSize: 650, expectedGrpcStreamingMaxChannelBufferSize: 972, + expectedFullNodeStreamingSnapshotInterval: 123, expectedOptimisticExecutionEnabled: true, }, } @@ -262,6 +293,11 @@ func TestGetFlagValuesFromOptions(t *testing.T) { tc.expectedGrpcStreamingBatchSize, flags.GrpcStreamingMaxBatchSize, ) + require.Equal( + t, + tc.expectedFullNodeStreamingSnapshotInterval, + flags.FullNodeStreamingSnapshotInterval, + ) require.Equal( t, tc.expectedGrpcStreamingMaxChannelBufferSize, diff --git a/protocol/streaming/full_node_streaming_manager.go b/protocol/streaming/full_node_streaming_manager.go index c53b76e2c0..12bc38cfed 100644 --- a/protocol/streaming/full_node_streaming_manager.go +++ b/protocol/streaming/full_node_streaming_manager.go @@ -2,10 +2,11 @@ package streaming import ( "fmt" - satypes "github.com/dydxprotocol/v4-chain/protocol/x/subaccounts/types" "sync" "time" + satypes "github.com/dydxprotocol/v4-chain/protocol/x/subaccounts/types" + "cosmossdk.io/log" sdk "github.com/cosmos/cosmos-sdk/types" "github.com/dydxprotocol/v4-chain/protocol/lib/metrics" @@ -44,6 +45,10 @@ type FullNodeStreamingManagerImpl struct { maxUpdatesInCache uint32 maxSubscriptionChannelSize uint32 + + // Block interval in which snapshot info should be sent out in. + // Defaults to 0, which means only one snapshot will be sent out. + snapshotBlockInterval uint32 } // OrderbookSubscription represents a active subscription to the orderbook updates stream. @@ -51,7 +56,7 @@ type OrderbookSubscription struct { subscriptionId uint32 // Initialize the subscription with orderbook snapshots. - initialize sync.Once + initialize *sync.Once // Clob pair ids to subscribe to. clobPairIds []uint32 @@ -64,6 +69,10 @@ type OrderbookSubscription struct { // Channel to buffer writes before the stream updatesChannel chan []clobtypes.StreamUpdate + + // If interval snapshots are turned on, the next block height at which + // a snapshot should be sent out. + nextSnapshotBlock uint32 } func NewFullNodeStreamingManager( @@ -71,6 +80,7 @@ func NewFullNodeStreamingManager( flushIntervalMs uint32, maxUpdatesInCache uint32, maxSubscriptionChannelSize uint32, + snapshotBlockInterval uint32, ) *FullNodeStreamingManagerImpl { logger = logger.With(log.ModuleKey, "full-node-streaming") fullNodeStreamingManager := &FullNodeStreamingManagerImpl{ @@ -87,6 +97,7 @@ func NewFullNodeStreamingManager( maxUpdatesInCache: maxUpdatesInCache, maxSubscriptionChannelSize: maxSubscriptionChannelSize, + snapshotBlockInterval: snapshotBlockInterval, } // Start the goroutine for pushing order updates through. @@ -149,6 +160,7 @@ func (sm *FullNodeStreamingManagerImpl) Subscribe( } subscription := &OrderbookSubscription{ subscriptionId: sm.nextSubscriptionId, + initialize: &sync.Once{}, clobPairIds: clobPairIds, subaccountIds: sIds, messageSender: messageSender, @@ -674,7 +686,15 @@ func (sm *FullNodeStreamingManagerImpl) InitializeNewStreams( sm.FlushStreamUpdatesWithLock() updatesByClobPairId := make(map[uint32]*clobtypes.OffchainUpdates) + for subscriptionId, subscription := range sm.orderbookSubscriptions { + // If the snapshot block interval is enabled, reset the sync.Once in order to + // re-send snapshots out. + if sm.snapshotBlockInterval > 0 && + blockHeight == subscription.nextSnapshotBlock { + subscription.initialize = &sync.Once{} + } + subscription.initialize.Do( func() { allUpdates := clobtypes.NewOffchainUpdates() @@ -688,8 +708,10 @@ func (sm *FullNodeStreamingManagerImpl) InitializeNewStreams( for _, subaccountId := range subscription.subaccountIds { saUpdates = append(saUpdates, getSubaccountSnapshot(subaccountId)) } - sm.SendCombinedSnapshot(allUpdates, saUpdates, subscriptionId, blockHeight, execMode) + if sm.snapshotBlockInterval != 0 { + subscription.nextSnapshotBlock = blockHeight + sm.snapshotBlockInterval + } }, ) }