Skip to content

Commit

Permalink
fix loading bar when ingesting catalogs
Browse files Browse the repository at this point in the history
  • Loading branch information
Theodlz committed Dec 12, 2023
1 parent f97095a commit 298cbf2
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 13 deletions.
26 changes: 13 additions & 13 deletions kowalski/ingesters/ingest_catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import pandas as pd
import pyarrow.parquet as pq
from astropy.io import fits
from tqdm import tqdm
from tqdm.auto import tqdm

import kowalski.tools.istarmap as istarmap # noqa: F401
from kowalski.config import load_config
Expand Down Expand Up @@ -545,15 +545,13 @@ def verify_ids(files: list, id_col: str, format: str, num_proc: int = 4):
files_copy = deepcopy(files)

with multiprocessing.Pool(processes=num_proc) as pool:
for result in tqdm(
pool.imap(
get_file_ids,
[(file, id_col, format) for file in files],
),
total=len(files),
):
file = files_copy.pop(0)
ids_per_file[file] = result
with tqdm(total=len(files)) as pbar:
for result in pool.imap_unordered(
get_file_ids, [(file, id_col, format) for file in files]
):
file = files_copy.pop(0)
ids_per_file[file] = result
pbar.update(1)

# now we have a list of all the ids in all the files
# we want to make sure that all the ids are unique
Expand Down Expand Up @@ -662,9 +660,11 @@ def run(
total_good_documents, total_bad_documents = 0, 0
log(f"Processing {len(files)} files with {num_proc} processes")
with multiprocessing.Pool(processes=num_proc) as pool:
for result in tqdm(pool.imap(process_file, input_list), total=len(files)):
total_good_documents += result[0]
total_bad_documents += result[1]
with tqdm(total=len(files)) as pbar:
for result in pool.imap_unordered(process_file, input_list):
total_good_documents += result[0]
total_bad_documents += result[1]
pbar.update(1)

log(f"Successfully ingested {total_good_documents} documents")
log(f"Failed to ingest {total_bad_documents} documents")
Expand Down
2 changes: 2 additions & 0 deletions kowalski/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,8 @@ def __init__(

if self.replica_set is not None:
conn_string += f"?replicaSet={self.replica_set}"
else:
conn_string += "?directConnection=true"

self.client = pymongo.MongoClient(conn_string)
self.db = self.client.get_database(db)
Expand Down

0 comments on commit 298cbf2

Please sign in to comment.