Skip to content

Commit

Permalink
rebase, minor changes
Browse files Browse the repository at this point in the history
  • Loading branch information
nprentza committed Jul 18, 2023
1 parent 948a448 commit a0af91a
Show file tree
Hide file tree
Showing 7 changed files with 272 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,47 @@

package org.drools.reliability.core;

import org.drools.core.common.Storage;

import java.io.Serializable;
import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class SerializableStoredObject extends BaseStoredObject {

private final Serializable object;
private final Map<String, Long> referencedObjects;

public SerializableStoredObject(Object object, boolean propagated) {
super(propagated);
if (!(object instanceof Serializable)) {
throw new IllegalArgumentException("Object must be serializable : " + object.getClass().getCanonicalName());
}
this.object = (Serializable) object;
referencedObjects=new HashMap<>();
}

public void addReferencedObject(String fieldName, Long refObjectKey){
this.referencedObjects.put(fieldName, refObjectKey);
}

public StoredObject updateReferencedObjects(Storage<Long, StoredObject> storage){
this.referencedObjects.keySet().forEach(fieldName -> {
Optional<Field> refField = Arrays.stream(object.getClass().getDeclaredFields())
.filter(f -> f.getName().equals(fieldName)).findFirst();
if (refField.isPresent()){
refField.get().setAccessible(true);
try {
refField.get().set(this.object, storage.get(this.referencedObjects.get(refField.get().getName())).getObject());
} catch (IllegalAccessException e) {
e.printStackTrace();
}
}
});
return this;
}

@Override
Expand All @@ -41,4 +70,4 @@ public String toString() {
", propagated=" + propagated +
'}';
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,6 @@

package org.drools.reliability.core;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

import org.drools.core.ClockType;
import org.drools.core.common.DefaultEventHandle;
import org.drools.core.common.IdentityObjectStore;
Expand All @@ -27,6 +23,13 @@
import org.drools.core.common.InternalWorkingMemoryEntryPoint;
import org.drools.core.common.Storage;

import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

public class SimpleSerializationReliableObjectStore extends IdentityObjectStore implements SimpleReliableObjectStore {

protected final transient Storage<Long, StoredObject> storage;
Expand All @@ -39,7 +42,17 @@ public SimpleSerializationReliableObjectStore() {

public SimpleSerializationReliableObjectStore(Storage<Long, StoredObject> storage) {
super();
this.storage = storage;
this.storage = storage.size()>0 ? updateObjectReferences(storage) : storage;
}

private Storage<Long, StoredObject> updateObjectReferences(Storage<Long, StoredObject> storage){
Storage<Long, StoredObject> updateStorage = storage;

for (Long key: storage.keySet()){
updateStorage.put(key, ((SerializableStoredObject) storage.get(key)).updateReferencedObjects(storage));
}

return updateStorage;
}

@Override
Expand Down Expand Up @@ -110,7 +123,7 @@ private void repropagateWithPseudoClock(InternalWorkingMemory session, InternalW
public void putIntoPersistedStorage(InternalFactHandle handle, boolean propagated) {
Object object = handle.getObject();
StoredObject storedObject = factHandleToStoredObject(handle, reInitPropagated || propagated, object);
storage.put(getHandleForObject(object).getId(), storedObject);
storage.put(getHandleForObject(object).getId(), setReferencedObjects(storedObject));
}

private StoredObject factHandleToStoredObject(InternalFactHandle handle, boolean propagated, Object object) {
Expand Down Expand Up @@ -141,4 +154,53 @@ public void safepoint() {
storage.flush();
}
}

private void updateReferencedObjects(StoredObject object){
List<Field> referencedObjects = getReferencedObjects(object.getObject());
if (referencedObjects.size()>0) {

}
}

private StoredObject setReferencedObjects(StoredObject object){
List<Field> referencedObjects = getReferencedObjects(object.getObject());
if (referencedObjects.size()>0) {
// for each referenced object in sObject
// lookup in storage, find the object of reference, get its fact handle id
// save this association in the StoredObject
referencedObjects.forEach(field -> {
field.setAccessible(true);
Object fieldObject = null;
try {
fieldObject = field.get(object.getObject());
} catch (IllegalAccessException e) {
e.printStackTrace();
}
Long objectKey = fromObjectToFactHandleId(fieldObject);
if (objectKey!=null){
((SerializableStoredObject) object).addReferencedObject(field.getName(), objectKey);}
});
}
return object;
}

private Long fromObjectToFactHandleId(Object object){
for (Long key : this.storage.keySet()){
if (( (SerializableStoredObject) storage.get(key)).getObject()==object){
return key;
}
}
return null;
}

private List<Field> getReferencedObjects(Object object){
Field[] fields = object.getClass().getDeclaredFields();

List<Field> nonPrimitiveFields = Arrays.stream(fields)
.filter(field -> !field.getType().isPrimitive())
.filter(field -> !field.getType().equals(String.class))
.collect(Collectors.toList());
return nonPrimitiveFields;
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package org.drools.reliability.test;

import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import org.kie.api.runtime.conf.PersistedSessionOption;
import org.kie.api.runtime.rule.FactHandle;
import org.test.domain.fireandalarm.Alarm;
import org.test.domain.fireandalarm.Fire;
import org.test.domain.fireandalarm.Room;
import org.test.domain.fireandalarm.Sprinkler;

import static org.assertj.core.api.Assertions.assertThat;

public class ReliabilityFireAndAlarmTest extends ReliabilityTestBasics{
private static final String FIRE_AND_ALARM =
"import " + Alarm.class.getCanonicalName() + ";" +
"import " + Fire.class.getCanonicalName() + ";" +
"import " + Sprinkler.class.getCanonicalName() + ";" +
"import " + Room.class.getCanonicalName() + ";" +
"global java.util.List results;" +
"rule 'When there is a fire turn on the sprinkler' when\n" +
" Fire($room : room) \n" +
" $sprinkler: Sprinkler( room == $room, on == false ) \n" +
"then\n" +
" modify($sprinkler) { setOn(true); } \n" +
" System.out.println(\"Turn on the sprinkler for room\" + $room.getName()); \n" +
"end\n" +
"rule 'Raise the alarm when we have one or more firs' when\n" +
" exists Fire() \n" +
"then\n" +
" insert( new Alarm() );\n" +
" System.out.println(\"Raise the alarm\");\n" +
"end\n"+
"rule 'Cancel the alarm when all the fires have gone' when \n" +
" not Fire() \n" +
" $alarm : Alarm() \n" +
"then\n" +
" delete ( $alarm ); \n" +
" System.out.println(\"Cancel the alarm\"); \n" +
"end\n" +
"rule 'Status output when things are ok' when\n" +
" not Alarm() \n" +
" not Sprinkler ( on == true ) \n" +
"then \n" +
" System.out.println(\"Everything is ok\"); \n" +
"end";

@ParameterizedTest
@MethodSource("strategyProviderStoresOnlyWithExplicitSafepoints")
void testNoFailover(PersistedSessionOption.PersistenceStrategy persistenceStrategy, PersistedSessionOption.SafepointStrategy safepointStrategy){
createSession(FIRE_AND_ALARM, persistenceStrategy, safepointStrategy);

// phase 1
Room room1 = new Room("Room 1");
insert(room1);
FactHandle fireFact1 = insert(new Fire(room1));
fireAllRules();

// phase 2
Sprinkler sprinkler1 = new Sprinkler(room1);
insert(sprinkler1);
fireAllRules();

assertThat(sprinkler1.isOn()).isTrue();

// phase 3
delete(fireFact1);
fireAllRules();
}

@ParameterizedTest
@MethodSource("strategyProviderStoresOnlyWithExplicitSafepoints")
void testInsertFailover_ShouldFireRules(PersistedSessionOption.PersistenceStrategy persistenceStrategy, PersistedSessionOption.SafepointStrategy safepointStrategy){
createSession(FIRE_AND_ALARM, persistenceStrategy, safepointStrategy);

Room room1 = new Room("Room 1");
insert(room1);
insert(new Fire(room1));
insert(new Sprinkler(room1));

failover();
restoreSession(FIRE_AND_ALARM, persistenceStrategy,safepointStrategy);

assertThat(fireAllRules()).isEqualTo(2);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package org.test.domain.fireandalarm;

import java.io.Serializable;

public class Alarm implements Serializable {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package org.test.domain.fireandalarm;

import java.io.Serializable;

public class Fire implements Serializable {
private Room room;

public Fire() { }

public Fire(Room room) {
this.room = room;
}

public Room getRoom() {
return room;
}

public void setRoom(Room room) {
this.room = room;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package org.test.domain.fireandalarm;

import java.io.Serializable;

public class Room implements Serializable {
private String name;

public Room() { }

public Room(String name) {
this.name = name;
}

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}

@Override
public String toString() {
return name;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package org.test.domain.fireandalarm;

import java.io.Serializable;

public class Sprinkler implements Serializable {
private Room room;
private boolean on = false;

public Sprinkler() { }

public Sprinkler(Room room) {
this.room = room;
}

public Room getRoom() {
return room;
}

public void setRoom(Room room) {
this.room = room;
}

public boolean isOn() {
return on;
}

public void setOn(boolean on) {
this.on = on;
}

@Override
public String toString() {
return "Sprinkler for " + room;
}
}

0 comments on commit a0af91a

Please sign in to comment.