Skip to content

Commit

Permalink
[proxy] Do not share the same connection among different clients (str…
Browse files Browse the repository at this point in the history
…eamnative#101)

### Motivation

Currently, a `ConnectionPool` singleton is used to maintain connections
to broker. Each broker has its unique connection. Thus, for different
Kafka clients that produce messages to the same topic, only 1 connection
will be used. This connection might also be used to forward other
requests. It harms the concurrency. What's worse, each connection should
have different sessions (e.g. authentication role) for different
clients.

### Modifications

- Refactor `ConnectionPool` to `ConnectionFactory`, which no longer
maintains a connection cache.
- For each `KafkaProxyRequestHandler`, maintain the following
connections:
- `leaderBrokers`: the map that maps the unresolved address to the
connection of leader brokers that PRODUCE and FETCH requests might be
sent.
  - `metadataBroker`: the broker to query the metadata
  - `groupCoordinator`: the broker for group coordinator requests
  • Loading branch information
BewareMyPower authored Aug 23, 2023
1 parent ad67432 commit fdc6e32
Show file tree
Hide file tree
Showing 6 changed files with 111 additions and 86 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.streamnative.pulsar.handlers.kop.proxy;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Map;
import javax.annotation.concurrent.NotThreadSafe;
import lombok.RequiredArgsConstructor;

/**
* This class maintains connections of a Kafka client that could establish:
* 1. The connection for METADATA requests
* 2. The connection for group coordinator related requests.
* 3. The connection to leader brokers for PRODUCE and FETCH requests.
*/
@NotThreadSafe
@RequiredArgsConstructor
public class BrokerConnectionGroup {

private final ConnectionFactory connectionFactory;
private final Map<InetSocketAddress, ConnectionToBroker> leaderBrokers = new HashMap<>();
private ConnectionToBroker metadataBroker;
private ConnectionToBroker groupCoordinator;

public ConnectionToBroker getLeader(final InetSocketAddress leader) throws IOException {
var connectionToBroker = leaderBrokers.get(leader);
if (connectionToBroker == null) {
connectionToBroker = connectionFactory.getConnection(leader);
leaderBrokers.put(leader, connectionToBroker);
}
return connectionToBroker;
}

public ConnectionToBroker getMetadataBroker() throws IOException {
if (metadataBroker == null) {
metadataBroker = connectionFactory.getAnyConnection();
}
return metadataBroker;
}

public ConnectionToBroker getGroupCoordinator() throws IOException {
if (groupCoordinator == null) {
groupCoordinator = connectionFactory.getAnyConnection();
}
return groupCoordinator;
}

public void reset() {
leaderBrokers.clear();
metadataBroker = null;
groupCoordinator = null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,43 +28,27 @@
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.util.netty.EventLoopUtil;

@Slf4j
class ConnectionPool {
class ConnectionFactory {

private final Map<InetSocketAddress, ConnectionToBroker> connectionToBrokers = new ConcurrentHashMap<>();
private final AtomicInteger id = new AtomicInteger(0);
private final Bootstrap bootstrap = new Bootstrap();
private final List<InetSocketAddress> addresses;
private final int connectTimeoutMs;
private final EventLoopGroup eventLoopGroup;
private final AddressResolver<InetSocketAddress> addressResolver;

ConnectionPool(final KafkaProxyConfiguration config, final EventLoopGroup eventLoopGroup,
final DnsAddressResolverGroup dnsAddressResolverGroup) {
ConnectionFactory(final KafkaProxyConfiguration config, final EventLoopGroup eventLoopGroup,
final DnsAddressResolverGroup dnsAddressResolverGroup) {
this.addresses = config.getKafkaBootstrapServers();
this.connectTimeoutMs = config.getBrokerProxyConnectTimeoutMs();
this.eventLoopGroup = eventLoopGroup;
this.addressResolver = dnsAddressResolverGroup.getResolver(eventLoopGroup.next());
bootstrap.group(eventLoopGroup);
bootstrap.channel(EventLoopUtil.getClientSocketChannelClass(eventLoopGroup));
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, config.getBrokerProxyConnectTimeoutMs());
bootstrap.option(ChannelOption.TCP_NODELAY, true);
bootstrap.option(ChannelOption.ALLOCATOR, PulsarByteBufAllocator.DEFAULT);

bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new LengthFieldPrepender(4));
ch.pipeline().addLast("frameDecoder",
new LengthFieldBasedFrameDecoder(100 * 1024 * 1024/* 100 MB */, 0, 4, 0, 4));
ch.pipeline().addLast("handler", new ConnectionToBroker());
}
});

}

ConnectionToBroker getAnyConnection() throws IOException {
Expand All @@ -82,29 +66,24 @@ ConnectionToBroker getAnyConnection() throws IOException {
}

ConnectionToBroker getConnection(final InetSocketAddress unresolvedAddress) throws IOException {
try {
return connectionToBrokers.computeIfAbsent(unresolvedAddress, key -> {
final ConnectionToBroker connectionToBroker;
try {
connectionToBroker = createConnection(key);
} catch (IOException e) {
throw new RuntimeException(e);
}
connectionToBroker.register(unresolvedAddress, this);
return connectionToBroker;
});
} catch (RuntimeException e) {
throw (IOException) e.getCause();
}
}

void removeConnection(final InetSocketAddress address, final ConnectionToBroker connectionToBroker) {
connectionToBrokers.remove(address, connectionToBroker);
}

private ConnectionToBroker createConnection(final InetSocketAddress unresolvedAddress) throws IOException {
final var addresses = waitFuture(addressResolver.resolveAll(unresolvedAddress),
"Resolve " + unresolvedAddress);
final var bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup);
bootstrap.channel(EventLoopUtil.getClientSocketChannelClass(eventLoopGroup));
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeoutMs);
bootstrap.option(ChannelOption.TCP_NODELAY, true);
bootstrap.option(ChannelOption.ALLOCATOR, PulsarByteBufAllocator.DEFAULT);

bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new LengthFieldPrepender(4));
ch.pipeline().addLast("frameDecoder",
new LengthFieldBasedFrameDecoder(100 * 1024 * 1024/* 100 MB */, 0, 4, 0, 4));
ch.pipeline().addLast("handler", new ConnectionToBroker(unresolvedAddress));
}
});
final var registeredChannel = waitChannelFuture(bootstrap.register(), "Register channel");
for (int i = 0; i < addresses.size(); i++) {
final var address = addresses.get(i);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,22 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.requests.AbstractResponse;

@RequiredArgsConstructor
@Slf4j
public class ConnectionToBroker extends ChannelInboundHandlerAdapter {

private final Map<Integer, InflightRequest> pendingRequests = new ConcurrentHashMap<>();

// forwardRequest might be called in another thread, so here use volatile to ensure the safe access to ctx
private final AtomicBoolean closed = new AtomicBoolean(false);
@Getter
private final InetSocketAddress address;
private volatile ChannelHandlerContext ctx = null;
private volatile InetSocketAddress address;
private volatile ConnectionPool connectionPool;

@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
Expand All @@ -43,17 +46,6 @@ public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
log.info("[{}] Connection to broker is registered", ctx);
}

void register(final InetSocketAddress address, final ConnectionPool connectionPool) {
this.address = address;
this.connectionPool = connectionPool;
}

void unregister() {
if (address != null) {
connectionPool.removeConnection(address, this);
}
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
log.error("[{}] Unexpected error in ConnectionToBroker", ctx, cause);
Expand Down Expand Up @@ -105,7 +97,6 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {

private void close() {
if (closed.compareAndSet(false, true)) {
unregister();
pendingRequests.values().forEach(inflightRequest -> inflightRequest.fail(
new ConnectError("Connection is closed")));
pendingRequests.clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,14 @@
public class KafkaProxyChannelInitializer extends ChannelInitializer<SocketChannel> {

private final EndPoint advertisedEndPoint;
private final ConnectionPool connectionPool;
private final ConnectionFactory connectionFactory;

@Override
protected void initChannel(SocketChannel ch) throws Exception {
// TODO: support TLS
ch.pipeline().addLast(new LengthFieldPrepender(4));
ch.pipeline().addLast("frameDecoder",
new LengthFieldBasedFrameDecoder(100 * 1024 * 1024/* 100 MB */, 0, 4, 0, 4));
ch.pipeline().addLast("handler", new KafkaProxyRequestHandler(advertisedEndPoint, connectionPool));
ch.pipeline().addLast("handler", new KafkaProxyRequestHandler(advertisedEndPoint, connectionFactory));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
public class KafkaProxyExtension implements ProxyExtension {

private KafkaProxyConfiguration config;
private ConnectionPool connectionPool;
private ConnectionFactory connectionFactory;

@Override
public String extensionName() {
Expand All @@ -56,7 +56,7 @@ public void start(ProxyService service) {
KopVersion.getBuildUser(),
KopVersion.getBuildHost(),
KopVersion.getBuildTime());
this.connectionPool = new ConnectionPool(config, service.getWorkerGroup(),
this.connectionFactory = new ConnectionFactory(config, service.getWorkerGroup(),
service.getDnsAddressResolverGroup());
}

Expand All @@ -73,7 +73,7 @@ public Map<InetSocketAddress, ChannelInitializer<SocketChannel>> newChannelIniti
e -> e.getValue().getInetAddress(),
e -> {
final var advertisedEndPoint = advertisedListeners.get(e.getKey());
return new KafkaProxyChannelInitializer(advertisedEndPoint, connectionPool);
return new KafkaProxyChannelInitializer(advertisedEndPoint, connectionFactory);
}
));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -84,16 +83,15 @@ public class KafkaProxyRequestHandler extends ChannelInboundHandlerAdapter {
private final AtomicBoolean isActive = new AtomicBoolean(false);
private final Node selfNode;
private final List<Integer> replicaIds;
private final ConnectionPool connectionPool;
private final BrokerConnectionGroup connectionGroup;
private ChannelHandlerContext ctx;
private ConnectionToBroker groupCoordinator;

public KafkaProxyRequestHandler(final EndPoint advertisedEndPoint, final ConnectionPool connectionPool) {
public KafkaProxyRequestHandler(final EndPoint advertisedEndPoint, final ConnectionFactory connectionFactory) {
this.selfNode = new Node(Murmur3_32Hash.getInstance().makeHash(
(advertisedEndPoint.getHostname() + advertisedEndPoint.getPort()).getBytes(UTF_8)
), advertisedEndPoint.getHostname(), advertisedEndPoint.getPort());
this.replicaIds = Collections.singletonList(selfNode.id());
this.connectionPool = connectionPool;
this.connectionGroup = new BrokerConnectionGroup(connectionFactory);
}

@Override
Expand All @@ -120,12 +118,15 @@ public void channelRead(final ChannelHandlerContext ctx, final Object msg) throw
case PRODUCE -> handleProduce(inflightRequest);
case FIND_COORDINATOR -> handleFindCoordinator(inflightRequest);
case JOIN_GROUP, SYNC_GROUP, LEAVE_GROUP, OFFSET_FETCH, OFFSET_COMMIT, HEARTBEAT ->
getGroupCoordinator().ifPresent(__ -> __.forwardRequest(inflightRequest));
connectionGroup.getGroupCoordinator().forwardRequest(inflightRequest);
case LIST_OFFSETS -> handleListOffsets(inflightRequest);
case FETCH -> handleFetch(inflightRequest);
default -> inflightRequest.complete(inflightRequest.getRequest().getErrorResponse(
new UnsupportedVersionException("API " + apiKeys + " is not supported")));
}
} catch (IOException e) {
log.warn("{}", e.getMessage());
close(ctx);
} catch (Throwable throwable) {
log.error("[{}] Unexpected exception when handling request", ctx.channel(), throwable);
close(ctx);
Expand Down Expand Up @@ -160,6 +161,7 @@ private void close(final ChannelHandlerContext ctx) {
log.info("[{}] Close with {} pending requests", ctx, requestQueue.size());
}
requestQueue.clear();
connectionGroup.reset();
}
}

Expand Down Expand Up @@ -227,7 +229,7 @@ private void handleMetadata(final InflightRequest inflightRequest) throws IOExce
.setHost(selfNode.host()).setPort(selfNode.port()));
return metadataResponse;
});
connectionPool.getAnyConnection().forwardRequest(inflightRequest);
connectionGroup.getMetadataBroker().forwardRequest(inflightRequest);
}

private void handleProduce(final InflightRequest inflightRequest) throws IOException {
Expand Down Expand Up @@ -269,7 +271,7 @@ private void handleProduce(final InflightRequest inflightRequest) throws IOExcep
}
});
final var firstLeader = partitionDataMap.keySet().iterator().next();
connectionPool.getConnection(firstLeader).forwardRequest(inflightRequest, cacheRequest);
connectionGroup.getLeader(firstLeader).forwardRequest(inflightRequest, cacheRequest);
} else {
// TODO: split partitionDataMap to multiple requests
}
Expand Down Expand Up @@ -311,19 +313,6 @@ private void handleFindCoordinator(final InflightRequest inflightRequest) {
inflightRequest.complete(new FindCoordinatorResponse(data));
}

private Optional<ConnectionToBroker> getGroupCoordinator() {
if (groupCoordinator == null) {
try {
groupCoordinator = connectionPool.getAnyConnection();
} catch (IOException e) {
log.error("[{}] Failed to get connection to broker coordinator: {}", ctx, e.getMessage());
close(ctx);
return Optional.empty();
}
}
return Optional.of(groupCoordinator);
}

private void handleListOffsets(final InflightRequest inflightRequest) throws IOException {
if (inflightRequest.getHeader().apiVersion() == 0) {
// TODO: handle ListOffset request v0
Expand Down Expand Up @@ -357,7 +346,7 @@ private void handleListOffsetsV1OrAbove(final InflightRequest originalRequest, f

if (leaderToOffsetData.size() == 1) {
final var leader = leaderToOffsetData.keySet().iterator().next();
connectionPool.getConnection(leader).forwardRequest(originalRequest);
connectionGroup.getLeader(leader).forwardRequest(originalRequest);
} else {
// TODO: handle LIST_OFFSETS request for multiple partitions, including errorsMap
}
Expand Down Expand Up @@ -387,7 +376,7 @@ private void handleFetch(final InflightRequest inflightRequest) throws IOExcepti
}
if (fetchPartitionMap.size() == 1) {
final var leader = fetchPartitionMap.keySet().iterator().next();
connectionPool.getConnection(leader).forwardRequest(inflightRequest);
connectionGroup.getLeader(leader).forwardRequest(inflightRequest);
} else {
// TODO: split it to multiple requests to different brokers
}
Expand Down

0 comments on commit fdc6e32

Please sign in to comment.