Skip to content

Commit

Permalink
[ISSUE #7464] Polish the pop logger format (#7465)
Browse files Browse the repository at this point in the history
* Doc: How to debug in Idea

* en

* polish

* polish log
  • Loading branch information
joeCarf authored Oct 16, 2023
1 parent 0f01df4 commit 3a035d7
Showing 1 changed file with 12 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ private boolean reviveRetry(PopCheckPoint popCheckPoint, MessageExt messageExt)
PutMessageResult putMessageResult = brokerController.getEscapeBridge().putMessageToSpecificQueue(msgInner);
PopMetricsManager.incPopReviveRetryMessageCount(popCheckPoint, putMessageResult.getPutMessageStatus());
if (brokerController.getBrokerConfig().isEnablePopLog()) {
POP_LOGGER.info("reviveQueueId={},retry msg , ck={}, msg queueId {}, offset {}, reviveDelay={}, result is {} ",
POP_LOGGER.info("reviveQueueId={},retry msg, ck={}, msg queueId {}, offset {}, reviveDelay={}, result is {} ",
queueId, popCheckPoint, messageExt.getQueueId(), messageExt.getQueueOffset(),
(System.currentTimeMillis() - popCheckPoint.getReviveTime()) / 1000, putMessageResult);
}
Expand Down Expand Up @@ -319,7 +319,7 @@ protected void consumeReviveMessage(ConsumeReviveObj consumeReviveObj) {
// offset self amend
while (true) {
if (!shouldRunPopRevive) {
POP_LOGGER.info("slave skip scan , revive topic={}, reviveQueueId={}", reviveTopic, queueId);
POP_LOGGER.info("slave skip scan, revive topic={}, reviveQueueId={}", reviveTopic, queueId);
break;
}
List<MessageExt> messageExts = getReviveMessage(offset, queueId);
Expand Down Expand Up @@ -351,7 +351,7 @@ protected void consumeReviveMessage(ConsumeReviveObj consumeReviveObj) {
noMsgCount = 0;
}
if (System.currentTimeMillis() - startScanTime > brokerController.getBrokerConfig().getReviveScanTime()) {
POP_LOGGER.info("reviveQueueId={}, scan timeout ", queueId);
POP_LOGGER.info("reviveQueueId={}, scan timeout ", queueId);
break;
}
for (MessageExt messageExt : messageExts) {
Expand All @@ -373,7 +373,7 @@ protected void consumeReviveMessage(ConsumeReviveObj consumeReviveObj) {
} else if (PopAckConstants.ACK_TAG.equals(messageExt.getTags())) {
String raw = new String(messageExt.getBody(), DataConverter.CHARSET_UTF8);
if (brokerController.getBrokerConfig().isEnablePopLog()) {
POP_LOGGER.info("reviveQueueId={},find ack, offset:{}, raw : {}", messageExt.getQueueId(), messageExt.getQueueOffset(), raw);
POP_LOGGER.info("reviveQueueId={}, find ack, offset:{}, raw : {}", messageExt.getQueueId(), messageExt.getQueueOffset(), raw);
}
AckMsg ackMsg = JSON.parseObject(raw, AckMsg.class);
PopMetricsManager.incPopReviveAckGetCount(ackMsg, queueId);
Expand Down Expand Up @@ -465,15 +465,15 @@ private PopCheckPoint createMockCkForAck(AckMsg ackMsg, long reviveOffset) {

protected void mergeAndRevive(ConsumeReviveObj consumeReviveObj) throws Throwable {
ArrayList<PopCheckPoint> sortList = consumeReviveObj.genSortList();
POP_LOGGER.info("reviveQueueId={},ck listSize={}", queueId, sortList.size());
POP_LOGGER.info("reviveQueueId={}, ck listSize={}", queueId, sortList.size());
if (sortList.size() != 0) {
POP_LOGGER.info("reviveQueueId={}, 1st ck, startOffset={}, reviveOffset={} ; last ck, startOffset={}, reviveOffset={}", queueId, sortList.get(0).getStartOffset(),
POP_LOGGER.info("reviveQueueId={}, 1st ck, startOffset={}, reviveOffset={}; last ck, startOffset={}, reviveOffset={}", queueId, sortList.get(0).getStartOffset(),
sortList.get(0).getReviveOffset(), sortList.get(sortList.size() - 1).getStartOffset(), sortList.get(sortList.size() - 1).getReviveOffset());
}
long newOffset = consumeReviveObj.oldOffset;
for (PopCheckPoint popCheckPoint : sortList) {
if (!shouldRunPopRevive) {
POP_LOGGER.info("slave skip ck process , revive topic={}, reviveQueueId={}", reviveTopic, queueId);
POP_LOGGER.info("slave skip ck process, revive topic={}, reviveQueueId={}", reviveTopic, queueId);
break;
}
if (consumeReviveObj.endTime - popCheckPoint.getReviveTime() <= (PopAckConstants.ackTimeInterval + PopAckConstants.SECOND)) {
Expand All @@ -483,12 +483,12 @@ protected void mergeAndRevive(ConsumeReviveObj consumeReviveObj) throws Throwabl
// check normal topic, skip ck , if normal topic is not exist
String normalTopic = KeyBuilder.parseNormalTopic(popCheckPoint.getTopic(), popCheckPoint.getCId());
if (brokerController.getTopicConfigManager().selectTopicConfig(normalTopic) == null) {
POP_LOGGER.warn("reviveQueueId={},can not get normal topic {} , then continue ", queueId, popCheckPoint.getTopic());
POP_LOGGER.warn("reviveQueueId={}, can not get normal topic {}, then continue", queueId, popCheckPoint.getTopic());
newOffset = popCheckPoint.getReviveOffset();
continue;
}
if (null == brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(popCheckPoint.getCId())) {
POP_LOGGER.warn("reviveQueueId={},can not get cid {} , then continue ", queueId, popCheckPoint.getCId());
POP_LOGGER.warn("reviveQueueId={}, can not get cid {}, then continue", queueId, popCheckPoint.getCId());
newOffset = popCheckPoint.getReviveOffset();
continue;
}
Expand Down Expand Up @@ -520,7 +520,7 @@ protected void mergeAndRevive(ConsumeReviveObj consumeReviveObj) throws Throwabl

private void reviveMsgFromCk(PopCheckPoint popCheckPoint) {
if (!shouldRunPopRevive) {
POP_LOGGER.info("slave skip retry , revive topic={}, reviveQueueId={}", reviveTopic, queueId);
POP_LOGGER.info("slave skip retry, revive topic={}, reviveQueueId={}", reviveTopic, queueId);
return;
}
inflightReviveRequestMap.put(popCheckPoint, new Pair<>(System.currentTimeMillis(), false));
Expand Down Expand Up @@ -646,7 +646,7 @@ public void run() {
consumeReviveMessage(consumeReviveObj);

if (!shouldRunPopRevive) {
POP_LOGGER.info("slave skip scan , revive topic={}, reviveQueueId={}", reviveTopic, queueId);
POP_LOGGER.info("slave skip scan, revive topic={}, reviveQueueId={}", reviveTopic, queueId);
continue;
}

Expand All @@ -662,7 +662,7 @@ public void run() {
currentReviveMessageTimestamp = System.currentTimeMillis();
}

POP_LOGGER.info("reviveQueueId={},revive finish,old offset is {}, new offset is {}, ckDelay={} ",
POP_LOGGER.info("reviveQueueId={}, revive finish,old offset is {}, new offset is {}, ckDelay={} ",
queueId, consumeReviveObj.oldOffset, consumeReviveObj.newOffset, delay);

if (sortList == null || sortList.isEmpty()) {
Expand Down

0 comments on commit 3a035d7

Please sign in to comment.