Skip to content

Commit

Permalink
fix(store): fill the current node id when commiting wal object
Browse files Browse the repository at this point in the history
Signed-off-by: daniel-y <[email protected]>
  • Loading branch information
daniel-y committed Oct 10, 2023
1 parent a32fe3b commit ce8ad4e
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,10 @@ public CompletableFuture<Long> prepareS3Objects(int count, int ttlInMinutes) {
@Override
public CompletableFuture<Void> commitWalObject(S3WALObject walObject, List<S3StreamObject> streamObjects,
List<Long> compactedObjects) {
return metadataStore.commitWalObject(walObject, streamObjects, compactedObjects);
// The underlying storage layer does not know the current node id when constructing the WAL object.
// So we should fill it here.
S3WALObject newWal = S3WALObject.newBuilder(walObject).setBrokerId(metadataStore.config().nodeId()).build();
return metadataStore.commitWalObject(newWal, streamObjects, compactedObjects);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,44 @@
package com.automq.rocketmq.metadata;

import apache.rocketmq.controller.v1.Code;
import apache.rocketmq.controller.v1.S3WALObject;
import apache.rocketmq.controller.v1.StreamMetadata;
import apache.rocketmq.controller.v1.StreamRole;
import com.automq.rocketmq.common.config.ControllerConfig;
import com.automq.rocketmq.controller.exception.ControllerException;
import com.automq.rocketmq.controller.metadata.MetadataStore;
import java.util.ArrayList;
import java.util.concurrent.CompletableFuture;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;

import static org.mockito.Mockito.when;

@ExtendWith(MockitoExtension.class)
class DefaultStoreMetadataServiceTest {

private final ControllerConfig config;
@Mock
private ControllerConfig config;
@Mock
private MetadataStore metadataStore;

@Test
public void testCommitWalObject() {
DefaultStoreMetadataService service = new DefaultStoreMetadataService(metadataStore);
S3WALObject walObject = S3WALObject.newBuilder().setObjectId(1L).setBrokerId(10).build();
int nodeId = 100;
when(metadataStore.config()).thenReturn(config);
when(config.nodeId()).thenReturn(nodeId);

public DefaultStoreMetadataServiceTest() {
this.config = Mockito.mock(ControllerConfig.class);
Mockito.when(this.config.nodeId()).thenReturn(1);
Mockito.when(this.config.epoch()).thenReturn(1L);
service.commitWalObject(walObject, new ArrayList<>(), new ArrayList<>());
// Verify the arguments passed to metadataStore.commitWalObject().
S3WALObject newWal = S3WALObject.newBuilder(walObject).setBrokerId(nodeId).build();
Mockito.verify(metadataStore).commitWalObject(ArgumentMatchers.eq(newWal), ArgumentMatchers.anyList(), ArgumentMatchers.anyList());
}

@Test
Expand All @@ -45,78 +64,67 @@ public void testGetStreamId() {
StreamMetadata metadata = StreamMetadata.newBuilder()
.setStreamId(1L).build();
future.complete(metadata);
MetadataStore metadataStore = Mockito.mock(MetadataStore.class);
Mockito.when(metadataStore.getStream(ArgumentMatchers.anyLong(), ArgumentMatchers.anyInt(),
when(metadataStore.getStream(ArgumentMatchers.anyLong(), ArgumentMatchers.anyInt(),
ArgumentMatchers.nullable(Long.class), ArgumentMatchers.eq(StreamRole.STREAM_ROLE_DATA)))
.thenReturn(future);
Mockito.when(metadataStore.config()).thenReturn(this.config);

DefaultStoreMetadataService service = new DefaultStoreMetadataService(metadataStore);
Assertions.assertEquals(1L, service.getStreamId(1L, 2));
}

@Test
public void testGetStreamId_throws() {
MetadataStore metadataStore = Mockito.mock(MetadataStore.class);
CompletableFuture<StreamMetadata> future = new CompletableFuture<>();
future.completeExceptionally(new ControllerException(Code.NOT_FOUND_VALUE, "not found"));
Mockito.when(metadataStore.getStream(ArgumentMatchers.anyLong(), ArgumentMatchers.anyInt(),
when(metadataStore.getStream(ArgumentMatchers.anyLong(), ArgumentMatchers.anyInt(),
ArgumentMatchers.nullable(Long.class), ArgumentMatchers.eq(StreamRole.STREAM_ROLE_DATA)))
.thenReturn(future);
Mockito.when(metadataStore.config()).thenReturn(this.config);
DefaultStoreMetadataService service = new DefaultStoreMetadataService(metadataStore);
Assertions.assertEquals(-1L, service.getStreamId(1L, 2));
}

@Test
public void testGetOperationLogStreamId() {
MetadataStore metadataStore = Mockito.mock(MetadataStore.class);
CompletableFuture<StreamMetadata> future = new CompletableFuture<>();
StreamMetadata metadata = StreamMetadata.newBuilder()
.setStreamId(1L).build();
future.complete(metadata);
Mockito.when(metadataStore.getStream(ArgumentMatchers.anyLong(), ArgumentMatchers.anyInt(),
when(metadataStore.getStream(ArgumentMatchers.anyLong(), ArgumentMatchers.anyInt(),
ArgumentMatchers.nullable(Long.class), ArgumentMatchers.eq(StreamRole.STREAM_ROLE_OPS)))
.thenReturn(future);
Mockito.when(metadataStore.config()).thenReturn(this.config);
DefaultStoreMetadataService service = new DefaultStoreMetadataService(metadataStore);
Assertions.assertEquals(1L, service.getOperationLogStreamId(1L, 2));
}

@Test
public void testGetOperationLogStreamId_throws() {
MetadataStore metadataStore = Mockito.mock(MetadataStore.class);
CompletableFuture<StreamMetadata> future = new CompletableFuture<>();
future.completeExceptionally(new ControllerException(Code.NOT_FOUND_VALUE, "not found"));
Mockito.when(metadataStore.getStream(ArgumentMatchers.anyLong(), ArgumentMatchers.anyInt(),
when(metadataStore.getStream(ArgumentMatchers.anyLong(), ArgumentMatchers.anyInt(),
ArgumentMatchers.nullable(Long.class), ArgumentMatchers.eq(StreamRole.STREAM_ROLE_OPS)))
.thenReturn(future);
Mockito.when(metadataStore.config()).thenReturn(this.config);
DefaultStoreMetadataService service = new DefaultStoreMetadataService(metadataStore);
Assertions.assertEquals(-1L, service.getOperationLogStreamId(1L, 2));
}

@Test
public void testGetRetryStreamId() {
MetadataStore metadataStore = Mockito.mock(MetadataStore.class);
CompletableFuture<StreamMetadata> future = new CompletableFuture<>();
StreamMetadata metadata = StreamMetadata.newBuilder()
.setStreamId(1L).build();
future.complete(metadata);
Mockito.when(metadataStore.getStream(ArgumentMatchers.anyLong(), ArgumentMatchers.anyInt(),
when(metadataStore.getStream(ArgumentMatchers.anyLong(), ArgumentMatchers.anyInt(),
ArgumentMatchers.nullable(Long.class), ArgumentMatchers.eq(StreamRole.STREAM_ROLE_RETRY)))
.thenReturn(future);
Mockito.when(metadataStore.config()).thenReturn(this.config);
DefaultStoreMetadataService service = new DefaultStoreMetadataService(metadataStore);
Assertions.assertEquals(1L, service.getRetryStreamId(3L, 1L, 2));
}

@Test
public void testGetRetryStreamId_throws() {
MetadataStore metadataStore = Mockito.mock(MetadataStore.class);
CompletableFuture<StreamMetadata> future = new CompletableFuture<>();
future.completeExceptionally(new ControllerException(Code.NOT_FOUND_VALUE, "not found"));
Mockito.when(metadataStore.getStream(ArgumentMatchers.anyLong(), ArgumentMatchers.anyInt(),
when(metadataStore.getStream(ArgumentMatchers.anyLong(), ArgumentMatchers.anyInt(),
ArgumentMatchers.nullable(Long.class), ArgumentMatchers.eq(StreamRole.STREAM_ROLE_RETRY)))
.thenReturn(future);
DefaultStoreMetadataService service = new DefaultStoreMetadataService(metadataStore);
Expand Down

0 comments on commit ce8ad4e

Please sign in to comment.