Skip to content

Commit

Permalink
fix: Better stream and access token management (#1019)
Browse files Browse the repository at this point in the history
* fix: use correct join payload

* fix: keep auth token valid while in background when using realtime

* fix: correct supabase_flutter init log

* refactor: add toString method to RealtimeCloseEvent

* fix: reload data from postgrest after new realtime connection

* test: fix mock test by increasing delay

* fix: load postgrest before realtime conn and close realtime on error

* test: restore delay and expect access_token instead of user_token

* test: fix typo

* refactor: small rename

* fix: wait for conn being ready and re-add error to _triggerChanError

* fix: don't stringify errors and fix tests

* test: close conn from server

* fix: disconnect when in background and await connecting to be ready

* fix: rejoin channels in edge case

* docs: improve method comments
  • Loading branch information
Vinzent03 authored Sep 26, 2024
1 parent e095c14 commit 4a8b641
Show file tree
Hide file tree
Showing 10 changed files with 208 additions and 50 deletions.
3 changes: 2 additions & 1 deletion packages/realtime_client/lib/realtime_client.dart
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
export 'src/constants.dart' show RealtimeConstants, RealtimeLogLevel;
export 'src/constants.dart'
show RealtimeConstants, RealtimeLogLevel, SocketStates;
export 'src/realtime_channel.dart';
export 'src/realtime_client.dart';
export 'src/realtime_presence.dart';
Expand Down
4 changes: 2 additions & 2 deletions packages/realtime_client/lib/src/constants.dart
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ enum SocketStates {
/// Connection is live and connected
open,

/// Socket is closing.
closing,
/// Socket is closing by the user
disconnecting,

/// Socket being close not by the user. Realtime should attempt to reconnect.
closed,
Expand Down
23 changes: 20 additions & 3 deletions packages/realtime_client/lib/src/realtime_channel.dart
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ class RealtimeChannel {
socket.remove(this);
});

_onError((String? reason) {
_onError((reason) {
if (isLeaving || isClosed) {
return;
}
Expand Down Expand Up @@ -260,9 +260,9 @@ class RealtimeChannel {
}

/// Registers a callback that will be executed when the channel encounteres an error.
void _onError(void Function(String?) callback) {
void _onError(Function callback) {
onEvents(ChannelEvents.error.eventName(), ChannelFilter(),
(reason, [ref]) => callback(reason?.toString()));
(reason, [ref]) => callback(reason));
}

/// Sets up a listener on your Supabase database.
Expand Down Expand Up @@ -646,6 +646,23 @@ class RealtimeChannel {
joinPush.resend(timeout ?? _timeout);
}

/// Resends [joinPush] to tell the server we join this channel again and marks
/// the channel as [ChannelStates.joining].
///
/// Usually [rejoin] only happens when the channel timeouts or errors out.
/// When manually disconnecting, the channel is still marked as
/// [ChannelStates.joined]. Calling [RealtimeClient.leaveOpenTopic] will
/// unsubscribe itself, which causes issues when trying to rejoin. This method
/// therefore doesn't call [RealtimeClient.leaveOpenTopic].
@internal
void forceRejoin([Duration? timeout]) {
if (isLeaving) {
return;
}
_state = ChannelStates.joining;
joinPush.resend(timeout ?? _timeout);
}

void trigger(String type, [dynamic payload, String? ref]) {
final typeLower = type.toLowerCase();

Expand Down
76 changes: 53 additions & 23 deletions packages/realtime_client/lib/src/realtime_client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ class RealtimeCloseEvent {
required this.code,
required this.reason,
});

@override
String toString() {
return 'RealtimeCloseEvent(code: $code, reason: $reason)';
}
}

class RealtimeClient {
Expand Down Expand Up @@ -134,17 +139,17 @@ class RealtimeClient {
(String payload, Function(dynamic result) callback) =>
callback(json.decode(payload));
reconnectTimer = RetryTimer(
() {
disconnect();
connect();
() async {
await disconnect();
await connect();
},
this.reconnectAfterMs,
);
}

/// Connects the socket.
@internal
void connect() async {
Future<void> connect() async {
if (conn != null) {
return;
}
Expand All @@ -153,8 +158,20 @@ class RealtimeClient {
connState = SocketStates.connecting;
conn = transport(endPointURL, headers);

// handle connection errors
conn!.ready.catchError(_onConnError);
try {
await conn!.ready;
} catch (error) {
// Don't schedule a reconnect and emit error if connection has been
// closed by the user or [disconnect] waits for the connection to be
// ready before closing it.
if (connState != SocketStates.disconnected &&
connState != SocketStates.disconnecting) {
connState = SocketStates.closed;
_onConnError(error);
reconnectTimer.scheduleTimeout();
}
return;
}

connState = SocketStates.open;

Expand All @@ -166,7 +183,8 @@ class RealtimeClient {
onError: _onConnError,
onDone: () {
// communication has been closed
if (connState != SocketStates.disconnected) {
if (connState != SocketStates.disconnected &&
connState != SocketStates.disconnecting) {
connState = SocketStates.closed;
}
_onConnClose();
Expand All @@ -179,20 +197,32 @@ class RealtimeClient {
}

/// Disconnects the socket with status [code] and [reason] for the disconnect
void disconnect({int? code, String? reason}) {
Future<void> disconnect({int? code, String? reason}) async {
final conn = this.conn;
if (conn != null) {
connState = SocketStates.disconnected;
if (code != null) {
conn.sink.close(code, reason ?? '');
} else {
conn.sink.close();
final oldState = connState;
connState = SocketStates.disconnecting;

// Connection cannot be closed while it's still connecting. Wait for connection to
// be ready and then close it.
if (oldState == SocketStates.connecting) {
await conn.ready.catchError((_) {});
}

if (oldState == SocketStates.open ||
oldState == SocketStates.connecting) {
if (code != null) {
await conn.sink.close(code, reason ?? '');
} else {
await conn.sink.close();
}
connState = SocketStates.disconnected;
reconnectTimer.reset();
}
this.conn = null;

// remove open handles
if (heartbeatTimer != null) heartbeatTimer?.cancel();
reconnectTimer.reset();
}
}

Expand Down Expand Up @@ -251,8 +281,8 @@ class RealtimeClient {
return 'connecting';
case SocketStates.open:
return 'open';
case SocketStates.closing:
return 'closing';
case SocketStates.disconnecting:
return 'disconnecting';
case SocketStates.disconnected:
return 'disconnected';
case SocketStates.closed:
Expand All @@ -262,7 +292,7 @@ class RealtimeClient {
}

/// Retuns `true` is the connection is open.
bool get isConnected => connectionState == 'open';
bool get isConnected => connState == SocketStates.open;

/// Removes a subscription from the socket.
@internal
Expand Down Expand Up @@ -353,15 +383,15 @@ class RealtimeClient {

for (final channel in channels) {
if (token != null) {
channel.updateJoinPayload({'user_token': token});
channel.updateJoinPayload({'access_token': token});
}
if (channel.joinedOnce && channel.isJoined) {
channel.push(ChannelEvents.accessToken, {'access_token': token});
}
}
}

/// Unsubscribe from channels with the specified topic.
/// Unsubscribe from joined or joining channels with the specified topic.
@internal
void leaveOpenTopic(String topic) {
final dupChannel = channels.firstWhereOrNull(
Expand Down Expand Up @@ -399,7 +429,7 @@ class RealtimeClient {
/// SocketStates.disconnected: by user with socket.disconnect()
/// SocketStates.closed: NOT by user, should try to reconnect
if (connState == SocketStates.closed) {
_triggerChanError();
_triggerChanError(event);
reconnectTimer.scheduleTimeout();
}
if (heartbeatTimer != null) heartbeatTimer!.cancel();
Expand All @@ -410,15 +440,15 @@ class RealtimeClient {

void _onConnError(dynamic error) {
log('transport', error.toString());
_triggerChanError();
_triggerChanError(error);
for (final callback in stateChangeCallbacks['error']!) {
callback(error);
}
}

void _triggerChanError() {
void _triggerChanError([dynamic error]) {
for (final channel in channels) {
channel.trigger(ChannelEvents.error.eventName());
channel.trigger(ChannelEvents.error.eventName(), error);
}
}

Expand Down
8 changes: 5 additions & 3 deletions packages/realtime_client/test/mock_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,9 @@ void main() {
final subscribeCallback =
expectAsync2((RealtimeSubscribeStatus event, error) {
if (event == RealtimeSubscribeStatus.channelError) {
expect(error, isNull);
expect(error, isA<RealtimeCloseEvent>());
error as RealtimeCloseEvent;
expect(error.reason, "heartbeat timeout");
} else {
expect(event, RealtimeSubscribeStatus.closed);
}
Expand All @@ -285,8 +287,8 @@ void main() {

channel.subscribe(subscribeCallback);

await client.conn!.sink
.close(Constants.wsCloseNormal, "heartbeat timeout");
await Future.delayed(Duration(milliseconds: 200));
await webSocket?.close(Constants.wsCloseNormal, "heartbeat timeout");
});
});

Expand Down
12 changes: 8 additions & 4 deletions packages/realtime_client/test/socket_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ void main() {
});

socket.connect();
await Future.delayed(const Duration(milliseconds: 200));
expect(opens, 1);

socket.sendHeartbeat();
Expand Down Expand Up @@ -214,8 +215,8 @@ void main() {
});

test('removes existing connection', () async {
socket.connect();
socket.disconnect();
await socket.connect();
await socket.disconnect();

expect(socket.conn, null);
});
Expand All @@ -229,7 +230,7 @@ void main() {
expect(closes, 1);
});

test('calls connection close callback', () {
test('calls connection close callback', () async {
final mockedSocketChannel = MockIOWebSocketChannel();
final mockedSocket = RealtimeClient(
socketEndpoint,
Expand All @@ -247,7 +248,10 @@ void main() {
const tReason = 'reason';

mockedSocket.connect();
mockedSocket.connState = SocketStates.open;
await Future.delayed(const Duration(milliseconds: 200));
mockedSocket.disconnect(code: tCode, reason: tReason);
await Future.delayed(const Duration(milliseconds: 200));

verify(
() => mockedSink.close(
Expand Down Expand Up @@ -423,7 +427,7 @@ void main() {
});

group('setAuth', () {
final updateJoinPayload = {'user_token': 'token123'};
final updateJoinPayload = {'access_token': 'token123'};
final pushPayload = {'access_token': 'token123'};

test(
Expand Down
9 changes: 7 additions & 2 deletions packages/supabase/lib/src/supabase_query_builder.dart
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,16 @@ class SupabaseQueryBuilder extends PostgrestQueryBuilder {
url: Uri.parse(url),
);

/// Returns real-time data from your table as a `Stream`.
/// Combines the current state of your table from PostgREST with changes from the realtime server to return real-time data from your table as a [Stream].
///
/// Realtime is disabled by default for new tables. You can turn it on by [managing replication](https://supabase.com/docs/guides/realtime/extensions/postgres-changes#replication-setup).
///
/// Pass the list of primary key column names to [primaryKey], which will be used to updating and deleting the proper records internally as the library receives real-time updates.
/// Pass the list of primary key column names to [primaryKey], which will be used to update and delete the proper records internally as the stream receives real-time updates.
///
/// It handles the lifecycle of the realtime connection and automatically refetches data from PostgREST when needed.
///
/// Make sure to provide `onError` and `onDone` callbacks to [Stream.listen] to handle errors and completion of the stream.
/// The stream gets closed when the realtime connection is closed.
///
/// ```dart
/// supabase.from('chats').stream(primaryKey: ['id']).listen(_onChatsReceived);
Expand Down
Loading

0 comments on commit 4a8b641

Please sign in to comment.