From 7a562040c5a6bcc204bf685d6f2457d76b7a2210 Mon Sep 17 00:00:00 2001
From: Matvey Arye
Date: Mon, 17 Oct 2022 08:34:00 -0400
Subject: [PATCH 1/6] Add reservation system for metrics

This change adds a new reservation system for metric batching. The reservation
system forms a queue of metric batchers for copiers to pick up. The queue is
ordered by the time a request hit Promscale. That ordering allows copiers to
combine requests into batches more effectively.

This solves a few problems:

1. Previously, reservations were ordered by a channel, i.e. by when the batcher
   first made the reservation. While that may correlate with the time the
   request hit the connector, it is a less exact match for the ordering we
   actually want. This change orders explicitly on request start time.

2. It allows peeking at the info associated with the first reservation. That is
   not possible with a channel-based approach and is used here to ensure that
   we don't stop batching until a minimum duration has passed since the oldest
   request in the batch hit the system. That acts as an alternative
   backpressure mechanism to the regular mechanism of copiers becoming
   available. In most cases it shouldn't be necessary, because of the sharding
   adjustment Prometheus does, but it can be useful in some scenarios. Note
   that the minimum latency is measured from the time the request hits the
   connector and NOT the time spent in the batcher. This better controls the
   latency penalty of this alternative backpressure system.

Peeking can also be used for more advanced batch-sizing logic. For example, we
may want to batch based on the number of samples. Without peeking, you'd need
to pop the queue before learning the number of samples, at which point it is
too late.
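[Editor's note: the actual queue is implemented later in this series in
pkg/pgmodel/ingestor/reservation.go. As a rough illustration of the idea only,
the following minimal sketch shows a start-time-ordered min-heap that lets a
consumer peek at the oldest request before popping; the names demoReservation
and demoQueue are hypothetical and are not part of the patch.]

```go
package main

import (
	"container/heap"
	"fmt"
	"time"
)

// demoReservation is a simplified stand-in for a metric batcher's reservation:
// it records when the oldest request feeding that batcher hit the connector.
type demoReservation struct {
	metric    string
	startTime time.Time
}

// demoQueue is a min-heap ordered by request start time, so a copier always
// sees the batcher whose oldest request has been waiting the longest.
type demoQueue []*demoReservation

func (q demoQueue) Len() int            { return len(q) }
func (q demoQueue) Less(i, j int) bool  { return q[i].startTime.Before(q[j].startTime) }
func (q demoQueue) Swap(i, j int)       { q[i], q[j] = q[j], q[i] }
func (q *demoQueue) Push(x interface{}) { *q = append(*q, x.(*demoReservation)) }
func (q *demoQueue) Pop() interface{} {
	old := *q
	n := len(old)
	item := old[n-1]
	*q = old[:n-1]
	return item
}

func main() {
	q := &demoQueue{}
	heap.Init(q)
	now := time.Now()
	heap.Push(q, &demoReservation{metric: "b", startTime: now})
	heap.Push(q, &demoReservation{metric: "a", startTime: now.Add(-50 * time.Millisecond)})

	// Peek without popping: this is what lets a copier decide whether a minimum
	// batching duration has passed since the oldest request hit the system.
	oldest := (*q)[0]
	fmt.Println("oldest request is for metric", oldest.metric, "aged", time.Since(oldest.startTime))

	// Pop in request-start-time order once the copier is ready to build a batch.
	for q.Len() > 0 {
		r := heap.Pop(q).(*demoReservation)
		fmt.Println("popped", r.metric)
	}
}
```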
--- pkg/api/router.go | 3 + pkg/pgmodel/ingestor/copier.go | 44 +++--- pkg/pgmodel/ingestor/dispatcher.go | 48 +++--- pkg/pgmodel/ingestor/ingestor.go | 1 + pkg/pgmodel/ingestor/ingestor_sql_test.go | 16 +- pkg/pgmodel/ingestor/metric_batcher.go | 33 ++-- pkg/pgmodel/ingestor/metric_batcher_test.go | 18 ++- pkg/pgmodel/ingestor/reservation.go | 167 ++++++++++++++++++++ pkg/pgmodel/model/sql_test_utils.go | 3 + pkg/psctx/psctx.go | 27 ++++ 10 files changed, 298 insertions(+), 62 deletions(-) create mode 100644 pkg/pgmodel/ingestor/reservation.go create mode 100644 pkg/psctx/psctx.go diff --git a/pkg/api/router.go b/pkg/api/router.go index 42dba3f007..5725befa11 100644 --- a/pkg/api/router.go +++ b/pkg/api/router.go @@ -24,6 +24,7 @@ import ( "github.com/timescale/promscale/pkg/log" "github.com/timescale/promscale/pkg/pgclient" pgMetrics "github.com/timescale/promscale/pkg/pgmodel/metrics" + "github.com/timescale/promscale/pkg/psctx" "github.com/timescale/promscale/pkg/query" "github.com/timescale/promscale/pkg/telemetry" ) @@ -167,6 +168,8 @@ func withWarnLog(msg string, handler http.Handler) http.HandlerFunc { func timeHandler(histogramVec prometheus.ObserverVec, path string, handler http.Handler) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { start := time.Now() + ctx := psctx.WithStartTime(r.Context(), start) + r = r.WithContext(ctx) handler.ServeHTTP(w, r) elapsedMs := time.Since(start).Milliseconds() histogramVec.WithLabelValues(path).Observe(float64(elapsedMs)) diff --git a/pkg/pgmodel/ingestor/copier.go b/pkg/pgmodel/ingestor/copier.go index 7337a466d7..5c6fa130fc 100644 --- a/pkg/pgmodel/ingestor/copier.go +++ b/pkg/pgmodel/ingestor/copier.go @@ -72,9 +72,13 @@ func (reqs copyBatch) VisitExemplar(callBack func(info *pgmodel.MetricInfo, s *p return nil } +type readRequest struct { + copySender <-chan copyRequest +} + // Handles actual insertion into the DB. // We have one of these per connection reserved for insertion. -func runCopier(conn pgxconn.PgxConn, in chan readRequest, sw *seriesWriter, elf *ExemplarLabelFormatter) { +func runCopier(conn pgxconn.PgxConn, sw *seriesWriter, elf *ExemplarLabelFormatter, reservationQ *ReservationQueue) { requestBatch := make([]readRequest, 0, metrics.MaxInsertStmtPerTxn) insertBatch := make([]copyRequest, 0, cap(requestBatch)) for { @@ -90,7 +94,7 @@ func runCopier(conn pgxconn.PgxConn, in chan readRequest, sw *seriesWriter, elf // the fact that we fetch the entire batch before executing any of the // reads. This guarantees that we never need to batch the same metrics // together in the copier. - requestBatch, ok = copierGetBatch(ctx, requestBatch, in) + requestBatch, ok = copierGetBatch(ctx, requestBatch, reservationQ) if !ok { span.End() return @@ -157,7 +161,7 @@ func persistBatch(ctx context.Context, conn pgxconn.PgxConn, sw *seriesWriter, e return nil } -func copierGetBatch(ctx context.Context, batch []readRequest, in <-chan readRequest) ([]readRequest, bool) { +func copierGetBatch(ctx context.Context, batch []readRequest, reservationQ *ReservationQueue) ([]readRequest, bool) { _, span := tracer.Default().Start(ctx, "get-batch") defer span.End() //This mutex is not for safety, but rather for better batching. 
@@ -173,27 +177,27 @@ func copierGetBatch(ctx context.Context, batch []readRequest, in <-chan readRequ span.AddEvent("Unlocking") }(span) - req, ok := <-in + startTime, ok := reservationQ.Peek() if !ok { return batch, false } - span.AddEvent("Appending first batch") - batch = append(batch, req) - - //we use a small timeout to prevent low-pressure systems from using up too many - //txns and putting pressure on system - timeout := time.After(20 * time.Millisecond) -hot_gather: - for len(batch) < cap(batch) { - select { - case r2 := <-in: - span.AddEvent("Appending batch") - batch = append(batch, r2) - case <-timeout: - span.AddEvent("Timeout appending batches") - break hot_gather - } + since := time.Since(startTime) + //TODO: make configurable in CLI + minDuration := 0 * time.Millisecond + + // Having a minimum batching duration can be useful if the system is using up too many txns or mxids. + // The prometheus remote-write dynamic sharding strategy should auto-adjust things to slow down writes + // in low-pressure environments but having a CLI-settable backstop can also be usefull in certain scenarios. + // Values that have previously been tested with good results: 50ms-250ms. + if since < minDuration { + span.AddEvent("Sleep waiting to batch") + time.Sleep(minDuration - since) } + + span.AddEvent("After sleep") + + batch, _ = reservationQ.PopOntoBatch(batch) + if len(batch) == cap(batch) { span.AddEvent("Batch is full") } diff --git a/pkg/pgmodel/ingestor/dispatcher.go b/pkg/pgmodel/ingestor/dispatcher.go index f0541db6d7..95cc2689f2 100644 --- a/pkg/pgmodel/ingestor/dispatcher.go +++ b/pkg/pgmodel/ingestor/dispatcher.go @@ -41,13 +41,14 @@ type pgxDispatcher struct { invertedLabelsCache *cache.InvertedLabelsCache exemplarKeyPosCache cache.PositionCache batchers sync.Map + batchersWG sync.WaitGroup completeMetricCreation chan struct{} asyncAcks bool - copierReadRequestCh chan<- readRequest seriesEpochRefresh *time.Ticker doneChannel chan struct{} closed *uber_atomic.Bool doneWG sync.WaitGroup + reservationQ *ReservationQueue } var _ model.Dispatcher = &pgxDispatcher{} @@ -59,12 +60,9 @@ func newPgxDispatcher(conn pgxconn.PgxConn, mCache cache.MetricCache, scache cac numCopiers = 1 } - // the copier read request channel retains the queue order between metrics - maxMetrics := 10000 - copierReadRequestCh := make(chan readRequest, maxMetrics) - - metrics.IngestorChannelCap.With(prometheus.Labels{"type": "metric", "subsystem": "copier", "kind": "sample"}).Set(float64(cap(copierReadRequestCh))) - metrics.RegisterCopierChannelLenMetric(func() float64 { return float64(len(copierReadRequestCh)) }) + //TODO remove + //metrics.IngestorChannelCap.With(prometheus.Labels{"type": "metric", "subsystem": "copier", "kind": "sample"}).Set(float64(cap(copierReadRequestCh))) + //metrics.RegisterCopierChannelLenMetric(func() float64 { return float64(len(copierReadRequestCh)) }) if cfg.IgnoreCompressedChunks { // Handle decompression to not decompress anything. 
@@ -82,9 +80,9 @@ func newPgxDispatcher(conn pgxconn.PgxConn, mCache cache.MetricCache, scache cac } sw := NewSeriesWriter(conn, labelArrayOID, labelsCache) elf := NewExamplarLabelFormatter(conn, eCache) - + reservationQ := NewReservationQueue() for i := 0; i < numCopiers; i++ { - go runCopier(conn, copierReadRequestCh, sw, elf) + go runCopier(conn, sw, elf, reservationQ) } inserter := &pgxDispatcher{ @@ -95,11 +93,11 @@ func newPgxDispatcher(conn pgxconn.PgxConn, mCache cache.MetricCache, scache cac exemplarKeyPosCache: eCache, completeMetricCreation: make(chan struct{}, 1), asyncAcks: cfg.MetricsAsyncAcks, - copierReadRequestCh: copierReadRequestCh, // set to run at half our deletion interval seriesEpochRefresh: time.NewTicker(30 * time.Minute), doneChannel: make(chan struct{}), closed: uber_atomic.NewBool(false), + reservationQ: reservationQ, } inserter.closed.Store(false) runBatchWatcher(inserter.doneChannel) @@ -110,7 +108,13 @@ func newPgxDispatcher(conn pgxconn.PgxConn, mCache cache.MetricCache, scache cac return nil, err } - go inserter.runCompleteMetricCreationWorker() + if !cfg.DisableMetricCreation { + inserter.doneWG.Add(1) + go func() { + defer inserter.doneWG.Done() + inserter.runCompleteMetricCreationWorker() + }() + } if !cfg.DisableEpochSync { inserter.doneWG.Add(1) @@ -204,7 +208,8 @@ func (p *pgxDispatcher) Close() { return true }) - close(p.copierReadRequestCh) + p.batchersWG.Wait() + p.reservationQ.Close() close(p.doneChannel) p.doneWG.Wait() } @@ -240,7 +245,7 @@ func (p *pgxDispatcher) InsertTs(ctx context.Context, dataTS model.Data) (uint64 maxt = ts } } - p.getMetricBatcher(metricName) <- &insertDataRequest{spanCtx: span.SpanContext(), metric: metricName, data: data, finished: workFinished, errChan: errChan} + p.getMetricBatcher(metricName) <- &insertDataRequest{requestCtx: ctx, spanCtx: span.SpanContext(), metric: metricName, data: data, finished: workFinished, errChan: errChan} } span.SetAttributes(attribute.Int64("num_rows", int64(numRows))) span.SetAttributes(attribute.Int("num_metrics", len(rows))) @@ -321,7 +326,11 @@ func (p *pgxDispatcher) getMetricBatcher(metric string) chan<- *insertDataReques actual, old := p.batchers.LoadOrStore(metric, c) batcher = actual if !old { - go runMetricBatcher(p.conn, c, metric, p.completeMetricCreation, p.metricTableNames, p.copierReadRequestCh) + p.batchersWG.Add(1) + go func() { + defer p.batchersWG.Done() + runMetricBatcher(p.conn, c, metric, p.completeMetricCreation, p.metricTableNames, p.reservationQ) + }() } } ch := batcher.(chan *insertDataRequest) @@ -330,11 +339,12 @@ func (p *pgxDispatcher) getMetricBatcher(metric string) chan<- *insertDataReques } type insertDataRequest struct { - spanCtx trace.SpanContext - metric string - finished *sync.WaitGroup - data []model.Insertable - errChan chan error + requestCtx context.Context + spanCtx trace.SpanContext + metric string + finished *sync.WaitGroup + data []model.Insertable + errChan chan error } func (idr *insertDataRequest) reportResult(err error) { diff --git a/pkg/pgmodel/ingestor/ingestor.go b/pkg/pgmodel/ingestor/ingestor.go index 815eef2f2d..fa6bee14f9 100644 --- a/pkg/pgmodel/ingestor/ingestor.go +++ b/pkg/pgmodel/ingestor/ingestor.go @@ -28,6 +28,7 @@ type Cfg struct { TracesAsyncAcks bool NumCopiers int DisableEpochSync bool + DisableMetricCreation bool IgnoreCompressedChunks bool InvertedLabelsCacheSize uint64 TracesBatchTimeout time.Duration diff --git a/pkg/pgmodel/ingestor/ingestor_sql_test.go b/pkg/pgmodel/ingestor/ingestor_sql_test.go index 
cec19bff6f..42e48fb64f 100644 --- a/pkg/pgmodel/ingestor/ingestor_sql_test.go +++ b/pkg/pgmodel/ingestor/ingestor_sql_test.go @@ -536,10 +536,11 @@ func TestPGXInserterInsertData(t *testing.T) { } testCases := []struct { - name string - rows map[string][]model.Insertable - sqlQueries []model.SqlQuery - metricsGetErr error + name string + rows map[string][]model.Insertable + sqlQueries []model.SqlQuery + metricsGetErr error + disableMetricCreation bool }{ { name: "Zero data", @@ -810,7 +811,8 @@ func TestPGXInserterInsertData(t *testing.T) { model.NewPromSamples(makeLabel(), make([]prompb.Sample, 1)), }, }, - metricsGetErr: fmt.Errorf("some metrics error"), + metricsGetErr: fmt.Errorf("some metrics error"), + disableMetricCreation: true, sqlQueries: []model.SqlQuery{ {Sql: "SELECT 'prom_api.label_array'::regtype::oid", Results: model.RowResults{{uint32(434)}}}, {Sql: "SELECT 'prom_api.label_value_array'::regtype::oid", Results: model.RowResults{{uint32(435)}}}, @@ -821,7 +823,6 @@ func TestPGXInserterInsertData(t *testing.T) { Results: model.RowResults{{int64(1), "metric_0", true}}, Err: error(nil), }, - {Sql: "CALL _prom_catalog.finalize_metric_creation()"}, { Copy: &model.Copy{ Table: pgx.Identifier{"prom_data", "metric_0"}, @@ -916,7 +917,6 @@ func TestPGXInserterInsertData(t *testing.T) { t.Run(c.name, func(t *testing.T) { mock := model.NewSqlRecorder(c.sqlQueries, t) scache := cache.NewSeriesCache(cache.DefaultConfig, nil) - mockMetrics := &model.MockMetricCache{ MetricCache: make(map[string]model.MetricInfo), GetMetricErr: c.metricsGetErr, @@ -932,7 +932,7 @@ func TestPGXInserterInsertData(t *testing.T) { if err != nil { t.Fatalf("error setting up mock cache: %s", err.Error()) } - inserter, err := newPgxDispatcher(mock, mockMetrics, scache, nil, &Cfg{DisableEpochSync: true, InvertedLabelsCacheSize: 10, NumCopiers: 2}) + inserter, err := newPgxDispatcher(mock, mockMetrics, scache, nil, &Cfg{DisableEpochSync: true, InvertedLabelsCacheSize: 10, NumCopiers: 2, DisableMetricCreation: c.disableMetricCreation}) if err != nil { t.Fatal(err) } diff --git a/pkg/pgmodel/ingestor/metric_batcher.go b/pkg/pgmodel/ingestor/metric_batcher.go index f6153cef12..b528c10744 100644 --- a/pkg/pgmodel/ingestor/metric_batcher.go +++ b/pkg/pgmodel/ingestor/metric_batcher.go @@ -7,6 +7,7 @@ package ingestor import ( "context" "fmt" + "time" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" @@ -20,6 +21,7 @@ import ( "github.com/timescale/promscale/pkg/pgmodel/metrics" "github.com/timescale/promscale/pkg/pgmodel/model" "github.com/timescale/promscale/pkg/pgxconn" + "github.com/timescale/promscale/pkg/psctx" "github.com/timescale/promscale/pkg/tracer" ) @@ -35,10 +37,6 @@ func containsExemplars(data []model.Insertable) bool { return false } -type readRequest struct { - copySender <-chan copyRequest -} - func metricTableName(conn pgxconn.PgxConn, metric string) (info model.MetricInfo, possiblyNew bool, err error) { res, err := conn.Query( context.Background(), @@ -139,7 +137,7 @@ func runMetricBatcher(conn pgxconn.PgxConn, metricName string, completeMetricCreationSignal chan struct{}, metricTableNames cache.MetricCache, - copierReadRequestCh chan<- readRequest, + reservationQ *ReservationQueue, ) { var ( info model.MetricInfo @@ -164,7 +162,7 @@ func runMetricBatcher(conn pgxconn.PgxConn, if !firstReqSet { return } - sendBatches(firstReq, input, conn, &info, copierReadRequestCh) + sendBatches(firstReq, input, conn, &info, reservationQ) } //the basic structure of communication from the batcher 
to the copier is as follows: @@ -181,12 +179,13 @@ func runMetricBatcher(conn pgxconn.PgxConn, // of requests consecutively to minimize processing delays. That's what the mutex in the copier does. // 2. There is an auto-adjusting adaptation loop in step 3. The longer the copier takes to catch up to the readRequest in the queue, the more things will be batched // 3. The batcher has only a single read request out at a time. -func sendBatches(firstReq *insertDataRequest, input chan *insertDataRequest, conn pgxconn.PgxConn, info *model.MetricInfo, copierReadRequestCh chan<- readRequest) { +func sendBatches(firstReq *insertDataRequest, input chan *insertDataRequest, conn pgxconn.PgxConn, info *model.MetricInfo, reservationQ *ReservationQueue) { var ( exemplarsInitialized = false span trace.Span ) + var reservation Reservation addReq := func(req *insertDataRequest, buf *pendingBuffer) { if !exemplarsInitialized && containsExemplars(req.data) { if err := initializeExemplars(conn, info.TableName); err != nil { @@ -205,19 +204,33 @@ func sendBatches(firstReq *insertDataRequest, input chan *insertDataRequest, con trace.WithAttributes(attribute.Int("insertable_count", len(req.data))), ) buf.addReq(req) + t, err := psctx.StartTime(req.requestCtx) + if err != nil { + log.Error("msg", err) + t = time.Time{} + } + reservation.Update(reservationQ, t) addSpan.End() } //This channel in synchronous (no buffering). This provides backpressure //to the batcher to keep batching until the copier is ready to read. copySender := make(chan copyRequest) defer close(copySender) - readRequest := readRequest{copySender: copySender} + + startReservation := func(req *insertDataRequest) { + t, err := psctx.StartTime(req.requestCtx) + if err != nil { + log.Error("msg", err) + t = time.Time{} + } + reservation = reservationQ.Add(copySender, t) + } pending := NewPendingBuffer() pending.spanCtx, span = tracer.Default().Start(context.Background(), "send-batches") span.SetAttributes(attribute.String("metric", info.TableName)) + startReservation(firstReq) addReq(firstReq, pending) - copierReadRequestCh <- readRequest span.AddEvent("Sent a read request") for { @@ -227,8 +240,8 @@ func sendBatches(firstReq *insertDataRequest, input chan *insertDataRequest, con if !ok { return } + startReservation(req) addReq(req, pending) - copierReadRequestCh <- readRequest span.AddEvent("Sent a read request") } diff --git a/pkg/pgmodel/ingestor/metric_batcher_test.go b/pkg/pgmodel/ingestor/metric_batcher_test.go index fe0eb53bfc..99e25a0b4c 100644 --- a/pkg/pgmodel/ingestor/metric_batcher_test.go +++ b/pkg/pgmodel/ingestor/metric_batcher_test.go @@ -5,9 +5,11 @@ package ingestor import ( + "context" "fmt" "sync" "testing" + "time" "github.com/stretchr/testify/require" "github.com/timescale/promscale/pkg/pgmodel/cache" @@ -15,6 +17,7 @@ import ( "github.com/timescale/promscale/pkg/pgmodel/model" pgmodel "github.com/timescale/promscale/pkg/pgmodel/model" "github.com/timescale/promscale/pkg/prompb" + "github.com/timescale/promscale/pkg/psctx" ) func TestMetricTableName(t *testing.T) { @@ -146,11 +149,16 @@ func TestSendBatches(t *testing.T) { model.NewPromSamples(makeSeries(2), make([]prompb.Sample, 1)), model.NewPromSamples(makeSeries(3), make([]prompb.Sample, 1)), } - firstReq := &insertDataRequest{metric: "test", data: data, finished: &workFinished, errChan: errChan} - copierCh := make(chan readRequest) - go sendBatches(firstReq, nil, nil, &pgmodel.MetricInfo{MetricID: 1, TableName: "test"}, copierCh) - copierReq := <-copierCh - batch := 
<-copierReq.copySender + spanCtx := psctx.WithStartTime(context.Background(), time.Now().Add(-time.Hour)) + firstReq := &insertDataRequest{metric: "test", requestCtx: spanCtx, data: data, finished: &workFinished, errChan: errChan} + reservationQ := NewReservationQueue() + go sendBatches(firstReq, nil, nil, &pgmodel.MetricInfo{MetricID: 1, TableName: "test"}, reservationQ) + resos := make([]readRequest, 0, 1) + reservationQ.Peek() + resos, cnt := reservationQ.PopOntoBatch(resos) + require.Equal(t, 1, cnt) + require.Equal(t, 1, len(resos)) + batch := <-(resos[0].copySender) // we make sure that we receive batch data for i := 0; i < 3; i++ { diff --git a/pkg/pgmodel/ingestor/reservation.go b/pkg/pgmodel/ingestor/reservation.go new file mode 100644 index 0000000000..d56c3e5787 --- /dev/null +++ b/pkg/pgmodel/ingestor/reservation.go @@ -0,0 +1,167 @@ +package ingestor + +import ( + "container/heap" + "sync" + "time" +) + +type reservation struct { + copySender <-chan copyRequest + index int + + lock sync.Mutex + startTime time.Time +} + +func newReservation(cs <-chan copyRequest, startTime time.Time) *reservation { + return &reservation{cs, -1, sync.Mutex{}, startTime} +} + +func (res *reservation) Update(rq *ReservationQueue, t time.Time) { + rest := res.GetStartTime() + + if t.Before(rest) { + //this should happen rarely + res.SetStartTime(t) + rq.update(res) + } +} + +func (res *reservation) GetStartTime() time.Time { + res.lock.Lock() + defer res.lock.Unlock() + return res.startTime +} + +func (res *reservation) SetStartTime(t time.Time) { + res.lock.Lock() + defer res.lock.Unlock() + + //double check that it's before + if t.Before(res.startTime) { + res.startTime = t + } +} + +// reservationQueueInternal implements heap.Interface +type reservationQueueInternal []*reservation + +func newReservationQueueInternal() *reservationQueueInternal { + q := make(reservationQueueInternal, 0, 100) + return &q +} + +func (res reservationQueueInternal) Len() int { return len(res) } + +func (res reservationQueueInternal) Less(i, j int) bool { + return res[i].GetStartTime().Before(res[j].GetStartTime()) +} + +func (res reservationQueueInternal) Swap(i, j int) { + res[i], res[j] = res[j], res[i] + res[i].index = i + res[j].index = j +} + +func (res *reservationQueueInternal) Push(x interface{}) { + n := len(*res) + item := x.(*reservation) + item.index = n + *res = append(*res, item) +} + +func (res *reservationQueueInternal) Pop() interface{} { + old := *res + n := len(old) + item := old[n-1] + item.index = -1 //for safety + old[n-1] = nil // avoid memory leak + *res = old[0 : n-1] + return item +} + +type Reservation interface { + Update(*ReservationQueue, time.Time) +} + +type ReservationQueue struct { + lock sync.Mutex + cond sync.Cond + q *reservationQueueInternal + closed bool +} + +func NewReservationQueue() *ReservationQueue { + res := &ReservationQueue{lock: sync.Mutex{}, q: newReservationQueueInternal(), closed: false} + res.cond = *sync.NewCond(&res.lock) + return res +} + +func (rq *ReservationQueue) Add(cs <-chan copyRequest, startTime time.Time) Reservation { + si := newReservation(cs, startTime) + + rq.lock.Lock() + defer rq.lock.Unlock() + + if rq.closed { + panic("Should never add to a closed queue") + } + + if rq.q.Len() == 0 { + rq.cond.Broadcast() + } + + heap.Push(rq.q, si) + return si +} + +func (rq *ReservationQueue) Close() { + rq.lock.Lock() + defer rq.lock.Unlock() + + rq.closed = true + rq.cond.Broadcast() +} + +// Peek gives the first startTime as well as if the queue is not 
closed. +// It blocks until there is an element in the queue or it has been closed. +func (rq *ReservationQueue) Peek() (time.Time, bool) { + rq.lock.Lock() + defer rq.lock.Unlock() + for !rq.closed && rq.q.Len() == 0 { + rq.cond.Wait() + } + + if rq.q.Len() > 0 { + first := (*rq.q)[0] + return first.GetStartTime(), true + } + + //must be closed + return time.Time{}, false +} + +// PopBatch pops from the queue to populate the batch until either batch is full or the queue is empty. +// never blocks. Returns number of requests pop'ed. +func (rq *ReservationQueue) PopOntoBatch(batch []readRequest) ([]readRequest, int) { + rq.lock.Lock() + defer rq.lock.Unlock() + + count := 0 + for len(batch) < cap(batch) && rq.q.Len() > 0 { + res := heap.Pop(rq.q).(*reservation) + batch = append(batch, readRequest{res.copySender}) + count++ + } + return batch, count +} + +func (rq *ReservationQueue) update(res *reservation) { + rq.lock.Lock() + defer rq.lock.Unlock() + if res.index < 0 { //has already been poped + return + } + heap.Fix(rq.q, res.index) +} diff --git a/pkg/pgmodel/model/sql_test_utils.go b/pkg/pgmodel/model/sql_test_utils.go index 124965b1ee..860aea4529 100644 --- a/pkg/pgmodel/model/sql_test_utils.go +++ b/pkg/pgmodel/model/sql_test_utils.go @@ -153,6 +153,9 @@ func (r *SqlRecorder) checkQuery(sql string, args ...interface{}) (RowResults, e } assert.Equal(r.t, len(row.Args), len(args), "Args of different lengths @ %d %s", idx, sql) + if len(row.Args) != len(args) { + return nil, row.Err + } for i := range row.Args { switch row.Args[i].(type) { case pgtype.TextEncoder: diff --git a/pkg/psctx/psctx.go b/pkg/psctx/psctx.go new file mode 100644 index 0000000000..da59bef331 --- /dev/null +++ b/pkg/psctx/psctx.go @@ -0,0 +1,27 @@ +package psctx + +import ( + "context" + "fmt" + "time" +) + +type StartKey struct{} + +var ErrStartTimeNotSet = fmt.Errorf("start time not set") + +func WithStartTime(ctx context.Context, start time.Time) context.Context { + return context.WithValue(ctx, StartKey{}, start) +} + +func StartTime(ctx context.Context) (time.Time, error) { + val := ctx.Value(StartKey{}) + if val == nil { + return time.Time{}, ErrStartTimeNotSet + } + t, ok := val.(time.Time) + if !ok { + return t, fmt.Errorf("start time not time.Time, is: %T", val) + } + return t, nil +} From 0d2461ea39a8cb44f697a6ad09f534544c332b80 Mon Sep 17 00:00:00 2001 From: Matvey Arye Date: Thu, 20 Oct 2022 16:40:33 -0400 Subject: [PATCH 2/6] Add better metrics for the batching system Should give us better observability. 
--- pkg/pgmodel/ingestor/buffer.go | 6 +- pkg/pgmodel/ingestor/copier.go | 24 ++++- pkg/pgmodel/ingestor/dispatcher.go | 4 - pkg/pgmodel/ingestor/metric_batcher.go | 11 ++ pkg/pgmodel/ingestor/reservation.go | 7 ++ pkg/pgmodel/ingestor/trace/trace_batcher.go | 4 +- pkg/pgmodel/metrics/ingest.go | 110 ++++++++++++++------ 7 files changed, 128 insertions(+), 38 deletions(-) diff --git a/pkg/pgmodel/ingestor/buffer.go b/pkg/pgmodel/ingestor/buffer.go index 2a4e6659e4..9f0ae3ba30 100644 --- a/pkg/pgmodel/ingestor/buffer.go +++ b/pkg/pgmodel/ingestor/buffer.go @@ -7,6 +7,7 @@ package ingestor import ( "context" "sync" + "time" "github.com/timescale/promscale/pkg/pgmodel/metrics" "github.com/timescale/promscale/pkg/pgmodel/model" @@ -34,6 +35,7 @@ type pendingBuffer struct { spanCtx context.Context needsResponse []insertDataTask batch model.Batch + Start time.Time } var pendingBuffers = sync.Pool{ @@ -46,7 +48,9 @@ var pendingBuffers = sync.Pool{ } func NewPendingBuffer() *pendingBuffer { - return pendingBuffers.Get().(*pendingBuffer) + pb := pendingBuffers.Get().(*pendingBuffer) + pb.Start = time.Now() + return pb } func (p *pendingBuffer) IsFull() bool { diff --git a/pkg/pgmodel/ingestor/copier.go b/pkg/pgmodel/ingestor/copier.go index 5c6fa130fc..6327117125 100644 --- a/pkg/pgmodel/ingestor/copier.go +++ b/pkg/pgmodel/ingestor/copier.go @@ -40,6 +40,7 @@ type copyRequest struct { var ( getBatchMutex = &sync.Mutex{} + lastGetBatch = time.Time{} handleDecompression = retryAfterDecompression ) @@ -177,6 +178,15 @@ func copierGetBatch(ctx context.Context, batch []readRequest, reservationQ *Rese span.AddEvent("Unlocking") }(span) + //note this metric logic does depend on the lock + now := time.Now() + if !lastGetBatch.IsZero() { + timeBetweenGetBatch := now.Sub(lastGetBatch) + metrics.IngestorWaitForCopierSeconds.With(labelsCopier).Observe(timeBetweenGetBatch.Seconds()) + + } + lastGetBatch = now + startTime, ok := reservationQ.Peek() if !ok { return batch, false @@ -191,16 +201,24 @@ func copierGetBatch(ctx context.Context, batch []readRequest, reservationQ *Rese // Values that have previously been tested with good results: 50ms-250ms. 
if since < minDuration { span.AddEvent("Sleep waiting to batch") - time.Sleep(minDuration - since) + sleepDuration := minDuration - since + metrics.IngestorWaitForBatchSleepSeconds.With(labelsCopier).Add(sleepDuration.Seconds()) + metrics.IngestorWaitForBatchSleepTotal.With(labelsCopier).Inc() + time.Sleep(sleepDuration) } + metrics.IngestorPipelineTime.With(labelsCopier).Observe(time.Since(startTime).Seconds()) span.AddEvent("After sleep") batch, _ = reservationQ.PopOntoBatch(batch) if len(batch) == cap(batch) { span.AddEvent("Batch is full") + metrics.IngestorBatchFlushTotal.With(prometheus.Labels{"type": "metric", "subsystem": "copier", "reason": "size"}).Inc() + } else { + metrics.IngestorBatchFlushTotal.With(prometheus.Labels{"type": "metric", "subsystem": "copier", "reason": "timeout"}).Inc() } + metrics.IngestorBatchRemainingAfterFlushTotal.With(labelsCopier).Observe(float64(reservationQ.Len())) span.SetAttributes(attribute.Int("num_batches", len(batch))) return batch, true } @@ -505,7 +523,9 @@ func insertSeries(ctx context.Context, conn pgxconn.PgxConn, onConflict bool, re metrics.IngestorItems.With(prometheus.Labels{"type": "metric", "subsystem": "copier", "kind": "exemplar"}).Add(float64(totalExemplars)) tput.ReportDuplicateMetrics(duplicateSamples, duplicateMetrics) - metrics.IngestorInsertDuration.With(prometheus.Labels{"type": "metric", "subsystem": "copier", "kind": "sample"}).Observe(time.Since(insertStart).Seconds()) + duration := time.Since(insertStart).Seconds() + metrics.IngestorInsertDuration.With(prometheus.Labels{"type": "metric", "subsystem": "copier", "kind": "sample"}).Observe(duration) + metrics.IngestorInsertDurationPerRow.With(prometheus.Labels{"type": "metric", "subsystem": "copier", "kind": "sample"}).Observe(duration / (float64(totalExemplars + totalSamples))) return nil, lowestMinTime } diff --git a/pkg/pgmodel/ingestor/dispatcher.go b/pkg/pgmodel/ingestor/dispatcher.go index 95cc2689f2..e80211f81b 100644 --- a/pkg/pgmodel/ingestor/dispatcher.go +++ b/pkg/pgmodel/ingestor/dispatcher.go @@ -60,10 +60,6 @@ func newPgxDispatcher(conn pgxconn.PgxConn, mCache cache.MetricCache, scache cac numCopiers = 1 } - //TODO remove - //metrics.IngestorChannelCap.With(prometheus.Labels{"type": "metric", "subsystem": "copier", "kind": "sample"}).Set(float64(cap(copierReadRequestCh))) - //metrics.RegisterCopierChannelLenMetric(func() float64 { return float64(len(copierReadRequestCh)) }) - if cfg.IgnoreCompressedChunks { // Handle decompression to not decompress anything. 
handleDecompression = skipDecompression diff --git a/pkg/pgmodel/ingestor/metric_batcher.go b/pkg/pgmodel/ingestor/metric_batcher.go index b528c10744..171cd52828 100644 --- a/pkg/pgmodel/ingestor/metric_batcher.go +++ b/pkg/pgmodel/ingestor/metric_batcher.go @@ -209,6 +209,7 @@ func sendBatches(firstReq *insertDataRequest, input chan *insertDataRequest, con log.Error("msg", err) t = time.Time{} } + metrics.IngestorPipelineTime.With(prometheus.Labels{"type": "metric", "subsystem": "metric_batcher"}).Observe(time.Since(t).Seconds()) reservation.Update(reservationQ, t) addSpan.End() } @@ -223,6 +224,7 @@ func sendBatches(firstReq *insertDataRequest, input chan *insertDataRequest, con log.Error("msg", err) t = time.Time{} } + metrics.IngestorPipelineTime.With(prometheus.Labels{"type": "metric", "subsystem": "metric_batcher"}).Observe(time.Since(t).Seconds()) reservation = reservationQ.Add(copySender, t) } @@ -257,6 +259,14 @@ func sendBatches(firstReq *insertDataRequest, input chan *insertDataRequest, con //try to send first, if not then keep batching case copySender <- copyRequest{pending, info}: metrics.IngestorFlushSeries.With(prometheus.Labels{"type": "metric", "subsystem": "metric_batcher"}).Observe(float64(numSeries)) + metrics.IngestorBatchDuration.With(prometheus.Labels{"type": "metric", "subsystem": "metric_batcher"}).Observe(time.Since(pending.Start).Seconds()) + if pending.IsFull() { + metrics.IngestorBatchFlushTotal.With(prometheus.Labels{"type": "metric", "subsystem": "metric_batcher", "reason": "size"}).Inc() + } else { + metrics.IngestorBatchFlushTotal.With(prometheus.Labels{"type": "metric", "subsystem": "metric_batcher", "reason": "requested"}).Inc() + } + //note that this is the number of waiting in the queue, not samples or series. + metrics.IngestorBatchRemainingAfterFlushTotal.With(prometheus.Labels{"type": "metric", "subsystem": "metric_batcher"}).Observe(float64(len(recvCh))) span.SetAttributes(attribute.Int("num_series", numSeries)) span.End() pending = NewPendingBuffer() @@ -268,6 +278,7 @@ func sendBatches(firstReq *insertDataRequest, input chan *insertDataRequest, con span.AddEvent("Sending last non-empty batch") copySender <- copyRequest{pending, info} metrics.IngestorFlushSeries.With(prometheus.Labels{"type": "metric", "subsystem": "metric_batcher"}).Observe(float64(numSeries)) + metrics.IngestorBatchDuration.With(prometheus.Labels{"type": "metric", "subsystem": "metric_batcher"}).Observe(time.Since(pending.Start).Seconds()) } span.AddEvent("Exiting metric batcher batch loop") span.SetAttributes(attribute.Int("num_series", numSeries)) diff --git a/pkg/pgmodel/ingestor/reservation.go b/pkg/pgmodel/ingestor/reservation.go index d56c3e5787..880ce1e972 100644 --- a/pkg/pgmodel/ingestor/reservation.go +++ b/pkg/pgmodel/ingestor/reservation.go @@ -116,6 +116,13 @@ func (rq *ReservationQueue) Add(cs <-chan copyRequest, startTime time.Time) Rese return si } +func (rq *ReservationQueue) Len() int { + rq.lock.Lock() + defer rq.lock.Unlock() + + return rq.q.Len() +} + func (rq *ReservationQueue) Close() { rq.lock.Lock() defer rq.lock.Unlock() diff --git a/pkg/pgmodel/ingestor/trace/trace_batcher.go b/pkg/pgmodel/ingestor/trace/trace_batcher.go index 65e5401d7a..d77b69685a 100644 --- a/pkg/pgmodel/ingestor/trace/trace_batcher.go +++ b/pkg/pgmodel/ingestor/trace/trace_batcher.go @@ -159,7 +159,7 @@ func (b *Batcher) batch(batchIdx int) { ticker.Reset(b.config.BatchTimeout) } if batch.isFull() { - metrics.IngestorBatchFlushTotal.With(prometheus.Labels{"type": "trace", "reason": 
"size"}).Inc() + metrics.IngestorBatchFlushTotal.With(prometheus.Labels{"type": "trace", "subsystem": "batcher", "reason": "size"}).Inc() batch = flushBatch(batch) } } @@ -169,7 +169,7 @@ func (b *Batcher) batch(batchIdx int) { processReq(item) case <-ticker.C: batcherSpan.AddEvent("Batch timeout reached") - metrics.IngestorBatchFlushTotal.With(prometheus.Labels{"type": "trace", "reason": "timeout"}).Inc() + metrics.IngestorBatchFlushTotal.With(prometheus.Labels{"type": "trace", "subsystem": "batcher", "reason": "timeout"}).Inc() if !batch.isEmpty() { batch = flushBatch(batch) } diff --git a/pkg/pgmodel/metrics/ingest.go b/pkg/pgmodel/metrics/ingest.go index fb792f0524..cecfa5a38c 100644 --- a/pkg/pgmodel/metrics/ingest.go +++ b/pkg/pgmodel/metrics/ingest.go @@ -5,8 +5,6 @@ package metrics import ( - "os" - "github.com/prometheus/client_golang/prometheus" "github.com/timescale/promscale/pkg/util" ) @@ -75,11 +73,38 @@ var ( prometheus.HistogramOpts{ Namespace: util.PromNamespace, Subsystem: "ingest", - Name: "flush_series", + Name: "metric_batch_flush_series", Help: "Number of series batched by the ingestor.", Buckets: util.HistogramBucketsSaturating(1, 2, FlushSize), }, []string{"type", "subsystem"}, ) + IngestorFlushInsertables = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: util.PromNamespace, + Subsystem: "ingest", + Name: "metric_batch_flush_insertables_total", + Help: "Number of insertables batched by the ingestor.", + Buckets: append(util.HistogramBucketsSaturating(1, 2, FlushSize), 1.2*FlushSize, 2*FlushSize), + }, []string{"type", "subsystem"}, + ) + IngestorBatchDuration = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: util.PromNamespace, + Subsystem: "ingest", + Name: "metric_batch_duration_seconds", + Help: "Number of seconds that metrics were batched together", + Buckets: prometheus.DefBuckets, + }, []string{"type", "subsystem"}, + ) + IngestorPipelineTime = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: util.PromNamespace, + Subsystem: "ingest", + Name: "pipeline_time_seconds", + Help: "Time that it took to reach the subsystem, from beginning of request", + Buckets: prometheus.DefBuckets, + }, []string{"type", "subsystem"}, + ) IngestorInsertsPerBatch = prometheus.NewHistogramVec( prometheus.HistogramOpts{ Namespace: util.PromNamespace, @@ -95,7 +120,7 @@ var ( Subsystem: "ingest", Name: "rows_per_batch", Help: "Number of rows inserted in a single transaction.", - Buckets: prometheus.ExponentialBuckets(1, 2, 15), + Buckets: prometheus.ExponentialBuckets(1, 2, 17), }, []string{"type", "subsystem"}, ) IngestorRowsPerInsert = prometheus.NewHistogramVec( @@ -116,6 +141,15 @@ var ( Buckets: append(prometheus.DefBuckets, []float64{60, 120, 300}...), }, []string{"type", "subsystem", "kind"}, ) + IngestorInsertDurationPerRow = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: util.PromNamespace, + Subsystem: "ingest", + Name: "insert_duration_seconds_per_row", + Help: "Duration of sample/exemplar batch insert call per row to the database.", + Buckets: append([]float64{0.000001, 0.000005, 0.00001, 0.00005, 0.0001, 0.0005, 0.001}, prometheus.DefBuckets...), + }, []string{"type", "subsystem", "kind"}, + ) IngestorActiveWriteRequests = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Namespace: util.PromNamespace, @@ -180,8 +214,17 @@ var ( Namespace: util.PromNamespace, Subsystem: "ingest", Name: "batch_flush_total", - Help: "Number of batch flushes by reason (size or timeout).", - }, []string{"type", 
"reason"}, + Help: "Number of batch flushes by reason (size, timeout, requested).", + }, []string{"type", "reason", "subsystem"}, + ) + IngestorBatchRemainingAfterFlushTotal = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: util.PromNamespace, + Subsystem: "ingest", + Name: "batch_remaining_after_flush_total", + Help: "Number of items remaining after a flush. Mostly only applies if batch flush reason was size", + Buckets: []float64{0, 10, 50, 100, 200, 500, 1000, 2000, 4000, 6000, 8000, 10000, 20000}, + }, []string{"type", "subsystem"}, ) IngestorPendingBatches = prometheus.NewGaugeVec( prometheus.GaugeOpts{ @@ -199,6 +242,31 @@ var ( Help: "Number of active user requests in queue.", }, []string{"type", "queue_idx"}, ) + IngestorWaitForBatchSleepSeconds = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: util.PromNamespace, + Subsystem: "ingest", + Name: "wait_for_batch_sleep_seconds", + Help: "Number of seconds sleeping while waiting for batch", + }, []string{"type", "subsystem"}, + ) + IngestorWaitForBatchSleepTotal = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: util.PromNamespace, + Subsystem: "ingest", + Name: "wait_for_batch_sleep_total", + Help: "Number of times sleeping while waiting for batch", + }, []string{"type", "subsystem"}, + ) + IngestorWaitForCopierSeconds = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: util.PromNamespace, + Subsystem: "ingest", + Name: "wait_for_copier_seconds", + Help: "Number of seconds waiting for copier get batch", + Buckets: prometheus.DefBuckets, + }, []string{"type", "subsystem"}, + ) ) func init() { @@ -215,6 +283,7 @@ func init() { IngestorRowsPerBatch, IngestorRowsPerInsert, IngestorInsertDuration, + IngestorInsertDurationPerRow, IngestorActiveWriteRequests, IngestorDuration, IngestorItems, @@ -225,28 +294,11 @@ func init() { IngestorBatchFlushTotal, IngestorPendingBatches, IngestorRequestsQueued, + IngestorWaitForBatchSleepSeconds, + IngestorWaitForBatchSleepTotal, + IngestorBatchDuration, + IngestorPipelineTime, + IngestorBatchRemainingAfterFlushTotal, + IngestorWaitForCopierSeconds, ) } - -// RegisterCopierChannelLenMetric creates and registers the copier channel len metric with a callback -// that should return the length of the channel. -// -// Note: ingestorChannelLenCopier metric depends on prometheus call to /metrics hence we need to update with -// a callback. This is an odd one out from the other metrics in the ingestor as other metrics -// are async to prometheus calls. -func RegisterCopierChannelLenMetric(updater func() float64) { - r := prometheus.DefaultRegisterer - if val := os.Getenv("IS_TEST"); val == "true" { - r = prometheus.NewRegistry() - } - ingestorChannelLenCopier := prometheus.NewGaugeFunc( - prometheus.GaugeOpts{ - Namespace: util.PromNamespace, - Subsystem: "ingest", - Name: "channel_len", - Help: "Length of the ingestor channel.", - ConstLabels: map[string]string{"type": "metric", "subsystem": "copier", "kind": "sample"}, - }, updater, - ) - r.MustRegister(ingestorChannelLenCopier) -} From b93ea2b94bf344f929f1dd2db7997f7cef7bafa2 Mon Sep 17 00:00:00 2001 From: Matvey Arye Date: Tue, 25 Oct 2022 14:39:35 -0400 Subject: [PATCH 3/6] Improve batching - Significantly increase number of samples batched per metric. Heavy metrics are very likely to come from many requests, so flushing them sooner is better. Having to wait multiple times to flush is no bueno. 
- To prevent copy batches from getting to large in the face of potentially more samples/metric change that batch criteria to batch not just with a limit to # metrics but also a limit of # samples. - change preference from try to flush before batching to try to batch before flushing. This should increase batch sizes - For two metric with the same initial request time, flush the bigger one first, hoping the smaller one can become bigger before flush. --- pkg/pgmodel/ingestor/copier.go | 12 +++---- pkg/pgmodel/ingestor/metric_batcher.go | 35 +++++++++--------- pkg/pgmodel/ingestor/metric_batcher_test.go | 2 +- pkg/pgmodel/ingestor/reservation.go | 40 +++++++++++++++++---- pkg/pgmodel/metrics/ingest.go | 3 +- 5 files changed, 61 insertions(+), 31 deletions(-) diff --git a/pkg/pgmodel/ingestor/copier.go b/pkg/pgmodel/ingestor/copier.go index 6327117125..5911ad9545 100644 --- a/pkg/pgmodel/ingestor/copier.go +++ b/pkg/pgmodel/ingestor/copier.go @@ -210,14 +210,12 @@ func copierGetBatch(ctx context.Context, batch []readRequest, reservationQ *Rese metrics.IngestorPipelineTime.With(labelsCopier).Observe(time.Since(startTime).Seconds()) span.AddEvent("After sleep") - batch, _ = reservationQ.PopOntoBatch(batch) + var reason string + batch, _, reason = reservationQ.PopOntoBatch(batch) + metrics.IngestorBatchFlushTotal.With(prometheus.Labels{"type": "metric", "subsystem": "copier", "reason": reason}).Inc() + + span.AddEvent("Flushed due to" + reason) - if len(batch) == cap(batch) { - span.AddEvent("Batch is full") - metrics.IngestorBatchFlushTotal.With(prometheus.Labels{"type": "metric", "subsystem": "copier", "reason": "size"}).Inc() - } else { - metrics.IngestorBatchFlushTotal.With(prometheus.Labels{"type": "metric", "subsystem": "copier", "reason": "timeout"}).Inc() - } metrics.IngestorBatchRemainingAfterFlushTotal.With(labelsCopier).Observe(float64(reservationQ.Len())) span.SetAttributes(attribute.Int("num_batches", len(batch))) return batch, true diff --git a/pkg/pgmodel/ingestor/metric_batcher.go b/pkg/pgmodel/ingestor/metric_batcher.go index 171cd52828..2f16fa11f9 100644 --- a/pkg/pgmodel/ingestor/metric_batcher.go +++ b/pkg/pgmodel/ingestor/metric_batcher.go @@ -210,7 +210,7 @@ func sendBatches(firstReq *insertDataRequest, input chan *insertDataRequest, con t = time.Time{} } metrics.IngestorPipelineTime.With(prometheus.Labels{"type": "metric", "subsystem": "metric_batcher"}).Observe(time.Since(t).Seconds()) - reservation.Update(reservationQ, t) + reservation.Update(reservationQ, t, len(req.data)) addSpan.End() } //This channel in synchronous (no buffering). 
This provides backpressure @@ -254,11 +254,27 @@ func sendBatches(firstReq *insertDataRequest, input chan *insertDataRequest, con } numSeries := pending.batch.CountSeries() + numSamples, numExemplars := pending.batch.Count() select { - //try to send first, if not then keep batching + //try to batch as much as possible before sending + case req, ok := <-recvCh: + if !ok { + if !pending.IsEmpty() { + span.AddEvent("Sending last non-empty batch") + copySender <- copyRequest{pending, info} + metrics.IngestorFlushSeries.With(prometheus.Labels{"type": "metric", "subsystem": "metric_batcher"}).Observe(float64(numSeries)) + metrics.IngestorBatchDuration.With(prometheus.Labels{"type": "metric", "subsystem": "metric_batcher"}).Observe(time.Since(pending.Start).Seconds()) + } + span.AddEvent("Exiting metric batcher batch loop") + span.SetAttributes(attribute.Int("num_series", numSeries)) + span.End() + return + } + addReq(req, pending) case copySender <- copyRequest{pending, info}: metrics.IngestorFlushSeries.With(prometheus.Labels{"type": "metric", "subsystem": "metric_batcher"}).Observe(float64(numSeries)) + metrics.IngestorFlushInsertables.With(prometheus.Labels{"type": "metric", "subsystem": "metric_batcher"}).Observe(float64(numSamples + numExemplars)) metrics.IngestorBatchDuration.With(prometheus.Labels{"type": "metric", "subsystem": "metric_batcher"}).Observe(time.Since(pending.Start).Seconds()) if pending.IsFull() { metrics.IngestorBatchFlushTotal.With(prometheus.Labels{"type": "metric", "subsystem": "metric_batcher", "reason": "size"}).Inc() @@ -272,20 +288,7 @@ func sendBatches(firstReq *insertDataRequest, input chan *insertDataRequest, con pending = NewPendingBuffer() pending.spanCtx, span = tracer.Default().Start(context.Background(), "send-batches") span.SetAttributes(attribute.String("metric", info.TableName)) - case req, ok := <-recvCh: - if !ok { - if !pending.IsEmpty() { - span.AddEvent("Sending last non-empty batch") - copySender <- copyRequest{pending, info} - metrics.IngestorFlushSeries.With(prometheus.Labels{"type": "metric", "subsystem": "metric_batcher"}).Observe(float64(numSeries)) - metrics.IngestorBatchDuration.With(prometheus.Labels{"type": "metric", "subsystem": "metric_batcher"}).Observe(time.Since(pending.Start).Seconds()) - } - span.AddEvent("Exiting metric batcher batch loop") - span.SetAttributes(attribute.Int("num_series", numSeries)) - span.End() - return - } - addReq(req, pending) + } } } diff --git a/pkg/pgmodel/ingestor/metric_batcher_test.go b/pkg/pgmodel/ingestor/metric_batcher_test.go index 99e25a0b4c..45fd772ebc 100644 --- a/pkg/pgmodel/ingestor/metric_batcher_test.go +++ b/pkg/pgmodel/ingestor/metric_batcher_test.go @@ -155,7 +155,7 @@ func TestSendBatches(t *testing.T) { go sendBatches(firstReq, nil, nil, &pgmodel.MetricInfo{MetricID: 1, TableName: "test"}, reservationQ) resos := make([]readRequest, 0, 1) reservationQ.Peek() - resos, cnt := reservationQ.PopOntoBatch(resos) + resos, cnt, _ := reservationQ.PopOntoBatch(resos) require.Equal(t, 1, cnt) require.Equal(t, 1, len(resos)) batch := <-(resos[0].copySender) diff --git a/pkg/pgmodel/ingestor/reservation.go b/pkg/pgmodel/ingestor/reservation.go index 880ce1e972..ffee442c65 100644 --- a/pkg/pgmodel/ingestor/reservation.go +++ b/pkg/pgmodel/ingestor/reservation.go @@ -3,6 +3,7 @@ package ingestor import ( "container/heap" "sync" + "sync/atomic" "time" ) @@ -12,14 +13,17 @@ type reservation struct { lock sync.Mutex startTime time.Time + + items int64 } func newReservation(cs <-chan copyRequest, startTime 
time.Time) *reservation { - return &reservation{cs, -1, sync.Mutex{}, startTime} + return &reservation{cs, -1, sync.Mutex{}, startTime, 1} } -func (res *reservation) Update(rq *ReservationQueue, t time.Time) { +func (res *reservation) Update(rq *ReservationQueue, t time.Time, num_insertables int) { rest := res.GetStartTime() + atomic.AddInt64(&res.items, int64(num_insertables)) if t.Before(rest) { //this should happen rarely @@ -55,6 +59,14 @@ func newReservationQueueInternal() *reservationQueueInternal { func (res reservationQueueInternal) Len() int { return len(res) } func (res reservationQueueInternal) Less(i, j int) bool { + startTimeI := res[i].GetStartTime() + startTimeJ := res[j].GetStartTime() + if startTimeI.Equal(startTimeJ) { + itemsI := atomic.LoadInt64(&res[i].items) + itemsJ := atomic.LoadInt64(&res[j].items) + //prerer metrics with more items because they probably hold up more stuff + return itemsI > itemsJ + } return res[i].GetStartTime().Before(res[j].GetStartTime()) } @@ -82,7 +94,7 @@ func (res *reservationQueueInternal) Pop() interface{} { } type Reservation interface { - Update(*ReservationQueue, time.Time) + Update(*ReservationQueue, time.Time, int) } type ReservationQueue struct { @@ -151,17 +163,33 @@ func (rq *ReservationQueue) Peek() (time.Time, bool) { // PopBatch pops from the queue to populate the batch until either batch is full or the queue is empty. // never blocks. Returns number of requests pop'ed. -func (rq *ReservationQueue) PopOntoBatch(batch []readRequest) ([]readRequest, int) { +func (rq *ReservationQueue) PopOntoBatch(batch []readRequest) ([]readRequest, int, string) { rq.lock.Lock() defer rq.lock.Unlock() count := 0 - for len(batch) < cap(batch) && rq.q.Len() > 0 { + items := int64(0) + if rq.q.Len() > 0 { + items = atomic.LoadInt64(&(*rq.q)[0].items) + } + total_items := int64(0) + for len(batch) < cap(batch) && rq.q.Len() > 0 && (len(batch) == 0 || items+total_items < 20000) { res := heap.Pop(rq.q).(*reservation) batch = append(batch, readRequest{res.copySender}) count++ + total_items += items + items = 0 + if rq.q.Len() > 0 { + items = atomic.LoadInt64(&(*rq.q)[0].items) + } + } + reason := "timeout" + if !(len(batch) < cap(batch)) { + reason = "size_metrics" + } else if !(len(batch) == 0 || items+total_items < 20000) { + reason = "size_samples" } - return batch, count + return batch, count, reason } func (rq *ReservationQueue) update(res *reservation) { diff --git a/pkg/pgmodel/metrics/ingest.go b/pkg/pgmodel/metrics/ingest.go index cecfa5a38c..f3ac423125 100644 --- a/pkg/pgmodel/metrics/ingest.go +++ b/pkg/pgmodel/metrics/ingest.go @@ -14,7 +14,7 @@ const ( MetricBatcherChannelCap = 1000 // FlushSize defines the batch size. It is the maximum number of samples/exemplars per insert batch. // This translates to the max array size that we pass into `insert_metric_row` - FlushSize = 2000 + FlushSize = 10000 MaxInsertStmtPerTxn = 100 ) @@ -279,6 +279,7 @@ func init() { IngestorChannelCap, IngestorChannelLenBatcher, IngestorFlushSeries, + IngestorFlushInsertables, IngestorInsertsPerBatch, IngestorRowsPerBatch, IngestorRowsPerInsert, From 80eec93502ed5f0136a5eaf1caeb7154743c8616 Mon Sep 17 00:00:00 2001 From: Matvey Arye Date: Tue, 1 Nov 2022 12:19:52 -0400 Subject: [PATCH 4/6] Prevent flushing a batch with less than one request Previously, when a new set of requests came in, we often flushed a very small first batch because we flush as soon as any data becomes available in Peek. 
Since small batches are ineffecient we fix this by waiting till the entire first request is batched to flush (with a timeout). --- pkg/pgmodel/ingestor/dispatcher.go | 5 ++- pkg/pgmodel/ingestor/metric_batcher.go | 3 +- pkg/pgmodel/ingestor/metric_batcher_test.go | 4 +- pkg/pgmodel/ingestor/reservation.go | 42 ++++++++++++++++----- 4 files changed, 42 insertions(+), 12 deletions(-) diff --git a/pkg/pgmodel/ingestor/dispatcher.go b/pkg/pgmodel/ingestor/dispatcher.go index e80211f81b..b3d95a3551 100644 --- a/pkg/pgmodel/ingestor/dispatcher.go +++ b/pkg/pgmodel/ingestor/dispatcher.go @@ -227,8 +227,10 @@ func (p *pgxDispatcher) InsertTs(ctx context.Context, dataTS model.Data) (uint64 maxt int64 rows = dataTS.Rows workFinished = new(sync.WaitGroup) + batched = new(sync.WaitGroup) ) workFinished.Add(len(rows)) + batched.Add(len(rows)) // we only allocate enough space for a single error message here as we only // report one error back upstream. The inserter should not block on this // channel, but only insert if it's empty, anything else can deadlock. @@ -241,7 +243,7 @@ func (p *pgxDispatcher) InsertTs(ctx context.Context, dataTS model.Data) (uint64 maxt = ts } } - p.getMetricBatcher(metricName) <- &insertDataRequest{requestCtx: ctx, spanCtx: span.SpanContext(), metric: metricName, data: data, finished: workFinished, errChan: errChan} + p.getMetricBatcher(metricName) <- &insertDataRequest{requestCtx: ctx, spanCtx: span.SpanContext(), metric: metricName, data: data, finished: workFinished, batched: batched, errChan: errChan} } span.SetAttributes(attribute.Int64("num_rows", int64(numRows))) span.SetAttributes(attribute.Int("num_metrics", len(rows))) @@ -339,6 +341,7 @@ type insertDataRequest struct { spanCtx trace.SpanContext metric string finished *sync.WaitGroup + batched *sync.WaitGroup data []model.Insertable errChan chan error } diff --git a/pkg/pgmodel/ingestor/metric_batcher.go b/pkg/pgmodel/ingestor/metric_batcher.go index 2f16fa11f9..29c98e9adc 100644 --- a/pkg/pgmodel/ingestor/metric_batcher.go +++ b/pkg/pgmodel/ingestor/metric_batcher.go @@ -211,6 +211,7 @@ func sendBatches(firstReq *insertDataRequest, input chan *insertDataRequest, con } metrics.IngestorPipelineTime.With(prometheus.Labels{"type": "metric", "subsystem": "metric_batcher"}).Observe(time.Since(t).Seconds()) reservation.Update(reservationQ, t, len(req.data)) + req.batched.Done() addSpan.End() } //This channel in synchronous (no buffering). 
This provides backpressure @@ -225,7 +226,7 @@ func sendBatches(firstReq *insertDataRequest, input chan *insertDataRequest, con t = time.Time{} } metrics.IngestorPipelineTime.With(prometheus.Labels{"type": "metric", "subsystem": "metric_batcher"}).Observe(time.Since(t).Seconds()) - reservation = reservationQ.Add(copySender, t) + reservation = reservationQ.Add(copySender, req.batched, t) } pending := NewPendingBuffer() diff --git a/pkg/pgmodel/ingestor/metric_batcher_test.go b/pkg/pgmodel/ingestor/metric_batcher_test.go index 45fd772ebc..3b64919f27 100644 --- a/pkg/pgmodel/ingestor/metric_batcher_test.go +++ b/pkg/pgmodel/ingestor/metric_batcher_test.go @@ -143,6 +143,8 @@ func TestSendBatches(t *testing.T) { return l } var workFinished sync.WaitGroup + var batched sync.WaitGroup + batched.Add(1) errChan := make(chan error, 1) data := []model.Insertable{ model.NewPromSamples(makeSeries(1), make([]prompb.Sample, 1)), @@ -150,7 +152,7 @@ func TestSendBatches(t *testing.T) { model.NewPromSamples(makeSeries(3), make([]prompb.Sample, 1)), } spanCtx := psctx.WithStartTime(context.Background(), time.Now().Add(-time.Hour)) - firstReq := &insertDataRequest{metric: "test", requestCtx: spanCtx, data: data, finished: &workFinished, errChan: errChan} + firstReq := &insertDataRequest{metric: "test", requestCtx: spanCtx, data: data, finished: &workFinished, batched: &batched, errChan: errChan} reservationQ := NewReservationQueue() go sendBatches(firstReq, nil, nil, &pgmodel.MetricInfo{MetricID: 1, TableName: "test"}, reservationQ) resos := make([]readRequest, 0, 1) diff --git a/pkg/pgmodel/ingestor/reservation.go b/pkg/pgmodel/ingestor/reservation.go index ffee442c65..a6a12c423a 100644 --- a/pkg/pgmodel/ingestor/reservation.go +++ b/pkg/pgmodel/ingestor/reservation.go @@ -8,8 +8,9 @@ import ( ) type reservation struct { - copySender <-chan copyRequest - index int + copySender <-chan copyRequest + firstRequestBatched *sync.WaitGroup + index int lock sync.Mutex startTime time.Time @@ -17,8 +18,8 @@ type reservation struct { items int64 } -func newReservation(cs <-chan copyRequest, startTime time.Time) *reservation { - return &reservation{cs, -1, sync.Mutex{}, startTime, 1} +func newReservation(cs <-chan copyRequest, startTime time.Time, batched *sync.WaitGroup) *reservation { + return &reservation{cs, batched, -1, sync.Mutex{}, startTime, 1} } func (res *reservation) Update(rq *ReservationQueue, t time.Time, num_insertables int) { @@ -110,8 +111,8 @@ func NewReservationQueue() *ReservationQueue { return res } -func (rq *ReservationQueue) Add(cs <-chan copyRequest, startTime time.Time) Reservation { - si := newReservation(cs, startTime) +func (rq *ReservationQueue) Add(cs <-chan copyRequest, batched *sync.WaitGroup, startTime time.Time) Reservation { + si := newReservation(cs, startTime, batched) rq.lock.Lock() defer rq.lock.Unlock() @@ -146,19 +147,42 @@ func (rq *ReservationQueue) Close() { // Peek gives the first startTime as well as if the queue is not closed. // It blocks until there is an element in the queue or it has been closed. func (rq *ReservationQueue) Peek() (time.Time, bool) { + reservation, waited, ok := rq.peek() + if !ok { + return time.Time{}, false + } + if waited { + /* If this is the first reservation in the queue, wait for the entire request to be batched with a timeout. 
+ * (timeout is really a safety measure to prevent deadlocks if some metric batcher is full, which is unlikely)*/ + waitch := make(chan struct{}) + go func() { + reservation.firstRequestBatched.Wait() + close(waitch) + }() + select { + case <-waitch: + case <-time.After(250 * time.Millisecond): + } + } + return reservation.GetStartTime(), ok +} + +func (rq *ReservationQueue) peek() (*reservation, bool, bool) { rq.lock.Lock() defer rq.lock.Unlock() + waited := false for !rq.closed && rq.q.Len() == 0 { + waited = true rq.cond.Wait() } if rq.q.Len() > 0 { - first := (*rq.q)[0] - return first.GetStartTime(), true + firstReservation := (*rq.q)[0] + return firstReservation, waited, true } //must be closed - return time.Time{}, false + return nil, false, false } // PopBatch pops from the queue to populate the batch until either batch is full or the queue is empty. From efdf8699591e3837fc0470f4e7ebaa6b0c4f8ef2 Mon Sep 17 00:00:00 2001 From: Matvey Arye Date: Thu, 3 Nov 2022 16:19:07 -0400 Subject: [PATCH 5/6] trace request logging --- pkg/log/log.go | 26 ++++++++++++++++++++++++++ pkg/pgmodel/ingestor/dispatcher.go | 13 +++++++++++++ pkg/pgmodel/ingestor/reservation.go | 4 ++++ 3 files changed, 43 insertions(+) diff --git a/pkg/log/log.go b/pkg/log/log.go index cb2c1421d7..5466ccd998 100644 --- a/pkg/log/log.go +++ b/pkg/log/log.go @@ -95,6 +95,7 @@ func Init(cfg Config) error { // NOTE: we add a level of indirection with our logging functions, // so we need additional caller depth logger = log.With(l, "ts", timestampFormat, "caller", log.Caller(4)) + traceRequestEnabled = isTraceRequestEnabled() return nil } @@ -205,3 +206,28 @@ func WarnRateLimited(keyvals ...interface{}) { func DebugRateLimited(keyvals ...interface{}) { rateLimit(debug, keyvals...) } + +var traceRequestEnabled bool + +func isTraceRequestEnabled() bool { + value := os.Getenv("PROMSCALE_TRACE_REQUEST") + if value == "" { + return false + } + enabled, err := strconv.ParseBool(value) + if err != nil || !enabled { + //assume off + return false + } + return true +} + +func TraceRequestEnabled() bool { + return traceRequestEnabled +} + +func TraceRequest(keyvals ...interface{}) { + if TraceRequestEnabled() { + Debug(keyvals...) 
+ } +} diff --git a/pkg/pgmodel/ingestor/dispatcher.go b/pkg/pgmodel/ingestor/dispatcher.go index b3d95a3551..74c38472eb 100644 --- a/pkg/pgmodel/ingestor/dispatcher.go +++ b/pkg/pgmodel/ingestor/dispatcher.go @@ -21,6 +21,7 @@ import ( "github.com/timescale/promscale/pkg/pgmodel/metrics" "github.com/timescale/promscale/pkg/pgmodel/model" "github.com/timescale/promscale/pkg/pgxconn" + "github.com/timescale/promscale/pkg/psctx" "github.com/timescale/promscale/pkg/tracer" tput "github.com/timescale/promscale/pkg/util/throughput" ) @@ -245,6 +246,16 @@ func (p *pgxDispatcher) InsertTs(ctx context.Context, dataTS model.Data) (uint64 } p.getMetricBatcher(metricName) <- &insertDataRequest{requestCtx: ctx, spanCtx: span.SpanContext(), metric: metricName, data: data, finished: workFinished, batched: batched, errChan: errChan} } + + var startTime time.Time + if log.TraceRequestEnabled() { + t, err := psctx.StartTime(ctx) + if err != nil { + log.TraceRequest("component", "dispatcher", "err", err) + } + startTime = t + log.TraceRequest("component", "dispatcher", "event", "start", "metrics", len(rows), "samples", numRows, "start_time", startTime.UnixNano()) + } span.SetAttributes(attribute.Int64("num_rows", int64(numRows))) span.SetAttributes(attribute.Int("num_metrics", len(rows))) reportIncomingBatch(numRows) @@ -261,6 +272,7 @@ func (p *pgxDispatcher) InsertTs(ctx context.Context, dataTS model.Data) (uint64 case err = <-errChan: default: } + log.TraceRequest("component", "dispatcher", "event", "ack", "start_time", startTime.UnixNano(), "took", time.Since(startTime)) reportMetricsTelemetry(maxt, numRows, 0) close(errChan) } else { @@ -275,6 +287,7 @@ func (p *pgxDispatcher) InsertTs(ctx context.Context, dataTS model.Data) (uint64 if err != nil { log.Error("msg", fmt.Sprintf("error on async send, dropping %d datapoints", numRows), "err", err) } + log.TraceRequest("component", "dispatcher", "event", "async_ack", "start_time", startTime.UnixNano(), "took", time.Since(startTime)) reportMetricsTelemetry(maxt, numRows, 0) }() } diff --git a/pkg/pgmodel/ingestor/reservation.go b/pkg/pgmodel/ingestor/reservation.go index a6a12c423a..5d1f769037 100644 --- a/pkg/pgmodel/ingestor/reservation.go +++ b/pkg/pgmodel/ingestor/reservation.go @@ -5,6 +5,8 @@ import ( "sync" "sync/atomic" "time" + + "github.com/timescale/promscale/pkg/log" ) type reservation struct { @@ -163,6 +165,7 @@ func (rq *ReservationQueue) Peek() (time.Time, bool) { case <-waitch: case <-time.After(250 * time.Millisecond): } + log.TraceRequest("component", "reservation", "event", "peek", "batched_metrics", rq.q.Len(), "waited", waited, "took", time.Since((*rq.q)[0].GetStartTime())) } return reservation.GetStartTime(), ok } @@ -213,6 +216,7 @@ func (rq *ReservationQueue) PopOntoBatch(batch []readRequest) ([]readRequest, in } else if !(len(batch) == 0 || items+total_items < 20000) { reason = "size_samples" } + log.TraceRequest("component", "reservation", "event", "pop", "reason", reason, "metrics", count, "items", total_items, "remaining_metrics", rq.q.Len()) return batch, count, reason } From 4c34323fd7522d45289d0887106249d47866d17a Mon Sep 17 00:00:00 2001 From: Matvey Arye Date: Tue, 13 Dec 2022 15:41:32 -0500 Subject: [PATCH 6/6] bug fix --- pkg/pgmodel/ingestor/metric_batcher.go | 9 +++++---- pkg/pgmodel/ingestor/reservation.go | 2 +- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/pkg/pgmodel/ingestor/metric_batcher.go b/pkg/pgmodel/ingestor/metric_batcher.go index 29c98e9adc..840b73aaa4 100644 --- 
a/pkg/pgmodel/ingestor/metric_batcher.go +++ b/pkg/pgmodel/ingestor/metric_batcher.go @@ -256,7 +256,8 @@ func sendBatches(firstReq *insertDataRequest, input chan *insertDataRequest, con numSeries := pending.batch.CountSeries() numSamples, numExemplars := pending.batch.Count() - + wasFull := pending.IsFull() + start := pending.Start select { //try to batch as much as possible before sending case req, ok := <-recvCh: @@ -265,7 +266,7 @@ func sendBatches(firstReq *insertDataRequest, input chan *insertDataRequest, con span.AddEvent("Sending last non-empty batch") copySender <- copyRequest{pending, info} metrics.IngestorFlushSeries.With(prometheus.Labels{"type": "metric", "subsystem": "metric_batcher"}).Observe(float64(numSeries)) - metrics.IngestorBatchDuration.With(prometheus.Labels{"type": "metric", "subsystem": "metric_batcher"}).Observe(time.Since(pending.Start).Seconds()) + metrics.IngestorBatchDuration.With(prometheus.Labels{"type": "metric", "subsystem": "metric_batcher"}).Observe(time.Since(start).Seconds()) } span.AddEvent("Exiting metric batcher batch loop") span.SetAttributes(attribute.Int("num_series", numSeries)) @@ -276,8 +277,8 @@ func sendBatches(firstReq *insertDataRequest, input chan *insertDataRequest, con case copySender <- copyRequest{pending, info}: metrics.IngestorFlushSeries.With(prometheus.Labels{"type": "metric", "subsystem": "metric_batcher"}).Observe(float64(numSeries)) metrics.IngestorFlushInsertables.With(prometheus.Labels{"type": "metric", "subsystem": "metric_batcher"}).Observe(float64(numSamples + numExemplars)) - metrics.IngestorBatchDuration.With(prometheus.Labels{"type": "metric", "subsystem": "metric_batcher"}).Observe(time.Since(pending.Start).Seconds()) - if pending.IsFull() { + metrics.IngestorBatchDuration.With(prometheus.Labels{"type": "metric", "subsystem": "metric_batcher"}).Observe(time.Since(start).Seconds()) + if wasFull { metrics.IngestorBatchFlushTotal.With(prometheus.Labels{"type": "metric", "subsystem": "metric_batcher", "reason": "size"}).Inc() } else { metrics.IngestorBatchFlushTotal.With(prometheus.Labels{"type": "metric", "subsystem": "metric_batcher", "reason": "requested"}).Inc() diff --git a/pkg/pgmodel/ingestor/reservation.go b/pkg/pgmodel/ingestor/reservation.go index 5d1f769037..7b87e4fbca 100644 --- a/pkg/pgmodel/ingestor/reservation.go +++ b/pkg/pgmodel/ingestor/reservation.go @@ -165,7 +165,7 @@ func (rq *ReservationQueue) Peek() (time.Time, bool) { case <-waitch: case <-time.After(250 * time.Millisecond): } - log.TraceRequest("component", "reservation", "event", "peek", "batched_metrics", rq.q.Len(), "waited", waited, "took", time.Since((*rq.q)[0].GetStartTime())) + log.TraceRequest("component", "reservation", "event", "peek", "batched_metrics", rq.Len(), "waited", waited, "took", time.Since(reservation.GetStartTime())) } return reservation.GetStartTime(), ok }
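
An earlier patch in this series makes ReservationQueue.Peek wait for the oldest request to finish fanning out to all metric batchers before reporting its start time, capping that wait at 250ms so a full or stalled batcher cannot hold up the copiers. sync.WaitGroup has no built-in timeout, so the patch uses the helper-goroutine-plus-select idiom. Below is a minimal standalone sketch of that idiom using only the standard library; waitWithTimeout and the 50ms producer delay are illustrative and not code from the patch.

    package main

    import (
    	"fmt"
    	"sync"
    	"time"
    )

    // waitWithTimeout blocks until wg.Wait() returns or the timeout elapses,
    // reporting whether the wait finished in time. A helper goroutine is needed
    // because sync.WaitGroup offers no timeout of its own.
    func waitWithTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
    	done := make(chan struct{})
    	go func() {
    		wg.Wait()
    		close(done)
    	}()
    	select {
    	case <-done:
    		return true
    	case <-time.After(timeout):
    		return false
    	}
    }

    func main() {
    	var batched sync.WaitGroup
    	batched.Add(1)
    	go func() {
    		time.Sleep(50 * time.Millisecond) // simulate the dispatcher finishing its fan-out
    		batched.Done()
    	}()
    	// Mirrors Peek: cap the wait at 250ms so a blocked batcher cannot deadlock a copier.
    	fmt.Println("batched in time:", waitWithTimeout(&batched, 250*time.Millisecond))
    }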
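
PATCH 5/6 gates the new per-request logging behind the PROMSCALE_TRACE_REQUEST environment variable, parsed once with strconv.ParseBool when logging is initialised, so log.TraceRequest is effectively a no-op unless a deployment explicitly opts in. The sketch below reproduces only that gating pattern against the standard library logger; the traceRequest function and the package-level initialiser are stand-ins for illustration, not the patch's actual API.

    package main

    import (
    	"log"
    	"os"
    	"strconv"
    )

    // Parsed once at startup, mirroring how the patch caches the flag in Init.
    var traceEnabled = func() bool {
    	v := os.Getenv("PROMSCALE_TRACE_REQUEST") // same variable name as in the patch
    	if v == "" {
    		return false
    	}
    	on, err := strconv.ParseBool(v) // accepts 1, t, true, TRUE, etc.
    	return err == nil && on         // on parse error, assume off
    }()

    // traceRequest is an illustrative stand-in for log.TraceRequest: it forwards
    // to the logger only when the environment variable opts in, so the hot
    // ingest path pays almost nothing when tracing is disabled.
    func traceRequest(keyvals ...interface{}) {
    	if traceEnabled {
    		log.Println(keyvals...)
    	}
    }

    func main() {
    	traceRequest("component", "dispatcher", "event", "start", "samples", 100)
    }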
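
The "bug fix" in PATCH 6/6 reads as a snapshot-before-send change: sendBatches now copies pending.Start and pending.IsFull() into locals before the pending buffer is handed to the copier, presumably because the copier may mutate or recycle the buffer as soon as it receives it, which would make the later reads race. The following is a minimal sketch of that pattern under that assumption, with an illustrative pendingBuffer type rather than the patch's real pending batch.

    package main

    import (
    	"fmt"
    	"time"
    )

    // pendingBuffer stands in for the real pending batch; the fields mirror the
    // two values the fix starts reading before the send (Start and IsFull()).
    type pendingBuffer struct {
    	start time.Time
    	full  bool
    }

    // flush hands the buffer to a copier. The snapshot is taken before the send:
    // once the receiver owns the buffer it may change or recycle it concurrently,
    // so only the local copies are safe to read afterwards.
    func flush(copySender chan<- *pendingBuffer, pending *pendingBuffer) (time.Duration, bool) {
    	start, wasFull := pending.start, pending.full // snapshot before ownership transfer
    	copySender <- pending
    	return time.Since(start), wasFull
    }

    func main() {
    	ch := make(chan *pendingBuffer, 1)
    	took, wasFull := flush(ch, &pendingBuffer{start: time.Now().Add(-time.Second), full: true})
    	fmt.Println("batch duration:", took, "flushed because full:", wasFull)
    }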