From a086eeb35cb633248b2e86e2c7fddba2c3008780 Mon Sep 17 00:00:00 2001 From: Robert Hetzler Date: Fri, 13 Oct 2017 13:42:49 -0700 Subject: [PATCH] Ecto prefix support - Update Normalized Query to put database string where it will successfully pass through to Connection - Update Mongo.Ecto ddl methods to pass prefix into NQ generation or directly to driver --- lib/mongo_ecto.ex | 45 ++++++++++++++++++------------ lib/mongo_ecto/normalized_query.ex | 33 ++++++++++++---------- 2 files changed, 46 insertions(+), 32 deletions(-) diff --git a/lib/mongo_ecto.ex b/lib/mongo_ecto.ex index 2bdc6f9..2bcd3c1 100644 --- a/lib/mongo_ecto.ex +++ b/lib/mongo_ecto.ex @@ -666,16 +666,16 @@ defmodule Mongo.Ecto do :ok end - def execute_ddl(repo, {:create, %Table{options: nil, name: coll}, columns}, opts) do + def execute_ddl(repo, {:create, %Table{options: nil, name: coll, prefix: p}, columns}, opts) do warn_on_references!(columns) - command(repo, [create: coll], opts) + command(repo, [create: coll], db_opt(p) ++ opts) :ok end - def execute_ddl(repo, {:create, %Table{options: options, name: coll}, columns}, opts) + def execute_ddl(repo, {:create, %Table{options: options, name: coll, prefix: p}, columns}, opts) when is_list(options) do warn_on_references!(columns) - command(repo, [create: coll] ++ options, opts) + command(repo, [create: coll] ++ options, db_opt(p) ++ opts) :ok end @@ -689,38 +689,40 @@ defmodule Mongo.Ecto do unique: command.unique, background: command.concurrently, key: Enum.map(command.columns, &{&1, 1}), - ns: namespace(repo, command.table)] + ns: namespace(repo, command.table, command.prefix)] + # TODO: move this into normalized query for consistent coverage query = %WriteQuery{coll: "system.indexes", command: index} - case Connection.insert(repo, query, opts) do + case Connection.insert(repo, query, db_opt(command.prefix) ++ opts) do {:ok, _} -> :ok {:invalid, [unique: index]} -> raise Connection.format_constraint_error(index) end end - def execute_ddl(repo, {:drop, %Index{name: name, table: coll}}, opts) do - command(repo, [dropIndexes: coll, index: to_string(name)], opts) + def execute_ddl(repo, {:drop, %Index{name: name, table: coll, prefix: p}}, opts) do + command(repo, [dropIndexes: coll, index: to_string(name)], db_opt(p) ++ opts) :ok end - def execute_ddl(repo, {:drop, %Table{name: coll}}, opts) do - command(repo, [drop: coll], opts) + def execute_ddl(repo, {:drop, %Table{name: coll, prefix: p}}, opts) do + command(repo, [drop: coll], db_opt(p) ++ opts) :ok end - def execute_ddl(repo, {:rename, %Table{name: old}, %Table{name: new}}, opts) do - command = [renameCollection: namespace(repo, old), to: namespace(repo, new)] - command(repo, command, [database: "admin"] ++ opts) + def execute_ddl(repo, {:rename, %Table{name: old, prefix: pold}, %Table{name: new, prefix: pnew}}, opts) do + command = [renameCollection: namespace(repo, old, pold), to: namespace(repo, new, pnew)] + command(repo, command, db_opt("admin") ++ opts) :ok end - def execute_ddl(repo, {:rename, %Table{name: coll}, old, new}, opts) do + def execute_ddl(repo, {:rename, %Table{name: coll, prefix: p}, old, new}, opts) do + # TODO: move this into normalized query for consistent coverage query = %WriteQuery{coll: to_string(coll), command: ["$rename": [{to_string(old), to_string(new)}]], opts: [multi: true]} - {:ok, _} = Connection.update(repo, query, opts) + {:ok, _} = Connection.update(repo, query, db_opt(p) ++ opts) :ok end @@ -826,6 +828,7 @@ defmodule Mongo.Ecto do end defp list_collections(_,repo, opts) do + # TODO: move this into normalized query for consistent coverage query = %ReadQuery{coll: "system.namespaces", query: @list_collections_query} opts = Keyword.put(opts, :log, false) @@ -837,13 +840,19 @@ defmodule Mongo.Ecto do end defp truncate_collection(repo, collection, opts) do + # TODO: move this into normalized query for consistent coverage query = %WriteQuery{coll: collection, query: %{}} Connection.delete_all(repo, query, opts) end - defp namespace(repo, coll) do - "#{repo.config[:database]}.#{coll}" - end + defp namespace(repo, coll, nil), do: "#{repo.config[:database]}.#{coll}" + defp namespace(repo, coll, prefix), do: "#{normalize_dbname(prefix)}.#{coll}" + + defp normalize_dbname(nil), do: nil + defp normalize_dbname(other), do: to_string(other) # sometimes we get atoms here, which can be problematic + + defp db_opt(nil), do: [] + defp db_opt(prefix), do: [database: normalize_dbname(prefix)] defp db_version(repo) do version = command(repo, %{"buildinfo": 1}, [])["versionArray"] diff --git a/lib/mongo_ecto/normalized_query.ex b/lib/mongo_ecto/normalized_query.ex index eb7d4c2..441e673 100644 --- a/lib/mongo_ecto/normalized_query.ex +++ b/lib/mongo_ecto/normalized_query.ex @@ -5,31 +5,31 @@ defmodule Mongo.Ecto.NormalizedQuery do @moduledoc false defstruct coll: nil, pk: nil, params: {}, query: %{}, projection: %{}, - order: %{}, fields: [], database: nil, opts: [] + order: %{}, fields: [], opts: [] end defmodule WriteQuery do @moduledoc false - defstruct coll: nil, query: %{}, command: %{}, database: nil, opts: [] + defstruct coll: nil, query: %{}, command: %{}, opts: [] end defmodule CommandQuery do @moduledoc false - defstruct command: nil, database: nil, opts: [] + defstruct command: nil, opts: [] end defmodule CountQuery do @moduledoc false - defstruct coll: nil, pk: nil, fields: [], query: %{}, database: nil, opts: [] + defstruct coll: nil, pk: nil, fields: [], query: %{}, opts: [] end defmodule AggregateQuery do @moduledoc false - defstruct coll: nil, pk: nil, fields: [], pipeline: [], database: nil, opts: [] + defstruct coll: nil, pk: nil, fields: [], pipeline: [], opts: [] end alias Mongo.Ecto.Conversions @@ -62,12 +62,12 @@ defmodule Mongo.Ecto.NormalizedQuery do defp find_all(original, query, projection, fields, params, {coll, _, pk} = from) do %ReadQuery{coll: coll, pk: pk, params: params, query: query, fields: fields, projection: projection, order: order(original, from), - database: original.prefix, opts: limit_skip(original, params, from)} + opts: select_database(original.prefix) ++ limit_skip(original, params, from)} end defp count(original, query, fields, params, {coll, _, pk} = from) do %CountQuery{coll: coll, query: query, opts: limit_skip(original, params, from), - pk: pk, fields: fields, database: original.prefix} + pk: pk, fields: fields, opts: select_database(original.prefix)} end defp aggregate(original, query, pipeline, fields, params, {coll, _, pk} = from) do @@ -83,7 +83,7 @@ defmodule Mongo.Ecto.NormalizedQuery do if query != %{}, do: [["$match": query] | pipeline], else: pipeline %AggregateQuery{coll: coll, pipeline: pipeline, pk: pk, fields: fields, - database: original.prefix} + opts: select_database(original.prefix)} end def update_all(%Query{} = original, params) do @@ -95,14 +95,14 @@ defmodule Mongo.Ecto.NormalizedQuery do query = query(original, params, from) command = command(:update, original, params, from) - %WriteQuery{coll: coll, query: query, command: command, database: original.prefix} + %WriteQuery{coll: coll, query: query, command: command, opts: select_database(original.prefix)} end def update(%{source: {prefix, coll}, schema: schema}, fields, filter) do command = command(:update, fields, primary_key(schema)) query = query(filter, primary_key(schema)) - %WriteQuery{coll: coll, query: query, database: prefix, command: command} + %WriteQuery{coll: coll, query: query, command: command, opts: select_database(prefix)} end def delete_all(%Query{} = original, params) do @@ -113,23 +113,23 @@ defmodule Mongo.Ecto.NormalizedQuery do coll = coll(from) query = query(original, params, from) - %WriteQuery{coll: coll, query: query, database: original.prefix} + %WriteQuery{coll: coll, query: query, opts: select_database(original.prefix)} end def delete(%{source: {prefix, coll}, schema: schema}, filter) do query = query(filter, primary_key(schema)) - %WriteQuery{coll: coll, query: query, database: prefix} + %WriteQuery{coll: coll, query: query, opts: select_database(prefix)} end def insert(%{source: {prefix, coll}, schema: schema}, document) do command = command(:insert, document, primary_key(schema)) - %WriteQuery{coll: coll, command: command, database: prefix} + %WriteQuery{coll: coll, command: command, opts: select_database(prefix)} end def command(command, opts) do - %CommandQuery{command: command, database: Keyword.get(opts, :database, nil)} + %CommandQuery{command: command, opts: select_database(Keyword.get(opts, :database))} end defp from(%Query{from: {coll, model}}) do @@ -274,6 +274,11 @@ defmodule Mongo.Ecto.NormalizedQuery do defp offset_limit(%Query.QueryExpr{expr: expr}, params, pk, query, where), do: value(expr, params, pk, query, where) + defp select_database(nil), + do: [] + defp select_database(db_prefix), + do: [database: to_string(db_prefix)] + defp primary_key(nil), do: nil defp primary_key(schema) do