diff --git a/utils/python/hsfs_utils.py b/utils/python/hsfs_utils.py
index ac2f23de2..fc226dca4 100644
--- a/utils/python/hsfs_utils.py
+++ b/utils/python/hsfs_utils.py
@@ -301,22 +301,25 @@ def offline_fg_materialization(spark: SparkSession, job_conf: Dict[Any, Any], in
         .option("includeHeaders", "true")
         .option("failOnDataLoss", "false")
         .load()
-        .limit(5000000)
     )

     # filter only the necassary entries
-    df = df.filter(expr("CAST(filter(headers, header -> header.key = 'featureGroupId')[0].value AS STRING)") == str(entity._id))
-    df = df.filter(expr("CAST(filter(headers, header -> header.key = 'subjectId')[0].value AS STRING)") == str(entity.subject["id"]))
+    filtered_df = df.filter(expr("CAST(filter(headers, header -> header.key = 'featureGroupId')[0].value AS STRING)") == str(entity._id))
+    filtered_df = filtered_df.filter(expr("CAST(filter(headers, header -> header.key = 'subjectId')[0].value AS STRING)") == str(entity.subject["id"]))
+
+    # limit the number of records ingested
+    limit = job_conf.get("write_options", {}).get("job_limit", 5000000)
+    filtered_df = filtered_df.limit(limit)

     # deserialize dataframe so that it can be properly saved
-    deserialized_df = engine.get_instance()._deserialize_from_avro(entity, df)
+    deserialized_df = engine.get_instance()._deserialize_from_avro(entity, filtered_df)

     # insert data
     entity.stream = False # to make sure we dont write to kafka
     entity.insert(deserialized_df, storage="offline")

     # update offsets
-    df_offsets = df.groupBy('partition').agg(max('offset').alias('offset')).collect()
+    df_offsets = (df if limit > filtered_df.count() else filtered_df).groupBy('partition').agg(max('offset').alias('offset')).collect()
     offset_dict = json.loads(offset_string)
     for offset_row in df_offsets:
         offset_dict[f"{entity._online_topic_name}"][f"{offset_row.partition}"] = offset_row.offset + 1
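
For context, here is a minimal plain-Python sketch of the two behaviours this hunk introduces: reading an optional `job_limit` from the job's `write_options` (falling back to 5,000,000 rows), and deciding whose max offsets per partition get committed. The `job_conf` contents and the `(partition, offset)` lists below are illustrative assumptions standing in for the real job payload and the Spark DataFrames, so the snippet runs on its own; it is not the actual materialization code.

```python
# Illustrative sketch only: the job_conf contents are made up, and plain
# lists of (partition, offset) pairs stand in for the Spark DataFrames.
job_conf = {"write_options": {"job_limit": 3}}

# Same lookup the hunk adds: fall back to 5,000,000 when no limit is configured.
limit = job_conf.get("write_options", {}).get("job_limit", 5000000)

# Records read from Kafka (df) vs. records kept after the header filters
# and the limit (filtered_df).
df = [(0, 10), (0, 11), (0, 12), (1, 7), (1, 8)]
filtered_df = df[:limit]  # pretend exactly `limit` rows survived

# Mirrors the offset-commit rule in the hunk: if fewer rows than the limit
# were ingested, every record read can be acknowledged, so offsets advance
# over the full read; otherwise only the ingested rows are acknowledged so
# the next run picks up the remainder.
source = df if limit > len(filtered_df) else filtered_df

# Equivalent of groupBy('partition').agg(max('offset')).
max_offsets = {}
for partition, offset in source:
    max_offsets[partition] = max(max_offsets.get(partition, -1), offset)

# Next run starts one past the last committed offset per partition.
committed = {p: o + 1 for p, o in max_offsets.items()}
print(committed)  # {0: 13}: partition 1 is not advanced, its rows were not ingested
```

As the diff reads, the conditional matters because committing offsets past records that were never ingested would silently drop them on the next run, while committing only the ingested offsets lets a capped run resume where it left off.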