Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
mariofusco committed Jul 13, 2023
1 parent 699e8c1 commit ca67bcd
Show file tree
Hide file tree
Showing 34 changed files with 384 additions and 377 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package org.drools.base.common;

import java.util.concurrent.atomic.AtomicInteger;

public class PartitionsManager {

private static final int MIN_PARALLEL_THRESHOLD = Runtime.getRuntime().availableProcessors();
private static final int MAX_PARALLEL_THRESHOLD = Runtime.getRuntime().availableProcessors();

private final AtomicInteger partitionCounter = new AtomicInteger( 0 );

private int parallelEvaluationSlotsCount = -1;

public RuleBasePartitionId createNewPartitionId() {
return new RuleBasePartitionId(this, partitionCounter.incrementAndGet());
}

public int getPartitionsCount() {
return partitionCounter.get();
}

public boolean hasParallelEvaluation() {
return getPartitionsCount() >= MIN_PARALLEL_THRESHOLD;
}

public int getParallelEvaluationSlotsCount() {
return parallelEvaluationSlotsCount;
}

public void init() {
this.parallelEvaluationSlotsCount = Math.min(getPartitionsCount(), MAX_PARALLEL_THRESHOLD);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,27 +16,19 @@

package org.drools.base.common;

import org.kie.api.concurrent.KieExecutors;

import java.io.Serializable;
import java.util.concurrent.atomic.AtomicInteger;

/**
* A class to identify RuleBase partitions
*/
public final class RuleBasePartitionId implements Serializable {

private static final long serialVersionUID = 510l;

public static final int PARALLEL_PARTITIONS_NUMBER = KieExecutors.Pool.SIZE;
public final class RuleBasePartitionId {

public static final RuleBasePartitionId MAIN_PARTITION = new RuleBasePartitionId( 0 );
public static final RuleBasePartitionId MAIN_PARTITION = new RuleBasePartitionId(null, 0);

private static final AtomicInteger PARTITION_COUNTER = new AtomicInteger( 1 );
private final PartitionsManager partitionsManager;

private final int id;

private RuleBasePartitionId( int id ) {
public RuleBasePartitionId(PartitionsManager partitionsManager, int id ) {
this.partitionsManager = partitionsManager;
this.id = id;
}

Expand All @@ -45,7 +37,7 @@ public int getId() {
}

public int getParallelEvaluationSlot() {
return id % PARALLEL_PARTITIONS_NUMBER;
return id % partitionsManager.getParallelEvaluationSlotsCount();
}

@Override
Expand All @@ -58,11 +50,8 @@ public boolean equals(Object obj) {
return this == obj || (obj instanceof RuleBasePartitionId && id == ((RuleBasePartitionId)obj).id);
}

@Override
public String toString() {
return "Partition(" + (id == 0 ? "MAIN" : id) + ")";
}

public static RuleBasePartitionId createPartition() {
return new RuleBasePartitionId( PARTITION_COUNTER.getAndIncrement() );
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ public interface AgendaFactory {

InternalAgenda createAgenda(InternalRuleBase kBase, boolean initMain);

InternalAgenda createAgenda(InternalRuleBase kBase);

default InternalAgenda createAgenda(InternalRuleBase kBase) {
return createAgenda(kBase, true);
}
}
21 changes: 7 additions & 14 deletions drools-core/src/main/java/org/drools/core/common/BaseNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,6 @@

package org.drools.core.common;

import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import org.drools.base.common.NetworkNode;
import org.drools.base.common.RuleBasePartitionId;
import org.drools.base.reteoo.BaseTerminalNode;
Expand All @@ -36,6 +30,12 @@
import org.drools.core.reteoo.builder.BuildContext;
import org.kie.api.definition.rule.Rule;

import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
* The base class for all Rete nodes.
*/
Expand All @@ -48,7 +48,6 @@ public abstract class BaseNode
protected int memoryId = -1;

protected RuleBasePartitionId partitionId;
protected boolean partitionsEnabled;
protected Set<Rule> associations;

private Map<Integer, TerminalNode> associatedTerminals;
Expand All @@ -68,12 +67,10 @@ public BaseNode() {
* The unique id
*/
public BaseNode(final int id,
final RuleBasePartitionId partitionId,
final boolean partitionsEnabled) {
final RuleBasePartitionId partitionId) {
super();
this.id = id;
this.partitionId = partitionId;
this.partitionsEnabled = partitionsEnabled;
this.associations = new HashSet<>();
this.associatedTerminals = new HashMap<>();
}
Expand Down Expand Up @@ -171,10 +168,6 @@ public void setPartitionId(BuildContext context, RuleBasePartitionId partitionId
this.partitionId = partitionId;
}

public void setPartitionsEnabled( boolean partitionsEnabled ) {
this.partitionsEnabled = partitionsEnabled;
}

/**
* Associates this node with the give rule
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ private DefaultEventHandle cloneWithoutTuples() {
clone.setOtnCount( getOtnCount() );
clone.setExpired( isExpired() );
clone.setEqualityKey( getEqualityKey() );
clone.linkedTuples = this.linkedTuples.newInstance();
clone.linkedTuples = this.linkedTuples.cloneEmpty();
clone.setObjectHashCode(getObjectHashCode());
clone.wmEntryPoint = this.wmEntryPoint;
return clone;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package org.drools.core.common;

import org.drools.base.common.RuleBasePartitionId;
import org.drools.base.factmodel.traits.TraitTypeEnum;
import org.drools.base.rule.EntryPointId;
import org.drools.core.WorkingMemoryEntryPoint;
Expand Down Expand Up @@ -353,9 +352,9 @@ public WorkingMemoryEntryPoint getEntryPoint(ReteEvaluator reteEvaluator) {
return wmEntryPoint;
}

protected void setLinkedTuples( InternalRuleBase kbase) {
linkedTuples = kbase != null && kbase.getRuleBaseConfiguration().isMultithreadEvaluation() ?
new CompositeLinkedTuples() :
protected void setLinkedTuples(InternalRuleBase kbase) {
linkedTuples = kbase != null && kbase.hasParallelEvaluation() ?
new CompositeLinkedTuples(kbase.getParallelEvaluationSlotsCount()) :
new SingleLinkedTuples();
}

Expand Down Expand Up @@ -467,7 +466,7 @@ public SingleLinkedTuples clone() {
}

@Override
public LinkedTuples newInstance() {
public LinkedTuples cloneEmpty() {
return new SingleLinkedTuples();
}

Expand Down Expand Up @@ -717,7 +716,11 @@ RightTuple getFirstRightTuple() {

public static class CompositeLinkedTuples implements LinkedTuples {

private final LinkedTuples[] partitionedTuples = new LinkedTuples[RuleBasePartitionId.PARALLEL_PARTITIONS_NUMBER];
private final LinkedTuples[] partitionedTuples;

public CompositeLinkedTuples(int parallelEvaluationSlotsCount) {
this.partitionedTuples = new LinkedTuples[parallelEvaluationSlotsCount];
}

private LinkedTuples getPartitionedTuple(int partition) {
LinkedTuples tuples = partitionedTuples[partition];
Expand All @@ -733,8 +736,8 @@ private LinkedTuples getOrCreatePartitionedTuple(int partition) {
}

@Override
public LinkedTuples newInstance() {
return new CompositeLinkedTuples();
public LinkedTuples cloneEmpty() {
return new CompositeLinkedTuples(partitionedTuples.length);
}

@Override
Expand All @@ -749,7 +752,7 @@ public boolean hasTuples() {

@Override
public LinkedTuples clone() {
CompositeLinkedTuples clone = new CompositeLinkedTuples();
CompositeLinkedTuples clone = new CompositeLinkedTuples(partitionedTuples.length);
for (int i = 0; i < partitionedTuples.length; i++) {
clone.partitionedTuples[i] = partitionedTuples[i] == null ? null : partitionedTuples[i].clone();
}
Expand Down Expand Up @@ -875,7 +878,7 @@ public LinkedTuples clone() {
}

@Override
public LinkedTuples newInstance() {
public LinkedTuples cloneEmpty() {
throw new UnsupportedOperationException();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ default boolean hasMatches() {

interface LinkedTuples extends Serializable {
LinkedTuples clone();
LinkedTuples newInstance();
LinkedTuples cloneEmpty();

boolean hasTuples();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,18 @@

package org.drools.core.impl;

import java.io.IOException;
import java.lang.reflect.Type;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Future;

import org.drools.base.RuleBase;
import org.drools.base.common.RuleBasePartitionId;
import org.drools.base.definitions.InternalKnowledgePackage;
import org.drools.base.definitions.rule.impl.RuleImpl;
import org.drools.base.rule.InvalidPatternException;
import org.drools.base.rule.TypeDeclaration;
import org.drools.base.ruleunit.RuleUnitDescriptionRegistry;
import org.drools.core.KieBaseConfigurationImpl;
import org.drools.core.RuleBaseConfiguration;
import org.drools.core.base.ClassFieldAccessorCache;
import org.drools.core.common.InternalWorkingMemory;
import org.drools.core.common.ReteEvaluator;
import org.drools.base.common.RuleBasePartitionId;
import org.drools.base.definitions.InternalKnowledgePackage;
import org.drools.base.definitions.rule.impl.RuleImpl;
import org.drools.core.reteoo.AsyncReceiveNode;
import org.drools.core.reteoo.EntryPointNode;
import org.drools.core.reteoo.LeftTupleNode;
Expand All @@ -41,10 +36,7 @@
import org.drools.core.reteoo.ReteooBuilder;
import org.drools.core.reteoo.SegmentMemory;
import org.drools.core.reteoo.SegmentMemory.SegmentPrototype;
import org.drools.base.rule.InvalidPatternException;
import org.drools.base.rule.TypeDeclaration;
import org.drools.core.rule.accessor.FactHandleFactory;
import org.drools.base.ruleunit.RuleUnitDescriptionRegistry;
import org.kie.api.KieBaseConfiguration;
import org.kie.api.builder.ReleaseId;
import org.kie.api.definition.KiePackage;
Expand All @@ -55,6 +47,14 @@
import org.kie.api.io.Resource;
import org.kie.api.runtime.KieSessionConfiguration;

import java.io.IOException;
import java.lang.reflect.Type;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Future;

public interface InternalRuleBase extends RuleBase {

Collection<KiePackage> getKiePackages();
Expand All @@ -81,6 +81,8 @@ public interface InternalRuleBase extends RuleBase {
String getId();

RuleBasePartitionId createNewPartitionId();
boolean hasParallelEvaluation();
int getParallelEvaluationSlotsCount();

RuleBaseConfiguration getRuleBaseConfiguration();

Expand Down
Loading

0 comments on commit ca67bcd

Please sign in to comment.