Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE-322] Fix archive consume log exception #323

Open
wants to merge 19 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ public String toString() {
public static class TopicPolicy implements Serializable {
private Long storeMaxTime;
private Boolean storeCleanKeepUnconsumed;
private Integer produceArchiveTps = -1;
private Integer consumeArchiveTps = -1;
private Map<String, String> params;

public Long getStoreMaxTime() {
Expand All @@ -139,6 +141,22 @@ public Boolean getStoreCleanKeepUnconsumed() {
return storeCleanKeepUnconsumed;
}

public Integer getProduceArchiveTps() {
return produceArchiveTps;
}

public void setProduceArchiveTps(Integer produceArchiveTps) {
this.produceArchiveTps = produceArchiveTps;
}

public Integer getConsumeArchiveTps() {
return consumeArchiveTps;
}

public void setConsumeArchiveTps(Integer consumeArchiveTps) {
this.consumeArchiveTps = consumeArchiveTps;
}

public void setParams(Map<String, String> params) {
this.params = params;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,18 @@ public static Map<String, String> toStringMap(final String text) throws IOExcept
return new HashMap(properties);
}

public static String readString(final byte[] bytes) {
return bytes == null ? null : readString(bytes, 0, bytes.length);
}

public static String readString(byte[] bytes, int offset, int length) {
if (bytes == null) {
return null;
} else {
return length == 0 ? "" : new String(bytes, offset, length, Charset.forName("UTF-8"));
}
}

/**
* 读取字符串,字符长度&lt;=255
*
Expand Down
10 changes: 7 additions & 3 deletions joyqueue-console/joyqueue-data/joyqueue-data-service/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,13 @@
<artifactId>commons-beanutils</artifactId>
</dependency>
<dependency>
<groupId>commons-net</groupId>
<artifactId>commons-net</artifactId>
</dependency>
<!--<dependency>
<groupId>org.joyqueue</groupId>
<artifactId>joyqueue-hbase</artifactId>
</dependency>
</dependency>-->
<dependency>
<groupId>org.joyqueue</groupId>
<artifactId>joyqueue-nsr-core</artifactId>
Expand All @@ -124,10 +128,10 @@
<artifactId>joyqueue-archive-api</artifactId>
</dependency>
<!-- FIXME: 应该依赖joyqueue-archive-api 而不是这个joyqueue-archive-hbase实现-->
<dependency>
<!--<dependency>
<groupId>org.joyqueue</groupId>
<artifactId>joyqueue-archive-hbase</artifactId>
</dependency>
</dependency>-->
<dependency>
<groupId>org.joyqueue</groupId>
<artifactId>joyqueue-client-all</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang3.StringUtils;
import org.joyqueue.convert.CodeConverter;
import org.joyqueue.convert.NsrConsumerConverter;
import org.joyqueue.domain.ClientType;
Expand All @@ -26,7 +27,6 @@
import org.joyqueue.nsr.ConsumerNameServerService;
import org.joyqueue.nsr.NameServerBase;
import org.joyqueue.nsr.model.ConsumerQuery;
import org.apache.commons.lang.StringUtils;
import org.springframework.stereotype.Service;

import java.util.List;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@
import org.joyqueue.model.domain.User;
import org.joyqueue.model.query.QApplication;
import org.joyqueue.model.query.QArchive;
import org.joyqueue.server.archive.store.QueryCondition;
import org.joyqueue.server.archive.store.api.ArchiveStore;
import org.joyqueue.server.archive.store.model.ConsumeLog;
import org.joyqueue.server.archive.store.model.SendLog;
import org.joyqueue.server.archive.store.query.QueryCondition;
import org.joyqueue.service.ApplicationService;
import org.joyqueue.service.ArchiveService;
import org.joyqueue.service.TopicService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@

import com.alibaba.fastjson.JSON;
import org.joyqueue.broker.archive.ArchiveUtils;
import org.joyqueue.broker.buffer.Serializer;
import org.joyqueue.util.serializer.Serializer;
import org.joyqueue.exception.ServiceException;
import org.joyqueue.handler.error.ErrorCode;
import org.joyqueue.message.SourceType;
import org.joyqueue.server.archive.store.HBaseSerializer;
import org.joyqueue.server.archive.store.utils.ArchiveSerializer;
import org.joyqueue.server.retry.model.RetryMessageModel;
import org.joyqueue.handler.Constants;
import org.joyqueue.service.MessagePreviewService;
Expand All @@ -45,7 +45,6 @@
import com.jd.laf.web.vertx.response.Responses;
import io.vertx.core.http.HttpServerRequest;
import io.vertx.core.http.HttpServerResponse;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
Expand Down Expand Up @@ -200,10 +199,10 @@ public BrokerMessage filterBrokerMessage(BrokerMessage brokerMessage, SendLog se
}
for(BrokerMessage m:msgs){
String msgId=ArchiveUtils.messageId(brokerMessage.getTopic(),m.getPartition(),m.getMsgIndexNo());
byte[] msgIdMd5Bytes=HBaseSerializer.md5(msgId,null);
byte[] msgIdMd5Bytes= ArchiveSerializer.md5(msgId,null);
if(logger.isDebugEnabled()) {
logger.debug("current message business id {},message id {},md5 length {},base 64 bytes {},hex {}", m.getBusinessId(), msgId, msgIdMd5Bytes.length,
Base64.getEncoder().encodeToString(msgIdMd5Bytes), HBaseSerializer.byteArrayToHexStr(msgIdMd5Bytes));
Base64.getEncoder().encodeToString(msgIdMd5Bytes), ArchiveSerializer.byteArrayToHexStr(msgIdMd5Bytes));
}
if(Arrays.equals(msgIdMd5Bytes,sendLog.getBytesMessageId())){
return m;
Expand All @@ -221,7 +220,7 @@ public String preview(BrokerMessage brokerMessage,String messageType){
return messagePreviewService.preview(messageType, brokerMessage.getDecompressedBody());
} catch (Throwable e) {
logger.error("parse error",e);
return Bytes.toString(brokerMessage.getDecompressedBody());
return Serializer.readString(brokerMessage.getDecompressedBody());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.jd.laf.web.vertx.annotation.QueryParam;
import com.jd.laf.web.vertx.response.Response;
import com.jd.laf.web.vertx.response.Responses;
import org.apache.commons.net.telnet.TelnetClient;
import org.joyqueue.handler.annotation.PageQuery;
import org.joyqueue.handler.error.ConfigException;
import org.joyqueue.handler.routing.command.NsrCommandSupport;
Expand All @@ -29,7 +30,6 @@
import org.joyqueue.model.domain.Broker;
import org.joyqueue.model.query.QBroker;
import org.joyqueue.service.BrokerService;
import org.apache.commons.net.telnet.TelnetClient;

import static org.joyqueue.handler.Constants.ID;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.joyqueue.server.archive.store;
package org.joyqueue.server.archive.store.query;

import org.joyqueue.server.archive.store.model.Query;
import org.joyqueue.server.archive.store.utils.ArchiveSerializer;

import java.util.Arrays;

/**
* Created by chengzhiliang on 2018/12/4.
Expand Down Expand Up @@ -72,10 +75,21 @@ public byte[] getStartRowKeyByteArr() {
}

public void setStartRowKeyByteArr(String startRowKeyByteArr) {
byte[] bytes = HBaseSerializer.hexStrToByteArray(startRowKeyByteArr);
byte[] bytes = ArchiveSerializer.hexStrToByteArray(startRowKeyByteArr);
this.startRowKeyByteArr = bytes;
}

@Override
public String toString() {
return "QueryCondition{" +
"startRowKey=" + startRowKey +
", stopRowKey=" + stopRowKey +
", count=" + count +
", rowKey=" + rowKey +
", startRowKeyByteArr=" + Arrays.toString(startRowKeyByteArr) +
'}';
}

/**
* 查询RowKey
*/
Expand Down Expand Up @@ -117,5 +131,14 @@ public void setMessageId(String messageId) {
this.messageId = messageId;
}

@Override
public String toString() {
return "RowKey{" +
"topic='" + topic + '\'' +
", time=" + time +
", businessId='" + businessId + '\'' +
", messageId='" + messageId + '\'' +
'}';
}
}
}
Original file line number Diff line number Diff line change
@@ -1,32 +1,15 @@
/**
* Copyright 2019 The JoyQueue Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.joyqueue.server.archive.store;
package org.joyqueue.server.archive.store.utils;

import org.joyqueue.server.archive.store.model.ConsumeLog;
import org.joyqueue.server.archive.store.model.SendLog;
import org.joyqueue.toolkit.lang.Pair;
import org.joyqueue.toolkit.security.Md5;

import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.security.GeneralSecurityException;

/**
* Created by chengzhiliang on 2018/12/12.
*/
public class HBaseSerializer {
public class ArchiveSerializer {

public static ConsumeLog readConsumeLog(Pair<byte[], byte[]> pair) {
ConsumeLog log = new ConsumeLog();
Expand Down Expand Up @@ -247,7 +230,7 @@ public static SendLog readSendLog4BizId(Pair<byte[], byte[]> pair) {
*
**/
public static byte[] md5(String content,byte[] key) throws GeneralSecurityException {
return Md5.INSTANCE.encrypt(content.getBytes(Charset.forName("utf-8")), key);
return Md5.INSTANCE.encrypt(content.getBytes(Charset.forName("utf-8")), key);
}

public static String byteArrayToHexStr(byte[] byteArray) {
Expand Down Expand Up @@ -278,4 +261,23 @@ public static byte[] hexStrToByteArray(String str) {
}
return byteArray;
}

public static byte[] reverse(byte[] byteArray) {
if (byteArray == null || byteArray.length == 0) {
return byteArray;
}
byte[] reverseArray = new byte[byteArray.length];
for (int i = 0; i < byteArray.length; i++) {
reverseArray[i] = byteArray[byteArray.length - i - 1];
}
return reverseArray;
}

public static byte[] reverse(ByteBuffer buffer) {
return reverse(buffer.array());
}

public static String reverse(String reverseStr) {
return new StringBuffer(reverseStr).reverse().toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.joyqueue.server.archive.store.api.ArchiveStore;
import org.joyqueue.monitor.PointTracer;
import org.joyqueue.server.archive.store.model.*;
import org.joyqueue.server.archive.store.query.QueryCondition;
import org.joyqueue.server.archive.store.utils.ArchiveSerializer;
import org.joyqueue.toolkit.lang.Pair;
import org.joyqueue.toolkit.network.IpUtil;
import org.joyqueue.toolkit.security.Md5;
Expand Down Expand Up @@ -118,7 +120,7 @@ public void putConsumeLog(List<ConsumeLog> consumeLogList, PointTracer tracer) t
int appId = topicAppMapping.getAppId(app);
consumeLog.setAppId(appId);

Pair<byte[], byte[]> pair = HBaseSerializer.convertConsumeLogToKVBytes(consumeLog);
Pair<byte[], byte[]> pair = ArchiveSerializer.convertConsumeLogToKVBytes(consumeLog);

logList.add(pair);
}
Expand All @@ -145,10 +147,10 @@ public void putSendLog(List<SendLog> sendLogList, PointTracer tracer) throws Joy
log.setTopicId(topicId);
log.setAppId(appId);

Pair<byte[], byte[]> pair = HBaseSerializer.convertSendLogToKVBytes(log);
Pair<byte[], byte[]> pair = ArchiveSerializer.convertSendLogToKVBytes(log);
logList.add(pair);

Pair<byte[], byte[]> pair4BizId = HBaseSerializer.convertSendLogToKVBytes4BizId(log);
Pair<byte[], byte[]> pair4BizId = ArchiveSerializer.convertSendLogToKVBytes4BizId(log);
logList.add(pair4BizId);

}
Expand Down Expand Up @@ -233,13 +235,13 @@ public List<SendLog> scanSendLog(Query query) throws JoyQueueException {
for (Pair<byte[], byte[]> pair : scan) {
SendLog log;
if (hasBizId) {
log = HBaseSerializer.readSendLog4BizId(pair);
log = ArchiveSerializer.readSendLog4BizId(pair);
} else {
log = HBaseSerializer.readSendLog(pair);
log = ArchiveSerializer.readSendLog(pair);
}

log.setClientIpStr(toIpString(log.getClientIp()));
log.setRowKeyStart(HBaseSerializer.byteArrayToHexStr(pair.getKey()));
log.setRowKeyStart(ArchiveSerializer.byteArrayToHexStr(pair.getKey()));
String topicName = topicAppMapping.getTopicName(log.getTopicId());
log.setTopic(topicName);

Expand Down Expand Up @@ -412,13 +414,13 @@ public SendLog getOneSendLog(Query query) throws JoyQueueException {
allocate.putInt(topicAppMapping.getTopicId(rowKey.getTopic()));
allocate.putLong(rowKey.getTime());
allocate.put(Md5.INSTANCE.encrypt(rowKey.getBusinessId().getBytes(Charset.forName("utf-8")), null));
allocate.put(HBaseSerializer.hexStrToByteArray(rowKey.getMessageId()));
allocate.put(ArchiveSerializer.hexStrToByteArray(rowKey.getMessageId()));
// rowKey
byte[] bytesRowKey = allocate.array();

Pair<byte[], byte[]> bytes = hBaseClient.getKV(namespace, sendLogTable, cf, col, bytesRowKey);

SendLog log = HBaseSerializer.readSendLog(bytes);
SendLog log = ArchiveSerializer.readSendLog(bytes);

StringBuilder clientIp = new StringBuilder();
IpUtil.toAddress(log.getClientIp(), clientIp);
Expand Down Expand Up @@ -452,7 +454,7 @@ public List<ConsumeLog> scanConsumeLog(String messageId, Integer count) throws J
scanParameters.setCf(cf);
scanParameters.setCol(col);

byte[] messageIdBytes = HBaseSerializer.hexStrToByteArray(messageId);
byte[] messageIdBytes = ArchiveSerializer.hexStrToByteArray(messageId);
scanParameters.setStartRowKey(messageIdBytes);

ByteBuffer bytebuffer = ByteBuffer.allocate(messageIdBytes.length + 1);
Expand All @@ -465,8 +467,8 @@ public List<ConsumeLog> scanConsumeLog(String messageId, Integer count) throws J
List<Pair<byte[], byte[]>> scan = hBaseClient.scan(namespace, scanParameters);

for (Pair<byte[], byte[]> pair : scan) {
ConsumeLog log = HBaseSerializer.readConsumeLog(pair);
log.setMessageId(HBaseSerializer.byteArrayToHexStr(log.getBytesMessageId()));
ConsumeLog log = ArchiveSerializer.readConsumeLog(pair);
log.setMessageId(ArchiveSerializer.byteArrayToHexStr(log.getBytesMessageId()));

StringBuilder clientIp = new StringBuilder();
IpUtil.toAddress(log.getClientIp(), clientIp);
Expand Down
Loading