Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ecto prefix support #144

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 27 additions & 18 deletions lib/mongo_ecto.ex
Original file line number Diff line number Diff line change
Expand Up @@ -666,16 +666,16 @@ defmodule Mongo.Ecto do
:ok
end

def execute_ddl(repo, {:create, %Table{options: nil, name: coll}, columns}, opts) do
def execute_ddl(repo, {:create, %Table{options: nil, name: coll, prefix: p}, columns}, opts) do
warn_on_references!(columns)
command(repo, [create: coll], opts)
command(repo, [create: coll], db_opt(p) ++ opts)
:ok
end

def execute_ddl(repo, {:create, %Table{options: options, name: coll}, columns}, opts)
def execute_ddl(repo, {:create, %Table{options: options, name: coll, prefix: p}, columns}, opts)
when is_list(options) do
warn_on_references!(columns)
command(repo, [create: coll] ++ options, opts)
command(repo, [create: coll] ++ options, db_opt(p) ++ opts)
:ok
end

Expand All @@ -689,38 +689,40 @@ defmodule Mongo.Ecto do
unique: command.unique,
background: command.concurrently,
key: Enum.map(command.columns, &{&1, 1}),
ns: namespace(repo, command.table)]
ns: namespace(repo, command.table, command.prefix)]

# TODO: move this into normalized query for consistent coverage
query = %WriteQuery{coll: "system.indexes", command: index}

case Connection.insert(repo, query, opts) do
case Connection.insert(repo, query, db_opt(command.prefix) ++ opts) do
{:ok, _} -> :ok
{:invalid, [unique: index]} -> raise Connection.format_constraint_error(index)
end
end

def execute_ddl(repo, {:drop, %Index{name: name, table: coll}}, opts) do
command(repo, [dropIndexes: coll, index: to_string(name)], opts)
def execute_ddl(repo, {:drop, %Index{name: name, table: coll, prefix: p}}, opts) do
command(repo, [dropIndexes: coll, index: to_string(name)], db_opt(p) ++ opts)
:ok
end

def execute_ddl(repo, {:drop, %Table{name: coll}}, opts) do
command(repo, [drop: coll], opts)
def execute_ddl(repo, {:drop, %Table{name: coll, prefix: p}}, opts) do
command(repo, [drop: coll], db_opt(p) ++ opts)
:ok
end

def execute_ddl(repo, {:rename, %Table{name: old}, %Table{name: new}}, opts) do
command = [renameCollection: namespace(repo, old), to: namespace(repo, new)]
command(repo, command, [database: "admin"] ++ opts)
def execute_ddl(repo, {:rename, %Table{name: old, prefix: pold}, %Table{name: new, prefix: pnew}}, opts) do
command = [renameCollection: namespace(repo, old, pold), to: namespace(repo, new, pnew)]
command(repo, command, db_opt("admin") ++ opts)
:ok
end

def execute_ddl(repo, {:rename, %Table{name: coll}, old, new}, opts) do
def execute_ddl(repo, {:rename, %Table{name: coll, prefix: p}, old, new}, opts) do
# TODO: move this into normalized query for consistent coverage
query = %WriteQuery{coll: to_string(coll),
command: ["$rename": [{to_string(old), to_string(new)}]],
opts: [multi: true]}

{:ok, _} = Connection.update(repo, query, opts)
{:ok, _} = Connection.update(repo, query, db_opt(p) ++ opts)
:ok
end

Expand Down Expand Up @@ -826,6 +828,7 @@ defmodule Mongo.Ecto do
end

defp list_collections(_,repo, opts) do
# TODO: move this into normalized query for consistent coverage
query = %ReadQuery{coll: "system.namespaces", query: @list_collections_query}
opts = Keyword.put(opts, :log, false)

Expand All @@ -837,13 +840,19 @@ defmodule Mongo.Ecto do
end

defp truncate_collection(repo, collection, opts) do
# TODO: move this into normalized query for consistent coverage
query = %WriteQuery{coll: collection, query: %{}}
Connection.delete_all(repo, query, opts)
end

defp namespace(repo, coll) do
"#{repo.config[:database]}.#{coll}"
end
defp namespace(repo, coll, nil), do: "#{repo.config[:database]}.#{coll}"
defp namespace(repo, coll, prefix), do: "#{normalize_dbname(prefix)}.#{coll}"

defp normalize_dbname(nil), do: nil
defp normalize_dbname(other), do: to_string(other) # sometimes we get atoms here, which can be problematic

defp db_opt(nil), do: []
defp db_opt(prefix), do: [database: normalize_dbname(prefix)]

defp db_version(repo) do
version = command(repo, %{"buildinfo": 1}, [])["versionArray"]
Expand Down
33 changes: 19 additions & 14 deletions lib/mongo_ecto/normalized_query.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,31 +5,31 @@ defmodule Mongo.Ecto.NormalizedQuery do
@moduledoc false

defstruct coll: nil, pk: nil, params: {}, query: %{}, projection: %{},
order: %{}, fields: [], database: nil, opts: []
order: %{}, fields: [], opts: []
end

defmodule WriteQuery do
@moduledoc false

defstruct coll: nil, query: %{}, command: %{}, database: nil, opts: []
defstruct coll: nil, query: %{}, command: %{}, opts: []
end

defmodule CommandQuery do
@moduledoc false

defstruct command: nil, database: nil, opts: []
defstruct command: nil, opts: []
end

defmodule CountQuery do
@moduledoc false

defstruct coll: nil, pk: nil, fields: [], query: %{}, database: nil, opts: []
defstruct coll: nil, pk: nil, fields: [], query: %{}, opts: []
end

defmodule AggregateQuery do
@moduledoc false

defstruct coll: nil, pk: nil, fields: [], pipeline: [], database: nil, opts: []
defstruct coll: nil, pk: nil, fields: [], pipeline: [], opts: []
end

alias Mongo.Ecto.Conversions
Expand Down Expand Up @@ -62,12 +62,12 @@ defmodule Mongo.Ecto.NormalizedQuery do
defp find_all(original, query, projection, fields, params, {coll, _, pk} = from) do
%ReadQuery{coll: coll, pk: pk, params: params, query: query, fields: fields,
projection: projection, order: order(original, from),
database: original.prefix, opts: limit_skip(original, params, from)}
opts: select_database(original.prefix) ++ limit_skip(original, params, from)}
end

defp count(original, query, fields, params, {coll, _, pk} = from) do
%CountQuery{coll: coll, query: query, opts: limit_skip(original, params, from),
pk: pk, fields: fields, database: original.prefix}
pk: pk, fields: fields, opts: select_database(original.prefix)}
end

defp aggregate(original, query, pipeline, fields, params, {coll, _, pk} = from) do
Expand All @@ -83,7 +83,7 @@ defmodule Mongo.Ecto.NormalizedQuery do
if query != %{}, do: [["$match": query] | pipeline], else: pipeline

%AggregateQuery{coll: coll, pipeline: pipeline, pk: pk, fields: fields,
database: original.prefix}
opts: select_database(original.prefix)}
end

def update_all(%Query{} = original, params) do
Expand All @@ -95,14 +95,14 @@ defmodule Mongo.Ecto.NormalizedQuery do
query = query(original, params, from)
command = command(:update, original, params, from)

%WriteQuery{coll: coll, query: query, command: command, database: original.prefix}
%WriteQuery{coll: coll, query: query, command: command, opts: select_database(original.prefix)}
end

def update(%{source: {prefix, coll}, schema: schema}, fields, filter) do
command = command(:update, fields, primary_key(schema))
query = query(filter, primary_key(schema))

%WriteQuery{coll: coll, query: query, database: prefix, command: command}
%WriteQuery{coll: coll, query: query, command: command, opts: select_database(prefix)}
end

def delete_all(%Query{} = original, params) do
Expand All @@ -113,23 +113,23 @@ defmodule Mongo.Ecto.NormalizedQuery do
coll = coll(from)
query = query(original, params, from)

%WriteQuery{coll: coll, query: query, database: original.prefix}
%WriteQuery{coll: coll, query: query, opts: select_database(original.prefix)}
end

def delete(%{source: {prefix, coll}, schema: schema}, filter) do
query = query(filter, primary_key(schema))

%WriteQuery{coll: coll, query: query, database: prefix}
%WriteQuery{coll: coll, query: query, opts: select_database(prefix)}
end

def insert(%{source: {prefix, coll}, schema: schema}, document) do
command = command(:insert, document, primary_key(schema))

%WriteQuery{coll: coll, command: command, database: prefix}
%WriteQuery{coll: coll, command: command, opts: select_database(prefix)}
end

def command(command, opts) do
%CommandQuery{command: command, database: Keyword.get(opts, :database, nil)}
%CommandQuery{command: command, opts: select_database(Keyword.get(opts, :database))}
end

defp from(%Query{from: {coll, model}}) do
Expand Down Expand Up @@ -274,6 +274,11 @@ defmodule Mongo.Ecto.NormalizedQuery do
defp offset_limit(%Query.QueryExpr{expr: expr}, params, pk, query, where),
do: value(expr, params, pk, query, where)

defp select_database(nil),
do: []
defp select_database(db_prefix),
do: [database: to_string(db_prefix)]

defp primary_key(nil),
do: nil
defp primary_key(schema) do
Expand Down