Skip to content

Commit

Permalink
Add common query time quantiles && 'SHOW time_quantiles' query proces…
Browse files Browse the repository at this point in the history
…sing (#767)

* Enable total time quantiles collection

* Add 'SHOW time_quantiles' command processing

* Add test for 'SHOW time_quantiles'

* Fix yacc processing of 'SHOW time_quantiles'

* Fixes
  • Loading branch information
EinKrebs authored Sep 12, 2024
1 parent 8796330 commit 6f7dc6d
Show file tree
Hide file tree
Showing 10 changed files with 151 additions and 52 deletions.
28 changes: 28 additions & 0 deletions pkg/clientinteractor/interactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,34 @@ func (pi *PSQLInteractor) Version(_ context.Context) error {
return pi.CompleteMsg(0)
}

// Quantiles sends the row description message for total time quantiles of queries in router and in shard.
//
// Parameters:
// - _ (context.Context): The context parameter (not used in the function).
//
// Returns:
// - error: An error if sending the messages fails, otherwise nil.
//
// TODO: unit tests
func (pi *PSQLInteractor) Quantiles(_ context.Context) error {
if err := pi.WriteHeader("quantile_type", "value"); err != nil {
spqrlog.Zero.Error().Err(err).Msg("")
return err
}

quantiles := statistics.GetQuantiles()
for _, q := range *quantiles {
if err := pi.WriteDataRow(fmt.Sprintf("router_time_%.2f", q), fmt.Sprintf("%.2fms", statistics.GetTotalTimeQuantile(statistics.Router, q))); err != nil {
return err
}
if err := pi.WriteDataRow(fmt.Sprintf("shard_time_%.2f", q), fmt.Sprintf("%.2fms", statistics.GetTotalTimeQuantile(statistics.Shard, q))); err != nil {
return err
}
}

return pi.CompleteMsg(len(*quantiles) * 2)
}

// TODO : unit tests

// AddShard sends the row description message for adding a data shard, followed by a data row
Expand Down
2 changes: 2 additions & 0 deletions pkg/meta/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -542,6 +542,8 @@ func ProcessShow(ctx context.Context, stmt *spqrparser.Show, mngr EntityMgr, ci
}

return cli.PreparedStatements(ctx, resp)
case spqrparser.QuantilesStr:
return cli.Quantiles(ctx)
default:
return unknownCoordinatorCommand
}
Expand Down
67 changes: 55 additions & 12 deletions router/statistics/query_time_statistics.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,21 @@ type startTimes struct {
type statistics struct {
RouterTime map[uint]*tdigest.TDigest
ShardTime map[uint]*tdigest.TDigest
RouterTimeTotal *tdigest.TDigest
ShardTimeTotal *tdigest.TDigest
TimeData map[uint]*startTimes
Quantiles []float64
NeedToCollectData bool
lock sync.RWMutex
}

var queryStatistics = statistics{
RouterTime: make(map[uint]*tdigest.TDigest),
ShardTime: make(map[uint]*tdigest.TDigest),
TimeData: make(map[uint]*startTimes),
lock: sync.RWMutex{},
RouterTime: make(map[uint]*tdigest.TDigest),
ShardTime: make(map[uint]*tdigest.TDigest),
RouterTimeTotal: nil,
ShardTimeTotal: nil,
TimeData: make(map[uint]*startTimes),
lock: sync.RWMutex{},
}

func InitStatistics(q []float64) {
Expand All @@ -44,19 +48,22 @@ func InitStatistics(q []float64) {
} else {
queryStatistics.NeedToCollectData = true
}

queryStatistics.RouterTimeTotal, _ = tdigest.New()
queryStatistics.ShardTimeTotal, _ = tdigest.New()
}

func GetQuantiles() *[]float64 {
return &queryStatistics.Quantiles
}

func GetTimeQuantile(tip StatisticsType, q float64, client uint) float64 {
func GetTimeQuantile(statType StatisticsType, q float64, client uint) float64 {
queryStatistics.lock.Lock()
defer queryStatistics.lock.Unlock()

var stat *tdigest.TDigest

switch tip {
switch statType {
case Router:
stat = queryStatistics.RouterTime[client]
if stat == nil {
Expand All @@ -74,7 +81,27 @@ func GetTimeQuantile(tip StatisticsType, q float64, client uint) float64 {
}
}

func RecordStartTime(tip StatisticsType, t time.Time, client uint) {
func GetTotalTimeQuantile(statType StatisticsType, q float64) float64 {
queryStatistics.lock.Lock()
defer queryStatistics.lock.Unlock()

switch statType {
case Router:
if queryStatistics.RouterTimeTotal == nil || queryStatistics.RouterTimeTotal.Count() == 0 {
return 0
}
return queryStatistics.RouterTimeTotal.Quantile(q)
case Shard:
if queryStatistics.ShardTimeTotal == nil || queryStatistics.ShardTimeTotal.Count() == 0 {
return 0
}
return queryStatistics.ShardTimeTotal.Quantile(q)
default:
return 0
}
}

func RecordStartTime(statType StatisticsType, t time.Time, client uint) {
if queryStatistics.NeedToCollectData {
return
}
Expand All @@ -85,7 +112,7 @@ func RecordStartTime(tip StatisticsType, t time.Time, client uint) {
if queryStatistics.TimeData[client] == nil {
queryStatistics.TimeData[client] = &startTimes{}
}
switch tip {
switch statType {
case Router:
queryStatistics.TimeData[client].RouterStart = t
case Shard:
Expand All @@ -107,12 +134,28 @@ func RecordFinishedTransaction(t time.Time, client uint) {
if queryStatistics.ShardTime[client] == nil {
queryStatistics.ShardTime[client], _ = tdigest.New()
}
err := queryStatistics.RouterTime[client].Add(float64(t.Sub(queryStatistics.TimeData[client].RouterStart).Microseconds()) / 1000)
if queryStatistics.RouterTimeTotal == nil {
queryStatistics.RouterTimeTotal, _ = tdigest.New()
}
if queryStatistics.ShardTimeTotal == nil {
queryStatistics.ShardTimeTotal, _ = tdigest.New()
}
routerTime := float64(t.Sub(queryStatistics.TimeData[client].RouterStart).Microseconds()) / 1000
shardTime := float64(t.Sub(queryStatistics.TimeData[client].ShardStart).Microseconds()) / 1000
err := queryStatistics.RouterTime[client].Add(routerTime)
if err != nil {
spqrlog.Zero.Error().Err(err).Msg("Failed to record transaction duration")
}
err = queryStatistics.ShardTime[client].Add(shardTime)
if err != nil {
spqrlog.Zero.Error().Err(err).Msg("Failed to record transaction duration")
}
err = queryStatistics.RouterTimeTotal.Add(routerTime)
if err != nil {
spqrlog.Zero.Error().Err(err).Msg(err.Error())
spqrlog.Zero.Error().Err(err).Msg("Failed to record transaction duration")
}
err = queryStatistics.ShardTime[client].Add(float64(t.Sub(queryStatistics.TimeData[client].ShardStart).Microseconds()) / 1000)
err = queryStatistics.ShardTimeTotal.Add(shardTime)
if err != nil {
spqrlog.Zero.Error().Err(err).Msg(err.Error())
spqrlog.Zero.Error().Err(err).Msg("Failed to record transaction duration")
}
}
9 changes: 9 additions & 0 deletions router/statistics/qury_time_statistics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ func TestStatisticsForOneUser(t *testing.T) {

assert.Equal(4.0, statistics.GetTimeQuantile(statistics.Router, 0.5, 144))
assert.Equal(3.0, statistics.GetTimeQuantile(statistics.Shard, 0.5, 144))

assert.Equal(4.0, statistics.GetTotalTimeQuantile(statistics.Router, 0.5))
assert.Equal(3.0, statistics.GetTotalTimeQuantile(statistics.Shard, 0.5))
}

func TestStatisticsForDifferentUsers(t *testing.T) {
Expand Down Expand Up @@ -62,6 +65,9 @@ func TestStatisticsForDifferentUsers(t *testing.T) {

assert.Equal(7.0, statistics.GetTimeQuantile(statistics.Router, 0.5, 229))
assert.Equal(1.5, statistics.GetTimeQuantile(statistics.Shard, 0.5, 229))

assert.Equal(5.0, statistics.GetTotalTimeQuantile(statistics.Router, 0.5))
assert.Equal(1.0, statistics.GetTotalTimeQuantile(statistics.Shard, 0.5))
}

func TestNoStatisticsForMisingUser(t *testing.T) {
Expand All @@ -83,6 +89,7 @@ func TestNoStatisticsWhenNotNeeded(t *testing.T) {
statistics.RecordFinishedTransaction(tim.Add(time.Millisecond*2), 149)

assert.Equal(0.0, statistics.GetTimeQuantile(statistics.Router, 0.5, 149))
assert.Equal(0.0, statistics.GetTotalTimeQuantile(statistics.Router, 0.5))
}

func TestCheckMultithreading(t *testing.T) {
Expand All @@ -102,6 +109,8 @@ func TestCheckMultithreading(t *testing.T) {

statistics.GetTimeQuantile(statistics.Router, 0.5, 149)
statistics.GetTimeQuantile(statistics.Router, 0.99, 149)
statistics.GetTotalTimeQuantile(statistics.Router, 0.99)
statistics.GetTotalTimeQuantile(statistics.Router, 0.99)
}
wg.Done()
}()
Expand Down
1 change: 1 addition & 0 deletions test/regress/schedule/console
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ test: show_key_ranges
test: show_distributions
test: show_version
test: show_relations
test: show_time_quantiles
test: drop
test: add
test: hash
Expand Down
14 changes: 14 additions & 0 deletions test/regress/tests/console/expected/show_time_quantiles.out
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@

SPQR router admin console
Here you can configure your routing rules
------------------------------------------------
You can find documentation here
https://github.com/pg-sharding/spqr/tree/master/docs

SHOW time_quantiles
quantile_type | value
------------------+--------
router_time_0.75 | 0.00ms
shard_time_0.75 | 0.00ms
(2 rows)

1 change: 1 addition & 0 deletions test/regress/tests/console/sql/show_time_quantiles.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
SHOW time_quantiles
1 change: 1 addition & 0 deletions yacc/console/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,7 @@ const (
TaskGroupStr = "task_group"
PreparedStatementsStr = "prepared_statements"
UnsupportedStr = "unsupported"
QuantilesStr = "time_quantiles"
)

const (
Expand Down
Loading

0 comments on commit 6f7dc6d

Please sign in to comment.