diff --git a/dart/analysis_options.yaml b/dart/analysis_options.yaml index d9e9bcd0..2e7b18df 100644 --- a/dart/analysis_options.yaml +++ b/dart/analysis_options.yaml @@ -1,4 +1,4 @@ -include: package:leancode_lint/analysis_options.yaml +include: package:leancode_lint/analysis_options_package.yaml analyzer: exclude: diff --git a/dart/lib/leancode_pipe/authorized_pipe_http_client.dart b/dart/lib/leancode_pipe/authorized_pipe_http_client.dart index 29e98933..b4323fb9 100644 --- a/dart/lib/leancode_pipe/authorized_pipe_http_client.dart +++ b/dart/lib/leancode_pipe/authorized_pipe_http_client.dart @@ -1,7 +1,13 @@ import 'package:http/http.dart' as http; import 'package:leancode_pipe/leancode_pipe/pipe_client.dart'; +/// An HTTP client that automatically adds authorization headers to requests. +/// +/// Uses a `tokenFactory` to obtain Bearer tokens which are added to each request's +/// Authorization header. If no `tokenFactory` is provided, requests are sent without +/// authorization. class AuthorizedPipeHttpClient extends http.BaseClient { + /// Creates a new [AuthorizedPipeHttpClient]. AuthorizedPipeHttpClient({ http.Client? client, required PipeTokenFactory? tokenFactory, diff --git a/dart/lib/leancode_pipe/contracts/contracts.dart b/dart/lib/leancode_pipe/contracts/contracts.dart index 305e550c..a7c3f3a2 100644 --- a/dart/lib/leancode_pipe/contracts/contracts.dart +++ b/dart/lib/leancode_pipe/contracts/contracts.dart @@ -6,46 +6,71 @@ const _pipeContractsSerializable = JsonSerializable( fieldRename: FieldRename.pascal, ); +/// Result of a subscription operation returned by the server. @_pipeContractsSerializable class SubscriptionResult { + /// Creates a new [SubscriptionResult] instance. const SubscriptionResult({ required this.subscriptionId, required this.status, required this.type, }); + /// Creates a new [SubscriptionResult] instance from a JSON object. factory SubscriptionResult.fromJson(Map json) => _$SubscriptionResultFromJson(json); + /// Unique identifier for the subscription. final String subscriptionId; + + /// Status of the subscription operation. final SubscriptionStatus status; + + /// Type of operation (subscribe/unsubscribe). final OperationType type; + /// Converts the subscription result to a JSON object. Map toJson() => _$SubscriptionResultToJson(this); } +/// Status codes returned by the server for subscription operations. enum SubscriptionStatus { + /// Operation completed successfully. @JsonValue(0) success, + + /// Client is not authorized to perform the operation. @JsonValue(1) unauthorized, + + /// Request was malformed. @JsonValue(2) malformed, + + /// Request was invalid. @JsonValue(3) invalid, + + /// Server encountered an internal error. @JsonValue(4) internalServerError, } +/// Types of subscription operations. enum OperationType { + /// Subscribe to a topic. @JsonValue(0) subscribe, + + /// Unsubscribe from a topic. @JsonValue(1) unsubscribe, } +/// Envelope containing a notification sent by the server. @_pipeContractsSerializable class NotificationEnvelope { + /// Creates a new [NotificationEnvelope] instance. const NotificationEnvelope({ required this.id, required this.topicType, @@ -54,15 +79,26 @@ class NotificationEnvelope { required this.notification, }); + /// Creates a new [NotificationEnvelope] instance from a JSON object. factory NotificationEnvelope.fromJson(Map json) => _$NotificationEnvelopeFromJson(json); + /// Unique identifier for this notification. final String id; + + /// Full name of the topic type. final String topicType; + + /// Type of the notification. final String notificationType; + + /// Topic data in JSON format. final Map topic; + + /// The notification payload. final Object notification; + /// Converts the notification envelope to a JSON object. Map toJson() => _$NotificationEnvelopeToJson(this); @override @@ -71,21 +107,30 @@ class NotificationEnvelope { } } +/// Envelope containing subscription request data sent to the server. @_pipeContractsSerializable class SubscriptionEnvelope { + /// Creates a new [SubscriptionEnvelope] instance. const SubscriptionEnvelope({ required this.id, required this.topic, required this.topicType, }); + /// Creates a new [SubscriptionEnvelope] instance from a JSON object. factory SubscriptionEnvelope.fromJson(Map json) => _$SubscriptionEnvelopeFromJson(json); + /// Unique identifier for this subscription request. final String id; + + /// Topic data in JSON format. final Map topic; + + /// Full name of the topic type. final String topicType; + /// Converts the subscription envelope to a JSON object. Map toJson() => _$SubscriptionEnvelopeToJson(this); @override diff --git a/dart/lib/leancode_pipe/contracts/topic.dart b/dart/lib/leancode_pipe/contracts/topic.dart index a026abb3..40afa6ab 100644 --- a/dart/lib/leancode_pipe/contracts/topic.dart +++ b/dart/lib/leancode_pipe/contracts/topic.dart @@ -1,9 +1,17 @@ +/// Represents a topic that can be subscribed to for receiving notifications. +/// +/// A topic is a source of typed notifications that clients can subscribe to. +/// Each topic has a specific [Notification] type that it emits. abstract interface class Topic { + /// Attempts to cast a JSON notification to the correct [Notification] type. Notification? castNotification(String tag, dynamic json); + /// Returns the fully qualified name of this topic type. String getFullName(); + /// Converts this topic to a JSON representation. Map toJson(); + /// Creates a new instance of this topic from JSON data. Topic fromJson(Map json); } diff --git a/dart/lib/leancode_pipe/create_hub_connection.dart b/dart/lib/leancode_pipe/create_hub_connection.dart index d2e2b583..90c3397b 100644 --- a/dart/lib/leancode_pipe/create_hub_connection.dart +++ b/dart/lib/leancode_pipe/create_hub_connection.dart @@ -2,6 +2,15 @@ import 'package:leancode_pipe/leancode_pipe/authorized_pipe_http_client.dart'; import 'package:leancode_pipe/leancode_pipe/pipe_client.dart'; import 'package:signalr_core/signalr_core.dart'; +/// Creates a new SignalR hub connection configured for the pipe service. +/// +/// [pipeUrl] - The URL of the pipe service endpoint +/// [tokenFactory] - Optional factory function to provide authentication tokens +/// +/// The connection is configured with: +/// - WebSocket transport +/// - Automatic reconnection with exponential backoff +/// - Authorization via [AuthorizedPipeHttpClient] if [tokenFactory] is provided HubConnection createHubConnection({ required String pipeUrl, PipeTokenFactory? tokenFactory, diff --git a/dart/lib/leancode_pipe/pipe_client.dart b/dart/lib/leancode_pipe/pipe_client.dart index 1417163b..a20e98fe 100644 --- a/dart/lib/leancode_pipe/pipe_client.dart +++ b/dart/lib/leancode_pipe/pipe_client.dart @@ -15,10 +15,27 @@ import 'package:rxdart/rxdart.dart'; import 'package:signalr_core/signalr_core.dart'; import 'package:uuid/uuid.dart'; +/// A callback that handles pipe-related actions. typedef OnPipeAction = void Function(T value); + +/// A function that provides authentication tokens for the pipe connection. typedef PipeTokenFactory = Future Function(); +/// Client for the pipe service that manages real-time subscriptions. +/// +/// PipeClient handles: +/// - Connecting to the pipe service using SignalR +/// - Managing topic subscriptions +/// - Automatic reconnection and subscription recovery +/// - Authentication via tokens class PipeClient { + /// Creates a new pipe client connected to the specified URL. + /// + /// [pipeUrl] - The URL of the pipe service endpoint + /// [tokenFactory] - Function that provides authentication tokens + /// [onReconnected] - Called when connection is re-established + /// [onReconnecting] - Called when attempting to reconnect + /// [onClose] - Called when connection is closed factory PipeClient({ required String pipeUrl, required PipeTokenFactory tokenFactory, @@ -36,6 +53,7 @@ class PipeClient { onClose: onClose, ); + /// Creates a pipe client with a mock hub connection for testing. @visibleForTesting PipeClient.fromMock({ required HubConnection hubConnection, @@ -52,18 +70,27 @@ class PipeClient { static const _topicSubscriptionReconnectsCount = 3; static const _signalRRequestTimeoutDuration = Duration(seconds: 30); + /// Called when connection is re-established. final void Function()? onReconnected; + + /// Called when attempting to reconnect. final OnPipeAction? onReconnecting; + + /// Called when connection is closed. final OnPipeAction? onClose; static final _logger = Logger('PipeClient'); final _registeredTopicSubscriptions = []; + /// Current state of the pipe connection. PipeConnectionState get connectionState => PipeConnectionStateMapper.fromHubConnectionState( _hubConnection.state, ); + /// Connects to the pipe service. + /// + /// Throws [PipeConnectionException] if connection fails. Future connect() async { if (connectionState != PipeConnectionState.disconnected) { _logger.warning( @@ -100,6 +127,7 @@ class PipeClient { } } + /// Disconnects from the pipe service. Future disconnect() async { if (connectionState == PipeConnectionState.disconnected) { _logger.fine( @@ -116,6 +144,18 @@ class PipeClient { } } + /// Subscribes to notifications from the specified topic. + /// + /// [topic] - The topic to subscribe to + /// [onReconnect] - Optional callback when subscription reconnects + /// + /// Returns a [PipeSubscription] that can be used to receive notifications + /// and unsubscribe. + /// + /// Throws: + /// - [PipeConnectionException] if connection fails + /// - [PipeServerException] if server returns an error + /// - [PipeUnauthorizedException] if not authorized Future> subscribe( Topic topic, { void Function()? onReconnect, @@ -556,6 +596,9 @@ class PipeClient { } } + /// Releases all resources used by this client. + /// + /// Closes all subscriptions and disconnects from the server. Future dispose() async { await Future.wait(_registeredTopicSubscriptions.map((e) => e.close())); await _hubConnection.stop(); diff --git a/dart/lib/leancode_pipe/pipe_connection_state.dart b/dart/lib/leancode_pipe/pipe_connection_state.dart index be0d7ec3..df6b246c 100644 --- a/dart/lib/leancode_pipe/pipe_connection_state.dart +++ b/dart/lib/leancode_pipe/pipe_connection_state.dart @@ -13,5 +13,5 @@ enum PipeConnectionState { disconnecting, /// The pipe connection is reconnecting. - reconnecting + reconnecting, } diff --git a/dart/lib/leancode_pipe/pipe_connection_state_mapper.dart b/dart/lib/leancode_pipe/pipe_connection_state_mapper.dart index 13ee5075..6e883da6 100644 --- a/dart/lib/leancode_pipe/pipe_connection_state_mapper.dart +++ b/dart/lib/leancode_pipe/pipe_connection_state_mapper.dart @@ -1,7 +1,11 @@ import 'package:leancode_pipe/leancode_pipe/pipe_connection_state.dart'; import 'package:signalr_core/signalr_core.dart'; +/// Utility class for mapping SignalR connection states to [PipeConnectionState]. abstract final class PipeConnectionStateMapper { + /// Converts a SignalR [HubConnectionState] to a [PipeConnectionState]. + /// + /// If the input state is null, returns [PipeConnectionState.disconnected]. static PipeConnectionState fromHubConnectionState( HubConnectionState? state, ) => diff --git a/dart/lib/leancode_pipe/pipe_error.dart b/dart/lib/leancode_pipe/pipe_error.dart index 45dc81f9..357c2bf3 100644 --- a/dart/lib/leancode_pipe/pipe_error.dart +++ b/dart/lib/leancode_pipe/pipe_error.dart @@ -1,15 +1,23 @@ +/// Thrown when there is an error establishing or maintaining the pipe connection. class PipeConnectionException implements Exception { + /// Creates a new [PipeConnectionException]. const PipeConnectionException(); } +/// Thrown when the pipe fails to reconnect after a connection loss. class PipeReconnectException implements Exception { + /// Creates a new [PipeReconnectException]. const PipeReconnectException(); } +/// Thrown when the pipe connection is rejected due to invalid or missing authentication. class PipeUnauthorizedException implements Exception { + /// Creates a new [PipeUnauthorizedException]. const PipeUnauthorizedException(); } +/// Thrown when the pipe server encounters an internal error. class PipeServerException implements Exception { + /// Creates a new [PipeServerException]. const PipeServerException(); } diff --git a/dart/lib/leancode_pipe/pipe_subscription.dart b/dart/lib/leancode_pipe/pipe_subscription.dart index 09c1b8b9..d458ce72 100644 --- a/dart/lib/leancode_pipe/pipe_subscription.dart +++ b/dart/lib/leancode_pipe/pipe_subscription.dart @@ -1,8 +1,18 @@ import 'dart:async'; -/// A structure representing subscription to a [Topic<Notification>]. +import 'package:leancode_pipe/leancode_pipe.dart'; + +import 'package:leancode_pipe/leancode_pipe/contracts/topic.dart'; + +/// Represents an active subscription to a [Topic]. +/// +/// Provides a stream of notifications from the topic and allows unsubscribing. +/// Each subscription is tied to a specific topic and notification type. +/// +/// - ❗ Equality is based on references, do not override equality. class PipeSubscription extends StreamView { + /// Creates a new instance of [PipeSubscription]. PipeSubscription({ required Stream stream, required Future Function() unsubscribe, @@ -11,10 +21,10 @@ class PipeSubscription final Future Function() _unsubscribe; - /// Unsubscribe to the topic. + /// Unsubscribes from the topic, stopping the flow of notifications. + /// + /// After unsubscribing, no more notifications will be received from this subscription. Future unsubscribe() { return _unsubscribe(); } - - // Equality is based on references, do not override equality } diff --git a/dart/lib/leancode_pipe/topic_subscription.dart b/dart/lib/leancode_pipe/topic_subscription.dart index 8b6f9068..872d769e 100644 --- a/dart/lib/leancode_pipe/topic_subscription.dart +++ b/dart/lib/leancode_pipe/topic_subscription.dart @@ -6,27 +6,45 @@ import 'package:logging/logging.dart'; import 'package:meta/meta.dart'; import 'package:rxdart/rxdart.dart'; +/// A completer that will complete when the subscription is established. typedef TopicSubscribtionCompleter = Completer; +/// Manages the state and lifecycle of a subscription to a specific topic. +/// +/// Handles the subscription process, reconnection, and cleanup of topic subscriptions. +/// Each [TopicSubscription] maintains its own state and can have multiple registered +/// subscriptions listening to the same topic. class TopicSubscription { + /// Creates a new topic subscription with the given initial state. TopicSubscription( this.topic, this.subscriptionId, TopicSubscriptionState initState, ) : stateSubject = BehaviorSubject.seeded(initState); + /// Creates a new topic subscription with the given initial state. @visibleForTesting TopicSubscription.subject(this.topic, this.subscriptionId, this.stateSubject); + /// The topic this subscription is for. final Topic topic; + + /// Unique identifier for this subscription. final String subscriptionId; + + /// Stream of subscription state changes. final BehaviorSubject stateSubject; static final _logger = Logger('TopicSubscription'); bool _isClosed = false; + + /// Whether this subscription has been closed. bool get isClosed => _isClosed; + /// Closes this subscription and all its registered subscriptions. + /// + /// After closing, the subscription cannot be reused. Future close() async { final state = stateSubject.value; @@ -51,15 +69,21 @@ class TopicSubscription { 'TopicSubscription(topic: $topic, subscriptionId: $subscriptionId, stateSubject: $stateSubject)'; } +/// Base class for all possible states of a topic subscription. sealed class TopicSubscriptionState {} +/// State when a subscription is in the process of being established. final class TopicSubscriptionSubscribing implements TopicSubscriptionState { + /// Creates a new [TopicSubscriptionSubscribing] instance. TopicSubscriptionSubscribing() : completer = TopicSubscribtionCompleter(); + /// Completer that will complete when the subscription is established. final TopicSubscribtionCompleter completer; } +/// Represents a registered subscription that is actively receiving notifications. class RegisteredSubscription { + /// Creates a new [RegisteredSubscription] instance. RegisteredSubscription({ required this.id, required this.controller, @@ -70,27 +94,43 @@ class RegisteredSubscription { unsubscribe: unsubscribe, ); + /// Unique identifier for this subscription. final String id; + + /// Controller for the notification stream. final StreamController controller; + + /// The public subscription interface. final PipeSubscription pipeSubscription; + + /// Optional callback invoked when the subscription reconnects. final void Function()? onReconnect; } +/// State when a subscription is active and receiving notifications. final class TopicSubscriptionSubscribed implements TopicSubscriptionState { + /// Creates a new [TopicSubscriptionSubscribed] instance. TopicSubscriptionSubscribed({ required this.registeredSubscriptions, }); + /// List of active subscriptions for this topic. final List registeredSubscriptions; } +/// State when a subscription is in the process of being unsubscribed. final class TopicSubscriptionUnsubscribing implements TopicSubscriptionState { - TopicSubscriptionUnsubscribing({required this.subscribedState}) - : completer = TopicSubscribtionCompleter(); + /// Creates a new [TopicSubscriptionUnsubscribing] instance. + TopicSubscriptionUnsubscribing({ + required this.subscribedState, + }) : completer = TopicSubscribtionCompleter(); + /// Completer that will complete when the subscription is unsubscribed. final TopicSubscribtionCompleter completer; + + /// The subscribed state that was being unsubscribed. final TopicSubscriptionSubscribed subscribedState; /// Count of the subscriptions requests that are waiting for unsubscribe() call @@ -100,8 +140,11 @@ final class TopicSubscriptionUnsubscribing int newSubscriptionRequestsCount = 0; } +/// State when a subscription is in the process of reconnecting. final class TopicSubscriptionReconnecting implements TopicSubscriptionState { + /// Creates a new [TopicSubscriptionReconnecting] instance. TopicSubscriptionReconnecting() : completer = TopicSubscribtionCompleter(); + /// Completer that will complete when the subscription is reconnected. final TopicSubscribtionCompleter completer; }