Implement a strategy to handle OOM in direct memory #475

Open

wants to merge 19 commits into base: main

Commits
19 commits
dc8a8b0
Avoid running Beats parser and Beats protocol handler in separate exec…
andsel Jul 5, 2023
700543c
Ported core part of original #410 PR
andsel Jul 5, 2023
8e5f7b3
Added pull-mode fetching of data
andsel Jul 7, 2023
dbabb78
Extracted the pull mode part into a separate handler
andsel Jul 12, 2023
ea7f14d
Update the flow control handler to avoid new reads if the channel bec…
andsel Jul 12, 2023
234569d
Separated the logic to drop incoming connections into specific handler
andsel Jul 12, 2023
86e4445
First draft of the integration test
andsel Jul 17, 2023
0148987
Removed tests using EmbeddedChannel because it doesn't manage the writea…
andsel Jul 17, 2023
af04733
Reshaped the async code to be more linear
andsel Jul 17, 2023
cd7aafe
Covered the number-of-connections limiter with a unit test
andsel Jul 17, 2023
c6e775c
Pessimistic remediation: when a direct OOM happens, close the channel
andsel Jul 20, 2023
e34bf25
Removed from the log string any reference to Filebeat
andsel Jul 28, 2023
7a6982e
Raised the log level when dropping connections because of th…
andsel Jul 28, 2023
299ee27
More actionable suggestion to the user in case of OOM
andsel Jul 28, 2023
e24f339
Updated OOMConnectionCloser to monitor the consumption of memory also…
andsel Sep 20, 2023
c7c54d9
Re-introduce the beats handlers worker group to separate the Beats pr…
andsel Sep 20, 2023
16a76b8
Added feature flag named protect_direct_memory to control the usage o…
andsel Sep 20, 2023
d75a0df
Throw a configuration error if Netty reserved direct memory is not an…
andsel Sep 20, 2023
68f4967
Add missing shutdown of the beats worker loop
andsel Sep 27, 2023
12 changes: 12 additions & 0 deletions docs/index.asciidoc
@@ -221,6 +221,7 @@ This plugin supports the following configuration options plus the <<plugins-{typ
| <<plugins-{type}s-{plugin}-host>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-include_codec_tag>> |<<boolean,boolean>>|__Deprecated__
| <<plugins-{type}s-{plugin}-port>> |<<number,number>>|Yes
| <<plugins-{type}s-{plugin}-protect_direct_memory>> |<<boolean,boolean>>|No
| <<plugins-{type}s-{plugin}-ssl>> |<<boolean,boolean>>|__Deprecated__
| <<plugins-{type}s-{plugin}-ssl_certificate>> |a valid filesystem path|No
| <<plugins-{type}s-{plugin}-ssl_certificate_authorities>> |<<array,array>>|No
@@ -384,6 +385,17 @@ deprecated[6.5.0, Replaced by <<plugins-{type}s-{plugin}-enrich>>]

The port to listen on.

[id="plugins-{type}s-{plugin}-protect_direct_memory"]
===== `protect_direct_memory`

* Value type is <<boolean,boolean>>
* Default value is `true`

If enabled, actively checks the native memory used by the network layer for protocol
parsing, to avoid out-of-memory conditions. When native memory consumption approaches
the maximum limit, connections are closed in undetermined order until a safe memory
condition is reestablished.
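
For example, a minimal pipeline configuration that sets the option explicitly (the port value is illustrative) might look like:

[source,ruby]
----
input {
  beats {
    port => 5044
    protect_direct_memory => true
  }
}
----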

[id="plugins-{type}s-{plugin}-ssl"]
===== `ssl`
deprecated[6.6.0, Replaced by <<plugins-{type}s-{plugin}-ssl_enabled>>]
8 changes: 7 additions & 1 deletion lib/logstash/inputs/beats.rb
@@ -74,6 +74,10 @@ class LogStash::Inputs::Beats < LogStash::Inputs::Base
# The port to listen on.
config :port, :validate => :number, :required => true

# Proactively checks the native memory used by the protocol parser and network-related
# operations, keeping the beats input alive when that memory is about to be exhausted.
config :protect_direct_memory, :validate => :boolean, :default => true

# Events are by default sent in plain text. You can
# enable encryption by setting `ssl` to true and configuring
# the `ssl_certificate` and `ssl_key` options.
@@ -243,9 +247,11 @@ def register
end # def register

def create_server
server = org.logstash.beats.Server.new(@host, @port, @client_inactivity_timeout, @executor_threads)
server = org.logstash.beats.Server.new(@host, @port, @client_inactivity_timeout, @executor_threads, @protect_direct_memory)
server.setSslHandlerProvider(new_ssl_handshake_provider(new_ssl_context_builder)) if @ssl_enabled
server
rescue java.lang.IllegalArgumentException => e
configuration_error e.message
end

def run(output_queue)
7 changes: 4 additions & 3 deletions spec/inputs/beats_spec.rb
@@ -14,6 +14,7 @@
let(:port) { BeatsInputTest.random_port }
let(:client_inactivity_timeout) { 400 }
let(:threads) { 1 + rand(9) }
let(:protect_direct_memory) { true }
let(:queue) { Queue.new }
let(:config) do
{
@@ -36,7 +37,7 @@
let(:port) { 9001 }

it "sends the required options to the server" do
expect(org.logstash.beats.Server).to receive(:new).with(host, port, client_inactivity_timeout, threads)
expect(org.logstash.beats.Server).to receive(:new).with(host, port, client_inactivity_timeout, threads, protect_direct_memory)
subject.register
end
end
@@ -529,8 +530,8 @@
subject(:plugin) { LogStash::Inputs::Beats.new(config) }

before do
@server = org.logstash.beats.Server.new(host, port, client_inactivity_timeout, threads)
expect( org.logstash.beats.Server ).to receive(:new).with(host, port, client_inactivity_timeout, threads).and_return @server
@server = org.logstash.beats.Server.new(host, port, client_inactivity_timeout, threads, protect_direct_memory)
expect( org.logstash.beats.Server ).to receive(:new).with(host, port, client_inactivity_timeout, threads, protect_direct_memory).and_return @server
expect( @server ).to receive(:listen)

subject.register
5 changes: 5 additions & 0 deletions src/main/java/org/logstash/beats/BeatsHandler.java
@@ -1,5 +1,7 @@
package org.logstash.beats;

import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.apache.logging.log4j.LogManager;
@@ -92,6 +94,9 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E
logger.info(format("closing (" + cause.getMessage() + ")"));
}
} else {
PooledByteBufAllocator allocator = (PooledByteBufAllocator) ByteBufAllocator.DEFAULT;
OOMConnectionCloser.DirectMemoryUsage usageSnapshot = OOMConnectionCloser.DirectMemoryUsage.capture(allocator);
logger.info("Connection {}, memory status used: {}, pinned: {}, ratio {}", ctx.channel(), usageSnapshot.used, usageSnapshot.pinned, usageSnapshot.ratio);
final Throwable realCause = extractCause(cause, 0);
if (logger.isDebugEnabled()){
logger.info(format("Handling exception: " + cause + " (caused by: " + realCause + ")"), cause);
24 changes: 19 additions & 5 deletions src/main/java/org/logstash/beats/BeatsParser.java
@@ -2,9 +2,12 @@


import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufOutputStream;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.DecoderException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

@@ -14,6 +17,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.zip.Inflater;
import java.util.zip.InflaterOutputStream;

@@ -48,8 +52,8 @@ private enum States {

@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws InvalidFrameProtocolException, IOException {
if(!hasEnoughBytes(in)) {
if (decodingCompressedBuffer){
if (!hasEnoughBytes(in)) {
if (decodingCompressedBuffer) {
throw new InvalidFrameProtocolException("Insufficient bytes in compressed content to decode: " + currentState);
}
return;
@@ -182,6 +186,7 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) t

case READ_COMPRESSED_FRAME: {
logger.trace("Running: READ_COMPRESSED_FRAME");

inflateCompressedFrame(ctx, in, (buffer) -> {
transition(States.READ_HEADER);

@@ -199,9 +204,18 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) t
}
case READ_JSON: {
logger.trace("Running: READ_JSON");
((V2Batch)batch).addMessage(sequence, in, requiredBytes);
if(batch.isComplete()) {
if(logger.isTraceEnabled()) {
try {
((V2Batch) batch).addMessage(sequence, in, requiredBytes);
} catch (Throwable th) {
// batch has to release its internal buffer before bubbling up the exception
batch.release();

// rethrow the same error after releasing the internal buffer
throw th;
}

if (batch.isComplete()) {
if (logger.isTraceEnabled()) {
logger.trace("Sending batch size: " + this.batch.size() + ", windowSize: " + batch.getBatchSize() + " , seq: " + sequence);
}
out.add(batch);
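
The catch/release/rethrow above is the ownership rule that prevents leaking pooled direct buffers when a batch fails mid-decode. A minimal self-contained sketch of the same pattern — names and sizes are illustrative, not from this PR:

```java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;

public class ReleaseOnFailureSketch {
    // Copies `length` bytes into a freshly allocated direct buffer; on any failure the
    // buffer is released before the error propagates, so no direct memory is leaked.
    static ByteBuf copyToDirect(ByteBuf in, int length) {
        ByteBuf payload = PooledByteBufAllocator.DEFAULT.directBuffer(length);
        try {
            payload.writeBytes(in, length);
            return payload; // on success, ownership passes to the caller
        } catch (Throwable th) {
            payload.release(); // on failure, free the pooled direct buffer first
            throw th;
        }
    }

    public static void main(String[] args) {
        ByteBuf source = PooledByteBufAllocator.DEFAULT.buffer();
        source.writeBytes(new byte[]{1, 2, 3, 4});
        ByteBuf copy = copyToDirect(source, 4);
        System.out.println("copied bytes: " + copy.readableBytes());
        copy.release();
        source.release();
    }
}
```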
56 changes: 56 additions & 0 deletions src/main/java/org/logstash/beats/FlowLimiterHandler.java
@@ -0,0 +1,56 @@
package org.logstash.beats;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/**
* Configures the channel where it's installed to operate reads in pull mode,
* disabling autoread and explicitly invoking the read operation.
* Flow control keeps the outgoing buffer under control by avoiding reads of new
* bytes while the outgoing direction is not writable; this exerts back pressure
* on the TCP layer and ultimately on the upstream system.
* */
@Sharable
public final class FlowLimiterHandler extends ChannelInboundHandlerAdapter {

private final static Logger logger = LogManager.getLogger(FlowLimiterHandler.class);

@Override
public void channelRegistered(final ChannelHandlerContext ctx) throws Exception {
ctx.channel().config().setAutoRead(false);
super.channelRegistered(ctx);
}

@Override
public void channelActive(final ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
if (isAutoreadDisabled(ctx.channel()) && ctx.channel().isWritable()) {
ctx.channel().read();
}
}

@Override
public void channelReadComplete(final ChannelHandlerContext ctx) throws Exception {
super.channelReadComplete(ctx);
if (isAutoreadDisabled(ctx.channel()) && ctx.channel().isWritable()) {
ctx.channel().read();
}
}

private boolean isAutoreadDisabled(Channel channel) {
return !channel.config().isAutoRead();
}

@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
ctx.channel().read();
super.channelWritabilityChanged(ctx);

logger.debug("Writability on channel {} changed to {}", ctx.channel(), ctx.channel().isWritable());
}

}
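
A quick way to observe the pull-mode switch, sketched with Netty's EmbeddedChannel; this assumes the snippet lives in the org.logstash.beats package, and (as the commit history notes) EmbeddedChannel does not exercise the writability-driven part:

```java
import io.netty.channel.embedded.EmbeddedChannel;

public class FlowLimiterAutoreadDemo {
    public static void main(String[] args) {
        // Registering the handler switches the channel to pull mode immediately
        EmbeddedChannel channel = new EmbeddedChannel(new FlowLimiterHandler());
        System.out.println("autoRead: " + channel.config().isAutoRead()); // -> false
        channel.finish();
    }
}
```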
88 changes: 88 additions & 0 deletions src/main/java/org/logstash/beats/OOMConnectionCloser.java
@@ -0,0 +1,88 @@
package org.logstash.beats;

import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.internal.PlatformDependent;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class OOMConnectionCloser extends ChannelInboundHandlerAdapter {

private final PooledByteBufAllocator allocator;

static class DirectMemoryUsage {
final long used;
final long pinned;
private final PooledByteBufAllocator allocator;
final short ratio;

private DirectMemoryUsage(long used, long pinned, PooledByteBufAllocator allocator) {
this.used = used;
this.pinned = pinned;
this.allocator = allocator;
this.ratio = (short) Math.round(((double) pinned / used) * 100);
}

static DirectMemoryUsage capture(PooledByteBufAllocator allocator) {
long usedDirectMemory = allocator.metric().usedDirectMemory();
long pinnedDirectMemory = allocator.pinnedDirectMemory();
return new DirectMemoryUsage(usedDirectMemory, pinnedDirectMemory, allocator);
}

boolean isCloseToOOM() {
long maxDirectMemory = PlatformDependent.maxDirectMemory();
int chunkSize = allocator.metric().chunkSize();
return ((maxDirectMemory - used) <= chunkSize) && ratio > 75;
}
}

private final static Logger logger = LogManager.getLogger(OOMConnectionCloser.class);

public static final Pattern DIRECT_MEMORY_ERROR = Pattern.compile("^Cannot reserve \\d* bytes of direct buffer memory.*$");

OOMConnectionCloser() {
allocator = (PooledByteBufAllocator) ByteBufAllocator.DEFAULT;
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
DirectMemoryUsage direct = DirectMemoryUsage.capture(allocator);
logger.info("Direct memory status, used: {}, pinned: {}, ratio: {}", direct.used, direct.pinned, direct.ratio);
if (direct.isCloseToOOM()) {
logger.warn("Closing connection {} because running out of memory, used: {}, pinned: {}, ratio {}", ctx.channel(), direct.used, direct.pinned, direct.ratio);
ReferenceCountUtil.release(msg); // to free the memory used by the buffer
ctx.flush();
ctx.close();
} else {
super.channelRead(ctx, msg);
}
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
if (isDirectMemoryOOM(cause)) {
DirectMemoryUsage direct = DirectMemoryUsage.capture(allocator);
logger.info("Direct memory status, used: {}, pinned: {}, ratio: {}", direct.used, direct.pinned, direct.ratio);
logger.warn("Dropping connection {} due to lack of available Direct Memory. Please lower the number of concurrent connections or reduce the batch size. " +
"Alternatively, raise -XX:MaxDirectMemorySize option in the JVM running Logstash", ctx.channel());
ctx.flush();
ctx.close();
} else {
super.exceptionCaught(ctx, cause);
}
}

private boolean isDirectMemoryOOM(Throwable th) {
if (!(th instanceof OutOfMemoryError) || th.getMessage() == null) {
// guard against errors without a message, which would NPE the regex matcher
return false;
}
Matcher m = DIRECT_MEMORY_ERROR.matcher(th.getMessage());
return m.matches();
}
}
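
To make the isCloseToOOM() heuristic concrete, a worked example with hypothetical numbers — real chunk sizes depend on the allocator configuration:

```java
public class CloseToOOMExample {
    public static void main(String[] args) {
        // Hypothetical values: 256 MiB direct memory limit, 4 MiB allocator chunks
        long maxDirectMemory = 256L * 1024 * 1024;
        int chunkSize = 4 * 1024 * 1024;
        long used = 253L * 1024 * 1024;   // direct memory currently used
        long pinned = 240L * 1024 * 1024; // memory pinned by in-flight buffers

        short ratio = (short) Math.round(((double) pinned / used) * 100); // ~95
        boolean closeToOOM = (maxDirectMemory - used) <= chunkSize && ratio > 75;
        // Less than one chunk of headroom remains AND most of it is pinned -> true
        System.out.println("ratio=" + ratio + " closeToOOM=" + closeToOOM);
    }
}
```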
2 changes: 1 addition & 1 deletion src/main/java/org/logstash/beats/Runner.java
@@ -17,7 +17,7 @@ static public void main(String[] args) throws Exception {
// Check for leaks.
// ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID);

Server server = new Server("0.0.0.0", DEFAULT_PORT, 15, Runtime.getRuntime().availableProcessors());
Server server = new Server("0.0.0.0", DEFAULT_PORT, 15, Runtime.getRuntime().availableProcessors(), true);

if(args.length > 0 && args[0].equals("ssl")) {
logger.debug("Using SSL");
35 changes: 32 additions & 3 deletions src/main/java/org/logstash/beats/Server.java
@@ -1,13 +1,18 @@
package org.logstash.beats;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.EventExecutorGroup;
import io.netty.util.internal.PlatformDependent;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.logstash.netty.SslHandlerProvider;
@@ -18,18 +23,34 @@ public class Server {
private final int port;
private final String host;
private final int beatsHeandlerThreadCount;
private final boolean protectDirectMemory;
private NioEventLoopGroup workGroup;
private IMessageListener messageListener = new MessageListener();
private SslHandlerProvider sslHandlerProvider;
private BeatsInitializer beatsInitializer;

private final int clientInactivityTimeoutSeconds;

public Server(String host, int p, int clientInactivityTimeoutSeconds, int threadCount) {
public Server(String host, int p, int clientInactivityTimeoutSeconds, int threadCount, boolean protectDirectMemory) {
this.host = host;
port = p;
this.clientInactivityTimeoutSeconds = clientInactivityTimeoutSeconds;
beatsHeandlerThreadCount = threadCount;
this.protectDirectMemory = protectDirectMemory;

validateMinimumDirectMemory();
}

/**
* Validates that the maximum direct memory available is enough for safe processing,
* else throws an IllegalArgumentException.
* */
private void validateMinimumDirectMemory() {
long maxDirectMemoryAllocatable = PlatformDependent.maxDirectMemory();
if (maxDirectMemoryAllocatable < 256 * 1024 * 1024) {
long roundedMegabytes = Math.round((double) maxDirectMemoryAllocatable / 1024 / 1024);
throw new IllegalArgumentException("Max direct memory should be at least 256MB but was " + roundedMegabytes + "MB, " +
"please check your MaxDirectMemorySize and io.netty.maxDirectMemory settings");
}
}
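
// Illustration (assumed flag): starting the JVM with -XX:MaxDirectMemorySize=128m would
// make this check throw at server construction; beats.rb rescues the
// IllegalArgumentException and reports it as a configuration error.
// PlatformDependent.maxDirectMemory() reflects -XX:MaxDirectMemorySize or the
// io.netty.maxDirectMemory system property when either is set.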

public void setSslHandlerProvider(SslHandlerProvider sslHandlerProvider){
@@ -126,6 +147,9 @@ private class BeatsInitializer extends ChannelInitializer<SocketChannel> {

public void initChannel(SocketChannel socket){
ChannelPipeline pipeline = socket.pipeline();
if (protectDirectMemory) {
pipeline.addLast(new OOMConnectionCloser());
}

if (isSslEnabled()) {
pipeline.addLast(SSL_HANDLER, sslHandlerProvider.sslHandlerForChannel(socket));
@@ -134,7 +158,12 @@ public void initChannel(SocketChannel socket){
new IdleStateHandler(localClientInactivityTimeoutSeconds, IDLESTATE_WRITER_IDLE_TIME_SECONDS, localClientInactivityTimeoutSeconds));
pipeline.addLast(BEATS_ACKER, new AckEncoder());
pipeline.addLast(CONNECTION_HANDLER, new ConnectionHandler());
pipeline.addLast(beatsHandlerExecutorGroup, new BeatsParser(), new BeatsHandler(localMessageListener));
Member
I'd like us to be a bit more intentional about why we're removing the executor group here. In my test PR I removed it to simplify the number of threads I had to reason about, and having a single pool for both boss/worker loops would mean blocking the workers would stop the boss from accepting new connections too.

Contributor Author
Shouldn't the complete pipeline be executed in the worker group, rather than just BeatsParser and BeatsHandler, while AckEncoder and ConnectionHandler are still executed by the boss group?

Contributor Author
I mean, shouldn't we use ServerBootstrap.group(bossGroup, workerGroup) instead of assigning the group to just those two Beats handlers? If we do this, we have at least one thread context switch on every pipeline. Maybe it's something I'm not grasping.
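
For reference, a hedged sketch of the boss/worker wiring the comments discuss; the port and the empty initializer body are illustrative, not from this PR:

```java
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class BossWorkerSketch {
    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);  // only accepts connections
        EventLoopGroup workerGroup = new NioEventLoopGroup(); // runs channel pipelines
        try {
            ServerBootstrap bootstrap = new ServerBootstrap()
                    .group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            // handlers would be added here, as in BeatsInitializer;
                            // blocking a worker never stops the boss from accepting
                        }
                    });
            bootstrap.bind(5044).sync().channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
```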

if (protectDirectMemory) {
pipeline.addLast(new FlowLimiterHandler());
pipeline.addLast(new ThunderingGuardHandler());
}
pipeline.addLast(beatsHandlerExecutorGroup, new BeatsParser());
pipeline.addLast(beatsHandlerExecutorGroup, new BeatsHandler(localMessageListener));
}

