From d4c425bc70e9e3fa57709d9e46d6d87b906be342 Mon Sep 17 00:00:00 2001 From: Jeff Trent Date: Wed, 19 Jul 2023 09:45:17 -0400 Subject: [PATCH] LoomServer parallel listener start (#7200) * Addresses issues #7051, #6434, #6498 --- .../helidon/inject/api/InjectionServices.java | 4 +- .../runtime/DefaultInjectionServices.java | 87 ++++++++++---- .../inject/runtime/DefaultServices.java | 6 +- .../inject/tests/inject/tbox/ToolBoxTest.java | 13 +-- .../inject/runtime/LockContentionTest.java | 108 ++++++++++++++++++ .../io/helidon/nima/webserver/LoomServer.java | 20 ++-- 6 files changed, 188 insertions(+), 50 deletions(-) create mode 100644 inject/tests/runtime/src/test/java/io/helidon/inject/runtime/LockContentionTest.java diff --git a/inject/api/src/main/java/io/helidon/inject/api/InjectionServices.java b/inject/api/src/main/java/io/helidon/inject/api/InjectionServices.java index 9678bb041f0..959f2a164d2 100644 --- a/inject/api/src/main/java/io/helidon/inject/api/InjectionServices.java +++ b/inject/api/src/main/java/io/helidon/inject/api/InjectionServices.java @@ -184,7 +184,7 @@ default Services services() { *

* If the service provider does not support shutdown an empty is returned. *

- * The default reference implementation for Injection will return a map of all service types that were deactivated to any + * The default reference implementation will return a map of all service types that were deactivated to any * throwable that was observed during that services shutdown sequence. *

* The order in which services are deactivated is dependent upon whether the {@link #activationLog()} is available. @@ -194,6 +194,8 @@ default Services services() { * the same {@link RunLevel} value then the ordering will be based upon the implementation's comparator. *

* When shutdown returns, it is guaranteed that all services were shutdown, or failed to achieve shutdown. + *

+ * The shutdown timeout from {@link InjectionServicesConfigBlueprint#shutdownTimeout()} will be applied as the default. * * @return a map of all managed service types deactivated to results of deactivation, or empty if shutdown is not supported */ diff --git a/inject/runtime/src/main/java/io/helidon/inject/runtime/DefaultInjectionServices.java b/inject/runtime/src/main/java/io/helidon/inject/runtime/DefaultInjectionServices.java index 0fe7975d4ef..c584feb6531 100644 --- a/inject/runtime/src/main/java/io/helidon/inject/runtime/DefaultInjectionServices.java +++ b/inject/runtime/src/main/java/io/helidon/inject/runtime/DefaultInjectionServices.java @@ -34,6 +34,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.stream.Collectors; import io.helidon.common.config.Config; @@ -69,6 +71,7 @@ class DefaultInjectionServices implements InjectionServices, Resettable { static final System.Logger LOGGER = System.getLogger(DefaultInjectionServices.class.getName()); + private final ReentrantReadWriteLock lifecycleLock = new ReentrantReadWriteLock(); private final AtomicBoolean initializingServicesStarted = new AtomicBoolean(false); private final AtomicBoolean initializingServicesFinished = new AtomicBoolean(false); private final AtomicBoolean isBinding = new AtomicBoolean(false); @@ -149,32 +152,33 @@ public Optional> lookups() { @Override public Optional services(boolean initialize) { - if (!initialize) { - return Optional.ofNullable(services.get()); - } + boolean isWriteLock = initialize; + Lock lock = (isWriteLock) ? lifecycleLock.writeLock() : lifecycleLock.readLock(); + lock.lock(); + try { + if (!initialize) { + return Optional.ofNullable(services.get()); + } - if (!initializingServicesStarted.getAndSet(true)) { - try { - initializeServices(); - } catch (Throwable t) { - state.lastError(t); - initializingServicesStarted.set(false); - if (t instanceof InjectionException) { - throw (InjectionException) t; - } else { - throw new InjectionException("Failed to initialize: " + t.getMessage(), t); + init(); + + DefaultServices thisServices = services.get(); + if (thisServices == null) { + lock.unlock(); + lock = lifecycleLock.writeLock(); + lock.lock(); + reset(true); + init(); + thisServices = services.get(); + + if (thisServices == null) { + throw new InjectionException("Unable to reset() after shutdown()"); } - } finally { - state.finished(true); - initializingServicesFinished.set(true); } + return Optional.of(thisServices); + } finally { + lock.unlock(); } - - DefaultServices thisServices = services.get(); - if (thisServices == null) { - throw new InjectionException("Must reset() after shutdown()"); - } - return Optional.of(thisServices); } @Override @@ -183,9 +187,9 @@ public Optional> shutdown() { DefaultServices current = services.get(); if (services.compareAndSet(current, null) && current != null) { State currentState = state.clone().currentPhase(Phase.PRE_DESTROYING); - log("started shutdown"); + log("Started shutdown"); result = doShutdown(current, currentState); - log("finished shutdown"); + log("Finished shutdown"); } return Optional.ofNullable(result); } @@ -193,6 +197,8 @@ public Optional> shutdown() { @Override // note that this is typically only called during testing, and also in the injection maven-plugin public boolean reset(boolean deep) { + Lock lock = lifecycleLock.writeLock(); + lock.lock(); try { assertNotInitializing(); if (isInitializing() || isInitialized()) { @@ -228,6 +234,8 @@ public boolean reset(boolean deep) { throw new InjectionException("Failed to reset (state=" + state + ", isInitialized=" + isInitialized() + ", isInitializing=" + isInitializing() + ")", e); + } finally { + lock.unlock(); } } @@ -291,6 +299,25 @@ private void assertNotInitializing() { } } + private void init() { + if (!initializingServicesStarted.getAndSet(true)) { + try { + initializeServices(); + } catch (Throwable t) { + state.lastError(t); + initializingServicesStarted.set(false); + if (t instanceof InjectionException) { + throw (InjectionException) t; + } else { + throw new InjectionException("Failed to initialize: " + t.getMessage(), t); + } + } finally { + state.finished(true); + initializingServicesFinished.set(true); + } + } + } + private void initializeServices() { initializationCallingContext = CallingContextFactory.create(false).orElse(null); @@ -460,6 +487,16 @@ private class Shutdown implements Callable> { @Override public Map call() { + Lock lock = lifecycleLock.writeLock(); + lock.lock(); + try { + return doShutdown(); + } finally { + lock.unlock(); + } + } + + private Map doShutdown() { state.currentPhase(Phase.DESTROYED); ActivationLogQuery query = log.toQuery().orElse(null); @@ -497,7 +534,7 @@ public Map call() { doFinalShutdown(serviceProviders); // finally, clear everything - reset(false); + reset(true); return map; } diff --git a/inject/runtime/src/main/java/io/helidon/inject/runtime/DefaultServices.java b/inject/runtime/src/main/java/io/helidon/inject/runtime/DefaultServices.java index 6266a88fb72..944f15b1ac5 100644 --- a/inject/runtime/src/main/java/io/helidon/inject/runtime/DefaultServices.java +++ b/inject/runtime/src/main/java/io/helidon/inject/runtime/DefaultServices.java @@ -437,15 +437,15 @@ void bind(InjectionServices injectionServices, DefaultInjectionPlanBinder binder, Application app) { String appName = app.named().orElse(app.getClass().getName()); - boolean isLoggable = DefaultInjectionServices.LOGGER.isLoggable(System.Logger.Level.INFO); + boolean isLoggable = DefaultInjectionServices.LOGGER.isLoggable(System.Logger.Level.DEBUG); if (isLoggable) { - DefaultInjectionServices.LOGGER.log(System.Logger.Level.INFO, "starting binding application: " + appName); + DefaultInjectionServices.LOGGER.log(System.Logger.Level.DEBUG, "Starting binding application: " + appName); } try { app.configure(binder); bind(createServiceProvider(app, injectionServices)); if (isLoggable) { - DefaultInjectionServices.LOGGER.log(System.Logger.Level.INFO, "finished binding application: " + appName); + DefaultInjectionServices.LOGGER.log(System.Logger.Level.DEBUG, "Finished binding application: " + appName); } } catch (Exception e) { throw new InjectionException("Failed to process: " + app, e); diff --git a/inject/tests/resources-inject/src/test/java/io/helidon/inject/tests/inject/tbox/ToolBoxTest.java b/inject/tests/resources-inject/src/test/java/io/helidon/inject/tests/inject/tbox/ToolBoxTest.java index 200ebe32eea..a24b8a7b73b 100644 --- a/inject/tests/resources-inject/src/test/java/io/helidon/inject/tests/inject/tbox/ToolBoxTest.java +++ b/inject/tests/resources-inject/src/test/java/io/helidon/inject/tests/inject/tbox/ToolBoxTest.java @@ -25,7 +25,6 @@ import io.helidon.common.types.TypeName; import io.helidon.config.Config; import io.helidon.inject.api.ActivationResult; -import io.helidon.inject.api.InjectionException; import io.helidon.inject.api.InjectionServices; import io.helidon.inject.api.ModuleComponent; import io.helidon.inject.api.Qualifier; @@ -34,14 +33,14 @@ import io.helidon.inject.api.ServiceProvider; import io.helidon.inject.api.Services; import io.helidon.inject.testing.InjectionTestingSupport; -import io.helidon.inject.tests.inject.stacking.CommonContract; -import io.helidon.inject.tests.inject.tbox.impl.BigHammer; -import io.helidon.inject.tests.inject.tbox.impl.HandSaw; import io.helidon.inject.tests.inject.ASerialProviderImpl; import io.helidon.inject.tests.inject.ClassNamedY; import io.helidon.inject.tests.inject.TestingSingleton; import io.helidon.inject.tests.inject.provider.FakeConfig; import io.helidon.inject.tests.inject.provider.FakeServer; +import io.helidon.inject.tests.inject.stacking.CommonContract; +import io.helidon.inject.tests.inject.tbox.impl.BigHammer; +import io.helidon.inject.tests.inject.tbox.impl.HandSaw; import io.helidon.inject.tests.inject.tbox.impl.MainToolBox; import io.helidon.inject.tools.Options; @@ -50,7 +49,7 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import static io.helidon.common.types.TypeName.*; +import static io.helidon.common.types.TypeName.create; import static io.helidon.inject.testing.InjectionTestingSupport.resetAll; import static io.helidon.inject.testing.InjectionTestingSupport.testableServices; import static io.helidon.inject.tests.inject.TestUtils.loadStringFromFile; @@ -64,7 +63,6 @@ import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.hasEntry; import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertThrows; /** * Expectation here is that the annotation processor ran, and we can use standard injection and di registry services, etc. @@ -314,9 +312,6 @@ void startupAndShutdownCallsPostConstructAndPreDestroy() { assertThat(injectionServices.metrics().orElseThrow().lookupCount().orElse(0), equalTo(0)); - InjectionException e = assertThrows(InjectionException.class, () -> injectionServices.services()); - assertThat(e.getMessage(), equalTo("Must reset() after shutdown()")); - tearDown(); setUp(); TestingSingleton testingSingletonFromLookup2 = injectionServices.services().lookup(TestingSingleton.class).get(); diff --git a/inject/tests/runtime/src/test/java/io/helidon/inject/runtime/LockContentionTest.java b/inject/tests/runtime/src/test/java/io/helidon/inject/runtime/LockContentionTest.java new file mode 100644 index 00000000000..c824ed8a8cd --- /dev/null +++ b/inject/tests/runtime/src/test/java/io/helidon/inject/runtime/LockContentionTest.java @@ -0,0 +1,108 @@ +/* + * Copyright (c) 2023 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.helidon.inject.runtime; + +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import io.helidon.common.types.TypeName; +import io.helidon.config.Config; +import io.helidon.config.ConfigSources; +import io.helidon.inject.api.ActivationResult; +import io.helidon.inject.api.Bootstrap; +import io.helidon.inject.api.InjectionServices; +import io.helidon.inject.api.Services; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.fail; + +class LockContentionTest { + final int COUNT = 100; + + final ExecutorService es = Executors.newFixedThreadPool(16); + final Config config = Config.builder( + ConfigSources.create( + Map.of("inject.permits-dynamic", "true"), "config-1")) + .disableEnvironmentVariablesSource() + .disableSystemPropertiesSource() + .build(); + + @BeforeEach + void init() { + InjectionServices.globalBootstrap(Bootstrap.builder().config(config).build()); + } + + @AfterEach + void tearDown() { + SimpleInjectionTestingSupport.resetAll(); + } + + @Test + // we cannot interlace shutdown with startups here - so instead we are checking to ensure N threads can call startup + // and then N threads can call shutdown in parallel, but in phases + void lockContention() { + Map> result = new ConcurrentHashMap<>(); + for (int i = 1; i <= COUNT; i++) { + result.put("start run:" + i, es.submit(this::start)); + } + + verify(result); + result.clear(); + + for (int i = 1; i <= COUNT; i++) { + result.put("shutdown run:" + i, es.submit(this::shutdown)); + } + + verify(result); + } + + void verify(Map> result) { + result.forEach((k, v) -> { + try { + assertThat(k, v.get(), notNullValue()); + } catch (Exception e) { + fail("failed on " + k, e); + } + }); + } + + Services start() { + return Objects.requireNonNull(InjectionServices.realizedServices()); + } + + Map shutdown() { + InjectionServices injectionServices = InjectionServices.injectionServices().orElseThrow(); + Map result = new LinkedHashMap<>(); + Map round; + do { + round = injectionServices.shutdown().orElseThrow(); + result.putAll(round); + } while (!round.isEmpty()); + return result; + } + +} diff --git a/nima/webserver/webserver/src/main/java/io/helidon/nima/webserver/LoomServer.java b/nima/webserver/webserver/src/main/java/io/helidon/nima/webserver/LoomServer.java index e2e632c056c..0c32c64a5ed 100644 --- a/nima/webserver/webserver/src/main/java/io/helidon/nima/webserver/LoomServer.java +++ b/nima/webserver/webserver/src/main/java/io/helidon/nima/webserver/LoomServer.java @@ -276,19 +276,15 @@ private void stopIt() { private void startIt() { long now = System.currentTimeMillis(); - // TODO parallel start breaks pico - issue - // boolean result = parallel("start", ServerListener::start); - for (ServerListener listener : listeners.values()) { - listener.start(); + boolean result = parallel("start", ServerListener::start); + if (!result) { + LOGGER.log(System.Logger.Level.ERROR, "Níma server failed to start, shutting down"); + parallel("stop", ServerListener::stop); + if (startFutures != null) { + startFutures.forEach(future -> future.future().cancel(true)); + } + return; } - // if (!result) { - // LOGGER.log(System.Logger.Level.ERROR, "Níma server failed to start, shutting down"); - // parallel("stop", ServerListener::stop); - // if (startFutures != null) { - // startFutures.forEach(future -> future.future().cancel(true)); - // } - // return; - // } if (registerShutdownHook) { registerShutdownHook(); }