Skip to content

Commit

Permalink
Information schema routing (#383)
Browse files Browse the repository at this point in the history
* Information schema routing

* Fix unit tests
  • Loading branch information
reshke authored Jan 11, 2024
1 parent af7fe63 commit d4ad588
Show file tree
Hide file tree
Showing 2 changed files with 154 additions and 17 deletions.
71 changes: 54 additions & 17 deletions router/qrouter/proxy_routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,18 @@ import (
"github.com/pg-sharding/lyx/lyx"
)

type RelationFQN struct {
RelationName string
SchemaName string
}

func RelationFQNFromRangeRangeVar(rv *lyx.RangeVar) RelationFQN {
return RelationFQN{
RelationName: rv.RelationName,
SchemaName: rv.SchemaName,
}
}

type RoutingMetadataContext struct {
// this maps table names to its query-defined restrictions
// All columns in query should be considered in context of its table,
Expand All @@ -27,8 +39,8 @@ type RoutingMetadataContext struct {
// and
// SELECT * FROM a join b WHERE a.c1 = <val> and a.c2 = <val>
// can be routed with different rules
rels map[string][]string
exprs map[string]map[string]string
rels map[RelationFQN][]string
exprs map[RelationFQN]map[string]string

unparsed_columns map[string]struct{}

Expand All @@ -37,7 +49,7 @@ type RoutingMetadataContext struct {
// needed to parse
// SELECT * FROM t1 a where a.i = 1
// rarg:{range_var:{relname:"t2" inh:true relpersistence:"p" alias:{aliasname:"b"}
tableAliases map[string]string
tableAliases map[string]RelationFQN

// For
// INSERT INTO x VALUES(**)
Expand Down Expand Up @@ -75,9 +87,9 @@ func NewRoutingMetadataContext(
ds string,
params [][]byte) *RoutingMetadataContext {
return &RoutingMetadataContext{
rels: map[string][]string{},
tableAliases: map[string]string{},
exprs: map[string]map[string]string{},
rels: map[RelationFQN][]string{},
tableAliases: map[string]RelationFQN{},
exprs: map[RelationFQN]map[string]string{},
unparsed_columns: map[string]struct{}{},
krs: krs,
rls: rls,
Expand All @@ -86,7 +98,7 @@ func NewRoutingMetadataContext(
}
}

func (meta *RoutingMetadataContext) RecordConstExpr(resolvedRelation, colname string, expr *lyx.AExprConst) {
func (meta *RoutingMetadataContext) RecordConstExpr(resolvedRelation RelationFQN, colname string, expr *lyx.AExprConst) {
meta.rels[resolvedRelation] = append(meta.rels[resolvedRelation], colname)
if _, ok := meta.exprs[resolvedRelation]; !ok {
meta.exprs[resolvedRelation] = map[string]string{}
Expand All @@ -95,15 +107,15 @@ func (meta *RoutingMetadataContext) RecordConstExpr(resolvedRelation, colname st
meta.exprs[resolvedRelation][colname] = expr.Value
}

func (meta *RoutingMetadataContext) ResolveRelationByAlias(alias string) (string, error) {
func (meta *RoutingMetadataContext) ResolveRelationByAlias(alias string) (RelationFQN, error) {
if resolvedRelation, ok := meta.tableAliases[alias]; ok {
// TBD: postpone routing from here to root of parsing tree
return resolvedRelation, nil
} else {
// TBD: postpone routing from here to root of parsing tree
if len(meta.rels) != 1 {
// ambiguity in column aliasing
return "", ComplexQuery
return RelationFQN{}, ComplexQuery
}
for tbl := range meta.rels {
resolvedRelation = tbl
Expand All @@ -113,6 +125,7 @@ func (meta *RoutingMetadataContext) ResolveRelationByAlias(alias string) (string
}

var ComplexQuery = fmt.Errorf("too complex query to parse")
var InformationSchemaCombinedQuery = fmt.Errorf("combined information schema and regular relation is not supported")
var FailedToFindKeyRange = fmt.Errorf("failed to match key with ranges")
var FailedToMatch = fmt.Errorf("failed to match query to any sharding rule")
var SkipColumn = fmt.Errorf("skip column for routing")
Expand Down Expand Up @@ -308,12 +321,13 @@ func (qr *ProxyQrouter) deparseFromNode(node lyx.FromClauseNode, meta *RoutingMe
Msg("deparsing from node")
switch q := node.(type) {
case *lyx.RangeVar:
if _, ok := meta.rels[q.RelationName]; !ok {
meta.rels[q.RelationName] = nil
rqdn := RelationFQNFromRangeRangeVar(q)
if _, ok := meta.rels[rqdn]; !ok {
meta.rels[rqdn] = nil
}
if q.Alias != "" {
/* remember table alias */
meta.tableAliases[q.Alias] = q.RelationName
meta.tableAliases[q.Alias] = RelationFQNFromRangeRangeVar(q)
}
case *lyx.JoinExpr:
if err := qr.deparseFromNode(q.Rarg, meta); err != nil {
Expand Down Expand Up @@ -596,6 +610,29 @@ func (qr *ProxyQrouter) routeWithRules(ctx context.Context, stmt lyx.Node, sph s
return nil, err
}
}

/* immidiately error-out some corner cases, for example, when client
* tries to access information schema AND other relation in same TX
* as we are unable to serve this properly. Or can we?
*/
has_inf_schema := false
has_other_schema := false
for rqfn := range meta.rels {
if rqfn.SchemaName == "information_schema" {
has_inf_schema = true
} else {
has_other_schema = true
}
}

if has_inf_schema && has_other_schema {
return nil, InformationSchemaCombinedQuery
}
if has_inf_schema {
/* metadata-only relation can actually be routed somewhere */
return routingstate.RandomMatchState{}, nil
}

case *lyx.Delete, *lyx.Update, *lyx.Copy:
// UPDATE and/or DELETE, COPY stmts, which
// would be routed with their WHERE clause
Expand Down Expand Up @@ -624,8 +661,8 @@ func (qr *ProxyQrouter) routeWithRules(ctx context.Context, stmt lyx.Node, sph s
if meta.exprs != nil {
// traverse each deparsed relation from query
var route_err error
for tname, cols := range meta.rels {
if rule, err := MatchShardingRule(ctx, tname, cols, qr.mgr.QDB()); err != nil {
for rfqn, cols := range meta.rels {
if rule, err := MatchShardingRule(ctx, rfqn.RelationName, cols, qr.mgr.QDB()); err != nil {
for _, col := range cols {
// TODO: multi-column hash functions
hf, err := hashfunction.HashFunctionByName(rule.Entries[0].HashFunction)
Expand All @@ -634,9 +671,9 @@ func (qr *ProxyQrouter) routeWithRules(ctx context.Context, stmt lyx.Node, sph s
continue
}

hashedKey, err := hashfunction.ApplyHashFunction([]byte(meta.exprs[tname][col]), hf)
hashedKey, err := hashfunction.ApplyHashFunction([]byte(meta.exprs[rfqn][col]), hf)

spqrlog.Zero.Debug().Str("key", meta.exprs[tname][col]).Str("hashed key", string(hashedKey)).Msg("applying hash function on key")
spqrlog.Zero.Debug().Str("key", meta.exprs[rfqn][col]).Str("hashed key", string(hashedKey)).Msg("applying hash function on key")

if err != nil {
spqrlog.Zero.Debug().Err(err).Msg("failed to apply hash function")
Expand All @@ -652,7 +689,7 @@ func (qr *ProxyQrouter) routeWithRules(ctx context.Context, stmt lyx.Node, sph s

spqrlog.Zero.Debug().
Interface("currroute", currroute).
Str("table", tname).
Str("table", rfqn.RelationName).
Strs("columns", cols).
Msg("calculated route for table/cols")

Expand Down
100 changes: 100 additions & 0 deletions router/qrouter/proxy_routing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1054,3 +1054,103 @@ func TestSetStmt(t *testing.T) {
assert.Equal(tt.exp, tmp, tt.query)
}
}

func TestMiscRouting(t *testing.T) {
assert := assert.New(t)

type tcase struct {
query string
dataspace string
exp routingstate.RoutingState
err error
}
db, _ := qdb.NewMemQDB(MemQDBPath)
dataspace1 := "ds1"
dataspace2 := "ds2"

_ = db.AddShardingRule(context.TODO(), &qdb.ShardingRule{
ID: "id1",
DataspaceId: dataspace1,
TableName: "",
Entries: []qdb.ShardingRuleEntry{
{
Column: "i",
},
},
})

_ = db.AddShardingRule(context.TODO(), &qdb.ShardingRule{
ID: "id1",
DataspaceId: dataspace2,
TableName: "",
Entries: []qdb.ShardingRuleEntry{
{
Column: "i",
},
},
})

err := db.AddKeyRange(context.TODO(), &qdb.KeyRange{
ShardID: "sh1",
DataspaceId: dataspace1,
KeyRangeID: "id1",
LowerBound: []byte("1"),
UpperBound: []byte("11"),
})

assert.NoError(err)

err = db.AddKeyRange(context.TODO(), &qdb.KeyRange{
ShardID: "sh2",
DataspaceId: dataspace2,
KeyRangeID: "id2",
LowerBound: []byte("1"),
UpperBound: []byte("11"),
})

assert.NoError(err)

lc := local.NewLocalCoordinator(db)

pr, err := qrouter.NewProxyRouter(map[string]*config.Shard{
"sh1": {
Hosts: nil,
},
"sh2": {
Hosts: nil,
},
}, lc, &config.QRouter{
DefaultRouteBehaviour: "BLOCK",
})

assert.NoError(err)

for _, tt := range []tcase{
{
query: "SELECT * FROM information_schema.columns;",
dataspace: dataspace1,
exp: routingstate.RandomMatchState{},
err: nil,
},

{
query: "SELECT * FROM information_schema.columns JOIN tt ON true",
dataspace: dataspace1,
exp: nil,
err: qrouter.InformationSchemaCombinedQuery,
},
} {
parserRes, err := lyx.Parse(tt.query)

assert.NoError(err, "query %s", tt.query)

tmp, err := pr.Route(context.TODO(), parserRes, session.NewDummyHandler(tt.dataspace))
if tt.err == nil {
assert.NoError(err, "query %s", tt.query)

assert.Equal(tt.exp, tmp, tt.query)
} else {
assert.Error(tt.err, err, tt.query)
}
}
}

0 comments on commit d4ad588

Please sign in to comment.