Skip to content

Commit

Permalink
[kie-issues-249] Data index improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
elguardian committed Aug 28, 2023
1 parent 6477b03 commit b50a57b
Show file tree
Hide file tree
Showing 129 changed files with 4,851 additions and 2,986 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,16 @@
package org.kie.kogito.events.mongodb;

import java.util.Collection;
import java.util.function.BooleanSupplier;

import org.bson.codecs.configuration.CodecRegistries;
import org.bson.codecs.configuration.CodecRegistry;
import org.bson.codecs.pojo.PojoCodecProvider;
import org.kie.kogito.event.DataEvent;
import org.kie.kogito.event.EventPublisher;
import org.kie.kogito.event.process.ProcessInstanceDataEvent;
import org.kie.kogito.event.process.UserTaskInstanceDataEvent;
import org.kie.kogito.event.process.VariableInstanceDataEvent;
import org.kie.kogito.event.process.ProcessInstanceNodeDataEvent;
import org.kie.kogito.event.process.ProcessInstanceStateDataEvent;
import org.kie.kogito.event.process.ProcessInstanceVariableDataEvent;
import org.kie.kogito.event.usertask.UserTaskInstanceStateDataEvent;
import org.kie.kogito.events.mongodb.codec.EventMongoDBCodecProvider;
import org.kie.kogito.mongodb.transaction.AbstractTransactionManager;
import org.slf4j.Logger;
Expand All @@ -46,9 +46,10 @@ public abstract class MongoDBEventPublisher implements EventPublisher {

static final String ID = "_id";

private MongoCollection<ProcessInstanceDataEvent> processInstanceDataEventCollection;
private MongoCollection<UserTaskInstanceDataEvent> userTaskInstanceDataEventCollection;
private MongoCollection<VariableInstanceDataEvent> variableInstanceDataEventCollection;
private MongoCollection<ProcessInstanceStateDataEvent> processInstanceDataEventCollection;
private MongoCollection<ProcessInstanceNodeDataEvent> nodeInstanceDataEventCollection;
private MongoCollection<UserTaskInstanceStateDataEvent> userTaskInstanceDataEventCollection;
private MongoCollection<ProcessInstanceVariableDataEvent> variableInstanceDataEventCollection;

protected abstract MongoClient mongoClient();

Expand All @@ -62,6 +63,8 @@ public abstract class MongoDBEventPublisher implements EventPublisher {

protected abstract String eventsDatabaseName();

protected abstract String nodeInstanceEventsCollection();

protected abstract String processInstancesEventsCollection();

protected abstract String userTasksEventsCollection();
Expand All @@ -72,40 +75,51 @@ protected void configure() {
CodecRegistry registry = CodecRegistries.fromRegistries(MongoClientSettings.getDefaultCodecRegistry(), fromProviders(new EventMongoDBCodecProvider(),
PojoCodecProvider.builder().automatic(true).build()));
MongoDatabase mongoDatabase = mongoClient().getDatabase(eventsDatabaseName()).withCodecRegistry(registry);
processInstanceDataEventCollection = mongoDatabase.getCollection(processInstancesEventsCollection(), ProcessInstanceDataEvent.class).withCodecRegistry(registry);
userTaskInstanceDataEventCollection = mongoDatabase.getCollection(userTasksEventsCollection(), UserTaskInstanceDataEvent.class).withCodecRegistry(registry);
variableInstanceDataEventCollection = mongoDatabase.getCollection(variablesEventsCollection(), VariableInstanceDataEvent.class).withCodecRegistry(registry);
processInstanceDataEventCollection = mongoDatabase.getCollection(processInstancesEventsCollection(), ProcessInstanceStateDataEvent.class).withCodecRegistry(registry);
userTaskInstanceDataEventCollection = mongoDatabase.getCollection(userTasksEventsCollection(), UserTaskInstanceStateDataEvent.class).withCodecRegistry(registry);
variableInstanceDataEventCollection = mongoDatabase.getCollection(variablesEventsCollection(), ProcessInstanceVariableDataEvent.class).withCodecRegistry(registry);
nodeInstanceDataEventCollection = mongoDatabase.getCollection(nodeInstanceEventsCollection(), ProcessInstanceNodeDataEvent.class).withCodecRegistry(registry);
}

@Override
public void publish(DataEvent<?> event) {
switch (event.getType()) {
case "ProcessInstanceEvent":
publishEvent(processInstanceDataEventCollection, (ProcessInstanceDataEvent) event, this::processInstancesEvents);
if (this.processInstancesEvents()) {
publishEvent(processInstanceDataEventCollection, (ProcessInstanceStateDataEvent) event);
}
break;
case "NodeInstanceEvent":
if (this.processInstancesEvents()) {
publishEvent(nodeInstanceDataEventCollection, (ProcessInstanceNodeDataEvent) event);
}
break;
case "UserTaskInstanceEvent":
publishEvent(userTaskInstanceDataEventCollection, (UserTaskInstanceDataEvent) event, this::userTasksEvents);
if (this.userTasksEvents()) {
publishEvent(userTaskInstanceDataEventCollection, (UserTaskInstanceStateDataEvent) event);
}
break;
case "VariableInstanceEvent":
publishEvent(variableInstanceDataEventCollection, (VariableInstanceDataEvent) event, this::variablesEvents);
if (this.variablesEvents()) {
publishEvent(variableInstanceDataEventCollection, (ProcessInstanceVariableDataEvent) event);
}
break;
default:
logger.warn("Unknown type of event '{}', ignoring", event.getType());
}
}

private <T extends DataEvent<?>> void publishEvent(MongoCollection<T> collection, T event, BooleanSupplier enabled) {
if (enabled.getAsBoolean()) {
if (transactionManager().enabled()) {
collection.insertOne(transactionManager().getClientSession(), event);
// delete the event immediately from the outbox collection
collection.deleteOne(transactionManager().getClientSession(), Filters.eq(ID, event.getId()));
} else {
collection.insertOne(event);
// delete the event from the outbox collection
collection.deleteOne(Filters.eq(ID, event.getId()));
}
private <T extends DataEvent<?>> void publishEvent(MongoCollection<T> collection, T event) {
if (transactionManager().enabled()) {
collection.insertOne(transactionManager().getClientSession(), event);
// delete the event immediately from the outbox collection
collection.deleteOne(transactionManager().getClientSession(), Filters.eq(ID, event.getId()));
} else {
collection.insertOne(event);
// delete the event from the outbox collection
collection.deleteOne(Filters.eq(ID, event.getId()));
}

}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@
import org.bson.codecs.Codec;
import org.bson.codecs.configuration.CodecProvider;
import org.bson.codecs.configuration.CodecRegistry;
import org.kie.kogito.event.process.ProcessInstanceDataEvent;
import org.kie.kogito.event.process.UserTaskInstanceDataEvent;
import org.kie.kogito.event.process.VariableInstanceDataEvent;
import org.kie.kogito.event.process.ProcessInstanceStateDataEvent;
import org.kie.kogito.event.process.ProcessInstanceVariableDataEvent;
import org.kie.kogito.event.usertask.UserTaskInstanceStateDataEvent;

public class EventMongoDBCodecProvider implements CodecProvider {

Expand All @@ -32,13 +32,13 @@ public class EventMongoDBCodecProvider implements CodecProvider {
@SuppressWarnings("unchecked")
@Override
public <T> Codec<T> get(Class<T> aClass, CodecRegistry codecRegistry) {
if (aClass == ProcessInstanceDataEvent.class) {
if (aClass == ProcessInstanceStateDataEvent.class) {
return (Codec<T>) PROCESS_INSTANCE_DATA_EVENT_CODEC;
}
if (aClass == UserTaskInstanceDataEvent.class) {
if (aClass == UserTaskInstanceStateDataEvent.class) {
return (Codec<T>) USER_TASK_INSTANCE_DATA_EVENT_CODEC;
}
if (aClass == VariableInstanceDataEvent.class) {
if (aClass == ProcessInstanceVariableDataEvent.class) {
return (Codec<T>) VARIABLE_INSTANCE_DATA_EVENT_CODEC;
}
return null;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Copyright 2021 Red Hat, Inc. and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.kie.kogito.events.mongodb.codec;

import org.bson.BsonReader;
import org.bson.BsonString;
import org.bson.BsonValue;
import org.bson.BsonWriter;
import org.bson.Document;
import org.bson.codecs.CollectibleCodec;
import org.bson.codecs.DecoderContext;
import org.bson.codecs.EncoderContext;
import org.kie.kogito.event.process.ProcessInstanceNodeDataEvent;
import org.kie.kogito.event.process.ProcessInstanceNodeEventBody;

import static org.kie.kogito.events.mongodb.codec.CodecUtils.codec;
import static org.kie.kogito.events.mongodb.codec.CodecUtils.encodeDataEvent;

public class NodeInstanceDataEventCodec implements CollectibleCodec<ProcessInstanceNodeDataEvent> {

@Override
public ProcessInstanceNodeDataEvent generateIdIfAbsentFromDocument(ProcessInstanceNodeDataEvent nodeInstanceDataEvent) {
return nodeInstanceDataEvent;
}

@Override
public boolean documentHasId(ProcessInstanceNodeDataEvent processInstanceDataEvent) {
return processInstanceDataEvent.getId() != null;
}

@Override
public BsonValue getDocumentId(ProcessInstanceNodeDataEvent processInstanceDataEvent) {
return new BsonString(processInstanceDataEvent.getId());
}

@Override
public ProcessInstanceNodeDataEvent decode(BsonReader bsonReader, DecoderContext decoderContext) {
// The events persist in an outbox collection
// The events are deleted immediately (in the same transaction)
// "decode" is not supposed to take place in any scenario
return null;
}

@Override
public void encode(BsonWriter bsonWriter, ProcessInstanceNodeDataEvent nodeInstanceDataEvent, EncoderContext encoderContext) {
Document doc = new Document();
encodeDataEvent(doc, nodeInstanceDataEvent);
doc.put("kogitoProcessType", nodeInstanceDataEvent.getKogitoProcessType());
doc.put("kogitoProcessInstanceVersion", nodeInstanceDataEvent.getKogitoProcessInstanceVersion());
doc.put("kogitoParentProcessinstanceId", nodeInstanceDataEvent.getKogitoParentProcessInstanceId());
doc.put("kogitoProcessinstanceState", nodeInstanceDataEvent.getKogitoProcessInstanceState());
doc.put("kogitoReferenceId", nodeInstanceDataEvent.getKogitoReferenceId());
doc.put("kogitoStartFromNode", nodeInstanceDataEvent.getKogitoStartFromNode());
doc.put("data", encodeData(nodeInstanceDataEvent.getData()));
codec().encode(bsonWriter, doc, encoderContext);
}

private Document encodeData(ProcessInstanceNodeEventBody data) {

Document doc = new Document();
doc.put("processInstanceId", data.getProcessInstanceId());
doc.put("connectionNodeInstanceId", data.getConnectionNodeInstanceId());
doc.put("id", data.getProcessInstanceId());
doc.put("nodeId", data.getNodeDefinitionId());
doc.put("nodeDefinitionId", data.getNodeDefinitionId());
doc.put("nodeName", data.getNodeName());
doc.put("nodeType", data.getNodeType());
doc.put("eventTime", data.getEventDate());
doc.put("eventType", data.getEventType());

if (!data.getData().isEmpty()) {
doc.put("data", new Document(data.getData()));
}

return doc;
}

@Override
public Class<ProcessInstanceNodeDataEvent> getEncoderClass() {
return ProcessInstanceNodeDataEvent.class;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@

package org.kie.kogito.events.mongodb.codec;

import java.util.stream.Collectors;

import org.bson.BsonReader;
import org.bson.BsonString;
import org.bson.BsonValue;
Expand All @@ -26,39 +24,39 @@
import org.bson.codecs.CollectibleCodec;
import org.bson.codecs.DecoderContext;
import org.bson.codecs.EncoderContext;
import org.kie.kogito.event.process.ProcessInstanceDataEvent;
import org.kie.kogito.event.process.ProcessInstanceEventBody;
import org.kie.kogito.event.process.ProcessInstanceStateDataEvent;
import org.kie.kogito.event.process.ProcessInstanceStateEventBody;

import static org.kie.kogito.events.mongodb.codec.CodecUtils.codec;
import static org.kie.kogito.events.mongodb.codec.CodecUtils.encodeDataEvent;

public class ProcessInstanceDataEventCodec implements CollectibleCodec<ProcessInstanceDataEvent> {
public class ProcessInstanceDataEventCodec implements CollectibleCodec<ProcessInstanceStateDataEvent> {

@Override
public ProcessInstanceDataEvent generateIdIfAbsentFromDocument(ProcessInstanceDataEvent processInstanceDataEvent) {
public ProcessInstanceStateDataEvent generateIdIfAbsentFromDocument(ProcessInstanceStateDataEvent processInstanceDataEvent) {
return processInstanceDataEvent;
}

@Override
public boolean documentHasId(ProcessInstanceDataEvent processInstanceDataEvent) {
public boolean documentHasId(ProcessInstanceStateDataEvent processInstanceDataEvent) {
return processInstanceDataEvent.getId() != null;
}

@Override
public BsonValue getDocumentId(ProcessInstanceDataEvent processInstanceDataEvent) {
public BsonValue getDocumentId(ProcessInstanceStateDataEvent processInstanceDataEvent) {
return new BsonString(processInstanceDataEvent.getId());
}

@Override
public ProcessInstanceDataEvent decode(BsonReader bsonReader, DecoderContext decoderContext) {
public ProcessInstanceStateDataEvent decode(BsonReader bsonReader, DecoderContext decoderContext) {
// The events persist in an outbox collection
// The events are deleted immediately (in the same transaction)
// "decode" is not supposed to take place in any scenario
return null;
}

@Override
public void encode(BsonWriter bsonWriter, ProcessInstanceDataEvent processInstanceDataEvent, EncoderContext encoderContext) {
public void encode(BsonWriter bsonWriter, ProcessInstanceStateDataEvent processInstanceDataEvent, EncoderContext encoderContext) {
Document doc = new Document();
encodeDataEvent(doc, processInstanceDataEvent);
doc.put("kogitoProcessType", processInstanceDataEvent.getKogitoProcessType());
Expand All @@ -72,67 +70,38 @@ public void encode(BsonWriter bsonWriter, ProcessInstanceDataEvent processInstan
codec().encode(bsonWriter, doc, encoderContext);
}

private Document encodeData(ProcessInstanceEventBody data) {
private Document encodeData(ProcessInstanceStateEventBody data) {
Document doc = new Document();
doc.put("id", data.getId());
doc.put("version", data.getVersion());
doc.put("id", data.getProcessInstanceId());
doc.put("version", data.getProcessVersion());
doc.put("parentInstanceId", data.getParentInstanceId());
doc.put("rootInstanceId", data.getRootInstanceId());
doc.put("rootInstanceId", data.getRootProcessInstanceId());
doc.put("processId", data.getProcessId());
doc.put("processType", data.getProcessType());
doc.put("rootProcessId", data.getRootProcessId());
doc.put("processName", data.getProcessName());
doc.put("startDate", data.getStartDate());
doc.put("endDate", data.getEndDate());
doc.put("startDate", data.getEventDate());
doc.put("state", data.getState());
doc.put("businessKey", data.getBusinessKey());
doc.put("roles", data.getRoles());
doc.put("identity", data.getIdentity());

if (data.getVariables() != null) {
doc.put("variables", new Document(data.getVariables()));
}

if (data.getNodeInstances() != null) {
doc.put("nodeInstances",
data.getNodeInstances().stream().map(ni -> {
Document niDoc = new Document();
niDoc.put("id", ni.getId());
niDoc.put("nodeId", ni.getNodeId());
niDoc.put("nodeDefinitionId", ni.getNodeDefinitionId());
niDoc.put("nodeName", ni.getNodeName());
niDoc.put("nodeType", ni.getNodeType());
niDoc.put("triggerTime", ni.getTriggerTime());
if (ni.getLeaveTime() != null) {
niDoc.put("leaveTime", ni.getLeaveTime());
}
return niDoc;
}).collect(Collectors.toSet()));
}

if (data.getError() != null) {
Document eDoc = new Document();
eDoc.put("errorMessage", data.getError().getErrorMessage());
eDoc.put("nodeDefinitionId", data.getError().getNodeDefinitionId());
doc.put("error", eDoc);
}

if (data.getMilestones() != null) {
doc.put("milestones",
data.getMilestones().stream().map(m -> {
Document mDoc = new Document();
mDoc.put("id", m.getId());
mDoc.put("name", m.getName());
mDoc.put("status", m.getStatus());
return mDoc;
}).collect(Collectors.toSet()));
}
// if (data.getVariables() != null) {
// doc.put("variables", new Document(data.getVariables()));
// }
//
// if (data.getError() != null) {
// Document eDoc = new Document();
// eDoc.put("errorMessage", data.getError().getErrorMessage());
// eDoc.put("nodeDefinitionId", data.getError().getNodeDefinitionId());
// doc.put("error", eDoc);
// }

return doc;
}

@Override
public Class<ProcessInstanceDataEvent> getEncoderClass() {
return ProcessInstanceDataEvent.class;
public Class<ProcessInstanceStateDataEvent> getEncoderClass() {
return ProcessInstanceStateDataEvent.class;
}
}
Loading

0 comments on commit b50a57b

Please sign in to comment.