diff --git a/sdk/python/feast/feature_view.py b/sdk/python/feast/feature_view.py index 45c018f775..6e7992c553 100644 --- a/sdk/python/feast/feature_view.py +++ b/sdk/python/feast/feature_view.py @@ -494,3 +494,22 @@ def most_recent_end_time(self) -> Optional[datetime]: if len(self.materialization_intervals) == 0: return None return max([interval[1] for interval in self.materialization_intervals]) + + @property + def online_store_key_ttl_seconds(self) -> Optional[int]: + """ + Retrieves the online store TTL from the FeatureView's tags. + + Returns: + An integer representing the TTL in seconds, or None if not set. + """ + ttl_str = self.tags.get("online_store_key_ttl_seconds") + if ttl_str: + try: + return int(ttl_str) + except ValueError: + raise ValueError( + f"Invalid online_store_key_ttl_seconds value '{ttl_str}' in tags. It must be an integer representing seconds." + ) + else: + return None diff --git a/sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/cassandra_online_store.py b/sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/cassandra_online_store.py index af747650e2..1998de464a 100644 --- a/sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/cassandra_online_store.py +++ b/sdk/python/feast/infra/online_stores/contrib/cassandra_online_store/cassandra_online_store.py @@ -88,7 +88,7 @@ event_ts TIMESTAMP, created_ts TIMESTAMP, PRIMARY KEY ((entity_key), feature_name) - ) WITH CLUSTERING ORDER BY (feature_name ASC); + ) WITH CLUSTERING ORDER BY (feature_name ASC) AND default_time_to_live={ttl}; """ DROP_TABLE_CQL_TEMPLATE = "DROP TABLE IF EXISTS {fqtable};" @@ -159,6 +159,9 @@ class CassandraOnlineStoreConfig(FeastConfigBaseModel): Table deletion is not currently supported in this mode. """ + key_ttl_seconds: Optional[StrictInt] = None + """Default TTL (in seconds) to apply to all tables if not specified in FeatureView. Value 0 or None means No TTL.""" + class CassandraLoadBalancingPolicy(FeastConfigBaseModel): """ Configuration block related to the Cluster's load-balancing policy. @@ -566,8 +569,13 @@ def _create_table(self, config: RepoConfig, project: str, table: FeatureView): session: Session = self._get_session(config) keyspace: str = self._keyspace fqtable = CassandraOnlineStore._fq_table_name(keyspace, project, table) - create_cql = self._get_cql_statement(config, "create", fqtable) - logger.info(f"Creating table {fqtable}.") + ttl = ( + table.online_store_key_ttl_seconds + or config.online_store.key_ttl_seconds + or 0 + ) + create_cql = self._get_cql_statement(config, "create", fqtable, ttl=ttl) + logger.info(f"Creating table {fqtable} with TTL {ttl}.") session.execute(create_cql) def _get_cql_statement( diff --git a/sdk/python/feast/infra/online_stores/redis.py b/sdk/python/feast/infra/online_stores/redis.py index 47c4ab49d8..b8228e8269 100644 --- a/sdk/python/feast/infra/online_stores/redis.py +++ b/sdk/python/feast/infra/online_stores/redis.py @@ -74,7 +74,7 @@ class RedisOnlineStoreConfig(FeastConfigBaseModel): format: host:port,parameter1,parameter2 eg. redis:6379,db=0 """ key_ttl_seconds: Optional[int] = None - """(Optional) redis key bin ttl (in seconds) for expiring entities""" + """(Optional) redis key bin ttl (in seconds) for expiring entities. Value None means No TTL. Value 0 means expire in 0 seconds.""" full_scan_for_deletion: Optional[bool] = True """(Optional) whether to scan for deletion of features""" @@ -330,10 +330,13 @@ def online_write_batch( pipe.hset(redis_key_bin, mapping=entity_hset) - if online_store_config.key_ttl_seconds: - pipe.expire( - name=redis_key_bin, time=online_store_config.key_ttl_seconds - ) + ttl = ( + table.online_store_key_ttl_seconds + or online_store_config.key_ttl_seconds + or None + ) + if ttl: + pipe.expire(name=redis_key_bin, time=ttl) results = pipe.execute() if progress: progress(len(results)) diff --git a/sdk/python/tests/unit/test_feature_views.py b/sdk/python/tests/unit/test_feature_views.py index 981968df0d..895d849be8 100644 --- a/sdk/python/tests/unit/test_feature_views.py +++ b/sdk/python/tests/unit/test_feature_views.py @@ -168,3 +168,46 @@ def test_update_materialization_intervals(): second_updated_feature_view.materialization_intervals[0][1] == updated_feature_view.materialization_intervals[0][1] ) + + +def test_online_store_key_ttl_seconds_retrieval(): + # Test when TTL is set as a valid integer in tags + file_source = FileSource(name="my-file-source", path="test.parquet") + feature_view = FeatureView( + name="feature_view_with_ttl", + entities=[], + schema=[Field(name="feature1", dtype=Float32)], + source=file_source, + tags={"online_store_key_ttl_seconds": "3600"}, + ) + assert feature_view.online_store_key_ttl_seconds == 3600 + + +def test_online_store_key_ttl_seconds_none_when_not_set(): + # Test when TTL is not set in tags, expecting None + file_source = FileSource(name="my-file-source", path="test.parquet") + feature_view = FeatureView( + name="feature_view_without_ttl", + entities=[], + schema=[Field(name="feature1", dtype=Float32)], + source=file_source, + tags={}, + ) + assert feature_view.online_store_key_ttl_seconds is None + + +def test_online_store_key_ttl_seconds_invalid_value(): + # Test when TTL is set as a non-integer string, expecting a ValueError + file_source = FileSource(name="my-file-source", path="test.parquet") + feature_view = FeatureView( + name="feature_view_invalid_ttl", + entities=[], + schema=[Field(name="feature1", dtype=Float32)], + source=file_source, + tags={"online_store_key_ttl_seconds": "invalid_ttl"}, + ) + with pytest.raises( + ValueError, + match="Invalid online_store_key_ttl_seconds value 'invalid_ttl' in tags. It must be an integer representing seconds.", + ): + _ = feature_view.online_store_key_ttl_seconds