diff --git a/packages/kafka/_dev/build/docs/README.md b/packages/kafka/_dev/build/docs/README.md index 3de77c94fb8..945570b0832 100644 --- a/packages/kafka/_dev/build/docs/README.md +++ b/packages/kafka/_dev/build/docs/README.md @@ -54,4 +54,28 @@ Please refer to the following [document](https://www.elastic.co/guide/en/ecs/cur Please refer to the following [document](https://www.elastic.co/guide/en/ecs/current/ecs-field-reference.html) for detailed information on ECS fields. -{{fields "partition"}} \ No newline at end of file +{{fields "partition"}} + +### consumer + +The `consumer` dataset collects JMX metrics from Kafka consumers using Jolokia. + +{{event "consumer"}} + +**ECS Field Reference** + +Please refer to the following [document](https://www.elastic.co/guide/en/ecs/current/ecs-field-reference.html) for detailed information on ECS fields. + +{{fields "consumer"}} + +### producer + +The `producer` dataset collects JMX metrics from Kafka producers using Jolokia. + +{{event "producer"}} + +**ECS Field Reference** + +Please refer to the following [document](https://www.elastic.co/guide/en/ecs/current/ecs-field-reference.html) for detailed information on ECS fields. + +{{fields "producer"}} \ No newline at end of file diff --git a/packages/kafka/changelog.yml b/packages/kafka/changelog.yml index 0bb189a31c2..24489a05465 100644 --- a/packages/kafka/changelog.yml +++ b/packages/kafka/changelog.yml @@ -1,4 +1,9 @@ # newer versions go on top +- version: "1.19.0" + changes: + - description: Add support for producer and consumer data streams. + type: enhancement + link: https://github.com/elastic/integrations/pull/13648 - version: "1.18.4" changes: - description: Update supported kafka versions in README. 
diff --git a/packages/kafka/data_stream/consumer/agent/stream/stream.yml.hbs b/packages/kafka/data_stream/consumer/agent/stream/stream.yml.hbs new file mode 100644 index 00000000000..3f5039a8e70 --- /dev/null +++ b/packages/kafka/data_stream/consumer/agent/stream/stream.yml.hbs @@ -0,0 +1,28 @@ +metricsets: ["consumer"] +period: {{period}} +hosts: +{{#each jolokia_hosts}} + - {{this}} +{{/each}} +{{#if ssl.certificate}} +ssl.certificate: {{ssl.certificate}} +{{/if}} +{{#if ssl.certificate_authorities}} +ssl.certificate_authorities: {{ssl.certificate_authorities}} +{{/if}} +{{#if ssl.key}} +ssl.key: {{ssl.key}} +{{/if}} +{{#if ssl.key_passphrase}} +ssl.key_passphrase: {{ssl.key_passphrase}} +{{/if}} +{{#if ssl.verification_mode}} +ssl.verification_mode: {{ssl.verification_mode}} +{{/if}} +{{#if ssl.ca_trusted_fingerprint}} +ssl.ca_trusted_fingerprint: {{ssl.ca_trusted_fingerprint}} +{{/if}} +{{#if processors}} +processors: +{{processors}} +{{/if}} \ No newline at end of file diff --git a/packages/kafka/data_stream/consumer/fields/agent.yml b/packages/kafka/data_stream/consumer/fields/agent.yml new file mode 100644 index 00000000000..4bb954b4a8e --- /dev/null +++ b/packages/kafka/data_stream/consumer/fields/agent.yml @@ -0,0 +1,94 @@ +- name: cloud + title: Cloud + group: 2 + description: Fields related to the cloud or infrastructure the events are coming from. + footnote: 'Examples: If Metricbeat is running on an EC2 host and fetches data from its host, the cloud info contains the data about this machine. If Metricbeat runs on a remote machine outside the cloud and fetches data from a service running in the cloud, the field contains cloud data from the machine the service is running on.' + type: group + fields: + - name: account.id + level: extended + type: keyword + ignore_above: 1024 + dimension: true + description: 'The cloud account or organization id used to identify different entities in a multi-tenant environment. 
Examples: AWS account id, Google Cloud ORG Id, or other unique identifier.' + example: 666777888999 + - name: availability_zone + level: extended + type: keyword + ignore_above: 1024 + dimension: true + description: Availability zone in which this host is running. + example: us-east-1c + - name: instance.id + level: extended + type: keyword + ignore_above: 1024 + description: Instance ID of the host machine. + example: i-1234567890abcdef0 + dimension: true + - name: provider + level: extended + type: keyword + ignore_above: 1024 + description: Name of the cloud provider. Example values are aws, azure, gcp, or digitalocean. + example: aws + dimension: true + - name: region + level: extended + type: keyword + ignore_above: 1024 + dimension: true + description: Region in which this host is running. + example: us-east-1 + - name: image.id + type: keyword + description: Image ID for the cloud instance. +- name: container + title: Container + group: 2 + description: 'Container fields are used for meta information about the specific container that is the source of information. These fields help correlate data based containers from any runtime.' + type: group + fields: + - name: id + level: core + type: keyword + ignore_above: 1024 + description: Unique container id. + dimension: true +- name: host + title: Host + group: 2 + description: 'A host is defined as a general computing instance. ECS host.* fields should be populated with details about the host on which the event happened, or from which the measurement was taken. Host types include hardware, virtual machines, Docker containers, and Kubernetes nodes.' + type: group + fields: + - name: name + level: core + type: keyword + ignore_above: 1024 + dimension: true + description: 'Name of the host. It can contain what `hostname` returns on Unix systems, the fully qualified domain name, or a name specified by the user. The sender decides which value to use.' 
+ - name: containerized + type: boolean + description: > + If the host is a container. + + - name: os.build + type: keyword + example: "18D109" + description: > + OS build information. + + - name: os.codename + type: keyword + example: "stretch" + description: > + OS codename, if any. + +- name: agent + title: Agent + type: group + fields: + - name: id + type: keyword + ignore_above: 1024 + dimension: true \ No newline at end of file diff --git a/packages/kafka/data_stream/consumer/fields/base-fields.yml b/packages/kafka/data_stream/consumer/fields/base-fields.yml new file mode 100644 index 00000000000..f76f4c9e8a5 --- /dev/null +++ b/packages/kafka/data_stream/consumer/fields/base-fields.yml @@ -0,0 +1,20 @@ +- name: data_stream.type + type: constant_keyword + description: Data stream type. +- name: data_stream.dataset + type: constant_keyword + description: Data stream dataset. +- name: data_stream.namespace + type: constant_keyword + description: Data stream namespace. +- name: '@timestamp' + type: date + description: Event timestamp. 
+- name: event.module + type: constant_keyword + description: Event module + value: kafka +- name: event.dataset + type: constant_keyword + description: Event dataset + value: kafka.consumer \ No newline at end of file diff --git a/packages/kafka/data_stream/consumer/fields/ecs.yml b/packages/kafka/data_stream/consumer/fields/ecs.yml new file mode 100644 index 00000000000..5b5d8e642e7 --- /dev/null +++ b/packages/kafka/data_stream/consumer/fields/ecs.yml @@ -0,0 +1,3 @@ +- external: ecs + name: service.address + dimension: true \ No newline at end of file diff --git a/packages/kafka/data_stream/consumer/fields/fields.yml b/packages/kafka/data_stream/consumer/fields/fields.yml new file mode 100644 index 00000000000..f5692c552cc --- /dev/null +++ b/packages/kafka/data_stream/consumer/fields/fields.yml @@ -0,0 +1,31 @@ +- name: kafka.consumer + type: group + description: Consumer metrics from Kafka Consumer JMX + fields: + - name: mbean + description: Mbean that this event is related to + type: keyword + - name: fetch_rate + description: The minimum rate at which the consumer sends fetch requests to a broker + type: float + - name: bytes_consumed + description: The average number of bytes consumed for a specific topic per second + type: float + - name: records_consumed + description: The average number of records consumed per second for a specific topic + type: float + - name: in.bytes_per_sec + description: The rate of bytes coming in to the consumer + type: float + - name: max_lag + description: The maximum consumer lag + type: float + - name: zookeeper_commits + description: The rate of offset commits to ZooKeeper + type: float + - name: kafka_commits + description: The rate of offset commits to Kafka + type: float + - name: messages_in + description: The rate of consumer message consumption + type: float \ No newline at end of file diff --git a/packages/kafka/data_stream/consumer/fields/package-fields.yml 
b/packages/kafka/data_stream/consumer/fields/package-fields.yml new file mode 100644 index 00000000000..1fcb81ebd42 --- /dev/null +++ b/packages/kafka/data_stream/consumer/fields/package-fields.yml @@ -0,0 +1,40 @@ +- name: kafka + type: group + fields: + - name: broker + type: group + fields: + - name: id + type: long + description: | + Broker id + - name: address + type: keyword + #Reason to add as dimension field: Multiple brokers may exist in a Kafka cluster. + dimension: true + description: | + Broker advertised address + - name: topic.name + type: keyword + #Reason to add as dimension field: Multiple values of topics exist. + dimension: true + description: | + Topic name + - name: topic.error.code + type: long + description: | + Topic error code. + - name: partition.id + type: long + description: | + Partition id. + - name: partition.topic_id + type: keyword + #Reason to add as dimension field: Multiple records exist for the same kafka.partition.id, kafka.topic.name + dimension: true + description: Unique id of the partition in the topic. + - name: partition.topic_broker_id + type: keyword + #Reason to add as dimension field: For future use. + dimension: true + description: Unique id of the partition in the topic and the broker. 
\ No newline at end of file diff --git a/packages/kafka/data_stream/consumer/manifest.yml b/packages/kafka/data_stream/consumer/manifest.yml new file mode 100644 index 00000000000..51e9bc18624 --- /dev/null +++ b/packages/kafka/data_stream/consumer/manifest.yml @@ -0,0 +1,25 @@ +title: Kafka consumer metrics +type: metrics +streams: + - input: kafka/metrics + title: Kafka consumer metrics + description: Collect Kafka consumer metrics + vars: + - name: jolokia_hosts + type: text + title: Address of Jolokia agent installed in Kafka + multi: true + required: true + show_user: true + default: + - 'localhost:8774' + - name: processors + type: yaml + title: Processors + multi: false + required: false + show_user: false + description: > + Processors are used to reduce the number of fields in the exported event or to enhance the event with metadata. This executes in the agent before the events are shipped. See [Processors](https://www.elastic.co/guide/en/fleet/current/elastic-agent-processor-configuration.html) for details. 
+elasticsearch: + index_mode: "time_series" \ No newline at end of file diff --git a/packages/kafka/data_stream/consumer/sample_event.json b/packages/kafka/data_stream/consumer/sample_event.json new file mode 100644 index 00000000000..8cd8fb4dcf3 --- /dev/null +++ b/packages/kafka/data_stream/consumer/sample_event.json @@ -0,0 +1,36 @@ +{ + "@timestamp": "2024-11-26T05:51:59.816Z", + "agent": { + "ephemeral_id": "f45cfc11-360f-4f24-8fe1-68b586fd9e1f", + "id": "fd4ec33d-d5ff-483b-aa6e-e08f8d0210f5", + "name": "EPINPUNW06A8", + "type": "metricbeat", + "version": "8.15.4" + }, + "ecs": { + "version": "8.0.0" + }, + "event": { + "agent_id_status": "verified", + "dataset": "kafka.consumer", + "duration": 4557113, + "ingested": "2024-11-26T05:51:59Z", + "module": "kafka" + }, + "kafka": { + "consumer": { + "bytes_consumed": 83, + "fetch_rate": 2.0042522, + "mbean": "kafka.consumer:client-id=console-consumer,type=consumer-fetch-manager-metrics", + "records_consumed": 4 + } + }, + "metricset": { + "name": "consumer", + "period": 10000 + }, + "service": { + "address": "http://127.0.0.1:8774/jolokia/", + "type": "kafka" + } +} \ No newline at end of file diff --git a/packages/kafka/data_stream/producer/agent/stream/stream.yml.hbs b/packages/kafka/data_stream/producer/agent/stream/stream.yml.hbs new file mode 100644 index 00000000000..01563a0c0db --- /dev/null +++ b/packages/kafka/data_stream/producer/agent/stream/stream.yml.hbs @@ -0,0 +1,28 @@ +metricsets: ["producer"] +period: {{period}} +hosts: +{{#each jolokia_hosts}} + - {{this}} +{{/each}} +{{#if ssl.certificate}} +ssl.certificate: {{ssl.certificate}} +{{/if}} +{{#if ssl.certificate_authorities}} +ssl.certificate_authorities: {{ssl.certificate_authorities}} +{{/if}} +{{#if ssl.key}} +ssl.key: {{ssl.key}} +{{/if}} +{{#if ssl.key_passphrase}} +ssl.key_passphrase: {{ssl.key_passphrase}} +{{/if}} +{{#if ssl.verification_mode}} +ssl.verification_mode: {{ssl.verification_mode}} +{{/if}} +{{#if ssl.ca_trusted_fingerprint}} 
+ssl.ca_trusted_fingerprint: {{ssl.ca_trusted_fingerprint}} +{{/if}} +{{#if processors}} +processors: +{{processors}} +{{/if}} \ No newline at end of file diff --git a/packages/kafka/data_stream/producer/fields/agent.yml b/packages/kafka/data_stream/producer/fields/agent.yml new file mode 100644 index 00000000000..4bb954b4a8e --- /dev/null +++ b/packages/kafka/data_stream/producer/fields/agent.yml @@ -0,0 +1,94 @@ +- name: cloud + title: Cloud + group: 2 + description: Fields related to the cloud or infrastructure the events are coming from. + footnote: 'Examples: If Metricbeat is running on an EC2 host and fetches data from its host, the cloud info contains the data about this machine. If Metricbeat runs on a remote machine outside the cloud and fetches data from a service running in the cloud, the field contains cloud data from the machine the service is running on.' + type: group + fields: + - name: account.id + level: extended + type: keyword + ignore_above: 1024 + dimension: true + description: 'The cloud account or organization id used to identify different entities in a multi-tenant environment. Examples: AWS account id, Google Cloud ORG Id, or other unique identifier.' + example: 666777888999 + - name: availability_zone + level: extended + type: keyword + ignore_above: 1024 + dimension: true + description: Availability zone in which this host is running. + example: us-east-1c + - name: instance.id + level: extended + type: keyword + ignore_above: 1024 + description: Instance ID of the host machine. + example: i-1234567890abcdef0 + dimension: true + - name: provider + level: extended + type: keyword + ignore_above: 1024 + description: Name of the cloud provider. Example values are aws, azure, gcp, or digitalocean. + example: aws + dimension: true + - name: region + level: extended + type: keyword + ignore_above: 1024 + dimension: true + description: Region in which this host is running. 
+ example: us-east-1 + - name: image.id + type: keyword + description: Image ID for the cloud instance. +- name: container + title: Container + group: 2 + description: 'Container fields are used for meta information about the specific container that is the source of information. These fields help correlate data based containers from any runtime.' + type: group + fields: + - name: id + level: core + type: keyword + ignore_above: 1024 + description: Unique container id. + dimension: true +- name: host + title: Host + group: 2 + description: 'A host is defined as a general computing instance. ECS host.* fields should be populated with details about the host on which the event happened, or from which the measurement was taken. Host types include hardware, virtual machines, Docker containers, and Kubernetes nodes.' + type: group + fields: + - name: name + level: core + type: keyword + ignore_above: 1024 + dimension: true + description: 'Name of the host. It can contain what `hostname` returns on Unix systems, the fully qualified domain name, or a name specified by the user. The sender decides which value to use.' + - name: containerized + type: boolean + description: > + If the host is a container. + + - name: os.build + type: keyword + example: "18D109" + description: > + OS build information. + + - name: os.codename + type: keyword + example: "stretch" + description: > + OS codename, if any. + +- name: agent + title: Agent + type: group + fields: + - name: id + type: keyword + ignore_above: 1024 + dimension: true \ No newline at end of file diff --git a/packages/kafka/data_stream/producer/fields/base-fields.yml b/packages/kafka/data_stream/producer/fields/base-fields.yml new file mode 100644 index 00000000000..aa3bee91819 --- /dev/null +++ b/packages/kafka/data_stream/producer/fields/base-fields.yml @@ -0,0 +1,20 @@ +- name: data_stream.type + type: constant_keyword + description: Data stream type. 
+- name: data_stream.dataset + type: constant_keyword + description: Data stream dataset. +- name: data_stream.namespace + type: constant_keyword + description: Data stream namespace. +- name: '@timestamp' + type: date + description: Event timestamp. +- name: event.module + type: constant_keyword + description: Event module + value: kafka +- name: event.dataset + type: constant_keyword + description: Event dataset + value: kafka.producer \ No newline at end of file diff --git a/packages/kafka/data_stream/producer/fields/ecs.yml b/packages/kafka/data_stream/producer/fields/ecs.yml new file mode 100644 index 00000000000..5b5d8e642e7 --- /dev/null +++ b/packages/kafka/data_stream/producer/fields/ecs.yml @@ -0,0 +1,3 @@ +- external: ecs + name: service.address + dimension: true \ No newline at end of file diff --git a/packages/kafka/data_stream/producer/fields/fields.yml b/packages/kafka/data_stream/producer/fields/fields.yml new file mode 100644 index 00000000000..d16fa186080 --- /dev/null +++ b/packages/kafka/data_stream/producer/fields/fields.yml @@ -0,0 +1,49 @@ +- name: kafka.producer + type: group + description: Producer metrics from Kafka Producer JMX + fields: + - name: mbean + description: Mbean that this event is related to + type: keyword + - name: available_buffer_bytes + description: The total amount of buffer memory + type: float + - name: batch_size_avg + description: The average number of bytes sent + type: float + - name: batch_size_max + description: The maximum number of bytes sent + type: long + - name: record_send_rate + description: The average number of records sent per second + type: float + - name: record_retry_rate + description: The average number of retried record sends per second + type: float + - name: record_error_rate + description: The average number of record sends per second that resulted in errors + type: float + - name: records_per_request + description: The average number of records per request + type: float + - name: record_size_avg +
description: The average record size + type: float + - name: record_size_max + description: The maximum record size + type: long + - name: request_rate + description: The number of producer requests per second + type: float + - name: response_rate + description: The number of producer responses per second + type: float + - name: io_wait + description: The producer I/O wait time + type: float + - name: out.bytes_per_sec + description: The rate of bytes going out for the producer + type: float + - name: message_rate + description: The producer message rate + type: float \ No newline at end of file diff --git a/packages/kafka/data_stream/producer/fields/package-fields.yml b/packages/kafka/data_stream/producer/fields/package-fields.yml new file mode 100644 index 00000000000..1fcb81ebd42 --- /dev/null +++ b/packages/kafka/data_stream/producer/fields/package-fields.yml @@ -0,0 +1,40 @@ +- name: kafka + type: group + fields: + - name: broker + type: group + fields: + - name: id + type: long + description: | + Broker id + - name: address + type: keyword + #Reason to add as dimension field: Multiple brokers may exist in a Kafka cluster. + dimension: true + description: | + Broker advertised address + - name: topic.name + type: keyword + #Reason to add as dimension field: Multiple values of topics exist. + dimension: true + description: | + Topic name + - name: topic.error.code + type: long + description: | + Topic error code. + - name: partition.id + type: long + description: | + Partition id. + - name: partition.topic_id + type: keyword + #Reason to add as dimension field: Multiple records exist for the same kafka.partition.id, kafka.topic.name + dimension: true + description: Unique id of the partition in the topic. + - name: partition.topic_broker_id + type: keyword + #Reason to add as dimension field: For future use. + dimension: true + description: Unique id of the partition in the topic and the broker. 
\ No newline at end of file diff --git a/packages/kafka/data_stream/producer/manifest.yml b/packages/kafka/data_stream/producer/manifest.yml new file mode 100644 index 00000000000..bbb8b24815d --- /dev/null +++ b/packages/kafka/data_stream/producer/manifest.yml @@ -0,0 +1,25 @@ +title: Kafka producer metrics +type: metrics +streams: + - input: kafka/metrics + title: Kafka producer metrics + description: Collect Kafka producer metrics + vars: + - name: jolokia_hosts + type: text + title: Address of Jolokia agent installed in Kafka + multi: true + required: true + show_user: true + default: + - 'localhost:8775' + - name: processors + type: yaml + title: Processors + multi: false + required: false + show_user: false + description: > + Processors are used to reduce the number of fields in the exported event or to enhance the event with metadata. This executes in the agent before the events are shipped. See [Processors](https://www.elastic.co/guide/en/fleet/current/elastic-agent-processor-configuration.html) for details. 
+elasticsearch: + index_mode: "time_series" \ No newline at end of file diff --git a/packages/kafka/data_stream/producer/sample_event.json b/packages/kafka/data_stream/producer/sample_event.json new file mode 100644 index 00000000000..cef07e1bb3d --- /dev/null +++ b/packages/kafka/data_stream/producer/sample_event.json @@ -0,0 +1,45 @@ +{ + "@timestamp": "2024-11-26T05:49:08.893Z", + "agent": { + "ephemeral_id": "f45cfc11-360f-4f24-8fe1-68b586fd9e1f", + "id": "fd4ec33d-d5ff-483b-aa6e-e08f8d0210f5", + "name": "EPINPUNW06A8", + "type": "metricbeat", + "version": "8.15.4" + }, + "ecs": { + "version": "8.0.0" + }, + "event": { + "agent_id_status": "verified", + "dataset": "kafka.producer", + "duration": 3162940, + "ingested": "2024-11-26T05:49:09Z", + "module": "kafka" + }, + "kafka": { + "producer": { + "available_buffer_bytes": 33554432, + "batch_size_avg": 83, + "batch_size_max": 84, + "io_wait": 1271956426, + "mbean": "kafka.producer:client-id=console-producer,type=producer-metrics", + "record_error_rate": 0, + "record_retry_rate": 0, + "record_send_rate": 0, + "record_size_avg": 100, + "record_size_max": 101, + "records_per_request": 1, + "request_rate": 0, + "response_rate": 0 + } + }, + "metricset": { + "name": "producer", + "period": 10000 + }, + "service": { + "address": "http://127.0.0.1:8775/jolokia/", + "type": "kafka" + } +} \ No newline at end of file diff --git a/packages/kafka/docs/README.md b/packages/kafka/docs/README.md index ba753d94a80..546902897a3 100644 --- a/packages/kafka/docs/README.md +++ b/packages/kafka/docs/README.md @@ -349,3 +349,198 @@ Please refer to the following [document](https://www.elastic.co/guide/en/ecs/cur | kafka.topic.error.code | Topic error code. | long | | | kafka.topic.name | Topic name | keyword | | | service.address | Address where data about this service was collected from. This should be a URI, network address (ipv4:port or [ipv6]:port) or a resource path (sockets). 
| keyword | | + + +### consumer + +The `consumer` dataset collects JMX metrics from Kafka consumers using Jolokia. + +An example event for `consumer` looks as following: + +```json +{ + "@timestamp": "2024-11-26T05:51:59.816Z", + "agent": { + "ephemeral_id": "f45cfc11-360f-4f24-8fe1-68b586fd9e1f", + "id": "fd4ec33d-d5ff-483b-aa6e-e08f8d0210f5", + "name": "EPINPUNW06A8", + "type": "metricbeat", + "version": "8.15.4" + }, + "ecs": { + "version": "8.0.0" + }, + "event": { + "agent_id_status": "verified", + "dataset": "kafka.consumer", + "duration": 4557113, + "ingested": "2024-11-26T05:51:59Z", + "module": "kafka" + }, + "kafka": { + "consumer": { + "bytes_consumed": 83, + "fetch_rate": 2.0042522, + "mbean": "kafka.consumer:client-id=console-consumer,type=consumer-fetch-manager-metrics", + "records_consumed": 4 + } + }, + "metricset": { + "name": "consumer", + "period": 10000 + }, + "service": { + "address": "http://127.0.0.1:8774/jolokia/", + "type": "kafka" + } +} +``` + +**ECS Field Reference** + +Please refer to the following [document](https://www.elastic.co/guide/en/ecs/current/ecs-field-reference.html) for detailed information on ECS fields. + +**Exported fields** + +| Field | Description | Type | +|---|---|---| +| @timestamp | Event timestamp. | date | +| agent.id | | keyword | +| cloud.account.id | The cloud account or organization id used to identify different entities in a multi-tenant environment. Examples: AWS account id, Google Cloud ORG Id, or other unique identifier. | keyword | +| cloud.availability_zone | Availability zone in which this host is running. | keyword | +| cloud.image.id | Image ID for the cloud instance. | keyword | +| cloud.instance.id | Instance ID of the host machine. | keyword | +| cloud.provider | Name of the cloud provider. Example values are aws, azure, gcp, or digitalocean. | keyword | +| cloud.region | Region in which this host is running. | keyword | +| container.id | Unique container id. 
| keyword | +| data_stream.dataset | Data stream dataset. | constant_keyword | +| data_stream.namespace | Data stream namespace. | constant_keyword | +| data_stream.type | Data stream type. | constant_keyword | +| event.dataset | Event dataset | constant_keyword | +| event.module | Event module | constant_keyword | +| host.containerized | If the host is a container. | boolean | +| host.name | Name of the host. It can contain what `hostname` returns on Unix systems, the fully qualified domain name, or a name specified by the user. The sender decides which value to use. | keyword | +| host.os.build | OS build information. | keyword | +| host.os.codename | OS codename, if any. | keyword | +| kafka.broker.address | Broker advertised address | keyword | +| kafka.broker.id | Broker id | long | +| kafka.consumer.bytes_consumed | The average number of bytes consumed for a specific topic per second | float | +| kafka.consumer.fetch_rate | The minimum rate at which the consumer sends fetch requests to a broker | float | +| kafka.consumer.in.bytes_per_sec | The rate of bytes coming in to the consumer | float | +| kafka.consumer.kafka_commits | The rate of offset commits to Kafka | float | +| kafka.consumer.max_lag | The maximum consumer lag | float | +| kafka.consumer.mbean | Mbean that this event is related to | keyword | +| kafka.consumer.messages_in | The rate of consumer message consumption | float | +| kafka.consumer.records_consumed | The average number of records consumed per second for a specific topic | float | +| kafka.consumer.zookeeper_commits | The rate of offset commits to ZooKeeper | float | +| kafka.partition.id | Partition id. | long | +| kafka.partition.topic_broker_id | Unique id of the partition in the topic and the broker. | keyword | +| kafka.partition.topic_id | Unique id of the partition in the topic. | keyword | +| kafka.topic.error.code | Topic error code. 
| long | +| kafka.topic.name | Topic name | keyword | +| service.address | Address where data about this service was collected from. This should be a URI, network address (ipv4:port or [ipv6]:port) or a resource path (sockets). | keyword | + + +### producer + +The `producer` dataset collects JMX metrics from Kafka producers using Jolokia. + +An example event for `producer` looks as following: + +```json +{ + "@timestamp": "2024-11-26T05:49:08.893Z", + "agent": { + "ephemeral_id": "f45cfc11-360f-4f24-8fe1-68b586fd9e1f", + "id": "fd4ec33d-d5ff-483b-aa6e-e08f8d0210f5", + "name": "EPINPUNW06A8", + "type": "metricbeat", + "version": "8.15.4" + }, + "ecs": { + "version": "8.0.0" + }, + "event": { + "agent_id_status": "verified", + "dataset": "kafka.producer", + "duration": 3162940, + "ingested": "2024-11-26T05:49:09Z", + "module": "kafka" + }, + "kafka": { + "producer": { + "available_buffer_bytes": 33554432, + "batch_size_avg": 83, + "batch_size_max": 84, + "io_wait": 1271956426, + "mbean": "kafka.producer:client-id=console-producer,type=producer-metrics", + "record_error_rate": 0, + "record_retry_rate": 0, + "record_send_rate": 0, + "record_size_avg": 100, + "record_size_max": 101, + "records_per_request": 1, + "request_rate": 0, + "response_rate": 0 + } + }, + "metricset": { + "name": "producer", + "period": 10000 + }, + "service": { + "address": "http://127.0.0.1:8775/jolokia/", + "type": "kafka" + } +} +``` + +**ECS Field Reference** + +Please refer to the following [document](https://www.elastic.co/guide/en/ecs/current/ecs-field-reference.html) for detailed information on ECS fields. + +**Exported fields** + +| Field | Description | Type | +|---|---|---| +| @timestamp | Event timestamp. | date | +| agent.id | | keyword | +| cloud.account.id | The cloud account or organization id used to identify different entities in a multi-tenant environment. Examples: AWS account id, Google Cloud ORG Id, or other unique identifier. 
| keyword | +| cloud.availability_zone | Availability zone in which this host is running. | keyword | +| cloud.image.id | Image ID for the cloud instance. | keyword | +| cloud.instance.id | Instance ID of the host machine. | keyword | +| cloud.provider | Name of the cloud provider. Example values are aws, azure, gcp, or digitalocean. | keyword | +| cloud.region | Region in which this host is running. | keyword | +| container.id | Unique container id. | keyword | +| data_stream.dataset | Data stream dataset. | constant_keyword | +| data_stream.namespace | Data stream namespace. | constant_keyword | +| data_stream.type | Data stream type. | constant_keyword | +| event.dataset | Event dataset | constant_keyword | +| event.module | Event module | constant_keyword | +| host.containerized | If the host is a container. | boolean | +| host.name | Name of the host. It can contain what `hostname` returns on Unix systems, the fully qualified domain name, or a name specified by the user. The sender decides which value to use. | keyword | +| host.os.build | OS build information. | keyword | +| host.os.codename | OS codename, if any. | keyword | +| kafka.broker.address | Broker advertised address | keyword | +| kafka.broker.id | Broker id | long | +| kafka.partition.id | Partition id. | long | +| kafka.partition.topic_broker_id | Unique id of the partition in the topic and the broker. | keyword | +| kafka.partition.topic_id | Unique id of the partition in the topic. 
| keyword | +| kafka.producer.available_buffer_bytes | The total amount of buffer memory | float | +| kafka.producer.batch_size_avg | The average number of bytes sent | float | +| kafka.producer.batch_size_max | The maximum number of bytes sent | long | +| kafka.producer.io_wait | The producer I/O wait time | float | +| kafka.producer.mbean | Mbean that this event is related to | keyword | +| kafka.producer.message_rate | The producer message rate | float | +| kafka.producer.out.bytes_per_sec | The rate of bytes going out for the producer | float | +| kafka.producer.record_error_rate | The average number of record sends per second that resulted in errors | float | +| kafka.producer.record_retry_rate | The average number of retried record sends per second | float | +| kafka.producer.record_send_rate | The average number of records sent per second | float | +| kafka.producer.record_size_avg | The average record size | float | +| kafka.producer.record_size_max | The maximum record size | long | +| kafka.producer.records_per_request | The average number of records per request | float | +| kafka.producer.request_rate | The number of producer requests per second | float | +| kafka.producer.response_rate | The number of producer responses per second | float | +| kafka.topic.error.code | Topic error code. | long | +| kafka.topic.name | Topic name | keyword | +| service.address | Address where data about this service was collected from. This should be a URI, network address (ipv4:port or [ipv6]:port) or a resource path (sockets). | keyword | diff --git a/packages/kafka/manifest.yml b/packages/kafka/manifest.yml index 03904dc4a7d..fa2e05b6bb2 100644 --- a/packages/kafka/manifest.yml +++ b/packages/kafka/manifest.yml @@ -1,7 +1,7 @@ format_version: "3.0.2" name: kafka title: Kafka -version: "1.18.4" +version: "1.19.0" description: Collect logs and metrics from Kafka servers with Elastic Agent. type: integration categories: