Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Devin/feature/add dfgm interface and handler #3

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
dfgm_data/
target/
Cargo.lock
4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,7 @@ edition = "2021"

[dependencies]
clap = { version = "4.4.8", features = ["derive"] }
nix = "0.24"
tokio = { version = "1", features = ["full"] }


5 changes: 4 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,14 @@ The command and response packets for the same format:
| Length | Dest | Opcode | Data 0 | Data 1 | ... | 0 |

where the _Length_ is the total number of bytes of data including the header,
_Dest_ is the payload (1 for `EPS`, 2 for `ADCS`, 3 for `DFGM`), _Opcode_ is
_Dest_ is the payload (1 for `EPS`, 2 for `DFGM`, 3 for `ADCS`), _Opcode_ is
the payload specific command code, and _Data i_ is the payload and command
specific arguments.

For replies the _Dest_ and _Opcode_ are the same as the command, and the
optional response status and data start at _Data 0_.


## Sending a command to the OBC via TCP quickly

The easiest way to send a command to the OBC via TCP direct (no coms system in between for now) is to use the provided [Ground station rust program](https://github.com/AlbertaSat/ex3_ground_station) . You can also use a simple CLI tool like nc (netcat), but keep in mind data is read as bytes and thus ascii characters will be converted to their equivalent byte value by the OBC.
3 changes: 3 additions & 0 deletions src/DFGM.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
pub mod dfgm_interface;
pub mod dfgm_handler;
pub mod ipc_interface;
116 changes: 116 additions & 0 deletions src/DFGM/dfgm_handler.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
DFGM Handler receives commands related to the DFGM and performs their related action.

Communication to the DFGM is done through the DFGM interface.

TODO - What is the point of the port variable? It is set by the command, but never used.
TODO - Implement the dispatch function to handle the DFGM command to toggle 'listen_to_DFGM' flag
*/
use crate::DFGMInterface;

use crate::cmd::{Command, Message};
use std::sync::{Arc, Mutex};
use std::thread;

use std::fs::OpenOptions;
use std::io::prelude::*;



const DFGM_DATA_DIR_PATH: &str = "dfgm_data";

const DFGM_PACKET_SIZE: usize = 1250;
pub struct Dfgm {
port: u16,
state: u8,
interface: Option<Arc<Mutex<DFGMInterface>>>,
listen_to_DFGM: bool,
}

impl Dfgm {
pub fn new() -> Dfgm {
Dfgm {
port: 1,
state: 0,
interface: None,
listen_to_DFGM: true,
}
}
/// Called in program main to create an instance of Dfgm and setup a interfacing thread,
/// sort of like a constructor.
pub fn configure() -> Dfgm {
let mut dfgm = Dfgm::new();
dfgm.setup_interface_thread();
dfgm
}

/// Create thread to communicate with DFGM interface
pub fn setup_interface_thread(&mut self) {
//Create new interface for TCP client connection to sim dfgm
let dfgm_interface_obj = match DFGMInterface::new_tcp("localhost:1802") {
Ok(interface) => interface,
Err(e) => {
eprintln!("Failed to open TCP connection for DFGM: {:?}", e);
return;
}
};

self.interface = Some(Arc::new(Mutex::new(dfgm_interface_obj)));

let interface_clone = self.interface.as_ref().map(Arc::clone).unwrap();
let listen_to_DFGM_clone = self.listen_to_DFGM.clone();
thread::spawn(move || {
loop {
{
let mut interface = interface_clone.lock().unwrap();

let mut buffer = [0; DFGM_PACKET_SIZE];
match interface.receive(&mut buffer) {
Ok(size) => {
if size > 0 && listen_to_DFGM_clone {
//TODO - log data IF listen_to_DFGM flag is set, otherwise ignore
//println!("Received: {:?}", &buffer[..size]);
let write_result = store_dfgm_data(buffer.as_ref());
match write_result {
Ok(_) => println!("DFGM data written to file"),
Err(e) => eprintln!("Error writing data to file: {:?}", e),
}
}
}
Err(e) => {
eprintln!("Receive error: {:?}", e);
}
}
}
}
});
}

/// Right now this is how a command is received from GS, through the OBC... Later we prob want a command dispatcher for all sub systems
pub fn dispatch(&mut self, cmd: &Command) -> Result<Message, &'static str> {
if cmd.oplen > 0 {
let arg = std::str::from_utf8(&cmd.opdata).unwrap();
println!("dfgm op data: {0}", arg);
match arg.parse::<u16>() {
Ok(i) => self.port = i,
Err(s) => println!("Parse failed: {}", s),
}
}
println!("Dfgm: opcode {}, port: {}", cmd.opcode, self.port);
self.state += 1;
Ok(cmd.status_msg(self.state))
}
}

/// Write DFGM data to a file (for now --- this may changer later if we use a db or other storage)
/// Later on we likely want to specify a path to specific storage medium (sd card 1 or 2)
/// We may also want to implement something generic to handle 'payload data' storage so we can have it duplicated, stored in multiple locations, or compressed etc.
fn store_dfgm_data(data: &[u8]) -> std::io::Result<()> {
std::fs::create_dir_all(DFGM_DATA_DIR_PATH)?;
let mut file = OpenOptions::new()
.append(true)
.create(true)
.open(format!("{}/data", DFGM_DATA_DIR_PATH))?;
file.write_all(data)?;
Ok(())
}
75 changes: 75 additions & 0 deletions src/DFGM/dfgm_interface.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*

DFGM interface is for abstracting communication between OBC and DFGM.

The DFGM interface can be either a TCP interface or a UART interface, allowing the OBC
code to be agnostic to the underlying communication protocol used for communication with the DFGM.

TODO - Use hardcoded port for TCP connection to DFGM simulation
TODO - Implement UARTInterface for communication with DFGM
TODO - Write unit tests for both TCP and UART interfaces
*/

use std::io::{self, Read, Write};
use std::net::TcpStream;

const DFGM_sim_port: u16 = 1802;

trait Interface: Send + Sync {
/// Send buffer as reference to array of bytes
fn send(&mut self, data: &[u8]) -> io::Result<()>;
/// Place recevied bytes into buffer reference, and return number of byte recevied
fn receive(&mut self, buffer: &mut [u8]) -> io::Result<usize>;
}

pub struct TCPInterface {
stream: TcpStream,
}

impl TCPInterface {
pub fn new(address: &str) -> io::Result<Self> {
let stream: TcpStream = TcpStream::connect(address)?;
Ok(Self { stream })
}
}

impl Interface for TCPInterface {
fn send(&mut self, data: &[u8]) -> io::Result<()> {
self.stream.write_all(data)
}

fn receive(&mut self, buffer: &mut [u8]) -> io::Result<usize> {
self.stream.read(buffer)
}
}

/// DFGM interface struct can either be a TCP Interface OR a UART interface
pub struct DFGMInterface {
interface: Box<dyn Interface>,
}

impl DFGMInterface {
pub fn new_tcp(address: &str) -> io::Result<Self> {
let interface = Box::new(TCPInterface::new(address)?);
Ok(Self { interface })
}

pub fn send(&mut self, data: &[u8]) -> io::Result<()> {
self.interface.send(data)
}

pub fn receive(&mut self, buffer: &mut [u8]) -> io::Result<usize> {
self.interface.receive(buffer)
}
}

#[cfg(tests)]
mod tests {
use super::*;

#[test]
fn test_tcp_send() {}

#[test]
fn test_tcp_receive() {}
}
167 changes: 167 additions & 0 deletions src/DFGM/ipc_interface.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
/*
Interface for IPC communication. Allows other processes on the OBC to communicate with a subsystem handler.

Creation of a IPCReader checks if fifo exists already and if not creates one.

Anything that uses an IPC reader must also have the FifoDataHandler trait implemented, which contains use
case specific logic on how the data read from the fifo is handled.


TODO - build fifo paths always in /tmp/fifos/ , then append the fifo name provided, that way all FSW fifos are
in their own directory
*/

use nix::unistd::mkfifo;
use std::fs::{remove_file, OpenOptions};
use std::io::{Error, Read, Write};
use std::path::Path;
use std::sync::Arc;

const MAX_FIFO_BUFFER_SIZE: usize = 1024;
const FIFO_DIR_PREPEND: &str = "/tmp/fifo-";

/// Contains a callback fired when data is read from the fifo
/// Handlers are to be defined by whatever is using the IPCReader (processes will do different things with the data)
trait FifoDataHandler: Send + Sync + 'static {
fn handle_fifo_input(&self, data: &[u8]);
}

pub struct IpcReader {
fifo_path_str: String,
data_handler: Arc<dyn FifoDataHandler>,
}

impl IpcReader {
fn new(fifo_name: &str, handler: Arc<dyn FifoDataHandler>) -> Result<IpcReader, Error> {
let fifo_path = format!("{}{}", FIFO_DIR_PREPEND, fifo_name);
let reader = IpcReader {
fifo_path_str: fifo_path.to_string(),
data_handler: handler,
};
reader.setup_fifo()?;
Ok(reader)
}

/// Check if fifo exists, if it does delete it, then create a new one
fn setup_fifo(&self) -> Result<(), Error> {
let path = Path::new(&self.fifo_path_str);
if path.exists() {
remove_file(&self.fifo_path_str)?;
}
match mkfifo(self.fifo_path_str.as_str(), nix::sys::stat::Mode::S_IRWXU) {
Ok(_) => Ok(()),
Err(e) => Err(e.into()),
}
}

fn read(&self, buffer: &mut [u8]) -> Result<usize, Error> {
// Open the named pipe for reading
let path_obj = Path::new(self.fifo_path_str.as_str());
let mut fifo = OpenOptions::new().read(true).open(path_obj)?;

let n = fifo.read(buffer)?;
return Ok(n);
}

/// Begin a thread which calls the Handlers 'handle_fifo_input' fxn when data is read
fn start_reader_thread(&self) {
let fifo_path_str_clone = self.fifo_path_str.clone();
let handler = Arc::clone(&self.data_handler);
std::thread::spawn(move || {
let mut buffer = vec![0; MAX_FIFO_BUFFER_SIZE];
let path = Path::new(&fifo_path_str_clone);
let mut fifo = OpenOptions::new()
.read(true)
.open(path)
.expect("Failed to open FIFO for reading");
loop {
match fifo.read(&mut buffer) {
Ok(n) => {
if n > 0 {
handler.handle_fifo_input(&buffer[..n]);
}
}
Err(e) => {
println!("Error reading FIFO: {}", e);
break;
}
}
}
});
}
}

pub struct IpcWriter {
fifo_path_str: String,
}

impl IpcWriter {
fn new(fifo_name: &str) -> IpcWriter {
let fifo_path = format!("{}{}", FIFO_DIR_PREPEND, fifo_name);
IpcWriter {
fifo_path_str: fifo_path.to_string(),
}
}

fn write(&self, data: &[u8]) -> Result<(), std::io::Error> {
let path_obj = Path::new(self.fifo_path_str.as_str());
let mut fifo = OpenOptions::new().write(true).open(path_obj)?;
fifo.write_all(data)?;
println!("Message written to the pipe");
Ok(())
}
}

#[cfg(test)]
mod tests {
use super::*;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

struct TestHandler {
received_data: Arc<Mutex<Vec<u8>>>,
}

impl TestHandler {
fn new() -> Self {
Self {
received_data: Arc::new(Mutex::new(Vec::new())),
}
}
}

impl FifoDataHandler for TestHandler {
fn handle_fifo_input(&self, data: &[u8]) {
let mut received_data = self.received_data.lock().unwrap();
received_data.extend_from_slice(data);
println!("Fifo handler callback received data: {:?}", received_data);
}
}

#[tokio::test]
async fn test_ipc_reader_writer() -> Result<(), Box<dyn std::error::Error>> {
let fifo_name = "rust_ipc_test";

let handler = Arc::new(TestHandler::new());
let reader = IpcReader::new(fifo_name, handler.clone()).unwrap();

reader.start_reader_thread();

// Ensure reader thread is ready
thread::sleep(Duration::from_secs(1));

let writer = IpcWriter::new(fifo_name);

let test_data = b"This is from the writer";
writer.write(test_data)?;

// Give some time for the reader to process the message
thread::sleep(Duration::from_secs(1));

let received_data = handler.received_data.lock().unwrap().clone();
assert_eq!(received_data, test_data);

Ok(())
}
}
Loading