perf: auto batch flush (more than 2x times performance gain) #2950

Open
okg-cxf wants to merge 35 commits into main from perf/auto-batch-flush

Conversation


okg-cxf commented Aug 8, 2024

Refers to issues #2945 and #2302.

Make sure that:

  • [Y] You have read the contribution guidelines.
  • [Y] You have created a feature request first to discuss your contribution intent. Please reference the feature request ticket number in the pull request.
  • [Y] You applied code formatting rules using the mvn formatter:format target. Don’t submit any formatting related changes.
  • [Y] You submit test cases (unit or integration tests) that back your changes.

The optimization idea comes from the group-commit technique used in databases.

Benchmark setup:
Test env: AWS EC2 t2.2xlarge
Redis version: 7.1.0
Redis server: cache.r7g.large
Test model: multi-threaded sync GET, 512 threads, each thread performs 10,000 GETs
Benchmark code:

package io.lettuce.bench;

import java.text.NumberFormat;
import java.time.Duration;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicLong;

import io.lettuce.bench.utils.BenchUtils;
import io.lettuce.core.AutoBatchFlushOptions;
import io.lettuce.core.ClientOptions;
import io.lettuce.core.RedisClient;
import io.lettuce.core.RedisURI;
import io.lettuce.core.TimeoutOptions;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.codec.ByteArrayCodec;
import io.lettuce.core.internal.LettuceAssert;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;

/**
 * @author chenxiaofan
 */
public class MultiThreadSyncGet {

    private static final InternalLogger logger = InternalLoggerFactory.getInstance(MultiThreadSyncGet.class);

    private static final int THREAD_COUNT = 32;

    private static final int LOOP_NUM = 500_000;

    private static final int DIGIT_NUM = 9;

    private static final String KEY_FORMATTER = String.format("key-%%0%dd", DIGIT_NUM);

    private static final String VALUE_FORMATTER = String.format("value-%%0%dd", DIGIT_NUM);

    static {
        // noinspection ConstantValue
        LettuceAssert.assertState(DIGIT_NUM >= String.valueOf(LOOP_NUM).length() + 1, "digit num is not large enough");
    }

    void test(boolean useBatchFlush) {
        try (RedisClient redisClient = RedisClient.create(RedisURI.create("127.0.0.1", 6379))) {
            final ClientOptions.Builder optsBuilder = ClientOptions.builder()
                    .timeoutOptions(TimeoutOptions.builder().fixedTimeout(Duration.ofSeconds(7200)).build());
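            // Enable the auto batch flush option introduced by this PR only for the batched run;
            // the other run keeps the default flush behaviour for comparison.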
            if (useBatchFlush) {
                optsBuilder.autoBatchFlushOptions(AutoBatchFlushOptions.builder().enableAutoBatchFlush(true).build());
            }
            redisClient.setOptions(optsBuilder.build());
            final StatefulRedisConnection<byte[], byte[]> connection = redisClient.connect(ByteArrayCodec.INSTANCE);

            logger.info("thread count: {}", THREAD_COUNT);
            final Thread[] threads = new Thread[THREAD_COUNT];
            final AtomicLong totalCount = new AtomicLong();
            final AtomicLong totalLatency = new AtomicLong();
            for (int i = 0; i < THREAD_COUNT; i++) {
                threads[i] = new Thread(() -> {
                    for (int j = 0; j < LOOP_NUM; j++) {
                        final long cmdStart = System.nanoTime();
                        final byte[] resultBytes = connection.sync().get(genKey(j));
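                        // Convert the per-command latency from nanoseconds to microseconds before accumulating.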
                        totalLatency.addAndGet((System.nanoTime() - cmdStart) / 1000);
                        LettuceAssert.assertState(Arrays.equals(genValue(j), resultBytes), "value not match");
                        totalCount.incrementAndGet();
                    }
                });
            }
            final long start = System.nanoTime();
            Arrays.asList(threads).forEach(Thread::start);
            Arrays.asList(threads).forEach(thread -> {
                try {
                    thread.join();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException(e);
                }
            });
            double costInSeconds = (System.nanoTime() - start) / 1_000_000_000.0;
            logger.info("Total commands: {}", NumberFormat.getInstance().format(totalCount.get()));
            logger.info("Total time: {}s", costInSeconds);
            logger.info("Avg latency: {}us", totalLatency.get() / (double) totalCount.get());
            logger.info("Avg QPS: {}/s", totalCount.get() / costInSeconds);
            BenchUtils.logEnterRatioIfNeeded(logger);
            BenchUtils.logAvgBatchCount(logger);
        }
    }

    private byte[] genKey(int j) {
        return String.format(KEY_FORMATTER, j).getBytes();
    }

    private byte[] genValue(int j) {
        return String.format(VALUE_FORMATTER, j).getBytes();
    }

    public static void main(String[] args) {
        for (boolean useBatchFlush : new boolean[] { true, false }) {
            logger.info("=====================================");
            logger.info("useBatchFlush: {}", useBatchFlush);
            new MultiThreadSyncGet().test(useBatchFlush);
        }
        logger.info("=====================================");
    }

}

Benchmark results:

=====================================
10:53:11,074 INFO  [main] bench.MultiThreadSyncExists (MultiThreadSyncExists.java:129) - useBatchFlush: true
10:53:11,074 INFO  [main] bench.MultiThreadSyncExists (MultiThreadSyncExists.java:126) - batch size: 512
10:53:58,552 INFO  [main] bench.MultiThreadSyncExists (MultiThreadSyncExists.java:74) - Total commands: 5,120,000
10:53:58,552 INFO  [main] bench.MultiThreadSyncExists (MultiThreadSyncExists.java:75) - Total time: 47.009796512s
10:53:58,553 INFO  [main] bench.MultiThreadSyncExists (MultiThreadSyncExists.java:76) - Avg latency: 4693.2784484375us
10:53:58,553 INFO  [main] bench.MultiThreadSyncExists (MultiThreadSyncExists.java:77) - Avg QPS: 108913.46867866229/s
=====================================
10:53:58,597 INFO  [main] bench.MultiThreadSyncExists (MultiThreadSyncExists.java:129) - useBatchFlush: false
10:55:53,352 INFO  [main] bench.MultiThreadSyncExists (MultiThreadSyncExists.java:74) - Total commands: 5,120,000
10:55:53,352 INFO  [main] bench.MultiThreadSyncExists (MultiThreadSyncExists.java:75) - Total time: 114.732157413s
10:55:53,352 INFO  [main] bench.MultiThreadSyncExists (MultiThreadSyncExists.java:76) - Avg latency: 11458.427779882812us
10:55:53,353 INFO  [main] bench.MultiThreadSyncExists (MultiThreadSyncExists.java:77) - Avg QPS: 44625.67527227433/s
=====================================
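
The auto batch flush path completes the same 5,120,000 commands (512 threads × 10,000 GETs) in 47.0s instead of 114.7s: roughly 2.44x the throughput (108,913 vs 44,626 ops/s), with average latency dropping from about 11.46 ms to 4.69 ms.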


okg-cxf commented Aug 8, 2024

@tishun Hi, bro, could you help evaluate this PR? Got huge performance gain!

okg-cxf changed the title from "Perf/auto batch flush" to "perf: auto batch flush" on Aug 8, 2024
okg-cxf changed the title from "perf: auto batch flush" to "perf: auto batch flush (more than 2x times performance gain)" on Aug 8, 2024
okg-cxf force-pushed the perf/auto-batch-flush branch 3 times, most recently from 1817719 to 2058db3 on August 8, 2024 at 15:38

Roiocam commented Aug 16, 2024

Batching will cause latency. AFAIK, Netty already has a flush-consolidation handler for this: FlushConsolidationHandler.
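
For reference, FlushConsolidationHandler is Netty's built-in handler for coalescing flush() calls at the channel level. Below is a minimal sketch of how it is typically installed in a pipeline; this is standard Netty API usage, not code from this PR, and the threshold of 256 flushes is an arbitrary value for illustration.

import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.flush.FlushConsolidationHandler;

public class FlushConsolidationExample {

    static ChannelInitializer<SocketChannel> initializer() {
        return new ChannelInitializer<SocketChannel>() {

            @Override
            protected void initChannel(SocketChannel ch) {
                // Coalesce up to 256 explicit flushes into a single flush; also
                // consolidate flushes issued while no read is in progress.
                ch.pipeline().addFirst(new FlushConsolidationHandler(256, true));
                // ... protocol handlers would be added after this
            }

        };
    }

}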

…ble activation command in drainStackUponChannelInactive()
tishun (Collaborator) commented Aug 26, 2024

@tishun Hi, bro, could you help evaluate this PR? Got huge performance gain!

Hey @okg-cxf, thanks for filing this PR.

Since this is a large change that touches quite critical pieces of the code, I will need some more time to go through it.

Currently I am mainly focused on #2933, but I will come back to this issue as soon as possible.

…e before autoBatchFlushEndPointContext.done(1), otherwise the flyingTaskNum could be negative; 2, make sure lastEventLoop is never null