From b93de395c06d8affe17b2a1fa0da15261a57cabd Mon Sep 17 00:00:00 2001 From: Igor Abdrakhimov Date: Fri, 15 Mar 2024 16:17:28 -0700 Subject: [PATCH 01/11] Minor fixes and comments --- .../aws/eventstreamrpc/EventStreamClient.h | 13 +++++-- eventstream_rpc/source/EventStreamClient.cpp | 38 ++++++++++++------- 2 files changed, 35 insertions(+), 16 deletions(-) diff --git a/eventstream_rpc/include/aws/eventstreamrpc/EventStreamClient.h b/eventstream_rpc/include/aws/eventstreamrpc/EventStreamClient.h index 34240a7ee..305a0512d 100644 --- a/eventstream_rpc/include/aws/eventstreamrpc/EventStreamClient.h +++ b/eventstream_rpc/include/aws/eventstreamrpc/EventStreamClient.h @@ -117,7 +117,7 @@ namespace Aws { public: MessageAmendment(const MessageAmendment &lhs); - MessageAmendment(MessageAmendment &&rhs); + MessageAmendment(MessageAmendment &&rhs) noexcept; MessageAmendment &operator=(const MessageAmendment &lhs); ~MessageAmendment() noexcept; explicit MessageAmendment(Crt::Allocator *allocator = Crt::g_allocator) noexcept; @@ -292,7 +292,7 @@ namespace Aws * the TERMINATE_STREAM flag, or when the connection shuts down. */ virtual void OnContinuationClosed() = 0; - virtual ~ClientContinuationHandler() noexcept; + virtual ~ClientContinuationHandler() noexcept = default; private: friend class ClientContinuation; @@ -307,6 +307,13 @@ namespace Aws ClientContinuationHandler &continuationHandler, Crt::Allocator *allocator) noexcept; ~ClientContinuation() noexcept; + + ClientContinuation(const ClientContinuation &other) = default; + ClientContinuation(ClientContinuation &&other) noexcept = default; + + ClientContinuation &operator=(const ClientContinuation &other) = delete; + ClientContinuation &operator=(ClientContinuation &&other) noexcept = delete; + std::future Activate( const Crt::String &operation, const Crt::List &headers, @@ -354,7 +361,7 @@ namespace Aws class AWS_EVENTSTREAMRPC_API OperationError : public AbstractShapeBase { public: - explicit OperationError() noexcept; + explicit OperationError() noexcept = default; static void s_customDeleter(OperationError *shape) noexcept; virtual void SerializeToJsonObject(Crt::JsonObject &payloadObject) const override; virtual Crt::Optional GetMessage() noexcept = 0; diff --git a/eventstream_rpc/source/EventStreamClient.cpp b/eventstream_rpc/source/EventStreamClient.cpp index ad5a9159b..251dadf4e 100644 --- a/eventstream_rpc/source/EventStreamClient.cpp +++ b/eventstream_rpc/source/EventStreamClient.cpp @@ -79,6 +79,11 @@ namespace Aws MessageAmendment &MessageAmendment::operator=(const MessageAmendment &lhs) { + if (this == &lhs) + { + return *this; + } + m_headers = lhs.m_headers; if (lhs.m_payload.has_value()) { @@ -90,8 +95,8 @@ namespace Aws return *this; } - MessageAmendment::MessageAmendment(MessageAmendment &&rhs) - : m_headers(std::move(rhs.m_headers)), m_payload(rhs.m_payload), m_allocator(rhs.m_allocator) + MessageAmendment::MessageAmendment(MessageAmendment &&rhs) noexcept + : m_headers(std::move(rhs.m_headers)), m_payload(std::move(rhs.m_payload)), m_allocator(rhs.m_allocator) { rhs.m_allocator = nullptr; rhs.m_payload = Crt::Optional(); @@ -149,6 +154,15 @@ namespace Aws } }; + // FIXME This assignment operator is completely broken. + // 1. rhs' internal state can be changed during copying members one by one, which can lead to this being + // inconsistent/corrupted. + // 2. if rhs is in the CONNECTED state, then a pointer to it is passed to the underlying libraries and is used + // in callbacks. + // 3. If this is connected, what will happen to its state? + // + // Option 1 (preferable): This operator should be marked as deleted. It'll be a BREAKING CHANGE. + // Option 2: As an ugly alternative, throw runtime error if `Connect` method was called on rhs or this. ClientConnection &ClientConnection::operator=(ClientConnection &&rhs) noexcept { m_allocator = std::move(rhs.m_allocator); @@ -174,6 +188,7 @@ namespace Aws return *this; } + // FIXME Mark as deleted. See comment to the assignment operator. ClientConnection::ClientConnection(ClientConnection &&rhs) noexcept : m_lifecycleHandler(rhs.m_lifecycleHandler) { *this = std::move(rhs); @@ -182,6 +197,7 @@ namespace Aws ClientConnection::ClientConnection(Crt::Allocator *allocator) noexcept : m_allocator(allocator), m_underlyingConnection(nullptr), m_clientState(DISCONNECTED), m_lifecycleHandler(nullptr), m_connectMessageAmender(nullptr), m_connectionWillSetup(false), + m_onConnectCalled(false), m_closeReason{EVENT_STREAM_RPC_UNINITIALIZED, 0}, m_onConnectRequestCallback(nullptr) { } @@ -793,8 +809,7 @@ namespace Aws for (size_t i = 0; i < messageArgs->headers_count; ++i) { - pingHeaders.emplace_back( - EventStreamHeader(messageArgs->headers[i], thisConnection->m_allocator)); + pingHeaders.emplace_back(messageArgs->headers[i], thisConnection->m_allocator); } if (messageArgs->payload) @@ -886,8 +901,6 @@ namespace Aws } } - ClientContinuationHandler::~ClientContinuationHandler() noexcept {} - void ClientContinuation::s_onContinuationMessage( struct aws_event_stream_rpc_client_continuation_token *continuationToken, const struct aws_event_stream_rpc_message_args *messageArgs, @@ -901,6 +914,8 @@ namespace Aws Crt::List continuationMessageHeaders; for (size_t i = 0; i < messageArgs->headers_count; ++i) { + // FIXME Considering that below we check if thisContinuation is alive, this line looks super suspicious. + // Keep allocator in callbackData? continuationMessageHeaders.emplace_back( EventStreamHeader(messageArgs->headers[i], thisContinuation->m_allocator)); } @@ -1101,8 +1116,6 @@ namespace Aws { } - OperationError::OperationError() noexcept {} - void OperationError::SerializeToJsonObject(Crt::JsonObject &payloadObject) const { (void)payloadObject; } AbstractShapeBase::AbstractShapeBase() noexcept : m_allocator(nullptr) {} @@ -1115,7 +1128,7 @@ namespace Aws const OperationModelContext &operationModelContext, Crt::Allocator *allocator) noexcept : m_operationModelContext(operationModelContext), m_asyncLaunchMode(std::launch::deferred), - m_messageCount(0), m_allocator(allocator), m_streamHandler(streamHandler), + m_messageCount(0), m_allocator(allocator), m_streamHandler(std::move(streamHandler)), m_clientContinuation(connection.NewStream(*this)), m_expectedCloses(0), m_streamClosedCalled(false) { } @@ -1354,7 +1367,7 @@ namespace Aws return true; } - void StreamResponseHandler::OnStreamEvent(Crt::ScopedResource response) {} + void StreamResponseHandler::OnStreamEvent(Crt::ScopedResource /* response */) {} void StreamResponseHandler::OnStreamClosed() {} @@ -1485,10 +1498,9 @@ namespace Aws } Crt::List headers; - headers.emplace_back(EventStreamHeader( - Crt::String(CONTENT_TYPE_HEADER), Crt::String(CONTENT_TYPE_APPLICATION_JSON), m_allocator)); headers.emplace_back( - EventStreamHeader(Crt::String(SERVICE_MODEL_TYPE_HEADER), GetModelName(), m_allocator)); + Crt::String(CONTENT_TYPE_HEADER), Crt::String(CONTENT_TYPE_APPLICATION_JSON), m_allocator); + headers.emplace_back(Crt::String(SERVICE_MODEL_TYPE_HEADER), GetModelName(), m_allocator); Crt::JsonObject payloadObject; shape->SerializeToJsonObject(payloadObject); Crt::String payloadString = payloadObject.View().WriteCompact(); From c4be52afb3145c455ba3872cdaac83407e9ab295 Mon Sep 17 00:00:00 2001 From: Igor Abdrakhimov Date: Fri, 15 Mar 2024 16:18:13 -0700 Subject: [PATCH 02/11] Fix UB in ClientConnection dtor --- eventstream_rpc/source/EventStreamClient.cpp | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/eventstream_rpc/source/EventStreamClient.cpp b/eventstream_rpc/source/EventStreamClient.cpp index 251dadf4e..30f890388 100644 --- a/eventstream_rpc/source/EventStreamClient.cpp +++ b/eventstream_rpc/source/EventStreamClient.cpp @@ -205,21 +205,21 @@ namespace Aws ClientConnection::~ClientConnection() noexcept { m_stateMutex.lock(); - if (m_connectionWillSetup) + auto connectionWillSetup = m_connectionWillSetup; + m_stateMutex.unlock(); + if (connectionWillSetup) { - m_stateMutex.unlock(); m_connectionSetupPromise.get_future().wait(); } + m_stateMutex.lock(); - if (m_clientState != DISCONNECTED) + auto clientState = m_clientState; + m_stateMutex.unlock(); + if (clientState != DISCONNECTED) { Close(); - m_stateMutex.unlock(); m_closedPromise.get_future().wait(); } - /* Cover the case in which the if statements are not hit. */ - m_stateMutex.unlock(); - m_stateMutex.unlock(); m_underlyingConnection = nullptr; } From cb5a69b0fd853b824f1596f3f1bea19da5b5e148 Mon Sep 17 00:00:00 2001 From: Igor Abdrakhimov Date: Tue, 19 Mar 2024 13:30:19 -0700 Subject: [PATCH 03/11] Fix codestyle --- .../aws/eventstreamrpc/EventStreamClient.h | 23 ++++---- eventstream_rpc/source/EventStreamClient.cpp | 56 +++++++++---------- 2 files changed, 41 insertions(+), 38 deletions(-) diff --git a/eventstream_rpc/include/aws/eventstreamrpc/EventStreamClient.h b/eventstream_rpc/include/aws/eventstreamrpc/EventStreamClient.h index 305a0512d..295a17558 100644 --- a/eventstream_rpc/include/aws/eventstreamrpc/EventStreamClient.h +++ b/eventstream_rpc/include/aws/eventstreamrpc/EventStreamClient.h @@ -47,7 +47,7 @@ namespace Aws /** * A callback prototype that is called upon flushing a message over the wire. - * @param errorCode A non-zero value if an error occured while attempting to flush the message. + * @param errorCode A non-zero value if an error occurred while attempting to flush the message. */ using OnMessageFlushCallback = std::function; @@ -162,10 +162,10 @@ namespace Aws OnMessageFlushCallback GetConnectRequestCallback() const noexcept { return m_connectRequestCallback; } ConnectMessageAmender GetConnectMessageAmender() const noexcept { - return [&](void) -> const MessageAmendment & { return m_connectAmendment; }; + return [&]() -> const MessageAmendment & { return m_connectAmendment; }; } - void SetHostName(Crt::String hostName) noexcept { m_hostName = hostName; } + void SetHostName(Crt::String hostName) noexcept { m_hostName = std::move(hostName); } void SetPort(uint32_t port) noexcept { m_port = port; } void SetSocketOptions(const Crt::Io::SocketOptions &socketOptions) noexcept { @@ -185,7 +185,7 @@ namespace Aws } void SetConnectRequestCallback(OnMessageFlushCallback connectRequestCallback) noexcept { - m_connectRequestCallback = connectRequestCallback; + m_connectRequestCallback = std::move(connectRequestCallback); } protected: @@ -226,6 +226,7 @@ namespace Aws class AWS_EVENTSTREAMRPC_API ConnectionLifecycleHandler { public: + virtual ~ConnectionLifecycleHandler() noexcept = default; /** * This callback is only invoked upon receiving a CONNECT_ACK with the * CONNECTION_ACCEPTED flag set by the server. Therefore, once this callback @@ -349,7 +350,7 @@ namespace Aws { public: AbstractShapeBase() noexcept; - virtual ~AbstractShapeBase() noexcept; + virtual ~AbstractShapeBase() noexcept = default; static void s_customDeleter(AbstractShapeBase *shape) noexcept; virtual void SerializeToJsonObject(Crt::JsonObject &payloadObject) const = 0; virtual Crt::String GetModelName() const noexcept = 0; @@ -363,7 +364,7 @@ namespace Aws public: explicit OperationError() noexcept = default; static void s_customDeleter(OperationError *shape) noexcept; - virtual void SerializeToJsonObject(Crt::JsonObject &payloadObject) const override; + void SerializeToJsonObject(Crt::JsonObject &payloadObject) const override; virtual Crt::Optional GetMessage() noexcept = 0; }; @@ -375,6 +376,8 @@ namespace Aws class AWS_EVENTSTREAMRPC_API StreamResponseHandler { public: + virtual ~StreamResponseHandler() noexcept = default; + /** * Invoked when stream is closed, so no more messages will be received. */ @@ -431,7 +434,7 @@ namespace Aws } OperationResult(Crt::ScopedResource &&error) noexcept : m_error(std::move(error)) {} OperationResult() noexcept : m_response(nullptr) {} - ~OperationResult() noexcept {}; + ~OperationResult() noexcept {} Crt::ScopedResource m_response; Crt::ScopedResource m_error; }; @@ -453,6 +456,7 @@ namespace Aws { /* An interface shared by all operations for retrieving the response object given the model name. */ public: + virtual ~ResponseRetriever() noexcept = default; virtual ExpectedResponseFactory GetInitialResponseFromModelName( const Crt::String &modelName) const noexcept = 0; virtual ExpectedResponseFactory GetStreamingResponseFromModelName( @@ -464,6 +468,7 @@ namespace Aws class AWS_EVENTSTREAMRPC_API ServiceModel { public: + virtual ~ServiceModel() noexcept = default; virtual Crt::ScopedResource AllocateOperationErrorFromPayload( const Crt::String &errorModelName, Crt::StringView stringView, @@ -474,6 +479,7 @@ namespace Aws { public: OperationModelContext(const ServiceModel &serviceModel) noexcept; + virtual ~OperationModelContext() noexcept = default; virtual Crt::ScopedResource AllocateInitialResponseFromPayload( Crt::StringView stringView, Crt::Allocator *allocator) const noexcept = 0; @@ -519,9 +525,6 @@ namespace Aws std::future Activate( const AbstractShapeBase *shape, OnMessageFlushCallback onMessageFlushCallback) noexcept; - std::future SendStreamEvent( - AbstractShapeBase *shape, - OnMessageFlushCallback onMessageFlushCallback) noexcept; virtual Crt::String GetModelName() const noexcept = 0; const OperationModelContext &m_operationModelContext; std::launch m_asyncLaunchMode; diff --git a/eventstream_rpc/source/EventStreamClient.cpp b/eventstream_rpc/source/EventStreamClient.cpp index 30f890388..e6af046c5 100644 --- a/eventstream_rpc/source/EventStreamClient.cpp +++ b/eventstream_rpc/source/EventStreamClient.cpp @@ -155,11 +155,11 @@ namespace Aws }; // FIXME This assignment operator is completely broken. - // 1. rhs' internal state can be changed during copying members one by one, which can lead to this being + // 1. rhs' internal state can be changed while members are copying one by one, which can lead to this being // inconsistent/corrupted. - // 2. if rhs is in the CONNECTED state, then a pointer to it is passed to the underlying libraries and is used - // in callbacks. - // 3. If this is connected, what will happen to its state? + // 2. if rhs is in the CONNECTED state, then a pointer to it has already been passed to the underlying + // libraries and is used in callbacks. + // 3. If this is connected, what will happen to its underlying connection, state, callbacks, etc.? // // Option 1 (preferable): This operator should be marked as deleted. It'll be a BREAKING CHANGE. // Option 2: As an ugly alternative, throw runtime error if `Connect` method was called on rhs or this. @@ -285,7 +285,7 @@ namespace Aws Crt::Io::ClientBootstrap &clientBootstrap) noexcept { EventStreamRpcStatusCode baseError = EVENT_STREAM_RPC_SUCCESS; - struct aws_event_stream_rpc_client_connection_options connOptions; + aws_event_stream_rpc_client_connection_options connOptions{}; { const std::lock_guard lock(m_stateMutex); @@ -392,7 +392,7 @@ namespace Aws const Crt::Optional &payload, OnMessageFlushCallback onMessageFlushCallback) noexcept { - return s_sendPing(this, headers, payload, onMessageFlushCallback); + return s_sendPing(this, headers, payload, std::move(onMessageFlushCallback)); } std::future ClientConnection::SendPingResponse( @@ -400,7 +400,7 @@ namespace Aws const Crt::Optional &payload, OnMessageFlushCallback onMessageFlushCallback) noexcept { - return s_sendPingResponse(this, headers, payload, onMessageFlushCallback); + return s_sendPingResponse(this, headers, payload, std::move(onMessageFlushCallback)); } std::future ClientConnection::s_sendPing( @@ -410,7 +410,8 @@ namespace Aws OnMessageFlushCallback onMessageFlushCallback) noexcept { return s_sendProtocolMessage( - connection, headers, payload, AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_PING, 0, onMessageFlushCallback); + connection, headers, payload, AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_PING, 0, + std::move(onMessageFlushCallback)); } std::future ClientConnection::s_sendPingResponse( @@ -425,7 +426,7 @@ namespace Aws payload, AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_PING_RESPONSE, 0, - onMessageFlushCallback); + std::move(onMessageFlushCallback)); } std::future ClientConnection::SendProtocolMessage( @@ -435,7 +436,8 @@ namespace Aws uint32_t messageFlags, OnMessageFlushCallback onMessageFlushCallback) noexcept { - return s_sendProtocolMessage(this, headers, payload, messageType, messageFlags, onMessageFlushCallback); + return s_sendProtocolMessage( + this, headers, payload, messageType, messageFlags, std::move(onMessageFlushCallback)); } void ClientConnection::s_protocolMessageCallback(int errorCode, void *userData) noexcept @@ -474,7 +476,7 @@ namespace Aws { std::promise onFlushPromise; OnMessageFlushCallbackContainer *callbackContainer = nullptr; - struct aws_array_list headersArray; + aws_array_list headersArray{}; /* The caller should never pass a NULL connection. */ AWS_PRECONDITION(connection != nullptr); @@ -600,6 +602,9 @@ namespace Aws EventStreamHeader &EventStreamHeader::operator=(const EventStreamHeader &lhs) noexcept { + if (this == &lhs) { + return *this; + } m_allocator = lhs.m_allocator; m_valueByteBuf = Crt::ByteBufNewCopy(lhs.m_allocator, lhs.m_valueByteBuf.buffer, lhs.m_valueByteBuf.len); m_underlyingHandle = lhs.m_underlyingHandle; @@ -862,7 +867,7 @@ namespace Aws Crt::Allocator *allocator) noexcept : m_allocator(allocator), m_continuationHandler(continuationHandler), m_continuationToken(nullptr) { - struct aws_event_stream_rpc_client_stream_continuation_options options; + aws_event_stream_rpc_client_stream_continuation_options options{}; options.on_continuation = ClientContinuation::s_onContinuationMessage; options.on_continuation_closed = ClientContinuation::s_onContinuationClosed; @@ -963,7 +968,7 @@ namespace Aws uint32_t messageFlags, OnMessageFlushCallback onMessageFlushCallback) noexcept { - struct aws_array_list headersArray; + aws_array_list headersArray{}; OnMessageFlushCallbackContainer *callbackContainer = nullptr; std::promise onFlushPromise; @@ -994,7 +999,7 @@ namespace Aws if (!errorCode) { struct aws_event_stream_rpc_message_args msg_args; - msg_args.headers = (struct aws_event_stream_header_value_pair *)headersArray.data; + msg_args.headers = (aws_event_stream_header_value_pair *)headersArray.data; msg_args.headers_count = headers.size(); msg_args.payload = payload.has_value() ? (aws_byte_buf *)(&(payload.value())) : nullptr; msg_args.message_type = messageType; @@ -1037,7 +1042,7 @@ namespace Aws uint32_t messageFlags, OnMessageFlushCallback onMessageFlushCallback) noexcept { - struct aws_array_list headersArray; + aws_array_list headersArray{}; OnMessageFlushCallbackContainer *callbackContainer = nullptr; std::promise onFlushPromise; @@ -1052,7 +1057,7 @@ namespace Aws if (!errorCode) { - struct aws_event_stream_rpc_message_args msg_args; + aws_event_stream_rpc_message_args msg_args{}; msg_args.headers = (struct aws_event_stream_header_value_pair *)headersArray.data; msg_args.headers_count = headers.size(); msg_args.payload = payload.has_value() ? (aws_byte_buf *)(&(payload.value())) : nullptr; @@ -1062,7 +1067,7 @@ namespace Aws /* This heap allocation is necessary so that the flush callback can still be invoked when this function * returns. */ callbackContainer = Crt::New(m_allocator, m_allocator); - callbackContainer->onMessageFlushCallback = onMessageFlushCallback; + callbackContainer->onMessageFlushCallback = std::move(onMessageFlushCallback); callbackContainer->onFlushPromise = std::move(onFlushPromise); if (m_continuationToken) @@ -1120,8 +1125,6 @@ namespace Aws AbstractShapeBase::AbstractShapeBase() noexcept : m_allocator(nullptr) {} - AbstractShapeBase::~AbstractShapeBase() noexcept {} - ClientOperation::ClientOperation( ClientConnection &connection, std::shared_ptr streamHandler, @@ -1378,8 +1381,6 @@ namespace Aws uint32_t messageFlags) { EventStreamRpcStatusCode errorCode = EVENT_STREAM_RPC_SUCCESS; - const EventStreamHeader *modelHeader = nullptr; - const EventStreamHeader *contentHeader = nullptr; Crt::String modelName; if (messageFlags & AWS_EVENT_STREAM_RPC_MESSAGE_FLAG_TERMINATE_STREAM) @@ -1390,7 +1391,7 @@ namespace Aws m_messageCount += 1; - modelHeader = GetHeaderByName(headers, Crt::String(SERVICE_MODEL_TYPE_HEADER)); + const EventStreamHeader *modelHeader = GetHeaderByName(headers, Crt::String(SERVICE_MODEL_TYPE_HEADER)); if (modelHeader == nullptr) { /* Missing required service model type header. */ @@ -1429,7 +1430,7 @@ namespace Aws if (!errorCode) { Crt::String contentType; - contentHeader = GetHeaderByName(headers, Crt::String(CONTENT_TYPE_HEADER)); + const EventStreamHeader *contentHeader = GetHeaderByName(headers, Crt::String(CONTENT_TYPE_HEADER)); if (contentHeader == nullptr) { /* Missing required content type header. */ @@ -1469,7 +1470,7 @@ namespace Aws { const std::lock_guard lock(m_continuationMutex); m_resultReceived = true; - RpcError promiseValue = {(EventStreamRpcStatusCode)errorCode, 0}; + RpcError promiseValue = {errorCode, 0}; m_initialResponsePromise.set_value(TaggedResult(promiseValue)); } else @@ -1510,7 +1511,7 @@ namespace Aws Crt::ByteBufFromCString(payloadString.c_str()), AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_APPLICATION_MESSAGE, 0, - onMessageFlushCallback); + std::move(onMessageFlushCallback)); } void ClientOperation::OnContinuationClosed() @@ -1559,9 +1560,8 @@ namespace Aws /* This heap allocation is necessary so that the flush callback can still be invoked when this function * returns. */ - OnMessageFlushCallbackContainer *callbackContainer = - Crt::New(m_allocator, m_allocator); - callbackContainer->onMessageFlushCallback = onMessageFlushCallback; + auto *callbackContainer = Crt::New(m_allocator, m_allocator); + callbackContainer->onMessageFlushCallback = std::move(onMessageFlushCallback); callbackContainer->onFlushPromise = std::move(onTerminatePromise); if (m_clientContinuation.m_continuationToken) From aa39c04b459b6f057e4a702563951432238afb9f Mon Sep 17 00:00:00 2001 From: Igor Abdrakhimov Date: Tue, 19 Mar 2024 13:32:51 -0700 Subject: [PATCH 04/11] Get rid of recursive mutex --- .../aws/eventstreamrpc/EventStreamClient.h | 6 +- eventstream_rpc/source/EventStreamClient.cpp | 62 +++++++++---------- 2 files changed, 31 insertions(+), 37 deletions(-) diff --git a/eventstream_rpc/include/aws/eventstreamrpc/EventStreamClient.h b/eventstream_rpc/include/aws/eventstreamrpc/EventStreamClient.h index 295a17558..2418ef2fb 100644 --- a/eventstream_rpc/include/aws/eventstreamrpc/EventStreamClient.h +++ b/eventstream_rpc/include/aws/eventstreamrpc/EventStreamClient.h @@ -634,14 +634,14 @@ namespace Aws DISCONNECTING, }; /* This recursive mutex protects m_clientState & m_connectionWillSetup */ - std::recursive_mutex m_stateMutex; + std::mutex m_closeReasonMutex; Crt::Allocator *m_allocator; struct aws_event_stream_rpc_client_connection *m_underlyingConnection; - ClientState m_clientState; + std::atomic m_clientState; ConnectionLifecycleHandler *m_lifecycleHandler; ConnectMessageAmender m_connectMessageAmender; std::promise m_connectionSetupPromise; - bool m_connectionWillSetup; + std::atomic m_connectionWillSetup; std::promise m_connectAckedPromise; std::promise m_closedPromise; bool m_onConnectCalled; diff --git a/eventstream_rpc/source/EventStreamClient.cpp b/eventstream_rpc/source/EventStreamClient.cpp index e6af046c5..697502ac9 100644 --- a/eventstream_rpc/source/EventStreamClient.cpp +++ b/eventstream_rpc/source/EventStreamClient.cpp @@ -167,9 +167,7 @@ namespace Aws { m_allocator = std::move(rhs.m_allocator); m_underlyingConnection = rhs.m_underlyingConnection; - rhs.m_stateMutex.lock(); - m_clientState = rhs.m_clientState; - rhs.m_stateMutex.unlock(); + m_clientState = rhs.m_clientState.load(); m_lifecycleHandler = rhs.m_lifecycleHandler; m_connectMessageAmender = rhs.m_connectMessageAmender; m_connectAckedPromise = std::move(rhs.m_connectAckedPromise); @@ -204,18 +202,12 @@ namespace Aws ClientConnection::~ClientConnection() noexcept { - m_stateMutex.lock(); - auto connectionWillSetup = m_connectionWillSetup; - m_stateMutex.unlock(); - if (connectionWillSetup) + if (m_connectionWillSetup) { m_connectionSetupPromise.get_future().wait(); } - m_stateMutex.lock(); - auto clientState = m_clientState; - m_stateMutex.unlock(); - if (clientState != DISCONNECTED) + if (m_clientState != DISCONNECTED) { Close(); m_closedPromise.get_future().wait(); @@ -288,7 +280,6 @@ namespace Aws aws_event_stream_rpc_client_connection_options connOptions{}; { - const std::lock_guard lock(m_stateMutex); if (m_clientState == DISCONNECTED) { m_clientState = CONNECTING_SOCKET; @@ -296,7 +287,10 @@ namespace Aws m_connectionSetupPromise = {}; m_connectAckedPromise = {}; m_closedPromise = {}; - m_closeReason = {EVENT_STREAM_RPC_UNINITIALIZED, 0}; + { + const std::lock_guard lock(m_closeReasonMutex); + m_closeReason = {EVENT_STREAM_RPC_UNINITIALIZED, 0}; + } m_connectionConfig = connectionConfig; m_lifecycleHandler = connectionLifecycleHandler; } @@ -339,7 +333,6 @@ namespace Aws errorPromise.set_value({baseError, 0}); if (baseError == EVENT_STREAM_RPC_NULL_PARAMETER) { - const std::lock_guard lock(m_stateMutex); m_clientState = DISCONNECTED; } return errorPromise.get_future(); @@ -374,13 +367,11 @@ namespace Aws "A CRT error occurred while attempting to establish the connection: %s", Crt::ErrorDebugString(crtError)); errorPromise.set_value({EVENT_STREAM_RPC_CRT_ERROR, crtError}); - const std::lock_guard lock(m_stateMutex); m_clientState = DISCONNECTED; return errorPromise.get_future(); } else { - const std::lock_guard lock(m_stateMutex); m_connectionWillSetup = true; } @@ -533,8 +524,6 @@ namespace Aws void ClientConnection::Close() noexcept { - const std::lock_guard lock(m_stateMutex); - if (IsOpen()) { aws_event_stream_rpc_client_connection_close(this->m_underlyingConnection, AWS_OP_SUCCESS); @@ -549,6 +538,7 @@ namespace Aws m_clientState = DISCONNECTING; } + const std::lock_guard lock(m_closeReasonMutex); if (m_closeReason.baseStatus == EVENT_STREAM_RPC_UNINITIALIZED) { m_closeReason = {EVENT_STREAM_RPC_CONNECTION_CLOSED, 0}; @@ -658,8 +648,6 @@ namespace Aws /* The `userData` pointer is used to pass `this` of a `ClientConnection` object. */ auto *thisConnection = static_cast(userData); - const std::lock_guard lock(thisConnection->m_stateMutex); - if (errorCode) { thisConnection->m_clientState = DISCONNECTED; @@ -676,7 +664,10 @@ namespace Aws else if (thisConnection->m_clientState == DISCONNECTING || thisConnection->m_clientState == DISCONNECTED) { thisConnection->m_underlyingConnection = connection; - thisConnection->m_closeReason = {EVENT_STREAM_RPC_CONNECTION_CLOSED, 0}; + { + const std::lock_guard lock(thisConnection->m_closeReasonMutex); + thisConnection->m_closeReason = {EVENT_STREAM_RPC_CONNECTION_CLOSED, 0}; + } thisConnection->Close(); } else @@ -727,19 +718,21 @@ namespace Aws /* The `userData` pointer is used to pass `this` of a `ClientConnection` object. */ auto *thisConnection = static_cast(userData); - const std::lock_guard lock(thisConnection->m_stateMutex); - - if (thisConnection->m_closeReason.baseStatus == EVENT_STREAM_RPC_UNINITIALIZED && errorCode) { - thisConnection->m_closeReason = {EVENT_STREAM_RPC_CRT_ERROR, errorCode}; - } + const std::lock_guard lock(thisConnection->m_closeReasonMutex); - thisConnection->m_underlyingConnection = nullptr; + if (thisConnection->m_closeReason.baseStatus == EVENT_STREAM_RPC_UNINITIALIZED && errorCode) + { + thisConnection->m_closeReason = {EVENT_STREAM_RPC_CRT_ERROR, errorCode}; + } - if (thisConnection->m_closeReason.baseStatus != EVENT_STREAM_RPC_UNINITIALIZED && - !thisConnection->m_onConnectCalled) - { - thisConnection->m_connectAckedPromise.set_value(thisConnection->m_closeReason); + thisConnection->m_underlyingConnection = nullptr; + + if (thisConnection->m_closeReason.baseStatus != EVENT_STREAM_RPC_UNINITIALIZED && + !thisConnection->m_onConnectCalled) + { + thisConnection->m_connectAckedPromise.set_value(thisConnection->m_closeReason); + } } thisConnection->m_clientState = DISCONNECTED; @@ -786,7 +779,6 @@ namespace Aws switch (messageArgs->message_type) { case AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_CONNECT_ACK: - thisConnection->m_stateMutex.lock(); if (thisConnection->m_clientState == WAITING_FOR_CONNECT_ACK) { if (messageArgs->message_flags & AWS_EVENT_STREAM_RPC_MESSAGE_FLAG_CONNECTION_ACCEPTED) @@ -798,7 +790,10 @@ namespace Aws } else { - thisConnection->m_closeReason = {EVENT_STREAM_RPC_CONNECTION_ACCESS_DENIED, 0}; + { + const std::lock_guard lock(thisConnection->m_closeReasonMutex); + thisConnection->m_closeReason = {EVENT_STREAM_RPC_CONNECTION_ACCESS_DENIED, 0}; + } thisConnection->Close(); } } @@ -806,7 +801,6 @@ namespace Aws { /* Unexpected CONNECT_ACK received. */ } - thisConnection->m_stateMutex.unlock(); break; From 5ff7957d4d87ce1a30694dc30fe5a02cf23b4695 Mon Sep 17 00:00:00 2001 From: Igor Abdrakhimov Date: Tue, 19 Mar 2024 13:43:40 -0700 Subject: [PATCH 05/11] Simplify loop --- eventstream_rpc/source/EventStreamClient.cpp | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/eventstream_rpc/source/EventStreamClient.cpp b/eventstream_rpc/source/EventStreamClient.cpp index 697502ac9..a47fd9ae8 100644 --- a/eventstream_rpc/source/EventStreamClient.cpp +++ b/eventstream_rpc/source/EventStreamClient.cpp @@ -1248,11 +1248,9 @@ namespace Aws const Crt::List &headers, const Crt::String &name) noexcept { - for (auto it = headers.begin(); it != headers.end(); ++it) - { - if (name == it->GetHeaderName()) - { - return &(*it); + for (const auto &header : headers) { + if (header.GetHeaderName() == name) { + return &header; } } return nullptr; From eb03d717ba92886ef96f8773e221dc5abb5608da Mon Sep 17 00:00:00 2001 From: Igor Abdrakhimov Date: Tue, 19 Mar 2024 13:45:52 -0700 Subject: [PATCH 06/11] Fix format --- eventstream_rpc/source/EventStreamClient.cpp | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/eventstream_rpc/source/EventStreamClient.cpp b/eventstream_rpc/source/EventStreamClient.cpp index a47fd9ae8..207d381bb 100644 --- a/eventstream_rpc/source/EventStreamClient.cpp +++ b/eventstream_rpc/source/EventStreamClient.cpp @@ -401,7 +401,11 @@ namespace Aws OnMessageFlushCallback onMessageFlushCallback) noexcept { return s_sendProtocolMessage( - connection, headers, payload, AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_PING, 0, + connection, + headers, + payload, + AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_PING, + 0, std::move(onMessageFlushCallback)); } @@ -592,7 +596,8 @@ namespace Aws EventStreamHeader &EventStreamHeader::operator=(const EventStreamHeader &lhs) noexcept { - if (this == &lhs) { + if (this == &lhs) + { return *this; } m_allocator = lhs.m_allocator; @@ -1248,8 +1253,10 @@ namespace Aws const Crt::List &headers, const Crt::String &name) noexcept { - for (const auto &header : headers) { - if (header.GetHeaderName() == name) { + for (const auto &header : headers) + { + if (header.GetHeaderName() == name) + { return &header; } } From fb5020d968a11ec57b0fc2886e51aca98747e5dd Mon Sep 17 00:00:00 2001 From: Igor Abdrakhimov Date: Tue, 19 Mar 2024 17:22:28 -0700 Subject: [PATCH 07/11] Add comments --- eventstream_rpc/source/EventStreamClient.cpp | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/eventstream_rpc/source/EventStreamClient.cpp b/eventstream_rpc/source/EventStreamClient.cpp index 207d381bb..44dfca254 100644 --- a/eventstream_rpc/source/EventStreamClient.cpp +++ b/eventstream_rpc/source/EventStreamClient.cpp @@ -1491,8 +1491,25 @@ namespace Aws { /* Promises must be reset in case the client would like to send a subsequent request with the same * `ClientOperation`. */ + // TODO Is std::promise::operator=() thread-safe? + // TODO Is it possible to "reset" m_initialResponsePromise while user code waits for it to set value? m_initialResponsePromise = {}; { + // FIXME Possible race condition with ClientOperation::HandleData and/or other functions. + // t2: Calls ClientOperation::HandleData + // t2: Acquires m_continuationMutex + // t1: "Resets" m_initialResponsePromise + // t1: Locks here + // t2: m_initialResponsePromise.set_value() + // t2: m_resultReceived = true + // t2: completes ClientOperation::HandleData + // t1: m_resultReceived = false + // t2: Calls ClientOperation::OnContinuationClosed + // t2: Acquires m_continuationMutex + // t2: !m_resultReceived is true + // t2: m_initialResponsePromise.set_value() is called the second time + // Not sure if this exact scenario is possible, but considering that this scheme is used in other + // methods, it'll be safer to fix this construction. const std::lock_guard lock(m_continuationMutex); m_resultReceived = false; } From 148e38c63be43d29e73f661b21ae1f2c00c8cf93 Mon Sep 17 00:00:00 2001 From: Igor Abdrakhimov Date: Wed, 20 Mar 2024 11:28:25 -0700 Subject: [PATCH 08/11] Remove not implemented methods from header file --- .../aws/eventstreamrpc/EventStreamClient.h | 22 ------------------- 1 file changed, 22 deletions(-) diff --git a/eventstream_rpc/include/aws/eventstreamrpc/EventStreamClient.h b/eventstream_rpc/include/aws/eventstreamrpc/EventStreamClient.h index 2418ef2fb..8a7ca31cc 100644 --- a/eventstream_rpc/include/aws/eventstreamrpc/EventStreamClient.h +++ b/eventstream_rpc/include/aws/eventstreamrpc/EventStreamClient.h @@ -71,37 +71,16 @@ namespace Aws EventStreamHeader( const struct aws_event_stream_header_value_pair &header, Crt::Allocator *allocator = Crt::g_allocator); - EventStreamHeader(const Crt::String &name, bool value); - EventStreamHeader(const Crt::String &name, int8_t value); - EventStreamHeader(const Crt::String &name, int16_t value); - EventStreamHeader(const Crt::String &name, int32_t value); - EventStreamHeader(const Crt::String &name, int64_t value); - EventStreamHeader(const Crt::String &name, Crt::DateTime &value); EventStreamHeader( const Crt::String &name, const Crt::String &value, Crt::Allocator *allocator = Crt::g_allocator) noexcept; - EventStreamHeader(const Crt::String &name, Crt::ByteBuf &value); - EventStreamHeader(const Crt::String &name, Crt::UUID value); - HeaderValueType GetHeaderValueType(); Crt::String GetHeaderName() const noexcept; - void SetHeaderName(const Crt::String &); - - bool GetValueAsBoolean(bool &); - bool GetValueAsByte(int8_t &); - bool GetValueAsShort(int16_t &); - bool GetValueAsInt(int32_t &); - bool GetValueAsLong(int64_t &); - bool GetValueAsTimestamp(Crt::DateTime &); bool GetValueAsString(Crt::String &) const noexcept; - bool GetValueAsBytes(Crt::ByteBuf &); - bool GetValueAsUUID(Crt::UUID &); const struct aws_event_stream_header_value_pair *GetUnderlyingHandle() const; - bool operator==(const EventStreamHeader &other) const noexcept; - private: Crt::Allocator *m_allocator; Crt::ByteBuf m_valueByteBuf; @@ -323,7 +302,6 @@ namespace Aws uint32_t messageFlags, OnMessageFlushCallback onMessageFlushCallback) noexcept; bool IsClosed() noexcept; - void Release() noexcept; std::future SendMessage( const Crt::List &headers, const Crt::Optional &payload, From 4e1eb4ae4f13a44fcda90722525635702f6a6b12 Mon Sep 17 00:00:00 2001 From: Igor Abdrakhimov Date: Wed, 20 Mar 2024 11:28:59 -0700 Subject: [PATCH 09/11] Minor fixes and comments --- .../aws/eventstreamrpc/EventStreamClient.h | 2 +- eventstream_rpc/source/EventStreamClient.cpp | 76 ++++++++----------- 2 files changed, 33 insertions(+), 45 deletions(-) diff --git a/eventstream_rpc/include/aws/eventstreamrpc/EventStreamClient.h b/eventstream_rpc/include/aws/eventstreamrpc/EventStreamClient.h index 8a7ca31cc..14e701e86 100644 --- a/eventstream_rpc/include/aws/eventstreamrpc/EventStreamClient.h +++ b/eventstream_rpc/include/aws/eventstreamrpc/EventStreamClient.h @@ -488,7 +488,7 @@ namespace Aws std::shared_ptr streamHandler, const OperationModelContext &operationModelContext, Crt::Allocator *allocator) noexcept; - ~ClientOperation() noexcept; + virtual ~ClientOperation() noexcept; ClientOperation(const ClientOperation &clientOperation) noexcept = delete; ClientOperation(ClientOperation &&clientOperation) noexcept = delete; diff --git a/eventstream_rpc/source/EventStreamClient.cpp b/eventstream_rpc/source/EventStreamClient.cpp index 44dfca254..f4d333cb9 100644 --- a/eventstream_rpc/source/EventStreamClient.cpp +++ b/eventstream_rpc/source/EventStreamClient.cpp @@ -8,8 +8,8 @@ #include #include -#include -#include +#include +#include #include @@ -481,8 +481,8 @@ namespace Aws if (!errorCode) { - struct aws_event_stream_rpc_message_args msg_args; - msg_args.headers = (struct aws_event_stream_header_value_pair *)headersArray.data; + aws_event_stream_rpc_message_args msg_args{}; + msg_args.headers = static_cast(headersArray.data); msg_args.headers_count = headers.size(); msg_args.payload = payload.has_value() ? (aws_byte_buf *)(&(payload.value())) : nullptr; msg_args.message_type = messageType; @@ -492,7 +492,7 @@ namespace Aws * returns. */ callbackContainer = Crt::New(connection->m_allocator, connection->m_allocator); - callbackContainer->onMessageFlushCallback = onMessageFlushCallback; + callbackContainer->onMessageFlushCallback = std::move(onMessageFlushCallback); callbackContainer->onFlushPromise = std::move(onFlushPromise); errorCode = aws_event_stream_rpc_client_connection_send_protocol_message( @@ -510,6 +510,7 @@ namespace Aws if (errorCode) { + // FIXME Null pointer dereference if s_fillNativeHeadersArray fails. onFlushPromise = std::move(callbackContainer->onFlushPromise); AWS_LOGF_ERROR( AWS_LS_EVENT_STREAM_RPC_CLIENT, @@ -1026,6 +1027,7 @@ namespace Aws if (errorCode) { + // FIXME Null pointer dereference when s_fillNativeHeadersArray fails. onFlushPromise = std::move(callbackContainer->onFlushPromise); onFlushPromise.set_value({EVENT_STREAM_RPC_CRT_ERROR, errorCode}); Crt::Delete(callbackContainer, m_allocator); @@ -1087,6 +1089,7 @@ namespace Aws if (errorCode) { + // FIXME Null pointer dereference when s_fillNativeHeadersArray fails. onFlushPromise = std::move(callbackContainer->onFlushPromise); AWS_LOGF_ERROR( AWS_LS_EVENT_STREAM_RPC_CLIENT, @@ -1562,52 +1565,37 @@ namespace Aws errorPromise.set_value({EVENT_STREAM_RPC_CONTINUATION_CLOSED, 0}); return errorPromise.get_future(); } - else - { - std::promise onTerminatePromise; - int errorCode = AWS_OP_ERR; - struct aws_event_stream_rpc_message_args msg_args; - msg_args.headers = nullptr; - msg_args.headers_count = 0; - msg_args.payload = nullptr; - msg_args.message_type = AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_APPLICATION_MESSAGE; - msg_args.message_flags = AWS_EVENT_STREAM_RPC_MESSAGE_FLAG_TERMINATE_STREAM; + aws_event_stream_rpc_message_args msg_args{}; + msg_args.message_type = AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_APPLICATION_MESSAGE; + msg_args.message_flags = AWS_EVENT_STREAM_RPC_MESSAGE_FLAG_TERMINATE_STREAM; - /* This heap allocation is necessary so that the flush callback can still be invoked when this function - * returns. */ - auto *callbackContainer = Crt::New(m_allocator, m_allocator); - callbackContainer->onMessageFlushCallback = std::move(onMessageFlushCallback); - callbackContainer->onFlushPromise = std::move(onTerminatePromise); + /* This heap allocation is necessary so that the flush callback can still be invoked when this function + * returns. */ + auto *callbackContainer = Crt::New(m_allocator, m_allocator); + callbackContainer->onMessageFlushCallback = std::move(onMessageFlushCallback); - if (m_clientContinuation.m_continuationToken) - { - errorCode = aws_event_stream_rpc_client_continuation_send_message( - m_clientContinuation.m_continuationToken, - &msg_args, - ClientConnection::s_protocolMessageCallback, - reinterpret_cast(callbackContainer)); - } + int errorCode = aws_event_stream_rpc_client_continuation_send_message( + m_clientContinuation.m_continuationToken, + &msg_args, + ClientConnection::s_protocolMessageCallback, + callbackContainer); - if (errorCode) - { - onTerminatePromise = std::move(callbackContainer->onFlushPromise); - std::promise errorPromise; - AWS_LOGF_ERROR( - AWS_LS_EVENT_STREAM_RPC_CLIENT, - "A CRT error occurred while closing the stream: %s", - Crt::ErrorDebugString(errorCode)); - onTerminatePromise.set_value({EVENT_STREAM_RPC_CRT_ERROR, errorCode}); - Crt::Delete(callbackContainer, m_allocator); - } - else - { - m_expectedCloses.fetch_add(1); - return callbackContainer->onFlushPromise.get_future(); - } + if (errorCode) + { + AWS_LOGF_ERROR( + AWS_LS_EVENT_STREAM_RPC_CLIENT, + "A CRT error occurred while closing the stream: %s", + Crt::ErrorDebugString(errorCode)); + Crt::Delete(callbackContainer, m_allocator); + std::promise onTerminatePromise; + onTerminatePromise.set_value({EVENT_STREAM_RPC_CRT_ERROR, errorCode}); return onTerminatePromise.get_future(); } + + m_expectedCloses.fetch_add(1); + return callbackContainer->onFlushPromise.get_future(); } void OperationError::s_customDeleter(OperationError *shape) noexcept From 2184226d965fc3a3d570386467f8d474fd5b5073 Mon Sep 17 00:00:00 2001 From: Igor Abdrakhimov Date: Tue, 26 Mar 2024 09:35:34 -0700 Subject: [PATCH 10/11] Add another fixme comment --- eventstream_rpc/source/EventStreamClient.cpp | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/eventstream_rpc/source/EventStreamClient.cpp b/eventstream_rpc/source/EventStreamClient.cpp index f4d333cb9..c7a481793 100644 --- a/eventstream_rpc/source/EventStreamClient.cpp +++ b/eventstream_rpc/source/EventStreamClient.cpp @@ -898,6 +898,9 @@ namespace Aws } if (m_callbackData != nullptr) { + // FIXME Setting `m_callbackData->continuationDestroyed` indicates that another actor is supposed + // to check this flag (see `ClientContinuation::s_onContinuationMessage`). However, we delete + // `m_callbackData` right after setting the flag, so it doesn't work as intended. { const std::lock_guard lock(m_callbackData->callbackMutex); m_callbackData->continuationDestroyed = true; @@ -913,6 +916,10 @@ namespace Aws { (void)continuationToken; /* The `userData` pointer is used to pass a `ContinuationCallbackData` object. */ + // FIXME Can `callbackData` be destroyed at this point? See `ClientContinuation::~ClientContinuation`. + // Probably `callbackData` is guaranteed to be alive after this PR: + // https://github.com/aws/aws-iot-device-sdk-cpp-v2/pull/437. But then we need to get rid of the + // `continuationDestroyed` flag. auto *callbackData = static_cast(userData); auto *thisContinuation = callbackData->clientContinuation; From 0d6076446c9944e4601269f4c0601916232333fe Mon Sep 17 00:00:00 2001 From: Igor Abdrakhimov Date: Tue, 26 Mar 2024 11:35:43 -0700 Subject: [PATCH 11/11] FIx comment --- eventstream_rpc/source/EventStreamClient.cpp | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/eventstream_rpc/source/EventStreamClient.cpp b/eventstream_rpc/source/EventStreamClient.cpp index c7a481793..36142ef70 100644 --- a/eventstream_rpc/source/EventStreamClient.cpp +++ b/eventstream_rpc/source/EventStreamClient.cpp @@ -1501,23 +1501,26 @@ namespace Aws { /* Promises must be reset in case the client would like to send a subsequent request with the same * `ClientOperation`. */ - // TODO Is std::promise::operator=() thread-safe? - // TODO Is it possible to "reset" m_initialResponsePromise while user code waits for it to set value? + // TODO When compiling with fno-exceptions, resetting promise can lead to abort() if someone is waiting + // for the associated future. m_initialResponsePromise = {}; { // FIXME Possible race condition with ClientOperation::HandleData and/or other functions. // t2: Calls ClientOperation::HandleData // t2: Acquires m_continuationMutex // t1: "Resets" m_initialResponsePromise - // t1: Locks here + // t1: Locks here on m_continuationMutex // t2: m_initialResponsePromise.set_value() // t2: m_resultReceived = true - // t2: completes ClientOperation::HandleData + // t2: Releases m_continuationMutex (and completes ClientOperation::HandleData) + // t1: Acquires m_continuationMutex // t1: m_resultReceived = false + // t1: Releases m_continuationMutex // t2: Calls ClientOperation::OnContinuationClosed // t2: Acquires m_continuationMutex // t2: !m_resultReceived is true // t2: m_initialResponsePromise.set_value() is called the second time + // t2: Calls abort() // Not sure if this exact scenario is possible, but considering that this scheme is used in other // methods, it'll be safer to fix this construction. const std::lock_guard lock(m_continuationMutex);