Implement Kafka REST from scratch (#74)
* Add openapi generation and implement basic endpoint

* Implement remaining endpoints

* add api spec

* Add cluster api and integration tests

* fix null authorizedOperations

* Add metadata and related urls to response

* Introduce AdminClients to cache AdminClient by connection id and cluster id

* refactor

* broken wide

* wip: needs refactor

* works again

* use constant var

* Add tests for cluster API

* Support confluent local using internal kafka rest

* revert file changes

* make util class final

* Add docstrings

* fix

* Reduce AdminClient request timeout to 10 seconds and handle exception

* Use Caffeine to do time-based eviction on AdminClient instances

* remove unused import

* remove TODO

* Put unrelated openapi spec changes back in place

* Add exception mapper for UnsupportedOperationException

* Set AdminClient timeout to 3 seconds

* Wait until kafka-rest has started up

* Hide internal kafka routes

* Incorporate PR feedback

* Fix graalvm/caffeine compilation

* revert openapi files

* remove newline

* Add exception mapper for kafka ApiException

* Set content-length only if transfer-encoding is not set

* Support camelCase includeAuthorizedOperations query param in list topics

* remove unused params

* revert processProxyResponse changes

* fix wording

* Address reviewer feedback

rohitsanj authored Oct 11, 2024
1 parent 777f155 commit b239bbf
Showing 36 changed files with 6,091 additions and 373 deletions.
4,545 changes: 4,545 additions & 0 deletions kafka-rest.openapi.yaml

Large diffs are not rendered by default.

56 changes: 56 additions & 0 deletions pom.xml
@@ -789,6 +789,62 @@
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.openapitools</groupId>
<artifactId>openapi-generator-maven-plugin</artifactId>
<version>7.8.0</version>
<executions>
<execution>
<id>kafka-rest-spec</id>
<phase>generate-sources</phase>
<goals>
<goal>generate</goal>
</goals>
<configuration>
<inputSpec>kafka-rest.openapi.yaml</inputSpec>
<generatorName>jaxrs-spec</generatorName>
<generateSupportingFiles>false</generateSupportingFiles>
<addCompileSourceRoot>true</addCompileSourceRoot>
<apisToGenerate>ClusterV3,TopicV3,PartitionV3</apisToGenerate>
<configOptions>
<interfaceOnly>true</interfaceOnly>
<generatePom>false</generatePom>
<library>quarkus</library>
<useMicroProfileOpenAPIAnnotations>false</useMicroProfileOpenAPIAnnotations>
<apiPackage>io.confluent.idesidecar.restapi.kafkarest.api</apiPackage>
<modelPackage>io.confluent.idesidecar.restapi.kafkarest.model</modelPackage>
<generateBuilders>true</generateBuilders>
<useMutiny>true</useMutiny>
<supportAsync>true</supportAsync>
<useJakartaEe>true</useJakartaEe>
<useSwaggerAnnotations>false</useSwaggerAnnotations>
<openApiNullable>false</openApiNullable>
<useTags>true</useTags>
</configOptions>
<enablePostProcessFile>true</enablePostProcessFile>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<version>3.2.0</version>
<executions>
<execution>
<id>add-source</id>
<phase>generate-sources</phase>
<goals>
<goal>add-source</goal>
</goals>
<configuration>
<sources>
<source>${project.build.directory}/generated-sources/openapi/src/gen/java</source>
</sources>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>

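For reference, the `jaxrs-spec` generator with `library=quarkus`, `interfaceOnly=true`, and `useMutiny=true` emits reactive JAX-RS interfaces that the sidecar then implements. A rough, hypothetical sketch of the generated shape (actual signatures are driven by `kafka-rest.openapi.yaml`):

```java
package io.confluent.idesidecar.restapi.kafkarest.api;

import io.smallrye.mutiny.Uni;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.PathParam;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.Response;

// Hypothetical sketch of generated output: with useMutiny=true, operations
// return Uni<...>; with interfaceOnly=true, a hand-written resource class
// provides the implementation.
@Path("/v3/clusters/{cluster_id}")
public interface ClusterV3Api {

  @GET
  @Produces("application/json")
  Uni<Response> getKafkaCluster(@PathParam("cluster_id") String clusterId);
}
```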
53 changes: 53 additions & 0 deletions src/main/java/io/confluent/idesidecar/restapi/cache/AdminClients.java
@@ -0,0 +1,53 @@
package io.confluent.idesidecar.restapi.cache;

import com.github.benmanes.caffeine.cache.CaffeineSpec;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.eclipse.microprofile.config.inject.ConfigProperty;

/**
* Create an ApplicationScoped bean to cache AdminClient instances by connection ID and Kafka cluster ID.
*/
@ApplicationScoped
public class AdminClients extends Clients<AdminClient> {

// Evict cached AdminClient instances after 5 minutes of inactivity
private static final String CAFFEINE_SPEC = "expireAfterAccess=5m";

@Inject
ClusterCache clusterCache;

@ConfigProperty(name = "ide-sidecar.admin-client-configs")
Map<String, String> adminClientSidecarConfigs;

public AdminClients() {
super(CaffeineSpec.parse(CAFFEINE_SPEC));
}

/**
* Get an AdminClient for the given connection ID and Kafka cluster ID.
* If the client does not already exist, it will be created.
* @param connectionId The connection ID
* @param clusterId The cluster ID
* @return The AdminClient
*/
public AdminClient getClient(String connectionId, String clusterId) {
return getClient(
connectionId,
clusterId,
() -> AdminClient.create(getAdminClientConfig(connectionId, clusterId))
);
}

private Properties getAdminClientConfig(String connectionId, String clusterId) {
var cluster = clusterCache.getKafkaCluster(connectionId, clusterId);
var props = new Properties();
// Set AdminClient configs provided by the sidecar
props.putAll(adminClientSidecarConfigs);
props.put("bootstrap.servers", cluster.bootstrapServers());
return props;
}
}
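Callers can then pull a cached client and use the regular Kafka Admin API. A minimal, hypothetical sketch (the surrounding class and method names are illustrative, not part of this PR):

```java
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import java.util.Set;
import java.util.concurrent.ExecutionException;

@ApplicationScoped
public class TopicNameLookup {

  @Inject
  AdminClients adminClients;

  // Repeated calls for the same (connectionId, clusterId) pair reuse the
  // cached AdminClient until Caffeine evicts it after 5 minutes of inactivity.
  public Set<String> topicNames(String connectionId, String clusterId)
      throws ExecutionException, InterruptedException {
    return adminClients
        .getClient(connectionId, clusterId)
        .listTopics()
        .names()
        .get();
  }
}
```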
140 changes: 140 additions & 0 deletions src/main/java/io/confluent/idesidecar/restapi/cache/Clients.java
@@ -0,0 +1,140 @@
package io.confluent.idesidecar.restapi.cache;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.CaffeineSpec;
import com.github.benmanes.caffeine.cache.RemovalCause;
import io.confluent.idesidecar.restapi.connections.ConnectionState;
import io.confluent.idesidecar.restapi.events.Lifecycle;
import io.quarkus.logging.Log;
import jakarta.enterprise.event.ObservesAsync;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

/**
* Utilities to obtain and cache clients for a given connection and client ID.
*/
public abstract class Clients<T extends AutoCloseable> {

/**
* Caffeine spec to use for the client cache. By default, this will be an empty spec.
* See the
* <a href="https://github.com/ben-manes/caffeine/wiki/Specification">Caffeine Spec</a>
* for more information on the format. Subclasses may pass a different spec
* to the constructor.
*/
private final CaffeineSpec caffeineSpec;

/**
* Store an instance of a Caffeine cache for each connection. The cache will store
* clients by client ID and its policy may be configured by the {@link #caffeineSpec}.
*/
private final Map<String, Cache<String, T>> clientsByIdByConnections =
new ConcurrentHashMap<>();

protected Clients(CaffeineSpec caffeineSpec) {
this.caffeineSpec = caffeineSpec;
}

protected Clients() {
this(CaffeineSpec.parse(""));
}

/**
* Get a client for the given connection and client ID. If the client does not
* already exist, it will be created using the provided factory.
*
* @param connectionId the ID of the connection
* @param clientId the identifier of the client
* @param factory the method that will create the client if there is not already one
* @return the client
*/
public T getClient(
String connectionId,
String clientId,
Supplier<T> factory
) {
return clientsForConnection(connectionId).asMap().computeIfAbsent(
clientId,
k -> factory.get()
);
}

private Cache<String, T> clientsForConnection(String connectionId) {
return clientsByIdByConnections.computeIfAbsent(connectionId, k -> createCache());
}

int clientCount() {
return clientsByIdByConnections
.values()
.stream()
.map(Cache::asMap)
.map(Map::size)
.mapToInt(Integer::intValue)
.sum();
}

int clientCount(String connectionId) {
return clientsForConnection(connectionId).asMap().size();
}

void clearClients(String connectionId) {
var oldCache = clientsByIdByConnections.put(connectionId, createCache());
if (oldCache != null) {
// Invalidation will trigger the removal listener, which will close the clients
oldCache.invalidateAll();
}
}

void handleRemoval(String key, T value, RemovalCause cause) {
try {
if (value != null) {
Log.debugf("Closing client %s", value);
value.close();
}
} catch (Throwable t) {
Log.debugf("Error closing client %s: %s", value, t);
// Swallow close failures; the client is being evicted anyway
}
}

private Cache<String, T> createCache() {
return Caffeine
.from(caffeineSpec)
.removalListener(this::handleRemoval)
.build();
}

/**
* Respond to the connection being disconnected by clearing and closing the
* clients that were cached for that connection.
*
* @param connection the connection that was disconnected
*/
void onConnectionDisconnected(
@ObservesAsync @Lifecycle.Disconnected ConnectionState connection
) {
clearClients(connection.getId());
}

/**
* Respond to the connection being deleted by clearing and closing the
* clients that were cached for that connection.
*
* @param connection the connection that was deleted
*/
void onConnectionDeleted(@ObservesAsync @Lifecycle.Deleted ConnectionState connection) {
clearClients(connection.getId());
}

/**
* Respond to the connection being updated by clearing and closing the
* clients that were cached for that connection. This ensures that the clients
* don't use stale connection information.
* @param connection the connection that was updated
*/
void onConnectionUpdated(@ObservesAsync @Lifecycle.Updated ConnectionState connection) {
clearClients(connection.getId());
}
}
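Because both the cache policy and the client type are parameters, new client caches are one-liners. A hypothetical subclass sketch (the producer type and spec values are illustrative, not part of this PR):

```java
import com.github.benmanes.caffeine.cache.CaffeineSpec;
import org.apache.kafka.clients.producer.KafkaProducer;

// Hypothetical subclass: caps each per-connection cache at 50 producers and
// evicts any producer idle for 10 minutes; handleRemoval() then closes it.
public class ProducerClients extends Clients<KafkaProducer<byte[], byte[]>> {

  public ProducerClients() {
    super(CaffeineSpec.parse("maximumSize=50,expireAfterAccess=10m"));
  }
}
```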
src/main/java/io/confluent/idesidecar/restapi/cache/ClusterCache.java
@@ -25,6 +25,7 @@
import java.time.Duration;
import java.util.Deque;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
@@ -91,6 +92,22 @@ public KafkaCluster getKafkaCluster(String connectionId, String clusterId) {
return forConnection(connectionId).getKafkaCluster(clusterId);
}

/**
* Find the first Kafka cluster accessible over the specified connection. This is useful when
* it is known that there is only one Kafka cluster per connection.
* @param connectionId the ID of the connection
* @return an Optional with the first Kafka cluster, or empty if none was found
*/
public Optional<KafkaCluster> getKafkaClusterForConnection(String connectionId) {
return forConnection(connectionId)
.kafkaClusters
.values()
.stream()
.findFirst()
.map(ClusterInfo::spec);
}
/**
* Find the cluster info for the schema registry that is associated with the given Kafka cluster,
* accessible over the specified connection.
13 changes: 13 additions & 0 deletions src/main/java/io/confluent/idesidecar/restapi/cache/SchemaRegistryClients.java
@@ -0,0 +1,13 @@
package io.confluent.idesidecar.restapi.cache;

import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import jakarta.enterprise.context.ApplicationScoped;

/**
* Create an ApplicationScoped bean to cache SchemaRegistryClient instances
* by connection ID and schema registry client ID.
*/
@ApplicationScoped
public class SchemaRegistryClients extends Clients<SchemaRegistryClient> {

}
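Since `SchemaRegistryClients` adds no configuration of its own, callers pass the factory at lookup time. A hypothetical usage sketch (the helper class, IDs, and URL are illustrative):

```java
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;

public final class SchemaRegistryLookup {

  // Hypothetical helper: connectionId, srClusterId, and baseUrl would come
  // from the request and the cluster cache in real code.
  public static SchemaRegistryClient clientFor(
      SchemaRegistryClients clients,
      String connectionId,
      String srClusterId,
      String baseUrl) {
    return clients.getClient(
        connectionId,
        srClusterId,
        // 10 is the capacity of the client's identity map of parsed schemas
        () -> new CachedSchemaRegistryClient(baseUrl, 10));
  }
}
```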
@@ -11,6 +11,10 @@
import jakarta.ws.rs.core.MediaType;
import jakarta.ws.rs.core.Response;
import jakarta.ws.rs.core.Response.Status;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.jboss.resteasy.reactive.server.ServerExceptionMapper;

/**
@@ -180,4 +184,83 @@ public Response mapInvalidInputException(InvalidInputException exception) {
.header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON)
.build();
}
@ServerExceptionMapper
public Response mapUnknownTopicException(UnknownTopicOrPartitionException exception) {
var error = io.confluent.idesidecar.restapi.kafkarest.model.Error
.builder()
.errorCode(Status.NOT_FOUND.getStatusCode())
.message(exception.getMessage()).build();
return Response
.status(Status.NOT_FOUND)
.entity(error)
.header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON)
.build();
}

@ServerExceptionMapper
public Response mapTopicAlreadyExistsException(TopicExistsException exception) {
var error = io.confluent.idesidecar.restapi.kafkarest.model.Error
.builder()
.errorCode(Status.CONFLICT.getStatusCode())
.message(exception.getMessage()).build();
return Response
.status(Status.CONFLICT)
.entity(error)
.header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON)
.build();
}

@ServerExceptionMapper
public Response mapClusterNotFoundException(ClusterNotFoundException exception) {
var error = io.confluent.idesidecar.restapi.kafkarest.model.Error
.builder()
.errorCode(Status.NOT_FOUND.getStatusCode())
.message(exception.getMessage()).build();
return Response
.status(Status.NOT_FOUND)
.entity(error)
.header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON)
.build();
}

@ServerExceptionMapper
public Response mapKafkaTimeoutException(TimeoutException exception) {
var error = io.confluent.idesidecar.restapi.kafkarest.model.Error
.builder()
.errorCode(Status.REQUEST_TIMEOUT.getStatusCode())
.message(exception.getMessage()).build();
return Response
.status(Status.REQUEST_TIMEOUT)
.entity(error)
.header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON)
.build();
}

@ServerExceptionMapper
public Response mapUnsupportedException(UnsupportedOperationException exception) {
var error = io.confluent.idesidecar.restapi.kafkarest.model.Error
.builder()
.errorCode(Status.NOT_IMPLEMENTED.getStatusCode())
.message(exception.getMessage()).build();
return Response
.status(Status.NOT_IMPLEMENTED)
.entity(error)
.header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON)
.build();
}

@ServerExceptionMapper
public Response mapApiException(ApiException exception) {
var error = io.confluent.idesidecar.restapi.kafkarest.model.Error
.builder()
.errorCode(Status.INTERNAL_SERVER_ERROR.getStatusCode())
.message(exception.getMessage()).build();
return Response
.status(Status.INTERNAL_SERVER_ERROR)
.entity(error)
.header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON)
.build();
}
}
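With these mappers registered, Kafka client exceptions surface as structured JSON errors instead of generic 500s. A hypothetical RestAssured-style test sketch (the route and the snake_case field name are assumptions, not from this PR):

```java
import static io.restassured.RestAssured.given;
import static org.hamcrest.Matchers.equalTo;

import org.junit.jupiter.api.Test;

class ErrorMappingTest {

  @Test
  void unknownTopicIsMappedTo404() {
    // Hypothetical path; an UnknownTopicOrPartitionException raised while
    // describing the topic should be translated into a 404 Error payload.
    given()
        .when()
        .get("/v3/clusters/my-cluster/topics/does-not-exist")
        .then()
        .statusCode(404)
        .body("error_code", equalTo(404)); // assumes snake_case serialization
  }
}
```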