Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RocketMQ broker busy, start flow control for a while原因 #11

Open
2pc opened this issue Feb 22, 2022 · 0 comments
Open

RocketMQ broker busy, start flow control for a while原因 #11

2pc opened this issue Feb 22, 2022 · 0 comments
Labels

Comments

@2pc
Copy link
Owner

2pc commented Feb 22, 2022

直接全局搜源码,是BrokerFastFailure里边启动的线程BrokerFastFailureScheduledThread执行的

public class BrokerFastFailure {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
    private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
        "BrokerFastFailureScheduledThread"));
//省略部分代码
    public void start() {
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                if (brokerController.getBrokerConfig().isBrokerFastFailureEnable()) {
                    cleanExpiredRequest();
                }
            }
        }, 1000, 10, TimeUnit.MILLISECONDS);//每隔10ms执行一次
    }

如果isOSPageCacheBusy就会设置RequestTask的returnResponse为SYSTEM_BUSY,也就是broker busy, start flow control

    private void cleanExpiredRequest() {
        while (this.brokerController.getMessageStore().isOSPageCacheBusy()) {
            try {
                if (!this.brokerController.getSendThreadPoolQueue().isEmpty()) {
                    final Runnable runnable = this.brokerController.getSendThreadPoolQueue().poll(0, TimeUnit.SECONDS);
                    if (null == runnable) {
                        break;
                    }

                    final RequestTask rt = castRunnable(runnable);
                    rt.returnResponse(RemotingSysResponseCode.SYSTEM_BUSY, String.format("[PCBUSY_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d", System.currentTimeMillis() - rt.getCreateTimestamp(), this.brokerController.getSendThreadPoolQueue().size()));
                } else {
                    break;
                }
            } catch (Throwable ignored) {
            }
        }

        cleanExpiredRequestInQueue(this.brokerController.getSendThreadPoolQueue(),//queue大小10000
            this.brokerController.getBrokerConfig().getWaitTimeMillsInSendQueue());//200ms

        cleanExpiredRequestInQueue(this.brokerController.getPullThreadPoolQueue(),//queue大小100000
            this.brokerController.getBrokerConfig().getWaitTimeMillsInPullQueue());//5*1000

        cleanExpiredRequestInQueue(this.brokerController.getHeartbeatThreadPoolQueue(),//queue大小50000
            this.brokerController.getBrokerConfig().getWaitTimeMillsInHeartbeatQueue());//31 * 1000

        cleanExpiredRequestInQueue(this.brokerController.getEndTransactionThreadPoolQueue(), this//queue大小500000
            .brokerController.getBrokerConfig().getWaitTimeMillsInTransactionQueue());//3 * 1000
    }

cleanExpiredRequestInQueue每个200ms就会将其从SendThreadPoolQueue移除,其他Queue以及waitTime如上
RequestTask 设置stopRun状态为true,同时设置returnResponse为SYSTEM_BUSY,也就是broker busy, start flow control

    void cleanExpiredRequestInQueue(final BlockingQueue<Runnable> blockingQueue, final long maxWaitTimeMillsInQueue) {
        while (true) {
            try {
                if (!blockingQueue.isEmpty()) {
                    final Runnable runnable = blockingQueue.peek();
                    if (null == runnable) {
                        break;
                    }
                    final RequestTask rt = castRunnable(runnable);
                    if (rt == null || rt.isStopRun()) {
                        break;
                    }

                    final long behind = System.currentTimeMillis() - rt.getCreateTimestamp();
                    if (behind >= maxWaitTimeMillsInQueue) {
                        if (blockingQueue.remove(runnable)) {
                            rt.setStopRun(true);
                            rt.returnResponse(RemotingSysResponseCode.SYSTEM_BUSY, String.format("[TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d", behind, blockingQueue.size()));
                        }
                    } else {
                        break;
                    }
                } else {
                    break;
                }
            } catch (Throwable ignored) {
            }
        }
    }

isOSPageCacheBusy,只要当前时间与上一条消息存储时的开始时间大于1000ms,小于10000ms

  public boolean isOSPageCacheBusy() {
      long begin = this.getCommitLog().getBeginTimeInLock();
      long diff = this.systemClock.now() - begin;

      return diff < 10000000
          && diff > this.messageStoreConfig.getOsPageCacheBusyTimeOutMills();//1000ms
  }

另外返回SYSTEM_BUSY,客户端不会重试

@2pc 2pc added the RocketMQ label Feb 22, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

1 participant