diff --git a/ingestion/Cargo.lock b/ingestion/Cargo.lock index ebb81c5..68a2a80 100644 --- a/ingestion/Cargo.lock +++ b/ingestion/Cargo.lock @@ -333,6 +333,21 @@ version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e6d5a32815ae3f33302d95fdcb2ce17862f8c65363dcfd29360480ba1001fc9c" +[[package]] +name = "futures" +version = "0.3.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "23342abe12aba583913b2e62f22225ff9c950774065e4bfb61a19cd9770fec40" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + [[package]] name = "futures-channel" version = "0.3.28" @@ -340,6 +355,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "955518d47e09b25bbebc7a18df10b81f0c766eaf4c4f1cccef2fca5f2a4fb5f2" dependencies = [ "futures-core", + "futures-sink", ] [[package]] @@ -394,9 +410,11 @@ version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "26b01e40b772d54cf6c6d721c1d1abd0647a0106a12ecaa1c186273392a69533" dependencies = [ + "futures-channel", "futures-core", "futures-io", "futures-macro", + "futures-sink", "futures-task", "memchr", "pin-project-lite", @@ -555,6 +573,7 @@ dependencies = [ name = "ingestion" version = "0.1.0" dependencies = [ + "futures", "mongodb", "serde", "serde_json", diff --git a/ingestion/Cargo.toml b/ingestion/Cargo.toml index c39b0a7..16adb49 100644 --- a/ingestion/Cargo.toml +++ b/ingestion/Cargo.toml @@ -8,4 +8,5 @@ edition = "2021" [dependencies] mongodb = "2.0" serde = { version = "1.0", features = ["derive"] } -serde_json = "1.0" \ No newline at end of file +serde_json = "1.0" +futures = "0.3.28" \ No newline at end of file diff --git a/ingestion/src/mongo_utils.rs b/ingestion/src/mongo_utils.rs index 2760036..c214804 100644 --- a/ingestion/src/mongo_utils.rs +++ b/ingestion/src/mongo_utils.rs @@ -1,4 +1,5 @@ -use mongodb::{Client, bson::Document, options::FindOptions}; +use mongodb::{Client, bson::Document, options::{FindOptions, InsertOneOptions, InsertManyOptions}}; +use futures::future::try_join_all; pub struct MongoDriver { client: Option, @@ -81,4 +82,55 @@ impl MongoDriver { Ok(result) } + + pub async fn insert_document( + &mut self, + collection_name: &str, + document: Document, + ) -> Result<(), mongodb::error::Error> { + if self.client.is_none() { + self.connect().await?; + } + + let client = self.client.as_ref().unwrap(); + let db = client.database(&self.db_name); + let collection = db.collection(collection_name); + + let options = InsertOneOptions::default(); + + collection.insert_one(document, options).await?; + + Ok(()) + } + + pub async fn insert_many_documents( + &mut self, + collection_name: &str, + documents: Vec, + ) -> Result { + if self.client.is_none() { + self.connect().await?; + } + + let client = self.client.as_ref().unwrap(); + let db = client.database(&self.db_name); + let collection = db.collection(collection_name); + + let options = InsertManyOptions::default(); + + let write_models: Vec> = documents + .into_iter() + .map(|doc| WriteModel::InsertOne { document: doc }) + .collect(); + + let insert_futures = write_models.iter().map(|model| { + let collection_ref = &collection; + collection_ref.insert_one_with_options(model, &options) + }); + + let results: Result, _> = try_join_all(insert_futures).await; + let insert_many_result = InsertManyResult::from_results(results)?; + + Ok(insert_many_result) + } }