From bac9f138dacefd3d84da85babc02d22419a34317 Mon Sep 17 00:00:00 2001
From: Sameh Abouel-saad
Date: Sun, 17 Nov 2024 23:43:41 +0200
Subject: [PATCH] feat: Implement connection manager
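
Replace the single-connection, round-robin manager with a connection
manager that keeps a pool of tfchain RPC connections. Connections are
created lazily up to MaxPoolSize, reused across callers, health-checked
on demand, and closed again when idle (down to MinPoolSize). The new
entry points are Manager.GetConnection and Substrate.Release; the old
Raw(), Substrate() and Close() methods are kept as deprecated wrappers.

Illustrative usage of the new API from a caller's point of view; the
endpoint URL and the import path below are placeholders, not part of
this patch:

    package main

    import (
        "context"
        "log"

        // assumed import path of this client package
        substrate "github.com/threefoldtech/tfchain/clients/tfchain-client-go"
    )

    func main() {
        // one manager per process; it owns the connection pool
        mgr := substrate.NewManager("wss://tfchain.example.com")
        defer mgr.Close()

        // borrow a pooled connection
        sub, err := mgr.GetConnection(context.Background())
        if err != nil {
            log.Fatal(err)
        }
        // hand it back to the pool instead of closing it
        defer sub.Release()

        t, err := sub.Time()
        if err != nil {
            log.Fatal(err)
        }
        log.Println("chain time:", t)
    }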
---
 clients/tfchain-client-go/impl.go      | 518 ++++++++++++++++++++-----
 clients/tfchain-client-go/impl_test.go | 193 +++++++++
 clients/tfchain-client-go/transfer.go  |   3 -
 3 files changed, 604 insertions(+), 110 deletions(-)
 create mode 100644 clients/tfchain-client-go/impl_test.go

diff --git a/clients/tfchain-client-go/impl.go b/clients/tfchain-client-go/impl.go
index b6911a932..1031e76ca 100644
--- a/clients/tfchain-client-go/impl.go
+++ b/clients/tfchain-client-go/impl.go
@@ -1,9 +1,12 @@
 package substrate

 import (
+	"context"
 	"fmt"
 	"math/rand"
+	"slices"
 	"sync"
+	"sync/atomic"
 	"time"

 	"github.com/cenkalti/backoff"
@@ -13,20 +16,16 @@ import (
 	"github.com/rs/zerolog/log"
 )

-const (
-	// acceptable delay is amount of blocks (in second) that a node can
-	// be behind before we don't accept it. block time is 6 seconds, so
-	// right now we only allow 2 blocks delay
-	acceptableDelay = 2 * 6 * time.Second
+var (
+	ErrInvalidVersion         = fmt.Errorf("invalid version")
+	ErrUnknownVersion         = fmt.Errorf("unknown version")
+	ErrNotFound               = fmt.Errorf("object not found")
+	ErrNoConnectionsAvailable = fmt.Errorf("no healthy connections available")
+	ErrMaxPoolSizeReached     = fmt.Errorf("max pool size reached")
 )

-var (
-	//ErrInvalidVersion is returned if version 4bytes is invalid
-	ErrInvalidVersion = fmt.Errorf("invalid version")
-	//ErrUnknownVersion is returned if version number is not supported
-	ErrUnknownVersion = fmt.Errorf("unknown version")
-	//ErrNotFound is returned if an object is not found
-	ErrNotFound = fmt.Errorf("object not found")
+const (
+	AcceptableDelay = 2 * 6 * time.Second
 )

 // Versioned base for all types
@@ -37,145 +36,445 @@ type Versioned struct {
 type Conn = *gsrpc.SubstrateAPI
 type Meta = *types.Metadata

+// Pool connection
+type poolConn struct {
+	conn     Conn
+	meta     Meta
+	url      string
+	lastUsed atomic.Int64 // Unix timestamp
+	inUse    atomic.Bool
+}
+
+func (pc *poolConn) isHealthy() bool {
+	if pc == nil || pc.conn == nil || pc.meta == nil {
+		return false
+	}
+	_, err := getTime(pc.conn, pc.meta)
+	return err == nil
+}
+
+func (pc *poolConn) close() {
+	if pc != nil && pc.conn != nil {
+		pc.conn.Client.Close()
+		pc.conn = nil
+		pc.meta = nil
+		log.Debug().Str("url", pc.url).Msg("closed connection")
+	}
+}
+
 type Manager interface {
+	GetConnection(ctx context.Context) (*Substrate, error)
+	Close() error
+
+	// Deprecated methods
 	Raw() (Conn, Meta, error)
 	Substrate() (*Substrate, error)
 }

-type mgrImpl struct {
-	urls []string
+type ManagerConfig struct {
+	// Maximum number of connections in the pool
+	MaxPoolSize int
+	// Minimum number of connections to maintain
+	MinPoolSize int
+	// Maximum time a connection can be idle before being closed (if the pool has more than MinPoolSize)
+	MaxIdleTime time.Duration
+	// Interval between health checks.
+	// Periodic health checks were dropped because they add a lot of overhead;
+	// instead, connection health is checked when a connection is requested and
+	// pool maintenance is done on demand.
+	// HealthCheckInterval time.Duration
+	// Timeout for creating new connections
+	ConnectionTimeout time.Duration
+}
+
+// Default configuration
+var DefaultConfig = ManagerConfig{
+	MaxPoolSize: 5,
+	MinPoolSize: 2,
+	MaxIdleTime: 30 * time.Minute,
+	// HealthCheckInterval: 120 * time.Second,
+	ConnectionTimeout: 10 * time.Second,
+}
+
+type manager struct {
+	urls      []string
+	pool      []*poolConn
+	mu        sync.RWMutex
+	ctx       context.Context
+	cancel    context.CancelFunc
+	wg        sync.WaitGroup
+	config    ManagerConfig
+	checkChan chan struct{}
+}

-	r int
-	m sync.Mutex
+func NewManager(urls ...string) Manager {
+	return NewManagerWithConfig(DefaultConfig, urls...)
 }

-func NewManager(url ...string) Manager {
-	if len(url) == 0 {
-		panic("at least one url is required")
+func NewManagerWithConfig(config ManagerConfig, urls ...string) Manager {
+	if len(urls) == 0 {
+		panic("at least one URL required")
 	}

-	// the shuffle is needed so if one endpoints fails, and the next one
-	// is tried, we will end up moving all connections to the "next" endpoint
-	// which will get overloaded. Instead the shuffle helps to make the "next"
-	// different for reach instace of the pool.
-	rand.Shuffle(len(url), func(i, j int) {
-		url[i], url[j] = url[j], url[i]
-	})
+	// Validate and adjust configuration
+	if config.MaxPoolSize < 1 {
+		config.MaxPoolSize = DefaultConfig.MaxPoolSize
+	}
+	if config.MinPoolSize < 1 || config.MinPoolSize > config.MaxPoolSize {
+		config.MinPoolSize = min(DefaultConfig.MinPoolSize, config.MaxPoolSize)
+	}
+	if config.MaxIdleTime <= 0 {
+		config.MaxIdleTime = DefaultConfig.MaxIdleTime
+	}
+	// if config.HealthCheckInterval <= 0 {
+	// 	config.HealthCheckInterval = DefaultConfig.HealthCheckInterval
+	// }
+	if config.ConnectionTimeout <= 0 {
+		config.ConnectionTimeout = DefaultConfig.ConnectionTimeout
+	}
+
+	ctx, cancel := context.WithCancel(context.Background())
+	m := &manager{
+		urls:      shuffle(urls),
+		pool:      make([]*poolConn, 0, config.MaxPoolSize),
+		ctx:       ctx,
+		cancel:    cancel,
+		config:    config,
+		checkChan: make(chan struct{}, 1),
+	}
+
+	m.initializePool()
+	m.wg.Add(1)
+	go m.healthChecker()
+
+	return m
+}

-	return &mgrImpl{
-		urls: url,
-		r:    rand.Intn(len(url)), // start with random url, then roundrobin
+func (m *manager) initializePool() {
+	log.Debug().Msg("initializing connection pool")
+	for i := 0; i < m.config.MinPoolSize; i++ {
+		select {
+		case m.checkChan <- struct{}{}:
+		default:
+		}
 	}
 }

-// endpoint return the next endpoint to use
-// in roundrobin fashion. need to be called
-// while lock is acquired.
-func (p *mgrImpl) endpoint() string {
-	defer func() {
-		p.r = (p.r + 1) % len(p.urls)
-	}()
+func (m *manager) createConnection(ctx context.Context, url string) (*poolConn, error) {
+	log.Debug().Str("url", url).Msg("attempting to create a new connection")
+	ctx, cancel := context.WithTimeout(ctx, m.config.ConnectionTimeout)
+	defer cancel()
+
+	select {
+	case <-ctx.Done():
+		log.Error().Str("url", url).Msg("context done while creating connection")
+		return nil, ctx.Err()
+	default:
+		if conn, meta, err := createSubstrateConn(url); err == nil {
+			log.Debug().Str("url", url).Msg("created new connection")
+			return &poolConn{
+				conn:     conn,
+				meta:     meta,
+				url:      url,
+				lastUsed: atomic.Int64{},
+				inUse:    atomic.Bool{},
+			}, nil
+		} else {
+			log.Error().Str("url", url).Err(err).Msg("failed to create connection")
+		}
+	}
+	return nil, fmt.Errorf("failed to create connection to %s", url)
+}

-	return p.urls[p.r]
+func (m *manager) GetConnection(ctx context.Context) (*Substrate, error) {
+	log.Debug().Msg("getting a connection")
+	conn, err := m.getHealthyConn()
+	if err != nil {
+		log.Error().Err(err).Msg("failed to get connection")
+		return nil, fmt.Errorf("failed to get connection: %w", err)
+	}
+	log.Debug().Str("url", conn.url).Msg("successfully obtained connection")
+	return newSubstrate(conn, m), nil
 }

-// Substrate return a new wrapped substrate connection
-// the connection must be closed after you are done using it
-func (p *mgrImpl) Substrate() (*Substrate, error) {
-	cl, meta, err := p.Raw()
+func (m *manager) getHealthyConn() (*poolConn, error) {
+	log.Debug().Int("pool_size", len(m.pool)).Int("acquired_count", m.acquiredConnCount()).
+		Msg("checking for healthy connections")
+
+	// Try getting existing connection first
+	if conn := m.getExistingConn(); conn != nil {
+		return conn, nil
+	}
+
+	b := backoff.NewExponentialBackOff()
+	b.MaxInterval = 2 * time.Second
+	b.InitialInterval = 500 * time.Millisecond
+	b.Multiplier = 2
+
+	var conn *poolConn
+	err := backoff.Retry(func() error {
+		// Check if we can get an existing connection
+		if c := m.getExistingConn(); c != nil {
+			conn = c
+			return nil
+		}
+
+		m.mu.RLock()
+		poolSize := len(m.pool)
+		m.mu.RUnlock()
+
+		if poolSize >= m.config.MaxPoolSize {
+			return backoff.Permanent(ErrMaxPoolSizeReached)
+		}
+
+		select {
+		case m.checkChan <- struct{}{}:
+			log.Debug().Msg("triggered connection check")
+		default:
+			log.Debug().Msg("connection check already pending")
+		}
+
+		// time.Sleep(50 * time.Millisecond)
+		return ErrNoConnectionsAvailable
+	}, b)
+
 	if err != nil {
 		return nil, err
 	}
-
-	return newSubstrate(cl, meta, p.put)
-}
+	return conn, nil
+}

-// Raw returns a RPC substrate client. plus meta. The returned connection
-// is not tracked by the pool, nor reusable. It's the caller responsibility
-// to close the connection when done
-func (p *mgrImpl) Raw() (Conn, Meta, error) {
-	// right now this pool implementation just tests the connection
-	// makes sure that it is still active, otherwise, tries again
-	// until the connection is restored.
-	// A better pool implementation can be done later were multiple connections
-	// can be handled
-	// TODO: thread safety!
-	p.m.Lock()
-	defer p.m.Unlock()
-
-	boff := backoff.WithMaxRetries(
-		backoff.NewConstantBackOff(200*time.Millisecond),
-		2*uint64(len(p.urls)),
-	)
-
-	var (
-		cl   *gsrpc.SubstrateAPI
-		meta *types.Metadata
-		err  error
-	)
-
-	err = backoff.RetryNotify(func() error {
-		endpoint := p.endpoint()
-		log.Debug().Str("url", endpoint).Msg("connecting")
-		cl, err = newSubstrateAPI(endpoint)
-		if err != nil {
-			return errors.Wrapf(err, "error connecting to substrate at '%s'", endpoint)
+	return conn, nil
+}
+
+func (m *manager) healthChecker() {
+	defer m.wg.Done()
+	// ticker := time.NewTicker(m.config.HealthCheckInterval)
+	// defer ticker.Stop()
+
+	for {
+		select {
+		case <-m.ctx.Done():
+			return
+		// case <-ticker.C:
+		// 	m.checkConnections()
+		case <-m.checkChan:
+			m.checkConnections()
 		}
 	}
+}

-		meta, err = cl.RPC.State.GetMetadataLatest()
-		if err != nil {
-			return errors.Wrapf(err, "error getting latest metadata at '%s'", endpoint)
+func (m *manager) checkConnections() {
+	m.mu.Lock()
+	healthy := make([]*poolConn, 0, len(m.pool))
+	for _, conn := range m.pool {
+		if conn == nil {
+			continue
 		}

-		t, err := getTime(cl, meta)
-		if err != nil {
-			return errors.Wrapf(err, "error getting node time at '%s'", endpoint)
+		if !conn.isHealthy() {
+			log.Debug().Str("url", conn.url).Msg("closing unhealthy connection")
+			conn.close()
+			continue
 		}

-		if time.Since(t) > acceptableDelay {
-			return fmt.Errorf("node '%s' is behind acceptable delay with timestamp '%s'", endpoint, t)
+		// Check if connection is idle for too long if we have more than min pool size
+		if !conn.inUse.Load() && time.Since(time.Unix(conn.lastUsed.Load(), 0)) > m.config.MaxIdleTime && len(m.pool) > m.config.MinPoolSize {
+			log.Debug().Str("url", conn.url).Msg("closing idle connection")
+			conn.close()
+			continue
 		}

-		return nil
+		healthy = append(healthy, conn)
 	}
-	}, boff, func(err error, _ time.Duration) {
-		log.Error().Err(err).Msg("failed to connect to endpoint, retrying")
-	})
+	m.pool = healthy
+	m.mu.Unlock()
+	m.ensureMinConnections()

-	return cl, meta, err
 }

-// TODO: implement reusable connections instead of
-// closing the connection.
-func (p *mgrImpl) put(cl *Substrate) {
-	// naive put implementation for now
-	// we just immediately kill the connection
-	if cl.cl != nil {
-		cl.cl.Client.Close()
+func (m *manager) ensureMinConnections() {
+	log.Debug().Msg("ensuring minimum connections in the pool")
+	inUseCount := m.acquiredConnCount()
+	urls := shuffle(m.unusedURLs())
+	urls = append(urls, m.urls...)
+
+	for _, url := range urls {
+		poolSize := len(m.pool)
+
+		if poolSize < m.config.MinPoolSize || (poolSize < m.config.MaxPoolSize && poolSize == inUseCount) {
+			if conn, err := m.createConnection(m.ctx, url); err == nil {
+				m.mu.Lock()
+				m.pool = append(m.pool, conn)
+				m.mu.Unlock()
+				log.Debug().Str("url", url).Msg("added new connection to pool")
+			}
+		} else {
+			break
+		}
 	}
-	cl.cl = nil
-	cl.meta = nil
 }

-// Substrate client
-type Substrate struct {
-	cl   Conn
-	meta Meta
+func (m *manager) Close() error {
+	m.cancel()
+	m.wg.Wait()

-	close func(s *Substrate)
+	m.mu.Lock()
+	defer m.mu.Unlock()
+
+	for _, conn := range m.pool {
+		conn.close()
+	}
+	m.pool = nil
+	return nil
 }

-// NewSubstrate creates a substrate client
-func newSubstrate(cl Conn, meta Meta, close func(*Substrate)) (*Substrate, error) {
-	return &Substrate{cl: cl, meta: meta, close: close}, nil
+// Helper methods
+func (m *manager) unusedURLs() []string {
+	m.mu.RLock()
+	defer m.mu.RUnlock()
+
+	// get all urls that are not in the pool
+	used := make([]string, 0, len(m.pool))
+	for _, conn := range m.pool {
+		used = append(used, conn.url)
+	}
+	unused := make([]string, 0, len(m.urls))
+	for _, url := range m.urls {
+		if !slices.Contains(used, url) {
+			unused = append(unused, url)
+		}
+	}
+	return unused
 }

-func (s *Substrate) Close() {
-	s.close(s)
+func (m *manager) acquiredConnCount() int {
+	m.mu.RLock()
+	defer m.mu.RUnlock()
+
+	count := 0
+	for _, conn := range m.pool {
+		if conn.inUse.Load() {
+			count++
+		}
+	}
+	return count
+}
+
+func (m *manager) getExistingConn() *poolConn {
+	m.mu.RLock()
+	defer m.mu.RUnlock()
+
+	for _, conn := range m.pool {
+		if conn.isHealthy() && !conn.inUse.Load() {
+			if conn.inUse.CompareAndSwap(false, true) {
+				conn.lastUsed.Store(time.Now().Unix())
+				return conn
+			}
+		}
+	}
+	return nil
+}
+
+func shuffle(urls []string) []string {
+	result := make([]string, len(urls))
+	copy(result, urls)
+	rand.Shuffle(len(result), func(i, j int) {
+		result[i], result[j] = result[j], result[i]
+	})
+	return result
+}
+
+// Deprecated methods implementation
+func (m *manager) Raw() (Conn, Meta, error) {
+	conn, err := m.GetConnection(context.Background())
+	if err != nil {
+		return nil, nil, err
+	}
+	return conn.conn.conn, conn.conn.meta, nil
+}
+
+func (m *manager) Substrate() (*Substrate, error) {
+	return m.GetConnection(context.Background())
+}
+
+type Substrate struct {
+	conn   *poolConn
+	mgr    *manager
+	mu     sync.Mutex
+	closed bool
+}
+
+func newSubstrate(conn *poolConn, mgr *manager) *Substrate {
+	return &Substrate{
+		conn: conn,
+		mgr:  mgr,
+	}
+}
+
+func createSubstrateConn(url string) (Conn, Meta, error) {
+	cl, err := newSubstrateAPI(url)
+	if err != nil {
+		return nil, nil, err
+	}
+
+	meta, err := cl.RPC.State.GetMetadataLatest()
+	if err != nil {
+		cl.Client.Close()
+		return nil, nil, err
+	}
+
+	t, err := getTime(cl, meta)
+	if err != nil || time.Since(t) > AcceptableDelay {
+		cl.Client.Close()
+		return nil, nil, fmt.Errorf("node health check failed")
+	}
+
+	return cl, meta, nil
 }

 func (s *Substrate) GetClient() (Conn, Meta, error) {
-	return s.cl, s.meta, nil
+	if s.closed {
+		log.Error().Msg("attempted to get client from closed substrate")
+		return nil, nil, fmt.Errorf("substrate connection closed")
+	}
+
+	if s.conn.isHealthy() {
+		conn := s.conn.conn
+		meta := s.conn.meta
+		s.conn.lastUsed.Store(time.Now().Unix())
+		return conn, meta, nil
+	}
+	s.conn.inUse.Store(false)
+
+	conn, err := s.mgr.getHealthyConn()
+	if err != nil {
+		log.Error().Err(err).Msg("failed to get healthy connection for client")
connection for client") + return nil, nil, err + } + + s.mu.Lock() + + s.conn = conn + s.mu.Unlock() + + log.Debug().Str("url", conn.url).Msg("swapped connection") + return conn.conn, conn.meta, nil +} + +func (s *Substrate) Release() { + s.mu.Lock() + defer s.mu.Unlock() + + if s.closed { + return + } + s.closed = true + + if s.conn != nil { + s.conn.inUse.Store(false) + log.Debug().Str("url", s.conn.url).Msg("releasing connection to pool") + s.conn = nil + } } func (s *Substrate) getVersion(b types.StorageDataRaw) (uint32, error) { @@ -196,6 +495,11 @@ func (s *Substrate) Time() (t time.Time, err error) { return getTime(cl, meta) } +// deprecated methods +func (s *Substrate) Close() { + s.Release() +} + func getTime(cl Conn, meta Meta) (t time.Time, err error) { key, err := types.CreateStorageKey(meta, "Timestamp", "Now", nil) if err != nil { diff --git a/clients/tfchain-client-go/impl_test.go b/clients/tfchain-client-go/impl_test.go new file mode 100644 index 000000000..f798fb6d6 --- /dev/null +++ b/clients/tfchain-client-go/impl_test.go @@ -0,0 +1,193 @@ +package substrate + +import ( + "context" + "fmt" + "math/rand" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestPoolInitialization(t *testing.T) { + urls := []string{"ws://127.0.0.1:9944", "ws://127.0.0.1:9945", "ws://127.0.0.1:9946"} + mgr := NewManager(urls...) + defer mgr.Close() + + time.Sleep(100 * time.Millisecond) + + mgrImpl := mgr.(*manager) + mgrImpl.mu.RLock() + defer mgrImpl.mu.RUnlock() + + assert.LessOrEqual(t, len(mgrImpl.pool), mgrImpl.config.MinPoolSize) + assert.Greater(t, len(mgrImpl.pool), 0) +} + +func TestConnectionReuse(t *testing.T) { + mgr := NewManager("ws://127.0.0.1:9944") + defer mgr.Close() + + // Wait for pool initialization + time.Sleep(100 * time.Millisecond) + + // Get first connection + sub1, err := mgr.GetConnection(context.Background()) + require.NoError(t, err) + + // Store connection details for comparison + conn1 := sub1.conn + url1 := sub1.conn.url + + // Release it back to pool properly + sub1.Release() + + // Small delay to ensure connection is properly released + time.Sleep(10 * time.Millisecond) + + // Get another connection + sub2, err := mgr.GetConnection(context.Background()) + require.NoError(t, err) + defer sub2.Release() + + // Should be the same underlying connection + assert.Equal(t, conn1, sub2.conn) + assert.Equal(t, url1, sub2.conn.url) +} + +func TestConcurrentAccess(t *testing.T) { + mgr := NewManager("ws://127.0.0.1:9944", "ws://127.0.0.1:9945") + defer mgr.Close() + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + var wg sync.WaitGroup + + for i := 0; i < 10; i++ { + wg.Add(1) + go func() { + defer wg.Done() + + sub, err := mgr.GetConnection(ctx) + if err != nil { + return + } + defer sub.Release() + sub.Time() + time.Sleep(10 * time.Millisecond) + }() + } + + wg.Wait() +} + +func TestFailover(t *testing.T) { + mgr := NewManager("ws://fail1", "ws://127.0.0.1:9944") + defer mgr.Close() + + sub1, err := mgr.GetConnection(context.Background()) + require.NoError(t, err) + defer sub1.Release() + sub2, err := mgr.GetConnection(context.Background()) + require.NoError(t, err) + defer sub2.Release() + assert.Equal(t, sub1.conn.url, "ws://127.0.0.1:9944") + assert.Equal(t, sub2.conn.url, "ws://127.0.0.1:9944") +} + +func TestHealthChecking(t *testing.T) { + mgr := NewManager("ws://127.0.0.1:9944") + defer mgr.Close() + + sub, err := 
+	require.NoError(t, err)
+	defer sub.Release()
+
+	// Simulate connection failure
+	old := sub.conn.conn
+	old.Client.Close()
+
+	// simulate usage of the client
+	sub.Time()
+
+	assert.NotEqual(t, old, sub.conn.conn)
+}
+
+func TestStressWithFailures(t *testing.T) {
+	if testing.Short() {
+		t.Skip("Skipping stress test in short mode")
+	}
+
+	// Use test-specific configuration
+	config := ManagerConfig{
+		MaxPoolSize: 30,
+		MinPoolSize: 3,
+		MaxIdleTime: time.Minute,
+		// HealthCheckInterval: time.Second,
+		ConnectionTimeout: time.Second,
+	}
+
+	mgr := NewManagerWithConfig(config, "ws://127.0.0.1:9944", "ws://127.0.0.1:9945")
+	defer mgr.Close()
+
+	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
+	defer cancel()
+
+	var (
+		wg     sync.WaitGroup
+		mu     sync.Mutex
+		errors []error
+	)
+
+	for i := 0; i < 30; i++ {
+		wg.Add(1)
+		go func(id int) {
+			defer wg.Done()
+
+			retryBackoff := time.Millisecond * 100
+			maxBackoff := time.Second
+
+			for ctx.Err() == nil {
+				sub, err := mgr.GetConnection(ctx)
+				if err != nil {
+					mu.Lock()
+					errors = append(errors, fmt.Errorf("goroutine %d: %w", id, err))
+					mu.Unlock()
+
+					jitter := time.Duration(rand.Int63n(int64(retryBackoff)))
+					time.Sleep(retryBackoff + jitter)
+					retryBackoff *= 2
+					if retryBackoff > maxBackoff {
+						retryBackoff = maxBackoff
+					}
+					continue
+				}
+
+				// Reset backoff on success
+				retryBackoff = time.Millisecond * 100
+
+				// Simulate work
+				sub.Time()
+				time.Sleep(time.Duration(rand.Intn(250)+50) * time.Millisecond)
+
+				if id%2 == 0 && rand.Float32() < 0.1 {
+					sub.conn.conn.Client.Close()
+				}
+
+				sub.Release()
+			}
+		}(i)
+	}
+
+	wg.Wait()
+
+	// Log and check errors
+	for _, err := range errors {
+		t.Logf("Error: %v", err)
+	}
+
+	assert.Less(t, len(errors), 10,
+		"Too many errors occurred during stress test: %d", len(errors))
+}
diff --git a/clients/tfchain-client-go/transfer.go b/clients/tfchain-client-go/transfer.go
index 5648a851e..c739a5fef 100644
--- a/clients/tfchain-client-go/transfer.go
+++ b/clients/tfchain-client-go/transfer.go
@@ -20,9 +20,6 @@ func (s *Substrate) Transfer(identity Identity, amount uint64, destination Accou
 	bal := big.NewInt(int64(amount))
 
 	c, err := types.NewCall(meta, "Balances.transfer", dest, types.NewUCompact(bal))
-	if err != nil {
-		panic(err)
-	}
 
 	if err != nil {
 		return errors.Wrap(err, "failed to create call")