Skip to content

Commit

Permalink
fix: revamp open stream implementation (#223)
Browse files Browse the repository at this point in the history
Signed-off-by: Li Zhanhui <[email protected]>
  • Loading branch information
lizhanhui authored Oct 10, 2023
1 parent 4945b61 commit ee104e5
Show file tree
Hide file tree
Showing 5 changed files with 105 additions and 116 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1150,7 +1150,7 @@ public CompletableFuture<Void> trimStream(long streamId, long streamEpoch,
}

@Override
public CompletableFuture<StreamMetadata> openStream(long streamId, long streamEpoch, int nodeId) {
public CompletableFuture<StreamMetadata> openStream(long streamId, long epoch, int nodeId) {
CompletableFuture<StreamMetadata> future = new CompletableFuture<>();
for (; ; ) {
if (isLeader()) {
Expand All @@ -1160,136 +1160,136 @@ public CompletableFuture<StreamMetadata> openStream(long streamId, long streamEp
}
StreamMapper streamMapper = session.getMapper(StreamMapper.class);
RangeMapper rangeMapper = session.getMapper(RangeMapper.class);
long nextStreamEpoch = streamEpoch + 1;
// verify epoch match
Stream stream = streamMapper.getByStreamId(streamId);
if (null == stream) {
// Verify target stream exists
if (null == stream || stream.getState() == StreamState.DELETED) {
ControllerException e = new ControllerException(Code.NOT_FOUND_VALUE,
String.format("Stream[stream-id=%d] is not found", streamId)
);
future.completeExceptionally(e);
return future;
}

if (nodeId != stream.getDstNodeId()) {
LOGGER.warn("dst-node-id of stream {} does not match. Expecting {}, actual request node {}",
streamId, stream.getDstNodeId(), nodeId);
ControllerException e = new ControllerException(Code.ILLEGAL_STATE_VALUE, "Node is not match");
future.completeExceptionally(e);
return future;
// Verify stream owner is correct
switch (stream.getState()) {
case CLOSING -> {
// nodeId should be equal to stream.srcNodeId
if (nodeId != stream.getSrcNodeId()) {
LOGGER.warn("State of Stream[stream-id={}] is {}. Current owner should be {}, while {} " +
"is attempting to open. Fenced!", stream.getId(), stream.getState(), stream.getSrcNodeId(),
nodeId);
ControllerException e = new ControllerException(Code.FENCED_VALUE, "Node does not match");
future.completeExceptionally(e);
return future;
}
}
case OPEN, CLOSED, UNINITIALIZED -> {
// nodeId should be equal to stream.dstNodeId
if (nodeId != stream.getDstNodeId()) {
LOGGER.warn("State of Stream[stream-id={}] is {}. Its current owner is {}, {} is attempting to open. Fenced!",
streamId, stream.getState(), stream.getDstNodeId(), nodeId);
ControllerException e = new ControllerException(Code.FENCED_VALUE, "Node does not match");
future.completeExceptionally(e);
return future;
}
}
}

if (stream.getEpoch() > streamEpoch && !Objects.equals(stream.getSrcNodeId(), stream.getDstNodeId())) {
LOGGER.warn("stream {}'s epoch {} is larger than request epoch {}",
streamId, stream.getEpoch(), streamEpoch);
// Verify epoch
if (epoch != stream.getEpoch()) {
LOGGER.warn("Epoch of Stream[stream-id={}] is {}, while the open stream request epoch is {}",
streamId, stream.getEpoch(), epoch);
ControllerException e = new ControllerException(Code.FENCED_VALUE, "Epoch of stream is deprecated");
future.completeExceptionally(e);
return future;
}

if (stream.getEpoch() == nextStreamEpoch) {
// broker may use the same epoch to open -> close -> open stream.
// verify broker
Range range = rangeMapper.get(stream.getRangeId(), streamId, null);

if (Objects.isNull(range)) {
LOGGER.warn("stream {}'s current range {} not exist when open stream with epoch: {}",
streamId, stream.getEpoch(), streamEpoch);
ControllerException e = new ControllerException(Code.ILLEGAL_STATE_VALUE, "Expected range is missing");
future.completeExceptionally(e);
return future;
}

// ensure that the broker corresponding to the range is alive
if (!this.nodes.containsKey(range.getBrokerId())
|| !this.nodes.get(range.getBrokerId()).isAlive(config)) {
LOGGER.warn("Node[node-id={}] that backs up Stream[stream-id={}, epoch={}] does not exist or is inactive",
range.getBrokerId(), streamId, stream.getEpoch());
ControllerException e = new ControllerException(Code.ILLEGAL_STATE_VALUE,
String.format("Node[node-id=%d] backing up the stream is inactive", range.getBrokerId()));
future.completeExceptionally(e);
return future;
}

// epoch equals, node equals, regard it as redundant open operation, just return success
if (stream.getState() == StreamState.CLOSED) {
streamMapper.updateStreamState(streamId, stream.getTopicId(), stream.getQueueId(),
StreamState.OPEN);
// Commit transaction
session.commit();
// Verify that current stream state allows open ops
switch (stream.getState()) {
case CLOSING, OPEN -> {
LOGGER.warn("Stream[stream-id={}] is already OPEN with epoch={}", streamId, stream.getEpoch());
Range range = rangeMapper.get(stream.getRangeId(), streamId, null);
StreamMetadata metadata = StreamMetadata.newBuilder()
.setStreamId(streamId)
.setEpoch(stream.getEpoch())
.setEpoch(epoch)
.setRangeId(stream.getRangeId())
.setStartOffset(stream.getStartOffset())
.setEndOffset(range.getEndOffset())
.setState(StreamState.OPEN)
.setState(stream.getState())
.build();
future.complete(metadata);
return future;
}
case UNINITIALIZED, CLOSED -> {
}
default -> {
String msg = String.format("State of Stream[stream-id=%d] is %s, which is not supported",
stream.getId(), stream.getState());
future.completeExceptionally(new ControllerException(Code.ILLEGAL_STATE_VALUE, msg));
return future;
}
}

int rangeId = stream.getRangeId() + 1;
Range prevRange = rangeMapper.get(rangeId - 1, streamId, null);
// Make open reentrant
if (stream.getState() == StreamState.OPEN) {
LOGGER.warn("Stream[stream-id={}] is already OPEN with epoch={}", streamId, stream.getEpoch());
StreamMetadata metadata = StreamMetadata.newBuilder()
.setStreamId(streamId)
.setEpoch(nextStreamEpoch)
.setRangeId(stream.getRangeId())
.setStartOffset(stream.getStartOffset())
.setEndOffset(prevRange.getEndOffset())
.setState(StreamState.OPEN)
.build();
future.complete(metadata);
return future;
}
// now the request in valid, update the stream's epoch and create a new range for this broker

// Now that the request is valid, update the stream's epoch and create a new range for this broker

// If stream.state == uninitialized, its stream.rangeId will be -1;
// If stream.state == closed, stream.rangeId will be the previous one;

// get new range's start offset
// default regard this range is the first range in stream, use 0 as start offset
// if stream is not uninitialized, use previous range's end offset as start offset
long startOffset = stream.getState() == StreamState.UNINITIALIZED ? 0 : prevRange.getEndOffset();
long startOffset;
if (StreamState.UNINITIALIZED == stream.getState()) {
// default regard this range is the first range in stream, use 0 as start offset
startOffset = 0;
} else {
assert StreamState.CLOSED == stream.getState();
Range prevRange = rangeMapper.get(stream.getRangeId(), streamId, null);
// if stream is closed, use previous range's end offset as start offset
startOffset = prevRange.getEndOffset();
}

// stream update record
stream.setEpoch(nextStreamEpoch);
stream.setRangeId(rangeId);
// Increase stream epoch
stream.setEpoch(epoch + 1);
// Increase range-id
stream.setRangeId(stream.getRangeId() + 1);
stream.setStartOffset(stream.getStartOffset());
stream.setState(StreamState.OPEN);
streamMapper.update(stream);

// range create record
// Create a new range for the stream
Range range = new Range();
range.setStreamId(streamId);
range.setBrokerId(stream.getDstNodeId());
range.setStartOffset(startOffset);
range.setEndOffset(startOffset);
range.setEpoch(nextStreamEpoch);
range.setRangeId(rangeId);
range.setEpoch(epoch + 1);
range.setRangeId(stream.getRangeId());
rangeMapper.create(range);
LOGGER.info("Node[node-id={}] opens stream [stream-id={}] with epoch={}",
this.lease.getNodeId(), streamId, streamEpoch);

this.lease.getNodeId(), streamId, epoch + 1);
// Commit transaction
session.commit();

// Build open stream response
StreamMetadata metadata = StreamMetadata.newBuilder()
.setStreamId(streamId)
.setEpoch(nextStreamEpoch)
.setRangeId(rangeId)
.setEpoch(epoch + 1)
.setRangeId(stream.getRangeId())
.setStartOffset(stream.getStartOffset())
.setEndOffset(range.getEndOffset())
.setState(StreamState.OPEN)
.build();
future.complete(metadata);
return future;
} catch (Throwable e) {
LOGGER.error("Unexpected exception raised while open stream", e);
future.completeExceptionally(e);
return future;
}
} else {
OpenStreamRequest request = OpenStreamRequest.newBuilder()
.setStreamId(streamId)
.setStreamEpoch(streamEpoch)
.setStreamEpoch(epoch)
.build();
try {
return this.controllerClient.openStream(this.leaderAddress(), request)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ public void setProperties(Properties properties) {
if (null == dataSource) {
HikariConfig config = new HikariConfig(properties);
config.setMaximumPoolSize(10);
config.setIdleTimeout(100);
config.setLeakDetectionThreshold(2100);
config.setIdleTimeout(10000);
config.setLeakDetectionThreshold(20000);
config.setMaxLifetime(30000);
dataSource = new HikariDataSource(config);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,5 +56,5 @@ List<Group> list(@Param("id") Long id,
* @param id Optional group-id to delete
* @return Number of rows affected.
*/
int delete(Long id);
int delete(@Param("id") Long id);
}
Original file line number Diff line number Diff line change
Expand Up @@ -1507,7 +1507,8 @@ public void testCreateTopic_OpenStream_CloseStream() throws IOException, Executi
.until(metadataStore::isLeader);

try (ControllerTestServer testServer = new ControllerTestServer(0, new ControllerServiceImpl(metadataStore));
ControllerClient client = new GrpcControllerClient()
ControllerClient client = new GrpcControllerClient();
SqlSession session = getSessionFactory().openSession()
) {
testServer.start();
int port = testServer.getPort();
Expand All @@ -1530,53 +1531,44 @@ public void testCreateTopic_OpenStream_CloseStream() throws IOException, Executi

StreamMetadata metadata = metadataStore.getStream(topicId, 0, null, StreamRole.STREAM_ROLE_DATA).get();
streamId = metadata.getStreamId();
streamEpoch = metadata.getEpoch();
OpenStreamRequest request = OpenStreamRequest.newBuilder()
.setStreamId(metadata.getStreamId())
.setStreamEpoch(metadata.getEpoch())
.setBrokerId(nodeId)
.build();

client.openStream(String.format("localhost:%d", port), request).get();
OpenStreamReply reply = client.openStream(String.format("localhost:%d", port), request).get();
StreamMetadata openStream = reply.getStreamMetadata();
Assertions.assertEquals(0, openStream.getStartOffset());
Assertions.assertEquals(streamEpoch + 1, openStream.getEpoch());
Assertions.assertEquals(metadata.getEpoch() + 1, openStream.getEpoch());
Assertions.assertEquals(0, openStream.getRangeId());
Assertions.assertEquals(StreamState.OPEN, openStream.getState());
rangeId = openStream.getRangeId();

client.closeStream(String.format("localhost:%d", port), CloseStreamRequest.newBuilder()
.setStreamId(streamId)
.setStreamEpoch(streamEpoch + 1)
.setStreamEpoch(openStream.getEpoch())
.setBrokerId(nodeId)
.build()).get();

client.closeStream(String.format("localhost:%d", port), CloseStreamRequest.newBuilder()
.setStreamId(streamId)
.setStreamEpoch(streamEpoch + 1)
.setBrokerId(nodeId)
.build()).get();
// Verify
StreamMapper streamMapper = session.getMapper(StreamMapper.class);
RangeMapper rangeMapper = session.getMapper(RangeMapper.class);

Stream stream = streamMapper.getByStreamId(streamId);
Assertions.assertEquals(streamId, stream.getId());
Assertions.assertEquals(0, stream.getStartOffset());
Assertions.assertEquals(openStream.getEpoch(), stream.getEpoch());
Assertions.assertEquals(0, stream.getRangeId());
Assertions.assertEquals(StreamState.CLOSED, stream.getState());

Range range = rangeMapper.get(rangeId, streamId, null);
Assertions.assertEquals(0, range.getRangeId());
Assertions.assertEquals(streamId, range.getStreamId());
Assertions.assertEquals(openStream.getEpoch(), range.getEpoch());
Assertions.assertEquals(0, range.getStartOffset());
Assertions.assertEquals(0, range.getEndOffset());
}
}
long targetStreamEpoch = streamEpoch + 1;
try (SqlSession session = getSessionFactory().openSession()) {
StreamMapper streamMapper = session.getMapper(StreamMapper.class);
RangeMapper rangeMapper = session.getMapper(RangeMapper.class);

Stream stream = streamMapper.getByStreamId(streamId);
Assertions.assertEquals(streamId, stream.getId());
Assertions.assertEquals(0, stream.getStartOffset());
Assertions.assertEquals(targetStreamEpoch, stream.getEpoch());
Assertions.assertEquals(0, stream.getRangeId());
Assertions.assertEquals(StreamState.CLOSED, stream.getState());

Range range = rangeMapper.get(rangeId, streamId, null);
Assertions.assertEquals(0, range.getRangeId());
Assertions.assertEquals(streamId, range.getStreamId());
Assertions.assertEquals(targetStreamEpoch, range.getEpoch());
Assertions.assertEquals(0, range.getStartOffset());
Assertions.assertEquals(0, range.getEndOffset());
}
}
}
Loading

0 comments on commit ee104e5

Please sign in to comment.