Skip to content

Commit

Permalink
fix: fix openStream and closeStream to avoid duplicate opens on diffe…
Browse files Browse the repository at this point in the history
…rent nodes (#183)

Signed-off-by: wangxye <[email protected]>
  • Loading branch information
wangxye authored Oct 8, 2023
1 parent 3099c33 commit b9f040e
Show file tree
Hide file tree
Showing 9 changed files with 366 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,7 @@ public void createRetryStream(CreateRetryStreamRequest request,

@Override
public void openStream(OpenStreamRequest request, StreamObserver<OpenStreamReply> responseObserver) {
metadataStore.openStream(request.getStreamId(), request.getStreamEpoch()).whenComplete((metadata, e) -> {
metadataStore.openStream(request.getStreamId(), request.getStreamEpoch(), request.getBrokerId()).whenComplete((metadata, e) -> {
if (null != e) {
if (e instanceof ControllerException ex) {
OpenStreamReply reply = OpenStreamReply.newBuilder()
Expand All @@ -464,7 +464,7 @@ public void openStream(OpenStreamRequest request, StreamObserver<OpenStreamReply

@Override
public void closeStream(CloseStreamRequest request, StreamObserver<CloseStreamReply> responseObserver) {
metadataStore.closeStream(request.getStreamId(), request.getStreamEpoch()).whenComplete((res, e) -> {
metadataStore.closeStream(request.getStreamId(), request.getStreamEpoch(), request.getBrokerId()).whenComplete((res, e) -> {
if (null != e) {
if (e instanceof ControllerException ex) {
CloseStreamReply reply = CloseStreamReply.newBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,9 +131,9 @@ CompletableFuture<Long> createGroup(String groupName, int maxRetry, GroupType ty

CompletableFuture<Void> trimStream(long streamId, long streamEpoch, long newStartOffset) throws ControllerException;

CompletableFuture<StreamMetadata> openStream(long streamId, long streamEpoch);
CompletableFuture<StreamMetadata> openStream(long streamId, long streamEpoch, int nodeId);

CompletableFuture<Void> closeStream(long streamId, long streamEpoch);
CompletableFuture<Void> closeStream(long streamId, long streamEpoch, int nodeId);

CompletableFuture<List<StreamMetadata>> listOpenStreams(int nodeId);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -866,9 +866,7 @@ public CompletableFuture<StreamMetadata> getStream(long topicId, int queueId, Lo
int nodeId = assignments.isEmpty() ? 0 : assignments.get(0).getDstNodeId();
long streamId = createStream(streamMapper, topicId, queueId, groupId, streamRole, nodeId);
return StreamMetadata.newBuilder()
.setEpoch(0)
.setStreamId(streamId)
.setRangeId(0)
.setState(StreamState.UNINITIALIZED)
.setStartOffset(0)
.build();
Expand Down Expand Up @@ -1085,7 +1083,7 @@ public CompletableFuture<Void> trimStream(long streamId, long streamEpoch,
}

@Override
public CompletableFuture<StreamMetadata> openStream(long streamId, long streamEpoch) {
public CompletableFuture<StreamMetadata> openStream(long streamId, long streamEpoch, int nodeId) {
CompletableFuture<StreamMetadata> future = new CompletableFuture<>();
for (; ; ) {
if (isLeader()) {
Expand All @@ -1106,6 +1104,14 @@ public CompletableFuture<StreamMetadata> openStream(long streamId, long streamEp
return future;
}

if (nodeId != stream.getDstNodeId()) {
LOGGER.warn("stream {}'s dst node {} is not match request node {}",
streamId, stream.getDstNodeId(), nodeId);
ControllerException e = new ControllerException(Code.ILLEGAL_STATE_VALUE, "Node is not match");
future.completeExceptionally(e);
return future;
}

if (stream.getEpoch() > streamEpoch && !Objects.equals(stream.getSrcNodeId(), stream.getDstNodeId())) {
LOGGER.warn("stream {}'s epoch {} is larger than request epoch {}",
streamId, stream.getEpoch(), streamEpoch);
Expand Down Expand Up @@ -1226,7 +1232,7 @@ public CompletableFuture<StreamMetadata> openStream(long streamId, long streamEp
}

@Override
public CompletableFuture<Void> closeStream(long streamId, long streamEpoch) {
public CompletableFuture<Void> closeStream(long streamId, long streamEpoch, int nodeId) {
CompletableFuture<Void> future = new CompletableFuture<>();
for (; ; ) {
if (isLeader()) {
Expand All @@ -1245,6 +1251,15 @@ public CompletableFuture<Void> closeStream(long streamId, long streamEpoch) {
break;
}

// Verify node
if (nodeId != stream.getDstNodeId()) {
LOGGER.warn("stream {}'s dst node {} is not match request node {}",
streamId, stream.getDstNodeId(), nodeId);
ControllerException e = new ControllerException(Code.ILLEGAL_STATE_VALUE, "Node is not match");
future.completeExceptionally(e);
break;
}

// Verify epoch
if (streamEpoch < stream.getEpoch()) {
ControllerException e = new ControllerException(Code.FENCED_VALUE, "Stream epoch is deprecated");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import apache.rocketmq.controller.v1.ControllerServiceGrpc;
import apache.rocketmq.controller.v1.CreateGroupReply;
import apache.rocketmq.controller.v1.CreateGroupRequest;
import apache.rocketmq.controller.v1.CreateTopicRequest;
import apache.rocketmq.controller.v1.GroupType;
import apache.rocketmq.controller.v1.HeartbeatReply;
import apache.rocketmq.controller.v1.HeartbeatRequest;
Expand All @@ -40,6 +41,7 @@
import apache.rocketmq.controller.v1.PrepareS3ObjectsReply;
import apache.rocketmq.controller.v1.PrepareS3ObjectsRequest;
import apache.rocketmq.controller.v1.S3ObjectState;
import apache.rocketmq.controller.v1.StreamMetadata;
import apache.rocketmq.controller.v1.StreamRole;
import apache.rocketmq.controller.v1.StreamState;
import apache.rocketmq.controller.v1.TopicStatus;
Expand Down Expand Up @@ -526,7 +528,7 @@ public void testOpenStream() throws IOException, ExecutionException, Interrupted
int queueId = 2;
int srcNodeId = 1;
int dstNodeId = 2;
long streamId;
long streamId, streamEpoch;

try (SqlSession session = getSessionFactory().openSession()) {
StreamMapper streamMapper = session.getMapper(StreamMapper.class);
Expand All @@ -540,6 +542,7 @@ public void testOpenStream() throws IOException, ExecutionException, Interrupted
stream.setDstNodeId(dstNodeId);
streamMapper.create(stream);
streamId = stream.getId();
streamEpoch = stream.getEpoch();
session.commit();
}

Expand All @@ -556,8 +559,8 @@ public void testOpenStream() throws IOException, ExecutionException, Interrupted
int port = testServer.getPort();
OpenStreamRequest request = OpenStreamRequest.newBuilder()
.setStreamId(streamId)
.setStreamEpoch(0)
.setBrokerEpoch(1)
.setStreamEpoch(streamEpoch)
.setBrokerId(2)
.build();
client.openStream(String.format("localhost:%d", port), request).get();
}
Expand Down Expand Up @@ -693,7 +696,7 @@ public void testOpenStream_Fenced() throws IOException, ExecutionException, Inte
OpenStreamRequest request = OpenStreamRequest.newBuilder()
.setStreamId(streamId)
.setStreamEpoch(0)
.setBrokerEpoch(1)
.setBrokerId(2)
.build();
OpenStreamReply reply = client.openStream(String.format("localhost:%d", port), request).get();
Assertions.assertEquals(Code.FENCED, reply.getStatus().getCode());
Expand Down Expand Up @@ -1554,4 +1557,105 @@ public void test3WALObjects_2PC() throws IOException, ExecutionException, Interr
}

}


@Test
public void testCreateTopic_OpenStream_CloseStream() throws IOException, ExecutionException, InterruptedException, ControllerException {
ControllerClient controllerClient = Mockito.mock(ControllerClient.class);
ControllerConfig controllerConfig = Mockito.mock(ControllerConfig.class);
Mockito.when(controllerConfig.nodeId()).thenReturn(1);
Mockito.when(controllerConfig.scanIntervalInSecs()).thenReturn(1);
Mockito.when(controllerConfig.leaseLifeSpanInSecs()).thenReturn(2);
Mockito.when(controllerConfig.scanIntervalInSecs()).thenReturn(1);

long streamId, streamEpoch;
int rangeId;
int nodeId = 1;

String topicName = "t1";
int queueNum = 4;
List<MessageType> messageTypeList = new ArrayList<>();
messageTypeList.add(MessageType.NORMAL);
messageTypeList.add(MessageType.FIFO);
messageTypeList.add(MessageType.DELAY);

try (MetadataStore metadataStore = new DefaultMetadataStore(controllerClient, getSessionFactory(), controllerConfig)) {
metadataStore.start();
Awaitility.await().with().pollInterval(100, TimeUnit.MILLISECONDS)
.atMost(10, TimeUnit.SECONDS)
.until(metadataStore::isLeader);

try (ControllerTestServer testServer = new ControllerTestServer(0, new ControllerServiceImpl(metadataStore));
ControllerClient client = new GrpcControllerClient()
) {
testServer.start();
int port = testServer.getPort();
String name = "broker-0";
String address = "localhost:1234";
String instanceId = "i-register";

Node node = new Node();
node.setName(name);
node.setAddress(address);
node.setInstanceId(instanceId);
node.setId(nodeId);
((DefaultMetadataStore) metadataStore).addBrokerNode(node);

Long topicId = client.createTopic(String.format("localhost:%d", port), CreateTopicRequest.newBuilder()
.setTopic(topicName)
.setCount(queueNum)
.addAllAcceptMessageTypes(messageTypeList)
.build()).get();

StreamMetadata metadata = metadataStore.getStream(topicId, 0, null, StreamRole.STREAM_ROLE_DATA).get();
streamId = metadata.getStreamId();
streamEpoch = metadata.getEpoch();
OpenStreamRequest request = OpenStreamRequest.newBuilder()
.setStreamId(metadata.getStreamId())
.setStreamEpoch(metadata.getEpoch())
.setBrokerId(nodeId)
.build();

client.openStream(String.format("localhost:%d", port), request).get();
OpenStreamReply reply = client.openStream(String.format("localhost:%d", port), request).get();
StreamMetadata openStream = reply.getStreamMetadata();
Assertions.assertEquals(0, openStream.getStartOffset());
Assertions.assertEquals(streamEpoch + 1, openStream.getEpoch());
Assertions.assertEquals(0, openStream.getRangeId());
Assertions.assertEquals(StreamState.OPEN, openStream.getState());
rangeId = openStream.getRangeId();

client.closeStream(String.format("localhost:%d", port), CloseStreamRequest.newBuilder()
.setStreamId(streamId)
.setStreamEpoch(streamEpoch + 1)
.setBrokerId(nodeId)
.build()).get();

client.closeStream(String.format("localhost:%d", port), CloseStreamRequest.newBuilder()
.setStreamId(streamId)
.setStreamEpoch(streamEpoch + 1)
.setBrokerId(nodeId)
.build()).get();
}
}
long targetStreamEpoch = streamEpoch + 1;
try (SqlSession session = getSessionFactory().openSession()) {
StreamMapper streamMapper = session.getMapper(StreamMapper.class);
RangeMapper rangeMapper = session.getMapper(RangeMapper.class);

Stream stream = streamMapper.getByStreamId(streamId);
Assertions.assertEquals(streamId, stream.getId());
Assertions.assertEquals(0, stream.getStartOffset());
Assertions.assertEquals(targetStreamEpoch, stream.getEpoch());
Assertions.assertEquals(0, stream.getRangeId());
Assertions.assertEquals(StreamState.CLOSED, stream.getState());

Range range = rangeMapper.get(rangeId, streamId, null);
Assertions.assertEquals(0, range.getRangeId());
Assertions.assertEquals(streamId, range.getStreamId());
Assertions.assertEquals(targetStreamEpoch, range.getEpoch());
Assertions.assertEquals(0, range.getStartOffset());
Assertions.assertEquals(0, range.getEndOffset());
}
}
}
Loading

0 comments on commit b9f040e

Please sign in to comment.