Skip to content

Commit

Permalink
[FSTORE-1632] Change limit location for delta ingestion (logicalclock…
Browse files Browse the repository at this point in the history
…s#422)

Co-authored-by: Fabio Buso <[email protected]>
  • Loading branch information
bubriks and SirOibaf committed Dec 9, 2024
1 parent 1004fe5 commit d15a25c
Showing 1 changed file with 8 additions and 5 deletions.
13 changes: 8 additions & 5 deletions utils/python/hsfs_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -300,22 +300,25 @@ def offline_fg_materialization(spark: SparkSession, job_conf: Dict[Any, Any], in
.option("startingOffsets", offset_string)
.option("includeHeaders", "true")
.load()
.limit(5000000)
)

# filter only the necassary entries
df = df.filter(expr("CAST(filter(headers, header -> header.key = 'featureGroupId')[0].value AS STRING)") == str(entity._id))
df = df.filter(expr("CAST(filter(headers, header -> header.key = 'subjectId')[0].value AS STRING)") == str(entity.subject["id"]))
filtered_df = df.filter(expr("CAST(filter(headers, header -> header.key = 'featureGroupId')[0].value AS STRING)") == str(entity._id))
filtered_df = filtered_df.filter(expr("CAST(filter(headers, header -> header.key = 'subjectId')[0].value AS STRING)") == str(entity.subject["id"]))

# limit the number of records ingested
limit = job_conf.get("write_options", {}).get("job_limit", 5000000)
filtered_df = filtered_df.limit(limit)

# deserialize dataframe so that it can be properly saved
deserialized_df = engine.get_instance()._deserialize_from_avro(entity, df)
deserialized_df = engine.get_instance()._deserialize_from_avro(entity, filtered_df)

# insert data
entity.stream = False # to make sure we dont write to kafka
entity.insert(deserialized_df)

# update offsets
df_offsets = df.groupBy('partition').agg(max('offset').alias('offset')).collect()
df_offsets = (df if limit > filtered_df.count() else filtered_df).groupBy('partition').agg(max('offset').alias('offset')).collect()
offset_dict = json.loads(offset_string)
for offset_row in df_offsets:
offset_dict[f"{entity._online_topic_name}"][f"{offset_row.partition}"] = offset_row.offset + 1
Expand Down

0 comments on commit d15a25c

Please sign in to comment.