diff --git a/lib/fluent/plugin/out_cloudwatch_logs.rb b/lib/fluent/plugin/out_cloudwatch_logs.rb index ce62913..c103d60 100644 --- a/lib/fluent/plugin/out_cloudwatch_logs.rb +++ b/lib/fluent/plugin/out_cloudwatch_logs.rb @@ -162,9 +162,8 @@ def start end options[:http_proxy] = @http_proxy if @http_proxy @logs ||= Aws::CloudWatchLogs::Client.new(options) - @sequence_tokens = {} - @store_next_sequence_token_mutex = Mutex.new - + @log_groups = {} + log.debug "Aws::CloudWatchLogs::Client initialized: log.level #{log.level} => #{options[:log_level]}" @json_handler = case @json_handler @@ -356,20 +355,6 @@ def scrub_record!(record) end end - def delete_sequence_token(group_name, stream_name) - @sequence_tokens[group_name].delete(stream_name) - end - - def next_sequence_token(group_name, stream_name) - @sequence_tokens[group_name][stream_name] - end - - def store_next_sequence_token(group_name, stream_name, token) - @store_next_sequence_token_mutex.synchronize do - @sequence_tokens[group_name][stream_name] = token - end - end - def put_events_by_chunk(group_name, stream_name, events) chunk = [] @@ -413,9 +398,6 @@ def put_events(group_name, stream_name, events, events_bytesize) log_stream_name: stream_name, } - token = next_sequence_token(group_name, stream_name) - args[:sequence_token] = token if token - begin t = Time.now response = @logs.put_log_events(args) @@ -424,7 +406,6 @@ def put_events(group_name, stream_name, events, events_bytesize) "stream" => stream_name, "events_count" => events.size, "events_bytesize" => events_bytesize, - "sequence_token" => token, "thread" => Thread.current.object_id, "request_sec" => Time.now - t, } @@ -434,16 +415,6 @@ def put_events(group_name, stream_name, events, events_bytesize) else log.debug "Called PutLogEvents API", request end - rescue Aws::CloudWatchLogs::Errors::InvalidSequenceTokenException, Aws::CloudWatchLogs::Errors::DataAlreadyAcceptedException => err - sleep 1 # to avoid too many API calls - store_next_sequence_token(group_name, stream_name, err.expected_sequence_token) - log.warn "updating upload sequence token forcefully because unrecoverable error occured", { - "error" => err, - "log_group" => group_name, - "log_stream" => stream_name, - "new_sequence_token" => token, - } - retry_count += 1 rescue Aws::CloudWatchLogs::Errors::ResourceNotFoundException => err if @auto_create_stream && err.message == 'The specified log stream does not exist.' log.warn 'Creating log stream because "The specified log stream does not exist." error is got', { @@ -452,7 +423,6 @@ def put_events(group_name, stream_name, events, events_bytesize) "log_stream" => stream_name, } create_log_stream(group_name, stream_name) - delete_sequence_token(group_name, stream_name) retry_count += 1 else raise err @@ -487,8 +457,6 @@ def put_events(group_name, stream_name, events, events_bytesize) if 0 < retry_count log.warn "retry succeeded" end - - store_next_sequence_token(group_name, stream_name, response.next_sequence_token) end def create_log_group(group_name, log_group_aws_tags = nil, retention_in_days = nil) @@ -497,7 +465,6 @@ def create_log_group(group_name, log_group_aws_tags = nil, retention_in_days = n unless retention_in_days.nil? put_retention_policy(group_name, retention_in_days) end - @sequence_tokens[group_name] = {} rescue Aws::CloudWatchLogs::Errors::ResourceAlreadyExistsException log.debug "Log group '#{group_name}' already exists" end @@ -517,18 +484,16 @@ def put_retention_policy(group_name, retention_in_days) def create_log_stream(group_name, stream_name) begin @logs.create_log_stream(log_group_name: group_name, log_stream_name: stream_name) - @sequence_tokens[group_name] ||= {} - @sequence_tokens[group_name][stream_name] = nil rescue Aws::CloudWatchLogs::Errors::ResourceAlreadyExistsException log.debug "Log stream '#{stream_name}' already exists" end end def log_group_exists?(group_name) - if @sequence_tokens[group_name] + if @log_groups[group_name] true elsif check_log_group_existence(group_name) - @sequence_tokens[group_name] = {} + @log_groups[group_name] = [] true else false @@ -547,12 +512,12 @@ def check_log_group_existence(group_name) end def log_stream_exists?(group_name, stream_name) - if not @sequence_tokens[group_name] + if not @log_groups[group_name] false - elsif @sequence_tokens[group_name].has_key?(stream_name) + elsif @log_groups[group_name].include?(stream_name) true elsif (log_stream = find_log_stream(group_name, stream_name)) - @sequence_tokens[group_name][stream_name] = log_stream.upload_sequence_token + @log_groups[group_name].push(stream_name) true else false