Skip to content

Commit

Permalink
fix: polish duplicated topic/group error handling (#253)
Browse files Browse the repository at this point in the history
Signed-off-by: Li Zhanhui <[email protected]>
  • Loading branch information
lizhanhui authored Oct 11, 2023
1 parent 88383bc commit da6a14b
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@ public ControllerServiceImpl(MetadataStore metadataStore) {
public void registerNode(NodeRegistrationRequest request,
StreamObserver<NodeRegistrationReply> responseObserver) {
metadataStore.registerBrokerNode(request.getBrokerName(), request.getAddress(),
request.getInstanceId()).whenComplete((res, e) -> {
request.getInstanceId())
.whenComplete((res, e) -> {
if (null != e) {
if (e.getCause() instanceof ControllerException ex) {
NodeRegistrationReply reply = NodeRegistrationReply.newBuilder()
Expand All @@ -104,8 +105,7 @@ public void registerNode(NodeRegistrationRequest request,
.build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
}
else {
} else {
responseObserver.onError(e);
}
} else {
Expand Down Expand Up @@ -152,18 +152,30 @@ public void createTopic(CreateTopicRequest request, StreamObserver<CreateTopicRe
}

try {
this.metadataStore.createTopic(request.getTopic(), request.getCount(), request.getAcceptMessageTypesList()).whenCompleteAsync((topicId, e) -> {
if (null != e) {
responseObserver.onError(e);
} else {
CreateTopicReply reply = CreateTopicReply.newBuilder()
.setStatus(Status.newBuilder().setCode(Code.OK).build())
.setTopicId(topicId)
.build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
}
});
this.metadataStore.createTopic(request.getTopic(), request.getCount(), request.getAcceptMessageTypesList())
.whenCompleteAsync((topicId, e) -> {
if (null != e) {
if (e instanceof ControllerException ex) {
CreateTopicReply reply = CreateTopicReply.newBuilder()
.setStatus(Status.newBuilder()
.setCode(Code.forNumber(ex.getErrorCode()))
.setMessage(ex.getMessage())
.build())
.build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
} else {
responseObserver.onError(e);
}
} else {
CreateTopicReply reply = CreateTopicReply.newBuilder()
.setStatus(Status.newBuilder().setCode(Code.OK).build())
.setTopicId(topicId)
.build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
}
});
} catch (ControllerException e) {
if (Code.DUPLICATED_VALUE == e.getErrorCode()) {
CreateTopicReply reply = CreateTopicReply.newBuilder()
Expand Down Expand Up @@ -244,7 +256,8 @@ public void updateTopic(UpdateTopicRequest request, StreamObserver<UpdateTopicRe
queueNumber = request.getCount();
}
this.metadataStore.updateTopic(request.getTopicId(), request.getName(), queueNumber,
request.getAcceptMessageTypesList()).whenComplete((res, e) -> {
request.getAcceptMessageTypesList())
.whenComplete((res, e) -> {
if (null != e) {
responseObserver.onError(e);
} else {
Expand Down Expand Up @@ -362,7 +375,8 @@ public void commitOffset(CommitOffsetRequest request, StreamObserver<CommitOffse
metadataStore.commitOffset(request.getGroupId(),
request.getQueue().getTopicId(),
request.getQueue().getQueueId(),
request.getOffset()).whenComplete((res, e) -> {
request.getOffset())
.whenComplete((res, e) -> {
if (null != e) {
if (e instanceof ControllerException ex) {
CommitOffsetReply reply = CommitOffsetReply.newBuilder()
Expand All @@ -387,29 +401,31 @@ public void commitOffset(CommitOffsetRequest request, StreamObserver<CommitOffse

@Override
public void createGroup(CreateGroupRequest request, StreamObserver<CreateGroupReply> responseObserver) {
try {
metadataStore.createGroup(request.getName(), request.getMaxRetryAttempt(), request.getGroupType(),
request.getDeadLetterTopicId()).whenComplete((groupId, e) -> {
if (null != e) {
responseObserver.onError(e);
} else {
metadataStore.createGroup(request.getName(), request.getMaxRetryAttempt(), request.getGroupType(),
request.getDeadLetterTopicId())
.whenComplete((groupId, e) -> {
if (null != e) {
if (e instanceof ControllerException ex) {
CreateGroupReply reply = CreateGroupReply.newBuilder()
.setGroupId(groupId)
.setStatus(Status.newBuilder().setCode(Code.OK).build())
.setStatus(Status.newBuilder()
.setCode(Code.forNumber(ex.getErrorCode()))
.setMessage(ex.getMessage())
.build())
.build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
} else {
responseObserver.onError(e);
}
});
} catch (ControllerException e) {
CreateGroupReply reply = CreateGroupReply.newBuilder()
.setStatus(Status.newBuilder()
.setCode(Code.forNumber(e.getErrorCode()))
.setMessage(e.getMessage()).build())
.build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
}
} else {
CreateGroupReply reply = CreateGroupReply.newBuilder()
.setGroupId(groupId)
.setStatus(Status.newBuilder().setCode(Code.OK).build())
.build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
}
});
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,8 @@ public void onSuccess(CreateTopicReply result) {
switch (result.getStatus().getCode()) {
case OK -> future.complete(result.getTopicId());
case DUPLICATED -> {
ControllerException e = new ControllerException(Code.DUPLICATED_VALUE, "Topic name is taken");
String msg = String.format("Topic name '%s' has been taken", request.getTopic());
ControllerException e = new ControllerException(Code.DUPLICATED_VALUE, msg);
future.completeExceptionally(e);
}
default -> {
Expand Down Expand Up @@ -368,7 +369,23 @@ public CompletableFuture<CreateGroupReply> createGroup(String target,
Futures.addCallback(this.buildStubForTarget(target).createGroup(request), new FutureCallback<>() {
@Override
public void onSuccess(CreateGroupReply result) {
future.complete(result);
switch (result.getStatus().getCode()) {
case OK -> {
future.complete(result);
}
case DUPLICATED -> {
LOGGER.info("Group name {} has been taken", request.getName());
ControllerException e = new ControllerException(result.getStatus().getCodeValue(),
result.getStatus().getMessage());
future.completeExceptionally(e);
}
default -> {
LOGGER.warn("Unexpected error while creating group {}", request.getName());
ControllerException e = new ControllerException(result.getStatus().getCodeValue(),
result.getStatus().getMessage());
future.completeExceptionally(e);
}
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,7 @@ CompletableFuture<List<QueueAssignment>> listAssignments(Long topicId, Integer s

CompletableFuture<Void> commitOffset(long groupId, long topicId, int queueId, long offset);

CompletableFuture<Long> createGroup(String groupName, int maxRetry, GroupType type,
long dlq) throws ControllerException;
CompletableFuture<Long> createGroup(String groupName, int maxRetry, GroupType type, long dlq);

CompletableFuture<StreamMetadata> getStream(long topicId, int queueId, Long groupId, StreamRole streamRole);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,7 @@ public CompletableFuture<Long> createTopic(String topicName, int queueNum,
if (null != topicMapper.get(null, topicName)) {
ControllerException e = new ControllerException(Code.DUPLICATED_VALUE, String.format("Topic %s was taken", topicName));
future.completeExceptionally(e);
return future;
}

Topic topic = new Topic();
Expand Down Expand Up @@ -820,8 +821,7 @@ public CompletableFuture<Long> getConsumerOffset(long consumerGroupId, long topi
}

@Override
public CompletableFuture<Long> createGroup(String groupName, int maxRetry, GroupType type,
long deadLetterTopicId) throws ControllerException {
public CompletableFuture<Long> createGroup(String groupName, int maxRetry, GroupType type, long deadLetterTopicId) {
CompletableFuture<Long> future = new CompletableFuture<>();
for (; ; ) {
if (isLeader()) {
Expand All @@ -834,6 +834,7 @@ public CompletableFuture<Long> createGroup(String groupName, int maxRetry, Group
if (!groups.isEmpty()) {
ControllerException e = new ControllerException(Code.DUPLICATED_VALUE, String.format("Group name '%s' is not available", groupName));
future.completeExceptionally(e);
return future;
}

Group group = new Group();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1521,6 +1521,12 @@ public void testCreateTopic_OpenStream_CloseStream() throws IOException, Executi
.addAllAcceptMessageTypes(messageTypeList)
.build()).get();

Assertions.assertThrows(ExecutionException.class, () -> client.createTopic(String.format("localhost:%d", port), CreateTopicRequest.newBuilder()
.setTopic(topicName)
.setCount(queueNum)
.addAllAcceptMessageTypes(messageTypeList)
.build()).get());

StreamMetadata metadata = metadataStore.getStream(topicId, 0, null, StreamRole.STREAM_ROLE_DATA).get();
streamId = metadata.getStreamId();
OpenStreamRequest request = OpenStreamRequest.newBuilder()
Expand Down

0 comments on commit da6a14b

Please sign in to comment.