Merge pull request #2 from aws/master
Adding changes for advanced spark configuration setup (#121)
asumitamazon authored Jul 21, 2023
2 parents 627fdd5 + c07c70a commit d606585
Showing 4 changed files with 52 additions and 14 deletions.
new_images.yml: 2 changes (1 addition, 1 deletion)
@@ -4,4 +4,4 @@ new_images:
     use-case: "processing"
     processors: ["cpu"]
     python: ["py39"]
-    sm_version: "1.1"
+    sm_version: "1.2"
smsparkbuild/py39/Pipfile: 2 changes (1 addition, 1 deletion)
@@ -12,7 +12,7 @@ click = "==8.1.2"
 watchdog = "==0.10.3"
 waitress = "==2.1.2"
 types-waitress = "==2.0.6"
-requests = "==2.27.1"
+requests = "==2.31.0"
 types-requests = "==2.27.16"
 rsa = "==4.9"
 pyasn1 = "==0.4.8"
src/smspark/bootstrapper.py: 55 changes (43 additions, 12 deletions)
@@ -388,18 +388,49 @@ def get_yarn_spark_resource_config(
         self, instance_count: int, instance_mem_mb: int, instance_cores: int
     ) -> Tuple[Configuration, Configuration]:
         aws_region = os.getenv("AWS_REGION")
-        executor_cores = instance_cores
-        executor_count_per_instance = int(instance_cores / executor_cores)
-        executor_count_total = instance_count * executor_count_per_instance
-        default_parallelism = instance_count * instance_cores * 2
-
-        driver_mem_mb = int(instance_mem_mb * constants.DRIVER_MEM_INSTANCE_MEM_RATIO)
-        driver_mem_overhead_mb = int(driver_mem_mb * constants.DRIVER_MEM_OVERHEAD_RATIO)
-        executor_mem_mb = int(
-            ((instance_mem_mb * constants.EXECUTOR_MEM_INSTANCE_MEM_RATIO) / executor_count_per_instance)
-            * (1 - constants.EXECUTOR_MEM_OVERHEAD_RATIO)
-        )
-        executor_mem_overhead_mb = int(executor_mem_mb * constants.EXECUTOR_MEM_OVERHEAD_RATIO)
+        spark_config_mode = int(os.getenv("AWS_SPARK_CONFIG_MODE", str(constants.AWS_SPARK_CONFIG_MODE_STANDARD)))
+
+        if spark_config_mode == constants.AWS_SPARK_CONFIG_MODE_STANDARD:
+            executor_cores = instance_cores
+            executor_count_per_instance = int(instance_cores / executor_cores)
+            executor_count_total = instance_count * executor_count_per_instance
+            default_parallelism = instance_count * instance_cores * 2
+
+            driver_mem_mb = int(instance_mem_mb * constants.DRIVER_MEM_INSTANCE_MEM_RATIO)
+            driver_mem_overhead_mb = int(driver_mem_mb * constants.DRIVER_MEM_OVERHEAD_RATIO)
+            executor_mem_mb = int(
+                ((instance_mem_mb * constants.EXECUTOR_MEM_INSTANCE_MEM_RATIO) / executor_count_per_instance)
+                * (1 - constants.EXECUTOR_MEM_OVERHEAD_RATIO)
+            )
+            executor_mem_overhead_mb = int(executor_mem_mb * constants.EXECUTOR_MEM_OVERHEAD_RATIO)
+        elif spark_config_mode == constants.AWS_SPARK_CONFIG_MODE_ADVANCED:
+            # Reduce the usable instance memory by a safety margin.
+            reduced_instance_mem_mb = int(instance_mem_mb * constants.SAFE_MEMORY_REDUCTION_RATIO)
+            # Fixed executor core count (constant of 5), capped below the instance core count.
+            executor_cores = constants.EXECUTOR_CORES
+            if executor_cores >= instance_cores:
+                executor_cores = instance_cores - 1
+            # Executors per instance: reserve 1 core per instance for the Hadoop daemons.
+            executor_count_per_instance = int((instance_cores - 1) / executor_cores)
+            # Total executor count across the cluster: leave 1 slot for the driver.
+            executor_count_total = (instance_count * executor_count_per_instance) - 1
+            # Default parallelism: two tasks per executor core.
+            default_parallelism = executor_count_total * executor_cores * 2
+            # Per-executor memory budget: reserve 1 GB per instance for the Hadoop daemons.
+            total_executor_memory = int((reduced_instance_mem_mb - constants.HADOOP_DAEMONS_MEM_MB) / executor_count_per_instance)
+            # Executor memory (90% of the per-executor budget).
+            executor_mem_mb = int(total_executor_memory * constants.EXECUTOR_MEM_INSTANCE_MEM_RATIO_ADV)
+            # Executor memory overhead (10% of the per-executor budget).
+            executor_mem_overhead_mb = int(total_executor_memory * constants.EXECUTOR_MEM_OVERHEAD_RATIO)
+            # Give the driver the same memory settings as an executor.
+            driver_mem_mb = executor_mem_mb
+            driver_mem_overhead_mb = executor_mem_overhead_mb
+        else:
+            raise ValueError(
+                "Could not determine Spark configuration mode: {}.".format(
+                    spark_config_mode
+                )
+            )

         driver_gc_config = (
             "-XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=70 -XX:MaxHeapFreeRatio=70 "
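To make the advanced-mode arithmetic concrete, the following standalone Python sketch replays the formulas from the hunk above with the new constants inlined. The cluster shape (2 x ml.m5.4xlarge, i.e. 16 vCPUs and 64 GiB of memory per instance) is a hypothetical example chosen for illustration; it is not part of this change.

# Standalone sketch of the advanced-mode sizing above; constants copied from
# src/smspark/constants.py, cluster shape is hypothetical.
EXECUTOR_CORES = 5
HADOOP_DAEMONS_MEM_MB = 1024
SAFE_MEMORY_REDUCTION_RATIO = 0.95
EXECUTOR_MEM_INSTANCE_MEM_RATIO_ADV = 0.90
EXECUTOR_MEM_OVERHEAD_RATIO = 0.1

instance_count = 2       # e.g. 2 x ml.m5.4xlarge (hypothetical)
instance_mem_mb = 65536  # 64 GiB per instance
instance_cores = 16      # 16 vCPUs per instance

reduced_instance_mem_mb = int(instance_mem_mb * SAFE_MEMORY_REDUCTION_RATIO)        # 62259
executor_cores = EXECUTOR_CORES if EXECUTOR_CORES < instance_cores else instance_cores - 1  # 5
executor_count_per_instance = int((instance_cores - 1) / executor_cores)            # 3
executor_count_total = (instance_count * executor_count_per_instance) - 1           # 5
default_parallelism = executor_count_total * executor_cores * 2                     # 50
total_executor_memory = int(
    (reduced_instance_mem_mb - HADOOP_DAEMONS_MEM_MB) / executor_count_per_instance
)                                                                                   # 20411
executor_mem_mb = int(total_executor_memory * EXECUTOR_MEM_INSTANCE_MEM_RATIO_ADV)  # 18369
executor_mem_overhead_mb = int(total_executor_memory * EXECUTOR_MEM_OVERHEAD_RATIO) # 2041
driver_mem_mb, driver_mem_overhead_mb = executor_mem_mb, executor_mem_overhead_mb   # driver mirrors executor

print(executor_count_total, executor_cores, executor_mem_mb, default_parallelism)   # 5 5 18369 50

Compared with the standard mode, which packs a single large executor per instance, the advanced mode caps executors at 5 cores and explicitly reserves a core and 1 GB per instance for the Hadoop daemons plus one executor slot for the driver.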
src/smspark/constants.py: 7 changes (7 additions, 0 deletions)
@@ -3,3 +3,10 @@
 DRIVER_MEM_OVERHEAD_RATIO = 0.1
 EXECUTOR_MEM_INSTANCE_MEM_RATIO = 0.95
 EXECUTOR_MEM_OVERHEAD_RATIO = 0.1
+
+EXECUTOR_CORES = 5
+HADOOP_DAEMONS_MEM_MB = 1024
+SAFE_MEMORY_REDUCTION_RATIO = 0.95
+EXECUTOR_MEM_INSTANCE_MEM_RATIO_ADV = 0.90
+AWS_SPARK_CONFIG_MODE_STANDARD = 1
+AWS_SPARK_CONFIG_MODE_ADVANCED = 2
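Since the bootstrapper reads the mode from the AWS_SPARK_CONFIG_MODE environment variable (defaulting to standard), a caller opts in to advanced mode by setting that variable on the processing container. Below is a minimal sketch using the SageMaker Python SDK; the role ARN, job name, framework version, and script path are placeholders, and it assumes PySparkProcessor's env argument forwards variables into the container environment.

# Hedged sketch: opting in to the advanced Spark configuration mode from a
# SageMaker Processing job. All values other than the env entry are placeholders.
from sagemaker.spark.processing import PySparkProcessor

processor = PySparkProcessor(
    base_job_name="sm-spark-advanced",                    # placeholder name
    framework_version="3.1",                              # example container version
    role="arn:aws:iam::111122223333:role/SageMakerRole",  # placeholder role ARN
    instance_count=2,
    instance_type="ml.m5.4xlarge",
    # "2" == AWS_SPARK_CONFIG_MODE_ADVANCED; unset or "1" keeps the standard mode.
    env={"AWS_SPARK_CONFIG_MODE": "2"},
)

processor.run(submit_app="./my_spark_job.py")             # placeholder script path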
