Skip to content

Commit

Permalink
Fixed redis Rx channels
Browse files Browse the repository at this point in the history
  • Loading branch information
cjmalloy committed Apr 10, 2024
1 parent 3188b4c commit 8ba2eb8
Showing 1 changed file with 65 additions and 16 deletions.
81 changes: 65 additions & 16 deletions src/main/java/jasper/config/RedisConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.ExecutorChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
Expand Down Expand Up @@ -103,6 +104,46 @@ public class RedisConfig {
@Autowired
MessageChannel templateRxChannel;

@Bean
public MessageChannel cursorRedisChannel() {
return new DirectChannel();
}

@Bean
public MessageChannel refRedisChannel() {
return new DirectChannel();
}

@Bean
public MessageChannel tagRedisChannel() {
return new DirectChannel();
}

@Bean
public MessageChannel responseRedisChannel() {
return new DirectChannel();
}

@Bean
public MessageChannel userRedisChannel() {
return new DirectChannel();
}

@Bean
public MessageChannel extRedisChannel() {
return new DirectChannel();
}

@Bean
public MessageChannel pluginRedisChannel() {
return new DirectChannel();
}

@Bean
public MessageChannel templateRedisChannel() {
return new DirectChannel();
}

@Bean("integration")
public TaskExecutor taskExecutor() {
var executor = new ThreadPoolTaskExecutor();
Expand Down Expand Up @@ -137,8 +178,9 @@ protected byte[] getMessage(Message<String> message) {

@Bean
public IntegrationFlow redisSubscribeCursorFlow() {
return IntegrationFlows.from(cursorRxChannel)
return IntegrationFlows.from(cursorRedisChannel())
.channel(new ExecutorChannel(taskExecutor))
.channel(cursorRxChannel)
.get();
}

Expand All @@ -150,7 +192,7 @@ public RedisMessageListenerContainer redisCursorRxAdapter(RedisConnectionFactory
var cursor = new String(message.getBody());
var parts = new String(message.getChannel(), StandardCharsets.UTF_8).split("/");
var origin = parts[1];
cursorRxChannel.send(MessageBuilder.createMessage(cursor, originHeaders(origin)));
cursorRedisChannel().send(MessageBuilder.createMessage(cursor, originHeaders(origin)));
}, of("cursor/*"));
return container;
}
Expand Down Expand Up @@ -180,8 +222,9 @@ protected byte[] getMessage(Message<RefDto> message) {

@Bean
public IntegrationFlow redisSubscribeRefFlow() {
return IntegrationFlows.from(refRxChannel)
return IntegrationFlows.from(refRedisChannel())
.channel(new ExecutorChannel(taskExecutor))
.channel(refRxChannel)
.get();
}

Expand All @@ -194,7 +237,7 @@ public RedisMessageListenerContainer redisRefRxAdapter(RedisConnectionFactory re
var ref = objectMapper.readValue(message.getBody(), RefDto.class);
var parts = new String(message.getChannel(), StandardCharsets.UTF_8).split("/");
var origin = parts[1];
refRxChannel.send(MessageBuilder.createMessage(ref, refHeaders(origin, ref)));
refRedisChannel().send(MessageBuilder.createMessage(ref, refHeaders(origin, ref)));
} catch (IOException e) {
logger.error("Error parsing RefDto from redis.");
}
Expand Down Expand Up @@ -222,8 +265,9 @@ protected byte[] getMessage(Message<String> message) {

@Bean
public IntegrationFlow redisSubscribeTagFlow() {
return IntegrationFlows.from(tagRxChannel)
return IntegrationFlows.from(tagRedisChannel())
.channel(new ExecutorChannel(taskExecutor))
.channel(tagRxChannel)
.get();
}

Expand All @@ -235,7 +279,7 @@ public RedisMessageListenerContainer redisTagRxAdapter(RedisConnectionFactory re
var tag = new String(message.getBody(), StandardCharsets.UTF_8);
var parts = new String(message.getChannel(), StandardCharsets.UTF_8).split("/");
var origin = parts[1];
tagRxChannel.send(MessageBuilder.createMessage(tag, tagHeaders(origin, tag)));
tagRedisChannel().send(MessageBuilder.createMessage(tag, tagHeaders(origin, tag)));
}, of("tag/*"));
return container;
}
Expand All @@ -260,8 +304,9 @@ protected byte[] getMessage(Message<String> message) {

@Bean
public IntegrationFlow redisSubscribeResponseFlow() {
return IntegrationFlows.from(responseRxChannel)
return IntegrationFlows.from(responseRedisChannel())
.channel(new ExecutorChannel(taskExecutor))
.channel(responseRxChannel)
.get();
}

Expand All @@ -274,7 +319,7 @@ public RedisMessageListenerContainer redisResponseRxAdapter(RedisConnectionFacto
var parts = new String(message.getChannel(), StandardCharsets.UTF_8).split("/");
var origin = parts[1];
var source = parts[2];
responseRxChannel.send(MessageBuilder.createMessage(response, responseHeaders(origin, source)));
responseRedisChannel().send(MessageBuilder.createMessage(response, responseHeaders(origin, source)));
}, of("response/*"));
return container;
}
Expand Down Expand Up @@ -304,8 +349,9 @@ protected byte[] getMessage(Message<UserDto> message) {

@Bean
public IntegrationFlow redisSubscribeUserFlow() {
return IntegrationFlows.from(userRxChannel)
return IntegrationFlows.from(userRedisChannel())
.channel(new ExecutorChannel(taskExecutor))
.channel(userRxChannel)
.get();
}

Expand All @@ -319,7 +365,7 @@ public RedisMessageListenerContainer redisUserRxAdapter(RedisConnectionFactory r
var parts = new String(message.getChannel(), StandardCharsets.UTF_8).split("/");
var origin = parts[1];
var tag = parts[2];
userRxChannel.send(MessageBuilder.createMessage(user, tagHeaders(origin, tag)));
userRedisChannel().send(MessageBuilder.createMessage(user, tagHeaders(origin, tag)));
} catch (IOException e) {
logger.error("Error parsing UserDto from redis.");
}
Expand Down Expand Up @@ -352,8 +398,9 @@ protected byte[] getMessage(Message<ExtDto> message) {

@Bean
public IntegrationFlow redisSubscribeExtFlow() {
return IntegrationFlows.from(extRxChannel)
return IntegrationFlows.from(extRedisChannel())
.channel(new ExecutorChannel(taskExecutor))
.channel(extRxChannel)
.get();
}

Expand All @@ -367,7 +414,7 @@ public RedisMessageListenerContainer redisExtRxAdapter(RedisConnectionFactory re
var parts = new String(message.getChannel(), StandardCharsets.UTF_8).split("/");
var origin = parts[1];
var tag = parts[2];
userRxChannel.send(MessageBuilder.createMessage(user, tagHeaders(origin, tag)));
userRedisChannel().send(MessageBuilder.createMessage(user, tagHeaders(origin, tag)));
} catch (IOException e) {
logger.error("Error parsing ExtDto from redis.");
}
Expand Down Expand Up @@ -400,8 +447,9 @@ protected byte[] getMessage(Message<PluginDto> message) {

@Bean
public IntegrationFlow redisSubscribePluginFlow() {
return IntegrationFlows.from(pluginRxChannel)
return IntegrationFlows.from(pluginRedisChannel())
.channel(new ExecutorChannel(taskExecutor))
.channel(pluginRxChannel)
.get();
}

Expand All @@ -415,7 +463,7 @@ public RedisMessageListenerContainer redisPluginRxAdapter(RedisConnectionFactory
var parts = new String(message.getChannel(), StandardCharsets.UTF_8).split("/");
var origin = parts[1];
var tag = parts[2];
pluginRxChannel.send(MessageBuilder.createMessage(plugin, tagHeaders(origin, tag)));
pluginRedisChannel().send(MessageBuilder.createMessage(plugin, tagHeaders(origin, tag)));
} catch (IOException e) {
logger.error("Error parsing PluginDto from redis.");
}
Expand Down Expand Up @@ -448,8 +496,9 @@ protected byte[] getMessage(Message<TemplateDto> message) {

@Bean
public IntegrationFlow redisSubscribeTemplateFlow() {
return IntegrationFlows.from(templateRxChannel)
return IntegrationFlows.from(templateRedisChannel())
.channel(new ExecutorChannel(taskExecutor))
.channel(templateRxChannel)
.get();
}

Expand All @@ -463,7 +512,7 @@ public RedisMessageListenerContainer redisTemplateRxAdapter(RedisConnectionFacto
var parts = new String(message.getChannel(), StandardCharsets.UTF_8).split("/");
var origin = parts[1];
var tag = parts[2];
templateRxChannel.send(MessageBuilder.createMessage(template, tagHeaders(origin, tag)));
templateRedisChannel().send(MessageBuilder.createMessage(template, tagHeaders(origin, tag)));
} catch (IOException e) {
logger.error("Error parsing TemplateDto from redis.");
}
Expand Down

0 comments on commit 8ba2eb8

Please sign in to comment.