Skip to content

Commit

Permalink
Add partial setup for Pow to use Redis
Browse files Browse the repository at this point in the history
  • Loading branch information
backspace committed Jul 21, 2024
1 parent 96b4550 commit 253b653
Show file tree
Hide file tree
Showing 4 changed files with 294 additions and 0 deletions.
2 changes: 2 additions & 0 deletions registrations/config/prod.exs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ config :registrations,
# Do not print debug messages in production
config :logger, level: :info

config :registrations, :pow, cache_store_backend: RegistrationsWeb.Pow.RedisCache

# ## SSL Support
#
# To get SSL working, you will need to add the `https` key
Expand Down
290 changes: 290 additions & 0 deletions registrations/lib/registrations_web/pow/redis_cache.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,290 @@
defmodule RegistrationsWeb.Pow.RedisCache do
@moduledoc """
Redis cache with optimized lookup.
To keep lookups performant the key is split up into n - 1 prefixes, where n
is the number of items in the key array. Each prefix will be combined with
all previous prefixes (if any) into a sorted set key. The rest of the key
will be combined into a member of this sorted set. The member of the sorted
set will have the expiration timestamp set as the score.
The sorted set will be set to expire with the provided `ttl`. When new
records are inserted any members of any of these prefix sorted sets with a
score score lower than the current timestamp will be removed.
When records are queried with `all/2`, the first matching `:_` wildcard in
the key match will define what prefix sorted set to use to find the keys. All
members with a lower score than the current timestamp will be removed, and
the sorted set for that prefix will be scanned. The resulting list is then
checked against the key match spec.
Any part of the key and value will be encoded with `:erlang.term_to_binary/1`
and `Base.url_encode64/2` when stored.
## Configuration options
* `:ttl` - integer value in milliseconds for ttl of records. If this value
is not provided, or is set to nil, the records will never expire.
* `:namespace` - value to use for namespacing keys. Defaults to "cache".
* `:writes` - set to `:async` to do asynchronous writes. Defaults to
`:sync`.
"""

@behaviour Pow.Store.Backend.Base

alias Pow.Config

@redix_instance_name :redix

@impl true
def put(config, record_or_records) do
ttl = Config.get(config, :ttl) || raise_ttl_error!()

commands =
record_or_records
|> List.wrap()
|> Enum.reduce([], fn {key, value}, acc ->
config
|> redis_key(key)
|> command_builder(&put_command(&1, value, ttl))
|> flatten(acc)
end)

maybe_async(config, fn ->
@redix_instance_name
|> Redix.pipeline!(commands)
|> Enum.zip(commands)
|> Enum.reject(fn
{n, _cmd} when is_number(n) or n == "OK" -> true
_any -> false
end)
|> case do
[] -> :ok
errors -> raise "Redix received unexpected response #{inspect errors}"
end
end)

:ok
end

defp put_command({key, []}, value, ttl) do
key = to_binary_redis_key(key)
value = :erlang.term_to_binary(value)

["SET", key, value, "PX", ttl]
end
defp put_command({prefix, key}, _value, ttl) do
key = to_binary_redis_key(key)
timestamp = current_timestamp()

index_key =
prefix
|> to_binary_redis_key()
|> to_index_key()

[
["ZREMRANGEBYSCORE", index_key, "-inf", timestamp],
["ZADD", index_key, timestamp + ttl, key],
["PEXPIRE", index_key, ttl]
]
end

defp current_timestamp, do: DateTime.to_unix(DateTime.utc_now(), :millisecond)

defp command_builder(key, fun) do
count = Enum.count(key)

1..count
|> Enum.map(fn i ->
key
|> Enum.split(i)
|> fun.()
end)
|> Enum.reduce([], &flatten/2)
end

defp flatten([item | _rest] = items, acc) when is_list(item) do
Enum.reduce(items, acc, &flatten/2)
end
defp flatten(item, acc), do: acc ++ [item]

defp maybe_async(config, fun) do
case Config.get(config, :writes, :sync) do
:sync -> fun.()
:async -> Task.start(fun)
end
end

@impl true
def delete(config, key) do
commands =
config
|> redis_key(key)
|> command_builder(&delete_command/1)

maybe_async(config, fn ->
Redix.pipeline!(@redix_instance_name, commands)
end)

:ok
end

def delete_command({prefix, []}) do
prefix = to_binary_redis_key(prefix)

["DEL", prefix]
end
def delete_command({prefix, key}) do
index_key =
prefix
|> to_binary_redis_key()
|> to_index_key()

key = to_binary_redis_key(key)

["ZREM", index_key, key]
end

@impl true
def get(config, key) do
key =
config
|> redis_key(key)
|> to_binary_redis_key()

case Redix.command!(@redix_instance_name, ["GET", key]) do
nil -> :not_found
value -> :erlang.binary_to_term(value)
end
end

@impl true
def all(config, key_match) do
compiled_match_spec = :ets.match_spec_compile([{{key_match, :_}, [], [:"$_"]}])
key_match = redis_key(config, key_match)

prefix =
key_match
|> Enum.find_index(& &1 == :_)
|> case do
nil -> {redis_key(config, []), key_match}
i -> Enum.split(key_match, i)
end
|> elem(0)

index_key =
prefix
|> to_binary_redis_key()
|> to_index_key()

Redix.command!(@redix_instance_name, ["ZREMRANGEBYSCORE", index_key, "-inf", current_timestamp()])

Stream.resource(
fn -> do_scan(config, prefix, compiled_match_spec, "0") end,
&stream_scan(config, prefix, compiled_match_spec, &1),
fn _ -> :ok end)
|> Enum.to_list()
end

defp to_index_key(key), do: "_index:#{key}"

defp stream_scan(_config, _prefix, _compiled_match_spec, {[], "0"}), do: {:halt, nil}
defp stream_scan(config, prefix, compiled_match_spec, {[], iterator}) do
result = do_scan(config, prefix, compiled_match_spec, iterator)

stream_scan(config, prefix, compiled_match_spec, result)
end
defp stream_scan(_config, _prefix, _compiled_match_spec, {keys, iterator}), do: {keys, {[], iterator}}

defp do_scan(config, prefix, compiled_match_spec, iterator) do
prefix = to_binary_redis_key(prefix)
index_key = to_index_key(prefix)

[iterator, res] = Redix.command!(@redix_instance_name, ["ZSCAN", index_key, iterator])

keys = Enum.take_every(res, 2)

{filter_or_load_value(compiled_match_spec, prefix, keys, config), iterator}
end

defp filter_or_load_value(compiled_match_spec, prefix, keys, config) do
keys
|> Enum.map(&"#{prefix}:#{&1}")
|> Enum.map(&convert_key/1)
|> :ets.match_spec_run(compiled_match_spec)
|> populate_values(config)
end

defp convert_key(key) do
key =
key
|> from_binary_redis_key()
|> unwrap()

{key, nil}
end

defp unwrap([_namespace, key]), do: key
defp unwrap([_namespace | key]), do: key

defp populate_values([], _config), do: []
defp populate_values(records, config) do
binary_keys = Enum.map(records, fn {key, nil} -> binary_redis_key(config, key) end)

values =
@redix_instance_name
|> Redix.command!(["MGET"] ++ binary_keys)
|> Enum.map(fn
nil -> nil
value -> :erlang.binary_to_term(value)
end)

records
|> zip_values(values)
|> Enum.reject(fn {_key, value} -> is_nil(value) end)
end

defp zip_values([{key, nil} | next1], [value | next2]) do
[{key, value} | zip_values(next1, next2)]
end
defp zip_values(_, []), do: []
defp zip_values([], _), do: []

defp binary_redis_key(config, key) do
config
|> redis_key(key)
|> to_binary_redis_key()
end

defp redis_key(config, key) do
[namespace(config) | List.wrap(key)]
end

defp namespace(config), do: Config.get(config, :namespace, "cache")

defp to_binary_redis_key(key) do
key
|> Enum.map(fn part ->
part
|> :erlang.term_to_binary()
|> Base.url_encode64(padding: false)
end)
|> Enum.join(":")
end

defp from_binary_redis_key(key) do
key
|> String.split(":")
|> Enum.map(fn part ->
part
|> Base.url_decode64!(padding: false)
|> :erlang.binary_to_term()
end)
end

@spec raise_ttl_error! :: no_return()
defp raise_ttl_error!,
do: Config.raise_error("`:ttl` configuration option is required for #{inspect(__MODULE__)}")
end
1 change: 1 addition & 0 deletions registrations/mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ defmodule Registrations.Mixfile do
{:plug, "~> 1.7"},
{:pow, "~> 1.0.28"},
{:pow_assent, "~> 0.4.15"},
{:redix, "~> 0.9.2"},
{:mix_test_watch, "~> 1.0", only: [:dev, :test], runtime: false},
{:hound, github: "backspace/hound", ref: "malgasm-plus-warning-fixes", only: :test},
{:ex_machina, "~> 2.7.0", only: :test},
Expand Down
1 change: 1 addition & 0 deletions registrations/mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
"pow_assent": {:hex, :pow_assent, "0.4.15", "57a9c3daf6ddbef289239a30cbbb67697e7634e88b08e0698d094620aa50852e", [:mix], [{:assent, "~> 0.1.2 or ~> 0.2.0", [hex: :assent, repo: "hexpm", optional: false]}, {:ecto, "~> 2.2 or ~> 3.0", [hex: :ecto, repo: "hexpm", optional: false]}, {:phoenix, ">= 1.3.0 and < 1.8.0", [hex: :phoenix, repo: "hexpm", optional: false]}, {:phoenix_html, ">= 2.0.0 and <= 4.0.0", [hex: :phoenix_html, repo: "hexpm", optional: false]}, {:plug, ">= 1.5.0 and < 2.0.0", [hex: :plug, repo: "hexpm", optional: true]}, {:pow, "~> 1.0.27", [hex: :pow, repo: "hexpm", optional: false]}], "hexpm", "2083bdbfdd5ef661aea531325ce59919932caa4225c6b050615712e8fa258a2d"},
"premailex": {:hex, :premailex, "0.3.19", "c26ff9c712c08e574d1792f2cfed638e7c7a5e353b5a4db7a40487c8130fa37c", [:mix], [{:certifi, ">= 0.0.0", [hex: :certifi, repo: "hexpm", optional: true]}, {:floki, "~> 0.19", [hex: :floki, repo: "hexpm", optional: false]}, {:meeseeks, "~> 0.11", [hex: :meeseeks, repo: "hexpm", optional: true]}, {:ssl_verify_fun, ">= 0.0.0", [hex: :ssl_verify_fun, repo: "hexpm", optional: true]}], "hexpm", "18f3772f4b30ffe82f670c2714b2d3221eb9face69e38bb1674b6e570b7c0aff"},
"ranch": {:hex, :ranch, "1.8.0", "8c7a100a139fd57f17327b6413e4167ac559fbc04ca7448e9be9057311597a1d", [:make, :rebar3], [], "hexpm", "49fbcfd3682fab1f5d109351b61257676da1a2fdbe295904176d5e521a2ddfe5"},
"redix": {:hex, :redix, "0.9.3", "c51d66657018a9b14f3b4496b1ba70ef6a16e8414cd19b34635242ac371beb95", [:mix], [], "hexpm", "aaf66448a6daac9ebaf310407a9edee99c65a0c4b39b16e67676cb5884228a13"},
"ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.7", "354c321cf377240c7b8716899e182ce4890c5938111a1296add3ec74cf1715df", [:make, :mix, :rebar3], [], "hexpm", "fe4c190e8f37401d30167c8c405eda19469f34577987c76dde613e838bbc67f8"},
"swoosh": {:hex, :swoosh, "1.14.2", "cf686f92ad3b21e6651b20c50eeb1781f581dc7097ef6251b4d322a9f1d19339", [:mix], [{:cowboy, "~> 1.1 or ~> 2.4", [hex: :cowboy, repo: "hexpm", optional: true]}, {:ex_aws, "~> 2.1", [hex: :ex_aws, repo: "hexpm", optional: true]}, {:finch, "~> 0.6", [hex: :finch, repo: "hexpm", optional: true]}, {:gen_smtp, "~> 0.13 or ~> 1.0", [hex: :gen_smtp, repo: "hexpm", optional: true]}, {:hackney, "~> 1.9", [hex: :hackney, repo: "hexpm", optional: true]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}, {:mail, "~> 0.2", [hex: :mail, repo: "hexpm", optional: true]}, {:mime, "~> 1.1 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:plug, "~> 1.9", [hex: :plug, repo: "hexpm", optional: true]}, {:plug_cowboy, ">= 1.0.0", [hex: :plug_cowboy, repo: "hexpm", optional: true]}, {:req, "~> 0.4 or ~> 1.0", [hex: :req, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4.2 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "01d8fae72930a0b5c1bb9725df0408602ed8c5c3d59dc6e7a39c57b723cd1065"},
"telemetry": {:hex, :telemetry, "1.2.1", "68fdfe8d8f05a8428483a97d7aab2f268aaff24b49e0f599faa091f1d4e7f61c", [:rebar3], [], "hexpm", "dad9ce9d8effc621708f99eac538ef1cbe05d6a874dd741de2e689c47feafed5"},
Expand Down

0 comments on commit 253b653

Please sign in to comment.