[kie-issues-249] Data index improvements (#3241)
* [kie-issues-249] Data index improvements

* fix pojo it kafka dirty streams

* fix mongodb debezium

* fix for codecs registry

* fix for mongodb publisher
elguardian authored Oct 18, 2023
1 parent 244e3ee commit 70af322
Showing 144 changed files with 5,130 additions and 3,475 deletions. Only the four MongoDB event-store files (the publisher and its codecs) are rendered below.

org/kie/kogito/events/mongodb/MongoDBEventPublisher.java

@@ -19,16 +19,14 @@
 package org.kie.kogito.events.mongodb;
 
 import java.util.Collection;
-import java.util.function.BooleanSupplier;
 
 import org.bson.codecs.configuration.CodecRegistries;
 import org.bson.codecs.configuration.CodecRegistry;
 import org.bson.codecs.pojo.PojoCodecProvider;
 import org.kie.kogito.event.DataEvent;
 import org.kie.kogito.event.EventPublisher;
 import org.kie.kogito.event.process.ProcessInstanceDataEvent;
-import org.kie.kogito.event.process.UserTaskInstanceDataEvent;
-import org.kie.kogito.event.process.VariableInstanceDataEvent;
+import org.kie.kogito.event.usertask.UserTaskInstanceDataEvent;
 import org.kie.kogito.events.mongodb.codec.EventMongoDBCodecProvider;
 import org.kie.kogito.mongodb.transaction.AbstractTransactionManager;
 import org.slf4j.Logger;
@@ -50,7 +48,6 @@ public abstract class MongoDBEventPublisher implements EventPublisher {

     private MongoCollection<ProcessInstanceDataEvent> processInstanceDataEventCollection;
     private MongoCollection<UserTaskInstanceDataEvent> userTaskInstanceDataEventCollection;
-    private MongoCollection<VariableInstanceDataEvent> variableInstanceDataEventCollection;
 
     protected abstract MongoClient mongoClient();

@@ -60,54 +57,47 @@ public abstract class MongoDBEventPublisher implements EventPublisher {

     protected abstract boolean userTasksEvents();
 
-    protected abstract boolean variablesEvents();
-
     protected abstract String eventsDatabaseName();
 
     protected abstract String processInstancesEventsCollection();
 
     protected abstract String userTasksEventsCollection();
 
-    protected abstract String variablesEventsCollection();
-
     protected void configure() {
         CodecRegistry registry = CodecRegistries.fromRegistries(MongoClientSettings.getDefaultCodecRegistry(), fromProviders(new EventMongoDBCodecProvider(),
                 PojoCodecProvider.builder().automatic(true).build()));
         MongoDatabase mongoDatabase = mongoClient().getDatabase(eventsDatabaseName()).withCodecRegistry(registry);
         processInstanceDataEventCollection = mongoDatabase.getCollection(processInstancesEventsCollection(), ProcessInstanceDataEvent.class).withCodecRegistry(registry);
         userTaskInstanceDataEventCollection = mongoDatabase.getCollection(userTasksEventsCollection(), UserTaskInstanceDataEvent.class).withCodecRegistry(registry);
-        variableInstanceDataEventCollection = mongoDatabase.getCollection(variablesEventsCollection(), VariableInstanceDataEvent.class).withCodecRegistry(registry);
     }
 
     @Override
     public void publish(DataEvent<?> event) {
-        switch (event.getType()) {
-            case "ProcessInstanceEvent":
-                publishEvent(processInstanceDataEventCollection, (ProcessInstanceDataEvent) event, this::processInstancesEvents);
-                break;
-            case "UserTaskInstanceEvent":
-                publishEvent(userTaskInstanceDataEventCollection, (UserTaskInstanceDataEvent) event, this::userTasksEvents);
-                break;
-            case "VariableInstanceEvent":
-                publishEvent(variableInstanceDataEventCollection, (VariableInstanceDataEvent) event, this::variablesEvents);
-                break;
-            default:
-                logger.warn("Unknown type of event '{}', ignoring", event.getType());
-        }
+        if (this.processInstancesEvents() && event instanceof ProcessInstanceDataEvent) {
+            publishEvent(processInstanceDataEventCollection, (ProcessInstanceDataEvent) event);
+            return;
+        }
+
+        if (this.userTasksEvents() && event instanceof UserTaskInstanceDataEvent) {
+            publishEvent(userTaskInstanceDataEventCollection, (UserTaskInstanceDataEvent) event);
+            return;
+        }
+
+        logger.debug("Unknown type of event '{}', ignoring", event.getType());
+
     }
 
-    private <T extends DataEvent<?>> void publishEvent(MongoCollection<T> collection, T event, BooleanSupplier enabled) {
-        if (enabled.getAsBoolean()) {
-            if (transactionManager().enabled()) {
-                collection.insertOne(transactionManager().getClientSession(), event);
-                // delete the event immediately from the outbox collection
-                collection.deleteOne(transactionManager().getClientSession(), Filters.eq(ID, event.getId()));
-            } else {
-                collection.insertOne(event);
-                // delete the event from the outbox collection
-                collection.deleteOne(Filters.eq(ID, event.getId()));
-            }
-        }
-    }
+    private <T extends DataEvent<?>> void publishEvent(MongoCollection<T> collection, T event) {
+        if (transactionManager().enabled()) {
+            collection.insertOne(transactionManager().getClientSession(), event);
+            // delete the event immediately from the outbox collection
+            collection.deleteOne(transactionManager().getClientSession(), Filters.eq(ID, event.getId()));
+        } else {
+            collection.insertOne(event);
+            // delete the event from the outbox collection
+            collection.deleteOne(Filters.eq(ID, event.getId()));
+        }
+
+    }
 
     @Override
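
The change here is twofold. First, the switch on event.getType() gives way to instanceof dispatch, so events are still routed correctly now that UserTaskInstanceDataEvent lives in org.kie.kogito.event.usertask as a base type with subtypes; the per-type enabled flag also moves from publishEvent into publish, and VariableInstanceDataEvent support is dropped. Second, publishEvent keeps the outbox idiom: insert the event and delete it immediately, so the write reaches the change stream (where a change-data-capture reader such as the Debezium connector mentioned in the commit message can pick it up) while the collection itself stays empty. A minimal sketch of a concrete subclass wiring the remaining abstract methods; every name and value below is illustrative, and transactionManager() is assumed to be an abstract method of the base class, since the diff calls it without showing its declaration:

import org.kie.kogito.events.mongodb.MongoDBEventPublisher;
import org.kie.kogito.mongodb.transaction.AbstractTransactionManager;

import com.mongodb.client.MongoClient;

public class SimpleMongoDBEventPublisher extends MongoDBEventPublisher {

    private final MongoClient client;
    private final AbstractTransactionManager txManager;

    public SimpleMongoDBEventPublisher(MongoClient client, AbstractTransactionManager txManager) {
        this.client = client;
        this.txManager = txManager;
        configure(); // builds the codec registry and resolves the two event collections
    }

    @Override
    protected MongoClient mongoClient() {
        return client;
    }

    @Override
    protected AbstractTransactionManager transactionManager() {
        return txManager;
    }

    @Override
    protected boolean processInstancesEvents() {
        return true; // publish process instance events
    }

    @Override
    protected boolean userTasksEvents() {
        return true; // publish user task events
    }

    @Override
    protected String eventsDatabaseName() {
        return "kogito-events"; // illustrative name
    }

    @Override
    protected String processInstancesEventsCollection() {
        return "processinstancesevents"; // illustrative name
    }

    @Override
    protected String userTasksEventsCollection() {
        return "usertaskinstancesevents"; // illustrative name
    }
}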

org/kie/kogito/events/mongodb/codec/EventMongoDBCodecProvider.java

@@ -22,27 +22,22 @@
 import org.bson.codecs.configuration.CodecProvider;
 import org.bson.codecs.configuration.CodecRegistry;
 import org.kie.kogito.event.process.ProcessInstanceDataEvent;
-import org.kie.kogito.event.process.UserTaskInstanceDataEvent;
-import org.kie.kogito.event.process.VariableInstanceDataEvent;
+import org.kie.kogito.event.usertask.UserTaskInstanceDataEvent;
 
 public class EventMongoDBCodecProvider implements CodecProvider {
 
     private static final ProcessInstanceDataEventCodec PROCESS_INSTANCE_DATA_EVENT_CODEC = new ProcessInstanceDataEventCodec();
     private static final UserTaskInstanceDataEventCodec USER_TASK_INSTANCE_DATA_EVENT_CODEC = new UserTaskInstanceDataEventCodec();
-    private static final VariableInstanceDataEventCodec VARIABLE_INSTANCE_DATA_EVENT_CODEC = new VariableInstanceDataEventCodec();
 
     @SuppressWarnings("unchecked")
     @Override
     public <T> Codec<T> get(Class<T> aClass, CodecRegistry codecRegistry) {
-        if (aClass == ProcessInstanceDataEvent.class) {
+        if (ProcessInstanceDataEvent.class.isAssignableFrom(aClass)) {
             return (Codec<T>) PROCESS_INSTANCE_DATA_EVENT_CODEC;
         }
-        if (aClass == UserTaskInstanceDataEvent.class) {
+        if (UserTaskInstanceDataEvent.class.isAssignableFrom(aClass)) {
             return (Codec<T>) USER_TASK_INSTANCE_DATA_EVENT_CODEC;
         }
-        if (aClass == VariableInstanceDataEvent.class) {
-            return (Codec<T>) VARIABLE_INSTANCE_DATA_EVENT_CODEC;
-        }
         return null;
     }
 }
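
The codec lookup switches from identity comparison to Class.isAssignableFrom. With the user task events reworked into a UserTaskInstanceDataEvent base type with concrete subtypes (note the UserTaskInstanceDataEvent<?> signature further down), the registry is asked for a codec for the concrete subtype, and aClass == UserTaskInstanceDataEvent.class no longer matches; assignability does. The general rule, as a self-contained sketch with invented Base/Sub classes:

public class AssignabilityDemo {

    static class Base {
    }

    static class Sub extends Base {
    }

    public static void main(String[] args) {
        Class<?> requested = Sub.class; // the concrete type handed to the provider
        System.out.println(requested == Base.class); // false: identity misses subtypes
        System.out.println(Base.class.isAssignableFrom(requested)); // true: assignability matches them
    }
}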

org/kie/kogito/events/mongodb/codec/ProcessInstanceDataEventCodec.java

@@ -18,8 +18,6 @@
  */
 package org.kie.kogito.events.mongodb.codec;
 
-import java.util.stream.Collectors;
-
 import org.bson.BsonReader;
 import org.bson.BsonString;
 import org.bson.BsonValue;
@@ -29,13 +27,19 @@
 import org.bson.codecs.DecoderContext;
 import org.bson.codecs.EncoderContext;
 import org.kie.kogito.event.process.ProcessInstanceDataEvent;
-import org.kie.kogito.event.process.ProcessInstanceEventBody;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+
 import static org.kie.kogito.events.mongodb.codec.CodecUtils.codec;
 import static org.kie.kogito.events.mongodb.codec.CodecUtils.encodeDataEvent;
 
 public class ProcessInstanceDataEventCodec implements CollectibleCodec<ProcessInstanceDataEvent> {
 
+    private static final Logger LOGGER = LoggerFactory.getLogger(ProcessInstanceDataEventCodec.class);
+
     @Override
     public ProcessInstanceDataEvent generateIdIfAbsentFromDocument(ProcessInstanceDataEvent processInstanceDataEvent) {
         return processInstanceDataEvent;
@@ -61,76 +65,16 @@ public ProcessInstanceDataEvent decode(BsonReader bsonReader, DecoderContext dec

     @Override
     public void encode(BsonWriter bsonWriter, ProcessInstanceDataEvent processInstanceDataEvent, EncoderContext encoderContext) {
-        Document doc = new Document();
-        encodeDataEvent(doc, processInstanceDataEvent);
-        doc.put("kogitoProcessType", processInstanceDataEvent.getKogitoProcessType());
-        doc.put("kogitoProcessInstanceVersion", processInstanceDataEvent.getKogitoProcessInstanceVersion());
-        doc.put("kogitoParentProcessinstanceId", processInstanceDataEvent.getKogitoParentProcessInstanceId());
-        doc.put("kogitoProcessinstanceState", processInstanceDataEvent.getKogitoProcessInstanceState());
-        doc.put("kogitoReferenceId", processInstanceDataEvent.getKogitoReferenceId());
-        doc.put("kogitoStartFromNode", processInstanceDataEvent.getKogitoStartFromNode());
-        doc.put("kogitoIdentity", processInstanceDataEvent.getKogitoIdentity());
-        doc.put("data", encodeData(processInstanceDataEvent.getData()));
-        codec().encode(bsonWriter, doc, encoderContext);
-    }
-
-    private Document encodeData(ProcessInstanceEventBody data) {
-        Document doc = new Document();
-        doc.put("id", data.getId());
-        doc.put("version", data.getVersion());
-        doc.put("parentInstanceId", data.getParentInstanceId());
-        doc.put("rootInstanceId", data.getRootInstanceId());
-        doc.put("processId", data.getProcessId());
-        doc.put("processType", data.getProcessType());
-        doc.put("rootProcessId", data.getRootProcessId());
-        doc.put("processName", data.getProcessName());
-        doc.put("startDate", data.getStartDate());
-        doc.put("endDate", data.getEndDate());
-        doc.put("state", data.getState());
-        doc.put("businessKey", data.getBusinessKey());
-        doc.put("roles", data.getRoles());
-        doc.put("identity", data.getIdentity());
-
-        if (data.getVariables() != null) {
-            doc.put("variables", new Document(data.getVariables()));
-        }
-
-        if (data.getNodeInstances() != null) {
-            doc.put("nodeInstances",
-                    data.getNodeInstances().stream().map(ni -> {
-                        Document niDoc = new Document();
-                        niDoc.put("id", ni.getId());
-                        niDoc.put("nodeId", ni.getNodeId());
-                        niDoc.put("nodeDefinitionId", ni.getNodeDefinitionId());
-                        niDoc.put("nodeName", ni.getNodeName());
-                        niDoc.put("nodeType", ni.getNodeType());
-                        niDoc.put("triggerTime", ni.getTriggerTime());
-                        if (ni.getLeaveTime() != null) {
-                            niDoc.put("leaveTime", ni.getLeaveTime());
-                        }
-                        return niDoc;
-                    }).collect(Collectors.toSet()));
-        }
-
-        if (data.getError() != null) {
-            Document eDoc = new Document();
-            eDoc.put("errorMessage", data.getError().getErrorMessage());
-            eDoc.put("nodeDefinitionId", data.getError().getNodeDefinitionId());
-            doc.put("error", eDoc);
-        }
-
-        if (data.getMilestones() != null) {
-            doc.put("milestones",
-                    data.getMilestones().stream().map(m -> {
-                        Document mDoc = new Document();
-                        mDoc.put("id", m.getId());
-                        mDoc.put("name", m.getName());
-                        mDoc.put("status", m.getStatus());
-                        return mDoc;
-                    }).collect(Collectors.toSet()));
-        }
-
-        return doc;
+        try {
+            ObjectMapper mapper = new ObjectMapper();
+            mapper.findAndRegisterModules();
+            mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
+            Document document = Document.parse(mapper.writeValueAsString(processInstanceDataEvent));
+            document.put(CodecUtils.ID, processInstanceDataEvent.getId());
+            codec().encode(bsonWriter, document, encoderContext);
+        } catch (JsonProcessingException e) {
+            LOGGER.error("Could not process json event", e);
+        }
     }
 
     @Override
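
The field-by-field mapping into a Document (and the separate encodeData helper for the event body) is replaced by a single Jackson round trip: serialize the whole event to JSON, Document.parse it into BSON, and set the document id by hand. Any attribute added to the event model later is then picked up automatically instead of being silently dropped. A standalone sketch of the technique; SampleEvent is an invented stand-in for the real event class, and findAndRegisterModules() only covers java.time fields if jackson-datatype-jsr310 is on the classpath:

import java.time.OffsetDateTime;

import org.bson.Document;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;

public class JacksonToBsonSketch {

    // Invented stand-in for ProcessInstanceDataEvent.
    public static class SampleEvent {
        public String id = "evt-1";
        public String processId = "orders";
        public OffsetDateTime startDate = OffsetDateTime.now();
    }

    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        mapper.findAndRegisterModules(); // registers jackson-datatype-jsr310 for java.time, when present
        mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS); // ISO-8601 strings, not epoch numbers
        Document document = Document.parse(mapper.writeValueAsString(new SampleEvent()));
        document.put("_id", "evt-1"); // the codec does this via CodecUtils.ID
        System.out.println(document.toJson());
    }
}

One trade-off worth noting: the codec now builds a fresh ObjectMapper on every encode call; a shared static mapper would avoid that cost, since ObjectMapper is thread-safe once configured.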

org/kie/kogito/events/mongodb/codec/UserTaskInstanceDataEventCodec.java

@@ -18,8 +18,6 @@
  */
 package org.kie.kogito.events.mongodb.codec;
 
-import java.util.stream.Collectors;
-
 import org.bson.BsonReader;
 import org.bson.BsonString;
 import org.bson.BsonValue;
@@ -28,16 +26,22 @@
 import org.bson.codecs.CollectibleCodec;
 import org.bson.codecs.DecoderContext;
 import org.bson.codecs.EncoderContext;
-import org.kie.kogito.event.process.UserTaskInstanceDataEvent;
-import org.kie.kogito.event.process.UserTaskInstanceEventBody;
+import org.kie.kogito.event.usertask.UserTaskInstanceDataEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+
 import static org.kie.kogito.events.mongodb.codec.CodecUtils.codec;
 import static org.kie.kogito.events.mongodb.codec.CodecUtils.encodeDataEvent;
 
 public class UserTaskInstanceDataEventCodec implements CollectibleCodec<UserTaskInstanceDataEvent> {
 
+    private static final Logger LOGGER = LoggerFactory.getLogger(UserTaskInstanceDataEventCodec.class);
+
     @Override
-    public UserTaskInstanceDataEvent generateIdIfAbsentFromDocument(UserTaskInstanceDataEvent userTaskInstanceDataEvent) {
+    public UserTaskInstanceDataEvent<?> generateIdIfAbsentFromDocument(UserTaskInstanceDataEvent userTaskInstanceDataEvent) {
         return userTaskInstanceDataEvent;
     }

@@ -61,64 +65,16 @@ public UserTaskInstanceDataEvent decode(BsonReader bsonReader, DecoderContext de

     @Override
     public void encode(BsonWriter bsonWriter, UserTaskInstanceDataEvent userTaskInstanceDataEvent, EncoderContext encoderContext) {
-        Document doc = new Document();
-        encodeDataEvent(doc, userTaskInstanceDataEvent);
-        doc.put("kogitoUserTaskinstanceId", userTaskInstanceDataEvent.getKogitoUserTaskinstanceId());
-        doc.put("kogitoUserTaskinstanceState", userTaskInstanceDataEvent.getKogitoUserTaskinstanceState());
-        doc.put("data", encodeData(userTaskInstanceDataEvent.getData()));
-        codec().encode(bsonWriter, doc, encoderContext);
-    }
-
-    private Document encodeData(UserTaskInstanceEventBody data) {
-        Document doc = new Document();
-        doc.put("id", data.getId());
-        doc.put("taskName", data.getTaskName());
-        doc.put("taskDescription", data.getTaskDescription());
-        doc.put("taskPriority", data.getTaskPriority());
-        doc.put("referenceName", data.getReferenceName());
-        doc.put("startDate", data.getStartDate());
-        doc.put("completeDate", data.getCompleteDate());
-        doc.put("state", data.getState());
-        doc.put("actualOwner", data.getActualOwner());
-        doc.put("potentialUsers", data.getPotentialUsers());
-        doc.put("potentialGroups", data.getPotentialGroups());
-        doc.put("excludedUsers", data.getExcludedUsers());
-        doc.put("adminUsers", data.getAdminUsers());
-        doc.put("adminGroups", data.getAdminGroups());
-        doc.put("inputs", new Document(data.getInputs()));
-        doc.put("outputs", new Document(data.getOutputs()));
-        doc.put("processInstanceId", data.getProcessInstanceId());
-        doc.put("rootProcessInstanceId", data.getRootProcessInstanceId());
-        doc.put("processId", data.getProcessId());
-        doc.put("rootProcessId", data.getRootProcessId());
-        doc.put("identity", data.getIdentity());
-
-        if (data.getComments() != null) {
-            doc.put("comments",
-                    data.getComments().stream().map(c -> {
-                        Document cDoc = new Document();
-                        cDoc.put("id", c.getId());
-                        cDoc.put("content", c.getContent());
-                        cDoc.put("updatedAt", c.getUpdatedAt());
-                        cDoc.put("updatedBy", c.getUpdatedBy());
-                        return cDoc;
-                    }).collect(Collectors.toSet()));
-        }
-
-        if (data.getAttachments() != null) {
-            doc.put("attachments",
-                    data.getAttachments().stream().map(a -> {
-                        Document aDoc = new Document();
-                        aDoc.put("id", a.getId());
-                        aDoc.put("content", a.getContent());
-                        aDoc.put("updatedAt", a.getUpdatedAt());
-                        aDoc.put("updatedBy", a.getUpdatedBy());
-                        aDoc.put("name", a.getName());
-                        return aDoc;
-                    }).collect(Collectors.toSet()));
-        }
-
-        return doc;
+        try {
+            ObjectMapper mapper = new ObjectMapper();
+            mapper.findAndRegisterModules();
+            mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
+            Document document = Document.parse(mapper.writeValueAsString(userTaskInstanceDataEvent));
+            document.put(CodecUtils.ID, userTaskInstanceDataEvent.getId());
+            codec().encode(bsonWriter, document, encoderContext);
+        } catch (JsonProcessingException e) {
+            LOGGER.error("Could not process json event", e);
+        }
     }
 
     @Override
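
The user task codec mirrors the process instance codec: the long hand-written encodeData mapping, including the comments and attachments sub-documents, collapses into the same Jackson-to-Document.parse round trip, and generateIdIfAbsentFromDocument picks up a UserTaskInstanceDataEvent<?> wildcard for the new generic event type. At runtime both codecs are resolved through the registry assembled in MongoDBEventPublisher.configure(); a sketch of that lookup path, using only types shown in the diffs above:

import org.bson.codecs.Codec;
import org.bson.codecs.configuration.CodecRegistries;
import org.bson.codecs.configuration.CodecRegistry;
import org.bson.codecs.pojo.PojoCodecProvider;
import org.kie.kogito.event.process.ProcessInstanceDataEvent;
import org.kie.kogito.events.mongodb.codec.EventMongoDBCodecProvider;

import com.mongodb.MongoClientSettings;

public class RegistryLookupSketch {

    public static void main(String[] args) {
        // Same composition as MongoDBEventPublisher.configure(): the driver defaults,
        // plus the custom event codecs with an automatic POJO codec fallback.
        CodecRegistry registry = CodecRegistries.fromRegistries(
                MongoClientSettings.getDefaultCodecRegistry(),
                CodecRegistries.fromProviders(new EventMongoDBCodecProvider(),
                        PojoCodecProvider.builder().automatic(true).build()));

        // The isAssignableFrom() change means this also resolves for subtypes
        // of ProcessInstanceDataEvent, not only the exact class.
        Codec<ProcessInstanceDataEvent> codec = registry.get(ProcessInstanceDataEvent.class);
        System.out.println(codec.getClass().getSimpleName()); // ProcessInstanceDataEventCodec
    }
}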