Skip to content

Commit

Permalink
feat: setup external command processor to listen to the external comm…
Browse files Browse the repository at this point in the history
…and queue and correctly respond if there is a reply_to header
  • Loading branch information
Lazrius committed Aug 13, 2023
1 parent bb2cc7d commit 8da63ed
Show file tree
Hide file tree
Showing 8 changed files with 232 additions and 158 deletions.
9 changes: 9 additions & 0 deletions include/Core/Commands/AbstractExternalCommandProcessor.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
#pragma once

class AbstractExternalCommandProcessor
{
public:
virtual ~AbstractExternalCommandProcessor() = default;
virtual std::optional<nlohmann::json> ProcessCommand(const nlohmann::json& command) = 0;
virtual std::vector<std::wstring_view> GetCommands() = 0;
};
12 changes: 6 additions & 6 deletions include/Core/Commands/ExternalCommandProcessor.hpp
Original file line number Diff line number Diff line change
@@ -1,21 +1,21 @@
#pragma once

class ExternalCommandProcessor
#include "AbstractExternalCommandProcessor.hpp"

class ExternalCommandProcessor final : public AbstractExternalCommandProcessor, public Singleton<ExternalCommandProcessor>
{
// clang-format off
std::unordered_map<std::wstring, std::function<nlohmann::json(const nlohmann::json&)>> functions =
{
{
std::wstring(L"beam"), Beam
},
{

}
};
// clang-format on

static nlohmann::json Beam(const nlohmann::json& parameters);
static nlohmann::json Beam(const nlohmann::json& parameters);

public:
std::optional<nlohmann::json> ProcessCommand(const nlohmann::json& command);
std::optional<nlohmann::json> ProcessCommand(const nlohmann::json& command) override;
std::vector<std::wstring_view> GetCommands() override;
};
63 changes: 31 additions & 32 deletions include/Core/MessageHandler.hpp
Original file line number Diff line number Diff line change
@@ -1,48 +1,47 @@
#pragma once

#pragma warning(push, 0)
#pragma warning(disable: 4244)
#include <amqpcpp.h>
#pragma warning(disable : 4244)
#include <External/Singleton.h>
#include <External/uvw.hpp>
#include <amqpcpp.h>
#include <memory>
#include <vector>
#include <External/Singleton.h>
#include <thread>
#include <vector>
#pragma warning(pop)

class MessageHandler final : public AMQP::ConnectionHandler, public Singleton<MessageHandler>
{
using QueueOnData = std::function<bool(const AMQP::Message& msg)>;
using QueueOnFail = std::function<void(const char* err)>;
using QueueOnData = std::function<bool(const AMQP::Message& msg, std::optional<nlohmann::json>& replyBody)>;
using QueueOnFail = std::function<void(const char* err)>;

std::shared_ptr<uvw::Loop> loop;
std::shared_ptr<uvw::TCPHandle> connectHandle;
std::unique_ptr<AMQP::Connection> connection;
std::unique_ptr<AMQP::Channel> channel;
std::map<std::wstring, std::vector<QueueOnData>> onMessageCallbacks;
std::map<std::wstring, std::vector<QueueOnFail>> onFailCallbacks;
std::atomic_bool isInitalizing = true;
std::jthread runner;
std::shared_ptr<uvw::Loop> loop;
std::shared_ptr<uvw::TCPHandle> connectHandle;
std::unique_ptr<AMQP::Connection> connection;
std::unique_ptr<AMQP::Channel> channel;
std::map<std::wstring, std::vector<QueueOnData>> onMessageCallbacks;
std::map<std::wstring, std::vector<QueueOnFail>> onFailCallbacks;
std::atomic_bool isInitalizing = true;
std::jthread runner;

void onData(AMQP::Connection* connection, const char* data, size_t size) override;
void onReady(AMQP::Connection* connection) override;
void onError(AMQP::Connection* connection, const char* message) override;
void onClosed(AMQP::Connection* connection) override;
void onData(AMQP::Connection* connection, const char* data, size_t size) override;
void onReady(AMQP::Connection* connection) override;
void onError(AMQP::Connection* connection, const char* message) override;
void onClosed(AMQP::Connection* connection) override;

public:
explicit MessageHandler();
~MessageHandler() noexcept override;
public:
explicit MessageHandler();
~MessageHandler() noexcept override;

enum class Queue
{
ServerStats,
ExternalCommands,
};
enum class Queue
{
ServerStats,
ExternalCommands,
};

static std::wstring_view QueueToStr(const Queue queue) { return magic_enum::enum_name<Queue>(queue); }
void Subscribe(const std::wstring& queue, QueueOnData callback, std::optional<QueueOnFail> onFail = std::nullopt);
void Publish(const std::wstring& jsonData, const std::wstring& exchange = L"", const std::wstring& queue = L"") const;
void DeclareQueue(const std::wstring& queue, int flags = 0) const;
void DeclareExchange(const std::wstring& exchange, AMQP::ExchangeType type = AMQP::fanout, int flags = 0) const;
void RemoteProcedureCall(nlohmann::json call);
static std::wstring_view QueueToStr(const Queue queue) { return magic_enum::enum_name<Queue>(queue); }
void Subscribe(const std::wstring& queue, QueueOnData callback, std::optional<QueueOnFail> onFail = std::nullopt);
void Publish(const std::wstring& jsonData, const std::wstring& exchange = L"", const std::wstring& queue = L"") const;
void DeclareQueue(const std::wstring& queue, int flags = 0) const;
void DeclareExchange(const std::wstring& exchange, AMQP::ExchangeType type = AMQP::fanout, int flags = 0) const;
};
1 change: 1 addition & 0 deletions project/FLHook.vcxproj
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@
<ClInclude Include="..\Include\Core\Action.hpp" />
<ClInclude Include="..\Include\Core\Codec.hpp" />
<ClInclude Include="..\Include\Core\Commands\AbstractAdminCommandProcessor.hpp" />
<ClInclude Include="..\include\Core\Commands\AbstractExternalCommandProcessor.hpp" />
<ClInclude Include="..\Include\Core\Commands\AbstractUserCommandProcessor.hpp" />
<ClInclude Include="..\Include\Core\Commands\AdminCommandProcessor.hpp" />
<ClInclude Include="..\Include\Core\Commands\CommandLineParser.hpp" />
Expand Down
3 changes: 3 additions & 0 deletions project/FLHook.vcxproj.filters
Original file line number Diff line number Diff line change
Expand Up @@ -411,5 +411,8 @@
<ClInclude Include="..\Include\Core\Commands\ExternalCommandProcessor.hpp">
<Filter>Include\Core\Commands</Filter>
</ClInclude>
<ClInclude Include="..\include\Core\Commands\AbstractExternalCommandProcessor.hpp">
<Filter>Include\Core\Commands</Filter>
</ClInclude>
</ItemGroup>
</Project>
219 changes: 109 additions & 110 deletions source/API/FLHook/MessageHandler.cpp
Original file line number Diff line number Diff line change
@@ -1,144 +1,143 @@
#include "PCH.hpp"

#include "Core/MessageHandler.hpp"
#include <Core/Logger.hpp>
#include "Defs/FLHookConfig.hpp"

#include <Core/Logger.hpp>

MessageHandler::MessageHandler()
{
if (!FLHookConfig::i()->messageQueue.enableQueues)
{
return;
}

Logger::i()->Log(LogLevel::Info, L"Attempting connection to RabbitMQ");

loop = uvw::Loop::getDefault();
connectHandle = loop->resource<uvw::TCPHandle>();

connectHandle->once<uvw::ErrorEvent>([](const uvw::ErrorEvent& event, uvw::TCPHandle&) {
// Just die on error, the message director always needs a connection to RabbitMQ.
// TODO: Add proper error handling and reconnects in the event of connection loss
Logger::i()->Log(LogLevel::Err, std::format(L"Socket error: {}", StringUtils::stows(event.what())));
throw std::runtime_error("Unable to connect to socket");
});

connectHandle->once<uvw::ConnectEvent>([this](const uvw::ConnectEvent&, uvw::TCPHandle& tcp) {
// Authenticate with the RabbitMQ cluster.
connection = std::make_unique<AMQP::Connection>(this, AMQP::Login("guest", "guest"), "/");

// Start reading from the socket.
connectHandle->read();
});

connectHandle->on<uvw::DataEvent>([this](const uvw::DataEvent& event, const uvw::TCPHandle&) { connection->parse(event.data.get(), event.length); });

connectHandle->connect("localhost", 5672);
runner = std::jthread([this] { loop->run<uvw::Loop::Mode::DEFAULT>(); });

while (isInitalizing)
{
// TODO: Add if trace mode log to console
Sleep(1000);
}
if (!FLHookConfig::i()->messageQueue.enableQueues)
{
return;
}

Logger::i()->Log(LogLevel::Info, L"Attempting connection to RabbitMQ");

loop = uvw::Loop::getDefault();
connectHandle = loop->resource<uvw::TCPHandle>();

connectHandle->once<uvw::ErrorEvent>(
[](const uvw::ErrorEvent& event, uvw::TCPHandle&)
{
// Just die on error, the message director always needs a connection to RabbitMQ.
// TODO: Add proper error handling and reconnects in the event of connection loss
Logger::i()->Log(LogLevel::Err, std::format(L"Socket error: {}", StringUtils::stows(event.what())));
throw std::runtime_error("Unable to connect to socket");
});

connectHandle->once<uvw::ConnectEvent>(
[this](const uvw::ConnectEvent&, uvw::TCPHandle& tcp)
{
// Authenticate with the RabbitMQ cluster.
connection = std::make_unique<AMQP::Connection>(this, AMQP::Login("guest", "guest"), "/");

// Start reading from the socket.
connectHandle->read();
});

connectHandle->on<uvw::DataEvent>([this](const uvw::DataEvent& event, const uvw::TCPHandle&) { connection->parse(event.data.get(), event.length); });

connectHandle->connect("localhost", 5672);
runner = std::jthread([this] { loop->run<uvw::Loop::Mode::DEFAULT>(); });

while (isInitalizing)
{
// TODO: Add if trace mode log to console
Sleep(1000);
}
}

void MessageHandler::onData(AMQP::Connection* conn, const char* data, size_t size)
{
connectHandle->write((char*)data, size);
}
void MessageHandler::onData(AMQP::Connection* conn, const char* data, size_t size) { connectHandle->write((char*)data, size); }

void MessageHandler::onReady(AMQP::Connection* conn)
{
Logger::i()->Log(LogLevel::Info, L"Connected to RabbitMQ!");
isInitalizing = false;
Logger::i()->Log(LogLevel::Info, L"Connected to RabbitMQ!");
isInitalizing = false;

channel = std::make_unique<AMQP::Channel>(conn);
channel = std::make_unique<AMQP::Channel>(conn);
}

void MessageHandler::onError(AMQP::Connection* conn, const char* message)
{
isInitalizing = false;
Logger::i()->Log(LogLevel::Err, std::format(L"AMQP error: {}", StringUtils::stows(message)));
isInitalizing = false;
Logger::i()->Log(LogLevel::Err, std::format(L"AMQP error: {}", StringUtils::stows(message)));
}

void MessageHandler::onClosed(AMQP::Connection* conn)
{
std::cout << "closed" << std::endl;
}
void MessageHandler::onClosed(AMQP::Connection* conn) { std::cout << "closed" << std::endl; }

MessageHandler::~MessageHandler() = default;

void MessageHandler::Subscribe(const std::wstring& queue, QueueOnData callback, std::optional<QueueOnFail> onFail)
{
if (!FLHookConfig::i()->messageQueue.enableQueues)
{
return;
}

if (!onMessageCallbacks.contains(queue))
{
onMessageCallbacks[queue] = {callback};
if (onFail.has_value())
{
onFailCallbacks[queue] = {onFail.value()};
}
else
{
onFailCallbacks[queue] = {};
}

channel->consume(StringUtils::wstos(queue))
.onSuccess([queue]() {
Logger::i()->Log(LogLevel::Info, std::format(L"successfully subscribed to {}", queue));
})
.onReceived([this, queue](const AMQP::Message& message, uint64_t deliveryTag, bool redelivered) {
const auto callbacks = onMessageCallbacks.find(queue);
for (const auto& cb : callbacks->second)
{
if (cb(message))
{
channel->ack(deliveryTag);
return;
}
}
})
.onError([this, queue](const char* msg) {
Logger::i()->Log(LogLevel::Warn, std::format(L"connection terminated with {} - {}", queue, StringUtils::stows(std::string(msg))));
const auto callbacks = onFailCallbacks.find(queue);
for (const auto& cb : callbacks->second)
{
cb(msg);
}
});

return;
}

onMessageCallbacks[queue].emplace_back(callback);
if (onFail.has_value())
{
onFailCallbacks[queue].emplace_back(onFail.value());
}
if (!FLHookConfig::i()->messageQueue.enableQueues)
{
return;
}

if (!onMessageCallbacks.contains(queue))
{
onMessageCallbacks[queue] = { callback };
if (onFail.has_value())
{
onFailCallbacks[queue] = { onFail.value() };
}
else
{
onFailCallbacks[queue] = {};
}

channel->consume(StringUtils::wstos(queue))
.onSuccess([queue]() { Logger::i()->Log(LogLevel::Info, std::format(L"successfully subscribed to {}", queue)); })
.onReceived(
[this, queue](const AMQP::Message& message, uint64_t deliveryTag, bool redelivered)
{
const auto callbacks = onMessageCallbacks.find(queue);
for (const auto& cb : callbacks->second)
{
std::optional<nlohmann::json> responseBody;
if (cb(message, responseBody))
{
if (message.headers().contains("reply_to"))
{
channel->publish("", message.headers().get("reply_to"), responseBody.has_value() ? responseBody.value().dump() : "{}");
}

channel->ack(deliveryTag);
return;
}
}

channel->reject(deliveryTag);
})
.onError(
[this, queue](const char* msg)
{
Logger::i()->Log(LogLevel::Warn, std::format(L"connection terminated with {} - {}", queue, StringUtils::stows(std::string(msg))));
const auto callbacks = onFailCallbacks.find(queue);
for (const auto& cb : callbacks->second)
{
cb(msg);
}
});

return;
}

onMessageCallbacks[queue].emplace_back(callback);
if (onFail.has_value())
{
onFailCallbacks[queue].emplace_back(onFail.value());
}
}

void MessageHandler::DeclareQueue(const std::wstring& queue, const int flags) const
{
channel->declareQueue(StringUtils::wstos(queue), flags);
}
void MessageHandler::DeclareQueue(const std::wstring& queue, const int flags) const { channel->declareQueue(StringUtils::wstos(queue), flags); }

void MessageHandler::DeclareExchange(const std::wstring& exchange, const AMQP::ExchangeType type, const int flags) const
{
channel->declareExchange(StringUtils::wstos(exchange), type, flags);
channel->declareExchange(StringUtils::wstos(exchange), type, flags);
}

void MessageHandler::Publish(const std::wstring& jsonData, const std::wstring& exchange, const std::wstring& queue) const
{
channel->publish(StringUtils::wstos(exchange), StringUtils::wstos(queue), StringUtils::wstos(jsonData));
channel->publish(StringUtils::wstos(exchange), StringUtils::wstos(queue), StringUtils::wstos(jsonData));
}

void MessageHandler::RemoteProcedureCall(nlohmann::json call)
{

}
Loading

0 comments on commit 8da63ed

Please sign in to comment.