From 9a5e6110127644b4ab3489fb2f3bf13d34a20432 Mon Sep 17 00:00:00 2001 From: Warren James Date: Mon, 8 Jul 2024 12:34:44 -0400 Subject: [PATCH] refactor(NODE-6230): executeOperation to use iterative retry mechanism (#4170) --- src/operations/command.ts | 2 +- src/operations/execute_operation.ts | 260 ++++++++++++++-------------- src/operations/operation.ts | 4 +- 3 files changed, 135 insertions(+), 131 deletions(-) diff --git a/src/operations/command.ts b/src/operations/command.ts index 033ec8aa94..94ccc6ceaf 100644 --- a/src/operations/command.ts +++ b/src/operations/command.ts @@ -104,7 +104,7 @@ export abstract class CommandOperation extends AbstractOperation { if (this.hasAspect(Aspect.EXPLAINABLE)) { return this.explain == null; } - return true; + return super.canRetryWrite; } public async executeCommand( diff --git a/src/operations/execute_operation.ts b/src/operations/execute_operation.ts index 829ea2ce6e..317c970e83 100644 --- a/src/operations/execute_operation.ts +++ b/src/operations/execute_operation.ts @@ -24,7 +24,7 @@ import { } from '../sdam/server_selection'; import type { Topology } from '../sdam/topology'; import type { ClientSession } from '../sessions'; -import { squashError, supportsRetryableWrites } from '../utils'; +import { supportsRetryableWrites } from '../utils'; import { AbstractOperation, Aspect } from './operation'; const MMAPv1_RETRY_WRITES_ERROR_CODE = MONGODB_ERROR_CODES.IllegalOperation; @@ -45,10 +45,9 @@ type ResultTypeFromOperation = TOperation extends AbstractOperation< * not provided. * * The expectation is that this function: - * - Connects the MongoClient if it has not already been connected + * - Connects the MongoClient if it has not already been connected, see {@link autoConnect} * - Creates a session if none is provided and cleans up the session it creates - * - Selects a server based on readPreference or various factors - * - Retries an operation if it fails for certain errors, see {@link retryOperation} + * - Tries an operation and retries under certain conditions, see {@link tryOperation} * * @typeParam T - The operation's type * @typeParam TResult - The type of the operation's result, calculated from T @@ -65,23 +64,7 @@ export async function executeOperation< throw new MongoRuntimeError('This method requires a valid operation instance'); } - if (client.topology == null) { - // Auto connect on operation - if (client.s.hasBeenClosed) { - throw new MongoNotConnectedError('Client must be connected before running operations'); - } - client.s.options[Symbol.for('@@mdb.skipPingOnConnect')] = true; - try { - await client.connect(); - } finally { - delete client.s.options[Symbol.for('@@mdb.skipPingOnConnect')]; - } - } - - const { topology } = client; - if (topology == null) { - throw new MongoRuntimeError('client.connect did not create a topology but also did not throw'); - } + const topology = await autoConnect(client); // The driver sessions spec mandates that we implicitly create sessions for operations // that are not explicitly provided with a session. @@ -108,7 +91,6 @@ export async function executeOperation< const inTransaction = !!session?.inTransaction(); const hasReadAspect = operation.hasAspect(Aspect.READ_OPERATION); - const hasWriteAspect = operation.hasAspect(Aspect.WRITE_OPERATION); if ( inTransaction && @@ -124,6 +106,73 @@ export async function executeOperation< session.unpin(); } + try { + return await tryOperation(operation, { + topology, + session, + readPreference + }); + } finally { + if (session?.owner != null && session.owner === owner) { + await session.endSession(); + } + } +} + +/** + * Connects a client if it has not yet been connected + * @internal + */ +async function autoConnect(client: MongoClient): Promise { + if (client.topology == null) { + if (client.s.hasBeenClosed) { + throw new MongoNotConnectedError('Client must be connected before running operations'); + } + client.s.options[Symbol.for('@@mdb.skipPingOnConnect')] = true; + try { + await client.connect(); + if (client.topology == null) { + throw new MongoRuntimeError( + 'client.connect did not create a topology but also did not throw' + ); + } + return client.topology; + } finally { + delete client.s.options[Symbol.for('@@mdb.skipPingOnConnect')]; + } + } + return client.topology; +} + +/** @internal */ +type RetryOptions = { + session: ClientSession | undefined; + readPreference: ReadPreference; + topology: Topology; +}; + +/** + * Executes an operation and retries as appropriate + * @internal + * + * @remarks + * Implements behaviour described in [Retryable Reads](https://github.com/mongodb/specifications/blob/master/source/retryable-reads/retryable-reads.md) and [Retryable + * Writes](https://github.com/mongodb/specifications/blob/master/source/retryable-writes/retryable-writes.md) specification + * + * This function: + * - performs initial server selection + * - attempts to execute an operation + * - retries the operation if it meets the criteria for a retryable read or a retryable write + * + * @typeParam T - The operation's type + * @typeParam TResult - The type of the operation's result, calculated from T + * + * @param operation - The operation to execute + * */ +async function tryOperation< + T extends AbstractOperation, + TResult = ResultTypeFromOperation +>(operation: T, { topology, session, readPreference }: RetryOptions): Promise { let selector: ReadPreference | ServerSelector; if (operation.hasAspect(Aspect.MUST_SELECT_SAME_SERVER)) { @@ -139,30 +188,14 @@ export async function executeOperation< selector = readPreference; } - const server = await topology.selectServer(selector, { + let server = await topology.selectServer(selector, { session, operationName: operation.commandName }); - if (session == null) { - // No session also means it is not retryable, early exit - return await operation.execute(server, undefined); - } - - if (!operation.hasAspect(Aspect.RETRYABLE)) { - // non-retryable operation, early exit - try { - return await operation.execute(server, session); - } finally { - if (session?.owner != null && session.owner === owner) { - try { - await session.endSession(); - } catch (error) { - squashError(error); - } - } - } - } + const hasReadAspect = operation.hasAspect(Aspect.READ_OPERATION); + const hasWriteAspect = operation.hasAspect(Aspect.WRITE_OPERATION); + const inTransaction = session?.inTransaction() ?? false; const willRetryRead = topology.s.options.retryReads && !inTransaction && operation.canRetryRead; @@ -172,105 +205,76 @@ export async function executeOperation< supportsRetryableWrites(server) && operation.canRetryWrite; - const willRetry = (hasReadAspect && willRetryRead) || (hasWriteAspect && willRetryWrite); + const willRetry = + operation.hasAspect(Aspect.RETRYABLE) && + session != null && + ((hasReadAspect && willRetryRead) || (hasWriteAspect && willRetryWrite)); - if (hasWriteAspect && willRetryWrite) { + if (hasWriteAspect && willRetryWrite && session != null) { operation.options.willRetryWrite = true; session.incrementTransactionNumber(); } - try { - return await operation.execute(server, session); - } catch (operationError) { - if (willRetry && operationError instanceof MongoError) { - return await retryOperation(operation, operationError, { - session, - topology, - selector, - previousServer: server.description - }); - } - throw operationError; - } finally { - if (session?.owner != null && session.owner === owner) { - try { - await session.endSession(); - } catch (error) { - squashError(error); - } - } - } -} + // TODO(NODE-6231): implement infinite retry within CSOT timeout here + const maxTries = willRetry ? 2 : 1; + let previousOperationError: MongoError | undefined; + let previousServer: ServerDescription | undefined; -/** @internal */ -type RetryOptions = { - session: ClientSession; - topology: Topology; - selector: ReadPreference | ServerSelector; - previousServer: ServerDescription; -}; + // TODO(NODE-6231): implement infinite retry within CSOT timeout here + for (let tries = 0; tries < maxTries; tries++) { + if (previousOperationError) { + if (hasWriteAspect && previousOperationError.code === MMAPv1_RETRY_WRITES_ERROR_CODE) { + throw new MongoServerError({ + message: MMAPv1_RETRY_WRITES_ERROR_MESSAGE, + errmsg: MMAPv1_RETRY_WRITES_ERROR_MESSAGE, + originalError: previousOperationError + }); + } -async function retryOperation< - T extends AbstractOperation, - TResult = ResultTypeFromOperation ->( - operation: T, - originalError: MongoError, - { session, topology, selector, previousServer }: RetryOptions -): Promise { - const isWriteOperation = operation.hasAspect(Aspect.WRITE_OPERATION); - const isReadOperation = operation.hasAspect(Aspect.READ_OPERATION); + if (hasWriteAspect && !isRetryableWriteError(previousOperationError)) + throw previousOperationError; - if (isWriteOperation && originalError.code === MMAPv1_RETRY_WRITES_ERROR_CODE) { - throw new MongoServerError({ - message: MMAPv1_RETRY_WRITES_ERROR_MESSAGE, - errmsg: MMAPv1_RETRY_WRITES_ERROR_MESSAGE, - originalError - }); - } + if (hasReadAspect && !isRetryableReadError(previousOperationError)) + throw previousOperationError; - if (isWriteOperation && !isRetryableWriteError(originalError)) { - throw originalError; - } - - if (isReadOperation && !isRetryableReadError(originalError)) { - throw originalError; - } + if ( + previousOperationError instanceof MongoNetworkError && + operation.hasAspect(Aspect.CURSOR_CREATING) && + session != null && + session.isPinned && + !session.inTransaction() + ) { + session.unpin({ force: true, forceClear: true }); + } - if ( - originalError instanceof MongoNetworkError && - session.isPinned && - !session.inTransaction() && - operation.hasAspect(Aspect.CURSOR_CREATING) - ) { - // If we have a cursor and the initial command fails with a network error, - // we can retry it on another connection. So we need to check it back in, clear the - // pool for the service id, and retry again. - session.unpin({ force: true, forceClear: true }); - } + server = await topology.selectServer(selector, { + session, + operationName: operation.commandName, + previousServer + }); - // select a new server, and attempt to retry the operation - const server = await topology.selectServer(selector, { - session, - operationName: operation.commandName, - previousServer - }); + if (hasWriteAspect && !supportsRetryableWrites(server)) { + throw new MongoUnexpectedServerResponseError( + 'Selected server does not support retryable writes' + ); + } + } - if (isWriteOperation && !supportsRetryableWrites(server)) { - throw new MongoUnexpectedServerResponseError( - 'Selected server does not support retryable writes' - ); - } + try { + return await operation.execute(server, session); + } catch (operationError) { + if (!(operationError instanceof MongoError)) throw operationError; - try { - return await operation.execute(server, session); - } catch (retryError) { - if ( - retryError instanceof MongoError && - retryError.hasErrorLabel(MongoErrorLabel.NoWritesPerformed) - ) { - throw originalError; + if ( + previousOperationError != null && + operationError.hasErrorLabel(MongoErrorLabel.NoWritesPerformed) + ) { + throw previousOperationError; + } + previousServer = server.description; + previousOperationError = operationError; } - throw retryError; } + + throw previousOperationError; } diff --git a/src/operations/operation.ts b/src/operations/operation.ts index e71baa44a9..4bedc502a7 100644 --- a/src/operations/operation.ts +++ b/src/operations/operation.ts @@ -102,11 +102,11 @@ export abstract class AbstractOperation { } get canRetryRead(): boolean { - return true; + return this.hasAspect(Aspect.RETRYABLE) && this.hasAspect(Aspect.READ_OPERATION); } get canRetryWrite(): boolean { - return true; + return this.hasAspect(Aspect.RETRYABLE) && this.hasAspect(Aspect.WRITE_OPERATION); } }