Skip to content

Commit

Permalink
Change tagged job assignment behavior
Browse files Browse the repository at this point in the history
Once tag removed from instance for assignment purpose, current behavior is:
1) no new tagged tasks assigns to that instance.
2) drop the existing running tasks.

Change behavior for this:
1) keep no new tagged tasks assigns to that instance.
2) let existing tasks running to complete

Test added.
  • Loading branch information
Junkai committed Dec 5, 2024
1 parent 522554e commit eb136a1
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -825,14 +825,19 @@ private Set<Integer> filterTasks(String jobResource, Iterable<Integer> allPartit
// disabled. If instance is disabled and current state still exist on the instance,
// then controller needs to drop the current state, otherwise, the task can be marked as
// dropped and be reassigned to other instances
if (disableInstances.contains(assignedParticipant)
&& currStateOutput.getCurrentState(jobResource, new Partition(partitionName),
if (disableInstances.contains(assignedParticipant) &&
currStateOutput.getCurrentState(jobResource, new Partition(partitionName),
assignedParticipant) != null) {
paMap.put(partitionNumber,
new PartitionAssignment(assignedParticipant, TaskPartitionState.DROPPED.name()));
} else {
jobContext.setPartitionState(partitionNumber, TaskPartitionState.DROPPED);
filteredTasks.add(partitionNumber);
if (_manager.getHelixDataAccessor().getProperty(
_manager.getHelixDataAccessor().keyBuilder().instanceConfig(assignedParticipant))
== null) {
// Only drop the task if the instance is not alive, otherwise, the task will be continued
jobContext.setPartitionState(partitionNumber, TaskPartitionState.DROPPED);
filteredTasks.add(partitionNumber);
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,8 @@ private ResourceAssignment computeResourceMapping(String jobResource,
Map<String, Set<Integer>> tasksToDrop = new HashMap<>();

Map<String, SortedSet<Integer>> currentInstanceToTaskAssignments =
getCurrentInstanceToTaskAssignments(liveInstances, currStateOutput, jobResource, tasksToDrop);
getCurrentInstanceToTaskAssignments(cache.getEnabledLiveInstances(), currStateOutput,
jobResource, tasksToDrop);

updateInstanceToTaskAssignmentsFromContext(jobCtx, currentInstanceToTaskAssignments);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package org.apache.helix.task;

/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

import com.google.common.collect.ImmutableMap;

import java.util.ArrayList;
import java.util.List;

import org.apache.helix.ConfigAccessor;
import org.apache.helix.integration.task.MockTask;
import org.apache.helix.integration.task.TaskTestBase;
import org.apache.helix.model.InstanceConfig;
import org.testng.Assert;
import org.testng.annotations.Test;

public class TestJobTagRemoval extends TaskTestBase {

@Test
public void testJobTagRemoval() throws InterruptedException {
String TEST_TAG = "testTag";

ConfigAccessor configAccessor = new ConfigAccessor(_gZkClient);
InstanceConfig cfg =
configAccessor.getInstanceConfig(CLUSTER_NAME, _participants[0].getInstanceName());
cfg.addTag(TEST_TAG);
configAccessor.setInstanceConfig(CLUSTER_NAME, _participants[0].getInstanceName(), cfg);

Workflow.Builder builder = new Workflow.Builder("testWorkflow");
List<TaskConfig> taskConfigs = new ArrayList<>();
taskConfigs.add(
new TaskConfig.Builder().setTaskId("tagged_task").setCommand(MockTask.TASK_COMMAND)
.build());
JobConfig.Builder jobBuilder = new JobConfig.Builder().addTaskConfigs(taskConfigs)
.setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "5000"))
.setInstanceGroupTag(TEST_TAG);
JobConfig.Builder jobBuilder1 = new JobConfig.Builder().addTaskConfigs(taskConfigs)
.setJobCommandConfigMap(ImmutableMap.of(MockTask.JOB_DELAY, "1000"))
.setInstanceGroupTag(TEST_TAG);
builder.addJob("JOB0", jobBuilder);
builder.addJob("JOB1", jobBuilder1);
builder.addParentChildDependency("JOB0", "JOB1");
_driver.start(builder.build());

// Wait for the job to be created
_driver.pollForJobState("testWorkflow", "testWorkflow_JOB0", TaskState.IN_PROGRESS);

// Remove the tag
cfg = configAccessor.getInstanceConfig(CLUSTER_NAME, _participants[0].getInstanceName());
cfg.removeTag(TEST_TAG);
configAccessor.setInstanceConfig(CLUSTER_NAME, _participants[0].getInstanceName(), cfg);

// Wait for the job to complete
_driver.pollForJobState("testWorkflow", "testWorkflow_JOB0", TaskState.COMPLETED);
_driver.pollForJobState("testWorkflow", "testWorkflow_JOB1", TaskState.IN_PROGRESS);
JobContext ctx = _driver.getJobContext("testWorkflow_JOB1");
Assert.assertEquals(ctx.getPartitionState(0), null);
}
}

0 comments on commit eb136a1

Please sign in to comment.