Skip to content

Commit

Permalink
Apply comment
Browse files Browse the repository at this point in the history
Issue: #6774
  • Loading branch information
YongGoose committed Mar 3, 2025
1 parent d4df569 commit f308be6
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,12 @@ public class NettyClientBootstrap implements RemotingBootstrap {
private static final Logger LOGGER = LoggerFactory.getLogger(NettyClientBootstrap.class);
private static final String THREAD_PREFIX_SPLIT_CHAR = "_";

private static EventLoopGroup eventLoopGroupWorker;

private final NettyClientConfig nettyClientConfig;
private final Bootstrap bootstrap = new Bootstrap();
private final AtomicBoolean initialized = new AtomicBoolean(false);
private final NettyPoolKey.TransactionRole transactionRole;
private final EventLoopGroup eventLoopGroupWorker;

private EventExecutorGroup defaultEventExecutorGroup;
private ChannelHandler[] channelHandlers;
Expand All @@ -86,9 +86,9 @@ public NettyClientBootstrap(NettyClientConfig nettyClientConfig, final EventExec

boolean enableClientSharedEventLoop = this.nettyClientConfig.getEnableClientSharedEventLoop();
if (enableClientSharedEventLoop) {
this.eventLoopGroupWorker = getOrCreateEventLoopGroupWorker(selectorThreadSizeThreadSize);
eventLoopGroupWorker = getOrCreateEventLoopGroupWorker(selectorThreadSizeThreadSize);
} else {
this.eventLoopGroupWorker = createEventLoopGroupWorker(selectorThreadSizeThreadSize);
eventLoopGroupWorker = createEventLoopGroupWorker(selectorThreadSizeThreadSize);
}
this.defaultEventExecutorGroup = eventExecutorGroup;
}
Expand Down Expand Up @@ -123,7 +123,7 @@ public void start() {
new NamedThreadFactory(getThreadPrefix(nettyClientConfig.getClientWorkerThreadPrefix()),
nettyClientConfig.getClientWorkerThreads()));
}
this.bootstrap.group(this.eventLoopGroupWorker).channel(
this.bootstrap.group(eventLoopGroupWorker).channel(
nettyClientConfig.getClientChannelClazz()).option(
ChannelOption.TCP_NODELAY, true).option(ChannelOption.SO_KEEPALIVE, true).option(
ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis()).option(
Expand Down Expand Up @@ -170,7 +170,7 @@ public void initChannel(SocketChannel ch) {
@Override
public void shutdown() {
try {
this.eventLoopGroupWorker.shutdownGracefully();
eventLoopGroupWorker.shutdownGracefully();
if (this.defaultEventExecutorGroup != null) {
this.defaultEventExecutorGroup.shutdownGracefully();
}
Expand Down Expand Up @@ -235,10 +235,10 @@ private String getThreadPrefix(String threadPrefix) {
}

private EventLoopGroup getOrCreateEventLoopGroupWorker(int selectorThreadSizeThreadSize) {
if (this.eventLoopGroupWorker == null) {
if (eventLoopGroupWorker == null) {
return createEventLoopGroupWorker(selectorThreadSizeThreadSize);
}
return this.eventLoopGroupWorker;
return eventLoopGroupWorker;
}

private EventLoopGroup createEventLoopGroupWorker(int selectorThreadSizeThreadSize) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package org.apache.seata.core.rpc.netty;

import io.netty.channel.EventLoopGroup;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;

import static org.mockito.Mockito.when;

@ExtendWith(MockitoExtension.class)
class NettyClientBootstrapTest {

@Mock
private NettyClientConfig nettyClientConfig;
private DefaultEventExecutorGroup eventExecutorGroup;

@BeforeEach
void init() {
eventExecutorGroup = new DefaultEventExecutorGroup(1);
}

@Test
void testSharedEventLoopGroupEnabled() {
TmNettyRemotingClient tmNettyRemotingClient = TmNettyRemotingClient.getInstance();
RmNettyRemotingClient rmNettyRemotingClient = RmNettyRemotingClient.getInstance();

tmNettyRemotingClient.init();
rmNettyRemotingClient.init();

NettyClientBootstrap tmBootstrap = getNettyClientBootstrap(tmNettyRemotingClient);
NettyClientBootstrap rmBootstrap = getNettyClientBootstrap(rmNettyRemotingClient);

EventLoopGroup tmEventLoopGroupWorker = getEventLoopGroupWorker(tmBootstrap);
EventLoopGroup rmEventLoopGroupWorker = getEventLoopGroupWorker(rmBootstrap);

Assertions.assertEquals(tmEventLoopGroupWorker, rmEventLoopGroupWorker);
}

@Test
void testSharedEventLoopGroupDisabled() {
when(nettyClientConfig.getEnableClientSharedEventLoop()).thenReturn(false);
NettyClientBootstrap tmNettyClientBootstrap = new NettyClientBootstrap(nettyClientConfig, eventExecutorGroup, NettyPoolKey.TransactionRole.TMROLE);
EventLoopGroup tmEventLoopGroupWorker = getEventLoopGroupWorker(tmNettyClientBootstrap);

NettyClientBootstrap rmNettyClientBootstrap = new NettyClientBootstrap(nettyClientConfig, eventExecutorGroup, NettyPoolKey.TransactionRole.RMROLE);
EventLoopGroup rmEventLoopGroupWorker = getEventLoopGroupWorker(rmNettyClientBootstrap);

Assertions.assertNotEquals(tmEventLoopGroupWorker, rmEventLoopGroupWorker);
}

private NettyClientBootstrap getNettyClientBootstrap(AbstractNettyRemotingClient remotingClient) {
try {
java.lang.reflect.Field field = AbstractNettyRemotingClient.class.getDeclaredField("clientBootstrap");
field.setAccessible(true);
return (NettyClientBootstrap) field.get(remotingClient);
} catch (Exception e) {
throw new RuntimeException(e);
}
}

private EventLoopGroup getEventLoopGroupWorker(NettyClientBootstrap bootstrap) {
try {
java.lang.reflect.Field field = NettyClientBootstrap.class.getDeclaredField("eventLoopGroupWorker");
field.setAccessible(true);
return (EventLoopGroup) field.get(bootstrap);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}

0 comments on commit f308be6

Please sign in to comment.