Skip to content

Commit

Permalink
Introduce v2 for sub-plans
Browse files Browse the repository at this point in the history
The previous implementation of the sub-plans API consisted of several
modules which could be layered on top of each other. This made the code
somewhat hard to follow, and not all of the modules worked in all
scenarios, as some relied on the executor being a single process.

The v2 flattens all the modules into a single one and drastically
reduces the number of possible combinations. Sub plans are spawned in
batches and then the parent polls until the sub plans finish.

It also includes support for limiting the number of sub-plans that may
be running at a given time, without relying on the throttle limiter, and
thus should be safe in multi-process environments. Under the hood it
dynamically alters the batch size to honor the concurrency limit.
  • Loading branch information
adamruzicka committed Jul 21, 2023
1 parent 284598a commit 3d4d8e6
Show file tree
Hide file tree
Showing 5 changed files with 493 additions and 0 deletions.
75 changes: 75 additions & 0 deletions examples/sub_plans_v2.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
#!/usr/bin/env ruby
# frozen_string_literal: true
# Banner text printed when this example is run as a script.
example_description = <<DESC
Sub Plans Example
===================
This example shows, how to trigger the execution plans from within a
run method of some action and waiting for them to finish.
This is useful, when doing bulk actions, where having one big
execution plan would not be effective, or in case all the data are
not available by the time of original action planning.
DESC

require_relative 'example_helper'
require_relative 'orchestrate_evented'

# Number of sub plans the example spawns: the first command-line argument
# converted with to_i, or 25 when no argument is given.
COUNT = (ARGV[0] || 25).to_i

# Sub plan used by the example: on its first run it schedules a :ping event
# for itself and suspends; any later event lets the run method return.
class Foo < Dynflow::Action
  def plan
    plan_self
  end

  # @param event [nil, Symbol] nil on the initial run, :ping once the
  #   planned event is delivered
  def run(event = nil)
    # A delivered event (e.g. :ping) means there is nothing left to do.
    return unless event.nil?

    # Random delay between 1 and 25 (unit defined by plan_event;
    # presumably seconds - confirm).
    delay = Random.new.rand(25) + 1
    plan_event(:ping, delay)
    suspend
  end
end

# Parent action of the example: spawns COUNT Foo sub plans in batches while
# allowing at most three of them to be in flight at once.
class SubPlansExample < Dynflow::Action
  include Dynflow::Action::V2::WithSubPlans

  # Sets the concurrency limit before the mixin spawns the first batch.
  def initiate
    limit_concurrency_level!(3)
    super
  end

  # Triggers one Foo per item of the current batch; the items themselves
  # are not passed on to the sub plans.
  def create_sub_plans
    current_batch.map { |_item| trigger(Foo) }
  end

  def batch_size
    15
  end

  # @param from [Integer] index of the first item of the batch
  # @param size [Integer] maximum number of items to return
  # @return [Array<Integer>] the indices from...(from + size), capped at COUNT
  def batch(from, size)
    (0...COUNT).drop(from).first(size)
  end

  def total_count
    COUNT
  end
end

# Only executed when the file is run directly, not when it is required.
if __FILE__ == $PROGRAM_NAME
  ExampleHelper.world.action_logger.level = Logger::DEBUG
  ExampleHelper.world
  triggered = ExampleHelper.world.trigger(SubPlansExample)
  puts example_description
  # Everything up to and including the "|" marker is stripped from each line.
  banner = <<-MSG
    | Execution plans #{triggered.id} with sub plans triggered
    | You can see the details at
    | #{ExampleHelper::DYNFLOW_URL}/#{triggered.id}
  MSG
  puts banner.gsub(/^.*\|/, '')

  ExampleHelper.run_web_console
end
1 change: 1 addition & 0 deletions lib/dynflow/action.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ class Action < Serializable
require 'dynflow/action/with_sub_plans'
require 'dynflow/action/with_bulk_sub_plans'
require 'dynflow/action/with_polling_sub_plans'
require 'dynflow/action/v2'

def self.all_children
children.values.inject(children.values) do |children, child|
Expand Down
9 changes: 9 additions & 0 deletions lib/dynflow/action/v2.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# frozen_string_literal: true

module Dynflow
class Action
# Namespace for the v2 action mixins.
module V2
# Required from inside the module body, so the V2 constant already exists
# when the required file is evaluated.
require 'dynflow/action/v2/with_sub_plans'
end
end
end
219 changes: 219 additions & 0 deletions lib/dynflow/action/v2/with_sub_plans.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,219 @@
# frozen_string_literal: true

module Dynflow::Action::V2
module WithSubPlans
include Dynflow::Action::Cancellable

# Value returned by #batch_size unless the including action overrides it.
DEFAULT_BATCH_SIZE = 100
# Value returned by #polling_interval unless overridden; it is passed to
# plan_event as the delay (presumably seconds - confirm).
DEFAULT_POLLING_INTERVAL = 15
# Event the action plans for itself in #suspend_and_ping; #run reacts to it
# by calling #tick.
Ping = Algebrick.atom

# Raised by #check_for_errors! when the failed sub plan counter is positive.
class SubtaskFailedException < RuntimeError
# Always reports an empty backtrace, whatever the place the exception was
# raised from.
def backtrace
[]
end
end

# Methods to be overridden
# Hook called by #spawn_plans to plan the next batch of sub plans.
#
# @return [Array, Object] the created sub plans; a non-Array return value is
#   counted by #spawn_plans as a single sub plan
# @raise [NotImplementedError] unless overridden by the including action
def create_sub_plans
  raise NotImplementedError, "#{self.class} must implement #create_sub_plans"
end

# Hook: should return the expected total count of tasks.
#
# It is consulted repeatedly (e.g. by #done?, #run_progress, #remaining_count
# and #current_batch), not just once.
#
# @return [Integer]
# @raise [NotImplementedError] unless overridden by the including action
def total_count
  raise NotImplementedError, "#{self.class} must implement #total_count"
end

# Upper bound on the number of items per batch; #current_batch uses it only
# when no concurrency limit is set. Override to change it.
def batch_size
DEFAULT_BATCH_SIZE
end

# Hook: should return a slice of +size+ items starting at the item with
# index +from+. Called by #current_batch.
#
# @param from [Integer] index of the first item of the slice
# @param size [Integer] number of items in the slice
# @raise [NotImplementedError] unless overridden by the including action
def batch(from, size)
  raise NotImplementedError, "#{self.class} must implement #batch"
end

# Polling
# Delay handed to plan_event by #suspend_and_ping when no new batch can be
# spawned right away (presumably seconds - confirm). Override to change it.
def polling_interval
DEFAULT_POLLING_INTERVAL
end

# Callbacks
# Callback invoked by #try_to_finish once #done? holds and
# #check_for_errors! did not raise. Does nothing by default.
def on_finish
end

# Callback invoked by #spawn_plans after a batch was counted and no further
# batch can be spawned. Does nothing by default.
def on_planning_finished
end

# Dispatches on the received event and then either finishes or suspends
# until the next Ping. Events matching none of the branches are ignored.
#
# @param event [nil, Object] nil for a plain run, otherwise Ping or one of
#   the Cancellable events (Cancel, Abort)
def run(event = nil)
  case event
  when nil
    # A stored total_count means #initiate already ran, so this is a re-run.
    output[:total_count] ? resume : initiate
  when Ping then tick
  when ::Dynflow::Action::Cancellable::Cancel then cancel!
  when ::Dynflow::Action::Cancellable::Abort then abort!
  end
  try_to_finish || suspend_and_ping
end

# First-run setup: zeroes the planned and cancelled counters, stores the
# expected total and spawns the first batch.
#
# @return whatever #spawn_plans returns
def initiate
  %i[planned_count cancelled_count].each { |counter| output[counter] = 0 }
  output[:total_count] = total_count
  spawn_plans
end

# Handles a re-run of an action that was already initiated.
#
# When every known sub plan failed during its planning phase (which also
# holds when there are no sub plans at all), the counters are reset and the
# action starts over via #initiate. Otherwise it continues with #tick.
def resume
  return tick unless sub_plans.all?(&:error_in_plan?)

  # Remember the failures seen so far; #recalculate_counts subtracts them
  # from the failed count reported after the restart.
  output[:resumed_count] ||= 0
  # failed_count is missing when the previous run ended before the first
  # batch was counted; treat that as zero instead of raising a TypeError.
  output[:resumed_count] += output[:failed_count] || 0
  # We're starting over and need to reset the counts
  %i[total_count failed_count pending_count success_count].each { |counter| output.delete(counter) }
  initiate
end

# One polling step: refreshes the counters and, when items remain
# unplanned, spawns the next batch.
def tick
  recalculate_counts
  return unless can_spawn_next_batch?

  spawn_plans
end

# Plans the next Ping and suspends the action.
#
# The Ping is planned without a delay when another batch can be spawned
# (there is no concurrency limit or it has spare capacity, and items remain
# to be planned); otherwise it is delayed by #polling_interval.
def suspend_and_ping
  has_capacity = concurrency_limit.nil? || concurrency_limit_capacity > 0
  delay = polling_interval unless has_capacity && can_spawn_next_batch?
  plan_event(Ping, delay)
  suspend
end

# Creates the next batch of sub plans, adds them to the counters and fires
# #on_planning_finished once nothing is left to plan.
def spawn_plans
  created = create_sub_plans
  # A single non-Array return value counts as one sub plan.
  created = [created] unless created.is_a?(Array)
  increase_counts(created.count, 0)
  on_planning_finished unless can_spawn_next_batch?
end

# Adds freshly planned sub plans to the counters kept in the output.
#
# @param planned [Integer] sub plans planned successfully (added to pending)
# @param failed [Integer] sub plans that failed (added to failed)
# @return the value of output[:success_count], initialised to 0 when unset
def increase_counts(planned, failed)
  newly_planned = planned + failed
  output[:planned_count] += newly_planned
  { failed_count: failed, pending_count: planned }.each do |counter, increment|
    output[counter] = output.fetch(counter, 0) + increment
  end
  output[:success_count] ||= 0
end

# Finishes the action when everything is accounted for.
#
# @return [Boolean] false while #done? does not hold; true after
#   #check_for_errors! passed and #on_finish ran
# @raise whatever #check_for_errors! raises
def try_to_finish
  if done?
    check_for_errors!
    on_finish
    true
  else
    false
  end
end

# Tells whether all expected sub plans are accounted for: nothing is left to
# plan, the counters are set, and succeeded + failed + cancelled reaches the
# total.
def done?
  return false if can_spawn_next_batch?
  return false unless counts_set?

  unaccounted = total_count
  %i[success_count failed_count cancelled_count].each { |counter| unaccounted -= output[counter] }
  unaccounted <= 0
end

# Share of the expected sub plans that already succeeded, were cancelled or
# failed.
#
# @return [Float] 0.1 while the counters are unset or the total is not
#   positive, otherwise finished / total
def run_progress
  if counts_set? && total_count > 0
    finished = output.values_at(:success_count, :cancelled_count, :failed_count)
                     .inject { |sum, count| sum + count }
    finished.to_f / total_count
  else
    0.1
  end
end

# Refreshes the pending, failed and success counters from sub plan counts
# obtained through #sub_plans_count.
#
# Failed sub plans are those paused or stopped with an error result; the
# number recorded as resumed_count is subtracted from the reported failures.
#
# @return the updated output
def recalculate_counts
  expected = total_count
  errored = sub_plans_count('state' => %w[paused stopped], 'result' => 'error')
  succeeded = sub_plans_count('state' => 'stopped', 'result' => 'success')
  output.update(pending_count: expected - errored - succeeded,
                failed_count: errored - output.fetch(:resumed_count, 0),
                success_count: succeeded)
end

# Tells whether the total, success, failed and pending counters are all
# present in the output.
#
# @return the first missing (nil/false) counter value, otherwise the value
#   of output[:pending_count]; callers only rely on its truthiness
def counts_set?
  %i[total_count success_count failed_count].each do |counter|
    value = output[counter]
    return value unless value
  end
  output[:pending_count]
end

# Fails the action when any sub plan failed.
#
# @raise [SubtaskFailedException] when output[:failed_count] is positive
def check_for_errors!
  return unless output[:failed_count] > 0

  raise SubtaskFailedException, "A sub task failed"
end

# Helper for creating sub plans
# Triggers a sub plan of the given action class with this action recorded
# as its caller.
#
# @param action_class [Class] action to plan
# @param args [Array] arguments handed over as the plan's args
# @return whatever world.trigger returns
def trigger(action_class, *args)
  world.trigger do
    world.plan_with_options(action_class: action_class,
                            args: args,
                            caller_action: self)
  end
end

# Concurrency limiting
# Stores the maximum number of sub plans allowed to be in flight at once
# under input[:dynflow][:concurrency_limit].
#
# @param level [Integer] the concurrency limit
# @return the given level
def limit_concurrency_level!(level)
  input[:dynflow] = {} unless input[:dynflow]
  # Deliberately read input[:dynflow] again instead of reusing the hash
  # assigned above.
  input[:dynflow][:concurrency_limit] = level
end

# Reads the configured concurrency limit.
#
# Note that this also creates an empty input[:dynflow] hash when there is
# none yet.
#
# @return the stored limit, or nil when none was set
def concurrency_limit
  input[:dynflow] = {} unless input[:dynflow]
  settings = input[:dynflow]
  settings[:concurrency_limit]
end

# Number of additional sub plans that may be started without exceeding the
# concurrency limit.
#
# @return [Integer, nil] nil when no limit is set; the full limit while the
#   counters are not set yet; otherwise the limit minus the sub plans that
#   were planned but neither succeeded nor failed, never below 0
def concurrency_limit_capacity
  limit = concurrency_limit
  return unless limit
  return limit unless counts_set?

  finished = output[:success_count] + output[:failed_count]
  in_flight = output[:planned_count] - finished
  [0, limit - in_flight].max
end

# Cancellation handling
# Cancels the action: counts the not-yet-planned tasks as cancelled, passes
# the cancellation on to running sub plans that support it, then suspends.
#
# No new Ping is planned here.
# NOTE(review): presumably a Ping planned earlier wakes the action up
# again - confirm.
#
# @param force [Boolean] passed through to each sub plan's cancel
def cancel!(force = false)
  output[:cancelled_count] = total_count - output[:planned_count]
  sub_plans(state: 'running').each do |running_plan|
    next unless running_plan.cancellable?

    running_plan.cancel(force)
  end
  suspend
end

# Forced variant of #cancel!: the sub plans are cancelled with force = true.
def abort!
  cancel!(true)
end

# Batching
# Returns the items in the current batch.
#
# The batch starts right after the items planned so far. It holds
# #batch_size items, or #concurrency_limit_capacity items when a
# concurrency limit is set, and never reaches past #total_count.
def current_batch
  offset = output[:planned_count]
  limit = batch_size
  limit = concurrency_limit_capacity if concurrency_limit
  overshoots = offset + limit > total_count
  limit = total_count - offset if overshoots
  batch(offset, limit)
end

# Tells whether there are still items that were neither planned nor
# cancelled.
def can_spawn_next_batch?
  remaining_count.positive?
end

# Number of items that are neither cancelled nor planned yet.
def remaining_count
  not_cancelled = total_count - output[:cancelled_count]
  not_cancelled - output[:planned_count]
end

private

# Sub-plan lookup
# Base filter selecting the execution plans whose caller is this action.
#
# @return [Hash] the caller execution plan id and caller action id
def sub_plan_filter
  filter = {}
  filter['caller_execution_plan_id'] = execution_plan_id
  filter['caller_action_id'] = id
  filter
end

# Looks up the sub plans of this action through the world's persistence.
#
# @param filter [Hash] extra conditions merged over #sub_plan_filter
# @return whatever find_execution_plans returns
def sub_plans(filter = {})
  persistence = world.persistence
  combined_filter = sub_plan_filter.merge(filter)
  persistence.find_execution_plans(filters: combined_filter)
end

# Counts the sub plans of this action through the world's persistence.
#
# @param filter [Hash] extra conditions merged over #sub_plan_filter
# @return whatever find_execution_plan_counts returns
def sub_plans_count(filter = {})
  persistence = world.persistence
  combined_filter = sub_plan_filter.merge(filter)
  persistence.find_execution_plan_counts(filters: combined_filter)
end
end
end
Loading

0 comments on commit 3d4d8e6

Please sign in to comment.