From a30d14ab698617e2e9c8b7845bc0a857e73e13c0 Mon Sep 17 00:00:00 2001
From: Kyle-Neale
Date: Mon, 7 Oct 2024 12:44:50 -0400
Subject: [PATCH 1/4] Add option to disable stage metrics and stage_id tags

---
 spark/assets/configuration/spec.yaml               | 14 ++++++++++++++
 .../datadog_checks/spark/config_models/defaults.py |  8 ++++++++
 .../datadog_checks/spark/config_models/instance.py |  2 ++
 spark/datadog_checks/spark/data/conf.yaml.example  | 10 ++++++++++
 spark/datadog_checks/spark/spark.py                | 10 +++++++---
 5 files changed, 41 insertions(+), 3 deletions(-)

diff --git a/spark/assets/configuration/spec.yaml b/spark/assets/configuration/spec.yaml
index 75a70db32510b..fa4cc8dbeb6ad 100644
--- a/spark/assets/configuration/spec.yaml
+++ b/spark/assets/configuration/spec.yaml
@@ -121,6 +121,20 @@ files:
         type: boolean
         display_default: false
         example: true
+    - name: disable_spark_job_stage_tags
+      description: |
+        Enable to stop submitting the tag `stage_id` for Spark jobs.
+      value:
+        type: boolean
+        display_default: false
+        example: true
+    - name: disable_spark_stage_metrics
+      description: |
+        Enable to stop collecting Spark stage metrics.
+      value:
+        type: boolean
+        display_default: false
+        example: true
     - template: instances/http
       overrides:
         auth_token.description: |
diff --git a/spark/datadog_checks/spark/config_models/defaults.py b/spark/datadog_checks/spark/config_models/defaults.py
index 564cd45b1722c..78e9f3d6de6b7 100644
--- a/spark/datadog_checks/spark/config_models/defaults.py
+++ b/spark/datadog_checks/spark/config_models/defaults.py
@@ -32,6 +32,14 @@ def instance_disable_legacy_cluster_tag():
     return False
 
 
+def instance_disable_spark_job_stage_tags():
+    return False
+
+
+def instance_disable_spark_stage_metrics():
+    return False
+
+
 def instance_empty_default_hostname():
     return False
 
diff --git a/spark/datadog_checks/spark/config_models/instance.py b/spark/datadog_checks/spark/config_models/instance.py
index 7ce704b0707df..6f199124f0428 100644
--- a/spark/datadog_checks/spark/config_models/instance.py
+++ b/spark/datadog_checks/spark/config_models/instance.py
@@ -64,6 +64,8 @@ class InstanceConfig(BaseModel):
     connect_timeout: Optional[float] = None
     disable_generic_tags: Optional[bool] = None
     disable_legacy_cluster_tag: Optional[bool] = None
+    disable_spark_job_stage_tags: Optional[bool] = None
+    disable_spark_stage_metrics: Optional[bool] = None
     empty_default_hostname: Optional[bool] = None
     enable_query_name_tag: Optional[bool] = None
     executor_level_metrics: Optional[bool] = None
diff --git a/spark/datadog_checks/spark/data/conf.yaml.example b/spark/datadog_checks/spark/data/conf.yaml.example
index c6d0c20093669..dcb6fbe15b266 100644
--- a/spark/datadog_checks/spark/data/conf.yaml.example
+++ b/spark/datadog_checks/spark/data/conf.yaml.example
@@ -145,6 +145,16 @@ instances:
     #
     # enable_query_name_tag: true
 
+    ## @param disable_spark_job_stage_tags - boolean - optional - default: false
+    ## Enable to stop submitting the tag `stage_id` for Spark jobs.
+    #
+    # disable_spark_job_stage_tags: true
+
+    ## @param disable_spark_stage_metrics - boolean - optional - default: false
+    ## Enable to stop collecting Spark stage metrics.
+    #
+    # disable_spark_stage_metrics: true
+
     ## @param proxy - mapping - optional
     ## This overrides the `proxy` setting in `init_config`.
     ##
diff --git a/spark/datadog_checks/spark/spark.py b/spark/datadog_checks/spark/spark.py
index cce92c87b2e41..124d13bbc55b0 100644
--- a/spark/datadog_checks/spark/spark.py
+++ b/spark/datadog_checks/spark/spark.py
@@ -73,6 +73,8 @@ def __init__(self, name, init_config, instances):
         self.metricsservlet_path = self.instance.get('metricsservlet_path', '/metrics/json')
 
         self._enable_query_name_tag = is_affirmative(self.instance.get('enable_query_name_tag', False))
+        self._disable_spark_job_stage_tags = is_affirmative(self.instance.get('disable_spark_job_stage_tags', False))
+        self._disable_spark_stage_metrics = is_affirmative(self.instance.get('disable_spark_stage_metrics', False))
 
         # Get the cluster name from the instance configuration
         self.cluster_name = self.instance.get('cluster_name')
@@ -97,8 +99,9 @@ def check(self, _):
         # Get the job metrics
         self._spark_job_metrics(spark_apps, tags)
 
+        if not self._disable_spark_stage_metrics:
         # Get the stage metrics
-        self._spark_stage_metrics(spark_apps, tags)
+            self._spark_stage_metrics(spark_apps, tags)
 
         # Get the executor metrics
         self._spark_executor_metrics(spark_apps, tags)
@@ -412,8 +415,9 @@ def _spark_job_metrics(self, running_apps, addl_tags):
             if job_id is not None:
                 tags.append('job_id:{}'.format(job_id))
 
-            for stage_id in job.get('stageIds', []):
-                tags.append('stage_id:{}'.format(stage_id))
+            if not self._disable_spark_job_stage_tags:
+                for stage_id in job.get('stageIds', []):
+                    tags.append('stage_id:{}'.format(stage_id))
 
             self._set_metrics_from_json(tags, job, SPARK_JOB_METRICS)
             self._set_metric('spark.job.count', COUNT, 1, tags)

From 820b38cbab824def2b71f350e06ee90982dc1040 Mon Sep 17 00:00:00 2001
From: Kyle-Neale
Date: Mon, 7 Oct 2024 13:04:42 -0400
Subject: [PATCH 2/4] Add tests

---
 spark/tests/test_spark.py | 67 +++++++++++++++++++++++++++++++++++++++
 1 file changed, 67 insertions(+)

diff --git a/spark/tests/test_spark.py b/spark/tests/test_spark.py
index 37b06d8906d3a..9610c01e397a6 100644
--- a/spark/tests/test_spark.py
+++ b/spark/tests/test_spark.py
@@ -361,6 +361,15 @@ def proxy_with_warning_page_mock(url, *args, **kwargs):
     'executor_level_metrics': True,
 }
 
+STANDALONE_CONFIG_STAGE_DISABLED = {
+    'spark_url': 'http://localhost:8080',
+    'cluster_name': CLUSTER_NAME,
+    'spark_cluster_mode': 'spark_standalone_mode',
+    'executor_level_metrics': True,
+    'disable_spark_stage_metrics': True,
+    'disable_spark_job_stage_tags': True,
+}
+
 STANDALONE_CONFIG_PRE_20 = {
     'spark_url': 'http://localhost:8080',
     'cluster_name': CLUSTER_NAME,
@@ -412,6 +421,11 @@ def proxy_with_warning_page_mock(url, *args, **kwargs):
     'stage_id:1',
 ] + COMMON_TAGS
 
+SPARK_JOB_RUNNING_NO_STAGE_METRIC_TAGS = [
+    'status:running',
+    'job_id:0',
+] + COMMON_TAGS
+
 SPARK_JOB_SUCCEEDED_METRIC_VALUES = {
     'spark.job.count': 3,
     'spark.job.num_tasks': 1000,
@@ -432,6 +446,11 @@ def proxy_with_warning_page_mock(url, *args, **kwargs):
     'stage_id:1',
 ] + COMMON_TAGS
 
+SPARK_JOB_SUCCEEDED_NO_STAGE_METRIC_TAGS = [
+    'status:succeeded',
+    'job_id:0',
+] + COMMON_TAGS
+
 SPARK_STAGE_RUNNING_METRIC_VALUES = {
     'spark.stage.count': 3,
     'spark.stage.num_active_tasks': 3 * 3,
@@ -919,6 +938,54 @@ def test_standalone_unit(aggregator, dd_run_check):
     aggregator.assert_metrics_using_metadata(get_metadata_metrics())
 
 
+@pytest.mark.unit
+def test_standalone_stage_disabled_unit(aggregator, dd_run_check):
+    with mock.patch('requests.get', standalone_requests_get_mock):
+        c = SparkCheck('spark', {}, [STANDALONE_CONFIG_STAGE_DISABLED])
+        dd_run_check(c)
+
+        _assert(
+            aggregator,
+            [
+                # Check the running job metrics
+                (SPARK_JOB_RUNNING_METRIC_VALUES, SPARK_JOB_RUNNING_NO_STAGE_METRIC_TAGS),
+                # Check the running job metrics
+                (SPARK_JOB_RUNNING_METRIC_VALUES, SPARK_JOB_RUNNING_NO_STAGE_METRIC_TAGS),
+                # Check the succeeded job metrics
+                (SPARK_JOB_SUCCEEDED_METRIC_VALUES, SPARK_JOB_SUCCEEDED_NO_STAGE_METRIC_TAGS),
+                # Check the driver metrics
+                (SPARK_DRIVER_METRIC_VALUES, COMMON_TAGS),
+                # Check the optional driver metrics
+                (SPARK_DRIVER_OPTIONAL_METRIC_VALUES, COMMON_TAGS),
+                # Check the executor level metrics
+                (SPARK_EXECUTOR_LEVEL_METRIC_VALUES, SPARK_EXECUTOR_LEVEL_METRIC_TAGS),
+                # Check the optional executor level metrics
+                (SPARK_EXECUTOR_LEVEL_OPTIONAL_PROCESS_TREE_METRIC_VALUES, SPARK_EXECUTOR_LEVEL_METRIC_TAGS),
+                # Check the executor metrics
+                (SPARK_EXECUTOR_METRIC_VALUES, COMMON_TAGS),
+                # Check the optional summary executor metrics
+                (SPARK_EXECUTOR_OPTIONAL_METRIC_VALUES, COMMON_TAGS),
+                # Check the RDD metrics
+                (SPARK_RDD_METRIC_VALUES, COMMON_TAGS),
+                # Check the streaming statistics metrics
+                (SPARK_STREAMING_STATISTICS_METRIC_VALUES, COMMON_TAGS),
+                # Check the structured streaming metrics
+                (SPARK_STRUCTURED_STREAMING_METRIC_VALUES, COMMON_TAGS),
+            ],
+        )
+        # Check the service tests
+        for sc in aggregator.service_checks(STANDALONE_SERVICE_CHECK):
+            assert sc.status == SparkCheck.OK
+            assert sc.tags == ['url:http://localhost:8080'] + CLUSTER_TAGS
+        for sc in aggregator.service_checks(SPARK_SERVICE_CHECK):
+            assert sc.status == SparkCheck.OK
+            assert sc.tags == ['url:http://localhost:4040'] + CLUSTER_TAGS
+
+    # Assert coverage for this check on this instance
+    aggregator.assert_all_metrics_covered()
+    aggregator.assert_metrics_using_metadata(get_metadata_metrics())
+
+
 @pytest.mark.unit
 def test_standalone_unit_with_proxy_warning_page(aggregator, dd_run_check):
     c = SparkCheck('spark', {}, [STANDALONE_CONFIG])

From e0713d410e9f369d4db4372d35f6e08dcc5b5bd3 Mon Sep 17 00:00:00 2001
From: Kyle-Neale
Date: Mon, 7 Oct 2024 13:08:37 -0400
Subject: [PATCH 3/4] Lint

---
 spark/datadog_checks/spark/spark.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/spark/datadog_checks/spark/spark.py b/spark/datadog_checks/spark/spark.py
index 124d13bbc55b0..49270e6af450c 100644
--- a/spark/datadog_checks/spark/spark.py
+++ b/spark/datadog_checks/spark/spark.py
@@ -100,7 +100,7 @@ def check(self, _):
         self._spark_job_metrics(spark_apps, tags)
 
         if not self._disable_spark_stage_metrics:
-        # Get the stage metrics
+            # Get the stage metrics
             self._spark_stage_metrics(spark_apps, tags)
 
         # Get the executor metrics

From f6bb4321c63d3b1eb9910f39734e05029a4a2e5d Mon Sep 17 00:00:00 2001
From: Kyle-Neale
Date: Mon, 7 Oct 2024 13:32:51 -0400
Subject: [PATCH 4/4] Add changelog

---
 spark/changelog.d/18791.added | 2 ++
 1 file changed, 2 insertions(+)
 create mode 100644 spark/changelog.d/18791.added

diff --git a/spark/changelog.d/18791.added b/spark/changelog.d/18791.added
new file mode 100644
index 0000000000000..40c6d0ae5fcc0
--- /dev/null
+++ b/spark/changelog.d/18791.added
@@ -0,0 +1,2 @@
+Add configuration option to disable `stage_id` tag on `spark.job` metrics
+Add configuration option to disable `spark.stage` metrics
\ No newline at end of file
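
Usage example (not part of the patch): with these commits applied, a minimal `spark.d/conf.yaml` instance enabling both new options could look like the sketch below. The URL, cluster name, and cluster mode values are placeholders borrowed from the test fixtures above, not recommended settings:

    instances:
      - spark_url: http://localhost:8080          # placeholder Spark master URL
        cluster_name: my_spark_cluster            # placeholder cluster name
        spark_cluster_mode: spark_standalone_mode
        disable_spark_job_stage_tags: true        # stop tagging spark.job metrics with `stage_id`
        disable_spark_stage_metrics: true         # skip collection of spark.stage metrics entirely

Both options default to `false`, so existing deployments keep their current behavior unless they explicitly opt in.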