Skip to content

Commit

Permalink
✨ feat: Support transfer files.
Browse files Browse the repository at this point in the history
  • Loading branch information
jarvis2f committed Jan 23, 2025
1 parent 22c9fb6 commit fb511b4
Show file tree
Hide file tree
Showing 32 changed files with 2,208 additions and 163 deletions.
5 changes: 4 additions & 1 deletion api/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ dependencies {
testImplementation 'io.vertx:vertx-junit5:4.5.11'
testImplementation platform('org.junit:junit-bom:5.10.0')
testImplementation 'org.junit.jupiter:junit-jupiter'
testImplementation 'org.mockito:mockito-core:5.15.2'
}

test {
Expand All @@ -43,7 +44,9 @@ test {
}
}
}
jvmArgs "-Djava.library.path=${System.getenv('TDLIB_PATH')}"
jvmArgs "-Djava.library.path=${System.getenv('TDLIB_PATH')}",
"-XX:+EnableDynamicAgentLoading",
"-javaagent:${configurations.testRuntimeClasspath.find { it.name.contains('mockito-core') }}"
environment "APP_ROOT", "${testClassesDirs.asPath}"
useJUnitPlatform()
finalizedBy jacocoTestReport
Expand Down
83 changes: 21 additions & 62 deletions api/src/main/java/telegram/files/AutoDownloadVerticle.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@
import telegram.files.repository.SettingAutoRecords;
import telegram.files.repository.SettingKey;

import java.util.*;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.IntStream;
import java.util.stream.Stream;
Expand All @@ -40,10 +43,17 @@ public class AutoDownloadVerticle extends AbstractVerticle {
// telegramId -> messages
private final Map<Long, LinkedList<TdApi.Message>> waitingDownloadMessages = new ConcurrentHashMap<>();

private final SettingAutoRecords autoRecords = new SettingAutoRecords();
private final SettingAutoRecords autoRecords;

private int limit = DEFAULT_LIMIT;

public AutoDownloadVerticle(AutoRecordsHolder autoRecordsHolder) {
this.autoRecords = autoRecordsHolder.autoRecords();
autoRecordsHolder.registerOnRemoveListener(removedItems -> removedItems.forEach(item ->
waitingDownloadMessages.getOrDefault(item.telegramId, new LinkedList<>())
.removeIf(m -> m.chatId == item.chatId)));
}

@Override
public void start(Promise<Void> startPromise) {
initAutoDownload()
Expand Down Expand Up @@ -77,37 +87,17 @@ public void stop(Promise<Void> stopPromise) {
}

private Future<Void> initAutoDownload() {
return Future.all(
DataVerticle.settingRepository.<Integer>getByKey(SettingKey.autoDownloadLimit)
.onSuccess(limit -> {
if (limit != null) {
this.limit = limit;
}
})
.onFailure(e -> log.error("Get Auto download limit failed!", e)),
DataVerticle.settingRepository.<SettingAutoRecords>getByKey(SettingKey.autoDownload)
.onSuccess(settingAutoRecords -> {
if (settingAutoRecords == null) {
return;
}
settingAutoRecords.items.forEach(item -> HttpVerticle.getTelegramVerticle(item.telegramId)
.ifPresentOrElse(telegramVerticle -> {
if (telegramVerticle.authorized) {
autoRecords.add(item);
} else {
log.warn("Init auto download fail. Telegram verticle not authorized: %s".formatted(item.telegramId));
}
}, () -> log.warn("Init auto download fail. Telegram verticle not found: %s".formatted(item.telegramId))));
})
.onFailure(e -> log.error("Init Auto download failed!", e))
).mapEmpty();
return DataVerticle.settingRepository.<Integer>getByKey(SettingKey.autoDownloadLimit)
.onSuccess(limit -> {
if (limit != null) {
this.limit = limit;
}
})
.onFailure(e -> log.error("Get Auto download limit failed!", e))
.mapEmpty();
}

private Future<Void> initEventConsumer() {
vertx.eventBus().consumer(EventEnum.AUTO_DOWNLOAD_UPDATE.address(), message -> {
log.debug("Auto download update: %s".formatted(message.body()));
this.onAutoRecordsUpdate(Json.decodeValue(message.body().toString(), SettingAutoRecords.class));
});
vertx.eventBus().consumer(EventEnum.SETTING_UPDATE.address(SettingKey.autoDownloadLimit.name()), message -> {
log.debug("Auto download limit update: %s".formatted(message.body()));
this.limit = Convert.toInt(message.body(), DEFAULT_LIMIT);
Expand Down Expand Up @@ -250,7 +240,7 @@ private void download(long telegramId) {
.toList();
downloadMessages.forEach(message -> {
Integer fileId = TdApiHelp.getFileId(message);
log.debug("Start process file: %s".formatted(fileId));
log.debug("Start download file: %s".formatted(fileId));
telegramVerticle.startDownload(message.chatId, message.id, fileId)
.onSuccess(v -> log.info("Start download file success! ChatId: %d MessageId:%d FileId:%d"
.formatted(message.chatId, message.id, fileId))
Expand All @@ -266,37 +256,6 @@ private TelegramVerticle getTelegramVerticle(long telegramId) {
.orElseThrow(() -> new IllegalArgumentException("Telegram verticle not found: %s".formatted(telegramId)));
}

private void onAutoRecordsUpdate(SettingAutoRecords records) {
for (SettingAutoRecords.Item item : records.items) {
if (!autoRecords.exists(item.telegramId, item.chatId)) {
// new enabled
HttpVerticle.getTelegramVerticle(item.telegramId)
.ifPresentOrElse(telegramVerticle -> {
if (telegramVerticle.authorized) {
autoRecords.add(item);
log.info("Add auto download success: %s".formatted(item.uniqueKey()));
} else {
log.warn("Add auto download fail. Telegram verticle not authorized: %s".formatted(item.telegramId));
}
}, () -> log.warn("Add auto download fail. Telegram verticle not found: %s".formatted(item.telegramId)));
}
}
// remove disabled
List<SettingAutoRecords.Item> removedItems = new ArrayList<>();
autoRecords.items.removeIf(item -> {
if (records.exists(item.telegramId, item.chatId)) {
return false;
}
removedItems.add(item);
log.info("Remove auto download success: %s".formatted(item.uniqueKey()));
return true;
});
removedItems.forEach(item ->
waitingDownloadMessages.getOrDefault(item.telegramId, new LinkedList<>())
.removeIf(message -> message.chatId == item.chatId)
);
}

private void onNewMessage(JsonObject jsonObject) {
long telegramId = jsonObject.getLong("telegramId");
long chatId = jsonObject.getLong("chatId");
Expand Down
80 changes: 80 additions & 0 deletions api/src/main/java/telegram/files/AutoRecordsHolder.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package telegram.files;

import cn.hutool.core.collection.CollUtil;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import io.vertx.core.Future;
import telegram.files.repository.SettingAutoRecords;
import telegram.files.repository.SettingKey;

import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class AutoRecordsHolder {
private final Log log = LogFactory.get();

private final SettingAutoRecords autoRecords = new SettingAutoRecords();

private final List<Consumer<List<SettingAutoRecords.Item>>> onRemoveListeners = new ArrayList<>();

public AutoRecordsHolder() {
}

public SettingAutoRecords autoRecords() {
return autoRecords;
}

public void registerOnRemoveListener(Consumer<List<SettingAutoRecords.Item>> onRemove) {
onRemoveListeners.add(onRemove);
}

public Future<Void> init() {
return DataVerticle.settingRepository.<SettingAutoRecords>getByKey(SettingKey.autoDownload)
.onSuccess(settingAutoRecords -> {
if (settingAutoRecords == null) {
return;
}
settingAutoRecords.items.forEach(item -> HttpVerticle.getTelegramVerticle(item.telegramId)
.ifPresentOrElse(telegramVerticle -> {
if (telegramVerticle.authorized) {
autoRecords.add(item);
} else {
log.warn("Init auto records fail. Telegram verticle not authorized: %s".formatted(item.telegramId));
}
}, () -> log.warn("Init auto records fail. Telegram verticle not found: %s".formatted(item.telegramId))));
})
.onFailure(e -> log.error("Init auto records failed!", e))
.mapEmpty();
}

public void onAutoRecordsUpdate(SettingAutoRecords records) {
for (SettingAutoRecords.Item item : records.items) {
if (!autoRecords.exists(item.telegramId, item.chatId)) {
// new enabled
HttpVerticle.getTelegramVerticle(item.telegramId)
.ifPresentOrElse(telegramVerticle -> {
if (telegramVerticle.authorized) {
autoRecords.add(item);
log.info("Add auto records success: %s".formatted(item.uniqueKey()));
} else {
log.warn("Add auto records fail. Telegram verticle not authorized: %s".formatted(item.telegramId));
}
}, () -> log.warn("Add auto records fail. Telegram verticle not found: %s".formatted(item.telegramId)));
}
}
// remove disabled
List<SettingAutoRecords.Item> removedItems = new ArrayList<>();
autoRecords.items.removeIf(item -> {
if (records.exists(item.telegramId, item.chatId)) {
return false;
}
removedItems.add(item);
log.info("Remove auto records success: %s".formatted(item.uniqueKey()));
return true;
});
if (CollUtil.isNotEmpty(removedItems)) {
onRemoveListeners.forEach(listener -> listener.accept(removedItems));
}
}
}
20 changes: 17 additions & 3 deletions api/src/main/java/telegram/files/HttpVerticle.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import io.vertx.ext.web.sstore.LocalSessionStore;
import io.vertx.ext.web.sstore.SessionStore;
import org.drinkless.tdlib.TdApi;
import telegram.files.repository.SettingAutoRecords;
import telegram.files.repository.SettingKey;
import telegram.files.repository.SettingRecord;
import telegram.files.repository.TelegramRecord;
Expand All @@ -47,15 +48,17 @@ public class HttpVerticle extends AbstractVerticle {
// session id -> telegram verticle
private final Map<String, TelegramVerticle> sessionTelegramVerticles = new ConcurrentHashMap<>();

private AutoDownloadVerticle autoDownloadVerticle;
private final AutoRecordsHolder autoRecordsHolder = new AutoRecordsHolder();

private static final String SESSION_COOKIE_NAME = "tf";

@Override
public void start(Promise<Void> startPromise) {
initHttpServer()
.compose(r -> initTelegramVerticles())
.compose(r -> autoRecordsHolder.init())
.compose(r -> initAutoDownloadVerticle())
.compose(r -> initTransferVerticle())
.compose(r -> initEventConsumer())
.onSuccess(startPromise::complete)
.onFailure(startPromise::fail);
Expand Down Expand Up @@ -212,13 +215,20 @@ public Future<Void> initTelegramVerticles() {
}

public Future<Void> initAutoDownloadVerticle() {
autoDownloadVerticle = new AutoDownloadVerticle();
return vertx.deployVerticle(autoDownloadVerticle, new DeploymentOptions()
return vertx.deployVerticle(new AutoDownloadVerticle(autoRecordsHolder), new DeploymentOptions()
.setThreadingModel(ThreadingModel.VIRTUAL_THREAD)
)
.mapEmpty();
}

public Future<Void> initTransferVerticle() {
return vertx.deployVerticle(new TransferVerticle(autoRecordsHolder), new DeploymentOptions()
.setThreadingModel(ThreadingModel.VIRTUAL_THREAD)
)
.mapEmpty();
}


private Future<Void> initEventConsumer() {
vertx.eventBus().consumer(EventEnum.TELEGRAM_EVENT.address(), message -> {
log.debug("Received telegram event: %s".formatted(message.body()));
Expand All @@ -237,6 +247,10 @@ private Future<Void> initEventConsumer() {
});
});

vertx.eventBus().consumer(EventEnum.AUTO_DOWNLOAD_UPDATE.address(), message -> {
log.debug("Auto download update: %s".formatted(message.body()));
autoRecordsHolder.onAutoRecordsUpdate(Json.decodeValue(message.body().toString(), SettingAutoRecords.class));
});
return Future.succeededFuture();
}

Expand Down
51 changes: 51 additions & 0 deletions api/src/main/java/telegram/files/MessyUtils.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
package telegram.files;

import java.io.File;
import java.io.FileInputStream;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.security.MessageDigest;
import java.time.LocalDateTime;
import java.util.concurrent.CompletableFuture;

public class MessyUtils {

Expand All @@ -10,4 +16,49 @@ public static LocalDateTime withGrouping5Minutes(LocalDateTime time) {
int newMinute = minuteGroup * 5;
return time.withMinute(newMinute).withSecond(0).withNano(0);
}

public static String calculateFileMD5(File file) {
try (FileInputStream fis = new FileInputStream(file);
FileChannel channel = fis.getChannel()) {

MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_ONLY, 0, file.length());

MessageDigest md = MessageDigest.getInstance("MD5");

md.update(buffer);

byte[] md5Bytes = md.digest();
StringBuilder hexString = new StringBuilder();
for (byte b : md5Bytes) {
hexString.append(String.format("%02x", b));
}

return hexString.toString();
} catch (Exception e) {
return null;
}
}

public static boolean compareFilesMD5(File file1, File file2) {
CompletableFuture<String> md5Task1 = CompletableFuture.supplyAsync(() -> {
try {
return calculateFileMD5(file1);
} catch (Exception e) {
throw new RuntimeException(e);
}
});

CompletableFuture<String> md5Task2 = CompletableFuture.supplyAsync(() -> {
try {
return calculateFileMD5(file2);
} catch (Exception e) {
throw new RuntimeException(e);
}
});

String md5File1 = md5Task1.join();
String md5File2 = md5Task2.join();

return md5File1.equals(md5File2);
}
}
2 changes: 1 addition & 1 deletion api/src/main/java/telegram/files/Start.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
public class Start {
private static final Log log = LogFactory.get();

public static final String VERSION = "0.1.11";
public static final String VERSION = "0.1.12";

private static final CountDownLatch shutdownLatch = new CountDownLatch(1);

Expand Down
Loading

0 comments on commit fb511b4

Please sign in to comment.