diff --git a/hopsworks-IT/src/test/ruby/spec/featureview_query_spec.rb b/hopsworks-IT/src/test/ruby/spec/featureview_query_spec.rb index 480331abc4..0361db9be3 100644 --- a/hopsworks-IT/src/test/ruby/spec/featureview_query_spec.rb +++ b/hopsworks-IT/src/test/ruby/spec/featureview_query_spec.rb @@ -131,16 +131,18 @@ end - it "should be able to create sql string with different type of event time filter" do + it "should be able to create sql string with different type of event time filter without them included in the selected feature list" do featurestore_id = get_featurestore_id(@project.id) project_name = @project.projectname.downcase featurestore_name = get_featurestore_name(@project.id) featuregroup_suffix = short_random_id features_a = [ - { type: "INT", name: "id", primary: true }, - { type: "DATE", name: "ts_date" }, - { type: "TIMESTAMP", name: "ts" }, + {type: "INT", name: "id", primary: true }, + {type: "INT", name: "a_testfeature1"}, + {type: "INT", name: "a_testfeature2"}, + {type: "TIMESTAMP", name: "ts" }, ] + fg = create_cached_featuregroup_checked_return_fg(@project.id, featurestore_id, "test_fg_a#{featuregroup_suffix}", features: features_a, @@ -150,16 +152,16 @@ id: fg[:id], type: fg[:type] }, - leftFeatures: [{ name: 'ts_date' }, { name: 'ts' }], + leftFeatures: [{ name: 'a_testfeature1' }, { name: 'a_testfeature2' }], filter: { type: "AND", leftFilter: { feature: { - name: "ts_date", + name: "a_testfeature1", featureGroupId: fg[:id] }, condition: "GREATER_THAN", - value: "2022-01-01" + value: 0 }, rightFilter: { feature: { @@ -177,15 +179,102 @@ parsed_query_result = JSON.parse(query_result) expect(parsed_query_result['query']).to eql( - "SELECT `fg0`.`ts_date` `ts_date`, `fg0`.`ts` `ts`\n" + + "SELECT `fg0`.`a_testfeature1` `a_testfeature1`, `fg0`.`a_testfeature2` `a_testfeature2`\n" + "FROM `#{featurestore_name}`.`test_fg_a#{featuregroup_suffix}_1` `fg0`\n" + - "WHERE `fg0`.`ts_date` > DATE '#{query[:filter][:leftFilter][:value]}' AND `fg0`.`ts` > TIMESTAMP '#{query[:filter][:rightFilter][:value]}.000'" + "WHERE `fg0`.`a_testfeature1` > #{query[:filter][:leftFilter][:value]} AND `fg0`.`ts` > TIMESTAMP '#{query[:filter][:rightFilter][:value]}.000'" ) expect(parsed_query_result['queryOnline']).to eql( - "SELECT `fg0`.`ts_date` `ts_date`, `fg0`.`ts` `ts`\n" + + "SELECT `fg0`.`a_testfeature1` `a_testfeature1`, `fg0`.`a_testfeature2` `a_testfeature2`\n" + "FROM `#{project_name.downcase}`.`test_fg_a#{featuregroup_suffix}_1` `fg0`\n" + - "WHERE `fg0`.`ts_date` > DATE '#{query[:filter][:leftFilter][:value]}' AND `fg0`.`ts` > TIMESTAMP '#{query[:filter][:rightFilter][:value]}.000'" + "WHERE `fg0`.`a_testfeature1` > #{query[:filter][:leftFilter][:value]} AND `fg0`.`ts` > TIMESTAMP '#{query[:filter][:rightFilter][:value]}.000'" + ) + end + + + it "should be able to create sql string in joins with different type of event time filter without them included in the selected feature list" do + featurestore_id = get_featurestore_id(@project.id) + project_name = @project.projectname.downcase + featurestore_name = get_featurestore_name(@project.id) + featuregroup_suffix = short_random_id + features_a = [ + {type: "INT", name: "id", primary: true }, + {type: "INT", name: "a_testfeature1"}, + {type: "INT", name: "a_testfeature2"}, + {type: "TIMESTAMP", name: "ts" }, + ] + + features_b = [ + {type: "INT", name: "id", primary: true }, + {type: "INT", name: "b_testfeature1"}, + {type: "INT", name: "b_testfeature2"}, + {type: "TIMESTAMP", name: "ts" }, + ] + + fg_a = create_cached_featuregroup_checked_return_fg(@project.id, featurestore_id, + "test_fg_a#{featuregroup_suffix}", + features: features_a, + event_time: "ts") + + fg_b = create_cached_featuregroup_checked_return_fg(@project.id, featurestore_id, + "test_fg_b#{featuregroup_suffix}", + features: features_b, + event_time: "ts") + + query = { + leftFeatureGroup: { + id: fg_a[:id], + type: fg_a[:type] + }, + leftFeatures: [{name: 'a_testfeature1'}, {name: 'a_testfeature2'}], + joins: [{ + query: { + leftFeatureGroup: { + id: fg_b[:id], + type: fg_b[:type] + }, + leftFeatures: [{name: 'b_testfeature1'}, {name: 'b_testfeature2'}], + } + } + ], + filter: { + type: "AND", + leftFilter: { + feature: { + name: "a_testfeature1", + featureGroupId: fg_a[:id] + }, + condition: "GREATER_THAN", + value: 0 + }, + rightFilter: { + feature: { + name: "ts", + featureGroupId: fg_a[:id] + }, + condition: "GREATER_THAN", + value: "2022-02-01 00:00:00" + } + } + + } + + query_result = put "#{ENV['HOPSWORKS_API']}/project/#{@project.id}/featurestores/query", query.to_json + expect_status_details(200) + parsed_query_result = JSON.parse(query_result) + + expect(parsed_query_result['query']).to eql( + "SELECT `fg1`.`a_testfeature1` `a_testfeature1`, `fg1`.`a_testfeature2` `a_testfeature2`, `fg0`.`b_testfeature1` `b_testfeature1`, `fg0`.`b_testfeature2` `b_testfeature2`\n" + + "FROM `#{featurestore_name}`.`test_fg_a#{featuregroup_suffix}_1` `fg1`\n" + + "INNER JOIN `#{featurestore_name}`.`test_fg_b#{featuregroup_suffix}_1` `fg0` ON `fg1`.`id` = `fg0`.`id`\n" + + "WHERE `fg1`.`a_testfeature1` > #{query[:filter][:leftFilter][:value]} AND `fg1`.`ts` > TIMESTAMP '#{query[:filter][:rightFilter][:value]}.000'" + ) + + expect(parsed_query_result['queryOnline']).to eql( + "SELECT `fg1`.`a_testfeature1` `a_testfeature1`, `fg1`.`a_testfeature2` `a_testfeature2`, `fg0`.`b_testfeature1` `b_testfeature1`, `fg0`.`b_testfeature2` `b_testfeature2`\n" + + "FROM `#{project_name.downcase}`.`test_fg_a#{featuregroup_suffix}_1` `fg1`\n" + + "INNER JOIN `#{project_name.downcase}`.`test_fg_b#{featuregroup_suffix}_1` `fg0` ON `fg1`.`id` = `fg0`.`id`\n" + + "WHERE `fg1`.`a_testfeature1` > #{query[:filter][:leftFilter][:value]} AND `fg1`.`ts` > TIMESTAMP '#{query[:filter][:rightFilter][:value]}.000'" ) end end diff --git a/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/query/ConstructorController.java b/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/query/ConstructorController.java index 78e7f2246e..fd410c734a 100644 --- a/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/query/ConstructorController.java +++ b/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/query/ConstructorController.java @@ -334,26 +334,30 @@ public List collectFeatures(Query query) { public List collectFeaturesFromFilter(FilterLogic filter) { return this.collectFeaturesFromFilter(filter, null); } - + public List collectFeaturesFromFilter(FilterLogic filter, Query query) { List features = new ArrayList<>(); - if (filter.getLeftFilter() != null) { + collectFeatureFromFilter(filter, features, query); + return features; + } + + private void collectFeatureFromFilter(FilterLogic filter, List features, Query query) { + if(filter.getLeftFilter() != null) { features.addAll(filter.getLeftFilter().getFeatures().stream().filter(f -> - (query == null || f.getFeatureGroup().equals( query.getFeaturegroup()))).collect(Collectors.toList())); + (query == null || f.getFeatureGroup().equals( query.getFeaturegroup()))).collect(Collectors.toList())); } - if (filter.getRightFilter() != null) { + if(filter.getRightFilter() != null) { features.addAll(filter.getRightFilter().getFeatures().stream().filter(f -> - (query == null || f.getFeatureGroup().equals( query.getFeaturegroup()))).collect(Collectors.toList())); + (query == null || f.getFeatureGroup().equals( query.getFeaturegroup()))).collect(Collectors.toList())); } - if (filter.getLeftLogic() != null) { - features.addAll(this.collectFeaturesFromFilter(filter.getLeftLogic(), query)); + if(filter.getLeftLogic() !=null) { + collectFeatureFromFilter(filter.getLeftLogic(), features, query); } - if (filter.getRightLogic() != null) { - features.addAll(this.collectFeaturesFromFilter(filter.getRightLogic(), query)); + if(filter.getRightLogic() !=null) { + collectFeatureFromFilter(filter.getRightLogic(), features, query); } - return features; } - + private SqlNode generateCachedTableNode(Query query, boolean online) { List tableIdentifierStr = new ArrayList<>(); if (online) { diff --git a/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/query/pit/PitJoinController.java b/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/query/pit/PitJoinController.java index 1c7383d60d..37fc744284 100644 --- a/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/query/pit/PitJoinController.java +++ b/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/query/pit/PitJoinController.java @@ -133,18 +133,25 @@ public List generateSubQueries(Query baseQuery, Query query, boolean is // Add filters if needed List originalFeatures = new ArrayList<>(join.getRightQuery().getFeatures()); + if (query.getFilter() != null) { // if all filters are on left-sided features, add them to the base/inner query - if (baseQuery.getFilter() == null && !constructorController.collectFeaturesFromFilter(query.getFilter() - ).stream().anyMatch(f -> !query.getFeatures().contains(f))) { + List leftQueryFilterFeatures = constructorController.collectFeaturesFromFilter(query.getFilter()); + if (baseQuery.getFilter() == null && !leftQueryFilterFeatures + .stream().anyMatch(f -> !query.getAvailableFeatures().contains(f))) { baseQuery.setFilter(query.getFilter()); - } - // else, filters will be added to the outer query and relevant features need to be selected in the subquery - else { - List filterFeatures = constructorController.collectFeaturesFromFilter(query.getFilter(), - join.getRightQuery()).stream().filter(f -> !join.getRightQuery().getFeatures().contains(f)) - .collect(Collectors.toList()); - join.getRightQuery().getFeatures().addAll(filterFeatures); + } else { + // else, filters will be added to the outer query and relevant features need to be selected in the subquery + List filterOuterQueryFeatures = constructorController.collectFeaturesFromFilter(query.getFilter(), + join.getRightQuery()).stream() + .filter(f -> !join.getRightQuery().getAvailableFeatures().contains(f)) + .collect(Collectors.toList()); + List filterLFeatures = constructorController.collectFeaturesFromFilter(query.getFilter(), + join.getLeftQuery()).stream() + .filter(f -> !join.getLeftQuery().getAvailableFeatures().contains(f)) + .collect(Collectors.toList()); + filterOuterQueryFeatures.addAll(filterLFeatures); + join.getRightQuery().getFeatures().addAll(filterOuterQueryFeatures); } } diff --git a/hopsworks-common/src/test/io/hops/hopsworks/common/featurestore/featuregroup/pit/TestPitJoinController.java b/hopsworks-common/src/test/io/hops/hopsworks/common/featurestore/featuregroup/pit/TestPitJoinController.java index 4ad0700fc2..530848c14d 100644 --- a/hopsworks-common/src/test/io/hops/hopsworks/common/featurestore/featuregroup/pit/TestPitJoinController.java +++ b/hopsworks-common/src/test/io/hops/hopsworks/common/featurestore/featuregroup/pit/TestPitJoinController.java @@ -739,6 +739,7 @@ public void testGenerateSqlWithRightFilterInner() { "SELECT `right_fg0`.`pk1` `pk1`, `right_fg0`.`pk2` `pk2`, `right_fg0`.`ts` `ts`, `right_fg0`.`label` `label`, `right_fg0`.`pk1` `pk1`, `right_fg0`.`pk2` `pk2`, `right_fg0`.`ts` `ts`, `right_fg0`.`ft1` `ft1`, `right_fg1`.`R_pk1` `R_pk1`, `right_fg1`.`R_ts` `R_ts`, `right_fg1`.`R_ft1` `R_ft1`\n" + "FROM right_fg0\n" + "INNER JOIN right_fg1 ON `right_fg0`.`join_pk_pk1` = `right_fg1`.`join_pk_pk1` AND `right_fg0`.`join_evt_ts` = `right_fg1`.`join_evt_ts`)"; + Assert.assertEquals(expected, result); } @@ -804,6 +805,65 @@ public void testGenerateSqlWithLeftFilter() { "INNER JOIN right_fg1 ON `right_fg0`.`join_pk_pk1` = `right_fg1`.`join_pk_pk1` AND `right_fg0`.`join_evt_ts` = `right_fg1`.`join_evt_ts`)"; Assert.assertEquals(expected, result); } + + @Test + public void testGenerateSqlWithLeftFilterOmitted() { + List leftFeatures = new ArrayList<>(); + Feature filterFeature = new Feature("label", "fg0", fgLeft, "int", null); + leftFeatures.add(new Feature("pk1", "fg0", fgLeft, true)); + leftFeatures.add(new Feature("pk2", "fg0", fgLeft)); + leftFeatures.add(filterFeature); + + List rightFeatures = new ArrayList<>(); + rightFeatures.add(new Feature("pk1", "fg1", fgRight)); + rightFeatures.add(new Feature("pk2", "fg1", fgRight)); + rightFeatures.add(new Feature("ft1", "fg1", fgRight)); + + List rightFeatures1 = new ArrayList<>(); + rightFeatures1.add(new Feature("pk1", "fg2", fgRight1)); + rightFeatures1.add(new Feature("ft1", "fg2", fgRight1)); + + List leftOn = Arrays.asList(new Feature("pk1", "fg0", fgLeft), new Feature("pk2", "fg0", fgLeft)); + List rightOn = Arrays.asList(new Feature("pk1", "fg1", fgRight), new Feature("pk2", "fg1", fgRight)); + + // join on different pks + List leftOn1 = Collections.singletonList(new Feature("pk1", "fg0", fgLeft)); + List rightOn1 = Collections.singletonList(new Feature("pk1", "fg2", fgRight1)); + + List joinOperator = Arrays.asList(SqlCondition.EQUALS, SqlCondition.EQUALS); + List joinOperator1 = Collections.singletonList(SqlCondition.EQUALS); + + FilterLogic filter = new FilterLogic(new Filter(Arrays.asList(filterFeature), SqlCondition.EQUALS, "1")); + + Query query = new Query("fs", "project", fgLeft, "fg0", leftFeatures, leftFeatures, false, filter); + Query right = new Query("fs", "project", fgRight, "fg1", rightFeatures, rightFeatures, false, null); + Query right1 = new Query("fs", "project", fgRight1, "fg2", rightFeatures1, rightFeatures1, false, null); + + Join join = new Join(query, right, leftOn, rightOn, JoinType.INNER, null, joinOperator); + Join join1 = new Join(query, right1, leftOn1, rightOn1, JoinType.INNER, "R_", joinOperator1); + + query.setJoins(Arrays.asList(join, join1)); + + String result = + pitJoinController.generateSQL(query, false).toSqlString(new SparkSqlDialect(SqlDialect.EMPTY_CONTEXT)).getSql(); + String expected = "WITH right_fg0 AS (SELECT *\n" + + "FROM (SELECT `fg0`.`pk1` `pk1`, `fg0`.`pk2` `pk2`, `fg0`.`label` `label`, `fg0`.`pk1` `join_pk_pk1`, `fg0`.`ts` `join_evt_ts`, `fg1`.`pk1` `pk1`, `fg1`.`pk2` `pk2`, `fg1`.`ft1` `ft1`, RANK() OVER (PARTITION BY `fg0`.`pk1`, `fg0`.`pk2`, `fg0`.`ts` ORDER BY `fg1`.`ts` DESC) pit_rank_hopsworks\n" + + "FROM `fs`.`fg0_1` `fg0`\n" + + "INNER JOIN `fs`.`fg1_1` `fg1` ON `fg0`.`pk1` = `fg1`.`pk1` AND `fg0`.`pk2` = `fg1`.`pk2` AND `fg0`.`ts` >= `fg1`.`ts`\n" + + "WHERE `fg0`.`label` = 1) NA\n" + + "WHERE `pit_rank_hopsworks` = 1), right_fg1 AS (SELECT *\n" + + "FROM (SELECT `fg0`.`pk1` `pk1`, `fg0`.`pk2` `pk2`, `fg0`.`label` `label`, `fg0`.`pk1` `join_pk_pk1`, `fg0`.`ts` `join_evt_ts`, `fg2`.`pk1` `R_pk1`, `fg2`.`ft1` `R_ft1`, RANK() OVER (PARTITION BY `fg0`.`pk1`, `fg0`.`ts` ORDER BY `fg2`.`ts` DESC) pit_rank_hopsworks\n" + + "FROM `fs`.`fg0_1` `fg0`\n" + + "INNER JOIN `fs`.`fg2_1` `fg2` ON `fg0`.`pk1` = `fg2`.`pk1` AND `fg0`.`ts` >= `fg2`.`ts`\n" + + "WHERE `fg0`.`label` = 1) NA\n" + + "WHERE `pit_rank_hopsworks` = 1) (SELECT `right_fg0`.`pk1` `pk1`, `right_fg0`.`pk2` `pk2`, `right_fg0`.`label` `label`, `right_fg0`.`pk1` `pk1`, `right_fg0`.`pk2` `pk2`, `right_fg0`.`ft1` `ft1`, `right_fg1`.`R_pk1` `R_pk1`, `right_fg1`.`R_ft1` `R_ft1`\n" + + "FROM right_fg0\n" + + "INNER JOIN right_fg1 ON `right_fg0`.`join_pk_pk1` = `right_fg1`.`join_pk_pk1` AND `right_fg0`.`join_evt_ts` = " + + "`right_fg1`.`join_evt_ts`)"; + + System.out.println(result); + Assert.assertEquals(expected, result); + } @Test public void testGenerateSqlWithDefault() {