Skip to content

Commit

Permalink
feat: added support for op_compressed (zlib and zstd)
Browse files Browse the repository at this point in the history
  • Loading branch information
zookzook committed Aug 23, 2024
1 parent 84732f7 commit cf32c87
Show file tree
Hide file tree
Showing 12 changed files with 266 additions and 58 deletions.
55 changes: 32 additions & 23 deletions lib/mongo.ex
Original file line number Diff line number Diff line change
Expand Up @@ -442,10 +442,9 @@ defmodule Mongo do
|> filter_nils()

opts =
Keyword.drop(
opts,
~w(explain allow_disk_use collation bypass_document_validation hint comment read_concern)a
)
opts
|> Keyword.drop(~w(explain allow_disk_use collation bypass_document_validation hint comment read_concern)a)
|> Keyword.put_new(:compression, true)

get_stream(topology_pid, cmd, opts)
end
Expand Down Expand Up @@ -496,10 +495,9 @@ defmodule Mongo do
|> filter_nils()

opts =
Keyword.drop(
opts,
~w(bypass_document_validation max_time projection return_document sort upsert collation w j wtimeout)a
)
opts
|> Keyword.drop(~w(bypass_document_validation max_time projection return_document sort upsert collation w j wtimeout)a)
|> Keyword.put_new(:compression, true)

with {:ok, doc} <- issue_command(topology_pid, cmd, :write, opts) do
{:ok,
Expand Down Expand Up @@ -559,10 +557,9 @@ defmodule Mongo do
|> filter_nils()

opts =
Keyword.drop(
opts,
~w(bypass_document_validation max_time projection return_document sort upsert collation)a
)
opts
|> Keyword.drop(~w(bypass_document_validation max_time projection return_document sort upsert collation)a)
|> Keyword.put_new(:compression, true)

with {:ok, doc} <- issue_command(topology_pid, cmd, :write, opts) do
{:ok,
Expand Down Expand Up @@ -607,7 +604,10 @@ defmodule Mongo do
]
|> filter_nils()

opts = Keyword.drop(opts, ~w(max_time projection sort collation)a)
opts =
opts
|> Keyword.drop(~w(max_time projection sort collation)a)
|> Keyword.put_new(:compression, true)

with {:ok, doc} <- issue_command(topology_pid, cmd, :write, opts), do: {:ok, doc["value"]}
end
Expand All @@ -626,7 +626,10 @@ defmodule Mongo do
]
|> filter_nils()

opts = Keyword.drop(opts, ~w(limit skip hint collation)a)
opts =
opts
|> Keyword.drop(~w(limit skip hint collation)a)
|> Keyword.put_new(:compression, true)

with {:ok, doc} <- issue_command(topology_pid, cmd, :read, opts) do
{:ok, trunc(doc["n"])}
Expand Down Expand Up @@ -721,7 +724,10 @@ defmodule Mongo do
]
|> filter_nils()

opts = Keyword.drop(opts, ~w(max_time)a)
opts =
opts
|> Keyword.drop(~w(max_time)a)
|> Keyword.put_new(:compression, true)

with {:ok, doc} <- issue_command(topology_pid, cmd, :read, opts), do: {:ok, doc["values"]}
end
Expand Down Expand Up @@ -783,7 +789,10 @@ defmodule Mongo do

drop = ~w(limit hint single_batch read_concern max min collation return_key show_record_id tailable no_cursor_timeout await_data projection comment skip sort)a

opts = Keyword.drop(opts, drop)
opts =
opts
|> Keyword.drop(drop)
|> Keyword.put_new(:compression, true)

try do
get_stream(topology_pid, cmd, opts)
Expand Down Expand Up @@ -858,7 +867,7 @@ defmodule Mongo do
]
|> filter_nils()

with {:ok, doc} <- issue_command(topology_pid, cmd, :write, opts) do
with {:ok, doc} <- issue_command(topology_pid, cmd, :write, Keyword.put_new(opts, :compression, true)) do
case doc do
%{"writeErrors" => _} ->
{:error, %Mongo.WriteError{n: doc["n"], ok: doc["ok"], write_errors: doc["writeErrors"]}}
Expand Down Expand Up @@ -913,7 +922,7 @@ defmodule Mongo do
]
|> filter_nils()

case issue_command(topology_pid, cmd, :write, opts) do
case issue_command(topology_pid, cmd, :write, Keyword.put_new(opts, :compression, true)) do
{:ok, %{"writeErrors" => _} = doc} ->
{:error, %Mongo.WriteError{n: doc["n"], ok: doc["ok"], write_errors: doc["writeErrors"]}}

Expand Down Expand Up @@ -989,7 +998,7 @@ defmodule Mongo do
]
|> filter_nils()

with {:ok, doc} <- issue_command(topology_pid, cmd, :write, opts) do
with {:ok, doc} <- issue_command(topology_pid, cmd, :write, Keyword.put_new(opts, :compression, true)) do
case doc do
%{"writeErrors" => _} ->
{:error, %Mongo.WriteError{n: doc["n"], ok: doc["ok"], write_errors: doc["writeErrors"]}}
Expand Down Expand Up @@ -1171,7 +1180,7 @@ defmodule Mongo do
]
|> filter_nils()

with {:ok, doc} <- issue_command(topology_pid, cmd, :write, opts) do
with {:ok, doc} <- issue_command(topology_pid, cmd, :write, Keyword.put_new(opts, :compression, true)) do
case doc do
%{"writeErrors" => write_errors} ->
{:error, %Mongo.WriteError{n: doc["n"], ok: doc["ok"], write_errors: write_errors}}
Expand Down Expand Up @@ -1243,7 +1252,7 @@ defmodule Mongo do
]
|> filter_nils()

with {:ok, doc} <- issue_command(topology_pid, cmd, :write, opts) do
with {:ok, doc} <- issue_command(topology_pid, cmd, :write, Keyword.put_new(opts, :compression, true)) do
case doc do
%{"writeErrors" => _} ->
{:error, %Mongo.WriteError{n: doc["n"], ok: doc["ok"], write_errors: doc["writeErrors"]}}
Expand Down Expand Up @@ -1300,7 +1309,7 @@ defmodule Mongo do
Mongo.limits(top)
{:ok, %{
compression: nil,
compression: [],
logical_session_timeout: 30,
max_bson_object_size: 16777216,
max_message_size_bytes: 48000000,
Expand Down Expand Up @@ -1381,7 +1390,7 @@ defmodule Mongo do
]
|> filter_nils()

with {:ok, _doc} <- issue_command(topology_pid, cmd, :write, opts) do
with {:ok, _doc} <- issue_command(topology_pid, cmd, :write, Keyword.put_new(opts, :compression, true)) do
:ok
end
end
Expand Down
4 changes: 4 additions & 0 deletions lib/mongo/binary_utils.ex
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ defmodule Mongo.BinaryUtils do
quote do: signed - little - 8
end

defmacro uint8 do
quote do: unsigned - little - 8
end

defmacro float64 do
quote do: float - little - 64
end
Expand Down
45 changes: 45 additions & 0 deletions lib/mongo/compressor.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
defmodule Mongo.Compressor do
@moduledoc false

@zlib_compressor_id 2
if Code.ensure_loaded?(:ezstd) do
@zstd_compressor_id 3
end

def compress(binary, :zlib) do
{@zlib_compressor_id, :zlib.compress(binary)}
end

if Code.ensure_loaded?(:ezstd) do
def compress(binary, :zstd) when is_binary(binary) do
{@zstd_compressor_id, :ezstd.compress(binary)}
end

def compress(iodata, :zstd) when is_list(iodata) do
{@zstd_compressor_id,
iodata
|> IO.iodata_to_binary()
|> :ezstd.compress()}
end
end

def uncompress(binary, @zlib_compressor_id) do
:zlib.uncompress(binary)
end

if Code.ensure_loaded?(:ezstd) do
def uncompress(binary, @zstd_compressor_id) do
:ezstd.decompress(binary)
end
end

def uncompress(binary, :zlib) do
:zlib.uncompress(binary)
end

if Code.ensure_loaded?(:ezstd) do
def uncompress(binary, :zstd) do
:ezstd.decompress(binary)
end
end
end
38 changes: 37 additions & 1 deletion lib/mongo/messages.ex
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,12 @@ defmodule Mongo.Messages do
import Record
import Mongo.BinaryUtils

alias Mongo.Compressor

@op_reply 1
@op_query 2004
@op_msg_code 2013
@op_compressed 2012

@query_flags [
tailable_cursor: 0x2,
Expand Down Expand Up @@ -46,6 +49,7 @@ defmodule Mongo.Messages do
defrecord :payload, [:doc, :sequence]
defrecord :section, [:payload_type, :payload]
defrecord :op_msg, [:flags, :sections]
defrecord :op_msg_compressed, [:flags, :sections, :compressor]

@decoder_module Application.compile_env(:mongodb_driver, :decoder, BSON.Decoder)

Expand Down Expand Up @@ -84,7 +88,10 @@ defmodule Mongo.Messages do
@op_msg_code ->
{:ok, response_to, decode_msg(response), rest}

_ ->
@op_compressed ->
decode_compression(response_to, binary)

_error ->
:error
end
end
Expand Down Expand Up @@ -137,6 +144,22 @@ defmodule Mongo.Messages do
end
end

defp decode_compression(response_to, binary) do
<<original_opcode::int32(), uncompressed_size::int32(), compressor_id::uint8(), compressed::binary>> = binary
<<response::binary(uncompressed_size), rest::binary>> = Compressor.uncompress(compressed, compressor_id)

case original_opcode do
@op_reply ->
{:ok, response_to, decode_reply(response), rest}

@op_msg_code ->
{:ok, response_to, decode_msg(response), rest}

_error ->
:error
end
end

defp cstring(binary) do
split(binary, [])
end
Expand All @@ -161,6 +184,15 @@ defmodule Mongo.Messages do
[encode_header(header) | iodata]
end

def encode(request_id, op_msg_compressed(compressor: compressor) = op) do
payload = encode_op(op)
uncompressed_size = IO.iodata_length(payload)
{compressor_id, compressed_payload} = Compressor.compress(payload, compressor)
iodata = [<<@op_msg_code::int32()>>, <<uncompressed_size::int32()>>, <<compressor_id::uint8()>> | compressed_payload]
header = msg_header(length: IO.iodata_length(iodata) + @header_size, request_id: request_id, response_to: 0, op_code: @op_compressed)
[encode_header(header) | iodata]
end

defp encode_header(msg_header(length: length, request_id: request_id, response_to: response_to, op_code: op_code)) do
<<length::int32(), request_id::int32(), response_to::int32(), op_code::int32()>>
end
Expand All @@ -169,6 +201,10 @@ defmodule Mongo.Messages do
[<<blit_flags(:query, flags)::int32()>>, coll, <<0x00, num_skip::int32(), num_return::int32()>>, BSON.Encoder.document(query), select]
end

defp encode_op(op_msg_compressed(flags: flags, sections: sections)) do
[<<blit_flags(:msg, flags)::int32()>> | encode_sections(sections)]
end

defp encode_op(op_msg(flags: flags, sections: sections)) do
[<<blit_flags(:msg, flags)::int32()>> | encode_sections(sections)]
end
Expand Down
32 changes: 23 additions & 9 deletions lib/mongo/mongo_db_connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ defmodule Mongo.MongoDBConnection do
end

defp hand_shake(opts, state) do
cmd = handshake_command(state, client(opts[:appname] || "elixir-driver"))
cmd = handshake_command(state, client(opts[:appname] || "elixir-driver"), Keyword.get(opts, :compressors, []))

case Utils.command(-1, cmd, state) do
{:ok, _flags, %{"ok" => ok, "maxWireVersion" => version} = response} when ok == 1 ->
Expand Down Expand Up @@ -342,14 +342,22 @@ defmodule Mongo.MongoDBConnection do
defp send_command({:command, cmd}, opts, %{use_op_msg: true} = state) do
{command_name, data} = provide_cmd_data(cmd)
db = opts[:database] || state.database
compressor = opts[:compressor]
cmd = cmd ++ ["$db": db]
flags = Keyword.get(opts, :flags, 0x0)

# MongoDB 3.6 only allows certain command arguments to be provided this way. These are:
op =
case pulling_out?(cmd, :documents) || pulling_out?(cmd, :updates) || pulling_out?(cmd, :deletes) do
nil -> op_msg(flags: flags, sections: [section(payload_type: 0, payload: payload(doc: cmd))])
key -> pulling_out(cmd, flags, key)
nil ->
if compressor != nil do
op_msg_compressed(flags: flags, sections: [section(payload_type: 0, payload: payload(doc: cmd))], compressor: compressor)
else
op_msg(flags: flags, sections: [section(payload_type: 0, payload: payload(doc: cmd))])
end

key ->
pulling_out(cmd, flags, key, compressor)
end

# overwrite temporary timeout by timeout option
Expand Down Expand Up @@ -427,14 +435,20 @@ defmodule Mongo.MongoDBConnection do
end
end

defp pulling_out(cmd, flags, key) when is_atom(key) do
defp pulling_out(cmd, flags, key, compressor) when is_atom(key) do
docs = Keyword.get(cmd, key)
cmd = Keyword.delete(cmd, key)

payload_0 = section(payload_type: 0, payload: payload(doc: cmd))
payload_1 = section(payload_type: 1, payload: payload(sequence: sequence(identifier: to_string(key), docs: docs)))

op_msg(flags: flags, sections: [payload_0, payload_1])
case compressor != nil do
false ->
op_msg(flags: flags, sections: [payload_0, payload_1])

true ->
op_msg_compressed(flags: flags, sections: [payload_0, payload_1], compressor: compressor)
end
end

defp flags(flags) do
Expand All @@ -444,12 +458,12 @@ defmodule Mongo.MongoDBConnection do
end)
end

defp handshake_command(%{stable_api: nil}, client) do
[ismaster: 1, helloOk: true, client: client]
defp handshake_command(%{stable_api: nil}, client, compression) do
[ismaster: 1, helloOk: true, client: client, compression: compression]
end

defp handshake_command(%{stable_api: stable_api}, client) do
[client: client]
defp handshake_command(%{stable_api: stable_api}, client, compression) do
[client: client, compression: compression]
|> StableVersion.merge_stable_api(stable_api)
|> Keyword.put(:hello, 1)
end
Expand Down
Loading

0 comments on commit cf32c87

Please sign in to comment.