From e8221f9cd17822220746259a8882ea66c0f4524b Mon Sep 17 00:00:00 2001 From: Paul Lin Date: Fri, 1 Dec 2023 18:45:17 +0800 Subject: [PATCH] [KYUUBI #5799] [FLINK] Fix fetch timeout in session conf doesn't support ISO-8601 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit # :mag: Description ## Issue References ๐Ÿ”— Currently, Flink engine supports overwriting result fetch timeout in session conf, but in that way IOS-8601 time format is not supported. This PR fixes the problem. This pull request fixes # ## Describe Your Solution ๐Ÿ”ง Apply ConfigOption's time value conf parsing in session conf parsing. ## Types of changes :bookmark: - [x] Bugfix (non-breaking change which fixes an issue) - [ ] New feature (non-breaking change which adds functionality) - [ ] Breaking change (fix or feature that would cause existing functionality to change) ## Test Plan ๐Ÿงช #### Behavior Without This Pull Request :coffin: #### Behavior With This Pull Request :tada: #### Related Unit Tests --- # Checklists ## ๐Ÿ“ Author Self Checklist - [x] My code follows the [style guidelines](https://kyuubi.readthedocs.io/en/master/contributing/code/style.html) of this project - [x] I have performed a self-review - [ ] I have commented my code, particularly in hard-to-understand areas - [ ] I have made corresponding changes to the documentation - [ ] My changes generate no new warnings - [x] I have added tests that prove my fix is effective or that my feature works - [x] New and existing unit tests pass locally with my changes - [x] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html) ## ๐Ÿ“ Committer Pre-Merge Checklist - [x] Pull request title is okay. - [x] No license issues. - [x] Milestone correctly set? - [x] Test coverage is ok - [x] Assignees are selected. - [x] Minimum number of approvals - [x] No changes are requested **Be nice. Be informative.** Closes #5799 from link3280/timeconf_parsing. Closes #5799 417898a63 [Paul Lin] [FLINK] Use ISO-8601 time conf in unit test 99a496419 [Paul Lin] [FLINK] Fix fetch timeout in session conf doesn't support ISO-8601 Authored-by: Paul Lin Signed-off-by: Paul Lin (cherry picked from commit 8f529aacf1d8cd1d6e68f5692ad3532b71b2565b) Signed-off-by: Paul Lin --- .../engine/flink/operation/FlinkSQLOperationManager.scala | 5 ++++- .../kyuubi/engine/flink/operation/FlinkOperationSuite.scala | 2 +- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala index 3bb947e0738..324efb6585c 100644 --- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala +++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala @@ -73,7 +73,10 @@ class FlinkSQLOperationManager extends OperationManager("FlinkSQLOperationManage resultMaxRowsDefault.toString).toInt val resultFetchTimeout = - flinkSession.normalizedConf.get(ENGINE_FLINK_FETCH_TIMEOUT.key).map(_.toLong milliseconds) + flinkSession.normalizedConf + .get(ENGINE_FLINK_FETCH_TIMEOUT.key) + .map(ENGINE_FLINK_FETCH_TIMEOUT.valueConverter) + .map(_.get milliseconds) .getOrElse(resultFetchTimeoutDefault) val op = mode match { diff --git a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala index 9469cf286c5..5c05e2f2391 100644 --- a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala +++ b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala @@ -1255,7 +1255,7 @@ abstract class FlinkOperationSuite extends HiveJDBCTestHelper with WithFlinkTest test("test result fetch timeout") { val exception = intercept[KyuubiSQLException]( - withSessionConf()(Map(ENGINE_FLINK_FETCH_TIMEOUT.key -> "60000"))() { + withSessionConf()(Map(ENGINE_FLINK_FETCH_TIMEOUT.key -> "PT60S"))() { withJdbcStatement("tbl_a") { stmt => stmt.executeQuery("create table tbl_a (a int) " + "with ('connector' = 'datagen', 'rows-per-second'='0')")