From e7c1e7c415f8e5fdd6a3affc3657f382ba3b62b2 Mon Sep 17 00:00:00 2001 From: reshke Date: Tue, 2 Apr 2024 08:59:51 +0000 Subject: [PATCH] Init etcd basics fixes fixes modify config for coordinator test tmp tmp tmp tmp --- cmd/router/main.go | 20 +++++ coordinator/provider/coordinator.go | 77 ++++--------------- docker/router/Dockerfile | 2 +- examples/coordinator-init.yaml | 66 ++++++++++++++++ router/instance/etcd.go | 77 ++++++++++++++++++- router/instance/instance.go | 30 +++----- test/feature/conf/router_cluster.yaml | 46 +++++++++++ .../feature/conf/router_with_coordinator.yaml | 1 + test/feature/docker-compose.yaml | 2 + test/feature/features/coordinator.feature | 4 + test/feature/spqr_test.go | 1 + 11 files changed, 244 insertions(+), 82 deletions(-) create mode 100644 examples/coordinator-init.yaml create mode 100644 test/feature/conf/router_cluster.yaml diff --git a/cmd/router/main.go b/cmd/router/main.go index 5b7dc33ad..ee752d03f 100644 --- a/cmd/router/main.go +++ b/cmd/router/main.go @@ -317,6 +317,26 @@ var runCmd = &cobra.Command{ } }() + /* initialize metadata */ + if rcfg.UseInitSQL { + i := instance.NewInitSQLMetadataBootstraper(rcfg.InitSQL) + if err := i.InitializeMetadata(ctx, router); err != nil { + return err + } + } else if rcfg.UseCoordinatorInit { + /* load config if not yet */ + if err := config.LoadCoordinatorCfg(ccfgPath); err != nil { + return err + } + e := instance.NewEtcdMetadataBootstraper(config.CoordinatorConfig().QdbAddr) + if err := e.InitializeMetadata(ctx, router); err != nil { + return err + } + } else { + /* TODO: maybe error-out? 
*/ + router.Initialize() + } + wg := &sync.WaitGroup{} wg.Add(1) diff --git a/coordinator/provider/coordinator.go b/coordinator/provider/coordinator.go index f526cb9d9..249ad28df 100644 --- a/coordinator/provider/coordinator.go +++ b/coordinator/provider/coordinator.go @@ -73,6 +73,9 @@ func (ci grpcConnectionIterator) IterRouter(cb func(cc *grpc.ClientConn, addr st if err := cb(cc, r.Address); err != nil { return err } + if err := cc.Close(); err != nil { + return err + } } return nil } @@ -238,19 +241,20 @@ func (qc *qdbCoordinator) watchRouters(ctx context.Context) { if err := qc.SyncRouterMetadata(ctx, internalR); err != nil { return err } - if _, err := rrClient.OpenRouter(ctx, &routerproto.OpenRouterRequest{}); err != nil { - return err - } /* Mark router as opened in qdb */ - err := qc.db.OpenRouter(ctx, internalR.ID) + err := qc.db.CloseRouter(ctx, internalR.ID) if err != nil { return err } + case routerproto.RouterStatus_OPENED: spqrlog.Zero.Debug().Msg("router is opened") /* TODO: check router metadata consistency */ + if err := qc.SyncRouterMetadata(ctx, internalR); err != nil { + return err + } /* Mark router as opened in qdb */ err := qc.db.OpenRouter(ctx, internalR.ID) @@ -391,6 +395,8 @@ func (qc *qdbCoordinator) traverseRouters(ctx context.Context, cb func(cc *grpc. 
} defer cc.Close() + defer cc.Close() + if err := cb(cc); err != nil { spqrlog.Zero.Debug().Err(err).Str("router id", rtr.ID).Msg("traverse routers") return err @@ -887,67 +893,16 @@ func (qc *qdbCoordinator) SyncRouterMetadata(ctx context.Context, qRouter *topol } defer cc.Close() - // Configure distributions - dsCl := routerproto.NewDistributionServiceClient(cc) - spqrlog.Zero.Debug().Msg("qdb coordinator: configure distributions") - dss, err := qc.db.ListDistributions(ctx) - if err != nil { - return err - } - resp, err := dsCl.ListDistributions(ctx, &routerproto.ListDistributionsRequest{}) - if err != nil { - return err - } - if _, err = dsCl.DropDistribution(ctx, &routerproto.DropDistributionRequest{ - Ids: func() []string { - res := make([]string, len(resp.Distributions)) - for i, ds := range resp.Distributions { - res[i] = ds.Id - } - return res - }(), - }); err != nil { - return err - } - if _, err = dsCl.CreateDistribution(ctx, &routerproto.CreateDistributionRequest{ - Distributions: func() []*routerproto.Distribution { - res := make([]*routerproto.Distribution, len(dss)) - for i, ds := range dss { - res[i] = distributions.DistributionToProto(distributions.DistributionFromDB(ds)) - } - return res - }(), - }); err != nil { - return err - } + /* Update current coordinator address. */ + /* Todo: check that router metadata is in sync. */ - // Configure key ranges. 
- krClient := routerproto.NewKeyRangeServiceClient(cc) - spqrlog.Zero.Debug().Msg("qdb coordinator: configure key ranges") - keyRanges, err := qc.db.ListAllKeyRanges(ctx) - if err != nil { - return err - } - if _, err = krClient.DropAllKeyRanges(ctx, &routerproto.DropAllKeyRangesRequest{}); err != nil { + rCl := routerproto.NewTopologyServiceClient(cc) + if _, err := rCl.UpdateCoordinator(ctx, &routerproto.UpdateCoordinatorRequest{ + Address: net.JoinHostPort(config.CoordinatorConfig().Host, config.CoordinatorConfig().GrpcApiPort), + }); err != nil { return err } - for _, keyRange := range keyRanges { - resp, err := krClient.CreateKeyRange(ctx, &routerproto.CreateKeyRangeRequest{ - KeyRangeInfo: kr.KeyRangeFromDB(keyRange).ToProto(), - }) - - if err != nil { - return err - } - - spqrlog.Zero.Debug(). - Interface("response", resp). - Msg("got response while adding key range") - } - spqrlog.Zero.Debug().Msg("successfully add all key ranges") - - rCl := routerproto.NewTopologyServiceClient(cc) if resp, err := rCl.OpenRouter(ctx, &routerproto.OpenRouterRequest{}); err != nil { return err } else { diff --git a/docker/router/Dockerfile b/docker/router/Dockerfile index 5a5b4f076..7654f8a4c 100644 --- a/docker/router/Dockerfile +++ b/docker/router/Dockerfile @@ -3,4 +3,4 @@ FROM spqr-base-image RUN apt-get update && apt-get install -y postgresql-client COPY ./docker/router/ssl/localhost.crt /etc/spqr/ssl/server.crt COPY ./docker/router/ssl/localhost.key /etc/spqr/ssl/server.key -ENTRYPOINT CONFIG_PATH=${ROUTER_CONFIG=/spqr/docker/router/cfg.yaml} COORD_CONFIG_PATH=${COORDINATOR_CONFIG=/spqr/docker/coordinator/cfg.yaml} && CUR_HOST=$(cat ${CONFIG_PATH} | grep "host:") && sed -i "s/${CUR_HOST}/${ROUTER_HOST=${CUR_HOST}}/g" ${CONFIG_PATH} && /spqr/spqr-router run --config ${CONFIG_PATH} --coordinator-config ${COORD_CONFIG_PATH} --proto-debug +ENTRYPOINT CONFIG_PATH=${ROUTER_CONFIG=/spqr/docker/router/cfg.yaml} 
COORD_CONFIG_PATH=${COORDINATOR_CONFIG=/spqr/docker/coordinator/cfg.yaml} && CUR_HOST=$(cat ${CONFIG_PATH} | grep "host:") && sed -i "s/${CUR_HOST}/${ROUTER_HOST=${CUR_HOST}}/g" ${CONFIG_PATH} && /spqr/spqr-router run --config ${CONFIG_PATH} --coordinator-config ${COORD_CONFIG_PATH} >> ${ROUTER_LOG} diff --git a/examples/coordinator-init.yaml b/examples/coordinator-init.yaml new file mode 100644 index 000000000..0172e2556 --- /dev/null +++ b/examples/coordinator-init.yaml @@ -0,0 +1,66 @@ +log_level: debug + +host: '::1' +router_port: '6432' +admin_console_port: '7432' +grpc_api_port: '7010' + +world_shard_fallback: true +router_mode: PROXY + +use_coordinator_init: true + +with_coordinator: true + +frontend_tls: + key_file: /etc/odyssey/ssl/server.key + cert_file: /etc/odyssey/ssl/server.crt + sslmode: disable + +frontend_rules: + - usr: user1 + db: db1 + pool_mode: TRANSACTION + pool_prepared_statement: true + auth_rule: + auth_method: ok + password: strong + - pool_mode: TRANSACTION + pool_default: true + pool_prepared_statement: false + auth_rule: + auth_method: ok + +backend_rules: + - usr: user1 + db: db1 + pool_discard: false + pool_rollback: true + - pool_default: true + pool_discard: false + pool_rollback: true + +shards: + sh1: + tls: + key_file: /etc/odyssey/ssl/server.key + sslmode: disable + cert_file: /etc/odyssey/ssl/server.crt + db: db1 + usr: user1 + pwd: 12345678 + type: DATA + hosts: + - 'localhost:5550' + sh2: + tls: + key_file: /etc/odyssey/ssl/server.key + sslmode: disable + cert_file: /etc/odyssey/ssl/server.crt + db: db1 + usr: user1 + pwd: 12345678 + type: DATA + hosts: + - 'localhost:5551' + diff --git a/router/instance/etcd.go b/router/instance/etcd.go index a726047c4..9cac3f4e7 100644 --- a/router/instance/etcd.go +++ b/router/instance/etcd.go @@ -1,5 +1,80 @@ package instance +import ( + "context" + "time" + + "github.com/pg-sharding/spqr/pkg/models/distributions" + "github.com/pg-sharding/spqr/pkg/models/kr" + 
"github.com/pg-sharding/spqr/pkg/spqrlog" + "github.com/pg-sharding/spqr/qdb" +) + type EtcdMetadataBootstraper struct { - RouterMetadataBootstraper + QdbAddr string +} + +// InitializeMetadata implements RouterMetadataBootstraper. +func (e *EtcdMetadataBootstraper) InitializeMetadata(ctx context.Context, r RouterInstance) error { + etcdConn, err := qdb.NewEtcdQDB(e.QdbAddr) + if err != nil { + return err + } + defer etcdConn.Client().Close() + + /* Initialize distributions */ + ds, err := etcdConn.ListDistributions(ctx) + if err != nil { + return err + } + + for _, d := range ds { + if err := r.Console().Mgr().CreateDistribution(ctx, distributions.DistributionFromDB(d)); err != nil { + spqrlog.Zero.Error().Err(err).Msg("failed to initialize instance") + return err + } + + /* initialize key ranges within distribution */ + krs, err := etcdConn.ListKeyRanges(ctx, d.ID) + if err != nil { + return err + } + + for _, ckr := range krs { + if err := r.Console().Mgr().CreateKeyRange(ctx, kr.KeyRangeFromDB(ckr)); err != nil { + spqrlog.Zero.Error().Err(err).Msg("failed to initialize instance") + return err + } + } + } + + retryCnt := 50 + + for { + c, err := etcdConn.GetCoordinator(ctx) + if err != nil { + if retryCnt > 0 { + /* await the coordinator to appear */ + time.Sleep(time.Second) + retryCnt-- + continue + } + return err + } + + err = r.Console().Mgr().UpdateCoordinator(ctx, c) + + if err == nil { + break + } + return err + } + + r.Initialize() + + return nil +} + +func NewEtcdMetadataBootstraper(QdbAddr string) RouterMetadataBootstraper { + return &EtcdMetadataBootstraper{QdbAddr: QdbAddr} } diff --git a/router/instance/instance.go b/router/instance/instance.go index 22475b3fa..466a17d90 100644 --- a/router/instance/instance.go +++ b/router/instance/instance.go @@ -29,6 +29,7 @@ type RouterInstance interface { Initialize() bool Console() console.Console + Config() *config.Router } type InstanceImpl struct { @@ -38,10 +39,10 @@ type InstanceImpl struct { Mgr 
meta.EntityMgr Writer workloadlog.WorkloadLog - stchan chan struct{} - addr string - frTLS *tls.Config - WithJaeger bool + stchan chan struct{} + addr string + frTLS *tls.Config + cfg *config.Router notifier *sdnotifier.Notifier } @@ -51,6 +52,10 @@ func (r *InstanceImpl) Console() console.Console { return r.AdmConsole } +func (r *InstanceImpl) Config() *config.Router { + return r.cfg +} + func (r *InstanceImpl) ID() string { return "noid" } @@ -143,24 +148,11 @@ func NewRouter(ctx context.Context, rcfg *config.Router, ns string, persist bool Mgr: lc, stchan: stchan, frTLS: frTLS, - WithJaeger: rcfg.WithJaeger, + cfg: rcfg, Writer: writ, notifier: notifier, } - /* initialize metadata */ - if rcfg.UseInitSQL { - i := NewInitSQLMetadataBootstraper(rcfg.InitSQL) - if err := i.InitializeMetadata(ctx, r); err != nil { - return nil, err - } - } else if rcfg.UseCoordinatorInit { - panic("implement me") - } else { - /* TODO: maybe error-out? */ - r.Initialize() - } - return r, nil } @@ -201,7 +193,7 @@ func (r *InstanceImpl) serv(netconn net.Conn, pt port.RouterPortType) error { } func (r *InstanceImpl) Run(ctx context.Context, listener net.Listener, pt port.RouterPortType) error { - if r.WithJaeger { + if r.cfg.WithJaeger { closer, err := r.initJaegerTracer(r.RuleRouter.Config()) if err != nil { return fmt.Errorf("could not initialize jaeger tracer: %s", err) diff --git a/test/feature/conf/router_cluster.yaml b/test/feature/conf/router_cluster.yaml new file mode 100644 index 000000000..ab5fab0fd --- /dev/null +++ b/test/feature/conf/router_cluster.yaml @@ -0,0 +1,46 @@ +host: 'regress_router' +router_port: '6432' +admin_console_port: '7432' +grpc_api_port: '7000' +router_mode: PROXY +log_level: debug +time_quantiles: + - 0.75 +world_shard_fallback: true +show_notice_messages: true +use_coordinator_init: true +frontend_rules: + - db: regress + usr: regress + pool_default: true + pool_mode: TRANSACTION + auth_rule: + auth_method: ok +shards: + sh1: + db: regress + usr: 
regress + pwd: 12345678 + type: DATA + hosts: + - 'spqr_shard_1:6432' + sh2: + db: regress + usr: regress + pwd: 12345678 + type: DATA + hosts: + - 'spqr_shard_2:6432' + +backend_rules: + - db: regress + usr: regress + pool_discard: true + pool_rollback: true + auth_rules: + sh1: + auth_method: md5 + password: 12345678 + sh2: + auth_method: md5 + password: 12345678 diff --git a/test/feature/conf/router_with_coordinator.yaml b/test/feature/conf/router_with_coordinator.yaml index 98365631b..02773a2d4 100644 --- a/test/feature/conf/router_with_coordinator.yaml +++ b/test/feature/conf/router_with_coordinator.yaml @@ -9,6 +9,7 @@ time_quantiles: world_shard_fallback: true show_notice_messages: true with_coordinator: true +use_coordinator_init: true frontend_rules: - db: regress usr: regress diff --git a/test/feature/docker-compose.yaml b/test/feature/docker-compose.yaml index 0729988a2..0bf800f4b 100644 --- a/test/feature/docker-compose.yaml +++ b/test/feature/docker-compose.yaml @@ -40,6 +40,7 @@ services: - "7012:7002" environment: - ROUTER_CONFIG=${ROUTER_CONFIG} + - ROUTER_LOG='router1.log' - 'ROUTER_HOST=host: ''regress_router''' - COORDINATOR_CONFIG=${COORDINATOR_CONFIG} hostname: regress_router @@ -65,6 +66,7 @@ services: - "7022:7002" environment: - ROUTER_CONFIG=${ROUTER_CONFIG} + - ROUTER_LOG='router2.log' - 'ROUTER_HOST=host: ''regress_router_2''' - COORDINATOR_CONFIG=${COORDINATOR_CONFIG_2} hostname: regress_router_2 diff --git a/test/feature/features/coordinator.feature b/test/feature/features/coordinator.feature index 3a546ce95..a40eaca20 100644 --- a/test/feature/features/coordinator.feature +++ b/test/feature/features/coordinator.feature @@ -3,6 +3,10 @@ Feature: Coordinator test # # Make host "coordinator" take control # + Given cluster environment is + """ + ROUTER_CONFIG=/spqr/test/feature/conf/router_cluster.yaml + """ Given cluster is up and running And host "coordinator2" is stopped And host "coordinator2" is started diff --git 
a/test/feature/spqr_test.go b/test/feature/spqr_test.go index 2288438f9..a13e50912 100644 --- a/test/feature/spqr_test.go +++ b/test/feature/spqr_test.go @@ -868,6 +868,7 @@ func InitializeScenario(s *godog.ScenarioContext, t *testing.T, debug bool) { tctx.composerEnv = []string{ "ROUTER_CONFIG=/spqr/test/feature/conf/router.yaml", "COORDINATOR_CONFIG=/spqr/test/feature/conf/coordinator.yaml", + "COORDINATOR_CONFIG_2=/spqr/test/feature/conf/coordinator.yaml", } tctx.variables = make(map[string]interface{}) return ctx, nil