Skip to content

Commit

Permalink
Rename ConnectionResourceLock
Browse files Browse the repository at this point in the history
  • Loading branch information
terrymanu committed Dec 21, 2024
1 parent 95bb5aa commit 76bd14e
Show file tree
Hide file tree
Showing 7 changed files with 18 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
import org.apache.shardingsphere.infra.spi.type.ordered.OrderedSPILoader;
import org.apache.shardingsphere.proxy.backend.connector.jdbc.connection.ConnectionPostProcessor;
import org.apache.shardingsphere.proxy.backend.connector.jdbc.connection.ResourceLock;
import org.apache.shardingsphere.proxy.backend.connector.jdbc.connection.ConnectionResourceLock;
import org.apache.shardingsphere.proxy.backend.connector.jdbc.transaction.BackendTransactionManager;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
import org.apache.shardingsphere.proxy.backend.exception.BackendConnectionException;
Expand Down Expand Up @@ -67,7 +67,7 @@ public final class ProxyDatabaseConnectionManager implements DatabaseConnectionM

private final Collection<ConnectionPostProcessor> connectionPostProcessors = new LinkedList<>();

private final ResourceLock resourceLock = new ResourceLock();
private final ConnectionResourceLock connectionResourceLock = new ConnectionResourceLock();

private final AtomicBoolean closed = new AtomicBoolean(false);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@
import java.util.concurrent.locks.ReentrantLock;

/**
* Resource lock.
* Connection resource lock.
*/
public final class ResourceLock {
public final class ConnectionResourceLock {

private static final long DEFAULT_TIMEOUT_MILLISECONDS = 200L;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@

@ExtendWith(AutoMockExtension.class)
@MockitoSettings(strictness = Strictness.LENIENT)
class ResourceLockTest {
class ConnectionResourceLockTest {

@Mock
private ChannelHandlerContext channelHandlerContext;
Expand All @@ -49,18 +49,18 @@ class ResourceLockTest {
private Channel channel;

@Mock
private ResourceLock resourceLock;
private ConnectionResourceLock connectionResourceLock;

@Test
void assertDoAwait() throws NoSuchFieldException, IllegalAccessException {
when(channel.isWritable()).thenReturn(false);
when(channel.isActive()).thenReturn(true);
when(channelHandlerContext.channel()).thenReturn(channel);
ExecutorService executorService = Executors.newFixedThreadPool(1);
executorService.submit(() -> resourceLock.doAwait(channelHandlerContext));
executorService.submit(() -> connectionResourceLock.doAwait(channelHandlerContext));
Awaitility.await().pollDelay(200L, TimeUnit.MILLISECONDS).until(() -> true);
Plugins.getMemberAccessor().set(ResourceLock.class.getDeclaredField("condition"), resourceLock, new ReentrantLock().newCondition());
verify(resourceLock, times(1)).doAwait(channelHandlerContext);
Plugins.getMemberAccessor().set(ConnectionResourceLock.class.getDeclaredField("condition"), connectionResourceLock, new ReentrantLock().newCondition());
verify(connectionResourceLock, times(1)).doAwait(channelHandlerContext);
}

@Test
Expand All @@ -72,9 +72,9 @@ void assertDoNotify() {
ExecutorService executorService = Executors.newFixedThreadPool(1);
executorService.submit(() -> {
Awaitility.await().pollDelay(50L, TimeUnit.MILLISECONDS).until(() -> true);
resourceLock.doNotify();
connectionResourceLock.doNotify();
});
resourceLock.doAwait(channelHandlerContext);
connectionResourceLock.doAwait(channelHandlerContext);
assertTrue(System.currentTimeMillis() >= startTime);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ private void closeAllResources() {
@Override
public void channelWritabilityChanged(final ChannelHandlerContext context) {
if (context.channel().isWritable()) {
connectionSession.getDatabaseConnectionManager().getResourceLock().doNotify();
connectionSession.getDatabaseConnectionManager().getConnectionResourceLock().doNotify();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public void writeQueryData(final ChannelHandlerContext context,
int flushThreshold = ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData().getProps().<Integer>getValue(ConfigurationPropertyKey.PROXY_FRONTEND_FLUSH_THRESHOLD);
while (queryCommandExecutor.next()) {
count++;
databaseConnectionManager.getResourceLock().doAwait(context);
databaseConnectionManager.getConnectionResourceLock().doAwait(context);
DatabasePacket dataValue = queryCommandExecutor.getQueryRowPacket();
context.write(dataValue);
if (flushThreshold == count) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ private long writeDataPackets(final ChannelHandlerContext context, final ProxyDa
.getContextManager().getMetaDataContexts().getMetaData().getProps().<Integer>getValue(ConfigurationPropertyKey.PROXY_FRONTEND_FLUSH_THRESHOLD);
while (queryCommandExecutor.next()) {
flushCount++;
databaseConnectionManager.getResourceLock().doAwait(context);
databaseConnectionManager.getConnectionResourceLock().doAwait(context);
DatabasePacket resultValue = queryCommandExecutor.getQueryRowPacket();
context.write(resultValue);
if (proxyFrontendFlushThreshold == flushCount) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import org.apache.shardingsphere.mode.metadata.MetaDataContextsFactory;
import org.apache.shardingsphere.mode.spi.PersistRepository;
import org.apache.shardingsphere.proxy.backend.connector.ProxyDatabaseConnectionManager;
import org.apache.shardingsphere.proxy.backend.connector.jdbc.connection.ResourceLock;
import org.apache.shardingsphere.proxy.backend.connector.jdbc.connection.ConnectionResourceLock;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
import org.apache.shardingsphere.proxy.backend.session.transaction.TransactionStatus;
Expand Down Expand Up @@ -114,9 +114,9 @@ void assertWriteQueryDataWithHasNextResult() throws SQLException {
when(channel.isActive()).thenReturn(true);
when(queryCommandExecutor.next()).thenReturn(true, false);
when(channel.isWritable()).thenReturn(false, true);
ResourceLock resourceLock = mock(ResourceLock.class);
ConnectionResourceLock connectionResourceLock = mock(ConnectionResourceLock.class);
ProxyDatabaseConnectionManager databaseConnectionManager = mock(ProxyDatabaseConnectionManager.class);
when(databaseConnectionManager.getResourceLock()).thenReturn(resourceLock);
when(databaseConnectionManager.getConnectionResourceLock()).thenReturn(connectionResourceLock);
when(databaseConnectionManager.getConnectionSession()).thenReturn(connectionSession);
PostgreSQLPacket packet = mock(PostgreSQLPacket.class);
when(queryCommandExecutor.getQueryRowPacket()).thenReturn(packet);
Expand All @@ -127,7 +127,7 @@ void assertWriteQueryDataWithHasNextResult() throws SQLException {
new ShardingSphereMetaData()), computeNodeInstanceContext, mock(PersistRepository.class));
when(ProxyContext.getInstance().getContextManager()).thenReturn(contextManager);
commandExecuteEngine.writeQueryData(channelHandlerContext, databaseConnectionManager, queryCommandExecutor, 0);
verify(resourceLock).doAwait(channelHandlerContext);
verify(connectionResourceLock).doAwait(channelHandlerContext);
verify(channelHandlerContext).write(packet);
verify(channelHandlerContext).write(isA(PostgreSQLCommandCompletePacket.class));
verify(channelHandlerContext).write(isA(PostgreSQLReadyForQueryPacket.class));
Expand Down

0 comments on commit 76bd14e

Please sign in to comment.