Skip to content

Commit

Permalink
fix 3
Browse files Browse the repository at this point in the history
  • Loading branch information
elguardian committed Sep 12, 2023
1 parent 3c0f241 commit faa8e16
Show file tree
Hide file tree
Showing 21 changed files with 102 additions and 196 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,8 @@
import org.bson.codecs.pojo.PojoCodecProvider;
import org.kie.kogito.event.DataEvent;
import org.kie.kogito.event.EventPublisher;
import org.kie.kogito.event.process.ProcessInstanceNodeDataEvent;
import org.kie.kogito.event.process.ProcessInstanceStateDataEvent;
import org.kie.kogito.event.process.ProcessInstanceVariableDataEvent;
import org.kie.kogito.event.process.ProcessInstanceDataEvent;
import org.kie.kogito.event.usertask.UserTaskInstanceDataEvent;
import org.kie.kogito.event.usertask.UserTaskInstanceStateDataEvent;
import org.kie.kogito.events.mongodb.codec.EventMongoDBCodecProvider;
import org.kie.kogito.mongodb.transaction.AbstractTransactionManager;
Expand All @@ -48,10 +47,8 @@ public abstract class MongoDBEventPublisher implements EventPublisher {

static final String ID = "_id";

private MongoCollection<ProcessInstanceStateDataEvent> processInstanceDataEventCollection;
private MongoCollection<ProcessInstanceNodeDataEvent> nodeInstanceDataEventCollection;
private MongoCollection<UserTaskInstanceStateDataEvent> userTaskInstanceDataEventCollection;
private MongoCollection<ProcessInstanceVariableDataEvent> variableInstanceDataEventCollection;
private MongoCollection<ProcessInstanceDataEvent> processInstanceDataEventCollection;
private MongoCollection<UserTaskInstanceDataEvent> userTaskInstanceDataEventCollection;

protected abstract MongoClient mongoClient();

Expand All @@ -61,51 +58,42 @@ public abstract class MongoDBEventPublisher implements EventPublisher {

protected abstract boolean userTasksEvents();

protected abstract boolean variablesEvents();

protected abstract String eventsDatabaseName();

protected abstract String nodeInstanceEventsCollection();

protected abstract String processInstancesEventsCollection();

protected abstract String userTasksEventsCollection();

protected abstract String variablesEventsCollection();

protected void configure() {
CodecRegistry registry = CodecRegistries.fromRegistries(MongoClientSettings.getDefaultCodecRegistry(), fromProviders(new EventMongoDBCodecProvider(),
PojoCodecProvider.builder().automatic(true).build()));
MongoDatabase mongoDatabase = mongoClient().getDatabase(eventsDatabaseName()).withCodecRegistry(registry);
processInstanceDataEventCollection = mongoDatabase.getCollection(processInstancesEventsCollection(), ProcessInstanceStateDataEvent.class).withCodecRegistry(registry);
userTaskInstanceDataEventCollection = mongoDatabase.getCollection(userTasksEventsCollection(), UserTaskInstanceStateDataEvent.class).withCodecRegistry(registry);
variableInstanceDataEventCollection = mongoDatabase.getCollection(variablesEventsCollection(), ProcessInstanceVariableDataEvent.class).withCodecRegistry(registry);
nodeInstanceDataEventCollection = mongoDatabase.getCollection(nodeInstanceEventsCollection(), ProcessInstanceNodeDataEvent.class).withCodecRegistry(registry);
processInstanceDataEventCollection = mongoDatabase.getCollection(processInstancesEventsCollection(), ProcessInstanceDataEvent.class).withCodecRegistry(registry);
userTaskInstanceDataEventCollection = mongoDatabase.getCollection(userTasksEventsCollection(), UserTaskInstanceDataEvent.class).withCodecRegistry(registry);
}

@Override
public void publish(DataEvent<?> event) {
switch (event.getType()) {
case "ProcessInstanceEvent":
if (this.processInstancesEvents()) {
publishEvent(processInstanceDataEventCollection, (ProcessInstanceStateDataEvent) event);
}
break;
case "NodeInstanceEvent":
case "ProcessInstanceErrorDataEvent":
case "ProcessInstanceNodeDataEvent":
case "ProcessInstanceSLADataEvent":
case "ProcessInstanceStateDataEvent":
case "ProcessInstanceVariableDataEvent":
if (this.processInstancesEvents()) {
publishEvent(nodeInstanceDataEventCollection, (ProcessInstanceNodeDataEvent) event);
publishEvent(processInstanceDataEventCollection, (ProcessInstanceDataEvent<?>) event);
}
break;
case "UserTaskInstanceEvent":
case "UserTaskInstanceAssignmentDataEvent":
case "UserTaskInstanceAttachmentDataEvent":
case "UserTaskInstanceCommentDataEvent":
case "UserTaskInstanceDeadlineDataEvent":
case "UserTaskInstanceStateDataEvent":
case "UserTaskInstanceVariableDataEvent":
if (this.userTasksEvents()) {
publishEvent(userTaskInstanceDataEventCollection, (UserTaskInstanceStateDataEvent) event);
}
break;
case "VariableInstanceEvent":
if (this.variablesEvents()) {
publishEvent(variableInstanceDataEventCollection, (ProcessInstanceVariableDataEvent) event);
}
break;
default:
logger.warn("Unknown type of event '{}', ignoring", event.getType());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@

import static org.kie.kogito.events.mongodb.MongoDBEventPublisher.ID;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.eq;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
Expand Down Expand Up @@ -85,11 +85,6 @@ protected boolean userTasksEvents() {
return true;
}

@Override
protected boolean variablesEvents() {
return true;
}

@Override
protected String eventsDatabaseName() {
return "testDB";
Expand All @@ -105,15 +100,6 @@ protected String userTasksEventsCollection() {
return "testTECollection";
}

@Override
protected String variablesEventsCollection() {
return "testVCollection";
}

@Override
protected String nodeInstanceEventsCollection() {
return "testNICollection";
}
};

@BeforeEach
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ public Map<String, Object> metaData() {
metadata.put(ProcessInstanceEventMetadata.ROOT_PROCESS_INSTANCE_ID_META_DATA, rootProcessInstanceId);
metadata.put(ProcessInstanceEventMetadata.PROCESS_ID_META_DATA, processId);
metadata.put(ProcessInstanceEventMetadata.PROCESS_TYPE_META_DATA, processType);
metadata.put(ProcessInstanceEventMetadata.ROOT_PROCESS_INSTANCE_ID_META_DATA, rootProcessId);
metadata.put(ProcessInstanceEventMetadata.ROOT_PROCESS_ID_META_DATA, rootProcessId);
metadata.put(ProcessInstanceEventMetadata.PROCESS_INSTANCE_STATE_META_DATA, String.valueOf(state));
return metadata;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,11 @@ public Builder eventType(String eventType) {
return this;
}

public Builder eventUser(String eventUser) {
this.instance.eventUser = eventUser;
return this;
}

public UserTaskInstanceVariableEventBody build() {
return this.instance;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,13 @@
import org.kie.kogito.event.EventBatch;
import org.kie.kogito.event.EventManager;
import org.kie.kogito.event.EventPublisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BaseEventManager implements EventManager {

private static final Logger logger = LoggerFactory.getLogger(BaseEventManager.class);

private String service;
private Addons addons;
private Set<EventPublisher> publishers = new LinkedHashSet<>();
Expand All @@ -47,6 +51,10 @@ public void publish(EventBatch batch) {
}
Collection<DataEvent<?>> events = batch.events();

for (DataEvent<?> event : events) {
logger.info("publishing {}", event);
}

publishers.forEach(p -> p.publish(events));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,8 @@ private void handleUserTaskCommentEvent(UserTaskCommentEvent event) {
if (event.getNewComment() != null) {
builder.commentContent(event.getNewComment().getCommentContent())
.commentId(event.getNewComment().getCommentId());
} else if (event.getOldComment() != null) {
builder.commentId(event.getOldComment().getCommentId());
}

UserTaskInstanceCommentEventBody body = builder.build();
Expand All @@ -315,6 +317,8 @@ private void handleUserTaskAttachmentEvent(UserTaskAttachmentEvent event) {
builder.attachmentId(event.getNewAttachment().getAttachmentId())
.attachmentName(event.getNewAttachment().getAttachmentName())
.attachmentURI(event.getNewAttachment().getAttachmentURI());
} else if (event.getOldAttachment() != null) {
builder.attachmentId(event.getOldAttachment().getAttachmentId());
}

UserTaskInstanceAttachmentEventBody body = builder.build();
Expand Down Expand Up @@ -381,7 +385,15 @@ private void handleUserTaskVariableEvent(UserTaskVariableEvent event) {
metadata.putAll(buildProcessMetadata((KogitoWorkflowProcessInstance) event.getProcessInstance()));

UserTaskInstanceVariableEventBody.Builder builder = UserTaskInstanceVariableEventBody.create()
.eventDate(new Date());
.eventDate(new Date())
.eventUser(event.getEventUser())
.userTaskDefinitionId(event.getUserTaskDefinitionId())
.userTaskInstanceId(((HumanTaskWorkItem) event.getWorkItem()).getStringId())
.userTaskName(((HumanTaskWorkItem) event.getWorkItem()).getTaskName())
.variableId(event.getVariableName())
.variableName(event.getVariableName())
.variableValue(event.getNewValue())
.variableType(event.getVariableType().name());

UserTaskInstanceVariableEventBody body = builder.build();
processedEvents.add(new UserTaskInstanceVariableDataEvent(buildSource(event.getProcessInstance().getProcessId()), addons.toString(), event.getEventUser(), metadata, body));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public ProcessInstanceStateDataEvent(String source, String addons, String identi
(String) metaData.get(ProcessInstanceEventMetadata.PARENT_PROCESS_INSTANCE_ID_META_DATA),
(String) metaData.get(ProcessInstanceEventMetadata.ROOT_PROCESS_INSTANCE_ID_META_DATA),
(String) metaData.get(ProcessInstanceEventMetadata.PROCESS_ID_META_DATA),
(String) metaData.get(ProcessInstanceEventMetadata.ROOT_PROCESS_INSTANCE_ID_META_DATA),
(String) metaData.get(ProcessInstanceEventMetadata.ROOT_PROCESS_ID_META_DATA),
(String) metaData.get(ProcessInstanceEventMetadata.PROCESS_INSTANCE_STATE_META_DATA),
addons,
(String) metaData.get(ProcessInstanceEventMetadata.PROCESS_TYPE_META_DATA),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,7 @@ public void setupParameters(ProcessInstance processInstance, Map<String, Object>
for (Map.Entry<String, Object> entry : parameters.entrySet()) {
if (entry.getValue() != null) {
variableScope.validateVariable(process.getName(), entry.getKey(), entry.getValue());
//Use internalSetVariable in order to avoid publishing variable change events
variableScopeInstance.internalSetVariable(entry.getKey(), entry.getValue());
variableScopeInstance.setVariable(entry.getKey(), entry.getValue());
}
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,13 @@

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

import org.drools.core.common.InternalKnowledgeRuntime;
import org.jbpm.process.core.context.variable.Variable;
import org.jbpm.process.core.context.variable.VariableScope;
import org.jbpm.process.core.datatype.impl.coverter.CloneHelper;
import org.jbpm.process.instance.ContextInstanceContainer;
import org.jbpm.process.instance.InternalProcessRuntime;
import org.jbpm.process.instance.ProcessInstance;
import org.jbpm.process.instance.context.AbstractContextInstance;
import org.jbpm.workflow.core.Node;
import org.jbpm.workflow.instance.node.CompositeContextNodeInstance;
Expand Down Expand Up @@ -92,42 +88,39 @@ public void setVariable(String name, Object value) {
public void setVariable(KogitoNodeInstance nodeInstance, String name, Object value) {
Objects.requireNonNull(name, "The name of a variable may not be null!");
// check if variable that is being set is readonly and has already been set
VariableScope scope = getVariableScope();
ProcessInstance pi = getProcessInstance();
Object oldValue = getVariable(name);
if (oldValue != null && scope.isReadOnly(name)) {
throw new VariableViolationException(pi.getStringId(), name, "Variable '" + name + "' is already set and is marked as read only");
if (oldValue != null && getVariableScope().isReadOnly(name)) {
throw new VariableViolationException(getProcessInstance().getStringId(), name, "Variable '" + name + "' is already set and is marked as read only");
}
// ignore similar value
if (ignoreChange(oldValue, value)) {
return;
}
List<String> tags = scope.tags(name);
Object newValue = value;
InternalKnowledgeRuntime runtime = pi.getKnowledgeRuntime();

if (runtime != null) {
if (!tags.contains(Variable.INTERNAL_TAG)) {
newValue = CloneHelper.get().clone(value);
}
final Object clonedValue = getProcessInstance().getKnowledgeRuntime() != null ? clone(name, value) : null;
if (clonedValue != null) {
getProcessEventSupport().fireBeforeVariableChanged(
(variableIdPrefix == null ? "" : variableIdPrefix + ":") + name,
(variableInstanceIdPrefix == null ? "" : variableInstanceIdPrefix + ":") + name,
oldValue, newValue, tags, pi,
oldValue, clonedValue, getVariableScope().tags(name), getProcessInstance(),
nodeInstance,
runtime);
getProcessInstance().getKnowledgeRuntime());
}
internalSetVariable(name, value);
if (runtime != null) {
if (clonedValue != null) {
getProcessEventSupport().fireAfterVariableChanged(
(variableIdPrefix == null ? "" : variableIdPrefix + ":") + name,
(variableInstanceIdPrefix == null ? "" : variableInstanceIdPrefix + ":") + name,
oldValue, newValue, tags, pi,
oldValue, clonedValue, getVariableScope().tags(name), getProcessInstance(),
nodeInstance,
runtime);
getProcessInstance().getKnowledgeRuntime());
}
}

private Object clone(String name, Object newValue) {
Variable variable = getVariableScope().findVariable(name);
return variable != null ? variable.getType().clone(newValue) : newValue;
}

private boolean ignoreChange(Object oldValue, Object newValue) {
if (newValue instanceof KogitoObjectListenerAware) {
return Objects.equals(oldValue, newValue) || (oldValue == null && ((KogitoObjectListenerAware) newValue).isEmpty());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,18 +56,10 @@ public class QuarkusMongoDBEventPublisher extends MongoDBEventPublisher {
@ConfigProperty(name = "kogito.events.processinstances.collection", defaultValue = "kogitoprocessinstancesevents")
String quarkusProcessInstancesEventsCollectionName;

@Inject
@ConfigProperty(name = "kogito.events.nodeinstances.collection", defaultValue = "kogitonodeinstancesevents")
String quarkusNodeInstancesEventsCollectionName;

@Inject
@ConfigProperty(name = "kogito.events.usertasks.collection", defaultValue = "kogitousertaskinstancesevents")
String quarkusUserTasksEventsCollectionName;

@Inject
@ConfigProperty(name = "kogito.events.variables.collection", defaultValue = "kogitovariablesevents")
String quarkusVariablesEventsCollectionName;

@PostConstruct
public void setupQuarkusMongoDBEventPublisher() {
super.configure();
Expand All @@ -93,11 +85,6 @@ protected boolean userTasksEvents() {
return this.quarkusEnableUserTasksEvents;
}

@Override
protected boolean variablesEvents() {
return this.quarkusEnableVariablesEvents;
}

@Override
protected String eventsDatabaseName() {
return this.quarkusEventsDatabaseName;
Expand All @@ -112,14 +99,4 @@ protected String processInstancesEventsCollection() {
protected String userTasksEventsCollection() {
return this.quarkusUserTasksEventsCollectionName;
}

@Override
protected String variablesEventsCollection() {
return this.quarkusVariablesEventsCollectionName;
}

@Override
protected String nodeInstanceEventsCollection() {
return this.quarkusNodeInstancesEventsCollectionName;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,6 @@ void userTasksEvents() {
assertFalse(publisher.userTasksEvents());
}

@Test
void variablesEvents() {
assertFalse(publisher.variablesEvents());
}

@Test
void eventsDatabaseName() {
assertEquals("testDB", publisher.eventsDatabaseName());
Expand All @@ -92,8 +87,4 @@ void userTasksEventsCollection() {
assertEquals("testUTCollection", publisher.userTasksEventsCollection());
}

@Test
void variablesEventsCollection() {
assertEquals("testVCollection", publisher.variablesEventsCollection());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,15 +80,23 @@ public void init() {

@Override
public void publish(DataEvent<?> event) {

switch (event.getType()) {
case "ProcessInstanceVariableDataEvent":
case "ProcessInstanceErrorDataEvent":
case "ProcessInstanceNodeDataEvent":
case "ProcessInstanceSLADataEvent":
case "ProcessInstanceStateDataEvent":
case "ProcessInstanceVariableDataEvent":
if (processInstancesEvents.orElse(true)) {
publishToTopic(event, processInstancesEventsEmitter, PI_TOPIC_NAME);
}
break;
case "UserTaskInstanceAssignmentDataEvent":
case "UserTaskInstanceAttachmentDataEvent":
case "UserTaskInstanceCommentDataEvent":
case "UserTaskInstanceDeadlineDataEvent":
case "UserTaskInstanceStateDataEvent":
case "UserTaskInstanceVariableDataEvent":
if (userTasksEvents.orElse(true)) {
publishToTopic(event, userTasksEventsEmitter, UI_TOPIC_NAME);
}
Expand Down
Loading

0 comments on commit faa8e16

Please sign in to comment.