Skip to content

Commit

Permalink
Optimize resource consumption
Browse files Browse the repository at this point in the history
  • Loading branch information
visortelle committed Apr 24, 2024
1 parent 07feb54 commit ca24a2e
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 51 deletions.
106 changes: 59 additions & 47 deletions src/docker_compose/docker_compose.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,21 +255,14 @@ pub fn generate_pulsar_proxy(
cluster_index: u32,
) -> Result<PulsarProxyOutput> {
let pulsar_version = instance_config.pulsar_version;
let zookeepers_per_cluster = instance_config.num_zookeepers;
let depends_on_zookeeper_template = (0..zookeepers_per_cluster)
.map(|i| format!("████████████zookeeper-{i}:\n████████████████condition: service_healthy"))
.collect::<Vec<String>>()
.join("\n");
let metadata_store_url = (0..zookeepers_per_cluster)

let depends_on_broker_template = format!("████████████broker-{cluster_name}-0:\n████████████████condition: service_healthy");

let metadata_store_url = (0..instance_config.num_zookeepers)
.map(|i| format!("zk:zookeeper-{i}:2181"))
.collect::<Vec<String>>()
.join(",");

let depends_on_brokers_template = (0..instance_config.num_brokers)
.map(|i| format!("████████████broker-{cluster_name}-{i}:\n████████████████condition: service_healthy"))
.collect::<Vec<String>>()
.join("\n");

let web_service_port = (cluster_index.to_string() + "8080").parse::<u32>().unwrap();
let web_service_url = format!("http://pulsar-proxy-{cluster_name}:8080");
let web_service_host_url = format!("http://localhost:{web_service_port}");
Expand All @@ -292,15 +285,19 @@ pub fn generate_pulsar_proxy(
████████████- clusterName={cluster_name}
████████████- metadataStoreUrl={metadata_store_url}
████████████- configurationMetadataStoreUrl={metadata_store_url}
████████████- PULSAR_MEM=-Xms256m -Xmx256m -XX:MaxDirectMemorySize=128m
████████████- PULSAR_MEM=-Xms128m -Xmx128m -XX:MaxDirectMemorySize=128m -XX:+ExitOnOutOfMemoryError
████████healthcheck:
████████████test: [\"CMD\", \"curl\", \"--fail\", \"http://127.0.0.1:8080/admin/v2/brokers/health\"]
████████████interval: 5s
████████████timeout: 5s
████████████retries: 30
████████depends_on:
{depends_on_zookeeper_template}
{depends_on_brokers_template}
{depends_on_broker_template}
████████deploy:
████████████resources:
████████████████limits:
████████████████████cpus: '0.5'
████████████████████memory: 256M
████████networks:
████████████- pulsar-net-{instance_name}
"};
Expand Down Expand Up @@ -345,14 +342,19 @@ pub fn generate_zookeeper_template(
████████restart: on-failure
████████command: bash -c \"bin/apply-config-from-env.py conf/zookeeper.conf && bin/apply-config-from-env.py conf/pulsar_env.sh && {append_zookeeper_servers} && {create_my_id_if_not_exists} && exec bin/pulsar zookeeper\"
████████environment:
████████████- PULSAR_MEM=\"-Xms256m -Xmx256m -XX:MaxDirectMemorySize=256m\"
████████████- PULSAR_MEM=-Xms128m -Xmx128m -XX:MaxDirectMemorySize=128m -XX:+ExitOnOutOfMemoryError
████████healthcheck:
████████████test: [\"CMD\", \"bin/pulsar-zookeeper-ruok.sh\"]
████████████interval: 5s
████████████interval: 10s
████████████timeout: 5s
████████████retries: 10
████████volumes:
████████████- zookeeper-data-{zookeeper_index}:/pulsar/data
████████deploy:
████████████resources:
████████████████limits:
████████████████████cpus: '0.5'
████████████████████memory: 256M
████████networks:
████████████- pulsar-net-{instance_name}
"}
Expand All @@ -364,7 +366,7 @@ pub fn generate_pulsar_init_job_template(
instance_name: String,
instance_config: InstanceConfig,
cluster_name: String,
cluster_index: u32,
cluster_index: u32
) -> String {
let pulsar_version = instance_config.pulsar_version;
let web_service_url = "http://broker-{cluster_name}:8080";
Expand All @@ -375,7 +377,7 @@ pub fn generate_pulsar_init_job_template(
.collect::<Vec<String>>()
.join("\n");

let depends_on_prev_cluster = if cluster_index == 0 {
let depends_on_prev_cluster_template = if cluster_index == 0 {
"".to_string()
} else {
let prev_cluster_name = format!("cluster-{}", cluster_index - 1);
Expand All @@ -389,10 +391,15 @@ pub fn generate_pulsar_init_job_template(
████████user: pulsar
████████command: bash -c \"bin/apply-config-from-env.py conf/pulsar_env.sh; bin/pulsar initialize-cluster-metadata --cluster {cluster_name} --metadata-store zk:zookeeper-0:2181/{cluster_name} --configuration-metadata-store zk:zookeeper-0:2181/{cluster_name} --web-service-url {web_service_url} --broker-service-url {broker_service_url}\"
████████environment:
████████████- PULSAR_MEM=\"-Xms256m -Xmx256m -XX:MaxDirectMemorySize=256m\"
████████████- PULSAR_MEM=-Xms128m -Xmx128m -XX:MaxDirectMemorySize=128m -XX:+ExitOnOutOfMemoryError
████████depends_on:
{depends_on_zookeeper_template}
{depends_on_prev_cluster}
{depends_on_prev_cluster_template}
████████deploy:
████████████resources:
████████████████limits:
████████████████████cpus: '0.5'
████████████████████memory: 256M
████████networks:
████████████- pulsar-net-{instance_name}
"}.trim().to_string()
Expand Down Expand Up @@ -437,30 +444,24 @@ pub fn generate_post_cluster_create_job_template(
"{pulsar_admin} namespaces set-clusters --clusters {all_cluster_names} global/default"
);

let create_resources_script = format!("set +e; {register_clusters}; {create_cluster_tenant}; {create_cluster_namespace}; {create_global_tenant}; {create_global_namespace}; {set_global_namespace_clusters};");

let are_clusters_registered = (0..num_clusters)
.map(|cluster_index| format!("{pulsar_admin} clusters get cluster-{cluster_index}"))
.collect::<Vec<String>>()
.join("; ");
let is_cluster_tenant_created =
format!("{pulsar_admin} tenants get cluster-{cluster_index}-local");
let is_cluster_namespace_created =
format!("{pulsar_admin} namespaces get cluster-{cluster_index}-local/default");
let is_global_tenant_created = format!("{pulsar_admin} tenants get global");
let is_global_namespace_created = format!("{pulsar_admin} namespaces get global/default");

let check_resources_script = format!("set -e; {are_clusters_registered}; {is_cluster_tenant_created}; {is_cluster_namespace_created}; {is_global_tenant_created}; {is_global_namespace_created};");
let create_resources_script = format!("set -e; {register_clusters}; {create_cluster_tenant}; {create_cluster_namespace}; {create_global_tenant}; {create_global_namespace}; {set_global_namespace_clusters};");

format! {"
████# Register new cluster {cluster_name}
████pulsar-post-cluster-create-job-{cluster_name}:
████████image: apachepulsar/pulsar:{pulsar_version}
████████restart: on-failure
████████user: pulsar
████████command: bash -c \"{create_resources_script} {check_resources_script} echo success\"
████████command: bash -c \"{create_resources_script} echo success\"
████████environment:
████████████- PULSAR_MEM=-Xms128m -Xmx128m -XX:MaxDirectMemorySize=128m -XX:+ExitOnOutOfMemoryError
{depends_on_proxy_template}
{depends_on_prev_cluster_template}
████████deploy:
████████████resources:
████████████████limits:
████████████████████cpus: '0.5'
████████████████████memory: 256M
████████networks:
████████████- pulsar-net-{instance_name}
"}
Expand All @@ -486,6 +487,7 @@ pub fn generate_dekaf(instance_name: String, cluster_index: u32) -> Result<Dekaf
████████████- DEKAF_PULSAR_WEB_URL=http://pulsar-proxy-cluster-{cluster_index}:8080
████████████- DEKAF_PULSAR_BROKER_URL=pulsar://pulsar-proxy-cluster-{cluster_index}:6650
████████████- DEKAF_PUBLIC_BASE_URL={dekaf_host_url}
████████████- JAVA_OPTS=-Xms128m -Xmx128m
████████ports:
████████████- {port}:8090
████████healthcheck:
Expand All @@ -495,6 +497,11 @@ pub fn generate_dekaf(instance_name: String, cluster_index: u32) -> Result<Dekaf
████████████retries: 20
████████depends_on:
{depends_on_pulsar_proxy_template}
████████deploy:
████████████resources:
████████████████limits:
████████████████████cpus: '1'
████████████████████memory: 256M
████████networks:
████████████- pulsar-net-{instance_name}
"}
Expand All @@ -516,19 +523,16 @@ pub fn generate_broker_template(
broker_index: u32,
) -> String {
let pulsar_version = instance_config.pulsar_version;
let managed_ledger_default_ensemble_size = min(instance_config.num_bookies, 3);
let managed_ledger_default_write_quorum = min(instance_config.num_bookies, 3);
let managed_ledger_default_ack_quorum = min(instance_config.num_bookies, 3);
let managed_ledger_default_ensemble_size = min(instance_config.num_bookies, 2);
let managed_ledger_default_write_quorum = min(instance_config.num_bookies, 2);
let managed_ledger_default_ack_quorum = min(instance_config.num_bookies, 2);
let zookeepers_per_cluster = instance_config.num_zookeepers;
let depends_on_zookeeper_template = (0..zookeepers_per_cluster)
.map(|i| format!("████████████zookeeper-{i}:\n████████████████condition: service_healthy"))
.collect::<Vec<String>>()
.join("\n");

let depends_on_bookies_template = (0..instance_config.num_bookies)
.map(|i| format!("████████████bookie-{cluster_name}-{i}:\n████████████████condition: service_healthy"))
.collect::<Vec<String>>()
.join("\n");
let depends_on_bookie_template = format!("████████████bookie-{cluster_name}-0:\n████████████████condition: service_healthy");

let metadata_store_url = (0..zookeepers_per_cluster)
.map(|i| format!("zk:zookeeper-{i}:2181"))
Expand All @@ -548,8 +552,7 @@ pub fn generate_broker_template(
████████████- managedLedgerDefaultEnsembleSize={managed_ledger_default_ensemble_size}
████████████- managedLedgerDefaultWriteQuorum={managed_ledger_default_write_quorum}
████████████- managedLedgerDefaultAckQuorum={managed_ledger_default_ack_quorum}
████████████- PULSAR_MEM=-Xms512m -Xmx512m -XX:MaxDirectMemorySize=256m
████████████- PULSAR_GC=-XX:+UseG1GC
████████████- PULSAR_MEM=-Xms128m -Xmx128m -XX:MaxDirectMemorySize=128m -XX:+ExitOnOutOfMemoryError
████████command: bash -c \"bin/apply-config-from-env.py conf/broker.conf && exec bin/pulsar broker\"
████████healthcheck:
████████████test: [\"CMD\", \"curl\", \"--fail\", \"http://127.0.0.1:8080/admin/v2/brokers/health\"]
Expand All @@ -558,7 +561,12 @@ pub fn generate_broker_template(
████████████retries: 20
████████depends_on:
{depends_on_zookeeper_template}
{depends_on_bookies_template}
{depends_on_bookie_template}
████████deploy:
████████████resources:
████████████████limits:
████████████████████cpus: '2'
████████████████████memory: 256M
████████networks:
████████████- pulsar-net-{instance_name}
"}
Expand Down Expand Up @@ -612,8 +620,7 @@ pub fn generate_bookie_template(
████████████- clusterName={cluster_name}
████████████- metadataServiceUri={metadata_service_uri}
████████████- useHostNameAsBookieID=true
████████████- BOOKIE_MEM=-Xms512m -Xmx512m -XX:MaxDirectMemorySize=256m
████████████- PULSAR_GC=-XX:+UseG1GC
████████████- BOOKIE_MEM=-Xms256m -Xmx256m -XX:MaxDirectMemorySize=256m
████████████- dbStorage_writeCacheMaxSizeMb=16
████████████- dbStorage_readAheadCacheMaxSizeMb=16
████████depends_on:
Expand All @@ -630,6 +637,11 @@ pub fn generate_bookie_template(
████████████timeout: 30s
████████████retries: 30
████████████start_period: 60s
████████deploy:
████████████resources:
████████████████limits:
████████████████████cpus: '2'
████████████████████memory: 512M
████████networks:
████████████- pulsar-net-{instance_name}
"}.trim().to_string()
Expand Down
5 changes: 1 addition & 4 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ mod docker_compose;
mod instance_config;

use anyhow::{anyhow, Error, Result};
use assert_cmd::output;
use clap::{Parser, Subcommand};
use dirs::home_dir;
use docker_compose::docker_compose::{generate_instance, InstanceOutput, PrintInfo};
Expand Down Expand Up @@ -77,7 +76,7 @@ pub struct StopCommandArgs {
}

#[derive(Parser, Clone, Debug)]
#[command(version, about, long_about = None, arg_required_else_help(true))]
#[command(version, about, long_about = None)]
pub struct TemplateCommandArgs {
pub instance_name: Option<String>,
}
Expand Down Expand Up @@ -876,8 +875,6 @@ fn main() -> Result<()> {

instance_output.print_info();

println!("Args {:?}", args);
println!("Intance {:?}", instance_output);
if !args.no_open_browser {
for cluster_output in instance_output.clusters {
if let Some(url) = cluster_output.dekaf_host_url.clone() {
Expand Down

0 comments on commit ca24a2e

Please sign in to comment.