Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate kafka-operator to use listener-operator for exposing brokers #443

Merged
merged 51 commits into from
Oct 7, 2024
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
51 commits
Select commit Hold shift + click to select a range
6c9c18f
Migrate kafka-operator to use lb-operator for exposing brokers
nightkr Aug 4, 2022
ae8ed3a
regenerated charts
maltesander Aug 29, 2022
f57097b
Update for lb-operator -> listener-operator rebrand
nightkr Sep 8, 2022
d963287
Merge branch 'spike/lb-operator' of github.com:stackabletech/kafka-op…
nightkr Sep 8, 2022
1f5a4e5
Merge branch 'main' into spike/lb-operator
siegfriedweber Oct 18, 2022
9c4b0b3
Merge branch 'main' into spike/lb-operator
siegfriedweber Oct 25, 2022
3a72373
Use ListenerOperatorVolumeSourceBuilder
siegfriedweber Oct 25, 2022
f35b23b
Use shortcut for creating the listener volume
siegfriedweber Oct 26, 2022
0b70333
Allow git source for operator-rs temporarily
siegfriedweber Oct 26, 2022
1d5749e
Merge remote-tracking branch 'origin/main' into spike/lb-operator
sbernauer Mar 1, 2023
e9d768a
Rename node_cmd -> node_address_cmd
sbernauer Mar 1, 2023
006b122
Remove operator-rs entry in deny.toml
sbernauer Mar 1, 2023
6d84c59
Merge remote-tracking branch 'origin/main' into spike/lb-operator
sbernauer Apr 25, 2023
df70f88
Merge branch 'main' into spike/lb-operator
nightkr Sep 12, 2023
6d47407
Merge branch 'main' into spike/lb-operator
nightkr Sep 4, 2024
3167720
Make listenerclass configurable
nightkr Sep 5, 2024
f357a06
Use operator-rs from git
nightkr Sep 5, 2024
f93d9ab
Use listener secret scope
nightkr Sep 6, 2024
cab56ff
Use listener for bootstrap and discovery
nightkr Sep 6, 2024
acb686e
Separate out kcat secret management now that it is the only PEM user
nightkr Sep 9, 2024
c91a041
Remove unused KafkaTlsSecurity::kafka
nightkr Sep 9, 2024
d960573
Regenerate CRD
nightkr Sep 9, 2024
25cca0e
Switch to main op-rs
nightkr Sep 9, 2024
c3018bc
Switch to per-rolegroup bootstrap listeners
nightkr Sep 11, 2024
bc44888
make crds
nightkr Sep 11, 2024
26d5922
CRD docs
nightkr Sep 11, 2024
f1b6c62
Delete old test files
nightkr Sep 11, 2024
a79b91a
Documentation
nightkr Sep 11, 2024
ad3fd98
Try to un-angry rustdoc
nightkr Sep 11, 2024
b62edf6
Re-enable orphan cleanup
nightkr Sep 11, 2024
a6343d4
Changelog
nightkr Sep 11, 2024
6a1a423
Fix configuration test
nightkr Sep 11, 2024
194d98d
Update docs/modules/kafka/examples/getting_started/kafka.yaml
nightkr Oct 1, 2024
b01c22f
Rename TLS volume variables for consistency
nightkr Oct 1, 2024
220bc4c
Rename volumes/paths too, correspondingly
nightkr Oct 1, 2024
2da65c6
Merge branch 'spike/lb-operator' of github.com:stackabletech/kafka-op…
nightkr Oct 1, 2024
df56f73
Remove stale FIXME
nightkr Oct 1, 2024
66db99c
Add RBAC permission to delete listeners (orphan clenaup)
nightkr Oct 1, 2024
362dc80
Make service_ports return ListenerPorts instead
nightkr Oct 1, 2024
fa9b8e2
Recommend using the same ListenerClass across rolegroups
nightkr Oct 1, 2024
88dde6b
Apply recommended labels to broker listeners
nightkr Oct 2, 2024
7aa5919
Apply the correct rolegroup to bootstrap listeners, too
nightkr Oct 2, 2024
534d88e
Apply the correct rolegroup label to the rolegroup configmaps
nightkr Oct 2, 2024
7dae34d
Apply recommended labels to the bootstrap listener PVC too
nightkr Oct 2, 2024
853d5f1
Merge branch 'main' into spike/lb-operator
nightkr Oct 2, 2024
2913081
Remove version from persistent PVCs
nightkr Oct 3, 2024
11a4814
Add note about upgrading to changelog
nightkr Oct 4, 2024
6e2d0d3
docs: Add note on what bootstrap listeners return
sbernauer Oct 7, 2024
c1c8c9f
Update futures to 0.3.31
nightkr Oct 7, 2024
f58c534
Drop unused futures compat feature
nightkr Oct 7, 2024
17e96e1
Reformat bootstrap rolegroup note slightly
nightkr Oct 7, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 6 additions & 6 deletions Cargo.nix

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,6 @@ strum = { version = "0.26", features = ["derive"] }
tokio = { version = "1.39", features = ["full"] }
tracing = "0.1"

# [patch."https://github.com/stackabletech/operator-rs.git"]
# stackable-operator = { git = "https://github.com/stackabletech//operator-rs.git", branch = "main" }
[patch."https://github.com/stackabletech/operator-rs.git"]
# stackable-operator = { path = "../operator-rs/crates/stackable-operator" }
stackable-operator = { git = "https://github.com/stackabletech//operator-rs.git", branch = "main" }
4 changes: 2 additions & 2 deletions crate-hashes.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 9 additions & 8 deletions deploy/helm/kafka-operator/crds/crds.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,10 @@ spec:
nullable: true
type: object
x-kubernetes-preserve-unknown-fields: true
required:
- nodeAffinity
- podAffinity
- podAntiAffinity
type: object
brokerListenerClass:
nullable: true
type: string
gracefulShutdownTimeout:
description: Time period Pods have to gracefully shut down, e.g. `30m`, `1h` or `2d`. Consult the operator documentation for details.
nullable: true
Expand Down Expand Up @@ -333,11 +332,10 @@ spec:
nullable: true
type: object
x-kubernetes-preserve-unknown-fields: true
required:
- nodeAffinity
- podAffinity
- podAntiAffinity
type: object
brokerListenerClass:
nullable: true
type: string
gracefulShutdownTimeout:
description: Time period Pods have to gracefully shut down, e.g. `30m`, `1h` or `2d`. Consult the operator documentation for details.
nullable: true
Expand Down Expand Up @@ -582,6 +580,9 @@ spec:
- configMapName
type: object
type: object
bootstrapListenerClass:
default: cluster-internal
type: string
tls:
default:
internalSecretClass: tls
Expand Down
8 changes: 1 addition & 7 deletions deploy/helm/kafka-operator/templates/roles-kafka.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,4 @@ apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: {{ include "operator.fullname" . }}-kafka-broker-clusterrole
rules:
- apiGroups:
- ""
resources:
- services
verbs:
- get
rules: []
10 changes: 10 additions & 0 deletions deploy/helm/kafka-operator/templates/roles.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,16 @@ rules:
- get
- list
- watch
- apiGroups:
- listeners.stackable.tech
resources:
- listeners
verbs:
- get
- list
- watch
- patch
- create
- apiGroups:
- rbac.authorization.k8s.io
resources:
Expand Down
17 changes: 16 additions & 1 deletion rust/crd/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,10 @@ pub const KAFKA_HEAP_OPTS: &str = "KAFKA_HEAP_OPTS";
// server_properties
pub const LOG_DIRS_VOLUME_NAME: &str = "log-dirs";
// directories
pub const STACKABLE_TMP_DIR: &str = "/stackable/tmp";
pub const LISTENER_BROKER_VOLUME_NAME: &str = "listener-broker";
pub const LISTENER_BOOTSTRAP_VOLUME_NAME: &str = "listener-bootstrap";
pub const STACKABLE_LISTENER_BROKER_DIR: &str = "/stackable/listener-broker";
pub const STACKABLE_LISTENER_BOOTSTRAP_DIR: &str = "/stackable/listener-bootstrap";
pub const STACKABLE_DATA_DIR: &str = "/stackable/data";
pub const STACKABLE_CONFIG_DIR: &str = "/stackable/config";
pub const STACKABLE_LOG_CONFIG_DIR: &str = "/stackable/log_config";
Expand Down Expand Up @@ -159,6 +162,15 @@ pub struct KafkaClusterConfig {
/// here. When using the [Stackable operator for Apache ZooKeeper](DOCS_BASE_URL_PLACEHOLDER/zookeeper/)
/// to deploy a ZooKeeper cluster, this will simply be the name of your ZookeeperCluster resource.
pub zookeeper_config_map_name: String,

#[serde(default = "KafkaClusterConfig::default_bootstrap_listener_class")]
pub bootstrap_listener_class: String,
}

impl KafkaClusterConfig {
fn default_bootstrap_listener_class() -> String {
"cluster-internal".to_string()
}
}

impl KafkaCluster {
Expand Down Expand Up @@ -405,6 +417,8 @@ pub struct KafkaConfig {
/// Time period Pods have to gracefully shut down, e.g. `30m`, `1h` or `2d`. Consult the operator documentation for details.
#[fragment_attrs(serde(default))]
pub graceful_shutdown_timeout: Option<Duration>,

pub broker_listener_class: String,
}

impl KafkaConfig {
Expand All @@ -430,6 +444,7 @@ impl KafkaConfig {
},
affinity: get_affinity(cluster_name, role),
graceful_shutdown_timeout: Some(DEFAULT_BROKER_GRACEFUL_SHUTDOWN_TIMEOUT),
broker_listener_class: Some("cluster-internal".to_string()),
}
}
}
Expand Down
54 changes: 36 additions & 18 deletions rust/crd/src/listener.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::{KafkaCluster, STACKABLE_TMP_DIR};
use crate::{KafkaCluster, STACKABLE_LISTENER_BROKER_DIR};

use crate::security::KafkaTlsSecurity;
use snafu::{OptionExt, Snafu};
Expand All @@ -8,7 +8,6 @@ use std::fmt::{Display, Formatter};
use strum::{EnumDiscriminants, EnumString};

const LISTENER_LOCAL_ADDRESS: &str = "0.0.0.0";
const LISTENER_NODE_ADDRESS: &str = "$NODE";

#[derive(Snafu, Debug, EnumDiscriminants)]
pub enum KafkaListenerError {
Expand Down Expand Up @@ -106,8 +105,11 @@ pub fn get_kafka_listener_config(
});
advertised_listeners.push(KafkaListener {
name: KafkaListenerName::ClientAuth,
host: LISTENER_NODE_ADDRESS.to_string(),
port: node_port_cmd(STACKABLE_TMP_DIR, kafka_security.client_port_name()),
host: node_address_cmd(STACKABLE_LISTENER_BROKER_DIR),
port: node_port_cmd(
STACKABLE_LISTENER_BROKER_DIR,
kafka_security.client_port_name(),
),
});
listener_security_protocol_map
.insert(KafkaListenerName::ClientAuth, KafkaListenerProtocol::Ssl);
Expand All @@ -120,8 +122,11 @@ pub fn get_kafka_listener_config(
});
advertised_listeners.push(KafkaListener {
name: KafkaListenerName::Client,
host: LISTENER_NODE_ADDRESS.to_string(),
port: node_port_cmd(STACKABLE_TMP_DIR, kafka_security.client_port_name()),
host: node_address_cmd(STACKABLE_LISTENER_BROKER_DIR),
port: node_port_cmd(
STACKABLE_LISTENER_BROKER_DIR,
kafka_security.client_port_name(),
),
});
listener_security_protocol_map
.insert(KafkaListenerName::Client, KafkaListenerProtocol::Ssl);
Expand All @@ -134,8 +139,11 @@ pub fn get_kafka_listener_config(
});
advertised_listeners.push(KafkaListener {
name: KafkaListenerName::Client,
host: LISTENER_NODE_ADDRESS.to_string(),
port: node_port_cmd(STACKABLE_TMP_DIR, kafka_security.client_port_name()),
host: node_address_cmd(STACKABLE_LISTENER_BROKER_DIR),
port: node_port_cmd(
STACKABLE_LISTENER_BROKER_DIR,
kafka_security.client_port_name(),
),
});
listener_security_protocol_map
.insert(KafkaListenerName::Client, KafkaListenerProtocol::Plaintext);
Expand Down Expand Up @@ -180,8 +188,12 @@ pub fn get_kafka_listener_config(
})
}

fn node_address_cmd(directory: &str) -> String {
format!("$(cat {directory}/default-address/address)")
}

fn node_port_cmd(directory: &str, port_name: &str) -> String {
format!("$(cat {directory}/{port_name}_nodeport)")
format!("$(cat {directory}/default-address/ports/{port_name})")
}

fn pod_fqdn(kafka: &KafkaCluster, object_name: &str) -> Result<String, KafkaListenerError> {
Expand Down Expand Up @@ -228,7 +240,6 @@ mod tests {
"#;
let kafka: KafkaCluster = serde_yaml::from_str(kafka_cluster).expect("illegal test input");
let kafka_security = KafkaTlsSecurity::new(
&kafka,
ResolvedAuthenticationClasses::new(vec![AuthenticationClass {
metadata: ObjectMetaBuilder::new().name("auth-class").build(),
spec: AuthenticationClassSpec {
Expand Down Expand Up @@ -260,8 +271,11 @@ mod tests {
format!(
"{name}://{host}:{port},{internal_name}://{internal_host}:{internal_port}",
name = KafkaListenerName::ClientAuth,
host = LISTENER_NODE_ADDRESS,
port = node_port_cmd(STACKABLE_TMP_DIR, kafka_security.client_port_name()),
host = node_address_cmd(STACKABLE_LISTENER_BROKER_DIR),
port = node_port_cmd(
STACKABLE_LISTENER_BROKER_DIR,
kafka_security.client_port_name()
),
internal_name = KafkaListenerName::Internal,
internal_host = pod_fqdn(&kafka, object_name).unwrap(),
internal_port = kafka_security.internal_port(),
Expand Down Expand Up @@ -295,7 +309,6 @@ mod tests {
"#;
let kafka: KafkaCluster = serde_yaml::from_str(input).expect("illegal test input");
let kafka_security = KafkaTlsSecurity::new(
&kafka,
ResolvedAuthenticationClasses::new(vec![]),
"tls".to_string(),
Some("tls".to_string()),
Expand All @@ -320,8 +333,11 @@ mod tests {
format!(
"{name}://{host}:{port},{internal_name}://{internal_host}:{internal_port}",
name = KafkaListenerName::Client,
host = LISTENER_NODE_ADDRESS,
port = node_port_cmd(STACKABLE_TMP_DIR, kafka_security.client_port_name()),
host = node_address_cmd(STACKABLE_LISTENER_BROKER_DIR),
port = node_port_cmd(
STACKABLE_LISTENER_BROKER_DIR,
kafka_security.client_port_name()
),
internal_name = KafkaListenerName::Internal,
internal_host = pod_fqdn(&kafka, object_name).unwrap(),
internal_port = kafka_security.internal_port(),
Expand Down Expand Up @@ -357,7 +373,6 @@ mod tests {
"#;
let kafka: KafkaCluster = serde_yaml::from_str(input).expect("illegal test input");
let kafka_security = KafkaTlsSecurity::new(
&kafka,
ResolvedAuthenticationClasses::new(vec![]),
"".to_string(),
None,
Expand All @@ -382,8 +397,11 @@ mod tests {
format!(
"{name}://{host}:{port},{internal_name}://{internal_host}:{internal_port}",
name = KafkaListenerName::Client,
host = LISTENER_NODE_ADDRESS,
port = node_port_cmd(STACKABLE_TMP_DIR, kafka_security.client_port_name()),
host = node_address_cmd(STACKABLE_LISTENER_BROKER_DIR),
port = node_port_cmd(
STACKABLE_LISTENER_BROKER_DIR,
kafka_security.client_port_name()
),
internal_name = KafkaListenerName::Internal,
internal_host = pod_fqdn(&kafka, object_name).unwrap(),
internal_port = kafka_security.internal_port(),
Expand Down
Loading
Loading