diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md
index 14d8006..53689c6 100644
--- a/RELEASE_NOTES.md
+++ b/RELEASE_NOTES.md
@@ -3,6 +3,8 @@
 ## New for Version 1.3.0
 
 * Update SLURM support to handle pack groups
+* Update SLURM support to map `<queue>` to --qos instead of --partition
+* Update SLURM support to map `<partition>` to --partition
 * Update LSF support to handle additional methods LSF uses to report the exit status
 * All rocoto commands have the same -a, -c, -m, and -t options
 * The -c and -t options can now select by cycledefs and attributes (i.e. final)
diff --git a/VERSION b/VERSION
index e8ea05d..f0bb29e 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-1.2.4
+1.3.0
diff --git a/lib/wfmstat/wfmstatoption.rb b/lib/wfmstat/wfmstatoption.rb
index 78e27f5..dc7c2cd 100644
--- a/lib/wfmstat/wfmstatoption.rb
+++ b/lib/wfmstat/wfmstatoption.rb
@@ -53,6 +53,11 @@ def add_opts(opts)
         @summary=true
       end
 
+      # task order
+      opts.on("-T","--task-sort","Sort by task") do
+        @taskfirst=true
+      end
+
     end  # add_opts
 
   def make_selection()
diff --git a/lib/workflowmgr/schema_with_metatasks.rng b/lib/workflowmgr/schema_with_metatasks.rng
index cd75076..d653ee6 100644
--- a/lib/workflowmgr/schema_with_metatasks.rng
+++ b/lib/workflowmgr/schema_with_metatasks.rng
@@ -465,14 +465,30 @@
-
+
-
-
-
-
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
@@ -518,15 +534,6 @@
-
-
-
-
-
-
-
-
-
diff --git a/lib/workflowmgr/schema_without_metatasks.rng b/lib/workflowmgr/schema_without_metatasks.rng
index 2b9d97c..79cab40 100644
--- a/lib/workflowmgr/schema_without_metatasks.rng
+++ b/lib/workflowmgr/schema_without_metatasks.rng
@@ -446,14 +446,30 @@
-
+
-
-
-
-
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
@@ -499,15 +515,6 @@
-
-
-
-
-
-
-
-
-
diff --git a/lib/workflowmgr/slurmbatchsystem.rb b/lib/workflowmgr/slurmbatchsystem.rb
index 81b56b9..7005247 100644
--- a/lib/workflowmgr/slurmbatchsystem.rb
+++ b/lib/workflowmgr/slurmbatchsystem.rb
@@ -6,6 +6,7 @@ module WorkflowMgr
 
   require 'workflowmgr/batchsystem'
+  require 'date'
 
   ##########################################
   #
@@ -17,6 +18,7 @@ class SLURMBatchSystem < BatchSystem
     require 'etc'
     require 'parsedate'
     require 'libxml'
+    require 'securerandom'
     require 'workflowmgr/utilities'
 
     #####################################################
@@ -31,43 +33,11 @@ def initialize(slurm_root=nil)
 
       # Initialize an empty hash for job accounting records
       @jobacct={}
+      @jobacct_duration=0
 
       # Assume the scheduler is up
       @schedup=true
 
-      # Set heterogeneous job support to nil (it will be set once in submit)
-      @heterogeneous_job_support
-
-    end
-
-
-    #####################################################
-    #
-    # statuses
-    #
-    #####################################################
-    def statuses(jobids)
-
-      begin
-
-        raise WorkflowMgr::SchedulerDown unless @schedup
-
-        # Initialize statuses to UNAVAILABLE
-        jobStatuses={}
-        jobids.each do |jobid|
-          jobStatuses[jobid] = { :jobid => jobid, :state => "UNAVAILABLE", :native_state => "Unavailable" }
-        end
-
-        jobids.each do |jobid|
-          jobStatuses[jobid] = self.status(jobid)
-        end
-
-      rescue WorkflowMgr::SchedulerDown
-        @schedup=false
-      ensure
-        return jobStatuses
-      end
-
     end
 
 
@@ -89,7 +59,13 @@ def status(jobid)
       return @jobqueue[jobid] if @jobqueue.has_key?(jobid)
 
       # Populate the job accounting log table if it is empty
-      refresh_jobacct if @jobacct.empty?
+      refresh_jobacct(1) if @jobacct_duration<1
+
+      # Return the jobacct record if there is one
+      return @jobacct[jobid] if @jobacct.has_key?(jobid)
+
+      # Now re-populate over a longer history:
+      refresh_jobacct(5) if @jobacct_duration<5
 
       # Return the jobacct record if there is one
       return @jobacct[jobid] if @jobacct.has_key?(jobid)
@@ -107,41 +83,69 @@ def status(jobid)
 
     #####################################################
     #
-    # submit
+    # statuses
     #
     #####################################################
-    def submit(task)
+    def statuses(jobids)
 
-      # Check if heterogeneous jobs are supported
-      if @heterogeneous_job_support.nil?
+      begin
 
-        # Get version of sbatch being used
-        version,errors,exit_status=WorkflowMgr.run4("sbatch --version",30)
+        raise WorkflowMgr::SchedulerDown unless @schedup
 
-        # Raise SchedulerDown if the command failed
-        raise WorkflowMgr::SchedulerDown,errors unless exit_status==0
+        # Initialize statuses to UNAVAILABLE
+        jobStatuses={}
+        jobids.each do |jobid|
+          jobStatuses[jobid] = { :jobid => jobid, :state => "UNAVAILABLE", :native_state => "Unavailable" }
+        end
+
+        # Populate the job status table if it is empty
+        refresh_jobqueue(jobids) if @jobqueue.empty?
+
+        # Check to see if status info is missing for any job and populate jobacct record if necessary
+        if jobids.any? { |jobid| !@jobqueue.has_key?(jobid) }
+          refresh_jobacct(1) if @jobacct_duration<1
 
-        # Get first four digits of version as an integer
-        @version = version.gsub(/[slurm.\s]/,"")[0..3].to_i
+          # Check again, and re-populate over a longer history if necessary
+          if jobids.any? { |jobid| !@jobqueue.has_key?(jobid) && !@jobacct.has_key?(jobid) }
+            refresh_jobacct(5) if @jobacct_duration<5
+          end
+        end
 
-        # Check for heterogeneous job support
-        @heterogeneous_job_support = false
-        if @version >= 1808
-          @heterogeneous_job_support = true
+        # Collect the statuses of the jobs
+        jobids.each do |jobid|
+          if @jobqueue.has_key?(jobid)
+            jobStatuses[jobid] = @jobqueue[jobid]
+          elsif @jobacct.has_key?(jobid)
+            jobStatuses[jobid] = @jobacct[jobid]
+          else
+            # We didn't find the job, so return an unknown status record
+            jobStatuses[jobid] = { :jobid => jobid, :state => "UNKNOWN", :native_state => "Unknown" }
+          end
         end
 
+      rescue WorkflowMgr::SchedulerDown
+        @schedup=false
+      ensure
+        return jobStatuses
       end
 
+    end
+
+
+    #####################################################
+    #
+    # submit
+    #
+    #####################################################
+    def submit(task)
+
       # Initialize the submit command
       cmd="sbatch"
       input="#! /bin/sh\n"
-      per_pack_group_input=""
-      pack_group_nodes=Array.new
 
       # Add Slurm batch system options translated from the generic options specification
       task.attributes.each do |option,value|
-        if value.is_a?(String)
+        if value.is_a?(String)
           if value.empty?
            WorkflowMgr.stderr("WARNING: <#{option}> has empty content and is ignored", 1)
            next
@@ -149,99 +153,73 @@
          end
        end
        case option
          when :account
-            per_pack_group_input += "#SBATCH --account #{value}\n"
+            input += "#SBATCH --account=#{value}\n"
          when :queue
-            per_pack_group_input += "#SBATCH --qos #{value}\n"
+            input += "#SBATCH --qos=#{value}\n"
          when :partition
-            per_pack_group_input += "#SBATCH --partition #{value}\n"
+            input += "#SBATCH --partition=#{value.gsub(":",",")}\n"
          when :cores
            # Ignore this attribute if the "nodes" attribute is present
            next unless task.attributes[:nodes].nil?
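[Note on the `:partition` mapping just above: the added `value.gsub(":",",")` lets one `<partition>` tag name several partitions, since Slurm's `--partition` flag accepts a comma-separated list. A minimal standalone sketch of that translation — the value `"hera:orion"` is hypothetical, chosen only for illustration:]

```ruby
# Sketch: how a colon-separated <partition> value becomes one sbatch directive.
value = "hera:orion"   # hypothetical <partition> content
puts "#SBATCH --partition=#{value.gsub(":", ",")}"
# => #SBATCH --partition=hera,orion
```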
-            if @heterogeneous_job_support
-              pack_group_nodes << "#SBATCH --ntasks=#{value}\n"
-            else
-              pack_group_nodes = ["#SBATCH --ntasks=#{value}\n"]
-            end
+            input += "#SBATCH --ntasks=#{value}\n"
          when :nodes
-            # Make sure exclusive access to nodes is enforced
-#            per_pack_group_input += "#SBATCH --exclusive\n"
-
-            if @heterogeneous_job_support
-
-              first_spec = true
-              nodespecs=value.split("+")
-              nodespecs.each { |nodespec|
-                resources=nodespec.split(":")
-                nnodes=resources.shift.to_i
-                ppn=0
-                resources.each { |resource|
-                  case resource
-                    when /ppn=(\d+)/
-                      ppn=$1.to_i
-                    when /tpp=(\d+)/
-                      tpp=$1.to_i
-                  end
-                }
-
-                # Request for this resource
-                pack_group_nodes << "#SBATCH --ntasks=#{nnodes*ppn} --tasks-per-node=#{ppn}\n"
-
-                first_spec = false
+            # Rocoto does not currently support Slurm heterogeneous jobs. However, it does
+            # support requests for non-uniform processor geometries. To accomplish this,
+            # Rocoto will use sbatch to submit a job with the smallest uniform resource
+            # request that can accommodate the non-uniform request. It is up to the user to
+            # use the appropriate tools to manipulate the host file and use the appropriate
+            # MPI launcher command to specify the desired processor layout for the executable
+            # in the job script.
+
+            # Get the total nodes and max ppn requested
+            maxppn=1
+            nnodes=0
+            tpp=0
+            nodespecs=value.split("+")
+            nodespecs.each { |nodespec|
+              resources=nodespec.split(":")
+              nnodes+=resources.shift.to_i
+              ppn=0
+              resources.each { |resource|
+                case resource
+                  when /ppn=(\d+)/
+                    ppn=$1.to_i
+                  when /tpp=(\d+)/
+                    tpp=$1.to_i
+                end
              }
+              maxppn=ppn if ppn > maxppn
+            }
 
-            else
-
-              # This version of SLURM (< version 18.08) does not support submission of jobs
-              # (via sbatch) with non-uniform processor geometries.  SLURM refers to these as
-              # "heterogenous jobs".  To work around this, we will use sbatch to submit a job
-              # with the smallest uniform resource request that can accommodate the
-              # heterogeneous request.  It is up to the user to use the appropriate host file
-              # manipulation and/or MPI launcher command to specify the desired processor layout
-              # for the executable in the job script.
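[The conversion described in the comment above can be sanity-checked in isolation. This is a minimal sketch of the same parsing idea, not the code under review; the nodespec `"2:ppn=12+3:ppn=24"` is a made-up example:]

```ruby
# Sketch: collapse a non-uniform nodespec into the smallest uniform request.
# "2:ppn=12+3:ppn=24" asks for 2 nodes with 12 tasks each plus 3 nodes with 24.
nodespec = "2:ppn=12+3:ppn=24"   # hypothetical request
nnodes = 0
maxppn = 1
nodespec.split("+").each do |spec|
  fields = spec.split(":")
  nnodes += fields.shift.to_i
  fields.each do |f|
    maxppn = $1.to_i if f =~ /ppn=(\d+)/ && $1.to_i > maxppn
  end
end
puts "#SBATCH --nodes=#{nnodes}-#{nnodes}"    # => #SBATCH --nodes=5-5
puts "#SBATCH --tasks-per-node=#{maxppn}"     # => #SBATCH --tasks-per-node=24
```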
-
-              # Get the total nodes and max ppn requested
-              maxppn=1
-              nnodes=0
-              nodespecs=value.split("+")
-              nodespecs.each { |nodespec|
-                resources=nodespec.split(":")
-                nnodes+=resources.shift.to_i
-                ppn=0
-                resources.each { |resource|
-                  case resource
-                    when /ppn=(\d+)/
-                      ppn=$1.to_i
-                    when /tpp=(\d+)/
-                      tpp=$1.to_i
-                  end
-                }
-                maxppn=ppn if ppn > maxppn
-              }
-
-              # Request total number of nodes
-              node_input = "#SBATCH --nodes=#{nnodes}-#{nnodes}\n"
+            # Request total number of nodes
+            input += "#SBATCH --nodes=#{nnodes}-#{nnodes}\n"
 
-              # Request max tasks per node
-              node_input += "#SBATCH --tasks-per-node=#{maxppn}\n"
+            # Request max tasks per node
+            input += "#SBATCH --tasks-per-node=#{maxppn}\n"
 
-              pack_group_nodes = [ node_input ] # ensure only one "pack group"
-
-              # Print a warning if multiple nodespecs are specified
-              if nodespecs.size > 1
-                WorkflowMgr.stderr("WARNING: SLURM < 18.08 does not support requests for non-unifortm task geometries",1)
-                WorkflowMgr.stderr("WARNING: during batch job submission You must use the -m option of the srun command",1)
-                WorkflowMgr.stderr("WARNING: in your script to launch your code with an arbitrary distribution of tasks",1)
-                WorkflowMgr.stderr("WARNING: Please see https://slurm.schedmd.com/faq.html#arbitrary for details",1)
-                WorkflowMgr.stderr("WARNING: Rocoto has automatically converted '#{value}' to '#{nnodes}:ppn=#{maxppn}'",1)
-                WorkflowMgr.stderr("WARNING: to facilitate the desired arbitrary task distribution. Use",1)
-                WorkflowMgr.stderr("WARNING: #{nnodes}:ppn=#{maxppn} in your workflow to eliminate this warning message.",1)
-              end
+            # Request cpus per task if only one nodespec is specified and tpp was specified
+            if nodespecs.size == 1 && !tpp.zero?
+              input += "#SBATCH --cpus-per-task=#{tpp}\n"
+            end
+            # Print a warning if multiple nodespecs are specified
+            if nodespecs.size > 1
+              WorkflowMgr.stderr("WARNING: Rocoto does not support Slurm's heterogeneous job feature. However, Rocoto", 1)
+              WorkflowMgr.stderr("WARNING: does support Slurm requests for non-uniform task geometries. It does this by", 1)
+              WorkflowMgr.stderr("WARNING: converting non-uniform requests into the smallest uniform task geometry", 1)
+              WorkflowMgr.stderr("WARNING: request that can accommodate the non-uniform one during batch job submission.", 1)
+              WorkflowMgr.stderr("WARNING: It is up to the user to use the -m option of the srun command, or other", 1)
+              WorkflowMgr.stderr("WARNING: appropriate tools, in the job script to launch executables with an arbitrary", 1)
+              WorkflowMgr.stderr("WARNING: distribution of tasks.", 1)
+              WorkflowMgr.stderr("WARNING: Please see https://slurm.schedmd.com/faq.html#arbitrary for details", 1)
+              WorkflowMgr.stderr("WARNING: Rocoto has automatically converted '#{value}' to '#{nnodes}:ppn=#{maxppn}' for this job", 1)
+              WorkflowMgr.stderr("WARNING: to facilitate the desired arbitrary task distribution. Use", 1)
Use", 1) + WorkflowMgr.stderr("WARNING: #{nnodes}:ppn=#{maxppn} in your workflow to eliminate this warning message.", 1) end when :walltime # Make sure format is dd-hh:mm:ss if days are included - per_pack_group_input += "#SBATCH -t #{value.sub(/^(\d+):(\d+:\d+:\d+)$/,'\1-\2')}\n" + input += "#SBATCH -t #{value.sub(/^(\d+):(\d+:\d+:\d+)$/,'\1-\2')}\n" when :memory m=/^([\.\d]+)([\D]*)$/.match(value) amount=m[1].to_f @@ -259,7 +237,7 @@ def submit(task) amount=(amount / 1024.0 / 1024.0).ceil end if amount > 0 - per_pack_group_input += "#SBATCH --mem=#{amount}\n" + input += "#SBATCH --mem=#{amount}\n" end when :stdout input += "#SBATCH -o #{value}\n" @@ -268,24 +246,17 @@ def submit(task) when :join input += "#SBATCH -o #{value}\n" when :jobname - input += "#SBATCH --job-name #{value}\n" + input += "#SBATCH --job-name=#{value}\n" end end task.each_native do |value| - per_pack_group_input += "#SBATCH #{value}\n" + input += "#SBATCH #{value}\n" end - first=true - pack_group_nodes.each do |this_group_nodes| - if first - first=false - else - input += "\n#SBATCH packjob\n\n" - end - input += per_pack_group_input - input += this_group_nodes - end + # Add secret identifier for later retrieval + randomID = SecureRandom.hex + input += "#SBATCH --comment=#{randomID}\n" # Add export commands to pass environment vars to the job unless task.envars.empty? @@ -295,7 +266,6 @@ def submit(task) } input += varinput end - input+="set -x\n" # Add the command to execute input += task.attributes[:command] @@ -305,7 +275,7 @@ def submit(task) tf.write(input) tf.flush() - WorkflowMgr.stderr("Submitting #{task.attributes[:name]} using #{cmd} < #{tf.path} with input {{#{input}}}",4) + WorkflowMgr.stderr("Submitting #{task.attributes[:name]} using #{cmd} < #{tf.path} with input\n{{\n#{input}\n}}", 4) # Run the submit command output=`#{cmd} < #{tf.path} 2>&1`.chomp() @@ -313,7 +283,56 @@ def submit(task) # Parse the output of the submit command if output=~/^Submitted batch job (\d+)/ return $1,output + elsif output=~/Batch job submission failed: Socket timed out/ + + WorkflowMgr.stderr("WARNING: '#{output}', looking to see if job was submitted anyway...", 1) + queued_jobs="" + errors="" + exit_status=0 + begin + + # Get the username of this process + username=Etc.getpwuid(Process.uid).name + + queued_jobs,errors,exit_status=WorkflowMgr.run4("squeue -u #{username} -M all -t all -O jobid:40,comment:32", 45) + + # Raise SchedulerDown if the command failed + raise WorkflowMgr::SchedulerDown,errors unless exit_status==0 + + # Return if the output is empty + return nil,output if queued_jobs.empty? + + rescue Timeout::Error,WorkflowMgr::SchedulerDown + WorkflowMgr.log("#{$!}") + WorkflowMgr.stderr("#{$!}",3) + raise WorkflowMgr::SchedulerDown + end + + # Make sure queued_jobs is properly encoded + if String.method_defined? 
+          queued_jobs = queued_jobs.encode('UTF-8', 'binary', {:invalid => :replace, :undef => :replace, :replace => ''})
+        end
+
+        # Look for a job that matches the randomID we inserted into the comment
+        queued_jobs.split("\n").each { |job|
+
+          # Skip headers
+          next if job=~/CLUSTER/
+          next if job=~/JOBID/
+
+          # Extract job id
+          jobid=job[0..39].strip
+
+          # Extract randomID
+          if randomID == job[40..71].strip
+            WorkflowMgr.log("WARNING: Retrieved jobid=#{jobid} when submitting #{task.attributes[:name]} after sbatch failed with socket time out")
+            WorkflowMgr.stderr("WARNING: Retrieved jobid=#{jobid} when submitting #{task.attributes[:name]} after sbatch failed with socket time out",1)
+            return jobid, output
+          end
+        }
+
      else
+        WorkflowMgr.stderr("WARNING: job submission failed: #{output}", 1)
        return nil,output
      end
@@ -339,7 +358,7 @@ def delete(jobid)
 
    #####################################################
    #
    # refresh_jobqueue
    #
    #####################################################
-    def refresh_jobqueue
+    def refresh_jobqueue(jobids)
 
      begin
 
@@ -350,7 +369,13 @@ def refresh_jobqueue
        queued_jobs=""
        errors=""
        exit_status=0
-        queued_jobs,errors,exit_status=WorkflowMgr.run4("scontrol -o show job",30)
+
+        if jobids.nil? or jobids.join(',').length>64
+          queued_jobs,errors,exit_status=WorkflowMgr.run4("squeue -u #{username} -M all -t all -O jobid:40,username:40,numcpus:10,partition:20,submittime:30,starttime:30,endtime:30,priority:30,exit_code:10,state:30,name:200",45)
+        else
+          joblist = jobids.join(",")
+          queued_jobs,errors,exit_status=WorkflowMgr.run4("squeue --jobs=#{joblist} -M all -t all -O jobid:40,username:40,numcpus:10,partition:20,submittime:30,starttime:30,endtime:30,priority:30,exit_code:10,state:30,name:200",45)
+        end
 
        # Raise SchedulerDown if the command failed
        raise WorkflowMgr::SchedulerDown,errors unless exit_status==0
@@ -372,57 +397,61 @@
        # For each job, find the various attributes and create a job record
        queued_jobs.split("\n").each { |job|
 
+          # Skip headings
+          next if job[0..4] == 'JOBID'
+          next if job[0..7] == 'CLUSTER:'
+
          # Initialize an empty job record
          record={}
 
-          # Look at all the attributes for this job and build the record
-          jobfields=Hash[job.split.collect {|f| f.split("=")}.collect{|f| f.length == 2 ? f : [f[0], '']}]
-
-          # Skip records for other users
-          next unless jobfields["UserId"] =~/^#{username}\(/
-
          # Extract job id
-          record[:jobid]=jobfields["JobId"]
+          record[:jobid]=job[0..39].strip
 
          # Extract job name
-          record[:jobname]=jobfields["Name"]
+          record[:jobname]=job[270..job.length].strip
 
          # Extract job owner
-          record[:user]=jobfields["UserId"].split("(").first
+          record[:user]=job[40..79].strip
 
          # Extract core count
-          record[:cores]=jobfields["NumCPUs"].to_i
+          record[:cores]=job[80..89].strip.to_i
 
          # Extract the partition
-          record[:queue]=jobfields["Partition"]
+          record[:queue]=job[90..109].strip
 
          # Extract the submit time
-          record[:submit_time]=Time.local(*jobfields["SubmitTime"].split(/[-:T]/)).getgm
+          record[:submit_time]=Time.local(*job[110..139].strip.split(/[-:T]/)).getgm
 
          # Extract the start time
-          record[:start_time]=Time.local(*jobfields["StartTime"].split(/[-:T]/)).getgm
+          record[:start_time]=Time.local(*job[140..169].strip.split(/[-:T]/)).getgm
 
          # Extract the end time
-          record[:end_time]=Time.local(*jobfields["EndTime"].split(/[-:T]/)).getgm
+          record[:end_time]=Time.local(*job[170..199].strip.split(/[-:T]/)).getgm
 
          # Extract the priority
-          record[:priority]=jobfields["Priority"]
+          record[:priority]=job[200..229].strip
 
          # Extract the exit status
-          code,signal=jobfields["ExitCode"].split(":").collect {|i| i.to_i}
-          if code==0
-            record[:exit_status]=signal
+          code_signal=job[230..239].strip
+          if code_signal=~ /:/
+
+            code,signal=code_signal.split(":").collect {|i| i.to_i}
+            if code==0
+              record[:exit_status]=signal
+            else
+              record[:exit_status]=code
+            end
          else
-            record[:exit_status]=code
+            record[:exit_status]=code_signal.to_i
          end
 
          # Extract job state
-          case jobfields["JobState"]
-            when /^CONFIGURING$/,/^PENDING$/,/^SUSPENDED$/
+          case job[240..269].strip
+            when /^(CONFIGURING|PENDING|SUSPENDED|RESV_DEL_HOLD|REQUEUE_FED|REQUEUE_HOLD|REQUEUED|SPECIAL_EXIT)$/
              record[:state]="QUEUED"
-            when /^RUNNING$/,/^COMPLETING$/
+            when /^(RUNNING|COMPLETING|RESIZING|SIGNALING|STAGE_OUT|STOPPED)$/
              record[:state]="RUNNING"
-            when /^CANCELLED$/,/^FAILED$/,/^NODE_FAIL$/,/^PREEMPTED$/,/^TIMEOUT$/
+            when /^(CANCELLED|FAILED|NODE_FAIL|PREEMPTED|TIMEOUT|BOOT_FAIL|DEADLINE|OUT_OF_MEMORY|REVOKED)$/
              record[:state]="FAILED"
              record[:exit_status]=255 if record[:exit_status]==0  # Override exit status of 0 for "failed" jobs
            when /^COMPLETED$/
@@ -434,8 +463,7 @@
            else
              record[:state]="UNKNOWN"
          end
-          record[:native_state]=jobfields["JobState"]
-
+          record[:native_state]=job[240..269].strip
          # Add record to job queue
          @jobqueue[record[:jobid]]=record
@@ -449,7 +477,9 @@
 
    #####################################################
    #
    # refresh_jobacct
    #
    #####################################################
-    def refresh_jobacct
+    def refresh_jobacct(delta_days)
+
+      @jobacct_duration=delta_days
 
      begin
 
@@ -460,7 +490,9 @@
        completed_jobs=""
        errors=""
        exit_status=0
-        completed_jobs,errors,exit_status=WorkflowMgr.run4("sacct -L -o jobid,user%30,jobname%30,partition%20,priority,submit,start,end,ncpus,exitcode,state%12 -P",30)
+        mmddyy=(DateTime.now-delta_days).strftime('%m%d%y')
+        cmd="sacct -S #{mmddyy} -L -o jobid,user%30,jobname%30,partition%20,priority,submit,start,end,ncpus,exitcode,state%12 -P"
+        completed_jobs,errors,exit_status=WorkflowMgr.run4(cmd,45)
 
        return if errors=~/SLURM accounting storage is disabled/
 
@@ -525,11 +557,11 @@
          # Extract job state
          case jobfields[10]
-            when /^CONFIGURING$/,/^PENDING$/,/^SUSPENDED$/,/^REQUEUED$/
+            when /^(CONFIGURING|PENDING|SUSPENDED|RESV_DEL_HOLD|REQUEUE_FED|REQUEUE_HOLD|REQUEUED|SPECIAL_EXIT)$/
              record[:state]="QUEUED"
-            when /^RUNNING$/,/^COMPLETING$/
+            when /^(RUNNING|COMPLETING|RESIZING|SIGNALING|STAGE_OUT|STOPPED)$/
              record[:state]="RUNNING"
-            when /^CANCELLED$/,/^FAILED$/,/^NODE_FAIL$/,/^PREEMPTED$/,/^TIMEOUT$/,/^OUT_OF_MEMORY$/,/^BOOT_FAIL$/,/^DEADLINE$/
+            when /^(CANCELLED|FAILED|NODE_FAIL|PREEMPTED|TIMEOUT|BOOT_FAIL|DEADLINE|OUT_OF_MEMORY|REVOKED)$/
              record[:state]="FAILED"
              record[:exit_status]=255 if record[:exit_status]==0  # Override exit status of 0 for "failed" jobs
            when /^COMPLETED$/
diff --git a/lib/workflowmgr/workflowconfig.rb b/lib/workflowmgr/workflowconfig.rb
index 848f2d2..ed48d16 100644
--- a/lib/workflowmgr/workflowconfig.rb
+++ b/lib/workflowmgr/workflowconfig.rb
@@ -43,48 +43,45 @@ def initialize
 
    # Load the configuration
    begin
-      @config=WorkflowMgr.forkit(10) do
-
-        # Create a .rocoto directory if one does not already exist
-        FileUtils.mkdir_p(@config_dir) unless File.exists?(@config_dir)
-
-        # Create a .rocoto tmp dir if one does not already exist
-        FileUtils.mkdir_p(@config_tmp) unless File.exists?(@config_tmp)
-
-        # Move the legacy .wfmrc file to rocotorc file if it exists
-        FileUtils.mv("#{ENV['HOME']}/.wfmrc",@config_file) if File.exists?("#{ENV['HOME']}/.wfmrc")
-
-        # Load the rocotorc config if one exists
-        if File.exists?(@config_file) && !File.zero?(@config_file)
-          config=YAML.load_file(@config_file)
-          if config.is_a?(Hash)
-            # Merge default config into rocotorc config if there are unspecified config options
-            if config.keys.collect {|c| c.to_s}.sort != DEFAULT_CONFIG.keys.collect {|c| c.to_s}.sort
-              config=DEFAULT_CONFIG.merge(config).delete_if { |k,v| !DEFAULT_CONFIG.has_key?(k) }
-              File.open("#{@config_file}.#{Process.ppid}","w") { |f| YAML.dump(config,f) }
-            end
-            config
-          else
-            WorkflowMgr.log("WARNING! Reverted corrupted configuration in #{@config_file} to default.")
-            WorkflowMgr.stderr("WARNING! Reverted corrupted configuration in #{@config_file} to default.")
-            File.open("#{@config_file}.#{Process.ppid}","w") { |f| YAML.dump(DEFAULT_CONFIG,f) }
-            DEFAULT_CONFIG
+
+      # Create a .rocoto directory if one does not already exist
+      FileUtils.mkdir_p(@config_dir) unless File.exists?(@config_dir)
+
+      # Create a .rocoto tmp dir if one does not already exist
+      FileUtils.mkdir_p(@config_tmp) unless File.exists?(@config_tmp)
+
+      # Move the legacy .wfmrc file to rocotorc file if it exists
+      FileUtils.mv("#{ENV['HOME']}/.wfmrc",@config_file) if File.exists?("#{ENV['HOME']}/.wfmrc")
+
+      # Load the rocotorc config if one exists
+      if File.exists?(@config_file) && !File.zero?(@config_file)
+        config=YAML.load_file(@config_file)
+        if config.is_a?(Hash)
+          # Merge default config into rocotorc config if there are unspecified config options
+          if config.keys.collect {|c| c.to_s}.sort != DEFAULT_CONFIG.keys.collect {|c| c.to_s}.sort
+            config=DEFAULT_CONFIG.merge(config).delete_if { |k,v| !DEFAULT_CONFIG.has_key?(k) }
+            File.open("#{@config_file}.#{Process.pid}","w") { |f| YAML.dump(config,f) }
          end
+          @config = config
        else
-          # Create a rocotorc file with default settings if it does not exist
-          File.open("#{@config_file}.#{Process.ppid}","w") { |f| YAML.dump(DEFAULT_CONFIG,f) }
-          DEFAULT_CONFIG
+          WorkflowMgr.log("WARNING! Reverted corrupted configuration in #{@config_file} to default.")
+          WorkflowMgr.stderr("WARNING! Reverted corrupted configuration in #{@config_file} to default.")
+          File.open("#{@config_file}.#{Process.pid}","w") { |f| YAML.dump(DEFAULT_CONFIG,f) }
+          @config = DEFAULT_CONFIG
        end
-
-      end  # @config do
-
-      # Update the config file in a quasi-atomic way.
-      FileUtils.mv("#{@config_file}.#{Process.pid}", @config_file) if File.exists?("#{@config_file}.#{Process.pid}")
+      else
+        # Create a rocotorc file with default settings if it does not exist
+        File.open("#{@config_file}.#{Process.pid}","w") { |f| YAML.dump(DEFAULT_CONFIG,f) }
+        @config = DEFAULT_CONFIG
+      end
 
    rescue WorkflowMgr::ForkitTimeoutException
      msg="ERROR: An I/O operation timed out while reading, writing, or testing for the existence of '#{@config_file}'"
      WorkflowMgr.log(msg)
      raise msg
+    ensure
+      # Update the config file in a quasi-atomic way.
+      FileUtils.mv("#{@config_file}.#{Process.pid}", @config_file) if File.exists?("#{@config_file}.#{Process.pid}")
    end
 
  end  # initialize
diff --git a/lib/workflowmgr/workflowengine.rb b/lib/workflowmgr/workflowengine.rb
index d424fd8..9ca3294 100644
--- a/lib/workflowmgr/workflowengine.rb
+++ b/lib/workflowmgr/workflowengine.rb
@@ -499,7 +499,9 @@ def boot
            # Roll the log file (if it already exists)
            @workflowIOServer.roll_log(value)
          end
-          @workflowIOServer.mkdir_p(outdir)
+          unless outdir.empty?
+            @workflowIOServer.mkdir_p(outdir)
+          end
        end
      end
 
@@ -1613,7 +1615,7 @@ def expire_cycles
      expired_cycles.each do |cycle|
        @active_jobs.keys.each do |taskname|
          next if @active_jobs[taskname][cycle.cycle].nil?
-          unless @active_jobs[taskname][cycle.cycle].state == "SUCCEEDED" || @active_jobs[taskname][cycle.cycle].state == "FAILED" || @active_jobs[taskname][cycle.cycle].state == "DEAD" || @active_jobs[taskname][cycle.cycle].state == "EXPIRED"
+          unless @active_jobs[taskname][cycle.cycle].state == "SUCCEEDED" || @active_jobs[taskname][cycle.cycle].state == "FAILED" || @active_jobs[taskname][cycle.cycle].state == "DEAD" || @active_jobs[taskname][cycle.cycle].state == "EXPIRED" || @active_jobs[taskname][cycle.cycle].state == "SUBMITTING"
            @logServer.log(cycle.cycle,"Deleting #{taskname} job #{@active_jobs[taskname][cycle.cycle].id} because this cycle has expired!")
            @bqServer.delete(@active_jobs[taskname][cycle.cycle].id)
          end
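[One note on the submission-recovery change in slurmbatchsystem.rb above: because sbatch can time out after Slurm has already accepted the job, each job now carries a random tag in --comment, and that tag is what makes the job findable afterwards. A minimal sketch of the recovery idea under the same assumptions — fixed-width `squeue -O jobid:40,comment:32` output; the sample lines are invented:]

```ruby
require 'securerandom'

# Sketch: recover a jobid after a timed-out sbatch by matching the random
# comment tag embedded at submission time. Column widths mirror the
# "squeue -O jobid:40,comment:32" request, so the jobid occupies bytes 0..39
# and the comment occupies bytes 40..71 of each output line.
def find_jobid(squeue_output, random_id)
  squeue_output.split("\n").each do |line|
    next if line =~ /JOBID/ || line =~ /CLUSTER/   # skip squeue headers
    jobid   = line[0..39].to_s.strip
    comment = line[40..71].to_s.strip
    return jobid if comment == random_id
  end
  nil
end

random_id = SecureRandom.hex   # written into "#SBATCH --comment=..." at submit time

# Invented sample output, for illustration only
sample = "JOBID".ljust(40) + "COMMENT" + "\n" +
         "123456".ljust(40) + random_id.ljust(32) + "\n"
puts find_jobid(sample, random_id)   # => 123456
```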