diff --git a/cdap-client/src/main/java/io/cdap/cdap/client/ProgramClient.java b/cdap-client/src/main/java/io/cdap/cdap/client/ProgramClient.java index 9aeaeaa0f55a..ea3847d4a55d 100644 --- a/cdap-client/src/main/java/io/cdap/cdap/client/ProgramClient.java +++ b/cdap-client/src/main/java/io/cdap/cdap/client/ProgramClient.java @@ -44,6 +44,7 @@ import io.cdap.cdap.proto.ProgramRecord; import io.cdap.cdap.proto.ProgramRunStatus; import io.cdap.cdap.proto.ProgramStatus; +import io.cdap.cdap.proto.ProgramType; import io.cdap.cdap.proto.RunRecord; import io.cdap.cdap.proto.codec.ConditionSpecificationCodec; import io.cdap.cdap.proto.codec.CustomActionSpecificationCodec; @@ -63,6 +64,7 @@ import java.net.URL; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -271,7 +273,7 @@ public List stop(NamespaceId namespace, List p /** * Stops all currently running programs. */ - public void stopAll(NamespaceId namespace) + public void stopAll(NamespaceId namespace, Set excludeProgramTypes) throws IOException, UnauthenticatedException, InterruptedException, TimeoutException, UnauthorizedException, ApplicationNotFoundException, BadRequestException { @@ -282,6 +284,10 @@ public void stopAll(NamespaceId namespace) List programRecords = applicationClient.listPrograms(appId); for (ProgramRecord programRecord : programRecords) { try { + if (excludeProgramTypes.contains(programRecord.getType())) { + LOG.info("Skipping program {} of type {}", programRecord.getName(), programRecord.getType()); + continue; + } ProgramId program = appId.program(programRecord.getType(), programRecord.getName()); String status = this.getStatus(program); if (!status.equals("STOPPED")) { diff --git a/cdap-master/src/main/java/io/cdap/cdap/master/upgrade/UpgradeJobMain.java b/cdap-master/src/main/java/io/cdap/cdap/master/upgrade/UpgradeJobMain.java index 39042fb4a8ca..705a1e67647e 100644 --- a/cdap-master/src/main/java/io/cdap/cdap/master/upgrade/UpgradeJobMain.java +++ b/cdap-master/src/main/java/io/cdap/cdap/master/upgrade/UpgradeJobMain.java @@ -39,7 +39,9 @@ import io.cdap.cdap.proto.id.WorkflowId; import io.cdap.cdap.security.impersonation.SecurityUtil; import java.io.IOException; +import java.util.HashSet; import java.util.List; +import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import org.slf4j.Logger; @@ -58,11 +60,22 @@ public class UpgradeJobMain { private static final Logger LOG = LoggerFactory.getLogger(UpgradeJobMain.class); public static void main(String[] args) { - if (args.length != 2) { + if (args.length < 2) { throw new RuntimeException( - String.format("Invalid number of arguments to UpgradeJobMain. Needed 2, found %d", + String.format("Invalid number of arguments to UpgradeJobMain. Needed at least 2, found %d", args.length)); } + + // Extract the program types which you want to exclude + Set excludeProgramTypes = new HashSet<>(); + for (int i = 0; i < args.length; i++) { + if (args[i].startsWith("--exclude-program-type=")) { + String excludedType = args[i].substring("--exclude-program-type=".length()); + ProgramType programType = ProgramType.valueOfCategoryName(excludedType.toLowerCase()); + excludeProgramTypes.add(programType); + } + } + // TODO(CDAP-18299): Refactor to use internal service discovery mechanism instead of making calls via the router. ConnectionConfig connectionConfig = ConnectionConfig.builder() .setHostname(args[0]) @@ -93,7 +106,7 @@ public static void main(String[] args) { try { Retries.callWithRetries( () -> { - suspendSchedulesAndStopPipelines(clientConfig); + suspendSchedulesAndStopPipelines(clientConfig, excludeProgramTypes); return null; }, retryStrategy, e -> e instanceof IOException || e instanceof NotFoundException); } catch (Exception e) { @@ -101,7 +114,7 @@ public static void main(String[] args) { } } - private static void suspendSchedulesAndStopPipelines(ClientConfig clientConfig) throws Exception { + private static void suspendSchedulesAndStopPipelines(ClientConfig clientConfig, Set excludeProgramTypes) throws Exception { ApplicationClient applicationClient = new ApplicationClient(clientConfig); ScheduleClient scheduleClient = new ScheduleClient(clientConfig); ProgramClient programClient = new ProgramClient(clientConfig); @@ -158,7 +171,7 @@ private static void suspendSchedulesAndStopPipelines(ClientConfig clientConfig) "At least one pipeline in namespace " + namespaceId + " is still running."); } // All schedules are stopped, now stop all programs - programClient.stopAll(namespaceId); + programClient.stopAll(namespaceId, excludeProgramTypes); } } }