Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

stop historical detector; support return AD task in get detector API #359

Merged
merged 6 commits into from
Jan 20, 2021

Conversation

ylwu-amzn
Copy link
Contributor

@ylwu-amzn ylwu-amzn commented Jan 12, 2021

Issue #, if available:

Description of changes:

In last PR #355 we added starting historical detector change. As historical detector may run for a long time, we should allow user to kill running historical detector before its task done to avoid consuming unnecessary resource if user think no need to run the historical detector any more for example user find they configured wrong feature.

This PR's main change:

  1. Support stopping historical detector. Add stop detector method in ADTaskManager, and add cancel task transport action. Will reuse the same stop detector API to stop realtime&historical detector
  2. Reuse current profile API to support AD task profile. Will return AD task and runtime info like training data size, model trained or not.
  3. As frontend(AD Kibana) needs to show the latest task state/progress, will return AD task in get detector API if user set return task URL param as true. Add task URL param in get detector API, if user set it as true, will return latest AD task together with detector.
  4. The start detector may have race condition if user send out multiple start historical detector requests for same detector at the same time. This PR fixed the race condition. Use hash ring(has on detector id) to find coordinating node of the historical detector first, then forward the request to the same coordinating node. Will cache all running historical detectors on coordinating node. If detector is already in cache, will reject the start historical detector request; otherwise, use the same logic in last PR start historical detector #355 to gather nodes' states and dispatch task to worker node and run historical detection task.

Test

  1. ./gradlew build
  2. ./gradlew integTest -PnumNodes=3

By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.

@codecov
Copy link

codecov bot commented Jan 12, 2021

Codecov Report

Merging #359 (77c1bfb) into master (19b8f9d) will increase coverage by 0.28%.
The diff coverage is 74.50%.

Impacted file tree graph

@@             Coverage Diff              @@
##             master     #359      +/-   ##
============================================
+ Coverage     78.65%   78.94%   +0.28%     
- Complexity     2465     2635     +170     
============================================
  Files           224      243      +19     
  Lines         10995    11658     +663     
  Branches        943     1002      +59     
============================================
+ Hits           8648     9203     +555     
- Misses         1907     1981      +74     
- Partials        440      474      +34     
Flag Coverage Δ Complexity Δ
plugin 78.91% <74.50%> (+0.30%) 0.00 <169.00> (ø)

Flags with carried forward coverage won't be shown. Click here to find out more.

Impacted Files Coverage Δ Complexity Δ
...n/opendistroforelasticsearch/ad/MemoryTracker.java 77.33% <ø> (ø) 21.00 <0.00> (ø)
...distroforelasticsearch/ad/constant/CommonName.java 66.66% <ø> (ø) 1.00 <0.00> (ø)
...icsearch/ad/rest/RestGetAnomalyDetectorAction.java 38.09% <0.00%> (-1.91%) 4.00 <0.00> (ø)
...stroforelasticsearch/ad/util/RestHandlerUtils.java 93.93% <ø> (ø) 13.00 <0.00> (ø)
...distroforelasticsearch/ad/model/ADTaskProfile.java 40.00% <40.00%> (ø) 13.00 <13.00> (?)
...earch/ad/transport/GetAnomalyDetectorResponse.java 75.00% <40.00%> (-0.37%) 10.00 <1.00> (+3.00) ⬇️
...stroforelasticsearch/ad/model/DetectorProfile.java 37.96% <44.18%> (+5.87%) 19.00 <2.00> (+3.00)
...forelasticsearch/ad/model/DetectorProfileName.java 57.14% <66.66%> (-0.55%) 4.00 <0.00> (ø)
...d/transport/GetAnomalyDetectorTransportAction.java 57.55% <68.42%> (+2.51%) 17.00 <1.00> (+3.00)
...ndistroforelasticsearch/ad/task/ADTaskManager.java 75.00% <70.63%> (-7.59%) 79.00 <51.00> (+37.00) ⬇️
... and 69 more

@ylwu-amzn ylwu-amzn force-pushed the pr_stop_task branch 5 times, most recently from eb8f2c0 to 4128228 Compare January 12, 2021 08:08
@@ -56,6 +63,7 @@ public ADTaskCacheManager(Settings settings, ClusterService clusterService, Memo
clusterService.getClusterSettings().addSettingsUpdateConsumer(MAX_BATCH_TASK_PER_NODE, it -> maxAdBatchTaskPerNode = it);
taskCaches = new ConcurrentHashMap<>();
this.memoryTracker = memoryTracker;
this.detectors = Sets.newConcurrentHashSet();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we need this field?

Copy link
Contributor Author

@ylwu-amzn ylwu-amzn Jan 13, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We use this field to record all detectors which running on the coordinating node to resolve race condition. If user starts multiple tasks for the same detector, we will put the first task in cache and record the detector id in this field. For other tasks, we will check if the detector id exists in this set, then we find the detector id exists that means there is already one task running for this detector, so we will reject other tasks.

Will add more comments here

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is the checking logic in another PR? don't see the related codes.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can check this method ADTaskCacheManager#put(String detectorId)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it. It will throw DuplicateTaskException for duplicated tasks. Thanks.
Usually any new state will add some complexity for concurrency control/failure mode into the codes. I don't see any issue for this change now. pls check if there is some edge case from e2e p.o.v. such as: for some case, the detector id can't be removed which causes the customer can't start new task.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good suggestion, will test more edge cases together with frontend changes.

taskCaches.remove(taskId);
List<ADBatchTaskCache> taskCaches = getBatchTaskCacheByDetectorId(detectorId);
if (taskCaches.isEmpty()) {
detectors.remove(detectorId);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see the concurrency control is added in "put" method. why not for "remove"?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By adding concurrency control in "put" method, we make sure only one detector id exists in detectors set, so no need to add concurrency control when we remove.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am curious: is there only 1 detectors object in memory across all the nodes? Or, is detectors object in memory of master node?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If user start historical detector, the request will be forwarded to coordinating node(hash on detector id) first. We will put detectors on coordinating node. When we start detector, will check if the detector exists on coordinating node or not. If exists, will reject the request to avoid running multiple tasks for one detector.

@@ -0,0 +1,24 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

2021? Have you figured out how to configure spotless for the year in copyright header?

Copy link
Contributor Author

@ylwu-amzn ylwu-amzn Jan 15, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Check this comment #355 (comment) , paste the comment here

./gradlew spotlessApply will reset copyright year as 2020 as configured in file spotless.license.java.

To make this PR clean, will send out a separate PR to update this license file and apply to all files. Will replace 2020 with $YEAR, then spotless can fill as current year automatically.

}
}, e -> {
if (e instanceof IndexNotFoundException) {
function.accept(Optional.empty());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why we need this line? looks like to me function is not being executed if Optional.empty() is given

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Different functions may handle empty task in different way, for example get detector with task will return empty task with detector(https://github.com/opendistro-for-elasticsearch/anomaly-detection/pull/359/files#diff-a30fb43728874ee34d76805a90385e2774d58bc7fc05bccc9d1fbd6e2a64a67fR210)

taskCaches.remove(taskId);
List<ADBatchTaskCache> taskCaches = getBatchTaskCacheByDetectorId(detectorId);
if (taskCaches.isEmpty()) {
detectors.remove(detectorId);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am curious: is there only 1 detectors object in memory across all the nodes? Or, is detectors object in memory of master node?

}
}

protected void cleanDetectorCache(ADTask adTask, TransportService transportService, AnomalyDetectorFunction function) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you help me understand the function? The title is clean detector cache. Could you point where you cleaned detector cache? Seems what you did are: 1) forward to coordinating node the stop task action; 2) no matter you succeed or fail, you invoke the input function.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method will be called at two places:

  1. Check ADBatchTaskRunner#internalBatchTaskListener, if the task finished normally or failed, we will call this function to clean detector cache on coordinating node.
  2. Check ADTaskManager#resetTaskStateAsStopped, if the task state is out of sync such as worker node crashed, then we need to clean detector cache on coordinating node and reset the task state.

*/
public ADTaskCancellationState cancelLocalTaskByDetectorId(String detectorId, String reason, String userName) {
ADTaskCancellationState cancellationState = adTaskCacheManager.cancelByDetectorId(detectorId, reason, userName);
logger.debug("Cancelled AD task for detector: " + detectorId + ", state: " + cancellationState);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

better to add reason and userName to log, looks like such info is not logged anywhere else.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently the reason is static message as we don't allow user to fill why they need to stop historical detector on Kibana. But it may help in future when we enable filling cancellation reason. As this is debug level log, sounds not bad to add more info, will add userName and reason in log.

} else if (detector.getDetectorId() == null) {
validationException = addValidationError(CommonErrorMessages.AD_ID_MISSING_MSG, validationException);
}
if (adTaskAction == null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to add such check for User ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

User could be null here. If user is null, we will set tasks's startedBy field as null, check TaskManager#createNewADTask ,

String userName = user == null ? null : user.getName();

@ylwu-amzn ylwu-amzn merged commit 74230c5 into opendistro-for-elasticsearch:master Jan 20, 2021
@ohltyler ohltyler added enhancement New feature or request feature new feature and removed enhancement New feature or request labels Feb 2, 2021
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
feature new feature
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants