Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce v2 for sub-plans #428

Merged
merged 1 commit into from
Aug 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 75 additions & 0 deletions examples/sub_plans_v2.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
#!/usr/bin/env ruby
# frozen_string_literal: true
example_description = <<DESC
Sub Plans Example
===================

This example shows, how to trigger the execution plans from within a
run method of some action and waing for them to finish.

This is useful, when doing bulk actions, where having one big
execution plan would not be effective, or in case all the data are
not available by the time of original action planning.

DESC

require_relative 'example_helper'
require_relative 'orchestrate_evented'

COUNT = (ARGV[0] || 25).to_i

class Foo < Dynflow::Action
def plan
plan_self
end

def run(event = nil)
case event
when nil
rng = Random.new
plan_event(:ping, rng.rand(25) + 1)
suspend
when :ping
# Finish
end
end
end

class SubPlansExample < Dynflow::Action
include Dynflow::Action::V2::WithSubPlans

def initiate
limit_concurrency_level! 3
super
end

def create_sub_plans
current_batch.map { |i| trigger(Foo) }
end

def batch_size
15
end

def batch(from, size)
COUNT.times.drop(from).take(size)
end

def total_count
COUNT
end
end

if $0 == __FILE__
ExampleHelper.world.action_logger.level = Logger::DEBUG
ExampleHelper.world
t1 = ExampleHelper.world.trigger(SubPlansExample)
puts example_description
puts <<-MSG.gsub(/^.*\|/, '')
| Execution plans #{t1.id} with sub plans triggered
| You can see the details at
| #{ExampleHelper::DYNFLOW_URL}/#{t1.id}
MSG

ExampleHelper.run_web_console
end
1 change: 1 addition & 0 deletions lib/dynflow/action.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ class Action < Serializable
require 'dynflow/action/with_sub_plans'
require 'dynflow/action/with_bulk_sub_plans'
require 'dynflow/action/with_polling_sub_plans'
require 'dynflow/action/v2'

def self.all_children
children.values.inject(children.values) do |children, child|
Expand Down
9 changes: 9 additions & 0 deletions lib/dynflow/action/v2.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# frozen_string_literal: true

module Dynflow
class Action
module V2
require 'dynflow/action/v2/with_sub_plans'
end
end
end
219 changes: 219 additions & 0 deletions lib/dynflow/action/v2/with_sub_plans.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,219 @@
# frozen_string_literal: true

module Dynflow::Action::V2
module WithSubPlans
include Dynflow::Action::Cancellable

DEFAULT_BATCH_SIZE = 100
DEFAULT_POLLING_INTERVAL = 15
Ping = Algebrick.atom

class SubtaskFailedException < RuntimeError
def backtrace
[]
end
end

# Methods to be overridden
def create_sub_plans
raise NotImplementedError
end

# Should return the expected total count of tasks
def total_count
raise NotImplementedError
end

def batch_size
DEFAULT_BATCH_SIZE
end

# Should return a slice of size items starting from item with index from
def batch(from, size)
raise NotImplementedError
end

# Polling
def polling_interval
DEFAULT_POLLING_INTERVAL
end

# Callbacks
def on_finish
end

def on_planning_finished
end

def run(event = nil)
case event
when nil
if output[:total_count]
resume
else
initiate
end
when Ping
tick
when ::Dynflow::Action::Cancellable::Cancel
cancel!
when ::Dynflow::Action::Cancellable::Abort
abort!
end
try_to_finish || suspend_and_ping
end

def initiate
output[:planned_count] = 0
output[:cancelled_count] = 0
output[:total_count] = total_count
spawn_plans
end

def resume
if sub_plans.all? { |sub_plan| sub_plan.error_in_plan? }
output[:resumed_count] ||= 0
output[:resumed_count] += output[:failed_count]
# We're starting over and need to reset the counts
%w(total failed pending success).each { |key| output.delete("#{key}_count".to_sym) }
initiate
else
tick
end
end

def tick
recalculate_counts
spawn_plans if can_spawn_next_batch?
end

def suspend_and_ping
delay = (concurrency_limit.nil? || concurrency_limit_capacity > 0) && can_spawn_next_batch? ? nil : polling_interval
plan_event(Ping, delay)
suspend
end

def spawn_plans
sub_plans = create_sub_plans
sub_plans = Array[sub_plans] unless sub_plans.is_a? Array
increase_counts(sub_plans.count, 0)
on_planning_finished unless can_spawn_next_batch?
end

def increase_counts(planned, failed)
output[:planned_count] += planned + failed
output[:failed_count] = output.fetch(:failed_count, 0) + failed
output[:pending_count] = output.fetch(:pending_count, 0) + planned
output[:success_count] ||= 0
end

def try_to_finish
return false unless done?

check_for_errors!
on_finish
true
end

def done?
return false if can_spawn_next_batch? || !counts_set?

total_count - output[:success_count] - output[:failed_count] - output[:cancelled_count] <= 0
end

def run_progress
return 0.1 unless counts_set? && total_count > 0

sum = output.values_at(:success_count, :cancelled_count, :failed_count).reduce(:+)
sum.to_f / total_count
end

def recalculate_counts
total = total_count
failed = sub_plans_count('state' => %w(paused stopped), 'result' => 'error')
success = sub_plans_count('state' => 'stopped', 'result' => 'success')
output.update(:pending_count => total - failed - success,
:failed_count => failed - output.fetch(:resumed_count, 0),
:success_count => success)
end

def counts_set?
output[:total_count] && output[:success_count] && output[:failed_count] && output[:pending_count]
end

def check_for_errors!
raise SubtaskFailedException.new("A sub task failed") if output[:failed_count] > 0
end

# Helper for creating sub plans
def trigger(action_class, *args)
world.trigger { world.plan_with_options(action_class: action_class, args: args, caller_action: self) }
end

# Concurrency limitting
def limit_concurrency_level!(level)
input[:dynflow] ||= {}
input[:dynflow][:concurrency_limit] = level
end

def concurrency_limit
input[:dynflow] ||= {}
input[:dynflow][:concurrency_limit]
end

def concurrency_limit_capacity
if limit = concurrency_limit
return limit unless counts_set?
capacity = limit - (output[:planned_count] - (output[:success_count] + output[:failed_count]))
[0, capacity].max
end
end

# Cancellation handling
def cancel!(force = false)
# Count the not-yet-planned tasks as cancelled
output[:cancelled_count] = total_count - output[:planned_count]
# Pass the cancel event to running sub plans if they can be cancelled
sub_plans(:state => 'running').each { |sub_plan| sub_plan.cancel(force) if sub_plan.cancellable? }
suspend
end

def abort!
cancel! true
end

# Batching
# Returns the items in the current batch
def current_batch
start_position = output[:planned_count]
size = batch_size
size = concurrency_limit_capacity if concurrency_limit
size = start_position + size > total_count ? total_count - start_position : size
batch(start_position, size)
end

def can_spawn_next_batch?
remaining_count > 0
end

def remaining_count
total_count - output[:cancelled_count] - output[:planned_count]
end

private

# Sub-plan lookup
def sub_plan_filter
{ 'caller_execution_plan_id' => execution_plan_id,
'caller_action_id' => self.id }
end

def sub_plans(filter = {})
world.persistence.find_execution_plans(filters: sub_plan_filter.merge(filter))
end

def sub_plans_count(filter = {})
world.persistence.find_execution_plan_counts(filters: sub_plan_filter.merge(filter))
end
end
end
Loading
Loading