diff --git a/pkg/receive/handler.go b/pkg/receive/handler.go index 62308285fd..2a0f82da61 100644 --- a/pkg/receive/handler.go +++ b/pkg/receive/handler.go @@ -789,7 +789,13 @@ func (h *Handler) fanoutForward(ctx context.Context, params remoteWriteParams) ( } requestLogger := log.With(h.logger, logTags...) - localWrites, remoteWrites, err := h.distributeTimeseriesToReplicas(params.tenant, params.replicas, params.writeRequest.Timeseries) + var localWrites, remoteWrites map[endpointReplica]map[string]trackedSeries + var err error + if h.receiverMode == IngestorOnly { + localWrites, remoteWrites, err = h.distributeTimeseriesToReplicasIngestorOnly(params.tenant, params.replicas, params.writeRequest.Timeseries) + } else { + localWrites, remoteWrites, err = h.distributeTimeseriesToReplicas(params.tenant, params.replicas, params.writeRequest.Timeseries) + } if err != nil { level.Error(requestLogger).Log("msg", "failed to distribute timeseries to replicas", "err", err) return stats, err @@ -863,6 +869,33 @@ func (h *Handler) fanoutForward(ctx context.Context, params remoteWriteParams) ( } } +func printMap(data map[endpointReplica]map[string]trackedSeries) { + for key, innerMap := range data { + fmt.Printf("Endpoint: %s, CapNProtoAddress: %s, AZ: %s, Replica: %d\n", key.endpoint.Address, key.endpoint.CapNProtoAddress, key.endpoint.AZ, key.replica) + for strKey, series := range innerMap { + fmt.Printf(" Key: %s\n", strKey) + fmt.Printf(" SeriesIDs: %v\n", series.seriesIDs) + fmt.Printf(" TimeSeries length: %d\n", len(series.timeSeries)) + } + } +} + +func (h *Handler) distributeTimeseriesToReplicasIngestorOnly(tenantHTTP string, replicas []uint64, timeseries []prompb.TimeSeries) (map[endpointReplica]map[string]trackedSeries, map[endpointReplica]map[string]trackedSeries, error) { + remoteWrites := make(map[endpointReplica]map[string]trackedSeries) + localWrites := make(map[endpointReplica]map[string]trackedSeries, len(replicas)) + for _, rn := range replicas { + endpointReplica := endpointReplica{endpoint: Endpoint{Address: h.options.Endpoint, CapNProtoAddress: h.options.Endpoint}, replica: rn} + seriesids := make([]int, len(timeseries), len(timeseries)) + for i := range timeseries { + seriesids[i] = i + } + localWrites[endpointReplica] = map[string]trackedSeries{tenantHTTP: {seriesIDs: seriesids, timeSeries: timeseries}} + } + fmt.Println("localWrites (IngestorOnly):") + printMap(localWrites) + return localWrites, remoteWrites, nil +} + // distributeTimeseriesToReplicas distributes the given timeseries from the tenant to different endpoints in a manner // that achieves the replication factor indicated by replicas. // The first return value are the series that should be written to the local node. The second return value are the @@ -928,6 +961,10 @@ func (h *Handler) distributeTimeseriesToReplicas( if h.receiverMode == IngestorOnly && len(remoteWrites) > 0 { panic("ingestor only mode should not have any remote writes") } + fmt.Println("localWrites (both):") + printMap(localWrites) + fmt.Println("remoteWrites (both):") + printMap(remoteWrites) return localWrites, remoteWrites, nil }