Skip to content

Commit

Permalink
Init etcd basics
Browse files Browse the repository at this point in the history
fixes

fixes
  • Loading branch information
reshke committed Apr 2, 2024
1 parent 8c813ab commit 2b51656
Show file tree
Hide file tree
Showing 5 changed files with 176 additions and 81 deletions.
20 changes: 20 additions & 0 deletions cmd/router/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,26 @@ var runCmd = &cobra.Command{
}
}()

/* initialize metadata */
if rcfg.UseInitSQL {
i := instance.NewInitSQLMetadataBootstraper(rcfg.InitSQL)
if err := i.InitializeMetadata(ctx, router); err != nil {
return err
}
} else if rcfg.UseCoordinatorInit {
/* load config if not yet */
if err := config.LoadCoordinatorCfg(ccfgPath); err != nil {
return err
}
e := instance.NewEtcdMetadataBootstraper(config.CoordinatorConfig().QdbAddr)
if err := e.InitializeMetadata(ctx, router); err != nil {
return err
}
} else {
/* TODO: maybe error-out? */
router.Initialize()
}

wg := &sync.WaitGroup{}

wg.Add(1)
Expand Down
79 changes: 18 additions & 61 deletions coordinator/provider/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ func (ci grpcConnectionIterator) IterRouter(cb func(cc *grpc.ClientConn, addr st
if err := cb(cc, r.Address); err != nil {
return err
}
if err := cc.Close(); err != nil {
return err
}
}
return nil
}
Expand Down Expand Up @@ -221,6 +224,8 @@ func (qc *qdbCoordinator) watchRouters(ctx context.Context) {
return err
}

defer cc.Close()

rrClient := routerproto.NewTopologyServiceClient(cc)

resp, err := rrClient.GetRouterStatus(ctx, &routerproto.GetRouterStatusRequest{})
Expand All @@ -234,19 +239,20 @@ func (qc *qdbCoordinator) watchRouters(ctx context.Context) {
if err := qc.SyncRouterMetadata(ctx, internalR); err != nil {
return err
}
if _, err := rrClient.OpenRouter(ctx, &routerproto.OpenRouterRequest{}); err != nil {
return err
}

/* Mark router as opened in qdb */
err := qc.db.OpenRouter(ctx, internalR.ID)
err := qc.db.CloseRouter(ctx, internalR.ID)
if err != nil {
return err
}

case routerproto.RouterStatus_OPENED:
spqrlog.Zero.Debug().Msg("router is opened")

/* TODO: check router metadata consistency */
if err := qc.SyncRouterMetadata(ctx, internalR); err != nil {
return err
}

/* Mark router as opened in qdb */
err := qc.db.OpenRouter(ctx, internalR.ID)
Expand Down Expand Up @@ -386,6 +392,8 @@ func (qc *qdbCoordinator) traverseRouters(ctx context.Context, cb func(cc *grpc.
return err
}

defer cc.Close()

if err := cb(cc); err != nil {
spqrlog.Zero.Debug().Err(err).Str("router id", rtr.ID).Msg("traverse routers")
}
Expand Down Expand Up @@ -881,67 +889,16 @@ func (qc *qdbCoordinator) SyncRouterMetadata(ctx context.Context, qRouter *topol
}
defer cc.Close()

// Configure distributions
dsCl := routerproto.NewDistributionServiceClient(cc)
spqrlog.Zero.Debug().Msg("qdb coordinator: configure distributions")
dss, err := qc.db.ListDistributions(ctx)
if err != nil {
return err
}
resp, err := dsCl.ListDistributions(ctx, &routerproto.ListDistributionsRequest{})
if err != nil {
return err
}
if _, err = dsCl.DropDistribution(ctx, &routerproto.DropDistributionRequest{
Ids: func() []string {
res := make([]string, len(resp.Distributions))
for i, ds := range resp.Distributions {
res[i] = ds.Id
}
return res
}(),
}); err != nil {
return err
}
if _, err = dsCl.CreateDistribution(ctx, &routerproto.CreateDistributionRequest{
Distributions: func() []*routerproto.Distribution {
res := make([]*routerproto.Distribution, len(dss))
for i, ds := range dss {
res[i] = distributions.DistributionToProto(distributions.DistributionFromDB(ds))
}
return res
}(),
}); err != nil {
return err
}
/* Update current coordinator address. */
/* Todo: check that router metadata is in sync. */

// Configure key ranges.
krClient := routerproto.NewKeyRangeServiceClient(cc)
spqrlog.Zero.Debug().Msg("qdb coordinator: configure key ranges")
keyRanges, err := qc.db.ListAllKeyRanges(ctx)
if err != nil {
return err
}
if _, err = krClient.DropAllKeyRanges(ctx, &routerproto.DropAllKeyRangesRequest{}); err != nil {
rCl := routerproto.NewTopologyServiceClient(cc)
if _, err := rCl.UpdateCoordinator(ctx, &routerproto.UpdateCoordinatorRequest{
Address: net.JoinHostPort(config.CoordinatorConfig().Host, config.CoordinatorConfig().GrpcApiPort),
}); err != nil {
return err
}

for _, keyRange := range keyRanges {
resp, err := krClient.CreateKeyRange(ctx, &routerproto.CreateKeyRangeRequest{
KeyRangeInfo: kr.KeyRangeFromDB(keyRange).ToProto(),
})

if err != nil {
return err
}

spqrlog.Zero.Debug().
Interface("response", resp).
Msg("got response while adding key range")
}
spqrlog.Zero.Debug().Msg("successfully add all key ranges")

rCl := routerproto.NewTopologyServiceClient(cc)
if resp, err := rCl.OpenRouter(ctx, &routerproto.OpenRouterRequest{}); err != nil {
return err
} else {
Expand Down
66 changes: 66 additions & 0 deletions examples/coordinator-init.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
log_level: debug

host: '::1'
router_port: '6432'
admin_console_port: '7432'
grpc_api_port: '7010'

world_shard_fallback: true
router_mode: PROXY

use_coordinator_init: true

with_coordinator: true

frontend_tls:
key_file: /etc/odyssey/ssl/server.key
cert_file: /etc/odyssey/ssl/server.crt
sslmode: disable

frontend_rules:
- usr: user1
db: db1
pool_mode: TRANSACTION
pool_prepared_statement: true
auth_rule:
auth_method: ok
password: strong
- pool_mode: TRANSACTION
pool_default: true
pool_prepared_statement: false
auth_rule:
auth_method: ok

backend_rules:
- usr: user1
db: db1
pool_discard: false
pool_rollback: true
- pool_default: true
pool_discard: false
pool_rollback: true

shards:
sh1:
tls:
key_file: /etc/odyssey/ssl/server.key
sslmode: disable
cert_file: /etc/odyssey/ssl/server.crt
db: db1
usr: user1
pwd: 12345678
type: DATA
hosts:
- 'localhost:5550'
sh2:
tls:
key_file: /etc/odyssey/ssl/server.key
sslmode: disable
cert_file: /etc/odyssey/ssl/server.crt
db: db1
usr: user1
pwd: 12345678
type: DATA
hosts:
- 'localhost:5551'

62 changes: 61 additions & 1 deletion router/instance/etcd.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,65 @@
package instance

import (
"context"

"github.com/pg-sharding/spqr/pkg/models/distributions"
"github.com/pg-sharding/spqr/pkg/models/kr"
"github.com/pg-sharding/spqr/pkg/spqrlog"
"github.com/pg-sharding/spqr/qdb"
)

type EtcdMetadataBootstraper struct {
RouterMetadataBootstraper
QdbAddr string
}

// InitializeMetadata implements RouterMetadataBootstraper.
func (e *EtcdMetadataBootstraper) InitializeMetadata(ctx context.Context, r RouterInstance) error {
etcdConn, err := qdb.NewEtcdQDB(e.QdbAddr)
if err != nil {
return err
}
defer etcdConn.Client().Close()

/* Initialize distributions */
ds, err := etcdConn.ListDistributions(ctx)
if err != nil {
return err
}

for _, d := range ds {
if err := r.Console().Mgr().CreateDistribution(ctx, distributions.DistributionFromDB(d)); err != nil {
spqrlog.Zero.Error().Err(err).Msg("failed to initialize instance")
return err
}

/* initialize key ranges within distribution */
krs, err := etcdConn.ListKeyRanges(ctx, d.ID)
if err != nil {
return err
}

for _, ckr := range krs {
if err := r.Console().Mgr().CreateKeyRange(ctx, kr.KeyRangeFromDB(ckr)); err != nil {
spqrlog.Zero.Error().Err(err).Msg("failed to initialize instance")
return err
}
}
}

c, err := etcdConn.GetCoordinator(ctx)
if err != nil {
return err
}
if err := r.Console().Mgr().UpdateCoordinator(ctx, c); err != nil {
return err
}

r.Initialize()

return nil
}

func NewEtcdMetadataBootstraper(QdbAddr string) RouterMetadataBootstraper {
return &EtcdMetadataBootstraper{QdbAddr: QdbAddr}
}
30 changes: 11 additions & 19 deletions router/instance/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type RouterInstance interface {
Initialize() bool

Console() console.Console
Config() *config.Router
}

type InstanceImpl struct {
Expand All @@ -38,10 +39,10 @@ type InstanceImpl struct {
Mgr meta.EntityMgr
Writer workloadlog.WorkloadLog

stchan chan struct{}
addr string
frTLS *tls.Config
WithJaeger bool
stchan chan struct{}
addr string
frTLS *tls.Config
cfg *config.Router

notifier *sdnotifier.Notifier
}
Expand All @@ -51,6 +52,10 @@ func (r *InstanceImpl) Console() console.Console {
return r.AdmConsole
}

func (r *InstanceImpl) Config() *config.Router {
return r.cfg
}

func (r *InstanceImpl) ID() string {
return "noid"
}
Expand Down Expand Up @@ -143,24 +148,11 @@ func NewRouter(ctx context.Context, rcfg *config.Router, ns string, persist bool
Mgr: lc,
stchan: stchan,
frTLS: frTLS,
WithJaeger: rcfg.WithJaeger,
cfg: rcfg,
Writer: writ,
notifier: notifier,
}

/* initialize metadata */
if rcfg.UseInitSQL {
i := NewInitSQLMetadataBootstraper(rcfg.InitSQL)
if err := i.InitializeMetadata(ctx, r); err != nil {
return nil, err
}
} else if rcfg.UseCoordinatorInit {
panic("implement me")
} else {
/* TODO: maybe error-out? */
r.Initialize()
}

return r, nil
}

Expand Down Expand Up @@ -201,7 +193,7 @@ func (r *InstanceImpl) serv(netconn net.Conn, pt port.RouterPortType) error {
}

func (r *InstanceImpl) Run(ctx context.Context, listener net.Listener, pt port.RouterPortType) error {
if r.WithJaeger {
if r.cfg.WithJaeger {
closer, err := r.initJaegerTracer(r.RuleRouter.Config())
if err != nil {
return fmt.Errorf("could not initialize jaeger tracer: %s", err)
Expand Down

0 comments on commit 2b51656

Please sign in to comment.