Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

force send on maximum byte size #28

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 56 additions & 18 deletions loki_logger_handler/loki_logger_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@
import Queue as queue # Python 2.7

import atexit
import json
import logging
import threading

import requests

from loki_logger_handler.formatters.logger_formatter import LoggerFormatter
Expand All @@ -30,11 +32,11 @@ def __init__(
timeout=10,
compressed=True,
default_formatter=LoggerFormatter(),
max_stream_size=0,
enable_self_errors=False,
enable_structured_loki_metadata=False,
loki_metadata=None,
loki_metadata_keys=None

loki_metadata_keys=None,
):
"""
Initialize the LokiLoggerHandler object.
Expand All @@ -47,7 +49,11 @@ def __init__(
message_in_json_format (bool): Whether to format log values as JSON.
timeout (int, optional): Timeout interval for flushing logs. Defaults to 10 seconds.
compressed (bool, optional): Whether to compress the logs using gzip. Defaults to True.
default_formatter (logging.Formatter, optional): Formatter for the log records. If not provided, LoggerFormatter or LoguruFormatter will be used.
default_formatter (logging.Formatter, optional): Formatter for the log records. If not provided,
LoggerFormatter or LoguruFormatter will be used.
max_stream_size (int, optional): Max stream size in bytes to forcefully send to server instead of
waiting for timeout. If max stream size is 0 forcefully sending by byte size is disabled.
Defaults to 0.
enable_self_errors (bool, optional): Set to True to show Handler errors on the console. Defaults to False.
enable_structured_loki_metadata (bool, optional): Whether to include structured loki_metadata in the logs. Defaults to False. Only supported for Loki 3.0 and above
loki_metadata (dict, optional): Default loki_metadata values. Defaults to None. Only supported for Loki 3.0 and above
Expand Down Expand Up @@ -83,12 +89,16 @@ def __init__(
self.flush_thread.start()

self.message_in_json_format = message_in_json_format
self.max_stream_size = max_stream_size
self._current_stream_size = 0

# Handler working with errors
self.error = False
self.enable_structured_loki_metadata = enable_structured_loki_metadata
self.loki_metadata = loki_metadata
self.loki_metadata_keys = loki_metadata_keys if loki_metadata_keys is not None else []
self.loki_metadata_keys = (
loki_metadata_keys if loki_metadata_keys is not None else []
)

def emit(self, record):
"""
Expand All @@ -101,7 +111,7 @@ def emit(self, record):
formatted_record, log_loki_metadata = self.formatter.format(record)
self._put(formatted_record, log_loki_metadata)
except Exception as e:
self.handle_unexpected_error(e)
self.handle_unexpected_error(e)

def _flush(self):
"""
Expand All @@ -111,32 +121,31 @@ def _flush(self):
atexit.register(self._send)

while True:

# Wait until flush_event is set or timeout elapses
self.flush_event.wait(timeout=self.timeout)

# Reset the event for the next cycle
self.flush_event.clear()

# Flush the logs if buffer is not empty
if not self.buffer.empty():
try:
self._send()
except Exception as e:
self.handle_unexpected_error(e)



def _send(self):
"""
Send the buffered logs to the Loki server.
"""
temp_streams = {}
self._current_stream_size = 0

while not self.buffer.empty():
log = self.buffer.get()
if log.key not in temp_streams:
stream = Stream(log.labels, self.loki_metadata,
self.message_in_json_format)
stream = Stream(
log.labels, self.loki_metadata, self.message_in_json_format
)
temp_streams[log.key] = stream

temp_streams[log.key].append_value(log.line, log.loki_metadata)
Expand All @@ -146,8 +155,7 @@ def _send(self):
try:
self.request.send(streams.serialize())
except requests.RequestException as e:
self.handle_unexpected_error(e)

self.handle_unexpected_error(e)

def write(self, message):
"""
Expand Down Expand Up @@ -194,6 +202,18 @@ def assign_labels_from_log(self, log_record, labels):
if key in log_record:
labels[key] = log_record[key]

log_line = LogLine(labels, log_record)
log_size = log_line.size()
self.buffer.put(log_line)
self._current_stream_size += log_size

if (
self.max_stream_size > 0
and self._current_stream_size >= self.max_stream_size
):
# Set flush_event to wake the flush thread immediately instead of waiting for the timeout
self.flush_event.set()

def extract_and_clean_metadata(self, log_record, log_loki_metadata):
"""
This method iterates over the keys defined in `self.loki_metadata_keys` and checks if they are present
Expand All @@ -217,7 +237,6 @@ def extract_and_clean_metadata(self, log_record, log_loki_metadata):
for key in keys_to_delete:
del log_record[key]


def handle_unexpected_error(self, e):
"""
Handles unexpected errors by logging them and setting the error flag.
Expand All @@ -229,18 +248,18 @@ def handle_unexpected_error(self, e):
None
"""
if self.enable_self_errors:
self.debug_logger.error(
"Unexpected error: %s", e, exc_info=True)
self.debug_logger.error("Unexpected error: %s", e, exc_info=True)
self.error = True


class LogLine:
"""
Represents a single log line with associated labels.

Attributes:
labels (dict): Labels associated with the log line.
key (str): A unique key generated from the labels.
line (str): The actual log line content.
line (dict|str): The actual log line content.
"""

def __init__(self, labels, line, loki_metadata=None):
Expand All @@ -249,13 +268,32 @@ def __init__(self, labels, line, loki_metadata=None):

Args:
labels (dict): Labels associated with the log line.
line (str): The actual log line content.
line (dict|str): The actual log line content.
"""
self.labels = labels
self.key = self._key_from_labels(labels)
self.line = line
self.loki_metadata = loki_metadata

def size(self):
    """
    Return the approximate byte size of this log line.

    The estimate is the UTF-8 length of the JSON-encoded labels plus
    the UTF-8 length of the line content: encoded directly when the
    line is a string, JSON-serialized when it is a dict, and counted
    as zero for any other type.

    Returns:
        int: Approximate size of the log line in bytes.
    """
    def _json_byte_len(value):
        # UTF-8 byte length of the JSON serialization of *value*.
        return len(json.dumps(value).encode("utf-8"))

    total = _json_byte_len(self.labels)
    if isinstance(self.line, str):
        total += len(self.line.encode("utf-8"))
    elif isinstance(self.line, dict):
        total += _json_byte_len(self.line)
    return total

@staticmethod
def _key_from_labels(labels):
"""
Expand Down