diff --git a/lib/logstash/inputs/beats.rb b/lib/logstash/inputs/beats.rb index 1e0a303a..f0e6a19f 100644 --- a/lib/logstash/inputs/beats.rb +++ b/lib/logstash/inputs/beats.rb @@ -120,7 +120,7 @@ class LogStash::Inputs::Beats < LogStash::Inputs::Base config :client_inactivity_timeout, :validate => :number, :default => 60 # Beats handler executor thread - config :executor_threads, :validate => :number, :default => LogStash::Config::CpuCoreStrategy.maximum * 4 + config :executor_threads, :validate => :number, :default => LogStash::Config::CpuCoreStrategy.maximum def register # For Logstash 2.4 we need to make sure that the logger is correctly set for the diff --git a/lib/logstash/inputs/beats/message_listener.rb b/lib/logstash/inputs/beats/message_listener.rb index 8c80f98b..5cddc4f7 100644 --- a/lib/logstash/inputs/beats/message_listener.rb +++ b/lib/logstash/inputs/beats/message_listener.rb @@ -27,22 +27,29 @@ def initialize(queue, input) end def onNewMessage(ctx, message) - hash = message.getData - ip_address = ip_address(ctx) + begin + hash = message.getData + ip_address = ip_address(ctx) - hash['@metadata']['ip_address'] = ip_address unless ip_address.nil? || hash['@metadata'].nil? - target_field = extract_target_field(hash) + hash['@metadata']['ip_address'] = ip_address unless ip_address.nil? || hash['@metadata'].nil? + target_field = extract_target_field(hash) - if target_field.nil? - event = LogStash::Event.new(hash) - @nocodec_transformer.transform(event) - @queue << event - else - codec(ctx).accept(CodecCallbackListener.new(target_field, - hash, - message.getIdentityStream(), - @codec_transformer, - @queue)) + if target_field.nil? + event = LogStash::Event.new(hash) + @nocodec_transformer.transform(event) + @queue << event + else + codec(ctx).accept(CodecCallbackListener.new(target_field, + hash, + message.getIdentityStream(), + @codec_transformer, + @queue)) + end + rescue => e + logger.warn("Error handling message #{message}", e) + raise e + ensure + message.release end end @@ -85,7 +92,7 @@ def ip_address(ctx) end def register_connection(ctx) - connections_list[ctx] = ConnectionState.new(ctx, input.codec.dup, ip_address_from_ctx(ctx)) + connections_list[ctx] = ConnectionState.new(ctx, input.codec.clone, ip_address_from_ctx(ctx)) end def ip_address_from_ctx(ctx) diff --git a/src/main/java/org/logstash/beats/BeatsHandler.java b/src/main/java/org/logstash/beats/BeatsHandler.java index 8e03922f..e483b546 100644 --- a/src/main/java/org/logstash/beats/BeatsHandler.java +++ b/src/main/java/org/logstash/beats/BeatsHandler.java @@ -1,59 +1,58 @@ package org.logstash.beats; -import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; -import io.netty.handler.timeout.IdleState; -import io.netty.handler.timeout.IdleStateEvent; import io.netty.util.AttributeKey; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.apache.logging.log4j.ThreadContext; import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.util.Map; -import java.util.UUID; -import java.util.concurrent.atomic.AtomicBoolean; import javax.net.ssl.SSLHandshakeException; -public class BeatsHandler extends SimpleChannelInboundHandler { +public class BeatsHandler extends SimpleChannelInboundHandler { private final static Logger logger = LogManager.getLogger(BeatsHandler.class); public static AttributeKey PROCESSING_BATCH = AttributeKey.valueOf("processing-batch"); private final IMessageListener messageListener; private ChannelHandlerContext context; - public BeatsHandler(IMessageListener listener) { messageListener = listener; } @Override - public void handlerAdded(ChannelHandlerContext ctx) throws Exception { + public void channelActive(final ChannelHandlerContext ctx) throws Exception { + if (logger.isTraceEnabled()){ + logger.trace(format("Channel Active")); + } + ctx.channel().attr(BeatsHandler.PROCESSING_BATCH).set(false); + + super.channelActive(ctx); context = ctx; messageListener.onNewConnection(ctx); - // Give some breathing room on new clients to receive the keep alive. - ctx.channel().attr(PROCESSING_BATCH).set(false); } @Override - public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { - ctx.channel().attr(PROCESSING_BATCH).set(false); + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + super.channelInactive(ctx); + if (logger.isTraceEnabled()){ + logger.trace(format("Channel Inactive")); + } + ctx.channel().attr(BeatsHandler.PROCESSING_BATCH).set(false); messageListener.onConnectionClose(ctx); } @Override public void channelRead0(ChannelHandlerContext ctx, Batch batch) throws Exception { - if(logger.isDebugEnabled()) { - logger.debug(format("Received a new payload")); + if(logger.isTraceEnabled()) { + logger.trace(format("Received a new payload")); } - ctx.channel().attr(PROCESSING_BATCH).set(true); + ctx.channel().attr(BeatsHandler.PROCESSING_BATCH).set(true); for(Message message : batch.getMessages()) { - if(logger.isDebugEnabled()) { - logger.debug(format("Sending a new message for the listener, sequence: " + message.getSequence())); + if(logger.isTraceEnabled()) { + logger.trace(format("Sending a new message for the listener, sequence: " + message.getSequence())); } messageListener.onNewMessage(ctx, message); @@ -65,6 +64,33 @@ public void channelRead0(ChannelHandlerContext ctx, Batch batch) throws Exceptio ctx.channel().attr(PROCESSING_BATCH).set(false); } + @Override + public void channelRead0(ChannelHandlerContext ctx, NewBatch batch) throws Exception { + if(logger.isDebugEnabled()) { + logger.debug(format("Received a new payload")); + } + + ctx.channel().attr(PROCESSING_BATCH).set(true); + batch.getMessageStream().forEach(e -> { + messageListener.onNewMessage(ctx, e); + if (needAck(e)){ + ack(ctx, e); + } + }); +// for(Message message : batch.getMessages()) { +// if(logger.isDebugEnabled()) { +// logger.debug(format("Sending a new message for the listener, sequence: " + message.getSequence())); +// } +// messageListener.onNewMessage(ctx, message); +// +// if(needAck(message)) { +// ack(ctx, message); +// } +// } + ctx.flush(); + ctx.channel().attr(PROCESSING_BATCH).set(false); + } + /* * Do not propagate the SSL handshake exception down to the ruby layer handle it locally instead and close the connection * if the channel is still active. Calling `onException` will flush the content of the codec's buffer, this call @@ -75,26 +101,33 @@ public void channelRead0(ChannelHandlerContext ctx, Batch batch) throws Exceptio * overlap Filebeat transmission; we were recommending multiline at the source in v5 and in v6 we enforce it. */ @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { - ctx.close(); - - if (!(cause instanceof SSLHandshakeException)) { - messageListener.onException(ctx, cause); - } - - String causeMessage = cause.getMessage() == null ? cause.getClass().toString() : cause.getMessage(); + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + try { + if (!(cause instanceof SSLHandshakeException)) { + messageListener.onException(ctx, cause); + } + String causeMessage = cause.getMessage() == null ? cause.getClass().toString() : cause.getMessage(); - if (logger.isDebugEnabled()){ - logger.debug(format("Handling exception: " + causeMessage), cause); + if (logger.isDebugEnabled()){ + logger.debug(format("Handling exception: " + causeMessage), cause); + } + logger.info(format("Handling exception: " + causeMessage)); + } finally{ + super.exceptionCaught(ctx, cause); + ctx.flush(); + ctx.close(); } - logger.info(format("Handling exception: " + causeMessage)); } private boolean needAck(Message message) { - return message.getSequence() == message.getBatch().getBatchSize(); + return message.getSequence() == message.getNewBatch().getBatchSize(); } private void ack(ChannelHandlerContext ctx, Message message) { + logger.warn(format("Acking message number " + message.getSequence())); + if (logger.isTraceEnabled()){ + logger.trace(format("Acking message number " + message.getSequence())); + } writeAck(ctx, message.getBatch().getProtocol(), message.getSequence()); } diff --git a/src/main/java/org/logstash/beats/BeatsParser.java b/src/main/java/org/logstash/beats/BeatsParser.java index 7042a902..2d1ebbb2 100644 --- a/src/main/java/org/logstash/beats/BeatsParser.java +++ b/src/main/java/org/logstash/beats/BeatsParser.java @@ -24,7 +24,7 @@ public class BeatsParser extends ByteToMessageDecoder { public final static ObjectMapper MAPPER = new ObjectMapper().registerModule(new AfterburnerModule()); private final static Logger logger = LogManager.getLogger(BeatsParser.class); - private Batch batch = new Batch(); + private Batch batch; private enum States { READ_HEADER(1), @@ -56,6 +56,9 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) t switch (currentState) { case READ_HEADER: { + if (batch == null){ + batch = new Batch(); + } logger.trace("Running: READ_HEADER"); byte currentVersion = in.readByte(); @@ -67,7 +70,6 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) t logger.trace("Frame version 1 detected"); batch.setProtocol(Protocol.VERSION_1); } - transition(States.READ_FRAME_TYPE); break; } @@ -100,7 +102,6 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) t } case READ_WINDOW_SIZE: { logger.trace("Running: READ_WINDOW_SIZE"); - batch.setBatchSize((int) in.readUnsignedInt()); // This is unlikely to happen but I have no way to known when a frame is @@ -201,12 +202,7 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) t case READ_JSON: { logger.trace("Running: READ_JSON"); - byte[] bytes = new byte[requiredBytes]; - in.readBytes(bytes); - Message message = new Message(sequence, (Map) MAPPER.readValue(bytes, Object.class)); - - batch.addMessage(message); - + batch.addMessage(new Message(sequence, in.readBytes(requiredBytes))); if(batch.size() == batch.getBatchSize()) { if(logger.isTraceEnabled()) { logger.trace("Sending batch size: " + this.batch.size() + ", windowSize: " + batch.getBatchSize() + " , seq: " + sequence); @@ -241,7 +237,8 @@ private void transition(States nextState, int requiredBytes) { private void batchComplete() { requiredBytes = 0; sequence = 0; - batch = new Batch(); + batch = null; + logger.warn("batch compete"); } public class InvalidFrameProtocolException extends Exception { diff --git a/src/main/java/org/logstash/beats/KeepAliveHandler.java b/src/main/java/org/logstash/beats/KeepAliveHandler.java index 9b9d50c3..a2dae7e1 100644 --- a/src/main/java/org/logstash/beats/KeepAliveHandler.java +++ b/src/main/java/org/logstash/beats/KeepAliveHandler.java @@ -21,6 +21,7 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc if (e.state() == IdleState.WRITER_IDLE) { if (isProcessing(ctx)) { ChannelFuture f = ctx.writeAndFlush(new Ack(Protocol.VERSION_2, 0)); + logger.warn("sending keep alive ack to libbeat"); if (logger.isTraceEnabled()) { logger.trace("sending keep alive ack to libbeat"); f.addListener((ChannelFutureListener) future -> { @@ -51,6 +52,6 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc } public boolean isProcessing(ChannelHandlerContext ctx) { - return ctx.channel().attr(BeatsHandler.PROCESSING_BATCH).get(); + return ctx.channel().hasAttr(BeatsHandler.PROCESSING_BATCH) && ctx.channel().attr(BeatsHandler.PROCESSING_BATCH).get(); } } \ No newline at end of file diff --git a/src/main/java/org/logstash/beats/Message.java b/src/main/java/org/logstash/beats/Message.java index 5b079a41..37e1d9a6 100644 --- a/src/main/java/org/logstash/beats/Message.java +++ b/src/main/java/org/logstash/beats/Message.java @@ -1,30 +1,64 @@ package org.logstash.beats; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.module.afterburner.AfterburnerModule; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufInputStream; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.io.IOException; import java.util.HashMap; import java.util.Map; public class Message implements Comparable { private final int sequence; private String identityStream; - private final Map data; + private Map data; private Batch batch; + private ByteBuf buffer; + private Logger logger = LogManager.getLogger(Message.class); + + public final static ObjectMapper MAPPER = new ObjectMapper().registerModule(new AfterburnerModule()); public Message(int sequence, Map map) { this.sequence = sequence; this.data = map; + } - identityStream = extractIdentityStream(); + public Message(int sequence, ByteBuf buffer){ + this.sequence = sequence; + this.buffer = buffer; } public int getSequence() { return sequence; } + public boolean release() { + if (buffer != null){ + return buffer.release(); + } + return true; + } + + public Map getData(){ + if (data == null && buffer != null){ + try (ByteBufInputStream byteBufInputStream = new ByteBufInputStream(buffer)){ + data = (Map)MAPPER.readValue(byteBufInputStream, Object.class); + } catch (IOException e){ + throw new RuntimeException("Unable to parse beats payload ", e); + } +// finally{ +// release(); +// } + } - public Map getData() { return data; } + private NewBatch newBatch; + @Override public int compareTo(Message o) { return Integer.compare(getSequence(), o.getSequence()); @@ -38,7 +72,19 @@ public void setBatch(Batch newBatch) { batch = newBatch; } + public NewBatch getNewBatch() { + return newBatch; + } + + public void setNewBatch(NewBatch newnewBatch){ + this.newBatch = newnewBatch; + } + + public String getIdentityStream() { + if (identityStream == null){ + identityStream = extractIdentityStream(); + } return identityStream; } @@ -52,7 +98,7 @@ private String extractIdentityStream() { if(id != null && resourceId != null) { return id + "-" + resourceId; } else { - return (String) beatsData.get("name") + "-" + (String) beatsData.get("source"); + return beatsData.get("name") + "-" + beatsData.get("source"); } } diff --git a/src/main/java/org/logstash/beats/Runner.java b/src/main/java/org/logstash/beats/Runner.java index dd1b8170..a5eb6fd3 100644 --- a/src/main/java/org/logstash/beats/Runner.java +++ b/src/main/java/org/logstash/beats/Runner.java @@ -7,6 +7,7 @@ public class Runner { private static final int DEFAULT_PORT = 5044; + private final static Logger logger = LogManager.getLogger(Runner.class); @@ -17,7 +18,7 @@ static public void main(String[] args) throws Exception { // Check for leaks. // ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID); - Server server = new Server("0.0.0.0", DEFAULT_PORT); + Server server = new Server("0.0.0.0", DEFAULT_PORT, 15, Runtime.getRuntime().availableProcessors()); if(args.length > 0 && args[0].equals("ssl")) { logger.debug("Using SSL"); diff --git a/src/main/java/org/logstash/beats/Server.java b/src/main/java/org/logstash/beats/Server.java index 85c669d9..aa2a21b6 100644 --- a/src/main/java/org/logstash/beats/Server.java +++ b/src/main/java/org/logstash/beats/Server.java @@ -17,8 +17,6 @@ import java.security.NoSuchAlgorithmException; import java.security.cert.CertificateException; - - public class Server { private final static Logger logger = LogManager.getLogger(Server.class); @@ -34,17 +32,14 @@ public class Server { private final int clientInactivityTimeoutSeconds; - public Server(String host, int p) { - this(host, p, DEFAULT_CLIENT_TIMEOUT_SECONDS, Runtime.getRuntime().availableProcessors() * 4); - } - public Server(String host, int p, int timeout, int threadCount) { + this.host = host; port = p; clientInactivityTimeoutSeconds = timeout; beatsHeandlerThreadCount = threadCount; workGroup = new NioEventLoopGroup(); - } + } public void enableSSL(SslSimpleBuilder builder) { sslBuilder = builder; @@ -52,7 +47,7 @@ public void enableSSL(SslSimpleBuilder builder) { public Server listen() throws InterruptedException { try { - logger.info("Starting server on port: " + this.port); + logger.info("Starting server on port: " + this.port); beatsInitializer = new BeatsInitializer(isSslEnable(), messageListener, clientInactivityTimeoutSeconds, beatsHeandlerThreadCount); @@ -131,15 +126,18 @@ public void initChannel(SocketChannel socket) throws IOException, NoSuchAlgorith } pipeline.addLast(idleExecutorGroup, IDLESTATE_HANDLER, new IdleStateHandler(clientInactivityTimeoutSeconds, IDLESTATE_WRITER_IDLE_TIME_SECONDS , clientInactivityTimeoutSeconds)); pipeline.addLast(BEATS_ACKER, new AckEncoder()); - pipeline.addLast(BEATS_PARSER, new BeatsParser()); - pipeline.addLast(beatsHandlerExecutorGroup, BEATS_HANDLER, new BeatsHandler(this.message)); pipeline.addLast(KEEP_ALIVE_HANDLER, new KeepAliveHandler()); + pipeline.addLast(beatsHandlerExecutorGroup, new BeatsParser(), new BeatsHandler(this.message)); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { logger.warn("Exception caught in channel initializer", cause); - this.message.onChannelInitializeException(ctx, cause); + try { + this.message.onChannelInitializeException(ctx, cause); + } finally { + super.exceptionCaught(ctx, cause); + } } public void shutdownEventExecutor() {