diff --git a/examples/sub_plans_v2.rb b/examples/sub_plans_v2.rb new file mode 100755 index 00000000..7fd90fd3 --- /dev/null +++ b/examples/sub_plans_v2.rb @@ -0,0 +1,75 @@ +#!/usr/bin/env ruby +# frozen_string_literal: true +example_description = < 0) && can_spawn_next_batch? ? nil : polling_interval + plan_event(Ping, delay) + suspend + end + + def spawn_plans + sub_plans = create_sub_plans + sub_plans = Array[sub_plans] unless sub_plans.is_a? Array + increase_counts(sub_plans.count, 0) + on_planning_finished unless can_spawn_next_batch? + end + + def increase_counts(planned, failed) + output[:planned_count] += planned + failed + output[:failed_count] = output.fetch(:failed_count, 0) + failed + output[:pending_count] = output.fetch(:pending_count, 0) + planned + output[:success_count] ||= 0 + end + + def try_to_finish + return false unless done? + + check_for_errors! + on_finish + true + end + + def done? + return false if can_spawn_next_batch? || !counts_set? + + total_count - output[:success_count] - output[:failed_count] - output[:cancelled_count] <= 0 + end + + def run_progress + return 0.1 unless counts_set? && total_count > 0 + + sum = output.values_at(:success_count, :cancelled_count, :failed_count).reduce(:+) + sum.to_f / total_count + end + + def recalculate_counts + total = total_count + failed = sub_plans_count('state' => %w(paused stopped), 'result' => 'error') + success = sub_plans_count('state' => 'stopped', 'result' => 'success') + output.update(:pending_count => total - failed - success, + :failed_count => failed - output.fetch(:resumed_count, 0), + :success_count => success) + end + + def counts_set? + output[:total_count] && output[:success_count] && output[:failed_count] && output[:pending_count] + end + + def check_for_errors! + raise SubtaskFailedException.new("A sub task failed") if output[:failed_count] > 0 + end + + # Helper for creating sub plans + def trigger(action_class, *args) + world.trigger { world.plan_with_options(action_class: action_class, args: args, caller_action: self) } + end + + # Concurrency limitting + def limit_concurrency_level!(level) + input[:dynflow] ||= {} + input[:dynflow][:concurrency_limit] = level + end + + def concurrency_limit + input[:dynflow] ||= {} + input[:dynflow][:concurrency_limit] + end + + def concurrency_limit_capacity + if limit = concurrency_limit + return limit unless counts_set? + capacity = limit - (output[:planned_count] - (output[:success_count] + output[:failed_count])) + [0, capacity].max + end + end + + # Cancellation handling + def cancel!(force = false) + # Count the not-yet-planned tasks as cancelled + output[:cancelled_count] = total_count - output[:planned_count] + # Pass the cancel event to running sub plans if they can be cancelled + sub_plans(:state => 'running').each { |sub_plan| sub_plan.cancel(force) if sub_plan.cancellable? } + suspend + end + + def abort! + cancel! true + end + + # Batching + # Returns the items in the current batch + def current_batch + start_position = output[:planned_count] + size = batch_size + size = concurrency_limit_capacity if concurrency_limit + size = start_position + size > total_count ? total_count - start_position : size + batch(start_position, size) + end + + def can_spawn_next_batch? + remaining_count > 0 + end + + def remaining_count + total_count - output[:cancelled_count] - output[:planned_count] + end + + private + + # Sub-plan lookup + def sub_plan_filter + { 'caller_execution_plan_id' => execution_plan_id, + 'caller_action_id' => self.id } + end + + def sub_plans(filter = {}) + world.persistence.find_execution_plans(filters: sub_plan_filter.merge(filter)) + end + + def sub_plans_count(filter = {}) + world.persistence.find_execution_plan_counts(filters: sub_plan_filter.merge(filter)) + end + end +end diff --git a/test/v2_sub_plans_test.rb b/test/v2_sub_plans_test.rb new file mode 100644 index 00000000..b6c644e7 --- /dev/null +++ b/test/v2_sub_plans_test.rb @@ -0,0 +1,189 @@ +# frozen_string_literal: true +require_relative 'test_helper' +require 'mocha/minitest' + +module Dynflow + module V2SubPlansTest + describe 'V2 sub-plans' do + include PlanAssertions + include Dynflow::Testing::Assertions + include Dynflow::Testing::Factories + include TestHelpers + + let(:world) { WorldFactory.create_world } + + class ChildAction < ::Dynflow::Action + def run + end + end + + class ParentAction < ::Dynflow::Action + include Dynflow::Action::V2::WithSubPlans + + def plan(count, concurrency_level = nil) + limit_concurrency_level!(concurrency_level) if concurrency_level + plan_self :count => count + end + + def create_sub_plans + output[:batch_count] ||= 0 + output[:batch_count] += 1 + current_batch.map { |i| trigger(ChildAction) } + end + + def batch(from, size) + (1..total_count).to_a.slice(from, size) + end + + def batch_size + 5 + end + + def total_count + input[:count] + end + end + + describe 'normal operation' do + it 'spawns all sub-plans in one go with high-enough batch size and polls until they are done' do + action = create_and_plan_action ParentAction, 3 + action.world.expects(:trigger).times(3) + action = run_action action + _(action.output['total_count']).must_equal 3 + _(action.output['planned_count']).must_equal 3 + _(action.output['pending_count']).must_equal 3 + + ping = action.world.clock.pending_pings.first + _(ping.what.value.event).must_equal Dynflow::Action::V2::WithSubPlans::Ping + _(ping.when).must_be_within_delta(Time.now + action.polling_interval, 1) + persistence = mock() + persistence.stubs(:find_execution_plan_counts).returns(0) + action.world.stubs(:persistence).returns(persistence) + + action.world.clock.progress + action.world.executor.progress + ping = action.world.clock.pending_pings.first + _(ping.what.value.event).must_equal Dynflow::Action::V2::WithSubPlans::Ping + _(ping.when).must_be_within_delta(Time.now + action.polling_interval * 2, 1) + + persistence = mock() + persistence.stubs(:find_execution_plan_counts).returns(0).then.returns(3) + action.world.stubs(:persistence).returns(persistence) + action.world.clock.progress + action.world.executor.progress + + _(action.state).must_equal :success + _(action.done?).must_equal true + end + + it 'spawns sub-plans in multiple batches and polls until they are done' do + action = create_and_plan_action ParentAction, 7 + action.world.expects(:trigger).times(5) + action = run_action action + _(action.output['total_count']).must_equal 7 + _(action.output['planned_count']).must_equal 5 + _(action.output['pending_count']).must_equal 5 + + _(action.world.clock.pending_pings).must_be :empty? + _, _, event, * = action.world.executor.events_to_process.first + _(event).must_equal Dynflow::Action::V2::WithSubPlans::Ping + persistence = mock() + # Simulate 3 finished + persistence.stubs(:find_execution_plan_counts).returns(0).then.returns(3) + action.world.stubs(:persistence).returns(persistence) + + action.world.expects(:trigger).times(2) + action.world.executor.progress + + ping = action.world.clock.pending_pings.first + _(ping.what.value.event).must_equal Dynflow::Action::V2::WithSubPlans::Ping + _(ping.when).must_be_within_delta(Time.now + action.polling_interval, 1) + + persistence.stubs(:find_execution_plan_counts).returns(0).then.returns(7) + action.world.stubs(:persistence).returns(persistence) + action.world.clock.progress + action.world.executor.progress + + _(action.state).must_equal :success + _(action.done?).must_equal true + end + end + + describe 'with concurrency control' do + include Dynflow::Testing + + it 'allows storage and retrieval' do + action = create_and_plan_action ParentAction, 0 + action = run_action action + _(action.concurrency_limit).must_be_nil + _(action.concurrency_limit_capacity).must_be_nil + + action = create_and_plan_action ParentAction, 0, 1 + action = run_action action + + _(action.input['dynflow']['concurrency_limit']).must_equal 1 + _(action.concurrency_limit).must_equal 1 + _(action.concurrency_limit_capacity).must_equal 1 + end + + it 'reduces the batch size to fit within the concurrency limit' do + action = create_and_plan_action ParentAction, 5, 2 + + # Plan first 2 sub-plans + action.world.expects(:trigger).times(2) + + action = run_action action + _(action.output['total_count']).must_equal 5 + _(action.output['planned_count']).must_equal 2 + _(action.output['pending_count']).must_equal 2 + _(action.concurrency_limit_capacity).must_equal 0 + _(action.output['batch_count']).must_equal 1 + + ping = action.world.clock.pending_pings.first + _(ping.what.value.event).must_equal Dynflow::Action::V2::WithSubPlans::Ping + _(ping.when).must_be_within_delta(Time.now + action.polling_interval, 1) + persistence = mock() + # Simulate 1 sub-plan finished + persistence.stubs(:find_execution_plan_counts).returns(0).then.returns(1) + action.world.stubs(:persistence).returns(persistence) + + # Only 1 sub-plans fits into the capacity + action.world.expects(:trigger).times(1) + action.world.clock.progress + action.world.executor.progress + + _(action.output['planned_count']).must_equal 3 + + persistence = mock() + persistence.stubs(:find_execution_plan_counts).returns(0).then.returns(2) + action.world.stubs(:persistence).returns(persistence) + action.world.expects(:trigger).times(1) + action.world.clock.progress + action.world.executor.progress + + _(action.output['planned_count']).must_equal 4 + + persistence = mock() + persistence.stubs(:find_execution_plan_counts).returns(0).then.returns(4) + action.world.stubs(:persistence).returns(persistence) + action.world.expects(:trigger).times(1) + action.world.clock.progress + action.world.executor.progress + + _(action.output['planned_count']).must_equal 5 + _(action.concurrency_limit_capacity).must_equal 1 + + persistence = mock() + persistence.stubs(:find_execution_plan_counts).returns(0).then.returns(5) + action.world.stubs(:persistence).returns(persistence) + action.world.expects(:trigger).never + action.world.clock.progress + action.world.executor.progress + _(action.state).must_equal :success + _(action.done?).must_equal true + _(action.concurrency_limit_capacity).must_equal 2 + end + end + end + end +end