Skip to content

Commit

Permalink
Merge pull request #64 from rneswold/pr-issue6
Browse files Browse the repository at this point in the history
The logic block functionality has been added.
  • Loading branch information
rneswold authored May 14, 2023
2 parents 1f1b789 + 5b524dd commit 108cf7c
Show file tree
Hide file tree
Showing 23 changed files with 2,589 additions and 1,778 deletions.
515 changes: 248 additions & 267 deletions Cargo.lock

Large diffs are not rendered by default.

192 changes: 106 additions & 86 deletions backends/drmem-db-redis/src/lib.rs

Large diffs are not rendered by default.

18 changes: 14 additions & 4 deletions backends/drmem-db-simple/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,9 @@
use async_trait::async_trait;
use chrono::*;
use drmem_api::{
client,
client, device,
driver::{ReportReading, RxDeviceSetting, TxDeviceSetting},
types::{device, Error},
Result, Store,
Error, Result, Store,
};
use std::collections::{hash_map, HashMap};
use std::{
Expand Down Expand Up @@ -332,6 +331,17 @@ impl Store for SimpleStore {
}
}

async fn get_setting_chan(
&self, name: device::Name, _own: bool,
) -> Result<TxDeviceSetting> {
if let Some(di) = self.0.get(&name) {
if let Some(tx) = &di.tx_setting {
return Ok(tx.clone());
}
}
Err(Error::NotFound)
}

// Handles a request to monitor a device's changing value. The
// caller must pass in the name of the device. Returns a stream
// which returns the last value reported for the device followed
Expand Down Expand Up @@ -443,7 +453,7 @@ impl Store for SimpleStore {
#[cfg(test)]
mod tests {
use crate::{mk_report_func, DeviceInfo, SimpleStore};
use drmem_api::{types::device, Store};
use drmem_api::{device, Store};
use std::{collections::HashMap, time};
use tokio::sync::{mpsc::error::TryRecvError, oneshot};
use tokio::time::interval;
Expand Down
105 changes: 61 additions & 44 deletions drivers/drmem-drv-ntp/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
use drmem_api::{
device,
driver::{self, DriverConfig},
types::{device, Error},
Result,
Error, Result,
};
use std::future::Future;
use std::sync::Arc;
use std::{convert::Infallible, pin::Pin};
use std::{
net::{SocketAddr, SocketAddrV4},
str,
};
use tokio::net::UdpSocket;
use tokio::sync::Mutex;
use tokio::time::{self, Duration};
use tracing::{debug, error, info, warn, Span};

Expand Down Expand Up @@ -105,6 +107,9 @@ mod server {
pub struct Instance {
sock: UdpSocket,
seq: u16,
}

pub struct Devices {
d_state: driver::ReportReading<bool>,
d_source: driver::ReportReading<String>,
d_offset: driver::ReportReading<f64>,
Expand All @@ -125,16 +130,20 @@ impl Instance {
match cfg.get("addr") {
Some(toml::value::Value::String(addr)) => {
if let Ok(addr) = addr.parse::<SocketAddrV4>() {
return Ok(addr);
Ok(addr)
} else {
error!("'addr' not in hostname:port format")
Err(Error::BadConfig(String::from(
"'addr' not in hostname:port format",
)))
}
}
Some(_) => error!("'addr' config parameter should be a string"),
None => error!("missing 'addr' parameter in config"),
Some(_) => Err(Error::BadConfig(String::from(
"'addr' config parameter should be a string",
))),
None => Err(Error::BadConfig(String::from(
"missing 'addr' parameter in config",
))),
}

Err(Error::BadConfig)
}

// Combines and returns the first two bytes from a buffer as a
Expand Down Expand Up @@ -355,10 +364,11 @@ impl Instance {
}

impl driver::API for Instance {
fn create_instance(
cfg: &DriverConfig, core: driver::RequestChan,
max_history: Option<usize>,
) -> Pin<Box<dyn Future<Output = Result<driver::DriverType>> + Send>> {
type DeviceSet = Devices;

fn register_devices(
core: driver::RequestChan, _: &DriverConfig, max_history: Option<usize>,
) -> Pin<Box<dyn Future<Output = Result<Self::DeviceSet>> + Send>> {
// It's safe to use `.unwrap()` for these names because, in a
// fully-tested, released version of this driver, we would
// have seen and fixed any panics.
Expand All @@ -368,6 +378,32 @@ impl driver::API for Instance {
let offset_name = "offset".parse::<device::Base>().unwrap();
let delay_name = "delay".parse::<device::Base>().unwrap();

Box::pin(async move {
// Define the devices managed by this driver.

let (d_state, _) =
core.add_ro_device(state_name, None, max_history).await?;
let (d_source, _) =
core.add_ro_device(source_name, None, max_history).await?;
let (d_offset, _) = core
.add_ro_device(offset_name, Some("ms"), max_history)
.await?;
let (d_delay, _) = core
.add_ro_device(delay_name, Some("ms"), max_history)
.await?;

Ok(Devices {
d_state,
d_source,
d_offset,
d_delay,
})
})
}

fn create_instance(
cfg: &DriverConfig,
) -> Pin<Box<dyn Future<Output = Result<Box<Self>>> + Send>> {
let addr = Instance::get_cfg_address(cfg);

let fut = async move {
Expand All @@ -378,29 +414,7 @@ impl driver::API for Instance {

if let Ok(sock) = UdpSocket::bind(loc_if).await {
if sock.connect(addr).await.is_ok() {
// Define the devices managed by this driver.

let (d_state, _) = core
.add_ro_device(state_name, None, max_history)
.await?;
let (d_source, _) = core
.add_ro_device(source_name, None, max_history)
.await?;
let (d_offset, _) = core
.add_ro_device(offset_name, Some("ms"), max_history)
.await?;
let (d_delay, _) = core
.add_ro_device(delay_name, Some("ms"), max_history)
.await?;

return Ok(Box::new(Instance {
sock,
seq: 1,
d_state,
d_source,
d_offset,
d_delay,
}) as driver::DriverType);
return Ok(Box::new(Instance { sock, seq: 1 }));
}
}
Err(Error::OperationError)
Expand All @@ -410,9 +424,9 @@ impl driver::API for Instance {
}

fn run<'a>(
&'a mut self,
&'a mut self, devices: Arc<Mutex<Devices>>,
) -> Pin<Box<dyn Future<Output = Infallible> + Send + 'a>> {
let fut = async {
let fut = async move {
// Record the peer's address in the "cfg" field of the
// span.

Expand All @@ -423,7 +437,7 @@ impl driver::API for Instance {
.map(|v| format!("{}", v))
.unwrap_or_else(|_| String::from("**unknown**"));

Span::current().record("cfg", &addr.as_str());
Span::current().record("cfg", addr.as_str());
}

// Set `info` to an initial, unmatchable value. `None`
Expand All @@ -434,6 +448,8 @@ impl driver::API for Instance {
let mut info = Some(server::Info::bad_value());
let mut interval = time::interval(Duration::from_millis(20_000));

let devices = devices.lock().await;

loop {
interval.tick().await;

Expand All @@ -451,10 +467,11 @@ impl driver::API for Instance {
tmp.get_offset(),
tmp.get_delay()
);
(self.d_source)(tmp.get_host().clone()).await;
(self.d_offset)(tmp.get_offset()).await;
(self.d_delay)(tmp.get_delay()).await;
(self.d_state)(true).await;
(devices.d_source)(tmp.get_host().clone())
.await;
(devices.d_offset)(tmp.get_offset()).await;
(devices.d_delay)(tmp.get_delay()).await;
(devices.d_state)(true).await;
info = host_info;
}
continue;
Expand All @@ -463,14 +480,14 @@ impl driver::API for Instance {
if info.is_some() {
warn!("no synced host information found");
info = None;
(self.d_state)(false).await;
(devices.d_state)(false).await;
}
}
}
} else if info.is_some() {
warn!("we're not synced to any host");
info = None;
(self.d_state)(false).await;
(devices.d_state)(false).await;
}
}
};
Expand Down
Loading

0 comments on commit 108cf7c

Please sign in to comment.