From 7c9641d134f4f97361e505d4344a50a96188f469 Mon Sep 17 00:00:00 2001 From: Christopher Harrop Date: Fri, 17 May 2019 22:17:21 +0000 Subject: [PATCH 1/2] First try at adding a recovery mechanism for sbatch commands that erroneously report failure after a successful job submission. --- lib/workflowmgr/slurmbatchsystem.rb | 43 +++++++++++++++++++++++++++++ 1 file changed, 43 insertions(+) diff --git a/lib/workflowmgr/slurmbatchsystem.rb b/lib/workflowmgr/slurmbatchsystem.rb index a0e68c1..ab1f59e 100644 --- a/lib/workflowmgr/slurmbatchsystem.rb +++ b/lib/workflowmgr/slurmbatchsystem.rb @@ -18,6 +18,7 @@ class SLURMBatchSystem < BatchSystem require 'etc' require 'parsedate' require 'libxml' + erquire 'securerandom' require 'workflowmgr/utilities' ##################################################### @@ -253,6 +254,10 @@ def submit(task) input += "#SBATCH #{value}\n" end + # Add secret identifier for later retrieval + randomID = SecureRandom.hex + input += "#SBATCH #{randomID}\n" + # Add export commands to pass environment vars to the job unless task.envars.empty? varinput='' @@ -278,6 +283,44 @@ def submit(task) # Parse the output of the submit command if output=~/^Submitted batch job (\d+)/ return $1,output + elsif output=~/Batch job submission failed: Socket timed out on send\/recv operation/ + begin + # Get the username of this process + username=Etc.getpwuid(Process.uid).name + + queued_jobs,errors,exit_status=WorkflowMgr.run4("squeue -u #{username} -M all -t all -O jobid:40,comment:32",45) + + # Raise SchedulerDown if the command failed + raise WorkflowMgr::SchedulerDown,errors unless exit_status==0 + + # Return if the output is empty + return nil,output if queued_jobs.empty? + + rescue Timeout::Error,WorkflowMgr::SchedulerDown + WorkflowMgr.log("#{$!}") + WorkflowMgr.stderr("#{$!}",3) + raise WorkflowMgr::SchedulerDown + end + + # Make sure queued_jobs is properly encoded + if String.method_defined? :encode + queued_jobs = queued_jobs.encode('UTF-8', 'binary', {:invalid => :replace, :undef => :replace, :replace => ''}) + end + + # Look for a job that matches the randomID we inserted into the comment + queued_jobs.split("\n").each { |job| + + # Extract job id + jobid=job[0..39].strip + + # Extract randomID + if randomID == job[40..71].strip + WorkflowMgr.log("WARNING: Retrieved jobid=#{jobid} when submitting #{task.attributes[:name]} after sbatch socket time out") + WorkflowMgr.stderr("WARNING: Retrieved jobid=#{jobid} when submitting #{task.attributes[:name]} after sbatch socket time out".1) + return jobid, output + end + } + else return nil,output end From fac5d7441d6a832a4c61ce5817cf75519c7bfc2b Mon Sep 17 00:00:00 2001 From: Christopher Harrop Date: Wed, 29 May 2019 17:52:44 +0000 Subject: [PATCH 2/2] Fix bugs in recovery of jobids when Slurm sbatch fails with socket error. --- lib/workflowmgr/slurmbatchsystem.rb | 27 +++++++++++++++++++-------- lib/workflowmgr/workflowengine.rb | 2 +- 2 files changed, 20 insertions(+), 9 deletions(-) diff --git a/lib/workflowmgr/slurmbatchsystem.rb b/lib/workflowmgr/slurmbatchsystem.rb index ab1f59e..ea45aa7 100644 --- a/lib/workflowmgr/slurmbatchsystem.rb +++ b/lib/workflowmgr/slurmbatchsystem.rb @@ -18,7 +18,7 @@ class SLURMBatchSystem < BatchSystem require 'etc' require 'parsedate' require 'libxml' - erquire 'securerandom' + require 'securerandom' require 'workflowmgr/utilities' ##################################################### @@ -256,7 +256,7 @@ def submit(task) # Add secret identifier for later retrieval randomID = SecureRandom.hex - input += "#SBATCH #{randomID}\n" + input += "#SBATCH --comment=#{randomID}\n" # Add export commands to pass environment vars to the job unless task.envars.empty? @@ -275,7 +275,7 @@ def submit(task) tf.write(input) tf.flush() - WorkflowMgr.stderr("Submitting #{task.attributes[:name]} using #{cmd} < #{tf.path} with input\n{{\n#{input}\n}}",4) + WorkflowMgr.stderr("Submitting #{task.attributes[:name]} using #{cmd} < #{tf.path} with input\n{{\n#{input}\n}}", 4) # Run the submit command output=`#{cmd} < #{tf.path} 2>&1`.chomp() @@ -283,12 +283,18 @@ def submit(task) # Parse the output of the submit command if output=~/^Submitted batch job (\d+)/ return $1,output - elsif output=~/Batch job submission failed: Socket timed out on send\/recv operation/ + elsif output=~/Batch job submission failed: Socket timed out/ + + WorkflowMgr.stderr("WARNING: '#{output}', looking to see if job was submitted anyway...", 1) + queued_jobs="" + errors="" + exit_status=0 begin + # Get the username of this process username=Etc.getpwuid(Process.uid).name - - queued_jobs,errors,exit_status=WorkflowMgr.run4("squeue -u #{username} -M all -t all -O jobid:40,comment:32",45) + + queued_jobs,errors,exit_status=WorkflowMgr.run4("squeue -u #{username} -M all -t all -O jobid:40,comment:32", 45) # Raise SchedulerDown if the command failed raise WorkflowMgr::SchedulerDown,errors unless exit_status==0 @@ -310,18 +316,23 @@ def submit(task) # Look for a job that matches the randomID we inserted into the comment queued_jobs.split("\n").each { |job| + # Skip headers + next if job=~/CLUSTER/ + next if job=~/JOBID/ + # Extract job id jobid=job[0..39].strip # Extract randomID if randomID == job[40..71].strip - WorkflowMgr.log("WARNING: Retrieved jobid=#{jobid} when submitting #{task.attributes[:name]} after sbatch socket time out") - WorkflowMgr.stderr("WARNING: Retrieved jobid=#{jobid} when submitting #{task.attributes[:name]} after sbatch socket time out".1) + WorkflowMgr.log("WARNING: Retrieved jobid=#{jobid} when submitting #{task.attributes[:name]} after sbatch failed with socket time out") + WorkflowMgr.stderr("WARNING: Retrieved jobid=#{jobid} when submitting #{task.attributes[:name]} after sbatch failed with socket time out",1) return jobid, output end } else + WorkflowMgr.stderr("WARNING: job submission failed: #{output}", 1) return nil,output end diff --git a/lib/workflowmgr/workflowengine.rb b/lib/workflowmgr/workflowengine.rb index 2ff8b17..9ca3294 100644 --- a/lib/workflowmgr/workflowengine.rb +++ b/lib/workflowmgr/workflowengine.rb @@ -1615,7 +1615,7 @@ def expire_cycles expired_cycles.each do |cycle| @active_jobs.keys.each do |taskname| next if @active_jobs[taskname][cycle.cycle].nil? - unless @active_jobs[taskname][cycle.cycle].state == "SUCCEEDED" || @active_jobs[taskname][cycle.cycle].state == "FAILED" || @active_jobs[taskname][cycle.cycle].state == "DEAD" || @active_jobs[taskname][cycle.cycle].state == "EXPIRED" + unless @active_jobs[taskname][cycle.cycle].state == "SUCCEEDED" || @active_jobs[taskname][cycle.cycle].state == "FAILED" || @active_jobs[taskname][cycle.cycle].state == "DEAD" || @active_jobs[taskname][cycle.cycle].state == "EXPIRED" || @active_jobs[taskname][cycle.cycle].state == "SUBMITTING" @logServer.log(cycle.cycle,"Deleting #{taskname} job #{@active_jobs[taskname][cycle.cycle].id} because this cycle has expired!") @bqServer.delete(@active_jobs[taskname][cycle.cycle].id) end