diff --git a/src/dds/pubsub.rs b/src/dds/pubsub.rs index ca75748d..fb28b763 100644 --- a/src/dds/pubsub.rs +++ b/src/dds/pubsub.rs @@ -11,7 +11,7 @@ use byteorder::LittleEndian; use log::{debug, error, info, trace, warn}; use crate::{ - create_error_dropped, create_error_poisoned, + create_error_dropped, create_error_internal, create_error_poisoned, dds::{ adapters, key::Keyed, @@ -51,7 +51,7 @@ use super::{ }; #[cfg(feature = "security")] use crate::{ - create_error_internal, create_error_not_allowed_by_security, + create_error_not_allowed_by_security, security::{security_plugins::SecurityPluginsHandle, EndpointSecurityInfo}, }; #[cfg(not(feature = "security"))] @@ -550,6 +550,8 @@ impl InnerPublisher { security_plugins: self.security_plugins_handle.clone(), }; + // Send writer ingredients to DP event loop, where the actual writer will be + // constructed self .add_writer_sender .send(new_writer) @@ -601,10 +603,36 @@ impl InnerPublisher { None }; + // Update topic to DiscoveryDB & inform Discovery about it let dwd = DiscoveredWriterData::new(&data_writer, topic, &dp, security_info); db.update_local_topic_writer(dwd); db.update_topic_data_p(topic); + if let Err(e) = self.discovery_command.try_send(DiscoveryCommand::AddTopic { + topic_name: topic.name(), + }) { + // Log the error but don't quit, failing to inform Discovery about the topic + // shouldn't be that serious + error!( + "Failed send DiscoveryCommand::AddTopic about topic {}: {}", + topic.name(), + e + ); + } + + // Inform Discovery about the new writer + let writer_guid = self.domain_participant.guid().from_prefix(entity_id); + self + .discovery_command + .try_send(DiscoveryCommand::AddLocalWriter { guid: writer_guid }) + .or_else(|e| { + create_error_internal!( + "Cannot inform Discovery about the new writer {writer_guid:?}. Error: {}", + e + ) + })?; + + // Return the DataWriter to user Ok(data_writer) } @@ -1129,6 +1157,7 @@ impl InnerSubscriber { None }; + // Update topic to DiscoveryDB & inform Discovery about it { let mut db = self .discovery_db @@ -1136,6 +1165,18 @@ impl InnerSubscriber { .or_else(|e| create_error_poisoned!("Cannot lock discovery_db. {}", e))?; db.update_local_topic_reader(&dp, topic, &new_reader, security_info); db.update_topic_data_p(topic); + + if let Err(e) = self.discovery_command.try_send(DiscoveryCommand::AddTopic { + topic_name: topic.name(), + }) { + // Log the error but don't quit, failing to inform Discovery about the topic + // shouldn't be that serious + error!( + "Failed send DiscoveryCommand::AddTopic about topic {}: {}", + topic.name(), + e + ); + } } let datareader = with_key::SimpleDataReader::::new( @@ -1152,12 +1193,26 @@ impl InnerSubscriber { poll_event_source, )?; - // Return the DataReader Reader pairs to where they are used + // Send reader ingredients to DP event loop, where the actual reader will be + // constructed self .sender_add_reader .try_send(new_reader) .or_else(|e| create_error_poisoned!("Cannot add DataReader. Error: {}", e))?; + // Inform Discovery about the new reader + let reader_guid = self.domain_participant.guid().from_prefix(entity_id); + self + .discovery_command + .try_send(DiscoveryCommand::AddLocalReader { guid: reader_guid }) + .or_else(|e| { + create_error_internal!( + "Cannot inform Discovery about the new reader {reader_guid:?}. Error: {}", + e + ) + })?; + + // Return the DataReader to user Ok(datareader) } diff --git a/src/discovery/discovery.rs b/src/discovery/discovery.rs index bf5c42b7..2cd73592 100644 --- a/src/discovery/discovery.rs +++ b/src/discovery/discovery.rs @@ -55,9 +55,18 @@ use crate::{ #[cfg(not(feature = "security"))] use crate::no_security::*; -#[derive(Clone, Copy, Eq, PartialEq, Ord, PartialOrd, Hash)] +#[derive(Clone, Eq, PartialEq, Ord, PartialOrd, Hash)] pub enum DiscoveryCommand { StopDiscovery, + AddLocalWriter { + guid: GUID, + }, + AddLocalReader { + guid: GUID, + }, + AddTopic { + topic_name: String, + }, RemoveLocalWriter { guid: GUID, }, @@ -153,6 +162,7 @@ mod no_key { pub topic: Topic, pub reader: crate::no_key::DataReader>, pub writer: crate::no_key::DataWriter>, + #[allow(dead_code)] // Timers currently not used for no_key discovery topics pub timer: Timer<()>, } } @@ -244,15 +254,15 @@ pub(crate) struct Discovery { #[cfg(feature = "security")] dcps_participant_volatile_message_secure: no_key::DiscoveryTopicCDR, // CDR? + + #[cfg(feature = "security")] + cached_secure_discovery_messages_resend_timer: Timer<()>, } impl Discovery { const PARTICIPANT_CLEANUP_PERIOD: StdDuration = StdDuration::from_secs(2); const TOPIC_CLEANUP_PERIOD: StdDuration = StdDuration::from_secs(60); // timer for cleaning up inactive topics const SEND_PARTICIPANT_INFO_PERIOD: StdDuration = StdDuration::from_secs(2); - const SEND_READERS_INFO_PERIOD: StdDuration = StdDuration::from_secs(2); - const SEND_WRITERS_INFO_PERIOD: StdDuration = StdDuration::from_secs(2); - const SEND_TOPIC_INFO_PERIOD: StdDuration = StdDuration::from_secs(10); const CHECK_PARTICIPANT_MESSAGES: StdDuration = StdDuration::from_secs(1); #[cfg(feature = "security")] const CACHED_SECURE_DISCOVERY_MESSAGE_RESEND_PERIOD: StdDuration = StdDuration::from_secs(1); @@ -333,7 +343,7 @@ impl Discovery { $stateless_RTPS:expr, $reader_entity_id:expr, $reader_token:expr, $writer_entity_id:expr, - $timeout:expr, $timer_token:expr, ) => {{ + $timeout_and_timer_token_opt:expr, ) => {{ let topic = domain_participant .create_topic( $topic_name.to_string(), @@ -367,11 +377,12 @@ impl Discovery { .expect("Failed to register a discovery reader to poll."); let mut timer: Timer<()> = Timer::default(); - timer.set_timeout($timeout, ()); - poll - .register(&timer, $timer_token, Ready::readable(), PollOpt::edge()) - .expect("Unable to register timer token. "); - + if let Some((timeout_value, timer_token)) = $timeout_and_timer_token_opt { + timer.set_timeout(timeout_value, ()); + poll + .register(&timer, timer_token, Ready::readable(), PollOpt::edge()) + .expect("Unable to register timer token. "); + } paste! { $has_key ::[] { topic, reader, writer, timer } } }}; // macro } @@ -408,8 +419,10 @@ impl Discovery { EntityId::SPDP_BUILTIN_PARTICIPANT_READER, DISCOVERY_PARTICIPANT_DATA_TOKEN, EntityId::SPDP_BUILTIN_PARTICIPANT_WRITER, - Self::SEND_PARTICIPANT_INFO_PERIOD, - DISCOVERY_SEND_PARTICIPANT_INFO_TOKEN, + Some(( + Self::SEND_PARTICIPANT_INFO_PERIOD, + DISCOVERY_SEND_PARTICIPANT_INFO_TOKEN, + )), ); // create lease duration check timer @@ -438,8 +451,7 @@ impl Discovery { EntityId::SEDP_BUILTIN_SUBSCRIPTIONS_READER, DISCOVERY_READER_DATA_TOKEN, EntityId::SEDP_BUILTIN_SUBSCRIPTIONS_WRITER, - Self::SEND_READERS_INFO_PERIOD, - DISCOVERY_SEND_READERS_INFO_TOKEN, + None, // No timer ); // Publication : Who are the Writers here and elsewhere @@ -454,8 +466,7 @@ impl Discovery { EntityId::SEDP_BUILTIN_PUBLICATIONS_READER, DISCOVERY_WRITER_DATA_TOKEN, EntityId::SEDP_BUILTIN_PUBLICATIONS_WRITER, - Self::SEND_WRITERS_INFO_PERIOD, - DISCOVERY_SEND_WRITERS_INFO_TOKEN, + None, // No timer ); // Topic topic (not a typo) @@ -470,8 +481,7 @@ impl Discovery { EntityId::SEDP_BUILTIN_TOPIC_READER, DISCOVERY_TOPIC_DATA_TOKEN, EntityId::SEDP_BUILTIN_TOPIC_WRITER, - Self::SEND_TOPIC_INFO_PERIOD, - DISCOVERY_SEND_TOPIC_INFO_TOKEN, + None, // No timer ); // create lease duration check timer @@ -499,8 +509,10 @@ impl Discovery { EntityId::P2P_BUILTIN_PARTICIPANT_MESSAGE_READER, DISCOVERY_PARTICIPANT_MESSAGE_TOKEN, EntityId::P2P_BUILTIN_PARTICIPANT_MESSAGE_WRITER, - Self::CHECK_PARTICIPANT_MESSAGES, - DISCOVERY_PARTICIPANT_MESSAGE_TIMER_TOKEN, + Some(( + Self::CHECK_PARTICIPANT_MESSAGES, + DISCOVERY_PARTICIPANT_MESSAGE_TIMER_TOKEN, + )), ); // DDS Security @@ -518,8 +530,7 @@ impl Discovery { EntityId::SPDP_RELIABLE_BUILTIN_PARTICIPANT_SECURE_READER, SECURE_DISCOVERY_PARTICIPANT_DATA_TOKEN, EntityId::SPDP_RELIABLE_BUILTIN_PARTICIPANT_SECURE_WRITER, - Self::SEND_PARTICIPANT_INFO_PERIOD, - SECURE_DISCOVERY_SEND_PARTICIPANT_INFO_TOKEN, + None, // No timer. Periodic sending is done simultaneously with the non-secure topic ); // Subscriptions: What are the Readers on the network and what are they @@ -536,8 +547,7 @@ impl Discovery { EntityId::SEDP_BUILTIN_SUBSCRIPTIONS_SECURE_READER, SECURE_DISCOVERY_READER_DATA_TOKEN, EntityId::SEDP_BUILTIN_SUBSCRIPTIONS_SECURE_WRITER, - Self::SEND_READERS_INFO_PERIOD, - SECURE_DISCOVERY_SEND_READERS_INFO_TOKEN, + None, // No timer ); // Publication : Who are the Writers here and elsewhere @@ -553,8 +563,7 @@ impl Discovery { EntityId::SEDP_BUILTIN_PUBLICATIONS_SECURE_READER, SECURE_DISCOVERY_WRITER_DATA_TOKEN, EntityId::SEDP_BUILTIN_PUBLICATIONS_SECURE_WRITER, - Self::SEND_WRITERS_INFO_PERIOD, - SECURE_DISCOVERY_SEND_WRITERS_INFO_TOKEN, + None, // No timer ); // p2p Participant message secure @@ -570,8 +579,7 @@ impl Discovery { EntityId::P2P_BUILTIN_PARTICIPANT_MESSAGE_SECURE_READER, P2P_SECURE_DISCOVERY_PARTICIPANT_MESSAGE_TOKEN, EntityId::P2P_BUILTIN_PARTICIPANT_MESSAGE_SECURE_WRITER, - Self::CHECK_PARTICIPANT_MESSAGES, - P2P_SECURE_DISCOVERY_PARTICIPANT_MESSAGE_TIMER_TOKEN, + None, // No timer. Periodic sending is done simultaneously with the non-secure topic ); // p2p Participant stateless message, used for authentication and Diffie-Hellman // key exchange @@ -587,8 +595,7 @@ impl Discovery { EntityId::P2P_BUILTIN_PARTICIPANT_STATELESS_READER, P2P_PARTICIPANT_STATELESS_MESSAGE_TOKEN, EntityId::P2P_BUILTIN_PARTICIPANT_STATELESS_WRITER, - Self::CACHED_SECURE_DISCOVERY_MESSAGE_RESEND_PERIOD, - CACHED_SECURE_DISCOVERY_MESSAGE_RESEND_TIMER_TOKEN, + None, // No timer ); // p2p Participant volatile message secure, used for key exchange @@ -605,10 +612,28 @@ impl Discovery { EntityId::P2P_BUILTIN_PARTICIPANT_VOLATILE_SECURE_READER, P2P_BUILTIN_PARTICIPANT_VOLATILE_SECURE_TOKEN, EntityId::P2P_BUILTIN_PARTICIPANT_VOLATILE_SECURE_WRITER, - Self::CHECK_PARTICIPANT_MESSAGES, - P2P_BUILTIN_PARTICIPANT_VOLATILE_TIMER_TOKEN, + None, // No timer. ); + // Create a timer to periodically check whether to resend any cached security + // (authentication, key exchange) messages + #[cfg(feature = "security")] + let secure_message_resend_timer = { + let mut secure_message_resend_timer: Timer<()> = Timer::default(); + secure_message_resend_timer + .set_timeout(Self::CACHED_SECURE_DISCOVERY_MESSAGE_RESEND_PERIOD, ()); + try_construct!( + poll.register( + &secure_message_resend_timer, + CACHED_SECURE_DISCOVERY_MESSAGE_RESEND_TIMER_TOKEN, + Ready::readable(), + PollOpt::edge(), + ), + "Unable to create secure message resend timer. {:?}" + ); + secure_message_resend_timer + }; + #[cfg(not(feature = "security"))] let security_opt = security_plugins_opt.and(None); // = None, but avoid warning. @@ -660,6 +685,8 @@ impl Discovery { dcps_participant_stateless_message, #[cfg(feature = "security")] dcps_participant_volatile_message_secure, + #[cfg(feature = "security")] + cached_secure_discovery_messages_resend_timer: secure_message_resend_timer, }) } @@ -702,6 +729,15 @@ impl Discovery { info!("Stopped Discovery"); return; // terminate event loop } + DiscoveryCommand::AddLocalWriter { guid } => { + self.write_single_writer_info(guid); + } + DiscoveryCommand::AddLocalReader { guid } => { + self.write_single_reader_info(guid); + } + DiscoveryCommand::AddTopic { topic_name } => { + self.write_topic_info(&topic_name); + } DiscoveryCommand::RemoveLocalWriter { guid } => { if guid == self.dcps_publication.writer.guid() { continue; @@ -791,24 +827,9 @@ impl Discovery { DISCOVERY_READER_DATA_TOKEN => { self.handle_subscription_reader(None); } - DISCOVERY_SEND_READERS_INFO_TOKEN => { - self.write_readers_info(); - self - .dcps_subscription - .timer - .set_timeout(Self::SEND_READERS_INFO_PERIOD, ()); - } DISCOVERY_WRITER_DATA_TOKEN => { self.handle_publication_reader(None); } - DISCOVERY_SEND_WRITERS_INFO_TOKEN => { - self.write_writers_info(); - self - .dcps_publication - .timer - // .writers_send_info_timer - .set_timeout(Self::SEND_WRITERS_INFO_PERIOD, ()); - } DISCOVERY_TOPIC_DATA_TOKEN => { self.handle_topic_reader(None); } @@ -819,14 +840,6 @@ impl Discovery { .topic_cleanup_timer .set_timeout(Self::TOPIC_CLEANUP_PERIOD, ()); } - DISCOVERY_SEND_TOPIC_INFO_TOKEN => { - self.write_topic_info(); - self - .dcps_topic - .timer - //.topic_info_send_timer - .set_timeout(Self::SEND_TOPIC_INFO_PERIOD, ()); - } DISCOVERY_PARTICIPANT_MESSAGE_TOKEN | P2P_SECURE_DISCOVERY_PARTICIPANT_MESSAGE_TOKEN => { self.handle_participant_message_reader(); } @@ -866,13 +879,6 @@ impl Discovery { #[cfg(feature = "security")] self.handle_secure_publication_reader(None); } - SECURE_DISCOVERY_SEND_PARTICIPANT_INFO_TOKEN - | SECURE_DISCOVERY_SEND_READERS_INFO_TOKEN - | SECURE_DISCOVERY_SEND_WRITERS_INFO_TOKEN - | P2P_SECURE_DISCOVERY_PARTICIPANT_MESSAGE_TIMER_TOKEN - | P2P_BUILTIN_PARTICIPANT_VOLATILE_TIMER_TOKEN => { - debug!("Handler not implemented for {:?}", event.token()); - } other_token => { error!("discovery event loop got token: {:?}", other_token); @@ -1653,10 +1659,9 @@ impl Discovery { &self.dcps_participant_volatile_message_secure.writer, ); - // Reset timer for resending authentication messages + // Reset timer for resending security messages self - .dcps_participant_stateless_message - .timer + .cached_secure_discovery_messages_resend_timer .set_timeout(Self::CACHED_SECURE_DISCOVERY_MESSAGE_RESEND_PERIOD, ()); } } @@ -1677,9 +1682,66 @@ impl Discovery { discovery_db_write(&self.discovery_db).topic_cleanup(); } + pub fn write_single_reader_info(&self, guid: GUID) { + let db = discovery_db_read(&self.discovery_db); + if let Some(reader_data) = db.get_local_topic_reader(guid) { + if !reader_data + .reader_proxy + .remote_reader_guid + .entity_id + .kind() + .is_user_defined() + { + // Only readers of user-defined topics are published to discovery + return; + } + + #[cfg(not(feature = "security"))] + let do_nonsecure_write = true; + + #[cfg(feature = "security")] + let do_nonsecure_write = if let Some(security) = self.security_opt.as_ref() { + security.write_single_reader_info( + &self.dcps_subscription.writer, + &self.dcps_subscriptions_secure.writer, + reader_data, + ); + false + } else { + true // No security configured + }; + + if do_nonsecure_write { + match self + .dcps_subscription + .writer + .write(reader_data.clone(), None) + { + Ok(()) => { + debug!( + "Published DCPSSubscription data on topic {}, reader guid {:?}", + reader_data.subscription_topic_data.topic_name(), + guid + ); + } + Err(e) => { + error!( + "Failed to publish DCPSSubscription data on topic {}, reader guid {:?}. Error: {e}", + reader_data.subscription_topic_data.topic_name(), + guid + ); + // TODO: try again later? + } + } + } + } else { + warn!("Did not find a local reader with guid {guid:?}"); + } + } + pub fn write_readers_info(&self) { let db = discovery_db_read(&self.discovery_db); - let local_user_readers: Vec<&DiscoveredReaderData> = db + let local_user_reader_guids = db .get_all_local_topic_readers() .filter(|p| { p.reader_proxy @@ -1688,41 +1750,73 @@ impl Discovery { .kind() .is_user_defined() }) - .collect(); + .map(|drd| drd.reader_proxy.remote_reader_guid); - #[cfg(not(feature = "security"))] - let do_nonsecure_write = true; + for guid in local_user_reader_guids { + self.write_single_reader_info(guid); + } + } - #[cfg(feature = "security")] - let do_nonsecure_write = if let Some(security) = self.security_opt.as_ref() { - // Write subscriptions in Secure discovery - security.write_readers_info( - &self.dcps_subscription.writer, - &self.dcps_subscriptions_secure.writer, - &local_user_readers, - ); - false - } else { - true // No security configured - }; + pub fn write_single_writer_info(&self, guid: GUID) { + let db = discovery_db_read(&self.discovery_db); + if let Some(writer_data) = db.get_local_topic_writer(guid) { + if !writer_data + .writer_proxy + .remote_writer_guid + .entity_id + .kind() + .is_user_defined() + { + // Only writers of user-defined topics are published to discovery + return; + } - if do_nonsecure_write { - let mut count = 0; - for data in local_user_readers { - match self.dcps_subscription.writer.write(data.clone(), None) { - Ok(_) => { - count += 1; + #[cfg(not(feature = "security"))] + let do_nonsecure_write = true; + + #[cfg(feature = "security")] + let do_nonsecure_write = if let Some(security) = self.security_opt.as_ref() { + security.write_single_writer_info( + &self.dcps_publication.writer, + &self.dcps_publications_secure.writer, + writer_data, + ); + false + } else { + true // No security configured + }; + + if do_nonsecure_write { + match self + .dcps_publication + .writer + .write(writer_data.clone(), None) + { + Ok(()) => { + debug!( + "Published DCPSPublication data on topic {}, writer guid {:?}", + writer_data.publication_topic_data.topic_name(), + guid + ); + } + Err(e) => { + error!( + "Failed to publish DCPSPublication data on topic {}, writer guid {:?}. Error: {e}", + writer_data.publication_topic_data.topic_name(), + guid + ); + // TODO: try again later? } - Err(e) => error!("Unable to write new readers info. {e:?}"), } } - debug!("Announced {} readers", count); + } else { + warn!("Did not find a local writer with guid {guid:?}"); } } pub fn write_writers_info(&self) { let db: std::sync::RwLockReadGuard<'_, DiscoveryDB> = discovery_db_read(&self.discovery_db); - let local_user_writers: Vec<&DiscoveredWriterData> = db + let local_user_writer_guids = db .get_all_local_topic_writers() .filter(|p| { p.writer_proxy @@ -1731,48 +1825,39 @@ impl Discovery { .kind() .is_user_defined() }) - .collect(); + .map(|drd| drd.writer_proxy.remote_writer_guid); - #[cfg(not(feature = "security"))] - let do_nonsecure_write = true; + for guid in local_user_writer_guids { + self.write_single_writer_info(guid); + } + } - #[cfg(feature = "security")] - let do_nonsecure_write = if let Some(security) = self.security_opt.as_ref() { - // Write publications in Secure discovery - security.write_writers_info( - &self.dcps_publication.writer, - &self.dcps_publications_secure.writer, - &local_user_writers, - ); - false - } else { - true // No security configured + pub fn write_topic_info(&self, topic_name: &str) { + let db = discovery_db_read(&self.discovery_db); + // We might have multiple topics with the same name (but different Qos etc..), + // and the following call gets just one of them. Should we publish all of + // them or is this enough? + let topic_data = match db.get_topic(topic_name) { + Some(data) => data, + None => { + warn!("Did not find topic data with topic name {topic_name}"); + return; + } }; - if do_nonsecure_write { - let mut count = 0; - for data in local_user_writers { - if self - .dcps_publication - .writer - .write(data.clone(), None) - .is_err() - { - error!("Unable to write new writers info."); - } else { - count += 1; - } - } - debug!("Announced {} writers", count); + // Only user-defined topics are published to discovery + let is_user_defined = !topic_data.topic_name().starts_with("DCPS"); + if !is_user_defined { + return; } - } - pub fn write_topic_info(&self) { - let db = discovery_db_read(&self.discovery_db); - let datas = db.local_user_topics(); - for data in datas { - if let Err(e) = self.dcps_topic.writer.write(data.clone(), None) { - error!("Unable to write new topic info: {e:?}"); + match self.dcps_topic.writer.write(topic_data.clone(), None) { + Ok(()) => { + debug!("Published topic {topic_name} to DCPSTopic"); + } + Err(e) => { + error!("Failed to publish topic {topic_name} to DCPSTopic: {e}"); + // TODO: try again later? } } } diff --git a/src/discovery/discovery_db.rs b/src/discovery/discovery_db.rs index f0812793..45a6ed2a 100644 --- a/src/discovery/discovery_db.rs +++ b/src/discovery/discovery_db.rs @@ -674,6 +674,14 @@ impl DiscoveryDB { self.local_topic_readers.remove(&guid); } + pub fn get_local_topic_reader(&self, guid: GUID) -> Option<&DiscoveredReaderData> { + self.local_topic_readers.get(&guid) + } + + pub fn get_local_topic_writer(&self, guid: GUID) -> Option<&DiscoveredWriterData> { + self.local_topic_writers.get(&guid) + } + pub fn get_all_local_topic_readers(&self) -> impl Iterator { self.local_topic_readers.values() } @@ -694,20 +702,6 @@ impl DiscoveryDB { .flat_map(|(_, gm)| gm.iter().map(|(_, dtd)| &dtd.1)) } - // as above, but only from my GUID - pub fn local_user_topics(&self) -> impl Iterator { - let me = self.my_guid.prefix; - self - .topics - .iter() - .filter(|(s, _)| !s.starts_with("DCPS")) - .flat_map(move |(_, gm)| { - gm.iter() - .filter(move |(guid, _)| guid.prefix == me) - .map(|(_, dtd)| &dtd.1) - }) - } - // a Topic may have multiple definitions, because there may be multiple // participants publishing the topic information. // At least the QoS details may be different. diff --git a/src/discovery/secure_discovery.rs b/src/discovery/secure_discovery.rs index c78540d8..7438e35c 100644 --- a/src/discovery/secure_discovery.rs +++ b/src/discovery/secure_discovery.rs @@ -855,16 +855,16 @@ impl SecureDiscovery { } } - pub fn write_readers_info( + pub fn write_single_reader_info( &self, nonsecure_sub_writer: &DataWriterPlCdr, secure_sub_writer: &DataWriterPlCdr, - local_user_readers: &Vec<&DiscoveredReaderData>, + local_user_reader: &DiscoveredReaderData, ) { - for data in local_user_readers { - // See if this subscription needs to be written to DCPSSubscriptionsSecure or - // the normal one - let do_secure_write = if let Some(sec_info) = data.subscription_topic_data.security_info() { + // See if this subscription needs to be written to DCPSSubscriptionsSecure or + // the normal one + let do_secure_write = + if let Some(sec_info) = local_user_reader.subscription_topic_data.security_info() { let sec_attributes = EndpointSecurityAttributes::from(sec_info.clone()); sec_attributes .topic_security_attributes @@ -873,33 +873,44 @@ impl SecureDiscovery { false }; - if do_secure_write { - let sec_sub_data = SubscriptionBuiltinTopicDataSecure::from((*data).clone()); - if let Err(e) = secure_sub_writer.write(sec_sub_data, None) { - error!( - "Failed to write subscription to DCPSSubscriptionsSecure: {}", - e - ); - } + if do_secure_write { + let sec_sub_data = SubscriptionBuiltinTopicDataSecure::from((*local_user_reader).clone()); + if let Err(e) = secure_sub_writer.write(sec_sub_data, None) { + error!( + "Failed to write subscription to DCPSSubscriptionsSecure: {}", + e + ); } else { - // Do a non-secure write - if let Err(e) = nonsecure_sub_writer.write((*data).clone(), None) { - error!("Failed to write subscription to DCPSSubscriptions: {}", e); - } + security_info!( + "Published DCPSSubscriptionsSecure data on topic {}, reader guid {:?}", + local_user_reader.subscription_topic_data.topic_name(), + local_user_reader.reader_proxy.remote_reader_guid + ); + } + } else { + // Do a non-secure write + if let Err(e) = nonsecure_sub_writer.write((*local_user_reader).clone(), None) { + error!("Failed to write subscription to DCPSSubscriptions: {}", e); + } else { + debug!( + "Published DCPSSubscriptions data on topic {}, reader guid {:?}", + local_user_reader.subscription_topic_data.topic_name(), + local_user_reader.reader_proxy.remote_reader_guid + ); } } } - pub fn write_writers_info( + pub fn write_single_writer_info( &self, nonsecure_pub_writer: &DataWriterPlCdr, secure_pub_writer: &DataWriterPlCdr, - local_user_writers: &Vec<&DiscoveredWriterData>, + local_user_writer: &DiscoveredWriterData, ) { - for data in local_user_writers { - // See if this publication needs to be written to DCPSPublicationsSecure or the - // normal one - let do_secure_write = if let Some(sec_info) = data.publication_topic_data.security_info() { + // See if this publication needs to be written to DCPSPublicationsSecure or the + // normal one + let do_secure_write = + if let Some(sec_info) = local_user_writer.publication_topic_data.security_info() { let sec_attributes = EndpointSecurityAttributes::from(sec_info.clone()); sec_attributes .topic_security_attributes @@ -908,19 +919,30 @@ impl SecureDiscovery { false }; - if do_secure_write { - let sec_pub_data = PublicationBuiltinTopicDataSecure::from((*data).clone()); - if let Err(e) = secure_pub_writer.write(sec_pub_data, None) { - error!( - "Failed to write publication to DCPSPublicationsSecure: {}", - e - ); - } + if do_secure_write { + let sec_pub_data = PublicationBuiltinTopicDataSecure::from((*local_user_writer).clone()); + if let Err(e) = secure_pub_writer.write(sec_pub_data, None) { + error!( + "Failed to write publication to DCPSPublicationsSecure: {}", + e + ); } else { - // Do a non-secure write - if let Err(e) = nonsecure_pub_writer.write((*data).clone(), None) { - error!("Failed to write publication to DCPSPublications: {}", e); - } + security_info!( + "Published DCPSPublicationsSecure data on topic {}, writer guid {:?}", + local_user_writer.publication_topic_data.topic_name(), + local_user_writer.writer_proxy.remote_writer_guid + ); + } + } else { + // Do a non-secure write + if let Err(e) = nonsecure_pub_writer.write((*local_user_writer).clone(), None) { + error!("Failed to write publication to DCPSPublications: {}", e); + } else { + debug!( + "Published DCPSPublication data on topic {}, writer guid {:?}", + local_user_writer.publication_topic_data.topic_name(), + local_user_writer.writer_proxy.remote_writer_guid + ); } } } diff --git a/src/rtps/constant.rs b/src/rtps/constant.rs index 1c5e8299..1ffe5357 100644 --- a/src/rtps/constant.rs +++ b/src/rtps/constant.rs @@ -227,12 +227,9 @@ pub const DISCOVERY_PARTICIPANT_DATA_TOKEN: Token = Token(30 + PTB); pub const DISCOVERY_PARTICIPANT_CLEANUP_TOKEN: Token = Token(31 + PTB); pub const DISCOVERY_SEND_PARTICIPANT_INFO_TOKEN: Token = Token(32 + PTB); pub const DISCOVERY_READER_DATA_TOKEN: Token = Token(33 + PTB); -pub const DISCOVERY_SEND_READERS_INFO_TOKEN: Token = Token(34 + PTB); pub const DISCOVERY_WRITER_DATA_TOKEN: Token = Token(35 + PTB); -pub const DISCOVERY_SEND_WRITERS_INFO_TOKEN: Token = Token(36 + PTB); pub const DISCOVERY_TOPIC_DATA_TOKEN: Token = Token(37 + PTB); pub const DISCOVERY_TOPIC_CLEANUP_TOKEN: Token = Token(38 + PTB); -pub const DISCOVERY_SEND_TOPIC_INFO_TOKEN: Token = Token(39 + PTB); pub const DISCOVERY_PARTICIPANT_MESSAGE_TOKEN: Token = Token(40 + PTB); pub const DISCOVERY_PARTICIPANT_MESSAGE_TIMER_TOKEN: Token = Token(41 + PTB); @@ -240,21 +237,13 @@ pub const DPEV_ACKNACK_TIMER_TOKEN: Token = Token(45 + PTB); pub const SECURE_DISCOVERY_PARTICIPANT_DATA_TOKEN: Token = Token(50 + PTB); // pub const DISCOVERY_PARTICIPANT_CLEANUP_TOKEN: Token = Token(51 + PTB); -pub const SECURE_DISCOVERY_SEND_PARTICIPANT_INFO_TOKEN: Token = Token(52 + PTB); pub const SECURE_DISCOVERY_READER_DATA_TOKEN: Token = Token(53 + PTB); -pub const SECURE_DISCOVERY_SEND_READERS_INFO_TOKEN: Token = Token(54 + PTB); pub const SECURE_DISCOVERY_WRITER_DATA_TOKEN: Token = Token(55 + PTB); -pub const SECURE_DISCOVERY_SEND_WRITERS_INFO_TOKEN: Token = Token(56 + PTB); -// pub const DISCOVERY_TOPIC_DATA_TOKEN: Token = Token(57 + PTB); -// pub const DISCOVERY_TOPIC_CLEANUP_TOKEN: Token = Token(58 + PTB); -// pub const DISCOVERY_SEND_TOPIC_INFO_TOKEN: Token = Token(59 + PTB); pub const P2P_SECURE_DISCOVERY_PARTICIPANT_MESSAGE_TOKEN: Token = Token(60 + PTB); -pub const P2P_SECURE_DISCOVERY_PARTICIPANT_MESSAGE_TIMER_TOKEN: Token = Token(61 + PTB); pub const P2P_PARTICIPANT_STATELESS_MESSAGE_TOKEN: Token = Token(62 + PTB); pub const CACHED_SECURE_DISCOVERY_MESSAGE_RESEND_TIMER_TOKEN: Token = Token(63 + PTB); pub const P2P_BUILTIN_PARTICIPANT_VOLATILE_SECURE_TOKEN: Token = Token(64 + PTB); -pub const P2P_BUILTIN_PARTICIPANT_VOLATILE_TIMER_TOKEN: Token = Token(65 + PTB); // See note about maximum allowed number above.