Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clean and document eventstream #694

Draft
wants to merge 12 commits into
base: main
Choose a base branch
from
66 changes: 27 additions & 39 deletions eventstream_rpc/include/aws/eventstreamrpc/EventStreamClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ namespace Aws

/**
* A callback prototype that is called upon flushing a message over the wire.
* @param errorCode A non-zero value if an error occured while attempting to flush the message.
* @param errorCode A non-zero value if an error occurred while attempting to flush the message.
*/
using OnMessageFlushCallback = std::function<void(int errorCode)>;

Expand All @@ -71,37 +71,16 @@ namespace Aws
EventStreamHeader(
const struct aws_event_stream_header_value_pair &header,
Crt::Allocator *allocator = Crt::g_allocator);
EventStreamHeader(const Crt::String &name, bool value);
EventStreamHeader(const Crt::String &name, int8_t value);
EventStreamHeader(const Crt::String &name, int16_t value);
EventStreamHeader(const Crt::String &name, int32_t value);
EventStreamHeader(const Crt::String &name, int64_t value);
EventStreamHeader(const Crt::String &name, Crt::DateTime &value);
EventStreamHeader(
const Crt::String &name,
const Crt::String &value,
Crt::Allocator *allocator = Crt::g_allocator) noexcept;
EventStreamHeader(const Crt::String &name, Crt::ByteBuf &value);
EventStreamHeader(const Crt::String &name, Crt::UUID value);

HeaderValueType GetHeaderValueType();
Crt::String GetHeaderName() const noexcept;
void SetHeaderName(const Crt::String &);

bool GetValueAsBoolean(bool &);
bool GetValueAsByte(int8_t &);
bool GetValueAsShort(int16_t &);
bool GetValueAsInt(int32_t &);
bool GetValueAsLong(int64_t &);
bool GetValueAsTimestamp(Crt::DateTime &);
bool GetValueAsString(Crt::String &) const noexcept;
bool GetValueAsBytes(Crt::ByteBuf &);
bool GetValueAsUUID(Crt::UUID &);

const struct aws_event_stream_header_value_pair *GetUnderlyingHandle() const;

bool operator==(const EventStreamHeader &other) const noexcept;

private:
Crt::Allocator *m_allocator;
Crt::ByteBuf m_valueByteBuf;
Expand All @@ -117,7 +96,7 @@ namespace Aws
{
public:
MessageAmendment(const MessageAmendment &lhs);
MessageAmendment(MessageAmendment &&rhs);
MessageAmendment(MessageAmendment &&rhs) noexcept;
MessageAmendment &operator=(const MessageAmendment &lhs);
~MessageAmendment() noexcept;
explicit MessageAmendment(Crt::Allocator *allocator = Crt::g_allocator) noexcept;
Expand Down Expand Up @@ -162,10 +141,10 @@ namespace Aws
OnMessageFlushCallback GetConnectRequestCallback() const noexcept { return m_connectRequestCallback; }
ConnectMessageAmender GetConnectMessageAmender() const noexcept
{
return [&](void) -> const MessageAmendment & { return m_connectAmendment; };
return [&]() -> const MessageAmendment & { return m_connectAmendment; };
}

void SetHostName(Crt::String hostName) noexcept { m_hostName = hostName; }
void SetHostName(Crt::String hostName) noexcept { m_hostName = std::move(hostName); }
void SetPort(uint32_t port) noexcept { m_port = port; }
void SetSocketOptions(const Crt::Io::SocketOptions &socketOptions) noexcept
{
Expand All @@ -185,7 +164,7 @@ namespace Aws
}
void SetConnectRequestCallback(OnMessageFlushCallback connectRequestCallback) noexcept
{
m_connectRequestCallback = connectRequestCallback;
m_connectRequestCallback = std::move(connectRequestCallback);
}

protected:
Expand Down Expand Up @@ -226,6 +205,7 @@ namespace Aws
class AWS_EVENTSTREAMRPC_API ConnectionLifecycleHandler
{
public:
virtual ~ConnectionLifecycleHandler() noexcept = default;
/**
* This callback is only invoked upon receiving a CONNECT_ACK with the
* CONNECTION_ACCEPTED flag set by the server. Therefore, once this callback
Expand Down Expand Up @@ -292,7 +272,7 @@ namespace Aws
* the TERMINATE_STREAM flag, or when the connection shuts down.
*/
virtual void OnContinuationClosed() = 0;
virtual ~ClientContinuationHandler() noexcept;
virtual ~ClientContinuationHandler() noexcept = default;

private:
friend class ClientContinuation;
Expand All @@ -307,6 +287,13 @@ namespace Aws
ClientContinuationHandler &continuationHandler,
Crt::Allocator *allocator) noexcept;
~ClientContinuation() noexcept;

ClientContinuation(const ClientContinuation &other) = default;
ClientContinuation(ClientContinuation &&other) noexcept = default;

ClientContinuation &operator=(const ClientContinuation &other) = delete;
ClientContinuation &operator=(ClientContinuation &&other) noexcept = delete;

std::future<RpcError> Activate(
const Crt::String &operation,
const Crt::List<EventStreamHeader> &headers,
Expand All @@ -315,7 +302,6 @@ namespace Aws
uint32_t messageFlags,
OnMessageFlushCallback onMessageFlushCallback) noexcept;
bool IsClosed() noexcept;
void Release() noexcept;
std::future<RpcError> SendMessage(
const Crt::List<EventStreamHeader> &headers,
const Crt::Optional<Crt::ByteBuf> &payload,
Expand All @@ -342,7 +328,7 @@ namespace Aws
{
public:
AbstractShapeBase() noexcept;
virtual ~AbstractShapeBase() noexcept;
virtual ~AbstractShapeBase() noexcept = default;
static void s_customDeleter(AbstractShapeBase *shape) noexcept;
virtual void SerializeToJsonObject(Crt::JsonObject &payloadObject) const = 0;
virtual Crt::String GetModelName() const noexcept = 0;
Expand All @@ -354,9 +340,9 @@ namespace Aws
class AWS_EVENTSTREAMRPC_API OperationError : public AbstractShapeBase
{
public:
explicit OperationError() noexcept;
explicit OperationError() noexcept = default;
static void s_customDeleter(OperationError *shape) noexcept;
virtual void SerializeToJsonObject(Crt::JsonObject &payloadObject) const override;
void SerializeToJsonObject(Crt::JsonObject &payloadObject) const override;
virtual Crt::Optional<Crt::String> GetMessage() noexcept = 0;
};

Expand All @@ -368,6 +354,8 @@ namespace Aws
class AWS_EVENTSTREAMRPC_API StreamResponseHandler
{
public:
virtual ~StreamResponseHandler() noexcept = default;

/**
* Invoked when stream is closed, so no more messages will be received.
*/
Expand Down Expand Up @@ -424,7 +412,7 @@ namespace Aws
}
OperationResult(Crt::ScopedResource<OperationError> &&error) noexcept : m_error(std::move(error)) {}
OperationResult() noexcept : m_response(nullptr) {}
~OperationResult() noexcept {};
~OperationResult() noexcept {}
Crt::ScopedResource<AbstractShapeBase> m_response;
Crt::ScopedResource<OperationError> m_error;
};
Expand All @@ -446,6 +434,7 @@ namespace Aws
{
/* An interface shared by all operations for retrieving the response object given the model name. */
public:
virtual ~ResponseRetriever() noexcept = default;
virtual ExpectedResponseFactory GetInitialResponseFromModelName(
const Crt::String &modelName) const noexcept = 0;
virtual ExpectedResponseFactory GetStreamingResponseFromModelName(
Expand All @@ -457,6 +446,7 @@ namespace Aws
class AWS_EVENTSTREAMRPC_API ServiceModel
{
public:
virtual ~ServiceModel() noexcept = default;
virtual Crt::ScopedResource<OperationError> AllocateOperationErrorFromPayload(
const Crt::String &errorModelName,
Crt::StringView stringView,
Expand All @@ -467,6 +457,7 @@ namespace Aws
{
public:
OperationModelContext(const ServiceModel &serviceModel) noexcept;
virtual ~OperationModelContext() noexcept = default;
virtual Crt::ScopedResource<AbstractShapeBase> AllocateInitialResponseFromPayload(
Crt::StringView stringView,
Crt::Allocator *allocator) const noexcept = 0;
Expand Down Expand Up @@ -497,7 +488,7 @@ namespace Aws
std::shared_ptr<StreamResponseHandler> streamHandler,
const OperationModelContext &operationModelContext,
Crt::Allocator *allocator) noexcept;
~ClientOperation() noexcept;
virtual ~ClientOperation() noexcept;

ClientOperation(const ClientOperation &clientOperation) noexcept = delete;
ClientOperation(ClientOperation &&clientOperation) noexcept = delete;
Expand All @@ -512,9 +503,6 @@ namespace Aws
std::future<RpcError> Activate(
const AbstractShapeBase *shape,
OnMessageFlushCallback onMessageFlushCallback) noexcept;
std::future<RpcError> SendStreamEvent(
AbstractShapeBase *shape,
OnMessageFlushCallback onMessageFlushCallback) noexcept;
virtual Crt::String GetModelName() const noexcept = 0;
const OperationModelContext &m_operationModelContext;
std::launch m_asyncLaunchMode;
Expand Down Expand Up @@ -624,14 +612,14 @@ namespace Aws
DISCONNECTING,
};
/* This recursive mutex protects m_clientState & m_connectionWillSetup */
std::recursive_mutex m_stateMutex;
std::mutex m_closeReasonMutex;
Crt::Allocator *m_allocator;
struct aws_event_stream_rpc_client_connection *m_underlyingConnection;
ClientState m_clientState;
std::atomic<ClientState> m_clientState;
ConnectionLifecycleHandler *m_lifecycleHandler;
ConnectMessageAmender m_connectMessageAmender;
std::promise<void> m_connectionSetupPromise;
bool m_connectionWillSetup;
std::atomic<bool> m_connectionWillSetup;
std::promise<RpcError> m_connectAckedPromise;
std::promise<RpcError> m_closedPromise;
bool m_onConnectCalled;
Expand Down
Loading
Loading