From cf32c8739ef88175a0c914bd2ef96419f13d992f Mon Sep 17 00:00:00 2001 From: zookzook Date: Fri, 23 Aug 2024 14:23:58 +0200 Subject: [PATCH] feat: added support for op_compressed (zlib and zstd) --- lib/mongo.ex | 55 +++++++++++++++++------------- lib/mongo/binary_utils.ex | 4 +++ lib/mongo/compressor.ex | 45 ++++++++++++++++++++++++ lib/mongo/messages.ex | 38 ++++++++++++++++++++- lib/mongo/mongo_db_connection.ex | 32 ++++++++++++----- lib/mongo/server_description.ex | 36 ++++++++++++++++--- lib/mongo/session.ex | 28 +++++++++++++-- lib/mongo/topology_description.ex | 35 +++++++++++++------ lib/mongo/url_parser.ex | 13 +++++++ mix.exs | 3 +- mix.lock | 14 ++++++++ test/support/topology_test_data.ex | 21 ++++++++---- 12 files changed, 266 insertions(+), 58 deletions(-) create mode 100644 lib/mongo/compressor.ex diff --git a/lib/mongo.ex b/lib/mongo.ex index 71cb167d..2ef69435 100644 --- a/lib/mongo.ex +++ b/lib/mongo.ex @@ -442,10 +442,9 @@ defmodule Mongo do |> filter_nils() opts = - Keyword.drop( - opts, - ~w(explain allow_disk_use collation bypass_document_validation hint comment read_concern)a - ) + opts + |> Keyword.drop(~w(explain allow_disk_use collation bypass_document_validation hint comment read_concern)a) + |> Keyword.put_new(:compression, true) get_stream(topology_pid, cmd, opts) end @@ -496,10 +495,9 @@ defmodule Mongo do |> filter_nils() opts = - Keyword.drop( - opts, - ~w(bypass_document_validation max_time projection return_document sort upsert collation w j wtimeout)a - ) + opts + |> Keyword.drop(~w(bypass_document_validation max_time projection return_document sort upsert collation w j wtimeout)a) + |> Keyword.put_new(:compression, true) with {:ok, doc} <- issue_command(topology_pid, cmd, :write, opts) do {:ok, @@ -559,10 +557,9 @@ defmodule Mongo do |> filter_nils() opts = - Keyword.drop( - opts, - ~w(bypass_document_validation max_time projection return_document sort upsert collation)a - ) + opts + |> Keyword.drop(~w(bypass_document_validation max_time projection return_document sort upsert collation)a) + |> Keyword.put_new(:compression, true) with {:ok, doc} <- issue_command(topology_pid, cmd, :write, opts) do {:ok, @@ -607,7 +604,10 @@ defmodule Mongo do ] |> filter_nils() - opts = Keyword.drop(opts, ~w(max_time projection sort collation)a) + opts = + opts + |> Keyword.drop(~w(max_time projection sort collation)a) + |> Keyword.put_new(:compression, true) with {:ok, doc} <- issue_command(topology_pid, cmd, :write, opts), do: {:ok, doc["value"]} end @@ -626,7 +626,10 @@ defmodule Mongo do ] |> filter_nils() - opts = Keyword.drop(opts, ~w(limit skip hint collation)a) + opts = + opts + |> Keyword.drop(~w(limit skip hint collation)a) + |> Keyword.put_new(:compression, true) with {:ok, doc} <- issue_command(topology_pid, cmd, :read, opts) do {:ok, trunc(doc["n"])} @@ -721,7 +724,10 @@ defmodule Mongo do ] |> filter_nils() - opts = Keyword.drop(opts, ~w(max_time)a) + opts = + opts + |> Keyword.drop(~w(max_time)a) + |> Keyword.put_new(:compression, true) with {:ok, doc} <- issue_command(topology_pid, cmd, :read, opts), do: {:ok, doc["values"]} end @@ -783,7 +789,10 @@ defmodule Mongo do drop = ~w(limit hint single_batch read_concern max min collation return_key show_record_id tailable no_cursor_timeout await_data projection comment skip sort)a - opts = Keyword.drop(opts, drop) + opts = + opts + |> Keyword.drop(drop) + |> Keyword.put_new(:compression, true) try do get_stream(topology_pid, cmd, opts) @@ -858,7 +867,7 @@ defmodule Mongo do ] |> filter_nils() - with {:ok, doc} <- issue_command(topology_pid, cmd, :write, opts) do + with {:ok, doc} <- issue_command(topology_pid, cmd, :write, Keyword.put_new(opts, :compression, true)) do case doc do %{"writeErrors" => _} -> {:error, %Mongo.WriteError{n: doc["n"], ok: doc["ok"], write_errors: doc["writeErrors"]}} @@ -913,7 +922,7 @@ defmodule Mongo do ] |> filter_nils() - case issue_command(topology_pid, cmd, :write, opts) do + case issue_command(topology_pid, cmd, :write, Keyword.put_new(opts, :compression, true)) do {:ok, %{"writeErrors" => _} = doc} -> {:error, %Mongo.WriteError{n: doc["n"], ok: doc["ok"], write_errors: doc["writeErrors"]}} @@ -989,7 +998,7 @@ defmodule Mongo do ] |> filter_nils() - with {:ok, doc} <- issue_command(topology_pid, cmd, :write, opts) do + with {:ok, doc} <- issue_command(topology_pid, cmd, :write, Keyword.put_new(opts, :compression, true)) do case doc do %{"writeErrors" => _} -> {:error, %Mongo.WriteError{n: doc["n"], ok: doc["ok"], write_errors: doc["writeErrors"]}} @@ -1171,7 +1180,7 @@ defmodule Mongo do ] |> filter_nils() - with {:ok, doc} <- issue_command(topology_pid, cmd, :write, opts) do + with {:ok, doc} <- issue_command(topology_pid, cmd, :write, Keyword.put_new(opts, :compression, true)) do case doc do %{"writeErrors" => write_errors} -> {:error, %Mongo.WriteError{n: doc["n"], ok: doc["ok"], write_errors: write_errors}} @@ -1243,7 +1252,7 @@ defmodule Mongo do ] |> filter_nils() - with {:ok, doc} <- issue_command(topology_pid, cmd, :write, opts) do + with {:ok, doc} <- issue_command(topology_pid, cmd, :write, Keyword.put_new(opts, :compression, true)) do case doc do %{"writeErrors" => _} -> {:error, %Mongo.WriteError{n: doc["n"], ok: doc["ok"], write_errors: doc["writeErrors"]}} @@ -1300,7 +1309,7 @@ defmodule Mongo do Mongo.limits(top) {:ok, %{ - compression: nil, + compression: [], logical_session_timeout: 30, max_bson_object_size: 16777216, max_message_size_bytes: 48000000, @@ -1381,7 +1390,7 @@ defmodule Mongo do ] |> filter_nils() - with {:ok, _doc} <- issue_command(topology_pid, cmd, :write, opts) do + with {:ok, _doc} <- issue_command(topology_pid, cmd, :write, Keyword.put_new(opts, :compression, true)) do :ok end end diff --git a/lib/mongo/binary_utils.ex b/lib/mongo/binary_utils.ex index 58dee2c5..0c6beb20 100644 --- a/lib/mongo/binary_utils.ex +++ b/lib/mongo/binary_utils.ex @@ -21,6 +21,10 @@ defmodule Mongo.BinaryUtils do quote do: signed - little - 8 end + defmacro uint8 do + quote do: unsigned - little - 8 + end + defmacro float64 do quote do: float - little - 64 end diff --git a/lib/mongo/compressor.ex b/lib/mongo/compressor.ex new file mode 100644 index 00000000..9b8af6cf --- /dev/null +++ b/lib/mongo/compressor.ex @@ -0,0 +1,45 @@ +defmodule Mongo.Compressor do + @moduledoc false + + @zlib_compressor_id 2 + if Code.ensure_loaded?(:ezstd) do + @zstd_compressor_id 3 + end + + def compress(binary, :zlib) do + {@zlib_compressor_id, :zlib.compress(binary)} + end + + if Code.ensure_loaded?(:ezstd) do + def compress(binary, :zstd) when is_binary(binary) do + {@zstd_compressor_id, :ezstd.compress(binary)} + end + + def compress(iodata, :zstd) when is_list(iodata) do + {@zstd_compressor_id, + iodata + |> IO.iodata_to_binary() + |> :ezstd.compress()} + end + end + + def uncompress(binary, @zlib_compressor_id) do + :zlib.uncompress(binary) + end + + if Code.ensure_loaded?(:ezstd) do + def uncompress(binary, @zstd_compressor_id) do + :ezstd.decompress(binary) + end + end + + def uncompress(binary, :zlib) do + :zlib.uncompress(binary) + end + + if Code.ensure_loaded?(:ezstd) do + def uncompress(binary, :zstd) do + :ezstd.decompress(binary) + end + end +end diff --git a/lib/mongo/messages.ex b/lib/mongo/messages.ex index b5ed10df..786a3bf1 100644 --- a/lib/mongo/messages.ex +++ b/lib/mongo/messages.ex @@ -14,9 +14,12 @@ defmodule Mongo.Messages do import Record import Mongo.BinaryUtils + alias Mongo.Compressor + @op_reply 1 @op_query 2004 @op_msg_code 2013 + @op_compressed 2012 @query_flags [ tailable_cursor: 0x2, @@ -46,6 +49,7 @@ defmodule Mongo.Messages do defrecord :payload, [:doc, :sequence] defrecord :section, [:payload_type, :payload] defrecord :op_msg, [:flags, :sections] + defrecord :op_msg_compressed, [:flags, :sections, :compressor] @decoder_module Application.compile_env(:mongodb_driver, :decoder, BSON.Decoder) @@ -84,7 +88,10 @@ defmodule Mongo.Messages do @op_msg_code -> {:ok, response_to, decode_msg(response), rest} - _ -> + @op_compressed -> + decode_compression(response_to, binary) + + _error -> :error end end @@ -137,6 +144,22 @@ defmodule Mongo.Messages do end end + defp decode_compression(response_to, binary) do + <> = binary + <> = Compressor.uncompress(compressed, compressor_id) + + case original_opcode do + @op_reply -> + {:ok, response_to, decode_reply(response), rest} + + @op_msg_code -> + {:ok, response_to, decode_msg(response), rest} + + _error -> + :error + end + end + defp cstring(binary) do split(binary, []) end @@ -161,6 +184,15 @@ defmodule Mongo.Messages do [encode_header(header) | iodata] end + def encode(request_id, op_msg_compressed(compressor: compressor) = op) do + payload = encode_op(op) + uncompressed_size = IO.iodata_length(payload) + {compressor_id, compressed_payload} = Compressor.compress(payload, compressor) + iodata = [<<@op_msg_code::int32()>>, <>, <> | compressed_payload] + header = msg_header(length: IO.iodata_length(iodata) + @header_size, request_id: request_id, response_to: 0, op_code: @op_compressed) + [encode_header(header) | iodata] + end + defp encode_header(msg_header(length: length, request_id: request_id, response_to: response_to, op_code: op_code)) do <> end @@ -169,6 +201,10 @@ defmodule Mongo.Messages do [<>, coll, <<0x00, num_skip::int32(), num_return::int32()>>, BSON.Encoder.document(query), select] end + defp encode_op(op_msg_compressed(flags: flags, sections: sections)) do + [<> | encode_sections(sections)] + end + defp encode_op(op_msg(flags: flags, sections: sections)) do [<> | encode_sections(sections)] end diff --git a/lib/mongo/mongo_db_connection.ex b/lib/mongo/mongo_db_connection.ex index 7b6f4977..276cd74c 100644 --- a/lib/mongo/mongo_db_connection.ex +++ b/lib/mongo/mongo_db_connection.ex @@ -213,7 +213,7 @@ defmodule Mongo.MongoDBConnection do end defp hand_shake(opts, state) do - cmd = handshake_command(state, client(opts[:appname] || "elixir-driver")) + cmd = handshake_command(state, client(opts[:appname] || "elixir-driver"), Keyword.get(opts, :compressors, [])) case Utils.command(-1, cmd, state) do {:ok, _flags, %{"ok" => ok, "maxWireVersion" => version} = response} when ok == 1 -> @@ -342,14 +342,22 @@ defmodule Mongo.MongoDBConnection do defp send_command({:command, cmd}, opts, %{use_op_msg: true} = state) do {command_name, data} = provide_cmd_data(cmd) db = opts[:database] || state.database + compressor = opts[:compressor] cmd = cmd ++ ["$db": db] flags = Keyword.get(opts, :flags, 0x0) # MongoDB 3.6 only allows certain command arguments to be provided this way. These are: op = case pulling_out?(cmd, :documents) || pulling_out?(cmd, :updates) || pulling_out?(cmd, :deletes) do - nil -> op_msg(flags: flags, sections: [section(payload_type: 0, payload: payload(doc: cmd))]) - key -> pulling_out(cmd, flags, key) + nil -> + if compressor != nil do + op_msg_compressed(flags: flags, sections: [section(payload_type: 0, payload: payload(doc: cmd))], compressor: compressor) + else + op_msg(flags: flags, sections: [section(payload_type: 0, payload: payload(doc: cmd))]) + end + + key -> + pulling_out(cmd, flags, key, compressor) end # overwrite temporary timeout by timeout option @@ -427,14 +435,20 @@ defmodule Mongo.MongoDBConnection do end end - defp pulling_out(cmd, flags, key) when is_atom(key) do + defp pulling_out(cmd, flags, key, compressor) when is_atom(key) do docs = Keyword.get(cmd, key) cmd = Keyword.delete(cmd, key) payload_0 = section(payload_type: 0, payload: payload(doc: cmd)) payload_1 = section(payload_type: 1, payload: payload(sequence: sequence(identifier: to_string(key), docs: docs))) - op_msg(flags: flags, sections: [payload_0, payload_1]) + case compressor != nil do + false -> + op_msg(flags: flags, sections: [payload_0, payload_1]) + + true -> + op_msg_compressed(flags: flags, sections: [payload_0, payload_1], compressor: compressor) + end end defp flags(flags) do @@ -444,12 +458,12 @@ defmodule Mongo.MongoDBConnection do end) end - defp handshake_command(%{stable_api: nil}, client) do - [ismaster: 1, helloOk: true, client: client] + defp handshake_command(%{stable_api: nil}, client, compression) do + [ismaster: 1, helloOk: true, client: client, compression: compression] end - defp handshake_command(%{stable_api: stable_api}, client) do - [client: client] + defp handshake_command(%{stable_api: stable_api}, client, compression) do + [client: client, compression: compression] |> StableVersion.merge_stable_api(stable_api) |> Keyword.put(:hello, 1) end diff --git a/lib/mongo/server_description.ex b/lib/mongo/server_description.ex index 2565874b..ece27c17 100644 --- a/lib/mongo/server_description.ex +++ b/lib/mongo/server_description.ex @@ -8,6 +8,14 @@ defmodule Mongo.ServerDescription do # see https://github.com/mongodb/specifications/blob/master/source/server-discovery-and-monitoring/server-discovery-and-monitoring.rst#serverdescription @type type :: :standalone | :mongos | :possible_primary | :rs_primary | :rs_secondary | :rs_arbiter | :rs_other | :rs_ghost | :unknown + if Code.ensure_loaded?(:ezstd) do + @type compressor_types :: :zlib | :zstd + @support_compressors ["zlib", "zstd"] + else + @type compressor_types :: :zlib + @support_compressors ["zlib"] + end + @type t :: %{ address: String.t() | nil, error: String.t() | nil, @@ -30,7 +38,7 @@ defmodule Mongo.ServerDescription do max_bson_object_size: non_neg_integer, max_message_size_bytes: non_neg_integer, max_write_batch_size: non_neg_integer, - compression: String.t() | nil, + compression: [compressor_types], read_only: boolean(), logical_session_timeout: non_neg_integer, supports_retryable_writes: boolean() @@ -58,7 +66,7 @@ defmodule Mongo.ServerDescription do max_bson_object_size: 16_777_216, max_message_size_bytes: 48_000_000, max_write_batch_size: 100_000, - compression: nil, + compression: [], read_only: false, logical_session_timeout: 30, support_retryable_writes: false @@ -104,7 +112,7 @@ defmodule Mongo.ServerDescription do max_bson_object_size: hello_response["maxBsonObjectSize"] || 16_777_216, max_message_size_bytes: hello_response["maxMessageSizeBytes"] || 48_000_000, max_write_batch_size: hello_response["maxWriteBatchSize"] || 100_000, - compression: hello_response["compression"], + compression: map_compressors(hello_response["compression"]), read_only: hello_response["readOnly"] || false, logical_session_timeout: hello_response["logicalSessionTimeoutMinutes"] || 30, supports_retryable_writes: supports_retryable_writes @@ -136,13 +144,23 @@ defmodule Mongo.ServerDescription do max_bson_object_size: hello_response["maxBsonObjectSize"] || 16_777_216, max_message_size_bytes: hello_response["maxMessageSizeBytes"] || 48_000_000, max_write_batch_size: hello_response["maxWriteBatchSize"] || 100_000, - compression: hello_response["compression"], + compression: map_compressors(hello_response["compression"]), read_only: hello_response["readOnly"] || false, logical_session_timeout: hello_response["logicalSessionTimeoutMinutes"] || 30, supports_retryable_writes: server_type != :standalone && max_wire_version >= @retryable_wire_version && hello_response["logicalSessionTimeoutMinutes"] != nil } end + defp map_compressors(nil) do + [] + end + + defp map_compressors(compressors) do + compressors + |> Enum.filter(fn compressor -> compressor in @support_compressors end) + |> Enum.map(fn compressor -> String.to_existing_atom(compressor) end) + end + # see https://github.com/mongodb/specifications/blob/master/source/server-discovery-and-monitoring/server-discovery-and-monitoring.rst#type defp determine_server_type(%{"ok" => n}) when n != 1, do: :unknown defp determine_server_type(%{"msg" => "isdbgrid"}), do: :mongos @@ -159,4 +177,14 @@ defmodule Mongo.ServerDescription do end defp determine_server_type(_), do: :standalone + + if Code.ensure_loaded?(:ezstd) do + def support_compressors() do + [:zstd, :zlib] + end + else + def support_compressors() do + [:zlib] + end + end end diff --git a/lib/mongo/session.ex b/lib/mongo/session.ex index 72a7c627..a1ea4149 100644 --- a/lib/mongo/session.ex +++ b/lib/mongo/session.ex @@ -481,7 +481,11 @@ defmodule Mongo.Session do |> ReadPreference.add_read_preference(opts) |> filter_nils() - client_opts = merge_timeout(client_opts, opts) + client_opts = + client_opts + |> merge_timeout(opts) + |> merge_compression(opts) + {:keep_state_and_data, {:ok, conn, cmd, client_opts}} end @@ -497,7 +501,11 @@ defmodule Mongo.Session do |> filter_nils() |> Keyword.drop(~w(writeConcern)a) - client_opts = merge_timeout(client_opts, opts) + client_opts = + client_opts + |> merge_timeout(opts) + |> merge_compression(opts) + {:next_state, :transaction_in_progress, {:ok, conn, cmd, client_opts}} end @@ -510,7 +518,11 @@ defmodule Mongo.Session do ) |> Keyword.drop(~w(writeConcern readConcern)a) - client_opts = merge_timeout(client_opts, opts) + client_opts = + client_opts + |> merge_timeout(opts) + |> merge_compression(opts) + {:keep_state_and_data, {:ok, conn, result, client_opts}} end @@ -727,4 +739,14 @@ defmodule Mongo.Session do Keyword.put_new(opts, :timeout, timeout) end end + + defp merge_compression(opts, default_ops) do + case Keyword.get(default_ops, :compressor) do + nil -> + opts + + compressor -> + Keyword.put(opts, :compressor, compressor) + end + end end diff --git a/lib/mongo/topology_description.ex b/lib/mongo/topology_description.ex index bc40b798..4d4ed1a4 100644 --- a/lib/mongo/topology_description.ex +++ b/lib/mongo/topology_description.ex @@ -101,19 +101,19 @@ defmodule Mongo.TopologyDescription do [] end - addr = + server = servers ## only valid servers |> Enum.filter(fn {_, %{type: type}} -> type != :unknown end) - |> Enum.map(fn {server, _} -> server end) + |> Enum.map(fn {server, description} -> {server, description.compression} end) |> Enum.take_random(1) - case addr do + case server do [] -> :empty - [result] -> - {:ok, {result, opts}} + [{addr, compression}] -> + {:ok, {addr, merge_compression(opts, compression)}} end end @@ -147,18 +147,31 @@ defmodule Mongo.TopologyDescription do Keyword.put(opts, :read_preference, prefs) end - addr = + server = servers |> Enum.take_random(1) - |> Enum.map(fn {server, _} -> server end) + |> Enum.map(fn {server, description} -> {server, description.compression} end) - # check now three possible cases - case addr do + case server do [] -> :empty - [result] -> - {:ok, {result, opts}} + [{addr, compression}] -> + {:ok, {addr, merge_compression(opts, compression)}} + end + end + + def merge_compression(opts, []) do + opts + end + + def merge_compression(opts, [compressor | _xs]) do + case Keyword.get(opts, :compression, false) do + true -> + Keyword.put(opts, :compressor, compressor) + + false -> + opts end end diff --git a/lib/mongo/url_parser.ex b/lib/mongo/url_parser.ex index 41cd6b71..05454a36 100644 --- a/lib/mongo/url_parser.ex +++ b/lib/mongo/url_parser.ex @@ -10,6 +10,12 @@ defmodule Mongo.UrlParser do @mongo_url_regex ~r/^mongodb(?\+srv)?:\/\/(?:(?[^:]+):(?[^@]+)@)?(?[^\/\?]+)(?:\/(?[^?]*)?(?:\?(?(?:[^\s=]+=[^\s&]*)+))?)?$/ + if Code.ensure_loaded?(:ezstd) do + @compressors ["zstd", "zlib"] + else + @compressors ["zlib"] + end + # https://docs.mongodb.com/manual/reference/connection-string/#connections-connection-options @mongo_options %{ # Path options @@ -48,6 +54,7 @@ defmodule Mongo.UrlParser do "heartbeatFrequencyMS" => :number, "retryWrites" => ["true", "false"], "tls" => ["true", "false"], + "compressors" => @compressors, "uuidRepresentation" => ["standard", "csharpLegacy", "javaLegacy", "pythonLegacy"], # Elixir Driver options "type" => ["unknown", "single", "replicaSetNoPrimary", "sharded"] @@ -61,6 +68,12 @@ defmodule Mongo.UrlParser do defp parse_option_value(_key, ""), do: nil + defp parse_option_value("compressors", values) do + values + |> String.split(",") + |> Enum.filter(fn compressor -> compressor in @compressors end) + end + defp parse_option_value(key, value) do case @mongo_options[key] do :number -> diff --git a/mix.exs b/mix.exs index 8df6ee7d..1ce8d2ed 100644 --- a/mix.exs +++ b/mix.exs @@ -37,7 +37,8 @@ defmodule Mongodb.Mixfile do {:patch, "~> 0.12.0", only: [:dev, :test]}, {:jason, "~> 1.3", only: [:dev, :test]}, {:credo, "~> 1.7.0", only: [:dev, :test], runtime: false}, - {:ex_doc, "== 0.24.1", only: :dev, runtime: false} + {:ex_doc, "== 0.24.1", only: :dev, runtime: false}, + {:ezstd, "~> 1.1", optional: true} ] end diff --git a/mix.lock b/mix.lock index 3dcb94ec..de3b50a1 100644 --- a/mix.lock +++ b/mix.lock @@ -1,16 +1,30 @@ %{ "bunt": {:hex, :bunt, "1.0.0", "081c2c665f086849e6d57900292b3a161727ab40431219529f13c4ddcf3e7a44", [:mix], [], "hexpm", "dc5f86aa08a5f6fa6b8096f0735c4e76d54ae5c9fa2c143e5a1fc7c1cd9bb6b5"}, + "castore": {:hex, :castore, "1.0.8", "dedcf20ea746694647f883590b82d9e96014057aff1d44d03ec90f36a5c0dc6e", [:mix], [], "hexpm", "0b2b66d2ee742cb1d9cb8c8be3b43c3a70ee8651f37b75a8b982e036752983f1"}, "credo": {:hex, :credo, "1.7.4", "68ca5cf89071511c12fd9919eb84e388d231121988f6932756596195ccf7fd35", [:mix], [{:bunt, "~> 0.2.1 or ~> 1.0", [hex: :bunt, repo: "hexpm", optional: false]}, {:file_system, "~> 0.2 or ~> 1.0", [hex: :file_system, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "9cf776d062c78bbe0f0de1ecaee183f18f2c3ec591326107989b054b7dddefc2"}, "db_connection": {:hex, :db_connection, "2.6.0", "77d835c472b5b67fc4f29556dee74bf511bbafecdcaf98c27d27fa5918152086", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "c2f992d15725e721ec7fbc1189d4ecdb8afef76648c746a8e1cad35e3b8a35f3"}, "decimal": {:hex, :decimal, "2.1.1", "5611dca5d4b2c3dd497dec8f68751f1f1a54755e8ed2a966c2633cf885973ad6", [:mix], [], "hexpm", "53cfe5f497ed0e7771ae1a475575603d77425099ba5faef9394932b35020ffcc"}, "earmark_parser": {:hex, :earmark_parser, "1.4.39", "424642f8335b05bb9eb611aa1564c148a8ee35c9c8a8bba6e129d51a3e3c6769", [:mix], [], "hexpm", "06553a88d1f1846da9ef066b87b57c6f605552cfbe40d20bd8d59cc6bde41944"}, "ex_doc": {:hex, :ex_doc, "0.24.1", "15673de99154f93ca7f05900e4e4155ced1ee0cd34e0caeee567900a616871a4", [:mix], [{:earmark_parser, "~> 1.4.0", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "07972f17bdf7dc7b5bd76ec97b556b26178ed3f056e7ec9288eb7cea7f91cce2"}, + "ex_zstd": {:hex, :ex_zstd, "0.1.0", "4b1b5ebd7c0417e69308db8cdd478b9adb3e2d1a03b6e7366cf0a9aadeae11af", [:make, :mix], [{:ex_doc, ">= 0.0.0", [hex: :ex_doc, repo: "hexpm", optional: false]}], "hexpm", "2c9542a5c088e0eab14aa9b10d18bc084a6060ecf09025bbfc5b08684568bc67"}, + "ezstd": {:hex, :ezstd, "1.1.0", "d3b483d6acfadfb65dba4015371e6d54526dbf3d9ef0941b5add8bf5890731f4", [:rebar3], [], "hexpm", "28cfa0ed6cc3922095ad5ba0f23392a1664273358b17184baa909868361184e7"}, "file_system": {:hex, :file_system, "1.0.0", "b689cc7dcee665f774de94b5a832e578bd7963c8e637ef940cd44327db7de2cd", [:mix], [], "hexpm", "6752092d66aec5a10e662aefeed8ddb9531d79db0bc145bb8c40325ca1d8536d"}, + "finch": {:hex, :finch, "0.18.0", "944ac7d34d0bd2ac8998f79f7a811b21d87d911e77a786bc5810adb75632ada4", [:mix], [{:castore, "~> 0.1 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: false]}, {:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:mint, "~> 1.3", [hex: :mint, repo: "hexpm", optional: false]}, {:nimble_options, "~> 0.4 or ~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:nimble_pool, "~> 0.2.6 or ~> 1.0", [hex: :nimble_pool, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "69f5045b042e531e53edc2574f15e25e735b522c37e2ddb766e15b979e03aa65"}, + "hpax": {:hex, :hpax, "1.0.0", "28dcf54509fe2152a3d040e4e3df5b265dcb6cb532029ecbacf4ce52caea3fd2", [:mix], [], "hexpm", "7f1314731d711e2ca5fdc7fd361296593fc2542570b3105595bb0bc6d0fad601"}, "jason": {:hex, :jason, "1.4.1", "af1504e35f629ddcdd6addb3513c3853991f694921b1b9368b0bd32beb9f1b63", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "fbb01ecdfd565b56261302f7e1fcc27c4fb8f32d56eab74db621fc154604a7a1"}, "makeup": {:hex, :makeup, "1.1.1", "fa0bc768698053b2b3869fa8a62616501ff9d11a562f3ce39580d60860c3a55e", [:mix], [{:nimble_parsec, "~> 1.2.2 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "5dc62fbdd0de44de194898b6710692490be74baa02d9d108bc29f007783b0b48"}, "makeup_elixir": {:hex, :makeup_elixir, "0.16.1", "cc9e3ca312f1cfeccc572b37a09980287e243648108384b97ff2b76e505c3555", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "e127a341ad1b209bd80f7bd1620a15693a9908ed780c3b763bccf7d200c767c6"}, "makeup_erlang": {:hex, :makeup_erlang, "0.1.4", "29563475afa9b8a2add1b7a9c8fb68d06ca7737648f28398e04461f008b69521", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "f4ed47ecda66de70dd817698a703f8816daa91272e7e45812469498614ae8b29"}, + "mime": {:hex, :mime, "2.0.6", "8f18486773d9b15f95f4f4f1e39b710045fa1de891fada4516559967276e4dc2", [:mix], [], "hexpm", "c9945363a6b26d747389aac3643f8e0e09d30499a138ad64fe8fd1d13d9b153e"}, + "mint": {:hex, :mint, "1.6.2", "af6d97a4051eee4f05b5500671d47c3a67dac7386045d87a904126fd4bbcea2e", [:mix], [{:castore, "~> 0.1.0 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:hpax, "~> 0.1.1 or ~> 0.2.0 or ~> 1.0", [hex: :hpax, repo: "hexpm", optional: false]}], "hexpm", "5ee441dffc1892f1ae59127f74afe8fd82fda6587794278d924e4d90ea3d63f9"}, + "nimble_options": {:hex, :nimble_options, "1.1.1", "e3a492d54d85fc3fd7c5baf411d9d2852922f66e69476317787a7b2bb000a61b", [:mix], [], "hexpm", "821b2470ca9442c4b6984882fe9bb0389371b8ddec4d45a9504f00a66f650b44"}, "nimble_parsec": {:hex, :nimble_parsec, "1.4.0", "51f9b613ea62cfa97b25ccc2c1b4216e81df970acd8e16e8d1bdc58fef21370d", [:mix], [], "hexpm", "9c565862810fb383e9838c1dd2d7d2c437b3d13b267414ba6af33e50d2d1cf28"}, + "nimble_pool": {:hex, :nimble_pool, "1.1.0", "bf9c29fbdcba3564a8b800d1eeb5a3c58f36e1e11d7b7fb2e084a643f645f06b", [:mix], [], "hexpm", "af2e4e6b34197db81f7aad230c1118eac993acc0dae6bc83bac0126d4ae0813a"}, "patch": {:hex, :patch, "0.12.0", "2da8967d382bade20344a3e89d618bfba563b12d4ac93955468e830777f816b0", [:mix], [], "hexpm", "ffd0e9a7f2ad5054f37af84067ee88b1ad337308a1cb227e181e3967127b0235"}, + "r_zstd": {:hex, :r_zstd, "1.0.1", "4899cae27f14ac4e323786ee30359d3245ddff660bb50e8a6e2138347e8d187c", [:mix], [{:rustler, "~> 0.21", [hex: :rustler, repo: "hexpm", optional: false]}, {:unsafe, "~> 1.0", [hex: :unsafe, repo: "hexpm", optional: false]}], "hexpm", "9aacab907b6a11321303cd80e248a4d62de95bf1779bdef3a4afa2d99907626c"}, + "req": {:hex, :req, "0.5.6", "8fe1eead4a085510fe3d51ad854ca8f20a622aae46e97b302f499dfb84f726ac", [:mix], [{:brotli, "~> 0.3.1", [hex: :brotli, repo: "hexpm", optional: true]}, {:ezstd, "~> 1.0", [hex: :ezstd, repo: "hexpm", optional: true]}, {:finch, "~> 0.17", [hex: :finch, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}, {:mime, "~> 2.0.6 or ~> 2.1", [hex: :mime, repo: "hexpm", optional: false]}, {:nimble_csv, "~> 1.0", [hex: :nimble_csv, repo: "hexpm", optional: true]}, {:plug, "~> 1.0", [hex: :plug, repo: "hexpm", optional: true]}], "hexpm", "cfaa8e720945d46654853de39d368f40362c2641c4b2153c886418914b372185"}, + "rustler": {:hex, :rustler, "0.34.0", "e9a73ee419fc296a10e49b415a2eb87a88c9217aa0275ec9f383d37eed290c1c", [:mix], [{:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}, {:req, "~> 0.5", [hex: :req, repo: "hexpm", optional: false]}, {:toml, "~> 0.6", [hex: :toml, repo: "hexpm", optional: false]}], "hexpm", "1d0c7449482b459513003230c0e2422b0252245776fe6fd6e41cb2b11bd8e628"}, "telemetry": {:hex, :telemetry, "1.2.1", "68fdfe8d8f05a8428483a97d7aab2f268aaff24b49e0f599faa091f1d4e7f61c", [:rebar3], [], "hexpm", "dad9ce9d8effc621708f99eac538ef1cbe05d6a874dd741de2e689c47feafed5"}, + "toml": {:hex, :toml, "0.7.0", "fbcd773caa937d0c7a02c301a1feea25612720ac3fa1ccb8bfd9d30d822911de", [:mix], [], "hexpm", "0690246a2478c1defd100b0c9b89b4ea280a22be9a7b313a8a058a2408a2fa70"}, + "unsafe": {:hex, :unsafe, "1.0.2", "23c6be12f6c1605364801f4b47007c0c159497d0446ad378b5cf05f1855c0581", [:mix], [], "hexpm", "b485231683c3ab01a9cd44cb4a79f152c6f3bb87358439c6f68791b85c2df675"}, } diff --git a/test/support/topology_test_data.ex b/test/support/topology_test_data.ex index 13af5948..e22ef3ac 100644 --- a/test/support/topology_test_data.ex +++ b/test/support/topology_test_data.ex @@ -29,7 +29,8 @@ defmodule Mongo.TopologyTestData do set_name: nil, set_version: nil, tag_set: %{}, - type: :standalone + type: :standalone, + compression: [] } } } @@ -62,7 +63,8 @@ defmodule Mongo.TopologyTestData do set_name: nil, set_version: nil, tag_set: %{}, - type: :mongos + type: :mongos, + compression: [] } } } @@ -95,6 +97,7 @@ defmodule Mongo.TopologyTestData do set_version: 3, tag_set: %{}, type: :rs_primary, + compression: [], hosts: [ "localhost:27018", "localhost:27019", @@ -118,6 +121,7 @@ defmodule Mongo.TopologyTestData do set_version: 3, tag_set: %{}, type: :rs_secondary, + compression: [], error: nil, hosts: [ "localhost:27018", @@ -142,6 +146,7 @@ defmodule Mongo.TopologyTestData do set_version: 3, tag_set: %{}, type: :rs_secondary, + compression: [], error: nil, hosts: [ "localhost:27018", @@ -180,7 +185,8 @@ defmodule Mongo.TopologyTestData do set_version: nil, tag_set: %{}, type: :unknown, - hosts: [] + hosts: [], + compression: [] }, "localhost:27019" => %{ address: "localhost:27019", @@ -204,7 +210,8 @@ defmodule Mongo.TopologyTestData do "localhost:27018", "localhost:27019", "localhost:27020" - ] + ], + compression: [] }, "localhost:27020" => %{ address: "localhost:27020", @@ -228,7 +235,8 @@ defmodule Mongo.TopologyTestData do "localhost:27018", "localhost:27019", "localhost:27020" - ] + ], + compression: [] } } } @@ -261,7 +269,8 @@ defmodule Mongo.TopologyTestData do set_version: nil, tag_set: %{}, type: :rs_primary, - hosts: ["localhost:27018"] + hosts: ["localhost:27018"], + compression: [] } } }