Skip to content

Enhance RedisItemReader/Writer performance with pipelining #4940

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2023 the original author or authors.
* Copyright 2023-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,21 +18,27 @@
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemStreamException;
import org.springframework.batch.item.ItemStreamReader;
import org.springframework.data.redis.core.Cursor;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ScanOptions;
import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.core.*;
import org.springframework.util.Assert;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/**
* Item reader for Redis based on Spring Data Redis. Uses a {@link RedisTemplate} to query
* data. The user should provide a {@link ScanOptions} to specify the set of keys to
* query.
* query. The {@code fetchSize} property controls how many items are fetched from Redis in
* a single pipeline round-trip for efficiency.
*
* <p>
* The implementation is not thread-safe and not restartable.
* </p>
*
* @author Mahmoud Ben Hassine
* @author Hyunwoo Jung
* @since 5.1
* @param <K> type of keys
* @param <V> type of values
Expand All @@ -43,13 +49,20 @@ public class RedisItemReader<K, V> implements ItemStreamReader<V> {

private final ScanOptions scanOptions;

private final int fetchSize;

private final Deque<V> buffer;

private Cursor<K> cursor;

public RedisItemReader(RedisTemplate<K, V> redisTemplate, ScanOptions scanOptions) {
public RedisItemReader(RedisTemplate<K, V> redisTemplate, ScanOptions scanOptions, int fetchSize) {
Assert.notNull(redisTemplate, "redisTemplate must not be null");
Assert.notNull(scanOptions, "scanOptions must no be null");
Assert.isTrue(fetchSize > 0, "fetchSize must be greater than 0");
this.redisTemplate = redisTemplate;
this.scanOptions = scanOptions;
this.fetchSize = fetchSize;
this.buffer = new ArrayDeque<>();
}

@Override
Expand All @@ -59,18 +72,45 @@ public void open(ExecutionContext executionContext) throws ItemStreamException {

@Override
public V read() throws Exception {
if (this.cursor.hasNext()) {
K nextKey = this.cursor.next();
return this.redisTemplate.opsForValue().get(nextKey);
}
else {
return null;
if (this.buffer.isEmpty()) {
fetchNext();
}

return this.buffer.pollFirst();
}

@Override
public void close() throws ItemStreamException {
this.cursor.close();
}

private void fetchNext() {
List<K> keys = new ArrayList<>();
while (this.cursor.hasNext() && keys.size() < this.fetchSize) {
keys.add(this.cursor.next());
}

if (keys.isEmpty()) {
return;
}

@SuppressWarnings("unchecked")
List<V> items = (List<V>) this.redisTemplate.executePipelined(sessionCallback(keys));

this.buffer.addAll(items);
}

private SessionCallback<Object> sessionCallback(List<K> keys) {
return new SessionCallback<>() {
@Override
public Object execute(RedisOperations operations) throws DataAccessException {
for (K key : keys) {
operations.opsForValue().get(key);
}

return null;
}
};
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2023 the original author or authors.
* Copyright 2023-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,37 +18,53 @@

import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.KeyValueItemWriter;
import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.core.RedisOperations;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.SessionCallback;
import org.springframework.data.util.Pair;
import org.springframework.util.Assert;

import java.util.ArrayList;
import java.util.List;

/**
* <p>
* An {@link ItemWriter} implementation for Redis using a {@link RedisTemplate} .
* </p>
*
* @author Santiago Molano
* @author Mahmoud Ben Hassine
* @author Hyunwoo Jung
* @since 5.1
*/
public class RedisItemWriter<K, T> extends KeyValueItemWriter<K, T> {

private RedisTemplate<K, T> redisTemplate;

private final List<Pair<K, T>> buffer = new ArrayList<>();

@Override
protected void writeKeyValue(K key, T value) {
if (this.delete) {
this.redisTemplate.delete(key);
}
else {
this.redisTemplate.opsForValue().set(key, value);
}
this.buffer.add(Pair.of(key, value));
}

@Override
protected void init() {
Assert.notNull(this.redisTemplate, "RedisTemplate must not be null");
}

@Override
protected void flush() throws Exception {
if (this.buffer.isEmpty()) {
return;
}

this.redisTemplate.executePipelined(sessionCallback());

this.buffer.clear();
}

/**
* Set the {@link RedisTemplate} to use.
* @param redisTemplate the template to use
Expand All @@ -57,4 +73,33 @@ public void setRedisTemplate(RedisTemplate<K, T> redisTemplate) {
this.redisTemplate = redisTemplate;
}

private SessionCallback<Object> sessionCallback() {
return new SessionCallback<>() {

@SuppressWarnings("unchecked")
@Override
public Object execute(RedisOperations operations) throws DataAccessException {
if (RedisItemWriter.this.delete) {
executeDeleteOperations(operations);
}
else {
executeSetOperations(operations);
}
return null;
}
};
}

private void executeDeleteOperations(RedisOperations<K, T> operations) {
for (Pair<K, T> item : this.buffer) {
operations.delete(item.getFirst());
}
}

private void executeSetOperations(RedisOperations<K, T> operations) {
for (Pair<K, T> item : this.buffer) {
operations.opsForValue().set(item.getFirst(), item.getSecond());
}
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2023 the original author or authors.
* Copyright 2023-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -23,6 +23,7 @@
* Builder for {@link RedisItemReader}.
*
* @author Mahmoud Ben Hassine
* @author Hyunwoo Jung
* @since 5.1
* @param <K> type of keys
* @param <V> type of values
Expand All @@ -33,6 +34,8 @@ public class RedisItemReaderBuilder<K, V> {

private ScanOptions scanOptions;

private int fetchSize;

/**
* Set the {@link RedisTemplate} to use in the reader.
* @param redisTemplate the template to use
Expand All @@ -53,12 +56,22 @@ public RedisItemReaderBuilder<K, V> scanOptions(ScanOptions scanOptions) {
return this;
}

/**
* Set the fetchSize to how many items from Redis in a single round-trip.
* @param fetchSize the number of items to fetch per pipeline execution
* @return the current builder instance for fluent chaining
*/
public RedisItemReaderBuilder<K, V> fetchSize(int fetchSize) {
this.fetchSize = fetchSize;
return this;
}

/**
* Build a new {@link RedisItemReader}.
* @return a new item reader
*/
public RedisItemReader<K, V> build() {
return new RedisItemReader<>(this.redisTemplate, this.scanOptions);
return new RedisItemReader<>(this.redisTemplate, this.scanOptions, this.fetchSize);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ void testRead(RedisConnectionFactory connectionFactory) throws Exception {

RedisTemplate<String, Person> redisTemplate = setUpRedisTemplate(connectionFactory);
ScanOptions scanOptions = ScanOptions.scanOptions().match("person:*").count(10).build();
this.reader = new RedisItemReader<>(redisTemplate, scanOptions);
this.reader = new RedisItemReader<>(redisTemplate, scanOptions, 10);

this.reader.open(new ExecutionContext());

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2023 the original author or authors.
* Copyright 2023-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,20 +16,30 @@
package org.springframework.batch.item.redis;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Answers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;

import org.springframework.batch.item.ExecutionContext;
import org.springframework.data.redis.core.Cursor;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ScanOptions;
import org.springframework.data.redis.core.SessionCallback;

import java.util.ArrayList;
import java.util.List;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.when;

/**
* @author Mahmoud Ben Hassine
* @author Hyunwoo Jung
*/
@ExtendWith(MockitoExtension.class)
public class RedisItemReaderTests {
class RedisItemReaderTests {

@Mock(answer = Answers.RETURNS_DEEP_STUBS)
private RedisTemplate<String, String> redisTemplate;
Expand All @@ -40,15 +50,36 @@ public class RedisItemReaderTests {
@Mock
private Cursor<String> cursor;

private List<String> results;

@BeforeEach
void setUp() {
this.results = new ArrayList<>();

when(this.redisTemplate.executePipelined(any(SessionCallback.class))).thenAnswer(invocation -> {
SessionCallback<?> sessionCallback = invocation.getArgument(0);
sessionCallback.execute(this.redisTemplate);
return this.results;
});
}

@Test
void testRead() throws Exception {
// given
Mockito.when(this.redisTemplate.scan(this.scanOptions)).thenReturn(this.cursor);
Mockito.when(this.cursor.hasNext()).thenReturn(true, true, false);
Mockito.when(this.cursor.next()).thenReturn("person:1", "person:2");
Mockito.when(this.redisTemplate.opsForValue().get("person:1")).thenReturn("foo");
Mockito.when(this.redisTemplate.opsForValue().get("person:2")).thenReturn("bar");
RedisItemReader<String, String> redisItemReader = new RedisItemReader<>(this.redisTemplate, this.scanOptions);
when(this.redisTemplate.scan(this.scanOptions)).thenReturn(this.cursor);
when(this.cursor.hasNext()).thenReturn(true, true, false);
when(this.cursor.next()).thenReturn("person:1", "person:2");
when(this.redisTemplate.opsForValue().get("person:1")).thenAnswer(invocation -> {
results.add("foo");
return null;
});
when(this.redisTemplate.opsForValue().get("person:2")).thenAnswer(invocation -> {
results.add("bar");
return null;
});

RedisItemReader<String, String> redisItemReader = new RedisItemReader<>(this.redisTemplate, this.scanOptions,
10);
redisItemReader.open(new ExecutionContext());

// when
Expand Down
Loading
Loading