diff --git a/pkg/clientinteractor/interactor.go b/pkg/clientinteractor/interactor.go index f735e5fc1..3a3eb4c67 100644 --- a/pkg/clientinteractor/interactor.go +++ b/pkg/clientinteractor/interactor.go @@ -241,6 +241,34 @@ func (pi *PSQLInteractor) Version(_ context.Context) error { return pi.CompleteMsg(0) } +// Quantiles sends the row description message for total time quantiles of queries in router and in shard. +// +// Parameters: +// - _ (context.Context): The context parameter (not used in the function). +// +// Returns: +// - error: An error if sending the messages fails, otherwise nil. +// +// TODO: unit tests +func (pi *PSQLInteractor) Quantiles(_ context.Context) error { + if err := pi.WriteHeader("quantile_type", "value"); err != nil { + spqrlog.Zero.Error().Err(err).Msg("") + return err + } + + quantiles := statistics.GetQuantiles() + for _, q := range *quantiles { + if err := pi.WriteDataRow(fmt.Sprintf("router_time_%.2f", q), fmt.Sprintf("%.2fms", statistics.GetTotalTimeQuantile(statistics.Router, q))); err != nil { + return err + } + if err := pi.WriteDataRow(fmt.Sprintf("shard_time_%.2f", q), fmt.Sprintf("%.2fms", statistics.GetTotalTimeQuantile(statistics.Shard, q))); err != nil { + return err + } + } + + return pi.CompleteMsg(len(*quantiles) * 2) +} + // TODO : unit tests // AddShard sends the row description message for adding a data shard, followed by a data row diff --git a/pkg/meta/meta.go b/pkg/meta/meta.go index 1e5996e6f..838c57f83 100644 --- a/pkg/meta/meta.go +++ b/pkg/meta/meta.go @@ -542,6 +542,8 @@ func ProcessShow(ctx context.Context, stmt *spqrparser.Show, mngr EntityMgr, ci } return cli.PreparedStatements(ctx, resp) + case spqrparser.QuantilesStr: + return cli.Quantiles(ctx) default: return unknownCoordinatorCommand } diff --git a/router/statistics/query_time_statistics.go b/router/statistics/query_time_statistics.go index 6f306d123..f1acd4ee3 100644 --- a/router/statistics/query_time_statistics.go +++ b/router/statistics/query_time_statistics.go @@ -23,6 +23,8 @@ type startTimes struct { type statistics struct { RouterTime map[uint]*tdigest.TDigest ShardTime map[uint]*tdigest.TDigest + RouterTimeTotal *tdigest.TDigest + ShardTimeTotal *tdigest.TDigest TimeData map[uint]*startTimes Quantiles []float64 NeedToCollectData bool @@ -30,10 +32,12 @@ type statistics struct { } var queryStatistics = statistics{ - RouterTime: make(map[uint]*tdigest.TDigest), - ShardTime: make(map[uint]*tdigest.TDigest), - TimeData: make(map[uint]*startTimes), - lock: sync.RWMutex{}, + RouterTime: make(map[uint]*tdigest.TDigest), + ShardTime: make(map[uint]*tdigest.TDigest), + RouterTimeTotal: nil, + ShardTimeTotal: nil, + TimeData: make(map[uint]*startTimes), + lock: sync.RWMutex{}, } func InitStatistics(q []float64) { @@ -44,19 +48,22 @@ func InitStatistics(q []float64) { } else { queryStatistics.NeedToCollectData = true } + + queryStatistics.RouterTimeTotal, _ = tdigest.New() + queryStatistics.ShardTimeTotal, _ = tdigest.New() } func GetQuantiles() *[]float64 { return &queryStatistics.Quantiles } -func GetTimeQuantile(tip StatisticsType, q float64, client uint) float64 { +func GetTimeQuantile(statType StatisticsType, q float64, client uint) float64 { queryStatistics.lock.Lock() defer queryStatistics.lock.Unlock() var stat *tdigest.TDigest - switch tip { + switch statType { case Router: stat = queryStatistics.RouterTime[client] if stat == nil { @@ -74,7 +81,27 @@ func GetTimeQuantile(tip StatisticsType, q float64, client uint) float64 { } } -func RecordStartTime(tip StatisticsType, t time.Time, client uint) { +func GetTotalTimeQuantile(statType StatisticsType, q float64) float64 { + queryStatistics.lock.Lock() + defer queryStatistics.lock.Unlock() + + switch statType { + case Router: + if queryStatistics.RouterTimeTotal == nil || queryStatistics.RouterTimeTotal.Count() == 0 { + return 0 + } + return queryStatistics.RouterTimeTotal.Quantile(q) + case Shard: + if queryStatistics.ShardTimeTotal == nil || queryStatistics.ShardTimeTotal.Count() == 0 { + return 0 + } + return queryStatistics.ShardTimeTotal.Quantile(q) + default: + return 0 + } +} + +func RecordStartTime(statType StatisticsType, t time.Time, client uint) { if queryStatistics.NeedToCollectData { return } @@ -85,7 +112,7 @@ func RecordStartTime(tip StatisticsType, t time.Time, client uint) { if queryStatistics.TimeData[client] == nil { queryStatistics.TimeData[client] = &startTimes{} } - switch tip { + switch statType { case Router: queryStatistics.TimeData[client].RouterStart = t case Shard: @@ -107,12 +134,28 @@ func RecordFinishedTransaction(t time.Time, client uint) { if queryStatistics.ShardTime[client] == nil { queryStatistics.ShardTime[client], _ = tdigest.New() } - err := queryStatistics.RouterTime[client].Add(float64(t.Sub(queryStatistics.TimeData[client].RouterStart).Microseconds()) / 1000) + if queryStatistics.RouterTimeTotal == nil { + queryStatistics.RouterTimeTotal, _ = tdigest.New() + } + if queryStatistics.ShardTimeTotal == nil { + queryStatistics.ShardTimeTotal, _ = tdigest.New() + } + routerTime := float64(t.Sub(queryStatistics.TimeData[client].RouterStart).Microseconds()) / 1000 + shardTime := float64(t.Sub(queryStatistics.TimeData[client].ShardStart).Microseconds()) / 1000 + err := queryStatistics.RouterTime[client].Add(routerTime) + if err != nil { + spqrlog.Zero.Error().Err(err).Msg("Failed to record transaction duration") + } + err = queryStatistics.ShardTime[client].Add(shardTime) + if err != nil { + spqrlog.Zero.Error().Err(err).Msg("Failed to record transaction duration") + } + err = queryStatistics.RouterTimeTotal.Add(routerTime) if err != nil { - spqrlog.Zero.Error().Err(err).Msg(err.Error()) + spqrlog.Zero.Error().Err(err).Msg("Failed to record transaction duration") } - err = queryStatistics.ShardTime[client].Add(float64(t.Sub(queryStatistics.TimeData[client].ShardStart).Microseconds()) / 1000) + err = queryStatistics.ShardTimeTotal.Add(shardTime) if err != nil { - spqrlog.Zero.Error().Err(err).Msg(err.Error()) + spqrlog.Zero.Error().Err(err).Msg("Failed to record transaction duration") } } diff --git a/router/statistics/qury_time_statistics_test.go b/router/statistics/qury_time_statistics_test.go index f29953bc7..03318980c 100644 --- a/router/statistics/qury_time_statistics_test.go +++ b/router/statistics/qury_time_statistics_test.go @@ -33,6 +33,9 @@ func TestStatisticsForOneUser(t *testing.T) { assert.Equal(4.0, statistics.GetTimeQuantile(statistics.Router, 0.5, 144)) assert.Equal(3.0, statistics.GetTimeQuantile(statistics.Shard, 0.5, 144)) + + assert.Equal(4.0, statistics.GetTotalTimeQuantile(statistics.Router, 0.5)) + assert.Equal(3.0, statistics.GetTotalTimeQuantile(statistics.Shard, 0.5)) } func TestStatisticsForDifferentUsers(t *testing.T) { @@ -62,6 +65,9 @@ func TestStatisticsForDifferentUsers(t *testing.T) { assert.Equal(7.0, statistics.GetTimeQuantile(statistics.Router, 0.5, 229)) assert.Equal(1.5, statistics.GetTimeQuantile(statistics.Shard, 0.5, 229)) + + assert.Equal(5.0, statistics.GetTotalTimeQuantile(statistics.Router, 0.5)) + assert.Equal(1.0, statistics.GetTotalTimeQuantile(statistics.Shard, 0.5)) } func TestNoStatisticsForMisingUser(t *testing.T) { @@ -83,6 +89,7 @@ func TestNoStatisticsWhenNotNeeded(t *testing.T) { statistics.RecordFinishedTransaction(tim.Add(time.Millisecond*2), 149) assert.Equal(0.0, statistics.GetTimeQuantile(statistics.Router, 0.5, 149)) + assert.Equal(0.0, statistics.GetTotalTimeQuantile(statistics.Router, 0.5)) } func TestCheckMultithreading(t *testing.T) { @@ -102,6 +109,8 @@ func TestCheckMultithreading(t *testing.T) { statistics.GetTimeQuantile(statistics.Router, 0.5, 149) statistics.GetTimeQuantile(statistics.Router, 0.99, 149) + statistics.GetTotalTimeQuantile(statistics.Router, 0.99) + statistics.GetTotalTimeQuantile(statistics.Router, 0.99) } wg.Done() }() diff --git a/test/regress/schedule/console b/test/regress/schedule/console index 1d521a6a1..35ba7855c 100644 --- a/test/regress/schedule/console +++ b/test/regress/schedule/console @@ -5,6 +5,7 @@ test: show_key_ranges test: show_distributions test: show_version test: show_relations +test: show_time_quantiles test: drop test: add test: hash diff --git a/test/regress/tests/console/expected/show_time_quantiles.out b/test/regress/tests/console/expected/show_time_quantiles.out new file mode 100644 index 000000000..641bf56f5 --- /dev/null +++ b/test/regress/tests/console/expected/show_time_quantiles.out @@ -0,0 +1,14 @@ + + SPQR router admin console + Here you can configure your routing rules +------------------------------------------------ + You can find documentation here +https://github.com/pg-sharding/spqr/tree/master/docs + +SHOW time_quantiles + quantile_type | value +------------------+-------- + router_time_0.75 | 0.00ms + shard_time_0.75 | 0.00ms +(2 rows) + diff --git a/test/regress/tests/console/sql/show_time_quantiles.sql b/test/regress/tests/console/sql/show_time_quantiles.sql new file mode 100644 index 000000000..1822486d9 --- /dev/null +++ b/test/regress/tests/console/sql/show_time_quantiles.sql @@ -0,0 +1 @@ +SHOW time_quantiles \ No newline at end of file diff --git a/yacc/console/ast.go b/yacc/console/ast.go index c84c20889..9fc07f2c4 100644 --- a/yacc/console/ast.go +++ b/yacc/console/ast.go @@ -264,6 +264,7 @@ const ( TaskGroupStr = "task_group" PreparedStatementsStr = "prepared_statements" UnsupportedStr = "unsupported" + QuantilesStr = "time_quantiles" ) const ( diff --git a/yacc/console/gram.go b/yacc/console/gram.go index b1c6c76af..5116ed182 100644 --- a/yacc/console/gram.go +++ b/yacc/console/gram.go @@ -263,7 +263,7 @@ const yyInitialStackSize = 16 //line gram.y:864 //line yacctab:1 -var yyExca = [...]int{ +var yyExca = [...]int8{ -1, 1, 1, -1, -2, 0, @@ -273,7 +273,7 @@ const yyPrivate = 57344 const yyLast = 251 -var yyAct = [...]int{ +var yyAct = [...]uint8{ 134, 180, 220, 154, 183, 175, 146, 153, 152, 158, 133, 131, 145, 141, 116, 92, 130, 27, 28, 155, 208, 209, 176, 177, 178, 97, 143, 122, 89, 30, @@ -302,7 +302,7 @@ var yyAct = [...]int{ 99, } -var yyPact = [...]int{ +var yyPact = [...]int16{ 11, -1000, 130, -1000, -1000, -1000, -1000, -1000, -1000, -1000, -1000, -1000, -1000, -1000, -1000, -1000, -1000, -1000, -1000, -1000, 87, 87, -28, -36, 21, 78, 78, 190, 39, 187, @@ -328,7 +328,7 @@ var yyPact = [...]int{ -1000, 86, -18, -1000, 189, -1000, -1000, -1000, } -var yyPgo = [...]int{ +var yyPgo = [...]uint8{ 0, 250, 11, 7, 8, 249, 248, 10, 3, 0, 19, 247, 246, 156, 137, 245, 244, 243, 242, 241, 240, 239, 238, 237, 155, 153, 146, 141, 12, 236, @@ -337,7 +337,7 @@ var yyPgo = [...]int{ 219, 217, 216, 213, 212, 210, 205, 203, 199, 197, } -var yyR1 = [...]int{ +var yyR1 = [...]int8{ 0, 58, 59, 59, 11, 11, 11, 11, 11, 11, 11, 11, 11, 11, 11, 11, 11, 11, 11, 11, 11, 10, 8, 8, 8, 9, 5, 5, 5, 6, @@ -353,7 +353,7 @@ var yyR1 = [...]int{ 55, 52, 51, 56, 57, 57, } -var yyR2 = [...]int{ +var yyR2 = [...]int8{ 0, 2, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, @@ -369,7 +369,7 @@ var yyR2 = [...]int{ 4, 2, 1, 5, 3, 3, } -var yyChk = [...]int{ +var yyChk = [...]int16{ -1000, -58, -11, -20, -21, -22, -23, -19, -50, -49, -17, -18, -52, -51, -53, -54, -55, -56, -57, -37, 25, 24, 68, 69, 26, 27, 28, 6, 7, 19, @@ -395,7 +395,7 @@ var yyChk = [...]int{ -31, -9, -9, -36, 12, -34, -36, -31, } -var yyDef = [...]int{ +var yyDef = [...]int8{ 0, -2, 2, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, @@ -421,11 +421,11 @@ var yyDef = [...]int{ 60, 99, 101, 107, 0, 61, 106, 59, } -var yyTok1 = [...]int{ +var yyTok1 = [...]int8{ 1, } -var yyTok2 = [...]int{ +var yyTok2 = [...]int8{ 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, @@ -436,7 +436,7 @@ var yyTok2 = [...]int{ 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, } -var yyTok3 = [...]int{ +var yyTok3 = [...]int8{ 0, } @@ -518,9 +518,9 @@ func yyErrorMessage(state, lookAhead int) string { expected := make([]int, 0, 4) // Look for shiftable tokens. - base := yyPact[state] + base := int(yyPact[state]) for tok := TOKSTART; tok-1 < len(yyToknames); tok++ { - if n := base + tok; n >= 0 && n < yyLast && yyChk[yyAct[n]] == tok { + if n := base + tok; n >= 0 && n < yyLast && int(yyChk[int(yyAct[n])]) == tok { if len(expected) == cap(expected) { return res } @@ -530,13 +530,13 @@ func yyErrorMessage(state, lookAhead int) string { if yyDef[state] == -2 { i := 0 - for yyExca[i] != -1 || yyExca[i+1] != state { + for yyExca[i] != -1 || int(yyExca[i+1]) != state { i += 2 } // Look for tokens that we accept or reduce. for i += 2; yyExca[i] >= 0; i += 2 { - tok := yyExca[i] + tok := int(yyExca[i]) if tok < TOKSTART || yyExca[i+1] == 0 { continue } @@ -567,30 +567,30 @@ func yylex1(lex yyLexer, lval *yySymType) (char, token int) { token = 0 char = lex.Lex(lval) if char <= 0 { - token = yyTok1[0] + token = int(yyTok1[0]) goto out } if char < len(yyTok1) { - token = yyTok1[char] + token = int(yyTok1[char]) goto out } if char >= yyPrivate { if char < yyPrivate+len(yyTok2) { - token = yyTok2[char-yyPrivate] + token = int(yyTok2[char-yyPrivate]) goto out } } for i := 0; i < len(yyTok3); i += 2 { - token = yyTok3[i+0] + token = int(yyTok3[i+0]) if token == char { - token = yyTok3[i+1] + token = int(yyTok3[i+1]) goto out } } out: if token == 0 { - token = yyTok2[1] /* unknown char */ + token = int(yyTok2[1]) /* unknown char */ } if yyDebug >= 3 { __yyfmt__.Printf("lex %s(%d)\n", yyTokname(token), uint(char)) @@ -645,7 +645,7 @@ yystack: yyS[yyp].yys = yystate yynewstate: - yyn = yyPact[yystate] + yyn = int(yyPact[yystate]) if yyn <= yyFlag { goto yydefault /* simple state */ } @@ -656,8 +656,8 @@ yynewstate: if yyn < 0 || yyn >= yyLast { goto yydefault } - yyn = yyAct[yyn] - if yyChk[yyn] == yytoken { /* valid shift */ + yyn = int(yyAct[yyn]) + if int(yyChk[yyn]) == yytoken { /* valid shift */ yyrcvr.char = -1 yytoken = -1 yyVAL = yyrcvr.lval @@ -670,7 +670,7 @@ yynewstate: yydefault: /* default state action */ - yyn = yyDef[yystate] + yyn = int(yyDef[yystate]) if yyn == -2 { if yyrcvr.char < 0 { yyrcvr.char, yytoken = yylex1(yylex, &yyrcvr.lval) @@ -679,18 +679,18 @@ yydefault: /* look through exception table */ xi := 0 for { - if yyExca[xi+0] == -1 && yyExca[xi+1] == yystate { + if yyExca[xi+0] == -1 && int(yyExca[xi+1]) == yystate { break } xi += 2 } for xi += 2; ; xi += 2 { - yyn = yyExca[xi+0] + yyn = int(yyExca[xi+0]) if yyn < 0 || yyn == yytoken { break } } - yyn = yyExca[xi+1] + yyn = int(yyExca[xi+1]) if yyn < 0 { goto ret0 } @@ -712,10 +712,10 @@ yydefault: /* find a state where "error" is a legal shift action */ for yyp >= 0 { - yyn = yyPact[yyS[yyp].yys] + yyErrCode + yyn = int(yyPact[yyS[yyp].yys]) + yyErrCode if yyn >= 0 && yyn < yyLast { - yystate = yyAct[yyn] /* simulate a shift of "error" */ - if yyChk[yystate] == yyErrCode { + yystate = int(yyAct[yyn]) /* simulate a shift of "error" */ + if int(yyChk[yystate]) == yyErrCode { goto yystack } } @@ -751,7 +751,7 @@ yydefault: yypt := yyp _ = yypt // guard against "declared and not used" - yyp -= yyR2[yyn] + yyp -= int(yyR2[yyn]) // yyp is now the index of $0. Perform the default action. Iff the // reduced production is ε, $1 is possibly out of range. if yyp+1 >= len(yyS) { @@ -762,16 +762,16 @@ yydefault: yyVAL = yyS[yyp+1] /* consult goto table to find next state */ - yyn = yyR1[yyn] - yyg := yyPgo[yyn] + yyn = int(yyR1[yyn]) + yyg := int(yyPgo[yyn]) yyj := yyg + yyS[yyp].yys + 1 if yyj >= yyLast { - yystate = yyAct[yyg] + yystate = int(yyAct[yyg]) } else { - yystate = yyAct[yyj] - if yyChk[yystate] != -yyn { - yystate = yyAct[yyg] + yystate = int(yyAct[yyj]) + if int(yyChk[yystate]) != -yyn { + yystate = int(yyAct[yyg]) } } // dummy call; replaced with literal code @@ -1000,7 +1000,7 @@ yydefault: //line gram.y:382 { switch v := strings.ToLower(string(yyDollar[1].str)); v { - case DatabasesStr, RoutersStr, PoolsStr, ShardsStr, BackendConnectionsStr, KeyRangesStr, ShardingRules, ClientsStr, StatusStr, DistributionsStr, VersionStr, RelationsStr, TaskGroupStr, PreparedStatementsStr: + case DatabasesStr, RoutersStr, PoolsStr, ShardsStr, BackendConnectionsStr, KeyRangesStr, ShardingRules, ClientsStr, StatusStr, DistributionsStr, VersionStr, RelationsStr, TaskGroupStr, PreparedStatementsStr, QuantilesStr: yyVAL.str = v default: yyVAL.str = UnsupportedStr diff --git a/yacc/console/gram.y b/yacc/console/gram.y index f6ed6e80a..db47a1bd7 100644 --- a/yacc/console/gram.y +++ b/yacc/console/gram.y @@ -381,7 +381,7 @@ show_statement_type: IDENT { switch v := strings.ToLower(string($1)); v { - case DatabasesStr, RoutersStr, PoolsStr, ShardsStr, BackendConnectionsStr, KeyRangesStr, ShardingRules, ClientsStr, StatusStr, DistributionsStr, VersionStr, RelationsStr, TaskGroupStr, PreparedStatementsStr: + case DatabasesStr, RoutersStr, PoolsStr, ShardsStr, BackendConnectionsStr, KeyRangesStr, ShardingRules, ClientsStr, StatusStr, DistributionsStr, VersionStr, RelationsStr, TaskGroupStr, PreparedStatementsStr, QuantilesStr: $$ = v default: $$ = UnsupportedStr