Skip to content

Commit

Permalink
[DROOLS-7502] remove no longer necessary explicit deletion of expired…
Browse files Browse the repository at this point in the history
… events
  • Loading branch information
mariofusco committed Jul 11, 2023
1 parent 315721b commit 3a57b1a
Show file tree
Hide file tree
Showing 6 changed files with 88 additions and 123 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,9 @@

package org.drools.core.common;

import java.util.concurrent.atomic.AtomicInteger;

import org.drools.base.common.RuleBasePartitionId;
import org.drools.core.WorkingMemoryEntryPoint;
import org.drools.base.rule.EntryPointId;
import org.drools.base.time.JobHandle;
import org.drools.core.WorkingMemoryEntryPoint;
import org.drools.core.time.TimerService;
import org.drools.core.time.impl.DefaultJobHandle;
import org.drools.core.util.LinkedList;
Expand All @@ -42,8 +39,6 @@ public class DefaultEventHandle extends DefaultFactHandle implements EventHandle

private DefaultEventHandle linkedFactHandle;

private AtomicInteger notExpiredPartitions;

private final transient LinkedList<DefaultJobHandle> jobs = new LinkedList<>();

public DefaultEventHandle() {
Expand Down Expand Up @@ -77,10 +72,6 @@ public DefaultEventHandle(long id,
super( id, object, recency, wmEntryPoint );
this.startTimestamp = timestamp;
this.duration = duration;

if ( wmEntryPoint.getKnowledgeBase() != null && wmEntryPoint.getKnowledgeBase().getRuleBaseConfiguration().isMultithreadEvaluation() ) {
notExpiredPartitions = new AtomicInteger( RuleBasePartitionId.PARALLEL_PARTITIONS_NUMBER );
}
}

protected DefaultEventHandle(long id,
Expand Down Expand Up @@ -175,14 +166,6 @@ public boolean isExpired() {
}
}

public boolean expirePartition() {
if ( linkedFactHandle != null ) {
return linkedFactHandle.expirePartition();
} else {
return notExpiredPartitions == null || notExpiredPartitions.decrementAndGet() == 0;
}
}

public void setExpired(boolean expired) {
if ( linkedFactHandle != null ) {
linkedFactHandle.setExpired(expired);
Expand Down
28 changes: 4 additions & 24 deletions drools-core/src/main/java/org/drools/core/phreak/RuleExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,21 @@
package org.drools.core.phreak;

import org.drools.base.base.SalienceInteger;
import org.drools.base.definitions.rule.impl.RuleImpl;
import org.drools.base.rule.consequence.Consequence;
import org.drools.base.rule.consequence.ConsequenceException;
import org.drools.core.common.ActivationsManager;
import org.drools.core.common.DefaultEventHandle;
import org.drools.core.common.EventSupport;
import org.drools.core.common.InternalActivationGroup;
import org.drools.core.common.InternalFactHandle;
import org.drools.core.common.ReteEvaluator;
import org.drools.core.conflict.MatchConflictResolver;
import org.drools.core.conflict.RuleAgendaConflictResolver;
import org.drools.base.definitions.rule.impl.RuleImpl;
import org.drools.core.event.RuleEventListenerSupport;
import org.drools.core.reteoo.PathMemory;
import org.drools.core.reteoo.RuleTerminalNode;
import org.drools.core.reteoo.RuleTerminalNodeLeftTuple;
import org.drools.core.reteoo.Tuple;
import org.drools.base.rule.consequence.Consequence;
import org.drools.base.rule.consequence.ConsequenceException;
import org.drools.core.rule.consequence.InternalMatch;
import org.drools.core.rule.consequence.KnowledgeHelper;
import org.drools.core.util.Queue;
Expand Down Expand Up @@ -362,26 +361,7 @@ public void fireActivation(ReteEvaluator reteEvaluator, ActivationsManager activ
}
internalMatch.setQueued(false);

try {
fireActivationEvent(reteEvaluator, activationsManager, internalMatch, internalMatch.getConsequence());
} finally {
// if the tuple contains expired events
for (Tuple tuple = internalMatch.getTuple().skipEmptyHandles(); tuple != null; tuple = tuple.getParent() ) {
if ( tuple.getFactHandle().isEvent() ) {
// can be null for eval, not and exists that have no right input
DefaultEventHandle handle = (DefaultEventHandle) tuple.getFactHandle();
// decrease the activation count for the event
handle.decreaseActivationsCount();
// handles "expire" only in stream mode.
if ( handle.expirePartition() && handle.isExpired() &&
handle.getFirstRightTuple() == null && handle.getActivationsCount() <= 0 ) {
// and if no more activations, retract the handle
handle.getEntryPoint( reteEvaluator ).delete( handle );
}
}
}
}

fireActivationEvent(reteEvaluator, activationsManager, internalMatch, internalMatch.getConsequence());
activationsManager.getAgendaEventSupport().fireAfterActivationFired(internalMatch, reteEvaluator, beforeMatchFiredEvent);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,25 +16,17 @@

package org.drools.kiesession.agenda;

import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;

import org.drools.base.common.NetworkNode;
import org.drools.base.common.RuleBasePartitionId;
import org.drools.core.common.ActivationsFilter;
import org.drools.core.common.AgendaGroupsManager;
import org.drools.core.common.InternalActivationGroup;
import org.drools.core.common.InternalAgenda;
import org.drools.core.common.InternalAgendaGroup;
import org.drools.core.common.InternalWorkingMemory;
import org.drools.base.common.NetworkNode;
import org.drools.core.common.PropagationContext;
import org.drools.core.common.ReteEvaluator;
import org.drools.base.common.RuleBasePartitionId;
import org.drools.core.common.RuleFlowGroup;
import org.drools.core.event.AgendaEventSupport;
import org.drools.core.impl.InternalRuleBase;
import org.drools.core.phreak.ExecutableEntry;
Expand All @@ -45,22 +37,30 @@
import org.drools.core.reteoo.RuleTerminalNodeLeftTuple;
import org.drools.core.reteoo.TerminalNode;
import org.drools.core.rule.consequence.InternalMatch;
import org.drools.core.common.InternalActivationGroup;
import org.drools.core.rule.consequence.KnowledgeHelper;
import org.drools.core.common.PropagationContext;
import org.drools.core.common.RuleFlowGroup;
import org.drools.core.util.CompositeIterator;
import org.kie.api.runtime.rule.AgendaFilter;
import org.kie.internal.concurrent.ExecutorProviderFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;

import static java.util.concurrent.CompletableFuture.runAsync;
import static java.util.concurrent.CompletableFuture.supplyAsync;

public class CompositeDefaultAgenda implements Externalizable, InternalAgenda {

protected static final transient Logger log = LoggerFactory.getLogger( CompositeDefaultAgenda.class );
protected static final Logger log = LoggerFactory.getLogger( CompositeDefaultAgenda.class );

private static final ExecutorService EXECUTOR = ExecutorProviderFactory.getExecutorProvider().getExecutor();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,14 +111,8 @@ private static ProcessMarshaller createProcessMarshaller() {
}

public static void writeSession( ProtobufMarshallerWriteContext context) throws IOException {

ProtobufMessages.KnowledgeSession _session = serializeSession( context );

// System.out.println("=============================================================================");
// System.out.println(_session);

PersisterHelper.writeToStreamWithHeader( context,
_session );
PersisterHelper.writeToStreamWithHeader( context, _session );
}

private static ProtobufMessages.KnowledgeSession serializeSession( MarshallerWriteContext context) throws IOException {
Expand Down Expand Up @@ -548,10 +542,10 @@ private static void writeFactHandles( MarshallerWriteContext context,

// Write out FactHandles
for ( InternalFactHandle handle : orderFacts( objectStore ) ) {
ProtobufMessages.FactHandle _handle = writeFactHandle( context,
objectMarshallingStrategyStore,
handle );
_epb.addHandle( _handle );
if (!handle.isExpired()) {
ProtobufMessages.FactHandle _handle = writeFactHandle( context, objectMarshallingStrategyStore, handle );
_epb.addHandle( _handle );
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1999,11 +1999,11 @@ public void testMarshallEvents() throws Exception {

ksession.insert( new A() );

ksession = marsallStatefulKnowledgeSession( ksession );
ksession = marshallStatefulKnowledgeSession( ksession );

ksession.insert( new B() );

ksession = marsallStatefulKnowledgeSession( ksession );
ksession = marshallStatefulKnowledgeSession( ksession );

ksession.fireAllRules();
assertThat(ksession.getObjects().size()).isEqualTo(2);
Expand Down Expand Up @@ -2141,21 +2141,21 @@ public void testMarshallEntryPointsWithExpires() throws Exception {
EntryPoint aep = ksession.getEntryPoint( "a-ep" );
aep.insert( new A() );

ksession = marsallStatefulKnowledgeSession( ksession );
ksession = marshallStatefulKnowledgeSession( ksession );

EntryPoint bep = ksession.getEntryPoint( "b-ep" );
bep.insert( new B() );

ksession = marsallStatefulKnowledgeSession( ksession );
ksession = marshallStatefulKnowledgeSession( ksession );

EntryPoint cep = ksession.getEntryPoint( "c-ep" );
cep.insert( new C() );

ksession = marsallStatefulKnowledgeSession( ksession );
ksession = marshallStatefulKnowledgeSession( ksession );

ksession.fireAllRules();

ksession = marsallStatefulKnowledgeSession( ksession );
ksession = marshallStatefulKnowledgeSession( ksession );

assertThat(list.size()).isEqualTo(3);

Expand All @@ -2168,14 +2168,14 @@ public void testMarshallEntryPointsWithExpires() throws Exception {
cep = ksession.getEntryPoint( "c-ep" );
assertThat(cep.getFactHandles().size()).isEqualTo(1);

PseudoClockScheduler timeService = (PseudoClockScheduler) ksession.getSessionClock();
PseudoClockScheduler timeService = ksession.getSessionClock();
timeService.advanceTime( 11, TimeUnit.SECONDS );

ksession = marsallStatefulKnowledgeSession( ksession );
ksession = marshallStatefulKnowledgeSession( ksession );

ksession.fireAllRules();

ksession = marsallStatefulKnowledgeSession( ksession );
ksession = marshallStatefulKnowledgeSession( ksession );

aep = ksession.getEntryPoint( "a-ep" );
assertThat(aep.getFactHandles().size()).isEqualTo(0);
Expand Down Expand Up @@ -2226,16 +2226,16 @@ public void testMarshallEntryPointsWithNot() throws Exception {
EntryPoint aep = ksession.getEntryPoint( "a-ep" );
aep.insert( new A() );

ksession = marsallStatefulKnowledgeSession( ksession );
ksession = marshallStatefulKnowledgeSession( ksession );

PseudoClockScheduler timeService = (PseudoClockScheduler) ksession.getSessionClock();
timeService.advanceTime( 3, TimeUnit.SECONDS );

ksession = marsallStatefulKnowledgeSession( ksession );
ksession = marshallStatefulKnowledgeSession( ksession );

ksession.fireAllRules();

ksession = marsallStatefulKnowledgeSession( ksession );
ksession = marshallStatefulKnowledgeSession( ksession );

assertThat(list.size()).isEqualTo(0);
}
Expand Down Expand Up @@ -2278,37 +2278,37 @@ public void testMarshallEntryPointsWithSlidingTimeWindow() throws Exception {

EntryPoint aep = ksession.getEntryPoint( "a-ep" );
aep.insert( new A() );
ksession = marsallStatefulKnowledgeSession( ksession );
ksession = marshallStatefulKnowledgeSession( ksession );

aep = ksession.getEntryPoint( "a-ep" );
aep.insert( new A() );
ksession = marsallStatefulKnowledgeSession( ksession );
ksession = marshallStatefulKnowledgeSession( ksession );

list.clear();
ksession.fireAllRules();
ksession = marsallStatefulKnowledgeSession( ksession );
ksession = marshallStatefulKnowledgeSession( ksession );
assertThat(((List) list.get(0)).size()).isEqualTo(2);

PseudoClockScheduler timeService = (PseudoClockScheduler) ksession.getSessionClock();
timeService.advanceTime( 15, TimeUnit.SECONDS );
ksession = marsallStatefulKnowledgeSession( ksession );
ksession = marshallStatefulKnowledgeSession( ksession );

aep = ksession.getEntryPoint( "a-ep" );
aep.insert( new A() );
ksession = marsallStatefulKnowledgeSession( ksession );
ksession = marshallStatefulKnowledgeSession( ksession );

aep = ksession.getEntryPoint( "a-ep" );
aep.insert( new A() );
ksession = marsallStatefulKnowledgeSession( ksession );
ksession = marshallStatefulKnowledgeSession( ksession );

list.clear();
ksession.fireAllRules();
ksession = marsallStatefulKnowledgeSession( ksession );
ksession = marshallStatefulKnowledgeSession( ksession );
assertThat(((List) list.get(0)).size()).isEqualTo(4);

timeService = (PseudoClockScheduler) ksession.getSessionClock();
timeService.advanceTime( 20, TimeUnit.SECONDS );
ksession = marsallStatefulKnowledgeSession( ksession );
ksession = marshallStatefulKnowledgeSession( ksession );

list.clear();
ksession.fireAllRules();
Expand Down Expand Up @@ -2353,28 +2353,28 @@ public void testMarshallEntryPointsWithSlidingLengthWindow() throws Exception {

EntryPoint aep = ksession.getEntryPoint( "a-ep" );
aep.insert( new A() );
ksession = marsallStatefulKnowledgeSession( ksession );
ksession = marshallStatefulKnowledgeSession( ksession );

aep = ksession.getEntryPoint( "a-ep" );
aep.insert( new A() );
ksession = marsallStatefulKnowledgeSession( ksession );
ksession = marshallStatefulKnowledgeSession( ksession );

list.clear();
ksession.fireAllRules();
ksession = marsallStatefulKnowledgeSession( ksession );
ksession = marshallStatefulKnowledgeSession( ksession );
assertThat(((List) list.get(0)).size()).isEqualTo(2);

aep = ksession.getEntryPoint( "a-ep" );
aep.insert( new A() );
ksession = marsallStatefulKnowledgeSession( ksession );
ksession = marshallStatefulKnowledgeSession( ksession );

aep = ksession.getEntryPoint( "a-ep" );
aep.insert( new A() );
ksession = marsallStatefulKnowledgeSession( ksession );
ksession = marshallStatefulKnowledgeSession( ksession );

list.clear();
ksession.fireAllRules();
ksession = marsallStatefulKnowledgeSession( ksession );
ksession = marshallStatefulKnowledgeSession( ksession );
assertThat(((List) list.get(0)).size()).isEqualTo(3);
}

Expand All @@ -2388,7 +2388,7 @@ public void testMarshalWithProtoBuf() throws Exception {
final Person bob = new Person( "bob" );
ksession.insert( bob );

ksession = marsallStatefulKnowledgeSession( ksession );
ksession = marshallStatefulKnowledgeSession( ksession );

assertThat(ksession.getFactCount()).isEqualTo(1);
assertThat(ksession.getObjects().iterator().next()).isEqualTo(bob);
Expand All @@ -2408,15 +2408,13 @@ public void testMarshalWithProtoBuf() throws Exception {
assertThat(facts.size()).isEqualTo(2);
}

private KieSession marsallStatefulKnowledgeSession(KieSession ksession) throws IOException,
ClassNotFoundException {
private KieSession marshallStatefulKnowledgeSession(KieSession ksession) throws IOException, ClassNotFoundException {
Globals globals = ksession.getGlobals();

KieBase kbase = ksession.getKieBase();

ByteArrayOutputStream out = new ByteArrayOutputStream();
MarshallerFactory.newMarshaller( kbase ).marshall( out,
ksession );
MarshallerFactory.newMarshaller( kbase ).marshall( out, ksession );

KieSessionConfiguration ksconf = RuleBaseFactory.newKnowledgeSessionConfiguration();
ksconf.setOption( TimerJobFactoryOption.get("trackable") );
Expand Down
Loading

0 comments on commit 3a57b1a

Please sign in to comment.