Add max buffer size, run reports in a task, continue to chunk batch sizes
keathley committed Jan 25, 2021
1 parent f6aee2d commit 689c1a9
Showing 3 changed files with 78 additions and 37 deletions.
12 changes: 9 additions & 3 deletions lib/spandex_datadog/api_server.ex
@@ -23,21 +23,24 @@ defmodule SpandexDatadog.ApiServer do
       http: :atom,
       batch_size: :integer,
       sync_threshold: :integer,
+      max_buffer_size: :integer,
       api_adapter: :atom
     ],
     defaults: [
       host: "localhost",
       port: 8126,
       verbose?: false,
-      batch_size: 10,
+      batch_size: 50,
       sync_threshold: 20,
+      max_buffer_size: 5_000,
       api_adapter: SpandexDatadog.ApiServer
     ],
     required: [:http],
     describe: [
       verbose?: "Only to be used for debugging: All finished traces will be logged",
       host: "The host the agent can be reached at",
       port: "The port to use when sending traces to the agent",
+      max_buffer_size: "The maximum number of traces that will be buffered.",
       batch_size: "The number of traces that should be sent in a single batch",
       sync_threshold:
         "The maximum number of processes that may be sending traces at any one time. This adds backpressure",
@@ -60,14 +63,17 @@ defmodule SpandexDatadog.ApiServer do
   end

   def init(opts) do
-    buffer = Buffer.new()
+    task_sup = __MODULE__.TaskSupervisor
+    buffer = Buffer.new(opts)
     reporter_opts =
       opts
       |> Map.new()
-      |> Map.take([:http, :verbose?, :host, :port])
+      |> Map.take([:http, :verbose?, :host, :port, :batch_size])
       |> Map.put(:buffer, buffer)
+      |> Map.put(:task_sup, task_sup)

     children = [
+      {Task.Supervisor, name: task_sup},
       {Reporter, reporter_opts},
     ]

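For context, here is a sketch of how these options surface when starting the server, assuming the child-spec style from the project's README; HTTPoison as the :http adapter and the surrounding Supervisor.start_link call are assumptions for illustration, not part of this commit.

# A sketch of configuring the server with the options this commit touches.
# HTTPoison is an assumed HTTP client; any module satisfying the :http
# adapter contract would do.
children = [
  {SpandexDatadog.ApiServer,
   http: HTTPoison,
   host: "localhost",
   port: 8126,
   batch_size: 50,           # new default: traces per request to the agent
   sync_threshold: 20,
   max_buffer_size: 5_000}   # new option: traces past this bound are dropped
]

Supervisor.start_link(children, strategy: :one_for_one)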
63 changes: 43 additions & 20 deletions lib/spandex_datadog/api_server/buffer.ex
@@ -6,38 +6,57 @@ defmodule SpandexDatadog.ApiServer.Buffer do
   # id. This helps reduce contention on each ets table since, in theory, only one process should be writing to a table at a time.
   # We periodically flush all spans to datadog in the background.

-  defstruct tabs: []
+  defstruct tabs: [], counters: nil

+  @config_key {__MODULE__, :config}
+
   # Builds a bunch of ets tables, 1 per scheduler and returns them in a struct
-  def new() do
+  def new(opts) do
+    schedulers = System.schedulers()
+    counters = :atomics.new(schedulers, [signed: false])
+    config = %{
+      max_buffer_size: opts[:max_buffer_size] || 5_000,
+      counters: counters,
+    }
+    :persistent_term.put(@config_key, config)
+
     # 1 index erlang ftw
-    tabs = for s <- 1..System.schedulers() do
-      :ets.new(:"#{__MODULE__}-#{s}", [:named_table, :set, :public, {:write_concurrency, true}])
+    tabs = for s <- 1..schedulers do
+      :ets.new(tab_name(s), [:named_table, :set, :public, {:write_concurrency, true}])
     end

-    %__MODULE__{tabs: tabs}
+    %__MODULE__{tabs: tabs, counters: counters}
   end

   def add_trace(trace) do
-    buffer = :"#{__MODULE__}-#{:erlang.system_info(:scheduler_id)}"
-    index = :ets.update_counter(buffer, :index, 1, {:index, 0})
-    :ets.insert(buffer, {index, trace})
+    config = :persistent_term.get(@config_key)
+    id = :erlang.system_info(:scheduler_id)
+    buffer = :"#{__MODULE__}-#{id}"
+    index = :atomics.add_get(config.counters, id, 1)
+
+    # If we're at the buffer size we drop the new trace on the ground.
+    # TODO - This should really be first in last out since we care more about
+    # the current data than about the old data.
+    if index > config.max_buffer_size do
+      # Remove the increment that we just made.
+      :atomics.sub(config.counters, id, 1)
+    else
+      :ets.insert(buffer, {index, trace})
+    end
   end

   # Returns the latest messages and then deletes them from the buffer
   def flush_latest(buffer, f) do
-    Enum.flat_map(buffer.tabs, fn tab ->
-      case :ets.lookup(tab, :index) do
-        [{:index, index}] ->
-          records = :ets.select(tab, select_spec(index))
-          f.(records)
-          :ets.select_delete(tab, delete_spec(index))
-          records
-
-        [] ->
-          []
-      end
-    end)
+    for s <- 1..System.schedulers() do
+      # Get current latest index for this table and reset the count to 0
+      index = :atomics.exchange(buffer.counters, s, 0)
+      # Its possible that we interleave with a different process that is adding
+      # additional traces at this point. That means that we're going to possibly
+      # allow the caller to overwrite old data (since the index is reset).
+      # This is OK for our purposes.
+      records = :ets.select(tab_name(s), select_spec(index))
+      f.(records)
+    end
   end

   defp delete_spec(index) do
@@ -53,4 +72,8 @@ defmodule SpandexDatadog.ApiServer.Buffer do
     # Get integers less than the current index
     [{{:"$1", :"$2"}, [{:andalso, {:is_integer, :"$1"}, {:"=<", :"$1", index}}], [item]}]
   end
+
+  defp tab_name(index) do
+    :"#{__MODULE__}-#{index}"
+  end
 end
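The buffer technique in this file can be distilled into a standalone sketch. ShardedBuffer and its function names are illustrative, not the library's API: one ETS table and one atomics slot per scheduler, with the atomic counter doubling as both the next insert index and the size bound.

defmodule ShardedBuffer do
  # Standalone sketch of the technique above, not the library's module.
  # One ETS shard and one atomics slot per scheduler; writers running on the
  # same scheduler share a table, which keeps write contention low.
  def new(max_size) do
    n = System.schedulers()
    counters = :atomics.new(n, signed: false)

    for s <- 1..n do
      :ets.new(:"sharded_buffer_#{s}", [:named_table, :set, :public, {:write_concurrency, true}])
    end

    %{counters: counters, max_size: max_size, schedulers: n}
  end

  def push(%{counters: counters, max_size: max}, item) do
    id = :erlang.system_info(:scheduler_id)
    index = :atomics.add_get(counters, id, 1)

    if index > max do
      # Over capacity: undo the increment and drop the item, as the commit does.
      :atomics.sub(counters, id, 1)
      :dropped
    else
      :ets.insert(:"sharded_buffer_#{id}", {index, item})
      :ok
    end
  end

  def flush(%{counters: counters, schedulers: n}, fun) do
    for s <- 1..n do
      # exchange/3 reads the shard's high-water mark and resets it to 0 in one
      # atomic step; concurrent writers may then overwrite low indexes, which
      # is the same trade-off the commit accepts.
      index = :atomics.exchange(counters, s, 0)
      records = :ets.select(:"sharded_buffer_#{s}", [{{:"$1", :"$2"}, [{:"=<", :"$1", index}], [:"$2"]}])
      fun.(records)
    end

    :ok
  end
end

For example, ShardedBuffer.new(5_000) builds the shards, ShardedBuffer.push(buf, trace) writes from any process, and ShardedBuffer.flush(buf, &send_batch/1) drains each shard in turn.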
40 changes: 26 additions & 14 deletions lib/spandex_datadog/api_server/reporter.ex
@@ -34,13 +34,11 @@ defmodule SpandexDatadog.ApiServer.Reporter do
   def init(opts) do
     host = opts[:host]
     port = opts[:port]
-    state = %{
-      buffer: opts[:buffer],
-      collector_url: "#{host}:#{port}/v0.3/traces",
-      verbose?: opts[:verbose?],
-      http: opts[:http],
-      flush_period: opts[:flush_period] || 1_000,
-    }
+    collector_url = "#{host}:#{port}/v0.3/traces"
+    state =
+      opts
+      |> update_in([:flush_period], & &1 || 1_000)
+      |> put_in([:collector_url], collector_url)

     schedule(state.flush_period)

@@ -49,7 +47,10 @@ defmodule SpandexDatadog.ApiServer.Reporter do

   # Only used for development and testing purposes
   def handle_call(:flush, _, state) do
-    flush(state)
+    # this little dance is hella weird, but it ensures that we can call this
+    # with backpressure in tests and we don't need to duplicate code in lots of
+    # places.
+    handle_info(:flush, state)
     {:reply, :ok, state}
   end

@@ -62,20 +63,31 @@ defmodule SpandexDatadog.ApiServer.Reporter do
   end

   def handle_info(:flush, state) do
-    flush(state)
+    # Run this function in a task to avoid bloating this processes binary memory
+    # and generally optimize GC. We're not really protecting ourselves from failure
+    # here because if the task exits, we're going to exit as well. But that's OK
+    # and is probably what we want.
+    state.task_sup
+    |> Task.Supervisor.async(fn -> flush(state) end)
+    |> Task.await()
+
     schedule(state.flush_period)

     {:noreply, state}
   end

   defp flush(state) do
     :telemetry.span([:spandex_datadog, :client, :flush], %{}, fn ->
-      Buffer.flush_latest(state.buffer, fn buffer ->
-        if buffer == [] do
+      Buffer.flush_latest(state.buffer, fn
+        [] ->
           :ok
-        else
-          Client.send(state.http, state.collector_url, buffer, verbose?: state.verbose?)
-        end
+
+        buffer ->
+          buffer
+          |> Enum.chunk_every(state.batch_size)
+          |> Enum.each(fn batch ->
+            Client.send(state.http, state.collector_url, batch, verbose?: state.verbose?)
+          end)
       end)

       {:ok, %{}}
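The new flush path combines two ideas: run the whole flush in a short-lived Task so the large binaries built while encoding requests die with the task instead of lingering in the reporter's heap, and split the drained traces with Enum.chunk_every/2 so no single request to the agent exceeds batch_size. A minimal sketch under those assumptions follows; BatchedSender, send_fun, and MyApp.TaskSupervisor are illustrative names, not the library's API.

defmodule BatchedSender do
  # Drain a list of traces in fixed-size batches inside a throwaway Task.
  # task_sup is a Task.Supervisor started elsewhere, e.g. with
  # {Task.Supervisor, name: MyApp.TaskSupervisor} in a supervision tree.
  def flush(task_sup, traces, batch_size, send_fun) do
    task_sup
    |> Task.Supervisor.async(fn ->
      traces
      |> Enum.chunk_every(batch_size)  # 120 traces at size 50 -> chunks of 50, 50, 20
      |> Enum.each(send_fun)
    end)
    # Awaiting keeps the caller's one-flush-at-a-time backpressure, exactly as
    # the reporter does above; a crash in the task also crashes the caller.
    |> Task.await()
  end
end

Calling BatchedSender.flush(MyApp.TaskSupervisor, Enum.to_list(1..120), 50, &IO.inspect(length(&1))) prints 50, 50, and 20.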
