Skip to content

Commit

Permalink
ir: Reload internals on connection switch
Browse files Browse the repository at this point in the history
Update cached side chain configurations, reset timers and audit tasks queue.

Signed-off-by: Pavel Karpy <[email protected]>
  • Loading branch information
notimetoname committed Jun 26, 2023
1 parent 429e23a commit dec2cf0
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 21 deletions.
85 changes: 65 additions & 20 deletions pkg/innerring/innerring.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,17 +73,19 @@ type (
blockTimers []*timer.BlockTimer
epochTimer *timer.BlockTimer

morphClient *client.Client
mainnetClient *client.Client
auditClient *auditClient.Client
balanceClient *balanceClient.Client
netmapClient *nmClient.Client
auditTaskManager *audittask.Manager

// global state
morphClient *client.Client
mainnetClient *client.Client
epochCounter atomic.Uint64
epochDuration atomic.Uint64
statusIndex *innerRingIndexer
precision precision.Fixed8Converter
auditClient *auditClient.Client
healthStatus atomic.Value
balanceClient *balanceClient.Client
netmapClient *nmClient.Client
persistate *state.PersistentStorage

// metrics
Expand All @@ -98,7 +100,7 @@ type (
pubKey []byte
contracts *contracts
predefinedValidators keys.PublicKeys
initialEpochTickDelta uint32
initialEpochTickDelta atomic.Uint32
withoutMainNet bool

// runtime processors
Expand Down Expand Up @@ -211,7 +213,9 @@ func (s *Server) Start(ctx context.Context, intError chan<- error) (err error) {

// tick initial epoch
initialEpochTicker := timer.NewOneTickTimer(
timer.StaticBlockMeter(s.initialEpochTickDelta),
func() (uint32, error) {
return s.initialEpochTickDelta.Load(), nil
},
func() {
s.netmapProcessor.HandleNewEpochTick(timerEvent.NewEpochTick{})
})
Expand Down Expand Up @@ -438,7 +442,7 @@ func New(ctx context.Context, log *logger.Logger, cfg *viper.Viper, errChan chan
server.key = acc.PrivateKey()
morphChain.key = server.key

server.morphClient, err = createClient(ctx, morphChain, errChan)
server.morphClient, err = server.createClient(ctx, morphChain, errChan)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -474,7 +478,7 @@ func New(ctx context.Context, log *logger.Logger, cfg *viper.Viper, errChan chan
mainnetChain.from = fromMainChainBlock

// create mainnet client
server.mainnetClient, err = createClient(ctx, mainnetChain, errChan)
server.mainnetClient, err = server.createClient(ctx, mainnetChain, errChan)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -616,7 +620,7 @@ func New(ctx context.Context, log *logger.Logger, cfg *viper.Viper, errChan chan
porPoolSize := cfg.GetInt("audit.por.pool_size")

// create audit processor dependencies
auditTaskManager := audittask.New(
server.auditTaskManager = audittask.New(
audittask.WithQueueCapacity(cfg.GetUint32("audit.task.queue_capacity")),
audittask.WithWorkerPool(auditPool),
audittask.WithLogger(log),
Expand All @@ -630,7 +634,7 @@ func New(ctx context.Context, log *logger.Logger, cfg *viper.Viper, errChan chan
}),
)

server.workers = append(server.workers, auditTaskManager.Listen)
server.workers = append(server.workers, server.auditTaskManager.Listen)

// create audit processor
auditProcessor, err := audit.New(&audit.Params{
Expand All @@ -642,7 +646,7 @@ func New(ctx context.Context, log *logger.Logger, cfg *viper.Viper, errChan chan
SGSource: clientCache,
Key: &server.key.PrivateKey,
RPCSearchTimeout: cfg.GetDuration("audit.timeout.search"),
TaskManager: auditTaskManager,
TaskManager: server.auditTaskManager,
Reporter: server,
})
if err != nil {
Expand Down Expand Up @@ -1011,8 +1015,9 @@ func createListener(ctx context.Context, cli *client.Client, p *chainParams) (ev
return listener, err
}

func createClient(ctx context.Context, p *chainParams, errChan chan<- error) (*client.Client, error) {
endpoints := p.cfg.GetStringSlice(p.name + ".endpoints")
func (s *Server) createClient(ctx context.Context, p *chainParams, errChan chan<- error) (*client.Client, error) {
name := p.name
endpoints := p.cfg.GetStringSlice(name + ".endpoints")
if len(endpoints) == 0 {
return nil, fmt.Errorf("%s chain client endpoints not provided", p.name)
}
Expand All @@ -1026,6 +1031,18 @@ func createClient(ctx context.Context, p *chainParams, errChan chan<- error) (*c
client.WithEndpoints(endpoints),
client.WithReconnectionRetries(p.cfg.GetInt(p.name+".reconnections_number")),
client.WithReconnectionsDelay(p.cfg.GetDuration(p.name+".reconnections_delay")),
client.WithConnSwitchCallback(func() {
var err error

if name == morphPrefix {
err = s.restartMorph()
} else {
err = s.restartMainChain()
}
if err != nil {
errChan <- fmt.Errorf("RPC connection switch to %s: %w", name, err)
}
}),
client.WithConnLostCallback(func() {
errChan <- fmt.Errorf("%s chain connection has been lost", p.name)
}),
Expand Down Expand Up @@ -1076,22 +1093,23 @@ func (s *Server) initConfigFromBlockchain() error {
return fmt.Errorf("can't read balance contract precision: %w", err)
}

s.epochCounter.Store(epoch)
s.epochDuration.Store(epochDuration)
s.precision.SetBalancePrecision(balancePrecision)

// get next epoch delta tick
s.initialEpochTickDelta, err = s.nextEpochBlockDelta()
delta, err := s.nextEpochBlockDelta()
if err != nil {
return err
}

s.epochCounter.Store(epoch)
s.epochDuration.Store(epochDuration)
s.precision.SetBalancePrecision(balancePrecision)
s.initialEpochTickDelta.Store(delta)

s.log.Debug("read config from blockchain",
zap.Bool("active", s.IsActive()),
zap.Bool("alphabet", s.IsAlphabet()),
zap.Uint64("epoch", epoch),
zap.Uint32("precision", balancePrecision),
zap.Uint32("init_epoch_tick_delta", s.initialEpochTickDelta),
zap.Uint32("init_epoch_tick_delta", delta),
)

return nil
Expand Down Expand Up @@ -1145,3 +1163,30 @@ func (s *Server) newEpochTickHandlers() []newEpochHandler {

return newEpochHandlers
}

func (s *Server) restartMorph() error {
s.log.Info("restarting internals because of RPC connection loss...")

s.auditTaskManager.Reset()
s.statusIndex.reset()

err := s.initConfigFromBlockchain()
if err != nil {
return fmt.Errorf("side chain config reinitialization: %w", err)
}

for _, t := range s.blockTimers {
err = t.Reset()
if err != nil {
return fmt.Errorf("could not reset block timers: %w", err)
}
}

s.log.Info("internals have been restarted after RPC connection loss...")

return nil
}

func (s *Server) restartMainChain() error {
return nil
}
2 changes: 1 addition & 1 deletion pkg/util/precision/converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ type (
// This is a JSON bound that uses neo node. Neo-go has int64 limit for
// `smartcontract.Parameter` of integer type.
Fixed8Converter struct {
m *sync.RWMutex
m sync.RWMutex
converter
}
)
Expand Down

0 comments on commit dec2cf0

Please sign in to comment.