From dec2cf076c1d33720f9ddccedda3a7e196561d27 Mon Sep 17 00:00:00 2001 From: Pavel Karpy Date: Tue, 27 Jun 2023 02:27:03 +0300 Subject: [PATCH] ir: Reload internals on connection switch Update cached side chain configurations, reset timers and audit tasks queue. Signed-off-by: Pavel Karpy --- pkg/innerring/innerring.go | 85 +++++++++++++++++++++++++-------- pkg/util/precision/converter.go | 2 +- 2 files changed, 66 insertions(+), 21 deletions(-) diff --git a/pkg/innerring/innerring.go b/pkg/innerring/innerring.go index 6ad17ac6e8c..4e4f6363399 100644 --- a/pkg/innerring/innerring.go +++ b/pkg/innerring/innerring.go @@ -73,17 +73,19 @@ type ( blockTimers []*timer.BlockTimer epochTimer *timer.BlockTimer + morphClient *client.Client + mainnetClient *client.Client + auditClient *auditClient.Client + balanceClient *balanceClient.Client + netmapClient *nmClient.Client + auditTaskManager *audittask.Manager + // global state - morphClient *client.Client - mainnetClient *client.Client epochCounter atomic.Uint64 epochDuration atomic.Uint64 statusIndex *innerRingIndexer precision precision.Fixed8Converter - auditClient *auditClient.Client healthStatus atomic.Value - balanceClient *balanceClient.Client - netmapClient *nmClient.Client persistate *state.PersistentStorage // metrics @@ -98,7 +100,7 @@ type ( pubKey []byte contracts *contracts predefinedValidators keys.PublicKeys - initialEpochTickDelta uint32 + initialEpochTickDelta atomic.Uint32 withoutMainNet bool // runtime processors @@ -211,7 +213,9 @@ func (s *Server) Start(ctx context.Context, intError chan<- error) (err error) { // tick initial epoch initialEpochTicker := timer.NewOneTickTimer( - timer.StaticBlockMeter(s.initialEpochTickDelta), + func() (uint32, error) { + return s.initialEpochTickDelta.Load(), nil + }, func() { s.netmapProcessor.HandleNewEpochTick(timerEvent.NewEpochTick{}) }) @@ -438,7 +442,7 @@ func New(ctx context.Context, log *logger.Logger, cfg *viper.Viper, errChan chan server.key = acc.PrivateKey() morphChain.key = server.key - server.morphClient, err = createClient(ctx, morphChain, errChan) + server.morphClient, err = server.createClient(ctx, morphChain, errChan) if err != nil { return nil, err } @@ -474,7 +478,7 @@ func New(ctx context.Context, log *logger.Logger, cfg *viper.Viper, errChan chan mainnetChain.from = fromMainChainBlock // create mainnet client - server.mainnetClient, err = createClient(ctx, mainnetChain, errChan) + server.mainnetClient, err = server.createClient(ctx, mainnetChain, errChan) if err != nil { return nil, err } @@ -616,7 +620,7 @@ func New(ctx context.Context, log *logger.Logger, cfg *viper.Viper, errChan chan porPoolSize := cfg.GetInt("audit.por.pool_size") // create audit processor dependencies - auditTaskManager := audittask.New( + server.auditTaskManager = audittask.New( audittask.WithQueueCapacity(cfg.GetUint32("audit.task.queue_capacity")), audittask.WithWorkerPool(auditPool), audittask.WithLogger(log), @@ -630,7 +634,7 @@ func New(ctx context.Context, log *logger.Logger, cfg *viper.Viper, errChan chan }), ) - server.workers = append(server.workers, auditTaskManager.Listen) + server.workers = append(server.workers, server.auditTaskManager.Listen) // create audit processor auditProcessor, err := audit.New(&audit.Params{ @@ -642,7 +646,7 @@ func New(ctx context.Context, log *logger.Logger, cfg *viper.Viper, errChan chan SGSource: clientCache, Key: &server.key.PrivateKey, RPCSearchTimeout: cfg.GetDuration("audit.timeout.search"), - TaskManager: auditTaskManager, + TaskManager: server.auditTaskManager, Reporter: server, }) if err != nil { @@ -1011,8 +1015,9 @@ func createListener(ctx context.Context, cli *client.Client, p *chainParams) (ev return listener, err } -func createClient(ctx context.Context, p *chainParams, errChan chan<- error) (*client.Client, error) { - endpoints := p.cfg.GetStringSlice(p.name + ".endpoints") +func (s *Server) createClient(ctx context.Context, p *chainParams, errChan chan<- error) (*client.Client, error) { + name := p.name + endpoints := p.cfg.GetStringSlice(name + ".endpoints") if len(endpoints) == 0 { return nil, fmt.Errorf("%s chain client endpoints not provided", p.name) } @@ -1026,6 +1031,18 @@ func createClient(ctx context.Context, p *chainParams, errChan chan<- error) (*c client.WithEndpoints(endpoints), client.WithReconnectionRetries(p.cfg.GetInt(p.name+".reconnections_number")), client.WithReconnectionsDelay(p.cfg.GetDuration(p.name+".reconnections_delay")), + client.WithConnSwitchCallback(func() { + var err error + + if name == morphPrefix { + err = s.restartMorph() + } else { + err = s.restartMainChain() + } + if err != nil { + errChan <- fmt.Errorf("RPC connection switch to %s: %w", name, err) + } + }), client.WithConnLostCallback(func() { errChan <- fmt.Errorf("%s chain connection has been lost", p.name) }), @@ -1076,22 +1093,23 @@ func (s *Server) initConfigFromBlockchain() error { return fmt.Errorf("can't read balance contract precision: %w", err) } - s.epochCounter.Store(epoch) - s.epochDuration.Store(epochDuration) - s.precision.SetBalancePrecision(balancePrecision) - // get next epoch delta tick - s.initialEpochTickDelta, err = s.nextEpochBlockDelta() + delta, err := s.nextEpochBlockDelta() if err != nil { return err } + s.epochCounter.Store(epoch) + s.epochDuration.Store(epochDuration) + s.precision.SetBalancePrecision(balancePrecision) + s.initialEpochTickDelta.Store(delta) + s.log.Debug("read config from blockchain", zap.Bool("active", s.IsActive()), zap.Bool("alphabet", s.IsAlphabet()), zap.Uint64("epoch", epoch), zap.Uint32("precision", balancePrecision), - zap.Uint32("init_epoch_tick_delta", s.initialEpochTickDelta), + zap.Uint32("init_epoch_tick_delta", delta), ) return nil @@ -1145,3 +1163,30 @@ func (s *Server) newEpochTickHandlers() []newEpochHandler { return newEpochHandlers } + +func (s *Server) restartMorph() error { + s.log.Info("restarting internals because of RPC connection loss...") + + s.auditTaskManager.Reset() + s.statusIndex.reset() + + err := s.initConfigFromBlockchain() + if err != nil { + return fmt.Errorf("side chain config reinitialization: %w", err) + } + + for _, t := range s.blockTimers { + err = t.Reset() + if err != nil { + return fmt.Errorf("could not reset block timers: %w", err) + } + } + + s.log.Info("internals have been restarted after RPC connection loss...") + + return nil +} + +func (s *Server) restartMainChain() error { + return nil +} diff --git a/pkg/util/precision/converter.go b/pkg/util/precision/converter.go index d904c797424..e90000d2d2e 100644 --- a/pkg/util/precision/converter.go +++ b/pkg/util/precision/converter.go @@ -22,7 +22,7 @@ type ( // This is a JSON bound that uses neo node. Neo-go has int64 limit for // `smartcontract.Parameter` of integer type. Fixed8Converter struct { - m *sync.RWMutex + m sync.RWMutex converter } )