Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Working subscription #240

Merged
merged 10 commits into from
Nov 15, 2024
2 changes: 1 addition & 1 deletion cmake/DependenciesSHAs.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@ set(GIT_SHA_OPENCMW_CPP bb8996babab2000a4ae3612ea146a551a96e59c4 CACHE STRING ""

set(GIT_SHA_UT v2.0.1 CACHE STRING "" FORCE) # latest version as of 2023-12-19

set(GIT_SHA_GR_DIGITIZERS b5df25c9552b473b32c47b80cca3b6379b79a6da CACHE STRING "" FORCE) # dev-prototype as of 2024-10-18
set(GIT_SHA_GR_DIGITIZERS 749394c12285887eb8840ac4cd0c46f1d21b46b4 CACHE STRING "" FORCE) # dev-prototype as of 2024-11-06
1 change: 1 addition & 0 deletions src/acquisition/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
add_library(od_acquisition INTERFACE)

target_link_libraries(od_acquisition INTERFACE gnuradio-core)
target_include_directories(od_acquisition INTERFACE $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}>
$<INSTALL_INTERFACE:include/>)

Expand Down
4 changes: 4 additions & 0 deletions src/acquisition/daq_api.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <string>
#include <vector>

#include <MIME.hpp>
#include <MultiArray.hpp>
#include <opencmw.hpp>

Expand Down Expand Up @@ -155,6 +156,9 @@ struct Acquisition {
Annotated<float, opencmw::NoUnit, "minimum expected value for channel/signal"> channelRangeMin;
Annotated<float, opencmw::NoUnit, "maximum expected value for channel/signal"> channelRangeMax;
Annotated<float, opencmw::NoUnit, "temperature of the measurement device"> temperature;
Annotated<std::vector<int64_t>, opencmw::NoUnit, "indices of trigger tags"> triggerIndices;
Annotated<std::vector<std::string>, opencmw::NoUnit, "event names of trigger tags"> triggerEventNames;
Annotated<std::vector<int64_t>, si::time<nanosecond>, "timestamps of trigger tags"> triggerTimestamps;
wirew0rm marked this conversation as resolved.
Show resolved Hide resolved
};

/**
Expand Down
4 changes: 1 addition & 3 deletions src/service/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -126,10 +126,8 @@ target_link_libraries(
services
client
project_options
fair-picoscope
assets::rest
gr-testing
gr-basic
digitizer_settings)
if(NOT EMSCRIPTEN)
target_link_libraries(opendigitizer PRIVATE fair-picoscope)
endif()
9 changes: 9 additions & 0 deletions src/service/gnuradio/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,12 @@ target_link_libraries(
project_options)

add_subdirectory(test)

add_executable(cli-signal-subscribe cli-signal-subscribe.cpp)
target_link_libraries(
cli-signal-subscribe
PRIVATE core
client
od_acquisition
fmt::fmt
project_options)
24 changes: 22 additions & 2 deletions src/service/gnuradio/GnuRadioAcquisitionWorker.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -477,7 +477,8 @@ class GnuRadioAcquisitionWorker : public Worker<serviceName, TimeDomainContext,
}
Acquisition reply;

auto processData = [&reply, signalName, &pollerEntry](std::span<const double> data, std::span<const gr::Tag> tags) {
auto key = pollerIt->first;
auto processData = [&reply, signalName, &pollerEntry, &key](std::span<const double> data, std::span<const gr::Tag> tags) {
pollerEntry.populateFromTags(tags);
reply.acqTriggerName = "STREAMING";
reply.channelName = pollerEntry.signal_name.value_or(std::string(signalName));
Expand All @@ -495,6 +496,26 @@ class GnuRadioAcquisitionWorker : public Worker<serviceName, TimeDomainContext,
std::ranges::copy(data, reply.channelValue.begin());
std::fill(reply.channelError.begin(), reply.channelError.end(), 0.f); // TODO
std::fill(reply.channelTimeBase.begin(), reply.channelTimeBase.end(), 0); // TODO
// preallocate trigger vectors to number of tags
reply.triggerIndices.reserve(tags.size());
reply.triggerEventNames.reserve(tags.size());
reply.triggerTimestamps.reserve(tags.size());
for (auto& [idx, tagMap] : tags) {
if (tagMap.contains(gr::tag::TRIGGER_NAME) && tagMap.contains(gr::tag::TRIGGER_TIME)) {
if (std::get<std::string>(tagMap.at(gr::tag::TRIGGER_NAME)) == "systemtime") {
reply.acqLocalTimeStamp = std::get<uint64_t>(tagMap.at(gr::tag::TRIGGER_TIME));
auto dataTimestamp = std::chrono::nanoseconds(std::get<uint64_t>(tagMap.at(gr::tag::TRIGGER_TIME)));
const auto now = std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::high_resolution_clock::now().time_since_epoch());
auto latency = (dataTimestamp.count() == 0) ? 0ns : now - dataTimestamp;
}
reply.triggerIndices.push_back(idx);
reply.triggerEventNames.push_back(std::get<std::string>(tagMap.at(gr::tag::TRIGGER_NAME)));
reply.triggerTimestamps.push_back(static_cast<int64_t>(std::get<uint64_t>(tagMap.at(gr::tag::TRIGGER_TIME))));
}
}
reply.triggerIndices.shrink_to_fit();
reply.triggerEventNames.shrink_to_fit();
reply.triggerTimestamps.shrink_to_fit();
};
pollerEntry.in_use = true;

Expand All @@ -509,7 +530,6 @@ class GnuRadioAcquisitionWorker : public Worker<serviceName, TimeDomainContext,
const auto key = PollerKey{.mode = mode, .signal_name = std::string(signalName), .pre_samples = static_cast<std::size_t>(context.preSamples), .post_samples = static_cast<std::size_t>(context.postSamples), .maximum_window_size = static_cast<std::size_t>(context.maximumWindowSize), .snapshot_delay = std::chrono::nanoseconds(context.snapshotDelay)};
auto pollerIt = pollers.find(key);
if (pollerIt == pollers.end()) {
fmt::println("Register {}, '{}'", magic_enum::enum_name(mode), context.triggerNameFilter);
const auto query = basic::DataSinkQuery::signalName(signalName);
// TODO for triggered/multiplexed subscriptions that only differ in preSamples/postSamples/maximumWindowSize, we could use a single poller for the encompassing range
// and send snippets from their datasets to the individual subscribers
Expand Down
88 changes: 88 additions & 0 deletions src/service/gnuradio/cli-signal-subscribe.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
#include <daq_api.hpp>
#include <fmt/core.h>

#include <Client.hpp>
#include <IoSerialiserYaS.hpp>
#include <MdpMessage.hpp>
#include <RestClient.hpp>
#include <opencmw.hpp>
#include <type_traits>

/***
* A simple test program which allows to subscribe to a specific acquisition property and displays the range and sample count and rate of the received
* data acquisition Objects.
*
* Example use:
* ``` bash
* $ ./cli-signal-subscribe mds://localhost:12345/GnuRadio/Acquisition?channelNameFilter=test
* Subscribing to mds://localhost:12345/GnuRadio/Acquisition?channelNameFilter=test
* t = 26ms: Update received: 1, samples: 640, min-max: -0.0027466659-0.0025940733, total_samples: 640, avg_sampling_rate: 24615.384615384617
* t = 76ms: Update received: 2, samples: 640, min-max: -0.0027466659-0.0025940733, total_samples: 1280, avg_sampling_rate: 16842.105263157893
* [...]
* $ ./cli-signal-subscribe https://localhost:8080/GnuRadio/Acquisition?channelNameFilter=test&LongPollingIdx=Next # TODO: fix http subscription to not need long polling index
* Subscribing to mds://localhost:12345/GnuRadio/Acquisition?channelNameFilter=test
* t = 26ms: Update received: 1, samples: 640, min-max: -0.0027466659-0.0025940733, total_samples: 640, avg_sampling_rate: 24615.384615384617
* t = 76ms: Update received: 2, samples: 640, min-max: -0.0027466659-0.0025940733, total_samples: 1280, avg_sampling_rate: 16842.105263157893
* [...]
* ```
*/
int main(int argc, char** argv) {
using opencmw::URI;
using namespace opendigitizer::acq;
using namespace std::chrono_literals;

opencmw::client::RestClient::CHECK_CERTIFICATES = false; // allow subscribing to local services with self-signed certificates

if (argc <= 1) {
fmt::print("Please provide subscription URL to AcquisitionWorker.\n");
return 1;
}

const opencmw::zmq::Context zctx{};
std::vector<std::unique_ptr<opencmw::client::ClientBase>> clients;
clients.emplace_back(std::make_unique<opencmw::client::MDClientCtx>(zctx, 20ms, ""));
clients.emplace_back(std::make_unique<opencmw::client::RestClient>(opencmw::client::DefaultContentTypeHeader(opencmw::MIME::BINARY), opencmw::client::MaxIoThreads(5)));
opencmw::client::ClientContext client{std::move(clients)};

std::size_t samples_received = 0UZ;
std::size_t update_count = 0UZ;
const auto start = std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::high_resolution_clock::now().time_since_epoch());

fmt::print("Subscribing to {}\n", argv[1]);

client.subscribe(opencmw::URI<opencmw::STRICT>(argv[1]), [&samples_received, &update_count, &start](const opencmw::mdp::Message& msg) {
if (!msg.error.empty() || msg.data.empty()) {
fmt::print("received error or data is empty, error msg: {}\n", msg.error);
return;
}
const auto now = std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::high_resolution_clock::now().time_since_epoch());
auto uptime = std::chrono::duration_cast<std::chrono::milliseconds>(now - start);
opendigitizer::acq::Acquisition acq{};
auto buf = msg.data;
try {
opencmw::deserialise<opencmw::YaS, opencmw::ProtocolCheck::IGNORE>(buf, acq);
} catch (opencmw::ProtocolException& e) {
fmt::print("deserialisation error: {}\n", e.what());
return;
}
auto dataTimestamp = std::chrono::nanoseconds(acq.acqLocalTimeStamp.value());
auto latency = (dataTimestamp.count() == 0) ? 0ns : now - dataTimestamp;
samples_received += acq.channelValue.size();
update_count++;
double sample_rate = (samples_received / static_cast<double>(uptime.count())) * 1000.0;
auto [min, max] = [&acq]() {
if (acq.channelValue.empty()) {
return std::ranges::min_max_result{0.0f, 0.0f};
} else {
return std::ranges::minmax(acq.channelValue);
}
}();
fmt::print("t = {}ms: Update received: {}, samples: {}, min-max: {}-{}, total_samples: {}, avg_sampling_rate: {}, latency: {}s\n", uptime.count(), update_count, acq.channelValue.size(), min, max, samples_received, sample_rate, 1e-9 * latency.count());
});

while (true) {
std::this_thread::sleep_for(1s);
}

client.stop();
}
1 change: 0 additions & 1 deletion src/service/gnuradio/examples/picoscope4000a-streaming.grc
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ blocks:
- Test unit C
- Test unit D
sample_rate: 10000
streaming_mode_poll_rate: 0.000010
- name: convertA
id: gr::blocks::type::converter::Convert
template_args: "float,double"
Expand Down
3 changes: 3 additions & 0 deletions src/service/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
#include <fstream>
#include <thread>

#include <fmt/core.h>
#include <fmt/ranges.h>

#include "FAIR/DeviceNameHelper.hpp"
#include "dashboard/dashboardWorker.hpp"
#include "gnuradio/GnuRadioAcquisitionWorker.hpp"
Expand Down
26 changes: 9 additions & 17 deletions src/ui/App.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ class App {

components::AppHeader header;

// The thread limit here is mainly for emscripten
std::shared_ptr<gr::thread_pool::BasicThreadPool> schedulerThreadPool = std::make_shared<gr::thread_pool::BasicThreadPool>("scheduler-pool", gr::thread_pool::CPU_BOUND, 4, 4);
// The thread limit here is mainly for emscripten becaue the default thread pool will exhaust the browser's limits and be recreated for every new scheduler
std::shared_ptr<gr::thread_pool::BasicThreadPool> schedulerThreadPool = std::make_shared<gr::thread_pool::BasicThreadPool>("scheduler-pool", gr::thread_pool::CPU_BOUND, 1, 1);

struct SchedWrapper {
template<typename T, typename... Args>
Expand All @@ -80,9 +80,8 @@ class App {

template<typename TScheduler>
struct HandlerImpl : Handler {
TScheduler _scheduler;
std::thread _thread;
std::atomic<bool> stopRequested = false;
TScheduler _scheduler;
std::thread _thread;

gr::MsgPortIn _fromScheduler;
gr::MsgPortOut _toScheduler;
Expand All @@ -101,20 +100,13 @@ class App {

_thread = std::thread([this]() {
if (auto e = _scheduler.changeStateTo(gr::lifecycle::State::INITIALISED); !e) {
// TODO: handle error return message
throw fmt::format("Failed to initialize flowgraph");
}
if (auto e = _scheduler.changeStateTo(gr::lifecycle::State::RUNNING); !e) {
// TODO: handle error return message
}
while (!stopRequested && _scheduler.isProcessing()) {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
if (auto e = _scheduler.changeStateTo(gr::lifecycle::State::REQUESTED_STOP); !e) {
// TODO: handle error return message
}
if (auto e = _scheduler.changeStateTo(gr::lifecycle::State::STOPPED); !e) {
// TODO: handle error return message
throw fmt::format("Failed to start flowgraph processing");
}
// NOTE: the single threaded scheduler runs its main loop inside its start() function and only returns after its state changes to non-active
// We once have to directly change the state to running, after this, all further state updates are performed via the msg API
});
}

Expand All @@ -138,7 +130,7 @@ class App {
}

~HandlerImpl() {
stopRequested = true;
gr::sendMessage<gr::message::Command::Set>(_toScheduler, _scheduler.unique_name, gr::block::property::kLifeCycleState, {{"state", std::string(magic_enum::enum_name(gr::lifecycle::State::REQUESTED_STOP))}}, "UI");
_thread.join();
}
};
Expand Down
Loading
Loading