diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/BucketLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/BucketLevelMonitorRunner.kt index 3e35b2997..47fef3edb 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/BucketLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/BucketLevelMonitorRunner.kt @@ -30,8 +30,8 @@ import org.opensearch.alerting.util.getCancelAfterTimeInterval import org.opensearch.alerting.util.getCombinedTriggerRunResult import org.opensearch.alerting.util.printsSampleDocData import org.opensearch.alerting.workflow.WorkflowRunContext -import org.opensearch.common.unit.TimeValue import org.opensearch.client.Client +import org.opensearch.common.unit.TimeValue import org.opensearch.common.xcontent.LoggingDeprecationHandler import org.opensearch.common.xcontent.XContentType import org.opensearch.commons.alerting.model.Alert diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt index 3654a136a..51a32b642 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt @@ -699,36 +699,6 @@ class DocumentLevelMonitorRunner : MonitorRunner() { return indexCreationDate > lastExecutionTime.toEpochMilli() } - /** - * Get the current max seq number of the shard. We find it by searching the last document - * in the primary shard. - */ - private suspend fun getMaxSeqNo(client: Client, index: String, shard: String): Long { - val request: SearchRequest = SearchRequest() - .indices(index) - .preference("_shards:$shard") - .source( - SearchSourceBuilder() - .version(true) - .sort("_seq_no", SortOrder.DESC) - .seqNoAndPrimaryTerm(true) - .query(QueryBuilders.matchAllQuery()) - .size(1) - ) - request.cancelAfterTimeInterval = TimeValue.timeValueMinutes(getCancelAfterTimeInterval()) - - val response: SearchResponse = client.suspendUntil { client.search(request, it) } - - if (response.status() !== RestStatus.OK) { - throw IOException("Failed to get max seq no for shard: $shard") - } - if (response.hits.hits.isEmpty()) - return -1L - - return response.hits.hits[0].seqNo - } - - private fun getShardsCount(clusterService: ClusterService, index: String): Int { val allShards: List = clusterService!!.state().routingTable().allShards(index) return allShards.filter { it.primary() }.size @@ -912,7 +882,6 @@ class DocumentLevelMonitorRunner : MonitorRunner() { .size(monitorCtx.docLevelMonitorShardFetchSize) ) .preference(Preference.PRIMARY_FIRST.type()) - request.cancelAfterTimeInterval = TimeValue.timeValueMinutes( getCancelAfterTimeInterval() ) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerExecutionContext.kt b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerExecutionContext.kt index db7a1d6bc..eefad1b6b 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerExecutionContext.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerExecutionContext.kt @@ -53,7 +53,7 @@ data class MonitorRunnerExecutionContext( @Volatile var maxActionableAlertCount: Long = AlertingSettings.DEFAULT_MAX_ACTIONABLE_ALERT_COUNT, @Volatile var indexTimeout: TimeValue? = null, - @Volatile var cancelAfterTimeInterval: TimeValue? = null + @Volatile var cancelAfterTimeInterval: TimeValue? = null, @Volatile var findingsIndexBatchSize: Int = AlertingSettings.DEFAULT_FINDINGS_INDEXING_BATCH_SIZE, @Volatile var fetchOnlyQueryFieldNames: Boolean = true, @Volatile var percQueryMaxNumDocsInMemory: Int = AlertingSettings.DEFAULT_PERCOLATE_QUERY_NUM_DOCS_IN_MEMORY, diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetFindingsAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetFindingsAction.kt index d837dc57b..6835a9625 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetFindingsAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetFindingsAction.kt @@ -132,7 +132,6 @@ class TransportGetFindingsSearchAction @Inject constructor( ) } searchSourceBuilder.query(queryBuilder).trackTotalHits(true) - client.threadPool().threadContext.stashContext().use { scope.launch { try { diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/util/AlertingUtils.kt b/alerting/src/main/kotlin/org/opensearch/alerting/util/AlertingUtils.kt index 056d7d4ea..355945939 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/util/AlertingUtils.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/util/AlertingUtils.kt @@ -26,8 +26,8 @@ import org.opensearch.commons.alerting.model.action.Action import org.opensearch.commons.alerting.model.action.ActionExecutionPolicy import org.opensearch.commons.alerting.model.action.ActionExecutionScope import org.opensearch.commons.alerting.util.isBucketLevelMonitor -import kotlin.math.max import org.opensearch.script.Script +import kotlin.math.max private val logger = LogManager.getLogger("AlertingUtils")