From b7f439edf0ef610feb4423f8ccf35f4616487c79 Mon Sep 17 00:00:00 2001 From: Ralf Date: Tue, 30 Jan 2024 12:19:15 +0200 Subject: [PATCH] [FSTORE-1063] Hopsworks data preview should use arrow flight to retrieve data if available (#1461) * [FSTORE-1063] Hopsworks data preview should use arrow flight to retrieve data if available (#1630) * fix cherry pick --- .../src/test/ruby/spec/featuregroup_spec.rb | 25 +- .../FeatureGroupPreviewResource.java | 1 - .../featuregroup/PreviewBuilder.java | 4 +- hopsworks-common/pom.xml | 11 +- .../arrowflight/ArrowFlightConnectorDTO.java | 51 +++ .../arrowflight/ArrowFlightController.java | 316 ++++++++++++++++++ .../arrowflight/ArrowFlightCredentialDTO.java | 50 +++ .../arrowflight/ArrowFlightQueryDTO.java | 48 +++ .../featuregroup/FeaturegroupController.java | 89 ++++- .../cached/CachedFeaturegroupController.java | 98 +----- .../common/project/ProjectController.java | 6 +- .../hops/hopsworks/common/util/Settings.java | 13 + .../TestArrowFlightController.java | 307 +++++++++++++++++ .../TestFeatureGroupController.java | 3 +- .../TestCachedFeatureGroupController.java | 48 --- .../hops/hopsworks/restutils/RESTCodes.java | 13 +- .../servicediscovery/tags/FlyingDuckTags.java | 3 +- pom.xml | 13 +- 18 files changed, 933 insertions(+), 166 deletions(-) create mode 100644 hopsworks-common/src/main/java/io/hops/hopsworks/common/arrowflight/ArrowFlightConnectorDTO.java create mode 100644 hopsworks-common/src/main/java/io/hops/hopsworks/common/arrowflight/ArrowFlightController.java create mode 100644 hopsworks-common/src/main/java/io/hops/hopsworks/common/arrowflight/ArrowFlightCredentialDTO.java create mode 100644 hopsworks-common/src/main/java/io/hops/hopsworks/common/arrowflight/ArrowFlightQueryDTO.java create mode 100644 hopsworks-common/src/test/io/hops/hopsworks/common/arrowflight/TestArrowFlightController.java diff --git a/hopsworks-IT/src/test/ruby/spec/featuregroup_spec.rb b/hopsworks-IT/src/test/ruby/spec/featuregroup_spec.rb index 
c036a1d61e..e73ed05cdc 100644 --- a/hopsworks-IT/src/test/ruby/spec/featuregroup_spec.rb +++ b/hopsworks-IT/src/test/ruby/spec/featuregroup_spec.rb @@ -21,9 +21,15 @@ describe "cached feature groups" do context 'with valid project, featurestore service enabled' do before :all do + @enable_flyingduck = getVar("enable_flyingduck") + with_valid_project end + after :all do + setVar("enable_flyingduck", @enable_flyingduck[:value]) + end + it "should be able to add a offline cached featuregroup to the featurestore" do project = get_project featurestore_id = get_featurestore_id(project.id) @@ -247,8 +253,25 @@ expect(parsed_json["features"].first["description"]).to eql("this description contains ' and ;'") end - it "should be able to preview a offline cached featuregroup in the featurestore" do + it "should be able to preview a offline cached featuregroup in the featurestore using hive" do + project = get_project + setVar("enable_flyingduck", "false") + create_session(project[:username], "Pass123") + featurestore_id = get_featurestore_id(project.id) + json_result, featuregroup_name = create_cached_featuregroup(project.id, featurestore_id) + parsed_json = JSON.parse(json_result) + expect_status_details(201) + featuregroup_id = parsed_json["id"] + preview_featuregroup_endpoint = "#{ENV['HOPSWORKS_API']}/project/" + project.id.to_s + "/featurestores/" + featurestore_id.to_s + "/featuregroups/" + featuregroup_id.to_s + "/preview" + get preview_featuregroup_endpoint + parsed_json = JSON.parse(response.body) + expect_status_details(200) + end + + it "should be able to preview a offline cached featuregroup in the featurestore using flying duck" do project = get_project + setVar("enable_flyingduck", "true") + create_session(project[:username], "Pass123") featurestore_id = get_featurestore_id(project.id) json_result, featuregroup_name = create_cached_featuregroup(project.id, featurestore_id) parsed_json = JSON.parse(json_result) diff --git 
a/hopsworks-api/src/main/java/io/hops/hopsworks/api/featurestore/featuregroup/FeatureGroupPreviewResource.java b/hopsworks-api/src/main/java/io/hops/hopsworks/api/featurestore/featuregroup/FeatureGroupPreviewResource.java index a29e43538c..6612e48c30 100644 --- a/hopsworks-api/src/main/java/io/hops/hopsworks/api/featurestore/featuregroup/FeatureGroupPreviewResource.java +++ b/hopsworks-api/src/main/java/io/hops/hopsworks/api/featurestore/featuregroup/FeatureGroupPreviewResource.java @@ -109,7 +109,6 @@ public Response getPreview(@BeanParam FeatureGroupPreviewBeanParam featureGroupP } PreviewDTO previewDTO = previewBuilder.build(uriInfo, user, project, featuregroup, - featureGroupPreviewBeanParam.getPartition(), online, featureGroupPreviewBeanParam.getLimit() == null ? 20 : featureGroupPreviewBeanParam.getLimit()); diff --git a/hopsworks-api/src/main/java/io/hops/hopsworks/api/featurestore/featuregroup/PreviewBuilder.java b/hopsworks-api/src/main/java/io/hops/hopsworks/api/featurestore/featuregroup/PreviewBuilder.java index fd56c4cdac..b0008f8dda 100644 --- a/hopsworks-api/src/main/java/io/hops/hopsworks/api/featurestore/featuregroup/PreviewBuilder.java +++ b/hopsworks-api/src/main/java/io/hops/hopsworks/api/featurestore/featuregroup/PreviewBuilder.java @@ -57,12 +57,12 @@ private URI uri(UriInfo uriInfo, Project project, Featuregroup featuregroup) { } public PreviewDTO build(UriInfo uriInfo, Users user, Project project, Featuregroup featuregroup, - String partition, boolean online, int limit) + boolean online, int limit) throws FeaturestoreException, HopsSecurityException { FeaturegroupPreview preview; try { - preview = featuregroupController.getFeaturegroupPreview(featuregroup, project, user, partition, online, limit); + preview = featuregroupController.getFeaturegroupPreview(featuregroup, project, user, online, limit); } catch (SQLException e) { throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.COULD_NOT_PREVIEW_FEATUREGROUP, Level.SEVERE, "Feature 
Group id: " + featuregroup.getId(), e.getMessage(), e); diff --git a/hopsworks-common/pom.xml b/hopsworks-common/pom.xml index 285504f698..2a80ca1349 100644 --- a/hopsworks-common/pom.xml +++ b/hopsworks-common/pom.xml @@ -248,6 +248,15 @@ junit junit + + org.apache.arrow + flight-core + + + org.projectlombok + lombok + ${projectlombok.version} + @@ -260,4 +269,4 @@ - \ No newline at end of file + diff --git a/hopsworks-common/src/main/java/io/hops/hopsworks/common/arrowflight/ArrowFlightConnectorDTO.java b/hopsworks-common/src/main/java/io/hops/hopsworks/common/arrowflight/ArrowFlightConnectorDTO.java new file mode 100644 index 0000000000..aa5850e64d --- /dev/null +++ b/hopsworks-common/src/main/java/io/hops/hopsworks/common/arrowflight/ArrowFlightConnectorDTO.java @@ -0,0 +1,51 @@ +/* + * This file is part of Hopsworks + * Copyright (C) 2024, Hopsworks AB. All rights reserved + * + * Hopsworks is free software: you can redistribute it and/or modify it under the terms of + * the GNU Affero General Public License as published by the Free Software Foundation, + * either version 3 of the License, or (at your option) any later version. + * + * Hopsworks is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; + * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR + * PURPOSE. See the GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License along with this program. + * If not, see . 
+ */ + +package io.hops.hopsworks.common.arrowflight; + +import com.fasterxml.jackson.annotation.JsonProperty; +import lombok.AllArgsConstructor; +import lombok.NoArgsConstructor; +import lombok.Getter; +import lombok.Setter; +import java.util.Map; + +@AllArgsConstructor +@NoArgsConstructor +public class ArrowFlightConnectorDTO { + + @Getter + @Setter + @JsonProperty(value = "type") + private String type; + @Getter + @Setter + @JsonProperty(value = "options") + private Map options; + @Getter + @Setter + @JsonProperty(value = "query") + private String query; + @Getter + @Setter + @JsonProperty(value = "alias") + private String alias; + @Getter + @Setter + @JsonProperty(value = "filters") + private String filters; + +} diff --git a/hopsworks-common/src/main/java/io/hops/hopsworks/common/arrowflight/ArrowFlightController.java b/hopsworks-common/src/main/java/io/hops/hopsworks/common/arrowflight/ArrowFlightController.java new file mode 100644 index 0000000000..2cb8486923 --- /dev/null +++ b/hopsworks-common/src/main/java/io/hops/hopsworks/common/arrowflight/ArrowFlightController.java @@ -0,0 +1,316 @@ +/* + * This file is part of Hopsworks + * Copyright (C) 2024, Hopsworks AB. All rights reserved + * + * Hopsworks is free software: you can redistribute it and/or modify it under the terms of + * the GNU Affero General Public License as published by the Free Software Foundation, + * either version 3 of the License, or (at your option) any later version. + * + * Hopsworks is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; + * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR + * PURPOSE. See the GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License along with this program. + * If not, see . 
+ */ + +package io.hops.hopsworks.common.arrowflight; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.logicalclocks.servicediscoverclient.service.Service; +import io.hops.hopsworks.common.featurestore.feature.FeatureGroupFeatureDTO; +import io.hops.hopsworks.common.featurestore.featuregroup.cached.FeaturegroupPreview; +import io.hops.hopsworks.common.featurestore.featuregroup.FeaturegroupController; +import io.hops.hopsworks.common.featurestore.storageconnectors.StorageConnectorUtil; +import io.hops.hopsworks.common.hosts.ServiceDiscoveryController; +import io.hops.hopsworks.common.project.AccessCredentialsDTO; +import io.hops.hopsworks.common.project.ProjectController; +import io.hops.hopsworks.exceptions.FeaturestoreException; +import io.hops.hopsworks.persistence.entity.featurestore.featuregroup.Featuregroup; +import io.hops.hopsworks.persistence.entity.featurestore.featuregroup.FeaturegroupType; +import io.hops.hopsworks.persistence.entity.featurestore.featuregroup.ondemand.OnDemandFeaturegroup; +import io.hops.hopsworks.persistence.entity.featurestore.storageconnector.FeaturestoreConnector; +import io.hops.hopsworks.persistence.entity.featurestore.storageconnector.bigquery.FeatureStoreBigqueryConnector; +import io.hops.hopsworks.persistence.entity.featurestore.storageconnector.snowflake.FeaturestoreSnowflakeConnector; +import io.hops.hopsworks.persistence.entity.project.Project; +import io.hops.hopsworks.persistence.entity.user.Users; +import io.hops.hopsworks.restutils.RESTCodes; +import io.hops.hopsworks.servicediscovery.HopsworksService; +import io.hops.hopsworks.servicediscovery.tags.FlyingDuckTags; +import org.apache.arrow.flight.Action; +import org.apache.arrow.flight.FlightClient; +import org.apache.arrow.flight.FlightDescriptor; +import org.apache.arrow.flight.FlightInfo; +import org.apache.arrow.flight.FlightRuntimeException; +import org.apache.arrow.flight.FlightStream; 
+import org.apache.arrow.flight.Location; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.table.Row; +import org.apache.arrow.vector.table.Table; +import org.apache.arrow.vector.types.pojo.Field; +import org.javatuples.Pair; + +import javax.ejb.EJB; +import javax.ejb.Stateless; +import javax.ejb.TransactionAttribute; +import javax.ejb.TransactionAttributeType; +import java.io.ByteArrayInputStream; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.logging.Level; +import java.util.stream.Collectors; + +/** + * Class controlling the interaction with the arrow flight server + */ +@Stateless +@TransactionAttribute(TransactionAttributeType.NEVER) +public class ArrowFlightController { + + @EJB + protected StorageConnectorUtil storageConnectorUtil; + @EJB + private ProjectController projectController; + @EJB + protected ServiceDiscoveryController serviceDiscoveryController; + @EJB + protected FeaturegroupController featuregroupController; + + private final ObjectMapper objectMapper = new ObjectMapper(); + + /** + * Initializes an Arrow Flight connection to Flying Duck using TLS with the given access credentials + * + * @param project the project that owns the Hive database + * @param user the user making the request + * @return FlightClient + * @throws FeaturestoreException + */ + private FlightClient initFlightClient(Project project, Users user) + throws FeaturestoreException, InterruptedException { + FlightClient flightClient = null; + try { + AccessCredentialsDTO accessCredentialsDTO = projectController.credentials(project, user); + + InputStream caChainInputStream = + new ByteArrayInputStream(accessCredentialsDTO.getCaChain().getBytes(StandardCharsets.UTF_8)); + InputStream clientCertInputStream = + new 
ByteArrayInputStream(accessCredentialsDTO.getClientCert().getBytes(StandardCharsets.UTF_8)); + InputStream clientKeyInputStream = + new ByteArrayInputStream(accessCredentialsDTO.getClientKey().getBytes(StandardCharsets.UTF_8)); + + // Flyingduck port is exposed as server.flyingduck.service.consul, however flyingduck is quite picky + // when it comes to certificates and it requires the hostname to be flyingduck.service.consul + // so here we fetch the port from the service discovery and then we build the rest of the name + Service flyingduckService = serviceDiscoveryController + .getAnyAddressOfServiceWithDNS(HopsworksService.FLYING_DUCK.getNameWithTag(FlyingDuckTags.server)); + String flyingduckEndpoing = serviceDiscoveryController + .constructServiceFQDN(HopsworksService.FLYING_DUCK.getName()) + ":" + flyingduckService.getPort(); + + flightClient = FlightClient.builder() + .useTls() + .allocator(new RootAllocator()) + .location(new Location("grpc+tls://" + flyingduckEndpoing)) + .trustedCertificates(caChainInputStream) + .clientCertificate(clientCertInputStream, clientKeyInputStream) + .build(); + + // register client certificates + ArrowFlightCredentialDTO arrowFlightCredentials = new ArrowFlightCredentialDTO(accessCredentialsDTO); + flightClient.doAction(new Action("register-client-certificates", + objectMapper.writeValueAsString(arrowFlightCredentials).getBytes())) + .hasNext(); + + return flightClient; + } catch (Exception e) { + if (flightClient != null) { + flightClient.close(); + } + throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.COULD_NOT_INITIATE_ARROW_FLIGHT_CONNECTION, + Level.SEVERE, "project: " + project.getName(), e.getMessage(), e); + } + } + + /** + * Starts Arrow Flight connection to Flying Duck using the given project-user and then executes a query + * + * @param query the read query (Proprietary to Flying Duck) + * @param project the project that owns the Hive database + * @param user the user making the request + * @return 
FeaturegroupPreview + * @throws FeaturestoreException + */ + public FeaturegroupPreview executeReadArrowFlightQuery(String query, Project project, Users user) + throws FeaturestoreException { + try(FlightClient flightClient = initFlightClient(project, user)) { + // get flight info + FlightInfo flightInfo = flightClient.getInfo(FlightDescriptor.command(query.getBytes(StandardCharsets.US_ASCII))); + + // read data + FeaturegroupPreview featuregroupPreview = new FeaturegroupPreview(); + try(FlightStream flightStream = flightClient.getStream(flightInfo.getEndpoints().get(0).getTicket())) { + try (VectorSchemaRoot vectorSchemaRootReceived = flightStream.getRoot()) { + while (flightStream.next()) { + try (Table table = new Table(vectorSchemaRootReceived.getFieldVectors())) { + for (Row tableRow: table) { + FeaturegroupPreview.Row row = new FeaturegroupPreview.Row(); + for (Field field: table.getSchema().getFields()) { + row.addValue(new Pair<>(field.getName().toLowerCase(), // UI breaks if header is capitalized + tableRow.isNull(field.getName()) ? 
"" : tableRow.getExtensionType(field.getName()).toString())); + } + featuregroupPreview.addRow(row); + } + } + } + } catch (FlightRuntimeException e) { + if (e.getMessage().contains("No such file or directory")) { + return featuregroupPreview; // nothing was written to hudi + } + throw e; + } + } + return featuregroupPreview; + } catch (Exception e) { + throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.ARROW_FLIGHT_READ_QUERY_ERROR, Level.FINE, + "project: " + project.getName() + ", Arrow Flight query: " + query, e.getMessage(), e); + } + } + + + /** + * Gets Query string that can be used in Arrow Flight server + * + * @param featuregroup the featuregroup to preview + * @param project the project that owns the Hive database + * @param user the user making the request + * @param tbl table name + * @param limit the number of rows to visualize + * @return read query (Proprietary to Arrow Flight server) + * @throws FeaturestoreException + */ + public String getArrowFlightQuery(Featuregroup featuregroup, Project project, Users user, String tbl, int limit) + throws FeaturestoreException { + ArrowFlightQueryDTO queryDto = new ArrowFlightQueryDTO(); + + // query + String query = featuregroupController.getOfflineFeaturegroupQuery(featuregroup, project, user, tbl, limit); + query = query.replace("`", "\""); + queryDto.setQueryString(query); + + // features + List features = featuregroupController.getFeatures(featuregroup, project, user); + List featureNames = features.stream().map(FeatureGroupFeatureDTO::getName).collect(Collectors.toList()); + Map> featureMap = Collections.singletonMap(tbl, featureNames); + queryDto.setFeatures(featureMap); + + // filters (not necessary since it will always be used only for Preview) + queryDto.setFilters(null); + + // connectors + Map connectorMap = + Collections.singletonMap(tbl, getArrowFlightConnectorDTO(featuregroup)); + queryDto.setConnectors(connectorMap); + + try { + return objectMapper.writeValueAsString(queryDto); + 
} catch (JsonProcessingException e) { + throw new IllegalStateException(e); + } + } + + private ArrowFlightConnectorDTO getArrowFlightConnectorDTO(Featuregroup featuregroup) + throws FeaturestoreException{ + ArrowFlightConnectorDTO connector = new ArrowFlightConnectorDTO(); + if (featuregroup.getFeaturegroupType() == FeaturegroupType.ON_DEMAND_FEATURE_GROUP) { + OnDemandFeaturegroup onDemandFeaturegroup = featuregroup.getOnDemandFeaturegroup(); + FeaturestoreConnector featurestoreConnector = onDemandFeaturegroup.getFeaturestoreConnector(); + + connector.setType(featurestoreConnector.getConnectorType().name()); + connector.setOptions(getConnectorOptions(featurestoreConnector)); + connector.setQuery(onDemandFeaturegroup.getQuery().replaceAll(";( )*$", "")); + connector.setAlias(featuregroupController.getTblName(featuregroup)); + connector.setFilters(null); + } else { + connector.setType("hudi"); + } + return connector; + } + + /** + * Gets Map used for connector options in ArrowFlight + * + * @param featurestoreConnector the connector from which to extract options + * @return Map + * @throws FeaturestoreException + */ + private Map getConnectorOptions(FeaturestoreConnector featurestoreConnector) + throws FeaturestoreException { + Map optionMap = new HashMap(); + + switch (featurestoreConnector.getConnectorType()) { + case SNOWFLAKE: + FeaturestoreSnowflakeConnector snowflakeConnector = featurestoreConnector.getSnowflakeConnector(); + optionMap.put("user", snowflakeConnector.getDatabaseUser()); + optionMap.put("account", snowflakeConnector.getUrl() + .replace("https://", "") + .replace(".snowflakecomputing.com", "")); + optionMap.put("database", snowflakeConnector.getDatabaseName() + "/" + snowflakeConnector.getDatabaseSchema()); + + if (snowflakeConnector.getPwdSecret() != null) { + optionMap.put("password", storageConnectorUtil.getSecret(snowflakeConnector.getPwdSecret(), String.class)); + } else { + optionMap.put("authenticator", "oauth"); + optionMap.put("token", 
storageConnectorUtil.getSecret(snowflakeConnector.getTokenSecret(), String.class)); + } + + if (snowflakeConnector.getWarehouse() != null) { + optionMap.put("warehouse", snowflakeConnector.getWarehouse()); + } + + if (snowflakeConnector.getApplication() != null) { + optionMap.put("application", snowflakeConnector.getApplication()); + } + break; + case BIGQUERY: + FeatureStoreBigqueryConnector connector = featurestoreConnector.getBigqueryConnector(); + optionMap.put("key_path", connector.getKeyPath()); + optionMap.put("project_id", connector.getQueryProject()); + optionMap.put("dataset_id", connector.getDataset()); + optionMap.put("parent_project", connector.getParentProject()); + break; + default: + throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.ILLEGAL_STORAGE_CONNECTOR_TYPE, Level.SEVERE, + "Arrow Flight doesn't support connector of type: " + featurestoreConnector.getConnectorType().name()); + } + + return optionMap; + } + + /** + * Throws exception if feature group is not supported by Arrow Flight server + * + * @param featuregroup the featuregroup + * @throws FeaturestoreException + */ + public void checkFeatureGroupSupportedByArrowFlight(Featuregroup featuregroup) throws FeaturestoreException { + if (featuregroup.getFeaturegroupType() == FeaturegroupType.ON_DEMAND_FEATURE_GROUP) { + OnDemandFeaturegroup onDemandFeaturegroup = featuregroup.getOnDemandFeaturegroup(); + FeaturestoreConnector featurestoreConnector = onDemandFeaturegroup.getFeaturestoreConnector(); + switch (featurestoreConnector.getConnectorType()) { + case SNOWFLAKE: + case BIGQUERY: + return; + default: + throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.ILLEGAL_STORAGE_CONNECTOR_TYPE, Level.SEVERE, + "Arrow Flight doesn't support connector of type: " + featurestoreConnector.getConnectorType().name()); + } + } + return; + } +} diff --git a/hopsworks-common/src/main/java/io/hops/hopsworks/common/arrowflight/ArrowFlightCredentialDTO.java 
b/hopsworks-common/src/main/java/io/hops/hopsworks/common/arrowflight/ArrowFlightCredentialDTO.java new file mode 100644 index 0000000000..3d58bcf0ad --- /dev/null +++ b/hopsworks-common/src/main/java/io/hops/hopsworks/common/arrowflight/ArrowFlightCredentialDTO.java @@ -0,0 +1,50 @@ +/* + * This file is part of Hopsworks + * Copyright (C) 2024, Hopsworks AB. All rights reserved + * + * Hopsworks is free software: you can redistribute it and/or modify it under the terms of + * the GNU Affero General Public License as published by the Free Software Foundation, + * either version 3 of the License, or (at your option) any later version. + * + * Hopsworks is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; + * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR + * PURPOSE. See the GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License along with this program. + * If not, see . 
+ */ + +package io.hops.hopsworks.common.arrowflight; + +import io.hops.hopsworks.common.project.AccessCredentialsDTO; + +import com.fasterxml.jackson.annotation.JsonProperty; +import lombok.AllArgsConstructor; +import lombok.NoArgsConstructor; +import lombok.Getter; +import lombok.Setter; + +@AllArgsConstructor +@NoArgsConstructor +public class ArrowFlightCredentialDTO { + + @Getter + @Setter + @JsonProperty(value = "kstore") + private String kstore; + @Getter + @Setter + @JsonProperty(value = "tstore") + private String tstore; + @Getter + @Setter + @JsonProperty(value = "cert_key") + private String certKey; + + public ArrowFlightCredentialDTO(AccessCredentialsDTO accessCredentialsDTO) { + this.kstore = accessCredentialsDTO.getkStore(); + this.tstore = accessCredentialsDTO.gettStore(); + this.certKey = accessCredentialsDTO.getPassword(); + } + +} diff --git a/hopsworks-common/src/main/java/io/hops/hopsworks/common/arrowflight/ArrowFlightQueryDTO.java b/hopsworks-common/src/main/java/io/hops/hopsworks/common/arrowflight/ArrowFlightQueryDTO.java new file mode 100644 index 0000000000..2e4daae07c --- /dev/null +++ b/hopsworks-common/src/main/java/io/hops/hopsworks/common/arrowflight/ArrowFlightQueryDTO.java @@ -0,0 +1,48 @@ +/* + * This file is part of Hopsworks + * Copyright (C) 2024, Hopsworks AB. All rights reserved + * + * Hopsworks is free software: you can redistribute it and/or modify it under the terms of + * the GNU Affero General Public License as published by the Free Software Foundation, + * either version 3 of the License, or (at your option) any later version. + * + * Hopsworks is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; + * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR + * PURPOSE. See the GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License along with this program. + * If not, see . 
+ */ + +package io.hops.hopsworks.common.arrowflight; + +import com.fasterxml.jackson.annotation.JsonProperty; +import lombok.AllArgsConstructor; +import lombok.NoArgsConstructor; +import lombok.Getter; +import lombok.Setter; +import java.util.List; +import java.util.Map; + +@AllArgsConstructor +@NoArgsConstructor +public class ArrowFlightQueryDTO { + + @Getter + @Setter + @JsonProperty(value = "query_string") + private String queryString; + @Getter + @Setter + @JsonProperty(value = "features") + private Map> features; + @Getter + @Setter + @JsonProperty(value = "filters") + private String filters; + @Getter + @Setter + @JsonProperty(value = "connectors") + private Map connectors; + +} diff --git a/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/featuregroup/FeaturegroupController.java b/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/featuregroup/FeaturegroupController.java index 7a264452e1..7b82b1fcd5 100644 --- a/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/featuregroup/FeaturegroupController.java +++ b/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/featuregroup/FeaturegroupController.java @@ -18,6 +18,7 @@ import com.google.common.base.Strings; import com.logicalclocks.servicediscoverclient.exceptions.ServiceDiscoveryException; +import io.hops.hopsworks.common.arrowflight.ArrowFlightController; import io.hops.hopsworks.common.commands.featurestore.search.SearchFSCommandLogger; import io.hops.hopsworks.common.featurestore.FeaturestoreController; import io.hops.hopsworks.common.featurestore.activity.FeaturestoreActivityFacade; @@ -37,6 +38,8 @@ import io.hops.hopsworks.common.featurestore.featuregroup.stream.StreamFeatureGroupController; import io.hops.hopsworks.common.featurestore.featuregroup.stream.StreamFeatureGroupDTO; import io.hops.hopsworks.common.featurestore.online.OnlineFeaturestoreController; +import 
io.hops.hopsworks.common.featurestore.query.ConstructorController; +import io.hops.hopsworks.common.featurestore.query.Feature; import io.hops.hopsworks.common.featurestore.statistics.StatisticsController; import io.hops.hopsworks.common.featurestore.statistics.columns.StatisticColumnController; import io.hops.hopsworks.common.featurestore.storageconnectors.FeaturestoreStorageConnectorController; @@ -69,6 +72,13 @@ import io.hops.hopsworks.persistence.entity.project.Project; import io.hops.hopsworks.persistence.entity.user.Users; import io.hops.hopsworks.restutils.RESTCodes; +import org.apache.calcite.sql.SqlDialect; +import org.apache.calcite.sql.SqlIdentifier; +import org.apache.calcite.sql.SqlLiteral; +import org.apache.calcite.sql.SqlNodeList; +import org.apache.calcite.sql.SqlSelect; +import org.apache.calcite.sql.dialect.HiveSqlDialect; +import org.apache.calcite.sql.parser.SqlParserPos; import javax.ejb.EJB; import javax.ejb.Stateless; @@ -79,6 +89,7 @@ import java.nio.file.Paths; import java.sql.SQLException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Date; import java.util.List; import java.util.logging.Level; @@ -136,6 +147,10 @@ public class FeaturegroupController { private SearchFSCommandLogger searchCommandLogger; @EJB private EmbeddingController embeddingController; + @EJB + private ConstructorController constructorController; + @EJB + protected ArrowFlightController arrowFlightController; /** * Gets all featuregroups for a particular featurestore and project, using the userCerts to query Hive @@ -906,7 +921,6 @@ private void enforceFeaturegroupQuotas(Featurestore featurestore, FeaturegroupDT * @param featuregroup of the featuregroup to preview * @param project the project the user is operating from, in case of shared feature store * @param user the user making the request - * @param partition the selected partition if any as represented in the PARTITIONS_METASTORE * @param online whether to show preview from the online 
feature store * @param limit the number of rows to visualize * @return A DTO with the first 20 feature rows of the online and offline tables. @@ -915,19 +929,78 @@ private void enforceFeaturegroupQuotas(Featurestore featurestore, FeaturegroupDT * @throws HopsSecurityException */ public FeaturegroupPreview getFeaturegroupPreview(Featuregroup featuregroup, Project project, - Users user, String partition, boolean online, int limit) + Users user, boolean online, int limit) throws SQLException, FeaturestoreException, HopsSecurityException { if (online && featuregroup.isOnlineEnabled()) { return onlineFeaturegroupController.getFeaturegroupPreview(featuregroup, project, user, limit); } else if (online) { throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.FEATUREGROUP_NOT_ONLINE, Level.FINE); - } else if (featuregroup.getFeaturegroupType() == FeaturegroupType.ON_DEMAND_FEATURE_GROUP) { - throw new FeaturestoreException( - RESTCodes.FeaturestoreErrorCode.PREVIEW_NOT_SUPPORTED_FOR_ON_DEMAND_FEATUREGROUPS, - Level.FINE, "Preview for offline storage of external feature groups is not supported", - "featuregroupId: " + featuregroup.getId()); + } else if (settings.isFlyingduckEnabled()) { + // use flying duck for offline fs + arrowFlightController.checkFeatureGroupSupportedByArrowFlight(featuregroup); + + String tbl = getTblName(featuregroup); + if (featuregroup.getFeaturegroupType() != FeaturegroupType.ON_DEMAND_FEATURE_GROUP) { + String db = featuregroup.getFeaturestore().getProject().getName().toLowerCase(); + tbl = db + "." 
+ tbl; + } + String query = arrowFlightController.getArrowFlightQuery(featuregroup, project, user, tbl, limit); + return arrowFlightController.executeReadArrowFlightQuery(query, project, user); } else { - return cachedFeaturegroupController.getOfflineFeaturegroupPreview(featuregroup, project, user, partition, limit); + // use hive for offline fs + if (featuregroup.getFeaturegroupType() == FeaturegroupType.ON_DEMAND_FEATURE_GROUP) { + throw new FeaturestoreException( + RESTCodes.FeaturestoreErrorCode.PREVIEW_NOT_SUPPORTED_FOR_ON_DEMAND_FEATUREGROUPS, + Level.FINE, "Preview for offline storage of external feature groups is not supported", + "featuregroupId: " + featuregroup.getId()); + } else { + String tbl = getTblName(featuregroup); + String query = getOfflineFeaturegroupQuery(featuregroup, project, user, tbl, limit); + String db = featurestoreController.getOfflineFeaturestoreDbName(featuregroup.getFeaturestore().getProject()); + return cachedFeaturegroupController.executeReadHiveQuery(query, db, project, user); + } } } + + /** + * Builds the Hive SELECT ... LIMIT query used to preview the offline data of a given featuregroup + * + * @param featuregroup the featuregroup to fetch + * @param project the project the user is operating from, in case of shared feature store + * @param user the user making the request + * @param tbl table name + * @param limit number of samples to fetch + * @return the SQL query string selecting the featuregroup's features from the given table + * @throws FeaturestoreException + */ + public String getOfflineFeaturegroupQuery(Featuregroup featuregroup, Project project, + Users user, String tbl, int limit) + throws FeaturestoreException { + + List features = getFeatures(featuregroup, project, user); + + // This is not great, but at the same time the query runs as the user. 
+ SqlNodeList selectList = new SqlNodeList(SqlParserPos.ZERO); + for (FeatureGroupFeatureDTO feature : features) { + if (feature.getDefaultValue() == null) { + selectList.add(new SqlIdentifier(Arrays.asList("`" + tbl + "`", "`" + feature.getName() + "`"), + SqlParserPos.ZERO)); + } else { + selectList.add(constructorController.selectWithDefaultAs(new Feature(feature, tbl), false)); + } + } + + SqlSelect select = new SqlSelect(SqlParserPos.ZERO, null, selectList, + new SqlIdentifier("`" + tbl + "`", SqlParserPos.ZERO), null, + null, null, null, null, null, + SqlLiteral.createExactNumeric(String.valueOf(limit), SqlParserPos.ZERO), null); + + return select.toSqlString(new HiveSqlDialect(SqlDialect.EMPTY_CONTEXT)).getSql(); + } + + public static boolean isTimeTravelEnabled(Featuregroup featuregroup) { + return (featuregroup.getFeaturegroupType() == FeaturegroupType.CACHED_FEATURE_GROUP && + featuregroup.getCachedFeaturegroup().getTimeTravelFormat() == TimeTravelFormat.HUDI) || + featuregroup.getFeaturegroupType() == FeaturegroupType.STREAM_FEATURE_GROUP; + } } diff --git a/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/featuregroup/cached/CachedFeaturegroupController.java b/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/featuregroup/cached/CachedFeaturegroupController.java index eb61e5f9eb..74e31f5512 100644 --- a/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/featuregroup/cached/CachedFeaturegroupController.java +++ b/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/featuregroup/cached/CachedFeaturegroupController.java @@ -16,7 +16,6 @@ package io.hops.hopsworks.common.featurestore.featuregroup.cached; -import com.google.common.base.Strings; import com.logicalclocks.servicediscoverclient.exceptions.ServiceDiscoveryException; import io.hops.hopsworks.common.featurestore.FeaturestoreController; import io.hops.hopsworks.common.featurestore.activity.FeaturestoreActivityFacade; @@ -27,7 
+26,6 @@ import io.hops.hopsworks.common.featurestore.featuregroup.FeaturegroupFacade; import io.hops.hopsworks.common.featurestore.featuregroup.online.OnlineFeaturegroupController; import io.hops.hopsworks.common.featurestore.featuregroup.stream.StreamFeatureGroupDTO; -import io.hops.hopsworks.common.featurestore.query.ConstructorController; import io.hops.hopsworks.common.featurestore.query.Feature; import io.hops.hopsworks.common.featurestore.utils.FeaturestoreUtils; import io.hops.hopsworks.common.hdfs.Utils; @@ -53,15 +51,6 @@ import io.hops.hopsworks.persistence.entity.project.Project; import io.hops.hopsworks.persistence.entity.user.Users; import io.hops.hopsworks.restutils.RESTCodes; -import org.apache.calcite.sql.SqlDialect; -import org.apache.calcite.sql.SqlIdentifier; -import org.apache.calcite.sql.SqlLiteral; -import org.apache.calcite.sql.SqlNode; -import org.apache.calcite.sql.SqlNodeList; -import org.apache.calcite.sql.SqlSelect; -import org.apache.calcite.sql.dialect.HiveSqlDialect; -import org.apache.calcite.sql.fun.SqlStdOperatorTable; -import org.apache.calcite.sql.parser.SqlParserPos; import javax.annotation.PostConstruct; import javax.ejb.EJB; @@ -109,8 +98,6 @@ public class CachedFeaturegroupController { @EJB private HiveController hiveController; @EJB - private ConstructorController constructorController; - @EJB private FeaturestoreUtils featurestoreUtils; @EJB private FeaturestoreActivityFacade fsActivityFacade; @@ -170,89 +157,6 @@ private Connection initConnection(String databaseName, Project project, Users us "project: " + project.getName() + ", hive database: " + databaseName, e.getMessage(), e); } } - - /** - * Previews the offline data of a given featuregroup by doing a SELECT LIMIT query on the Hive Table - * - * @param featuregroup the featuregroup to fetch - * @param project the project the user is operating from, in case of shared feature store - * @param user the user making the request - * @param limit number of sample to fetch - 
* @return list of feature-rows from the Hive table where the featuregroup is stored - * @throws SQLException - * @throws FeaturestoreException - * @throws HopsSecurityException - */ - public FeaturegroupPreview getOfflineFeaturegroupPreview(Featuregroup featuregroup, Project project, - Users user, String partition, int limit) - throws FeaturestoreException, HopsSecurityException, SQLException { - String tbl = featuregroupController.getTblName(featuregroup.getName(), featuregroup.getVersion()); - - List features = featuregroupController.getFeatures(featuregroup, project, user); - - // This is not great, but at the same time the query runs as the user. - SqlNodeList selectList = new SqlNodeList(SqlParserPos.ZERO); - for (FeatureGroupFeatureDTO feature : features) { - if (feature.getDefaultValue() == null) { - selectList.add(new SqlIdentifier(Arrays.asList("`" + tbl + "`", "`" + feature.getName() + "`"), - SqlParserPos.ZERO)); - } else { - selectList.add(constructorController.selectWithDefaultAs(new Feature(feature, tbl), false)); - } - } - - SqlNode whereClause = getWhereCondition(partition, features); - - SqlSelect select = new SqlSelect(SqlParserPos.ZERO, null, selectList, - new SqlIdentifier("`" + tbl + "`", SqlParserPos.ZERO), - whereClause, null, null, null, null, null, - SqlLiteral.createExactNumeric(String.valueOf(limit), SqlParserPos.ZERO), null); - String db = featurestoreController.getOfflineFeaturestoreDbName(featuregroup.getFeaturestore().getProject()); - try { - return executeReadHiveQuery( - select.toSqlString(new HiveSqlDialect(SqlDialect.EMPTY_CONTEXT)).getSql(), db, project, user); - } catch(Exception e) { - return executeReadHiveQuery( - select.toSqlString(new HiveSqlDialect(SqlDialect.EMPTY_CONTEXT)).getSql(), db, project, user); - } - } - - public SqlNode getWhereCondition(String partition, List features) - throws FeaturestoreException { - if (Strings.isNullOrEmpty(partition)) { - // user didn't ask for a specific partition - return null; - } - - 
// partition names are separated by /, column=VALUE/column=VALUE - SqlNodeList whereClauses = new SqlNodeList(SqlParserPos.ZERO); - String[] splits = partition.split("/"); - for (String split : splits) { - int posEqual = split.indexOf("="); - String column = split.substring(0, posEqual); - FeatureGroupFeatureDTO partitionFeature = features.stream() - .filter(FeatureGroupFeatureDTO::getPartition) - .filter(feature -> feature.getName().equals(column)) - .findFirst().orElseThrow(() -> - new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.ILLEGAL_FEATURE_NAME, Level.FINE, - "The selected partition column: " + column + " was not found among the partition columns of the feature " + - "group.")); - SqlNode value; - if (partitionFeature.getType().equalsIgnoreCase("string")) { - value = SqlLiteral.createCharString(split.substring(posEqual + 1), SqlParserPos.ZERO); - } else { - value = new SqlIdentifier(split.substring(posEqual + 1), SqlParserPos.ZERO); - } - whereClauses.add(SqlStdOperatorTable.EQUALS.createCall( - SqlParserPos.ZERO, - new SqlIdentifier("`" + column + "`", SqlParserPos.ZERO), - value)); - } - if (whereClauses.size() == 1) { - return whereClauses; - } - return SqlStdOperatorTable.AND.createCall(whereClauses); - } /** * Persists a cached feature group @@ -382,7 +286,7 @@ public void deleteFeatureGroup(Featuregroup featuregroup, Project project, Users * @throws HopsSecurityException * @throws FeaturestoreException */ - private FeaturegroupPreview executeReadHiveQuery(String query, String databaseName, Project project, Users user) + public FeaturegroupPreview executeReadHiveQuery(String query, String databaseName, Project project, Users user) throws SQLException, FeaturestoreException, HopsSecurityException { Connection conn = null; Statement stmt = null; diff --git a/hopsworks-common/src/main/java/io/hops/hopsworks/common/project/ProjectController.java b/hopsworks-common/src/main/java/io/hops/hopsworks/common/project/ProjectController.java index 
23d3d67b8b..c07a38e1f2 100644 --- a/hopsworks-common/src/main/java/io/hops/hopsworks/common/project/ProjectController.java +++ b/hopsworks-common/src/main/java/io/hops/hopsworks/common/project/ProjectController.java @@ -2398,12 +2398,16 @@ private AccessCredentialsDTO getAccessCredentials(Project project, Users user) public AccessCredentialsDTO credentials(Integer projectId, Users user) throws ProjectException, DatasetException { Project project = findProjectById(projectId); + return credentials(project, user); + } + + public AccessCredentialsDTO credentials(Project project, Users user) throws ProjectException, DatasetException { try { return getAccessCredentials(project, user); } catch (Exception ex) { LOGGER.log(Level.SEVERE, null, ex); throw new DatasetException(RESTCodes.DatasetErrorCode.DOWNLOAD_ERROR, Level.SEVERE, - "Failed to download credentials for projectId: " + projectId, ex.getMessage(), ex); + "Failed to download credentials for projectId: " + project.getId(), ex.getMessage(), ex); } finally { certificateMaterializer.removeCertificatesLocal(user.getUsername(), project.getName()); } diff --git a/hopsworks-common/src/main/java/io/hops/hopsworks/common/util/Settings.java b/hopsworks-common/src/main/java/io/hops/hopsworks/common/util/Settings.java index e5aed6e2bc..0de1ff2528 100644 --- a/hopsworks-common/src/main/java/io/hops/hopsworks/common/util/Settings.java +++ b/hopsworks-common/src/main/java/io/hops/hopsworks/common/util/Settings.java @@ -383,6 +383,9 @@ public class Settings implements Serializable { /*----------------------- Python ------------------------*/ private final static String VARIABLE_MAX_ENV_YML_BYTE_SIZE = "max_env_yml_byte_size"; + /*----------------------- Flyingduck ------------------------*/ + private final static String VARIABLE_ENABLE_FLYINGDUCK = "enable_flyingduck"; + //Git private static final String VARIABLE_GIT_IMAGE_VERSION = "git_image_version"; private static final String VARIABLE_GIT_COMMAND_TIMEOUT_MINUTES_DEFAULT = 
"git_command_timeout_minutes"; @@ -936,6 +939,9 @@ private void populateCache() { ENABLE_JUPYTER_PYTHON_KERNEL_NON_KUBERNETES = setBoolVar(VARIABLE_ENABLE_JUPYTER_PYTHON_KERNEL_NON_KUBERNETES, ENABLE_JUPYTER_PYTHON_KERNEL_NON_KUBERNETES); + ENABLE_FLYINGDUCK = setBoolVar(VARIABLE_ENABLE_FLYINGDUCK, + ENABLE_FLYINGDUCK); + MAX_LONG_RUNNING_HTTP_REQUESTS = setIntVar(VARIABLE_MAX_LONG_RUNNING_HTTP_REQUESTS, MAX_LONG_RUNNING_HTTP_REQUESTS); @@ -3193,6 +3199,13 @@ public synchronized boolean isPythonKernelEnabled() { return ENABLE_JUPYTER_PYTHON_KERNEL_NON_KUBERNETES; } + private boolean ENABLE_FLYINGDUCK = false; + + public synchronized boolean isFlyingduckEnabled() { + checkCache(); + return ENABLE_FLYINGDUCK; + } + //These dependencies were collected by installing jupyterlab in a new environment public static final List JUPYTER_DEPENDENCIES = Arrays.asList("urllib3", "chardet", "idna", "requests", "attrs", "zipp", "importlib-metadata", "pyrsistent", "six", "jsonschema", "prometheus-client", "pycparser", diff --git a/hopsworks-common/src/test/io/hops/hopsworks/common/arrowflight/TestArrowFlightController.java b/hopsworks-common/src/test/io/hops/hopsworks/common/arrowflight/TestArrowFlightController.java new file mode 100644 index 0000000000..4316a92870 --- /dev/null +++ b/hopsworks-common/src/test/io/hops/hopsworks/common/arrowflight/TestArrowFlightController.java @@ -0,0 +1,307 @@ +/* + * This file is part of Hopsworks + * Copyright (C) 2024, Hopsworks AB. All rights reserved + * + * Hopsworks is free software: you can redistribute it and/or modify it under the terms of + * the GNU Affero General Public License as published by the Free Software Foundation, + * either version 3 of the License, or (at your option) any later version. + * + * Hopsworks is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; + * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR + * PURPOSE. 
See the GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License along with this program. + * If not, see . + */ + +package io.hops.hopsworks.common.arrowflight; + +import io.hops.hopsworks.common.featurestore.feature.FeatureGroupFeatureDTO; +import io.hops.hopsworks.common.featurestore.featuregroup.FeaturegroupController; +import io.hops.hopsworks.common.featurestore.storageconnectors.StorageConnectorUtil; +import io.hops.hopsworks.exceptions.FeaturestoreException; +import io.hops.hopsworks.persistence.entity.featurestore.Featurestore; +import io.hops.hopsworks.persistence.entity.featurestore.featuregroup.Featuregroup; +import io.hops.hopsworks.persistence.entity.featurestore.featuregroup.FeaturegroupType; +import io.hops.hopsworks.persistence.entity.featurestore.featuregroup.ondemand.OnDemandFeaturegroup; +import io.hops.hopsworks.persistence.entity.featurestore.storageconnector.FeaturestoreConnector; +import io.hops.hopsworks.persistence.entity.featurestore.storageconnector.FeaturestoreConnectorType; +import io.hops.hopsworks.persistence.entity.featurestore.storageconnector.bigquery.FeatureStoreBigqueryConnector; +import io.hops.hopsworks.persistence.entity.featurestore.storageconnector.snowflake.FeaturestoreSnowflakeConnector; +import io.hops.hopsworks.persistence.entity.project.Project; +import io.hops.hopsworks.persistence.entity.user.security.secrets.Secret; +import org.apache.calcite.sql.SqlDialect; +import org.apache.calcite.sql.dialect.HiveSqlDialect; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.mockito.Mockito; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.Mockito.doReturn; + +public class 
TestArrowFlightController { + + private ArrowFlightController arrowFlightController = Mockito.spy(ArrowFlightController.class); + + List features = new ArrayList<>(); + + @Rule + public ExpectedException thrown = ExpectedException.none(); + + @Before + public void setup() { + arrowFlightController.storageConnectorUtil = Mockito.mock(StorageConnectorUtil.class); + arrowFlightController.featuregroupController = Mockito.mock(FeaturegroupController.class); + + features = new ArrayList<>(); + features.add(new FeatureGroupFeatureDTO("feature", "Integer", "", true, false, "10", null)); + features.add(new FeatureGroupFeatureDTO("feature2", "String", "", false, false, null, null)); + } + + @Test + public void testGetArrowFlightQueryCachedFg() throws Exception { + // Arrange + doReturn("SELECT 'id' from 'project.fg_1'").when(arrowFlightController.featuregroupController) + .getOfflineFeaturegroupQuery(any(), any(), any(), any(), anyInt()); + + doReturn(Collections.singletonList(new FeatureGroupFeatureDTO("id", "int"))) + .when(arrowFlightController.featuregroupController).getFeatures(any(), any(), any()); + + doReturn("fg_1").when(arrowFlightController.featuregroupController).getTblName(any()); + + Project project = new Project(); + project.setName("project"); + + Featurestore featurestore = new Featurestore(); + featurestore.setProject(project); + + Featuregroup fg = new Featuregroup(); + fg.setFeaturegroupType(FeaturegroupType.CACHED_FEATURE_GROUP); + fg.setName("fg"); + fg.setVersion(1); + fg.setFeaturestore(featurestore); + + // Act + String result = arrowFlightController.getArrowFlightQuery(fg, null, null, "project.fg_1", 1); + + // Assert + Assert.assertEquals("{" + + "\"query_string\":\"SELECT 'id' from 'project.fg_1'\"," + + "\"features\":{\"project.fg_1\":[\"id\"]}," + + "\"filters\":null," + + "\"connectors\":{\"project.fg_1\":{" + + "\"type\":\"hudi\"," + + "\"options\":null," + + "\"query\":null," + + "\"alias\":null," + + "\"filters\":null" + + "}}" + + "}", 
result); + } + + @Test + public void testGetArrowFlightQueryStreamFg() throws Exception { + // Arrange + doReturn("SELECT 'id' from 'project.fg_1'").when(arrowFlightController.featuregroupController) + .getOfflineFeaturegroupQuery(any(), any(), any(), any(), anyInt()); + + doReturn(Collections.singletonList(new FeatureGroupFeatureDTO("id", "int"))) + .when(arrowFlightController.featuregroupController).getFeatures(any(), any(), any()); + + doReturn("fg_1").when(arrowFlightController.featuregroupController).getTblName(any()); + + Project project = new Project(); + project.setName("project"); + + Featurestore featurestore = new Featurestore(); + featurestore.setProject(project); + + Featuregroup fg = new Featuregroup(); + fg.setFeaturegroupType(FeaturegroupType.STREAM_FEATURE_GROUP); + fg.setName("fg"); + fg.setVersion(1); + fg.setFeaturestore(featurestore); + + // Act + String result = arrowFlightController.getArrowFlightQuery(fg, null, null, "project.fg_1", 1); + + // Assert + Assert.assertEquals("{" + + "\"query_string\":\"SELECT 'id' from 'project.fg_1'\"," + + "\"features\":{\"project.fg_1\":[\"id\"]}," + + "\"filters\":null," + + "\"connectors\":{\"project.fg_1\":{" + + "\"type\":\"hudi\"," + + "\"options\":null," + + "\"query\":null," + + "\"alias\":null," + + "\"filters\":null" + + "}}" + + "}", result); + } + + @Test + public void testGetArrowFlightQueryOndemandFgSnowflake() throws Exception { + // Arrange + doReturn("SELECT 'id' from 'fg_1'").when(arrowFlightController.featuregroupController) + .getOfflineFeaturegroupQuery(any(), any(), any(), any(), anyInt()); + + doReturn(Collections.singletonList(new FeatureGroupFeatureDTO("id", "int"))) + .when(arrowFlightController.featuregroupController).getFeatures(any(), any(), any()); + + String tbl = "fg_1"; + doReturn(tbl).when(arrowFlightController.featuregroupController).getTblName(any()); + + doReturn("test_password").when(arrowFlightController.storageConnectorUtil).getSecret(any(), any()); + + Project project 
= new Project(); + project.setName("project"); + + Featurestore featurestore = new Featurestore(); + featurestore.setProject(project); + + FeaturestoreSnowflakeConnector snowflakeConnector = new FeaturestoreSnowflakeConnector(); + snowflakeConnector.setDatabaseUser("test_user"); + snowflakeConnector.setUrl("https://test_account.snowflakecomputing.com"); + snowflakeConnector.setDatabaseName("test_dbname"); + snowflakeConnector.setDatabaseSchema("test_dbschema"); + snowflakeConnector.setPwdSecret(new Secret()); + snowflakeConnector.setWarehouse("test_warehouse"); + snowflakeConnector.setApplication("test_application"); + + FeaturestoreConnector connector = new FeaturestoreConnector(); + connector.setConnectorType(FeaturestoreConnectorType.SNOWFLAKE); + connector.setSnowflakeConnector(snowflakeConnector); + + OnDemandFeaturegroup onDemandFg = new OnDemandFeaturegroup(); + onDemandFg.setQuery("SELECT * FROM HOUSEHOLD_DEMOGRAPHICS;"); + onDemandFg.setFeaturestoreConnector(connector); + + Featuregroup fg = new Featuregroup(); + fg.setFeaturegroupType(FeaturegroupType.ON_DEMAND_FEATURE_GROUP); + fg.setOnDemandFeaturegroup(onDemandFg); + fg.setName("fg"); + fg.setVersion(1); + fg.setFeaturestore(featurestore); + + // Act + String result = arrowFlightController.getArrowFlightQuery(fg, null, null, tbl, 1); + + // Assert + Assert.assertEquals("{" + + "\"query_string\":\"SELECT 'id' from 'fg_1'\"," + + "\"features\":{\"fg_1\":[\"id\"]}," + + "\"filters\":null," + + "\"connectors\":{\"fg_1\":{" + + "\"type\":\"SNOWFLAKE\"," + + "\"options\":{\"database\":\"test_dbname/test_dbschema\",\"password\":\"test_password\",\"application\":\"test_application\",\"warehouse\":\"test_warehouse\",\"user\":\"test_user\",\"account\":\"test_account\"}," + + "\"query\":\"SELECT * FROM HOUSEHOLD_DEMOGRAPHICS\"," + + "\"alias\":\"fg_1\"," + + "\"filters\":null" + + "}}" + + "}", result); + } + + @Test + public void testGetArrowFlightQueryOndemandFgBigQuery() throws Exception { + // Arrange + 
doReturn("SELECT 'id' from 'fg_1'").when(arrowFlightController.featuregroupController) + .getOfflineFeaturegroupQuery(any(), any(), any(), any(), anyInt()); + + doReturn(Collections.singletonList(new FeatureGroupFeatureDTO("id", "int"))) + .when(arrowFlightController.featuregroupController).getFeatures(any(), any(), any()); + + String tbl = "fg_1"; + doReturn(tbl).when(arrowFlightController.featuregroupController).getTblName(any()); + + Project project = new Project(); + project.setName("project"); + + Featurestore featurestore = new Featurestore(); + featurestore.setProject(project); + + FeatureStoreBigqueryConnector bigqueryConnector = new FeatureStoreBigqueryConnector(); + bigqueryConnector.setKeyPath("test_keypath"); + bigqueryConnector.setQueryProject("test_project"); + bigqueryConnector.setDataset("test_dataset"); + bigqueryConnector.setParentProject("test_parent_project"); + + FeaturestoreConnector connector = new FeaturestoreConnector(); + connector.setConnectorType(FeaturestoreConnectorType.BIGQUERY); + connector.setBigqueryConnector(bigqueryConnector); + + OnDemandFeaturegroup onDemandFg = new OnDemandFeaturegroup(); + onDemandFg.setQuery("SELECT * FROM HOUSEHOLD_DEMOGRAPHICS;"); + onDemandFg.setFeaturestoreConnector(connector); + + Featuregroup fg = new Featuregroup(); + fg.setFeaturegroupType(FeaturegroupType.ON_DEMAND_FEATURE_GROUP); + fg.setOnDemandFeaturegroup(onDemandFg); + fg.setName("fg"); + fg.setVersion(1); + fg.setFeaturestore(featurestore); + + // Act + String result = arrowFlightController.getArrowFlightQuery(fg, null, null, tbl, 1); + + // Assert + Assert.assertEquals("{" + + "\"query_string\":\"SELECT 'id' from 'fg_1'\"," + + "\"features\":{\"fg_1\":[\"id\"]}," + + "\"filters\":null," + + "\"connectors\":{\"fg_1\":{" + + "\"type\":\"BIGQUERY\"," + + "\"options\":{\"parent_project\":\"test_parent_project\",\"project_id\":\"test_project\",\"dataset_id\":\"test_dataset\",\"key_path\":\"test_keypath\"}," + + "\"query\":\"SELECT * FROM 
HOUSEHOLD_DEMOGRAPHICS\"," + + "\"alias\":\"fg_1\"," + + "\"filters\":null" + + "}}" + + "}", result); + } + + @Test + public void testGetArrowFlightQueryOndemandFgBadConnector() throws Exception { + // Arrange + doReturn("SELECT 'id' from 'fg_1'").when(arrowFlightController.featuregroupController) + .getOfflineFeaturegroupQuery(any(), any(), any(), any(), anyInt()); + + doReturn(Collections.singletonList(new FeatureGroupFeatureDTO("id", "int"))) + .when(arrowFlightController.featuregroupController).getFeatures(any(), any(), any()); + + String tbl = "fg_1"; + + Project project = new Project(); + project.setName("project"); + + Featurestore featurestore = new Featurestore(); + featurestore.setProject(project); + + FeaturestoreConnector connector = new FeaturestoreConnector(); + connector.setConnectorType(FeaturestoreConnectorType.S3); + + OnDemandFeaturegroup onDemandFg = new OnDemandFeaturegroup(); + onDemandFg.setQuery("SELECT * FROM HOUSEHOLD_DEMOGRAPHICS;"); + onDemandFg.setFeaturestoreConnector(connector); + + Featuregroup fg = new Featuregroup(); + fg.setFeaturegroupType(FeaturegroupType.ON_DEMAND_FEATURE_GROUP); + fg.setOnDemandFeaturegroup(onDemandFg); + fg.setName("fg"); + fg.setVersion(1); + fg.setFeaturestore(featurestore); + + // Act + thrown.expect(FeaturestoreException.class); + arrowFlightController.getArrowFlightQuery(fg, null, null, tbl, 1); + + // Assert + } +} diff --git a/hopsworks-common/src/test/io/hops/hopsworks/common/featurestore/featuregroup/TestFeatureGroupController.java b/hopsworks-common/src/test/io/hops/hopsworks/common/featurestore/featuregroup/TestFeatureGroupController.java index 69546a8dd8..09e203d62d 100644 --- a/hopsworks-common/src/test/io/hops/hopsworks/common/featurestore/featuregroup/TestFeatureGroupController.java +++ b/hopsworks-common/src/test/io/hops/hopsworks/common/featurestore/featuregroup/TestFeatureGroupController.java @@ -17,7 +17,6 @@ package io.hops.hopsworks.common.featurestore.featuregroup; import 
io.hops.hopsworks.common.featurestore.feature.FeatureGroupFeatureDTO; -import io.hops.hopsworks.common.featurestore.featuregroup.cached.CachedFeaturegroupDTO; import io.hops.hopsworks.exceptions.FeaturestoreException; import org.junit.Before; import org.junit.Rule; @@ -48,5 +47,5 @@ public void testVerifyFeaturesNoDefaultValue() throws Exception { thrown.expect(FeaturestoreException.class); featuregroupController.verifyFeaturesNoDefaultValue(features); } - + } diff --git a/hopsworks-common/src/test/io/hops/hopsworks/common/featurestore/featuregroup/cached/TestCachedFeatureGroupController.java b/hopsworks-common/src/test/io/hops/hopsworks/common/featurestore/featuregroup/cached/TestCachedFeatureGroupController.java index 1ee85ddb44..860ecd4b3e 100644 --- a/hopsworks-common/src/test/io/hops/hopsworks/common/featurestore/featuregroup/cached/TestCachedFeatureGroupController.java +++ b/hopsworks-common/src/test/io/hops/hopsworks/common/featurestore/featuregroup/cached/TestCachedFeatureGroupController.java @@ -21,9 +21,6 @@ import io.hops.hopsworks.common.featurestore.featuregroup.stream.StreamFeatureGroupDTO; import io.hops.hopsworks.exceptions.FeaturestoreException; import io.hops.hopsworks.persistence.entity.featurestore.featuregroup.cached.TimeTravelFormat; -import org.apache.calcite.sql.SqlDialect; -import org.apache.calcite.sql.dialect.HiveSqlDialect; -import org.junit.Assert; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -36,7 +33,6 @@ public class TestCachedFeatureGroupController { List features = new ArrayList<>(); List features2 = new ArrayList<>(); - List partitionFeatures = new ArrayList<>(); private CachedFeaturegroupController cachedFeaturegroupController = new CachedFeaturegroupController(); @@ -52,50 +48,6 @@ public void setup() { features2 = new ArrayList<>(); features2.add(new FeatureGroupFeatureDTO("part_param", "String", "", true, false)); features2.add(new FeatureGroupFeatureDTO("part_param2", "String", "", false, false)); 
- - partitionFeatures.add(new FeatureGroupFeatureDTO("part_param", "Integer", "", false, true)); - partitionFeatures.add(new FeatureGroupFeatureDTO("part_param2", "String", "", false, true)); - partitionFeatures.add(new FeatureGroupFeatureDTO("part_param3", "String", "", false, true)); - } - - @Test - public void testPreviewWhereSingle() throws Exception { - String queryPart = "part_param=3"; - String output = cachedFeaturegroupController.getWhereCondition(queryPart, partitionFeatures) - .toSqlString(new HiveSqlDialect(SqlDialect.EMPTY_CONTEXT)).getSql(); - Assert.assertEquals("`part_param` = 3", output); - } - - @Test - public void testPreviewWhereDouble() throws Exception { - String queryPart = "part_param=3/part_param2=hello"; - String output = cachedFeaturegroupController.getWhereCondition(queryPart, partitionFeatures) - .toSqlString(new HiveSqlDialect(SqlDialect.EMPTY_CONTEXT)).getSql(); - Assert.assertEquals("`part_param` = 3 AND `part_param2` = 'hello'", output); - } - - @Test - public void testPreviewWhereDoubleSpace() throws Exception { - String queryPart = "part_param2=3 4/part_param3=hello"; - String output = cachedFeaturegroupController.getWhereCondition(queryPart, partitionFeatures) - .toSqlString(new HiveSqlDialect(SqlDialect.EMPTY_CONTEXT)).getSql(); - Assert.assertEquals("`part_param2` = '3 4' AND `part_param3` = 'hello'", output); - } - - @Test - public void testPreviewWhereDoubleEquals() throws Exception { - String queryPart = "part_param2=3=4/part_param3=hello"; - String output = cachedFeaturegroupController.getWhereCondition(queryPart, partitionFeatures) - .toSqlString(new HiveSqlDialect(SqlDialect.EMPTY_CONTEXT)).getSql(); - Assert.assertEquals("`part_param2` = '3=4' AND `part_param3` = 'hello'", output); - } - - @Test - public void testPreviewWhereNoPartitionColumn() throws Exception { - String queryPart = "part_param=3"; - thrown.expect(FeaturestoreException.class); - String output = cachedFeaturegroupController.getWhereCondition(queryPart, 
features) - .toSqlString(new HiveSqlDialect(SqlDialect.EMPTY_CONTEXT)).getSql(); } @Test diff --git a/hopsworks-rest-utils/src/main/java/io/hops/hopsworks/restutils/RESTCodes.java b/hopsworks-rest-utils/src/main/java/io/hops/hopsworks/restutils/RESTCodes.java index 6ef1542abf..15869c952d 100644 --- a/hopsworks-rest-utils/src/main/java/io/hops/hopsworks/restutils/RESTCodes.java +++ b/hopsworks-rest-utils/src/main/java/io/hops/hopsworks/restutils/RESTCodes.java @@ -1353,7 +1353,7 @@ public enum FeaturestoreErrorCode implements RESTErrorCode { Response.Status.BAD_REQUEST), CERTIFICATES_NOT_FOUND(18, "Could not find user certificates for authenticating with Hive Feature Store", Response.Status.INTERNAL_SERVER_ERROR), - COULD_NOT_INITIATE_HIVE_CONNECTION(19, "Could not initiate connecton to Hive Server", + COULD_NOT_INITIATE_HIVE_CONNECTION(19, "Could not initiate connection to Hive Server", Response.Status.INTERNAL_SERVER_ERROR), HIVE_UPDATE_STATEMENT_ERROR(20, "Hive Update Statement failed", Response.Status.INTERNAL_SERVER_ERROR), @@ -1415,7 +1415,7 @@ public enum FeaturestoreErrorCode implements RESTErrorCode { FEATURESTORE_ONLINE_NOT_ENABLED(64, "Online featurestore not enabled", Response.Status.BAD_REQUEST), SYNC_TABLE_NOT_FOUND(65, "The Hive Table to Sync with the feature store was not " + "found in the metastore", Response.Status.BAD_REQUEST), - COULD_NOT_INITIATE_MYSQL_CONNECTION_TO_ONLINE_FEATURESTORE(66, "Could not initiate connecton to " + + COULD_NOT_INITIATE_MYSQL_CONNECTION_TO_ONLINE_FEATURESTORE(66, "Could not initiate connection to " + "MySQL Server", Response.Status.INTERNAL_SERVER_ERROR), MYSQL_JDBC_UPDATE_STATEMENT_ERROR(67, "MySQL JDBC Update Statement failed", Response.Status.INTERNAL_SERVER_ERROR), @@ -1680,6 +1680,15 @@ public enum FeaturestoreErrorCode implements RESTErrorCode { HELPER_COL_NOT_FOUND(225, "Could not find helper column in feature view schema", Response.Status.NOT_FOUND), OPENSEARCH_DEFAULT_EMBEDDING_INDEX_SUFFIX_NOT_DEFINED(226, 
"Opensearch default embedding index not defined", + Response.Status.INTERNAL_SERVER_ERROR), + FEATURE_GROUP_COMMIT_NOT_FOUND(227, "Feature group commit not found", Response.Status.BAD_REQUEST), + STATISTICS_NOT_FOUND(228, "Statistics wasn't found.", Response.Status.NOT_FOUND), + INVALID_STATISTICS_WINDOW_TIMES(229, "Window times provided are invalid", Response.Status.BAD_REQUEST), + COULD_NOT_DELETE_VECTOR_DB_INDEX(230, "Could not delete index from vector db.", + Response.Status.INTERNAL_SERVER_ERROR), + COULD_NOT_INITIATE_ARROW_FLIGHT_CONNECTION(231, "Could not initiate connection to Arrow Flight server", + Response.Status.INTERNAL_SERVER_ERROR), + ARROW_FLIGHT_READ_QUERY_ERROR(232, "Arrow Flight server Read Query failed", Response.Status.INTERNAL_SERVER_ERROR); private int code; diff --git a/hopsworks-service-discovery/src/main/java/io/hops/hopsworks/servicediscovery/tags/FlyingDuckTags.java b/hopsworks-service-discovery/src/main/java/io/hops/hopsworks/servicediscovery/tags/FlyingDuckTags.java index e3780ab2fd..58eb05f3aa 100644 --- a/hopsworks-service-discovery/src/main/java/io/hops/hopsworks/servicediscovery/tags/FlyingDuckTags.java +++ b/hopsworks-service-discovery/src/main/java/io/hops/hopsworks/servicediscovery/tags/FlyingDuckTags.java @@ -16,7 +16,8 @@ package io.hops.hopsworks.servicediscovery.tags; public enum FlyingDuckTags implements ServiceTags { - monitoring("monitoring"); + monitoring("monitoring"), + server("server"); private final String value; FlyingDuckTags(String value) { diff --git a/pom.xml b/pom.xml index ad4b5e24fe..4421b1cc6d 100644 --- a/pom.xml +++ b/pom.xml @@ -137,7 +137,7 @@ 20231013 2.5.2 1.18.6 - 3.9.1 + 3.23.1 0.9.12 0.16.0 0.6-SNAPSHOT @@ -148,6 +148,7 @@ 1.6.2 3.8.3 9.2.1 + 14.0.1 @@ -226,7 +227,6 @@ com.google.protobuf protobuf-java ${protobuf-java.version} - provided com.google.zxing @@ -521,6 +521,10 @@ io.netty netty-common + + com.google.flatbuffers + flatbuffers-java + @@ -868,6 +872,11 @@ javax.json.bind-api 1.0 + + 
org.apache.arrow + flight-core + ${arrow.version} +