Skip to content

Commit

Permalink
Filename and unwrapped text info for mongo dump
Browse files Browse the repository at this point in the history
  • Loading branch information
brianreicher committed Oct 10, 2023
1 parent 8268d98 commit 0c334fd
Showing 1 changed file with 45 additions and 11 deletions.
56 changes: 45 additions & 11 deletions ingestion/src/downloader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ use zip::write::FileOptions;
use zip::ZipWriter;
use mongo_utils::MongoDriver;
use mongodb::bson::{doc, Document};
use std::io::{Read, Write};
use zip::read::ZipArchive;


pub struct GitHubDownloader {
Expand All @@ -24,6 +26,7 @@ impl GitHubDownloader {
pub async fn download_git_zips(
&self,
urls: Vec<&str>,
zip_dir: &str,
) -> Result<(), Box<dyn Error>> {
for url in urls {
let response = self.client.get(url).send().await?;
Expand All @@ -32,22 +35,53 @@ impl GitHubDownloader {
eprintln!("Error downloading {}: {:?}", url, response.status());
continue;
}
let filename = url.split('/').last().unwrap_or("unknown.zip");
let file_path = Path::new(zip_dir).join(filename);

// copy response into memory
let mut file_buf = Vec::new();
response.copy_to(&mut file_buf)?;
let mut response_body = response.bytes_stream();
while let Some(chunk) = response_body.next().await {
let chunk = chunk?;
zip_dir.write_all(&chunk)?;
}

// create BSON
let document = doc! {
"url": response.url.to_owned(),
"data": file_buf,
println!("Downloaded: {}", file_path.display());
}

Ok(())
}

async fn mongo_insert(&self, zip_dir: &str, filter_suffix: Vec&<str>) -> mongodb::error::Result<()> {
let collection: mongodb::Collection<Document>= self.mongo_model.client.as_ref().unwrap().database(self.mongo_model.db_name).collection(self.collection_name);

// open the ZIP file
let file = File::open(zip_dir)?;
let reader = std::io::BufReader::new(file);
let mut archive = ZipArchive::new(reader)?;

for i in 0..archive.len() {
let mut file = archive.by_index(i)?;
let file_name = file.name().to_string();

// apply filtering
let found_suffix = filter_suffix.iter().any(|&suffix| file_name.ends_with(suffix));
if !found_suffix {
continue;
}

// insert into Mongostore
self.mongo_model.insert_document(self.collection, document).await?;
let mut file_content = Vec::new();
file.read_to_end(&mut file_content)?;

// clear the buffer for the next iteration
file_buf.clear();
let content_str = match str::from_utf8(&file_content) {
Ok(s) => s.to_string(),
Err(_) => continue,
};

let document = doc! {
"filename": file_name,
"text": content_str,
};

self.mongo_model.insert_document(self.collection, document).await?;
}

Ok(())
Expand Down

0 comments on commit 0c334fd

Please sign in to comment.