
Re-established previous behaviour without a default limit for 'decode_size_limit_bytes' #45

Open · wants to merge 5 commits into main
Conversation

@andsel (Contributor) commented Aug 30, 2024

Release notes

Update the behaviour of decode_size_limit_bytes so that, by default, no limit is applied to the length of a line; when the setting is explicitly configured, events exceeding the limit are tagged instead of rejected.

What does this PR do?

If decode_size_limit_bytes is unset, this PR re-establishes the pre-3.2.0 behaviour: a long line close to the available heap space, when processed by the codec, can still cause an OOM and kill Logstash. If the setting is explicitly configured, then instead of looping endlessly with a warning log, the codec still produces an event, but it is tagged with an error and its message is clipped to the first decode_size_limit_bytes bytes.

Why is it important/What is the impact to the user?

#43, which configured a default 20MB limit on the size of a parseable line, introduced a breaking change in the behaviour of existing installations. A Logstash that was correctly parsing 30MB-wide lines would start encountering errors after that change.
With this PR the existing behaviour is re-established, while letting the user explicitly set a limit. Once the limit is exceeded, instead of iterating continuously on the error, the codec still produces an event, but it is tagged with an error and contains a slice of the offending input.
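The tag-and-clip behaviour described above can be sketched as follows (a minimal illustration with a hypothetical helper name, not the actual codec code):

```ruby
# Hypothetical helper: when a line exceeds the configured limit, emit a
# tagged event whose message is clipped to the first `limit` bytes,
# instead of failing or looping on the error.
def clip_and_tag(line, limit)
  {
    "message" => line.byteslice(0, limit),
    "tags"    => ["_jsonparsetoobigfailure"]
  }
end

event = clip_and_tag("x" * 1000, 512)
# event["message"] holds the first 512 bytes; the tag marks the failure
```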

Checklist

  • My code follows the style guidelines of this project
  • I have commented my code, particularly in hard-to-understand areas
  • I have made corresponding changes to the documentation
  • I have made corresponding changes to the default configuration files (and/or docker env variables)
  • I have added tests that prove my fix is effective or that my feature works

Author's Checklist

How to test this PR locally

  • Use a pipeline with the codec limited to 512 bytes, such as:
input {
  stdin {
    codec => json_lines {
      decode_size_limit_bytes => 512
    }  
  }
}

output {
  stdout {
    codec => rubydebug
  }
}
  • verify that it generates an event tagged with _jsonparsetoobigfailure and a message clipped at 512 bytes; as a sample file you can use
    1kb_single_line.json
  • launch Logstash like
cat /path/to/1kb_single_line.json | bin/logstash -f /path/to/test_pipeline.conf
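The sample 1kb_single_line.json is not attached here; any single-line JSON document larger than 512 bytes works. For example (illustrative content, hypothetical field names):

```ruby
require "json"

# Build a single-line JSON document comfortably larger than the 512-byte
# limit configured in the test pipeline above.
line = { "field_1" => (1..40).map { |i| { "name" => "player#{i}" } } }.to_json
File.write("1kb_single_line.json", line + "\n")
```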

Related issues

Logs

[2024-09-02T14:45:51,796][INFO ][logstash.javapipeline    ][main] Pipeline started {"pipeline.id"=>"main"}
{
    "@timestamp" => 2024-09-02T12:45:51.801192Z,
         "event" => {
        "original" => "{\"field_1\": [{\"name\":\"Jannik\",\"surname\":\"Sinner\"},{\"name\":\"Novak\",\"surname\":\"Djokovic\"},{\"name\":\"Rafa\",\"surname\":\"Nadal\"},{\"name\":\"Roger\",\"surname\":\"Federer\"},{\"name\":\"Pete\",\"surname\":\"Sampras\"},{\"name\":\"Andr\xC3\xA9\",\"surname\":\"Agassi\"},{\"name\":\"Rod\",\"surname\":\"Laver\"},{\"name\":\"Ivan\",\"surname\":\"Lendl\"},{\"name\":\"Bjorn\",\"surname\":\"Borg\"},{\"name\":\"John\",\"surname\":\"McEnroe\"},{\"name\":\"Jimmy\",\"surname\":\"Connors\"}],\"field_2\": [{\"name\":\"Jannik\",\"surname\":\"Sinner\"},{\"name\":\"Novak\",\"surname\":\"Djokovic\"},{\"name\":\"Rafa\",\"surname\":\"Nadal\"},{\"name\":\"Roger\",\"surname\":\"Federer\"},{\"name\":\"Pete\",\"surname\":\"Sampras\"},{\"name\":\"Andr\xC3\xA9\",\"surname\":\"Agassi\"},{\"name\":\"Rod\",\"surname\":\"Laver\"},{\"name\":\"Ivan\",\"surname\":\"Lendl\"},{\"name\":\"Bjorn\",\"surname\":\"Borg\"},{\"name\":\"John\",\"surname\":\"McEnroe\"},{\"name\":\"Jimmy\",\"surname\":\"Connors\"}],\"field_3\": [{\"name\":\"Jannik\",\"surname\":\"Sinner\"},{\"name\":\"Novak\",\"surname\":\"Djokovic\"},{\"name\":\"Rafa\",\"surname\":\"Nadal\"},{\"name\":\"Roger\",\"surname\":\"Federer\"},{\"name\":\"Pete\",\"surname\":\"Sampras\"},{\"name\":\"Andr\xC3\xA9\",\"surname\":\"Agassi\"},{\"name\":\"Rod\",\"surname\":\"Laver\"},{\"name\":\"Ivan\",\"surname\":\"Lendl\"},{\"name\":\"Bjorn\",\"surname\":\"Borg\"},{\"name\":\"John\",\"surname\":\"McEnroe\"},{\"name\":\"Jimmy\",\"surname\":\"Connors\"}]}\r\n"
    },
      "@version" => "1",
          "host" => {
        "hostname" => "andreas-MBP-2.station"
    },
          "tags" => [
        [0] "_jsonparsetoobigfailure"
    ],
       "message" => "{\"field_1\": [{\"name\":\"Jannik\",\"surname\":\"Sinner\"},{\"name\":\"Novak\",\"surname\":\"Djokovic\"},{\"name\":\"Rafa\",\"surname\":\"Nadal\"},{\"name\":\"Roger\",\"surname\":\"Federer\"},{\"name\":\"Pete\",\"surname\":\"Sampras\"},{\"name\":\"Andr\xC3\xA9\",\"surname\":\"Agassi\"},{\"name\":\"Rod\",\"surname\":\"Laver\"},{\"name\":\"Ivan\",\"surname\":\"Lendl\"},{\"name\":\"Bjorn\",\"surname\":\"Borg\"},{\"name\":\"John\",\"surname\":\"McEnroe\"},{\"name\":\"Jimmy\",\"surname\":\"Connors\"}],\"field_2\": [{\"name\":\"Jannik\",\"surname\":\"Sinner\"},{\"name\":\"Novak\",\"surname\":\"Djokovic\"},{\"name\":\"Rafa\",\"su"
}
[2024-09-02T14:45:51,966][INFO ][logstash.javapipeline    ][main] Pipeline terminated {"pipeline.id"=>"main"}
[2024-09-02T14:45:51,968][INFO ][logstash.agent           ] Pipelines running {:count=>0, :running_pipelines=>[], :non_running_pipelines=>[:main]}
[2024-09-02T14:45:51,970][INFO ][logstash.pipelinesregistry] Removed pipeline from registry successfully {:pipeline_id=>:main}
[2024-09-02T14:45:51,972][INFO ][logstash.runner          ] Logstash shut down.

@andsel andsel self-assigned this Aug 30, 2024
@andsel andsel marked this pull request as ready for review September 2, 2024 12:51
@jsvd (Member) left a comment


IMO it makes sense to default to limiting sizes, as that helps Logstash continue to function and also signals to the user that we're going into dangerous territory.
That said, I can agree that it's a breaking change for people doing legitimate 30, 60 or 100MB messages.
So I suggest we default to none now but warn that the default will change to 20MB in the future. cc @robbavey

@andsel andsel requested a review from jsvd September 3, 2024 12:50
@jsvd (Member) left a comment


I have some concerns that this PR conflates a full buffer with large messages, which can lead to locking up the codec. A situation we can think about is the following:

tcp {
  port => 3333
  decode_size_limit_bytes => 1000
}

Then three TCP packets arrive:
Packet 1, size 900 bytes: contains the first 900 bytes of a 1050-byte message.
What happens: nothing, the message is stored in the buffer.

Packet 2, size 150 bytes: contains the last 50 bytes of the first message and an entire 100-byte second message.
What happens: a new Logstash Event is created with the contents of packet 2 and tagged with _jsonparsetoobigfailure, but the buffer keeps the initial 900 bytes of packet 1.

Packet 3, size 200 bytes: an entire 200-byte third message.
What happens: a new Logstash Event is created with the contents of packet 3 and tagged with _jsonparsetoobigfailure, but the buffer still keeps the initial 900 bytes of packet 1.

As long as the next message is bigger than the 100 bytes remaining in the buffer, Logstash will never get rid of the first 900 bytes of message 1.

You can try this out with the following code:

# first packet with part of message 1
data = {"a" => "a"*1050}.to_json + "\n"; socket.write(data[0...900])
# second packet with part of message 1 and entire message 2
data = {"a" => "a"*1050}.to_json + "\n"; socket.write(data[900..] + "{\"b\": \"bbb\"}\n")
# third packet w/ entire message 3
socket.write({"c" => "c"*200}.to_json + "\n")
# fourth packet w/ entire message 4
socket.write({"c" => "c"*200}.to_json + "\n")
# fifth packet w/ entire message 5
socket.write({"c" => "c"*200}.to_json + "\n")

We need to flush out the buffer and generate the event using the buffers data.
There may be other problematic cases with variable sized messages, please check that it behaves correctly in these partial message situations.
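The lock-up can be modelled with a toy tokenizer (a simplified stand-in for the real BufferedTokenizerExt, with assumed names; in particular it raises on overflow rather than emitting a tagged event, which is enough to show the stale buffer never draining):

```ruby
# Toy model of the failure mode: the buffer is never flushed on overflow.
class ToyTokenizer
  def initialize(limit)
    @limit  = limit
    @buffer = +""
  end

  # Accumulates data and returns complete newline-delimited messages.
  # Mimics the problematic behaviour: rejects input when over the limit
  # WITHOUT clearing the buffer.
  def extract(data)
    raise "input buffer full" if @buffer.bytesize + data.bytesize > @limit
    @buffer << data
    parts   = @buffer.split("\n", -1)
    @buffer = parts.pop          # keep the trailing partial fragment
    parts
  end
end

tok = ToyTokenizer.new(1000)
tok.extract("a" * 900)           # packet 1: partial message fills the buffer
begin
  tok.extract("c" * 200 + "\n")  # any packet over the remaining 100 bytes...
rescue RuntimeError
  # ...is rejected, yet the 900 stale bytes stay in the buffer forever
end
```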

@andsel (Contributor, Author) commented Sep 4, 2024

I tried locally, breakpointing at https://github.com/elastic/logstash/blob/ac034a14ee422148483d42a51b68f07d1a38494c/logstash-core/src/main/java/org/logstash/common/BufferedTokenizerExt.java#L80. In the first iteration I would expect entities to contain just one element with the first 1000 chars (the a-s), but instead it holds the full payload and contains 6 elements:

  • {"a":"a.....a"}
  • {"b": "bbb"}
  • {"c":"c....c"}
  • {"c":"c....c"}
  • {"c":"c....c"}
  • <empty string>

So a couple of observations:

  • the TCP input doesn't bubble up the 900-char packets as we expected; in that case it may do some merging of TCP packets (maybe up to 32Kb?)
  • the implementation of BufferedTokenizerExt loses subsequent fragments once the first one throws an error, dropping the segments instead of storing them for further processing. This makes me think the bug resides in BufferedTokenizerExt rather than in its usage.

@jsvd (Member) commented Sep 4, 2024

Did you try with packets larger than the MTU (16k)? Same test but with larger messages:

bin/logstash -e "input { tcp { port => 3333 codec => json_lines { decode_size_limit_bytes => 100000 } } }"
data = {"a" => "a"*105_000}.to_json + "\n"; socket.write(data[0...90_000])
data = {"a" => "a"*105_000}.to_json + "\n"; socket.write(data[90_000..] + "{\"b\": \"bbbbbbbbbbbbbbbbbbb\"}\n")

# now it's stuck
socket.write({"c" => "c"*50_000}.to_json + "\n")
socket.write({"c" => "c"*50_000}.to_json + "\n")
socket.write({"c" => "c"*50_000}.to_json + "\n")

# unblock with smaller message
socket.write({"c" => "c"*1_000}.to_json + "\n")

… not raise an error but create an Event, tag it appropriately and slice the offending data into the 'message' field
@andsel andsel force-pushed the feature/unlock_limit_and_tag_event_on_failure branch from 6e7ae5b to 0db5a0f Compare September 9, 2024 14:53
@andsel (Contributor, Author) commented Sep 10, 2024

Using the suggested code (but keeping only the As and Bs and omitting the Cs) with this PR, which handles the IllegalStateException, we can observe two things:

  1. the event generated with tag _jsonparsetoobigfailure contains a portion of 65536 a-s, not the 100,000 chars from the start of the JSON ({"a": "a...... up to 100,000).
  2. after the "line too big" event, a valid JSON with a-s is generated.

Detailed explanation

Considering the code at https://github.com/elastic/logstash/blob/701108f88b3c16a08fb501a71d812b804a79fe68/logstash-core/src/main/java/org/logstash/common/BufferedTokenizerExt.java#L79-L95

| invocation number | entities.size() | data.length | data | inputSize | return | notes |
|---|---|---|---|---|---|---|
| 1 | 1 | 2048 | `{"a": "a.....` | 0 -> 2048 | empty array | data contains the first 2048 bytes of the a-s payload |
| 2 | 1 | 32768 | `a...................a` | 2048 -> 34816 | empty array | data contains 32768 a-s and no separator |
| 3 | 1 | 65536 | `a.......................................a` | 34816 + 65536 > 100000 | throws the error | buffer-full condition reached and error raised; note that BufferedTokenizer.input (the accumulator) still contains the first 2 segments, it isn't cleared on exception |
| 4 | 3 | 4686 | `a......a"}\n{"b": "b.....b"}\n` | 34816 + 4656 -> 39472 | an array with 2 elements, `{"a": "a...a"}` and `{"b": "b.....b"}` | note that a valid event with a-s is generated as valid JSON, but it's luck. Then it generates message B |

What happens here is that at iteration 3 the BufferedTokenizer throws the exception but doesn't clear the internal input accumulator, which still contains the fragments from iterations 1 and 2. On iteration 4 it receives the segment that closes message a, a separator, and a valid message b.

What should be expected

  1. the event tagged with _jsonparsetoobigfailure should contain 100,000 (the buffer limit) message-a characters, not just the content of the actual data variable.
  2. after the exception, the tokenizer accumulator should be cleared so that no fragment of message a is forwarded to the decoder. On a "line too long" error the full line should be consumed, up to the next delimiter.
  3. message b should be properly emitted.

@jsvd (Member) commented Sep 11, 2024

Agreed on the observations.
When a new packet comes in and can't fit the buffer, it means the buffer is full with an incomplete message.
At that point there are two options, an easier one and a harder one.

Easier:

  1. Discard buffer
  2. Discard packet content up to a delimiter
  3. Write the rest of packet to the buffer
  4. Call extract

Harder:

  1. Copy buffer
  2. Append the packet content - up to delimiter - to the buffer copy
  3. Call extract with buffer copy
  4. Empty buffer
  5. Write rest of packet to the buffer
  6. Call extract

The harder solution provides a last-resort attempt at consuming the bigger-than-buffer message before discarding it. The easier version just drops it.
I'd suggest going with the "easier" solution.
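The "easier" steps above can be sketched as follows (assumed names; a sketch of the strategy, not the actual fix):

```ruby
# Sketch of the "easier" recovery: on a buffer-full error, drop the stale
# buffer plus the packet prefix up to the next delimiter, then resume.
def recover_easier(buffer, packet, delimiter = "\n")
  buffer.clear                                        # 1. discard buffer
  _skipped, _sep, rest = packet.partition(delimiter)  # 2. drop up to delimiter
  buffer << rest                                      # 3. write rest to buffer
  buffer                                              # 4. caller calls extract
end
```

With a buffer holding 900 stale bytes and a packet `"tail-of-big-msg\nmsg2\n"`, only `"msg2\n"` survives into the buffer, so tokenizing can resume cleanly.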

… condition, now instead the exception is raised producing a properly tagged event
@andsel andsel requested a review from jsvd October 10, 2024 12:30
@andsel (Contributor, Author) commented Oct 17, 2024

After the merge of the BufferedTokenizer fix, I think we can proceed with the review of this PR.

Development

Successfully merging this pull request may close these issues.

Make behaviour consistent when big oneliners JSON is provided
2 participants