refactor: improve logging
Mehrsary committed Apr 16, 2024
1 parent bb18d42 commit 22c3636
Showing 3 changed files with 333 additions and 243 deletions.
7 changes: 4 additions & 3 deletions docker-compose.yml
@@ -1,7 +1,8 @@
 version: '3.7'
 services:
   node-fhir-test-data-populator:
-    build: .
+    image: ghcr.io/privateaim/node-fhir-test-data-populator:latest
+    # build: .
     environment:
       CLEAR_TESTDATA_DIRS: ${CLEAR_TESTDATA_DIRS:-"true"}
       # SYNTHEA GENERATION
@@ -17,11 +18,11 @@ services:
       ENABLE_SENT_DATA_TO_FHIR: ${ENABLE_SENT_DATA_TO_FHIR:-""}
       GZIPPED_FHIR_SEND_INPUT_FILES: ${GZIPPED_FHIR_SEND_INPUT_FILES:-"false"}
       REMOVE_INPUTFILES_FHIR_SENDING: ${REMOVE_INPUTFILES_FHIR_SENDING:-"false"}
-      FHIR_URL: ${FHIR_URL:-"http://localhost:8081/fhir"}
+      FHIR_URL: ${FHIR_URL:-"http://localhost:8080/fhir"}
       FHIR_USER: ${FHIR_USER:-""}
       FHIR_PW: ${FHIR_PW:-""}
     volumes:
-      - "./generated-testdata:/gen/output"
+      # - "./generated-testdata:/gen/output"
       - "./synthea.properties:/gen/synthea.properties"
      - "./docker-entrypoint.sh:/gen/docker-entrypoint.sh"
      - "./project/testdata-post-processing.py:/gen/testdata-post-processing.py"
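The compose change swaps the local build for the published GHCR image and moves the default FHIR endpoint from port 8081 to 8080. A quick way to confirm which endpoint a given environment resolves to is to probe the server's CapabilityStatement; the snippet below is a hypothetical helper, not part of this commit, and only assumes the same FHIR_URL convention as the compose file:

import os

import requests

# Mirror the compose default: FHIR_URL falls back to the new port 8080.
fhir_url = os.environ.get("FHIR_URL", "http://localhost:8080/fhir")

# /metadata serves the CapabilityStatement and is the standard FHIR liveness probe.
response = requests.get(f"{fhir_url}/metadata", timeout=10)
response.raise_for_status()
print(f"FHIR server reachable at {fhir_url} (HTTP {response.status_code})")
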
152 changes: 97 additions & 55 deletions project/continuously-load-testdata.py
@@ -18,52 +18,64 @@


 def json_from_gzip_file(input_filepath):
-
-    with gzip.open(input_filepath, 'rt', encoding='utf-8') as gzip_file:
-        json_data = gzip_file.read()
-
-    return json.loads(json_data)
+    try:
+        with gzip.open(input_filepath, "rt", encoding="utf-8") as gzip_file:
+            json_data = gzip_file.read()
+        return json.loads(json_data)
+    except Exception as e:
+        logging.error(f"Error loading JSON from gzipped file: {e}")
+        raise


 def send_to_fhir_server(bundle, args):
-    logging.debug("Sending bundle to fhir...")
-    headers = {'Content-Type': 'application/json'}
-    response = requests.post(args.fhirurl, json=bundle, headers=headers,
-                             auth=HTTPBasicAuth(args.fhiruser, args.fhirpw))
-
-    logging.debug(f'Response sending bundle: {response.status_code}')
+    try:
+        logging.debug("Sending bundle to FHIR...")
+        headers = {"Content-Type": "application/json"}
+        response = requests.post(
+            args.fhirurl,
+            json=bundle,
+            headers=headers,
+            auth=HTTPBasicAuth(args.fhiruser, args.fhirpw),
+        )
+        response.raise_for_status()  # Raise an exception for non-2xx responses
+        logging.debug(f"Response sending bundle: {response.status_code}")
+    except requests.exceptions.RequestException as e:
+        logging.error(f"Error sending bundle to FHIR server: {e}")
+        raise


 def process_file(data_dict, file_path, args):
-
     logger = logging.getLogger()
     logger.setLevel(get_numeric_log_level(args.log_level))
-    logging.info(f'SENDING TO FHIR - Processing file: {file_path}')
+    logging.info(f"SENDING TO FHIR - Processing file: {file_path}")
 
     bundle = {}
     json_loaded_success = False
     cur_try = 0
     n_retries = 5
-    while cur_try < n_retries and json_loaded_success == False:
+    while cur_try < n_retries and not json_loaded_success:
         try:
-
             if args.gzippedfiles:
                 bundle = json_from_gzip_file(file_path)
             else:
-                with open(file_path, 'r') as json_file:
+                with open(file_path, "r") as json_file:
                     bundle = json.load(json_file)
             json_loaded_success = True
         except Exception as e:
-            cur_try = cur_try + 1
+            cur_try += 1
             logging.debug(
-                f"Hit running condition opening half written file, try:{cur_try} of {n_retries}")
-            json_loaded_success = False
+                f"Error loading JSON from file, attempt {cur_try} of {n_retries}: {e}"
+            )
             time.sleep(5)
 
     send_to_fhir_server(bundle, args)
 
     if args.removeinputfiles:
-        os.remove(file_path)
+        try:
+            os.remove(file_path)
+        except OSError as e:
+            logging.error(f"Error removing input file: {e}")
+            raise
 
     data_dict[file_path] = True
     return True
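process_file retries the JSON load because the generator may still be writing a file when the poller picks it up; a partially written bundle fails to parse until the writer finishes. A hypothetical single-file invocation (the Namespace fields mirror the CLI flags defined further down; the file path is made up) could look like:

from argparse import Namespace
from multiprocessing import Manager

args = Namespace(
    log_level="debug",
    gzippedfiles=False,
    removeinputfiles=False,
    fhirurl="http://localhost:8080/fhir",
    fhiruser="",
    fhirpw="",
)
data_dict = Manager().dict()  # shared bookkeeping of already-sent files
# Hypothetical path - any bundle produced into the input directory works.
process_file(data_dict, "generated-testdata/fhir-processed/bundle-0001.json", args)
print(dict(data_dict))  # {'generated-testdata/fhir-processed/bundle-0001.json': True}
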
@@ -73,17 +85,26 @@ def process_directory(data_dict, args):
     global LAST_CHANGE_TIME
 
     multiple_results = []
-    all_files = [os.path.join(root, file) for root, _,
-                 files in os.walk(args.inputdir) for file in files]
+    all_files = [
+        os.path.join(root, file)
+        for root, _, files in os.walk(args.inputdir)
+        for file in files
+    ]
 
     with Pool(processes=4) as pool:
         for filename in all_files:
             if filename not in data_dict:
                 LAST_CHANGE_TIME = datetime.now()
-                multiple_results.append(pool.apply_async(
-                    process_file, (data_dict, filename, args)))
+                multiple_results.append(
+                    pool.apply_async(process_file, (data_dict, filename, args))
+                )
 
-    logging.debug([res.get(timeout=30) for res in multiple_results])
+        # Collect results inside the with-block: Pool.__exit__ calls terminate(),
+        # so waiting on the AsyncResults after it would never succeed.
+        try:
+            results = [res.get(timeout=30) for res in multiple_results]
+            logging.debug(results)
+        except multiprocessing.TimeoutError:
+            # AsyncResult.get raises multiprocessing.TimeoutError, not
+            # concurrent.futures.TimeoutError (requires "import multiprocessing").
+            logging.error("Timeout occurred while processing files.")
+            raise


def get_numeric_log_level(log_level):
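The except clause above matters because Pool.apply_async results signal a missed deadline with multiprocessing.TimeoutError, which the concurrent.futures exception does not catch. A self-contained sketch (hypothetical, independent of this repository) showing the exception in action:

import multiprocessing
import time


def slow_task(seconds):
    time.sleep(seconds)
    return seconds


if __name__ == "__main__":
    with multiprocessing.Pool(processes=2) as pool:
        result = pool.apply_async(slow_task, (5,))
        try:
            result.get(timeout=1)  # worker needs 5s, so this times out
        except multiprocessing.TimeoutError:
            print("worker did not finish within the deadline")
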
@@ -99,48 +120,69 @@ def str_to_bool(s):


 def main():
-
     parser = argparse.ArgumentParser(
-        description="Continuously process files in a directory.")
-    parser.add_argument("--log-level", default="info", choices=["debug", "info", "warning", "error", "critical"],
-                        help="Set the logging level")
-    parser.add_argument("--metadatadir", default="generated-testdata/metadata",
-                        help="Path to the directory to be processed.")
-    parser.add_argument("--inputdir", default="generated-testdata/fhir-processed",
-                        help="Path to the directory to be processed.")
-    parser.add_argument("--timeout", type=int, default=5,
-                        help="Timeout in minutes for no new files.")
-    parser.add_argument("--gzippedfiles", type=str_to_bool,
-                        help="Enable reading of gzipped files")
-    parser.add_argument("--removeinputfiles", type=str_to_bool,
-                        help="Enable the continious removing of input files")
-    parser.add_argument("--fhirurl", default="http://localhost:8081/fhir",
-                        help="FHIR base url - commonly ends with /fhir")
-    parser.add_argument("--fhiruser", default="",
-                        help="FHIR Basic auth user")
-    parser.add_argument("--fhirpw", default="",
-                        help="FHIR Basic auth password")
+        description="Continuously process files in a directory."
+    )
+    parser.add_argument(
+        "--log-level",
+        default="info",
+        choices=["debug", "info", "warning", "error", "critical"],
+        help="Set the logging level",
+    )
+    parser.add_argument(
+        "--metadatadir",
+        default="generated-testdata/metadata",
+        help="Path to the directory to be processed.",
+    )
+    parser.add_argument(
+        "--inputdir",
+        default="generated-testdata/fhir-processed",
+        help="Path to the directory to be processed.",
+    )
+    parser.add_argument(
+        "--timeout", type=int, default=5, help="Timeout in minutes for no new files."
+    )
+    parser.add_argument(
+        "--gzippedfiles", type=str_to_bool, help="Enable reading of gzipped files"
+    )
+    parser.add_argument(
+        "--removeinputfiles",
+        type=str_to_bool,
+        help="Enable the continuous removing of input files",
+    )
+    parser.add_argument(
+        "--fhirurl",
+        default="http://localhost:8080/fhir",
+        help="FHIR base url - commonly ends with /fhir",
+    )
+    parser.add_argument("--fhiruser", default="", help="FHIR Basic auth user")
+    parser.add_argument("--fhirpw", default="", help="FHIR Basic auth password")
 
     args = parser.parse_args()
     logging.basicConfig(level=get_numeric_log_level(args.log_level))
     manager = Manager()
     data_dict = manager.dict()
 
     while True:
-        process_directory(data_dict, args)
-
-        if (datetime.now() - LAST_CHANGE_TIME) > timedelta(minutes=args.timeout):
-            break
-
-        time.sleep(10)
+        try:
+            process_directory(data_dict, args)
+            if (datetime.now() - LAST_CHANGE_TIME) > timedelta(minutes=args.timeout):
+                break
+            time.sleep(10)
+        except Exception as e:
+            logging.error(f"An error occurred: {e}")
+            time.sleep(10)  # Add some delay before retrying
 
     processed_data_info = dict(data_dict)
 
     if args.removeinputfiles:
-        shutil.rmtree(args.inputdir)
+        try:
+            shutil.rmtree(args.inputdir)
+        except Exception as e:
+            logging.error(f"Error removing input directory: {e}")
 
-    with open(f'{args.metadatadir}/loaded_data_info.json', "w") as json_file:
+    with open(f"{args.metadatadir}/loaded_data_info.json", "w") as json_file:
         json.dump(processed_data_info, json_file, indent=4)


 if __name__ == "__main__":
-    main()
\ No newline at end of file
+    main()
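The loop in main() implements an idle timeout: LAST_CHANGE_TIME is refreshed whenever process_directory sees an unprocessed file, and the loop exits once no new files have arrived for --timeout minutes. A stripped-down sketch of that pattern (hypothetical, with the file check stubbed out):

import time
from datetime import datetime, timedelta

LAST_CHANGE_TIME = datetime.now()
IDLE_TIMEOUT = timedelta(minutes=5)  # mirrors the --timeout default of 5

while True:
    new_file_seen = False  # in the real script: any path not yet in data_dict
    if new_file_seen:
        LAST_CHANGE_TIME = datetime.now()
    if datetime.now() - LAST_CHANGE_TIME > IDLE_TIMEOUT:
        break  # idle for the full window - shut down
    time.sleep(10)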
