Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add comments to the Dart package #69

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dart/analysis_options.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
include: package:leancode_lint/analysis_options.yaml
include: package:leancode_lint/analysis_options_package.yaml

analyzer:
exclude:
Expand Down
6 changes: 6 additions & 0 deletions dart/lib/leancode_pipe/authorized_pipe_http_client.dart
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
import 'package:http/http.dart' as http;
import 'package:leancode_pipe/leancode_pipe/pipe_client.dart';

/// An HTTP client that automatically adds authorization headers to requests.
///
/// Uses a `tokenFactory` to obtain Bearer tokens which are added to each request's
/// Authorization header. If no `tokenFactory` is provided, requests are sent without
/// authorization.
class AuthorizedPipeHttpClient extends http.BaseClient {
/// Creates a new [AuthorizedPipeHttpClient].
AuthorizedPipeHttpClient({
http.Client? client,
required PipeTokenFactory? tokenFactory,
Expand Down
45 changes: 45 additions & 0 deletions dart/lib/leancode_pipe/contracts/contracts.dart
Original file line number Diff line number Diff line change
Expand Up @@ -6,46 +6,71 @@ const _pipeContractsSerializable = JsonSerializable(
fieldRename: FieldRename.pascal,
);

/// Result of a subscription operation returned by the server.
@_pipeContractsSerializable
class SubscriptionResult {
/// Creates a new [SubscriptionResult] instance.
const SubscriptionResult({
required this.subscriptionId,
required this.status,
required this.type,
});

/// Creates a new [SubscriptionResult] instance from a JSON object.
factory SubscriptionResult.fromJson(Map<String, dynamic> json) =>
_$SubscriptionResultFromJson(json);

/// Unique identifier for the subscription.
final String subscriptionId;

/// Status of the subscription operation.
final SubscriptionStatus status;

/// Type of operation (subscribe/unsubscribe).
final OperationType type;

/// Converts the subscription result to a JSON object.
Map<String, dynamic> toJson() => _$SubscriptionResultToJson(this);
}

/// Status codes returned by the server for subscription operations.
enum SubscriptionStatus {
/// Operation completed successfully.
@JsonValue(0)
success,

/// Client is not authorized to perform the operation.
@JsonValue(1)
unauthorized,

/// Request was malformed.
@JsonValue(2)
malformed,

/// Request was invalid.
@JsonValue(3)
invalid,

/// Server encountered an internal error.
@JsonValue(4)
internalServerError,
}

/// Types of subscription operations.
enum OperationType {
/// Subscribe to a topic.
@JsonValue(0)
subscribe,

/// Unsubscribe from a topic.
@JsonValue(1)
unsubscribe,
}

/// Envelope containing a notification sent by the server.
@_pipeContractsSerializable
class NotificationEnvelope {
/// Creates a new [NotificationEnvelope] instance.
const NotificationEnvelope({
required this.id,
required this.topicType,
Expand All @@ -54,15 +79,26 @@ class NotificationEnvelope {
required this.notification,
});

/// Creates a new [NotificationEnvelope] instance from a JSON object.
factory NotificationEnvelope.fromJson(Map<String, dynamic> json) =>
_$NotificationEnvelopeFromJson(json);

/// Unique identifier for this notification.
final String id;

/// Full name of the topic type.
final String topicType;

/// Type of the notification.
final String notificationType;

/// Topic data in JSON format.
final Map<String, dynamic> topic;

/// The notification payload.
final Object notification;

/// Converts the notification envelope to a JSON object.
Map<String, dynamic> toJson() => _$NotificationEnvelopeToJson(this);

@override
Expand All @@ -71,21 +107,30 @@ class NotificationEnvelope {
}
}

/// Envelope containing subscription request data sent to the server.
@_pipeContractsSerializable
class SubscriptionEnvelope {
/// Creates a new [SubscriptionEnvelope] instance.
const SubscriptionEnvelope({
required this.id,
required this.topic,
required this.topicType,
});

/// Creates a new [SubscriptionEnvelope] instance from a JSON object.
factory SubscriptionEnvelope.fromJson(Map<String, dynamic> json) =>
_$SubscriptionEnvelopeFromJson(json);

/// Unique identifier for this subscription request.
final String id;

/// Topic data in JSON format.
final Map<String, dynamic> topic;

/// Full name of the topic type.
final String topicType;

/// Converts the subscription envelope to a JSON object.
Map<String, dynamic> toJson() => _$SubscriptionEnvelopeToJson(this);

@override
Expand Down
8 changes: 8 additions & 0 deletions dart/lib/leancode_pipe/contracts/topic.dart
Original file line number Diff line number Diff line change
@@ -1,9 +1,17 @@
/// Represents a topic that can be subscribed to for receiving notifications.
///
/// A topic is a source of typed notifications that clients can subscribe to.
/// Each topic has a specific [Notification] type that it emits.
abstract interface class Topic<Notification extends Object> {
/// Attempts to cast a JSON notification to the correct [Notification] type.
Notification? castNotification(String tag, dynamic json);

/// Returns the fully qualified name of this topic type.
String getFullName();

/// Converts this topic to a JSON representation.
Map<String, dynamic> toJson();

/// Creates a new instance of this topic from JSON data.
Topic<Notification> fromJson(Map<String, dynamic> json);
}
9 changes: 9 additions & 0 deletions dart/lib/leancode_pipe/create_hub_connection.dart
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,15 @@ import 'package:leancode_pipe/leancode_pipe/authorized_pipe_http_client.dart';
import 'package:leancode_pipe/leancode_pipe/pipe_client.dart';
import 'package:signalr_core/signalr_core.dart';

/// Creates a new SignalR hub connection configured for the pipe service.
///
/// [pipeUrl] - The URL of the pipe service endpoint
/// [tokenFactory] - Optional factory function to provide authentication tokens
///
/// The connection is configured with:
/// - WebSocket transport
/// - Automatic reconnection with exponential backoff
/// - Authorization via [AuthorizedPipeHttpClient] if [tokenFactory] is provided
HubConnection createHubConnection({
required String pipeUrl,
PipeTokenFactory? tokenFactory,
Expand Down
43 changes: 43 additions & 0 deletions dart/lib/leancode_pipe/pipe_client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,27 @@ import 'package:rxdart/rxdart.dart';
import 'package:signalr_core/signalr_core.dart';
import 'package:uuid/uuid.dart';

/// A callback that handles pipe-related actions.
typedef OnPipeAction<T> = void Function(T value);

/// A function that provides authentication tokens for the pipe connection.
typedef PipeTokenFactory = Future<String> Function();

/// Client for the pipe service that manages real-time subscriptions.
///
/// PipeClient handles:
/// - Connecting to the pipe service using SignalR
/// - Managing topic subscriptions
/// - Automatic reconnection and subscription recovery
/// - Authentication via tokens
class PipeClient {
/// Creates a new pipe client connected to the specified URL.
///
/// [pipeUrl] - The URL of the pipe service endpoint
/// [tokenFactory] - Function that provides authentication tokens
/// [onReconnected] - Called when connection is re-established
/// [onReconnecting] - Called when attempting to reconnect
/// [onClose] - Called when connection is closed
factory PipeClient({
required String pipeUrl,
required PipeTokenFactory tokenFactory,
Expand All @@ -36,6 +53,7 @@ class PipeClient {
onClose: onClose,
);

/// Creates a pipe client with a mock hub connection for testing.
@visibleForTesting
PipeClient.fromMock({
required HubConnection hubConnection,
Expand All @@ -52,18 +70,27 @@ class PipeClient {
static const _topicSubscriptionReconnectsCount = 3;
static const _signalRRequestTimeoutDuration = Duration(seconds: 30);

/// Called when connection is re-established.
final void Function()? onReconnected;

/// Called when attempting to reconnect.
final OnPipeAction<Exception?>? onReconnecting;

/// Called when connection is closed.
final OnPipeAction<Exception?>? onClose;

static final _logger = Logger('PipeClient');
final _registeredTopicSubscriptions = <TopicSubscription>[];

/// Current state of the pipe connection.
PipeConnectionState get connectionState =>
PipeConnectionStateMapper.fromHubConnectionState(
_hubConnection.state,
);

/// Connects to the pipe service.
///
/// Throws [PipeConnectionException] if connection fails.
Future<void> connect() async {
if (connectionState != PipeConnectionState.disconnected) {
_logger.warning(
Expand Down Expand Up @@ -100,6 +127,7 @@ class PipeClient {
}
}

/// Disconnects from the pipe service.
Future<void> disconnect() async {
if (connectionState == PipeConnectionState.disconnected) {
_logger.fine(
Expand All @@ -116,6 +144,18 @@ class PipeClient {
}
}

/// Subscribes to notifications from the specified topic.
///
/// [topic] - The topic to subscribe to
/// [onReconnect] - Optional callback when subscription reconnects
///
/// Returns a [PipeSubscription] that can be used to receive notifications
/// and unsubscribe.
///
/// Throws:
/// - [PipeConnectionException] if connection fails
/// - [PipeServerException] if server returns an error
/// - [PipeUnauthorizedException] if not authorized
Future<PipeSubscription<N>> subscribe<N extends Object>(
Topic<N> topic, {
void Function()? onReconnect,
Expand Down Expand Up @@ -556,6 +596,9 @@ class PipeClient {
}
}

/// Releases all resources used by this client.
///
/// Closes all subscriptions and disconnects from the server.
Future<void> dispose() async {
await Future.wait(_registeredTopicSubscriptions.map((e) => e.close()));
await _hubConnection.stop();
Expand Down
2 changes: 1 addition & 1 deletion dart/lib/leancode_pipe/pipe_connection_state.dart
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,5 @@ enum PipeConnectionState {
disconnecting,

/// The pipe connection is reconnecting.
reconnecting
reconnecting,
}
4 changes: 4 additions & 0 deletions dart/lib/leancode_pipe/pipe_connection_state_mapper.dart
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
import 'package:leancode_pipe/leancode_pipe/pipe_connection_state.dart';
import 'package:signalr_core/signalr_core.dart';

/// Utility class for mapping SignalR connection states to [PipeConnectionState].
abstract final class PipeConnectionStateMapper {
/// Converts a SignalR [HubConnectionState] to a [PipeConnectionState].
///
/// If the input state is null, returns [PipeConnectionState.disconnected].
static PipeConnectionState fromHubConnectionState(
HubConnectionState? state,
) =>
Expand Down
8 changes: 8 additions & 0 deletions dart/lib/leancode_pipe/pipe_error.dart
Original file line number Diff line number Diff line change
@@ -1,15 +1,23 @@
/// Thrown when there is an error establishing or maintaining the pipe connection.
class PipeConnectionException implements Exception {
/// Creates a new [PipeConnectionException].
const PipeConnectionException();
}

/// Thrown when the pipe fails to reconnect after a connection loss.
class PipeReconnectException implements Exception {
/// Creates a new [PipeReconnectException].
const PipeReconnectException();
}

/// Thrown when the pipe connection is rejected due to invalid or missing authentication.
class PipeUnauthorizedException implements Exception {
/// Creates a new [PipeUnauthorizedException].
const PipeUnauthorizedException();
}

/// Thrown when the pipe server encounters an internal error.
class PipeServerException implements Exception {
/// Creates a new [PipeServerException].
const PipeServerException();
}
18 changes: 14 additions & 4 deletions dart/lib/leancode_pipe/pipe_subscription.dart
Original file line number Diff line number Diff line change
@@ -1,8 +1,18 @@
import 'dart:async';

/// A structure representing subscription to a [Topic&lt;Notification&gt;].
import 'package:leancode_pipe/leancode_pipe.dart';

import 'package:leancode_pipe/leancode_pipe/contracts/topic.dart';

/// Represents an active subscription to a [Topic].
///
/// Provides a stream of notifications from the topic and allows unsubscribing.
/// Each subscription is tied to a specific topic and notification type.
///
/// - ❗ Equality is based on references, do not override equality.
class PipeSubscription<Notification extends Object>
extends StreamView<Notification> {
/// Creates a new instance of [PipeSubscription].
PipeSubscription({
required Stream<Notification> stream,
required Future<void> Function() unsubscribe,
Expand All @@ -11,10 +21,10 @@ class PipeSubscription<Notification extends Object>

final Future<void> Function() _unsubscribe;

/// Unsubscribe to the topic.
/// Unsubscribes from the topic, stopping the flow of notifications.
///
/// After unsubscribing, no more notifications will be received from this subscription.
Future<void> unsubscribe() {
return _unsubscribe();
}

// Equality is based on references, do not override equality
}
Loading