Skip to content

Commit

Permalink
Merge branch 'main' into feat/logger
Browse files Browse the repository at this point in the history
  • Loading branch information
Vinzent03 committed Sep 26, 2024
2 parents 5e6e0c9 + 4a8b641 commit df07430
Show file tree
Hide file tree
Showing 16 changed files with 299 additions and 68 deletions.
20 changes: 19 additions & 1 deletion infra/storage_client/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,14 @@ services:
FILE_STORAGE_BACKEND_PATH: /tmp/storage
ENABLE_IMAGE_TRANSFORMATION: "true"
IMGPROXY_URL: http://imgproxy:8080
DEBUG: "knex:*"

volumes:
- assets-volume:/tmp/storage
healthcheck:
test: ['CMD-SHELL', 'curl -f -LI http://localhost:5000/status']
interval: 2s

db:
build:
context: ./postgres
Expand All @@ -62,6 +66,20 @@ services:
timeout: 5s
retries: 5

dummy_data:
build:
context: ./postgres
depends_on:
storage:
condition: service_healthy
volumes:
- ./postgres:/sql
command:
- psql
- "postgresql://postgres:postgres@db:5432/postgres"
- -f
- /sql/dummy-data.sql

imgproxy:
image: darthsim/imgproxy
ports:
Expand All @@ -73,4 +91,4 @@ services:
- IMGPROXY_USE_ETAG=true
- IMGPROXY_ENABLE_WEBP_DETECTION=true
volumes:
assets-volume:
assets-volume:
3 changes: 1 addition & 2 deletions infra/storage_client/postgres/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ FROM supabase/postgres:0.13.0
COPY 00-initial-schema.sql /docker-entrypoint-initdb.d/00-initial-schema.sql
COPY auth-schema.sql /docker-entrypoint-initdb.d/01-auth-schema.sql
COPY storage-schema.sql /docker-entrypoint-initdb.d/02-storage-schema.sql
COPY dummy-data.sql /docker-entrypoint-initdb.d/03-dummy-data.sql

# Build time defaults
ARG build_POSTGRES_DB=postgres
Expand All @@ -17,4 +16,4 @@ ENV POSTGRES_USER=$build_POSTGRES_USER
ENV POSTGRES_PASSWORD=$build_POSTGRES_PASSWORD
ENV POSTGRES_PORT=$build_POSTGRES_PORT

EXPOSE 5432
EXPOSE 5432
18 changes: 7 additions & 11 deletions infra/storage_client/postgres/storage-schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ CREATE TABLE "storage"."objects" (
"last_accessed_at" timestamptz DEFAULT now(),
"metadata" jsonb,
CONSTRAINT "objects_bucketId_fkey" FOREIGN KEY ("bucket_id") REFERENCES "storage"."buckets"("id"),
CONSTRAINT "objects_owner_fkey" FOREIGN KEY ("owner") REFERENCES "auth"."users"("id"),
PRIMARY KEY ("id")
);
CREATE UNIQUE INDEX "bucketid_objname" ON "storage"."objects" USING BTREE ("bucket_id","name");
Expand Down Expand Up @@ -85,27 +84,24 @@ CREATE OR REPLACE FUNCTION storage.search(prefix text, bucketname text, limits i
)
LANGUAGE plpgsql
AS $function$
DECLARE
_bucketId text;
BEGIN
select buckets."id" from buckets where buckets.name=bucketname limit 1 into _bucketId;
return query
return query
with files_folders as (
select ((string_to_array(objects.name, '/'))[levels]) as folder
from objects
where objects.name ilike prefix || '%'
and bucket_id = _bucketId
and bucket_id = bucketname
GROUP by folder
limit limits
offset offsets
)
select files_folders.folder as name, objects.id, objects.updated_at, objects.created_at, objects.last_accessed_at, objects.metadata from files_folders
)
select files_folders.folder as name, objects.id, objects.updated_at, objects.created_at, objects.last_accessed_at, objects.metadata from files_folders
left join objects
on prefix || files_folders.folder = objects.name
where objects.id is null or objects.bucket_id=_bucketId;
on prefix || files_folders.folder = objects.name and objects.bucket_id=bucketname;
END
$function$;

GRANT ALL PRIVILEGES ON SCHEMA storage TO postgres;
GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA storage TO postgres;
GRANT ALL PRIVILEGES ON ALL SEQUENCES IN SCHEMA storage TO postgres;
GRANT ALL PRIVILEGES ON ALL SEQUENCES IN SCHEMA storage TO postgres;

4 changes: 2 additions & 2 deletions infra/storage_client/storage/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
FROM supabase/storage-api:v0.35.1
FROM supabase/storage-api:v1.8.2

RUN apk add curl --no-cache
RUN apk add curl --no-cache
3 changes: 2 additions & 1 deletion packages/realtime_client/lib/realtime_client.dart
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
export 'src/constants.dart' show RealtimeConstants, RealtimeLogLevel;
export 'src/constants.dart'
show RealtimeConstants, RealtimeLogLevel, SocketStates;
export 'src/realtime_channel.dart';
export 'src/realtime_client.dart';
export 'src/realtime_presence.dart';
Expand Down
4 changes: 2 additions & 2 deletions packages/realtime_client/lib/src/constants.dart
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ enum SocketStates {
/// Connection is live and connected
open,

/// Socket is closing.
closing,
/// Socket is closing by the user
disconnecting,

/// Socket being close not by the user. Realtime should attempt to reconnect.
closed,
Expand Down
23 changes: 20 additions & 3 deletions packages/realtime_client/lib/src/realtime_channel.dart
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ class RealtimeChannel {
socket.remove(this);
});

_onError((String? reason) {
_onError((reason) {
if (isLeaving || isClosed) {
return;
}
Expand Down Expand Up @@ -260,9 +260,9 @@ class RealtimeChannel {
}

/// Registers a callback that will be executed when the channel encounteres an error.
void _onError(void Function(String?) callback) {
void _onError(Function callback) {
onEvents(ChannelEvents.error.eventName(), ChannelFilter(),
(reason, [ref]) => callback(reason?.toString()));
(reason, [ref]) => callback(reason));
}

/// Sets up a listener on your Supabase database.
Expand Down Expand Up @@ -646,6 +646,23 @@ class RealtimeChannel {
joinPush.resend(timeout ?? _timeout);
}

/// Resends [joinPush] to tell the server we join this channel again and marks
/// the channel as [ChannelStates.joining].
///
/// Usually [rejoin] only happens when the channel timeouts or errors out.
/// When manually disconnecting, the channel is still marked as
/// [ChannelStates.joined]. Calling [RealtimeClient.leaveOpenTopic] will
/// unsubscribe itself, which causes issues when trying to rejoin. This method
/// therefore doesn't call [RealtimeClient.leaveOpenTopic].
@internal
void forceRejoin([Duration? timeout]) {
if (isLeaving) {
return;
}
_state = ChannelStates.joining;
joinPush.resend(timeout ?? _timeout);
}

void trigger(String type, [dynamic payload, String? ref]) {
final typeLower = type.toLowerCase();

Expand Down
76 changes: 53 additions & 23 deletions packages/realtime_client/lib/src/realtime_client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ class RealtimeCloseEvent {
required this.code,
required this.reason,
});

@override
String toString() {
return 'RealtimeCloseEvent(code: $code, reason: $reason)';
}
}

class RealtimeClient {
Expand Down Expand Up @@ -134,17 +139,17 @@ class RealtimeClient {
(String payload, Function(dynamic result) callback) =>
callback(json.decode(payload));
reconnectTimer = RetryTimer(
() {
disconnect();
connect();
() async {
await disconnect();
await connect();
},
this.reconnectAfterMs,
);
}

/// Connects the socket.
@internal
void connect() async {
Future<void> connect() async {
if (conn != null) {
return;
}
Expand All @@ -153,8 +158,20 @@ class RealtimeClient {
connState = SocketStates.connecting;
conn = transport(endPointURL, headers);

// handle connection errors
conn!.ready.catchError(_onConnError);
try {
await conn!.ready;
} catch (error) {
// Don't schedule a reconnect and emit error if connection has been
// closed by the user or [disconnect] waits for the connection to be
// ready before closing it.
if (connState != SocketStates.disconnected &&
connState != SocketStates.disconnecting) {
connState = SocketStates.closed;
_onConnError(error);
reconnectTimer.scheduleTimeout();
}
return;
}

connState = SocketStates.open;

Expand All @@ -166,7 +183,8 @@ class RealtimeClient {
onError: _onConnError,
onDone: () {
// communication has been closed
if (connState != SocketStates.disconnected) {
if (connState != SocketStates.disconnected &&
connState != SocketStates.disconnecting) {
connState = SocketStates.closed;
}
_onConnClose();
Expand All @@ -179,20 +197,32 @@ class RealtimeClient {
}

/// Disconnects the socket with status [code] and [reason] for the disconnect
void disconnect({int? code, String? reason}) {
Future<void> disconnect({int? code, String? reason}) async {
final conn = this.conn;
if (conn != null) {
connState = SocketStates.disconnected;
if (code != null) {
conn.sink.close(code, reason ?? '');
} else {
conn.sink.close();
final oldState = connState;
connState = SocketStates.disconnecting;

// Connection cannot be closed while it's still connecting. Wait for connection to
// be ready and then close it.
if (oldState == SocketStates.connecting) {
await conn.ready.catchError((_) {});
}

if (oldState == SocketStates.open ||
oldState == SocketStates.connecting) {
if (code != null) {
await conn.sink.close(code, reason ?? '');
} else {
await conn.sink.close();
}
connState = SocketStates.disconnected;
reconnectTimer.reset();
}
this.conn = null;

// remove open handles
if (heartbeatTimer != null) heartbeatTimer?.cancel();
reconnectTimer.reset();
}
}

Expand Down Expand Up @@ -251,8 +281,8 @@ class RealtimeClient {
return 'connecting';
case SocketStates.open:
return 'open';
case SocketStates.closing:
return 'closing';
case SocketStates.disconnecting:
return 'disconnecting';
case SocketStates.disconnected:
return 'disconnected';
case SocketStates.closed:
Expand All @@ -262,7 +292,7 @@ class RealtimeClient {
}

/// Retuns `true` is the connection is open.
bool get isConnected => connectionState == 'open';
bool get isConnected => connState == SocketStates.open;

/// Removes a subscription from the socket.
@internal
Expand Down Expand Up @@ -353,15 +383,15 @@ class RealtimeClient {

for (final channel in channels) {
if (token != null) {
channel.updateJoinPayload({'user_token': token});
channel.updateJoinPayload({'access_token': token});
}
if (channel.joinedOnce && channel.isJoined) {
channel.push(ChannelEvents.accessToken, {'access_token': token});
}
}
}

/// Unsubscribe from channels with the specified topic.
/// Unsubscribe from joined or joining channels with the specified topic.
@internal
void leaveOpenTopic(String topic) {
final dupChannel = channels.firstWhereOrNull(
Expand Down Expand Up @@ -399,7 +429,7 @@ class RealtimeClient {
/// SocketStates.disconnected: by user with socket.disconnect()
/// SocketStates.closed: NOT by user, should try to reconnect
if (connState == SocketStates.closed) {
_triggerChanError();
_triggerChanError(event);
reconnectTimer.scheduleTimeout();
}
if (heartbeatTimer != null) heartbeatTimer!.cancel();
Expand All @@ -410,15 +440,15 @@ class RealtimeClient {

void _onConnError(dynamic error) {
log('transport', error.toString());
_triggerChanError();
_triggerChanError(error);
for (final callback in stateChangeCallbacks['error']!) {
callback(error);
}
}

void _triggerChanError() {
void _triggerChanError([dynamic error]) {
for (final channel in channels) {
channel.trigger(ChannelEvents.error.eventName());
channel.trigger(ChannelEvents.error.eventName(), error);
}
}

Expand Down
8 changes: 5 additions & 3 deletions packages/realtime_client/test/mock_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,9 @@ void main() {
final subscribeCallback =
expectAsync2((RealtimeSubscribeStatus event, error) {
if (event == RealtimeSubscribeStatus.channelError) {
expect(error, isNull);
expect(error, isA<RealtimeCloseEvent>());
error as RealtimeCloseEvent;
expect(error.reason, "heartbeat timeout");
} else {
expect(event, RealtimeSubscribeStatus.closed);
}
Expand All @@ -285,8 +287,8 @@ void main() {

channel.subscribe(subscribeCallback);

await client.conn!.sink
.close(Constants.wsCloseNormal, "heartbeat timeout");
await Future.delayed(Duration(milliseconds: 200));
await webSocket?.close(Constants.wsCloseNormal, "heartbeat timeout");
});
});

Expand Down
Loading

0 comments on commit df07430

Please sign in to comment.