From ef6fc014b334a4aa5c33f8f34efb179e3222f428 Mon Sep 17 00:00:00 2001 From: reshke Date: Thu, 4 Apr 2024 13:49:50 +0000 Subject: [PATCH] tmp --- coordinator/provider/coordinator.go | 94 ++++++++++++++++++++++++++++- pkg/coord/adapter.go | 9 +++ pkg/coord/local/clocal.go | 4 ++ pkg/models/topology/routers.go | 1 + 4 files changed, 106 insertions(+), 2 deletions(-) diff --git a/coordinator/provider/coordinator.go b/coordinator/provider/coordinator.go index 249ad28df..729cc744f 100644 --- a/coordinator/provider/coordinator.go +++ b/coordinator/provider/coordinator.go @@ -238,7 +238,7 @@ func (qc *qdbCoordinator) watchRouters(ctx context.Context) { switch resp.Status { case routerproto.RouterStatus_CLOSED: spqrlog.Zero.Debug().Msg("router is closed") - if err := qc.SyncRouterMetadata(ctx, internalR); err != nil { + if err := qc.SyncRouterCoordinatorAddress(ctx, internalR); err != nil { return err } @@ -252,7 +252,7 @@ func (qc *qdbCoordinator) watchRouters(ctx context.Context) { spqrlog.Zero.Debug().Msg("router is opened") /* TODO: check router metadata consistency */ - if err := qc.SyncRouterMetadata(ctx, internalR); err != nil { + if err := qc.SyncRouterCoordinatorAddress(ctx, internalR); err != nil { return err } @@ -893,6 +893,96 @@ func (qc *qdbCoordinator) SyncRouterMetadata(ctx context.Context, qRouter *topol } defer cc.Close() + // Configure distributions + dsCl := routerproto.NewDistributionServiceClient(cc) + spqrlog.Zero.Debug().Msg("qdb coordinator: configure distributions") + dss, err := qc.db.ListDistributions(ctx) + if err != nil { + return err + } + resp, err := dsCl.ListDistributions(ctx, &routerproto.ListDistributionsRequest{}) + if err != nil { + return err + } + if _, err = dsCl.DropDistribution(ctx, &routerproto.DropDistributionRequest{ + Ids: func() []string { + res := make([]string, len(resp.Distributions)) + for i, ds := range resp.Distributions { + res[i] = ds.Id + } + return res + }(), + }); err != nil { + return err + } + if _, err = dsCl.CreateDistribution(ctx, &routerproto.CreateDistributionRequest{ + Distributions: func() []*routerproto.Distribution { + res := make([]*routerproto.Distribution, len(dss)) + for i, ds := range dss { + res[i] = distributions.DistributionToProto(distributions.DistributionFromDB(ds)) + } + return res + }(), + }); err != nil { + return err + } + + // Configure key ranges. + krClient := routerproto.NewKeyRangeServiceClient(cc) + spqrlog.Zero.Debug().Msg("qdb coordinator: configure key ranges") + keyRanges, err := qc.db.ListAllKeyRanges(ctx) + if err != nil { + return err + } + if _, err = krClient.DropAllKeyRanges(ctx, &routerproto.DropAllKeyRangesRequest{}); err != nil { + return err + } + + for _, keyRange := range keyRanges { + resp, err := krClient.CreateKeyRange(ctx, &routerproto.CreateKeyRangeRequest{ + KeyRangeInfo: kr.KeyRangeFromDB(keyRange).ToProto(), + }) + + if err != nil { + return err + } + + spqrlog.Zero.Debug(). + Interface("response", resp). + Msg("got response while adding key range") + } + spqrlog.Zero.Debug().Msg("successfully add all key ranges") + + rCl := routerproto.NewTopologyServiceClient(cc) + if _, err := rCl.UpdateCoordinator(ctx, &routerproto.UpdateCoordinatorRequest{ + Address: net.JoinHostPort(config.CoordinatorConfig().Host, config.CoordinatorConfig().GrpcApiPort), + }); err != nil { + return err + } + + if resp, err := rCl.OpenRouter(ctx, &routerproto.OpenRouterRequest{}); err != nil { + return err + } else { + spqrlog.Zero.Debug(). + Interface("response", resp). + Msg("open router response") + } + + return nil +} + +// TODO : unit tests +func (qc *qdbCoordinator) SyncRouterCoordinatorAddress(ctx context.Context, qRouter *topology.Router) error { + spqrlog.Zero.Debug(). + Str("address", qRouter.Address). + Msg("qdb coordinator: sync router metadata") + + cc, err := DialRouter(qRouter) + if err != nil { + return err + } + defer cc.Close() + /* Update current coordinator address. */ /* Todo: check that router metadata is in sync. */ diff --git a/pkg/coord/adapter.go b/pkg/coord/adapter.go index 0264a8712..7dacc7ede 100644 --- a/pkg/coord/adapter.go +++ b/pkg/coord/adapter.go @@ -282,6 +282,15 @@ func (a *Adapter) SyncRouterMetadata(ctx context.Context, router *topology.Route return err } +// SyncRouterCoordinatorAddress implements meta.EntityMgr. +func (a *Adapter) SyncRouterCoordinatorAddress(ctx context.Context, router *topology.Router) error { + c := proto.NewRouterServiceClient(a.conn) + _, err := c.SyncMetadata(ctx, &proto.SyncMetadataRequest{ + Router: topology.RouterToProto(router), + }) + return err +} + // TODO : unit tests // TODO : implement func (a *Adapter) AddDataShard(ctx context.Context, shard *datashards.DataShard) error { diff --git a/pkg/coord/local/clocal.go b/pkg/coord/local/clocal.go index 693906f78..f41a144ec 100644 --- a/pkg/coord/local/clocal.go +++ b/pkg/coord/local/clocal.go @@ -470,6 +470,10 @@ func (qr *LocalCoordinator) SyncRouterMetadata(ctx context.Context, router *topo return ErrNotCoordinator } +func (qr *LocalCoordinator) SyncRouterCoordinatorAddress(ctx context.Context, router *topology.Router) error { + return ErrNotCoordinator +} + func (qr *LocalCoordinator) UpdateCoordinator(ctx context.Context, addr string) error { return qr.qdb.UpdateCoordinator(ctx, addr) } diff --git a/pkg/models/topology/routers.go b/pkg/models/topology/routers.go index 66659c170..4f2a254f1 100644 --- a/pkg/models/topology/routers.go +++ b/pkg/models/topology/routers.go @@ -18,6 +18,7 @@ type RouterMgr interface { ListRouters(ctx context.Context) ([]*Router, error) UnregisterRouter(ctx context.Context, id string) error SyncRouterMetadata(ctx context.Context, router *Router) error + SyncRouterCoordinatorAddress(ctx context.Context, router *Router) error UpdateCoordinator(ctx context.Context, address string) error GetCoordinator(ctx context.Context) (string, error) }