From f6200f17c8baab3fbec640727c9b06c200bc0be5 Mon Sep 17 00:00:00 2001 From: nprentza Date: Fri, 14 Jul 2023 20:44:20 +0300 Subject: [PATCH] structured data persistency, a candidate solution --- .../core/SerializableStoredObject.java | 30 ++++++++ ...impleSerializationReliableObjectStore.java | 73 +++++++++++++++++-- .../ReliabilityFireAndAlarmTest.java | 32 ++------ 3 files changed, 102 insertions(+), 33 deletions(-) diff --git a/drools-reliability/drools-reliability-core/src/main/java/org/drools/reliability/core/SerializableStoredObject.java b/drools-reliability/drools-reliability-core/src/main/java/org/drools/reliability/core/SerializableStoredObject.java index 9495d5720a36..aa599afa1489 100644 --- a/drools-reliability/drools-reliability-core/src/main/java/org/drools/reliability/core/SerializableStoredObject.java +++ b/drools-reliability/drools-reliability-core/src/main/java/org/drools/reliability/core/SerializableStoredObject.java @@ -15,11 +15,20 @@ package org.drools.reliability.core; +import org.drools.core.common.Storage; + import java.io.Serializable; +import java.lang.reflect.Field; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; public class SerializableStoredObject extends BaseStoredObject { private final Serializable object; + private final Map referencedObjects; public SerializableStoredObject(Object object, boolean propagated) { super(propagated); @@ -27,6 +36,27 @@ public SerializableStoredObject(Object object, boolean propagated) { throw new IllegalArgumentException("Object must be serializable : " + object.getClass().getCanonicalName()); } this.object = (Serializable) object; + referencedObjects=new HashMap<>(); + } + + public void addReferencedObject(String fieldName, Long refObjectKey){ + this.referencedObjects.put(fieldName, refObjectKey); + } + + public StoredObject updateReferencedObjects(Storage storage){ + this.referencedObjects.keySet().forEach(fieldName -> { + List refField = Arrays.stream(object.getClass().getDeclaredFields()) + .filter(f -> f.getName().equals(fieldName)).collect(Collectors.toList()); + if (refField.size()==1){ + refField.get(0).setAccessible(true); + try { + refField.get(0).set(this.object, storage.get(this.referencedObjects.get(refField.get(0).getName())).getObject()); + } catch (IllegalAccessException e) { + e.printStackTrace(); + } + } + }); + return this; } @Override diff --git a/drools-reliability/drools-reliability-core/src/main/java/org/drools/reliability/core/SimpleSerializationReliableObjectStore.java b/drools-reliability/drools-reliability-core/src/main/java/org/drools/reliability/core/SimpleSerializationReliableObjectStore.java index 24acb1c5e6db..fd87bb3fe16b 100644 --- a/drools-reliability/drools-reliability-core/src/main/java/org/drools/reliability/core/SimpleSerializationReliableObjectStore.java +++ b/drools-reliability/drools-reliability-core/src/main/java/org/drools/reliability/core/SimpleSerializationReliableObjectStore.java @@ -15,10 +15,6 @@ package org.drools.reliability.core; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.TimeUnit; - import org.drools.core.ClockType; import org.drools.core.common.DefaultEventHandle; import org.drools.core.common.IdentityObjectStore; @@ -27,6 +23,13 @@ import org.drools.core.common.InternalWorkingMemoryEntryPoint; import org.drools.core.common.Storage; +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + public class SimpleSerializationReliableObjectStore extends IdentityObjectStore implements SimpleReliableObjectStore { protected final transient Storage storage; @@ -39,7 +42,17 @@ public SimpleSerializationReliableObjectStore() { public SimpleSerializationReliableObjectStore(Storage storage) { super(); - this.storage = storage; + this.storage = storage.size()>0 ? updateObjectReferences(storage) : storage; + } + + private Storage updateObjectReferences(Storage storage){ + Storage updateStorage = storage; + + for (Long key: storage.keySet()){ + updateStorage.put(key, ((SerializableStoredObject) storage.get(key)).updateReferencedObjects(storage)); + } + + return updateStorage; } @Override @@ -110,7 +123,7 @@ private void repropagateWithPseudoClock(InternalWorkingMemory session, InternalW public void putIntoPersistedStorage(InternalFactHandle handle, boolean propagated) { Object object = handle.getObject(); StoredObject storedObject = factHandleToStoredObject(handle, reInitPropagated || propagated, object); - storage.put(getHandleForObject(object).getId(), storedObject); + storage.put(getHandleForObject(object).getId(), setReferencedObjects(storedObject)); } private StoredObject factHandleToStoredObject(InternalFactHandle handle, boolean propagated, Object object) { @@ -141,4 +154,52 @@ public void safepoint() { storage.flush(); } } + + private void updateReferencedObjects(StoredObject object){ + List referencedObjects = getReferencedObjects(object.getObject()); + if (referencedObjects.size()>0) { + + } + } + + private StoredObject setReferencedObjects(StoredObject object){ + List referencedObjects = getReferencedObjects(object.getObject()); + if (referencedObjects.size()>0) { + // for each referenced object in sObject + // lookup in storage, find the object of reference, get its fact handle id + // save this association in the StoredObject + referencedObjects.forEach(field -> { + field.setAccessible(true); + Object fieldObject = null; + try { + fieldObject = field.get(object.getObject()); + } catch (IllegalAccessException e) { + e.printStackTrace(); + } + Long objectKey = fromObjectToFactHandleId(fieldObject); + if (objectKey!=null){ + ((SerializableStoredObject) object).addReferencedObject(field.getName(), objectKey);} + }); + } + return object; + } + + private Long fromObjectToFactHandleId(Object object){ + for (Long key : this.storage.keySet()){ + if (( (SerializableStoredObject) storage.get(key)).getObject().hashCode()==object.hashCode()){ + return key; + } + } + return null; + } + + private List getReferencedObjects(Object object){ + Field[] fields = object.getClass().getDeclaredFields(); + + List nonPrimitiveFields = Arrays.stream(fields) + .filter(field -> !field.getType().isPrimitive()) + .filter(field -> !field.getType().equals(String.class)) + .collect(Collectors.toList()); + return nonPrimitiveFields; + } } diff --git a/drools-reliability/drools-reliability-infinispan/src/test/java/org/drools/reliability/infinispan/ReliabilityFireAndAlarmTest.java b/drools-reliability/drools-reliability-infinispan/src/test/java/org/drools/reliability/infinispan/ReliabilityFireAndAlarmTest.java index 47bdb500efe2..162ee1aba253 100644 --- a/drools-reliability/drools-reliability-infinispan/src/test/java/org/drools/reliability/infinispan/ReliabilityFireAndAlarmTest.java +++ b/drools-reliability/drools-reliability-infinispan/src/test/java/org/drools/reliability/infinispan/ReliabilityFireAndAlarmTest.java @@ -1,6 +1,5 @@ package org.drools.reliability.infinispan; -import org.drools.core.ClassObjectFilter; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.MethodSource; import org.kie.api.runtime.conf.PersistedSessionOption; @@ -10,9 +9,6 @@ import org.test.domain.fireandalarm.Room; import org.test.domain.fireandalarm.Sprinkler; -import java.util.Optional; -import java.util.stream.Collectors; - import static org.assertj.core.api.Assertions.assertThat; public class ReliabilityFireAndAlarmTest extends ReliabilityTestBasics{ @@ -26,7 +22,7 @@ public class ReliabilityFireAndAlarmTest extends ReliabilityTestBasics{ " Fire($room : room) \n" + " $sprinkler: Sprinkler( room == $room, on == false ) \n" + "then\n" + - " modify($sprinkler) { setOn(true) }; \n" + + " modify($sprinkler) { setOn(true); } \n" + " System.out.println(\"Turn on the sprinkler for room\" + $room.getName()); \n" + "end\n" + "rule 'Raise the alarm when we have one or more firs' when\n" + @@ -74,35 +70,17 @@ void testNoFailover(PersistedSessionOption.PersistenceStrategy persistenceStrate @ParameterizedTest @MethodSource("strategyProviderStoresOnlyWithExplicitSafepoints") - void testPhase1FailoverPhase2FailoverPhase3(PersistedSessionOption.PersistenceStrategy persistenceStrategy, PersistedSessionOption.SafepointStrategy safepointStrategy){ + void testInsertFailover_ShouldFireRules(PersistedSessionOption.PersistenceStrategy persistenceStrategy, PersistedSessionOption.SafepointStrategy safepointStrategy){ createSession(FIRE_AND_ALARM, persistenceStrategy, safepointStrategy); - // phase 1 Room room1 = new Room("Room 1"); insert(room1); - FactHandle fireFact1 = insert(new Fire(room1)); - fireAllRules(); - - //failover(); - //restoreSession(FIRE_AND_ALARM, persistenceStrategy,safepointStrategy); - - // phase 2 - Sprinkler sprinkler1 = new Sprinkler(room1); - FactHandle sprinklerFact2 = insert(sprinkler1); - fireAllRules(); - - sprinkler1 = (Sprinkler) sessions.get(0).getObjects(new ClassObjectFilter(Sprinkler.class)).stream().collect(Collectors.toList()).get(0); - - assertThat(sprinkler1.isOn()).isTrue(); + insert(new Fire(room1)); + insert(new Sprinkler(room1)); failover(); restoreSession(FIRE_AND_ALARM, persistenceStrategy,safepointStrategy); - // phase 3 - Optional fhToDelete = getFactHandle(room1); - if (!fhToDelete.isEmpty()){ - delete(fhToDelete.get()); - } - fireAllRules(); + assertThat(fireAllRules()).isEqualTo(2); } }