From 298cbf2991a43711bdf8f96f113a55d3e696fe1f Mon Sep 17 00:00:00 2001 From: Theodlz Date: Tue, 12 Dec 2023 15:43:13 -0800 Subject: [PATCH] fix loading bar when ingesting catalogs --- kowalski/ingesters/ingest_catalog.py | 26 +++++++++++++------------- kowalski/utils.py | 2 ++ 2 files changed, 15 insertions(+), 13 deletions(-) diff --git a/kowalski/ingesters/ingest_catalog.py b/kowalski/ingesters/ingest_catalog.py index e6576493..3a316175 100644 --- a/kowalski/ingesters/ingest_catalog.py +++ b/kowalski/ingesters/ingest_catalog.py @@ -16,7 +16,7 @@ import pandas as pd import pyarrow.parquet as pq from astropy.io import fits -from tqdm import tqdm +from tqdm.auto import tqdm import kowalski.tools.istarmap as istarmap # noqa: F401 from kowalski.config import load_config @@ -545,15 +545,13 @@ def verify_ids(files: list, id_col: str, format: str, num_proc: int = 4): files_copy = deepcopy(files) with multiprocessing.Pool(processes=num_proc) as pool: - for result in tqdm( - pool.imap( - get_file_ids, - [(file, id_col, format) for file in files], - ), - total=len(files), - ): - file = files_copy.pop(0) - ids_per_file[file] = result + with tqdm(total=len(files)) as pbar: + for result in pool.imap_unordered( + get_file_ids, [(file, id_col, format) for file in files] + ): + file = files_copy.pop(0) + ids_per_file[file] = result + pbar.update(1) # now we have a list of all the ids in all the files # we want to make sure that all the ids are unique @@ -662,9 +660,11 @@ def run( total_good_documents, total_bad_documents = 0, 0 log(f"Processing {len(files)} files with {num_proc} processes") with multiprocessing.Pool(processes=num_proc) as pool: - for result in tqdm(pool.imap(process_file, input_list), total=len(files)): - total_good_documents += result[0] - total_bad_documents += result[1] + with tqdm(total=len(files)) as pbar: + for result in pool.imap_unordered(process_file, input_list): + total_good_documents += result[0] + total_bad_documents += result[1] + pbar.update(1) log(f"Successfully ingested {total_good_documents} documents") log(f"Failed to ingest {total_bad_documents} documents") diff --git a/kowalski/utils.py b/kowalski/utils.py index 4f018cd9..4aed48b7 100644 --- a/kowalski/utils.py +++ b/kowalski/utils.py @@ -347,6 +347,8 @@ def __init__( if self.replica_set is not None: conn_string += f"?replicaSet={self.replica_set}" + else: + conn_string += "?directConnection=true" self.client = pymongo.MongoClient(conn_string) self.db = self.client.get_database(db)