From ad441ee86453be30a670d633207376c4765d9264 Mon Sep 17 00:00:00 2001
From: Ralf
Date: Fri, 6 Dec 2024 11:18:45 +0200
Subject: [PATCH] [FSTORE-1632] Change limit location for delta ingestion (#422)

Co-authored-by: Fabio Buso
---
 utils/python/hsfs_utils.py | 13 ++++++++-----
 1 file changed, 8 insertions(+), 5 deletions(-)

diff --git a/utils/python/hsfs_utils.py b/utils/python/hsfs_utils.py
index ac2f23de2..fc226dca4 100644
--- a/utils/python/hsfs_utils.py
+++ b/utils/python/hsfs_utils.py
@@ -301,22 +301,25 @@ def offline_fg_materialization(spark: SparkSession, job_conf: Dict[Any, Any], in
         .option("includeHeaders", "true")
         .option("failOnDataLoss", "false")
         .load()
-        .limit(5000000)
     )
 
     # filter only the necassary entries
-    df = df.filter(expr("CAST(filter(headers, header -> header.key = 'featureGroupId')[0].value AS STRING)") == str(entity._id))
-    df = df.filter(expr("CAST(filter(headers, header -> header.key = 'subjectId')[0].value AS STRING)") == str(entity.subject["id"]))
+    filtered_df = df.filter(expr("CAST(filter(headers, header -> header.key = 'featureGroupId')[0].value AS STRING)") == str(entity._id))
+    filtered_df = filtered_df.filter(expr("CAST(filter(headers, header -> header.key = 'subjectId')[0].value AS STRING)") == str(entity.subject["id"]))
+
+    # limit the number of records ingested
+    limit = job_conf.get("write_options", {}).get("job_limit", 5000000)
+    filtered_df = filtered_df.limit(limit)
 
     # deserialize dataframe so that it can be properly saved
-    deserialized_df = engine.get_instance()._deserialize_from_avro(entity, df)
+    deserialized_df = engine.get_instance()._deserialize_from_avro(entity, filtered_df)
 
     # insert data
     entity.stream = False # to make sure we dont write to kafka
     entity.insert(deserialized_df, storage="offline")
 
     # update offsets
-    df_offsets = df.groupBy('partition').agg(max('offset').alias('offset')).collect()
+    df_offsets = (df if limit > filtered_df.count() else filtered_df).groupBy('partition').agg(max('offset').alias('offset')).collect()
     offset_dict = json.loads(offset_string)
     for offset_row in df_offsets:
         offset_dict[f"{entity._online_topic_name}"][f"{offset_row.partition}"] = offset_row.offset + 1
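
Note: the patch replaces the hard-coded 5,000,000-record cap on the Kafka read with a limit applied after filtering, read from the job configuration ("write_options" / "job_limit" in job_conf, as in the diff), and it advances Kafka offsets only past the records that were actually ingested when the limit truncated the batch. A minimal sketch of that limit lookup and offset-advance rule, using plain Python data in place of the Spark dataframes (the record values below are illustrative, not from the patch):

    # Minimal sketch (not the materialization job itself): illustrates the limit
    # lookup from the job configuration and the per-partition offset bookkeeping
    # the patch performs after ingestion. All record data below is made up.

    job_conf = {"write_options": {"job_limit": 2}}  # omit "job_limit" to fall back to 5,000,000
    limit = job_conf.get("write_options", {}).get("job_limit", 5000000)

    # (partition, offset) pairs standing in for everything read from the topic ...
    all_records = [(0, 10), (0, 11), (1, 7), (1, 8)]
    # ... and for the subset that belongs to this feature group / subject
    filtered_records = [(0, 10), (1, 7), (1, 8)]

    ingested = filtered_records[:limit]  # analogous to filtered_df.limit(limit)

    # If the limit did not truncate the batch, offsets may advance past every
    # record that was read; otherwise only past the records actually ingested.
    source = all_records if limit > len(ingested) else ingested

    offsets = {}
    for partition, offset in source:
        offsets[partition] = max(offsets.get(partition, 0), offset + 1)

    print(offsets)  # with job_limit=2: {0: 11, 1: 8}

Keeping the limit after the header filters means the cap counts only records for this feature group, and committing offsets from the truncated dataframe ensures records beyond the cap are picked up by the next run instead of being skipped.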