From e15a11a63ec12860ce9a1032ad01acbe454c222f Mon Sep 17 00:00:00 2001 From: Hongbo Miao <3375461+hongbo-miao@users.noreply.github.com> Date: Thu, 21 Nov 2024 22:36:28 -0800 Subject: [PATCH] refactor(zeromq): switch from zmq to zeromq --- .../iads/iads-data-producer/src/main.rs | 76 +-- .../kafka-rust/zeromq-kafka-bridge/Cargo.lock | 502 ++++++++---------- .../kafka-rust/zeromq-kafka-bridge/Cargo.toml | 3 +- .../zeromq-kafka-bridge/src/main.rs | 22 +- 4 files changed, 275 insertions(+), 328 deletions(-) diff --git a/data-visualization/iads/iads-data-producer/src/main.rs b/data-visualization/iads/iads-data-producer/src/main.rs index ea6af5d5b3..0978492857 100644 --- a/data-visualization/iads/iads-data-producer/src/main.rs +++ b/data-visualization/iads/iads-data-producer/src/main.rs @@ -4,15 +4,15 @@ use tokio::net::TcpListener; use tokio::time::{sleep, Duration}; // Constants for packet structure -const HEADER_SIZE: i32 = 32; -const TAG_SIZE: i32 = 2; -const VALUE_SIZE: i32 = 4; -const TIME_PAIR_SIZE: i32 = (TAG_SIZE * 2) + (VALUE_SIZE * 2); // 2 tags + 2 values -const SIGNAL_PAIR_SIZE: i32 = (TAG_SIZE * 2) + (VALUE_SIZE * 2); // 2 tags + 2 values +const HEADER_SIZE_BYTE: i32 = 32; +const TAG_SIZE_BYTE: i32 = 2; +const VALUE_SIZE_BYTE: i32 = 4; +const TIME_PAIR_SIZE_BYTE: i32 = (TAG_SIZE_BYTE * 2) + (VALUE_SIZE_BYTE * 2); // 2 tags + 2 values +const SIGNAL_PAIR_SIZE_BYTE: i32 = (TAG_SIZE_BYTE * 2) + (VALUE_SIZE_BYTE * 2); // 2 tags + 2 values // Configuration signals struct StreamConfig { - num_signals: i32, + signal_number: i32, rate_hz: f64, port: u16, } @@ -20,33 +20,33 @@ struct StreamConfig { impl Default for StreamConfig { fn default() -> Self { StreamConfig { - num_signals: 4, + signal_number: 4, rate_hz: 50.0, port: 49000, } } } -fn calculate_packet_size(num_signals: i32) -> i32 { - let signal_pairs = (num_signals + 1) / 2; - HEADER_SIZE + TIME_PAIR_SIZE + (signal_pairs * SIGNAL_PAIR_SIZE) +fn calculate_packet_size_byte(signal_number: i32) -> i32 { + let signal_pair_number = (signal_number + 1) / 2; + HEADER_SIZE_BYTE + TIME_PAIR_SIZE_BYTE + (signal_pair_number * SIGNAL_PAIR_SIZE_BYTE) } async fn send_data_stream( mut stream: tokio::net::TcpStream, config: &StreamConfig, ) -> std::io::Result<()> { - let packet_size = calculate_packet_size(config.num_signals); + let packet_size_byte = calculate_packet_size_byte(config.signal_number); let mut rng = rand::thread_rng(); // Calculate the interval in milliseconds from Hz let interval_ms = (1000.0 / config.rate_hz) as u64; // Calculate time increment in nanoseconds from Hz - let time_increment = (1.0 / config.rate_hz * 1_000_000_000.0) as i64; + let time_increment_ns = (1.0 / config.rate_hz * 1_000_000_000.0) as i64; - let mut time = 0i64; + let mut time_ns = 0i64; let mut packet_counter = 0i32; - let mut buffer = vec![0u8; packet_size as usize]; + let mut buffer = vec![0u8; packet_size_byte as usize]; // Handshake stream.write_all(&[1u8]).await?; @@ -56,49 +56,49 @@ async fn send_data_stream( let mut offset = 0usize; // Header - buffer[offset..offset + VALUE_SIZE as usize] - .copy_from_slice(&(packet_size - VALUE_SIZE).to_le_bytes()); - buffer[offset + VALUE_SIZE as usize..offset + VALUE_SIZE as usize * 2] + buffer[offset..offset + VALUE_SIZE_BYTE as usize] + .copy_from_slice(&(packet_size_byte - VALUE_SIZE_BYTE).to_le_bytes()); + buffer[offset + VALUE_SIZE_BYTE as usize..offset + VALUE_SIZE_BYTE as usize * 2] .copy_from_slice(&packet_counter.to_le_bytes()); - buffer[offset + VALUE_SIZE as usize * 2..offset + VALUE_SIZE as usize * 3] + buffer[offset + VALUE_SIZE_BYTE as usize * 2..offset + VALUE_SIZE_BYTE as usize * 3] .copy_from_slice(&packet_counter.to_le_bytes()); for i in 3..8 { - buffer[offset + VALUE_SIZE as usize * i..offset + VALUE_SIZE as usize * (i + 1)] + buffer[offset + VALUE_SIZE_BYTE as usize * i..offset + VALUE_SIZE_BYTE as usize * (i + 1)] .copy_from_slice(&0i32.to_le_bytes()); } - offset += HEADER_SIZE as usize; + offset += HEADER_SIZE_BYTE as usize; // Time tags and values for tag in [1u16, 2u16] { - buffer[offset..offset + TAG_SIZE as usize].copy_from_slice(&tag.to_le_bytes()); - offset += TAG_SIZE as usize; + buffer[offset..offset + TAG_SIZE_BYTE as usize].copy_from_slice(&tag.to_le_bytes()); + offset += TAG_SIZE_BYTE as usize; } - let time_high = (time >> 32) as u32; - let time_low = (time & 0xFFFFFFFF) as u32; - buffer[offset..offset + VALUE_SIZE as usize].copy_from_slice(&time_high.to_le_bytes()); - offset += VALUE_SIZE as usize; - buffer[offset..offset + VALUE_SIZE as usize].copy_from_slice(&time_low.to_le_bytes()); - offset += VALUE_SIZE as usize; + let time_high = (time_ns >> 32) as u32; + let time_low = (time_ns & 0xFFFFFFFF) as u32; + buffer[offset..offset + VALUE_SIZE_BYTE as usize].copy_from_slice(&time_high.to_le_bytes()); + offset += VALUE_SIZE_BYTE as usize; + buffer[offset..offset + VALUE_SIZE_BYTE as usize].copy_from_slice(&time_low.to_le_bytes()); + offset += VALUE_SIZE_BYTE as usize; // Signals with random values - for n in (0..config.num_signals).step_by(2) { + for n in (0..config.signal_number).step_by(2) { // Tags - buffer[offset..offset + TAG_SIZE as usize] + buffer[offset..offset + TAG_SIZE_BYTE as usize] .copy_from_slice(&((3 + n) as u16).to_le_bytes()); - offset += TAG_SIZE as usize; - buffer[offset..offset + TAG_SIZE as usize] + offset += TAG_SIZE_BYTE as usize; + buffer[offset..offset + TAG_SIZE_BYTE as usize] .copy_from_slice(&((4 + n) as u16).to_le_bytes()); - offset += TAG_SIZE as usize; + offset += TAG_SIZE_BYTE as usize; // Random values let value1: f32 = rng.gen_range(0.0..100.0); let value2: f32 = rng.gen_range(0.0..100.0); - buffer[offset..offset + VALUE_SIZE as usize].copy_from_slice(&value1.to_le_bytes()); - offset += VALUE_SIZE as usize; - buffer[offset..offset + VALUE_SIZE as usize].copy_from_slice(&value2.to_le_bytes()); - offset += VALUE_SIZE as usize; + buffer[offset..offset + VALUE_SIZE_BYTE as usize].copy_from_slice(&value1.to_le_bytes()); + offset += VALUE_SIZE_BYTE as usize; + buffer[offset..offset + VALUE_SIZE_BYTE as usize].copy_from_slice(&value2.to_le_bytes()); + offset += VALUE_SIZE_BYTE as usize; } if let Err(e) = stream.write_all(&buffer).await { @@ -107,7 +107,7 @@ async fn send_data_stream( } packet_counter += 1; - time += time_increment; + time_ns += time_increment_ns; sleep(Duration::from_millis(interval_ms)).await; } } diff --git a/hm-kafka/kafka-client/kafka-rust/zeromq-kafka-bridge/Cargo.lock b/hm-kafka/kafka-client/kafka-rust/zeromq-kafka-bridge/Cargo.lock index 11cb3de66e..6da951ddd1 100644 --- a/hm-kafka/kafka-client/kafka-rust/zeromq-kafka-bridge/Cargo.lock +++ b/hm-kafka/kafka-client/kafka-rust/zeromq-kafka-bridge/Cargo.lock @@ -28,9 +28,9 @@ dependencies = [ [[package]] name = "anyhow" -version = "1.0.92" +version = "1.0.93" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "74f37166d7d48a0284b99dd824694c26119c700b53bf0d1540cdb147dbdaaf13" +checksum = "4c95c10ba0b00a02636238b814946408b1322d5ac4760326e6fb8ec956d85775" [[package]] name = "arrayref" @@ -44,6 +44,30 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "23b62fc65de8e4e7f52534fb52b0f3ed04746ae267519eef2a83941e8085068b" +[[package]] +name = "async-trait" +version = "0.1.83" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "721cae7de5c34fbb2acd27e21e6d2cf7b886dce0c27388d46c4e6c47ea4318dd" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.89", +] + +[[package]] +name = "asynchronous-codec" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a860072022177f903e59730004fb5dc13db9275b79bb2aef7ba8ce831956c233" +dependencies = [ + "bytes", + "futures-sink", + "futures-util", + "memchr", + "pin-project-lite", +] + [[package]] name = "autocfg" version = "1.4.0" @@ -83,12 +107,6 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3a8241f3ebb85c056b509d4327ad0358fbbba6ffb340bf388f26350aeda225b1" -[[package]] -name = "bitflags" -version = "1.3.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" - [[package]] name = "bitflags" version = "2.6.0" @@ -126,25 +144,13 @@ checksum = "9ac0150caa2ae65ca5bd83f25c7de183dea78d4d366469f148435e2acfbad0da" [[package]] name = "cc" -version = "1.1.34" +version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "67b9470d453346108f93a59222a9a1a5724db32d0a4727b7ab7ace4b4d822dc9" +checksum = "fd9de9f2205d5ef3fd67e685b0df337994ddd4495e2a28d185500d0e1edfea47" dependencies = [ - "jobserver", - "libc", "shlex", ] -[[package]] -name = "cfg-expr" -version = "0.15.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d067ad48b8650848b989a59a86c6c36a995d02d2bf778d45c3c5d57bc2718f02" -dependencies = [ - "smallvec", - "target-lexicon", -] - [[package]] name = "cfg-if" version = "1.0.0" @@ -191,47 +197,6 @@ version = "0.8.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b" -[[package]] -name = "crossbeam" -version = "0.8.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1137cd7e7fc0fb5d3c5a8678be38ec56e819125d8d7907411fe24ccb943faca8" -dependencies = [ - "crossbeam-channel", - "crossbeam-deque", - "crossbeam-epoch", - "crossbeam-queue", - "crossbeam-utils", -] - -[[package]] -name = "crossbeam-channel" -version = "0.5.13" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "33480d6946193aa8033910124896ca395333cae7e2d1113d1fef6c3272217df2" -dependencies = [ - "crossbeam-utils", -] - -[[package]] -name = "crossbeam-deque" -version = "0.8.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "613f8cc01fe9cf1a3eb3d7f488fd2fa8388403e97039e2f73692932e291a770d" -dependencies = [ - "crossbeam-epoch", - "crossbeam-utils", -] - -[[package]] -name = "crossbeam-epoch" -version = "0.9.18" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5b82ac4a3c2ca9c3460964f020e1402edd5753411d7737aa39c3714ad1b5420e" -dependencies = [ - "crossbeam-utils", -] - [[package]] name = "crossbeam-queue" version = "0.3.11" @@ -249,12 +214,11 @@ checksum = "22ec99545bb0ed0ea7bb9b8e1e9122ea386ff8a48c0922e43f36d45ab09e0e80" [[package]] name = "dashmap" -version = "6.1.0" +version = "5.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf" +checksum = "978747c1d849a7d2ee5e8adc0159961c48fb7e5db2f06af6723b80123bb53856" dependencies = [ "cfg-if", - "crossbeam-utils", "hashbrown 0.14.5", "lock_api", "once_cell", @@ -262,14 +226,17 @@ dependencies = [ ] [[package]] -name = "dircpy" -version = "0.3.19" +name = "dashmap" +version = "6.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a88521b0517f5f9d51d11925d8ab4523497dcf947073fa3231a311b63941131c" +checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf" dependencies = [ - "jwalk", - "log", - "walkdir", + "cfg-if", + "crossbeam-utils", + "hashbrown 0.14.5", + "lock_api", + "once_cell", + "parking_lot_core", ] [[package]] @@ -291,7 +258,7 @@ checksum = "97369cbbc041bc366949bc74d34658d6cda5621039731c6310521892a3a20ae0" dependencies = [ "proc-macro2", "quote", - "syn 2.0.87", + "syn 2.0.89", ] [[package]] @@ -318,9 +285,9 @@ dependencies = [ [[package]] name = "fastrand" -version = "2.1.1" +version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e8c02a5121d4ea3eb16a80748c74f5549a5665e4c21333c6098f283870fbdea6" +checksum = "486f806e73c5707928240ddc295403b1b93c96a02038563881c4a2fd84b81ac4" [[package]] name = "fixedbitset" @@ -414,7 +381,7 @@ checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" dependencies = [ "proc-macro2", "quote", - "syn 2.0.87", + "syn 2.0.89", ] [[package]] @@ -458,6 +425,17 @@ dependencies = [ "wasi 0.9.0+wasi-snapshot-preview1", ] +[[package]] +name = "getrandom" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c4567c8db10ae91089c99af84c68c38da3ec2f087c3f82960bcdbf3656b6f4d7" +dependencies = [ + "cfg-if", + "libc", + "wasi 0.11.0+wasi-snapshot-preview1", +] + [[package]] name = "gimli" version = "0.31.1" @@ -530,9 +508,9 @@ checksum = "7d71d3574edd2771538b901e6549113b4006ece66150fb69c0fb6d9a2adae946" [[package]] name = "hyper" -version = "1.5.0" +version = "1.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bbbff0a806a4728c99295b254c8838933b5b082d75e3cb70c8dab21fdfbcfa9a" +checksum = "97818827ef4f364230e16705d4706e2897df2bb60617d6ca15d598025a3c481f" dependencies = [ "bytes", "futures-channel", @@ -697,7 +675,7 @@ checksum = "1ec89e9337638ecdc08744df490b221a7399bf8d164eb52a665454e60e075ad6" dependencies = [ "proc-macro2", "quote", - "syn 2.0.87", + "syn 2.0.89", ] [[package]] @@ -754,18 +732,9 @@ dependencies = [ [[package]] name = "itoa" -version = "1.0.11" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "49f1f14873335454500d59611f1cf4a4b0f786f9ac11f4312a78e4cf2566695b" - -[[package]] -name = "jobserver" -version = "0.1.32" +version = "1.0.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "48d1dbcbbeb6a7fec7e059840aa538bd62aaccf972c7346c4d9d2059312853d0" -dependencies = [ - "libc", -] +checksum = "540654e97a3f4470a492cd30ff187bc95d89557a903a2bbf112e2fae98104ef2" [[package]] name = "js-sys" @@ -776,16 +745,6 @@ dependencies = [ "wasm-bindgen", ] -[[package]] -name = "jwalk" -version = "0.8.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2735847566356cd2179a2a38264839308f7079fa96e6bd5a42d740460e003c56" -dependencies = [ - "crossbeam", - "rayon", -] - [[package]] name = "kakfa-rust-zeromq-kafka-bridge" version = "1.0.0" @@ -796,9 +755,8 @@ dependencies = [ "prost-build", "rdkafka", "schema_registry_converter", - "serde_json", "tokio", - "zmq", + "zeromq", ] [[package]] @@ -809,9 +767,9 @@ checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe" [[package]] name = "libc" -version = "0.2.161" +version = "0.2.164" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8e9489c2807c139ffd9c1794f4af0ebe86a828db53ecdc7fea2111d0fed085d1" +checksum = "433bfe06b8c75da9b2e3fbea6e5329ff87748f0b144ef75306e674c3f6f7c13f" [[package]] name = "libz-sys" @@ -874,7 +832,7 @@ dependencies = [ "proc-macro2", "quote", "regex-syntax", - "syn 2.0.87", + "syn 2.0.89", ] [[package]] @@ -942,6 +900,15 @@ dependencies = [ "tempfile", ] +[[package]] +name = "num-traits" +version = "0.2.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "071dfc062690e90b734c0b2273ce72ad0ffa95f0c74596bc250dcfd960262841" +dependencies = [ + "autocfg", +] + [[package]] name = "num_enum" version = "0.5.11" @@ -984,7 +951,7 @@ version = "0.10.68" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6174bc48f102d208783c2c84bf931bb75927a617866870de8a4ea85597f871f5" dependencies = [ - "bitflags 2.6.0", + "bitflags", "cfg-if", "foreign-types", "libc", @@ -1001,7 +968,7 @@ checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" dependencies = [ "proc-macro2", "quote", - "syn 2.0.87", + "syn 2.0.89", ] [[package]] @@ -1079,6 +1046,15 @@ version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "953ec861398dccce10c670dfeaf3ec4911ca479e9c02154b3a215178c5f566f2" +[[package]] +name = "ppv-lite86" +version = "0.2.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77957b295656769bb8ad2b6a6b09d897d94f05c41b069aede1fcdaa675eaea04" +dependencies = [ + "zerocopy", +] + [[package]] name = "prettyplease" version = "0.2.25" @@ -1086,7 +1062,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "64d1ec885c64d0457d564db4ec299b2dae3f9c02808b8ad9c3a089c591b18033" dependencies = [ "proc-macro2", - "syn 2.0.87", + "syn 2.0.89", ] [[package]] @@ -1096,14 +1072,14 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7f4c021e1093a56626774e81216a4ce732a735e5bad4868a03f3ed65ca0c3919" dependencies = [ "once_cell", - "toml_edit 0.19.15", + "toml_edit", ] [[package]] name = "proc-macro2" -version = "1.0.89" +version = "1.0.92" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f139b0662de085916d1fb67d2b4169d1addddda1919e696f3252b740b629986e" +checksum = "37d3544b3f2748c54e147655edb5025752e2303145b5aefb3c3ea2c78b973bb0" dependencies = [ "unicode-ident", ] @@ -1135,7 +1111,7 @@ dependencies = [ "prost", "prost-types", "regex", - "syn 2.0.87", + "syn 2.0.89", "tempfile", ] @@ -1149,7 +1125,7 @@ dependencies = [ "itertools", "proc-macro2", "quote", - "syn 2.0.87", + "syn 2.0.89", ] [[package]] @@ -1171,23 +1147,33 @@ dependencies = [ ] [[package]] -name = "rayon" -version = "1.10.0" +name = "rand" +version = "0.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b418a60154510ca1a002a752ca9714984e21e4241e804d32555251faf8b78ffa" +checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" dependencies = [ - "either", - "rayon-core", + "libc", + "rand_chacha", + "rand_core", ] [[package]] -name = "rayon-core" -version = "1.12.1" +name = "rand_chacha" +version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1465873a3dfdaa8ae7cb14b4383657caab0b3e8a0aa9ae8e04b044854c8dfce2" +checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" dependencies = [ - "crossbeam-deque", - "crossbeam-utils", + "ppv-lite86", + "rand_core", +] + +[[package]] +name = "rand_core" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" +dependencies = [ + "getrandom 0.2.15", ] [[package]] @@ -1233,7 +1219,7 @@ version = "0.5.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9b6dfecf2c74bce2466cabf93f6664d6998a69eb21e39f4207930065b27b771f" dependencies = [ - "bitflags 2.6.0", + "bitflags", ] [[package]] @@ -1242,7 +1228,7 @@ version = "0.3.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "de0737333e7a9502c789a36d7c7fa6092a49895d4faa31ca5df163857ded2e9d" dependencies = [ - "getrandom", + "getrandom 0.1.16", "redox_syscall 0.1.57", "rust-argon2", ] @@ -1261,9 +1247,9 @@ dependencies = [ [[package]] name = "regex-automata" -version = "0.4.8" +version = "0.4.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "368758f23274712b504848e9d5a6f010445cc8b87a7cdb4d7cbee666c1288da3" +checksum = "809e8dc61f6de73b46c85f4c96486310fe304c434cfa43669d7b40f711150908" dependencies = [ "aho-corasick", "memchr", @@ -1335,11 +1321,11 @@ checksum = "719b953e2095829ee67db738b3bfa9fa368c94900df327b3f07fe6e794d2fe1f" [[package]] name = "rustix" -version = "0.38.39" +version = "0.38.41" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "375116bee2be9ed569afe2154ea6a99dfdffd257f533f187498c2a8f5feaf4ee" +checksum = "d7f649912bc1495e167a6edee79151c84b1bad49748cb4f1f1167f459f6224f6" dependencies = [ - "bitflags 2.6.0", + "bitflags", "errno", "libc", "linux-raw-sys", @@ -1367,20 +1353,11 @@ version = "1.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f3cb5ba0dc43242ce17de99c180e96db90b235b8a9fdc9543c96d2209116bd9f" -[[package]] -name = "same-file" -version = "1.0.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "93fc1dc3aaa9bfed95e02e6eadabb4baf7e3078b0bd1b4d7b6b0b68378900502" -dependencies = [ - "winapi-util", -] - [[package]] name = "schannel" -version = "0.1.26" +version = "0.1.27" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "01227be5826fa0690321a2ba6c5cd57a19cf3f6a09e76973b58e61de6ab9d1c1" +checksum = "1f29ebaa345f945cec9fbbc532eb307f0fdad8161f281b6369539c8d84876b3d" dependencies = [ "windows-sys 0.59.0", ] @@ -1392,7 +1369,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bcc3cf40651cf503827a34bcd7efbbd4750a7e3adc6768bb8089977e4d07303b" dependencies = [ "byteorder", - "dashmap", + "dashmap 6.1.0", "futures", "integer-encoding", "logos", @@ -1414,7 +1391,7 @@ version = "2.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "897b2245f0b511c87893af39b033e5ca9cce68824c4d7e7630b5a1d339658d02" dependencies = [ - "bitflags 2.6.0", + "bitflags", "core-foundation", "core-foundation-sys", "libc", @@ -1423,9 +1400,9 @@ dependencies = [ [[package]] name = "security-framework-sys" -version = "2.12.0" +version = "2.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ea4a292869320c0272d7bc55a5a6aafaff59b4f63404a003887b679a2e05b4b6" +checksum = "fa39c7303dc58b5543c94d22c1766b0d31f2ee58306363ea622b10bbc075eaa2" dependencies = [ "core-foundation-sys", "libc", @@ -1433,22 +1410,22 @@ dependencies = [ [[package]] name = "serde" -version = "1.0.214" +version = "1.0.215" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f55c3193aca71c12ad7890f1785d2b73e1b9f63a0bbc353c08ef26fe03fc56b5" +checksum = "6513c1ad0b11a9376da888e3e0baa0077f1aed55c17f50e7b2397136129fb88f" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.214" +version = "1.0.215" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "de523f781f095e28fa605cdce0f8307e451cc0fd14e2eb4cd2e98a355b147766" +checksum = "ad1e866f866923f252f05c889987993144fb74e722403468a4ebd70c3cd756c0" dependencies = [ "proc-macro2", "quote", - "syn 2.0.87", + "syn 2.0.89", ] [[package]] @@ -1463,15 +1440,6 @@ dependencies = [ "serde", ] -[[package]] -name = "serde_spanned" -version = "0.6.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "87607cb1398ed59d48732e575a4c28a7a8ebf2454b964fe3f224f2afc07909e1" -dependencies = [ - "serde", -] - [[package]] name = "serde_urlencoded" version = "0.7.1" @@ -1543,9 +1511,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.87" +version = "2.0.89" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "25aa4ce346d03a6dcd68dd8b4010bcb74e54e62c90c573f394c46eae99aba32d" +checksum = "44d46482f1c1c87acd84dea20c1bf5ebff4c757009ed6bf19cfd36fb10e92c4e" dependencies = [ "proc-macro2", "quote", @@ -1554,9 +1522,9 @@ dependencies = [ [[package]] name = "sync_wrapper" -version = "1.0.1" +version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a7065abeca94b6a8a577f9bd45aa0867a2238b74e8eb67cf10d492bc39351394" +checksum = "0bf256ce5efdfa370213c1dabab5935a12e49f2c58d15e9eac2870d3b4f27263" dependencies = [ "futures-core", ] @@ -1569,33 +1537,14 @@ checksum = "c8af7666ab7b6390ab78131fb5b0fce11d6b7a6951602017c35fa82800708971" dependencies = [ "proc-macro2", "quote", - "syn 2.0.87", + "syn 2.0.89", ] -[[package]] -name = "system-deps" -version = "6.2.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a3e535eb8dded36d55ec13eddacd30dec501792ff23a0b1682c38601b8cf2349" -dependencies = [ - "cfg-expr", - "heck", - "pkg-config", - "toml", - "version-compare", -] - -[[package]] -name = "target-lexicon" -version = "0.12.16" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "61c41af27dd6d1e27b1b16b489db798443478cef1f06a660c96db617ba5de3b1" - [[package]] name = "tempfile" -version = "3.13.0" +version = "3.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f0f2c9fc62d0beef6951ccffd757e241266a2c833136efbe35af6cd2567dca5b" +checksum = "28cce251fcbc87fac86a866eeb0d6c2d536fc16d06f184bb61aeae11aa4cee0c" dependencies = [ "cfg-if", "fastrand", @@ -1615,6 +1564,26 @@ dependencies = [ "winapi", ] +[[package]] +name = "thiserror" +version = "1.0.69" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6aaf5339b578ea85b50e080feb250a3e8ae8cfcdff9a461c9ec2904bc923f52" +dependencies = [ + "thiserror-impl", +] + +[[package]] +name = "thiserror-impl" +version = "1.0.69" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4fee6c4efc90059e10f81e6d42c60a18f76588c3d74cb83a0b242a2b6c7504c1" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.89", +] + [[package]] name = "tinystr" version = "0.7.6" @@ -1651,7 +1620,7 @@ checksum = "693d596312e88961bc67d7f1f97af8a70227d9f90c31bba5806eec004978d752" dependencies = [ "proc-macro2", "quote", - "syn 2.0.87", + "syn 2.0.89", ] [[package]] @@ -1665,15 +1634,17 @@ dependencies = [ ] [[package]] -name = "toml" -version = "0.8.19" +name = "tokio-util" +version = "0.7.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a1ed1f98e3fdc28d6d910e6737ae6ab1a93bf1985935a1193e68f93eeb68d24e" +checksum = "61e7c3654c13bcd040d4a03abee2c75b1d14a37b423cf5a813ceae1cc903ec6a" dependencies = [ - "serde", - "serde_spanned", - "toml_datetime", - "toml_edit 0.22.22", + "bytes", + "futures-core", + "futures-io", + "futures-sink", + "pin-project-lite", + "tokio", ] [[package]] @@ -1681,9 +1652,6 @@ name = "toml_datetime" version = "0.6.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0dd7358ecb8fc2f8d014bf86f6f638ce72ba252a2c3a2572f2a795f1d23efb41" -dependencies = [ - "serde", -] [[package]] name = "toml_edit" @@ -1693,20 +1661,7 @@ checksum = "1b5bb770da30e5cbfde35a2d7b9b8a2c4b8ef89548a7a6aeab5c9a576e3e7421" dependencies = [ "indexmap", "toml_datetime", - "winnow 0.5.40", -] - -[[package]] -name = "toml_edit" -version = "0.22.22" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4ae48d6208a266e853d946088ed816055e556cc6028c5e8e2b84d9fa5dd7c7f5" -dependencies = [ - "indexmap", - "serde", - "serde_spanned", - "toml_datetime", - "winnow 0.6.20", + "winnow", ] [[package]] @@ -1742,9 +1697,9 @@ checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" [[package]] name = "unicode-ident" -version = "1.0.13" +version = "1.0.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e91b56cd4cadaeb79bbf1a5645f6b4f8dc5bde8834ad5894a8db35fda9efa1fe" +checksum = "adb9e6ca4f869e1180728b7950e35922a7fc6397f7b641499e8f3ef06e50dc83" [[package]] name = "url" @@ -1770,26 +1725,19 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b6c140620e7ffbb22c2dee59cafe6084a59b5ffc27a8859a5f0d494b5d52b6be" [[package]] -name = "vcpkg" -version = "0.2.15" +name = "uuid" +version = "1.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" - -[[package]] -name = "version-compare" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "852e951cb7832cb45cb1169900d19760cfa39b82bc0ea9c0e5a14ae88411c98b" +checksum = "f8c5f0a0af699448548ad1a2fbf920fb4bee257eae39953ba95cb84891a0446a" +dependencies = [ + "getrandom 0.2.15", +] [[package]] -name = "walkdir" -version = "2.5.0" +name = "vcpkg" +version = "0.2.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "29790946404f91d9c5d06f9874efddea1dc06c5efe94541a7d6863108e3a5e4b" -dependencies = [ - "same-file", - "winapi-util", -] +checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" [[package]] name = "want" @@ -1834,7 +1782,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 2.0.87", + "syn 2.0.89", "wasm-bindgen-shared", ] @@ -1868,7 +1816,7 @@ checksum = "26c6ab57572f7a24a4985830b120de1594465e5d500f24afe89e16b4e833ef68" dependencies = [ "proc-macro2", "quote", - "syn 2.0.87", + "syn 2.0.89", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -1905,15 +1853,6 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" -[[package]] -name = "winapi-util" -version = "0.1.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb" -dependencies = [ - "windows-sys 0.59.0", -] - [[package]] name = "winapi-x86_64-pc-windows-gnu" version = "0.4.0" @@ -2041,15 +1980,6 @@ dependencies = [ "memchr", ] -[[package]] -name = "winnow" -version = "0.6.20" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "36c1fec1a2bb5866f07c25f68c26e565c4c200aebb96d7e55710c19d3e8ac49b" -dependencies = [ - "memchr", -] - [[package]] name = "write16" version = "1.0.0" @@ -2082,10 +2012,31 @@ checksum = "28cc31741b18cb6f1d5ff12f5b7523e3d6eb0852bbbad19d73905511d9849b95" dependencies = [ "proc-macro2", "quote", - "syn 2.0.87", + "syn 2.0.89", "synstructure", ] +[[package]] +name = "zerocopy" +version = "0.7.35" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b9b4fd18abc82b8136838da5d50bae7bdea537c574d8dc1a34ed098d6c166f0" +dependencies = [ + "byteorder", + "zerocopy-derive", +] + +[[package]] +name = "zerocopy-derive" +version = "0.7.35" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.89", +] + [[package]] name = "zerofrom" version = "0.1.4" @@ -2103,18 +2054,35 @@ checksum = "0ea7b4a3637ea8669cedf0f1fd5c286a17f3de97b8dd5a70a6c167a1730e63a5" dependencies = [ "proc-macro2", "quote", - "syn 2.0.87", + "syn 2.0.89", "synstructure", ] [[package]] -name = "zeromq-src" -version = "0.2.6+4.3.4" +name = "zeromq" +version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fc120b771270365d5ed0dfb4baf1005f2243ae1ae83703265cb3504070f4160b" +checksum = "6a4528179201f6eecf211961a7d3276faa61554c82651ecc66387f68fc3004bd" dependencies = [ - "cc", - "dircpy", + "async-trait", + "asynchronous-codec", + "bytes", + "crossbeam-queue", + "dashmap 5.5.3", + "futures-channel", + "futures-io", + "futures-task", + "futures-util", + "log", + "num-traits", + "once_cell", + "parking_lot", + "rand", + "regex", + "thiserror", + "tokio", + "tokio-util", + "uuid", ] [[package]] @@ -2136,27 +2104,5 @@ checksum = "6eafa6dfb17584ea3e2bd6e76e0cc15ad7af12b09abdd1ca55961bed9b1063c6" dependencies = [ "proc-macro2", "quote", - "syn 2.0.87", -] - -[[package]] -name = "zmq" -version = "0.10.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dd3091dd571fb84a9b3e5e5c6a807d186c411c812c8618786c3c30e5349234e7" -dependencies = [ - "bitflags 1.3.2", - "libc", - "zmq-sys", -] - -[[package]] -name = "zmq-sys" -version = "0.12.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8e8351dc72494b4d7f5652a681c33634063bbad58046c1689e75270908fdc864" -dependencies = [ - "libc", - "system-deps", - "zeromq-src", + "syn 2.0.89", ] diff --git a/hm-kafka/kafka-client/kafka-rust/zeromq-kafka-bridge/Cargo.toml b/hm-kafka/kafka-client/kafka-rust/zeromq-kafka-bridge/Cargo.toml index b908400255..92c6a7312d 100644 --- a/hm-kafka/kafka-client/kafka-rust/zeromq-kafka-bridge/Cargo.toml +++ b/hm-kafka/kafka-client/kafka-rust/zeromq-kafka-bridge/Cargo.toml @@ -9,9 +9,8 @@ futures = "0.3.31" prost = "0.13.3" rdkafka = { version = "0.36.2", features = ["cmake-build", "dynamic-linking"] } schema_registry_converter = { version = "4.2.0", features = ["easy", "proto_raw"] } -serde_json = "1.0.132" tokio = { version = "1.41.1", features = ["full"] } -zmq = "0.10.0" +zeromq = "0.4.1" [dev-dependencies] clippy = "0.0.302" diff --git a/hm-kafka/kafka-client/kafka-rust/zeromq-kafka-bridge/src/main.rs b/hm-kafka/kafka-client/kafka-rust/zeromq-kafka-bridge/src/main.rs index 96bab1d088..a72a53a11f 100644 --- a/hm-kafka/kafka-client/kafka-rust/zeromq-kafka-bridge/src/main.rs +++ b/hm-kafka/kafka-client/kafka-rust/zeromq-kafka-bridge/src/main.rs @@ -10,7 +10,8 @@ use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, Mutex}; use std::time::{Duration, Instant}; use tokio::sync::mpsc; - +use zeromq::{Socket, SocketRecv}; +use zeromq::SubSocket; pub mod production { pub mod iot { include!(concat!(env!("OUT_DIR"), "/production.iot.rs")); @@ -80,12 +81,11 @@ async fn main() -> Result<(), Box> { const LOW_CAPACITY_THRESHOLD: usize = CHANNEL_BUFFER_SIZE / 10; const INACTIVITY_TIMEOUT: Duration = Duration::from_secs(5); - // Set up ZMQ subscriber - let context = zmq::Context::new(); - let socket = context.socket(zmq::SUB)?; - socket.connect("tcp://10.0.0.100:5555")?; - socket.set_subscribe(b"")?; - println!("Connected to ZMQ publisher"); + // Set up ZeroMQ subscriber + let mut socket = SubSocket::new(); + socket.connect("tcp://10.0.0.100:5555").await?; + socket.subscribe("").await?; + println!("Connected to ZeroMQ publisher"); // Set up Kafka producer and Schema Registry encoder let bootstrap_server = "localhost:9092"; @@ -148,10 +148,12 @@ async fn main() -> Result<(), Box> { } }); - // Main ZMQ receiving loop + // Main ZeroMQ receiving loop loop { - let msg = socket.recv_bytes(0)?; - match Signals::decode(&msg[..]) { + let msg = socket.recv().await?; + let default_bytes = prost::bytes::Bytes::new(); + let bytes = msg.get(0).unwrap_or(&default_bytes); + match Signals::decode(&bytes[..]) { Ok(signals) => { context.messages_received.fetch_add(1, Ordering::Relaxed); if let Err(e) = tx.send(signals).await {