Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PersistenceObjectStrategy option #5418

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ public ReliableNamedEntryPoint(EntryPointId entryPoint, EntryPointNode entryPoin
protected ObjectStore createObjectStore(EntryPointId entryPoint, RuleBaseConfiguration conf, ReteEvaluator reteEvaluator) {
boolean storesOnlyStrategy = reteEvaluator.getSessionConfiguration().getPersistedSessionOption().getPersistenceStrategy() == PersistedSessionOption.PersistenceStrategy.STORES_ONLY;
return storesOnlyStrategy ?
SimpleReliableObjectStoreFactory.get().createSimpleReliableObjectStore(StorageManagerFactory.get().getStorageManager().getOrCreateStorageForSession(reteEvaluator, "ep" + getEntryPointId())) :
SimpleReliableObjectStoreFactory.get().createSimpleReliableObjectStore(StorageManagerFactory.get().getStorageManager().getOrCreateStorageForSession(reteEvaluator, "ep" + getEntryPointId()),
reteEvaluator.getSessionConfiguration().getPersistedSessionOption()) :
new FullReliableObjectStore(StorageManagerFactory.get().getStorageManager().getOrCreateStorageForSession(reteEvaluator, "ep" + getEntryPointId()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

public class SerializableStoredObject extends BaseStoredObject {

private final Serializable object;
protected final Serializable object;

public SerializableStoredObject(Object object, boolean propagated) {
super(propagated);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Copyright 2023 Red Hat, Inc. and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.drools.reliability.core;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing Copyright header

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thank you @tkobayas, copyright header added.


import org.drools.core.common.Storage;

import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class SerializableStoredRefObject extends SerializableStoredObject {

private final Map<String, Long> referencedObjects;

public SerializableStoredRefObject(Object object, boolean propagated) {
super(object, propagated);
referencedObjects=new HashMap<>();
}

public void addReferencedObject(String fieldName, Long refObjectKey){
this.referencedObjects.put(fieldName, refObjectKey);
}

public StoredObject updateReferencedObjects(Storage<Long, StoredObject> storage){
this.referencedObjects.keySet().forEach(fieldName -> {
Optional<Field> refField = Arrays.stream(object.getClass().getDeclaredFields())
.filter(f -> f.getName().equals(fieldName)).findFirst();
if (refField.isPresent()){
refField.get().setAccessible(true);
try {
refField.get().set(this.object, storage.get(this.referencedObjects.get(refField.get().getName())).getObject());
} catch (IllegalAccessException e) {
throw new ReliabilityRuntimeException(e);
}
}
});
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@

import org.drools.core.common.Storage;
import org.kie.api.internal.utils.KieService;
import org.kie.api.runtime.conf.PersistedSessionOption;

public interface SimpleReliableObjectStoreFactory extends KieService {

SimpleReliableObjectStore createSimpleReliableObjectStore(Storage<Long, StoredObject> storage);
SimpleReliableObjectStore createSimpleReliableObjectStore(Storage<Long, StoredObject> storage, PersistedSessionOption persistedSessionOption);

class Holder {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,6 @@

package org.drools.reliability.core;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

import org.drools.core.ClockType;
import org.drools.core.common.DefaultEventHandle;
import org.drools.core.common.IdentityObjectStore;
Expand All @@ -27,9 +23,13 @@
import org.drools.core.common.InternalWorkingMemoryEntryPoint;
import org.drools.core.common.Storage;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class SimpleSerializationReliableObjectStore extends IdentityObjectStore implements SimpleReliableObjectStore {

protected final transient Storage<Long, StoredObject> storage;
protected transient Storage<Long, StoredObject> storage;

protected boolean reInitPropagated = false;

Expand Down Expand Up @@ -113,7 +113,7 @@ public void putIntoPersistedStorage(InternalFactHandle handle, boolean propagate
storage.put(getHandleForObject(object).getId(), storedObject);
}

private StoredObject factHandleToStoredObject(InternalFactHandle handle, boolean propagated, Object object) {
protected StoredObject factHandleToStoredObject(InternalFactHandle handle, boolean propagated, Object object) {
return handle.isEvent() ?
createStoredEvent(propagated, object, ((DefaultEventHandle) handle).getStartTimestamp(), ((DefaultEventHandle) handle).getDuration()) :
createStoredObject(propagated, object);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package org.drools.reliability.core;

import org.drools.core.common.Storage;
import org.kie.api.runtime.conf.PersistedSessionOption;

public class SimpleSerializationReliableObjectStoreFactory implements SimpleReliableObjectStoreFactory {

Expand All @@ -25,6 +26,14 @@ public SimpleReliableObjectStore createSimpleReliableObjectStore(Storage<Long, S
return new SimpleSerializationReliableObjectStore(storage);
}

public SimpleReliableObjectStore createSimpleReliableObjectStore(Storage<Long, StoredObject> storage, PersistedSessionOption persistedSessionOption) {
if (persistedSessionOption.getPersistenceObjectsStrategy() == PersistedSessionOption.PersistenceObjectsStrategy.SIMPLE) {
return new SimpleSerializationReliableObjectStore(storage);
}else {
return new SimpleSerializationReliableRefObjectStore(storage);
}
}

@Override
public int servicePriority() {
return servicePriorityValue;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
/*
* Copyright 2023 Red Hat, Inc. and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.drools.reliability.core;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing Copyright header

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thank you @tkobayas, copyright header added.


import org.drools.core.common.InternalFactHandle;
import org.drools.core.common.Storage;

import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class SimpleSerializationReliableRefObjectStore extends SimpleSerializationReliableObjectStore {

private Map<String, Long> uniqueObjectTypesInStore; // object type name, occurances

public SimpleSerializationReliableRefObjectStore() {
throw new UnsupportedOperationException("This constructor should never be called");
}

public SimpleSerializationReliableRefObjectStore(Storage<Long, StoredObject> storage) {
super(storage);
uniqueObjectTypesInStore = new HashMap<>();
if (storage.size() > 0) {
updateObjectTypesList();
}
this.storage = storage.size() > 0 ? updateObjectReferences(storage) : storage;
}

private Storage<Long, StoredObject> updateObjectReferences(Storage<Long, StoredObject> storage) {
Storage<Long, StoredObject> updateStorage = storage;

for (Long key : storage.keySet()) {
updateStorage.put(key, ((SerializableStoredRefObject) storage.get(key)).updateReferencedObjects(storage));
}
return updateStorage;
}

@Override
public void putIntoPersistedStorage(InternalFactHandle handle, boolean propagated) {
Object object = handle.getObject();
StoredObject storedObject = factHandleToStoredObject(handle, reInitPropagated || propagated, object);
storage.put(getHandleForObject(object).getId(), setReferencedObjects(storedObject));
// also add the type of the object into the uniqueObjectTypesInStore list (if not already there)
this.updateObjectTypesList(object);
}

@Override
public void removeFromPersistedStorage(Object object) {
super.removeFromPersistedStorage(object);
// also remove instance from uniqueObjectTypesInStore
this.updateObjectTypesList(object);
}

@Override
protected StoredObject createStoredObject(boolean propagated, Object object) {
return new SerializableStoredRefObject(object, propagated);
}

private StoredObject setReferencedObjects(StoredObject object) {
List<Field> referencedObjects = getReferencedObjects(object.getObject());
if (!referencedObjects.isEmpty()) {
// for each referenced object in sObject
// lookup in storage, find the object of reference, get its fact handle id
// save this association in the StoredObject
referencedObjects.forEach(field -> {
field.setAccessible(true);
Object fieldObject = null;
try {
fieldObject = field.get(object.getObject());
} catch (IllegalAccessException e) {
throw new ReliabilityRuntimeException(e);
}
Long objectKey = fromObjectToFactHandleId(fieldObject);
if (objectKey != null) {
((SerializableStoredRefObject) object).addReferencedObject(field.getName(), objectKey);
}
});
}
return object;
}

private Long fromObjectToFactHandleId(Object object) {
for (Long key : this.storage.keySet()) {
if (((SerializableStoredRefObject) storage.get(key)).getObject() == object) {
return key;
}
}
return null;
}

private List<Field> getReferencedObjects(Object object) {
Field[] fields = object.getClass().getDeclaredFields();

return Arrays.stream(fields)
.filter(field -> uniqueObjectTypesInStore.containsKey(field.getType().getName()))
.collect(Collectors.toList());
}

private void updateObjectTypesList(Object object) {
uniqueObjectTypesInStore.put(object.getClass().getName(),
storage.values().stream().filter(sObject -> sObject.getObject().getClass().equals(object.getClass())).count());
// if count==0 then remove entry
if (uniqueObjectTypesInStore.get(object.getClass().getName()) == 0) {
uniqueObjectTypesInStore.remove(object.getClass().getName());
}
}

private void updateObjectTypesList() {
// list of unique object types in storage
List<String> uTypeNames = new ArrayList<>();
storage.values().forEach(sObject -> {
if (!uTypeNames.contains(sObject.getObject().getClass().getName())) {
uTypeNames.add(sObject.getObject().getClass().getName());
}
});
// add unique object types + their occurrences in uniqueObjectTypesInStore
uTypeNames.forEach(uType -> {
uniqueObjectTypesInStore.put(uType,
storage.values().stream().filter(sObject -> sObject.getObject().getClass().getName().equals(uType)).count());
});
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@
import org.drools.reliability.core.SimpleReliableObjectStore;
import org.drools.reliability.core.SimpleReliableObjectStoreFactory;
import org.drools.reliability.core.SimpleSerializationReliableObjectStore;
import org.drools.reliability.core.SimpleSerializationReliableRefObjectStore;
import org.drools.reliability.core.StorageManagerFactory;
import org.drools.reliability.core.StoredObject;
import org.drools.reliability.infinispan.proto.SimpleProtoStreamReliableObjectStore;
import org.kie.api.runtime.conf.PersistedSessionOption;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -31,18 +33,24 @@ public class SimpleInfinispanReliableObjectStoreFactory implements SimpleReliabl

private static final Logger LOG = LoggerFactory.getLogger(SimpleInfinispanReliableObjectStoreFactory.class);

public SimpleReliableObjectStore createSimpleReliableObjectStore(Storage<Long, StoredObject> storage) {
public SimpleReliableObjectStore createSimpleReliableObjectStore(Storage<Long, StoredObject> storage, PersistedSessionOption persistedSessionOption) {
if (((InfinispanStorageManager)StorageManagerFactory.get().getStorageManager()).isProtoStream()) {
LOG.debug("Using SimpleProtoStreamReliableObjectStore");
return new SimpleProtoStreamReliableObjectStore(storage);
} else {
LOG.debug("Using SimpleSerializationReliableObjectStore");
return new SimpleSerializationReliableObjectStore(storage);
if (persistedSessionOption.getPersistenceObjectsStrategy()== PersistedSessionOption.PersistenceObjectsStrategy.OBJECT_REFERENCES){
LOG.debug("Using SimpleSerializationReliableRefObjectStore");
return new SimpleSerializationReliableRefObjectStore(storage);
}else{
LOG.debug("Using SimpleSerializationReliableObjectStore");
return new SimpleSerializationReliableObjectStore(storage);
}
}
}

@Override
public int servicePriority() {
return servicePriorityValue;
}

}
Loading
Loading