diff --git a/lib/postgresql.dart b/lib/postgresql.dart index 333f1ac..3951aca 100644 --- a/lib/postgresql.dart +++ b/lib/postgresql.dart @@ -103,6 +103,9 @@ abstract class Connection { /// This will never throw an exception. void close(); + /// Listen to messages send via NOTIFY to [channel]. + Stream listen(String channel); + /// The server can send errors and notices, or the network can cause errors /// while the connection is not being used to make a query. These can be /// caught by listening to the messages stream. See [ClientMessage] and diff --git a/lib/src/mock/mock.dart b/lib/src/mock/mock.dart index 81dd313..c3c6e11 100644 --- a/lib/src/mock/mock.dart +++ b/lib/src/mock/mock.dart @@ -138,6 +138,10 @@ class MockConnection implements pg.Connection { Future runInTransaction(Future operation(), [pg.Isolation isolation]) => throw new UnimplementedError(); + @override + Stream listen(String channel) { + throw new UnimplementedError(); + } } diff --git a/lib/src/pool_impl.dart b/lib/src/pool_impl.dart index 1b2dcbc..8f89cb0 100644 --- a/lib/src/pool_impl.dart +++ b/lib/src/pool_impl.dart @@ -84,6 +84,11 @@ class ConnectionDecorator implements pg.Connection { @override String toString() => "$_pconn"; + + @override + Stream listen(String channel) { + throw new UnsupportedError('LISTEN isn\'t supported for pool connections'); + } } diff --git a/lib/src/postgresql_impl/connection.dart b/lib/src/postgresql_impl/connection.dart index 3799a5a..eec6f29 100644 --- a/lib/src/postgresql_impl/connection.dart +++ b/lib/src/postgresql_impl/connection.dart @@ -37,6 +37,7 @@ class ConnectionImpl implements Connection { bool _hasConnected = false; final Completer _connected = new Completer(); final Queue<_Query> _sendQueryQueue = new Queue<_Query>(); + final Map> _channelControllers = new Map>(); _Query _query; int _msgType; int _msgLength; @@ -399,6 +400,8 @@ class ConnectionImpl implements Connection { case _MSG_EMPTY_QUERY_REPONSE: assert(length == 0); break; case _MSG_COMMAND_COMPLETE: _readCommandComplete(msgType, length); break; + case _MSG_NOTIFICATION_RESPONSE: _readNotification(msgType, length); break; + default: throw new PostgresqlException('Unknown, or unimplemented message: ' '${UTF8.decode([msgType])}.', _getDebugName()); @@ -408,6 +411,20 @@ class ConnectionImpl implements Connection { throw new PostgresqlException('Lost message sync.', _getDebugName()); } + void _readNotification(int msgType, int length) { + assert(_buffer.bytesAvailable >= length); + + var pid = _buffer.readInt32(); // Unused for now + var channel = _buffer.readUtf8String(length); + var payload = _buffer.readUtf8String(length); + + var controller = _channelControllers[channel]; + + if (controller != null) { + controller.add(payload); + } + } + void _readErrorOrNoticeResponse(int msgType, int length) { assert(_buffer.bytesAvailable >= length); @@ -687,4 +704,24 @@ class ConnectionImpl implements Connection { new Future(() => _messages.close()); } + @override + Stream listen(String channel) { + var controller = _channelControllers[channel]; + + if (controller == null) { + controller = _channelControllers[channel] = new StreamController( + onListen: () async { + await execute('LISTEN $channel'); + }, + onCancel: () async { + if (state != ConnectionState.closed) { + // Only cleanup if the user hasn't already closed the connection + await execute('UNLISTEN $channel'); + } + _channelControllers.remove(channel); + }); + } + + return controller.stream; + } }