diff --git a/cmd/router/main.go b/cmd/router/main.go index f2443811d..cbe660dfd 100644 --- a/cmd/router/main.go +++ b/cmd/router/main.go @@ -289,6 +289,26 @@ var runCmd = &cobra.Command{ } }() + /* initialize metadata */ + if rcfg.UseInitSQL { + i := instance.NewInitSQLMetadataBootstraper(rcfg.InitSQL) + if err := i.InitializeMetadata(ctx, router); err != nil { + return err + } + } else if rcfg.UseCoordinatorInit { + /* load config if not yet */ + if err := config.LoadCoordinatorCfg(ccfgPath); err != nil { + return err + } + e := instance.NewEtcdMetadataBootstraper(config.CoordinatorConfig().QdbAddr) + if err := e.InitializeMetadata(ctx, router); err != nil { + return err + } + } else { + /* TODO: maybe error-out? */ + router.Initialize() + } + wg := &sync.WaitGroup{} wg.Add(1) diff --git a/coordinator/provider/coordinator.go b/coordinator/provider/coordinator.go index c8d436c43..9c606c5eb 100644 --- a/coordinator/provider/coordinator.go +++ b/coordinator/provider/coordinator.go @@ -71,6 +71,9 @@ func (ci grpcConnectionIterator) IterRouter(cb func(cc *grpc.ClientConn, addr st if err := cb(cc, r.Address); err != nil { return err } + if err := cc.Close(); err != nil { + return err + } } return nil } @@ -221,6 +224,8 @@ func (qc *qdbCoordinator) watchRouters(ctx context.Context) { return err } + defer cc.Close() + rrClient := routerproto.NewTopologyServiceClient(cc) resp, err := rrClient.GetRouterStatus(ctx, &routerproto.GetRouterStatusRequest{}) @@ -234,19 +239,20 @@ func (qc *qdbCoordinator) watchRouters(ctx context.Context) { if err := qc.SyncRouterMetadata(ctx, internalR); err != nil { return err } - if _, err := rrClient.OpenRouter(ctx, &routerproto.OpenRouterRequest{}); err != nil { - return err - } /* Mark router as opened in qdb */ - err := qc.db.OpenRouter(ctx, internalR.ID) + err := qc.db.CloseRouter(ctx, internalR.ID) if err != nil { return err } + case routerproto.RouterStatus_OPENED: spqrlog.Zero.Debug().Msg("router is opened") /* TODO: check router metadata consistency */ + if err := qc.SyncRouterMetadata(ctx, internalR); err != nil { + return err + } /* Mark router as opened in qdb */ err := qc.db.OpenRouter(ctx, internalR.ID) @@ -386,6 +392,8 @@ func (qc *qdbCoordinator) traverseRouters(ctx context.Context, cb func(cc *grpc. return err } + defer cc.Close() + if err := cb(cc); err != nil { spqrlog.Zero.Debug().Err(err).Str("router id", rtr.ID).Msg("traverse routers") } @@ -881,67 +889,16 @@ func (qc *qdbCoordinator) SyncRouterMetadata(ctx context.Context, qRouter *topol } defer cc.Close() - // Configure distributions - dsCl := routerproto.NewDistributionServiceClient(cc) - spqrlog.Zero.Debug().Msg("qdb coordinator: configure distributions") - dss, err := qc.db.ListDistributions(ctx) - if err != nil { - return err - } - resp, err := dsCl.ListDistributions(ctx, &routerproto.ListDistributionsRequest{}) - if err != nil { - return err - } - if _, err = dsCl.DropDistribution(ctx, &routerproto.DropDistributionRequest{ - Ids: func() []string { - res := make([]string, len(resp.Distributions)) - for i, ds := range resp.Distributions { - res[i] = ds.Id - } - return res - }(), - }); err != nil { - return err - } - if _, err = dsCl.CreateDistribution(ctx, &routerproto.CreateDistributionRequest{ - Distributions: func() []*routerproto.Distribution { - res := make([]*routerproto.Distribution, len(dss)) - for i, ds := range dss { - res[i] = distributions.DistributionToProto(distributions.DistributionFromDB(ds)) - } - return res - }(), - }); err != nil { - return err - } + /* Update current coordinator address. */ + /* Todo: check that router metadata is in sync. */ - // Configure key ranges. - krClient := routerproto.NewKeyRangeServiceClient(cc) - spqrlog.Zero.Debug().Msg("qdb coordinator: configure key ranges") - keyRanges, err := qc.db.ListAllKeyRanges(ctx) - if err != nil { - return err - } - if _, err = krClient.DropAllKeyRanges(ctx, &routerproto.DropAllKeyRangesRequest{}); err != nil { + rCl := routerproto.NewTopologyServiceClient(cc) + if _, err := rCl.UpdateCoordinator(ctx, &routerproto.UpdateCoordinatorRequest{ + Address: net.JoinHostPort(config.CoordinatorConfig().Host, config.CoordinatorConfig().GrpcApiPort), + }); err != nil { return err } - for _, keyRange := range keyRanges { - resp, err := krClient.CreateKeyRange(ctx, &routerproto.CreateKeyRangeRequest{ - KeyRangeInfo: kr.KeyRangeFromDB(keyRange).ToProto(), - }) - - if err != nil { - return err - } - - spqrlog.Zero.Debug(). - Interface("response", resp). - Msg("got response while adding key range") - } - spqrlog.Zero.Debug().Msg("successfully add all key ranges") - - rCl := routerproto.NewTopologyServiceClient(cc) if resp, err := rCl.OpenRouter(ctx, &routerproto.OpenRouterRequest{}); err != nil { return err } else { diff --git a/examples/coordinator-init.yaml b/examples/coordinator-init.yaml new file mode 100644 index 000000000..0172e2556 --- /dev/null +++ b/examples/coordinator-init.yaml @@ -0,0 +1,66 @@ +log_level: debug + +host: '::1' +router_port: '6432' +admin_console_port: '7432' +grpc_api_port: '7010' + +world_shard_fallback: true +router_mode: PROXY + +use_coordinator_init: true + +with_coordinator: true + +frontend_tls: + key_file: /etc/odyssey/ssl/server.key + cert_file: /etc/odyssey/ssl/server.crt + sslmode: disable + +frontend_rules: + - usr: user1 + db: db1 + pool_mode: TRANSACTION + pool_prepared_statement: true + auth_rule: + auth_method: ok + password: strong + - pool_mode: TRANSACTION + pool_default: true + pool_prepared_statement: false + auth_rule: + auth_method: ok + +backend_rules: + - usr: user1 + db: db1 + pool_discard: false + pool_rollback: true + - pool_default: true + pool_discard: false + pool_rollback: true + +shards: + sh1: + tls: + key_file: /etc/odyssey/ssl/server.key + sslmode: disable + cert_file: /etc/odyssey/ssl/server.crt + db: db1 + usr: user1 + pwd: 12345678 + type: DATA + hosts: + - 'localhost:5550' + sh2: + tls: + key_file: /etc/odyssey/ssl/server.key + sslmode: disable + cert_file: /etc/odyssey/ssl/server.crt + db: db1 + usr: user1 + pwd: 12345678 + type: DATA + hosts: + - 'localhost:5551' + diff --git a/router/instance/etcd.go b/router/instance/etcd.go index a726047c4..63e6f2f51 100644 --- a/router/instance/etcd.go +++ b/router/instance/etcd.go @@ -1,5 +1,65 @@ package instance +import ( + "context" + + "github.com/pg-sharding/spqr/pkg/models/distributions" + "github.com/pg-sharding/spqr/pkg/models/kr" + "github.com/pg-sharding/spqr/pkg/spqrlog" + "github.com/pg-sharding/spqr/qdb" +) + type EtcdMetadataBootstraper struct { - RouterMetadataBootstraper + QdbAddr string +} + +// InitializeMetadata implements RouterMetadataBootstraper. +func (e *EtcdMetadataBootstraper) InitializeMetadata(ctx context.Context, r RouterInstance) error { + etcdConn, err := qdb.NewEtcdQDB(e.QdbAddr) + if err != nil { + return err + } + defer etcdConn.Client().Close() + + /* Initialize distributions */ + ds, err := etcdConn.ListDistributions(ctx) + if err != nil { + return err + } + + for _, d := range ds { + if err := r.Console().Mgr().CreateDistribution(ctx, distributions.DistributionFromDB(d)); err != nil { + spqrlog.Zero.Error().Err(err).Msg("failed to initialize instance") + return err + } + + /* initialize key ranges within distribution */ + krs, err := etcdConn.ListKeyRanges(ctx, d.ID) + if err != nil { + return err + } + + for _, ckr := range krs { + if err := r.Console().Mgr().CreateKeyRange(ctx, kr.KeyRangeFromDB(ckr)); err != nil { + spqrlog.Zero.Error().Err(err).Msg("failed to initialize instance") + return err + } + } + } + + c, err := etcdConn.GetCoordinator(ctx) + if err != nil { + return err + } + if err := r.Console().Mgr().UpdateCoordinator(ctx, c); err != nil { + return err + } + + r.Initialize() + + return nil +} + +func NewEtcdMetadataBootstraper(QdbAddr string) RouterMetadataBootstraper { + return &EtcdMetadataBootstraper{QdbAddr: QdbAddr} } diff --git a/router/instance/instance.go b/router/instance/instance.go index 22475b3fa..466a17d90 100644 --- a/router/instance/instance.go +++ b/router/instance/instance.go @@ -29,6 +29,7 @@ type RouterInstance interface { Initialize() bool Console() console.Console + Config() *config.Router } type InstanceImpl struct { @@ -38,10 +39,10 @@ type InstanceImpl struct { Mgr meta.EntityMgr Writer workloadlog.WorkloadLog - stchan chan struct{} - addr string - frTLS *tls.Config - WithJaeger bool + stchan chan struct{} + addr string + frTLS *tls.Config + cfg *config.Router notifier *sdnotifier.Notifier } @@ -51,6 +52,10 @@ func (r *InstanceImpl) Console() console.Console { return r.AdmConsole } +func (r *InstanceImpl) Config() *config.Router { + return r.cfg +} + func (r *InstanceImpl) ID() string { return "noid" } @@ -143,24 +148,11 @@ func NewRouter(ctx context.Context, rcfg *config.Router, ns string, persist bool Mgr: lc, stchan: stchan, frTLS: frTLS, - WithJaeger: rcfg.WithJaeger, + cfg: rcfg, Writer: writ, notifier: notifier, } - /* initialize metadata */ - if rcfg.UseInitSQL { - i := NewInitSQLMetadataBootstraper(rcfg.InitSQL) - if err := i.InitializeMetadata(ctx, r); err != nil { - return nil, err - } - } else if rcfg.UseCoordinatorInit { - panic("implement me") - } else { - /* TODO: maybe error-out? */ - r.Initialize() - } - return r, nil } @@ -201,7 +193,7 @@ func (r *InstanceImpl) serv(netconn net.Conn, pt port.RouterPortType) error { } func (r *InstanceImpl) Run(ctx context.Context, listener net.Listener, pt port.RouterPortType) error { - if r.WithJaeger { + if r.cfg.WithJaeger { closer, err := r.initJaegerTracer(r.RuleRouter.Config()) if err != nil { return fmt.Errorf("could not initialize jaeger tracer: %s", err)