Skip to content

Commit

Permalink
[FSTORE-862] Allow naming of Kafka topics for data ingestion (#1533) (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
bubriks authored Sep 11, 2023
1 parent b5ad739 commit 6a3fa8c
Show file tree
Hide file tree
Showing 25 changed files with 322 additions and 293 deletions.
60 changes: 34 additions & 26 deletions hopsworks-IT/src/test/ruby/spec/featuregroup_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2009,13 +2009,12 @@
expect_status_details(201)
expect(parsed_json["onlineEnabled"]).to be true

topic_name = project.id.to_s + "_" + parsed_json["id"].to_s + "_" + featuregroup_name + "_" +
parsed_json["version"].to_s + "_onlinefs"
topic_name = project.projectname + "_onlinefs"
get_project_topics(project.id)
expect_status_details(200)
topic = json_body[:items].select{|topic| topic[:name] == topic_name}
expect(topic.length).to eq(1)
get_subject_schema(project, topic[0][:name], 1)
get_subject_schema(project, featuregroup_name + "_" + parsed_json["version"].to_s, 1)
expect_status_details(200)
end

Expand All @@ -2041,13 +2040,12 @@
expect_status_details(200)
expect(parsed_json["onlineEnabled"]).to be true

topic_name = project.id.to_s + "_" + parsed_json["id"].to_s + "_" + featuregroup_name + "_" +
parsed_json["version"].to_s + "_onlinefs"
topic_name = project.projectname + "_onlinefs"
get_project_topics(project.id)
expect_status_details(200)
topic = json_body[:items].select{|topic| topic[:name] == topic_name}
expect(topic.length).to eq(1)
get_subject_schema(project, topic[0][:name], 1)
get_subject_schema(project, featuregroup_name + "_" + parsed_json["version"].to_s, 1)
expect_status_details(200)
end

Expand All @@ -2074,13 +2072,12 @@
expect(parsed_json["onlineEnabled"]).to be false

# topic should still be there as we currently don't delete it
topic_name = project.id.to_s + "_" + parsed_json["id"].to_s + "_" + featuregroup_name + "_" +
parsed_json["version"].to_s + "_onlinefs"
topic_name = project.projectname + "_onlinefs"
get_project_topics(project.id)
expect_status_details(200)
topic = json_body[:items].select{|topic| topic[:name] == topic_name}
expect(topic.length).to eq(1)
get_subject_schema(project, topic[0][:name], 1)
get_subject_schema(project, featuregroup_name + "_" + parsed_json["version"].to_s, 1)
expect_status_details(200)
end

Expand Down Expand Up @@ -2569,8 +2566,24 @@
expect(parsed_json["featurestoreName"] == project.projectname.downcase + "_featurestore").to be true
expect(parsed_json["name"] == featuregroup_name).to be true
expect(parsed_json["type"] == "cachedFeaturegroupDTO").to be true
expect(parsed_json["onlineTopicName"]).to eql(project.id.to_s + "_" + parsed_json["id"].to_s + "_" +
featuregroup_name + "_" + parsed_json["version"].to_s + "_onlinefs")
expect(parsed_json["onlineTopicName"]).to end_with("_onlinefs")
end

it "should be able to add a cached featuregroup with online feature serving to the featurestore using specified topic" do
project = get_project
featurestore_id = get_featurestore_id(project.id)
topic_name = "topic_#{random_id}_onlinefs"
json_result, featuregroup_name = create_cached_featuregroup(project.id, featurestore_id, online:true, topic_name:topic_name)
parsed_json = JSON.parse(json_result)
expect_status_details(201)
expect(parsed_json.key?("id")).to be true
expect(parsed_json.key?("featurestoreName")).to be true
expect(parsed_json.key?("onlineEnabled")).to be true
expect(parsed_json.key?("name")).to be true
expect(parsed_json["featurestoreName"] == project.projectname.downcase + "_featurestore").to be true
expect(parsed_json["name"] == featuregroup_name).to be true
expect(parsed_json["type"] == "cachedFeaturegroupDTO").to be true
expect(parsed_json["onlineTopicName"]).to eql(topic_name)
expect(parsed_json["location"]).to start_with("hopsfs://")
end

Expand All @@ -2580,13 +2593,12 @@
json_result, featuregroup_name = create_cached_featuregroup(project.id, featurestore_id, online:true)
parsed_json = JSON.parse(json_result)
expect_status_details(201)
topic_name = project.id.to_s + "_" + parsed_json["id"].to_s + "_" + featuregroup_name + "_" +
parsed_json["version"].to_s + "_onlinefs"
topic_name = project.projectname + "_onlinefs"
get_project_topics(project.id)
expect_status_details(200)
topic = json_body[:items].select{|topic| topic[:name] == topic_name}
expect(topic.length).to eq(1)
get_subject_schema(project, topic[0][:name], 1)
get_subject_schema(project, featuregroup_name + "_" + parsed_json["version"].to_s, 1)
expect_status_details(200)
end

Expand Down Expand Up @@ -2966,8 +2978,7 @@
json_result = enable_cached_featuregroup_online(project.id, featurestore_id, featuregroup_id)
expect_status_details(200)
parsed_json = JSON.parse(json_result)
expect(parsed_json["onlineTopicName"]).to eql(project.id.to_s + "_" + parsed_json["id"].to_s + "_" +
featuregroup_name + "_" + parsed_json["version"].to_s + "_onlinefs")
expect(parsed_json["onlineTopicName"]).to end_with("_onlinefs")
end

it "should be able to disable online serving for a online cached feature group" do
Expand All @@ -2990,8 +3001,7 @@
expect_status_details(201)
featuregroup_id = parsed_json["id"]
featuregroup_version = parsed_json["version"]
topic_name = project.id.to_s + "_" + parsed_json["id"].to_s + "_" + featuregroup_name + "_" +
parsed_json["version"].to_s + "_onlinefs"
topic_name = project.projectname + "_onlinefs"
disable_cached_featuregroup_online(project.id, featurestore_id, featuregroup_id, featuregroup_version)
expect_status_details(200)
get_project_topics(project.id)
Expand All @@ -3001,10 +3011,10 @@
else
topic = []
end
expect(topic.length).to eq(0)
get_subject_schema(project, topic_name, 1)
expect_status_details(404)
expect(json_body[:error_code]).to eql(40401)
expect(topic.length).to eq(1)
get_subject_schema(project, featuregroup_name + "_" + parsed_json["version"].to_s, 1)
expect_status_details(200)
expect(json_body[:error_code]).to eql(nil)
end

it "should update avro schema when features are appended to existing online feature group" do
Expand Down Expand Up @@ -3037,13 +3047,12 @@
features: new_schema)
expect_status_details(200)

topic_name = project.id.to_s + "_" + parsed_json["id"].to_s + "_" + featuregroup_name + "_" +
parsed_json["version"].to_s + "_onlinefs"
topic_name = project.projectname + "_onlinefs"
get_project_topics(project.id)
expect_status_details(200)
topic = json_body[:items].select{|topic| topic[:name] == topic_name}
expect(topic.length).to eq(1)
get_subject_schema(project, topic[0][:name], 2)
get_subject_schema(project, featuregroup_name + "_" + parsed_json["version"].to_s, 2)
expect_status_details(200)
expect(json_body.to_json).to eql("{\"type\":\"record\",\"name\":\"#{featuregroup_name}_#{parsed_json["version"]}\",\"namespace\":" +
"\"#{project.projectname.downcase}_featurestore.db\",\"fields\":[{\"name\":\"testfeature\"," +
Expand All @@ -3070,7 +3079,6 @@
expect(parsed_json["connectionString"]).to include("jdbc:mysql:")
expect(parsed_json["arguments"].find{ |item| item['name'] == 'password' }.key?('value')).to be true
expect(parsed_json["arguments"].find{ |item| item['name'] == 'user' }.key?('value')).to be true
expect(parsed_json["location"]).to start_with("hopsfs://")
expect_status_details(200)
end
end
Expand Down
20 changes: 12 additions & 8 deletions hopsworks-IT/src/test/ruby/spec/helpers/featurestore_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ def create_cached_featuregroup_checked2(project_id, name: nil, version: 1, feat

def create_cached_featuregroup(project_id, featurestore_id, features: nil, featuregroup_name: nil, online:false,
version: 1, featuregroup_description: nil, statistics_config: nil, time_travel_format:
"NONE", event_time: nil, expectation_suite: nil, parents: nil)
"NONE", event_time: nil, expectation_suite: nil, parents: nil, topic_name: nil)
type = "cachedFeaturegroupDTO"
features = features == nil ? [{type: "INT", name: "testfeature", description: "testfeaturedescription",
primary: true, onlineType: "INT", partition: false}] : features
Expand All @@ -110,7 +110,8 @@ def create_cached_featuregroup(project_id, featurestore_id, features: nil, featu
onlineEnabled: online,
timeTravelFormat: time_travel_format,
eventTime: event_time,
parents: parents
parents: parents,
topicName: topic_name
}
unless statistics_config == nil
json_data[:statisticsConfig] = statistics_config
Expand All @@ -128,7 +129,7 @@ def create_cached_featuregroup(project_id, featurestore_id, features: nil, featu
def create_stream_featuregroup(project_id, featurestore_id, features: nil, featuregroup_name: nil,
version: 1, featuregroup_description: nil, statistics_config: nil,
event_time: nil, deltaStreamerJobConf: nil, materialize_offline: false,
commit_time: nil, online_enabled: true)
commit_time: nil, online_enabled: true, topic_name: nil)
type = "streamFeatureGroupDTO"
featuregroupType = "STREAM_FEATURE_GROUP"
features = features == nil ? [{type: "INT", name: "testfeature", description: "testfeaturedescription",
Expand All @@ -146,7 +147,8 @@ def create_stream_featuregroup(project_id, featurestore_id, features: nil, featu
featuregroupType: featuregroupType,
eventTime: event_time,
deltaStreamerJobConf: deltaStreamerJobConf,
onlineEnabled: online_enabled
onlineEnabled: online_enabled,
topicName: topic_name
}
unless statistics_config == nil
json_data[:statisticsConfig] = statistics_config
Expand Down Expand Up @@ -199,7 +201,7 @@ def materialize_stream_featuregroup(featurestore_id, featuregroup_id, commit_tim

def create_on_demand_featuregroup(project_id, featurestore_id, jdbcconnectorId, name: nil, version: 1, query: nil,
features: nil, data_format: nil, options: nil, event_time: nil,
online_enabled: false)
online_enabled: false, topic_name: nil)
type = "onDemandFeaturegroupDTO"
featuregroupType = "ON_DEMAND_FEATURE_GROUP"
create_featuregroup_endpoint = "#{ENV['HOPSWORKS_API']}/project/#{project_id}/featurestores/#{featurestore_id}/featuregroups"
Expand All @@ -221,7 +223,8 @@ def create_on_demand_featuregroup(project_id, featurestore_id, jdbcconnectorId,
query: query,
featuregroupType: featuregroupType,
eventTime: event_time,
onlineEnabled: online_enabled
onlineEnabled: online_enabled,
topicName: topic_name
}

unless data_format == nil
Expand Down Expand Up @@ -841,7 +844,7 @@ def get_featurestore_tour_job_name
return "featurestore_tour_job"
end

def create_cached_featuregroup_with_partition(project_id, featurestore_id, time_travel_format: "NONE", online: false)
def create_cached_featuregroup_with_partition(project_id, featurestore_id, time_travel_format: "NONE", online: false, topic_name: nil)
type = "cachedFeaturegroupDTO"
featuregroupType = "CACHED_FEATURE_GROUP"
create_featuregroup_endpoint = "#{ENV['HOPSWORKS_API']}/project/" + project_id.to_s + "/featurestores/" + featurestore_id.to_s + "/featuregroups"
Expand Down Expand Up @@ -871,7 +874,8 @@ def create_cached_featuregroup_with_partition(project_id, featurestore_id, time_
type: type,
featuregroupType: featuregroupType,
timeTravelFormat: time_travel_format,
onlineEnabled: online
onlineEnabled: online,
topicName: topic_name
}
json_data = json_data.to_json
json_result = post create_featuregroup_endpoint, json_data
Expand Down
Loading

0 comments on commit 6a3fa8c

Please sign in to comment.