Skip to content

Commit

Permalink
[DROOLS-7479] Split StoredEvent from StoredFact (#5376)
Browse files Browse the repository at this point in the history
- removing handleId as no longer used
- reduce code duplication
  • Loading branch information
tkobayas authored Jul 7, 2023
1 parent 6d453fc commit b018b1f
Show file tree
Hide file tree
Showing 11 changed files with 282 additions and 104 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright 2023 Red Hat, Inc. and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.drools.reliability.core;

import org.drools.core.common.DefaultEventHandle;
import org.drools.core.common.InternalWorkingMemoryEntryPoint;
import org.drools.core.rule.accessor.FactHandleFactory;

public abstract class BaseStoredEvent extends BaseStoredObject implements StoredEvent {

protected final long timestamp;
protected final long duration;

protected BaseStoredEvent(boolean propagated, long timestamp, long duration) {
super(propagated);
this.timestamp = timestamp;
this.duration = duration;
}

@Override
public long getTimestamp() {
return timestamp;
}

@Override
public long getDuration() {
return duration;
}

@Override
public void repropagate(InternalWorkingMemoryEntryPoint ep) {
FactHandleFactory fhFactory = ep.getHandleFactory();
DefaultEventHandle eFh = fhFactory.createEventFactHandle(fhFactory.getNextId(), getObject(), fhFactory.getNextRecency(), ep, timestamp, duration);
ep.insert(eFh);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,53 +17,24 @@

import java.io.Serializable;

import org.drools.core.common.DefaultEventHandle;
import org.drools.core.common.InternalWorkingMemoryEntryPoint;
import org.drools.core.rule.accessor.FactHandleFactory;

public abstract class BaseStoredObject implements StoredObject,
Serializable {

protected final boolean propagated;
protected final long timestamp;
protected final long duration;
protected final long handleId;

protected BaseStoredObject(boolean propagated, long timestamp, long duration, long handleId) {
protected BaseStoredObject(boolean propagated) {
this.propagated = propagated;
this.timestamp = timestamp;
this.duration = duration;
this.handleId = handleId;
}

@Override
public boolean isEvent() {
return timestamp >= 0;
}

@Override
public boolean isPropagated() {
return propagated;
}

@Override
public long getTimestamp() {
return timestamp;
}

@Override
public long getDuration() {
return duration;
}

@Override
public void repropagate(InternalWorkingMemoryEntryPoint ep) {
if (isEvent()) {
FactHandleFactory fhFactory = ep.getHandleFactory();
DefaultEventHandle eFh = fhFactory.createEventFactHandle(fhFactory.getNextId(), getObject(), fhFactory.getNextRecency(), ep, timestamp, duration);
ep.insert(eFh);
} else {
ep.insert(getObject());
}
ep.insert(getObject());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright 2023 Red Hat, Inc. and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.drools.reliability.core;

import java.io.Serializable;

public class SerializableStoredEvent extends BaseStoredEvent {

private final Serializable object;

public SerializableStoredEvent(Object object, boolean propagated, long timestamp, long duration) {
super(propagated, timestamp, duration);
if (!(object instanceof Serializable)) {
throw new IllegalArgumentException("Object must be serializable : " + object.getClass().getCanonicalName());
}
this.object = (Serializable) object;
}

@Override
public Serializable getObject() {
return object;
}

@Override
public String toString() {
return "SerializableStoredEvent{" +
"object=" + object +
", propagated=" + propagated +
", timestamp=" + timestamp +
", duration=" + duration +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,7 @@ public class SerializableStoredObject extends BaseStoredObject {
private final Serializable object;

public SerializableStoredObject(Object object, boolean propagated) {
this(object, propagated, -1, -1, -1);
}

public SerializableStoredObject(Object object, boolean propagated, long timestamp, long duration, long handleId) {
super(propagated, timestamp, duration, handleId);
super(propagated);
if (!(object instanceof Serializable)) {
throw new IllegalArgumentException("Object must be serializable : " + object.getClass().getCanonicalName());
}
Expand All @@ -43,9 +39,6 @@ public String toString() {
return "SerializableStoredObject{" +
"object=" + object +
", propagated=" + propagated +
", timestamp=" + timestamp +
", duration=" + duration +
", handleId=" + handleId +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,9 @@ private void repropagateWithPseudoClock(InternalWorkingMemory session, InternalW
ReliablePseudoClockScheduler clock = (ReliablePseudoClockScheduler) session.getSessionClock();
for (StoredObject storedObject : propagated) {
if (storedObject.isEvent()) {
StoredEvent storedEvent = (StoredEvent) storedObject;
long currentTime = clock.getCurrentTime();
long timestamp = storedObject.getTimestamp();
long timestamp = storedEvent.getTimestamp();
if (currentTime < timestamp) {
clock.advanceTime(timestamp - currentTime, TimeUnit.MILLISECONDS);
}
Expand All @@ -114,16 +115,16 @@ public void putIntoPersistedStorage(InternalFactHandle handle, boolean propagate

private StoredObject factHandleToStoredObject(InternalFactHandle handle, boolean propagated, Object object) {
return handle.isEvent() ?
createStoredObject(propagated, object, ((DefaultEventHandle) handle).getStartTimestamp(), ((DefaultEventHandle) handle).getDuration(), handle.getId()) :
createStoredEvent(propagated, object, ((DefaultEventHandle) handle).getStartTimestamp(), ((DefaultEventHandle) handle).getDuration()) :
createStoredObject(propagated, object);
}

protected StoredObject createStoredObject(boolean propagated, Object object) {
return new SerializableStoredObject(object, propagated);
}

protected StoredObject createStoredObject(boolean propagated, Object object, long timestamp, long duration, long handleId) {
return new SerializableStoredObject(object, propagated, timestamp, duration, handleId);
protected StoredEvent createStoredEvent(boolean propagated, Object object, long timestamp, long duration) {
return new SerializableStoredEvent(object, propagated, timestamp, duration);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright 2023 Red Hat, Inc. and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.drools.reliability.core;

public interface StoredEvent extends StoredObject {

@Override
default boolean isEvent() {
return true;
}

long getTimestamp();

long getDuration();
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,13 @@

public interface StoredObject {

boolean isEvent();
default boolean isEvent() {
return false;
}

boolean isPropagated();

Object getObject();

long getTimestamp();

long getDuration();

void repropagate(InternalWorkingMemoryEntryPoint ep);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Copyright 2023 Red Hat, Inc. and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.drools.reliability.infinispan.proto;

import org.drools.reliability.core.BaseStoredEvent;
import org.infinispan.protostream.annotations.ProtoFactory;
import org.infinispan.protostream.annotations.ProtoField;
import org.infinispan.protostream.types.protobuf.AnySchema;

/**
* This class is used to store objects in Infinispan using ProtoStream.
* This class inherits Serializable from BaseStoredEvent, but it uses ProtoStream instead of Java serialization.
*/
public class ProtoStreamStoredEvent extends BaseStoredEvent {

private final transient Object object;

private final transient AnySchema.Any protoObject;

public ProtoStreamStoredEvent(Object object, boolean propagated, long timestamp, long duration) {
super(propagated, timestamp, duration);

this.object = object;
this.protoObject = ProtoStreamUtils.toAnySchema(object);
}

@ProtoFactory
public ProtoStreamStoredEvent(AnySchema.Any protoObject, boolean propagated, long timestamp, long duration) {
super(propagated, timestamp, duration);

this.protoObject = protoObject;
this.object = ProtoStreamUtils.fromAnySchema(protoObject);
}

@ProtoField(value = 1, required = true)
public AnySchema.Any getProtoObject() {
return protoObject;
}

@Override
@ProtoField(value = 2, required = true)
public boolean isPropagated() {
return propagated;
}

@Override
@ProtoField(value = 3, required = true)
public long getTimestamp() {
return timestamp;
}

@Override
@ProtoField(value = 4, required = true)
public long getDuration() {
return duration;
}

@Override
public Object getObject() {
return object;
}

@Override
public String toString() {
return "ProtoStreamStoredEvent{" +
"object=" + object +
", propagated=" + propagated +
", timestamp=" + timestamp +
", duration=" + duration +
'}';
}
}
Loading

0 comments on commit b018b1f

Please sign in to comment.