From 03216d01b826fcfb9c747b0204d0335456cf98a6 Mon Sep 17 00:00:00 2001 From: Vaibhav Sethi Date: Mon, 13 Jan 2025 12:23:54 +0530 Subject: [PATCH] Remove in-memory launching queue in RunRecordMonitorService --- .../guice/AppFabricServiceRuntimeModule.java | 4 +- .../services/AppFabricProcessorService.java | 5 +- .../app/services/FlowControlService.java | 157 +++++++++ .../app/services/ProgramLifecycleService.java | 12 +- .../ProgramNotificationSubscriberService.java | 36 +- .../app/services/RunRecordMonitorService.java | 313 ------------------ .../internal/app/store/AppMetadataStore.java | 294 +++++++++++----- .../events/StartProgramEventSubscriber.java | 18 +- ...gramNotificationSubscriberServiceTest.java | 2 +- .../app/store/AppMetadataStoreTest.java | 2 +- .../internal/app/store/DefaultStoreTest.java | 12 +- .../StartProgramEventSubscriberTest.java | 15 +- .../io/cdap/cdap/common/conf/Constants.java | 4 - .../internal/app/store/RunRecordDetail.java | 33 +- .../RunRecordDetailWithExistingStatus.java | 21 +- .../src/main/resources/cdap-default.xml | 25 -- .../io/cdap/cdap/store/StoreDefinition.java | 6 +- 17 files changed, 464 insertions(+), 495 deletions(-) create mode 100644 cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/FlowControlService.java delete mode 100644 cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/RunRecordMonitorService.java diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/app/guice/AppFabricServiceRuntimeModule.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/app/guice/AppFabricServiceRuntimeModule.java index 35cde21565a4..3d4e18e861c5 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/app/guice/AppFabricServiceRuntimeModule.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/app/guice/AppFabricServiceRuntimeModule.java @@ -118,7 +118,7 @@ import io.cdap.cdap.internal.app.services.NoopRunRecordCorrectorService; import io.cdap.cdap.internal.app.services.ProgramLifecycleService; import io.cdap.cdap.internal.app.services.RunRecordCorrectorService; -import io.cdap.cdap.internal.app.services.RunRecordMonitorService; +import io.cdap.cdap.internal.app.services.FlowControlService; import io.cdap.cdap.internal.app.services.ScheduledRunRecordCorrectorService; import io.cdap.cdap.internal.app.store.DefaultStore; import io.cdap.cdap.internal.bootstrap.guice.BootstrapModules; @@ -453,7 +453,7 @@ protected void configure() { bind(ArtifactStore.class).in(Scopes.SINGLETON); bind(ProfileService.class).in(Scopes.SINGLETON); - bind(RunRecordMonitorService.class).in(Scopes.SINGLETON); + bind(FlowControlService.class).in(Scopes.SINGLETON); bind(ProgramLifecycleService.class).in(Scopes.SINGLETON); bind(SystemAppManagementService.class).in(Scopes.SINGLETON); bind(OwnerAdmin.class).to(DefaultOwnerAdmin.class); diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/AppFabricProcessorService.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/AppFabricProcessorService.java index d2a2d891db66..7a046b0ced24 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/AppFabricProcessorService.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/AppFabricProcessorService.java @@ -51,7 +51,6 @@ import java.util.ArrayList; import java.util.List; import java.util.Set; -import javax.annotation.Nullable; import org.apache.twill.common.Cancellable; import org.apache.twill.discovery.DiscoveryService; import org.slf4j.Logger; @@ -75,7 +74,7 @@ public class AppFabricProcessorService extends AbstractIdleService { private final RunRecordCorrectorService runRecordCorrectorService; private final RunDataTimeToLiveService runDataTimeToLiveService; private final ProgramRunStatusMonitorService programRunStatusMonitorService; - private final RunRecordMonitorService runRecordCounterService; + private final FlowControlService runRecordCounterService; private final CoreSchedulerService coreSchedulerService; private final ProvisioningService provisioningService; private final BootstrapService bootstrapService; @@ -111,7 +110,7 @@ public AppFabricProcessorService(CConfiguration cConf, ProvisioningService provisioningService, BootstrapService bootstrapService, SystemAppManagementService systemAppManagementService, - RunRecordMonitorService runRecordCounterService, + FlowControlService runRecordCounterService, RunDataTimeToLiveService runDataTimeToLiveService, OperationNotificationSubscriberService operationNotificationSubscriberService, ScheduleNotificationSubscriberService scheduleNotificationSubscriberService) { diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/FlowControlService.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/FlowControlService.java new file mode 100644 index 000000000000..32e480c6d328 --- /dev/null +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/FlowControlService.java @@ -0,0 +1,157 @@ +/* + * Copyright © 2022 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.cdap.internal.app.services; + +import com.google.common.util.concurrent.AbstractIdleService; +import com.google.inject.Inject; +import io.cdap.cdap.api.metrics.MetricsCollectionService; +import io.cdap.cdap.app.program.ProgramDescriptor; +import io.cdap.cdap.app.runtime.ProgramOptions; +import io.cdap.cdap.common.app.RunIds; +import io.cdap.cdap.common.conf.Constants; +import io.cdap.cdap.internal.app.store.AppMetadataStore; +import io.cdap.cdap.proto.ProgramRunStatus; +import io.cdap.cdap.proto.id.ProgramRunId; +import io.cdap.cdap.spi.data.transaction.TransactionRunner; +import io.cdap.cdap.spi.data.transaction.TransactionRunners; +import java.util.Collections; +import java.util.concurrent.TimeUnit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Maintain and provides total number of launching and running run-records. This class is used by + * flow-control mechanism for launch requests. + */ +public class FlowControlService extends AbstractIdleService { + + private static final Logger LOG = LoggerFactory.getLogger(FlowControlService.class); + + private final MetricsCollectionService metricsCollectionService; + private final TransactionRunner transactionRunner; + + /** + * Monitors the program flow control. + * + * @param metricsCollectionService collect metrics + */ + @Inject + public FlowControlService( + MetricsCollectionService metricsCollectionService, + TransactionRunner transactionRunner) { + this.metricsCollectionService = metricsCollectionService; + this.transactionRunner = transactionRunner; + } + + @Override + protected void startUp() throws Exception { + LOG.info("FlowControlService started."); + } + + @Override + protected void shutDown() throws Exception { + LOG.info("FlowControlService successfully shut down."); + } + + /** + * Add a new in-flight launch request and return total number of launching and running programs. + * + * @param programRunId run id associated with the launch request + * @return total number of launching and running program runs. + */ + public Counter addRequestAndGetCounter(ProgramRunId programRunId, ProgramOptions programOptions, + ProgramDescriptor programDescriptor) throws Exception { + if (RunIds.getTime(programRunId.getRun(), TimeUnit.MILLISECONDS) == -1) { + throw new Exception("None time-based UUIDs are not supported"); + } + + Counter counter = TransactionRunners.run(transactionRunner, context -> { + AppMetadataStore store = AppMetadataStore.create(context); + store.recordProgramPending(programRunId, + programOptions.getArguments().asMap(), + programOptions.getUserArguments().asMap(), + programDescriptor.getArtifactId().toApiArtifactId()); + int launchingCount = store.getFlowControlLaunchingCount(); + int runningCount = store.getFlowControlRunningCount(); + return new Counter(launchingCount, runningCount); + }); + LOG.info("Added request with runId {}.", programRunId); + emitMetrics(Constants.Metrics.FlowControl.LAUNCHING_COUNT, counter.getLaunchingCount()); + + LOG.info( + "Counter has {} concurrent launching and {} running programs.", + counter.getLaunchingCount(), + counter.getRunningCount()); + return counter; + } + + /** + * Get total number of launching and running programs. + * + * @return Counter with total number of launching and running program runs. + */ + public Counter getCounter() { + return TransactionRunners.run(transactionRunner, context -> { + AppMetadataStore store = AppMetadataStore.create(context); + return new Counter(store.getFlowControlLaunchingCount(), store.getFlowControlRunningCount()); + }); + } + + public void emitFlowControlMetrics() { + Counter counter = getCounter(); + emitMetrics(Constants.Metrics.FlowControl.LAUNCHING_COUNT, counter.getLaunchingCount()); + emitMetrics(Constants.Metrics.FlowControl.RUNNING_COUNT, counter.getRunningCount()); + } + + private void emitMetrics(String metricName, long value) { + LOG.trace("Setting metric {} to value {}", metricName, value); + metricsCollectionService.getContext(Collections.emptyMap()).gauge(metricName, value); + } + + /** + * Counts the concurrent program runs. + */ + public class Counter { + + /** + * Total number of launch requests that have been accepted but still missing in metadata store + + * * total number of run records with {@link ProgramRunStatus#PENDING} status + total number of + * run records with {@link ProgramRunStatus#STARTING} status. + */ + private final int launchingCount; + + /** + * Total number of run records with {@link ProgramRunStatus#RUNNING} status + Total number of run + * records with {@link ProgramRunStatus#SUSPENDED} status + Total number of run records with + * {@link ProgramRunStatus#RESUMING} status. + */ + private final int runningCount; + + Counter(int launchingCount, int runningCount) { + this.launchingCount = launchingCount; + this.runningCount = runningCount; + } + + public int getLaunchingCount() { + return launchingCount; + } + + public int getRunningCount() { + return runningCount; + } + } +} diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/ProgramLifecycleService.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/ProgramLifecycleService.java index 8bd33edccd14..c8bdd8b0c19f 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/ProgramLifecycleService.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/ProgramLifecycleService.java @@ -149,7 +149,7 @@ public class ProgramLifecycleService { private final int defaultStopTimeoutSecs; private final int batchSize; private final ArtifactRepository artifactRepository; - private final RunRecordMonitorService runRecordMonitorService; + private final FlowControlService flowControlService; private final boolean userProgramLaunchDisabled; @Inject @@ -161,7 +161,7 @@ public class ProgramLifecycleService { ProvisionerNotifier provisionerNotifier, ProvisioningService provisioningService, ProgramStateWriter programStateWriter, CapabilityReader capabilityReader, ArtifactRepository artifactRepository, - RunRecordMonitorService runRecordMonitorService) { + FlowControlService flowControlService) { this.maxConcurrentRuns = cConf.getInt(Constants.AppFabric.MAX_CONCURRENT_RUNS); this.maxConcurrentLaunching = cConf.getInt(Constants.AppFabric.MAX_CONCURRENT_LAUNCHING); this.defaultStopTimeoutSecs = cConf.getInt(Constants.AppFabric.PROGRAM_MAX_STOP_SECONDS); @@ -180,7 +180,7 @@ public class ProgramLifecycleService { this.programStateWriter = programStateWriter; this.capabilityReader = capabilityReader; this.artifactRepository = artifactRepository; - this.runRecordMonitorService = runRecordMonitorService; + this.flowControlService = flowControlService; } /** @@ -730,8 +730,8 @@ public RunId runInternal(ProgramId programId, Map userArgs, checkCapability(programDescriptor); ProgramRunId programRunId = programId.run(runId); - RunRecordMonitorService.Counter counter = runRecordMonitorService.addRequestAndGetCount( - programRunId); + FlowControlService.Counter counter = flowControlService.addRequestAndGetCounter( + programRunId, programOptions, programDescriptor); boolean done = false; try { @@ -765,7 +765,7 @@ public RunId runInternal(ProgramId programId, Map userArgs, done = true; } finally { if (!done) { - runRecordMonitorService.removeRequest(programRunId, false); + flowControlService.emitFlowControlMetrics(); } } diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/ProgramNotificationSubscriberService.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/ProgramNotificationSubscriberService.java index 88fe9dc5ff0e..a077d6d0cfc9 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/ProgramNotificationSubscriberService.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/ProgramNotificationSubscriberService.java @@ -112,7 +112,7 @@ public class ProgramNotificationSubscriberService extends AbstractIdleService { private final ProgramStateWriter programStateWriter; private final TransactionRunner transactionRunner; private final Store store; - private final RunRecordMonitorService runRecordMonitorService; + private final FlowControlService flowControlService; private Service delegate; private Set programCompletionNotifiers; @@ -127,7 +127,7 @@ public class ProgramNotificationSubscriberService extends AbstractIdleService { ProgramStateWriter programStateWriter, TransactionRunner transactionRunner, Store store, - RunRecordMonitorService runRecordMonitorService) { + FlowControlService flowControlService) { this.messagingService = messagingService; this.cConf = cConf; @@ -138,7 +138,7 @@ public class ProgramNotificationSubscriberService extends AbstractIdleService { this.programStateWriter = programStateWriter; this.transactionRunner = transactionRunner; this.store = store; - this.runRecordMonitorService = runRecordMonitorService; + this.flowControlService = flowControlService; this.programCompletionNotifiers = Collections.emptySet(); } @@ -163,8 +163,7 @@ protected void startUp() throws Exception { } private void emitFlowControlMetrics() { - runRecordMonitorService.emitLaunchingMetrics(); - runRecordMonitorService.emitRunningMetrics(); + flowControlService.emitFlowControlMetrics(); } private void restoreActiveRuns() { @@ -184,23 +183,22 @@ private void restoreActiveRuns() { } try { LOG.info("Found active run: {}", runRecordDetail.getProgramRunId()); - if (runRecordDetail.getStatus() == ProgramRunStatus.PENDING) { - runRecordMonitorService.addRequest(runRecordDetail.getProgramRunId()); - } else if (runRecordDetail.getStatus() == ProgramRunStatus.STARTING) { - runRecordMonitorService.addRequest(runRecordDetail.getProgramRunId()); - // It is unknown what is the state of program runs in STARTING state. - // A STARTING message is published again to retry STARTING logic. + if (runRecordDetail.getStatus() == ProgramRunStatus.STARTING) { ProgramOptions programOptions = new SimpleProgramOptions( runRecordDetail.getProgramRunId().getParent(), new BasicArguments(runRecordDetail.getSystemArgs()), new BasicArguments(runRecordDetail.getUserArgs())); + ProgramDescriptor programDescriptor = this.store.loadProgram( + runRecordDetail.getProgramRunId().getParent()); + // It is unknown what is the state of program runs in STARTING state. + // A STARTING message is published again to retry STARTING logic. LOG.debug("Retrying to start run {}.", runRecordDetail.getProgramRunId()); programStateWriter.start( runRecordDetail.getProgramRunId(), programOptions, null, - this.store.loadProgram(runRecordDetail.getProgramRunId().getParent())); + programDescriptor); } } catch (Exception e) { ProgramRunId programRunId = runRecordDetail.getProgramRunId(); @@ -234,7 +232,7 @@ private ProgramNotificationSingleTopicSubscriberService createChildService( provisioningService, programStateWriter, transactionRunner, - runRecordMonitorService, + flowControlService, name, topicName, programCompletionNotifiers); @@ -275,7 +273,7 @@ class ProgramNotificationSingleTopicSubscriberService private final Queue tasks; private final MetricsCollectionService metricsCollectionService; private Set programCompletionNotifiers; - private final RunRecordMonitorService runRecordMonitorService; + private final FlowControlService flowControlService; private final boolean checkTxSeparation; ProgramNotificationSingleTopicSubscriberService( @@ -287,7 +285,7 @@ class ProgramNotificationSingleTopicSubscriberService ProvisioningService provisioningService, ProgramStateWriter programStateWriter, TransactionRunner transactionRunner, - RunRecordMonitorService runRecordMonitorService, + FlowControlService flowControlService, String name, String topicName, Set programCompletionNotifiers) { @@ -310,7 +308,7 @@ class ProgramNotificationSingleTopicSubscriberService this.tasks = new LinkedList<>(); this.metricsCollectionService = metricsCollectionService; this.programCompletionNotifiers = programCompletionNotifiers; - this.runRecordMonitorService = runRecordMonitorService; + this.flowControlService = flowControlService; // If number of partitions equals 1, DB deadlock cannot happen as a result of concurrent // modifications to @@ -582,7 +580,7 @@ private void handleProgramEvent( appMetadataStore.recordProgramRunning( programRunId, logicalStartTimeSecs, twillRunId, messageIdBytes); writeToHeartBeatTable(recordedRunRecord, logicalStartTimeSecs, programHeartbeatTable); - runRecordMonitorService.removeRequest(programRunId, true); + flowControlService.emitFlowControlMetrics(); long startDelayTime = logicalStartTimeSecs - RunIds.getTime(programRunId.getRun(), TimeUnit.SECONDS); emitStartingTimeMetric(programRunId, startDelayTime, recordedRunRecord); @@ -660,7 +658,7 @@ private void handleProgramEvent( Constants.Metrics.Program.PROGRAM_REJECTED_RUNS, null) .ifPresent(runnables::add); - runRecordMonitorService.removeRequest(programRunId, true); + flowControlService.emitFlowControlMetrics(); break; default: // This should not happen @@ -774,7 +772,7 @@ private RunRecordDetail handleProgramCompletion( programCompletionNotifiers.forEach( notifier -> notifier.onProgramCompleted(programRunId, recordedRunRecord.getStatus())); - runRecordMonitorService.removeRequest(programRunId, true); + flowControlService.emitFlowControlMetrics(); }); } return recordedRunRecord; diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/RunRecordMonitorService.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/RunRecordMonitorService.java deleted file mode 100644 index 89c2fb0c8a9b..000000000000 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/RunRecordMonitorService.java +++ /dev/null @@ -1,313 +0,0 @@ -/* - * Copyright © 2022 Cask Data, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package io.cdap.cdap.internal.app.services; - -import com.google.common.util.concurrent.AbstractScheduledService; -import com.google.inject.Inject; -import io.cdap.cdap.api.metrics.MetricsCollectionService; -import io.cdap.cdap.app.runtime.ProgramRuntimeService; -import io.cdap.cdap.common.app.RunIds; -import io.cdap.cdap.common.conf.CConfiguration; -import io.cdap.cdap.common.conf.Constants; -import io.cdap.cdap.common.conf.Constants.Metrics.FlowControl; -import io.cdap.cdap.proto.ProgramRunStatus; -import io.cdap.cdap.proto.ProgramType; -import io.cdap.cdap.proto.id.ProgramRunId; -import java.util.Collections; -import java.util.Comparator; -import java.util.List; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Executors; -import java.util.concurrent.PriorityBlockingQueue; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import org.apache.twill.common.Threads; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Maintain and return total number of launching and running run-records. This class is used by - * flow-control mechanism for launch requests. It also has a cleanup mechanism to automatically - * remove old (i.e., configurable) entries from the counter as a safe-guard mechanism. - */ -public class RunRecordMonitorService extends AbstractScheduledService { - - private static final Logger LOG = LoggerFactory.getLogger(RunRecordMonitorService.class); - - /** - * Contains ProgramRunIds of runs that have been accepted, but have not been added to metadata - * store plus all run records with {@link ProgramRunStatus#PENDING} or {@link - * ProgramRunStatus#STARTING} status. - */ - private final BlockingQueue launchingQueue; - - private final ProgramRuntimeService runtimeService; - private final long ageThresholdSec; - private final CConfiguration cConf; - private final MetricsCollectionService metricsCollectionService; - private final int maxConcurrentRuns; - private ScheduledExecutorService executor; - - /** - * Tracks the program runs. - * - * @param cConf configuration - * @param runtimeService service to get info on programs - * @param metricsCollectionService collect metrics - */ - @Inject - public RunRecordMonitorService( - CConfiguration cConf, - ProgramRuntimeService runtimeService, - MetricsCollectionService metricsCollectionService) { - this.cConf = cConf; - this.runtimeService = runtimeService; - this.metricsCollectionService = metricsCollectionService; - - this.launchingQueue = - new PriorityBlockingQueue<>( - 128, Comparator.comparingLong(o -> RunIds.getTime(o.getRun(), TimeUnit.MILLISECONDS))); - this.ageThresholdSec = cConf.getLong(Constants.AppFabric.MONITOR_RECORD_AGE_THRESHOLD_SECONDS); - this.maxConcurrentRuns = cConf.getInt(Constants.AppFabric.MAX_CONCURRENT_RUNS); - } - - @Override - protected void startUp() throws Exception { - LOG.info("RunRecordMonitorService started."); - } - - @Override - protected void shutDown() throws Exception { - if (executor != null) { - executor.shutdownNow(); - } - LOG.info("RunRecordMonitorService successfully shut down."); - } - - @Override - protected void runOneIteration() throws Exception { - cleanupQueue(); - } - - @Override - protected Scheduler scheduler() { - return Scheduler.newFixedRateSchedule( - 0, cConf.getInt(Constants.AppFabric.MONITOR_CLEANUP_INTERVAL_SECONDS), TimeUnit.SECONDS); - } - - @Override - protected final ScheduledExecutorService executor() { - executor = - Executors.newSingleThreadScheduledExecutor( - Threads.createDaemonThreadFactory("run-record-monitor-service-cleanup-scheduler")); - return executor; - } - - /** - * Add a new in-flight launch request and return total number of launching and running programs. - * - * @param programRunId run id associated with the launch request - * @return total number of launching and running program runs. - */ - public Counter addRequestAndGetCount(ProgramRunId programRunId) throws Exception { - if (RunIds.getTime(programRunId.getRun(), TimeUnit.MILLISECONDS) == -1) { - throw new Exception("None time-based UUIDs are not supported"); - } - - int launchingCount = addRequest(programRunId); - int runningCount = getProgramsRunningCount(); - - LOG.info( - "Counter has {} concurrent launching and {} running programs.", - launchingCount, - runningCount); - return new Counter(launchingCount, runningCount); - } - - /** - * Get imprecise (due to data races) total number of launching and running programs. - * - * @return total number of launching and running program runs. - */ - public Counter getCount() { - int launchingCount = launchingQueue.size(); - int runningCount = getProgramsRunningCount(); - - return new Counter(launchingCount, runningCount); - } - - /** - * Add a new in-flight launch request. - * - * @param programRunId run id associated with the launch request - */ - public int addRequest(ProgramRunId programRunId) { - int result; - synchronized (launchingQueue) { - launchingQueue.add(programRunId); - result = launchingQueue.size(); - } - emitMetrics(Constants.Metrics.FlowControl.LAUNCHING_COUNT, result); - LOG.info("Added request with runId {}.", programRunId); - return result; - } - - /** - * Remove the request with the provided programRunId when the request is no longer launching. - * I.e., not in-flight, not in {@link ProgramRunStatus#PENDING} and not in {@link - * ProgramRunStatus#STARTING} - * - * @param programRunId of the request to be removed from launching queue. - * @param emitRunningChange if true, also updates {@link - * Constants.Metrics.FlowControl#RUNNING_COUNT} - */ - public void removeRequest(ProgramRunId programRunId, boolean emitRunningChange) { - if (launchingQueue.remove(programRunId)) { - LOG.info( - "Removed request with runId {}. Counter has {} concurrent launching requests.", - programRunId, - launchingQueue.size()); - emitMetrics(Constants.Metrics.FlowControl.LAUNCHING_COUNT, launchingQueue.size()); - } - - if (emitRunningChange) { - emitRunningMetrics(); - } - } - - public void emitLaunchingMetrics(long value) { - emitMetrics(Constants.Metrics.FlowControl.LAUNCHING_COUNT, value); - } - - /** - * Emit the {@link Constants.Metrics.FlowControl#LAUNCHING_COUNT} metric for runs. - */ - public void emitLaunchingMetrics() { - emitMetrics(Constants.Metrics.FlowControl.LAUNCHING_COUNT, launchingQueue.size()); - } - - - /** - * Emit the {@link Constants.Metrics.FlowControl#RUNNING_COUNT} metric for runs. - */ - public void emitRunningMetrics() { - emitMetrics(FlowControl.RUNNING_COUNT, getProgramsRunningCount()); - } - - private void emitMetrics(String metricName, long value) { - LOG.trace("Setting metric {} to value {}", metricName, value); - metricsCollectionService.getContext(Collections.emptyMap()).gauge(metricName, value); - } - - private void cleanupQueue() { - while (true) { - ProgramRunId programRunId = launchingQueue.peek(); - if (programRunId == null - || RunIds.getTime(programRunId.getRun(), TimeUnit.MILLISECONDS) + (ageThresholdSec * 1000) - >= System.currentTimeMillis()) { - // Queue is empty or queue head has not expired yet. - break; - } - // Queue head might have already been removed. So instead of calling poll, we call remove. - if (launchingQueue.remove(programRunId)) { - LOG.info("Removing request with runId {} due to expired retention time.", programRunId); - } - } - // Always emit both metrics after cleanup. - emitLaunchingMetrics(); - emitRunningMetrics(); - } - - /** - * Returns the total number of programs in running state. The count includes batch (i.e., {@link - * ProgramType#WORKFLOW}), streaming (i.e., {@link ProgramType#SPARK}) with no parent and - * replication (i.e., {@link ProgramType#WORKER}) jobs. - */ - private int getProgramsRunningCount() { - List list = - runtimeService.listAll( - ProgramType.WORKFLOW, ProgramType.WORKER, ProgramType.SPARK, ProgramType.MAPREDUCE); - - int launchingCount = launchingQueue.size(); - - // We use program controllers (instead of querying metadata store) to count the total number of - // programs in running state. - // A program controller is created when a launch request is in the middle of starting state. - // Therefore, the returning running count is NOT precise. - int impreciseRunningCount = - (int) list.stream() - .filter(r -> isRunning(r.getController().getState().getRunStatus())) - .count(); - - if (maxConcurrentRuns < 0 || (launchingCount + impreciseRunningCount < maxConcurrentRuns)) { - // It is safe to return the imprecise value since either flow control for runs is disabled - // (i.e., -1) or flow control will not reject an incoming request yet. - return impreciseRunningCount; - } - - // Flow control is at the threshold. We return the precise count. - return (int) list.stream() - .filter( - r -> - isRunning(r.getController().getState().getRunStatus()) - && !launchingQueue.contains(r.getController().getProgramRunId())) - .count(); - } - - private boolean isRunning(ProgramRunStatus status) { - if (status == ProgramRunStatus.RUNNING - || status == ProgramRunStatus.SUSPENDED - || status == ProgramRunStatus.RESUMING) { - return true; - } - - return false; - } - - /** - * Counts the concurrent program runs. - */ - public class Counter { - - /** - * Total number of launch requests that have been accepted but still missing in metadata store + - * * total number of run records with {@link ProgramRunStatus#PENDING} status + total number of - * run records with {@link ProgramRunStatus#STARTING} status. - */ - private final int launchingCount; - - /** - * Total number of run records with {@link ProgramRunStatus#RUNNING} status + Total number of run - * records with {@link ProgramRunStatus#SUSPENDED} status + Total number of run records with - * {@link ProgramRunStatus#RESUMING} status. - */ - private final int runningCount; - - Counter(int launchingCount, int runningCount) { - this.launchingCount = launchingCount; - this.runningCount = runningCount; - } - - public int getLaunchingCount() { - return launchingCount; - } - - public int getRunningCount() { - return runningCount; - } - } -} diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/store/AppMetadataStore.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/store/AppMetadataStore.java index c8d5e1def038..c6a362d94133 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/store/AppMetadataStore.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/store/AppMetadataStore.java @@ -20,6 +20,7 @@ import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; import com.google.common.collect.Sets; import com.google.common.reflect.TypeToken; import com.google.gson.Gson; @@ -150,6 +151,16 @@ public class AppMetadataStore { .put(ProgramRunStatus.REJECTED, TYPE_RUN_RECORD_COMPLETED) .build(); + private static final String TYPE_FLOW_CONTROL_LAUNCHING = "launching"; + private static final String TYPE_FLOW_CONTROL_RUNNING = "running"; + private static final String TYPE_FLOW_CONTROL_NONE = ""; + + // Program types controlled by flow-control mechanism. + private static final Set CONTROL_FLOW_PROGRAM_TYPES = ImmutableSet.of(ProgramType.MAPREDUCE, + ProgramType.WORKFLOW, + ProgramType.SPARK, + ProgramType.WORKER); + private final StructuredTableContext context; private StructuredTable applicationSpecificationTable; private StructuredTable applicationEditTable; @@ -898,11 +909,58 @@ private void addWorkflowNodeState(ProgramRunId programRunId, Map // Update the parent Workflow run record by adding node id and program run id in the properties Map properties = new HashMap<>(record.getProperties()); properties.put(workflowNodeId, programRunId.getRun()); - writeToStructuredTableWithPrimaryKeys( - runRecordFields, - RunRecordDetail.builder(record).setProperties(properties).setSourceId(sourceId).build(), - getRunRecordsTable(), StoreDefinition.AppMetadataStore.RUN_RECORD_DATA); + writeRunRecordWithPrimaryKeys(runRecordFields, + RunRecordDetail.builder(record).setProperties(properties).setSourceId(sourceId).build()); + } + } + + /** + * Record that the program run is pending run. + * + * @param programRunId program run + * + * @return {@link RunRecordDetail} with status Pending. + */ + @Nullable + public RunRecordDetail recordProgramPending(ProgramRunId programRunId, Map runtimeArgs, + Map systemArgs, @Nullable ArtifactId artifactId) + throws IOException { + long startTs = RunIds.getTime(programRunId.getRun(), TimeUnit.SECONDS); + if (startTs == -1L) { + throw new IllegalArgumentException(String.format( + "Provisioning state for program run that does not have a timestamp in the run id: '%s'", programRunId)); } + + Optional profileId = SystemArguments.getProfileIdFromArgs( + programRunId.getNamespaceId(), systemArgs); + RunRecordDetail existing = getRun(programRunId); + + // If for some reason, there is an existing run record then return null. + if (existing != null) { + LOG.warn( + "Ignoring unexpected request to record pending state for program run {} that has an " + + "existing run record in run state {} and cluster state {}.", + programRunId, existing.getStatus()); + return null; + } + + ProgramRunCluster cluster = new ProgramRunCluster(ProgramRunClusterStatus.PROVISIONING, null, null); + RunRecordDetail meta = RunRecordDetail.builder() + .setProgramRunId(programRunId) + .setStartTime(startTs) + .setProperties(getRecordProperties(systemArgs, runtimeArgs)) + .setSystemArgs(systemArgs) + .setCluster(cluster) + .setStatus(ProgramRunStatus.PENDING) + .setPeerName(systemArgs.get(ProgramOptionConstants.PEER_NAME)) + .setArtifactId(artifactId) + .setPrincipal(systemArgs.get(ProgramOptionConstants.PRINCIPAL)) + .setProfileId(profileId.orElse(null)) + .setFlowControlStatus(getFlowControlStatus(programRunId, ProgramRunStatus.PENDING, systemArgs)) + .build(); + writeNewRunRecord(meta, TYPE_RUN_RECORD_ACTIVE); + LOG.trace("Recorded {} for program {}", ProgramRunStatus.PENDING, programRunId); + return meta; } /** @@ -927,25 +985,6 @@ public RunRecordDetail recordProgramProvisioning(ProgramRunId programRunId, Map systemArgs, byte[] sourceId, @Nullable ArtifactId artifactId) throws IOException { - long startTs = RunIds.getTime(programRunId.getRun(), TimeUnit.SECONDS); - if (startTs == -1L) { - LOG.error( - "Ignoring unexpected request to record provisioning state for program run {} that does not have " - - + "a timestamp in the run id.", programRunId); - return null; - } - - RunRecordDetail existing = getRun(programRunId); - // for some reason, there is an existing run record. - if (existing != null) { - LOG.error( - "Ignoring unexpected request to record provisioning state for program run {} that has an existing " - + "run record in run state {} and cluster state {}.", - programRunId, existing.getStatus(), existing.getCluster().getStatus()); - return null; - } - Optional profileId = SystemArguments.getProfileIdFromArgs( programRunId.getNamespaceId(), systemArgs); if (!profileId.isPresent()) { @@ -957,20 +996,60 @@ public RunRecordDetail recordProgramProvisioning(ProgramRunId programRunId, ProgramRunCluster cluster = new ProgramRunCluster(ProgramRunClusterStatus.PROVISIONING, null, null); - RunRecordDetail meta = RunRecordDetail.builder() - .setProgramRunId(programRunId) - .setStartTime(startTs) - .setStatus(ProgramRunStatus.PENDING) - .setProperties(getRecordProperties(systemArgs, runtimeArgs)) - .setSystemArgs(systemArgs) - .setCluster(cluster) - .setProfileId(profileId.get()) - .setPeerName(systemArgs.get(ProgramOptionConstants.PEER_NAME)) - .setSourceId(sourceId) - .setArtifactId(artifactId) - .setPrincipal(systemArgs.get(ProgramOptionConstants.PRINCIPAL)) - .build(); - writeNewRunRecord(meta, TYPE_RUN_RECORD_ACTIVE); + + RunRecordDetail existing = getRun(programRunId); + RunRecordDetail meta; + if (existing == null) { + // Create a new run record if it doesn't exist. + long startTs = RunIds.getTime(programRunId.getRun(), TimeUnit.SECONDS); + if (startTs == -1L) { + LOG.error( + "Ignoring unexpected request to record provisioning state for program run {} that does not have " + + + "a timestamp in the run id.", programRunId); + return null; + } + meta = RunRecordDetail.builder() + .setProgramRunId(programRunId) + .setStartTime(startTs) + .setStatus(ProgramRunStatus.PENDING) + .setProperties(getRecordProperties(systemArgs, runtimeArgs)) + .setSystemArgs(systemArgs) + .setCluster(cluster) + .setProfileId(profileId.get()) + .setPeerName(systemArgs.get(ProgramOptionConstants.PEER_NAME)) + .setSourceId(sourceId) + .setArtifactId(artifactId) + .setPrincipal(systemArgs.get(ProgramOptionConstants.PRINCIPAL)) + .setFlowControlStatus(getFlowControlStatus(programRunId, ProgramRunStatus.PENDING, systemArgs)) + .build(); + writeNewRunRecord(meta, TYPE_RUN_RECORD_ACTIVE); + } else { + if (existing.getStatus() != ProgramRunStatus.PENDING && existing.getStatus() != ProgramRunStatus.SUSPENDED) { + LOG.error( + "Ignoring unexpected request to record provisioning state for program run {} that has " + + "a status in end state {}.", programRunId, existing.getStatus()); + return null; + } + delete(existing); + meta = RunRecordDetail.builder(existing) + .setStatus(ProgramRunStatus.PENDING) + .setProperties(getRecordProperties(systemArgs, runtimeArgs)) + .setSystemArgs(systemArgs) + .setCluster(cluster) + .setProfileId(profileId.get()) + .setPeerName(systemArgs.get(ProgramOptionConstants.PEER_NAME)) + .setSourceId(sourceId) + .setArtifactId(artifactId) + .setPrincipal(systemArgs.get(ProgramOptionConstants.PRINCIPAL)) + .setFlowControlStatus(getFlowControlStatus(programRunId, ProgramRunStatus.PENDING, systemArgs)) + .build(); + List> key = getProgramRunInvertedTimeKey(TYPE_RUN_RECORD_ACTIVE, + existing.getProgramRunId(), + existing.getStartTs()); + writeRunRecordWithPrimaryKeys(key, meta); + } + LOG.trace("Recorded {} for program {}", ProgramRunClusterStatus.PROVISIONING, programRunId); return meta; } @@ -1031,8 +1110,7 @@ public RunRecordDetail recordProgramProvisioned(ProgramRunId programRunId, int n .setCluster(cluster) .setSourceId(sourceId) .build(); - writeToStructuredTableWithPrimaryKeys( - key, meta, getRunRecordsTable(), StoreDefinition.AppMetadataStore.RUN_RECORD_DATA); + writeRunRecordWithPrimaryKeys(key, meta); LOG.trace("Recorded {} for program {}", ProgramRunClusterStatus.PROVISIONED, existing.getProgramRunId()); return meta; @@ -1076,8 +1154,7 @@ public RunRecordDetail recordProgramDeprovisioning(ProgramRunId programRunId, by .setCluster(cluster) .setSourceId(sourceId) .build(); - writeToStructuredTableWithPrimaryKeys( - key, meta, getRunRecordsTable(), StoreDefinition.AppMetadataStore.RUN_RECORD_DATA); + writeRunRecordWithPrimaryKeys(key, meta); LOG.trace("Recorded {} for program {}", ProgramRunClusterStatus.DEPROVISIONING, existing.getProgramRunId()); return meta; @@ -1122,8 +1199,7 @@ public RunRecordDetail recordProgramDeprovisioned(ProgramRunId programRunId, @Nu .setCluster(cluster) .setSourceId(sourceId) .build(); - writeToStructuredTableWithPrimaryKeys( - key, meta, getRunRecordsTable(), StoreDefinition.AppMetadataStore.RUN_RECORD_DATA); + writeRunRecordWithPrimaryKeys(key, meta); LOG.trace("Recorded {} for program {}", ProgramRunClusterStatus.DEPROVISIONED, existing.getProgramRunId()); return meta; @@ -1167,10 +1243,9 @@ public RunRecordDetail recordProgramOrphaned(ProgramRunId programRunId, long end .setCluster(cluster) .setSourceId(sourceId) .build(); - writeToStructuredTableWithPrimaryKeys( - key, meta, getRunRecordsTable(), StoreDefinition.AppMetadataStore.RUN_RECORD_DATA); + writeRunRecordWithPrimaryKeys(key, meta); LOG.trace("Recorded {} for program {}", ProgramRunClusterStatus.ORPHANED, - existing.getProgramRunId()); + meta.getProgramRunId()); return meta; } @@ -1179,32 +1254,30 @@ public RunRecordDetail recordProgramRejected(ProgramRunId programRunId, Map runtimeArgs, Map systemArgs, byte[] sourceId, @Nullable ArtifactId artifactId) throws IOException { - long startTs = RunIds.getTime(programRunId.getRun(), TimeUnit.SECONDS); - if (startTs == -1L) { - LOG.error( - "Ignoring unexpected request to record provisioning state for program run {} that does not have " - - + "a timestamp in the run id.", programRunId); - return null; - } - RunRecordDetail existing = getRun(programRunId); - // for some reason, there is an existing run record? - if (existing != null) { - LOG.error( - "Ignoring unexpected request to record rejected state for program run {} that has an existing " - + "run record in run state {} and cluster state {}.", - programRunId, existing.getStatus(), existing.getCluster().getStatus()); - return null; + + RunRecordDetail.Builder builder; + if (existing == null) { + LOG.warn( + "Unexpected request to record rejected state for program run {} that has no existing run record.", + programRunId); + long startTs = RunIds.getTime(programRunId.getRun(), TimeUnit.SECONDS); + if (startTs == -1L) { + LOG.error( + "Ignoring unexpected request to record provisioning state for program run {} that does not have " + + + "a timestamp in the run id.", programRunId); + return null; + } + builder = RunRecordDetail.builder().setProgramRunId(programRunId).setStartTime(startTs).setStopTime(startTs); + } else { + delete(existing); + builder = RunRecordDetail.builder(existing).setStopTime(existing.getStartTs()); } Optional profileId = SystemArguments.getProfileIdFromArgs( programRunId.getNamespaceId(), systemArgs); - RunRecordDetail meta = RunRecordDetail.builder() - .setProgramRunId(programRunId) - .setStartTime(startTs) - .setStopTime(startTs) // rejected: stop time == start time - .setStatus(ProgramRunStatus.REJECTED) + RunRecordDetail meta = builder.setStatus(ProgramRunStatus.REJECTED) .setProperties(getRecordProperties(systemArgs, runtimeArgs)) .setSystemArgs(systemArgs) .setProfileId(profileId.orElse(null)) @@ -1212,9 +1285,13 @@ public RunRecordDetail recordProgramRejected(ProgramRunId programRunId, .setArtifactId(artifactId) .setSourceId(sourceId) .setPrincipal(systemArgs.get(ProgramOptionConstants.PRINCIPAL)) + .setFlowControlStatus(getFlowControlStatus(programRunId, ProgramRunStatus.REJECTED, systemArgs)) .build(); - writeNewRunRecord(meta, TYPE_RUN_RECORD_COMPLETED); + List> key = getProgramRunInvertedTimeKey(TYPE_RUN_RECORD_COMPLETED, + meta.getProgramRunId(), + meta.getStartTs()); + writeRunRecordWithPrimaryKeys(key, meta); LOG.trace("Recorded {} for program {}", ProgramRunStatus.REJECTED, programRunId); return meta; } @@ -1226,8 +1303,7 @@ private void writeNewRunRecord(RunRecordDetail meta, String typeRunRecordComplet throws IOException { List> fields = getProgramRunInvertedTimeKey(typeRunRecordCompleted, meta.getProgramRunId(), meta.getStartTs()); - writeToStructuredTableWithPrimaryKeys(fields, meta, getRunRecordsTable(), - StoreDefinition.AppMetadataStore.RUN_RECORD_DATA); + writeRunRecordWithPrimaryKeys(fields, meta); List> countKey = getProgramCountPrimaryKeys(TYPE_COUNT, meta.getProgramRunId().getParent()); getProgramCountsTable().increment(countKey, StoreDefinition.AppMetadataStore.COUNTS, 1L); @@ -1280,9 +1356,9 @@ public RunRecordDetail recordProgramStart(ProgramRunId programRunId, @Nullable S .setSystemArgs(newSystemArgs) .setTwillRunId(twillRunId) .setSourceId(sourceId) + .setFlowControlStatus(getFlowControlStatus(programRunId, ProgramRunStatus.STARTING, newSystemArgs)) .build(); - writeToStructuredTableWithPrimaryKeys( - key, meta, getRunRecordsTable(), StoreDefinition.AppMetadataStore.RUN_RECORD_DATA); + writeRunRecordWithPrimaryKeys(key, meta); LOG.trace("Recorded {} for program {}", ProgramRunStatus.STARTING, existing.getProgramRunId()); return meta; } @@ -1333,9 +1409,9 @@ public RunRecordDetail recordProgramRunning(ProgramRunId programRunId, long stat .setStatus(ProgramRunStatus.RUNNING) .setTwillRunId(twillRunId) .setSourceId(sourceId) + .setFlowControlStatus(getFlowControlStatus(programRunId, ProgramRunStatus.RUNNING, systemArgs)) .build(); - writeToStructuredTableWithPrimaryKeys( - key, meta, getRunRecordsTable(), StoreDefinition.AppMetadataStore.RUN_RECORD_DATA); + writeRunRecordWithPrimaryKeys(key, meta); LOG.trace("Recorded {} for program {}", ProgramRunStatus.RUNNING, existing.getProgramRunId()); return meta; } @@ -1410,7 +1486,9 @@ private RunRecordDetail recordProgramSuspendResume(byte[] sourceId, RunRecordDet List> key = getProgramRunInvertedTimeKey(TYPE_RUN_RECORD_ACTIVE, existing.getProgramRunId(), existing.getStartTs()); - RunRecordDetail.Builder builder = RunRecordDetail.builder(existing).setStatus(toStatus) + RunRecordDetail.Builder builder = RunRecordDetail.builder(existing) + .setStatus(toStatus) + .setFlowControlStatus(getFlowControlStatus(existing.getProgramRunId(), toStatus, existing.getSystemArgs())) .setSourceId(sourceId); if (timestamp != -1) { if (action.equals("resume")) { @@ -1420,8 +1498,7 @@ private RunRecordDetail recordProgramSuspendResume(byte[] sourceId, RunRecordDet } } RunRecordDetail meta = builder.build(); - writeToStructuredTableWithPrimaryKeys( - key, meta, getRunRecordsTable(), StoreDefinition.AppMetadataStore.RUN_RECORD_DATA); + writeRunRecordWithPrimaryKeys(key, meta); LOG.trace("Recorded {} for program {}", toStatus, existing.getProgramRunId()); return meta; } @@ -1475,9 +1552,9 @@ public RunRecordDetail recordProgramStopping(ProgramRunId programRunId, byte[] s .setStoppingTime(stoppingTsSecs) .setTerminateTs(terminateTsSecs) .setSourceId(sourceId) + .setFlowControlStatus(getFlowControlStatus(programRunId, ProgramRunStatus.STOPPING, systemArgs)) .build(); - writeToStructuredTableWithPrimaryKeys( - key, meta, getRunRecordsTable(), StoreDefinition.AppMetadataStore.RUN_RECORD_DATA); + writeRunRecordWithPrimaryKeys(key, meta); LOG.trace("Recorded {} for program {}", ProgramRunStatus.STOPPING, existing.getProgramRunId()); return meta; } @@ -1529,9 +1606,9 @@ public RunRecordDetailWithExistingStatus recordProgramStop(ProgramRunId programR .setStopTime(stopTs) .setStatus(runStatus) .setSourceId(sourceId) + .setFlowControlStatus(getFlowControlStatus(programRunId, runStatus, systemArgs)) .build(); - writeToStructuredTableWithPrimaryKeys( - key, meta, getRunRecordsTable(), StoreDefinition.AppMetadataStore.RUN_RECORD_DATA); + writeRunRecordWithPrimaryKeys(key, meta); LOG.trace("Recorded {} for program {}", runStatus, existing.getProgramRunId()); return meta; } @@ -1643,6 +1720,34 @@ public int countActiveRuns(@Nullable Integer limit) throws IOException { return count.get(); } + /** + * Count all records in launching state. + * + * @return Count of records in launching state. + */ + public int getFlowControlLaunchingCount() throws IOException { + ImmutableList> keyPrefix = ImmutableList.of( + Fields.stringField(StoreDefinition.AppMetadataStore.RUN_STATUS, TYPE_RUN_RECORD_ACTIVE)); + Collection> filterIndexes = + ImmutableList.of( + Fields.stringField(StoreDefinition.AppMetadataStore.FLOW_CONTROL_STATUS, TYPE_FLOW_CONTROL_LAUNCHING)); + return (int) getRunRecordsTable().count(Arrays.asList(Range.singleton(keyPrefix)), filterIndexes); + } + + /** + * Count all records in running state. + * + * @return Count of records in running state. + */ + public int getFlowControlRunningCount() throws IOException { + ImmutableList> keyPrefix = ImmutableList.of( + Fields.stringField(StoreDefinition.AppMetadataStore.RUN_STATUS, TYPE_RUN_RECORD_ACTIVE)); + Collection> filterIndexes = + ImmutableList.of( + Fields.stringField(StoreDefinition.AppMetadataStore.FLOW_CONTROL_STATUS, TYPE_FLOW_CONTROL_RUNNING)); + return (int) getRunRecordsTable().count(Arrays.asList(Range.singleton(keyPrefix)), filterIndexes); + } + /** * Scans active runs, starting from the given cursor. * @@ -2920,6 +3025,33 @@ private static ApplicationId getApplicationIdFromRow(StructuredRow row) { row.getString(StoreDefinition.AppMetadataStore.VERSION_FIELD)); } + private String getFlowControlStatus(ProgramRunId programRunId, ProgramRunStatus runStatus, + Map systemArgs) { + if (!CONTROL_FLOW_PROGRAM_TYPES.contains(programRunId.getType())) { + return TYPE_FLOW_CONTROL_NONE; + } + if (programRunId.getParent().getNamespace() == NamespaceId.SYSTEM.getNamespace()) { + return TYPE_FLOW_CONTROL_NONE; + } + if (systemArgs.containsKey(ProgramOptionConstants.WORKFLOW_NAME)) { + return TYPE_FLOW_CONTROL_NONE; + } + if (runStatus == ProgramRunStatus.RUNNING) { + return TYPE_FLOW_CONTROL_RUNNING; + } + if (runStatus == ProgramRunStatus.PENDING || runStatus == ProgramRunStatus.STARTING) { + return TYPE_FLOW_CONTROL_LAUNCHING; + } + + return TYPE_FLOW_CONTROL_NONE; + } + + private void writeRunRecordWithPrimaryKeys(List> key, RunRecordDetail meta) throws IOException { + key.add(Fields.stringField(StoreDefinition.AppMetadataStore.FLOW_CONTROL_STATUS, meta.getFlowControlStatus())); + writeToStructuredTableWithPrimaryKeys( + key, meta, getRunRecordsTable(), StoreDefinition.AppMetadataStore.RUN_RECORD_DATA); + } + /** * Represents a position for scanning. */ diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/events/StartProgramEventSubscriber.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/events/StartProgramEventSubscriber.java index e88502095182..2816dd10429c 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/events/StartProgramEventSubscriber.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/events/StartProgramEventSubscriber.java @@ -21,7 +21,7 @@ import io.cdap.cdap.common.conf.CConfiguration; import io.cdap.cdap.common.conf.Constants; import io.cdap.cdap.internal.app.services.ProgramLifecycleService; -import io.cdap.cdap.internal.app.services.RunRecordMonitorService; +import io.cdap.cdap.internal.app.services.FlowControlService; import io.cdap.cdap.proto.ProgramType; import io.cdap.cdap.proto.id.ProgramReference; import io.cdap.cdap.proto.id.ProgramRunId; @@ -54,7 +54,7 @@ public class StartProgramEventSubscriber extends EventSubscriber { private final CConfiguration cConf; private final EventReaderProvider extensionProvider; private final ProgramLifecycleService lifecycleService; - private final RunRecordMonitorService runRecordMonitorService; + private final FlowControlService flowControlService; private ScheduledExecutorService executor; private Collection> readers; private ExecutorService threadPoolExecutor; @@ -66,17 +66,17 @@ public class StartProgramEventSubscriber extends EventSubscriber { * @param cConf CDAP configuration * @param extensionProvider eventReaderProvider for StartProgramEvent Readers * @param lifecycleService to publish start programs to TMS - * @param runRecordMonitorService basic flow-control + * @param flowControlService basic flow-control */ @Inject StartProgramEventSubscriber(CConfiguration cConf, EventReaderProvider extensionProvider, ProgramLifecycleService lifecycleService, - RunRecordMonitorService runRecordMonitorService) { + FlowControlService flowControlService) { this.cConf = cConf; this.extensionProvider = extensionProvider; this.lifecycleService = lifecycleService; - this.runRecordMonitorService = runRecordMonitorService; + this.flowControlService = flowControlService; maxConcurrentRuns = -1; } @@ -132,14 +132,14 @@ protected void runOneIteration() throws Exception { if (threadPoolExecutor != null) { for (EventReader reader : readers) { threadPoolExecutor.execute(() -> { - if (runRecordMonitorService.isRunning()) { + if (flowControlService.isRunning()) { // Only attempt to process event if there is no max or the current count is less than max if (hasNominalCapacity()) { processEvents(reader); } } else { - LOG.warn("RunRecordMonitorService not yet running, currently in state: {}." - + " Status will be checked again in next attempt.", runRecordMonitorService.state()); + LOG.warn("FlowControlService not yet running, currently in state: {}." + + " Status will be checked again in next attempt.", flowControlService.state()); } }); } @@ -153,7 +153,7 @@ protected void runOneIteration() throws Exception { */ @VisibleForTesting boolean hasNominalCapacity() { - RunRecordMonitorService.Counter counter = runRecordMonitorService.getCount(); + FlowControlService.Counter counter = flowControlService.getCounter(); // no limit if (maxConcurrentRuns <= 0) { return true; diff --git a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/services/ProgramNotificationSubscriberServiceTest.java b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/services/ProgramNotificationSubscriberServiceTest.java index 6c2a9cfd6f5b..51a004cf8848 100644 --- a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/services/ProgramNotificationSubscriberServiceTest.java +++ b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/services/ProgramNotificationSubscriberServiceTest.java @@ -361,7 +361,7 @@ public void testLaunchingCountMetricsOnRestart() throws Exception { // terminate the main service. notificationService.shutDown(); notificationService.startUp(); - // Running counts are not based on metadata store in RunRecordMonitorService so not asserting it + // Running counts are not based on metadata store in FlowControlService so not asserting it // here. Tasks.waitFor(0L, () -> queryMetrics(metricStore, SYSTEM_METRIC_PREFIX + FlowControl.LAUNCHING_COUNT, new HashMap<>()), 10, TimeUnit.SECONDS); diff --git a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/store/AppMetadataStoreTest.java b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/store/AppMetadataStoreTest.java index ab80eccd8eb8..452243c759b0 100644 --- a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/store/AppMetadataStoreTest.java +++ b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/store/AppMetadataStoreTest.java @@ -861,7 +861,7 @@ public void testDuplicateWritesIgnored() throws Exception { byte[] sourceId = new byte[] { 0 }; TransactionRunners.run(transactionRunner, context -> { AppMetadataStore store = AppMetadataStore.create(context); - assertSecondCallIsNull(() -> store.recordProgramProvisioning(runId, null, SINGLETON_PROFILE_MAP, + Assert.assertNotNull(store.recordProgramProvisioning(runId, null, SINGLETON_PROFILE_MAP, sourceId, ARTIFACT_ID)); assertSecondCallIsNull(() -> store.recordProgramProvisioned(runId, 0, sourceId)); assertSecondCallIsNull(() -> store.recordProgramStart(runId, null, Collections.emptyMap(), sourceId)); diff --git a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/store/DefaultStoreTest.java b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/store/DefaultStoreTest.java index 3173d0dee397..09e950f789cf 100644 --- a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/store/DefaultStoreTest.java +++ b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/store/DefaultStoreTest.java @@ -435,7 +435,9 @@ public void testLogProgramRunHistory() { .setProperties(noRuntimeArgsProps) .setCluster(emptyCluster) .setArtifactId(artifactId) - .setSourceId(AppFabricTestHelper.createSourceId(sourceId)).build(); + .setSourceId(AppFabricTestHelper.createSourceId(sourceId)) + .setFlowControlStatus("") + .build(); RunRecordDetail actualRecord7 = store.getRun(programId.run(run7.getId())); Assert.assertEquals(expectedRunRecord7, actualRecord7); @@ -450,7 +452,9 @@ public void testLogProgramRunHistory() { .setProperties(noRuntimeArgsProps) .setCluster(emptyCluster) .setArtifactId(artifactId) - .setSourceId(AppFabricTestHelper.createSourceId(sourceId)).build(); + .setSourceId(AppFabricTestHelper.createSourceId(sourceId)) + .setFlowControlStatus("") + .build(); RunRecordDetail actualRecord8 = store.getRun(programId.run(run8.getId())); Assert.assertEquals(expectedRunRecord8, actualRecord8); @@ -470,7 +474,9 @@ public void testLogProgramRunHistory() { .setProperties(noRuntimeArgsProps) .setCluster(emptyCluster) .setArtifactId(artifactId) - .setSourceId(AppFabricTestHelper.createSourceId(sourceId)).build(); + .setSourceId(AppFabricTestHelper.createSourceId(sourceId)) + .setFlowControlStatus("") + .build(); RunRecordDetail actualRecord9 = store.getRun(programId.run(run9.getId())); Assert.assertEquals(expectedRunRecord9, actualRecord9); diff --git a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/events/StartProgramEventSubscriberTest.java b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/events/StartProgramEventSubscriberTest.java index 83f59b7e16f6..b36b6a0cb356 100644 --- a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/events/StartProgramEventSubscriberTest.java +++ b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/events/StartProgramEventSubscriberTest.java @@ -27,7 +27,7 @@ import io.cdap.cdap.common.conf.CConfiguration; import io.cdap.cdap.common.conf.Constants; import io.cdap.cdap.internal.app.services.ProgramLifecycleService; -import io.cdap.cdap.internal.app.services.RunRecordMonitorService; +import io.cdap.cdap.internal.app.services.FlowControlService; import io.cdap.cdap.internal.app.services.http.AppFabricTestBase; import io.cdap.cdap.internal.events.dummy.DummyEventReader; import io.cdap.cdap.internal.events.dummy.DummyEventReaderExtensionProvider; @@ -38,7 +38,6 @@ import java.util.Collection; import org.junit.Before; import org.junit.Test; -import org.mockito.Mock; import org.mockito.Mockito; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,8 +48,8 @@ public class StartProgramEventSubscriberTest extends AppFabricTestBase { private static final Logger LOG = LoggerFactory.getLogger(StartProgramEventSubscriberTest.class); private ProgramLifecycleService lifecycleService; - private RunRecordMonitorService runRecordMonitorService; - private RunRecordMonitorService.Counter mockCounter; + private FlowControlService flowControlService; + private FlowControlService.Counter mockCounter; private CConfiguration cConf; private DummyEventReader eventReader; private Injector injector; @@ -59,16 +58,16 @@ public class StartProgramEventSubscriberTest extends AppFabricTestBase { @Before public void setup() { lifecycleService = Mockito.mock(ProgramLifecycleService.class); - runRecordMonitorService = Mockito.mock(RunRecordMonitorService.class); - mockCounter = Mockito.mock(RunRecordMonitorService.Counter.class); - Mockito.doReturn(mockCounter).when(runRecordMonitorService).getCount(); + flowControlService = Mockito.mock(FlowControlService.class); + mockCounter = Mockito.mock(FlowControlService.Counter.class); + Mockito.doReturn(mockCounter).when(flowControlService).getCounter(); cConf = CConfiguration.create(); eventReader = new DummyEventReader<>(mockedEvents()); injector = Guice.createInjector(new AbstractModule() { @Override protected void configure() { bind(ProgramLifecycleService.class).toInstance(lifecycleService); - bind(RunRecordMonitorService.class).toInstance(runRecordMonitorService); + bind(FlowControlService.class).toInstance(flowControlService); bind(CConfiguration.class).toInstance(cConf); bind(new TypeLiteral>() { }) diff --git a/cdap-common/src/main/java/io/cdap/cdap/common/conf/Constants.java b/cdap-common/src/main/java/io/cdap/cdap/common/conf/Constants.java index d1d4c667e052..8329794839f3 100644 --- a/cdap-common/src/main/java/io/cdap/cdap/common/conf/Constants.java +++ b/cdap-common/src/main/java/io/cdap/cdap/common/conf/Constants.java @@ -284,10 +284,6 @@ public static final class AppFabric { public static final String PROGRAM_TRANSACTION_CONTROL = "app.program.transaction.control"; public static final String MAX_CONCURRENT_RUNS = "app.max.concurrent.runs"; public static final String MAX_CONCURRENT_LAUNCHING = "app.max.concurrent.launching"; - public static final String MONITOR_RECORD_AGE_THRESHOLD_SECONDS = - "run.record.monitor.record.age.threshold.seconds"; - public static final String MONITOR_CLEANUP_INTERVAL_SECONDS = - "run.record.monitor.cleanup.interval.seconds"; public static final String PROGRAM_LAUNCH_THREADS = "app.program.launch.threads"; public static final String PROGRAM_KILL_THREADS = "app.program.kill.threads"; public static final String RUN_DATA_CLEANUP_TTL_DAYS = "app.run.records.ttl.days"; diff --git a/cdap-common/src/main/java/io/cdap/cdap/internal/app/store/RunRecordDetail.java b/cdap-common/src/main/java/io/cdap/cdap/internal/app/store/RunRecordDetail.java index 4d70f44d2b5d..a2b5d7e945a9 100644 --- a/cdap-common/src/main/java/io/cdap/cdap/internal/app/store/RunRecordDetail.java +++ b/cdap-common/src/main/java/io/cdap/cdap/internal/app/store/RunRecordDetail.java @@ -26,6 +26,7 @@ import io.cdap.cdap.proto.RunRecord; import io.cdap.cdap.proto.id.ProfileId; import io.cdap.cdap.proto.id.ProgramRunId; +import io.cdap.cdap.runtime.spi.provisioner.Cluster; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; @@ -62,6 +63,10 @@ public class RunRecordDetail extends RunRecord { @Nullable private final String principal; + @SerializedName("flowcontrolstatus") + @Nullable + private final String flowControlStatus; + // carries the user arguments decoded from properties. No need to serialize since it is from the properties. private transient volatile Map userArgs; @@ -72,7 +77,7 @@ protected RunRecordDetail(ProgramRunId programRunId, long startTs, @Nullable Lon @Nullable Map properties, @Nullable Map systemArgs, @Nullable String twillRunId, ProgramRunCluster cluster, ProfileId profileId, @Nullable String peerName, byte[] sourceId, @Nullable ArtifactId artifactId, - @Nullable String principal) { + @Nullable String principal, @Nullable String flowControlStatus) { super(programRunId.getRun(), startTs, runTs, stopTs, suspendTs, resumeTs, stoppingTs, terminateTs, status, properties, cluster, profileId, peerName, programRunId.getVersion()); @@ -82,6 +87,7 @@ protected RunRecordDetail(ProgramRunId programRunId, long startTs, @Nullable Lon this.sourceId = sourceId; this.artifactId = artifactId; this.principal = principal; + this.flowControlStatus = flowControlStatus; } @Nullable @@ -137,6 +143,11 @@ public String getPrincipal() { return principal; } + @Nullable + public String getFlowControlStatus() { + return flowControlStatus; + } + @Override public boolean equals(Object o) { if (this == o) { @@ -161,15 +172,16 @@ public boolean equals(Object o) { && Objects.equal(this.getTwillRunId(), that.getTwillRunId()) && Arrays.equals(this.getSourceId(), that.getSourceId()) && Objects.equal(this.getArtifactId(), that.getArtifactId()) - && Objects.equal(this.getPrincipal(), that.getPrincipal()); + && Objects.equal(this.getPrincipal(), that.getPrincipal()) + && Objects.equal(this.getFlowControlStatus(), that.getFlowControlStatus()); } @Override public int hashCode() { return Objects.hashCode(getProgramRunId(), getStartTs(), getRunTs(), getStopTs(), - getSuspendTs(), getResumeTs(), - getStoppingTs(), getTerminateTs(), getStatus(), getProperties(), getPeerName(), - getTwillRunId(), Arrays.hashCode(getSourceId()), getArtifactId(), getPrincipal()); + getSuspendTs(), getResumeTs(), getStoppingTs(), getTerminateTs(), getStatus(), + getProperties(), getPeerName(), getTwillRunId(), Arrays.hashCode(getSourceId()), + getArtifactId(), getPrincipal(), getFlowControlStatus()); } @Override @@ -193,6 +205,7 @@ public String toString() { .add("sourceId", getSourceId() == null ? null : Bytes.toHexString(getSourceId())) .add("artifactId", getArtifactId()) .add("principal", getPrincipal()) + .add("flowcontrolstatus", getFlowControlStatus()) .toString(); } @@ -239,6 +252,7 @@ public abstract static class ABuilder extends RunRecord.Buil protected byte[] sourceId; protected String principal; protected ArtifactId artifactId; + protected String flowControlStatus; protected ABuilder() { systemArgs = new HashMap<>(); @@ -252,6 +266,7 @@ protected ABuilder(RunRecordDetail record) { sourceId = record.getSourceId(); principal = record.getPrincipal(); artifactId = record.getArtifactId(); + flowControlStatus = record.getFlowControlStatus(); } public T setProgramRunId(ProgramRunId programRunId) { @@ -286,12 +301,16 @@ public T setArtifactId(ArtifactId artifactId) { this.artifactId = artifactId; return (T) this; } + public T setFlowControlStatus(String flowControlStatus) { + this.flowControlStatus = flowControlStatus; + return (T) this; + } public RunRecordDetail build() { if (programRunId == null) { throw new IllegalArgumentException("Run record run id must be specified."); } - if (sourceId == null) { + if (status != ProgramRunStatus.PENDING && sourceId == null) { throw new IllegalArgumentException("Run record source id must be specified."); } // we are not validating artifactId for null, @@ -300,7 +319,7 @@ public RunRecordDetail build() { return new RunRecordDetail(programRunId, startTs, runTs, stopTs, suspendTs, resumeTs, stoppingTs, terminateTs, status, properties, systemArgs, twillRunId, cluster, - profileId, peerName, sourceId, artifactId, principal); + profileId, peerName, sourceId, artifactId, principal, flowControlStatus); } } } diff --git a/cdap-common/src/main/java/io/cdap/cdap/internal/app/store/RunRecordDetailWithExistingStatus.java b/cdap-common/src/main/java/io/cdap/cdap/internal/app/store/RunRecordDetailWithExistingStatus.java index 9283757b8068..292152a1d23f 100644 --- a/cdap-common/src/main/java/io/cdap/cdap/internal/app/store/RunRecordDetailWithExistingStatus.java +++ b/cdap-common/src/main/java/io/cdap/cdap/internal/app/store/RunRecordDetailWithExistingStatus.java @@ -35,15 +35,15 @@ public final class RunRecordDetailWithExistingStatus extends RunRecordDetail { private RunRecordDetailWithExistingStatus(ProgramRunId programRunId, long startTs, @Nullable Long runTs, @Nullable Long stopTs, @Nullable Long suspendTs, @Nullable Long resumeTs, - @Nullable Long stoppingTs, @Nullable Long terminateTs, - ProgramRunStatus status, @Nullable Map properties, - @Nullable Map systemArgs, @Nullable String twillRunId, - ProgramRunCluster cluster, ProfileId profileId, @Nullable String peerName, - byte[] sourceId, @Nullable ArtifactId artifactId, - @Nullable String principal, @Nullable ProgramRunStatus existingStatus) { + @Nullable Long stoppingTs, @Nullable Long terminateTs, ProgramRunStatus status, + @Nullable Map properties, @Nullable Map systemArgs, + @Nullable String twillRunId, ProgramRunCluster cluster, ProfileId profileId, + @Nullable String peerName, byte[] sourceId, @Nullable ArtifactId artifactId, + @Nullable String principal, @Nullable ProgramRunStatus existingStatus, + @Nullable String flowControlStatus) { super(programRunId, startTs, runTs, stopTs, suspendTs, resumeTs, stoppingTs, terminateTs, - status, properties, - systemArgs, twillRunId, cluster, profileId, peerName, sourceId, artifactId, principal); + status, properties, systemArgs, twillRunId, cluster, profileId, peerName, sourceId, + artifactId, principal, flowControlStatus); this.existingStatus = existingStatus; } @@ -81,10 +81,9 @@ public RunRecordDetailWithExistingStatus build() { // artifactId could be null for program starts that were recorded pre 5.0 but weren't processed // we don't want to throw exception while processing them return new RunRecordDetailWithExistingStatus(programRunId, startTs, runTs, stopTs, suspendTs, - resumeTs, - stoppingTs, terminateTs, status, properties, systemArgs, + resumeTs, stoppingTs, terminateTs, status, properties, systemArgs, twillRunId, cluster, profileId, peerName, sourceId, artifactId, - principal, existingStatus); + principal, existingStatus, flowControlStatus); } } } diff --git a/cdap-common/src/main/resources/cdap-default.xml b/cdap-common/src/main/resources/cdap-default.xml index 002975012a3f..eb2cdc26ddfe 100644 --- a/cdap-common/src/main/resources/cdap-default.xml +++ b/cdap-common/src/main/resources/cdap-default.xml @@ -575,31 +575,6 @@ - - run.record.monitor.record.age.threshold.seconds - 3600 - - Maximum amount of time (in seconds) in which a run record is retained in - run record monitor. - This is to safe guard launch requests flow-control such that if a request - is somehow stuck in PENDING/STARTING - state, it will be dropped from after the threshold. - Note that run.record.monitor.cleanup.interval.seconds might be needed to - changed if this config changes. - - - - - run.record.monitor.cleanup.interval.seconds - 60 - - Cleanup interval (in seconds) in which run record monitor service cleanup - logic runs to delete old entries. - Note that run.record.monitor.record.age.threshold.seconds might be needed - to changed if this config changes. - - - app.program.launch.threads 20 diff --git a/cdap-data-fabric/src/main/java/io/cdap/cdap/store/StoreDefinition.java b/cdap-data-fabric/src/main/java/io/cdap/cdap/store/StoreDefinition.java index ac5d2c121ac3..6975bf0768b8 100644 --- a/cdap-data-fabric/src/main/java/io/cdap/cdap/store/StoreDefinition.java +++ b/cdap-data-fabric/src/main/java/io/cdap/cdap/store/StoreDefinition.java @@ -504,6 +504,7 @@ public static final class AppMetadataStore { public static final String RUN_STATUS = "run_status"; public static final String RUN_START_TIME = "run_start_time"; public static final String RUN_RECORD_DATA = "run_record_data"; + public static final String FLOW_CONTROL_STATUS = "flow_control_status"; public static final String WORKFLOW_DATA = "workflow_data"; public static final String COUNT_TYPE = "count_type"; public static final String COUNTS = "counts"; @@ -567,10 +568,11 @@ public static final class AppMetadataStore { Fields.stringType(PROGRAM_FIELD), Fields.longType(RUN_START_TIME), Fields.stringType(RUN_FIELD), + Fields.stringType(FLOW_CONTROL_STATUS), Fields.stringType(RUN_RECORD_DATA)) .withPrimaryKeys(RUN_STATUS, NAMESPACE_FIELD, APPLICATION_FIELD, VERSION_FIELD, - PROGRAM_TYPE_FIELD, - PROGRAM_FIELD, RUN_START_TIME, RUN_FIELD) + PROGRAM_TYPE_FIELD, PROGRAM_FIELD, RUN_START_TIME, RUN_FIELD) + .withIndexes(FLOW_CONTROL_STATUS) .build(); public static final StructuredTableSpecification WORKFLOWS_SPEC =