From b1609f741e90941abcd46d030ccbb6397afce9b2 Mon Sep 17 00:00:00 2001 From: RTLS Date: Mon, 23 Dec 2024 11:51:33 -0800 Subject: [PATCH] =?UTF-8?q?=F0=9F=9A=A8=20Resolve=20warnings=20from=201.18?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- lib/sequin/aws/sqs.ex | 27 +- lib/sequin/bench/end_to_end.ex | 541 +++++++++--------- lib/sequin/check_system_health.ex | 11 +- .../acknowledged_messages.ex | 17 +- lib/sequin/metrics/store.ex | 41 +- lib/sequin/redis.ex | 16 +- test/sequin/metrics/store_test.exs | 4 +- 7 files changed, 318 insertions(+), 339 deletions(-) diff --git a/lib/sequin/aws/sqs.ex b/lib/sequin/aws/sqs.ex index b13e68dd5..683b3e490 100644 --- a/lib/sequin/aws/sqs.ex +++ b/lib/sequin/aws/sqs.ex @@ -57,8 +57,15 @@ defmodule Sequin.Aws.SQS do {:ok, %{"QueueUrl" => queue_url}, _body} -> {:ok, queue_url} - err -> - {:error, Error.service(service: :aws_sqs, message: "Failed to get queue URL", details: err)} + {:error, {:unexpected_response, %{body: body}}} -> + if is_binary(body) and String.contains?(body, "The specified queue does not exist") do + {:error, Error.not_found(entity: :sqs_queue)} + else + {:error, Error.service(service: :aws_sqs, message: "Failed to get queue URL", details: inspect(body))} + end + + {:error, error} -> + {:error, Error.service(service: :aws_sqs, message: "Failed to get queue URL", details: error)} end end @@ -175,18 +182,16 @@ defmodule Sequin.Aws.SQS do @spec delete_queue(Client.t(), String.t(), String.t()) :: :ok | {:error, Error.t()} def delete_queue(%Client{} = client, account_id, queue_name) do case get_queue_url(client, account_id, queue_name) do - {:error, {:unexpected_response, %{body: body, status_code: 400}}} -> - if String.contains?(body, "The specified queue does not exist") do - {:error, Error.not_found(entity: :sqs_queue)} - else - {:error, Error.service(service: :aws_sqs, message: "Failed to delete queue", code: "400")} - end - {:ok, queue_url} -> - with {:ok, _, %{status_code: 200}} <- - AWS.SQS.delete_queue(client, %{"QueueUrl" => queue_url}) do + with {:ok, _, %{status_code: 200}} <- AWS.SQS.delete_queue(client, %{"QueueUrl" => queue_url}) do :ok end + + {:error, %Error.NotFoundError{} = error} -> + {:error, error} + + {:error, error} -> + {:error, Error.service(service: :aws_sqs, message: "Failed to delete queue", details: error)} end end diff --git a/lib/sequin/bench/end_to_end.ex b/lib/sequin/bench/end_to_end.ex index 94b6ae8a9..2658a046c 100644 --- a/lib/sequin/bench/end_to_end.ex +++ b/lib/sequin/bench/end_to_end.ex @@ -1,272 +1,275 @@ defmodule Sequin.Bench.EndToEnd do @moduledoc false - import Ecto.Query - - alias Sequin.Accounts - alias Sequin.Bench.Utils - alias Sequin.Consumers - alias Sequin.Repo - alias Sequin.Streams - - require Logger - - def run(opts \\ []) do - current_message_count = - Repo.one(from(m in Streams.Message, select: count(m.key)), timeout: :timer.minutes(1)) - - {account, opts} = - Keyword.pop_lazy(opts, :account, fn -> - List.first(Accounts.list_accounts()) - end) - - {stream, opts} = - Keyword.pop_lazy(opts, :stream, fn -> - account.id |> Streams.list_streams_for_account() |> List.first() - end) - - # stream.id - # |> Consumers.list_consumers_for_stream() - # |> Enum.each(&Consumers.delete_consumer_with_lifecycle/1) - - consumers = - Enum.map(1..10, fn _ -> - {:ok, consumer} = - Consumers.create_sink_consumer_for_account_with_lifecycle(account.id, %{ - stream_id: stream.id - }) - - consumer - end) - - consumer = List.first(consumers) - - default_opts = [ - 
max_time_s: 10, - warmup_time_s: 1, - parallel: [10, 50], - inputs: [ - {"batch_1", [1]}, - {"batch_10", [10]}, - {"batch_100", [100]} - ] - ] - - Sequin.Bench.run( - [ - {"e2e_0M_upsert", fn batch_size -> upsert_messages(stream.id, batch_size) end}, - {"e2e_1M_receive", fn batch_size -> receive_and_ack(consumer, batch_size) end, - before: fn _input -> - if current_message_count < 1_000_000 do - populate_messages(stream.id, 1_000_000) - else - Logger.info("Skipping, db already populated") - end - end}, - {"e2e_10M_receive", fn batch_size -> receive_and_ack(consumer, batch_size) end, - before: fn _input -> - if current_message_count < 8_000_000 do - populate_messages(stream.id, 10_000_000) - else - Logger.info("Skipping, db already populated") - end - end}, - {"e2e_10M_10C_receive", - fn batch_size -> - consumer = Enum.random(consumers) - receive_and_ack(consumer, batch_size) - end} - ], - Keyword.merge(default_opts, opts) - ) - end - - def run_throughput(opts \\ []) do - current_message_count = - Repo.one(from(m in Streams.Message, select: count(m.key)), timeout: :timer.minutes(1)) - - {account, opts} = - Keyword.pop_lazy(opts, :account, fn -> - List.first(Accounts.list_accounts()) - end) - - {stream, opts} = - Keyword.pop_lazy(opts, :stream, fn -> - account.id |> Streams.list_streams_for_account() |> List.first() - end) - - # stream.id - # |> Consumers.list_consumers_for_stream() - # |> Enum.each(&Consumers.delete_consumer_with_lifecycle/1) - - Repo.delete_all(from(m in Streams.Message, where: is_nil(m.seq))) - Repo.delete_all(from(m in Streams.ConsumerMessage)) - - consumers = - Enum.map(1..10, fn _ -> - {:ok, consumer} = - Consumers.create_sink_consumer_for_account_with_lifecycle(account.id, %{ - stream_id: stream.id - }) - - consumer - end) - - consumer = List.first(consumers) - - # Keep pressure on the system - this produces ~500 inserts/sec - # Task.Supervisor.async(Sequin.TaskSupervisor, fn -> - # # Create a stream that emits 10 items every second - # interval_stream = - # Stream.resource( - # fn -> 0 end, - # fn counter -> - # # 200ms interval for 5 ops/second - # Process.sleep(200) - # items = Enum.to_list(counter..(counter + 9)) - # {items, counter + 10} - # end, - # fn _ -> :ok end - # ) - - # interval_stream - # |> Flow.from_enumerable(max_demand: 10, stages: 10) - # |> Flow.partition(stages: 10, max_demand: 1) - # |> Flow.map(fn _ -> upsert_messages(stream.id, 100) end) - # |> Flow.run() - # end) - - default_opts = [ - max_time_s: 30, - warmup_time_s: 5, - parallel: [1, 10, 50, 100], - # parallel: [10, 50, 100], - inputs: [ - {"batch_1", [1]}, - {"batch_10", [10]}, - {"batch_100", [100]} - ] - ] - - {:ok, key_agent} = Agent.start_link(fn -> %{} end) - - Sequin.Bench.run( - [ - {"e2e_10M_throughput", - fn batch_size -> - upsert_and_await(stream.id, consumer, batch_size, key_agent) - end, - before: fn _input -> - if current_message_count < 8_000_000 do - # populate_messages(stream.id, 10_000_000) - else - Logger.info("Skipping, db already populated") - end - end}, - {"e2e_10M_throughput_10C", - fn batch_size -> - # Have this pid use a deterministic consumer - consumer = Enum.at(consumers, :erlang.phash2(self(), 10)) - - upsert_and_await(stream.id, consumer, batch_size, key_agent) - end, - before: fn _input -> - if current_message_count < 8_000_000 do - # populate_messages(stream.id, 10_000_000) - else - Logger.info("Skipping, db already populated") - end - end} - ], - Keyword.merge(default_opts, opts) - ) - - GenServer.stop(key_agent) - end - - def upsert_and_await(stream_id, 
consumer, batch_size, key_agent) do - messages = messages(stream_id, batch_size) - message = List.first(messages) - key = message.key - Agent.update(key_agent, fn keys -> Map.put(keys, key, true) end) - - upsert_messages(stream_id, messages) - - 1..1 - |> Stream.cycle() - |> Enum.reduce_while(nil, fn _, _ -> - messages = receive_and_ack(consumer, 100) - keys = Enum.map(messages, & &1.key) - Agent.update(key_agent, fn k -> Map.drop(k, keys) end) - - if Agent.get(key_agent, fn k -> Map.get(k, key) end) do - {:cont, :cont} - else - {:halt, :ok} - end - end) - end - - def upsert_messages(stream_id, messages) when is_list(messages) do - Streams.upsert_messages(stream_id, messages) - messages - end - - def upsert_messages(stream_id, batch_size) do - messages = - Enum.map(1..batch_size, fn _ -> - %{ - key: key(), - stream_id: stream_id, - data: Utils.rand_string() - } - end) - - upsert_messages(stream_id, messages) - end - - defp messages(stream_id, batch_size) do - Enum.map(1..batch_size, fn _ -> - %{ - key: key(), - stream_id: stream_id, - data: Utils.rand_string() - } - end) - end - - def receive_and_ack(consumer, batch_size) do - {:ok, messages} = Consumers.receive_for_consumer(consumer, batch_size: batch_size) - Streams.ack_messages(consumer.id, Enum.map(messages, & &1.ack_id)) - messages - end - - def key do - # Should be low enough entropy for high collisions - 30 |> :crypto.strong_rand_bytes() |> Base.encode64() - end - - defp populate_messages(stream_id, count) do - Logger.info("Seeding database with #{count} messages") - batch_size = 100_000 - num_batches = div(count, batch_size) - - Enum.each(1..num_batches, fn _ -> - query = """ - INSERT INTO #{Streams.stream_schema()}.messages (key, stream_id, data, data_hash, inserted_at, updated_at) - SELECT - encode(decode(substr(md5(random()::text || i::text), 1, 8), 'hex'), 'base64'), - $1, - substr(md5(random()::text), 1, 20), - substr(md5(random()::text), 1, 20), - now(), - now() - FROM generate_series(1, $2) i - ON CONFLICT (stream_id, key) DO NOTHING - """ - - Repo.query!(query, [Sequin.String.string_to_binary!(stream_id), batch_size], timeout: :timer.minutes(2)) - end) - - Logger.info("Seeded database with #{count} messages") - end + # import Ecto.Query + + # alias Sequin.Accounts + # alias Sequin.Bench.Utils + # alias Sequin.Consumers + # alias Sequin.Repo + # alias Sequin.Streams + + # require Logger + + def run(_opts \\ []), do: :ok + def run_throughput(_opts \\ []), do: :ok + + # def run(opts \\ []) do + # current_message_count = + # Repo.one(from(m in Streams.Message, select: count(m.key)), timeout: :timer.minutes(1)) + + # {account, opts} = + # Keyword.pop_lazy(opts, :account, fn -> + # List.first(Accounts.list_accounts()) + # end) + + # {stream, opts} = + # Keyword.pop_lazy(opts, :stream, fn -> + # account.id |> Streams.list_streams_for_account() |> List.first() + # end) + + # # stream.id + # # |> Consumers.list_consumers_for_stream() + # # |> Enum.each(&Consumers.delete_consumer_with_lifecycle/1) + + # consumers = + # Enum.map(1..10, fn _ -> + # {:ok, consumer} = + # Consumers.create_sink_consumer_for_account_with_lifecycle(account.id, %{ + # stream_id: stream.id + # }) + + # consumer + # end) + + # consumer = List.first(consumers) + + # default_opts = [ + # max_time_s: 10, + # warmup_time_s: 1, + # parallel: [10, 50], + # inputs: [ + # {"batch_1", [1]}, + # {"batch_10", [10]}, + # {"batch_100", [100]} + # ] + # ] + + # Sequin.Bench.run( + # [ + # {"e2e_0M_upsert", fn batch_size -> upsert_messages(stream.id, batch_size) end}, + # 
{"e2e_1M_receive", fn batch_size -> receive_and_ack(consumer, batch_size) end, + # before: fn _input -> + # if current_message_count < 1_000_000 do + # populate_messages(stream.id, 1_000_000) + # else + # Logger.info("Skipping, db already populated") + # end + # end}, + # {"e2e_10M_receive", fn batch_size -> receive_and_ack(consumer, batch_size) end, + # before: fn _input -> + # if current_message_count < 8_000_000 do + # populate_messages(stream.id, 10_000_000) + # else + # Logger.info("Skipping, db already populated") + # end + # end}, + # {"e2e_10M_10C_receive", + # fn batch_size -> + # consumer = Enum.random(consumers) + # receive_and_ack(consumer, batch_size) + # end} + # ], + # Keyword.merge(default_opts, opts) + # ) + # end + + # def run_throughput(opts \\ []) do + # current_message_count = + # Repo.one(from(m in Streams.Message, select: count(m.key)), timeout: :timer.minutes(1)) + + # {account, opts} = + # Keyword.pop_lazy(opts, :account, fn -> + # List.first(Accounts.list_accounts()) + # end) + + # {stream, opts} = + # Keyword.pop_lazy(opts, :stream, fn -> + # account.id |> Streams.list_streams_for_account() |> List.first() + # end) + + # # stream.id + # # |> Consumers.list_consumers_for_stream() + # # |> Enum.each(&Consumers.delete_consumer_with_lifecycle/1) + + # Repo.delete_all(from(m in Streams.Message, where: is_nil(m.seq))) + # Repo.delete_all(from(m in Streams.ConsumerMessage)) + + # consumers = + # Enum.map(1..10, fn _ -> + # {:ok, consumer} = + # Consumers.create_sink_consumer_for_account_with_lifecycle(account.id, %{ + # stream_id: stream.id + # }) + + # consumer + # end) + + # consumer = List.first(consumers) + + # # Keep pressure on the system - this produces ~500 inserts/sec + # # Task.Supervisor.async(Sequin.TaskSupervisor, fn -> + # # # Create a stream that emits 10 items every second + # # interval_stream = + # # Stream.resource( + # # fn -> 0 end, + # # fn counter -> + # # # 200ms interval for 5 ops/second + # # Process.sleep(200) + # # items = Enum.to_list(counter..(counter + 9)) + # # {items, counter + 10} + # # end, + # # fn _ -> :ok end + # # ) + + # # interval_stream + # # |> Flow.from_enumerable(max_demand: 10, stages: 10) + # # |> Flow.partition(stages: 10, max_demand: 1) + # # |> Flow.map(fn _ -> upsert_messages(stream.id, 100) end) + # # |> Flow.run() + # # end) + + # default_opts = [ + # max_time_s: 30, + # warmup_time_s: 5, + # parallel: [1, 10, 50, 100], + # # parallel: [10, 50, 100], + # inputs: [ + # {"batch_1", [1]}, + # {"batch_10", [10]}, + # {"batch_100", [100]} + # ] + # ] + + # {:ok, key_agent} = Agent.start_link(fn -> %{} end) + + # Sequin.Bench.run( + # [ + # {"e2e_10M_throughput", + # fn batch_size -> + # upsert_and_await(stream.id, consumer, batch_size, key_agent) + # end, + # before: fn _input -> + # if current_message_count < 8_000_000 do + # # populate_messages(stream.id, 10_000_000) + # else + # Logger.info("Skipping, db already populated") + # end + # end}, + # {"e2e_10M_throughput_10C", + # fn batch_size -> + # # Have this pid use a deterministic consumer + # consumer = Enum.at(consumers, :erlang.phash2(self(), 10)) + + # upsert_and_await(stream.id, consumer, batch_size, key_agent) + # end, + # before: fn _input -> + # if current_message_count < 8_000_000 do + # # populate_messages(stream.id, 10_000_000) + # else + # Logger.info("Skipping, db already populated") + # end + # end} + # ], + # Keyword.merge(default_opts, opts) + # ) + + # GenServer.stop(key_agent) + # end + + # def upsert_and_await(stream_id, consumer, batch_size, 
key_agent) do + # messages = messages(stream_id, batch_size) + # message = List.first(messages) + # key = message.key + # Agent.update(key_agent, fn keys -> Map.put(keys, key, true) end) + + # upsert_messages(stream_id, messages) + + # 1..1 + # |> Stream.cycle() + # |> Enum.reduce_while(nil, fn _, _ -> + # messages = receive_and_ack(consumer, 100) + # keys = Enum.map(messages, & &1.key) + # Agent.update(key_agent, fn k -> Map.drop(k, keys) end) + + # if Agent.get(key_agent, fn k -> Map.get(k, key) end) do + # {:cont, :cont} + # else + # {:halt, :ok} + # end + # end) + # end + + # def upsert_messages(stream_id, messages) when is_list(messages) do + # Streams.upsert_messages(stream_id, messages) + # messages + # end + + # def upsert_messages(stream_id, batch_size) do + # messages = + # Enum.map(1..batch_size, fn _ -> + # %{ + # key: key(), + # stream_id: stream_id, + # data: Utils.rand_string() + # } + # end) + + # upsert_messages(stream_id, messages) + # end + + # defp messages(stream_id, batch_size) do + # Enum.map(1..batch_size, fn _ -> + # %{ + # key: key(), + # stream_id: stream_id, + # data: Utils.rand_string() + # } + # end) + # end + + # def receive_and_ack(consumer, batch_size) do + # {:ok, messages} = Consumers.receive_for_consumer(consumer, batch_size: batch_size) + # Streams.ack_messages(consumer.id, Enum.map(messages, & &1.ack_id)) + # messages + # end + + # def key do + # # Should be low enough entropy for high collisions + # 30 |> :crypto.strong_rand_bytes() |> Base.encode64() + # end + + # defp populate_messages(stream_id, count) do + # Logger.info("Seeding database with #{count} messages") + # batch_size = 100_000 + # num_batches = div(count, batch_size) + + # Enum.each(1..num_batches, fn _ -> + # query = """ + # INSERT INTO #{Streams.stream_schema()}.messages (key, stream_id, data, data_hash, inserted_at, updated_at) + # SELECT + # encode(decode(substr(md5(random()::text || i::text), 1, 8), 'hex'), 'base64'), + # $1, + # substr(md5(random()::text), 1, 20), + # substr(md5(random()::text), 1, 20), + # now(), + # now() + # FROM generate_series(1, $2) i + # ON CONFLICT (stream_id, key) DO NOTHING + # """ + + # Repo.query!(query, [Sequin.String.string_to_binary!(stream_id), batch_size], timeout: :timer.minutes(2)) + # end) + + # Logger.info("Seeded database with #{count} messages") + # end end diff --git a/lib/sequin/check_system_health.ex b/lib/sequin/check_system_health.ex index 27e083e11..e0f6b6d19 100644 --- a/lib/sequin/check_system_health.ex +++ b/lib/sequin/check_system_health.ex @@ -13,7 +13,7 @@ defmodule Sequin.CheckSystemHealth do {:ok, "PONG"} <- Redis.command(["PING"]) do :ok else - {:error, %Error.ServiceError{service: :redis, code: :connection_error} = error} -> + {:error, %Error.ServiceError{service: :redis, code: "connection_error"} = error} -> {redis_url, _opts} = Application.get_env(:redix, :start_opts) %{host: redis_host, port: redis_port} = URI.parse(redis_url) @@ -29,7 +29,7 @@ defmodule Sequin.CheckSystemHealth do )} else {:error, %ValidationError{} = error} -> - {:error, Error.service(service: :redis, message: Exception.message(error), code: :tcp_reachability_error)} + {:error, Error.service(service: :redis, message: Exception.message(error), code: "tcp_reachability_error")} end {:error, %Postgrex.Error{} = error} -> @@ -52,10 +52,9 @@ defmodule Sequin.CheckSystemHealth do {:error, Error.service(service: :postgres, message: Exception.message(error))} end - error -> - Logger.error("Unknown error while checking system health: #{inspect(error)}") - message = 
if is_exception(error), do: Exception.message(error), else: inspect(error) - {:error, Error.service(service: :sequin, message: message, details: error)} + {:error, error} -> + Logger.error("Unknown error while checking system health: #{Exception.message(error)}") + {:error, Error.service(service: :sequin, message: Exception.message(error), details: error)} end end end diff --git a/lib/sequin/consumers/acknowledged_messages/acknowledged_messages.ex b/lib/sequin/consumers/acknowledged_messages/acknowledged_messages.ex index 9bb31792e..6178dd763 100644 --- a/lib/sequin/consumers/acknowledged_messages/acknowledged_messages.ex +++ b/lib/sequin/consumers/acknowledged_messages/acknowledged_messages.ex @@ -33,7 +33,6 @@ defmodule Sequin.Consumers.AcknowledgedMessages do commands |> Redis.pipeline() - |> handle_response() |> case do {:ok, _} -> :ok error -> error @@ -50,7 +49,6 @@ defmodule Sequin.Consumers.AcknowledgedMessages do ["ZREVRANGE", key, offset, offset + count - 1] |> Redis.command() - |> handle_response() |> case do {:ok, messages} -> {:ok, Enum.map(messages, &AcknowledgedMessage.decode/1)} error -> error @@ -64,20 +62,7 @@ defmodule Sequin.Consumers.AcknowledgedMessages do def count_messages(consumer_id) do key = "acknowledged_messages:#{consumer_id}" - ["ZCARD", key] - |> Redis.command() - |> handle_response() - end - - @spec handle_response(any()) :: {:ok, any()} | {:error, Error.t()} - defp handle_response({:ok, response}), do: {:ok, response} - - defp handle_response({:error, error}) when is_exception(error) do - {:error, Error.service(service: :redis, message: Exception.message(error))} - end - - defp handle_response({:error, error}) do - {:error, Error.service(service: :redis, message: "Redis error: #{inspect(error)}")} + Redis.command(["ZCARD", key]) end defp to_acknowledged_message(%ConsumerRecord{} = record) do diff --git a/lib/sequin/metrics/store.ex b/lib/sequin/metrics/store.ex index 494586967..962a9832c 100644 --- a/lib/sequin/metrics/store.ex +++ b/lib/sequin/metrics/store.ex @@ -2,23 +2,20 @@ defmodule Sequin.Metrics.Store do @moduledoc false alias Sequin.Redis + # Count functions def incr_count(key, amount \\ 1) do - ["INCRBY", "metrics:count:#{key}", amount] - |> Redis.command() - |> case do + case Redis.command(["INCRBY", "metrics:count:#{key}", amount]) do {:ok, _} -> :ok - error -> error + {:error, error} -> {:error, error} end end def get_count(key) do - ["GET", "metrics:count:#{key}"] - |> Redis.command() - |> case do + case Redis.command(["GET", "metrics:count:#{key}"]) do {:ok, nil} -> {:ok, 0} {:ok, value} -> {:ok, String.to_integer(value)} - error -> error + {:error, error} -> {:error, error} end end @@ -31,22 +28,20 @@ defmodule Sequin.Metrics.Store do |> Redis.pipeline() |> case do {:ok, _} -> :ok - error -> error + {:error, error} -> {:error, error} end end def get_avg(key) do - ["HMGET", "metrics:avg:#{key}", "total", "count"] - |> Redis.command() - |> case do + case Redis.command(["HMGET", "metrics:avg:#{key}", "total", "count"]) do {:ok, [total, count]} when is_binary(total) and is_binary(count) -> {:ok, String.to_integer(total) / String.to_integer(count)} {:ok, _} -> {:ok, nil} - error -> - error + {:error, error} -> + {:error, error} end end @@ -64,7 +59,7 @@ defmodule Sequin.Metrics.Store do |> Redis.pipeline() |> case do {:ok, _} -> :ok - error -> error + {:error, error} -> {:error, error} end end @@ -74,9 +69,7 @@ defmodule Sequin.Metrics.Store do buckets = Enum.to_list((now - @instant_throughput_window + 1)..now) commands = Enum.map(buckets, 
&["GET", "metrics:throughput:#{key}:#{&1}"]) - commands - |> Redis.pipeline() - |> case do + case Redis.pipeline(commands) do {:ok, results} -> sum = results @@ -85,8 +78,8 @@ defmodule Sequin.Metrics.Store do {:ok, sum / @instant_throughput_window} - error -> - error + {:error, error} -> + {:error, error} end end @@ -102,14 +95,12 @@ defmodule Sequin.Metrics.Store do buckets = Enum.to_list((most_recent_full_window - window_count + 1)..most_recent_full_window) commands = Enum.map(buckets, &["GET", "metrics:throughput:#{key}:#{&1}"]) - commands - |> Redis.pipeline() - |> case do + case Redis.pipeline(commands) do {:ok, results} -> {:ok, Enum.map(results, &String.to_integer(&1 || "0"))} - error -> - error + {:error, error} -> + {:error, error} end end end diff --git a/lib/sequin/redis.ex b/lib/sequin/redis.ex index bd3e2644c..4a638a9c9 100644 --- a/lib/sequin/redis.ex +++ b/lib/sequin/redis.ex @@ -11,9 +11,7 @@ defmodule Sequin.Redis do {Redix, {url, opts}} end - @spec command(Redix.command(), keyword()) :: - {:ok, Redix.Protocol.redis_value()} - | {:error, Error.t()} + @spec command(Redix.command(), keyword()) :: {:ok, Redix.Protocol.redis_value()} | {:error, Error.t()} def command(command, opts \\ []) do case Redix.command(__MODULE__, command, opts) do {:ok, value} -> {:ok, value} @@ -25,13 +23,11 @@ defmodule Sequin.Redis do def command!(command, opts \\ []) do case command(command, opts) do {:ok, value} -> value - {:error, error} -> raise to_sequin_error(error) + {:error, error} -> raise error end end - @spec pipeline([Redix.command()], keyword()) :: - {:ok, [Redix.Protocol.redis_value()]} - | {:error, Error.t()} + @spec pipeline([Redix.command()], keyword()) :: {:ok, [Redix.Protocol.redis_value()]} | {:error, Error.t()} def pipeline(commands, opts \\ []) do case Redix.pipeline(__MODULE__, commands, opts) do {:ok, values} -> {:ok, values} @@ -43,7 +39,7 @@ defmodule Sequin.Redis do Error.service( service: :redis, message: "Redis connection error: #{Exception.message(error)}", - code: :connection_error + code: "connection_error" ) end @@ -51,7 +47,7 @@ defmodule Sequin.Redis do Error.service( service: :redis, message: "Redis error: #{Exception.message(error)}", - code: :command_error + code: "command_error" ) end @@ -59,7 +55,7 @@ defmodule Sequin.Redis do Error.service( service: :redis, message: "Redis error: #{error}", - code: error + code: "command_error" ) end end diff --git a/test/sequin/metrics/store_test.exs b/test/sequin/metrics/store_test.exs index 97ca45439..c6f7cabb2 100644 --- a/test/sequin/metrics/store_test.exs +++ b/test/sequin/metrics/store_test.exs @@ -22,10 +22,10 @@ defmodule Sequin.Metrics.StoreTest do test "incr_avg", ctx do assert Store.incr_avg(ctx.key, 10) == :ok - assert Store.get_avg(ctx.key) == {:ok, 10} + assert Store.get_avg(ctx.key) == {:ok, 10.0} assert Store.incr_avg(ctx.key, 20) == :ok - assert Store.get_avg(ctx.key) == {:ok, 15} + assert Store.get_avg(ctx.key) == {:ok, 15.0} end end