Skip to content

Commit

Permalink
Change yacc and protos for multidim key ranges.
Browse files Browse the repository at this point in the history
Other fix
  • Loading branch information
reshke committed Apr 2, 2024
1 parent 6dd5979 commit 12737b5
Show file tree
Hide file tree
Showing 39 changed files with 1,577 additions and 877 deletions.
4 changes: 2 additions & 2 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@ spqr-worldmock
spqr-balancer
spqr-mover
spqr-workloadreplay
spqrdump
spqr-dump
y.output
*.swp
*.swo
memqdb.json
test/feature/generatedFeatures
test/feature/logs
yacc/console/gram.y.save
.DS_Store
.DS_Store
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ build_workloadreplay:
go build -pgo=auto -o spqr-workloadreplay ./cmd/workloadreplay

build_spqrdump:
go build -pgo=auto -o spqrdump ./cmd/spqrdump
go build -pgo=auto -o spqr-dump ./cmd/spqrdump

build: build_balancer build_coordinator build_coorctl build_router build_mover build_worldmock build_workloadreplay build_spqrdump

Expand Down
21 changes: 15 additions & 6 deletions balancer/provider/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ package provider
import (
"context"
"fmt"
"sort"
"strings"

"github.com/google/uuid"
"github.com/jackc/pgx/v5"
"github.com/pg-sharding/spqr/balancer"
Expand All @@ -14,8 +17,6 @@ import (
"github.com/pg-sharding/spqr/pkg/spqrlog"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"sort"
"strings"
)

type BalancerImpl struct {
Expand Down Expand Up @@ -346,10 +347,11 @@ func (b *BalancerImpl) getKRCondition(rel *distributions.DistributedRelation, kR
} else {
hashedCol = entry.Column
}
// TODO: fix multidim case
if nextKR != nil {
buf[i] = fmt.Sprintf("%s >= %s AND %s < %s", hashedCol, string(kRange.LowerBound), hashedCol, string(nextKR.LowerBound))
buf[i] = fmt.Sprintf("%s >= %s AND %s < %s", hashedCol, kRange.SendRaw()[0], hashedCol, nextKR.SendRaw()[0])
} else {
buf[i] = fmt.Sprintf("%s >= %s", hashedCol, string(kRange.LowerBound))
buf[i] = fmt.Sprintf("%s >= %s", hashedCol, kRange.SendRaw()[0])
}
}
return strings.Join(buf, " AND "), nil
Expand Down Expand Up @@ -666,6 +668,7 @@ func (b *BalancerImpl) executeTasks(ctx context.Context, group *tasks.TaskGroup)

func (b *BalancerImpl) updateKeyRanges(ctx context.Context) error {
keyRangeService := protos.NewKeyRangeServiceClient(b.coordinatorConn)
distrService := protos.NewDistributionServiceClient(b.coordinatorConn)
keyRangesProto, err := keyRangeService.ListAllKeyRanges(ctx, &protos.ListAllKeyRangesRequest{})
if err != nil {
return err
Expand All @@ -675,11 +678,17 @@ func (b *BalancerImpl) updateKeyRanges(ctx context.Context) error {
if _, ok := keyRanges[krProto.DistributionId]; !ok {
keyRanges[krProto.DistributionId] = make([]*kr.KeyRange, 0)
}
keyRanges[krProto.DistributionId] = append(keyRanges[krProto.DistributionId], kr.KeyRangeFromProto(krProto))
ds, err := distrService.GetDistribution(ctx, &protos.GetDistributionRequest{
Id: krProto.DistributionId,
})
if err != nil {
return err
}
keyRanges[krProto.DistributionId] = append(keyRanges[krProto.DistributionId], kr.KeyRangeFromProto(krProto, ds.Distribution.ColumnTypes))
}
for _, krs := range keyRanges {
sort.Slice(krs, func(i, j int) bool {
return kr.CmpRangesLess(krs[i].LowerBound, krs[j].LowerBound)
return kr.CmpRangesLess(krs[i].LowerBound, krs[j].LowerBound, krs[j].ColumnTypes)
})
}

Expand Down
24 changes: 17 additions & 7 deletions cmd/mover/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@ import (
"context"
"flag"
"fmt"
"github.com/pg-sharding/spqr/pkg/models/distributions"
"io"
"os"
"strings"

"github.com/pg-sharding/spqr/pkg/models/distributions"

"github.com/jackc/pgx/v5"
_ "github.com/lib/pq"
"github.com/pg-sharding/spqr/pkg/models/kr"
Expand Down Expand Up @@ -103,10 +104,10 @@ FROM information_schema.tables;
// TODO: support multi-column move in SPQR2
if nextKeyRange == nil {
qry = fmt.Sprintf("copy (delete from %s WHERE %s >= %s returning *) to stdout", rel.Name,
rel.DistributionKey[0].Column, keyRange.LowerBound)
rel.DistributionKey[0].Column, keyRange.SendRaw()[0])
} else {
qry = fmt.Sprintf("copy (delete from %s WHERE %s >= %s and %s < %s returning *) to stdout", rel.Name,
rel.DistributionKey[0].Column, keyRange.LowerBound, rel.DistributionKey[0].Column, nextKeyRange.LowerBound)
rel.DistributionKey[0].Column, keyRange.SendRaw()[0], rel.DistributionKey[0].Column, nextKeyRange.SendRaw()[0])
}

spqrlog.Zero.Debug().
Expand All @@ -132,6 +133,7 @@ FROM information_schema.tables;
spqrlog.Zero.Debug().Msg("copy cmd executed")
}

/* TODO: handle errors here */
_ = txTo.Commit(ctx)
_ = txFrom.Commit(ctx)
return nil
Expand Down Expand Up @@ -165,7 +167,14 @@ func main() {
spqrlog.Zero.Error().Err(err).Msg("")
return
}
keyRange := kr.KeyRangeFromDB(qdbKr)

ds, err := db.GetDistribution(ctx, qdbKr.DistributionId)
if err != nil {
spqrlog.Zero.Error().Err(err).Msg("")
return
}

keyRange := kr.KeyRangeFromDB(qdbKr, ds.ColTypes)

krs, err := db.ListKeyRanges(ctx, keyRange.Distribution)
if err != nil {
Expand All @@ -176,9 +185,10 @@ func main() {
var nextKeyRange *kr.KeyRange

for _, currkr := range krs {
if kr.CmpRangesLess(keyRange.LowerBound, currkr.LowerBound) {
if nextKeyRange == nil || kr.CmpRangesLess(currkr.LowerBound, nextKeyRange.LowerBound) {
nextKeyRange = kr.KeyRangeFromDB(currkr)
typedKr := kr.KeyRangeFromDB(currkr, ds.ColTypes)
if kr.CmpRangesLess(keyRange.LowerBound, typedKr.LowerBound, ds.ColTypes) {
if nextKeyRange == nil || kr.CmpRangesLess(typedKr.LowerBound, nextKeyRange.LowerBound, ds.ColTypes) {
nextKeyRange = typedKr
}
}
}
Expand Down
25 changes: 18 additions & 7 deletions cmd/spqrdump/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"google.golang.org/grpc"

"github.com/pg-sharding/spqr/pkg/conn"
"github.com/pg-sharding/spqr/pkg/models/kr"
"github.com/pg-sharding/spqr/pkg/spqrlog"

"github.com/pg-sharding/spqr/pkg/decode"
Expand All @@ -25,7 +26,7 @@ func Dial(addr string) (*grpc.ClientConn, error) {
}

var rootCmd = &cobra.Command{
Use: "spqrdump -e localhost:7003",
Use: "spqr-dump -e localhost:7003",
CompletionOptions: cobra.CompletionOptions{
DisableDefaultCmd: true,
},
Expand Down Expand Up @@ -116,14 +117,16 @@ func getconn() (*pgproto3.Frontend, error) {
// TODO : unit tests
func DumpKeyRangesPsql() error {
return dumpPsql("SHOW key_ranges;", func(v *pgproto3.DataRow) (string, error) {
l := string(v.Values[2])
l := v.Values[2]
id := string(v.Values[0])
shard := string(v.Values[1])

return decode.KeyRange(
&protos.KeyRangeInfo{
KeyRange: &protos.KeyRange{LowerBound: l},
ShardId: shard, Krid: id}), nil
&kr.KeyRange{
LowerBound: []interface{}{l},
ID: id,
ShardID: shard,
}), nil
})
}

Expand Down Expand Up @@ -171,13 +174,21 @@ func DumpKeyRanges() error {
}

rCl := protos.NewKeyRangeServiceClient(cc)
dCl := protos.NewDistributionServiceClient(cc)
if keys, err := rCl.ListAllKeyRanges(context.Background(), &protos.ListAllKeyRangesRequest{}); err != nil {
spqrlog.Zero.Error().
Err(err).
Msg("failed to dump endpoint rules")
} else {
for _, krg := range keys.KeyRangesInfo {
fmt.Println(decode.KeyRange(krg))
ds, err := dCl.GetDistribution(context.Background(), &protos.GetDistributionRequest{
Id: krg.DistributionId,
})
if err != nil {
return err
}
krCurr := kr.KeyRangeFromProto(krg, ds.Distribution.ColumnTypes)
fmt.Println(decode.KeyRange(krCurr))
}
}

Expand Down Expand Up @@ -260,7 +271,7 @@ var dump = &cobra.Command{
}
spqrlog.Zero.Debug().
Str("endpoint", endpoint).
Msg("dialing spqrdump on")
Msg("dialing spqr-dump on")

switch proto {
case "grpc":
Expand Down
Loading

0 comments on commit 12737b5

Please sign in to comment.