Skip to content

Commit

Permalink
RemoteSource: fix subscription logic
Browse files Browse the repository at this point in the history
Moves setting up the subscription to the start and stop methods instead
of performing them on settings change, which lead to duplicate
subscriptions.
Also fixes a bug that would not actually perform any unsubscribes making
the subscriptions live on forever.

Also introduces propagation of Timing Events.

Signed-off-by: Alexander Krimm <[email protected]>
  • Loading branch information
wirew0rm committed Nov 15, 2024
1 parent b6396ba commit bb0c188
Showing 1 changed file with 113 additions and 41 deletions.
154 changes: 113 additions & 41 deletions src/ui/blocks/RemoteSource.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ namespace opendigitizer {
template<typename T>
requires std::is_floating_point_v<T>
struct RemoteStreamSource : public gr::Block<RemoteStreamSource<T>> {
using Parent = gr::Block<RemoteStreamSource<T>>;
gr::PortOut<T> out;
std::string remote_uri;
std::string signal_name;
Expand Down Expand Up @@ -59,6 +60,19 @@ struct RemoteStreamSource : public gr::Block<RemoteStreamSource<T>> {
} else {
std::ranges::transform(in, output.begin() + written, [](float v) { return static_cast<T>(v); });
}
{
const auto now = std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::high_resolution_clock::now().time_since_epoch());
auto latency = now - std::chrono::nanoseconds(d.acq.acqLocalTimeStamp.value());
std::size_t idx = written + in.size() - 1;
auto map = gr::property_map{{gr::tag::TRIGGER_NAME, {"CHUNK_END"}}, {gr::tag::TRIGGER_TIME, {d.acq.acqLocalTimeStamp.value()}}, {"REMOTE_SOURCE_LATENCY", {latency.count()}}};
output.publishTag(map, idx - d.read);
}
for (const auto& [idx, trigger, timestamp] : std::views::zip(d.acq.triggerIndices.value(), d.acq.triggerEventNames.value(), d.acq.triggerTimestamps.value())) {
const auto now = std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::high_resolution_clock::now().time_since_epoch());
auto latency = now - std::chrono::nanoseconds(timestamp);
auto map = gr::property_map{{gr::tag::TRIGGER_NAME, {trigger}}, {gr::tag::TRIGGER_TIME, {timestamp}}, {"REMOTE_SOURCE_LATENCY", {latency.count()}}};
output.publishTag(map, idx - d.read);
}
written += in.size();
d.read += in.size();
if (d.read == d.acq.channelValue.size()) {
Expand All @@ -69,31 +83,39 @@ struct RemoteStreamSource : public gr::Block<RemoteStreamSource<T>> {
return gr::work::Status::OK;
}

void settingsChanged(const gr::property_map& old_settings, const gr::property_map& /*new_settings*/) {
const auto oldValue = old_settings.find("remote_uri");
if (oldValue != old_settings.end()) {
const auto oldUri = std::get<std::string>(oldValue->second);
if (!oldUri.empty()) {
fmt::print("Unsubscribing from {}\n", oldUri);
opencmw::client::Command command;
command.command = opencmw::mdp::Command::Unsubscribe;
command.topic = opencmw::URI<>(remote_uri);
command.callback = [oldUri](const opencmw::mdp::Message&) {
// TODO: Add cleanup once openCMW starts calling the callback
// on successful unsubscribe
fmt::print("Unsubscribed from {} successfully\n", oldUri);
};
}
}
void stopSubscription(const std::string& uri) {
opencmw::client::Command command;
command.command = opencmw::mdp::Command::Unsubscribe;
command.topic = opencmw::URI<>(remote_uri);
command.callback = [uri](const opencmw::mdp::Message&) {};
_client.request(command);
}

void startSubscription(const std::string& uri) {
opencmw::client::Command command;
command.command = opencmw::mdp::Command::Subscribe;
command.topic = opencmw::URI<>(remote_uri);
fmt::print("Subscribing to {}\n", remote_uri);
command.topic = opencmw::URI<>(uri);

std::weak_ptr maybeQueue = _queue;

command.callback = [maybeQueue](const opencmw::mdp::Message& rep) {
command.callback = [maybeQueue, uri, this](const opencmw::mdp::Message& rep) {
if (!rep.error.empty()) {
stopSubscription(remote_uri);
gr::sendMessage<gr::message::Command::Notify>(this->msgOut, this->unique_name /* serviceName */, "subscription", gr::Error(fmt::format("Error in subscription: re-subscribing{}", remote_uri)));
startSubscription(remote_uri);
auto queue = maybeQueue.lock();
if (!queue) {
return;
}
opendigitizer::acq::Acquisition acq;
acq.channelValue.value() = {0, -5, 5, -5, 5, 0}; // TODO: remove this once the UI supports showing tags and correct time axes
acq.triggerEventNames.value() = {"SubscriptionInterrupted"};
acq.triggerIndices.value() = {0};
acq.triggerTimestamps.value() = {0};
std::lock_guard lock(queue->mutex);
queue->data.push_back({std::move(acq), 0});
return;
}
if (rep.data.empty()) {
return;
}
Expand All @@ -102,23 +124,50 @@ struct RemoteStreamSource : public gr::Block<RemoteStreamSource<T>> {
if (!queue) {
return;
}
auto buf = rep.data;
opendigitizer::acq::Acquisition acq;
auto buf = rep.data;
opencmw::deserialise<opencmw::YaS, opencmw::ProtocolCheck::IGNORE>(buf, acq);
std::lock_guard lock(queue->mutex);
queue->data.push_back({std::move(acq), 0});
} catch (opencmw::ProtocolException& e) {
fmt::print(std::cerr, "{}\n", e.what());
gr::sendMessage<gr::message::Command::Notify>(this->msgOut, this->unique_name /* serviceName */, "subscription", gr::Error(fmt::format("failed to deserialise update from {}: {}\n", remote_uri, e.what())));
return;
}
};
_client.request(command);
}

void settingsChanged(const gr::property_map& old_settings, const gr::property_map& /*new_settings*/) {
if (Parent::state() != gr::lifecycle::State::RUNNING) {
return; // early return, only apply settings for the running flowgraph
}
const auto oldValue = old_settings.find("remote_uri");
if (oldValue != old_settings.end()) {
const auto oldUri = std::get<std::string>(oldValue->second);
if (!oldUri.empty()) {
stopSubscription(oldUri);
}
}
startSubscription(remote_uri);
}

void start() {
if (!remote_uri.empty()) {
startSubscription(remote_uri);
}
}

void stop() {
if (!remote_uri.empty()) {
stopSubscription(remote_uri);
}
}
};

template<typename T>
requires std::is_floating_point_v<T>
struct RemoteDataSetSource : public gr::Block<RemoteDataSetSource<T>> {
using Parent = gr::Block<RemoteDataSetSource<T>>;
gr::PortOut<gr::DataSet<T>> out;
std::string remote_uri;
opencmw::client::RestClient _client;
Expand All @@ -143,31 +192,28 @@ struct RemoteDataSetSource : public gr::Block<RemoteDataSetSource<T>> {
return gr::work::Status::OK;
}

void settingsChanged(const gr::property_map& old_settings, const gr::property_map& /*new_settings*/) {
const auto oldValue = old_settings.find("remote_uri");
if (oldValue != old_settings.end()) {
const auto oldUri = std::get<std::string>(oldValue->second);
if (!oldUri.empty()) {
fmt::print("Unsubscribing from {}\n", oldUri);
opencmw::client::Command command;
command.command = opencmw::mdp::Command::Unsubscribe;
command.topic = opencmw::URI<>(remote_uri);
command.callback = [oldUri](const opencmw::mdp::Message&) {
// TODO: Add cleanup once openCMW starts calling the callback
// on successful unsubscribe
fmt::print("Unsubscribed from {} successfully\n", oldUri);
};
}
}
void stopSubscription(const std::string& uri) {
opencmw::client::Command command;
command.command = opencmw::mdp::Command::Unsubscribe;
command.topic = opencmw::URI<>(remote_uri);
command.callback = [uri](const opencmw::mdp::Message&) {};
_client.request(command);
}

void startSubscription(const std::string& uri) {
opencmw::client::Command command;
command.command = opencmw::mdp::Command::Subscribe;
command.topic = opencmw::URI<>(remote_uri);
fmt::print("Subscribing to {}\n", remote_uri);
command.topic = opencmw::URI<>(uri);

std::weak_ptr maybeQueue = _queue;

command.callback = [maybeQueue](const opencmw::mdp::Message& rep) {
command.callback = [maybeQueue, uri, this](const opencmw::mdp::Message& rep) {
if (!rep.error.empty()) {
stopSubscription(remote_uri);
sendMessage<gr::message::Command::Notify>(this->msgOut, this->unique_name /* serviceName */, "subscription", gr::Error(fmt::format("Error in subscription: re-subscribing{}\n", remote_uri)));
startSubscription(remote_uri);
return;
}
if (rep.data.empty()) {
return;
}
Expand All @@ -192,12 +238,38 @@ struct RemoteDataSetSource : public gr::Block<RemoteDataSetSource<T>> {
std::lock_guard lock(queue->mutex);
queue->data.push_back(std::move(ds));
} catch (opencmw::ProtocolException& e) {
fmt::print(std::cerr, "{}\n", e.what());
gr::sendMessage<gr::message::Command::Notify>(this->msgOut, this->unique_name /* serviceName */, "subscription", gr::Error(fmt::format("failed to deserialise update from {}: {}\n", remote_uri, e.what())));
return;
}
};
_client.request(command);
}

void settingsChanged(const gr::property_map& old_settings, const gr::property_map& /*new_settings*/) {
if (Parent::state() != gr::lifecycle::State::RUNNING) {
return; // early return, only apply settings for the running flowgraph
}
const auto oldValue = old_settings.find("remote_uri");
if (oldValue != old_settings.end()) {
const auto oldUri = std::get<std::string>(oldValue->second);
if (!oldUri.empty()) {
stopSubscription(oldUri);
}
}
startSubscription(remote_uri);
}

void start() {
if (!remote_uri.empty()) {
startSubscription(remote_uri);
}
}

void stop() {
if (!remote_uri.empty()) {
stopSubscription(remote_uri);
}
}
};

} // namespace opendigitizer
Expand Down

0 comments on commit bb0c188

Please sign in to comment.