Skip to content

Implement LISTEN support #78

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions lib/postgresql.dart
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,9 @@ abstract class Connection {
/// This will never throw an exception.
void close();

/// Listen to messages send via NOTIFY to [channel].
Stream<String> listen(String channel);

/// The server can send errors and notices, or the network can cause errors
/// while the connection is not being used to make a query. These can be
/// caught by listening to the messages stream. See [ClientMessage] and
Expand Down
4 changes: 4 additions & 0 deletions lib/src/mock/mock.dart
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,10 @@ class MockConnection implements pg.Connection {
Future runInTransaction(Future operation(), [pg.Isolation isolation])
=> throw new UnimplementedError();

@override
Stream<String> listen(String channel) {
throw new UnimplementedError();
}
}


Expand Down
5 changes: 5 additions & 0 deletions lib/src/pool_impl.dart
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,11 @@ class ConnectionDecorator implements pg.Connection {

@override
String toString() => "$_pconn";

@override
Stream<String> listen(String channel) {
throw new UnsupportedError('LISTEN isn\'t supported for pool connections');
}
}


Expand Down
37 changes: 37 additions & 0 deletions lib/src/postgresql_impl/connection.dart
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ class ConnectionImpl implements Connection {
bool _hasConnected = false;
final Completer _connected = new Completer();
final Queue<_Query> _sendQueryQueue = new Queue<_Query>();
final Map<String, StreamController<String>> _channelControllers = new Map<String, StreamController<String>>();
_Query _query;
int _msgType;
int _msgLength;
Expand Down Expand Up @@ -399,6 +400,8 @@ class ConnectionImpl implements Connection {
case _MSG_EMPTY_QUERY_REPONSE: assert(length == 0); break;
case _MSG_COMMAND_COMPLETE: _readCommandComplete(msgType, length); break;

case _MSG_NOTIFICATION_RESPONSE: _readNotification(msgType, length); break;

default:
throw new PostgresqlException('Unknown, or unimplemented message: '
'${UTF8.decode([msgType])}.', _getDebugName());
Expand All @@ -408,6 +411,20 @@ class ConnectionImpl implements Connection {
throw new PostgresqlException('Lost message sync.', _getDebugName());
}

void _readNotification(int msgType, int length) {
assert(_buffer.bytesAvailable >= length);

var pid = _buffer.readInt32(); // Unused for now
var channel = _buffer.readUtf8String(length);
var payload = _buffer.readUtf8String(length);

var controller = _channelControllers[channel];

if (controller != null) {
controller.add(payload);
}
}

void _readErrorOrNoticeResponse(int msgType, int length) {
assert(_buffer.bytesAvailable >= length);

Expand Down Expand Up @@ -687,4 +704,24 @@ class ConnectionImpl implements Connection {
new Future(() => _messages.close());
}

@override
Stream<String> listen(String channel) {
var controller = _channelControllers[channel];

if (controller == null) {
controller = _channelControllers[channel] = new StreamController<String>(
onListen: () async {
await execute('LISTEN $channel');
},
onCancel: () async {
if (state != ConnectionState.closed) {
// Only cleanup if the user hasn't already closed the connection
await execute('UNLISTEN $channel');
}
_channelControllers.remove(channel);
});
}

return controller.stream;
}
}