From f1659157a88254894999de39bb622ee9ca1970ee Mon Sep 17 00:00:00 2001 From: Daniel Mulvad Date: Sat, 30 Sep 2023 15:52:43 -0700 Subject: [PATCH 1/4] rebased PR #267 --- src/client.rs | 39 +++++++++++++++++++++++++++++++++++++++ src/connection.rs | 16 ++++++++++++++++ src/connection_manager.rs | 27 +++++++++++++++++++++++++++ 3 files changed, 82 insertions(+) diff --git a/src/client.rs b/src/client.rs index 0fdfc3c..fb38c2d 100644 --- a/src/client.rs +++ b/src/client.rs @@ -511,6 +511,25 @@ impl PulsarBuilder { self } + /// add a certificate and private key to authenticate the client in TLS connections + #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))] + pub fn with_identity(mut self, certificate: Vec, private_key: Vec) -> Self { + match &mut self.tls_options { + Some(tls) => { + tls.certificate = Some(certificate); + tls.private_key = Some(private_key); + } + None => { + self.tls_options = Some(TlsOptions { + certificate: Some(certificate), + private_key: Some(private_key), + ..Default::default() + }) + } + } + self + } + #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))] pub fn with_allow_insecure_connection(mut self, allow: bool) -> Self { match &mut self.tls_options { @@ -560,6 +579,26 @@ impl PulsarBuilder { self } + /// add a certificate and private key to authenticate the client in TLS connections + #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))] + pub fn with_identity_files>( + self, + certificate_path: P, + private_key_path: P, + ) -> Result { + use std::io::Read; + + let mut file = std::fs::File::open(certificate_path)?; + let mut certificate = vec![]; + file.read_to_end(&mut certificate)?; + + let mut file = std::fs::File::open(private_key_path)?; + let mut private_key = vec![]; + file.read_to_end(&mut private_key)?; + + Ok(self.with_identity(certificate, private_key)) + } + /// creates the Pulsar client and connects it #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))] pub async fn build(self) -> Result, Error> { diff --git a/src/connection.rs b/src/connection.rs index fad471b..7ca1f59 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -20,6 +20,8 @@ use futures::{ task::{Context, Poll}, Future, FutureExt, Sink, SinkExt, Stream, StreamExt, }; +#[cfg(any(feature = "tokio-runtime", feature = "async-std-runtime"))] +use native_tls::{Certificate, Identity}; use proto::MessageIdData; use rand::{seq::SliceRandom, thread_rng}; use url::Url; @@ -781,6 +783,9 @@ impl Connection { auth_data: Option>>>, proxy_to_broker_url: Option, certificate_chain: &[Certificate], + #[cfg(any(feature = "tokio-runtime", feature = "async-std-runtime"))] identity: &Option< + Identity, + >, allow_insecure_connection: bool, tls_hostname_verification_enabled: bool, connection_timeout: Duration, @@ -840,6 +845,8 @@ impl Connection { auth_data.clone(), proxy_to_broker_url.clone(), certificate_chain, + #[cfg(any(feature = "tokio-runtime", feature = "async-std-runtime"))] + identity.clone(), allow_insecure_connection, tls_hostname_verification_enabled, executor.clone(), @@ -918,6 +925,9 @@ impl Connection { auth: Option>>>, proxy_to_broker_url: Option, certificate_chain: &[Certificate], + #[cfg(any(feature = "tokio-runtime", feature = "async-std-runtime"))] identity: Option< + Identity, + >, allow_insecure_connection: bool, tls_hostname_verification_enabled: bool, executor: Arc, @@ -934,6 +944,9 @@ impl Connection { for certificate in certificate_chain { builder.add_root_certificate(certificate.clone()); } + if let Some(identity) = identity { + builder.identity(identity); + } builder.danger_accept_invalid_hostnames( allow_insecure_connection && !tls_hostname_verification_enabled, ); @@ -1035,6 +1048,9 @@ impl Connection { for certificate in certificate_chain { connector = connector.add_root_certificate(certificate.clone()); } + if let Some(identity) = identity { + connector = connector.identity(identity); + } connector = connector.danger_accept_invalid_hostnames( allow_insecure_connection && !tls_hostname_verification_enabled, ); diff --git a/src/connection_manager.rs b/src/connection_manager.rs index 6dcb1a9..82b84b3 100644 --- a/src/connection_manager.rs +++ b/src/connection_manager.rs @@ -1,6 +1,8 @@ use std::{collections::HashMap, sync::Arc, time::Duration}; use futures::{channel::oneshot, lock::Mutex}; +#[cfg(any(feature = "tokio-runtime", feature = "async-std-runtime"))] +use native_tls::{Certificate, Identity}; use rand::Rng; use url::Url; @@ -74,6 +76,12 @@ pub struct TlsOptions { /// contains a list of PEM encoded certificates pub certificate_chain: Option>, + /// PEM encoded X509 certificates + pub certificate: Option>, + + /// is a PEM encoded PKCS #8 formatted private key for the leaf certificate + pub private_key: Option>, + /// allow insecure TLS connection if set to true /// /// defaults to *false* @@ -90,6 +98,8 @@ impl Default for TlsOptions { fn default() -> Self { Self { certificate_chain: None, + certificate: None, + private_key: None, allow_insecure_connection: false, tls_hostname_verification_enabled: true, } @@ -116,6 +126,8 @@ pub struct ConnectionManager { pub(crate) operation_retry_options: OperationRetryOptions, tls_options: TlsOptions, certificate_chain: Vec, + #[cfg(any(feature = "tokio-runtime", feature = "async-std-runtime"))] + identity: Option, outbound_channel_size: usize, } @@ -172,6 +184,17 @@ impl ConnectionManager { } }; + #[cfg(any(feature = "tokio-runtime", feature = "async-std-runtime"))] + let identity = match ( + tls_options.certificate.as_ref(), + tls_options.private_key.as_ref(), + ) { + (None, _) | (_, None) => None, + (Some(certificate), Some(privatekey)) => { + Some(native_tls::Identity::from_pkcs8(&certificate, &privatekey)?) + } + }; + if let Some(auth) = auth.clone() { auth.lock().await.initialize().await?; } @@ -185,6 +208,8 @@ impl ConnectionManager { operation_retry_options, tls_options, certificate_chain, + #[cfg(any(feature = "tokio-runtime", feature = "async-std-runtime"))] + identity, outbound_channel_size, }; let broker_address = BrokerAddress { @@ -303,6 +328,8 @@ impl ConnectionManager { self.auth.clone(), proxy_url.clone(), &self.certificate_chain, + #[cfg(any(feature = "tokio-runtime", feature = "async-std-runtime"))] + &self.identity, self.tls_options.allow_insecure_connection, self.tls_options.tls_hostname_verification_enabled, self.connection_retry_options.connection_timeout, From b3f4563b43b1dec387d868035f9bf3f66726a5ec Mon Sep 17 00:00:00 2001 From: alesharik Date: Wed, 18 Sep 2024 09:00:28 +0300 Subject: [PATCH 2/4] fix: compilation errors in connection --- src/connection.rs | 2 +- src/connection_manager.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/connection.rs b/src/connection.rs index 7ca1f59..cd888ce 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -21,7 +21,7 @@ use futures::{ Future, FutureExt, Sink, SinkExt, Stream, StreamExt, }; #[cfg(any(feature = "tokio-runtime", feature = "async-std-runtime"))] -use native_tls::{Certificate, Identity}; +use native_tls::Identity; use proto::MessageIdData; use rand::{seq::SliceRandom, thread_rng}; use url::Url; diff --git a/src/connection_manager.rs b/src/connection_manager.rs index 82b84b3..86929e3 100644 --- a/src/connection_manager.rs +++ b/src/connection_manager.rs @@ -2,7 +2,7 @@ use std::{collections::HashMap, sync::Arc, time::Duration}; use futures::{channel::oneshot, lock::Mutex}; #[cfg(any(feature = "tokio-runtime", feature = "async-std-runtime"))] -use native_tls::{Certificate, Identity}; +use native_tls::Identity; use rand::Rng; use url::Url; From 85375540f0047a22f2775d37af49010dedd2eee8 Mon Sep 17 00:00:00 2001 From: alesharik Date: Sat, 1 Mar 2025 21:24:56 +0300 Subject: [PATCH 3/4] feat: add tests --- .github/certs/ca.cert.pem | 19 +++++++++ .github/certs/ca.cert.srl | 1 + .github/certs/ca.key.pem | 28 +++++++++++++ .github/certs/client.cert.pem | 18 ++++++++ .github/certs/client.csr.pem | 15 +++++++ .github/certs/client.key-pk8.pem | 28 +++++++++++++ .github/certs/client.key.pem | 28 +++++++++++++ .github/certs/server.cert.pem | 21 ++++++++++ .github/certs/server.conf | 20 +++++++++ .github/certs/server.csr.pem | 15 +++++++ .github/certs/server.key-pk8.pem | 28 +++++++++++++ .github/certs/server.key.pem | 28 +++++++++++++ .github/gen_test_certs.sh | 25 ++++++++++++ .github/workflows/rust.yml | 2 +- src/lib.rs | 70 ++++++++++++++++++++++++++++++++ 15 files changed, 345 insertions(+), 1 deletion(-) create mode 100755 .github/certs/ca.cert.pem create mode 100755 .github/certs/ca.cert.srl create mode 100755 .github/certs/ca.key.pem create mode 100755 .github/certs/client.cert.pem create mode 100755 .github/certs/client.csr.pem create mode 100755 .github/certs/client.key-pk8.pem create mode 100755 .github/certs/client.key.pem create mode 100755 .github/certs/server.cert.pem create mode 100755 .github/certs/server.conf create mode 100755 .github/certs/server.csr.pem create mode 100755 .github/certs/server.key-pk8.pem create mode 100755 .github/certs/server.key.pem create mode 100755 .github/gen_test_certs.sh diff --git a/.github/certs/ca.cert.pem b/.github/certs/ca.cert.pem new file mode 100755 index 0000000..50db2f8 --- /dev/null +++ b/.github/certs/ca.cert.pem @@ -0,0 +1,19 @@ +-----BEGIN CERTIFICATE----- +MIIDAzCCAeugAwIBAgIUIX2hsIzC72XHvsJwtvi0vm7QVhcwDQYJKoZIhvcNAQEL +BQAwETEPMA0GA1UEAwwGQ0FSb290MB4XDTI1MDMwMTE4MTQwOVoXDTI2MDMwMTE4 +MTQwOVowETEPMA0GA1UEAwwGQ0FSb290MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8A +MIIBCgKCAQEA8nyyyDTz9G9Tf2xO6b679O9Z9OtV50oiMPB5T7DpEI1rp2Md+Szr +FYMUTJBn55KOyyYtyxX9gswVWjHSuNSfR2JEY5154OMF6+FVlz/y2pR36qjoe+f2 +8g6ZdZlMAI2iKB9gwBVHGzC3RtAOpio7Hot/CU86b3UEd1u8wIsqT1rXA+g6q02d +hruEAxfMMLBu9muWlvt593r2XoHEWyB/+3NcrQ0jUaKzzUC83VdVNNHMSwWXuf1p +bcrx77Q7o02AR6m5UcP5LEih/PyC07SMG2F4C51eCRGTXi99g7XdukvGTeYgMQgu +9MXmOLX43C9WTX6N0rYWAkUZFrQ0hNUyJwIDAQABo1MwUTAdBgNVHQ4EFgQUtQ5X +UF5Mt8+2tK5kZO8zx5GMQggwHwYDVR0jBBgwFoAUtQ5XUF5Mt8+2tK5kZO8zx5GM +QggwDwYDVR0TAQH/BAUwAwEB/zANBgkqhkiG9w0BAQsFAAOCAQEArpMWZzLlWUcs +oRrCDXRFjvXeZwxhIqiD8Ad9sEBQb4XmW+iDLEiLzQ4yEjhXtiYyKHWLpatQTG4D +k6Wx4YpGI4IFyZhJ0K0kmGeQLHh/TxvilLlf+8wD3TEYTS52Kyc44v9e4Ldqw2bw +QFk/Iur+5hQwKSHKuZxXYwdlPiWrf/v89/ddn6bbJSzDStm+42p9rhn0LToEToQI +ArW2X7YasLUrjg9SPjjfEZnzhSf23catFOb/XiRJu/jLx3Q5BAC34xA1zZnso6KE +Ys+XEqx9YVb6QCRfR8sf1v0Qplni+wJ702oGRGXB872Q+qdGO8cjv9KRgohZM5xo +MDcgUkE6UA== +-----END CERTIFICATE----- diff --git a/.github/certs/ca.cert.srl b/.github/certs/ca.cert.srl new file mode 100755 index 0000000..72b257d --- /dev/null +++ b/.github/certs/ca.cert.srl @@ -0,0 +1 @@ +2D94EB634C5223A76156537EBE0DB87498B6D16D diff --git a/.github/certs/ca.key.pem b/.github/certs/ca.key.pem new file mode 100755 index 0000000..a182a00 --- /dev/null +++ b/.github/certs/ca.key.pem @@ -0,0 +1,28 @@ +-----BEGIN PRIVATE KEY----- +MIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQDyfLLINPP0b1N/ +bE7pvrv071n061XnSiIw8HlPsOkQjWunYx35LOsVgxRMkGfnko7LJi3LFf2CzBVa +MdK41J9HYkRjnXng4wXr4VWXP/LalHfqqOh75/byDpl1mUwAjaIoH2DAFUcbMLdG +0A6mKjsei38JTzpvdQR3W7zAiypPWtcD6DqrTZ2Gu4QDF8wwsG72a5aW+3n3evZe +gcRbIH/7c1ytDSNRorPNQLzdV1U00cxLBZe5/WltyvHvtDujTYBHqblRw/ksSKH8 +/ILTtIwbYXgLnV4JEZNeL32Dtd26S8ZN5iAxCC70xeY4tfjcL1ZNfo3SthYCRRkW +tDSE1TInAgMBAAECggEAIg/uNT1q/20b94xJUYBferqcklD0kjbRsro08ELTmmWj +N7Iupa/vnef50/98+QSXVmYYjKycHeF5JmBVT21eqnJdVPjsA/EgBdBza3mqLZYp +eicb8TvWbo4qdGCCUpmSq7wspQ7YHDOtqx7Sz/iisVm7EJ22ga2YYQPb8dgaq2a7 +Sbf37Gr7LSqghpJLvqAXO/ft++cs9HXm9cVwDbVbpFxr7+ACNIO8kfMAn6EvVE0a +NIaXeZ6r1X6LN+iZx0/R5DFGr7EiMKKGw4fvRTGDozCzaKkIw8iA8adJTQ8qFmPi +nYjFsb0pViTsPUUnDEDUz7EdN81SHWSaxbVIcRV5YQKBgQD6Zr+ah4bhAMKQiVxe +rDm6CLZYSxwWL2w40/CPqOF15gdi3VOV0MQt4qutwFRcW85hl9G7iEJJQLLc3/AW +23AMpo7KNXlFJ6bybWKFLllmuouhtTCTYMOnzZ5caWSUt27KOlw/m3FkOqjfV4rv +WHHRWFSk8fICvDJ6h6wgKUS7HwKBgQD36KZwTKD8CFdzeFGjN37xGnJIeo7HSddc +8g6uISklELFSEMwD1uidc1qJAXywSILf1mKiEdvmDYyiJYRcZii43urpvpWChsC8 +dlUdtHY1NY6FYOyOrkaPA/aoS6LJRhqGBRhxOiH39khXS3eOSUqWWdSgDSHm0Ret +ApmRCc+v+QKBgQCb3zIOo6bWkX6MMrWJSXR11c6Mj9TgCo03otbw1pPirFqUtGm/ +rMEyPxPXiQ36oDuMjNFGB8AbkWbGD7CIyfiqJ1Rjk332CoTmDClz9jI4vI8HDp86 +XtXssxaNOB4gZT9WK5YZEk43sjxFK12VOF1Bj9bBBss74KMbmzEOpCHgyQKBgQDH +vkf8S4GaEA6GFveQDnLVSH20MCtCHxjK5q01V7qTBrQYbzgyD8I0unDD2tLgJkxs +lYu7EbCf12LsclYHLLi628FivBAyrugz8RP7bx+fF5vRk5qGC50GNidSyj5K9ZUB +TrG26SPdai43R7L4jczu/0n1+x7Jnsb4uxOIbONN4QKBgBqIVerc9iHWb/GwgSFt +9N631KqVLAtSCRskvxdtU7r5HmLp9GDXU6whkSHvYDTwKQOimkG2TvmCamftXmhY +77XPAf+GW3GCMo4LJPYI4y3tm/Wz29CSMEDd6bEu3A/yBcP15AC+0K8MEUo1mv40 +priSPLoDyBa9jHE5oc4HvkLs +-----END PRIVATE KEY----- diff --git a/.github/certs/client.cert.pem b/.github/certs/client.cert.pem new file mode 100755 index 0000000..6998dc7 --- /dev/null +++ b/.github/certs/client.cert.pem @@ -0,0 +1,18 @@ +-----BEGIN CERTIFICATE----- +MIIC8jCCAdqgAwIBAgIULZTrY0xSI6dhVlN+vg24dJi20W0wDQYJKoZIhvcNAQEL +BQAwETEPMA0GA1UEAwwGQ0FSb290MB4XDTI1MDMwMTE4MTQwOVoXDTI2MDMwMTE4 +MTQwOVowETEPMA0GA1UEAwwGY2xpZW50MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8A +MIIBCgKCAQEA1y5ARPhmMiOuCFn4SXv/V9q8nKR7D/sc4+jCCiFbnDLH9h3f2v3j +S7cGcO68Q51NMS0c4pqwZBTf/9EHrfsJIQxu+iFScMnbvcip5B+oEciim33HbIJc +7b9Bd2pvtIZH9fwf69j4dvz9vhGa4EogU/0v0sngNXFqTpJ6JCT+luViDOc7AjqU +S5ryhURcGleKad94QRgIWmgG6ABlLplrzZRW4UXkZODJAZkb95XowMCacrk6TwLs +gDKM0GZb+INxh2A4iBjP62mY9lbsdaQJ4Lu5qpik5gcEuvqRIymbla0dnTSLBqnt +LJJJi83AujY8HI3vOwzDVxhXHqr1UNrMDQIDAQABo0IwQDAdBgNVHQ4EFgQUyOa8 +CnytxNCSc+gfyuywt4H857wwHwYDVR0jBBgwFoAUtQ5XUF5Mt8+2tK5kZO8zx5GM +QggwDQYJKoZIhvcNAQELBQADggEBAFoc3sok6+MfN08UYrdq+aXuNG0hdiWdDxTQ +4r5WhdMTcfSR3VAB5+cbDzyS52Cr3vpGoOskHtLcoYs/EJje83P051cuV134ShK2 +vkhU37+QJKbme9ca/EP8JNPRVPWFJeSf1d8K7aT8SmBHGeh9cjY6zBkz/fH657J9 +uqtt27BJ2erlFzp0gHU39XTGbKMrv3MW4cHrSrDdrO/n03lPzuVbCAUHr6/WWWS9 +3rJgVVJjq1D1oD+dIo+CLiNc9XZkOhyR5N5iAqGeuAc3XCOYJoLzlVLIqMEDrfCE +6wk8Fgfkhybj37fEUKosd0SYRJpI35/ESBofC8cKWwyxM6dgQPU= +-----END CERTIFICATE----- diff --git a/.github/certs/client.csr.pem b/.github/certs/client.csr.pem new file mode 100755 index 0000000..7bd7e61 --- /dev/null +++ b/.github/certs/client.csr.pem @@ -0,0 +1,15 @@ +-----BEGIN CERTIFICATE REQUEST----- +MIICVjCCAT4CAQAwETEPMA0GA1UEAwwGY2xpZW50MIIBIjANBgkqhkiG9w0BAQEF +AAOCAQ8AMIIBCgKCAQEA1y5ARPhmMiOuCFn4SXv/V9q8nKR7D/sc4+jCCiFbnDLH +9h3f2v3jS7cGcO68Q51NMS0c4pqwZBTf/9EHrfsJIQxu+iFScMnbvcip5B+oEcii +m33HbIJc7b9Bd2pvtIZH9fwf69j4dvz9vhGa4EogU/0v0sngNXFqTpJ6JCT+luVi +DOc7AjqUS5ryhURcGleKad94QRgIWmgG6ABlLplrzZRW4UXkZODJAZkb95XowMCa +crk6TwLsgDKM0GZb+INxh2A4iBjP62mY9lbsdaQJ4Lu5qpik5gcEuvqRIymbla0d +nTSLBqntLJJJi83AujY8HI3vOwzDVxhXHqr1UNrMDQIDAQABoAAwDQYJKoZIhvcN +AQELBQADggEBAJgZYs1icSrzfLhJME7dSc8+DZQedMSBVlrYOynHk1pcmM7efOG+ +k6ioEjxVRtLwdw+OBtys3arbv0qF5CTn5lZnM6Oz9utFQzzaqxXRQ/oQ401LDEt2 +sTpvII/htO0/vUr4MtsnOEFnRlxdCc4ydD+Wexc0iC8REzFFS5U7yHJ+vbk6ZyBC +swR83bFAISgSyEdwpxGDoRIf8TphupmTHRbUdf+iCjW6UoDu9maliZN9BM8V85Op +IUiJZ2mta0H4M3CsCFuPWaQpQVTyfxAyQvZAnSY9YgfNWQ80rhyDtLjv9f1bwJP4 +nf9GNkU7ewv1ACx9+5tpXT4SA7cBOWL/bpk= +-----END CERTIFICATE REQUEST----- diff --git a/.github/certs/client.key-pk8.pem b/.github/certs/client.key-pk8.pem new file mode 100755 index 0000000..485b80d --- /dev/null +++ b/.github/certs/client.key-pk8.pem @@ -0,0 +1,28 @@ +-----BEGIN PRIVATE KEY----- +MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQDXLkBE+GYyI64I +WfhJe/9X2rycpHsP+xzj6MIKIVucMsf2Hd/a/eNLtwZw7rxDnU0xLRzimrBkFN// +0Qet+wkhDG76IVJwydu9yKnkH6gRyKKbfcdsglztv0F3am+0hkf1/B/r2Ph2/P2+ +EZrgSiBT/S/SyeA1cWpOknokJP6W5WIM5zsCOpRLmvKFRFwaV4pp33hBGAhaaAbo +AGUumWvNlFbhReRk4MkBmRv3lejAwJpyuTpPAuyAMozQZlv4g3GHYDiIGM/raZj2 +Vux1pAngu7mqmKTmBwS6+pEjKZuVrR2dNIsGqe0skkmLzcC6Njwcje87DMNXGFce +qvVQ2swNAgMBAAECggEALghS725ETo2beWXl27KPGXhNySTWvsB1mbv9ZrVkJ7Il +XN1fOjX+HgB1fEtWMxC/prmD6GY3PfHJSayp++osNcm3JANk44QLYjo/PfIXvlYx +kxoBQ5FP1/yGQMmgQ/pGArWBDq9H2AyfK74QZ9Lzv4P2ax2MvWIZ390uC/VuEVF0 +eZGGJrNmwMrr16jhW/pMQS7euHq/Fi7RXPbHCK9sw/Xb03G/mOaPgFfCSz2vIMNR +TLVL3tNDbp6D3Z8JApLNFs9UTfE3mMEQuJ30NHZAbMDSA90Vjx47wrac/xMb8RfJ +V/KuA6U3hGhDz4iYY8QLafcNsYfTtTbPk8CTsrGScQKBgQD06GLwk8v5qPo9XS7l +CtXXyehSOj9RRvEA5RQl8Jmr3Xo0dGyVX0HYLn8+lrYw5zH8PE3zmccX/c8h+xR/ +pr0yuBiXcHBmjpnB2KDxGPbLzP0DYsUXZZO9WlR60XvLaU9h3ztmOezTqjAPwd6Y +JVJD8sROsjjYHKujmu9LVsR/rwKBgQDg7TCqsSkw3zga2iN4SVwBcWIBckJo6GkU +15vCNcYYi3rqFrUsD6kopWR2RCuP2mOPiAdZVJ6cKYm3mCIxA8XQMzBYnW/HU742 +CRCuQyQ0vxISS+7X2e0gpltydn0KmbWzRkM99KG5ueWeOG8fl12qofbFaIRbeCtR +OyFFRSXDAwKBgEQBO2f+QkeDCml7tIfZKGQRJreegPJ6tnU/JwcZ9jHpAAQMyBH6 +L3huANtKUjDmIdUeka6w5r3ctmWDKMABdsvcnsd+a1evBTb0hkwsWGti8ma8SGok +xI9xw7+O58cl6OsOZ5oNP5vOyCW3+a1Zv6wqfyQzDOHFpEt7vDK5Vzr1AoGAVTtM +D8JmO3C6j+JNQo154coXOzkS1TIMb6tsCGHcGPzxrLbCHFoq3Y4ezXnzj+VPiNog +/sbD+6T53Ko3ep9VXBmpnkq6Sqv/DfbNbVtf4uTsciZ38S9RXefsCym/JD8uzSo4 +3MdQaVJ6IkR8R1pSUhBhtFhwn0CgGX3wk5EqSAUCgYEAxOEZkBKpa3psvrsxGljz +J15p0WJlKvnEBB7rE4rWtS/H9kiYIwhv9/NtCU6dk/6/4raaqS1wz2CaOGHHj965 +YoC/BsWbvjNHk1slFyMMHL4vJ156vxlozkNKaLZ5hLS+1Yg2hz8Dn1OoUps/f0ss +C2yF0knH+4PtaRDtURojXec= +-----END PRIVATE KEY----- diff --git a/.github/certs/client.key.pem b/.github/certs/client.key.pem new file mode 100755 index 0000000..485b80d --- /dev/null +++ b/.github/certs/client.key.pem @@ -0,0 +1,28 @@ +-----BEGIN PRIVATE KEY----- +MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQDXLkBE+GYyI64I +WfhJe/9X2rycpHsP+xzj6MIKIVucMsf2Hd/a/eNLtwZw7rxDnU0xLRzimrBkFN// +0Qet+wkhDG76IVJwydu9yKnkH6gRyKKbfcdsglztv0F3am+0hkf1/B/r2Ph2/P2+ +EZrgSiBT/S/SyeA1cWpOknokJP6W5WIM5zsCOpRLmvKFRFwaV4pp33hBGAhaaAbo +AGUumWvNlFbhReRk4MkBmRv3lejAwJpyuTpPAuyAMozQZlv4g3GHYDiIGM/raZj2 +Vux1pAngu7mqmKTmBwS6+pEjKZuVrR2dNIsGqe0skkmLzcC6Njwcje87DMNXGFce +qvVQ2swNAgMBAAECggEALghS725ETo2beWXl27KPGXhNySTWvsB1mbv9ZrVkJ7Il +XN1fOjX+HgB1fEtWMxC/prmD6GY3PfHJSayp++osNcm3JANk44QLYjo/PfIXvlYx +kxoBQ5FP1/yGQMmgQ/pGArWBDq9H2AyfK74QZ9Lzv4P2ax2MvWIZ390uC/VuEVF0 +eZGGJrNmwMrr16jhW/pMQS7euHq/Fi7RXPbHCK9sw/Xb03G/mOaPgFfCSz2vIMNR +TLVL3tNDbp6D3Z8JApLNFs9UTfE3mMEQuJ30NHZAbMDSA90Vjx47wrac/xMb8RfJ +V/KuA6U3hGhDz4iYY8QLafcNsYfTtTbPk8CTsrGScQKBgQD06GLwk8v5qPo9XS7l +CtXXyehSOj9RRvEA5RQl8Jmr3Xo0dGyVX0HYLn8+lrYw5zH8PE3zmccX/c8h+xR/ +pr0yuBiXcHBmjpnB2KDxGPbLzP0DYsUXZZO9WlR60XvLaU9h3ztmOezTqjAPwd6Y +JVJD8sROsjjYHKujmu9LVsR/rwKBgQDg7TCqsSkw3zga2iN4SVwBcWIBckJo6GkU +15vCNcYYi3rqFrUsD6kopWR2RCuP2mOPiAdZVJ6cKYm3mCIxA8XQMzBYnW/HU742 +CRCuQyQ0vxISS+7X2e0gpltydn0KmbWzRkM99KG5ueWeOG8fl12qofbFaIRbeCtR +OyFFRSXDAwKBgEQBO2f+QkeDCml7tIfZKGQRJreegPJ6tnU/JwcZ9jHpAAQMyBH6 +L3huANtKUjDmIdUeka6w5r3ctmWDKMABdsvcnsd+a1evBTb0hkwsWGti8ma8SGok +xI9xw7+O58cl6OsOZ5oNP5vOyCW3+a1Zv6wqfyQzDOHFpEt7vDK5Vzr1AoGAVTtM +D8JmO3C6j+JNQo154coXOzkS1TIMb6tsCGHcGPzxrLbCHFoq3Y4ezXnzj+VPiNog +/sbD+6T53Ko3ep9VXBmpnkq6Sqv/DfbNbVtf4uTsciZ38S9RXefsCym/JD8uzSo4 +3MdQaVJ6IkR8R1pSUhBhtFhwn0CgGX3wk5EqSAUCgYEAxOEZkBKpa3psvrsxGljz +J15p0WJlKvnEBB7rE4rWtS/H9kiYIwhv9/NtCU6dk/6/4raaqS1wz2CaOGHHj965 +YoC/BsWbvjNHk1slFyMMHL4vJ156vxlozkNKaLZ5hLS+1Yg2hz8Dn1OoUps/f0ss +C2yF0knH+4PtaRDtURojXec= +-----END PRIVATE KEY----- diff --git a/.github/certs/server.cert.pem b/.github/certs/server.cert.pem new file mode 100755 index 0000000..8fe149e --- /dev/null +++ b/.github/certs/server.cert.pem @@ -0,0 +1,21 @@ +-----BEGIN CERTIFICATE----- +MIIDejCCAmKgAwIBAgIULZTrY0xSI6dhVlN+vg24dJi20WwwDQYJKoZIhvcNAQEL +BQAwETEPMA0GA1UEAwwGQ0FSb290MB4XDTI1MDMwMTE4MTQwOVoXDTI2MDMwMTE4 +MTQwOVowETEPMA0GA1UEAwwGc2VydmVyMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8A +MIIBCgKCAQEAwYA2a8rKc5kQgWux2TVRxgbLCDTJXQi9ZVWfjMcP+cSsHGUoBBpq +VKlKGuRelI+7VL2a7qVpbh8+rXJnZzPeXQhozQalguGh3J4qSfa44pB1anM2zHGW +zvQHeH0VCdUEP1g6Agt2JCB9M2jriNvRioxvWHehHk7mJkrFa6qcQa+U8XghYTi4 +3pi/3Ejh4pG9h65+CaDI6C0cvcuBGShdJWd9F6uErLQIAHxefvQKemI/MxKOQttj +qDidK9Hd9oIURgGcOT9vN6/hP6hmAOrFxAxReqVKq8Joqq631yy3Dyoxdo0dXFe1 +gpLdKFvt8QUR/nINlnPox7bMmeJevSf5xwIDAQABo4HJMIHGMEwGA1UdIwRFMEOA +FLUOV1BeTLfPtrSuZGTvM8eRjEIIoRWkEzARMQ8wDQYDVQQDDAZDQVJvb3SCFCF9 +obCMwu9lx77CcLb4tL5u0FYXMAkGA1UdEwQCMAAwDgYDVR0PAQH/BAQDAgWgMBMG +A1UdJQQMMAoGCCsGAQUFBwMBMCcGA1UdEQQgMB6CBnB1bHNhcoIOcHVsc2FyLmRl +ZmF1bHSHBH8AAAEwHQYDVR0OBBYEFMafLLvP99GaKWRcCX8M2/oFltrgMA0GCSqG +SIb3DQEBCwUAA4IBAQDk0MvZHrVef8o7NI9WEP5PQ9QnarZQDMexwEOcPZgKnGoM +oKSMzIDn5UT62ykL3O5S4eJ/3MyOigiyjAZ9kDxcdqrtjcMad2EIxqjFx2ebxFug +cGKGHkGlgyt2hKYkeXc+3n335bAyjNhqEFYTRfYV+kQClGMag3wCy543neLyh9ii +65WVWbbJYBc8elfUpXh3yfVdZUnNMWoDkW3jj5N5OG3DmixMRGJio/0HHVBLCC21 +cVWjrXoMyofzfSZ1CUb0L+VfKBfdQuPWr1qYqNzIAxs03VCDUdft4+6tZZ5JPzcA +ihbqvvdbSvuVaN1XMqMXcOuQrC1RQVgdSwZId/c0 +-----END CERTIFICATE----- diff --git a/.github/certs/server.conf b/.github/certs/server.conf new file mode 100755 index 0000000..cf513b5 --- /dev/null +++ b/.github/certs/server.conf @@ -0,0 +1,20 @@ +[ req ] +default_bits = 2048 +prompt = no +default_md = sha256 +distinguished_name = dn + +[ v3_ext ] +authorityKeyIdentifier=keyid,issuer:always +basicConstraints=CA:FALSE +keyUsage=critical, digitalSignature, keyEncipherment +extendedKeyUsage=serverAuth +subjectAltName=@alt_names + +[ dn ] +CN = server + +[ alt_names ] +DNS.1 = pulsar +DNS.2 = pulsar.default +IP.1 = 127.0.0.1 \ No newline at end of file diff --git a/.github/certs/server.csr.pem b/.github/certs/server.csr.pem new file mode 100755 index 0000000..bb44225 --- /dev/null +++ b/.github/certs/server.csr.pem @@ -0,0 +1,15 @@ +-----BEGIN CERTIFICATE REQUEST----- +MIICVjCCAT4CAQAwETEPMA0GA1UEAwwGc2VydmVyMIIBIjANBgkqhkiG9w0BAQEF +AAOCAQ8AMIIBCgKCAQEAwYA2a8rKc5kQgWux2TVRxgbLCDTJXQi9ZVWfjMcP+cSs +HGUoBBpqVKlKGuRelI+7VL2a7qVpbh8+rXJnZzPeXQhozQalguGh3J4qSfa44pB1 +anM2zHGWzvQHeH0VCdUEP1g6Agt2JCB9M2jriNvRioxvWHehHk7mJkrFa6qcQa+U +8XghYTi43pi/3Ejh4pG9h65+CaDI6C0cvcuBGShdJWd9F6uErLQIAHxefvQKemI/ +MxKOQttjqDidK9Hd9oIURgGcOT9vN6/hP6hmAOrFxAxReqVKq8Joqq631yy3Dyox +do0dXFe1gpLdKFvt8QUR/nINlnPox7bMmeJevSf5xwIDAQABoAAwDQYJKoZIhvcN +AQELBQADggEBACCrUmDPoF3CzagaTv7J551bqdys+V/eSMSh23X9FCmynPKWf+Tl +OvYN8pL4bPNlfSQz3RYaTv2YTDdooIZ8MnhB4kTYhHMiN+snEyqQ2cUopi+b/rGE +bX5KMyCaBLCGnnQCsFUEZEkk9DtK+X1W/MT5kx369+gytqgDyZZdxSUm4CsDyuYY +lQnue+3h2TZAzIOuN0XOgPti+FtKJnG2Ko2uI0ofjVEanrRqaoHWkDv248cPRlYS ++99dH8An2VW8Eji2aEh4CBeoOLX/M64l0NTBkQD2UYQg8OkvbRG1DcqhrBwZX5xE +sSZkWg+/hWVVIhtuBX1kZ6AxDDSVTIdq/Tk= +-----END CERTIFICATE REQUEST----- diff --git a/.github/certs/server.key-pk8.pem b/.github/certs/server.key-pk8.pem new file mode 100755 index 0000000..7619dbb --- /dev/null +++ b/.github/certs/server.key-pk8.pem @@ -0,0 +1,28 @@ +-----BEGIN PRIVATE KEY----- +MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQDBgDZryspzmRCB +a7HZNVHGBssINMldCL1lVZ+Mxw/5xKwcZSgEGmpUqUoa5F6Uj7tUvZrupWluHz6t +cmdnM95dCGjNBqWC4aHcnipJ9rjikHVqczbMcZbO9Ad4fRUJ1QQ/WDoCC3YkIH0z +aOuI29GKjG9Yd6EeTuYmSsVrqpxBr5TxeCFhOLjemL/cSOHikb2Hrn4JoMjoLRy9 +y4EZKF0lZ30Xq4SstAgAfF5+9Ap6Yj8zEo5C22OoOJ0r0d32ghRGAZw5P283r+E/ +qGYA6sXEDFF6pUqrwmiqrrfXLLcPKjF2jR1cV7WCkt0oW+3xBRH+cg2Wc+jHtsyZ +4l69J/nHAgMBAAECggEAGKDujFK/6O27BZYFCX35XgigT/eUB6TkomC/EYaGuIY5 +V9XiPX91ODhh58FYbnVK5TLK5QQlwq/0ZNQyaBbYfyPiXvIxH9MDLJbvEAH4r1Gu +uDFY8LWmecnip4nYIyu7QirDG16FF30RInmAvScpaQaN54c2eSqWrZoHvbf18oAj +bjPYZNICxta4r7dLLAynX536S8kPLrf/EQvglEzafbFkMCTfA375+jy94gT9rqJU +VIz85WxrOI2QkgZFUfxU+4NgIycvTI2p2OzYfnWDCzXJ2wLbz3dlaE3INknZ2x/4 +dQBhWWRQokw0SdZCEzKRjJbDqOghT/ugo8awU3iq0QKBgQDpoM+U6PCQ8eKqAF7t +FYIg1FlRb8AQ6P1jcgdMHsnJYHWCAWFb4aUIxV6uBIAPlFDwp+vcoUdIfrkenPJE +7a9NTpypgwkjqouFRnhRnkTr69pjZilWf/2nx+lttgRyVR+ov6s3TSQ3Mfc0uRJA +4cT45WYTkCofmSI/3TvXBfRrUQKBgQDUB7cfm4wpbt4HDZYNrhW4xgMYwIFpg5+Q +gfteRA7Z6OMB/cBTeLuwpI9Qq/WzP835ZeYvbo/thuy7OrYQUX4PBAcQII7iEBD/ +Oci/x3q7RUG/pv3HE9blI8v6v3Q4ukrADJbKJ2UmH7lVHD1LmuaFnNHXoZndT6Ka +GvdFrmydlwKBgQCC9vSmoxYICrBnYCHWgZIa2S7gqeZfFFzG3GsezruLOZllsWRk +X09mZU23+ynWkev1nAxp97a/+tnHQW/GF/+HNKQ5mg0Lg1UG9wQHrNMx9VrNEb3m +kYUeAsDmEXzYosNGk3tlWscMceQEGCMVX/pNFDtuiAg1AmDisCAVX28uoQKBgACH +AGKYvXGA8BZuZzgjfbRW8b0HBYtgo231lM14N9084guoOOk8PKNE1YJq2xKWDwoW +wcrOaJc7pa0ViUxjF4AsD2Rv4EsZpSEtYJ1xfpGmq3IxlSIAn6E0RJacEUru067E +mT9pd6vvmkNm7ZQZj4c3i6IQqdROFWZyuLi4pVaDAoGAUTOTgMfTTqD0VsNKiRC+ +iiSnh3YQNz+ziWWPrrQTcFk98SWc6OjLVbolHLeZYP5tpJuIcTRjGt8WzPD8e4BH +pAbmoE8/AaLUzKNqIAJyZ/0SACq4b1/TBe60DnY5bJaYOhx4KooqybIh1bLyRsFs ++hFjlZ9jiwcU6gQVZARFSJU= +-----END PRIVATE KEY----- diff --git a/.github/certs/server.key.pem b/.github/certs/server.key.pem new file mode 100755 index 0000000..7619dbb --- /dev/null +++ b/.github/certs/server.key.pem @@ -0,0 +1,28 @@ +-----BEGIN PRIVATE KEY----- +MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQDBgDZryspzmRCB +a7HZNVHGBssINMldCL1lVZ+Mxw/5xKwcZSgEGmpUqUoa5F6Uj7tUvZrupWluHz6t +cmdnM95dCGjNBqWC4aHcnipJ9rjikHVqczbMcZbO9Ad4fRUJ1QQ/WDoCC3YkIH0z +aOuI29GKjG9Yd6EeTuYmSsVrqpxBr5TxeCFhOLjemL/cSOHikb2Hrn4JoMjoLRy9 +y4EZKF0lZ30Xq4SstAgAfF5+9Ap6Yj8zEo5C22OoOJ0r0d32ghRGAZw5P283r+E/ +qGYA6sXEDFF6pUqrwmiqrrfXLLcPKjF2jR1cV7WCkt0oW+3xBRH+cg2Wc+jHtsyZ +4l69J/nHAgMBAAECggEAGKDujFK/6O27BZYFCX35XgigT/eUB6TkomC/EYaGuIY5 +V9XiPX91ODhh58FYbnVK5TLK5QQlwq/0ZNQyaBbYfyPiXvIxH9MDLJbvEAH4r1Gu +uDFY8LWmecnip4nYIyu7QirDG16FF30RInmAvScpaQaN54c2eSqWrZoHvbf18oAj +bjPYZNICxta4r7dLLAynX536S8kPLrf/EQvglEzafbFkMCTfA375+jy94gT9rqJU +VIz85WxrOI2QkgZFUfxU+4NgIycvTI2p2OzYfnWDCzXJ2wLbz3dlaE3INknZ2x/4 +dQBhWWRQokw0SdZCEzKRjJbDqOghT/ugo8awU3iq0QKBgQDpoM+U6PCQ8eKqAF7t +FYIg1FlRb8AQ6P1jcgdMHsnJYHWCAWFb4aUIxV6uBIAPlFDwp+vcoUdIfrkenPJE +7a9NTpypgwkjqouFRnhRnkTr69pjZilWf/2nx+lttgRyVR+ov6s3TSQ3Mfc0uRJA +4cT45WYTkCofmSI/3TvXBfRrUQKBgQDUB7cfm4wpbt4HDZYNrhW4xgMYwIFpg5+Q +gfteRA7Z6OMB/cBTeLuwpI9Qq/WzP835ZeYvbo/thuy7OrYQUX4PBAcQII7iEBD/ +Oci/x3q7RUG/pv3HE9blI8v6v3Q4ukrADJbKJ2UmH7lVHD1LmuaFnNHXoZndT6Ka +GvdFrmydlwKBgQCC9vSmoxYICrBnYCHWgZIa2S7gqeZfFFzG3GsezruLOZllsWRk +X09mZU23+ynWkev1nAxp97a/+tnHQW/GF/+HNKQ5mg0Lg1UG9wQHrNMx9VrNEb3m +kYUeAsDmEXzYosNGk3tlWscMceQEGCMVX/pNFDtuiAg1AmDisCAVX28uoQKBgACH +AGKYvXGA8BZuZzgjfbRW8b0HBYtgo231lM14N9084guoOOk8PKNE1YJq2xKWDwoW +wcrOaJc7pa0ViUxjF4AsD2Rv4EsZpSEtYJ1xfpGmq3IxlSIAn6E0RJacEUru067E +mT9pd6vvmkNm7ZQZj4c3i6IQqdROFWZyuLi4pVaDAoGAUTOTgMfTTqD0VsNKiRC+ +iiSnh3YQNz+ziWWPrrQTcFk98SWc6OjLVbolHLeZYP5tpJuIcTRjGt8WzPD8e4BH +pAbmoE8/AaLUzKNqIAJyZ/0SACq4b1/TBe60DnY5bJaYOhx4KooqybIh1bLyRsFs ++hFjlZ9jiwcU6gQVZARFSJU= +-----END PRIVATE KEY----- diff --git a/.github/gen_test_certs.sh b/.github/gen_test_certs.sh new file mode 100755 index 0000000..c36ae34 --- /dev/null +++ b/.github/gen_test_certs.sh @@ -0,0 +1,25 @@ +#!/bin/bash + +cd certs/ + +echo "[+] Generating CA" +openssl genrsa -out ca.key.pem 2048 +openssl req -x509 -new -nodes -key ca.key.pem -subj "/CN=CARoot" -days 365 -out ca.cert.pem + +echo "[+] Generating server key" +openssl genrsa -out server.key.pem 2048 +openssl pkcs8 -topk8 -inform PEM -outform PEM -in server.key.pem -out server.key-pk8.pem -nocrypt + +echo "[+] Generating server cert" +openssl req -new -config server.conf -key server.key.pem -out server.csr.pem -sha256 +openssl x509 -req -in server.csr.pem -CA ca.cert.pem -CAkey ca.key.pem -CAcreateserial -out server.cert.pem -days 365 -extensions v3_ext -extfile server.conf -sha256 + +echo "[+] Generating client key" +openssl genrsa -out client.key.pem 2048 +openssl pkcs8 -topk8 -inform PEM -outform PEM -in client.key.pem -out client.key-pk8.pem -nocrypt + +echo "[+] Generating client cert" +openssl req -new -subj "/CN=client" -key client.key.pem -out client.csr.pem -sha256 +openssl x509 -req -in client.csr.pem -CA ca.cert.pem -CAkey ca.key.pem -CAcreateserial -out client.cert.pem -days 365 -sha256 + +chmod 555 * \ No newline at end of file diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index b5a13c9..d3c7a19 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -30,7 +30,7 @@ jobs: pulsar-version: [ 2.10.4, 2.11.2, 3.0.4, 3.1.3 ] steps: - name: Start Pulsar Standalone Container - run: docker run --name pulsar -p 6650:6650 -p 8080:8080 -d -e GITHUB_ACTIONS=true -e CI=true apachepulsar/pulsar:${{ matrix.pulsar-version }} bin/pulsar standalone + run: docker run --name pulsar -p 6650:6650 -p 8080:8080 -p 8081:8081 -p 6651:6651 -p 8443:8443 -v $PWD/.github/certs:/certs -d -e GITHUB_ACTIONS=true -e CI=true -e PULSAR_PREFIX_brokerServicePortTls=6651 -e PULSAR_PREFIX_webServicePortTls=8443 -e PULSAR_PREFIX_tlsEnabled=true -e PULSAR_PREFIX_tlsCertificateFilePath=/certs/server.cert.pem -e PULSAR_PREFIX_tlsKeyFilePath=/certs/server.key-pk8.pem -e PULSAR_PREFIX_tlsTrustCertsFilePath=/certs/ca.cert.pem apachepulsar/pulsar:3.1.3 bash -c “/pulsar/bin/apply-config-from-env.py /pulsar/conf/standalone.conf && /pulsar/bin/pulsar standalone” - uses: actions/checkout@v3 - uses: Swatinem/rust-cache@v2 - name: Run tests diff --git a/src/lib.rs b/src/lib.rs index 53d473c..6afbab3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -386,6 +386,76 @@ mod tests { assert_eq!(received, message_ids); } + #[tokio::test] + #[cfg(any(feature = "tokio-runtime", feature = "tokio-rustls-runtime"))] + async fn round_trip_tls() { + let _result = log::set_logger(&TEST_LOGGER); + log::set_max_level(LevelFilter::Debug); + + let addr = "pulsar+ssl://127.0.0.1:6651"; + let pulsar: Pulsar<_> = Pulsar::builder(addr, TokioExecutor) + .with_identity_files(&"./.github/certs/client.cert.pem", &"./.github/certs/client.key-pk8.pem").unwrap() + .with_certificate_chain_file(&"./.github/certs/ca.cert.pem").unwrap() + .with_allow_insecure_connection(false) + .with_tls_hostname_verification_enabled(false) + .build().await.unwrap(); + + // random topic to better allow multiple test runs while debugging + let topic = format!("test_{}", rand::random::()); + + let mut producer = pulsar.producer().with_topic(&topic).build().await.unwrap(); + info!("producer created"); + + let message_ids: BTreeSet = (0..100).collect(); + + info!("will send message"); + let mut sends = Vec::new(); + for &id in &message_ids { + let message = TestData { + data: "data".to_string(), + id, + }; + sends.push(producer.send_non_blocking(&message).await.unwrap()); + } + try_join_all(sends).await.unwrap(); + + info!("sent"); + + let mut consumer: Consumer = pulsar + .consumer() + .with_topic(&topic) + .with_consumer_name("test_consumer") + .with_subscription_type(SubType::Exclusive) + .with_subscription("test_subscription") + .with_options(ConsumerOptions { + initial_position: InitialPosition::Earliest, + ..Default::default() + }) + .build() + .await + .unwrap(); + + info!("consumer created"); + + let topics = consumer.topics(); + debug!("consumer connected to {:?}", topics); + assert_eq!(topics.len(), 1); + assert!(topics[0].ends_with(&topic)); + + let mut received = BTreeSet::new(); + while let Ok(Some(msg)) = timeout(Duration::from_secs(10), consumer.next()).await { + let msg: Message = msg.unwrap(); + info!("id: {:?}", msg.message_id()); + received.insert(msg.deserialize().unwrap().id); + consumer.ack(&msg).await.unwrap(); + if received.len() == message_ids.len() { + break; + } + } + assert_eq!(received.len(), message_ids.len()); + assert_eq!(received, message_ids); + } + #[tokio::test] #[cfg(any(feature = "tokio-runtime", feature = "tokio-rustls-runtime"))] async fn unsized_data() { From 19703505ee590ced1dc0051fad31309f4d5bf9c8 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Wed, 27 Aug 2025 11:31:01 +0800 Subject: [PATCH 4/4] Fix the image version --- .github/workflows/rust.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 3cd1fba..f7a77a9 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -30,7 +30,7 @@ jobs: pulsar-version: [ 2.10.6, 2.11.4, 3.0.8, 3.2.4, 3.3.3, 4.0.1 ] steps: - name: Start Pulsar Standalone Container - run: docker run --name pulsar -p 6650:6650 -p 8080:8080 -p 8081:8081 -p 6651:6651 -p 8443:8443 -v $PWD/.github/certs:/certs -d -e GITHUB_ACTIONS=true -e CI=true -e PULSAR_PREFIX_brokerServicePortTls=6651 -e PULSAR_PREFIX_webServicePortTls=8443 -e PULSAR_PREFIX_tlsEnabled=true -e PULSAR_PREFIX_tlsCertificateFilePath=/certs/server.cert.pem -e PULSAR_PREFIX_tlsKeyFilePath=/certs/server.key-pk8.pem -e PULSAR_PREFIX_tlsTrustCertsFilePath=/certs/ca.cert.pem apachepulsar/pulsar:3.1.3 bash -c “/pulsar/bin/apply-config-from-env.py /pulsar/conf/standalone.conf && /pulsar/bin/pulsar standalone” + run: docker run --name pulsar -p 6650:6650 -p 8080:8080 -p 8081:8081 -p 6651:6651 -p 8443:8443 -v $PWD/.github/certs:/certs -d -e GITHUB_ACTIONS=true -e CI=true -e PULSAR_PREFIX_brokerServicePortTls=6651 -e PULSAR_PREFIX_webServicePortTls=8443 -e PULSAR_PREFIX_tlsEnabled=true -e PULSAR_PREFIX_tlsCertificateFilePath=/certs/server.cert.pem -e PULSAR_PREFIX_tlsKeyFilePath=/certs/server.key-pk8.pem -e PULSAR_PREFIX_tlsTrustCertsFilePath=/certs/ca.cert.pem apachepulsar/pulsar:${{ matrix.pulsar-version }} bash -c “/pulsar/bin/apply-config-from-env.py /pulsar/conf/standalone.conf && /pulsar/bin/pulsar standalone” - uses: actions/checkout@v3 - uses: Swatinem/rust-cache@v2 - name: Run tests