[KOGITO-9614] Fixing issue with async api and dev mode #3165

Merged 3 commits on Aug 8, 2023

Changes from all commits:
2 changes: 1 addition & 1 deletion kogito-build/kogito-dependencies-bom/pom.xml
@@ -30,7 +30,7 @@
<version.net.thisptr.jackson-jq>1.0.0-preview.20220705</version.net.thisptr.jackson-jq>
<version.io.quarkiverse.jackson-jq>1.1.0</version.io.quarkiverse.jackson-jq>
<version.io.quarkiverse.openapi.generator>1.3.8</version.io.quarkiverse.openapi.generator>
<version.io.quarkiverse.asyncapi>0.0.5</version.io.quarkiverse.asyncapi>
<version.io.quarkiverse.asyncapi>0.0.6</version.io.quarkiverse.asyncapi>
<version.io.quarkiverse.reactivemessaging.http>1.1.5</version.io.quarkiverse.reactivemessaging.http>
<version.io.quarkiverse.embedded.postgresql>0.0.8</version.io.quarkiverse.embedded.postgresql>
<version.com.github.haifengl.smile>1.5.2</version.com.github.haifengl.smile>

@@ -215,7 +215,7 @@ private static ProcessCodegen ofProcesses(KogitoBuildContext context, List<Gener
protected static GeneratedInfo<KogitoWorkflowProcess> parseWorkflowFile(Resource r, WorkflowFormat format, KogitoBuildContext context) {
try (Reader reader = r.getReader()) {
return ServerlessWorkflowParser.of(reader, format, context).getProcessInfo();
} catch (IOException | RuntimeException e) {
} catch (Exception e) {
throw new ProcessParsingException(e);
}
}

@@ -16,6 +16,7 @@
package org.kie.kogito.codegen.process;

public class ProcessParsingException extends RuntimeException {

private static final long serialVersionUID = 1L;

public ProcessParsingException(Throwable cause) {

@@ -25,7 +25,7 @@
/**
* Decorators for Http CloudEvents outgoing messages
*/
public final class CloudEventHttpOutgoingDecorator implements MessageDecorator {
public class CloudEventHttpOutgoingDecorator implements MessageDecorator {
Comment from the PR author:

@radtriste This is intended to fix the inheritance issue on LTS.

// Note: this constant is also declared in cloudevents-json-jackson.
// However, to avoid importing a library for only one constant that won't likely to change, we opt to have it declared here.
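
To illustrate the comment above: removing the final modifier is what makes subclassing the decorator possible. A minimal sketch using simplified stand-in types (these are not the real Kogito MessageDecorator types):

// Illustration only: simplified stand-in types, not the actual Kogito API.
// A subclass such as CustomOutgoingDecorator only compiles because the base class is no longer final.
class BaseOutgoingDecorator { // stands in for CloudEventHttpOutgoingDecorator
    public String decorate(String payload) {
        return "cloudevent:" + payload;
    }
}

class CustomOutgoingDecorator extends BaseOutgoingDecorator { // rejected by the compiler if the base class were final
    @Override
    public String decorate(String payload) {
        return super.decorate(payload) + " (customized)";
    }
}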

@@ -50,17 +50,16 @@ private ChannelMappingStrategy() {
private static final String OVERFLOW_STRATEGY_PROP = "overflow-strategy";
private static final String BUFFER_SIZE_PROP = "buffer-size";

private static Config config = ConfigProvider.getConfig();

public static Collection<ChannelInfo> getChannelMapping() {
Config config = ConfigProvider.getConfig();
Map<String, Collection<String>> inTriggers = new HashMap<>();
Map<String, Collection<String>> outTriggers = new HashMap<>();

for (String property : config.getPropertyNames()) {
if (property.startsWith(INCOMING_TRIGGER)) {
addTrigger(INCOMING_TRIGGER, property, inTriggers);
addTrigger(config, INCOMING_TRIGGER, property, inTriggers);
} else if (property.startsWith(OUTGOING_TRIGGER)) {
addTrigger(OUTGOING_TRIGGER, property, outTriggers);
addTrigger(config, OUTGOING_TRIGGER, property, outTriggers);
}
}

@@ -69,15 +68,15 @@ public static Collection<ChannelInfo> getChannelMapping() {
final String defaultOutgoingChannel = config.getOptionalValue(OUTGOING_DEFAULT_CHANNEL, String.class).orElse(KogitoEventStreams.OUTGOING);
for (String property : config.getPropertyNames()) {
if (property.startsWith(INCOMING_PREFIX) && property.endsWith(".connector")) {
result.add(getChannelInfo(property, INCOMING_PREFIX, true, defaultIncomingChannel, inTriggers));
result.add(getChannelInfo(config, property, INCOMING_PREFIX, true, defaultIncomingChannel, inTriggers));
} else if (property.startsWith(OUTGOING_PREFIX) && property.endsWith(".connector")) {
result.add(getChannelInfo(property, OUTGOING_PREFIX, false, defaultOutgoingChannel, outTriggers));
result.add(getChannelInfo(config, property, OUTGOING_PREFIX, false, defaultOutgoingChannel, outTriggers));
}
}
return result;
}

private static void addTrigger(String prefix, String property, Map<String, Collection<String>> triggers) {
private static void addTrigger(Config config, String prefix, String property, Map<String, Collection<String>> triggers) {
String channelName = config.getValue(property, String.class);
String triggerName = property.substring(prefix.length());
triggers.computeIfAbsent(channelName, ChannelMappingStrategy::initTriggers).add(triggerName);
@@ -89,15 +88,15 @@ private static Collection<String> initTriggers(String channelName) {
return result;
}

private static ChannelInfo getChannelInfo(String property, String prefix, boolean isInput, String defaultChannelName, Map<String, Collection<String>> triggers) {
private static ChannelInfo getChannelInfo(Config config, String property, String prefix, boolean isInput, String defaultChannelName, Map<String, Collection<String>> triggers) {
String name = property.substring(prefix.length(), property.lastIndexOf('.'));
return new ChannelInfo(name, triggers.getOrDefault(name, Collections.singleton(name)),
getClassName(config.getOptionalValue(getPropertyName(prefix, name, "value." + (isInput ? "deserializer" : "serializer")), String.class)), isInput,
name.equals(defaultChannelName), config.getOptionalValue((isInput ? UNMARSHALLLER_PREFIX : MARSHALLER_PREFIX) + name, String.class),
isInput ? Optional.empty() : onOverflowInfo(name));
isInput ? Optional.empty() : onOverflowInfo(config, name));
}

private static Optional<OnOverflowInfo> onOverflowInfo(String name) {
private static Optional<OnOverflowInfo> onOverflowInfo(Config config, String name) {
final String namePrefix = KOGITO_EMITTER_PREFIX + name + ".";
Optional<Strategy> strategy = config.getOptionalValue(namePrefix + OVERFLOW_STRATEGY_PROP, String.class).map(Strategy::valueOf);
Optional<Long> bufferSize = config.getOptionalValue(namePrefix + BUFFER_SIZE_PROP, Long.class);
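
For context, the refactor above stops caching the MicroProfile Config instance in a static field and resolves it on each call instead, so a dev-mode reload sees freshly loaded properties. A minimal sketch of the per-call lookup pattern (the class and property names here are illustrative, not the actual Kogito ones):

import java.util.Optional;

import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.config.ConfigProvider;

final class PerCallConfigLookup {

    private PerCallConfigLookup() {
    }

    // Resolve the Config on every invocation rather than caching it in a static field,
    // so values that changed between dev-mode reloads are picked up.
    static Optional<String> defaultChannel() {
        Config config = ConfigProvider.getConfig();
        return config.getOptionalValue("example.messaging.default-channel", String.class);
    }
}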

@@ -15,19 +15,31 @@
*/
package org.kie.kogito.quarkus.common.deployment;

import java.util.Optional;

import org.jboss.jandex.IndexView;

import io.quarkus.builder.item.SimpleBuildItem;

public final class LiveReloadExecutionBuildItem extends SimpleBuildItem {

private final IndexView indexView;
private final ClassLoader classLoader;

public LiveReloadExecutionBuildItem(IndexView indexView) {
this(indexView, null);
}

public LiveReloadExecutionBuildItem(IndexView indexView, ClassLoader classLoader) {
this.indexView = indexView;
this.classLoader = classLoader;
}

public IndexView getIndexView() {
return indexView;
}

public Optional<ClassLoader> getClassLoader() {
return Optional.ofNullable(classLoader);
}
}

@@ -40,6 +40,7 @@

import com.github.javaparser.ast.CompilationUnit;

import io.quarkus.deployment.CodeGenContext;
import io.serverlessworkflow.api.Workflow;
import io.serverlessworkflow.api.functions.FunctionDefinition;

@@ -51,8 +52,12 @@ public class WorkflowCodeGenUtils {
private WorkflowCodeGenUtils() {
}

public static Stream<WorkflowOperationResource> operationResources(Stream<Path> files, Predicate<FunctionDefinition> predicate, Optional<String> operationIdProp) {
return getWorkflows(files).flatMap(w -> processFunction(w, predicate, WorkflowOperationIdFactoryProvider.getFactory(operationIdProp)));
private static WorkflowOperationIdFactory operationIdFactory(CodeGenContext context) {
return WorkflowOperationIdFactoryProvider.getFactory(context.config().getOptionalValue(WorkflowOperationIdFactoryProvider.PROPERTY_NAME, String.class));
}

public static Stream<WorkflowOperationResource> operationResources(Stream<Path> files, Predicate<FunctionDefinition> predicate, CodeGenContext context) {
return getWorkflows(files).flatMap(w -> processFunction(w, predicate, operationIdFactory(context)));
}

public static Stream<Workflow> getWorkflows(Stream<Path> files) {

@@ -15,21 +15,23 @@
*/
package org.kie.kogito.quarkus.serverless.workflow.asyncapi;

import java.util.List;
import java.util.stream.Collectors;
import java.util.ServiceLoader;

import org.kie.kogito.quarkus.common.deployment.KogitoBuildContextAttributeBuildItem;
import org.kie.kogito.quarkus.common.deployment.KogitoAddonsPreGeneratedSourcesBuildItem;
import org.kie.kogito.quarkus.common.deployment.KogitoBuildContextBuildItem;
import org.kie.kogito.quarkus.common.deployment.LiveReloadExecutionBuildItem;
import org.kie.kogito.serverless.workflow.parser.ParserContext;

import io.quarkiverse.asyncapi.config.AsyncAPISupplier;
import io.quarkiverse.asyncapi.config.MapAsyncAPIRegistry;
import io.quarkiverse.asyncapi.generator.AsyncAPIBuildItem;
import io.quarkus.deployment.annotations.BuildProducer;
import io.quarkus.deployment.annotations.BuildStep;

public class AsyncAPIProcessor {

@BuildStep
KogitoBuildContextAttributeBuildItem asyncAPIContext(List<AsyncAPIBuildItem> asyncAPIBuildItem) {
return new KogitoBuildContextAttributeBuildItem(ParserContext.ASYNC_CONVERTER_KEY, new AsyncAPIInfoConverter(
new MapAsyncAPIRegistry(asyncAPIBuildItem.stream().map(AsyncAPIBuildItem::getAsyncAPI).collect(Collectors.toList()))));
void asyncAPIContext(LiveReloadExecutionBuildItem reload, KogitoBuildContextBuildItem context, BuildProducer<KogitoAddonsPreGeneratedSourcesBuildItem> sources) {
context.getKogitoBuildContext().addContextAttribute(ParserContext.ASYNC_CONVERTER_KEY, new AsyncAPIInfoConverter(
new MapAsyncAPIRegistry(ServiceLoader.load(AsyncAPISupplier.class, reload.getClassLoader().orElse(Thread.currentThread().getContextClassLoader())))));
}
}
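
For context, the rewritten build step above looks up AsyncAPISupplier implementations through java.util.ServiceLoader against the class loader carried by LiveReloadExecutionBuildItem, falling back to the thread context class loader. A self-contained sketch of that lookup pattern, using a hypothetical service interface instead of the Quarkiverse type:

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.ServiceLoader;

public class ServiceLookup {

    // Hypothetical service interface; stands in for io.quarkiverse.asyncapi.config.AsyncAPISupplier.
    public interface SpecSupplier {
        String id();
    }

    // Load implementations from the class loader produced by the last live reload, if present,
    // otherwise from the current thread's context class loader.
    public static List<SpecSupplier> loadSuppliers(Optional<ClassLoader> liveReloadClassLoader) {
        ClassLoader loader = liveReloadClassLoader.orElse(Thread.currentThread().getContextClassLoader());
        List<SpecSupplier> result = new ArrayList<>();
        ServiceLoader.load(SpecSupplier.class, loader).forEach(result::add);
        return result;
    }
}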

@@ -19,13 +19,13 @@

import org.kie.kogito.quarkus.serverless.workflow.WorkflowOperationResource;

import io.quarkiverse.asyncapi.config.InputStreamSupplier;
import io.quarkiverse.asyncapi.generator.input.InputStreamSupplier;

class AsyncAPIInputStreamSupplier implements InputStreamSupplier {
class AsyncInputStreamSupplier implements InputStreamSupplier {

private final WorkflowOperationResource resource;

public AsyncAPIInputStreamSupplier(WorkflowOperationResource resource) {
public AsyncInputStreamSupplier(WorkflowOperationResource resource) {
this.resource = resource;
}


@@ -18,27 +18,31 @@
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.kie.kogito.quarkus.serverless.workflow.WorkflowCodeGenUtils;
import org.kie.kogito.serverless.workflow.operationid.WorkflowOperationIdFactoryProvider;

import io.quarkiverse.asyncapi.config.AsyncAPISpecInput;
import io.quarkiverse.asyncapi.config.AsyncAPISpecInputProvider;
import io.quarkiverse.asyncapi.generator.input.AsyncAPISpecInput;
import io.quarkiverse.asyncapi.generator.input.AsyncApiSpecInputProvider;
import io.quarkus.deployment.CodeGenContext;
import io.serverlessworkflow.api.functions.FunctionDefinition.Type;
import io.smallrye.config.ConfigSourceContext;

public class WorkflowAsyncAPISpecInputProvider implements AsyncAPISpecInputProvider {
public class WorkflowAsyncApiSpecInputProvider implements AsyncApiSpecInputProvider {

private static final String KOGITO_PACKAGE_PREFIX = "org.kie.kogito.asyncAPI";

@Override
public AsyncAPISpecInput read(ConfigSourceContext context) throws IOException {
try (Stream<Path> workflowFiles = Files.walk(Path.of(System.getProperty("user.dir")))) {
return new AsyncAPISpecInput(WorkflowCodeGenUtils
.operationResources(workflowFiles, f -> f.getType() == Type.ASYNCAPI,
Optional.ofNullable(context.getValue(WorkflowOperationIdFactoryProvider.PROPERTY_NAME).getValue()))
.collect(Collectors.toMap(resource -> resource.getOperationId().getFileName(), AsyncAPIInputStreamSupplier::new, (key1, key2) -> key1)));
public AsyncAPISpecInput read(CodeGenContext context) {
Path inputDir = context.inputDir();
while (!Files.exists(inputDir)) {
inputDir = inputDir.getParent();
}
try (Stream<Path> workflowFiles = Files.walk(inputDir)) {
return new AsyncAPISpecInput(WorkflowCodeGenUtils.operationResources(workflowFiles, f -> f.getType() == Type.ASYNCAPI, context)
.collect(Collectors.toMap(resource -> resource.getOperationId().getFileName(), AsyncInputStreamSupplier::new, (key1, key2) -> key1)), KOGITO_PACKAGE_PREFIX);
} catch (IOException io) {
throw new IllegalStateException(io);
}
}
}
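
For context, the new read(...) implementation climbs from the code-generation input directory to its nearest existing ancestor before walking it for workflow files, presumably because in dev mode the configured input directory may not exist yet. A standalone sketch of that climb (the starting path is illustrative; like the loop above, it assumes some ancestor of the path exists):

import java.nio.file.Files;
import java.nio.file.Path;

public class NearestExistingDirectory {

    // Walk up from 'start' until an existing path is found, mirroring the while loop in the provider above.
    static Path nearestExisting(Path start) {
        Path current = start;
        while (!Files.exists(current)) {
            current = current.getParent();
        }
        return current;
    }

    public static void main(String[] args) {
        // The working directory exists but the nested subdirectories do not, so the climb stops at the working directory.
        Path start = Path.of("").toAbsolutePath().resolve("does-not-exist").resolve("nested");
        System.out.println(nearestExisting(start));
    }
}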