Skip to content

Commit

Permalink
port is now u16, added protocol enum, split the examples
Browse files Browse the repository at this point in the history
  • Loading branch information
ValMobBIllich committed Oct 18, 2024
1 parent 3db458b commit c88127c
Show file tree
Hide file tree
Showing 5 changed files with 231 additions and 73 deletions.
92 changes: 92 additions & 0 deletions examples/encrypted_publisher_example.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/********************************************************************************
* Copyright (c) 2023 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Apache License Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
********************************************************************************/

use std::{env, str::FromStr, time::SystemTime};

use env_logger::{Builder, Target};
use log::LevelFilter;
use paho_mqtt::SslOptionsBuilder;
use up_client_mqtt5_rust::{MqttConfig, MqttProtocol, UPClientMqtt, UPClientMqttType};
use up_rust::{UMessageBuilder, UPayloadFormat, UStatus, UTransport, UUri, UUID};

#[tokio::main]
async fn main() -> Result<(), UStatus> {
Builder::new()
.target(Target::Stdout) // Logs to stdout
.filter(None, LevelFilter::Trace) // Default level
.init();

// Set the protocol type (Mqtts for encrypted mqtt)
let protocol = MqttProtocol::Mqtts;

// Build the ssl options (only needed if protocol is Mqtts!)
let ssl_options = Some(
SslOptionsBuilder::new()
.key_store(env::var("KEY_STORE").expect("KEY_STORE env variable not found"))
.expect("Certificate file not found.")
.private_key_password(
env::var("PRIVATE_KEY_PW").expect("PRIVATE_KEY_PW env variable not found"),
)
.enable_server_cert_auth(false)
.finalize(),
);
// If the mqtt broker has a specific username attached to the ssl certificate, it must be included in the config
let user_name = env::var("CLIENT_NAME")
.expect("CLIENT_NAME env variable not found")
.to_string();

let config = MqttConfig {
mqtt_protocol: protocol,
mqtt_hostname: env::var("MQTT_HOSTNAME")
.expect("MQTT_HOSTNAME env variable not found")
.to_string(),
mqtt_port: 8883,
max_buffered_messages: 100,
max_subscriptions: 100,
session_expiry_interval: 3600,
ssl_options: ssl_options,
username: user_name,
};

let client = UPClientMqtt::new(
config,
UUID::build(),
"Vehicle_B".to_string(),
UPClientMqttType::Device,
)
.await?;

let source =
UUri::from_str("//Vehicle_B/A8000/2/8A50").expect("Failed to create source filter");

loop {
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
let current_time = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_secs();
let message = UMessageBuilder::publish(source.clone())
.build_with_payload(
current_time.to_string(),
UPayloadFormat::UPAYLOAD_FORMAT_TEXT,
)
.expect("Failed to build message");

println!(
"Sending message: {} to source: {}",
current_time,
source.to_uri(false)
);
client.send(message).await?;
}
}
105 changes: 105 additions & 0 deletions examples/encrypted_subscriber_example.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/********************************************************************************
* Copyright (c) 2023 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Apache License Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
********************************************************************************/

use std::{
env,
str::{self, FromStr},
sync::Arc,
};

use async_trait::async_trait;
use env_logger::{Builder, Target};
use log::LevelFilter;
use paho_mqtt::SslOptionsBuilder;
use up_client_mqtt5_rust::{MqttConfig, MqttProtocol, UPClientMqtt, UPClientMqttType};
use up_rust::{UListener, UMessage, UStatus, UTransport, UUri, UUID};

const WILDCARD_ENTITY_ID: u32 = 0x0000_FFFF;
const WILDCARD_ENTITY_VERSION: u32 = 0x0000_00FF;
const WILDCARD_RESOURCE_ID: u32 = 0x0000_FFFF;

struct PrintlnListener {}

#[async_trait]
impl UListener for PrintlnListener {
async fn on_receive(&self, message: UMessage) {
let msg_payload = message.payload.unwrap();
let msg_str: &str = str::from_utf8(&msg_payload).unwrap();
println!("Received message: {msg_str}");
}
}

#[tokio::main]
async fn main() -> Result<(), UStatus> {
Builder::new()
.target(Target::Stdout) // Logs to stdout
.filter(None, LevelFilter::Trace) // Default level
.init();

// Set the protocol type ("mqtts" for encrypted mqtt)
let protocol = MqttProtocol::Mqtts;

// Build the ssl options (only needed if protocol is Mqtts!)
let ssl_options = Some(
SslOptionsBuilder::new()
.key_store(env::var("KEY_STORE").expect("KEY_STORE env variable not found"))
.expect("Certificate file not found.")
.private_key_password(
env::var("PRIVATE_KEY_PW").expect("PRIVATE_KEY_PW env variable not found"),
)
.enable_server_cert_auth(false)
.finalize(),
);
// If the mqtt broker has a specific username attached to the ssl certificate, it must be included in the config
let user_name = env::var("CLIENT_NAME")
.expect("CLIENT_NAME env variable not found")
.to_string();

// Build the configuration for the connection
let config = MqttConfig {
mqtt_protocol: protocol,
mqtt_hostname: env::var("MQTT_HOSTNAME")
.expect("MQTT_HOSTNAME env variable not found")
.to_string(),
mqtt_port: 8883,
max_buffered_messages: 100,
max_subscriptions: 100,
session_expiry_interval: 3600,
ssl_options: ssl_options,
username: user_name,
};

let client = UPClientMqtt::new(
config,
UUID::build(),
"Vehicle_B".to_string(),
UPClientMqttType::Device,
)
.await?;

let listener = Arc::new(PrintlnListener {});
let source_filter = UUri::from_str(&format!(
"//Vehicle_B/{WILDCARD_ENTITY_ID:X}/{WILDCARD_ENTITY_VERSION:X}/{WILDCARD_RESOURCE_ID:X}"
))
.expect("Failed to create source filter");

println!("Subscribing to: {}", source_filter.to_uri(false));

client
.register_listener(&source_filter, None, listener.clone())
.await?;

loop {
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
}
46 changes: 10 additions & 36 deletions examples/publisher_example.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,11 @@
* SPDX-License-Identifier: Apache-2.0
********************************************************************************/

use std::{env, str::FromStr, time::SystemTime};
use std::{str::FromStr, time::SystemTime};

use env_logger::{Builder, Target};
use log::LevelFilter;
use paho_mqtt::SslOptionsBuilder;
use up_client_mqtt5_rust::{MqttConfig, UPClientMqtt, UPClientMqttType};
use up_client_mqtt5_rust::{MqttConfig, MqttProtocol, UPClientMqtt, UPClientMqttType};
use up_rust::{UMessageBuilder, UPayloadFormat, UStatus, UTransport, UUri, UUID};

#[tokio::main]
Expand All @@ -26,42 +25,17 @@ async fn main() -> Result<(), UStatus> {
.filter(None, LevelFilter::Trace) // Default level
.init();

// Get the protocol type from the environment (either "mqtt" or "mqtts" for unencrypted/ encrypted mqtt)
let protocol = env::var("MQTT_PROTOCOL")
.expect("MQTT_PROTOCOL env variable not found")
.to_string();
let mut ssl_options = None;
let mut user_name = "eclipse_testuser".to_string();
// Set the protocol type ("mqtt" for unencrypted mqtt)
let protocol = MqttProtocol::Mqtt;

// Check if "mqtts" is selected and set the optional parameters
if protocol == "mqtts".to_string() {
// Build the ssl options
ssl_options = Some(
SslOptionsBuilder::new()
.key_store(env::var("KEY_STORE").expect("KEY_STORE env variable not found"))
.expect("Certificate file not found.")
.private_key_password(
env::var("PRIVATE_KEY_PW").expect("PRIVATE_KEY_PW env variable not found"),
)
.enable_server_cert_auth(false)
.finalize(),
);
// Set the username to the name attached to the certificate
user_name = env::var("CLIENT_NAME")
.expect("CLIENT_NAME env variable not found")
.to_string();
}
// no need to build ssl options since we are using unencrypted mqtt, username is arbitrary
let ssl_options = None;
let user_name = "eclipse_testuser".to_string();

let config = MqttConfig {
mqtt_protocol: env::var("MQTT_PROTOCOL")
.expect("MQTT_PROTOCOL env variable not found")
.to_string(),
mqtt_hostname: env::var("MQTT_HOSTNAME")
.expect("MQTT_HOSTNAME env variable not found")
.to_string(),
mqtt_port: env::var("MQTT_PORT")
.expect("MQTT_PORT env variable not found")
.to_string(),
mqtt_protocol: protocol,
mqtt_hostname: "localhost".to_string(),
mqtt_port: 1883,
max_buffered_messages: 100,
max_subscriptions: 100,
session_expiry_interval: 3600,
Expand Down
42 changes: 8 additions & 34 deletions examples/subscriber_example.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,14 @@
********************************************************************************/

use std::{
env,
str::{self, FromStr},
sync::Arc,
};

use async_trait::async_trait;
use env_logger::{Builder, Target};
use log::LevelFilter;
use paho_mqtt::SslOptionsBuilder;
use up_client_mqtt5_rust::{MqttConfig, UPClientMqtt, UPClientMqttType};
use up_client_mqtt5_rust::{MqttConfig, MqttProtocol, UPClientMqtt, UPClientMqttType};
use up_rust::{UListener, UMessage, UStatus, UTransport, UUri, UUID};

const WILDCARD_ENTITY_ID: u32 = 0x0000_FFFF;
Expand All @@ -46,41 +44,17 @@ async fn main() -> Result<(), UStatus> {
.filter(None, LevelFilter::Trace) // Default level
.init();

// Get the protocol type from the environment (either "mqtt" or "mqtts" for unencrypted/ encrypted mqtt)
let protocol = env::var("MQTT_PROTOCOL")
.expect("MQTT_PROTOCOL env variable not found")
.to_string();
let mut ssl_options = None;
let mut user_name = "eclipse_testuser".to_string();
// Set the protocol type ("mqtt" for unencrypted mqtt)
let protocol = MqttProtocol::Mqtt;

// Check if "mqtts" is selected and set the optional parameters
if protocol == "mqtts".to_string() {
// Build the ssl options
ssl_options = Some(
SslOptionsBuilder::new()
.key_store(env::var("KEY_STORE").expect("KEY_STORE env variable not found"))
.expect("Certificate file not found.")
.private_key_password(
env::var("PRIVATE_KEY_PW").expect("PRIVATE_KEY_PW env variable not found"),
)
.enable_server_cert_auth(false)
.finalize(),
);
// Set the username to the name attached to the certificate
user_name = env::var("CLIENT_NAME")
.expect("CLIENT_NAME env variable not found")
.to_string();
}
// no need to build ssl options since we are using unencrypted mqtt, username is arbitrary
let ssl_options = None;
let user_name = "eclipse_testuser".to_string();

// Build the configuration for the connection
let config = MqttConfig {
mqtt_protocol: protocol,
mqtt_hostname: env::var("MQTT_HOSTNAME")
.expect("MQTT_HOSTNAME env variable not found")
.to_string(),
mqtt_port: env::var("MQTT_PORT")
.expect("MQTT_PORT env variable not found")
.to_string(),
mqtt_hostname: "localhost".to_string(),
mqtt_port: 1883,
max_buffered_messages: 100,
max_subscriptions: 100,
session_expiry_interval: 3600,
Expand Down
19 changes: 16 additions & 3 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,16 @@ impl MockableMqttClient for AsyncMqttClient {
where
Self: Sized,
{
let mqtt_protocol = match config.mqtt_protocol {
MqttProtocol::Mqtt => "mqtt",
MqttProtocol::Mqtts => "mqtts",
};

let mqtt_uri = format!(
"{}://{}:{}",
config.mqtt_protocol, config.mqtt_hostname, config.mqtt_port
mqtt_protocol,
config.mqtt_hostname,
config.mqtt_port
);

let mut mqtt_cli = mqtt::CreateOptionsBuilder::new()
Expand Down Expand Up @@ -238,9 +245,9 @@ impl MockableMqttClient for AsyncMqttClient {
/// Configuration for the mqtt client.
pub struct MqttConfig {
/// Schema of the mqtt broker (mqtt or mqtts)
pub mqtt_protocol: String,
pub mqtt_protocol: MqttProtocol,
/// Port of the mqtt broker to connect to.
pub mqtt_port: String,
pub mqtt_port: u16,
/// Hostname of the mqtt broker.
pub mqtt_hostname: String,
/// Max buffered messages for the mqtt client.
Expand Down Expand Up @@ -279,6 +286,12 @@ pub enum UPClientMqttType {
Cloud,
}

/// Type of MQTT protocol
pub enum MqttProtocol {
Mqtt,
Mqtts,
}

impl UPClientMqtt {
/// Create a new UPClientMqtt.
///
Expand Down

0 comments on commit c88127c

Please sign in to comment.