Skip to content

v3.5.7 - Optimize StackTrace capture #432

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,20 @@
# Changelog

## 3.5.7

- `PgSessionBase`:
- Optimize `_prepare` by passing the already captured `StackTrace` to `_sendAndWaitForQuery`.

- Added `ResultStreamTrace`:
- A `ResultStream` that adds a `callerTrace` parameter to the `listen` method.

- `_BoundStatement` now implements `ResultStreamTrace` rather than `ResultStream`.

- `_PgResultStreamSubscription`: added an optional `callerTrace` parameter to its constructors.

- `PreparedStatement`:
- Optimized `run` and `_closePendingPortals` by reducing `StackTrace` captures.

## 3.5.6

- Accept `null` values as part of the binary List encodings.
Expand Down
13 changes: 13 additions & 0 deletions lib/postgres.dart
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import 'dart:io';
import 'package:collection/collection.dart';
import 'package:meta/meta.dart';
import 'package:postgres/src/v3/connection_info.dart';
import 'package:stack_trace/stack_trace.dart';
import 'package:stream_channel/stream_channel.dart';

import 'src/replication.dart';
Expand Down Expand Up @@ -245,6 +246,18 @@ abstract class ResultStream implements Stream<ResultRow> {
});
}

/// A [ResultStream] that adds a `callerTrace` parameter to the `listen` method.
abstract class ResultStreamTrace implements ResultStream {
@override
ResultStreamSubscription listen(
void Function(ResultRow event)? onData, {
Function? onError,
void Function()? onDone,
bool? cancelOnError,
Trace? callerTrace,
});
}

abstract class ResultStreamSubscription
implements StreamSubscription<ResultRow> {
Future<int> get affectedRows;
Expand Down
52 changes: 33 additions & 19 deletions lib/src/v3/connection.dart
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,9 @@ abstract class _PgSessionBase implements Session {

/// Sends a message to the server and waits for a response [T], gracefully
/// handling error messages that might come in instead.
Future<T> _sendAndWaitForQuery<T extends ServerMessage>(ClientMessage send) {
final trace = StackTrace.current;
Future<T> _sendAndWaitForQuery<T extends ServerMessage>(ClientMessage send,
{StackTrace? stackTrace}) {
final trace = stackTrace ?? StackTrace.current;

return _withResource(() {
_connection._channel.sink
Expand Down Expand Up @@ -184,19 +185,23 @@ abstract class _PgSessionBase implements Session {
}

Future<_PreparedStatement> _prepare(Object query) async {
final trace = Trace.current();
final stackTrace = StackTrace.current;
final trace = Trace.from(stackTrace);
final conn = _connection;
final name = 's/${conn._statementCounter++}';
final description = InternalQueryDescription.wrap(
query,
typeRegistry: _connection._settings.typeRegistry,
);

await _sendAndWaitForQuery<ParseCompleteMessage>(ParseMessage(
description.transformedSql,
statementName: name,
typeOids: description.parameterTypes?.map((e) => e?.oid).toList(),
));
await _sendAndWaitForQuery<ParseCompleteMessage>(
ParseMessage(
description.transformedSql,
statementName: name,
typeOids: description.parameterTypes?.map((e) => e?.oid).toList(),
),
stackTrace: stackTrace,
);

return _PreparedStatement(description, name, this, trace);
}
Expand Down Expand Up @@ -661,7 +666,7 @@ class _PreparedStatement extends Statement {
_PreparedStatement(this._description, this._name, this._session, this._trace);

@override
ResultStream bind(Object? parameters) {
ResultStreamTrace bind(Object? parameters) {
return _BoundStatement(
this,
_description.bindParameters(
Expand All @@ -675,18 +680,20 @@ class _PreparedStatement extends Statement {
Object? parameters, {
Duration? timeout,
}) async {
final stackTrace = StackTrace.current;
final trace = Trace.from(stackTrace);
_session._connection._queryCount++;
timeout ??= _session._settings.queryTimeout;
final items = <ResultRow>[];
final subscription = bind(parameters).listen(items.add);
final subscription = bind(parameters).listen(items.add, callerTrace: trace);
try {
return await (subscription as _PgResultStreamSubscription)._waitForResult(
items: items,
timeout: timeout,
);
} finally {
await subscription.cancel();
await _closePendingPortals();
await _closePendingPortals(stackTrace: stackTrace);
}
}

Expand All @@ -705,31 +712,36 @@ class _PreparedStatement extends Statement {
_portalsToClose!.add(portalName);
}

Future<void> _closePendingPortals() async {
Future<void> _closePendingPortals({StackTrace? stackTrace}) async {
final list = _portalsToClose;
while (list != null && list.isNotEmpty) {
final portalName = list.removeFirst();
await _session._sendAndWaitForQuery<CloseCompleteMessage>(
CloseMessage.portal(portalName));
CloseMessage.portal(portalName),
stackTrace: stackTrace);
}
}
}

class _BoundStatement extends Stream<ResultRow> implements ResultStream {
class _BoundStatement extends Stream<ResultRow> implements ResultStreamTrace {
final _PreparedStatement statement;
final List<TypedValue> parameters;

_BoundStatement(this.statement, this.parameters);

@override
ResultStreamSubscription listen(void Function(ResultRow event)? onData,
{Function? onError, void Function()? onDone, bool? cancelOnError}) {
{Function? onError,
void Function()? onDone,
bool? cancelOnError,
Trace? callerTrace}) {
final controller = StreamController<ResultRow>();

// ignore: cancel_subscriptions
final subscription = controller.stream.listen(onData,
onError: onError, onDone: onDone, cancelOnError: cancelOnError);
return _PgResultStreamSubscription(this, controller, subscription);
return _PgResultStreamSubscription(this, controller, subscription,
callerTrace: callerTrace);
}
}

Expand All @@ -756,12 +768,13 @@ class _PgResultStreamSubscription
final Trace _callerTrace;

_PgResultStreamSubscription(
_BoundStatement statement, this._controller, this._source)
_BoundStatement statement, this._controller, this._source,
{Trace? callerTrace})
: session = statement.statement._session,
ignoreRows = false,
_boundStatement = statement,
_parentTrace = statement.statement._trace,
_callerTrace = Trace.current() {
_callerTrace = callerTrace ?? Trace.current() {
_scheduleStatement(() async {
connection._pending = this;

Expand Down Expand Up @@ -798,9 +811,10 @@ class _PgResultStreamSubscription
this._controller,
this._source,
this.ignoreRows, {
Trace? callerTrace,
void Function()? cleanup,
}) : _parentTrace = null,
_callerTrace = Trace.current() {
_callerTrace = callerTrace ?? Trace.current() {
_scheduleStatement(() async {
connection._pending = this;

Expand Down
2 changes: 1 addition & 1 deletion pubspec.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
name: postgres
description: PostgreSQL database driver. Supports binary protocol, connection pooling and statement reuse.
version: 3.5.6
version: 3.5.7
homepage: https://github.com/isoos/postgresql-dart
topics:
- sql
Expand Down