Skip to content

Commit

Permalink
Merge pull request #60 from christopherwharrop/feature/slurm-recover-…
Browse files Browse the repository at this point in the history
…jobid

Feature/slurm recover jobid
  • Loading branch information
christopherwharrop authored May 29, 2019
2 parents 6301be4 + fac5d74 commit 76485a5
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 2 deletions.
56 changes: 55 additions & 1 deletion lib/workflowmgr/slurmbatchsystem.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ class SLURMBatchSystem < BatchSystem
require 'etc'
require 'parsedate'
require 'libxml'
require 'securerandom'
require 'workflowmgr/utilities'

#####################################################
Expand Down Expand Up @@ -253,6 +254,10 @@ def submit(task)
input += "#SBATCH #{value}\n"
end

# Tag the job with a random identifier in its SLURM comment field so the job id can be recovered later if sbatch times out
randomID = SecureRandom.hex
input += "#SBATCH --comment=#{randomID}\n"

# Add export commands to pass environment vars to the job
unless task.envars.empty?
varinput=''
Expand All @@ -270,15 +275,64 @@ def submit(task)
tf.write(input)
tf.flush()

WorkflowMgr.stderr("Submitting #{task.attributes[:name]} using #{cmd} < #{tf.path} with input\n{{\n#{input}\n}}",4)
WorkflowMgr.stderr("Submitting #{task.attributes[:name]} using #{cmd} < #{tf.path} with input\n{{\n#{input}\n}}", 4)

# Run the submit command
output=`#{cmd} < #{tf.path} 2>&1`.chomp()

# Parse the output of the submit command
if output=~/^Submitted batch job (\d+)/
return $1,output
elsif output=~/Batch job submission failed: Socket timed out/

WorkflowMgr.stderr("WARNING: '#{output}', looking to see if job was submitted anyway...", 1)
queued_jobs=""
errors=""
exit_status=0
begin

# Get the username of this process
username=Etc.getpwuid(Process.uid).name

queued_jobs,errors,exit_status=WorkflowMgr.run4("squeue -u #{username} -M all -t all -O jobid:40,comment:32", 45)

# Raise SchedulerDown if the command failed
raise WorkflowMgr::SchedulerDown,errors unless exit_status==0

# Return if the output is empty
return nil,output if queued_jobs.empty?

rescue Timeout::Error,WorkflowMgr::SchedulerDown
WorkflowMgr.log("#{$!}")
WorkflowMgr.stderr("#{$!}",3)
raise WorkflowMgr::SchedulerDown
end

# Make sure queued_jobs is properly encoded
if String.method_defined? :encode
queued_jobs = queued_jobs.encode('UTF-8', 'binary', {:invalid => :replace, :undef => :replace, :replace => ''})
end

# Look for a job that matches the randomID we inserted into the comment
queued_jobs.split("\n").each { |job|

# Skip headers
next if job=~/CLUSTER/
next if job=~/JOBID/

# Extract job id
jobid=job[0..39].strip

# Check whether this job's comment field matches the randomID we submitted with
if randomID == job[40..71].strip
WorkflowMgr.log("WARNING: Retrieved jobid=#{jobid} when submitting #{task.attributes[:name]} after sbatch failed with socket time out")
WorkflowMgr.stderr("WARNING: Retrieved jobid=#{jobid} when submitting #{task.attributes[:name]} after sbatch failed with socket time out",1)
return jobid, output
end
}

else
WorkflowMgr.stderr("WARNING: job submission failed: #{output}", 1)
return nil,output
end

Expand Down
2 changes: 1 addition & 1 deletion lib/workflowmgr/workflowengine.rb
Original file line number Diff line number Diff line change
Expand Up @@ -1615,7 +1615,7 @@ def expire_cycles
expired_cycles.each do |cycle|
@active_jobs.keys.each do |taskname|
next if @active_jobs[taskname][cycle.cycle].nil?
unless @active_jobs[taskname][cycle.cycle].state == "SUCCEEDED" || @active_jobs[taskname][cycle.cycle].state == "FAILED" || @active_jobs[taskname][cycle.cycle].state == "DEAD" || @active_jobs[taskname][cycle.cycle].state == "EXPIRED"
unless @active_jobs[taskname][cycle.cycle].state == "SUCCEEDED" || @active_jobs[taskname][cycle.cycle].state == "FAILED" || @active_jobs[taskname][cycle.cycle].state == "DEAD" || @active_jobs[taskname][cycle.cycle].state == "EXPIRED" || @active_jobs[taskname][cycle.cycle].state == "SUBMITTING"
@logServer.log(cycle.cycle,"Deleting #{taskname} job #{@active_jobs[taskname][cycle.cycle].id} because this cycle has expired!")
@bqServer.delete(@active_jobs[taskname][cycle.cycle].id)
end
Expand Down

0 comments on commit 76485a5

Please sign in to comment.