Skip to content

Commit

Permalink
[KYUUBI #5138][TEST] Fix flaky test BatchesResourceSuite - get batch …
Browse files Browse the repository at this point in the history
…session list

### _Why are the changes needed?_

```
  override def afterEach(): Unit = {
    val sessionManager = fe.be.sessionManager.asInstanceOf[KyuubiSessionManager]
    sessionManager.allSessions().foreach { session =>
      sessionManager.closeSession(session.handle)
    }

    // there is a chance that `KyuubiBatchService` do batch job submission during this phase
    // which causes the above cleanup can not catch all sessions.

    sessionManager.getBatchesFromMetadataStore(MetadataFilter(), 0, Int.MaxValue).foreach { batch =>
      sessionManager.applicationManager.killApplication(ApplicationManagerInfo(None), batch.getId)
      sessionManager.cleanupMetadata(batch.getId)
    }
  }
```

### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [x] [Run test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests) locally before make a pull request

### _Was this patch authored or co-authored using generative AI tooling?_

No.

Closes #5246 from pan3793/5138.

Closes #5138

a8d6837 [Cheng Pan] try catch
0c418b4 [Cheng Pan] [KYUUBI #5138][TEST] Fix flaky test BatchesResourceSuite - get batch session list

Authored-by: Cheng Pan <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
  • Loading branch information
pan3793 committed Sep 4, 2023
1 parent 708a0be commit 466d35d
Showing 1 changed file with 13 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import org.apache.hive.service.rpc.thrift.TProtocolVersion
import org.glassfish.jersey.media.multipart.FormDataMultiPart
import org.glassfish.jersey.media.multipart.file.FileDataBodyPart

import org.apache.kyuubi.{BatchTestHelper, KyuubiFunSuite, RestFrontendTestHelper}
import org.apache.kyuubi.{BatchTestHelper, KyuubiFunSuite, RestFrontendTestHelper, Utils}
import org.apache.kyuubi.client.api.v1.dto._
import org.apache.kyuubi.client.util.BatchUtils
import org.apache.kyuubi.client.util.BatchUtils._
Expand All @@ -42,7 +42,7 @@ import org.apache.kyuubi.engine.spark.SparkBatchProcessBuilder
import org.apache.kyuubi.metrics.{MetricsConstants, MetricsSystem}
import org.apache.kyuubi.operation.{BatchJobSubmission, OperationState}
import org.apache.kyuubi.operation.OperationState.OperationState
import org.apache.kyuubi.server.KyuubiRestFrontendService
import org.apache.kyuubi.server.{KyuubiBatchService, KyuubiRestFrontendService}
import org.apache.kyuubi.server.http.authentication.AuthenticationHandler.AUTHORIZATION_HEADER
import org.apache.kyuubi.server.metadata.api.{Metadata, MetadataFilter}
import org.apache.kyuubi.service.authentication.{InternalSecurityAccessor, KyuubiAuthenticationFactory}
Expand All @@ -60,6 +60,17 @@ class BatchesV2ResourceSuite extends BatchesResourceSuiteBase {
override def customConf: Map[String, String] = Map(
KyuubiConf.METADATA_REQUEST_ASYNC_RETRY_ENABLED.key -> "false",
KyuubiConf.BATCH_SUBMITTER_ENABLED.key -> "true")

override def afterEach(): Unit = {
val sessionManager = fe.be.sessionManager.asInstanceOf[KyuubiSessionManager]
val batchService = server.getServices.collectFirst { case b: KyuubiBatchService => b }.get
sessionManager.getBatchesFromMetadataStore(MetadataFilter(), 0, Int.MaxValue)
.foreach { batch => batchService.cancelUnscheduledBatch(batch.getId) }
super.afterEach()
sessionManager.allSessions().foreach { session =>
Utils.tryLogNonFatalError { sessionManager.closeSession(session.handle) }
}
}
}

abstract class BatchesResourceSuiteBase extends KyuubiFunSuite
Expand Down

0 comments on commit 466d35d

Please sign in to comment.