Skip to content

Commit

Permalink
Recurce into with cluase for query routing
Browse files Browse the repository at this point in the history
  • Loading branch information
reshke committed Apr 16, 2024
1 parent 37fc052 commit 721ec84
Show file tree
Hide file tree
Showing 4 changed files with 132 additions and 2 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ require (
github.com/lib/pq v1.10.9
github.com/libp2p/go-reuseport v0.4.0
github.com/opentracing/opentracing-go v1.2.0
github.com/pg-sharding/lyx v0.0.0-20240416115551-6d4ef5db77b6
github.com/pg-sharding/lyx v0.0.0-20240416180813-af44d267ee87
github.com/pkg/errors v0.9.1
github.com/rs/zerolog v1.32.0
github.com/sevlyar/go-daemon v0.1.6
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,10 @@ github.com/pg-sharding/lyx v0.0.0-20240416111413-d677885320c6 h1:hi3tNazpapIq7nA
github.com/pg-sharding/lyx v0.0.0-20240416111413-d677885320c6/go.mod h1:2dPBQAhqv/30mhzj2yBXQkXhsGJQ8GhM+oWOfbGua58=
github.com/pg-sharding/lyx v0.0.0-20240416115551-6d4ef5db77b6 h1:6U6B5R54e9Llr6RRXGZ/btQn8Aywo7Xk+Jd0pME6oWU=
github.com/pg-sharding/lyx v0.0.0-20240416115551-6d4ef5db77b6/go.mod h1:2dPBQAhqv/30mhzj2yBXQkXhsGJQ8GhM+oWOfbGua58=
github.com/pg-sharding/lyx v0.0.0-20240416173852-c0ad47b7da02 h1:nGKjCPb94ByT1eNn50WbaYgaLdP2JGs6HUUwalZUfNU=
github.com/pg-sharding/lyx v0.0.0-20240416173852-c0ad47b7da02/go.mod h1:2dPBQAhqv/30mhzj2yBXQkXhsGJQ8GhM+oWOfbGua58=
github.com/pg-sharding/lyx v0.0.0-20240416180813-af44d267ee87 h1:w7xlPmic3L3OVSoWhX/x/5hljdhD4QhWiFkPsGWzoEI=
github.com/pg-sharding/lyx v0.0.0-20240416180813-af44d267ee87/go.mod h1:2dPBQAhqv/30mhzj2yBXQkXhsGJQ8GhM+oWOfbGua58=
github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
Expand Down
31 changes: 30 additions & 1 deletion router/qrouter/proxy_routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@ package qrouter
import (
"context"
"fmt"
"github.com/pg-sharding/spqr/pkg/models/spqrerror"
"strings"

"github.com/pg-sharding/spqr/pkg/models/spqrerror"

"github.com/pg-sharding/spqr/pkg/config"
"github.com/pg-sharding/spqr/pkg/models/hashfunction"
"github.com/pg-sharding/spqr/pkg/models/kr"
Expand Down Expand Up @@ -44,6 +45,9 @@ type RoutingMetadataContext struct {
rels map[RelationFQN]struct{}
exprs map[RelationFQN]map[string]string

// cached CTE names
cteNames map[string]struct{}

unparsed_columns map[string]struct{}

// needed to parse
Expand All @@ -59,6 +63,7 @@ type RoutingMetadataContext struct {
func NewRoutingMetadataContext(params [][]byte, paramsFormatCodes []int16) *RoutingMetadataContext {
meta := &RoutingMetadataContext{
rels: map[RelationFQN]struct{}{},
cteNames: map[string]struct{}{},
tableAliases: map[string]RelationFQN{},
exprs: map[RelationFQN]map[string]string{},
unparsed_columns: map[string]struct{}{},
Expand Down Expand Up @@ -86,8 +91,17 @@ func NewRoutingMetadataContext(params [][]byte, paramsFormatCodes []int16) *Rout
return meta
}

func (meta *RoutingMetadataContext) RFQNIsCTE(resolvedRelation RelationFQN) bool {
_, ok := meta.cteNames[resolvedRelation.RelationName]
return len(resolvedRelation.SchemaName) == 0 && ok
}

// TODO : unit tests
func (meta *RoutingMetadataContext) RecordConstExpr(resolvedRelation RelationFQN, colname string, expr string) {
if meta.RFQNIsCTE(resolvedRelation) {
// CTE, skip
return
}
meta.rels[resolvedRelation] = struct{}{}
if _, ok := meta.exprs[resolvedRelation]; !ok {
meta.exprs[resolvedRelation] = map[string]string{}
Expand Down Expand Up @@ -372,6 +386,12 @@ func (qr *ProxyQrouter) deparseFromNode(node lyx.FromClauseNode, meta *RoutingMe
switch q := node.(type) {
case *lyx.RangeVar:
rqdn := RelationFQNFromRangeRangeVar(q)

// CTE, skip
if meta.RFQNIsCTE(rqdn) {
return nil
}

if _, ok := meta.rels[rqdn]; !ok {
meta.rels[rqdn] = struct{}{}
}
Expand Down Expand Up @@ -483,6 +503,15 @@ func (qr *ProxyQrouter) deparseShardingMapping(
meta *RoutingMetadataContext) error {
switch stmt := qstmt.(type) {
case *lyx.Select:
if stmt.WithClause != nil {
for _, cte := range stmt.WithClause {
meta.cteNames[cte.Name] = struct{}{}
if err := qr.deparseShardingMapping(ctx, cte.SubQuery, meta); err != nil {
return err
}
}
}

if stmt.FromClause != nil {
// collect table alias names, if any
// for single-table queries, process as usual
Expand Down
97 changes: 97 additions & 0 deletions router/qrouter/proxy_routing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,103 @@ func TestComment(t *testing.T) {
}
}

func TestCTE(t *testing.T) {
assert := assert.New(t)

type tcase struct {
query string
exp routingstate.RoutingState
err error
}
/* TODO: fix by adding configurable setting */
db, _ := qdb.NewMemQDB(MemQDBPath)
distribution := "dd"

_ = db.CreateDistribution(context.TODO(), &qdb.Distribution{
ID: distribution,
Relations: map[string]*qdb.DistributedRelation{
"t": {
Name: "t",
DistributionKey: []qdb.DistributionKeyEntry{
{
Column: "i",
},
},
},
},
})

err := db.CreateKeyRange(context.TODO(), &qdb.KeyRange{
ShardID: "sh1",
DistributionId: distribution,
KeyRangeID: "id1",
LowerBound: []byte("1"),
})

assert.NoError(err)

err = db.CreateKeyRange(context.TODO(), &qdb.KeyRange{
ShardID: "sh2",
DistributionId: distribution,
KeyRangeID: "id2",
LowerBound: []byte("11"),
})

assert.NoError(err)

lc := local.NewLocalCoordinator(db)

pr, err := qrouter.NewProxyRouter(map[string]*config.Shard{
"sh1": {
Hosts: nil,
},
"sh2": {
Hosts: nil,
},
}, lc, &config.QRouter{
DefaultRouteBehaviour: "BLOCK",
})

assert.NoError(err)

for _, tt := range []tcase{

{
query: `
WITH xxxx AS (
SELECT * from t where i = 1
)
SELECT * from xxxx;
`,
err: nil,
exp: routingstate.ShardMatchState{
Route: &routingstate.DataShardRoute{
Shkey: kr.ShardKey{
Name: "sh1",
},
Matchedkr: &kr.KeyRange{
ShardID: "sh1",
ID: "id1",
Distribution: distribution,
LowerBound: []byte("1"),
},
},
TargetSessionAttrs: "any",
},
},
} {
parserRes, err := lyx.Parse(tt.query)

assert.NoError(err, "query %s", tt.query)

tmp, err := pr.Route(context.TODO(), parserRes, session.NewDummyHandler(distribution))

assert.NoError(err, "query %s", tt.query)

assert.Equal(tt.exp, tmp, tt.query)
}
}

func TestSingleShard(t *testing.T) {
assert := assert.New(t)

Expand Down

0 comments on commit 721ec84

Please sign in to comment.