Skip to content

Commit

Permalink
fix?
Browse files Browse the repository at this point in the history
  • Loading branch information
reshke committed Feb 15, 2024
1 parent d1c0fcb commit e498312
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 40 deletions.
106 changes: 67 additions & 39 deletions router/qrouter/proxy_routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,55 @@ func (r RelationList) iRelation() {}
var _ StatementRelation = AnyRelation{}
var _ StatementRelation = SpecificRelation{}

func (qr *ProxyQrouter) deparseInsertFromSelectOffsets(ctx context.Context, stmt *lyx.Insert) ([]int, RelationFQN, bool, error) {
insertCols := stmt.Columns

spqrlog.Zero.Debug().
Strs("insert columns", insertCols).
Msg("deparsed insert statement columns")

// compute matched sharding rule offsets
offsets := make([]int, 0)
var rfqn RelationFQN

switch q := stmt.TableRef.(type) {
case *lyx.RangeVar:
rfqn = RelationFQNFromRangeRangeVar(q)

var ds *qdb.Distribution
var err error

if ds, err = qr.Mgr().QDB().GetRelationDistribution(ctx, rfqn.RelationName); err != nil {
return nil, RelationFQN{}, false, err
}

insertColsPos := map[string]int{}
for i, c := range insertCols {
insertColsPos[c] = i
}

distributionKey := ds.Relations[rfqn.RelationName].DistributionKey
// TODO: check mapping by rules with multiple columns
for _, col := range distributionKey {
if val, ok := insertColsPos[col.Column]; !ok {
/* Do not return err here.
* This particulat insert stmt is un-routable, but still, give it a try
* and continue parsing.
* Example: INSERT INTO xx SELECT * FROM xx a WHERE a.w_id = 20;
* we have no insert cols specified, but still able to route on select
*/
return nil, RelationFQN{}, false, nil
} else {
offsets = append(offsets, val)
}
}

return offsets, rfqn, true, nil
default:
return nil, RelationFQN{}, false, ComplexQuery
}
}

// TODO : unit tests
func (qr *ProxyQrouter) deparseShardingMapping(
ctx context.Context,
Expand All @@ -445,54 +494,26 @@ func (qr *ProxyQrouter) deparseShardingMapping(
return qr.routeByClause(ctx, clause, meta)

case *lyx.Insert:
insertCols := stmt.Columns

spqrlog.Zero.Debug().
Strs("insert columns", insertCols).
Msg("deparsed insert statement columns")

// compute matched sharding rule offsets
offsets := make([]int, 0)
var rfqn RelationFQN

switch q := stmt.TableRef.(type) {
case *lyx.RangeVar:
rfqn = RelationFQNFromRangeRangeVar(q)

var ds *qdb.Distribution
var err error

if ds, err = qr.Mgr().QDB().GetRelationDistribution(ctx, rfqn.RelationName); err != nil {
return err
}

insertColsPos := map[string]int{}
for i, c := range insertCols {
insertColsPos[c] = i
}

distributionKey := ds.Relations[rfqn.RelationName].DistributionKey
// TODO: check mapping by rules with multiple columns
for _, col := range distributionKey {
if val, ok := insertColsPos[col.Column]; !ok {
return ComplexQuery
} else {
offsets = append(offsets, val)
}
}
if selectStmt := stmt.SubSelect; selectStmt != nil {

default:
return ComplexQuery
}
insertCols := stmt.Columns

if selectStmt := stmt.SubSelect; selectStmt != nil {
switch subS := selectStmt.(type) {
case *lyx.Select:
spqrlog.Zero.Debug().Msg("routing insert stmt on select clause")
_ = qr.DeparseSelectStmt(ctx, subS, meta)
/* try target list */
spqrlog.Zero.Debug().Msg("routing insert stmt on target list")
/* this target list for some insert (...) sharding column */

offsets, rfqn, success, err := qr.deparseInsertFromSelectOffsets(ctx, stmt)
if err != nil {
return err
}
if !success {
break
}

targetList := subS.TargetList
/* record all values from tl */

Expand All @@ -519,6 +540,13 @@ func (qr *ProxyQrouter) deparseShardingMapping(
case *lyx.ValueClause:
valList := subS.Values
/* record all values from values scan */
offsets, rfqn, success, err := qr.deparseInsertFromSelectOffsets(ctx, stmt)
if err != nil {
return err
}
if !success {
break
}

vlUsable := true
for i := range offsets {
Expand Down
19 changes: 19 additions & 0 deletions router/qrouter/proxy_routing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -628,6 +628,25 @@ func TestInsertOffsets(t *testing.T) {

for _, tt := range []tcase{

{
query: `INSERT INTO xx SELECT * FROM xx a WHERE a.w_id = 20;`,
exp: routingstate.ShardMatchState{
Route: &routingstate.DataShardRoute{
Shkey: kr.ShardKey{
Name: "sh2",
},
Matchedkr: &kr.KeyRange{
ShardID: "sh2",
ID: "id2",
Distribution: distribution,
LowerBound: []byte("11"),
},
},
TargetSessionAttrs: "any",
},
err: nil,
},

{
query: `
INSERT INTO xxtt1 (j, i, w_id) VALUES(2121221, -211212, '21');
Expand Down
2 changes: 1 addition & 1 deletion test/regress/tests/router/expected/shard_routing.out
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ ALTER DISTRIBUTION ds1 ATTACH RELATION xxerr DISTRIBUTION KEY id;
attached relation xxerr to distribution ds1
(1 row)

ALTER DISTRIBUTION ds1 ATTACH RELATION xxtt1 DISTRIBUTION KEY w_id, id;
ALTER DISTRIBUTION ds1 ATTACH RELATION xxtt1 DISTRIBUTION KEY w_id;
attach table
---------------------------------------------
attached relation xxtt1 to distribution ds1
Expand Down

0 comments on commit e498312

Please sign in to comment.