-
Notifications
You must be signed in to change notification settings - Fork 2.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[DROOLS-7480] Persist info to identify which activation is fired or not #5466
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,17 +15,22 @@ | |
|
||
package org.drools.reliability.core; | ||
|
||
import java.util.ArrayList; | ||
import java.util.HashMap; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.Set; | ||
import java.util.concurrent.TimeUnit; | ||
|
||
import org.drools.core.ClockType; | ||
import org.drools.core.common.DefaultEventHandle; | ||
import org.drools.core.common.IdentityObjectStore; | ||
import org.drools.core.common.InternalFactHandle; | ||
import org.drools.core.common.InternalWorkingMemory; | ||
import org.drools.core.common.InternalWorkingMemoryEntryPoint; | ||
import org.drools.core.common.Storage; | ||
|
||
import java.util.ArrayList; | ||
import java.util.List; | ||
import java.util.concurrent.TimeUnit; | ||
import org.drools.reliability.core.util.ReliabilityUtils; | ||
import org.kie.api.runtime.conf.PersistedSessionOption; | ||
|
||
public class SimpleSerializationReliableObjectStore extends IdentityObjectStore implements SimpleReliableObjectStore { | ||
|
||
|
@@ -57,23 +62,23 @@ public void removeHandle(InternalFactHandle handle) { | |
@Override | ||
public List<StoredObject> reInit(InternalWorkingMemory session, InternalWorkingMemoryEntryPoint ep) { | ||
reInitPropagated = true; | ||
List<StoredObject> propagated = new ArrayList<>(); | ||
Map<Long, StoredObject> propagated = new HashMap<>(); | ||
List<StoredObject> notPropagated = new ArrayList<>(); | ||
for (StoredObject entry : storage.values()) { | ||
if (entry.isPropagated()) { | ||
propagated.add(entry); | ||
for (Long factHandleId : storage.keySet()) { | ||
StoredObject storedObject = storage.get(factHandleId); | ||
if (storedObject.isPropagated()) { | ||
propagated.put(factHandleId, storedObject); | ||
} else { | ||
notPropagated.add(entry); | ||
notPropagated.add(storedObject); | ||
} | ||
} | ||
|
||
storage.clear(); | ||
|
||
if (session.getSessionConfiguration().getClockType() == ClockType.PSEUDO_CLOCK) { | ||
repropagateWithPseudoClock(session, ep, propagated); | ||
} else { | ||
// fact handles with a match have been already propagated in the original session, so they shouldn't fire | ||
propagated.forEach(obj -> obj.repropagate(ep)); | ||
session.fireAllRules(match -> false); | ||
repropagate(session, ep, propagated); | ||
} | ||
|
||
reInitPropagated = false; | ||
|
@@ -82,9 +87,41 @@ public List<StoredObject> reInit(InternalWorkingMemory session, InternalWorkingM | |
return notPropagated; | ||
} | ||
|
||
private void repropagateWithPseudoClock(InternalWorkingMemory session, InternalWorkingMemoryEntryPoint ep, List<StoredObject> propagated) { | ||
private void repropagate(InternalWorkingMemory session, InternalWorkingMemoryEntryPoint ep, Map<Long, StoredObject> propagated) { | ||
Map<Long, Long> factHandleIdMap = new HashMap<>(); | ||
propagated.forEach((oldFactHandleId, storedObject) -> { | ||
long newFactHandleId = storedObject.repropagate(ep); | ||
factHandleIdMap.put(newFactHandleId, oldFactHandleId); | ||
}); | ||
Comment on lines
+91
to
+95
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This |
||
|
||
fireOnlyWhenActivationRemaining(session, factHandleIdMap); | ||
} | ||
|
||
private void fireOnlyWhenActivationRemaining(InternalWorkingMemory session, Map<Long, Long> factHandleIdMap) { | ||
if (session.getSessionConfiguration().getPersistedSessionOption().getActivationStrategy() == PersistedSessionOption.ActivationStrategy.ACTIVATION_KEY) { | ||
// fact handles with a match have been already propagated in the original session, so they shouldn't fire unless remained in activationsStorage | ||
Storage<String, Object> activationsStorage = ((ReliableKieSession)session).getActivationsStorage(); | ||
Set<String> activationKeySet = activationsStorage.keySet(); | ||
session.fireAllRules(match -> { | ||
String activationKey = ReliabilityUtils.getActivationKeyReplacingNewIdWithOldId(match, factHandleIdMap); | ||
if (activationKeySet.contains(activationKey)) { | ||
// If there is a remaining activation, it can fire | ||
activationsStorage.remove(activationKey); | ||
return true; | ||
} else { | ||
return false; | ||
} | ||
}); | ||
} else { | ||
session.fireAllRules(match -> false); | ||
} | ||
} | ||
|
||
private void repropagateWithPseudoClock(InternalWorkingMemory session, InternalWorkingMemoryEntryPoint ep, Map<Long, StoredObject> propagated) { | ||
ReliablePseudoClockScheduler clock = (ReliablePseudoClockScheduler) session.getSessionClock(); | ||
for (StoredObject storedObject : propagated) { | ||
Map<Long, Long> factHandleIdMap = new HashMap<>(); | ||
for (Map.Entry<Long, StoredObject> entry : propagated.entrySet()) { | ||
StoredObject storedObject = entry.getValue(); | ||
if (storedObject.isEvent()) { | ||
StoredEvent storedEvent = (StoredEvent) storedObject; | ||
long currentTime = clock.getCurrentTime(); | ||
|
@@ -93,10 +130,11 @@ private void repropagateWithPseudoClock(InternalWorkingMemory session, InternalW | |
clock.advanceTime(timestamp - currentTime, TimeUnit.MILLISECONDS); | ||
} | ||
} | ||
storedObject.repropagate(ep); // This may schedule an expiration | ||
long newFactHandleId = storedObject.repropagate(ep); // This may schedule an expiration | ||
factHandleIdMap.put(newFactHandleId, entry.getKey()); | ||
} | ||
// fact handles with a match have been already propagated in the original session, so they shouldn't fire | ||
session.fireAllRules(match -> false); | ||
|
||
fireOnlyWhenActivationRemaining(session, factHandleIdMap); | ||
|
||
// Finally, meet with the persistedTime | ||
long currentTime = clock.getCurrentTime(); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,68 @@ | ||
/* | ||
* Copyright 2023 Red Hat, Inc. and/or its affiliates. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.drools.reliability.core.util; | ||
|
||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.stream.Collectors; | ||
|
||
import org.drools.core.reteoo.RuleTerminalNodeLeftTuple; | ||
import org.drools.reliability.core.ReliabilityRuntimeException; | ||
import org.kie.api.runtime.rule.FactHandle; | ||
import org.kie.api.runtime.rule.Match; | ||
|
||
public class ReliabilityUtils { | ||
|
||
private ReliabilityUtils() { | ||
// no constructor | ||
} | ||
|
||
/** | ||
* Returns a String representation of the activation. | ||
*/ | ||
public static String getActivationKey(Match match) { | ||
return getActivationKey(match, null); | ||
} | ||
|
||
/** | ||
* Returns a String representation of the activation, replacing the new fact handle id with the old fact handle id. | ||
* Used to find an activation key in the persisted storage. | ||
*/ | ||
public static String getActivationKeyReplacingNewIdWithOldId(Match match, Map<Long, Long> factHandleIdMap) { | ||
return getActivationKey(match, factHandleIdMap); | ||
} | ||
|
||
private static String getActivationKey(Match match, Map<Long, Long> factHandleIdMap) { | ||
if (!(match instanceof RuleTerminalNodeLeftTuple)) { | ||
throw new ReliabilityRuntimeException("getActivationKey doesn't support " + match.getClass()); | ||
} | ||
RuleTerminalNodeLeftTuple ruleTerminalNodeLeftTuple = (RuleTerminalNodeLeftTuple) match; | ||
String packageName = ruleTerminalNodeLeftTuple.getRule().getPackageName(); | ||
String ruleName = ruleTerminalNodeLeftTuple.getRule().getName(); | ||
List<FactHandle> factHandles = ruleTerminalNodeLeftTuple.getFactHandles(); | ||
List<Long> factHandleIdList = factHandles.stream() | ||
.map(FactHandle::getId) | ||
.map(handleId -> { | ||
if (factHandleIdMap != null) { | ||
return factHandleIdMap.get(handleId); // replace new id with old id | ||
} else { | ||
return handleId; // don't replace | ||
} | ||
}) | ||
.collect(Collectors.toList()); | ||
return "ActivationKey [packageName=" + packageName + ", ruleName=" + ruleName + ", factHandleIdList=" + factHandleIdList + "]"; | ||
Comment on lines
+48
to
+66
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I didn't simply serialize an activation (= In order to identify a unique activation, I use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I believe this should be fine. |
||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
activationKey
is added when an activation is created.activationKey
is removed when the activation is cancelled or fired. If the storage has an activationKey, it means the activation is not yet fired.If I took an opposite approach (= add fired activationKey) , the storage could be piled up.