diff --git a/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/DefaultDelayLogFacade.java b/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/DefaultDelayLogFacade.java index e90a8011..cc6577de 100644 --- a/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/DefaultDelayLogFacade.java +++ b/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/DefaultDelayLogFacade.java @@ -28,6 +28,7 @@ import qunar.tc.qmq.delay.store.log.*; import qunar.tc.qmq.delay.store.model.AppendLogResult; import qunar.tc.qmq.delay.store.model.LogRecord; +import qunar.tc.qmq.delay.store.model.MessageLogRecord; import qunar.tc.qmq.delay.store.model.RawMessageExtend; import qunar.tc.qmq.delay.store.model.ScheduleSetRecord; import qunar.tc.qmq.delay.wheel.WheelLoadCursor; @@ -62,7 +63,7 @@ public DefaultDelayLogFacade(final StoreConfiguration config, final Function { + bus.subscribe(MessageLogRecord.class, e -> { AppendLogResult result = appendScheduleLog(e); int code = result.getCode(); if (MessageProducerCode.SUCCESS != code) { @@ -71,7 +72,7 @@ public DefaultDelayLogFacade(final StoreConfiguration config, final Function { + bus.subscribe(MessageLogRecord.class, e -> { long checkpoint = e.getStartWroteOffset() + e.getRecordSize(); updateIterateOffset(checkpoint); });