Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Consume event archive log reader #288

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
#!/bin/sh
#
# Copyright 2019 The JoyQueue Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#


exec $(dirname $0)/run-class.sh org.joyqueue.tools.archive.ConsumeArchieveLogReader "$@"
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public interface ArchiveStore extends LifeCycle {
*
* @param topic
* @param partition
* @return
* @return current archived position
* @throws JoyQueueException
*/
Long getPosition(String topic, short partition) throws JoyQueueException;
Expand All @@ -78,7 +78,7 @@ public interface ArchiveStore extends LifeCycle {
* 查看发送日志
*
* @param query
* @return
* @return send logs
* @throws JoyQueueException
*/
List<SendLog> scanSendLog(Query query) throws JoyQueueException;
Expand All @@ -87,7 +87,7 @@ public interface ArchiveStore extends LifeCycle {
* 查看一条发送日志
*
* @param query
* @return
* @return send log
* @throws JoyQueueException
*/
SendLog getOneSendLog(Query query) throws JoyQueueException;
Expand All @@ -97,7 +97,7 @@ public interface ArchiveStore extends LifeCycle {
*
* @param messageId
* @param count
* @return
* @return consume logs
* @throws JoyQueueException
*/
List<ConsumeLog> scanConsumeLog(String messageId,Integer count) throws JoyQueueException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,7 @@ public List<ConsumeLog> scanConsumeLog(String messageId, Integer count) throws J
logger.error("hBaseClient is null,archive no service");
throw new JoyQueueException(JoyQueueCode.CN_SERVICE_NOT_AVAILABLE, "hBaseClient is null");
}
// 查询消费日志(rowkey=messageId+appId)
// 查询消费日志(rowkey = messageId+appId)
List<ConsumeLog> logList = new LinkedList<>();
// 查询发送日志(rowkey=topicId+sendTime+businessId)
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,17 @@ private static int consumeLogSize(ConsumeLog consumeLog) {
return size;
}

/**
* length + consumeLog bytes
**/
public static ConsumeLog tryRead(ByteBuffer buffer){
int len= buffer.getInt();
if(len<=0||buffer.remaining()<len){
return null;
}
return read(buffer);
}

/**
* @param buffer
* @return
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,7 @@ private List<ConsumeLog> convert(Connection connection, MessageLocation[] locati
ConsumeLog log = new ConsumeLog();

byte[] bytesMsgId = buildMessageId(location);

log.setBytesMessageId(bytesMsgId);

log.setApp(connection.getApp());
Expand All @@ -265,7 +266,7 @@ private List<ConsumeLog> convert(Connection connection, MessageLocation[] locati
}

/**
* 构造消息Id
* 构造消息Id md5 定长
*
* @param location 应答位置信息
* @return
Expand Down Expand Up @@ -293,7 +294,7 @@ private synchronized void appendLog(ByteBuffer buffer) {
/**
* 本地日志日志文件存储
*/
static class ArchiveMappedFileRepository implements Closeable {
public static class ArchiveMappedFileRepository implements Closeable {
// 消费归档文件本地根存储路径
private String baseDir;

Expand Down Expand Up @@ -322,7 +323,7 @@ static class ArchiveMappedFileRepository implements Closeable {
// 单个归档文件的大小
private final long pageSize = 1024 * 1024 * 16; // 16M

ArchiveMappedFileRepository(String baseDir) {
public ArchiveMappedFileRepository(String baseDir) {
this.baseDir = baseDir;
recover();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ protected void validate() throws Exception {
this.updateItemThread = LoopThread.builder()
.sleepTime(1000 * 10, 1000 * 10)
.name("UpdateArchiveItem-Thread")
.onException(e -> logger.warn("Exception:", e))
.onException(e -> logger.warn("Produce archive service update item thread Exception:", e))
.doWork(() -> {
// 更新item列表
updateArchiveItem();
Expand All @@ -145,7 +145,7 @@ protected void validate() throws Exception {
this.readMsgThread = LoopThread.builder()
.sleepTime(0, 10)
.name("ReadArchiveMsg-Thread")
.onException(e -> logger.warn("Exception:", e))
.onException(e -> logger.warn("Produce archive service read message thread Exception:", e))
.doWork(() -> {
// 消费接口读取消息,放入队列
readArchiveMsg();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
/**
* Copyright 2019 The JoyQueue Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.joyqueue.tools.archive;

import org.joyqueue.broker.archive.ArchiveSerializer;
import org.joyqueue.server.archive.store.HBaseSerializer;
import org.joyqueue.server.archive.store.model.ConsumeLog;
import org.joyqueue.toolkit.network.IpUtil;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.*;

/**
*
* Consume archive log reader
*
**/
public class ConsumeArchiveLogReader {

private static final byte RECORD_START_FLAG= Byte.MIN_VALUE;
private static final byte RECORD_END_FLAG= Byte.MAX_VALUE;
// use larger buffer size than consume log file
private static final int bufferSize=32*1024*1024;
private ByteBuffer buf= ByteBuffer.allocate(bufferSize);
private static final DateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
public static void main(String[] args){
if(args.length < 5){
throw new IllegalArgumentException("args too less(base path,start time,end time,message id)," +
"such as /archive 1596084370000 1596123970000 ssss F7C0D91A52D59F4E1B05C8273EF41B69");
}
String basePath=args[0];
long startLastModifiedTimeMs=Long.parseLong(args[1]);
long endLastModifiedTimeMs=Long.parseLong(args[2]);
String topic=args[3];
// message id md5 hex
String messageId=args[4];
System.out.println(String.format("on %s,Scan for topic:%s,message id %s in %s-%s",basePath,topic,messageId,
sdf.format(new Date(startLastModifiedTimeMs)),
sdf.format(new Date(endLastModifiedTimeMs))));
File consumeLogParentFile=new File(basePath);
if(!consumeLogParentFile.isDirectory()){
throw new IllegalArgumentException(String.format("%s is not directory",consumeLogParentFile));
}
File[] allConsumeLogs=consumeLogParentFile.listFiles();
ConsumeArchiveLogReader reader=new ConsumeArchiveLogReader(messageId);
Arrays.sort(allConsumeLogs, Comparator.comparingLong(reader::FileToLong));
// never read last file
for(int i=0;i<allConsumeLogs.length-1;i++){
File log= allConsumeLogs[i];
if(log.lastModified()>startLastModifiedTimeMs&&log.lastModified()<endLastModifiedTimeMs){
try {
List<ConsumeLog> consumeLogs = reader.readConsumeLogFromFile(log);
reader.onConsumeLogEvent(consumeLogs);
}catch (IOException e){
System.out.println("io exception:"+e);
break;
}
}
}
}
private String messageId;
// message id md5 bytes
private byte[] byteMessageId;
public ConsumeArchiveLogReader(String messageId){
this.messageId=messageId;
this.byteMessageId= HBaseSerializer.hexStrToByteArray(messageId);
}
public long FileToLong(File file){
return Long.parseLong(file.getName());
}

/**
* Consume message log and
**/
public void onConsumeLogEvent(List<ConsumeLog> consumeLogs){
for(ConsumeLog log:consumeLogs){
// consume log don't contain topic
if(Arrays.equals(log.getBytesMessageId(),byteMessageId)){
System.out.println(formatConsumeLog(log));
}
}
}

/**
* Format consume log
**/
public String formatConsumeLog(ConsumeLog log){
StringBuilder clientIp = new StringBuilder();
IpUtil.toAddress(log.getClientIp(), clientIp);
return String.format("Consume event found, %s %s %s %s",messageId,log.getApp(),clientIp,sdf.format(new Date(log.getConsumeTime())));
}


/**
* Read consume log from log file
**/
public List<ConsumeLog> readConsumeLogFromFile(File consumeLog) throws IOException {
if(!consumeLog.exists()){
return null;
}
List<ConsumeLog> consumeLogs=new ArrayList<>();
RandomAccessFile randomAccessFile=null;
FileChannel fileChannel=null;
try {
buf.clear();
randomAccessFile= new RandomAccessFile(consumeLog, "r");
fileChannel= randomAccessFile.getChannel();
while (true) {
try {
//if(readBuffer.)
int readSize = fileChannel.read(buf);
if (readSize > 0) {
buf.flip();
// prepare for read
consumeLogs.addAll(readConsumeLog(consumeLog.getName(),buf));
// process
if (buf.hasRemaining()) {
buf.compact();
}
buf.flip();
} else {
break;
}
} catch (IOException e) {
System.out.println("read exception:" + e);
break;
}catch (IllegalStateException e){
System.out.println("on file: "+consumeLog.getName() + ", " + e);
break;
}
}
}finally {
if(fileChannel!=null) {
fileChannel.close();
}
if(randomAccessFile!=null) {
randomAccessFile.close();
}
}
System.out.println("read consume log on "+consumeLog.getName()+" finished!");
return consumeLogs;
}

/**
* Read consume log from buffer
**/
public List<ConsumeLog> readConsumeLog(String logName,ByteBuffer buffer){
List<ConsumeLog> consumeLogs=new ArrayList<>();
while(buffer.hasRemaining()){
byte flag= buffer.get();
if(flag == RECORD_START_FLAG) {
ConsumeLog log = ArchiveSerializer.tryRead(buffer);
if (log != null) {
consumeLogs.add(log);
}
}else if(flag == RECORD_END_FLAG){
System.out.println(String.format("Meet %s end flag",logName));
break;
}else{
// reset
buffer.position(buffer.position()-1);
throw new IllegalStateException(String.format("Bad format on %s",logName));
}
}
return consumeLogs;
}

}