Skip to content

Commit

Permalink
[INLONG-10650][Agent] When the installer updates the configuration, i…
Browse files Browse the repository at this point in the history
…t first checks the version
  • Loading branch information
justinwwhuang committed Jul 18, 2024
1 parent 89c965b commit a2ee47a
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,8 @@ private Runnable configFetchThread() {
while (isRunnable()) {
try {
ConfigResult config = getConfig();
if (config != null && config.getCode().equals(AgentResponseCode.SUCCESS)) {
if (config != null && config.getCode().equals(AgentResponseCode.SUCCESS)
&& manager.getModuleManager().getCurrentVersion() < config.getVersion()) {
manager.getModuleManager().submitConfig(config);
}
} catch (Throwable ex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ public class ModuleManager extends AbstractDaemon {
private final String confPath;
private final BlockingQueue<ConfigResult> configQueue;
private String currentMd5 = "";
private Integer currentVersion = -1;
private Map<Integer, ModuleConfig> currentModules = new ConcurrentHashMap<>();
private static final GsonBuilder gsonBuilder = new GsonBuilder().setDateFormat("yyyy-MM-dd HH:mm:ss");
private static final Gson GSON = gsonBuilder.create();
Expand Down Expand Up @@ -218,6 +219,10 @@ public String getCurrentMd5() {
return currentMd5;
}

public Integer getCurrentVersion() {
return currentVersion;
}

public ModuleConfig getModule(Integer moduleId) {
return currentModules.get(moduleId);
}
Expand Down Expand Up @@ -255,7 +260,10 @@ public void restoreFromLocalFile(String confPath) {
JsonElement tmpElement = JsonParser.parseReader(reader).getAsJsonObject();
ConfigResult curConfig = GSON.fromJson(tmpElement.getAsJsonObject(), ConfigResult.class);
if (curConfig.getMd5() != null && curConfig.getModuleList() != null) {
currentMd5 = curConfig.getMd5();
if (curConfig.getMd5() != null && currentVersion != null) {
currentMd5 = curConfig.getMd5();
currentVersion = curConfig.getVersion();
}
curConfig.getModuleList().forEach((module) -> {
currentModules.put(module.getId(), module);
});
Expand All @@ -277,7 +285,7 @@ public void saveToLocalFile(String confPath) {
File jsonPath = new File(temp.getPath() + "/" + LOCAL_CONFIG_FILE);
try (BufferedWriter writer = new BufferedWriter(
new OutputStreamWriter(new FileOutputStream(jsonPath), StandardCharsets.UTF_8))) {
String curConfig = GSON.toJson(ConfigResult.builder().md5(currentMd5)
String curConfig = GSON.toJson(ConfigResult.builder().md5(currentMd5).version(currentVersion)
.moduleList(currentModules.values().stream().collect(Collectors.toList())).build());
writer.write(curConfig);
writer.flush();
Expand All @@ -299,6 +307,7 @@ private void dealWithConfigQueue(BlockingQueue<ConfigResult> queue) {
}
if (updateModules(config.getModuleList())) {
currentMd5 = config.getMd5();
currentVersion = config.getVersion();
saveToLocalFile(confPath);
} else {
LOGGER.error("update modules failed!");
Expand All @@ -308,13 +317,14 @@ private void dealWithConfigQueue(BlockingQueue<ConfigResult> queue) {
private void checkModules() {
LOGGER.info("check modules start");
currentModules.values().forEach((module) -> {
LOGGER.info("check module current state {} {}", module.getName(), module.getState());
LOGGER.info("check module {}({}) current state {}", module.getId(), module.getName(), module.getState());
switch (module.getState()) {
case NEW:
if (downloadModule(module)) {
saveModuleState(module.getId(), ModuleStateEnum.DOWNLOADED);
} else {
LOGGER.error("download module {} failed, keep state in new", module.getName());
LOGGER.error("download module {}({}) failed, keep state in new", module.getId(),
module.getName());
}
break;
case DOWNLOADED:
Expand All @@ -323,22 +333,24 @@ private void checkModules() {
saveModuleState(module.getId(), ModuleStateEnum.INSTALLED);
} else {
LOGGER.info(
"check module {} package failed, change stated to new, will download package again",
module.getName());
"check module {}({}) package failed, change stated to new, will download package again",
module.getId(), module.getName());
saveModuleState(module.getId(), ModuleStateEnum.NEW);
}
break;
case INSTALLED:
if (!isProcessAllStarted(module)) {
LOGGER.info("module {} process not all started try to start", module.getName());
LOGGER.info("module {}({}) process not all started try to start", module.getId(),
module.getName());
if (!startModule(module)) {
LOGGER.info("start module {} failed, change state to downloaded", module.getState());
LOGGER.info("start module {}({}) failed, change state to downloaded", module.getId(),
module.getName());
saveModuleState(module.getId(), ModuleStateEnum.DOWNLOADED);
}
}
break;
default:
LOGGER.error("module {} invalid state {}", module.getName(), module.getState());
LOGGER.error("module {}({}) invalid state {}", module.getId(), module.getName(), module.getState());
}
});
LOGGER.info("check modules end");
Expand All @@ -358,15 +370,15 @@ private void traverseManagerModulesToLocal(Map<Integer, ModuleConfig> modulesFro
modulesFromManager.values().forEach((managerModule) -> {
ModuleConfig localModule = currentModules.get(managerModule.getId());
if (localModule == null) {
LOGGER.info("traverseManagerModulesToLocal module {} {} {} not found in local, add it",
LOGGER.info("traverseManagerModulesToLocal module {}({}) {} not found in local, add it",
managerModule.getId(), managerModule.getName(), managerModule.getVersion());
addModule(managerModule);
} else {
if (managerModule.getMd5().equals(localModule.getMd5())) {
LOGGER.info("traverseManagerModulesToLocal module {} {} {} md5 no change, do nothing",
LOGGER.info("traverseManagerModulesToLocal module {}({}) {} md5 no change, do nothing",
localModule.getId(), localModule.getName(), localModule.getVersion());
} else {
LOGGER.info("traverseManagerModulesToLocal module {} {} {} md5 changed, update it",
LOGGER.info("traverseManagerModulesToLocal module {}({}) {} md5 changed, update it",
localModule.getId(), localModule.getName(), localModule.getVersion());
updateModule(localModule, managerModule);
}
Expand All @@ -378,62 +390,65 @@ private void traverseLocalModulesToManager(Map<Integer, ModuleConfig> modulesFro
currentModules.values().forEach((localModule) -> {
ModuleConfig managerModule = modulesFromManager.get(localModule.getId());
if (managerModule == null) {
LOGGER.info("traverseLocalModulesToManager module {} {} {} not found in local, delete it",
LOGGER.info("traverseLocalModulesToManager module {}({}) {} not found in local, delete it",
localModule.getId(), localModule.getName(), localModule.getVersion());
deleteModule(localModule);
}
});
}

private void addModule(ModuleConfig module) {
LOGGER.info("add module {} start", module.getName());
LOGGER.info("add module {}({}) start", module.getId(), module.getName());
addAndSaveModuleConfig(module);
if (!downloadModule(module)) {
LOGGER.error("add module {} but download failed", module.getName());
LOGGER.error("add module {}({}) but download failed", module.getId(), module.getName());
return;
}
saveModuleState(module.getId(), ModuleStateEnum.DOWNLOADED);
installModule(module);
saveModuleState(module.getId(), ModuleStateEnum.INSTALLED);
startModule(module);
LOGGER.info("add module {} end", module.getId());
LOGGER.info("add module {}({}) end", module.getId(), module.getName());
}

private void deleteModule(ModuleConfig module) {
LOGGER.info("delete module {} start", module.getId());
LOGGER.info("delete module {}({}) start", module.getId(), module.getName());
stopModule(module);
uninstallModule(module);
deleteAndSaveModuleConfig(module);
LOGGER.info("delete module {} end", module.getId());
LOGGER.info("delete module {}({}) end", module.getId(), module.getName());
}

private void updateModule(ModuleConfig localModule, ModuleConfig managerModule) {
LOGGER.info("update module {} start", localModule.getId());
LOGGER.info("update module {}({}) start", localModule.getId(), localModule.getName());
if (localModule.getPackageConfig().getMd5().equals(managerModule.getPackageConfig().getMd5())) {
LOGGER.info("module {} package md5 no change, will restart and save config", localModule.getId());
LOGGER.info("module {}({}) package md5 no change, will restart and save config", localModule.getId(),
localModule.getName());
restartModule(localModule, managerModule);
managerModule.setState(localModule.getState());
updateModuleConfig(managerModule);
} else {
LOGGER.info("module {} package md5 changed, will reinstall", localModule.getId());
LOGGER.info("module {}({}) package md5 changed, will reinstall", localModule.getId(),
localModule.getName());
deleteModule(localModule);
addModule(managerModule);
}
LOGGER.info("update module {} end", localModule.getId());
LOGGER.info("update module {}({}) end", localModule.getId(), localModule.getName());
}

private void addAndSaveModuleConfig(ModuleConfig module) {
module.setState(ModuleStateEnum.NEW);
if (currentModules.containsKey(module.getId())) {
LOGGER.error("should not happen! module {} found! will force to replace it!", module.getId());
LOGGER.error("should not happen! module {}({}) found! will force to replace it!", module.getId(),
module.getName());
}
currentModules.put(module.getId(), module);
saveToLocalFile(confPath);
}

private void deleteAndSaveModuleConfig(ModuleConfig module) {
if (!currentModules.containsKey(module.getId())) {
LOGGER.error("should not happen! module {} not found!", module.getId());
LOGGER.error("should not happen! module {}({}) not found!", module.getId(), module.getName());
return;
}
currentModules.remove(module.getId());
Expand All @@ -453,7 +468,7 @@ private boolean saveModuleState(Integer moduleId, ModuleStateEnum state) {
}
module.setState(state);
saveToLocalFile(confPath);
LOGGER.info("save module state to {} {}", moduleId, state);
LOGGER.info("save module {}({}) state to {}", module.getId(), module.getName(), state);
return true;
}

Expand All @@ -463,42 +478,43 @@ private void restartModule(ModuleConfig localModule, ModuleConfig managerModule)
}

private void installModule(ModuleConfig module) {
LOGGER.info("install module {} with cmd {}", module.getId(), module.getInstallCommand());
LOGGER.info("install module {}({}) with cmd {}", module.getId(), module.getName(), module.getInstallCommand());
String ret = ExcuteLinux.exeCmd(module.getInstallCommand());
LOGGER.info("install module {} return {} ", module.getId(), ret);
LOGGER.info("install module {}({}) return {} ", module.getId(), module.getName(), ret);
}

private boolean startModule(ModuleConfig module) {
LOGGER.info("start module {} with cmd {}", module.getId(), module.getStartCommand());
LOGGER.info("start module {}({}) with cmd {}", module.getId(), module.getName(), module.getStartCommand());
for (int i = 0; i < module.getProcessesNum(); i++) {
String ret = ExcuteLinux.exeCmd(module.getStartCommand());
LOGGER.info("start [{}] module {} return {} ", i, module.getId(), ret);
LOGGER.info("start module {}({}) proc[{}] return {} ", module.getId(), module.getName(), i, ret);
}
if (isProcessAllStarted(module)) {
LOGGER.info("start module {} success", module.getId());
LOGGER.info("start module {}({}) success", module.getId(), module.getName());
return true;
} else {
LOGGER.info("start module {} failed", module.getId());
LOGGER.info("start module {}({}) failed", module.getId(), module.getName());
return false;
}
}

private void stopModule(ModuleConfig module) {
LOGGER.info("stop module {} with cmd {}", module.getId(), module.getStopCommand());
LOGGER.info("stop module {}({}) with cmd {}", module.getId(), module.getName(), module.getStopCommand());
String ret = ExcuteLinux.exeCmd(module.getStopCommand());
LOGGER.info("stop module {} return {} ", module.getId(), ret);
LOGGER.info("stop module {}({}) return {} ", module.getId(), module.getName(), ret);
}

private void uninstallModule(ModuleConfig module) {
LOGGER.info("uninstall module {} with cmd {}", module.getId(), module.getUninstallCommand());
LOGGER.info("uninstall module {}({}) with cmd {}", module.getId(), module.getName(),
module.getUninstallCommand());
String ret = ExcuteLinux.exeCmd(module.getUninstallCommand());
LOGGER.info("uninstall module {} return {} ", module.getId(), ret);
LOGGER.info("uninstall module {}({}) return {} ", module.getId(), module.getName(), ret);
}

private boolean isProcessAllStarted(ModuleConfig module) {
String ret = ExcuteLinux.exeCmd(module.getCheckCommand());
if (ret == null) {
LOGGER.error("get module process num {} failed", module.getName());
LOGGER.error("get module {}({}) process num failed", module.getId(), module.getName());
return false;
}
String[] processArray = ret.split("\n");
Expand All @@ -508,12 +524,12 @@ private boolean isProcessAllStarted(ModuleConfig module) {
cnt++;
}
}
LOGGER.info("get module process num {} {}", module.getName(), cnt);
LOGGER.info("get module {}({}) process num {}", module.getId(), module.getName(), cnt);
return cnt >= module.getProcessesNum();
}

private boolean downloadModule(ModuleConfig module) {
LOGGER.info("download module {} begin with url {}", module.getId(),
LOGGER.info("download module {}({}) begin with url {}", module.getId(), module.getName(),
module.getPackageConfig().getDownloadUrl());
try {
URL url = new URL(module.getPackageConfig().getDownloadUrl());
Expand All @@ -526,7 +542,7 @@ private boolean downloadModule(ModuleConfig module) {
module.getPackageConfig().getStoragePath() + "/" + module.getPackageConfig().getFileName();
try (InputStream inputStream = conn.getInputStream();
FileOutputStream outputStream = new FileOutputStream(path)) {
LOGGER.info("save path {}", path);
LOGGER.info("module {}({}) save path {}", module.getId(), module.getName(), path);
int byteRead;
byte[] buffer = new byte[DOWNLOAD_PACKAGE_READ_BUFF_SIZE];
while ((byteRead = inputStream.read(buffer)) != -1) {
Expand All @@ -536,15 +552,15 @@ private boolean downloadModule(ModuleConfig module) {
if (isPackageDownloaded(module)) {
return true;
} else {
LOGGER.error("download package md5 not match!");
LOGGER.error("download module {}({}) package md5 not match!", module.getId(), module.getName());
return false;
}
} catch (FileNotFoundException e) {
LOGGER.error("download module err", e);
LOGGER.error("download module {}({}) err", module.getId(), module.getName(), e);
} catch (IOException e) {
LOGGER.error("download module err", e);
LOGGER.error("download module {}({}) err", module.getId(), module.getName(), e);
}
LOGGER.info("download module {} end", module.getId());
LOGGER.info("download module {}({}) end", module.getId(), module.getName());
return false;
}

Expand All @@ -554,7 +570,8 @@ private boolean isPackageDownloaded(ModuleConfig module) {
if (Objects.equals(fileMd5, module.getPackageConfig().getMd5())) {
return true;
} else {
LOGGER.error("md5 not match! fileMd5 {} moduleMd5 {}", fileMd5, module.getPackageConfig().getMd5());
LOGGER.error("module {}({}) md5 not match! fileMd5 {} moduleMd5 {}", module.getId(), module.getName(),
fileMd5, module.getPackageConfig().getMd5());
return false;
}
}
Expand Down

0 comments on commit a2ee47a

Please sign in to comment.