Skip to content

Commit

Permalink
Fix proxy_console feature test (#498)
Browse files Browse the repository at this point in the history
* Artifacts

* Fixed coordinator address setting

* Implemented DistributionServiceServer in app.go

* Fix AddKeyRange in Provider

* Refactor AddKeyRangeWithChecks

* Fixed proxy_console feature test
  • Loading branch information
EinKrebs authored Feb 14, 2024
1 parent 56c69ca commit 6a5bf41
Show file tree
Hide file tree
Showing 8 changed files with 271 additions and 27 deletions.
15 changes: 8 additions & 7 deletions coordinator/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,17 +98,18 @@ func (app *App) ServeGrpcApi(wg *sync.WaitGroup) error {
serv := grpc.NewServer()
reflection.Register(serv)

krserv := provider.NewKeyRangeService(app.coordinator)
rrserv := provider.NewRouterService(app.coordinator)
toposerv := provider.NewTopologyService(app.coordinator)
krServ := provider.NewKeyRangeService(app.coordinator)
rrServ := provider.NewRouterService(app.coordinator)
topServ := provider.NewTopologyService(app.coordinator)
shardingRulesServ := provider.NewShardingRulesServer(app.coordinator)
shardServ := provider.NewShardServer(app.coordinator)

protos.RegisterKeyRangeServiceServer(serv, krserv)
protos.RegisterRouterServiceServer(serv, rrserv)
protos.RegisterTopologyServiceServer(serv, toposerv)
dsServ := provider.NewDistributionServer(app.coordinator)
protos.RegisterKeyRangeServiceServer(serv, krServ)
protos.RegisterRouterServiceServer(serv, rrServ)
protos.RegisterTopologyServiceServer(serv, topServ)
protos.RegisterShardingRulesServiceServer(serv, shardingRulesServ)
protos.RegisterShardServiceServer(serv, shardServ)
protos.RegisterDistributionServiceServer(serv, dsServ)

address := net.JoinHostPort(config.CoordinatorConfig().Host, config.CoordinatorConfig().GrpcApiPort)
listener, err := net.Listen("tcp", address)
Expand Down
2 changes: 1 addition & 1 deletion coordinator/provider/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ func (qc *qdbCoordinator) lockCoordinator(ctx context.Context, initialRouter boo
if err := qc.SyncRouterMetadata(ctx, router); err != nil {
spqrlog.Zero.Error().Err(err).Msg("sync router metadata when locking coordinator")
}
if err := qc.UpdateCoordinator(ctx, config.CoordinatorConfig().Host); err != nil {
if err := qc.UpdateCoordinator(ctx, fmt.Sprintf("%s:%s", config.CoordinatorConfig().Host, config.CoordinatorConfig().GrpcApiPort)); err != nil {
return false
}
return true
Expand Down
91 changes: 91 additions & 0 deletions coordinator/provider/distributions.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package provider

import (
"context"
"github.com/pg-sharding/spqr/coordinator"
"github.com/pg-sharding/spqr/pkg/models/distributions"
protos "github.com/pg-sharding/spqr/pkg/protos"
)

type DistributionsServer struct {
protos.UnimplementedDistributionServiceServer

impl coordinator.Coordinator
}

func NewDistributionServer(impl coordinator.Coordinator) *DistributionsServer {
return &DistributionsServer{
impl: impl,
}
}

var _ protos.DistributionServiceServer = &DistributionsServer{}

func (d *DistributionsServer) CreateDistribution(ctx context.Context, req *protos.CreateDistributionRequest) (*protos.CreateDistributionReply, error) {
for _, ds := range req.Distributions {
if err := d.impl.CreateDistribution(ctx, distributions.DistributionFromProto(ds)); err != nil {
return nil, err
}
}
return &protos.CreateDistributionReply{}, nil
}

func (d *DistributionsServer) DropDistribution(ctx context.Context, req *protos.DropDistributionRequest) (*protos.DropDistributionReply, error) {
for _, id := range req.GetIds() {
if err := d.impl.DropDistribution(ctx, id); err != nil {
return nil, err
}
}
return &protos.DropDistributionReply{}, nil
}

func (d *DistributionsServer) ListDistributions(ctx context.Context, req *protos.ListDistributionsRequest) (*protos.ListDistributionsReply, error) {
dss, err := d.impl.ListDistributions(ctx)
if err != nil {
return nil, err
}
return &protos.ListDistributionsReply{
Distributions: func() []*protos.Distribution {
res := make([]*protos.Distribution, len(dss))
for i, ds := range dss {
res[i] = distributions.DistributionToProto(ds)
}
return res
}(),
}, nil
}

func (d *DistributionsServer) AlterDistributionAttach(ctx context.Context, req *protos.AlterDistributionAttachRequest) (*protos.AlterDistributionAttachReply, error) {
return &protos.AlterDistributionAttachReply{}, d.impl.AlterDistributionAttach(ctx, req.GetId(), func() []*distributions.DistributedRelation {
res := make([]*distributions.DistributedRelation, len(req.GetRelations()))
for i, rel := range req.GetRelations() {
res[i] = distributions.DistributedRelationFromProto(rel)
}
return res
}())
}

func (d *DistributionsServer) AlterDistributionDetach(ctx context.Context, req *protos.AlterDistributionDetachRequest) (*protos.AlterDistributionDetachReply, error) {
for _, rel := range req.GetRelNames() {
if err := d.impl.AlterDistributionDetach(ctx, req.GetId(), rel); err != nil {
return nil, err
}
}
return &protos.AlterDistributionDetachReply{}, nil
}

func (d *DistributionsServer) GetDistribution(ctx context.Context, req *protos.GetDistributionRequest) (*protos.GetDistributionReply, error) {
ds, err := d.impl.GetDistribution(ctx, req.GetId())
if err != nil {
return nil, err
}
return &protos.GetDistributionReply{Distribution: distributions.DistributionToProto(ds)}, nil
}

func (d *DistributionsServer) GetRelationDistribution(ctx context.Context, req *protos.GetRelationDistributionRequest) (*protos.GetRelationDistributionReply, error) {
ds, err := d.impl.GetRelationDistribution(ctx, req.GetId())
if err != nil {
return nil, err
}
return &protos.GetRelationDistributionReply{Distribution: distributions.DistributionToProto(ds)}, nil
}
7 changes: 1 addition & 6 deletions coordinator/provider/keyranges.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,7 @@ type CoordinatorService struct {

// TODO : unit tests
func (c *CoordinatorService) AddKeyRange(ctx context.Context, request *protos.AddKeyRangeRequest) (*protos.ModifyReply, error) {
err := c.impl.AddKeyRange(ctx, &kr.KeyRange{
LowerBound: []byte(request.KeyRangeInfo.KeyRange.LowerBound),
ID: request.KeyRangeInfo.Krid,
ShardID: request.KeyRangeInfo.ShardId,
Distribution: "default",
})
err := c.impl.AddKeyRange(ctx, kr.KeyRangeFromProto(request.KeyRangeInfo))
if err != nil {
return nil, err
}
Expand Down
26 changes: 26 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -41,40 +41,65 @@ require (
require (
github.com/HdrHistogram/hdrhistogram-go v1.1.2 // indirect
github.com/Microsoft/go-winio v0.4.14 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/bgentry/speakeasy v0.1.0 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/containerd/log v0.1.0 // indirect
github.com/coreos/bbolt v1.3.3 // indirect
github.com/coreos/etcd v3.3.27+incompatible // indirect
github.com/coreos/go-semver v0.3.0 // indirect
github.com/coreos/go-systemd v0.0.0-20191104093116-d3cd4ed1dbcf // indirect
github.com/coreos/go-systemd/v22 v22.5.0 // indirect
github.com/coreos/pkg v0.0.0-20240122114842-bbd7aa9bf6fb // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.3 // indirect
github.com/cucumber/gherkin/go/v26 v26.2.0 // indirect
github.com/cucumber/messages/go/v21 v21.0.1 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dgrijalva/jwt-go v3.2.0+incompatible // indirect
github.com/distribution/reference v0.5.0 // indirect
github.com/docker/go-connections v0.5.0 // indirect
github.com/docker/go-units v0.5.0 // indirect
github.com/dustin/go-humanize v1.0.0 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/go-logr/logr v1.4.1 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/btree v1.1.2 // indirect
github.com/hashicorp/go-immutable-radix v1.3.1 // indirect
github.com/hashicorp/go-memdb v1.3.4 // indirect
github.com/hashicorp/golang-lru v0.5.4 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
github.com/jackc/puddle/v2 v2.2.1 // indirect
github.com/jonboulle/clockwork v0.4.0 // indirect
github.com/json-iterator/go v1.1.11 // indirect
github.com/kardianos/osext v0.0.0-20190222173326-2bc1f35cddc0 // indirect
github.com/leesper/go_rng v0.0.0-20190531154944-a612b043e353 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.19 // indirect
github.com/mattn/go-runewidth v0.0.9 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
github.com/moby/term v0.5.0 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.1 // indirect
github.com/morikuni/aec v1.0.0 // indirect
github.com/olekukonko/tablewriter v0.0.5 // indirect
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/image-spec v1.0.2 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_golang v1.11.1 // indirect
github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/common v0.26.0 // indirect
github.com/prometheus/procfs v0.6.0 // indirect
github.com/rogpeppe/go-internal v1.12.0 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/urfave/cli v1.22.14 // indirect
github.com/xdg-go/pbkdf2 v1.0.0 // indirect
github.com/xdg-go/stringprep v1.0.4 // indirect
github.com/xiang90/probing v0.0.0-20221125231312-a49e3df8f510 // indirect
go.etcd.io/etcd/api/v3 v3.5.12 // indirect
go.etcd.io/etcd/client/pkg/v3 v3.5.12 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.47.0 // indirect
Expand All @@ -94,6 +119,7 @@ require (
google.golang.org/genproto v0.0.0-20231106174013-bbf56f31fb17 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20231106174013-bbf56f31fb17 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20231106174013-bbf56f31fb17 // indirect
gopkg.in/cheggaaa/pb.v1 v1.0.28 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
gotest.tools/v3 v3.5.1 // indirect
)
Loading

0 comments on commit 6a5bf41

Please sign in to comment.