diff --git a/SSE-README.md b/SSE-README.md new file mode 100644 index 000000000..420b08b63 --- /dev/null +++ b/SSE-README.md @@ -0,0 +1,381 @@ +# Red5 Server-Sent Events (SSE) Implementation + +This document describes the Server-Sent Events (SSE) implementation for the Red5 server, which provides W3C-compliant SSE functionality integrated with the existing Red5 Tomcat servlet infrastructure. + +## Overview + +The SSE implementation consists of several components that work together to provide real-time server-to-client communication: + +- **SSEConnection**: Manages individual SSE connections +- **SSEManager**: Handles connection lifecycle and broadcasting +- **SSEServlet**: HTTP servlet that establishes SSE connections +- **SSEService**: High-level API for application developers +- **SSEEvent**: Data structure for SSE events +- **SSEApplicationAdapter**: Example Red5 application adapter with built-in SSE support + +## Features + +- **W3C SSE Specification Compliance**: Full support for id, event, data, and retry fields +- **Connection Management**: Automatic cleanup of stale connections +- **Keep-alive Support**: Optional keep-alive messages to maintain connections +- **Scope Integration**: Events can be broadcast to specific Red5 scopes +- **CORS Support**: Built-in Cross-Origin Resource Sharing support +- **Async Servlet Processing**: Non-blocking connection handling +- **Thread-safe**: Concurrent connection management + +## Architecture + +### Core Components + +```plaintext +┌─────────────────┐ ┌──────────────────┐ ┌─────────────────┐ +│ SSEServlet │────│ SSEManager │────│ SSEConnection │ +│ │ │ │ │ │ +│ - HTTP endpoint │ │ - Lifecycle mgmt │ │ - Client conn │ +│ - Async support │ │ - Broadcasting │ │ - Event sending │ +│ - CORS handling │ │ - Cleanup tasks │ │ - Keep-alive │ +└─────────────────┘ └──────────────────┘ └─────────────────┘ + │ │ + │ │ + ▼ ▼ +┌─────────────────┐ ┌──────────────────┐ +│ SSEService │ │ SSEApplication │ +│ │ │ Adapter │ +│ - High-level │ │ │ +│ API │ │ - Red5 events │ +│ - Application │ │ - SSE integration│ +│ interface │ │ - Broadcasting │ +└─────────────────┘ └──────────────────┘ +``` + +## Installation and Configuration + +### 1. Include SSE Configuration + +Add the SSE beans to your Spring configuration, in `red5-common.xml` (they should already be present if you are using a recent version of Red5): + +```xml + + + + + + + + + + + + + + + + +``` + +Enable the SSE service in your Red5 Tomcat / JEE setup by setting `sse.enabled` to `true` in `red5.properties`: + +```properties +sse.enabled=true +``` + +This will carryover to the Tomcat loader configuration: + +```xml + + + + +``` + +### 2. Configure Web Application + +Update your `web.xml` to include the SSE servlet: + +```xml + + sse + org.red5.server.net.sse.SSEServlet + -1 + true + + cors.enabled + true + + + + + sse + /events + /events/* + +``` + +If an external CORS filter is used, you can disable CORS directly in the servlet by adding the following init parameters: + +```xml + + cors.enabled + false + +``` + +Ensure that the CORS filter allows these headers: + +- `Accept` +- `Cache-Control` +- `Last-Event-ID` + +### 3. Configuration Properties + +Configure SSE behavior using properties in `red5.properties`: + +```properties +# Enable/disable SSE support (default: true) +sse.enabled=true + +# Connection timeout in milliseconds (default: 5 minutes) +sse.connection.timeout.ms=300000 + +# Keep-alive interval in milliseconds (default: 30 seconds) +sse.keepalive.interval.ms=30000 + +# Enable/disable keep-alive messages (default: true) +sse.keepalive.enabled=true +``` + +## Usage + +### Using the SSE Service + +```java +@Autowired +private ISSEService sseService; + +// Broadcast to all connections +sseService.broadcastMessage("Hello, everyone!"); + +// Broadcast with event type +sseService.broadcastEvent("notification", "New message received"); + +// Broadcast to specific scope +sseService.broadcastToScope(scope, "Welcome to the application"); + +// Send to specific connection +sseService.sendToConnection(connectionId, "Personal message"); +``` + +### Using the SSE Application Adapter + +```java +public class MyApplication extends SSEApplicationAdapter { + + @Override + public boolean appConnect(IConnection conn, Object[] params) { + // Parent method automatically broadcasts user.connect event + boolean result = super.appConnect(conn, params); + + // Send custom welcome message via SSE + broadcastSSEEvent("welcome", "New user joined: " + conn.getClient().getId()); + + return result; + } + + public void sendNotification(String message) { + // Custom method to send notifications + broadcastSSEEvent("notification", message); + } +} +``` + +### Creating SSE Events + +```java +// Simple message +SSEEvent event1 = SSEEvent.message("Hello World"); + +// Event with type and data +SSEEvent event2 = SSEEvent.of("chat", "User says hello"); + +// Full event with all fields +SSEEvent event3 = SSEEvent.builder() + .setId("msg-123") + .setEvent("notification") + .setData("System maintenance in 5 minutes") + .setRetry(5000); + +sseService.broadcastEvent(event3); +``` + +## Client-Side Usage + +### JavaScript EventSource + +```javascript +// Connect to SSE endpoint +const eventSource = new EventSource('/events'); + +// Handle connection events +eventSource.onopen = function(event) { + console.log('Connected to SSE'); +}; + +// Handle messages (default event type) +eventSource.onmessage = function(event) { + console.log('Message:', event.data); +}; + +// Handle custom event types +eventSource.addEventListener('notification', function(event) { + console.log('Notification:', event.data); +}); + +eventSource.addEventListener('user.connect', function(event) { + console.log('User connected:', event.data); +}); + +// Handle errors +eventSource.onerror = function(event) { + console.error('SSE error:', event); +}; + +// Close connection when done +eventSource.close(); +``` + +### Testing + +A test HTML page is provided at `/sse-test.html` which demonstrates: + +- Connecting to the SSE endpoint +- Receiving various event types +- Connection status monitoring +- Event logging and statistics + +## SSE Event Format + +The implementation follows the W3C Server-Sent Events specification: + +```plaintext +id: unique-event-id +event: event-type +data: event data line 1 +data: event data line 2 +retry: 5000 + +``` + +### Event Fields + +- **id**: Unique identifier for the event (optional) +- **event**: Event type name (optional, defaults to "message") +- **data**: Event payload (can be multi-line) +- **retry**: Reconnection timeout in milliseconds (optional) + +## Integration with Red5 Features + +### Scope-based Broadcasting + +Events can be broadcast to specific Red5 scopes, allowing for: + +- Room-based messaging +- Application-specific notifications +- User group communications + +### Connection Management + +SSE connections are managed alongside Red5's existing connection handling: + +- Automatic cleanup when connections are lost +- Integration with Red5's scope lifecycle +- Thread-safe concurrent access + +### Application Events + +The SSEApplicationAdapter automatically broadcasts SSE events for: + +- Application start/stop +- User connect/disconnect +- Custom application events + +## Performance Considerations + +### Connection Limits + +- Each SSE connection consumes one thread in async mode +- Configure thread pools appropriately for expected load +- Monitor connection count and cleanup effectiveness + +### Memory Usage + +- Each connection maintains minimal state +- Connection timeout prevents memory leaks +- Regular cleanup cycles remove stale connections + +### Network Efficiency + +- Keep-alive messages maintain connection state +- Event compression is handled by HTTP layer +- Batch broadcasts are more efficient than individual sends + +## Troubleshooting + +### Common Issues + +1. **Connection Timeouts** + - Increase `sse.connection.timeout.ms` + - Check network stability + - Verify keep-alive configuration + +2. **CORS Errors** + - Ensure proper CORS headers are set + - Check origin restrictions + - Verify browser CORS policy compliance + +3. **Memory Leaks** + - Monitor connection cleanup logs + - Verify timeout configuration + - Check for proper connection closing + +### Logging + +Enable debug logging for SSE components: + +```xml + +``` + +## Security Considerations + +- **Authentication**: Implement authentication in the servlet if needed +- **Authorization**: Control access to different event types/scopes +- **Rate Limiting**: Consider implementing rate limiting for broadcast operations +- **Input Validation**: Validate all event data before broadcasting +- **CORS**: Configure CORS policies appropriately for your deployment + +## Compliance and Standards + +This implementation follows: + +- W3C Server-Sent Events specification +- HTTP/1.1 and HTTP/2 compatibility +- Jakarta EE servlet specification +- Red5 coding standards and patterns + +## Files Created + +### Core Implementation + +- `org.red5.server.net.sse.SSEConnection` +- `org.red5.server.net.sse.SSEManager` +- `org.red5.server.net.sse.SSEServlet` +- `org.red5.server.net.sse.SSEEvent` +- `org.red5.server.net.sse.ISSEService` +- `org.red5.server.net.sse.SSEService` +- `org.red5.server.adapter.SSEApplicationAdapter` + +### Test + +- `server/src/main/server/webapps/root/sse-test.html` + +This SSE implementation provides a robust, standards-compliant solution for real-time server-to-client communication in Red5 applications. diff --git a/common/pom.xml b/common/pom.xml index 6366b09f9..8fb4ca820 100644 --- a/common/pom.xml +++ b/common/pom.xml @@ -88,7 +88,8 @@ net.engio mbassador - 2.0.23 + 1.3.2 --> + diff --git a/common/src/main/java/org/red5/server/ContextLoader.java b/common/src/main/java/org/red5/server/ContextLoader.java index 47994fb5d..9f3b41c44 100644 --- a/common/src/main/java/org/red5/server/ContextLoader.java +++ b/common/src/main/java/org/red5/server/ContextLoader.java @@ -21,9 +21,9 @@ import javax.management.ObjectName; import javax.management.StandardMBean; -import org.red5.logging.Red5LoggerFactory; import org.red5.server.jmx.mxbeans.ContextLoaderMXBean; import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.beans.BeansException; import org.springframework.beans.factory.BeanFactoryUtils; import org.springframework.beans.factory.DisposableBean; @@ -49,7 +49,7 @@ public class ContextLoader implements ApplicationContextAware, InitializingBean, DisposableBean, ContextLoaderMXBean { /** Constant log */ - protected static Logger log = Red5LoggerFactory.getLogger(ContextLoader.class); + protected static Logger log = LoggerFactory.getLogger(ContextLoader.class); /** * Spring Application context diff --git a/common/src/main/java/org/red5/server/LoaderBase.java b/common/src/main/java/org/red5/server/LoaderBase.java index 68d925352..f9e010abd 100644 --- a/common/src/main/java/org/red5/server/LoaderBase.java +++ b/common/src/main/java/org/red5/server/LoaderBase.java @@ -11,10 +11,11 @@ import java.util.HashMap; import java.util.Map; -import org.red5.logging.Red5LoggerFactory; import org.red5.server.api.IApplicationContext; import org.red5.server.api.IApplicationLoader; +import org.red5.server.net.sse.ISSEService; import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.beans.BeansException; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; @@ -28,7 +29,7 @@ */ public abstract class LoaderBase implements ApplicationContextAware { - private static Logger log = Red5LoggerFactory.getLogger(LoaderBase.class); + private static Logger log = LoggerFactory.getLogger(LoaderBase.class); /** * We store the application context so we can access it later. @@ -50,6 +51,20 @@ public abstract class LoaderBase implements ApplicationContextAware { */ protected String webappFolder = null; + /** + * Singleton instance. + */ + protected static LoaderBase instance; + + /** + * Getter for the singleton instance. + * + * @return LoaderBase instance + */ + public static LoaderBase getInstance() { + return instance; + } + /** * Getter for the application loader. * @@ -174,4 +189,13 @@ public void removeContext(String path) { throw new UnsupportedOperationException(); } + /** + * Returns the SSE service if available. + * + * @return the SSE service + */ + public ISSEService getSSEService() { + throw new UnsupportedOperationException("Unimplemented method 'getSSEService'"); + } + } diff --git a/common/src/main/java/org/red5/server/api/IApplicationLoader.java b/common/src/main/java/org/red5/server/api/IApplicationLoader.java index 2dc5136fa..ba05b0965 100644 --- a/common/src/main/java/org/red5/server/api/IApplicationLoader.java +++ b/common/src/main/java/org/red5/server/api/IApplicationLoader.java @@ -37,4 +37,5 @@ public interface IApplicationLoader { * @return application context */ public ApplicationContext getRootContext(); + } diff --git a/common/src/main/java/org/red5/server/net/servlet/ServletUtils.java b/common/src/main/java/org/red5/server/net/servlet/ServletUtils.java index e625543df..8339c515a 100644 --- a/common/src/main/java/org/red5/server/net/servlet/ServletUtils.java +++ b/common/src/main/java/org/red5/server/net/servlet/ServletUtils.java @@ -15,10 +15,10 @@ import java.util.Collections; import java.util.List; -import jakarta.servlet.http.HttpServletRequest; - -import org.red5.logging.Red5LoggerFactory; import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import jakarta.servlet.http.HttpServletRequest; /** *

ServletUtils class.

@@ -27,7 +27,7 @@ */ public class ServletUtils { - private static Logger log = Red5LoggerFactory.getLogger(ServletUtils.class); + private static Logger log = LoggerFactory.getLogger(ServletUtils.class); /** * Default value is 2048. diff --git a/common/src/main/java/org/red5/server/net/sse/ISSEService.java b/common/src/main/java/org/red5/server/net/sse/ISSEService.java new file mode 100644 index 000000000..2f0457e1d --- /dev/null +++ b/common/src/main/java/org/red5/server/net/sse/ISSEService.java @@ -0,0 +1,144 @@ +/* + * RED5 Open Source Flash Server - https://github.com/Red5/ Copyright 2006-2023 by respective authors (see below). All rights reserved. Licensed under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless + * required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions and limitations under the License. + */ + +package org.red5.server.net.sse; + +import java.util.Collection; + +import org.red5.server.api.scope.IScope; +import org.red5.server.api.scope.IScopeService; + +/** + * Interface for Server-Sent Events service operations. + * This interface provides methods for managing SSE connections and broadcasting events. + * + * @author Paul Gregoire (mondain@gmail.com) + */ +public interface ISSEService extends IScopeService { + + /** Constant BEAN_NAME="sseService" */ + public static String BEAN_NAME = "sseService"; + + /** + * Broadcasts a message to all connected SSE clients. + * + * @param message The message to broadcast + * @return The number of successful sends + */ + int broadcastMessage(String message); + + /** + * Broadcasts an event to all connected SSE clients. + * + * @param event The event type + * @param message The message to broadcast + * @return The number of successful sends + */ + int broadcastEvent(String event, String message); + + /** + * Broadcasts an SSE event to all connected clients. + * + * @param sseEvent The SSE event to broadcast + * @return The number of successful sends + */ + int broadcastEvent(SSEEvent sseEvent); + + /** + * Broadcasts a message to all clients in a specific scope. + * + * @param scope The scope to broadcast to + * @param message The message to broadcast + * @return The number of successful sends + */ + int broadcastToScope(IScope scope, String message); + + /** + * Broadcasts an event to all clients in a specific scope. + * + * @param scope The scope to broadcast to + * @param event The event type + * @param message The message to broadcast + * @return The number of successful sends + */ + int broadcastEventToScope(IScope scope, String event, String message); + + /** + * Broadcasts an SSE event to all clients in a specific scope. + * + * @param scope The scope to broadcast to + * @param sseEvent The SSE event to broadcast + * @return The number of successful sends + */ + int broadcastEventToScope(IScope scope, SSEEvent sseEvent); + + /** + * Sends a message to a specific connection. + * + * @param connectionId The connection ID + * @param message The message to send + * @return true if the message was sent successfully + */ + boolean sendToConnection(String connectionId, String message); + + /** + * Sends an event to a specific connection. + * + * @param connectionId The connection ID + * @param event The event type + * @param message The message to send + * @return true if the event was sent successfully + */ + boolean sendEventToConnection(String connectionId, String event, String message); + + /** + * Sends an SSE event to a specific connection. + * + * @param connectionId The connection ID + * @param sseEvent The SSE event to send + * @return true if the event was sent successfully + */ + boolean sendEventToConnection(String connectionId, SSEEvent sseEvent); + + /** + * Gets all active SSE connections. + * + * @return Collection of all active connections + */ + Collection getAllConnections(); + + /** + * Gets the number of active connections. + * + * @return number of active connections + */ + int getConnectionCount(); + + /** + * Gets the number of connections in a specific scope. + * + * @param scope The scope to count connections for + * @return The number of connections in the scope + */ + long getConnectionsInScope(IScope scope); + + /** + * Gets a specific connection by ID. + * + * @param connectionId The connection ID + * @return The connection, or null if not found + */ + SSEConnection getConnection(String connectionId); + + /** + * Closes a specific connection. + * + * @param connectionId The connection ID to close + * @return true if the connection was found and closed + */ + boolean closeConnection(String connectionId); +} \ No newline at end of file diff --git a/common/src/main/java/org/red5/server/net/sse/SSEConnection.java b/common/src/main/java/org/red5/server/net/sse/SSEConnection.java new file mode 100644 index 000000000..6ac35ca57 --- /dev/null +++ b/common/src/main/java/org/red5/server/net/sse/SSEConnection.java @@ -0,0 +1,239 @@ +/* + * RED5 Open Source Flash Server - https://github.com/Red5/ Copyright 2006-2023 by respective authors (see below). All rights reserved. Licensed under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless + * required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions and limitations under the License. + */ + +package org.red5.server.net.sse; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +import org.red5.server.api.scope.IScope; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import jakarta.servlet.AsyncContext; +import jakarta.servlet.http.HttpServletResponse; + +/** + * Represents a Server-Sent Events connection to a client. + * This class manages the lifecycle of an SSE connection including sending events, + * handling connection state, and proper cleanup. + * + * @author Paul Gregoire (mondain@gmail.com) + */ +public class SSEConnection { + + private static Logger log = LoggerFactory.getLogger(SSEConnection.class); + + private final String connectionId; + + private final AsyncContext asyncContext; + + private final HttpServletResponse response; + + private final IScope scope; + + private final AtomicBoolean connected = new AtomicBoolean(true); + + private final AtomicLong lastEventId = new AtomicLong(0); + + private volatile long lastActivity; + + /** + * Creates a new SSE connection. + * + * @param connectionId Unique identifier for this connection + * @param asyncContext The async servlet context + * @param response The HTTP response + * @param scope The Red5 scope this connection belongs to + */ + public SSEConnection(String connectionId, AsyncContext asyncContext, HttpServletResponse response, IScope scope) { + this.connectionId = connectionId; + this.asyncContext = asyncContext; + this.response = response; + this.scope = scope; + this.lastActivity = System.currentTimeMillis(); + // Set up SSE headers + response.setContentType("text/event-stream"); + response.setCharacterEncoding("UTF-8"); + response.setHeader("Cache-Control", "no-cache"); + response.setHeader("Connection", "keep-alive"); + response.setHeader("Access-Control-Allow-Origin", "*"); + response.setHeader("Access-Control-Allow-Credentials", "true"); + response.setHeader("Access-Control-Allow-Headers", "Cache-Control"); + log.debug("Created SSE connection: {} for scope: {}", connectionId, scope.getName()); + } + + /** + * Sends a simple message event. + * + * @param message The message to send + * @return true if the message was sent successfully + */ + public boolean sendMessage(String message) { + return sendEvent(null, null, message, null); + } + + /** + * Sends a message event with a specific event type. + * + * @param event The event type + * @param message The message to send + * @return true if the message was sent successfully + */ + public boolean sendEvent(String event, String message) { + return sendEvent(null, event, message, null); + } + + /** + * Sends a complete SSE event with all optional fields. + * + * @param id The event ID (optional) + * @param event The event type (optional) + * @param data The event data + * @param retry The retry timeout in milliseconds (optional) + * @return true if the event was sent successfully + */ + public boolean sendEvent(String id, String event, String data, Integer retry) { + if (isConnected()) { + try { + StringBuilder eventBuilder = new StringBuilder(); + if (id != null) { + eventBuilder.append("id: ").append(id).append("\n"); + } else { + // Auto-generate ID if not provided + eventBuilder.append("id: ").append(lastEventId.incrementAndGet()).append("\n"); + } + if (event != null) { + eventBuilder.append("event: ").append(event).append("\n"); + } + if (retry != null) { + eventBuilder.append("retry: ").append(retry).append("\n"); + } + if (data != null) { + // Handle multi-line data + String[] lines = data.split("\n"); + for (String line : lines) { + eventBuilder.append("data: ").append(line).append("\n"); + } + } + eventBuilder.append("\n"); // End of event + byte[] eventBytes = eventBuilder.toString().getBytes(StandardCharsets.UTF_8); + OutputStream outputStream = response.getOutputStream(); + if (outputStream != null) { + outputStream.write(eventBytes); + outputStream.flush(); + lastActivity = System.currentTimeMillis(); + log.trace("Sent SSE event to connection {}: {}", connectionId, eventBuilder.toString().trim()); + return true; + } else { + log.debug("Output stream is null for connection {}", connectionId); + } + } catch (IOException e) { + log.debug("Failed to send SSE event to connection {}: {}", connectionId, e.getMessage()); + close(); + } + } else { + log.debug("Connection {} is not connected, cannot send event", connectionId); + } + return false; + } + + /** + * Sends a keep-alive comment to maintain the connection. + * + * @return true if the keep-alive was sent successfully + */ + public boolean sendKeepAlive() { + if (isConnected()) { + try { + OutputStream outputStream = response.getOutputStream(); + if (outputStream != null) { + outputStream.write(": keep-alive\n\n".getBytes(StandardCharsets.UTF_8)); + outputStream.flush(); + lastActivity = System.currentTimeMillis(); + log.trace("Sent keep-alive to connection {}", connectionId); + return true; + } else { + log.debug("Output stream is null for connection {}", connectionId); + } + } catch (IOException e) { + log.debug("Failed to send keep-alive to connection {}: {}", connectionId, e.getMessage()); + close(); + } + } + return false; + } + + /** + * Closes the SSE connection. + */ + public void close() { + if (connected.compareAndSet(true, false)) { + log.debug("Closing SSE connection: {}", connectionId); + try { + if (asyncContext != null) { + asyncContext.complete(); + } + } catch (Exception e) { + log.debug("Error completing async context for connection {}: {}", connectionId, e.getMessage()); + } + } + } + + /** + * Checks if the connection is still active. + * + * @return true if connected + */ + public boolean isConnected() { + return connected.get(); + } + + /** + * Gets the connection ID. + * + * @return connection ID + */ + public String getConnectionId() { + return connectionId; + } + + /** + * Gets the scope this connection belongs to. + * + * @return the scope + */ + public IScope getScope() { + return scope; + } + + /** + * Gets the last activity timestamp. + * + * @return last activity time in milliseconds + */ + public long getLastActivity() { + return lastActivity; + } + + /** + * Gets the current event ID counter. + * + * @return current event ID + */ + public long getCurrentEventId() { + return lastEventId.get(); + } + + @Override + public String toString() { + return "SSEConnection{" + "connectionId='" + connectionId + '\'' + ", scope=" + (scope != null ? scope.getName() : "null") + ", connected=" + connected.get() + ", lastEventId=" + lastEventId.get() + ", lastActivity=" + lastActivity + '}'; + } +} \ No newline at end of file diff --git a/common/src/main/java/org/red5/server/net/sse/SSEEvent.java b/common/src/main/java/org/red5/server/net/sse/SSEEvent.java new file mode 100644 index 000000000..2104689f4 --- /dev/null +++ b/common/src/main/java/org/red5/server/net/sse/SSEEvent.java @@ -0,0 +1,253 @@ +/* + * RED5 Open Source Flash Server - https://github.com/Red5/ Copyright 2006-2023 by respective authors (see below). All rights reserved. Licensed under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless + * required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions and limitations under the License. + */ + +package org.red5.server.net.sse; + +/** + * Represents a Server-Sent Event with all supported fields. + * This class provides a builder pattern for creating SSE events + * that comply with the W3C Server-Sent Events specification. + * + * @author Paul Gregoire (mondain@gmail.com) + */ +public class SSEEvent { + + private String id; + + private String event; + + private String data; + + private Integer retry; + + /** + * Default constructor. + */ + public SSEEvent() { + } + + /** + * Constructor with data only. + * + * @param data the event data + */ + public SSEEvent(String data) { + this.data = data; + } + + /** + * Constructor with event type and data. + * + * @param event the event type + * @param data the event data + */ + public SSEEvent(String event, String data) { + this.event = event; + this.data = data; + } + + /** + * Full constructor. + * + * @param id the event ID + * @param event the event type + * @param data the event data + * @param retry the retry timeout + */ + public SSEEvent(String id, String event, String data, Integer retry) { + this.id = id; + this.event = event; + this.data = data; + this.retry = retry; + } + + /** + * Gets the event ID. + * + * @return the event ID + */ + public String getId() { + return id; + } + + /** + * Sets the event ID. + * + * @param id the event ID + * @return this instance for chaining + */ + public SSEEvent setId(String id) { + this.id = id; + return this; + } + + /** + * Gets the event type. + * + * @return the event type + */ + public String getEvent() { + return event; + } + + /** + * Sets the event type. + * + * @param event the event type + * @return this instance for chaining + */ + public SSEEvent setEvent(String event) { + this.event = event; + return this; + } + + /** + * Gets the event data. + * + * @return the event data + */ + public String getData() { + return data; + } + + /** + * Sets the event data. + * + * @param data the event data + * @return this instance for chaining + */ + public SSEEvent setData(String data) { + this.data = data; + return this; + } + + /** + * Gets the retry timeout. + * + * @return the retry timeout in milliseconds + */ + public Integer getRetry() { + return retry; + } + + /** + * Sets the retry timeout. + * + * @param retry the retry timeout in milliseconds + * @return this instance for chaining + */ + public SSEEvent setRetry(Integer retry) { + this.retry = retry; + return this; + } + + /** + * Creates a new SSE event builder. + * + * @return new SSEEvent instance + */ + public static SSEEvent builder() { + return new SSEEvent(); + } + + /** + * Creates a simple message event. + * + * @param message the message + * @return new SSEEvent with the message + */ + public static SSEEvent message(String message) { + return new SSEEvent(message); + } + + /** + * Creates an event with type and data. + * + * @param event the event type + * @param data the event data + * @return new SSEEvent + */ + public static SSEEvent of(String event, String data) { + return new SSEEvent(event, data); + } + + /** + * Creates an event with ID, type and data. + * + * @param id the event ID + * @param event the event type + * @param data the event data + * @return new SSEEvent + */ + public static SSEEvent of(String id, String event, String data) { + return new SSEEvent(id, event, data, null); + } + + /** + * Converts this event to SSE format string. + * + * @return SSE formatted string + */ + public String toSSEFormat() { + StringBuilder sb = new StringBuilder(); + + if (id != null) { + sb.append("id: ").append(id).append("\n"); + } + + if (event != null) { + sb.append("event: ").append(event).append("\n"); + } + + if (retry != null) { + sb.append("retry: ").append(retry).append("\n"); + } + + if (data != null) { + // Handle multi-line data + String[] lines = data.split("\n"); + for (String line : lines) { + sb.append("data: ").append(line).append("\n"); + } + } + + sb.append("\n"); // End of event + return sb.toString(); + } + + @Override + public String toString() { + return "SSEEvent{" + "id='" + id + '\'' + ", event='" + event + '\'' + ", data='" + data + '\'' + ", retry=" + retry + '}'; + } + + @Override + public boolean equals(Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + + SSEEvent sseEvent = (SSEEvent) o; + + if (id != null ? !id.equals(sseEvent.id) : sseEvent.id != null) + return false; + if (event != null ? !event.equals(sseEvent.event) : sseEvent.event != null) + return false; + if (data != null ? !data.equals(sseEvent.data) : sseEvent.data != null) + return false; + return retry != null ? retry.equals(sseEvent.retry) : sseEvent.retry == null; + } + + @Override + public int hashCode() { + int result = id != null ? id.hashCode() : 0; + result = 31 * result + (event != null ? event.hashCode() : 0); + result = 31 * result + (data != null ? data.hashCode() : 0); + result = 31 * result + (retry != null ? retry.hashCode() : 0); + return result; + } +} \ No newline at end of file diff --git a/common/src/main/java/org/red5/server/scheduling/JDKSchedulingService.java b/common/src/main/java/org/red5/server/scheduling/JDKSchedulingService.java index d4432c103..3f2f5bc39 100644 --- a/common/src/main/java/org/red5/server/scheduling/JDKSchedulingService.java +++ b/common/src/main/java/org/red5/server/scheduling/JDKSchedulingService.java @@ -21,11 +21,11 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import org.red5.logging.Red5LoggerFactory; import org.red5.server.api.scheduling.IScheduledJob; import org.red5.server.api.scheduling.ISchedulingService; import org.red5.server.jmx.mxbeans.JDKSchedulingServiceMXBean; import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.InitializingBean; import org.springframework.jmx.export.annotation.ManagedResource; @@ -39,7 +39,7 @@ @ManagedResource(objectName = "org.red5.server:name=schedulingService,type=JDKSchedulingService") public class JDKSchedulingService implements ISchedulingService, JDKSchedulingServiceMXBean, InitializingBean, DisposableBean { - private static Logger log = Red5LoggerFactory.getLogger(JDKSchedulingService.class); + private static Logger log = LoggerFactory.getLogger(JDKSchedulingService.class); /** * Service scheduler diff --git a/common/src/main/java/org/red5/server/scope/ScopeSecurityHandler.java b/common/src/main/java/org/red5/server/scope/ScopeSecurityHandler.java index 1bc926f01..abc994a68 100644 --- a/common/src/main/java/org/red5/server/scope/ScopeSecurityHandler.java +++ b/common/src/main/java/org/red5/server/scope/ScopeSecurityHandler.java @@ -7,11 +7,11 @@ package org.red5.server.scope; -import org.red5.logging.Red5LoggerFactory; import org.red5.server.api.IConnection; import org.red5.server.api.scope.IScope; import org.red5.server.api.scope.IScopeSecurityHandler; import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Scope security handler providing positive results to any allow request. @@ -20,7 +20,7 @@ */ public class ScopeSecurityHandler implements IScopeSecurityHandler { - private Logger log = Red5LoggerFactory.getLogger(this.getClass()); + private Logger log = LoggerFactory.getLogger(this.getClass()); protected boolean connectionAllowed = true; diff --git a/common/src/main/java/org/red5/server/stream/PlaylistSubscriberStream.java b/common/src/main/java/org/red5/server/stream/PlaylistSubscriberStream.java index 293e4cd0e..20c763ec1 100644 --- a/common/src/main/java/org/red5/server/stream/PlaylistSubscriberStream.java +++ b/common/src/main/java/org/red5/server/stream/PlaylistSubscriberStream.java @@ -11,7 +11,6 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArraySet; -import org.red5.logging.Red5LoggerFactory; import org.red5.server.api.IConnection; import org.red5.server.api.IContext; import org.red5.server.api.Red5; @@ -26,6 +25,7 @@ import org.red5.server.api.stream.OperationNotSupportedException; import org.red5.server.api.stream.StreamState; import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Stream of playlist subscriber @@ -34,7 +34,7 @@ */ public class PlaylistSubscriberStream extends AbstractClientStream implements IPlaylistSubscriberStream, IPlaylistSubscriberStreamStatistics { - private static final Logger log = Red5LoggerFactory.getLogger(PlaylistSubscriberStream.class); + private static final Logger log = LoggerFactory.getLogger(PlaylistSubscriberStream.class); /** * Playlist controller diff --git a/common/src/main/java/org/red5/server/stream/SingleItemSubscriberStream.java b/common/src/main/java/org/red5/server/stream/SingleItemSubscriberStream.java index c34a3c468..3d85fadad 100644 --- a/common/src/main/java/org/red5/server/stream/SingleItemSubscriberStream.java +++ b/common/src/main/java/org/red5/server/stream/SingleItemSubscriberStream.java @@ -11,7 +11,6 @@ import java.util.HashSet; import java.util.Set; -import org.red5.logging.Red5LoggerFactory; import org.red5.server.api.IConnection; import org.red5.server.api.IContext; import org.red5.server.api.Red5; @@ -24,6 +23,7 @@ import org.red5.server.api.stream.OperationNotSupportedException; import org.red5.server.api.stream.StreamState; import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Stream of a single play item for a subscriber @@ -32,7 +32,7 @@ */ public class SingleItemSubscriberStream extends AbstractClientStream implements ISingleItemSubscriberStream { - private static final Logger log = Red5LoggerFactory.getLogger(SingleItemSubscriberStream.class); + private static final Logger log = LoggerFactory.getLogger(SingleItemSubscriberStream.class); /** * Service used to provide notifications, keep client buffer filled, clean up, etc... diff --git a/common/src/main/java/org/red5/server/util/ScopeUtils.java b/common/src/main/java/org/red5/server/util/ScopeUtils.java index 61021d51f..94bc67ff5 100644 --- a/common/src/main/java/org/red5/server/util/ScopeUtils.java +++ b/common/src/main/java/org/red5/server/util/ScopeUtils.java @@ -249,7 +249,7 @@ public static Object getScopeService(IScope scope, Class intf) { } /** - *

getScopeService.

+ *

getScopeService

* * @param scope a {@link org.red5.server.api.scope.IScope} object * @param intf a {@link java.lang.Class} object @@ -276,7 +276,7 @@ public static Object getScopeService(IScope scope, Class intf, Class defau } /** - *

getScopeService.

+ *

getScopeService

* * @param scope a {@link org.red5.server.api.scope.IScope} object * @param intf a {@link java.lang.Class} object diff --git a/server/pom.xml b/server/pom.xml index 230dad912..dfa5d416d 100644 --- a/server/pom.xml +++ b/server/pom.xml @@ -139,7 +139,6 @@ com.google.code.gson gson - test diff --git a/server/src/main/java/org/red5/net/websocket/WebSocketConnection.java b/server/src/main/java/org/red5/net/websocket/WebSocketConnection.java index 2a3a15c4b..b1fabcb0f 100644 --- a/server/src/main/java/org/red5/net/websocket/WebSocketConnection.java +++ b/server/src/main/java/org/red5/net/websocket/WebSocketConnection.java @@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicLongFieldUpdater; import java.util.stream.Stream; +import org.apache.commons.lang3.RandomStringUtils; import org.apache.commons.lang3.StringUtils; import org.apache.tomcat.websocket.Constants; import org.apache.tomcat.websocket.WsSession; @@ -128,10 +129,8 @@ public WebSocketConnection(WebSocketScope scope, Session session) { log.debug("ws session: {}", wsSession); } // the websocket session id will be used for hash code comparison, its the only usable value currently - wsSessionId = session.getId(); - if (isDebug) { - log.debug("wsSessionId: {}", wsSessionId); - } + //wsSessionId = session.getId(); + wsSessionId = RandomStringUtils.insecure().nextAlphabetic(11); // random 11 char string hashCode = wsSessionId.hashCode(); log.info("ws id: {} hashCode: {}", wsSessionId, hashCode); // get extensions diff --git a/server/src/main/java/org/red5/server/adapter/SSEApplicationAdapter.java b/server/src/main/java/org/red5/server/adapter/SSEApplicationAdapter.java new file mode 100644 index 000000000..435835ac1 --- /dev/null +++ b/server/src/main/java/org/red5/server/adapter/SSEApplicationAdapter.java @@ -0,0 +1,209 @@ +/* + * RED5 Open Source Flash Server - https://github.com/Red5/ Copyright 2006-2023 by respective authors (see below). All rights reserved. Licensed under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless + * required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions and limitations under the License. + */ + +package org.red5.server.adapter; + +import java.util.List; + +import org.red5.server.api.IConnection; +import org.red5.server.api.Red5; +import org.red5.server.api.scope.IScope; +import org.red5.server.net.sse.ISSEService; +import org.red5.server.net.sse.SSEConnection; +import org.red5.server.net.sse.SSEEvent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; + +/** + * Application adapter that provides Server-Sent Events integration. + * This adapter extends MultiThreadedApplicationAdapter and provides + * convenient methods for sending SSE events based on Red5 application events. + * + * @author Paul Gregoire (mondain@gmail.com) + */ +public class SSEApplicationAdapter extends MultiThreadedApplicationAdapter { + + private static Logger log = LoggerFactory.getLogger(SSEApplicationAdapter.class); + + @Autowired(required = false) + private ISSEService sseService; + + @Override + public boolean appStart(IScope app) { + log.info("SSE-enabled application starting: {}", app.getName()); + return super.appStart(app); + } + + @Override + public void appStop(IScope app) { + log.info("SSE-enabled application stopping: {}", app.getName()); + if (sseService != null) { + // Notify SSE clients about application stop + sseService.broadcastEventToScope(app, "app.stop", "Application " + app.getName() + " is stopping"); + } + super.appStop(app); + } + + @Override + public boolean appConnect(IConnection conn, Object[] params) { + log.debug("Client connected to SSE-enabled app: {}", conn.getRemoteAddress()); + // look up the SSE service if not already injected + if (sseService == null) { + sseService = (ISSEService) scope.getServiceHandler(ISSEService.BEAN_NAME); + } + if (sseService != null) { + // Notify SSE clients about new connection + sseService.broadcastEventToScope(conn.getScope(), "user.connect", "New user connected from " + conn.getRemoteAddress()); + } + return super.appConnect(conn, params); + } + + @Override + public void appDisconnect(IConnection conn) { + log.debug("Client disconnected from SSE-enabled app: {}", conn.getRemoteAddress()); + if (sseService != null) { + // Notify SSE clients about disconnection + sseService.broadcastEventToScope(conn.getScope(), "user.disconnect", "User disconnected: " + conn.getRemoteAddress()); + } + super.appDisconnect(conn); + } + + /** + * Broadcasts a message to all SSE clients in the application scope. + * + * @param message The message to broadcast + * @return The number of successful sends + */ + public int broadcastSSEMessage(String message) { + if (sseService != null) { + IConnection conn = Red5.getConnectionLocal(); + IScope currentScope = conn != null ? conn.getScope() : null; + if (currentScope != null) { + return sseService.broadcastToScope(currentScope, message); + } + } + log.warn("SSE service not available for broadcasting or no current scope"); + return 0; + } + + /** + * Broadcasts an event to all SSE clients in the application scope. + * + * @param event The event type + * @param message The message to broadcast + * @return The number of successful sends + */ + public int broadcastSSEEvent(String event, String message) { + if (sseService != null) { + IConnection conn = Red5.getConnectionLocal(); + IScope currentScope = conn != null ? conn.getScope() : null; + if (currentScope != null) { + return sseService.broadcastEventToScope(currentScope, event, message); + } + } + log.warn("SSE service not available for broadcasting or no current scope"); + return 0; + } + + /** + * Broadcasts an SSE event to all clients in the application scope. + * + * @param sseEvent The SSE event to broadcast + * @return The number of successful sends + */ + public int broadcastSSEEvent(SSEEvent sseEvent) { + if (sseService != null) { + IConnection conn = Red5.getConnectionLocal(); + IScope currentScope = conn != null ? conn.getScope() : null; + if (currentScope != null) { + return sseService.broadcastEventToScope(currentScope, sseEvent); + } + } + log.warn("SSE service not available for broadcasting or no current scope"); + return 0; + } + + /** + * Gets all active SSE connections. + * + * @return List of active SSE connections + */ + public List getSSEConnections() { + if (sseService != null) { + return List.copyOf(sseService.getAllConnections()); + } + log.warn("SSE service not available"); + return List.of(); + } + + /** + * Gets the number of active SSE connections in this application scope. + * + * @return number of SSE connections + */ + public long getSSEConnectionCount() { + if (sseService != null) { + IConnection conn = Red5.getConnectionLocal(); + IScope currentScope = conn != null ? conn.getScope() : null; + if (currentScope != null) { + return sseService.getConnectionsInScope(currentScope); + } + } + return 0; + } + + /** + * Sends a message to a specific SSE connection. + * + * @param connectionId The connection ID + * @param message The message to send + * @return true if sent successfully + */ + public boolean sendSSEToConnection(String connectionId, String message) { + if (sseService != null) { + return sseService.sendToConnection(connectionId, message); + } + log.warn("SSE service not available for sending to connection"); + return false; + } + + /** + * Sends an event to a specific SSE connection. + * + * @param connectionId The connection ID + * @param event The event type + * @param message The message to send + * @return true if sent successfully + */ + public boolean sendSSEEventToConnection(String connectionId, String event, String message) { + if (sseService != null) { + return sseService.sendEventToConnection(connectionId, event, message); + } + log.warn("SSE service not available for sending to connection"); + return false; + } + + /** + * Gets the SSE service instance. + * + * @return the SSE service, or null if not available + */ + public ISSEService getSseService() { + return sseService; + } + + /** + * Sets the SSE service instance. + * + * @param sseService the SSE service to set + */ + public void setSseService(ISSEService sseService) { + this.sseService = sseService; + } + +} \ No newline at end of file diff --git a/server/src/main/java/org/red5/server/net/sse/SSEManager.java b/server/src/main/java/org/red5/server/net/sse/SSEManager.java new file mode 100644 index 000000000..d82c0f271 --- /dev/null +++ b/server/src/main/java/org/red5/server/net/sse/SSEManager.java @@ -0,0 +1,343 @@ +/* + * RED5 Open Source Flash Server - https://github.com/Red5/ Copyright 2006-2023 by respective authors (see below). All rights reserved. Licensed under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless + * required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions and limitations under the License. + */ + +package org.red5.server.net.sse; + +import java.util.Collection; +import java.util.List; +import java.util.ArrayList; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import org.red5.server.api.scope.IScope; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.DisposableBean; +import org.springframework.beans.factory.InitializingBean; + +/** + * Manages Server-Sent Events connections and provides broadcasting capabilities. + * This class handles connection lifecycle, cleanup of stale connections, and + * broadcasting messages to groups of connections. + * + * @author Paul Gregoire (mondain@gmail.com) + */ +public class SSEManager implements InitializingBean, DisposableBean { + + private static Logger log = LoggerFactory.getLogger(SSEManager.class); + + private final ConcurrentHashMap connections = new ConcurrentHashMap<>(); + + private ScheduledExecutorService executorService; + + private long connectionTimeoutMs = 300000; // 5 minutes default + + private long keepAliveIntervalMs = 30000; // 30 seconds default + + private boolean keepAliveEnabled = true; + + public SSEManager() { + log.debug("SSEManager instantiated"); + } + + @Override + public void afterPropertiesSet() throws Exception { + log.info("Starting SSE Manager"); + executorService = Executors.newScheduledThreadPool(2, r -> { + Thread t = new Thread(r, "SSE-Manager"); + t.setDaemon(true); + return t; + }); + // Schedule cleanup task + executorService.scheduleWithFixedDelay(this::cleanupStaleConnections, 60, 60, TimeUnit.SECONDS); + // Schedule keep-alive task if enabled + if (keepAliveEnabled) { + executorService.scheduleWithFixedDelay(this::sendKeepAlives, keepAliveIntervalMs, keepAliveIntervalMs, TimeUnit.MILLISECONDS); + } + log.info("SSE Manager started with {} connection timeout and {} keep-alive interval", connectionTimeoutMs, keepAliveIntervalMs); + } + + @Override + public void destroy() throws Exception { + log.info("Stopping SSE Manager"); + if (executorService != null && !executorService.isShutdown()) { + executorService.shutdown(); + try { + if (!executorService.awaitTermination(10, TimeUnit.SECONDS)) { + executorService.shutdownNow(); + } + } catch (InterruptedException e) { + executorService.shutdownNow(); + Thread.currentThread().interrupt(); + } + } + // Close all connections + connections.values().forEach(SSEConnection::close); + connections.clear(); + log.info("SSE Manager stopped"); + } + + /** + * Adds a new SSE connection. + * + * @param connection The SSE connection to add + */ + public void addConnection(SSEConnection connection) { + if (connection != null && connection.isConnected()) { + connections.put(connection.getConnectionId(), connection); + log.debug("Added SSE connection: {} (total: {})", connection.getConnectionId(), connections.size()); + } + } + + /** + * Removes an SSE connection. + * + * @param connectionId The connection ID to remove + * @return The removed connection, or null if not found + */ + public SSEConnection removeConnection(String connectionId) { + SSEConnection removed = connections.remove(connectionId); + if (removed != null) { + log.debug("Removed SSE connection: {} (total: {})", connectionId, connections.size()); + } + return removed; + } + + /** + * Gets an SSE connection by ID. + * + * @param connectionId The connection ID + * @return The connection, or null if not found + */ + public SSEConnection getConnection(String connectionId) { + return connections.get(connectionId); + } + + /** + * Gets all active connections. + * + * @return Collection of all active connections + */ + public Collection getAllConnections() { + return connections.values(); + } + + /** + * Gets the number of active connections. + * + * @return number of active connections + */ + public int getConnectionCount() { + return connections.size(); + } + + /** + * Broadcasts a message to all connections. + * + * @param message The message to broadcast + * @return The number of successful sends + */ + public int broadcastMessage(String message) { + return broadcastEvent(null, message); + } + + /** + * Broadcasts an event to all connections. + * + * @param event The event type + * @param message The message to broadcast + * @return The number of successful sends + */ + public int broadcastEvent(String event, String message) { + int successCount = 0; + for (SSEConnection connection : connections.values()) { + if (connection.sendEvent(event, message)) { + successCount++; + } + } + log.debug("Broadcast {} event to {}/{} connections", event != null ? event : "message", successCount, connections.size()); + return successCount; + } + + /** + * Broadcasts a message to all connections in a specific scope. + * + * @param scope The scope to broadcast to + * @param message The message to broadcast + * @return The number of successful sends + */ + public int broadcastToScope(IScope scope, String message) { + return broadcastEventToScope(scope, null, message); + } + + /** + * Broadcasts an event to all connections in a specific scope. + * + * @param scope The scope to broadcast to + * @param event The event type + * @param message The message to broadcast + * @return The number of successful sends + */ + public int broadcastEventToScope(IScope scope, String event, String message) { + if (scope == null) { + return 0; + } + int successCount = 0; + for (SSEConnection connection : connections.values()) { + if (scope.equals(connection.getScope()) && connection.sendEvent(event, message)) { + successCount++; + } + } + log.debug("Broadcast {} event to scope '{}': {}/{} connections", event != null ? event : "message", scope.getName(), successCount, getConnectionsInScope(scope)); + return successCount; + } + + /** + * Gets the number of connections in a specific scope. + * + * @param scope The scope to count connections for + * @return The number of connections in the scope + */ + public long getConnectionsInScope(IScope scope) { + if (scope == null) { + return 0; + } + return connections.values().stream().filter(conn -> scope.equals(conn.getScope())).count(); + } + + /** + * Sends a message to a specific connection. + * + * @param connectionId The connection ID + * @param message The message to send + * @return true if the message was sent successfully + */ + public boolean sendToConnection(String connectionId, String message) { + SSEConnection connection = connections.get(connectionId); + return connection != null && connection.sendMessage(message); + } + + /** + * Sends an event to a specific connection. + * + * @param connectionId The connection ID + * @param event The event type + * @param message The message to send + * @return true if the event was sent successfully + */ + public boolean sendEventToConnection(String connectionId, String event, String message) { + SSEConnection connection = connections.get(connectionId); + return connection != null && connection.sendEvent(event, message); + } + + /** + * Closes a specific connection. + * + * @param connectionId The connection ID to close + * @return true if the connection was found and closed + */ + public boolean closeConnection(String connectionId) { + SSEConnection connection = removeConnection(connectionId); + if (connection != null) { + connection.close(); + return true; + } + return false; + } + + /** + * Cleans up stale connections that have exceeded the timeout. + */ + private void cleanupStaleConnections() { + long now = System.currentTimeMillis(); + List removals = new ArrayList<>(); + for (SSEConnection connection : connections.values()) { + if (!connection.isConnected() || (now - connection.getLastActivity()) > connectionTimeoutMs) { + removals.add(connection.getConnectionId()); + } + } + for (String connectionId : removals) { + SSEConnection connection = removeConnection(connectionId); + if (connection != null) { + connection.close(); + } + } + removals.clear(); + } + + /** + * Sends keep-alive messages to all active connections. + */ + private void sendKeepAlives() { + int sentCount = 0; + for (SSEConnection connection : connections.values()) { + if (connection.sendKeepAlive()) { + sentCount++; + } + } + if (sentCount > 0) { + log.trace("Sent keep-alive to {} connections", sentCount); + } + } + + /** + * Sets the connection timeout in milliseconds. + * + * @param connectionTimeoutMs timeout in milliseconds + */ + public void setConnectionTimeoutMs(long connectionTimeoutMs) { + this.connectionTimeoutMs = connectionTimeoutMs; + } + + /** + * Gets the connection timeout in milliseconds. + * + * @return timeout in milliseconds + */ + public long getConnectionTimeoutMs() { + return connectionTimeoutMs; + } + + /** + * Sets the keep-alive interval in milliseconds. + * + * @param keepAliveIntervalMs interval in milliseconds + */ + public void setKeepAliveIntervalMs(long keepAliveIntervalMs) { + this.keepAliveIntervalMs = keepAliveIntervalMs; + } + + /** + * Gets the keep-alive interval in milliseconds. + * + * @return interval in milliseconds + */ + public long getKeepAliveIntervalMs() { + return keepAliveIntervalMs; + } + + /** + * Enables or disables keep-alive messages. + * + * @param keepAliveEnabled true to enable keep-alive + */ + public void setKeepAliveEnabled(boolean keepAliveEnabled) { + this.keepAliveEnabled = keepAliveEnabled; + } + + /** + * Checks if keep-alive is enabled. + * + * @return true if keep-alive is enabled + */ + public boolean isKeepAliveEnabled() { + return keepAliveEnabled; + } + +} diff --git a/server/src/main/java/org/red5/server/net/sse/SSEService.java b/server/src/main/java/org/red5/server/net/sse/SSEService.java new file mode 100644 index 000000000..19556bb4f --- /dev/null +++ b/server/src/main/java/org/red5/server/net/sse/SSEService.java @@ -0,0 +1,138 @@ +/* + * RED5 Open Source Flash Server - https://github.com/Red5/ Copyright 2006-2023 by respective authors (see below). All rights reserved. Licensed under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless + * required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions and limitations under the License. + */ + +package org.red5.server.net.sse; + +import java.util.Collection; + +import org.red5.server.api.scope.IScope; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; + +/** + * Implementation of the SSE service interface. + * This service provides a high-level API for working with Server-Sent Events + * and delegates to the SSEManager for actual connection management. + * + * @author Paul Gregoire (mondain@gmail.com) + */ +public class SSEService implements ISSEService { + + private static Logger log = LoggerFactory.getLogger(SSEService.class); + + @Autowired + private SSEManager sseManager; + + public SSEService() { + log.debug("SSEService instantiated"); + } + + @Override + public int broadcastMessage(String message) { + log.debug("Broadcasting message to all connections: {}", message); + return sseManager.broadcastMessage(message); + } + + @Override + public int broadcastEvent(String event, String message) { + log.debug("Broadcasting event '{}' to all connections: {}", event, message); + return sseManager.broadcastEvent(event, message); + } + + @Override + public int broadcastEvent(SSEEvent sseEvent) { + log.debug("Broadcasting SSE event to all connections: {}", sseEvent); + return sseManager.broadcastEvent(sseEvent.getEvent(), sseEvent.getData()); + } + + @Override + public int broadcastToScope(IScope scope, String message) { + log.debug("Broadcasting message to scope '{}': {}", scope != null ? scope.getName() : "null", message); + return sseManager.broadcastToScope(scope, message); + } + + @Override + public int broadcastEventToScope(IScope scope, String event, String message) { + log.debug("Broadcasting event '{}' to scope '{}': {}", event, scope != null ? scope.getName() : "null", message); + return sseManager.broadcastEventToScope(scope, event, message); + } + + @Override + public int broadcastEventToScope(IScope scope, SSEEvent sseEvent) { + log.debug("Broadcasting SSE event to scope '{}': {}", scope != null ? scope.getName() : "null", sseEvent); + return sseManager.broadcastEventToScope(scope, sseEvent.getEvent(), sseEvent.getData()); + } + + @Override + public boolean sendToConnection(String connectionId, String message) { + log.debug("Sending message to connection '{}': {}", connectionId, message); + return sseManager.sendToConnection(connectionId, message); + } + + @Override + public boolean sendEventToConnection(String connectionId, String event, String message) { + log.debug("Sending event '{}' to connection '{}': {}", event, connectionId, message); + return sseManager.sendEventToConnection(connectionId, event, message); + } + + @Override + public boolean sendEventToConnection(String connectionId, SSEEvent sseEvent) { + log.debug("Sending SSE event to connection '{}': {}", connectionId, sseEvent); + SSEConnection connection = sseManager.getConnection(connectionId); + if (connection != null) { + return connection.sendEvent(sseEvent.getId(), sseEvent.getEvent(), sseEvent.getData(), sseEvent.getRetry()); + } + return false; + } + + @Override + public Collection getAllConnections() { + return sseManager.getAllConnections(); + } + + @Override + public int getConnectionCount() { + return sseManager.getConnectionCount(); + } + + @Override + public long getConnectionsInScope(IScope scope) { + return sseManager.getConnectionsInScope(scope); + } + + @Override + public SSEConnection getConnection(String connectionId) { + return sseManager.getConnection(connectionId); + } + + @Override + public boolean closeConnection(String connectionId) { + log.debug("Closing connection: {}", connectionId); + return sseManager.closeConnection(connectionId); + } + + /** + * Sets the SSE manager. + * + * @param sseManager the SSE manager to set + */ + public void setSseManager(SSEManager sseManager) { + log.debug("Setting SSE manager: {}", sseManager); + this.sseManager = sseManager; + } + + /** + * Gets the SSE manager. + * + * @return the SSE manager + */ + public SSEManager getSseManager() { + return sseManager; + } + +} \ No newline at end of file diff --git a/server/src/main/java/org/red5/server/net/sse/SSEServlet.java b/server/src/main/java/org/red5/server/net/sse/SSEServlet.java new file mode 100644 index 000000000..b781c4e42 --- /dev/null +++ b/server/src/main/java/org/red5/server/net/sse/SSEServlet.java @@ -0,0 +1,393 @@ +/* + * RED5 Open Source Flash Server - https://github.com/Red5/ Copyright 2006-2023 by respective authors (see below). All rights reserved. Licensed under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless + * required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions and limitations under the License. + */ + +package org.red5.server.net.sse; + +import java.io.BufferedReader; +import java.io.IOException; + +import org.apache.commons.lang3.RandomStringUtils; +import org.red5.server.api.IServer; +import org.red5.server.api.scope.IGlobalScope; +import org.red5.server.api.scope.IScope; +import org.red5.server.scope.WebScope; +import org.red5.server.util.ScopeUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.web.context.WebApplicationContext; +import org.springframework.web.context.support.WebApplicationContextUtils; + +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; + +import jakarta.servlet.AsyncContext; +import jakarta.servlet.AsyncEvent; +import jakarta.servlet.AsyncListener; +import jakarta.servlet.ServletContext; +import jakarta.servlet.ServletException; +import jakarta.servlet.http.HttpServlet; +import jakarta.servlet.http.HttpServletRequest; +import jakarta.servlet.http.HttpServletResponse; + +/** + * Servlet that handles Server-Sent Events (SSE) connections. + * This servlet establishes and manages SSE connections following the W3C + * Server-Sent Events specification. + * + * The servlet supports: + * - Standard SSE event format with id, event, data, and retry fields + * - Connection management and cleanup + * - Integration with Red5 scopes + * - Async servlet processing for long-lived connections + * + * @author Paul Gregoire (mondain@gmail.com) + */ +public class SSEServlet extends HttpServlet implements AsyncListener { + + private static final long serialVersionUID = 1L; + + private static Logger log = LoggerFactory.getLogger(SSEServlet.class); + + private transient WebApplicationContext webAppCtx; + + private transient IServer server; + + private transient WebScope webScope; + + private transient SSEService sseService; + + private transient SSEManager sseManager; + + @Override + public void init() throws ServletException { + super.init(); + log.debug("Initializing SSE servlet"); + ServletContext ctx = getServletContext(); + log.debug("Context path: {}", ctx.getContextPath()); + // Get the web application context + try { + webAppCtx = WebApplicationContextUtils.getRequiredWebApplicationContext(ctx); + } catch (IllegalStateException e) { + log.debug("Required web application context not found, trying fallback"); + webAppCtx = (WebApplicationContext) ctx.getAttribute(WebApplicationContext.ROOT_WEB_APPLICATION_CONTEXT_ATTRIBUTE); + } + if (webAppCtx != null) { + // Get the Red5 server instance + server = (IServer) webAppCtx.getBean("red5.server"); + // get the application scope + webScope = (WebScope) webAppCtx.getBean("web.scope"); + sseService = (SSEService) webScope.getServiceHandler(SSEService.BEAN_NAME); + if (sseService != null) { + sseManager = sseService.getSseManager(); + if (sseManager == null) { + log.warn("SSEManager not available from SSEService"); + } + } else { + log.info("SSEService not available from WebScope"); + } + } else { + throw new ServletException("No web application context available"); + } + log.info("SSE servlet initialized successfully"); + } + + @Override + public void destroy() { + log.debug("Destroying SSE servlet"); + super.destroy(); + } + + @Override + protected void doOptions(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { + // Handle CORS preflight + handleCORS(req, resp); + resp.setStatus(HttpServletResponse.SC_OK); + } + + @Override + protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { + log.debug("SSE connection request from: {} {}", req.getRemoteAddr(), req.getRequestURI()); + // Validate that this is an SSE request + String accept = req.getHeader("Accept"); + if (accept == null || !accept.contains("text/event-stream")) { + log.debug("Request does not accept text/event-stream, rejecting"); + resp.sendError(HttpServletResponse.SC_BAD_REQUEST, "This endpoint only supports Server-Sent Events"); + return; + } + // Handle CORS preflight if needed + handleCORS(req, resp); + // Get the scope for this connection + IScope scope = getScope(req); + if (scope == null) { + log.warn("No scope available for SSE connection"); + resp.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, "Scope not available"); + return; + } + // Generate unique connection ID + String connectionId = RandomStringUtils.insecure().nextAlphabetic(11); // random 11 char string + // Start async processing + AsyncContext asyncContext = req.startAsync(); + asyncContext.setTimeout(0); // No timeout, managed by SSEManager + asyncContext.addListener(this); + // Create SSE connection + SSEConnection sseConnection = new SSEConnection(connectionId, asyncContext, resp, scope); + // Add to manager + sseManager.addConnection(sseConnection); + // Send initial connection confirmation + sseConnection.sendEvent("connection", "connected: " + connectionId); + log.info("Established SSE connection: {} for scope: {}", connectionId, scope.getName()); + } + + @Override + protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { + log.debug("SSE event post request from: {} {}", req.getRemoteAddr(), req.getRequestURI()); + // Handle CORS + handleCORS(req, resp); + // Validate content type + String contentType = req.getContentType(); + if (contentType == null || !contentType.toLowerCase().contains("application/json")) { + resp.sendError(HttpServletResponse.SC_BAD_REQUEST, "Content-Type must be application/json"); + return; + } + // Check if SSE manager is available + if (sseManager == null) { + log.warn("SSE manager not available for event posting"); + resp.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, "SSE service not available"); + return; + } + try { + // Read request body + StringBuilder requestBody = new StringBuilder(); + try (BufferedReader reader = req.getReader()) { + String line; + while ((line = reader.readLine()) != null) { + requestBody.append(line); + } + } + String jsonBody = requestBody.toString(); + if (jsonBody.isEmpty()) { + resp.sendError(HttpServletResponse.SC_BAD_REQUEST, "Request body is required"); + return; + } + // Parse JSON manually (simple parsing for basic event structure) + SSEEventRequest eventRequest = parseEventRequest(jsonBody); + if (eventRequest == null) { + resp.sendError(HttpServletResponse.SC_BAD_REQUEST, "Invalid JSON format or missing required fields"); + return; + } + int successCount = 0; + // Handle different event targets + if (eventRequest.connectionId != null && !eventRequest.connectionId.isEmpty()) { + // Send to specific connection + boolean success = sseManager.sendEventToConnection(eventRequest.connectionId, eventRequest.event, eventRequest.data); + successCount = success ? 1 : 0; + log.debug("Sent event '{}' to connection '{}': {}", eventRequest.event, eventRequest.connectionId, success); + } else if (eventRequest.scope != null && !eventRequest.scope.isEmpty()) { + // Send to specific scope + IScope targetScope = resolveScope(eventRequest.scope); + if (targetScope != null) { + successCount = sseManager.broadcastEventToScope(targetScope, eventRequest.event, eventRequest.data); + log.debug("Broadcast event '{}' to scope '{}': {} connections", eventRequest.event, eventRequest.scope, successCount); + } else { + resp.sendError(HttpServletResponse.SC_BAD_REQUEST, "Scope not found: " + eventRequest.scope); + return; + } + } else { + // Broadcast to all connections + successCount = sseManager.broadcastEvent(eventRequest.event, eventRequest.data); + log.debug("Broadcast event '{}' to all connections: {} recipients", eventRequest.event, successCount); + } + // Return success response + resp.setStatus(HttpServletResponse.SC_OK); + } catch (Exception e) { + log.warn("Error processing SSE event post: {}", e.getMessage(), e); + resp.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, "Error processing event"); + } + } + + /** + * Handles CORS headers for cross-origin requests. + */ + private void handleCORS(HttpServletRequest req, HttpServletResponse resp) { + String origin = req.getHeader("Origin"); + if (origin != null) { + resp.setHeader("Access-Control-Allow-Origin", origin); + } else { + resp.setHeader("Access-Control-Allow-Origin", "*"); + } + resp.setHeader("Access-Control-Allow-Credentials", "true"); + resp.setHeader("Access-Control-Allow-Methods", "GET, POST, OPTIONS"); + resp.setHeader("Access-Control-Allow-Headers", "Accept, Cache-Control, Last-Event-ID, Content-Type"); + resp.setHeader("Access-Control-Max-Age", "3600"); + } + + /** + * Gets the scope for the current request. + */ + private IScope getScope(HttpServletRequest req) { + if (webScope == null) { + log.warn("Web scope is not available"); + return null; + } + IGlobalScope globalScope = server.getGlobal("default"); + if (globalScope == null) { + log.warn("Global scope is not available"); + return null; + } + String path = req.getContextPath(); + log.debug("Request context path: {} path info: {}", path, req.getPathInfo()); + if (path == null || path.equals("/")) { + // Default to root application + return ScopeUtils.resolveScope(globalScope, "/live"); + } else { + // Extract application name from path + String[] parts = path.split("/"); + if (parts.length > 1) { + String appName = parts[1]; + IScope appScope = ScopeUtils.resolveScope(globalScope, appName); + if (appScope != null && ScopeUtils.isApp(appScope)) { + return appScope; + } else { + log.warn("Application scope '{}' not found, defaulting to 'live'", appName); + return ScopeUtils.resolveScope(globalScope, "/live"); + } + } else { + log.warn("Invalid path info '{}', defaulting to 'live'", path); + return ScopeUtils.resolveScope(globalScope, "/live"); + } + } + } + + // AsyncListener implementation + + @Override + public void onComplete(AsyncEvent event) throws IOException { + log.debug("Async context completed"); + cleanupConnection(event); + } + + @Override + public void onTimeout(AsyncEvent event) throws IOException { + log.debug("Async context timed out"); + cleanupConnection(event); + } + + @Override + public void onError(AsyncEvent event) throws IOException { + log.debug("Async context error: {}", event.getThrowable() != null ? event.getThrowable().getMessage() : "unknown"); + cleanupConnection(event); + } + + @Override + public void onStartAsync(AsyncEvent event) throws IOException { + log.trace("Async context started"); + } + + /** + * Cleans up connection resources when async context ends. + */ + private void cleanupConnection(AsyncEvent event) { + try { + AsyncContext asyncContext = event.getAsyncContext(); + if (asyncContext != null) { + HttpServletRequest req = (HttpServletRequest) asyncContext.getRequest(); + String connectionId = (String) req.getAttribute("sse.connectionId"); + if (connectionId != null && sseManager != null) { + SSEConnection connection = sseManager.removeConnection(connectionId); + if (connection != null) { + connection.close(); + log.debug("Cleaned up SSE connection: {}", connectionId); + } + } + } + } catch (Exception e) { + log.debug("Error during connection cleanup: {}", e.getMessage()); + } + } + + /** + * Gets the SSE manager for this servlet. + * + * @return the SSE manager + */ + public SSEManager getSseManager() { + return sseManager; + } + + /** + * Parses a JSON event request into an SSEEventRequest object. + * Simple JSON parser for basic event structure. + */ + private SSEEventRequest parseEventRequest(String json) { + try { + // trim whitespace and validate basic JSON structure + json = json.trim(); + if (json.startsWith("{") && json.endsWith("}")) { + JsonObject jsonObject = JsonParser.parseString(json).getAsJsonObject(); + if (jsonObject != null) { + log.debug("Parsed JSON successfully: {}", jsonObject.toString()); + SSEEventRequest request = new SSEEventRequest(); + jsonObject.entrySet().forEach(entry -> { + String key = entry.getKey(); + String value = entry.getValue().isJsonObject() ? entry.getValue().toString() : entry.getValue().getAsString(); + log.debug("JSON field: {} = {}", key, value); + switch (key) { + case "event": + request.event = value; + break; + case "data": + request.data = value; + break; + case "connectionId": + request.connectionId = value; + break; + case "scope": + request.scope = value; + break; + } + }); + // Validate required fields + if (request.event != null && request.data != null) { + log.debug("Parsed event request: event='{}', data='{}'", request.event, request.data); + return request; + } + } + } + } catch (Exception e) { + log.warn("Error parsing event request JSON: {}", e.getMessage()); + } + return null; + } + + /** + * Resolves a scope by name. + */ + private IScope resolveScope(String scopeName) { + if (server == null) { + return null; + } + IGlobalScope globalScope = server.getGlobal("default"); + if (globalScope == null) { + return null; + } + return ScopeUtils.resolveScope(globalScope, scopeName); + } + + /** + * Simple data class for SSE event requests. + */ + private static class SSEEventRequest { + String event; + + String data; + + String connectionId; // Optional: target specific connection + + String scope; // Optional: target specific scope + } + +} \ No newline at end of file diff --git a/server/src/main/java/org/red5/server/plugin/PluginRegistry.java b/server/src/main/java/org/red5/server/plugin/PluginRegistry.java index 742112eb8..f1568e366 100644 --- a/server/src/main/java/org/red5/server/plugin/PluginRegistry.java +++ b/server/src/main/java/org/red5/server/plugin/PluginRegistry.java @@ -11,9 +11,9 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import org.red5.logging.Red5LoggerFactory; import org.red5.server.api.plugin.IRed5Plugin; import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Central registry for Red5 plug-ins. @@ -22,7 +22,7 @@ */ public class PluginRegistry { - private static Logger log = Red5LoggerFactory.getLogger(PluginRegistry.class, "plugins"); + private static Logger log = LoggerFactory.getLogger(PluginRegistry.class); // keeps track of plug-ins, keyed by plug-in name private static volatile ConcurrentMap plugins = new ConcurrentHashMap<>(3, 0.9f, 1); diff --git a/server/src/main/java/org/red5/server/scope/WebScope.java b/server/src/main/java/org/red5/server/scope/WebScope.java index 9578b9565..2d19ad697 100644 --- a/server/src/main/java/org/red5/server/scope/WebScope.java +++ b/server/src/main/java/org/red5/server/scope/WebScope.java @@ -17,6 +17,7 @@ import org.red5.server.api.scope.IGlobalScope; import org.red5.server.api.scope.ScopeType; import org.red5.server.jmx.mxbeans.WebScopeMXBean; +import org.red5.server.net.sse.ISSEService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.DisposableBean; @@ -28,13 +29,14 @@ /** *

- * Web scope is special scope that is aware of servlet context and represents scope of a Red5 application within a servlet container (or application server) such as Tomcat, Jetty or JBoss. - *

- *

- * Web scope is aware of virtual hosts configuration for Red5 application and is the first scope that instantiated after Red5 application gets started. - *

- *

- * Then it loads virtual hosts configuration, adds mappings of paths to global scope that is injected thru Spring IoC context file and runs initialization process. + * Web scope is special scope that is aware of servlet context and represents scope of a Red5 application within a + * servlet container (or application server) such as Tomcat, Jetty or JBoss. + *
+ * Web scope is aware of virtual hosts configuration for Red5 application and is the first scope that instantiated + * after Red5 application gets started. + *
+ * Then it loads virtual hosts configuration, adds mappings of paths to global scope that is injected thru Spring IoC + * context file and runs initialization process. *

* * Red5 server implementation instance and ServletContext are injected as well. @@ -208,6 +210,13 @@ public void register() { log.debug("Webscope registering: {}", contextPath); getAppContext(); appLoader = LoaderBase.getApplicationLoader(); + ISSEService sseService = (ISSEService) LoaderBase.getInstance().getSSEService(); + if (sseService != null) { + log.debug("SSE service found: {}", sseService); + registerServiceHandler(ISSEService.BEAN_NAME, sseService); + } else { + log.info("SSE service not found"); + } // get the parent name String parentName = getParent().getName(); // add host name mappings @@ -238,6 +247,8 @@ public void unregister() { uninit(); // disconnect all clients before unregistering getClientConnections().forEach(IConnection::close); + // clear service + unregisterServiceHandler(ISSEService.BEAN_NAME); // remove host name mappings if (hostnames != null && hostnames.length > 0) { for (String element : hostnames) { diff --git a/server/src/main/java/org/red5/server/service/ShutdownServer.java b/server/src/main/java/org/red5/server/service/ShutdownServer.java index 962e9006e..72820f6d9 100644 --- a/server/src/main/java/org/red5/server/service/ShutdownServer.java +++ b/server/src/main/java/org/red5/server/service/ShutdownServer.java @@ -27,11 +27,11 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import org.red5.logging.Red5LoggerFactory; import org.red5.server.ContextLoader; import org.red5.server.LoaderBase; import org.red5.server.plugin.PluginRegistry; import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.beans.BeansException; import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.InitializingBean; @@ -46,7 +46,7 @@ */ public class ShutdownServer implements ApplicationContextAware, InitializingBean, DisposableBean { - private Logger log = Red5LoggerFactory.getLogger(ShutdownServer.class); + private Logger log = LoggerFactory.getLogger(ShutdownServer.class); /** * Port to which the server listens for shutdown requests. Default is 9999. diff --git a/server/src/main/java/org/red5/server/stream/ProviderService.java b/server/src/main/java/org/red5/server/stream/ProviderService.java index f60b4be8f..9413800f8 100644 --- a/server/src/main/java/org/red5/server/stream/ProviderService.java +++ b/server/src/main/java/org/red5/server/stream/ProviderService.java @@ -13,7 +13,6 @@ import java.nio.file.Paths; import java.util.Set; -import org.red5.logging.Red5LoggerFactory; import org.red5.server.api.scope.IBroadcastScope; import org.red5.server.api.scope.IScope; import org.red5.server.api.scope.ScopeType; @@ -32,6 +31,7 @@ import org.red5.server.stream.provider.FileProvider; import org.red5.server.util.ScopeUtils; import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** *

ProviderService class.

@@ -40,7 +40,7 @@ */ public class ProviderService implements IProviderService { - private static final Logger log = Red5LoggerFactory.getLogger(ProviderService.class); + private static final Logger log = LoggerFactory.getLogger(ProviderService.class); // whether or not to support FCS/FMS/AMS live-wait (default to off) private boolean liveWaitSupport; diff --git a/server/src/main/java/org/red5/server/tomcat/TomcatApplicationContext.java b/server/src/main/java/org/red5/server/tomcat/TomcatApplicationContext.java index 102db3b88..f972445ea 100644 --- a/server/src/main/java/org/red5/server/tomcat/TomcatApplicationContext.java +++ b/server/src/main/java/org/red5/server/tomcat/TomcatApplicationContext.java @@ -7,17 +7,17 @@ package org.red5.server.tomcat; -import jakarta.servlet.ServletContext; - import org.apache.catalina.Context; import org.apache.catalina.LifecycleState; import org.apache.catalina.core.StandardContext; -import org.red5.logging.Red5LoggerFactory; import org.red5.server.api.IApplicationContext; import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.web.context.ConfigurableWebApplicationContext; import org.springframework.web.context.WebApplicationContext; +import jakarta.servlet.ServletContext; + /** * Class that wraps a Tomcat webapp context. * @@ -28,7 +28,7 @@ public class TomcatApplicationContext implements IApplicationContext { /** Constant log */ - protected static Logger log = Red5LoggerFactory.getLogger(TomcatApplicationContext.class); + protected static Logger log = LoggerFactory.getLogger(TomcatApplicationContext.class); /** Store a reference to the Tomcat webapp context. */ private Context context; diff --git a/server/src/main/java/org/red5/server/tomcat/TomcatConnector.java b/server/src/main/java/org/red5/server/tomcat/TomcatConnector.java index 09b9f0137..97b36f434 100644 --- a/server/src/main/java/org/red5/server/tomcat/TomcatConnector.java +++ b/server/src/main/java/org/red5/server/tomcat/TomcatConnector.java @@ -19,8 +19,8 @@ import org.apache.tomcat.util.IntrospectionUtils; import org.apache.tomcat.util.net.SSLHostConfig; import org.apache.tomcat.util.net.SSLHostConfigCertificate; -import org.red5.logging.Red5LoggerFactory; import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Model object to contain a connector, socket address, and connection properties for a Tomcat connection. @@ -29,7 +29,7 @@ */ public class TomcatConnector { - private static Logger log = Red5LoggerFactory.getLogger(TomcatConnector.class); + private static Logger log = LoggerFactory.getLogger(TomcatConnector.class); private Connector connector; diff --git a/server/src/main/java/org/red5/server/tomcat/TomcatLoader.java b/server/src/main/java/org/red5/server/tomcat/TomcatLoader.java index 369697317..dd6066aef 100644 --- a/server/src/main/java/org/red5/server/tomcat/TomcatLoader.java +++ b/server/src/main/java/org/red5/server/tomcat/TomcatLoader.java @@ -48,6 +48,7 @@ import org.red5.server.api.Red5; import org.red5.server.jmx.mxbeans.ContextLoaderMXBean; import org.red5.server.jmx.mxbeans.LoaderMXBean; +import org.red5.server.net.sse.ISSEService; import org.red5.server.plugin.PluginRegistry; import org.red5.server.security.IRed5Realm; import org.red5.server.util.FileUtil; @@ -175,6 +176,11 @@ public boolean accept(File dir, String name) { */ protected boolean websocketEnabled = true; + /** + * SSE feature + */ + protected boolean sseEnabled = true; + /** * HTTPS/WSS feature */ @@ -190,6 +196,11 @@ public boolean accept(File dir, String name) { */ private WarDeployer deployer; + /** + * SSE service. + */ + private ISSEService sseService; + { // allow setting to true if we're running in Red5 Pro if (!awaitPlugins) { @@ -206,6 +217,8 @@ public boolean accept(File dir, String name) { /** {@inheritDoc} */ @Override public void afterPropertiesSet() throws Exception { + // set ourself as the instance + instance = this; // if we are not awaiting plugins, start immediately if (awaitPlugins) { log.info("Awaiting plugin loading"); @@ -326,6 +339,9 @@ public void start() throws ServletException { if (websocketEnabled) { checkWebsocketPlugin(); } + if (sseEnabled) { + checkSSEService(); + } // get a reference to the current threads classloader final ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader(); // root location for servlet container @@ -356,10 +372,8 @@ public void start() throws ServletException { // Use default webapps directory webappFolder = FileUtil.formatPath(serverRoot, "/webapps"); } - // To negotiate for web-socket compression or not. UpgradeUtil.wsAllowCompression = Boolean.valueOf(System.getProperty("ws.allow.compression", "true")); - System.setProperty("red5.webapp.root", webappFolder); log.info("Application root: {}", webappFolder); // Root applications directory @@ -594,6 +608,9 @@ public void run() { log.debug("Tomcat load completed"); } + /** + * Check for websocket plugin and load it if available. + */ private void checkWebsocketPlugin() { // if websockets are enabled, ensure the websocket plugin is loaded if (PluginRegistry.getPlugin(WebSocketPlugin.NAME) == null) { @@ -615,6 +632,21 @@ private void checkWebsocketPlugin() { } } + /** + * Check for SSE service and load it if available. + */ + private void checkSSEService() { + // get common context + ApplicationContext common = (ApplicationContext) applicationContext.getBean("red5.common"); + // if SSE is enabled, ensure the SSE service is loaded + if (common.containsBean("sseService")) { + log.debug("SSE service was found"); + sseService = common.getBean("sseService", ISSEService.class); + } else { + log.debug("SSE service was not found"); + } + } + /** * {@inheritDoc} * @@ -887,6 +919,32 @@ public void setWebsocketEnabled(boolean websocketEnabled) { this.websocketEnabled = websocketEnabled; } + /** + * Returns enabled state of SSE support. + * + * @return true if enabled and false otherwise + */ + public boolean isSseEnabled() { + return sseEnabled; + } + + /** + * Set SSE feature enabled / disabled. + * + * @param sseEnabled a boolean + */ + public void setSseEnabled(boolean sseEnabled) { + this.sseEnabled = sseEnabled; + } + + /** + * {@inheritDoc} + */ + @Override + public ISSEService getSSEService() { + return sseService; + } + /** * Returns enabled state of secure support. * diff --git a/server/src/main/server/conf/jee-container.xml b/server/src/main/server/conf/jee-container.xml index c141b55b4..b49fb60b4 100644 --- a/server/src/main/server/conf/jee-container.xml +++ b/server/src/main/server/conf/jee-container.xml @@ -18,6 +18,7 @@ --> + diff --git a/server/src/main/server/conf/red5-common.xml b/server/src/main/server/conf/red5-common.xml index 04dea85b3..244e3dfb1 100644 --- a/server/src/main/server/conf/red5-common.xml +++ b/server/src/main/server/conf/red5-common.xml @@ -82,6 +82,23 @@ + + + + + + + + + + + + + + + + + +
+

SSE Connection

+
+ + + + +
+
Disconnected
+
+ + Not connected +
+
+ + +
+

Send Event via POST

+
+
+ + +
+
+ + +
+
+ + +
+ + +
+
+
+ + +
+

Received Events

+ +
+
+ + + + \ No newline at end of file diff --git a/server/src/main/server/webapps/live/WEB-INF/red5-web.xml b/server/src/main/server/webapps/live/WEB-INF/red5-web.xml index 6c9fd6ec7..f09453f22 100644 --- a/server/src/main/server/webapps/live/WEB-INF/red5-web.xml +++ b/server/src/main/server/webapps/live/WEB-INF/red5-web.xml @@ -22,4 +22,7 @@ + + + diff --git a/server/src/main/server/webapps/live/WEB-INF/web.xml b/server/src/main/server/webapps/live/WEB-INF/web.xml index 2a9125e43..6d2af0cf1 100644 --- a/server/src/main/server/webapps/live/WEB-INF/web.xml +++ b/server/src/main/server/webapps/live/WEB-INF/web.xml @@ -8,6 +8,22 @@ webAppRootKey /live + + + sse + org.red5.server.net.sse.SSEServlet + 3 + true + + cors.enabled + true + + + + sse + /events + /events/* +