Skip to content

Commit

Permalink
[ISSUE #7634] Introduce controllableOffset to prevent unnecessary sus…
Browse files Browse the repository at this point in the history
…pension when OFFSET_ILLEGAL (#7635)

* Add controllableOffset to prevent unnecessary suspension when OFFSET_ILLEGAL
  • Loading branch information
drpmma authored Dec 12, 2023
1 parent f7a6d0b commit a376fbc
Show file tree
Hide file tree
Showing 7 changed files with 218 additions and 36 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.rocketmq.client.consumer.store;

import java.util.concurrent.atomic.AtomicLong;

/**
* The ControllableOffset class encapsulates a thread-safe offset value that can be
* updated atomically. Additionally, this class allows for the offset to be "frozen,"
* which prevents further updates after the freeze operation has been performed.
* <p>
* Concurrency Scenarios:
* If {@code updateAndFreeze} is called before any {@code update} operations, it sets
* {@code allowToUpdate} to false and updates the offset to the target value specified.
* After this operation, further invocations of {@code update} will not affect the offset,
* as it is considered frozen.
* <p>
* If {@code update} is in progress while {@code updateAndFreeze} is invoked concurrently,
* the final outcome depends on the sequence of operations:
* 1. If {@code update}'s atomic update operation completes before {@code updateAndFreeze},
* the latter will overwrite the offset and set {@code allowToUpdate} to false,
* preventing any further updates.
* 2. If {@code updateAndFreeze} executes before the {@code update} finalizes its operation,
* the ongoing {@code update} will not proceed with its changes. The {@link AtomicLong#getAndUpdate}
* method used in both operations ensures atomicity and respects the final state imposed by
* {@code updateAndFreeze}, even if the {@code update} function has already begun.
* <p>
* In essence, once the {@code updateAndFreeze} operation is executed, the offset value remains
* immutable to any subsequent {@code update} calls due to the immediate visibility of the
* {@code allowToUpdate} state change, courtesy of its volatile nature.
* <p>
* The combination of an AtomicLong for the offset value and a volatile boolean flag for update
* control provides a reliable mechanism for managing offset values in concurrent environments.
*/
public class ControllableOffset {
// Holds the current offset value in an atomic way.
private final AtomicLong value;
// Controls whether updates to the offset are allowed.
private volatile boolean allowToUpdate;

public ControllableOffset(long value) {
this.value = new AtomicLong(value);
this.allowToUpdate = true;
}

/**
* Attempts to update the offset to the target value. If increaseOnly is true,
* the offset will not be decreased. The update operation is atomic and thread-safe.
* The operation will respect the current allowToUpdate state, and if the offset
* has been frozen by a previous call to {@link #updateAndFreeze(long)},
* this method will not update the offset.
*
* @param target the new target offset value.
* @param increaseOnly if true, the offset will only be updated if the target value
* is greater than the current value.
*/
public void update(long target, boolean increaseOnly) {
if (allowToUpdate) {
value.getAndUpdate(val -> {
if (allowToUpdate) {
if (increaseOnly) {
return Math.max(target, val);
} else {
return target;
}
} else {
return val;
}
});
}
}

/**
* Overloaded method for updating the offset value unconditionally.
*
* @param target The new target value for the offset.
*/
public void update(long target) {
update(target, false);
}

/**
* Freezes the offset at the target value provided. Once frozen, the offset
* cannot be updated by subsequent calls to {@link #update(long, boolean)}.
* This method will set allowToUpdate to false and then update the offset,
* ensuring the new value is the final state of the offset.
*
* @param target the new target offset value to freeze at.
*/
public void updateAndFreeze(long target) {
value.getAndUpdate(val -> {
allowToUpdate = false;
return target;
});
}

public long getOffset() {
return value.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public class LocalFileOffsetStore implements OffsetStore {
private final MQClientInstance mQClientFactory;
private final String groupName;
private final String storePath;
private ConcurrentMap<MessageQueue, AtomicLong> offsetTable =
private ConcurrentMap<MessageQueue, ControllableOffset> offsetTable =
new ConcurrentHashMap<>();

public LocalFileOffsetStore(MQClientInstance mQClientFactory, String groupName) {
Expand All @@ -63,10 +63,9 @@ public LocalFileOffsetStore(MQClientInstance mQClientFactory, String groupName)
public void load() throws MQClientException {
OffsetSerializeWrapper offsetSerializeWrapper = this.readLocalOffset();
if (offsetSerializeWrapper != null && offsetSerializeWrapper.getOffsetTable() != null) {
offsetTable.putAll(offsetSerializeWrapper.getOffsetTable());

for (Entry<MessageQueue, AtomicLong> mqEntry : offsetSerializeWrapper.getOffsetTable().entrySet()) {
AtomicLong offset = mqEntry.getValue();
offsetTable.put(mqEntry.getKey(), new ControllableOffset(offset.get()));
log.info("load consumer's offset, {} {} {}",
this.groupName,
mqEntry.getKey(),
Expand All @@ -78,30 +77,38 @@ public void load() throws MQClientException {
@Override
public void updateOffset(MessageQueue mq, long offset, boolean increaseOnly) {
if (mq != null) {
AtomicLong offsetOld = this.offsetTable.get(mq);
ControllableOffset offsetOld = this.offsetTable.get(mq);
if (null == offsetOld) {
offsetOld = this.offsetTable.putIfAbsent(mq, new AtomicLong(offset));
offsetOld = this.offsetTable.putIfAbsent(mq, new ControllableOffset(offset));
}

if (null != offsetOld) {
if (increaseOnly) {
MixAll.compareAndIncreaseOnly(offsetOld, offset);
offsetOld.update(offset, true);
} else {
offsetOld.set(offset);
offsetOld.update(offset);
}
}
}
}

@Override
public void updateAndFreezeOffset(MessageQueue mq, long offset) {
if (mq != null) {
this.offsetTable.computeIfAbsent(mq, k -> new ControllableOffset(offset))
.updateAndFreeze(offset);
}
}

@Override
public long readOffset(final MessageQueue mq, final ReadOffsetType type) {
if (mq != null) {
switch (type) {
case MEMORY_FIRST_THEN_STORE:
case READ_FROM_MEMORY: {
AtomicLong offset = this.offsetTable.get(mq);
ControllableOffset offset = this.offsetTable.get(mq);
if (offset != null) {
return offset.get();
return offset.getOffset();
} else if (ReadOffsetType.READ_FROM_MEMORY == type) {
return -1;
}
Expand Down Expand Up @@ -135,9 +142,9 @@ public void persistAll(Set<MessageQueue> mqs) {
return;

OffsetSerializeWrapper offsetSerializeWrapper = new OffsetSerializeWrapper();
for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) {
for (Map.Entry<MessageQueue, ControllableOffset> entry : this.offsetTable.entrySet()) {
if (mqs.contains(entry.getKey())) {
AtomicLong offset = entry.getValue();
AtomicLong offset = new AtomicLong(entry.getValue().getOffset());
offsetSerializeWrapper.getOffsetTable().put(entry.getKey(), offset);
}
}
Expand Down Expand Up @@ -170,12 +177,12 @@ public void updateConsumeOffsetToBroker(final MessageQueue mq, final long offset
@Override
public Map<MessageQueue, Long> cloneOffsetTable(String topic) {
Map<MessageQueue, Long> cloneOffsetTable = new HashMap<>(this.offsetTable.size(), 1);
for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) {
for (Map.Entry<MessageQueue, ControllableOffset> entry : this.offsetTable.entrySet()) {
MessageQueue mq = entry.getKey();
if (!UtilAll.isBlank(topic) && !topic.equals(mq.getTopic())) {
continue;
}
cloneOffsetTable.put(mq, entry.getValue().get());
cloneOffsetTable.put(mq, entry.getValue().getOffset());

}
return cloneOffsetTable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,14 @@ public interface OffsetStore {
*/
void updateOffset(final MessageQueue mq, final long offset, final boolean increaseOnly);

/**
* Update and freeze the message queue to prevent concurrent update action
*
* @param mq target message queue
* @param offset expect update offset
*/
void updateAndFreezeOffset(final MessageQueue mq, final long offset);

/**
* Get offset from local storage
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.exception.OffsetNotFoundException;
Expand All @@ -31,11 +30,11 @@
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.protocol.header.QueryConsumerOffsetRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.UpdateConsumerOffsetRequestHeader;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;

/**
* Remote storage implementation
Expand All @@ -44,7 +43,7 @@ public class RemoteBrokerOffsetStore implements OffsetStore {
private final static Logger log = LoggerFactory.getLogger(RemoteBrokerOffsetStore.class);
private final MQClientInstance mQClientFactory;
private final String groupName;
private ConcurrentMap<MessageQueue, AtomicLong> offsetTable =
private ConcurrentMap<MessageQueue, ControllableOffset> offsetTable =
new ConcurrentHashMap<>();

public RemoteBrokerOffsetStore(MQClientInstance mQClientFactory, String groupName) {
Expand All @@ -59,30 +58,38 @@ public void load() {
@Override
public void updateOffset(MessageQueue mq, long offset, boolean increaseOnly) {
if (mq != null) {
AtomicLong offsetOld = this.offsetTable.get(mq);
ControllableOffset offsetOld = this.offsetTable.get(mq);
if (null == offsetOld) {
offsetOld = this.offsetTable.putIfAbsent(mq, new AtomicLong(offset));
offsetOld = this.offsetTable.putIfAbsent(mq, new ControllableOffset(offset));
}

if (null != offsetOld) {
if (increaseOnly) {
MixAll.compareAndIncreaseOnly(offsetOld, offset);
offsetOld.update(offset, true);
} else {
offsetOld.set(offset);
offsetOld.update(offset);
}
}
}
}

@Override
public void updateAndFreezeOffset(MessageQueue mq, long offset) {
if (mq != null) {
this.offsetTable.computeIfAbsent(mq, k -> new ControllableOffset(offset))
.updateAndFreeze(offset);
}
}

@Override
public long readOffset(final MessageQueue mq, final ReadOffsetType type) {
if (mq != null) {
switch (type) {
case MEMORY_FIRST_THEN_STORE:
case READ_FROM_MEMORY: {
AtomicLong offset = this.offsetTable.get(mq);
ControllableOffset offset = this.offsetTable.get(mq);
if (offset != null) {
return offset.get();
return offset.getOffset();
} else if (ReadOffsetType.READ_FROM_MEMORY == type) {
return -1;
}
Expand Down Expand Up @@ -118,18 +125,18 @@ public void persistAll(Set<MessageQueue> mqs) {

final HashSet<MessageQueue> unusedMQ = new HashSet<>();

for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) {
for (Map.Entry<MessageQueue, ControllableOffset> entry : this.offsetTable.entrySet()) {
MessageQueue mq = entry.getKey();
AtomicLong offset = entry.getValue();
ControllableOffset offset = entry.getValue();
if (offset != null) {
if (mqs.contains(mq)) {
try {
this.updateConsumeOffsetToBroker(mq, offset.get());
this.updateConsumeOffsetToBroker(mq, offset.getOffset());
log.info("[persistAll] Group: {} ClientId: {} updateConsumeOffsetToBroker {} {}",
this.groupName,
this.mQClientFactory.getClientId(),
mq,
offset.get());
offset.getOffset());
} catch (Exception e) {
log.error("updateConsumeOffsetToBroker exception, " + mq.toString(), e);
}
Expand All @@ -149,15 +156,15 @@ public void persistAll(Set<MessageQueue> mqs) {

@Override
public void persist(MessageQueue mq) {
AtomicLong offset = this.offsetTable.get(mq);
ControllableOffset offset = this.offsetTable.get(mq);
if (offset != null) {
try {
this.updateConsumeOffsetToBroker(mq, offset.get());
this.updateConsumeOffsetToBroker(mq, offset.getOffset());
log.info("[persist] Group: {} ClientId: {} updateConsumeOffsetToBroker {} {}",
this.groupName,
this.mQClientFactory.getClientId(),
mq,
offset.get());
offset.getOffset());
} catch (Exception e) {
log.error("updateConsumeOffsetToBroker exception, " + mq.toString(), e);
}
Expand All @@ -175,12 +182,12 @@ public void removeOffset(MessageQueue mq) {
@Override
public Map<MessageQueue, Long> cloneOffsetTable(String topic) {
Map<MessageQueue, Long> cloneOffsetTable = new HashMap<>(this.offsetTable.size(), 1);
for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) {
for (Map.Entry<MessageQueue, ControllableOffset> entry : this.offsetTable.entrySet()) {
MessageQueue mq = entry.getKey();
if (!UtilAll.isBlank(topic) && !topic.equals(mq.getTopic())) {
continue;
}
cloneOffsetTable.put(mq, entry.getValue().get());
cloneOffsetTable.put(mq, entry.getValue().getOffset());
}
return cloneOffsetTable;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -404,24 +404,25 @@ public void onSuccess(PullResult pullResult) {
pullRequest.setNextOffset(pullResult.getNextBeginOffset());

pullRequest.getProcessQueue().setDropped(true);
DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {
DefaultMQPushConsumerImpl.this.executeTask(new Runnable() {

@Override
public void run() {
try {
DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),
pullRequest.getNextOffset(), false);
DefaultMQPushConsumerImpl.this.offsetStore.updateAndFreezeOffset(pullRequest.getMessageQueue(),
pullRequest.getNextOffset());

DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());

// removeProcessQueue will also remove offset to cancel the frozen status.
DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());

log.warn("fix the pull request offset, {}", pullRequest);
} catch (Throwable e) {
log.error("executeTaskLater Exception", e);
}
}
}, 10000);
});
break;
default:
break;
Expand Down Expand Up @@ -705,6 +706,10 @@ public void executeTaskLater(final Runnable r, final long timeDelay) {
this.mQClientFactory.getPullMessageService().executeTaskLater(r, timeDelay);
}

public void executeTask(final Runnable r) {
this.mQClientFactory.getPullMessageService().executeTask(r);
}

public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end)
throws MQClientException, InterruptedException {
return this.mQClientFactory.getMQAdminImpl().queryMessage(topic, key, maxNum, begin, end);
Expand Down
Loading

0 comments on commit a376fbc

Please sign in to comment.