Skip to content

Commit

Permalink
KOGITO-8940: Add processInstance identity idexation at Data Index (#1825
Browse files Browse the repository at this point in the history
)

* KOGITO-8940: Creator/owner field at process

* keep importing individually, postgres identity column type to character varying

* Add createdBy and updatedBy in the GraphQL schema to store event identity

* Renamed migration files to match the version and added missing store=yes

* added createdBy check on integration test query
  • Loading branch information
nmirasch committed Sep 13, 2023
1 parent 95a8bac commit e02d974
Show file tree
Hide file tree
Showing 36 changed files with 226 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -148,13 +148,14 @@ public void testProcessInstanceEvents() throws IOException {
await()
.atMost(TIMEOUT)
.untilAsserted(() -> given().spec(dataIndexSpec()).contentType(ContentType.JSON)
.body("{ \"query\" : \"{ProcessInstances{ id, processId, state } }\" }")
.body("{ \"query\" : \"{ProcessInstances{ id, processId, state, createdBy} }\" }")
.when().post("/graphql")
.then().statusCode(200)
.body("data.ProcessInstances.size()", is(1))
.body("data.ProcessInstances[0].id", is(pId))
.body("data.ProcessInstances[0].processId", is("approvals"))
.body("data.ProcessInstances[0].state", is("ACTIVE")));
.body("data.ProcessInstances[0].state", is("ACTIVE"))
.body("data.ProcessInstances[0].createdBy", nullValue()));

await()
.atMost(TIMEOUT)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,7 @@

import static io.restassured.RestAssured.given;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.CoreMatchers.*;

public class KogitoDevServiceTest {

Expand Down Expand Up @@ -88,13 +87,14 @@ public void testDataIndexDevService() {
.baseUri("http://localhost:" + dataIndexHttpPort)
.contentType(ContentType.JSON)
.accept(ContentType.JSON)
.body("{ \"query\" : \"{ ProcessInstances (where: { id: {equal: \\\"" + processId + "\\\"}}) { id, processId, processName } }\"}")
.body("{ \"query\" : \"{ ProcessInstances (where: { id: {equal: \\\"" + processId + "\\\"}}) { id, processId, processName, createdBy } }\"}")
.when().post("/graphql")
.then()
.statusCode(200)
.body("data.ProcessInstances[0].id", is(processId))
.body("data.ProcessInstances[0].processId", is("greet"))
.body("data.ProcessInstances[0].processName", is("Greeting workflow")));
.body("data.ProcessInstances[0].processName", is("Greeting workflow"))
.body("data.ProcessInstances[0].createdBy", nullValue()));

given().contentType(ContentType.JSON)
.baseUri("http://localhost:" + dataIndexHttpPort)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ public ProcessInstance apply(ProcessInstanceDataEvent event) {
pi.setLastUpdate(toZonedDateTime(event.getTime()));
pi.setVersion(event.getData().getVersion());
pi.setDefinition(definition().apply(event));
pi.setUpdatedBy(event.getData().getIdentity());
return pi;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ public void indexProcessInstance(ProcessInstance pi) {
if (previousPI != null) {
List<NodeInstance> nodes = previousPI.getNodes().stream().filter(n -> !pi.getNodes().contains(n)).collect(toList());
pi.getNodes().addAll(nodes);
} else {
pi.setCreatedBy(pi.getUpdatedBy());
}
ProcessDefinition definition = pi.getDefinition();
if (!manager.getProcessDefinitionsCache().containsKey(definition.getKey())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ type ProcessInstance {
diagram: String
source: String
definition: ProcessDefinition
identity: String
createdBy: String
updatedBy: String
}

type ProcessInstanceError {
Expand Down Expand Up @@ -132,6 +135,8 @@ input ProcessInstanceOrderBy {
error: ProcessInstanceErrorOrderBy
lastUpdate: OrderBy
businessKey: OrderBy
createdBy: OrderBy
updatedBy: OrderBy
}

input ProcessInstanceErrorOrderBy {
Expand Down Expand Up @@ -160,6 +165,8 @@ input ProcessInstanceArgument {
addons: StringArrayArgument
lastUpdate: DateArgument
businessKey: StringArgument
createdBy: StringArgument
updatedBy: StringArgument
}

input ProcessInstanceErrorArgument {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,9 @@ private ObjectNode getProcessJson(ProcessInstanceDataEvent event) {
if (!isNullOrEmpty(event.getData().getBusinessKey())) {
json.put("businessKey", event.getData().getBusinessKey());
}
if (!isNullOrEmpty(event.getData().getIdentity())) {
json.put("updatedBy", event.getData().getIdentity());
}
return json;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ type ProcessInstanceMeta {
end: DateTime
lastUpdate: DateTime!
businessKey: String
createdBy: String
updatedBy: String
}

input ProcessInstanceMetaArgument {
Expand All @@ -71,6 +73,8 @@ input ProcessInstanceMetaArgument {
start: DateArgument
end: DateArgument
businessKey: StringArgument
createdBy: StringArgument
updatedBy: StringArgument
}

extend type UserTaskInstance {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ void testAddProtoFile() throws Exception {
.when().post("/graphql")
.then().log().ifValidationFails().statusCode(200).body("data.Travels", isA(Collection.class));

ProcessInstanceDataEvent startEvent = getProcessCloudEvent(processId, processInstanceId, ACTIVE, null, null, null);
ProcessInstanceDataEvent startEvent = getProcessCloudEvent(processId, processInstanceId, ACTIVE, null, null, null, "currentUser");
indexProcessCloudEvent(startEvent);

validateProcessInstance(getProcessInstanceByIdAndState(processInstanceId, ACTIVE), startEvent);
Expand Down Expand Up @@ -205,7 +205,7 @@ void testAddProtoFile() throws Exception {
.body("data.Travels[0].flight.flightNumber", is("MX555"));

ProcessInstanceDataEvent subProcessStartEvent = getProcessCloudEvent(subProcessId, subProcessInstanceId, ACTIVE,
processInstanceId, processId, processInstanceId);
processInstanceId, processId, processInstanceId, "currentUser");
Map<String, Object> travellerMap = new HashMap<>();
travellerMap.put("firstName", "Maciej");
travellerMap.put("email", "[email protected]");
Expand Down Expand Up @@ -271,7 +271,7 @@ void testAddProtoFile() throws Exception {
.body("data.Travels[0].flight.arrival", is("2019-08-20T22:12:57.34Z"))
.body("data.Travels[0].flight.departure", is("2019-08-20T07:12:57.34Z"));

ProcessInstanceDataEvent endEvent = getProcessCloudEvent(processId, processInstanceId, COMPLETED, null, null, null);
ProcessInstanceDataEvent endEvent = getProcessCloudEvent(processId, processInstanceId, COMPLETED, null, null, null, "currentUser");
indexProcessCloudEvent(endEvent);

validateProcessInstance(getProcessInstanceByIdAndState(processInstanceId, COMPLETED), endEvent, subProcessInstanceId);
Expand Down Expand Up @@ -420,7 +420,7 @@ void testIndexingDomainUsingUserTaskEventFirst() throws Exception {
.body("data.Travels[0].metadata.userTasks[0].lastUpdate", is(formatOffsetDateTime(userTaskEvent.getTime())))
.body("data.Travels[0].metadata.processInstances", is(nullValue()));

ProcessInstanceDataEvent processEvent = getProcessCloudEvent(processId, processInstanceId, ACTIVE, null, null, null);
ProcessInstanceDataEvent processEvent = getProcessCloudEvent(processId, processInstanceId, ACTIVE, null, null, null, "currentUser");
indexProcessCloudEvent(processEvent);

given().contentType(ContentType.JSON)
Expand Down Expand Up @@ -466,7 +466,7 @@ void testIndexingDomainUsingProcessEventFirst() throws Exception {
.when().post("/graphql")
.then().log().ifValidationFails().statusCode(200).body("data.Travels", isA(Collection.class));

ProcessInstanceDataEvent processEvent = getProcessCloudEvent(processId, processInstanceId, ACTIVE, null, null, null);
ProcessInstanceDataEvent processEvent = getProcessCloudEvent(processId, processInstanceId, ACTIVE, null, null, null, "currentUser");
indexProcessCloudEvent(processEvent);

given().contentType(ContentType.JSON)
Expand Down Expand Up @@ -535,7 +535,7 @@ void testIndexingDomainParallelEvents() throws Exception {
.when().post("/graphql")
.then().log().ifValidationFails().statusCode(200).body("data.Travels", isA(Collection.class));

ProcessInstanceDataEvent processEvent = getProcessCloudEvent(processId, processInstanceId, ACTIVE, null, null, null);
ProcessInstanceDataEvent processEvent = getProcessCloudEvent(processId, processInstanceId, ACTIVE, null, null, null, "currentUser");
UserTaskInstanceDataEvent userTaskEvent = getUserTaskCloudEvent(taskId, processId, processInstanceId, null, null, state);

CompletableFuture.allOf(
Expand Down Expand Up @@ -585,7 +585,7 @@ void testProcessInstanceDomainIndex() throws Exception {
.when().post("/graphql")
.then().log().ifValidationFails().statusCode(200).body("data.Travels", isA(Collection.class));

ProcessInstanceDataEvent startEvent = getProcessCloudEvent(processId, processInstanceId, ACTIVE, null, null, null);
ProcessInstanceDataEvent startEvent = getProcessCloudEvent(processId, processInstanceId, ACTIVE, null, null, null, "currentUser");
indexProcessCloudEvent(startEvent);

validateProcessInstance(getProcessInstanceById(processInstanceId), startEvent);
Expand Down Expand Up @@ -613,7 +613,7 @@ void testProcessInstanceDomainIndex() throws Exception {
.body("data.Travels[0].hotel.name", is("Meriton"))
.body("data.Travels[0].traveller.firstName", is("Maciej"));

ProcessInstanceDataEvent endEvent = getProcessCloudEvent(processId, processInstanceId, COMPLETED, null, null, null);
ProcessInstanceDataEvent endEvent = getProcessCloudEvent(processId, processInstanceId, COMPLETED, null, null, null, "currentUser");
endEvent.getData().update().endDate(new Date());
Map<String, Object> variablesMap = getProcessInstanceVariablesMap();
((Map<String, Object>) variablesMap.get("hotel")).put("name", "Ibis");
Expand Down Expand Up @@ -647,13 +647,13 @@ void testProcessInstanceDomainIndex() throws Exception {
.body("data.Travels[0].traveller.firstName", is("Maciej"));

ProcessInstanceDataEvent event = getProcessCloudEvent(subProcessId, subProcessInstanceId, ACTIVE, processInstanceId,
processId, processInstanceId);
processId, processInstanceId, "currentUser");
indexProcessCloudEvent(event);

validateProcessInstance(getProcessInstanceByParentProcessInstanceId(processInstanceId), event);

ProcessInstanceDataEvent errorEvent = getProcessCloudEvent(subProcessId, subProcessInstanceId, ERROR, processInstanceId,
processId, processInstanceId);
processId, processInstanceId, "currentUser");
indexProcessCloudEvent(errorEvent);

validateProcessInstance(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import static org.kie.kogito.index.service.GraphQLUtils.getJobById;
import static org.kie.kogito.index.service.GraphQLUtils.getProcessDefinitionByIdAndVersion;
import static org.kie.kogito.index.service.GraphQLUtils.getProcessInstanceByBusinessKey;
import static org.kie.kogito.index.service.GraphQLUtils.getProcessInstanceByCreatedBy;
import static org.kie.kogito.index.service.GraphQLUtils.getProcessInstanceById;
import static org.kie.kogito.index.service.GraphQLUtils.getProcessInstanceByIdAndAddon;
import static org.kie.kogito.index.service.GraphQLUtils.getProcessInstanceByIdAndErrorNode;
Expand All @@ -72,6 +73,7 @@
import static org.kie.kogito.index.service.GraphQLUtils.getProcessInstanceByIdAndState;
import static org.kie.kogito.index.service.GraphQLUtils.getProcessInstanceByParentProcessInstanceId;
import static org.kie.kogito.index.service.GraphQLUtils.getProcessInstanceByRootProcessInstanceId;
import static org.kie.kogito.index.service.GraphQLUtils.getProcessInstanceByUpdatedBy;
import static org.kie.kogito.index.service.GraphQLUtils.getUserTaskInstanceById;
import static org.kie.kogito.index.service.GraphQLUtils.getUserTaskInstanceByIdAndActualOwner;
import static org.kie.kogito.index.service.GraphQLUtils.getUserTaskInstanceByIdAndCompleted;
Expand Down Expand Up @@ -201,7 +203,7 @@ void testProcessInstancePagination() {

IntStream.range(0, 100).forEach(i -> {
String pId = UUID.randomUUID().toString();
ProcessInstanceDataEvent startEvent = getProcessCloudEvent(processId, pId, ACTIVE, null, null, null);
ProcessInstanceDataEvent startEvent = getProcessCloudEvent(processId, pId, ACTIVE, null, null, null, "currentUser");
indexProcessCloudEvent(startEvent);
pIds.add(pId);
await()
Expand Down Expand Up @@ -294,7 +296,7 @@ void testProcessInstanceIndex() throws Exception {
String subProcessId = processId + "_sub";
String subProcessInstanceId = UUID.randomUUID().toString();

ProcessInstanceDataEvent startEvent = getProcessCloudEvent(processId, processInstanceId, ACTIVE, null, null, null);
ProcessInstanceDataEvent startEvent = getProcessCloudEvent(processId, processInstanceId, ACTIVE, null, null, null, "currentUser");
indexProcessCloudEvent(startEvent);

validateProcessDefinition(getProcessDefinitionByIdAndVersion(startEvent.getKogitoProcessId(), startEvent.getData().getVersion()), startEvent);
Expand All @@ -309,8 +311,10 @@ void testProcessInstanceIndex() throws Exception {
validateProcessInstance(getProcessInstanceByIdAndMilestoneStatus(processInstanceId, MilestoneStatus.AVAILABLE.name()),
startEvent);
validateProcessInstance(getProcessInstanceByBusinessKey(startEvent.getData().getBusinessKey()), startEvent);
validateProcessInstance(getProcessInstanceByCreatedBy(startEvent.getData().getIdentity()), startEvent);
validateProcessInstance(getProcessInstanceByUpdatedBy(startEvent.getData().getIdentity()), startEvent);

ProcessInstanceDataEvent endEvent = getProcessCloudEvent(processId, processInstanceId, COMPLETED, null, null, null);
ProcessInstanceDataEvent endEvent = getProcessCloudEvent(processId, processInstanceId, COMPLETED, null, null, null, "currentUser");
endEvent.getData().update().endDate(new Date());
Map<String, Object> variablesMap = getProcessInstanceVariablesMap();
((Map<String, Object>) variablesMap.get("hotel")).put("name", "Ibis");
Expand All @@ -323,7 +327,7 @@ void testProcessInstanceIndex() throws Exception {
validateProcessInstance(getProcessInstanceByIdAndMilestoneStatus(processInstanceId, MilestoneStatus.COMPLETED.name()), endEvent);

ProcessInstanceDataEvent event = getProcessCloudEvent(subProcessId, subProcessInstanceId, ACTIVE, processInstanceId,
processId, processInstanceId);
processId, processInstanceId, "currentUser");
indexProcessCloudEvent(event);

validateProcessInstance(getProcessInstanceByParentProcessInstanceId(processInstanceId), event);
Expand All @@ -337,7 +341,7 @@ void testProcessInstanceIndex() throws Exception {
event);

ProcessInstanceDataEvent errorEvent = getProcessCloudEvent(subProcessId, subProcessInstanceId, ERROR, processInstanceId,
processId, processInstanceId);
processId, processInstanceId, "currentUser");
indexProcessCloudEvent(errorEvent);

validateProcessInstance(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,14 @@ public static String getProcessInstanceByBusinessKey(String businessKeys) {
return getProcessInstanceQuery("ProcessInstanceByBusinessKey", businessKeys);
}

public static String getProcessInstanceByCreatedBy(String identity) {
return getProcessInstanceQuery("ProcessInstanceByCreatedBy", identity);
}

public static String getProcessInstanceByUpdatedBy(String identity) {
return getProcessInstanceQuery("ProcessInstanceByUpdatedBy", identity);
}

public static String getUserTaskInstanceById(String id) {
return getUserTaskInstanceQuery("UserTaskInstanceById", id);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ private void assertDomainSubscription(String processId, String processInstanceId
.when().post("/graphql")
.then().log().ifValidationFails().statusCode(200).body("data.Travels", isA(Collection.class));

ProcessInstanceDataEvent event = getProcessCloudEvent(processId, processInstanceId, state, null, null, null);
ProcessInstanceDataEvent event = getProcessCloudEvent(processId, processInstanceId, state, null, null, null, "currentUser");
indexProcessCloudEvent(event);

JsonObject json = cf.get(1, TimeUnit.MINUTES);
Expand All @@ -168,7 +168,7 @@ private void assertProcessInstanceSubscription(String processId, String processI
.when().post("/graphql")
.then().log().ifValidationFails().statusCode(200).body("data.Travels", isA(Collection.class));

ProcessInstanceDataEvent event = getProcessCloudEvent(processId, processInstanceId, state, null, null, null);
ProcessInstanceDataEvent event = getProcessCloudEvent(processId, processInstanceId, state, null, null, null, "currentUser");
indexProcessCloudEvent(event);

JsonObject json = cf.get(1, TimeUnit.MINUTES);
Expand Down
Loading

0 comments on commit e02d974

Please sign in to comment.