Skip to content

Commit

Permalink
Fix up the tests, and logging.
Browse files Browse the repository at this point in the history
  • Loading branch information
ntai-arxiv committed Oct 4, 2023
1 parent eac5784 commit 5b97ceb
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 43 deletions.
58 changes: 31 additions & 27 deletions script/sync_prod_to_gcp/sync_published_to_gcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
from pathlib import Path

from identifier import Identifier
from digester import digest_from_filepath, get_file_mtime
from digester import get_file_mtime

overall_start = perf_counter()

Expand Down Expand Up @@ -97,7 +97,6 @@
RUN = True
DONE = False


def handler_stop_signals(signum, frame):
"""Stop threads on ctrl-c, mostly useful during testing"""
global RUN
Expand Down Expand Up @@ -259,6 +258,7 @@ def path_to_bucket_key(pdf) -> str:
elif str(pdf).startswith(DATA_PREFIX):
return str(pdf).replace(DATA_PREFIX, '')
else:
logging.error(f"path_to_bucket_key: {pdf} does not start with {CACHE_PREFIX} or {DATA_PREFIX}")
raise ValueError(f"Cannot convert PDF path {pdf} to a GS key")


Expand Down Expand Up @@ -321,7 +321,7 @@ def arxiv_pdf_url(host, arxiv_id) -> str:
else:
raise (Exception(f"ensure_pdf: Could not create {pdf_file}. {url} {ms_since(start)} ms"))
else:
logger.info(f"ensure_file_url_exists: {str(pdf_file)} already exists")
logger.debug(f"ensure_file_url_exists: {str(pdf_file)} already exists")
return (pdf_file, url, "already exists", ms_since(start))


Expand Down Expand Up @@ -349,17 +349,16 @@ def mime_from_fname(filepath):
with open(localpath, 'rb') as fh:

destination.upload_from_file(fh, content_type=mime_from_fname(localpath))
logger.info(
logger.debug(
f"upload: completed upload of {localpath} to gs://{GS_BUCKET}/{key} of size {localpath.stat().st_size}")
sha_value = digest_from_filepath(localpath)
try:
destination.metadata = {"localpath": localpath, "mtime": get_file_mtime(localpath), "sha256": sha_value}
destination.metadata = {"localpath": localpath, "mtime": get_file_mtime(localpath)}
destination.update()
except:
pass
return ("upload", localpath, key, "uploaded", ms_since(start), localpath.stat().st_size)
else:
logger.info(f"upload: Not uploading {localpath}, gs://{GS_BUCKET}/{key} already on gs")
logger.debug(f"upload: Not uploading {localpath}, gs://{GS_BUCKET}/{key} already on gs")
return ("upload", localpath, key, "already_on_gs", ms_since(start), 0)


Expand All @@ -384,7 +383,7 @@ def sync_to_gcp(todo_q, host):
break

job_details = {"job": repr(job), "paper_id": job['paper_id']}
logger.info("doing %s", job['paper_id'], extra=job_details)
logger.debug("doing %s", job['paper_id'], extra=job_details)
for action, item in job['actions']:
extra = {"action": action, "item": str(item), "job": repr(job), "paper_id": job['paper_id']}
try:
Expand All @@ -395,7 +394,7 @@ def sync_to_gcp(todo_q, host):
res = upload(tl_data.gs_client, Path(item), path_to_bucket_key(item))

summary_q.put((job['paper_id'], ms_since(start)) + res)
logger.info("success uploading %s", job['paper_id'], extra=extra)
logger.debug("success uploading %s", job['paper_id'], extra=extra)
except Exception as ex:
extra.update({CATEGORY: "upload"})
extra.update(job_details)
Expand All @@ -404,14 +403,15 @@ def sync_to_gcp(todo_q, host):
todo_q.task_done()


def log_summary():
def log_summary(duration, overall_size):
# Don't worry about the log level being INFO.
# Summary is always "calm" logging. When the error happens, the log entry is generated at the spot of failure.
dispatch = {
"build+upload": lambda it: {"paper_id": it[0], "action": it[2], "outcome": it[5]},
"upload": lambda it: {"paper_id": it[0], "action": it[2], "outcome": it[5]},
"failed": lambda it: {"paper_id": it[0], "action": it[2], "error": it[3]}
}
n_good, n_bad = 0, 0
for row in sorted(list(summary_q.queue), key=lambda tup: tup[0]):
summary = list(map(str, row))
action = summary[2]
Expand All @@ -420,8 +420,13 @@ def log_summary():
summary_log.update(digester(summary))
if action == "failed":
logger.warning(','.join(summary), extra=summary_log)
n_bad += 1
else:
logger.info(','.join(summary), extra=summary_log)
logger.debug(','.join(summary), extra=summary_log)
n_good += 1

logger.info( f"Done at {datetime.now().isoformat()}. Overall time: {duration:.2f} sec for {overall_size} submissions. Success/Failed: {n_good}/{n_bad}",
extra={"total": overall_size, "duration": str(duration), "success": n_good, "failure": n_bad, CATEGORY: "summary"})


# #################### MAIN #################### #
Expand All @@ -432,8 +437,10 @@ def main(args):
globals().update(eval(args.globals))
if not (args.d or args.test):
storage.Client() # will fail if no auth setup
if args.v or args.test:
if args.v:
logger.setLevel(logging.INFO)
if args.debug or args.test:
logger.setLevel(logging.DEBUG)

if args.json_log_dir and os.path.exists(args.json_log_dir):
json_logHandler = logging.handlers.RotatingFileHandler(os.path.join(args.json_log_dir, "sync-to-gcp.log"),
Expand All @@ -442,7 +449,7 @@ def main(args):
json_formatter = ArxivSyncJsonFormatter(**LOG_FORMAT_KWARGS)
json_formatter.converter = gmtime
json_logHandler.setFormatter(json_formatter)
json_logHandler.setLevel(logging.DEBUG if args.v or args.test else logging.INFO)
json_logHandler.setLevel(logging.DEBUG if args.v or args.debug else logging.INFO)
logger.addHandler(json_logHandler)
pass

Expand All @@ -452,24 +459,27 @@ def main(args):

if args.d:
todo = list(todo_q.queue)
# Drain the to-do so other tests don't get confused.
while not todo_q.empty():
todo_q.get()
if args.test:
logger.info("Dry run no changes made",
logger.debug("Dry run no changes made",
extra={CATEGORY: "status", "todos": len(todo)})
localpath = "/foo"
key = "bar"
res = ()
paper_id = "1234"
summary_q.put(("1234", 0, "upload", localpath, key, "already_on_gs", 0, 0))
summary_q.put(("5678", 0, "failed", "bad!"))
log_summary()
log_summary(perf_counter() - overall_start, 2)
return todo
print(json.dumps(todo, indent=2))
print(f"{len(todo)} submissions (some may be test submissions)")
sys.exit(1)

logger.debug("made todo_q, getting size")
overall_size = todo_q.qsize()
logger.info('Made %d todos', overall_size, extra={"n_todos": overall_size})
logger.debug('Made %d todos', overall_size, extra={"n_todos": overall_size})

threads = []
for host, n_th in ENSURE_HOSTS:
Expand All @@ -486,20 +496,13 @@ def main(args):

DONE = True
RUN = False
logger.info("Run is complete. Wating to join threads")
logger.debug("Run is complete. Waiting to join threads")
[th.join() for th in threads]
logger.info("Run is complete. Threads done joining")
logger.debug("Run is complete. Threads done joining")

# Summary report
log_summary()

# Epilogue
logger.info(f"Done at {datetime.now().isoformat()}",
extra={CATEGORY: "status"})
duration = perf_counter() - overall_start

logger.info(f"Overall time: {duration:.2f} sec for {overall_size} submissions",
extra={CATEGORY: "status", "duration": str(duration)})
log_summary(perf_counter() - overall_start, overall_size)
pass


if __name__ == "__main__":
Expand All @@ -508,6 +511,7 @@ def main(args):
ad.add_argument('--json-log-dir', help='Additional JSON logging', default='/var/log/e-prints')
ad.add_argument('-v', help='verbose', action='store_true')
ad.add_argument('-d', help="Dry run no action", action='store_true')
ad.add_argument('--debug', help='Set logging to debug', action='store_true')
ad.add_argument('--globals', help="Global variables")
ad.add_argument('filename')
args = ad.parse_args()
Expand Down
37 changes: 21 additions & 16 deletions script/sync_prod_to_gcp/test/test_sync_prod_to_gcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,14 @@ def setUp(self) -> None:
self.parser.add_argument('-d', help="Dry run no action", action='store_true')
self.parser.add_argument('--prefix', help="FTP_PREFIX", default=None)
self.parser.add_argument('--globals', help="Global variables")
self.parser.add_argument('--debug', help="Debug logging", action='store_true')
self.parser.add_argument('filename')
if os.path.exists("test/test-output"):
shutil.rmtree("test/test-output")
pass
os.makedirs("test/test-output", exist_ok=True)
os.putenv("GOOGLE_APPLICATION_CREDENTIALS", "test/gcp-sync-test-role.json")
self.overrides = repr({
"GS_BUCKET": 'arxiv-sync-test-01',
"GS_KEY_PREFIX": '/ps_cache',
"CACHE_PREFIX": 'test/cache/',
"PS_CACHE_PREFIX": 'test/cache/ps_cache/',
"FTP_PREFIX": 'test/data/ftp/',
"ORIG_PREFIX": 'test/data/orig/',
"DATA_PREFIX": 'test/data/',
"REUPLOADS": {'ftp/arxiv/papers/2308/2308.16189.abs': True}
})
rm_items = ["gsutil", "rm", "-a", "-f" ]
rm_items = ["gsutil", "rm", "-a", "-f"]
droplets = ["gs://arxiv-sync-test-01/ftp/arxiv/papers/2308/2308.16188.abs",
"gs://arxiv-sync-test-01/ftp/arxiv/papers/2308/2308.16188.tar.gz",
"gs://arxiv-sync-test-01/ps_cache/arxiv/pdf/2308/2308.16188v1.pdf"]
Expand All @@ -49,9 +40,13 @@ def tearDown(self) -> None:

def test_json_log(self):
from sync_published_to_gcp import main as sync_main
overrides = repr({
"GS_BUCKET": 'arxiv-sync-test-01',
"GS_KEY_PREFIX": '/ps_cache'
})
args = self.parser.parse_args(["-d", "-v", "--test",
"--json-log-dir", "./test/test-output",
"--globals", self.overrides,
"--globals", overrides,
"./test/data/publish_230901.log"])
todos = sync_main(args)
with open("./test/data/publish_230901.out.pickle", "wb") as outfd:
Expand All @@ -66,7 +61,7 @@ def test_json_log(self):
logs = logfd.readlines()
pass
actual = json.loads(logs[1])
self.assertEqual("INFO", actual.get("level"))
self.assertEqual("DEBUG", actual.get("level"))
self.assertEqual("Dry run no changes made", actual.get("message"))
self.assertEqual(1285, actual.get("todos"))

Expand All @@ -86,9 +81,19 @@ def test_json_log(self):


def test_sync(self):
overrides = repr({
"GS_BUCKET": 'arxiv-sync-test-01',
"GS_KEY_PREFIX": '/ps_cache',
"REUPLOADS": {'ftp/arxiv/papers/2308/2308.16189.abs': True},
"CACHE_PREFIX": 'test/cache/',
"PS_CACHE_PREFIX": 'test/cache/ps_cache/',
"FTP_PREFIX": 'test/data/ftp/',
"ORIG_PREFIX": 'test/data/orig/',
"DATA_PREFIX": 'test/data/',
})
args = self.parser.parse_args(["-v",
"--json-log-dir=./test/test-output",
"--globals", self.overrides,
"--globals", overrides,
"./test/data/publish_test.log"])
todos = sync_main(args)
with open("./test/test-output/sync-to-gcp.log") as logfd:
Expand All @@ -104,7 +109,7 @@ def test_sync(self):
pass
pass

self.assertEqual(2, len(levels["ERROR"]))
self.assertGreaterEqual(len(levels["ERROR"]), 2)
for an_error in levels["ERROR"]:
self.assertEqual("2308.16189", an_error.get("paper_id"))
self.assertEqual("upload", an_error.get("action"))
Expand All @@ -120,7 +125,7 @@ def test_sync(self):
self.fail(f"Unexpected item " + repr(an_error.get("item")))
pass
pass
self.assertEqual(2, len(levels["WARNING"]))
self.assertGreaterEqual(len(levels["WARNING"]), 2)

pass

Expand Down

0 comments on commit 5b97ceb

Please sign in to comment.