Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

431 websockets subscriptions #451

Merged
merged 41 commits into from
May 3, 2024
Merged
Show file tree
Hide file tree
Changes from 35 commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
922e010
Started websocket support
wsobel Apr 2, 2024
27ec271
use new uuid to check for device with * uuid shdr command
wsobel Apr 6, 2024
b5ff775
Version 2.3.0.6
wsobel Apr 6, 2024
4dd0602
upgraded to boost 1.84
wsobel Apr 6, 2024
e6f9692
Merge pull request #437 from mtconnect/419_uuid_unavailable_when_changed
wsobel Apr 6, 2024
bca1706
checkpoint with initial parsing of json
wsobel Apr 8, 2024
e999962
Merge branch 'main-dev' into 431_websockets_subscriptions
wsobel Apr 9, 2024
7003e19
First steps to adding routing support to websocket request.
wsobel Apr 11, 2024
c85b3b3
Fixed port in url for MQTT service.
wsobel Apr 11, 2024
0dd7ed9
fixed crash when closing mqtt client. set close handler to nullptr if…
wsobel Apr 12, 2024
fb69dca
Merge pull request #439 from mtconnect/438_mqtt_ingress_issues_on_cur…
wsobel Apr 12, 2024
49fe506
Merge branch 'main' into 431_websockets_subscriptions
wsobel Apr 12, 2024
211f691
Changed from command to request
wsobel Apr 12, 2024
e6b4f15
working to the point of response
wsobel Apr 16, 2024
f7802f7
checkpoint with write response
wsobel Apr 16, 2024
51c57b9
Handled threading issues and send queue
wsobel Apr 19, 2024
558d625
fixed expiry timeout
wsobel Apr 19, 2024
70f1ec1
Formatted new code
wsobel Apr 19, 2024
8877c03
Working version with error handling
wsobel Apr 19, 2024
20ce83a
Added request id to the printers for XML
wsobel Apr 19, 2024
ac8dbeb
Added format to all the requests
wsobel Apr 19, 2024
64fa6a2
Fixed session leak with observer circular ref in handler.
wsobel Apr 24, 2024
3bd86f4
Added code to session impl as well.
wsobel Apr 24, 2024
6201b2c
Fixed test issues
wsobel Apr 24, 2024
bbad3dc
fixed bug in dispatch
wsobel Apr 24, 2024
a89c4c7
refactored mqtt sinks and tests. Added cancel to websockets
wsobel Apr 25, 2024
4e8f8fd
fixed cancel
wsobel Apr 25, 2024
350a031
refactored async observer and created abstract async response to canc…
wsobel Apr 25, 2024
2e324a0
Fixed race in change observer destructor.
wsobel Apr 25, 2024
8b24f2e
fixed requests in websockets
wsobel Apr 27, 2024
7d27744
Merge branch 'main-dev' into 431_websockets_subscriptions
wsobel Apr 29, 2024
1ee15c1
merged main-dev
wsobel Apr 29, 2024
a6f5f8f
Removed extra character
wsobel Apr 29, 2024
3a9d54a
Merge branch 'main-dev' into 431_websockets_subscriptions
wsobel Apr 29, 2024
389aa9f
Added initial websocket test
wsobel May 2, 2024
dfdf1d9
cast to parameter value
wsobel May 3, 2024
e80975b
windows port of websockets
wsobel May 3, 2024
eb350cb
removed named scope from websocket session
wsobel May 3, 2024
8c92d73
Added BOOST_USE_WINAPI_VERSION for boost 1.84
wsobel May 3, 2024
b23b28b
added boost windows api version for windows 1.84 porting
wsobel May 3, 2024
ca11039
removed legacy mqtt service
wsobel May 3, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@

MTConnect C++ Agent Version 2.3
MTConnect C++ Agent Version 2.5
--------
[![Build MTConnect C++ Agent](https://github.com/mtconnect/cppagent/actions/workflows/build.yml/badge.svg)](https://github.com/mtconnect/cppagent/actions/workflows/build.yml)

Expand All @@ -13,6 +13,10 @@ the devices and the location of the adapter.

Pre-built binary releases for Windows are available from [Releases](https://github.com/mtconnect/cppagent/releases) for those who do not want to build the agent themselves. For *NIX users, you will need libxml2, cppunit, and cmake as well as build essentials.

Version 2.5.0 Added validation of observations in the stream

Version 2.4.0 Added support for version 2.4

Version 2.3.0 Support for all Version 2.3 standard changes and JSON ingress to MQTT adapter.

Version 2.2.0 Support for all Version 2.2 standard changes and dynamic configuration from adapters. Upgrade to conan 2.
Expand Down Expand Up @@ -639,11 +643,17 @@ Configuration Parameters
* `SuppressIPAddress` - Suppress the Adapter IP Address and port when creating the Agent Device ids and names. This applies to all adapters.

*Default*: false

* `Validation` - Turns on validation of model components and observations

*Default*: false

* `WorkerThreads` - The number of operating system threads dedicated to the Agent

*Default*: 1



#### Adapter General Configuration

These can be overridden on a per-adapter basis
Expand Down
9 changes: 5 additions & 4 deletions agent_lib/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -246,13 +246,13 @@ set(AGENT_SOURCES

# src/sink/mqtt_sink HEADER_FILE_ONLY

"${SOURCE_DIR}/sink/mqtt_sink/mqtt_service.hpp"
"${SOURCE_DIR}/sink/mqtt_sink/mqtt2_service.hpp"
"${SOURCE_DIR}/sink/mqtt_sink/mqtt_legacy_service.hpp"
"${SOURCE_DIR}/sink/mqtt_sink/mqtt_service.hpp"

#src/sink/mqtt_sink SOURCE_FILES_ONLY

"${SOURCE_DIR}/sink/mqtt_sink/mqtt_service.cpp"
"${SOURCE_DIR}/sink/mqtt_sink/mqtt2_service.cpp"
"${SOURCE_DIR}/sink/mqtt_sink/mqtt_legacy_service.cpp"
"${SOURCE_DIR}/sink/mqtt_sink/mqtt_service.cpp"

# src/sink/rest_sink HEADER_FILE_ONLY

Expand All @@ -267,6 +267,7 @@ set(AGENT_SOURCES
"${SOURCE_DIR}/sink/rest_sink/session.hpp"
"${SOURCE_DIR}/sink/rest_sink/session_impl.hpp"
"${SOURCE_DIR}/sink/rest_sink/tls_dector.hpp"
"${SOURCE_DIR}/sink/rest_sink/websocket_session.hpp"

# src/sink/rest_sink SOURCE_FILES_ONLY

Expand Down
2 changes: 1 addition & 1 deletion conan/mqtt_cpp/conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ class MqttcppConan(ConanFile):
url = "https://github.com/redboltz/mqtt_cpp"
description = "MQTT client/server for C++14 based on Boost.Asio"
topics = ("mqtt")
requires = ["boost/1.82.0"]
requires = ["boost/1.84.0"]
no_copy_source = True
exports_sources = "include/*"

Expand Down
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ def build_requirements(self):
self.tool_requires_version("doxygen", [1, 9, 4])

def requirements(self):
self.requires("boost/1.82.0", headers=True, libs=True, transitive_headers=True, transitive_libs=True)
self.requires("boost/1.84.0", headers=True, libs=True, transitive_headers=True, transitive_libs=True)
self.requires("libxml2/2.10.3", headers=True, libs=True, visible=True, transitive_headers=True, transitive_libs=True)
self.requires("date/2.4.1", headers=True, libs=True, transitive_headers=True, transitive_libs=True)
self.requires("nlohmann_json/3.9.1", headers=True, libs=False, transitive_headers=True, transitive_libs=False)
Expand Down
2 changes: 1 addition & 1 deletion docker/ubuntu/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ RUN apt-get update \
rake \
ruby \
&& rm -rf /var/lib/apt/lists/* \
&& pip install conan -v 'conan==2.0.9'
&& pip install conan

# make an agent directory and cd into it
WORKDIR /root/agent
Expand Down
4 changes: 2 additions & 2 deletions src/mtconnect/agent.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -299,15 +299,15 @@ namespace mtconnect {
{
if (item.expired())
continue;

auto di = item.lock();
if (di->hasInitialValue())
{
m_loopback->receive(di, *di->getInitialValue());
}
}
}

std::lock_guard<buffer::CircularBuffer> lock(m_circularBuffer);
if (m_circularBuffer.addToBuffer(observation) != 0)
{
Expand Down
4 changes: 2 additions & 2 deletions src/mtconnect/configuration/agent_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
#include "mtconnect/configuration/config_options.hpp"
#include "mtconnect/device_model/device.hpp"
#include "mtconnect/printer/xml_printer.hpp"
#include "mtconnect/sink/mqtt_sink/mqtt2_service.hpp"
#include "mtconnect/sink/mqtt_sink/mqtt_legacy_service.hpp"
#include "mtconnect/sink/mqtt_sink/mqtt_service.hpp"
#include "mtconnect/sink/rest_sink/rest_service.hpp"
#include "mtconnect/source/adapter/agent_adapter/agent_adapter.hpp"
Expand Down Expand Up @@ -112,8 +112,8 @@ namespace mtconnect::configuration {

bool success = false;

sink::mqtt_sink::MqttLegacyService::registerFactory(m_sinkFactory);
sink::mqtt_sink::MqttService::registerFactory(m_sinkFactory);
sink::mqtt_sink::Mqtt2Service::registerFactory(m_sinkFactory);
sink::rest_sink::RestService::registerFactory(m_sinkFactory);
adapter::shdr::ShdrAdapter::registerFactory(m_sourceFactory);
adapter::mqtt_adapter::MqttAdapter::registerFactory(m_sourceFactory);
Expand Down
2 changes: 1 addition & 1 deletion src/mtconnect/device_model/data_item/data_item.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ namespace mtconnect {
}
}
}

if (const auto &init = maybeGet<string>("InitialValue"); init)
{
m_initialValue = *init;
Expand Down
4 changes: 2 additions & 2 deletions src/mtconnect/device_model/data_item/data_item.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,11 +139,11 @@ namespace mtconnect {
/// @brief get the topic name leaf node for this data item
/// @return the topic name
const auto &getTopicName() const { return m_topicName; }

/// @brief get the initial value if one is set
/// @return optional initial value
const auto &getInitialValue() const { return m_initialValue; }

Category getCategory() const { return m_category; }
Representation getRepresentation() const { return m_representation; }
SpecialClass getSpecialClass() const { return m_specialClass; }
Expand Down
7 changes: 3 additions & 4 deletions src/mtconnect/mqtt/mqtt_client_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,6 @@ namespace mtconnect {
m_connected = false;
if (m_handler && m_handler->m_disconnected)
m_handler->m_disconnected(shared_from_this());
m_handler->m_disconnected(shared_from_this());
if (m_running)
{
reconnect();
Expand Down Expand Up @@ -419,7 +418,7 @@ namespace mtconnect {
{
return static_pointer_cast<MqttTcpClient>(shared_from_this());
}

/// @brief Get the Mqtt TCP Client
/// @return pointer to the Mqtt TCP Client
auto &getClient()
Expand Down Expand Up @@ -501,7 +500,7 @@ namespace mtconnect {
{
return static_pointer_cast<MqttTlsWSClient>(shared_from_this());
}

/// @brief Get the Mqtt TLS WebSocket Client
/// @return pointer to the Mqtt TLS WebSocket Client
auto &getClient()
Expand Down Expand Up @@ -540,7 +539,7 @@ namespace mtconnect {
{
return static_pointer_cast<MqttWSClient>(shared_from_this());
}

/// @brief Get the Mqtt TLS WebSocket Client
/// @return pointer to the Mqtt TLS WebSocket Client
auto &getClient()
Expand Down
20 changes: 18 additions & 2 deletions src/mtconnect/observation/change_observer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,25 @@ using namespace std;
namespace mtconnect::observation {
ChangeObserver::~ChangeObserver()
{
std::lock_guard<std::recursive_mutex> scopedLock(m_mutex);
clear();
}

void ChangeObserver::clear()
{
std::unique_lock<std::recursive_mutex> lock(m_mutex);
m_timer.cancel();
m_handler.clear();
for (const auto signaler : m_signalers)
signaler->removeObserver(this);
m_signalers.clear();
}

void ChangeObserver::addSignaler(ChangeSignaler *sig) { m_signalers.emplace_back(sig); }

bool ChangeObserver::removeSignaler(ChangeSignaler *sig)
{
std::lock_guard<std::recursive_mutex> scopedLock(m_mutex);
auto newEndPos = std::remove(m_signalers.begin(), m_signalers.end(), sig);
if (newEndPos == m_signalers.end())
return false;
Expand All @@ -47,7 +58,8 @@ namespace mtconnect::observation {

void ChangeObserver::handler(boost::system::error_code ec)
{
boost::asio::post(m_strand, boost::bind(m_handler, ec));
if (m_handler)
boost::asio::post(m_strand, boost::bind(m_handler, ec));
}

// Signaler Management
Expand Down Expand Up @@ -99,7 +111,7 @@ namespace mtconnect::observation {
buffer::CircularBuffer &buffer, FilterSet &&filter,
std::chrono::milliseconds interval,
std::chrono::milliseconds heartbeat)
: m_interval(interval),
: AsyncResponse(interval),
m_heartbeat(heartbeat),
m_last(std::chrono::system_clock::now()),
m_filter(std::move(filter)),
Expand Down Expand Up @@ -145,9 +157,12 @@ namespace mtconnect::observation {

void AsyncObserver::handlerCompleted()
{
NAMED_SCOPE("AsyncObserver::handlerCompleted");

m_last = std::chrono::system_clock::now();
if (m_endOfBuffer)
{
LOG(trace) << "End of buffer";
using std::placeholders::_1;
m_observer.waitForSignal(m_heartbeat);
}
Expand All @@ -159,6 +174,7 @@ namespace mtconnect::observation {

void AsyncObserver::handleSignal(boost::system::error_code ec)
{
NAMED_SCOPE("AsyncObserver::handleSignal");
using namespace buffer;

using std::placeholders::_1;
Expand Down
45 changes: 37 additions & 8 deletions src/mtconnect/observation/change_observer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,9 @@ namespace mtconnect::observation {
auto try_lock() { return m_mutex.try_lock(); }
///@}

/// @brief clear the observer information.
void clear();

private:
boost::asio::io_context::strand &m_strand;
mutable std::recursive_mutex m_mutex;
Expand Down Expand Up @@ -185,6 +188,32 @@ namespace mtconnect::observation {
std::list<ChangeObserver *> m_observers;
};

/// @brief Abstract class for things asynchronouos timers
class AGENT_LIB_API AsyncResponse : public std::enable_shared_from_this<AsyncResponse>
{
public:
AsyncResponse(std::chrono::milliseconds interval) : m_interval(interval) {}

virtual bool cancel() = 0;

/// @brief method to determine if the sink is running
virtual bool isRunning() = 0;

/// @brief get the request id for webservices
const auto &getRequestId() const { return m_requestId; }

/// @brief sets the optonal request id for webservices.
void setRequestId(const std::optional<std::string> &id) { m_requestId = id; }

/// @brief Get the interval
const auto &getInterval() const { return m_interval; }

protected:
std::chrono::milliseconds m_interval {
0}; //! the minimum amout of time to wait before calling the handler
std::optional<std::string> m_requestId; //! request id
};

/// @brief Asyncronous change context for waiting for changes
///
/// This class must be subclassed and provide a fail and isRunning method.
Expand All @@ -196,7 +225,7 @@ namespace mtconnect::observation {
///
/// The handler and sequence numbers are handled inside the circular buffer lock to prevent race
/// conditions with incoming data.
class AGENT_LIB_API AsyncObserver : public std::enable_shared_from_this<AsyncObserver>
class AGENT_LIB_API AsyncObserver : public AsyncResponse
{
public:
/// @Brief callback when observations are ready
Expand All @@ -217,7 +246,7 @@ namespace mtconnect::observation {
virtual ~AsyncObserver() = default;

/// @brief Get a shared pointed
auto getptr() const { return const_cast<AsyncObserver *>(this)->shared_from_this(); }
auto getptr() { return std::dynamic_pointer_cast<AsyncObserver>(shared_from_this()); }

/// @brief sets up the `ChangeObserver` using the filter and initializes the references to the
/// buffer
Expand All @@ -235,8 +264,12 @@ namespace mtconnect::observation {
/// @brief abstract call to failure handler
virtual void fail(boost::beast::http::status status, const std::string &message) = 0;

/// @brief method to determine if the sink is running
virtual bool isRunning() = 0;
/// @brief Stop all timers and release resources.
bool cancel() override
{
m_observer.clear();
return true;
}

/// @brief handler callback when an action needs to be taken
///
Expand All @@ -250,9 +283,7 @@ namespace mtconnect::observation {
auto getSequence() const { return m_sequence; }
auto isEndOfBuffer() const { return m_endOfBuffer; }
const auto &getFilter() const { return m_filter; }

///@}
///

mutable bool m_endOfBuffer {false}; //! Public indicator that we are at the end of the buffer

Expand All @@ -266,8 +297,6 @@ namespace mtconnect::observation {

protected:
SequenceNumber_t m_sequence {0}; //! the current sequence number
std::chrono::milliseconds m_interval {
0}; //! the minimum amout of time to wait before calling the handler
std::chrono::milliseconds m_heartbeat {
0}; //! the maximum amount of time to wait before sending a heartbeat
std::chrono::system_clock::time_point m_last; //! the last time the handler completed
Expand Down
Loading
Loading