Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: validate that the "old" part of a reconfigured pair will get garbage collected #358

Open
wants to merge 4 commits into
base: transition-to-runkit
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
129 changes: 96 additions & 33 deletions lib/syskit/network_generation/engine.rb
Original file line number Diff line number Diff line change
Expand Up @@ -132,21 +132,61 @@ def compute_deployed_network(
def apply_deployed_network_to_plan
# Finally, we map the deployed network to the currently
# running tasks
@deployment_tasks, @deployed_tasks =
@deployment_tasks, @reused_deployed_tasks, @new_deployed_tasks =
log_timepoint_group "finalize_deployed_tasks" do
finalize_deployed_tasks
end
@deployed_tasks = @reused_deployed_tasks + @new_deployed_tasks

sever_old_plan_from_new_plan

if @dataflow_dynamics
@dataflow_dynamics.apply_merges(merge_solver)
log_timepoint "apply_merged_to_dataflow_dynamics"
end

Engine.deployment_postprocessing.each do |block|
block.call(self, work_plan)
log_timepoint "postprocessing:#{block}"
end
end

# "Cut" relations between the "old" plan and the new one
#
# At this stage, old components (task contexts and compositions)
# that are not part of the new plan may still be child of bits of
# the new plan. This happens if they are added as children of other
# task contexts. The transformer does this to register dynamic
# transformation producers
#
# This pass looks for all proxies of compositions and task contexts
# that are not the target of a merge operation. When this happens,
# we know that the component is not being reused, and we remove all
# dependency relations where it is child and where the parent is
# "useful"
#
# Note that we do this only for relations between Syskit
# components. Relations with "plan" Roby tasks are updated because
# we replace toplevel tasks.
def sever_old_plan_from_new_plan
old_tasks =
work_plan
.find_local_tasks(Syskit::Component)
.find_all(&:transaction_proxy?)

merge_leaves = merge_solver.each_merge_leaf.to_set
old_tasks.each do |old_task|
next if merge_leaves.include?(old_task)

parents =
old_task
.each_parent_task
.find_all { |t| merge_leaves.include?(t) }

parents.each { |t| t.remove_child(old_task) }
end
end

class << self
# Set of blocks registered with
# register_instanciation_postprocessing
Expand Down Expand Up @@ -349,8 +389,9 @@ def finalize_deployed_tasks
end
log_timepoint "select_deployments"

reused_deployed_tasks =
reconfigure_tasks_on_static_port_modification(reused_deployed_tasks)
reconfigure_tasks_on_static_port_modification(
reused_deployed_tasks, newly_deployed_tasks
)
log_timepoint "reconfigure_tasks_on_static_port_modification"

debug do
Expand All @@ -366,7 +407,7 @@ def finalize_deployed_tasks
merge_solver.merge_identical_tasks
log_timepoint "merge"

[selected_deployment_tasks, reused_deployed_tasks | newly_deployed_tasks]
[selected_deployment_tasks, reused_deployed_tasks, newly_deployed_tasks]
end

# Process a single deployment in {#finalize_deployed_tasks}
Expand Down Expand Up @@ -395,18 +436,18 @@ def handle_required_deployment(required, usable, not_reusable)
end

if usable
newly_deployed_tasks = []
reused_deployed_tasks = adapt_existing_deployment(required, usable)
new_deployed_tasks, reused_deployed_tasks =
adapt_existing_deployment(required, usable)
selected = usable
else
# Nothing to do, we leave the plan as it is
newly_deployed_tasks = required.each_executed_task
new_deployed_tasks = required.each_executed_task
reused_deployed_tasks = []
selected = required
end

selected.should_start_after(not_reusable.stop_event) if not_reusable
[selected, newly_deployed_tasks, reused_deployed_tasks]
[selected, new_deployed_tasks, reused_deployed_tasks]
end

# Validate that the usable deployment we found is actually usable
Expand Down Expand Up @@ -525,9 +566,9 @@ def import_existing_deployments(used_deployments)
# Note that tasks that are already reconfigured because of
# {#adapt_existing_deployment} will be fine as the task is not
# configured yet
def reconfigure_tasks_on_static_port_modification(deployed_tasks)
final_deployed_tasks = deployed_tasks.dup

def reconfigure_tasks_on_static_port_modification(
reused_deployed_tasks, newly_deployed_tasks
)
# We filter against 'deployed_tasks' to always select the tasks
# that have been selected in this deployment. It does mean that
# the task is always the 'current' one, that is we would pick
Expand All @@ -538,7 +579,7 @@ def reconfigure_tasks_on_static_port_modification(deployed_tasks)
.find_tasks(Syskit::TaskContext).not_finished.not_finishing
.find_all { |t| !t.read_only? }
.find_all do |t|
deployed_tasks.include?(t) && (t.setting_up? || t.setup?)
reused_deployed_tasks.include?(t) && (t.setting_up? || t.setup?)
end

already_setup_tasks.each do |t|
Expand All @@ -552,10 +593,9 @@ def reconfigure_tasks_on_static_port_modification(deployed_tasks)
new_task = t.execution_agent.task(t.orocos_name, t.concrete_model)
merge_solver.apply_merge_group(t => new_task)
new_task.should_configure_after t.stop_event
final_deployed_tasks.delete(t)
final_deployed_tasks << new_task
reused_deployed_tasks.delete(t)
newly_deployed_tasks << new_task
end
final_deployed_tasks
end

# Find the "last" deployed task in a set of related deployed tasks
Expand Down Expand Up @@ -597,14 +637,13 @@ def adapt_existing_deployment(deployment_task, existing_deployment_task)
(orocos_name_to_existing[t.orocos_name] ||= []) << t
end

applied_merges = Set.new
deployed_tasks = deployment_task.each_executed_task.to_a
new_deployed_tasks = []
reused_deployed_tasks = []
deployed_tasks.each do |task|
existing_tasks =
orocos_name_to_existing[task.orocos_name] || []
unless existing_tasks.empty?
existing_task = find_current_deployed_task(existing_tasks)
end
existing_task = find_current_deployed_task(existing_tasks)

if !existing_task || !task.can_be_deployed_by?(existing_task)
debug do
Expand All @@ -629,25 +668,19 @@ def adapt_existing_deployment(deployment_task, existing_deployment_task)
"to finish before reconfiguring"
end

parent_task_contexts =
previous_task
.each_parent_task
.find_all { |t| t.kind_of?(Syskit::TaskContext) }

parent_task_contexts.each do |t|
t.remove_child(previous_task)
end
new_task.should_configure_after(previous_task.stop_event)
end
new_deployed_tasks << new_task
existing_task = new_task
else
reused_deployed_tasks << existing_task
end

merge_solver.apply_merge_group(task => existing_task)
applied_merges << existing_task
debug { " using #{existing_task} for #{task} (#{task.orocos_name})" }
end
work_plan.remove_task(deployment_task)
applied_merges
[new_deployed_tasks, reused_deployed_tasks]
end

# Computes the set of requirement tasks that should be used for
Expand Down Expand Up @@ -830,15 +863,15 @@ def apply_system_network_to_plan(
log_timepoint "final_network_postprocessing:#{block}"
end

# Finally, we should now only have deployed tasks. Verify it
# and compute the connection policies
if garbage_collect && validate_final_network
validate_final_network(required_instances, work_plan,
compute_deployments: compute_deployments)
log_timepoint "validate_final_network"
end

commit_work_plan

validate_reconfigured_tasks_are_not_held(@new_deployed_tasks)
end

def discard_work_plan
Expand Down Expand Up @@ -899,7 +932,13 @@ def handle_resolution_exception(e, on_error: :discard)
def validate_final_network(
required_instances, plan, compute_deployments: true
)
# Check that all device instances are proper tasks (not proxies)
validate_required_instances_are_tasks(required_instances)

super if defined? super
end

def validate_required_instances_are_tasks(required_instances)
# Check that the final set of root required instances are proper tasks
required_instances.each do |_req_task, task|
if task.transaction_proxy?
raise InternalError,
Expand All @@ -910,8 +949,32 @@ def validate_final_network(
"instance definition #{task} has been removed from plan"
end
end
end

super if defined? super
# Exception added to the plan when we detect that a task being reconfigured
# is held against garbage collection.
#
# This is an internal error (i.e. should not happen), but not triggering
# this causes the system to "hold on" forever.
class InternalErrorReconfiguredTaskIsHeld < Roby::LocalizedError
end

# Validate that "old" tasks in a reconfigured pair will be garbage collected
#
# This must be called at the very end
def validate_reconfigured_tasks_are_not_held(new_deployed_tasks)
reconfigured_tasks = new_deployed_tasks.flat_map do |task|
task.start_event.parent_objects(
Roby::EventStructure::SyskitConfigurationPrecedence
).map(&:task).to_a
end

useful_tasks = real_plan.useful_tasks
reconfigured_tasks.each do |t|
next unless useful_tasks.include?(t)

t.add_error(InternalErrorReconfiguredTaskIsHeld.new(t))
end
end

@@dot_index = 0
Expand Down
8 changes: 8 additions & 0 deletions lib/syskit/network_generation/merge_solver.rb
Original file line number Diff line number Diff line change
Expand Up @@ -572,6 +572,14 @@ def display_merge_graph(title, merge_graph)
break
end
end

def each_merge_leaf
return enum_for(__method__) unless block_given?

task_replacement_graph.each_vertex do |v|
yield(v) if task_replacement_graph.leaf?(v)
end
end
end
end
end
12 changes: 7 additions & 5 deletions lib/syskit/test/network_manipulation.rb
Original file line number Diff line number Diff line change
Expand Up @@ -210,11 +210,13 @@ def syskit_deploy(
resolve_options = Hash[on_error: :commit].merge(resolve_options)
begin
syskit_engine_resolve_handle_plan_export do
syskit_engine ||= Syskit::NetworkGeneration::Engine.new(plan)
syskit_engine.resolve(
default_deployment_group: default_deployment_group,
**resolve_options
)
execute do
syskit_engine ||= Syskit::NetworkGeneration::Engine.new(plan)
syskit_engine.resolve(
default_deployment_group: default_deployment_group,
**resolve_options
)
end
end
rescue StandardError => e
expect_execution do
Expand Down
Loading