Skip to content

Commit

Permalink
Merge pull request syslog-ng#4356 from alltilla/disk-buffer-disk-usag…
Browse files Browse the repository at this point in the history
…e-metrics

disk-buffer: add disk-buffer related metrics
  • Loading branch information
MrAnno authored Mar 31, 2023
2 parents 206b253 + bc30bcd commit c7ca17a
Show file tree
Hide file tree
Showing 9 changed files with 426 additions and 7 deletions.
17 changes: 12 additions & 5 deletions modules/diskq/logqueue-disk-non-reliable.c
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ _move_messages_from_overflow(LogQueueDiskNonReliable *self)
{
if (_serialize_and_write_message_to_disk(self, msg))
{
log_queue_disk_update_disk_related_counters(&self->super);
log_queue_memory_usage_sub(&self->super.super, log_msg_get_size(msg));
log_msg_ack(msg, &path_options, AT_PROCESSED);
log_msg_unref(msg);
Expand Down Expand Up @@ -169,6 +170,7 @@ _move_messages_from_disk_to_qout(LogQueueDiskNonReliable *self)
g_queue_push_tail(self->qout, msg);
g_queue_push_tail(self->qout, LOG_PATH_OPTIONS_TO_POINTER(&path_options));
log_queue_memory_usage_add(&self->super.super, log_msg_get_size(msg));
log_queue_disk_update_disk_related_counters(&self->super);
}
while (HAS_SPACE_IN_QUEUE(self->qout));

Expand Down Expand Up @@ -308,6 +310,7 @@ _pop_head(LogQueue *s, LogPathOptions *path_options)
stats_update = FALSE;
}

log_queue_disk_update_disk_related_counters(&self->super);
g_mutex_unlock(&s->lock);

if (s->use_backlog)
Expand Down Expand Up @@ -389,12 +392,16 @@ static inline gboolean
_push_tail_disk(LogQueueDiskNonReliable *self, LogMessage *msg, const LogPathOptions *path_options,
GString *serialized_msg)
{
if (!_ensure_serialized_and_write_to_disk(self, msg, serialized_msg))
return FALSE;
gboolean result = _ensure_serialized_and_write_to_disk(self, msg, serialized_msg);
if (result)
{
log_msg_ack(msg, path_options, AT_PROCESSED);
log_msg_unref(msg);
}

log_msg_ack(msg, path_options, AT_PROCESSED);
log_msg_unref(msg);
return TRUE;
log_queue_disk_update_disk_related_counters(&self->super);

return result;
}

static void
Expand Down
4 changes: 4 additions & 0 deletions modules/diskq/logqueue-disk-reliable.c
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ _ack_backlog(LogQueue *s, gint num_msg_to_ack)
}

qdisk_ack_backlog(self->super.qdisk);
log_queue_disk_update_disk_related_counters(&self->super);
}
exit_reliable:
qdisk_reset_file_if_empty(self->super.qdisk);
Expand Down Expand Up @@ -280,6 +281,7 @@ _pop_head(LogQueue *s, LogPathOptions *path_options)
if (!s->use_backlog)
qdisk_empty_backlog(self->super.qdisk);

log_queue_disk_update_disk_related_counters(&self->super);
log_queue_queued_messages_dec(s);

if (qdisk_corrupt)
Expand Down Expand Up @@ -346,6 +348,8 @@ _push_tail(LogQueue *s, LogMessage *msg, const LogPathOptions *path_options)
return;
}

log_queue_disk_update_disk_related_counters(&self->super);

scratch_buffers_reclaim_marked(marker);

if (_is_reserved_buffer_size_reached(self))
Expand Down
83 changes: 83 additions & 0 deletions modules/diskq/logqueue-disk.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "serialize.h"
#include "logmsg/logmsg-serialize.h"
#include "stats/stats-registry.h"
#include "stats/stats-cluster-single.h"
#include "reloc.h"
#include "qdisk.h"
#include "scratch-buffers.h"
Expand All @@ -41,6 +42,8 @@
#include <string.h>
#include <stdlib.h>

#define B_TO_KiB(x) ((x) / 1024)

QueueType log_queue_disk_type = "DISK";

gboolean
Expand Down Expand Up @@ -71,6 +74,8 @@ log_queue_disk_start(LogQueue *s)
if (self->start(self))
{
log_queue_queued_messages_add(s, log_queue_get_length(s));
log_queue_disk_update_disk_related_counters(self);
stats_counter_set(self->metrics.capacity, B_TO_KiB(qdisk_get_max_useful_space(self->qdisk)));
return TRUE;
}

Expand All @@ -84,15 +89,56 @@ log_queue_disk_get_filename(LogQueue *s)
return qdisk_get_filename(self->qdisk);
}

static void
_unregister_counters(LogQueueDisk *self)
{
stats_lock();
{
if (self->metrics.capacity_sc_key)
{
stats_unregister_counter(self->metrics.capacity_sc_key, SC_TYPE_SINGLE_VALUE,
&self->metrics.capacity);

stats_cluster_key_free(self->metrics.capacity_sc_key);
}

if (self->metrics.disk_usage_sc_key)
{
stats_unregister_counter(self->metrics.disk_usage_sc_key, SC_TYPE_SINGLE_VALUE,
&self->metrics.disk_usage);

stats_cluster_key_free(self->metrics.disk_usage_sc_key);
}

if (self->metrics.disk_allocated_sc_key)
{
stats_unregister_counter(self->metrics.disk_allocated_sc_key, SC_TYPE_SINGLE_VALUE,
&self->metrics.disk_allocated);

stats_cluster_key_free(self->metrics.disk_allocated_sc_key);
}
}
stats_unlock();
}

void
log_queue_disk_free_method(LogQueueDisk *self)
{
g_assert(!qdisk_started(self->qdisk));
qdisk_free(self->qdisk);

_unregister_counters(self);

log_queue_free_method(&self->super);
}

void
log_queue_disk_update_disk_related_counters(LogQueueDisk *self)
{
stats_counter_set(self->metrics.disk_usage, B_TO_KiB(qdisk_get_used_useful_space(self->qdisk)));
stats_counter_set(self->metrics.disk_allocated, B_TO_KiB(qdisk_get_file_size(self->qdisk)));
}

static gboolean
_pop_disk(LogQueueDisk *self, LogMessage **msg)
{
Expand Down Expand Up @@ -227,6 +273,42 @@ log_queue_disk_restart_corrupted(LogQueueDisk *self)
{
_restart_diskq(self);
log_queue_queued_messages_reset(&self->super);
log_queue_disk_update_disk_related_counters(self);
stats_counter_set(self->metrics.capacity, B_TO_KiB(qdisk_get_max_useful_space(self->qdisk)));
}

static void
_register_counters(LogQueueDisk *self, StatsClusterKeyBuilder *builder)
{
if (!builder)
return;

StatsClusterKeyBuilder *local_builder = stats_cluster_key_builder_clone(builder);

/* Up to 4 TiB with 32 bit atomic counters. */
stats_cluster_key_builder_set_unit(local_builder, SCU_KIB);

stats_cluster_key_builder_set_name(local_builder, "capacity_bytes");
self->metrics.capacity_sc_key = stats_cluster_key_builder_build_single(local_builder);

stats_cluster_key_builder_set_name(local_builder, "disk_usage_bytes");
self->metrics.disk_usage_sc_key = stats_cluster_key_builder_build_single(local_builder);

stats_cluster_key_builder_set_name(local_builder, "disk_allocated_bytes");
self->metrics.disk_allocated_sc_key = stats_cluster_key_builder_build_single(local_builder);

stats_lock();
{
stats_register_counter(STATS_LEVEL1, self->metrics.capacity_sc_key, SC_TYPE_SINGLE_VALUE,
&self->metrics.capacity);
stats_register_counter(STATS_LEVEL1, self->metrics.disk_usage_sc_key, SC_TYPE_SINGLE_VALUE,
&self->metrics.disk_usage);
stats_register_counter(STATS_LEVEL1, self->metrics.disk_allocated_sc_key, SC_TYPE_SINGLE_VALUE,
&self->metrics.disk_allocated);
}
stats_unlock();

stats_cluster_key_builder_free(local_builder);
}

void
Expand All @@ -249,6 +331,7 @@ log_queue_disk_init_instance(LogQueueDisk *self, DiskQueueOptions *options, cons
self->compaction = options->compaction;

self->qdisk = qdisk_new(options, qdisk_file_id, filename);
_register_counters(self, queue_sck_builder);
}

static gboolean
Expand Down
14 changes: 13 additions & 1 deletion modules/diskq/logqueue-disk.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,18 @@ struct _LogQueueDisk
* LogQueueDisk should have a separate options class, which should only contain compaction, reliable, etc...
* Similarly, QDisk should have a separate options class, which should only contain disk_buf_size, mem_buf_size, etc...
*/

struct
{
StatsClusterKey *capacity_sc_key;
StatsClusterKey *disk_usage_sc_key;
StatsClusterKey *disk_allocated_sc_key;

StatsCounterItem *capacity;
StatsCounterItem *disk_usage;
StatsCounterItem *disk_allocated;
} metrics;

gboolean compaction;
gboolean (*start)(LogQueueDisk *s);
gboolean (*stop)(LogQueueDisk *s, gboolean *persistent);
Expand All @@ -57,7 +69,7 @@ void log_queue_disk_init_instance(LogQueueDisk *self, DiskQueueOptions *options,
void log_queue_disk_restart_corrupted(LogQueueDisk *self);
void log_queue_disk_free_method(LogQueueDisk *self);


void log_queue_disk_update_disk_related_counters(LogQueueDisk *self);
LogMessage *log_queue_disk_read_message(LogQueueDisk *self, LogPathOptions *path_options);
void log_queue_disk_drop_message(LogQueueDisk *self, LogMessage *msg, const LogPathOptions *path_options);
gboolean log_queue_disk_serialize_msg(LogQueueDisk *self, LogMessage *msg, GString *serialized);
Expand Down
1 change: 1 addition & 0 deletions modules/diskq/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@ add_unit_test(CRITERION LIBTEST TARGET test_reliable_backlog DEPENDS disk-buffer
add_unit_test(CRITERION LIBTEST TARGET test_diskq_truncate DEPENDS m disk-buffer)
add_unit_test(CRITERION LIBTEST TARGET test_qdisk DEPENDS disk-buffer)
add_unit_test(CRITERION LIBTEST TARGET test_logqueue_disk DEPENDS disk-buffer)
add_unit_test(CRITERION LIBTEST TARGET test_diskq_counters DEPENDS disk-buffer)
12 changes: 11 additions & 1 deletion modules/diskq/tests/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ modules_diskq_tests_TESTS = \
modules/diskq/tests/test_diskq_truncate \
modules/diskq/tests/test_reliable_backlog \
modules/diskq/tests/test_qdisk \
modules/diskq/tests/test_logqueue_disk
modules/diskq/tests/test_logqueue_disk \
modules/diskq/tests/test_diskq_counters

check_PROGRAMS += ${modules_diskq_tests_TESTS}

Expand Down Expand Up @@ -67,3 +68,12 @@ modules_diskq_tests_test_logqueue_disk_DEPENDENCIES = \
modules_diskq_tests_test_logqueue_disk_SOURCES = \
modules/diskq/tests/test_logqueue_disk.c \
modules/diskq/tests/test_diskq_tools.h

modules_diskq_tests_test_diskq_counters_CFLAGS = $(DISKQ_TEST_C_FLAGS)
modules_diskq_tests_test_diskq_counters_LDFLAGS = $(DISKQ_TEST_LD_FLAGS)
modules_diskq_tests_test_diskq_counters_LDADD = $(DISKQ_TEST_LD_ADD)
modules_diskq_tests_test_diskq_counters_DEPENDENCIES = \
$(top_builddir)/modules/diskq/libdisk-buffer.la
modules_diskq_tests_test_diskq_counters_SOURCES = \
modules/diskq/tests/test_diskq_counters.c \
modules/diskq/tests/test_diskq_tools.h
Loading

0 comments on commit c7ca17a

Please sign in to comment.