Merge remote-tracking branch 'upstream/main' into FSTORE-1436
davitbzh committed Dec 9, 2024
2 parents (9b168fb + ad441ee), commit a2e3c50
Showing 3 changed files with 14 additions and 5 deletions.
@@ -29,6 +29,8 @@
 import org.apache.http.HttpHost;
 import org.apache.http.HttpRequest;
 import org.apache.http.client.ResponseHandler;
+import org.apache.http.client.config.CookieSpecs;
+import org.apache.http.client.config.RequestConfig;
 import org.apache.http.config.Registry;
 import org.apache.http.config.RegistryBuilder;
 import org.apache.http.conn.socket.ConnectionSocketFactory;
@@ -113,6 +115,7 @@ public HopsworksExternalClient(CloseableHttpClient httpClient, HttpHost httpHost
     httpClient = HttpClients.custom()
         .setConnectionManager(connectionPool)
         .setKeepAliveStrategy((httpResponse, httpContext) -> 30 * 1000)
+        .setDefaultRequestConfig(RequestConfig.custom().setCookieSpec(CookieSpecs.STANDARD).build())
         .build();
 
     if (!Strings.isNullOrEmpty(apiKeyValue)) {
@@ -25,6 +25,8 @@
 import org.apache.http.HttpHost;
 import org.apache.http.HttpRequest;
 import org.apache.http.client.ResponseHandler;
+import org.apache.http.client.config.CookieSpecs;
+import org.apache.http.client.config.RequestConfig;
 import org.apache.http.config.Registry;
 import org.apache.http.config.RegistryBuilder;
 import org.apache.http.conn.socket.ConnectionSocketFactory;
@@ -103,6 +105,7 @@ public HopsworksInternalClient() throws IOException, KeyStoreException, Certific
     httpClient = HttpClients.custom()
         .setConnectionManager(connectionPool)
         .setKeepAliveStrategy((httpResponse, httpContext) -> 30 * 1000)
+        .setDefaultRequestConfig(RequestConfig.custom().setCookieSpec(CookieSpecs.STANDARD).build())
         .build();
 
     certKey = HopsworksHttpClient.readCertKey(MATERIAL_PASSWD);
13 changes: 8 additions & 5 deletions utils/python/hsfs_utils.py
@@ -301,22 +301,25 @@ def offline_fg_materialization(spark: SparkSession, job_conf: Dict[Any, Any], in
         .option("includeHeaders", "true")
         .option("failOnDataLoss", "false")
         .load()
-        .limit(5000000)
     )
 
     # filter only the necessary entries
-    df = df.filter(expr("CAST(filter(headers, header -> header.key = 'featureGroupId')[0].value AS STRING)") == str(entity._id))
-    df = df.filter(expr("CAST(filter(headers, header -> header.key = 'subjectId')[0].value AS STRING)") == str(entity.subject["id"]))
+    filtered_df = df.filter(expr("CAST(filter(headers, header -> header.key = 'featureGroupId')[0].value AS STRING)") == str(entity._id))
+    filtered_df = filtered_df.filter(expr("CAST(filter(headers, header -> header.key = 'subjectId')[0].value AS STRING)") == str(entity.subject["id"]))
+
+    # limit the number of records ingested
+    limit = job_conf.get("write_options", {}).get("job_limit", 5000000)
+    filtered_df = filtered_df.limit(limit)
 
     # deserialize dataframe so that it can be properly saved
-    deserialized_df = engine.get_instance()._deserialize_from_avro(entity, df)
+    deserialized_df = engine.get_instance()._deserialize_from_avro(entity, filtered_df)
 
     # insert data
     entity.stream = False  # to make sure we don't write to kafka
     entity.insert(deserialized_df, storage="offline")
 
     # update offsets
-    df_offsets = df.groupBy('partition').agg(max('offset').alias('offset')).collect()
+    df_offsets = (df if limit > filtered_df.count() else filtered_df).groupBy('partition').agg(max('offset').alias('offset')).collect()
     offset_dict = json.loads(offset_string)
     for offset_row in df_offsets:
         offset_dict[f"{entity._online_topic_name}"][f"{offset_row.partition}"] = offset_row.offset + 1
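
For context, this change moves the ingestion cap off the Kafka read itself (the removed `.limit(5000000)`) to after the header filters, and makes it configurable through a `job_limit` write option, so the cap counts only records that actually belong to this feature group. Below is a minimal sketch of the pattern, not the job's real code: the broker address, topic, and IDs are placeholders, and it assumes the `headers` column produced by Spark's Kafka source when `includeHeaders` is set (an array of structs with a string `key` and binary `value`).

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr

spark = SparkSession.builder.getOrCreate()

# Batch-read raw records; includeHeaders exposes each record's Kafka
# headers as an array<struct<key:string,value:binary>> column.
# (Requires the spark-sql-kafka connector on the classpath.)
df = (
    spark.read.format("kafka")
    .option("kafka.bootstrap.servers", "broker:9092")  # placeholder
    .option("subscribe", "fg_topic")                   # placeholder
    .option("includeHeaders", "true")
    .option("failOnDataLoss", "false")
    .load()
)

feature_group_id = "42"  # placeholder for str(entity._id)
subject_id = "7"         # placeholder for str(entity.subject["id"])

# Keep only records whose headers match this feature group and schema subject.
filtered_df = df.filter(
    expr("CAST(filter(headers, h -> h.key = 'featureGroupId')[0].value AS STRING)")
    == feature_group_id
).filter(
    expr("CAST(filter(headers, h -> h.key = 'subjectId')[0].value AS STRING)")
    == subject_id
)

# Cap the batch; 5,000,000 is the default when no job_limit is configured.
job_conf = {"write_options": {"job_limit": 1_000_000}}  # placeholder config
limit = job_conf.get("write_options", {}).get("job_limit", 5000000)
filtered_df = filtered_df.limit(limit)
```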
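The offset bookkeeping is the subtle part of the change: when `job_limit` does not truncate the filtered batch, every record consumed from `df` can be acknowledged, including records filtered out because they belong to other feature groups or schema subjects; when the limit does truncate, only offsets up to the last ingested record may be committed, so the remainder is re-read on the next run. A hedged sketch of that logic as a standalone helper (the function name and `topic` argument are illustrative; the offset JSON layout mirrors the `offset_dict[topic][partition]` usage in the diff):

```python
import json
from pyspark.sql.functions import max as spark_max


def next_offsets(df, filtered_df, limit, offset_string, topic):
    """Return updated offset JSON after a materialization batch.

    If the limit did not truncate the batch, commit up to the last record
    consumed from Kafka (df); otherwise commit only up to the last record
    actually ingested (filtered_df), leaving the rest for the next run.
    """
    source = df if limit > filtered_df.count() else filtered_df
    rows = (
        source.groupBy("partition")
        .agg(spark_max("offset").alias("offset"))
        .collect()
    )

    offset_dict = json.loads(offset_string)
    for row in rows:
        # +1: a committed offset points at the next record to consume.
        offset_dict[topic][str(row.partition)] = row.offset + 1
    return json.dumps(offset_dict)
```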
