Skip to content

Commit

Permalink
Add retry logic to bulk indexing requests
Browse files Browse the repository at this point in the history
  • Loading branch information
bmquinn committed Aug 2, 2024
1 parent 050bb10 commit 3404040
Showing 1 changed file with 45 additions and 6 deletions.
51 changes: 45 additions & 6 deletions app/lib/meadow/search/bulk.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ defmodule Meadow.Search.Bulk do
alias Meadow.Search.Config, as: SearchConfig
alias Meadow.Search.HTTP

require Logger

@retries 3

def delete(ids, index) do
ids
|> Stream.map(&Jason.encode!(%{delete: %{_id: &1}}))
Expand All @@ -27,15 +31,50 @@ defmodule Meadow.Search.Bulk do
bulk_documents
|> Stream.chunk_every(SearchConfig.bulk_page_size())
|> Stream.intersperse(SearchConfig.bulk_wait_interval())
|> Stream.each(&upload_batch(&1, index))
|> Stream.run()
end
|> Enum.reduce_while(:ok, fn
wait_interval, _ when is_integer(wait_interval) ->
:timer.sleep(wait_interval)
{:cont, :ok}

defp upload_batch(wait_interval, _) when is_integer(wait_interval),
do: :timer.sleep(wait_interval)
batch, _ ->
case upload_batch(batch, index) do
:ok -> {:cont, :ok}
{:error, reason} -> {:halt, {:error, reason}}
end
end)
end

defp upload_batch(docs, index) do
bulk_document = docs |> Enum.join("\n")
bulk_document = Enum.join(docs, "\n")

case with_retry(&upload_bulk_document(&1, index, bulk_document)) do
{:ok, _response} ->
:ok

{:error, reason} ->
Logger.error("Bulk upload failed after #{@retries} retries. Error: #{inspect(reason)}")
{:error, reason}
end
end

defp upload_bulk_document(_, index, bulk_document) do
HTTP.post("/#{index}/_bulk", bulk_document <> "\n")
end

defp with_retry(func, remaining_tries \\ @retries, last_response \\ nil)

defp with_retry(_, 0, last_response), do: last_response

defp with_retry(func, remaining_tries, _) do
with response <- func.() do
case response do
{:error, _} ->
Logger.warn("Bulk upload failed. Retrying. Attempts left: #{remaining_tries - 1}")
with_retry(func, remaining_tries - 1, response)

other ->
other
end
end
end
end

0 comments on commit 3404040

Please sign in to comment.