From 3d4d8e6fd3b3952f1c470a6dda761d93e42ca8a4 Mon Sep 17 00:00:00 2001 From: Adam Ruzicka Date: Fri, 21 Jul 2023 13:00:50 +0000 Subject: [PATCH] Introduce v2 for sub-plans The previous implementation of the sub plans API consisted of several modules which could be layered on top of each other. This made the code somewhat hard to follow, and not all of the modules worked in all scenarios, as some relied on an executor being a single process. The v2 flattens all the modules into a single one and drastically reduces the number of possible combinations. Sub plans are spawned in batches and then the parent polls until the sub plans finish. It also includes support for limiting the number of sub-plans that may be running at a given time, without relying on the throttle limiter and thus should be safe in multi-process environments. Under the hood it dynamically alters the batch size to honor the concurrency limit. --- examples/sub_plans_v2.rb | 75 ++++++++ lib/dynflow/action.rb | 1 + lib/dynflow/action/v2.rb | 9 + lib/dynflow/action/v2/with_sub_plans.rb | 219 ++++++++++++++++++++++++ test/v2_sub_plans_test.rb | 189 ++++++++++++++++++++ 5 files changed, 493 insertions(+) create mode 100755 examples/sub_plans_v2.rb create mode 100644 lib/dynflow/action/v2.rb create mode 100644 lib/dynflow/action/v2/with_sub_plans.rb create mode 100644 test/v2_sub_plans_test.rb diff --git a/examples/sub_plans_v2.rb b/examples/sub_plans_v2.rb new file mode 100755 index 00000000..7fd90fd3 --- /dev/null +++ b/examples/sub_plans_v2.rb @@ -0,0 +1,75 @@ +#!/usr/bin/env ruby +# frozen_string_literal: true +example_description = < 0) && can_spawn_next_batch? ? nil : polling_interval + plan_event(Ping, delay) + suspend + end + + def spawn_plans + sub_plans = create_sub_plans + sub_plans = Array[sub_plans] unless sub_plans.is_a? Array + increase_counts(sub_plans.count, 0) + on_planning_finished unless can_spawn_next_batch? 
+ end + + def increase_counts(planned, failed) + output[:planned_count] += planned + failed + output[:failed_count] = output.fetch(:failed_count, 0) + failed + output[:pending_count] = output.fetch(:pending_count, 0) + planned + output[:success_count] ||= 0 + end + + def try_to_finish + return false unless done? + + check_for_errors! + on_finish + true + end + + def done? + return false if can_spawn_next_batch? || !counts_set? + + total_count - output[:success_count] - output[:failed_count] - output[:cancelled_count] <= 0 + end + + def run_progress + return 0.1 unless counts_set? && total_count > 0 + + sum = output.values_at(:success_count, :cancelled_count, :failed_count).reduce(:+) + sum.to_f / total_count + end + + def recalculate_counts + total = total_count + failed = sub_plans_count('state' => %w(paused stopped), 'result' => 'error') + success = sub_plans_count('state' => 'stopped', 'result' => 'success') + output.update(:pending_count => total - failed - success, + :failed_count => failed - output.fetch(:resumed_count, 0), + :success_count => success) + end + + def counts_set? + output[:total_count] && output[:success_count] && output[:failed_count] && output[:pending_count] + end + + def check_for_errors! + raise SubtaskFailedException.new("A sub task failed") if output[:failed_count] > 0 + end + + # Helper for creating sub plans + def trigger(action_class, *args) + world.trigger { world.plan_with_options(action_class: action_class, args: args, caller_action: self) } + end + + # Concurrency limiting + def limit_concurrency_level!(level) + input[:dynflow] ||= {} + input[:dynflow][:concurrency_limit] = level + end + + def concurrency_limit + input[:dynflow] ||= {} + input[:dynflow][:concurrency_limit] + end + + def concurrency_limit_capacity + if limit = concurrency_limit + return limit unless counts_set? 
+ capacity = limit - (output[:planned_count] - (output[:success_count] + output[:failed_count])) + [0, capacity].max + end + end + + # Cancellation handling + def cancel!(force = false) + # Count the not-yet-planned tasks as cancelled + output[:cancelled_count] = total_count - output[:planned_count] + # Pass the cancel event to running sub plans if they can be cancelled + sub_plans(:state => 'running').each { |sub_plan| sub_plan.cancel(force) if sub_plan.cancellable? } + suspend + end + + def abort! + cancel! true + end + + # Batching + # Returns the items in the current batch + def current_batch + start_position = output[:planned_count] + size = batch_size + size = concurrency_limit_capacity if concurrency_limit + size = start_position + size > total_count ? total_count - start_position : size + batch(start_position, size) + end + + def can_spawn_next_batch? + remaining_count > 0 + end + + def remaining_count + total_count - output[:cancelled_count] - output[:planned_count] + end + + private + + # Sub-plan lookup + def sub_plan_filter + { 'caller_execution_plan_id' => execution_plan_id, + 'caller_action_id' => self.id } + end + + def sub_plans(filter = {}) + world.persistence.find_execution_plans(filters: sub_plan_filter.merge(filter)) + end + + def sub_plans_count(filter = {}) + world.persistence.find_execution_plan_counts(filters: sub_plan_filter.merge(filter)) + end + end +end diff --git a/test/v2_sub_plans_test.rb b/test/v2_sub_plans_test.rb new file mode 100644 index 00000000..b6c644e7 --- /dev/null +++ b/test/v2_sub_plans_test.rb @@ -0,0 +1,189 @@ +# frozen_string_literal: true +require_relative 'test_helper' +require 'mocha/minitest' + +module Dynflow + module V2SubPlansTest + describe 'V2 sub-plans' do + include PlanAssertions + include Dynflow::Testing::Assertions + include Dynflow::Testing::Factories + include TestHelpers + + let(:world) { WorldFactory.create_world } + + class ChildAction < ::Dynflow::Action + def run + end + end + + class ParentAction < 
::Dynflow::Action + include Dynflow::Action::V2::WithSubPlans + + def plan(count, concurrency_level = nil) + limit_concurrency_level!(concurrency_level) if concurrency_level + plan_self :count => count + end + + def create_sub_plans + output[:batch_count] ||= 0 + output[:batch_count] += 1 + current_batch.map { |i| trigger(ChildAction) } + end + + def batch(from, size) + (1..total_count).to_a.slice(from, size) + end + + def batch_size + 5 + end + + def total_count + input[:count] + end + end + + describe 'normal operation' do + it 'spawns all sub-plans in one go with high-enough batch size and polls until they are done' do + action = create_and_plan_action ParentAction, 3 + action.world.expects(:trigger).times(3) + action = run_action action + _(action.output['total_count']).must_equal 3 + _(action.output['planned_count']).must_equal 3 + _(action.output['pending_count']).must_equal 3 + + ping = action.world.clock.pending_pings.first + _(ping.what.value.event).must_equal Dynflow::Action::V2::WithSubPlans::Ping + _(ping.when).must_be_within_delta(Time.now + action.polling_interval, 1) + persistence = mock() + persistence.stubs(:find_execution_plan_counts).returns(0) + action.world.stubs(:persistence).returns(persistence) + + action.world.clock.progress + action.world.executor.progress + ping = action.world.clock.pending_pings.first + _(ping.what.value.event).must_equal Dynflow::Action::V2::WithSubPlans::Ping + _(ping.when).must_be_within_delta(Time.now + action.polling_interval * 2, 1) + + persistence = mock() + persistence.stubs(:find_execution_plan_counts).returns(0).then.returns(3) + action.world.stubs(:persistence).returns(persistence) + action.world.clock.progress + action.world.executor.progress + + _(action.state).must_equal :success + _(action.done?).must_equal true + end + + it 'spawns sub-plans in multiple batches and polls until they are done' do + action = create_and_plan_action ParentAction, 7 + action.world.expects(:trigger).times(5) + action = 
run_action action + _(action.output['total_count']).must_equal 7 + _(action.output['planned_count']).must_equal 5 + _(action.output['pending_count']).must_equal 5 + + _(action.world.clock.pending_pings).must_be :empty? + _, _, event, * = action.world.executor.events_to_process.first + _(event).must_equal Dynflow::Action::V2::WithSubPlans::Ping + persistence = mock() + # Simulate 3 finished + persistence.stubs(:find_execution_plan_counts).returns(0).then.returns(3) + action.world.stubs(:persistence).returns(persistence) + + action.world.expects(:trigger).times(2) + action.world.executor.progress + + ping = action.world.clock.pending_pings.first + _(ping.what.value.event).must_equal Dynflow::Action::V2::WithSubPlans::Ping + _(ping.when).must_be_within_delta(Time.now + action.polling_interval, 1) + + persistence.stubs(:find_execution_plan_counts).returns(0).then.returns(7) + action.world.stubs(:persistence).returns(persistence) + action.world.clock.progress + action.world.executor.progress + + _(action.state).must_equal :success + _(action.done?).must_equal true + end + end + + describe 'with concurrency control' do + include Dynflow::Testing + + it 'allows storage and retrieval' do + action = create_and_plan_action ParentAction, 0 + action = run_action action + _(action.concurrency_limit).must_be_nil + _(action.concurrency_limit_capacity).must_be_nil + + action = create_and_plan_action ParentAction, 0, 1 + action = run_action action + + _(action.input['dynflow']['concurrency_limit']).must_equal 1 + _(action.concurrency_limit).must_equal 1 + _(action.concurrency_limit_capacity).must_equal 1 + end + + it 'reduces the batch size to fit within the concurrency limit' do + action = create_and_plan_action ParentAction, 5, 2 + + # Plan first 2 sub-plans + action.world.expects(:trigger).times(2) + + action = run_action action + _(action.output['total_count']).must_equal 5 + _(action.output['planned_count']).must_equal 2 + _(action.output['pending_count']).must_equal 2 + 
_(action.concurrency_limit_capacity).must_equal 0 + _(action.output['batch_count']).must_equal 1 + + ping = action.world.clock.pending_pings.first + _(ping.what.value.event).must_equal Dynflow::Action::V2::WithSubPlans::Ping + _(ping.when).must_be_within_delta(Time.now + action.polling_interval, 1) + persistence = mock() + # Simulate 1 sub-plan finished + persistence.stubs(:find_execution_plan_counts).returns(0).then.returns(1) + action.world.stubs(:persistence).returns(persistence) + + # Only 1 sub-plans fits into the capacity + action.world.expects(:trigger).times(1) + action.world.clock.progress + action.world.executor.progress + + _(action.output['planned_count']).must_equal 3 + + persistence = mock() + persistence.stubs(:find_execution_plan_counts).returns(0).then.returns(2) + action.world.stubs(:persistence).returns(persistence) + action.world.expects(:trigger).times(1) + action.world.clock.progress + action.world.executor.progress + + _(action.output['planned_count']).must_equal 4 + + persistence = mock() + persistence.stubs(:find_execution_plan_counts).returns(0).then.returns(4) + action.world.stubs(:persistence).returns(persistence) + action.world.expects(:trigger).times(1) + action.world.clock.progress + action.world.executor.progress + + _(action.output['planned_count']).must_equal 5 + _(action.concurrency_limit_capacity).must_equal 1 + + persistence = mock() + persistence.stubs(:find_execution_plan_counts).returns(0).then.returns(5) + action.world.stubs(:persistence).returns(persistence) + action.world.expects(:trigger).never + action.world.clock.progress + action.world.executor.progress + _(action.state).must_equal :success + _(action.done?).must_equal true + _(action.concurrency_limit_capacity).must_equal 2 + end + end + end + end +end