Skip to content

Commit

Permalink
update grafana dashboard and change kafka output counts
Browse files Browse the repository at this point in the history
  • Loading branch information
dtrai2 committed Oct 27, 2023
1 parent 75846a5 commit ecf2fc1
Show file tree
Hide file tree
Showing 2 changed files with 241 additions and 23 deletions.
7 changes: 4 additions & 3 deletions logprep/connector/confluent_kafka/output.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,14 @@
import json
from datetime import datetime
from functools import cached_property, partial
from logging import Logger
from socket import getfqdn
from typing import Optional

from attrs import define, field, validators
from confluent_kafka import KafkaException, Producer

from logprep.abc.output import CriticalOutputError, FatalOutputError, Output
from logprep.metrics.metrics import GaugeMetric
from logprep.metrics.metrics import GaugeMetric, Metric
from logprep.util.validators import keys_in_validator

DEFAULTS = {
Expand Down Expand Up @@ -241,10 +240,10 @@ def store(self, document: dict) -> Optional[bool]:
configured input
"""
self.store_custom(document, self._config.topic)
self.metrics.number_of_processed_events += 1
if self.input_connector:
self.input_connector.batch_finished_callback()

@Metric.measure_time()
def store_custom(self, document: dict, target: str) -> None:
"""Write document to Kafka into target topic.
Expand All @@ -263,6 +262,7 @@ def store_custom(self, document: dict, target: str) -> None:
try:
self._producer.produce(target, value=self._encoder.encode(document))
self._producer.poll(self._config.send_timeout)
self.metrics.number_of_processed_events += 1
except BufferError:
# block program until buffer is empty
self._producer.flush(timeout=self._config.flush_timeout)
Expand All @@ -271,6 +271,7 @@ def store_custom(self, document: dict, target: str) -> None:
self, f"Error storing output document -> {error}", document
) from error

@Metric.measure_time()
def store_failed(
self, error_message: str, document_received: dict, document_processed: dict
) -> None:
Expand Down
Loading

0 comments on commit ecf2fc1

Please sign in to comment.