Skip to content

Commit

Permalink
chore(refactor): Rename CloudSqlInstance to CloudSqlConnectorInfoCache
Browse files Browse the repository at this point in the history
  • Loading branch information
hessjcg committed Sep 8, 2023
1 parent fc3d7ca commit 188a5bf
Show file tree
Hide file tree
Showing 7 changed files with 86 additions and 83 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@
* SQL Admin API. The operations to retrieve information with the API are largely done
* asynchronously, and this class should be considered threadsafe.
*/
class CloudSqlInstance {
class CloudSqlConnectorInfoCache {

private static final Logger logger = Logger.getLogger(CloudSqlInstance.class.getName());
private static final Logger logger = Logger.getLogger(CloudSqlConnectorInfoCache.class.getName());

private final ListeningScheduledExecutorService executor;
private final InstanceDataSupplier instanceDataSupplier;
Expand Down Expand Up @@ -76,7 +76,7 @@ class CloudSqlInstance {
* @param executor executor used to schedule asynchronous tasks
* @param keyPair public/private key pair used to authenticate connections
*/
CloudSqlInstance(
CloudSqlConnectorInfoCache(
String connectionName,
InstanceDataSupplier instanceDataSupplier,
AuthType authType,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ private String generatePublicKeyCert(KeyPair keyPair) {
}

/**
* Internal Use Only: Gets the instance data for the CloudSqlInstance from the API.
* Internal Use Only: Gets the instance data from the API.
*
* @throws ExecutionException if an exception is thrown during execution.
* @throws InterruptedException if the executor is interrupted.
Expand Down
29 changes: 15 additions & 14 deletions core/src/main/java/com/google/cloud/sql/core/CoreSocketFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ public final class CoreSocketFactory {
private static final String version = getVersion();
private static CoreSocketFactory coreSocketFactory;
private final ListenableFuture<KeyPair> localKeyPair;
private final ConcurrentHashMap<String, CloudSqlInstance> instances = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, CloudSqlConnectorInfoCache> instances =
new ConcurrentHashMap<>();
private final ListeningScheduledExecutorService executor;
private final CredentialFactory credentialFactory;
private final int serverProxyPort;
Expand Down Expand Up @@ -225,11 +226,11 @@ public static SslData getSslData(
throws IOException {
if (enableIamAuth) {
return getInstance()
.getCloudSqlInstance(csqlInstanceName, AuthType.IAM, targetPrincipal, delegates)
.getConnectorInfo(csqlInstanceName, AuthType.IAM, targetPrincipal, delegates)
.getSslData();
}
return getInstance()
.getCloudSqlInstance(csqlInstanceName, AuthType.PASSWORD, targetPrincipal, delegates)
.getConnectorInfo(csqlInstanceName, AuthType.PASSWORD, targetPrincipal, delegates)
.getSslData();
}

Expand All @@ -243,9 +244,9 @@ public static String getHostIp(

private String getHostIp(
String instanceName, List<String> ipTypes, String targetPrincipal, List<String> delegates) {
CloudSqlInstance instance =
getCloudSqlInstance(instanceName, AuthType.PASSWORD, targetPrincipal, delegates);
return instance.getPreferredIp(ipTypes);
CloudSqlConnectorInfoCache cache =
getConnectorInfo(instanceName, AuthType.PASSWORD, targetPrincipal, delegates);
return cache.getPreferredIp(ipTypes);
}

/**
Expand Down Expand Up @@ -358,37 +359,37 @@ Socket createSslSocket(
String targetPrincipal,
List<String> delegates)
throws IOException, InterruptedException {
CloudSqlInstance instance =
getCloudSqlInstance(instanceName, authType, targetPrincipal, delegates);
CloudSqlConnectorInfoCache cache =
getConnectorInfo(instanceName, authType, targetPrincipal, delegates);

try {
SSLSocket socket = instance.createSslSocket();
SSLSocket socket = cache.createSslSocket();

// TODO(kvg): Support all socket related options listed here:
// https://dev.mysql.com/doc/connector-j/en/connector-j-reference-configuration-properties.html
socket.setKeepAlive(true);
socket.setTcpNoDelay(true);

String instanceIp = instance.getPreferredIp(ipTypes);
String instanceIp = cache.getPreferredIp(ipTypes);

socket.connect(new InetSocketAddress(instanceIp, serverProxyPort));
socket.startHandshake();

return socket;
} catch (Exception ex) {
// TODO(kvg): Let user know about the rate limit
instance.forceRefresh();
cache.forceRefresh();
throw ex;
}
}

CloudSqlInstance getCloudSqlInstance(
CloudSqlConnectorInfoCache getConnectorInfo(
String instanceName, AuthType authType, String targetPrincipal, List<String> delegates) {
return instances.computeIfAbsent(
instanceName, k -> createInstance(k, authType, targetPrincipal, delegates));
}

private CloudSqlInstance createInstance(
private CloudSqlConnectorInfoCache createInstance(
String instanceName, AuthType authType, String targetPrincipal, List<String> delegates) {

final CredentialFactory instanceCredentialFactory;
Expand All @@ -409,7 +410,7 @@ private CloudSqlInstance createInstance(
HttpRequestInitializer credential = instanceCredentialFactory.create();
CloudSqlConnectorInfoRepository adminApi = adminClientFactory.create(credential);

return new CloudSqlInstance(
return new CloudSqlConnectorInfoCache(
instanceName,
adminApi,
authType,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@
import java.security.KeyPair;
import java.util.concurrent.ExecutionException;

/** Internal Use Only: Gets the instance data for the CloudSqlInstance from the API. */
/** Internal Use Only: Gets the instance data from the API. */
interface InstanceDataSupplier {
/**
* Internal Use Only: Gets the instance data for the CloudSqlInstance from the API.
* Internal Use Only: Gets the instance data from the API.
*
* @throws ExecutionException if an exception is thrown during execution.
* @throws InterruptedException if the executor is interrupted.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@
import java.util.stream.Collectors;
import org.junit.Test;

public class CloudSqlInstanceConcurrencyTest {
public class CloudSqlConnectorInfoCacheConcurrencyTest {

private static final Logger logger =
Logger.getLogger(CloudSqlInstanceConcurrencyTest.class.getName());
Logger.getLogger(CloudSqlConnectorInfoCacheConcurrencyTest.class.getName());

private static class TestCredentialFactory implements CredentialFactory, HttpRequestInitializer {

Expand All @@ -59,8 +59,8 @@ public void testThatForceRefreshBalksWhenARefreshIsInProgress() throws Exception
Futures.immediateFuture(mockAdminApi.getClientKeyPair());
ListeningScheduledExecutorService executor = CoreSocketFactory.getDefaultExecutor();
TestDataSupplier supplier = new TestDataSupplier(true);
CloudSqlInstance instance =
new CloudSqlInstance(
CloudSqlConnectorInfoCache cache =
new CloudSqlConnectorInfoCache(
"a:b:c",
supplier,
AuthType.PASSWORD,
Expand All @@ -74,16 +74,17 @@ public void testThatForceRefreshBalksWhenARefreshIsInProgress() throws Exception
// to cause a deadlock.
ListenableFuture<List<Object>> allData =
Futures.allAsList(
executor.submit(instance::getSslData),
executor.submit(instance::getSslData),
executor.submit(instance::getSslData));
executor.submit(cache::getSslData),
executor.submit(cache::getSslData),
executor.submit(cache::getSslData));

List<Object> d = allData.get();
assertThat(d.get(0)).isNotNull();
assertThat(d.get(1)).isNotNull();
assertThat(d.get(2)).isNotNull();

// Test that there was 1 successful attempt from when the CloudSqlInstance was instantiated.
// Test that there was 1 successful attempt from when the CloudSqlConnectorInfoCache was
// instantiated.
assertThat(supplier.successCounter.get()).isEqualTo(1);

// Now, run through 20 cycles where we call forceRefresh() multiple times and make sure that
Expand All @@ -96,24 +97,25 @@ public void testThatForceRefreshBalksWhenARefreshIsInProgress() throws Exception

// Call forceRefresh 3 times in rapid succession. This should only kick off 1 refresh
// cycle.
instance.forceRefresh();
cache.forceRefresh();
// force Java to run a different thread now. That gives the refrsh task an opportunity to
// start.
Thread.yield();
instance.forceRefresh();
instance.forceRefresh();
cache.forceRefresh();
cache.forceRefresh();
Thread.yield();

// This will loop forever if CloudSqlInstance does not successfully retry after a failed
// This will loop forever if CloudSqlConnectorInfoCache does not successfully retry after a
// failed
// forceRefresh() attempt.
while (true) {
try {
// Attempt to get sslData 3 times, simultaneously, in different threads.
ListenableFuture<List<Object>> allData2 =
Futures.allAsList(
executor.submit(instance::getSslData),
executor.submit(instance::getSslData),
executor.submit(instance::getSslData));
executor.submit(cache::getSslData),
executor.submit(cache::getSslData),
executor.submit(cache::getSslData));

// Wait for all to finish.
allData2.get();
Expand Down Expand Up @@ -143,13 +145,13 @@ public void testForceRefreshDoesNotCauseADeadlockOrBrokenRefreshLoop() throws Ex
Futures.immediateFuture(mockAdminApi.getClientKeyPair());
ListeningScheduledExecutorService executor = CoreSocketFactory.getDefaultExecutor();
TestDataSupplier supplier = new TestDataSupplier(false);
List<CloudSqlInstance> instances = new ArrayList<>();
List<CloudSqlConnectorInfoCache> cloudSqlConnectorInfoCaches = new ArrayList<>();

final int instanceCount = 5;

for (int i = 0; i < instanceCount; i++) {
instances.add(
new CloudSqlInstance(
cloudSqlConnectorInfoCaches.add(
new CloudSqlConnectorInfoCache(
"a:b:instance" + i,
supplier,
AuthType.PASSWORD,
Expand All @@ -160,7 +162,7 @@ public void testForceRefreshDoesNotCauseADeadlockOrBrokenRefreshLoop() throws Ex
}

// Get SSL Data for each instance, forcing the first refresh to complete.
instances.forEach(CloudSqlInstance::getSslData);
cloudSqlConnectorInfoCaches.forEach(CloudSqlConnectorInfoCache::getSslData);

assertThat(supplier.counter.get()).isEqualTo(instanceCount);

Expand All @@ -170,7 +172,9 @@ public void testForceRefreshDoesNotCauseADeadlockOrBrokenRefreshLoop() throws Ex
// Start a thread for each instance that will force refresh and get InstanceData
// 50 times.
List<Thread> threads =
instances.stream().map(this::startForceRefreshThread).collect(Collectors.toList());
cloudSqlConnectorInfoCaches.stream()
.map(this::startForceRefreshThread)
.collect(Collectors.toList());

for (Thread t : threads) {
// If threads don't complete in 10 seconds, throw an exception,
Expand All @@ -180,26 +184,26 @@ public void testForceRefreshDoesNotCauseADeadlockOrBrokenRefreshLoop() throws Ex

// Check if there is a scheduled future
int brokenLoop = 0;
for (CloudSqlInstance inst : instances) {
if (inst.getCurrent().isDone() && inst.getNext().isDone()) {
logger.warning("No future scheduled thing for instance " + inst.getInstanceName());
for (CloudSqlConnectorInfoCache cache : cloudSqlConnectorInfoCaches) {
if (cache.getCurrent().isDone() && cache.getNext().isDone()) {
logger.warning("No future scheduled thing for instance " + cache.getInstanceName());
brokenLoop++;
}
}
assertThat(brokenLoop).isEqualTo(0);
}

private Thread startForceRefreshThread(CloudSqlInstance inst) {
private Thread startForceRefreshThread(CloudSqlConnectorInfoCache cache) {
Runnable forceRefreshRepeat =
() -> {
for (int i = 0; i < 10; i++) {
try {
Thread.sleep(100);
inst.forceRefresh();
inst.forceRefresh();
inst.forceRefresh();
cache.forceRefresh();
cache.forceRefresh();
cache.forceRefresh();
Thread.yield();
inst.getSslData();
cache.getSslData();
} catch (Exception e) {
logger.info("Exception in force refresh loop.");
}
Expand All @@ -208,7 +212,7 @@ private Thread startForceRefreshThread(CloudSqlInstance inst) {
};

Thread t = new Thread(forceRefreshRepeat);
t.setName("test-" + inst.getInstanceName());
t.setName("test-" + cache.getInstanceName());
t.start();
return t;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
public class CloudSqlInstanceNameTest {
public class CloudSqlConnectorInfoCacheNameTest {

@Test
public void parseStandardConnectionName() {
Expand Down
Loading

0 comments on commit 188a5bf

Please sign in to comment.