Skip to content

Commit

Permalink
Refactoring (#502)
Browse files Browse the repository at this point in the history
* Refactoring

* Fixed lint

* Fixed unittests & Removed incorrect one
  • Loading branch information
EinKrebs authored Feb 15, 2024
1 parent f35eea0 commit 113341e
Show file tree
Hide file tree
Showing 6 changed files with 57 additions and 217 deletions.
11 changes: 7 additions & 4 deletions router/mock/qrouter/mock_qrouter.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion router/qrouter/explain.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (

// TODO : unit tests
func (qr *ProxyQrouter) Explain(ctx context.Context, stmt *lyx.Explain, cli *clientinteractor.PSQLInteractor) error {
meta := NewRoutingMetadataContext(nil, nil, cli.GetDistribution(), nil, nil)
meta := NewRoutingMetadataContext(cli.GetDistribution(), nil, nil)

switch node := stmt.Stmt.(type) {
case *lyx.VariableSetStmt:
Expand Down
75 changes: 31 additions & 44 deletions router/qrouter/proxy_routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"github.com/pg-sharding/spqr/pkg/config"
"github.com/pg-sharding/spqr/pkg/models/hashfunction"
"github.com/pg-sharding/spqr/pkg/models/kr"
"github.com/pg-sharding/spqr/pkg/models/shrule"
"github.com/pg-sharding/spqr/pkg/session"
"github.com/pg-sharding/spqr/pkg/spqrlog"
"github.com/pg-sharding/spqr/qdb"
Expand Down Expand Up @@ -64,39 +63,20 @@ type RoutingMetadataContext struct {
// INSERT INTO x (...) SELECT 7
TargetList []lyx.Node

rls []*shrule.ShardingRule
krs []*kr.KeyRange
distribution string

params [][]byte
paramsFormatCodes []int16
// TODO: include client ops and metadata here
}

func (m *RoutingMetadataContext) CheckColumnRls(colname string) bool {
for i := range m.rls {
for _, c := range m.rls[i].Entries() {
if c.Column == colname {
return true
}
}
}
return false
}

func NewRoutingMetadataContext(
krs []*kr.KeyRange,
rls []*shrule.ShardingRule,
ds string,
params [][]byte, paramsFormatCodes []int16) *RoutingMetadataContext {
func NewRoutingMetadataContext(ds string, params [][]byte, paramsFormatCodes []int16) *RoutingMetadataContext {

meta := &RoutingMetadataContext{
rels: map[RelationFQN][]string{},
tableAliases: map[string]RelationFQN{},
exprs: map[RelationFQN]map[string]string{},
unparsed_columns: map[string]struct{}{},
krs: krs,
rls: rls,
distribution: ds,
params: params,
}
Expand Down Expand Up @@ -171,19 +151,19 @@ func (qr *ProxyQrouter) DeparseExprShardingEntries(expr lyx.Node, meta *RoutingM
}

// TODO : unit tests
func (qr *ProxyQrouter) DeparseKeyWithRangesInternal(ctx context.Context, key string, meta *RoutingMetadataContext) (*routingstate.DataShardRoute, error) {
func (qr *ProxyQrouter) DeparseKeyWithRangesInternal(_ context.Context, key string, krs []*kr.KeyRange) (*routingstate.DataShardRoute, error) {
spqrlog.Zero.Debug().
Str("key", key).
Msg("checking key")

spqrlog.Zero.Debug().
Str("key", key).
Int("key-ranges-count", len(meta.krs)).
Int("key-ranges-count", len(krs)).
Msg("checking key with key ranges")

var matched_krkey *kr.KeyRange = nil

for _, krkey := range meta.krs {
for _, krkey := range krs {
if kr.CmpRangesLessEqual(krkey.LowerBound, []byte(key)) &&
(matched_krkey == nil || kr.CmpRangesLessEqual(matched_krkey.LowerBound, krkey.LowerBound)) {
matched_krkey = krkey
Expand All @@ -205,7 +185,7 @@ func (qr *ProxyQrouter) DeparseKeyWithRangesInternal(ctx context.Context, key st
}

// TODO : unit tests
func (qr *ProxyQrouter) RouteKeyWithRanges(ctx context.Context, expr lyx.Node, meta *RoutingMetadataContext, hf hashfunction.HashFunctionType) (*routingstate.DataShardRoute, error) {
func (qr *ProxyQrouter) RouteKeyWithRanges(ctx context.Context, expr lyx.Node, meta *RoutingMetadataContext, krs []*kr.KeyRange, hf hashfunction.HashFunctionType) (*routingstate.DataShardRoute, error) {
switch e := expr.(type) {
case *lyx.ParamRef:
if e.Number > len(meta.params) {
Expand Down Expand Up @@ -233,15 +213,15 @@ func (qr *ProxyQrouter) RouteKeyWithRanges(ctx context.Context, expr lyx.Node, m
}
spqrlog.Zero.Debug().Str("key", string(routeParam)).Str("hashed key", string(hashedKey)).Msg("applying hash function on key")

return qr.DeparseKeyWithRangesInternal(ctx, string(hashedKey), meta)
return qr.DeparseKeyWithRangesInternal(ctx, string(hashedKey), krs)
case *lyx.AExprSConst:
hashedKey, err := hashfunction.ApplyHashFunction([]byte(e.Value), hf)
if err != nil {
return nil, err
}

spqrlog.Zero.Debug().Str("key", e.Value).Str("hashed key", string(hashedKey)).Msg("applying hash function on key")
return qr.DeparseKeyWithRangesInternal(ctx, string(hashedKey), meta)
return qr.DeparseKeyWithRangesInternal(ctx, string(hashedKey), krs)
case *lyx.AExprIConst:
val := fmt.Sprintf("%d", e.Value)
hashedKey, err := hashfunction.ApplyHashFunction([]byte(val), hf)
Expand All @@ -250,7 +230,7 @@ func (qr *ProxyQrouter) RouteKeyWithRanges(ctx context.Context, expr lyx.Node, m
}

spqrlog.Zero.Debug().Int("key", e.Value).Str("hashed key", string(hashedKey)).Msg("applying hash function on key")
return qr.DeparseKeyWithRangesInternal(ctx, string(hashedKey), meta)
return qr.DeparseKeyWithRangesInternal(ctx, string(hashedKey), krs)
default:
return nil, ComplexQuery
}
Expand Down Expand Up @@ -746,17 +726,8 @@ func (qr *ProxyQrouter) routeWithRules(ctx context.Context, stmt lyx.Node, sph s
}

/* TODO: delay this until step 2. */
krs, err := qr.mgr.ListKeyRanges(ctx, queryDistribution)
if err != nil {
return nil, err
}

rls, err := qr.mgr.ListShardingRules(ctx, queryDistribution)
if err != nil {
return nil, err
}

meta := NewRoutingMetadataContext(krs, rls, queryDistribution, sph.BindParams(), sph.BindParamFormatCodes())
meta := NewRoutingMetadataContext(queryDistribution, sph.BindParams(), sph.BindParamFormatCodes())

tsa := config.TargetSessionAttrsAny

Expand Down Expand Up @@ -931,9 +902,16 @@ func (qr *ProxyQrouter) routeWithRules(ctx context.Context, stmt lyx.Node, sph s
var route_err error
for rfqn, cols := range meta.rels {

/*
*
*/
// TODO: check by whole RFQN
ds, err := qr.mgr.GetRelationDistribution(ctx, rfqn.RelationName)
if err != nil {
return nil, err
}

krs, err := qr.mgr.ListKeyRanges(ctx, ds.Id)
if err != nil {
return nil, err
}

if rule, err := MatchShardingRule(ctx, rfqn.RelationName, cols, qr.mgr.QDB()); err != nil {
for _, col := range cols {
Expand All @@ -953,7 +931,7 @@ func (qr *ProxyQrouter) routeWithRules(ctx context.Context, stmt lyx.Node, sph s
continue
}

currroute, err := qr.DeparseKeyWithRangesInternal(ctx, string(hashedKey), meta)
currroute, err := qr.DeparseKeyWithRangesInternal(ctx, string(hashedKey), krs)
if err != nil {
route_err = err
spqrlog.Zero.Debug().Err(route_err).Msg("temporarily skip the route error")
Expand Down Expand Up @@ -996,6 +974,15 @@ func (qr *ProxyQrouter) routeWithRules(ctx context.Context, stmt lyx.Node, sph s
}
}

ds, err := qr.mgr.GetRelationDistribution(ctx, meta.InsertStmtRel)
if err != nil {
return nil, err
}
krs, err := qr.mgr.ListKeyRanges(ctx, ds.Id)
if err != nil {
return nil, err
}

hf, err := hashfunction.HashFunctionByName(rule.Entries[0].HashFunction)
if err != nil {
/* failed to resolve hash function */
Expand All @@ -1005,7 +992,7 @@ func (qr *ProxyQrouter) routeWithRules(ctx context.Context, stmt lyx.Node, sph s
meta.offsets = offsets
routed := false
if len(meta.offsets) != 0 && len(meta.TargetList) > meta.offsets[0] {
currroute, err := qr.RouteKeyWithRanges(ctx, meta.TargetList[meta.offsets[0]], meta, hf)
currroute, err := qr.RouteKeyWithRanges(ctx, meta.TargetList[meta.offsets[0]], meta, krs, hf)
if err == nil {
/* else failed, ignore */
spqrlog.Zero.Debug().
Expand All @@ -1023,7 +1010,7 @@ func (qr *ProxyQrouter) routeWithRules(ctx context.Context, stmt lyx.Node, sph s
if len(meta.offsets) != 0 && len(meta.ValuesLists) > meta.offsets[0] && !routed && meta.ValuesLists != nil {
// only first value from value list

currroute, err := qr.RouteKeyWithRanges(ctx, meta.ValuesLists[meta.offsets[0]], meta, hf)
currroute, err := qr.RouteKeyWithRanges(ctx, meta.ValuesLists[meta.offsets[0]], meta, krs, hf)
if err == nil { /* else failed, ignore */
spqrlog.Zero.Debug().
Interface("current-route", currroute).
Expand Down
Loading

0 comments on commit 113341e

Please sign in to comment.