From 48ec4e835bef37a8d714df0ce79f39c7cdc06b6f Mon Sep 17 00:00:00 2001 From: linlinwang3 Date: Tue, 18 Dec 2018 15:48:13 +0800 Subject: [PATCH 1/5] https://github.com/uavorg/uavstack/issues/462 Redis supports cluster configuration --- com.creditease.uav.cache.redis/pom.xml | 38 +- .../uav/cache/api/CacheManager.java | 60 +- .../uav/cache/redis/LettuceAsyncService.java | 644 ++++++++++++++++++ .../uav/cache/redis/api/CacheFactory.java | 9 +- 4 files changed, 735 insertions(+), 16 deletions(-) create mode 100644 com.creditease.uav.cache.redis/src/main/java/com/creditease/uav/cache/redis/LettuceAsyncService.java diff --git a/com.creditease.uav.cache.redis/pom.xml b/com.creditease.uav.cache.redis/pom.xml index 02e4532d..67c6b05c 100644 --- a/com.creditease.uav.cache.redis/pom.xml +++ b/com.creditease.uav.cache.redis/pom.xml @@ -37,6 +37,42 @@ commons-logging commons-logging 1.2 - + + + io.netty + netty-all + 4.1.11.Final + + + io.lettuce + lettuce-core + 5.0.5.RELEASE + + + io.netty + netty-common + + + io.netty + netty-transport + + + io.netty + netty-buffer + + + io.netty + netty-resolver + + + io.netty + netty-handler + + + io.netty + netty-codec + + + \ No newline at end of file diff --git a/com.creditease.uav.cache.redis/src/main/java/com/creditease/uav/cache/api/CacheManager.java b/com.creditease.uav.cache.redis/src/main/java/com/creditease/uav/cache/api/CacheManager.java index 828d422a..f073b02e 100644 --- a/com.creditease.uav.cache.redis/src/main/java/com/creditease/uav/cache/api/CacheManager.java +++ b/com.creditease.uav.cache.redis/src/main/java/com/creditease/uav/cache/api/CacheManager.java @@ -430,10 +430,20 @@ public static void build(String cacheServerAddress, int minConcurrent, int maxCo private CacheService service; private L1Cache l1cache; - + + private static CacheClientMode clientMode; + protected CacheManager(String cacheServerAddress, int minConcurrent, int maxConcurrent, int queueSize, String password) { - service = CacheFactory.instance().createCacheService(CacheClientMode.AREDIS, cacheServerAddress, minConcurrent, + + if (cacheServerAddress.contains(",")) { + clientMode = CacheClientMode.LETTUCE; + } + else { + clientMode = CacheClientMode.AREDIS; + } + + service = CacheFactory.instance().createCacheService(clientMode, cacheServerAddress, minConcurrent, maxConcurrent, queueSize, password); l1cache = new L1Cache(); } @@ -1239,7 +1249,8 @@ public void putHash(String region, String key, Map fieldValues, * @param fieldNames * @return */ - public Map getHash(String region, String key, String... fieldNames) { + @SuppressWarnings("unchecked") + public Map getHash(String region, String key, String... fieldNames) { if (null == fieldNames) { Collections.emptyMap(); @@ -1253,8 +1264,13 @@ public Map getHash(String region, String key, String... fieldNam if (null != results && results.length > 0) { - Map value = genHMGetResults(fieldNames, results); - + Map value = new HashMap(); + if (clientMode.equals(CacheClientMode.LETTUCE) && results[0] instanceof Map) { + value = (Map) results[0]; + } + else { + value = genHMGetResults(fieldNames, results); + } return value; } @@ -1321,8 +1337,13 @@ public Map getHashAll(String region, String key) { if (null != results && results.length > 0) { - Map value = genHMGetResults(null, results); - + Map value = new HashMap(); + if (clientMode.equals(CacheClientMode.LETTUCE) && results[0] instanceof Map) { + value = (Map) results[0]; + } + else { + value = genHMGetResults(null, results); + } this.l1cache.put(rkey, value); return value; @@ -1362,7 +1383,14 @@ public void getHashAll(String region, String key, AsyncCacheCallback 0) { - Map mresult = genHMGetResults(null, result); + Map mresult = new HashMap(); + + if (clientMode.equals(CacheClientMode.LETTUCE) && result[0] instanceof Map) { + mresult = (Map) result[0]; + } + else { + mresult = genHMGetResults(null, result); + } /** * get hash all是可以放L1Cache @@ -1403,12 +1431,18 @@ public void getHash(String region, String key, AsyncCacheCallback() { - @Override + @SuppressWarnings("unchecked") + @Override public void process(CommandInfo[] command, Object[] result, Throwable throwable) { - if (fcallback != null && null != result && result.length > 0) { - Map mresult = genHMGetResults(ffieldname, result); - + if (fcallback != null && null != result && result.length > 0) { + Map mresult = new HashMap(); + if (clientMode.equals(CacheClientMode.LETTUCE) && result[0] instanceof Map) { + mresult = (Map) result[0]; + } + else { + mresult = genHMGetResults(ffieldname, result); + } fcallback.onResult(mresult); } } @@ -1795,7 +1829,7 @@ public List lrange(String region, String key, int start, int end) { Object[] results = service.submitCommands( new CommandInfo(CommandInfo.RedisCommand.LRANGE, rkey, String.valueOf(start), String.valueOf(end))); - if (null != results && results.length > 0) { + if (null != results && results.length > 0 && results[0] != null) { Object[] objs = (Object[]) results[0]; diff --git a/com.creditease.uav.cache.redis/src/main/java/com/creditease/uav/cache/redis/LettuceAsyncService.java b/com.creditease.uav.cache.redis/src/main/java/com/creditease/uav/cache/redis/LettuceAsyncService.java new file mode 100644 index 00000000..daa604b2 --- /dev/null +++ b/com.creditease.uav.cache.redis/src/main/java/com/creditease/uav/cache/redis/LettuceAsyncService.java @@ -0,0 +1,644 @@ +/*- + * << + * UAVStack + * == + * Copyright (C) 2016 - 2018 UAVStack + * == + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * >> + */ + +package com.creditease.uav.cache.redis; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import com.creditease.agent.helpers.StringHelper; +import com.creditease.agent.log.SystemLogger; +import com.creditease.agent.log.api.ISystemLogger; +import com.creditease.uav.cache.redis.api.AbstractAsyncHandler; +import com.creditease.uav.cache.redis.api.CacheService; +import com.creditease.uav.cache.redis.api.CommandInfo; + +import io.lettuce.core.KeyValue; +import io.lettuce.core.RedisFuture; +import io.lettuce.core.RedisURI; +import io.lettuce.core.cluster.RedisClusterClient; +import io.lettuce.core.cluster.api.StatefulRedisClusterConnection; +import io.lettuce.core.cluster.api.async.RedisAdvancedClusterAsyncCommands; + +/** + * LettuceAsyncService description: lettuce实现类 + * + * @param + * + */ +public class LettuceAsyncService implements CacheService { + + static ISystemLogger logger = SystemLogger.getLogger(LettuceAsyncService.class); + private static StatefulRedisClusterConnection connect; + private static RedisClusterClient client; + private static long expireTimeLong = 10; + + public enum EnumMethod { + HKEYS() { + + @Override + public Object send(String[] params) { + LettuceCommandResult lcr = new LettuceCommandResult(); + RedisAdvancedClusterAsyncCommands commandAsync = connect.async(); + RedisFuture> result = commandAsync.hkeys(params[0]); + List res = null; + try { + res = result.get(expireTimeLong, TimeUnit.SECONDS); + lcr.setResult(res); + lcr.setRunState(true); + } + catch (Exception e) { + lcr.setRunState(false); + } + return lcr; + } + }, + + HGETALL() { + + @Override + public Object send(String[] params) { + LettuceCommandResult lcr = new LettuceCommandResult(); + RedisAdvancedClusterAsyncCommands commandAsync = connect.async(); + RedisFuture> result = commandAsync.hgetall(params[0]); + try { + Map res = result.get(expireTimeLong, TimeUnit.SECONDS); + lcr.setResult(res); + lcr.setRunState(true); + } + catch (Exception e) { + lcr.setRunState(false); + } + return lcr; + } + }, + HDEL() { + + @Override + public Object send(String[] params) { + // 异步不要求返回结果 + + LettuceCommandResult lcr = new LettuceCommandResult(); + RedisAdvancedClusterAsyncCommands commandAsync = connect.async(); + + try { + RedisFuture result = commandAsync.hdel(params[0], + Arrays.copyOfRange(params, 1, params.length)); + lcr.setResult(result); + lcr.setRunState(true); + } + catch (Exception e) { + lcr.setRunState(false); + } + return lcr; + + } + }, + RPUSH() { + + @Override + public Object send(String[] params) { + // 异步不要求返回结果 + LettuceCommandResult lcr = new LettuceCommandResult(); + RedisAdvancedClusterAsyncCommands commandAsync = connect.async(); + + try { + RedisFuture results = commandAsync.rpush(params[0], new String[] { params[1] }); + lcr.setResult(results); + lcr.setRunState(true); + } + catch (Exception e) { + lcr.setRunState(false); + } + return lcr; + } + }, + RPOP() { + + @Override + public Object send(String[] params) { + // 无论同步异步都要求返回结果 + LettuceCommandResult lcr = new LettuceCommandResult(); + RedisAdvancedClusterAsyncCommands commandAsync = connect.async(); + RedisFuture result = commandAsync.rpop(params[0]); + try { + String res = result.get(expireTimeLong, TimeUnit.SECONDS); + lcr.setResult(res); + lcr.setRunState(true); + } + catch (Exception e) { + lcr.setRunState(false); + } + return lcr; + } + }, + LSET() { + + @Override + public Object send(String[] params) { + // 异步不要求返回结果 + + LettuceCommandResult lcr = new LettuceCommandResult(); + RedisAdvancedClusterAsyncCommands commandAsync = connect.async(); + + try { + RedisFuture result = commandAsync.lset(params[0], Long.parseLong(params[1]), params[2]); + lcr.setResult(result); + lcr.setRunState(true); + } + catch (Exception e) { + lcr.setRunState(false); + } + return lcr; + } + }, + LREM() { + + @Override + public Object send(String[] params) { + // 同步异步都要求返回结果 + + LettuceCommandResult lcr = new LettuceCommandResult(); + RedisAdvancedClusterAsyncCommands commandAsync = connect.async(); + try { + RedisFuture result = commandAsync.lrem(params[0], Long.parseLong(params[1]), params[2]); + long res = result.get(expireTimeLong, TimeUnit.SECONDS); + lcr.setResult((int) res); + lcr.setRunState(true); + } + catch (Exception e) { + lcr.setResult(0); + lcr.setRunState(false); + } + return lcr; + } + }, + LRANGE() { + + @Override + public Object send(String[] params) { + // 同步异步均需要返回结果 + + LettuceCommandResult lcr = new LettuceCommandResult(); + RedisAdvancedClusterAsyncCommands commandAsync = connect.async(); + + RedisFuture> result = commandAsync.lrange(params[0], Long.parseLong(params[1]), + Long.parseLong(params[2])); + + try { + List res = result.get(expireTimeLong, TimeUnit.SECONDS); + + if (res == null || res.isEmpty()) { + lcr.setResult(null); + } + else { + String[] resultArray = new String[res.size()]; + for (int i = 0; i < res.size(); i++) { + resultArray[i] = res.get(i).toString(); + } + lcr.setResult(resultArray); + } + + lcr.setRunState(true); + } + catch (Exception e) { + lcr.setResult(Collections.emptyList()); + lcr.setRunState(false); + } + return lcr; + } + }, + LPUSH() { + + @Override + public Object send(String[] params) { + // 只有异步操作 不需要返回值 + LettuceCommandResult lcr = new LettuceCommandResult(); + RedisAdvancedClusterAsyncCommands commandAsync = connect.async(); + + try { + RedisFuture result = commandAsync.lpush(params[0], new String[] { params[1] }); + lcr.setResult(result); + lcr.setRunState(true); + + } + catch (Exception e) { + lcr.setRunState(false); + } + return lcr; + } + }, + LPOP() { + + @Override + public Object send(String[] params) { + // 同步异步均需返回结果 + LettuceCommandResult lcr = new LettuceCommandResult(); + RedisAdvancedClusterAsyncCommands commandAsync = connect.async(); + RedisFuture results = commandAsync.lpop(params[0]); + try { + String res = results.get(expireTimeLong, TimeUnit.SECONDS); + lcr.setResult(res); + lcr.setRunState(true); + } + catch (Exception e) { + lcr.setRunState(false); + } + return lcr; + } + }, + LLEN() { + + @Override + public Object send(String[] params) { + // 同步异步均需返回结果 + LettuceCommandResult lcr = new LettuceCommandResult(); + RedisAdvancedClusterAsyncCommands commandAsync = connect.async(); + try { + RedisFuture result = commandAsync.llen(params[0]); + long res = result.get(expireTimeLong, TimeUnit.SECONDS); + lcr.setResult((int) res); + lcr.setRunState(true); + } + catch (Exception e) { + lcr.setResult(0); + lcr.setRunState(false); + } + return lcr; + } + }, + LINDEX() { + + @Override + public Object send(String[] params) { + // 同步异步均需返回结果 + LettuceCommandResult lcr = new LettuceCommandResult(); + RedisAdvancedClusterAsyncCommands commandAsync = connect.async(); + RedisFuture result = commandAsync.lindex(params[0], Long.parseLong(params[1])); + String res = ""; + try { + res = result.get(expireTimeLong, TimeUnit.SECONDS); + lcr.setRunState(true); + } + catch (Exception e) { + lcr.setRunState(false); + } + lcr.setResult(res); + return lcr; + } + }, + INCR() { + + @Override + public Object send(String[] params) { + // 只有同步操作 + LettuceCommandResult lcr = new LettuceCommandResult(); + RedisAdvancedClusterAsyncCommands commandAsync = connect.async(); + RedisFuture result = commandAsync.incr(params[0]); + String res = ""; + try { + res = String.valueOf(result.get(expireTimeLong, TimeUnit.SECONDS)); + lcr.setRunState(true); + } + catch (Exception e) { + lcr.setRunState(false); + } + lcr.setResult(res); + return lcr; + } + }, + + DECR() { + + @Override + public Object send(String[] params) { + // 只有同步操作 + LettuceCommandResult lcr = new LettuceCommandResult(); + RedisAdvancedClusterAsyncCommands commandAsync = connect.async(); + RedisFuture result = commandAsync.decr(params[0]); + String res = ""; + try { + res = String.valueOf(result.get(expireTimeLong, TimeUnit.SECONDS)); + lcr.setRunState(true); + } + catch (Exception e) { + lcr.setRunState(false); + } + lcr.setResult(res); + return lcr; + + } + }, + HMSET() { + + @Override + public Object send(String[] params) { + // 只有异步 + Map mapValues = new HashMap(); + for (int i = 1; i < params.length; i++) { + mapValues.put(params[i].toString(), params[i + 1].toString()); + i++; + } + LettuceCommandResult lcr = new LettuceCommandResult(); + RedisAdvancedClusterAsyncCommands commands = connect.async(); + + try { + RedisFuture result = commands.hmset(params[0], mapValues); + String res = result.get(expireTimeLong, TimeUnit.SECONDS); + lcr.setResult(result); + lcr.setRunState(res.equals("OK")); + } + catch (Exception e) { + lcr.setRunState(false); + } + return lcr; + } + }, + HMGET() { + + @Override + public Object send(String[] params) { + // 同步与异步均需要返回结果 + + LettuceCommandResult lcr = new LettuceCommandResult(); + RedisAdvancedClusterAsyncCommands commandAsync = connect.async(); + RedisFuture>> result = commandAsync.hmget(params[0], + Arrays.copyOfRange(params, 1, params.length)); + Map results = new HashMap(); + List> resultMap = null; + try { + resultMap = result.get(expireTimeLong, TimeUnit.SECONDS); + } + catch (Exception e) { + lcr.setRunState(false); + return lcr; + } + + for (int i = 0; i < resultMap.size(); i++) { + if (resultMap.get(i).hasValue() == true) { + results.put(resultMap.get(i).getKey(), resultMap.get(i).getValue()); + } + } + lcr.setResult(results); + lcr.setRunState(true); + return lcr; + } + + }, + EXPIRE() { + + @Override + public Object send(String[] params) { + // 只有异步不需要返回结果 + LettuceCommandResult lcr = new LettuceCommandResult(); + RedisAdvancedClusterAsyncCommands commandAsync = connect.async(); + try { + + RedisFuture result = commandAsync.expire(params[0], Long.parseLong(params[1])); + lcr.setResult(result); + lcr.setRunState(true); + } + catch (Exception e) { + lcr.setRunState(false); + } + + return lcr; + } + }, + DEL() { + + @Override + public Object send(String[] params) { + // 只有异步不需要返回结果 + + LettuceCommandResult lcr = new LettuceCommandResult(); + RedisAdvancedClusterAsyncCommands commandAsync = connect.async(); + + try { + RedisFuture result = commandAsync.del(params[0]); + lcr.setResult(result); + lcr.setRunState(true); + } + catch (Exception e) { + lcr.setRunState(false); + } + + return lcr; + } + }, + GET() { + + @Override + public Object send(String[] params) { + // 同步异步都需要返回结果 + LettuceCommandResult lcr = new LettuceCommandResult(); + RedisAdvancedClusterAsyncCommands commands = connect.async(); + RedisFuture resultSync = commands.get(params[0]); + String result = null; + try { + result = resultSync.get(expireTimeLong, TimeUnit.SECONDS); + lcr.setResult(result); + lcr.setRunState(true); + } + catch (Exception e) { + lcr.setRunState(false); + } + + return lcr; + } + }, + EXISTS() { + + @Override + public Object send(String[] params) { + // 只有同步 + LettuceCommandResult lcr = new LettuceCommandResult(); + RedisAdvancedClusterAsyncCommands commandAsync = connect.async(); + RedisFuture result = commandAsync.exists(params[0]); + String res = ""; + try { + res = String.valueOf(result.get(expireTimeLong, TimeUnit.SECONDS)); + lcr.setRunState(true); + } + catch (Exception e) { + lcr.setRunState(false); + } + lcr.setResult(res); + return lcr; + + } + + }, + SET() { + + @Override + public Object send(String[] params) { + // 异步不需要结果 + LettuceCommandResult lcr = new LettuceCommandResult(); + RedisAdvancedClusterAsyncCommands commandAsync = connect.async(); + + try { + RedisFuture result = commandAsync.set(params[0], params[1]); + lcr.setResult(result); + lcr.setRunState(true); + } + catch (Exception e) { + lcr.setRunState(false); + } + return lcr; + } + + }; + + private static Map map = new HashMap(); + + static { + for (EnumMethod legEnum : EnumMethod.values()) { + map.put(legEnum.name(), legEnum); + } + } + + public static EnumMethod getMethod(String symbol) { + return map.get(symbol); + } + + public abstract Object send(String[] params); + + } + + /** + * @param redisServerAddress + * @param minConcurrent + * @param maxConcurrent + * @param queueSize + * @param password + */ + public LettuceAsyncService(String redisServerAddress, int minConcurrent, int maxConcurrent, int queueSize) { + this(redisServerAddress, minConcurrent, maxConcurrent, queueSize, null); + } + + public LettuceAsyncService(String redisServerAddress, int minConcurrent, int maxConcurrent, int queueSize, + String password) { + String[] redisCluster = redisServerAddress.split(","); + List nodes = new ArrayList(); + for (int i = 0; i < redisCluster.length; i++) { + String[] uriArray = redisCluster[i].split(":"); + Integer port = Integer.valueOf(uriArray[1]); + if (!StringHelper.isEmpty(password)) { + nodes.add(RedisURI.Builder.redis(uriArray[0], port).withPassword(password).build()); + } + else { + nodes.add(RedisURI.create(uriArray[0], port)); + } + } + + client = RedisClusterClient.create(nodes); + + connect = client.connect(); + + } + + @Override + public void start() { + // Do nothing but must pass sonar check + + } + + @Override + public void shutdown() { + connect.close(); + client.shutdown(); + } + + static class LettuceCommandResult { + + private Object result; + private boolean runState; + + /** + * @return the result + */ + public Object getResult() { + return result; + } + + /** + * @param result + * the result to set + */ + public void setResult(Object result) { + this.result = result; + } + + /** + * @return the runState + */ + public boolean isRunState() { + return runState; + } + + /** + * @param runState + * the runState to set + */ + public void setRunState(boolean runState) { + this.runState = runState; + } + + } + + @Override + public Object[] submitCommands(CommandInfo... commands) { + Object[] result = new Object[commands.length]; + for (int i = 0; i < commands.length; i++) { + LettuceCommandResult lcr = (LettuceCommandResult) EnumMethod.getMethod(commands[i].getCommand().name()) + .send(commands[i].getParam()); + result[i] = lcr.getResult(); + + } + return result; + } + + @Override + public void submitCommands(AbstractAsyncHandler handler, CommandInfo... commands) { + Object[] infos = new Object[commands.length]; + for (int i = 0; i < commands.length; i++) { + LettuceCommandResult lcr = null; + try { + lcr = (LettuceCommandResult) EnumMethod.getMethod(commands[i].getCommand().name()) + .send(commands[i].getParam()); + infos[i] = lcr.getResult(); + commands[i].setState(lcr.isRunState()); + } + catch (Exception e) { + infos[i] = null; + commands[i].setState(false); + } + + } + handler.process(commands, infos, null); + } + +} diff --git a/com.creditease.uav.cache.redis/src/main/java/com/creditease/uav/cache/redis/api/CacheFactory.java b/com.creditease.uav.cache.redis/src/main/java/com/creditease/uav/cache/redis/api/CacheFactory.java index d55fff52..34d37ffa 100644 --- a/com.creditease.uav.cache.redis/src/main/java/com/creditease/uav/cache/redis/api/CacheFactory.java +++ b/com.creditease.uav.cache.redis/src/main/java/com/creditease/uav/cache/redis/api/CacheFactory.java @@ -21,11 +21,12 @@ package com.creditease.uav.cache.redis.api; import com.creditease.uav.cache.redis.AredisAsyncService; +import com.creditease.uav.cache.redis.LettuceAsyncService; public class CacheFactory { public enum CacheClientMode { - AREDIS + AREDIS, LETTUCE } private static CacheFactory factory = new CacheFactory(); @@ -56,8 +57,12 @@ public CacheService createCacheService(CacheClientMode mode, String redisServerA switch (mode) { case AREDIS: + service = new AredisAsyncService(redisServerAddress, minConcurrent, maxConcurrent, QueueSize, password); + break; + case LETTUCE: default: - service = new AredisAsyncService(redisServerAddress, minConcurrent, maxConcurrent, QueueSize, password); + service = new LettuceAsyncService(redisServerAddress, minConcurrent, maxConcurrent, QueueSize, + password); break; } From 57f744570b9ffab34ffa3ee97c94a71ed7bd2755 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=86=AF=E6=B0=B8=E5=BD=AA?= Date: Fri, 28 Dec 2018 14:07:18 +0800 Subject: [PATCH 2/5] =?UTF-8?q?https://github.com/uavorg/uavstack/issues/4?= =?UTF-8?q?70=20=E8=BF=9B=E7=A8=8B=E6=AD=BB=E4=BA=A1=E5=8C=BA=E5=88=86MA?= =?UTF-8?q?=E6=8C=82=E6=8E=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../scheduler/NodeInfoWatcher.java | 94 +++++++++++-------- 1 file changed, 56 insertions(+), 38 deletions(-) diff --git a/com.creditease.uav.healthmanager/src/main/java/com/creditease/uav/feature/runtimenotify/scheduler/NodeInfoWatcher.java b/com.creditease.uav.healthmanager/src/main/java/com/creditease/uav/feature/runtimenotify/scheduler/NodeInfoWatcher.java index 6256d922..e6559069 100644 --- a/com.creditease.uav.healthmanager/src/main/java/com/creditease/uav/feature/runtimenotify/scheduler/NodeInfoWatcher.java +++ b/com.creditease.uav.healthmanager/src/main/java/com/creditease/uav/feature/runtimenotify/scheduler/NodeInfoWatcher.java @@ -33,6 +33,7 @@ import com.creditease.agent.ConfigurationManager; import com.creditease.agent.helpers.DataConvertHelper; import com.creditease.agent.helpers.JSONHelper; +import com.creditease.agent.helpers.StringHelper; import com.creditease.agent.monitor.api.MonitorDataFrame; import com.creditease.agent.monitor.api.NotificationEvent; import com.creditease.agent.spi.AbstractTimerWork; @@ -61,7 +62,7 @@ private static class CrashEventObj { private String ip; private String appgroup; - private int deadProcsCount = 0; + private String nodeuuid; private List deadProcsInfo = new ArrayList(); private List deadProcNames = new ArrayList(); @@ -81,14 +82,19 @@ public String getIp() { return ip; } + public String getNodeuuid() { + + return nodeuuid; + } + public int getDeadProcsCount() { - return deadProcsCount; + return deadProcNames.size(); } - public void increDeadProcsCount() { + public void setNodeuuid(String nodeuuid) { - deadProcsCount++; + this.nodeuuid = nodeuuid; } public void addDeadProcName(String name) { @@ -117,7 +123,7 @@ public String getDeadProcsInfoAsString() { StringBuffer sb = new StringBuffer(); for (String dpi : deadProcsInfo) { - sb.append(dpi + "\n"); + sb.append(dpi).append("\n"); } return sb.toString().replace("\\", "\\\\"); @@ -130,12 +136,12 @@ public String getDeadProcsInfoAsString() { private static final String CRASH_PROCS = "rtnotify.dead.procs"; private static final String CRASH_PROCS_DETAIL = "rtnotify.dead.procs.detail"; private static final long LOCK_TIMEOUT = 30 * 1000; - private static final long DEFAULT_CRASH_TIMEOUT = 5 * 60 * 1000; private static final long MIN_RANDOM_PORT = 32768; private CacheManager cm; private CacheLock lock; private int hold; + private int timeout; private boolean isSendMq; private boolean isExchange; @@ -149,6 +155,7 @@ public NodeInfoWatcher(String cName, String feature) { hold = DataConvertHelper.toInt(getConfigManager().getFeatureConfiguration(feature, "nodeinfotimer.period"), 15000); + timeout = DataConvertHelper.toInt(getConfigManager().getFeatureConfiguration(feature, "crash.timeout"), 300000); isSendMq = DataConvertHelper .toBoolean(getConfigManager().getFeatureConfiguration(feature, "nodeinfoprocess.sendmq"), true); @@ -204,7 +211,7 @@ public void run() { /** * Step 3: check if any proc crash */ - judgeProcCrash(); + judgeProcCrash(data); /** * Step 4: push data to runtimenotify mgr or to mq @@ -235,27 +242,27 @@ private List> syncProcInfoToCache(Map data) Map fieldValues = new HashMap(); Map fieldValuesDetail = new HashMap(); - for (String node : data.values()) { - - Map mdfMap = buildMDF(node); + for (Map.Entry entry : data.entrySet()) { + Map mdfMap = buildMDF(entry.getValue()); MonitorDataFrame mdf = new MonitorDataFrame(mdfMap); String time = mdf.getTimeFlag() + ""; List els = mdf.getElemInstances("server", "procState"); for (Map el : els) { try { - String group = mdf.getExt("appgroup"); - String ip = mdf.getIP(); @SuppressWarnings("unchecked") Map m = (Map) el.get("values"); + String hashKey = genProcHashKey(mdf.getIP(), m); - String hashKey = genProcHashKey(ip, m); + Map detail = new HashMap(); + detail.put("appgroup", mdf.getExt("appgroup")); + detail.put("nodeuuid", entry.getKey()); // 分别存时间戳和group fieldValues.put(hashKey, time); - fieldValuesDetail.put(hashKey, group); + fieldValuesDetail.put(hashKey, JSONHelper.toString(detail)); } catch (Exception e) { log.err(this, "Sync ProcInfo To Cache Fail." + " ProcInfo:" + JSONHelper.toString(el), e); @@ -287,29 +294,18 @@ private List> syncProcInfoToCache(Map data) * * 4.时间戳超过进程死亡时间(可配置)则保存至死亡进程list并在redis中删除该进程。 */ - private void judgeProcCrash() { + private void judgeProcCrash(Map data) { if (log.isDebugEnable()) { log.debug(this, "NodeInfoWatcher Judge Crash START."); } - Map allProcs = null; - Map allProcDetails = null; - try { - allProcs = cm.getHashAll(UAV_CACHE_REGION, CRASH_PROCS); - allProcDetails = cm.getHashAll(UAV_CACHE_REGION, CRASH_PROCS_DETAIL); - } - catch (Exception e) { - log.err(this, "Fail to get all process info", e); - return; - } - + Map allProcs = cm.getHashAll(UAV_CACHE_REGION, CRASH_PROCS); + Map allProcDetails = cm.getHashAll(UAV_CACHE_REGION, CRASH_PROCS_DETAIL); if (allProcs == null) { return; } - String cfgTimeout = getConfigManager().getFeatureConfiguration(feature, "crash.timeout"); - long timeout = DataConvertHelper.toLong(cfgTimeout, DEFAULT_CRASH_TIMEOUT); long deadline = System.currentTimeMillis() - timeout; List delKeys = new ArrayList<>(); @@ -406,18 +402,19 @@ private void judgeProcCrash() { for (String key : deadKeys) { Map procDetail = new HashMap(); procDetail.put("deadtime", allProcs.get(key)); - procDetail.put("appgroup", allProcDetails.get(key)); + procDetail.put("detail", allProcDetails.get(key)); deadProcs.put(key, procDetail); } - fireEvent(deadProcs); + fireEvent(deadProcs, data); } /** * 触发预警事件 */ - private void fireEvent(Map> deadProcs) { + @SuppressWarnings("unchecked") + private void fireEvent(Map> deadProcs, Map data) { /** * Step 1: split crash event by IP @@ -430,21 +427,33 @@ private void fireEvent(Map> deadProcs) { String[] procInfo = procKey.split("_", -1); String ip = procInfo[0]; String procName = procInfo[1]; - - String deadtime = en.getValue().get("deadtime"); - String appgroup = en.getValue().get("appgroup"); + + Map map = en.getValue(); + + String deadtime = map.get("deadtime"); + String appgroup; + String nodeuuid = ""; + + if(map.get("detail") != null) { + Map detail = JSONHelper.toObject(map.get("detail"), Map.class); + appgroup = detail.get("appgroup"); + nodeuuid = detail.get("nodeuuid"); + } + else{ + appgroup = map.get("appgroup"); + } CrashEventObj ceo; if (!ips.containsKey(ip)) { ceo = new CrashEventObj(ip, appgroup); + ceo.setNodeuuid(nodeuuid); ips.put(ip, ceo); } else { ceo = ips.get(ip); } - ceo.increDeadProcsCount(); ceo.addDeadProcName(procName); ceo.addDeadProcInfo("触发时间:" + format.format(new Date(Long.parseLong(deadtime))) + ", 进程信息:" + procKey); } @@ -455,9 +464,18 @@ private void fireEvent(Map> deadProcs) { RuntimeNotifyStrategyMgr strategyMgr = (RuntimeNotifyStrategyMgr) getConfigManager().getComponent(this.feature, "RuntimeNotifyStrategyMgr"); for (CrashEventObj ceo : ips.values()) { + + String title; + String nodeuuid = ceo.getNodeuuid(); + if (!StringHelper.isEmpty(nodeuuid) && StringHelper.isEmpty(data.get(nodeuuid))) { + title = "应用组[" + ceo.getAppGroup() + "]的" + ceo.getIp() + "监控代理程序(MonitorAgent)超过" + timeout / 1000 + + "秒没有心跳数据上送"; + } + else { + title = "应用组[" + ceo.getAppGroup() + "]的" + ceo.getIp() + "共发现" + ceo.getDeadProcsCount() + "进程" + + ceo.getDeadProcNamesAsString() + "可疑死掉"; + } - String title = "应用组[" + ceo.getAppGroup() + "]的" + ceo.getIp() + "共发现" + ceo.getDeadProcsCount() + "进程" - + ceo.getDeadProcNamesAsString() + "可疑死掉"; String description = ceo.getDeadProcsInfoAsString(); NotificationEvent event = new NotificationEvent(NotificationEvent.EVENT_RT_ALERT_CRASH, title, description, System.currentTimeMillis(), ceo.getIp(), ""); @@ -466,8 +484,8 @@ private void fireEvent(Map> deadProcs) { * Notification Manager will not block the event, the frozen time has no effect to this event */ event.addArg(NotificationEvent.EVENT_Tag_NoBlock, "true"); - // add appgroup event.addArg("appgroup", ceo.getAppGroup()); + event.addArg("nodeuuid", nodeuuid); NotifyStrategy stra = strategyMgr.seekStrategy("server@procCrash@" + ceo.getIp()); From 5a7c2827694a42b0754de2d2cc6ca9df2516603f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=86=AF=E6=B0=B8=E5=BD=AA?= Date: Fri, 28 Dec 2018 14:08:12 +0800 Subject: [PATCH 3/5] =?UTF-8?q?https://github.com/uavorg/uavstack/issues/4?= =?UTF-8?q?69=20=E9=A2=84=E8=AD=A6=E7=AD=96=E7=95=A5=E7=AE=A1=E7=90=86?= =?UTF-8?q?=E5=85=88=E5=88=A4=E6=96=AD=E8=8E=B7=E5=8F=96=E7=9A=84=E6=96=B0?= =?UTF-8?q?=E7=AD=96=E7=95=A5=E6=98=AF=E5=90=A6=E4=B8=BA=E7=A9=BA=EF=BC=8C?= =?UTF-8?q?=E4=B8=8D=E7=A9=BA=E5=88=99=E6=B8=85=E7=A9=BA=E5=BD=93=E5=89=8D?= =?UTF-8?q?=E7=AD=96=E7=95=A5=E6=83=85=E5=86=B5=EF=BC=8C=E5=9C=A8=E9=A2=84?= =?UTF-8?q?=E8=AD=A6=E7=AD=96=E7=95=A5=E7=9F=AD=E6=97=B6=E9=97=B4=E5=88=A0?= =?UTF-8?q?=E9=99=A4=E5=90=8E=EF=BC=8C=E7=94=B1=E4=BA=8E=E6=96=B0=E7=AD=96?= =?UTF-8?q?=E7=95=A5=E4=B8=BA=E7=A9=BA=EF=BC=8C=E5=AF=BC=E8=87=B4=E4=B8=8D?= =?UTF-8?q?=E4=BC=9A=E6=B8=85=E7=A9=BA=E5=86=85=E5=AD=98=E4=B8=AD=E7=9A=84?= =?UTF-8?q?=E7=AD=96=E7=95=A5=20=E7=8E=B0=E8=B0=83=E6=95=B4=E5=88=A4?= =?UTF-8?q?=E6=96=AD=E5=92=8C=E6=B8=85=E7=A9=BA=E7=9A=84=E9=A1=BA=E5=BA=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../runtimenotify/scheduler/RuntimeNotifyStrategyMgr.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/com.creditease.uav.healthmanager/src/main/java/com/creditease/uav/feature/runtimenotify/scheduler/RuntimeNotifyStrategyMgr.java b/com.creditease.uav.healthmanager/src/main/java/com/creditease/uav/feature/runtimenotify/scheduler/RuntimeNotifyStrategyMgr.java index 91759c0a..1828ed4d 100644 --- a/com.creditease.uav.healthmanager/src/main/java/com/creditease/uav/feature/runtimenotify/scheduler/RuntimeNotifyStrategyMgr.java +++ b/com.creditease.uav.healthmanager/src/main/java/com/creditease/uav/feature/runtimenotify/scheduler/RuntimeNotifyStrategyMgr.java @@ -219,10 +219,6 @@ public void run() { Map strategyMap = cm.getHashAll(UAV_CACHE_REGION, RT_STRATEGY_KEY); - if (null == strategyMap || strategyMap.isEmpty()) { - return; - } - try { strategyLock.lockInterruptibly(); @@ -232,6 +228,10 @@ public void run() { multiInsts.clear(); strategies.clear(); + if (null == strategyMap || strategyMap.isEmpty()) { + return; + } + for (String key : strategyMap.keySet()) { String json = strategyMap.get(key); NotifyStrategy stra = new NotifyStrategy(key, json); From dfba50d61967d4beb000a0493df7d97ab3e9e586 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=86=AF=E6=B0=B8=E5=BD=AA?= Date: Fri, 28 Dec 2018 14:09:11 +0800 Subject: [PATCH 4/5] =?UTF-8?q?https://github.com/uavorg/uavstack/issues/4?= =?UTF-8?q?68=20=E5=90=8C=E7=8E=AF=E6=AF=94=E9=A2=84=E8=AD=A6=E8=AE=A1?= =?UTF-8?q?=E7=AE=97=E4=B8=8B=E9=99=90=E6=97=B6=E5=8F=AF=E8=83=BD=E5=8F=97?= =?UTF-8?q?=E4=B8=8A=E9=99=90=E5=BD=B1=E5=93=8D=E5=AF=BC=E8=87=B4=E8=AE=A1?= =?UTF-8?q?=E7=AE=97=E9=94=99=E8=AF=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../runtimenotify/StrategyJudgement.java | 45 +++++++++---------- 1 file changed, 21 insertions(+), 24 deletions(-) diff --git a/com.creditease.uav.healthmanager/src/main/java/com/creditease/uav/feature/runtimenotify/StrategyJudgement.java b/com.creditease.uav.healthmanager/src/main/java/com/creditease/uav/feature/runtimenotify/StrategyJudgement.java index 6a97450b..d97dcf7d 100644 --- a/com.creditease.uav.healthmanager/src/main/java/com/creditease/uav/feature/runtimenotify/StrategyJudgement.java +++ b/com.creditease.uav.healthmanager/src/main/java/com/creditease/uav/feature/runtimenotify/StrategyJudgement.java @@ -426,18 +426,10 @@ private boolean isOverdue(long time, long currentTime, Expression expr) { private boolean caculate(double currentValue, double lastValue, NotifyStrategy.Expression expr, Map judgeResult) { - boolean result = false; - double diff = currentValue - lastValue; - - String limitString = ""; - - String upperLimitString = expr.getUpperLimit(); - String lowerLimitString = expr.getLowerLimit(); - // 增幅or降幅 - String upperORlower = ""; double upperLimit = 0; - double lowerLimit = 0; + String upperLimitString = expr.getUpperLimit(); + // get upperLimit if (upperLimitString.contains("#")) { upperLimit = Double.parseDouble(upperLimitString.substring(upperLimitString.indexOf('#') + 1)); @@ -450,6 +442,18 @@ else if (!upperLimitString.contains("*")) { upperLimit = Double.parseDouble(upperLimitString); } + if (!upperLimitString.contains("*") && diff > upperLimit) { + judgeResult.put("actualValue", String.format("%.2f", diff) + (upperLimitString.contains("%") ? "%" : "")); + judgeResult.put("expectedValue", upperLimitString); + judgeResult.put("upperORlower", "upper"); + + return true; + } + + diff = currentValue - lastValue; + double lowerLimit = 0; + String lowerLimitString = expr.getLowerLimit(); + // get lowerLimit if (lowerLimitString.contains("#")) { lowerLimit = 0 - Double.parseDouble(lowerLimitString.substring(lowerLimitString.indexOf('#') + 1)); @@ -462,22 +466,15 @@ else if (!lowerLimitString.contains("*")) { lowerLimit = Double.parseDouble(lowerLimitString); } - if (!upperLimitString.contains("*") && diff > upperLimit) { - result = true; - upperORlower = "upper"; - limitString = upperLimitString; - } - else if (!lowerLimitString.contains("*") && diff < 0 - lowerLimit) { - result = true; - upperORlower = "lower"; - limitString = lowerLimitString; - } + if (!lowerLimitString.contains("*") && diff < 0 - lowerLimit) { + judgeResult.put("actualValue", String.format("%.2f", diff) + (lowerLimitString.contains("%") ? "%" : "")); + judgeResult.put("expectedValue", lowerLimitString); + judgeResult.put("upperORlower", "lower"); - judgeResult.put("actualValue", String.format("%.2f", diff) + (limitString.contains("%") ? "%" : "")); - judgeResult.put("expectedValue", limitString); - judgeResult.put("upperORlower", upperORlower); - return result; + return true; + } + return false; } @SuppressWarnings({"rawtypes" }) From cbede2a32c6a973d6d80de541677e9c91d65b763 Mon Sep 17 00:00:00 2001 From: xingShengLi Date: Fri, 28 Dec 2018 14:36:09 +0800 Subject: [PATCH 5/5] =?UTF-8?q?=E6=97=A5=E5=BF=97=E6=9F=A5=E8=AF=A2?= =?UTF-8?q?=EF=BC=9A=20=E5=85=B3=E9=94=AE=E5=AD=97=E6=90=9C=E7=B4=A2?= =?UTF-8?q?=E6=97=B6=EF=BC=8C=E5=A6=82=E6=9E=9C=E5=9C=A8=E5=85=B3=E9=94=AE?= =?UTF-8?q?=E5=AD=97=E5=89=8D=E6=88=96=E5=90=8E=E5=8F=AA=E5=8A=A0=E4=B8=8A?= =?UTF-8?q?=E7=A9=BA=E6=A0=BC=EF=BC=8C=E5=88=99=E6=97=A5=E5=BF=97=E5=86=85?= =?UTF-8?q?=E5=AE=B9=E5=B1=95=E7=A4=BA=E6=97=B6=E4=BC=9A=E5=8C=85=E5=90=AB?= =?UTF-8?q?=E9=AB=98=E4=BA=AE=E6=98=BE=E7=A4=BA=E7=9A=84=E6=A0=87=E7=AD=BE?= =?UTF-8?q?=E7=AC=A6=E5=8F=B7=EF=BC=8C=E9=9C=80=E5=AF=B9=E6=AD=A4=E7=A7=8D?= =?UTF-8?q?=E6=83=85=E5=86=B5=E5=81=9A=E5=85=BC=E5=AE=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../src/main/webapp/uavapp_godeye/appmonitor/js/uav.log.js | 3 +++ 1 file changed, 3 insertions(+) diff --git a/com.creditease.uav.console/src/main/webapp/uavapp_godeye/appmonitor/js/uav.log.js b/com.creditease.uav.console/src/main/webapp/uavapp_godeye/appmonitor/js/uav.log.js index 40203d09..31c8bf97 100644 --- a/com.creditease.uav.console/src/main/webapp/uavapp_godeye/appmonitor/js/uav.log.js +++ b/com.creditease.uav.console/src/main/webapp/uavapp_godeye/appmonitor/js/uav.log.js @@ -398,6 +398,9 @@ function NewLogTool(app) { for(var i=0;i