Skip to content

Commit

Permalink
PersistenceObjectStrategy option, basic test 'insert-failover-shouldF…
Browse files Browse the repository at this point in the history
…ireRules'
  • Loading branch information
nprentza committed Jul 31, 2023
1 parent c494d58 commit dbef320
Show file tree
Hide file tree
Showing 15 changed files with 373 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ public ReliableNamedEntryPoint(EntryPointId entryPoint, EntryPointNode entryPoin
protected ObjectStore createObjectStore(EntryPointId entryPoint, RuleBaseConfiguration conf, ReteEvaluator reteEvaluator) {
boolean storesOnlyStrategy = reteEvaluator.getSessionConfiguration().getPersistedSessionOption().getPersistenceStrategy() == PersistedSessionOption.PersistenceStrategy.STORES_ONLY;
return storesOnlyStrategy ?
SimpleReliableObjectStoreFactory.get().createSimpleReliableObjectStore(StorageManagerFactory.get().getStorageManager().getOrCreateStorageForSession(reteEvaluator, "ep" + getEntryPointId())) :
SimpleReliableObjectStoreFactory.get().createSimpleReliableObjectStore(StorageManagerFactory.get().getStorageManager().getOrCreateStorageForSession(reteEvaluator, "ep" + getEntryPointId()),
reteEvaluator.getSessionConfiguration().getPersistedSessionOption()) :
new FullReliableObjectStore(StorageManagerFactory.get().getStorageManager().getOrCreateStorageForSession(reteEvaluator, "ep" + getEntryPointId()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

public class SerializableStoredObject extends BaseStoredObject {

private final Serializable object;
protected final Serializable object;

public SerializableStoredObject(Object object, boolean propagated) {
super(propagated);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package org.drools.reliability.core;

import org.drools.core.common.Storage;

import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class SerializableStoredRefObject extends SerializableStoredObject {

private final Map<String, Long> referencedObjects;

public SerializableStoredRefObject(Object object, boolean propagated) {
super(object, propagated);
referencedObjects=new HashMap<>();
}

public void addReferencedObject(String fieldName, Long refObjectKey){
this.referencedObjects.put(fieldName, refObjectKey);
}

public StoredObject updateReferencedObjects(Storage<Long, StoredObject> storage){
this.referencedObjects.keySet().forEach(fieldName -> {
Optional<Field> refField = Arrays.stream(object.getClass().getDeclaredFields())
.filter(f -> f.getName().equals(fieldName)).findFirst();
if (refField.isPresent()){
refField.get().setAccessible(true);
try {
refField.get().set(this.object, storage.get(this.referencedObjects.get(refField.get().getName())).getObject());
} catch (IllegalAccessException e) {
e.printStackTrace();
}
}
});
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@

import org.drools.core.common.Storage;
import org.kie.api.internal.utils.KieService;
import org.kie.api.runtime.conf.PersistedSessionOption;

public interface SimpleReliableObjectStoreFactory extends KieService {

SimpleReliableObjectStore createSimpleReliableObjectStore(Storage<Long, StoredObject> storage);
SimpleReliableObjectStore createSimpleReliableObjectStore(Storage<Long, StoredObject> storage, PersistedSessionOption persistedSessionOption);

class Holder {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,6 @@

package org.drools.reliability.core;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

import org.drools.core.ClockType;
import org.drools.core.common.DefaultEventHandle;
import org.drools.core.common.IdentityObjectStore;
Expand All @@ -27,9 +23,13 @@
import org.drools.core.common.InternalWorkingMemoryEntryPoint;
import org.drools.core.common.Storage;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class SimpleSerializationReliableObjectStore extends IdentityObjectStore implements SimpleReliableObjectStore {

protected final transient Storage<Long, StoredObject> storage;
protected transient Storage<Long, StoredObject> storage;

protected boolean reInitPropagated = false;

Expand Down Expand Up @@ -113,7 +113,7 @@ public void putIntoPersistedStorage(InternalFactHandle handle, boolean propagate
storage.put(getHandleForObject(object).getId(), storedObject);
}

private StoredObject factHandleToStoredObject(InternalFactHandle handle, boolean propagated, Object object) {
StoredObject factHandleToStoredObject(InternalFactHandle handle, boolean propagated, Object object) {
return handle.isEvent() ?
createStoredEvent(propagated, object, ((DefaultEventHandle) handle).getStartTimestamp(), ((DefaultEventHandle) handle).getDuration()) :
createStoredObject(propagated, object);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package org.drools.reliability.core;

import org.drools.core.common.Storage;
import org.kie.api.runtime.conf.PersistedSessionOption;

public class SimpleSerializationReliableObjectStoreFactory implements SimpleReliableObjectStoreFactory {

Expand All @@ -25,6 +26,14 @@ public SimpleReliableObjectStore createSimpleReliableObjectStore(Storage<Long, S
return new SimpleSerializationReliableObjectStore(storage);
}

public SimpleReliableObjectStore createSimpleReliableObjectStore(Storage<Long, StoredObject> storage, PersistedSessionOption persistedSessionOption) {
if (persistedSessionOption.getPersistenceObjectsStrategy() == PersistedSessionOption.PersistenceObjectsStrategy.SIMPLE) {
return new SimpleSerializationReliableObjectStore(storage);
}else {
return new SimpleSerializationReliableRefObjectStore(storage);
}
}

@Override
public int servicePriority() {
return servicePriorityValue;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package org.drools.reliability.core;

import org.drools.core.common.InternalFactHandle;
import org.drools.core.common.Storage;

import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class SimpleSerializationReliableRefObjectStore extends SimpleSerializationReliableObjectStore {

public SimpleSerializationReliableRefObjectStore(Storage<Long, StoredObject> storage) {
super(storage);
this.storage = storage.size()>0 ? updateObjectReferences(storage) : storage;
}

private Storage<Long, StoredObject> updateObjectReferences(Storage<Long, StoredObject> storage){
Storage<Long, StoredObject> updateStorage = storage;

for (Long key: storage.keySet()){
updateStorage.put(key, ((SerializableStoredRefObject) storage.get(key)).updateReferencedObjects(storage));
}
return updateStorage;
}

@Override
public void putIntoPersistedStorage(InternalFactHandle handle, boolean propagated) {
Object object = handle.getObject();
StoredObject storedObject = factHandleToStoredObject(handle, reInitPropagated || propagated, object);
storage.put(getHandleForObject(object).getId(), setReferencedObjects(storedObject));
}

@Override
protected StoredObject createStoredObject(boolean propagated, Object object) {
return new SerializableStoredRefObject(object, propagated);
}

private void updateReferencedObjects(StoredObject object){
List<Field> referencedObjects = getReferencedObjects(object.getObject());
if (referencedObjects.size()>0) {

}
}

private StoredObject setReferencedObjects(StoredObject object){
List<Field> referencedObjects = getReferencedObjects(object.getObject());
if (referencedObjects.size()>0) {
// for each referenced object in sObject
// lookup in storage, find the object of reference, get its fact handle id
// save this association in the StoredObject
referencedObjects.forEach(field -> {
field.setAccessible(true);
Object fieldObject = null;
try {
fieldObject = field.get(object.getObject());
} catch (IllegalAccessException e) {
e.printStackTrace();
}
Long objectKey = fromObjectToFactHandleId(fieldObject);
if (objectKey!=null){
((SerializableStoredRefObject) object).addReferencedObject(field.getName(), objectKey);}
});
}
return object;
}

private Long fromObjectToFactHandleId(Object object){
for (Long key : this.storage.keySet()){
if (( (SerializableStoredRefObject) storage.get(key)).getObject()==object){
return key;
}
}
return null;
}

private List<Field> getReferencedObjects(Object object){
Field[] fields = object.getClass().getDeclaredFields();

List<Field> nonPrimitiveFields = Arrays.stream(fields)
.filter(field -> !field.getType().isPrimitive())
.filter(field -> !field.getType().equals(String.class))
.collect(Collectors.toList());
return nonPrimitiveFields;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@
import org.drools.reliability.core.SimpleReliableObjectStore;
import org.drools.reliability.core.SimpleReliableObjectStoreFactory;
import org.drools.reliability.core.SimpleSerializationReliableObjectStore;
import org.drools.reliability.core.SimpleSerializationReliableRefObjectStore;
import org.drools.reliability.core.StorageManagerFactory;
import org.drools.reliability.core.StoredObject;
import org.drools.reliability.infinispan.proto.SimpleProtoStreamReliableObjectStore;
import org.kie.api.runtime.conf.PersistedSessionOption;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -31,18 +33,24 @@ public class SimpleInfinispanReliableObjectStoreFactory implements SimpleReliabl

private static final Logger LOG = LoggerFactory.getLogger(SimpleInfinispanReliableObjectStoreFactory.class);

public SimpleReliableObjectStore createSimpleReliableObjectStore(Storage<Long, StoredObject> storage) {
public SimpleReliableObjectStore createSimpleReliableObjectStore(Storage<Long, StoredObject> storage, PersistedSessionOption persistedSessionOption) {
if (((InfinispanStorageManager)StorageManagerFactory.get().getStorageManager()).isProtoStream()) {
LOG.debug("Using SimpleProtoStreamReliableObjectStore");
return new SimpleProtoStreamReliableObjectStore(storage);
} else {
LOG.debug("Using SimpleSerializationReliableObjectStore");
return new SimpleSerializationReliableObjectStore(storage);
if (persistedSessionOption.getPersistenceObjectsStrategy()== PersistedSessionOption.PersistenceObjectsStrategy.OBJECT_REFERENCES){
LOG.debug("Using SimpleSerializationReliableRefObjectStore");
return new SimpleSerializationReliableRefObjectStore(storage);
}else{
LOG.debug("Using SimpleSerializationReliableObjectStore");
return new SimpleSerializationReliableObjectStore(storage);
}
}
}

@Override
public int servicePriority() {
return servicePriorityValue;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package org.drools.reliability.test;

import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import org.kie.api.runtime.conf.PersistedSessionOption;
import org.kie.api.runtime.rule.FactHandle;
import org.test.domain.fireandalarm.Alarm;
import org.test.domain.fireandalarm.Fire;
import org.test.domain.fireandalarm.Room;
import org.test.domain.fireandalarm.Sprinkler;

import static org.assertj.core.api.Assertions.assertThat;

public class ReliabilityFireAndAlarmTest extends ReliabilityTestBasics{
private static final String FIRE_AND_ALARM =
"import " + Alarm.class.getCanonicalName() + ";" +
"import " + Fire.class.getCanonicalName() + ";" +
"import " + Sprinkler.class.getCanonicalName() + ";" +
"import " + Room.class.getCanonicalName() + ";" +
"global java.util.List results;" +
"rule 'When there is a fire turn on the sprinkler' when\n" +
" Fire($room : room) \n" +
" $sprinkler: Sprinkler( room == $room, on == false ) \n" +
"then\n" +
" modify($sprinkler) { setOn(true); } \n" +
" System.out.println(\"Turn on the sprinkler for room\" + $room.getName()); \n" +
"end\n" +
"rule 'Raise the alarm when we have one or more firs' when\n" +
" exists Fire() \n" +
"then\n" +
" insert( new Alarm() );\n" +
" System.out.println(\"Raise the alarm\");\n" +
"end\n"+
"rule 'Cancel the alarm when all the fires have gone' when \n" +
" not Fire() \n" +
" $alarm : Alarm() \n" +
"then\n" +
" delete ( $alarm ); \n" +
" System.out.println(\"Cancel the alarm\"); \n" +
"end\n" +
"rule 'Status output when things are ok' when\n" +
" not Alarm() \n" +
" not Sprinkler ( on == true ) \n" +
"then \n" +
" System.out.println(\"Everything is ok\"); \n" +
"end";


@ParameterizedTest
@MethodSource("strategyProviderStoresOnlyWithExplicitSafepoints")
void testNoFailover(PersistedSessionOption.PersistenceStrategy persistenceStrategy, PersistedSessionOption.SafepointStrategy safepointStrategy){
createSession(FIRE_AND_ALARM, persistenceStrategy, safepointStrategy);

// phase 1
Room room1 = new Room("Room 1");
insert(room1);
FactHandle fireFact1 = insert(new Fire(room1));
fireAllRules();

// phase 2
Sprinkler sprinkler1 = new Sprinkler(room1);
insert(sprinkler1);
fireAllRules();

assertThat(sprinkler1.isOn()).isTrue();

// phase 3
delete(fireFact1);
fireAllRules();
}

@ParameterizedTest
@MethodSource("strategyProviderStoresOnlyWithExplicitSafepoints")
void testInsertFailover_ShouldFireRules(PersistedSessionOption.PersistenceStrategy persistenceStrategy, PersistedSessionOption.SafepointStrategy safepointStrategy){
createSession(FIRE_AND_ALARM, persistenceStrategy, safepointStrategy, PersistedSessionOption.PersistenceObjectsStrategy.OBJECT_REFERENCES);

Room room1 = new Room("Room 1");
insert(room1);
insert(new Fire(room1));
insert(new Sprinkler(room1));

failover();
restoreSession(FIRE_AND_ALARM, persistenceStrategy,safepointStrategy,PersistedSessionOption.PersistenceObjectsStrategy.OBJECT_REFERENCES);

assertThat(fireAllRules()).isEqualTo(2);
}
}
Loading

0 comments on commit dbef320

Please sign in to comment.