From 068cb6e330b4684248bc2f30871776dceddcb827 Mon Sep 17 00:00:00 2001 From: reshke Date: Tue, 2 Apr 2024 13:29:57 +0000 Subject: [PATCH 1/3] Allow coordinator to serve multiple clients --- coordinator/app/app.go | 24 ++++++++++++++++++++++-- qdb/etcdqdb.go | 2 +- qdb/memqdb.go | 1 + 3 files changed, 24 insertions(+), 3 deletions(-) diff --git a/coordinator/app/app.go b/coordinator/app/app.go index 4b2472b5e..9958084fb 100644 --- a/coordinator/app/app.go +++ b/coordinator/app/app.go @@ -14,15 +14,23 @@ import ( "github.com/pg-sharding/spqr/coordinator/provider" "github.com/pg-sharding/spqr/pkg/config" protos "github.com/pg-sharding/spqr/pkg/protos" + + "golang.org/x/sync/semaphore" ) type App struct { coordinator coordinator.Coordinator + sem *semaphore.Weighted } +const ( + maxWorkers = 50 +) + func NewApp(c coordinator.Coordinator) *App { return &App{ coordinator: c, + sem: semaphore.NewWeighted(int64(maxWorkers)), } } @@ -83,8 +91,20 @@ func (app *App) ServeCoordinator(wg *sync.WaitGroup) error { for { conn, err := listener.Accept() - spqrlog.Zero.Error().Err(err).Msg("") - _ = app.coordinator.ProcClient(context.TODO(), conn) + if err != nil { + spqrlog.Zero.Error().Err(err).Msg("") + } + + app.sem.Acquire(context.Background(), 1) + + go func() { + defer app.sem.Release(1) + + err := app.coordinator.ProcClient(context.TODO(), conn) + if err != nil { + spqrlog.Zero.Error().Err(err).Msg("failed to serve client") + } + }() } }(l) } diff --git a/qdb/etcdqdb.go b/qdb/etcdqdb.go index 2cbb3c787..cd45ebb57 100644 --- a/qdb/etcdqdb.go +++ b/qdb/etcdqdb.go @@ -63,7 +63,7 @@ const ( ) func keyLockPath(key string) string { - return path.Join("lock", key) + return path.Join("/lock", key) } func keyRangeNodePath(key string) string { diff --git a/qdb/memqdb.go b/qdb/memqdb.go index fa85cbe7f..4112e1ce1 100644 --- a/qdb/memqdb.go +++ b/qdb/memqdb.go @@ -459,6 +459,7 @@ func (q *MemQDB) UpdateCoordinator(ctx context.Context, address string) error { } func (q *MemQDB) GetCoordinator(ctx context.Context) (string, error) { + spqrlog.Zero.Debug().Str("address", q.Coordinator).Msg("memqdb: get coordinator address") return q.Coordinator, nil } From 5a7b089960dcb95c4f852a53ac26b33603d331a3 Mon Sep 17 00:00:00 2001 From: reshke Date: Tue, 2 Apr 2024 20:02:56 +0500 Subject: [PATCH 2/3] Update app.go --- coordinator/app/app.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/coordinator/app/app.go b/coordinator/app/app.go index 9958084fb..1c0fce11f 100644 --- a/coordinator/app/app.go +++ b/coordinator/app/app.go @@ -93,9 +93,12 @@ func (app *App) ServeCoordinator(wg *sync.WaitGroup) error { conn, err := listener.Accept() if err != nil { spqrlog.Zero.Error().Err(err).Msg("") + return err } - app.sem.Acquire(context.Background(), 1) + if err := app.sem.Acquire(context.Background(), 1); err != nil { + return err + } go func() { defer app.sem.Release(1) From 2f59a561799b3e010a7fe97c2ff518bb5e2182bf Mon Sep 17 00:00:00 2001 From: reshke Date: Tue, 2 Apr 2024 20:37:34 +0500 Subject: [PATCH 3/3] Update app.go --- coordinator/app/app.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/coordinator/app/app.go b/coordinator/app/app.go index 1c0fce11f..622a93a72 100644 --- a/coordinator/app/app.go +++ b/coordinator/app/app.go @@ -93,11 +93,12 @@ func (app *App) ServeCoordinator(wg *sync.WaitGroup) error { conn, err := listener.Accept() if err != nil { spqrlog.Zero.Error().Err(err).Msg("") - return err + continue } if err := app.sem.Acquire(context.Background(), 1); err != nil { - return err + spqrlog.Zero.Error().Err(err).Msg("") + continue } go func() {