Skip to content

Commit

Permalink
Make sure the program exits after completion
Browse files Browse the repository at this point in the history
  • Loading branch information
gibchikafa committed Aug 13, 2024
1 parent e757d01 commit a422bf4
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 2 deletions.
11 changes: 10 additions & 1 deletion utils/java/src/main/java/com/logicalclocks/utils/MainClass.java
Original file line number Diff line number Diff line change
Expand Up @@ -115,13 +115,22 @@ public static void main(String[] args) throws Exception {
}
LOGGER.info("Hsfs utils write options: {}", writeOptions);

boolean success = false;
try {
if (op.equals("offline_fg_materialization") || op.equals("offline_fg_backfill")) {
SparkEngine.getInstance().streamToHudiTable(streamFeatureGroup, writeOptions);
}
success = true;
} finally {
LOGGER.info("Closing spark session...");
SparkEngine.getInstance().closeSparkSession();
try {
SparkEngine.getInstance().closeSparkSession();
} catch (Exception e) {
LOGGER.error("Error closing spark session", e);
}
if (!success) {
System.exit(1);
}
}
System.exit(0);
}
Expand Down
14 changes: 13 additions & 1 deletion utils/python/hsfs_utils.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

import sys
import os
import argparse
import json
Expand Down Expand Up @@ -285,6 +286,7 @@ def parse_isoformat_date(da: str) -> datetime:
args = parser.parse_args()
job_conf = read_job_conf(args.path)

success = False
try:
if args.op == "insert_fg":
insert_fg(spark, job_conf)
Expand All @@ -300,6 +302,16 @@ def parse_isoformat_date(da: str) -> datetime:
import_fg(job_conf)
elif args.op == "run_feature_monitoring":
run_feature_monitoring(job_conf)

success = True
finally:
if spark is not None:
spark.stop()
try:
spark.stop()
except Exception as e:
print(f"Error stopping spark session: {e}")
if not success:
sys.exit(1)

sys.exit(0)

0 comments on commit a422bf4

Please sign in to comment.