Skip to content

Commit

Permalink
Support tiered storage and rotating zone ranges automatically + polish
Browse files Browse the repository at this point in the history
  • Loading branch information
JamesChenX committed Jan 22, 2022
1 parent 158c422 commit 937c564
Show file tree
Hide file tree
Showing 23 changed files with 317 additions and 167 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/deploy-playground.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ jobs:
sudo docker-compose -f docker-compose.standalone.yml down
sudo docker system prune -a -f
sudo docker system prune --volumes -f
rm -rf /var/log/journal
sudo rm -rf /var/log/journal
sudo git pull
sudo git reset --hard
sudo ENV=dev,demo docker-compose -f docker-compose.standalone.yml --profile monitoring up --force-recreate -d
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ Note: The main disadvantage of the current Turms project is that it does not pro

### Business Features

1. Support a complete set of [IM features](https://turms-im.github.io/docs/features). Turms supports almost all IM features supported by commercial instant messaging products and no restrictions on business features. And Turms also supports advanced features such as unwanted words filtering (using Aho-Corasick automaton with double array trie).
1. Support a complete set of [IM features](https://turms-im.github.io/docs/features). Turms supports almost all IM features supported by commercial instant messaging products and no restrictions on business features. And Turms also supports advanced features such as unwanted words filtering (using Aho-Corasick automaton with double array trie) and tiered storage for messages.
2. (Extensibility) Turms supports two approaches to extend: configuration properties and custom plugins. Of course, you can also modify the source code. For example, the plugin turms-plugin-minio based on turms-plugin is used to interact with MinIO server.
2. (Flexibility) Turms provides hundreds of configuration properties for developers to meet various requirements. And most of the properties can be updated at the cluster level when the cluster is running without performance loss.

Expand Down
2 changes: 1 addition & 1 deletion README_zh.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ Turms基于读扩散消息模型进行架构设计,对业务数据变化感知

### 业务功能相关特性

1. (业务功能完善性)Turms支持几乎所有商用即时通讯产品所支持的[即时通讯相关功能](https://turms-im.github.io/docs/features/)(甚至还有更多的业务功能),且无业务功能限制,同时也支持一些诸如敏感词过滤(基于双数组Trie的AC自动机算法实现)等高级IM功能
1. (业务功能完善性)Turms支持几乎所有商用即时通讯产品所支持的[即时通讯相关功能](https://turms-im.github.io/docs/features/)(甚至还有更多的业务功能),且无业务功能限制,同时也支持一些诸如敏感词过滤(基于双数组Trie的AC自动机算法实现)、消息冷热分离存储等高级IM功能
2. (功能拓展性)Turms同时支持两种拓展模式:配置参数与开发插件。当然您也完全可以对源码进行修改。目前用于接入的MinIO对象存储服务的插件turms-plugin-minio就是基于turms-plugin实现的。
3. (配置灵活性)Turms提供了上百个配置参数供用户定制,以满足各种需求。并且大部分配置都可以在集群运作时(不需要停机),进行集群级别的同步更新,并且无性能损失。

Expand Down
6 changes: 3 additions & 3 deletions docker-compose.standalone.yml
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ services:
image: mongo:4.4.10-focal
# depends_on:
# - loki
entrypoint: [ "mongod", "--port", "27017", "--shardsvr", "--replSet", "rs-shard", "--bind_ip_all" ]
entrypoint: [ "mongod", "--port", "27017", "--shardsvr", "--replSet", "shard01", "--bind_ip_all" ]
healthcheck:
test: [ "CMD", "mongo", "--quiet", "--eval", "db.runCommand(\"ping\").ok" ]
interval: 10s
Expand Down Expand Up @@ -118,9 +118,9 @@ services:
- -c
- |
mongo --host mongodb-config:27017 --eval "rs.initiate({_id: 'rs-config', configsvr: true, version: 1, members: [ { _id: 0, host : 'mongodb-config:27017' } ] })"
mongo --host mongodb-shard:27017 --eval "rs.initiate({_id: 'rs-shard', version: 1, members: [ { _id: 0, host : 'mongodb-shard:27017' } ] })"
mongo --host mongodb-shard:27017 --eval "rs.initiate({_id: 'shard01', version: 1, members: [ { _id: 0, host : 'mongodb-shard:27017' } ] })"
sleep 10
mongo --host mongodb-router:27017 --eval "sh.addShard('rs-shard/mongodb-shard:27017')"
mongo --host mongodb-router:27017 --eval "sh.addShard('shard01/mongodb-shard:27017')"
restart: "no"

# Redis
Expand Down
2 changes: 1 addition & 1 deletion turms-docs/src/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ Turms基于读扩散消息模型进行架构设计,对业务数据变化感知

### 业务功能相关特性

1. (业务功能完善性)Turms支持几乎所有商用即时通讯产品所支持的[即时通讯相关功能](https://turms-im.github.io/docs/features/)(甚至还有更多的业务功能),且无业务功能限制,同时也支持一些诸如敏感词过滤(基于双数组Trie的AC自动机算法实现)等高级IM功能
1. (业务功能完善性)Turms支持几乎所有商用即时通讯产品所支持的[即时通讯相关功能](https://turms-im.github.io/docs/features/)(甚至还有更多的业务功能),且无业务功能限制,同时也支持一些诸如敏感词过滤(基于双数组Trie的AC自动机算法实现)、消息冷热分离存储等高级IM功能
2. (功能拓展性)Turms同时支持两种拓展模式:配置参数与开发插件。当然您也完全可以对源码进行修改。目前用于接入的MinIO对象存储服务的插件turms-plugin-minio就是基于turms-plugin实现的。
3. (配置灵活性)Turms提供了上百个配置参数供用户定制,以满足各种需求。并且大部分配置都可以在集群运作时(不需要停机),进行集群级别的同步更新,并且无性能损失。

Expand Down
2 changes: 1 addition & 1 deletion turms-docs/src/for-developers/schema.md
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ Turms集合中有几十个可选但默认不开启的索引,这是因为:

#### 消息(Message)

`消息`是我们目前唯一计划支持冷热数据分离的模型**而冷热数据分离能极大地节省数据库服务器成本**,比如将热数据放到`16核128G`服务器中,把冷数据放到`4核8G`服务器中。
`消息`是目前唯一支持冷热数据分离存储的模型**而冷热数据分离能极大地节省数据库服务器成本**,比如将热数据放到`16核128G`服务器中,把冷数据放到`4核8G`服务器中。另外,其他模型没有冷热数据分离存储的意义,因此其他模型不支持

##### 索引

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,11 @@ public abstract class BaseTurmsApplication {
static {
// Hard code these properties to ensure they work as expected

TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
// Disable the max direct memory limit and buffer counters of Netty
// so that we can get the used direct memory via BufferPoolMXBean without depending on ByteBufAllocator of Netty
System.setProperty("io.netty.maxDirectMemory", "0");
System.setProperty("spring.main.banner-mode", "off");
TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
}

protected static void bootstrap(Class<?> applicationClass, String[] args) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ private Mono<SharedClusterProperties> initializeSharedProperties() {
.onErrorResume(DuplicateKeyException.class, e -> findAndUpdatePropertiesByNodeType(clusterProperties))
.doOnSuccess(properties -> {
sharedClusterProperties = properties;
LOGGER.info("Shared properties were retrieved successfully");
LOGGER.info("Shared properties were retrieved");
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public Mono<Void> registerLocalMember(boolean suppressDuplicateMemberError) {
return discoveryService.registerMember(localMember)
.doOnSuccess(ignored -> {
isLocalNodeRegistered = true;
LOGGER.info("Registered the local member successfully");
LOGGER.info("Registered the local member");
})
.onErrorResume(t -> {
if (suppressDuplicateMemberError && t instanceof DuplicateKeyException) {
Expand All @@ -118,7 +118,7 @@ public Mono<Void> unregisterLocalMember() {
.doOnError(e -> LOGGER.error("Failed to unregister the local member", e))
.doOnSuccess(ignored -> {
isLocalNodeRegistered = false;
LOGGER.info("Unregistered the local member successfully");
LOGGER.info("Unregistered the local member");
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ private CronConstant() {

public static final String DEFAULT_EXPIRED_MESSAGES_CLEANUP_CRON = "0 45 2 * * *";

public static final String DEFAULT_TIERED_STORAGE_TIER_RANGE_UPDATING_CRON = "0 0 3 * * *";

public static final String EXPIRED_BLOCKED_CLIENT_CLEANUP_CRON = "0 0 1/6 * * *";

public static final String EXPIRED_ADMIN_API_ACCESS_INFO_CLEANUP_CRON = "0 0/30 * * * *";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ public class MongoContext {
private final MongoDatabase database;
@Getter
private final MongoDatabase adminDatabase;
@Getter
private final MongoDatabase configDatabase;
private final CodecRegistry codecRegistry;
private final Map<Class<?>, MongoEntity<?>> entityMap = new IdentityHashMap<>(64);
private final Map<Class<?>, MongoCollection<?>> collectionMap = new IdentityHashMap<>(64);
Expand Down Expand Up @@ -118,6 +120,7 @@ public void clusterDescriptionChanged(ClusterDescriptionChangedEvent event) {
client = MongoClients.create(settings);
database = client.getDatabase(connectionSettings.getDatabase());
adminDatabase = client.getDatabase("admin");
configDatabase = client.getDatabase("config");

SerializationUtil.codecRegistry = codecRegistry;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import im.turms.server.common.mongo.entity.annotation.Indexed;
import im.turms.server.common.mongo.entity.annotation.PropertySetter;
import im.turms.server.common.mongo.entity.annotation.Sharded;
import im.turms.server.common.mongo.entity.annotation.WithTemperature;
import im.turms.server.common.mongo.entity.annotation.TieredStorage;
import im.turms.server.common.util.ReflectionUtil;
import lombok.Data;
import org.bson.BsonDocument;
Expand Down Expand Up @@ -74,18 +74,18 @@ public <T> MongoEntity<T> parse(Class<T> clazz) {
}

private <T> Zone parseZone(Class<T> clazz, ShardKey shardKey) {
WithTemperature temperature = clazz.getAnnotation(WithTemperature.class);
if (temperature == null) {
TieredStorage storage = clazz.getAnnotation(TieredStorage.class);
if (storage == null) {
return null;
}
String creationDateFieldName = temperature.creationDateFieldName();
String creationDateFieldName = storage.creationDateFieldName();
if (!StringUtils.hasText(creationDateFieldName)) {
throw new IllegalStateException(
"The creationDateFieldName of @WithTemperature must not be blank for the class " + clazz.getName());
"The creationDateFieldName of @TieredStorage must not be blank for the class " + clazz.getName());
}
if (!shardKey.document().containsKey(creationDateFieldName)) {
throw new IllegalStateException(
"The creationDateFieldName of @WithTemperature must be a part of the shard key of the class " + clazz.getName());
"The creationDateFieldName of @TieredStorage must be a part of the shard key of the class " + clazz.getName());
}
return new Zone(creationDateFieldName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@

/**
* @author James Chen
* @implNote Note that the multi-temperature data also has a monotonic shard key.
* @implNote Note that the collections supporting tiered storage have a monotonic shard key.
* In other words, all operations will be routed to hot shards (Note that there
* can be multiple shards for hot data) instead of all shards evenly distributed.
* The imbalance is what we want to support multi-temperature data
* The imbalance is what we want to support tiered storage.
*/
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface WithTemperature {
public @interface TieredStorage {
String creationDateFieldName();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import com.mongodb.client.result.DeleteResult;
import com.mongodb.client.result.UpdateResult;
import com.mongodb.reactivestreams.client.ClientSession;
import com.mongodb.reactivestreams.client.MongoDatabase;
import im.turms.server.common.mongo.entity.MongoEntity;
import im.turms.server.common.mongo.operation.option.Filter;
import im.turms.server.common.mongo.operation.option.QueryOptions;
Expand Down Expand Up @@ -106,9 +105,11 @@ public interface MongoOperationsSupport {

<T> Flux<Document> listIndexes(Class<T> clazz);

Mono<Void> enableSharding(MongoDatabase databaseToShard, MongoDatabase adminDatabase);
Mono<Void> deleteTags(String collectionName);

Mono<Void> shard(MongoDatabase databaseToShard, MongoDatabase adminDatabase, MongoEntity<?> entity);
Mono<Void> enableSharding();

Mono<Void> shard(MongoEntity<?> entity);

Mono<Void> ensureIndexesAndShard(Collection<Class<?>> classes);

Expand All @@ -131,4 +132,11 @@ Mono<Void> updateZoneKeyRange(String collectionName,
Mono<Boolean> validate(Class<?> clazz, String jsonSchema);

<T> Mono<T> inTransaction(Function<ClientSession, Mono<T>> execute);

Mono<Void> disableBalancing(String collectionName);

Mono<Void> enableBalancing(String collectionName);

Mono<Boolean> isBalancerRunning();

}
Loading

0 comments on commit 937c564

Please sign in to comment.