Skip to content

Commit

Permalink
refactor(toggle): improve toggling the stream
Browse files Browse the repository at this point in the history
Now when the stream is paused/resumed the select! macro will not
continue looping at every sleep timeout, but it simply disables the
branch meant to send data to Astarte

Signed-off-by: Riccardo Gallo <[email protected]>
  • Loading branch information
rgallor committed Dec 3, 2024
1 parent 02f6be7 commit 6c21d39
Show file tree
Hide file tree
Showing 5 changed files with 156 additions and 149 deletions.
27 changes: 18 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -175,25 +175,34 @@ command.
The syntax to use is the following:
```
astartectl appengine
--appengine-url <APPENGINE_URL>
--appengine-url <APPENGINE_URL>
--realm-management-url <REALM_MANAGEMENT_URL>
--realm-key <REALM>_private.pem
--realm-name <REALM> devices send-data <DEVICE_ID>
<SERVER_OWNED_INTERFACE> <ENDPOINT> <VALUE>
```

Where APPENGINE_URL, REALM_MANAGEMENT_URL and REALM are the appengine url, realm management url and your realm
name respectively. The DEVICE_ID is the device ID to send the data to, ENDPOINT is the endpoint to send data to,
which in this example should be composed by a sensor id and the math function, the scale or the interval endpoint
Where APPENGINE_URL, REALM_MANAGEMENT_URL and REALM are the appengine url, realm management url and your realm
name respectively. The DEVICE_ID is the device ID to send the data to, ENDPOINT is the endpoint to send data to,
which in this example should be composed by a sensor id and the math function, the scale or the interval endpoint
(e.g. /sensor_id_123/function), and VALUE is the value to send (e.g. "sin" if _function_ is the chosen endpoint).

For instance, supposing you are using an Astarte instance running on localhost:
```
astartectl appengine
--appengine-url http://api.astarte.localhost/appengine
--realm-management-url http://api.astarte.localhost/realmmanagement
--realm-key test_private.pem
--realm-name test devices send-data <DEVICE_ID>
astartectl appengine
--appengine-url http://api.astarte.localhost/appengine
--realm-management-url http://api.astarte.localhost/realmmanagement
--realm-key test_private.pem
--realm-name test devices send-data <DEVICE_ID>
org.astarte-platform.genericcommands.ServerCommands "/sensor_id_123/function" "sin"
```

### Pause and resume the stream

Using the same command that changes stream parameters, you can toggle the stream's state, either stopping or resuming
its operation. To do this, use the `/{%sensor_id}/toggle` endpoint with a boolean value.
(Note that the boolean value is not used but is required since sending empty data is not possible.)

When a `toggle` command is sent, the stream state switches from `On` (default), indicating that the stream is sending
data to Astarte, to `Off`, where the stream stops sending data, or vice versa if the stream was previously paused by
another toggle command.
114 changes: 2 additions & 112 deletions src/astarte.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,19 @@

//! Astarte builder and configuration structures.
use crate::config::{StreamConfig, StreamConfigUpdate};
use astarte_device_sdk::builder::{DeviceBuilder, DeviceSdkBuild};
use astarte_device_sdk::client::RecvError;
use astarte_device_sdk::store::SqliteStore;
use astarte_device_sdk::transport::grpc::{Grpc, GrpcConfig};
use astarte_device_sdk::transport::mqtt::{Credential, Mqtt, MqttConfig};
use astarte_device_sdk::{Client, DeviceClient, DeviceConnection};
use astarte_device_sdk::{DeviceClient, DeviceConnection};
use clap::ValueEnum;
use color_eyre::eyre;
use color_eyre::eyre::{bail, eyre, OptionExt, WrapErr};
use serde::Deserialize;
use std::env::VarError;
use std::path::{Path, PathBuf};
use std::{env, io};
use tokio::select;
use tokio::sync::mpsc::{Receiver, Sender};
use tracing::{debug, error, info, warn};
use tracing::{debug, error};
use uuid::{uuid, Uuid};

const DEVICE_DATASTREAM: &str =
Expand Down Expand Up @@ -258,112 +254,6 @@ impl GrpcConfigBuilder {
}
}

/// Send data to Astarte
pub async fn send_data(
client: DeviceClient<SqliteStore>,
mut rx: Receiver<StreamConfigUpdate>,
mut stream_cfg: StreamConfig,
) -> eyre::Result<()> {
loop {
// wait until an interval has elapsed, thus data must be sent to Astarte, or a new stream
// config is received, hence modify it
select! {
_ = tokio::time::sleep(std::time::Duration::from_millis(stream_cfg.interval)) => {
// check the stream state to verify if it's running or if it is stopped
// TODO: use a Notify mechanism to avoid looping also when the stream is off
if stream_cfg.is_off() {
debug!("stream is off, stop sending data");
continue;
}

debug!(
"sending data to Astarte with {} math function and scale {}",
stream_cfg.math_function, stream_cfg.scale
);

// Send data to Astarte
let value = stream_cfg.next_value();

// TODO: here the sensor_id is static. Should we introduce it as a CLI argument or
// another way to receive it "dynamically"?

client
.send(stream_cfg.interface(), "/sensor_id_123/value", value)
.await?;

debug!("data sent on endpoint /sensor_id_123/value, content: {value}");

// update the value upon which the data to be sent to Astarte at the next iteration
// will be computed
stream_cfg.update_value();
}
new_cfg = rx.recv() => {
let Some(new_cfg) = new_cfg else {
warn!("channel closed, cannot update stream config anymore");
return Ok(());
};

info!("updating stream config");
stream_cfg.update_cfg(new_cfg).await
}
}
}
}

/// Receive data from Astarte
pub async fn receive_data(
client: DeviceClient<SqliteStore>,
tx: Sender<StreamConfigUpdate>,
) -> eyre::Result<()> {
loop {
match client.recv().await {
Ok(data) => {
if let astarte_device_sdk::Value::Individual(var) = data.data {
// split the mapping path, which looks like "/foo/bar"
let mut iter = data.path.splitn(3, '/').skip(1);

let sensor_id = iter.next().ok_or_eyre("missing sensor id")?;

match iter.next() {
Some("toggle") => {
debug!("Received new toggle datastream for sensor {sensor_id}.");
tx.send(StreamConfigUpdate::toggle_state(sensor_id)).await?;
}
Some("function") => {
let function = String::try_from(var)?.into();
debug!(
"Received new function datastream for sensor {sensor_id}. sensor function is now {function}"
);
tx.send(StreamConfigUpdate::function(sensor_id, function))
.await?;
}
Some("interval") => {
let interval = i64::try_from(var)?.try_into()?;
debug!(
"Received new interval datastream for sensor {sensor_id}. sensor interval is now {interval}"
);
tx.send(StreamConfigUpdate::interval(sensor_id, interval))
.await?;
}
Some("scale") => {
let scale = var.try_into()?;
debug!(
"Received new scale datastream for sensor {sensor_id}. sensor scale is now {scale}"
);
tx.send(StreamConfigUpdate::scale(sensor_id, scale)).await?;
}
item => {
error!("unrecognized {item:?}")
}
}
}
}
Err(RecvError::Disconnected) => return Ok(()),
Err(err) => error!(%err),
}
}
}

#[cfg(test)]
mod test {
use super::*;
Expand Down
14 changes: 5 additions & 9 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ impl StreamConfig {
match update {
ConfigUpdate::State => {
debug!("toggle stream state for sensor {sensor_id}");
self.toggle_state();
self.state.toggle();
}
ConfigUpdate::Function(value) => {
debug!("update stream math function config with {value} for sensor {sensor_id}");
Expand All @@ -76,12 +76,8 @@ impl StreamConfig {
}
}

fn toggle_state(&mut self) {
self.state.toggle();
}

pub(crate) fn is_off(&mut self) -> bool {
self.state.is_off()
pub(crate) fn is_on(&self) -> bool {
self.state.is_on()
}

/// retrieve a reference to the Astarte interface
Expand Down Expand Up @@ -118,8 +114,8 @@ impl StreamState {
}
}

pub(crate) fn is_off(&self) -> bool {
matches!(*self, StreamState::Off)
pub(crate) fn is_on(&self) -> bool {
matches!(*self, StreamState::On)
}
}

Expand Down
127 changes: 127 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,135 @@
#![doc = include_str!("../README.md")]
#![warn(clippy::dbg_macro, missing_docs, rustdoc::missing_crate_level_docs)]

use astarte_device_sdk::client::RecvError;
use astarte_device_sdk::store::SqliteStore;
use astarte_device_sdk::{Client, DeviceClient, DeviceEvent};
use color_eyre::eyre;
use color_eyre::eyre::OptionExt;
use std::time::SystemTime;
use tokio::select;
use tracing::{debug, error};

use crate::cli::Config;
use crate::config::{StreamConfig, StreamConfigUpdate};

pub mod astarte;
pub mod cli;
pub mod config;
pub mod math;
pub mod shutdown;

/// Stream manager
///
/// It handles the following operations:
/// - send stream data to Astarte
/// - receive new stream configuration from Astarte
pub struct StreamManager {
stream_cfg: StreamConfig,
}

impl StreamManager {
/// Build a [StreamManager] from an initial configuration
pub async fn new(cfg: Config) -> eyre::Result<Self> {
// time instant when the program starts its execution
let now = SystemTime::now();
let stream_cfg = StreamConfig::try_from_cli(cfg, now)?;

Ok(Self { stream_cfg })
}

/// Handle sending data to Astarte and the reception of new stream configuration from Astarte
pub async fn handle(mut self, client: DeviceClient<SqliteStore>) -> eyre::Result<()> {
loop {
select! {
// send data after every timeout only if the stream is not paused
_ = tokio::time::sleep(tokio::time::Duration::from_millis(self.stream_cfg.interval)), if self.stream_cfg.is_on() => {
debug!(
"sending data to Astarte with {} math function and scale {}",
self.stream_cfg.math_function, self.stream_cfg.scale
);
self.send_data(&client).await?;
}
// receive data from Astarte
res = client.recv() => {
match res {
Ok(event) => {
debug!("received event from Astarte: {:?}", event);
self.receive_data(event).await?;
}
Err(RecvError::Disconnected) => return Ok(()),
Err(err) => error!(%err),
}
}
}
}
}

/// Send data to Astarte
async fn send_data(&mut self, client: &DeviceClient<SqliteStore>) -> eyre::Result<()> {
// Send data to Astarte
let value = self.stream_cfg.next_value();

// TODO: here the sensor_id is static. Should we introduce it as a CLI argument or another way to receive it "dynamically"?
client
.send(self.stream_cfg.interface(), "/sensor_id_123/value", value)
.await?;

debug!("data sent on endpoint /sensor_id_123/value, content: {value}");

// update the value upon which the data to be sent to Astarte at the next iteration will be computed
self.stream_cfg.update_value();

Ok(())
}

async fn receive_data(&mut self, event: DeviceEvent) -> eyre::Result<()> {
if let astarte_device_sdk::Value::Individual(var) = event.data {
// split the mapping path, which looks like "/foo/bar"
let mut iter = event.path.splitn(3, '/').skip(1);

let sensor_id = iter.next().ok_or_eyre("missing sensor id")?;

match iter.next() {
Some("toggle") => {
debug!("Received new toggle datastream for sensor {sensor_id}.");
self.stream_cfg
.update_cfg(StreamConfigUpdate::toggle_state(sensor_id))
.await;
}
Some("function") => {
let function = String::try_from(var)?.into();
debug!(
"Received new function datastream for sensor {sensor_id}. sensor function is now {function}"
);
self.stream_cfg
.update_cfg(StreamConfigUpdate::function(sensor_id, function))
.await;
}
Some("interval") => {
let interval = i64::try_from(var)?.try_into()?;
debug!(
"Received new interval datastream for sensor {sensor_id}. sensor interval is now {interval}"
);
self.stream_cfg
.update_cfg(StreamConfigUpdate::interval(sensor_id, interval))
.await;
}
Some("scale") => {
let scale = var.try_into()?;
debug!(
"Received new scale datastream for sensor {sensor_id}. sensor scale is now {scale}"
);
self.stream_cfg
.update_cfg(StreamConfigUpdate::scale(sensor_id, scale))
.await;
}
item => {
error!("unrecognized {item:?}")
}
}
}

Ok(())
}
}
Loading

0 comments on commit 6c21d39

Please sign in to comment.