Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 47 additions & 13 deletions lib/memcache.ex
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,21 @@ defmodule Memcache do
"""

@type error :: {:error, binary | atom}
@type result :: {:ok} | {:ok, integer} | {:ok, any} | {:ok, any, integer} | error

@type fetch_result :: {:ok, any} | {:ok, any, integer} | error
@type result ::
{:ok}
| {:ok, integer}
| {:ok, any}
| {:ok, any, keyword}
| {:ok, any, integer}
| {:ok, any, integer, keyword}
| error

@type fetch_result ::
{:ok, any}
| {:ok, any, keyword}
| {:ok, any, integer}
| {:ok, any, integer, keyword}
| error

@type fetch_integer_result :: {:ok, integer} | {:ok, integer, integer} | error

Expand Down Expand Up @@ -544,26 +556,47 @@ defmodule Memcache do
apply(elem(coder, 0), :encode, [value, elem(coder, 1)])
end

defp decode(server_options, value) do
defp encoder_flags(server_options, value) do
coder = server_options.coder
apply(elem(coder, 0), :decode, [value, elem(coder, 1)])
apply(elem(coder, 0), :encode_flags, [value, elem(coder, 1)])
end

defp decode_response({:ok, value}, server_options) when is_binary(value) do
{:ok, decode(server_options, value)}
defp opts_with_flags(opts, encoder_flags) do
given_flags = Keyword.get(opts, :flags, [])
Keyword.put(opts, :flags, merge_flags(encoder_flags, given_flags))
end

defp decode_response({:ok, value, cas}, server_options) when is_binary(value) do
{:ok, decode(server_options, value), cas}
defp merge_flags(encoder_flags, []), do: encoder_flags
defp merge_flags(_encoder_flags, [_head | []] = given_flags), do: given_flags

defp decode(server_options, {value, flags}) do
coder = server_options.coder
module = elem(coder, 0)
coder_options = elem(coder, 1) ++ [flags: flags]
apply(module, :decode, [value, coder_options])
end

defp decode_response(rest, _server_options), do: rest
defp decode_response({:ok, value, flags}, server_options)
when is_binary(value) and is_list(flags) do
{:ok, decode(server_options, {value, flags})}
end

defp decode_response({:ok, value, cas, flags}, server_options)
when is_binary(value) and is_list(flags) do
{:ok, decode(server_options, {value, flags}), cas}
end

defp decode_response(rest, _server_options) do
rest
end

defp decode_multi_response({:ok, values}, server_options) when is_list(values) do
{:ok, Enum.map(values, &decode_response(&1, server_options))}
end

defp decode_multi_response(rest, _server_options), do: rest
defp decode_multi_response(rest, _server_options) do
rest
end

defp ttl_or_default(server_options, opts) do
if Keyword.has_key?(opts, :ttl) do
Expand Down Expand Up @@ -604,7 +637,7 @@ defmodule Memcache do
|> execute(
command,
[key_with_namespace(server_options, key) | [encode(server_options, value) | rest]],
opts
opts_with_flags(opts, encoder_flags(server_options, value))
)
|> decode_response(server_options)
end
Expand All @@ -631,7 +664,8 @@ defmodule Memcache do
commands =
Enum.map(commands, fn {command, [key | [value | rest]], opts} ->
{command,
[key_with_namespace(server_options, key) | [encode(server_options, value) | rest]], opts}
[key_with_namespace(server_options, key) | [encode(server_options, value) | rest]],
opts_with_flags(opts, encoder_flags(server_options, value))}
end)

server
Expand Down
12 changes: 12 additions & 0 deletions lib/memcache/binary_utils.ex
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,12 @@ defmodule Memcache.BinaryUtils do
AUTH_STEP: 0x22
]

# http://www.hjp.at/zettel/m/memcached_flags.rxml
@flags [
serialised: 0x1,
compressed: 0x2
]

defmacro opb(x) do
quote do
<<unquote(Keyword.fetch!(@ops, x))>>
Expand All @@ -46,6 +52,12 @@ defmodule Memcache.BinaryUtils do
end
end

def flag_bit(x) do
Keyword.fetch!(@flags, x)
end

def flags, do: @flags

defmodule Header do
@moduledoc false

Expand Down
37 changes: 37 additions & 0 deletions lib/memcache/coder.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,46 @@ defmodule Memcache.Coder do
"""
@callback encode(any, options :: Keyword.t()) :: iodata

@doc """
Called before the value is sent to the server. It should return
the flags to set by default for this coder as a list

Valid flags:
- :serialised
- :compressed

Example:
iex> value = %{}
iex> options = [flags: [:compressed]]
iex> encode_flags(value, options)
[:compressed, :serialised]

iex> encode_flags(value)
[:serialised]
"""
@callback encode_flags(any, options :: Keyword.t()) :: list(atom)

@doc """
Called after the value is loaded from the server. It can return any
type.
"""
@callback decode(iodata, options :: Keyword.t()) :: any

defmacro __using__(_opts) do
quote do
@behaviour unquote(__MODULE__)

def encode_flags(value, options) do
flags = options[:flags]

if !is_nil(flags) do
flags
else
[]
end
end

defoverridable encode_flags: 2
end
end
end
2 changes: 1 addition & 1 deletion lib/memcache/coder/erlang.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ defmodule Memcache.Coder.Erlang do
Uses `:erlang.term_to_binary/2` and `:erlang.binary_to_term/1` to
encode and decode value.
"""
@behaviour Memcache.Coder
use Memcache.Coder

def encode(value, options), do: :erlang.term_to_binary(value, options)
def decode(value, _options), do: :erlang.binary_to_term(value)
Expand Down
3 changes: 2 additions & 1 deletion lib/memcache/coder/json.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@ if Code.ensure_loaded?(Poison) do
Uses the `Poison` module to encode and decode value. To use this
coder add `poison` as a dependency in `mix.exs`.
"""
@behaviour Memcache.Coder
use Memcache.Coder

def encode(value, options), do: Poison.encode_to_iodata!(value, options)
def encode_flags(_value, _options), do: [:serialised]
def decode(value, options), do: Poison.decode!(value, options)
end
end
2 changes: 1 addition & 1 deletion lib/memcache/coder/raw.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ defmodule Memcache.Coder.Raw do
@moduledoc """
Doesn't do any conversion. Stores the value as it is in the server.
"""
@behaviour Memcache.Coder
use Memcache.Coder

def encode(value, _options), do: value
def decode(value, _options), do: value
Expand Down
2 changes: 1 addition & 1 deletion lib/memcache/coder/zip.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ defmodule Memcache.Coder.ZIP do
Uses `:zlib.zip/1` and `:zlib.unzip/1` to compress and decompress
value.
"""
@behaviour Memcache.Coder
use Memcache.Coder

def encode(value, _options), do: :zlib.zip(value)
def decode(value, _options), do: :zlib.unzip(value)
Expand Down
71 changes: 64 additions & 7 deletions lib/memcache/connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ defmodule Memcache.Connection do
"""
require Logger
use Connection
use Bitwise
import Memcache.BinaryUtils
alias Memcache.Protocol
alias Memcache.Receiver
alias Memcache.Utils
Expand Down Expand Up @@ -90,7 +92,17 @@ defmodule Memcache.Connection do
"""
@spec execute(GenServer.server(), atom, [binary], Keyword.t()) :: Memcache.result()
def execute(pid, command, args, options \\ []) do
Connection.call(pid, {:execute, command, args, %{cas: Keyword.get(options, :cas, false)}})
flags =
options
|> Keyword.get(:flags, [])
|> translate_flags()

opts =
%{}
|> Map.put(:cas, Keyword.get(options, :cas, false))
|> Map.put(:flags, flags)

Connection.call(pid, {:execute, command, args, opts})
end

@doc """
Expand All @@ -107,6 +119,7 @@ defmodule Memcache.Connection do
@spec execute_quiet(GenServer.server(), [{atom, [binary]} | {atom, [binary], Keyword.t()}]) ::
{:ok, [Memcache.result()]} | {:error, atom}
def execute_quiet(pid, commands) do
commands = normalise_flags(commands)
Connection.call(pid, {:execute_quiet, commands})
end

Expand Down Expand Up @@ -260,6 +273,27 @@ defmodule Memcache.Connection do
:ok
end

@spec normalise_flags([{atom, [binary]} | {atom, [binary], Keyword.t()}]) :: [
{atom, [binary]} | {atom, [binary], Keyword.t()}
]
defp normalise_flags(commands) do
Enum.map(commands, fn
{_command, _args} = command ->
command

{command, args, opts} ->
opts = Keyword.update(opts, :flags, 0, &translate_flags/1)
{command, args, opts}
end)
end

@spec translate_flags(list(atom)) :: pos_integer
defp translate_flags(flags) do
Enum.reduce(flags, 0x0, fn flag, flag_bits ->
flag_bit(flag) ||| flag_bits
end)
end

defp maybe_activate_sock(state) do
if Enum.empty?(state.receiver_queue) do
case :inet.setopts(state.sock, active: :once) do
Expand All @@ -283,7 +317,7 @@ defmodule Memcache.Connection do
end

defp send_and_receive(%State{sock: sock} = s, from, command, args, opts) do
packet = serialize(command, args)
packet = serialize(command, args, opts)

case :gen_tcp.send(sock, packet) do
:ok ->
Expand All @@ -298,7 +332,7 @@ defmodule Memcache.Connection do

defp send_and_receive_quiet(%State{sock: sock} = s, from, commands) do
{packet, commands, i} = Enum.reduce(commands, {[], [], 1}, &accumulate_commands/2)
packet = [packet | serialize(:NOOP, [], i)]
packet = [packet | serialize(:NOOP, [], %{}, i)]

case :gen_tcp.send(sock, packet) do
:ok ->
Expand All @@ -317,12 +351,12 @@ defmodule Memcache.Connection do
end

defp accumulate_commands({command, args}, {packet, commands, i}) do
{[packet | serialize(command, args, i)], [{i, command, args, %{cas: false}} | commands],
{[packet | serialize(command, args, %{}, i)], [{i, command, args, %{cas: false}} | commands],
i + 1}
end

defp accumulate_commands({command, args, options}, {packet, commands, i}) do
{[packet | serialize(command, args, i)],
{[packet | serialize(command, args, options, i)],
[{i, command, args, %{cas: Keyword.get(options, :cas, false)}} | commands], i + 1}
end

Expand Down Expand Up @@ -362,7 +396,7 @@ defmodule Memcache.Connection do
end

defp execute_command(sock, command, args) do
packet = serialize(command, args)
packet = serialize(command, args, %{})

case :gen_tcp.send(sock, packet) do
:ok -> recv_response(sock, command)
Expand Down Expand Up @@ -413,7 +447,30 @@ defmodule Memcache.Connection do
end
end

defp serialize(command, args, opaque \\ 0) do
@default_expiry 0
@default_cas 0
@flag_commands [:SET, :SETQ, :ADD, :ADDQ, :REPLACE, :REPLACEQ]
defp serialize(command, args, opts, opaque \\ 0)

defp serialize(command, args, opts, opaque) when command in @flag_commands do
opts = Map.new(opts)
flags = Map.get(opts, :flags, 0)

args =
case length(args) do
2 -> args ++ [@default_expiry, @default_cas, flags]
3 -> args ++ [@default_cas, flags]
4 -> args ++ [flags]
end

do_serialize(command, args, opaque)
end

defp serialize(command, args, _opts, opaque) do
do_serialize(command, args, opaque)
end

defp do_serialize(command, args, opaque) do
apply(Protocol, :to_binary, [command | [opaque | args]])
end
end
Loading