Skip to content

Commit

Permalink
Fix RunRecordCorrectorService and AuditLogSubscriberService
Browse files Browse the repository at this point in the history
  • Loading branch information
vsethi09 committed Dec 19, 2024
1 parent 8806de4 commit b3e3e17
Show file tree
Hide file tree
Showing 6 changed files with 72 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import io.cdap.cdap.app.runtime.ProgramRuntimeService;
import io.cdap.cdap.common.conf.CConfiguration;
import io.cdap.cdap.common.conf.Constants;
import io.cdap.cdap.common.conf.Constants.Service;
import io.cdap.cdap.common.feature.DefaultFeatureFlagsProvider;
import io.cdap.cdap.common.logging.LoggingContextAccessor;
import io.cdap.cdap.common.logging.ServiceLoggingContext;
Expand All @@ -40,6 +41,7 @@
import io.cdap.cdap.proto.id.NamespaceId;
import io.cdap.cdap.scheduler.CoreSchedulerService;
import io.cdap.cdap.scheduler.ScheduleNotificationSubscriberService;
import io.cdap.cdap.security.auth.AuditLogSubscriberService;
import io.cdap.cdap.sourcecontrol.RepositoryCleanupService;
import io.cdap.cdap.sourcecontrol.operationrunner.SourceControlOperationRunner;
import io.cdap.cdap.spi.data.transaction.TransactionRunner;
Expand All @@ -54,17 +56,17 @@
import org.slf4j.LoggerFactory;

/**
* AppFabric Server.
* AppFabric Processor Service which runs messaging subscriber services.
*/
public class AppFabricProcessorService extends AbstractIdleService {

// TODO: Use Logger
private static final Logger LOG = LoggerFactory.getLogger(AppFabricProcessorService.class);

private final ProgramRuntimeService programRuntimeService;
private final ApplicationLifecycleService applicationLifecycleService;
private final ProgramNotificationSubscriberService programNotificationSubscriberService;
private final ProgramStopSubscriberService programStopSubscriberService;
private final AuditLogSubscriberService auditLogSubscriberService;
private final RunRecordCorrectorService runRecordCorrectorService;
private final RunDataTimeToLiveService runDataTimeToLiveService;
private final ProgramRunStatusMonitorService programRunStatusMonitorService;
Expand All @@ -84,7 +86,7 @@ public class AppFabricProcessorService extends AbstractIdleService {
private MetricsCollectionService metricsCollectionService;

/**
* Construct the AppFabricServer with service factory and cConf coming from guice injection.
* Construct the AppFabricProcessorService with service factory and cConf coming from guice injection.
*/
@Inject
public AppFabricProcessorService(CConfiguration cConf,
Expand All @@ -95,6 +97,7 @@ public AppFabricProcessorService(CConfiguration cConf,
ApplicationLifecycleService applicationLifecycleService,
ProgramNotificationSubscriberService programNotificationSubscriberService,
ProgramStopSubscriberService programStopSubscriberService,
AuditLogSubscriberService auditLogSubscriberService,
CoreSchedulerService coreSchedulerService,
CredentialProviderService credentialProviderService,
NamespaceCredentialProviderService namespaceCredentialProviderService,
Expand All @@ -114,6 +117,7 @@ public AppFabricProcessorService(CConfiguration cConf,
this.applicationLifecycleService = applicationLifecycleService;
this.programNotificationSubscriberService = programNotificationSubscriberService;
this.programStopSubscriberService = programStopSubscriberService;
this.auditLogSubscriberService = auditLogSubscriberService;
this.runRecordCorrectorService = runRecordCorrectorService;
this.programRunStatusMonitorService = programRunStatusMonitorService;
this.coreSchedulerService = coreSchedulerService;
Expand All @@ -132,19 +136,25 @@ public AppFabricProcessorService(CConfiguration cConf,
}

/**
* Configures the AppFabricService pre-start.
* Configures the AppFabricProcessorService pre-start.
*/
@Override
protected void startUp() throws Exception {
LoggingContextAccessor.setLoggingContext(
new ServiceLoggingContext(NamespaceId.SYSTEM.getNamespace(),
Constants.Logging.COMPONENT_NAME,
Constants.Service.APP_FABRIC_HTTP));
Service.APP_FABRIC_PROCESSOR));
LOG.info("Starting AppFabric processor service.");
List<ListenableFuture<State>> futuresList = new ArrayList<>();
FeatureFlagsProvider featureFlagsProvider = new DefaultFeatureFlagsProvider(cConf);
if (Feature.NAMESPACED_SERVICE_ACCOUNTS.isEnabled(featureFlagsProvider)) {
futuresList.add(namespaceCredentialProviderService.start());
}
// Only for RBAC instances
if (Feature.DATAPLANE_AUDIT_LOGGING.isEnabled(featureFlagsProvider)
&& cConf.getBoolean(Constants.Security.ENABLED)) {
futuresList.add(auditLogSubscriberService.start());
}
futuresList.addAll(ImmutableList.of(
provisioningService.start(),
applicationLifecycleService.start(),
Expand Down Expand Up @@ -176,10 +186,12 @@ protected void startUp() throws Exception {
metricsCollectionService.getContext(Collections.emptyMap())
.gauge(Constants.Metrics.Program.NAMESPACE_COUNT,
namespaceCount);
LOG.info("AppFabric processor service started.");
}

@Override
protected void shutDown() throws Exception {
LOG.info("Stopping AppFabric processor service.");
scheduleNotificationSubscriberService.stopAndWait();
coreSchedulerService.stopAndWait();
bootstrapService.stopAndWait();
Expand All @@ -198,5 +210,7 @@ protected void shutDown() throws Exception {
credentialProviderService.stopAndWait();
namespaceCredentialProviderService.stopAndWait();
operationNotificationSubscriberService.stopAndWait();
auditLogSubscriberService.stopAndWait();
LOG.info("AppFabric processor service stopped.");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,6 @@ public class AppFabricServer extends AbstractIdleService {
private final ApplicationLifecycleService applicationLifecycleService;
private final Set<String> servicesNames;
private final Set<String> handlerHookNames;
private final RunRecordCorrectorService runRecordCorrectorService;
private final RunDataTimeToLiveService runDataTimeToLiveService;
private final ProgramRunStatusMonitorService programRunStatusMonitorService;
private final RunRecordMonitorService runRecordCounterService;
private final CoreSchedulerService coreSchedulerService;
Expand All @@ -104,7 +102,7 @@ public class AppFabricServer extends AbstractIdleService {
private CommonNettyHttpServiceFactory commonNettyHttpServiceFactory;

/**
* Construct the AppFabricProcessorService with service factory and cConf coming from guice
* Construct the AppFabricServer with service factory and cConf coming from guice
* injection.
*/
@Inject
Expand All @@ -114,7 +112,6 @@ public AppFabricServer(CConfiguration cConf, SConfiguration sConf,
@Named(Constants.AppFabric.HANDLERS_BINDING) Set<HttpHandler> handlers,
@Nullable MetricsCollectionService metricsCollectionService,
ProgramRuntimeService programRuntimeService,
RunRecordCorrectorService runRecordCorrectorService,
ProgramRunStatusMonitorService programRunStatusMonitorService,
ApplicationLifecycleService applicationLifecycleService,
@Named("appfabric.services.names") Set<String> servicesNames,
Expand All @@ -128,7 +125,6 @@ public AppFabricServer(CConfiguration cConf, SConfiguration sConf,
TransactionRunner transactionRunner,
RunRecordMonitorService runRecordCounterService,
CommonNettyHttpServiceFactory commonNettyHttpServiceFactory,
RunDataTimeToLiveService runDataTimeToLiveService,
SourceControlOperationRunner sourceControlOperationRunner,
RepositoryCleanupService repositoryCleanupService) {
this.hostname = hostname;
Expand All @@ -141,7 +137,6 @@ public AppFabricServer(CConfiguration cConf, SConfiguration sConf,
this.servicesNames = servicesNames;
this.handlerHookNames = handlerHookNames;
this.applicationLifecycleService = applicationLifecycleService;
this.runRecordCorrectorService = runRecordCorrectorService;
this.programRunStatusMonitorService = programRunStatusMonitorService;
this.sslEnabled = cConf.getBoolean(Constants.Security.SSL.INTERNAL_ENABLED);
this.coreSchedulerService = coreSchedulerService;
Expand All @@ -152,7 +147,6 @@ public AppFabricServer(CConfiguration cConf, SConfiguration sConf,
this.systemAppManagementService = systemAppManagementService;
this.transactionRunner = transactionRunner;
this.runRecordCounterService = runRecordCounterService;
this.runDataTimeToLiveService = runDataTimeToLiveService;
this.commonNettyHttpServiceFactory = commonNettyHttpServiceFactory;
this.sourceControlOperationRunner = sourceControlOperationRunner;
this.repositoryCleanupService = repositoryCleanupService;
Expand All @@ -177,12 +171,10 @@ protected void startUp() throws Exception {
applicationLifecycleService.start(),
bootstrapService.start(),
programRuntimeService.start(),
runRecordCorrectorService.start(),
programRunStatusMonitorService.start(),
coreSchedulerService.start(),
credentialProviderService.start(),
runRecordCounterService.start(),
runDataTimeToLiveService.start(),
sourceControlOperationRunner.start(),
repositoryCleanupService.start()
));
Expand Down Expand Up @@ -241,11 +233,9 @@ protected void shutDown() throws Exception {
cancelHttpService.cancel();
programRuntimeService.stopAndWait();
applicationLifecycleService.stopAndWait();
runRecordCorrectorService.stopAndWait();
programRunStatusMonitorService.stopAndWait();
provisioningService.stopAndWait();
runRecordCounterService.stopAndWait();
runDataTimeToLiveService.stopAndWait();
sourceControlOperationRunner.stopAndWait();
repositoryCleanupService.stopAndWait();
credentialProviderService.stopAndWait();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright © 2014-2018 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.cdap.internal.app.services;

import com.google.common.util.concurrent.Service;
import com.google.inject.Injector;
import io.cdap.cdap.internal.AppFabricTestHelper;
import org.junit.Assert;
import org.junit.Test;

public class AppFabricProcessorServiceTest {

@Test
public void startStopService() {
try {
Injector injector = AppFabricTestHelper.getInjector();
AppFabricProcessorService service = injector.getInstance(AppFabricProcessorService.class);
Service.State state = service.startAndWait();
Assert.assertSame(state, Service.State.RUNNING);

state = service.stopAndWait();
Assert.assertSame(state, Service.State.TERMINATED);
} finally {
AppFabricTestHelper.shutdown();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import io.cdap.cdap.data2.datafabric.dataset.service.executor.DatasetOpExecutorService;
import io.cdap.cdap.gateway.handlers.log.MockLogReader;
import io.cdap.cdap.gateway.router.NettyRouter;
import io.cdap.cdap.internal.app.services.AppFabricProcessorService;
import io.cdap.cdap.internal.app.services.AppFabricServer;
import io.cdap.cdap.internal.guice.AppFabricTestModule;
import io.cdap.cdap.logging.read.LogReader;
Expand Down Expand Up @@ -106,6 +107,7 @@ public abstract class GatewayTestBase {

private static Injector injector;
private static AppFabricServer appFabricServer;
private static AppFabricProcessorService appFabricProcessorService;
private static NettyRouter router;
private static LogQueryService logQueryService;
private static MetricsQueryService metricsQueryService;
Expand Down Expand Up @@ -194,6 +196,8 @@ protected void configure() {
datasetService.startAndWait();
appFabricServer = injector.getInstance(AppFabricServer.class);
appFabricServer.startAndWait();
appFabricProcessorService = injector.getInstance(AppFabricProcessorService.class);
appFabricProcessorService.startAndWait();
logQueryService = injector.getInstance(LogQueryService.class);
logQueryService.startAndWait();
metricsQueryService = injector.getInstance(MetricsQueryService.class);
Expand All @@ -217,6 +221,7 @@ public static void stopGateway(CConfiguration conf) throws Exception {
namespaceAdmin.delete(new NamespaceId(TEST_NAMESPACE2));
namespaceAdmin.delete(NamespaceId.DEFAULT);
appFabricServer.stopAndWait();
appFabricProcessorService.stopAndWait();
metricsCollectionService.stopAndWait();
metricsQueryService.stopAndWait();
logQueryService.stopAndWait();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@
import org.apache.twill.zookeeper.ZKClientService;

/**
* The main class to run app-fabric and other supporting services.
* The main class to run appfabric processor and other supporting services.
*/
public class AppFabricProcessorServiceMain extends AbstractServiceMain<EnvironmentOptions> {

Expand Down
5 changes: 5 additions & 0 deletions cdap-unit-test/src/main/java/io/cdap/cdap/test/TestBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@
import io.cdap.cdap.data2.dataset2.lib.table.leveldb.LevelDBTableService;
import io.cdap.cdap.gateway.handlers.AuthorizationHandler;
import io.cdap.cdap.internal.app.runtime.AppStateStoreProvider;
import io.cdap.cdap.internal.app.services.AppFabricProcessorService;
import io.cdap.cdap.internal.app.services.AppFabricServer;
import io.cdap.cdap.internal.app.worker.sidecar.ArtifactLocalizerService;
import io.cdap.cdap.internal.capability.CapabilityConfig;
Expand Down Expand Up @@ -231,6 +232,7 @@ public class TestBase {
private static FieldLineageAdmin fieldLineageAdmin;
private static LineageAdmin lineageAdmin;
private static AppFabricServer appFabricServer;
private static AppFabricProcessorService appFabricProcessorService;
private static PreferencesService preferencesService;
private static ArtifactLocalizerService artifactLocalizerService;
private static AppStateStoreProvider appStateStoreProvider;
Expand Down Expand Up @@ -424,6 +426,8 @@ protected void configure() {
previewRunnerManager.startAndWait();
appFabricServer = injector.getInstance(AppFabricServer.class);
appFabricServer.startAndWait();
appFabricProcessorService = injector.getInstance(AppFabricProcessorService.class);
appFabricProcessorService.startAndWait();
preferencesService = injector.getInstance(PreferencesService.class);

scheduler = injector.getInstance(Scheduler.class);
Expand Down Expand Up @@ -646,6 +650,7 @@ public static void finish() throws Exception {
((Service) messagingService).stopAndWait();
}
appFabricServer.stopAndWait();
appFabricProcessorService.stopAndWait();
}

protected MetricsManager getMetricsManager() {
Expand Down

0 comments on commit b3e3e17

Please sign in to comment.