diff --git a/pom.xml b/pom.xml index 01ba12d..7fa85b4 100644 --- a/pom.xml +++ b/pom.xml @@ -49,6 +49,7 @@ 1.7.0 4.8.6.2 2.43.0 + 0.3.0 @@ -97,6 +98,11 @@ ${embedded-pulsar.version} test + + io.github.openfacade + http-facade + ${http-facade.version} + diff --git a/pulsar-admin-common/src/main/java/io/github/protocol/pulsar/admin/common/JacksonService.java b/pulsar-admin-common/src/main/java/io/github/protocol/pulsar/admin/common/JacksonService.java index d6830aa..9bd4bd1 100644 --- a/pulsar-admin-common/src/main/java/io/github/protocol/pulsar/admin/common/JacksonService.java +++ b/pulsar-admin-common/src/main/java/io/github/protocol/pulsar/admin/common/JacksonService.java @@ -8,6 +8,7 @@ import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.ObjectNode; +import java.io.IOException; import java.util.List; public class JacksonService { @@ -19,25 +20,29 @@ public class JacksonService { } public static String toJson(Object o) throws JsonProcessingException { - return MAPPER.writeValueAsString(o); + return o == null ? null : MAPPER.writeValueAsString(o); } - public static T toObject(String json, Class type) throws JsonProcessingException { - if (json == null || json.isEmpty()) { + public static byte[] toBytes(Object o) throws JsonProcessingException { + return o == null ? null : MAPPER.writeValueAsBytes(o); + } + + public static T toObject(byte[] json, Class type) throws IOException { + if (json == null || json.length == 0) { return null; } return MAPPER.readValue(json, type); } - public static T toRefer(String json, TypeReference ref) throws JsonProcessingException { - if (json == null || json.isEmpty()) { + public static T toRefer(byte[] json, TypeReference ref) throws IOException { + if (json == null || json.length == 0) { return null; } return MAPPER.readValue(json, ref); } - public static List toList(String json, TypeReference> typeRef) throws JsonProcessingException { - if (json == null || json.isEmpty()) { + public static List toList(byte[] json, TypeReference> typeRef) throws IOException { + if (json == null || json.length == 0) { return List.of(); } return MAPPER.readValue(json, typeRef); diff --git a/pulsar-admin-jdk/src/main/java/io/github/protocol/pulsar/admin/jdk/BaseTopicsImpl.java b/pulsar-admin-jdk/src/main/java/io/github/protocol/pulsar/admin/jdk/BaseTopicsImpl.java index f32da0c..534d1e8 100644 --- a/pulsar-admin-jdk/src/main/java/io/github/protocol/pulsar/admin/jdk/BaseTopicsImpl.java +++ b/pulsar-admin-jdk/src/main/java/io/github/protocol/pulsar/admin/jdk/BaseTopicsImpl.java @@ -1,13 +1,14 @@ package io.github.protocol.pulsar.admin.jdk; import com.fasterxml.jackson.core.type.TypeReference; +import io.github.openfacade.http.HttpResponse; import io.github.protocol.pulsar.admin.common.JacksonService; import java.io.IOException; -import java.net.http.HttpResponse; import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.concurrent.ExecutionException; public abstract class BaseTopicsImpl { @@ -23,14 +24,14 @@ public void createPartitionedTopic(String tenant, String namespace, String encod boolean createLocalTopicOnly) throws PulsarAdminException { String url = String.format("%s/%s/%s/%s%s", getDomainBaseUrl(), tenant, namespace, encodedTopic, "/partitions"); try { - HttpResponse response = httpClient.put(url, numPartitions, "createLocalTopicOnly", + HttpResponse response = httpClient.put(url, numPartitions, "createLocalTopicOnly", String.valueOf(createLocalTopicOnly)); if (response.statusCode() != 204) { throw new PulsarAdminException( String.format("failed to create partitioned topic %s/%s/%s, status code %s, body : %s", - tenant, namespace, encodedTopic, response.statusCode(), response.body())); + tenant, namespace, encodedTopic, response.statusCode(), response.bodyAsString())); } - } catch (IOException | InterruptedException e) { + } catch (IOException | InterruptedException | ExecutionException e) { throw new RuntimeException(e); } } @@ -39,14 +40,14 @@ public void deletePartitionedTopic(String tenant, String namespace, String encod boolean authoritative) throws PulsarAdminException { String url = String.format("%s/%s/%s/%s%s", getDomainBaseUrl(), tenant, namespace, encodedTopic, "/partitions"); try { - HttpResponse response = httpClient.delete(url, "force", String.valueOf(force), + HttpResponse response = httpClient.delete(url, "force", String.valueOf(force), "authoritative", String.valueOf(authoritative)); if (response.statusCode() != 204) { throw new PulsarAdminException( String.format("failed to delete partitioned topic %s/%s/%s, status code %s, body : %s", - tenant, namespace, encodedTopic, response.statusCode(), response.body())); + tenant, namespace, encodedTopic, response.statusCode(), response.bodyAsString())); } - } catch (IOException | InterruptedException e) { + } catch (IOException | InterruptedException | ExecutionException e) { throw new PulsarAdminException(e); } } @@ -56,16 +57,16 @@ public void updatePartitionedTopic(String tenant, String namespace, String encod boolean force, int numPartitions) throws PulsarAdminException { String url = String.format("%s/%s/%s/%s%s", getDomainBaseUrl(), tenant, namespace, encodedTopic, "/partitions"); try { - HttpResponse response = httpClient.post(url, numPartitions, "updateLocalTopicOnly", + HttpResponse response = httpClient.post(url, numPartitions, "updateLocalTopicOnly", String.valueOf(updateLocalTopicOnly), "authoritative", String.valueOf(authoritative), "force", String.valueOf(force)); if (response.statusCode() != 204) { throw new PulsarAdminException( String.format("failed to update partitioned topic %s/%s/%s, status code %s, body : %s", - tenant, namespace, encodedTopic, response.statusCode(), response.body())); + tenant, namespace, encodedTopic, response.statusCode(), response.bodyAsString())); } - } catch (IOException | InterruptedException e) { + } catch (IOException | InterruptedException | ExecutionException e) { throw new PulsarAdminException(e); } } @@ -76,7 +77,7 @@ public PartitionedTopicStats getPartitionedStats(String tenant, String namespace String url = String.format("%s/%s/%s/%s%s", getDomainBaseUrl(), tenant, namespace, encodedTopic, "/partitioned-stats"); try { - HttpResponse response = httpClient.get(url, "perPartition", + HttpResponse response = httpClient.get(url, "perPartition", Arrays.toString(new Object[] {perPartition}), "getPreciseBacklog", Arrays.toString(new Object[] {false}), "subscriptionBacklogSize", Arrays.toString(new Object[] {false}), @@ -84,10 +85,10 @@ public PartitionedTopicStats getPartitionedStats(String tenant, String namespace if (response.statusCode() != 200) { throw new PulsarAdminException( String.format("failed to get partitioned stats of topic %s/%s/%s, status code %s, body : %s", - tenant, namespace, encodedTopic, response.statusCode(), response.body())); + tenant, namespace, encodedTopic, response.statusCode(), response.bodyAsString())); } return JacksonService.toObject(response.body(), PartitionedTopicStats.class); - } catch (IOException | InterruptedException e) { + } catch (IOException | InterruptedException | ExecutionException e) { throw new PulsarAdminException(e); } } @@ -97,16 +98,16 @@ public PartitionedTopicMetadata getPartitionedMetadata(String tenant, String nam throws PulsarAdminException { String url = String.format("%s/%s/%s/%s%s", getDomainBaseUrl(), tenant, namespace, encodedTopic, "/partitions"); try { - HttpResponse response = httpClient.get(url, + HttpResponse response = httpClient.get(url, "checkAllowAutoCreation", String.valueOf(checkAllowAutoCreation), "authoritative", String.valueOf(authoritative)); if (response.statusCode() != 200) { throw new PulsarAdminException( String.format("failed to update partitioned topic %s/%s/%s, status code %s, body : %s", - tenant, namespace, encodedTopic, response.statusCode(), response.body())); + tenant, namespace, encodedTopic, response.statusCode(), response.bodyAsString())); } return JacksonService.toObject(response.body(), PartitionedTopicMetadata.class); - } catch (IOException | InterruptedException e) { + } catch (IOException | InterruptedException | ExecutionException e) { throw new PulsarAdminException(e); } } @@ -115,14 +116,14 @@ public void createNonPartitionedTopic(String tenant, String namespace, String en Map properties) throws PulsarAdminException { String url = String.format("%s/%s/%s/%s", getDomainBaseUrl(), tenant, namespace, encodedTopic); try { - HttpResponse response = httpClient.put(url, properties, + HttpResponse response = httpClient.put(url, properties, "authoritative", String.valueOf(authoritative)); if (response.statusCode() != 204) { throw new PulsarAdminException( String.format("failed to create non-partitioned topic %s/%s/%s, status code %s, body : %s", - tenant, namespace, encodedTopic, response.statusCode(), response.body())); + tenant, namespace, encodedTopic, response.statusCode(), response.bodyAsString())); } - } catch (IOException | InterruptedException e) { + } catch (IOException | InterruptedException | ExecutionException e) { throw new PulsarAdminException(e); } } @@ -131,24 +132,24 @@ public void deleteTopic(String tenant, String namespace, String encodedTopic, bo throws PulsarAdminException { String url = String.format("%s/%s/%s/%s", getDomainBaseUrl(), tenant, namespace, encodedTopic); try { - HttpResponse response = httpClient.delete(url, + HttpResponse response = httpClient.delete(url, "force", String.valueOf(force), "authoritative", String.valueOf(authoritative)); if (response.statusCode() != 204) { throw new PulsarAdminException( String.format("failed to delete non-partitioned topic %s/%s/%s, status code %s, body : %s", - tenant, namespace, encodedTopic, response.statusCode(), response.body())); + tenant, namespace, encodedTopic, response.statusCode(), response.bodyAsString())); } - } catch (IOException | InterruptedException e) { + } catch (IOException | InterruptedException | ExecutionException e) { throw new PulsarAdminException(e); } } - public List getList(String tenant, String namespace, String bundle, boolean includeSystemTopic) + public List getList(String tenant, String namespace, String bundle, boolean includeSystemTopic) throws PulsarAdminException { String url = String.format("%s/%s/%s", getDomainBaseUrl(), tenant, namespace); try { - HttpResponse response; + HttpResponse response; if (bundle != null) { response = httpClient.get(url, "bundle", bundle, @@ -161,30 +162,30 @@ public List getList(String tenant, String namespace, String bundle, bool throw new PulsarAdminException( String.format("failed to get list of non-partitioned-topics " + "under namespace %s/%s, status code %s, body : %s", - tenant, namespace, response.statusCode(), response.body())); + tenant, namespace, response.statusCode(), response.bodyAsString())); } - return JacksonService.toRefer(response.body(), new TypeReference>() { + return JacksonService.toRefer(response.body(), new TypeReference<>() { }); - } catch (IOException | InterruptedException e) { + } catch (IOException | InterruptedException | ExecutionException e) { throw new PulsarAdminException(e); } } - public List getPartitionedTopicList(String tenant, String namespace, boolean includeSystemTopic) + public List getPartitionedTopicList(String tenant, String namespace, boolean includeSystemTopic) throws PulsarAdminException { String url = String.format("%s/%s/%s%s", getDomainBaseUrl(), tenant, namespace, UrlConst.PARTITIONED); try { - HttpResponse response = httpClient.get(url, "includeSystemTopic", + HttpResponse response = httpClient.get(url, "includeSystemTopic", String.valueOf(includeSystemTopic)); if (response.statusCode() != 200) { throw new PulsarAdminException( String.format("failed to get list of partitioned-topics under namespace %s/%s, " + "status code %s, body : %s", - tenant, namespace, response.statusCode(), response.body())); + tenant, namespace, response.statusCode(), response.bodyAsString())); } - return JacksonService.toRefer(response.body(), new TypeReference>() { + return JacksonService.toRefer(response.body(), new TypeReference<>() { }); - } catch (IOException | InterruptedException e) { + } catch (IOException | InterruptedException | ExecutionException e) { throw new PulsarAdminException(e); } } @@ -194,14 +195,14 @@ public void createMissedPartitions(String tenant, String namespace, String encod String url = String.format("%s/%s/%s/%s%s", getDomainBaseUrl(), tenant, namespace, encodedTopic, UrlConst.CREATE_MISSED_PARTITIONS); try { - HttpResponse response = httpClient.post(url); + HttpResponse response = httpClient.post(url); if (response.statusCode() != 204) { throw new PulsarAdminException( String.format("failed to create missing partitions for topic %s/%s/%s, " + "status code %s, body : %s", - tenant, namespace, encodedTopic, response.statusCode(), response.body())); + tenant, namespace, encodedTopic, response.statusCode(), response.bodyAsString())); } - } catch (IOException | InterruptedException e) { + } catch (IOException | InterruptedException | ExecutionException e) { throw new PulsarAdminException(e); } } @@ -211,14 +212,14 @@ public MessageIdImpl getLastMessageId(String tenant, String namespace, String en String url = String.format("%s/%s/%s/%s%s", getDomainBaseUrl(), tenant, namespace, encodedTopic, UrlConst.LAST_MESSAGE_ID); try { - HttpResponse response = httpClient.get(url, "authoritative", String.valueOf(authoritative)); + HttpResponse response = httpClient.get(url, "authoritative", String.valueOf(authoritative)); if (response.statusCode() != 200) { throw new PulsarAdminException( String.format("failed to get last message id of topic %s/%s/%s, status code %s, body : %s", - tenant, namespace, encodedTopic, response.statusCode(), response.body())); + tenant, namespace, encodedTopic, response.statusCode(), response.bodyAsString())); } return JacksonService.toObject(response.body(), MessageIdImpl.class); - } catch (IOException | InterruptedException e) { + } catch (IOException | InterruptedException | ExecutionException e) { throw new PulsarAdminException(e); } } @@ -228,17 +229,17 @@ public RetentionPolicies getRetention(String tenant, String namespace, String en String url = String.format("%s/%s/%s/%s%s", getDomainBaseUrl(), tenant, namespace, encodedTopic, UrlConst.RETENTION); try { - HttpResponse response = httpClient.get(url, + HttpResponse response = httpClient.get(url, "isGlobal", String.valueOf(isGlobal), "applied", String.valueOf(applied), "authoritative", String.valueOf(authoritative)); if (response.statusCode() != 200) { throw new PulsarAdminException( String.format("failed to get retention of topic %s/%s/%s, status code %s, body : %s", - tenant, namespace, encodedTopic, response.statusCode(), response.body())); + tenant, namespace, encodedTopic, response.statusCode(), response.bodyAsString())); } return JacksonService.toObject(response.body(), RetentionPolicies.class); - } catch (IOException | InterruptedException e) { + } catch (IOException | InterruptedException | ExecutionException e) { throw new PulsarAdminException(e); } } @@ -248,15 +249,15 @@ public void setRetention(String tenant, String namespace, String encodedTopic, b String url = String.format("%s/%s/%s/%s%s", getDomainBaseUrl(), tenant, namespace, encodedTopic, UrlConst.RETENTION); try { - HttpResponse response = httpClient.post(url, retention, + HttpResponse response = httpClient.post(url, retention, "authoritative", String.valueOf(authoritative), "isGlobal", String.valueOf(isGlobal)); if (response.statusCode() != 204) { throw new PulsarAdminException( String.format("failed to set retention for topic %s/%s/%s, status code %s, body : %s", - tenant, namespace, encodedTopic, response.statusCode(), response.body())); + tenant, namespace, encodedTopic, response.statusCode(), response.bodyAsString())); } - } catch (IOException | InterruptedException e) { + } catch (IOException | InterruptedException | ExecutionException e) { throw new PulsarAdminException(e); } } @@ -266,13 +267,13 @@ public void removeRetention(String tenant, String namespace, String encodedTopic String url = String.format("%s/%s/%s/%s%s", getDomainBaseUrl(), tenant, namespace, encodedTopic, UrlConst.RETENTION); try { - HttpResponse response = httpClient.delete(url, "authoritative", String.valueOf(authoritative)); + HttpResponse response = httpClient.delete(url, "authoritative", String.valueOf(authoritative)); if (response.statusCode() != 204) { throw new PulsarAdminException( String.format("failed to delete retention of topic %s/%s/%s, status code %s, body : %s", - tenant, namespace, encodedTopic, response.statusCode(), response.body())); + tenant, namespace, encodedTopic, response.statusCode(), response.bodyAsString())); } - } catch (IOException | InterruptedException e) { + } catch (IOException | InterruptedException | ExecutionException e) { throw new PulsarAdminException(e); } } @@ -284,18 +285,18 @@ public Map getBacklogQuotaMap(String tenant, Str String url = String.format("%s/%s/%s/%s%s", getDomainBaseUrl(), tenant, namespace, encodedTopic, UrlConst.BACKLOG_QUOTA_MAP); try { - HttpResponse response = httpClient.get(url, + HttpResponse response = httpClient.get(url, "applied", String.valueOf(applied), "authoritative", String.valueOf(authoritative), "isGlobal", String.valueOf(isGlobal)); if (response.statusCode() != 200) { throw new PulsarAdminException( String.format("failed to get backlog quota map of topic %s/%s/%s, status code %s, body : %s", - tenant, namespace, encodedTopic, response.statusCode(), response.body())); + tenant, namespace, encodedTopic, response.statusCode(), response.bodyAsString())); } return JacksonService.toRefer(response.body(), new TypeReference>() { }); - } catch (IOException | InterruptedException e) { + } catch (IOException | InterruptedException | ExecutionException e) { throw new PulsarAdminException(e); } } @@ -306,16 +307,16 @@ public void setBacklogQuota(String tenant, String namespace, String encodedTopic String url = String.format("%s/%s/%s/%s%s", getDomainBaseUrl(), tenant, namespace, encodedTopic, UrlConst.BACKLOG_QUOTA); try { - HttpResponse response = httpClient.post(url, backlogQuota, + HttpResponse response = httpClient.post(url, backlogQuota, "authoritative", String.valueOf(authoritative), "isGlobal", String.valueOf(isGlobal), "backlogQuotaType", String.valueOf(backlogQuotaType)); if (response.statusCode() != 204) { throw new PulsarAdminException( String.format("failed to set backlog quota for topic %s/%s/%s, status code %s, body : %s", - tenant, namespace, encodedTopic, response.statusCode(), response.body())); + tenant, namespace, encodedTopic, response.statusCode(), response.bodyAsString())); } - } catch (IOException | InterruptedException e) { + } catch (IOException | InterruptedException | ExecutionException e) { throw new PulsarAdminException(e); } } @@ -326,16 +327,16 @@ public void removeBacklogQuota(String tenant, String namespace, String encodedTo String url = String.format("%s/%s/%s/%s%s", getDomainBaseUrl(), tenant, namespace, encodedTopic, UrlConst.BACKLOG_QUOTA); try { - HttpResponse response = httpClient.delete(url, + HttpResponse response = httpClient.delete(url, "backlogQuotaType", String.valueOf(backlogQuotaType), "authoritative", String.valueOf(authoritative), "isGlobal", String.valueOf(isGlobal)); if (response.statusCode() != 204) { throw new PulsarAdminException( String.format("failed to remove backlog quota of topic %s/%s/%s, status code %s, body : %s", - tenant, namespace, encodedTopic, response.statusCode(), response.body())); + tenant, namespace, encodedTopic, response.statusCode(), response.bodyAsString())); } - } catch (IOException | InterruptedException e) { + } catch (IOException | InterruptedException | ExecutionException e) { throw new PulsarAdminException(e); } } @@ -345,14 +346,14 @@ public PersistentOfflineTopicStats getBacklog(String tenant, String namespace, S String url = String.format("%s/%s/%s/%s%s", getDomainBaseUrl(), tenant, namespace, encodedTopic, UrlConst.BACKLOG); try { - HttpResponse response = httpClient.get(url, "authoritative", String.valueOf(authoritative)); + HttpResponse response = httpClient.get(url, "authoritative", String.valueOf(authoritative)); if (response.statusCode() != 200) { throw new PulsarAdminException( String.format("failed to get backlog of topic %s/%s/%s, status code %s, body : %s", - tenant, namespace, encodedTopic, response.statusCode(), response.body())); + tenant, namespace, encodedTopic, response.statusCode(), response.bodyAsString())); } return JacksonService.toObject(response.body(), PersistentOfflineTopicStats.class); - } catch (IOException | InterruptedException e) { + } catch (IOException | InterruptedException | ExecutionException e) { throw new PulsarAdminException(e); } } @@ -362,15 +363,15 @@ public long getBacklogSizeByMessageId(String tenant, String namespace, String en String url = String.format("%s/%s/%s/%s%s", getDomainBaseUrl(), tenant, namespace, encodedTopic, UrlConst.BACKLOG_SIZE); try { - HttpResponse response = httpClient.put(url, messageId, "authoritative", + HttpResponse response = httpClient.put(url, messageId, "authoritative", String.valueOf(authoritative)); if (response.statusCode() != 200) { throw new PulsarAdminException( String.format("failed to get backlog size of topic %s/%s/%s, status code %s, body : %s", - tenant, namespace, encodedTopic, response.statusCode(), response.body())); + tenant, namespace, encodedTopic, response.statusCode(), response.bodyAsString())); } return JacksonService.toObject(response.body(), Long.class); - } catch (IOException | InterruptedException e) { + } catch (IOException | InterruptedException | ExecutionException e) { throw new PulsarAdminException(e); } } diff --git a/pulsar-admin-jdk/src/main/java/io/github/protocol/pulsar/admin/jdk/Brokers.java b/pulsar-admin-jdk/src/main/java/io/github/protocol/pulsar/admin/jdk/Brokers.java index 0eecf5c..2f27612 100644 --- a/pulsar-admin-jdk/src/main/java/io/github/protocol/pulsar/admin/jdk/Brokers.java +++ b/pulsar-admin-jdk/src/main/java/io/github/protocol/pulsar/admin/jdk/Brokers.java @@ -1,6 +1,6 @@ package io.github.protocol.pulsar.admin.jdk; -import java.net.http.HttpResponse; +import io.github.openfacade.http.HttpResponse; public class Brokers { private final InnerHttpClient innerHttpClient; @@ -15,12 +15,12 @@ public void healthcheck(TopicVersion topicVersion) throws PulsarAdminException { url += "?topicVersion=" + topicVersion; } try { - HttpResponse httpResponse = innerHttpClient.get(url); + HttpResponse httpResponse = innerHttpClient.get(url); if (httpResponse.statusCode() != 200) { throw new PulsarAdminException("healthcheck failed, status code: " + httpResponse.statusCode(), httpResponse.statusCode()); } - if (!httpResponse.body().equals("ok")) { + if (!httpResponse.bodyAsString().equals("ok")) { throw new PulsarAdminException("healthcheck failed, body: " + httpResponse.body()); } } catch (Exception e) { diff --git a/pulsar-admin-jdk/src/main/java/io/github/protocol/pulsar/admin/jdk/Clusters.java b/pulsar-admin-jdk/src/main/java/io/github/protocol/pulsar/admin/jdk/Clusters.java index be9f6f6..e96cc15 100644 --- a/pulsar-admin-jdk/src/main/java/io/github/protocol/pulsar/admin/jdk/Clusters.java +++ b/pulsar-admin-jdk/src/main/java/io/github/protocol/pulsar/admin/jdk/Clusters.java @@ -1,11 +1,12 @@ package io.github.protocol.pulsar.admin.jdk; import com.fasterxml.jackson.core.type.TypeReference; +import io.github.openfacade.http.HttpResponse; import io.github.protocol.pulsar.admin.common.JacksonService; import java.io.IOException; -import java.net.http.HttpResponse; import java.util.List; +import java.util.concurrent.ExecutionException; public class Clusters { @@ -17,15 +18,14 @@ public Clusters(InnerHttpClient httpClient) { public List getClusters() throws PulsarAdminException { try { - HttpResponse response = httpClient.get(UrlConst.CLUSTERS); + HttpResponse response = httpClient.get(UrlConst.CLUSTERS); if (response.statusCode() != 200) { throw new PulsarAdminException( String.format("failed to get list of clusters, " + "status code %s, body : %s", response.statusCode(), response.body())); } - return JacksonService.toRefer(response.body(), new TypeReference>() { - }); - } catch (IOException | InterruptedException e) { + return JacksonService.toRefer(response.body(), new TypeReference<>() {}); + } catch (IOException | InterruptedException | ExecutionException e) { throw new PulsarAdminException(e); } } diff --git a/pulsar-admin-jdk/src/main/java/io/github/protocol/pulsar/admin/jdk/InnerHttpClient.java b/pulsar-admin-jdk/src/main/java/io/github/protocol/pulsar/admin/jdk/InnerHttpClient.java index 3fef3d7..924d0ce 100644 --- a/pulsar-admin-jdk/src/main/java/io/github/protocol/pulsar/admin/jdk/InnerHttpClient.java +++ b/pulsar-admin-jdk/src/main/java/io/github/protocol/pulsar/admin/jdk/InnerHttpClient.java @@ -1,128 +1,127 @@ package io.github.protocol.pulsar.admin.jdk; import com.fasterxml.jackson.core.JsonProcessingException; +import io.github.openfacade.http.HttpClient; +import io.github.openfacade.http.HttpClientConfig; +import io.github.openfacade.http.HttpClientEngine; +import io.github.openfacade.http.HttpClientFactory; +import io.github.openfacade.http.HttpMethod; +import io.github.openfacade.http.HttpRequest; +import io.github.openfacade.http.HttpResponse; +import io.github.openfacade.http.HttpSchema; +import io.github.openfacade.http.TlsConfig; +import io.github.openfacade.http.UrlBuilder; import io.github.protocol.pulsar.admin.api.Configuration; import io.github.protocol.pulsar.admin.common.JacksonService; import java.io.IOException; -import java.net.URI; import java.net.URLEncoder; -import java.net.http.HttpClient; -import java.net.http.HttpRequest; -import java.net.http.HttpResponse; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; public class InnerHttpClient { - private final Configuration conf; - private final HttpClient client; - private final String httpPrefix; + private UrlBuilder templateUrlBuilder; public InnerHttpClient(Configuration conf) { - this.conf = conf; - HttpClient.Builder builder = HttpClient.newBuilder() - .version(HttpClient.Version.HTTP_1_1); + HttpClientConfig.Builder clientConfigBuilder = new HttpClientConfig.Builder(); + clientConfigBuilder.engine(HttpClientEngine.JAVA); if (conf.tlsEnabled) { - builder = builder - .sslContext(SslContextUtil.build(conf.tlsConfig)); - this.httpPrefix = "https://" + conf.host + ":" + conf.port; - } else { - this.httpPrefix = "http://" + conf.host + ":" + conf.port; + TlsConfig.Builder tlsConfigBuilder = new TlsConfig.Builder(); + io.github.protocol.pulsar.admin.api.TlsConfig tlsConfig = conf.tlsConfig; + tlsConfigBuilder.cipherSuites(tlsConfig.cipherSuites); + tlsConfigBuilder.hostnameVerifyDisabled(tlsConfig.hostnameVerifyDisabled); + tlsConfigBuilder.keyStore(tlsConfig.keyStorePath, tlsConfig.keyStorePassword); + tlsConfigBuilder.trustStore(tlsConfig.trustStorePath, tlsConfig.trustStorePassword); + tlsConfigBuilder.verifyDisabled(tlsConfig.verifyDisabled); + clientConfigBuilder.tlsConfig(tlsConfigBuilder.build()); } - this.client = builder.build(); + this.client = HttpClientFactory.createHttpClient(clientConfigBuilder.build()); + templateUrlBuilder = new UrlBuilder(); + templateUrlBuilder.setHttpSchema(conf.tlsEnabled ? HttpSchema.HTTPS : HttpSchema.HTTP).setHost(conf.host) + .setPort(conf.port); } - public HttpResponse get(String url) throws IOException, InterruptedException { - return this.get(url, new String[0]); + public HttpResponse post(String url) throws IOException, InterruptedException, ExecutionException { + return this.innerPost(url, new byte[0]); } - public HttpResponse get(String url, String... requestParams) - throws IOException, InterruptedException { - HttpRequest request = HttpRequest.newBuilder() - .uri(getUri(url, requestParams)) - .GET() - .build(); - return client.send(request, HttpResponse.BodyHandlers.ofString()); + public HttpResponse post(String url, Object body) throws IOException, InterruptedException, ExecutionException { + return this.innerPost(url, objectToBytes(body)); } - public HttpResponse post(String url, Object body, String... params) - throws IOException, InterruptedException { - return this.post(url, objectToString(body), params); + public HttpResponse post(String url, Object body, String... params) + throws IOException, ExecutionException, InterruptedException { + return this.innerPost(url, objectToBytes(body), params); } - public HttpResponse post(String url, String body, String... params) - throws IOException, InterruptedException { - HttpRequest request = HttpRequest.newBuilder() - .uri(getUri(url, params)) - .POST(HttpRequest.BodyPublishers.ofString(body == null ? "" : body)) - .setHeader("Content-Type", "application/json") - .build(); - return client.send(request, HttpResponse.BodyHandlers.ofString()); + private HttpResponse innerPost(String url, byte[] body, String... params) throws InterruptedException, ExecutionException { + Map> headers = new HashMap<>(); + headers.put("Content-Type", List.of("application/json")); + HttpRequest request = new HttpRequest(concatUrlWithParams(url, params), HttpMethod.POST, headers, body); + return client.send(request).get(); } - public HttpResponse post(String url) throws IOException, InterruptedException { - return this.post(url, ""); + public HttpResponse put(String url) throws IOException, InterruptedException, ExecutionException { + return this.innerPut(url, new byte[0]); } - public HttpResponse put(String url) throws IOException, InterruptedException { - return this.put(url, ""); + public HttpResponse put(String url, Object body) throws IOException, InterruptedException, ExecutionException { + return this.innerPut(url, objectToBytes(body)); } - public HttpResponse put(String url, Object body, String... params) - throws IOException, InterruptedException { - return this.put(url, objectToString(body), params); + public HttpResponse put(String url, Object body, String... params) throws IOException, InterruptedException, ExecutionException { + return this.innerPut(url, objectToBytes(body), params); } - public HttpResponse put(String url, String body, String... params) - throws IOException, InterruptedException { - HttpRequest request = HttpRequest.newBuilder() - .uri(getUri(url, params)) - .PUT(HttpRequest.BodyPublishers.ofString(body == null ? "" : body)) - .setHeader("Content-Type", "application/json") - .build(); - return client.send(request, HttpResponse.BodyHandlers.ofString()); + private HttpResponse innerPut(String url, byte[] body, String... params) throws InterruptedException, ExecutionException { + Map> headers = new HashMap<>(); + headers.put("Content-Type", List.of("application/json")); + HttpRequest request = new HttpRequest(concatUrlWithParams(url, params), HttpMethod.PUT, headers, body); + return client.send(request).get(); } - public HttpResponse delete(String url, String... requestParams) - throws IOException, InterruptedException { - HttpRequest request = HttpRequest.newBuilder() - .uri(getUri(url, requestParams)) - .DELETE() - .build(); - return client.send(request, HttpResponse.BodyHandlers.ofString()); + public HttpResponse delete(String url, String... params) throws IOException, InterruptedException, ExecutionException { + HttpRequest request = new HttpRequest(concatUrlWithParams(url, params), HttpMethod.DELETE); + return client.send(request).get(); } - private URI getUri(String url, String... params) { - return URI.create(this.httpPrefix + url + mapToParams(params)); + public HttpResponse get(String url, String... params) throws IOException, InterruptedException, ExecutionException { + HttpRequest request = new HttpRequest(concatUrlWithParams(url, params), HttpMethod.GET); + return client.send(request).get(); } - static String mapToParams(String... requestParams) { + private List convertListToParams(String... requestParams) { if (requestParams.length % 2 != 0) { throw new IllegalArgumentException("params list length cannot be odd"); } - if (requestParams.length == 0) { - return ""; - } - StringBuilder res = new StringBuilder("?"); - res.append(requestParams[0]); - res.append('='); - res.append(requestParams[1]); - for (int i = 2; i < requestParams.length; ) { - res.append('&'); - res.append(encode(requestParams[i++])); - res.append('='); - res.append(encode(requestParams[i++])); + List queryParams = new ArrayList<>(); + for (int i = 0; i < requestParams.length; i = i + 2) { + queryParams.add(new UrlBuilder.Param(requestParams[i], requestParams[i+1])); } - return res.toString(); + return queryParams; } - private String objectToString(Object obj) throws JsonProcessingException { - return obj == null ? "" : JacksonService.toJson(obj); + private byte[] objectToBytes(Object obj) throws JsonProcessingException { + return obj == null ? new byte[0]: JacksonService.toBytes(obj); } + @Deprecated private static String encode(String value) { return URLEncoder.encode(value, StandardCharsets.UTF_8); } + private String concatUrlWithParams(String url, String... params) { + UrlBuilder urlBuilder = templateUrlBuilder.duplicate(); + urlBuilder.setPath(url); + if (params != null && params.length > 0) { + urlBuilder.setQueryParams(convertListToParams(params)); + } + return urlBuilder.build(); + } } diff --git a/pulsar-admin-jdk/src/main/java/io/github/protocol/pulsar/admin/jdk/Namespaces.java b/pulsar-admin-jdk/src/main/java/io/github/protocol/pulsar/admin/jdk/Namespaces.java index b60063f..be926e9 100644 --- a/pulsar-admin-jdk/src/main/java/io/github/protocol/pulsar/admin/jdk/Namespaces.java +++ b/pulsar-admin-jdk/src/main/java/io/github/protocol/pulsar/admin/jdk/Namespaces.java @@ -1,12 +1,13 @@ package io.github.protocol.pulsar.admin.jdk; import com.fasterxml.jackson.core.type.TypeReference; +import io.github.openfacade.http.HttpResponse; import io.github.protocol.pulsar.admin.common.JacksonService; import java.io.IOException; -import java.net.http.HttpResponse; import java.util.List; import java.util.Map; +import java.util.concurrent.ExecutionException; public class Namespaces { @@ -18,83 +19,80 @@ public Namespaces(InnerHttpClient httpClient) { public List getTenantNamespaces(String tenant) throws PulsarAdminException { try { - HttpResponse response = httpClient.get( - String.format("%s/%s", UrlConst.NAMESPACES, tenant)); + HttpResponse response = httpClient.get(String.format("%s/%s", UrlConst.NAMESPACES, tenant)); if (response.statusCode() != 200) { throw new PulsarAdminException( String.format("failed to get namespaces of tenant %s, status code %s, body : %s", - tenant, response.statusCode(), response.body())); + tenant, response.statusCode(), response.bodyAsString())); } - return JacksonService.toList(response.body(), new TypeReference>() { + return JacksonService.toList(response.body(), new TypeReference<>() { }); - } catch (IOException | InterruptedException e) { + } catch (IOException | InterruptedException | ExecutionException e) { throw new PulsarAdminException(e); } } public List getTopics(String tenant, String namespace, Mode mode, boolean includeSystemTopic) - throws PulsarAdminException { + throws PulsarAdminException { try { - HttpResponse response = httpClient.get( - String.format("%s/%s/%s/topics", UrlConst.NAMESPACES, tenant, namespace), - "mode", String.valueOf(mode), - "includeSystemTopic", String.valueOf(includeSystemTopic)); + HttpResponse response = + httpClient.get(String.format("%s/%s/%s/topics", UrlConst.NAMESPACES, tenant, namespace), "mode", + String.valueOf(mode), "includeSystemTopic", String.valueOf(includeSystemTopic)); if (response.statusCode() != 200) { throw new PulsarAdminException( String.format("failed to get topics of namespace %s/%s, status code %s, body : %s", - tenant, namespace, response.statusCode(), response.body())); + tenant, namespace, response.statusCode(), response.bodyAsString())); } - return JacksonService.toList(response.body(), new TypeReference>() { + return JacksonService.toList(response.body(), new TypeReference<>() { }); - } catch (IOException | InterruptedException e) { + } catch (IOException | InterruptedException | ExecutionException e) { throw new PulsarAdminException(e); } } public void createNamespace(String tenant, String namespace) throws PulsarAdminException { try { - HttpResponse response = httpClient.put( - String.format("%s/%s/%s", UrlConst.NAMESPACES, tenant, namespace)); + HttpResponse response = httpClient.put(String.format("%s/%s/%s", UrlConst.NAMESPACES, tenant, namespace)); if (response.statusCode() != 204) { throw new PulsarAdminException( String.format("failed to get topics of namespace %s/%s, status code %s, body : %s", - tenant, namespace, response.statusCode(), response.body())); + tenant, namespace, response.statusCode(), response.bodyAsString())); } - } catch (IOException | InterruptedException e) { + } catch (IOException | InterruptedException | ExecutionException e) { throw new PulsarAdminException(e); } } public void deleteNamespace(String tenant, String namespace, boolean force, boolean authoritative) - throws PulsarAdminException { + throws PulsarAdminException { try { - HttpResponse response = httpClient.delete( - String.format("%s/%s/%s", UrlConst.NAMESPACES, tenant, namespace), - "force", String.valueOf(force), - "authoritative", String.valueOf(authoritative)); + HttpResponse response = + httpClient.delete(String.format("%s/%s/%s", UrlConst.NAMESPACES, tenant, namespace), "force", + String.valueOf(force), "authoritative", String.valueOf(authoritative)); if (response.statusCode() != 204) { throw new PulsarAdminException(String.format( "failed to get topics of namespace %s/%s, status code %s, body : %s", - tenant, namespace, response.statusCode(), response.body())); + tenant, namespace, response.statusCode(), response.bodyAsString())); } - } catch (IOException | InterruptedException e) { + } catch (IOException | InterruptedException | ExecutionException e) { throw new PulsarAdminException(e); } } public Map getBacklogQuotaMap(String tenant, String namespace) - throws PulsarAdminException { + throws PulsarAdminException { try { - HttpResponse response = httpClient.get( - String.format("%s/%s/%s%s", UrlConst.NAMESPACES, tenant, namespace, UrlConst.BACKLOG_QUOTA_MAP)); + String url = + String.format("%s/%s/%s%s", UrlConst.NAMESPACES, tenant, namespace, UrlConst.BACKLOG_QUOTA_MAP); + HttpResponse response = httpClient.get(url); if (response.statusCode() != 200) { throw new PulsarAdminException(String.format( "failed to get backlog quota map of namespace %s/%s, status code %s, body : %s", - tenant, namespace, response.statusCode(), response.body())); + tenant, namespace, response.statusCode(), response.bodyAsString())); } - return JacksonService.toRefer(response.body(), new TypeReference>() { + return JacksonService.toRefer(response.body(), new TypeReference<>() { }); - } catch (IOException | InterruptedException e) { + } catch (IOException | InterruptedException | ExecutionException e) { throw new PulsarAdminException(e); } } @@ -102,31 +100,30 @@ public Map getBacklogQuotaMap(String tenant, Str public void setBacklogQuota(String tenant, String namespace, BacklogQuotaType backlogQuotaType, BacklogQuota backlogQuota) throws PulsarAdminException { try { - HttpResponse response = httpClient.post( - String.format("%s/%s/%s%s", UrlConst.NAMESPACES, tenant, namespace, UrlConst.BACKLOG_QUOTA), - backlogQuota, "backlogQuotaType", String.valueOf(backlogQuotaType)); + HttpResponse response = httpClient.post( + String.format("%s/%s/%s%s", UrlConst.NAMESPACES, tenant, namespace, UrlConst.BACKLOG_QUOTA), + backlogQuota, "backlogQuotaType", String.valueOf(backlogQuotaType)); if (response.statusCode() != 204) { throw new PulsarAdminException(String.format( "failed to set backlog quota of namespace %s/%s, status code %s, body : %s", - tenant, namespace, response.statusCode(), response.body())); + tenant, namespace, response.statusCode(), response.bodyAsString())); } - } catch (IOException | InterruptedException e) { + } catch (IOException | InterruptedException | ExecutionException e) { throw new PulsarAdminException(e); } } public void removeBacklogQuota(String tenant, String namespace, BacklogQuotaType backlogQuotaType) - throws PulsarAdminException { + throws PulsarAdminException { try { - HttpResponse response = httpClient.delete( - String.format("%s/%s/%s%s", UrlConst.NAMESPACES, tenant, namespace, UrlConst.BACKLOG_QUOTA), - "backlogQuotaType", String.valueOf(backlogQuotaType)); + HttpResponse response = httpClient.delete(String.format("%s/%s/%s%s", UrlConst.NAMESPACES, tenant, namespace, UrlConst.BACKLOG_QUOTA), + "backlogQuotaType", String.valueOf(backlogQuotaType)); if (response.statusCode() != 204) { throw new PulsarAdminException(String.format( "failed to delete backlog quota policy of namespace %s/%s, status code %s, body : %s", - tenant, namespace, response.statusCode(), response.body())); + tenant, namespace, response.statusCode(), response.bodyAsString())); } - } catch (IOException | InterruptedException e) { + } catch (IOException | InterruptedException | ExecutionException e) { throw new PulsarAdminException(e); } } @@ -134,167 +131,163 @@ public void removeBacklogQuota(String tenant, String namespace, BacklogQuotaType public void clearNamespaceBacklogForSubscription(String tenant, String namespace, String subscription, boolean authoritative) throws PulsarAdminException { try { - HttpResponse response = httpClient.post( - String.format("%s/%s/%s%s/%s", UrlConst.NAMESPACES, tenant, namespace, - UrlConst.CLEAR_BACKLOG, subscription), - null, - "authoritative", String.valueOf(authoritative)); + HttpResponse response = httpClient.post( + String.format("%s/%s/%s%s/%s", UrlConst.NAMESPACES, tenant, namespace, UrlConst.CLEAR_BACKLOG, + subscription), null, "authoritative", String.valueOf(authoritative)); if (response.statusCode() != 204) { throw new PulsarAdminException(String.format( "failed to clear backlog of namespace %s/%s subscription %s, status code %s, body : %s", - tenant, namespace, subscription, response.statusCode(), response.body())); + tenant, namespace, subscription, response.statusCode(), response.bodyAsString())); } - } catch (IOException | InterruptedException e) { + } catch (InterruptedException | ExecutionException | IOException e) { throw new PulsarAdminException(e); } } public void clearBackLog(String tenant, String namespace, boolean authoritative) throws PulsarAdminException { try { - HttpResponse response = httpClient.post( - String.format("%s/%s/%s%s", UrlConst.NAMESPACES, tenant, namespace, UrlConst.CLEAR_BACKLOG), - null, - "authoritative", String.valueOf(authoritative)); + HttpResponse response = httpClient.post( + String.format("%s/%s/%s%s", UrlConst.NAMESPACES, tenant, namespace, UrlConst.CLEAR_BACKLOG), null, + "authoritative", String.valueOf(authoritative)); if (response.statusCode() != 204) { throw new PulsarAdminException(String.format( "failed to clear backlog of namespace %s/%s, status code %s, body : %s", - tenant, namespace, response.statusCode(), response.body())); + tenant, namespace, response.statusCode(), response.bodyAsString())); } - } catch (IOException | InterruptedException e) { + } catch (InterruptedException | ExecutionException | IOException e) { throw new PulsarAdminException(e); } } public RetentionPolicies getRetention(String tenant, String namespace) throws PulsarAdminException { try { - HttpResponse response = httpClient.get( - String.format("%s/%s/%s%s", UrlConst.NAMESPACES, tenant, namespace, UrlConst.RETENTION)); + HttpResponse response = + httpClient.get(String.format("%s/%s/%s%s", UrlConst.NAMESPACES, tenant, namespace, UrlConst.RETENTION)); if (response.statusCode() != 200) { throw new PulsarAdminException(String.format( "failed to get retention of namespace %s/%s, status code %s, body : %s", - tenant, namespace, response.statusCode(), response.body())); + tenant, namespace, response.statusCode(), response.bodyAsString())); } return JacksonService.toObject(response.body(), RetentionPolicies.class); - } catch (IOException | InterruptedException e) { + } catch (IOException | InterruptedException | ExecutionException e) { throw new PulsarAdminException(e); } } public void setRetention(String tenant, String namespace, RetentionPolicies retentionPolicies) - throws PulsarAdminException { + throws PulsarAdminException { try { - HttpResponse response = httpClient.post( - String.format("%s/%s/%s%s", UrlConst.NAMESPACES, tenant, namespace, UrlConst.RETENTION), + HttpResponse response = + httpClient.post(String.format("%s/%s/%s%s", UrlConst.NAMESPACES, tenant, namespace, UrlConst.RETENTION), retentionPolicies); if (response.statusCode() != 204) { throw new PulsarAdminException(String.format( "failed to set retention of namespace %s/%s, status code %s, body : %s", - tenant, namespace, response.statusCode(), response.body())); + tenant, namespace, response.statusCode(), response.bodyAsString())); } - } catch (IOException | InterruptedException e) { + } catch (IOException | InterruptedException | ExecutionException e) { throw new PulsarAdminException(e); } } public void removeRetention(String tenant, String namespace, RetentionPolicies retentionPolicies) - throws PulsarAdminException { + throws PulsarAdminException { try { - HttpResponse response = httpClient.delete( - String.format("%s/%s/%s%s", UrlConst.NAMESPACES, tenant, namespace, UrlConst.RETENTION)); + HttpResponse response = httpClient.delete( + String.format("%s/%s/%s%s", UrlConst.NAMESPACES, tenant, namespace, UrlConst.RETENTION)); if (response.statusCode() != 204) { throw new PulsarAdminException(String.format( "failed to remove retention of namespace %s/%s, status code %s, body : %s", - tenant, namespace, response.statusCode(), response.body())); + tenant, namespace, response.statusCode(), response.bodyAsString())); } - } catch (IOException | InterruptedException e) { + } catch (IOException | InterruptedException | ExecutionException e) { throw new PulsarAdminException(e); } } public Integer getNamespaceMessageTTL(String tenant, String namespace) throws PulsarAdminException { try { - HttpResponse response = httpClient.get( - String.format("%s/%s/%s%s", UrlConst.NAMESPACES, tenant, namespace, UrlConst.MESSAGE_TTL)); + HttpResponse response = httpClient.get( + String.format("%s/%s/%s%s", UrlConst.NAMESPACES, tenant, namespace, UrlConst.MESSAGE_TTL)); if (response.statusCode() != 200) { throw new PulsarAdminException(String.format( "failed to get messageTTL of namespace %s/%s, status code %s, body : %s", - tenant, namespace, response.statusCode(), response.body())); + tenant, namespace, response.statusCode(), response.bodyAsString())); } return JacksonService.toObject(response.body(), Integer.class); - } catch (IOException | InterruptedException e) { + } catch (IOException | InterruptedException | ExecutionException e) { throw new PulsarAdminException(e); } } public void setNamespaceMessageTTL(String tenant, String namespace, int messageTTL) throws PulsarAdminException { try { - HttpResponse response = httpClient.post( - String.format("%s/%s/%s%s", UrlConst.NAMESPACES, tenant, namespace, UrlConst.MESSAGE_TTL), - messageTTL); + HttpResponse response = httpClient.post( + String.format("%s/%s/%s%s", UrlConst.NAMESPACES, tenant, namespace, UrlConst.MESSAGE_TTL), messageTTL); if (response.statusCode() != 204) { throw new PulsarAdminException(String.format( "failed to set messageTTL of namespace %s/%s, status code %s, body : %s", - tenant, namespace, response.statusCode(), response.body())); + tenant, namespace, response.statusCode(), response.bodyAsString())); } - } catch (IOException | InterruptedException e) { + } catch (IOException | InterruptedException | ExecutionException e) { throw new PulsarAdminException(e); } } public void removeNamespaceMessageTTL(String tenant, String namespace) throws PulsarAdminException { try { - HttpResponse response = httpClient.delete( - String.format("%s/%s/%s%s", UrlConst.NAMESPACES, tenant, namespace, UrlConst.MESSAGE_TTL)); + HttpResponse response = httpClient.delete( + String.format("%s/%s/%s%s", UrlConst.NAMESPACES, tenant, namespace, UrlConst.MESSAGE_TTL)); if (response.statusCode() != 204) { throw new PulsarAdminException(String.format( "failed to remove messageTTL of namespace %s/%s, status code %s, body : %s", - tenant, namespace, response.statusCode(), response.body())); + tenant, namespace, response.statusCode(), response.bodyAsString())); } - } catch (IOException | InterruptedException e) { + } catch (IOException | InterruptedException | ExecutionException e) { throw new PulsarAdminException(e); } } public Long getCompactionThreshold(String tenant, String namespace) throws PulsarAdminException { try { - HttpResponse response = httpClient.get( - String.format("%s/%s/%s%s", UrlConst.NAMESPACES, tenant, namespace, UrlConst.COMPACTION_THRESHOLD)); + HttpResponse response = httpClient.get( + String.format("%s/%s/%s%s", UrlConst.NAMESPACES, tenant, namespace, UrlConst.COMPACTION_THRESHOLD)); if (response.statusCode() != 200) { throw new PulsarAdminException(String.format( "failed to get compaction threshold of namespace %s/%s, status code %s, body : %s", - tenant, namespace, response.statusCode(), response.body())); + tenant, namespace, response.statusCode(), response.bodyAsString())); } return JacksonService.toObject(response.body(), Long.class); - } catch (IOException | InterruptedException e) { + } catch (IOException | InterruptedException | ExecutionException e) { throw new PulsarAdminException(e); } } public void setCompactionThreshold(String tenant, String namespace, long newThreshold) throws PulsarAdminException { try { - HttpResponse response = httpClient.put( - String.format("%s/%s/%s%s", UrlConst.NAMESPACES, tenant, namespace, UrlConst.COMPACTION_THRESHOLD), - newThreshold); + HttpResponse response = httpClient.put( + String.format("%s/%s/%s%s", UrlConst.NAMESPACES, tenant, namespace, UrlConst.COMPACTION_THRESHOLD), + newThreshold); if (response.statusCode() != 204) { throw new PulsarAdminException(String.format( "failed to set compaction threshold of namespace %s/%s, status code %s, body : %s", - tenant, namespace, response.statusCode(), response.body())); + tenant, namespace, response.statusCode(), response.bodyAsString())); } - } catch (IOException | InterruptedException e) { + } catch (IOException | InterruptedException | ExecutionException e) { throw new PulsarAdminException(e); } } public void deleteCompactionThreshold(String tenant, String namespace) throws PulsarAdminException { try { - HttpResponse response = httpClient.delete( - String.format("%s/%s/%s%s", UrlConst.NAMESPACES, tenant, namespace, UrlConst.COMPACTION_THRESHOLD)); + HttpResponse response = httpClient.delete( + String.format("%s/%s/%s%s", UrlConst.NAMESPACES, tenant, namespace, UrlConst.COMPACTION_THRESHOLD)); if (response.statusCode() != 204) { throw new PulsarAdminException(String.format( "failed to remove compaction threshold of namespace %s/%s, status code %s, body : %s", - tenant, namespace, response.statusCode(), response.body())); + tenant, namespace, response.statusCode(), response.bodyAsString())); } - } catch (IOException | InterruptedException e) { + } catch (IOException | InterruptedException | ExecutionException e) { throw new PulsarAdminException(e); } } diff --git a/pulsar-admin-jdk/src/main/java/io/github/protocol/pulsar/admin/jdk/Tenants.java b/pulsar-admin-jdk/src/main/java/io/github/protocol/pulsar/admin/jdk/Tenants.java index 33b6bc2..1a9870d 100644 --- a/pulsar-admin-jdk/src/main/java/io/github/protocol/pulsar/admin/jdk/Tenants.java +++ b/pulsar-admin-jdk/src/main/java/io/github/protocol/pulsar/admin/jdk/Tenants.java @@ -1,11 +1,12 @@ package io.github.protocol.pulsar.admin.jdk; import com.fasterxml.jackson.core.type.TypeReference; +import io.github.openfacade.http.HttpResponse; import io.github.protocol.pulsar.admin.common.JacksonService; import java.io.IOException; -import java.net.http.HttpResponse; import java.util.List; +import java.util.concurrent.ExecutionException; public class Tenants { @@ -17,67 +18,66 @@ public Tenants(InnerHttpClient httpClient) { public void createTenant(String tenant, TenantInfo tenantInfo) throws PulsarAdminException { try { - HttpResponse response = httpClient.put( - String.format("%s/%s", UrlConst.TENANTS, tenant), tenantInfo); + String url = String.format("%s/%s", UrlConst.TENANTS, tenant); + HttpResponse response = httpClient.put(url, tenantInfo); if (response.statusCode() != 204){ throw new PulsarAdminException(String.format("failed to create tenant %s, status code %s, body : %s", - tenant, response.statusCode(), response.body())); + tenant, response.statusCode(), response.bodyAsString())); } - } catch (IOException | InterruptedException e) { + } catch (IOException | InterruptedException | ExecutionException e) { throw new PulsarAdminException(e); } } public void deleteTenant(String tenant, boolean force) throws PulsarAdminException { try { - HttpResponse response = + HttpResponse response = httpClient.delete(String.format("%s/%s", UrlConst.TENANTS, tenant), "force", String.valueOf(force)); if (response.statusCode() != 204){ throw new PulsarAdminException(String.format("failed to create tenant %s, status code %s, body : %s", - tenant, response.statusCode(), response.body())); + tenant, response.statusCode(), response.bodyAsString())); } - } catch (IOException | InterruptedException e) { + } catch (IOException | InterruptedException | ExecutionException e) { throw new PulsarAdminException(e); } } public void updateTenant(String tenant, TenantInfo tenantInfo) throws PulsarAdminException { try { - HttpResponse response = - httpClient.post( - String.format("%s/%s", UrlConst.TENANTS, tenant), tenantInfo); + String url = String.format("%s/%s", UrlConst.TENANTS, tenant); + HttpResponse response = httpClient.post(url, tenantInfo); if (response.statusCode() != 204){ throw new PulsarAdminException(String.format("failed to update tenant %s, status code %s, body : %s", - tenant, response.statusCode(), response.body())); + tenant, response.statusCode(), response.bodyAsString())); } - } catch (IOException | InterruptedException e) { + } catch (IOException | InterruptedException | ExecutionException e) { throw new PulsarAdminException(e); } } public TenantInfo getTenantAdmin(String tenant) throws PulsarAdminException { try { - HttpResponse response = httpClient.get( - String.format("%s/%s", UrlConst.TENANTS, tenant)); + String url = String.format("%s/%s", UrlConst.TENANTS, tenant); + HttpResponse response = httpClient.get(url); if (response.statusCode() != 200){ throw new PulsarAdminException(String.format("failed to get tenant %s, status code %s, body : %s", - tenant, response.statusCode(), response.body())); + tenant, response.statusCode(), response.bodyAsString())); } return JacksonService.toObject(response.body(), TenantInfo.class); - } catch (IOException | InterruptedException e) { + } catch (IOException | InterruptedException | ExecutionException e) { throw new PulsarAdminException(e); } } public List getTenants() throws PulsarAdminException { try { - HttpResponse response = httpClient.get(UrlConst.TENANTS); + HttpResponse response = httpClient.get(UrlConst.TENANTS); if (response.statusCode() != 200){ throw new PulsarAdminException(String.format("failed to get list of tenant, status code %s, body : %s", - response.statusCode(), response.body())); + response.statusCode(), response.bodyAsString())); } - return JacksonService.toList(response.body(), new TypeReference>(){}); - } catch (IOException | InterruptedException e) { + return JacksonService.toList(response.body(), new TypeReference<>() {}); + } catch (IOException | InterruptedException | ExecutionException e) { throw new PulsarAdminException(e); } } diff --git a/pulsar-admin-jdk/src/test/java/io/github/protocol/pulsar/admin/jdk/InnerHttpClientTest.java b/pulsar-admin-jdk/src/test/java/io/github/protocol/pulsar/admin/jdk/InnerHttpClientTest.java deleted file mode 100644 index 602f23f..0000000 --- a/pulsar-admin-jdk/src/test/java/io/github/protocol/pulsar/admin/jdk/InnerHttpClientTest.java +++ /dev/null @@ -1,22 +0,0 @@ -package io.github.protocol.pulsar.admin.jdk; - -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; - -public class InnerHttpClientTest { - - @Test - public void mapToParamsCaseEmptyMap(){ - Assertions.assertEquals("", InnerHttpClient.mapToParams()); - } - - @Test - public void mapToParamsCaseSpecialChars(){ - Assertions.assertEquals("?a=b&a%2F=b%5C&%25=%26&%3D=%3F", InnerHttpClient.mapToParams( - "a", "b", - "a/", "b\\", - "%", "&", - "=", "?")); - } - -}