Skip to content

Commit

Permalink
Restart IR services after neo-go is down (#2406)
Browse files Browse the repository at this point in the history
  • Loading branch information
cthulhu-rider committed Jul 3, 2023
2 parents 27dc0a6 + 8bbbec0 commit c408b57
Show file tree
Hide file tree
Showing 6 changed files with 116 additions and 53 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ Changelog for NeoFS Node
- `renew-domain` command for adm

### Fixed
- `neo-go` RPC connection lost handling by IR (#1337)

### Removed
- Deprecated `morph.rpc_endpoint` SN and `morph.endpoint.client` IR config sections (#2400)
Expand Down
10 changes: 10 additions & 0 deletions pkg/innerring/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,16 @@ func (s *innerRingIndexer) update() (ind indexes, err error) {
return s.ind, nil
}

func (s *innerRingIndexer) reset() {
s.Lock()
defer s.Unlock()

// zero time, every real time is expected to
// be _much later_ after that time; `update`
// will be forced to make RPC calls
s.lastAccess = time.Time{}
}

func (s *innerRingIndexer) InnerRingIndex() (int32, error) {
ind, err := s.update()
if err != nil {
Expand Down
111 changes: 75 additions & 36 deletions pkg/innerring/innerring.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,17 +71,20 @@ type (
blockTimers []*timer.BlockTimer
epochTimer *timer.BlockTimer

// global state
morphClient *client.Client
mainnetClient *client.Client
auditClient *auditClient.Client
balanceClient *balanceClient.Client
netmapClient *nmClient.Client

auditTaskManager *audittask.Manager

// global state
epochCounter atomic.Uint64
epochDuration atomic.Uint64
statusIndex *innerRingIndexer
precision precision.Fixed8Converter
auditClient *auditClient.Client
precision uint32 // not changeable
healthStatus atomic.Value
balanceClient *balanceClient.Client
netmapClient *nmClient.Client
persistate *state.PersistentStorage

// metrics
Expand All @@ -96,7 +99,7 @@ type (
pubKey []byte
contracts *contracts
predefinedValidators keys.PublicKeys
initialEpochTickDelta uint32
initialEpochTickDelta atomic.Uint32
withoutMainNet bool

// runtime processors
Expand Down Expand Up @@ -207,7 +210,9 @@ func (s *Server) Start(ctx context.Context, intError chan<- error) (err error) {

// tick initial epoch
initialEpochTicker := timer.NewOneTickTimer(
timer.StaticBlockMeter(s.initialEpochTickDelta),
func() (uint32, error) {
return s.initialEpochTickDelta.Load(), nil
},
func() {
s.netmapProcessor.HandleNewEpochTick(timerEvent.NewEpochTick{})
})
Expand Down Expand Up @@ -339,7 +344,7 @@ func New(ctx context.Context, log *logger.Logger, cfg *viper.Viper, errChan chan
log.Warn("can't get last processed side chain block number", zap.String("error", err.Error()))
}

morphChain := &chainParams{
morphChain := chainParams{
log: log,
cfg: cfg,
name: morphPrefix,
Expand Down Expand Up @@ -434,7 +439,7 @@ func New(ctx context.Context, log *logger.Logger, cfg *viper.Viper, errChan chan
server.key = acc.PrivateKey()
morphChain.key = server.key

server.morphClient, err = createClient(ctx, morphChain, errChan)
server.morphClient, err = server.createClient(ctx, morphChain, errChan)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -470,7 +475,7 @@ func New(ctx context.Context, log *logger.Logger, cfg *viper.Viper, errChan chan
mainnetChain.from = fromMainChainBlock

// create mainnet client
server.mainnetClient, err = createClient(ctx, mainnetChain, errChan)
server.mainnetClient, err = server.createClient(ctx, mainnetChain, errChan)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -535,13 +540,7 @@ func New(ctx context.Context, log *logger.Logger, cfg *viper.Viper, errChan chan
return nil, err
}

// form morph container client's options
morphCnrOpts := make([]cntClient.Option, 0, 3)
morphCnrOpts = append(morphCnrOpts,
cntClient.AsAlphabet(),
)

cnrClient, err := cntClient.NewFromMorph(server.morphClient, server.contracts.container, 0, morphCnrOpts...)
cnrClient, err := cntClient.NewFromMorph(server.morphClient, server.contracts.container, 0, cntClient.AsAlphabet())
if err != nil {
return nil, err
}
Expand All @@ -556,6 +555,11 @@ func New(ctx context.Context, log *logger.Logger, cfg *viper.Viper, errChan chan
return nil, err
}

server.precision, err = server.balanceClient.Decimals()
if err != nil {
return nil, fmt.Errorf("can't read balance contract precision: %w", err)
}

repClient, err := repClient.NewFromMorph(server.morphClient, server.contracts.reputation, 0, repClient.AsAlphabet())
if err != nil {
return nil, err
Expand Down Expand Up @@ -604,7 +608,7 @@ func New(ctx context.Context, log *logger.Logger, cfg *viper.Viper, errChan chan
porPoolSize := cfg.GetInt("audit.por.pool_size")

// create audit processor dependencies
auditTaskManager := audittask.New(
server.auditTaskManager = audittask.New(
audittask.WithQueueCapacity(cfg.GetUint32("audit.task.queue_capacity")),
audittask.WithWorkerPool(auditPool),
audittask.WithLogger(log),
Expand All @@ -618,7 +622,7 @@ func New(ctx context.Context, log *logger.Logger, cfg *viper.Viper, errChan chan
}),
)

server.workers = append(server.workers, auditTaskManager.Listen)
server.workers = append(server.workers, server.auditTaskManager.Listen)

// create audit processor
auditProcessor, err := audit.New(&audit.Params{
Expand All @@ -630,7 +634,7 @@ func New(ctx context.Context, log *logger.Logger, cfg *viper.Viper, errChan chan
SGSource: clientCache,
Key: &server.key.PrivateKey,
RPCSearchTimeout: cfg.GetDuration("audit.timeout.search"),
TaskManager: auditTaskManager,
TaskManager: server.auditTaskManager,
Reporter: server,
})
if err != nil {
Expand Down Expand Up @@ -776,14 +780,16 @@ func New(ctx context.Context, log *logger.Logger, cfg *viper.Viper, errChan chan
return nil, err
}

precisionConverter := precision.NewConverter(server.precision)

// create balance processor
balanceProcessor, err := balance.New(&balance.Params{
Log: log,
PoolSize: cfg.GetInt("workers.balance"),
NeoFSClient: neofsCli,
BalanceSC: server.contracts.balance,
AlphabetState: server,
Converter: &server.precision,
Converter: precisionConverter,
})
if err != nil {
return nil, err
Expand All @@ -806,7 +812,7 @@ func New(ctx context.Context, log *logger.Logger, cfg *viper.Viper, errChan chan
MorphClient: server.morphClient,
EpochState: server,
AlphabetState: server,
Converter: &server.precision,
Converter: precisionConverter,
MintEmitCacheSize: cfg.GetInt("emit.mint.cache_size"),
MintEmitThreshold: cfg.GetUint64("emit.mint.threshold"),
MintEmitValue: fixedn.Fixed8(cfg.GetInt64("emit.mint.value")),
Expand Down Expand Up @@ -947,7 +953,7 @@ func New(ctx context.Context, log *logger.Logger, cfg *viper.Viper, errChan chan
return server, nil
}

func createListener(ctx context.Context, cli *client.Client, p *chainParams) (event.Listener, error) {
func createListener(ctx context.Context, cli *client.Client, p chainParams) (event.Listener, error) {
// listenerPoolCap is a capacity of a
// worker pool inside the listener. It
// is used to prevent blocking in neo-go:
Expand Down Expand Up @@ -982,7 +988,7 @@ func createListener(ctx context.Context, cli *client.Client, p *chainParams) (ev
return listener, err
}

func createClient(ctx context.Context, p *chainParams, errChan chan<- error) (*client.Client, error) {
func (s *Server) createClient(ctx context.Context, p chainParams, errChan chan<- error) (*client.Client, error) {
endpoints := p.cfg.GetStringSlice(p.name + ".endpoints")
if len(endpoints) == 0 {
return nil, fmt.Errorf("%s chain client endpoints not provided", p.name)
Expand All @@ -997,6 +1003,18 @@ func createClient(ctx context.Context, p *chainParams, errChan chan<- error) (*c
client.WithEndpoints(endpoints),
client.WithReconnectionRetries(p.cfg.GetInt(p.name+".reconnections_number")),
client.WithReconnectionsDelay(p.cfg.GetDuration(p.name+".reconnections_delay")),
client.WithConnSwitchCallback(func() {
var err error

if p.name == morphPrefix {
err = s.restartMorph()
} else {
err = s.restartMainChain()
}
if err != nil {
errChan <- fmt.Errorf("internal services' restart after RPC reconnection to the %s: %w", p.name, err)
}
}),
client.WithConnLostCallback(func() {
errChan <- fmt.Errorf("%s chain connection has been lost", p.name)
}),
Expand Down Expand Up @@ -1041,28 +1059,22 @@ func (s *Server) initConfigFromBlockchain() error {
return fmt.Errorf("can't read epoch duration: %w", err)
}

// get balance precision
balancePrecision, err := s.balanceClient.Decimals()
// get next epoch delta tick
delta, err := s.nextEpochBlockDelta()
if err != nil {
return fmt.Errorf("can't read balance contract precision: %w", err)
return err
}

s.epochCounter.Store(epoch)
s.epochDuration.Store(epochDuration)
s.precision.SetBalancePrecision(balancePrecision)

// get next epoch delta tick
s.initialEpochTickDelta, err = s.nextEpochBlockDelta()
if err != nil {
return err
}
s.initialEpochTickDelta.Store(delta)

s.log.Debug("read config from blockchain",
zap.Bool("active", s.IsActive()),
zap.Bool("alphabet", s.IsAlphabet()),
zap.Uint64("epoch", epoch),
zap.Uint32("precision", balancePrecision),
zap.Uint32("init_epoch_tick_delta", s.initialEpochTickDelta),
zap.Uint32("precision", s.precision),
zap.Uint32("init_epoch_tick_delta", delta),
)

return nil
Expand Down Expand Up @@ -1116,3 +1128,30 @@ func (s *Server) newEpochTickHandlers() []newEpochHandler {

return newEpochHandlers
}

func (s *Server) restartMorph() error {
s.log.Info("restarting internal services because of RPC connection loss...")

s.auditTaskManager.Reset()
s.statusIndex.reset()

err := s.initConfigFromBlockchain()
if err != nil {
return fmt.Errorf("side chain config reinitialization: %w", err)
}

for _, t := range s.blockTimers {
err = t.Reset()
if err != nil {
return fmt.Errorf("could not reset block timers: %w", err)
}
}

s.log.Info("internal services have been restarted after RPC connection loss...")

return nil
}

func (s *Server) restartMainChain() error {
return nil
}
11 changes: 11 additions & 0 deletions pkg/morph/client/constructor.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type cfg struct {
singleCli *rpcclient.WSClient // neo-go client for single client mode

inactiveModeCb Callback
rpcSwitchCb Callback

reconnectionRetries int
reconnectionDelay time.Duration
Expand Down Expand Up @@ -297,3 +298,13 @@ func WithConnLostCallback(cb Callback) Option {
c.inactiveModeCb = cb
}
}

// WithConnSwitchCallback returns a client constructor option
// that specifies a callback that is called when the Client
// reconnected to a new RPC (from [WithEndpoints] list)
// successfully.
func WithConnSwitchCallback(cb Callback) Option {
return func(c *cfg) {
c.rpcSwitchCb = cb
}
}
9 changes: 8 additions & 1 deletion pkg/morph/client/multi.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,28 @@ type Endpoint struct {
// SwitchRPC performs reconnection and returns true if it was successful.
func (c *Client) SwitchRPC() bool {
c.switchLock.Lock()
defer c.switchLock.Unlock()

for attempt := 0; attempt < c.cfg.reconnectionRetries; attempt++ {
if c.switchPRC() {
c.switchLock.Unlock()

if c.cfg.rpcSwitchCb != nil {
c.cfg.rpcSwitchCb()
}

return true
}

select {
case <-time.After(c.cfg.reconnectionDelay):
case <-c.closeChan:
c.switchLock.Unlock()
return false
}
}

c.inactive = true
c.switchLock.Unlock()

if c.cfg.inactiveModeCb != nil {
c.cfg.inactiveModeCb()
Expand Down
27 changes: 11 additions & 16 deletions pkg/util/precision/converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,18 @@ func convert(n, factor *big.Int, decreasePrecision bool) *big.Int {

// NewConverter returns Fixed8Converter.
func NewConverter(precision uint32) Fixed8Converter {
var c Fixed8Converter

c.SetBalancePrecision(precision)
exp := int(precision) - fixed8Precision
if exp < 0 {
exp = -exp
}

return c
return Fixed8Converter{
converter: converter{
base: fixed8Precision,
target: precision,
factor: new(big.Int).SetInt64(int64(math.Pow10(exp))),
},
}
}

func (c converter) toTarget(n *big.Int) *big.Int {
Expand All @@ -64,18 +71,6 @@ func (c Fixed8Converter) ToBalancePrecision(n int64) int64 {
return c.toTarget(new(big.Int).SetInt64(n)).Int64()
}

// SetBalancePrecision prepares converter to work.
func (c *Fixed8Converter) SetBalancePrecision(precision uint32) {
exp := int(precision) - fixed8Precision
if exp < 0 {
exp = -exp
}

c.base = fixed8Precision
c.target = precision
c.factor = new(big.Int).SetInt64(int64(math.Pow10(exp)))
}

// Convert is a wrapper of convert function. Use cached `converter` struct
// if fromPrecision and toPrecision are constant.
func Convert(fromPrecision, toPrecision uint32, n *big.Int) *big.Int {
Expand Down

0 comments on commit c408b57

Please sign in to comment.