Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Init from etcd: basic support #590

Merged
merged 5 commits into from
Apr 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions balancer/provider/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
import (
"context"
"fmt"
"sort"
"strings"

"github.com/google/uuid"
"github.com/jackc/pgx/v5"
"github.com/pg-sharding/spqr/balancer"
Expand All @@ -14,8 +17,6 @@
"github.com/pg-sharding/spqr/pkg/spqrlog"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"sort"
"strings"
)

type BalancerImpl struct {
Expand All @@ -40,7 +41,7 @@
threshold[i] = configThresholds[i]
threshold[metricsCount+i] = configThresholds[i]
}
conn, err := grpc.Dial(config.BalancerConfig().CoordinatorAddress, grpc.WithTransportCredentials(insecure.NewCredentials()))

Check failure on line 44 in balancer/provider/balancer.go

View workflow job for this annotation

GitHub Actions / golangci-lint

SA1019: grpc.Dial is deprecated: use NewClient instead. Will be supported throughout 1.x. (staticcheck)

Check failure on line 44 in balancer/provider/balancer.go

View workflow job for this annotation

GitHub Actions / golangci-lint

SA1019: grpc.Dial is deprecated: use NewClient instead. Will be supported throughout 1.x. (staticcheck)
if err != nil {
return nil, err
}
Expand Down
20 changes: 20 additions & 0 deletions cmd/router/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,26 @@ var runCmd = &cobra.Command{
}
}()

/* initialize metadata */
reshke marked this conversation as resolved.
Show resolved Hide resolved
if rcfg.UseInitSQL {
i := instance.NewInitSQLMetadataBootstraper(rcfg.InitSQL)
if err := i.InitializeMetadata(ctx, router); err != nil {
return err
}
} else if rcfg.UseCoordinatorInit {
/* load config if not yet */
if err := config.LoadCoordinatorCfg(ccfgPath); err != nil {
return err
}
e := instance.NewEtcdMetadataBootstraper(config.CoordinatorConfig().QdbAddr)
if err := e.InitializeMetadata(ctx, router); err != nil {
return err
}
} else {
/* TODO: maybe error-out? */
router.Initialize()
}

wg := &sync.WaitGroup{}

wg.Add(1)
Expand Down
60 changes: 52 additions & 8 deletions coordinator/provider/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package provider
import (
"context"
"crypto/tls"
"fmt"
"net"
"time"

Expand Down Expand Up @@ -73,6 +72,9 @@ func (ci grpcConnectionIterator) IterRouter(cb func(cc *grpc.ClientConn, addr st
if err := cb(cc, r.Address); err != nil {
return err
}
if err := cc.Close(); err != nil {
return err
}
}
return nil
}
Expand Down Expand Up @@ -235,22 +237,23 @@ func (qc *qdbCoordinator) watchRouters(ctx context.Context) {
switch resp.Status {
case routerproto.RouterStatus_CLOSED:
spqrlog.Zero.Debug().Msg("router is closed")
if err := qc.SyncRouterMetadata(ctx, internalR); err != nil {
return err
}
if _, err := rrClient.OpenRouter(ctx, &routerproto.OpenRouterRequest{}); err != nil {
if err := qc.SyncRouterCoordinatorAddress(ctx, internalR); err != nil {
return err
}

/* Mark router as opened in qdb */
err := qc.db.OpenRouter(ctx, internalR.ID)
err := qc.db.CloseRouter(ctx, internalR.ID)
if err != nil {
return err
}

case routerproto.RouterStatus_OPENED:
spqrlog.Zero.Debug().Msg("router is opened")

/* TODO: check router metadata consistency */
if err := qc.SyncRouterCoordinatorAddress(ctx, internalR); err != nil {
return err
}

/* Mark router as opened in qdb */
err := qc.db.OpenRouter(ctx, internalR.ID)
Expand Down Expand Up @@ -287,7 +290,7 @@ func (qc *qdbCoordinator) lockCoordinator(ctx context.Context, initialRouter boo
}
router := &topology.Router{
ID: uuid.NewString(),
Address: fmt.Sprintf("%s:%s", config.RouterConfig().Host, config.RouterConfig().GrpcApiPort),
Address: net.JoinHostPort(config.RouterConfig().Host, config.RouterConfig().GrpcApiPort),
State: qdb.OPENED,
}
if err := qc.RegisterRouter(ctx, router); err != nil {
Expand All @@ -296,7 +299,7 @@ func (qc *qdbCoordinator) lockCoordinator(ctx context.Context, initialRouter boo
if err := qc.SyncRouterMetadata(ctx, router); err != nil {
spqrlog.Zero.Error().Err(err).Msg("sync router metadata when locking coordinator")
}
if err := qc.UpdateCoordinator(ctx, fmt.Sprintf("%s:%s", config.CoordinatorConfig().Host, config.CoordinatorConfig().GrpcApiPort)); err != nil {
if err := qc.UpdateCoordinator(ctx, net.JoinHostPort(config.CoordinatorConfig().Host, config.CoordinatorConfig().GrpcApiPort)); err != nil {
return false
}
return true
Expand Down Expand Up @@ -391,6 +394,8 @@ func (qc *qdbCoordinator) traverseRouters(ctx context.Context, cb func(cc *grpc.
}
defer cc.Close()

defer cc.Close()

if err := cb(cc); err != nil {
spqrlog.Zero.Debug().Err(err).Str("router id", rtr.ID).Msg("traverse routers")
return err
Expand Down Expand Up @@ -948,6 +953,45 @@ func (qc *qdbCoordinator) SyncRouterMetadata(ctx context.Context, qRouter *topol
spqrlog.Zero.Debug().Msg("successfully add all key ranges")

rCl := routerproto.NewTopologyServiceClient(cc)
if _, err := rCl.UpdateCoordinator(ctx, &routerproto.UpdateCoordinatorRequest{
Address: net.JoinHostPort(config.CoordinatorConfig().Host, config.CoordinatorConfig().GrpcApiPort),
}); err != nil {
return err
}

if resp, err := rCl.OpenRouter(ctx, &routerproto.OpenRouterRequest{}); err != nil {
return err
} else {
spqrlog.Zero.Debug().
Interface("response", resp).
Msg("open router response")
}

return nil
}

// TODO : unit tests
func (qc *qdbCoordinator) SyncRouterCoordinatorAddress(ctx context.Context, qRouter *topology.Router) error {
spqrlog.Zero.Debug().
Str("address", qRouter.Address).
Msg("qdb coordinator: sync router metadata")

cc, err := DialRouter(qRouter)
if err != nil {
return err
}
defer cc.Close()

/* Update current coordinator address. */
/* Todo: check that router metadata is in sync. */

rCl := routerproto.NewTopologyServiceClient(cc)
if _, err := rCl.UpdateCoordinator(ctx, &routerproto.UpdateCoordinatorRequest{
Address: net.JoinHostPort(config.CoordinatorConfig().Host, config.CoordinatorConfig().GrpcApiPort),
}); err != nil {
return err
}

if resp, err := rCl.OpenRouter(ctx, &routerproto.OpenRouterRequest{}); err != nil {
return err
} else {
Expand Down
2 changes: 1 addition & 1 deletion docker/router/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@ FROM spqr-base-image
RUN apt-get update && apt-get install -y postgresql-client
COPY ./docker/router/ssl/localhost.crt /etc/spqr/ssl/server.crt
COPY ./docker/router/ssl/localhost.key /etc/spqr/ssl/server.key
ENTRYPOINT CONFIG_PATH=${ROUTER_CONFIG=/spqr/docker/router/cfg.yaml} COORD_CONFIG_PATH=${COORDINATOR_CONFIG=/spqr/docker/coordinator/cfg.yaml} && CUR_HOST=$(cat ${CONFIG_PATH} | grep "host:") && sed -i "s/${CUR_HOST}/${ROUTER_HOST=${CUR_HOST}}/g" ${CONFIG_PATH} && /spqr/spqr-router run --config ${CONFIG_PATH} --coordinator-config ${COORD_CONFIG_PATH} --proto-debug
ENTRYPOINT CONFIG_PATH=${ROUTER_CONFIG=/spqr/docker/router/cfg.yaml} COORD_CONFIG_PATH=${COORDINATOR_CONFIG=/spqr/docker/coordinator/cfg.yaml} && CUR_HOST=$(cat ${CONFIG_PATH} | grep "host:") && sed -i "s/${CUR_HOST}/${ROUTER_HOST=${CUR_HOST}}/g" ${CONFIG_PATH} && /spqr/spqr-router run --config ${CONFIG_PATH} --coordinator-config ${COORD_CONFIG_PATH} >> ${ROUTER_LOG}
66 changes: 66 additions & 0 deletions examples/router-etcd-init.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
log_level: debug

host: '::1'
router_port: '6432'
admin_console_port: '7432'
grpc_api_port: '7010'

world_shard_fallback: true
router_mode: PROXY

use_coordinator_init: true

with_coordinator: true

frontend_tls:
key_file: /etc/odyssey/ssl/server.key
cert_file: /etc/odyssey/ssl/server.crt
sslmode: disable

frontend_rules:
- usr: user1
db: db1
pool_mode: TRANSACTION
pool_prepared_statement: true
auth_rule:
auth_method: ok
password: strong
- pool_mode: TRANSACTION
pool_default: true
pool_prepared_statement: false
auth_rule:
auth_method: ok

backend_rules:
- usr: user1
db: db1
pool_discard: false
pool_rollback: true
- pool_default: true
pool_discard: false
pool_rollback: true

shards:
sh1:
tls:
key_file: /etc/odyssey/ssl/server.key
sslmode: disable
cert_file: /etc/odyssey/ssl/server.crt
db: db1
usr: user1
pwd: 12345678
type: DATA
hosts:
- 'localhost:5550'
sh2:
tls:
key_file: /etc/odyssey/ssl/server.key
sslmode: disable
cert_file: /etc/odyssey/ssl/server.crt
db: db1
usr: user1
pwd: 12345678
type: DATA
hosts:
- 'localhost:5551'

9 changes: 9 additions & 0 deletions pkg/coord/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,15 @@ func (a *Adapter) SyncRouterMetadata(ctx context.Context, router *topology.Route
return err
}

// SyncRouterCoordinatorAddress implements meta.EntityMgr.
//
// It builds a router-service client on the adapter's connection and sends a
// SyncMetadata request carrying the protobuf form of router. The response is
// discarded; only the RPC error (if any) is returned.
//
// NOTE(review): this issues the SyncMetadata RPC rather than an
// address-specific call — confirm that this is the intended request.
func (a *Adapter) SyncRouterCoordinatorAddress(ctx context.Context, router *topology.Router) error {
	client := proto.NewRouterServiceClient(a.conn)

	request := &proto.SyncMetadataRequest{
		Router: topology.RouterToProto(router),
	}

	_, err := client.SyncMetadata(ctx, request)
	return err
}

// TODO : unit tests
// TODO : implement
func (a *Adapter) AddDataShard(ctx context.Context, shard *datashards.DataShard) error {
Expand Down
4 changes: 4 additions & 0 deletions pkg/coord/local/clocal.go
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,10 @@ func (qr *LocalCoordinator) SyncRouterMetadata(ctx context.Context, router *topo
return ErrNotCoordinator
}

// SyncRouterCoordinatorAddress is not supported by the local coordinator:
// it ignores its arguments and always returns ErrNotCoordinator.
func (qr *LocalCoordinator) SyncRouterCoordinatorAddress(ctx context.Context, router *topology.Router) error {
return ErrNotCoordinator
}

// UpdateCoordinator forwards addr to the underlying qdb's UpdateCoordinator
// and returns its error unchanged.
func (qr *LocalCoordinator) UpdateCoordinator(ctx context.Context, addr string) error {
return qr.qdb.UpdateCoordinator(ctx, addr)
}
Expand Down
1 change: 1 addition & 0 deletions pkg/models/topology/routers.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type RouterMgr interface {
ListRouters(ctx context.Context) ([]*Router, error)
UnregisterRouter(ctx context.Context, id string) error
SyncRouterMetadata(ctx context.Context, router *Router) error
SyncRouterCoordinatorAddress(ctx context.Context, router *Router) error
UpdateCoordinator(ctx context.Context, address string) error
GetCoordinator(ctx context.Context) (string, error)
}
Expand Down
3 changes: 2 additions & 1 deletion qdb/etcdqdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"net"
"path"
"sort"
"sync"
Expand Down Expand Up @@ -512,7 +513,7 @@ func (q *EtcdQDB) TryCoordinatorLock(ctx context.Context) error {
return err
}

op := clientv3.OpPut(coordLockKey, config.CoordinatorConfig().Host, clientv3.WithLease(clientv3.LeaseID(leaseGrantResp.ID)))
op := clientv3.OpPut(coordLockKey, net.JoinHostPort(config.CoordinatorConfig().Host, config.CoordinatorConfig().GrpcApiPort), clientv3.WithLease(clientv3.LeaseID(leaseGrantResp.ID)))
tx := q.cli.Txn(ctx).If(clientv3util.KeyMissing(coordLockKey)).Then(op)
stat, err := tx.Commit()
if err != nil {
Expand Down
77 changes: 76 additions & 1 deletion router/instance/etcd.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,80 @@
package instance

import (
"context"
"time"

"github.com/pg-sharding/spqr/pkg/models/distributions"
"github.com/pg-sharding/spqr/pkg/models/kr"
"github.com/pg-sharding/spqr/pkg/spqrlog"
"github.com/pg-sharding/spqr/qdb"
)

// EtcdMetadataBootstraper initializes router metadata from an etcd-backed QDB.
type EtcdMetadataBootstraper struct {
// Embedded interface. NOTE(review): NewEtcdMetadataBootstraper does not set
// it, so it stays nil there; only methods defined directly on
// *EtcdMetadataBootstraper are safe to call — confirm the embedding is needed.
RouterMetadataBootstraper
// QdbAddr is the address handed to qdb.NewEtcdQDB when connecting.
QdbAddr string
}

// InitializeMetadata implements RouterMetadataBootstraper.
//
// It connects to the etcd-backed QDB at e.QdbAddr and then:
//  1. lists every distribution and creates it through the router's console
//     manager, followed by the key ranges listed for that distribution;
//  2. reads the coordinator address from the QDB, retrying once per second
//     (up to 50 retries) while the read fails, and passes the address to the
//     manager's UpdateCoordinator;
//  3. calls r.Initialize().
//
// The first error encountered is returned. The wait between retries honours
// ctx: if ctx is cancelled while waiting, ctx.Err() is returned immediately
// instead of sleeping out the remaining retries.
func (e *EtcdMetadataBootstraper) InitializeMetadata(ctx context.Context, r RouterInstance) error {
	etcdConn, err := qdb.NewEtcdQDB(e.QdbAddr)
	if err != nil {
		return err
	}
	defer etcdConn.Client().Close()

	/* Initialize distributions */
	ds, err := etcdConn.ListDistributions(ctx)
	if err != nil {
		return err
	}

	for _, d := range ds {
		if err := r.Console().Mgr().CreateDistribution(ctx, distributions.DistributionFromDB(d)); err != nil {
			spqrlog.Zero.Error().Err(err).Msg("failed to initialize instance")
			return err
		}

		/* initialize key ranges within distribution */
		krs, err := etcdConn.ListKeyRanges(ctx, d.ID)
		if err != nil {
			return err
		}

		for _, ckr := range krs {
			if err := r.Console().Mgr().CreateKeyRange(ctx, kr.KeyRangeFromDB(ckr)); err != nil {
				spqrlog.Zero.Error().Err(err).Msg("failed to initialize instance")
				return err
			}
		}
	}

	const (
		// coordinatorRetries is the number of additional attempts made after
		// the first failed read of the coordinator address.
		coordinatorRetries = 50
		// coordinatorRetryDelay is the pause between two attempts.
		coordinatorRetryDelay = time.Second
	)

	for attempt := 0; ; attempt++ {
		addr, err := etcdConn.GetCoordinator(ctx)
		if err == nil {
			/* coordinator address is known: hand it to the router */
			if err := r.Console().Mgr().UpdateCoordinator(ctx, addr); err != nil {
				return err
			}
			break
		}

		if attempt >= coordinatorRetries {
			return err
		}

		/* await the coordinator address to appear, but stop early on cancellation */
		timer := time.NewTimer(coordinatorRetryDelay)
		select {
		case <-ctx.Done():
			timer.Stop()
			return ctx.Err()
		case <-timer.C:
		}
	}

	r.Initialize()

	return nil
}

// NewEtcdMetadataBootstraper returns a RouterMetadataBootstraper backed by an
// *EtcdMetadataBootstraper whose QdbAddr field is set to the given address.
// No other field is populated.
func NewEtcdMetadataBootstraper(QdbAddr string) RouterMetadataBootstraper {
	bootstraper := new(EtcdMetadataBootstraper)
	bootstraper.QdbAddr = QdbAddr
	return bootstraper
}
Loading
Loading