diff --git a/lib/workflowmgr/slurmbatchsystem.rb b/lib/workflowmgr/slurmbatchsystem.rb index a0e68c1..ea45aa7 100644 --- a/lib/workflowmgr/slurmbatchsystem.rb +++ b/lib/workflowmgr/slurmbatchsystem.rb @@ -18,6 +18,7 @@ class SLURMBatchSystem < BatchSystem require 'etc' require 'parsedate' require 'libxml' + require 'securerandom' require 'workflowmgr/utilities' ##################################################### @@ -253,6 +254,10 @@ def submit(task) input += "#SBATCH #{value}\n" end + # Add secret identifier for later retrieval + randomID = SecureRandom.hex + input += "#SBATCH --comment=#{randomID}\n" + # Add export commands to pass environment vars to the job unless task.envars.empty? varinput='' @@ -270,7 +275,7 @@ def submit(task) tf.write(input) tf.flush() - WorkflowMgr.stderr("Submitting #{task.attributes[:name]} using #{cmd} < #{tf.path} with input\n{{\n#{input}\n}}",4) + WorkflowMgr.stderr("Submitting #{task.attributes[:name]} using #{cmd} < #{tf.path} with input\n{{\n#{input}\n}}", 4) # Run the submit command output=`#{cmd} < #{tf.path} 2>&1`.chomp() @@ -278,7 +283,56 @@ def submit(task) # Parse the output of the submit command if output=~/^Submitted batch job (\d+)/ return $1,output + elsif output=~/Batch job submission failed: Socket timed out/ + + WorkflowMgr.stderr("WARNING: '#{output}', looking to see if job was submitted anyway...", 1) + queued_jobs="" + errors="" + exit_status=0 + begin + + # Get the username of this process + username=Etc.getpwuid(Process.uid).name + + queued_jobs,errors,exit_status=WorkflowMgr.run4("squeue -u #{username} -M all -t all -O jobid:40,comment:32", 45) + + # Raise SchedulerDown if the command failed + raise WorkflowMgr::SchedulerDown,errors unless exit_status==0 + + # Return if the output is empty + return nil,output if queued_jobs.empty? + + rescue Timeout::Error,WorkflowMgr::SchedulerDown + WorkflowMgr.log("#{$!}") + WorkflowMgr.stderr("#{$!}",3) + raise WorkflowMgr::SchedulerDown + end + + # Make sure queued_jobs is properly encoded + if String.method_defined? :encode + queued_jobs = queued_jobs.encode('UTF-8', 'binary', {:invalid => :replace, :undef => :replace, :replace => ''}) + end + + # Look for a job that matches the randomID we inserted into the comment + queued_jobs.split("\n").each { |job| + + # Skip headers + next if job=~/CLUSTER/ + next if job=~/JOBID/ + + # Extract job id + jobid=job[0..39].strip + + # Extract randomID + if randomID == job[40..71].strip + WorkflowMgr.log("WARNING: Retrieved jobid=#{jobid} when submitting #{task.attributes[:name]} after sbatch failed with socket time out") + WorkflowMgr.stderr("WARNING: Retrieved jobid=#{jobid} when submitting #{task.attributes[:name]} after sbatch failed with socket time out",1) + return jobid, output + end + } + else + WorkflowMgr.stderr("WARNING: job submission failed: #{output}", 1) return nil,output end diff --git a/lib/workflowmgr/workflowengine.rb b/lib/workflowmgr/workflowengine.rb index 2ff8b17..9ca3294 100644 --- a/lib/workflowmgr/workflowengine.rb +++ b/lib/workflowmgr/workflowengine.rb @@ -1615,7 +1615,7 @@ def expire_cycles expired_cycles.each do |cycle| @active_jobs.keys.each do |taskname| next if @active_jobs[taskname][cycle.cycle].nil? - unless @active_jobs[taskname][cycle.cycle].state == "SUCCEEDED" || @active_jobs[taskname][cycle.cycle].state == "FAILED" || @active_jobs[taskname][cycle.cycle].state == "DEAD" || @active_jobs[taskname][cycle.cycle].state == "EXPIRED" + unless @active_jobs[taskname][cycle.cycle].state == "SUCCEEDED" || @active_jobs[taskname][cycle.cycle].state == "FAILED" || @active_jobs[taskname][cycle.cycle].state == "DEAD" || @active_jobs[taskname][cycle.cycle].state == "EXPIRED" || @active_jobs[taskname][cycle.cycle].state == "SUBMITTING" @logServer.log(cycle.cycle,"Deleting #{taskname} job #{@active_jobs[taskname][cycle.cycle].id} because this cycle has expired!") @bqServer.delete(@active_jobs[taskname][cycle.cycle].id) end