Skip to content

add ordered argument to insert method #2570

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 39 additions & 16 deletions mongoengine/queryset/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,12 @@ def first(self):
return result

def insert(
self, doc_or_docs, load_bulk=True, write_concern=None, signal_kwargs=None
self,
doc_or_docs,
load_bulk=True,
write_concern=None,
signal_kwargs=None,
ordered=True,
):
"""bulk insert documents

Expand All @@ -309,6 +314,11 @@ def insert(
each server being written to.
:param signal_kwargs: (optional) kwargs dictionary to be passed to
the signal calls.
:param ordered (optional): If True (the default) documents will be
inserted on the server serially, in the order provided. If an error
occurs all remaining inserts are aborted. If False, documents will
be inserted on the server in arbitrary order, possibly in parallel,
and all document inserts will be attempted.

By default returns document instances, set ``load_bulk`` to False to
return just ``ObjectIds``
Expand Down Expand Up @@ -341,12 +351,14 @@ def insert(

with set_write_concern(self._collection, write_concern) as collection:
insert_func = collection.insert_many
insert_func_kwargs = {"ordered": ordered}
if return_one:
raw = raw[0]
insert_func = collection.insert_one
insert_func_kwargs = {}

try:
inserted_result = insert_func(raw)
inserted_result = insert_func(raw, **insert_func_kwargs)
ids = (
[inserted_result.inserted_id]
if return_one
Expand All @@ -358,6 +370,17 @@ def insert(
except pymongo.errors.BulkWriteError as err:
# inserting documents that already have an _id field will
# give huge performance debt or raise
if ordered:
inserted = err.details["nInserted"]
for doc, raw_doc in zip(docs[:inserted], raw[:inserted]):
doc.pk = raw_doc["_id"]
else:
not_writed_ids = [
error["op"]["_id"] for error in err.details["writeErrors"]
]
for doc, raw_doc in zip(docs, raw):
if raw_doc["_id"] not in not_writed_ids:
doc.pk = raw_doc["_id"]
message = "Bulk write error: (%s)"
raise BulkWriteError(message % err.details)
except pymongo.errors.OperationFailure as err:
Expand Down Expand Up @@ -1715,29 +1738,29 @@ def no_dereference(self):

def _item_frequencies_map_reduce(self, field, normalize=False):
map_func = """
function() {
var path = '{{~%(field)s}}'.split('.');
function() {{
var path = '{{{{~{field}}}}}'.split('.');
var field = this;

for (p in path) {
for (p in path) {{
if (typeof field != 'undefined')
field = field[path[p]];
else
break;
}
if (field && field.constructor == Array) {
field.forEach(function(item) {
}}
if (field && field.constructor == Array) {{
field.forEach(function(item) {{
emit(item, 1);
});
} else if (typeof field != 'undefined') {
}});
}} else if (typeof field != 'undefined') {{
emit(field, 1);
} else {
}} else {{
emit(null, 1);
}
}
""" % {
"field": field
}
}}
}}
""".format(
field=field
)
reduce_func = """
function(key, values) {
var total = 0;
Expand Down
29 changes: 28 additions & 1 deletion tests/queryset/test_queryset.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from mongoengine import *
from mongoengine.connection import get_db
from mongoengine.context_managers import query_counter, switch_db
from mongoengine.errors import InvalidQueryError
from mongoengine.errors import BulkWriteError, InvalidQueryError
from mongoengine.mongodb_support import (
MONGODB_36,
get_mongodb_version,
Expand Down Expand Up @@ -1067,6 +1067,33 @@ class Comment(Document):
com2 = Comment(id=1)
Comment.objects.insert([com1, com2])

def test_bulk_insert_ordered(self):
class Comment(Document):
name = StringField(unique=True)

Comment.drop_collection()
Comment.objects.insert(Comment(name="b"), ordered=True)
comments = [Comment(name="a"), Comment(name="b"), Comment(name="c")]
with pytest.raises(BulkWriteError):
Comment.objects.insert(comments, ordered=True)
Comment.objects.get(name="a")
with pytest.raises(DoesNotExist):
Comment.objects.get(name="c")
assert comments[0].pk is not None
assert comments[1].pk is None
assert comments[2].pk is None

Comment.drop_collection()
Comment.objects.insert(Comment(name="b"), ordered=False)
comments = [Comment(name="a"), Comment(name="b"), Comment(name="c")]
with pytest.raises(BulkWriteError):
Comment.objects.insert(comments, ordered=False)
Comment.objects.get(name="a")
Comment.objects.get(name="c")
assert comments[0].pk is not None
assert comments[1].pk is None
assert comments[2].pk is not None

def test_insert_raise_if_duplicate_in_constraint(self):
class Comment(Document):
id = IntField(primary_key=True)
Expand Down