Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-7009] dogstatsd mode in statds reporter #4188

Open
wants to merge 3 commits into
base: master
Choose a base branch
from

Conversation

dbrinegar
Copy link

  • converts output to ascii alphanumeric characters with underbar,
    delimited by periods
  • reports all Flink variables as tags
  • compresses overly long segments with a first-ten plus hash symbol
  • compresses Flink ID values to first eight characters
  • removes object references from names, for correctness
  • drops negative and invalid values
  • handles LatencyGauge values

* converts output to ascii alphanumeric characters with underbar,
delimited by periods
* reports all Flink variables as tags
* compresses overly long segments with a first-ten plus hash symbol
* compresses Flink ID values to first eight characters
* removes object references from names, for correctness
* drops negative and invalid values
* handles LatencyGauge values
@dbrinegar
Copy link
Author

The check fail seems to be an elasticsearch timeout, otherwise is all green:

Tests run: 9, Failures: 0, Errors: 4, Skipped: 0, Time elapsed: 66.507 sec <<< FAILURE! - in org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkITCase
testDeprecatedIndexRequestBuilderVariant(org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkITCase)  Time elapsed: 60.187 sec  <<< ERROR!
Caused by: java.lang.RuntimeException: Elasticsearch client is not connected to any Elasticsearch nodes!
	at org.apache.flink.streaming.connectors.elasticsearch.Elasticsearch1ApiCallBridge.createClient(Elasticsearch1ApiCallBridge.java:105)

@@ -420,10 +420,30 @@ metrics.reporter.grph.protocol: TCP
In order to use this reporter you must copy `/opt/flink-metrics-statsd-{{site.version}}.jar` into the `/lib` folder
of your Flink distribution.

In `dogstatsd` mode, all variables in Flink metrics such as `<host>`, `<job_name>`, `<tm_id>`, `<subtask_index>`, `<task_name>`,
`<operator_name>` and others, will be included as tags. It is recommended to define scopes for this reporter such that no
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's use the same approach we use for JMX, Datadog and Prometheus: ignore the configured scope and use a logical scope (i.e "taskmanager.task.operator.myMetricName").

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good call! I'll make these and other suggested changes, hopefully this week

return;
}
String tags = tagTable.get(gauge);
if (value instanceof Map) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

gauge instanceof LatencyGauge is more reliable. (you can either make the class public or just compare the class name)

// remove first comma, prefix with "|#"
tagTable.put(metric, "|#" + statsdTagLine.substring(1));

String name = metric.getClass().getSimpleName();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this variable isn't used.

if (dogstatsdMode) {
// memoize dogstatsd tag section: "|#tag:val,tag:val,tag:val"
StringBuilder statsdTagLine = new StringBuilder();
Map<String, String> orderedTags = new TreeMap<>(group.getAllVariables());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we sort them here?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sorry! this was the beginning of looking at a more efficient tag table, as many entries are duplicates, but I'll take out since we're going to put the tag line in the metric object

String v = filterCharacters(entry.getValue());
statsdTagLine.append(",").append(k).append(":").append(v);
}
if (statsdTagLine.length() > 0) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this check can be omitted since the tab line can never be empty.

}
if (statsdTagLine.length() > 0) {
// remove first comma, prefix with "|#"
tagTable.put(metric, "|#" + statsdTagLine.substring(1));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's store the tags directly in the maps containing the metrics.

I've seen this requirement a few times now, so let's add a new AbstractReporter class that has a generic argument for the information that is stored in the map for each metric.

This would look like this:

public abstract class AbstractReporterV2<T> implements MetricReporter, CharacterFilter {
	protected final Logger log = LoggerFactory.getLogger(getClass());

	protected final Map<Gauge<?>, T> gauges = new HashMap<>();
	protected final Map<Counter, T> counters = new HashMap<>();
	protected final Map<Histogram, T> histograms = new HashMap<>();
	protected final Map<Meter, T> meters = new HashMap<>();

	protected abstract T getMetricInfo(Metric metric, String metricName, MetricGroup group);

	@Override
	public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {
		T metricInfo = getMetricInfo(metric, metricName, group);
		
		synchronized (this) {
			if (metric instanceof Counter) {
				counters.put((Counter) metric, metricInfo);
			} else if (metric instanceof Gauge) {
				gauges.put((Gauge<?>) metric, metricInfo);
			} else if (metric instanceof Histogram) {
				histograms.put((Histogram) metric, metricInfo);
			} else if (metric instanceof Meter) {
				meters.put((Meter) metric, metricInfo);
			} else {
				log.warn("Cannot add unknown metric type {}. This indicates that the reporter " +
					"does not support this metric type.", metric.getClass().getName());
			}
		}
	}

	@Override
	public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group) {
		synchronized (this) {
			if (metric instanceof Counter) {
				counters.remove(metric);
			} else if (metric instanceof Gauge) {
				gauges.remove(metric);
			} else if (metric instanceof Histogram) {
				histograms.remove(metric);
			} else if (metric instanceof Meter) {
				meters.remove(metric);
			} else {
				log.warn("Cannot remove unknown metric type {}. This indicates that the reporter " +
					"does not support this metric type.", metric.getClass().getName());
			}
		}
	}
}

You then wouldn't need to override notifyOfAdded/Removed metric and save 1 HashMap + 1 lookup per metric per report.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

}
int limit = Integer.MAX_VALUE;
// optionally shrink flink ids
if (shortIds && input.length() == 32 && flinkId.matcher(input).matches()) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why truncate the IDs? This could could create collisions and lead to invalid metrics. It also makes it impossible to take the ID out of a metric and query the API for more information about that job. This makes it very hard to work with.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would like to point that this is an opt-in feature. Also, given that the IDs can be controlled by the user (`SingleOutputStreamOperator#setUid), you can guarantee uniqueness even with 8 characters.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I see that now. I missed the shortIds flag. This is good.

*/

private boolean isValidStatsdChar(char c) {
return (c >= 'A' && c <= 'Z') || (c >= 'a' && c <= 'z') || (c >= '0' && c <= '9') || (c == '_');

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From what I have seen '-' seems to be ok at least for some implementations. There may be other safe characters too. In fact the ReadyTalk implementation replaces spaces with '-'.
https://github.com/ReadyTalk/metrics-statsd/blob/master/metrics-statsd-common/src/main/java/com/readytalk/metrics/StatsD.java#L129

I'm sure many other characters are ok too. The main ones that need to be escaped are ones that have special meaning in the statsd format.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah that's a fair point. The thought here is to take an extremely limited set to maximize compatibility with collectors and timeseries databases, to avoid translation. I think if one were to make a compatibliity table you might find a few extra punctuation type characters, but I couldn't see how they changed the significance or meaning of the metrics, so landed on picking one commonly accepted non-period delimiter. The underbar is also guidance from datadog, for best practice with their systems. The metrics from Flink are super clean this way. Flink metric names seem to always be alpha and dots, the tag names are all alpha and underscore (eg task_id), so looks quite natural for the tag values to be alphanumeric + underscore.

But yeah, is an arbitrary choice.

@sv3ndk
Copy link
Contributor

sv3ndk commented Jul 11, 2017

Hi all,

@dbrinegar , are you still working on this? I started using Datadog for flink a couple of days ago, then directly wanted to suggest a PR to remove the object ids from metrics names... only to find you had started working on it already :)

I would really like this to be in place, at the moment I edit my datadog dashboard at every flink or job restart, that will not be an option in production.

Anything I can contribute to this PR to help it go forward?

@zentol
Copy link
Contributor

zentol commented Jul 11, 2017

@svendx4f Removing object ID's from operator/task names should be solved in general and not just for StatsD. The corresponding issue is FLINK-6464. Feel free to pick it up.

@sv3ndk
Copy link
Contributor

sv3ndk commented Jul 11, 2017

Thanks for the pointer, I'll look into it

@dbrinegar
Copy link
Author

Hi! Yes a bit stalled here on other tasks, but hoping to get back to this soon.

David Brinegar added 2 commits July 12, 2017 17:03
* use common storage for dogstatsd tags intead of a look aside table
* refactored AbstractReporter to have extensible common storage
* use logical scope, ignoring scope settings if all scope are tags
* match LatencyGauge class name for more reliable type match
@dbrinegar
Copy link
Author

ping! let me know if any other changes are needed, thanks

@zentol
Copy link
Contributor

zentol commented Aug 9, 2017

Sorry that it's taking so long for me to response, I'm completely swamped with issues at the moment.

I will take another deeper look when i finally find time to try this out, but from what I've seen so far it looks good.

There are 3 changes that I would like to promote to general features of the metric system: the short_ids option, the size limit and the replacement of object references.
The names to tasks/operators can be modified in TaskManagerMetricGroup#addTaskForJob() and TaskMetricGroup#addOperator(). The IDs can be modified in TaskManagerMetricGroup#addTaskForJob() , JobManagerMetricGroup#addJob(), and the constructor of TaskManagerMetricGroup` for which we ideally add a static factory method.

Hence, I'm unsure on whether to merge this PR as is, or first implement the above and then rebase the PR.

@dbrinegar
Copy link
Author

Totally understand! Busy here too. Let me know what you decide to do and I'll try to chip in.

@DerekV
Copy link

DerekV commented Sep 30, 2017

+1 Tags for dogstatsd would be great

@sap1ens
Copy link

sap1ens commented Jun 10, 2021

Sorry for pinging such an old PR, but I was wondering if someone is still planning to work on this... Otherwise, I can attempt to recreate this in another PR.

@flinkbot
Copy link
Collaborator

flinkbot commented Jun 10, 2021

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

8 participants