Skip to content

Commit

Permalink
Merge pull request #481 from mtconnect/350_xpath_for_mqtt_service_2
Browse files Browse the repository at this point in the history
350 xpath for mqtt service 2
  • Loading branch information
wsobel authored Jul 30, 2024
2 parents ef4c359 + f4f9869 commit 5d1510c
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 14 deletions.
4 changes: 2 additions & 2 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
# The version number.
set(AGENT_VERSION_MAJOR 2)
set(AGENT_VERSION_MINOR 3)
set(AGENT_VERSION_MINOR 4)
set(AGENT_VERSION_PATCH 0)
set(AGENT_VERSION_BUILD 16)
set(AGENT_VERSION_BUILD 1)
set(AGENT_VERSION_RC "")

# This minimum version is to support Visual Studio 2019 and C++ feature checking and FetchContent
Expand Down
46 changes: 39 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -589,6 +589,10 @@ Configuration Parameters
understand the internal workings of the agent.

*Default*: 1000

* `CreateUniqueIds`: Changes all the ids in each element to a UUID that will be unique across devices. This is used for merging devices from multiple sources.

*Default*: `false`

* `Devices` - The XML file to load that specifies the devices and is
supplied as the result of a probe request. If the key is not found
Expand All @@ -604,23 +608,35 @@ Configuration Parameters
* `JsonVersion` - JSON Printer format. Old format: 1, new format: 2

*Default*: 2

* `LogStreams` - Debugging flag to log the streamed data to a file. Logs to a file named: `Stream_` + timestamp + `.log` in the current working directory. This is only for the Rest Sink.

* `SchemaVersion` - Change the schema version to a different version number.

*Default*: 2.0
*Default*: `false`

* `MaxAssets` - The maximum number of assets the agent can hold in its buffer. The
number is the actual count, not an exponent.

*Default*: 1024

* `MaxCachedFileSize` - The maximum size of a raw file to cache in memory.

* `MonitorConfigFiles` - Monitor agent.cfg and Devices.xml files and restart agent if they change.
*Default*: 20 kb

* `MinCompressFileSize` - The file size where we begin compressing raw files sent to the client.

*Default*: false
*Default*: 100 kb

* `MinimumConfigReloadAge` - The minimum age of a config file before an agent reload is triggered (seconds).

*Default*: 15
*Default*: 15 seconds

* `MonitorConfigFiles` - Monitor agent.cfg and Devices.xml files and restart agent if they change.

*Default*: false

* `MonitorInterval` - The interval between checks if the agent.cfg or Device.xml files have changed.

*Default*: 10 seconds

* `Pretty` - Pretty print the output with indententation

Expand All @@ -630,6 +646,14 @@ Configuration Parameters
process id of the daemon. This is not supported in Windows.

*Default*: agent.pid

* `SchemaVersion` - Change the schema version to a different version number.

*Default*: 2.0

* `Sender` - The value for the sender header attribute.

*Default*: Local machine name

* `ServiceName` - Changes the service name when installing or removing
the service. This allows multiple agents to run as services on the same machine.
Expand All @@ -638,7 +662,11 @@ Configuration Parameters

* `SuppressIPAddress` - Suppress the Adapter IP Address and port when creating the Agent Device ids and names. This applies to all adapters.

*Default*: false
*Default*: `false`

* `VersionDeviceXml` - Create a new versioned file every time the Device.xml file changes from an external source.

*Default*: `false`

* `WorkerThreads` - The number of operating system threads dedicated to the Agent

Expand Down Expand Up @@ -867,6 +895,10 @@ Sinks {
* `MqttQOS`: - For the MQTT Sinks, sets the Quality of Service. Must be one of `at_least_once`, `at_most_once`, `exactly_once`.

*Default*: `at_least_once`

* `MqttXPath`: - The xpath filter to apply to all current and samples published to MQTT. If the XPath is invalid, it will fall back to publishing all data items.

*Default*: All data items

### Adapter Configuration Items ###

Expand Down
1 change: 1 addition & 0 deletions src/mtconnect/configuration/config_options.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ namespace mtconnect {
DECLARE_CONFIGURATION(MqttPassword);
DECLARE_CONFIGURATION(MqttMaxTopicDepth);
DECLARE_CONFIGURATION(MqttLastWillTopic);
DECLARE_CONFIGURATION(MqttXPath);
///@}

/// @name Adapter Configuration
Expand Down
3 changes: 2 additions & 1 deletion src/mtconnect/sink/mqtt_sink/mqtt2_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ namespace mtconnect {
{configuration::MqttUserName, string()},
{configuration::MqttPassword, string()},
{configuration::MqttPort, int()},
{configuration::MqttXPath, string()},
{configuration::MqttRetain, bool()},
{configuration::MqttQOS, string()},
{configuration::MqttHost, string()}});
Expand Down Expand Up @@ -229,7 +230,7 @@ namespace mtconnect {
auto seq = publishCurrent(boost::system::error_code {});
for (auto &dev : m_sinkContract->getDevices())
{
FilterSet filterSet = filterForDevice(dev);
FilterSet filterSet { filterForDevice(dev) };
auto sampler =
make_shared<AsyncSample>(m_strand, m_sinkContract->getCircularBuffer(),
std::move(filterSet), m_sampleInterval, 600s, m_client, dev);
Expand Down
35 changes: 31 additions & 4 deletions src/mtconnect/sink/mqtt_sink/mqtt2_service.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,11 +128,35 @@ namespace mtconnect {
auto pos = m_filters.emplace(*(device->getUuid()), FilterSet());
filter = pos.first;
auto &set = filter->second;
for (const auto &wdi : device->getDeviceDataItems())

auto xpath = GetOption<string>(m_options, configuration::MqttXPath);
if (xpath)
{
try
{
m_sinkContract->getDataItemsForPath(device, xpath, set, nullopt);
}
catch (exception &e)
{
LOG(warning) << "MqttService: Invalid xpath '" << *xpath <<
"', defaulting to all data items";
}

if (set.empty())
{
LOG(warning) << "MqttService: Invalid xpath '" << *xpath <<
"', defaulting to all data items";
}
}

if (set.empty())
{
const auto di = wdi.lock();
if (di)
set.insert(di->getId());
for (const auto &wdi : device->getDeviceDataItems())
{
const auto di = wdi.lock();
if (di)
set.insert(di->getId());
}
}
}
return filter->second;
Expand Down Expand Up @@ -208,6 +232,9 @@ namespace mtconnect {

bool m_retain {true};
MqttClient::QOS m_qos {MqttClient::QOS::at_least_once};

// For XPath

};
} // namespace mqtt_sink
} // namespace sink
Expand Down

0 comments on commit 5d1510c

Please sign in to comment.