Skip to content

Commit

Permalink
feat: changing the command string to a constant, use reflections, rem…
Browse files Browse the repository at this point in the history
…ove useless comment
  • Loading branch information
emptyOVO committed Sep 12, 2024
1 parent f3f42b6 commit 966d585
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.inlong.agent.metrics.audit.AuditUtils;
import org.apache.inlong.agent.plugin.sources.file.AbstractSource;

import com.google.common.collect.Maps;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.TypeAdapter;
Expand Down Expand Up @@ -72,6 +73,13 @@ public class RedisSource extends AbstractSource {
private static final Logger LOGGER = LoggerFactory.getLogger(RedisSource.class);
private static final long MAX_DATA_SIZE = 500 * 1024;
private static final int REDIS_QUEUE_SIZE = 10000;
private static final long DEFAULT_FREQ = 60 * 1000;
private static final String GET_COMMAND = "GET";
private static final String MGET_COMMAND = "MGET";
private static final String HGET_COMMAND = "HGET";
private static final String ZSCORE_COMMAND = "ZSCORE";
private static final String ZREVRANK_COMMAND = "ZREVRANK";
private static final String EXISTS_COMMAND = "EXISTS";
private Gson gson;

public InstanceProfile profile;
Expand All @@ -95,6 +103,18 @@ public class RedisSource extends AbstractSource {
private BlockingQueue<SourceData> redisQueue;
private ScheduledExecutorService executor;

// Command handler map
private static final Map<String, CommandHandler> commandHandlers = Maps.newConcurrentMap();

static {
commandHandlers.put(GET_COMMAND, RedisSource::handleGet);
commandHandlers.put(MGET_COMMAND, RedisSource::handleMGet);
commandHandlers.put(HGET_COMMAND, RedisSource::handleHGet);
commandHandlers.put(ZSCORE_COMMAND, RedisSource::handleZScore);
commandHandlers.put(ZREVRANK_COMMAND, RedisSource::handleZRevRank);
commandHandlers.put(EXISTS_COMMAND, RedisSource::handleExists);
}

public RedisSource() {

}
Expand Down Expand Up @@ -132,10 +152,10 @@ protected void initSource(InstanceProfile profile) {
} else {
this.executor = Executors.newScheduledThreadPool(1);
// use command mode
this.redisCommand = profile.get(TaskConstants.TASK_REDIS_COMMAND, "get");
this.redisCommand = profile.get(TaskConstants.TASK_REDIS_COMMAND, GET_COMMAND);
this.fieldOrMember = profile.get(TaskConstants.TASK_REDIS_FIELD_OR_MEMBER, null);
// default frequency 1min
long syncFreq = profile.getLong(TaskConstants.TASK_REDIS_SYNC_FREQ, 60 * 1000);
long syncFreq = profile.getLong(TaskConstants.TASK_REDIS_SYNC_FREQ, DEFAULT_FREQ);
this.jedis = new Jedis(uri);
jedis.connect();
executor.scheduleWithFixedDelay(startJedisSync(), 0, syncFreq, TimeUnit.MILLISECONDS);
Expand Down Expand Up @@ -172,63 +192,72 @@ private Runnable startJedisSync() {

private Map<String, Object> fetchDataByJedis(Jedis jedis, String command, List<String> keys, String fieldOrMember) {
Map<String, Object> result = new HashMap<>();
switch (command.toUpperCase()) {
case "GET":
// use pipeline to decrease IO cost
Pipeline pipeline = jedis.pipelined();
for (String key : keys) {
pipeline.get(key);
}
List<Object> getValues = pipeline.syncAndReturnAll();
for (int i = 0; i < keys.size(); i++) {
result.put(keys.get(i), getValues.get(i));
}
break;
CommandHandler handler = commandHandlers.get(command.toUpperCase());
if (handler != null) {
handler.handle(jedis, keys, fieldOrMember, result);
} else {
LOGGER.error("Unsupported command: " + command);
throw new UnsupportedOperationException("Unsupported command: " + command);
}
return result;
}

case "MGET":
List<String> mGetValues = jedis.mget(keys.toArray(new String[0]));
for (int i = 0; i < keys.size(); i++) {
result.put(keys.get(i), mGetValues.get(i));
}
break;
private static void handleGet(Jedis jedis, List<String> keys, String fieldOrMember, Map<String, Object> result) {
Pipeline pipeline = jedis.pipelined();
for (String key : keys) {
pipeline.get(key);
}
List<Object> getValues = pipeline.syncAndReturnAll();
for (int i = 0; i < keys.size(); i++) {
result.put(keys.get(i), getValues.get(i));
}
}

case "HGET":
for (String key : keys) {
String value = jedis.hget(key, fieldOrMember);
result.put(key, value);
}
break;
private static void handleMGet(Jedis jedis, List<String> keys, String fieldOrMember, Map<String, Object> result) {
List<String> mGetValues = jedis.mget(keys.toArray(new String[0]));
for (int i = 0; i < keys.size(); i++) {
result.put(keys.get(i), mGetValues.get(i));
}
}

case "ZSCORE":
for (String key : keys) {
if (!StringUtils.isEmpty(fieldOrMember)) {
Double score = jedis.zscore(key, fieldOrMember);
result.put(key, score);
}
}
break;
private static void handleHGet(Jedis jedis, List<String> keys, String fieldOrMember, Map<String, Object> result) {
for (String key : keys) {
String value = jedis.hget(key, fieldOrMember);
result.put(key, value);
}
}

case "ZREVRANK":
for (String key : keys) {
if (StringUtils.isEmpty(fieldOrMember)) {
Long rank = jedis.zrevrank(key, fieldOrMember);
result.put(key, rank);
}
}
break;
private static void handleZScore(Jedis jedis, List<String> keys, String fieldOrMember, Map<String, Object> result) {
for (String key : keys) {
if (!StringUtils.isEmpty(fieldOrMember)) {
Double score = jedis.zscore(key, fieldOrMember);
result.put(key, score);
}
}
}

case "EXISTS":
for (String key : keys) {
boolean exists = jedis.exists(key);
result.put(key, exists);
}
break;
private static void handleZRevRank(Jedis jedis, List<String> keys, String fieldOrMember,
Map<String, Object> result) {
for (String key : keys) {
if (!StringUtils.isEmpty(fieldOrMember)) {
Long rank = jedis.zrevrank(key, fieldOrMember);
result.put(key, rank);
}
}
}

default:
LOGGER.info("Unsupported command: " + command);
throw new UnsupportedOperationException("Unsupported command: " + command);
private static void handleExists(Jedis jedis, List<String> keys, String fieldOrMember, Map<String, Object> result) {
for (String key : keys) {
boolean exists = jedis.exists(key);
result.put(key, exists);
}
return result;
}

// Functional interface for handling commands
@FunctionalInterface
private interface CommandHandler {

void handle(Jedis jedis, List<String> keys, String fieldOrMember, Map<String, Object> result);
}

private void synchronizeData(String data) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,6 @@ public void testFetchDataByJedis_Hget()
@Test
public void testFetchDataByJedis_Exists()
throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
// 模拟 EXISTS 命令的返回值
when(jedis.exists("key1")).thenReturn(true);
when(jedis.exists("key2")).thenReturn(false);

Expand All @@ -245,15 +244,12 @@ public void testFetchDataByJedis_Exists()
expectedData.put("key1", true);
expectedData.put("key2", false);

// 调用 fetchDataByJedis
Method method = RedisSource.class.getDeclaredMethod("fetchDataByJedis", Jedis.class, String.class, List.class,
String.class);
method.setAccessible(true);
Map<String, Object> result = (Map<String, Object>) method.invoke(redisSource, jedis, "EXISTS", keys, null);
// 验证结果是否与预期匹配
assertEquals(expectedData, result);

// 验证 exists 被调用
verify(jedis, times(1)).exists("key1");
verify(jedis, times(1)).exists("key2");
executor.shutdown();
Expand Down

0 comments on commit 966d585

Please sign in to comment.