Skip to content

Commit

Permalink
Merge pull request #86 from vinted/receive_fixez
Browse files Browse the repository at this point in the history
receive/handler: properly set max buffered responses, fix metrics
  • Loading branch information
GiedriusS authored Feb 8, 2024
2 parents 0ad442f + 538a309 commit 0840b65
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 14 deletions.
76 changes: 63 additions & 13 deletions pkg/receive/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -576,7 +576,8 @@ func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) {
}

responseStatusCode := http.StatusOK
if err := h.handleRequest(ctx, rep, tenantHTTP, &wreq); err != nil {
tenantStats, err := h.handleRequest(ctx, rep, tenantHTTP, &wreq)
if err != nil {
level.Debug(tLogger).Log("msg", "failed to handle request", "err", err.Error())
switch errors.Cause(err) {
case errNotReady:
Expand All @@ -593,11 +594,21 @@ func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) {
}
http.Error(w, err.Error(), responseStatusCode)
}
h.writeTimeseriesTotal.WithLabelValues(strconv.Itoa(responseStatusCode), tenantHTTP).Observe(float64(len(wreq.Timeseries)))
h.writeSamplesTotal.WithLabelValues(strconv.Itoa(responseStatusCode), tenantHTTP).Observe(float64(totalSamples))

for tenant, stats := range tenantStats {
h.writeTimeseriesTotal.WithLabelValues(strconv.Itoa(responseStatusCode), tenant).Observe(float64(stats.timeseries))
h.writeSamplesTotal.WithLabelValues(strconv.Itoa(responseStatusCode), tenant).Observe(float64(stats.totalSamples))
}
}

func (h *Handler) handleRequest(ctx context.Context, rep uint64, tenantHTTP string, wreq *prompb.WriteRequest) error {
type (
	// requestStats holds the counters recorded for a single tenant while
	// handling one write request.
	requestStats struct {
		// timeseries is the number of time series counted for the tenant;
		// totalSamples is the number of samples summed over those series.
		timeseries, totalSamples int
	}

	// tenantRequestStats maps a tenant name to the requestStats accumulated
	// for that tenant.
	tenantRequestStats map[string]requestStats
)

func (h *Handler) handleRequest(ctx context.Context, rep uint64, tenantHTTP string, wreq *prompb.WriteRequest) (tenantRequestStats, error) {
tLogger := log.With(h.logger, "tenantHTTP", tenantHTTP)

// This replica value is used to detect cycles in cyclic topologies.
Expand All @@ -613,7 +624,7 @@ func (h *Handler) handleRequest(ctx context.Context, rep uint64, tenantHTTP stri
if rep > h.options.ReplicationFactor {
level.Error(tLogger).Log("err", errBadReplica, "msg", "write request rejected",
"request_replica", rep, "replication_factor", h.options.ReplicationFactor)
return errBadReplica
return tenantRequestStats{}, errBadReplica
}

r := replica{n: rep, replicated: rep != 0}
Expand All @@ -637,7 +648,7 @@ func (h *Handler) handleRequest(ctx context.Context, rep uint64, tenantHTTP stri
// unless the request needs to be replicated.
// The function only returns when all requests have finished
// or the context is canceled.
func (h *Handler) forward(ctx context.Context, tenantHTTP string, r replica, wreq *prompb.WriteRequest) error {
func (h *Handler) forward(ctx context.Context, tenantHTTP string, r replica, wreq *prompb.WriteRequest) (tenantRequestStats, error) {
span, ctx := tracing.StartSpan(ctx, "receive_fanout_forward")
defer span.Finish()

Expand Down Expand Up @@ -667,10 +678,43 @@ type remoteWriteParams struct {
alreadyReplicated bool
}

func (h *Handler) fanoutForward(ctx context.Context, params remoteWriteParams) error {
// gatherWriteStats tallies, per tenant, how many time series and samples are
// contained in the given write sets. Every argument maps an endpointReplica to
// the series tracked for each tenant; all arguments are folded into a single
// result keyed by tenant name.
//
// NOTE(review): a tenant's series are added once for every endpointReplica
// entry they appear under, so the totals grow with the number of entries —
// confirm this is the intended meaning for the metrics fed from the result.
func (h *Handler) gatherWriteStats(writes ...map[endpointReplica]map[string]trackedSeries) tenantRequestStats {
	totals := tenantRequestStats{}

	for _, batch := range writes {
		for _, perTenant := range batch {
			for tenant, tracked := range perTenant {
				sampleCount := 0
				for _, series := range tracked.timeSeries {
					sampleCount += len(series.Samples)
				}

				// Looking up an unseen tenant yields the zero requestStats,
				// so first and subsequent occurrences share the same path.
				entry := totals[tenant]
				entry.timeseries += len(tracked.timeSeries)
				entry.totalSamples += sampleCount
				totals[tenant] = entry
			}
		}
	}

	return totals
}

func (h *Handler) fanoutForward(ctx context.Context, params remoteWriteParams) (tenantRequestStats, error) {
ctx, cancel := context.WithTimeout(tracing.CopyTraceContext(context.Background(), ctx), h.options.ForwardTimeout)

var writeErrors writeErrors
var stats tenantRequestStats = make(tenantRequestStats)

defer func() {
if writeErrors.ErrOrNil() != nil {
// NOTICE: The cancel function is not used on all paths intentionally,
Expand All @@ -689,12 +733,18 @@ func (h *Handler) fanoutForward(ctx context.Context, params remoteWriteParams) e
localWrites, remoteWrites, err := h.distributeTimeseriesToReplicas(params.tenantHTTP, params.replicas, params.writeRequest.Timeseries)
if err != nil {
level.Error(requestLogger).Log("msg", "failed to distribute timeseries to replicas", "err", err)
return err
return stats, err
}

stats = h.gatherWriteStats(localWrites, remoteWrites)

// Prepare a buffered channel to receive the responses from the local and remote writes. Remote writes will all go
// asynchronously and with this capacity we will never block on writing to the channel.
maxBufferedResponses := len(localWrites) + len(remoteWrites)
maxBufferedResponses := len(localWrites)
for er := range remoteWrites {
maxBufferedResponses += len(remoteWrites[er])
}

responses := make(chan writeResponse, maxBufferedResponses)
wg := sync.WaitGroup{}

Expand Down Expand Up @@ -726,13 +776,13 @@ func (h *Handler) fanoutForward(ctx context.Context, params remoteWriteParams) e
for {
select {
case <-ctx.Done():
return ctx.Err()
return stats, ctx.Err()
case resp, hasMore := <-responses:
if !hasMore {
for _, seriesErr := range seriesErrs {
writeErrors.Add(seriesErr)
}
return writeErrors.ErrOrNil()
return stats, writeErrors.ErrOrNil()
}

if resp.err != nil {
Expand All @@ -748,7 +798,7 @@ func (h *Handler) fanoutForward(ctx context.Context, params remoteWriteParams) e
successes[seriesID]++
}
if quorumReached(successes, quorum) {
return nil
return stats, nil
}
}
}
Expand Down Expand Up @@ -937,7 +987,7 @@ func (h *Handler) RemoteWrite(ctx context.Context, r *storepb.WriteRequest) (*st
span, ctx := tracing.StartSpan(ctx, "receive_grpc")
defer span.Finish()

err := h.handleRequest(ctx, uint64(r.Replica), r.Tenant, &prompb.WriteRequest{Timeseries: r.Timeseries})
_, err := h.handleRequest(ctx, uint64(r.Replica), r.Tenant, &prompb.WriteRequest{Timeseries: r.Timeseries})
if err != nil {
level.Debug(h.logger).Log("msg", "failed to handle request", "err", err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/receive/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1745,7 +1745,7 @@ func TestHandlerFlippingHashrings(t *testing.T) {
return
}

err := h.handleRequest(ctx, 0, "test", &prompb.WriteRequest{
_, err := h.handleRequest(ctx, 0, "test", &prompb.WriteRequest{
Timeseries: []prompb.TimeSeries{
{
Labels: labelpb.ZLabelsFromPromLabels(labels.FromStrings("foo", "bar")),
Expand Down

0 comments on commit 0840b65

Please sign in to comment.