Skip to content

Commit

Permalink
Add users stats http api to ingester
Browse files Browse the repository at this point in the history
Signed-off-by: Daniel Deluiggi <[email protected]>
  • Loading branch information
danielblando committed Aug 27, 2024
1 parent 36b59b4 commit ed37486
Show file tree
Hide file tree
Showing 8 changed files with 228 additions and 83 deletions.
2 changes: 2 additions & 0 deletions pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,7 @@ type Ingester interface {
FlushHandler(http.ResponseWriter, *http.Request)
ShutdownHandler(http.ResponseWriter, *http.Request)
RenewTokenHandler(http.ResponseWriter, *http.Request)
AllUserStatsHandler(http.ResponseWriter, *http.Request)
Push(context.Context, *cortexpb.WriteRequest) (*cortexpb.WriteResponse, error)
}

Expand All @@ -297,6 +298,7 @@ func (a *API) RegisterIngester(i Ingester, pushConfig distributor.Config) {
a.RegisterRoute("/ingester/flush", http.HandlerFunc(i.FlushHandler), false, "GET", "POST")
a.RegisterRoute("/ingester/shutdown", http.HandlerFunc(i.ShutdownHandler), false, "GET", "POST")
a.RegisterRoute("/ingester/renewTokens", http.HandlerFunc(i.RenewTokenHandler), false, "GET", "POST")
a.RegisterRoute("/ingester/all_user_stats", http.HandlerFunc(i.AllUserStatsHandler), false, "GET")
a.RegisterRoute("/ingester/push", push.Handler(pushConfig.MaxRecvMsgSize, a.sourceIPs, i.Push), true, "POST") // For testing and debugging.

// Legacy Routes
Expand Down
2 changes: 2 additions & 0 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1389,6 +1389,7 @@ func (d *Distributor) AllUserStats(ctx context.Context) ([]UserIDStats, error) {
s.RuleIngestionRate += u.Data.RuleIngestionRate
s.NumSeries += u.Data.NumSeries
s.ActiveSeries += u.Data.ActiveSeries
s.LoadBlocks += u.Data.LoadBlocks
perUserTotals[u.UserId] = s
}
}
Expand All @@ -1404,6 +1405,7 @@ func (d *Distributor) AllUserStats(ctx context.Context) ([]UserIDStats, error) {
RuleIngestionRate: stats.RuleIngestionRate,
NumSeries: stats.NumSeries,
ActiveSeries: stats.ActiveSeries,
LoadBlocks: stats.LoadBlocks,
},
})
}
Expand Down
1 change: 1 addition & 0 deletions pkg/distributor/http_admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ const tpl = `
{{ range .Stats }}
<tr>
<td>{{ .UserID }}</td>
<td align='right'>{{ .UserStats.LoadBlocks }}</td>
<td align='right'>{{ .UserStats.NumSeries }}</td>
<td align='right'>{{ .UserStats.ActiveSeries }}</td>
<td align='right'>{{ printf "%.2f" .UserStats.IngestionRate }}</td>
Expand Down
1 change: 1 addition & 0 deletions pkg/distributor/http_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ type UserStats struct {
APIIngestionRate float64 `json:"APIIngestionRate"`
RuleIngestionRate float64 `json:"RuleIngestionRate"`
ActiveSeries uint64 `json:"activeSeries"`
LoadBlocks uint64 `json:"loadBlocks"`
}

// UserStatsHandler handles user stats to the Distributor.
Expand Down
208 changes: 125 additions & 83 deletions pkg/ingester/client/ingester.pb.go

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions pkg/ingester/client/ingester.proto
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ message UserStatsResponse {
double api_ingestion_rate = 3;
double rule_ingestion_rate = 4;
uint64 active_series = 5;
uint64 load_blocks = 6;
}

message UserIDStatsResponse {
Expand Down
15 changes: 15 additions & 0 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -1807,6 +1807,20 @@ func (i *Ingester) UserStats(ctx context.Context, req *client.UserStatsRequest)
return createUserStats(db, i.cfg.ActiveSeriesMetricsEnabled), nil
}

// AllUserStatsHandler returns ingestion statistics for all users known to this ingester.
func (i *Ingester) AllUserStatsHandler(w http.ResponseWriter, r *http.Request) {
var stats, err = i.AllUserStats(r.Context(), &client.UserStatsRequest{})
if err != nil {
level.Warn(logutil.WithContext(r.Context(), i.logger)).Log("msg", "failed to retrieve data for all user stats handler", "err", err)
w.WriteHeader(http.StatusInternalServerError)
return
}

w.WriteHeader(http.StatusOK)
// We ignore errors here, because we cannot do anything about them.
util.WriteJSONResponse(w, stats)
}

// AllUserStats returns ingestion statistics for all users known to this ingester.
func (i *Ingester) AllUserStats(_ context.Context, _ *client.UserStatsRequest) (*client.UsersStatsResponse, error) {
if err := i.checkRunning(); err != nil {
Expand Down Expand Up @@ -1845,6 +1859,7 @@ func createUserStats(db *userTSDB, activeSeriesMetricsEnabled bool) *client.User
RuleIngestionRate: ruleRate,
NumSeries: db.Head().NumSeries(),
ActiveSeries: activeSeries,
LoadBlocks: uint64(len(db.Blocks())),
}
}

Expand Down
81 changes: 81 additions & 0 deletions pkg/ingester/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4248,6 +4248,7 @@ func Test_Ingester_AllUserStats(t *testing.T) {
ApiIngestionRate: 0.2,
RuleIngestionRate: 0,
ActiveSeries: 3,
LoadBlocks: 0,
},
},
{
Expand All @@ -4258,12 +4259,92 @@ func Test_Ingester_AllUserStats(t *testing.T) {
ApiIngestionRate: 0.13333333333333333,
RuleIngestionRate: 0,
ActiveSeries: 2,
LoadBlocks: 0,
},
},
}
assert.ElementsMatch(t, expect, res.Stats)
}

func Test_Ingester_AllUserStatsHandler(t *testing.T) {
for i := 0; i < 1000; i++ {
series := []struct {
user string
lbls labels.Labels
value float64
timestamp int64
}{
{"user-1", labels.Labels{{Name: labels.MetricName, Value: "test_1_1"}, {Name: "status", Value: "200"}, {Name: "route", Value: "get_user"}}, 1, 100000},
{"user-1", labels.Labels{{Name: labels.MetricName, Value: "test_1_1"}, {Name: "status", Value: "500"}, {Name: "route", Value: "get_user"}}, 1, 110000},
{"user-1", labels.Labels{{Name: labels.MetricName, Value: "test_1_2"}}, 2, 200000},
{"user-2", labels.Labels{{Name: labels.MetricName, Value: "test_2_1"}}, 2, 200000},
{"user-2", labels.Labels{{Name: labels.MetricName, Value: "test_2_2"}}, 2, 200000},
}

// Create ingester
i, err := prepareIngesterWithBlocksStorage(t, defaultIngesterTestConfig(t), prometheus.NewRegistry())
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), i))
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck

// Wait until it's ACTIVE
test.Poll(t, 1*time.Second, ring.ACTIVE, func() interface{} {
return i.lifecycler.GetState()
})
for _, series := range series {
ctx := user.InjectOrgID(context.Background(), series.user)
req, _ := mockWriteRequest(t, series.lbls, series.value, series.timestamp)
_, err := i.Push(ctx, req)
require.NoError(t, err)
}

// Force compaction to test loaded blocks
compactionCallbackCh := make(chan struct{})
i.TSDBState.forceCompactTrigger <- requestWithUsersAndCallback{users: nil, callback: compactionCallbackCh}
<-compactionCallbackCh

// force update statistics
for _, db := range i.TSDBState.dbs {
db.ingestedAPISamples.Tick()
db.ingestedRuleSamples.Tick()
}

// Get label names
response := httptest.NewRecorder()
request := httptest.NewRequest("GET", "/all_user_stats", nil)
i.AllUserStatsHandler(response, request)
var resp *client.UsersStatsResponse
err = json.Unmarshal(response.Body.Bytes(), &resp)
require.NoError(t, err)

expect := []*client.UserIDStatsResponse{
{
UserId: "user-1",
Data: &client.UserStatsResponse{
IngestionRate: 0.2,
NumSeries: 0,
ApiIngestionRate: 0.2,
RuleIngestionRate: 0,
ActiveSeries: 3,
LoadBlocks: 1,
},
},
{
UserId: "user-2",
Data: &client.UserStatsResponse{
IngestionRate: 0.13333333333333333,
NumSeries: 0,
ApiIngestionRate: 0.13333333333333333,
RuleIngestionRate: 0,
ActiveSeries: 2,
LoadBlocks: 1,
},
},
}
assert.ElementsMatch(t, expect, resp.Stats)
}
}

func TestIngesterCompactIdleBlock(t *testing.T) {
cfg := defaultIngesterTestConfig(t)
cfg.LifecyclerConfig.JoinAfter = 0
Expand Down

0 comments on commit ed37486

Please sign in to comment.