Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -54,3 +54,12 @@ jobs:
run: |
cd impl
cargo clippy --all-features --tests -- -D warnings

clippy:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: UpdateRust
run: rustup update stable
- name: Clippy
run: cargo clippy --workspace --all-features -- -D warnings
19 changes: 10 additions & 9 deletions impl/src/gen_am.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,13 @@ pub(crate) fn impl_lamellar_serde_trait(
quote! {
impl #impl_generics #lamellar::active_messaging::LamellarSerde for #am_name #ty_generics #where_clause {
fn serialized_size(&self)->usize{
#lamellar::serialized_size(self,true)
#lamellar::serialized_size(self)
}
fn serialize_into(&self,buf: &mut [u8]){
#lamellar::serialize_into(buf,self,true).expect("can serialize and enough space in buf");
#lamellar::serialize_into(buf,self).expect("can serialize and enough space in buf");
}
fn serialize(&self)->Vec<u8>{
#lamellar::serialize(self,true).expect("can serialize")
#lamellar::serialize(self).expect("can serialize")
}
}
}
Expand All @@ -92,13 +92,13 @@ fn impl_return_lamellar_serde_trait(
quote! {
impl #impl_generics #lamellar::active_messaging::LamellarSerde for #am_name #ty_generics #where_clause {
fn serialized_size(&self)->usize{
#lamellar::serialized_size(&self.val,true)
#lamellar::serialized_size(&self.val)
}
fn serialize_into(&self,buf: &mut [u8]){
#lamellar::serialize_into(buf,&self.val,true).expect("can serialize and enough space in buf");
#lamellar::serialize_into(buf,&self.val).expect("can serialize and enough space in buf");
}
fn serialize(&self)->Vec<u8>{
#lamellar::serialize(self,true).expect("can serialize")
#lamellar::serialize(self).expect("can serialize")
}
}
}
Expand All @@ -115,11 +115,11 @@ pub(crate) fn impl_lamellar_result_serde_trait(
impl #impl_generics #lamellar::active_messaging::LamellarResultSerde for #am_name #ty_generics #where_clause {
fn serialized_result_size(&self,result: & Box<dyn std::any::Any + Sync + Send>)->usize{
let result = result.downcast_ref::<#ret_type>().expect("can downcast result box");
#lamellar::serialized_size(result,true)
#lamellar::serialized_size(result)
}
fn serialize_result_into(&self,buf: &mut [u8],result: & Box<dyn std::any::Any + Sync + Send>){
let result = result.downcast_ref::<#ret_type>().expect("can downcast result box");
#lamellar::serialize_into(buf,result,true).expect("can serialize and enough size in buf");
#lamellar::serialize_into(buf,result).expect("can serialize and enough size in buf");
}
}
}
Expand Down Expand Up @@ -159,7 +159,7 @@ fn impl_unpack_and_register_function(
let am_name_unpack = quote::format_ident!("{}_unpack", am_name.clone());
quote! {
fn #am_name_unpack #impl_generics (bytes: &[u8], cur_pe: Result<usize,#lamellar::IdError>) -> std::sync::Arc<dyn #lamellar::active_messaging::RemoteActiveMessage + Sync + Send> {
let __lamellar_data: std::sync::Arc<#am_name #ty_generics> = std::sync::Arc::new(#lamellar::deserialize(&bytes,true).expect("can deserialize into remote active message"));
let __lamellar_data: std::sync::Arc<#am_name #ty_generics> = std::sync::Arc::new(#lamellar::deserialize(&bytes).expect("can deserialize into remote active message"));
<#am_name #ty_generics as #lamellar::active_messaging::DarcSerde>::des(&__lamellar_data,cur_pe);
__lamellar_data
}
Expand Down Expand Up @@ -469,6 +469,7 @@ pub(crate) fn generate_am(
}
}

#[allow(clippy::too_many_arguments)]
pub(crate) fn create_am_struct(
generics: &syn::Generics,
attributes: &proc_macro2::TokenStream,
Expand Down
3 changes: 3 additions & 0 deletions impl/src/gen_am_group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ fn gen_am_group_return_stmt(
}
}

#[allow(clippy::too_many_arguments)]
fn impl_am_group_remote(
generics: &syn::Generics,
am_group_am_name: &syn::Ident,
Expand Down Expand Up @@ -539,6 +540,7 @@ pub(crate) fn generate_am_group(
}
}

#[allow(clippy::too_many_arguments)]
fn create_am_group_remote(
generics: &syn::Generics,
attributes: &proc_macro2::TokenStream,
Expand Down Expand Up @@ -640,6 +642,7 @@ fn create_am_group_remote(
)
}

#[allow(clippy::too_many_arguments)]
pub(crate) fn create_am_group_structs(
generics: &syn::Generics,
attributes: &proc_macro2::TokenStream,
Expand Down
29 changes: 15 additions & 14 deletions impl/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use syn::spanned::Spanned;
// use syn::visit_mut::VisitMut;
use syn::parse::{Parse, ParseStream, Result};
use syn::Token;

fn type_name(ty: &syn::Type) -> Option<String> {
match ty {
syn::Type::Path(syn::TypePath { qself: None, path }) => {
Expand Down Expand Up @@ -177,7 +178,7 @@ fn check_for_am_group(args: &Punctuated<syn::Meta, Token![,]>) -> bool {
/// impl LamellarAM for HelloWorld {
/// async fn exec(self) {
/// println!(
/// "{:?} on PE {:?} of {:?} using thread {:?}, received from PE {:?}",
/// "{:?} on PE {:?} of {:?} using thread {:?}, received from PE {}",
/// self.msg,
/// lamellar::current_pe,
/// lamellar::num_pes,
Expand Down Expand Up @@ -231,18 +232,18 @@ pub fn AmData(args: TokenStream, input: TokenStream) -> TokenStream {
/// );
/// }
/// }
/// fn main() {
/// let world = lamellar::LamellarWorldBuilder::new().build();
/// let my_pe = Arc::new(Mutex::new(world.my_pe()));
/// world.barrier();
///
/// let request = world.exec_am_local(HelloWorld {
/// original_pe: my_pe,
/// });
/// let world = lamellar::LamellarWorldBuilder::new().build();
/// let my_pe = Arc::new(Mutex::new(world.my_pe()));
/// world.barrier();
///
/// //wait for the request to complete
/// request.block();
/// } //when world drops there is an implicit world.barrier() that occurs
/// let request = world.exec_am_local(HelloWorld {
/// original_pe: my_pe,
/// });
///
/// //wait for the request to complete
/// request.block();
/// //when `world` drops there is an implicit world.barrier() that occurs
///```
#[allow(non_snake_case)]
#[proc_macro_error]
Expand Down Expand Up @@ -430,7 +431,7 @@ fn parse_am(
/// impl LamellarAM for HelloWorld {
/// async fn exec(self) {
/// println!(
/// "{:?} on PE {:?} of {:?} using thread {:?}, received from PE {:?}",
/// "{:?} on PE {:?} of {:?} using thread {:?}, received from PE {}",
/// self.msg,
/// lamellar::current_pe,
/// lamellar::num_pes,
Expand All @@ -447,7 +448,7 @@ fn parse_am(
/// //Send a Hello World Active Message to all pes
/// let request = world.exec_am_all(HelloWorld {
/// original_pe: my_pe,
/// msg: msg,
/// msg: msg.into(),
/// });
///
/// //wait for the request to complete
Expand All @@ -471,6 +472,7 @@ pub fn am_group(args: TokenStream, input: TokenStream) -> TokenStream {
///
///```
/// use lamellar::active_messaging::prelude::*;
/// use std::sync::{Arc, Mutex};
///
/// use std::sync::{Arc, Mutex};
///
Expand Down Expand Up @@ -894,7 +896,6 @@ impl Parse for AmGroups {
#[proc_macro_error]
#[proc_macro]
pub fn typed_am_group(input: TokenStream) -> TokenStream {
// println!("typed_am_group {:?}",input);
let am_group: AmGroups = syn::parse(input).unwrap();
let am_type = am_group.am;
let team = am_group.team;
Expand Down
30 changes: 14 additions & 16 deletions src/active_messaging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -438,22 +438,20 @@
//! }
//! }
//!
//! fn main(){
//! let world = lamellar::LamellarWorldBuilder::new().build();
//! let my_pe = world.my_pe();
//! let num_pes = world.num_pes();
//! //Send initial message to right neighbor
//! let next_pe = (my_pe + 1) % num_pes; //account for wrap arround
//! let request = world.exec_am_pe(
//! next_pe,
//! RingAm {
//! original_pe: my_pe
//! }
//! );
//! //wait for the request to complete
//! let results = request.block();
//! println!("PE {my_pe} {results:?}");
//! }
//! let world = lamellar::LamellarWorldBuilder::new().build();
//! let my_pe = world.my_pe();
//! let num_pes = world.num_pes();
//! //Send initial message to right neighbor
//! let next_pe = (my_pe + 1) % num_pes; //account for wrap arround
//! let request = world.exec_am_pe(
//! next_pe,
//! RingAm {
//! original_pe: my_pe
//! }
//! );
//! //wait for the request to complete
//! let results = request.block();
//! println!("PE {my_pe} {results:?}");
//!```
//! The key thing to notice in this example is how we wait for a request to finish will change depending on the context we are executing in.
//! When we are in the active message we are already in an asynchronous context so we can simply `await` the future returned to us by the `exec_am_pe()` call.
Expand Down
26 changes: 12 additions & 14 deletions src/active_messaging/batching/simple_batcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ impl Batcher for SimpleBatcher {
}
let mut darcs = vec![];
data.ser(1, &mut darcs); //1 because we are only sending back to the original PE
let darc_list_size = crate::serialized_size(&darcs, false);
let darc_list_size = crate::serialized_size(&darcs);
let size = batch.add(
req_data,
LamellarData::Data(data, darcs, darc_list_size),
Expand Down Expand Up @@ -284,7 +284,7 @@ impl Batcher for SimpleBatcher {
let mut i = 0;
// println!("executing batched msg {:?}", data.len());
while i < data.len() {
let cmd: Cmd = crate::deserialize(&data[i..i + *CMD_LEN], false).unwrap();
let cmd: Cmd = crate::deserialize(&data[i..i + *CMD_LEN]).unwrap();
i += *CMD_LEN;
// let temp_i = i;
// println!("cmd {:?}", cmd);
Expand Down Expand Up @@ -407,15 +407,15 @@ impl SimpleBatcher {
cmd: Cmd,
) {
// println!("serialize_am");
crate::serialize_into(&mut data_buf[*i..*i + *CMD_LEN], &cmd, false).unwrap();
crate::serialize_into(&mut data_buf[*i..*i + *CMD_LEN], &cmd).unwrap();
*i += *CMD_LEN;

let am_header = AmHeader {
am_id,
req_id: req_data.id,
team_addr: req_data.team_addr,
};
crate::serialize_into(&mut data_buf[*i..*i + *AM_HEADER_LEN], &am_header, false).unwrap();
crate::serialize_into(&mut data_buf[*i..*i + *AM_HEADER_LEN], &am_header).unwrap();
*i += *AM_HEADER_LEN;

let am_size = am_size - (*CMD_LEN + *AM_HEADER_LEN);
Expand Down Expand Up @@ -446,7 +446,7 @@ impl SimpleBatcher {
darc_list_size: usize,
) {
// println!("serialize_data");
crate::serialize_into(&mut data_buf[*i..*i + *CMD_LEN], &Cmd::Data, false).unwrap();
crate::serialize_into(&mut data_buf[*i..*i + *CMD_LEN], &Cmd::Data).unwrap();
*i += *CMD_LEN;
let data_size = data_size - (*CMD_LEN + *DATA_HEADER_LEN + darc_list_size);
let data_header = DataHeader {
Expand All @@ -457,12 +457,11 @@ impl SimpleBatcher {
crate::serialize_into(
&mut data_buf[*i..*i + *DATA_HEADER_LEN],
&data_header,
false,
)
.unwrap();
*i += *DATA_HEADER_LEN;

crate::serialize_into(&mut data_buf[*i..(*i + darc_list_size)], &darcs, false).unwrap();
crate::serialize_into(&mut data_buf[*i..(*i + darc_list_size)], &darcs).unwrap();
*i += darc_list_size;

data.serialize_into(&mut data_buf[*i..*i + data_size]);
Expand All @@ -472,7 +471,7 @@ impl SimpleBatcher {
//#[tracing::instrument(skip_all)]
fn serialize_unit(req_data: ReqMetaData, data_buf: &mut [u8], i: &mut usize) {
// println!("serialize_unit");
crate::serialize_into(&mut data_buf[*i..*i + *CMD_LEN], &Cmd::Unit, false).unwrap();
crate::serialize_into(&mut data_buf[*i..*i + *CMD_LEN], &Cmd::Unit).unwrap();
*i += *CMD_LEN;

let unit_header = UnitHeader {
Expand All @@ -481,7 +480,6 @@ impl SimpleBatcher {
crate::serialize_into(
&mut data_buf[*i..*i + *UNIT_HEADER_LEN],
&unit_header,
false,
)
.unwrap();
*i += *UNIT_HEADER_LEN;
Expand Down Expand Up @@ -512,7 +510,7 @@ impl SimpleBatcher {
Some(AllocError::OutOfMemoryError(_)) => {
lamellae.alloc_pool(size * 2);
}
_ => panic!("unhanlded error!! {:?}", err),
_ => panic!("unhandled error!! {:?}", err),
}
data = lamellae.serialize_header(header.clone(), size);
}
Expand All @@ -531,9 +529,9 @@ impl SimpleBatcher {
) {
// println!("exec_am");
let am_header: AmHeader =
crate::deserialize(&data[*i..*i + *AM_HEADER_LEN], false).unwrap();
crate::deserialize(&data[*i..*i + *AM_HEADER_LEN]).unwrap();
let (team, world) =
ame.get_team_and_world(msg.src as usize, am_header.team_addr, &lamellae);
ame.get_team_and_world(msg.src as usize, am_header.team_addr, lamellae);
*i += *AM_HEADER_LEN;

let am = AMS_EXECS.get(&am_header.am_id).unwrap()(&data[*i..], team.team.team_pe);
Expand Down Expand Up @@ -590,9 +588,9 @@ impl SimpleBatcher {
) {
// println!("exec_return_am");
let am_header: AmHeader =
crate::deserialize(&data[*i..*i + *AM_HEADER_LEN], false).unwrap();
crate::deserialize(&data[*i..*i + *AM_HEADER_LEN]).unwrap();
let (team, world) =
ame.get_team_and_world(msg.src as usize, am_header.team_addr, &lamellae);
ame.get_team_and_world(msg.src as usize, am_header.team_addr, lamellae);
*i += *AM_HEADER_LEN;
let am = AMS_EXECS.get(&am_header.am_id).unwrap()(&data[*i..], team.team.team_pe);
*i += am.serialized_size();
Expand Down
Loading
Loading