From 5b97cebe6b6f843203eef60d65a5ff950f63c4e3 Mon Sep 17 00:00:00 2001 From: Naoyuki Tai Date: Wed, 4 Oct 2023 12:58:29 -0400 Subject: [PATCH] Fix up the tests, and logging. --- .../sync_prod_to_gcp/sync_published_to_gcp.py | 58 ++++++++++--------- .../test/test_sync_prod_to_gcp.py | 37 +++++++----- 2 files changed, 52 insertions(+), 43 deletions(-) diff --git a/script/sync_prod_to_gcp/sync_published_to_gcp.py b/script/sync_prod_to_gcp/sync_published_to_gcp.py index a2a6fa86a..ad86e3dde 100644 --- a/script/sync_prod_to_gcp/sync_published_to_gcp.py +++ b/script/sync_prod_to_gcp/sync_published_to_gcp.py @@ -44,7 +44,7 @@ from pathlib import Path from identifier import Identifier -from digester import digest_from_filepath, get_file_mtime +from digester import get_file_mtime overall_start = perf_counter() @@ -97,7 +97,6 @@ RUN = True DONE = False - def handler_stop_signals(signum, frame): """Stop threads on ctrl-c, mostly useful during testing""" global RUN @@ -259,6 +258,7 @@ def path_to_bucket_key(pdf) -> str: elif str(pdf).startswith(DATA_PREFIX): return str(pdf).replace(DATA_PREFIX, '') else: + logging.error(f"path_to_bucket_key: {pdf} does not start with {CACHE_PREFIX} or {DATA_PREFIX}") raise ValueError(f"Cannot convert PDF path {pdf} to a GS key") @@ -321,7 +321,7 @@ def arxiv_pdf_url(host, arxiv_id) -> str: else: raise (Exception(f"ensure_pdf: Could not create {pdf_file}. {url} {ms_since(start)} ms")) else: - logger.info(f"ensure_file_url_exists: {str(pdf_file)} already exists") + logger.debug(f"ensure_file_url_exists: {str(pdf_file)} already exists") return (pdf_file, url, "already exists", ms_since(start)) @@ -349,17 +349,16 @@ def mime_from_fname(filepath): with open(localpath, 'rb') as fh: destination.upload_from_file(fh, content_type=mime_from_fname(localpath)) - logger.info( + logger.debug( f"upload: completed upload of {localpath} to gs://{GS_BUCKET}/{key} of size {localpath.stat().st_size}") - sha_value = digest_from_filepath(localpath) try: - destination.metadata = {"localpath": localpath, "mtime": get_file_mtime(localpath), "sha256": sha_value} + destination.metadata = {"localpath": localpath, "mtime": get_file_mtime(localpath)} destination.update() except: pass return ("upload", localpath, key, "uploaded", ms_since(start), localpath.stat().st_size) else: - logger.info(f"upload: Not uploading {localpath}, gs://{GS_BUCKET}/{key} already on gs") + logger.debug(f"upload: Not uploading {localpath}, gs://{GS_BUCKET}/{key} already on gs") return ("upload", localpath, key, "already_on_gs", ms_since(start), 0) @@ -384,7 +383,7 @@ def sync_to_gcp(todo_q, host): break job_details = {"job": repr(job), "paper_id": job['paper_id']} - logger.info("doing %s", job['paper_id'], extra=job_details) + logger.debug("doing %s", job['paper_id'], extra=job_details) for action, item in job['actions']: extra = {"action": action, "item": str(item), "job": repr(job), "paper_id": job['paper_id']} try: @@ -395,7 +394,7 @@ def sync_to_gcp(todo_q, host): res = upload(tl_data.gs_client, Path(item), path_to_bucket_key(item)) summary_q.put((job['paper_id'], ms_since(start)) + res) - logger.info("success uploading %s", job['paper_id'], extra=extra) + logger.debug("success uploading %s", job['paper_id'], extra=extra) except Exception as ex: extra.update({CATEGORY: "upload"}) extra.update(job_details) @@ -404,7 +403,7 @@ def sync_to_gcp(todo_q, host): todo_q.task_done() -def log_summary(): +def log_summary(duration, overall_size): # Don't worry about the log level being INFO. # Summary is always "calm" logging. When the error happens, the log entry is generated at the spot of failure. dispatch = { @@ -412,6 +411,7 @@ def log_summary(): "upload": lambda it: {"paper_id": it[0], "action": it[2], "outcome": it[5]}, "failed": lambda it: {"paper_id": it[0], "action": it[2], "error": it[3]} } + n_good, n_bad = 0, 0 for row in sorted(list(summary_q.queue), key=lambda tup: tup[0]): summary = list(map(str, row)) action = summary[2] @@ -420,8 +420,13 @@ def log_summary(): summary_log.update(digester(summary)) if action == "failed": logger.warning(','.join(summary), extra=summary_log) + n_bad += 1 else: - logger.info(','.join(summary), extra=summary_log) + logger.debug(','.join(summary), extra=summary_log) + n_good += 1 + + logger.info( f"Done at {datetime.now().isoformat()}. Overall time: {duration:.2f} sec for {overall_size} submissions. Success/Failed: {n_good}/{n_bad}", + extra={"total": overall_size, "duration": str(duration), "success": n_good, "failure": n_bad, CATEGORY: "summary"}) # #################### MAIN #################### # @@ -432,8 +437,10 @@ def main(args): globals().update(eval(args.globals)) if not (args.d or args.test): storage.Client() # will fail if no auth setup - if args.v or args.test: + if args.v: logger.setLevel(logging.INFO) + if args.debug or args.test: + logger.setLevel(logging.DEBUG) if args.json_log_dir and os.path.exists(args.json_log_dir): json_logHandler = logging.handlers.RotatingFileHandler(os.path.join(args.json_log_dir, "sync-to-gcp.log"), @@ -442,7 +449,7 @@ def main(args): json_formatter = ArxivSyncJsonFormatter(**LOG_FORMAT_KWARGS) json_formatter.converter = gmtime json_logHandler.setFormatter(json_formatter) - json_logHandler.setLevel(logging.DEBUG if args.v or args.test else logging.INFO) + json_logHandler.setLevel(logging.DEBUG if args.v or args.debug else logging.INFO) logger.addHandler(json_logHandler) pass @@ -452,8 +459,11 @@ def main(args): if args.d: todo = list(todo_q.queue) + # Drain the to-do so other tests don't get confused. + while not todo_q.empty(): + todo_q.get() if args.test: - logger.info("Dry run no changes made", + logger.debug("Dry run no changes made", extra={CATEGORY: "status", "todos": len(todo)}) localpath = "/foo" key = "bar" @@ -461,7 +471,7 @@ def main(args): paper_id = "1234" summary_q.put(("1234", 0, "upload", localpath, key, "already_on_gs", 0, 0)) summary_q.put(("5678", 0, "failed", "bad!")) - log_summary() + log_summary(perf_counter() - overall_start, 2) return todo print(json.dumps(todo, indent=2)) print(f"{len(todo)} submissions (some may be test submissions)") @@ -469,7 +479,7 @@ def main(args): logger.debug("made todo_q, getting size") overall_size = todo_q.qsize() - logger.info('Made %d todos', overall_size, extra={"n_todos": overall_size}) + logger.debug('Made %d todos', overall_size, extra={"n_todos": overall_size}) threads = [] for host, n_th in ENSURE_HOSTS: @@ -486,20 +496,13 @@ def main(args): DONE = True RUN = False - logger.info("Run is complete. Wating to join threads") + logger.debug("Run is complete. Waiting to join threads") [th.join() for th in threads] - logger.info("Run is complete. Threads done joining") + logger.debug("Run is complete. Threads done joining") # Summary report - log_summary() - - # Epilogue - logger.info(f"Done at {datetime.now().isoformat()}", - extra={CATEGORY: "status"}) - duration = perf_counter() - overall_start - - logger.info(f"Overall time: {duration:.2f} sec for {overall_size} submissions", - extra={CATEGORY: "status", "duration": str(duration)}) + log_summary(perf_counter() - overall_start, overall_size) + pass if __name__ == "__main__": @@ -508,6 +511,7 @@ def main(args): ad.add_argument('--json-log-dir', help='Additional JSON logging', default='/var/log/e-prints') ad.add_argument('-v', help='verbose', action='store_true') ad.add_argument('-d', help="Dry run no action", action='store_true') + ad.add_argument('--debug', help='Set logging to debug', action='store_true') ad.add_argument('--globals', help="Global variables") ad.add_argument('filename') args = ad.parse_args() diff --git a/script/sync_prod_to_gcp/test/test_sync_prod_to_gcp.py b/script/sync_prod_to_gcp/test/test_sync_prod_to_gcp.py index 6426087de..af72666a3 100644 --- a/script/sync_prod_to_gcp/test/test_sync_prod_to_gcp.py +++ b/script/sync_prod_to_gcp/test/test_sync_prod_to_gcp.py @@ -18,23 +18,14 @@ def setUp(self) -> None: self.parser.add_argument('-d', help="Dry run no action", action='store_true') self.parser.add_argument('--prefix', help="FTP_PREFIX", default=None) self.parser.add_argument('--globals', help="Global variables") + self.parser.add_argument('--debug', help="Debug logging", action='store_true') self.parser.add_argument('filename') if os.path.exists("test/test-output"): shutil.rmtree("test/test-output") pass os.makedirs("test/test-output", exist_ok=True) os.putenv("GOOGLE_APPLICATION_CREDENTIALS", "test/gcp-sync-test-role.json") - self.overrides = repr({ - "GS_BUCKET": 'arxiv-sync-test-01', - "GS_KEY_PREFIX": '/ps_cache', - "CACHE_PREFIX": 'test/cache/', - "PS_CACHE_PREFIX": 'test/cache/ps_cache/', - "FTP_PREFIX": 'test/data/ftp/', - "ORIG_PREFIX": 'test/data/orig/', - "DATA_PREFIX": 'test/data/', - "REUPLOADS": {'ftp/arxiv/papers/2308/2308.16189.abs': True} - }) - rm_items = ["gsutil", "rm", "-a", "-f" ] + rm_items = ["gsutil", "rm", "-a", "-f"] droplets = ["gs://arxiv-sync-test-01/ftp/arxiv/papers/2308/2308.16188.abs", "gs://arxiv-sync-test-01/ftp/arxiv/papers/2308/2308.16188.tar.gz", "gs://arxiv-sync-test-01/ps_cache/arxiv/pdf/2308/2308.16188v1.pdf"] @@ -49,9 +40,13 @@ def tearDown(self) -> None: def test_json_log(self): from sync_published_to_gcp import main as sync_main + overrides = repr({ + "GS_BUCKET": 'arxiv-sync-test-01', + "GS_KEY_PREFIX": '/ps_cache' + }) args = self.parser.parse_args(["-d", "-v", "--test", "--json-log-dir", "./test/test-output", - "--globals", self.overrides, + "--globals", overrides, "./test/data/publish_230901.log"]) todos = sync_main(args) with open("./test/data/publish_230901.out.pickle", "wb") as outfd: @@ -66,7 +61,7 @@ def test_json_log(self): logs = logfd.readlines() pass actual = json.loads(logs[1]) - self.assertEqual("INFO", actual.get("level")) + self.assertEqual("DEBUG", actual.get("level")) self.assertEqual("Dry run no changes made", actual.get("message")) self.assertEqual(1285, actual.get("todos")) @@ -86,9 +81,19 @@ def test_json_log(self): def test_sync(self): + overrides = repr({ + "GS_BUCKET": 'arxiv-sync-test-01', + "GS_KEY_PREFIX": '/ps_cache', + "REUPLOADS": {'ftp/arxiv/papers/2308/2308.16189.abs': True}, + "CACHE_PREFIX": 'test/cache/', + "PS_CACHE_PREFIX": 'test/cache/ps_cache/', + "FTP_PREFIX": 'test/data/ftp/', + "ORIG_PREFIX": 'test/data/orig/', + "DATA_PREFIX": 'test/data/', + }) args = self.parser.parse_args(["-v", "--json-log-dir=./test/test-output", - "--globals", self.overrides, + "--globals", overrides, "./test/data/publish_test.log"]) todos = sync_main(args) with open("./test/test-output/sync-to-gcp.log") as logfd: @@ -104,7 +109,7 @@ def test_sync(self): pass pass - self.assertEqual(2, len(levels["ERROR"])) + self.assertGreaterEqual(len(levels["ERROR"]), 2) for an_error in levels["ERROR"]: self.assertEqual("2308.16189", an_error.get("paper_id")) self.assertEqual("upload", an_error.get("action")) @@ -120,7 +125,7 @@ def test_sync(self): self.fail(f"Unexpected item " + repr(an_error.get("item"))) pass pass - self.assertEqual(2, len(levels["WARNING"])) + self.assertGreaterEqual(len(levels["WARNING"]), 2) pass