diff --git a/.github/workflows/deploy-playground.yml b/.github/workflows/deploy-playground.yml index 27b8d2075c..69ae474c75 100644 --- a/.github/workflows/deploy-playground.yml +++ b/.github/workflows/deploy-playground.yml @@ -29,7 +29,7 @@ jobs: sudo docker-compose -f docker-compose.standalone.yml down sudo docker system prune -a -f sudo docker system prune --volumes -f - rm -rf /var/log/journal + sudo rm -rf /var/log/journal sudo git pull sudo git reset --hard sudo ENV=dev,demo docker-compose -f docker-compose.standalone.yml --profile monitoring up --force-recreate -d \ No newline at end of file diff --git a/README.md b/README.md index 3b3afee871..cafa29d5b8 100644 --- a/README.md +++ b/README.md @@ -63,7 +63,7 @@ Note: The main disadvantage of the current Turms project is that it does not pro ### Business Features -1. Support a complete set of [IM features](https://turms-im.github.io/docs/features). Turms supports almost all IM features supported by commercial instant messaging products and no restrictions on business features. And Turms also supports advanced features such as unwanted words filtering (using Aho-Corasick automaton with double array trie). +1. Support a complete set of [IM features](https://turms-im.github.io/docs/features). Turms supports almost all IM features supported by commercial instant messaging products and no restrictions on business features. And Turms also supports advanced features such as unwanted words filtering (using Aho-Corasick automaton with double array trie) and tiered storage for messages. 2. (Extensibility) Turms supports two approaches to extend: configuration properties and custom plugins. Of course, you can also modify the source code. For example, the plugin turms-plugin-minio based on turms-plugin is used to interact with MinIO server. 2. (Flexibility) Turms provides hundreds of configuration properties for developers to meet various requirements. And most of the properties can be updated at the cluster level when the cluster is running without performance loss. diff --git a/README_zh.md b/README_zh.md index 7e78cbf903..9c8164da58 100644 --- a/README_zh.md +++ b/README_zh.md @@ -66,7 +66,7 @@ Turms基于读扩散消息模型进行架构设计,对业务数据变化感知 ### 业务功能相关特性 -1. (业务功能完善性)Turms支持几乎所有商用即时通讯产品所支持的[即时通讯相关功能](https://turms-im.github.io/docs/features/)(甚至还有更多的业务功能),且无业务功能限制,同时也支持一些诸如敏感词过滤(基于双数组Trie的AC自动机算法实现)等高级IM功能。 +1. (业务功能完善性)Turms支持几乎所有商用即时通讯产品所支持的[即时通讯相关功能](https://turms-im.github.io/docs/features/)(甚至还有更多的业务功能),且无业务功能限制,同时也支持一些诸如敏感词过滤(基于双数组Trie的AC自动机算法实现)、消息冷热分离存储等高级IM功能。 2. (功能拓展性)Turms同时支持两种拓展模式:配置参数与开发插件。当然您也完全可以对源码进行修改。目前用于接入的MinIO对象存储服务的插件turms-plugin-minio就是基于turms-plugin实现的。 3. (配置灵活性)Turms提供了上百个配置参数供用户定制,以满足各种需求。并且大部分配置都可以在集群运作时(不需要停机),进行集群级别的同步更新,并且无性能损失。 diff --git a/docker-compose.standalone.yml b/docker-compose.standalone.yml index 68db46168c..67b5be338c 100644 --- a/docker-compose.standalone.yml +++ b/docker-compose.standalone.yml @@ -79,7 +79,7 @@ services: image: mongo:4.4.10-focal # depends_on: # - loki - entrypoint: [ "mongod", "--port", "27017", "--shardsvr", "--replSet", "rs-shard", "--bind_ip_all" ] + entrypoint: [ "mongod", "--port", "27017", "--shardsvr", "--replSet", "shard01", "--bind_ip_all" ] healthcheck: test: [ "CMD", "mongo", "--quiet", "--eval", "db.runCommand(\"ping\").ok" ] interval: 10s @@ -118,9 +118,9 @@ services: - -c - | mongo --host mongodb-config:27017 --eval "rs.initiate({_id: 'rs-config', configsvr: true, version: 1, members: [ { _id: 0, host : 'mongodb-config:27017' } ] })" - mongo --host mongodb-shard:27017 --eval "rs.initiate({_id: 'rs-shard', version: 1, members: [ { _id: 0, host : 'mongodb-shard:27017' } ] })" + mongo --host mongodb-shard:27017 --eval "rs.initiate({_id: 'shard01', version: 1, members: [ { _id: 0, host : 'mongodb-shard:27017' } ] })" sleep 10 - mongo --host mongodb-router:27017 --eval "sh.addShard('rs-shard/mongodb-shard:27017')" + mongo --host mongodb-router:27017 --eval "sh.addShard('shard01/mongodb-shard:27017')" restart: "no" # Redis diff --git a/turms-docs/src/README.md b/turms-docs/src/README.md index 7e78cbf903..9c8164da58 100644 --- a/turms-docs/src/README.md +++ b/turms-docs/src/README.md @@ -66,7 +66,7 @@ Turms基于读扩散消息模型进行架构设计,对业务数据变化感知 ### 业务功能相关特性 -1. (业务功能完善性)Turms支持几乎所有商用即时通讯产品所支持的[即时通讯相关功能](https://turms-im.github.io/docs/features/)(甚至还有更多的业务功能),且无业务功能限制,同时也支持一些诸如敏感词过滤(基于双数组Trie的AC自动机算法实现)等高级IM功能。 +1. (业务功能完善性)Turms支持几乎所有商用即时通讯产品所支持的[即时通讯相关功能](https://turms-im.github.io/docs/features/)(甚至还有更多的业务功能),且无业务功能限制,同时也支持一些诸如敏感词过滤(基于双数组Trie的AC自动机算法实现)、消息冷热分离存储等高级IM功能。 2. (功能拓展性)Turms同时支持两种拓展模式:配置参数与开发插件。当然您也完全可以对源码进行修改。目前用于接入的MinIO对象存储服务的插件turms-plugin-minio就是基于turms-plugin实现的。 3. (配置灵活性)Turms提供了上百个配置参数供用户定制,以满足各种需求。并且大部分配置都可以在集群运作时(不需要停机),进行集群级别的同步更新,并且无性能损失。 diff --git a/turms-docs/src/for-developers/schema.md b/turms-docs/src/for-developers/schema.md index 060e974ab8..def7db421e 100644 --- a/turms-docs/src/for-developers/schema.md +++ b/turms-docs/src/for-developers/schema.md @@ -101,7 +101,7 @@ Turms集合中有几十个可选但默认不开启的索引,这是因为: #### 消息(Message) -`消息`是我们目前唯一计划支持冷热数据分离的模型。**而冷热数据分离能极大地节省数据库服务器成本**,比如将热数据放到`16核128G`服务器中,把冷数据放到`4核8G`服务器中。 +`消息`是目前唯一支持冷热数据分离存储的模型。**而冷热数据分离能极大地节省数据库服务器成本**,比如将热数据放到`16核128G`服务器中,把冷数据放到`4核8G`服务器中。另外,其他模型没有冷热数据分离存储的意义,因此其他模型不支持。 ##### 索引 diff --git a/turms-server-common/src/main/java/im/turms/server/common/BaseTurmsApplication.java b/turms-server-common/src/main/java/im/turms/server/common/BaseTurmsApplication.java index a8efa80109..f49b972408 100644 --- a/turms-server-common/src/main/java/im/turms/server/common/BaseTurmsApplication.java +++ b/turms-server-common/src/main/java/im/turms/server/common/BaseTurmsApplication.java @@ -34,11 +34,11 @@ public abstract class BaseTurmsApplication { static { // Hard code these properties to ensure they work as expected + TimeZone.setDefault(TimeZone.getTimeZone("UTC")); // Disable the max direct memory limit and buffer counters of Netty // so that we can get the used direct memory via BufferPoolMXBean without depending on ByteBufAllocator of Netty System.setProperty("io.netty.maxDirectMemory", "0"); System.setProperty("spring.main.banner-mode", "off"); - TimeZone.setDefault(TimeZone.getTimeZone("UTC")); } protected static void bootstrap(Class applicationClass, String[] args) { diff --git a/turms-server-common/src/main/java/im/turms/server/common/cluster/service/config/SharedPropertyService.java b/turms-server-common/src/main/java/im/turms/server/common/cluster/service/config/SharedPropertyService.java index 4d70bf4010..28e26667c6 100644 --- a/turms-server-common/src/main/java/im/turms/server/common/cluster/service/config/SharedPropertyService.java +++ b/turms-server-common/src/main/java/im/turms/server/common/cluster/service/config/SharedPropertyService.java @@ -174,7 +174,7 @@ private Mono initializeSharedProperties() { .onErrorResume(DuplicateKeyException.class, e -> findAndUpdatePropertiesByNodeType(clusterProperties)) .doOnSuccess(properties -> { sharedClusterProperties = properties; - LOGGER.info("Shared properties were retrieved successfully"); + LOGGER.info("Shared properties were retrieved"); }); } diff --git a/turms-server-common/src/main/java/im/turms/server/common/cluster/service/discovery/LocalNodeStatusManager.java b/turms-server-common/src/main/java/im/turms/server/common/cluster/service/discovery/LocalNodeStatusManager.java index da127f2618..896519a36b 100644 --- a/turms-server-common/src/main/java/im/turms/server/common/cluster/service/discovery/LocalNodeStatusManager.java +++ b/turms-server-common/src/main/java/im/turms/server/common/cluster/service/discovery/LocalNodeStatusManager.java @@ -98,7 +98,7 @@ public Mono registerLocalMember(boolean suppressDuplicateMemberError) { return discoveryService.registerMember(localMember) .doOnSuccess(ignored -> { isLocalNodeRegistered = true; - LOGGER.info("Registered the local member successfully"); + LOGGER.info("Registered the local member"); }) .onErrorResume(t -> { if (suppressDuplicateMemberError && t instanceof DuplicateKeyException) { @@ -118,7 +118,7 @@ public Mono unregisterLocalMember() { .doOnError(e -> LOGGER.error("Failed to unregister the local member", e)) .doOnSuccess(ignored -> { isLocalNodeRegistered = false; - LOGGER.info("Unregistered the local member successfully"); + LOGGER.info("Unregistered the local member"); }); } diff --git a/turms-server-common/src/main/java/im/turms/server/common/constant/CronConstant.java b/turms-server-common/src/main/java/im/turms/server/common/constant/CronConstant.java index 503f172f88..6064cbce6b 100644 --- a/turms-server-common/src/main/java/im/turms/server/common/constant/CronConstant.java +++ b/turms-server-common/src/main/java/im/turms/server/common/constant/CronConstant.java @@ -36,6 +36,8 @@ private CronConstant() { public static final String DEFAULT_EXPIRED_MESSAGES_CLEANUP_CRON = "0 45 2 * * *"; + public static final String DEFAULT_TIERED_STORAGE_TIER_RANGE_UPDATING_CRON = "0 0 3 * * *"; + public static final String EXPIRED_BLOCKED_CLIENT_CLEANUP_CRON = "0 0 1/6 * * *"; public static final String EXPIRED_ADMIN_API_ACCESS_INFO_CLEANUP_CRON = "0 0/30 * * * *"; diff --git a/turms-server-common/src/main/java/im/turms/server/common/mongo/MongoContext.java b/turms-server-common/src/main/java/im/turms/server/common/mongo/MongoContext.java index b2c51edc14..e06bcda477 100644 --- a/turms-server-common/src/main/java/im/turms/server/common/mongo/MongoContext.java +++ b/turms-server-common/src/main/java/im/turms/server/common/mongo/MongoContext.java @@ -71,6 +71,8 @@ public class MongoContext { private final MongoDatabase database; @Getter private final MongoDatabase adminDatabase; + @Getter + private final MongoDatabase configDatabase; private final CodecRegistry codecRegistry; private final Map, MongoEntity> entityMap = new IdentityHashMap<>(64); private final Map, MongoCollection> collectionMap = new IdentityHashMap<>(64); @@ -118,6 +120,7 @@ public void clusterDescriptionChanged(ClusterDescriptionChangedEvent event) { client = MongoClients.create(settings); database = client.getDatabase(connectionSettings.getDatabase()); adminDatabase = client.getDatabase("admin"); + configDatabase = client.getDatabase("config"); SerializationUtil.codecRegistry = codecRegistry; } diff --git a/turms-server-common/src/main/java/im/turms/server/common/mongo/entity/MongoEntityFactory.java b/turms-server-common/src/main/java/im/turms/server/common/mongo/entity/MongoEntityFactory.java index 4c75367908..63a5d76bf8 100644 --- a/turms-server-common/src/main/java/im/turms/server/common/mongo/entity/MongoEntityFactory.java +++ b/turms-server-common/src/main/java/im/turms/server/common/mongo/entity/MongoEntityFactory.java @@ -27,7 +27,7 @@ import im.turms.server.common.mongo.entity.annotation.Indexed; import im.turms.server.common.mongo.entity.annotation.PropertySetter; import im.turms.server.common.mongo.entity.annotation.Sharded; -import im.turms.server.common.mongo.entity.annotation.WithTemperature; +import im.turms.server.common.mongo.entity.annotation.TieredStorage; import im.turms.server.common.util.ReflectionUtil; import lombok.Data; import org.bson.BsonDocument; @@ -74,18 +74,18 @@ public MongoEntity parse(Class clazz) { } private Zone parseZone(Class clazz, ShardKey shardKey) { - WithTemperature temperature = clazz.getAnnotation(WithTemperature.class); - if (temperature == null) { + TieredStorage storage = clazz.getAnnotation(TieredStorage.class); + if (storage == null) { return null; } - String creationDateFieldName = temperature.creationDateFieldName(); + String creationDateFieldName = storage.creationDateFieldName(); if (!StringUtils.hasText(creationDateFieldName)) { throw new IllegalStateException( - "The creationDateFieldName of @WithTemperature must not be blank for the class " + clazz.getName()); + "The creationDateFieldName of @TieredStorage must not be blank for the class " + clazz.getName()); } if (!shardKey.document().containsKey(creationDateFieldName)) { throw new IllegalStateException( - "The creationDateFieldName of @WithTemperature must be a part of the shard key of the class " + clazz.getName()); + "The creationDateFieldName of @TieredStorage must be a part of the shard key of the class " + clazz.getName()); } return new Zone(creationDateFieldName); } diff --git a/turms-server-common/src/main/java/im/turms/server/common/mongo/entity/annotation/WithTemperature.java b/turms-server-common/src/main/java/im/turms/server/common/mongo/entity/annotation/TieredStorage.java similarity index 85% rename from turms-server-common/src/main/java/im/turms/server/common/mongo/entity/annotation/WithTemperature.java rename to turms-server-common/src/main/java/im/turms/server/common/mongo/entity/annotation/TieredStorage.java index 8d0c961999..2bf1138719 100644 --- a/turms-server-common/src/main/java/im/turms/server/common/mongo/entity/annotation/WithTemperature.java +++ b/turms-server-common/src/main/java/im/turms/server/common/mongo/entity/annotation/TieredStorage.java @@ -24,13 +24,13 @@ /** * @author James Chen - * @implNote Note that the multi-temperature data also has a monotonic shard key. + * @implNote Note that the collections supporting tiered storage have a monotonic shard key. * In other words, all operations will be routed to hot shards (Note that there * can be multiple shards for hot data) instead of all shards evenly distributed. - * The imbalance is what we want to support multi-temperature data + * The imbalance is what we want to support tiered storage. */ @Retention(RetentionPolicy.RUNTIME) @Target(ElementType.TYPE) -public @interface WithTemperature { +public @interface TieredStorage { String creationDateFieldName(); -} +} \ No newline at end of file diff --git a/turms-server-common/src/main/java/im/turms/server/common/mongo/operation/MongoOperationsSupport.java b/turms-server-common/src/main/java/im/turms/server/common/mongo/operation/MongoOperationsSupport.java index 15b0132d80..de8d816b3f 100644 --- a/turms-server-common/src/main/java/im/turms/server/common/mongo/operation/MongoOperationsSupport.java +++ b/turms-server-common/src/main/java/im/turms/server/common/mongo/operation/MongoOperationsSupport.java @@ -23,7 +23,6 @@ import com.mongodb.client.result.DeleteResult; import com.mongodb.client.result.UpdateResult; import com.mongodb.reactivestreams.client.ClientSession; -import com.mongodb.reactivestreams.client.MongoDatabase; import im.turms.server.common.mongo.entity.MongoEntity; import im.turms.server.common.mongo.operation.option.Filter; import im.turms.server.common.mongo.operation.option.QueryOptions; @@ -106,9 +105,11 @@ public interface MongoOperationsSupport { Flux listIndexes(Class clazz); - Mono enableSharding(MongoDatabase databaseToShard, MongoDatabase adminDatabase); + Mono deleteTags(String collectionName); - Mono shard(MongoDatabase databaseToShard, MongoDatabase adminDatabase, MongoEntity entity); + Mono enableSharding(); + + Mono shard(MongoEntity entity); Mono ensureIndexesAndShard(Collection> classes); @@ -131,4 +132,11 @@ Mono updateZoneKeyRange(String collectionName, Mono validate(Class clazz, String jsonSchema); Mono inTransaction(Function> execute); + + Mono disableBalancing(String collectionName); + + Mono enableBalancing(String collectionName); + + Mono isBalancerRunning(); + } \ No newline at end of file diff --git a/turms-server-common/src/main/java/im/turms/server/common/mongo/operation/TurmsMongoOperations.java b/turms-server-common/src/main/java/im/turms/server/common/mongo/operation/TurmsMongoOperations.java index de1611d9c5..844dc9125c 100644 --- a/turms-server-common/src/main/java/im/turms/server/common/mongo/operation/TurmsMongoOperations.java +++ b/turms-server-common/src/main/java/im/turms/server/common/mongo/operation/TurmsMongoOperations.java @@ -39,7 +39,6 @@ import com.mongodb.reactivestreams.client.FindPublisher; import com.mongodb.reactivestreams.client.ListIndexesPublisher; import com.mongodb.reactivestreams.client.MongoCollection; -import com.mongodb.reactivestreams.client.MongoDatabase; import com.mongodb.reactivestreams.client.internal.MongoCollectionUtil; import com.mongodb.reactivestreams.client.internal.MongoOperationPublisher; import com.mongodb.reactivestreams.client.internal.TurmsFindPublisherImpl; @@ -57,8 +56,10 @@ import im.turms.server.common.util.CollectorUtil; import im.turms.server.common.util.MapUtil; import org.apache.commons.lang3.StringUtils; +import org.bson.BsonBoolean; import org.bson.BsonDocument; import org.bson.BsonDocumentWriter; +import org.bson.BsonString; import org.bson.BsonValue; import org.bson.Document; import org.bson.codecs.Codec; @@ -87,6 +88,7 @@ * we don't check whether arguments are legal or not and so on * 2. The publishers of mongo-java-driver are cold * and the publishers of TurmsMongoOperations are also cold + * Reference: https://github.com/mongodb/mongo/blob/master/src/mongo/shell/utils_sh.js */ public class TurmsMongoOperations implements MongoOperationsSupport { @@ -436,14 +438,14 @@ public Mono ensureIndexes(Class clazz, List indexModels String collectionName = context.getEntity(clazz).collectionName(); return Flux.from(source) .then() - .doOnError(throwable -> LOGGER.error("Failed to index the collection {}", collectionName, throwable)) + .onErrorMap(t -> new IllegalStateException("Failed to index the collection " + collectionName, t)) .doOnSuccess(ignored -> { List indexDocs = indexModels .stream() .map(indexModel -> indexModel.getKeys().toBsonDocument()) .collect(CollectorUtil.toList(indexModels.size())); String indexes = StringUtils.join(indexDocs, ", "); - LOGGER.info("Indexed the collection {} successfully: [{}]", collectionName, indexes); + LOGGER.info("Indexed the collection {}: [{}]", collectionName, indexes); }); } @@ -457,28 +459,36 @@ public Flux listIndexes(Class clazz) { // Shard @Override - public Mono enableSharding(MongoDatabase databaseToShard, MongoDatabase adminDatabase) { - String dbName = databaseToShard.getName(); - Mono enableSharding = Mono.from(adminDatabase.runCommand(new Document("enableSharding", dbName))) - .doOnError(throwable -> LOGGER.error("Failed to enable sharding the database {}", dbName, throwable)) - .doOnSuccess(ignored -> LOGGER.info("Enabled sharding the database {} successfully", dbName)); - return enableSharding.then(); + public Mono deleteTags(String collectionName) { + String namespace = getNamespace(collectionName); + BsonDocument filter = new BsonDocument("ns", new BsonString(namespace)); + Publisher source = context.getConfigDatabase().getCollection("tags").deleteMany(filter); + return Flux.from(source) + .then(); + } + + @Override + public Mono enableSharding() { + String dbName = context.getDatabase().getName(); + Publisher source = context.getAdminDatabase().runCommand(new Document("enableSharding", dbName)); + return Mono.from(source) + .onErrorMap(t -> new IllegalStateException("Failed to enable sharding the database " + dbName, t)) + .doOnSuccess(ignored -> LOGGER.info("Enabled sharding the database {}", dbName)) + .then(); } @Override - public Mono shard(MongoDatabase databaseToShard, MongoDatabase adminDatabase, MongoEntity entity) { + public Mono shard(MongoEntity entity) { BsonDocument shardKey = entity.getShardKeyBson(); if (shardKey == null) { return Mono.empty(); } - String dbName = databaseToShard.getName(); - String namespace = "%s.%s".formatted(dbName, entity.collectionName()); + String namespace = getNamespace(entity.collectionName()); Document command = new Document("shardCollection", namespace).append("key", shardKey); - Mono shardCollection = Mono.from(adminDatabase.runCommand(command)) - .doOnError(throwable -> - LOGGER.error("Failed to shard the collection {} with the shard key {}", namespace, shardKey.toJson(), throwable)) + Mono shardCollection = Mono.from(context.getAdminDatabase().runCommand(command)) + .onErrorMap(t -> new IllegalStateException("Failed to shard the collection %s with the shard key %s".formatted(namespace, shardKey.toJson()), t)) .doOnSuccess(ignored -> - LOGGER.info("Sharded the collection {} with the shard key {} successfully", namespace, shardKey.toJson())); + LOGGER.info("Sharded the collection {} with the shard key {}", namespace, shardKey.toJson())); return shardCollection.then(); } @@ -497,34 +507,34 @@ public Mono ensureIndexesAndShard(Collection> classes, Mono ensureIndexes = Mono.empty(); for (Class clazz : classes) { MongoEntity entity = context.getEntity(clazz); - List indexModels = entity.indexes(); - List indexes = new ArrayList<>(indexModels.size()); + List indexes = entity.indexes(); + List indexModels = new ArrayList<>(indexes.size()); IndexModel compoundIndex = entity.compoundIndex(); if (compoundIndex != null) { - indexes.add(compoundIndex); + indexModels.add(compoundIndex); } - for (Index index : indexModels) { + for (Index index : indexes) { if (index.indexed().optional()) { if (customIndexFilter != null && customIndexFilter.test(clazz, index.field())) { - indexes.add(index.model()); + indexModels.add(index.model()); } } else { - indexes.add(index.model()); + indexModels.add(index.model()); } } - if (!indexes.isEmpty()) { + if (!indexModels.isEmpty()) { ensureIndexes = ensureIndexes - .then(Mono.defer(() -> ensureIndexes(entity.entityClass(), indexes))); + .then(Mono.defer(() -> ensureIndexes(entity.entityClass(), indexModels))); } } return ensureIndexes - .then(Mono.defer(() -> enableSharding(context.getDatabase(), context.getAdminDatabase()))) + .then(enableSharding()) .then(Mono.defer(() -> { Mono shardEntities = Mono.empty(); for (Class clazz : classes) { MongoEntity entity = context.getEntity(clazz); shardEntities = shardEntities - .then(Mono.defer(() -> shard(context.getDatabase(), context.getAdminDatabase(), entity))); + .then(Mono.defer(() -> shard(entity))); } return shardEntities; })); @@ -534,8 +544,8 @@ public Mono ensureIndexesAndShard(Collection> classes, @Override public Mono addShardToZone(String shardName, String zoneName) { - Document command = new Document("addShardToZone", shardName) - .append("zone", zoneName); + BsonDocument command = new BsonDocument("addShardToZone", new BsonString(shardName)) + .append("zone", new BsonString(zoneName)); Publisher source = context.getAdminDatabase().runCommand(command); return Mono.from(source).then(); } @@ -545,7 +555,7 @@ public Mono updateZoneKeyRange(String collectionName, String zoneName, Document minimum, Document maximum) { - Document command = new Document("updateZoneKeyRange", context.getDatabase().getName() + "." + collectionName) + Document command = new Document("updateZoneKeyRange", getNamespace(collectionName)) .append("min", minimum) .append("max", maximum) .append("zone", zoneName); @@ -603,6 +613,34 @@ public Mono inTransaction(Function> action) { ClientSession::commitTransaction); } + // Balancer + + @Override + public Mono disableBalancing(String collectionName) { + String namespace = getNamespace(collectionName); + BsonDocument filter = new BsonDocument("_id", new BsonString(namespace)); + BsonDocument update = new BsonDocument("$set", new BsonDocument("noBalance", BsonBoolean.TRUE)); + Publisher disableBalancing = context.getConfigDatabase().getCollection("collections") + .updateOne(filter, update, DEFAULT_UPDATE_OPTIONS); + return Mono.from(disableBalancing).then(); + } + + @Override + public Mono enableBalancing(String collectionName) { + String namespace = getNamespace(collectionName); + BsonDocument filter = new BsonDocument("_id", new BsonString(namespace)); + BsonDocument update = new BsonDocument("$set", new BsonDocument("noBalance", BsonBoolean.FALSE)); + Publisher enableBalancing = context.getConfigDatabase().getCollection("collections") + .updateOne(filter, update, DEFAULT_UPDATE_OPTIONS); + return Mono.from(enableBalancing).then(); + } + + @Override + public Mono isBalancerRunning() { + Publisher balancerStatus = context.getAdminDatabase().runCommand(new BsonDocument("balancerStatus", BsonPool.BSON_INT32_1)); + return Mono.from(balancerStatus).map(document -> document.getBoolean("inBalancerRound")); + } + // Helper private BsonDocument encodeEntity(T value) { @@ -625,6 +663,10 @@ private TurmsFindPublisherImpl find(MongoCollection collection, return new TurmsFindPublisherImpl<>(null, publisher, filter, options); } + private String getNamespace(String collectionName) { + return context.getDatabase().getName() + "." + collectionName; + } + private MongoOperationPublisher getPublisher(MongoCollection collection) { Class entityClass = collection.getDocumentClass(); MongoOperationPublisher publisher = (MongoOperationPublisher) publisherMap.get(entityClass); diff --git a/turms-server-common/src/main/java/im/turms/server/common/property/env/service/env/database/MessageMongoProperties.java b/turms-server-common/src/main/java/im/turms/server/common/property/env/service/env/database/MessageMongoProperties.java index a6088945a1..e068176981 100644 --- a/turms-server-common/src/main/java/im/turms/server/common/property/env/service/env/database/MessageMongoProperties.java +++ b/turms-server-common/src/main/java/im/turms/server/common/property/env/service/env/database/MessageMongoProperties.java @@ -39,7 +39,7 @@ public class MessageMongoProperties extends TurmsMongoProperties { private OptionalIndexProperties optionalIndex = new OptionalIndexProperties(); @NestedConfigurationProperty - private MultiTemperatureProperties temperature = new MultiTemperatureProperties(); + private TieredStorageProperties tieredStorage = new TieredStorageProperties(); @NestedConfigurationProperty private WriteConcernProperties writeConcern = new WriteConcernProperties(); diff --git a/turms-server-common/src/main/java/im/turms/server/common/property/env/service/env/database/MultiTemperatureProperties.java b/turms-server-common/src/main/java/im/turms/server/common/property/env/service/env/database/MultiTemperatureProperties.java deleted file mode 100644 index 16904b6bcb..0000000000 --- a/turms-server-common/src/main/java/im/turms/server/common/property/env/service/env/database/MultiTemperatureProperties.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Copyright (C) 2019 The Turms Project - * https://github.com/turms-im/turms - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package im.turms.server.common.property.env.service.env.database; - -import lombok.Data; -import lombok.NoArgsConstructor; - -import java.util.List; - -/** - * @author James Chen - */ -@Data -public class MultiTemperatureProperties { - - private boolean enabled; - - private TemperatureProperties hot = new TemperatureProperties(30); - - private TemperatureProperties warm = new TemperatureProperties(30 * 2); - - private TemperatureProperties cold = new TemperatureProperties(30 * 9); - - private TemperatureProperties frozen = new TemperatureProperties(30 * 12); - - @Data - @NoArgsConstructor - public static class TemperatureProperties { - private int days; - private List shards = List.of(""); - - public TemperatureProperties(int days) { - this.days = days; - } - } -} diff --git a/turms-server-common/src/main/java/im/turms/server/common/property/env/service/env/database/TieredStorageProperties.java b/turms-server-common/src/main/java/im/turms/server/common/property/env/service/env/database/TieredStorageProperties.java new file mode 100644 index 0000000000..3cd82f40a0 --- /dev/null +++ b/turms-server-common/src/main/java/im/turms/server/common/property/env/service/env/database/TieredStorageProperties.java @@ -0,0 +1,77 @@ +/* + * Copyright (C) 2019 The Turms Project + * https://github.com/turms-im/turms + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package im.turms.server.common.property.env.service.env.database; + +import com.fasterxml.jackson.annotation.JsonView; +import im.turms.server.common.constant.CronConstant; +import im.turms.server.common.property.metadata.annotation.Description; +import im.turms.server.common.property.metadata.annotation.GlobalProperty; +import im.turms.server.common.property.metadata.view.MutablePropertiesView; +import lombok.Data; +import lombok.NoArgsConstructor; +import org.springframework.boot.context.properties.NestedConfigurationProperty; + +import java.util.LinkedHashMap; +import java.util.List; + +/** + * @author James Chen + */ +@Data +public class TieredStorageProperties { + + private boolean enabled = true; + + @Description("The storage properties for tiers from hot to cold. " + + "Note that the order of the tiers is important") + private LinkedHashMap tiers = new LinkedHashMap<>(); + + @NestedConfigurationProperty + private AutoRangeUpdaterProperties autoRangeUpdater = new AutoRangeUpdaterProperties(); + + public TieredStorageProperties() { + tiers.put("hot", new StorageTierProperties(30)); + tiers.put("warm", new StorageTierProperties(30 * 2)); + tiers.put("cold", new StorageTierProperties(30 * 9)); + tiers.put("frozen", new StorageTierProperties()); + } + + @Data + @NoArgsConstructor + public static class StorageTierProperties { + private boolean enabled = true; + + private int days; + + private List shards = List.of(""); + + public StorageTierProperties(int days) { + this.days = days; + } + } + + @Data + public static class AutoRangeUpdaterProperties { + @GlobalProperty + @JsonView(MutablePropertiesView.class) + private boolean enabled = true; + + private String cron = CronConstant.DEFAULT_TIERED_STORAGE_TIER_RANGE_UPDATING_CRON; + } + +} \ No newline at end of file diff --git a/turms-server-test-common/src/main/resources/docker-compose.test.yml b/turms-server-test-common/src/main/resources/docker-compose.test.yml index 9bb2f54e20..5170d8e93f 100644 --- a/turms-server-test-common/src/main/resources/docker-compose.test.yml +++ b/turms-server-test-common/src/main/resources/docker-compose.test.yml @@ -9,7 +9,7 @@ services: environment: - MONGODB_ADVERTISED_HOSTNAME=mongodb-router - MONGODB_SHARDING_MODE=mongos - - MONGODB_CFG_PRIMARY_HOST=mongodb-cfg + - MONGODB_CFG_PRIMARY_HOST=mongodb-config - MONGODB_CFG_REPLICA_SET_NAME=cfgreplicaset - MONGODB_REPLICA_SET_KEY=replicasetkey123 - MONGODB_ROOT_PASSWORD=turms @@ -30,10 +30,10 @@ services: - MONGODB_REPLICA_SET_KEY=replicasetkey123 - MONGODB_REPLICA_SET_NAME=shard01 - mongodb-cfg: + mongodb-config: image: "bitnami/mongodb-sharded:4.4.10" environment: - - MONGODB_ADVERTISED_HOSTNAME=mongodb-cfg + - MONGODB_ADVERTISED_HOSTNAME=mongodb-config - MONGODB_SHARDING_MODE=configsvr - MONGODB_ROOT_PASSWORD=turms - MONGODB_REPLICA_SET_MODE=primary diff --git a/turms-service/src/main/java/im/turms/service/logging/ApiLoggingContext.java b/turms-service/src/main/java/im/turms/service/logging/ApiLoggingContext.java index ed9b8bd23e..4c42d8668a 100644 --- a/turms-service/src/main/java/im/turms/service/logging/ApiLoggingContext.java +++ b/turms-service/src/main/java/im/turms/service/logging/ApiLoggingContext.java @@ -36,7 +36,10 @@ public class ApiLoggingContext extends CommonApiLoggingContext { private final Map supportedLoggingNotificationProperties; public ApiLoggingContext(TurmsPropertiesManager propertiesManager) { - ClientApiLoggingProperties loggingProperties = propertiesManager.getLocalProperties().getService().getClientApi().getLogging(); + ClientApiLoggingProperties loggingProperties = propertiesManager.getLocalProperties() + .getService() + .getClientApi() + .getLogging(); supportedLoggingRequestProperties = getSupportedLoggingRequestProperties( loggingProperties.getIncludedRequestCategories(), loggingProperties.getIncludedRequests(), diff --git a/turms-service/src/main/java/im/turms/service/workflow/dao/MongoCollectionInitializer.java b/turms-service/src/main/java/im/turms/service/workflow/dao/MongoCollectionInitializer.java index 15552e9c9d..5c92359ca6 100644 --- a/turms-service/src/main/java/im/turms/service/workflow/dao/MongoCollectionInitializer.java +++ b/turms-service/src/main/java/im/turms/service/workflow/dao/MongoCollectionInitializer.java @@ -19,6 +19,7 @@ import com.google.common.collect.HashMultimap; import com.google.common.collect.Multimap; +import im.turms.server.common.cluster.node.Node; import im.turms.server.common.context.TurmsApplicationContext; import im.turms.server.common.dao.domain.User; import im.turms.server.common.logging.core.logger.Logger; @@ -29,10 +30,11 @@ import im.turms.server.common.mongo.entity.MongoEntity; import im.turms.server.common.mongo.entity.Zone; import im.turms.server.common.property.TurmsPropertiesManager; -import im.turms.server.common.property.env.service.env.database.MultiTemperatureProperties; import im.turms.server.common.property.env.service.ServiceProperties; import im.turms.server.common.property.env.service.env.database.MongoProperties; +import im.turms.server.common.property.env.service.env.database.TieredStorageProperties; import im.turms.server.common.security.PasswordManager; +import im.turms.server.common.task.TrivialTaskManager; import im.turms.server.common.util.ReactorUtil; import im.turms.service.workflow.dao.domain.admin.Admin; import im.turms.service.workflow.dao.domain.admin.AdminRole; @@ -53,22 +55,27 @@ import im.turms.service.workflow.dao.domain.user.UserRelationshipGroup; import im.turms.service.workflow.dao.domain.user.UserRelationshipGroupMember; import im.turms.service.workflow.dao.domain.user.UserVersion; -import org.apache.commons.lang3.time.DateUtils; +import org.bson.BsonTimestamp; import org.bson.Document; +import org.springframework.context.annotation.Lazy; +import org.springframework.data.util.Pair; import org.springframework.stereotype.Component; import org.springframework.util.StringUtils; import reactor.core.publisher.Mono; import java.lang.reflect.Field; import java.time.Duration; -import java.util.Date; -import java.util.LinkedHashMap; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.ZoneOffset; +import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.function.BiPredicate; import java.util.stream.Collectors; -import static im.turms.server.common.property.env.service.env.database.MultiTemperatureProperties.TemperatureProperties; +import static im.turms.server.common.property.env.service.env.database.TieredStorageProperties.StorageTierProperties; /** * @author James Chen @@ -87,18 +94,23 @@ public class MongoCollectionInitializer implements IMongoCollectionInitializer { private final TurmsApplicationContext context; private final MongoFakingManager fakingManager; - private final MultiTemperatureProperties messageTemperatureProperties; + private final TieredStorageProperties messageTieredStorageProperties; private final MongoProperties mongoProperties; + private final Node node; + public MongoCollectionInitializer( + @Lazy Node node, TurmsMongoClient adminMongoClient, TurmsMongoClient userMongoClient, TurmsMongoClient groupMongoClient, TurmsMongoClient conversationMongoClient, TurmsMongoClient messageMongoClient, PasswordManager passwordManager, + TrivialTaskManager taskManager, TurmsApplicationContext context, TurmsPropertiesManager turmsPropertiesManager) { + this.node = node; this.adminMongoClient = adminMongoClient; this.userMongoClient = userMongoClient; this.groupMongoClient = groupMongoClient; @@ -120,12 +132,26 @@ public MongoCollectionInitializer( messageMongoClient); mongoProperties = serviceProperties .getMongo(); - messageTemperatureProperties = serviceProperties + messageTieredStorageProperties = serviceProperties .getMongo() .getMessage() - .getTemperature(); + .getTieredStorage(); initCollections(); + + TieredStorageProperties.AutoRangeUpdaterProperties autoRangeUpdater = serviceProperties.getMongo() + .getMessage().getTieredStorage().getAutoRangeUpdater(); + if (autoRangeUpdater.isEnabled()) { + taskManager.reschedule("tieredStorageZoneUpdater", autoRangeUpdater.getCron(), () -> { + if (this.node.isLocalNodeLeader() && this.node.getSharedProperties() + .getService().getMongo().getMessage().getTieredStorage().getAutoRangeUpdater().isEnabled()) { + LOGGER.info("Updating the zone range for tiered storage"); + ensureZones() + .subscribe(unused -> LOGGER.info("Updated the zone range for tiered storage"), + t -> LOGGER.error("Failed to update the zone range for tiered storage", t)); + } + }); + } } private void initCollections() { @@ -141,7 +167,7 @@ private void initCollections() { } LOGGER.info("Start creating collections..."); Mono createCollections = createCollectionsIfNotExist() - .doOnError(t -> LOGGER.error("Failed to create collections", t)) + .onErrorMap(t -> new IllegalStateException("Failed to create collections", t)) .doOnSuccess(ignored -> LOGGER.info("All collections are created")) .flatMap(exists -> { if (exists && !fakingManager.isFakeIfCollectionExists()) { @@ -149,7 +175,7 @@ private void initCollections() { } return Mono.defer(() -> ensureZones() .then(Mono.defer(this::ensureIndexesAndShard) - .doOnError(t -> LOGGER.error("Failed to ensure indexes and shard", t))) + .onErrorMap(t -> new IllegalStateException("Failed to ensure indexes and shard", t))) .then(Mono.defer(() -> !context.isProduction() && fakingManager.isFakingEnabled() ? fakingManager.fakeData() : Mono.empty()))); @@ -286,62 +312,99 @@ private Mono ensureIndexesAndShard() { private Mono ensureZones() { Mono ensureZones = Mono.empty(); - var map = Map.of(messageTemperatureProperties, messageMongoClient); + var map = Map.of(messageTieredStorageProperties, messageMongoClient); for (var entry : map.entrySet()) { - MultiTemperatureProperties properties = entry.getKey(); + TieredStorageProperties properties = entry.getKey(); if (!properties.isEnabled()) { continue; } TurmsMongoClient client = entry.getValue(); for (MongoEntity entity : client.getRegisteredEntities()) { + String collectionName = entity.collectionName(); ensureZones = ensureZones - .then(Mono.defer(() -> ensureZones(properties, client, entity))); + .then(client.isBalancerRunning()) + .flatMap(isBalancerRunning -> { + if (isBalancerRunning) { + return Mono.error(new IllegalStateException("Failed to ensure zones because the balancer is running")); + } + return client.disableBalancing(collectionName) + .then(Mono.defer(() -> { + LOGGER.info("Deleting the existing tags for the collection " + collectionName); + return client.deleteTags(collectionName) + .doOnSuccess(unused -> LOGGER.info("Deleted the existing tags for the collection " + collectionName)) + .onErrorMap(t -> new IllegalStateException("Failed to the existing tags for the collection " + collectionName)); + })) + .then(Mono.defer(() -> { + LOGGER.info("Adding the shards of the collection {} to zones...", collectionName); + return validateAndEnsureZones(entry.getKey(), client, entity) + .doOnSuccess(unused -> LOGGER.info("Added the shards of the collection {} to zones", collectionName)); + })); + }) + .then(client.enableBalancing(collectionName)); } } - return ensureZones; } - private Mono ensureZones(MultiTemperatureProperties temperatureProperties, - TurmsMongoClient mongoClient, - MongoEntity entity) { + private Mono validateAndEnsureZones(TieredStorageProperties storageProperties, + TurmsMongoClient mongoClient, + MongoEntity entity) { Zone zone = entity.zone(); if (zone == null) { return Mono.empty(); } String collectionName = entity.collectionName(); - LOGGER.info("Adding the shards of the {} collection to zones...", collectionName); - Map temperatures = new LinkedHashMap<>(8); - temperatures.put(collectionName + "_hot", temperatureProperties.getHot()); - temperatures.put(collectionName + "_warm", temperatureProperties.getWarm()); - temperatures.put(collectionName + "_cold", temperatureProperties.getCold()); - temperatures.put(collectionName + "_frozen", temperatureProperties.getFrozen()); + Set> tierEntries = storageProperties.getTiers().entrySet(); + if (tierEntries.isEmpty()) { + return Mono.empty(); + } + int tierSize = tierEntries.size(); + List> tiers = new ArrayList<>(tierSize); + int i = 0; + for (Map.Entry tierEntry : tierEntries) { + StorageTierProperties properties = tierEntry.getValue(); + int days = properties.getDays(); + if (days <= 0 && i != tierSize - 1) { + return Mono.error(new IllegalArgumentException("The days of non-latest tiered storage properties must be more than 0")); + } + if (properties.isEnabled()) { + tiers.add(Pair.of(tierEntry.getKey(), tierEntry.getValue())); + } + i++; + } + if (tiers.isEmpty()) { + return Mono.empty(); + } + return ensureZones(mongoClient, tiers, collectionName, zone); + } + private Mono ensureZones(TurmsMongoClient mongoClient, + List> tiers, + String collectionName, + Zone zone) { Mono ensureZones = Mono.empty(); - Date startDate = new Date(); + LocalDateTime startDate = LocalDate.now().atStartOfDay(); int currentDay = 0; String creationDateFieldName = zone.creationDateFieldName(); - for (Map.Entry entry : temperatures.entrySet()) { - String zoneName = entry.getKey(); - TemperatureProperties properties = entry.getValue(); + for (int i = 0, tierSize = tiers.size(); i < tierSize; i++) { + Pair entry = tiers.get(i); + String zoneName = entry.getFirst(); + StorageTierProperties properties = entry.getSecond(); int days = properties.getDays(); - if (days <= 0) { - throw new IllegalArgumentException("The days of temperature properties must be more than 0"); - } + Object min = i == tierSize - 1 + ? BsonPool.MIN_KEY + : new BsonTimestamp(startDate.minusDays(currentDay + days).toEpochSecond(ZoneOffset.UTC)); + Object max = i == 0 + ? BsonPool.MAX_KEY + : new BsonTimestamp(startDate.minusDays(currentDay).toEpochSecond(ZoneOffset.UTC)); List shards = properties.getShards(); for (String shard : shards) { if (!StringUtils.hasText(shard)) { continue; } - Object min = zoneName.endsWith("frozen") - ? BsonPool.MIN_KEY - : DateUtils.addDays(startDate, -(currentDay + days)); - Object max = zoneName.endsWith("hot") - ? BsonPool.MAX_KEY - : DateUtils.addDays(startDate, -currentDay); ensureZones = ensureZones .then(Mono.defer(() -> mongoClient.addShardToZone(shard, zoneName) - .doOnError(t -> LOGGER.error("Failed to add a shard {} to the zone {}", shard, zoneName, t))) + .onErrorMap(t -> new IllegalStateException("Failed to add a shard %s to the zone %s".formatted(shard, zoneName), t))) .doOnSuccess(unused -> LOGGER.info("Added a shard {} to the zone {}", shard, zoneName))) .then(Mono.defer(() -> { // TODO: support the shard key consisting of multiple fields @@ -351,10 +414,10 @@ private Mono ensureZones(MultiTemperatureProperties temperatureProperties, zoneName, minimum, maximum) - .doOnError(t -> LOGGER.error("Failed to update the zone {} with the key ranges: {} -->> {}", + .onErrorMap(t -> new IllegalStateException("Failed to update the zone %s with the key ranges: %s -->> %s".formatted( zoneName, minimum.toJson(), - maximum.toJson(), + maximum.toJson()), t)) .doOnSuccess(unused -> LOGGER.info("Updated the zone {} with the key ranges: {} -->> {}", zoneName, @@ -364,8 +427,7 @@ private Mono ensureZones(MultiTemperatureProperties temperatureProperties, } currentDay += days; } - return ensureZones - .doOnSuccess(unused -> LOGGER.info("Added the shards of the {} collection to zones", collectionName)); + return ensureZones; } } \ No newline at end of file diff --git a/turms-service/src/main/java/im/turms/service/workflow/dao/domain/message/Message.java b/turms-service/src/main/java/im/turms/service/workflow/dao/domain/message/Message.java index 2ece7a3125..e8880de629 100644 --- a/turms-service/src/main/java/im/turms/service/workflow/dao/domain/message/Message.java +++ b/turms-service/src/main/java/im/turms/service/workflow/dao/domain/message/Message.java @@ -23,6 +23,7 @@ import im.turms.server.common.mongo.entity.annotation.Id; import im.turms.server.common.mongo.entity.annotation.Indexed; import im.turms.server.common.mongo.entity.annotation.Sharded; +import im.turms.server.common.mongo.entity.annotation.TieredStorage; import lombok.Data; import java.util.Date; @@ -37,14 +38,13 @@ * @implNote The shard key is DELIVERY_DATE instead of TARGET_ID. * Being sharded by TARGET_ID, most CRUD operations are efficient for small scale applications, * but Turms is designed for medium and large scale applications, - * so we use DELIVERY_DATE as the shard key to support multi-temperature messages. + * so we use DELIVERY_DATE as the shard key to support tiered storage. */ @Data @Document(Message.COLLECTION_NAME) @CompoundIndex({Message.Fields.DELIVERY_DATE, Message.Fields.TARGET_ID}) @Sharded(shardKey = Message.Fields.DELIVERY_DATE) -// Hide this feature until we deliver a complete solution for it in the next major release -//@WithTemperature(creationDateFieldName = Message.Fields.DELIVERY_DATE) +@TieredStorage(creationDateFieldName = Message.Fields.DELIVERY_DATE) public final class Message { public static final String COLLECTION_NAME = "message"; diff --git a/turms-service/src/main/resources/application-dev.yaml b/turms-service/src/main/resources/application-dev.yaml index 965a2256f5..2cb41cdfa9 100644 --- a/turms-service/src/main/resources/application-dev.yaml +++ b/turms-service/src/main/resources/application-dev.yaml @@ -40,20 +40,24 @@ turms: uri: mongodb://localhost:27017/turms-dev message: uri: mongodb://localhost:27017/turms-dev - temperature: + tiered-storage: enabled: true - hot: - shards: - - shard01 - warm: - shards: - - shard01 - cold: - shards: - - shard01 - frozen: - shards: - - shard01 + tiers: + hot: + days: 30 + shards: + - shard01 + warm: + days: 60 + shards: + - shard01 + cold: + days: 270 + shards: + - shard01 + frozen: + shards: + - shard01 turms-plugin: minio: retry: