Skip to content

Commit

Permalink
Add TaskValidDependency (#13)
Browse files Browse the repository at this point in the history
* Add TaskValidDependency

The TaskValidDependency is used to check whether a task is valid for

given cycle. This comes in handy when setting up complex
ndencies
for workflows in which the completion of a cycle depends on a task

may or may not have needed to run in that cycle.

An example:
You declare two cycledefs, prep and fcst, in which fcst runs a
deterministic forecast once daily, and requires a background ensemble to
run 6 hours prior to the forecast. The prep cycles run a
task named 'bkg_ens_stats', while the the fcst cycles run a task
'forecast'. Successful completion of a cycle depends (among other
things) on both of these tasks having completed, but not for every
cycle. A dependency then, for any valid cycle to be complete could
be:

<and>
  <or>
   <not><taskvalid task="bkg_ens_stats"/></not>
   <taskdep task="bkg_ens_stats"/>
  </or>
  <or>
   <not><taskvalid task="forecast"/></not>
   <taskdep task="forecast"/>
  </or>
</and>

This dependency would check whether a task should run and if it should,
that it completed successfully.

* Remove unnecessary lines.
  • Loading branch information
christinaholt authored and christopherwharrop committed Sep 7, 2017
1 parent 04c01d8 commit 942bf79
Show file tree
Hide file tree
Showing 8 changed files with 112 additions and 9 deletions.
4 changes: 2 additions & 2 deletions lib/wfmstat/statusengine.rb
Original file line number Diff line number Diff line change
Expand Up @@ -183,13 +183,13 @@ def checkTask
hangdependencies=nil
unless task.nil?
unless task.dependency.nil?
wstate=WorkflowMgr::WorkflowState.new(cycle.cycle,jobs,@workflowIOServer,@workflowdoc.cycledefs,task.attributes[:name],task)
wstate=WorkflowMgr::WorkflowState.new(cycle.cycle,jobs,@workflowIOServer,@workflowdoc.cycledefs,task.attributes[:name],task,tasks=@workflowdoc.tasks)
dependencies=task.dependency.query(wstate)
printf "%2s%s\n", "","dependencies"
print_deps(dependencies,0)
end
unless task.hangdependency.nil?
wstate=WorkflowState.new(cycle.cycle,jobs,@workflowIOServer,@workflowdoc.cycledefs,task.attributes[:name],task)
wstate=WorkflowState.new(cycle.cycle,jobs,@workflowIOServer,@workflowdoc.cycledefs,task.attributes[:name],task,tasks=@workflowdoc.tasks)
hangdependencies=task.hangdependency.query(wstate)
printf "%2s%s\n", "","hang dependencies"
print_deps(hangdependencies,0)
Expand Down
73 changes: 73 additions & 0 deletions lib/workflowmgr/dependency.rb
Original file line number Diff line number Diff line change
Expand Up @@ -789,6 +789,79 @@ def query(d)

end

##########################################
#
# Class TaskValidDependency
#
##########################################
class TaskValidDependency

#####################################################
#
# initialize
#
#####################################################
def initialize(task)
@task=task
end

#####################################################
#
# Resolved?
#
#####################################################
def resolved?(d)

# Get the jobs for this cycle
return false if d.tasks[@task].nil?

checkjob=d.tasks[@task]

# Get the mandatory task attribute
cycle_is_valid=true
unless checkjob.attributes[:cycledefs].nil?
taskcycledefs=d.cycledefs.find_all { |cycledef| checkjob.attributes[:cycledefs].split(/[\s,]+/).member?(cycledef.group) }
# Cycle is invalid for this task if the cycle is not a member of the tasks cycle list
unless taskcycledefs.any? { |cycledef| cycledef.member?(d.cycle) }
cycle_is_valid=false
end
end # unless

return cycle_is_valid
end

#####################################################
#
# Query
#
#####################################################
def query(d)


# Set the job to check

# Get the jobs for this cycle
return [{:dep=>"#{@task}", :msg=>"is not valid", :resolved=>false }] if d.tasks[@task].nil?

checkjob=d.tasks[@task]

cycle_is_valid=true
unless checkjob.attributes[:cycledefs].nil?
taskcycledefs=d.cycledefs.find_all { |cycledef| checkjob.attributes[:cycledefs].split(/[\s,]+/).member?(cycledef.group) }
# Cycle is invalid for this task if the cycle is not a member of the tasks cycle list
unless taskcycledefs.any? { |cycledef| cycledef.member?(d.cycle) }
cycle_is_valid=false
end
end # unless

if cycle_is_valid
return [{:dep=>"#{@task}", :msg=>"is valid", :resolved=>true }]
else
return [{:dep=>"#{@task}", :msg=>"is not valid", :resolved=>false }]
end
end

end

##########################################
#
Expand Down
6 changes: 6 additions & 0 deletions lib/workflowmgr/schema_with_metatasks.rng
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,12 @@
</attribute>
<empty/>
</element>
<element name="taskvalid">
<attribute name="task">
<data type="string"/>
</attribute>
<empty/>
</element>
<element name="metataskdep">
<attribute name="metatask">
<data type="string"/>
Expand Down
5 changes: 5 additions & 0 deletions lib/workflowmgr/schema_without_metatasks.rng
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,11 @@
</attribute>
<empty/>
</element>
<element name="taskvalid">
<attribute name="task">
<data type="string"/>
</attribute>
</element>
<element name="true">
<empty/>
</element>
Expand Down
4 changes: 2 additions & 2 deletions lib/workflowmgr/workflowdb.rb
Original file line number Diff line number Diff line change
Expand Up @@ -1147,7 +1147,7 @@ def create_tables(db,tables)

# Create the jobs table
unless tables.member?("jobs")
db.execute("CREATE TABLE jobs (id INTEGER PRIMARY KEY, jobid VARCHAR(64), taskname VARCHAR(64), cycle DATETIME, cores INTEGER, state VARCHAR[64], native_state VARCHAR[64], exit_status INTEGER, tries INTEGER, nunknowns INTEGER, duration REAL);")
db.execute("CREATE TABLE jobs (id INTEGER PRIMARY KEY, jobid VARCHAR(64), taskname VARCHAR(64), cycle DATETIME, cores INTEGER, state VARCHAR(64), native_state VARCHAR[64], exit_status INTEGER, tries INTEGER, nunknowns INTEGER, duration REAL);")
end

# Create the bqservers table
Expand All @@ -1157,7 +1157,7 @@ def create_tables(db,tables)

# Create the downpaths table
unless tables.member?("downpaths")
db.execute("CREATE TABLE downpaths (id INTEGER PRIMARY KEY, path VARCHAR(1024), downdate DATETIME, host VARCHAR[64], pid INTEGER);")
db.execute("CREATE TABLE downpaths (id INTEGER PRIMARY KEY, path VARCHAR(1024), downdate DATETIME, host VARCHAR(64), pid INTEGER);")
end

# Create the vacuum table
Expand Down
18 changes: 18 additions & 0 deletions lib/workflowmgr/workflowdoc.rb
Original file line number Diff line number Diff line change
Expand Up @@ -557,6 +557,8 @@ def get_dependency_node(element)
return get_shelldep(element)
when "cycleexistdep"
return get_cycleexistdep(element)
when "taskvalid"
return get_taskvaliddep(element)
when "datadep"
return get_datadep(element)
when "timedep"
Expand Down Expand Up @@ -642,6 +644,22 @@ def get_cycleexistdep(element)
return CycleExistDependency.new(cycle_offset)
end


#####################################################
#
# get_taskvaliddep
#
#####################################################
def get_taskvaliddep(element)

task=element.attributes["task"]

return TaskValidDependency.new(task)
end





#####################################################
#
Expand Down
6 changes: 3 additions & 3 deletions lib/workflowmgr/workflowengine.rb
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ def rewind!
puts "#{strcyc}: #{task_name}: No entry in @tasks. Task does not exist. INTERNAL ERROR."
fail
end
wstate=WorkflowState.new(cycle,@active_jobs,@workflowIOServer,@cycledefs,task_name,task)
wstate=WorkflowState.new(cycle,@active_jobs,@workflowIOServer,@cycledefs,task_name,task,tasks=@tasks)
task.rewind!(wstate)

puts "#{strcyc}: #{task_name}: deleting all records of this job."
Expand Down Expand Up @@ -1127,7 +1127,7 @@ def update_active_jobs
# Check for job hang
unless @tasks[job.task].hangdependency.nil?
if job.state=="RUNNING"
wstate=WorkflowState.new(job.cycle,@active_jobs,@workflowIOServer,@cycledefs,job.task,@tasks[job.task])
wstate=WorkflowState.new(job.cycle,@active_jobs,@workflowIOServer,@cycledefs,job.task,@tasks[job.task],tasks=@tasks)
if @tasks[job.task].hangdependency.resolved?(wstate)
job.state="FAILED"
runmsg=". A job hang has been detected. The job will be killed. It will be resubmitted if the retry count has not been exceeded."
Expand Down Expand Up @@ -1477,7 +1477,7 @@ def submit_new_jobs

# Reject this task if dependencies are not satisfied
unless task.dependency.nil?
wstate=WorkflowState.new(cycletime,@active_jobs,@workflowIOServer,@cycledefs,task.attributes[:name],task)
wstate=WorkflowState.new(cycletime,@active_jobs,@workflowIOServer,@cycledefs,task.attributes[:name],task,tasks=@tasks)
next unless task.dependency.resolved?(wstate)
end

Expand Down
5 changes: 3 additions & 2 deletions lib/workflowmgr/workflowstate.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,15 @@ class WorkflowState
#
##########################################
attr_reader :cycle, :jobList, :workflowIOServer, :cycledefs
attr_reader :taskname, :task
attr_reader :taskname, :task, :tasks
def taskName ; return @taskname ; end

##########################################
#
# Initialize
#
##########################################
def initialize(cycle,jobList,workflowIOServer,cycledefs,taskname,task,doc=nil)
def initialize(cycle,jobList,workflowIOServer,cycledefs,taskname,task,tasks=nil,doc=nil)
if taskname.nil?
raise 'In WorkflowState.new, taskname cannot be nil.'
end
Expand All @@ -37,6 +37,7 @@ def initialize(cycle,jobList,workflowIOServer,cycledefs,taskname,task,doc=nil)
@cycledefs=cycledefs
@taskname=taskname
@task=task
@tasks=tasks
@doc=nil
@se=nil
end
Expand Down

0 comments on commit 942bf79

Please sign in to comment.