diff --git a/common/src/main/java/org/red5/server/net/rtmp/event/Aggregate.java b/common/src/main/java/org/red5/server/net/rtmp/event/Aggregate.java index b61a1d752..e69de05b5 100644 --- a/common/src/main/java/org/red5/server/net/rtmp/event/Aggregate.java +++ b/common/src/main/java/org/red5/server/net/rtmp/event/Aggregate.java @@ -36,11 +36,6 @@ public class Aggregate extends BaseEvent implements IoConstants, IStreamData, IStr private static final long serialVersionUID = -4102940670913999407L; - protected IoBuffer data; - /** * Data type */ @@ -253,6 +251,19 @@ public void writeExternal(ObjectOutput out) throws IOException { } } + @Override + public BaseEvent forkedDuplicate() { + + AudioData fork = new AudioData(super.concurrentDataCopy()); + fork.setTimestamp(this.timestamp); + if (header != null) { + fork.setHeader(header.clone()); + } + fork.setSource(this.getSource()); + fork.setSourceType(this.getSourceType()); + return fork; + } + /** * Duplicate this message / event. * diff --git a/common/src/main/java/org/red5/server/net/rtmp/event/BaseEvent.java b/common/src/main/java/org/red5/server/net/rtmp/event/BaseEvent.java index ad89dad51..3f3a031fc 100644 --- a/common/src/main/java/org/red5/server/net/rtmp/event/BaseEvent.java +++ b/common/src/main/java/org/red5/server/net/rtmp/event/BaseEvent.java @@ -13,6 +13,7 @@ import java.io.ObjectOutput; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.mina.core.buffer.IoBuffer; import org.red5.server.api.event.IEventListener; import org.red5.server.net.rtmp.message.Constants; import org.red5.server.net.rtmp.message.Header; @@ -33,11 +34,20 @@ public abstract class BaseEvent implements Constants, IRTMPEvent, Externalizable // (2) make it aspect oriented private static final boolean allocationDebugging = false; + protected AtomicInteger forkRefs = new AtomicInteger(); + /** * Event type */ private Type type; + /** + * Multi-threaded copy-able array. + */ + protected ForkableData forkableData; + + protected IoBuffer data; + /** * Source type */ @@ -86,6 +96,41 @@ public BaseEvent(Type type) { this(type, null); } + /** + * Creates a raw byte array which is used to allow concurrent non-blocking copying of this event. + * Owning thread should call addForkReference for each thread making a copy and copying threads should call removeForkReference. + * @return + */ + public boolean prepareForkedDuplication() { + if (data != null && data.rewind().hasRemaining()) { + byte[] bytes = new byte[data.remaining()]; + data.get(bytes); + data.rewind(); + forkableData = new ForkableData(bytes); + return true; + } + return false; + } + + protected IoBuffer concurrentDataCopy() { + return IoBuffer.wrap(forkableData.rawData).asReadOnlyBuffer(); + } + + public void addForkReference() { + forkRefs.incrementAndGet(); + } + + public void removeForkReference() { + int ref = forkRefs.decrementAndGet(); + if (ref <= 0) { + forkableData = null; + } + } + + public BaseEvent forkedDuplicate() { + return null; + }; + /** * Create new event of given type * @@ -290,4 +335,15 @@ public void writeExternal(ObjectOutput out) throws IOException { out.writeInt(timestamp); } + /** + * Provides final field of bytes for mounting non-blocking concurrent read access. + */ + private static class ForkableData { + final byte[] rawData; + + private ForkableData(byte[] bytes) { + rawData = bytes; + } + } + } diff --git a/common/src/main/java/org/red5/server/net/rtmp/event/Notify.java b/common/src/main/java/org/red5/server/net/rtmp/event/Notify.java index f7ec39dbf..dc6d0d652 100644 --- a/common/src/main/java/org/red5/server/net/rtmp/event/Notify.java +++ b/common/src/main/java/org/red5/server/net/rtmp/event/Notify.java @@ -36,11 +36,6 @@ public class Notify extends BaseEvent implements ICommand, IStreamData, */ protected IServiceCall call; - /** - * Event data - */ - protected IoBuffer data; - /** * Event data type */ @@ -273,6 +268,21 @@ public void readExternal(ObjectInput in) throws IOException, ClassNotFoundExcept } } + @Override + public BaseEvent forkedDuplicate() { + + Notify fork = new Notify(super.concurrentDataCopy()); + fork.setTimestamp(this.timestamp); + fork.setAction(action); + fork.setCall(call); + if (header != null) { + fork.setHeader(header.clone()); + } + fork.setSource(this.getSource()); + fork.setSourceType(this.getSourceType()); + return fork; + } + /** {@inheritDoc} */ @Override public void writeExternal(ObjectOutput out) throws IOException { diff --git a/common/src/main/java/org/red5/server/net/rtmp/event/Unknown.java b/common/src/main/java/org/red5/server/net/rtmp/event/Unknown.java index b3f38c41b..99546fcbd 100644 --- a/common/src/main/java/org/red5/server/net/rtmp/event/Unknown.java +++ b/common/src/main/java/org/red5/server/net/rtmp/event/Unknown.java @@ -22,11 +22,6 @@ public class Unknown extends BaseEvent { private static final long serialVersionUID = -1352770037962252975L; - /** - * Event data - */ - protected IoBuffer data; - /** * Type of data */ diff --git a/common/src/main/java/org/red5/server/net/rtmp/event/VideoData.java b/common/src/main/java/org/red5/server/net/rtmp/event/VideoData.java index 7c12c1619..3304717d0 100644 --- a/common/src/main/java/org/red5/server/net/rtmp/event/VideoData.java +++ b/common/src/main/java/org/red5/server/net/rtmp/event/VideoData.java @@ -33,11 +33,6 @@ public class VideoData extends BaseEvent implements IoConstants, IStreamDataRed5 The Red5 server org.red5 - 2.0.18 + 2.0.19 https://github.com/Red5/red5-server 2005