diff --git a/lib/fluent/plugin/filter_parser.rb b/lib/fluent/plugin/filter_parser.rb
index e9c03fd944..52909d0171 100644
--- a/lib/fluent/plugin/filter_parser.rb
+++ b/lib/fluent/plugin/filter_parser.rb
@@ -79,6 +79,10 @@ def filter_with_time(tag, time, record)
                 end
             @accessor.delete(record) if @remove_key_name_field
             r = handle_parsed(tag, record, t, values)
+            # Note: https://github.com/fluent/fluentd/issues/4100
+            # If the parser returns multiple records from one raw_value,
+            # this returns only the first one record.
+            # This should be fixed in the future version.
             return t, r
           else
             if @emit_invalid_record_to_error
diff --git a/lib/fluent/plugin/in_http.rb b/lib/fluent/plugin/in_http.rb
index 9f46caa8e8..6385a80a79 100644
--- a/lib/fluent/plugin/in_http.rb
+++ b/lib/fluent/plugin/in_http.rb
@@ -203,54 +203,24 @@ def on_request(path_info, params)
       begin
         path = path_info[1..-1]  # remove /
         tag = path.split('/').join('.')
-        record_time, record = parse_params(params)
 
-        # Skip nil record
-        if record.nil?
-          log.debug { "incoming event is invalid: path=#{path_info} params=#{params.to_json}" }
-          if @respond_with_empty_img
-            return RESPONSE_IMG
-          else
-            if @use_204_response
-              return RESPONSE_204
-            else
-              return RESPONSE_200
-            end
+        mes = Fluent::MultiEventStream.new
+        parse_params(params) do |record_time, record|
+          if record.nil?
+            log.debug { "incoming event is invalid: path=#{path_info} params=#{params.to_json}" }
+            next
           end
-        end
-
-        mes = nil
-        # Support batched requests
-        if record.is_a?(Array)
-          mes = Fluent::MultiEventStream.new
-          record.each do |single_record|
-            add_params_to_record(single_record, params)
-
-            if param_time = params['time']
-              param_time = param_time.to_f
-              single_time = param_time.zero? ? Fluent::EventTime.now : @float_time_parser.parse(param_time)
-            elsif @custom_parser
-              single_time = @custom_parser.parse_time(single_record)
-              single_time, single_record = @custom_parser.convert_values(single_time, single_record)
-            else
-              single_time = convert_time_field(single_record)
-            end
 
-            mes.add(single_time, single_record)
-          end
-        else
           add_params_to_record(record, params)
 
           time = if param_time = params['time']
                    param_time = param_time.to_f
                    param_time.zero? ? Fluent::EventTime.now : @float_time_parser.parse(param_time)
                  else
-                   if record_time.nil?
-                     convert_time_field(record)
-                   else
-                     record_time
-                   end
+                   record_time.nil? ? convert_time_field(record) : record_time
                  end
+
+          mes.add(time, record)
         end
       rescue => e
         if @dump_error_log
@@ -261,11 +231,7 @@ def on_request(path_info, params)
 
       # TODO server error
       begin
-        if mes
-          router.emit_stream(tag, mes)
-        else
-          router.emit(tag, time, record)
-        end
+        router.emit_stream(tag, mes) unless mes.empty?
       rescue => e
         if @dump_error_log
           log.error "failed to emit data", error: e
@@ -308,20 +274,18 @@ def on_server_connect(conn)
     def parse_params_default(params)
       if msgpack = params['msgpack']
         @parser_msgpack.parse(msgpack) do |_time, record|
-          return nil, record
+          yield nil, record
         end
       elsif js = params['json']
         @parser_json.parse(js) do |_time, record|
-          return nil, record
+          yield nil, record
         end
       elsif ndjson = params['ndjson']
-        events = []
         ndjson.split(/\r?\n/).each do |js|
           @parser_json.parse(js) do |_time, record|
-            events.push(record)
+            yield nil, record
           end
         end
-        return nil, events
       else
         raise "'json', 'ndjson' or 'msgpack' parameter is required"
       end
@@ -329,10 +293,9 @@ def parse_params_default(params)
 
     def parse_params_with_parser(params)
       if content = params[EVENT_RECORD_PARAMETER]
-        @custom_parser.parse(content) { |time, record|
-          raise "Received event is not #{@format_name}: #{content}" if record.nil?
-          return time, record
-        }
+        @custom_parser.parse(content) do |time, record|
+          yield time, record
+        end
       else
         raise "'#{EVENT_RECORD_PARAMETER}' parameter is required"
       end
diff --git a/lib/fluent/plugin/parser_json.rb b/lib/fluent/plugin/parser_json.rb
index 829aa4b72a..52dd6f5e8f 100644
--- a/lib/fluent/plugin/parser_json.rb
+++ b/lib/fluent/plugin/parser_json.rb
@@ -70,16 +70,33 @@ def configure_json_parser(name)
       end
 
       def parse(text)
-        record = @load_proc.call(text)
-        time = parse_time(record)
-        if @execute_convert_values
-          time, record = convert_values(time, record)
+        parsed_json = @load_proc.call(text)
+
+        if parsed_json.is_a?(Hash)
+          time, record = parse_one_record(parsed_json)
+          yield time, record
+        elsif parsed_json.is_a?(Array)
+          parsed_json.each do |record|
+            unless record.is_a?(Hash)
+              yield nil, nil
+              next
+            end
+            time, parsed_record = parse_one_record(record)
+            yield time, parsed_record
+          end
+        else
+          yield nil, nil
         end
-        yield time, record
+
       rescue @error_class, EncodingError # EncodingError is for oj 3.x or later
         yield nil, nil
       end
 
+      def parse_one_record(record)
+        time = parse_time(record)
+        convert_values(time, record)
+      end
+
       def parser_type
         :text
       end
diff --git a/lib/fluent/plugin/parser_msgpack.rb b/lib/fluent/plugin/parser_msgpack.rb
index b35f04b9f6..4d29bc6b7e 100644
--- a/lib/fluent/plugin/parser_msgpack.rb
+++ b/lib/fluent/plugin/parser_msgpack.rb
@@ -31,9 +31,9 @@ def parser_type
         :binary
       end
 
-      def parse(data)
+      def parse(data, &block)
         @unpacker.feed_each(data) do |obj|
-          yield convert_values(parse_time(obj), obj)
+          parse_unpacked_data(obj, &block)
         end
       end
       alias parse_partial_data parse
@@ -41,8 +41,29 @@ def parse(data)
       def parse_io(io, &block)
         u = Fluent::MessagePackFactory.engine_factory.unpacker(io)
         u.each do |obj|
-          time, record = convert_values(parse_time(obj), obj)
+          parse_unpacked_data(obj, &block)
+        end
+      end
+
+      def parse_unpacked_data(data)
+        if data.is_a?(Hash)
+          time, record = convert_values(parse_time(data), data)
           yield time, record
+          return
+        end
+
+        unless data.is_a?(Array)
+          yield nil, nil
+          return
+        end
+
+        data.each do |record|
+          unless record.is_a?(Hash)
+            yield nil, nil
+            next
+          end
+          time, converted_record = convert_values(parse_time(record), record)
+          yield time, converted_record
         end
       end
     end
diff --git a/test/plugin/test_parser_json.rb b/test/plugin/test_parser_json.rb
index 19c45402d1..875611eb32 100644
--- a/test/plugin/test_parser_json.rb
+++ b/test/plugin/test_parser_json.rb
@@ -135,4 +135,110 @@ def test_yajl_parse_io_with_buffer_smaller_than_input
       end
     end
   end
+
+  sub_test_case "various record pattern" do
+    data("Only string", { record: '"message"', expected: [nil] }, keep: true)
+    data("Only string without quotation", { record: "message", expected: [nil] }, keep: true)
+    data("Only number", { record: "0", expected: [nil] }, keep: true)
+    data(
+      "Array of Hash",
+      {
+        record: '[{"k1": 1}, {"k2": 2}]',
+        expected: [{"k1" => 1}, {"k2" => 2}]
+      },
+      keep: true,
+    )
+    data(
+      "Array of both Hash and invalid",
+      {
+        record: '[{"k1": 1}, "string", {"k2": 2}, 0]',
+        expected: [{"k1" => 1}, nil, {"k2" => 2}, nil]
+      },
+      keep: true,
+    )
+    data(
+      "Array of all invalid",
+      {
+        record: '["string", 0, [{"k": 0}]]',
+        expected: [nil, nil, nil]
+      },
+      keep: true,
+    )
+
+    def test_oj(data)
+      parsed_records = []
+      @parser.configure("json_parser" => "oj")
+      @parser.instance.parse(data[:record]) { |time, record|
+        parsed_records.append(record)
+      }
+      assert_equal(data[:expected], parsed_records)
+    end
+
+    def test_yajl(data)
+      parsed_records = []
+      @parser.configure("json_parser" => "yajl")
+      @parser.instance.parse(data[:record]) { |time, record|
+        parsed_records.append(record)
+      }
+      assert_equal(data[:expected], parsed_records)
+    end
+
+    def test_json(json)
+      parsed_records = []
+      @parser.configure("json_parser" => "json")
+      @parser.instance.parse(data[:record]) { |time, record|
+        parsed_records.append(record)
+      }
+      assert_equal(data[:expected], parsed_records)
+    end
+  end
+
+  # This becomes NoMethodError if a non-Hash object is passed to convert_values.
+  # https://github.com/fluent/fluentd/issues/4100
+  sub_test_case "execute_convert_values with null_empty_string" do
+    data("Only string", { record: '"message"', expected: [nil] }, keep: true)
+    data(
+      "Hash",
+      {
+        record: '{"k1": 1, "k2": ""}',
+        expected: [{"k1" => 1, "k2" => nil}]
+      },
+      keep: true,
+    )
+    data(
+      "Array of Hash",
+      {
+        record: '[{"k1": 1}, {"k2": ""}]',
+        expected: [{"k1" => 1}, {"k2" => nil}]
+      },
+      keep: true,
+    )
+
+    def test_oj(data)
+      parsed_records = []
+      @parser.configure("json_parser" => "oj", "null_empty_string" => true)
+      @parser.instance.parse(data[:record]) { |time, record|
+        parsed_records.append(record)
+      }
+      assert_equal(data[:expected], parsed_records)
+    end
+
+    def test_yajl(data)
+      parsed_records = []
+      @parser.configure("json_parser" => "yajl", "null_empty_string" => true)
+      @parser.instance.parse(data[:record]) { |time, record|
+        parsed_records.append(record)
+      }
+      assert_equal(data[:expected], parsed_records)
+    end
+
+    def test_json(json)
+      parsed_records = []
+      @parser.configure("json_parser" => "json", "null_empty_string" => true)
+      @parser.instance.parse(data[:record]) { |time, record|
+        parsed_records.append(record)
+      }
+      assert_equal(data[:expected], parsed_records)
+    end
+  end
 end
diff --git a/test/plugin/test_parser_msgpack.rb b/test/plugin/test_parser_msgpack.rb
new file mode 100644
index 0000000000..eafea1538c
--- /dev/null
+++ b/test/plugin/test_parser_msgpack.rb
@@ -0,0 +1,127 @@
+require_relative '../helper'
+require 'fluent/test/driver/parser'
+require 'fluent/plugin/parser_msgpack'
+
+class MessagePackParserTest < ::Test::Unit::TestCase
+  def setup
+    Fluent::Test.setup
+  end
+
+  def create_driver(conf)
+    Fluent::Test::Driver::Parser.new(Fluent::Plugin::MessagePackParser).configure(conf)
+  end
+
+  sub_test_case "simple setting" do
+    data(
+      "Normal Hash",
+      {
+        input: "\x82\xA7message\xADHello msgpack\xA3numd",
+        expected: [{"message" => "Hello msgpack", "num" => 100}]
+      },
+      keep: true
+    )
+    data(
+      "Array of multiple Hash",
+      {
+        input: "\x92\x81\xA7message\xA3foo\x81\xA7message\xA3bar",
+        expected: [{"message"=>"foo"}, {"message"=>"bar"}]
+      },
+      keep: true
+    )
+    data(
+      "String",
+      {
+        # "Hello msgpack".to_msgpack
+        input: "\xADHello msgpack",
+        expected: [nil]
+      },
+      keep: true
+    )
+    data(
+      "Array of String",
+      {
+        # ["foo", "bar"].to_msgpack
+        input: "\x92\xA3foo\xA3bar",
+        expected: [nil, nil]
+      },
+      keep: true
+    )
+    data(
+      "Array of String and Hash",
+      {
+        # ["foo", {message: "bar"}].to_msgpack
+        input: "\x92\xA3foo\x81\xA7message\xA3bar",
+        expected: [nil, {"message"=>"bar"}]
+      },
+      keep: true
+    )
+
+    def test_parse(data)
+      parsed_records = []
+      create_driver("").instance.parse(data[:input]) do |time, record|
+        parsed_records.append(record)
+      end
+      assert_equal(data[:expected], parsed_records)
+    end
+
+    def test_parse_io(data)
+      parsed_records = []
+      StringIO.open(data[:input]) do |io|
+        create_driver("").instance.parse_io(io) do |time, record|
+          parsed_records.append(record)
+        end
+      end
+      assert_equal(data[:expected], parsed_records)
+    end
+  end
+
+  # This becomes NoMethodError if a non-Hash object is passed to convert_values.
+  # https://github.com/fluent/fluentd/issues/4100
+  sub_test_case "execute_convert_values with null_empty_string" do
+    data(
+      "Normal hash",
+      {
+        # {message: "foo", empty: ""}.to_msgpack
+        input: "\x82\xA7message\xA3foo\xA5empty\xA0",
+        expected: [{"message" => "foo", "empty" => nil}]
+      },
+      keep: true
+    )
+    data(
+      "Array of multiple Hash",
+      {
+        # [{message: "foo", empty: ""}, {message: "bar", empty: ""}].to_msgpack
+        input: "\x92\x82\xA7message\xA3foo\xA5empty\xA0\x82\xA7message\xA3bar\xA5empty\xA0",
+        expected: [{"message"=>"foo", "empty" => nil}, {"message"=>"bar", "empty" => nil}]
+      },
+      keep: true
+    )
+    data(
+      "String",
+      {
+        # "Hello msgpack".to_msgpack
+        input: "\xADHello msgpack",
+        expected: [nil]
+      },
+      keep: true
+    )
+
+    def test_parse(data)
+      parsed_records = []
+      create_driver("null_empty_string").instance.parse(data[:input]) do |time, record|
+        parsed_records.append(record)
+      end
+      assert_equal(data[:expected], parsed_records)
+    end
+
+    def test_parse_io(data)
+      parsed_records = []
+      StringIO.open(data[:input]) do |io|
+        create_driver("null_empty_string").instance.parse_io(io) do |time, record|
+          parsed_records.append(record)
+        end
+      end
+      assert_equal(data[:expected], parsed_records)
+    end
+  end
+end
\ No newline at end of file