fix: Improve handling of futures and threads during refresh. #1457
@@ -113,8 +116,19 @@ private InstanceData getInstanceData() {
instanceDataFuture = currentInstanceData;
}
try {
return Uninterruptibles.getUninterruptibly(instanceDataFuture);
} catch (ExecutionException ex) {
return instanceDataFuture.get(30, TimeUnit.SECONDS);
This is a change of behavior:
This method is called by an application thread when it is trying to create a new connection to the database. (It is not called by a ListeningScheduledExecutorService task.) So it is OK to block waiting for a future to complete.
The old implementation:
- When no refresh attempt is in progress, returns immediately.
- Otherwise blocks application thread until the current refresh attempt finishes. If the refresh attempt succeeds, this returns the InstanceData. If not, this throws a RuntimeException, while a new refresh attempt is submitted to the executor in the background.
The new implementation:
- When no refresh attempt is in progress, returns immediately.
- Otherwise waits up to 30 seconds. If a refresh attempt succeeds, returns immediately at the end of that successful attempt. If no attempts succeed within the 30 second timeout, throws a RuntimeException with the exception from the last failed refresh attempt as the cause.
We could make this 30 second timeout configurable. Customers who want connection attempts to fail fast could set this timeout lower. Customers who want connection attempts to block could set the timeout higher.
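The new behavior described above can be sketched roughly as follows. This is a minimal illustration, not the connector's actual code: it uses the JDK's `CompletableFuture` in place of Guava's `ListenableFuture`, and the class name `BoundedWaitSketch`, the method `getWithTimeout`, and the `lastRefreshFailure` holder are all hypothetical names introduced for the example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;

final class BoundedWaitSketch {

  // Waits up to timeoutMs for the future. On timeout, throws a RuntimeException
  // whose cause is the last recorded refresh failure, if there is one.
  // lastRefreshFailure is a hypothetical holder updated by the refresh code.
  static <T> T getWithTimeout(
      CompletableFuture<T> future,
      AtomicReference<Throwable> lastRefreshFailure,
      long timeoutMs) {
    try {
      return future.get(timeoutMs, TimeUnit.MILLISECONDS);
    } catch (TimeoutException e) {
      Throwable lastFailure = lastRefreshFailure.get();
      throw new RuntimeException(
          "No refresh attempt succeeded within " + timeoutMs + " ms",
          lastFailure != null ? lastFailure : e);
    } catch (ExecutionException e) {
      // The future itself failed: surface the underlying cause.
      throw new RuntimeException(e.getCause());
    } catch (InterruptedException e) {
      // Restore the interrupt flag so callers can still observe it.
      Thread.currentThread().interrupt();
      throw new RuntimeException(e);
    }
  }
}
```

A configurable timeout would simply mean passing a different `timeoutMs`: a small value to fail fast, a large one to keep the application thread blocked longer.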
Minor nit: could we pull out the 30 into a constant? MAX_INSTANCE_DATA_WAIT_TIME or similar?
Done.
private InstanceData performRefresh() throws InterruptedException, ExecutionException {
logger.fine(
String.format("[%s] Refresh Operation: Acquiring rate limiter permit.", instanceName));
private ListenableFuture<InstanceData> performRefresh() {
This is a change of behavior.
The old way: performRefresh() used to block until either (1) an InstanceData was retrieved or (2) an attempt failed, and an exception was thrown.
The new way: performRefresh() does not block. It sets up a chain of futures. The future returned from this method will resolve only when a refresh attempt succeeds.
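A rough sketch of the non-blocking shape described here, under stated assumptions: it uses the JDK's `ScheduledExecutorService` and `CompletableFuture` rather than Guava's `ListeningScheduledExecutorService` and `ListenableFuture`, and the names `RefreshChainSketch`, `refreshUntilSuccess`, `scheduleAttempt`, and `retryDelayMs` are hypothetical. Each failed attempt schedules the next one instead of blocking an executor thread, and the returned future completes only when an attempt succeeds.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

final class RefreshChainSketch {

  // Returns immediately. The returned future completes only when an attempt succeeds.
  static <T> CompletableFuture<T> refreshUntilSuccess(
      Supplier<T> attempt, ScheduledExecutorService executor, long retryDelayMs) {
    CompletableFuture<T> result = new CompletableFuture<>();
    scheduleAttempt(attempt, executor, retryDelayMs, 0, result);
    return result;
  }

  private static <T> void scheduleAttempt(
      Supplier<T> attempt,
      ScheduledExecutorService executor,
      long retryDelayMs,
      long delayMs,
      CompletableFuture<T> result) {
    executor.schedule(
        () -> {
          if (result.isDone()) {
            return; // The caller cancelled or completed the future: stop retrying.
          }
          try {
            result.complete(attempt.get());
          } catch (RuntimeException ex) {
            // The attempt failed. Schedule another attempt and release this thread;
            // no executor task ever waits on another executor task.
            scheduleAttempt(attempt, executor, retryDelayMs, retryDelayMs, result);
          }
        },
        delayMs,
        TimeUnit.MILLISECONDS);
  }
}
```

Because the future never completes exceptionally on a failed attempt, a caller that wants an upper bound has to apply its own timeout when waiting on it.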


RuntimeException ex = Assert.assertThrows(RuntimeException.class, instance::getSslData);
assertThat(ex).hasMessageThat().contains("always fails");
}


@Test
public void testInstanceFailsOnTooLongToRetrieve() throws Exception {
A new test covers the behavior when the refresh attempt never returns.
@@ -139,7 +176,7 @@ public void testCloudSqlInstanceForcesRefresh() throws Exception {
stubCredentialFactory,
executorService,
keyPairFuture,
RateLimiter.create(1.0 / 30.0));
RateLimiter.create(10));
Speed up tests - allow 10 requests/second.
This looks like a nice improvement.
// Now update nextInstanceData to perform a refresh after the
// scheduled delay
ListenableFuture scheduleDelay =
executor.schedule(() -> {}, secondsToRefresh, TimeUnit.SECONDS);
Are there any other ways to do this? On the surface it looks unusual.
I found a better method call for this.
}
throw new RuntimeException("fake read timeout");
});
f.get(); // this will throw an ExecutionException
Let's add some more details to the comment -- it will throw an exception because we've configured the connector to wait only 30s?
I added comments and cleaned this up.
@hessjcg friendly ping on this. This is an important fix and improvement that we want to bake in CI to see if it resolves the flaky tests.
chore: Give methods in CloudSqlInstance names that better reflect their new async roles.
… the token has expired.
@@ -47,6 +47,8 @@ class CloudSqlInstance {

private static final Logger logger = Logger.getLogger(CloudSqlInstance.class.getName());

public static final int MAX_INSTANCE_DATA_WAIT_MS = 30000;
Let's add a comment explaining what this constant represents.
@@ -153,6 +153,7 @@
<compilerArgs>
<arg>-XDcompilePolicy=simple</arg>
<arg>-Xplugin:ErrorProne</arg>
<arg>-Xlint:-options</arg>
Should this be a separate PR?
@@ -159,45 +216,88 @@ String getPreferredIp(List<String> preferredTypes) {
* new refresh is already in progress. If successful, other methods will block until refresh has
* been completed.
*/
void forceRefresh() {
boolean forceRefresh() {
Should this be returning a boolean? I don't see it being used.
Instant expiration;
try {
expiration = instanceDataFuture.get().getExpiration();
if (expiration == null || expiration.isBefore(Instant.now())) {
If the current instance data has already expired, won't we end up initiating a force refresh when the socket creation fails? In other words, do we really need to do this work here?
return instanceDataFuture.get(timeoutMs, TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
synchronized (instanceDataGuard) {
if (currentRefreshFailure != null) {
Could we split this out as a separate PR? It's nice to surface the refresh failure like this.
// has produced a successful refresh.
if (forceRefreshRunning) {
return;
if (refreshRunning) {
Could this rename be a separate PR? Seems like a nice improvement.
Closing, too big. I'll submit this as a sequence of PRs.
Rewrite performRefresh() as a chain of task futures from the ListeningScheduledExecutorService. Now, tasks submitted to the ListeningScheduledExecutorService never block on another task submitted to the ListeningScheduledExecutorService.
This should fix a category of bugs that show up in exceptions and logs as "connection timed out" or "refresh failed" or "bad client certificate". These exceptions can occur when the credentials fail to refresh.
This is the underlying bug: The ListeningScheduledExecutorService gets into a state where all its threads are busy running tasks, all running tasks are blocked waiting for recently submitted tasks to complete, and the recently submitted tasks can't start because there are no available threads in the ListeningScheduledExecutorService.
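The starvation pattern described above can be reproduced in miniature. The sketch below is an illustration only, using a plain JDK fixed thread pool rather than the connector's ListeningScheduledExecutorService; the class `PoolStarvationSketch` and the method `blockingOuterTaskCompletes` are hypothetical names. An outer task submits an inner task to the same pool and then blocks on it. With a single thread, the inner task can never start.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class PoolStarvationSketch {

  // Returns true if the outer task finishes within waitMs, false if it is stuck.
  static boolean blockingOuterTaskCompletes(int poolSize, long waitMs) {
    ExecutorService pool = Executors.newFixedThreadPool(poolSize);
    try {
      Future<String> outer =
          pool.submit(
              () -> {
                Future<String> inner = pool.submit(() -> "refreshed");
                // Blocks a pool thread while waiting on another task in the same pool.
                return inner.get();
              });
      try {
        outer.get(waitMs, TimeUnit.MILLISECONDS);
        return true;
      } catch (TimeoutException e) {
        return false; // Every thread is busy waiting; the inner task never ran.
      } catch (ExecutionException e) {
        return false;
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return false;
      }
    } finally {
      pool.shutdownNow(); // Interrupts the stuck outer task so the JVM can exit.
    }
  }
}
```

Under these assumptions, `blockingOuterTaskCompletes(1, 200)` returns false and `blockingOuterTaskCompletes(2, 2000)` returns true. Adding threads only delays the problem when the number of blocked tasks can grow, which is why the change avoids blocking inside executor tasks altogether.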