diff --git a/CHANGELOG.md b/CHANGELOG.md index d631981caf..ef825fcb30 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ Changelog for NeoFS Node - `renew-domain` command for adm ### Fixed +- `neo-go` RPC connection lost handling by IR (#1337) ### Removed - Deprecated `morph.rpc_endpoint` SN and `morph.endpoint.client` IR config sections (#2400) diff --git a/pkg/innerring/indexer.go b/pkg/innerring/indexer.go index ac5fb93efb..78c50ca29e 100644 --- a/pkg/innerring/indexer.go +++ b/pkg/innerring/indexer.go @@ -82,6 +82,16 @@ func (s *innerRingIndexer) update() (ind indexes, err error) { return s.ind, nil } +func (s *innerRingIndexer) reset() { + s.Lock() + defer s.Unlock() + + // zero time, every real time is expected to + // be _much later_ after that time; `update` + // will be forced to make RPC calls + s.lastAccess = time.Time{} +} + func (s *innerRingIndexer) InnerRingIndex() (int32, error) { ind, err := s.update() if err != nil { diff --git a/pkg/innerring/innerring.go b/pkg/innerring/innerring.go index c77ba34423..5ff0ee3c20 100644 --- a/pkg/innerring/innerring.go +++ b/pkg/innerring/innerring.go @@ -71,17 +71,20 @@ type ( blockTimers []*timer.BlockTimer epochTimer *timer.BlockTimer - // global state morphClient *client.Client mainnetClient *client.Client + auditClient *auditClient.Client + balanceClient *balanceClient.Client + netmapClient *nmClient.Client + + auditTaskManager *audittask.Manager + + // global state epochCounter atomic.Uint64 epochDuration atomic.Uint64 statusIndex *innerRingIndexer - precision precision.Fixed8Converter - auditClient *auditClient.Client + precision uint32 // not changeable healthStatus atomic.Value - balanceClient *balanceClient.Client - netmapClient *nmClient.Client persistate *state.PersistentStorage // metrics @@ -96,7 +99,7 @@ type ( pubKey []byte contracts *contracts predefinedValidators keys.PublicKeys - initialEpochTickDelta uint32 + initialEpochTickDelta atomic.Uint32 withoutMainNet bool // runtime processors @@ -207,7 +210,9 @@ func (s *Server) Start(ctx context.Context, intError chan<- error) (err error) { // tick initial epoch initialEpochTicker := timer.NewOneTickTimer( - timer.StaticBlockMeter(s.initialEpochTickDelta), + func() (uint32, error) { + return s.initialEpochTickDelta.Load(), nil + }, func() { s.netmapProcessor.HandleNewEpochTick(timerEvent.NewEpochTick{}) }) @@ -339,7 +344,7 @@ func New(ctx context.Context, log *logger.Logger, cfg *viper.Viper, errChan chan log.Warn("can't get last processed side chain block number", zap.String("error", err.Error())) } - morphChain := &chainParams{ + morphChain := chainParams{ log: log, cfg: cfg, name: morphPrefix, @@ -434,7 +439,7 @@ func New(ctx context.Context, log *logger.Logger, cfg *viper.Viper, errChan chan server.key = acc.PrivateKey() morphChain.key = server.key - server.morphClient, err = createClient(ctx, morphChain, errChan) + server.morphClient, err = server.createClient(ctx, morphChain, errChan) if err != nil { return nil, err } @@ -470,7 +475,7 @@ func New(ctx context.Context, log *logger.Logger, cfg *viper.Viper, errChan chan mainnetChain.from = fromMainChainBlock // create mainnet client - server.mainnetClient, err = createClient(ctx, mainnetChain, errChan) + server.mainnetClient, err = server.createClient(ctx, mainnetChain, errChan) if err != nil { return nil, err } @@ -535,13 +540,7 @@ func New(ctx context.Context, log *logger.Logger, cfg *viper.Viper, errChan chan return nil, err } - // form morph container client's options - morphCnrOpts := make([]cntClient.Option, 0, 3) - morphCnrOpts = append(morphCnrOpts, - cntClient.AsAlphabet(), - ) - - cnrClient, err := cntClient.NewFromMorph(server.morphClient, server.contracts.container, 0, morphCnrOpts...) + cnrClient, err := cntClient.NewFromMorph(server.morphClient, server.contracts.container, 0, cntClient.AsAlphabet()) if err != nil { return nil, err } @@ -556,6 +555,11 @@ func New(ctx context.Context, log *logger.Logger, cfg *viper.Viper, errChan chan return nil, err } + server.precision, err = server.balanceClient.Decimals() + if err != nil { + return nil, fmt.Errorf("can't read balance contract precision: %w", err) + } + repClient, err := repClient.NewFromMorph(server.morphClient, server.contracts.reputation, 0, repClient.AsAlphabet()) if err != nil { return nil, err @@ -604,7 +608,7 @@ func New(ctx context.Context, log *logger.Logger, cfg *viper.Viper, errChan chan porPoolSize := cfg.GetInt("audit.por.pool_size") // create audit processor dependencies - auditTaskManager := audittask.New( + server.auditTaskManager = audittask.New( audittask.WithQueueCapacity(cfg.GetUint32("audit.task.queue_capacity")), audittask.WithWorkerPool(auditPool), audittask.WithLogger(log), @@ -618,7 +622,7 @@ func New(ctx context.Context, log *logger.Logger, cfg *viper.Viper, errChan chan }), ) - server.workers = append(server.workers, auditTaskManager.Listen) + server.workers = append(server.workers, server.auditTaskManager.Listen) // create audit processor auditProcessor, err := audit.New(&audit.Params{ @@ -630,7 +634,7 @@ func New(ctx context.Context, log *logger.Logger, cfg *viper.Viper, errChan chan SGSource: clientCache, Key: &server.key.PrivateKey, RPCSearchTimeout: cfg.GetDuration("audit.timeout.search"), - TaskManager: auditTaskManager, + TaskManager: server.auditTaskManager, Reporter: server, }) if err != nil { @@ -776,6 +780,8 @@ func New(ctx context.Context, log *logger.Logger, cfg *viper.Viper, errChan chan return nil, err } + precisionConverter := precision.NewConverter(server.precision) + // create balance processor balanceProcessor, err := balance.New(&balance.Params{ Log: log, @@ -783,7 +789,7 @@ func New(ctx context.Context, log *logger.Logger, cfg *viper.Viper, errChan chan NeoFSClient: neofsCli, BalanceSC: server.contracts.balance, AlphabetState: server, - Converter: &server.precision, + Converter: precisionConverter, }) if err != nil { return nil, err @@ -806,7 +812,7 @@ func New(ctx context.Context, log *logger.Logger, cfg *viper.Viper, errChan chan MorphClient: server.morphClient, EpochState: server, AlphabetState: server, - Converter: &server.precision, + Converter: precisionConverter, MintEmitCacheSize: cfg.GetInt("emit.mint.cache_size"), MintEmitThreshold: cfg.GetUint64("emit.mint.threshold"), MintEmitValue: fixedn.Fixed8(cfg.GetInt64("emit.mint.value")), @@ -947,7 +953,7 @@ func New(ctx context.Context, log *logger.Logger, cfg *viper.Viper, errChan chan return server, nil } -func createListener(ctx context.Context, cli *client.Client, p *chainParams) (event.Listener, error) { +func createListener(ctx context.Context, cli *client.Client, p chainParams) (event.Listener, error) { // listenerPoolCap is a capacity of a // worker pool inside the listener. It // is used to prevent blocking in neo-go: @@ -982,7 +988,7 @@ func createListener(ctx context.Context, cli *client.Client, p *chainParams) (ev return listener, err } -func createClient(ctx context.Context, p *chainParams, errChan chan<- error) (*client.Client, error) { +func (s *Server) createClient(ctx context.Context, p chainParams, errChan chan<- error) (*client.Client, error) { endpoints := p.cfg.GetStringSlice(p.name + ".endpoints") if len(endpoints) == 0 { return nil, fmt.Errorf("%s chain client endpoints not provided", p.name) @@ -997,6 +1003,18 @@ func createClient(ctx context.Context, p *chainParams, errChan chan<- error) (*c client.WithEndpoints(endpoints), client.WithReconnectionRetries(p.cfg.GetInt(p.name+".reconnections_number")), client.WithReconnectionsDelay(p.cfg.GetDuration(p.name+".reconnections_delay")), + client.WithConnSwitchCallback(func() { + var err error + + if p.name == morphPrefix { + err = s.restartMorph() + } else { + err = s.restartMainChain() + } + if err != nil { + errChan <- fmt.Errorf("internal services' restart after RPC reconnection to the %s: %w", p.name, err) + } + }), client.WithConnLostCallback(func() { errChan <- fmt.Errorf("%s chain connection has been lost", p.name) }), @@ -1041,28 +1059,22 @@ func (s *Server) initConfigFromBlockchain() error { return fmt.Errorf("can't read epoch duration: %w", err) } - // get balance precision - balancePrecision, err := s.balanceClient.Decimals() + // get next epoch delta tick + delta, err := s.nextEpochBlockDelta() if err != nil { - return fmt.Errorf("can't read balance contract precision: %w", err) + return err } s.epochCounter.Store(epoch) s.epochDuration.Store(epochDuration) - s.precision.SetBalancePrecision(balancePrecision) - - // get next epoch delta tick - s.initialEpochTickDelta, err = s.nextEpochBlockDelta() - if err != nil { - return err - } + s.initialEpochTickDelta.Store(delta) s.log.Debug("read config from blockchain", zap.Bool("active", s.IsActive()), zap.Bool("alphabet", s.IsAlphabet()), zap.Uint64("epoch", epoch), - zap.Uint32("precision", balancePrecision), - zap.Uint32("init_epoch_tick_delta", s.initialEpochTickDelta), + zap.Uint32("precision", s.precision), + zap.Uint32("init_epoch_tick_delta", delta), ) return nil @@ -1116,3 +1128,30 @@ func (s *Server) newEpochTickHandlers() []newEpochHandler { return newEpochHandlers } + +func (s *Server) restartMorph() error { + s.log.Info("restarting internal services because of RPC connection loss...") + + s.auditTaskManager.Reset() + s.statusIndex.reset() + + err := s.initConfigFromBlockchain() + if err != nil { + return fmt.Errorf("side chain config reinitialization: %w", err) + } + + for _, t := range s.blockTimers { + err = t.Reset() + if err != nil { + return fmt.Errorf("could not reset block timers: %w", err) + } + } + + s.log.Info("internal services have been restarted after RPC connection loss...") + + return nil +} + +func (s *Server) restartMainChain() error { + return nil +} diff --git a/pkg/morph/client/constructor.go b/pkg/morph/client/constructor.go index 3b5919adcb..03638a17dd 100644 --- a/pkg/morph/client/constructor.go +++ b/pkg/morph/client/constructor.go @@ -42,6 +42,7 @@ type cfg struct { singleCli *rpcclient.WSClient // neo-go client for single client mode inactiveModeCb Callback + rpcSwitchCb Callback reconnectionRetries int reconnectionDelay time.Duration @@ -297,3 +298,13 @@ func WithConnLostCallback(cb Callback) Option { c.inactiveModeCb = cb } } + +// WithConnSwitchCallback returns a client constructor option +// that specifies a callback that is called when the Client +// reconnected to a new RPC (from [WithEndpoints] list) +// successfully. +func WithConnSwitchCallback(cb Callback) Option { + return func(c *cfg) { + c.rpcSwitchCb = cb + } +} diff --git a/pkg/morph/client/multi.go b/pkg/morph/client/multi.go index 1b5c21a56c..8b7b7aa155 100644 --- a/pkg/morph/client/multi.go +++ b/pkg/morph/client/multi.go @@ -15,21 +15,28 @@ type Endpoint struct { // SwitchRPC performs reconnection and returns true if it was successful. func (c *Client) SwitchRPC() bool { c.switchLock.Lock() - defer c.switchLock.Unlock() for attempt := 0; attempt < c.cfg.reconnectionRetries; attempt++ { if c.switchPRC() { + c.switchLock.Unlock() + + if c.cfg.rpcSwitchCb != nil { + c.cfg.rpcSwitchCb() + } + return true } select { case <-time.After(c.cfg.reconnectionDelay): case <-c.closeChan: + c.switchLock.Unlock() return false } } c.inactive = true + c.switchLock.Unlock() if c.cfg.inactiveModeCb != nil { c.cfg.inactiveModeCb() diff --git a/pkg/util/precision/converter.go b/pkg/util/precision/converter.go index bd2948f0a4..a7b80468ef 100644 --- a/pkg/util/precision/converter.go +++ b/pkg/util/precision/converter.go @@ -39,11 +39,18 @@ func convert(n, factor *big.Int, decreasePrecision bool) *big.Int { // NewConverter returns Fixed8Converter. func NewConverter(precision uint32) Fixed8Converter { - var c Fixed8Converter - - c.SetBalancePrecision(precision) + exp := int(precision) - fixed8Precision + if exp < 0 { + exp = -exp + } - return c + return Fixed8Converter{ + converter: converter{ + base: fixed8Precision, + target: precision, + factor: new(big.Int).SetInt64(int64(math.Pow10(exp))), + }, + } } func (c converter) toTarget(n *big.Int) *big.Int { @@ -64,18 +71,6 @@ func (c Fixed8Converter) ToBalancePrecision(n int64) int64 { return c.toTarget(new(big.Int).SetInt64(n)).Int64() } -// SetBalancePrecision prepares converter to work. -func (c *Fixed8Converter) SetBalancePrecision(precision uint32) { - exp := int(precision) - fixed8Precision - if exp < 0 { - exp = -exp - } - - c.base = fixed8Precision - c.target = precision - c.factor = new(big.Int).SetInt64(int64(math.Pow10(exp))) -} - // Convert is a wrapper of convert function. Use cached `converter` struct // if fromPrecision and toPrecision are constant. func Convert(fromPrecision, toPrecision uint32, n *big.Int) *big.Int {