Skip to content

Commit

Permalink
refactor to be a feature_store_activity
Browse files Browse the repository at this point in the history
  • Loading branch information
bubriks committed Nov 25, 2024
1 parent 807aff4 commit 035b953
Show file tree
Hide file tree
Showing 14 changed files with 76 additions and 77 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
import com.logicalclocks.hsfs.constructor.QueryBase;
import com.logicalclocks.hsfs.engine.FeatureGroupEngineBase;
import com.logicalclocks.hsfs.engine.FeatureGroupUtils;
import com.logicalclocks.hsfs.metadata.IngestionRunApi;
import com.logicalclocks.hsfs.metadata.OnlineIngestionApi;
import com.logicalclocks.hsfs.metadata.Statistics;
import com.logicalclocks.hsfs.metadata.Subject;
import com.logicalclocks.hsfs.metadata.User;
Expand Down Expand Up @@ -545,8 +545,8 @@ public Schema getDeserializedAvroSchema() throws FeatureStoreException, IOExcept
}

@JsonIgnore
public IngestionRun getLatestIngestionRun() throws IOException, FeatureStoreException {
return new IngestionRunApi().getIngestionRun(this, new HashMap<String, String>() {{
public OnlineIngestion getLatestOnlineIngestion() throws IOException, FeatureStoreException {
return new OnlineIngestionApi().getOnlineIngestion(this, new HashMap<String, String>() {{
put("filter_by", "LATEST");
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

@NoArgsConstructor
@AllArgsConstructor
public class IngestionRun extends RestDto<IngestionRun> {
public class OnlineIngestion extends RestDto<OnlineIngestion> {

@Getter
@Setter
Expand All @@ -49,7 +49,7 @@ public class IngestionRun extends RestDto<IngestionRun> {
@Getter
private Integer processedEntries;

public IngestionRun(String startingOffsets, String endingOffsets) {
public OnlineIngestion(String startingOffsets, String endingOffsets) {
this.startingOffsets = startingOffsets;
this.endingOffsets = endingOffsets;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,6 @@ public class FeatureGroupApi {
public static final String FEATURE_GROUP_COMMIT_PATH = FEATURE_GROUP_ID_PATH
+ "/commits{?filter_by,sort_by,offset,limit}";
public static final String FEATURE_GROUP_CLEAR_PATH = FEATURE_GROUP_ID_PATH + "/clear";
public static final String FEATURE_GROUP_INGESTION_RUN = FEATURE_GROUP_ID_PATH
+ "/ingestionrun{?filter_by,sort_by,offset,limit}";

private static final Logger LOGGER = LoggerFactory.getLogger(FeatureGroupApi.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import com.damnhandy.uri.template.UriTemplate;
import com.logicalclocks.hsfs.FeatureGroupBase;
import com.logicalclocks.hsfs.FeatureStoreException;
import com.logicalclocks.hsfs.IngestionRun;
import com.logicalclocks.hsfs.OnlineIngestion;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.slf4j.Logger;
Expand All @@ -29,20 +29,20 @@
import java.io.IOException;
import java.util.Map;

public class IngestionRunApi {
public class OnlineIngestionApi {

public static final String FEATURE_GROUP_ID_PATH = "/featuregroups/{fgId}";
public static final String FEATURE_GROUP_INGESTION_RUN = FEATURE_GROUP_ID_PATH
+ "/ingestionrun{?filter_by,sort_by,offset,limit}";
public static final String FEATURE_GROUP_ONLINE_INGESTION = FEATURE_GROUP_ID_PATH
+ "/online_ingestion{?filter_by,sort_by,offset,limit}";

private static final Logger LOGGER = LoggerFactory.getLogger(IngestionRunApi.class);
private static final Logger LOGGER = LoggerFactory.getLogger(OnlineIngestionApi.class);

public void saveIngestionRun(FeatureGroupBase featureGroupBase, IngestionRun ingestionRun)
public void saveOnlineIngestion(FeatureGroupBase featureGroupBase, OnlineIngestion onlineIngestion)
throws FeatureStoreException, IOException {
HopsworksClient hopsworksClient = HopsworksClient.getInstance();
String pathTemplate = HopsworksClient.PROJECT_PATH
+ FeatureStoreApi.FEATURE_STORE_PATH
+ FEATURE_GROUP_INGESTION_RUN;
+ FEATURE_GROUP_ONLINE_INGESTION;

String uri = UriTemplate.fromTemplate(pathTemplate)
.set("projectId", hopsworksClient.getProject().getProjectId())
Expand All @@ -51,16 +51,16 @@ public void saveIngestionRun(FeatureGroupBase featureGroupBase, IngestionRun ing
.expand();

HttpPost postRequest = new HttpPost(uri);
postRequest.setEntity(hopsworksClient.buildStringEntity(ingestionRun));
postRequest.setEntity(hopsworksClient.buildStringEntity(onlineIngestion));
hopsworksClient.handleRequest(postRequest);
}

public IngestionRun getIngestionRun(FeatureGroupBase featureGroupBase, Map<String, String> queryParams)
public OnlineIngestion getOnlineIngestion(FeatureGroupBase featureGroupBase, Map<String, String> queryParams)
throws IOException, FeatureStoreException {
HopsworksClient hopsworksClient = HopsworksClient.getInstance();
String pathTemplate = HopsworksClient.PROJECT_PATH
+ FeatureStoreApi.FEATURE_STORE_PATH
+ FEATURE_GROUP_INGESTION_RUN;
+ FEATURE_GROUP_ONLINE_INGESTION;

UriTemplate uriTemplate = UriTemplate.fromTemplate(pathTemplate)
.set("projectId", hopsworksClient.getProject().getProjectId())
Expand All @@ -76,6 +76,6 @@ public IngestionRun getIngestionRun(FeatureGroupBase featureGroupBase, Map<Stri
String uri = uriTemplate.expand();

LOGGER.info("Sending metadata request: " + uri);
return hopsworksClient.handleRequest(new HttpGet(uri), IngestionRun.class);
return hopsworksClient.handleRequest(new HttpGet(uri), OnlineIngestion.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,14 @@
import com.logicalclocks.hsfs.metadata.DatasetApi;
import com.logicalclocks.hsfs.metadata.FeatureGroupApi;
import com.logicalclocks.hsfs.metadata.HopsworksExternalClient;
import com.logicalclocks.hsfs.metadata.IngestionRunApi;
import com.logicalclocks.hsfs.metadata.OnlineIngestionApi;
import com.logicalclocks.hsfs.spark.constructor.Query;
import com.logicalclocks.hsfs.spark.engine.hudi.HudiEngine;
import com.logicalclocks.hsfs.DataFormat;
import com.logicalclocks.hsfs.Feature;
import com.logicalclocks.hsfs.FeatureStoreException;
import com.logicalclocks.hsfs.HudiOperationType;
import com.logicalclocks.hsfs.IngestionRun;
import com.logicalclocks.hsfs.OnlineIngestion;
import com.logicalclocks.hsfs.Split;
import com.logicalclocks.hsfs.StorageConnector;
import com.logicalclocks.hsfs.TimeTravelFormat;
Expand Down Expand Up @@ -132,7 +132,7 @@ public class SparkEngine extends EngineBase {
private final StorageConnectorUtils storageConnectorUtils = new StorageConnectorUtils();
private FeatureGroupUtils featureGroupUtils = new FeatureGroupUtils();
protected FeatureGroupApi featureGroupApi = new FeatureGroupApi();
protected IngestionRunApi ingestionRunApi = new IngestionRunApi();
protected OnlineIngestionApi onlineIngestionApi = new OnlineIngestionApi();
private final KafkaEngine kafkaEngine;

private static SparkEngine INSTANCE = null;
Expand Down Expand Up @@ -576,7 +576,7 @@ public void writeOnlineDataframe(FeatureGroupBase featureGroupBase, Dataset<Row>

String endingCheckPoint = kafkaEngine.kafkaGetOffsets(featureGroupBase, writeOptions, true);

ingestionRunApi.saveIngestionRun(featureGroupBase, new IngestionRun(startingCheckPoint, endingCheckPoint));
onlineIngestionApi.saveOnlineIngestion(featureGroupBase, new OnlineIngestion(startingCheckPoint, endingCheckPoint));
}

public <S> StreamingQuery writeStreamDataframe(FeatureGroupBase featureGroupBase, Dataset<Row> dataset,
Expand Down Expand Up @@ -637,7 +637,8 @@ public void onQueryProgress(QueryProgressEvent queryProgress) {
public void onQueryTerminated(QueryTerminatedEvent queryTerminated) {
try {
String endingCheckPoint = kafkaEngine.kafkaGetOffsets(featureGroupBase, writeOptions, true);
ingestionRunApi.saveIngestionRun(featureGroupBase, new IngestionRun(startingCheckPoint, endingCheckPoint));
onlineIngestionApi.saveOnlineIngestion(featureGroupBase,
new OnlineIngestion(startingCheckPoint, endingCheckPoint));
} catch (Exception e) {
e.printStackTrace();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@
from tqdm.auto import tqdm


class IngestionRun:
class OnlineIngestion:
"""
Metadata object used to provide Ingestion Run information for a feature group.
Metadata object used to provide Online Ingestion information for a feature group.
"""

def __init__(
Expand All @@ -47,7 +47,7 @@ def __init__(
self._processed_entries = processed_entries

@classmethod
def from_response_json(cls, json_dict: Dict[str, Any]) -> "IngestionRun":
def from_response_json(cls, json_dict: Dict[str, Any]) -> "OnlineIngestion":
if json_dict is None:
return None

Expand All @@ -63,13 +63,13 @@ def from_response_json(cls, json_dict: Dict[str, Any]) -> "IngestionRun":
return None

def refresh(self):
    """Reload this object's state from the backend.

    Fetches the online ingestion entry matching this object's ``id`` for the
    associated feature group and copies all fetched attributes onto this
    instance in place, so existing references observe the updated state.
    """
    # Local import to avoid a circular dependency between this metadata
    # object and its API module. NOTE: must be the package path
    # ``hsfs.core...`` (matching the other imports in this package), not the
    # repository path ``python.hsfs.core...``, which is not an importable
    # package at runtime.
    from hsfs.core.online_ingestion_api import OnlineIngestionApi

    online_ingestion = OnlineIngestionApi().get_online_ingestion(
        self.feature_group,
        query_params={"filter_by": f"ID:{self.id}"},
    )
    # Overwrite our attribute dict with the freshly fetched one.
    self.__dict__.update(online_ingestion.__dict__)

def to_dict(self):
return {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,15 @@

from hopsworks_common import client
from hsfs import feature_group as fg_mod
from hsfs.core import ingestion_run
from hsfs.core import online_ingestion


class IngestionRunApi:
class OnlineIngestionApi:

def save_ingestion_run(
def save_online_ingestion(
self,
feature_group_instance: fg_mod.FeatureGroup,
ingestion_run_instance: ingestion_run.IngestionRun,
online_ingestion_instance: online_ingestion.OnlineIngestion,
):
_client = client.get_instance()
path_params = [
Expand All @@ -35,13 +35,13 @@ def save_ingestion_run(
feature_group_instance.feature_store_id,
"featuregroups",
feature_group_instance.id,
"ingestionrun",
"online_ingestion",
]

headers = {"content-type": "application/json"}
_client._send_request("POST", path_params, headers=headers, data=ingestion_run_instance.json())
_client._send_request("POST", path_params, headers=headers, data=online_ingestion_instance.json())

def get_ingestion_run(
def get_online_ingestion(
self,
feature_group_instance: fg_mod.FeatureGroup,
query_params: None,
Expand All @@ -54,11 +54,11 @@ def get_ingestion_run(
feature_group_instance.feature_store_id,
"featuregroups",
feature_group_instance.id,
"ingestionrun",
"online_ingestion",
]

ingestion_run_instance = ingestion_run.IngestionRun.from_response_json(
online_ingestion_instance = online_ingestion.OnlineIngestion.from_response_json(
_client._send_request("GET", path_params, query_params)
)
ingestion_run_instance.feature_group = feature_group_instance
return ingestion_run_instance
online_ingestion_instance.feature_group = feature_group_instance
return online_ingestion_instance
8 changes: 4 additions & 4 deletions python/hsfs/engine/python.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,11 @@
feature_group_api,
feature_view_api,
ingestion_job_conf,
ingestion_run,
ingestion_run_api,
job,
job_api,
kafka_engine,
online_ingestion,
online_ingestion_api,
statistics_api,
storage_connector_api,
training_dataset_api,
Expand Down Expand Up @@ -1504,9 +1504,9 @@ def _write_dataframe_kafka(
high=True,
)

ingestion_run_api.IngestionRunApi().save_ingestion_run(
online_ingestion_api.OnlineIngestionApi().save_online_ingestion(
feature_group,
ingestion_run.IngestionRun(
online_ingestion.OnlineIngestion(
starting_offsets=initial_check_point,
ending_offsets=ending_check_point
)
Expand Down
10 changes: 5 additions & 5 deletions python/hsfs/engine/spark.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
import tzlocal
from hopsworks_common.core.constants import HAS_NUMPY, HAS_PANDAS
from hsfs.constructor import query
from hsfs.core import feature_group_api, ingestion_run, ingestion_run_api
from hsfs.core import feature_group_api, online_ingestion, online_ingestion_api

# in case importing in %%local
from hsfs.core.vector_db_client import VectorDbClient
Expand Down Expand Up @@ -559,9 +559,9 @@ def on_finished():
high=True,
)

ingestion_run_api.IngestionRunApi().save_ingestion_run(
online_ingestion_api.OnlineIngestionApi().save_online_ingestion(
feature_group,
ingestion_run.IngestionRun(
online_ingestion.OnlineIngestion(
starting_offsets=starting_check_point,
ending_offsets=ending_check_point
)
Expand Down Expand Up @@ -647,9 +647,9 @@ def _save_online_dataframe(self, feature_group, dataframe, write_options):
high=True,
)

ingestion_run_api.IngestionRunApi().save_ingestion_run(
online_ingestion_api.OnlineIngestionApi().save_online_ingestion(
feature_group,
ingestion_run.IngestionRun(
online_ingestion.OnlineIngestion(
starting_offsets=starting_check_point,
ending_offsets=ending_check_point
)
Expand Down
8 changes: 4 additions & 4 deletions python/hsfs/feature_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,9 @@
feature_monitoring_result_engine,
feature_store_api,
great_expectation_engine,
ingestion_run,
ingestion_run_api,
job_api,
online_ingestion,
online_ingestion_api,
spine_group_engine,
statistics_engine,
validation_report_engine,
Expand Down Expand Up @@ -1801,8 +1801,8 @@ def feature_store(self) -> feature_store_mod.FeatureStore:
)
return self._feature_store

def get_latest_online_ingestion(self) -> online_ingestion.OnlineIngestion:
    """Return the most recent online ingestion entry for this feature group.

    Queries the backend with ``filter_by=LATEST`` so only the newest entry
    is returned.

    # Returns
        `OnlineIngestion`: metadata about the latest online ingestion.
    """
    return online_ingestion_api.OnlineIngestionApi().get_online_ingestion(
        self, query_params={"filter_by": "LATEST"}
    )

@feature_store.setter
def feature_store(self, feature_store: feature_store_mod.FeatureStore) -> None:
Expand Down
Loading

0 comments on commit 035b953

Please sign in to comment.