Implement Kafka REST from scratch #74

Merged: 41 commits, merged on Oct 11, 2024
Commits (41)
f492480
Add openapi generation and implement basic endpoint
rohitsanj Oct 3, 2024
a82e374
Implement remaining endpoints
rohitsanj Oct 3, 2024
6699962
add api spec
rohitsanj Oct 3, 2024
64dc693
Add cluster api and integration tests
rohitsanj Oct 3, 2024
caf5f85
fix null authorizedOperations
rohitsanj Oct 3, 2024
3e80734
Add metadata and related urls to response
rohitsanj Oct 3, 2024
2c8a5ef
Introduce AdminClients to cache AdminClient by connection id and clus…
rohitsanj Oct 7, 2024
7440295
refactor
rohitsanj Oct 7, 2024
4d51839
broken wide
rohitsanj Oct 8, 2024
a89a351
wip: needs refactor
rohitsanj Oct 8, 2024
72c5cb9
works again
rohitsanj Oct 8, 2024
27b82b7
use constant var
rohitsanj Oct 8, 2024
8130b99
Add tests for cluster API
rohitsanj Oct 8, 2024
68dec01
Support confluent local using internal kafka rest
rohitsanj Oct 8, 2024
b54b8ee
revert file changes
rohitsanj Oct 8, 2024
e73bfee
make util cass final
rohitsanj Oct 8, 2024
525e6d3
Add docstrings
rohitsanj Oct 8, 2024
ac2d8cc
fix
rohitsanj Oct 8, 2024
2f58138
Reduce AdminClient request timeout to 10 seconds and handle exception
rohitsanj Oct 8, 2024
a7dce67
Use Caffeine to do time-based eviction on AdminClient instances
rohitsanj Oct 8, 2024
2daf45d
remove unused import
rohitsanj Oct 8, 2024
b76bbef
remove TODO
rohitsanj Oct 8, 2024
539b6e9
Put unrelated openapi spec changes back in place
rohitsanj Oct 8, 2024
5a3434e
Add exception mapper for UnsupportedOperationException
rohitsanj Oct 8, 2024
18161f1
Set AdminClient timeout to 3 seconds
rohitsanj Oct 8, 2024
b6134d9
Implement Kafka REST from scratch
rohitsanj Oct 8, 2024
f7f7940
Merge branch 'rohitsanj/implement-kafka-rest' of github.com:confluent…
rohitsanj Oct 8, 2024
7c04ed8
Wait until kafka-rest has started up
rohitsanj Oct 8, 2024
8c1d716
Hide internal kafka routes
rohitsanj Oct 9, 2024
d03de7a
Incorporate PR feedback
rohitsanj Oct 9, 2024
f1b70b4
Fix graalvm/caffeine compilation
rohitsanj Oct 9, 2024
0c7ecdd
revert openapi files
rohitsanj Oct 9, 2024
ef08c12
remove newline
rohitsanj Oct 10, 2024
dfd7f74
Merge branch 'main' into rohitsanj/implement-kafka-rest
rohitsanj Oct 10, 2024
96fa116
Add exception mapper for kafka ApiException
rohitsanj Oct 10, 2024
1dc5bc1
Set content-length only if transfer-encoding is not set
rohitsanj Oct 10, 2024
d3eb11f
Support camelCase includeAuthorizedOperations query param in list topics
rohitsanj Oct 10, 2024
c773b66
remove unused params
rohitsanj Oct 10, 2024
05fb843
revert processProxyResponse changes
rohitsanj Oct 10, 2024
26928af
fix wording
rohitsanj Oct 10, 2024
f2a04cc
Address reviewer feedback
rohitsanj Oct 10, 2024
4,545 changes: 4,545 additions & 0 deletions kafka-rest.openapi.yaml

Large diffs are not rendered by default.

56 changes: 56 additions & 0 deletions pom.xml
@@ -789,6 +789,62 @@
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.openapitools</groupId>
<artifactId>openapi-generator-maven-plugin</artifactId>
<version>7.8.0</version>
<executions>
<execution>
<id>kafka-rest-spec</id>
<phase>generate-sources</phase>
<goals>
<goal>generate</goal>
</goals>
<configuration>
<inputSpec>kafka-rest.openapi.yaml</inputSpec>
<generatorName>jaxrs-spec</generatorName>
<generateSupportingFiles>false</generateSupportingFiles>
<addCompileSourceRoot>true</addCompileSourceRoot>
<apisToGenerate>ClusterV3,TopicV3,PartitionV3</apisToGenerate>
<configOptions>
<interfaceOnly>true</interfaceOnly>
<generatePom>false</generatePom>
<library>quarkus</library>
<useMicroProfileOpenAPIAnnotations>false</useMicroProfileOpenAPIAnnotations>
<apiPackage>io.confluent.idesidecar.restapi.kafkarest.api</apiPackage>
<modelPackage>io.confluent.idesidecar.restapi.kafkarest.model</modelPackage>
<generateBuilders>true</generateBuilders>
<useMutiny>true</useMutiny>
<supportAsync>true</supportAsync>
<useJakartaEe>true</useJakartaEe>
<useSwaggerAnnotations>false</useSwaggerAnnotations>
<openApiNullable>false</openApiNullable>
<useTags>true</useTags>
</configOptions>
<enablePostProcessFile>true</enablePostProcessFile>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<version>3.2.0</version>
<executions>
<execution>
<id>add-source</id>
<phase>generate-sources</phase>
<goals>
<goal>add-source</goal>
</goals>
<configuration>
<sources>
<source>${project.build.directory}/generated-sources/openapi/src/gen/java</source>
</sources>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
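
Note on the generator configuration above: with interfaceOnly, library=quarkus, and useMutiny, the plugin emits JAX-RS interfaces under io.confluent.idesidecar.restapi.kafkarest.api for the sidecar to implement. A rough sketch of the shape of one such interface (the path, method, and model names below are illustrative assumptions, not actual generated output):

package io.confluent.idesidecar.restapi.kafkarest.api;

import io.confluent.idesidecar.restapi.kafkarest.model.TopicData;
import io.smallrye.mutiny.Uni;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.PathParam;
import jakarta.ws.rs.Produces;

// Illustrative sketch only: the real interfaces are generated from kafka-rest.openapi.yaml.
@Path("/v3/clusters/{cluster_id}/topics")
public interface TopicV3Api {

  @GET
  @Path("/{topic_name}")
  @Produces("application/json")
  Uni<TopicData> getKafkaTopic(
      @PathParam("cluster_id") String clusterId,
      @PathParam("topic_name") String topicName
  );
}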

53 changes: 53 additions & 0 deletions src/main/java/io/confluent/idesidecar/restapi/cache/AdminClients.java
@@ -0,0 +1,53 @@
package io.confluent.idesidecar.restapi.cache;

import com.github.benmanes.caffeine.cache.CaffeineSpec;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.eclipse.microprofile.config.inject.ConfigProperty;

/**
* Create an ApplicationScoped bean to cache AdminClient instances by connection ID and Kafka cluster ID.
*/
@ApplicationScoped
public class AdminClients extends Clients<AdminClient> {

// Evict cached AdminClient instances after 5 minutes of inactivity
private static final String CAFFEINE_SPEC = "expireAfterAccess=5m";

@Inject
ClusterCache clusterCache;

@ConfigProperty(name = "ide-sidecar.admin-client-configs")
Map<String, String> adminClientSidecarConfigs;

public AdminClients() {
super(CaffeineSpec.parse(CAFFEINE_SPEC));
}

/**
* Get an AdminClient for the given connection ID and Kafka cluster ID.
* If the client does not already exist, it will be created.
* @param connectionId The connection ID
* @param clusterId The cluster ID
* @return The AdminClient
*/
public AdminClient getClient(String connectionId, String clusterId) {
return getClient(
connectionId,
clusterId,
() -> AdminClient.create(getAdminClientConfig(connectionId, clusterId))
);
}

private Properties getAdminClientConfig(String connectionId, String clusterId) {
var cluster = clusterCache.getKafkaCluster(connectionId, clusterId);
var props = new Properties();
// Set AdminClient configs provided by the sidecar
props.putAll(adminClientSidecarConfigs);
props.put("bootstrap.servers", cluster.bootstrapServers());
return props;
}
}
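
A minimal sketch of how a caller might use AdminClients; the service class, package, and method names are assumptions for illustration, not code from this PR.

package io.confluent.idesidecar.restapi.kafkarest;

import io.confluent.idesidecar.restapi.cache.AdminClients;
import io.smallrye.mutiny.Uni;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import java.util.Set;

// Hypothetical caller, for illustration only.
@ApplicationScoped
public class TopicNameService {

  @Inject
  AdminClients adminClients;

  // Reuses (or lazily creates) the cached AdminClient for this connection/cluster pair.
  public Uni<Set<String>> listTopicNames(String connectionId, String clusterId) {
    var adminClient = adminClients.getClient(connectionId, clusterId);
    return Uni.createFrom()
        .completionStage(adminClient.listTopics().names().toCompletionStage());
  }
}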
140 changes: 140 additions & 0 deletions src/main/java/io/confluent/idesidecar/restapi/cache/Clients.java
@@ -0,0 +1,140 @@
package io.confluent.idesidecar.restapi.cache;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.CaffeineSpec;
import com.github.benmanes.caffeine.cache.RemovalCause;
import io.confluent.idesidecar.restapi.connections.ConnectionState;
import io.confluent.idesidecar.restapi.events.Lifecycle;
import io.quarkus.logging.Log;
import jakarta.enterprise.event.ObservesAsync;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

/**
* Utilities to obtain and cache clients for a given connection and client ID.
*/
public abstract class Clients<T extends AutoCloseable> {

/**
* Caffeine spec to use for the client cache. By default, this will be an empty spec.
* See the
* <a href="https://github.com/ben-manes/caffeine/wiki/Specification">Caffeine Spec</a>
* for more information on the format. Subclasses may provide a different spec
* through the constructor.
*/
private final CaffeineSpec caffeineSpec;

/**
* Store an instance of a Caffeine cache for each connection. The cache will store
* clients by client ID and its policy may be configured by the {@link #caffeineSpec}.
*/
private final Map<String, Cache<String, T>> clientsByIdByConnections =
new ConcurrentHashMap<>();

protected Clients(CaffeineSpec caffeineSpec) {
this.caffeineSpec = caffeineSpec;
}

protected Clients() {
this(CaffeineSpec.parse(""));
}

/**
* Get a client for the given connection and client ID. If the client does not
* already exist, it will be created using the provided factory.
*
* @param connectionId the ID of the connection
* @param clientId the identifier of the client
* @param factory the method that will create the client if there is not already one
* @return the client
*/
public T getClient(
String connectionId,
String clientId,
Supplier<T> factory
) {
return clientsForConnection(connectionId).asMap().computeIfAbsent(
clientId,
k -> factory.get()
);
}

private Cache<String, T> clientsForConnection(String connectionId) {
return clientsByIdByConnections.computeIfAbsent(connectionId, k -> createCache());
}

int clientCount() {
return clientsByIdByConnections
.values()
.stream()
.map(Cache::asMap)
.map(Map::size)
.mapToInt(Integer::intValue)
.sum();
}

int clientCount(String connectionId) {
return clientsForConnection(connectionId).asMap().size();
}

void clearClients(String connectionId) {
var oldCache = clientsByIdByConnections.put(connectionId, createCache());
if (oldCache != null) {
// Invalidation will trigger the removal listener, which will close the clients
oldCache.invalidateAll();
}
}

void handleRemoval(String key, T value, RemovalCause cause) {
try {
if (value != null) {
Log.debugf("Closing client %s", value);
value.close();
}
} catch (Throwable t) {
Log.debugf("Error closing client %s: %s", value, t);
// Ignore these as we don't care
}
}

private Cache<String, T> createCache() {
return Caffeine
.from(caffeineSpec)
.removalListener(this::handleRemoval)
.build();
}

/**
* Respond to the connection being disconnected by clearing and closing the
* clients that were cached for that connection.
*
* @param connection the connection that was disconnected
*/
void onConnectionDisconnected(
@ObservesAsync @Lifecycle.Disconnected ConnectionState connection
) {
clearClients(connection.getId());
}

/**
* Respond to the connection being deleted by clearing and closing the
* clients that were cached for that connection.
*
* @param connection the connection that was deleted
*/
void onConnectionDeleted(@ObservesAsync @Lifecycle.Deleted ConnectionState connection) {
clearClients(connection.getId());
}

/**
* Respond to the connection being updated by clearing and closing the
* clients that were cached for that connection. This ensures that the clients
* don't use stale connection information.
* @param connection the connection that was updated
*/
void onConnectionUpdated(@ObservesAsync @Lifecycle.Updated ConnectionState connection) {
clearClients(connection.getId());
}
}
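
A standalone sketch, assuming only Caffeine itself, of the eviction behaviour this class relies on: entries removed under the spec's policy, or by invalidateAll(), flow through the removal listener, which is where cached clients are closed.

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.CaffeineSpec;
import com.github.benmanes.caffeine.cache.RemovalCause;

public class CaffeineEvictionSketch {
  public static void main(String[] args) {
    Cache<String, AutoCloseable> cache = Caffeine
        .from(CaffeineSpec.parse("expireAfterAccess=5m"))
        .removalListener((String key, AutoCloseable value, RemovalCause cause) -> {
          // Mirrors Clients#handleRemoval: close the evicted client, ignore failures.
          try {
            if (value != null) {
              value.close();
            }
          } catch (Exception e) {
            // Ignore; there is nothing useful to do on close failure.
          }
        })
        .build();

    cache.put("client-1", () -> System.out.println("closed client-1"));
    // Removal listeners run asynchronously by default, so the close happens
    // shortly after invalidation rather than inline.
    cache.invalidateAll();
  }
}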
@@ -25,6 +25,7 @@
import java.time.Duration;
import java.util.Deque;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
@@ -91,6 +92,22 @@ public KafkaCluster getKafkaCluster(String connectionId, String clusterId) {
return forConnection(connectionId).getKafkaCluster(clusterId);
}

/**
* Find the first Kafka cluster accessible over the specified connection. This is useful when
* it is known that there is only one Kafka cluster per connection.
* @param connectionId the ID of the connection
* @return the info for the first Kafka cluster, or an empty Optional if none found
*/
public Optional<KafkaCluster> getKafkaClusterForConnection(String connectionId) {
return forConnection(connectionId)
.kafkaClusters
.values()
.stream()
.findFirst()
.map(ClusterInfo::spec);
}


/**
* Find the cluster info for the schema registry that is associated with the given Kafka cluster,
* accessible over the specified connection.
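An illustrative caller for the new single-cluster lookup (variable names are assumptions, not code from this PR):

// Useful when a connection, e.g. confluent local, exposes exactly one Kafka cluster.
var first = clusterCache.getKafkaClusterForConnection(connectionId);
first.ifPresent(cluster ->
    Log.debugf("Using bootstrap servers %s for connection %s",
        cluster.bootstrapServers(), connectionId));
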
13 changes: 13 additions & 0 deletions src/main/java/io/confluent/idesidecar/restapi/cache/SchemaRegistryClients.java
@@ -0,0 +1,13 @@
package io.confluent.idesidecar.restapi.cache;

import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import jakarta.enterprise.context.ApplicationScoped;

/**
* Create an ApplicationScoped bean to cache SchemaRegistryClient instances
* by connection ID and schema registry client ID.
*/
@ApplicationScoped
public class SchemaRegistryClients extends Clients<SchemaRegistryClient> {

}
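
Because SchemaRegistryClients adds no convenience overload the way AdminClients does, callers supply the client factory themselves. A hedged sketch of such a call site (the injected field, URL, and cache size are illustrative):

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;

// schemaRegistryClients is assumed to be an injected SchemaRegistryClients bean.
SchemaRegistryClient client = schemaRegistryClients.getClient(
    connectionId,
    schemaRegistryClusterId,
    () -> new CachedSchemaRegistryClient("http://localhost:8081", 10)
);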
@@ -11,6 +11,10 @@
import jakarta.ws.rs.core.MediaType;
import jakarta.ws.rs.core.Response;
import jakarta.ws.rs.core.Response.Status;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.jboss.resteasy.reactive.server.ServerExceptionMapper;

/**
@@ -180,4 +184,83 @@ public Response mapInvalidInputException(InvalidInputException exception) {
.header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON)
.build();
}


@ServerExceptionMapper
public Response mapUnknownTopicException(UnknownTopicOrPartitionException exception) {
var error = io.confluent.idesidecar.restapi.kafkarest.model.Error
.builder()
.errorCode(Status.NOT_FOUND.getStatusCode())
.message(exception.getMessage()).build();
return Response
.status(Status.NOT_FOUND)
.entity(error)
.header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON)
.build();
}

@ServerExceptionMapper
public Response mapTopicAlreadyExistsException(TopicExistsException exception) {
var error = io.confluent.idesidecar.restapi.kafkarest.model.Error
.builder()
.errorCode(Status.CONFLICT.getStatusCode())
.message(exception.getMessage()).build();
return Response
.status(Status.CONFLICT)
.entity(error)
.header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON)
.build();
}

@ServerExceptionMapper
public Response mapClusterNotFoundException(ClusterNotFoundException exception) {
var error = io.confluent.idesidecar.restapi.kafkarest.model.Error
.builder()
.errorCode(Status.NOT_FOUND.getStatusCode())
.message(exception.getMessage()).build();
return Response
.status(Status.NOT_FOUND)
.entity(error)
.header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON)
.build();
}

@ServerExceptionMapper
public Response mapKafkaTimeoutException(TimeoutException exception) {
var error = io.confluent.idesidecar.restapi.kafkarest.model.Error
.builder()
.errorCode(Status.REQUEST_TIMEOUT.getStatusCode())
.message(exception.getMessage()).build();
return Response
.status(Status.REQUEST_TIMEOUT)
.entity(error)
.header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON)
.build();
}

@ServerExceptionMapper
public Response mapUnsupportedException(UnsupportedOperationException exception) {
var error = io.confluent.idesidecar.restapi.kafkarest.model.Error
.builder()
.errorCode(Status.NOT_IMPLEMENTED.getStatusCode())
.message(exception.getMessage()).build();
return Response
.status(Status.NOT_IMPLEMENTED)
.entity(error)
.header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON)
.build();
}

@ServerExceptionMapper
public Response mapApiException(ApiException exception) {
var error = io.confluent.idesidecar.restapi.kafkarest.model.Error
.builder()
.errorCode(Status.INTERNAL_SERVER_ERROR.getStatusCode())
.message(exception.getMessage()).build();
return Response
.status(Status.INTERNAL_SERVER_ERROR)
.entity(error)
.header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON)
.build();
}
}
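
The mappers above all build the same error shape; one possible follow-up, sketched here rather than taken from this PR, is to have each @ServerExceptionMapper delegate to a shared helper:

// Sketch of a shared helper the individual mappers could delegate to,
// using only the builder and JAX-RS types already present in this class.
private Response errorResponse(Status status, Throwable exception) {
  var error = io.confluent.idesidecar.restapi.kafkarest.model.Error
      .builder()
      .errorCode(status.getStatusCode())
      .message(exception.getMessage())
      .build();
  return Response
      .status(status)
      .entity(error)
      .header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON)
      .build();
}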