Issue 55: Resource-based provisioning model (#57)
Issue 55: Added basic resource-based provisioning model to provision Pravega clusters.

Signed-off-by: Raúl Gracia <[email protected]>
RaulGracia authored Feb 7, 2020
2 parents a9c8c49 + 3f0a4e0 commit 8eac53d
Showing 4 changed files with 267 additions and 39 deletions.
3 changes: 3 additions & 0 deletions README.md
@@ -17,4 +17,7 @@ Pravega clusters.
We provide two scripts: one that collects the logs from the pods themselves, and another that assumes the
existence of a logging service (e.g., FluentBit) from which to collect the logs.

- Pravega provisioning helper: A script that helps with the right-sizing and proper configuration of a Pravega
cluster.

For detailed documentation of each tool, we refer to their respective README files.
187 changes: 154 additions & 33 deletions pravega-provisioner/cluster_provisioner.py
@@ -6,7 +6,9 @@
#
# http://www.apache.org/licenses/LICENSE-2.0
#
import math

from model.constants import Constants
from model.provisioning_logic import *
from performance.performance_profiles import BareMetalCluster

Expand Down Expand Up @@ -39,12 +41,15 @@ def get_int_input(msg, valid_values):
def get_vm_flavor():
"""
Asks the user for the resources that each VM in the cluster will have.
:return: Amount of CPU cores and GBs of RAM per VM.
:return: Amount of CPU cores, GB of RAM, and local drives (if applicable) per VM.
"""
print("Please enter the resource information for the VMs/nodes used in the cluster:")
vm_cpus = int(input("How many CPU cores does each VM/node have?"))
vm_ram_gb = int(input("How much memory in GB does each VM/node have?"))
return vm_cpus, vm_ram_gb
vm_local_drives = 0
if get_bool_input("Is your cluster using local drives?"):
vm_local_drives = int(input("How many local drives does each VM/node have?"))
return vm_cpus, vm_ram_gb, vm_local_drives


def provision_for_availability():
@@ -78,32 +83,108 @@ def get_requested_resources(zk_servers, bk_servers, ss_servers, cc_servers):
return requested_cpus, requested_ram_gb


def main():
print("### Provisioning model for Pravega clusters ###")
def resource_based_provisioning(vms, vm_cpus, vm_ram_gb, vm_local_drives, zookeeper_servers, bookkeeper_servers, segment_stores, controllers):
# Ask the user for the number of node failures to tolerate.
failures_to_tolerate = get_int_input("How many node failures do you want to tolerate?", range(0, 100))
if failures_to_tolerate >= vms:
assert False, "You do not have enough nodes to tolerate that many failures."
elif vms - failures_to_tolerate < Constants.min_bookkeeper_servers or vms - failures_to_tolerate < Constants.min_zookeeper_servers:
# If we are collocating more than 1 Zookeeper or Bookie per node, we need an additional instance. Otherwise, we
# may end up in a situation like the following: 3 VMs/nodes with 4 Bookie/Zookeeper servers. This means that there is
# one node with 2 instances, and if that node crashes, we fall below the minimum number of replicas defined.
zookeeper_servers += 1
bookkeeper_servers += 1

# This sets the performance numbers of the Pravega services on a specific environment (e.g., bare metal, PKS).
performance_profile = BareMetalCluster()
# Given that we want to tolerate failures, we need to increase the minimum number of instances of each type by
# the number of failures we want to tolerate. This will be the baseline for the Pravega cluster.
zookeeper_servers += failures_to_tolerate
# We need an odd number of Zookeeper instances.
if zookeeper_servers % 2 == 0:
zookeeper_servers += 1
bookkeeper_servers += failures_to_tolerate
segment_stores += failures_to_tolerate
controllers += failures_to_tolerate
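# As an illustration (with assumed minimum defaults, not taken from the Constants file: 3 Zookeeper servers,
# 3 Bookies, 1 Segment Store, 1 Controller), tolerating 1 failure on a 10-node cluster yields: Zookeeper = 3 + 1 = 4,
# rounded up to 5 to keep an odd quorum; Bookkeeper = 3 + 1 = 4; Segment Stores = 1 + 1 = 2; Controllers = 1 + 1 = 2.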

# First, get the type of VMs/nodes that will be used in the cluster.
vm_cpus, vm_ram_gb = get_vm_flavor()
can_allocate_cluster, the_cluster = can_allocate_services_on_nodes(vms, vm_cpus, vm_ram_gb, vm_local_drives,
zookeeper_servers, bookkeeper_servers, segment_stores, controllers)
# First, make sure that the initial number of instances can be allocated; otherwise, just throw an error.
assert can_allocate_cluster, "Not even the minimal Pravega cluster can be allocated with the current resources. " + \
"Please, check the Constants file to see the resources requested per instance."

# Initialize the number of instances with the minimum defaults.
zookeeper_servers = Constants.min_zookeeper_servers
bookkeeper_servers = Constants.min_bookkeeper_servers
segment_stores = Constants.min_segment_stores
controllers = Constants.min_controllers
vms = 0
# Ask the user whether this is a metadata-intensive workload or not.
metadata_heavy_workload = get_bool_input("Is the workload metadata-heavy (i.e., many clients, small transactions)?")

# Search for the maximum number of instances of each service that the available nodes can accommodate.
while can_allocate_cluster:

# Increase the number of Bookies first, so that they keep a 1:1 relationship with Segment Stores.
if bookkeeper_servers <= segment_stores:
tentative_bookkeeper_servers = bookkeeper_servers + 1
can_allocate_cluster, the_cluster = can_allocate_services_on_nodes(vms, vm_cpus, vm_ram_gb, vm_local_drives,
zookeeper_servers, tentative_bookkeeper_servers, segment_stores, controllers)
else: tentative_bookkeeper_servers = bookkeeper_servers

# Try to increase the number of Segment Stores if possible.
if can_allocate_cluster:
bookkeeper_servers = tentative_bookkeeper_servers
tentative_segment_stores = max(segment_stores, Constants.segment_stores_to_bookies_ratio(bookkeeper_servers))
can_allocate_cluster, the_cluster = can_allocate_services_on_nodes(vms, vm_cpus, vm_ram_gb, vm_local_drives,
zookeeper_servers, bookkeeper_servers, tentative_segment_stores, controllers)
else: continue

# Try to increase the number of Controllers if possible.
if can_allocate_cluster:
segment_stores = tentative_segment_stores
tentative_controllers = max(controllers, Constants.controllers_to_segment_stores_ratio(segment_stores, metadata_heavy_workload))
can_allocate_cluster, the_cluster = can_allocate_services_on_nodes(vms, vm_cpus, vm_ram_gb, vm_local_drives,
zookeeper_servers, bookkeeper_servers, segment_stores, tentative_controllers)
else: continue

# Try to increase the number of Zookeeper servers if possible.
if can_allocate_cluster:
controllers = tentative_controllers
tentative_zookeeper_servers = max(zookeeper_servers, Constants.zookeeper_to_bookies_ratio(bookkeeper_servers))
can_allocate_cluster, the_cluster = can_allocate_services_on_nodes(vms, vm_cpus, vm_ram_gb, vm_local_drives,
tentative_zookeeper_servers, bookkeeper_servers, segment_stores, controllers)
else: continue

if can_allocate_cluster:
zookeeper_servers = tentative_zookeeper_servers

# Get the resources used by the last successful allocation.
the_cluster = can_allocate_services_on_nodes(vms, vm_cpus, vm_ram_gb, vm_local_drives, zookeeper_servers,
bookkeeper_servers, segment_stores, controllers)[1]
print("Allocation of pods on nodes: ", the_cluster)
# Finally, we need to check how much memory is left on the nodes so we can share it across Segment Stores for caching.
# In the worst case, we will have [math.ceil(segment_stores/vms)] Segment Store instances on a single node. Also,
# in the worst case, this could be the node with the least available memory. For this reason, the
# in-memory cache size for a Segment Store would be as follows:
min_vm_mem_available = min(mem for (cpu, mem, disks, processes_in_vm) in the_cluster)
max_segment_stores_per_vm = math.ceil(segment_stores / vms)
new_direct_memory = Constants.segment_store_direct_memory_in_gb + int(min_vm_mem_available/max_segment_stores_per_vm)
print("--------- Segment Store In-Memory Cache Size (Pravega +0.7) ---------")
print("Segment Store pod memory limit: ", Constants.segment_store_jvm_size_in_gb + new_direct_memory, "GB")
print("Segment Store JVM Size (-Xmx JVM Option) : ", Constants.segment_store_jvm_size_in_gb, "GB")
print("Segment Store direct memory (-DirectMemory JVM Option): ", new_direct_memory, "GB")
# We leave 1GB extra of direct memory for non-caching purposes
print("Segment Store cache size (pravegaservice.cacheMaxSize): ", (new_direct_memory - 1) * 1024 * 1024 * 1024,
" (", new_direct_memory - 1, "GB)")
print("Buffering time that Pravega tolerates with Tier 2 unavailable given a (well distributed) write workload:")
for w in range(100, 1000, 200):
print("- Write throughput: ", w, "(MBps) -> buffering time: ", ((segment_stores * (new_direct_memory - 1) * 1024) / w), " seconds")

# Add a warning if the number of Bookies is higher than the number of nodes, as Pravega may need to enable rack-
# aware placement in Bookkeeper so that it tries to write to Bookies on different nodes (data availability).
if bookkeeper_servers > vms:
print("WARNING: To guarantee data availability and durability, consider enabling rack-aware placement in "
"Pravega to write to Bookkeeper.")

return zookeeper_servers, bookkeeper_servers, segment_stores, controllers

# Initialize values for calculation of scaling policy.
event_size = 0
write_events_per_second = 0
num_streams = 0

# Provision for data availability.
if get_bool_input("Do you want to provision redundant instances to tolerate failures?"):
# Calculate the number of instances of each type to tolerate the given number of failures.
zookeeper_servers, bookkeeper_servers, segment_stores, controllers = provision_for_availability()
vms = calc_min_vms_for_availability(zookeeper_servers, bookkeeper_servers, segment_stores, controllers)
def workload_based_provisioning(zookeeper_servers, bookkeeper_servers, segment_stores, controllers):
# This sets the performance numbers of the Pravega services on a specific environment (e.g., bare metal, PKS).
performance_profile = BareMetalCluster()

# Provision Pravega data plane for workload (Bookkeeper, Segment Store).
if get_bool_input("Do you want to right-size the Pravega 'data plane' based on the workload?"):
@@ -131,7 +212,7 @@ def main():
num_writers = int(input("How many Writers are expected to write data to Pravega?"))
num_readers = int(input("How many Readers are expected to read data from Pravega?"))
metadata_ops_per_second = performance_profile.writer_default_metadata_ops_per_second * num_writers + \
performance_profile.reader_default_metadata_ops_per_second * num_readers
performance_profile.reader_default_metadata_ops_per_second * num_readers
transaction_operations_per_second = 0
if get_bool_input("Are there clients executing other types of metadata operations? (e.g., REST, list streams, get Stream info)"):
num_clients_extra_metadata_ops = int(input("How many clients are executing extra metadata operations?"))
@@ -150,10 +231,50 @@ def main():
controllers = max(controllers, calc_controllers_for_workload(num_streams, heavy_operations_per_second,
metadata_ops_per_second, performance_profile))

# Estimate the amount of resources required to allocate all the service instances.
requested_cpus, requested_ram_gb = get_requested_resources(zookeeper_servers, bookkeeper_servers, segment_stores,
controllers)
vms = max(vms, calc_min_vms_for_resources(vm_ram_gb, vm_cpus, requested_ram_gb, requested_cpus))
return zookeeper_servers, bookkeeper_servers, segment_stores, controllers


def main():
print("### Provisioning model for Pravega clusters ###")

# First, get the type of VMs/nodes that will be used in the cluster.
vm_cpus, vm_ram_gb, vm_local_drives = get_vm_flavor()

# Initialize the number of instances with the minimum defaults.
zookeeper_servers = Constants.min_zookeeper_servers
bookkeeper_servers = Constants.min_bookkeeper_servers
segment_stores = Constants.min_segment_stores
controllers = Constants.min_controllers
vms = 0

# Initialize values for calculation of scaling policy.
event_size = 0
write_events_per_second = 0
num_streams = 0

provisioning_model = get_int_input("Do you want to provision a Pravega cluster based on 0) the resources available, "
"1) the input workload, 2) none?", [0, 1, 2])

if provisioning_model == 0:
vms = int(input("How many VMs/nodes do you want to devote for a Pravega cluster?"))
zookeeper_servers, bookkeeper_servers, segment_stores, controllers = resource_based_provisioning(vms, vm_cpus,
vm_ram_gb, vm_local_drives, zookeeper_servers, bookkeeper_servers, segment_stores, controllers)
elif provisioning_model > 0:
# Provision for data availability.
if get_bool_input("Do you want to provision redundant instances to tolerate failures?"):
# Calculate the number of instances of each type to tolerate the given number of failures.
zookeeper_servers, bookkeeper_servers, segment_stores, controllers = provision_for_availability()
vms = calc_min_vms_for_availability(zookeeper_servers, bookkeeper_servers, segment_stores, controllers)

if provisioning_model == 1:
zookeeper_servers, bookkeeper_servers, segment_stores, controllers = \
workload_based_provisioning(zookeeper_servers, bookkeeper_servers, segment_stores, controllers)
else:
print("No provisioning model selected")

# Get the number of VMs based on the resources needed by the instances selected so far.
requested_cpus, requested_ram_gb = get_requested_resources(zookeeper_servers, bookkeeper_servers, segment_stores, controllers)
vms = max(vms, calc_min_vms_for_resources(vm_ram_gb, vm_cpus, requested_ram_gb, requested_cpus))
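# A worked example with the per-instance resources from model/constants.py in this commit (Zookeeper: 1 CPU/2 GB,
# Bookie: 8 CPUs/16 GB, Segment Store: 8 CPUs/16 GB, Controller: 2 CPUs/4 GB): a 3/4/4/2 cluster requests
# 3*1 + 4*8 + 4*8 + 2*2 = 71 CPUs and 3*2 + 4*16 + 4*16 + 2*4 = 142 GB of RAM. Assuming calc_min_vms_for_resources
# takes the larger of the CPU- and RAM-based node counts, 16-CPU/64-GB VMs would give max(ceil(71/16), ceil(142/64)) = 5 VMs.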

# Calculate the number of containers and buckets.
num_containers = Constants.segment_containers_per_segment_store * segment_stores
@@ -179,11 +300,11 @@ def main():

# Output the cluster estimation result.
print("--------- Cluster Provisioning ---------")
print("Minimum number of VMs required: ", vms)
print("Minimum number of Zookeeper servers required: ", zookeeper_servers)
print("Minimum number of Bookkeeper servers required: ", bookkeeper_servers)
print("Minimum number of Segment Stores servers required: ", segment_stores)
print("Minimum number of Controller servers required: ", controllers)
print("Number of VMs required: ", vms)
print("Number of Zookeeper servers required: ", zookeeper_servers)
print("Number of Bookkeeper servers required: ", bookkeeper_servers)
print("Number of Segment Stores servers required: ", segment_stores)
print("Number of Controller servers required: ", controllers)
print("--------- Cluster Config Params ---------")
print("Number of Segment Containers in config: ", num_containers)
print("Number of Stream Buckets in config: ", num_buckets)
65 changes: 59 additions & 6 deletions pravega-provisioner/model/constants.py
@@ -6,9 +6,10 @@
#
# http://www.apache.org/licenses/LICENSE-2.0
#
import math


class Constants(object):
class Constants:
"""
Constant values and assumptions for the Pravega cluster provisioning model.
"""
@@ -22,13 +23,65 @@ class Constants(object):
# Recommended resources to be provisioned per Pravega cluster instance.
zk_server_ram_gb = 2
zk_server_cpus = 1
bookie_ram_gb = 4
bookie_cpus = 2
controller_ram_gb = 3
bookie_ram_gb = 16
bookie_cpus = 8
controller_ram_gb = 4
controller_cpus = 2
segment_store_ram_gb = 6
segment_store_cpus = 4
segment_store_ram_gb = 16
segment_store_cpus = 8

# Number of segment containers and buckets per Segment Store and Controller, respectively.
segment_containers_per_segment_store = 8
stream_buckets_per_controller = 4
segment_store_jvm_size_in_gb = 4
segment_store_direct_memory_in_gb = segment_store_ram_gb - segment_store_jvm_size_in_gb

# Number of local drives per Bookie:
# i) 1 drive means that Journal, Ledger and Index are located on the same drive.
# ii) 2 drives means that Journal is on one drive, whereas Ledger and Index are on the same drive.
# iii) 3 drives means that Journal, Ledger and Index are on independent disks.
# By default, we assume that each Bookie has at least 2 drives, to separate Journal from Ledger/Index IOs.
drives_per_bookie = 2
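# Illustrative assumption about how the allocator may use this constant (the check itself is not shown in this diff):
# with drives_per_bookie = 2, a VM/node reporting 4 local drives could host at most 4 // 2 = 2 Bookies.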


@staticmethod
def zookeeper_to_bookies_ratio(bookies):
"""
Number of Zookeeper instances based on the number of Bookies in the system, as Bookkeeper is the service that
makes the most intensive use of Zookeeper for managing metadata (more than the Controller). For large deployments
(e.g., > 20 Bookies), we keep a ratio of Zookeeper servers to Bookies of 1:4.
"""
zk_instances = math.ceil(bookies / 4)
if zk_instances < Constants.min_zookeeper_servers:
return Constants.min_zookeeper_servers
elif zk_instances % 2 == 0:
# We should keep an odd number of Zookeeper servers.
return zk_instances + 1
else:
return zk_instances
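# For instance, following the logic above and assuming min_zookeeper_servers = 3: 8 Bookies -> ceil(8/4) = 2 ->
# returns 3 (the minimum); 16 Bookies -> ceil(16/4) = 4 -> returns 5 (rounded up to an odd count); 20 Bookies ->
# ceil(20/4) = 5 -> returns 5.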


@staticmethod
def segment_stores_to_bookies_ratio(bookies):
"""
In our benchmarks, we observe that we require one Segment Store per Bookie to get full saturation of fast
drives. Therefore, we define a 1:1 ratio between Bookies and Segment Stores.
"""
return bookies


@staticmethod
def controllers_to_segment_stores_ratio(segment_stores, metadata_heavy_workload):
"""
Unless the workload at hand is very metadata-intensive (i.e., many clients, very small transactions), we can
keep a 1:3 ratio between Controllers and Segment Stores. Otherwise, we switch to a 1:2 ratio.
"""
ratio = 3
if metadata_heavy_workload:
ratio = 2
controllers = segment_stores / ratio
if controllers < Constants.min_controllers:
return Constants.min_controllers
else:
return math.ceil(controllers)
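# For instance, following the logic above and assuming min_controllers = 1: 9 Segment Stores -> ceil(9/3) = 3
# Controllers for a regular workload, or ceil(9/2) = 5 Controllers for a metadata-heavy one.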

