From d4ad588f14295f7004e57d837c3b9de98cfabffc Mon Sep 17 00:00:00 2001 From: reshke Date: Thu, 11 Jan 2024 16:26:17 +0500 Subject: [PATCH] Information schema routing (#383) * Information schema routing * Fix unit tests --- router/qrouter/proxy_routing.go | 71 ++++++++++++++----- router/qrouter/proxy_routing_test.go | 100 +++++++++++++++++++++++++++ 2 files changed, 154 insertions(+), 17 deletions(-) diff --git a/router/qrouter/proxy_routing.go b/router/qrouter/proxy_routing.go index c859192a7..7fba79a74 100644 --- a/router/qrouter/proxy_routing.go +++ b/router/qrouter/proxy_routing.go @@ -17,6 +17,18 @@ import ( "github.com/pg-sharding/lyx/lyx" ) +type RelationFQN struct { + RelationName string + SchemaName string +} + +func RelationFQNFromRangeRangeVar(rv *lyx.RangeVar) RelationFQN { + return RelationFQN{ + RelationName: rv.RelationName, + SchemaName: rv.SchemaName, + } +} + type RoutingMetadataContext struct { // this maps table names to its query-defined restrictions // All columns in query should be considered in context of its table, @@ -27,8 +39,8 @@ type RoutingMetadataContext struct { // and // SELECT * FROM a join b WHERE a.c1 = and a.c2 = // can be routed with different rules - rels map[string][]string - exprs map[string]map[string]string + rels map[RelationFQN][]string + exprs map[RelationFQN]map[string]string unparsed_columns map[string]struct{} @@ -37,7 +49,7 @@ type RoutingMetadataContext struct { // needed to parse // SELECT * FROM t1 a where a.i = 1 // rarg:{range_var:{relname:"t2" inh:true relpersistence:"p" alias:{aliasname:"b"} - tableAliases map[string]string + tableAliases map[string]RelationFQN // For // INSERT INTO x VALUES(**) @@ -75,9 +87,9 @@ func NewRoutingMetadataContext( ds string, params [][]byte) *RoutingMetadataContext { return &RoutingMetadataContext{ - rels: map[string][]string{}, - tableAliases: map[string]string{}, - exprs: map[string]map[string]string{}, + rels: map[RelationFQN][]string{}, + tableAliases: map[string]RelationFQN{}, + exprs: map[RelationFQN]map[string]string{}, unparsed_columns: map[string]struct{}{}, krs: krs, rls: rls, @@ -86,7 +98,7 @@ func NewRoutingMetadataContext( } } -func (meta *RoutingMetadataContext) RecordConstExpr(resolvedRelation, colname string, expr *lyx.AExprConst) { +func (meta *RoutingMetadataContext) RecordConstExpr(resolvedRelation RelationFQN, colname string, expr *lyx.AExprConst) { meta.rels[resolvedRelation] = append(meta.rels[resolvedRelation], colname) if _, ok := meta.exprs[resolvedRelation]; !ok { meta.exprs[resolvedRelation] = map[string]string{} @@ -95,7 +107,7 @@ func (meta *RoutingMetadataContext) RecordConstExpr(resolvedRelation, colname st meta.exprs[resolvedRelation][colname] = expr.Value } -func (meta *RoutingMetadataContext) ResolveRelationByAlias(alias string) (string, error) { +func (meta *RoutingMetadataContext) ResolveRelationByAlias(alias string) (RelationFQN, error) { if resolvedRelation, ok := meta.tableAliases[alias]; ok { // TBD: postpone routing from here to root of parsing tree return resolvedRelation, nil @@ -103,7 +115,7 @@ func (meta *RoutingMetadataContext) ResolveRelationByAlias(alias string) (string // TBD: postpone routing from here to root of parsing tree if len(meta.rels) != 1 { // ambiguity in column aliasing - return "", ComplexQuery + return RelationFQN{}, ComplexQuery } for tbl := range meta.rels { resolvedRelation = tbl @@ -113,6 +125,7 @@ func (meta *RoutingMetadataContext) ResolveRelationByAlias(alias string) (string } var ComplexQuery = fmt.Errorf("too complex query to parse") +var InformationSchemaCombinedQuery = fmt.Errorf("combined information schema and regular relation is not supported") var FailedToFindKeyRange = fmt.Errorf("failed to match key with ranges") var FailedToMatch = fmt.Errorf("failed to match query to any sharding rule") var SkipColumn = fmt.Errorf("skip column for routing") @@ -308,12 +321,13 @@ func (qr *ProxyQrouter) deparseFromNode(node lyx.FromClauseNode, meta *RoutingMe Msg("deparsing from node") switch q := node.(type) { case *lyx.RangeVar: - if _, ok := meta.rels[q.RelationName]; !ok { - meta.rels[q.RelationName] = nil + rqdn := RelationFQNFromRangeRangeVar(q) + if _, ok := meta.rels[rqdn]; !ok { + meta.rels[rqdn] = nil } if q.Alias != "" { /* remember table alias */ - meta.tableAliases[q.Alias] = q.RelationName + meta.tableAliases[q.Alias] = RelationFQNFromRangeRangeVar(q) } case *lyx.JoinExpr: if err := qr.deparseFromNode(q.Rarg, meta); err != nil { @@ -596,6 +610,29 @@ func (qr *ProxyQrouter) routeWithRules(ctx context.Context, stmt lyx.Node, sph s return nil, err } } + + /* immidiately error-out some corner cases, for example, when client + * tries to access information schema AND other relation in same TX + * as we are unable to serve this properly. Or can we? + */ + has_inf_schema := false + has_other_schema := false + for rqfn := range meta.rels { + if rqfn.SchemaName == "information_schema" { + has_inf_schema = true + } else { + has_other_schema = true + } + } + + if has_inf_schema && has_other_schema { + return nil, InformationSchemaCombinedQuery + } + if has_inf_schema { + /* metadata-only relation can actually be routed somewhere */ + return routingstate.RandomMatchState{}, nil + } + case *lyx.Delete, *lyx.Update, *lyx.Copy: // UPDATE and/or DELETE, COPY stmts, which // would be routed with their WHERE clause @@ -624,8 +661,8 @@ func (qr *ProxyQrouter) routeWithRules(ctx context.Context, stmt lyx.Node, sph s if meta.exprs != nil { // traverse each deparsed relation from query var route_err error - for tname, cols := range meta.rels { - if rule, err := MatchShardingRule(ctx, tname, cols, qr.mgr.QDB()); err != nil { + for rfqn, cols := range meta.rels { + if rule, err := MatchShardingRule(ctx, rfqn.RelationName, cols, qr.mgr.QDB()); err != nil { for _, col := range cols { // TODO: multi-column hash functions hf, err := hashfunction.HashFunctionByName(rule.Entries[0].HashFunction) @@ -634,9 +671,9 @@ func (qr *ProxyQrouter) routeWithRules(ctx context.Context, stmt lyx.Node, sph s continue } - hashedKey, err := hashfunction.ApplyHashFunction([]byte(meta.exprs[tname][col]), hf) + hashedKey, err := hashfunction.ApplyHashFunction([]byte(meta.exprs[rfqn][col]), hf) - spqrlog.Zero.Debug().Str("key", meta.exprs[tname][col]).Str("hashed key", string(hashedKey)).Msg("applying hash function on key") + spqrlog.Zero.Debug().Str("key", meta.exprs[rfqn][col]).Str("hashed key", string(hashedKey)).Msg("applying hash function on key") if err != nil { spqrlog.Zero.Debug().Err(err).Msg("failed to apply hash function") @@ -652,7 +689,7 @@ func (qr *ProxyQrouter) routeWithRules(ctx context.Context, stmt lyx.Node, sph s spqrlog.Zero.Debug(). Interface("currroute", currroute). - Str("table", tname). + Str("table", rfqn.RelationName). Strs("columns", cols). Msg("calculated route for table/cols") diff --git a/router/qrouter/proxy_routing_test.go b/router/qrouter/proxy_routing_test.go index f26920062..c500e0e7b 100644 --- a/router/qrouter/proxy_routing_test.go +++ b/router/qrouter/proxy_routing_test.go @@ -1054,3 +1054,103 @@ func TestSetStmt(t *testing.T) { assert.Equal(tt.exp, tmp, tt.query) } } + +func TestMiscRouting(t *testing.T) { + assert := assert.New(t) + + type tcase struct { + query string + dataspace string + exp routingstate.RoutingState + err error + } + db, _ := qdb.NewMemQDB(MemQDBPath) + dataspace1 := "ds1" + dataspace2 := "ds2" + + _ = db.AddShardingRule(context.TODO(), &qdb.ShardingRule{ + ID: "id1", + DataspaceId: dataspace1, + TableName: "", + Entries: []qdb.ShardingRuleEntry{ + { + Column: "i", + }, + }, + }) + + _ = db.AddShardingRule(context.TODO(), &qdb.ShardingRule{ + ID: "id1", + DataspaceId: dataspace2, + TableName: "", + Entries: []qdb.ShardingRuleEntry{ + { + Column: "i", + }, + }, + }) + + err := db.AddKeyRange(context.TODO(), &qdb.KeyRange{ + ShardID: "sh1", + DataspaceId: dataspace1, + KeyRangeID: "id1", + LowerBound: []byte("1"), + UpperBound: []byte("11"), + }) + + assert.NoError(err) + + err = db.AddKeyRange(context.TODO(), &qdb.KeyRange{ + ShardID: "sh2", + DataspaceId: dataspace2, + KeyRangeID: "id2", + LowerBound: []byte("1"), + UpperBound: []byte("11"), + }) + + assert.NoError(err) + + lc := local.NewLocalCoordinator(db) + + pr, err := qrouter.NewProxyRouter(map[string]*config.Shard{ + "sh1": { + Hosts: nil, + }, + "sh2": { + Hosts: nil, + }, + }, lc, &config.QRouter{ + DefaultRouteBehaviour: "BLOCK", + }) + + assert.NoError(err) + + for _, tt := range []tcase{ + { + query: "SELECT * FROM information_schema.columns;", + dataspace: dataspace1, + exp: routingstate.RandomMatchState{}, + err: nil, + }, + + { + query: "SELECT * FROM information_schema.columns JOIN tt ON true", + dataspace: dataspace1, + exp: nil, + err: qrouter.InformationSchemaCombinedQuery, + }, + } { + parserRes, err := lyx.Parse(tt.query) + + assert.NoError(err, "query %s", tt.query) + + tmp, err := pr.Route(context.TODO(), parserRes, session.NewDummyHandler(tt.dataspace)) + if tt.err == nil { + assert.NoError(err, "query %s", tt.query) + + assert.Equal(tt.exp, tmp, tt.query) + } else { + assert.Error(tt.err, err, tt.query) + } + } +}