Skip to content

Commit

Permalink
[kie-issues-249] Data index improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
elguardian committed May 11, 2023
1 parent 3d51e76 commit 42f53fd
Show file tree
Hide file tree
Showing 32 changed files with 729 additions and 308 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@
package org.kie.kogito.events.mongodb;

import java.util.Collection;
import java.util.function.BooleanSupplier;

import org.bson.codecs.configuration.CodecRegistries;
import org.bson.codecs.configuration.CodecRegistry;
import org.bson.codecs.pojo.PojoCodecProvider;
import org.kie.kogito.event.DataEvent;
import org.kie.kogito.event.EventPublisher;
import org.kie.kogito.event.process.NodeInstanceDataEvent;
import org.kie.kogito.event.process.ProcessInstanceDataEvent;
import org.kie.kogito.event.process.UserTaskInstanceDataEvent;
import org.kie.kogito.event.process.VariableInstanceDataEvent;
Expand All @@ -47,6 +47,7 @@ public abstract class MongoDBEventPublisher implements EventPublisher {
static final String ID = "_id";

private MongoCollection<ProcessInstanceDataEvent> processInstanceDataEventCollection;
private MongoCollection<NodeInstanceDataEvent> nodeInstanceDataEventCollection;
private MongoCollection<UserTaskInstanceDataEvent> userTaskInstanceDataEventCollection;
private MongoCollection<VariableInstanceDataEvent> variableInstanceDataEventCollection;

Expand All @@ -62,6 +63,8 @@ public abstract class MongoDBEventPublisher implements EventPublisher {

protected abstract String eventsDatabaseName();

protected abstract String nodeInstanceEventsCollection();

protected abstract String processInstancesEventsCollection();

protected abstract String userTasksEventsCollection();
Expand All @@ -75,37 +78,48 @@ protected void configure() {
processInstanceDataEventCollection = mongoDatabase.getCollection(processInstancesEventsCollection(), ProcessInstanceDataEvent.class).withCodecRegistry(registry);
userTaskInstanceDataEventCollection = mongoDatabase.getCollection(userTasksEventsCollection(), UserTaskInstanceDataEvent.class).withCodecRegistry(registry);
variableInstanceDataEventCollection = mongoDatabase.getCollection(variablesEventsCollection(), VariableInstanceDataEvent.class).withCodecRegistry(registry);
nodeInstanceDataEventCollection = mongoDatabase.getCollection(nodeInstanceEventsCollection(), NodeInstanceDataEvent.class).withCodecRegistry(registry);
}

@Override
public void publish(DataEvent<?> event) {
switch (event.getType()) {
case "ProcessInstanceEvent":
publishEvent(processInstanceDataEventCollection, (ProcessInstanceDataEvent) event, this::processInstancesEvents);
if (this.processInstancesEvents()) {
publishEvent(processInstanceDataEventCollection, (ProcessInstanceDataEvent) event);
}
break;
case "NodeInstanceEvent":
if (this.processInstancesEvents()) {
publishEvent(nodeInstanceDataEventCollection, (NodeInstanceDataEvent) event);
}
break;
case "UserTaskInstanceEvent":
publishEvent(userTaskInstanceDataEventCollection, (UserTaskInstanceDataEvent) event, this::userTasksEvents);
if (this.userTasksEvents()) {
publishEvent(userTaskInstanceDataEventCollection, (UserTaskInstanceDataEvent) event);
}
break;
case "VariableInstanceEvent":
publishEvent(variableInstanceDataEventCollection, (VariableInstanceDataEvent) event, this::variablesEvents);
if (this.variablesEvents()) {
publishEvent(variableInstanceDataEventCollection, (VariableInstanceDataEvent) event);
}
break;
default:
logger.warn("Unknown type of event '{}', ignoring", event.getType());
}
}

private <T extends DataEvent<?>> void publishEvent(MongoCollection<T> collection, T event, BooleanSupplier enabled) {
if (enabled.getAsBoolean()) {
if (transactionManager().enabled()) {
collection.insertOne(transactionManager().getClientSession(), event);
// delete the event immediately from the outbox collection
collection.deleteOne(transactionManager().getClientSession(), Filters.eq(ID, event.getId()));
} else {
collection.insertOne(event);
// delete the event from the outbox collection
collection.deleteOne(Filters.eq(ID, event.getId()));
}
private <T extends DataEvent<?>> void publishEvent(MongoCollection<T> collection, T event) {
if (transactionManager().enabled()) {
collection.insertOne(transactionManager().getClientSession(), event);
// delete the event immediately from the outbox collection
collection.deleteOne(transactionManager().getClientSession(), Filters.eq(ID, event.getId()));
} else {
collection.insertOne(event);
// delete the event from the outbox collection
collection.deleteOne(Filters.eq(ID, event.getId()));
}

}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Copyright 2021 Red Hat, Inc. and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.kie.kogito.events.mongodb.codec;

import org.bson.BsonReader;
import org.bson.BsonString;
import org.bson.BsonValue;
import org.bson.BsonWriter;
import org.bson.Document;
import org.bson.codecs.CollectibleCodec;
import org.bson.codecs.DecoderContext;
import org.bson.codecs.EncoderContext;
import org.kie.kogito.event.process.NodeInstanceDataEvent;
import org.kie.kogito.event.process.NodeInstanceEventBody;

import static org.kie.kogito.events.mongodb.codec.CodecUtils.codec;
import static org.kie.kogito.events.mongodb.codec.CodecUtils.encodeDataEvent;

public class NodeInstanceDataEventCodec implements CollectibleCodec<NodeInstanceDataEvent> {

@Override
public NodeInstanceDataEvent generateIdIfAbsentFromDocument(NodeInstanceDataEvent nodeInstanceDataEvent) {
return nodeInstanceDataEvent;
}

@Override
public boolean documentHasId(NodeInstanceDataEvent processInstanceDataEvent) {
return processInstanceDataEvent.getId() != null;
}

@Override
public BsonValue getDocumentId(NodeInstanceDataEvent processInstanceDataEvent) {
return new BsonString(processInstanceDataEvent.getId());
}

@Override
public NodeInstanceDataEvent decode(BsonReader bsonReader, DecoderContext decoderContext) {
// The events persist in an outbox collection
// The events are deleted immediately (in the same transaction)
// "decode" is not supposed to take place in any scenario
return null;
}

@Override
public void encode(BsonWriter bsonWriter, NodeInstanceDataEvent nodeInstanceDataEvent, EncoderContext encoderContext) {
Document doc = new Document();
encodeDataEvent(doc, nodeInstanceDataEvent);
doc.put("kogitoProcessType", nodeInstanceDataEvent.getKogitoProcessType());
doc.put("kogitoProcessInstanceVersion", nodeInstanceDataEvent.getKogitoProcessInstanceVersion());
doc.put("kogitoParentProcessinstanceId", nodeInstanceDataEvent.getKogitoParentProcessInstanceId());
doc.put("kogitoProcessinstanceState", nodeInstanceDataEvent.getKogitoProcessInstanceState());
doc.put("kogitoReferenceId", nodeInstanceDataEvent.getKogitoReferenceId());
doc.put("kogitoStartFromNode", nodeInstanceDataEvent.getKogitoStartFromNode());
doc.put("data", encodeData(nodeInstanceDataEvent.getData()));
codec().encode(bsonWriter, doc, encoderContext);
}

private Document encodeData(NodeInstanceEventBody data) {

Document doc = new Document();
doc.put("processInstanceId", data.getProcessInstanceId());
doc.put("connectionNodeInstanceId", data.getConnectionNodeInstanceId());
doc.put("id", data.getId());
doc.put("nodeId", data.getNodeId());
doc.put("nodeDefinitionId", data.getNodeDefinitionId());
doc.put("nodeName", data.getNodeName());
doc.put("nodeType", data.getNodeType());
doc.put("eventTime", data.getEventTime());
doc.put("eventType", data.getEventType());

doc.put("exitType", data.getExitType());

if (!data.getData().isEmpty()) {
doc.put("data", new Document(data.getData()));
}

return doc;
}

@Override
public Class<NodeInstanceDataEvent> getEncoderClass() {
return NodeInstanceDataEvent.class;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,23 +91,6 @@ private Document encodeData(ProcessInstanceEventBody data) {
doc.put("variables", new Document(data.getVariables()));
}

if (data.getNodeInstances() != null) {
doc.put("nodeInstances",
data.getNodeInstances().stream().map(ni -> {
Document niDoc = new Document();
niDoc.put("id", ni.getId());
niDoc.put("nodeId", ni.getNodeId());
niDoc.put("nodeDefinitionId", ni.getNodeDefinitionId());
niDoc.put("nodeName", ni.getNodeName());
niDoc.put("nodeType", ni.getNodeType());
niDoc.put("triggerTime", ni.getTriggerTime());
if (ni.getLeaveTime() != null) {
niDoc.put("leaveTime", ni.getLeaveTime());
}
return niDoc;
}).collect(Collectors.toSet()));
}

if (data.getError() != null) {
Document eDoc = new Document();
eDoc.put("errorMessage", data.getError().getErrorMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,11 @@ protected String userTasksEventsCollection() {
protected String variablesEventsCollection() {
return "testVCollection";
}

@Override
protected String nodeInstanceEventsCollection() {
return "testNICollection";
}
};

@BeforeEach
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
/*
* Copyright 2021 Red Hat, Inc. and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.kie.kogito.events.mongodb.codec;

import java.util.Date;
import java.util.HashMap;
import java.util.Map;

import org.bson.BsonReader;
import org.bson.BsonString;
import org.bson.BsonWriter;
import org.bson.Document;
import org.bson.codecs.Codec;
import org.bson.codecs.DecoderContext;
import org.bson.codecs.EncoderContext;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.kie.kogito.event.process.NodeInstanceDataEvent;
import org.kie.kogito.event.process.NodeInstanceEventBody;
import org.kie.kogito.event.process.ProcessInstanceEventBody;
import org.mockito.ArgumentCaptor;
import org.mockito.MockedStatic;

import static org.assertj.core.api.Assertions.assertThat;
import static org.kie.kogito.events.mongodb.codec.CodecUtils.ID;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

class NodeInstanceDataEventCodecTest {

private NodeInstanceDataEventCodec codec;

private NodeInstanceDataEvent event;

@BeforeEach
void setUp() {
codec = new NodeInstanceDataEventCodec();

String source = "testSource";
String kogitoAddons = "testKogitoAddons";

Map<String, Object> metaData = new HashMap<>();
metaData.put(ProcessInstanceEventBody.ID_META_DATA, "testKogitoProcessInstanceId");
metaData.put(ProcessInstanceEventBody.VERSION_META_DATA, "testKogitoProcessInstanceVersion");
metaData.put(ProcessInstanceEventBody.ROOT_ID_META_DATA, "testKogitoRootProcessInstanceId");
metaData.put(ProcessInstanceEventBody.PROCESS_ID_META_DATA, "testKogitoProcessId");
metaData.put(ProcessInstanceEventBody.PROCESS_TYPE_META_DATA, "testKogitoProcessType");
metaData.put(ProcessInstanceEventBody.ROOT_PROCESS_ID_META_DATA, "testKogitoRootProcessId");
metaData.put(ProcessInstanceEventBody.PARENT_ID_META_DATA, "testKogitoParentProcessInstanceId");
metaData.put(ProcessInstanceEventBody.STATE_META_DATA, "testKogitoProcessInstanceState");

NodeInstanceEventBody body = NodeInstanceEventBody.create()
.id("testId")
.processInstanceId("testProcessInstanceId")
.connectionNodeInstanceId("connectionNodeInstanceId")
.eventTime(new Date())
.eventType(1)
.exitType(3)
.data("test", 2)
.nodeDefinitionId("testNodeDefinitionId")
.nodeId("testNodeId")
.nodeName("testNodeName")
.nodeType("testNodeType")
.build();

event = new NodeInstanceDataEvent(source, kogitoAddons, metaData, body);
}

@Test
void generateIdIfAbsentFromDocument() {
assertThat(codec.generateIdIfAbsentFromDocument(event)).isEqualTo(event);
}

@Test
void documentHasId() {
assertThat(codec.documentHasId(event)).isTrue();
}

@Test
void getDocumentId() {
assertThat(codec.getDocumentId(event)).isEqualTo(new BsonString(event.getId()));
}

@Test
void decode() {
assertThat(codec.decode(mock(BsonReader.class), DecoderContext.builder().build())).isNull();
}

@Test
void encode() {
try (MockedStatic<CodecUtils> codecUtils = mockStatic(CodecUtils.class)) {
Codec<Document> mockCodec = mock(Codec.class);
codecUtils.when(CodecUtils::codec).thenReturn(mockCodec);
codecUtils.when(() -> CodecUtils.encodeDataEvent(any(), any())).thenCallRealMethod();
BsonWriter writer = mock(BsonWriter.class);
EncoderContext context = EncoderContext.builder().build();

codec.encode(writer, event, context);

ArgumentCaptor<Document> captor = ArgumentCaptor.forClass(Document.class);
verify(mockCodec, times(1)).encode(eq(writer), captor.capture(), eq(context));
Document doc = captor.getValue();

assertThat(doc).containsEntry(ID, event.getId())
.containsEntry("specversion", event.getSpecVersion().toString())
.containsEntry("source", event.getSource().toString())
.containsEntry("type", event.getType())
.containsEntry("time", event.getTime())
.containsEntry("subject", event.getSubject())
.containsEntry("dataContentType", event.getDataContentType())
.containsEntry("dataSchema", event.getDataSchema())
.containsEntry("kogitoProcessinstanceId", event.getKogitoProcessInstanceId())
.containsEntry("kogitoProcessInstanceVersion", event.getKogitoProcessInstanceVersion())
.containsEntry("kogitoRootProcessinstanceId", event.getKogitoRootProcessInstanceId())
.containsEntry("kogitoProcessId", event.getKogitoProcessId())
.containsEntry("kogitoProcessType", event.getKogitoProcessType())
.containsEntry("kogitoRootProcessId", event.getKogitoRootProcessId())
.containsEntry("kogitoAddons", event.getKogitoAddons())
.containsEntry("kogitoParentProcessinstanceId", event.getKogitoParentProcessInstanceId())
.containsEntry("kogitoProcessinstanceState", event.getKogitoProcessInstanceState())
.containsEntry("kogitoReferenceId", event.getKogitoReferenceId())
.containsEntry("kogitoStartFromNode", event.getKogitoStartFromNode());

assertThat(((Document) doc.get("data"))).containsEntry("id", event.getData().getId())
.containsEntry("processInstanceId", event.getData().getProcessInstanceId())
.containsEntry("connectionNodeInstanceId", event.getData().getConnectionNodeInstanceId())
.containsEntry("id", event.getData().getId())
.containsEntry("nodeId", event.getData().getNodeId())
.containsEntry("nodeDefinitionId", event.getData().getNodeDefinitionId())
.containsEntry("nodeName", event.getData().getNodeName())
.containsEntry("nodeType", event.getData().getNodeType())
.containsEntry("eventTime", event.getData().getEventTime())
.containsEntry("eventType", event.getData().getEventType())
.containsEntry("exitType", event.getData().getExitType());
}
}

@Test
void getEncoderClass() {
assertThat(codec.getEncoderClass()).isEqualTo(NodeInstanceDataEvent.class);
}
}
Loading

0 comments on commit 42f53fd

Please sign in to comment.