Skip to content

Commit

Permalink
Improve memory management
Browse files Browse the repository at this point in the history
This commit makes deserialization of Beats messages lazy for messages
sent via the V2 protocol. Instead of deserializing in the parsing block,
the ByteBuf is copied to the Batch instance, which will deserialize when
the data is needed.

Also:
  Fixes an integration test which was failing regularly on Travis

Fixes #299
  • Loading branch information
robbavey committed Feb 14, 2018
1 parent 2f68e50 commit aaed277
Show file tree
Hide file tree
Showing 17 changed files with 503 additions and 231 deletions.
1 change: 0 additions & 1 deletion lib/logstash/inputs/beats.rb
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,6 @@ def create_server

server.enableSSL(ssl_builder)
end

server
end

Expand Down
35 changes: 14 additions & 21 deletions lib/logstash/inputs/beats/message_listener.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,29 +27,22 @@ def initialize(queue, input)
end

def onNewMessage(ctx, message)
begin
hash = message.getData
ip_address = ip_address(ctx)
hash = message.getData
ip_address = ip_address(ctx)

hash['@metadata']['ip_address'] = ip_address unless ip_address.nil? || hash['@metadata'].nil?
target_field = extract_target_field(hash)
hash['@metadata']['ip_address'] = ip_address unless ip_address.nil? || hash['@metadata'].nil?
target_field = extract_target_field(hash)

if target_field.nil?
event = LogStash::Event.new(hash)
@nocodec_transformer.transform(event)
@queue << event
else
codec(ctx).accept(CodecCallbackListener.new(target_field,
hash,
message.getIdentityStream(),
@codec_transformer,
@queue))
end
rescue => e
logger.warn("Error handling message #{message}", e)
raise e
ensure
message.release
if target_field.nil?
event = LogStash::Event.new(hash)
@nocodec_transformer.transform(event)
@queue << event
else
codec(ctx).accept(CodecCallbackListener.new(target_field,
hash,
message.getIdentityStream(),
@codec_transformer,
@queue))
end
end

Expand Down
1 change: 0 additions & 1 deletion spec/integration/logstash_forwarder_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@
f.write(events.join("\n") + "\n")
end
sleep(1) # give some time to the clients to pick up the changes
stop_client
end

after :each do
Expand Down
91 changes: 46 additions & 45 deletions src/main/java/org/logstash/beats/Batch.java
Original file line number Diff line number Diff line change
@@ -1,47 +1,48 @@
package org.logstash.beats;

import java.util.ArrayList;
import java.util.List;

public class Batch {
private byte protocol = Protocol.VERSION_2;
private int batchSize;
private List<Message> messages = new ArrayList();

public List<Message> getMessages() {
return messages;
}

public void addMessage(Message message) {
message.setBatch(this);
messages.add(message);
}

public int size() {
return messages.size();
}

public void setBatchSize(int size) {
batchSize = size;
}

public int getBatchSize() {
return batchSize;
}

public boolean isEmpty() {
return 0 == messages.size();
}

public boolean complete() {
return size() == getBatchSize();
}

public byte getProtocol() {
return protocol;
}

public void setProtocol(byte protocol) {
this.protocol = protocol;
}
}
/**
* Interface representing a Batch of {@link Message}.
*/
public interface Batch extends Iterable<Message>{
/**
* Returns the protocol of the sent messages that this batch was constructed from
* @return byte - either '1' or '2'
*/
byte getProtocol();

/**
* Number of messages that the batch is expected to contain.
* @return int - number of messages
*/
int getBatchSize();

/**
* Set the number of messages that the batch is expected to contain.
* @param batchSize int - number of messages
*/
void setBatchSize(int batchSize);

/**
* Current number of messages in the batch
* @return int
*/
int size();

/**
* Is the batch currently empty?
* @return boolean
*/
boolean isEmpty();

/**
* Is the batch complete?
* @return boolean
*/
boolean isComplete();

/**
* Release the resources associated with the batch. Consumers of the batch *must* release
* after use.
*/
void release();
}
61 changes: 19 additions & 42 deletions src/main/java/org/logstash/beats/BeatsHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import java.net.InetSocketAddress;
import javax.net.ssl.SSLHandshakeException;

public class BeatsHandler extends SimpleChannelInboundHandler<NewBatch> {
public class BeatsHandler extends SimpleChannelInboundHandler<Batch> {
private final static Logger logger = LogManager.getLogger(BeatsHandler.class);
public static AttributeKey<Boolean> PROCESSING_BATCH = AttributeKey.valueOf("processing-batch");
private final IMessageListener messageListener;
Expand Down Expand Up @@ -45,50 +45,28 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {

@Override
public void channelRead0(ChannelHandlerContext ctx, Batch batch) throws Exception {
if(logger.isTraceEnabled()) {
logger.trace(format("Received a new payload"));
if(logger.isDebugEnabled()) {
logger.debug(format("Received a new payload"));
}

ctx.channel().attr(BeatsHandler.PROCESSING_BATCH).set(true);
for(Message message : batch.getMessages()) {
if(logger.isTraceEnabled()) {
logger.trace(format("Sending a new message for the listener, sequence: " + message.getSequence()));
}
messageListener.onNewMessage(ctx, message);

if(needAck(message)) {
ack(ctx, message);
try {
for (Message message : batch) {
if (logger.isDebugEnabled()) {
logger.debug(format("Sending a new message for the listener, sequence: " + message.getSequence()));
}
messageListener.onNewMessage(ctx, message);

if (needAck(message)) {
ack(ctx, message);
}
}
}finally{
batch.release();
ctx.flush();
ctx.channel().attr(PROCESSING_BATCH).set(false);
}
ctx.flush();
ctx.channel().attr(PROCESSING_BATCH).set(false);
}

@Override
public void channelRead0(ChannelHandlerContext ctx, NewBatch batch) throws Exception {
if(logger.isDebugEnabled()) {
logger.debug(format("Received a new payload"));
}

ctx.channel().attr(PROCESSING_BATCH).set(true);
batch.getMessageStream().forEach(e -> {
messageListener.onNewMessage(ctx, e);
if (needAck(e)){
ack(ctx, e);
}
});
// for(Message message : batch.getMessages()) {
// if(logger.isDebugEnabled()) {
// logger.debug(format("Sending a new message for the listener, sequence: " + message.getSequence()));
// }
// messageListener.onNewMessage(ctx, message);
//
// if(needAck(message)) {
// ack(ctx, message);
// }
// }
ctx.flush();
ctx.channel().attr(PROCESSING_BATCH).set(false);
}

/*
Expand Down Expand Up @@ -120,11 +98,10 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E
}

private boolean needAck(Message message) {
return message.getSequence() == message.getNewBatch().getBatchSize();
return message.getSequence() == message.getBatch().getBatchSize();
}

private void ack(ChannelHandlerContext ctx, Message message) {
logger.warn(format("Acking message number " + message.getSequence()));
if (logger.isTraceEnabled()){
logger.trace(format("Acking message number " + message.getSequence()));
}
Expand Down Expand Up @@ -155,7 +132,7 @@ private String format(String message) {
remotehost = remote.getAddress().getHostAddress() + ":" + remote.getPort();
} else{
remotehost = "undefined";
};
}

return "[local: " + localhost + ", remote: " + remotehost + "] " + message;
}
Expand Down
39 changes: 14 additions & 25 deletions src/main/java/org/logstash/beats/BeatsParser.java
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package org.logstash.beats;


import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.module.afterburner.AfterburnerModule;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufOutputStream;
import io.netty.channel.ChannelHandlerContext;
Expand All @@ -20,8 +18,6 @@


public class BeatsParser extends ByteToMessageDecoder {
private static final int CHUNK_SIZE = 1024;
public final static ObjectMapper MAPPER = new ObjectMapper().registerModule(new AfterburnerModule());
private final static Logger logger = LogManager.getLogger(BeatsParser.class);

private Batch batch;
Expand Down Expand Up @@ -56,19 +52,17 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) t

switch (currentState) {
case READ_HEADER: {
if (batch == null){
batch = new Batch();
}
logger.trace("Running: READ_HEADER");

byte currentVersion = in.readByte();

if(Protocol.isVersion2(currentVersion)) {
logger.trace("Frame version 2 detected");
batch.setProtocol(Protocol.VERSION_2);
} else {
logger.trace("Frame version 1 detected");
batch.setProtocol(Protocol.VERSION_1);
if (batch == null) {
if (Protocol.isVersion2(currentVersion)) {
batch = new V2Batch();
logger.trace("Frame version 2 detected");
} else {
logger.trace("Frame version 1 detected");
batch = new V1Batch();
}
}
transition(States.READ_FRAME_TYPE);
break;
Expand Down Expand Up @@ -145,15 +139,13 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) t

count++;
}

Message message = new Message(sequence, dataMap);
batch.addMessage(message);
((V1Batch)batch).addMessage(message);

if(batch.complete()) {
if (batch.isComplete()){
out.add(batch);
batchComplete();
}

transition(States.READ_HEADER);

break;
Expand Down Expand Up @@ -184,9 +176,9 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) t
ByteBuf buffer = ctx.alloc().buffer(requiredBytes);
try (
ByteBufOutputStream buffOutput = new ByteBufOutputStream(buffer);
InflaterOutputStream inflater = new InflaterOutputStream(buffOutput, new Inflater());
InflaterOutputStream inflater = new InflaterOutputStream(buffOutput, new Inflater())
) {
ByteBuf bytesRead = in.readBytes(inflater, requiredBytes);
in.readBytes(inflater, requiredBytes);
transition(States.READ_HEADER);
try {
while (buffer.readableBytes() > 0) {
Expand All @@ -201,13 +193,11 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) t
}
case READ_JSON: {
logger.trace("Running: READ_JSON");

batch.addMessage(new Message(sequence, in.readBytes(requiredBytes)));
if(batch.size() == batch.getBatchSize()) {
((V2Batch)batch).addMessage(sequence, in, requiredBytes);
if(batch.isComplete()) {
if(logger.isTraceEnabled()) {
logger.trace("Sending batch size: " + this.batch.size() + ", windowSize: " + batch.getBatchSize() + " , seq: " + sequence);
}

out.add(batch);
batchComplete();
}
Expand Down Expand Up @@ -238,7 +228,6 @@ private void batchComplete() {
requiredBytes = 0;
sequence = 0;
batch = null;
logger.warn("batch compete");
}

public class InvalidFrameProtocolException extends Exception {
Expand Down
5 changes: 2 additions & 3 deletions src/main/java/org/logstash/beats/KeepAliveHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,13 @@ public class KeepAliveHandler extends ChannelDuplexHandler {

@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
IdleStateEvent e = null;
IdleStateEvent e;

if (evt instanceof IdleStateEvent) {
e = (IdleStateEvent) evt;
if (e.state() == IdleState.WRITER_IDLE) {
if (isProcessing(ctx)) {
ChannelFuture f = ctx.writeAndFlush(new Ack(Protocol.VERSION_2, 0));
logger.warn("sending keep alive ack to libbeat");
if (logger.isTraceEnabled()) {
logger.trace("sending keep alive ack to libbeat");
f.addListener((ChannelFutureListener) future -> {
Expand Down Expand Up @@ -51,7 +50,7 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc
}
}

public boolean isProcessing(ChannelHandlerContext ctx) {
public boolean isProcessing(ChannelHandlerContext ctx) {
return ctx.channel().hasAttr(BeatsHandler.PROCESSING_BATCH) && ctx.channel().attr(BeatsHandler.PROCESSING_BATCH).get();
}
}
Loading

0 comments on commit aaed277

Please sign in to comment.