diff --git a/hopsworks-IT/src/test/ruby/spec/featuregroup_spec.rb b/hopsworks-IT/src/test/ruby/spec/featuregroup_spec.rb
index c036a1d61e..e73ed05cdc 100644
--- a/hopsworks-IT/src/test/ruby/spec/featuregroup_spec.rb
+++ b/hopsworks-IT/src/test/ruby/spec/featuregroup_spec.rb
@@ -21,9 +21,15 @@
describe "cached feature groups" do
context 'with valid project, featurestore service enabled' do
before :all do
+ @enable_flyingduck = getVar("enable_flyingduck")
+
with_valid_project
end
+ after :all do
+ setVar("enable_flyingduck", @enable_flyingduck[:value])
+ end
+
it "should be able to add a offline cached featuregroup to the featurestore" do
project = get_project
featurestore_id = get_featurestore_id(project.id)
@@ -247,8 +253,25 @@
expect(parsed_json["features"].first["description"]).to eql("this description contains ' and ;'")
end
- it "should be able to preview a offline cached featuregroup in the featurestore" do
+ it "should be able to preview a offline cached featuregroup in the featurestore using hive" do
+ project = get_project
+ setVar("enable_flyingduck", "false")
+ create_session(project[:username], "Pass123")
+ featurestore_id = get_featurestore_id(project.id)
+ json_result, featuregroup_name = create_cached_featuregroup(project.id, featurestore_id)
+ parsed_json = JSON.parse(json_result)
+ expect_status_details(201)
+ featuregroup_id = parsed_json["id"]
+ preview_featuregroup_endpoint = "#{ENV['HOPSWORKS_API']}/project/" + project.id.to_s + "/featurestores/" + featurestore_id.to_s + "/featuregroups/" + featuregroup_id.to_s + "/preview"
+ get preview_featuregroup_endpoint
+ parsed_json = JSON.parse(response.body)
+ expect_status_details(200)
+ end
+
+ it "should be able to preview a offline cached featuregroup in the featurestore using flying duck" do
project = get_project
+ setVar("enable_flyingduck", "true")
+ create_session(project[:username], "Pass123")
featurestore_id = get_featurestore_id(project.id)
json_result, featuregroup_name = create_cached_featuregroup(project.id, featurestore_id)
parsed_json = JSON.parse(json_result)
diff --git a/hopsworks-api/src/main/java/io/hops/hopsworks/api/featurestore/featuregroup/FeatureGroupPreviewResource.java b/hopsworks-api/src/main/java/io/hops/hopsworks/api/featurestore/featuregroup/FeatureGroupPreviewResource.java
index a29e43538c..6612e48c30 100644
--- a/hopsworks-api/src/main/java/io/hops/hopsworks/api/featurestore/featuregroup/FeatureGroupPreviewResource.java
+++ b/hopsworks-api/src/main/java/io/hops/hopsworks/api/featurestore/featuregroup/FeatureGroupPreviewResource.java
@@ -109,7 +109,6 @@ public Response getPreview(@BeanParam FeatureGroupPreviewBeanParam featureGroupP
}
PreviewDTO previewDTO = previewBuilder.build(uriInfo, user, project, featuregroup,
- featureGroupPreviewBeanParam.getPartition(),
online,
featureGroupPreviewBeanParam.getLimit() == null ? 20 : featureGroupPreviewBeanParam.getLimit());
diff --git a/hopsworks-api/src/main/java/io/hops/hopsworks/api/featurestore/featuregroup/PreviewBuilder.java b/hopsworks-api/src/main/java/io/hops/hopsworks/api/featurestore/featuregroup/PreviewBuilder.java
index fd56c4cdac..b0008f8dda 100644
--- a/hopsworks-api/src/main/java/io/hops/hopsworks/api/featurestore/featuregroup/PreviewBuilder.java
+++ b/hopsworks-api/src/main/java/io/hops/hopsworks/api/featurestore/featuregroup/PreviewBuilder.java
@@ -57,12 +57,12 @@ private URI uri(UriInfo uriInfo, Project project, Featuregroup featuregroup) {
}
public PreviewDTO build(UriInfo uriInfo, Users user, Project project, Featuregroup featuregroup,
- String partition, boolean online, int limit)
+ boolean online, int limit)
throws FeaturestoreException, HopsSecurityException {
FeaturegroupPreview preview;
try {
- preview = featuregroupController.getFeaturegroupPreview(featuregroup, project, user, partition, online, limit);
+ preview = featuregroupController.getFeaturegroupPreview(featuregroup, project, user, online, limit);
} catch (SQLException e) {
throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.COULD_NOT_PREVIEW_FEATUREGROUP,
Level.SEVERE, "Feature Group id: " + featuregroup.getId(), e.getMessage(), e);
diff --git a/hopsworks-common/pom.xml b/hopsworks-common/pom.xml
index 285504f698..2a80ca1349 100644
--- a/hopsworks-common/pom.xml
+++ b/hopsworks-common/pom.xml
@@ -248,6 +248,15 @@
junit
junit
+
+ org.apache.arrow
+ flight-core
+
+
+ org.projectlombok
+ lombok
+ ${projectlombok.version}
+
@@ -260,4 +269,4 @@
-
\ No newline at end of file
+
diff --git a/hopsworks-common/src/main/java/io/hops/hopsworks/common/arrowflight/ArrowFlightConnectorDTO.java b/hopsworks-common/src/main/java/io/hops/hopsworks/common/arrowflight/ArrowFlightConnectorDTO.java
new file mode 100644
index 0000000000..aa5850e64d
--- /dev/null
+++ b/hopsworks-common/src/main/java/io/hops/hopsworks/common/arrowflight/ArrowFlightConnectorDTO.java
@@ -0,0 +1,51 @@
+/*
+ * This file is part of Hopsworks
+ * Copyright (C) 2024, Hopsworks AB. All rights reserved
+ *
+ * Hopsworks is free software: you can redistribute it and/or modify it under the terms of
+ * the GNU Affero General Public License as published by the Free Software Foundation,
+ * either version 3 of the License, or (at your option) any later version.
+ *
+ * Hopsworks is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
+ * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
+ * PURPOSE. See the GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License along with this program.
+ * If not, see .
+ */
+
+package io.hops.hopsworks.common.arrowflight;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import lombok.AllArgsConstructor;
+import lombok.NoArgsConstructor;
+import lombok.Getter;
+import lombok.Setter;
+import java.util.Map;
+
+@AllArgsConstructor
+@NoArgsConstructor
+public class ArrowFlightConnectorDTO {
+
+ @Getter
+ @Setter
+ @JsonProperty(value = "type")
+ private String type;
+ @Getter
+ @Setter
+ @JsonProperty(value = "options")
+ private Map options;
+ @Getter
+ @Setter
+ @JsonProperty(value = "query")
+ private String query;
+ @Getter
+ @Setter
+ @JsonProperty(value = "alias")
+ private String alias;
+ @Getter
+ @Setter
+ @JsonProperty(value = "filters")
+ private String filters;
+
+}
diff --git a/hopsworks-common/src/main/java/io/hops/hopsworks/common/arrowflight/ArrowFlightController.java b/hopsworks-common/src/main/java/io/hops/hopsworks/common/arrowflight/ArrowFlightController.java
new file mode 100644
index 0000000000..2cb8486923
--- /dev/null
+++ b/hopsworks-common/src/main/java/io/hops/hopsworks/common/arrowflight/ArrowFlightController.java
@@ -0,0 +1,316 @@
+/*
+ * This file is part of Hopsworks
+ * Copyright (C) 2024, Hopsworks AB. All rights reserved
+ *
+ * Hopsworks is free software: you can redistribute it and/or modify it under the terms of
+ * the GNU Affero General Public License as published by the Free Software Foundation,
+ * either version 3 of the License, or (at your option) any later version.
+ *
+ * Hopsworks is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
+ * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
+ * PURPOSE. See the GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License along with this program.
+ * If not, see .
+ */
+
+package io.hops.hopsworks.common.arrowflight;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.logicalclocks.servicediscoverclient.service.Service;
+import io.hops.hopsworks.common.featurestore.feature.FeatureGroupFeatureDTO;
+import io.hops.hopsworks.common.featurestore.featuregroup.cached.FeaturegroupPreview;
+import io.hops.hopsworks.common.featurestore.featuregroup.FeaturegroupController;
+import io.hops.hopsworks.common.featurestore.storageconnectors.StorageConnectorUtil;
+import io.hops.hopsworks.common.hosts.ServiceDiscoveryController;
+import io.hops.hopsworks.common.project.AccessCredentialsDTO;
+import io.hops.hopsworks.common.project.ProjectController;
+import io.hops.hopsworks.exceptions.FeaturestoreException;
+import io.hops.hopsworks.persistence.entity.featurestore.featuregroup.Featuregroup;
+import io.hops.hopsworks.persistence.entity.featurestore.featuregroup.FeaturegroupType;
+import io.hops.hopsworks.persistence.entity.featurestore.featuregroup.ondemand.OnDemandFeaturegroup;
+import io.hops.hopsworks.persistence.entity.featurestore.storageconnector.FeaturestoreConnector;
+import io.hops.hopsworks.persistence.entity.featurestore.storageconnector.bigquery.FeatureStoreBigqueryConnector;
+import io.hops.hopsworks.persistence.entity.featurestore.storageconnector.snowflake.FeaturestoreSnowflakeConnector;
+import io.hops.hopsworks.persistence.entity.project.Project;
+import io.hops.hopsworks.persistence.entity.user.Users;
+import io.hops.hopsworks.restutils.RESTCodes;
+import io.hops.hopsworks.servicediscovery.HopsworksService;
+import io.hops.hopsworks.servicediscovery.tags.FlyingDuckTags;
+import org.apache.arrow.flight.Action;
+import org.apache.arrow.flight.FlightClient;
+import org.apache.arrow.flight.FlightDescriptor;
+import org.apache.arrow.flight.FlightInfo;
+import org.apache.arrow.flight.FlightRuntimeException;
+import org.apache.arrow.flight.FlightStream;
+import org.apache.arrow.flight.Location;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.table.Row;
+import org.apache.arrow.vector.table.Table;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.javatuples.Pair;
+
+import javax.ejb.EJB;
+import javax.ejb.Stateless;
+import javax.ejb.TransactionAttribute;
+import javax.ejb.TransactionAttributeType;
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.stream.Collectors;
+
+/**
+ * Class controlling the interaction with the arrow flight server
+ */
+@Stateless
+@TransactionAttribute(TransactionAttributeType.NEVER)
+public class ArrowFlightController {
+
+ @EJB
+ protected StorageConnectorUtil storageConnectorUtil;
+ @EJB
+ private ProjectController projectController;
+ @EJB
+ protected ServiceDiscoveryController serviceDiscoveryController;
+ @EJB
+ protected FeaturegroupController featuregroupController;
+
+ private final ObjectMapper objectMapper = new ObjectMapper();
+
+ /**
+ * Initializes an Arrow Flight connection to Flying Duck using TLS with the given access credentials
+ *
+ * @param project the project that owns the Hive database
+ * @param user the user making the request
+ * @return FlightClient
+ * @throws FeaturestoreException
+ */
+ private FlightClient initFlightClient(Project project, Users user)
+ throws FeaturestoreException, InterruptedException {
+ FlightClient flightClient = null;
+ try {
+ AccessCredentialsDTO accessCredentialsDTO = projectController.credentials(project, user);
+
+ InputStream caChainInputStream =
+ new ByteArrayInputStream(accessCredentialsDTO.getCaChain().getBytes(StandardCharsets.UTF_8));
+ InputStream clientCertInputStream =
+ new ByteArrayInputStream(accessCredentialsDTO.getClientCert().getBytes(StandardCharsets.UTF_8));
+ InputStream clientKeyInputStream =
+ new ByteArrayInputStream(accessCredentialsDTO.getClientKey().getBytes(StandardCharsets.UTF_8));
+
+ // Flyingduck port is exposed as server.flyingduck.service.consul, however flyingduck is quite picky
+ // when it comes to certificates and it requires the hostname to be flyingduck.service.consul
+ // so here we fetch the port from the service discovery and then we build the rest of the name
+ Service flyingduckService = serviceDiscoveryController
+ .getAnyAddressOfServiceWithDNS(HopsworksService.FLYING_DUCK.getNameWithTag(FlyingDuckTags.server));
+ String flyingduckEndpoing = serviceDiscoveryController
+ .constructServiceFQDN(HopsworksService.FLYING_DUCK.getName()) + ":" + flyingduckService.getPort();
+
+ flightClient = FlightClient.builder()
+ .useTls()
+ .allocator(new RootAllocator())
+ .location(new Location("grpc+tls://" + flyingduckEndpoing))
+ .trustedCertificates(caChainInputStream)
+ .clientCertificate(clientCertInputStream, clientKeyInputStream)
+ .build();
+
+ // register client certificates
+ ArrowFlightCredentialDTO arrowFlightCredentials = new ArrowFlightCredentialDTO(accessCredentialsDTO);
+ flightClient.doAction(new Action("register-client-certificates",
+ objectMapper.writeValueAsString(arrowFlightCredentials).getBytes()))
+ .hasNext();
+
+ return flightClient;
+ } catch (Exception e) {
+ if (flightClient != null) {
+ flightClient.close();
+ }
+ throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.COULD_NOT_INITIATE_ARROW_FLIGHT_CONNECTION,
+ Level.SEVERE, "project: " + project.getName(), e.getMessage(), e);
+ }
+ }
+
+ /**
+ * Starts Arrow Flight connection to Flying Duck using the given project-user and then executes a query
+ *
+ * @param query the read query (Proprietary to Flying Duck)
+ * @param project the project that owns the Hive database
+ * @param user the user making the request
+ * @return FeaturegroupPreview
+ * @throws FeaturestoreException
+ */
+ public FeaturegroupPreview executeReadArrowFlightQuery(String query, Project project, Users user)
+ throws FeaturestoreException {
+ try(FlightClient flightClient = initFlightClient(project, user)) {
+ // get flight info
+ FlightInfo flightInfo = flightClient.getInfo(FlightDescriptor.command(query.getBytes(StandardCharsets.US_ASCII)));
+
+ // read data
+ FeaturegroupPreview featuregroupPreview = new FeaturegroupPreview();
+ try(FlightStream flightStream = flightClient.getStream(flightInfo.getEndpoints().get(0).getTicket())) {
+ try (VectorSchemaRoot vectorSchemaRootReceived = flightStream.getRoot()) {
+ while (flightStream.next()) {
+ try (Table table = new Table(vectorSchemaRootReceived.getFieldVectors())) {
+ for (Row tableRow: table) {
+ FeaturegroupPreview.Row row = new FeaturegroupPreview.Row();
+ for (Field field: table.getSchema().getFields()) {
+ row.addValue(new Pair<>(field.getName().toLowerCase(), // UI breaks if header is capitalized
+ tableRow.isNull(field.getName()) ? "" : tableRow.getExtensionType(field.getName()).toString()));
+ }
+ featuregroupPreview.addRow(row);
+ }
+ }
+ }
+ } catch (FlightRuntimeException e) {
+ if (e.getMessage().contains("No such file or directory")) {
+ return featuregroupPreview; // nothing was writtent to hudi
+ }
+ throw e;
+ }
+ }
+ return featuregroupPreview;
+ } catch (Exception e) {
+ throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.ARROW_FLIGHT_READ_QUERY_ERROR, Level.FINE,
+ "project: " + project.getName() + ", Arrow Flight query: " + query, e.getMessage(), e);
+ }
+ }
+
+
+ /**
+ * Gets Query string that can be used in Arrow Flight server
+ *
+ * @param featuregroup the featuregroup to preview
+ * @param project the project that owns the Hive database
+ * @param user the user making the request
+ * @param tbl table name
+ * @param limit the number of rows to visualize
+ * @return read query (Proprietary to Arrow Flight server)
+ * @throws FeaturestoreException
+ */
+ public String getArrowFlightQuery(Featuregroup featuregroup, Project project, Users user, String tbl, int limit)
+ throws FeaturestoreException {
+ ArrowFlightQueryDTO queryDto = new ArrowFlightQueryDTO();
+
+ // query
+ String query = featuregroupController.getOfflineFeaturegroupQuery(featuregroup, project, user, tbl, limit);
+ query = query.replace("`", "\"");
+ queryDto.setQueryString(query);
+
+ // features
+ List features = featuregroupController.getFeatures(featuregroup, project, user);
+ List featureNames = features.stream().map(FeatureGroupFeatureDTO::getName).collect(Collectors.toList());
+ Map> featureMap = Collections.singletonMap(tbl, featureNames);
+ queryDto.setFeatures(featureMap);
+
+ // filters (not necessary since it will always be used only for Preview)
+ queryDto.setFilters(null);
+
+ // connectors
+ Map connectorMap =
+ Collections.singletonMap(tbl, getArrowFlightConnectorDTO(featuregroup));
+ queryDto.setConnectors(connectorMap);
+
+ try {
+ return objectMapper.writeValueAsString(queryDto);
+ } catch (JsonProcessingException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ private ArrowFlightConnectorDTO getArrowFlightConnectorDTO(Featuregroup featuregroup)
+ throws FeaturestoreException{
+ ArrowFlightConnectorDTO connector = new ArrowFlightConnectorDTO();
+ if (featuregroup.getFeaturegroupType() == FeaturegroupType.ON_DEMAND_FEATURE_GROUP) {
+ OnDemandFeaturegroup onDemandFeaturegroup = featuregroup.getOnDemandFeaturegroup();
+ FeaturestoreConnector featurestoreConnector = onDemandFeaturegroup.getFeaturestoreConnector();
+
+ connector.setType(featurestoreConnector.getConnectorType().name());
+ connector.setOptions(getConnectorOptions(featurestoreConnector));
+ connector.setQuery(onDemandFeaturegroup.getQuery().replaceAll(";( )*$", ""));
+ connector.setAlias(featuregroupController.getTblName(featuregroup));
+ connector.setFilters(null);
+ } else {
+ connector.setType("hudi");
+ }
+ return connector;
+ }
+
+ /**
+ * Gets Map used for connector options in ArrowFlight
+ *
+ * @param featurestoreConnector the connector from which to extract options
+ * @return Map
+ * @throws FeaturestoreException
+ */
+ private Map getConnectorOptions(FeaturestoreConnector featurestoreConnector)
+ throws FeaturestoreException {
+ Map optionMap = new HashMap();
+
+ switch (featurestoreConnector.getConnectorType()) {
+ case SNOWFLAKE:
+ FeaturestoreSnowflakeConnector snowflakeConnector = featurestoreConnector.getSnowflakeConnector();
+ optionMap.put("user", snowflakeConnector.getDatabaseUser());
+ optionMap.put("account", snowflakeConnector.getUrl()
+ .replace("https://", "")
+ .replace(".snowflakecomputing.com", ""));
+ optionMap.put("database", snowflakeConnector.getDatabaseName() + "/" + snowflakeConnector.getDatabaseSchema());
+
+ if (snowflakeConnector.getPwdSecret() != null) {
+ optionMap.put("password", storageConnectorUtil.getSecret(snowflakeConnector.getPwdSecret(), String.class));
+ } else {
+ optionMap.put("authenticator", "oauth");
+ optionMap.put("token", storageConnectorUtil.getSecret(snowflakeConnector.getTokenSecret(), String.class));
+ }
+
+ if (snowflakeConnector.getWarehouse() != null) {
+ optionMap.put("warehouse", snowflakeConnector.getWarehouse());
+ }
+
+ if (snowflakeConnector.getApplication() != null) {
+ optionMap.put("application", snowflakeConnector.getApplication());
+ }
+ break;
+ case BIGQUERY:
+ FeatureStoreBigqueryConnector connector = featurestoreConnector.getBigqueryConnector();
+ optionMap.put("key_path", connector.getKeyPath());
+ optionMap.put("project_id", connector.getQueryProject());
+ optionMap.put("dataset_id", connector.getDataset());
+ optionMap.put("parent_project", connector.getParentProject());
+ break;
+ default:
+ throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.ILLEGAL_STORAGE_CONNECTOR_TYPE, Level.SEVERE,
+ "Arrow Flight doesn't support connector of type: " + featurestoreConnector.getConnectorType().name());
+ }
+
+ return optionMap;
+ }
+
+ /**
+ * Throws exception if feature group is not supported by Arrow Flight server
+ *
+ * @param featuregroup the featuregroup
+ * @throws FeaturestoreException
+ */
+ public void checkFeatureGroupSupportedByArrowFlight(Featuregroup featuregroup) throws FeaturestoreException {
+ if (featuregroup.getFeaturegroupType() == FeaturegroupType.ON_DEMAND_FEATURE_GROUP) {
+ OnDemandFeaturegroup onDemandFeaturegroup = featuregroup.getOnDemandFeaturegroup();
+ FeaturestoreConnector featurestoreConnector = onDemandFeaturegroup.getFeaturestoreConnector();
+ switch (featurestoreConnector.getConnectorType()) {
+ case SNOWFLAKE:
+ case BIGQUERY:
+ return;
+ default:
+ throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.ILLEGAL_STORAGE_CONNECTOR_TYPE, Level.SEVERE,
+ "Arrow Flight doesn't support connector of type: " + featurestoreConnector.getConnectorType().name());
+ }
+ }
+ return;
+ }
+}
diff --git a/hopsworks-common/src/main/java/io/hops/hopsworks/common/arrowflight/ArrowFlightCredentialDTO.java b/hopsworks-common/src/main/java/io/hops/hopsworks/common/arrowflight/ArrowFlightCredentialDTO.java
new file mode 100644
index 0000000000..3d58bcf0ad
--- /dev/null
+++ b/hopsworks-common/src/main/java/io/hops/hopsworks/common/arrowflight/ArrowFlightCredentialDTO.java
@@ -0,0 +1,50 @@
+/*
+ * This file is part of Hopsworks
+ * Copyright (C) 2024, Hopsworks AB. All rights reserved
+ *
+ * Hopsworks is free software: you can redistribute it and/or modify it under the terms of
+ * the GNU Affero General Public License as published by the Free Software Foundation,
+ * either version 3 of the License, or (at your option) any later version.
+ *
+ * Hopsworks is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
+ * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
+ * PURPOSE. See the GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License along with this program.
+ * If not, see .
+ */
+
+package io.hops.hopsworks.common.arrowflight;
+
+import io.hops.hopsworks.common.project.AccessCredentialsDTO;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import lombok.AllArgsConstructor;
+import lombok.NoArgsConstructor;
+import lombok.Getter;
+import lombok.Setter;
+
+@AllArgsConstructor
+@NoArgsConstructor
+public class ArrowFlightCredentialDTO {
+
+ @Getter
+ @Setter
+ @JsonProperty(value = "kstore")
+ private String kstore;
+ @Getter
+ @Setter
+ @JsonProperty(value = "tstore")
+ private String tstore;
+ @Getter
+ @Setter
+ @JsonProperty(value = "cert_key")
+ private String certKey;
+
+ public ArrowFlightCredentialDTO(AccessCredentialsDTO accessCredentialsDTO) {
+ this.kstore = accessCredentialsDTO.getkStore();
+ this.tstore = accessCredentialsDTO.gettStore();
+ this.certKey = accessCredentialsDTO.getPassword();
+ }
+
+}
diff --git a/hopsworks-common/src/main/java/io/hops/hopsworks/common/arrowflight/ArrowFlightQueryDTO.java b/hopsworks-common/src/main/java/io/hops/hopsworks/common/arrowflight/ArrowFlightQueryDTO.java
new file mode 100644
index 0000000000..2e4daae07c
--- /dev/null
+++ b/hopsworks-common/src/main/java/io/hops/hopsworks/common/arrowflight/ArrowFlightQueryDTO.java
@@ -0,0 +1,48 @@
+/*
+ * This file is part of Hopsworks
+ * Copyright (C) 2024, Hopsworks AB. All rights reserved
+ *
+ * Hopsworks is free software: you can redistribute it and/or modify it under the terms of
+ * the GNU Affero General Public License as published by the Free Software Foundation,
+ * either version 3 of the License, or (at your option) any later version.
+ *
+ * Hopsworks is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
+ * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
+ * PURPOSE. See the GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License along with this program.
+ * If not, see .
+ */
+
+package io.hops.hopsworks.common.arrowflight;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import lombok.AllArgsConstructor;
+import lombok.NoArgsConstructor;
+import lombok.Getter;
+import lombok.Setter;
+import java.util.List;
+import java.util.Map;
+
+@AllArgsConstructor
+@NoArgsConstructor
+public class ArrowFlightQueryDTO {
+
+ @Getter
+ @Setter
+ @JsonProperty(value = "query_string")
+ private String queryString;
+ @Getter
+ @Setter
+ @JsonProperty(value = "features")
+ private Map> features;
+ @Getter
+ @Setter
+ @JsonProperty(value = "filters")
+ private String filters;
+ @Getter
+ @Setter
+ @JsonProperty(value = "connectors")
+ private Map connectors;
+
+}
diff --git a/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/featuregroup/FeaturegroupController.java b/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/featuregroup/FeaturegroupController.java
index 7a264452e1..7b82b1fcd5 100644
--- a/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/featuregroup/FeaturegroupController.java
+++ b/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/featuregroup/FeaturegroupController.java
@@ -18,6 +18,7 @@
import com.google.common.base.Strings;
import com.logicalclocks.servicediscoverclient.exceptions.ServiceDiscoveryException;
+import io.hops.hopsworks.common.arrowflight.ArrowFlightController;
import io.hops.hopsworks.common.commands.featurestore.search.SearchFSCommandLogger;
import io.hops.hopsworks.common.featurestore.FeaturestoreController;
import io.hops.hopsworks.common.featurestore.activity.FeaturestoreActivityFacade;
@@ -37,6 +38,8 @@
import io.hops.hopsworks.common.featurestore.featuregroup.stream.StreamFeatureGroupController;
import io.hops.hopsworks.common.featurestore.featuregroup.stream.StreamFeatureGroupDTO;
import io.hops.hopsworks.common.featurestore.online.OnlineFeaturestoreController;
+import io.hops.hopsworks.common.featurestore.query.ConstructorController;
+import io.hops.hopsworks.common.featurestore.query.Feature;
import io.hops.hopsworks.common.featurestore.statistics.StatisticsController;
import io.hops.hopsworks.common.featurestore.statistics.columns.StatisticColumnController;
import io.hops.hopsworks.common.featurestore.storageconnectors.FeaturestoreStorageConnectorController;
@@ -69,6 +72,13 @@
import io.hops.hopsworks.persistence.entity.project.Project;
import io.hops.hopsworks.persistence.entity.user.Users;
import io.hops.hopsworks.restutils.RESTCodes;
+import org.apache.calcite.sql.SqlDialect;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlLiteral;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlSelect;
+import org.apache.calcite.sql.dialect.HiveSqlDialect;
+import org.apache.calcite.sql.parser.SqlParserPos;
import javax.ejb.EJB;
import javax.ejb.Stateless;
@@ -79,6 +89,7 @@
import java.nio.file.Paths;
import java.sql.SQLException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.logging.Level;
@@ -136,6 +147,10 @@ public class FeaturegroupController {
private SearchFSCommandLogger searchCommandLogger;
@EJB
private EmbeddingController embeddingController;
+ @EJB
+ private ConstructorController constructorController;
+ @EJB
+ protected ArrowFlightController arrowFlightController;
/**
* Gets all featuregroups for a particular featurestore and project, using the userCerts to query Hive
@@ -906,7 +921,6 @@ private void enforceFeaturegroupQuotas(Featurestore featurestore, FeaturegroupDT
* @param featuregroup of the featuregroup to preview
* @param project the project the user is operating from, in case of shared feature store
* @param user the user making the request
- * @param partition the selected partition if any as represented in the PARTITIONS_METASTORE
* @param online whether to show preview from the online feature store
* @param limit the number of rows to visualize
* @return A DTO with the first 20 feature rows of the online and offline tables.
@@ -915,19 +929,78 @@ private void enforceFeaturegroupQuotas(Featurestore featurestore, FeaturegroupDT
* @throws HopsSecurityException
*/
public FeaturegroupPreview getFeaturegroupPreview(Featuregroup featuregroup, Project project,
- Users user, String partition, boolean online, int limit)
+ Users user, boolean online, int limit)
throws SQLException, FeaturestoreException, HopsSecurityException {
if (online && featuregroup.isOnlineEnabled()) {
return onlineFeaturegroupController.getFeaturegroupPreview(featuregroup, project, user, limit);
} else if (online) {
throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.FEATUREGROUP_NOT_ONLINE, Level.FINE);
- } else if (featuregroup.getFeaturegroupType() == FeaturegroupType.ON_DEMAND_FEATURE_GROUP) {
- throw new FeaturestoreException(
- RESTCodes.FeaturestoreErrorCode.PREVIEW_NOT_SUPPORTED_FOR_ON_DEMAND_FEATUREGROUPS,
- Level.FINE, "Preview for offline storage of external feature groups is not supported",
- "featuregroupId: " + featuregroup.getId());
+ } else if (settings.isFlyingduckEnabled()) {
+ // use flying duck for offline fs
+ arrowFlightController.checkFeatureGroupSupportedByArrowFlight(featuregroup);
+
+ String tbl = getTblName(featuregroup);
+ if (featuregroup.getFeaturegroupType() != FeaturegroupType.ON_DEMAND_FEATURE_GROUP) {
+ String db = featuregroup.getFeaturestore().getProject().getName().toLowerCase();
+ tbl = db + "." + tbl;
+ }
+ String query = arrowFlightController.getArrowFlightQuery(featuregroup, project, user, tbl, limit);
+ return arrowFlightController.executeReadArrowFlightQuery(query, project, user);
} else {
- return cachedFeaturegroupController.getOfflineFeaturegroupPreview(featuregroup, project, user, partition, limit);
+ // use hive for offline fs
+ if (featuregroup.getFeaturegroupType() == FeaturegroupType.ON_DEMAND_FEATURE_GROUP) {
+ throw new FeaturestoreException(
+ RESTCodes.FeaturestoreErrorCode.PREVIEW_NOT_SUPPORTED_FOR_ON_DEMAND_FEATUREGROUPS,
+ Level.FINE, "Preview for offline storage of external feature groups is not supported",
+ "featuregroupId: " + featuregroup.getId());
+ } else {
+ String tbl = getTblName(featuregroup);
+ String query = getOfflineFeaturegroupQuery(featuregroup, project, user, tbl, limit);
+ String db = featurestoreController.getOfflineFeaturestoreDbName(featuregroup.getFeaturestore().getProject());
+ return cachedFeaturegroupController.executeReadHiveQuery(query, db, project, user);
+ }
}
}
+
+ /**
+ * Builds the SQL query used to preview the offline data of a given featuregroup (SELECT with LIMIT on the Hive table)
+ *
+ * @param featuregroup the featuregroup to fetch
+ * @param project the project the user is operating from, in case of shared feature store
+ * @param user the user making the request
+ * @param tbl table name
+ * @param limit number of sample to fetch
+ * @return SQL query string for fetching sample feature-rows from the Hive table where the featuregroup is stored
+ * @throws FeaturestoreException
+ */
+ public String getOfflineFeaturegroupQuery(Featuregroup featuregroup, Project project,
+ Users user, String tbl, int limit)
+ throws FeaturestoreException {
+
+ List features = getFeatures(featuregroup, project, user);
+
+ // This is not great, but at the same time the query runs as the user.
+ SqlNodeList selectList = new SqlNodeList(SqlParserPos.ZERO);
+ for (FeatureGroupFeatureDTO feature : features) {
+ if (feature.getDefaultValue() == null) {
+ selectList.add(new SqlIdentifier(Arrays.asList("`" + tbl + "`", "`" + feature.getName() + "`"),
+ SqlParserPos.ZERO));
+ } else {
+ selectList.add(constructorController.selectWithDefaultAs(new Feature(feature, tbl), false));
+ }
+ }
+
+ SqlSelect select = new SqlSelect(SqlParserPos.ZERO, null, selectList,
+ new SqlIdentifier("`" + tbl + "`", SqlParserPos.ZERO), null,
+ null, null, null, null, null,
+ SqlLiteral.createExactNumeric(String.valueOf(limit), SqlParserPos.ZERO), null);
+
+ return select.toSqlString(new HiveSqlDialect(SqlDialect.EMPTY_CONTEXT)).getSql();
+ }
+
+ public static boolean isTimeTravelEnabled(Featuregroup featuregroup) {
+ return (featuregroup.getFeaturegroupType() == FeaturegroupType.CACHED_FEATURE_GROUP &&
+ featuregroup.getCachedFeaturegroup().getTimeTravelFormat() == TimeTravelFormat.HUDI) ||
+ featuregroup.getFeaturegroupType() == FeaturegroupType.STREAM_FEATURE_GROUP;
+ }
}
diff --git a/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/featuregroup/cached/CachedFeaturegroupController.java b/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/featuregroup/cached/CachedFeaturegroupController.java
index eb61e5f9eb..74e31f5512 100644
--- a/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/featuregroup/cached/CachedFeaturegroupController.java
+++ b/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/featuregroup/cached/CachedFeaturegroupController.java
@@ -16,7 +16,6 @@
package io.hops.hopsworks.common.featurestore.featuregroup.cached;
-import com.google.common.base.Strings;
import com.logicalclocks.servicediscoverclient.exceptions.ServiceDiscoveryException;
import io.hops.hopsworks.common.featurestore.FeaturestoreController;
import io.hops.hopsworks.common.featurestore.activity.FeaturestoreActivityFacade;
@@ -27,7 +26,6 @@
import io.hops.hopsworks.common.featurestore.featuregroup.FeaturegroupFacade;
import io.hops.hopsworks.common.featurestore.featuregroup.online.OnlineFeaturegroupController;
import io.hops.hopsworks.common.featurestore.featuregroup.stream.StreamFeatureGroupDTO;
-import io.hops.hopsworks.common.featurestore.query.ConstructorController;
import io.hops.hopsworks.common.featurestore.query.Feature;
import io.hops.hopsworks.common.featurestore.utils.FeaturestoreUtils;
import io.hops.hopsworks.common.hdfs.Utils;
@@ -53,15 +51,6 @@
import io.hops.hopsworks.persistence.entity.project.Project;
import io.hops.hopsworks.persistence.entity.user.Users;
import io.hops.hopsworks.restutils.RESTCodes;
-import org.apache.calcite.sql.SqlDialect;
-import org.apache.calcite.sql.SqlIdentifier;
-import org.apache.calcite.sql.SqlLiteral;
-import org.apache.calcite.sql.SqlNode;
-import org.apache.calcite.sql.SqlNodeList;
-import org.apache.calcite.sql.SqlSelect;
-import org.apache.calcite.sql.dialect.HiveSqlDialect;
-import org.apache.calcite.sql.fun.SqlStdOperatorTable;
-import org.apache.calcite.sql.parser.SqlParserPos;
import javax.annotation.PostConstruct;
import javax.ejb.EJB;
@@ -109,8 +98,6 @@ public class CachedFeaturegroupController {
@EJB
private HiveController hiveController;
@EJB
- private ConstructorController constructorController;
- @EJB
private FeaturestoreUtils featurestoreUtils;
@EJB
private FeaturestoreActivityFacade fsActivityFacade;
@@ -170,89 +157,6 @@ private Connection initConnection(String databaseName, Project project, Users us
"project: " + project.getName() + ", hive database: " + databaseName, e.getMessage(), e);
}
}
-
- /**
- * Previews the offline data of a given featuregroup by doing a SELECT LIMIT query on the Hive Table
- *
- * @param featuregroup the featuregroup to fetch
- * @param project the project the user is operating from, in case of shared feature store
- * @param user the user making the request
- * @param limit number of sample to fetch
- * @return list of feature-rows from the Hive table where the featuregroup is stored
- * @throws SQLException
- * @throws FeaturestoreException
- * @throws HopsSecurityException
- */
- public FeaturegroupPreview getOfflineFeaturegroupPreview(Featuregroup featuregroup, Project project,
- Users user, String partition, int limit)
- throws FeaturestoreException, HopsSecurityException, SQLException {
- String tbl = featuregroupController.getTblName(featuregroup.getName(), featuregroup.getVersion());
-
-    List<FeatureGroupFeatureDTO> features = featuregroupController.getFeatures(featuregroup, project, user);
-
- // This is not great, but at the same time the query runs as the user.
- SqlNodeList selectList = new SqlNodeList(SqlParserPos.ZERO);
- for (FeatureGroupFeatureDTO feature : features) {
- if (feature.getDefaultValue() == null) {
- selectList.add(new SqlIdentifier(Arrays.asList("`" + tbl + "`", "`" + feature.getName() + "`"),
- SqlParserPos.ZERO));
- } else {
- selectList.add(constructorController.selectWithDefaultAs(new Feature(feature, tbl), false));
- }
- }
-
- SqlNode whereClause = getWhereCondition(partition, features);
-
- SqlSelect select = new SqlSelect(SqlParserPos.ZERO, null, selectList,
- new SqlIdentifier("`" + tbl + "`", SqlParserPos.ZERO),
- whereClause, null, null, null, null, null,
- SqlLiteral.createExactNumeric(String.valueOf(limit), SqlParserPos.ZERO), null);
- String db = featurestoreController.getOfflineFeaturestoreDbName(featuregroup.getFeaturestore().getProject());
- try {
- return executeReadHiveQuery(
- select.toSqlString(new HiveSqlDialect(SqlDialect.EMPTY_CONTEXT)).getSql(), db, project, user);
- } catch(Exception e) {
- return executeReadHiveQuery(
- select.toSqlString(new HiveSqlDialect(SqlDialect.EMPTY_CONTEXT)).getSql(), db, project, user);
- }
- }
-
-  public SqlNode getWhereCondition(String partition, List<FeatureGroupFeatureDTO> features)
- throws FeaturestoreException {
- if (Strings.isNullOrEmpty(partition)) {
- // user didn't ask for a specific partition
- return null;
- }
-
- // partition names are separated by /, column=VALUE/column=VALUE
- SqlNodeList whereClauses = new SqlNodeList(SqlParserPos.ZERO);
- String[] splits = partition.split("/");
- for (String split : splits) {
- int posEqual = split.indexOf("=");
- String column = split.substring(0, posEqual);
- FeatureGroupFeatureDTO partitionFeature = features.stream()
- .filter(FeatureGroupFeatureDTO::getPartition)
- .filter(feature -> feature.getName().equals(column))
- .findFirst().orElseThrow(() ->
- new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.ILLEGAL_FEATURE_NAME, Level.FINE,
- "The selected partition column: " + column + " was not found among the partition columns of the feature " +
- "group."));
- SqlNode value;
- if (partitionFeature.getType().equalsIgnoreCase("string")) {
- value = SqlLiteral.createCharString(split.substring(posEqual + 1), SqlParserPos.ZERO);
- } else {
- value = new SqlIdentifier(split.substring(posEqual + 1), SqlParserPos.ZERO);
- }
- whereClauses.add(SqlStdOperatorTable.EQUALS.createCall(
- SqlParserPos.ZERO,
- new SqlIdentifier("`" + column + "`", SqlParserPos.ZERO),
- value));
- }
- if (whereClauses.size() == 1) {
- return whereClauses;
- }
- return SqlStdOperatorTable.AND.createCall(whereClauses);
- }
/**
* Persists a cached feature group
@@ -382,7 +286,7 @@ public void deleteFeatureGroup(Featuregroup featuregroup, Project project, Users
* @throws HopsSecurityException
* @throws FeaturestoreException
*/
- private FeaturegroupPreview executeReadHiveQuery(String query, String databaseName, Project project, Users user)
+ public FeaturegroupPreview executeReadHiveQuery(String query, String databaseName, Project project, Users user)
throws SQLException, FeaturestoreException, HopsSecurityException {
Connection conn = null;
Statement stmt = null;
diff --git a/hopsworks-common/src/main/java/io/hops/hopsworks/common/project/ProjectController.java b/hopsworks-common/src/main/java/io/hops/hopsworks/common/project/ProjectController.java
index 23d3d67b8b..c07a38e1f2 100644
--- a/hopsworks-common/src/main/java/io/hops/hopsworks/common/project/ProjectController.java
+++ b/hopsworks-common/src/main/java/io/hops/hopsworks/common/project/ProjectController.java
@@ -2398,12 +2398,16 @@ private AccessCredentialsDTO getAccessCredentials(Project project, Users user)
public AccessCredentialsDTO credentials(Integer projectId, Users user) throws ProjectException, DatasetException {
Project project = findProjectById(projectId);
+ return credentials(project, user);
+ }
+
+ public AccessCredentialsDTO credentials(Project project, Users user) throws ProjectException, DatasetException {
try {
return getAccessCredentials(project, user);
} catch (Exception ex) {
LOGGER.log(Level.SEVERE, null, ex);
throw new DatasetException(RESTCodes.DatasetErrorCode.DOWNLOAD_ERROR, Level.SEVERE,
- "Failed to download credentials for projectId: " + projectId, ex.getMessage(), ex);
+ "Failed to download credentials for projectId: " + project.getId(), ex.getMessage(), ex);
} finally {
certificateMaterializer.removeCertificatesLocal(user.getUsername(), project.getName());
}
diff --git a/hopsworks-common/src/main/java/io/hops/hopsworks/common/util/Settings.java b/hopsworks-common/src/main/java/io/hops/hopsworks/common/util/Settings.java
index e5aed6e2bc..0de1ff2528 100644
--- a/hopsworks-common/src/main/java/io/hops/hopsworks/common/util/Settings.java
+++ b/hopsworks-common/src/main/java/io/hops/hopsworks/common/util/Settings.java
@@ -383,6 +383,9 @@ public class Settings implements Serializable {
/*----------------------- Python ------------------------*/
private final static String VARIABLE_MAX_ENV_YML_BYTE_SIZE = "max_env_yml_byte_size";
+ /*----------------------- Flyingduck ------------------------*/
+ private final static String VARIABLE_ENABLE_FLYINGDUCK = "enable_flyingduck";
+
//Git
private static final String VARIABLE_GIT_IMAGE_VERSION = "git_image_version";
private static final String VARIABLE_GIT_COMMAND_TIMEOUT_MINUTES_DEFAULT = "git_command_timeout_minutes";
@@ -936,6 +939,9 @@ private void populateCache() {
ENABLE_JUPYTER_PYTHON_KERNEL_NON_KUBERNETES = setBoolVar(VARIABLE_ENABLE_JUPYTER_PYTHON_KERNEL_NON_KUBERNETES,
ENABLE_JUPYTER_PYTHON_KERNEL_NON_KUBERNETES);
+ ENABLE_FLYINGDUCK = setBoolVar(VARIABLE_ENABLE_FLYINGDUCK,
+ ENABLE_FLYINGDUCK);
+
MAX_LONG_RUNNING_HTTP_REQUESTS =
setIntVar(VARIABLE_MAX_LONG_RUNNING_HTTP_REQUESTS, MAX_LONG_RUNNING_HTTP_REQUESTS);
@@ -3193,6 +3199,13 @@ public synchronized boolean isPythonKernelEnabled() {
return ENABLE_JUPYTER_PYTHON_KERNEL_NON_KUBERNETES;
}
+ private boolean ENABLE_FLYINGDUCK = false;
+
+ public synchronized boolean isFlyingduckEnabled() {
+ checkCache();
+ return ENABLE_FLYINGDUCK;
+ }
+
//These dependencies were collected by installing jupyterlab in a new environment
   public static final List<String> JUPYTER_DEPENDENCIES = Arrays.asList("urllib3", "chardet", "idna", "requests",
"attrs", "zipp", "importlib-metadata", "pyrsistent", "six", "jsonschema", "prometheus-client", "pycparser",
diff --git a/hopsworks-common/src/test/io/hops/hopsworks/common/arrowflight/TestArrowFlightController.java b/hopsworks-common/src/test/io/hops/hopsworks/common/arrowflight/TestArrowFlightController.java
new file mode 100644
index 0000000000..4316a92870
--- /dev/null
+++ b/hopsworks-common/src/test/io/hops/hopsworks/common/arrowflight/TestArrowFlightController.java
@@ -0,0 +1,307 @@
+/*
+ * This file is part of Hopsworks
+ * Copyright (C) 2024, Hopsworks AB. All rights reserved
+ *
+ * Hopsworks is free software: you can redistribute it and/or modify it under the terms of
+ * the GNU Affero General Public License as published by the Free Software Foundation,
+ * either version 3 of the License, or (at your option) any later version.
+ *
+ * Hopsworks is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
+ * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
+ * PURPOSE. See the GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License along with this program.
+ * If not, see <https://www.gnu.org/licenses/>.
+ */
+
+package io.hops.hopsworks.common.arrowflight;
+
+import io.hops.hopsworks.common.featurestore.feature.FeatureGroupFeatureDTO;
+import io.hops.hopsworks.common.featurestore.featuregroup.FeaturegroupController;
+import io.hops.hopsworks.common.featurestore.storageconnectors.StorageConnectorUtil;
+import io.hops.hopsworks.exceptions.FeaturestoreException;
+import io.hops.hopsworks.persistence.entity.featurestore.Featurestore;
+import io.hops.hopsworks.persistence.entity.featurestore.featuregroup.Featuregroup;
+import io.hops.hopsworks.persistence.entity.featurestore.featuregroup.FeaturegroupType;
+import io.hops.hopsworks.persistence.entity.featurestore.featuregroup.ondemand.OnDemandFeaturegroup;
+import io.hops.hopsworks.persistence.entity.featurestore.storageconnector.FeaturestoreConnector;
+import io.hops.hopsworks.persistence.entity.featurestore.storageconnector.FeaturestoreConnectorType;
+import io.hops.hopsworks.persistence.entity.featurestore.storageconnector.bigquery.FeatureStoreBigqueryConnector;
+import io.hops.hopsworks.persistence.entity.featurestore.storageconnector.snowflake.FeaturestoreSnowflakeConnector;
+import io.hops.hopsworks.persistence.entity.project.Project;
+import io.hops.hopsworks.persistence.entity.user.security.secrets.Secret;
+import org.apache.calcite.sql.SqlDialect;
+import org.apache.calcite.sql.dialect.HiveSqlDialect;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.mockito.Mockito;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.Mockito.doReturn;
+
+public class TestArrowFlightController {
+
+ private ArrowFlightController arrowFlightController = Mockito.spy(ArrowFlightController.class);
+
+  List<FeatureGroupFeatureDTO> features = new ArrayList<>();
+
+ @Rule
+ public ExpectedException thrown = ExpectedException.none();
+
+ @Before
+ public void setup() {
+ arrowFlightController.storageConnectorUtil = Mockito.mock(StorageConnectorUtil.class);
+ arrowFlightController.featuregroupController = Mockito.mock(FeaturegroupController.class);
+
+ features = new ArrayList<>();
+ features.add(new FeatureGroupFeatureDTO("feature", "Integer", "", true, false, "10", null));
+ features.add(new FeatureGroupFeatureDTO("feature2", "String", "", false, false, null, null));
+ }
+
+ @Test
+ public void testGetArrowFlightQueryCachedFg() throws Exception {
+ // Arrange
+ doReturn("SELECT 'id' from 'project.fg_1'").when(arrowFlightController.featuregroupController)
+ .getOfflineFeaturegroupQuery(any(), any(), any(), any(), anyInt());
+
+ doReturn(Collections.singletonList(new FeatureGroupFeatureDTO("id", "int")))
+ .when(arrowFlightController.featuregroupController).getFeatures(any(), any(), any());
+
+ doReturn("fg_1").when(arrowFlightController.featuregroupController).getTblName(any());
+
+ Project project = new Project();
+ project.setName("project");
+
+ Featurestore featurestore = new Featurestore();
+ featurestore.setProject(project);
+
+ Featuregroup fg = new Featuregroup();
+ fg.setFeaturegroupType(FeaturegroupType.CACHED_FEATURE_GROUP);
+ fg.setName("fg");
+ fg.setVersion(1);
+ fg.setFeaturestore(featurestore);
+
+ // Act
+ String result = arrowFlightController.getArrowFlightQuery(fg, null, null, "project.fg_1", 1);
+
+ // Assert
+ Assert.assertEquals("{" +
+ "\"query_string\":\"SELECT 'id' from 'project.fg_1'\"," +
+ "\"features\":{\"project.fg_1\":[\"id\"]}," +
+ "\"filters\":null," +
+ "\"connectors\":{\"project.fg_1\":{" +
+ "\"type\":\"hudi\"," +
+ "\"options\":null," +
+ "\"query\":null," +
+ "\"alias\":null," +
+ "\"filters\":null" +
+ "}}" +
+ "}", result);
+ }
+
+ @Test
+ public void testGetArrowFlightQueryStreamFg() throws Exception {
+ // Arrange
+ doReturn("SELECT 'id' from 'project.fg_1'").when(arrowFlightController.featuregroupController)
+ .getOfflineFeaturegroupQuery(any(), any(), any(), any(), anyInt());
+
+ doReturn(Collections.singletonList(new FeatureGroupFeatureDTO("id", "int")))
+ .when(arrowFlightController.featuregroupController).getFeatures(any(), any(), any());
+
+ doReturn("fg_1").when(arrowFlightController.featuregroupController).getTblName(any());
+
+ Project project = new Project();
+ project.setName("project");
+
+ Featurestore featurestore = new Featurestore();
+ featurestore.setProject(project);
+
+ Featuregroup fg = new Featuregroup();
+ fg.setFeaturegroupType(FeaturegroupType.STREAM_FEATURE_GROUP);
+ fg.setName("fg");
+ fg.setVersion(1);
+ fg.setFeaturestore(featurestore);
+
+ // Act
+ String result = arrowFlightController.getArrowFlightQuery(fg, null, null, "project.fg_1", 1);
+
+ // Assert
+ Assert.assertEquals("{" +
+ "\"query_string\":\"SELECT 'id' from 'project.fg_1'\"," +
+ "\"features\":{\"project.fg_1\":[\"id\"]}," +
+ "\"filters\":null," +
+ "\"connectors\":{\"project.fg_1\":{" +
+ "\"type\":\"hudi\"," +
+ "\"options\":null," +
+ "\"query\":null," +
+ "\"alias\":null," +
+ "\"filters\":null" +
+ "}}" +
+ "}", result);
+ }
+
+ @Test
+ public void testGetArrowFlightQueryOndemandFgSnowflake() throws Exception {
+ // Arrange
+ doReturn("SELECT 'id' from 'fg_1'").when(arrowFlightController.featuregroupController)
+ .getOfflineFeaturegroupQuery(any(), any(), any(), any(), anyInt());
+
+ doReturn(Collections.singletonList(new FeatureGroupFeatureDTO("id", "int")))
+ .when(arrowFlightController.featuregroupController).getFeatures(any(), any(), any());
+
+ String tbl = "fg_1";
+ doReturn(tbl).when(arrowFlightController.featuregroupController).getTblName(any());
+
+ doReturn("test_password").when(arrowFlightController.storageConnectorUtil).getSecret(any(), any());
+
+ Project project = new Project();
+ project.setName("project");
+
+ Featurestore featurestore = new Featurestore();
+ featurestore.setProject(project);
+
+ FeaturestoreSnowflakeConnector snowflakeConnector = new FeaturestoreSnowflakeConnector();
+ snowflakeConnector.setDatabaseUser("test_user");
+ snowflakeConnector.setUrl("https://test_account.snowflakecomputing.com");
+ snowflakeConnector.setDatabaseName("test_dbname");
+ snowflakeConnector.setDatabaseSchema("test_dbschema");
+ snowflakeConnector.setPwdSecret(new Secret());
+ snowflakeConnector.setWarehouse("test_warehouse");
+ snowflakeConnector.setApplication("test_application");
+
+ FeaturestoreConnector connector = new FeaturestoreConnector();
+ connector.setConnectorType(FeaturestoreConnectorType.SNOWFLAKE);
+ connector.setSnowflakeConnector(snowflakeConnector);
+
+ OnDemandFeaturegroup onDemandFg = new OnDemandFeaturegroup();
+ onDemandFg.setQuery("SELECT * FROM HOUSEHOLD_DEMOGRAPHICS;");
+ onDemandFg.setFeaturestoreConnector(connector);
+
+ Featuregroup fg = new Featuregroup();
+ fg.setFeaturegroupType(FeaturegroupType.ON_DEMAND_FEATURE_GROUP);
+ fg.setOnDemandFeaturegroup(onDemandFg);
+ fg.setName("fg");
+ fg.setVersion(1);
+ fg.setFeaturestore(featurestore);
+
+ // Act
+ String result = arrowFlightController.getArrowFlightQuery(fg, null, null, tbl, 1);
+
+ // Assert
+ Assert.assertEquals("{" +
+ "\"query_string\":\"SELECT 'id' from 'fg_1'\"," +
+ "\"features\":{\"fg_1\":[\"id\"]}," +
+ "\"filters\":null," +
+ "\"connectors\":{\"fg_1\":{" +
+ "\"type\":\"SNOWFLAKE\"," +
+ "\"options\":{\"database\":\"test_dbname/test_dbschema\",\"password\":\"test_password\",\"application\":\"test_application\",\"warehouse\":\"test_warehouse\",\"user\":\"test_user\",\"account\":\"test_account\"}," +
+ "\"query\":\"SELECT * FROM HOUSEHOLD_DEMOGRAPHICS\"," +
+ "\"alias\":\"fg_1\"," +
+ "\"filters\":null" +
+ "}}" +
+ "}", result);
+ }
+
+ @Test
+ public void testGetArrowFlightQueryOndemandFgBigQuery() throws Exception {
+ // Arrange
+ doReturn("SELECT 'id' from 'fg_1'").when(arrowFlightController.featuregroupController)
+ .getOfflineFeaturegroupQuery(any(), any(), any(), any(), anyInt());
+
+ doReturn(Collections.singletonList(new FeatureGroupFeatureDTO("id", "int")))
+ .when(arrowFlightController.featuregroupController).getFeatures(any(), any(), any());
+
+ String tbl = "fg_1";
+ doReturn(tbl).when(arrowFlightController.featuregroupController).getTblName(any());
+
+ Project project = new Project();
+ project.setName("project");
+
+ Featurestore featurestore = new Featurestore();
+ featurestore.setProject(project);
+
+ FeatureStoreBigqueryConnector bigqueryConnector = new FeatureStoreBigqueryConnector();
+ bigqueryConnector.setKeyPath("test_keypath");
+ bigqueryConnector.setQueryProject("test_project");
+ bigqueryConnector.setDataset("test_dataset");
+ bigqueryConnector.setParentProject("test_parent_project");
+
+ FeaturestoreConnector connector = new FeaturestoreConnector();
+ connector.setConnectorType(FeaturestoreConnectorType.BIGQUERY);
+ connector.setBigqueryConnector(bigqueryConnector);
+
+ OnDemandFeaturegroup onDemandFg = new OnDemandFeaturegroup();
+ onDemandFg.setQuery("SELECT * FROM HOUSEHOLD_DEMOGRAPHICS;");
+ onDemandFg.setFeaturestoreConnector(connector);
+
+ Featuregroup fg = new Featuregroup();
+ fg.setFeaturegroupType(FeaturegroupType.ON_DEMAND_FEATURE_GROUP);
+ fg.setOnDemandFeaturegroup(onDemandFg);
+ fg.setName("fg");
+ fg.setVersion(1);
+ fg.setFeaturestore(featurestore);
+
+ // Act
+ String result = arrowFlightController.getArrowFlightQuery(fg, null, null, tbl, 1);
+
+ // Assert
+ Assert.assertEquals("{" +
+ "\"query_string\":\"SELECT 'id' from 'fg_1'\"," +
+ "\"features\":{\"fg_1\":[\"id\"]}," +
+ "\"filters\":null," +
+ "\"connectors\":{\"fg_1\":{" +
+ "\"type\":\"BIGQUERY\"," +
+ "\"options\":{\"parent_project\":\"test_parent_project\",\"project_id\":\"test_project\",\"dataset_id\":\"test_dataset\",\"key_path\":\"test_keypath\"}," +
+ "\"query\":\"SELECT * FROM HOUSEHOLD_DEMOGRAPHICS\"," +
+ "\"alias\":\"fg_1\"," +
+ "\"filters\":null" +
+ "}}" +
+ "}", result);
+ }
+
+ @Test
+ public void testGetArrowFlightQueryOndemandFgBadConnector() throws Exception {
+ // Arrange
+ doReturn("SELECT 'id' from 'fg_1'").when(arrowFlightController.featuregroupController)
+ .getOfflineFeaturegroupQuery(any(), any(), any(), any(), anyInt());
+
+ doReturn(Collections.singletonList(new FeatureGroupFeatureDTO("id", "int")))
+ .when(arrowFlightController.featuregroupController).getFeatures(any(), any(), any());
+
+ String tbl = "fg_1";
+
+ Project project = new Project();
+ project.setName("project");
+
+ Featurestore featurestore = new Featurestore();
+ featurestore.setProject(project);
+
+ FeaturestoreConnector connector = new FeaturestoreConnector();
+ connector.setConnectorType(FeaturestoreConnectorType.S3);
+
+ OnDemandFeaturegroup onDemandFg = new OnDemandFeaturegroup();
+ onDemandFg.setQuery("SELECT * FROM HOUSEHOLD_DEMOGRAPHICS;");
+ onDemandFg.setFeaturestoreConnector(connector);
+
+ Featuregroup fg = new Featuregroup();
+ fg.setFeaturegroupType(FeaturegroupType.ON_DEMAND_FEATURE_GROUP);
+ fg.setOnDemandFeaturegroup(onDemandFg);
+ fg.setName("fg");
+ fg.setVersion(1);
+ fg.setFeaturestore(featurestore);
+
+ // Act
+ thrown.expect(FeaturestoreException.class);
+ arrowFlightController.getArrowFlightQuery(fg, null, null, tbl, 1);
+
+ // Assert
+ }
+}
diff --git a/hopsworks-common/src/test/io/hops/hopsworks/common/featurestore/featuregroup/TestFeatureGroupController.java b/hopsworks-common/src/test/io/hops/hopsworks/common/featurestore/featuregroup/TestFeatureGroupController.java
index 69546a8dd8..09e203d62d 100644
--- a/hopsworks-common/src/test/io/hops/hopsworks/common/featurestore/featuregroup/TestFeatureGroupController.java
+++ b/hopsworks-common/src/test/io/hops/hopsworks/common/featurestore/featuregroup/TestFeatureGroupController.java
@@ -17,7 +17,6 @@
package io.hops.hopsworks.common.featurestore.featuregroup;
import io.hops.hopsworks.common.featurestore.feature.FeatureGroupFeatureDTO;
-import io.hops.hopsworks.common.featurestore.featuregroup.cached.CachedFeaturegroupDTO;
import io.hops.hopsworks.exceptions.FeaturestoreException;
import org.junit.Before;
import org.junit.Rule;
@@ -48,5 +47,5 @@ public void testVerifyFeaturesNoDefaultValue() throws Exception {
thrown.expect(FeaturestoreException.class);
featuregroupController.verifyFeaturesNoDefaultValue(features);
}
-
+
}
diff --git a/hopsworks-common/src/test/io/hops/hopsworks/common/featurestore/featuregroup/cached/TestCachedFeatureGroupController.java b/hopsworks-common/src/test/io/hops/hopsworks/common/featurestore/featuregroup/cached/TestCachedFeatureGroupController.java
index 1ee85ddb44..860ecd4b3e 100644
--- a/hopsworks-common/src/test/io/hops/hopsworks/common/featurestore/featuregroup/cached/TestCachedFeatureGroupController.java
+++ b/hopsworks-common/src/test/io/hops/hopsworks/common/featurestore/featuregroup/cached/TestCachedFeatureGroupController.java
@@ -21,9 +21,6 @@
import io.hops.hopsworks.common.featurestore.featuregroup.stream.StreamFeatureGroupDTO;
import io.hops.hopsworks.exceptions.FeaturestoreException;
import io.hops.hopsworks.persistence.entity.featurestore.featuregroup.cached.TimeTravelFormat;
-import org.apache.calcite.sql.SqlDialect;
-import org.apache.calcite.sql.dialect.HiveSqlDialect;
-import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@@ -36,7 +33,6 @@ public class TestCachedFeatureGroupController {
List features = new ArrayList<>();
List features2 = new ArrayList<>();
- List partitionFeatures = new ArrayList<>();
private CachedFeaturegroupController cachedFeaturegroupController = new CachedFeaturegroupController();
@@ -52,50 +48,6 @@ public void setup() {
features2 = new ArrayList<>();
features2.add(new FeatureGroupFeatureDTO("part_param", "String", "", true, false));
features2.add(new FeatureGroupFeatureDTO("part_param2", "String", "", false, false));
-
- partitionFeatures.add(new FeatureGroupFeatureDTO("part_param", "Integer", "", false, true));
- partitionFeatures.add(new FeatureGroupFeatureDTO("part_param2", "String", "", false, true));
- partitionFeatures.add(new FeatureGroupFeatureDTO("part_param3", "String", "", false, true));
- }
-
- @Test
- public void testPreviewWhereSingle() throws Exception {
- String queryPart = "part_param=3";
- String output = cachedFeaturegroupController.getWhereCondition(queryPart, partitionFeatures)
- .toSqlString(new HiveSqlDialect(SqlDialect.EMPTY_CONTEXT)).getSql();
- Assert.assertEquals("`part_param` = 3", output);
- }
-
- @Test
- public void testPreviewWhereDouble() throws Exception {
- String queryPart = "part_param=3/part_param2=hello";
- String output = cachedFeaturegroupController.getWhereCondition(queryPart, partitionFeatures)
- .toSqlString(new HiveSqlDialect(SqlDialect.EMPTY_CONTEXT)).getSql();
- Assert.assertEquals("`part_param` = 3 AND `part_param2` = 'hello'", output);
- }
-
- @Test
- public void testPreviewWhereDoubleSpace() throws Exception {
- String queryPart = "part_param2=3 4/part_param3=hello";
- String output = cachedFeaturegroupController.getWhereCondition(queryPart, partitionFeatures)
- .toSqlString(new HiveSqlDialect(SqlDialect.EMPTY_CONTEXT)).getSql();
- Assert.assertEquals("`part_param2` = '3 4' AND `part_param3` = 'hello'", output);
- }
-
- @Test
- public void testPreviewWhereDoubleEquals() throws Exception {
- String queryPart = "part_param2=3=4/part_param3=hello";
- String output = cachedFeaturegroupController.getWhereCondition(queryPart, partitionFeatures)
- .toSqlString(new HiveSqlDialect(SqlDialect.EMPTY_CONTEXT)).getSql();
- Assert.assertEquals("`part_param2` = '3=4' AND `part_param3` = 'hello'", output);
- }
-
- @Test
- public void testPreviewWhereNoPartitionColumn() throws Exception {
- String queryPart = "part_param=3";
- thrown.expect(FeaturestoreException.class);
- String output = cachedFeaturegroupController.getWhereCondition(queryPart, features)
- .toSqlString(new HiveSqlDialect(SqlDialect.EMPTY_CONTEXT)).getSql();
}
@Test
diff --git a/hopsworks-rest-utils/src/main/java/io/hops/hopsworks/restutils/RESTCodes.java b/hopsworks-rest-utils/src/main/java/io/hops/hopsworks/restutils/RESTCodes.java
index 6ef1542abf..15869c952d 100644
--- a/hopsworks-rest-utils/src/main/java/io/hops/hopsworks/restutils/RESTCodes.java
+++ b/hopsworks-rest-utils/src/main/java/io/hops/hopsworks/restutils/RESTCodes.java
@@ -1353,7 +1353,7 @@ public enum FeaturestoreErrorCode implements RESTErrorCode {
Response.Status.BAD_REQUEST),
CERTIFICATES_NOT_FOUND(18, "Could not find user certificates for authenticating with Hive Feature Store",
Response.Status.INTERNAL_SERVER_ERROR),
- COULD_NOT_INITIATE_HIVE_CONNECTION(19, "Could not initiate connecton to Hive Server",
+ COULD_NOT_INITIATE_HIVE_CONNECTION(19, "Could not initiate connection to Hive Server",
Response.Status.INTERNAL_SERVER_ERROR),
HIVE_UPDATE_STATEMENT_ERROR(20, "Hive Update Statement failed",
Response.Status.INTERNAL_SERVER_ERROR),
@@ -1415,7 +1415,7 @@ public enum FeaturestoreErrorCode implements RESTErrorCode {
FEATURESTORE_ONLINE_NOT_ENABLED(64, "Online featurestore not enabled", Response.Status.BAD_REQUEST),
SYNC_TABLE_NOT_FOUND(65, "The Hive Table to Sync with the feature store was not " +
"found in the metastore", Response.Status.BAD_REQUEST),
- COULD_NOT_INITIATE_MYSQL_CONNECTION_TO_ONLINE_FEATURESTORE(66, "Could not initiate connecton to " +
+ COULD_NOT_INITIATE_MYSQL_CONNECTION_TO_ONLINE_FEATURESTORE(66, "Could not initiate connection to " +
"MySQL Server", Response.Status.INTERNAL_SERVER_ERROR),
MYSQL_JDBC_UPDATE_STATEMENT_ERROR(67, "MySQL JDBC Update Statement failed",
Response.Status.INTERNAL_SERVER_ERROR),
@@ -1680,6 +1680,15 @@ public enum FeaturestoreErrorCode implements RESTErrorCode {
HELPER_COL_NOT_FOUND(225, "Could not find helper column in feature view schema",
Response.Status.NOT_FOUND),
OPENSEARCH_DEFAULT_EMBEDDING_INDEX_SUFFIX_NOT_DEFINED(226, "Opensearch default embedding index not defined",
+ Response.Status.INTERNAL_SERVER_ERROR),
+ FEATURE_GROUP_COMMIT_NOT_FOUND(227, "Feature group commit not found", Response.Status.BAD_REQUEST),
+ STATISTICS_NOT_FOUND(228, "Statistics wasn't found.", Response.Status.NOT_FOUND),
+ INVALID_STATISTICS_WINDOW_TIMES(229, "Window times provided are invalid", Response.Status.BAD_REQUEST),
+ COULD_NOT_DELETE_VECTOR_DB_INDEX(230, "Could not delete index from vector db.",
+ Response.Status.INTERNAL_SERVER_ERROR),
+ COULD_NOT_INITIATE_ARROW_FLIGHT_CONNECTION(231, "Could not initiate connection to Arrow Flight server",
+ Response.Status.INTERNAL_SERVER_ERROR),
+ ARROW_FLIGHT_READ_QUERY_ERROR(232, "Arrow Flight server Read Query failed",
Response.Status.INTERNAL_SERVER_ERROR);
private int code;
diff --git a/hopsworks-service-discovery/src/main/java/io/hops/hopsworks/servicediscovery/tags/FlyingDuckTags.java b/hopsworks-service-discovery/src/main/java/io/hops/hopsworks/servicediscovery/tags/FlyingDuckTags.java
index e3780ab2fd..58eb05f3aa 100644
--- a/hopsworks-service-discovery/src/main/java/io/hops/hopsworks/servicediscovery/tags/FlyingDuckTags.java
+++ b/hopsworks-service-discovery/src/main/java/io/hops/hopsworks/servicediscovery/tags/FlyingDuckTags.java
@@ -16,7 +16,8 @@
package io.hops.hopsworks.servicediscovery.tags;
public enum FlyingDuckTags implements ServiceTags {
- monitoring("monitoring");
+ monitoring("monitoring"),
+ server("server");
private final String value;
FlyingDuckTags(String value) {
diff --git a/pom.xml b/pom.xml
index ad4b5e24fe..4421b1cc6d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -137,7 +137,7 @@
20231013
2.5.2
1.18.6
-    <protobuf-java.version>3.9.1</protobuf-java.version>
+    <protobuf-java.version>3.23.1</protobuf-java.version>
0.9.12
0.16.0
0.6-SNAPSHOT
@@ -148,6 +148,7 @@
1.6.2
3.8.3
9.2.1
+    <arrow.version>14.0.1</arrow.version>
@@ -226,7 +227,6 @@
com.google.protobuf
protobuf-java
${protobuf-java.version}
- provided
com.google.zxing
@@ -521,6 +521,10 @@
io.netty
netty-common
+          <exclusion>
+            <groupId>com.google.flatbuffers</groupId>
+            <artifactId>flatbuffers-java</artifactId>
+          </exclusion>
@@ -868,6 +872,11 @@
javax.json.bind-api
1.0
+      <dependency>
+        <groupId>org.apache.arrow</groupId>
+        <artifactId>flight-core</artifactId>
+        <version>${arrow.version}</version>
+      </dependency>