Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[8.x](backport #42442) x-pack/filebeat/input/httpjson: add metrics for number of events and pages published #42589

Merged
merged 2 commits into from
Feb 4, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,9 @@ otherwise no tag is added. {issue}42208[42208] {pull}42403[42403]
- Introduce ignore older and start timestamp filters for AWS S3 input. {pull}41804[41804]
- Journald input now can report its status to Elastic-Agent {issue}39791[39791] {pull}42462[42462]
- The journald input is now generally available. {pull}42107[42107]
- Publish events progressively in the Okta provider of the Entity Analytics input. {issue}40106[40106] {pull}42567[42567]
efd6 marked this conversation as resolved.
Show resolved Hide resolved
efd6 marked this conversation as resolved.
Show resolved Hide resolved
- Add metrics for number of events and pages published by HTTPJSON input. {issue}42340[42340] {pull}42442[42442]

*Auditbeat*

Expand Down
2 changes: 2 additions & 0 deletions x-pack/filebeat/docs/inputs/input-httpjson.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -1623,6 +1623,8 @@ observe the activity of the input.
[options="header"]
|=======
| Metric | Description
| `events_published_total` | Total number of events published.
| `pages_published_total` | Total number of pages of events published.
| `http_request_total` | Total number of processed requests.
| `http_request_errors_total` | Total number of request errors.
| `http_request_delete_total` | Total number of `DELETE` requests.
Expand Down
2 changes: 1 addition & 1 deletion x-pack/filebeat/input/httpjson/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,12 +118,12 @@
}

keys := make([]string, 0, len(m))
for k := range m {

Check failure on line 121 in x-pack/filebeat/input/httpjson/input.go

View workflow job for this annotation

GitHub Actions / lint (darwin)

cannot range over m (variable of type mapstrM) (typecheck)
keys = append(keys, k)
}
sort.Strings(keys)
for _, k := range keys {
v := m[k]

Check failure on line 126 in x-pack/filebeat/input/httpjson/input.go

View workflow job for this annotation

GitHub Actions / lint (darwin)

invalid operation: cannot index m (variable of type mapstrM) (typecheck)
if inner, ok := tryToMapStr(v); ok {
err := enc.AddObject(k, inner)
if err != nil {
Expand All @@ -141,9 +141,9 @@
case mapstrM:
return m, true
case map[string]interface{}:
return mapstrM(m), true

Check failure on line 144 in x-pack/filebeat/input/httpjson/input.go

View workflow job for this annotation

GitHub Actions / lint (darwin)

cannot convert m (variable of type map[string]interface{}) to type mapstrM (typecheck)
default:
return nil, false

Check failure on line 146 in x-pack/filebeat/input/httpjson/input.go

View workflow job for this annotation

GitHub Actions / lint (darwin)

cannot use nil as mapstrM value in return statement (typecheck)
}
}

Expand Down Expand Up @@ -215,7 +215,7 @@
}
pagination := newPagination(cfg, client, log)
responseProcessor := newResponseProcessor(cfg, pagination, xmlDetails, metrics, log)
requester := newRequester(client, requestFactory, responseProcessor, log)
requester := newRequester(client, requestFactory, responseProcessor, metrics, log)

trCtx := emptyTransformContext()
trCtx.cursor = newCursor(cfg.Cursor, log)
Expand Down
12 changes: 12 additions & 0 deletions x-pack/filebeat/input/httpjson/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ type inputMetrics struct {
intervalPages metrics.Sample // histogram of pages per interval
intervals *monitoring.Uint // total number of intervals executed
intervalErrs *monitoring.Uint // total number of interval errors
eventsPublished *monitoring.Uint // number of events published
pagesPublished *monitoring.Uint // number of pages of events published
}

func newInputMetrics(reg *monitoring.Registry) *inputMetrics {
Expand All @@ -29,6 +31,8 @@ func newInputMetrics(reg *monitoring.Registry) *inputMetrics {
out := &inputMetrics{
intervals: monitoring.NewUint(reg, "httpjson_interval_total"),
intervalErrs: monitoring.NewUint(reg, "httpjson_interval_errors_total"),
eventsPublished: monitoring.NewUint(reg, "events_published_total"),
pagesPublished: monitoring.NewUint(reg, "pages_published_total"),
intervalExecutionTime: metrics.NewUniformSample(1024),
intervalPageExecutionTime: metrics.NewUniformSample(1024),
intervalPages: metrics.NewUniformSample(1024),
Expand All @@ -44,6 +48,13 @@ func newInputMetrics(reg *monitoring.Registry) *inputMetrics {
return out
}

// addEventsPublished adds n to the total count of published events.
// It is safe to call on a nil receiver, in which case it does nothing
// (metrics collection disabled).
func (m *inputMetrics) addEventsPublished(n uint64) {
	if m != nil {
		m.eventsPublished.Add(n)
	}
}

func (m *inputMetrics) updateIntervalMetrics(err error, t time.Time) {
if m == nil {
return
Expand All @@ -59,6 +70,7 @@ func (m *inputMetrics) updatePageExecutionTime(t time.Time) {
if m == nil {
return
}
m.pagesPublished.Add(1)
m.intervalPageExecutionTime.Update(time.Since(t).Nanoseconds())
}

Expand Down
24 changes: 14 additions & 10 deletions x-pack/filebeat/input/httpjson/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (r *requester) doRequest(ctx context.Context, trCtx *transformContext, publ

if len(r.requestFactories) == 1 {
finalResps = append(finalResps, httpResp)
p := newPublisher(trCtx, publisher, true, r.log)
p := newPublisher(trCtx, publisher, true, r.metrics, r.log)
r.responseProcessors[i].startProcessing(ctx, trCtx, finalResps, true, p)
n = p.eventCount()
continue
Expand Down Expand Up @@ -119,7 +119,7 @@ func (r *requester) doRequest(ctx context.Context, trCtx *transformContext, publ
return err
}
// we avoid unnecessary pagination here since chaining is present, thus avoiding any unexpected updates to cursor values
p := newPublisher(trCtx, publisher, false, r.log)
p := newPublisher(trCtx, publisher, false, r.metrics, r.log)
r.responseProcessors[i].startProcessing(ctx, trCtx, finalResps, false, p)
n = p.eventCount()
} else {
Expand Down Expand Up @@ -189,7 +189,7 @@ func (r *requester) doRequest(ctx context.Context, trCtx *transformContext, publ
resps = intermediateResps
}

p := newPublisher(chainTrCtx, publisher, i < len(r.requestFactories), r.log)
p := newPublisher(chainTrCtx, publisher, i < len(r.requestFactories), r.metrics, r.log)
if rf.isChain {
rf.chainResponseProcessor.startProcessing(ctx, chainTrCtx, resps, true, p)
} else {
Expand Down Expand Up @@ -474,14 +474,16 @@ type requester struct {
client *httpClient
requestFactories []*requestFactory
responseProcessors []*responseProcessor
metrics *inputMetrics
log *logp.Logger
}

func newRequester(client *httpClient, reqs []*requestFactory, resps []*responseProcessor, log *logp.Logger) *requester {
// newRequester returns a requester that issues requests with client,
// builds them from reqs, handles responses with resps, and records
// activity to metrics, logging to log. metrics may be nil to disable
// metrics collection.
func newRequester(client *httpClient, reqs []*requestFactory, resps []*responseProcessor, metrics *inputMetrics, log *logp.Logger) *requester {
	r := requester{
		client:             client,
		requestFactories:   reqs,
		responseProcessors: resps,
		metrics:            metrics,
		log:                log,
	}
	return &r
}
Expand Down Expand Up @@ -716,7 +718,7 @@ func (r *requester) processChainPaginationEvents(ctx context.Context, trCtx *tra
}
resps = intermediateResps
}
p := newPublisher(chainTrCtx, publisher, i < len(r.requestFactories), r.log)
p := newPublisher(chainTrCtx, publisher, i < len(r.requestFactories), r.metrics, r.log)
rf.chainResponseProcessor.startProcessing(ctx, chainTrCtx, resps, true, p)
n += p.eventCount()
}
Expand Down Expand Up @@ -752,13 +754,14 @@ func generateNewUrl(replacement, oldUrl, id string) (url.URL, error) {

// publisher is an event publication handler.
type publisher struct {
trCtx *transformContext
pub inputcursor.Publisher
n int
log *logp.Logger
trCtx *transformContext
pub inputcursor.Publisher
n int
log *logp.Logger
metrics *inputMetrics
}

func newPublisher(trCtx *transformContext, pub inputcursor.Publisher, publish bool, log *logp.Logger) *publisher {
func newPublisher(trCtx *transformContext, pub inputcursor.Publisher, publish bool, metrics *inputMetrics, log *logp.Logger) *publisher {
if !publish {
pub = nil
}
Expand Down Expand Up @@ -789,6 +792,7 @@ func (p *publisher) handleEvent(_ context.Context, msg mapstr.M) {
p.trCtx.updateLastEvent(msg)
p.trCtx.updateCursor()

p.metrics.addEventsPublished(1)
p.n++
}

Expand Down
2 changes: 1 addition & 1 deletion x-pack/filebeat/input/httpjson/request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func TestCtxAfterDoRequest(t *testing.T) {
pagination := newPagination(config, client, log)
responseProcessor := newResponseProcessor(config, pagination, nil, nil, log)

requester := newRequester(client, requestFactory, responseProcessor, log)
requester := newRequester(client, requestFactory, responseProcessor, nil, log)

trCtx := emptyTransformContext()
trCtx.cursor = newCursor(config.Cursor, log)
Expand Down
Loading