diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..034484a --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +dfgm_data/ +target/ +Cargo.lock \ No newline at end of file diff --git a/Cargo.toml b/Cargo.toml index 847170a..a59c0a0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,3 +7,7 @@ edition = "2021" [dependencies] clap = { version = "4.4.8", features = ["derive"] } +nix = "0.24" +tokio = { version = "1", features = ["full"] } + + diff --git a/README.md b/README.md index 0a18090..26c5682 100644 --- a/README.md +++ b/README.md @@ -23,7 +23,7 @@ The command and response packets for the same format: | Length | Dest | Opcode | Data 0 | Data 1 | ... | 0 | where the _Length_ is the total number of bytes of data including the header, -_Dest_ is the payload (1 for `EPS`, 2 for `ADCS`, 3 for `DFGM`), _Opcode_ is +_Dest_ is the payload (1 for `EPS`, 2 for `DFGM`, 3 for `ADCS`), _Opcode_ is the payload specific command code, and _Data i_ is the payload and command specific arguments. @@ -31,3 +31,6 @@ For replies the _Dest_ and _Opcode_ are the same as the command, and the optional response status and data start at _Data 0_. +## Sending a command to the OBC via TCP quickly + +The easiest way to send a command to the OBC via TCP direct (no coms system in between for now) is to use the provided [Ground station rust program](https://github.com/AlbertaSat/ex3_ground_station) . You can also use a simple CLI tool like nc (netcat), but keep in mind data is read as bytes and thus ascii characters will be converted to their equivalent byte value by the OBC. diff --git a/src/DFGM.rs b/src/DFGM.rs new file mode 100644 index 0000000..c79611b --- /dev/null +++ b/src/DFGM.rs @@ -0,0 +1,3 @@ +pub mod dfgm_interface; +pub mod dfgm_handler; +pub mod ipc_interface; \ No newline at end of file diff --git a/src/DFGM/dfgm_handler.rs b/src/DFGM/dfgm_handler.rs new file mode 100644 index 0000000..08e197c --- /dev/null +++ b/src/DFGM/dfgm_handler.rs @@ -0,0 +1,116 @@ +/* +DFGM Handler receives commands related to the DFGM and performs their related action. + +Communication to the DFGM is done through the DFGM interface. + +TODO - What is the point of the port variable? It is set by the command, but never used. +TODO - Implement the dispatch function to handle the DFGM command to toggle 'listen_to_DFGM' flag +*/ +use crate::DFGMInterface; + +use crate::cmd::{Command, Message}; +use std::sync::{Arc, Mutex}; +use std::thread; + +use std::fs::OpenOptions; +use std::io::prelude::*; + + + +const DFGM_DATA_DIR_PATH: &str = "dfgm_data"; + +const DFGM_PACKET_SIZE: usize = 1250; +pub struct Dfgm { + port: u16, + state: u8, + interface: Option>>, + listen_to_DFGM: bool, +} + +impl Dfgm { + pub fn new() -> Dfgm { + Dfgm { + port: 1, + state: 0, + interface: None, + listen_to_DFGM: true, + } + } + /// Called in program main to create an instance of Dfgm and setup a interfacing thread, + /// sort of like a constructor. + pub fn configure() -> Dfgm { + let mut dfgm = Dfgm::new(); + dfgm.setup_interface_thread(); + dfgm + } + + /// Create thread to communicate with DFGM interface + pub fn setup_interface_thread(&mut self) { + //Create new interface for TCP client connection to sim dfgm + let dfgm_interface_obj = match DFGMInterface::new_tcp("localhost:1802") { + Ok(interface) => interface, + Err(e) => { + eprintln!("Failed to open TCP connection for DFGM: {:?}", e); + return; + } + }; + + self.interface = Some(Arc::new(Mutex::new(dfgm_interface_obj))); + + let interface_clone = self.interface.as_ref().map(Arc::clone).unwrap(); + let listen_to_DFGM_clone = self.listen_to_DFGM.clone(); + thread::spawn(move || { + loop { + { + let mut interface = interface_clone.lock().unwrap(); + + let mut buffer = [0; DFGM_PACKET_SIZE]; + match interface.receive(&mut buffer) { + Ok(size) => { + if size > 0 && listen_to_DFGM_clone { + //TODO - log data IF listen_to_DFGM flag is set, otherwise ignore + //println!("Received: {:?}", &buffer[..size]); + let write_result = store_dfgm_data(buffer.as_ref()); + match write_result { + Ok(_) => println!("DFGM data written to file"), + Err(e) => eprintln!("Error writing data to file: {:?}", e), + } + } + } + Err(e) => { + eprintln!("Receive error: {:?}", e); + } + } + } + } + }); + } + + /// Right now this is how a command is received from GS, through the OBC... Later we prob want a command dispatcher for all sub systems + pub fn dispatch(&mut self, cmd: &Command) -> Result { + if cmd.oplen > 0 { + let arg = std::str::from_utf8(&cmd.opdata).unwrap(); + println!("dfgm op data: {0}", arg); + match arg.parse::() { + Ok(i) => self.port = i, + Err(s) => println!("Parse failed: {}", s), + } + } + println!("Dfgm: opcode {}, port: {}", cmd.opcode, self.port); + self.state += 1; + Ok(cmd.status_msg(self.state)) + } +} + +/// Write DFGM data to a file (for now --- this may changer later if we use a db or other storage) +/// Later on we likely want to specify a path to specific storage medium (sd card 1 or 2) +/// We may also want to implement something generic to handle 'payload data' storage so we can have it duplicated, stored in multiple locations, or compressed etc. +fn store_dfgm_data(data: &[u8]) -> std::io::Result<()> { + std::fs::create_dir_all(DFGM_DATA_DIR_PATH)?; + let mut file = OpenOptions::new() + .append(true) + .create(true) + .open(format!("{}/data", DFGM_DATA_DIR_PATH))?; + file.write_all(data)?; + Ok(()) +} diff --git a/src/DFGM/dfgm_interface.rs b/src/DFGM/dfgm_interface.rs new file mode 100644 index 0000000..c61b532 --- /dev/null +++ b/src/DFGM/dfgm_interface.rs @@ -0,0 +1,75 @@ +/* + +DFGM interface is for abstracting communication between OBC and DFGM. + +The DFGM interface can be either a TCP interface or a UART interface, allowing the OBC +code to be agnostic to the underlying communication protocol used for communication with the DFGM. + +TODO - Use hardcoded port for TCP connection to DFGM simulation +TODO - Implement UARTInterface for communication with DFGM +TODO - Write unit tests for both TCP and UART interfaces +*/ + +use std::io::{self, Read, Write}; +use std::net::TcpStream; + +const DFGM_sim_port: u16 = 1802; + +trait Interface: Send + Sync { + /// Send buffer as reference to array of bytes + fn send(&mut self, data: &[u8]) -> io::Result<()>; + /// Place recevied bytes into buffer reference, and return number of byte recevied + fn receive(&mut self, buffer: &mut [u8]) -> io::Result; +} + +pub struct TCPInterface { + stream: TcpStream, +} + +impl TCPInterface { + pub fn new(address: &str) -> io::Result { + let stream: TcpStream = TcpStream::connect(address)?; + Ok(Self { stream }) + } +} + +impl Interface for TCPInterface { + fn send(&mut self, data: &[u8]) -> io::Result<()> { + self.stream.write_all(data) + } + + fn receive(&mut self, buffer: &mut [u8]) -> io::Result { + self.stream.read(buffer) + } +} + +/// DFGM interface struct can either be a TCP Interface OR a UART interface +pub struct DFGMInterface { + interface: Box, +} + +impl DFGMInterface { + pub fn new_tcp(address: &str) -> io::Result { + let interface = Box::new(TCPInterface::new(address)?); + Ok(Self { interface }) + } + + pub fn send(&mut self, data: &[u8]) -> io::Result<()> { + self.interface.send(data) + } + + pub fn receive(&mut self, buffer: &mut [u8]) -> io::Result { + self.interface.receive(buffer) + } +} + +#[cfg(tests)] +mod tests { + use super::*; + + #[test] + fn test_tcp_send() {} + + #[test] + fn test_tcp_receive() {} +} diff --git a/src/DFGM/ipc_interface.rs b/src/DFGM/ipc_interface.rs new file mode 100644 index 0000000..f913187 --- /dev/null +++ b/src/DFGM/ipc_interface.rs @@ -0,0 +1,167 @@ +/* +Interface for IPC communication. Allows other processes on the OBC to communicate with a subsystem handler. + +Creation of a IPCReader checks if fifo exists already and if not creates one. + +Anything that uses an IPC reader must also have the FifoDataHandler trait implemented, which contains use +case specific logic on how the data read from the fifo is handled. + + +TODO - build fifo paths always in /tmp/fifos/ , then append the fifo name provided, that way all FSW fifos are +in their own directory +*/ + +use nix::unistd::mkfifo; +use std::fs::{remove_file, OpenOptions}; +use std::io::{Error, Read, Write}; +use std::path::Path; +use std::sync::Arc; + +const MAX_FIFO_BUFFER_SIZE: usize = 1024; +const FIFO_DIR_PREPEND: &str = "/tmp/fifo-"; + +/// Contains a callback fired when data is read from the fifo +/// Handlers are to be defined by whatever is using the IPCReader (processes will do different things with the data) +trait FifoDataHandler: Send + Sync + 'static { + fn handle_fifo_input(&self, data: &[u8]); +} + +pub struct IpcReader { + fifo_path_str: String, + data_handler: Arc, +} + +impl IpcReader { + fn new(fifo_name: &str, handler: Arc) -> Result { + let fifo_path = format!("{}{}", FIFO_DIR_PREPEND, fifo_name); + let reader = IpcReader { + fifo_path_str: fifo_path.to_string(), + data_handler: handler, + }; + reader.setup_fifo()?; + Ok(reader) + } + + /// Check if fifo exists, if it does delete it, then create a new one + fn setup_fifo(&self) -> Result<(), Error> { + let path = Path::new(&self.fifo_path_str); + if path.exists() { + remove_file(&self.fifo_path_str)?; + } + match mkfifo(self.fifo_path_str.as_str(), nix::sys::stat::Mode::S_IRWXU) { + Ok(_) => Ok(()), + Err(e) => Err(e.into()), + } + } + + fn read(&self, buffer: &mut [u8]) -> Result { + // Open the named pipe for reading + let path_obj = Path::new(self.fifo_path_str.as_str()); + let mut fifo = OpenOptions::new().read(true).open(path_obj)?; + + let n = fifo.read(buffer)?; + return Ok(n); + } + + /// Begin a thread which calls the Handlers 'handle_fifo_input' fxn when data is read + fn start_reader_thread(&self) { + let fifo_path_str_clone = self.fifo_path_str.clone(); + let handler = Arc::clone(&self.data_handler); + std::thread::spawn(move || { + let mut buffer = vec![0; MAX_FIFO_BUFFER_SIZE]; + let path = Path::new(&fifo_path_str_clone); + let mut fifo = OpenOptions::new() + .read(true) + .open(path) + .expect("Failed to open FIFO for reading"); + loop { + match fifo.read(&mut buffer) { + Ok(n) => { + if n > 0 { + handler.handle_fifo_input(&buffer[..n]); + } + } + Err(e) => { + println!("Error reading FIFO: {}", e); + break; + } + } + } + }); + } +} + +pub struct IpcWriter { + fifo_path_str: String, +} + +impl IpcWriter { + fn new(fifo_name: &str) -> IpcWriter { + let fifo_path = format!("{}{}", FIFO_DIR_PREPEND, fifo_name); + IpcWriter { + fifo_path_str: fifo_path.to_string(), + } + } + + fn write(&self, data: &[u8]) -> Result<(), std::io::Error> { + let path_obj = Path::new(self.fifo_path_str.as_str()); + let mut fifo = OpenOptions::new().write(true).open(path_obj)?; + fifo.write_all(data)?; + println!("Message written to the pipe"); + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::sync::{Arc, Mutex}; + use std::thread; + use std::time::Duration; + + struct TestHandler { + received_data: Arc>>, + } + + impl TestHandler { + fn new() -> Self { + Self { + received_data: Arc::new(Mutex::new(Vec::new())), + } + } + } + + impl FifoDataHandler for TestHandler { + fn handle_fifo_input(&self, data: &[u8]) { + let mut received_data = self.received_data.lock().unwrap(); + received_data.extend_from_slice(data); + println!("Fifo handler callback received data: {:?}", received_data); + } + } + + #[tokio::test] + async fn test_ipc_reader_writer() -> Result<(), Box> { + let fifo_name = "rust_ipc_test"; + + let handler = Arc::new(TestHandler::new()); + let reader = IpcReader::new(fifo_name, handler.clone()).unwrap(); + + reader.start_reader_thread(); + + // Ensure reader thread is ready + thread::sleep(Duration::from_secs(1)); + + let writer = IpcWriter::new(fifo_name); + + let test_data = b"This is from the writer"; + writer.write(test_data)?; + + // Give some time for the reader to process the message + thread::sleep(Duration::from_secs(1)); + + let received_data = handler.received_data.lock().unwrap().clone(); + assert_eq!(received_data, test_data); + + Ok(()) + } +} diff --git a/src/message.rs b/src/cmd.rs similarity index 94% rename from src/message.rs rename to src/cmd.rs index d658fc8..917d819 100644 --- a/src/message.rs +++ b/src/cmd.rs @@ -67,6 +67,7 @@ pub struct Command { impl Command { #[allow(dead_code)] + /// Convert a message (array of bytes) into a Command struct [handle uplink GS -> OBC] pub fn deserialize(msg: &Message) -> Command { let len: usize = usize::from(msg[MSG_LEN_IX]); let mut cmd = Command { @@ -86,6 +87,7 @@ impl Command { } #[allow(dead_code)] + /// Convert a Command struct into an array of bytes [handle downlink OBC -> GS] pub fn serialize(&self) -> Message { let mut msg: Message = [0; MSG_LEN]; msg[MSG_LEN_IX] = (self.oplen + MSG_OPDATA_OFF) as u8; diff --git a/src/component.rs b/src/component.rs index d3e020c..4508f62 100644 --- a/src/component.rs +++ b/src/component.rs @@ -1,4 +1,7 @@ -use crate::message::{self, Command, Message}; +use crate::{ + cmd::{self, Command, Message}, + DFGM::dfgm_handler, +}; pub struct Eps { uart: u8, @@ -37,60 +40,34 @@ impl Adcs { } } -pub struct Dfgm { - port: u16, - state: u8, -} - -impl Dfgm { - fn configure() -> Dfgm { - Dfgm { port: 1, state: 0 } - } - - fn dispatch(&mut self, cmd: &Command) -> Result { - if cmd.oplen > 0 { - let arg = std::str::from_utf8(&cmd.opdata).unwrap(); - println!("dfgm op data: {0}", arg); - match arg.parse::() { - Ok(i) => self.port = i, - Err(s) => println!("Parse failed: {}", s), - } - } - println!("Dfgm: opcode {}, port: {}", cmd.opcode, self.port); - self.state += 1; - Ok(cmd.status_msg(self.state)) - } -} - - pub enum Component { Root, Eps(Eps), Adcs(Adcs), - Dfgm(Dfgm), + Dfgm(dfgm_handler::Dfgm), } pub fn init() -> Vec { let mut components: Vec = Vec::new(); - for (index, p) in message::PAYLOADS.iter().enumerate() { + for (index, p) in cmd::PAYLOADS.iter().enumerate() { assert_eq!(index, p.id, "payload index/id mismatch"); match p.name { - message::PAYLOAD_EPS => components.push(Component::Eps(Eps::configure())), - message::PAYLOAD_ADCS => components.push(Component::Adcs(Adcs::configure(7))), - message::PAYLOAD_DFGM => components.push(Component::Dfgm(Dfgm::configure())), + cmd::PAYLOAD_EPS => components.push(Component::Eps(Eps::configure())), + cmd::PAYLOAD_ADCS => components.push(Component::Adcs(Adcs::configure(7))), + cmd::PAYLOAD_DFGM => components.push(Component::Dfgm(dfgm_handler::Dfgm::configure())), _ => components.push(Component::Root), } } - - components + + components } - + pub fn dispatch_cmd(target: &mut Component, cmd: &Command) -> Result { match target { Component::Eps(eps) => eps.dispatch(cmd), Component::Adcs(adcs) => adcs.dispatch(cmd), Component::Dfgm(dfgm) => dfgm.dispatch(cmd), - _ => Err("Unrecognized component") + _ => Err("Unrecognized component"), } } diff --git a/src/main.rs b/src/main.rs index e891670..97923fd 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,11 +1,15 @@ use std::net::{TcpListener, TcpStream, Shutdown}; use std::io::{Read, Write}; use clap::Parser; -use crate::message::Message; +use crate::cmd::Message; -mod message; +mod cmd; mod component; +mod DFGM; +pub use DFGM::dfgm_interface::*; +pub use DFGM::dfgm_handler::*; + #[derive(Parser)] #[command(author, version, about, long_about = None)] struct Cli { @@ -50,7 +54,7 @@ fn main() { } fn handle_client(mut stream: TcpStream, components: &mut [component::Component]) { - let mut data = [0; message::MSG_LEN]; + let mut data = [0; cmd::MSG_LEN]; while match stream.read(&mut data) { Ok(size) => { // echo everything! @@ -62,7 +66,7 @@ fn handle_client(mut stream: TcpStream, components: &mut [component::Component]) println!("read {size} bytes"); let response = handle_message(&data, components); match stream.write(&response) { - Ok(wlen) => if size < message::MSG_LEN { + Ok(wlen) => if size < cmd::MSG_LEN { println!("short write: {wlen}"); false } @@ -86,7 +90,7 @@ fn handle_client(mut stream: TcpStream, components: &mut [component::Component]) } fn handle_message(msg: &Message, components: &mut [component::Component]) -> Message { - let cmd = message::Command::deserialize(msg); + let cmd = cmd::Command::deserialize(msg); println!("handle_message: payload {0}, opcode {1}", cmd.payload, cmd.opcode); @@ -95,4 +99,4 @@ fn handle_message(msg: &Message, components: &mut [component::Component]) -> Mes Ok(msg) => msg, Err(_) => cmd.status_msg(255) } -} +} \ No newline at end of file