Skip to content

Commit

Permalink
Add support for excluding specific program types in pre upgrade job
Browse files Browse the repository at this point in the history
  • Loading branch information
anshumanks committed Feb 5, 2025
1 parent ea42fd0 commit 05ea79e
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import io.cdap.cdap.proto.ProgramRecord;
import io.cdap.cdap.proto.ProgramRunStatus;
import io.cdap.cdap.proto.ProgramStatus;
import io.cdap.cdap.proto.ProgramType;
import io.cdap.cdap.proto.RunRecord;
import io.cdap.cdap.proto.codec.ConditionSpecificationCodec;
import io.cdap.cdap.proto.codec.CustomActionSpecificationCodec;
Expand All @@ -63,6 +64,7 @@
import java.net.URL;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -271,7 +273,7 @@ public List<BatchProgramResult> stop(NamespaceId namespace, List<BatchProgram> p
/**
* Stops all currently running programs.
*/
public void stopAll(NamespaceId namespace)
public void stopAll(NamespaceId namespace, Set<ProgramType> excludeProgramTypes)
throws IOException, UnauthenticatedException, InterruptedException, TimeoutException, UnauthorizedException,
ApplicationNotFoundException, BadRequestException {

Expand All @@ -282,6 +284,10 @@ public void stopAll(NamespaceId namespace)
List<ProgramRecord> programRecords = applicationClient.listPrograms(appId);
for (ProgramRecord programRecord : programRecords) {
try {
if (excludeProgramTypes.contains(programRecord.getType())) {
LOG.info("Skipping program {} of type {}", programRecord.getName(), programRecord.getType());
continue;
}
ProgramId program = appId.program(programRecord.getType(), programRecord.getName());
String status = this.getStatus(program);
if (!status.equals("STOPPED")) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@
import io.cdap.cdap.proto.id.WorkflowId;
import io.cdap.cdap.security.impersonation.SecurityUtil;
import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.slf4j.Logger;
Expand All @@ -58,11 +60,22 @@ public class UpgradeJobMain {
private static final Logger LOG = LoggerFactory.getLogger(UpgradeJobMain.class);

public static void main(String[] args) {
if (args.length != 2) {
if (args.length < 2) {
throw new RuntimeException(
String.format("Invalid number of arguments to UpgradeJobMain. Needed 2, found %d",
String.format("Invalid number of arguments to UpgradeJobMain. Needed at least 2, found %d",
args.length));
}

// Extract the program types which you want to exclude
Set<ProgramType> excludeProgramTypes = new HashSet<>();
for (int i = 0; i < args.length; i++) {
if (args[i].startsWith("--exclude-program-type=")) {
String excludedType = args[i].substring("--exclude-program-type=".length());
ProgramType programType = ProgramType.valueOfCategoryName(excludedType.toLowerCase());
excludeProgramTypes.add(programType);
}
}

// TODO(CDAP-18299): Refactor to use internal service discovery mechanism instead of making calls via the router.
ConnectionConfig connectionConfig = ConnectionConfig.builder()
.setHostname(args[0])
Expand Down Expand Up @@ -93,15 +106,15 @@ public static void main(String[] args) {
try {
Retries.callWithRetries(
() -> {
suspendSchedulesAndStopPipelines(clientConfig);
suspendSchedulesAndStopPipelines(clientConfig, excludeProgramTypes);
return null;
}, retryStrategy, e -> e instanceof IOException || e instanceof NotFoundException);
} catch (Exception e) {
throw new RuntimeException("Failed to prepare instance for upgrade.", e);
}
}

private static void suspendSchedulesAndStopPipelines(ClientConfig clientConfig) throws Exception {
private static void suspendSchedulesAndStopPipelines(ClientConfig clientConfig, Set<ProgramType> excludeProgramTypes) throws Exception {
ApplicationClient applicationClient = new ApplicationClient(clientConfig);
ScheduleClient scheduleClient = new ScheduleClient(clientConfig);
ProgramClient programClient = new ProgramClient(clientConfig);
Expand Down Expand Up @@ -158,7 +171,7 @@ private static void suspendSchedulesAndStopPipelines(ClientConfig clientConfig)
"At least one pipeline in namespace " + namespaceId + " is still running.");
}
// All schedules are stopped, now stop all programs
programClient.stopAll(namespaceId);
programClient.stopAll(namespaceId, excludeProgramTypes);
}
}
}

0 comments on commit 05ea79e

Please sign in to comment.