diff --git a/utils/java/src/main/java/com/logicalclocks/utils/MainClass.java b/utils/java/src/main/java/com/logicalclocks/utils/MainClass.java index aa902162b..f3104e3bc 100644 --- a/utils/java/src/main/java/com/logicalclocks/utils/MainClass.java +++ b/utils/java/src/main/java/com/logicalclocks/utils/MainClass.java @@ -115,13 +115,22 @@ public static void main(String[] args) throws Exception { } LOGGER.info("Hsfs utils write options: {}", writeOptions); + boolean success = false; try { if (op.equals("offline_fg_materialization") || op.equals("offline_fg_backfill")) { SparkEngine.getInstance().streamToHudiTable(streamFeatureGroup, writeOptions); } + success = true; } finally { LOGGER.info("Closing spark session..."); - SparkEngine.getInstance().closeSparkSession(); + try { + SparkEngine.getInstance().closeSparkSession(); + } catch (Exception e) { + LOGGER.error("Error closing spark session", e); + } + if (!success) { + System.exit(1); + } } System.exit(0); } diff --git a/utils/python/hsfs_utils.py b/utils/python/hsfs_utils.py index 5133b6d17..40799c9f4 100644 --- a/utils/python/hsfs_utils.py +++ b/utils/python/hsfs_utils.py @@ -1,5 +1,6 @@ from __future__ import annotations +import sys import os import argparse import json @@ -285,6 +286,7 @@ def parse_isoformat_date(da: str) -> datetime: args = parser.parse_args() job_conf = read_job_conf(args.path) + success = False try: if args.op == "insert_fg": insert_fg(spark, job_conf) @@ -300,6 +302,16 @@ def parse_isoformat_date(da: str) -> datetime: import_fg(job_conf) elif args.op == "run_feature_monitoring": run_feature_monitoring(job_conf) + + success = True finally: if spark is not None: - spark.stop() \ No newline at end of file + try: + spark.stop() + except Exception as e: + print(f"Error stopping spark session: {e}") + if not success: + sys.exit(1) + + sys.exit(0) +