Skip to content

Commit

Permalink
KOGITO-8410 Replaced PrefixParamsDecorator with CollectionParamsDecor…
Browse files Browse the repository at this point in the history
…ator

Signed-off-by: Helber Belmiro <[email protected]>
  • Loading branch information
hbelmiro committed Aug 22, 2023
1 parent 7cdf297 commit 24e5d9d
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
public class PrefixParamsDecorator extends AbstractParamsDecorator {

private static final String HEADER_PREFIX = "HEADER_";
public static final String QUERY_PREFIX = "QUERY_";
private static final String QUERY_PREFIX = "QUERY_";

@Override
protected boolean isHeaderParameter(String key) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,17 @@
import org.jbpm.ruleflow.core.RuleFlowNodeContainerFactory;
import org.jbpm.ruleflow.core.factory.NodeFactory;
import org.jbpm.ruleflow.core.factory.WorkItemNodeFactory;
import org.jetbrains.annotations.NotNull;
import org.kie.kogito.addons.quarkus.knative.serving.customfunctions.CloudEventKnativeParamsDecorator;
import org.kie.kogito.addons.quarkus.knative.serving.customfunctions.KnativeWorkItemHandler;
import org.kie.kogito.addons.quarkus.knative.serving.customfunctions.Operation;
import org.kie.kogito.addons.quarkus.knative.serving.customfunctions.PlainJsonKnativeParamsDecorator;
import org.kie.kogito.serverless.workflow.parser.ParserContext;
import org.kie.kogito.serverless.workflow.parser.VariableInfo;
import org.kie.kogito.serverless.workflow.parser.types.WorkItemTypeHandler;
import org.kie.kogito.serverless.workflow.suppliers.CollectionParamsDecoratorSupplier;
import org.kie.kogito.serverless.workflow.suppliers.ParamsRestBodyBuilderSupplier;
import org.kogito.workitem.rest.RestWorkItemHandler;
import org.kogito.workitem.rest.decorators.ParamsDecorator;
import org.kogito.workitem.rest.decorators.PrefixParamsDecorator;

import com.github.javaparser.ast.expr.Expression;

Expand All @@ -42,7 +42,6 @@
import io.serverlessworkflow.api.functions.FunctionRef;
import io.vertx.core.http.HttpMethod;

import static org.kie.kogito.addons.quarkus.knative.serving.customfunctions.KnativeWorkItemHandler.PAYLOAD_FIELDS_DELIMITER;
import static org.kie.kogito.addons.quarkus.knative.serving.customfunctions.KnativeWorkItemHandler.PAYLOAD_FIELDS_PROPERTY_NAME;
import static org.kie.kogito.serverless.workflow.parser.FunctionTypeHandlerFactory.trimCustomOperation;
import static org.kie.kogito.serverless.workflow.utils.ServerlessWorkflowUtils.runtimeRestApi;
Expand All @@ -60,51 +59,58 @@ public class KnativeTypeHandler extends WorkItemTypeHandler {
WorkItemNodeFactory<?> node = buildWorkItem(embeddedSubProcess, context, varInfo.getInputVar(), varInfo.getOutputVar())
.name(functionDef.getName());

if (functionRef.getArguments() != null && !functionRef.getArguments().isEmpty()) {
List<String> payloadFields = new ArrayList<>();
functionRef.getArguments().fieldNames().forEachRemaining(payloadFields::add);
List<String> payloadFields = getPayloadFields(functionRef);

Operation operation = Operation.parse(trimCustomOperation(functionDef));

if (HttpMethod.GET.equals(operation.getHttpMethod())) {
node.workParameter(RestWorkItemHandler.PARAMS_DECORATOR, new CollectionParamsDecoratorSupplier(List.of(), payloadFields));
} else {
if (!payloadFields.isEmpty()) {
node.workParameter(PAYLOAD_FIELDS_PROPERTY_NAME, String.join(PAYLOAD_FIELDS_DELIMITER, payloadFields));
node.workParameter(PAYLOAD_FIELDS_PROPERTY_NAME, payloadFields);
}
if (operation.isCloudEvent()) {
node.workParameter(RestWorkItemHandler.PARAMS_DECORATOR, CloudEventKnativeParamsDecorator.class.getName());
} else {
node.workParameter(RestWorkItemHandler.PARAMS_DECORATOR, PlainJsonKnativeParamsDecorator.class.getName());
}
}

node.workParameter(KnativeWorkItemHandler.SERVICE_PROPERTY_NAME, operation.getService())
.workParameter(KnativeWorkItemHandler.PATH_PROPERTY_NAME, operation.getPath())
.workParameter(RestWorkItemHandler.METHOD, operation.getHttpMethod());

return addFunctionArgs(workflow,
fillWorkItemHandler(workflow, context, node, functionDef),
functionRef);
}

@NotNull
private static List<String> getPayloadFields(FunctionRef functionRef) {
List<String> payloadFields = new ArrayList<>();

if (functionRef.getArguments() != null && !functionRef.getArguments().isEmpty()) {
functionRef.getArguments().fieldNames().forEachRemaining(payloadFields::add);
}
return payloadFields;
}

@Override
protected <T extends RuleFlowNodeContainerFactory<T, ?>> WorkItemNodeFactory<T> fillWorkItemHandler(
Workflow workflow, ParserContext context, WorkItemNodeFactory<T> node, FunctionDefinition functionDef) {
if (functionDef.getMetadata() != null) {
functionDef.getMetadata().forEach(node::metaData);
}

Operation operation = Operation.parse(trimCustomOperation(functionDef));

Supplier<Expression> requestTimeout = runtimeRestApi(functionDef, "timeout",
context.getContext(), String.class, DEFAULT_REQUEST_TIMEOUT_VALUE);

return node.workParameter(KnativeWorkItemHandler.SERVICE_PROPERTY_NAME, operation.getService())
.workParameter(KnativeWorkItemHandler.PATH_PROPERTY_NAME, operation.getPath())
.workParameter(RestWorkItemHandler.BODY_BUILDER, new ParamsRestBodyBuilderSupplier())
.workParameter(RestWorkItemHandler.PARAMS_DECORATOR, getParamsDecorator(operation).getName())
.workParameter(RestWorkItemHandler.METHOD, operation.getHttpMethod())
return node.workParameter(RestWorkItemHandler.BODY_BUILDER, new ParamsRestBodyBuilderSupplier())
.workParameter(RestWorkItemHandler.REQUEST_TIMEOUT_IN_MILLIS, requestTimeout)
.metaData(TaskDescriptor.KEY_WORKITEM_TYPE, RestWorkItemHandler.REST_TASK_TYPE)
.workName(KnativeWorkItemHandler.NAME);
}

private static Class<? extends ParamsDecorator> getParamsDecorator(Operation operation) {
if (operation.isCloudEvent()) {
return CloudEventKnativeParamsDecorator.class;
} else if (HttpMethod.GET.equals(operation.getHttpMethod())) {
return PrefixParamsDecorator.class;
} else {
return PlainJsonKnativeParamsDecorator.class;
}
}

@Override
public String type() {
return KnativeWorkItemHandler.NAME;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,12 @@
*/
package org.kie.kogito.addons.quarkus.knative.serving.customfunctions;

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;

import static java.util.stream.Collectors.toMap;
import static org.kie.kogito.addons.quarkus.knative.serving.customfunctions.KnativeWorkItemHandler.PAYLOAD_FIELDS_DELIMITER;
import static org.kie.kogito.addons.quarkus.knative.serving.customfunctions.KnativeWorkItemHandler.PAYLOAD_FIELDS_PROPERTY_NAME;

final class KnativeFunctionPayloadSupplier {
Expand All @@ -35,11 +34,8 @@ static Map<String, Object> getPayload(Map<String, Object> parameters) {
}

private static List<String> getPayloadFields(Map<String, Object> parameters) {
String payloadFields = (String) parameters.remove(PAYLOAD_FIELDS_PROPERTY_NAME);
if (payloadFields != null) {
return Arrays.asList(payloadFields.split(PAYLOAD_FIELDS_DELIMITER));
} else {
return List.of();
}
@SuppressWarnings("unchecked")
List<String> payloadFields = (List<String>) parameters.remove(PAYLOAD_FIELDS_PROPERTY_NAME);
return Objects.requireNonNullElseGet(payloadFields, List::of);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package org.kie.kogito.addons.quarkus.knative.serving.customfunctions;

import java.net.URI;
import java.util.Arrays;
import java.util.Map;

import org.kie.kogito.addons.k8s.resource.catalog.KubernetesServiceCatalog;
Expand All @@ -26,11 +25,9 @@
import org.kie.kogito.process.workitem.WorkItemExecutionException;
import org.kogito.workitem.rest.RestWorkItemHandler;

import io.vertx.core.http.HttpMethod;
import io.vertx.mutiny.ext.web.client.WebClient;

import static org.kie.kogito.addons.k8s.resource.catalog.KubernetesProtocol.KNATIVE;
import static org.kogito.workitem.rest.decorators.PrefixParamsDecorator.QUERY_PREFIX;

public final class KnativeWorkItemHandler extends RestWorkItemHandler {

Expand All @@ -46,8 +43,6 @@ public final class KnativeWorkItemHandler extends RestWorkItemHandler {

public static final String PAYLOAD_FIELDS_PROPERTY_NAME = "knative_function_payload_fields";

public static final String PAYLOAD_FIELDS_DELIMITER = ";";

public static final String CLOUDEVENT_SENT_AS_PLAIN_JSON_ERROR_MESSAGE = "A Knative custom function argument cannot be a CloudEvent when the 'asCloudEvent' property are not set to 'true'";

private final KubernetesServiceCatalog kubernetesServiceCatalog;
Expand All @@ -61,22 +56,9 @@ public KnativeWorkItemHandler(WebClient httpClient, WebClient httpsClient, Kuber
public void executeWorkItem(KogitoWorkItem workItem, KogitoWorkItemManager manager) {
Map<String, Object> parameters = workItem.getParameters();
parameters.put(RestWorkItemHandler.URL, getUrl(parameters));

if (HttpMethod.GET.name().equals(workItem.getParameters().get(RestWorkItemHandler.METHOD))) {
addQueryParamsPrefix(workItem);
}

super.executeWorkItem(workItem, manager);
}

private static void addQueryParamsPrefix(KogitoWorkItem workItem) {
String payloadFields = workItem.getParameters().remove(PAYLOAD_FIELDS_PROPERTY_NAME).toString();
if (payloadFields != null) {
Arrays.stream(payloadFields.split(PAYLOAD_FIELDS_DELIMITER))
.forEach(field -> workItem.getParameters().put(QUERY_PREFIX + field, workItem.getParameters().remove(field)));
}
}

private String getUrl(Map<String, Object> parameters) {
return getServiceAddress(parameters) + parameters.remove(PATH_PROPERTY_NAME);
}
Expand Down

0 comments on commit 24e5d9d

Please sign in to comment.