Skip to content

Commit

Permalink
Cease to periodically publish subscriptions/publications/topics to di…
Browse files Browse the repository at this point in the history
…scovery. Instead publish them only when they're created/updated.
  • Loading branch information
ohuopio committed Nov 8, 2023
1 parent 97661f3 commit c684881
Show file tree
Hide file tree
Showing 5 changed files with 335 additions and 190 deletions.
61 changes: 58 additions & 3 deletions src/dds/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use byteorder::LittleEndian;
use log::{debug, error, info, trace, warn};

use crate::{
create_error_dropped, create_error_poisoned,
create_error_dropped, create_error_internal, create_error_poisoned,
dds::{
adapters,
key::Keyed,
Expand Down Expand Up @@ -51,7 +51,7 @@ use super::{
};
#[cfg(feature = "security")]
use crate::{
create_error_internal, create_error_not_allowed_by_security,
create_error_not_allowed_by_security,
security::{security_plugins::SecurityPluginsHandle, EndpointSecurityInfo},
};
#[cfg(not(feature = "security"))]
Expand Down Expand Up @@ -550,6 +550,8 @@ impl InnerPublisher {
security_plugins: self.security_plugins_handle.clone(),
};

// Send writer ingredients to DP event loop, where the actual writer will be
// constructed
self
.add_writer_sender
.send(new_writer)
Expand Down Expand Up @@ -601,10 +603,36 @@ impl InnerPublisher {
None
};

// Update topic to DiscoveryDB & inform Discovery about it
let dwd = DiscoveredWriterData::new(&data_writer, topic, &dp, security_info);
db.update_local_topic_writer(dwd);
db.update_topic_data_p(topic);

if let Err(e) = self.discovery_command.try_send(DiscoveryCommand::AddTopic {
topic_name: topic.name(),
}) {
// Log the error but don't quit, failing to inform Discovery about the topic
// shouldn't be that serious
error!(
"Failed send DiscoveryCommand::AddTopic about topic {}: {}",
topic.name(),
e
);
}

// Inform Discovery about the new writer
let writer_guid = self.domain_participant.guid().from_prefix(entity_id);
self
.discovery_command
.try_send(DiscoveryCommand::AddLocalWriter { guid: writer_guid })
.or_else(|e| {
create_error_internal!(
"Cannot inform Discovery about the new writer {writer_guid:?}. Error: {}",
e
)
})?;

// Return the DataWriter to user
Ok(data_writer)
}

Expand Down Expand Up @@ -1129,13 +1157,26 @@ impl InnerSubscriber {
None
};

// Update topic to DiscoveryDB & inform Discovery about it
{
let mut db = self
.discovery_db
.write()
.or_else(|e| create_error_poisoned!("Cannot lock discovery_db. {}", e))?;
db.update_local_topic_reader(&dp, topic, &new_reader, security_info);
db.update_topic_data_p(topic);

if let Err(e) = self.discovery_command.try_send(DiscoveryCommand::AddTopic {
topic_name: topic.name(),
}) {
// Log the error but don't quit, failing to inform Discovery about the topic
// shouldn't be that serious
error!(
"Failed send DiscoveryCommand::AddTopic about topic {}: {}",
topic.name(),
e
);
}
}

let datareader = with_key::SimpleDataReader::<D, SA>::new(
Expand All @@ -1152,12 +1193,26 @@ impl InnerSubscriber {
poll_event_source,
)?;

// Return the DataReader Reader pairs to where they are used
// Send reader ingredients to DP event loop, where the actual reader will be
// constructed
self
.sender_add_reader
.try_send(new_reader)
.or_else(|e| create_error_poisoned!("Cannot add DataReader. Error: {}", e))?;

// Inform Discovery about the new reader
let reader_guid = self.domain_participant.guid().from_prefix(entity_id);
self
.discovery_command
.try_send(DiscoveryCommand::AddLocalReader { guid: reader_guid })
.or_else(|e| {
create_error_internal!(
"Cannot inform Discovery about the new reader {reader_guid:?}. Error: {}",
e
)
})?;

// Return the DataReader to user
Ok(datareader)
}

Expand Down
Loading

0 comments on commit c684881

Please sign in to comment.