Skip to content

Commit

Permalink
Insert many func
Browse files Browse the repository at this point in the history
  • Loading branch information
brianreicher committed Oct 10, 2023
1 parent 6218b4a commit c5ce5ea
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 2 deletions.
19 changes: 19 additions & 0 deletions ingestion/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion ingestion/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,5 @@ edition = "2021"
[dependencies]
mongodb = "2.0"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
serde_json = "1.0"
futures = "0.3.28"
54 changes: 53 additions & 1 deletion ingestion/src/mongo_utils.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use mongodb::{Client, bson::Document, options::FindOptions};
use mongodb::{Client, bson::Document, options::{FindOptions, InsertOneOptions, InsertManyOptions}};
use futures::future::try_join_all;

pub struct MongoDriver {
client: Option<Client>,
Expand Down Expand Up @@ -81,4 +82,55 @@ impl MongoDriver {

Ok(result)
}

pub async fn insert_document(
&mut self,
collection_name: &str,
document: Document,
) -> Result<(), mongodb::error::Error> {
if self.client.is_none() {
self.connect().await?;
}

let client = self.client.as_ref().unwrap();
let db = client.database(&self.db_name);
let collection = db.collection(collection_name);

let options = InsertOneOptions::default();

collection.insert_one(document, options).await?;

Ok(())
}

pub async fn insert_many_documents(
&mut self,
collection_name: &str,
documents: Vec<Document>,
) -> Result<InsertManyResult, mongodb::error::Error> {
if self.client.is_none() {
self.connect().await?;
}

let client = self.client.as_ref().unwrap();
let db = client.database(&self.db_name);
let collection = db.collection(collection_name);

let options = InsertManyOptions::default();

let write_models: Vec<WriteModel<Document>> = documents
.into_iter()
.map(|doc| WriteModel::InsertOne { document: doc })
.collect();

let insert_futures = write_models.iter().map(|model| {
let collection_ref = &collection;
collection_ref.insert_one_with_options(model, &options)
});

let results: Result<Vec<_>, _> = try_join_all(insert_futures).await;
let insert_many_result = InsertManyResult::from_results(results)?;

Ok(insert_many_result)
}
}

0 comments on commit c5ce5ea

Please sign in to comment.