diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index bbb2f1612f5..39ad1f25146 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -345,6 +345,7 @@ otherwise no tag is added. {issue}42208[42208] {pull}42403[42403] - Introduce ignore older and start timestamp filters for AWS S3 input. {pull}41804[41804] - Journald input now can report its status to Elastic-Agent {issue}39791[39791] {pull}42462[42462] - The journald input is now generally available. {pull}42107[42107] +- Add metrics for number of events and pages published by HTTPJSON input. {issue}42340[42340] {pull}42442[42442] *Auditbeat* diff --git a/x-pack/filebeat/docs/inputs/input-httpjson.asciidoc b/x-pack/filebeat/docs/inputs/input-httpjson.asciidoc index fb246803411..9f3a9ecdd81 100644 --- a/x-pack/filebeat/docs/inputs/input-httpjson.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-httpjson.asciidoc @@ -1623,6 +1623,8 @@ observe the activity of the input. [options="header"] |======= | Metric | Description +| `events_published_total` | Total number of events published. +| `pages_published_total` | Total number of pages of events published. | `http_request_total` | Total number of processed requests. | `http_request_errors_total` | Total number of request errors. | `http_request_delete_total` | Total number of `DELETE` requests. 
diff --git a/x-pack/filebeat/input/httpjson/input.go b/x-pack/filebeat/input/httpjson/input.go index f7db8dc6794..90f9b124b78 100644 --- a/x-pack/filebeat/input/httpjson/input.go +++ b/x-pack/filebeat/input/httpjson/input.go @@ -215,7 +215,7 @@ func run(ctx v2.Context, cfg config, pub inputcursor.Publisher, crsr *inputcurso } pagination := newPagination(cfg, client, log) responseProcessor := newResponseProcessor(cfg, pagination, xmlDetails, metrics, log) - requester := newRequester(client, requestFactory, responseProcessor, log) + requester := newRequester(client, requestFactory, responseProcessor, metrics, log) trCtx := emptyTransformContext() trCtx.cursor = newCursor(cfg.Cursor, log) diff --git a/x-pack/filebeat/input/httpjson/metrics.go b/x-pack/filebeat/input/httpjson/metrics.go index 4e0ba508c01..16ffd69d274 100644 --- a/x-pack/filebeat/input/httpjson/metrics.go +++ b/x-pack/filebeat/input/httpjson/metrics.go @@ -19,6 +19,8 @@ type inputMetrics struct { intervalPages metrics.Sample // histogram of pages per interval intervals *monitoring.Uint // total number of intervals executed intervalErrs *monitoring.Uint // total number of interval errors + eventsPublished *monitoring.Uint // number of events published + pagesPublished *monitoring.Uint // number of pages of events published } func newInputMetrics(reg *monitoring.Registry) *inputMetrics { @@ -29,6 +31,8 @@ func newInputMetrics(reg *monitoring.Registry) *inputMetrics { out := &inputMetrics{ intervals: monitoring.NewUint(reg, "httpjson_interval_total"), intervalErrs: monitoring.NewUint(reg, "httpjson_interval_errors_total"), + eventsPublished: monitoring.NewUint(reg, "events_published_total"), + pagesPublished: monitoring.NewUint(reg, "pages_published_total"), intervalExecutionTime: metrics.NewUniformSample(1024), intervalPageExecutionTime: metrics.NewUniformSample(1024), intervalPages: metrics.NewUniformSample(1024), @@ -44,6 +48,13 @@ func newInputMetrics(reg *monitoring.Registry) *inputMetrics { return out 
} +func (m *inputMetrics) addEventsPublished(n uint64) { + if m == nil { + return + } + m.eventsPublished.Add(n) +} + func (m *inputMetrics) updateIntervalMetrics(err error, t time.Time) { if m == nil { return @@ -59,6 +70,7 @@ func (m *inputMetrics) updatePageExecutionTime(t time.Time) { if m == nil { return } + m.pagesPublished.Add(1) m.intervalPageExecutionTime.Update(time.Since(t).Nanoseconds()) } diff --git a/x-pack/filebeat/input/httpjson/request.go b/x-pack/filebeat/input/httpjson/request.go index fb847570826..aef078dad20 100644 --- a/x-pack/filebeat/input/httpjson/request.go +++ b/x-pack/filebeat/input/httpjson/request.go @@ -82,7 +82,7 @@ func (r *requester) doRequest(ctx context.Context, trCtx *transformContext, publ if len(r.requestFactories) == 1 { finalResps = append(finalResps, httpResp) - p := newPublisher(trCtx, publisher, true, r.log) + p := newPublisher(trCtx, publisher, true, r.metrics, r.log) r.responseProcessors[i].startProcessing(ctx, trCtx, finalResps, true, p) n = p.eventCount() continue @@ -119,7 +119,7 @@ func (r *requester) doRequest(ctx context.Context, trCtx *transformContext, publ return err } // we avoid unnecessary pagination here since chaining is present, thus avoiding any unexpected updates to cursor values - p := newPublisher(trCtx, publisher, false, r.log) + p := newPublisher(trCtx, publisher, false, r.metrics, r.log) r.responseProcessors[i].startProcessing(ctx, trCtx, finalResps, false, p) n = p.eventCount() } else { @@ -189,7 +189,7 @@ func (r *requester) doRequest(ctx context.Context, trCtx *transformContext, publ resps = intermediateResps } - p := newPublisher(chainTrCtx, publisher, i < len(r.requestFactories), r.log) + p := newPublisher(chainTrCtx, publisher, i < len(r.requestFactories), r.metrics, r.log) if rf.isChain { rf.chainResponseProcessor.startProcessing(ctx, chainTrCtx, resps, true, p) } else { @@ -474,14 +474,16 @@ type requester struct { client *httpClient requestFactories []*requestFactory responseProcessors 
[]*responseProcessor + metrics *inputMetrics log *logp.Logger } -func newRequester(client *httpClient, reqs []*requestFactory, resps []*responseProcessor, log *logp.Logger) *requester { +func newRequester(client *httpClient, reqs []*requestFactory, resps []*responseProcessor, metrics *inputMetrics, log *logp.Logger) *requester { return &requester{ client: client, requestFactories: reqs, responseProcessors: resps, + metrics: metrics, log: log, } } @@ -716,7 +718,7 @@ func (r *requester) processChainPaginationEvents(ctx context.Context, trCtx *tra } resps = intermediateResps } - p := newPublisher(chainTrCtx, publisher, i < len(r.requestFactories), r.log) + p := newPublisher(chainTrCtx, publisher, i < len(r.requestFactories), r.metrics, r.log) rf.chainResponseProcessor.startProcessing(ctx, chainTrCtx, resps, true, p) n += p.eventCount() } @@ -752,13 +754,14 @@ func generateNewUrl(replacement, oldUrl, id string) (url.URL, error) { // publisher is an event publication handler. type publisher struct { - trCtx *transformContext - pub inputcursor.Publisher - n int - log *logp.Logger + trCtx *transformContext + pub inputcursor.Publisher + n int + log *logp.Logger + metrics *inputMetrics } -func newPublisher(trCtx *transformContext, pub inputcursor.Publisher, publish bool, log *logp.Logger) *publisher { +func newPublisher(trCtx *transformContext, pub inputcursor.Publisher, publish bool, metrics *inputMetrics, log *logp.Logger) *publisher { if !publish { pub = nil } @@ -789,6 +792,7 @@ func (p *publisher) handleEvent(_ context.Context, msg mapstr.M) { p.trCtx.updateLastEvent(msg) p.trCtx.updateCursor() + p.metrics.addEventsPublished(1) p.n++ } diff --git a/x-pack/filebeat/input/httpjson/request_test.go b/x-pack/filebeat/input/httpjson/request_test.go index 2bd3aab675a..89e077b8ee6 100644 --- a/x-pack/filebeat/input/httpjson/request_test.go +++ b/x-pack/filebeat/input/httpjson/request_test.go @@ -66,7 +66,7 @@ func TestCtxAfterDoRequest(t *testing.T) { pagination := 
newPagination(config, client, log) responseProcessor := newResponseProcessor(config, pagination, nil, nil, log) - requester := newRequester(client, requestFactory, responseProcessor, log) + requester := newRequester(client, requestFactory, responseProcessor, nil, log) trCtx := emptyTransformContext() trCtx.cursor = newCursor(config.Cursor, log)