Skip to content

Commit

Permalink
Workers utilization charts (netdata#12807)
Browse files Browse the repository at this point in the history
* initial version of worker utilization

* working example

* without mutexes

* monitoring DBENGINE, ACLKSYNC, WEB workers

* added charts to monitor worker usage

* fixed charts units

* updated contexts

* updated priorities

* added documentation

* converted threads to stacked chart

* One query per query thread

* Revert "One query per query thread"

This reverts commit 6aeb391.

* fixed priority for web charts

* read worker cpu utilization from proc

* read workers cpu utilization via /proc/self/task/PID/stat, so that we have cpu utilization even when the jobs are too long to finish within our update_every frequency

* disabled web server cpu utilization monitoring - it is now monitored by worker utilization

* tight integration of worker utilization to web server

* monitoring statsd worker threads

* code cleanup and renaming of variables

* constrained worker and statistics conflict to just one variable

* support for rendering jobs per type

* better priorities and removed the total jobs chart

* added busy time in ms per job type

* added proc.plugin monitoring, switch clock to MONOTONIC_RAW if available, global statistics now cleans up old worker threads

* isolated worker thread families

* added cgroups.plugin workers

* remove unneeded dimensions when the expected worker is just one

* plugins.d and streaming monitoring

* rebased; support worker_is_busy() to be called one after another

* added diskspace plugin monitoring

* added tc.plugin monitoring

* added ML threads monitoring

* don't create dimensions and charts that are not needed

* fix crash when job types are added on the fly

* added timex and idlejitter plugins; collected heartbeat statistics; reworked heartbeat according to POSIX

* the right name is heartbeat for this chart

* monitor streaming senders

* added streaming senders to global stats

* prevent division by zero

* added clock_init() to external C plugins

* added freebsd and macos plugins

* added freebsd and macos to global statistics

* don't use `new` as a variable; address compiler warnings on FreeBSD and MacOS

* refactored contexts to be unique; added health threads monitoring

Co-authored-by: Stelios Fragkakis <[email protected]>
  • Loading branch information
ktsaou and stelfrag authored May 9, 2022
1 parent 0b3ee50 commit eb216a1
Show file tree
Hide file tree
Showing 42 changed files with 2,070 additions and 1,096 deletions.
2 changes: 2 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,8 @@ set(LIBNETDATA_FILES
libnetdata/string/utf8.h
libnetdata/socket/security.c
libnetdata/socket/security.h
libnetdata/worker_utilization/worker_utilization.c
libnetdata/worker_utilization/worker_utilization.h
libnetdata/circular_buffer/circular_buffer.c
libnetdata/circular_buffer/circular_buffer.h)

Expand Down
2 changes: 2 additions & 0 deletions Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,8 @@ LIBNETDATA_FILES = \
libnetdata/health/health.c \
libnetdata/health/health.h \
libnetdata/string/utf8.h \
libnetdata/worker_utilization/worker_utilization.c \
libnetdata/worker_utilization/worker_utilization.h \
$(NULL)

if ENABLE_PLUGIN_EBPF
Expand Down
16 changes: 16 additions & 0 deletions aclk/aclk_query.c
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,8 @@ static void aclk_query_process_msg(struct aclk_query_thread *query_thr, aclk_que
{
for (int i = 0; aclk_query_handlers[i].type != UNKNOWN; i++) {
if (aclk_query_handlers[i].type == query->type) {
worker_is_busy(i);

debug(D_ACLK, "Processing Queued Message of type: \"%s\"", aclk_query_handlers[i].name);
aclk_query_handlers[i].fnc(query_thr, query);
if (aclk_stats_enabled) {
Expand All @@ -361,6 +363,8 @@ static void aclk_query_process_msg(struct aclk_query_thread *query_thr, aclk_que
ACLK_STATS_UNLOCK;
}
aclk_query_free(query);

worker_is_idle();
return;
}
}
Expand All @@ -378,21 +382,33 @@ int aclk_query_process_msgs(struct aclk_query_thread *query_thr)
return 0;
}

/**
 * Register the calling thread as an "ACLKQUERY" worker and give every entry of
 * aclk_query_handlers[] a job name.
 *
 * The job id is the handler's index in aclk_query_handlers[], which is the same
 * index aclk_query_process_msg() passes to worker_is_busy(). The table is walked
 * until its UNKNOWN terminator entry.
 */
static void worker_aclk_register(void) {
    worker_register("ACLKQUERY");

    int job_id = 0;
    while (aclk_query_handlers[job_id].type != UNKNOWN) {
        worker_register_job_name(job_id, aclk_query_handlers[job_id].name);
        job_id++;
    }
}

/**
 * Main query processing thread
 *
 * Registers the thread with worker utilization, then keeps processing queued
 * messages until netdata_exit is set. After each processing pass the thread
 * calls worker_is_idle() and blocks on query_cond_wait; if the wait itself
 * fails, it sleeps for one second before retrying. The worker is unregistered
 * before the thread returns.
 *
 * @param ptr the struct aclk_query_thread this thread operates on
 * @return always NULL
 */
void *aclk_query_main_thread(void *ptr)
{
    struct aclk_query_thread *query_thr = ptr;

    worker_aclk_register();

    for (;;) {
        if (netdata_exit)
            break;

        aclk_query_process_msgs(query_thr);

        // nothing more to process in this pass: flag the worker idle before blocking
        worker_is_idle();

        QUERY_THREAD_LOCK;
        int wait_failed = pthread_cond_wait(&query_cond_wait, &query_lock_wait);
        if (unlikely(wait_failed))
            sleep_usec(USEC_PER_SEC * 1);
        QUERY_THREAD_UNLOCK;
    }

    worker_unregister();
    return NULL;
}

Expand Down
4 changes: 1 addition & 3 deletions collectors/all.h
Original file line number Diff line number Diff line change
Expand Up @@ -360,10 +360,8 @@

#define NETDATA_CHART_PRIO_CHECKS 99999

// NOTE(review): NETDATA_CHART_PRIO_NETDATA_TC_TIME is defined twice below with
// different values (135001 and 1000100). The hunk header reports 10 lines
// becoming 8 (1 addition, 3 deletions), so this looks like the removed and the
// added side of the diff shown without +/- markers - confirm which of these
// definitions actually remain in the file.
#define NETDATA_CHART_PRIO_NETDATA_DISKSPACE 132020
#define NETDATA_CHART_PRIO_NETDATA_TIMEX 132030
#define NETDATA_CHART_PRIO_NETDATA_TC_CPU 135000
#define NETDATA_CHART_PRIO_NETDATA_TC_TIME 135001
#define NETDATA_CHART_PRIO_NETDATA_TC_TIME 1000100


#endif //NETDATA_ALL_H
2 changes: 2 additions & 0 deletions collectors/apps.plugin/apps_plugin.c
Original file line number Diff line number Diff line change
Expand Up @@ -4124,6 +4124,8 @@ static int check_capabilities() {
int main(int argc, char **argv) {
// debug_flags = D_PROCFILE;

clocks_init();

pagesize = (size_t)sysconf(_SC_PAGESIZE);

// set the name for logging
Expand Down
102 changes: 62 additions & 40 deletions collectors/cgroups.plugin/sys_fs_cgroup.c
Original file line number Diff line number Diff line change
Expand Up @@ -2646,11 +2646,26 @@ static inline void discovery_process_cgroup(struct cgroup *cg) {
read_cgroup_network_interfaces(cg);
}

// Job type ids the cgroups discovery thread passes to worker_is_busy() in
// discovery_find_all_cgroups(); cgroup_discovery_worker() gives each id a
// display name with worker_register_job_name().
#define WORKER_DISCOVERY_INIT 0 // discovery_mark_all_cgroups_as_unavailable()
#define WORKER_DISCOVERY_FIND 1 // searching for cgroups (discovery_find_all_cgroups_v1() or the unified variant)
#define WORKER_DISCOVERY_PROCESS 2 // discovery_process_cgroup(), set once per discovered cgroup
#define WORKER_DISCOVERY_UPDATE 3 // discovery_update_filenames()
#define WORKER_DISCOVERY_CLEANUP 4 // discovery_cleanup_all_cgroups()
#define WORKER_DISCOVERY_COPY 5 // discovery_copy_discovered_cgroups_to_reader()
#define WORKER_DISCOVERY_SHARE 6 // discovery_share_cgroups_with_ebpf()
#define WORKER_DISCOVERY_LOCK 7 // acquiring cgroup_root_mutex

// Fail the build if the worker utilization limit cannot hold the 8 ids (0-7) above.
#if WORKER_UTILIZATION_MAX_JOB_TYPES < 8
#error WORKER_UTILIZATION_MAX_JOB_TYPES has to be at least 8
#endif

static inline void discovery_find_all_cgroups() {
debug(D_CGROUP, "searching for cgroups");

worker_is_busy(WORKER_DISCOVERY_INIT);
discovery_mark_all_cgroups_as_unavailable();

worker_is_busy(WORKER_DISCOVERY_FIND);
if (!cgroup_use_unified_cgroups) {
discovery_find_all_cgroups_v1();
} else {
Expand All @@ -2659,16 +2674,25 @@ static inline void discovery_find_all_cgroups() {

struct cgroup *cg;
for (cg = discovered_cgroup_root; cg; cg = cg->discovered_next) {
worker_is_busy(WORKER_DISCOVERY_PROCESS);
discovery_process_cgroup(cg);
}

worker_is_busy(WORKER_DISCOVERY_UPDATE);
discovery_update_filenames();

worker_is_busy(WORKER_DISCOVERY_LOCK);
uv_mutex_lock(&cgroup_root_mutex);

worker_is_busy(WORKER_DISCOVERY_CLEANUP);
discovery_cleanup_all_cgroups();

worker_is_busy(WORKER_DISCOVERY_COPY);
discovery_copy_discovered_cgroups_to_reader();

uv_mutex_unlock(&cgroup_root_mutex);

worker_is_busy(WORKER_DISCOVERY_SHARE);
discovery_share_cgroups_with_ebpf();

debug(D_CGROUP, "done searching for cgroups");
Expand All @@ -2678,7 +2702,19 @@ void cgroup_discovery_worker(void *ptr)
{
UNUSED(ptr);

worker_register("CGROUPSDISC");
worker_register_job_name(WORKER_DISCOVERY_INIT, "init");
worker_register_job_name(WORKER_DISCOVERY_FIND, "find");
worker_register_job_name(WORKER_DISCOVERY_PROCESS, "process");
worker_register_job_name(WORKER_DISCOVERY_UPDATE, "update");
worker_register_job_name(WORKER_DISCOVERY_CLEANUP, "cleanup");
worker_register_job_name(WORKER_DISCOVERY_COPY, "copy");
worker_register_job_name(WORKER_DISCOVERY_SHARE, "share");
worker_register_job_name(WORKER_DISCOVERY_LOCK, "lock");

while (!netdata_exit) {
worker_is_idle();

uv_mutex_lock(&discovery_thread.mutex);
while (!discovery_thread.start_discovery)
uv_cond_wait(&discovery_thread.cond_var, &discovery_thread.mutex);
Expand All @@ -2692,6 +2728,7 @@ void cgroup_discovery_worker(void *ptr)
}

discovery_thread.exited = 1;
worker_unregister();
}

// ----------------------------------------------------------------------------
Expand Down Expand Up @@ -4650,6 +4687,8 @@ void update_cgroup_charts(int update_every) {
// cgroups main

static void cgroup_main_cleanup(void *ptr) {
worker_unregister();

struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr;
static_thread->enabled = NETDATA_MAIN_THREAD_EXITING;

Expand Down Expand Up @@ -4687,24 +4726,30 @@ static void cgroup_main_cleanup(void *ptr) {
static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
}

// Job type ids the cgroups main thread passes to worker_is_busy() on every
// iteration of its collection loop in cgroups_main().
#define WORKER_CGROUPS_LOCK 0 // acquiring cgroup_root_mutex
#define WORKER_CGROUPS_READ 1 // read_all_discovered_cgroups()
#define WORKER_CGROUPS_CHART 2 // update_cgroup_charts()

// Fail the build if the worker utilization limit cannot hold the 3 ids (0-2) above.
#if WORKER_UTILIZATION_MAX_JOB_TYPES < 3
#error WORKER_UTILIZATION_MAX_JOB_TYPES has to be at least 3
#endif

void *cgroups_main(void *ptr) {
netdata_thread_cleanup_push(cgroup_main_cleanup, ptr);
// Register this thread as the "CGROUPS" worker and name every job type id the
// collection loop later reports through worker_is_busy().
worker_register("CGROUPS");
worker_register_job_name(WORKER_CGROUPS_LOCK, "lock");
worker_register_job_name(WORKER_CGROUPS_READ, "read");
// "chart" belongs to WORKER_CGROUPS_CHART: registering it under
// WORKER_CGROUPS_READ named id 1 twice and left id 2, the id used around
// update_cgroup_charts(), without a name.
worker_register_job_name(WORKER_CGROUPS_CHART, "chart");

struct rusage thread;
netdata_thread_cleanup_push(cgroup_main_cleanup, ptr);

if (getenv("KUBERNETES_SERVICE_HOST") != NULL && getenv("KUBERNETES_SERVICE_PORT") != NULL) {
is_inside_k8s = 1;
cgroup_enable_cpuacct_cpu_shares = CONFIG_BOOLEAN_YES;
}

// when ZERO, attempt to do it
int vdo_cpu_netdata = config_get_boolean("plugin:cgroups", "cgroups plugin resource charts", 1);

read_cgroup_plugin_configuration();
netdata_cgroup_ebpf_initialize_shm();

RRDSET *stcpu_thread = NULL;

if (uv_mutex_init(&cgroup_root_mutex)) {
error("CGROUP: cannot initialize mutex for the main cgroup list");
goto exit;
Expand Down Expand Up @@ -4736,6 +4781,8 @@ void *cgroups_main(void *ptr) {
usec_t find_every = cgroup_check_for_new_every * USEC_PER_SEC, find_dt = 0;

while(!netdata_exit) {
worker_is_idle();

usec_t hb_dt = heartbeat_next(&hb, step);
if(unlikely(netdata_exit)) break;

Expand All @@ -4747,46 +4794,21 @@ void *cgroups_main(void *ptr) {
cgroups_check = 0;
}

worker_is_busy(WORKER_CGROUPS_LOCK);
uv_mutex_lock(&cgroup_root_mutex);
read_all_discovered_cgroups(cgroup_root);
update_cgroup_charts(cgroup_update_every);
uv_mutex_unlock(&cgroup_root_mutex);

// --------------------------------------------------------------------

if(vdo_cpu_netdata) {
getrusage(RUSAGE_THREAD, &thread);

if(unlikely(!stcpu_thread)) {

stcpu_thread = rrdset_create_localhost(
"netdata"
, "plugin_cgroups_cpu"
, NULL
, "cgroups"
, NULL
, "Netdata CGroups Plugin CPU usage"
, "milliseconds/s"
, PLUGIN_CGROUPS_NAME
, "stats"
, 132000
, cgroup_update_every
, RRDSET_TYPE_STACKED
);
worker_is_busy(WORKER_CGROUPS_READ);
read_all_discovered_cgroups(cgroup_root);

rrddim_add(stcpu_thread, "user", NULL, 1, 1000, RRD_ALGORITHM_INCREMENTAL);
rrddim_add(stcpu_thread, "system", NULL, 1, 1000, RRD_ALGORITHM_INCREMENTAL);
}
else
rrdset_next(stcpu_thread);
worker_is_busy(WORKER_CGROUPS_CHART);
update_cgroup_charts(cgroup_update_every);

rrddim_set(stcpu_thread, "user" , thread.ru_utime.tv_sec * 1000000ULL + thread.ru_utime.tv_usec);
rrddim_set(stcpu_thread, "system", thread.ru_stime.tv_sec * 1000000ULL + thread.ru_stime.tv_usec);
rrdset_done(stcpu_thread);
}
worker_is_idle();
uv_mutex_unlock(&cgroup_root_mutex);
}

exit:
worker_unregister();
netdata_thread_cleanup_pop(1);
return NULL;
}
1 change: 1 addition & 0 deletions collectors/cups.plugin/cups_plugin.c
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,7 @@ void reset_metrics() {
}

int main(int argc, char **argv) {
clocks_init();

// ------------------------------------------------------------------------
// initialization of netdata plugin
Expand Down
Loading

0 comments on commit eb216a1

Please sign in to comment.