diff --git a/lib/logstash/codecs/fluent.rb b/lib/logstash/codecs/fluent.rb index 92175f3..73b5ccd 100644 --- a/lib/logstash/codecs/fluent.rb +++ b/lib/logstash/codecs/fluent.rb @@ -53,6 +53,10 @@ def encode(event) private + def merge_values(source, target) + source.merge!(target){|key, old_val, new_val| [new_val, old_val]} + end + def decode_event(data, &block) tag = data[0] entries = data[1] @@ -70,10 +74,10 @@ def decode_event(data, &block) entries_decoder.feed_each(entries) do |entry| epochtime = entry[0] map = entry[1] - event = LogStash::Event.new(map.merge( - LogStash::Event::TIMESTAMP => LogStash::Timestamp.at(epochtime), - "tags" => [ tag ] - )) + event = LogStash::Event.new(merge_values(map, { + LogStash::Event::TIMESTAMP => LogStash::Timestamp.at(epochtime), + "tags" => tag + })) yield event end when Array @@ -81,20 +85,20 @@ def decode_event(data, &block) entries.each do |entry| epochtime = entry[0] map = entry[1] - event = LogStash::Event.new(map.merge( - LogStash::Event::TIMESTAMP => LogStash::Timestamp.at(epochtime), - "tags" => [ tag ] - )) + event = LogStash::Event.new(merge_values(map, { + LogStash::Event::TIMESTAMP => LogStash::Timestamp.at(epochtime), + "tags" => tag + })) yield event end when Fixnum # Message epochtime = entries map = data[2] - event = LogStash::Event.new(map.merge( - LogStash::Event::TIMESTAMP => LogStash::Timestamp.at(epochtime), - "tags" => [ tag ] - )) + event = LogStash::Event.new(merge_values(map, { + LogStash::Event::TIMESTAMP => LogStash::Timestamp.at(epochtime), + "tags" => tag + })) yield event else raise(LogStash::Error, "Unknown event type") diff --git a/spec/codecs/fluent_spec.rb b/spec/codecs/fluent_spec.rb index c85cd1f..6d0a412 100644 --- a/spec/codecs/fluent_spec.rb +++ b/spec/codecs/fluent_spec.rb @@ -38,6 +38,7 @@ it "should decode without errors" do subject.decode(message) do |event| expect(event.get("name")).to eq("foo") + expect(event.get("tags")).to eq(tag) end end @@ -63,6 +64,7 @@ subject.decode(message) do |event| expect(event.get("name")).to eq("foo") + expect(event.get("tags")).to eq(tag) count += 1 end