diff --git a/README.md b/README.md
index 254aa0c..759551a 100644
--- a/README.md
+++ b/README.md
@@ -17,4 +17,7 @@
 Pravega clusters. We provide two scripts: one that collects the logs from the pods themselves, and another one that
 assumes the existence of a logging service (i.e., FluentBit) to collect the logs from.
 
+- Pravega provisioning helper: Script that helps with the right-sizing and proper configuration of a Pravega
+cluster.
+
 For detailed documentation of each tool, we refer to their respective README files.
diff --git a/pravega-provisioner/cluster_provisioner.py b/pravega-provisioner/cluster_provisioner.py
index a0bc10f..ef564c0 100644
--- a/pravega-provisioner/cluster_provisioner.py
+++ b/pravega-provisioner/cluster_provisioner.py
@@ -6,7 +6,9 @@
 #
 #     http://www.apache.org/licenses/LICENSE-2.0
 #
+import math
 
+from model.constants import Constants
 from model.provisioning_logic import *
 from performance.performance_profiles import BareMetalCluster
 
@@ -39,12 +41,15 @@ def get_int_input(msg, valid_values):
 def get_vm_flavor():
     """
     Asks the user for the resources that each VM in the cluster will have.
-    :return: Amount of CPU cores and GBs of RAM per VM.
+    :return: Amount of CPU cores, GBs of RAM and local drives (if applicable) per VM.
     """
     print("Please, introduce the resource information about the VMs used int cluster:")
     vm_cpus = int(input("How many CPU cores has each VM/node?"))
     vm_ram_gb = int(input("How much memory in GB has each VM/node?"))
-    return vm_cpus, vm_ram_gb
+    vm_local_drives = 0
+    if get_bool_input("Is your cluster using local drives?"):
+        vm_local_drives = int(input("How many local drives does each VM/node have?"))
+    return vm_cpus, vm_ram_gb, vm_local_drives
 
 
 def provision_for_availability():
@@ -78,32 +83,108 @@ def get_requested_resources(zk_servers, bk_servers, ss_servers, cc_servers):
     return requested_cpus, requested_ram_gb
 
 
-def main():
-    print("### Provisioning model for Pravega clusters ###")
+def resource_based_provisioning(vms, vm_cpus, vm_ram_gb, vm_local_drives, zookeeper_servers, bookkeeper_servers, segment_stores, controllers):
+    # Ask the user for the number of node failures to tolerate.
+    failures_to_tolerate = get_int_input("How many node failures do you want to tolerate?", range(0, 100))
+    if failures_to_tolerate >= vms:
+        assert False, "You do not have enough nodes to tolerate that many failures"
+    elif vms - failures_to_tolerate < Constants.min_bookkeeper_servers or vms - failures_to_tolerate < Constants.min_zookeeper_servers:
+        # If we are collocating more than 1 Zookeeper or Bookie per node, we need an additional instance. Otherwise, we
+        # end up in a situation like the following: 3 VMs/nodes with 4 Bookie/Zookeeper servers. This means that one
+        # node hosts 2 instances, and if that node crashes, we fall below the minimum number of replicas defined.
+        zookeeper_servers += 1
+        bookkeeper_servers += 1
 
-    # This sets the performance numbers of the Pravega services on a specific environment (e.g., bare metal, PKS).
-    performance_profile = BareMetalCluster()
+    # Given that we want to tolerate failures, we need to increase the minimum number of instances of each type by
+    # the number of failures we want to tolerate. This will be the baseline for the Pravega cluster.
+    zookeeper_servers += failures_to_tolerate
+    # We need an odd number of Zookeeper instances.
+    if zookeeper_servers % 2 == 0:
+        zookeeper_servers += 1
+    bookkeeper_servers += failures_to_tolerate
+    segment_stores += failures_to_tolerate
+    controllers += failures_to_tolerate
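+    # For illustration: assuming minimums of 3 Zookeeper servers, 3 Bookies, 1 Segment Store and 1 Controller, and
+    # enough nodes that no extra collocated instance is needed, tolerating 2 failures yields a baseline of
+    # 5 Zookeeper servers (already odd), 5 Bookies, 3 Segment Stores and 3 Controllers before the search below starts.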
 
-    # First, get the type of VMs/nodes that will be used in the cluster.
-    vm_cpus, vm_ram_gb = get_vm_flavor()
+    can_allocate_cluster, the_cluster = can_allocate_services_on_nodes(vms, vm_cpus, vm_ram_gb, vm_local_drives,
+                                                                       zookeeper_servers, bookkeeper_servers, segment_stores, controllers)
+    # First, make sure that the initial number of instances can be allocated; otherwise, throw an error.
+    assert can_allocate_cluster, "Not even the minimal Pravega cluster can be allocated with the current resources. " + \
+                                 "Please check the Constants file to see the resources requested per instance."
 
-    # Initialize the number of instances with the minimum defaults.
-    zookeeper_servers = Constants.min_zookeeper_servers
-    bookkeeper_servers = Constants.min_bookkeeper_servers
-    segment_stores = Constants.min_segment_stores
-    controllers = Constants.min_controllers
-    vms = 0
+    # Ask the user whether this is a metadata-intensive workload or not.
+    metadata_heavy_workload = get_bool_input("Is the workload metadata-heavy (i.e., many clients, small transactions)?")
+
+    # Search for the maximum number of instances to saturate the cluster.
+    while can_allocate_cluster:
+
+        # Increase the number of Bookies first, if they keep a 1-to-1 relationship with Segment Stores.
+        if bookkeeper_servers <= segment_stores:
+            tentative_bookkeeper_servers = bookkeeper_servers + 1
+            can_allocate_cluster, the_cluster = can_allocate_services_on_nodes(vms, vm_cpus, vm_ram_gb, vm_local_drives,
+                                                                               zookeeper_servers, tentative_bookkeeper_servers, segment_stores, controllers)
+        else: tentative_bookkeeper_servers = bookkeeper_servers
+
+        # Try to increase the number of Segment Stores if possible.
+        if can_allocate_cluster:
+            bookkeeper_servers = tentative_bookkeeper_servers
+            tentative_segment_stores = max(segment_stores, Constants.segment_stores_to_bookies_ratio(bookkeeper_servers))
+            can_allocate_cluster, the_cluster = can_allocate_services_on_nodes(vms, vm_cpus, vm_ram_gb, vm_local_drives,
+                                                                               zookeeper_servers, bookkeeper_servers, tentative_segment_stores, controllers)
+        else: continue
+
+        # Try to increase the number of Controllers if possible.
+        if can_allocate_cluster:
+            segment_stores = tentative_segment_stores
+            tentative_controllers = max(controllers, Constants.controllers_to_segment_stores_ratio(segment_stores, metadata_heavy_workload))
+            can_allocate_cluster, the_cluster = can_allocate_services_on_nodes(vms, vm_cpus, vm_ram_gb, vm_local_drives,
+                                                                               zookeeper_servers, bookkeeper_servers, segment_stores, tentative_controllers)
+        else: continue
+
+        # Try to increase the number of Zookeeper servers if possible.
+        if can_allocate_cluster:
+            controllers = tentative_controllers
+            tentative_zookeeper_servers = max(zookeeper_servers, Constants.zookeeper_to_bookies_ratio(bookkeeper_servers))
+            can_allocate_cluster, the_cluster = can_allocate_services_on_nodes(vms, vm_cpus, vm_ram_gb, vm_local_drives,
+                                                                               tentative_zookeeper_servers, bookkeeper_servers, segment_stores, controllers)
+        else: continue
+
+        if can_allocate_cluster:
+            zookeeper_servers = tentative_zookeeper_servers
+
+    # Get the resources used for the last possible allocation.
+    the_cluster = can_allocate_services_on_nodes(vms, vm_cpus, vm_ram_gb, vm_local_drives, zookeeper_servers,
+                                                 bookkeeper_servers, segment_stores, controllers)[1]
+    print("Allocation of pods on nodes: ", the_cluster)
+    # Finally, we need to check how much memory is left in the nodes so we can share it across Segment Stores for caching.
+    # In the worst case, we will have [math.ceil(segment_stores/vms)] Segment Store instances on a single node. Also, in
+    # the worst case, this could be the node with the least memory available. For this reason, the in-memory cache size
+    # for a Segment Store would be as follows:
+    min_vm_mem_available = min(mem for (cpu, mem, disks, processes_in_vm) in the_cluster)
+    max_segment_stores_per_vm = math.ceil(segment_stores / vms)
+    new_direct_memory = Constants.segment_store_direct_memory_in_gb + int(min_vm_mem_available/max_segment_stores_per_vm)
+    print("--------- Segment Store In-Memory Cache Size (Pravega 0.7+) ---------")
+    print("Segment Store pod memory limit: ", Constants.segment_store_jvm_size_in_gb + new_direct_memory, "GB")
+    print("Segment Store JVM Size (-Xmx JVM Option): ", Constants.segment_store_jvm_size_in_gb, "GB")
+    print("Segment Store direct memory (-DirectMemory JVM Option): ", new_direct_memory, "GB")
+    # We leave 1GB of extra direct memory for non-caching purposes.
+    print("Segment Store cache size (pravegaservice.cacheMaxSize): ", (new_direct_memory - 1) * 1024 * 1024 * 1024,
+          " (", new_direct_memory - 1, "GB)")
+    print("Buffering time that Pravega tolerates with Tier 2 unavailable given a (well distributed) write workload:")
+    for w in range(100, 1000, 200):
+        print("- Write throughput: ", w, "(MBps) -> buffering time: ", ((segment_stores * (new_direct_memory - 1) * 1024) / w), " seconds")
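+
+    # For illustration (values assumed, not computed by the tool): if the tightest node has 8 GB of RAM left and
+    # hosts at most 2 Segment Stores, the direct memory becomes 12 + int(8 / 2) = 16 GB, the pod memory limit
+    # 4 + 16 = 20 GB, and the cache size (new_direct_memory - 1) = 15 GB per Segment Store; with 6 Segment Stores,
+    # a well-distributed 300 MBps write workload could be buffered for 6 * 15 * 1024 / 300 = 307.2 seconds.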
+
+    # Add a warning if the number of Bookies is higher than the number of nodes, as Pravega may need to enable rack-
+    # aware placement in Bookkeeper so it tries to write to Bookies in different nodes (data availability).
+    if bookkeeper_servers > vms:
+        print("WARNING: To guarantee data availability and durability, consider enabling rack-aware placement in "
+              "Pravega to write to Bookkeeper.")
+
+    return zookeeper_servers, bookkeeper_servers, segment_stores, controllers
 
-    # Initialize values for calculation of scaling policy.
-    event_size = 0
-    write_events_per_second = 0
-    num_streams = 0
 
-    # Provision for data availability.
-    if get_bool_input("Do you want to provision redundant instances to tolerate failures?"):
-        # Calculate the number of instances of each type to tolerate the given number of failures.
-        zookeeper_servers, bookkeeper_servers, segment_stores, controllers = provision_for_availability()
-        vms = calc_min_vms_for_availability(zookeeper_servers, bookkeeper_servers, segment_stores, controllers)
+def workload_based_provisioning(zookeeper_servers, bookkeeper_servers, segment_stores, controllers):
+    # This sets the performance numbers of the Pravega services on a specific environment (e.g., bare metal, PKS).
+    performance_profile = BareMetalCluster()
 
     # Provision Pravega data plane for workload (Bookkeeper, Segment Store).
     if get_bool_input("Do you want to right-size the Pravega 'data plane' based on the workload?"):
@@ -131,7 +212,7 @@ def main():
         num_writers = int(input("How many Writers are expected to write data to Pravega?"))
         num_readers = int(input("How many Readers are expected to read data from Pravega?"))
         metadata_ops_per_second = performance_profile.writer_default_metadata_ops_per_second * num_writers + \
-            performance_profile.reader_default_metadata_ops_per_second * num_readers
+                                  performance_profile.reader_default_metadata_ops_per_second * num_readers
         transaction_operations_per_second = 0
         if get_bool_input("Are there clients executing other types of metadata operations? (e.g., REST, list streams, get Stream info)"):
             num_clients_extra_metadata_ops = int(input("How many clients are executing extra metadata operations?"))
@@ -150,10 +231,50 @@ def main():
         controllers = max(controllers, calc_controllers_for_workload(num_streams, heavy_operations_per_second,
                                                                      metadata_ops_per_second, performance_profile))
 
-    # Estimate the amount of resources required to allocate all the service instances.
-    requested_cpus, requested_ram_gb = get_requested_resources(zookeeper_servers, bookkeeper_servers, segment_stores,
-                                                               controllers)
-    vms = max(vms, calc_min_vms_for_resources(vm_ram_gb, vm_cpus, requested_ram_gb, requested_cpus))
+    return zookeeper_servers, bookkeeper_servers, segment_stores, controllers
+
+
+def main():
+    print("### Provisioning model for Pravega clusters ###")
+
+    # First, get the type of VMs/nodes that will be used in the cluster.
+    vm_cpus, vm_ram_gb, vm_local_drives = get_vm_flavor()
+
+    # Initialize the number of instances with the minimum defaults.
+    zookeeper_servers = Constants.min_zookeeper_servers
+    bookkeeper_servers = Constants.min_bookkeeper_servers
+    segment_stores = Constants.min_segment_stores
+    controllers = Constants.min_controllers
+    vms = 0
+
+    # Initialize values for calculation of scaling policy.
+    event_size = 0
+    write_events_per_second = 0
+    num_streams = 0
+
+    provisioning_model = get_int_input("Do you want to provision a Pravega cluster based on 0) the resources available, "
+                                       "1) the input workload, 2) none?", [0, 1, 2])
+
+    if provisioning_model == 0:
+        vms = int(input("How many VMs/nodes do you want to devote to a Pravega cluster?"))
+        zookeeper_servers, bookkeeper_servers, segment_stores, controllers = resource_based_provisioning(vms, vm_cpus,
+            vm_ram_gb, vm_local_drives, zookeeper_servers, bookkeeper_servers, segment_stores, controllers)
+    elif provisioning_model > 0:
+        # Provision for data availability.
+        if get_bool_input("Do you want to provision redundant instances to tolerate failures?"):
+            # Calculate the number of instances of each type to tolerate the given number of failures.
+            zookeeper_servers, bookkeeper_servers, segment_stores, controllers = provision_for_availability()
+            vms = calc_min_vms_for_availability(zookeeper_servers, bookkeeper_servers, segment_stores, controllers)
+
+        if provisioning_model == 1:
+            zookeeper_servers, bookkeeper_servers, segment_stores, controllers = \
+                workload_based_provisioning(zookeeper_servers, bookkeeper_servers, segment_stores, controllers)
+        else:
+            print("No provisioning model selected")
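+
+    # At this point, option 0 has sized the cluster to saturate the given nodes, option 1 has derived the instance
+    # counts from the expected workload, and option 2 has kept the minimum defaults (optionally increased to
+    # tolerate failures).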
+
+    # Get the number of VMs needed for the instances selected so far.
+    requested_cpus, requested_ram_gb = get_requested_resources(zookeeper_servers, bookkeeper_servers, segment_stores, controllers)
+    vms = max(vms, calc_min_vms_for_resources(vm_ram_gb, vm_cpus, requested_ram_gb, requested_cpus))
 
     # Calculate the number of containers and buckets.
     num_containers = Constants.segment_containers_per_segment_store * segment_stores
@@ -179,11 +300,11 @@ def main():
 
     # Output the cluster estimation result.
     print("--------- Cluster Provisioning ---------")
-    print("Minimum number of VMs required: ", vms)
-    print("Minimum number of Zookeeper servers required: ", zookeeper_servers)
-    print("Minimum number of Bookkeeper servers required: ", bookkeeper_servers)
-    print("Minimum number of Segment Stores servers required: ", segment_stores)
-    print("Minimum number of Controller servers required: ", controllers)
+    print("Number of VMs required: ", vms)
+    print("Number of Zookeeper servers required: ", zookeeper_servers)
+    print("Number of Bookkeeper servers required: ", bookkeeper_servers)
+    print("Number of Segment Store servers required: ", segment_stores)
+    print("Number of Controller servers required: ", controllers)
     print("--------- Cluster Config Params ---------")
     print("Number of Segment Containers in config: ", num_containers)
     print("Number of Stream Buckets in config: ", num_buckets)
diff --git a/pravega-provisioner/model/constants.py b/pravega-provisioner/model/constants.py
index ea75e2a..4bb6b65 100644
--- a/pravega-provisioner/model/constants.py
+++ b/pravega-provisioner/model/constants.py
@@ -6,9 +6,10 @@
 #
 #     http://www.apache.org/licenses/LICENSE-2.0
 #
+import math
 
 
-class Constants(object):
+class Constants:
     """
     Constant values and assumptions for the Pravega cluster provisioning model.
     """
@@ -22,13 +23,65 @@ class Constants:
     # Recommended resources to be provisioned per Pravega cluster instance.
     zk_server_ram_gb = 2
     zk_server_cpus = 1
-    bookie_ram_gb = 4
-    bookie_cpus = 2
-    controller_ram_gb = 3
+    bookie_ram_gb = 16
+    bookie_cpus = 8
+    controller_ram_gb = 4
     controller_cpus = 2
-    segment_store_ram_gb = 6
-    segment_store_cpus = 4
+    segment_store_ram_gb = 16
+    segment_store_cpus = 8
 
     # Number of segment containers and buckets per Segment Store and Controller, respectively.
     segment_containers_per_segment_store = 8
     stream_buckets_per_controller = 4
+    segment_store_jvm_size_in_gb = 4
+    segment_store_direct_memory_in_gb = segment_store_ram_gb - segment_store_jvm_size_in_gb
+
+    # Number of local drives per Bookie:
+    # i) 1 drive means that Journal, Ledger and Index are located on the same drive.
+    # ii) 2 drives means that Journal is on one drive, whereas Ledger and Index are on the same drive.
+    # iii) 3 drives means that Journal, Ledger and Index are on independent disks.
+    # By default, we assume that each Bookie has at least 2 drives to separate Journal from Ledger/Index IOs.
+    drives_per_bookie = 2
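+    # For example, on nodes exposing 2 local drives, these defaults allow at most one Bookie per node as far as
+    # drives are concerned.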
+
+
+    @staticmethod
+    def zookeeper_to_bookies_ratio(bookies):
+        """
+        Number of Zookeeper instances based on the number of Bookies in the system, as Bookkeeper is the service that
+        makes the most intensive use of Zookeeper for managing metadata (more than the Controller). For large
+        deployments (e.g., > 20 Bookies), we keep a ratio of Zookeeper servers to Bookies of 1:4.
+        """
+        zk_instances = math.ceil(bookies / 4)
+        if zk_instances < Constants.min_zookeeper_servers:
+            return Constants.min_zookeeper_servers
+        elif zk_instances % 2 == 0:
+            # We should keep an odd number of Zookeeper servers.
+            return zk_instances + 1
+        else:
+            return zk_instances
+
+
+    @staticmethod
+    def segment_stores_to_bookies_ratio(bookies):
+        """
+        In our benchmarks, we observe that we require one Segment Store per Bookie to fully saturate fast drives.
+        Therefore, we define a 1:1 ratio between Bookies and Segment Stores.
+        """
+        return bookies
+
+
+    @staticmethod
+    def controllers_to_segment_stores_ratio(segment_stores, metadata_heavy_workload):
+        """
+        Unless the workload at hand is very metadata-intensive (i.e., many clients, very small transactions), we can
+        keep a 1:3 ratio between Controllers and Segment Stores. Otherwise, we can switch to a 1:2 ratio.
+        """
+        ratio = 3
+        if metadata_heavy_workload:
+            ratio = 2
+        controllers = segment_stores / ratio
+        if controllers < Constants.min_controllers:
+            return Constants.min_controllers
+        else:
+            return math.ceil(controllers)
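+
+    # For illustration, with 12 Bookies: zookeeper_to_bookies_ratio(12) -> 3 (assuming min_zookeeper_servers = 3),
+    # segment_stores_to_bookies_ratio(12) -> 12, and controllers_to_segment_stores_ratio(12, False) -> 4
+    # (or 6 for a metadata-heavy workload, assuming min_controllers <= 4).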
+
diff --git a/pravega-provisioner/model/provisioning_logic.py b/pravega-provisioner/model/provisioning_logic.py
index dbeebbb..afd57c3 100644
--- a/pravega-provisioner/model/provisioning_logic.py
+++ b/pravega-provisioner/model/provisioning_logic.py
@@ -151,3 +151,54 @@ def calc_controllers_for_workload(num_streams, heavy_operations_per_second, ligh
                       heavy_operations_per_second / performance_profile.controller_max_heavy_operations_per_second + 1,
                       light_operations_per_second / performance_profile.controller_max_light_operations_per_second + 1))
 
+
+def can_allocate_services_on_nodes(vms, vm_cpus, vm_ram_gb, vm_local_drives, zookeeper_servers, bookkeeper_servers, segment_stores, controllers):
+    the_cluster = list()
+    # Initialize the cluster model with the resources available per node (vm_cpus, vm_ram_gb, vm_local_drives).
+    for i in range(vms):
+        the_cluster.append((vm_cpus, vm_ram_gb, vm_local_drives, []))
+
+    # Allocate the different instances in a round-robin fashion across nodes to maximize failure tolerance.
+    # Start by allocating Bookies.
+    drives_per_bookie = 0
+    if vm_local_drives > 0:
+        drives_per_bookie = Constants.drives_per_bookie
+    if not _subtract_resources_from_cluster(the_cluster, bookkeeper_servers, Constants.bookie_cpus, Constants.bookie_ram_gb, drives_per_bookie, "bk"):
+        return False, the_cluster
+    # Then, attempt to allocate Segment Stores.
+    if not _subtract_resources_from_cluster(the_cluster, segment_stores, Constants.segment_store_cpus, Constants.segment_store_ram_gb, 0, "ss"):
+        return False, the_cluster
+    # Attempt to allocate Controllers.
+    if not _subtract_resources_from_cluster(the_cluster, controllers, Constants.controller_cpus, Constants.controller_ram_gb, 0, "c"):
+        return False, the_cluster
+    # Attempt to allocate Zookeeper servers.
+    if not _subtract_resources_from_cluster(the_cluster, zookeeper_servers, Constants.zk_server_cpus, Constants.zk_server_ram_gb, 0, "zk"):
+        return False, the_cluster
+
+    # Allocation of all the instances has been possible.
+    return True, the_cluster
+
+
+def _subtract_resources_from_cluster(the_cluster, num_instances, cpu_per_instance, ram_per_instance, drives_per_instance, node_type, enforce_load_balancing=True):
+    finish_allocation = False
+    completed_allocations = retries = 0
+    while not finish_allocation:
+        for x in range(num_instances - completed_allocations):
+            [old_cpu, old_ram, old_drives, processes_in_vm] = the_cluster[x % len(the_cluster)]
+            # Only allocate when there are enough resources.
+            if (old_cpu - cpu_per_instance) >= 0 and (old_ram - ram_per_instance) >= 0 and (old_drives - drives_per_instance) >= 0:
+                processes_in_vm.append(node_type)
+                the_cluster[x % len(the_cluster)] = (old_cpu - cpu_per_instance, old_ram - ram_per_instance, old_drives - drives_per_instance, processes_in_vm)
+                completed_allocations += 1
+        # After allocating instances, sort nodes based on available resources.
+        the_cluster.sort(key=lambda tup: tup[0], reverse=True)
+
+        if num_instances == completed_allocations:
+            finish_allocation = True
+        if (retries > 0 and enforce_load_balancing) or retries >= len(the_cluster):
+            return False
+        retries += 1
+
+    return True
+
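+# For illustration, with the resource requests defined in Constants (8 CPUs/16 GB per Bookie, 8 CPUs/16 GB per
+# Segment Store, 2 CPUs/4 GB per Controller and 1 CPU/2 GB per Zookeeper server), a call such as
+# can_allocate_services_on_nodes(3, 32, 64, 2, 3, 3, 3, 3) places one instance of each service on every node in a
+# round-robin fashion and returns (True, the_cluster), since each node needs 19 CPUs, 38 GB of RAM and 2 drives.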