Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DROOLS-7479] Split StoredEvent from StoredFact #5376

Merged
merged 1 commit into from
Jul 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright 2023 Red Hat, Inc. and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.drools.reliability.core;

import org.drools.core.common.DefaultEventHandle;
import org.drools.core.common.InternalWorkingMemoryEntryPoint;
import org.drools.core.rule.accessor.FactHandleFactory;

public abstract class BaseStoredEvent extends BaseStoredObject implements StoredEvent {

protected final long timestamp;
protected final long duration;

protected BaseStoredEvent(boolean propagated, long timestamp, long duration) {
super(propagated);
this.timestamp = timestamp;
this.duration = duration;
}

@Override
public long getTimestamp() {
return timestamp;
}

@Override
public long getDuration() {
return duration;
}

@Override
public void repropagate(InternalWorkingMemoryEntryPoint ep) {
FactHandleFactory fhFactory = ep.getHandleFactory();
DefaultEventHandle eFh = fhFactory.createEventFactHandle(fhFactory.getNextId(), getObject(), fhFactory.getNextRecency(), ep, timestamp, duration);
ep.insert(eFh);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,53 +17,24 @@

import java.io.Serializable;

import org.drools.core.common.DefaultEventHandle;
import org.drools.core.common.InternalWorkingMemoryEntryPoint;
import org.drools.core.rule.accessor.FactHandleFactory;

public abstract class BaseStoredObject implements StoredObject,
Serializable {

protected final boolean propagated;
protected final long timestamp;
protected final long duration;
protected final long handleId;

protected BaseStoredObject(boolean propagated, long timestamp, long duration, long handleId) {
protected BaseStoredObject(boolean propagated) {
this.propagated = propagated;
this.timestamp = timestamp;
this.duration = duration;
this.handleId = handleId;
}

@Override
public boolean isEvent() {
return timestamp >= 0;
}

@Override
public boolean isPropagated() {
return propagated;
}

@Override
public long getTimestamp() {
return timestamp;
}

@Override
public long getDuration() {
return duration;
}

@Override
public void repropagate(InternalWorkingMemoryEntryPoint ep) {
if (isEvent()) {
FactHandleFactory fhFactory = ep.getHandleFactory();
DefaultEventHandle eFh = fhFactory.createEventFactHandle(fhFactory.getNextId(), getObject(), fhFactory.getNextRecency(), ep, timestamp, duration);
ep.insert(eFh);
} else {
ep.insert(getObject());
}
ep.insert(getObject());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright 2023 Red Hat, Inc. and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.drools.reliability.core;

import java.io.Serializable;

public class SerializableStoredEvent extends BaseStoredEvent {

private final Serializable object;

public SerializableStoredEvent(Object object, boolean propagated, long timestamp, long duration) {
super(propagated, timestamp, duration);
if (!(object instanceof Serializable)) {
throw new IllegalArgumentException("Object must be serializable : " + object.getClass().getCanonicalName());
}
this.object = (Serializable) object;
}

@Override
public Serializable getObject() {
return object;
}

@Override
public String toString() {
return "SerializableStoredEvent{" +
"object=" + object +
", propagated=" + propagated +
", timestamp=" + timestamp +
", duration=" + duration +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,7 @@ public class SerializableStoredObject extends BaseStoredObject {
private final Serializable object;

public SerializableStoredObject(Object object, boolean propagated) {
this(object, propagated, -1, -1, -1);
}

public SerializableStoredObject(Object object, boolean propagated, long timestamp, long duration, long handleId) {
super(propagated, timestamp, duration, handleId);
super(propagated);
if (!(object instanceof Serializable)) {
throw new IllegalArgumentException("Object must be serializable : " + object.getClass().getCanonicalName());
}
Expand All @@ -43,9 +39,6 @@ public String toString() {
return "SerializableStoredObject{" +
"object=" + object +
", propagated=" + propagated +
", timestamp=" + timestamp +
", duration=" + duration +
", handleId=" + handleId +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,9 @@ private void repropagateWithPseudoClock(InternalWorkingMemory session, InternalW
ReliablePseudoClockScheduler clock = (ReliablePseudoClockScheduler) session.getSessionClock();
for (StoredObject storedObject : propagated) {
if (storedObject.isEvent()) {
StoredEvent storedEvent = (StoredEvent) storedObject;
long currentTime = clock.getCurrentTime();
long timestamp = storedObject.getTimestamp();
long timestamp = storedEvent.getTimestamp();
if (currentTime < timestamp) {
clock.advanceTime(timestamp - currentTime, TimeUnit.MILLISECONDS);
}
Expand All @@ -114,16 +115,16 @@ public void putIntoPersistedStorage(InternalFactHandle handle, boolean propagate

private StoredObject factHandleToStoredObject(InternalFactHandle handle, boolean propagated, Object object) {
return handle.isEvent() ?
createStoredObject(propagated, object, ((DefaultEventHandle) handle).getStartTimestamp(), ((DefaultEventHandle) handle).getDuration(), handle.getId()) :
createStoredEvent(propagated, object, ((DefaultEventHandle) handle).getStartTimestamp(), ((DefaultEventHandle) handle).getDuration()) :
createStoredObject(propagated, object);
}

protected StoredObject createStoredObject(boolean propagated, Object object) {
return new SerializableStoredObject(object, propagated);
}

protected StoredObject createStoredObject(boolean propagated, Object object, long timestamp, long duration, long handleId) {
return new SerializableStoredObject(object, propagated, timestamp, duration, handleId);
protected StoredEvent createStoredEvent(boolean propagated, Object object, long timestamp, long duration) {
return new SerializableStoredEvent(object, propagated, timestamp, duration);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright 2023 Red Hat, Inc. and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.drools.reliability.core;

public interface StoredEvent extends StoredObject {

@Override
default boolean isEvent() {
return true;
}

long getTimestamp();

long getDuration();
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,13 @@

public interface StoredObject {

boolean isEvent();
default boolean isEvent() {
return false;
}

boolean isPropagated();

Object getObject();

long getTimestamp();

long getDuration();

void repropagate(InternalWorkingMemoryEntryPoint ep);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Copyright 2023 Red Hat, Inc. and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.drools.reliability.infinispan.proto;

import org.drools.reliability.core.BaseStoredEvent;
import org.infinispan.protostream.annotations.ProtoFactory;
import org.infinispan.protostream.annotations.ProtoField;
import org.infinispan.protostream.types.protobuf.AnySchema;

/**
* This class is used to store objects in Infinispan using ProtoStream.
* This class inherits Serializable from BaseStoredEvent, but it uses ProtoStream instead of Java serialization.
*/
public class ProtoStreamStoredEvent extends BaseStoredEvent {

private final transient Object object;

private final transient AnySchema.Any protoObject;

public ProtoStreamStoredEvent(Object object, boolean propagated, long timestamp, long duration) {
super(propagated, timestamp, duration);

this.object = object;
this.protoObject = ProtoStreamUtils.toAnySchema(object);
}

@ProtoFactory
public ProtoStreamStoredEvent(AnySchema.Any protoObject, boolean propagated, long timestamp, long duration) {
super(propagated, timestamp, duration);

this.protoObject = protoObject;
this.object = ProtoStreamUtils.fromAnySchema(protoObject);
}

@ProtoField(value = 1, required = true)
public AnySchema.Any getProtoObject() {
return protoObject;
}

@Override
@ProtoField(value = 2, required = true)
public boolean isPropagated() {
return propagated;
}

@Override
@ProtoField(value = 3, required = true)
public long getTimestamp() {
return timestamp;
}

@Override
@ProtoField(value = 4, required = true)
public long getDuration() {
return duration;
}

@Override
public Object getObject() {
return object;
}

@Override
public String toString() {
return "ProtoStreamStoredEvent{" +
"object=" + object +
", propagated=" + propagated +
", timestamp=" + timestamp +
", duration=" + duration +
'}';
}
}
Loading