Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FSTORE-1580] OnlineFS Observability #435

Merged
merged 36 commits into from
Jan 27, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
f936bc6
temp
bubriks Dec 3, 2024
72fbbf9
standardize headers
bubriks Dec 10, 2024
9d546d5
Merge branch 'main' into FSTORE-1580-new
bubriks Dec 10, 2024
1a5c1fe
lint
bubriks Dec 10, 2024
946b80c
some test fix
bubriks Dec 10, 2024
6e47085
working on tests
bubriks Dec 10, 2024
2bd22a9
add unit test for get_headers
bubriks Dec 13, 2024
f7b4800
ruff fix
bubriks Dec 13, 2024
f5b1206
add wait_for_online_ingestion
bubriks Dec 13, 2024
b80c0de
small rename
bubriks Dec 13, 2024
1f83a73
add timeout
bubriks Dec 17, 2024
0446791
fix B006
bubriks Dec 17, 2024
51196b6
Merge branch 'main' into FSTORE-1580-new
bubriks Dec 18, 2024
55c37eb
test fix
bubriks Dec 18, 2024
03847e3
feedback fix
bubriks Dec 20, 2024
22162b2
Merge branch 'main' into FSTORE-1580-new
bubriks Dec 20, 2024
6a96b06
fix
bubriks Dec 20, 2024
a87a4bb
fix lint
bubriks Dec 23, 2024
cf4208f
remove info
bubriks Dec 30, 2024
3253104
id change
bubriks Jan 3, 2025
779fb28
allow for nullable OnlineIngestion num_entries
bubriks Jan 6, 2025
0c4a82e
support for java client (not tested/finished)
bubriks Jan 6, 2025
270b4ea
long to Long
bubriks Jan 6, 2025
4261040
some java fixes
bubriks Jan 6, 2025
8f4289d
Merge branch 'main' into FSTORE-1580-new
bubriks Jan 7, 2025
7bf9ce7
if timeout is 0 we will wait indefinitely
bubriks Jan 7, 2025
e30f2d5
get latest fix for java
bubriks Jan 7, 2025
2144292
rows ignored
bubriks Jan 9, 2025
6bee253
dont create online ingestion for offline fg
bubriks Jan 13, 2025
04bc2a7
remove currentOffsets
bubriks Jan 15, 2025
3b1d03c
update online ingestion result
bubriks Jan 16, 2025
b79f29d
Merge branch 'main' into FSTORE-1580-new
bubriks Jan 16, 2025
45a99a2
small fix
bubriks Jan 17, 2025
1e2f997
get online ingestion by id
bubriks Jan 17, 2025
15525b1
ruff fix
bubriks Jan 23, 2025
19d8585
Merge branch 'main' into FSTORE-1580-new
bubriks Jan 27, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

import com.logicalclocks.hsfs.FeatureStoreException;
import com.logicalclocks.hsfs.beam.StreamFeatureGroup;
import com.logicalclocks.hsfs.engine.FeatureGroupUtils;

import lombok.NonNull;

import org.apache.avro.Schema;
Expand Down Expand Up @@ -46,13 +48,11 @@
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand All @@ -64,7 +64,7 @@ public class BeamProducer extends PTransform<@NonNull PCollection<Row>, @NonNull
private transient Schema encodedSchema;
private Map<String, Schema> deserializedComplexFeatureSchemas;
private List<String> primaryKeys;
private final Map<String, byte[]> headerMap = new HashMap<>();
private final Map<String, byte[]> headerMap;

public BeamProducer(String topic, Map<String, String> properties, Schema schema, Schema encodedSchema,
Map<String, Schema> deserializedComplexFeatureSchemas, List<String> primaryKeys,
Expand All @@ -75,12 +75,7 @@ public BeamProducer(String topic, Map<String, String> properties, Schema schema,
this.properties = properties;
this.deserializedComplexFeatureSchemas = deserializedComplexFeatureSchemas;
this.primaryKeys = primaryKeys;

headerMap.put("projectId",
String.valueOf(streamFeatureGroup.getFeatureStore().getProjectId()).getBytes(StandardCharsets.UTF_8));
headerMap.put("featureGroupId", String.valueOf(streamFeatureGroup.getId()).getBytes(StandardCharsets.UTF_8));
headerMap.put("subjectId",
String.valueOf(streamFeatureGroup.getSubject().getId()).getBytes(StandardCharsets.UTF_8));
this.headerMap = FeatureGroupUtils.getHeaders(streamFeatureGroup, null);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
package com.logicalclocks.hsfs.flink.engine;

import com.logicalclocks.hsfs.FeatureStoreException;
import com.logicalclocks.hsfs.engine.FeatureGroupUtils;
import com.logicalclocks.hsfs.flink.StreamFeatureGroup;

import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumWriter;
Expand All @@ -32,25 +34,19 @@
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class KafkaRecordSerializer implements KafkaRecordSerializationSchema<GenericRecord> {

private final String topic;
private final List<String> primaryKeys;
private final Map<String, byte[]> headerMap = new HashMap<>();
private final Map<String, byte[]> headerMap;

KafkaRecordSerializer(StreamFeatureGroup streamFeatureGroup) throws FeatureStoreException, IOException {
this.topic = streamFeatureGroup.getOnlineTopicName();
this.primaryKeys = streamFeatureGroup.getPrimaryKeys();

headerMap.put("projectId",
String.valueOf(streamFeatureGroup.getFeatureStore().getProjectId()).getBytes(StandardCharsets.UTF_8));
headerMap.put("featureGroupId", String.valueOf(streamFeatureGroup.getId()).getBytes(StandardCharsets.UTF_8));
headerMap.put("subjectId",
String.valueOf(streamFeatureGroup.getSubject().getId()).getBytes(StandardCharsets.UTF_8));
this.headerMap = FeatureGroupUtils.getHeaders(streamFeatureGroup, null);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.logicalclocks.hsfs.constructor.QueryBase;
import com.logicalclocks.hsfs.engine.FeatureGroupEngineBase;
import com.logicalclocks.hsfs.engine.FeatureGroupUtils;
import com.logicalclocks.hsfs.metadata.OnlineIngestionApi;
import com.logicalclocks.hsfs.metadata.Statistics;
import com.logicalclocks.hsfs.metadata.Subject;
import com.logicalclocks.hsfs.metadata.User;
Expand Down Expand Up @@ -153,6 +154,7 @@ public abstract class FeatureGroupBase<T> {

protected FeatureGroupEngineBase featureGroupEngineBase = new FeatureGroupEngineBase();
protected FeatureGroupUtils utils = new FeatureGroupUtils();
protected OnlineIngestionApi onlineIngestionApi = new OnlineIngestionApi();

protected static final Logger LOGGER = LoggerFactory.getLogger(FeatureGroupBase.class);

Expand Down Expand Up @@ -543,5 +545,14 @@ public Schema getDeserializedAvroSchema() throws FeatureStoreException, IOExcept
return utils.getDeserializedAvroSchema(getAvroSchema());
}

/**
 * Fetches the most recent online ingestion registered for this feature group.
 *
 * <p>The lookup is delegated to {@code onlineIngestionApi} with the {@code filter_by=LATEST} query.
 *
 * @return the first online ingestion returned by the backend for the {@code LATEST} filter
 * @throws FeatureStoreException if the request fails or the backend returns no online ingestion
 * @throws IOException if the request fails at the transport level
 */
@JsonIgnore
public OnlineIngestion getLatestOnlineIngestion() throws FeatureStoreException, IOException {
  java.util.List<OnlineIngestion> ingestions = onlineIngestionApi.getOnlineIngestion(this, "filter_by=LATEST");

  // Guard the index access: an empty answer would otherwise surface as a bare IndexOutOfBoundsException.
  if (ingestions == null || ingestions.isEmpty()) {
    throw new FeatureStoreException("No online ingestion found for feature group with id " + getId());
  }
  return ingestions.get(0);
}

/**
 * Fetches a single online ingestion of this feature group by its identifier.
 *
 * <p>The lookup is delegated to {@code onlineIngestionApi} with the {@code filter_by=ID:<id>} query.
 *
 * @param id identifier of the online ingestion to look up; must not be {@code null}
 * @return the first online ingestion returned by the backend for the given id
 * @throws FeatureStoreException if {@code id} is {@code null}, the request fails, or no online ingestion
 *     is returned for the id
 * @throws IOException if the request fails at the transport level
 */
@JsonIgnore
public OnlineIngestion getOnlineIngestion(Integer id) throws FeatureStoreException, IOException {
  // A null id would otherwise be concatenated into the query as the literal text "ID:null".
  if (id == null) {
    throw new FeatureStoreException("Online ingestion id must not be null");
  }

  java.util.List<OnlineIngestion> ingestions = onlineIngestionApi.getOnlineIngestion(this, "filter_by=ID:" + id);

  // Guard the index access: an empty answer would otherwise surface as a bare IndexOutOfBoundsException.
  if (ingestions == null || ingestions.isEmpty()) {
    throw new FeatureStoreException(
        "No online ingestion with id " + id + " found for feature group with id " + getId());
  }
  return ingestions.get(0);
}

}
101 changes: 101 additions & 0 deletions java/hsfs/src/main/java/com/logicalclocks/hsfs/OnlineIngestion.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
* Copyright (c) 2025. Hopsworks AB
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*
* See the License for the specific language governing permissions and limitations under the License.
*
*/

package com.logicalclocks.hsfs;

import java.io.IOException;
import java.util.List;
import java.util.stream.Collectors;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.logicalclocks.hsfs.metadata.RestDto;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;

/**
 * Client-side representation of an online ingestion of a feature group.
 *
 * <p>Holds the number of entries expected to be ingested ({@code numEntries}, may be {@code null} when
 * unknown) and the per-status {@link OnlineIngestionResult results} reported so far, and offers helpers
 * to re-fetch that state and to block until the ingestion has processed all expected entries.
 */
@NoArgsConstructor
@AllArgsConstructor
public class OnlineIngestion extends RestDto<OnlineIngestion> {

  protected static final Logger LOGGER = LoggerFactory.getLogger(OnlineIngestion.class);

  private static final long MILLIS_PER_SECOND = 1000L;

  @Getter
  @Setter
  private Integer id;

  @Getter
  @Setter
  private Long numEntries;

  @Getter
  @Setter
  private List<OnlineIngestionResult> results;

  @Getter
  @Setter
  private FeatureGroupBase featureGroup;

  /**
   * Creates an online ingestion with a known number of expected entries.
   *
   * <p>The parameter is a primitive, so callers holding a nullable {@code Long} should use the
   * no-argument constructor together with {@link #setNumEntries(Long)} to avoid an unboxing failure.
   *
   * @param numEntries number of entries expected to be ingested
   */
  public OnlineIngestion(long numEntries) {
    this.numEntries = numEntries;
  }

  /**
   * Re-fetches this online ingestion through its feature group and copies the fetched state into this
   * instance.
   *
   * @throws IllegalStateException if this instance has no feature group or no id to look itself up with
   * @throws FeatureStoreException if the lookup fails
   * @throws IOException if the lookup fails at the transport level
   */
  public void refresh() throws FeatureStoreException, IOException {
    if (featureGroup == null || id == null) {
      throw new IllegalStateException("Cannot refresh an online ingestion without a feature group and an id.");
    }

    OnlineIngestion latest = featureGroup.getOnlineIngestion(id);

    this.id = latest.id;
    this.numEntries = latest.numEntries;
    this.results = latest.results;
    this.featureGroup = latest.featureGroup;
  }

  /**
   * Blocks until the ingestion has processed at least {@code numEntries} rows or the timeout expires.
   *
   * <p>Between checks the method sleeps for {@code period} seconds and then calls {@link #refresh()}.
   * When {@code numEntries} is {@code null} completion cannot be detected, so the method only returns
   * once the timeout expires. Reaching the timeout is logged as a warning; it does not throw.
   *
   * @param timeout maximum time to wait in seconds; {@code 0} waits indefinitely
   * @param period time to sleep between two refreshes, in seconds
   * @throws InterruptedException if the waiting thread is interrupted while sleeping
   * @throws FeatureStoreException if refreshing the ingestion state fails
   * @throws IOException if refreshing the ingestion state fails at the transport level
   */
  public void waitForCompletion(int timeout, int period)
      throws InterruptedException, FeatureStoreException, IOException {
    long startTime = System.currentTimeMillis();

    // Convert to milliseconds using long arithmetic; int multiplication overflows for large values.
    long timeoutMillis = timeout * MILLIS_PER_SECOND;
    long periodMillis = period * MILLIS_PER_SECOND;

    while (true) {
      // Check if the online ingestion is complete
      if (numEntries != null && countProcessedRows() >= numEntries) {
        break;
      }

      // Check if the timeout has been reached (if timeout is 0 we will wait indefinitely)
      if (timeout != 0 && System.currentTimeMillis() - startTime > timeoutMillis) {
        LOGGER.warn("Timeout of {} seconds was exceeded while waiting for online ingestion completion.",
            timeout);
        break;
      }

      Thread.sleep(periodMillis);

      refresh();
    }
  }

  /**
   * Sums the rows reported across all results, treating a missing result list, a missing result or a
   * missing row count as zero rows.
   */
  private long countProcessedRows() {
    if (results == null) {
      return 0L;
    }
    return results.stream()
        .filter(result -> result != null && result.getRows() != null)
        .collect(Collectors.summingLong(OnlineIngestionResult::getRows));
  }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright (c) 2025. Hopsworks AB
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*
* See the License for the specific language governing permissions and limitations under the License.
*
*/

package com.logicalclocks.hsfs;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;

/**
 * Plain data holder for one result entry of an online ingestion: the id of the ingestion it belongs to,
 * a status label, and the number of rows reported for that status.
 *
 * <p>Getters and setters for every field are generated by Lombok.
 */
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
public class OnlineIngestionResult {

  // Identifier of the online ingestion this result belongs to.
  private Integer onlineIngestionId;

  // Status label reported for this group of rows.
  private String status;

  // Number of rows reported with this status; may be null.
  private Long rows;

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@
import com.logicalclocks.hsfs.metadata.FeatureGroupApi;
import com.logicalclocks.hsfs.metadata.HopsworksClient;
import com.logicalclocks.hsfs.metadata.KafkaApi;
import com.logicalclocks.hsfs.metadata.OnlineIngestionApi;
import com.logicalclocks.hsfs.metadata.Subject;
import com.logicalclocks.hsfs.Feature;
import com.logicalclocks.hsfs.FeatureGroupBase;
import com.logicalclocks.hsfs.FeatureGroupCommit;
import com.logicalclocks.hsfs.FeatureStoreException;
import com.logicalclocks.hsfs.OnlineIngestion;

import lombok.SneakyThrows;
import org.apache.avro.Schema;
Expand All @@ -34,6 +36,7 @@
import scala.collection.Seq;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.sql.Timestamp;
import java.text.ParseException;
import java.text.SimpleDateFormat;
Expand Down Expand Up @@ -247,4 +250,25 @@ public String getDatasetType(String path) {
}
return "DATASET";
}

/**
 * Builds the record headers attached to messages produced for a feature group.
 *
 * <p>The map always contains {@code projectId}, {@code featureGroupId} and {@code subjectId}, each encoded
 * as the UTF-8 bytes of its string form. When the feature group is online enabled, an online ingestion
 * is additionally registered through {@link OnlineIngestionApi} (a remote call) and its id is added as
 * {@code onlineIngestionId}.
 *
 * @param featureGroup feature group the records belong to
 * @param numEntries number of entries expected to be ingested, or {@code null} when unknown
 * @return a new mutable map from header name to UTF-8 encoded header value
 * @throws FeatureStoreException if resolving the subject or registering the online ingestion fails
 * @throws IOException if a metadata request fails at the transport level
 */
public static Map<String, byte[]> getHeaders(FeatureGroupBase featureGroup, Long numEntries)
    throws FeatureStoreException, IOException {
  Map<String, byte[]> headerMap = new HashMap<>();

  headerMap.put("projectId",
      String.valueOf(featureGroup.getFeatureStore().getProjectId()).getBytes(StandardCharsets.UTF_8));
  headerMap.put("featureGroupId", String.valueOf(featureGroup.getId()).getBytes(StandardCharsets.UTF_8));
  headerMap.put("subjectId",
      String.valueOf(featureGroup.getSubject().getId()).getBytes(StandardCharsets.UTF_8));

  if (featureGroup.getOnlineEnabled()) {
    // numEntries is nullable (streaming callers pass null). The single-argument OnlineIngestion
    // constructor takes a primitive long, so handing it a null Long fails on unboxing; populate the
    // request through the setter instead, which accepts null.
    OnlineIngestion request = new OnlineIngestion();
    request.setNumEntries(numEntries);

    OnlineIngestion onlineIngestion = new OnlineIngestionApi().createOnlineIngestion(featureGroup, request);

    headerMap.put("onlineIngestionId",
        String.valueOf(onlineIngestion.getId()).getBytes(StandardCharsets.UTF_8));
  }

  return headerMap;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Copyright (c) 2025. Hopsworks AB
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*
* See the License for the specific language governing permissions and limitations under the License.
*
*/

package com.logicalclocks.hsfs.metadata;

import com.damnhandy.uri.template.UriTemplate;
import com.logicalclocks.hsfs.FeatureGroupBase;
import com.logicalclocks.hsfs.FeatureStoreException;
import com.logicalclocks.hsfs.OnlineIngestion;

import org.apache.http.HttpHeaders;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.List;

/**
 * REST client for the online ingestion endpoint of a feature group.
 */
public class OnlineIngestionApi {

  private static final Logger LOGGER = LoggerFactory.getLogger(OnlineIngestionApi.class);

  public static final String ONLINE_INGESTION_PATH = "/featuregroups/{fgId}/online_ingestion";

  /**
   * Registers a new online ingestion for the given feature group.
   *
   * @param featureGroup feature group the ingestion belongs to
   * @param onlineIngestion ingestion description sent as the JSON request body
   * @return the online ingestion parsed from the response, with its feature group set to
   *     {@code featureGroup}
   * @throws FeatureStoreException if the request fails
   * @throws IOException if the request fails at the transport level
   */
  public OnlineIngestion createOnlineIngestion(FeatureGroupBase featureGroup, OnlineIngestion onlineIngestion)
      throws FeatureStoreException, IOException {
    HopsworksClient hopsworksClient = HopsworksClient.getInstance();
    String uri = buildUri(hopsworksClient, featureGroup);

    HttpPost postRequest = new HttpPost(uri);
    postRequest.setHeader(HttpHeaders.CONTENT_TYPE, "application/json");
    postRequest.setEntity(hopsworksClient.buildStringEntity(onlineIngestion));

    LOGGER.info("Sending metadata request: " + uri);

    OnlineIngestion created = hopsworksClient.handleRequest(postRequest, OnlineIngestion.class);
    created.setFeatureGroup(featureGroup);
    return created;
  }

  /**
   * Retrieves the online ingestions of a feature group that match the given query.
   *
   * @param featureGroup feature group whose ingestions are queried
   * @param queryParameters raw query string appended after {@code ?}, e.g. {@code filter_by=LATEST}
   * @return the matching online ingestions, each with its feature group set to {@code featureGroup};
   *     an empty list when the response carries no items
   * @throws FeatureStoreException if the request fails
   * @throws IOException if the request fails at the transport level
   */
  public List<OnlineIngestion> getOnlineIngestion(FeatureGroupBase featureGroup, String queryParameters)
      throws FeatureStoreException, IOException {
    HopsworksClient hopsworksClient = HopsworksClient.getInstance();
    String uri = buildUri(hopsworksClient, featureGroup) + "?" + queryParameters;

    LOGGER.debug("Sending metadata request: " + uri);

    OnlineIngestion response = hopsworksClient.handleRequest(new HttpGet(uri), OnlineIngestion.class);
    List<OnlineIngestion> items = response.getItems();

    // NOTE(review): the items list is presumably absent when nothing matches the query; treat that
    // as an empty result instead of failing with a NullPointerException while iterating.
    if (items == null) {
      return java.util.Collections.emptyList();
    }

    for (OnlineIngestion ingestion : items) {
      ingestion.setFeatureGroup(featureGroup);
    }
    return items;
  }

  /**
   * Expands the online ingestion endpoint URI for the given feature group.
   */
  private static String buildUri(HopsworksClient hopsworksClient, FeatureGroupBase featureGroup)
      throws FeatureStoreException, IOException {
    String pathTemplate = HopsworksClient.PROJECT_PATH
        + FeatureStoreApi.FEATURE_STORE_PATH
        + ONLINE_INGESTION_PATH;

    return UriTemplate.fromTemplate(pathTemplate)
        .set("projectId", hopsworksClient.getProject().getProjectId())
        .set("fsId", featureGroup.getFeatureStore().getId())
        .set("fgId", featureGroup.getId())
        .expand();
  }
}
Loading
Loading