Skip to content

Commit

Permalink
BSON mongo insertion
Browse files Browse the repository at this point in the history
  • Loading branch information
brianreicher committed Oct 10, 2023
1 parent 2823103 commit 7b826b4
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 21 deletions.
39 changes: 22 additions & 17 deletions ingestion/src/downloader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,44 +6,49 @@ use std::path::Path;
use zip::write::FileOptions;
use zip::ZipWriter;
use mongo_utils::MongoDriver;
use mongodb::bson::{doc, Document};


pub struct GitHubDownloader {
client: reqwest::Client,
mongo_model: MongoDriver,
collection: String,
}

impl GitHubDownloader {
pub fn new() -> Self {
pub fn new(mongo_model: &MongoDriver, collection: &str) -> Self {
let client = reqwest::Client::new();
GitHubDownloader { client }
GitHubDownloader { client, mongo_model, collection }
}

pub async fn download_and_zip_urls(
pub async fn download_git_zips(
&self,
urls: Vec<(&str, &str)>,
zip_file_path: &str,
urls: Vec<&str>,
repo_name: &str,
) -> Result<(), Box<dyn Error>> {
let file = File::create(zip_file_path)?;
let mut zip = ZipWriter::new(file);
let options = FileOptions::default()
.unix_permissions(0o755)
.compression_method(zip::CompressionMethod::Stored);

for (url, file_name) in urls {
for url in urls {
let response = self.client.get(url).send().await?;

if response.status() != reqwest::StatusCode::OK {
eprintln!("Error downloading {}: {:?}", url, response.status());
continue;
}


// copy response into memory
let mut file_buf = Vec::new();
response.copy_to(&mut file_buf)?;

// create BSON
let document = doc! {
"url": response.url.to_owned(),
"data": file_buf,
}

// response.copy_to(&mut file_buf)?;
response.read_to_end(&mut file_buf)?;
// insert into Mongostore
self.mongo_model.insert_document(self.collection, document).await?;

zip.start_file(file_name, options)?;
zip.write_all(&file_buf)?;
// clear the buffer for the next iteration
file_buf.clear();
}

Ok(())
Expand Down
14 changes: 10 additions & 4 deletions ingestion/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
use ingestion::mongo_utils::MongoDriver;

use dpw
#[tokio::main]
async fn main() -> std::io::Result<()> {
let mongo: MongoDriver = MongoDriver::new("localhost", 8080, "turbine");
mongo.connect();
mongo.connect().await?;

// create/connect if exists, and flush
mongo.create_collection("github_data");
mongo.flush_collection("github_data");
mongo.create_collection("github_data").await?;
mongo.flush_collection("github_data").await?;

// set remote urls and download
let urls = vec!["https://github.com/user/repo/raw/master/file1.zip", "https://github.com/user/repo/raw/master/file2.zip"];
github_downloader.download_git_zips(urls, "my_repo").await?;


Ok(())
}

0 comments on commit 7b826b4

Please sign in to comment.