From 18bc7f9739d17e679a897ce0f5a78345e1a97d0a Mon Sep 17 00:00:00 2001 From: Morgan Aubert Date: Tue, 30 Jan 2024 22:13:01 -0500 Subject: [PATCH] #11 - Add support for bulk records creation --- .../reference/query-set.md | 51 ++++++ spec/marten/db/connection/mysql_spec.cr | 7 + spec/marten/db/connection/postgresq_spec.cr | 7 + spec/marten/db/connection/sqlite_spec.cr | 17 ++ spec/marten/db/model/querying_spec.cr | 24 +++ spec/marten/db/query/set_spec.cr | 153 ++++++++++++++++++ .../db/query/set_spec/models/tag_with_uuid.cr | 12 ++ src/marten/db/connection/base.cr | 15 ++ src/marten/db/connection/mysql.cr | 29 ++++ src/marten/db/connection/postgresql.cr | 41 +++++ src/marten/db/connection/sqlite.cr | 47 ++++++ src/marten/db/model/persistence.cr | 19 ++- src/marten/db/model/querying.cr | 33 ++++ src/marten/db/model/table.cr | 74 +++++---- src/marten/db/query/set.cr | 97 +++++++++++ 15 files changed, 582 insertions(+), 44 deletions(-) create mode 100644 spec/marten/db/query/set_spec/models/tag_with_uuid.cr diff --git a/docs/docs/models-and-databases/reference/query-set.md b/docs/docs/models-and-databases/reference/query-set.md index a2b1fc942..c1b1598c8 100644 --- a/docs/docs/models-and-databases/reference/query-set.md +++ b/docs/docs/models-and-databases/reference/query-set.md @@ -256,6 +256,57 @@ The value passed to `#using` must be a valid database alias that was used to con Query sets also provide a set of methods that will usually result in specific SQL queries to be executed in order to return values that don't correspond to new query sets. +### `bulk_create` + +Bulk inserts the passed model instances into the database. + +This method allows you to insert multiple model instances into the database in a single query. This can be useful when dealing with large amounts of data that need to be inserted into the database. For example: + +```crystal +query_set = Post.all +query_set.bulk_create( + [ + Post.new(title: "First post"), + Post.new(title: "Second post"), + Post.new(title: "Third post"), + ] +) +``` + +An optional `batch_size` argument can be passed to this method in order to specify the number of records that should be inserted in a single query. By default, all records are inserted in a single query (except for SQLite databases where the limit of variables in a single query is 999). For example: + +```crystal +query_set = Post.all +query_set.bulk_create( + [ + Post.new(title: "First post"), + Post.new(title: "Second post"), + Post.new(title: "Third post"), + ], + batch_size: 2 +) +``` + +:::tip +The `#bulk_create` method can also be called directly on model classes: + +```crystal +Post.bulk_create( + [ + Post.new(title: "First post"), + Post.new(title: "Second post"), + Post.new(title: "Third post"), + ] +) +``` +::: + +It is worth mentioning that this method has a few caveats: + +* The specified records are assumed to be valid and no [callbacks](../callbacks.md) will be called on them. +* Bulk-creating records making use of multi-table inheritance is not supported. +* If the model's primary key field is auto-incremented at the database level, the newly inserted primary keys will only be assigned to records on certain databases that support retrieving bulk-inserted rows (namely PostgreSQL and SQLite). + ### `count` Returns the number of records that are targeted by the current query set. 
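To make the SQLite batching behaviour described above concrete, here is a minimal sketch of the arithmetic (the three-column model is a hypothetical assumption; the actual per-backend computation lives in `#bulk_batch_size` further down in this patch):

```crystal
# Hypothetical example: bulk-creating 1_000 records of a model whose table has
# 3 inserted columns. SQLite accepts at most 999 bound variables per statement,
# so each INSERT can carry 999 // 3 = 333 records and 4 statements are issued.
columns_per_record = 3
records_count = 1_000
batch_size = 999 // columns_per_record                      # => 333
statements = (records_count + batch_size - 1) // batch_size # => 4
puts "#{statements} INSERT statements of up to #{batch_size} records each"
```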
diff --git a/spec/marten/db/connection/mysql_spec.cr b/spec/marten/db/connection/mysql_spec.cr index 43d881109..b490259b3 100644 --- a/spec/marten/db/connection/mysql_spec.cr +++ b/spec/marten/db/connection/mysql_spec.cr @@ -2,6 +2,13 @@ require "./spec_helper" for_mysql do describe Marten::DB::Connection::MySQL do + describe "#bulk_batch_size" do + it "returns the specified records count" do + conn = Marten::DB::Connection.default + conn.bulk_batch_size(records_count: 1000, values_count: 10).should eq 1000 + end + end + describe "#distinct_clause_for" do it "returns the expected distinct clause if no column names are specified" do conn = Marten::DB::Connection.default diff --git a/spec/marten/db/connection/postgresq_spec.cr b/spec/marten/db/connection/postgresq_spec.cr index 6600de2a0..0b2926995 100644 --- a/spec/marten/db/connection/postgresq_spec.cr +++ b/spec/marten/db/connection/postgresq_spec.cr @@ -2,6 +2,13 @@ require "./spec_helper" for_postgresql do describe Marten::DB::Connection::PostgreSQL do + describe "#bulk_batch_size" do + it "returns the specified records count" do + conn = Marten::DB::Connection.default + conn.bulk_batch_size(records_count: 1000, values_count: 10).should eq 1000 + end + end + describe "#distinct_clause_for" do it "returns the expected distinct clause if no column names are specified" do conn = Marten::DB::Connection.default diff --git a/spec/marten/db/connection/sqlite_spec.cr b/spec/marten/db/connection/sqlite_spec.cr index e4c3d09f3..2154a128f 100644 --- a/spec/marten/db/connection/sqlite_spec.cr +++ b/spec/marten/db/connection/sqlite_spec.cr @@ -2,6 +2,23 @@ require "./spec_helper" for_sqlite do describe Marten::DB::Connection::PostgreSQL do + describe "#bulk_batch_size" do + it "returns 500 if the values count is 1" do + conn = Marten::DB::Connection.default + conn.bulk_batch_size(records_count: 1000, values_count: 1).should eq 500 + end + + it "returns the expected value if the values count is greater than 1" do + conn = Marten::DB::Connection.default + conn.bulk_batch_size(records_count: 1000, values_count: 89).should eq 999 // 89 + end + + it "returns the records count if no values will be inserted" do + conn = Marten::DB::Connection.default + conn.bulk_batch_size(records_count: 1000, values_count: 0).should eq 1000 + end + end + describe "#distinct_clause_for" do it "returns the expected distinct clause if no column names are specified" do conn = Marten::DB::Connection.default diff --git a/spec/marten/db/model/querying_spec.cr b/spec/marten/db/model/querying_spec.cr index 6dec21923..3df76ca4d 100644 --- a/spec/marten/db/model/querying_spec.cr +++ b/spec/marten/db/model/querying_spec.cr @@ -43,6 +43,30 @@ describe Marten::DB::Model::Querying do end end + describe "::bulk_create" do + it "allows to insert an array of records without specifying a batch size" do + objects = (1..100).map do |i| + Tag.new(name: "tag #{i}", is_active: true) + end + + inserted_objects = Tag.bulk_create(objects) + + inserted_objects.size.should eq objects.size + Tag.filter(name__in: objects.map(&.name)).count.should eq objects.size + end + + it "allows to insert a small array of records while specifying a batch size" do + objects = (1..100).map do |i| + Tag.new(name: "tag #{i}", is_active: true) + end + + inserted_objects = Tag.bulk_create(objects, batch_size: 10) + + inserted_objects.size.should eq objects.size + Tag.filter(name__in: objects.map(&.name)).count.should eq objects.size + end + end + describe "::count" do it "returns the expected number of records when no 
field is specified" do Tag.create!(name: "ruby", is_active: true) diff --git a/spec/marten/db/query/set_spec.cr b/spec/marten/db/query/set_spec.cr index 8c9855133..7dec69ed0 100644 --- a/spec/marten/db/query/set_spec.cr +++ b/spec/marten/db/query/set_spec.cr @@ -406,6 +406,159 @@ describe Marten::DB::Query::Set do end end + describe "#bulk_create" do + it "allows to insert a small array of records without specifying a batch size" do + objects = (1..100).map do |i| + Tag.new(name: "tag #{i}", is_active: true) + end + + inserted_objects = Marten::DB::Query::Set(Tag).new.bulk_create(objects) + + inserted_objects.size.should eq objects.size + Tag.filter(name__in: objects.map(&.name)).count.should eq objects.size + end + + it "allows to insert a large array of records without specifying a batch size" do + objects = (1..5_000).map do |i| + Tag.new(name: "tag #{i}", is_active: true) + end + + inserted_objects = Marten::DB::Query::Set(Tag).new.bulk_create(objects) + + inserted_objects.size.should eq objects.size + Tag.filter(name__in: objects.map(&.name)).count.should eq objects.size + end + + it "allows to insert a small array of records while specifying a batch size" do + objects = (1..100).map do |i| + Tag.new(name: "tag #{i}", is_active: true) + end + + inserted_objects = Marten::DB::Query::Set(Tag).new.bulk_create(objects, batch_size: 10) + + inserted_objects.size.should eq objects.size + Tag.filter(name__in: objects.map(&.name)).count.should eq objects.size + end + + it "allows to insert a large array of records while specifying a batch size" do + objects = (1..5_000).map do |i| + Tag.new(name: "tag #{i}", is_active: true) + end + + inserted_objects = Marten::DB::Query::Set(Tag).new.bulk_create(objects, batch_size: 500) + + inserted_objects.size.should eq objects.size + Tag.filter(name__in: objects.map(&.name)).count.should eq objects.size + end + + it "properly calls the fields' before_save logic to ensure they can set default values on records" do + objects = (1..10).map do |i| + TestUser.new(username: "jd#{i}", email: "jd#{i}@example.com", first_name: "John", last_name: "Doe") + end + + inserted_objects = Marten::DB::Query::Set(TestUser).new.bulk_create(objects) + + inserted_objects.size.should eq objects.size + TestUser.filter(username__in: objects.map(&.username)).count.should eq objects.size + inserted_objects.all? { |o| !o.created_at.nil? 
}.should be_true + end + + it "properly marks created objects as persisted" do + objects = (1..10).map do |i| + TestUser.new(username: "jd#{i}", email: "jd#{i}@example.com", first_name: "John", last_name: "Doe") + end + + inserted_objects = Marten::DB::Query::Set(TestUser).new.bulk_create(objects) + + inserted_objects.size.should eq objects.size + TestUser.filter(username__in: objects.map(&.username)).count.should eq objects.size + inserted_objects.all?(&.persisted?).should be_true + end + + it "inserts records with already assigned pks when no batch size is specified" do + objects = (1..100).map do |i| + Marten::DB::Query::SetSpec::TagWithUUID.new(label: "tag #{i}") + end + + inserted_objects = Marten::DB::Query::Set(Marten::DB::Query::SetSpec::TagWithUUID).new.bulk_create(objects) + + inserted_objects.size.should eq objects.size + Marten::DB::Query::SetSpec::TagWithUUID.filter(label__in: objects.map(&.label)).count.should eq objects.size + inserted_objects.all?(&.persisted?).should be_true + inserted_objects.all?(&.pk?).should be_true + end + + it "inserts records with already assigned pks when a batch size is specified" do + objects = (1..100).map do |i| + Marten::DB::Query::SetSpec::TagWithUUID.new(label: "tag #{i}") + end + + inserted_objects = Marten::DB::Query::Set(Marten::DB::Query::SetSpec::TagWithUUID).new.bulk_create( + objects, + batch_size: 10 + ) + + inserted_objects.size.should eq objects.size + Marten::DB::Query::SetSpec::TagWithUUID.filter(label__in: objects.map(&.label)).count.should eq objects.size + inserted_objects.all?(&.persisted?).should be_true + inserted_objects.all?(&.pk?).should be_true + end + + it "inserts records that have null values" do + objects = (1..10).map do |i| + user = TestUser.create!(username: "jd#{i}", email: "jd#{i}@example.com", first_name: "John", last_name: "Doe") + TestUserProfile.new(user: user, bio: i % 2 == 0 ? "Bio #{i}" : nil) + end + + inserted_objects = Marten::DB::Query::Set(TestUserProfile).new.bulk_create(objects) + + inserted_objects.size.should eq objects.size + TestUserProfile.filter(user_id__in: objects.map(&.user_id)).count.should eq objects.size + inserted_objects.all?(&.persisted?).should be_true + end + + for_db_backends :postgresql, :sqlite do + it "properly assigns the returned objects' pks when they don't have one already" do + objects = (1..10).map do |i| + TestUser.new(username: "jd#{i}", email: "jd#{i}@example.com", first_name: "John", last_name: "Doe") + end + + inserted_objects = Marten::DB::Query::Set(TestUser).new.bulk_create(objects) + + inserted_objects.size.should eq objects.size + TestUser.filter(username__in: objects.map(&.username)).count.should eq objects.size + inserted_objects.all?(&.pk?).should be_true + end + end + + it "raises an ArgumentError if the specified batch size is less than 1" do + expect_raises(ArgumentError, "Batch size must be greater than 0") do + Marten::DB::Query::Set(Tag).new.bulk_create([] of Tag, batch_size: 0) + end + + expect_raises(ArgumentError, "Batch size must be greater than 0") do + Marten::DB::Query::Set(Tag).new.bulk_create([] of Tag, batch_size: -1) + end + end + + it "raises the expected exception if the targeted model inherits from concrete models" do + address = Marten::DB::Query::SetSpec::Address.create!(street: "Street 1") + student = Marten::DB::Query::SetSpec::Student.create!( + name: "Student 1", + email: "student-1@example.com", + address: address, + grade: "10" + ) + + expect_raises( + Marten::DB::Errors::UnmetQuerySetCondition, + "Bulk creation is not supported for multi table inherited model records" + ) do + Marten::DB::Query::Set(Marten::DB::Query::SetSpec::Student).new.bulk_create([student]) + end + end + end + describe "#count" do it "returns the expected number of record for an unfiltered query set" do Tag.create!(name: "ruby", is_active: true) diff --git a/spec/marten/db/query/set_spec/models/tag_with_uuid.cr b/spec/marten/db/query/set_spec/models/tag_with_uuid.cr new file mode 100644 index 000000000..c5209f562 --- /dev/null +++ b/spec/marten/db/query/set_spec/models/tag_with_uuid.cr @@ -0,0 +1,12 @@ +module Marten::DB::Query::SetSpec + class TagWithUUID < Marten::Model + field :id, :uuid, primary_key: true + field :label, :string, max_size: 128 + + after_initialize :initialize_id + + def initialize_id + @id ||= UUID.random + end + end +end diff --git a/src/marten/db/connection/base.cr b/src/marten/db/connection/base.cr index a3cd79f85..9f52ac677 100644 --- a/src/marten/db/connection/base.cr +++ b/src/marten/db/connection/base.cr @@ -14,6 +14,21 @@ module Marten @url = build_url end + # Returns the batch size to use when inserting multiple rows in a specific table. + abstract def bulk_batch_size(records_count : Int32, values_count : Int32) : Int32 + + # Allows to insert multiple rows in a specific table and returns the primary key values for the inserted rows. + # + # This method allows inserting individual rows defined in `values` in the `table_name` table. When + # `pk_column_to_fetch` is specified, the primary key values for the inserted rows will be returned. Note that + # this method can return `nil` if the underlying database does not support returning primary key values for bulk + # inserts. + abstract def bulk_insert( + table_name : String, + values : Array(Hash(String, ::DB::Any)), + pk_column_to_fetch : String? = nil + ) : Array(::DB::Any)? 
+ # Returns a distinct clause to remove duplicates from a query's results. # # If column names are specified, only these specific columns will be checked to identify duplicates. diff --git a/src/marten/db/connection/mysql.cr b/src/marten/db/connection/mysql.cr index 30a79f79e..c1a17e29b 100644 --- a/src/marten/db/connection/mysql.cr +++ b/src/marten/db/connection/mysql.cr @@ -2,6 +2,35 @@ module Marten module DB module Connection class MySQL < Base + def bulk_batch_size(records_count : Int32, values_count : Int32) : Int32 + records_count + end + + def bulk_insert( + table_name : String, + values : Array(Hash(String, ::DB::Any)), + pk_column_to_fetch : String? = nil + ) : Array(::DB::Any)? + column_names = values[0].keys.join(", ") { |column_name| "#{quote(column_name)}" } + + index = 0 + numbered_values = values.map do |raw_values| + raw_values.keys.map do |_c| + index += 1 + parameter_id_for_ordered_argument(index) + end.join(", ") + end + + statement = "INSERT INTO #{quote(table_name)} (#{column_names}) " \ + "VALUES #{numbered_values.map { |v| "(#{v})" }.join(", ")}" + + open do |db| + db.exec(statement, args: values.flat_map(&.values)) + end + + nil + end + def distinct_clause_for(columns : Array(String)) : String return DISTINCT_CLAUSE if columns.empty? raise NotImplementedError.new("DISTINCT ON columns is not supported by this connection implementation") diff --git a/src/marten/db/connection/postgresql.cr b/src/marten/db/connection/postgresql.cr index 92309e781..f1fcfe835 100644 --- a/src/marten/db/connection/postgresql.cr +++ b/src/marten/db/connection/postgresql.cr @@ -2,6 +2,47 @@ module Marten module DB module Connection class PostgreSQL < Base + def bulk_batch_size(records_count : Int32, values_count : Int32) : Int32 + records_count + end + + def bulk_insert( + table_name : String, + values : Array(Hash(String, ::DB::Any)), + pk_column_to_fetch : String? = nil + ) : Array(::DB::Any)? + column_names = values[0].keys.join(", ") { |column_name| "#{quote(column_name)}" } + + index = 0 + numbered_values = values.map do |raw_values| + raw_values.keys.map do |_c| + index += 1 + parameter_id_for_ordered_argument(index) + end.join(", ") + end + + statement = "INSERT INTO #{quote(table_name)} (#{column_names}) " \ + "VALUES #{numbered_values.map { |v| "(#{v})" }.join(", ")}" + statement += " RETURNING #{quote(pk_column_to_fetch)}" unless pk_column_to_fetch.nil? + + new_record_ids = nil + + open do |db| + if pk_column_to_fetch + new_record_ids = [] of ::DB::Any + db.query(statement, args: values.flat_map(&.values)) do |result_set| + result_set.each do + new_record_ids << result_set.read(::DB::Any) + end + end + else + db.exec(statement, args: values.flat_map(&.values)) + end + end + + new_record_ids + end + def distinct_clause_for(columns : Array(String)) : String columns.empty? ? DISTINCT_CLAUSE : "#{DISTINCT_CLAUSE} ON (#{columns.join(", ")})" end diff --git a/src/marten/db/connection/sqlite.cr b/src/marten/db/connection/sqlite.cr index 1163a9ed7..87ec33950 100644 --- a/src/marten/db/connection/sqlite.cr +++ b/src/marten/db/connection/sqlite.cr @@ -2,6 +2,53 @@ module Marten module DB module Connection class SQLite < Base + def bulk_batch_size(records_count : Int32, values_count : Int32) : Int32 + if values_count == 1 + 500 + elsif values_count > 1 + 999 // values_count + else + records_count + end + end + + def bulk_insert( + table_name : String, + values : Array(Hash(String, ::DB::Any)), + pk_column_to_fetch : String? = nil + ) : Array(::DB::Any)? 
+ column_names = values[0].keys.join(", ") { |column_name| "#{quote(column_name)}" } + + index = 0 + numbered_values = values.map do |raw_values| + raw_values.keys.map do |_c| + index += 1 + parameter_id_for_ordered_argument(index) + end.join(", ") + end + + statement = "INSERT INTO #{quote(table_name)} (#{column_names}) " \ + "VALUES #{numbered_values.map { |v| "(#{v})" }.join(", ")}" + statement += " RETURNING #{quote(pk_column_to_fetch)}" unless pk_column_to_fetch.nil? + + new_record_ids = nil + + open do |db| + if pk_column_to_fetch + new_record_ids = [] of ::DB::Any + db.query(statement, args: values.flat_map(&.values)) do |result_set| + result_set.each do + new_record_ids << result_set.read(::DB::Any) + end + end + else + db.exec(statement, args: values.flat_map(&.values)) + end + end + + new_record_ids + end + def distinct_clause_for(columns : Array(String)) : String return DISTINCT_CLAUSE if columns.empty? raise NotImplementedError.new("DISTINCT ON columns is not supported by this connection implementation") diff --git a/src/marten/db/model/persistence.cr b/src/marten/db/model/persistence.cr index fe7702541..a44e23573 100644 --- a/src/marten/db/model/persistence.cr +++ b/src/marten/db/model/persistence.cr @@ -204,8 +204,11 @@ module Marten protected setter new_record - private def auto_increment_field?(pk_field : Field::Base) : Bool - (pk_field.is_a?(Field::BigInt) || pk_field.is_a?(Field::Int)) && pk_field.auto? + protected def prepare_fields_for_save : Nil + self.class.fields.each do |field| + next if field.primary_key? + field.prepare_save(self, new_record: !persisted?) + end end private def insert_or_update(connection) @@ -222,10 +225,7 @@ module Marten # Notifies each field so that they have the chance to apply changes to the model instance before the actual # save operation. - self.class.fields.each do |field| - next if field.primary_key? - field.prepare_save(self, new_record: !persisted?) - end + prepare_fields_for_save run_before_save_callbacks @@ -262,7 +262,7 @@ module Marten values = local_field_db_values pk_field = self.class.pk_field - if auto_increment_field?(pk_field) + if self.class.auto_increment_pk_field? pk_field_to_fetch = pk_field.db_column! values.delete(pk_field_to_fetch) else @@ -280,9 +280,8 @@ module Marten private def insert_parent(parent_model, connection) values = parent_model_field_db_values(parent_model) - parent_pk_field = parent_model.pk_field - if auto_increment_field?(parent_pk_field) - pk_field_to_fetch = parent_pk_field.db_column! + if parent_model.auto_increment_pk_field? + pk_field_to_fetch = parent_model.pk_field.db_column! values.delete(pk_field_to_fetch) else pk_field_to_fetch = nil diff --git a/src/marten/db/model/querying.cr b/src/marten/db/model/querying.cr index b3d1ba0f5..fe074e6bc 100644 --- a/src/marten/db/model/querying.cr +++ b/src/marten/db/model/querying.cr @@ -45,6 +45,39 @@ module Marten exists? end + # Bulk inserts the passed model instances into the database. + # + # This method allows to insert multiple model instances into the database in a single query. This can be + # useful when dealing with large amounts of data that need to be inserted into the database. For example: + # + # ``` + # Post.bulk_create( + # [ + # Post.new(title: "First post"), + # Post.new(title: "Second post"), + # Post.new(title: "Third post"), + # ] + # ) + # ``` + # + # An optional `batch_size` argument can be passed to this method in order to specify the number of records + # that should be inserted in a single query. 
By default, all records are inserted in a single query (except + # for SQLite databases where the limit of variables in a single query is 999). For example: + # + # ``` + # Post.bulk_create( + # [ + # Post.new(title: "First post"), + # Post.new(title: "Second post"), + # Post.new(title: "Third post"), + # ], + # batch_size: 2 + # ) + # ``` + def bulk_create(objects : Array(self), batch_size : Int32? = nil) + default_queryset.bulk_create(objects, batch_size) + end + # Returns the total count of records for the considered model. # # This method returns the total count of records for the considered model. If a field is specified, the method diff --git a/src/marten/db/model/table.cr b/src/marten/db/model/table.cr index c89166635..e26463e2b 100644 --- a/src/marten/db/model/table.cr +++ b/src/marten/db/model/table.cr @@ -148,6 +148,11 @@ module Marten @@local_reverse_relations << reverse_relation end + protected def auto_increment_pk_field? : Bool + field : Field::Base = pk_field + (field.is_a?(Field::BigInt) || field.is_a?(Field::Int)) && field.auto? + end + protected def local_fields_per_column @@local_fields_per_column end @@ -355,6 +360,41 @@ module Marten {% end %} end + def inspect(io) + io << "#<#{self.class.name}:0x#{object_id.to_s(16)} " + io << "#{self.class.pk_field.id}: #{pk.inspect}" + {% for field_var in @type.instance_vars + .select { |ivar| ivar.annotation(Marten::DB::Model::Table::FieldInstanceVariable) } %} + {% ann = field_var.annotation(Marten::DB::Model::Table::FieldInstanceVariable) %} + {% unless ann[:field_kwargs] && ann[:field_kwargs][:primary_key] %} + io << ", " + io << {{ field_var.name.stringify }} + ": #{{{ field_var.id }}.inspect}" + {% end %} + {% end %} + io << ">" + end + + # :nodoc: + def local_field_db_values + {% begin %} + values = {} of String => ::DB::Any + + {% + local_field_vars = @type.instance_vars.select do |ivar| + ann = ivar.annotation(Marten::DB::Model::Table::FieldInstanceVariable) + ann && ann[:model_klass].id == @type.name.id + end + %} + + {% for field_var in local_field_vars %} + field = self.class.get_field({{ field_var.name.stringify }}) + values[field.db_column!] = field.to_db({{ field_var.id }}) if field.db_column? + {% end %} + + values + {% end %} + end + # Returns the primary key value. def pk {% begin %} @@ -413,20 +453,6 @@ module Marten inspect(io) end - def inspect(io) - io << "#<#{self.class.name}:0x#{object_id.to_s(16)} " - io << "#{self.class.pk_field.id}: #{pk.inspect}" - {% for field_var in @type.instance_vars - .select { |ivar| ivar.annotation(Marten::DB::Model::Table::FieldInstanceVariable) } %} - {% ann = field_var.annotation(Marten::DB::Model::Table::FieldInstanceVariable) %} - {% unless ann[:field_kwargs] && ann[:field_kwargs][:primary_key] %} - io << ", " - io << {{ field_var.name.stringify }} + ": #{{{ field_var.id }}.inspect}" - {% end %} - {% end %} - io << ">" - end - protected def assign_local_field_from_db_result_set(result_set : ::DB::ResultSet, column_name : String) {% begin %} field = self.class.local_fields_per_column[column_name]? @@ -594,26 +620,6 @@ module Marten {% end %} end - private def local_field_db_values - {% begin %} - values = {} of String => ::DB::Any - - {% - local_field_vars = @type.instance_vars.select do |ivar| - ann = ivar.annotation(Marten::DB::Model::Table::FieldInstanceVariable) - ann && ann[:model_klass].id == @type.name.id - end - %} - - {% for field_var in local_field_vars %} - field = self.class.get_field({{ field_var.name.stringify }}) - values[field.db_column!] 
= field.to_db({{ field_var.id }}) if field.db_column? - {% end %} - - values - {% end %} - end - private def parent_model_field_db_values(model_klass) {% begin %} values = {} of String => ::DB::Any diff --git a/src/marten/db/query/set.cr b/src/marten/db/query/set.cr index 772d5c311..1d6199bca 100644 --- a/src/marten/db/query/set.cr +++ b/src/marten/db/query/set.cr @@ -97,6 +97,75 @@ module Marten exists? end + # Bulk inserts the passed model instances into the database. + # + # This method allows to insert multiple model instances into the database in a single query. This can be useful + # when dealing with large amounts of data that need to be inserted into the database. For example: + # + # ``` + # query_set = Post.all + # query_set.bulk_create( + # [ + # Post.new(title: "First post"), + # Post.new(title: "Second post"), + # Post.new(title: "Third post"), + # ] + # ) + # ``` + # + # An optional `batch_size` argument can be passed to this method in order to specify the number of records that + # should be inserted in a single query. By default, all records are inserted in a single query (except for + # SQLite databases where the limit of variables in a single query is 999). For example: + # + # ``` + # query_set = Post.all + # query_set.bulk_create( + # [ + # Post.new(title: "First post"), + # Post.new(title: "Second post"), + # Post.new(title: "Third post"), + # ], + # batch_size: 2 + # ) + # ``` + def bulk_create(objects : Array(M), batch_size : Int32? = nil) + if !batch_size.nil? && batch_size < 1 + raise ArgumentError.new("Batch size must be greater than 0") + end + + return objects if objects.empty? + + # Check that objects are not descendants of concrete models (multi table inheritance). + if !M.parent_models.empty? + raise Errors::UnmetQuerySetCondition.new( + "Bulk creation is not supported for multi table inherited model records" + ) + end + + query.connection.transaction do + objects_with_pk, objects_without_pk = objects.partition(&.pk?) + + if !objects_with_pk.empty? + perform_batched_insert(objects_with_pk, batch_size) + end + + if !objects_without_pk.empty? + inserted_pks = perform_batched_insert(objects_without_pk, batch_size) + + if !inserted_pks.empty? + objects_without_pk.zip(inserted_pks).each do |object, pk| + object.pk = pk.as?(Field::Any) + end + end + end + end + + # Mark all objects as persisted. + objects.each(&.new_record=(false)) + + objects + end + # Returns the number of records that are targeted by the current query set. def count(field : String | Symbol | Nil = nil) @result_cache.nil? || !field.nil? ? @query.count(field.try(&.to_s)) : @result_cache.not_nil!.size end @@ -980,6 +1049,34 @@ module Marten qs end + private def perform_batched_insert(objects : Array(M), batch_size : Int32? = nil) + max_batch_size = @query.connection.bulk_batch_size(objects.size, M.local_fields.count(&.db_column?)) + effective_batch_size = batch_size.nil? ? max_batch_size : [batch_size, max_batch_size].min + + inserted_pks = Array(::DB::Any).new + + pk_column_to_fetch = M.auto_increment_pk_field? ? M.pk_field.db_column! : nil + + objects.each_slice(effective_batch_size) do |sliced_objects| + # Ensure all objects' fields are prepared for save before inserting them. This is necessary to ensure that + # fields like creation timestamp are properly set. + sliced_objects.each(&.prepare_fields_for_save) + values_to_insert = sliced_objects.map do |o| + values = o.local_field_db_values + values.delete(pk_column_to_fetch) if !pk_column_to_fetch.nil? 
+ values + end + + result = @query.connection.bulk_insert(M.db_table, values_to_insert, pk_column_to_fetch) + + if result.is_a?(Array(::DB::Any)) + inserted_pks += result + end + end + + inserted_pks + end + private def raise_negative_indexes_not_supported raise Errors::UnmetQuerySetCondition.new("Negative indexes are not supported") end
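As a closing usage note: on database backends where `bulk_insert` cannot return the generated primary keys (MySQL in this implementation), assigning primary keys client-side keeps the returned objects fully usable, because such records go through the `objects_with_pk` branch above. A minimal sketch, assuming a model declared like the `TagWithUUID` spec fixture introduced earlier in this patch:

```crystal
# Sketch only: assumes a TagWithUUID model whose after_initialize callback
# assigns a random UUID primary key, as in the spec fixture above. Because every
# record already has a pk, no database-generated value needs to be fetched back.
tags = (1..1_000).map { |i| TagWithUUID.new(label: "tag #{i}") }
created = TagWithUUID.bulk_create(tags, batch_size: 200)
created.all?(&.persisted?) # => true
created.all?(&.pk?)        # => true, even on MySQL, since pks were assigned client-side
```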