Skip to content

Commit

Permalink
Merge pull request #43 from kxs-who/interruptableRetries
Browse files Browse the repository at this point in the history
The plugin now supports interruption.
  • Loading branch information
StephenWakely authored Jun 7, 2024
2 parents 894c28b + b4d2759 commit 8c1af6e
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 8 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
## 0.5.2
- Now checks if logstash is being shutdown

## 0.5.1
- Support using HTTP proxies, adding the `http_proxy` parameter.

Expand Down
47 changes: 41 additions & 6 deletions lib/logstash/outputs/datadog_logs.rb
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,11 @@ def multi_receive(events)
end
end
rescue => e
@logger.error("Uncaught processing exception in datadog forwarder #{e.message}")
if e.is_a?(InterruptedError)
raise e
else
@logger.error("Uncaught processing exception in datadog forwarder #{e.message}")
end
end
end

Expand Down Expand Up @@ -148,7 +152,7 @@ def gzip_compress(payload, compression_level)
# Build a new transport client
def new_client(logger, api_key, use_http, use_ssl, no_ssl_validation, host, port, use_compression, force_v1_routes, http_proxy)
if use_http
DatadogHTTPClient.new logger, use_ssl, no_ssl_validation, host, port, use_compression, api_key, force_v1_routes, http_proxy
DatadogHTTPClient.new logger, use_ssl, no_ssl_validation, host, port, use_compression, api_key, force_v1_routes, http_proxy, -> { defined?(pipeline_shutdown_requested?) ? pipeline_shutdown_requested? : false }
else
DatadogTCPClient.new logger, use_ssl, no_ssl_validation, host, port
end
Expand All @@ -157,6 +161,9 @@ def new_client(logger, api_key, use_http, use_ssl, no_ssl_validation, host, port
class RetryableError < StandardError;
end

class InterruptedError < StandardError;
end

class DatadogClient
def send_retries(payload, max_retries, max_backoff)
backoff = 1
Expand All @@ -166,17 +173,36 @@ def send_retries(payload, max_retries, max_backoff)
rescue RetryableError => e
if retries < max_retries || max_retries < 0
@logger.warn("Retrying send due to: #{e.message}")
sleep backoff
interruptableSleep(backoff)
backoff = 2 * backoff unless backoff > max_backoff
retries += 1
retry
end
@logger.error("Max number of retries reached, dropping message. Last exception: #{ex.message}")
@logger.error("Max number of retries reached, dropping message. Last exception: #{e.message}")
rescue => ex
@logger.error("Unmanaged exception while sending log to datadog #{ex.message}")
if ex.is_a?(InterruptedError)
raise ex
else
@logger.error("Unmanaged exception while sending log to datadog #{ex.message}")
end
end
end

def interruptableSleep(duration)
amountSlept = 0
while amountSlept < duration
sleep 1
amountSlept += 1
if interrupted?
raise InterruptedError.new "Interrupted while backing off"
end
end
end

def interrupted?
false
end

def send(payload)
raise NotImplementedError, "Datadog transport client should implement the send method"
end
Expand All @@ -196,7 +222,8 @@ class DatadogHTTPClient < DatadogClient
::Manticore::ResolutionFailure
]

def initialize(logger, use_ssl, no_ssl_validation, host, port, use_compression, api_key, force_v1_routes, http_proxy)
def initialize(logger, use_ssl, no_ssl_validation, host, port, use_compression, api_key, force_v1_routes, http_proxy, interruptedLambda = nil)
@interruptedLambda = interruptedLambda
@logger = logger
protocol = use_ssl ? "https" : "http"

Expand Down Expand Up @@ -224,6 +251,14 @@ def initialize(logger, use_ssl, no_ssl_validation, host, port, use_compression,
@client = Manticore::Client.new(config)
end

def interrupted?
if @interruptedLambda
return @interruptedLambda.call
end

false
end

def send(payload)
begin
response = @client.post(@url, :body => payload, :headers => @headers).call
Expand Down
4 changes: 2 additions & 2 deletions lib/logstash/outputs/version.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# frozen_string_literal: true

module DatadogLogStashPlugin
VERSION = '0.5.1'
end
VERSION = '0.5.2'
end

0 comments on commit 8c1af6e

Please sign in to comment.