Skip to content

Commit

Permalink
Fix Daemon (and integration) (#279)
Browse files Browse the repository at this point in the history
---------

Co-authored-by: Thomas Applencourt <[email protected]>
  • Loading branch information
TApplencourt and Thomas Applencourt authored Aug 21, 2024
1 parent f7b68d5 commit f3efff8
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 74 deletions.
7 changes: 3 additions & 4 deletions integration_tests/light_iprof_only_sync.sh
Original file line number Diff line number Diff line change
Expand Up @@ -44,16 +44,15 @@ ${THAPI_BIN_DIR}/sync_daemon_${THAPI_SYNC_DAEMON} $PARENT_PID &
DAEMON_PID=$!
echo "Wait for daemon to be ready"
wait_for_signal
echo "Send Local and Global Barrier signal"
echo "Send Local Barrier signal"
send_signal_blocking $RT_SIGNAL_LOCAL_BARRIER
send_signal_blocking $RT_SIGNAL_GLOBAL_BARRIER

# Run test program
"$@"

# Final synchronization after mpi_hello_world execution
echo "Send Local and Global Barrier signal"
echo "Send Local Barrier signal"
send_signal_blocking $RT_SIGNAL_LOCAL_BARRIER
echo "Send Global Barrier signal"
send_signal_blocking $RT_SIGNAL_GLOBAL_BARRIER
echo "Send Termination signal"
send_signal_blocking $RT_SIGNAL_FINISH
Expand Down
12 changes: 9 additions & 3 deletions integration_tests/parallel_execution.bats
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,24 @@ teardown_file() {
}

@test "iprof_fs" {
THAPI_SYNC_DAEMON=fs THAPI_JOBID=0 $MPIRUN -n 2 $IPROF --debug 0 -- $THAPI_TEST_BIN
THAPI_SYNC_DAEMON=fs THAPI_JOBID=0 timeout 40s $MPIRUN -n 2 $IPROF --debug 0 -- $THAPI_TEST_BIN
}

@test "sync_daemon_fs_launching_mpi_app" {
mpicc ./integration_tests/mpi_helloworld.c -o mpi_helloworld
THAPI_SYNC_DAEMON=fs THAPI_JOBID=0 timeout 20s $MPIRUN -n 2 ./integration_tests/light_iprof_only_sync.sh ./mpi_helloworld
}


@test "sync_daemon_mpi" {
THAPI_SYNC_DAEMON=mpi THAPI_JOBID=0 timeout 20s $MPIRUN -n 2 ./integration_tests/light_iprof_only_sync.sh $THAPI_TEST_BIN
}

@test "iprof_mpi" {
THAPI_SYNC_DAEMON=mpi THAPI_JOBID=0 $MPIRUN -n 2 $IPROF --debug 0 -- $THAPI_TEST_BIN
THAPI_SYNC_DAEMON=mpi THAPI_JOBID=0 timeout 40s $MPIRUN -n 2 $IPROF --debug 0 -- $THAPI_TEST_BIN
}

@test "sync_daemon_mpi_launching_mpi_app" {
mpicc ./integration_tests/mpi_helloworld.c -o mpi_helloworld
THAPI_SYNC_DAEMON=mpi THAPI_JOBID=0 timeout 40s $MPIRUN -n 2 ./integration_tests/light_iprof_only_sync.sh ./mpi_helloworld
THAPI_SYNC_DAEMON=mpi THAPI_JOBID=0 timeout 20s $MPIRUN -n 2 ./integration_tests/light_iprof_only_sync.sh ./mpi_helloworld
}
47 changes: 16 additions & 31 deletions xprof/sync_daemon_mpi.c
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ int MPIX_Init_Session(MPI_Session *lib_shandle, MPI_Comm *lib_comm) {
if (flag == 0)
fprintf(stderr, "THAPI_SYNC_DAEMON_MPI Warning: Could not find key %s\n", mt_key);
else if (strcmp(out_value, mt_value))
fprintf(stderr, "THAPI_SYNC_DAEMON_MPI Warning: Did not get %s, got %s\n",
mt_value, out_value);
fprintf(stderr, "THAPI_SYNC_DAEMON_MPI Warning: Did not get %s, got %s\n", mt_value,
out_value);
}
/*
* create a group from the WORLD process set
Expand Down Expand Up @@ -77,52 +77,37 @@ int MPIX_Init_Session(MPI_Session *lib_shandle, MPI_Comm *lib_comm) {
}

int signal_loop(int parent_pid, MPI_Comm MPI_COMM_WORLD_THAPI, MPI_Comm MPI_COMM_NODE) {

// Required MPI info
int global_rank;
MPI_Comm_rank(MPI_COMM_WORLD_THAPI, &global_rank);
int global_size;
MPI_Comm_size(MPI_COMM_WORLD_THAPI, &global_size);
int local_size;
MPI_Comm_size(MPI_COMM_NODE, &local_size);

// Initialize signal set and add signals
sigset_t signal_set;
sigemptyset(&signal_set);
sigaddset(&signal_set, RT_SIGNAL_GLOBAL_BARRIER);
sigaddset(&signal_set, RT_SIGNAL_LOCAL_BARRIER);
sigaddset(&signal_set, RT_SIGNAL_FINISH);

sigprocmask(SIG_BLOCK, &signal_set, NULL);

// Send ready to parent
kill(parent_pid, RT_SIGNAL_READY);
// Main loop
// Non-blocked signals will be handled as usual

// Processing loop:
// Should be only exited when receiving RT_SIGNAL_FINISH
while (true) {
int signum;
sigwait(&signal_set, &signum);
if (signum == RT_SIGNAL_FINISH) {
break;
} else if (signum == RT_SIGNAL_GLOBAL_BARRIER) {
// Only local master should send signal
if (global_rank != 0) {
MPI_Send(&local_size, 1, MPI_INT, 0, 0, MPI_COMM_WORLD_THAPI);
} else {
int sum_local_size_recv = local_size;
while (sum_local_size_recv != global_size) {
int local_size_recv;
MPI_Recv(&local_size_recv, 1, MPI_INT, MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD_THAPI,
MPI_STATUS_IGNORE);
sum_local_size_recv += local_size_recv;
}
}
// Ready signal will be sent after cleaning
return 0;
} else if (signum == RT_SIGNAL_LOCAL_BARRIER) {
MPI_Barrier(MPI_COMM_NODE);
} else if (signum == RT_SIGNAL_GLOBAL_BARRIER) {
MPI_Barrier(MPI_COMM_WORLD_THAPI);
} else {
fprintf(stderr, "Wrong signal received %d. Exiting", signum);
return 1;
}
kill(parent_pid, RT_SIGNAL_READY);
}
return 0;

// Unreachable
fprintf(stderr, "Wrong signal_loop exit");
return 1;
}

int main(int argc, char **argv) {
Expand Down
82 changes: 46 additions & 36 deletions xprof/xprof.rb.in
Original file line number Diff line number Diff line change
Expand Up @@ -310,14 +310,16 @@ class Sync_daemon
rescue StandardError
raise
ensure
f.finalize if f
return unless f
f.global_barrier
f.finalize
end
end

# __
# | _|_ _|_ ._ _ (_ _ _|_ ._
# |_ |_ |_ | | (_| __) (/_ |_ |_| |_)
# _| |
#
# | _|_ _|_ ._ _
# |_ |_ |_ | | (_|
# _|
def lttng_session_uuid
Digest::MD5.hexdigest(lttng_trace_dir_tmp)
end
Expand Down Expand Up @@ -524,7 +526,7 @@ def enable_events_metadata(channel_name, tracing_mode: 'default', profiling: tru
exec("#{lttng_enable} lttng_ust_thapi:*")
end

def setup_lttng(backends)
def lm_setup_lttng(backends)
raise unless mpi_local_master?

# Spawning the sessiond Daemon will crash
Expand Down Expand Up @@ -561,11 +563,13 @@ def setup_lttng(backends)
exec("lttng start #{lttng_session_uuid}")
end

def lttng_teardown_session
def lm_lttng_teardown_session
raise unless mpi_local_master?
exec("lttng destroy #{lttng_session_uuid}")
end

def lttng_kill_sessiond
def lm_lttng_kill_sessiond
raise unless mpi_local_master?
# Need to kill the sessiond Daemon. It's safe because each job has its own
#
# In theory, opening the lttng-sessiond.pid file is racy.
Expand All @@ -582,10 +586,7 @@ end
# |_) _. |_ _ | _|_ ._ _. _ _ )
# |_) (_| |_) (/_ | |_ | (_| (_ (/_ /_
#

# TODO: Use babeltrace_thapi as a LIB not a binary

def on_node_processing_babeltrace(backends)
def lm_babeltrace(backends)
raise unless mpi_local_master?
# No need to run babeltrace_thapi
return if OPTIONS.include?(:trace) || !OPTIONS[:analysis]
Expand All @@ -598,44 +599,53 @@ def on_node_processing_babeltrace(backends)
exec("#{BINDIR}/babeltrace_thapi #{opts.join(' ')} -- #{lttng_trace_dir_tmp}")
end

# NOTE: I don't understand `mv`
# File.mv(a, b) will put a into b (aka a/b)
# FileUtils.rename(a,b) will move a as b, but may
# raise Invalid cross-device error.
# So we use `exec(mv -T a b)`; this has the added benefit of logging
#
def on_node_processing_sync_and_rename(syncd)
# _
# |_) ._ _ _ _ _ _ o ._ _
# | | (_) (_ (/_ _> _> | | | (_|
# _|

# Some naming conventions:
# lm == function executed only by the local_master
# gm == function executed only by the global_master

def lm_move_to_shared
raise unless mpi_local_master?

if OPTIONS.include?(:trace) || !OPTIONS[:analysis]
# The Apps finished, lttng finished, need to move to the shared tmp folder
FileUtils.mkdir_p(File.dirname(thapi_trace_dir_tmp))
exec("mv #{lttng_trace_dir_tmp} #{thapi_trace_dir_tmp}")
else
# Babeltrace_thapi finished, can remove `tmp` folder
# `lm_babeltrace` finished, can remove `tmp` folder
FileUtils.rm_f(lttng_trace_dir_tmp)
end
end

# Global barrier: ensures that all on-node processing has finished
syncd.global_barrier
def gm_rename_folder
raise unless mpi_master?

# Replace mpi_job_id with a better name and update metadata,
# Only done by mpi_master
return unless mpi_master?
# All process have put their file into `thapi_trace_dir_tmp/hostname`.
# `thapi_trace_dir_tmp` is using the MPI_JOB_ID
# Replace it with a better name, and update the root metadata.

thapi_trace_dir_tmp_root = File.dirname(thapi_trace_dir_tmp)

# Because of `traced-rank`, `mpi_master` may not have any trace available.
# Because of `traced-rank`, `mpi_master` may not have any trace available,
# so find the first hostname that has a metadata
FileUtils.cp(Dir.glob("#{thapi_trace_dir_tmp_root}/*/thapi_metadata.yaml").first,
File.join(thapi_trace_dir_tmp_root, 'thapi_metadata.yaml'))

# NOTE: I don't understand `mv`
# File.mv(a, b) will put a into b (aka a/b)
# FileUtils.rename(a,b) will move a as b, but may
# raise Invalid cross-device error.
# So we use `exec(mv -T a b)`; this has the added benefit of logging
exec("mv -T #{thapi_trace_dir_tmp_root} #{thapi_trace_dir_root}") unless OPTIONS[:'trace-output']
thapi_trace_dir_root
end

# Start and stop LTTng, and do the on-node analysis
def trace_and_on_node_processing(usr_argv)
# All masters set a future global barrier
# Global barrier at exit
Sync_daemon.open do |syncd|
# Load Tracers and APILoaders Lib
backends, h = env_tracers
Expand All @@ -644,7 +654,7 @@ def trace_and_on_node_processing(usr_argv)
# so they can have access to the daemon
ENV['LTTNG_HOME'] = lttng_home_dir
# Only local master spawn LTTNG daemon and start session
setup_lttng(backends) if mpi_local_master?
lm_setup_lttng(backends) if mpi_local_master?
syncd.local_barrier('waiting_for_lttng_setup')
# Launch User Command
launch_usr_bin(h, usr_argv)
Expand All @@ -655,18 +665,18 @@ def trace_and_on_node_processing(usr_argv)
return unless mpi_local_master?

# Stop Lttng session
lttng_teardown_session
lm_lttng_teardown_session
# Lttng session is finished,
# we can kill the session daemon
lttng_kill_sessiond

lm_lttng_kill_sessiond
# Preprocess trace
on_node_processing_babeltrace(backends)
on_node_processing_sync_and_rename(syncd)
lm_babeltrace(backends)
lm_move_to_shared
end
gm_rename_folder if mpi_master?
end

def global_processing(folder)
def gm_processing(folder)
raise unless mpi_master?

LOGGER.info { "Postprocess #{folder}" }
Expand Down Expand Up @@ -830,6 +840,6 @@ if __FILE__ == $PROGRAM_NAME
folder = OPTIONS.include?(:replay) ? OPTIONS[:replay] || last_trace_saved : trace_and_on_node_processing(ARGV)
if mpi_master?
warn("THAPI: Trace location: #{folder}")
global_processing(folder) if OPTIONS[:analysis]
gm_processing(folder) if OPTIONS[:analysis]
end
end

0 comments on commit f3efff8

Please sign in to comment.