Skip to content

Commit

Permalink
[fix] [broker] [branch-2.10] Remove the unnecessary HTTP response hea…
Browse files Browse the repository at this point in the history
…der: Content-Encoding when calling getSchema (apache#22307)
  • Loading branch information
poorbarcode authored Mar 20, 2024
1 parent ba895f0 commit 838c0c3
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import java.util.List;
import java.util.stream.Collectors;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.service.schema.SchemaRegistry.SchemaAndMetadata;
Expand Down Expand Up @@ -258,7 +257,7 @@ private static void handleGetSchemaResponse(AsyncResponse response, SchemaAndMet
response.resume(Response.status(
Response.Status.NOT_FOUND.getStatusCode(), "Schema is deleted").build());
} else {
response.resume(Response.ok().encoding(MediaType.APPLICATION_JSON)
response.resume(Response.ok()
.entity(convertSchemaAndMetadataToGetSchemaResponse(schema)).build());
}
} else {
Expand All @@ -275,7 +274,7 @@ private static void handleGetAllSchemasResponse(AsyncResponse response, List<Sch
response.resume(Response.status(
Response.Status.NOT_FOUND.getStatusCode(), "Schemas not found").build());
} else {
response.resume(Response.ok().encoding(MediaType.APPLICATION_JSON)
response.resume(Response.ok()
.entity(GetAllVersionsSchemaResponse.builder()
.getSchemaResponses(schemas.stream()
.map(SchemasResourceBase::convertSchemaAndMetadataToGetSchemaResponse)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.google.common.collect.Sets;
import java.io.ByteArrayInputStream;
import java.io.Serializable;
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
Expand All @@ -44,17 +45,23 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import javax.ws.rs.client.InvocationCallback;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MultivaluedMap;
import lombok.Cleanup;
import lombok.EqualsAndHashCode;
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.Schema.Parser;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage;
import org.apache.pulsar.broker.service.schema.SchemaRegistry;
import org.apache.pulsar.broker.service.schema.SchemaRegistryServiceImpl;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.internal.SchemasImpl;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
Expand Down Expand Up @@ -1257,6 +1264,40 @@ public void testCreateSchemaInParallel() throws Exception {
executor.shutdownNow();
}

@Test
public void testHTTPGetSchema() throws Exception {
final String namespace = "test-namespace-" + randomName(16);
String ns = PUBLIC_TENANT + "/" + namespace;
admin.namespaces().createNamespace(ns, Sets.newHashSet(CLUSTER_NAME));
final String topic = BrokerTestUtil.newUniqueName("persistent://" + ns + "/tp");
// create a schema.
pulsarClient.newProducer(Schema.STRING).topic(topic).create().close();

// call get schemas.
SchemasImpl schemas = (SchemasImpl) admin.schemas();
Method methodSchemasPath = SchemasImpl.class.getDeclaredMethod("schemasPath", new Class[]{TopicName.class});
methodSchemasPath.setAccessible(true);
WebTarget path = (WebTarget) methodSchemasPath.invoke(schemas, TopicName.get(topic));
CompletableFuture<javax.ws.rs.core.Response> response = new CompletableFuture();
schemas.asyncGetRequest(path, new InvocationCallback<javax.ws.rs.core.Response>() {

@Override
public void completed(javax.ws.rs.core.Response getSchemaResponse) {
response.complete(getSchemaResponse);
}

@Override
public void failed(Throwable throwable) {
response.completeExceptionally(throwable);
}
});
MultivaluedMap<String, Object> responseHeaders = response.join().getHeaders();
assertTrue(!responseHeaders.containsKey(HttpHeaders.CONTENT_ENCODING)
|| !responseHeaders.get(HttpHeaders.CONTENT_ENCODING).toString().contains("application/json"));
assertTrue(responseHeaders.containsKey(HttpHeaders.CONTENT_TYPE)
&& responseHeaders.get(HttpHeaders.CONTENT_TYPE).toString().contains("application/json"));
}

@EqualsAndHashCode
static class User implements Serializable {
private String name;
Expand Down

0 comments on commit 838c0c3

Please sign in to comment.