Skip to content

Commit

Permalink
Merge pull request #1 from riysaxen-amzn/fix-fix-cancellationTimeoutE…
Browse files Browse the repository at this point in the history
…rror

Fix fix cancellation timeout error
  • Loading branch information
riysaxen-amzn authored Apr 4, 2024
2 parents fbd6c4a + e778cf5 commit 9c39499
Show file tree
Hide file tree
Showing 5 changed files with 3 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ import org.opensearch.alerting.util.getCancelAfterTimeInterval
import org.opensearch.alerting.util.getCombinedTriggerRunResult
import org.opensearch.alerting.util.printsSampleDocData
import org.opensearch.alerting.workflow.WorkflowRunContext
import org.opensearch.common.unit.TimeValue
import org.opensearch.client.Client
import org.opensearch.common.unit.TimeValue
import org.opensearch.common.xcontent.LoggingDeprecationHandler
import org.opensearch.common.xcontent.XContentType
import org.opensearch.commons.alerting.model.Alert
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -699,36 +699,6 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
return indexCreationDate > lastExecutionTime.toEpochMilli()
}

/**
* Get the current max seq number of the shard. We find it by searching the last document
* in the primary shard.
*/
private suspend fun getMaxSeqNo(client: Client, index: String, shard: String): Long {
val request: SearchRequest = SearchRequest()
.indices(index)
.preference("_shards:$shard")
.source(
SearchSourceBuilder()
.version(true)
.sort("_seq_no", SortOrder.DESC)
.seqNoAndPrimaryTerm(true)
.query(QueryBuilders.matchAllQuery())
.size(1)
)
request.cancelAfterTimeInterval = TimeValue.timeValueMinutes(getCancelAfterTimeInterval())

val response: SearchResponse = client.suspendUntil { client.search(request, it) }

if (response.status() !== RestStatus.OK) {
throw IOException("Failed to get max seq no for shard: $shard")
}
if (response.hits.hits.isEmpty())
return -1L

return response.hits.hits[0].seqNo
}


private fun getShardsCount(clusterService: ClusterService, index: String): Int {
val allShards: List<ShardRouting> = clusterService!!.state().routingTable().allShards(index)
return allShards.filter { it.primary() }.size
Expand Down Expand Up @@ -912,7 +882,6 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
.size(monitorCtx.docLevelMonitorShardFetchSize)
)
.preference(Preference.PRIMARY_FIRST.type())

request.cancelAfterTimeInterval = TimeValue.timeValueMinutes(
getCancelAfterTimeInterval()
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ data class MonitorRunnerExecutionContext(

@Volatile var maxActionableAlertCount: Long = AlertingSettings.DEFAULT_MAX_ACTIONABLE_ALERT_COUNT,
@Volatile var indexTimeout: TimeValue? = null,
@Volatile var cancelAfterTimeInterval: TimeValue? = null
@Volatile var cancelAfterTimeInterval: TimeValue? = null,
@Volatile var findingsIndexBatchSize: Int = AlertingSettings.DEFAULT_FINDINGS_INDEXING_BATCH_SIZE,
@Volatile var fetchOnlyQueryFieldNames: Boolean = true,
@Volatile var percQueryMaxNumDocsInMemory: Int = AlertingSettings.DEFAULT_PERCOLATE_QUERY_NUM_DOCS_IN_MEMORY,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,6 @@ class TransportGetFindingsSearchAction @Inject constructor(
)
}
searchSourceBuilder.query(queryBuilder).trackTotalHits(true)

client.threadPool().threadContext.stashContext().use {
scope.launch {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ import org.opensearch.commons.alerting.model.action.Action
import org.opensearch.commons.alerting.model.action.ActionExecutionPolicy
import org.opensearch.commons.alerting.model.action.ActionExecutionScope
import org.opensearch.commons.alerting.util.isBucketLevelMonitor
import kotlin.math.max
import org.opensearch.script.Script
import kotlin.math.max

private val logger = LogManager.getLogger("AlertingUtils")

Expand Down

0 comments on commit 9c39499

Please sign in to comment.