Skip to content

Commit

Permalink
structured data persistency, a candidate solution
Browse files Browse the repository at this point in the history
  • Loading branch information
nprentza committed Jul 14, 2023
1 parent c779976 commit f6200f1
Show file tree
Hide file tree
Showing 3 changed files with 102 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,48 @@

package org.drools.reliability.core;

import org.drools.core.common.Storage;

import java.io.Serializable;
import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class SerializableStoredObject extends BaseStoredObject {

private final Serializable object;
private final Map<String, Long> referencedObjects;

public SerializableStoredObject(Object object, boolean propagated) {
super(propagated);
if (!(object instanceof Serializable)) {
throw new IllegalArgumentException("Object must be serializable : " + object.getClass().getCanonicalName());
}
this.object = (Serializable) object;
referencedObjects=new HashMap<>();
}

public void addReferencedObject(String fieldName, Long refObjectKey){
this.referencedObjects.put(fieldName, refObjectKey);
}

public StoredObject updateReferencedObjects(Storage<Long, StoredObject> storage){
this.referencedObjects.keySet().forEach(fieldName -> {
List<Field> refField = Arrays.stream(object.getClass().getDeclaredFields())
.filter(f -> f.getName().equals(fieldName)).collect(Collectors.toList());
if (refField.size()==1){
refField.get(0).setAccessible(true);
try {
refField.get(0).set(this.object, storage.get(this.referencedObjects.get(refField.get(0).getName())).getObject());
} catch (IllegalAccessException e) {
e.printStackTrace();
}
}
});
return this;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,6 @@

package org.drools.reliability.core;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

import org.drools.core.ClockType;
import org.drools.core.common.DefaultEventHandle;
import org.drools.core.common.IdentityObjectStore;
Expand All @@ -27,6 +23,13 @@
import org.drools.core.common.InternalWorkingMemoryEntryPoint;
import org.drools.core.common.Storage;

import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

public class SimpleSerializationReliableObjectStore extends IdentityObjectStore implements SimpleReliableObjectStore {

protected final transient Storage<Long, StoredObject> storage;
Expand All @@ -39,7 +42,17 @@ public SimpleSerializationReliableObjectStore() {

public SimpleSerializationReliableObjectStore(Storage<Long, StoredObject> storage) {
super();
this.storage = storage;
this.storage = storage.size()>0 ? updateObjectReferences(storage) : storage;
}

private Storage<Long, StoredObject> updateObjectReferences(Storage<Long, StoredObject> storage){
Storage<Long, StoredObject> updateStorage = storage;

for (Long key: storage.keySet()){
updateStorage.put(key, ((SerializableStoredObject) storage.get(key)).updateReferencedObjects(storage));
}

return updateStorage;
}

@Override
Expand Down Expand Up @@ -110,7 +123,7 @@ private void repropagateWithPseudoClock(InternalWorkingMemory session, InternalW
public void putIntoPersistedStorage(InternalFactHandle handle, boolean propagated) {
Object object = handle.getObject();
StoredObject storedObject = factHandleToStoredObject(handle, reInitPropagated || propagated, object);
storage.put(getHandleForObject(object).getId(), storedObject);
storage.put(getHandleForObject(object).getId(), setReferencedObjects(storedObject));
}

private StoredObject factHandleToStoredObject(InternalFactHandle handle, boolean propagated, Object object) {
Expand Down Expand Up @@ -141,4 +154,52 @@ public void safepoint() {
storage.flush();
}
}

private void updateReferencedObjects(StoredObject object){
List<Field> referencedObjects = getReferencedObjects(object.getObject());
if (referencedObjects.size()>0) {

}
}

private StoredObject setReferencedObjects(StoredObject object){
List<Field> referencedObjects = getReferencedObjects(object.getObject());
if (referencedObjects.size()>0) {
// for each referenced object in sObject
// lookup in storage, find the object of reference, get its fact handle id
// save this association in the StoredObject
referencedObjects.forEach(field -> {
field.setAccessible(true);
Object fieldObject = null;
try {
fieldObject = field.get(object.getObject());
} catch (IllegalAccessException e) {
e.printStackTrace();
}
Long objectKey = fromObjectToFactHandleId(fieldObject);
if (objectKey!=null){
((SerializableStoredObject) object).addReferencedObject(field.getName(), objectKey);}
});
}
return object;
}

private Long fromObjectToFactHandleId(Object object){
for (Long key : this.storage.keySet()){
if (( (SerializableStoredObject) storage.get(key)).getObject().hashCode()==object.hashCode()){
return key;
}
}
return null;
}

private List<Field> getReferencedObjects(Object object){
Field[] fields = object.getClass().getDeclaredFields();

List<Field> nonPrimitiveFields = Arrays.stream(fields)
.filter(field -> !field.getType().isPrimitive())
.filter(field -> !field.getType().equals(String.class))
.collect(Collectors.toList());
return nonPrimitiveFields;
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package org.drools.reliability.infinispan;

import org.drools.core.ClassObjectFilter;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import org.kie.api.runtime.conf.PersistedSessionOption;
Expand All @@ -10,9 +9,6 @@
import org.test.domain.fireandalarm.Room;
import org.test.domain.fireandalarm.Sprinkler;

import java.util.Optional;
import java.util.stream.Collectors;

import static org.assertj.core.api.Assertions.assertThat;

public class ReliabilityFireAndAlarmTest extends ReliabilityTestBasics{
Expand All @@ -26,7 +22,7 @@ public class ReliabilityFireAndAlarmTest extends ReliabilityTestBasics{
" Fire($room : room) \n" +
" $sprinkler: Sprinkler( room == $room, on == false ) \n" +
"then\n" +
" modify($sprinkler) { setOn(true) }; \n" +
" modify($sprinkler) { setOn(true); } \n" +
" System.out.println(\"Turn on the sprinkler for room\" + $room.getName()); \n" +
"end\n" +
"rule 'Raise the alarm when we have one or more firs' when\n" +
Expand Down Expand Up @@ -74,35 +70,17 @@ void testNoFailover(PersistedSessionOption.PersistenceStrategy persistenceStrate

@ParameterizedTest
@MethodSource("strategyProviderStoresOnlyWithExplicitSafepoints")
void testPhase1FailoverPhase2FailoverPhase3(PersistedSessionOption.PersistenceStrategy persistenceStrategy, PersistedSessionOption.SafepointStrategy safepointStrategy){
void testInsertFailover_ShouldFireRules(PersistedSessionOption.PersistenceStrategy persistenceStrategy, PersistedSessionOption.SafepointStrategy safepointStrategy){
createSession(FIRE_AND_ALARM, persistenceStrategy, safepointStrategy);

// phase 1
Room room1 = new Room("Room 1");
insert(room1);
FactHandle fireFact1 = insert(new Fire(room1));
fireAllRules();

//failover();
//restoreSession(FIRE_AND_ALARM, persistenceStrategy,safepointStrategy);

// phase 2
Sprinkler sprinkler1 = new Sprinkler(room1);
FactHandle sprinklerFact2 = insert(sprinkler1);
fireAllRules();

sprinkler1 = (Sprinkler) sessions.get(0).getObjects(new ClassObjectFilter(Sprinkler.class)).stream().collect(Collectors.toList()).get(0);

assertThat(sprinkler1.isOn()).isTrue();
insert(new Fire(room1));
insert(new Sprinkler(room1));

failover();
restoreSession(FIRE_AND_ALARM, persistenceStrategy,safepointStrategy);

// phase 3
Optional<FactHandle> fhToDelete = getFactHandle(room1);
if (!fhToDelete.isEmpty()){
delete(fhToDelete.get());
}
fireAllRules();
assertThat(fireAllRules()).isEqualTo(2);
}
}

0 comments on commit f6200f1

Please sign in to comment.