Skip to content

Commit

Permalink
[KOGITO-9614] Async api and reload
Browse files Browse the repository at this point in the history
  • Loading branch information
fjtirado committed Aug 8, 2023
1 parent 8699087 commit 7dc865c
Show file tree
Hide file tree
Showing 11 changed files with 111 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@

public abstract class ValidationDecorator {

protected final Map<String, Exception> exceptions;
protected final Map<String, Throwable> exceptions;

protected ValidationDecorator(Map<String, Exception> exceptions) {
protected ValidationDecorator(Map<String, Throwable> exceptions) {
this.exceptions = exceptions;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public class ValidationLogDecorator extends ValidationDecorator {

private static final Logger LOGGER = LoggerFactory.getLogger(ValidationLogDecorator.class);

public ValidationLogDecorator(Map<String, Exception> exceptions) {
public ValidationLogDecorator(Map<String, Throwable> exceptions) {
super(exceptions);
}

Expand Down
2 changes: 1 addition & 1 deletion kogito-build/kogito-dependencies-bom/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
<version.net.thisptr.jackson-jq>1.0.0-preview.20220705</version.net.thisptr.jackson-jq>
<version.io.quarkiverse.jackson-jq>1.1.0</version.io.quarkiverse.jackson-jq>
<version.io.quarkiverse.openapi.generator>1.3.8</version.io.quarkiverse.openapi.generator>
<version.io.quarkiverse.asyncapi>0.0.3</version.io.quarkiverse.asyncapi>
<version.io.quarkiverse.asyncapi>0.0.6</version.io.quarkiverse.asyncapi>
<version.io.quarkiverse.reactivemessaging.http>1.1.5</version.io.quarkiverse.reactivemessaging.http>
<version.io.quarkiverse.embedded.postgresql>0.0.8</version.io.quarkiverse.embedded.postgresql>
<version.com.github.haifengl.smile>1.5.2</version.com.github.haifengl.smile>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ public class ProcessCodegen extends AbstractGenerator {

public static ProcessCodegen ofCollectedResources(KogitoBuildContext context, Collection<CollectedResource> resources) {
Map<String, byte[]> processSVGMap = new HashMap<>();
Map<String, Exception> processesErrors = new HashMap<>();
Map<String, Throwable> processesErrors = new HashMap<>();
boolean useSvgAddon = context.getAddonsConfig().useProcessSVG();
final List<GeneratedInfo<KogitoWorkflowProcess>> processes = resources.stream()
.map(CollectedResource::resource)
Expand All @@ -123,13 +123,15 @@ public static ProcessCodegen ofCollectedResources(KogitoBuildContext context, Co
GeneratedInfo<KogitoWorkflowProcess> generatedInfo = parseWorkflowFile(resource, WorkflowFormat.fromFileName(resource.getSourcePath()), context);
notifySourceFileCodegenBindListeners(context, resource, Collections.singletonList(generatedInfo.info()));
return Stream.of(addResource(generatedInfo, resource));
} else {
return Stream.empty();
}
} catch (ValidationException | ProcessParsingException e) {
} catch (ValidationException e) {
processesErrors.put(resource.getSourcePath(), e);
return Stream.empty();

} catch (ProcessParsingException e) {
processesErrors.put(resource.getSourcePath(), e.getCause());

}
return Stream.empty();
})
//Validate parsed processes
.map(processInfo -> validate(processInfo, processesErrors))
Expand All @@ -154,7 +156,7 @@ private static void notifySourceFileCodegenBindListeners(KogitoBuildContext cont
.ifPresent(notifier -> processes.forEach(p -> notifier.notify(new SourceFileCodegenBindEvent(p.getId(), resource.getSourcePath()))));
}

private static void handleValidation(KogitoBuildContext context, Map<String, Exception> processesErrors) {
private static void handleValidation(KogitoBuildContext context, Map<String, Throwable> processesErrors) {
if (!processesErrors.isEmpty()) {
ValidationLogDecorator decorator = new ValidationLogDecorator(processesErrors);
decorator.decorate();
Expand All @@ -165,7 +167,7 @@ private static void handleValidation(KogitoBuildContext context, Map<String, Exc
}
}

private static GeneratedInfo<KogitoWorkflowProcess> validate(GeneratedInfo<KogitoWorkflowProcess> processInfo, Map<String, Exception> processesErrors) {
private static GeneratedInfo<KogitoWorkflowProcess> validate(GeneratedInfo<KogitoWorkflowProcess> processInfo, Map<String, Throwable> processesErrors) {
Process process = processInfo.info();
try {
ProcessValidatorRegistry.getInstance().getValidator(process, process.getResource()).validate(process);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,17 +50,16 @@ private ChannelMappingStrategy() {
private static final String OVERFLOW_STRATEGY_PROP = "overflow-strategy";
private static final String BUFFER_SIZE_PROP = "buffer-size";

private static Config config = ConfigProvider.getConfig();

public static Collection<ChannelInfo> getChannelMapping() {
Config config = ConfigProvider.getConfig();
Map<String, Collection<String>> inTriggers = new HashMap<>();
Map<String, Collection<String>> outTriggers = new HashMap<>();

for (String property : config.getPropertyNames()) {
if (property.startsWith(INCOMING_TRIGGER)) {
addTrigger(INCOMING_TRIGGER, property, inTriggers);
addTrigger(config, INCOMING_TRIGGER, property, inTriggers);
} else if (property.startsWith(OUTGOING_TRIGGER)) {
addTrigger(OUTGOING_TRIGGER, property, outTriggers);
addTrigger(config, OUTGOING_TRIGGER, property, outTriggers);
}
}

Expand All @@ -69,15 +68,15 @@ public static Collection<ChannelInfo> getChannelMapping() {
final String defaultOutgoingChannel = config.getOptionalValue(OUTGOING_DEFAULT_CHANNEL, String.class).orElse(KogitoEventStreams.OUTGOING);
for (String property : config.getPropertyNames()) {
if (property.startsWith(INCOMING_PREFIX) && property.endsWith(".connector")) {
result.add(getChannelInfo(property, INCOMING_PREFIX, true, defaultIncomingChannel, inTriggers));
result.add(getChannelInfo(config, property, INCOMING_PREFIX, true, defaultIncomingChannel, inTriggers));
} else if (property.startsWith(OUTGOING_PREFIX) && property.endsWith(".connector")) {
result.add(getChannelInfo(property, OUTGOING_PREFIX, false, defaultOutgoingChannel, outTriggers));
result.add(getChannelInfo(config, property, OUTGOING_PREFIX, false, defaultOutgoingChannel, outTriggers));
}
}
return result;
}

private static void addTrigger(String prefix, String property, Map<String, Collection<String>> triggers) {
private static void addTrigger(Config config, String prefix, String property, Map<String, Collection<String>> triggers) {
String channelName = config.getValue(property, String.class);
String triggerName = property.substring(prefix.length());
triggers.computeIfAbsent(channelName, ChannelMappingStrategy::initTriggers).add(triggerName);
Expand All @@ -89,15 +88,15 @@ private static Collection<String> initTriggers(String channelName) {
return result;
}

private static ChannelInfo getChannelInfo(String property, String prefix, boolean isInput, String defaultChannelName, Map<String, Collection<String>> triggers) {
private static ChannelInfo getChannelInfo(Config config, String property, String prefix, boolean isInput, String defaultChannelName, Map<String, Collection<String>> triggers) {
String name = property.substring(prefix.length(), property.lastIndexOf('.'));
return new ChannelInfo(name, triggers.getOrDefault(name, Collections.singleton(name)),
getClassName(config.getOptionalValue(getPropertyName(prefix, name, "value." + (isInput ? "deserializer" : "serializer")), String.class)), isInput,
name.equals(defaultChannelName), config.getOptionalValue((isInput ? UNMARSHALLLER_PREFIX : MARSHALLER_PREFIX) + name, String.class),
isInput ? Optional.empty() : onOverflowInfo(name));
isInput ? Optional.empty() : onOverflowInfo(config, name));
}

private static Optional<OnOverflowInfo> onOverflowInfo(String name) {
private static Optional<OnOverflowInfo> onOverflowInfo(Config config, String name) {
final String namePrefix = KOGITO_EMITTER_PREFIX + name + ".";
Optional<Strategy> strategy = config.getOptionalValue(namePrefix + OVERFLOW_STRATEGY_PROP, String.class).map(Strategy::valueOf);
Optional<Long> bufferSize = config.getOptionalValue(namePrefix + BUFFER_SIZE_PROP, Long.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,31 @@
*/
package org.kie.kogito.quarkus.common.deployment;

import java.util.Optional;

import org.jboss.jandex.IndexView;

import io.quarkus.builder.item.SimpleBuildItem;

public final class LiveReloadExecutionBuildItem extends SimpleBuildItem {

private final IndexView indexView;
private final ClassLoader classLoader;

public LiveReloadExecutionBuildItem(IndexView indexView) {
this(indexView, null);
}

public LiveReloadExecutionBuildItem(IndexView indexView, ClassLoader classLoader) {
this.indexView = indexView;
this.classLoader = classLoader;
}

public IndexView getIndexView() {
return indexView;
}

public Optional<ClassLoader> getClassLoader() {
return Optional.ofNullable(classLoader);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,23 @@
*/
package org.kie.kogito.quarkus.serverless.workflow.asyncapi;

import java.util.List;
import java.util.stream.Collectors;
import java.util.ServiceLoader;

import org.kie.kogito.quarkus.common.deployment.KogitoBuildContextAttributeBuildItem;
import org.kie.kogito.quarkus.common.deployment.KogitoAddonsPreGeneratedSourcesBuildItem;
import org.kie.kogito.quarkus.common.deployment.KogitoBuildContextBuildItem;
import org.kie.kogito.quarkus.common.deployment.LiveReloadExecutionBuildItem;
import org.kie.kogito.serverless.workflow.parser.ParserContext;

import io.quarkiverse.asyncapi.config.AsyncAPISupplier;
import io.quarkiverse.asyncapi.config.MapAsyncAPIRegistry;
import io.quarkiverse.asyncapi.generator.AsyncAPIBuildItem;
import io.quarkus.deployment.annotations.BuildProducer;
import io.quarkus.deployment.annotations.BuildStep;

public class AsyncAPIProcessor {

@BuildStep
KogitoBuildContextAttributeBuildItem asyncAPIContext(List<AsyncAPIBuildItem> asyncAPIBuildItem) {
return new KogitoBuildContextAttributeBuildItem(ParserContext.ASYNC_CONVERTER_KEY, new AsyncAPIInfoConverter(
new MapAsyncAPIRegistry(asyncAPIBuildItem.stream().map(AsyncAPIBuildItem::getAsyncAPI).collect(Collectors.toList()))));
void asyncAPIContext(LiveReloadExecutionBuildItem reload, KogitoBuildContextBuildItem context, BuildProducer<KogitoAddonsPreGeneratedSourcesBuildItem> sources) throws ClassNotFoundException {
context.getKogitoBuildContext().addContextAttribute(ParserContext.ASYNC_CONVERTER_KEY, new AsyncAPIInfoConverter(
new MapAsyncAPIRegistry(ServiceLoader.load(AsyncAPISupplier.class, reload.getClassLoader().orElse(Thread.currentThread().getContextClassLoader())))));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.ServiceLoader;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import javax.inject.Inject;
Expand All @@ -40,19 +41,28 @@
import org.kie.kogito.quarkus.common.deployment.KogitoBuildContextBuildItem;
import org.kie.kogito.quarkus.common.deployment.KogitoQuarkusResourceUtils;
import org.kie.kogito.quarkus.common.deployment.LiveReloadExecutionBuildItem;
import org.kie.kogito.quarkus.serverless.workflow.config.LiveReloadConfigBuilder;

import io.quarkus.arc.deployment.GeneratedBeanBuildItem;
import io.quarkus.bootstrap.classloading.MemoryClassPathElement;
import io.quarkus.bootstrap.classloading.PathTreeClassPathElement;
import io.quarkus.bootstrap.classloading.QuarkusClassLoader;
import io.quarkus.bootstrap.model.ApplicationModel;
import io.quarkus.bootstrap.prebuild.CodeGenException;
import io.quarkus.deployment.CodeGenContext;
import io.quarkus.deployment.IsDevelopment;
import io.quarkus.deployment.annotations.BuildProducer;
import io.quarkus.deployment.annotations.BuildStep;
import io.quarkus.deployment.builditem.CombinedIndexBuildItem;
import io.quarkus.deployment.builditem.GeneratedResourceBuildItem;
import io.quarkus.deployment.builditem.LiveReloadBuildItem;
import io.quarkus.deployment.builditem.RunTimeConfigBuilderBuildItem;
import io.quarkus.deployment.index.IndexingUtil;
import io.quarkus.deployment.pkg.builditem.CurateOutcomeBuildItem;
import io.quarkus.deployment.pkg.builditem.OutputTargetBuildItem;
import io.quarkus.paths.PathTree;
import io.quarkus.runtime.configuration.ConfigUtils;
import io.quarkus.runtime.configuration.QuarkusConfigFactory;

/**
* This class adds live reload support for {@link io.quarkus.deployment.CodeGenProvider} objects.
Expand All @@ -71,6 +81,8 @@ public class LiveReloadProcessor {

private final KogitoBuildContext kogitoBuildContext;

private final QuarkusClassLoader.Builder classLoader;

@Inject
public LiveReloadProcessor(
CombinedIndexBuildItem combinedIndexBuildItem,
Expand All @@ -84,10 +96,12 @@ public LiveReloadProcessor(
computingIndex = combinedIndexBuildItem.getComputingIndex();
index = combinedIndexBuildItem.getIndex();
kogitoBuildContext = contextBuildItem.getKogitoBuildContext();
classLoader = QuarkusClassLoader.builder("liveReload", kogitoBuildContext.getClassLoader(), false);
}

@BuildStep(onlyIf = IsDevelopment.class)
public LiveReloadExecutionBuildItem liveReload(BuildProducer<KogitoAddonsPreGeneratedSourcesBuildItem> sourcesProducer) {
public LiveReloadExecutionBuildItem liveReload(BuildProducer<KogitoAddonsPreGeneratedSourcesBuildItem> sourcesProducer, BuildProducer<GeneratedResourceBuildItem> genResBI,
BuildProducer<RunTimeConfigBuilderBuildItem> configBuilder) {
Collection<GeneratedFile> generatedFiles = new ArrayList<>();
List<IndexView> indexViews = new ArrayList<>();
if (liveReloadBuildItem.isLiveReload()) {
Expand All @@ -103,10 +117,13 @@ public LiveReloadExecutionBuildItem liveReload(BuildProducer<KogitoAddonsPreGene
});
}
}
configBuilder.produce(new RunTimeConfigBuilderBuildItem(LiveReloadConfigBuilder.class.getCanonicalName()));
if (!generatedFiles.isEmpty()) {
sourcesProducer.produce(new KogitoAddonsPreGeneratedSourcesBuildItem(generatedFiles));
skipNextLiveReload();
return new LiveReloadExecutionBuildItem(KogitoQuarkusResourceUtils.generateAggregatedIndexNew(computingIndex, indexViews));
ClassLoader reloadedClassLoader = classLoader.build();
QuarkusConfigFactory.setConfig(ConfigUtils.emptyConfigBuilder().addDefaultSources().addDiscoveredSources().forClassLoader(reloadedClassLoader).build());
return new LiveReloadExecutionBuildItem(KogitoQuarkusResourceUtils.generateAggregatedIndexNew(computingIndex, indexViews), reloadedClassLoader);
} else {
dontSkipNextLiveReload();
return new LiveReloadExecutionBuildItem(computingIndex);
Expand All @@ -116,7 +133,12 @@ public LiveReloadExecutionBuildItem liveReload(BuildProducer<KogitoAddonsPreGene
private CodeGenerationResult generateCode(LiveReloadableCodeGenProvider codeGenProvider) {
try {
Collection<GeneratedFile> generatedFiles = new ArrayList<>(generateSources(codeGenProvider));
return !generatedFiles.isEmpty() ? new CodeGenerationResult(generatedFiles, indexCompiledSources(compileGeneratedSources(generatedFiles)))
Collection<GeneratedBeanBuildItem> generatedBeans = compileGeneratedSources(generatedFiles);
if (!generatedBeans.isEmpty()) {
classLoader.addElement(new MemoryClassPathElement(
generatedBeans.stream().collect(Collectors.toMap(x -> x.getName().replace('.', '/').concat(".class"), GeneratedBeanBuildItem::getData)), true));
}
return !generatedFiles.isEmpty() ? new CodeGenerationResult(generatedFiles, indexCompiledSources(generatedBeans))
: new CodeGenerationResult(List.of(), computingIndex);
} catch (CodeGenException e) {
throw new IllegalStateException(e);
Expand Down Expand Up @@ -159,23 +181,28 @@ private Collection<GeneratedFile> generateSources(LiveReloadableCodeGenProvider
CodeGenContext codeGenContext = new CodeGenContext(applicationModel, outDir, workDir, inputDir, false, config, false);
if (codeGenProvider.shouldRun(inputDir, config) && codeGenProvider.trigger(codeGenContext)) {
try (Stream<Path> sources = Files.walk(outDir)) {
sources.filter(Files::isRegularFile)
.filter(path -> path.toString().endsWith(".java"))
.map(path -> {
try {
return new GeneratedFile(GeneratedFileType.SOURCE, outDir.relativize(path), Files.readAllBytes(path));
} catch (IOException e) {
throw new UncheckedIOException(e);
}
})
.forEach(generatedFiles::add);
sources.filter(Files::isRegularFile).forEach(p -> processSource(p, outDir, generatedFiles));
}
Path classPath = outDir.getParent().getParent().resolve("classes");
Path serviceLoaderPath = classPath.resolve("META-INF/services");
if (Files.isDirectory(classPath) && Files.isDirectory(serviceLoaderPath)) {
classLoader.addElement(new PathTreeClassPathElement(PathTree.ofDirectoryOrFile(classPath), true));
}
}
}

return generatedFiles;
}

private void processSource(Path path, Path outDir, Collection<GeneratedFile> generatedFiles) {
if (path.toString().endsWith(".java")) {
try {
generatedFiles.add(new GeneratedFile(GeneratedFileType.SOURCE, outDir.relativize(path), Files.readAllBytes(path)));
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
}

private void skipNextLiveReload() {
liveReloadBuildItem.setContextObject(SkipLiveReload.class, SkipLiveReload.TRUE);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,27 +43,16 @@
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty-shaded</artifactId>
<version>${version.io.grpc}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-protobuf</artifactId>
<version>${version.io.grpc}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-stub</artifactId>
<version>${version.io.grpc}</version>
</dependency>
<dependency>
<!-- necessary for Java 9+ -->
<groupId>org.apache.tomcat</groupId>
<artifactId>annotations-api</artifactId>
<version>6.0.53</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-junit5</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@

import org.jboss.shrinkwrap.api.asset.StringAsset;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.kie.kogito.test.utils.SocketUtils;
Expand Down Expand Up @@ -163,7 +162,6 @@ void testGrpc() throws InterruptedException, IOException {
}

@Test
@Disabled("Disabled until https://issues.redhat.com/browse/KOGITO-9614 is resolved")
void testAsyncApi() throws IOException {
given()
.contentType(ContentType.JSON)
Expand Down
Loading

0 comments on commit 7dc865c

Please sign in to comment.