diff --git a/js-src/Channel.ts b/js-src/Channel.ts index e7d1db9..faf937c 100644 --- a/js-src/Channel.ts +++ b/js-src/Channel.ts @@ -117,7 +117,9 @@ export class Channel extends BaseChannel implements PresenceChannel { } here(callback: Function): Channel { - // TODO: implement + this.on('subscription_succeeded', (data) => { + callback(data) + }) return this } @@ -126,7 +128,9 @@ export class Channel extends BaseChannel implements PresenceChannel { * Listen for someone joining the channel. */ joining(callback: Function): Channel { - // TODO: implement + this.on('member_added', (data) => { + callback(data) + }) return this } @@ -135,7 +139,9 @@ export class Channel extends BaseChannel implements PresenceChannel { * Listen for someone leaving the channel. */ leaving(callback: Function): Channel { - // TODO: implement + this.on('member_removed', (data) => { + callback(data) + }) return this } diff --git a/src/ConnectionRepository.php b/src/ConnectionRepository.php index 5fe631b..c6b93f7 100644 --- a/src/ConnectionRepository.php +++ b/src/ConnectionRepository.php @@ -5,6 +5,7 @@ use Aws\ApiGatewayManagementApi\ApiGatewayManagementApiClient; use Aws\ApiGatewayManagementApi\Exception\ApiGatewayManagementApiException; use GuzzleHttp\Exception\ClientException; +use Symfony\Component\HttpFoundation\Response; class ConnectionRepository { @@ -28,8 +29,11 @@ public function sendMessage(string $connectionId, string $data): void 'Data' => $data, ]); } catch (ApiGatewayManagementApiException $e) { - // GoneException: The connection with the provided id no longer exists. - if ($e->getAwsErrorCode() === 'GoneException') { + // GoneException: The connection with the provided id no longer exists. + if ( + $e->getStatusCode() === Response::HTTP_GONE || + $e->getAwsErrorCode() === 'GoneException' + ) { $this->subscriptionRepository->clearConnection($connectionId); return; diff --git a/src/Handler.php b/src/Handler.php index 1517fb7..29eb7cf 100644 --- a/src/Handler.php +++ b/src/Handler.php @@ -9,6 +9,7 @@ use Bref\Event\Http\HttpResponse; use Illuminate\Support\Arr; use Illuminate\Support\Str; +use Symfony\Component\HttpFoundation\Response; use Throwable; class Handler extends WebsocketHandler @@ -42,6 +43,7 @@ public function handleWebsocket(WebsocketEvent $event, Context $context): HttpRe protected function handleDisconnect(WebsocketEvent $event, Context $context): void { + $this->sendPresenceDisconnectNotices($event); $this->subscriptionRepository->clearConnection($event->getConnectionId()); } @@ -115,12 +117,28 @@ protected function subscribe(WebsocketEvent $event, Context $context): void } } - $this->subscriptionRepository->subscribeToChannel($event->getConnectionId(), $channel); + if (Str::startsWith($channel, 'presence-')) { + $this->subscriptionRepository->subscribeToPresenceChannel( + $event->getConnectionId(), + $channelData, + $channel + ); + $data = $this->subscriptionRepository->getUserListForPresenceChannel($channel) + ->transform(function ($user) { + $user = json_decode($user, true); + return Arr::get($user, 'user_info', json_encode($user)); + }) + ->toArray(); + $this->sendPresenceAdd($event, $channel, Arr::get(json_decode($channelData, true), 'user_info')); + } else { + $this->subscriptionRepository->subscribeToChannel($event->getConnectionId(), $channel); + $data = []; + } $this->sendMessage($event, $context, [ 'event' => 'subscription_succeeded', 'channel' => $channel, - 'data' => [], + 'data' => $data, ]); } @@ -138,6 +156,18 @@ protected function unsubscribe(WebsocketEvent $event, Context $context): void ]); } + public function sendPresenceDisconnectNotices(WebsocketEvent $event): void + { + $channels = $this->subscriptionRepository->getChannelsSubscribedToByConnectionId($event->getConnectionId()); + $channels->filter(function ($info) { + return Str::startsWith(Arr::get($info, 'channel'), 'presence-'); + })->each(function ($info) use ($event) { + $channel = Arr::get($info, 'channel'); + $userData = json_decode(Arr::get($info, 'userData'), true); + $this->sendPresenceRemove($event, $channel, Arr::get($userData, 'user_info')); + }); + } + public function broadcastToChannel(WebsocketEvent $event, Context $context): void { $skipConnectionId = $event->getConnectionId(); @@ -158,6 +188,34 @@ public function broadcastToChannel(WebsocketEvent $event, Context $context): voi ->each(fn (string $connectionId) => $this->sendMessageToConnection($connectionId, $data)); } + public function sendPresenceAdd(WebsocketEvent $event, string $channel, array $data): void + { + $skipConnectionId = $event->getConnectionId(); + $eventBody = json_decode($event->getBody(), true); + $data = json_encode([ + 'event'=>'member_added', + 'channel'=>$channel, + 'data'=>$data + ]) ?: ''; + $this->subscriptionRepository->getConnectionIdsForChannel($channel) + ->reject(fn ($connectionId) => $connectionId === $skipConnectionId) + ->each(fn (string $connectionId) => $this->sendMessageToConnection($connectionId, $data)); + } + + public function sendPresenceRemove(WebsocketEvent $event, string $channel, array $data): void + { + $skipConnectionId = $event->getConnectionId(); + $eventBody = json_decode($event->getBody(), true); + $data = json_encode([ + 'event'=>'member_removed', + 'channel'=>$channel, + 'data'=>$data + ]) ?: ''; + $this->subscriptionRepository->getConnectionIdsForChannel($channel) + ->reject(fn ($connectionId) => $connectionId === $skipConnectionId) + ->each(fn (string $connectionId) => $this->sendMessageToConnection($connectionId, $data)); + } + public function sendMessage(WebsocketEvent $event, Context $context, array $data): void { $this->connectionRepository->sendMessage($event->getConnectionId(), json_encode($data, JSON_THROW_ON_ERROR)); @@ -168,7 +226,7 @@ protected function sendMessageToConnection(string $connectionId, string $data): try { $this->connectionRepository->sendMessage($connectionId, $data); } catch (ApiGatewayManagementApiException $exception) { - if ($exception->getAwsErrorCode() === 'GoneException') { + if ($exception->getStatusCode() === Response::HTTP_GONE) { $this->subscriptionRepository->clearConnection($connectionId); return; } diff --git a/src/SubscriptionRepository.php b/src/SubscriptionRepository.php index df2e1e0..c90a659 100644 --- a/src/SubscriptionRepository.php +++ b/src/SubscriptionRepository.php @@ -41,6 +41,43 @@ public function getConnectionIdsForChannel(string ...$channels): Collection ->unique(); } + public function getUserListForPresenceChannel(string ...$channels): Collection + { + $promises = collect($channels)->map(fn ($channel) => $this->dynamoDb->queryAsync([ + 'TableName' => $this->table, + 'IndexName' => 'lookup-by-channel', + 'KeyConditionExpression' => 'channel = :channel', + 'ExpressionAttributeValues' => [ + ':channel' => ['S' => $channel], + ], + ]))->toArray(); + + $responses = Utils::all($promises)->wait(); + + return collect($responses) + ->flatmap(fn (\Aws\Result $result): array => $result['Items']) + ->map(fn (array $item): string => Arr::get($item, 'userData.S', '')) + ->unique(); + } + + public function getChannelsSubscribedToByConnectionId(string $connectionId): Collection + { + $response = $this->dynamoDb->query([ + 'TableName' => $this->table, + 'KeyConditionExpression' => 'connectionId = :connectionId', + 'ExpressionAttributeValues' => [ + ':connectionId' => ['S' => $connectionId], + ], + ]); + return collect(Arr::get($response, 'Items', [])) + ->transform(function ($item) { + return [ + 'channel'=>Arr::get($item, 'channel.S'), + 'userData'=>Arr::get($item, 'userData.S'), + ]; + }); + } + public function clearConnection(string $connectionId): void { $response = $this->dynamoDb->query([ @@ -86,4 +123,16 @@ public function unsubscribeFromChannel(string $connectionId, string $channel): v ], ]); } + + public function subscribeToPresenceChannel(string $connectionId, string $userData, string $channel): void + { + $this->dynamoDb->putItem([ + 'TableName' => $this->table, + 'Item' => [ + 'connectionId' => ['S' => $connectionId], + 'userData' => ['S' => $userData], + 'channel' => ['S' => $channel], + ], + ]); + } } diff --git a/tests/HandlerTest.php b/tests/HandlerTest.php index 896a221..ca4874d 100644 --- a/tests/HandlerTest.php +++ b/tests/HandlerTest.php @@ -12,6 +12,7 @@ use GuzzleHttp\Psr7\Response; use Mockery\Mock; use Psr\Http\Message\RequestInterface; +use Symfony\Component\HttpFoundation\Response as SymfonyResponse; it('can subscribe to open channels', function () { app()->instance(SubscriptionRepository::class, Mockery::mock(SubscriptionRepository::class, function ($mock) { @@ -113,7 +114,85 @@ ], $context); }); -it('handles dropped connections', function () { +it('leaves presence channels', function () { + app()->instance(SubscriptionRepository::class, Mockery::mock(SubscriptionRepository::class, function ($mock) { + /** @var Mock $mock */ + $mock->shouldReceive('getChannelsSubscribedToByConnectionId')->withArgs(function (string $connectionId): bool { + return $connectionId === 'connection-id-1'; + })->once() + ->andReturn(collect([ + [ + 'channel'=>'presence-channel', + 'userData'=>json_encode(['user_info'=>['the user info']]), + ], + [ + 'channel'=>'other-channel', + ] + ])); + $mock->shouldReceive('getConnectionIdsForChannel')->withArgs(function (string $channel) { + return $channel === 'presence-channel'; + })->once() + ->andReturn(collect(['connection-id-1', 'connection-id-2'])); + $mock->shouldReceive('clearConnection')->withArgs(function (string $connectionId) { + return $connectionId === 'connection-id-1'; + })->once(); + })); + + app()->instance(ConnectionRepository::class, Mockery::mock(ConnectionRepository::class, function ($mock) { + /** @var Mock $mock */ + $mock->shouldReceive('sendMessage')->withArgs(function (string $connectionId, string $data): bool { + return $connectionId === 'connection-id-2' and $data === '{"event":"member_removed","channel":"presence-channel","data":["the user info"]}'; + })->once(); + })); + + /** @var Handler $handler */ + $handler = app(Handler::class); + + $context = new Context('request-id-1', 50_000, 'function-arn', 'trace-id-1'); + + $handler->handle([ + 'requestContext' => [ + 'routeKey' => 'my-test-route-key', + 'eventType' => 'DISCONNECT', + 'connectionId' => 'connection-id-1', + 'domainName' => 'test-domain', + 'apiId' => 'api-id-1', + 'stage' => 'stage-test', + ], + 'body' => json_encode(['event' => 'disconnect']), + ], $context); +}); + +it('handles dropped connections with HTTP_GONE', function () { + $mock = new MockHandler(); + + $mock->append(function (CommandInterface $cmd, RequestInterface $req) { + $mock = Mockery::mock(SymfonyResponse::class, function ($mock) { + $mock->shouldReceive('getStatusCode') + ->andReturn(SymfonyResponse::HTTP_GONE); + }); + return new ApiGatewayManagementApiException('', $cmd, [ + 'response' => $mock + ]); + }); + + /** @var SubscriptionRepository */ + $subscriptionRepository = Mockery::mock(SubscriptionRepository::class, function ($mock) { + /** @var Mock $mock */ + $mock->shouldReceive('clearConnection')->withArgs(function (string $connectionId): bool { + return $connectionId === 'dropped-connection-id-1234'; + })->once(); + }); + + $config = config('laravel-echo-api-gateway'); + + /** @var ConnectionRepository */ + $connectionRepository = new ConnectionRepository($subscriptionRepository, array_merge_recursive(['connection' => ['handler' => $mock]], $config)); + + $connectionRepository->sendMessage('dropped-connection-id-1234', 'test-message'); +}); + +it('handles dropped connections with GoneException', function () { $mock = new MockHandler(); $mock->append(function (CommandInterface $cmd, RequestInterface $req) {