Skip to content

Commit

Permalink
LoomServer parallel listener start (#7200)
Browse files Browse the repository at this point in the history
* Addresses issues #7051, #6434, #6498
  • Loading branch information
trentjeff authored Jul 19, 2023
1 parent 5b99f69 commit d4c425b
Show file tree
Hide file tree
Showing 6 changed files with 188 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ default Services services() {
* <p>
* If the service provider does not support shutdown an empty is returned.
* <p>
* The default reference implementation for Injection will return a map of all service types that were deactivated to any
* The default reference implementation will return a map of all service types that were deactivated to any
* throwable that was observed during that services shutdown sequence.
* <p>
* The order in which services are deactivated is dependent upon whether the {@link #activationLog()} is available.
Expand All @@ -194,6 +194,8 @@ default Services services() {
* the same {@link RunLevel} value then the ordering will be based upon the implementation's comparator.
* <p>
* When shutdown returns, it is guaranteed that all services were shutdown, or failed to achieve shutdown.
* <p>
* The shutdown timeout from {@link InjectionServicesConfigBlueprint#shutdownTimeout()} will be applied as the default.
*
* @return a map of all managed service types deactivated to results of deactivation, or empty if shutdown is not supported
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;

import io.helidon.common.config.Config;
Expand Down Expand Up @@ -69,6 +71,7 @@
class DefaultInjectionServices implements InjectionServices, Resettable {
static final System.Logger LOGGER = System.getLogger(DefaultInjectionServices.class.getName());

private final ReentrantReadWriteLock lifecycleLock = new ReentrantReadWriteLock();
private final AtomicBoolean initializingServicesStarted = new AtomicBoolean(false);
private final AtomicBoolean initializingServicesFinished = new AtomicBoolean(false);
private final AtomicBoolean isBinding = new AtomicBoolean(false);
Expand Down Expand Up @@ -149,32 +152,33 @@ public Optional<Set<ServiceInfoCriteria>> lookups() {

@Override
public Optional<DefaultServices> services(boolean initialize) {
if (!initialize) {
return Optional.ofNullable(services.get());
}
boolean isWriteLock = initialize;
Lock lock = (isWriteLock) ? lifecycleLock.writeLock() : lifecycleLock.readLock();
lock.lock();
try {
if (!initialize) {
return Optional.ofNullable(services.get());
}

if (!initializingServicesStarted.getAndSet(true)) {
try {
initializeServices();
} catch (Throwable t) {
state.lastError(t);
initializingServicesStarted.set(false);
if (t instanceof InjectionException) {
throw (InjectionException) t;
} else {
throw new InjectionException("Failed to initialize: " + t.getMessage(), t);
init();

DefaultServices thisServices = services.get();
if (thisServices == null) {
lock.unlock();
lock = lifecycleLock.writeLock();
lock.lock();
reset(true);
init();
thisServices = services.get();

if (thisServices == null) {
throw new InjectionException("Unable to reset() after shutdown()");
}
} finally {
state.finished(true);
initializingServicesFinished.set(true);
}
return Optional.of(thisServices);
} finally {
lock.unlock();
}

DefaultServices thisServices = services.get();
if (thisServices == null) {
throw new InjectionException("Must reset() after shutdown()");
}
return Optional.of(thisServices);
}

@Override
Expand All @@ -183,16 +187,18 @@ public Optional<Map<TypeName, ActivationResult>> shutdown() {
DefaultServices current = services.get();
if (services.compareAndSet(current, null) && current != null) {
State currentState = state.clone().currentPhase(Phase.PRE_DESTROYING);
log("started shutdown");
log("Started shutdown");
result = doShutdown(current, currentState);
log("finished shutdown");
log("Finished shutdown");
}
return Optional.ofNullable(result);
}

@Override
// note that this is typically only called during testing, and also in the injection maven-plugin
public boolean reset(boolean deep) {
Lock lock = lifecycleLock.writeLock();
lock.lock();
try {
assertNotInitializing();
if (isInitializing() || isInitialized()) {
Expand Down Expand Up @@ -228,6 +234,8 @@ public boolean reset(boolean deep) {
throw new InjectionException("Failed to reset (state=" + state
+ ", isInitialized=" + isInitialized()
+ ", isInitializing=" + isInitializing() + ")", e);
} finally {
lock.unlock();
}
}

Expand Down Expand Up @@ -291,6 +299,25 @@ private void assertNotInitializing() {
}
}

private void init() {
if (!initializingServicesStarted.getAndSet(true)) {
try {
initializeServices();
} catch (Throwable t) {
state.lastError(t);
initializingServicesStarted.set(false);
if (t instanceof InjectionException) {
throw (InjectionException) t;
} else {
throw new InjectionException("Failed to initialize: " + t.getMessage(), t);
}
} finally {
state.finished(true);
initializingServicesFinished.set(true);
}
}
}

private void initializeServices() {
initializationCallingContext = CallingContextFactory.create(false).orElse(null);

Expand Down Expand Up @@ -460,6 +487,16 @@ private class Shutdown implements Callable<Map<TypeName, ActivationResult>> {

@Override
public Map<TypeName, ActivationResult> call() {
Lock lock = lifecycleLock.writeLock();
lock.lock();
try {
return doShutdown();
} finally {
lock.unlock();
}
}

private Map<TypeName, ActivationResult> doShutdown() {
state.currentPhase(Phase.DESTROYED);

ActivationLogQuery query = log.toQuery().orElse(null);
Expand Down Expand Up @@ -497,7 +534,7 @@ public Map<TypeName, ActivationResult> call() {
doFinalShutdown(serviceProviders);

// finally, clear everything
reset(false);
reset(true);

return map;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -437,15 +437,15 @@ void bind(InjectionServices injectionServices,
DefaultInjectionPlanBinder binder,
Application app) {
String appName = app.named().orElse(app.getClass().getName());
boolean isLoggable = DefaultInjectionServices.LOGGER.isLoggable(System.Logger.Level.INFO);
boolean isLoggable = DefaultInjectionServices.LOGGER.isLoggable(System.Logger.Level.DEBUG);
if (isLoggable) {
DefaultInjectionServices.LOGGER.log(System.Logger.Level.INFO, "starting binding application: " + appName);
DefaultInjectionServices.LOGGER.log(System.Logger.Level.DEBUG, "Starting binding application: " + appName);
}
try {
app.configure(binder);
bind(createServiceProvider(app, injectionServices));
if (isLoggable) {
DefaultInjectionServices.LOGGER.log(System.Logger.Level.INFO, "finished binding application: " + appName);
DefaultInjectionServices.LOGGER.log(System.Logger.Level.DEBUG, "Finished binding application: " + appName);
}
} catch (Exception e) {
throw new InjectionException("Failed to process: " + app, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import io.helidon.common.types.TypeName;
import io.helidon.config.Config;
import io.helidon.inject.api.ActivationResult;
import io.helidon.inject.api.InjectionException;
import io.helidon.inject.api.InjectionServices;
import io.helidon.inject.api.ModuleComponent;
import io.helidon.inject.api.Qualifier;
Expand All @@ -34,14 +33,14 @@
import io.helidon.inject.api.ServiceProvider;
import io.helidon.inject.api.Services;
import io.helidon.inject.testing.InjectionTestingSupport;
import io.helidon.inject.tests.inject.stacking.CommonContract;
import io.helidon.inject.tests.inject.tbox.impl.BigHammer;
import io.helidon.inject.tests.inject.tbox.impl.HandSaw;
import io.helidon.inject.tests.inject.ASerialProviderImpl;
import io.helidon.inject.tests.inject.ClassNamedY;
import io.helidon.inject.tests.inject.TestingSingleton;
import io.helidon.inject.tests.inject.provider.FakeConfig;
import io.helidon.inject.tests.inject.provider.FakeServer;
import io.helidon.inject.tests.inject.stacking.CommonContract;
import io.helidon.inject.tests.inject.tbox.impl.BigHammer;
import io.helidon.inject.tests.inject.tbox.impl.HandSaw;
import io.helidon.inject.tests.inject.tbox.impl.MainToolBox;
import io.helidon.inject.tools.Options;

Expand All @@ -50,7 +49,7 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import static io.helidon.common.types.TypeName.*;
import static io.helidon.common.types.TypeName.create;
import static io.helidon.inject.testing.InjectionTestingSupport.resetAll;
import static io.helidon.inject.testing.InjectionTestingSupport.testableServices;
import static io.helidon.inject.tests.inject.TestUtils.loadStringFromFile;
Expand All @@ -64,7 +63,6 @@
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.hasEntry;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;

/**
* Expectation here is that the annotation processor ran, and we can use standard injection and di registry services, etc.
Expand Down Expand Up @@ -314,9 +312,6 @@ void startupAndShutdownCallsPostConstructAndPreDestroy() {

assertThat(injectionServices.metrics().orElseThrow().lookupCount().orElse(0), equalTo(0));

InjectionException e = assertThrows(InjectionException.class, () -> injectionServices.services());
assertThat(e.getMessage(), equalTo("Must reset() after shutdown()"));

tearDown();
setUp();
TestingSingleton testingSingletonFromLookup2 = injectionServices.services().lookup(TestingSingleton.class).get();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
* Copyright (c) 2023 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.helidon.inject.runtime;

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import io.helidon.common.types.TypeName;
import io.helidon.config.Config;
import io.helidon.config.ConfigSources;
import io.helidon.inject.api.ActivationResult;
import io.helidon.inject.api.Bootstrap;
import io.helidon.inject.api.InjectionServices;
import io.helidon.inject.api.Services;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.fail;

class LockContentionTest {
final int COUNT = 100;

final ExecutorService es = Executors.newFixedThreadPool(16);
final Config config = Config.builder(
ConfigSources.create(
Map.of("inject.permits-dynamic", "true"), "config-1"))
.disableEnvironmentVariablesSource()
.disableSystemPropertiesSource()
.build();

@BeforeEach
void init() {
InjectionServices.globalBootstrap(Bootstrap.builder().config(config).build());
}

@AfterEach
void tearDown() {
SimpleInjectionTestingSupport.resetAll();
}

@Test
// we cannot interlace shutdown with startups here - so instead we are checking to ensure N threads can call startup
// and then N threads can call shutdown in parallel, but in phases
void lockContention() {
Map<String, Future<?>> result = new ConcurrentHashMap<>();
for (int i = 1; i <= COUNT; i++) {
result.put("start run:" + i, es.submit(this::start));
}

verify(result);
result.clear();

for (int i = 1; i <= COUNT; i++) {
result.put("shutdown run:" + i, es.submit(this::shutdown));
}

verify(result);
}

void verify(Map<String, Future<?>> result) {
result.forEach((k, v) -> {
try {
assertThat(k, v.get(), notNullValue());
} catch (Exception e) {
fail("failed on " + k, e);
}
});
}

Services start() {
return Objects.requireNonNull(InjectionServices.realizedServices());
}

Map<TypeName, ActivationResult> shutdown() {
InjectionServices injectionServices = InjectionServices.injectionServices().orElseThrow();
Map<TypeName, ActivationResult> result = new LinkedHashMap<>();
Map<TypeName, ActivationResult> round;
do {
round = injectionServices.shutdown().orElseThrow();
result.putAll(round);
} while (!round.isEmpty());
return result;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -276,19 +276,15 @@ private void stopIt() {

private void startIt() {
long now = System.currentTimeMillis();
// TODO parallel start breaks pico - issue
// boolean result = parallel("start", ServerListener::start);
for (ServerListener listener : listeners.values()) {
listener.start();
boolean result = parallel("start", ServerListener::start);
if (!result) {
LOGGER.log(System.Logger.Level.ERROR, "Níma server failed to start, shutting down");
parallel("stop", ServerListener::stop);
if (startFutures != null) {
startFutures.forEach(future -> future.future().cancel(true));
}
return;
}
// if (!result) {
// LOGGER.log(System.Logger.Level.ERROR, "Níma server failed to start, shutting down");
// parallel("stop", ServerListener::stop);
// if (startFutures != null) {
// startFutures.forEach(future -> future.future().cancel(true));
// }
// return;
// }
if (registerShutdownHook) {
registerShutdownHook();
}
Expand Down

0 comments on commit d4c425b

Please sign in to comment.