You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
Hello guys, I got a example running with rust dds as publisher and rti connext as subscriber.
I create following structure on the of c++/idl side
module dds_tutorials
{
module HelloWorldData
{
struct time {
longlong sec; // seconds since epoch in UTC
unsigned long nanosec; // nanoseconds since above second
};
struct Msg
{
long user_id;string<256>message;
time time_stamp;
};
};
};
and the c++ client look as following:
#include<dds/sub/ddssub.hpp>
#include<dds/core/ddscore.hpp>
#include<dds/domain/DomainParticipant.hpp>staticvoidRunPublisher(constint domainID)
{
const dds::domain::DomainParticipant participant(domain_id);
const dds::topic::Topic<dds_tutorials::HelloWorldData::Msg> topic(participant, "tutorials.helloworld");
const dds::sub::Subscriber subscriber(participant);
dds::sub::DataReader<dds_tutorials::HelloWorldData::Msg> reader(subscriber, topic);
dds::core::cond::StatusCondition status_condition(reader);
status_condition.enabled_statuses(dds::core::status::StatusMask::liveliness_changed());
// Lambda function for the status_condition// Handler register a custom handler with the condition
status_condition->handler([&reader]() {
// Get the status changes so we can check which status condition// triggered
dds::core::status::StatusMask status_mask = reader.status_changes();
// In Case of Liveliness changedif ((status_mask & dds::core::status::StatusMask::liveliness_changed()).any()) {
dds::core::status::LivelinessChangedStatus st = reader.liveliness_changed_status();
std::cout << "Liveliness changed => Active writers = " << st.alive_count() << std::endl;
}
});
dds::sub::cond::ReadCondition read_condition(
reader,
dds::sub::status::DataState::any(),
[&reader]() {
// Take all samples
dds::sub::LoanedSamples<dds_tutorials::HelloWorldData::Msg> samples = reader.take();
for (auto sample : samples) {
if (sample.info().valid()) {
std::cout << "=== [Subscriber] message received :"
<< "\n";
std::cout << " userID : " << sample->data().user_id() << "\n";
std::cout << " Message : \"" << sample->data().message() << "\"";
auto nanoseconds = std::chrono::nanoseconds(sample.data().time_stamp().nanosec());
auto seconds = std::chrono::seconds(sample.data().time_stamp().sec());
auto duration = seconds + nanoseconds;
auto tp = std::chrono::time_point<std::chrono::system_clock>{duration};
auto t_c = std::chrono::system_clock::to_time_t(tp);
std::cout << " time point: \"" << std::put_time(std::localtime(&t_c), "%F %T.\n") << "\"" << std::endl;
}
}
} // The LoanedSamples destructor returns the loan
);
// Create a WaitSet and attach both ReadCondition and StatusCondition
dds::core::cond::WaitSet waitset;
waitset += read_condition;
waitset += status_condition;
while (!shutdownRequested) {
waitset.dispatch(dds::core::Duration(4)); // Wait up to 4s each time
}
}
On the other side I create a publisher with rust dds:
fnmain(){configure_logging();let poll = Poll::new().unwrap();letmut events = Events::with_capacity(1);// Set Ctrl-C handlerlet(stop_sender, stop_receiver) = channel::channel();
ctrlc::set_handler(move || {
stop_sender.send(()).unwrap_or(());// ignore errors, as we are quitting anyway}).expect("Error setting Ctrl-C handler");println!("Press Ctrl-C to quit.");
poll.register(&stop_receiver,STOP_PROGRAM,Ready::readable(),PollOpt::edge(),).unwrap();let domain_id = 10u16;let domain_participant = DomainParticipant::new(domain_id).unwrap_or_else(|e| panic!("Domain Participant can not be created: Reason: {}", e));let qos = QosPolicyBuilder::new().reliability(policy::Reliability::BestEffort).durability(policy::Durability::Volatile).liveliness(rustdds::policy::Liveliness::Automatic{lease_duration: rustdds::Duration::DURATION_INFINITE}).history(policy::History::KeepLast{depth:10}).build();println!("Following topics are found:");let discovered_topics = domain_participant.discovered_topics();for topic in discovered_topics.iter(){println!("{}",topic.topic_name());}let camera_topic = domain_participant
.create_topic("tutorials.helloworld".to_string(),"dds_tutorials::HelloWorldData::Msg".to_string(),&qos,TopicKind::NoKey,).unwrap_or_else(|e| panic!("Topic can not be created: Reason: {}", e));let publisher = domain_participant.create_publisher(&qos).unwrap();let writer = publisher.create_datawriter_no_key_cdr::<HelloWorldData>(&camera_topic,None).unwrap();letmut last_write = Instant::now();let loop_delay = time::Duration::from_secs(2);let data = HelloWorldData{userID:10,message:"Hello World".to_string(),time_stamp:Time{sec:0i64,nanosec:0u32}};loop{
poll.poll(&mut events,Some(loop_delay)).unwrap();for event in&events {match event.token(){STOP_PROGRAM => {if stop_receiver.try_recv().is_ok(){println!("Done");return;}},
other_token => {println!("Polled event is {:?}. WTF?", other_token);}}}let now = Instant::now();if last_write + loop_delay < now {println!("Publishing new data:");letmut new_data = data.clone();let duration = time::SystemTime::now().duration_since(time::UNIX_EPOCH).expect("Time went backwards");
new_data.time_stamp.sec = duration.as_secs()asi64;
new_data.time_stamp.nanosec = (duration.as_nanos() - new_data.time_stamp.secasu128)asu32;//new_data.time_stamp.nanosec = duration.as_nanos();
writer
.write(new_data,None).unwrap_or_else(|e| panic!("DataWriter write failed: {:?}", e));
last_write = now;}}}
The connection between subscriber and publisher works without any problems but after approximately 1 min the subscriber print out that the liveliness has changed and no worker is available and no data is coming anymore. Additional I can see in the rtiadminconsole that the datawriter disappear. If I have the same scenario but with rust a subscriber and a publisher this issue will not appear. Can you help me with this problem? Thanks
The text was updated successfully, but these errors were encountered:
Subscriber would indicate that Writer's liveliness has changed either because they are not receiving periodic liveness indications, or the Writer is sending that it is shutting down (disposed). With liveness setting "automatic", incoming application data should count as indication of liveness.
I cannot directly tell what is wrong, but you could try some of the following:
Add a status event handler for RustDDS publisher also. It should notice RTI Subscriber matching and disconnecting also, if it thinks that is happening.
Find a third DDS implementation and try making a publisher/subscriber with that.
Analyse RTPS traffic with Wireshark. Obviously, you need to watch the situation until you see the problems. Set packet input/display filters for UDP and RTPS. Things to look for:
When data is not coming anymore, is RustDDS still sending out DATA submessages in the same manner? Note that only some DATA submessages are your application's data. The rest are e.g. Discovery traffic.
Does RustDDS keep sending DomainParticipant announcements in the Discovery traffic? If RTI does not receive periodic DomainParticipant update from other participants, then it will think they are no longer alive.
Are there any messages indicating that (Data)Writer in your publisher has been actively Disposed? (These would be sent when DataWriter or RustDDS are shutting down.)
Hello guys, I got a example running with rust dds as publisher and rti connext as subscriber.
I create following structure on the of c++/idl side
and the c++ client look as following:
On the other side I create a publisher with rust dds:
The connection between subscriber and publisher works without any problems but after approximately 1 min the subscriber print out that the liveliness has changed and no worker is available and no data is coming anymore. Additional I can see in the rtiadminconsole that the datawriter disappear. If I have the same scenario but with rust a subscriber and a publisher this issue will not appear. Can you help me with this problem? Thanks
The text was updated successfully, but these errors were encountered: