Skip to content

Commit

Permalink
feat: TTL support for ScyllaDB config and FeatureViews (#154)
Browse files Browse the repository at this point in the history
* feat: TTL support for ScyllaDB tables and FeatureViews

* fix: lint formatting

* fix: set TTL as a tag in FeatureViews

* fix: lint formatting

* fix: remove ttl_clause from write_rows

* just make online_store_ttl an int rather than timedelta

* fix: address comments

* chore: revert repo_configuration changes

* fix: lint formatting

* fix: change from online_store_ttl to online_store_key_ttl_seconds

* chore: change Redis key_tt_seconds docstring

* change Cassandra key_ttle_seconds docstring

Co-authored-by: Bhargav Dodla <[email protected]>

---------

Co-authored-by: Bhargav Dodla <[email protected]>
  • Loading branch information
zabarn and EXPEbdodla authored Nov 8, 2024
1 parent 43c94a6 commit 2a19428
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 8 deletions.
19 changes: 19 additions & 0 deletions sdk/python/feast/feature_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -494,3 +494,22 @@ def most_recent_end_time(self) -> Optional[datetime]:
if len(self.materialization_intervals) == 0:
return None
return max([interval[1] for interval in self.materialization_intervals])

@property
def online_store_key_ttl_seconds(self) -> Optional[int]:
"""
Retrieves the online store TTL from the FeatureView's tags.
Returns:
An integer representing the TTL in seconds, or None if not set.
"""
ttl_str = self.tags.get("online_store_key_ttl_seconds")
if ttl_str:
try:
return int(ttl_str)
except ValueError:
raise ValueError(
f"Invalid online_store_key_ttl_seconds value '{ttl_str}' in tags. It must be an integer representing seconds."
)
else:
return None
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@
event_ts TIMESTAMP,
created_ts TIMESTAMP,
PRIMARY KEY ((entity_key), feature_name)
) WITH CLUSTERING ORDER BY (feature_name ASC);
) WITH CLUSTERING ORDER BY (feature_name ASC) AND default_time_to_live={ttl};
"""

DROP_TABLE_CQL_TEMPLATE = "DROP TABLE IF EXISTS {fqtable};"
Expand Down Expand Up @@ -159,6 +159,9 @@ class CassandraOnlineStoreConfig(FeastConfigBaseModel):
Table deletion is not currently supported in this mode.
"""

key_ttl_seconds: Optional[StrictInt] = None
"""Default TTL (in seconds) to apply to all tables if not specified in FeatureView. Value 0 or None means No TTL."""

class CassandraLoadBalancingPolicy(FeastConfigBaseModel):
"""
Configuration block related to the Cluster's load-balancing policy.
Expand Down Expand Up @@ -566,8 +569,13 @@ def _create_table(self, config: RepoConfig, project: str, table: FeatureView):
session: Session = self._get_session(config)
keyspace: str = self._keyspace
fqtable = CassandraOnlineStore._fq_table_name(keyspace, project, table)
create_cql = self._get_cql_statement(config, "create", fqtable)
logger.info(f"Creating table {fqtable}.")
ttl = (
table.online_store_key_ttl_seconds
or config.online_store.key_ttl_seconds
or 0
)
create_cql = self._get_cql_statement(config, "create", fqtable, ttl=ttl)
logger.info(f"Creating table {fqtable} with TTL {ttl}.")
session.execute(create_cql)

def _get_cql_statement(
Expand Down
13 changes: 8 additions & 5 deletions sdk/python/feast/infra/online_stores/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ class RedisOnlineStoreConfig(FeastConfigBaseModel):
format: host:port,parameter1,parameter2 eg. redis:6379,db=0 """

key_ttl_seconds: Optional[int] = None
"""(Optional) redis key bin ttl (in seconds) for expiring entities"""
"""(Optional) redis key bin ttl (in seconds) for expiring entities. Value None means No TTL. Value 0 means expire in 0 seconds."""

full_scan_for_deletion: Optional[bool] = True
"""(Optional) whether to scan for deletion of features"""
Expand Down Expand Up @@ -330,10 +330,13 @@ def online_write_batch(

pipe.hset(redis_key_bin, mapping=entity_hset)

if online_store_config.key_ttl_seconds:
pipe.expire(
name=redis_key_bin, time=online_store_config.key_ttl_seconds
)
ttl = (
table.online_store_key_ttl_seconds
or online_store_config.key_ttl_seconds
or None
)
if ttl:
pipe.expire(name=redis_key_bin, time=ttl)
results = pipe.execute()
if progress:
progress(len(results))
Expand Down
43 changes: 43 additions & 0 deletions sdk/python/tests/unit/test_feature_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,3 +168,46 @@ def test_update_materialization_intervals():
second_updated_feature_view.materialization_intervals[0][1]
== updated_feature_view.materialization_intervals[0][1]
)


def test_online_store_key_ttl_seconds_retrieval():
# Test when TTL is set as a valid integer in tags
file_source = FileSource(name="my-file-source", path="test.parquet")
feature_view = FeatureView(
name="feature_view_with_ttl",
entities=[],
schema=[Field(name="feature1", dtype=Float32)],
source=file_source,
tags={"online_store_key_ttl_seconds": "3600"},
)
assert feature_view.online_store_key_ttl_seconds == 3600


def test_online_store_key_ttl_seconds_none_when_not_set():
# Test when TTL is not set in tags, expecting None
file_source = FileSource(name="my-file-source", path="test.parquet")
feature_view = FeatureView(
name="feature_view_without_ttl",
entities=[],
schema=[Field(name="feature1", dtype=Float32)],
source=file_source,
tags={},
)
assert feature_view.online_store_key_ttl_seconds is None


def test_online_store_key_ttl_seconds_invalid_value():
# Test when TTL is set as a non-integer string, expecting a ValueError
file_source = FileSource(name="my-file-source", path="test.parquet")
feature_view = FeatureView(
name="feature_view_invalid_ttl",
entities=[],
schema=[Field(name="feature1", dtype=Float32)],
source=file_source,
tags={"online_store_key_ttl_seconds": "invalid_ttl"},
)
with pytest.raises(
ValueError,
match="Invalid online_store_key_ttl_seconds value 'invalid_ttl' in tags. It must be an integer representing seconds.",
):
_ = feature_view.online_store_key_ttl_seconds

0 comments on commit 2a19428

Please sign in to comment.