diff --git a/src/main/java/com/ververica/flink/table/jdbc/FlinkResultSet.java b/src/main/java/com/ververica/flink/table/jdbc/FlinkResultSet.java index 8ef4eb1..d3bc4de 100644 --- a/src/main/java/com/ververica/flink/table/jdbc/FlinkResultSet.java +++ b/src/main/java/com/ververica/flink/table/jdbc/FlinkResultSet.java @@ -66,9 +66,11 @@ public class FlinkResultSet implements ResultSet { // If an empty array is fetched as result, at least sleep this long millis before next attempt - private static final int DEFAULT_INIT_SLEEP_MILLIS = 10; + private static final long DEFAULT_INIT_SLEEP_MILLIS = 200L; // If an empty array is fetched as result, at most sleep this long millis before next attempt - private static final int DEFAULT_MAX_SLEEP_MILLIS = 1000; + private static final long DEFAULT_MAX_SLEEP_MILLIS = 60000L; + // If an empty array is fetched as result, at most sleep [query elapsed time] * this fraction before next attempt + private static final double DEFAULT_MAX_SLEEP_FRACTION = 0.1; private final SessionClient session; private final Either jobIdOrResultSet; @@ -82,6 +84,8 @@ public class FlinkResultSet implements ResultSet { private boolean wasNull; private boolean closed; + private long resultSetCreateMillis; + public FlinkResultSet( SessionClient session, Either jobIdOrResultSet, @@ -100,6 +104,7 @@ public FlinkResultSet( this.wasNull = false; this.closed = false; + this.resultSetCreateMillis = System.currentTimeMillis(); } @Override @@ -1435,7 +1440,7 @@ private boolean fetchNextResponse(boolean needData) throws SQLException { } // do the actual remote fetching work - int sleepMillis = DEFAULT_INIT_SLEEP_MILLIS; + long sleepMillis = DEFAULT_INIT_SLEEP_MILLIS; while (true) { currentToken++; ResultFetchResponseBody response; @@ -1457,7 +1462,12 @@ private boolean fetchNextResponse(boolean needData) throws SQLException { // empty array as result but we need data, sleep before next attempt try { Thread.sleep(sleepMillis); - sleepMillis = Math.min(sleepMillis * 2, DEFAULT_MAX_SLEEP_MILLIS); + long elapsedMillis = System.currentTimeMillis() - resultSetCreateMillis; + long maxSleepMillis = Math.min( + DEFAULT_MAX_SLEEP_MILLIS, Math.round(elapsedMillis * DEFAULT_MAX_SLEEP_FRACTION)); + sleepMillis = Math.min(sleepMillis * 2, maxSleepMillis); + // we do not want the sleep time to be too short, so we should have a lower bound + sleepMillis = Math.max(sleepMillis, DEFAULT_INIT_SLEEP_MILLIS); } catch (InterruptedException e) { throw new SQLException( "Interrupted while fetching more results for job " + jobIdOrResultSet.left(), e);