Skip to content

Commit

Permalink
fix: return complete stream metadata on implicit creating (#230)
Browse files Browse the repository at this point in the history
Signed-off-by: Li Zhanhui <[email protected]>
  • Loading branch information
lizhanhui authored Oct 10, 2023
1 parent 450e0dc commit a32fe3b
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -932,23 +932,49 @@ public CompletableFuture<StreamMetadata> getStream(long topicId, int queueId, Lo
int nodeId = assignment.getDstNodeId();
long streamId = createStream(streamMapper, topicId, queueId, groupId, streamRole, nodeId);
session.commit();

Stream stream = streamMapper.getByStreamId(streamId);
return StreamMetadata.newBuilder()
.setStreamId(streamId)
.setState(StreamState.UNINITIALIZED)
.setStartOffset(0)
.setEpoch(stream.getEpoch())
.setRangeId(stream.getRangeId())
.setStartOffset(stream.getStartOffset())
// Stream is uninitialized, its end offset is definitely 0.
.setEndOffset(0)
.setState(stream.getState())
.build();
}

// For other types of streams, creation is explicit.
ControllerException e = new ControllerException(Code.NOT_FOUND_VALUE,
String.format("Stream for topic-id=%d, queue-id=%d, stream-role=%s is not found", topicId, queueId, streamRole.name()));
throw new CompletionException(e);
} else {
Stream stream = streams.get(0);
long endOffset = 0;
switch (stream.getState()) {
case UNINITIALIZED -> {
}
case DELETED -> {
ControllerException e = new ControllerException(Code.NOT_FOUND_VALUE,
String.format("Stream for topic-id=%d, queue-id=%d, stream-role=%s has been deleted",
topicId, queueId, streamRole.name()));
throw new CompletionException(e);
}
case CLOSING, OPEN, CLOSED -> {
RangeMapper rangeMapper = session.getMapper(RangeMapper.class);
Range range = rangeMapper.get(stream.getRangeId(), stream.getId(), null);
assert null != range;
endOffset = range.getEndOffset();
}
}
return StreamMetadata.newBuilder()
.setEpoch(stream.getEpoch())
.setStreamId(stream.getId())
.setEpoch(stream.getEpoch())
.setRangeId(stream.getRangeId())
.setState(stream.getState())
.setStartOffset(stream.getStartOffset())
.setEndOffset(endOffset)
.setState(stream.getState())
.build();
}
}
Expand Down Expand Up @@ -1176,7 +1202,7 @@ public CompletableFuture<StreamMetadata> openStream(long streamId, long epoch, i
// nodeId should be equal to stream.srcNodeId
if (nodeId != stream.getSrcNodeId()) {
LOGGER.warn("State of Stream[stream-id={}] is {}. Current owner should be {}, while {} " +
"is attempting to open. Fenced!", stream.getId(), stream.getState(), stream.getSrcNodeId(),
"is attempting to open. Fenced!", stream.getId(), stream.getState(), stream.getSrcNodeId(),
nodeId);
ControllerException e = new ControllerException(Code.FENCED_VALUE, "Node does not match");
future.completeExceptionally(e);
Expand Down Expand Up @@ -1230,7 +1256,6 @@ public CompletableFuture<StreamMetadata> openStream(long streamId, long epoch, i
}
}


// Now that the request is valid, update the stream's epoch and create a new range for this broker

// If stream.state == uninitialized, its stream.rangeId will be -1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1464,19 +1464,17 @@ public void testGetStream() throws IOException, ExecutionException, InterruptedE
stream.setTopicId(1L);
stream.setQueueId(2);
stream.setRangeId(0);
stream.setState(StreamState.CLOSED);
stream.setState(StreamState.UNINITIALIZED);
stream.setStreamRole(StreamRole.STREAM_ROLE_DATA);
stream.setStartOffset(1234);
streamMapper.create(stream);
dataStreamId = stream.getId();

stream.setState(StreamState.CLOSED);
stream.setStreamRole(StreamRole.STREAM_ROLE_OPS);
stream.setStartOffset(1234);
streamMapper.create(stream);
opsStreamId = stream.getId();

stream.setState(StreamState.CLOSED);
stream.setStreamRole(StreamRole.STREAM_ROLE_RETRY);
stream.setGroupId(3L);
stream.setStartOffset(1234);
Expand Down

0 comments on commit a32fe3b

Please sign in to comment.