Skip to content

Commit

Permalink
Merge pull request #451 from mtconnect/431_websockets_subscriptions
Browse files Browse the repository at this point in the history
431 websockets subscriptions
  • Loading branch information
wsobel authored May 3, 2024
2 parents 45e37cb + ca11039 commit eb94b8f
Show file tree
Hide file tree
Showing 42 changed files with 1,913 additions and 1,697 deletions.
12 changes: 11 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@

MTConnect C++ Agent Version 2.3
MTConnect C++ Agent Version 2.5
--------
[![Build MTConnect C++ Agent](https://github.com/mtconnect/cppagent/actions/workflows/build.yml/badge.svg)](https://github.com/mtconnect/cppagent/actions/workflows/build.yml)

Expand All @@ -13,6 +13,10 @@ the devices and the location of the adapter.

Pre-built binary releases for Windows are available from [Releases](https://github.com/mtconnect/cppagent/releases) for those who do not want to build the agent themselves. For *NIX users, you will need libxml2, cppunit, and cmake as well as build essentials.

Version 2.5.0 Added validation of observations in the stream

Version 2.4.0 Added support for version 2.4

Version 2.3.0 Support for all Version 2.3 standard changes and JSON ingress to MQTT adapter.

Version 2.2.0 Support for all Version 2.2 standard changes and dynamic configuration from adapters. Upgrade to conan 2.
Expand Down Expand Up @@ -639,11 +643,17 @@ Configuration Parameters
* `SuppressIPAddress` - Suppress the Adapter IP Address and port when creating the Agent Device ids and names. This applies to all adapters.

*Default*: false

* `Validation` - Turns on validation of model components and observations

*Default*: false

* `WorkerThreads` - The number of operating system threads dedicated to the Agent

*Default*: 1



#### Adapter General Configuration

These can be overridden on a per-adapter basis
Expand Down
8 changes: 3 additions & 5 deletions agent_lib/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -246,13 +246,11 @@ set(AGENT_SOURCES

# src/sink/mqtt_sink HEADER_FILE_ONLY

"${SOURCE_DIR}/sink/mqtt_sink/mqtt_service.hpp"
"${SOURCE_DIR}/sink/mqtt_sink/mqtt2_service.hpp"
"${SOURCE_DIR}/sink/mqtt_sink/mqtt_service.hpp"

#src/sink/mqtt_sink SOURCE_FILES_ONLY

"${SOURCE_DIR}/sink/mqtt_sink/mqtt_service.cpp"
"${SOURCE_DIR}/sink/mqtt_sink/mqtt2_service.cpp"
"${SOURCE_DIR}/sink/mqtt_sink/mqtt_service.cpp"

# src/sink/rest_sink HEADER_FILE_ONLY

Expand All @@ -267,6 +265,7 @@ set(AGENT_SOURCES
"${SOURCE_DIR}/sink/rest_sink/session.hpp"
"${SOURCE_DIR}/sink/rest_sink/session_impl.hpp"
"${SOURCE_DIR}/sink/rest_sink/tls_dector.hpp"
"${SOURCE_DIR}/sink/rest_sink/websocket_session.hpp"

# src/sink/rest_sink SOURCE_FILES_ONLY

Expand Down Expand Up @@ -335,7 +334,6 @@ if(MSVC)
# The modules including Beast required the /bigobj option in Windows
set_property(SOURCE
"${SOURCE_DIR}/sink/mqtt_sink/mqtt_service.cpp"
"${SOURCE_DIR}/sink/mqtt_sink/mqtt2_service.cpp"
"${SOURCE_DIR}/sink/rest_sink/session_impl.cpp"
"${SOURCE_DIR}/source/adapter/mqtt/mqtt_adapter.cpp"
"${SOURCE_DIR}/source/adapter/agent_adapter/agent_adapter.cpp"
Expand Down
2 changes: 1 addition & 1 deletion conan/mqtt_cpp/conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ class MqttcppConan(ConanFile):
url = "https://github.com/redboltz/mqtt_cpp"
description = "MQTT client/server for C++14 based on Boost.Asio"
topics = ("mqtt")
requires = ["boost/1.82.0"]
requires = ["boost/1.84.0"]
no_copy_source = True
exports_sources = "include/*"

Expand Down
2 changes: 2 additions & 0 deletions conan/profiles/vs32
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,5 @@ compiler.runtime=static
compiler.runtime_type=Release
build_type=Release

[options]
winver=0x0600
3 changes: 3 additions & 0 deletions conan/profiles/vs32debug
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,6 @@ arch=x86
compiler.runtime=static
compiler.runtime_type=Debug
build_type=Debug

[options]
winver=0x0600
1 change: 1 addition & 0 deletions conan/profiles/vs32shared
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ build_type=Release

[options]
shared=True
winver=0x0600
8 changes: 6 additions & 2 deletions conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class MTConnectAgentConan(ConanFile):
"with_ruby": True,
"development": False,
"shared": False,
"winver": "0x600",
"winver": "0x0602",
"with_docs": False,
"cpack": False,
"agent_prefix": None,
Expand Down Expand Up @@ -118,7 +118,7 @@ def build_requirements(self):
self.tool_requires_version("doxygen", [1, 9, 4])

def requirements(self):
self.requires("boost/1.82.0", headers=True, libs=True, transitive_headers=True, transitive_libs=True)
self.requires("boost/1.84.0", headers=True, libs=True, transitive_headers=True, transitive_libs=True)
self.requires("libxml2/2.10.3", headers=True, libs=True, visible=True, transitive_headers=True, transitive_libs=True)
self.requires("date/2.4.1", headers=True, libs=True, transitive_headers=True, transitive_libs=True)
self.requires("nlohmann_json/3.9.1", headers=True, libs=False, transitive_headers=True, transitive_libs=False)
Expand All @@ -139,6 +139,9 @@ def configure(self):
if self.options.shared:
self.options["boost/*"].shared = True
self.package_type = "shared-library"

if is_msvc(self):
self.options["boost/*"].extra_b2_flags = ("define=BOOST_USE_WINAPI_VERSION=" + str(self.options.winver))

# Make sure shared builds use shared boost
if is_msvc(self) and self.options.shared:
Expand Down Expand Up @@ -227,6 +230,7 @@ def package_info(self):
winver=str(self.options.winver)
self.cpp_info.defines.append("WINVER=" + winver)
self.cpp_info.defines.append("_WIN32_WINNT=" + winver)
self.cpp_info.defines.append("BOOST_USE_WINAPI_VERSION=" + winver)

def package(self):
cmake = CMake(self)
Expand Down
2 changes: 1 addition & 1 deletion docker/ubuntu/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ RUN apt-get update \
rake \
ruby \
&& rm -rf /var/lib/apt/lists/* \
&& pip install conan -v 'conan==2.0.9'
&& pip install conan

# make an agent directory and cd into it
WORKDIR /root/agent
Expand Down
4 changes: 2 additions & 2 deletions src/mtconnect/agent.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -299,15 +299,15 @@ namespace mtconnect {
{
if (item.expired())
continue;

auto di = item.lock();
if (di->hasInitialValue())
{
m_loopback->receive(di, *di->getInitialValue());
}
}
}

std::lock_guard<buffer::CircularBuffer> lock(m_circularBuffer);
if (m_circularBuffer.addToBuffer(observation) != 0)
{
Expand Down
2 changes: 0 additions & 2 deletions src/mtconnect/configuration/agent_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@
#include "mtconnect/configuration/config_options.hpp"
#include "mtconnect/device_model/device.hpp"
#include "mtconnect/printer/xml_printer.hpp"
#include "mtconnect/sink/mqtt_sink/mqtt2_service.hpp"
#include "mtconnect/sink/mqtt_sink/mqtt_service.hpp"
#include "mtconnect/sink/rest_sink/rest_service.hpp"
#include "mtconnect/source/adapter/agent_adapter/agent_adapter.hpp"
Expand Down Expand Up @@ -113,7 +112,6 @@ namespace mtconnect::configuration {
bool success = false;

sink::mqtt_sink::MqttService::registerFactory(m_sinkFactory);
sink::mqtt_sink::Mqtt2Service::registerFactory(m_sinkFactory);
sink::rest_sink::RestService::registerFactory(m_sinkFactory);
adapter::shdr::ShdrAdapter::registerFactory(m_sourceFactory);
adapter::mqtt_adapter::MqttAdapter::registerFactory(m_sourceFactory);
Expand Down
2 changes: 1 addition & 1 deletion src/mtconnect/device_model/data_item/data_item.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ namespace mtconnect {
}
}
}

if (const auto &init = maybeGet<string>("InitialValue"); init)
{
m_initialValue = *init;
Expand Down
4 changes: 2 additions & 2 deletions src/mtconnect/device_model/data_item/data_item.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,11 +139,11 @@ namespace mtconnect {
/// @brief get the topic name leaf node for this data item
/// @return the topic name
const auto &getTopicName() const { return m_topicName; }

/// @brief get the initial value if one is set
/// @return optional initial value
const auto &getInitialValue() const { return m_initialValue; }

Category getCategory() const { return m_category; }
Representation getRepresentation() const { return m_representation; }
SpecialClass getSpecialClass() const { return m_specialClass; }
Expand Down
7 changes: 3 additions & 4 deletions src/mtconnect/mqtt/mqtt_client_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,6 @@ namespace mtconnect {
m_connected = false;
if (m_handler && m_handler->m_disconnected)
m_handler->m_disconnected(shared_from_this());
m_handler->m_disconnected(shared_from_this());
if (m_running)
{
reconnect();
Expand Down Expand Up @@ -419,7 +418,7 @@ namespace mtconnect {
{
return static_pointer_cast<MqttTcpClient>(shared_from_this());
}

/// @brief Get the Mqtt TCP Client
/// @return pointer to the Mqtt TCP Client
auto &getClient()
Expand Down Expand Up @@ -501,7 +500,7 @@ namespace mtconnect {
{
return static_pointer_cast<MqttTlsWSClient>(shared_from_this());
}

/// @brief Get the Mqtt TLS WebSocket Client
/// @return pointer to the Mqtt TLS WebSocket Client
auto &getClient()
Expand Down Expand Up @@ -540,7 +539,7 @@ namespace mtconnect {
{
return static_pointer_cast<MqttWSClient>(shared_from_this());
}

/// @brief Get the Mqtt TLS WebSocket Client
/// @return pointer to the Mqtt TLS WebSocket Client
auto &getClient()
Expand Down
20 changes: 18 additions & 2 deletions src/mtconnect/observation/change_observer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,25 @@ using namespace std;
namespace mtconnect::observation {
ChangeObserver::~ChangeObserver()
{
std::lock_guard<std::recursive_mutex> scopedLock(m_mutex);
clear();
}

void ChangeObserver::clear()
{
std::unique_lock<std::recursive_mutex> lock(m_mutex);
m_timer.cancel();
m_handler.clear();
for (const auto signaler : m_signalers)
signaler->removeObserver(this);
m_signalers.clear();
}

void ChangeObserver::addSignaler(ChangeSignaler *sig) { m_signalers.emplace_back(sig); }

bool ChangeObserver::removeSignaler(ChangeSignaler *sig)
{
std::lock_guard<std::recursive_mutex> scopedLock(m_mutex);
auto newEndPos = std::remove(m_signalers.begin(), m_signalers.end(), sig);
if (newEndPos == m_signalers.end())
return false;
Expand All @@ -47,7 +58,8 @@ namespace mtconnect::observation {

void ChangeObserver::handler(boost::system::error_code ec)
{
boost::asio::post(m_strand, boost::bind(m_handler, ec));
if (m_handler)
boost::asio::post(m_strand, boost::bind(m_handler, ec));
}

// Signaler Management
Expand Down Expand Up @@ -99,7 +111,7 @@ namespace mtconnect::observation {
buffer::CircularBuffer &buffer, FilterSet &&filter,
std::chrono::milliseconds interval,
std::chrono::milliseconds heartbeat)
: m_interval(interval),
: AsyncResponse(interval),
m_heartbeat(heartbeat),
m_last(std::chrono::system_clock::now()),
m_filter(std::move(filter)),
Expand Down Expand Up @@ -145,9 +157,12 @@ namespace mtconnect::observation {

void AsyncObserver::handlerCompleted()
{
NAMED_SCOPE("AsyncObserver::handlerCompleted");

m_last = std::chrono::system_clock::now();
if (m_endOfBuffer)
{
LOG(trace) << "End of buffer";
using std::placeholders::_1;
m_observer.waitForSignal(m_heartbeat);
}
Expand All @@ -159,6 +174,7 @@ namespace mtconnect::observation {

void AsyncObserver::handleSignal(boost::system::error_code ec)
{
NAMED_SCOPE("AsyncObserver::handleSignal");
using namespace buffer;

using std::placeholders::_1;
Expand Down
45 changes: 37 additions & 8 deletions src/mtconnect/observation/change_observer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,9 @@ namespace mtconnect::observation {
auto try_lock() { return m_mutex.try_lock(); }
///@}

/// @brief clear the observer information.
void clear();

private:
boost::asio::io_context::strand &m_strand;
mutable std::recursive_mutex m_mutex;
Expand Down Expand Up @@ -185,6 +188,32 @@ namespace mtconnect::observation {
std::list<ChangeObserver *> m_observers;
};

/// @brief Abstract class for things asynchronouos timers
class AGENT_LIB_API AsyncResponse : public std::enable_shared_from_this<AsyncResponse>
{
public:
AsyncResponse(std::chrono::milliseconds interval) : m_interval(interval) {}

virtual bool cancel() = 0;

/// @brief method to determine if the sink is running
virtual bool isRunning() = 0;

/// @brief get the request id for webservices
const auto &getRequestId() const { return m_requestId; }

/// @brief sets the optonal request id for webservices.
void setRequestId(const std::optional<std::string> &id) { m_requestId = id; }

/// @brief Get the interval
const auto &getInterval() const { return m_interval; }

protected:
std::chrono::milliseconds m_interval {
0}; //! the minimum amout of time to wait before calling the handler
std::optional<std::string> m_requestId; //! request id
};

/// @brief Asyncronous change context for waiting for changes
///
/// This class must be subclassed and provide a fail and isRunning method.
Expand All @@ -196,7 +225,7 @@ namespace mtconnect::observation {
///
/// The handler and sequence numbers are handled inside the circular buffer lock to prevent race
/// conditions with incoming data.
class AGENT_LIB_API AsyncObserver : public std::enable_shared_from_this<AsyncObserver>
class AGENT_LIB_API AsyncObserver : public AsyncResponse
{
public:
/// @Brief callback when observations are ready
Expand All @@ -217,7 +246,7 @@ namespace mtconnect::observation {
virtual ~AsyncObserver() = default;

/// @brief Get a shared pointed
auto getptr() const { return const_cast<AsyncObserver *>(this)->shared_from_this(); }
auto getptr() { return std::dynamic_pointer_cast<AsyncObserver>(shared_from_this()); }

/// @brief sets up the `ChangeObserver` using the filter and initializes the references to the
/// buffer
Expand All @@ -235,8 +264,12 @@ namespace mtconnect::observation {
/// @brief abstract call to failure handler
virtual void fail(boost::beast::http::status status, const std::string &message) = 0;

/// @brief method to determine if the sink is running
virtual bool isRunning() = 0;
/// @brief Stop all timers and release resources.
bool cancel() override
{
m_observer.clear();
return true;
}

/// @brief handler callback when an action needs to be taken
///
Expand All @@ -250,9 +283,7 @@ namespace mtconnect::observation {
auto getSequence() const { return m_sequence; }
auto isEndOfBuffer() const { return m_endOfBuffer; }
const auto &getFilter() const { return m_filter; }

///@}
///

mutable bool m_endOfBuffer {false}; //! Public indicator that we are at the end of the buffer

Expand All @@ -266,8 +297,6 @@ namespace mtconnect::observation {

protected:
SequenceNumber_t m_sequence {0}; //! the current sequence number
std::chrono::milliseconds m_interval {
0}; //! the minimum amout of time to wait before calling the handler
std::chrono::milliseconds m_heartbeat {
0}; //! the maximum amount of time to wait before sending a heartbeat
std::chrono::system_clock::time_point m_last; //! the last time the handler completed
Expand Down
Loading

0 comments on commit eb94b8f

Please sign in to comment.