Add option to disable stage metrics and stage_id tags #18791

Open · wants to merge 4 commits into master
14 changes: 14 additions & 0 deletions spark/assets/configuration/spec.yaml
@@ -121,6 +121,20 @@ files:
        type: boolean
        display_default: false
        example: true
    - name: disable_spark_job_stage_tags
      description: |
        Enable to stop submitting the tag `stage_id` for Spark jobs.
      value:
        type: boolean
        display_default: false
        example: true
    - name: disable_spark_stage_metrics
      description: |
        Enable to stop collecting Spark stage metrics.
      value:
        type: boolean
        display_default: false
        example: true
    - template: instances/http
      overrides:
        auth_token.description: |
2 changes: 2 additions & 0 deletions spark/changelog.d/18791.added
@@ -0,0 +1,2 @@
Add configuration option to disable `stage_id` tag on `spark.job` metrics
Add configuration option to disable `spark.stage` metrics
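For orientation, here is a minimal usage sketch (not part of this PR's diff) showing how the two new instance options would be passed to the check; the URL and cluster name are placeholders, and both flags default to `false` when omitted:

```python
from datadog_checks.spark import SparkCheck

# Hypothetical instance configuration; both new options default to False when omitted.
instance = {
    'spark_url': 'http://localhost:8080',           # placeholder Spark master URL
    'cluster_name': 'my_cluster',                   # placeholder cluster name
    'spark_cluster_mode': 'spark_standalone_mode',
    'disable_spark_job_stage_tags': True,   # stop submitting stage_id tags on spark.job metrics
    'disable_spark_stage_metrics': True,    # stop collecting spark.stage.* metrics
}

# Constructing the check is enough for the flags to be read in __init__;
# actually running it requires a reachable Spark master/UI.
check = SparkCheck('spark', {}, [instance])
```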
8 changes: 8 additions & 0 deletions spark/datadog_checks/spark/config_models/defaults.py
@@ -32,6 +32,14 @@ def instance_disable_legacy_cluster_tag():
    return False


def instance_disable_spark_job_stage_tags():
    return False


def instance_disable_spark_stage_metrics():
    return False


def instance_empty_default_hostname():
    return False

2 changes: 2 additions & 0 deletions spark/datadog_checks/spark/config_models/instance.py
@@ -64,6 +64,8 @@ class InstanceConfig(BaseModel):
    connect_timeout: Optional[float] = None
    disable_generic_tags: Optional[bool] = None
    disable_legacy_cluster_tag: Optional[bool] = None
    disable_spark_job_stage_tags: Optional[bool] = None
    disable_spark_stage_metrics: Optional[bool] = None
    empty_default_hostname: Optional[bool] = None
    enable_query_name_tag: Optional[bool] = None
    executor_level_metrics: Optional[bool] = None
10 changes: 10 additions & 0 deletions spark/datadog_checks/spark/data/conf.yaml.example
@@ -145,6 +145,16 @@ instances:
    #
    # enable_query_name_tag: true

    ## @param disable_spark_job_stage_tags - boolean - optional - default: false
    ## Enable to stop submitting the tag `stage_id` for Spark jobs.
    #
    # disable_spark_job_stage_tags: true

    ## @param disable_spark_stage_metrics - boolean - optional - default: false
    ## Enable to stop collecting Spark stage metrics.
    #
    # disable_spark_stage_metrics: true

    ## @param proxy - mapping - optional
    ## This overrides the `proxy` setting in `init_config`.
    ##
12 changes: 8 additions & 4 deletions spark/datadog_checks/spark/spark.py
@@ -73,6 +73,8 @@ def __init__(self, name, init_config, instances):
        self.metricsservlet_path = self.instance.get('metricsservlet_path', '/metrics/json')

        self._enable_query_name_tag = is_affirmative(self.instance.get('enable_query_name_tag', False))
        self._disable_spark_job_stage_tags = is_affirmative(self.instance.get('disable_spark_job_stage_tags', False))
        self._disable_spark_stage_metrics = is_affirmative(self.instance.get('disable_spark_stage_metrics', False))

        # Get the cluster name from the instance configuration
        self.cluster_name = self.instance.get('cluster_name')
@@ -97,8 +99,9 @@ def check(self, _):
         # Get the job metrics
         self._spark_job_metrics(spark_apps, tags)

-        # Get the stage metrics
-        self._spark_stage_metrics(spark_apps, tags)
+        if not self._disable_spark_stage_metrics:
+            # Get the stage metrics
+            self._spark_stage_metrics(spark_apps, tags)

         # Get the executor metrics
         self._spark_executor_metrics(spark_apps, tags)
@@ -412,8 +415,9 @@ def _spark_job_metrics(self, running_apps, addl_tags):
                 if job_id is not None:
                     tags.append('job_id:{}'.format(job_id))

-                for stage_id in job.get('stageIds', []):
-                    tags.append('stage_id:{}'.format(stage_id))
+                if not self._disable_spark_job_stage_tags:
+                    for stage_id in job.get('stageIds', []):
+                        tags.append('stage_id:{}'.format(stage_id))

                 self._set_metrics_from_json(tags, job, SPARK_JOB_METRICS)
                 self._set_metric('spark.job.count', COUNT, 1, tags)
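To make the tagging change concrete, here is a hedged sketch of the tag sets a `spark.job` metric would carry for a hypothetical running job (job id 0, stage ids 0 and 1); the cluster-level common tags are appended in both cases and omitted here:

```python
# Hypothetical job: job_id=0, stageIds=[0, 1].
# Default behaviour (disable_spark_job_stage_tags: false) - one stage_id tag per stage:
tags_with_stage_ids = ['status:running', 'job_id:0', 'stage_id:0', 'stage_id:1']
# With disable_spark_job_stage_tags: true - the per-stage tags are no longer submitted:
tags_without_stage_ids = ['status:running', 'job_id:0']
```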
67 changes: 67 additions & 0 deletions spark/tests/test_spark.py
@@ -361,6 +361,15 @@ def proxy_with_warning_page_mock(url, *args, **kwargs):
    'executor_level_metrics': True,
}

STANDALONE_CONFIG_STAGE_DISABLED = {
    'spark_url': 'http://localhost:8080',
    'cluster_name': CLUSTER_NAME,
    'spark_cluster_mode': 'spark_standalone_mode',
    'executor_level_metrics': True,
    'disable_spark_stage_metrics': True,
    'disable_spark_job_stage_tags': True,
}

STANDALONE_CONFIG_PRE_20 = {
    'spark_url': 'http://localhost:8080',
    'cluster_name': CLUSTER_NAME,
@@ -412,6 +421,11 @@ def proxy_with_warning_page_mock(url, *args, **kwargs):
    'stage_id:1',
] + COMMON_TAGS

SPARK_JOB_RUNNING_NO_STAGE_METRIC_TAGS = [
    'status:running',
    'job_id:0',
] + COMMON_TAGS

SPARK_JOB_SUCCEEDED_METRIC_VALUES = {
    'spark.job.count': 3,
    'spark.job.num_tasks': 1000,
@@ -432,6 +446,11 @@ def proxy_with_warning_page_mock(url, *args, **kwargs):
    'stage_id:1',
] + COMMON_TAGS

SPARK_JOB_SUCCEEDED_NO_STAGE_METRIC_TAGS = [
    'status:succeeded',
    'job_id:0',
] + COMMON_TAGS

SPARK_STAGE_RUNNING_METRIC_VALUES = {
    'spark.stage.count': 3,
    'spark.stage.num_active_tasks': 3 * 3,
@@ -919,6 +938,54 @@ def test_standalone_unit(aggregator, dd_run_check):
    aggregator.assert_metrics_using_metadata(get_metadata_metrics())


@pytest.mark.unit
def test_standalone_stage_disabled_unit(aggregator, dd_run_check):
    with mock.patch('requests.get', standalone_requests_get_mock):
        c = SparkCheck('spark', {}, [STANDALONE_CONFIG_STAGE_DISABLED])
        dd_run_check(c)

        _assert(
            aggregator,
            [
                # Check the running job metrics
                (SPARK_JOB_RUNNING_METRIC_VALUES, SPARK_JOB_RUNNING_NO_STAGE_METRIC_TAGS),
                # Check the running job metrics
                (SPARK_JOB_RUNNING_METRIC_VALUES, SPARK_JOB_RUNNING_NO_STAGE_METRIC_TAGS),
                # Check the succeeded job metrics
                (SPARK_JOB_SUCCEEDED_METRIC_VALUES, SPARK_JOB_SUCCEEDED_NO_STAGE_METRIC_TAGS),
                # Check the driver metrics
                (SPARK_DRIVER_METRIC_VALUES, COMMON_TAGS),
                # Check the optional driver metrics
                (SPARK_DRIVER_OPTIONAL_METRIC_VALUES, COMMON_TAGS),
                # Check the executor level metrics
                (SPARK_EXECUTOR_LEVEL_METRIC_VALUES, SPARK_EXECUTOR_LEVEL_METRIC_TAGS),
                # Check the optional executor level metrics
                (SPARK_EXECUTOR_LEVEL_OPTIONAL_PROCESS_TREE_METRIC_VALUES, SPARK_EXECUTOR_LEVEL_METRIC_TAGS),
                # Check the executor metrics
                (SPARK_EXECUTOR_METRIC_VALUES, COMMON_TAGS),
                # Check the optional summary executor metrics
                (SPARK_EXECUTOR_OPTIONAL_METRIC_VALUES, COMMON_TAGS),
                # Check the RDD metrics
                (SPARK_RDD_METRIC_VALUES, COMMON_TAGS),
                # Check the streaming statistics metrics
                (SPARK_STREAMING_STATISTICS_METRIC_VALUES, COMMON_TAGS),
                # Check the structured streaming metrics
                (SPARK_STRUCTURED_STREAMING_METRIC_VALUES, COMMON_TAGS),
            ],
        )
        # Check the service tests
        for sc in aggregator.service_checks(STANDALONE_SERVICE_CHECK):
            assert sc.status == SparkCheck.OK
            assert sc.tags == ['url:http://localhost:8080'] + CLUSTER_TAGS
        for sc in aggregator.service_checks(SPARK_SERVICE_CHECK):
            assert sc.status == SparkCheck.OK
            assert sc.tags == ['url:http://localhost:4040'] + CLUSTER_TAGS

        # Assert coverage for this check on this instance
        aggregator.assert_all_metrics_covered()
        aggregator.assert_metrics_using_metadata(get_metadata_metrics())


@pytest.mark.unit
def test_standalone_unit_with_proxy_warning_page(aggregator, dd_run_check):
    c = SparkCheck('spark', {}, [STANDALONE_CONFIG])