diff --git a/hopsworks-IT/src/test/ruby/spec/featuregroup_spec.rb b/hopsworks-IT/src/test/ruby/spec/featuregroup_spec.rb index 9e0107dbe4..7ba7a1eddf 100644 --- a/hopsworks-IT/src/test/ruby/spec/featuregroup_spec.rb +++ b/hopsworks-IT/src/test/ruby/spec/featuregroup_spec.rb @@ -2009,13 +2009,12 @@ expect_status_details(201) expect(parsed_json["onlineEnabled"]).to be true - topic_name = project.id.to_s + "_" + parsed_json["id"].to_s + "_" + featuregroup_name + "_" + - parsed_json["version"].to_s + "_onlinefs" + topic_name = project.projectname + "_onlinefs" get_project_topics(project.id) expect_status_details(200) topic = json_body[:items].select{|topic| topic[:name] == topic_name} expect(topic.length).to eq(1) - get_subject_schema(project, topic[0][:name], 1) + get_subject_schema(project, featuregroup_name + "_" + parsed_json["version"].to_s, 1) expect_status_details(200) end @@ -2041,13 +2040,12 @@ expect_status_details(200) expect(parsed_json["onlineEnabled"]).to be true - topic_name = project.id.to_s + "_" + parsed_json["id"].to_s + "_" + featuregroup_name + "_" + - parsed_json["version"].to_s + "_onlinefs" + topic_name = project.projectname + "_onlinefs" get_project_topics(project.id) expect_status_details(200) topic = json_body[:items].select{|topic| topic[:name] == topic_name} expect(topic.length).to eq(1) - get_subject_schema(project, topic[0][:name], 1) + get_subject_schema(project, featuregroup_name + "_" + parsed_json["version"].to_s, 1) expect_status_details(200) end @@ -2074,13 +2072,12 @@ expect(parsed_json["onlineEnabled"]).to be false # topic should still be there as we currently don't delete it - topic_name = project.id.to_s + "_" + parsed_json["id"].to_s + "_" + featuregroup_name + "_" + - parsed_json["version"].to_s + "_onlinefs" + topic_name = project.projectname + "_onlinefs" get_project_topics(project.id) expect_status_details(200) topic = json_body[:items].select{|topic| topic[:name] == topic_name} expect(topic.length).to eq(1) - get_subject_schema(project, topic[0][:name], 1) + get_subject_schema(project, featuregroup_name + "_" + parsed_json["version"].to_s, 1) expect_status_details(200) end @@ -2569,8 +2566,24 @@ expect(parsed_json["featurestoreName"] == project.projectname.downcase + "_featurestore").to be true expect(parsed_json["name"] == featuregroup_name).to be true expect(parsed_json["type"] == "cachedFeaturegroupDTO").to be true - expect(parsed_json["onlineTopicName"]).to eql(project.id.to_s + "_" + parsed_json["id"].to_s + "_" + - featuregroup_name + "_" + parsed_json["version"].to_s + "_onlinefs") + expect(parsed_json["onlineTopicName"]).to end_with("_onlinefs") + end + + it "should be able to add a cached featuregroup with online feature serving to the featurestore using specified topic" do + project = get_project + featurestore_id = get_featurestore_id(project.id) + topic_name = "topic_#{random_id}_onlinefs" + json_result, featuregroup_name = create_cached_featuregroup(project.id, featurestore_id, online:true, topic_name:topic_name) + parsed_json = JSON.parse(json_result) + expect_status_details(201) + expect(parsed_json.key?("id")).to be true + expect(parsed_json.key?("featurestoreName")).to be true + expect(parsed_json.key?("onlineEnabled")).to be true + expect(parsed_json.key?("name")).to be true + expect(parsed_json["featurestoreName"] == project.projectname.downcase + "_featurestore").to be true + expect(parsed_json["name"] == featuregroup_name).to be true + expect(parsed_json["type"] == "cachedFeaturegroupDTO").to be true + expect(parsed_json["onlineTopicName"]).to eql(topic_name) expect(parsed_json["location"]).to start_with("hopsfs://") end @@ -2580,13 +2593,12 @@ json_result, featuregroup_name = create_cached_featuregroup(project.id, featurestore_id, online:true) parsed_json = JSON.parse(json_result) expect_status_details(201) - topic_name = project.id.to_s + "_" + parsed_json["id"].to_s + "_" + featuregroup_name + "_" + - parsed_json["version"].to_s + "_onlinefs" + topic_name = project.projectname + "_onlinefs" get_project_topics(project.id) expect_status_details(200) topic = json_body[:items].select{|topic| topic[:name] == topic_name} expect(topic.length).to eq(1) - get_subject_schema(project, topic[0][:name], 1) + get_subject_schema(project, featuregroup_name + "_" + parsed_json["version"].to_s, 1) expect_status_details(200) end @@ -2966,8 +2978,7 @@ json_result = enable_cached_featuregroup_online(project.id, featurestore_id, featuregroup_id) expect_status_details(200) parsed_json = JSON.parse(json_result) - expect(parsed_json["onlineTopicName"]).to eql(project.id.to_s + "_" + parsed_json["id"].to_s + "_" + - featuregroup_name + "_" + parsed_json["version"].to_s + "_onlinefs") + expect(parsed_json["onlineTopicName"]).to end_with("_onlinefs") end it "should be able to disable online serving for a online cached feature group" do @@ -2990,8 +3001,7 @@ expect_status_details(201) featuregroup_id = parsed_json["id"] featuregroup_version = parsed_json["version"] - topic_name = project.id.to_s + "_" + parsed_json["id"].to_s + "_" + featuregroup_name + "_" + - parsed_json["version"].to_s + "_onlinefs" + topic_name = project.projectname + "_onlinefs" disable_cached_featuregroup_online(project.id, featurestore_id, featuregroup_id, featuregroup_version) expect_status_details(200) get_project_topics(project.id) @@ -3001,10 +3011,10 @@ else topic = [] end - expect(topic.length).to eq(0) - get_subject_schema(project, topic_name, 1) - expect_status_details(404) - expect(json_body[:error_code]).to eql(40401) + expect(topic.length).to eq(1) + get_subject_schema(project, featuregroup_name + "_" + parsed_json["version"].to_s, 1) + expect_status_details(200) + expect(json_body[:error_code]).to eql(nil) end it "should update avro schema when features are appended to existing online feature group" do @@ -3037,13 +3047,12 @@ features: new_schema) expect_status_details(200) - topic_name = project.id.to_s + "_" + parsed_json["id"].to_s + "_" + featuregroup_name + "_" + - parsed_json["version"].to_s + "_onlinefs" + topic_name = project.projectname + "_onlinefs" get_project_topics(project.id) expect_status_details(200) topic = json_body[:items].select{|topic| topic[:name] == topic_name} expect(topic.length).to eq(1) - get_subject_schema(project, topic[0][:name], 2) + get_subject_schema(project, featuregroup_name + "_" + parsed_json["version"].to_s, 2) expect_status_details(200) expect(json_body.to_json).to eql("{\"type\":\"record\",\"name\":\"#{featuregroup_name}_#{parsed_json["version"]}\",\"namespace\":" + "\"#{project.projectname.downcase}_featurestore.db\",\"fields\":[{\"name\":\"testfeature\"," + @@ -3070,7 +3079,6 @@ expect(parsed_json["connectionString"]).to include("jdbc:mysql:") expect(parsed_json["arguments"].find{ |item| item['name'] == 'password' }.key?('value')).to be true expect(parsed_json["arguments"].find{ |item| item['name'] == 'user' }.key?('value')).to be true - expect(parsed_json["location"]).to start_with("hopsfs://") expect_status_details(200) end end diff --git a/hopsworks-IT/src/test/ruby/spec/helpers/featurestore_helper.rb b/hopsworks-IT/src/test/ruby/spec/helpers/featurestore_helper.rb index 18e3226e7c..4d7439e526 100644 --- a/hopsworks-IT/src/test/ruby/spec/helpers/featurestore_helper.rb +++ b/hopsworks-IT/src/test/ruby/spec/helpers/featurestore_helper.rb @@ -94,7 +94,7 @@ def create_cached_featuregroup_checked2(project_id, name: nil, version: 1, feat def create_cached_featuregroup(project_id, featurestore_id, features: nil, featuregroup_name: nil, online:false, version: 1, featuregroup_description: nil, statistics_config: nil, time_travel_format: - "NONE", event_time: nil, expectation_suite: nil, parents: nil) + "NONE", event_time: nil, expectation_suite: nil, parents: nil, topic_name: nil) type = "cachedFeaturegroupDTO" features = features == nil ? [{type: "INT", name: "testfeature", description: "testfeaturedescription", primary: true, onlineType: "INT", partition: false}] : features @@ -110,7 +110,8 @@ def create_cached_featuregroup(project_id, featurestore_id, features: nil, featu onlineEnabled: online, timeTravelFormat: time_travel_format, eventTime: event_time, - parents: parents + parents: parents, + topicName: topic_name } unless statistics_config == nil json_data[:statisticsConfig] = statistics_config @@ -128,7 +129,7 @@ def create_cached_featuregroup(project_id, featurestore_id, features: nil, featu def create_stream_featuregroup(project_id, featurestore_id, features: nil, featuregroup_name: nil, version: 1, featuregroup_description: nil, statistics_config: nil, event_time: nil, deltaStreamerJobConf: nil, materialize_offline: false, - commit_time: nil, online_enabled: true) + commit_time: nil, online_enabled: true, topic_name: nil) type = "streamFeatureGroupDTO" featuregroupType = "STREAM_FEATURE_GROUP" features = features == nil ? [{type: "INT", name: "testfeature", description: "testfeaturedescription", @@ -146,7 +147,8 @@ def create_stream_featuregroup(project_id, featurestore_id, features: nil, featu featuregroupType: featuregroupType, eventTime: event_time, deltaStreamerJobConf: deltaStreamerJobConf, - onlineEnabled: online_enabled + onlineEnabled: online_enabled, + topicName: topic_name } unless statistics_config == nil json_data[:statisticsConfig] = statistics_config @@ -199,7 +201,7 @@ def materialize_stream_featuregroup(featurestore_id, featuregroup_id, commit_tim def create_on_demand_featuregroup(project_id, featurestore_id, jdbcconnectorId, name: nil, version: 1, query: nil, features: nil, data_format: nil, options: nil, event_time: nil, - online_enabled: false) + online_enabled: false, topic_name: nil) type = "onDemandFeaturegroupDTO" featuregroupType = "ON_DEMAND_FEATURE_GROUP" create_featuregroup_endpoint = "#{ENV['HOPSWORKS_API']}/project/#{project_id}/featurestores/#{featurestore_id}/featuregroups" @@ -221,7 +223,8 @@ def create_on_demand_featuregroup(project_id, featurestore_id, jdbcconnectorId, query: query, featuregroupType: featuregroupType, eventTime: event_time, - onlineEnabled: online_enabled + onlineEnabled: online_enabled, + topicName: topic_name } unless data_format == nil @@ -841,7 +844,7 @@ def get_featurestore_tour_job_name return "featurestore_tour_job" end - def create_cached_featuregroup_with_partition(project_id, featurestore_id, time_travel_format: "NONE", online: false) + def create_cached_featuregroup_with_partition(project_id, featurestore_id, time_travel_format: "NONE", online: false, topic_name: nil) type = "cachedFeaturegroupDTO" featuregroupType = "CACHED_FEATURE_GROUP" create_featuregroup_endpoint = "#{ENV['HOPSWORKS_API']}/project/" + project_id.to_s + "/featurestores/" + featurestore_id.to_s + "/featuregroups" @@ -871,7 +874,8 @@ def create_cached_featuregroup_with_partition(project_id, featurestore_id, time_ type: type, featuregroupType: featuregroupType, timeTravelFormat: time_travel_format, - onlineEnabled: online + onlineEnabled: online, + topicName: topic_name } json_data = json_data.to_json json_result = post create_featuregroup_endpoint, json_data diff --git a/hopsworks-IT/src/test/ruby/spec/stream_feature_group_spec.rb b/hopsworks-IT/src/test/ruby/spec/stream_feature_group_spec.rb index c08c62a4cd..ea8d6f04f7 100644 --- a/hopsworks-IT/src/test/ruby/spec/stream_feature_group_spec.rb +++ b/hopsworks-IT/src/test/ruby/spec/stream_feature_group_spec.rb @@ -35,8 +35,38 @@ expect(parsed_json["name"]).to eql(featuregroup_name) expect(parsed_json["type"]).to eql("streamFeatureGroupDTO") expect(parsed_json["timeTravelFormat"]).to eql("HUDI") - expect(parsed_json["onlineTopicName"]).to eql(project.id.to_s + "_" + parsed_json["id"].to_s + "_" + - featuregroup_name + "_" + parsed_json["version"].to_s + "_onlinefs") + expect(parsed_json["onlineTopicName"]).to end_with("_onlinefs") + + job_name = featuregroup_name + "_" + parsed_json["version"].to_s + "_" + "offline_fg_materialization" + job_json_result = get_job(project.id, job_name, expected_status: 200) + job_parsed_json = JSON.parse(job_json_result) + expect(job_parsed_json["name"]).to eql(job_name) + expect(job_parsed_json["config"]["mainClass"]).to eql("com.logicalclocks.utils.MainClass") + expect(job_parsed_json["config"]["spark.executor.instances"]).to eql(2) + expect(job_parsed_json["config"]["spark.executor.cores"]).to eql(2) + expect(job_parsed_json["config"]["spark.executor.memory"]).to eql(1500) + expect(job_parsed_json["config"]["spark.dynamicAllocation.enabled"]).to eql(true) + expect(job_parsed_json["config"]["spark.dynamicAllocation.minExecutors"]).to eql(2) + expect(job_parsed_json["config"]["spark.dynamicAllocation.maxExecutors"]).to eql(10) + expect(job_parsed_json["config"]["spark.dynamicAllocation.initialExecutors"]).to eql(1) + expect(job_parsed_json["config"]["spark.blacklist.enabled"]).to eql(false) + end + + it "should be able to add a stream featuregroup to the featurestore using specified topic" do + project = get_project + featurestore_id = get_featurestore_id(project.id) + topic_name = "topic_#{random_id}_onlinefs" + json_result, featuregroup_name = create_stream_featuregroup(project.id, featurestore_id, topic_name:topic_name) + parsed_json = JSON.parse(json_result) + expect_status_details(201) + expect(parsed_json.key?("id")).to be true + expect(parsed_json.key?("featurestoreName")).to be true + expect(parsed_json.key?("name")).to be true + expect(parsed_json["featurestoreName"]).to eql(project.projectname.downcase + "_featurestore") + expect(parsed_json["name"]).to eql(featuregroup_name) + expect(parsed_json["type"]).to eql("streamFeatureGroupDTO") + expect(parsed_json["timeTravelFormat"]).to eql("HUDI") + expect(parsed_json["onlineTopicName"]).to eql(topic_name) job_name = featuregroup_name + "_" + parsed_json["version"].to_s + "_" + "offline_fg_materialization" job_json_result = get_job(project.id, job_name, expected_status: 200) @@ -66,8 +96,38 @@ expect(parsed_json["name"]).to eql(featuregroup_name) expect(parsed_json["type"]).to eql("streamFeatureGroupDTO") expect(parsed_json["onlineEnabled"]).to be false - expect(parsed_json["onlineTopicName"]).to eql(project.id.to_s + "_" + parsed_json["id"].to_s + "_" + - featuregroup_name + "_" + parsed_json["version"].to_s) + expect(parsed_json["onlineTopicName"]).to eql(project.projectname) + + job_name = featuregroup_name + "_" + parsed_json["version"].to_s + "_" + "offline_fg_materialization" + job_json_result = get_job(project.id, job_name, expected_status: 200) + job_parsed_json = JSON.parse(job_json_result) + expect(job_parsed_json["name"]).to eql(job_name) + expect(job_parsed_json["config"]["mainClass"]).to eql("com.logicalclocks.utils.MainClass") + expect(job_parsed_json["config"]["spark.executor.instances"]).to eql(2) + expect(job_parsed_json["config"]["spark.executor.cores"]).to eql(2) + expect(job_parsed_json["config"]["spark.executor.memory"]).to eql(1500) + expect(job_parsed_json["config"]["spark.dynamicAllocation.enabled"]).to eql(true) + expect(job_parsed_json["config"]["spark.dynamicAllocation.minExecutors"]).to eql(2) + expect(job_parsed_json["config"]["spark.dynamicAllocation.maxExecutors"]).to eql(10) + expect(job_parsed_json["config"]["spark.dynamicAllocation.initialExecutors"]).to eql(1) + expect(job_parsed_json["config"]["spark.blacklist.enabled"]).to eql(false) + end + + it "should be able to add an offline only stream feature group to the feature store using specified topic" do + project = get_project + featurestore_id = get_featurestore_id(project.id) + topic_name = "topic_#{random_id}_onlinefs" + json_result, featuregroup_name = create_stream_featuregroup(project.id, featurestore_id, online_enabled: false, topic_name:topic_name) + parsed_json = JSON.parse(json_result) + expect_status_details(201) + expect(parsed_json.key?("id")).to be true + expect(parsed_json.key?("featurestoreName")).to be true + expect(parsed_json.key?("name")).to be true + expect(parsed_json["featurestoreName"]).to eql(project.projectname.downcase + "_featurestore") + expect(parsed_json["name"]).to eql(featuregroup_name) + expect(parsed_json["type"]).to eql("streamFeatureGroupDTO") + expect(parsed_json["onlineEnabled"]).to be false + expect(parsed_json["onlineTopicName"]).to eql(topic_name) job_name = featuregroup_name + "_" + parsed_json["version"].to_s + "_" + "offline_fg_materialization" job_json_result = get_job(project.id, job_name, expected_status: 200) @@ -137,13 +197,12 @@ json_result, featuregroup_name = create_stream_featuregroup(project.id, featurestore_id) parsed_json = JSON.parse(json_result) expect_status_details(201) - topic_name = project.id.to_s + "_" + parsed_json["id"].to_s + "_" + featuregroup_name + "_" + - parsed_json["version"].to_s + "_onlinefs" + topic_name = project.projectname + "_onlinefs" get_project_topics(project.id) expect_status_details(200) topic = json_body[:items].select{|topic| topic[:name] == topic_name} expect(topic.length).to eq(1) - get_subject_schema(project, topic[0][:name], 1) + get_subject_schema(project, featuregroup_name + "_" + parsed_json["version"].to_s, 1) expect_status_details(200) end @@ -153,13 +212,12 @@ json_result, featuregroup_name = create_stream_featuregroup(project.id, featurestore_id, online_enabled: false) parsed_json = JSON.parse(json_result) expect_status_details(201) - topic_name = project.id.to_s + "_" + parsed_json["id"].to_s + "_" + featuregroup_name + "_" + - parsed_json["version"].to_s + topic_name = project.projectname get_project_topics(project.id) expect_status_details(200) topic = json_body[:items].select{|topic| topic[:name] == topic_name} expect(topic.length).to eq(1) - get_subject_schema(project, topic[0][:name], 1) + get_subject_schema(project, featuregroup_name + "_" + parsed_json["version"].to_s, 1) expect_status_details(200) end @@ -439,15 +497,14 @@ features: new_schema) expect_status_details(200) - topic_name = project.id.to_s + "_" + parsed_json["id"].to_s + "_" + featuregroup_name + "_" + - parsed_json["version"].to_s + "_onlinefs" + topic_name = project.projectname + "_onlinefs" get_project_topics(project.id) expect_status_details(200) topic = json_body[:items].select{|topic| topic[:name] == topic_name} expect(topic.length).to eq(1) - get_subject_schema(project, topic[0][:name], 2) + get_subject_schema(project, featuregroup_name + "_" + parsed_json["version"].to_s, 2) expect_status_details(200) - expect(json_body.to_json).to eql("{\"type\":\"record\",\"name\":\"#{featuregroup_name}\",\"namespace\":" + + expect(json_body.to_json).to eql("{\"type\":\"record\",\"name\":\"#{featuregroup_name}_#{parsed_json["version"]}\",\"namespace\":" + "\"#{project.projectname.downcase}_featurestore.db\",\"fields\":[{\"name\":\"testfeature\"," + "\"type\":[\"null\",\"int\"]},{\"name\":\"testfeature2\",\"type\":[\"null\",\"double\"]}]}") end @@ -482,15 +539,14 @@ features: new_schema) expect_status_details(200) - topic_name = project.id.to_s + "_" + parsed_json["id"].to_s + "_" + featuregroup_name + "_" + - parsed_json["version"].to_s + topic_name = project.projectname get_project_topics(project.id) expect_status_details(200) topic = json_body[:items].select{|topic| topic[:name] == topic_name} expect(topic.length).to eq(1) - get_subject_schema(project, topic[0][:name], 2) + get_subject_schema(project, featuregroup_name + "_" + parsed_json["version"].to_s, 2) expect_status_details(200) - expect(json_body.to_json).to eql("{\"type\":\"record\",\"name\":\"#{featuregroup_name}\",\"namespace\":" + + expect(json_body.to_json).to eql("{\"type\":\"record\",\"name\":\"#{featuregroup_name}_#{parsed_json["version"]}\",\"namespace\":" + "\"#{project.projectname.downcase}_featurestore.db\",\"fields\":[{\"name\":\"testfeature\"," + "\"type\":[\"null\",\"int\"]},{\"name\":\"testfeature2\",\"type\":[\"null\",\"double\"]}]}") end diff --git a/hopsworks-api/src/main/java/io/hops/hopsworks/api/featurestore/featuregroup/FeaturegroupService.java b/hopsworks-api/src/main/java/io/hops/hopsworks/api/featurestore/featuregroup/FeaturegroupService.java index ccc1ea12c8..f4f429c363 100644 --- a/hopsworks-api/src/main/java/io/hops/hopsworks/api/featurestore/featuregroup/FeaturegroupService.java +++ b/hopsworks-api/src/main/java/io/hops/hopsworks/api/featurestore/featuregroup/FeaturegroupService.java @@ -472,26 +472,24 @@ public Response updateFeaturegroup(@Context SecurityContext sc, updatedFeaturegroupDTO = featuregroupController.updateFeatureGroupStatsConfig( featurestore, featuregroupDTO, project, user); } - if(enableOnline && !featuregroup.isOnlineEnabled() && - (featuregroup.getFeaturegroupType() == FeaturegroupType.CACHED_FEATURE_GROUP || - featuregroup.getFeaturegroupType() == FeaturegroupType.ON_DEMAND_FEATURE_GROUP)) { - updatedFeaturegroupDTO = - featuregroupController.enableFeaturegroupOnline(featuregroup, project, user); - } - if(disableOnline && featuregroup.isOnlineEnabled() && - (featuregroup.getFeaturegroupType() == FeaturegroupType.CACHED_FEATURE_GROUP || - featuregroup.getFeaturegroupType() == FeaturegroupType.ON_DEMAND_FEATURE_GROUP)){ - updatedFeaturegroupDTO = featuregroupController.disableFeaturegroupOnline(featuregroup, project, user); - } - if (enableOnline && featuregroup.getFeaturegroupType() == FeaturegroupType.STREAM_FEATURE_GROUP && - !featuregroup.isOnlineEnabled()) { - throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.STREAM_FEATURE_GROUP_ONLINE_DISABLE_ENABLE, - Level.FINE, "Please create a new version of the feature group to enable online storage."); + if(enableOnline && !featuregroup.isOnlineEnabled()) { + if(featuregroup.getFeaturegroupType() == FeaturegroupType.CACHED_FEATURE_GROUP || + featuregroup.getFeaturegroupType() == FeaturegroupType.ON_DEMAND_FEATURE_GROUP) { + updatedFeaturegroupDTO = + featuregroupController.enableFeaturegroupOnline(featuregroup, project, user); + } else if(featuregroup.getFeaturegroupType() == FeaturegroupType.STREAM_FEATURE_GROUP) { + throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.STREAM_FEATURE_GROUP_ONLINE_DISABLE_ENABLE, + Level.FINE, "Please create a new version of the feature group to enable online storage."); + } } - if (disableOnline && featuregroup.getFeaturegroupType() == FeaturegroupType.STREAM_FEATURE_GROUP && - featuregroup.isOnlineEnabled()) { - throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.STREAM_FEATURE_GROUP_ONLINE_DISABLE_ENABLE, - Level.FINE, "Please create a new version of the feature group to disable online storage."); + if(disableOnline && featuregroup.isOnlineEnabled()) { + if(featuregroup.getFeaturegroupType() == FeaturegroupType.CACHED_FEATURE_GROUP || + featuregroup.getFeaturegroupType() == FeaturegroupType.ON_DEMAND_FEATURE_GROUP) { + updatedFeaturegroupDTO = featuregroupController.disableFeaturegroupOnline(featuregroup, project, user); + } else if(featuregroup.getFeaturegroupType() == FeaturegroupType.STREAM_FEATURE_GROUP) { + throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.STREAM_FEATURE_GROUP_ONLINE_DISABLE_ENABLE, + Level.FINE, "Please create a new version of the feature group to disable online storage."); + } } if(deprecate != null) { updatedFeaturegroupDTO = featuregroupController.deprecateFeatureGroup(project, user, featuregroup, deprecate); diff --git a/hopsworks-api/src/main/java/io/hops/hopsworks/api/featurestore/featureview/FeatureViewBuilder.java b/hopsworks-api/src/main/java/io/hops/hopsworks/api/featurestore/featureview/FeatureViewBuilder.java index e84868e33d..5a36e192f7 100644 --- a/hopsworks-api/src/main/java/io/hops/hopsworks/api/featurestore/featureview/FeatureViewBuilder.java +++ b/hopsworks-api/src/main/java/io/hops/hopsworks/api/featurestore/featureview/FeatureViewBuilder.java @@ -36,7 +36,6 @@ import io.hops.hopsworks.common.featurestore.query.QueryDTO; import io.hops.hopsworks.common.featurestore.query.pit.PitJoinController; import io.hops.hopsworks.common.featurestore.trainingdatasets.TrainingDatasetController; -import io.hops.hopsworks.common.hdfs.Utils; import io.hops.hopsworks.exceptions.DatasetException; import io.hops.hopsworks.exceptions.FeaturestoreException; import io.hops.hopsworks.exceptions.MetadataException; @@ -161,7 +160,7 @@ public FeatureViewDTO build(FeatureView featureView, ResourceRequest resourceReq base.setQuery(queryBuilder.build(query, featureView.getFeaturestore(), project, user)); } if (resourceRequest.contains(ResourceRequest.Name.FEATURES)) { - base.setFeatures(makeFeatures(featureView, project)); + base.setFeatures(makeFeatures(featureView)); } if (resourceRequest.contains(ResourceRequest.Name.KEYWORDS)) { List<String> keywords = keywordCtrl.getKeywords(featureView); @@ -193,7 +192,7 @@ public FeatureViewDTO convertToDTO(FeatureView featureView) { return featureViewDTO; } - private List<TrainingDatasetFeatureDTO> makeFeatures(FeatureView featureView, Project project) { + private List<TrainingDatasetFeatureDTO> makeFeatures(FeatureView featureView) { List<TrainingDatasetFeature> tdFeatures = featureViewController.getFeaturesSorted(featureView.getFeatures()); Map<Integer, String> fsLookupTable = trainingDatasetController.getFsLookupTableFeatures(tdFeatures); return tdFeatures @@ -204,8 +203,6 @@ private List<TrainingDatasetFeatureDTO> makeFeatures(FeatureView featureView, Pr fsLookupTable.get(f.getFeatureGroup().getFeaturestore().getId()), f.getFeatureGroup().getId(), f.getFeatureGroup().getName(), f.getFeatureGroup().getVersion(), - onlineFeaturegroupController.onlineFeatureGroupTopicName(project.getId(), - f.getFeatureGroup().getId(), Utils.getFeaturegroupName(f.getFeatureGroup())), f.getFeatureGroup().isDeprecated()) : null, f.getIndex(), f.isLabel())) diff --git a/hopsworks-api/src/main/java/io/hops/hopsworks/api/kafka/KafkaResource.java b/hopsworks-api/src/main/java/io/hops/hopsworks/api/kafka/KafkaResource.java index af2b3e0479..4511d8d545 100644 --- a/hopsworks-api/src/main/java/io/hops/hopsworks/api/kafka/KafkaResource.java +++ b/hopsworks-api/src/main/java/io/hops/hopsworks/api/kafka/KafkaResource.java @@ -230,7 +230,7 @@ public Response getTopic(@Context UriInfo uriInfo, @JWTRequired(acceptedTokens = {Audience.API, Audience.JOB}, allowedUserRoles = {"HOPS_ADMIN", "HOPS_USER", "HOPS_SERVICE_USER"}) @ApiKeyRequired(acceptedScopes = {ApiScope.KAFKA}, - allowedUserRoles = {"HOPS_ADMIN", "HOPS_USER", "HOPS_SERVICE_USER"}) + allowedUserRoles = {"HOPS_ADMIN", "HOPS_USER", "AGENT", "HOPS_SERVICE_USER"}) public Response getSchema(@PathParam("id") Integer id, @Context HttpServletRequest req, @Context SecurityContext sc) { @@ -388,7 +388,7 @@ public Response getSubjectVersions(@PathParam("subject") String subject, @JWTRequired(acceptedTokens = {Audience.API, Audience.JOB}, allowedUserRoles = {"HOPS_ADMIN", "HOPS_USER", "HOPS_SERVICE_USER"}) @ApiKeyRequired(acceptedScopes = {ApiScope.KAFKA}, - allowedUserRoles = {"HOPS_ADMIN", "HOPS_USER", "AGENT", "HOPS_SERVICE_USER"}) + allowedUserRoles = {"HOPS_ADMIN", "HOPS_USER", "HOPS_SERVICE_USER"}) public Response getSubjectDetails(@PathParam("subject") String subject, @PathParam("version") String version, @Context HttpServletRequest req, @@ -544,7 +544,7 @@ public Response setSubjectCompatibility(@PathParam("subject") String subject, Co @JWTRequired(acceptedTokens = {Audience.API, Audience.JOB}, allowedUserRoles = {"HOPS_ADMIN", "HOPS_USER", "HOPS_SERVICE_USER"}) @ApiKeyRequired(acceptedScopes = {ApiScope.KAFKA}, - allowedUserRoles = {"HOPS_ADMIN", "HOPS_USER", "AGENT", "HOPS_SERVICE_USER"}) + allowedUserRoles = {"HOPS_ADMIN", "HOPS_USER", "HOPS_SERVICE_USER"}) public Response getTopicSubject(@PathParam("topic") String topic, @Context HttpServletRequest req, @Context SecurityContext sc) throws KafkaException { diff --git a/hopsworks-api/src/main/java/io/hops/hopsworks/api/project/ProjectService.java b/hopsworks-api/src/main/java/io/hops/hopsworks/api/project/ProjectService.java index 92e7009ecb..2236d295a1 100644 --- a/hopsworks-api/src/main/java/io/hops/hopsworks/api/project/ProjectService.java +++ b/hopsworks-api/src/main/java/io/hops/hopsworks/api/project/ProjectService.java @@ -245,6 +245,8 @@ public Response findAllByUser(@Context HttpServletRequest req, @Context Security @GET @Path("/getAll") @Produces(MediaType.APPLICATION_JSON) + @ApiKeyRequired(acceptedScopes = {ApiScope.PROJECT, ApiScope.KAFKA}, + allowedUserRoles = {"HOPS_ADMIN", "HOPS_USER", "AGENT", "HOPS_SERVICE_USER"}) public Response getAllProjects(@Context HttpServletRequest req, @Context SecurityContext sc) { List<Project> list = projectFacade.findAll(); GenericEntity<List<Project>> projects = new GenericEntity<List<Project>>(list) { @@ -463,8 +465,8 @@ public Response checkProjectAccess(@PathParam("projectId") Integer id, @Context @Produces(MediaType.APPLICATION_JSON) @JWTRequired(acceptedTokens = {Audience.API}, allowedUserRoles = {"HOPS_ADMIN", "HOPS_USER", "HOPS_SERVICE_USER"}) - @ApiKeyRequired(acceptedScopes = {ApiScope.PROJECT, ApiScope.KAFKA}, - allowedUserRoles = {"HOPS_ADMIN", "HOPS_USER", "AGENT", "HOPS_SERVICE_USER"}) + @ApiKeyRequired(acceptedScopes = {ApiScope.PROJECT}, + allowedUserRoles = {"HOPS_ADMIN", "HOPS_USER", "HOPS_SERVICE_USER"}) @AllowedProjectRoles({AllowedProjectRoles.DATA_SCIENTIST, AllowedProjectRoles.DATA_OWNER}) public Response findByProjectID(@PathParam("projectId") Integer id, @Context HttpServletRequest req, diff --git a/hopsworks-common/src/main/java/io/hops/hopsworks/common/dao/kafka/TopicDTO.java b/hopsworks-common/src/main/java/io/hops/hopsworks/common/dao/kafka/TopicDTO.java index 5aa821c74c..0b54106cf7 100644 --- a/hopsworks-common/src/main/java/io/hops/hopsworks/common/dao/kafka/TopicDTO.java +++ b/hopsworks-common/src/main/java/io/hops/hopsworks/common/dao/kafka/TopicDTO.java @@ -42,7 +42,6 @@ import io.hops.hopsworks.common.api.RestDTO; import java.io.Serializable; -import javax.ws.rs.core.UriInfo; public class TopicDTO extends RestDTO<TopicDTO> implements Serializable { @@ -66,25 +65,6 @@ public class TopicDTO extends RestDTO<TopicDTO> implements Serializable { public TopicDTO() { } - - public TopicDTO(UriInfo uriInfo) { - this.setHref(uriInfo.getAbsolutePathBuilder().build()); - } - - public TopicDTO(String name) { - this.name = name; - } - - public TopicDTO(String name, Integer ownerProjectId, String schemaName, Integer schemaVersion, String - schemaContent, Boolean isShared, Boolean accepted) { - this.name = name; - this.ownerProjectId = ownerProjectId; - this.schemaName = schemaName; - this.schemaVersion = schemaVersion; - this.schemaContent = schemaContent; - this.isShared = isShared; - this.accepted = accepted; - } public TopicDTO(String name, Integer numOfReplicas, Integer numOfPartitions) { this.name = name; @@ -92,13 +72,6 @@ public TopicDTO(String name, Integer numOfReplicas, Integer numOfPartitions) { this.numOfPartitions = numOfPartitions; } - public TopicDTO(String name, String schemaName, Integer schemaVersion, Boolean isShared) { - this.name = name; - this.schemaName = schemaName; - this.schemaVersion = schemaVersion; - this.isShared = isShared; - } - public TopicDTO(String name, Integer numOfReplicas, Integer numOfPartitions, String schemaName, Integer schemaVersion) { this.name = name; @@ -108,17 +81,6 @@ public TopicDTO(String name, Integer numOfReplicas, Integer numOfPartitions, this.schemaVersion = schemaVersion; } - public TopicDTO(String name, Integer numOfReplicas, Integer numOfPartitions, - String schemaName, Integer schemaVersion, Integer ownerProjectId, Boolean isShared) { - this.name = name; - this.numOfReplicas = numOfReplicas; - this.numOfPartitions = numOfPartitions; - this.schemaName = schemaName; - this.schemaVersion = schemaVersion; - this.isShared = isShared; - this.ownerProjectId = ownerProjectId; - } - public TopicDTO(String topicName, String subject, Integer subjectVersion, String schema) { this.name = topicName; this.schemaName = subject; diff --git a/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/featuregroup/FeaturegroupController.java b/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/featuregroup/FeaturegroupController.java index 439b02f5c2..405c6ac38f 100644 --- a/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/featuregroup/FeaturegroupController.java +++ b/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/featuregroup/FeaturegroupController.java @@ -267,7 +267,7 @@ public FeaturegroupDTO createFeaturegroupNoValidation(Featurestore featurestore, if (settings.isOnlineFeaturestore() && featuregroup.isOnlineEnabled() && !isSpine) { onlineFeaturegroupController.setupOnlineFeatureGroup(featurestore, featuregroup, featuresNoHudi, project, user); } else if (featuregroupDTO instanceof StreamFeatureGroupDTO && !featuregroupDTO.getOnlineEnabled()) { - streamFeatureGroupController.setupOfflineStreamFeatureGroup(project, featuregroup, featuresNoHudi); + onlineFeaturegroupController.createFeatureGroupKafkaTopic(project, featuregroup, featuresNoHudi); } FeaturegroupDTO completeFeaturegroupDTO = convertFeaturegrouptoDTO(featuregroup, project, user); @@ -321,7 +321,7 @@ public FeaturegroupDTO convertFeaturegrouptoDTO(Featuregroup featuregroup, Proje return cachedFeaturegroupDTO; case STREAM_FEATURE_GROUP: StreamFeatureGroupDTO streamFeatureGroupDTO = - streamFeatureGroupController.convertStreamFeatureGroupToDTO(featuregroup, project, user); + streamFeatureGroupController.convertStreamFeatureGroupToDTO(featuregroup); streamFeatureGroupDTO.setFeaturestoreName(featurestoreName); if (includeFeatures) { streamFeatureGroupDTO.setFeatures( @@ -634,7 +634,7 @@ public void deleteFeaturegroup(Featuregroup featuregroup, Project project, Users onlineFeaturegroupController.disableOnlineFeatureGroup(featuregroup, project, user); } else { // only topics need to be deleted, but no RonDB table - streamFeatureGroupController.deleteOfflineStreamFeatureGroupTopic(project, featuregroup); + onlineFeaturegroupController.deleteFeatureGroupKafkaTopic(project, featuregroup); } break; case ON_DEMAND_FEATURE_GROUP: @@ -761,6 +761,7 @@ private Featuregroup persistFeaturegroupMetadata(Featurestore featurestore, Proj featuregroup.setOnDemandFeaturegroup(onDemandFeaturegroup); featuregroup.setEventTime(featuregroupDTO.getEventTime()); featuregroup.setOnlineEnabled(settings.isOnlineFeaturestore() && featuregroupDTO.getOnlineEnabled()); + featuregroup.setTopicName(featuregroupDTO.getTopicName()); StatisticsConfig statisticsConfig = new StatisticsConfig(featuregroupDTO.getStatisticsConfig().getEnabled(), featuregroupDTO.getStatisticsConfig().getCorrelations(), featuregroupDTO.getStatisticsConfig().getHistograms(), diff --git a/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/featuregroup/FeaturegroupDTO.java b/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/featuregroup/FeaturegroupDTO.java index d90b8a2eda..d2f7bc3d01 100644 --- a/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/featuregroup/FeaturegroupDTO.java +++ b/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/featuregroup/FeaturegroupDTO.java @@ -58,14 +58,14 @@ public class FeaturegroupDTO extends FeaturestoreEntityDTO<FeaturegroupDTO> { private Boolean onlineEnabled = false; @JsonSetter(nulls = Nulls.SKIP) private Boolean deprecated = false; + private String topicName; public FeaturegroupDTO() { } public FeaturegroupDTO(Integer featurestoreId, String featurestoreName, Integer id, String name, Integer version, - String onlineTopicName, Boolean deprecated) { + Boolean deprecated) { super(featurestoreId, featurestoreName, id, name, version); - this.onlineTopicName = onlineTopicName; this.deprecated = deprecated; } @@ -75,6 +75,7 @@ public FeaturegroupDTO(Featuregroup featuregroup) { featuregroup.getId(), new StatisticsConfigDTO(featuregroup.getStatisticsConfig())); this.eventTime = featuregroup.getEventTime(); this.deprecated = featuregroup.isDeprecated(); + this.topicName = featuregroup.getTopicName(); } // for testing @@ -122,6 +123,14 @@ public void setOnlineEnabled(Boolean onlineEnabled) { this.onlineEnabled = onlineEnabled; } + public String getTopicName() { + return topicName; + } + + public void setTopicName(String topicName) { + this.topicName = topicName; + } + public Boolean getDeprecated() { return deprecated; } diff --git a/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/featuregroup/cached/CachedFeaturegroupController.java b/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/featuregroup/cached/CachedFeaturegroupController.java index 7a9e46fc28..cda49181e1 100644 --- a/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/featuregroup/cached/CachedFeaturegroupController.java +++ b/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/featuregroup/cached/CachedFeaturegroupController.java @@ -295,9 +295,7 @@ public CachedFeaturegroupDTO convertCachedFeaturegroupToDTO(Featuregroup feature if (settings.isOnlineFeaturestore() && featuregroup.isOnlineEnabled()) { cachedFeaturegroupDTO.setOnlineEnabled(true); - cachedFeaturegroupDTO.setOnlineTopicName(onlineFeaturegroupController - .onlineFeatureGroupTopicName(project.getId(), featuregroup.getId(), - Utils.getFeaturegroupName(featuregroup))); + cachedFeaturegroupDTO.setOnlineTopicName(Utils.getFeatureGroupTopicName(featuregroup)); } cachedFeaturegroupDTO.setName(featuregroup.getName()); cachedFeaturegroupDTO.setTimeTravelFormat(featuregroup.getCachedFeaturegroup().getTimeTravelFormat()); @@ -517,10 +515,10 @@ public void enableFeaturegroupOnline(Featurestore featurestore, Featuregroup fea } if(!featuregroup.isOnlineEnabled()) { + featuregroup.setOnlineEnabled(true); onlineFeaturegroupController.setupOnlineFeatureGroup(featurestore, featuregroup, features, project, user); } //Set foreign key of the cached feature group to the new online feature group - featuregroup.setOnlineEnabled(true); featureGroupFacade.updateFeaturegroupMetadata(featuregroup); } diff --git a/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/featuregroup/ondemand/OnDemandFeaturegroupController.java b/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/featuregroup/ondemand/OnDemandFeaturegroupController.java index f88fecfd7c..ca94585caa 100644 --- a/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/featuregroup/ondemand/OnDemandFeaturegroupController.java +++ b/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/featuregroup/ondemand/OnDemandFeaturegroupController.java @@ -158,9 +158,7 @@ public OnDemandFeaturegroup createSpineGroup(Featurestore featurestore, public OnDemandFeaturegroupDTO convertOnDemandFeatureGroupToDTO(String featureStoreName, Featuregroup featureGroup, FeaturestoreStorageConnectorDTO storageConnectorDTO) { - String onlineTopicName = onlineFeatureGroupController.onlineFeatureGroupTopicName( - featureGroup.getFeaturestore().getProject().getId(), - featureGroup.getId(), Utils.getFeaturegroupName(featureGroup)); + String onlineTopicName = Utils.getFeatureGroupTopicName(featureGroup); return new OnDemandFeaturegroupDTO(featureStoreName, featureGroup, storageConnectorDTO, null, onlineTopicName); } @@ -330,9 +328,9 @@ public void enableFeatureGroupOnline(Featurestore featurestore, Featuregroup fea UserException, IOException, HopsSecurityException { List<FeatureGroupFeatureDTO> features = getFeaturesDTO(featuregroup); if(!featuregroup.isOnlineEnabled()) { + featuregroup.setOnlineEnabled(true); onlineFeatureGroupController.setupOnlineFeatureGroup(featurestore, featuregroup, features, project, user); } - featuregroup.setOnlineEnabled(true); featureGroupFacade.updateFeaturegroupMetadata(featuregroup); } } diff --git a/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/featuregroup/online/AvroSchemaConstructorController.java b/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/featuregroup/online/AvroSchemaConstructorController.java index d6ba11e2a2..a2ebd747fa 100644 --- a/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/featuregroup/online/AvroSchemaConstructorController.java +++ b/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/featuregroup/online/AvroSchemaConstructorController.java @@ -18,7 +18,9 @@ import com.google.common.base.Strings; import io.hops.hopsworks.common.featurestore.feature.FeatureGroupFeatureDTO; +import io.hops.hopsworks.common.hdfs.Utils; import io.hops.hopsworks.exceptions.FeaturestoreException; +import io.hops.hopsworks.persistence.entity.featurestore.featuregroup.Featuregroup; import io.hops.hopsworks.restutils.RESTCodes; import org.apache.avro.LogicalTypes; import org.apache.avro.Schema; @@ -40,8 +42,11 @@ public class AvroSchemaConstructorController { public AvroSchemaConstructorController() {} - public String constructSchema(String featureGroupEntityName, String featureStoreName, - List<FeatureGroupFeatureDTO> schema) throws FeaturestoreException { + public String constructSchema(Featuregroup featuregroup, List<FeatureGroupFeatureDTO> schema) + throws FeaturestoreException { + String featureGroupEntityName = Utils.getFeaturegroupName(featuregroup); + String featureStoreName = Utils.getFeaturestoreName(featuregroup.getFeaturestore().getProject()); + SchemaBuilder.TypeBuilder<Schema> avroSchema = SchemaBuilder.builder(); // top level needs to be record diff --git a/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/featuregroup/online/OnlineFeaturegroupController.java b/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/featuregroup/online/OnlineFeaturegroupController.java index ee743c4a48..14cf6fcbbc 100644 --- a/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/featuregroup/online/OnlineFeaturegroupController.java +++ b/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/featuregroup/online/OnlineFeaturegroupController.java @@ -19,7 +19,6 @@ import com.google.common.base.Strings; import com.logicalclocks.shaded.org.apache.commons.lang3.StringUtils; import io.hops.hopsworks.common.dao.kafka.TopicDTO; -import io.hops.hopsworks.common.dao.kafka.schemas.SubjectDTO; import io.hops.hopsworks.common.featurestore.feature.FeatureGroupFeatureDTO; import io.hops.hopsworks.common.featurestore.featuregroup.FeaturegroupController; import io.hops.hopsworks.common.featurestore.featuregroup.cached.CachedFeaturegroupController; @@ -41,7 +40,6 @@ import io.hops.hopsworks.exceptions.ProjectException; import io.hops.hopsworks.exceptions.SchemaException; import io.hops.hopsworks.exceptions.ServiceException; -import io.hops.hopsworks.exceptions.UserException; import io.hops.hopsworks.persistence.entity.featurestore.Featurestore; import io.hops.hopsworks.persistence.entity.featurestore.featuregroup.Featuregroup; import io.hops.hopsworks.persistence.entity.kafka.schemas.SchemaCompatibility; @@ -106,7 +104,6 @@ public class OnlineFeaturegroupController { private final static String VARBINARY_DEFAULT = "VARBINARY(100)"; private final static String VARCHAR_DEFAULT = "VARCHAR(100)"; - private static final String KAFKA_TOPIC_SUFFIX = "_onlinefs"; public OnlineFeaturegroupController() {} @@ -142,15 +139,6 @@ public void createMySQLTable(Featurestore featurestore, String tableName, List<F public void setupOnlineFeatureGroup(Featurestore featureStore, Featuregroup featureGroup, List<FeatureGroupFeatureDTO> features, Project project, Users user) - throws FeaturestoreException, SQLException, SchemaException, KafkaException, ProjectException, UserException, - IOException, HopsSecurityException, ServiceException { - setupOnlineFeatureGroup(featureStore, featureGroup.getId(), featureGroup.getName(), featureGroup.getVersion(), - features, project, user); - } - - public void setupOnlineFeatureGroup(Featurestore featureStore, Integer featureGroupId, String featureGroupName, - Integer featureGroupVersion, List<FeatureGroupFeatureDTO> features, - Project project, Users user) throws KafkaException, SchemaException, ProjectException, FeaturestoreException, SQLException, IOException, HopsSecurityException, ServiceException { // check if onlinefs user is part of project @@ -165,46 +153,44 @@ public void setupOnlineFeatureGroup(Featurestore featureStore, Integer featureGr } } - String featureGroupEntityName = Utils.getFeatureStoreEntityName(featureGroupName, featureGroupVersion); - + String featureGroupEntityName = Utils.getFeaturegroupName(featureGroup); createMySQLTable(featureStore, featureGroupEntityName, features, project, user); - String topicName = onlineFeatureGroupTopicName(project.getId(), featureGroupId, featureGroupEntityName); - createFeatureGroupKafkaTopic(project, featureGroupEntityName, topicName, features); + createFeatureGroupKafkaTopic(project, featureGroup, features); } - - // For ingesting data in the online feature store, we setup a new topic for each feature group - // The topic schema is also registered so it's available both for the hsfs library and for the collector - public void createFeatureGroupKafkaTopic(Project project, String featureGroupEntityName, - String topicName, List<FeatureGroupFeatureDTO> features) + + // For ingesting data in the online feature store, we set up a topic for project/feature group + // The topic schema is registered for each feature group + public void createFeatureGroupKafkaTopic(Project project, Featuregroup featureGroup, + List<FeatureGroupFeatureDTO> features) throws KafkaException, SchemaException, FeaturestoreException { - - String avroSchema = avroSchemaConstructorController - .constructSchema(featureGroupEntityName, Utils.getFeaturestoreName(project), features); + String avroSchema = avroSchemaConstructorController.constructSchema(featureGroup, features); schemasController.validateSchema(project, avroSchema); - - SubjectDTO topicSubject = subjectsController.registerNewSubject(project, topicName, avroSchema, false); - subjectsCompatibilityController.setSubjectCompatibility(project, topicName, SchemaCompatibility.NONE); - TopicDTO topicDTO = new TopicDTO(topicName, - settings.getKafkaDefaultNumReplicas(), - settings.getOnlineFsThreadNumber(), - topicSubject.getSubject(), - topicSubject.getVersion()); - kafkaController.createTopic(project, topicDTO); - } - - public String onlineFeatureGroupTopicName(Integer projectId, Integer featureGroupId, String featureGroupEntityName) { - return projectId.toString() + "_" + featureGroupId.toString() + "_" + featureGroupEntityName + KAFKA_TOPIC_SUFFIX; + + String featureGroupEntityName = Utils.getFeaturegroupName(featureGroup); + subjectsController.registerNewSubject(project, featureGroupEntityName, avroSchema, false); + subjectsCompatibilityController.setSubjectCompatibility(project, featureGroupEntityName, SchemaCompatibility.NONE); + + String topicName = Utils.getFeatureGroupTopicName(featureGroup); + if (!kafkaController.projectTopicExists(project, topicName)) { + TopicDTO topicDTO = new TopicDTO(topicName, + settings.getKafkaDefaultNumReplicas(), + settings.getOnlineFsThreadNumber()); + kafkaController.createTopic(project, topicDTO); + } } - public void deleteFeatureGroupKafkaTopic(Project project, String topicName) - throws KafkaException, SchemaException { + public void deleteFeatureGroupKafkaTopic(Project project, Featuregroup featureGroup) + throws KafkaException, SchemaException { + String topicName = Utils.getFeatureGroupTopicName(featureGroup); + String featureGroupEntityName = Utils.getFeaturegroupName(featureGroup); // user might have deleted topic manually - if (kafkaController.projectTopicExists(project, topicName)) { + if (topicName.equals(featureGroup.getTopicName())) { + // delete topic only if it is unique to fg kafkaController.removeTopicFromProject(project, topicName); } - if (!subjectsController.getSubjectVersions(project, topicName).isEmpty()) { - subjectsController.deleteSubject(project, topicName); + if (!subjectsController.getSubjectVersions(project, featureGroupEntityName).isEmpty()) { + subjectsController.deleteSubject(project, featureGroupEntityName); } } @@ -212,29 +198,25 @@ public void alterOnlineFeatureGroupSchema(Featuregroup featureGroup, List<Featur List<FeatureGroupFeatureDTO> fullNewSchema, Project project, Users user) throws FeaturestoreException, SchemaException, SQLException, KafkaException { - String tableName = Utils.getFeatureStoreEntityName(featureGroup.getName(), featureGroup.getVersion()); - String topicName = onlineFeatureGroupTopicName(project.getId(), featureGroup.getId(), tableName); + String tableName = Utils.getFeaturegroupName(featureGroup); alterMySQLTableColumns(featureGroup.getFeaturestore(), tableName, newFeatures, project, user); - alterFeatureGroupSchema(featureGroup, fullNewSchema, topicName, project); + alterFeatureGroupSchema(featureGroup, fullNewSchema, project); } public void alterFeatureGroupSchema(Featuregroup featureGroup, List<FeatureGroupFeatureDTO> fullNewSchema, - String topicName, Project project) + Project project) throws FeaturestoreException, SchemaException, KafkaException { // publish new version of avro schema - String avroSchema = avroSchemaConstructorController.constructSchema( - Utils.getFeatureStoreEntityName(featureGroup.getName(), featureGroup.getVersion()), - Utils.getFeaturestoreName(project), fullNewSchema); + String avroSchema = avroSchemaConstructorController.constructSchema(featureGroup, fullNewSchema); schemasController.validateSchema(project, avroSchema); - SubjectDTO topicSubject = subjectsController.registerNewSubject(project, topicName, avroSchema, false); - kafkaController.updateTopicSchemaVersion(project, topicName, topicSubject.getVersion()); + String featureGroupEntityName = Utils.getFeaturegroupName(featureGroup); + subjectsController.registerNewSubject(project, featureGroupEntityName, avroSchema, false); } public void disableOnlineFeatureGroup(Featuregroup featureGroup, Project project, Users user) throws FeaturestoreException, SQLException, SchemaException, KafkaException { dropMySQLTable(featureGroup, project, user); - String topicName = onlineFeatureGroupTopicName(project.getId(), featureGroup.getId(), - Utils.getFeaturegroupName(featureGroup)); + String topicName = Utils.getFeatureGroupTopicName(featureGroup); // HOPSWORKS-3252 - we keep kafka topics in order to avoid consumers getting blocked // deleteFeatureGroupKafkaTopic(project, topicName); } diff --git a/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/featuregroup/stream/StreamFeatureGroupController.java b/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/featuregroup/stream/StreamFeatureGroupController.java index 8f50515521..8c0039fc27 100644 --- a/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/featuregroup/stream/StreamFeatureGroupController.java +++ b/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/featuregroup/stream/StreamFeatureGroupController.java @@ -86,23 +86,14 @@ public class StreamFeatureGroupController { * @param featuregroup the entity to convert * @return the converted DTO representation */ - public StreamFeatureGroupDTO convertStreamFeatureGroupToDTO(Featuregroup featuregroup, Project project, Users user) - throws FeaturestoreException, ServiceException { + public StreamFeatureGroupDTO convertStreamFeatureGroupToDTO(Featuregroup featuregroup) + throws ServiceException { StreamFeatureGroupDTO streamFeatureGroupDTO = new StreamFeatureGroupDTO(featuregroup); - - if (featuregroup.isOnlineEnabled()) { - streamFeatureGroupDTO.setOnlineTopicName(onlineFeaturegroupController - .onlineFeatureGroupTopicName(project.getId(), featuregroup.getId(), - Utils.getFeaturegroupName(featuregroup))); - } else { - streamFeatureGroupDTO.setOnlineTopicName(offlineStreamFeatureGroupTopicName(project.getId(), - featuregroup.getId(), Utils.getFeaturegroupName(featuregroup))); - } - + streamFeatureGroupDTO.setOnlineTopicName(Utils.getFeatureGroupTopicName(featuregroup)); streamFeatureGroupDTO.setName(featuregroup.getName()); streamFeatureGroupDTO.setDescription(featuregroup.getDescription()); streamFeatureGroupDTO.setOnlineEnabled(featuregroup.isOnlineEnabled()); - + streamFeatureGroupDTO.setLocation( featurestoreUtils.resolveLocation(featuregroupController.getFeatureGroupLocation(featuregroup))); return streamFeatureGroupDTO; @@ -243,7 +234,7 @@ public void updateMetadata(Project project, Users user, Featuregroup featuregrou onlineFeaturegroupController.alterOnlineFeatureGroupSchema( featuregroup, newFeatures, featuregroupDTO.getFeatures(), project, user); } else { - alterOfflineStreamFeatureGroupSchema(featuregroup, featuregroupDTO.getFeatures(), project); + onlineFeaturegroupController.alterFeatureGroupSchema(featuregroup, featuregroupDTO.getFeatures(), project); } // Log schema change @@ -269,35 +260,4 @@ private void updateCachedDescriptions(StreamFeatureGroup streamFeatureGroup, } streamFeatureGroupFacade.updateMetadata(streamFeatureGroup); } - - public void deleteOfflineStreamFeatureGroupTopic(Project project, Featuregroup featureGroup) - throws SchemaException, KafkaException { - String topicName = offlineStreamFeatureGroupTopicName( - project.getId(), featureGroup.getId(), Utils.getFeaturegroupName(featureGroup)); - onlineFeaturegroupController.deleteFeatureGroupKafkaTopic(project, topicName); - } - - private void alterOfflineStreamFeatureGroupSchema(Featuregroup featureGroup, - List<FeatureGroupFeatureDTO> fullNewSchema, Project project) - throws SchemaException, KafkaException, FeaturestoreException { - String topicName = offlineStreamFeatureGroupTopicName( - project.getId(), featureGroup.getId(), Utils.getFeaturegroupName(featureGroup)); - onlineFeaturegroupController.alterFeatureGroupSchema(featureGroup, fullNewSchema, topicName, project); - } - - public void setupOfflineStreamFeatureGroup(Project project, Featuregroup featureGroup, - List<FeatureGroupFeatureDTO> features) - throws SchemaException, KafkaException, FeaturestoreException { - String featureGroupEntityName = Utils.getFeaturegroupName(featureGroup); - String topicName = offlineStreamFeatureGroupTopicName( - project.getId(), featureGroup.getId(), featureGroupEntityName); - - onlineFeaturegroupController.createFeatureGroupKafkaTopic(project, featureGroupEntityName, topicName, - features); - } - - public String offlineStreamFeatureGroupTopicName(Integer projectId, Integer featureGroupId, - String featureGroupEntityName) { - return projectId.toString() + "_" + featureGroupId.toString() + "_" + featureGroupEntityName; - } } diff --git a/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/trainingdatasets/TrainingDatasetController.java b/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/trainingdatasets/TrainingDatasetController.java index 5f20ee84ab..15e85c3cb4 100644 --- a/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/trainingdatasets/TrainingDatasetController.java +++ b/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/trainingdatasets/TrainingDatasetController.java @@ -229,8 +229,6 @@ public TrainingDatasetDTO convertTrainingDatasetToDTO(Users user, Project projec fsLookupTable.get(f.getFeatureGroup().getFeaturestore().getId()), f.getFeatureGroup().getId(), f.getFeatureGroup().getName(), f.getFeatureGroup().getVersion(), - onlineFeaturegroupController.onlineFeatureGroupTopicName(project.getId(), - f.getFeatureGroup().getId(), Utils.getFeaturegroupName(f.getFeatureGroup())), f.getFeatureGroup().isDeprecated()) : null, f.getIndex(), f.isLabel())) diff --git a/hopsworks-common/src/main/java/io/hops/hopsworks/common/hdfs/Utils.java b/hopsworks-common/src/main/java/io/hops/hopsworks/common/hdfs/Utils.java index cb5b4272c3..60662d5c91 100644 --- a/hopsworks-common/src/main/java/io/hops/hopsworks/common/hdfs/Utils.java +++ b/hopsworks-common/src/main/java/io/hops/hopsworks/common/hdfs/Utils.java @@ -40,6 +40,7 @@ package io.hops.hopsworks.common.hdfs; import com.google.common.base.CharMatcher; +import com.google.common.base.Strings; import io.hops.hopsworks.persistence.entity.dataset.Dataset; import io.hops.hopsworks.persistence.entity.dataset.DatasetType; import io.hops.hopsworks.persistence.entity.featurestore.featuregroup.Featuregroup; @@ -61,6 +62,8 @@ public final class Utils { private static final Logger logger = Logger.getLogger(Utils.class.getName()); + private static final String FEATURE_STORE_SUFFIX = "_featurestore.db"; + public static String getFileName(String path) { int lastSlash = path.lastIndexOf("/"); int startName = (lastSlash > -1) ? lastSlash + 1 : 0; @@ -140,7 +143,7 @@ public static String getHiveDBPath(Project project, Settings settings) { } public static String getFeaturestoreName(Project project) { - return project.getName().toLowerCase() + "_featurestore.db"; + return project.getName().toLowerCase() + FEATURE_STORE_SUFFIX; } public static String getFeaturestorePath(Project project, Settings settings) { @@ -163,6 +166,23 @@ public static String getFeatureStoreEntityName(String entityName, Integer versio return entityName + "_" + version.toString(); } + public static String getFeatureGroupTopicName(Featuregroup featureGroup) { + if (!Strings.isNullOrEmpty(featureGroup.getTopicName())) { + return featureGroup.getTopicName(); + } + + Project project = featureGroup.getFeaturestore().getProject(); + if (!Strings.isNullOrEmpty(project.getTopicName())) { + return project.getTopicName(); + } + + String topicName = project.getName(); + if (featureGroup.isOnlineEnabled()) { + topicName += "_onlinefs"; + } + return topicName; + } + /** * The root '/' is considered '0', so the answer is incorrect for root, but * that doesn't matter. '/blah.txt' should return '1'. diff --git a/hopsworks-common/src/main/java/io/hops/hopsworks/common/kafka/KafkaController.java b/hopsworks-common/src/main/java/io/hops/hopsworks/common/kafka/KafkaController.java index 3a52c64143..359b6813f8 100644 --- a/hopsworks-common/src/main/java/io/hops/hopsworks/common/kafka/KafkaController.java +++ b/hopsworks-common/src/main/java/io/hops/hopsworks/common/kafka/KafkaController.java @@ -193,11 +193,14 @@ public List<TopicDTO> findTopicsByProject(Project project) { List<TopicDTO> topics = new ArrayList<>(); for (ProjectTopics pt : ptList) { - topics.add( - new TopicDTO(pt.getTopicName(), - pt.getSubjects().getSubject(), - pt.getSubjects().getVersion(), - false)); + TopicDTO topicDTO = new TopicDTO(pt.getTopicName(), pt.getNumOfReplicas(), pt.getNumOfPartitions()); + Subjects subjects = pt.getSubjects(); + if (subjects != null) { + topicDTO.setSchemaName(subjects.getSubject()); + topicDTO.setSchemaVersion(subjects.getVersion()); + } + topicDTO.setShared(false); + topics.add(topicDTO); } return topics; } @@ -214,10 +217,13 @@ public List<TopicDTO> findAllTopicsByProject(Project project) { private ProjectTopics createTopicInProject(Project project, TopicDTO topicDto) throws KafkaException { - Subjects schema = - subjectsFacade.findSubjectByNameAndVersion(project, topicDto.getSchemaName(), topicDto.getSchemaVersion()) - .orElseThrow(() -> - new KafkaException(RESTCodes.KafkaErrorCode.SCHEMA_NOT_FOUND, Level.FINE, "topic: " + topicDto.getName())); + Subjects subjects = null; + if (topicDto.getSchemaName() != null && topicDto.getSchemaVersion() != null) { + subjects = subjectsFacade + .findSubjectByNameAndVersion(project, topicDto.getSchemaName(), topicDto.getSchemaVersion()) + .orElseThrow(() -> new KafkaException(RESTCodes.KafkaErrorCode.SCHEMA_NOT_FOUND, Level.FINE, + "topic: " + topicDto.getName())); + } // create the topic in kafka try { @@ -243,7 +249,7 @@ private ProjectTopics createTopicInProject(Project project, TopicDTO topicDto) t * user. Hence, the above schema query will be empty. */ ProjectTopics pt = new ProjectTopics(topicDto.getName(), topicDto.getNumOfPartitions(), - topicDto.getNumOfReplicas(), project, schema); + topicDto.getNumOfReplicas(), project, subjects); projectTopicsFacade.save(pt); @@ -316,22 +322,6 @@ public TopicDefaultValueDTO topicDefaultValues() { brokers.size()); } - public void updateTopicSchemaVersion(Project project, String topicName, Integer schemaVersion) throws KafkaException { - Optional<ProjectTopics> optionalPt = projectTopicsFacade.findTopicByNameAndProject(project, topicName); - - if (optionalPt.isPresent()) { - ProjectTopics pt = optionalPt.get(); - String schemaName = pt.getSubjects().getSubject(); - - Subjects st = subjectsFacade.findSubjectByNameAndVersion(project, schemaName, schemaVersion) - .orElseThrow(() -> - new KafkaException(RESTCodes.KafkaErrorCode.SCHEMA_VERSION_NOT_FOUND, Level.FINE, - "schema: " + schemaName + ", version: " + schemaVersion)); - - projectTopicsFacade.updateTopicSchemaVersion(pt, st); - } - } - public SubjectDTO getSubjectForTopic(Project project, String topic) throws KafkaException { Optional<ProjectTopics> pt = projectTopicsFacade.findTopicByNameAndProject(project, topic); if (!pt.isPresent()) { diff --git a/hopsworks-common/src/main/java/io/hops/hopsworks/common/project/ProjectController.java b/hopsworks-common/src/main/java/io/hops/hopsworks/common/project/ProjectController.java index a8e39222be..9b24c8b435 100644 --- a/hopsworks-common/src/main/java/io/hops/hopsworks/common/project/ProjectController.java +++ b/hopsworks-common/src/main/java/io/hops/hopsworks/common/project/ProjectController.java @@ -381,7 +381,7 @@ protected Project createProjectInternal(ProjectDTO projectDTO, Users owner) thro * until this project is removed from the database */ try { - project = createProjectDbMetadata(projectName, owner, projectDTO.getDescription()); + project = createProjectDbMetadata(projectName, owner, projectDTO); LOGGER.log(Level.INFO, projectCreationLog(projectDTO, "Created DB metadata")); } catch (EJBException ex) { LOGGER.log(Level.WARNING, null, ex); @@ -589,7 +589,7 @@ private void verifyProject(Project project, DistributedFileSystemOps dfso) throw } @TransactionAttribute(TransactionAttributeType.REQUIRES_NEW) - private Project createProjectDbMetadata(String projectName, Users user, String projectDescription) + private Project createProjectDbMetadata(String projectName, Users user, ProjectDTO projectDTO) throws ProjectException { if (user == null) { throw new IllegalArgumentException("User was not provided."); @@ -605,7 +605,8 @@ private Project createProjectDbMetadata(String projectName, Users user, String p Date now = new Date(); Project project = new Project(projectName, user, now, settings.getDefaultPaymentType()); project.setKafkaMaxNumTopics(settings.getKafkaMaxNumTopics()); - project.setDescription(projectDescription); + project.setDescription(projectDTO.getDescription()); + project.setTopicName(projectDTO.getFeatureStoreTopic()); project.setCreationStatus(CreationStatus.ONGOING); //Persist project object diff --git a/hopsworks-common/src/main/java/io/hops/hopsworks/common/project/ProjectDTO.java b/hopsworks-common/src/main/java/io/hops/hopsworks/common/project/ProjectDTO.java index e4b44834e6..99ff5a36d1 100644 --- a/hopsworks-common/src/main/java/io/hops/hopsworks/common/project/ProjectDTO.java +++ b/hopsworks-common/src/main/java/io/hops/hopsworks/common/project/ProjectDTO.java @@ -65,16 +65,11 @@ public class ProjectDTO { private boolean isPreinstalledDockerImage; private boolean isOldDockerImage; private CreationStatus creationStatus; + private String featureStoreTopic; public ProjectDTO() { } - public ProjectDTO(Integer projectId, String projectName, String owner) { - this.projectId = projectId; - this.projectName = projectName; - this.owner = owner; - } - public ProjectDTO(Project project, Long inodeid, List<String> services, List<ProjectTeam> projectTeam, Quotas quotas, boolean isPreinstalledDockerImage, boolean isOldDockerImage) { @@ -91,6 +86,7 @@ public ProjectDTO(Project project, Long inodeid, List<String> services, this.isPreinstalledDockerImage = isPreinstalledDockerImage; this.isOldDockerImage = isOldDockerImage; this.creationStatus = project.getCreationStatus(); + this.featureStoreTopic = project.getTopicName(); } public ProjectDTO(Project project, Long inodeid, List<String> services, @@ -110,20 +106,7 @@ public ProjectDTO(Project project, Long inodeid, List<String> services, this.isPreinstalledDockerImage = isPreinstalledDockerImage; this.isOldDockerImage = isOldDockerImage; this.creationStatus = project.getCreationStatus(); - } - - public ProjectDTO(Integer projectId, String projectName, String owner, - Date created, String description, boolean isPreinstalledDockerImage, - boolean isOldDockerImage, List<String> services, List<ProjectTeam> projectTeam) { - this.projectId = projectId; - this.projectName = projectName; - this.owner = owner; - this.created = created; - this.description = description; - this.isPreinstalledDockerImage = isPreinstalledDockerImage; - this.isOldDockerImage = isOldDockerImage; - this.services = services; - this.projectTeam = projectTeam; + this.featureStoreTopic = project.getTopicName(); } public Integer getProjectId() { @@ -237,6 +220,14 @@ public CreationStatus getCreationStatus() { public void setCreationStatus(CreationStatus creationStatus) { this.creationStatus = creationStatus; } + + public String getFeatureStoreTopic() { + return featureStoreTopic; + } + + public void setFeatureStoreTopic(String topicName) { + this.featureStoreTopic = topicName; + } @Override public String toString() { diff --git a/hopsworks-common/src/test/io/hops/hopsworks/common/featurestore/featuregroup/online/TestAvroSchemaConstructorController.java b/hopsworks-common/src/test/io/hops/hopsworks/common/featurestore/featuregroup/online/TestAvroSchemaConstructorController.java index 304e15cf70..1a20ba545b 100644 --- a/hopsworks-common/src/test/io/hops/hopsworks/common/featurestore/featuregroup/online/TestAvroSchemaConstructorController.java +++ b/hopsworks-common/src/test/io/hops/hopsworks/common/featurestore/featuregroup/online/TestAvroSchemaConstructorController.java @@ -18,6 +18,9 @@ import io.hops.hopsworks.common.featurestore.feature.FeatureGroupFeatureDTO; import io.hops.hopsworks.exceptions.FeaturestoreException; +import io.hops.hopsworks.persistence.entity.featurestore.Featurestore; +import io.hops.hopsworks.persistence.entity.featurestore.featuregroup.Featuregroup; +import io.hops.hopsworks.persistence.entity.project.Project; import org.apache.avro.Schema; import org.junit.Assert; import org.junit.Before; @@ -157,15 +160,26 @@ public void testToAvroStructException() throws Exception { @Test public void testConstructSchema() throws Exception { + Project project = new Project(); + project.setName("project_name"); + + Featurestore featureStore = new Featurestore(); + featureStore.setProject(project); + + Featuregroup featuregroup = new Featuregroup(); + featuregroup.setName("fg"); + featuregroup.setVersion(1); + featuregroup.setFeaturestore(featureStore); + List<FeatureGroupFeatureDTO> schema = new ArrayList<>(); schema.add(new FeatureGroupFeatureDTO("feature0", "int", "")); schema.add(new FeatureGroupFeatureDTO("feature1", "map<string,array<int>>", "")); schema.add(new FeatureGroupFeatureDTO("feature2", "struct<label:int,value:binary>", "")); - String result = avroSchemaConstructorController.constructSchema("fg", "fs", schema); + String result = avroSchemaConstructorController.constructSchema(featuregroup, schema); Assert.assertEquals("{\n" + " \"type\" : \"record\",\n" + - " \"name\" : \"fg\",\n" + - " \"namespace\" : \"fs\",\n" + + " \"name\" : \"fg_1\",\n" + + " \"namespace\" : \"project_name_featurestore.db\",\n" + " \"fields\" : [ {\n" + " \"name\" : \"feature0\",\n" + " \"type\" : [ \"null\", \"int\" ]\n" + @@ -199,6 +213,17 @@ public void testConstructSchema() throws Exception { @Test public void testConstructSchemaOverlyComplexFeatures() throws Exception { + Project project = new Project(); + project.setName("project_name"); + + Featurestore featureStore = new Featurestore(); + featureStore.setProject(project); + + Featuregroup featuregroup = new Featuregroup(); + featuregroup.setName("fg"); + featuregroup.setVersion(1); + featuregroup.setFeaturestore(featureStore); + List<FeatureGroupFeatureDTO> schema = new ArrayList<>(); schema.add(new FeatureGroupFeatureDTO("feature3", "struct<label1:string,value:struct<label2:string,value2:int>>", "")); @@ -206,11 +231,11 @@ public void testConstructSchemaOverlyComplexFeatures() throws Exception { schema.add(new FeatureGroupFeatureDTO("feature5", "struct<key:string,item1:struct<subkey:string," + "item2:array<int>>>", "")); schema.add(new FeatureGroupFeatureDTO("feature6", "array<struct<test:string,val:int>>", "")); - String result = avroSchemaConstructorController.constructSchema("fg", "fs", schema); + String result = avroSchemaConstructorController.constructSchema(featuregroup, schema); Assert.assertEquals("{\n" + " \"type\" : \"record\",\n" + - " \"name\" : \"fg\",\n" + - " \"namespace\" : \"fs\",\n" + + " \"name\" : \"fg_1\",\n" + + " \"namespace\" : \"project_name_featurestore.db\",\n" + " \"fields\" : [ {\n" + " \"name\" : \"feature3\",\n" + " \"type\" : [ \"null\", {\n" + diff --git a/hopsworks-common/src/test/io/hops/hopsworks/common/featurestore/utils/TestFeatureStoreInputValidation.java b/hopsworks-common/src/test/io/hops/hopsworks/common/featurestore/utils/TestFeatureStoreInputValidation.java index 12657216dd..fd275da95a 100644 --- a/hopsworks-common/src/test/io/hops/hopsworks/common/featurestore/utils/TestFeatureStoreInputValidation.java +++ b/hopsworks-common/src/test/io/hops/hopsworks/common/featurestore/utils/TestFeatureStoreInputValidation.java @@ -38,7 +38,7 @@ public void setup() { @Test public void testVerifyUserInputFeatureGroup() throws Exception { - FeaturegroupDTO featuregroupDTO = new FeaturegroupDTO(1, "featurestore", 1, "1wrong_name", 1, "online_topic_name", false); + FeaturegroupDTO featuregroupDTO = new FeaturegroupDTO(1, "featurestore", 1, "1wrong_name", 1, false); // upper case featuregroupDTO.setName("UPPER_CASE"); thrown.expect(FeaturestoreException.class); @@ -62,7 +62,7 @@ public void testVerifyUserInputFeatureGroup() throws Exception { @Test public void testVerifyDescription() throws Exception { - FeaturegroupDTO featuregroupDTO = new FeaturegroupDTO(1, "featurestore", 1, "wrong_name", 1, "online_topic_name", false); + FeaturegroupDTO featuregroupDTO = new FeaturegroupDTO(1, "featurestore", 1, "wrong_name", 1, false); // description is null featuregroupDTO.setDescription(null); diff --git a/hopsworks-persistence/src/main/java/io/hops/hopsworks/persistence/entity/featurestore/featuregroup/Featuregroup.java b/hopsworks-persistence/src/main/java/io/hops/hopsworks/persistence/entity/featurestore/featuregroup/Featuregroup.java index 5e8e341a3a..6d8c54ab0b 100644 --- a/hopsworks-persistence/src/main/java/io/hops/hopsworks/persistence/entity/featurestore/featuregroup/Featuregroup.java +++ b/hopsworks-persistence/src/main/java/io/hops/hopsworks/persistence/entity/featurestore/featuregroup/Featuregroup.java @@ -127,6 +127,8 @@ public class Featuregroup implements Serializable { private String eventTime; @Column(name = "online_enabled") private boolean onlineEnabled; + @Column(name = "topic_name") + private String topicName; @Column(name = "deprecated") private boolean deprecated; @NotNull @@ -309,6 +311,14 @@ public void setOnlineEnabled(boolean onlineEnabled) { this.onlineEnabled = onlineEnabled; } + public String getTopicName() { + return topicName; + } + + public void setTopicName(String topicName) { + this.topicName = topicName; + } + public boolean isDeprecated() { return deprecated; } @@ -337,6 +347,7 @@ public boolean equals(Object o) { if (!Objects.equals(streamFeatureGroup, that.streamFeatureGroup)) return false; if (!Objects.equals(eventTime, that.eventTime)) return false; if (!Objects.equals(onlineEnabled, that.onlineEnabled)) return false; + if (!Objects.equals(topicName, that.topicName)) return false; if (!Objects.equals(deprecated, that.deprecated)) return false; if (!Objects.equals(expectationSuite, that.expectationSuite)) return false; return Objects.equals(statisticsConfig, that.statisticsConfig); @@ -358,6 +369,7 @@ public int hashCode() { result = 31 * result + (statisticsConfig != null ? statisticsConfig.hashCode() : 0); result = 31 * result + (eventTime != null ? eventTime.hashCode() : 0); result = 31 * result + (onlineEnabled ? 1: 0); + result = 31 * result + (topicName != null ? topicName.hashCode() : 0); result = 31 * result + (deprecated ? 1: 0); result = 31 * result + (expectationSuite != null ? expectationSuite.hashCode(): 0); return result; diff --git a/hopsworks-persistence/src/main/java/io/hops/hopsworks/persistence/entity/kafka/ProjectTopics.java b/hopsworks-persistence/src/main/java/io/hops/hopsworks/persistence/entity/kafka/ProjectTopics.java index a614f9b4a1..f7fddd6e13 100644 --- a/hopsworks-persistence/src/main/java/io/hops/hopsworks/persistence/entity/kafka/ProjectTopics.java +++ b/hopsworks-persistence/src/main/java/io/hops/hopsworks/persistence/entity/kafka/ProjectTopics.java @@ -113,7 +113,7 @@ public class ProjectTopics implements Serializable { private Project project; @JoinColumn(name = "subject_id", referencedColumnName = "id") - @ManyToOne(optional = false) + @ManyToOne private Subjects subjects; public ProjectTopics() { diff --git a/hopsworks-persistence/src/main/java/io/hops/hopsworks/persistence/entity/project/Project.java b/hopsworks-persistence/src/main/java/io/hops/hopsworks/persistence/entity/project/Project.java index d8c9f3c56a..37d66c73ea 100644 --- a/hopsworks-persistence/src/main/java/io/hops/hopsworks/persistence/entity/project/Project.java +++ b/hopsworks-persistence/src/main/java/io/hops/hopsworks/persistence/entity/project/Project.java @@ -193,6 +193,9 @@ public class Project implements Serializable { @Size(max = 255) @Column(name = "docker_image") private String dockerImage; + + @Column(name = "topic_name") + private String topicName; @Basic(optional = false) @NotNull @@ -473,6 +476,14 @@ public void setDockerImage(String dockerImage) { this.dockerImage = dockerImage; } + public String getTopicName() { + return topicName; + } + + public void setTopicName(String topicName) { + this.topicName = topicName; + } + public PythonEnvironment getPythonEnvironment() { return this.pythonEnvironment; } @@ -524,6 +535,7 @@ public String toString() { ", kafkaMaxNumTopics=" + kafkaMaxNumTopics + ", lastQuotaUpdate=" + lastQuotaUpdate + ", dockerImage='" + dockerImage + '\'' + + ", topicName='" + topicName + '\'' + ", creationStatus=" + creationStatus + ", pythonDepCollection=" + pythonDepCollection + ", jupyterProjectCollection=" + jupyterProjectCollection +