Skip to content

Commit

Permalink
Init from etcd: basic support (#590)
Browse files Browse the repository at this point in the history
Change the router initialization mechanics in a clustered setup.
On start, the router requests the current metadata setup from qdb. The router metadata sync routine now syncs only the current coordinator address for router proxy queries.
  • Loading branch information
reshke authored Apr 4, 2024
1 parent 554a9db commit 75c910a
Show file tree
Hide file tree
Showing 22 changed files with 358 additions and 39 deletions.
5 changes: 3 additions & 2 deletions balancer/provider/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ package provider
import (
"context"
"fmt"
"sort"
"strings"

"github.com/google/uuid"
"github.com/jackc/pgx/v5"
"github.com/pg-sharding/spqr/balancer"
Expand All @@ -14,8 +17,6 @@ import (
"github.com/pg-sharding/spqr/pkg/spqrlog"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"sort"
"strings"
)

type BalancerImpl struct {
Expand Down
20 changes: 20 additions & 0 deletions cmd/router/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,26 @@ var runCmd = &cobra.Command{
}
}()

/* initialize metadata */
if rcfg.UseInitSQL {
i := instance.NewInitSQLMetadataBootstraper(rcfg.InitSQL)
if err := i.InitializeMetadata(ctx, router); err != nil {
return err
}
} else if rcfg.UseCoordinatorInit {
/* load config if not yet */
if err := config.LoadCoordinatorCfg(ccfgPath); err != nil {
return err
}
e := instance.NewEtcdMetadataBootstraper(config.CoordinatorConfig().QdbAddr)
if err := e.InitializeMetadata(ctx, router); err != nil {
return err
}
} else {
/* TODO: maybe error-out? */
router.Initialize()
}

wg := &sync.WaitGroup{}

wg.Add(1)
Expand Down
60 changes: 52 additions & 8 deletions coordinator/provider/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package provider
import (
"context"
"crypto/tls"
"fmt"
"net"
"time"

Expand Down Expand Up @@ -73,6 +72,9 @@ func (ci grpcConnectionIterator) IterRouter(cb func(cc *grpc.ClientConn, addr st
if err := cb(cc, r.Address); err != nil {
return err
}
if err := cc.Close(); err != nil {
return err
}
}
return nil
}
Expand Down Expand Up @@ -235,22 +237,23 @@ func (qc *qdbCoordinator) watchRouters(ctx context.Context) {
switch resp.Status {
case routerproto.RouterStatus_CLOSED:
spqrlog.Zero.Debug().Msg("router is closed")
if err := qc.SyncRouterMetadata(ctx, internalR); err != nil {
return err
}
if _, err := rrClient.OpenRouter(ctx, &routerproto.OpenRouterRequest{}); err != nil {
if err := qc.SyncRouterCoordinatorAddress(ctx, internalR); err != nil {
return err
}

/* Mark router as opened in qdb */
err := qc.db.OpenRouter(ctx, internalR.ID)
err := qc.db.CloseRouter(ctx, internalR.ID)
if err != nil {
return err
}

case routerproto.RouterStatus_OPENED:
spqrlog.Zero.Debug().Msg("router is opened")

/* TODO: check router metadata consistency */
if err := qc.SyncRouterCoordinatorAddress(ctx, internalR); err != nil {
return err
}

/* Mark router as opened in qdb */
err := qc.db.OpenRouter(ctx, internalR.ID)
Expand Down Expand Up @@ -287,7 +290,7 @@ func (qc *qdbCoordinator) lockCoordinator(ctx context.Context, initialRouter boo
}
router := &topology.Router{
ID: uuid.NewString(),
Address: fmt.Sprintf("%s:%s", config.RouterConfig().Host, config.RouterConfig().GrpcApiPort),
Address: net.JoinHostPort(config.RouterConfig().Host, config.RouterConfig().GrpcApiPort),
State: qdb.OPENED,
}
if err := qc.RegisterRouter(ctx, router); err != nil {
Expand All @@ -296,7 +299,7 @@ func (qc *qdbCoordinator) lockCoordinator(ctx context.Context, initialRouter boo
if err := qc.SyncRouterMetadata(ctx, router); err != nil {
spqrlog.Zero.Error().Err(err).Msg("sync router metadata when locking coordinator")
}
if err := qc.UpdateCoordinator(ctx, fmt.Sprintf("%s:%s", config.CoordinatorConfig().Host, config.CoordinatorConfig().GrpcApiPort)); err != nil {
if err := qc.UpdateCoordinator(ctx, net.JoinHostPort(config.CoordinatorConfig().Host, config.CoordinatorConfig().GrpcApiPort)); err != nil {
return false
}
return true
Expand Down Expand Up @@ -391,6 +394,8 @@ func (qc *qdbCoordinator) traverseRouters(ctx context.Context, cb func(cc *grpc.
}
defer cc.Close()

defer cc.Close()

if err := cb(cc); err != nil {
spqrlog.Zero.Debug().Err(err).Str("router id", rtr.ID).Msg("traverse routers")
return err
Expand Down Expand Up @@ -948,6 +953,45 @@ func (qc *qdbCoordinator) SyncRouterMetadata(ctx context.Context, qRouter *topol
spqrlog.Zero.Debug().Msg("successfully add all key ranges")

rCl := routerproto.NewTopologyServiceClient(cc)
if _, err := rCl.UpdateCoordinator(ctx, &routerproto.UpdateCoordinatorRequest{
Address: net.JoinHostPort(config.CoordinatorConfig().Host, config.CoordinatorConfig().GrpcApiPort),
}); err != nil {
return err
}

if resp, err := rCl.OpenRouter(ctx, &routerproto.OpenRouterRequest{}); err != nil {
return err
} else {
spqrlog.Zero.Debug().
Interface("response", resp).
Msg("open router response")
}

return nil
}

// TODO : unit tests
func (qc *qdbCoordinator) SyncRouterCoordinatorAddress(ctx context.Context, qRouter *topology.Router) error {
spqrlog.Zero.Debug().
Str("address", qRouter.Address).
Msg("qdb coordinator: sync router metadata")

cc, err := DialRouter(qRouter)
if err != nil {
return err
}
defer cc.Close()

/* Update current coordinator address. */
/* Todo: check that router metadata is in sync. */

rCl := routerproto.NewTopologyServiceClient(cc)
if _, err := rCl.UpdateCoordinator(ctx, &routerproto.UpdateCoordinatorRequest{
Address: net.JoinHostPort(config.CoordinatorConfig().Host, config.CoordinatorConfig().GrpcApiPort),
}); err != nil {
return err
}

if resp, err := rCl.OpenRouter(ctx, &routerproto.OpenRouterRequest{}); err != nil {
return err
} else {
Expand Down
2 changes: 1 addition & 1 deletion docker/router/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@ FROM spqr-base-image
RUN apt-get update && apt-get install -y postgresql-client
COPY ./docker/router/ssl/localhost.crt /etc/spqr/ssl/server.crt
COPY ./docker/router/ssl/localhost.key /etc/spqr/ssl/server.key
ENTRYPOINT CONFIG_PATH=${ROUTER_CONFIG=/spqr/docker/router/cfg.yaml} COORD_CONFIG_PATH=${COORDINATOR_CONFIG=/spqr/docker/coordinator/cfg.yaml} && CUR_HOST=$(cat ${CONFIG_PATH} | grep "host:") && sed -i "s/${CUR_HOST}/${ROUTER_HOST=${CUR_HOST}}/g" ${CONFIG_PATH} && /spqr/spqr-router run --config ${CONFIG_PATH} --coordinator-config ${COORD_CONFIG_PATH} --proto-debug
ENTRYPOINT CONFIG_PATH=${ROUTER_CONFIG=/spqr/docker/router/cfg.yaml} COORD_CONFIG_PATH=${COORDINATOR_CONFIG=/spqr/docker/coordinator/cfg.yaml} && CUR_HOST=$(cat ${CONFIG_PATH} | grep "host:") && sed -i "s/${CUR_HOST}/${ROUTER_HOST=${CUR_HOST}}/g" ${CONFIG_PATH} && /spqr/spqr-router run --config ${CONFIG_PATH} --coordinator-config ${COORD_CONFIG_PATH} >> ${ROUTER_LOG}
66 changes: 66 additions & 0 deletions examples/router-etcd-init.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
log_level: debug

host: '::1'
router_port: '6432'
admin_console_port: '7432'
grpc_api_port: '7010'

world_shard_fallback: true
router_mode: PROXY

use_coordinator_init: true

with_coordinator: true

frontend_tls:
key_file: /etc/odyssey/ssl/server.key
cert_file: /etc/odyssey/ssl/server.crt
sslmode: disable

frontend_rules:
- usr: user1
db: db1
pool_mode: TRANSACTION
pool_prepared_statement: true
auth_rule:
auth_method: ok
password: strong
- pool_mode: TRANSACTION
pool_default: true
pool_prepared_statement: false
auth_rule:
auth_method: ok

backend_rules:
- usr: user1
db: db1
pool_discard: false
pool_rollback: true
- pool_default: true
pool_discard: false
pool_rollback: true

shards:
sh1:
tls:
key_file: /etc/odyssey/ssl/server.key
sslmode: disable
cert_file: /etc/odyssey/ssl/server.crt
db: db1
usr: user1
pwd: 12345678
type: DATA
hosts:
- 'localhost:5550'
sh2:
tls:
key_file: /etc/odyssey/ssl/server.key
sslmode: disable
cert_file: /etc/odyssey/ssl/server.crt
db: db1
usr: user1
pwd: 12345678
type: DATA
hosts:
- 'localhost:5551'

9 changes: 9 additions & 0 deletions pkg/coord/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,15 @@ func (a *Adapter) SyncRouterMetadata(ctx context.Context, router *topology.Route
return err
}

// SyncRouterCoordinatorAddress implements meta.EntityMgr.
//
// It builds a router service client on top of a.conn, converts router to its
// protobuf form and issues the SyncMetadata RPC with it. The RPC response is
// discarded; only the error is reported back to the caller.
//
// NOTE(review): this sends the SyncMetadata request; whether the remote side
// treats it differently from a full metadata sync is not visible here — confirm.
func (a *Adapter) SyncRouterCoordinatorAddress(ctx context.Context, router *topology.Router) error {
	client := proto.NewRouterServiceClient(a.conn)
	req := &proto.SyncMetadataRequest{
		Router: topology.RouterToProto(router),
	}

	if _, err := client.SyncMetadata(ctx, req); err != nil {
		return err
	}
	return nil
}

// TODO : unit tests
// TODO : implement
func (a *Adapter) AddDataShard(ctx context.Context, shard *datashards.DataShard) error {
Expand Down
4 changes: 4 additions & 0 deletions pkg/coord/local/clocal.go
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,10 @@ func (qr *LocalCoordinator) SyncRouterMetadata(ctx context.Context, router *topo
return ErrNotCoordinator
}

// SyncRouterCoordinatorAddress always returns ErrNotCoordinator: this
// implementation performs no synchronization and ignores both arguments.
func (qr *LocalCoordinator) SyncRouterCoordinatorAddress(ctx context.Context, router *topology.Router) error {
return ErrNotCoordinator
}

// UpdateCoordinator forwards addr to the UpdateCoordinator method of the
// underlying qdb and returns its error unchanged.
func (qr *LocalCoordinator) UpdateCoordinator(ctx context.Context, addr string) error {
return qr.qdb.UpdateCoordinator(ctx, addr)
}
Expand Down
1 change: 1 addition & 0 deletions pkg/models/topology/routers.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type RouterMgr interface {
ListRouters(ctx context.Context) ([]*Router, error)
UnregisterRouter(ctx context.Context, id string) error
SyncRouterMetadata(ctx context.Context, router *Router) error
SyncRouterCoordinatorAddress(ctx context.Context, router *Router) error
UpdateCoordinator(ctx context.Context, address string) error
GetCoordinator(ctx context.Context) (string, error)
}
Expand Down
3 changes: 2 additions & 1 deletion qdb/etcdqdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"net"
"path"
"sort"
"sync"
Expand Down Expand Up @@ -512,7 +513,7 @@ func (q *EtcdQDB) TryCoordinatorLock(ctx context.Context) error {
return err
}

op := clientv3.OpPut(coordLockKey, config.CoordinatorConfig().Host, clientv3.WithLease(clientv3.LeaseID(leaseGrantResp.ID)))
op := clientv3.OpPut(coordLockKey, net.JoinHostPort(config.CoordinatorConfig().Host, config.CoordinatorConfig().GrpcApiPort), clientv3.WithLease(clientv3.LeaseID(leaseGrantResp.ID)))
tx := q.cli.Txn(ctx).If(clientv3util.KeyMissing(coordLockKey)).Then(op)
stat, err := tx.Commit()
if err != nil {
Expand Down
77 changes: 76 additions & 1 deletion router/instance/etcd.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,80 @@
package instance

import (
"context"
"time"

"github.com/pg-sharding/spqr/pkg/models/distributions"
"github.com/pg-sharding/spqr/pkg/models/kr"
"github.com/pg-sharding/spqr/pkg/spqrlog"
"github.com/pg-sharding/spqr/qdb"
)

// EtcdMetadataBootstraper initializes router metadata from the qdb reachable
// at QdbAddr.
type EtcdMetadataBootstraper struct {
// Embedded interface value; NewEtcdMetadataBootstraper leaves it unset.
RouterMetadataBootstraper
// QdbAddr is the address InitializeMetadata passes to qdb.NewEtcdQDB.
QdbAddr string
}

// InitializeMetadata implements RouterMetadataBootstraper.
//
// It connects to the qdb at e.QdbAddr, then, through r.Console().Mgr():
//
//  1. creates every distribution listed in the qdb,
//  2. creates every key range listed for each of those distributions,
//  3. passes the coordinator address stored in the qdb to UpdateCoordinator.
//
// When all of that succeeds it calls r.Initialize() and returns nil.
//
// The coordinator address lookup is retried up to 50 times with a one second
// pause between attempts. The pause is interrupted when ctx is done, in which
// case ctx.Err() is returned. Every other error is returned unchanged; a
// failing UpdateCoordinator call is not retried.
func (e *EtcdMetadataBootstraper) InitializeMetadata(ctx context.Context, r RouterInstance) error {
	const (
		// Number of extra lookups performed after the first failed one.
		coordinatorLookupRetries = 50
		// Pause between two consecutive coordinator lookups.
		coordinatorLookupInterval = time.Second
	)

	etcdConn, err := qdb.NewEtcdQDB(e.QdbAddr)
	if err != nil {
		return err
	}
	defer etcdConn.Client().Close()

	/* Initialize distributions */
	ds, err := etcdConn.ListDistributions(ctx)
	if err != nil {
		return err
	}

	for _, d := range ds {
		if err := r.Console().Mgr().CreateDistribution(ctx, distributions.DistributionFromDB(d)); err != nil {
			spqrlog.Zero.Error().Err(err).Msg("failed to initialize instance")
			return err
		}

		/* initialize key ranges within distribution */
		krs, err := etcdConn.ListKeyRanges(ctx, d.ID)
		if err != nil {
			return err
		}

		for _, ckr := range krs {
			if err := r.Console().Mgr().CreateKeyRange(ctx, kr.KeyRangeFromDB(ckr)); err != nil {
				spqrlog.Zero.Error().Err(err).Msg("failed to initialize instance")
				return err
			}
		}
	}

	for attempt := 0; ; attempt++ {
		addr, err := etcdConn.GetCoordinator(ctx)
		if err == nil {
			if err := r.Console().Mgr().UpdateCoordinator(ctx, addr); err != nil {
				return err
			}
			break
		}
		if attempt >= coordinatorLookupRetries {
			return err
		}

		/* await the coordinator address to appear, but stop waiting as soon as ctx is done */
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(coordinatorLookupInterval):
		}
	}

	r.Initialize()

	return nil
}

// NewEtcdMetadataBootstraper returns an *EtcdMetadataBootstraper, typed as
// RouterMetadataBootstraper, whose QdbAddr field is set to QdbAddr.
func NewEtcdMetadataBootstraper(QdbAddr string) RouterMetadataBootstraper {
return &EtcdMetadataBootstraper{QdbAddr: QdbAddr}
}
Loading

0 comments on commit 75c910a

Please sign in to comment.