Skip to content

Commit

Permalink
feat: subscribing operations can it be configured
Browse files Browse the repository at this point in the history
  • Loading branch information
emptyOVO committed Sep 12, 2024
1 parent 86470db commit 0d3d4c6
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ public class TaskConstants extends CommonConstants {
public static final String TASK_REDIS_KEYS = "task.redisTask.keys";
public static final String TASK_REDIS_FIELD_OR_MEMBER = "task.redisTask.fieldOrMember";
public static final String TASK_REDIS_IS_SUBSCRIBE = "task.redisTask.isSubscribe";
public static final String TASK_REDIS_SUBOPERATION = "task.redisTask.subOperation";
public static final String TASK_REDIS_SYNC_FREQ = "task.redisTask.syncFreq";

public static final String TASK_STATE = "task.state";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public class RedisTask {
private String fieldOrMember;
private Boolean isSubscribe;
private String syncFreq;
private String subOperations;

@Data
public static class RedisTaskConfig {
Expand All @@ -53,5 +54,6 @@ public static class RedisTaskConfig {
private String fieldOrMember;
private Boolean isSubscribe;
private String syncFreq;
private String subOperations;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,7 @@ private static RedisTask getRedisTask(DataConfig dataConfig) {
redisTask.setFieldOrMember(config.getFieldOrMember());
redisTask.setIsSubscribe(config.getIsSubscribe());
redisTask.setSyncFreq(config.getSyncFreq());
redisTask.setSubOperations(config.getSubOperations());

return redisTask;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ public class RedisSource extends AbstractSource {
private boolean destroyed;
private boolean isSubscribe;
private Set<String> keys;
private Set<String> subOperations;
private Replicator redisReplicator;
private BlockingQueue<SourceData> redisQueue;
private ScheduledExecutorService executor;
Expand Down Expand Up @@ -145,10 +146,12 @@ protected void initSource(InstanceProfile profile) {
try {
if (isSubscribe) {
// use subscribe mode
this.subOperations = new ConcurrentSkipListSet<>(
Arrays.asList(profile.get(TaskConstants.TASK_REDIS_SUBOPERATION).split(",")));
this.executor = (ScheduledExecutorService) Executors.newSingleThreadExecutor();
redisReplicator = new RedisReplicator(uri);
this.redisReplicator = new RedisReplicator(uri);
initReplicator();
executor.execute(startReplicatorSync());
this.executor.execute(startReplicatorSync());
} else {
this.executor = Executors.newScheduledThreadPool(1);
// use command mode
Expand All @@ -158,7 +161,7 @@ protected void initSource(InstanceProfile profile) {
long syncFreq = profile.getLong(TaskConstants.TASK_REDIS_SYNC_FREQ, DEFAULT_FREQ);
this.jedis = new Jedis(uri);
jedis.connect();
executor.scheduleWithFixedDelay(startJedisSync(), 0, syncFreq, TimeUnit.MILLISECONDS);
this.executor.scheduleWithFixedDelay(startJedisSync(), 0, syncFreq, TimeUnit.MILLISECONDS);
}
} catch (URISyntaxException | IOException | JedisConnectionException e) {
sourceMetric.pluginReadFailCount.addAndGet(1);
Expand Down Expand Up @@ -389,6 +392,34 @@ private String getRedisUri() {
}

private void initReplicator() {
if (!subOperations.isEmpty()) {
DefaultCommandParser replicatorCommandParser = new DefaultCommandParser();
for (String subOperation : subOperations) {
this.redisReplicator.addCommandParser(CommandName.name(subOperation), replicatorCommandParser);
}
this.redisReplicator.addEventListener((replicator, event) -> {
if (event instanceof DefaultCommand) {
DefaultCommand defaultCommand = (DefaultCommand) event;
Object[] args = defaultCommand.getArgs();
if (args[0] instanceof byte[]) {
String key = new String((byte[]) args[0], StandardCharsets.UTF_8);
if (keys.contains(key)) {
synchronizeData(gson.toJson(event));
}
}
}
if (event instanceof PostRdbSyncEvent) {
this.snapShot = String.valueOf(replicator.getConfiguration().getReplOffset());
LOGGER.info("after rdb snapShot is: {}", snapShot);
}
});
} else {
// if SubOperation is not configured, subscribe all modification
initDefaultReplicator();
}
}

private void initDefaultReplicator() {
DefaultCommandParser defaultCommandParser = new DefaultCommandParser();
this.redisReplicator.addCommandParser(CommandName.name("APPEND"), defaultCommandParser);
this.redisReplicator.addCommandParser(CommandName.name("SET"), defaultCommandParser);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ private void initProfile() {
final String streamId = "stream01";
final String keys = "age,name,sex";
final String command = "zscore";
final String subOperation = "set,del";

TaskProfile taskProfile = helper.getTaskProfile(1, "", false, 0L, 0L, TaskStateEnum.RUNNING, "D",
"GMT+8:00");
Expand All @@ -133,6 +134,7 @@ private void initProfile() {
profile.set(TaskConstants.TASK_REDIS_COMMAND, command);
profile.set(TaskConstants.TASK_REDIS_KEYS, keys);
profile.set(TaskConstants.TASK_AUDIT_VERSION, "0");
profile.set(TaskConstants.TASK_REDIS_SUBOPERATION, subOperation);
profile.setInstanceId(instanceId);
}

Expand Down

0 comments on commit 0d3d4c6

Please sign in to comment.