Skip to content

Commit

Permalink
[FSTORE-1058] if feature view query doesn't include event time featur…
Browse files Browse the repository at this point in the history
…e add for get_batch_data(start_time, end_time) (#1610)

Co-authored-by: kennethmhc <[email protected]>
  • Loading branch information
2 people authored and SirOibaf committed Nov 20, 2023
1 parent d04cfb3 commit c1072d5
Show file tree
Hide file tree
Showing 4 changed files with 191 additions and 31 deletions.
111 changes: 100 additions & 11 deletions hopsworks-IT/src/test/ruby/spec/featureview_query_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -131,16 +131,18 @@
end


it "should be able to create sql string with different type of event time filter" do
it "should be able to create sql string with different type of event time filter without them included in the selected feature list" do
featurestore_id = get_featurestore_id(@project.id)
project_name = @project.projectname.downcase
featurestore_name = get_featurestore_name(@project.id)
featuregroup_suffix = short_random_id
features_a = [
{ type: "INT", name: "id", primary: true },
{ type: "DATE", name: "ts_date" },
{ type: "TIMESTAMP", name: "ts" },
{type: "INT", name: "id", primary: true },
{type: "INT", name: "a_testfeature1"},
{type: "INT", name: "a_testfeature2"},
{type: "TIMESTAMP", name: "ts" },
]

fg = create_cached_featuregroup_checked_return_fg(@project.id, featurestore_id,
"test_fg_a#{featuregroup_suffix}",
features: features_a,
Expand All @@ -150,16 +152,16 @@
id: fg[:id],
type: fg[:type]
},
leftFeatures: [{ name: 'ts_date' }, { name: 'ts' }],
leftFeatures: [{ name: 'a_testfeature1' }, { name: 'a_testfeature2' }],
filter: {
type: "AND",
leftFilter: {
feature: {
name: "ts_date",
name: "a_testfeature1",
featureGroupId: fg[:id]
},
condition: "GREATER_THAN",
value: "2022-01-01"
value: 0
},
rightFilter: {
feature: {
Expand All @@ -177,15 +179,102 @@
parsed_query_result = JSON.parse(query_result)

expect(parsed_query_result['query']).to eql(
"SELECT `fg0`.`ts_date` `ts_date`, `fg0`.`ts` `ts`\n" +
"SELECT `fg0`.`a_testfeature1` `a_testfeature1`, `fg0`.`a_testfeature2` `a_testfeature2`\n" +
"FROM `#{featurestore_name}`.`test_fg_a#{featuregroup_suffix}_1` `fg0`\n" +
"WHERE `fg0`.`ts_date` > DATE '#{query[:filter][:leftFilter][:value]}' AND `fg0`.`ts` > TIMESTAMP '#{query[:filter][:rightFilter][:value]}.000'"
"WHERE `fg0`.`a_testfeature1` > #{query[:filter][:leftFilter][:value]} AND `fg0`.`ts` > TIMESTAMP '#{query[:filter][:rightFilter][:value]}.000'"
)

expect(parsed_query_result['queryOnline']).to eql(
"SELECT `fg0`.`ts_date` `ts_date`, `fg0`.`ts` `ts`\n" +
"SELECT `fg0`.`a_testfeature1` `a_testfeature1`, `fg0`.`a_testfeature2` `a_testfeature2`\n" +
"FROM `#{project_name.downcase}`.`test_fg_a#{featuregroup_suffix}_1` `fg0`\n" +
"WHERE `fg0`.`ts_date` > DATE '#{query[:filter][:leftFilter][:value]}' AND `fg0`.`ts` > TIMESTAMP '#{query[:filter][:rightFilter][:value]}.000'"
"WHERE `fg0`.`a_testfeature1` > #{query[:filter][:leftFilter][:value]} AND `fg0`.`ts` > TIMESTAMP '#{query[:filter][:rightFilter][:value]}.000'"
)
end


it "should be able to create sql string in joins with different type of event time filter without them included in the selected feature list" do
featurestore_id = get_featurestore_id(@project.id)
project_name = @project.projectname.downcase
featurestore_name = get_featurestore_name(@project.id)
featuregroup_suffix = short_random_id
features_a = [
{type: "INT", name: "id", primary: true },
{type: "INT", name: "a_testfeature1"},
{type: "INT", name: "a_testfeature2"},
{type: "TIMESTAMP", name: "ts" },
]

features_b = [
{type: "INT", name: "id", primary: true },
{type: "INT", name: "b_testfeature1"},
{type: "INT", name: "b_testfeature2"},
{type: "TIMESTAMP", name: "ts" },
]

fg_a = create_cached_featuregroup_checked_return_fg(@project.id, featurestore_id,
"test_fg_a#{featuregroup_suffix}",
features: features_a,
event_time: "ts")

fg_b = create_cached_featuregroup_checked_return_fg(@project.id, featurestore_id,
"test_fg_b#{featuregroup_suffix}",
features: features_b,
event_time: "ts")

query = {
leftFeatureGroup: {
id: fg_a[:id],
type: fg_a[:type]
},
leftFeatures: [{name: 'a_testfeature1'}, {name: 'a_testfeature2'}],
joins: [{
query: {
leftFeatureGroup: {
id: fg_b[:id],
type: fg_b[:type]
},
leftFeatures: [{name: 'b_testfeature1'}, {name: 'b_testfeature2'}],
}
}
],
filter: {
type: "AND",
leftFilter: {
feature: {
name: "a_testfeature1",
featureGroupId: fg_a[:id]
},
condition: "GREATER_THAN",
value: 0
},
rightFilter: {
feature: {
name: "ts",
featureGroupId: fg_a[:id]
},
condition: "GREATER_THAN",
value: "2022-02-01 00:00:00"
}
}

}

query_result = put "#{ENV['HOPSWORKS_API']}/project/#{@project.id}/featurestores/query", query.to_json
expect_status_details(200)
parsed_query_result = JSON.parse(query_result)

expect(parsed_query_result['query']).to eql(
"SELECT `fg1`.`a_testfeature1` `a_testfeature1`, `fg1`.`a_testfeature2` `a_testfeature2`, `fg0`.`b_testfeature1` `b_testfeature1`, `fg0`.`b_testfeature2` `b_testfeature2`\n" +
"FROM `#{featurestore_name}`.`test_fg_a#{featuregroup_suffix}_1` `fg1`\n" +
"INNER JOIN `#{featurestore_name}`.`test_fg_b#{featuregroup_suffix}_1` `fg0` ON `fg1`.`id` = `fg0`.`id`\n" +
"WHERE `fg1`.`a_testfeature1` > #{query[:filter][:leftFilter][:value]} AND `fg1`.`ts` > TIMESTAMP '#{query[:filter][:rightFilter][:value]}.000'"
)

expect(parsed_query_result['queryOnline']).to eql(
"SELECT `fg1`.`a_testfeature1` `a_testfeature1`, `fg1`.`a_testfeature2` `a_testfeature2`, `fg0`.`b_testfeature1` `b_testfeature1`, `fg0`.`b_testfeature2` `b_testfeature2`\n" +
"FROM `#{project_name.downcase}`.`test_fg_a#{featuregroup_suffix}_1` `fg1`\n" +
"INNER JOIN `#{project_name.downcase}`.`test_fg_b#{featuregroup_suffix}_1` `fg0` ON `fg1`.`id` = `fg0`.`id`\n" +
"WHERE `fg1`.`a_testfeature1` > #{query[:filter][:leftFilter][:value]} AND `fg1`.`ts` > TIMESTAMP '#{query[:filter][:rightFilter][:value]}.000'"
)
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -334,26 +334,30 @@ public List<Feature> collectFeatures(Query query) {
public List<Feature> collectFeaturesFromFilter(FilterLogic filter) {
return this.collectFeaturesFromFilter(filter, null);
}

public List<Feature> collectFeaturesFromFilter(FilterLogic filter, Query query) {
List<Feature> features = new ArrayList<>();
if (filter.getLeftFilter() != null) {
collectFeatureFromFilter(filter, features, query);
return features;
}

private void collectFeatureFromFilter(FilterLogic filter, List<Feature> features, Query query) {
if(filter.getLeftFilter() != null) {
features.addAll(filter.getLeftFilter().getFeatures().stream().filter(f ->
(query == null || f.getFeatureGroup().equals( query.getFeaturegroup()))).collect(Collectors.toList()));
(query == null || f.getFeatureGroup().equals( query.getFeaturegroup()))).collect(Collectors.toList()));
}
if (filter.getRightFilter() != null) {
if(filter.getRightFilter() != null) {
features.addAll(filter.getRightFilter().getFeatures().stream().filter(f ->
(query == null || f.getFeatureGroup().equals( query.getFeaturegroup()))).collect(Collectors.toList()));
(query == null || f.getFeatureGroup().equals( query.getFeaturegroup()))).collect(Collectors.toList()));
}
if (filter.getLeftLogic() != null) {
features.addAll(this.collectFeaturesFromFilter(filter.getLeftLogic(), query));
if(filter.getLeftLogic() !=null) {
collectFeatureFromFilter(filter.getLeftLogic(), features, query);
}
if (filter.getRightLogic() != null) {
features.addAll(this.collectFeaturesFromFilter(filter.getRightLogic(), query));
if(filter.getRightLogic() !=null) {
collectFeatureFromFilter(filter.getRightLogic(), features, query);
}
return features;
}

private SqlNode generateCachedTableNode(Query query, boolean online) {
List<String> tableIdentifierStr = new ArrayList<>();
if (online) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,18 +133,25 @@ public List<SqlCall> generateSubQueries(Query baseQuery, Query query, boolean is

// Add filters if needed
List<Feature> originalFeatures = new ArrayList<>(join.getRightQuery().getFeatures());

if (query.getFilter() != null) {
// if all filters are on left-sided features, add them to the base/inner query
if (baseQuery.getFilter() == null && !constructorController.collectFeaturesFromFilter(query.getFilter()
).stream().anyMatch(f -> !query.getFeatures().contains(f))) {
List<Feature> leftQueryFilterFeatures = constructorController.collectFeaturesFromFilter(query.getFilter());
if (baseQuery.getFilter() == null && !leftQueryFilterFeatures
.stream().anyMatch(f -> !query.getAvailableFeatures().contains(f))) {
baseQuery.setFilter(query.getFilter());
}
// else, filters will be added to the outer query and relevant features need to be selected in the subquery
else {
List<Feature> filterFeatures = constructorController.collectFeaturesFromFilter(query.getFilter(),
join.getRightQuery()).stream().filter(f -> !join.getRightQuery().getFeatures().contains(f))
.collect(Collectors.toList());
join.getRightQuery().getFeatures().addAll(filterFeatures);
} else {
// else, filters will be added to the outer query and relevant features need to be selected in the subquery
List<Feature> filterOuterQueryFeatures = constructorController.collectFeaturesFromFilter(query.getFilter(),
join.getRightQuery()).stream()
.filter(f -> !join.getRightQuery().getAvailableFeatures().contains(f))
.collect(Collectors.toList());
List<Feature> filterLFeatures = constructorController.collectFeaturesFromFilter(query.getFilter(),
join.getLeftQuery()).stream()
.filter(f -> !join.getLeftQuery().getAvailableFeatures().contains(f))
.collect(Collectors.toList());
filterOuterQueryFeatures.addAll(filterLFeatures);
join.getRightQuery().getFeatures().addAll(filterOuterQueryFeatures);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -739,6 +739,7 @@ public void testGenerateSqlWithRightFilterInner() {
"SELECT `right_fg0`.`pk1` `pk1`, `right_fg0`.`pk2` `pk2`, `right_fg0`.`ts` `ts`, `right_fg0`.`label` `label`, `right_fg0`.`pk1` `pk1`, `right_fg0`.`pk2` `pk2`, `right_fg0`.`ts` `ts`, `right_fg0`.`ft1` `ft1`, `right_fg1`.`R_pk1` `R_pk1`, `right_fg1`.`R_ts` `R_ts`, `right_fg1`.`R_ft1` `R_ft1`\n" +
"FROM right_fg0\n" +
"INNER JOIN right_fg1 ON `right_fg0`.`join_pk_pk1` = `right_fg1`.`join_pk_pk1` AND `right_fg0`.`join_evt_ts` = `right_fg1`.`join_evt_ts`)";

Assert.assertEquals(expected, result);
}

Expand Down Expand Up @@ -804,6 +805,65 @@ public void testGenerateSqlWithLeftFilter() {
"INNER JOIN right_fg1 ON `right_fg0`.`join_pk_pk1` = `right_fg1`.`join_pk_pk1` AND `right_fg0`.`join_evt_ts` = `right_fg1`.`join_evt_ts`)";
Assert.assertEquals(expected, result);
}

@Test
public void testGenerateSqlWithLeftFilterOmitted() {
List<Feature> leftFeatures = new ArrayList<>();
Feature filterFeature = new Feature("label", "fg0", fgLeft, "int", null);
leftFeatures.add(new Feature("pk1", "fg0", fgLeft, true));
leftFeatures.add(new Feature("pk2", "fg0", fgLeft));
leftFeatures.add(filterFeature);

List<Feature> rightFeatures = new ArrayList<>();
rightFeatures.add(new Feature("pk1", "fg1", fgRight));
rightFeatures.add(new Feature("pk2", "fg1", fgRight));
rightFeatures.add(new Feature("ft1", "fg1", fgRight));

List<Feature> rightFeatures1 = new ArrayList<>();
rightFeatures1.add(new Feature("pk1", "fg2", fgRight1));
rightFeatures1.add(new Feature("ft1", "fg2", fgRight1));

List<Feature> leftOn = Arrays.asList(new Feature("pk1", "fg0", fgLeft), new Feature("pk2", "fg0", fgLeft));
List<Feature> rightOn = Arrays.asList(new Feature("pk1", "fg1", fgRight), new Feature("pk2", "fg1", fgRight));

// join on different pks
List<Feature> leftOn1 = Collections.singletonList(new Feature("pk1", "fg0", fgLeft));
List<Feature> rightOn1 = Collections.singletonList(new Feature("pk1", "fg2", fgRight1));

List<SqlCondition> joinOperator = Arrays.asList(SqlCondition.EQUALS, SqlCondition.EQUALS);
List<SqlCondition> joinOperator1 = Collections.singletonList(SqlCondition.EQUALS);

FilterLogic filter = new FilterLogic(new Filter(Arrays.asList(filterFeature), SqlCondition.EQUALS, "1"));

Query query = new Query("fs", "project", fgLeft, "fg0", leftFeatures, leftFeatures, false, filter);
Query right = new Query("fs", "project", fgRight, "fg1", rightFeatures, rightFeatures, false, null);
Query right1 = new Query("fs", "project", fgRight1, "fg2", rightFeatures1, rightFeatures1, false, null);

Join join = new Join(query, right, leftOn, rightOn, JoinType.INNER, null, joinOperator);
Join join1 = new Join(query, right1, leftOn1, rightOn1, JoinType.INNER, "R_", joinOperator1);

query.setJoins(Arrays.asList(join, join1));

String result =
pitJoinController.generateSQL(query, false).toSqlString(new SparkSqlDialect(SqlDialect.EMPTY_CONTEXT)).getSql();
String expected = "WITH right_fg0 AS (SELECT *\n" +
"FROM (SELECT `fg0`.`pk1` `pk1`, `fg0`.`pk2` `pk2`, `fg0`.`label` `label`, `fg0`.`pk1` `join_pk_pk1`, `fg0`.`ts` `join_evt_ts`, `fg1`.`pk1` `pk1`, `fg1`.`pk2` `pk2`, `fg1`.`ft1` `ft1`, RANK() OVER (PARTITION BY `fg0`.`pk1`, `fg0`.`pk2`, `fg0`.`ts` ORDER BY `fg1`.`ts` DESC) pit_rank_hopsworks\n" +
"FROM `fs`.`fg0_1` `fg0`\n" +
"INNER JOIN `fs`.`fg1_1` `fg1` ON `fg0`.`pk1` = `fg1`.`pk1` AND `fg0`.`pk2` = `fg1`.`pk2` AND `fg0`.`ts` >= `fg1`.`ts`\n" +
"WHERE `fg0`.`label` = 1) NA\n" +
"WHERE `pit_rank_hopsworks` = 1), right_fg1 AS (SELECT *\n" +
"FROM (SELECT `fg0`.`pk1` `pk1`, `fg0`.`pk2` `pk2`, `fg0`.`label` `label`, `fg0`.`pk1` `join_pk_pk1`, `fg0`.`ts` `join_evt_ts`, `fg2`.`pk1` `R_pk1`, `fg2`.`ft1` `R_ft1`, RANK() OVER (PARTITION BY `fg0`.`pk1`, `fg0`.`ts` ORDER BY `fg2`.`ts` DESC) pit_rank_hopsworks\n" +
"FROM `fs`.`fg0_1` `fg0`\n" +
"INNER JOIN `fs`.`fg2_1` `fg2` ON `fg0`.`pk1` = `fg2`.`pk1` AND `fg0`.`ts` >= `fg2`.`ts`\n" +
"WHERE `fg0`.`label` = 1) NA\n" +
"WHERE `pit_rank_hopsworks` = 1) (SELECT `right_fg0`.`pk1` `pk1`, `right_fg0`.`pk2` `pk2`, `right_fg0`.`label` `label`, `right_fg0`.`pk1` `pk1`, `right_fg0`.`pk2` `pk2`, `right_fg0`.`ft1` `ft1`, `right_fg1`.`R_pk1` `R_pk1`, `right_fg1`.`R_ft1` `R_ft1`\n" +
"FROM right_fg0\n" +
"INNER JOIN right_fg1 ON `right_fg0`.`join_pk_pk1` = `right_fg1`.`join_pk_pk1` AND `right_fg0`.`join_evt_ts` = " +
"`right_fg1`.`join_evt_ts`)";

System.out.println(result);
Assert.assertEquals(expected, result);
}

@Test
public void testGenerateSqlWithDefault() {
Expand Down

0 comments on commit c1072d5

Please sign in to comment.