From 721ec848adeb22c0cdcf6ef8533be73cffcdf08c Mon Sep 17 00:00:00 2001 From: reshke Date: Tue, 16 Apr 2024 19:09:42 +0000 Subject: [PATCH] Recurce into with cluase for query routing --- go.mod | 2 +- go.sum | 4 ++ router/qrouter/proxy_routing.go | 31 ++++++++- router/qrouter/proxy_routing_test.go | 97 ++++++++++++++++++++++++++++ 4 files changed, 132 insertions(+), 2 deletions(-) diff --git a/go.mod b/go.mod index 85b090e2c..a76ce8b7d 100644 --- a/go.mod +++ b/go.mod @@ -17,7 +17,7 @@ require ( github.com/lib/pq v1.10.9 github.com/libp2p/go-reuseport v0.4.0 github.com/opentracing/opentracing-go v1.2.0 - github.com/pg-sharding/lyx v0.0.0-20240416115551-6d4ef5db77b6 + github.com/pg-sharding/lyx v0.0.0-20240416180813-af44d267ee87 github.com/pkg/errors v0.9.1 github.com/rs/zerolog v1.32.0 github.com/sevlyar/go-daemon v0.1.6 diff --git a/go.sum b/go.sum index 470343790..7c745c48e 100644 --- a/go.sum +++ b/go.sum @@ -152,6 +152,10 @@ github.com/pg-sharding/lyx v0.0.0-20240416111413-d677885320c6 h1:hi3tNazpapIq7nA github.com/pg-sharding/lyx v0.0.0-20240416111413-d677885320c6/go.mod h1:2dPBQAhqv/30mhzj2yBXQkXhsGJQ8GhM+oWOfbGua58= github.com/pg-sharding/lyx v0.0.0-20240416115551-6d4ef5db77b6 h1:6U6B5R54e9Llr6RRXGZ/btQn8Aywo7Xk+Jd0pME6oWU= github.com/pg-sharding/lyx v0.0.0-20240416115551-6d4ef5db77b6/go.mod h1:2dPBQAhqv/30mhzj2yBXQkXhsGJQ8GhM+oWOfbGua58= +github.com/pg-sharding/lyx v0.0.0-20240416173852-c0ad47b7da02 h1:nGKjCPb94ByT1eNn50WbaYgaLdP2JGs6HUUwalZUfNU= +github.com/pg-sharding/lyx v0.0.0-20240416173852-c0ad47b7da02/go.mod h1:2dPBQAhqv/30mhzj2yBXQkXhsGJQ8GhM+oWOfbGua58= +github.com/pg-sharding/lyx v0.0.0-20240416180813-af44d267ee87 h1:w7xlPmic3L3OVSoWhX/x/5hljdhD4QhWiFkPsGWzoEI= +github.com/pg-sharding/lyx v0.0.0-20240416180813-af44d267ee87/go.mod h1:2dPBQAhqv/30mhzj2yBXQkXhsGJQ8GhM+oWOfbGua58= github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= diff --git a/router/qrouter/proxy_routing.go b/router/qrouter/proxy_routing.go index 04ba59ae8..6f49592f2 100644 --- a/router/qrouter/proxy_routing.go +++ b/router/qrouter/proxy_routing.go @@ -3,9 +3,10 @@ package qrouter import ( "context" "fmt" - "github.com/pg-sharding/spqr/pkg/models/spqrerror" "strings" + "github.com/pg-sharding/spqr/pkg/models/spqrerror" + "github.com/pg-sharding/spqr/pkg/config" "github.com/pg-sharding/spqr/pkg/models/hashfunction" "github.com/pg-sharding/spqr/pkg/models/kr" @@ -44,6 +45,9 @@ type RoutingMetadataContext struct { rels map[RelationFQN]struct{} exprs map[RelationFQN]map[string]string + // cached CTE names + cteNames map[string]struct{} + unparsed_columns map[string]struct{} // needed to parse @@ -59,6 +63,7 @@ type RoutingMetadataContext struct { func NewRoutingMetadataContext(params [][]byte, paramsFormatCodes []int16) *RoutingMetadataContext { meta := &RoutingMetadataContext{ rels: map[RelationFQN]struct{}{}, + cteNames: map[string]struct{}{}, tableAliases: map[string]RelationFQN{}, exprs: map[RelationFQN]map[string]string{}, unparsed_columns: map[string]struct{}{}, @@ -86,8 +91,17 @@ func NewRoutingMetadataContext(params [][]byte, paramsFormatCodes []int16) *Rout return meta } +func (meta *RoutingMetadataContext) RFQNIsCTE(resolvedRelation RelationFQN) bool { + _, ok := meta.cteNames[resolvedRelation.RelationName] + return len(resolvedRelation.SchemaName) == 0 && ok +} + // TODO : unit tests func (meta *RoutingMetadataContext) RecordConstExpr(resolvedRelation RelationFQN, colname string, expr string) { + if meta.RFQNIsCTE(resolvedRelation) { + // CTE, skip + return + } meta.rels[resolvedRelation] = struct{}{} if _, ok := meta.exprs[resolvedRelation]; !ok { meta.exprs[resolvedRelation] = map[string]string{} @@ -372,6 +386,12 @@ func (qr *ProxyQrouter) deparseFromNode(node lyx.FromClauseNode, meta *RoutingMe switch q := node.(type) { case *lyx.RangeVar: rqdn := RelationFQNFromRangeRangeVar(q) + + // CTE, skip + if meta.RFQNIsCTE(rqdn) { + return nil + } + if _, ok := meta.rels[rqdn]; !ok { meta.rels[rqdn] = struct{}{} } @@ -483,6 +503,15 @@ func (qr *ProxyQrouter) deparseShardingMapping( meta *RoutingMetadataContext) error { switch stmt := qstmt.(type) { case *lyx.Select: + if stmt.WithClause != nil { + for _, cte := range stmt.WithClause { + meta.cteNames[cte.Name] = struct{}{} + if err := qr.deparseShardingMapping(ctx, cte.SubQuery, meta); err != nil { + return err + } + } + } + if stmt.FromClause != nil { // collect table alias names, if any // for single-table queries, process as usual diff --git a/router/qrouter/proxy_routing_test.go b/router/qrouter/proxy_routing_test.go index d7208fbf0..9bbd0ab9d 100644 --- a/router/qrouter/proxy_routing_test.go +++ b/router/qrouter/proxy_routing_test.go @@ -208,6 +208,103 @@ func TestComment(t *testing.T) { } } +func TestCTE(t *testing.T) { + assert := assert.New(t) + + type tcase struct { + query string + exp routingstate.RoutingState + err error + } + /* TODO: fix by adding configurable setting */ + db, _ := qdb.NewMemQDB(MemQDBPath) + distribution := "dd" + + _ = db.CreateDistribution(context.TODO(), &qdb.Distribution{ + ID: distribution, + Relations: map[string]*qdb.DistributedRelation{ + "t": { + Name: "t", + DistributionKey: []qdb.DistributionKeyEntry{ + { + Column: "i", + }, + }, + }, + }, + }) + + err := db.CreateKeyRange(context.TODO(), &qdb.KeyRange{ + ShardID: "sh1", + DistributionId: distribution, + KeyRangeID: "id1", + LowerBound: []byte("1"), + }) + + assert.NoError(err) + + err = db.CreateKeyRange(context.TODO(), &qdb.KeyRange{ + ShardID: "sh2", + DistributionId: distribution, + KeyRangeID: "id2", + LowerBound: []byte("11"), + }) + + assert.NoError(err) + + lc := local.NewLocalCoordinator(db) + + pr, err := qrouter.NewProxyRouter(map[string]*config.Shard{ + "sh1": { + Hosts: nil, + }, + "sh2": { + Hosts: nil, + }, + }, lc, &config.QRouter{ + DefaultRouteBehaviour: "BLOCK", + }) + + assert.NoError(err) + + for _, tt := range []tcase{ + + { + query: ` + WITH xxxx AS ( + SELECT * from t where i = 1 + ) + SELECT * from xxxx; + `, + err: nil, + exp: routingstate.ShardMatchState{ + Route: &routingstate.DataShardRoute{ + Shkey: kr.ShardKey{ + Name: "sh1", + }, + Matchedkr: &kr.KeyRange{ + ShardID: "sh1", + ID: "id1", + Distribution: distribution, + LowerBound: []byte("1"), + }, + }, + TargetSessionAttrs: "any", + }, + }, + } { + parserRes, err := lyx.Parse(tt.query) + + assert.NoError(err, "query %s", tt.query) + + tmp, err := pr.Route(context.TODO(), parserRes, session.NewDummyHandler(distribution)) + + assert.NoError(err, "query %s", tt.query) + + assert.Equal(tt.exp, tmp, tt.query) + } +} + func TestSingleShard(t *testing.T) { assert := assert.New(t)