Skip to content

Commit

Permalink
feat: define store facade (#265)
Browse files Browse the repository at this point in the history
Signed-off-by: Li Zhanhui <[email protected]>
  • Loading branch information
lizhanhui authored Oct 12, 2023
1 parent 47723dd commit 2469fbc
Show file tree
Hide file tree
Showing 10 changed files with 126 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package com.automq.rocketmq.broker;

import com.automq.rocketmq.broker.protocol.GrpcProtocolServer;
import com.automq.rocketmq.common.api.DataStore;
import com.automq.rocketmq.common.config.BrokerConfig;
import com.automq.rocketmq.common.util.Lifecycle;
import com.automq.rocketmq.controller.ControllerServiceImpl;
Expand All @@ -30,6 +31,7 @@
import com.automq.rocketmq.proxy.config.ProxyConfiguration;
import com.automq.rocketmq.proxy.processor.ExtendMessagingProcessor;
import com.automq.rocketmq.proxy.service.DefaultServiceManager;
import com.automq.rocketmq.store.DataStoreFacade;
import com.automq.rocketmq.store.MessageStoreBuilder;
import com.automq.rocketmq.store.api.MessageStore;
import org.apache.rocketmq.proxy.processor.MessagingProcessor;
Expand All @@ -53,14 +55,16 @@ public BrokerController(BrokerConfig brokerConfig) throws Exception {
ProxyConfiguration.intConfig(brokerConfig.proxy());

metadataStore = MetadataStoreBuilder.build(brokerConfig);
// Start the node registrar first, so that the node is registered before the proxy starts.
metadataStore.start();

proxyMetadataService = new DefaultProxyMetadataService(metadataStore);
storeMetadataService = new DefaultStoreMetadataService(metadataStore);

messageStore = MessageStoreBuilder.build(brokerConfig.store(), brokerConfig.s3Stream(), storeMetadataService);

DataStore dataStore = new DataStoreFacade(messageStore.getS3ObjectManager(), messageStore.getTopicQueueManager());
metadataStore.setDataStore(dataStore);


serviceManager = new DefaultServiceManager(brokerConfig.proxy(), proxyMetadataService, messageStore);
messagingProcessor = ExtendMessagingProcessor.createForS3RocketMQ(serviceManager);

Expand All @@ -73,6 +77,9 @@ public BrokerController(BrokerConfig brokerConfig) throws Exception {

@Override
public void start() throws Exception {
// Start the node registrar first, so that the node is registered before the proxy starts.
metadataStore.start();

messageStore.start();
messagingProcessor.start();
grpcServer.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,23 @@
* limitations under the License.
*/

package com.automq.rocketmq.common;
package com.automq.rocketmq.common.api;

import java.util.List;
import java.util.concurrent.CompletableFuture;

public interface StoreHandle {
CompletableFuture<Void> close(long topicId, int queueId);
public interface DataStore {

CompletableFuture<Void> closeQueue(long topicId, int queueId);

/**
* Delete a list of S3 objects by object id.
* <p>
* Regard non-exist object as success delete.
*
* @param objectIds the objects to delete.
* @return the future of delete result, contains the deleted object id.
*/
CompletableFuture<List<Long>> batchDeleteS3Objects(List<Long> objectIds);

}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import apache.rocketmq.controller.v1.S3WALObject;
import apache.rocketmq.controller.v1.StreamMetadata;
import apache.rocketmq.controller.v1.MessageType;
import com.automq.rocketmq.common.StoreHandle;
import com.automq.rocketmq.common.api.DataStore;
import com.automq.rocketmq.common.config.ControllerConfig;
import com.automq.rocketmq.controller.exception.ControllerException;
import com.automq.rocketmq.controller.metadata.database.dao.Lease;
Expand Down Expand Up @@ -62,9 +62,9 @@ public interface MetadataStore extends Closeable {

void setRole(Role role);

StoreHandle getStoreHandle();
DataStore getDataStore();

void setStoreHandle(StoreHandle storeHandle);
void setDataStore(DataStore dataStore);

void start();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
import apache.rocketmq.controller.v1.TrimStreamRequest;
import apache.rocketmq.controller.v1.UpdateTopicRequest;
import com.automq.rocketmq.common.PrefixThreadFactory;
import com.automq.rocketmq.common.StoreHandle;
import com.automq.rocketmq.common.api.DataStore;
import com.automq.rocketmq.common.system.S3Constants;
import com.automq.rocketmq.controller.exception.ControllerException;
import com.automq.rocketmq.controller.metadata.BrokerNode;
Expand Down Expand Up @@ -140,7 +140,7 @@ public class DefaultMetadataStore implements MetadataStore {

private final Gson gson;

private StoreHandle storeHandle;
private DataStore dataStore;

public DefaultMetadataStore(ControllerClient client, SqlSessionFactory sessionFactory, ControllerConfig config) {
this.controllerClient = client;
Expand Down Expand Up @@ -170,13 +170,13 @@ public ControllerClient controllerClient() {
return controllerClient;
}

public StoreHandle getStoreHandle() {
return storeHandle;
public DataStore getDataStore() {
return dataStore;
}

@Override
public void setStoreHandle(StoreHandle storeHandle) {
this.storeHandle = storeHandle;
public void setDataStore(DataStore dataStore) {
this.dataStore = dataStore;
}

public void start() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package com.automq.rocketmq.controller.metadata.database.tasks;

import apache.rocketmq.controller.v1.AssignmentStatus;
import com.automq.rocketmq.common.StoreHandle;
import com.automq.rocketmq.common.api.DataStore;
import com.automq.rocketmq.controller.metadata.MetadataStore;
import com.automq.rocketmq.controller.metadata.database.dao.QueueAssignment;
import com.automq.rocketmq.controller.metadata.database.mapper.QueueAssignmentMapper;
Expand Down Expand Up @@ -83,9 +83,9 @@ public int hashCode() {
}

public void doNext() {
StoreHandle storeHandle = metadataStore.getStoreHandle();
DataStore dataStore = metadataStore.getDataStore();
switch (next) {
case STORE_CLOSE -> closeQueue(storeHandle,
case STORE_CLOSE -> closeQueue(dataStore,
assignment.getTopicId(), assignment.getQueueId());

case NOTIFY_LEADER -> ScanYieldingQueueTask.this.metadataStore
Expand All @@ -101,8 +101,8 @@ public void doNext() {
}
}

private void closeQueue(StoreHandle handle, long topicId, int queueId) {
handle.close(topicId, queueId)
private void closeQueue(DataStore handle, long topicId, int queueId) {
handle.closeQueue(topicId, queueId)
.whenComplete((res, e) -> {
if (null != e) {
LOGGER.error("Failed to close queue[topic-id={}, queue-id={}]", topicId, queueId);
Expand Down Expand Up @@ -135,9 +135,9 @@ public void run() {
List<QueueAssignment> assignments = assignmentMapper.list(null, metadataStore.config().nodeId(),
null, AssignmentStatus.ASSIGNMENT_STATUS_YIELDING, this.lastScanTime);

StoreHandle storeHandle = metadataStore.getStoreHandle();
DataStore dataStore = metadataStore.getDataStore();

if (null != storeHandle && !assignments.isEmpty()) {
if (null != dataStore && !assignments.isEmpty()) {
for (QueueAssignment assignment : assignments) {
this.assignments.putIfAbsent(new ImmutablePair<>(assignment.getTopicId(), assignment.getQueueId()),
new QueueAssignmentStateMachine(assignment));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import com.automq.rocketmq.common.model.generated.FlatMessage;
import com.automq.rocketmq.store.api.LogicQueue;
import com.automq.rocketmq.store.api.MessageStore;
import com.automq.rocketmq.store.api.S3ObjectManager;
import com.automq.rocketmq.store.api.TopicQueueManager;
import com.automq.rocketmq.store.model.message.AckResult;
import com.automq.rocketmq.store.model.message.ChangeInvisibleDurationResult;
import com.automq.rocketmq.store.model.message.Filter;
Expand All @@ -35,6 +37,7 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.commons.lang3.tuple.Pair;

public class MockMessageStore implements MessageStore {
Expand All @@ -59,6 +62,16 @@ public void shutdown() {

}

@Override
public TopicQueueManager getTopicQueueManager() {
throw new NotImplementedException();
}

@Override
public S3ObjectManager getS3ObjectManager() {
throw new NotImplementedException();
}

@Override
public CompletableFuture<PopResult> pop(long consumerGroupId, long topicId, int queueId, Filter filter,
int batchSize, boolean fifo, boolean retry, long invisibleDuration) {
Expand Down
46 changes: 46 additions & 0 deletions store/src/main/java/com/automq/rocketmq/store/DataStoreFacade.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.automq.rocketmq.store;

import com.automq.rocketmq.common.api.DataStore;
import com.automq.rocketmq.store.api.S3ObjectManager;
import com.automq.rocketmq.store.api.TopicQueueManager;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class DataStoreFacade implements DataStore {

private final S3ObjectManager s3ObjectManager;

private final TopicQueueManager topicQueueManager;

public DataStoreFacade(S3ObjectManager s3ObjectManager, TopicQueueManager topicQueueManager) {
this.s3ObjectManager = s3ObjectManager;
this.topicQueueManager = topicQueueManager;
}

@Override
public CompletableFuture<Void> closeQueue(long topicId, int queueId) {
return topicQueueManager.close(topicId, queueId);
}

@Override
public CompletableFuture<List<Long>> batchDeleteS3Objects(List<Long> objectIds) {
return s3ObjectManager.delete(objectIds);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.automq.rocketmq.metadata.api.StoreMetadataService;
import com.automq.rocketmq.store.api.LogicQueue;
import com.automq.rocketmq.store.api.MessageStore;
import com.automq.rocketmq.store.api.S3ObjectManager;
import com.automq.rocketmq.store.api.StreamStore;
import com.automq.rocketmq.store.api.TopicQueueManager;
import com.automq.rocketmq.store.model.generated.ReceiptHandle;
Expand All @@ -36,6 +37,7 @@
import com.automq.rocketmq.store.service.api.KVService;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.lang3.NotImplementedException;

import static com.automq.rocketmq.store.util.SerializeUtil.decodeReceiptHandle;

Expand Down Expand Up @@ -71,6 +73,21 @@ public MessageStoreImpl(StoreConfig config, StreamStore streamStore,
this.reviveService = reviveService;
}

@Override
public TopicQueueManager getTopicQueueManager() {
return topicQueueManager;
}

/**
* TODO: implement it
*
* @return S3ObjectManager instance
*/
@Override
public S3ObjectManager getS3ObjectManager() {
throw new NotImplementedException();
}

@Override
public void start() throws Exception {
if (!started.compareAndSet(false, true)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@
import java.util.concurrent.CompletableFuture;

public interface MessageStore extends Lifecycle {

TopicQueueManager getTopicQueueManager();

S3ObjectManager getS3ObjectManager();

/**
* Pop message from specified topic and queue.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@

package com.automq.rocketmq.store.api;

import com.automq.rocketmq.common.StoreHandle;
import com.automq.rocketmq.common.util.Lifecycle;
import java.util.concurrent.CompletableFuture;

public interface TopicQueueManager extends Lifecycle, StoreHandle {
public interface TopicQueueManager extends Lifecycle {

CompletableFuture<LogicQueue> getOrCreate(long topicId, int queueId);

CompletableFuture<Void> close(long topicId, int queueId);
}

0 comments on commit 2469fbc

Please sign in to comment.