Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor proxy routing logic: record only distributed column query restrictions #500

Merged
merged 8 commits into from
Feb 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/tsa/tsa.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ func CheckTSA(sh shard.Shard) (bool, string, error) {
Msg("shard recieved error during check rw")
return false, reason, err
}

spqrlog.Zero.Debug().
Uint("shard", sh.ID()).
Interface("message", msg).
Expand Down
5 changes: 5 additions & 0 deletions qdb/memqdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -693,6 +693,11 @@ func (q *MemQDB) CreateDistribution(ctx context.Context, distribution *Distribut
q.mu.Lock()
defer q.mu.Unlock()

for _, r := range distribution.Relations {
q.RelationDistribution[r.Name] = distribution.ID
_ = ExecuteCommands(q.DumpState, NewUpdateCommand(q.RelationDistribution, r.Name, distribution.ID))
}

return ExecuteCommands(q.DumpState, NewUpdateCommand(q.Distributions, distribution.ID, distribution))
}

Expand Down
3 changes: 2 additions & 1 deletion qdb/qdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ type QDB interface {
AlterDistributionDetach(ctx context.Context, id string, relName string) error

GetDistribution(ctx context.Context, id string) (*Distribution, error)
GetRelationDistribution(ctx context.Context, id string) (*Distribution, error)
// TODO: fix this by passing FQRN (fully qualified relation name (+schema))
GetRelationDistribution(ctx context.Context, relation string) (*Distribution, error)

UpdateCoordinator(ctx context.Context, address string) error
GetCoordinator(ctx context.Context) (string, error)
Expand Down
116 changes: 67 additions & 49 deletions router/qrouter/proxy_routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,13 +256,7 @@ func (qr *ProxyQrouter) RouteKeyWithRanges(ctx context.Context, expr lyx.Node, m
}
}

func (meta *RoutingMetadataContext) RecordShardingColumnValue(alias, colname, value string) {
if !meta.CheckColumnRls(colname) {
spqrlog.Zero.Debug().
Str("colname", colname).
Msg("skip column due no rule mathing")
return
}
func (qr *ProxyQrouter) RecordShardingColumnValue(meta *RoutingMetadataContext, alias, colname, value string) {

resolvedRelation, err := meta.ResolveRelationByAlias(alias)
if err != nil {
Expand All @@ -271,6 +265,24 @@ func (meta *RoutingMetadataContext) RecordShardingColumnValue(alias, colname, va
return
}

/* do not process non-distributed relations or columns not from relation distribution key */
if ds, err := qr.Mgr().GetRelationDistribution(context.TODO(), resolvedRelation.RelationName); err != nil {
return
} else {
// TODO: optimize
ok := false
for _, c := range ds.Relations[resolvedRelation.RelationName].ColumnNames {
if c == colname {
ok = true
break
}
}
if !ok {
// some junk column
return
}
}

// will not work for non-int values
meta.RecordConstExpr(resolvedRelation, colname, value)
}
Expand Down Expand Up @@ -298,27 +310,27 @@ func (qr *ProxyQrouter) routeByClause(ctx context.Context, expr lyx.Node, meta *
switch rght := texpr.Right.(type) {
case *lyx.ParamRef:
if rght.Number <= len(meta.params) {
meta.RecordShardingColumnValue(alias, colname, string(meta.params[rght.Number-1]))
qr.RecordShardingColumnValue(meta, alias, colname, string(meta.params[rght.Number-1]))
}
// else error out?
case *lyx.AExprSConst:
// TBD: postpone routing from here to root of parsing tree
meta.RecordShardingColumnValue(alias, colname, rght.Value)
qr.RecordShardingColumnValue(meta, alias, colname, rght.Value)
case *lyx.AExprIConst:
// TBD: postpone routing from here to root of parsing tree
// maybe extremely inefficient. Will be fixed in SPQR-2.0
meta.RecordShardingColumnValue(alias, colname, fmt.Sprintf("%d", rght.Value))
qr.RecordShardingColumnValue(meta, alias, colname, fmt.Sprintf("%d", rght.Value))
case *lyx.AExprList:
if len(rght.List) != 0 {
expr := rght.List[0]
switch bexpr := expr.(type) {
case *lyx.AExprSConst:
// TBD: postpone routing from here to root of parsing tree
meta.RecordShardingColumnValue(alias, colname, bexpr.Value)
qr.RecordShardingColumnValue(meta, alias, colname, bexpr.Value)
case *lyx.AExprIConst:
// TBD: postpone routing from here to root of parsing tree
// maybe extremely inefficient. Will be fixed in SPQR-2.0
meta.RecordShardingColumnValue(alias, colname, fmt.Sprintf("%d", bexpr.Value))
qr.RecordShardingColumnValue(meta, alias, colname, fmt.Sprintf("%d", bexpr.Value))
}
}
case *lyx.FuncApplication:
Expand Down Expand Up @@ -547,6 +559,9 @@ func (qr *ProxyQrouter) getRelationFromNode(node lyx.FromClauseNode) (*RelationL
Msg("getting relation name out of from node")
switch q := node.(type) {
case *lyx.RangeVar:
if q.SchemaName == "information_schema" {
return &RelationList{Relations: nil}, nil
}
return &RelationList{Relations: []string{q.RelationName}}, nil
case *lyx.JoinExpr:
var rRel, lRel *RelationList
Expand Down Expand Up @@ -730,6 +745,7 @@ func (qr *ProxyQrouter) routeWithRules(ctx context.Context, stmt lyx.Node, sph s
queryDistribution = sph.Distribution()
}

/* TODO: delay this until step 2. */
krs, err := qr.mgr.ListKeyRanges(ctx, queryDistribution)
if err != nil {
return nil, err
Expand Down Expand Up @@ -907,56 +923,58 @@ func (qr *ProxyQrouter) routeWithRules(ctx context.Context, stmt lyx.Node, sph s
}

/*
* Step 2: match all deparsed rules to sharding rules.
* Step 2: traverse all aggregated relation distribution tuples and route on them.
*/

var route routingstate.RoutingState
route = nil
if meta.exprs != nil {
// traverse each deparsed relation from query
var route_err error
for rfqn, cols := range meta.rels {
if rule, err := MatchShardingRule(ctx, rfqn.RelationName, cols, qr.mgr.QDB()); err != nil {
for _, col := range cols {
// TODO: multi-column hash functions
hf, err := hashfunction.HashFunctionByName(rule.Entries[0].HashFunction)
if err != nil {
spqrlog.Zero.Debug().Err(err).Msg("failed to resolve hash function")
continue
}
var route_err error
for rfqn, cols := range meta.rels {

hashedKey, err := hashfunction.ApplyHashFunction([]byte(meta.exprs[rfqn][col]), hf)
/*
*
*/

spqrlog.Zero.Debug().Str("key", meta.exprs[rfqn][col]).Str("hashed key", string(hashedKey)).Msg("applying hash function on key")
if rule, err := MatchShardingRule(ctx, rfqn.RelationName, cols, qr.mgr.QDB()); err != nil {
for _, col := range cols {
// TODO: multi-column hash functions
hf, err := hashfunction.HashFunctionByName(rule.Entries[0].HashFunction)
if err != nil {
spqrlog.Zero.Debug().Err(err).Msg("failed to resolve hash function")
continue
}

if err != nil {
spqrlog.Zero.Debug().Err(err).Msg("failed to apply hash function")
continue
}
hashedKey, err := hashfunction.ApplyHashFunction([]byte(meta.exprs[rfqn][col]), hf)

currroute, err := qr.DeparseKeyWithRangesInternal(ctx, string(hashedKey), meta)
if err != nil {
route_err = err
spqrlog.Zero.Debug().Err(route_err).Msg("temporarily skip the route error")
continue
}
spqrlog.Zero.Debug().Str("key", meta.exprs[rfqn][col]).Str("hashed key", string(hashedKey)).Msg("applying hash function on key")

spqrlog.Zero.Debug().
Interface("currroute", currroute).
Str("table", rfqn.RelationName).
Strs("columns", cols).
Msg("calculated route for table/cols")
if err != nil {
spqrlog.Zero.Debug().Err(err).Msg("failed to apply hash function")
continue
}

route = routingstate.Combine(route, routingstate.ShardMatchState{
Route: currroute,
TargetSessionAttrs: tsa,
})
currroute, err := qr.DeparseKeyWithRangesInternal(ctx, string(hashedKey), meta)
if err != nil {
route_err = err
spqrlog.Zero.Debug().Err(route_err).Msg("temporarily skip the route error")
continue
}

spqrlog.Zero.Debug().
Interface("currroute", currroute).
Str("table", rfqn.RelationName).
Strs("columns", cols).
Msg("calculated route for table/cols")

route = routingstate.Combine(route, routingstate.ShardMatchState{
Route: currroute,
TargetSessionAttrs: tsa,
})
}
}
if route == nil && route_err != nil {
return nil, route_err
}
}
if route == nil && route_err != nil {
return nil, route_err
}

spqrlog.Zero.Debug().Interface("deparsed-values-list", meta.ValuesLists)
Expand Down
118 changes: 91 additions & 27 deletions router/qrouter/proxy_routing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,17 @@ func TestComment(t *testing.T) {
}
/* TODO: fix by adding configurable setting */
db, _ := qdb.NewMemQDB(MemQDBPath)
distribution := "default"
distribution := "dd"

_ = db.CreateDistribution(context.TODO(), &qdb.Distribution{
ID: distribution,
Relations: map[string]*qdb.DistributedRelation{
"xx": {
Name: "xx",
ColumnNames: []string{"i"},
},
},
})

_ = db.AddShardingRule(context.TODO(), &qdb.ShardingRule{
ID: "id1",
Expand Down Expand Up @@ -249,7 +259,33 @@ func TestSingleShard(t *testing.T) {
}
/* TODO: fix by adding configurable setting */
db, _ := qdb.NewMemQDB(MemQDBPath)
distribution := "default"
distribution := "dd"

_ = db.CreateDistribution(context.TODO(), &qdb.Distribution{
ID: distribution,
Relations: map[string]*qdb.DistributedRelation{
"t": {
Name: "t",
ColumnNames: []string{"i"},
},
"yy": {
Name: "yy",
ColumnNames: []string{"i"},
},
"xxtt1": {
Name: "xxtt1",
ColumnNames: []string{"i"},
},
"xx": {
Name: "xx",
ColumnNames: []string{"i"},
},
"xxmixed": {
Name: "xxmixed",
ColumnNames: []string{"i"},
},
},
})

_ = db.AddShardingRule(context.TODO(), &qdb.ShardingRule{
ID: "id1",
Expand Down Expand Up @@ -488,7 +524,7 @@ func TestSingleShard(t *testing.T) {

assert.NoError(err, "query %s", tt.query)

assert.Equal(tt.exp, tmp)
assert.Equal(tt.exp, tmp, tt.query)
}
}

Expand All @@ -502,7 +538,17 @@ func TestInsertOffsets(t *testing.T) {
}
/* TODO: fix by adding configurable setting */
db, _ := qdb.NewMemQDB(MemQDBPath)
distribution := "default"
distribution := "dd"

_ = db.CreateDistribution(context.TODO(), &qdb.Distribution{
ID: distribution,
Relations: map[string]*qdb.DistributedRelation{
"xx": {
Name: "xx",
ColumnNames: []string{"i"},
},
},
})

_ = db.AddShardingRule(context.TODO(), &qdb.ShardingRule{
ID: "id1",
Expand Down Expand Up @@ -591,7 +637,18 @@ func TestJoins(t *testing.T) {
}
/* TODO: fix by adding configurable setting */
db, _ := qdb.NewMemQDB(MemQDBPath)
distribution := "default"
distribution := "dd"

_ = db.CreateDistribution(context.TODO(), &qdb.Distribution{
ID: distribution,
ColTypes: []string{qdb.ColumnTypeVarchar},
Relations: map[string]*qdb.DistributedRelation{
"sshjt1": {
Name: "sshjt1",
ColumnNames: []string{"i"},
},
},
})

_ = db.AddShardingRule(context.TODO(), &qdb.ShardingRule{
ID: "id1",
Expand Down Expand Up @@ -706,7 +763,17 @@ func TestUnnest(t *testing.T) {
}
/* TODO: fix by adding configurable setting */
db, _ := qdb.NewMemQDB(MemQDBPath)
distribution := "default"
distribution := "dd"

_ = db.CreateDistribution(context.TODO(), &qdb.Distribution{
ID: distribution,
Relations: map[string]*qdb.DistributedRelation{
"xxtt1": {
Name: "xxtt1",
ColumnNames: []string{"i"},
},
},
})

_ = db.AddShardingRule(context.TODO(), &qdb.ShardingRule{
ID: "id1",
Expand Down Expand Up @@ -814,7 +881,17 @@ func TestCopySingleShard(t *testing.T) {
}
/* TODO: fix by adding configurable setting */
db, _ := qdb.NewMemQDB(MemQDBPath)
distribution := "default"
distribution := "dd"

_ = db.CreateDistribution(context.TODO(), &qdb.Distribution{
ID: distribution,
Relations: map[string]*qdb.DistributedRelation{
"xx": {
Name: "xx",
ColumnNames: []string{"i"},
},
},
})

_ = db.AddShardingRule(context.TODO(), &qdb.ShardingRule{
ID: "id1",
Expand Down Expand Up @@ -960,26 +1037,6 @@ func TestInsertMultiDistribution(t *testing.T) {
assert.NoError(err)

for _, tt := range []tcase{
{
query: "SELECT curr_version from schema_version where i=2 and db_name=$1'",
exp: routingstate.ShardMatchState{
Route: &routingstate.DataShardRoute{
Shkey: kr.ShardKey{
Name: "sh1",
},
Matchedkr: &kr.KeyRange{
ShardID: "sh1",
ID: "id1",
Distribution: distribution1,
LowerBound: []byte("1"),
},
},
TargetSessionAttrs: "any",
},
distribution: distribution1,
err: nil,
},

{

query: "INSERT INTO xxxdst1(i) VALUES(5);",
Expand Down Expand Up @@ -1214,6 +1271,13 @@ func TestMiscRouting(t *testing.T) {
err: nil,
},

{
query: "SELECT * FROM information_schema.sequences;",
distribution: distribution1,
exp: routingstate.RandomMatchState{},
err: nil,
},

{
query: "SELECT * FROM information_schema.columns JOIN tt ON true",
distribution: distribution1,
Expand Down
Loading