From 16419d3b97b941c6bfc59d74ecc34a8966508215 Mon Sep 17 00:00:00 2001 From: maillard Date: Tue, 19 Dec 2023 14:17:52 -0800 Subject: [PATCH 1/2] Make resumable sim driver handle task scheduling This allows to remember references to scheduled tasks and thus to fix the bug resulting from the taskid of an anchored task not to be saved in plannedDirectiveToTask because they were scheduled by the sim engine. --- .../simulation/ResumableSimulationDriver.java | 140 ++++++++---------- .../scheduler/solver/PrioritySolver.java | 2 +- .../simulation/AnchorSchedulerTest.java | 15 ++ 3 files changed, 77 insertions(+), 80 deletions(-) diff --git a/scheduler-driver/src/main/java/gov/nasa/jpl/aerie/scheduler/simulation/ResumableSimulationDriver.java b/scheduler-driver/src/main/java/gov/nasa/jpl/aerie/scheduler/simulation/ResumableSimulationDriver.java index b51713e771..2cf23d0897 100644 --- a/scheduler-driver/src/main/java/gov/nasa/jpl/aerie/scheduler/simulation/ResumableSimulationDriver.java +++ b/scheduler-driver/src/main/java/gov/nasa/jpl/aerie/scheduler/simulation/ResumableSimulationDriver.java @@ -26,9 +26,11 @@ import java.time.Instant; import java.util.Comparator; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.function.Supplier; public class ResumableSimulationDriver implements AutoCloseable { @@ -52,6 +54,9 @@ public class ResumableSimulationDriver implements AutoCloseable { //mapping each activity name to its task id (in String form) in the simulation engine private final Map plannedDirectiveToTask; + //subset of plannedDirectiveToTask to check for scheduling dependent tasks + private final Map toCheckForDependencyScheduling; + //simulation results so far private SimulationResults lastSimResults; //cached simulation results cover the period [Duration.ZERO, lastSimResultsEnd] @@ -71,6 +76,7 @@ public ResumableSimulationDriver( ){ this.missionModel = missionModel; plannedDirectiveToTask = new HashMap<>(); + toCheckForDependencyScheduling = new HashMap<>(); this.planDuration = planDuration; countSimulationRestarts = 0; this.canceledListener = canceledListener; @@ -94,6 +100,7 @@ private void printTimeSpent(){ printTimeSpent(); durationSinceRestart = 0; plannedDirectiveToTask.clear(); + toCheckForDependencyScheduling.clear(); lastSimResults = null; lastSimResultsEnd = Duration.ZERO; long before = System.nanoTime(); @@ -273,13 +280,14 @@ private void simulateSchedule(final Map schedule).compute(); // Filter out activities that are before the plan start resolved = StartOffsetReducer.filterOutNegativeStartOffset(resolved); - + final var toSchedule = new HashSet(); + toSchedule.add(null); scheduleActivities( + toSchedule, schedule, resolved, missionModel, - engine, - activityTopic + engine ); var allTaskFinished = false; @@ -303,6 +311,8 @@ private void simulateSchedule(final Map final var commit = engine.performJobs(batch.jobs(), cells, curTime, Duration.MAX_VALUE); timeline.add(commit); + scheduleActivities(getSuccessorsToSchedule(engine), schedule, resolved, missionModel, engine); + // all tasks are complete : do not exit yet, there might be event triggered at the same time if (!plannedDirectiveToTask.isEmpty() && plannedDirectiveToTask .values() @@ -328,93 +338,65 @@ private void simulateSchedule(final Map * @return its duration if the activity has been simulated and has finished simulating, an IllegalArgumentException otherwise */ public Optional getActivityDuration(ActivityDirectiveId activityDirectiveId){ + //potential cause of non presence: (1) activity is outside plan bounds (2) activity has not been simulated yet + if(!plannedDirectiveToTask.containsKey(activityDirectiveId)) return Optional.empty(); return engine.getTaskDuration(plannedDirectiveToTask.get(activityDirectiveId)); } + private Set getSuccessorsToSchedule(final SimulationEngine engine) { + final var toSchedule = new HashSet(); + final var iterator = toCheckForDependencyScheduling.entrySet().iterator(); + while(iterator.hasNext()){ + final var taskToCheck = iterator.next(); + if(engine.isTaskComplete(taskToCheck.getValue())){ + toSchedule.add(taskToCheck.getKey()); + iterator.remove(); + } + } + return toSchedule; + } + private void scheduleActivities( - final Map schedule, + final Set toScheduleNow, + final Map completeSchedule, final HashMap>> resolved, final MissionModel missionModel, - final SimulationEngine engine, - final Topic activityTopic - ) - { - if(resolved.get(null) == null) { return; } // Nothing to simulate - - for (final Pair directivePair : resolved.get(null)) { - final var directiveId = directivePair.getLeft(); - final var startOffset = directivePair.getRight(); - final var serializedDirective = schedule.get(directiveId).serializedActivity(); - - final TaskFactory task; - try { - task = missionModel.getTaskFactory(serializedDirective); - } catch (final InstantiationException ex) { - // All activity instantiations are assumed to be validated by this point - throw new Error("Unexpected state: activity instantiation %s failed with: %s" - .formatted(serializedDirective.getTypeName(), ex.toString())); + final SimulationEngine engine){ + for(final var predecessor: toScheduleNow) { + for (final var directivePair : resolved.get(predecessor)) { + final var offset = directivePair.getRight(); + final var directiveIdToSchedule = directivePair.getLeft(); + final var serializedDirective = completeSchedule.get(directiveIdToSchedule).serializedActivity(); + final TaskFactory task; + try { + task = missionModel.getTaskFactory(serializedDirective); + } catch (final InstantiationException ex) { + // All activity instantiations are assumed to be validated by this point + throw new Error("Unexpected state: activity instantiation %s failed with: %s" + .formatted(serializedDirective.getTypeName(), ex.toString())); + } + Duration computedStartTime = offset; + if (predecessor != null) { + computedStartTime = (curTime.isEqualTo(Duration.MIN_VALUE) ? Duration.ZERO : curTime).plus(offset); + } + final var taskId = engine.scheduleTask( + computedStartTime, + makeTaskFactory(directiveIdToSchedule, task, activityTopic)); + plannedDirectiveToTask.put(directiveIdToSchedule, taskId); + if (resolved.containsKey(directiveIdToSchedule)) { + toCheckForDependencyScheduling.put(directiveIdToSchedule, taskId); + } } - - final var taskId = engine.scheduleTask(startOffset, makeTaskFactory( - directiveId, - task, - schedule, - resolved, - missionModel, - activityTopic - )); - plannedDirectiveToTask.put(directiveId,taskId); } } - private static TaskFactory makeTaskFactory( + private static TaskFactory makeTaskFactory( final ActivityDirectiveId directiveId, final TaskFactory task, - final Map schedule, - final HashMap>> resolved, - final MissionModel missionModel, - final Topic activityTopic - ) - { - // Emit the current activity (defined by directiveId) - return executor -> scheduler0 -> TaskStatus.calling((TaskFactory) (executor1 -> scheduler1 -> { - scheduler1.emit(directiveId, activityTopic); - return task.create(executor1).step(scheduler1); - }), scheduler2 -> { - // When the current activity finishes, get the list of the activities that needed this activity to finish to know their start time - final List> dependents = resolved.get(directiveId) == null ? List.of() : resolved.get(directiveId); - // Iterate over the dependents - for (final var dependent : dependents) { - scheduler2.spawn(executor2 -> scheduler3 -> - // Delay until the dependent starts - TaskStatus.delayed(dependent.getRight(), scheduler4 -> { - final var dependentDirectiveId = dependent.getLeft(); - final var serializedDependentDirective = schedule.get(dependentDirectiveId).serializedActivity(); - - // Initialize the Task for the dependent - final TaskFactory dependantTask; - try { - dependantTask = missionModel.getTaskFactory(serializedDependentDirective); - } catch (final InstantiationException ex) { - // All activity instantiations are assumed to be validated by this point - throw new Error("Unexpected state: activity instantiation %s failed with: %s" - .formatted(serializedDependentDirective.getTypeName(), ex.toString())); - } - - // Schedule the dependent - // When it finishes, it will schedule the activities depending on it to know their start time - scheduler4.spawn(makeTaskFactory( - dependentDirectiveId, - dependantTask, - schedule, - resolved, - missionModel, - activityTopic - )); - return TaskStatus.completed(Unit.UNIT); - })); - } - return TaskStatus.completed(Unit.UNIT); - }); + final Topic activityTopic) { + return executor -> scheduler -> { + scheduler.emit(directiveId, activityTopic); + return task.create(executor).step(scheduler); + }; } } diff --git a/scheduler-driver/src/main/java/gov/nasa/jpl/aerie/scheduler/solver/PrioritySolver.java b/scheduler-driver/src/main/java/gov/nasa/jpl/aerie/scheduler/solver/PrioritySolver.java index f5a81bdfd8..9fe3b6f2d4 100644 --- a/scheduler-driver/src/main/java/gov/nasa/jpl/aerie/scheduler/solver/PrioritySolver.java +++ b/scheduler-driver/src/main/java/gov/nasa/jpl/aerie/scheduler/solver/PrioritySolver.java @@ -1028,7 +1028,7 @@ public Duration valueAt(Duration start, final EquationSolvingAlgorithms.History< if(computedDuration.isPresent()) { history.add(new EquationSolvingAlgorithms.FunctionCoordinate<>(start, start.plus(computedDuration.get())), new ActivityMetadata(actToSim)); } else{ - logger.debug("No simulation error but activity duration could not be found in simulation, likely caused by unfinished activity."); + logger.debug("No simulation error but activity duration could not be found in simulation, likely caused by unfinished activity or activity outside plan bounds."); history.add(new EquationSolvingAlgorithms.FunctionCoordinate<>(start, null), new ActivityMetadata(actToSim)); } } catch (SimulationFacade.SimulationException e) { diff --git a/scheduler-driver/src/test/java/gov/nasa/jpl/aerie/scheduler/simulation/AnchorSchedulerTest.java b/scheduler-driver/src/test/java/gov/nasa/jpl/aerie/scheduler/simulation/AnchorSchedulerTest.java index efc0353705..24f668ef9f 100644 --- a/scheduler-driver/src/test/java/gov/nasa/jpl/aerie/scheduler/simulation/AnchorSchedulerTest.java +++ b/scheduler-driver/src/test/java/gov/nasa/jpl/aerie/scheduler/simulation/AnchorSchedulerTest.java @@ -339,6 +339,21 @@ public void activitiesAnchoredToOtherActivities() throws SchedulingInterruptedEx assertEquals(1, driver.getCountSimulationRestarts()); } + @Test + @DisplayName("Reference to anchored activities are correctly maintained by the driver") + public void activitiesAnchoredToOtherActivitiesSimple() throws SchedulingInterruptedException { + final var activitiesToSimulate = new HashMap(2); + activitiesToSimulate.put( + new ActivityDirectiveId(0), + new ActivityDirective(oneMinute, serializedDelayDirective, null, true)); + activitiesToSimulate.put( + new ActivityDirectiveId(1), + new ActivityDirective(oneMinute, serializedDelayDirective, new ActivityDirectiveId(0), false)); + driver.simulateActivities(activitiesToSimulate); + final var durationOfAnchoredActivity = driver.getActivityDuration(new ActivityDirectiveId(1)); + assertTrue(durationOfAnchoredActivity.isPresent()); + } + @Test @DisplayName("Decomposition and anchors do not interfere with each other") public void decomposingActivitiesAndAnchors() throws SchedulingInterruptedException{ From 6ebef0a81cf53e64cc276636a323b3c1344cf04b Mon Sep 17 00:00:00 2001 From: maillard Date: Tue, 19 Dec 2023 16:29:35 -0800 Subject: [PATCH 2/2] Update test with uncontrollable activity All other tests are with growbanana --- .../services/SchedulingIntegrationTests.java | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/scheduler-worker/src/test/java/gov/nasa/jpl/aerie/scheduler/worker/services/SchedulingIntegrationTests.java b/scheduler-worker/src/test/java/gov/nasa/jpl/aerie/scheduler/worker/services/SchedulingIntegrationTests.java index b8691dbd29..fd5731ff8f 100644 --- a/scheduler-worker/src/test/java/gov/nasa/jpl/aerie/scheduler/worker/services/SchedulingIntegrationTests.java +++ b/scheduler-worker/src/test/java/gov/nasa/jpl/aerie/scheduler/worker/services/SchedulingIntegrationTests.java @@ -2695,15 +2695,14 @@ void testRelativeActivityPlanPositiveStartOffsetStart() { new ActivityDirectiveId(2L), new ActivityDirective( tenMinutes, - "GrowBanana", + "PickBanana", Map.of( - "quantity", SerializedValue.of(1), - "growingDuration", SerializedValue.of(activityDuration.in(Duration.MICROSECONDS))), + "quantity", SerializedValue.of(1)), new ActivityDirectiveId(1L), true)), List.of(new SchedulingGoal(new GoalId(0L), """ export default () => Goal.CoexistenceGoal({ - forEach: ActivityExpression.ofType(ActivityTypes.GrowBanana), + forEach: ActivityExpression.ofType(ActivityTypes.PickBanana), activityTemplate: ActivityTemplates.PeelBanana({peelDirection: "fromStem"}), startsAt: TimingConstraint.singleton(WindowProperty.START).plus(Temporal.Duration.from({ minutes : 5})) }) @@ -2724,20 +2723,20 @@ export default () => Goal.CoexistenceGoal({ final var planByActivityType = partitionByActivityType(results.updatedPlan()); final var peelBananas = planByActivityType.get("PeelBanana"); - final var growBananas = planByActivityType.get("GrowBanana"); + final var pickBananas = planByActivityType.get("PickBanana"); final var durationParamActivities = planByActivityType.get("DurationParameterActivity"); assertEquals(1, peelBananas.size()); - assertEquals(1, growBananas.size()); + assertEquals(1, pickBananas.size()); assertEquals(1, durationParamActivities.size()); final var peelBanana = peelBananas.iterator().next(); - final var growBanana = growBananas.iterator().next(); + final var pickBanana = pickBananas.iterator().next(); final var durationParamActivity = durationParamActivities.iterator().next(); assertEquals(Duration.ZERO, durationParamActivity.startOffset()); - assertEquals(tenMinutes, growBanana.startOffset()); - assertEquals(SerializedValue.of(1), growBanana.serializedActivity().getArguments().get("quantity")); + assertEquals(tenMinutes, pickBanana.startOffset()); + assertEquals(SerializedValue.of(1), pickBanana.serializedActivity().getArguments().get("quantity")); assertEquals(Duration.of(15, Duration.MINUTES), peelBanana.startOffset()); assertEquals(SerializedValue.of("fromStem"), peelBanana.serializedActivity().getArguments().get("peelDirection"));