Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PersistenceObjectStrategy option #5418

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ public ReliableNamedEntryPoint(EntryPointId entryPoint, EntryPointNode entryPoin
protected ObjectStore createObjectStore(EntryPointId entryPoint, RuleBaseConfiguration conf, ReteEvaluator reteEvaluator) {
boolean storesOnlyStrategy = reteEvaluator.getSessionConfiguration().getPersistedSessionOption().getPersistenceStrategy() == PersistedSessionOption.PersistenceStrategy.STORES_ONLY;
return storesOnlyStrategy ?
SimpleReliableObjectStoreFactory.get().createSimpleReliableObjectStore(StorageManagerFactory.get().getStorageManager().getOrCreateStorageForSession(reteEvaluator, "ep" + getEntryPointId())) :
SimpleReliableObjectStoreFactory.get().createSimpleReliableObjectStore(StorageManagerFactory.get().getStorageManager().getOrCreateStorageForSession(reteEvaluator, "ep" + getEntryPointId()),
reteEvaluator.getSessionConfiguration().getPersistedSessionOption()) :
new FullReliableObjectStore(StorageManagerFactory.get().getStorageManager().getOrCreateStorageForSession(reteEvaluator, "ep" + getEntryPointId()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

public class SerializableStoredObject extends BaseStoredObject {

private final Serializable object;
protected final Serializable object;

public SerializableStoredObject(Object object, boolean propagated) {
super(propagated);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package org.drools.reliability.core;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing Copyright header

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thank you @tkobayas, copyright header added.


import org.drools.core.common.Storage;

import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class SerializableStoredRefObject extends SerializableStoredObject {

private final Map<String, Long> referencedObjects;

public SerializableStoredRefObject(Object object, boolean propagated) {
super(object, propagated);
referencedObjects=new HashMap<>();
}

public void addReferencedObject(String fieldName, Long refObjectKey){
this.referencedObjects.put(fieldName, refObjectKey);
}

public StoredObject updateReferencedObjects(Storage<Long, StoredObject> storage){
this.referencedObjects.keySet().forEach(fieldName -> {
Optional<Field> refField = Arrays.stream(object.getClass().getDeclaredFields())
.filter(f -> f.getName().equals(fieldName)).findFirst();
if (refField.isPresent()){
refField.get().setAccessible(true);
try {
refField.get().set(this.object, storage.get(this.referencedObjects.get(refField.get().getName())).getObject());
} catch (IllegalAccessException e) {
e.printStackTrace();
}
}
});
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@

import org.drools.core.common.Storage;
import org.kie.api.internal.utils.KieService;
import org.kie.api.runtime.conf.PersistedSessionOption;

public interface SimpleReliableObjectStoreFactory extends KieService {

SimpleReliableObjectStore createSimpleReliableObjectStore(Storage<Long, StoredObject> storage);
SimpleReliableObjectStore createSimpleReliableObjectStore(Storage<Long, StoredObject> storage, PersistedSessionOption persistedSessionOption);

class Holder {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,6 @@

package org.drools.reliability.core;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

import org.drools.core.ClockType;
import org.drools.core.common.DefaultEventHandle;
import org.drools.core.common.IdentityObjectStore;
Expand All @@ -27,9 +23,13 @@
import org.drools.core.common.InternalWorkingMemoryEntryPoint;
import org.drools.core.common.Storage;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class SimpleSerializationReliableObjectStore extends IdentityObjectStore implements SimpleReliableObjectStore {

protected final transient Storage<Long, StoredObject> storage;
protected transient Storage<Long, StoredObject> storage;

protected boolean reInitPropagated = false;

Expand Down Expand Up @@ -113,7 +113,7 @@ public void putIntoPersistedStorage(InternalFactHandle handle, boolean propagate
storage.put(getHandleForObject(object).getId(), storedObject);
}

private StoredObject factHandleToStoredObject(InternalFactHandle handle, boolean propagated, Object object) {
StoredObject factHandleToStoredObject(InternalFactHandle handle, boolean propagated, Object object) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably protected is more suitable here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I agree, thank you.

return handle.isEvent() ?
createStoredEvent(propagated, object, ((DefaultEventHandle) handle).getStartTimestamp(), ((DefaultEventHandle) handle).getDuration()) :
createStoredObject(propagated, object);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package org.drools.reliability.core;

import org.drools.core.common.Storage;
import org.kie.api.runtime.conf.PersistedSessionOption;

public class SimpleSerializationReliableObjectStoreFactory implements SimpleReliableObjectStoreFactory {

Expand All @@ -25,6 +26,14 @@ public SimpleReliableObjectStore createSimpleReliableObjectStore(Storage<Long, S
return new SimpleSerializationReliableObjectStore(storage);
}

public SimpleReliableObjectStore createSimpleReliableObjectStore(Storage<Long, StoredObject> storage, PersistedSessionOption persistedSessionOption) {
if (persistedSessionOption.getPersistenceObjectsStrategy() == PersistedSessionOption.PersistenceObjectsStrategy.SIMPLE) {
return new SimpleSerializationReliableObjectStore(storage);
}else {
return new SimpleSerializationReliableRefObjectStore(storage);
}
}

@Override
public int servicePriority() {
return servicePriorityValue;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package org.drools.reliability.core;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing Copyright header

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thank you @tkobayas, copyright header added.


import org.drools.core.common.InternalFactHandle;
import org.drools.core.common.Storage;

import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class SimpleSerializationReliableRefObjectStore extends SimpleSerializationReliableObjectStore {

public SimpleSerializationReliableRefObjectStore(Storage<Long, StoredObject> storage) {
super(storage);
this.storage = storage.size()>0 ? updateObjectReferences(storage) : storage;
}

private Storage<Long, StoredObject> updateObjectReferences(Storage<Long, StoredObject> storage){
Storage<Long, StoredObject> updateStorage = storage;

for (Long key: storage.keySet()){
updateStorage.put(key, ((SerializableStoredRefObject) storage.get(key)).updateReferencedObjects(storage));
}
return updateStorage;
}

@Override
public void putIntoPersistedStorage(InternalFactHandle handle, boolean propagated) {
Object object = handle.getObject();
StoredObject storedObject = factHandleToStoredObject(handle, reInitPropagated || propagated, object);
storage.put(getHandleForObject(object).getId(), setReferencedObjects(storedObject));
}

@Override
protected StoredObject createStoredObject(boolean propagated, Object object) {
return new SerializableStoredRefObject(object, propagated);
}

private void updateReferencedObjects(StoredObject object){
List<Field> referencedObjects = getReferencedObjects(object.getObject());
if (referencedObjects.size()>0) {

}
}
Copy link
Contributor

@tkobayas tkobayas Aug 1, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method is not used, so should be removed? Please check SonarCloud suggestions in this PR (sometimes there are unacceptable suggestions, though). Also you can add SonarLint plugin in your local IDE to notice such suggestions earlier.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes I noticed that is not used, it seems that its implementation is not complete that's why I left it there to see what else is missing. However, I can remove it and add it back if needed.
Thank you for the suggestion for the plugin, I didn't have this in mind:)


private StoredObject setReferencedObjects(StoredObject object){
List<Field> referencedObjects = getReferencedObjects(object.getObject());
if (referencedObjects.size()>0) {
// for each referenced object in sObject
// lookup in storage, find the object of reference, get its fact handle id
// save this association in the StoredObject
referencedObjects.forEach(field -> {
field.setAccessible(true);
Object fieldObject = null;
try {
fieldObject = field.get(object.getObject());
} catch (IllegalAccessException e) {
e.printStackTrace();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Repropagate this exception as a runtime one instead of simply printing the stack trace and swallow it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks @mariofusco, I will change this to throw a ReliabilityRuntimeException

}
Long objectKey = fromObjectToFactHandleId(fieldObject);
if (objectKey!=null){
((SerializableStoredRefObject) object).addReferencedObject(field.getName(), objectKey);}
});
}
return object;
}

private Long fromObjectToFactHandleId(Object object){
for (Long key : this.storage.keySet()){
if (( (SerializableStoredRefObject) storage.get(key)).getObject()==object){
return key;
}
}
return null;
}

private List<Field> getReferencedObjects(Object object){
Field[] fields = object.getClass().getDeclaredFields();

List<Field> nonPrimitiveFields = Arrays.stream(fields)
.filter(field -> !field.getType().isPrimitive())
.filter(field -> !field.getType().equals(String.class))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of filtering on the type of the field probably here it should keep only the fields having a value that is also an object inserted in the session, or in other words an object that is also present in the objects store (not sure how easy would be to check this though).

Copy link
Contributor Author

@nprentza nprentza Aug 1, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, good idea, thanks @mariofusco
I introduced a uniqueObjectTypesInStore object to maintain the classes of objects inserted in the store. Every time an object is inserted or removed from the persisted store,uniqueObjectTypesInStore is updated accordingly. Then, the filtering of the field types is done by looking into the list with the unique object types in the store.

.collect(Collectors.toList());
return nonPrimitiveFields;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@
import org.drools.reliability.core.SimpleReliableObjectStore;
import org.drools.reliability.core.SimpleReliableObjectStoreFactory;
import org.drools.reliability.core.SimpleSerializationReliableObjectStore;
import org.drools.reliability.core.SimpleSerializationReliableRefObjectStore;
import org.drools.reliability.core.StorageManagerFactory;
import org.drools.reliability.core.StoredObject;
import org.drools.reliability.infinispan.proto.SimpleProtoStreamReliableObjectStore;
import org.kie.api.runtime.conf.PersistedSessionOption;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -31,18 +33,24 @@ public class SimpleInfinispanReliableObjectStoreFactory implements SimpleReliabl

private static final Logger LOG = LoggerFactory.getLogger(SimpleInfinispanReliableObjectStoreFactory.class);

public SimpleReliableObjectStore createSimpleReliableObjectStore(Storage<Long, StoredObject> storage) {
public SimpleReliableObjectStore createSimpleReliableObjectStore(Storage<Long, StoredObject> storage, PersistedSessionOption persistedSessionOption) {
if (((InfinispanStorageManager)StorageManagerFactory.get().getStorageManager()).isProtoStream()) {
LOG.debug("Using SimpleProtoStreamReliableObjectStore");
return new SimpleProtoStreamReliableObjectStore(storage);
} else {
LOG.debug("Using SimpleSerializationReliableObjectStore");
return new SimpleSerializationReliableObjectStore(storage);
if (persistedSessionOption.getPersistenceObjectsStrategy()== PersistedSessionOption.PersistenceObjectsStrategy.OBJECT_REFERENCES){
LOG.debug("Using SimpleSerializationReliableRefObjectStore");
return new SimpleSerializationReliableRefObjectStore(storage);
}else{
LOG.debug("Using SimpleSerializationReliableObjectStore");
return new SimpleSerializationReliableObjectStore(storage);
}
}
}

@Override
public int servicePriority() {
return servicePriorityValue;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package org.drools.reliability.test;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing Copyright header

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thank you @tkobayas, copyright header added.


import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import org.kie.api.runtime.conf.PersistedSessionOption;
import org.kie.api.runtime.rule.FactHandle;
import org.test.domain.fireandalarm.Alarm;
import org.test.domain.fireandalarm.Fire;
import org.test.domain.fireandalarm.Room;
import org.test.domain.fireandalarm.Sprinkler;

import static org.assertj.core.api.Assertions.assertThat;

public class ReliabilityFireAndAlarmTest extends ReliabilityTestBasics{
private static final String FIRE_AND_ALARM =
"import " + Alarm.class.getCanonicalName() + ";" +
"import " + Fire.class.getCanonicalName() + ";" +
"import " + Sprinkler.class.getCanonicalName() + ";" +
"import " + Room.class.getCanonicalName() + ";" +
"global java.util.List results;" +
"rule 'When there is a fire turn on the sprinkler' when\n" +
" Fire($room : room) \n" +
" $sprinkler: Sprinkler( room == $room, on == false ) \n" +
"then\n" +
" modify($sprinkler) { setOn(true); } \n" +
" System.out.println(\"Turn on the sprinkler for room\" + $room.getName()); \n" +
"end\n" +
"rule 'Raise the alarm when we have one or more firs' when\n" +
" exists Fire() \n" +
"then\n" +
" insert( new Alarm() );\n" +
" System.out.println(\"Raise the alarm\");\n" +
"end\n"+
"rule 'Cancel the alarm when all the fires have gone' when \n" +
" not Fire() \n" +
" $alarm : Alarm() \n" +
"then\n" +
" delete ( $alarm ); \n" +
" System.out.println(\"Cancel the alarm\"); \n" +
"end\n" +
"rule 'Status output when things are ok' when\n" +
" not Alarm() \n" +
" not Sprinkler ( on == true ) \n" +
"then \n" +
" System.out.println(\"Everything is ok\"); \n" +
"end";


@ParameterizedTest
@MethodSource("strategyProviderStoresOnlyWithExplicitSafepoints")
void testNoFailover(PersistedSessionOption.PersistenceStrategy persistenceStrategy, PersistedSessionOption.SafepointStrategy safepointStrategy){
createSession(FIRE_AND_ALARM, persistenceStrategy, safepointStrategy);

// phase 1
Room room1 = new Room("Room 1");
insert(room1);
FactHandle fireFact1 = insert(new Fire(room1));
fireAllRules();

// phase 2
Sprinkler sprinkler1 = new Sprinkler(room1);
insert(sprinkler1);
fireAllRules();

assertThat(sprinkler1.isOn()).isTrue();

// phase 3
delete(fireFact1);
fireAllRules();
}

@ParameterizedTest
@MethodSource("strategyProviderStoresOnlyWithExplicitSafepoints")
void testInsertFailover_ShouldFireRules(PersistedSessionOption.PersistenceStrategy persistenceStrategy, PersistedSessionOption.SafepointStrategy safepointStrategy){
createSession(FIRE_AND_ALARM, persistenceStrategy, safepointStrategy, PersistedSessionOption.PersistenceObjectsStrategy.OBJECT_REFERENCES);

Room room1 = new Room("Room 1");
insert(room1);
insert(new Fire(room1));
insert(new Sprinkler(room1));

failover();
restoreSession(FIRE_AND_ALARM, persistenceStrategy,safepointStrategy,PersistedSessionOption.PersistenceObjectsStrategy.OBJECT_REFERENCES);

assertThat(fireAllRules()).isEqualTo(2);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

asserting only fire count is not good enough. Possible additional assertions:

  1. Use global results to collect fired rule names and assert
  2. Assert expected object state. in this case, Sprinkler.on is true. Alarm exists in the working memory.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I agree, this is a very basic test, I will improve it and add more tests as needed.

}
}
Loading
Loading