diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index be5badb19b1d7..2a117ec1df5db 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -398,6 +398,18 @@ protected void internalCreatePartitionedTopic(int numPartitions, boolean authori } } + protected void internalCreateNonPartitionedTopic(boolean authoritative) { + validateAdminAccessForTenant(topicName.getTenant()); + + try { + getOrCreateTopic(topicName); + log.info("[{}] Successfully created non-partitioned topic {}", clientAppId(), topicName); + } catch (Exception e) { + log.error("[{}] Failed to create non-partitioned topic {}", clientAppId(), topicName, e); + throw new RestException(e); + } + } + /** * It updates number of partitions of an existing non-global partitioned topic. It requires partitioned-topic to * already exist and number of new partitions must be greater than existing number of partitions. Decrementing diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java index 30cb4b323158d..a9c9369e7373c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java @@ -140,6 +140,22 @@ public void createPartitionedTopic(@PathParam("tenant") String tenant, @PathPara internalCreatePartitionedTopic(numPartitions, authoritative); } + @PUT + @Path("/{tenant}/{namespace}/{topic}") + @ApiOperation(value="Create a non-partitioned topic.", notes = "This is the only REST endpoint from which non-partitioned topics could be created.") + @ApiResponses(value = { + @ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 409, message = "Partitioned topic already exist"), + @ApiResponse(code = 412, message = "Failed Reason : Name is invalid or Namespace does not have any clusters configured"), + @ApiResponse(code = 503, message = "Failed to validate global cluster configuration") + }) + public void createNonPartitionedTopic(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace, + @PathParam("topic") @Encoded String encodedTopic, + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { + validateGlobalNamespaceOwnership(tenant,namespace); + internalCreateNonPartitionedTopic(authoritative); + } + /** * It updates number of partitions of an existing non-global partitioned topic. It requires partitioned-topic to be * already exist and number of new partitions must be greater than existing number of partitions. Decrementing diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java index 2d23f50a3932c..fdefd469c9cb1 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java @@ -117,7 +117,7 @@ public void testGetSubscriptions() { } @Test - public void testGetSubscriptionsWithAutoTopicCreationDisabled() { + public void testNonPartitionedTopics() { pulsar.getConfiguration().setAllowAutoTopicCreation(false); final String nonPartitionTopic = "non-partitioned-topic"; persistentTopics.createSubscription(testTenant, testNamespace, nonPartitionTopic, "test", true, (MessageIdImpl) MessageId.latest); @@ -126,5 +126,8 @@ public void testGetSubscriptionsWithAutoTopicCreationDisabled() { } catch (RestException exc) { Assert.assertTrue(exc.getMessage().contains("zero partitions")); } + final String nonPartitionTopic2 = "secondary-non-partitioned-topic"; + persistentTopics.createNonPartitionedTopic(testTenant, testNamespace, nonPartitionTopic2, true); + Assert.assertEquals(persistentTopics.getPartitionedMetadata(testTenant, testNamespace, nonPartitionTopic, true).partitions, 0); } } diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java index 9c442c73d2e0a..691d0abd66bb3 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java @@ -202,6 +202,18 @@ List getListInBundle(String namespace, String bundleRange) */ void createPartitionedTopic(String topic, int numPartitions) throws PulsarAdminException; + /** + * Create a non-partitioned topic. + * + *

+ * Create a non-partitioned topic. + *

+ * + * @param topic Topic name + * @throws PulsarAdminException + */ + void createNonPartitionedTopic(String topic) throws PulsarAdminException; + /** * Create a partitioned topic asynchronously. *

@@ -217,6 +229,13 @@ List getListInBundle(String namespace, String bundleRange) */ CompletableFuture createPartitionedTopicAsync(String topic, int numPartitions); + /** + * Create a non-partitioned topic asynchronously. + * + * @param topic Topic name + */ + CompletableFuture createNonPartitionedTopicAsync(String topic); + /** * Update number of partitions of a non-global partitioned topic. *

diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java index 763d2218d9e6c..91991c6af6ca6 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java @@ -206,6 +206,25 @@ public void createPartitionedTopic(String topic, int numPartitions) throws Pulsa } } + @Override + public void createNonPartitionedTopic(String topic) throws PulsarAdminException { + try { + createNonPartitionedTopicAsync(topic).get(); + } catch (ExecutionException e) { + throw (PulsarAdminException) e.getCause(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new PulsarAdminException(e.getCause()); + } + } + + @Override + public CompletableFuture createNonPartitionedTopicAsync(String topic){ + TopicName tn = validateTopic(topic); + WebTarget path = topicPath(tn); + return asyncPutRequest(path, Entity.entity("", MediaType.APPLICATION_JSON)); + } + @Override public CompletableFuture createPartitionedTopicAsync(String topic, int numPartitions) { checkArgument(numPartitions > 1, "Number of partitions should be more than 1"); diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java index d4c2cd3f60831..e852786fba9b6 100644 --- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java +++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java @@ -606,6 +606,9 @@ void topics() throws Exception { cmdTopics.run(split("create-partitioned-topic persistent://myprop/clust/ns1/ds1 --partitions 32")); verify(mockTopics).createPartitionedTopic("persistent://myprop/clust/ns1/ds1", 32); + cmdTopics.run(split("create persistent://myprop/clust/ns1/ds1")); + verify(mockTopics).createNonPartitionedTopic("persistent://myprop/clust/ns1/ds1"); + cmdTopics.run(split("list-partitioned-topics myprop/clust/ns1")); verify(mockTopics).getPartitionedTopicList("myprop/clust/ns1"); diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java index ceaebf4ea13a1..9fdce5e1aaf5b 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java @@ -82,6 +82,7 @@ public CmdTopics(PulsarAdmin admin) { jcommander.addCommand("expire-messages-all-subscriptions", new ExpireMessagesForAllSubscriptions()); jcommander.addCommand("create-partitioned-topic", new CreatePartitionedCmd()); + jcommander.addCommand("create", new CreateNonPartitionedCmd()); jcommander.addCommand("update-partitioned-topic", new UpdatePartitionedCmd()); jcommander.addCommand("get-partitioned-topic-metadata", new GetPartitionedTopicMetadataCmd()); @@ -213,6 +214,19 @@ void run() throws Exception { } } + @Parameters(commandDescription = "Create a non-partitioned topic.") + private class CreateNonPartitionedCmd extends CliCommand { + + @Parameter(description = "persistent://tenant/namespace/topic\n", required = true) + private java.util.List params; + + @Override + void run() throws Exception { + String topic = validateTopicName(params); + topics.createNonPartitionedTopic(topic); + } + } + @Parameters(commandDescription = "Update existing non-global partitioned topic. \n" + "\t\tNew updating number of partitions must be greater than existing number of partitions.") private class UpdatePartitionedCmd extends CliCommand { diff --git a/site2/docs/admin-api-partitioned-topics.md b/site2/docs/admin-api-partitioned-topics.md index 34670bbe4b6ac..d86a256591882 100644 --- a/site2/docs/admin-api-partitioned-topics.md +++ b/site2/docs/admin-api-partitioned-topics.md @@ -44,6 +44,33 @@ int numPartitions = 4; admin.persistentTopics().createPartitionedTopic(topicName, numPartitions); ``` +## Nonpartitioned topics resources + +### Create + +Nonpartitioned topics in Pulsar must be explicitly created if allowAutoTopicCreation or createIfMissing is disabled. +When creating a non-partitioned topic, you need to provide a topic name. + +#### pulsar-admin + +You can create non-partitioned topics using the [`create`](reference-pulsar-admin.md#create) +command and specifying the topic name as an argument. This is an example command: + +```shell +$ bin/pulsar-admin topics create persistent://my-tenant/my-namespace/my-topic +``` + +#### REST API + +{@inject: endpoint|PUT|admin/v2/persistent/:tenant/:namespace/:topic|operation/createNonPartitionedTopic} + +#### Java + +```java +String topicName = "persistent://my-tenant/my-namespace/my-topic"; +admin.topics().createNonPartitionedTopic(topicName); +``` + ### Get metadata Partitioned topics have metadata associated with them that you can fetch as a JSON object. diff --git a/site2/docs/reference-pulsar-admin.md b/site2/docs/reference-pulsar-admin.md index 65356ae8fde27..62a8d382cafeb 100644 --- a/site2/docs/reference-pulsar-admin.md +++ b/site2/docs/reference-pulsar-admin.md @@ -1684,6 +1684,7 @@ Subcommands * `offload-status` * `create-partitioned-topic` * `delete-partitioned-topic` +* `create` * `get-partitioned-topic-metadata` * `update-partitioned-topic` * `list` @@ -1773,7 +1774,6 @@ Options |---|---|---| |`-p`, `--partitions`|The number of partitions for the topic|0| - ### `delete-partitioned-topic` Delete a partitioned topic. This will also delete all the partitions of the topic if they exist. @@ -1782,6 +1782,14 @@ Usage $ pulsar-admin topics delete-partitioned-topic {persistent|non-persistent} ``` +### `create` +Creates a non-partitioned topic. A non-partitioned topic must explicitly be created by the user if allowAutoTopicCreation or createIfMissing is disabled. + +Usage +```bash +$ pulsar-admin topics create {persistent|non-persistent}://tenant/namespace/topic +``` + ### `get-partitioned-topic-metadata` Get the partitioned topic metadata. If the topic is not created or is a non-partitioned topic, this will return an empty topic with zero partitions.