Skip to content

Commit

Permalink
https://github.com/chubaostream/joyqueue/issues/322
Browse files Browse the repository at this point in the history
archive issue: consume log exception jd-opensource#322

归档功能异常:消费归档功能里,在批量进行读取消费记录日志文件的时候,如果读取是在两个文件里进行的(比如批量1000条,500条读取完后再进行下一个文件的500条读取),操作hbase存储如果出现异常的话进行位置回退处理,此时的回退处理是对1000条整体位置的偏移量进行回退,但偏移量横跨两个文件进行读取计算的,所以会导致回退偏移量设置异常,从而影响整个消费记录日志的存储线程。
  • Loading branch information
majun87 committed Dec 12, 2020
1 parent 55fb6a3 commit 2bfef81
Show file tree
Hide file tree
Showing 18 changed files with 926 additions and 230 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@
* Created by chengzhiliang on 2018/12/6.
*/
public class ArchiveConfig {
public static final String LOG_DETAIL_PRODUCE_PREFIX = "produce.";
public static final String LOG_DETAIL_CONSUME_PREFIX = "consume.";

private static final String ARCHIVE_PATH ="/archive/";
private PropertySupplier propertySupplier;
private String archivePath;
Expand Down Expand Up @@ -60,6 +63,13 @@ public void setPath(String path) {
}
}

public boolean getLogDetail(String archiveType, String brokerId) {
return (boolean) PropertySupplier.getValue(propertySupplier,
ArchiveConfigKey.ARCHIVE_TRACE_LOG.getName() + archiveType + brokerId,
ArchiveConfigKey.ARCHIVE_TRACE_LOG.getType(),
ArchiveConfigKey.ARCHIVE_TRACE_LOG.getValue());
}

public int getConsumeBatchNum() {
return PropertySupplier.getValue(propertySupplier, ArchiveConfigKey.CONSUME_BATCH_NUM);
}
Expand Down Expand Up @@ -91,6 +101,10 @@ public String getNamespace() {
return PropertySupplier.getValue(propertySupplier, ArchiveConfigKey.ARCHIVE_STORE_NAMESPACE);
}

public int getStoreFialedRetryCount() {
return PropertySupplier.getValue(propertySupplier, ArchiveConfigKey.ARCHIVE_STORE_RETRY_COUNT);
}

public String getTracerType() {
return PropertySupplier.getValue(propertySupplier, BrokerConfigKey.TRACER_TYPE);
}
Expand All @@ -102,4 +116,8 @@ public boolean isReamingEnable() {
public boolean isBacklogEnable() {
return PropertySupplier.getValue(propertySupplier, ArchiveConfigKey.ARCHIVE_BACKLOG_ENABLE);
}

public int getLogRetainDuration() {
return PropertySupplier.getValue(propertySupplier, ArchiveConfigKey.ARCHIVE_LOG_RETAIN_DURATION);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,11 @@ public enum ArchiveConfigKey implements PropertyDef {
ARCHIVE_SWITCH("archive.switch", false, Type.BOOLEAN),
ARCHIVE_THREAD_POOL_QUEUE_SIZE("archive.thread.pool.queue.size", 10, Type.INT),
ARCHIVE_STORE_NAMESPACE("archive.store.namespace", "joyqueue", Type.STRING),
ARCHIVE_REAMING_ENABLE("archive.reaming.enable", false, Type.BOOLEAN),
ARCHIVE_STORE_RETRY_COUNT("archive.store.retry.count", 3, Type.INT),
ARCHIVE_REAMING_ENABLE("archive.reaming.enable", true, Type.BOOLEAN),
ARCHIVE_BACKLOG_ENABLE("archive.backlog.enable", false, Type.BOOLEAN),

ARCHIVE_TRACE_LOG("archive.trace.log.", false, Type.BOOLEAN),
ARCHIVE_LOG_RETAIN_DURATION("archive.log.retain.duration", 24, Type.INT)
;

private String name;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package org.joyqueue.broker.archive;

import org.apache.commons.lang3.StringUtils;
import org.joyqueue.broker.BrokerContext;
import org.joyqueue.broker.consumer.ConsumeConfig;
import org.joyqueue.broker.consumer.ConsumeConfigKey;
import org.joyqueue.broker.limit.RateLimiter;
import org.joyqueue.broker.limit.support.AbstractSubscribeRateLimiterManager;
import org.joyqueue.broker.producer.ProduceConfig;
import org.joyqueue.broker.producer.ProducerConfigKey;
import org.joyqueue.domain.Config;
import org.joyqueue.domain.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;

/**
* @author majun8
*/
public class ArchiveRateLimiterManager extends AbstractSubscribeRateLimiterManager {
protected static final Logger LOG = LoggerFactory.getLogger(ArchiveRateLimiterManager.class);

private ProduceConfig produceConfig;
private ConsumeConfig consumeConfig;

public ArchiveRateLimiterManager(BrokerContext context) {
super(context);
this.produceConfig = new ProduceConfig(context != null ? context.getPropertySupplier() : null);;
this.consumeConfig = new ConsumeConfig(context != null ? context.getPropertySupplier() : null);;
}

@Override
public int producerLimitRate(String topic, String app) {
int archiveRate = produceConfig.getArchiveRate(topic, app);
if(archiveRate <= 0) {
// get broker level retry rate
archiveRate = produceConfig.getArchiveRate();
}
return archiveRate;
}

@Override
public int consumerLimitRate(String topic, String app) {
int archiveRate = consumeConfig.getArchiveRate(topic, app);
if(archiveRate <= 0) {
// get broker level retry rate
archiveRate = consumeConfig.getArchiveRate();
}
return archiveRate;
}

@Override
public void cleanRateLimiter(Config config) {
String configKey = config.getKey();
if (StringUtils.isBlank(configKey)) {
return;
}

if (StringUtils.equals(configKey, ProducerConfigKey.PRODUCE_ARCHIVE_RATE.getName())) {
for (Map.Entry<String, ConcurrentMap<String, RateLimiter>> topic : subscribeRateLimiters.entrySet()) {
Iterator<Map.Entry<String, RateLimiter>> subLimiters = topic.getValue().entrySet().iterator();
while (subLimiters.hasNext()) {
Map.Entry<String, RateLimiter> subLimiter = subLimiters.next();
String subscribe = subLimiter.getKey();
if (StringUtils.contains(subscribe, Subscription.Type.PRODUCTION.name() + SPLIT)) {
subLimiters.remove();
}
}
}
} else if (StringUtils.startsWith(configKey, ProducerConfigKey.PRODUCE_ARCHIVE_RATE_PREFIX.getName())) {
String[] keys = StringUtils.split(configKey, "\\.");
if (keys != null && keys.length == 4) {
String topic = keys[2];
String app = keys[3];
if (topic != null && app != null) {
cleanRateLimiter(topic, app, Subscription.Type.PRODUCTION);
}
}
}

if (StringUtils.equals(configKey, ConsumeConfigKey.CONSUME_ARCHIVE_RATE.getName())) {
for (Map.Entry<String, ConcurrentMap<String, RateLimiter>> topic : subscribeRateLimiters.entrySet()) {
Iterator<Map.Entry<String, RateLimiter>> subLimiters = topic.getValue().entrySet().iterator();
while (subLimiters.hasNext()) {
Map.Entry<String, RateLimiter> subLimiter = subLimiters.next();
String subscribe = subLimiter.getKey();
if (StringUtils.contains(subscribe, Subscription.Type.CONSUMPTION.name() + SPLIT)) {
subLimiters.remove();
}
}
}
} else if (StringUtils.startsWith(configKey, ConsumeConfigKey.CONSUME_ARCHIVE_RATE_PREFIX.getName())) {
String[] keys = StringUtils.split(configKey, "\\.");
if (keys != null && keys.length == 4) {
String topic = keys[2];
String app = keys[3];
if (topic != null && app != null) {
cleanRateLimiter(topic, app, Subscription.Type.CONSUMPTION);
}
}
}
}
}
Loading

0 comments on commit 2bfef81

Please sign in to comment.