Skip to content

Commit

Permalink
New ring state READONLY to be used by ingesters (cortexproject#6163)
Browse files Browse the repository at this point in the history
* New READONLY state on ring

Signed-off-by: Daniel Deluiggi <[email protected]>

* Changelog

Signed-off-by: Daniel Deluiggi <[email protected]>

* remove white space

Signed-off-by: Daniel Deluiggi <[email protected]>

* Update changelog

Signed-off-by: Daniel Deluiggi <[email protected]>

* Add new tests. Change api endpoints

Signed-off-by: Daniel Deluiggi <[email protected]>

* Remove cross-site scripting. Add tests

Signed-off-by: Daniel Deluiggi <[email protected]>

* Remove unRegister on shutdown

Signed-off-by: Daniel Deluiggi <[email protected]>

* Add api endpoint

Signed-off-by: Daniel Deluiggi <[email protected]>

---------

Signed-off-by: Daniel Deluiggi <[email protected]>
Signed-off-by: Daniel Blando <[email protected]>
  • Loading branch information
danielblando authored Sep 3, 2024
1 parent 39a168d commit 15ad4de
Show file tree
Hide file tree
Showing 10 changed files with 333 additions and 45 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
## master / unreleased

* [ENHANCEMENT] Ruler: Add new ruler metric `cortex_ruler_rule_groups_in_store` that is the total rule groups per tenant in store, which can be used to compare with `cortex_prometheus_rule_group_rules` to count the number of rule groups that are not loaded by a ruler. #5869
* [ENHANCEMENT] Ingester/Ring: New `READONLY` status on ring to be used by Ingester. New ingester API to change mode of ingester #6163
* [ENHANCEMENT] Ruler: Add query statistics metrics when --ruler.query-stats-enabled=true. #6173
* [ENHANCEMENT] Distributor: Add new `cortex_reduced_resolution_histogram_samples_total` metric to to track the number of histogram samples which resolution was reduced. #6182

Expand Down
10 changes: 10 additions & 0 deletions docs/api/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ For the sake of clarity, in this document we have grouped API endpoints by servi
| [Flush blocks](#flush-blocks) | Ingester || `GET,POST /ingester/flush` |
| [Shutdown](#shutdown) | Ingester || `GET,POST /ingester/shutdown` |
| [Ingesters ring status](#ingesters-ring-status) | Ingester || `GET /ingester/ring` |
| [Ingester mode](#ingester-mode) | Ingester || `GET,POST /ingester/mode` |
| [Instant query](#instant-query) | Querier, Query-frontend || `GET,POST <prometheus-http-prefix>/api/v1/query` |
| [Range query](#range-query) | Querier, Query-frontend || `GET,POST <prometheus-http-prefix>/api/v1/query_range` |
| [Exemplar query](#exemplar-query) | Querier, Query-frontend || `GET,POST <prometheus-http-prefix>/api/v1/query_exemplars` |
Expand Down Expand Up @@ -296,6 +297,15 @@ GET /ring

Displays a web page with the ingesters hash ring status, including the state, healthy and last heartbeat time of each ingester.

### Ingester mode

```
GET,POST /ingester/mode
```
Change ingester mode between ACTIVE or READONLY. READONLY ingester does not receive push requests and will only be called for query operations.

The endpoint accept query param `mode` or POST as `application/x-www-form-urlencoded` with mode type.


## Querier / Query-frontend

Expand Down
4 changes: 4 additions & 0 deletions pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,7 @@ type Ingester interface {
FlushHandler(http.ResponseWriter, *http.Request)
ShutdownHandler(http.ResponseWriter, *http.Request)
RenewTokenHandler(http.ResponseWriter, *http.Request)
ModeHandler(http.ResponseWriter, *http.Request)
Push(context.Context, *cortexpb.WriteRequest) (*cortexpb.WriteResponse, error)
}

Expand All @@ -294,9 +295,12 @@ func (a *API) RegisterIngester(i Ingester, pushConfig distributor.Config) {
a.indexPage.AddLink(SectionDangerous, "/ingester/flush", "Trigger a Flush of data from Ingester to storage")
a.indexPage.AddLink(SectionDangerous, "/ingester/shutdown", "Trigger Ingester Shutdown (Dangerous)")
a.indexPage.AddLink(SectionDangerous, "/ingester/renewTokens", "Renew Ingester Tokens (10%)")
a.indexPage.AddLink(SectionDangerous, "/ingester/mode?mode=READONLY", "Set Ingester to READONLY mode")
a.indexPage.AddLink(SectionDangerous, "/ingester/mode?mode=ACTIVE", "Set Ingester to ACTIVE mode")
a.RegisterRoute("/ingester/flush", http.HandlerFunc(i.FlushHandler), false, "GET", "POST")
a.RegisterRoute("/ingester/shutdown", http.HandlerFunc(i.ShutdownHandler), false, "GET", "POST")
a.RegisterRoute("/ingester/renewTokens", http.HandlerFunc(i.RenewTokenHandler), false, "GET", "POST")
a.RegisterRoute("/ingester/mode", http.HandlerFunc(i.ModeHandler), false, "GET", "POST")
a.RegisterRoute("/ingester/push", push.Handler(pushConfig.MaxRecvMsgSize, a.sourceIPs, i.Push), true, "POST") // For testing and debugging.

// Legacy Routes
Expand Down
56 changes: 56 additions & 0 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"flag"
"fmt"
"html"
"io"
"math"
"net/http"
Expand Down Expand Up @@ -2881,6 +2882,61 @@ func (i *Ingester) flushHandler(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusNoContent)
}

// ModeHandler Change mode of ingester. It will also update set unregisterOnShutdown to true if READONLY mode
func (i *Ingester) ModeHandler(w http.ResponseWriter, r *http.Request) {
err := r.ParseForm()
if err != nil {
respMsg := "failed to parse HTTP request in mode handler"
level.Warn(logutil.WithContext(r.Context(), i.logger)).Log("msg", respMsg, "err", err)
w.WriteHeader(http.StatusBadRequest)
// We ignore errors here, because we cannot do anything about them.
_, _ = w.Write([]byte(respMsg))
return
}

currentState := i.lifecycler.GetState()
reqMode := strings.ToUpper(r.Form.Get("mode"))
switch reqMode {
case "READONLY":
if currentState != ring.READONLY {
err = i.lifecycler.ChangeState(r.Context(), ring.READONLY)
if err != nil {
respMsg := fmt.Sprintf("failed to change state: %s", err)
level.Warn(logutil.WithContext(r.Context(), i.logger)).Log("msg", respMsg)
w.WriteHeader(http.StatusBadRequest)
// We ignore errors here, because we cannot do anything about them.
_, _ = w.Write([]byte(respMsg))
return
}
}
case "ACTIVE":
if currentState != ring.ACTIVE {
err = i.lifecycler.ChangeState(r.Context(), ring.ACTIVE)
if err != nil {
respMsg := fmt.Sprintf("failed to change state: %s", err)
level.Warn(logutil.WithContext(r.Context(), i.logger)).Log("msg", respMsg)
w.WriteHeader(http.StatusBadRequest)
// We ignore errors here, because we cannot do anything about them.
_, _ = w.Write([]byte(respMsg))
return
}
}
default:
respMsg := fmt.Sprintf("invalid mode input: %s", html.EscapeString(reqMode))
level.Warn(logutil.WithContext(r.Context(), i.logger)).Log("msg", respMsg)
w.WriteHeader(http.StatusBadRequest)
// We ignore errors here, because we cannot do anything about them.
_, _ = w.Write([]byte(respMsg))
return
}

respMsg := fmt.Sprintf("Ingester mode %s", i.lifecycler.GetState())
level.Info(logutil.WithContext(r.Context(), i.logger)).Log("msg", respMsg)
w.WriteHeader(http.StatusOK)
// We ignore errors here, because we cannot do anything about them.
_, _ = w.Write([]byte(respMsg))
}

// metadataQueryRange returns the best range to query for metadata queries based on the timerange in the ingester.
func metadataQueryRange(queryStart, queryEnd int64, db *userTSDB, queryIngestersWithin time.Duration) (mint, maxt int64, err error) {
if queryIngestersWithin > 0 {
Expand Down
111 changes: 111 additions & 0 deletions pkg/ingester/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5107,6 +5107,117 @@ func generateSamplesForLabel(l labels.Labels, count int) *cortexpb.WriteRequest
return cortexpb.ToWriteRequest(lbls, samples, nil, nil, cortexpb.API)
}

func Test_Ingester_ModeHandler(t *testing.T) {
tests := map[string]struct {
method string
requestBody io.Reader
requestUrl string
initialState ring.InstanceState
mode string
expectedState ring.InstanceState
expectedResponse int
}{
"should change to READONLY mode": {
method: "POST",
initialState: ring.ACTIVE,
requestUrl: "/mode?mode=reAdOnLy",
expectedState: ring.READONLY,
expectedResponse: http.StatusOK,
},
"should change mode on GET method": {
method: "GET",
initialState: ring.ACTIVE,
requestUrl: "/mode?mode=READONLY",
expectedState: ring.READONLY,
expectedResponse: http.StatusOK,
},
"should change mode on POST method via body": {
method: "POST",
initialState: ring.ACTIVE,
requestUrl: "/mode",
requestBody: strings.NewReader("mode=readonly"),
expectedState: ring.READONLY,
expectedResponse: http.StatusOK,
},
"should change to ACTIVE mode": {
method: "POST",
initialState: ring.READONLY,
requestUrl: "/mode?mode=active",
expectedState: ring.ACTIVE,
expectedResponse: http.StatusOK,
},
"should fail to unknown mode": {
method: "POST",
initialState: ring.ACTIVE,
requestUrl: "/mode?mode=NotSupported",
expectedState: ring.ACTIVE,
expectedResponse: http.StatusBadRequest,
},
"should maintain in readonly": {
method: "POST",
initialState: ring.READONLY,
requestUrl: "/mode?mode=READONLY",
expectedState: ring.READONLY,
expectedResponse: http.StatusOK,
},
"should maintain in active": {
method: "POST",
initialState: ring.ACTIVE,
requestUrl: "/mode?mode=ACTIVE",
expectedState: ring.ACTIVE,
expectedResponse: http.StatusOK,
},
"should fail mode READONLY if LEAVING state": {
method: "POST",
initialState: ring.LEAVING,
requestUrl: "/mode?mode=READONLY",
expectedState: ring.LEAVING,
expectedResponse: http.StatusBadRequest,
},
"should fail with malformatted request": {
method: "GET",
initialState: ring.ACTIVE,
requestUrl: "/mode?mod;e=READONLY",
expectedResponse: http.StatusBadRequest,
},
}

for testName, testData := range tests {
t.Run(testName, func(t *testing.T) {
cfg := defaultIngesterTestConfig(t)
i, err := prepareIngesterWithBlocksStorage(t, cfg, prometheus.NewRegistry())
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), i))
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck

// Wait until it's ACTIVE
test.Poll(t, 1*time.Second, ring.ACTIVE, func() interface{} {
return i.lifecycler.GetState()
})

if testData.initialState != ring.ACTIVE {
err = i.lifecycler.ChangeState(context.Background(), testData.initialState)
require.NoError(t, err)

// Wait until initial state
test.Poll(t, 1*time.Second, testData.initialState, func() interface{} {
return i.lifecycler.GetState()
})
}

response := httptest.NewRecorder()
request := httptest.NewRequest(testData.method, testData.requestUrl, testData.requestBody)
if testData.requestBody != nil {
request.Header.Set("Content-Type", "application/x-www-form-urlencoded; param=value")
}
i.ModeHandler(response, request)

require.Equal(t, testData.expectedResponse, response.Code)
require.Equal(t, testData.expectedState, i.lifecycler.GetState())
})
}
}

// mockTenantLimits exposes per-tenant limits based on a provided map
type mockTenantLimits struct {
limits map[string]*validation.Limits
Expand Down
5 changes: 4 additions & 1 deletion pkg/ring/lifecycler.go
Original file line number Diff line number Diff line change
Expand Up @@ -910,7 +910,10 @@ func (i *Lifecycler) changeState(ctx context.Context, state InstanceState) error
(currState == JOINING && state == PENDING) || // triggered by TransferChunks on failure
(currState == JOINING && state == ACTIVE) || // triggered by TransferChunks on success
(currState == PENDING && state == ACTIVE) || // triggered by autoJoin
(currState == ACTIVE && state == LEAVING)) { // triggered by shutdown
(currState == ACTIVE && state == LEAVING) || // triggered by shutdown
(currState == ACTIVE && state == READONLY) || // triggered by ingester mode
(currState == READONLY && state == ACTIVE) || // triggered by ingester mode
(currState == READONLY && state == LEAVING)) { // triggered by shutdown
return fmt.Errorf("Changing instance state from %v -> %v is disallowed", currState, state)
}

Expand Down
16 changes: 10 additions & 6 deletions pkg/ring/ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,13 +106,17 @@ var (
})

// WriteNoExtend is like Write, but with no replicaset extension.
WriteNoExtend = NewOp([]InstanceState{ACTIVE}, nil)
WriteNoExtend = NewOp([]InstanceState{ACTIVE}, func(s InstanceState) bool {
// We want to skip instances that are READONLY. So we will increase the size of replication
// for the key
return s == READONLY
})

// Read operation that extends the replica set if an instance is not ACTIVE, LEAVING OR JOINING
Read = NewOp([]InstanceState{ACTIVE, PENDING, LEAVING, JOINING}, func(s InstanceState) bool {
// Read operation that extends the replica set if an instance is not ACTIVE, PENDING, LEAVING, JOINING OR READONLY
Read = NewOp([]InstanceState{ACTIVE, PENDING, LEAVING, JOINING, READONLY}, func(s InstanceState) bool {
// To match Write with extended replica set we have to also increase the
// size of the replica set for Read, but we can read from LEAVING ingesters.
return s != ACTIVE && s != LEAVING && s != JOINING
return s != ACTIVE && s != LEAVING && s != JOINING && s != READONLY
})

// Reporting is a special value for inquiring about health.
Expand Down Expand Up @@ -661,7 +665,7 @@ func (r *Ring) updateRingMetrics(compareResult CompareResult) {
oldestTimestampByState := map[string]int64{}

// Initialized to zero so we emit zero-metrics (instead of not emitting anything)
for _, s := range []string{unhealthy, ACTIVE.String(), LEAVING.String(), PENDING.String(), JOINING.String()} {
for _, s := range []string{unhealthy, ACTIVE.String(), LEAVING.String(), PENDING.String(), JOINING.String(), READONLY.String()} {
numByState[s] = 0
oldestTimestampByState[s] = 0
}
Expand Down Expand Up @@ -995,7 +999,7 @@ func NewOp(healthyStates []InstanceState, shouldExtendReplicaSet func(s Instance
}

if shouldExtendReplicaSet != nil {
for _, s := range []InstanceState{ACTIVE, LEAVING, PENDING, JOINING, LEFT} {
for _, s := range []InstanceState{ACTIVE, LEAVING, PENDING, JOINING, LEFT, READONLY} {
if shouldExtendReplicaSet(s) {
op |= (0x10000 << s)
}
Expand Down
70 changes: 37 additions & 33 deletions pkg/ring/ring.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions pkg/ring/ring.proto
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,6 @@ enum InstanceState {
// This state is only used by gossiping code to distribute information about
// instances that have been removed from the ring. Ring users should not use it directly.
LEFT = 4;

READONLY= 5;
}
Loading

0 comments on commit 15ad4de

Please sign in to comment.