Skip to content

Commit

Permalink
Try to improve subprocess exception handling
Browse files Browse the repository at this point in the history
  • Loading branch information
ttys0dev committed Nov 6, 2024
1 parent f340d11 commit 425acd7
Showing 1 changed file with 127 additions and 70 deletions.
197 changes: 127 additions & 70 deletions cl/recap/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import concurrent.futures
import hashlib
import logging
import traceback
from dataclasses import dataclass
from datetime import datetime
from http import HTTPStatus
Expand Down Expand Up @@ -558,13 +559,25 @@ async def process_recap_docket(pk):
await mark_pq_status(pq, msg, PROCESSING_STATUS.FAILED)
return None

if process.current_process().daemon:
data = parse_docket_text(map_cl_to_pacer_id(pq.court_id), text)
else:
with concurrent.futures.ProcessPoolExecutor() as pool:
data = await asyncio.get_running_loop().run_in_executor(
pool, parse_docket_text, map_cl_to_pacer_id(pq.court_id), text
)
try:
if process.current_process().daemon:
data = parse_docket_text(map_cl_to_pacer_id(pq.court_id), text)
else:
with concurrent.futures.ProcessPoolExecutor() as pool:
data = await asyncio.get_running_loop().run_in_executor(
pool,
parse_docket_text,
map_cl_to_pacer_id(pq.court_id),
text,
)
except Exception as e:
logging.exception(e)
await mark_pq_status(
pq,
f"We encountered a parsing error while processing this item: {e}",
PROCESSING_STATUS.FAILED,
)
return None
logger.info(f"Parsing completed of item {pq}")

if data == {}:
Expand Down Expand Up @@ -735,18 +748,25 @@ async def process_recap_claims_register(pk):
await mark_pq_status(pq, msg, PROCESSING_STATUS.FAILED)
return None

if process.current_process().daemon:
data = parse_claims_register_text(
map_cl_to_pacer_id(pq.court_id), text
)
else:
with concurrent.futures.ProcessPoolExecutor() as pool:
data = await asyncio.get_running_loop().run_in_executor(
pool,
parse_claims_register_text,
map_cl_to_pacer_id(pq.court_id),
text,
try:
if process.current_process().daemon:
data = parse_claims_register_text(
map_cl_to_pacer_id(pq.court_id), text
)
else:
with concurrent.futures.ProcessPoolExecutor() as pool:
data = await asyncio.get_running_loop().run_in_executor(
pool,
parse_claims_register_text,
map_cl_to_pacer_id(pq.court_id),
text,
)
except Exception as e:
logging.exception(e)
await mark_pq_status(
pq, traceback.format_exc(), PROCESSING_STATUS.FAILED
)
return None
logger.info(f"Parsing completed for item {pq}")

if not data:
Expand Down Expand Up @@ -832,16 +852,25 @@ async def process_recap_docket_history_report(pk):
await mark_pq_status(pq, msg, PROCESSING_STATUS.FAILED)
return None

if process.current_process().daemon:
data = parse_docket_history_text(map_cl_to_pacer_id(pq.court_id), text)
else:
with concurrent.futures.ProcessPoolExecutor() as pool:
data = await asyncio.get_running_loop().run_in_executor(
pool,
parse_docket_history_text,
map_cl_to_pacer_id(pq.court_id),
text,
try:
if process.current_process().daemon:
data = parse_docket_history_text(
map_cl_to_pacer_id(pq.court_id), text
)
else:
with concurrent.futures.ProcessPoolExecutor() as pool:
data = await asyncio.get_running_loop().run_in_executor(
pool,
parse_docket_history_text,
map_cl_to_pacer_id(pq.court_id),
text,
)
except Exception as e:
logging.exception(e)
await mark_pq_status(
pq, traceback.format_exc(), PROCESSING_STATUS.FAILED
)
return None
logger.info(f"Parsing completed for item {pq}")

if data == {}:
Expand Down Expand Up @@ -939,18 +968,25 @@ async def process_case_query_page(pk):
await mark_pq_status(pq, msg, PROCESSING_STATUS.FAILED)
return None

if process.current_process().daemon:
data = parse_case_query_page_text(
map_cl_to_pacer_id(pq.court_id), text
)
else:
with concurrent.futures.ProcessPoolExecutor() as pool:
data = await asyncio.get_running_loop().run_in_executor(
pool,
parse_case_query_page_text,
map_cl_to_pacer_id(pq.court_id),
text,
try:
if process.current_process().daemon:
data = parse_case_query_page_text(
map_cl_to_pacer_id(pq.court_id), text
)
else:
with concurrent.futures.ProcessPoolExecutor() as pool:
data = await asyncio.get_running_loop().run_in_executor(
pool,
parse_case_query_page_text,
map_cl_to_pacer_id(pq.court_id),
text,
)
except Exception as e:
logging.exception(e)
await mark_pq_status(
pq, traceback.format_exc(), PROCESSING_STATUS.FAILED
)
return None
logger.info(f"Parsing completed for item {pq}")

if data == {}:
Expand Down Expand Up @@ -1073,16 +1109,23 @@ async def process_recap_appellate_docket(pk):
await mark_pq_status(pq, msg, PROCESSING_STATUS.FAILED)
return None

if process.current_process().daemon:
data = parse_appellate_text(map_cl_to_pacer_id(pq.court_id), text)
else:
with concurrent.futures.ProcessPoolExecutor() as pool:
data = await asyncio.get_running_loop().run_in_executor(
pool,
parse_appellate_text,
map_cl_to_pacer_id(pq.court_id),
text,
)
try:
if process.current_process().daemon:
data = parse_appellate_text(map_cl_to_pacer_id(pq.court_id), text)
else:
with concurrent.futures.ProcessPoolExecutor() as pool:
data = await asyncio.get_running_loop().run_in_executor(
pool,
parse_appellate_text,
map_cl_to_pacer_id(pq.court_id),
text,
)
except Exception as e:
logging.exception(e)
await mark_pq_status(
pq, traceback.format_exc(), PROCESSING_STATUS.FAILED
)
return None
logger.info(f"Parsing completed of item {pq}")

if data == {}:
Expand Down Expand Up @@ -1177,16 +1220,23 @@ async def process_recap_acms_docket(pk):
await mark_pq_status(pq, msg, PROCESSING_STATUS.FAILED)
return None

if process.current_process().daemon:
data = parse_acms_json(map_cl_to_pacer_id(pq.court_id), text)
else:
with concurrent.futures.ProcessPoolExecutor() as pool:
data = await asyncio.get_running_loop().run_in_executor(
pool,
parse_acms_json,
map_cl_to_pacer_id(pq.court_id),
text,
)
try:
if process.current_process().daemon:
data = parse_acms_json(map_cl_to_pacer_id(pq.court_id), text)
else:
with concurrent.futures.ProcessPoolExecutor() as pool:
data = await asyncio.get_running_loop().run_in_executor(
pool,
parse_acms_json,
map_cl_to_pacer_id(pq.court_id),
text,
)
except Exception as e:
logging.exception(e)
await mark_pq_status(
pq, traceback.format_exc(), PROCESSING_STATUS.FAILED
)
return None
logger.info(f"Parsing completed of item {pq}")

if data == {}:
Expand Down Expand Up @@ -1267,19 +1317,26 @@ async def process_recap_acms_appellate_attachment(
)
return pq_status, msg, []

if process.current_process().daemon:
# yyy
data = parse_acms_attachment_json(
map_cl_to_pacer_id(pq.court_id), text
)
else:
with concurrent.futures.ProcessPoolExecutor() as pool:
data = await asyncio.get_running_loop().run_in_executor(
pool,
parse_acms_attachment_json,
map_cl_to_pacer_id(pq.court_id),
text,
try:
if process.current_process().daemon:
# yyy
data = parse_acms_attachment_json(
map_cl_to_pacer_id(pq.court_id), text
)
else:
with concurrent.futures.ProcessPoolExecutor() as pool:
data = await asyncio.get_running_loop().run_in_executor(
pool,
parse_acms_attachment_json,
map_cl_to_pacer_id(pq.court_id),
text,
)
except Exception as e:
logging.exception(e)
await mark_pq_status(
pq, traceback.format_exc(), PROCESSING_STATUS.FAILED
)
return None
logger.info(f"Parsing completed of item {pq}")

if data == {}:
Expand Down

0 comments on commit 425acd7

Please sign in to comment.