Skip to content

Commit

Permalink
Dump parallel failure to log unless "debug" is "true"
Browse files Browse the repository at this point in the history
  • Loading branch information
oerc0122 committed May 7, 2024
1 parent 8b803ce commit 02ce952
Show file tree
Hide file tree
Showing 2 changed files with 141 additions and 68 deletions.
38 changes: 38 additions & 0 deletions _test/test_mpi/job_dispatcher_common_tests.m
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,44 @@ function my_delete(varargin)
assertTrue(is_file(file3a));
end

function test_fail_condense(obj, varargin)
% Check where the report of a failed parallel job goes:
%  - parallel_config "debug" == true  -> the report is printed to the screen
%  - parallel_config "debug" == false -> nothing is printed and the report
%    is written to the file par_fail_<job_id>.log instead
% The contents of the log file must be identical to the screen report.

if obj.ignore_test
    skipTest('test_fail_condense is disabled');
end

% The template has to be a single character array. Concatenating character
% arrays with a cell array produces a cell array, and struct() expands a
% cell-array value into a struct ARRAY, which is not what the job expects.
file_prefix = ['test_JD_', obj.cluster_name];
common_param = struct('filepath', obj.working_dir, ...
    'filename_template', [file_prefix, 'L%d_nf%d.txt'], ...
    'fail_for_labsN', 2);

% Files to remove when the test finishes.
% NOTE(review): presumably these are the files the JETester workers write
% following filename_template -- confirm against JETester.
file1 = fullfile(obj.working_dir, [file_prefix, 'L1_nf1.txt']);
file2 = fullfile(obj.working_dir, [file_prefix, 'L2_nf1.txt']);
file3 = fullfile(obj.working_dir, [file_prefix, 'L3_nf1.txt']);

jd = JobDispatcher(['test_job_', obj.cluster_name, '_fail_condense']);
log_filename = sprintf("par_fail_%s.log", jd.job_id);

files = {file1, file2, file3, log_filename};
co = onCleanup(@()(obj.my_delete(files{:})));

[outputs, n_failed, ~, jd] = jd.start_job('JETester', common_param, 36, true, 3, true, 1);

% debug == true: the failure report is expected on the screen
clob = set_temporary_config_options(parallel_config, 'debug', true);
out_full = evalc("jd.display_fail_job_results(outputs, n_failed,3)");

% restore the previous "debug" value before changing it again
clear clob;

% debug == false: the failure report is expected in the log file only
% NOTE(review): the reporting routine also issues a warning; if evalc
% captures the warning text, the two assertions on out_silenced and on the
% log contents below can not both hold -- confirm.
clob = set_temporary_config_options(parallel_config, 'debug', false);
out_silenced = evalc("jd.display_fail_job_results(outputs, n_failed,3)");

assertTrue(contains(out_full, 'failed'));
assertTrue(isempty(out_silenced));

% "log" is a MATLAB builtin, so use a different name for the file contents
log_contents = fileread(log_filename);

assertEqual(log_contents, out_full);

end

function test_job_with_logs_3workers(obj, varargin)
if obj.ignore_test
skipTest('test_job_with_logs_3workers is disabled');
Expand Down
Original file line number Diff line number Diff line change
@@ -1,107 +1,142 @@
function display_fail_jobs_(obj, outputs, n_failed, n_workers, err_code)
% Auxiliary method to report the results of a parallel job which has failed
%
% The report is printed to the screen if the parallel_config "debug"
% option is true. Otherwise it is written to the file
% par_fail_<job_id>.log (the screen is used as a fallback if this file
% can not be opened).
%
% Input:
% outputs  -- usually cellarray of the results, returned by a
%             parallel job
%
% n_failed -- number of tasks failed as the result of parallel
%             job
%
% n_workers-- number of labs used by parallel job initially
%
% err_code -- (optional) the text string in the form
%             ERROR_CLASS:error_reason to form identifier of
%             the exception to throw
%             If this parameter is empty or missing, nothing is thrown.
% Warns:
% with identifier err_code (or HORACE:display_fail_jobs:parallel_failure
% if err_code is empty) if any of the outputs is or contains MException
% Throws:
% exception with err_code as MException.identifier if err_code is not
% empty
%

% err_code is optional; normalize it once so that every later check is safe
if nargin < 5
    err_code = '';
end

if isempty(err_code)
    warn_code = 'HORACE:display_fail_jobs:parallel_failure';
else
    warn_code = err_code;
end

% Select the destination of the report: 1 is the screen (stdout)
fid = 1;
if ~get(parallel_config, 'debug')
    log_name = sprintf('par_fail_%s.log', obj.job_id);
    [log_fid, open_msg] = fopen(log_name, 'w');
    if log_fid < 0
        warning('HORACE:display_fail_jobs:log_unavailable', ...
            'Can not open log file %s (%s). Failure report is printed to the screen', ...
            log_name, open_msg);
    else
        fid = log_fid;
        % close the log on any exit from this function, including the
        % error(...) calls below
        close_log = onCleanup(@() fclose(log_fid)); %#ok<NASGU>
    end
end

MExceptions_outputs = false(size(outputs));

if iscell(outputs)
    debug_print('Job %s failed. Outputs: \n', obj.job_id);

    for i = 1:numel(outputs)

        if isa(outputs{i}, 'MException')
            MExceptions_outputs(i) = true;
            debug_print('Task %d failed. Error %s; Message %s\n', ...
                i, outputs{i}.identifier, outputs{i}.message);

        elseif isfield(outputs{i}, 'error') && isa(outputs{i}.error, 'MException')
            MExceptions_outputs(i) = true;
            debug_print('Task %d failed. Reason: %s\n', ...
                i, outputs{i}.fail_reason);

        else
            MExceptions_outputs(i) = false;
            debug_print('Task %d failed. Outputs: \n', i);

            if isempty(outputs{i})
                debug_print('[]\n');
            else
                % arbitrary text must be data, never the format string
                debug_print('%s', disp2str(outputs{i}));
            end
        end
    end

elseif isempty(outputs)
    ext_type = class(outputs);
    debug_print('Job %s failed with unhandled exception: %s\n', obj.job_id, ext_type);

    if ~isempty(err_code)
        error(err_code, 'Parallel job failed with unhandled exception: %s', ext_type);
    end
else

    MExceptions_outputs(1) = isa(outputs, 'MException');
    debug_print('Job %s failed. Output: \n', obj.job_id);
    debug_print('%s', disp2str(outputs));

    if numel(outputs) == 1
        disp_exception(outputs);
    end

end

if any(MExceptions_outputs)

    warning(warn_code, '%d of %d tasks have failed', n_failed, n_workers)

    err_outputs = outputs(MExceptions_outputs);

    if iscell(err_outputs)
        for i = 1:numel(err_outputs)
            debug_print('***** Error output N %d\n', i);
            disp_exception(err_outputs{i});
        end
    else
        disp_exception(err_outputs);
    end
    if ~isempty(err_code)
        error(err_code, 'Parallel job failed, producing the errors above.');
    end
elseif ~isempty(err_code)
    error(err_code, ...
        '%d of %d tasks have failed without returning the reason', ...
        n_failed, n_workers)
end


    function disp_exception(err_output)
        % Report the error information contained in a single output,
        % descending into cell arrays recursively.

        if isa(err_output, 'MException')
            debug_print('%s', disp2str(getReport(err_output)))

        elseif iscell(err_output)

            for k = 1:numel(err_output)
                debug_print(' Cell %d, contains: \n', k);
                disp_exception(err_output{k});
            end

        elseif isfield(err_output, 'error') && isa(err_output.error, 'MException')

            for k = 1:numel(err_output.error)
                debug_print('%s', disp2str(getReport(err_output.error(k))));
            end

        else
            debug_print('unknown type of error: \n %s', disp2str(err_output));

        end

    end

    function debug_print(varargin)
        % fprintf to the selected destination (the screen or the log file)
        fprintf(fid, varargin{:});
    end

end

0 comments on commit 02ce952

Please sign in to comment.