Skip to content

Commit

Permalink
updated sampling daemon
Browse files Browse the repository at this point in the history
  • Loading branch information
solo2abera committed Nov 13, 2024
1 parent e11b7b4 commit 8b1d84d
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 104 deletions.
93 changes: 46 additions & 47 deletions xprof/xprof.rb.in
Original file line number Diff line number Diff line change
Expand Up @@ -739,46 +739,52 @@ def gm_rename_folder
thapi_trace_dir_root
end

SIGRTMIN = 34
SIG_SAMPLING_READY = SIGRTMIN
SIG_SAMPLING_FINISH = SIGRTMIN + 1
class SamplingDaemon
SIGRTMIN = 34
SIG_SAMPLING_READY = SIGRTMIN
SIG_SAMPLING_FINISH = SIGRTMIN + 1

def start_sampling_daemon(parent_pid)
puts "Starting sampling daemon for parent process PID #{parent_pid}"
sampling_daemon_pid = spawn("/home/sbekele/sampling_daemon/bin/sampling_daemon #{parent_pid}")
Process.detach(sampling_daemon_pid)
sampling_daemon_pid
end
attr_reader :pid

# Wait for the READY signal from the sampling daemon
def wait_for_ready_signal
received_ready = false
Signal.trap(SIG_SAMPLING_READY) do
puts "Received READY signal from sampling daemon"
received_ready = true
def initialize
@pid = nil
end
sleep(0.1) while !received_ready # Wait loop until READY signal is received
end

# Send the FINISH signal to terminate the sampling daemon
def send_finish_signal(sampling_daemon_pid)
Process.kill(SIG_SAMPLING_FINISH, sampling_daemon_pid)
puts "Sent FINISH signal to sampling daemon PID #{sampling_daemon_pid}"
# Launch the sampling daemon for +parent_pid+ and block until it reports READY.
#
# Does nothing when sampling is disabled.
#
# @param parent_pid [Integer] PID handed to the daemon on its command line
# @return [nil]
# @raise [RuntimeError] if the daemon binary is missing next to this file,
#   or if the daemon exits before signaling READY
def start(parent_pid)
  return unless sampling?

  daemon_path = File.join(__dir__, 'sampling_daemon')
  raise "No sampling_daemon binary found at #{daemon_path}" unless File.exist?(daemon_path)

  # Install the READY handler BEFORE spawning. An unhandled real-time signal
  # terminates the receiving process by default, so a daemon that signals
  # quickly must never find us without a handler in place.
  @ready = false
  Signal.trap(SIG_SAMPLING_READY) { @ready = true }

  # Argument-vector form: no shell is involved, so a path containing spaces
  # or shell metacharacters is passed through untouched.
  @pid = spawn(daemon_path, parent_pid.to_s)
  # Keep the reaper thread: it stops being alive once the daemon has exited.
  @watcher = Process.detach(@pid)

  wait_for_ready_signal
end

# Ask the sampling daemon to stop by sending it the FINISH signal.
#
# No-op when no daemon was started. A daemon that has already exited is
# tolerated (logged at debug level) instead of aborting the teardown.
def finalize
  return unless @pid

  Process.kill(SIG_SAMPLING_FINISH, @pid)
  LOGGER.debug("Sent FINISH signal to sampling daemon PID #{@pid}")
rescue Errno::ESRCH
  LOGGER.debug("Sampling daemon PID #{@pid} already exited; FINISH signal not sent")
end

private

# Poll until the READY handler installed by #start has fired.
#
# @return [nil]
# @raise [RuntimeError] if the daemon exits before signaling READY
#   (otherwise this loop would spin forever)
def wait_for_ready_signal
  until @ready
    unless @watcher&.alive?
      # The signal may have been handled just before the reaper finished.
      break if @ready

      exited_pid = @pid
      @pid = nil # nothing left for #finalize to signal
      raise "sampling_daemon (PID #{exited_pid}) exited before signaling READY"
    end
    sleep(0.1)
  end
end
end


def trace_and_on_node_processing(usr_argv)
def teardown_lttng(syncd, pids, sampling_daemon_pid = nil)
# We need to be sure that all the local ranks are finished
def teardown_lttng(syncd, sampling_daemon, pids)
syncd.local_barrier('waiting_for_application_ending')

# Everything from now on, is some local-master processing
# The `Sync_daemon` context will handle the call to the global barrier
# for the early exiting ranks
return unless mpi_local_master?

#stop_sampling_daemon(sampling_daemon_pid) if sampling_daemon_pid
# Stop Lttng session and babeltrace daemons
lm_lttng_teardown_session
if OPTIONS[:archive]
LOGGER.debug("Waiting for babeltrace_thapi and dirwatch (#{pids}) to finish")
Expand All @@ -787,47 +793,40 @@ def trace_and_on_node_processing(usr_argv)
XprofExitCode.update(status, "babeltrace_thapi or dirwatch #{pid}")
end
end
# we can kill the session daemon
lm_lttng_kill_sessiond
sampling_daemon.finalize if sampling_daemon
end

SyncDaemon.open do |syncd|
# Load Tracers and APILoaders Lib
sampling_daemon = nil
sampling_daemon = SamplingDaemon.new if sampling?
sampling_daemon&.start(Process.pid)

backends, h = env_tracers

# All ranks need to set the LTTNG_HOME env
# so they can have access to the daemon
ENV['LTTNG_HOME'] = lttng_home_dir
# Only the local master spawns the LTTNG daemon and starts the session
pids = if mpi_local_master?
lm_setup_lttng(backends)
lm_babeltrace(backends) if OPTIONS[:archive]
end
end

syncd.local_barrier('waiting_for_lttng_setup')

if sampling?
sampling_daemon_pid = start_sampling_daemon(Process.pid)
puts "Started sampling daemon with PID #{sampling_daemon_pid}"
wait_for_ready_signal
end
# Launch User Command
begin
XprofExitCode.update(launch_usr_bin(h, usr_argv), usr_argv.join(' '))
rescue Errno::ENOENT
teardown_lttng(syncd, pids)
teardown_lttng(syncd, sampling_daemon, pids)
raise
end
send_finish_signal(sampling_daemon_pid) if sampling_daemon_pid
teardown_lttng(syncd, pids, sampling_daemon_pid)

teardown_lttng(syncd, sampling_daemon, pids)
return unless mpi_local_master?

# Preprocess trace
lm_babeltrace(backends) unless OPTIONS[:archive]
lm_move_to_shared
end
# The global master renames the unique trace folder to a more
# human-friendly name

gm_rename_folder if mpi_master?
end

Expand Down
91 changes: 34 additions & 57 deletions ze/sampling_daemon.c
Original file line number Diff line number Diff line change
@@ -1,37 +1,20 @@
#include "sampling_daemon.h"
#include "../sampling/thapi_sampling.h"
#include "uthash.h"
#include "utlist.h"
#include "ze.h.include"
#include "ze_build.h"
#include "ze_profiling.h"
#include "ze_properties.h"
#include "ze_sampling.h"
#include "ze_structs_tracepoints.h"
#include "ze_tracepoints.h"
#include "zel_structs_tracepoints.h"
#include "zel_tracepoints.h"
#include "zes_structs_tracepoints.h"
#include "zes_tracepoints.h"
#include "zet_structs_tracepoints.h"
#include "zet_tracepoints.h"
#include "zex_structs_tracepoints.h"
#include "zex_tracepoints.h"
#include <dlfcn.h>
#include <errno.h>
#include <ffi.h>
#include <pthread.h>
#include <signal.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/mman.h>
#include <unistd.h>
#include <signal.h>
#include <unistd.h>
#include <stdbool.h>

#define SIG_SAMPLING_READY SIGRTMIN
#define SIG_SAMPLING_READY (SIGRTMIN)
#define SIG_SAMPLING_FINISH (SIGRTMIN + 1)

#define ZES_INIT_PTR zesInit_ptr
Expand Down Expand Up @@ -261,7 +244,7 @@ static void find_ze_symbols(void *handle, int verbose) {
if (!ZES_MEMORY_GET_BANDWIDTH_PTR && verbose)
fprintf(stderr, "Missing symbol zesMemoryGetBandwidth!\n");
}

volatile bool running = true;
thapi_sampling_handle_t _sampling_handle = NULL;
static int _sampling_freq_initialized = 0;
static int _sampling_fabricPorts_initialized = 0;
Expand All @@ -285,22 +268,21 @@ static uint32_t **_sampling_powerDomainCounts = NULL;
static uint32_t **_sampling_engineCounts = NULL;

////////////////////////////////////////////
/*
 * Diagnostic helpers. Every macro prints to stderr and is wrapped in
 * do { ... } while (0) so it behaves as a single statement (safe inside an
 * un-braced if/else).
 */

/* Report a failed Level Zero call NAME with result code RES. */
#define _ZE_ERROR_MSG(NAME, RES)                                                           \
  do {                                                                                     \
    fprintf(stderr, "%s() failed at %d(%s): res=%x\n", (NAME), __LINE__, __FILE__, (RES)); \
  } while (0)

/* Same as _ZE_ERROR_MSG, worded as a non-fatal error. */
#define _ZE_ERROR_MSG_NOTERMINATE(NAME, RES)                                              \
  do {                                                                                    \
    fprintf(stderr, "%s() error at %d(%s): res=%x\n", (NAME), __LINE__, __FILE__, (RES)); \
  } while (0)

/*
 * Report MSG together with the current errno. errno is captured first
 * because perror() itself may modify it before the second line is printed.
 */
#define _ERROR_MSG(MSG)                                                            \
  do {                                                                             \
    int _saved_errno = errno;                                                      \
    perror((MSG));                                                                 \
    fprintf(stderr, "errno=%d at %d(%s)\n", _saved_errno, __LINE__, __FILE__);     \
  } while (0)

/* Print a usage line: "Usage: <ARGV0> <MSG>". */
#define _USAGE_MSG(MSG, ARGV0)                              \
  do {                                                      \
    fprintf(stderr, "Usage: %s %s\n", (ARGV0), (MSG));      \
  } while (0)

/*
 * Report the most recent dynamic-loader error. dlerror() returns NULL when
 * no error is pending, and passing NULL to %s is undefined behavior, so a
 * fallback string is substituted.
 */
#define _DL_ERROR_MSG()                                                          \
  do {                                                                           \
    const char *_dl_msg = dlerror();                                             \
    fprintf(stderr, "dlopen error: %s at %d(%s)\n",                              \
            _dl_msg ? _dl_msg : "unknown error", __LINE__, __FILE__);            \
  } while (0)

static void intializeFrequency() {
ze_result_t res;
Expand Down Expand Up @@ -753,7 +735,6 @@ static void thapi_sampling_energy() {
}
}
}
volatile bool running = true;
void process_sampling() {

struct timespec interval;
Expand All @@ -771,24 +752,24 @@ void cleanup_sampling() {

/*
 * Signal handler for SIG_SAMPLING_FINISH: runs cleanup_sampling() and clears
 * the `running` flag so the sampling loop in main() terminates.
 *
 * No stdio is performed here: printf() is not async-signal-safe and can
 * deadlock or corrupt the stream if the signal interrupts another stdio call.
 *
 * NOTE(review): cleanup_sampling() is defined outside this view; confirm it
 * is async-signal-safe, or move the call to after the sampling loop in main().
 */
void signal_handler(int signum) {
  if (signum == SIG_SAMPLING_FINISH) {
    cleanup_sampling();
    running = false;
  }
}

int main(int argc, char **argv) {

fprintf(stderr, "Entering Main.\n");
if (argc < 2) {
fprintf(stderr, "Usage: %s <parent_pid>\n", argv[0]);
_USAGE_MSG("<parent_pid>", argv[0]);
return 1;
}
int parent_pid = atoi(argv[1]);
if (parent_pid <= 0) {
_ERROR_MSG("Invalid or missing parent PID. A positive integer is required.");
return 1;
}
int verbose = 0;
fprintf(stderr, "Thapi sampling init.\n");
thapi_sampling_init();
thapi_sampling_init();// Initialize sampling

// Load necessary libraries
void *handle = NULL;
Expand All @@ -800,27 +781,23 @@ int main(int argc, char **argv) {
}

if (!handle) {
fprintf(stderr, "Failure: could not load ze library!\n");
_DL_ERROR_MSG();
return 1;
}

// Initialize daemon
//Find zes symbols
find_ze_symbols(handle, verbose);
fprintf(stderr, "Initialize the system.\n");
//Initialize device and telemetry handles
initializeHandles();
fprintf(stderr, "Daemon initialized and entering signal loop.\n");
// Run the signal loop
signal(SIG_SAMPLING_FINISH, signal_handler);
if (parent_pid > 0) {
kill(parent_pid, SIG_SAMPLING_READY);
fprintf(stderr, "Daemon sent READY signal to parent PID %d\n", parent_pid);

if (kill(parent_pid, SIG_SAMPLING_READY) != 0) {
_ERROR_MSG("Failed to send READY signal to parent");
}
fprintf(stderr, "Daemon waiting for signals in signal_loop.\n");
// Clean up before exiting
// Process_sampling loop until SIG_SAMPLING_FINISH signal
while (running) {
process_sampling(); // Wait for a signal to be received
process_sampling();
}
dlclose(handle);
printf("Daemon exiting \n");
return 0;
}

0 comments on commit 8b1d84d

Please sign in to comment.