Skip to content

Commit

Permalink
[DROOLS-7512] opt-in object store with references for reliable session (
Browse files Browse the repository at this point in the history
#5445)

* PersistenceObjectStrategy option, basic test 'insert-failover-shouldFireRules'

* Copyright header added

* addressing draft PR comments, improved test method

* add global results in testPhase1FailoverPhase2Phase3_ShouldFireRules

* init uniqueObjectTypesInStore

* more tests, other improvements

* sonar improvements

* improvements

* improvements: switch, groupingBy
  • Loading branch information
nprentza authored Aug 25, 2023
1 parent 0722b59 commit 3fd51b3
Show file tree
Hide file tree
Showing 15 changed files with 706 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ public ReliableNamedEntryPoint(EntryPointId entryPoint, EntryPointNode entryPoin
protected ObjectStore createObjectStore(EntryPointId entryPoint, RuleBaseConfiguration conf, ReteEvaluator reteEvaluator) {
boolean storesOnlyStrategy = reteEvaluator.getSessionConfiguration().getPersistedSessionOption().getPersistenceStrategy() == PersistedSessionOption.PersistenceStrategy.STORES_ONLY;
return storesOnlyStrategy ?
SimpleReliableObjectStoreFactory.get().createSimpleReliableObjectStore(StorageManagerFactory.get().getStorageManager().getOrCreateStorageForSession(reteEvaluator, "ep" + getEntryPointId())) :
SimpleReliableObjectStoreFactory.get().createSimpleReliableObjectStore(StorageManagerFactory.get().getStorageManager().getOrCreateStorageForSession(reteEvaluator, "ep" + getEntryPointId()),
reteEvaluator.getSessionConfiguration().getPersistedSessionOption()) :
new FullReliableObjectStore(StorageManagerFactory.get().getStorageManager().getOrCreateStorageForSession(reteEvaluator, "ep" + getEntryPointId()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

public class SerializableStoredObject extends BaseStoredObject {

private final Serializable object;
protected final Serializable object;

public SerializableStoredObject(Object object, boolean propagated) {
super(propagated);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Copyright 2023 Red Hat, Inc. and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.drools.reliability.core;

import org.drools.core.common.Storage;

import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class SerializableStoredRefObject extends SerializableStoredObject {

private final Map<String, Long> referencedObjects;

public SerializableStoredRefObject(Object object, boolean propagated) {
super(object, propagated);
referencedObjects=new HashMap<>();
}

public void addReferencedObject(String fieldName, Long refObjectKey){
this.referencedObjects.put(fieldName, refObjectKey);
}

public StoredObject updateReferencedObjects(Storage<Long, StoredObject> storage){
this.referencedObjects.keySet().forEach(fieldName -> {
Optional<Field> refField = Arrays.stream(object.getClass().getDeclaredFields())
.filter(f -> f.getName().equals(fieldName)).findFirst();
if (refField.isPresent()){
refField.get().setAccessible(true);
try {
refField.get().set(this.object, storage.get(this.referencedObjects.get(refField.get().getName())).getObject());
} catch (IllegalAccessException e) {
throw new ReliabilityRuntimeException(e);
}
}
});
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@

import org.drools.core.common.Storage;
import org.kie.api.internal.utils.KieService;
import org.kie.api.runtime.conf.PersistedSessionOption;

public interface SimpleReliableObjectStoreFactory extends KieService {

SimpleReliableObjectStore createSimpleReliableObjectStore(Storage<Long, StoredObject> storage);
SimpleReliableObjectStore createSimpleReliableObjectStore(Storage<Long, StoredObject> storage, PersistedSessionOption persistedSessionOption);

class Holder {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,6 @@

package org.drools.reliability.core;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

import org.drools.core.ClockType;
import org.drools.core.common.DefaultEventHandle;
import org.drools.core.common.IdentityObjectStore;
Expand All @@ -27,9 +23,13 @@
import org.drools.core.common.InternalWorkingMemoryEntryPoint;
import org.drools.core.common.Storage;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class SimpleSerializationReliableObjectStore extends IdentityObjectStore implements SimpleReliableObjectStore {

protected final transient Storage<Long, StoredObject> storage;
protected transient Storage<Long, StoredObject> storage;

protected boolean reInitPropagated = false;

Expand Down Expand Up @@ -113,7 +113,7 @@ public void putIntoPersistedStorage(InternalFactHandle handle, boolean propagate
storage.put(getHandleForObject(object).getId(), storedObject);
}

private StoredObject factHandleToStoredObject(InternalFactHandle handle, boolean propagated, Object object) {
protected StoredObject factHandleToStoredObject(InternalFactHandle handle, boolean propagated, Object object) {
return handle.isEvent() ?
createStoredEvent(propagated, object, ((DefaultEventHandle) handle).getStartTimestamp(), ((DefaultEventHandle) handle).getDuration()) :
createStoredObject(propagated, object);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package org.drools.reliability.core;

import org.drools.core.common.Storage;
import org.kie.api.runtime.conf.PersistedSessionOption;

public class SimpleSerializationReliableObjectStoreFactory implements SimpleReliableObjectStoreFactory {

Expand All @@ -25,6 +26,14 @@ public SimpleReliableObjectStore createSimpleReliableObjectStore(Storage<Long, S
return new SimpleSerializationReliableObjectStore(storage);
}

public SimpleReliableObjectStore createSimpleReliableObjectStore(Storage<Long, StoredObject> storage, PersistedSessionOption persistedSessionOption) {
switch (persistedSessionOption.getPersistenceObjectsStrategy()){
case SIMPLE: return new SimpleSerializationReliableObjectStore(storage);
case OBJECT_REFERENCES: return new SimpleSerializationReliableRefObjectStore(storage);
default: throw new UnsupportedOperationException();
}
}

@Override
public int servicePriority() {
return servicePriorityValue;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
/*
* Copyright 2023 Red Hat, Inc. and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.drools.reliability.core;

import org.drools.core.common.InternalFactHandle;
import org.drools.core.common.Storage;

import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

public class SimpleSerializationReliableRefObjectStore extends SimpleSerializationReliableObjectStore {

private Map<String, Long> uniqueObjectTypesInStore; // object type name, occurances

public SimpleSerializationReliableRefObjectStore(Storage<Long, StoredObject> storage) {
super(storage);
uniqueObjectTypesInStore = new HashMap<>();
if (storage.isEmpty()) {
this.storage = storage;
} else {
updateObjectTypesList();
this.storage = updateObjectReferences(storage);
}
}

private Storage<Long, StoredObject> updateObjectReferences(Storage<Long, StoredObject> storage) {
Storage<Long, StoredObject> updateStorage = storage;

for (Long key : storage.keySet()) {
updateStorage.put(key, ((SerializableStoredRefObject) storage.get(key)).updateReferencedObjects(storage));
}
return updateStorage;
}

@Override
public void putIntoPersistedStorage(InternalFactHandle handle, boolean propagated) {
Object object = handle.getObject();
StoredObject storedObject = factHandleToStoredObject(handle, reInitPropagated || propagated, object);
storage.put(getHandleForObject(object).getId(), setReferencedObjects(storedObject));
// also add the type of the object into the uniqueObjectTypesInStore list (if not already there)
this.updateObjectTypesList(object);
}

@Override
public void removeFromPersistedStorage(Object object) {
super.removeFromPersistedStorage(object);
// also remove instance from uniqueObjectTypesInStore
this.updateObjectTypesList(object);
}

@Override
protected StoredObject createStoredObject(boolean propagated, Object object) {
return new SerializableStoredRefObject(object, propagated);
}

private StoredObject setReferencedObjects(StoredObject object) {
List<Field> referencedObjects = getReferencedObjects(object.getObject());
if (!referencedObjects.isEmpty()) {
// for each referenced object in sObject
// lookup in storage, find the object of reference, get its fact handle id
// save this association in the StoredObject
referencedObjects.forEach(field -> {
field.setAccessible(true);
Object fieldObject = null;
try {
fieldObject = field.get(object.getObject());
} catch (IllegalAccessException e) {
throw new ReliabilityRuntimeException(e);
}
Long objectKey = fromObjectToFactHandleId(fieldObject);
if (objectKey != null) {
((SerializableStoredRefObject) object).addReferencedObject(field.getName(), objectKey);
}
});
}
return object;
}

private Long fromObjectToFactHandleId(Object object) {
for (Long key : this.storage.keySet()) {
if (((SerializableStoredRefObject) storage.get(key)).getObject() == object) {
return key;
}
}
return null;
}

private List<Field> getReferencedObjects(Object object) {
Field[] fields = object.getClass().getDeclaredFields();

return Arrays.stream(fields)
.filter(field -> uniqueObjectTypesInStore.containsKey(field.getType().getName()))
.collect(Collectors.toList());
}

private void updateObjectTypesList(Object object) {
uniqueObjectTypesInStore.put(object.getClass().getName(),
storage.values().stream().filter(sObject -> sObject.getObject().getClass().equals(object.getClass())).count());
// if count==0 then remove entry
if (uniqueObjectTypesInStore.get(object.getClass().getName()) == 0) {
uniqueObjectTypesInStore.remove(object.getClass().getName());
}
}

private void updateObjectTypesList() {
// list of unique object types in storage
Set<String> uTypeNames = new HashSet<String>();
storage.values().forEach(sObject -> {
uTypeNames.add(sObject.getObject().getClass().getName());
});
// add unique object types + their occurrences in the uniqueObjectTypesInStore
uniqueObjectTypesInStore.putAll(storage.values().stream()
.map(sObject -> sObject.getObject().getClass().getName())
.filter(uTypeNames::contains)
.collect(Collectors.groupingBy(Function.identity(), Collectors.counting())));
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@
import org.drools.reliability.core.SimpleReliableObjectStore;
import org.drools.reliability.core.SimpleReliableObjectStoreFactory;
import org.drools.reliability.core.SimpleSerializationReliableObjectStore;
import org.drools.reliability.core.SimpleSerializationReliableRefObjectStore;
import org.drools.reliability.core.StorageManagerFactory;
import org.drools.reliability.core.StoredObject;
import org.drools.reliability.infinispan.proto.SimpleProtoStreamReliableObjectStore;
import org.kie.api.runtime.conf.PersistedSessionOption;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -31,18 +33,24 @@ public class SimpleInfinispanReliableObjectStoreFactory implements SimpleReliabl

private static final Logger LOG = LoggerFactory.getLogger(SimpleInfinispanReliableObjectStoreFactory.class);

public SimpleReliableObjectStore createSimpleReliableObjectStore(Storage<Long, StoredObject> storage) {
public SimpleReliableObjectStore createSimpleReliableObjectStore(Storage<Long, StoredObject> storage, PersistedSessionOption persistedSessionOption) {
if (((InfinispanStorageManager)StorageManagerFactory.get().getStorageManager()).isProtoStream()) {
LOG.debug("Using SimpleProtoStreamReliableObjectStore");
return new SimpleProtoStreamReliableObjectStore(storage);
} else {
LOG.debug("Using SimpleSerializationReliableObjectStore");
return new SimpleSerializationReliableObjectStore(storage);
if (persistedSessionOption.getPersistenceObjectsStrategy()== PersistedSessionOption.PersistenceObjectsStrategy.OBJECT_REFERENCES){
LOG.debug("Using SimpleSerializationReliableRefObjectStore");
return new SimpleSerializationReliableRefObjectStore(storage);
}else{
LOG.debug("Using SimpleSerializationReliableObjectStore");
return new SimpleSerializationReliableObjectStore(storage);
}
}
}

@Override
public int servicePriority() {
return servicePriorityValue;
}

}
Loading

0 comments on commit 3fd51b3

Please sign in to comment.