Skip to content

Commit

Permalink
Update with WS fixes from master
Browse files Browse the repository at this point in the history
  • Loading branch information
mondain committed Oct 23, 2024
1 parent 97d2c6c commit b4f3afd
Show file tree
Hide file tree
Showing 9 changed files with 133 additions and 130 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ Subreddit: [r/red5](http://www.reddit.com/r/red5)

Automatic builds (Courtesy of Apache [OpenMeetings](http://openmeetings.apache.org/)):

* [Red5](https://builds.apache.org/view/M-R/view/OpenMeetings/job/Red5-server/)
* [Windows Installer](https://builds.apache.org/view/M-R/view/OpenMeetings/job/red5-installer/)
* [Red5](https://ci-builds.apache.org/job/OpenMeetings/job/Red5-server/)
* [Windows Installer](https://ci-builds.apache.org/job/OpenMeetings/job/red5-installer/)

__Note on Bootstrap__ The bootstrap and shutdown classes have been moved to the [red5-service](https://github.com/Red5/red5-service) project; the dependency has been added to this projects pom.

Expand Down
13 changes: 2 additions & 11 deletions io/src/main/java/org/red5/io/utils/TlsUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,7 @@
public class TlsUtils {

@SuppressWarnings("unused")
private static byte[] DOWNGRADE_TLS11 = Hex.decodeStrict("444F574E47524400");

@SuppressWarnings("unused")
private static byte[] DOWNGRADE_TLS12 = Hex.decodeStrict("444F574E47524401");
private static byte[] DOWNGRADE_TLS11 = Hex.decodeStrict("444F574E47524400"), DOWNGRADE_TLS12 = Hex.decodeStrict("444F574E47524401");

public static final byte[] EMPTY_BYTES = new byte[0];

Expand Down Expand Up @@ -610,9 +607,7 @@ public static int[] readUint16Array(int count, InputStream input) throws IOExcep
}

public static ASN1Primitive readASN1Object(byte[] encoding) throws IOException {
ASN1InputStream asn1 = null;
try {
asn1 = new ASN1InputStream(encoding);
try (ASN1InputStream asn1 = new ASN1InputStream(encoding)) {
ASN1Primitive result = asn1.readObject();
if (null == result) {
throw new IOException("AlertDescription.decode_error");
Expand All @@ -621,10 +616,6 @@ public static ASN1Primitive readASN1Object(byte[] encoding) throws IOException {
throw new IOException("AlertDescription.decode_error");
}
return result;
} finally {
if (asn1 != null) {
asn1.close();
}
}
}

Expand Down
117 changes: 52 additions & 65 deletions server/src/main/java/org/red5/net/websocket/WebSocketConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,19 @@
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.stream.Stream;

import jakarta.websocket.Extension;
import jakarta.websocket.Session;

import org.apache.commons.lang3.StringUtils;
import org.apache.tomcat.websocket.Constants;
import org.apache.tomcat.websocket.WsSession;
import org.red5.server.AttributeStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import jakarta.websocket.CloseReason;
import jakarta.websocket.CloseReason.CloseCode;
import jakarta.websocket.CloseReason.CloseCodes;
import jakarta.websocket.Extension;
import jakarta.websocket.Session;

/**
* WebSocketConnection <br>
* This class represents a WebSocket connection with a client (browser).
Expand Down Expand Up @@ -63,8 +66,9 @@ public class WebSocketConnection extends AttributeStore implements Comparable<We
private AtomicBoolean connected = new AtomicBoolean(false);

// associated websocket session
private WeakReference<WsSession> wsSession;
private final WsSession wsSession;

// reference to the scope for manager access
private WeakReference<WebSocketScope> scope;

// unique identifier for the session
Expand Down Expand Up @@ -114,9 +118,9 @@ public WebSocketConnection(WebSocketScope scope, Session session) {
log.debug("path: {}", path);
}
// cast ws session
this.wsSession = new WeakReference<>((WsSession) session);
this.wsSession = (WsSession) session;
if (isDebug) {
log.debug("ws session: {}", wsSession.get());
log.debug("ws session: {}", wsSession);
}
// the websocket session id will be used for hash code comparison, its the only usable value currently
wsSessionId = session.getId();
Expand Down Expand Up @@ -163,6 +167,8 @@ public WebSocketConnection(WebSocketScope scope, Session session) {
// add the timeouts to the user props
userProps.put(Constants.READ_IDLE_TIMEOUT_MS, readTimeout);
userProps.put(Constants.WRITE_IDLE_TIMEOUT_MS, sendTimeout);
// set the close timeout to 5 seconds
userProps.put(Constants.SESSION_CLOSE_TIMEOUT_PROPERTY, TimeUnit.SECONDS.toMillis(5));
if (isDebug) {
log.debug("userProps: {}", userProps);
}
Expand All @@ -186,9 +192,8 @@ public void send(String data) throws UnsupportedEncodingException, IOException {
}
// process the incoming string
if (StringUtils.isNotBlank(data)) {
final WsSession session = wsSession.get();
// attempt send only if the session is not closed
if (session != null && !session.isClosed()) {
if (!wsSession.isClosed()) {
try {
if (useAsync) {
if (sendFuture != null && !sendFuture.isDone()) {
Expand All @@ -197,21 +202,21 @@ public void send(String data) throws UnsupportedEncodingException, IOException {
} catch (TimeoutException e) {
log.warn("Send timed out {}", wsSessionId);
// if the session is not open, cancel the future
if (!session.isOpen()) {
if (!wsSession.isOpen()) {
sendFuture.cancel(true);
return;
}
}
}
synchronized (wsSessionId) {
int lengthToWrite = data.getBytes().length;
sendFuture = session.getAsyncRemote().sendText(data);
sendFuture = wsSession.getAsyncRemote().sendText(data);
updateWriteBytes(lengthToWrite);
}
} else {
synchronized (wsSessionId) {
int lengthToWrite = data.getBytes().length;
session.getBasicRemote().sendText(data);
wsSession.getBasicRemote().sendText(data);
updateWriteBytes(lengthToWrite);
}
}
Expand All @@ -236,8 +241,7 @@ public void send(byte[] buf) throws IOException {
if (isDebug) {
log.debug("send binary: {}", Arrays.toString(buf));
}
WsSession session = wsSession.get();
if (session != null && session.isOpen()) {
if (!wsSession.isClosed()) {
try {
// send the bytes
if (useAsync) {
Expand All @@ -253,12 +257,12 @@ public void send(byte[] buf) throws IOException {
}
}
synchronized (wsSessionId) {
sendFuture = session.getAsyncRemote().sendBinary(ByteBuffer.wrap(buf));
sendFuture = wsSession.getAsyncRemote().sendBinary(ByteBuffer.wrap(buf));
updateWriteBytes(buf.length);
}
} else {
synchronized (wsSessionId) {
session.getBasicRemote().sendBinary(ByteBuffer.wrap(buf));
wsSession.getBasicRemote().sendBinary(ByteBuffer.wrap(buf));
updateWriteBytes(buf.length);
}
}
Expand All @@ -281,11 +285,10 @@ public void sendPing(byte[] buf) throws IllegalArgumentException, IOException {
if (isTrace) {
log.trace("send ping: {}", buf);
}
WsSession session = wsSession.get();
if (session != null && session.isOpen()) {
if (!wsSession.isClosed()) {
synchronized (wsSessionId) {
// send the bytes
session.getBasicRemote().sendPing(ByteBuffer.wrap(buf));
wsSession.getBasicRemote().sendPing(ByteBuffer.wrap(buf));
// update counter
updateWriteBytes(buf.length);
}
Expand All @@ -305,11 +308,10 @@ public void sendPong(byte[] buf) throws IllegalArgumentException, IOException {
if (isTrace) {
log.trace("send pong: {}", buf);
}
WsSession session = wsSession.get();
if (session != null && session.isOpen()) {
if (!wsSession.isClosed()) {
synchronized (wsSessionId) {
// send the bytes
session.getBasicRemote().sendPong(ByteBuffer.wrap(buf));
wsSession.getBasicRemote().sendPong(ByteBuffer.wrap(buf));
// update counter
updateWriteBytes(buf.length);
}
Expand All @@ -319,20 +321,36 @@ public void sendPong(byte[] buf) throws IllegalArgumentException, IOException {
}

/**
* close Connection
* Close the connection.
*/
public void close() {
close(CloseCodes.NORMAL_CLOSURE, "");
}

/**
* Close the connection with a reason.
*
* @param code CloseCode
* @param reasonPhrase short reason for closing
*/
public void close(CloseCode code, String reasonPhrase) {
if (connected.compareAndSet(true, false)) {
log.debug("close: {}", wsSessionId);
WsSession session = wsSession != null ? wsSession.get() : null;
// session has to be open, or user props cannot be retrieved
if (session != null && session.isOpen()) {
// trying to close the session nicely
try {
session.close();
} catch (Exception e) {
log.debug("Exception closing session", e);
// no blank reasons
if (reasonPhrase == null) {
reasonPhrase = "";
}
log.debug("close: {} code: {} reason: {}", wsSessionId, code, reasonPhrase);
try {
// close the session if open
if (wsSession.isOpen()) {
CloseReason reason = new CloseReason(code, reasonPhrase);
if (isDebug) {
log.debug("Closing session: {} with reason: {}", wsSessionId, reason);
}
wsSession.close(reason);
}
} catch (Exception e) {
log.debug("Exception closing session", e);
}
// clean up our props
attributes.clear();
Expand All @@ -347,40 +365,9 @@ public void close() {
if (headers != null) {
headers = null;
}
if (scope.get() != null) {
// disconnect from scope
scope.get().removeConnection(this);
// clear weak refs
wsSession.clear();
scope.clear();
}
}
}

/*
WsSession uses these userProperties for checkExpiration along with maxIdleTimeout
configuration for read idle timeout on WebSocket session
READ_IDLE_TIMEOUT_MS = "org.apache.tomcat.websocket.READ_IDLE_TIMEOUT_MS";
configuration for write idle timeout on WebSocket session
WRITE_IDLE_TIMEOUT_MS = "org.apache.tomcat.websocket.WRITE_IDLE_TIMEOUT_MS";
*/
public void timeoutAsync(long now) {
// XXX(paul) only logging here as we should more than likely rely upon the container checking expiration
log.trace("timeoutAsync: {} on session id: {} read: {} written: {}", now, wsSessionId, readBytes, writtenBytes);
/*
WsSession session = wsSession.get();
Map<String, Object> props = session.getUserProperties();
log.debug("Session properties: {}", props);
long maxIdleTimeout = session.getMaxIdleTimeout();
long readTimeout = (long) props.get(Constants.READ_IDLE_TIMEOUT_MS);
long sendTimeout = (long) props.get(Constants.WRITE_IDLE_TIMEOUT_MS);
log.debug("Session timeouts - max: {} read: {} write: {}", maxIdleTimeout, readTimeout, sendTimeout);
//long readDelta = (now - lastReadTime), writeDelta = (now - lastWriteTime);
//log.debug("timeoutAsync: {} on {} last read: {} last write: {}", now, wsSessionId, readDelta, writeDelta);
*/
}

/**
* Async send is enabled in non-Windows based systems; this provides a means to override it.
*
Expand Down Expand Up @@ -453,7 +440,7 @@ public void setOrigin(String origin) {
* @return true if secure and false if unsecure or unconnected
*/
public boolean isSecure() {
Optional<WsSession> opt = Optional.ofNullable(wsSession.get());
Optional<WsSession> opt = Optional.ofNullable(wsSession);
if (opt.isPresent()) {
return (opt.get().isOpen() ? opt.get().isSecure() : false);
}
Expand Down Expand Up @@ -672,12 +659,12 @@ public Object getUserProperty(String key) {

public void setWsSessionTimeout(long idleTimeout) {
if (wsSession != null) {
wsSession.get().setMaxIdleTimeout(idleTimeout);
wsSession.setMaxIdleTimeout(idleTimeout);
}
}

public WsSession getWsSession() {
return wsSession != null ? wsSession.get() : null;
return wsSession != null ? wsSession : null;
}

public long getReadBytes() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;

import jakarta.websocket.CloseReason.CloseCodes;

/**
* WebSocketScope contains an IScope and keeps track of WebSocketConnection and IWebSocketDataListener instances.
*
Expand Down Expand Up @@ -91,7 +93,7 @@ public void unregister() {
// clean up the connections by first closing them
conns.forEach(conn -> {
if (conns.remove(conn)) {
conn.close();
conn.close(CloseCodes.GOING_AWAY, "WebSocket scope removed");
}
});
// clean up the listeners by first stopping them
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import jakarta.websocket.CloseReason.CloseCodes;

/**
* Manages websocket scopes and listeners.
*
Expand Down Expand Up @@ -172,15 +174,12 @@ public boolean addWebSocketScope(WebSocketScope webSocketScope) {
wsConn.sendPing(PING_BYTES);
} catch (Exception e) {
log.debug("Exception pinging connection: {} connection will be closed", wsConn.getSessionId(), e);
// if the connection isn't connected, remove them
wsScope.removeConnection(wsConn);
// if the ping fails, consider them gone
wsConn.close();
wsConn.close(CloseCodes.CLOSED_ABNORMALLY, e.getMessage());
}
} else {
log.debug("Removing unconnected connection: {} during ping loop", wsConn.getSessionId());
// if the connection isn't connected, remove them
wsScope.removeConnection(wsConn);
wsConn.close(CloseCodes.UNEXPECTED_CONDITION, "Connection not connected");
}
} catch (Exception e) {
log.warn("Exception in WS pinger", e);
Expand Down
Loading

0 comments on commit b4f3afd

Please sign in to comment.