Skip to content

Commit

Permalink
KOGITO-9737 - Index process definition data (#1842)
Browse files Browse the repository at this point in the history
  • Loading branch information
cristianonicolai committed Aug 31, 2023
1 parent bf28115 commit 4bde80b
Show file tree
Hide file tree
Showing 87 changed files with 2,430 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,17 @@ public void testProcessInstanceEvents() throws IOException {
.body("data.ProcessInstances[0].processId", is("approvals"))
.body("data.ProcessInstances[0].state", is("ACTIVE")));

await()
.atMost(TIMEOUT)
.untilAsserted(() -> given().spec(dataIndexSpec()).contentType(ContentType.JSON)
.body("{ \"query\" : \"{ProcessDefinitions{ id, version, name } }\" }")
.when().post("/graphql")
.then().statusCode(200)
.body("data.ProcessDefinitions.size()", is(1))
.body("data.ProcessDefinitions[0].id", is("approvals"))
.body("data.ProcessDefinitions[0].version", is("1.0"))
.body("data.ProcessDefinitions[0].name", is("approvals")));

await()
.atMost(TIMEOUT)
.untilAsserted(() -> given().spec(dataIndexSpec()).contentType(ContentType.JSON)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,16 @@ public void testDataIndexDevService() {
.body("data.ProcessInstances[0].id", is(processId))
.body("data.ProcessInstances[0].processId", is("greet"))
.body("data.ProcessInstances[0].processName", is("Greeting workflow")));

given().contentType(ContentType.JSON)
.baseUri("http://localhost:" + dataIndexHttpPort)
.body("{ \"query\" : \"{ProcessDefinitions{ id, version, name } }\" }")
.when().post("/graphql")
.then().statusCode(200)
.body("data.ProcessDefinitions.size()", is(1))
.body("data.ProcessDefinitions[0].id", is("greet"))
.body("data.ProcessDefinitions[0].version", is("1.0"))
.body("data.ProcessDefinitions[0].name", is("Greeting workflow"));
}

private static String getApplicationPropertiesContent() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ public interface KogitoRuntimeClient {

CompletableFuture<String> getProcessInstanceDiagram(String serviceURL, ProcessInstance processInstance);

CompletableFuture<String> getProcessInstanceSourceFileContent(String serviceURL, ProcessInstance processInstance);
CompletableFuture<String> getProcessDefinitionSourceFileContent(String serviceURL, String processId);

CompletableFuture<List<Node>> getProcessInstanceNodeDefinitions(String serviceURL, ProcessInstance processInstance);
CompletableFuture<List<Node>> getProcessDefinitionNodes(String serviceURL, String processId);

CompletableFuture<String> triggerNodeInstance(String serviceURL, ProcessInstance processInstance, String nodeDefinitionId);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.kie.kogito.event.process.ProcessInstanceDataEvent;
import org.kie.kogito.index.model.Milestone;
import org.kie.kogito.index.model.NodeInstance;
import org.kie.kogito.index.model.ProcessDefinition;
import org.kie.kogito.index.model.ProcessInstance;
import org.kie.kogito.index.model.ProcessInstanceError;

Expand Down Expand Up @@ -61,9 +62,24 @@ public ProcessInstance apply(ProcessInstanceDataEvent event) {
pi.setAddons(isNullOrEmpty(event.getKogitoAddons()) ? null : Set.of(event.getKogitoAddons().split(",")));
pi.setEndpoint(event.getSource() == null ? null : event.getSource().toString());
pi.setLastUpdate(toZonedDateTime(event.getTime()));
pi.setDefinition(definitions().apply(event));
return pi;
}

private Function<ProcessInstanceDataEvent, ProcessDefinition> definitions() {
return event -> {
ProcessDefinition pd = new ProcessDefinition();
pd.setId(event.getData().getProcessId());
pd.setName(event.getData().getProcessName());
pd.setVersion(event.getData().getVersion());
pd.setAddons(isNullOrEmpty(event.getKogitoAddons()) ? null : Set.of(event.getKogitoAddons().split(",")));
pd.setRoles(event.getData().getRoles());
pd.setType(event.getKogitoProcessType());
pd.setEndpoint(event.getSource() == null ? null : event.getSource().toString());
return pd;
};
}

private Function<NodeInstanceEventBody, NodeInstance> nodeInstance() {
return nib -> {
NodeInstance ni = new NodeInstance();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import org.kie.kogito.index.model.Job;
import org.kie.kogito.index.model.NodeInstance;
import org.kie.kogito.index.model.ProcessDefinition;
import org.kie.kogito.index.model.ProcessInstance;
import org.kie.kogito.index.model.UserTaskInstance;
import org.kie.kogito.index.storage.DataIndexStorageService;
Expand Down Expand Up @@ -56,6 +57,8 @@ public void indexProcessInstance(ProcessInstance pi) {
List<NodeInstance> nodes = previousPI.getNodes().stream().filter(n -> !pi.getNodes().contains(n)).collect(toList());
pi.getNodes().addAll(nodes);
}
ProcessDefinition definition = pi.getDefinition();
manager.getProcessDefinitionsCache().put(definition.getKey(), definition);
manager.getProcessInstancesCache().put(pi.getId(), pi);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.kie.kogito.index.graphql.query.GraphQLQueryParserRegistry;
import org.kie.kogito.index.model.Job;
import org.kie.kogito.index.model.Node;
import org.kie.kogito.index.model.ProcessDefinition;
import org.kie.kogito.index.model.ProcessInstance;
import org.kie.kogito.index.model.UserTaskInstance;
import org.kie.kogito.index.storage.DataIndexStorageService;
Expand Down Expand Up @@ -71,6 +72,7 @@ public abstract class AbstractGraphQLSchemaManager implements GraphQLSchemaManag
public void setup() {
schema = createSchema();
GraphQLQueryParserRegistry.get().registerParsers(
(GraphQLInputObjectType) schema.getType("ProcessDefinitionArgument"),
(GraphQLInputObjectType) schema.getType("ProcessInstanceArgument"),
(GraphQLInputObjectType) schema.getType("UserTaskInstanceArgument"),
(GraphQLInputObjectType) schema.getType("JobArgument"));
Expand Down Expand Up @@ -104,6 +106,14 @@ public void setDataIndexApiExecutor(KogitoRuntimeClient dataIndexApiExecutor) {
this.dataIndexApiExecutor = dataIndexApiExecutor;
}

public String getProcessDefinitionServiceUrl(DataFetchingEnvironment env) {
ProcessDefinition source = env.getSource();
if (source == null || source.getEndpoint() == null || source.getId() == null) {
return null;
}
return getServiceUrl(source.getEndpoint(), source.getId());
}

public String getProcessInstanceServiceUrl(DataFetchingEnvironment env) {
ProcessInstance source = env.getSource();
if (source == null || source.getEndpoint() == null || source.getProcessId() == null) {
Expand Down Expand Up @@ -149,6 +159,10 @@ protected ProcessInstance getParentProcessInstanceValue(DataFetchingEnvironment
return !execute.isEmpty() ? execute.get(0) : null;
}

protected Collection<ProcessDefinition> getProcessDefinitionsValues(DataFetchingEnvironment env) {
return executeAdvancedQueryForCache(cacheService.getProcessDefinitionsCache(), env);
}

protected Collection<ProcessInstance> getProcessInstancesValues(DataFetchingEnvironment env) {
return executeAdvancedQueryForCache(cacheService.getProcessInstancesCache(), env);
}
Expand Down Expand Up @@ -195,13 +209,23 @@ public CompletableFuture<String> getProcessInstanceDiagram(DataFetchingEnvironme
}

public CompletableFuture<String> getProcessInstanceSourceFileContent(DataFetchingEnvironment env) {
ProcessInstance processInstance = env.getSource();
return dataIndexApiExecutor.getProcessInstanceSourceFileContent(getServiceUrl(processInstance.getEndpoint(), processInstance.getProcessId()), processInstance);
ProcessInstance pi = env.getSource();
return dataIndexApiExecutor.getProcessDefinitionSourceFileContent(getServiceUrl(pi.getEndpoint(), pi.getProcessId()), pi.getProcessId());
}

public CompletableFuture<List<Node>> getProcessNodes(DataFetchingEnvironment env) {
ProcessInstance processInstance = env.getSource();
return dataIndexApiExecutor.getProcessInstanceNodeDefinitions(getServiceUrl(processInstance.getEndpoint(), processInstance.getProcessId()), processInstance);
public CompletableFuture<List<Node>> getProcessInstanceNodes(DataFetchingEnvironment env) {
ProcessInstance pi = env.getSource();
return dataIndexApiExecutor.getProcessDefinitionNodes(getServiceUrl(pi.getEndpoint(), pi.getProcessId()), pi.getProcessId());
}

public CompletableFuture<String> getProcessDefinitionSourceFileContent(DataFetchingEnvironment env) {
ProcessDefinition pd = env.getSource();
return dataIndexApiExecutor.getProcessDefinitionSourceFileContent(getServiceUrl(pd.getEndpoint(), pd.getId()), pd.getId());
}

public CompletableFuture<List<Node>> getProcessDefinitionNodes(DataFetchingEnvironment env) {
ProcessDefinition pd = env.getSource();
return dataIndexApiExecutor.getProcessDefinitionNodes(getServiceUrl(pd.getEndpoint(), pd.getId()), pd.getId());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,40 @@ schema {
}

type Query {
ProcessDefinitions(where: ProcessDefinitionArgument, orderBy: ProcessDefinitionOrderBy, pagination: Pagination): [ProcessDefinition]
ProcessInstances(where: ProcessInstanceArgument, orderBy: ProcessInstanceOrderBy, pagination: Pagination): [ProcessInstance]
UserTaskInstances(where: UserTaskInstanceArgument, orderBy: UserTaskInstanceOrderBy, pagination: Pagination): [UserTaskInstance]
Jobs(where: JobArgument, orderBy: JobOrderBy, pagination: Pagination): [Job]
}

type ProcessDefinition {
id: String!
name: String
version: String
nodes: [Node!]
source: String
addons: [String!]
roles: [String!]
type: String
endpoint: String!
serviceUrl: String
}

input ProcessDefinitionOrderBy {
id: OrderBy
name: OrderBy
version: OrderBy
}

input ProcessDefinitionArgument {
and: [ProcessDefinitionArgument!]
or: [ProcessDefinitionArgument!]
not: ProcessDefinitionArgument
id: StringArgument
name: StringArgument
version: StringArgument
}

type ProcessInstance {
id: String!
processId: String!
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,16 +144,16 @@ public CompletableFuture<String> getProcessInstanceDiagram(String serviceURL, Pr
}

@Override
public CompletableFuture<String> getProcessInstanceSourceFileContent(String serviceURL, ProcessInstance processInstance) {
String requestURI = format(GET_PROCESS_INSTANCE_SOURCE_PATH, processInstance.getProcessId());
public CompletableFuture<String> getProcessDefinitionSourceFileContent(String serviceURL, String processId) {
String requestURI = format(GET_PROCESS_INSTANCE_SOURCE_PATH, processId);
return sendGetClientRequest(getWebClient(serviceURL), requestURI, "Get Process Instance source file with processId: " +
processInstance.getProcessId(), null);
processId, null);
}

@Override
public CompletableFuture<List<Node>> getProcessInstanceNodeDefinitions(String serviceURL, ProcessInstance processInstance) {
String requestURI = format(GET_PROCESS_INSTANCE_NODE_DEFINITIONS_PATH, processInstance.getProcessId());
return sendGetClientRequest(getWebClient(serviceURL), requestURI, "Get Process Instance available nodes with id: " + processInstance.getId(), List.class);
public CompletableFuture<List<Node>> getProcessDefinitionNodes(String serviceURL, String processId) {
String requestURI = format(GET_PROCESS_INSTANCE_NODE_DEFINITIONS_PATH, processId);
return sendGetClientRequest(getWebClient(serviceURL), requestURI, "Get Process available nodes with id: " + processId, List.class);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ public GraphQLSchema createSchema() {

RuntimeWiring runtimeWiring = RuntimeWiring.newRuntimeWiring()
.type("Query", builder -> {
builder.dataFetcher("ProcessDefinitions", this::getProcessDefinitionsValues);
builder.dataFetcher("ProcessInstances", this::getProcessInstancesValues);
builder.dataFetcher("UserTaskInstances", this::getUserTaskInstancesValues);
builder.dataFetcher("Jobs", this::getJobsValues);
Expand All @@ -111,13 +112,19 @@ public GraphQLSchema createSchema() {
builder.dataFetcher("UserTaskInstanceAttachmentDelete", this::deleteUserTaskAttachment);
return builder;
})
.type("ProcessDefinition", builder -> {
builder.dataFetcher("source", this::getProcessDefinitionSourceFileContent);
builder.dataFetcher("nodes", this::getProcessDefinitionNodes);
builder.dataFetcher("serviceUrl", this::getProcessDefinitionServiceUrl);
return builder;
})
.type("ProcessInstance", builder -> {
builder.dataFetcher("parentProcessInstance", this::getParentProcessInstanceValue);
builder.dataFetcher("childProcessInstances", this::getChildProcessInstancesValues);
builder.dataFetcher("serviceUrl", this::getProcessInstanceServiceUrl);
builder.dataFetcher("diagram", this::getProcessInstanceDiagram);
builder.dataFetcher("source", this::getProcessInstanceSourceFileContent);
builder.dataFetcher("nodeDefinitions", this::getProcessNodes);
builder.dataFetcher("nodeDefinitions", this::getProcessInstanceNodes);
return builder;
})
.type("UserTaskInstance", builder -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import static org.kie.kogito.index.model.ProcessInstanceState.COMPLETED;
import static org.kie.kogito.index.model.ProcessInstanceState.ERROR;
import static org.kie.kogito.index.service.GraphQLUtils.getJobById;
import static org.kie.kogito.index.service.GraphQLUtils.getProcessDefinitionByIdAndVersion;
import static org.kie.kogito.index.service.GraphQLUtils.getProcessInstanceByBusinessKey;
import static org.kie.kogito.index.service.GraphQLUtils.getProcessInstanceById;
import static org.kie.kogito.index.service.GraphQLUtils.getProcessInstanceByIdAndAddon;
Expand Down Expand Up @@ -104,13 +105,18 @@ static void setup() {
@Transactional
void tearDown() {
cacheService.getJobsCache().clear();
cacheService.getProcessDefinitionsCache().clear();
cacheService.getProcessInstancesCache().clear();
cacheService.getUserTaskInstancesCache().clear();
}

@Test
//Reproducer for KOGITO-334
void testDefaultGraphqlTypes() {
given().contentType(ContentType.JSON).body("{ \"query\" : \"{ProcessDefinitions{ id } }\" }")
.when().post("/graphql")
.then().log().ifValidationFails().statusCode(200).body("data.ProcessDefinitions", isA(Collection.class));

given().contentType(ContentType.JSON).body("{ \"query\" : \"{ProcessInstances{ id } }\" }")
.when().post("/graphql")
.then().log().ifValidationFails().statusCode(200).body("data.ProcessInstances", isA(Collection.class));
Expand All @@ -124,6 +130,21 @@ void testDefaultGraphqlTypes() {
.then().log().ifValidationFails().statusCode(200).body("data.Jobs", isA(Collection.class));
}

protected void validateProcessDefinition(String query, ProcessInstanceDataEvent event) {
LOGGER.debug("GraphQL query: {}", query);
await()
.atMost(timeout)
.untilAsserted(() -> given().contentType(ContentType.JSON).body(query)
.when().post("/graphql")
.then().log().ifValidationFails().statusCode(200)
.body("data.ProcessDefinitions[0].id", is(event.getData().getProcessId()))
.body("data.ProcessDefinitions[0].name", is(event.getData().getProcessName()))
.body("data.ProcessDefinitions[0].version", is(event.getData().getVersion()))
.body("data.ProcessDefinitions[0].type", is(event.getData().getProcessType()))
.body("data.ProcessDefinitions[0].addons", event.getKogitoAddons() == null ? is(nullValue()) : hasItems(event.getKogitoAddons().split(",")))
.body("data.ProcessDefinitions[0].roles", event.getData().getRoles() == null ? is(nullValue()) : hasItems(event.getData().getRoles().toArray())));
}

protected void validateProcessInstance(String query, ProcessInstanceDataEvent event, String childProcessInstanceId) {
LOGGER.debug("GraphQL query: {}", query);
await()
Expand Down Expand Up @@ -272,6 +293,7 @@ void testProcessInstanceIndex() throws Exception {
ProcessInstanceDataEvent startEvent = getProcessCloudEvent(processId, processInstanceId, ACTIVE, null, null, null);
indexProcessCloudEvent(startEvent);

validateProcessDefinition(getProcessDefinitionByIdAndVersion(startEvent.getKogitoProcessId(), startEvent.getData().getVersion()), startEvent);
validateProcessInstance(getProcessInstanceById(processInstanceId), startEvent);
validateProcessInstance(getProcessInstanceByIdAndState(processInstanceId, ACTIVE), startEvent);
validateProcessInstance(getProcessInstanceByIdAndProcessId(processInstanceId, processId), startEvent);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

import org.apache.commons.lang3.reflect.FieldUtils;
import org.kie.kogito.index.model.Job;
import org.kie.kogito.index.model.ProcessDefinition;
import org.kie.kogito.index.model.ProcessInstance;
import org.kie.kogito.index.model.ProcessInstanceState;
import org.kie.kogito.index.model.UserTaskInstance;
Expand All @@ -52,8 +53,9 @@ public class GraphQLUtils {
private static final Map<String, String> QUERIES = new HashMap<>();

static {
QUERY_FIELDS.put(ProcessDefinition.class, getAllFieldsList(ProcessDefinition.class).map(getFieldName()).collect(joining(", ")));
QUERY_FIELDS.put(UserTaskInstance.class, getAllFieldsList(UserTaskInstance.class).map(getFieldName()).collect(joining(", ")));
QUERY_FIELDS.put(ProcessInstance.class, getAllFieldsList(ProcessInstance.class).map(getFieldName()).collect(joining(", ")));
QUERY_FIELDS.put(ProcessInstance.class, getAllFieldsList(ProcessInstance.class).filter(field -> !field.getName().equals("definition")).map(getFieldName()).collect(joining(", ")));
QUERY_FIELDS.put(Job.class, getAllFieldsList(Job.class).map(getFieldName()).collect(joining(", ")));
QUERY_FIELDS.computeIfPresent(ProcessInstance.class, (k, v) -> v + ", serviceUrl");
QUERY_FIELDS.computeIfPresent(ProcessInstance.class, (k, v) -> v + ", childProcessInstances { id, processName }");
Expand All @@ -71,6 +73,10 @@ public class GraphQLUtils {
}
}

public static String getProcessDefinitionByIdAndVersion(String id, String version) {
return getProcessDefinitionQuery("ProcessDefinitionByIdAndVersion", id, version);
}

public static String getProcessInstanceById(String id) {
return getProcessInstanceQuery("ProcessInstanceById", id);
}
Expand Down Expand Up @@ -207,6 +213,10 @@ private static String getProcessInstanceQuery(String name, String... args) {
return getQuery(name, ProcessInstance.class, args);
}

private static String getProcessDefinitionQuery(String name, String... args) {
return getQuery(name, ProcessDefinition.class, args);
}

private static String getUserTaskInstanceQuery(String name, String... args) {
return getQuery(name, UserTaskInstance.class, args);
}
Expand Down
Loading

0 comments on commit 4bde80b

Please sign in to comment.