Skip to content

Commit

Permalink
Fixing snapshot bug (opensearch-project#1257)
Browse files Browse the repository at this point in the history
Signed-off-by: Kshitij Tandon <[email protected]>
Co-authored-by: bowenlan-amzn <[email protected]>
  • Loading branch information
tandonks and bowenlan-amzn authored Sep 17, 2024
1 parent 55633ee commit 809f85c
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,16 @@ package org.opensearch.indexmanagement.indexstatemanagement.step.snapshot

import org.apache.logging.log4j.LogManager
import org.opensearch.ExceptionsHelper
import org.opensearch.action.admin.cluster.snapshots.status.SnapshotStatus
import org.opensearch.action.admin.cluster.snapshots.status.SnapshotsStatusRequest
import org.opensearch.action.admin.cluster.snapshots.status.SnapshotsStatusResponse
import org.opensearch.cluster.SnapshotsInProgress.State
import org.opensearch.action.admin.cluster.snapshots.get.GetSnapshotsRequest
import org.opensearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse
import org.opensearch.indexmanagement.indexstatemanagement.action.SnapshotAction
import org.opensearch.indexmanagement.opensearchapi.suspendUntil
import org.opensearch.indexmanagement.spi.indexstatemanagement.Step
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionProperties
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData
import org.opensearch.snapshots.SnapshotInfo
import org.opensearch.snapshots.SnapshotState
import org.opensearch.transport.RemoteTransportException

class WaitForSnapshotStep(private val action: SnapshotAction) : Step(name) {
Expand All @@ -33,32 +33,31 @@ class WaitForSnapshotStep(private val action: SnapshotAction) : Step(name) {

try {
val snapshotName = getSnapshotName(managedIndexMetadata, indexName) ?: return this
val request =
SnapshotsStatusRequest()
.snapshots(arrayOf(snapshotName))
.repository(repository)
val response: SnapshotsStatusResponse = context.client.admin().cluster().suspendUntil { snapshotsStatus(request, it) }
val status: SnapshotStatus? =
val newRequest = GetSnapshotsRequest()
.snapshots(arrayOf(snapshotName))
.repository(repository)
val response: GetSnapshotsResponse = context.client.admin().cluster().suspendUntil { getSnapshots(newRequest, it) }
val status: SnapshotInfo? =
response
.snapshots
.find { snapshotStatus ->
snapshotStatus.snapshot.snapshotId.name == snapshotName && snapshotStatus.snapshot.repository == repository
.find { snapshotInfo ->
snapshotInfo.snapshotId().name == snapshotName
}
if (status != null) {
when (status.state) {
State.INIT, State.STARTED -> {
when (status.state()) {
SnapshotState.IN_PROGRESS -> {
stepStatus = StepStatus.CONDITION_NOT_MET
info = mapOf("message" to getSnapshotInProgressMessage(indexName), "state" to status.state.name)
info = mapOf("message" to getSnapshotInProgressMessage(indexName), "state" to status.state().toString())
}
State.SUCCESS -> {
SnapshotState.SUCCESS -> {
stepStatus = StepStatus.COMPLETED
info = mapOf("message" to getSuccessMessage(indexName), "state" to status.state.name)
info = mapOf("message" to getSuccessMessage(indexName), "state" to status.state().toString())
}
else -> { // State.FAILED, State.ABORTED
val message = getFailedExistsMessage(indexName)
logger.warn(message)
stepStatus = StepStatus.FAILED
info = mapOf("message" to message, "state" to status.state.name)
info = mapOf("message" to message, "state" to status.state().toString())
}
}
} else {
Expand Down Expand Up @@ -98,13 +97,11 @@ class WaitForSnapshotStep(private val action: SnapshotAction) : Step(name) {
return actionProperties.snapshotName
}

override fun getUpdatedManagedIndexMetadata(currentMetadata: ManagedIndexMetaData): ManagedIndexMetaData {
return currentMetadata.copy(
stepMetaData = StepMetaData(name, getStepStartTime(currentMetadata).toEpochMilli(), stepStatus),
transitionTo = null,
info = info,
)
}
override fun getUpdatedManagedIndexMetadata(currentMetadata: ManagedIndexMetaData): ManagedIndexMetaData = currentMetadata.copy(
stepMetaData = StepMetaData(name, getStepStartTime(currentMetadata).toEpochMilli(), stepStatus),
transitionTo = null,
info = info,
)

override fun isIdempotent(): Boolean = true

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,26 +11,26 @@ import com.nhaarman.mockitokotlin2.doReturn
import com.nhaarman.mockitokotlin2.mock
import com.nhaarman.mockitokotlin2.whenever
import kotlinx.coroutines.runBlocking
import org.opensearch.action.admin.cluster.snapshots.status.SnapshotStatus
import org.opensearch.action.admin.cluster.snapshots.status.SnapshotsStatusResponse
import org.opensearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse
import org.opensearch.client.AdminClient
import org.opensearch.client.Client
import org.opensearch.client.ClusterAdminClient
import org.opensearch.cluster.SnapshotsInProgress
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.settings.Settings
import org.opensearch.core.action.ActionListener
import org.opensearch.indexmanagement.indexstatemanagement.action.SnapshotAction
import org.opensearch.indexmanagement.indexstatemanagement.step.snapshot.WaitForSnapshotStep
import org.opensearch.indexmanagement.snapshotmanagement.mockInProgressSnapshotInfo
import org.opensearch.indexmanagement.snapshotmanagement.mockSnapshotInfo
import org.opensearch.indexmanagement.spi.indexstatemanagement.Step
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionMetaData
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionProperties
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext
import org.opensearch.jobscheduler.spi.utils.LockService
import org.opensearch.script.ScriptService
import org.opensearch.snapshots.Snapshot
import org.opensearch.snapshots.SnapshotId
import org.opensearch.snapshots.SnapshotInfo
import org.opensearch.snapshots.SnapshotState
import org.opensearch.test.OpenSearchTestCase
import org.opensearch.transport.RemoteTransportException

Expand Down Expand Up @@ -70,13 +70,10 @@ class WaitForSnapshotStepTests : OpenSearchTestCase() {
}

fun `test snapshot status states`() {
val snapshotStatus: SnapshotStatus = mock()
val response: SnapshotsStatusResponse = mock()
whenever(response.snapshots).doReturn(listOf(snapshotStatus))
whenever(snapshotStatus.snapshot).doReturn(Snapshot("repo", SnapshotId("snapshot-name", "some_uuid")))
val snapshotInfo: SnapshotInfo = mockInProgressSnapshotInfo(snapshot)
val response: GetSnapshotsResponse = mock()
whenever(response.snapshots).doReturn(listOf(snapshotInfo))
val client = getClient(getAdminClient(getClusterAdminClient(response, null)))

whenever(snapshotStatus.state).doReturn(SnapshotsInProgress.State.INIT)
runBlocking {
val snapshotAction = SnapshotAction("repo", snapshot, 0)
val metadata = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, ActionMetaData(WaitForSnapshotStep.name, 1, 0, false, 0, null, ActionProperties(snapshotName = "snapshot-name")), null, null, null)
Expand All @@ -88,31 +85,34 @@ class WaitForSnapshotStepTests : OpenSearchTestCase() {
assertEquals("Did not get snapshot in progress message", WaitForSnapshotStep.getSnapshotInProgressMessage("test"), updatedManagedIndexMetaData.info!!["message"])
}

whenever(snapshotStatus.state).doReturn(SnapshotsInProgress.State.STARTED)
val snapshotInfo2: SnapshotInfo = mockSnapshotInfo(snapshot, SnapshotState.SUCCESS)
whenever(response.snapshots).doReturn(listOf(snapshotInfo2))
runBlocking {
val snapshotAction = SnapshotAction("repo", snapshot, 0)
val metadata = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, ActionMetaData(WaitForSnapshotStep.name, 1, 0, false, 0, null, ActionProperties(snapshotName = "snapshot-name")), null, null, null)
val step = WaitForSnapshotStep(snapshotAction)
val context = StepContext(metadata, clusterService, client, null, null, scriptService, settings, lockService)
step.preExecute(logger, context).execute()
val updatedManagedIndexMetaData = step.getUpdatedManagedIndexMetadata(metadata)
assertEquals("Step status is not CONDITION_NOT_MET", Step.StepStatus.CONDITION_NOT_MET, updatedManagedIndexMetaData.stepMetaData?.stepStatus)
assertEquals("Did not get snapshot in progress message", WaitForSnapshotStep.getSnapshotInProgressMessage("test"), updatedManagedIndexMetaData.info!!["message"])
assertEquals("Step status is not COMPLETED", Step.StepStatus.COMPLETED, updatedManagedIndexMetaData.stepMetaData?.stepStatus)
assertEquals("Did not get snapshot completed message", WaitForSnapshotStep.getSuccessMessage("test"), updatedManagedIndexMetaData.info!!["message"])
}

whenever(snapshotStatus.state).doReturn(SnapshotsInProgress.State.SUCCESS)
val snapshotInfo3: SnapshotInfo = mockSnapshotInfo(snapshot, SnapshotState.FAILED)
whenever(response.snapshots).doReturn(listOf(snapshotInfo3))
runBlocking {
val snapshotAction = SnapshotAction("repo", snapshot, 0)
val metadata = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, ActionMetaData(WaitForSnapshotStep.name, 1, 0, false, 0, null, ActionProperties(snapshotName = "snapshot-name")), null, null, null)
val step = WaitForSnapshotStep(snapshotAction)
val context = StepContext(metadata, clusterService, client, null, null, scriptService, settings, lockService)
step.preExecute(logger, context).execute()
val updatedManagedIndexMetaData = step.getUpdatedManagedIndexMetadata(metadata)
assertEquals("Step status is not COMPLETED", Step.StepStatus.COMPLETED, updatedManagedIndexMetaData.stepMetaData?.stepStatus)
assertEquals("Did not get snapshot completed message", WaitForSnapshotStep.getSuccessMessage("test"), updatedManagedIndexMetaData.info!!["message"])
assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updatedManagedIndexMetaData.stepMetaData?.stepStatus)
assertEquals("Did not get snapshot failed message", WaitForSnapshotStep.getFailedExistsMessage("test"), updatedManagedIndexMetaData.info!!["message"])
}

whenever(snapshotStatus.state).doReturn(SnapshotsInProgress.State.ABORTED)
val snapshotInfo4: SnapshotInfo = mockSnapshotInfo(snapshot, SnapshotState.PARTIAL)
whenever(response.snapshots).doReturn(listOf(snapshotInfo4))
runBlocking {
val snapshotAction = SnapshotAction("repo", snapshot, 0)
val metadata = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, ActionMetaData(WaitForSnapshotStep.name, 1, 0, false, 0, null, ActionProperties(snapshotName = "snapshot-name")), null, null, null)
Expand All @@ -124,7 +124,8 @@ class WaitForSnapshotStepTests : OpenSearchTestCase() {
assertEquals("Did not get snapshot failed message", WaitForSnapshotStep.getFailedExistsMessage("test"), updatedManagedIndexMetaData.info!!["message"])
}

whenever(snapshotStatus.state).doReturn(SnapshotsInProgress.State.FAILED)
val snapshotInfo5: SnapshotInfo = mockSnapshotInfo(snapshot, SnapshotState.INCOMPATIBLE)
whenever(response.snapshots).doReturn(listOf(snapshotInfo5))
runBlocking {
val snapshotAction = SnapshotAction("repo", snapshot, 0)
val metadata = ManagedIndexMetaData("test", "indexUuid", "policy_id", null, null, null, null, null, null, null, ActionMetaData(WaitForSnapshotStep.name, 1, 0, false, 0, null, ActionProperties(snapshotName = "snapshot-name")), null, null, null)
Expand All @@ -138,10 +139,9 @@ class WaitForSnapshotStepTests : OpenSearchTestCase() {
}

fun `test snapshot not in response list`() {
val snapshotStatus: SnapshotStatus = mock()
val response: SnapshotsStatusResponse = mock()
whenever(response.snapshots).doReturn(listOf(snapshotStatus))
whenever(snapshotStatus.snapshot).doReturn(Snapshot("repo", SnapshotId("snapshot-different-name", "some_uuid")))
val snapshotInfo: SnapshotInfo = mockSnapshotInfo("snapshot-different-name")
val response: GetSnapshotsResponse = mock()
whenever(response.snapshots).doReturn(listOf(snapshotInfo))
val client = getClient(getAdminClient(getClusterAdminClient(response, null)))

runBlocking {
Expand Down Expand Up @@ -190,17 +190,17 @@ class WaitForSnapshotStepTests : OpenSearchTestCase() {

private fun getAdminClient(clusterAdminClient: ClusterAdminClient): AdminClient = mock { on { cluster() } doReturn clusterAdminClient }

private fun getClusterAdminClient(snapshotsStatusResponse: SnapshotsStatusResponse?, exception: Exception?): ClusterAdminClient {
assertTrue("Must provide one and only one response or exception", (snapshotsStatusResponse != null).xor(exception != null))
private fun getClusterAdminClient(getSnapshotsResponse: GetSnapshotsResponse?, exception: Exception?): ClusterAdminClient {
assertTrue("Must provide one and only one response or exception", (getSnapshotsResponse != null).xor(exception != null))
return mock {
doAnswer { invocationOnMock ->
val listener = invocationOnMock.getArgument<ActionListener<SnapshotsStatusResponse>>(1)
if (snapshotsStatusResponse != null) {
listener.onResponse(snapshotsStatusResponse)
val listener = invocationOnMock.getArgument<ActionListener<GetSnapshotsResponse>>(1)
if (getSnapshotsResponse != null) {
listener.onResponse(getSnapshotsResponse)
} else {
listener.onFailure(exception)
}
}.whenever(this.mock).snapshotsStatus(any(), any())
}.whenever(this.mock).getSnapshots(any(), any())
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule
import org.opensearch.snapshots.Snapshot
import org.opensearch.snapshots.SnapshotId
import org.opensearch.snapshots.SnapshotInfo
import org.opensearch.snapshots.SnapshotState
import org.opensearch.test.OpenSearchTestCase.randomAlphaOfLength
import org.opensearch.test.OpenSearchTestCase.randomBoolean
import org.opensearch.test.OpenSearchTestCase.randomIntBetween
Expand Down Expand Up @@ -177,11 +178,14 @@ fun randomSMState(): SMState = SMState.values()[randomIntBetween(0, SMState.valu

fun randomNotificationConfig(): NotificationConfig = NotificationConfig(randomChannel(), randomConditions())

fun randomConditions(): NotificationConfig.Conditions = NotificationConfig.Conditions(randomBoolean(), randomBoolean(), randomBoolean(), randomBoolean())
fun randomConditions(): NotificationConfig.Conditions =
NotificationConfig.Conditions(randomBoolean(), randomBoolean(), randomBoolean(), randomBoolean())

fun ToXContent.toJsonString(params: ToXContent.Params = ToXContent.EMPTY_PARAMS): String = this.toXContent(XContentFactory.jsonBuilder(), params).string()
fun ToXContent.toJsonString(params: ToXContent.Params = ToXContent.EMPTY_PARAMS): String =
this.toXContent(XContentFactory.jsonBuilder(), params).string()

fun ToXContent.toMap(params: ToXContent.Params = ToXContent.EMPTY_PARAMS): Map<String, Any> = this.toXContent(XContentFactory.jsonBuilder(), params).toMap()
fun ToXContent.toMap(params: ToXContent.Params = ToXContent.EMPTY_PARAMS): Map<String, Any> =
this.toXContent(XContentFactory.jsonBuilder(), params).toMap()

fun mockIndexResponse(status: RestStatus = RestStatus.OK): IndexResponse {
val indexResponse: IndexResponse = mock()
Expand Down Expand Up @@ -263,6 +267,18 @@ fun mockInProgressSnapshotInfo(
return SnapshotInfo(entry)
}

fun mockSnapshotInfo(
name: String = randomAlphaOfLength(10),
snapshotState: SnapshotState,
): SnapshotInfo {
return SnapshotInfo(
SnapshotId(name, UUIDs.randomBase64UUID()),
emptyList(),
emptyList(),
snapshotState,
)
}

fun mockGetSnapshotResponse(num: Int): GetSnapshotsResponse {
val getSnapshotsRes: GetSnapshotsResponse = mock()
whenever(getSnapshotsRes.snapshots).doReturn(mockSnapshotInfoList(num))
Expand All @@ -281,4 +297,5 @@ fun mockSnapshotInfoList(num: Int, namePrefix: String = randomAlphaOfLength(10))
return result.toList()
}

fun String.parser(): XContentParser = XContentType.JSON.xContent().createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, this)
fun String.parser(): XContentParser =
XContentType.JSON.xContent().createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, this)

0 comments on commit 809f85c

Please sign in to comment.