diff --git a/CHANGELOG.md b/CHANGELOG.md index 25e702b..cb60875 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,20 @@ # Changelog +## 3.5.7 + +- `PgSessionBase`: + - Optimize `_prepare` by passing the already captured `StackTrace` to `_sendAndWaitForQuery`. + +- Added `ResultStreamTrace`: + - A `ResultStream` that adds a `callerTrace` parameter to the `listen` method. + +- `_BoundStatement` now implements `ResultStreamTrace` rather than `ResultStream`. + +- `_PgResultStreamSubscription`: added an optional `callerTrace` parameter to its constructors. + +- `PreparedStatement`: + - Optimized `run` and `_closePendingPortals` by reducing `StackTrace` captures. + ## 3.5.6 - Accept `null` values as part of the binary List encodings. diff --git a/lib/postgres.dart b/lib/postgres.dart index 77ee763..c3087a9 100644 --- a/lib/postgres.dart +++ b/lib/postgres.dart @@ -5,6 +5,7 @@ import 'dart:io'; import 'package:collection/collection.dart'; import 'package:meta/meta.dart'; import 'package:postgres/src/v3/connection_info.dart'; +import 'package:stack_trace/stack_trace.dart'; import 'package:stream_channel/stream_channel.dart'; import 'src/replication.dart'; @@ -245,6 +246,18 @@ abstract class ResultStream implements Stream { }); } +/// A [ResultStream] that adds a `callerTrace` parameter to the `listen` method. +abstract class ResultStreamTrace implements ResultStream { + @override + ResultStreamSubscription listen( + void Function(ResultRow event)? onData, { + Function? onError, + void Function()? onDone, + bool? cancelOnError, + Trace? callerTrace, + }); +} + abstract class ResultStreamSubscription implements StreamSubscription { Future get affectedRows; diff --git a/lib/src/v3/connection.dart b/lib/src/v3/connection.dart index 3ea77fb..6e85479 100644 --- a/lib/src/v3/connection.dart +++ b/lib/src/v3/connection.dart @@ -83,8 +83,9 @@ abstract class _PgSessionBase implements Session { /// Sends a message to the server and waits for a response [T], gracefully /// handling error messages that might come in instead. - Future _sendAndWaitForQuery(ClientMessage send) { - final trace = StackTrace.current; + Future _sendAndWaitForQuery(ClientMessage send, + {StackTrace? stackTrace}) { + final trace = stackTrace ?? StackTrace.current; return _withResource(() { _connection._channel.sink @@ -184,7 +185,8 @@ abstract class _PgSessionBase implements Session { } Future<_PreparedStatement> _prepare(Object query) async { - final trace = Trace.current(); + final stackTrace = StackTrace.current; + final trace = Trace.from(stackTrace); final conn = _connection; final name = 's/${conn._statementCounter++}'; final description = InternalQueryDescription.wrap( @@ -192,11 +194,14 @@ abstract class _PgSessionBase implements Session { typeRegistry: _connection._settings.typeRegistry, ); - await _sendAndWaitForQuery(ParseMessage( - description.transformedSql, - statementName: name, - typeOids: description.parameterTypes?.map((e) => e?.oid).toList(), - )); + await _sendAndWaitForQuery( + ParseMessage( + description.transformedSql, + statementName: name, + typeOids: description.parameterTypes?.map((e) => e?.oid).toList(), + ), + stackTrace: stackTrace, + ); return _PreparedStatement(description, name, this, trace); } @@ -661,7 +666,7 @@ class _PreparedStatement extends Statement { _PreparedStatement(this._description, this._name, this._session, this._trace); @override - ResultStream bind(Object? parameters) { + ResultStreamTrace bind(Object? parameters) { return _BoundStatement( this, _description.bindParameters( @@ -675,10 +680,12 @@ class _PreparedStatement extends Statement { Object? parameters, { Duration? timeout, }) async { + final stackTrace = StackTrace.current; + final trace = Trace.from(stackTrace); _session._connection._queryCount++; timeout ??= _session._settings.queryTimeout; final items = []; - final subscription = bind(parameters).listen(items.add); + final subscription = bind(parameters).listen(items.add, callerTrace: trace); try { return await (subscription as _PgResultStreamSubscription)._waitForResult( items: items, @@ -686,7 +693,7 @@ class _PreparedStatement extends Statement { ); } finally { await subscription.cancel(); - await _closePendingPortals(); + await _closePendingPortals(stackTrace: stackTrace); } } @@ -705,17 +712,18 @@ class _PreparedStatement extends Statement { _portalsToClose!.add(portalName); } - Future _closePendingPortals() async { + Future _closePendingPortals({StackTrace? stackTrace}) async { final list = _portalsToClose; while (list != null && list.isNotEmpty) { final portalName = list.removeFirst(); await _session._sendAndWaitForQuery( - CloseMessage.portal(portalName)); + CloseMessage.portal(portalName), + stackTrace: stackTrace); } } } -class _BoundStatement extends Stream implements ResultStream { +class _BoundStatement extends Stream implements ResultStreamTrace { final _PreparedStatement statement; final List parameters; @@ -723,13 +731,17 @@ class _BoundStatement extends Stream implements ResultStream { @override ResultStreamSubscription listen(void Function(ResultRow event)? onData, - {Function? onError, void Function()? onDone, bool? cancelOnError}) { + {Function? onError, + void Function()? onDone, + bool? cancelOnError, + Trace? callerTrace}) { final controller = StreamController(); // ignore: cancel_subscriptions final subscription = controller.stream.listen(onData, onError: onError, onDone: onDone, cancelOnError: cancelOnError); - return _PgResultStreamSubscription(this, controller, subscription); + return _PgResultStreamSubscription(this, controller, subscription, + callerTrace: callerTrace); } } @@ -756,12 +768,13 @@ class _PgResultStreamSubscription final Trace _callerTrace; _PgResultStreamSubscription( - _BoundStatement statement, this._controller, this._source) + _BoundStatement statement, this._controller, this._source, + {Trace? callerTrace}) : session = statement.statement._session, ignoreRows = false, _boundStatement = statement, _parentTrace = statement.statement._trace, - _callerTrace = Trace.current() { + _callerTrace = callerTrace ?? Trace.current() { _scheduleStatement(() async { connection._pending = this; @@ -798,9 +811,10 @@ class _PgResultStreamSubscription this._controller, this._source, this.ignoreRows, { + Trace? callerTrace, void Function()? cleanup, }) : _parentTrace = null, - _callerTrace = Trace.current() { + _callerTrace = callerTrace ?? Trace.current() { _scheduleStatement(() async { connection._pending = this; diff --git a/pubspec.yaml b/pubspec.yaml index 8559026..c7a91e1 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -1,6 +1,6 @@ name: postgres description: PostgreSQL database driver. Supports binary protocol, connection pooling and statement reuse. -version: 3.5.6 +version: 3.5.7 homepage: https://github.com/isoos/postgresql-dart topics: - sql