From 4cc28cde6a815017035ac04786e3e29622c6a329 Mon Sep 17 00:00:00 2001 From: hippalus Date: Sat, 21 Dec 2024 04:02:33 +0100 Subject: [PATCH] Feat: Add Kafka integration for Parseable server #936 . --- Cargo.lock | 517 ++++++++- Cargo.toml | 11 +- scripts/kafka_log_stream_generator.py | 123 ++ src/cli.rs | 1024 ++++++++++++----- src/connectors/common/config.rs | 164 +++ src/connectors/common/mod.rs | 31 + src/connectors/common/processor.rs | 11 + src/connectors/common/shutdown.rs | 118 ++ src/connectors/common/types/mod.rs | 45 + src/connectors/kafka/config.rs | 627 ++++++++++ src/connectors/kafka/consumer.rs | 169 +++ src/connectors/kafka/metrics.rs | 37 + src/connectors/kafka/mod.rs | 231 ++++ .../kafka/partition_stream_queue.rs | 90 ++ src/connectors/kafka/processor.rs | 154 +++ src/connectors/kafka/rebalance_listener.rs | 65 ++ src/connectors/kafka/sink.rs | 68 ++ src/connectors/kafka/state.rs | 50 + src/connectors/mod.rs | 85 ++ src/lib.rs | 1 + src/main.rs | 34 +- src/metadata.rs | 14 + 22 files changed, 3313 insertions(+), 356 deletions(-) create mode 100644 scripts/kafka_log_stream_generator.py create mode 100644 src/connectors/common/config.rs create mode 100644 src/connectors/common/mod.rs create mode 100644 src/connectors/common/processor.rs create mode 100644 src/connectors/common/shutdown.rs create mode 100644 src/connectors/common/types/mod.rs create mode 100644 src/connectors/kafka/config.rs create mode 100644 src/connectors/kafka/consumer.rs create mode 100644 src/connectors/kafka/metrics.rs create mode 100644 src/connectors/kafka/mod.rs create mode 100644 src/connectors/kafka/partition_stream_queue.rs create mode 100644 src/connectors/kafka/processor.rs create mode 100644 src/connectors/kafka/rebalance_listener.rs create mode 100644 src/connectors/kafka/sink.rs create mode 100644 src/connectors/kafka/state.rs create mode 100644 src/connectors/mod.rs diff --git a/Cargo.lock b/Cargo.lock index 1b98329a3..15ca371a8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -892,6 +892,56 @@ dependencies = [ "generic-array", ] +[[package]] +name = "bollard" +version = "0.17.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d41711ad46fda47cd701f6908e59d1bd6b9a2b7464c0d0aeab95c6d37096ff8a" +dependencies = [ + "base64 0.22.0", + "bollard-stubs", + "bytes", + "futures-core", + "futures-util", + "hex", + "home", + "http 1.1.0", + "http-body-util", + "hyper 1.4.1", + "hyper-named-pipe", + "hyper-rustls 0.27.3", + "hyper-util", + "hyperlocal", + "log", + "pin-project-lite", + "rustls 0.23.13", + "rustls-native-certs 0.7.3", + "rustls-pemfile 2.1.2", + "rustls-pki-types", + "serde", + "serde_derive", + "serde_json", + "serde_repr", + "serde_urlencoded", + "thiserror", + "tokio", + "tokio-util", + "tower-service", + "url", + "winapi", +] + +[[package]] +name = "bollard-stubs" +version = "1.45.0-rc.26.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d7c5415e3a6bc6d3e99eff6268e488fd4ee25e7b28c10f08fa6760bd9de16e4" +dependencies = [ + "serde", + "serde_repr", + "serde_with", +] + [[package]] name = "brotli" version = "6.0.0" @@ -1124,6 +1174,15 @@ dependencies = [ "chrono", ] +[[package]] +name = "cmake" +version = "0.1.52" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c682c223677e0e5b6b7f63a64b9351844c3f1b1678a68b7ee617e30fb082620e" +dependencies = [ + "cc", +] + [[package]] name = "colorchoice" version = "1.0.0" @@ -1766,6 +1825,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" 
checksum = "b42b6fa04a440b495c8b04d0e71b707c585f83cb9cb28cf8cd0d976c315e31b4" dependencies = [ "powerfmt", + "serde", ] [[package]] @@ -1814,11 +1874,34 @@ dependencies = [ "syn 2.0.79", ] +[[package]] +name = "docker_credential" +version = "1.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "31951f49556e34d90ed28342e1df7e1cb7a229c4cab0aecc627b5d91edd41d07" +dependencies = [ + "base64 0.21.7", + "serde", + "serde_json", +] + +[[package]] +name = "duct" +version = "0.13.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e4ab5718d1224b63252cd0c6f74f6480f9ffeb117438a2e0f5cf6d9a4798929c" +dependencies = [ + "libc", + "once_cell", + "os_pipe", + "shared_child", +] + [[package]] name = "either" -version = "1.11.0" +version = "1.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a47c1c47d2f5964e29c61246e81db715514cd532db6b5116a25ea3c03d6780a2" +checksum = "60b1af1c220855b6ceac025d3f6ecdd2b7c4894bfe9cd9bda4fbb4bc7c0d4cf0" [[package]] name = "encoding_rs" @@ -1845,6 +1928,17 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "etcetera" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "136d1b5283a1ab77bd9257427ffd09d8667ced0570b6f938942bc7568ed5b943" +dependencies = [ + "cfg-if", + "home", + "windows-sys 0.48.0", +] + [[package]] name = "fastrand" version = "1.9.0" @@ -1860,6 +1954,18 @@ version = "2.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "658bd65b1cf4c852a3cc96f18a8ce7b5640f6b703f905c7d74532294c2a63984" +[[package]] +name = "filetime" +version = "0.2.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "35c0522e981e68cbfa8c3f978441a5f34b30b96e146b33cd3359176b50fe8586" +dependencies = [ + "cfg-if", + "libc", + "libredox", + "windows-sys 0.59.0", +] + [[package]] name = "fixedbitset" version = "0.4.2" @@ -1924,9 +2030,9 @@ dependencies = [ [[package]] name = "futures-channel" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eac8f7d7865dcb88bd4373ab671c8cf4508703796caa2b1985a9ca867b3fcb78" +checksum = "2dff15bf788c671c1934e366d07e30c1814a8ef514e1af724a602e8a2fbe1b10" dependencies = [ "futures-core", "futures-sink", @@ -1934,9 +2040,9 @@ dependencies = [ [[package]] name = "futures-core" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d" +checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" [[package]] name = "futures-executor" @@ -1951,9 +2057,9 @@ dependencies = [ [[package]] name = "futures-io" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1" +checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6" [[package]] name = "futures-lite" @@ -1972,9 +2078,9 @@ dependencies = [ [[package]] name = "futures-macro" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" +checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" dependencies = [ "proc-macro2", "quote", @@ -1983,15 +2089,15 @@ dependencies = [ [[package]] name = "futures-sink" -version = "0.3.30" 
+version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9fb8e00e87438d937621c1c6269e53f536c14d3fbd6a042bb24879e57d474fb5" +checksum = "e575fab7d1e0dcb8d0c7bcf9a63ee213816ab51902e6d244a95819acacf1d4f7" [[package]] name = "futures-task" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "38d84fa142264698cdce1a9f9172cf383a0c82de1bddcf3092901442c4097004" +checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988" [[package]] name = "futures-timer" @@ -2001,9 +2107,9 @@ checksum = "f288b0a4f20f9a56b5d1da57e2227c661b7b16168e2f72365f57b63326e29b24" [[package]] name = "futures-util" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48" +checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" dependencies = [ "futures-channel", "futures-core", @@ -2160,6 +2266,15 @@ version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" +[[package]] +name = "home" +version = "0.5.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3d1354bf6b7235cb4a0576c2619fd4ed18183f689b12b006a0ee7329eeff9a5" +dependencies = [ + "windows-sys 0.52.0", +] + [[package]] name = "hostname" version = "0.4.0" @@ -2315,6 +2430,21 @@ dependencies = [ "want", ] +[[package]] +name = "hyper-named-pipe" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "73b7d8abf35697b81a825e386fc151e0d503e8cb5fcb93cc8669c376dfd6f278" +dependencies = [ + "hex", + "hyper 1.4.1", + "hyper-util", + "pin-project-lite", + "tokio", + "tower-service", + "winapi", +] + [[package]] name = "hyper-rustls" version = "0.24.2" @@ -2340,7 +2470,7 @@ dependencies = [ "hyper 1.4.1", "hyper-util", "rustls 0.23.13", - "rustls-native-certs", + "rustls-native-certs 0.8.0", "rustls-pki-types", "tokio", "tokio-rustls 0.26.0", @@ -2381,6 +2511,21 @@ dependencies = [ "tracing", ] +[[package]] +name = "hyperlocal" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "986c5ce3b994526b3cd75578e62554abd09f0899d6206de48b3e96ab34ccc8c7" +dependencies = [ + "hex", + "http-body-util", + "hyper 1.4.1", + "hyper-util", + "pin-project-lite", + "tokio", + "tower-service", +] + [[package]] name = "iana-time-zone" version = "0.1.60" @@ -2434,6 +2579,7 @@ checksum = "bd070e393353796e801d209ad339e89596eb4c8d430d18ede6a1cced8fafbd99" dependencies = [ "autocfg", "hashbrown 0.12.3", + "serde", ] [[package]] @@ -2444,6 +2590,7 @@ checksum = "68b900aa2f7301e21c36462b170ee99994de34dff39a4a6a528e80e7376d07e5" dependencies = [ "equivalent", "hashbrown 0.14.5", + "serde", ] [[package]] @@ -2611,6 +2758,29 @@ version = "0.2.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4ec2a862134d2a7d32d7983ddcdd1c4923530833c9f2ea1a44fc5fa473989058" +[[package]] +name = "libredox" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c0ff37bd590ca25063e35af745c343cb7a0271906fb7b37e4813e8f79f00268d" +dependencies = [ + "bitflags 2.5.0", + "libc", + "redox_syscall 0.5.7", +] + +[[package]] +name = "libz-sys" +version = "1.1.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"d2d16453e800a8cf6dd2fc3eb4bc99b786a9b90c663b8559a5b1a041bf89e472" +dependencies = [ + "cc", + "libc", + "pkg-config", + "vcpkg", +] + [[package]] name = "linux-raw-sys" version = "0.1.4" @@ -2920,6 +3090,27 @@ dependencies = [ "libc", ] +[[package]] +name = "num_enum" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4e613fc340b2220f734a8595782c551f1250e969d87d3be1ae0579e8d4065179" +dependencies = [ + "num_enum_derive", +] + +[[package]] +name = "num_enum_derive" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "af1844ef2428cc3e1cb900be36181049ef3d3193c63e43026cfe202983b27a56" +dependencies = [ + "proc-macro-crate", + "proc-macro2", + "quote", + "syn 2.0.79", +] + [[package]] name = "num_threads" version = "0.1.7" @@ -2999,6 +3190,18 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" +[[package]] +name = "openssl-sys" +version = "0.9.104" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "45abf306cbf99debc8195b66b7346498d7b10c210de50418b5ccd7ceba08c741" +dependencies = [ + "cc", + "libc", + "pkg-config", + "vcpkg", +] + [[package]] name = "ordered-float" version = "2.10.1" @@ -3008,6 +3211,16 @@ dependencies = [ "num-traits", ] +[[package]] +name = "os_pipe" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5ffd2b0a5634335b135d5728d84c5e0fd726954b87111f7506a61c502280d982" +dependencies = [ + "libc", + "windows-sys 0.59.0", +] + [[package]] name = "overload" version = "0.1.1" @@ -3038,7 +3251,7 @@ checksum = "4c42a9226546d68acdd9c0a280d17ce19bfe27a46bf68784e4066115788d008e" dependencies = [ "cfg-if", "libc", - "redox_syscall", + "redox_syscall 0.4.1", "smallvec", "windows-targets 0.48.5", ] @@ -3079,6 +3292,31 @@ dependencies = [ "zstd-sys", ] +[[package]] +name = "parse-display" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "914a1c2265c98e2446911282c6ac86d8524f495792c38c5bd884f80499c7538a" +dependencies = [ + "parse-display-derive", + "regex", + "regex-syntax 0.8.5", +] + +[[package]] +name = "parse-display-derive" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2ae7800a4c974efd12df917266338e79a7a74415173caf7e70aa0a0707345281" +dependencies = [ + "proc-macro2", + "quote", + "regex", + "regex-syntax 0.8.5", + "structmeta", + "syn 2.0.79", +] + [[package]] name = "parse-zoneinfo" version = "0.3.0" @@ -3147,6 +3385,7 @@ dependencies = [ "prost", "prost-build", "rand", + "rdkafka", "regex", "relative-path", "reqwest 0.11.27", @@ -3161,10 +3400,13 @@ dependencies = [ "sha2", "static-files", "sysinfo", + "testcontainers", + "testcontainers-modules", "thiserror", "thread-priority", "tokio", "tokio-stream", + "tokio-util", "tonic", "tonic-web", "tower-http 0.6.1", @@ -3617,6 +3859,49 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "rdkafka" +version = "0.37.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "14b52c81ac3cac39c9639b95c20452076e74b8d9a71bc6fc4d83407af2ea6fff" +dependencies = [ + "futures-channel", + "futures-util", + "libc", + "log", + "rdkafka-sys", + "serde", + "serde_derive", + "serde_json", + "slab", + "tokio", + "tracing", +] + +[[package]] +name = "rdkafka-sys" +version = "4.8.0+2.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" 
+checksum = "ced38182dc436b3d9df0c77976f37a67134df26b050df1f0006688e46fc4c8be" +dependencies = [ + "cmake", + "libc", + "libz-sys", + "num_enum", + "openssl-sys", + "pkg-config", + "sasl2-sys", +] + +[[package]] +name = "redox_syscall" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "567664f262709473930a4bf9e51bf2ebf3348f2e748ccc50dea20646858f8f29" +dependencies = [ + "bitflags 1.3.2", +] + [[package]] name = "redox_syscall" version = "0.4.1" @@ -3626,6 +3911,15 @@ dependencies = [ "bitflags 1.3.2", ] +[[package]] +name = "redox_syscall" +version = "0.5.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b6dfecf2c74bce2466cabf93f6664d6998a69eb21e39f4207930065b27b771f" +dependencies = [ + "bitflags 2.5.0", +] + [[package]] name = "regex" version = "1.11.0" @@ -3752,7 +4046,7 @@ dependencies = [ "pin-project-lite", "quinn", "rustls 0.23.13", - "rustls-native-certs", + "rustls-native-certs 0.8.0", "rustls-pemfile 2.1.2", "rustls-pki-types", "serde", @@ -3906,6 +4200,19 @@ dependencies = [ "zeroize", ] +[[package]] +name = "rustls-native-certs" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5bfb394eeed242e909609f56089eecfe5fda225042e8b171791b9c95f5931e5" +dependencies = [ + "openssl-probe", + "rustls-pemfile 2.1.2", + "rustls-pki-types", + "schannel", + "security-framework", +] + [[package]] name = "rustls-native-certs" version = "0.8.0" @@ -3986,6 +4293,18 @@ dependencies = [ "winapi-util", ] +[[package]] +name = "sasl2-sys" +version = "0.1.22+2.1.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05f2a7f7efd9fc98b3a9033272df10709f5ee3fa0eabbd61a527a3a1ed6bd3c6" +dependencies = [ + "cc", + "duct", + "libc", + "pkg-config", +] + [[package]] name = "schannel" version = "0.1.23" @@ -4113,6 +4432,36 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_with" +version = "3.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ad483d2ab0149d5a5ebcd9972a3852711e0153d863bf5a5d0391d28883c4a20" +dependencies = [ + "base64 0.22.0", + "chrono", + "hex", + "indexmap 1.9.3", + "indexmap 2.5.0", + "serde", + "serde_derive", + "serde_json", + "serde_with_macros", + "time", +] + +[[package]] +name = "serde_with_macros" +version = "3.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "65569b702f41443e8bc8bbb1c5779bd0450bbe723b56198980e80ec45780bce2" +dependencies = [ + "darling", + "proc-macro2", + "quote", + "syn 2.0.79", +] + [[package]] name = "sha1" version = "0.10.6" @@ -4150,6 +4499,16 @@ dependencies = [ "lazy_static", ] +[[package]] +name = "shared_child" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09fa9338aed9a1df411814a5b2252f7cd206c55ae9bf2fa763f8de84603aa60c" +dependencies = [ + "libc", + "windows-sys 0.59.0", +] + [[package]] name = "shlex" version = "1.3.0" @@ -4300,6 +4659,29 @@ version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623" +[[package]] +name = "structmeta" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2e1575d8d40908d70f6fd05537266b90ae71b15dbbe7a8b7dffa2b759306d329" +dependencies = [ + "proc-macro2", + "quote", + "structmeta-derive", + "syn 2.0.79", +] + +[[package]] +name = "structmeta-derive" +version = "0.3.0" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "152a0b65a590ff6c3da95cabe2353ee04e6167c896b28e3b14478c2636c922fc" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.79", +] + [[package]] name = "strum" version = "0.26.2" @@ -4411,6 +4793,44 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "testcontainers" +version = "0.23.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5f40cc2bd72e17f328faf8ca7687fe337e61bccd8acf9674fa78dd3792b045e1" +dependencies = [ + "async-trait", + "bollard", + "bollard-stubs", + "bytes", + "docker_credential", + "either", + "etcetera", + "futures", + "log", + "memchr", + "parse-display", + "pin-project-lite", + "serde", + "serde_json", + "serde_with", + "thiserror", + "tokio", + "tokio-stream", + "tokio-tar", + "tokio-util", + "url", +] + +[[package]] +name = "testcontainers-modules" +version = "0.11.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "064a2677e164cad39ef3c1abddb044d5a25c49d27005804563d8c4227aac8bd0" +dependencies = [ + "testcontainers", +] + [[package]] name = "thiserror" version = "1.0.64" @@ -4525,28 +4945,27 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.37.0" +version = "1.42.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1adbebffeca75fcfd058afa480fb6c0b81e165a0323f9c9d39c9697e37c46787" +checksum = "5cec9b21b0450273377fc97bd4c33a8acffc8c996c987a7c5b319a0083707551" dependencies = [ "backtrace", "bytes", "libc", - "mio 0.8.11", - "num_cpus", + "mio 1.0.2", "parking_lot", "pin-project-lite", "signal-hook-registry", "socket2", "tokio-macros", - "windows-sys 0.48.0", + "windows-sys 0.52.0", ] [[package]] name = "tokio-macros" -version = "2.2.0" +version = "2.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b" +checksum = "693d596312e88961bc67d7f1f97af8a70227d9f90c31bba5806eec004978d752" dependencies = [ "proc-macro2", "quote", @@ -4596,6 +5015,21 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-tar" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d5714c010ca3e5c27114c1cdeb9d14641ace49874aa5626d7149e47aedace75" +dependencies = [ + "filetime", + "futures-core", + "libc", + "redox_syscall 0.3.5", + "tokio", + "tokio-stream", + "xattr", +] + [[package]] name = "tokio-util" version = "0.7.10" @@ -4817,6 +5251,7 @@ dependencies = [ "sharded-slab", "smallvec", "thread_local", + "time", "tracing", "tracing-core", "tracing-log", @@ -4952,9 +5387,9 @@ checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a" [[package]] name = "uuid" -version = "1.8.0" +version = "1.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a183cf7feeba97b4dd1c0d46788634f6221d87fa961b305bed08c851829efcc0" +checksum = "f8c5f0a0af699448548ad1a2fbf920fb4bee257eae39953ba95cb84891a0446a" dependencies = [ "getrandom", ] @@ -4995,6 +5430,12 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d" +[[package]] +name = "vcpkg" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" + [[package]] name = "vergen" version = "8.3.1" @@ -5326,6 +5767,15 @@ dependencies 
= [ "windows-targets 0.52.6", ] +[[package]] +name = "windows-sys" +version = "0.59.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e38bc4d79ed67fd075bcc251a1c39b32a1776bbe92e5bef1f0bf1f8c531853b" +dependencies = [ + "windows-targets 0.52.6", +] + [[package]] name = "windows-targets" version = "0.42.2" @@ -5523,6 +5973,17 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "xattr" +version = "1.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8da84f1a25939b27f6820d92aed108f83ff920fdf11a7b19366c27c4cda81d4f" +dependencies = [ + "libc", + "linux-raw-sys 0.4.13", + "rustix 0.38.34", +] + [[package]] name = "xxhash-rust" version = "0.8.10" diff --git a/Cargo.toml b/Cargo.toml index 455321afe..92d6fa2de 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,6 +30,11 @@ actix-web-prometheus = { version = "0.1" } actix-web-static-files = "4.0" mime = "0.3.17" +### connectors dependencies +rdkafka = { version = "0.37", features = ["cmake-build", "tracing", "sasl", "ssl", "libz-static"] } +testcontainers = "0.23" +testcontainers-modules = { version = "0.11", features = ["kafka"] } + ### other dependencies anyhow = { version = "1.0", features = ["backtrace"] } argon2 = "0.5.0" @@ -81,13 +86,15 @@ static-files = "0.2" sysinfo = "0.31.4" thiserror = "1.0.64" thread-priority = "1.0.0" -tokio = { version = "1.28", default-features = false, features = [ +tokio = { version = "1.42", default-features = false, features = [ "sync", "macros", "fs", + "rt-multi-thread" ] } tokio-stream = { version = "0.1", features = ["fs"] } -tracing-subscriber = { version = "0.3.18", features = ["env-filter"] } +tokio-util = "0.7" +tracing-subscriber = { version = "0.3.18", features = ["env-filter", "time"] } ulid = { version = "1.0", features = ["serde"] } uptime_lib = "0.3.0" xxhash-rust = { version = "0.8", features = ["xxh3"] } diff --git a/scripts/kafka_log_stream_generator.py b/scripts/kafka_log_stream_generator.py new file mode 100644 index 000000000..93eed25b3 --- /dev/null +++ b/scripts/kafka_log_stream_generator.py @@ -0,0 +1,123 @@ +import json +import time +from datetime import datetime, timezone +from random import choice, randint +from uuid import uuid4 +from confluent_kafka import Producer + +# Configuration +config = { + "kafka_broker": "localhost:9092", # Replace with your Kafka broker address + "kafka_topic": "log-stream", # Replace with your Kafka topic name + "log_rate": 500, # Logs per second + "log_template": { + "timestamp": "", # Timestamp will be added dynamically + "correlation_id": "", # Unique identifier for tracing requests + "level": "INFO", # Log level (e.g., INFO, ERROR, DEBUG) + "message": "", # Main log message to be dynamically set + "pod": { + "name": "example-pod", # Kubernetes pod name + "namespace": "default", # Kubernetes namespace + "node": "node-01" # Kubernetes node name + }, + "request": { + "method": "", # HTTP method + "path": "", # HTTP request path + "remote_address": "" # IP address of the client + }, + "response": { + "status_code": 200, # HTTP response status code + "latency_ms": 0 # Latency in milliseconds + }, + "metadata": { + "container_id": "", # Container ID + "image": "example/image:1.0", # Docker image + "environment": "prod" # Environment (e.g., dev, staging, prod) + } + } +} + +producer = Producer({"bootstrap.servers": config["kafka_broker"]}) + + +def delivery_report(err, msg): + if err is not None: + print(f"Delivery failed for message {msg.key()}: {err}") + else: + print(f"Message 
delivered to {msg.topic()} [{msg.partition()}]") + + +def generate_log(): + log = config["log_template"].copy() + log["timestamp"] = datetime.now(timezone.utc).isoformat() + log["correlation_id"] = str(uuid4()) + + levels = ["INFO", "WARNING", "ERROR", "DEBUG"] + messages = [ + "Received incoming HTTP request", + "Processed request successfully", + "Failed to process request", + "Request timeout encountered", + "Service unavailable" + ] + log["level"] = choice(levels) + log["message"] = choice(messages) + + # Populate request fields + methods = ["GET", "POST", "PUT", "DELETE"] + paths = ["/api/resource", "/api/login", "/api/logout", "/api/data"] + log["request"] = { + "method": choice(methods), + "path": choice(paths), + "remote_address": f"192.168.1.{randint(1, 255)}" + } + + # Populate response fields + log["response"] = { + "status_code": choice([200, 201, 400, 401, 403, 404, 500]), + "latency_ms": randint(10, 1000) + } + + # Populate pod and metadata fields + log["pod"] = { + "name": f"pod-{randint(1, 100)}", + "namespace": choice(["default", "kube-system", "production", "staging"]), + "node": f"node-{randint(1, 10)}" + } + + log["metadata"] = { + "container_id": f"container-{randint(1000, 9999)}", + "image": f"example/image:{randint(1, 5)}.0", + "environment": choice(["dev", "staging", "prod"]) + } + + return log + + +def main(): + try: + while True: + # Generate log message + log_message = generate_log() + log_json = json.dumps(log_message) + + # Send to Kafka + producer.produce( + config["kafka_topic"], + value=log_json, + callback=delivery_report + ) + + # Flush the producer to ensure delivery + producer.flush() + + # Wait based on the log rate + time.sleep(1 / config["log_rate"]) + except KeyboardInterrupt: + print("Stopped log generation.") + finally: + producer.flush() + + +if __name__ == "__main__": + main() diff --git a/src/cli.rs b/src/cli.rs index 982a2a765..9cb1ae840 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -18,9 +18,14 @@ use clap::{value_parser, Arg, ArgGroup, Command, FromArgMatches}; use std::path::PathBuf; - +use std::time::Duration; +use tracing::warn; use url::Url; +use crate::connectors::common::config::ConnectorConfig; +use crate::connectors::common::types::ConnectorType; +use crate::connectors::common::BadData; +use crate::connectors::kafka::config::{ConsumerConfig, KafkaConfig, SourceOffset}; use crate::{ oidc::{self, OpenidConfig}, option::{validation, Compression, Mode}, @@ -119,6 +124,8 @@ pub struct Cli { pub trino_auth: Option<String>, pub trino_schema: Option<String>, pub trino_catalog: Option<String>, + //Connectors config + pub connector_config: Option<ConnectorConfig>, } impl Cli { @@ -164,6 +171,46 @@ impl Cli { pub const TRINO_AUTHORIZATION: &'static str = "p-trino-authorization"; pub const TRINO_SCHEMA: &'static str = "p-trino-schema"; + // ConnectorConfig arguments + pub const CONNECTOR_NAME: &'static str = "connector-name"; + pub const CONNECTOR_BUFFER_SIZE: &'static str = "connector-buffer-size"; + pub const CONNECTOR_BUFFER_TIMEOUT: &'static str = "connector-buffer-timeout"; + pub const CONNECTOR_OFFSET_MODE: &'static str = "connector-offset-mode"; // earliest, latest, group + pub const CONNECTOR_BAD_DATA_POLICY: &'static str = "connector-bad-data-policy"; // e.g. 
"drop", "fail" , "dlt" + pub const CONNECTOR_MAX_RETRIES: &'static str = "connector-max-retries"; + pub const CONNECTOR_RETRY_INTERVAL_MS: &'static str = "connector-retry-interval-ms"; + pub const CONNECTOR_METRICS_ENABLED: &'static str = "connector-metrics-enabled"; + pub const CONNECTOR_INSTANCE_ID: &'static str = "connector-instance-id"; + + // ConsumerConfig arguments + pub const CONSUMER_GROUP_INSTANCE_ID: &'static str = "consumer-group-instance-id"; + pub const CONSUMER_PARTITION_ASSIGNMENT_STRATEGY: &'static str = + "consumer-partition-assignment-strategy"; + pub const CONSUMER_SESSION_TIMEOUT_MS: &'static str = "consumer-session-timeout-ms"; + pub const CONSUMER_HEARTBEAT_INTERVAL_MS: &'static str = "consumer-heartbeat-interval-ms"; + pub const CONSUMER_MAX_POLL_INTERVAL_MS: &'static str = "consumer-max-poll-interval-ms"; + pub const CONSUMER_ENABLE_AUTO_COMMIT: &'static str = "consumer-enable-auto-commit"; + pub const CONSUMER_AUTO_COMMIT_INTERVAL_MS: &'static str = "consumer-auto-commit-interval-ms"; + pub const CONSUMER_ENABLE_AUTO_OFFSET_STORE: &'static str = "consumer-enable-auto-offset-store"; + pub const CONSUMER_AUTO_OFFSET_RESET: &'static str = "consumer-auto-offset-reset"; + pub const CONSUMER_FETCH_MIN_BYTES: &'static str = "consumer-fetch-min-bytes"; + pub const CONSUMER_FETCH_MAX_BYTES: &'static str = "consumer-fetch-max-bytes"; + pub const CONSUMER_FETCH_MAX_WAIT_MS: &'static str = "consumer-fetch-max-wait-ms"; + pub const CONSUMER_MAX_PARTITION_FETCH_BYTES: &'static str = + "consumer-max-partition-fetch-bytes"; + pub const CONSUMER_QUEUED_MIN_MESSAGES: &'static str = "consumer-queued-min-messages"; + pub const CONSUMER_QUEUED_MAX_MESSAGES_KBYTES: &'static str = + "consumer-queued-max-messages-kbytes"; + pub const CONSUMER_ENABLE_PARTITION_EOF: &'static str = "consumer-enable-partition-eof"; + pub const CONSUMER_CHECK_CRCS: &'static str = "consumer-check-crcs"; + pub const CONSUMER_ISOLATION_LEVEL: &'static str = "consumer-isolation-level"; + pub const CONSUMER_FETCH_MESSAGE_MAX_BYTES: &'static str = "consumer-fetch-message-max-bytes"; + pub const CONSUMER_STATS_INTERVAL_MS: &'static str = "consumer-stats-interval-ms"; + + pub const KAFKA_TOPICS: &'static str = "kafka-topics"; + pub const KAFKA_BOOTSTRAP_SERVERS: &'static str = "kafka-bootstrap-servers"; + pub const KAFKA_GROUP_ID: &'static str = "kafka-group-id"; + pub fn local_stream_data_path(&self, stream_name: &str) -> PathBuf { self.local_staging_path.join(stream_name) } @@ -177,57 +224,57 @@ impl Cli { pub fn create_cli_command_with_clap(name: &'static str) -> Command { Command::new(name).next_line_help(false) - .arg( - Arg::new(Self::TRINO_ENDPOINT) - .long(Self::TRINO_ENDPOINT) - .env("P_TRINO_ENDPOINT") - .value_name("STRING") - .help("Address and port for Trino HTTP(s) server"), - ) - .arg( - Arg::new(Self::TRINO_CATALOG_NAME) - .long(Self::TRINO_CATALOG_NAME) - .env("P_TRINO_CATALOG_NAME") - .value_name("STRING") - .help("Name of the catalog to be queried (Translates to X-Trino-Catalog)"), - ) - .arg( - Arg::new(Self::TRINO_SCHEMA) - .long(Self::TRINO_SCHEMA) - .env("P_TRINO_SCHEMA") - .value_name("STRING") - .help("Name of schema to be queried (Translates to X-Trino-Schema)"), - ) - .arg( - Arg::new(Self::TRINO_USER_NAME) - .long(Self::TRINO_USER_NAME) - .env("P_TRINO_USER_NAME") - .value_name("STRING") - .help("Name of Trino user (Translates to X-Trino-User)"), - ) - .arg( - Arg::new(Self::TRINO_AUTHORIZATION) - .long(Self::TRINO_AUTHORIZATION) - .env("P_TRINO_AUTHORIZATION") - .value_name("STRING") - 
.help("Base 64 encoded in the format username:password"), - ) - .arg( - Arg::new(Self::TLS_CERT) - .long(Self::TLS_CERT) - .env("P_TLS_CERT_PATH") - .value_name("PATH") - .value_parser(validation::file_path) - .help("Local path on this device where certificate file is located. Required to enable TLS"), - ) - .arg( - Arg::new(Self::TLS_KEY) - .long(Self::TLS_KEY) - .env("P_TLS_KEY_PATH") - .value_name("PATH") - .value_parser(validation::file_path) - .help("Local path on this device where private key file is located. Required to enable TLS"), - ) + .arg( + Arg::new(Self::TRINO_ENDPOINT) + .long(Self::TRINO_ENDPOINT) + .env("P_TRINO_ENDPOINT") + .value_name("STRING") + .help("Address and port for Trino HTTP(s) server"), + ) + .arg( + Arg::new(Self::TRINO_CATALOG_NAME) + .long(Self::TRINO_CATALOG_NAME) + .env("P_TRINO_CATALOG_NAME") + .value_name("STRING") + .help("Name of the catalog to be queried (Translates to X-Trino-Catalog)"), + ) + .arg( + Arg::new(Self::TRINO_SCHEMA) + .long(Self::TRINO_SCHEMA) + .env("P_TRINO_SCHEMA") + .value_name("STRING") + .help("Name of schema to be queried (Translates to X-Trino-Schema)"), + ) + .arg( + Arg::new(Self::TRINO_USER_NAME) + .long(Self::TRINO_USER_NAME) + .env("P_TRINO_USER_NAME") + .value_name("STRING") + .help("Name of Trino user (Translates to X-Trino-User)"), + ) + .arg( + Arg::new(Self::TRINO_AUTHORIZATION) + .long(Self::TRINO_AUTHORIZATION) + .env("P_TRINO_AUTHORIZATION") + .value_name("STRING") + .help("Base 64 encoded in the format username:password"), + ) + .arg( + Arg::new(Self::TLS_CERT) + .long(Self::TLS_CERT) + .env("P_TLS_CERT_PATH") + .value_name("PATH") + .value_parser(validation::file_path) + .help("Local path on this device where certificate file is located. Required to enable TLS"), + ) + .arg( + Arg::new(Self::TLS_KEY) + .long(Self::TLS_KEY) + .env("P_TLS_KEY_PATH") + .value_name("PATH") + .value_parser(validation::file_path) + .help("Local path on this device where private key file is located. 
Required to enable TLS"), + ) .arg( Arg::new(Self::TRUSTED_CA_CERTS_PATH) .long(Self::TRUSTED_CA_CERTS_PATH) @@ -236,273 +283,492 @@ impl Cli { .value_parser(validation::canonicalize_path) .help("Local path on this device where all trusted certificates are located.") ) - .arg( - Arg::new(Self::ADDRESS) - .long(Self::ADDRESS) - .env("P_ADDR") - .value_name("ADDR:PORT") - .default_value("0.0.0.0:8000") - .value_parser(validation::socket_addr) - .help("Address and port for Parseable HTTP(s) server"), - ) - .arg( - Arg::new(Self::STAGING) - .long(Self::STAGING) - .env("P_STAGING_DIR") - .value_name("DIR") - .default_value("./staging") - .value_parser(validation::canonicalize_path) - .help("Local path on this device to be used as landing point for incoming events") - .next_line_help(true), - ) - .arg( - Arg::new(Self::CACHE) - .long(Self::CACHE) - .env("P_CACHE_DIR") - .value_name("DIR") - .value_parser(validation::canonicalize_path) - .help("Local path on this device to be used for caching data") - .next_line_help(true), - ) - .arg( - Arg::new(Self::CACHE_SIZE) - .long(Self::CACHE_SIZE) - .env("P_CACHE_SIZE") - .value_name("size") - .default_value("1GiB") - .value_parser(validation::cache_size) - .help("Maximum allowed cache size for all streams combined (In human readable format, e.g 1GiB, 2GiB, 100MB)") - .next_line_help(true), - ) - .arg( - Arg::new(Self::QUERY_CACHE) - .long(Self::QUERY_CACHE) - .env("P_QUERY_CACHE_DIR") - .value_name("DIR") - .value_parser(validation::canonicalize_path) - .help("Local path on this device to be used for caching data") - .next_line_help(true), - ) - .arg( - Arg::new(Self::QUERY_CACHE_SIZE) - .long(Self::QUERY_CACHE_SIZE) - .env("P_QUERY_CACHE_SIZE") - .value_name("size") - .default_value("1GiB") - .value_parser(validation::cache_size) - .help("Maximum allowed cache size for all streams combined (In human readable format, e.g 1GiB, 2GiB, 100MB)") - .next_line_help(true), - ) - .arg( - Arg::new(Self::USERNAME) - .long(Self::USERNAME) - .env("P_USERNAME") - .value_name("STRING") - .required(true) - .help("Admin username to be set for this Parseable server"), - ) - .arg( - Arg::new(Self::PASSWORD) - .long(Self::PASSWORD) - .env("P_PASSWORD") - .value_name("STRING") - .required(true) - .help("Admin password to be set for this Parseable server"), - ) - .arg( - Arg::new(Self::CHECK_UPDATE) - .long(Self::CHECK_UPDATE) - .env("P_CHECK_UPDATE") - .value_name("BOOL") - .required(false) - .default_value("true") - .value_parser(value_parser!(bool)) - .help("Enable/Disable checking for new Parseable release"), - ) - .arg( - Arg::new(Self::SEND_ANALYTICS) - .long(Self::SEND_ANALYTICS) - .env("P_SEND_ANONYMOUS_USAGE_DATA") - .value_name("BOOL") - .required(false) - .default_value("true") - .value_parser(value_parser!(bool)) - .help("Enable/Disable anonymous telemetry data collection"), - ) - .arg( - Arg::new(Self::OPEN_AI_KEY) - .long(Self::OPEN_AI_KEY) - .env("P_OPENAI_API_KEY") - .value_name("STRING") - .required(false) - .help("OpenAI key to enable llm features"), - ) - .arg( - Arg::new(Self::OPENID_CLIENT_ID) - .long(Self::OPENID_CLIENT_ID) - .env("P_OIDC_CLIENT_ID") - .value_name("STRING") - .required(false) - .help("Client id for OIDC provider"), - ) - .arg( - Arg::new(Self::OPENID_CLIENT_SECRET) - .long(Self::OPENID_CLIENT_SECRET) - .env("P_OIDC_CLIENT_SECRET") - .value_name("STRING") - .required(false) - .help("Client secret for OIDC provider"), - ) - .arg( - Arg::new(Self::OPENID_ISSUER) - .long(Self::OPENID_ISSUER) - .env("P_OIDC_ISSUER") - .value_name("URL") - 
.required(false) - .value_parser(validation::url) - .help("OIDC provider's host address"), - ) - .arg( - Arg::new(Self::DOMAIN_URI) - .long(Self::DOMAIN_URI) - .env("P_ORIGIN_URI") - .value_name("URL") - .required(false) - .value_parser(validation::url) - .help("Parseable server global domain address"), - ) - .arg( - Arg::new(Self::GRPC_PORT) - .long(Self::GRPC_PORT) - .env("P_GRPC_PORT") - .value_name("PORT") - .default_value("8001") - .required(false) - .value_parser(value_parser!(u16)) - .help("Port for gRPC server"), - ) - .arg( - Arg::new(Self::FLIGHT_PORT) - .long(Self::FLIGHT_PORT) - .env("P_FLIGHT_PORT") - .value_name("PORT") - .default_value("8002") - .required(false) - .value_parser(value_parser!(u16)) - .help("Port for Arrow Flight Querying Engine"), - ) - .arg( - Arg::new(Self::CORS) - .long(Self::CORS) - .env("P_CORS") - .value_name("BOOL") - .required(false) - .default_value("true") - .value_parser(value_parser!(bool)) - .help("Enable/Disable CORS, default disabled"), - ) - .arg( - Arg::new(Self::LIVETAIL_CAPACITY) - .long(Self::LIVETAIL_CAPACITY) - .env("P_LIVETAIL_CAPACITY") - .value_name("NUMBER") - .default_value("1000") - .required(false) - .value_parser(value_parser!(usize)) - .help("Number of rows in livetail channel"), - ) - .arg( - Arg::new(Self::QUERY_MEM_POOL_SIZE) - .long(Self::QUERY_MEM_POOL_SIZE) - .env("P_QUERY_MEMORY_LIMIT") - .value_name("Gib") - .required(false) - .value_parser(value_parser!(u8)) - .help("Set a fixed memory limit for query"), - ) - .arg( - // RowGroupSize controls the number of rows present in one row group - // More rows = better compression but HIGHER Memory consumption during read/write - // 1048576 is the default value for DataFusion - Arg::new(Self::ROW_GROUP_SIZE) - .long(Self::ROW_GROUP_SIZE) - .env("P_PARQUET_ROW_GROUP_SIZE") - .value_name("NUMBER") - .required(false) - .default_value("1048576") - .value_parser(value_parser!(usize)) - .help("Number of rows in a row group"), - ).arg( - Arg::new(Self::MODE) - .long(Self::MODE) - .env("P_MODE") - .value_name("STRING") - .required(false) - .default_value("all") - .value_parser([ - "query", - "ingest", - "all"]) - .help("Mode of operation"), - ) - .arg( - Arg::new(Self::INGESTOR_ENDPOINT) - .long(Self::INGESTOR_ENDPOINT) - .env("P_INGESTOR_ENDPOINT") - .value_name("URL") - .required(false) - .help("URL to connect to this specific ingestor. 
Default is the address of the server.") - ) - .arg( - Arg::new(Self::PARQUET_COMPRESSION_ALGO) - .long(Self::PARQUET_COMPRESSION_ALGO) - .env("P_PARQUET_COMPRESSION_ALGO") - .value_name("[UNCOMPRESSED, SNAPPY, GZIP, LZO, BROTLI, LZ4, ZSTD]") - .required(false) - .default_value("lz4") - .value_parser([ - "uncompressed", - "snappy", - "gzip", - "lzo", - "brotli", - "lz4", - "zstd"]) - .help("Parquet compression algorithm"), - ) - .arg( - Arg::new(Self::HOT_TIER_PATH) - .long(Self::HOT_TIER_PATH) - .env("P_HOT_TIER_DIR") - .value_name("DIR") - .value_parser(validation::canonicalize_path) - .help("Local path on this device to be used for hot tier data") - .next_line_help(true), - ) - .arg( - Arg::new(Self::MAX_DISK_USAGE) - .long(Self::MAX_DISK_USAGE) - .env("P_MAX_DISK_USAGE_PERCENT") - .value_name("percentage") - .default_value("80.0") - .value_parser(validation::validate_disk_usage) - .help("Maximum allowed disk usage in percentage e.g 90.0 for 90%") - .next_line_help(true), - ) - .arg( - Arg::new(Self::MS_CLARITY_TAG) - .long(Self::MS_CLARITY_TAG) - .env("P_MS_CLARITY_TAG") - .value_name("STRING") - .required(false) - .help("Tag for MS Clarity"), - ) - .group( - ArgGroup::new("oidc") - .args([Self::OPENID_CLIENT_ID, Self::OPENID_CLIENT_SECRET, Self::OPENID_ISSUER]) - .requires_all([Self::OPENID_CLIENT_ID, Self::OPENID_CLIENT_SECRET, Self::OPENID_ISSUER]) - .multiple(true) - ) + .arg( + Arg::new(Self::ADDRESS) + .long(Self::ADDRESS) + .env("P_ADDR") + .value_name("ADDR:PORT") + .default_value("0.0.0.0:8000") + .value_parser(validation::socket_addr) + .help("Address and port for Parseable HTTP(s) server"), + ) + .arg( + Arg::new(Self::STAGING) + .long(Self::STAGING) + .env("P_STAGING_DIR") + .value_name("DIR") + .default_value("./staging") + .value_parser(validation::canonicalize_path) + .help("Local path on this device to be used as landing point for incoming events") + .next_line_help(true), + ) + .arg( + Arg::new(Self::CACHE) + .long(Self::CACHE) + .env("P_CACHE_DIR") + .value_name("DIR") + .value_parser(validation::canonicalize_path) + .help("Local path on this device to be used for caching data") + .next_line_help(true), + ) + .arg( + Arg::new(Self::CACHE_SIZE) + .long(Self::CACHE_SIZE) + .env("P_CACHE_SIZE") + .value_name("size") + .default_value("1GiB") + .value_parser(validation::cache_size) + .help("Maximum allowed cache size for all streams combined (In human readable format, e.g 1GiB, 2GiB, 100MB)") + .next_line_help(true), + ) + .arg( + Arg::new(Self::QUERY_CACHE) + .long(Self::QUERY_CACHE) + .env("P_QUERY_CACHE_DIR") + .value_name("DIR") + .value_parser(validation::canonicalize_path) + .help("Local path on this device to be used for caching data") + .next_line_help(true), + ) + .arg( + Arg::new(Self::QUERY_CACHE_SIZE) + .long(Self::QUERY_CACHE_SIZE) + .env("P_QUERY_CACHE_SIZE") + .value_name("size") + .default_value("1GiB") + .value_parser(validation::cache_size) + .help("Maximum allowed cache size for all streams combined (In human readable format, e.g 1GiB, 2GiB, 100MB)") + .next_line_help(true), + ) + .arg( + Arg::new(Self::USERNAME) + .long(Self::USERNAME) + .env("P_USERNAME") + .value_name("STRING") + .required(true) + .help("Admin username to be set for this Parseable server"), + ) + .arg( + Arg::new(Self::PASSWORD) + .long(Self::PASSWORD) + .env("P_PASSWORD") + .value_name("STRING") + .required(true) + .help("Admin password to be set for this Parseable server"), + ) + .arg( + Arg::new(Self::CHECK_UPDATE) + .long(Self::CHECK_UPDATE) + .env("P_CHECK_UPDATE") + 
.value_name("BOOL") + .required(false) + .default_value("true") + .value_parser(value_parser!(bool)) + .help("Enable/Disable checking for new Parseable release"), + ) + .arg( + Arg::new(Self::SEND_ANALYTICS) + .long(Self::SEND_ANALYTICS) + .env("P_SEND_ANONYMOUS_USAGE_DATA") + .value_name("BOOL") + .required(false) + .default_value("true") + .value_parser(value_parser!(bool)) + .help("Enable/Disable anonymous telemetry data collection"), + ) + .arg( + Arg::new(Self::OPEN_AI_KEY) + .long(Self::OPEN_AI_KEY) + .env("P_OPENAI_API_KEY") + .value_name("STRING") + .required(false) + .help("OpenAI key to enable llm features"), + ) + .arg( + Arg::new(Self::OPENID_CLIENT_ID) + .long(Self::OPENID_CLIENT_ID) + .env("P_OIDC_CLIENT_ID") + .value_name("STRING") + .required(false) + .help("Client id for OIDC provider"), + ) + .arg( + Arg::new(Self::OPENID_CLIENT_SECRET) + .long(Self::OPENID_CLIENT_SECRET) + .env("P_OIDC_CLIENT_SECRET") + .value_name("STRING") + .required(false) + .help("Client secret for OIDC provider"), + ) + .arg( + Arg::new(Self::OPENID_ISSUER) + .long(Self::OPENID_ISSUER) + .env("P_OIDC_ISSUER") + .value_name("URL") + .required(false) + .value_parser(validation::url) + .help("OIDC provider's host address"), + ) + .arg( + Arg::new(Self::DOMAIN_URI) + .long(Self::DOMAIN_URI) + .env("P_ORIGIN_URI") + .value_name("URL") + .required(false) + .value_parser(validation::url) + .help("Parseable server global domain address"), + ) + .arg( + Arg::new(Self::GRPC_PORT) + .long(Self::GRPC_PORT) + .env("P_GRPC_PORT") + .value_name("PORT") + .default_value("8001") + .required(false) + .value_parser(value_parser!(u16)) + .help("Port for gRPC server"), + ) + .arg( + Arg::new(Self::FLIGHT_PORT) + .long(Self::FLIGHT_PORT) + .env("P_FLIGHT_PORT") + .value_name("PORT") + .default_value("8002") + .required(false) + .value_parser(value_parser!(u16)) + .help("Port for Arrow Flight Querying Engine"), + ) + .arg( + Arg::new(Self::CORS) + .long(Self::CORS) + .env("P_CORS") + .value_name("BOOL") + .required(false) + .default_value("true") + .value_parser(value_parser!(bool)) + .help("Enable/Disable CORS, default disabled"), + ) + .arg( + Arg::new(Self::LIVETAIL_CAPACITY) + .long(Self::LIVETAIL_CAPACITY) + .env("P_LIVETAIL_CAPACITY") + .value_name("NUMBER") + .default_value("1000") + .required(false) + .value_parser(value_parser!(usize)) + .help("Number of rows in livetail channel"), + ) + .arg( + Arg::new(Self::QUERY_MEM_POOL_SIZE) + .long(Self::QUERY_MEM_POOL_SIZE) + .env("P_QUERY_MEMORY_LIMIT") + .value_name("Gib") + .required(false) + .value_parser(value_parser!(u8)) + .help("Set a fixed memory limit for query"), + ) + .arg( + // RowGroupSize controls the number of rows present in one row group + // More rows = better compression but HIGHER Memory consumption during read/write + // 1048576 is the default value for DataFusion + Arg::new(Self::ROW_GROUP_SIZE) + .long(Self::ROW_GROUP_SIZE) + .env("P_PARQUET_ROW_GROUP_SIZE") + .value_name("NUMBER") + .required(false) + .default_value("1048576") + .value_parser(value_parser!(usize)) + .help("Number of rows in a row group"), + ).arg( + Arg::new(Self::MODE) + .long(Self::MODE) + .env("P_MODE") + .value_name("STRING") + .required(false) + .default_value("all") + .value_parser([ + "query", + "ingest", + "all"]) + .help("Mode of operation"), + ) + .arg( + Arg::new(Self::INGESTOR_ENDPOINT) + .long(Self::INGESTOR_ENDPOINT) + .env("P_INGESTOR_ENDPOINT") + .value_name("URL") + .required(false) + .help("URL to connect to this specific ingestor. 
Default is the address of the server.") + ) + .arg( + Arg::new(Self::PARQUET_COMPRESSION_ALGO) + .long(Self::PARQUET_COMPRESSION_ALGO) + .env("P_PARQUET_COMPRESSION_ALGO") + .value_name("[UNCOMPRESSED, SNAPPY, GZIP, LZO, BROTLI, LZ4, ZSTD]") + .required(false) + .default_value("lz4") + .value_parser([ + "uncompressed", + "snappy", + "gzip", + "lzo", + "brotli", + "lz4", + "zstd"]) + .help("Parquet compression algorithm"), + ) + .arg( + Arg::new(Self::HOT_TIER_PATH) + .long(Self::HOT_TIER_PATH) + .env("P_HOT_TIER_DIR") + .value_name("DIR") + .value_parser(validation::canonicalize_path) + .help("Local path on this device to be used for hot tier data") + .next_line_help(true), + ) + .arg( + Arg::new(Self::MAX_DISK_USAGE) + .long(Self::MAX_DISK_USAGE) + .env("P_MAX_DISK_USAGE_PERCENT") + .value_name("percentage") + .default_value("80.0") + .value_parser(validation::validate_disk_usage) + .help("Maximum allowed disk usage in percentage e.g 90.0 for 90%") + .next_line_help(true), + ) + .arg( + Arg::new(Self::MS_CLARITY_TAG) + .long(Self::MS_CLARITY_TAG) + .env("P_MS_CLARITY_TAG") + .value_name("STRING") + .required(false) + .help("Tag for MS Clarity"), + ).arg( + Arg::new(Self::CONNECTOR_NAME) + .long(Self::CONNECTOR_NAME) + .env("P_CONNECTOR_NAME") + .required(false) + .help("Name of the connector") + ) + .arg( + Arg::new(Self::CONNECTOR_BUFFER_SIZE) + .long(Self::CONNECTOR_BUFFER_SIZE) + .env("P_CONNECTOR_BATCH_SIZE") + .value_parser(value_parser!(usize)) + .required(false) + .help("Buffer size for processing") + ) + .arg( + Arg::new(Self::CONNECTOR_BUFFER_TIMEOUT) + .long(Self::CONNECTOR_BUFFER_TIMEOUT) + .env("P_CONNECTOR_BUFFER_TIMEOUT") + .value_parser(value_parser!(u64)) + .required(false) + .help("Buffer timeout for processing") + ) + .arg( + Arg::new(Self::CONNECTOR_OFFSET_MODE) + .long(Self::CONNECTOR_OFFSET_MODE) + .required(false) + .env("P_CONNECTOR_OFFSET_MODE") + .value_parser(["earliest", "latest", "group"]) + .help("Offset mode: earliest, latest, or group") + ) + .arg( + Arg::new(Self::CONNECTOR_BAD_DATA_POLICY) + .long(Self::CONNECTOR_BAD_DATA_POLICY) + .required(false) + .env("P_CONNECTOR_BAD_DATA_POLICY") + .help("Bad data handling policy: drop, fail, dlt") + ) + .arg( + Arg::new(Self::CONNECTOR_MAX_RETRIES) + .long(Self::CONNECTOR_MAX_RETRIES) + .env("P_CONNECTOR_MAX_RETRIES") + .required(false) + .value_parser(value_parser!(u32)) + .help("Maximum number of retries on errors") + ) + .arg( + Arg::new(Self::CONNECTOR_RETRY_INTERVAL_MS) + .long(Self::CONNECTOR_RETRY_INTERVAL_MS) + .env("P_CONNECTOR_RETRY_INTERVAL_MS") + .value_parser(value_parser!(u64)) + .required(false) + .help("Retry interval in milliseconds") + ) + .arg( + Arg::new(Self::CONNECTOR_METRICS_ENABLED) + .long(Self::CONNECTOR_METRICS_ENABLED) + .env("P_CONNECTOR_METRICS_ENABLED") + .value_parser(value_parser!(bool)) + .required(false) + .help("Enable or disable connector metrics") + ) + .arg( + Arg::new(Self::CONNECTOR_INSTANCE_ID) + .long(Self::CONNECTOR_INSTANCE_ID) + .required(false) + .env("P_CONNECTOR_INSTANCE_ID") + .help("Instance ID for the connector") + ) + + // ConsumerConfig arguments: + .arg( + Arg::new(Self::CONSUMER_GROUP_INSTANCE_ID) + .long(Self::CONSUMER_GROUP_INSTANCE_ID) + .required(false) + .env("P_CONSUMER_GROUP_INSTANCE_ID") + .help("Consumer group instance ID") + ) + .arg( + Arg::new(Self::CONSUMER_PARTITION_ASSIGNMENT_STRATEGY) + .long(Self::CONSUMER_PARTITION_ASSIGNMENT_STRATEGY) + .env("P_CONSUMER_PARTITION_ASSIGNMENT_STRATEGY") + .help("Partition assignment strategy") + .required(false) + 
) + .arg( + Arg::new(Self::CONSUMER_SESSION_TIMEOUT_MS) + .long(Self::CONSUMER_SESSION_TIMEOUT_MS) + .env("P_CONSUMER_SESSION_TIMEOUT_MS") + .value_parser(value_parser!(u32)) + .help("Consumer session timeout in ms") + .required(false) + ) + .arg( + Arg::new(Self::CONSUMER_HEARTBEAT_INTERVAL_MS) + .long(Self::CONSUMER_HEARTBEAT_INTERVAL_MS) + .env("P_CONSUMER_HEARTBEAT_INTERVAL_MS") + .value_parser(value_parser!(u32)) + .help("Consumer heartbeat interval in ms") + .required(false) + ) + .arg( + Arg::new(Self::CONSUMER_MAX_POLL_INTERVAL_MS) + .long(Self::CONSUMER_MAX_POLL_INTERVAL_MS) + .env("P_CONSUMER_MAX_POLL_INTERVAL_MS") + .value_parser(value_parser!(u32)) + .help("Max poll interval in ms") + .required(false) + ) + .arg( + Arg::new(Self::CONSUMER_ENABLE_AUTO_OFFSET_STORE) + .long(Self::CONSUMER_ENABLE_AUTO_OFFSET_STORE) + .env("P_CONSUMER_ENABLE_AUTO_OFFSET_STORE") + .value_parser(value_parser!(bool)) + .help("Enable auto offset store") + .default_value("true") // Just for as few metrics + .required(false) + ) + .arg( + Arg::new(Self::CONSUMER_AUTO_OFFSET_RESET) + .long(Self::CONSUMER_AUTO_OFFSET_RESET) + .env("P_CONSUMER_AUTO_OFFSET_RESET") + .value_parser(["earliest", "latest", "none"]) + .help("Auto offset reset behavior") + ) + .arg( + Arg::new(Self::CONSUMER_FETCH_MIN_BYTES) + .long(Self::CONSUMER_FETCH_MIN_BYTES) + .env("P_CONSUMER_FETCH_MIN_BYTES") + .value_parser(value_parser!(u32)) + .help("Fetch min bytes") + ) + .arg( + Arg::new(Self::CONSUMER_FETCH_MAX_BYTES) + .long(Self::CONSUMER_FETCH_MAX_BYTES) + .env("P_CONSUMER_FETCH_MAX_BYTES") + .value_parser(value_parser!(u32)) + .help("Fetch max bytes") + ) + .arg( + Arg::new(Self::CONSUMER_FETCH_MAX_WAIT_MS) + .long(Self::CONSUMER_FETCH_MAX_WAIT_MS) + .env("P_CONSUMER_FETCH_MAX_WAIT_MS") + .value_parser(value_parser!(u32)) + .help("Fetch max wait in ms") + ) + .arg( + Arg::new(Self::CONSUMER_MAX_PARTITION_FETCH_BYTES) + .long(Self::CONSUMER_MAX_PARTITION_FETCH_BYTES) + .env("P_CONSUMER_MAX_PARTITION_FETCH_BYTES") + .value_parser(value_parser!(u32)) + .help("Max partition fetch bytes") + ) + .arg( + Arg::new(Self::CONSUMER_QUEUED_MIN_MESSAGES) + .long(Self::CONSUMER_QUEUED_MIN_MESSAGES) + .env("P_CONSUMER_QUEUED_MIN_MESSAGES") + .value_parser(value_parser!(u32)) + .help("Queued min messages") + ) + .arg( + Arg::new(Self::CONSUMER_QUEUED_MAX_MESSAGES_KBYTES) + .long(Self::CONSUMER_QUEUED_MAX_MESSAGES_KBYTES) + .env("P_CONSUMER_QUEUED_MAX_MESSAGES_KBYTES") + .value_parser(value_parser!(u32)) + .help("Queued max messages kbytes") + ) + .arg( + Arg::new(Self::CONSUMER_ENABLE_PARTITION_EOF) + .long(Self::CONSUMER_ENABLE_PARTITION_EOF) + .env("P_CONSUMER_ENABLE_PARTITION_EOF") + .value_parser(value_parser!(bool)) + .help("Enable partition EOF") + ) + .arg( + Arg::new(Self::CONSUMER_CHECK_CRCS) + .long(Self::CONSUMER_CHECK_CRCS) + .env("P_CONSUMER_CHECK_CRCS") + .value_parser(value_parser!(bool)) + .help("Check CRCs") + ) + .arg( + Arg::new(Self::CONSUMER_ISOLATION_LEVEL) + .long(Self::CONSUMER_ISOLATION_LEVEL) + .env("P_CONSUMER_ISOLATION_LEVEL") + .value_parser(["read_uncommitted", "read_committed"]) + .help("Isolation level") + ) + .arg( + Arg::new(Self::CONSUMER_FETCH_MESSAGE_MAX_BYTES) + .long(Self::CONSUMER_FETCH_MESSAGE_MAX_BYTES) + .env("P_CONSUMER_FETCH_MESSAGE_MAX_BYTES") + .help("Fetch message max bytes (string)") + ) + .arg( + Arg::new(Self::CONSUMER_STATS_INTERVAL_MS) + .long(Self::CONSUMER_STATS_INTERVAL_MS) + .env("P_CONSUMER_STATS_INTERVAL_MS") + .value_parser(value_parser!(u64)) + .help("Consumer stats interval ms") + ) 
+ .arg( + Arg::new(Self::KAFKA_TOPICS) + .long(Self::KAFKA_TOPICS) + .env("P_KAFKA_TOPICS") + .help("Kafka topics to consume from. Comma-separated list.") + ) + .arg( + Arg::new(Self::KAFKA_BOOTSTRAP_SERVERS) + .long(Self::KAFKA_BOOTSTRAP_SERVERS) + .env("P_KAFKA_BOOTSTRAP_SERVERS") + .help("Kafka bootstrap servers.") + ) + .arg( + Arg::new(Self::KAFKA_GROUP_ID) + .long(Self::KAFKA_GROUP_ID) + .required(false) + .env("P_KAFKA_GROUP_ID") + .help("Kafka consumer group ID.") + ) + .group( + ArgGroup::new("oidc") + .args([Self::OPENID_CLIENT_ID, Self::OPENID_CLIENT_SECRET, Self::OPENID_ISSUER]) + .requires_all([Self::OPENID_CLIENT_ID, Self::OPENID_CLIENT_SECRET, Self::OPENID_ISSUER]) + .multiple(true) + ) } } @@ -514,6 +780,126 @@ impl FromArgMatches for Cli { } fn update_from_arg_matches(&mut self, m: &clap::ArgMatches) -> Result<(), clap::Error> { + if matches!(self.mode, Mode::Query) { + self.connector_config = None; + } + + if let Some(topics) = m.get_one::<String>(Cli::KAFKA_TOPICS).cloned() { + let bootstrap_servers = m + .get_one::<String>(Cli::KAFKA_BOOTSTRAP_SERVERS) + .cloned() + .unwrap_or_default(); + let group_id = m + .get_one::<String>(Cli::KAFKA_GROUP_ID) + .cloned() + .unwrap_or("parseable-default-group".to_string()); + + if topics.is_empty() { + return Err(clap::Error::raw( + clap::error::ErrorKind::MissingRequiredArgument, + "Kafka topics required in ingest/all mode.", + )); + } + if bootstrap_servers.is_empty() { + return Err(clap::Error::raw( + clap::error::ErrorKind::MissingRequiredArgument, + "Kafka bootstrap servers required in ingest/all mode.", + )); + } + + let offset_mode = match m + .get_one::<String>(Cli::CONNECTOR_OFFSET_MODE) + .map(|s| s.as_str()) + { + Some("earliest") => SourceOffset::Earliest, + Some("latest") => SourceOffset::Latest, + Some("group") | None => SourceOffset::Group, + _ => SourceOffset::Latest, + }; + + let buffer_size = m + .get_one::<usize>(Cli::CONNECTOR_BUFFER_SIZE) + .cloned() + .unwrap_or(10000); + let buffer_timeout = m + .get_one::<u64>(Cli::CONNECTOR_BUFFER_TIMEOUT) + .cloned() + .unwrap_or(5000); + + let max_retries = m + .get_one::<u32>(Cli::CONNECTOR_MAX_RETRIES) + .cloned() + .unwrap_or(20); + let retry_interval_ms = m + .get_one::<u64>(Cli::CONNECTOR_RETRY_INTERVAL_MS) + .cloned() + .unwrap_or(10000); + let metrics_enabled = m + .get_one::<bool>(Cli::CONNECTOR_METRICS_ENABLED) + .cloned() + .unwrap_or(true); + let connector_name = m + .get_one::<String>(Cli::CONNECTOR_NAME) + .cloned() + .unwrap_or_else(|| "parseable-connectors".to_string()); + let instance_id = m + .get_one::<String>(Cli::CONNECTOR_INSTANCE_ID) + .cloned() + .unwrap_or_else(|| "parseable-connectors".to_string()); + + let bad_data_policy = m.get_one::<String>(Cli::CONNECTOR_BAD_DATA_POLICY).cloned(); + let bad_data = match bad_data_policy.as_deref() { + Some("drop") => Some(BadData::Drop {}), + Some("fail") => Some(BadData::Fail {}), + Some("dlt") => Some(BadData::Dlt {}), + _ => None, + }; + + let auto_offset_reset = m + .get_one::<String>(Cli::CONSUMER_AUTO_OFFSET_RESET) + .cloned() + .unwrap_or_else(|| "earliest".to_string()); + + let mut consumer = ConsumerConfig::default(); + consumer.group_id = group_id.clone(); + consumer.auto_offset_reset = auto_offset_reset; + + let topics: Vec<String> = topics.split(",").map(|t| t.to_owned()).collect(); + let topics_clone = topics.to_vec(); + + let kafka_config = KafkaConfig::builder() + .bootstrap_servers(bootstrap_servers) + .topic(topics_clone) + .with_consumer(consumer) + .build() + .map_err(|e| { + clap::Error::raw(clap::error::ErrorKind::InvalidValue, e.to_string()) + })?; + + let mut connector_config = 
ConnectorConfig::builder() + .connector_type(ConnectorType::KafkaSource) + .name(connector_name) + .buffer_size(buffer_size) + .buffer_timeout(Duration::from_millis(buffer_timeout)) + .offset_mode(offset_mode) + .topic(topics) + .max_retries(max_retries) + .retry_interval(Duration::from_millis(retry_interval_ms)) + .metrics_enabled(metrics_enabled) + .kafka_config(kafka_config) + .instance_id(instance_id) + .build() + .map_err(|e| { + clap::Error::raw(clap::error::ErrorKind::InvalidValue, e.to_string()) + })?; + + connector_config.bad_data = bad_data; + + self.connector_config = Some(connector_config); + } else { + warn!("No Kafka topics provided"); + } + self.trino_catalog = m.get_one::(Self::TRINO_CATALOG_NAME).cloned(); self.trino_endpoint = m.get_one::(Self::TRINO_ENDPOINT).cloned(); self.trino_auth = m.get_one::(Self::TRINO_AUTHORIZATION).cloned(); diff --git a/src/connectors/common/config.rs b/src/connectors/common/config.rs new file mode 100644 index 000000000..3aed150a3 --- /dev/null +++ b/src/connectors/common/config.rs @@ -0,0 +1,164 @@ +use crate::connectors::common::types::ConnectorType; +use crate::connectors::common::{BadData, ConnectorError}; +use crate::connectors::kafka::config::{KafkaConfig, SourceOffset}; +use serde::{Deserialize, Serialize}; +use std::{time::Duration, vec}; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ConnectorConfig { + // Basic Configuration + pub connector_type: ConnectorType, + pub name: String, + + // Performance Configuration + pub buffer_size: usize, + pub buffer_timeout: Duration, + + // Topic/Stream Configuration + pub topics: Vec, + pub offset_mode: SourceOffset, + + // Error Handling + pub bad_data: Option, + pub max_retries: u32, + pub retry_interval: Duration, + + // Kafka-specific Configuration + #[serde(skip_serializing_if = "Option::is_none")] + pub kafka_config: Option, + + // Monitoring + pub metrics_enabled: bool, + pub instance_id: String, +} + +impl Default for ConnectorConfig { + fn default() -> Self { + Self { + connector_type: ConnectorType::KafkaSource, + name: String::from("parseable-connectors"), + buffer_size: 10000, + buffer_timeout: Duration::from_millis(500), + topics: vec![], + offset_mode: SourceOffset::Earliest, + bad_data: None, + max_retries: 3, + retry_interval: Duration::from_secs(5), + kafka_config: Some(KafkaConfig::default()), + metrics_enabled: true, + instance_id: String::from("parseable-connectors"), + } + } +} + +impl ConnectorConfig { + pub fn builder() -> ConnectorConfigBuilder { + ConnectorConfigBuilder::default() + } + + pub fn validate(&self) -> anyhow::Result<(), ConnectorError> { + if self.buffer_size == 0 { + return Err(ConnectorError::Config("Buffer size must be > 0".into())); + } + + if let Some(kafka_config) = &self.kafka_config { + self.validate_kafka_config(kafka_config)?; + } + + Ok(()) + } + + fn validate_kafka_config(&self, config: &KafkaConfig) -> Result<(), ConnectorError> { + if config.bootstrap_servers.is_empty() { + return Err(ConnectorError::Config("Bootstrap servers required".into())); + } + + if config.topics().is_empty() { + return Err(ConnectorError::Config("Topic name required".into())); + } + + Ok(()) + } +} + +#[derive(Default)] +pub struct ConnectorConfigBuilder { + config: ConnectorConfig, +} + +impl ConnectorConfigBuilder { + pub fn connector_type(mut self, connector_type: ConnectorType) -> Self { + self.config.connector_type = connector_type; + self + } + + pub fn name(mut self, name: impl Into) -> Self { + self.config.name = name.into(); + self + } + + pub 
fn buffer_size(mut self, buffer_size: usize) -> Self { + self.config.buffer_size = buffer_size; + self + } + + pub fn buffer_timeout(mut self, buffer_timeout: Duration) -> Self { + self.config.buffer_timeout = buffer_timeout; + self + } + + pub fn max_retries(mut self, max_retries: u32) -> Self { + self.config.max_retries = max_retries; + self + } + + pub fn instance_id(mut self, instance_id: String) -> Self { + self.config.instance_id = instance_id; + self + } + + pub fn retry_interval(mut self, retry_interval: Duration) -> Self { + self.config.retry_interval = retry_interval; + self + } + + pub fn metrics_enabled(mut self, metrics_enabled: bool) -> Self { + self.config.metrics_enabled = metrics_enabled; + self + } + + pub fn topic(mut self, topics: Vec) -> Self { + self.config.topics = topics; + self + } + + pub fn offset_mode(mut self, offset_mode: SourceOffset) -> Self { + self.config.offset_mode = offset_mode; + self + } + + pub fn kafka_config(mut self, kafka_config: KafkaConfig) -> Self { + self.config.kafka_config = Some(kafka_config); + self + } + + pub fn build(self) -> anyhow::Result { + let config = self.config; + config.validate()?; + Ok(config) + } +} + +#[cfg(test)] +mod tests { + use super::*; + #[test] + fn test_config_validation() { + let result = ConnectorConfig::builder() + .connector_type(ConnectorType::KafkaSource) + .buffer_size(0) + .build(); + + assert!(result.is_err()); + } +} diff --git a/src/connectors/common/mod.rs b/src/connectors/common/mod.rs new file mode 100644 index 000000000..b0474ffa5 --- /dev/null +++ b/src/connectors/common/mod.rs @@ -0,0 +1,31 @@ +use serde::{Deserialize, Serialize}; +pub mod config; +pub mod processor; +pub mod shutdown; +pub mod types; + +#[derive(Debug, thiserror::Error)] +pub enum ConnectorError { + #[error("Kafka error: {0}")] + Kafka(#[from] rdkafka::error::KafkaError), + #[error("Configuration error: {0}")] + Config(String), + #[error("Processing error: {0}")] + Processing(String), + #[error("Initialization error: {0}")] + Init(String), +} + +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Hash, PartialOrd)] +#[serde(rename_all = "snake_case")] +pub enum BadData { + Fail {}, + Drop {}, + Dlt {}, +} + +impl Default for BadData { + fn default() -> Self { + BadData::Drop {} + } +} diff --git a/src/connectors/common/processor.rs b/src/connectors/common/processor.rs new file mode 100644 index 000000000..5f1e07bdc --- /dev/null +++ b/src/connectors/common/processor.rs @@ -0,0 +1,11 @@ +use async_trait::async_trait; + +#[async_trait] +pub trait Processor: Send + Sync + Sized + 'static { + async fn process(&self, records: IN) -> anyhow::Result; + + #[allow(unused_variables)] + async fn post_stream(&self) -> anyhow::Result<()> { + Ok(()) + } +} diff --git a/src/connectors/common/shutdown.rs b/src/connectors/common/shutdown.rs new file mode 100644 index 000000000..ba8f169cb --- /dev/null +++ b/src/connectors/common/shutdown.rs @@ -0,0 +1,118 @@ +use tokio::sync::mpsc; +use tokio_util::sync::CancellationToken; +use tracing::{info, warn}; + +#[derive(Debug)] +pub struct Shutdown { + cancel_token: CancellationToken, + shutdown_complete_tx: mpsc::Sender<()>, + shutdown_complete_rx: Option>, +} + +impl Shutdown { + pub fn new() -> Self { + let cancel_token = CancellationToken::new(); + let (shutdown_complete_tx, shutdown_complete_rx) = mpsc::channel(1); + Self { + cancel_token, + shutdown_complete_tx, + shutdown_complete_rx: Some(shutdown_complete_rx), + } + } + + pub fn start(&self) { + self.cancel_token.cancel(); + } + + pub 
async fn recv(&self) { + self.cancel_token.cancelled().await; + } + + pub async fn signal_listener(&self) { + let ctrl_c_signal = tokio::signal::ctrl_c(); + #[cfg(unix)] + let mut sigterm_signal = + tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()).unwrap(); + #[cfg(unix)] + tokio::select! { + _ = ctrl_c_signal => {}, + _ = sigterm_signal.recv() => {} + } + #[cfg(windows)] + let _ = ctrl_c_signal.await; + + warn!("Shutdown signal received!"); + self.start(); + } + + pub async fn complete(self) { + drop(self.shutdown_complete_tx); + self.shutdown_complete_rx.unwrap().recv().await; + info!("Shutdown complete!") + } +} + +impl Default for Shutdown { + fn default() -> Self { + Self::new() + } +} + +impl Clone for Shutdown { + fn clone(&self) -> Self { + Self { + cancel_token: self.cancel_token.clone(), + shutdown_complete_tx: self.shutdown_complete_tx.clone(), + shutdown_complete_rx: None, + } + } +} + +#[cfg(test)] +mod tests { + use std::sync::{Arc, Mutex}; + + use super::*; + use tokio::time::Duration; + + #[tokio::test] + async fn test_shutdown_recv() { + let shutdown = Shutdown::new(); + let shutdown_clone = shutdown.clone(); + // receive shutdown task + let task = tokio::spawn(async move { + shutdown_clone.recv().await; + 1 + }); + // start shutdown task after 200 ms + tokio::spawn(async move { + tokio::time::sleep(Duration::from_millis(200)).await; + shutdown.start(); + }); + // if shutdown is not received within 5 seconds, fail test + let check_value = tokio::select! { + _ = tokio::time::sleep(Duration::from_secs(5)) => panic!("Shutdown not received within 5 seconds"), + v = task => v.unwrap(), + }; + assert_eq!(check_value, 1); + } + + #[tokio::test] + async fn test_shutdown_wait_for_complete() { + let shutdown = Shutdown::new(); + let shutdown_clone = shutdown.clone(); + let check_value: Arc> = Arc::new(Mutex::new(false)); + let check_value_clone = Arc::clone(&check_value); + // receive shutdown task + tokio::spawn(async move { + shutdown_clone.recv().await; + tokio::time::sleep(Duration::from_millis(200)).await; + let mut check: std::sync::MutexGuard<'_, bool> = check_value_clone.lock().unwrap(); + *check = true; + }); + shutdown.start(); + shutdown.complete().await; + let check = check_value.lock().unwrap(); + assert!(*check, "shutdown did not successfully wait for complete"); + } +} diff --git a/src/connectors/common/types/mod.rs b/src/connectors/common/types/mod.rs new file mode 100644 index 000000000..ee2085eab --- /dev/null +++ b/src/connectors/common/types/mod.rs @@ -0,0 +1,45 @@ +use serde::{Deserialize, Serialize}; +use std::fmt; +use std::fmt::{Display, Formatter}; +use std::hash::Hash; + +#[derive(Debug, Clone, Hash, Eq, PartialEq, Serialize, Deserialize)] +pub enum ConnectorType { + KafkaSource, +} + +impl Display for ConnectorType { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + match self { + ConnectorType::KafkaSource => write!(f, "kafka_source"), + } + } +} + +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Hash, PartialOrd)] +#[serde(rename_all = "snake_case")] +pub enum ConnectionType { + Source, + Sink, +} + +impl Display for ConnectionType { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + match self { + ConnectionType::Source => write!(f, "SOURCE"), + ConnectionType::Sink => write!(f, "SINK"), + } + } +} + +impl TryFrom for ConnectionType { + type Error = String; + + fn try_from(value: String) -> anyhow::Result { + match value.to_lowercase().as_str() { + "source" => Ok(ConnectionType::Source), + "sink" => 
Ok(ConnectionType::Sink), + _ => Err(format!("Invalid connection type: {}", value)), + } + } +} diff --git a/src/connectors/kafka/config.rs b/src/connectors/kafka/config.rs new file mode 100644 index 000000000..367024372 --- /dev/null +++ b/src/connectors/kafka/config.rs @@ -0,0 +1,627 @@ +use anyhow::bail; +use rdkafka::Offset; +use rustls::pki_types::{CertificateDer, PrivateKeyDer}; +use rustls_pemfile::{self, Item}; +use serde::{Deserialize, Serialize}; +use std::fmt::Display; +use std::io::BufReader; +use std::sync::Arc; +use tracing::{debug, info}; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct KafkaConfig { + // Common configuration + pub bootstrap_servers: String, + topics: Vec, + pub client_id: Option, + + // Component-specific configurations + #[serde(skip_serializing_if = "Option::is_none")] + pub consumer: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub producer: Option, + + // Security and advanced settings + #[serde(skip_serializing_if = "Option::is_none")] + pub security: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ConsumerConfig { + // Consumer group configuration + pub group_id: String, + pub group_instance_id: Option, + pub partition_assignment_strategy: String, + + // Session handling + pub session_timeout_ms: u32, + pub heartbeat_interval_ms: u32, + pub max_poll_interval_ms: u32, + + // Offset management + pub enable_auto_commit: bool, + pub auto_commit_interval_ms: u32, + pub enable_auto_offset_store: bool, + pub auto_offset_reset: String, + + // Fetch configuration + pub fetch_min_bytes: u32, + pub fetch_max_bytes: u32, + pub fetch_max_wait_ms: u32, + pub max_partition_fetch_bytes: u32, + + // Queue configuration + pub queued_min_messages: u32, + pub queued_max_messages_kbytes: u32, + + // Processing configuration + pub enable_partition_eof: bool, + pub check_crcs: bool, + pub isolation_level: String, + pub fetch_message_max_bytes: String, + pub stats_interval_ms: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ProducerConfig { + pub acks: String, + pub compression_type: String, + pub batch_size: u32, + pub linger_ms: u32, + pub delivery_timeout_ms: u32, + pub max_in_flight_requests_per_connection: u32, + pub max_request_size: u32, + pub enable_idempotence: bool, + pub transaction_timeout_ms: Option, + pub queue_buffering_max_messages: u32, + queue_buffering_max_ms: u32, + retry_backoff_ms: u32, + batch_num_messages: u32, + retries: u32, +} + +impl Default for ConsumerConfig { + fn default() -> Self { + Self { + group_id: "default-cg".to_string(), + group_instance_id: Some("default-cg-ii".to_string()), + // NOTE: cooperative-sticky does not work well in rdkafka when using manual commit. 
+ // @see https://github.com/confluentinc/librdkafka/issues/4629 + // @see https://github.com/confluentinc/librdkafka/issues/4368 + partition_assignment_strategy: "roundrobin,range".to_string(), + session_timeout_ms: 60000, + heartbeat_interval_ms: 3000, + max_poll_interval_ms: 300000, + enable_auto_commit: false, + auto_commit_interval_ms: 5000, + enable_auto_offset_store: true, + auto_offset_reset: "earliest".to_string(), + fetch_min_bytes: 1, + fetch_max_bytes: 52428800, + fetch_max_wait_ms: 500, + max_partition_fetch_bytes: 1048576, + queued_min_messages: 100000, + queued_max_messages_kbytes: 65536, + enable_partition_eof: false, + check_crcs: false, + isolation_level: "read_committed".to_string(), + fetch_message_max_bytes: "1048576".to_string(), + stats_interval_ms: Some(10000), + } + } +} + +impl Default for ProducerConfig { + fn default() -> Self { + Self { + acks: "all".to_string(), + compression_type: "lz4".to_string(), + batch_size: 16384, // 16KB default batch size + linger_ms: 5, // Small latency for better batching + delivery_timeout_ms: 120000, // 2 minute delivery timeout + max_in_flight_requests_per_connection: 5, + max_request_size: 1048576, // 1MB max request size + enable_idempotence: true, // Ensure exactly-once delivery + transaction_timeout_ms: Some(60000), // 1 minute transaction timeout + queue_buffering_max_messages: 100000, // Producer queue size + queue_buffering_max_ms: 100, // Max time to wait before sending + retry_backoff_ms: 100, // Backoff time between retries + batch_num_messages: 10000, // Messages per batch + retries: 3, // Number of retries + } + } +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[warn(non_camel_case_types)] +#[serde(rename_all = "UPPERCASE")] +#[allow(non_camel_case_types)] +pub enum SecurityProtocol { + Plaintext, + SSL, + SASL_SSL, + SASL_PLAINTEXT, +} + +impl Display for SecurityProtocol { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let str = match self { + SecurityProtocol::Plaintext => "PLAINTEXT", + SecurityProtocol::SSL => "SSL", + SecurityProtocol::SASL_SSL => "SASL_SSL", + SecurityProtocol::SASL_PLAINTEXT => "SASL_PLAINTEXT", + } + .to_string(); + write!(f, "{}", str) + } +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct SecurityConfig { + pub protocol: SecurityProtocol, + #[serde(skip_serializing_if = "Option::is_none")] + pub ssl_config: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub sasl_config: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct SSLConfig { + pub ca_certificate_pem: String, + pub client_certificate_pem: String, + pub client_key_pem: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "UPPERCASE")] +pub enum SASLMechanism { + Plain, + ScramSha256, + ScramSha512, + GssAPI, +} + +impl Display for SASLMechanism { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let str = match self { + SASLMechanism::Plain => "PLAIN", + SASLMechanism::ScramSha256 => "SCRAM-SHA-256", + SASLMechanism::ScramSha512 => "SCRAM-SHA-512", + SASLMechanism::GssAPI => "GSSAPI", + } + .to_string(); + write!(f, "{}", str) + } +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct SASLConfig { + pub mechanism: SASLMechanism, + pub username: String, + pub password: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub kerberos_service_name: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub kerberos_principal: Option, + #[serde(skip_serializing_if = 
"Option::is_none")] + pub kerberos_keytab: Option, +} + +#[derive(Debug, Clone, Copy, Serialize, Deserialize)] +#[serde(rename_all = "lowercase")] +pub enum SourceOffset { + Earliest, + Latest, + Group, +} + +impl SourceOffset { + pub fn get_offset(&self) -> Offset { + match self { + SourceOffset::Earliest => Offset::Beginning, + SourceOffset::Latest => Offset::End, + SourceOffset::Group => Offset::Stored, + } + } +} + +impl KafkaConfig { + pub fn new( + bootstrap_servers: String, + topics: Vec, + consumer_config: Option, + ) -> Self { + Self { + bootstrap_servers, + topics, + client_id: None, + consumer: consumer_config, + producer: None, + security: None, + } + } + + pub fn consumer_config(&self) -> rdkafka::ClientConfig { + let mut config = rdkafka::ClientConfig::new(); + config.set("bootstrap.servers", &self.bootstrap_servers); + + if let Some(client_id) = &self.client_id { + config + .set("client.id", format!("parseable-{}-ci", client_id)) + .set("client.rack", format!("parseable-{}-cr", client_id)); + } + + if let Some(consumer) = &self.consumer { + let enable_auto_commit = consumer.enable_auto_commit.to_string(); + let group_id = format!("parseable-{}-gi", &consumer.group_id); + info!("Setting group.id to {}", group_id); + config + .set("group.id", group_id) + .set("log_level", "7") + .set("enable.auto.commit", enable_auto_commit) + .set( + "enable.auto.offset.store", + consumer.enable_auto_offset_store.to_string(), + ) + .set("auto.offset.reset", &consumer.auto_offset_reset) + .set( + "partition.assignment.strategy", + &consumer.partition_assignment_strategy, + ) + .set( + "session.timeout.ms", + consumer.session_timeout_ms.to_string(), + ) + .set( + "heartbeat.interval.ms", + consumer.heartbeat_interval_ms.to_string(), + ) + .set( + "max.poll.interval.ms", + consumer.max_poll_interval_ms.to_string(), + ) + .set("fetch.min.bytes", consumer.fetch_min_bytes.to_string()) + .set("fetch.max.bytes", consumer.fetch_max_bytes.to_string()) + .set( + "fetch.message.max.bytes", + consumer.fetch_message_max_bytes.to_string(), + ) + .set( + "max.partition.fetch.bytes", + consumer.max_partition_fetch_bytes.to_string(), + ) + .set( + "queued.min.messages", + consumer.queued_min_messages.to_string(), + ) + .set( + "queued.max.messages.kbytes", + consumer.queued_max_messages_kbytes.to_string(), + ) + .set( + "enable.partition.eof", + consumer.enable_partition_eof.to_string(), + ) + .set("isolation.level", &consumer.isolation_level) + .set( + "statistics.interval.ms", + consumer.stats_interval_ms.unwrap_or(10000).to_string(), + ); + + if let Some(instance_id) = &consumer.group_instance_id { + config.set("group.instance.id", instance_id); + } + } + + self.apply_security_config(&mut config); + + info!("Consumer configuration: {:?}", config); + config + } + + pub fn producer_config(&self) -> rdkafka::config::ClientConfig { + let mut config = rdkafka::config::ClientConfig::new(); + config.set("bootstrap.servers", &self.bootstrap_servers); + + if let Some(client_id) = &self.client_id { + config + .set("client.id", format!("parseable-{}-ci", client_id)) + .set("client.rack", format!("parseable-{}-cr", client_id)); + } + + if let Some(producer_config) = &self.producer { + config + .set("acks", &producer_config.acks) + .set("compression.type", &producer_config.compression_type) + .set("batch.size", producer_config.batch_size.to_string()) + .set("linger.ms", producer_config.linger_ms.to_string()) + .set( + "delivery.timeout.ms", + producer_config.delivery_timeout_ms.to_string(), + ) + .set( + 
"max.in.flight.requests.per.connection", + producer_config + .max_in_flight_requests_per_connection + .to_string(), + ) + .set( + "max.request.size", + producer_config.max_request_size.to_string(), + ) + .set( + "enable.idempotence", + producer_config.enable_idempotence.to_string(), + ) + .set( + "batch.num.messages", + producer_config.batch_num_messages.to_string(), + ) + .set( + "queue.buffering.max.messages", + producer_config.queue_buffering_max_messages.to_string(), + ) + .set( + "queue.buffering.max.ms", + producer_config.queue_buffering_max_ms.to_string(), + ) + .set( + "retry.backoff.ms", + producer_config.retry_backoff_ms.to_string(), + ) + .set("retries", producer_config.retries.to_string()); + + if let Some(timeout) = producer_config.transaction_timeout_ms { + config.set("transaction.timeout.ms", timeout.to_string()); + } + } + + self.apply_security_config(&mut config); + + config + } + + fn apply_security_config(&self, config: &mut rdkafka::ClientConfig) { + let security = match &self.security { + Some(sec) => sec, + None => { + debug!("No security configuration provided, using PLAINTEXT"); + config.set("security.protocol", "plaintext"); + return; + } + }; + + config.set( + "security.protocol", + security.protocol.to_string().to_lowercase(), + ); + + if matches!( + security.protocol, + SecurityProtocol::SSL | SecurityProtocol::SASL_SSL + ) { + if let Some(ssl) = &security.ssl_config { + debug!("Applying SSL configuration"); + config + .set("ssl.ca.pem", &ssl.ca_certificate_pem) + .set("ssl.certificate.pem", &ssl.client_certificate_pem) + .set("ssl.key.pem", &ssl.client_key_pem); + } else { + panic!( + "SSL configuration required for {:?} protocol", + security.protocol + ); + } + } + + if matches!( + security.protocol, + SecurityProtocol::SASL_SSL | SecurityProtocol::SASL_PLAINTEXT + ) { + if let Some(sasl) = &security.sasl_config { + debug!( + "Applying SASL configuration with mechanism: {}", + sasl.mechanism.to_string() + ); + config + .set("sasl.mechanism", sasl.mechanism.to_string()) + .set("sasl.username", &sasl.username) + .set("sasl.password", &sasl.password); + + // Apply Kerberos-specific configuration if using GSSAPI + if matches!(sasl.mechanism, SASLMechanism::GssAPI) { + if let Some(service_name) = &sasl.kerberos_service_name { + config.set("sasl.kerberos.service.name", service_name); + } + if let Some(principal) = &sasl.kerberos_principal { + config.set("sasl.kerberos.principal", principal); + } + if let Some(keytab) = &sasl.kerberos_keytab { + config.set("sasl.kerberos.keytab", keytab); + } + } + } else { + panic!( + "SASL configuration required for {:?} protocol", + security.protocol + ); + } + } + } +} +impl Default for KafkaConfig { + fn default() -> Self { + Self { + // Common configuration with standard broker port + bootstrap_servers: "localhost:9092".to_string(), + topics: vec![], + client_id: None, // Let Kafka generate a unique client ID if not specified + + // Component-specific configurations with production-ready defaults + consumer: Some(ConsumerConfig::default()), + producer: Some(ProducerConfig::default()), + + // Security defaults to plaintext for development + // Production environments should explicitly configure security + security: Some(SecurityConfig { + protocol: SecurityProtocol::Plaintext, + ssl_config: None, + sasl_config: None, + }), + } + } +} + +impl KafkaConfig { + pub fn builder() -> KafkaConfigBuilder { + KafkaConfigBuilder::default() + } + + pub fn topics(&self) -> Vec<&str> { + self.topics.iter().map(|s| s.as_str()).collect() + } +} 
+ +#[derive(Default, Debug)] +pub struct KafkaConfigBuilder { + config: KafkaConfig, +} + +impl KafkaConfigBuilder { + pub fn bootstrap_servers(mut self, servers: impl Into) -> Self { + self.config.bootstrap_servers = servers.into(); + self + } + + pub fn topic(mut self, topics: Vec) -> Self { + self.config.topics = topics; + self + } + + pub fn client_id(mut self, client_id: impl Into) -> Self { + self.config.client_id = Some(client_id.into()); + self + } + + pub fn with_consumer(mut self, consumer: ConsumerConfig) -> Self { + self.config.consumer = Some(consumer); + self + } + + pub fn with_producer(mut self, producer: ProducerConfig) -> Self { + self.config.producer = Some(producer); + self + } + + pub fn with_security(mut self, security: SecurityConfig) -> Self { + self.config.security = Some(security); + self + } + + pub fn build(self) -> anyhow::Result { + let config = self.config; + + if config.bootstrap_servers.is_empty() { + anyhow::bail!("bootstrap_servers cannot be empty"); + } + + Ok(config) + } +} + +#[derive(Debug, Clone)] +#[allow(dead_code)] +pub struct KafkaCertificates { + ca_certificate: Arc>, + client_certificate: Arc>, + client_key: Arc>, +} + +#[allow(dead_code)] +fn parse_first_certificate(pem: &str) -> anyhow::Result> { + let mut reader = BufReader::new(pem.as_bytes()); + let items = rustls_pemfile::read_all(&mut reader); + + for item in items.flatten() { + if let Item::X509Certificate(cert_data) = item { + return Ok(cert_data); + } + } + bail!("No certificate found in PEM") +} + +#[allow(dead_code)] +fn parse_first_private_key(pem: &str) -> anyhow::Result> { + let mut reader = BufReader::new(pem.as_bytes()); + let items = rustls_pemfile::read_all(&mut reader); + + for item in items { + if let Ok(Item::Pkcs1Key(key_data)) = item { + return Ok(key_data.into()); + } + if let Ok(Item::Pkcs8Key(key_data)) = item { + return Ok(key_data.into()); + } + if let Ok(Item::Sec1Key(key_data)) = item { + return Ok(key_data.into()); + } + } + + bail!("No private key found in PEM") +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_consumer_config() { + let consumer_config = ConsumerConfig { + group_id: "test-group".to_string(), + partition_assignment_strategy: "cooperative-sticky".to_string(), + ..ConsumerConfig::default() + }; + + let config = KafkaConfig::new( + "localhost:9092".to_string(), + vec!["test-topic".to_string()], + Some(consumer_config), + ); + + let rdkafka_config = config.consumer_config(); + assert_eq!(rdkafka_config.get("group.id"), Some("test-group")); + assert_eq!( + rdkafka_config.get("partition.assignment.strategy"), + Some("cooperative-sticky") + ); + } + + #[test] + fn test_default_kafka_config() { + let config = KafkaConfig::default(); + assert_eq!(config.bootstrap_servers, "localhost:9092"); + assert!(config.topics.is_empty()); + assert!(config.consumer.is_some()); + assert!(config.producer.is_some()); + + if let Some(producer) = config.producer { + assert_eq!(producer.acks, "all"); + assert!(producer.enable_idempotence); + assert_eq!(producer.compression_type, "snappy"); + } + } + + #[test] + fn test_kafka_config_builder() { + let config = KafkaConfig::builder() + .bootstrap_servers("kafka1:9092,kafka2:9092") + .topic(vec!["test-topic".to_string()]) + .client_id("test-client") + .build() + .unwrap(); + + assert_eq!(config.bootstrap_servers, "kafka1:9092,kafka2:9092"); + assert_eq!(config.topics.first().unwrap(), "test-topic"); + assert_eq!(config.client_id, Some("test-client".to_string())); + } +} diff --git 
a/src/connectors/kafka/consumer.rs b/src/connectors/kafka/consumer.rs new file mode 100644 index 000000000..529bf40ee --- /dev/null +++ b/src/connectors/kafka/consumer.rs @@ -0,0 +1,169 @@ +use crate::connectors::common::shutdown::Shutdown; +use crate::connectors::kafka::partition_stream_queue::PartitionStreamReceiver; +use crate::connectors::kafka::state::StreamState; +use crate::connectors::kafka::{ + partition_stream_queue, ConsumerRecord, KafkaContext, StreamConsumer, TopicPartition, +}; +use futures_util::FutureExt; +use rdkafka::consumer::Consumer; +use rdkafka::Statistics; +use std::sync::Arc; +use tokio::sync::{mpsc, RwLock}; +use tokio_stream::wrappers::ReceiverStream; +use tracing::{error, info}; + +pub struct KafkaStreams { + consumer: Arc, + stream_state: Arc>, + statistics: Arc>, + shutdown_handle: Shutdown, +} + +impl KafkaStreams { + pub fn init( + context: KafkaContext, + stream_state: Arc>, + shutdown_handle: Shutdown, + ) -> anyhow::Result { + info!("Initializing KafkaStreams..."); + let consumer = KafkaStreams::create_consumer(context); + let statistics = Arc::new(std::sync::RwLock::new(Statistics::default())); + info!("KafkaStreams initialized successfully."); + + Ok(Self { + consumer, + stream_state, + statistics, + shutdown_handle, + }) + } + + pub fn consumer(&self) -> Arc { + Arc::clone(&self.consumer) + } + + pub fn statistics(&self) -> Arc> { + Arc::clone(&self.statistics) + } + + pub fn state(&self) -> Arc> { + Arc::clone(&self.stream_state) + } + + /// Manages Kafka partition streams manually due to limitations in `rust-rdkafka`'s `split_partition_queue`. + /// + /// This method continuously listens incoming Kafka messages, dynamically creating + /// or updating streams for each partition. It is implemented using a separate standard thread to avoid + /// potential deadlocks and long-running task issues encountered with `tokio::spawn`. + /// + /// Steps: + /// 1. Consumes Kafka messages in a loop, processes each message to identify the associated partition. + /// 2. Dynamically creates a new stream for untracked partitions, allowing for isolated processing. + /// 3. Updates existing streams when new messages arrive for already tracked partitions. + /// 4. Listens for shutdown signals and gracefully terminates all partition streams, unsubscribing the consumer. + /// + /// Limitations and References: + /// - Issues with `split_partition_queue` in rust-rdkafka: + /// - https://github.com/fede1024/rust-rdkafka/issues/535 + /// - https://github.com/confluentinc/librdkafka/issues/4059 + /// - https://github.com/fede1024/rust-rdkafka/issues/535 + /// - https://github.com/confluentinc/librdkafka/issues/4059 + /// - https://github.com/fede1024/rust-rdkafka/issues/654 + /// - https://github.com/fede1024/rust-rdkafka/issues/651 + /// - https://github.com/fede1024/rust-rdkafka/issues/604 + /// - https://github.com/fede1024/rust-rdkafka/issues/564 + /// + /// - Potential deadlocks and long-running task issues with `tokio::spawn`: + /// - Details on blocking vs. async design choices: + /// - https://ryhl.io/blog/async-what-is-blocking/ + /// + /// Returns: + /// A `ReceiverStream` that produces `PartitionStreamReceiver` for each active partition. 
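+    ///
+    /// Illustrative usage (a sketch, not from the original patch; `kafka_streams` is assumed
+    /// to be an initialized `KafkaStreams`):
+    ///
+    /// ```ignore
+    /// use tokio_stream::StreamExt;
+    ///
+    /// let mut partitions = kafka_streams.partitioned();
+    /// while let Some(partition_rx) = partitions.next().await {
+    ///     partition_rx
+    ///         .run_drain(|mut records| async move {
+    ///             while let Some(record) = records.next().await {
+    ///                 // hand each ConsumerRecord to a Processor here
+    ///                 let _ = record;
+    ///             }
+    ///         })
+    ///         .await;
+    /// }
+    /// ```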
+ pub fn partitioned(&self) -> ReceiverStream { + let (stream_tx, stream_rx) = mpsc::channel(100); + let consumer = self.consumer(); + let stream_state = self.state(); + let tokio_handle = tokio::runtime::Handle::current(); + let shutdown_handle = self.shutdown_handle.clone(); + + std::thread::spawn(move || { + tokio_handle.block_on(async move { + loop { + tokio::select! { + result = consumer.recv() => { + match result { + Ok(msg) => { + let mut state = stream_state.write().await; + let tp = TopicPartition::from_kafka_msg(&msg); + let consumer_record = ConsumerRecord::from_borrowed_msg(msg); + let ps_tx = match state.get_partition_sender(&tp) { + Some(ps_tx) => ps_tx.clone(), + None => { + info!("Creating new stream for {:?}", tp); + let (ps_tx, ps_rx) = partition_stream_queue::bounded(10_000, tp.clone()); + state.insert_partition_sender(tp.clone(), ps_tx.clone()); + stream_tx.send(ps_rx).await.unwrap(); + ps_tx + } + }; + ps_tx.send(consumer_record).await; + } + Err(err) => { + error!("Cannot get message from kafka consumer! Cause {:?}", err); + break + }, + }; + }, + _ = shutdown_handle.recv() => { + info!("Gracefully stopping kafka partition streams!"); + let mut stream_state = stream_state.write().await; + stream_state.clear(); + consumer.unsubscribe(); + break; + }, + else => { + error!("KafkaStreams terminated!"); + break; + } + } + } + }) + }); + + ReceiverStream::new(stream_rx) + } + + fn create_consumer(context: KafkaContext) -> Arc { + info!("Creating Kafka consumer from configs {:#?}", context.config); + + let kafka_config = &context.config; + let consumer_config = kafka_config.consumer_config(); + info!("Consumer configs: {:#?}", &consumer_config); + + let consumer: StreamConsumer = consumer_config + .create_with_context(context.clone()) + .expect("Consumer creation failed"); + + if consumer.recv().now_or_never().is_some() { + panic!("Consumer should not have any messages"); + } + + let consumer = Arc::new(consumer); + + let topics = &kafka_config.topics(); + KafkaStreams::subscribe(&consumer, topics); + + consumer + } + + fn subscribe(consumer: &Arc, topics: &Vec<&str>) { + match consumer.subscribe(topics) { + Ok(_) => { + info!("Subscribed to topics: {:?}", topics); + } + Err(e) => { + error!("Error subscribing to topics: {:?} {:?}", topics, e); + } + }; + } +} diff --git a/src/connectors/kafka/metrics.rs b/src/connectors/kafka/metrics.rs new file mode 100644 index 000000000..3a7ea7e24 --- /dev/null +++ b/src/connectors/kafka/metrics.rs @@ -0,0 +1,37 @@ +use prometheus::core::{Collector, Desc}; +use prometheus::proto::MetricFamily; +use rdkafka::Statistics; +use std::sync::{Arc, RwLock}; + +#[derive(Debug)] +pub struct KafkaConsumerMetricsCollector { + stats: Arc>, +} + +impl KafkaConsumerMetricsCollector { + pub fn new(stats: Arc>) -> Self { + Self { stats } + } + + pub fn statistics(&self) -> Result { + match self.stats.read() { + Ok(stats) => Ok(stats.clone()), + Err(err) => Err(format!( + "Cannot get kafka statistics from RwLock. 
Error: {}", + err + )), + } + } +} + +impl Collector for KafkaConsumerMetricsCollector { + fn desc(&self) -> Vec<&Desc> { + //TODO: + vec![] + } + + fn collect(&self) -> Vec { + //TODO: encode metrics + vec![] + } +} diff --git a/src/connectors/kafka/mod.rs b/src/connectors/kafka/mod.rs new file mode 100644 index 000000000..132d8dc98 --- /dev/null +++ b/src/connectors/kafka/mod.rs @@ -0,0 +1,231 @@ +use crate::connectors::kafka::config::KafkaConfig; +use derive_more::Constructor; +use rdkafka::consumer::{ConsumerContext, Rebalance}; +use rdkafka::error::KafkaResult; +use rdkafka::message::{BorrowedMessage, Headers}; +use rdkafka::producer::ProducerContext; +use rdkafka::topic_partition_list::TopicPartitionListElem; +use rdkafka::{ClientContext, Message, Offset, Statistics}; +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; +use std::sync::{Arc, RwLock}; +use tokio::sync::mpsc; +use tokio::sync::mpsc::Receiver; +use tracing::{error, info, warn}; + +pub mod config; +pub mod consumer; +pub mod metrics; +mod partition_stream_queue; +pub mod processor; +pub mod rebalance_listener; +pub mod sink; +pub mod state; +#[allow(dead_code)] +type BaseConsumer = rdkafka::consumer::BaseConsumer; +#[allow(dead_code)] +type FutureProducer = rdkafka::producer::FutureProducer; +type StreamConsumer = rdkafka::consumer::StreamConsumer; + +#[derive(Clone, Debug)] +pub struct KafkaContext { + config: Arc, + statistics: Arc>, + rebalance_tx: mpsc::Sender, +} + +impl KafkaContext { + pub fn new(config: Arc) -> (Self, Receiver) { + let (rebalance_tx, rebalance_rx) = mpsc::channel(10); + let statistics = Arc::new(RwLock::new(Statistics::default())); + ( + Self { + config, + statistics, + rebalance_tx, + }, + rebalance_rx, + ) + } + + pub fn notify(&self, rebalance_event: RebalanceEvent) { + let rebalance_sender = self.rebalance_tx.clone(); + std::thread::spawn(move || { + info!("Sending RebalanceEvent to listener..."); + if rebalance_sender.blocking_send(rebalance_event).is_err() { + warn!("Rebalance event receiver is closed!"); + } + info!("Sent RebalanceEvent to lister."); + }); + } + + pub fn config(&self) -> Arc { + Arc::clone(&self.config) + } +} + +#[derive(Debug, Clone)] +pub enum RebalanceEvent { + Assign(TopicPartitionList), + Revoke(TopicPartitionList, std::sync::mpsc::Sender<()>), +} + +impl RebalanceEvent { + pub fn get_assignment(&self) -> &TopicPartitionList { + match self { + RebalanceEvent::Assign(tpl) => tpl, + RebalanceEvent::Revoke(tpl, _) => tpl, + } + } +} + +#[derive(Constructor, Serialize, Deserialize, Debug, Default, Clone, PartialEq, Eq, Hash)] +pub struct TopicPartition { + pub topic: String, + pub partition: i32, +} + +impl TopicPartition { + pub fn from_kafka_msg(msg: &BorrowedMessage) -> Self { + Self::new(msg.topic().to_owned(), msg.partition()) + } + + pub fn from_tp_elem(elem: &TopicPartitionListElem<'_>) -> Self { + Self::new(elem.topic().to_owned(), elem.partition()) + } +} + +#[derive(Constructor, Serialize, Deserialize, Debug, Default, Clone, PartialEq, Eq, Hash)] +pub struct TopicPartitionList { + pub tpl: Vec, +} + +impl TopicPartitionList { + pub fn from_rdkafka_tpl(tpl: &rdkafka::topic_partition_list::TopicPartitionList) -> Self { + let elements = tpl.elements(); + let mut tp_vec = Vec::with_capacity(elements.len()); + for ref element in elements { + let tp = TopicPartition::from_tp_elem(element); + tp_vec.push(tp); + } + Self::new(tp_vec) + } + + pub fn is_empty(&self) -> bool { + self.tpl.is_empty() + } +} + +#[derive(Constructor, Debug, Hash, Eq, 
PartialEq)] +pub struct ConsumerRecord { + pub payload: Option>, + pub key: Option>, + pub topic: String, + pub partition: i32, + pub offset: i64, + pub timestamp: Option, + //pub headers: Option>>, +} + +impl ConsumerRecord { + pub fn from_borrowed_msg(msg: BorrowedMessage) -> Self { + Self { + key: msg.key().map(|k| k.to_vec()), + payload: msg.payload().map(|p| p.to_vec()), + topic: msg.topic().to_owned(), + partition: msg.partition(), + offset: msg.offset(), + timestamp: msg.timestamp().to_millis(), + //headers: extract_headers(&msg), + } + } + + pub fn key_str(&self) -> String { + self.key + .clone() + .map(|k| String::from_utf8_lossy(k.as_ref()).to_string()) + .unwrap_or_else(|| String::from("null")) + } + + pub fn offset_to_commit(&self) -> KafkaResult { + let mut offset_to_commit = rdkafka::TopicPartitionList::new(); + offset_to_commit.add_partition_offset( + &self.topic, + self.partition, + Offset::Offset(self.offset + 1), + )?; + Ok(offset_to_commit) + } +} + +#[allow(unused)] +fn extract_headers(msg: &BorrowedMessage<'_>) -> Option>> { + msg.headers().map(|headers| { + headers + .iter() + .map(|header| { + ( + header.key.to_string(), + header.value.map(|v| String::from_utf8_lossy(v).to_string()), + ) + }) + .collect() + }) +} + +impl ConsumerContext for KafkaContext { + fn pre_rebalance( + &self, + _base_consumer: &rdkafka::consumer::BaseConsumer, + rebalance: &Rebalance<'_>, + ) { + info!("Running pre-rebalance with {:?}", rebalance); + match rebalance { + Rebalance::Revoke(tpl) => { + let (pq_waiter_tx, pq_waiter_rx) = std::sync::mpsc::channel(); + if pq_waiter_rx.recv().is_err() { + warn!("Queue termination sender dropped"); + } + let tpl = TopicPartitionList::from_rdkafka_tpl(tpl); + self.notify(RebalanceEvent::Revoke(tpl, pq_waiter_tx)); + } + Rebalance::Assign(tpl) => { + let tpl = TopicPartitionList::from_rdkafka_tpl(tpl); + self.notify(RebalanceEvent::Assign(tpl)); + } + + Rebalance::Error(err) => error!("Error occurred during rebalance {:?}", err), + }; + } + + fn post_rebalance( + &self, + _base_consumer: &rdkafka::consumer::BaseConsumer, + rebalance: &Rebalance<'_>, + ) { + info!("Running post-rebalance with {:?}", rebalance); + } +} + +impl ProducerContext for KafkaContext { + type DeliveryOpaque = (); + fn delivery( + &self, + _delivery_result: &rdkafka::message::DeliveryResult<'_>, + _delivery_opaque: Self::DeliveryOpaque, + ) { + } +} + +impl ClientContext for KafkaContext { + fn stats(&self, new_stats: Statistics) { + match self.statistics.write() { + Ok(mut stats) => { + *stats = new_stats; + } + Err(e) => { + error!("Cannot write to kafka statistics from RwLock. 
Error: {}", e) + } + }; + } +} diff --git a/src/connectors/kafka/partition_stream_queue.rs b/src/connectors/kafka/partition_stream_queue.rs new file mode 100644 index 000000000..732ede282 --- /dev/null +++ b/src/connectors/kafka/partition_stream_queue.rs @@ -0,0 +1,90 @@ +use crate::connectors::kafka::{ConsumerRecord, TopicPartition}; +use std::sync::Arc; +use tokio::sync::{mpsc, Notify}; +use tokio_stream::wrappers::ReceiverStream; +use tracing::info; + +#[derive(Clone)] +pub struct PartitionStreamSender { + inner: mpsc::Sender, + notify: Arc, +} + +impl PartitionStreamSender { + fn new(inner: mpsc::Sender, notify: Arc) -> Self { + Self { inner, notify } + } + + pub fn terminate(&self) { + self.notify.notify_waiters(); + } + + pub async fn send(&self, consumer_record: ConsumerRecord) { + self.inner.send(consumer_record).await.unwrap(); + } + + pub fn sender(&self) -> mpsc::Sender { + self.inner.clone() + } +} + +pub struct PartitionStreamReceiver { + inner: ReceiverStream, + topic_partition: TopicPartition, + notify: Arc, +} + +impl PartitionStreamReceiver { + fn new( + receiver: mpsc::Receiver, + topic_partition: TopicPartition, + notify: Arc, + ) -> Self { + Self { + inner: ReceiverStream::new(receiver), + topic_partition, + notify, + } + } + + /// Processes the stream with a provided callback and listens for termination. + /// + /// # Parameters + /// - `invoke`: A callback function that processes the `ReceiverStream`. + /// + /// # Behavior + /// - The callback runs until either the stream is completed or a termination signal is received. + pub async fn run_drain(self, f: F) + where + F: Fn(ReceiverStream) -> Fut, + Fut: futures_util::Future, + { + let notify = self.notify.clone(); + + tokio::select! { + _ = f(self.inner) => { + info!("PartitionStreamReceiver completed processing for {:?}.", self.topic_partition); + } + _ = notify.notified() => { + info!("Received termination signal for {:?}.", self.topic_partition); + } + } + } + + pub fn topic_partition(&self) -> &TopicPartition { + &self.topic_partition + } +} + +pub fn bounded( + size: usize, + topic_partition: TopicPartition, +) -> (PartitionStreamSender, PartitionStreamReceiver) { + let (tx, rx) = mpsc::channel(size); + let notify = Arc::new(Notify::new()); + + let sender = PartitionStreamSender::new(tx, notify.clone()); + let receiver = PartitionStreamReceiver::new(rx, topic_partition, notify); + + (sender, receiver) +} diff --git a/src/connectors/kafka/processor.rs b/src/connectors/kafka/processor.rs new file mode 100644 index 000000000..129ff7cd9 --- /dev/null +++ b/src/connectors/kafka/processor.rs @@ -0,0 +1,154 @@ +use crate::connectors::common::processor::Processor; +use crate::connectors::kafka::{ConsumerRecord, StreamConsumer, TopicPartition}; +use crate::event::format; +use crate::event::format::EventFormat; +use crate::handlers::http::ingest::create_stream_if_not_exists; +use crate::metadata::STREAM_INFO; +use crate::storage::StreamType; +use async_trait::async_trait; +use chrono::Utc; +use futures_util::StreamExt; +use rdkafka::consumer::{CommitMode, Consumer}; +use serde_json::Value; +use std::collections::HashMap; +use std::sync::Arc; +use std::time::Duration; +use tokio_stream::wrappers::ReceiverStream; +use tracing::{debug, warn}; +use tracing::{error, info}; + +#[derive(Default, Debug, Clone)] +pub struct ParseableSinkProcessor; + +impl ParseableSinkProcessor { + async fn deserialize( + &self, + consumer_record: &ConsumerRecord, + ) -> anyhow::Result> { + let stream_name = consumer_record.topic.as_str(); + + 
create_stream_if_not_exists(stream_name, &StreamType::UserDefined.to_string()).await?; + let schema = STREAM_INFO.schema_raw(stream_name)?; + + match &consumer_record.payload { + None => { + warn!( + "Skipping tombstone or empty payload in partition {} key {}", + consumer_record.partition, + consumer_record.key_str() + ); + Ok(None) + } + Some(payload) => { + let data: Value = serde_json::from_slice(payload.as_ref())?; + + let event = format::json::Event { + data, + tags: String::default(), + metadata: String::default(), + }; + + // TODO: Implement a buffer (e.g., a wrapper around [Box]) to optimize the creation of ParseableEvent by compacting the internal RecordBatch. + let (record_batch, is_first) = event.into_recordbatch(schema, None, None)?; + + let p_event = crate::event::Event { + rb: record_batch, + stream_name: stream_name.to_string(), + origin_format: "json", + origin_size: payload.len() as u64, + is_first_event: is_first, + parsed_timestamp: Utc::now().naive_utc(), + time_partition: None, + custom_partition_values: HashMap::new(), + stream_type: StreamType::UserDefined, + }; + + Ok(Some(p_event)) + } + } + } +} + +#[async_trait] +impl Processor, ()> for ParseableSinkProcessor { + async fn process(&self, records: Vec) -> anyhow::Result<()> { + let len = records.len(); + debug!("Processing {} records", len); + + for cr in records { + if let Some(event) = self.deserialize(&cr).await? { + event.process().await?; + } + } + + debug!("Processed {} records", len); + Ok(()) + } +} + +#[derive(Clone)] +pub struct StreamWorker
<P> +where + P: Processor<Vec<ConsumerRecord>, ()>, +{ + processor: Arc<P>, + consumer: Arc<StreamConsumer>, + buffer_size: usize, + buffer_timeout: Duration, +} + +impl<P> StreamWorker<P> +where + P: Processor<Vec<ConsumerRecord>, ()> + Send + Sync + 'static, +{ + pub fn new( + processor: Arc<P>
, + consumer: Arc, + buffer_size: usize, + buffer_timeout: Duration, + ) -> Self { + Self { + processor, + consumer, + buffer_size, + buffer_timeout, + } + } + + pub async fn process_partition( + &self, + tp: TopicPartition, + record_stream: ReceiverStream, + ) -> anyhow::Result<()> { + info!("Started processing stream for {:?}", tp); + let chunked_stream = tokio_stream::StreamExt::chunks_timeout( + record_stream, + self.buffer_size, + self.buffer_timeout, + ); + + chunked_stream + .for_each_concurrent(None, |records| async { + if let Some(last_record) = records.iter().max_by_key(|r| r.offset) { + let tpl = last_record.offset_to_commit().unwrap(); + + if let Err(e) = self.processor.process(records).await { + error!("Failed to process records for {:?}: {:?}", tp, e); + } + + //CommitMode::Async race condition. + //@see https://github.com/confluentinc/librdkafka/issues/4534 + //@see https://github.com/confluentinc/librdkafka/issues/4059 + if let Err(e) = self.consumer.commit(&tpl, CommitMode::Sync) { + error!("Failed to commit offsets for {:?}: {:?}", tp, e); + } + } + }) + .await; + + info!("Finished processing stream for {:?}", tp); + self.processor.post_stream().await?; + + Ok(()) + } +} diff --git a/src/connectors/kafka/rebalance_listener.rs b/src/connectors/kafka/rebalance_listener.rs new file mode 100644 index 000000000..f35a4062f --- /dev/null +++ b/src/connectors/kafka/rebalance_listener.rs @@ -0,0 +1,65 @@ +use crate::connectors::common::shutdown::Shutdown; +use crate::connectors::kafka::state::StreamState; +use crate::connectors::kafka::RebalanceEvent; +use std::sync::Arc; +use tokio::sync::RwLock; +use tokio::{runtime::Handle, sync::mpsc::Receiver}; +use tracing::{info, warn}; + +pub struct RebalanceListener { + rebalance_rx: Receiver, + stream_state: Arc>, + shutdown_handle: Shutdown, +} + +impl RebalanceListener { + pub fn new( + rebalance_rx: Receiver, + stream_state: Arc>, + shutdown_handle: Shutdown, + ) -> Self { + Self { + rebalance_rx, + stream_state, + shutdown_handle, + } + } + + pub fn start(self) { + let mut rebalance_receiver = self.rebalance_rx; + let stream_state = self.stream_state.clone(); + let shutdown_handle = self.shutdown_handle.clone(); + let tokio_runtime_handle = Handle::current(); + + std::thread::spawn(move || { + tokio_runtime_handle.block_on(async move { + loop { + tokio::select! { + rebalance = rebalance_receiver.recv() => { + match rebalance { + Some(RebalanceEvent::Assign(tpl)) => info!("RebalanceEvent Assign: {:?}", tpl), + Some(RebalanceEvent::Revoke(tpl, callback)) => { + info!("RebalanceEvent Revoke: {:?}", tpl); + let mut stream_state = stream_state.write().await; + stream_state.terminate_partition_streams(tpl).await; + if let Err(err) = callback.send(()) { + warn!("Error during sending response to context. 
Cause: {:?}", err); + } + info!("Finished Rebalance Revoke"); + } + None => { + info!("Rebalance event sender is closed!"); + break + } + } + }, + _ = shutdown_handle.recv() => { + info!("Gracefully stopping rebalance listener!"); + break; + }, + } + } + }) + }); + } +} diff --git a/src/connectors/kafka/sink.rs b/src/connectors/kafka/sink.rs new file mode 100644 index 000000000..e9a57e997 --- /dev/null +++ b/src/connectors/kafka/sink.rs @@ -0,0 +1,68 @@ +use crate::connectors::common::processor::Processor; +use crate::connectors::kafka::consumer::KafkaStreams; +use crate::connectors::kafka::processor::StreamWorker; +use crate::connectors::kafka::ConsumerRecord; +use anyhow::Result; +use futures_util::StreamExt; +use std::sync::Arc; +use tokio::time::Duration; +use tracing::error; + +pub struct KafkaSinkConnector
<P> +where + P: Processor<Vec<ConsumerRecord>, ()>, +{ + kafka_streams: KafkaStreams, + worker: Arc<StreamWorker<P>>, +} + +impl<P> KafkaSinkConnector<P>
+where + P: Processor, ()> + Send + Sync + 'static, +{ + pub fn new( + kafka_streams: KafkaStreams, + processor: P, + buffer_size: usize, + buffer_timeout: Duration, + ) -> Self { + let worker = Arc::new(StreamWorker::new( + Arc::new(processor), + kafka_streams.consumer(), + buffer_size, + buffer_timeout, + )); + + Self { + kafka_streams, + worker, + } + } + + pub async fn run(self) -> Result<()> { + self.kafka_streams + .partitioned() + .map(|partition_queue| { + let worker = Arc::clone(&self.worker); + let tp = partition_queue.topic_partition().clone(); + tokio::spawn(async move { + partition_queue + .run_drain(|record_stream| async { + worker + .process_partition(tp.clone(), record_stream) + .await + .unwrap(); + }) + .await + }) + }) + .for_each_concurrent(None, |task| async { + if let Err(e) = task.await { + error!("Task failed: {:?}", e); + } + }) + .await; + + Ok(()) + } +} diff --git a/src/connectors/kafka/state.rs b/src/connectors/kafka/state.rs new file mode 100644 index 000000000..471b4dd34 --- /dev/null +++ b/src/connectors/kafka/state.rs @@ -0,0 +1,50 @@ +use crate::connectors::kafka::partition_stream_queue::PartitionStreamSender; +use crate::connectors::kafka::{TopicPartition, TopicPartitionList}; +use std::collections::HashMap; +use tracing::info; + +pub struct StreamState { + partition_senders: HashMap, +} + +impl StreamState { + pub fn new(capacity: usize) -> Self { + Self { + partition_senders: HashMap::with_capacity(capacity), + } + } + + pub fn insert_partition_sender( + &mut self, + tp: TopicPartition, + sender: PartitionStreamSender, + ) -> Option { + self.partition_senders.insert(tp, sender) + } + + pub fn get_partition_sender(&self, tp: &TopicPartition) -> Option<&PartitionStreamSender> { + self.partition_senders.get(tp) + } + + pub async fn terminate_partition_streams(&mut self, tpl: TopicPartitionList) { + info!("Terminating streams: {:?}", tpl); + + for tp in tpl.tpl { + if let Some(sender) = self.partition_senders.remove(&tp) { + info!("Terminating stream for {:?}", tp); + drop(sender.sender()); + sender.terminate(); + info!("Waiting for stream to finish for {:?}", tp); + } else { + info!("Stream already completed for {:?}", tp); + } + } + + info!("All streams terminated!"); + } + + pub fn clear(&mut self) { + info!("Clearing all stream states..."); + self.partition_senders.clear(); + } +} diff --git a/src/connectors/mod.rs b/src/connectors/mod.rs new file mode 100644 index 000000000..6f31175de --- /dev/null +++ b/src/connectors/mod.rs @@ -0,0 +1,85 @@ +use crate::connectors::common::config::ConnectorConfig; +use crate::connectors::common::processor::Processor; +use crate::connectors::common::shutdown::Shutdown; +use crate::connectors::kafka::consumer::KafkaStreams; +use crate::connectors::kafka::metrics::KafkaConsumerMetricsCollector; +use crate::connectors::kafka::processor::ParseableSinkProcessor; +use crate::connectors::kafka::rebalance_listener::RebalanceListener; +use crate::connectors::kafka::sink::KafkaSinkConnector; +use crate::connectors::kafka::state::StreamState; +use crate::connectors::kafka::{ConsumerRecord, KafkaContext}; +use crate::metrics; +use crate::option::{Mode, CONFIG}; +use prometheus::Registry; +use std::sync::Arc; +use tokio::sync::RwLock; +use tracing::{info, warn}; + +pub mod common; +pub mod kafka; + +pub async fn init() -> anyhow::Result<()> { + if matches!(CONFIG.parseable.mode, Mode::Ingest | Mode::All) { + match CONFIG.parseable.connector_config.clone() { + Some(config) => { + let shutdown_handle = Shutdown::new(); + let 
prometheus = metrics::build_metrics_handler(); + let registry = prometheus.registry.clone(); + let processor = ParseableSinkProcessor; + + tokio::spawn({ + let shutdown_handle = shutdown_handle.clone(); + async move { + shutdown_handle.signal_listener().await; + info!("Connector received shutdown signal!"); + } + }); + + run_kafka2parseable(config, registry, processor, shutdown_handle).await? + } + None => { + warn!("Kafka connector configuration is missing. Skipping Kafka pipeline."); + } + } + } + + Ok(()) +} + +async fn run_kafka2parseable
<P>
( + config: ConnectorConfig, + registry: Registry, + processor: P, + shutdown_handle: Shutdown, +) -> anyhow::Result<()> +where + P: Processor, ()> + Send + Sync + 'static, +{ + let kafka_config = Arc::new(config.kafka_config.clone().unwrap_or_default()); + let (kafka_context, rebalance_rx) = KafkaContext::new(kafka_config); + + //TODO: fetch topics metadata from kafka then give dynamic value to StreamState + let stream_state = Arc::new(RwLock::new(StreamState::new(60))); + let rebalance_listener = RebalanceListener::new( + rebalance_rx, + Arc::clone(&stream_state), + shutdown_handle.clone(), + ); + + let kafka_streams = KafkaStreams::init(kafka_context, stream_state, shutdown_handle.clone())?; + + let stats = kafka_streams.statistics(); + registry.register(Box::new(KafkaConsumerMetricsCollector::new(stats)))?; + + let kafka_parseable_sink_connector = KafkaSinkConnector::new( + kafka_streams, + processor, + config.buffer_size, + config.buffer_timeout, + ); + + rebalance_listener.start(); + kafka_parseable_sink_connector.run().await?; + + Ok(()) +} diff --git a/src/lib.rs b/src/lib.rs index 140c32dcc..a19b53365 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -22,6 +22,7 @@ pub mod analytics; pub mod banner; mod catalog; mod cli; +pub mod connectors; mod event; pub mod handlers; pub mod hottier; diff --git a/src/main.rs b/src/main.rs index f9f2e5993..7cfc085f5 100644 --- a/src/main.rs +++ b/src/main.rs @@ -17,18 +17,18 @@ */ use parseable::{ - banner, + banner, connectors, option::{Mode, CONFIG}, rbac, storage, IngestServer, ParseableServer, QueryServer, Server, }; -use tracing_subscriber::EnvFilter; +use tracing::level_filters::LevelFilter; +use tracing_subscriber::layer::SubscriberExt; +use tracing_subscriber::util::SubscriberInitExt; +use tracing_subscriber::{fmt, EnvFilter, Registry}; #[actix_web::main] async fn main() -> anyhow::Result<()> { - tracing_subscriber::fmt() - .with_env_filter(EnvFilter::from_default_env()) - .compact() - .init(); + init_logger(LevelFilter::DEBUG); // these are empty ptrs so mem footprint should be minimal let server: Box = match CONFIG.parseable.mode { @@ -46,7 +46,27 @@ async fn main() -> anyhow::Result<()> { // keep metadata info in mem metadata.set_global(); - server.init().await?; + let parseable_server = server.init(); + let connectors_task = connectors::init(); + + tokio::try_join!(parseable_server, connectors_task)?; Ok(()) } + +pub fn init_logger(default_level: LevelFilter) { + let filter_layer = EnvFilter::try_from_default_env() + .unwrap_or_else(|_| EnvFilter::new(default_level.to_string())); + + let fmt_layer = fmt::layer() + .with_thread_names(true) + .with_thread_ids(true) + .with_line_number(true) + .with_timer(tracing_subscriber::fmt::time::UtcTime::rfc_3339()) + .compact(); + + Registry::default() + .with(filter_layer) + .with(fmt_layer) + .init(); +} diff --git a/src/metadata.rs b/src/metadata.rs index 5447ea796..341fd9aae 100644 --- a/src/metadata.rs +++ b/src/metadata.rs @@ -178,6 +178,20 @@ impl StreamInfo { }) } + pub fn schema_raw( + &self, + stream_name: &str, + ) -> Result>, MetadataError> { + let map = self.read().expect(LOCK_EXPECT); + + let schema = map + .get(stream_name) + .ok_or(MetadataError::StreamMetaNotFound(stream_name.to_string())) + .map(|metadata| metadata.schema.clone())?; + + Ok(schema) + } + pub fn set_alert(&self, stream_name: &str, alerts: Alerts) -> Result<(), MetadataError> { let mut map = self.write().expect(LOCK_EXPECT); map.get_mut(stream_name)