Skip to content
Draft
4 changes: 4 additions & 0 deletions cpp/csp/adapters/websocket/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ set(WS_CLIENT_HEADER_FILES
ClientInputAdapter.h
ClientOutputAdapter.h
ClientHeaderUpdateAdapter.h
ClientConnectionRequestAdapter.h
WebsocketEndpoint.h
WebsocketEndpointManager.h
${WEBSOCKET_HEADER}
)

Expand All @@ -14,7 +16,9 @@ set(WS_CLIENT_SOURCE_FILES
ClientInputAdapter.cpp
ClientOutputAdapter.cpp
ClientHeaderUpdateAdapter.cpp
ClientConnectionRequestAdapter.cpp
WebsocketEndpoint.cpp
WebsocketEndpointManager.cpp
${WS_CLIENT_HEADER_FILES}
${WEBSOCKET_SOURCE}
)
Expand Down
133 changes: 34 additions & 99 deletions cpp/csp/adapters/websocket/ClientAdapterManager.cpp
Original file line number Diff line number Diff line change
@@ -1,124 +1,59 @@
#include <csp/adapters/websocket/ClientAdapterManager.h>

namespace csp {

INIT_CSP_ENUM( adapters::websocket::ClientStatusType,
"ACTIVE",
"GENERIC_ERROR",
"CONNECTION_FAILED",
"CLOSED",
"MESSAGE_SEND_FAIL",
);

}

// With TLS
namespace csp::adapters::websocket {

ClientAdapterManager::ClientAdapterManager( Engine* engine, const Dictionary & properties )
: AdapterManager( engine ),
m_active( false ),
m_shouldRun( false ),
m_endpoint( std::make_unique<WebsocketEndpoint>( properties ) ),
m_inputAdapter( nullptr ),
m_outputAdapter( nullptr ),
m_updateAdapter( nullptr ),
m_thread( nullptr ),
m_properties( properties )
{ };
m_properties( properties )
{ }

ClientAdapterManager::~ClientAdapterManager()
{ };
{ }

void ClientAdapterManager::start( DateTime starttime, DateTime endtime )
{
AdapterManager::start( starttime, endtime );

m_shouldRun = true;
m_endpoint -> setOnOpen(
[ this ]() {
m_active = true;
pushStatus( StatusLevel::INFO, ClientStatusType::ACTIVE, "Connected successfully" );
}
);
m_endpoint -> setOnFail(
[ this ]( const std::string& reason ) {
std::stringstream ss;
ss << "Connection Failure: " << reason;
m_active = false;
pushStatus( StatusLevel::ERROR, ClientStatusType::CONNECTION_FAILED, ss.str() );
}
);
if( m_inputAdapter ) {
m_endpoint -> setOnMessage(
[ this ]( void* c, size_t t ) {
PushBatch batch( m_engine -> rootEngine() );
m_inputAdapter -> processMessage( c, t, &batch );
}
);
} else {
// if a user doesn't call WebsocketAdapterManager.subscribe, no inputadapter will be created
// but we still need something to avoid on_message_cb not being set in the endpoint.
m_endpoint -> setOnMessage( []( void* c, size_t t ){} );
}
m_endpoint -> setOnClose(
[ this ]() {
m_active = false;
pushStatus( StatusLevel::INFO, ClientStatusType::CLOSED, "Connection closed" );
}
);
m_endpoint -> setOnSendFail(
[ this ]( const std::string& s ) {
std::stringstream ss;
ss << "Failed to send: " << s;
pushStatus( StatusLevel::ERROR, ClientStatusType::MESSAGE_SEND_FAIL, ss.str() );
}
);
WebsocketEndpointManager* ClientAdapterManager::getWebsocketManager(){
if( m_endpointManager == nullptr )
return nullptr;
return m_endpointManager.get();
}

m_thread = std::make_unique<std::thread>( [ this ]() {
while( m_shouldRun )
{
m_endpoint -> run();
m_active = false;
if( m_shouldRun ) sleep( m_properties.get<TimeDelta>( "reconnect_interval" ) );
}
});
};
void ClientAdapterManager::start(DateTime starttime, DateTime endtime) {
AdapterManager::start(starttime, endtime);
if (m_endpointManager != nullptr)
m_endpointManager -> start(starttime, endtime);
}

void ClientAdapterManager::stop() {
AdapterManager::stop();

m_shouldRun=false;
if( m_active ) m_endpoint->stop();
if( m_thread ) m_thread->join();
};
if (m_endpointManager != nullptr)
m_endpointManager -> stop();
}

PushInputAdapter* ClientAdapterManager::getInputAdapter(CspTypePtr & type, PushMode pushMode, const Dictionary & properties)
{
if (m_inputAdapter == nullptr)
{
m_inputAdapter = m_engine -> createOwnedObject<ClientInputAdapter>(
// m_engine,
type,
pushMode,
properties
);
}
return m_inputAdapter;
};
{
if (m_endpointManager == nullptr)
m_endpointManager = std::make_unique<WebsocketEndpointManager>(this, m_properties, m_engine);
return m_endpointManager -> getInputAdapter( type, pushMode, properties );
}

OutputAdapter* ClientAdapterManager::getOutputAdapter()
OutputAdapter* ClientAdapterManager::getOutputAdapter( const Dictionary & properties )
{
if (m_outputAdapter == nullptr) m_outputAdapter = m_engine -> createOwnedObject<ClientOutputAdapter>(*m_endpoint);

return m_outputAdapter;
if (m_endpointManager == nullptr)
m_endpointManager = std::make_unique<WebsocketEndpointManager>(this, m_properties, m_engine);
return m_endpointManager -> getOutputAdapter( properties );
}

OutputAdapter * ClientAdapterManager::getHeaderUpdateAdapter()
{
if (m_updateAdapter == nullptr) m_updateAdapter = m_engine -> createOwnedObject<ClientHeaderUpdateOutputAdapter>( m_endpoint -> getProperties() );
if (m_endpointManager == nullptr)
m_endpointManager = std::make_unique<WebsocketEndpointManager>(this, m_properties, m_engine);
return m_endpointManager -> getHeaderUpdateAdapter();
}

return m_updateAdapter;
OutputAdapter * ClientAdapterManager::getConnectionRequestAdapter( const Dictionary & properties )
{
if (m_endpointManager == nullptr)
m_endpointManager = std::make_unique<WebsocketEndpointManager>(this, m_properties, m_engine);
return m_endpointManager -> getConnectionRequestAdapter( properties );
}

DateTime ClientAdapterManager::processNextSimTimeSlice( DateTime time )
Expand Down
37 changes: 8 additions & 29 deletions cpp/csp/adapters/websocket/ClientAdapterManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
#define _IN_CSP_ADAPTERS_WEBSOCKETS_CLIENT_ADAPTERMGR_H

#include <csp/adapters/websocket/WebsocketEndpoint.h>
#include <csp/adapters/websocket/WebsocketEndpointManager.h>
#include <csp/adapters/websocket/ClientInputAdapter.h>
#include <csp/adapters/websocket/ClientOutputAdapter.h>
#include <csp/adapters/websocket/ClientHeaderUpdateAdapter.h>
#include <csp/core/Enum.h>
#include <csp/core/Hash.h>
Expand All @@ -15,30 +15,15 @@
#include <chrono>
#include <iomanip>
#include <iostream>
#include <vector>
#include <unordered_set>


namespace csp::adapters::websocket {

using namespace csp;

struct WebsocketClientStatusTypeTraits
{
enum _enum : unsigned char
{
ACTIVE = 0,
GENERIC_ERROR = 1,
CONNECTION_FAILED = 2,
CLOSED = 3,
MESSAGE_SEND_FAIL = 4,

NUM_TYPES
};

protected:
_enum m_value;
};

using ClientStatusType = Enum<WebsocketClientStatusTypeTraits>;
class WebsocketEndpointManager;

class ClientAdapterManager final : public AdapterManager
{
Expand All @@ -57,23 +42,17 @@ class ClientAdapterManager final : public AdapterManager

void stop() override;

WebsocketEndpointManager* getWebsocketManager();
PushInputAdapter * getInputAdapter( CspTypePtr & type, PushMode pushMode, const Dictionary & properties );
OutputAdapter * getOutputAdapter();
OutputAdapter * getOutputAdapter( const Dictionary & properties );
OutputAdapter * getHeaderUpdateAdapter();
OutputAdapter * getConnectionRequestAdapter( const Dictionary & properties );

DateTime processNextSimTimeSlice( DateTime time ) override;

private:
// need some client info

bool m_active;
bool m_shouldRun;
std::unique_ptr<WebsocketEndpoint> m_endpoint;
ClientInputAdapter* m_inputAdapter;
ClientOutputAdapter* m_outputAdapter;
ClientHeaderUpdateOutputAdapter* m_updateAdapter;
std::unique_ptr<std::thread> m_thread;
Dictionary m_properties;
std::unique_ptr<WebsocketEndpointManager> m_endpointManager;
};

}
Expand Down
66 changes: 66 additions & 0 deletions cpp/csp/adapters/websocket/ClientConnectionRequestAdapter.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
#include <csp/adapters/websocket/ClientConnectionRequestAdapter.h>

namespace csp::adapters::websocket {

ClientConnectionRequestAdapter::ClientConnectionRequestAdapter(
Engine * engine,
WebsocketEndpointManager * websocketManager,
bool is_subscribe,
size_t caller_id,
boost::asio::strand<boost::asio::io_context::executor_type>& strand

) : OutputAdapter( engine ),
m_websocketManager( websocketManager ),
m_strand( strand ),
m_isSubscribe( is_subscribe ),
m_callerId( caller_id ),
m_checkPerformed( is_subscribe ? false : true ) // we only need to check for pruned input adapters
{}

void ClientConnectionRequestAdapter::executeImpl()
{
// One-time check for pruned status
if (unlikely(!m_checkPerformed)) {
m_isPruned = m_websocketManager->adapterPruned(m_callerId);
m_checkPerformed = true;
}

// Early return if pruned
if (unlikely(m_isPruned))
return;

std::vector<Dictionary> properties_list;
for (auto& request : input()->lastValueTyped<std::vector<InternalConnectionRequest::Ptr>>()) {
if (!request->allFieldsSet())
CSP_THROW(TypeError, "All fields must be set in InternalConnectionRequest");

Dictionary dict;
dict.update("host", request->host());
dict.update("port", request->port());
dict.update("route", request->route());
dict.update("uri", request->uri());
dict.update("use_ssl", request->use_ssl());
dict.update("reconnect_interval", request->reconnect_interval());
dict.update("persistent", request->persistent());

dict.update("headers", request -> headers() );
dict.update("on_connect_payload", request->on_connect_payload());
dict.update("action", request->action());
dict.update("dynamic", request->dynamic());
dict.update("binary", request->binary());

properties_list.push_back(std::move(dict));
}

// We intentionally post here, we want the thread running
// the strand to handle the connection request. We want to keep
// all updates to internal data structures at graph run-time
// to that thread.
boost::asio::post(m_strand, [this, properties_list=std::move(properties_list)]() {
for(const auto& conn_req: properties_list) {
m_websocketManager->handleConnectionRequest(conn_req, m_callerId, m_isSubscribe);
}
});
};

}
45 changes: 45 additions & 0 deletions cpp/csp/adapters/websocket/ClientConnectionRequestAdapter.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
#ifndef _IN_CSP_ADAPTERS_WEBSOCKETS_CLIENT_CONNECTIONREQUESTADAPTER_H
#define _IN_CSP_ADAPTERS_WEBSOCKETS_CLIENT_CONNECTIONREQUESTADAPTER_H

#include <csp/adapters/websocket/ClientAdapterManager.h>
#include <csp/engine/Dictionary.h>
#include <csp/engine/OutputAdapter.h>
#include <csp/adapters/utils/MessageWriter.h>
#include <csp/adapters/websocket/csp_autogen/websocket_types.h>

namespace csp::adapters::websocket
{
using namespace csp::autogen;

class ClientAdapterManager;
class WebsocketEndpointManager;

class ClientConnectionRequestAdapter final: public OutputAdapter
{
public:
ClientConnectionRequestAdapter(
Engine * engine,
WebsocketEndpointManager * websocketManager,
bool isSubscribe,
size_t callerId,
boost::asio::strand<boost::asio::io_context::executor_type>& strand
);

void executeImpl() override;

const char * name() const override { return "WebsocketClientConnectionRequestAdapter"; }

private:
WebsocketEndpointManager* m_websocketManager;
boost::asio::strand<boost::asio::io_context::executor_type>& m_strand;
bool m_isSubscribe;
size_t m_callerId;
bool m_checkPerformed;
bool m_isPruned{false};

};

}


#endif
Loading
Loading