Skip to content

Commit

Permalink
Feat: Go Substrate Client - Connection Failover Implementation (#1022)
Browse files Browse the repository at this point in the history
* feat: Maintains single connection with multiple backup URLs

* fix: remove panic in Transfer

* test: add a test suite covering failover scenarios

* chore: remove old workflows

* Update CI and test image

* fix: guard Substrate connection state in GetClient() and Close()

* test: Added a sync.WaitGroup to properly track when goroutines complete in TestFailoverMechanism
  • Loading branch information
sameh-farouk authored Nov 27, 2024
1 parent 5e76664 commit d8a0468
Show file tree
Hide file tree
Showing 9 changed files with 194 additions and 83 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/010_build_and_test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ jobs:
build-and-test:
runs-on: [self-hosted, tfchainrunner01]
container:
image: threefolddev/tfchain:4
image: threefolddev/tfchain:5
env:
DEBIAN_FRONTEND: noninteractive
PATH: /root/.cargo/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/local/go/bin
Expand Down
11 changes: 6 additions & 5 deletions .github/workflows/020_lint_and_test_go_client.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,22 @@ jobs:
- name: Set up Go
uses: actions/setup-go@v5
with:
go-version: "1.20"
go-version: "1.21"
cache: false
# cache-dependency-path: clients/tfchain-client-go/go.sum
id: go

- name: golangci-lint
uses: golangci/golangci-lint-action@v3.7.0
uses: golangci/golangci-lint-action@v6
with:
args: --timeout 3m --verbose
working-directory: clients/tfchain-client-go

- name: staticcheck
uses: dominikh/staticcheck-action@v1.3.0
uses: dominikh/staticcheck-action@v1
with:
version: "2022.1.3"
version: "latest"
install-go: false
working-directory: clients/tfchain-client-go
env:
GO111MODULE: on
Expand All @@ -44,4 +45,4 @@ jobs:
uses: Jerome1337/[email protected]
with:
gofmt-path: './clients/tfchain-client-go'
gofmt-flags: "-l -d"
gofmt-flags: "-l -d"
6 changes: 3 additions & 3 deletions .github/workflows/build_test.Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM ubuntu:20.04
FROM ubuntu:22.04
ENV DEBIAN_FRONTEND=noninteractive
RUN apt update && \
apt install -y \
Expand All @@ -16,8 +16,8 @@ RUN apt update && \
zstd \
wget \
protobuf-compiler && \
wget https://go.dev/dl/go1.20.2.linux-amd64.tar.gz && \
tar -xvf go1.20.2.linux-amd64.tar.gz && \
wget https://go.dev/dl/go1.21.13.linux-amd64.tar.gz && \
tar -xvf go1.21.13.linux-amd64.tar.gz && \
mv go /usr/local && \
echo "GOPATH=/usr/local/go" >> ~/.bashrc && \
echo "PATH=\$PATH:\$GOPATH/bin" >> ~/.bashrc && \
Expand Down
35 changes: 0 additions & 35 deletions clients/tfchain-client-go/.github/workflows/lint.yml

This file was deleted.

24 changes: 0 additions & 24 deletions clients/tfchain-client-go/.github/workflows/test.yml

This file was deleted.

70 changes: 58 additions & 12 deletions clients/tfchain-client-go/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ var (
ErrUnknownVersion = fmt.Errorf("unknown version")
//ErrNotFound is returned if an object is not found
ErrNotFound = fmt.Errorf("object not found")
//ErrClosed is returned if the client is closed
ErrClosed = fmt.Errorf("client closed")
)

// Versioned base for all types
Expand Down Expand Up @@ -87,7 +89,7 @@ func (p *mgrImpl) Substrate() (*Substrate, error) {
return nil, err
}

return newSubstrate(cl, meta, p.put)
return newSubstrate(cl, meta, p.put, p.connect)
}

// Raw returns a RPC substrate client. plus meta. The returned connection
Expand All @@ -105,7 +107,7 @@ func (p *mgrImpl) Raw() (Conn, Meta, error) {

boff := backoff.WithMaxRetries(
backoff.NewConstantBackOff(200*time.Millisecond),
2*uint64(len(p.urls)),
4*uint64(len(p.urls)),
)

var (
Expand Down Expand Up @@ -145,36 +147,80 @@ func (p *mgrImpl) Raw() (Conn, Meta, error) {
return cl, meta, err
}

// connect obtains a fresh connection from p.Raw and installs it on s,
// closing whatever connection s was holding before.
//
// p.Raw decides which endpoint is dialed. If it fails, s is left untouched
// and the error is returned unchanged.
//
// s.cl and s.meta are written here without any locking, so the caller must
// already hold the lock (GetClient invokes it while holding s.mu).
func (p *mgrImpl) connect(s *Substrate) error {
	conn, m, err := p.Raw()
	if err != nil {
		return err
	}

	// Tear down the previous connection, if there was one, before swapping.
	if old := s.cl; old != nil {
		old.Client.Close()
		log.Info().Str("url", old.Client.URL()).Msg("unhealthy connection closed")
	}

	// Install the replacement connection together with its metadata.
	s.cl, s.meta = conn, m
	log.Info().Str("url", conn.Client.URL()).Msg("connection restored")
	return nil
}

// TODO: implement reusable connections instead of
// closing the connection.
func (p *mgrImpl) put(cl *Substrate) {
func (p *mgrImpl) put(s *Substrate) {
// naive put implementation for now
// we just immediately kill the connection
if cl.cl != nil {
cl.cl.Client.Close()
if s.cl != nil {
s.cl.Client.Close()
}
cl.cl = nil
cl.meta = nil
s.cl = nil
s.meta = nil
}

// Substrate client
type Substrate struct {
cl Conn
meta Meta
mu sync.Mutex
cl Conn
meta Meta
closed bool

close func(s *Substrate)
close func(s *Substrate)
connect func(s *Substrate) error
}

// newSubstrate creates a substrate client
func newSubstrate(cl Conn, meta Meta, close func(*Substrate)) (*Substrate, error) {
return &Substrate{cl: cl, meta: meta, close: close}, nil
func newSubstrate(cl Conn, meta Meta, close func(*Substrate), connect func(s *Substrate) error) (*Substrate, error) {
return &Substrate{cl: cl, meta: meta, close: close, connect: connect}, nil
}

// Close releases the client by invoking its close callback and marks it as
// closed. Calling Close again afterwards is a no-op. The whole operation runs
// under s.mu.
func (s *Substrate) Close() {
	s.mu.Lock()
	defer s.mu.Unlock()

	// Only the first call hands the client to the close callback.
	if !s.closed {
		s.close(s)
		s.closed = true
	}
}

// GetClient returns the connection and metadata currently held by s,
// replacing them first if the connection fails a health check.
//
// It returns ErrClosed once Close has been called. Otherwise it probes the
// current connection with getTime; if the probe fails it calls s.connect to
// swap in a new connection, and returns the connect error unchanged when that
// fails as well. The whole call runs under s.mu.
func (s *Substrate) GetClient() (Conn, Meta, error) {
	s.mu.Lock()
	defer s.mu.Unlock()

	if s.closed {
		return nil, nil, ErrClosed
	}

	// A failed time query is treated as "connection unhealthy".
	if _, probeErr := getTime(s.cl, s.meta); probeErr != nil {
		// Attach the probe error so the log shows why failover was triggered.
		log.Info().
			Err(probeErr).
			Str("url", s.cl.Client.URL()).
			Msg("connection unhealthy, attempting failover")

		if err := s.connect(s); err != nil {
			return nil, nil, err // all attempts failed, no connection available
		}
	}
	return s.cl, s.meta, nil
}

Expand Down
118 changes: 118 additions & 0 deletions clients/tfchain-client-go/impl_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
package substrate

import (
"sync"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

// TestFailoverMechanism covers how a Substrate client behaves when its
// connection breaks: replacing a dead connection, skipping unreachable URLs,
// keeping a healthy connection, failing when no URL works, and recovering two
// clients concurrently.
func TestFailoverMechanism(t *testing.T) {
	t.Run("should failover to next URL when current node is unhealthy", func(t *testing.T) {
		// The first URL is a placeholder that is expected to fail.
		endpoints := []string{"ws://fail1", getUrlBasedOnEnv()}
		manager := NewManager(endpoints...)

		sub, err := manager.Substrate()
		require.NoError(t, err)
		defer sub.Close()

		// Remember the client we started with, then break it by closing it.
		before := sub.cl.Client
		before.Close()

		// Using the client now should trigger a failover instead of an error.
		_, err = sub.Time()
		require.NoError(t, err)

		// The underlying client must have been replaced by a new one.
		after := sub.cl.Client
		assert.NotEqual(t, before, after)
	})

	t.Run("should try all URLs in rotation", func(t *testing.T) {
		endpoints := []string{
			"ws://fail1",
			"ws://fail2",
			getUrlBasedOnEnv(),
		}

		manager := NewManager(endpoints...)
		sub, err := manager.Substrate()
		require.NoError(t, err)
		defer sub.Close()

		// Only the last URL is expected to work, so that is where we must end up.
		assert.Equal(t, getUrlBasedOnEnv(), sub.cl.Client.URL())
	})

	t.Run("should reuse connection if healthy", func(t *testing.T) {
		sub := startLocalConnection(t)
		defer sub.Close()

		original := sub.cl.Client

		// Repeated calls on a healthy connection must keep the same client.
		for attempt := 0; attempt < 3; attempt++ {
			_, err := sub.Time()
			require.NoError(t, err)
			assert.Equal(t, original, sub.cl.Client)
		}
	})

	t.Run("should handle all nodes being down", func(t *testing.T) {
		// Neither URL is expected to be reachable, so creation itself must fail.
		manager := NewManager([]string{"ws://fail1", "ws://fail2"}...)
		_, err := manager.Substrate()
		assert.Error(t, err)
	})

	t.Run("should handle concurrent failover attempts", func(t *testing.T) {
		endpoints := []string{getUrlBasedOnEnv(), getUrlBasedOnEnv()}
		manager := NewManager(endpoints...)

		first, err := manager.Substrate()
		require.NoError(t, err)
		defer first.Close()

		second, err := manager.Substrate()
		require.NoError(t, err)
		defer second.Close()

		// Break both connections before any of them is used again.
		first.cl.Client.Close()
		second.cl.Client.Close()

		clients := []*Substrate{first, second}

		// The channel is buffered so no sender blocks; the WaitGroup tracks
		// when every goroutine has finished.
		var pending sync.WaitGroup
		results := make(chan error, len(clients))

		for _, client := range clients {
			pending.Add(1)
			go func(c *Substrate) {
				defer pending.Done()
				_, callErr := c.Time()
				results <- callErr
			}(client)
		}

		// Close the channel once all senders are done so the loop below ends.
		go func() {
			pending.Wait()
			close(results)
		}()

		// Every concurrent call must have recovered successfully.
		for callErr := range results {
			require.NoError(t, callErr)
		}
	})
}
3 changes: 0 additions & 3 deletions clients/tfchain-client-go/transfer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,6 @@ func (s *Substrate) Transfer(identity Identity, amount uint64, destination Accou
bal := big.NewInt(int64(amount))

c, err := types.NewCall(meta, "Balances.transfer", dest, types.NewUCompact(bal))
if err != nil {
panic(err)
}

if err != nil {
return errors.Wrap(err, "failed to create call")
Expand Down
8 changes: 8 additions & 0 deletions clients/tfchain-client-go/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,14 @@ func startLocalConnection(t *testing.T) *Substrate {
return cl
}

// getUrlBasedOnEnv returns the websocket URL the tests should connect to.
//
// When the CI environment variable is present (with any value, including the
// empty string) it returns the loopback address ws://127.0.0.1:9944;
// otherwise it returns wss://tfchain.dev.grid.tf.
func getUrlBasedOnEnv() string {
	if _, inCI := os.LookupEnv("CI"); inCI {
		return "ws://127.0.0.1:9944"
	}
	return "wss://tfchain.dev.grid.tf"
}

func assertCreateTwin(t *testing.T, cl *Substrate, user AccountUser) uint32 {
u := Accounts[user]

Expand Down

0 comments on commit d8a0468

Please sign in to comment.