Skip to content

Commit

Permalink
tmp
Browse files Browse the repository at this point in the history
  • Loading branch information
reshke committed Apr 4, 2024
1 parent 78d9097 commit ef6fc01
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 2 deletions.
94 changes: 92 additions & 2 deletions coordinator/provider/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ func (qc *qdbCoordinator) watchRouters(ctx context.Context) {
switch resp.Status {
case routerproto.RouterStatus_CLOSED:
spqrlog.Zero.Debug().Msg("router is closed")
if err := qc.SyncRouterMetadata(ctx, internalR); err != nil {
if err := qc.SyncRouterCoordinatorAddress(ctx, internalR); err != nil {
return err
}

Expand All @@ -252,7 +252,7 @@ func (qc *qdbCoordinator) watchRouters(ctx context.Context) {
spqrlog.Zero.Debug().Msg("router is opened")

/* TODO: check router metadata consistency */
if err := qc.SyncRouterMetadata(ctx, internalR); err != nil {
if err := qc.SyncRouterCoordinatorAddress(ctx, internalR); err != nil {
return err
}

Expand Down Expand Up @@ -893,6 +893,96 @@ func (qc *qdbCoordinator) SyncRouterMetadata(ctx context.Context, qRouter *topol
}
defer cc.Close()

// Configure distributions
dsCl := routerproto.NewDistributionServiceClient(cc)
spqrlog.Zero.Debug().Msg("qdb coordinator: configure distributions")
dss, err := qc.db.ListDistributions(ctx)
if err != nil {
return err
}
resp, err := dsCl.ListDistributions(ctx, &routerproto.ListDistributionsRequest{})
if err != nil {
return err
}
if _, err = dsCl.DropDistribution(ctx, &routerproto.DropDistributionRequest{
Ids: func() []string {
res := make([]string, len(resp.Distributions))
for i, ds := range resp.Distributions {
res[i] = ds.Id
}
return res
}(),
}); err != nil {
return err
}
if _, err = dsCl.CreateDistribution(ctx, &routerproto.CreateDistributionRequest{
Distributions: func() []*routerproto.Distribution {
res := make([]*routerproto.Distribution, len(dss))
for i, ds := range dss {
res[i] = distributions.DistributionToProto(distributions.DistributionFromDB(ds))
}
return res
}(),
}); err != nil {
return err
}

// Configure key ranges.
krClient := routerproto.NewKeyRangeServiceClient(cc)
spqrlog.Zero.Debug().Msg("qdb coordinator: configure key ranges")
keyRanges, err := qc.db.ListAllKeyRanges(ctx)
if err != nil {
return err
}
if _, err = krClient.DropAllKeyRanges(ctx, &routerproto.DropAllKeyRangesRequest{}); err != nil {
return err
}

for _, keyRange := range keyRanges {
resp, err := krClient.CreateKeyRange(ctx, &routerproto.CreateKeyRangeRequest{
KeyRangeInfo: kr.KeyRangeFromDB(keyRange).ToProto(),
})

if err != nil {
return err
}

spqrlog.Zero.Debug().
Interface("response", resp).
Msg("got response while adding key range")
}
spqrlog.Zero.Debug().Msg("successfully add all key ranges")

rCl := routerproto.NewTopologyServiceClient(cc)
if _, err := rCl.UpdateCoordinator(ctx, &routerproto.UpdateCoordinatorRequest{
Address: net.JoinHostPort(config.CoordinatorConfig().Host, config.CoordinatorConfig().GrpcApiPort),
}); err != nil {
return err
}

if resp, err := rCl.OpenRouter(ctx, &routerproto.OpenRouterRequest{}); err != nil {
return err
} else {
spqrlog.Zero.Debug().
Interface("response", resp).
Msg("open router response")
}

return nil
}

// TODO : unit tests
func (qc *qdbCoordinator) SyncRouterCoordinatorAddress(ctx context.Context, qRouter *topology.Router) error {
spqrlog.Zero.Debug().
Str("address", qRouter.Address).
Msg("qdb coordinator: sync router metadata")

cc, err := DialRouter(qRouter)
if err != nil {
return err
}
defer cc.Close()

/* Update current coordinator address. */
/* Todo: check that router metadata is in sync. */

Expand Down
9 changes: 9 additions & 0 deletions pkg/coord/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,15 @@ func (a *Adapter) SyncRouterMetadata(ctx context.Context, router *topology.Route
return err
}

// SyncRouterCoordinatorAddress implements meta.EntityMgr.
func (a *Adapter) SyncRouterCoordinatorAddress(ctx context.Context, router *topology.Router) error {
c := proto.NewRouterServiceClient(a.conn)
_, err := c.SyncMetadata(ctx, &proto.SyncMetadataRequest{
Router: topology.RouterToProto(router),
})
return err
}

// TODO : unit tests
// TODO : implement
func (a *Adapter) AddDataShard(ctx context.Context, shard *datashards.DataShard) error {
Expand Down
4 changes: 4 additions & 0 deletions pkg/coord/local/clocal.go
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,10 @@ func (qr *LocalCoordinator) SyncRouterMetadata(ctx context.Context, router *topo
return ErrNotCoordinator
}

func (qr *LocalCoordinator) SyncRouterCoordinatorAddress(ctx context.Context, router *topology.Router) error {
return ErrNotCoordinator
}

func (qr *LocalCoordinator) UpdateCoordinator(ctx context.Context, addr string) error {
return qr.qdb.UpdateCoordinator(ctx, addr)
}
Expand Down
1 change: 1 addition & 0 deletions pkg/models/topology/routers.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type RouterMgr interface {
ListRouters(ctx context.Context) ([]*Router, error)
UnregisterRouter(ctx context.Context, id string) error
SyncRouterMetadata(ctx context.Context, router *Router) error
SyncRouterCoordinatorAddress(ctx context.Context, router *Router) error
UpdateCoordinator(ctx context.Context, address string) error
GetCoordinator(ctx context.Context) (string, error)
}
Expand Down

0 comments on commit ef6fc01

Please sign in to comment.