From 10dfc992d6728ed57d8fea07252fd181bf0509b5 Mon Sep 17 00:00:00 2001 From: Samrat002 Date: Tue, 10 Jan 2023 19:34:07 +0530 Subject: [PATCH] [FLINK-30481][FLIP-277] GlueCatalog Implementation --- flink-catalog-aws-glue/pom.xml | 120 ++ .../flink/table/catalog/glue/GlueCatalog.java | 1030 +++++++++++ .../catalog/glue/GlueCatalogConstants.java | 249 +++ .../catalog/glue/GlueCatalogFactory.java | 71 + .../glue/GlueCatalogFactoryOptions.java | 37 + .../catalog/glue/GlueCatalogOptions.java | 157 ++ .../catalog/glue/util/AwsClientFactories.java | 56 + .../catalog/glue/util/AwsClientFactory.java | 44 + .../catalog/glue/util/AwsProperties.java | 241 +++ .../table/catalog/glue/util/GlueOperator.java | 1639 +++++++++++++++++ .../org.apache.flink.table.factories.Factory | 16 + .../flink/table/catalog/GlueCatalogTest.java | 22 + pom.xml | 2 +- 13 files changed, 3683 insertions(+), 1 deletion(-) create mode 100644 flink-catalog-aws-glue/pom.xml create mode 100644 flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/GlueCatalog.java create mode 100644 flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/GlueCatalogConstants.java create mode 100644 flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/GlueCatalogFactory.java create mode 100644 flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/GlueCatalogFactoryOptions.java create mode 100644 flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/GlueCatalogOptions.java create mode 100644 flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/util/AwsClientFactories.java create mode 100644 flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/util/AwsClientFactory.java create mode 100644 flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/util/AwsProperties.java create mode 100644 flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/util/GlueOperator.java 
create mode 100644 flink-catalog-aws-glue/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory create mode 100644 flink-catalog-aws-glue/src/test/java/org/apache/flink/table/catalog/GlueCatalogTest.java diff --git a/flink-catalog-aws-glue/pom.xml b/flink-catalog-aws-glue/pom.xml new file mode 100644 index 000000000..71896f5e7 --- /dev/null +++ b/flink-catalog-aws-glue/pom.xml @@ -0,0 +1,120 @@ + + + + + 4.0.0 + + + org.apache.flink + flink-connector-aws-parent + 4.1-SNAPSHOT + + + flink-catalog-aws-glue + Flink : Catalog : AWS : Glue + jar + + + + + org.apache.flink + flink-table-api-java + ${flink.version} + provided + + + + org.apache.flink + flink-connector-aws-base + ${project.version} + + + + software.amazon.awssdk + glue + ${aws.sdkv2.version} + + + + software.amazon.awssdk + apache-client + ${aws.sdkv2.version} + + + + software.amazon.awssdk + url-connection-client + ${aws.sdkv2.version} + + + + + + org.apache.flink + flink-architecture-tests-test + test + + + org.apache.flink + flink-table-common + ${flink.version} + test-jar + test + + + + org.apache.flink + flink-table-api-java + ${flink.version} + test-jar + test + + + + + + + org.apache.maven.plugins + maven-shade-plugin + + + shade-flink + package + + shade + + + + + org.apache.flink:flink-connector-base + org.apache.flink:flink-connector-aws-base + org.apache.flink:flink-catalog-aws-glue + software.amazon.awssdk:* + + + + + + + + + diff --git a/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/GlueCatalog.java b/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/GlueCatalog.java new file mode 100644 index 000000000..3f089abbe --- /dev/null +++ b/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/GlueCatalog.java @@ -0,0 +1,1030 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog.glue; + +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.table.catalog.AbstractCatalog; +import org.apache.flink.table.catalog.CatalogBaseTable; +import org.apache.flink.table.catalog.CatalogDatabase; +import org.apache.flink.table.catalog.CatalogFunction; +import org.apache.flink.table.catalog.CatalogPartition; +import org.apache.flink.table.catalog.CatalogPartitionImpl; +import org.apache.flink.table.catalog.CatalogPartitionSpec; +import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.CatalogView; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.ResolvedCatalogTable; +import org.apache.flink.table.catalog.ResolvedCatalogView; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; +import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.FunctionNotExistException; +import 
org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException; +import org.apache.flink.table.catalog.exceptions.PartitionNotExistException; +import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException; +import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.TableNotExistException; +import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException; +import org.apache.flink.table.catalog.exceptions.TablePartitionedException; +import org.apache.flink.table.catalog.glue.util.AwsClientFactories; +import org.apache.flink.table.catalog.glue.util.AwsProperties; +import org.apache.flink.table.catalog.glue.util.GlueOperator; +import org.apache.flink.table.catalog.stats.CatalogColumnStatistics; +import org.apache.flink.table.catalog.stats.CatalogTableStatistics; +import org.apache.flink.table.expressions.Expression; +import org.apache.flink.table.functions.FunctionIdentifier; +import org.apache.flink.util.StringUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.services.glue.GlueClient; +import software.amazon.awssdk.services.glue.model.Partition; +import software.amazon.awssdk.services.glue.model.Table; + +import java.util.HashMap; +import java.util.List; +import java.util.Locale; +import java.util.Map; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** A Glue catalog implementation that uses glue catalog. */ +public class GlueCatalog extends AbstractCatalog { + + /** instance of GlueOperator to facilitate glue related actions. */ + public GlueOperator glueOperator; + + /** Default database name if not passed as part of catalog. 
*/
+    public static final String DEFAULT_DB = "default";
+
+    private static final Logger LOG = LoggerFactory.getLogger(GlueCatalog.class);
+
+    /**
+     * Creates a catalog whose metadata operations are delegated to a {@link GlueOperator}.
+     *
+     * @param catalogName name of this catalog, passed on to {@link AbstractCatalog}
+     * @param databaseName default database name, passed on to {@link AbstractCatalog}
+     * @param glueProperties catalog configuration; must not be null
+     */
+    public GlueCatalog(String catalogName, String databaseName, ReadableConfig glueProperties) {
+        super(catalogName, databaseName);
+        checkNotNull(glueProperties, "glueProperties cannot be null");
+
+        // Location URI used at database level; an empty string when the option is not set.
+        String locationUri =
+                glueProperties.getOptional(GlueCatalogOptions.LOCATION_KEY).orElse("");
+
+        // initialize aws client factories
+        AwsProperties awsProperties = new AwsProperties(glueProperties);
+
+        // create glue client
+        GlueClient glueClient = AwsClientFactories.factory(awsProperties).glue();
+        this.glueOperator = new GlueOperator(locationUri, getName(), awsProperties, glueClient);
+    }
+
+    /**
+     * Open the catalog. Used for any required preparation in initialization phase. This
+     * implementation has nothing to prepare because the Glue client is built in the constructor.
+     *
+     * @throws CatalogException in case of any runtime exception
+     */
+    @Override
+    public void open() throws CatalogException {}
+
+    /**
+     * Close the catalog when it is no longer needed and release any resource that it might be
+     * holding. A failure while closing the Glue client is logged, not rethrown.
+     *
+     * @throws CatalogException in case of any runtime exception
+     */
+    @Override
+    public void close() throws CatalogException {
+        try {
+            glueOperator.closeClient();
+        } catch (Exception e) {
+            // Best effort: do not fail the caller, but keep the cause visible in the log.
+            LOG.warn("Glue Client is not closed properly!", e);
+        }
+    }
+
+    // ------ databases ------
+
+    /**
+     * Create a database.
+     *
+     * @param name Name of the database to be created
+     * @param database The database definition
+     * @param ignoreIfExists Flag to specify behavior when a database with the given name already
+     *     exists: if set to false, throw a DatabaseAlreadyExistException, if set to true, do
+     *     nothing.
+     * @throws DatabaseAlreadyExistException if the given database already exists and ignoreIfExists
+     *     is false
+     * @throws CatalogException in case of any runtime exception
+     */
+    @Override
+    public void createDatabase(String name, CatalogDatabase database, boolean ignoreIfExists)
+            throws DatabaseAlreadyExistException, CatalogException {
+        checkArgument(!StringUtils.isNullOrWhitespaceOnly(name));
+        checkNotNull(database, "Database cannot be null.");
+
+        // glue supports lowercase naming convention, so every lookup uses the lowercased name
+        final String glueDatabaseName = name.toLowerCase(Locale.ROOT);
+
+        if (!databaseExists(glueDatabaseName)) {
+            glueOperator.createGlueDatabase(glueDatabaseName, database);
+            return;
+        }
+
+        // The database is already present: an error only if the caller did not opt out.
+        if (!ignoreIfExists) {
+            throw new DatabaseAlreadyExistException(getName(), glueDatabaseName);
+        }
+    }
+
+    /**
+     * Drop a database.
+     *
+     * @param name Name of the database to be dropped.
+     * @param ignoreIfNotExists Flag to specify behavior when the database does not exist: if set to
+     *     false, throw an exception, if set to true, do nothing.
+     * @param cascade Flag to specify behavior when the database contains table or function: if set
+     *     to true, delete all tables and functions in the database and then delete the database, if
+     *     set to false, throw an exception.
+     * @throws DatabaseNotExistException if the given database does not exist
+     * @throws DatabaseNotEmptyException if the given database is not empty and cascade is false
+     * @throws CatalogException in case of any runtime exception
+     */
+    @Override
+    public void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade)
+            throws DatabaseNotExistException, DatabaseNotEmptyException, CatalogException {
+        checkArgument(
+                !StringUtils.isNullOrWhitespaceOnly(name), "DatabaseName cannot be null or empty");
+
+        // glue supports only lowercase naming convention
+        final String glueDatabaseName = name.toLowerCase(Locale.ROOT);
+
+        if (!databaseExists(glueDatabaseName)) {
+            if (!ignoreIfNotExists) {
+                throw new DatabaseNotExistException(getName(), glueDatabaseName);
+            }
+            return;
+        }
+
+        if (cascade) {
+            // Empty the database first: tables (and views), then functions.
+            glueOperator.deleteTablesFromDatabase(glueDatabaseName, listTables(glueDatabaseName));
+            glueOperator.deleteFunctionsFromDatabase(
+                    glueDatabaseName, listFunctions(glueDatabaseName));
+        }
+
+        // Only an empty database may be dropped.
+        if (!glueOperator.isDatabaseEmpty(glueDatabaseName)) {
+            throw new DatabaseNotEmptyException(getName(), glueDatabaseName);
+        }
+        glueOperator.dropGlueDatabase(glueDatabaseName);
+    }
+
+    /**
+     * Modify an existing database.
+     *
+     * @param name Name of the database to be modified
+     * @param newDatabase The new database definition
+     * @param ignoreIfNotExists Flag to specify behavior when the given database does not exist: if
+     *     set to false, throw an exception, if set to true, do nothing.
+     * @throws DatabaseNotExistException if the given database does not exist
+     * @throws CatalogException in case of any runtime exception
+     */
+    @Override
+    public void alterDatabase(String name, CatalogDatabase newDatabase, boolean ignoreIfNotExists)
+            throws DatabaseNotExistException, CatalogException {
+
+        // Validate before touching the name: lowercasing a null name would raise an NPE instead
+        // of the intended IllegalArgumentException.
+        checkArgument(
+                !StringUtils.isNullOrWhitespaceOnly(name), "DatabaseName cannot be null or empty");
+        checkNotNull(newDatabase, "Database cannot be Empty");
+
+        // glue supports only lowercase naming convention
+        name = name.toLowerCase(Locale.ROOT);
+
+        CatalogDatabase existingDatabase = glueOperator.getDatabase(name);
+        if (existingDatabase == null) {
+            if (!ignoreIfNotExists) {
+                throw new DatabaseNotExistException(getName(), name);
+            }
+            return;
+        }
+
+        // The replacement definition must be of the same implementation class as the stored one.
+        if (existingDatabase.getClass() != newDatabase.getClass()) {
+            throw new CatalogException(
+                    String.format(
+                            "Database types don't match. Existing database is '%s' and new database is '%s'.",
+                            existingDatabase.getClass().getName(),
+                            newDatabase.getClass().getName()));
+        }
+        glueOperator.updateGlueDatabase(name, newDatabase);
+    }
+
+    /**
+     * Get the names of all databases in this catalog.
+     *
+     * @return a list of the names of all databases
+     * @throws CatalogException in case of any runtime exception
+     */
+    @Override
+    public List<String> listDatabases() throws CatalogException {
+        return glueOperator.listGlueDatabases();
+    }
+
+    /**
+     * Get a database from this catalog.
+     *
+     * @param databaseName Name of the database
+     * @return The requested database
+     * @throws DatabaseNotExistException if the database does not exist
+     * @throws CatalogException in case of any runtime exception
+     */
+    @Override
+    public CatalogDatabase getDatabase(String databaseName)
+            throws DatabaseNotExistException, CatalogException {
+
+        // Validate first: lowercasing a null name would raise an NPE before the check runs.
+        checkArgument(
+                !StringUtils.isNullOrWhitespaceOnly(databaseName),
+                "DatabaseName cannot be null or empty");
+
+        // glue supports only lowercase naming convention
+        return glueOperator.getDatabase(databaseName.toLowerCase(Locale.ROOT));
+    }
+
+    /**
+     * Check if a database exists in this catalog.
+     *
+     * @param databaseName Name of the database
+     * @return true if the given database exists in the catalog false otherwise
+     * @throws CatalogException in case of any runtime exception
+     */
+    @Override
+    public boolean databaseExists(String databaseName) throws CatalogException {
+        checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName));
+        try {
+            // A null result and a DatabaseNotExistException both mean "not there".
+            return getDatabase(databaseName) != null;
+        } catch (DatabaseNotExistException e) {
+            return false;
+        }
+    }
+
+    // ------ tables ------
+
+    /**
+     * Creates a new table or view.
+     *
+     *

<p>The framework will make sure to call this method with fully validated {@link
+     * ResolvedCatalogTable} or {@link ResolvedCatalogView}. Those instances are easy to serialize
+     * for a durable catalog implementation.
+     *
+     * @param tablePath path of the table or view to be created
+     * @param table the table definition
+     * @param ignoreIfExists flag to specify behavior when a table or view already exists at the
+     *     given path: if set to false, it throws a TableAlreadyExistException, if set to true, do
+     *     nothing.
+     * @throws TableAlreadyExistException if table already exists and ignoreIfExists is false
+     * @throws DatabaseNotExistException if the database in tablePath doesn't exist
+     * @throws CatalogException in case of any runtime exception
+     */
+    @Override
+    public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists)
+            throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
+        checkNotNull(tablePath, "tablePath cannot be null");
+        checkNotNull(table, "Table must not be null.");
+
+        final String databaseName = tablePath.getDatabaseName();
+        if (!databaseExists(databaseName)) {
+            throw new DatabaseNotExistException(getName(), databaseName);
+        }
+
+        if (!tableExists(tablePath)) {
+            glueOperator.createGlueTable(tablePath, table, false);
+            return;
+        }
+
+        // The table is already present: an error only if the caller did not opt out.
+        if (!ignoreIfExists) {
+            throw new TableAlreadyExistException(getName(), tablePath);
+        }
+    }
+
+    /**
+     * Modifies an existing table or view. Note that the new and old {@link CatalogBaseTable} must
+     * be of the same kind. For example, this doesn't allow altering a regular table to partitioned
+     * table, or altering a view to a table, and vice versa.
+     *
+     *

<p>The framework will make sure to call this method with fully validated {@link
+     * ResolvedCatalogTable} or {@link ResolvedCatalogView}. Those instances are easy to serialize
+     * for a durable catalog implementation.
+     *
+     * @param tablePath path of the table or view to be modified
+     * @param newTable the new table definition
+     * @param ignoreIfNotExists flag to specify behavior when the table or view does not exist: if
+     *     set to false, throw an exception, if set to true, do nothing.
+     * @throws TableNotExistException if the table does not exist and ignoreIfNotExists is false
+     * @throws CatalogException in case of any runtime exception, including a table kind mismatch
+     */
+    @Override
+    public void alterTable(
+            ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists)
+            throws TableNotExistException, CatalogException {
+
+        checkNotNull(tablePath, "tablePath cannot be null");
+        checkNotNull(newTable, "Table must not be null.");
+
+        // getTable() throws TableNotExistException for a missing table, which would bypass
+        // ignoreIfNotExists; probe for existence first so the flag is honoured.
+        CatalogBaseTable existingTable = tableExists(tablePath) ? getTable(tablePath) : null;
+        if (existingTable == null) {
+            if (!ignoreIfNotExists) {
+                throw new TableNotExistException(getName(), tablePath);
+            }
+            return;
+        }
+
+        if (existingTable.getTableKind() != newTable.getTableKind()) {
+            throw new CatalogException(
+                    String.format(
+                            "Table types don't match. Existing table is '%s' and new table is '%s'.",
+                            existingTable.getTableKind(), newTable.getTableKind()));
+        }
+        glueOperator.alterGlueTable(tablePath, newTable, false);
+    }
+
+    // ------ tables and views ------
+
+    /**
+     * Drop a table or view.
+     *
+     * @param tablePath Path of the table or view to be dropped
+     * @param ignoreIfNotExists Flag to specify behavior when the table or view does not exist: if
+     *     set to false, throw an exception, if set to true, do nothing.
+     * @throws TableNotExistException if the table or view does not exist
+     * @throws CatalogException in case of any runtime exception
+     */
+    @Override
+    public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
+            throws TableNotExistException, CatalogException {
+        checkNotNull(tablePath, "tablePath cannot be null");
+
+        if (!tableExists(tablePath)) {
+            if (!ignoreIfNotExists) {
+                throw new TableNotExistException(getName(), tablePath);
+            }
+            return;
+        }
+        glueOperator.dropGlueTable(tablePath);
+    }
+
+    /**
+     * Rename an existing table or view. The renamed table stays in the database of {@code
+     * tablePath}.
+     *
+     * @param tablePath Path of the table or view to be renamed
+     * @param newTableName the new name of the table or view
+     * @param ignoreIfNotExists Flag to specify behavior when the table or view does not exist: if
+     *     set to false, throw an exception, if set to true, do nothing.
+     * @throws TableNotExistException if the table does not exist
+     * @throws TableAlreadyExistException if a table named newTableName already exists in the same
+     *     database
+     * @throws CatalogException in case of any runtime exception
+     */
+    @Override
+    public void renameTable(ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists)
+            throws TableNotExistException, TableAlreadyExistException, CatalogException {
+        checkNotNull(tablePath, "tablePath cannot be null");
+        checkArgument(
+                !StringUtils.isNullOrWhitespaceOnly(newTableName), "newTableName cannot be null");
+
+        if (!tableExists(tablePath)) {
+            if (!ignoreIfNotExists) {
+                throw new TableNotExistException(getName(), tablePath);
+            }
+            return;
+        }
+
+        final ObjectPath renamedPath = new ObjectPath(tablePath.getDatabaseName(), newTableName);
+        if (tableExists(renamedPath)) {
+            throw new TableAlreadyExistException(getName(), renamedPath);
+        }
+        glueOperator.renameGlueTable(tablePath, renamedPath);
+    }
+
+    /**
+     * Get names of all tables and views under this database. An empty list is returned if none
+     * exists.
+     *
+     * @param databaseName fully qualified database name.
+     * @return a list of the names of all tables and views in this database
+     * @throws DatabaseNotExistException if the database does not exist
+     * @throws CatalogException in case of any runtime exception
+     */
+    @Override
+    public List<String> listTables(String databaseName)
+            throws DatabaseNotExistException, CatalogException {
+        checkArgument(
+                !StringUtils.isNullOrWhitespaceOnly(databaseName),
+                "DatabaseName cannot be null or empty");
+
+        if (!databaseExists(databaseName)) {
+            throw new DatabaseNotExistException(getName(), databaseName);
+        }
+
+        // Plain tables first, then the views of the same database appended to that list.
+        List<String> results =
+                glueOperator.getGlueTableList(
+                        databaseName, CatalogBaseTable.TableKind.TABLE.name());
+        results.addAll(listViews(databaseName));
+        return results;
+    }
+
+    /**
+     * Get names of all views under this database. An empty list is returned if none exists.
+     *
+     * @param databaseName the name of the given database
+     * @return a list of the names of all views in the given database
+     * @throws DatabaseNotExistException if the database does not exist
+     * @throws CatalogException in case of any runtime exception
+     */
+    @Override
+    public List<String> listViews(String databaseName)
+            throws DatabaseNotExistException, CatalogException {
+        checkArgument(
+                !StringUtils.isNullOrWhitespaceOnly(databaseName),
+                "DatabaseName cannot be null or empty");
+
+        if (!databaseExists(databaseName)) {
+            throw new DatabaseNotExistException(getName(), databaseName);
+        }
+        return glueOperator.getGlueTableList(databaseName, CatalogBaseTable.TableKind.VIEW.name());
+    }
+
+    /**
+     * Returns a {@link CatalogTable} or {@link CatalogView} identified by the given {@link
+     * ObjectPath}. The framework will resolve the metadata objects when necessary.
+     *
+     * @param tablePath Path of the table or view
+     * @return The requested table or view
+     * @throws TableNotExistException if the target does not exist
+     * @throws CatalogException in case of any runtime exception
+     */
+    @Override
+    public CatalogBaseTable getTable(ObjectPath tablePath)
+            throws TableNotExistException, CatalogException {
+        checkNotNull(tablePath, "Table path cannot be null");
+
+        if (tableExists(tablePath)) {
+            // Fetch the Glue representation, then convert it to Flink's table model.
+            Table glueTable = glueOperator.getGlueTable(tablePath);
+            return glueOperator.getCatalogBaseTableFromGlueTable(glueTable);
+        }
+        throw new TableNotExistException(getName(), tablePath);
+    }
+
+    /**
+     * Check if a table or view exists in this catalog. A table in a missing database is reported
+     * as not existing.
+     *
+     * @param tablePath Path of the table or view
+     * @return true if the given table exists in the catalog false otherwise
+     * @throws CatalogException in case of any runtime exception
+     */
+    @Override
+    public boolean tableExists(ObjectPath tablePath) throws CatalogException {
+        checkNotNull(tablePath, "Table path cannot be null");
+
+        if (!databaseExists(tablePath.getDatabaseName())) {
+            return false;
+        }
+        return glueOperator.glueTableExists(tablePath);
+    }
+
+    // ------ functions ------
+
+    /**
+     * Create a function. Function name should be handled in a case-insensitive way.
+     *
+     * @param path path of the function
+     * @param function the function to be created
+     * @param ignoreIfExists flag to specify behavior if a function with the given name already
+     *     exists: if set to false, it throws a FunctionAlreadyExistException, if set to true,
+     *     nothing happens.
+     * @throws FunctionAlreadyExistException if the function already exist
+     * @throws DatabaseNotExistException if the given database does not exist
+     * @throws CatalogException in case of any runtime exception
+     */
+    @Override
+    public void createFunction(ObjectPath path, CatalogFunction function, boolean ignoreIfExists)
+            throws FunctionAlreadyExistException, DatabaseNotExistException, CatalogException {
+        checkNotNull(path);
+        checkNotNull(function);
+
+        final ObjectPath normalizedPath = normalize(path);
+        final String databaseName = normalizedPath.getDatabaseName();
+        if (!databaseExists(databaseName)) {
+            throw new DatabaseNotExistException(getName(), databaseName);
+        }
+
+        if (functionExists(normalizedPath)) {
+            // Already registered: an error only if the caller did not opt out.
+            if (!ignoreIfExists) {
+                throw new FunctionAlreadyExistException(getName(), normalizedPath);
+            }
+            return;
+        }
+        glueOperator.createGlueFunction(normalizedPath, function);
+    }
+
+    /**
+     * Returns a copy of {@code path} whose object name has been passed through {@link
+     * FunctionIdentifier#normalizeName(String)}; the database name is kept as given.
+     */
+    private ObjectPath normalize(ObjectPath path) {
+        String normalizedName = FunctionIdentifier.normalizeName(path.getObjectName());
+        return new ObjectPath(path.getDatabaseName(), normalizedName);
+    }
+
+    /**
+     * Modify an existing function. Function name should be handled in a case-insensitive way.
+     *
+     * @param path path of the function
+     * @param newFunction the function to be modified
+     * @param ignoreIfNotExists flag to specify behavior if the function does not exist: if set to
+     *     false, throw an exception if set to true, nothing happens
+     * @throws FunctionNotExistException if the function does not exist and ignoreIfNotExists is
+     *     false
+     * @throws CatalogException in case of any runtime exception, including a function class
+     *     mismatch
+     */
+    @Override
+    public void alterFunction(
+            ObjectPath path, CatalogFunction newFunction, boolean ignoreIfNotExists)
+            throws FunctionNotExistException, CatalogException {
+        checkNotNull(path);
+        checkNotNull(newFunction);
+
+        ObjectPath functionPath = normalize(path);
+
+        // getFunction() throws FunctionNotExistException for a missing function, which would
+        // bypass ignoreIfNotExists; probe for existence first so the flag is honoured.
+        CatalogFunction existingFunction =
+                functionExists(functionPath) ? getFunction(functionPath) : null;
+        if (existingFunction == null) {
+            if (!ignoreIfNotExists) {
+                throw new FunctionNotExistException(getName(), functionPath);
+            }
+            return;
+        }
+
+        if (existingFunction.getClass() != newFunction.getClass()) {
+            throw new CatalogException(
+                    String.format(
+                            "Function types don't match. Existing function is '%s' and new function is '%s'.",
+                            existingFunction.getClass().getName(),
+                            newFunction.getClass().getName()));
+        }
+
+        glueOperator.alterGlueFunction(functionPath, newFunction);
+    }
+
+    /**
+     * Drop a function. Function name should be handled in a case-insensitive way.
+     *
+     * @param path path of the function to be dropped
+     * @param ignoreIfNotExists flag to specify behavior if the function does not exist: if set to
+     *     false, throw an exception if set to true, nothing happens
+     * @throws FunctionNotExistException if the function does not exist
+     * @throws CatalogException in case of any runtime exception
+     */
+    @Override
+    public void dropFunction(ObjectPath path, boolean ignoreIfNotExists)
+            throws FunctionNotExistException, CatalogException {
+        checkNotNull(path);
+
+        ObjectPath functionPath = normalize(path);
+
+        if (functionExists(functionPath)) {
+            glueOperator.dropGlueFunction(functionPath);
+        } else if (!ignoreIfNotExists) {
+            throw new FunctionNotExistException(getName(), functionPath);
+        }
+    }
+
+    /**
+     * List the names of all functions in the given database. An empty list is returned if none is
+     * registered.
+     *
+     * @param databaseName name of the database.
+     * @return a list of the names of the functions in this database
+     * @throws DatabaseNotExistException if the database does not exist
+     * @throws CatalogException in case of any runtime exception
+     */
+    @Override
+    public List<String> listFunctions(String databaseName)
+            throws DatabaseNotExistException, CatalogException {
+
+        // Validate first: lowercasing a null name would raise an NPE before the check runs.
+        checkArgument(
+                !StringUtils.isNullOrWhitespaceOnly(databaseName),
+                "databaseName cannot be null or empty");
+        databaseName = databaseName.toLowerCase(Locale.ROOT);
+
+        if (!databaseExists(databaseName)) {
+            throw new DatabaseNotExistException(getName(), databaseName);
+        }
+
+        return glueOperator.listGlueFunctions(databaseName);
+    }
+
+    /**
+     * Get the function. Function name should be handled in a case-insensitive way.
+     *
+     * @param path path of the function
+     * @return the requested function
+     * @throws FunctionNotExistException if the function does not exist in the catalog
+     * @throws CatalogException in case of any runtime exception
+     */
+    @Override
+    public CatalogFunction getFunction(ObjectPath path)
+            throws FunctionNotExistException, CatalogException {
+        checkNotNull(path);
+
+        final ObjectPath normalizedPath = normalize(path);
+        if (functionExists(normalizedPath)) {
+            return glueOperator.getGlueFunction(normalizedPath);
+        }
+        throw new FunctionNotExistException(getName(), normalizedPath);
+    }
+
+    /**
+     * Check whether a function exists or not. Function name should be handled in a case-insensitive
+     * way. A function in a missing database is reported as not existing.
+     *
+     * @param path path of the function
+     * @return true if the function exists in the catalog false otherwise
+     * @throws CatalogException in case of any runtime exception
+     */
+    @Override
+    public boolean functionExists(ObjectPath path) throws CatalogException {
+        checkNotNull(path);
+
+        final ObjectPath normalizedPath = normalize(path);
+        if (!databaseExists(normalizedPath.getDatabaseName())) {
+            return false;
+        }
+        return glueOperator.glueFunctionExists(normalizedPath);
+    }
+
+    /**
+     * Create a partition.
+     *
+     * @param tablePath path of the table.
+     * @param partitionSpec partition spec of the partition
+     * @param partition the partition to add.
+     * @param ignoreIfExists flag to specify behavior if the given partition already exists: if set
+     *     to false, it throws a PartitionAlreadyExistsException, if set to true, nothing happens.
+     * @throws TableNotExistException thrown if the target table does not exist
+     * @throws TableNotPartitionedException thrown if the target table is not partitioned
+     * @throws PartitionSpecInvalidException thrown if the given partition spec is invalid
+     * @throws PartitionAlreadyExistsException thrown if the target partition already exists
+     * @throws CatalogException in case of any runtime exception
+     */
+    @Override
+    public void createPartition(
+            ObjectPath tablePath,
+            CatalogPartitionSpec partitionSpec,
+            CatalogPartition partition,
+            boolean ignoreIfExists)
+            throws TableNotExistException, TableNotPartitionedException,
+                    PartitionSpecInvalidException, PartitionAlreadyExistsException,
+                    CatalogException {
+        checkNotNull(tablePath, "Table path cannot be null");
+        checkNotNull(partitionSpec, "PartitionSpec cannot be null");
+        checkNotNull(partition, "Partition cannot be null");
+
+        // Resolve the Glue table once; it is needed for the partitioned check and the create call.
+        final Table targetTable = glueOperator.getGlueTable(tablePath);
+        glueOperator.ensurePartitionedTable(tablePath, targetTable);
+
+        if (partitionExists(tablePath, partitionSpec)) {
+            // Already present: an error only if the caller did not opt out.
+            if (!ignoreIfExists) {
+                throw new PartitionAlreadyExistsException(getName(), tablePath, partitionSpec);
+            }
+            return;
+        }
+        glueOperator.createGluePartition(targetTable, partitionSpec, partition);
+    }
+
+    /**
+     * Get CatalogPartitionSpec of all partitions of the table.
+     *
+     * @param tablePath path of the table
+     * @return a list of CatalogPartitionSpec of the table
+     * @throws TableNotExistException thrown if the table does not exist in the catalog
+     * @throws TableNotPartitionedException thrown if the table is not partitioned
+     * @throws CatalogException in case of any runtime exception
+     */
+    @Override
+    public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath)
+            throws TableNotExistException, TableNotPartitionedException, CatalogException {
+
+        checkNotNull(tablePath, "Table path cannot be null");
+        if (!tableExists(tablePath)) {
+            throw new TableNotExistException(getName(), tablePath);
+        }
+
+        if (!isPartitionedTable(tablePath)) {
+            throw new TableNotPartitionedException(getName(), tablePath);
+        }
+        return glueOperator.listPartitions(tablePath);
+    }
+
+    /**
+     * Tells whether the object at {@code tablePath} is a partitioned {@link CatalogTable}. Views,
+     * other non-{@code CatalogTable} objects and missing tables all yield {@code false}.
+     */
+    private boolean isPartitionedTable(ObjectPath tablePath) {
+        try {
+            CatalogBaseTable table = getTable(tablePath);
+            return table instanceof CatalogTable && ((CatalogTable) table).isPartitioned();
+        } catch (TableNotExistException e) {
+            // A table that cannot be found is, by definition, not partitioned.
+            return false;
+        }
+    }
+
+    /**
+     * Get CatalogPartitionSpec of all partitions that is under the given CatalogPartitionSpec in
+     * the table.
+     *
+     * @param tablePath path of the table
+     * @param partitionSpec the partition spec to list
+     * @return a list of CatalogPartitionSpec that is under the given CatalogPartitionSpec in the
+     *     table
+     * @throws TableNotExistException thrown if the table does not exist in the catalog
+     * @throws TableNotPartitionedException thrown if the table is not partitioned
+     * @throws PartitionSpecInvalidException declared by this method; presumably raised by the Glue
+     *     lookup when the given partition spec is invalid (not thrown directly here)
+     * @throws CatalogException in case of any runtime exception
+     */
+    @Override
+    public List<CatalogPartitionSpec> listPartitions(
+            ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
+            throws TableNotExistException, TableNotPartitionedException,
+                    PartitionSpecInvalidException, CatalogException {
+
+        checkNotNull(tablePath, "Table path cannot be null");
+        checkNotNull(partitionSpec);
+
+        if (!tableExists(tablePath)) {
+            throw new TableNotExistException(getName(), tablePath);
+        }
+
+        if (!isPartitionedTable(tablePath)) {
+            throw new TableNotPartitionedException(getName(), tablePath);
+        }
+        return glueOperator.listGluePartitions(tablePath, partitionSpec);
+    }
+
+    /**
+     * Get CatalogPartitionSpec of partitions by expression filters in the table.
+     *
+     *

NOTE: For FieldReferenceExpression, the field index is based on schema of this table + * instead of partition columns only. + * + *

The passed in predicates have been translated in conjunctive form. + * + *

If catalog does not support this interface at present, throw an {@link + * UnsupportedOperationException} directly. If the catalog does not have a valid filter, throw + * the {@link UnsupportedOperationException} directly. Planner will fallback to get all + * partitions and filter by itself. + * + * @param tablePath path of the table + * @param filters filters to push down filter to catalog + * @return a list of CatalogPartitionSpec that is under the given CatalogPartitionSpec in the + * table + * @throws TableNotExistException thrown if the table does not exist in the catalog + * @throws TableNotPartitionedException thrown if the table is not partitioned + * @throws CatalogException in case of any runtime exception + */ + @Override + public List listPartitionsByFilter( + ObjectPath tablePath, List filters) + throws TableNotExistException, TableNotPartitionedException, CatalogException { + checkNotNull(tablePath, "Table path cannot be null"); + if (!tableExists(tablePath)) { + throw new TableNotExistException(getName(), tablePath); + } + + if (!isPartitionedTable(tablePath)) { + throw new TableNotPartitionedException(getName(), tablePath); + } + + return glueOperator.listGluePartitionsByFilter(tablePath, filters); + } + + /** + * Get a partition of the given table. The given partition spec keys and values need to be + * matched exactly for a result. 
+ * + * @param tablePath path of the table + * @param partitionSpec partition spec of partition to get + * @return the requested partition + * @throws PartitionNotExistException thrown if the partition doesn't exist + * @throws CatalogException in case of any runtime exception + */ + @Override + public CatalogPartition getPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) + throws PartitionNotExistException, CatalogException { + checkNotNull(tablePath, "Table path cannot be null"); + checkNotNull(partitionSpec, "CatalogPartitionSpec cannot be null"); + + Partition gluePartition = glueOperator.getGluePartition(tablePath, partitionSpec); + if (gluePartition == null) { + throw new PartitionNotExistException(getName(), tablePath, partitionSpec); + } + Map properties = + new HashMap<>(gluePartition.storageDescriptor().parameters()); + properties.put( + GlueCatalogConstants.LOCATION_URI, gluePartition.storageDescriptor().location()); + String comment = properties.remove(GlueCatalogConstants.COMMENT); + return new CatalogPartitionImpl(properties, comment); + } + + /** + * Check whether a partition exists or not. + * + * @param tablePath path of the table + * @param partitionSpec partition spec of the partition to check + * @throws CatalogException in case of any runtime exception + */ + @Override + public boolean partitionExists(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) + throws CatalogException { + checkNotNull(tablePath, "Table path cannot be null"); + + if (!databaseExists(tablePath.getDatabaseName()) || !tableExists(tablePath)) { + throw new CatalogException("Database/Table Doesn't exists."); + } + return glueOperator.gluePartitionExists(tablePath, partitionSpec); + } + + /** + * Drop a partition. + * + * @param tablePath path of the table. 
+ * @param partitionSpec partition spec of the partition to drop + * @param ignoreIfNotExists flag to specify behavior if the database does not exist: if set to + * false, throw an exception, if set to true, nothing happens. + * @throws PartitionNotExistException thrown if the target partition does not exist + * @throws CatalogException in case of any runtime exception + */ + @Override + public void dropPartition( + ObjectPath tablePath, CatalogPartitionSpec partitionSpec, boolean ignoreIfNotExists) + throws PartitionNotExistException, CatalogException { + checkNotNull(tablePath, "Table path cannot be null"); + checkNotNull(partitionSpec); + + if (partitionExists(tablePath, partitionSpec)) { + glueOperator.dropGluePartition(tablePath, partitionSpec); + } else if (!ignoreIfNotExists) { + throw new PartitionNotExistException(getName(), tablePath, partitionSpec); + } + } + + /** + * Alter a partition. + * + * @param tablePath path of the table + * @param partitionSpec partition spec of the partition + * @param newPartition new partition to replace the old one + * @param ignoreIfNotExists flag to specify behavior if the database does not exist: if set to + * false, throw an exception, if set to true, nothing happens. 
+ * @throws PartitionNotExistException thrown if the target partition does not exist + * @throws CatalogException in case of any runtime exception + */ + @Override + public void alterPartition( + ObjectPath tablePath, + CatalogPartitionSpec partitionSpec, + CatalogPartition newPartition, + boolean ignoreIfNotExists) + throws PartitionNotExistException, CatalogException { + + checkNotNull(tablePath, "Table path cannot be null"); + checkNotNull(partitionSpec, "CatalogPartitionSpec cannot be null"); + checkNotNull(newPartition, "New partition cannot be null"); + CatalogPartition existingPartition = getPartition(tablePath, partitionSpec); + if (existingPartition != null) { + glueOperator.alterGluePartition(tablePath, partitionSpec, newPartition); + } else if (!ignoreIfNotExists) { + throw new PartitionNotExistException(getName(), tablePath, partitionSpec); + } + } + + /** + * Get the statistics of a table. + * + * @param tablePath path of the table + * @return statistics of the given table + * @throws TableNotExistException if the table does not exist in the catalog + * @throws CatalogException in case of any runtime exception + */ + @Override + public CatalogTableStatistics getTableStatistics(ObjectPath tablePath) + throws TableNotExistException, CatalogException { + return null; + } + + /** + * Get the column statistics of a table. + * + * @param tablePath path of the table + * @return column statistics of the given table + * @throws TableNotExistException if the table does not exist in the catalog + * @throws CatalogException in case of any runtime exception + */ + @Override + public CatalogColumnStatistics getTableColumnStatistics(ObjectPath tablePath) + throws TableNotExistException, CatalogException { + return null; + } + + /** + * Get the statistics of a partition. 
+ * + * @param tablePath path of the table + * @param partitionSpec partition spec of the partition + * @return statistics of the given partition + * @throws PartitionNotExistException if the partition does not exist + * @throws CatalogException in case of any runtime exception + */ + @Override + public CatalogTableStatistics getPartitionStatistics( + ObjectPath tablePath, CatalogPartitionSpec partitionSpec) + throws PartitionNotExistException, CatalogException { + return null; + } + + /** + * Get the column statistics of a partition. + * + * @param tablePath path of the table + * @param partitionSpec partition spec of the partition + * @return column statistics of the given partition + * @throws PartitionNotExistException if the partition does not exist + * @throws CatalogException in case of any runtime exception + */ + @Override + public CatalogColumnStatistics getPartitionColumnStatistics( + ObjectPath tablePath, CatalogPartitionSpec partitionSpec) + throws PartitionNotExistException, CatalogException { + return null; + } + + /** + * Update the statistics of a table. + * + * @param tablePath path of the table + * @param tableStatistics new statistics to update + * @param ignoreIfNotExists flag to specify behavior if the table does not exist: if set to + * false, throw an exception, if set to true, nothing happens. + * @throws TableNotExistException if the table does not exist in the catalog + * @throws CatalogException in case of any runtime exception + */ + @Override + public void alterTableStatistics( + ObjectPath tablePath, CatalogTableStatistics tableStatistics, boolean ignoreIfNotExists) + throws TableNotExistException, CatalogException { + throw new UnsupportedOperationException("Operation with Statistics not supported."); + } + + /** + * Update the column statistics of a table. 
+ * + * @param tablePath path of the table + * @param columnStatistics new column statistics to update + * @param ignoreIfNotExists flag to specify behavior if the table does not exist: if set to + * false, throw an exception, if set to true, nothing happens. + * @throws TableNotExistException if the table does not exist in the catalog + * @throws CatalogException in case of any runtime exception + */ + @Override + public void alterTableColumnStatistics( + ObjectPath tablePath, + CatalogColumnStatistics columnStatistics, + boolean ignoreIfNotExists) + throws TableNotExistException, CatalogException, TablePartitionedException { + throw new UnsupportedOperationException("Operation with Statistics not supported."); + } + + /** + * Update the statistics of a table partition. + * + * @param tablePath path of the table + * @param partitionSpec partition spec of the partition + * @param partitionStatistics new statistics to update + * @param ignoreIfNotExists flag to specify behavior if the partition does not exist: if set to + * false, throw an exception, if set to true, nothing happens. + * @throws PartitionNotExistException if the partition does not exist + * @throws CatalogException in case of any runtime exception + */ + @Override + public void alterPartitionStatistics( + ObjectPath tablePath, + CatalogPartitionSpec partitionSpec, + CatalogTableStatistics partitionStatistics, + boolean ignoreIfNotExists) + throws PartitionNotExistException, CatalogException { + throw new UnsupportedOperationException("Operation with Statistics not supported."); + } + + /** + * Update the column statistics of a table partition. 
+ * + * @param tablePath path of the table + * @param partitionSpec partition spec of the partition @@param columnStatistics new column + * statistics to update + * @param columnStatistics column related statistics + * @param ignoreIfNotExists flag to specify behavior if the partition does not exist: if set to + * false, throw an exception, if set to true, nothing happens. + * @throws PartitionNotExistException if the partition does not exist + * @throws CatalogException in case of any runtime exception + */ + @Override + public void alterPartitionColumnStatistics( + ObjectPath tablePath, + CatalogPartitionSpec partitionSpec, + CatalogColumnStatistics columnStatistics, + boolean ignoreIfNotExists) + throws PartitionNotExistException, CatalogException { + throw new UnsupportedOperationException("Operation with Statistics not supported."); + } +} diff --git a/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/GlueCatalogConstants.java b/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/GlueCatalogConstants.java new file mode 100644 index 000000000..7fea020c0 --- /dev/null +++ b/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/GlueCatalogConstants.java @@ -0,0 +1,249 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog.glue; + +import org.apache.flink.table.catalog.glue.util.AwsClientFactory; + +import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient; + +import java.util.regex.Pattern; + +/** Configs for catalog meta-objects in {@link GlueCatalog}. */ +public class GlueCatalogConstants { + + public static final String COMMENT = "comment"; + public static final String DEFAULT_SEPARATOR = ":"; + public static final String LOCATION_SEPARATOR = "/"; + public static final String LOCATION_URI = "location-uri"; + public static final String AND = "and"; + public static final String NEXT_LINE = "\n"; + public static final String SPACE = " "; + + public static final String TABLE_OWNER = "owner"; + public static final String TABLE_INPUT_FORMAT = "table.input.format"; + public static final String TABLE_OUTPUT_FORMAT = "table.output.format"; + + public static final String FLINK_SCALA_FUNCTION_PREFIX = "flink:scala:"; + public static final String FLINK_PYTHON_FUNCTION_PREFIX = "flink:python:"; + public static final String FLINK_JAVA_FUNCTION_PREFIX = "flink:java:"; + + public static final String FLINK_CATALOG = "FLINK_CATALOG"; + + public static final Pattern GLUE_DB_PATTERN = Pattern.compile("^[a-z0-9_]{1,252}$"); + public static final String GLUE_EXCEPTION_MSG_IDENTIFIER = "GLUE EXCEPTION"; + public static final String TABLE_NOT_EXISTS_IDENTIFIER = "TABLE DOESN'T EXISTS"; + public static final String DEFAULT_PARTITION_NAME = "__GLUE_DEFAULT_PARTITION__"; + + // ---- glue configs + /** + * The type of {@link software.amazon.awssdk.http.SdkHttpClient} implementation used by {@link + * AwsClientFactory} If set, all AWS clients will use this specified HTTP client. If not set, + * HTTP_CLIENT_TYPE_DEFAULT will be used. For specific types supported, see HTTP_CLIENT_TYPE_* + * defined below. 
+ */ + public static final String HTTP_CLIENT_TYPE = "http-client.type"; + + /** + * Used to configure the connection acquisition timeout in milliseconds for {@link + * software.amazon.awssdk.http.apache.ApacheHttpClient.Builder}. This flag only works when + * {@link #HTTP_CLIENT_TYPE} is set to HTTP_CLIENT_TYPE_APACHE + * + *

For more details, see + * https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/http/apache/ApacheHttpClient.Builder.html + */ + public static final String HTTP_CLIENT_APACHE_CONNECTION_ACQUISITION_TIMEOUT_MS = + "http-client.apache.connection-acquisition-timeout-ms"; + + /** + * If Glue should skip name validations It is recommended to stick to Glue best practice in + * https://docs.aws.amazon.com/athena/latest/ug/glue-best-practices.html to make sure operations + * are Hive compatible. This is only added for users that have existing conventions using + * non-standard characters. When database name and table name validation are skipped, there is + * no guarantee that downstream systems would all support the names. + */ + public static final String GLUE_CATALOG_SKIP_NAME_VALIDATION = "glue.skip-name-validation"; + + /** + * Used to configure the connection max idle time in milliseconds for {@link + * software.amazon.awssdk.http.apache.ApacheHttpClient.Builder}. This flag only works when + * {@link #HTTP_CLIENT_TYPE} is set to HTTP_CLIENT_TYPE_APACHE + * + *

For more details, see + * https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/http/apache/ApacheHttpClient.Builder.html + */ + public static final String HTTP_CLIENT_APACHE_CONNECTION_MAX_IDLE_TIME_MS = + "http-client.apache.connection-max-idle-time-ms"; + + /** + * Used to configure the connection time to live in milliseconds for {@link + * software.amazon.awssdk.http.apache.ApacheHttpClient.Builder}. This flag only works when + * {@link #HTTP_CLIENT_TYPE} is set to HTTP_CLIENT_TYPE_APACHE + * + *

For more details, see + * https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/http/apache/ApacheHttpClient.Builder.html + */ + public static final String HTTP_CLIENT_APACHE_CONNECTION_TIME_TO_LIVE_MS = + "http-client.apache.connection-time-to-live-ms"; + + /** + * Used to configure the connection timeout in milliseconds for {@link + * software.amazon.awssdk.http.apache.ApacheHttpClient.Builder}. This flag only works when + * {@link #HTTP_CLIENT_TYPE} is set to HTTP_CLIENT_TYPE_APACHE + * + *

For more details, see + * https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/http/apache/ApacheHttpClient.Builder.html + */ + public static final String HTTP_CLIENT_APACHE_CONNECTION_TIMEOUT_MS = + "http-client.apache.connection-timeout-ms"; + + /** + * Used to configure whether to enable the expect continue setting for {@link + * software.amazon.awssdk.http.apache.ApacheHttpClient.Builder}. This flag only works when + * {@link #HTTP_CLIENT_TYPE} is set to HTTP_CLIENT_TYPE_APACHE + * + *

In default, this is disabled. + * + *

For more details, see + * https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/http/apache/ApacheHttpClient.Builder.html + */ + public static final String HTTP_CLIENT_APACHE_EXPECT_CONTINUE_ENABLED = + "http-client.apache.expect-continue-enabled"; + + /** + * Used to configure the max connections number for {@link + * software.amazon.awssdk.http.apache.ApacheHttpClient.Builder}. This flag only works when + * {@link #HTTP_CLIENT_TYPE} is set to HTTP_CLIENT_TYPE_APACHE + * + *

For more details, see + * https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/http/apache/ApacheHttpClient.Builder.html + */ + public static final String HTTP_CLIENT_APACHE_MAX_CONNECTIONS = + "http-client.apache.max-connections"; + + /** + * Used to configure the socket timeout in milliseconds for {@link + * software.amazon.awssdk.http.apache.ApacheHttpClient.Builder}. This flag only works when + * {@link #HTTP_CLIENT_TYPE} is set to HTTP_CLIENT_TYPE_APACHE + * + *

For more details, see + * https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/http/apache/ApacheHttpClient.Builder.html + */ + public static final String HTTP_CLIENT_APACHE_SOCKET_TIMEOUT_MS = + "http-client.apache.socket-timeout-ms"; + + /** + * Used to configure whether to enable the tcp keep alive setting for {@link + * software.amazon.awssdk.http.apache.ApacheHttpClient.Builder}. This flag only works when + * {@link #HTTP_CLIENT_TYPE} is set to HTTP_CLIENT_TYPE_APACHE. + * + *

In default, this is disabled. + * + *

For more details, see + * https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/http/apache/ApacheHttpClient.Builder.html + */ + public static final String HTTP_CLIENT_APACHE_TCP_KEEP_ALIVE_ENABLED = + "http-client.apache.tcp-keep-alive-enabled"; + + /** + * Used to configure whether to use idle connection reaper for {@link + * software.amazon.awssdk.http.apache.ApacheHttpClient.Builder}. This flag only works when + * {@link #HTTP_CLIENT_TYPE} is set to HTTP_CLIENT_TYPE_APACHE. + * + *

In default, this is enabled. + * + *

For more details, see + * https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/http/apache/ApacheHttpClient.Builder.html + */ + public static final String HTTP_CLIENT_APACHE_USE_IDLE_CONNECTION_REAPER_ENABLED = + "http-client.apache.use-idle-connection-reaper-enabled"; + + /** + * Configure an alternative endpoint of the Glue service for GlueCatalog to access. + * + *

This could be used to use GlueCatalog with any glue-compatible metastore service that has + * a different endpoint + */ + public static final String GLUE_CATALOG_ENDPOINT = "glue.endpoint"; + + /** + * The ID of the Glue Data Catalog where the tables reside. If none is provided, Glue + * automatically uses the caller's AWS account ID by default. + * + *

For more details, see + * https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-catalog-databases.html + */ + public static final String GLUE_CATALOG_ID = "glue.id"; + + /** + * The account ID used in a Glue resource ARN, e.g. + * arn:aws:glue:us-east-1:1000000000000:table/db1/table1 + */ + public static final String GLUE_ACCOUNT_ID = "glue.account-id"; + + /** + * If Glue should skip archiving an old table version when creating a new version in a commit. + * By default Glue archives all old table versions after an UpdateTable call, but Glue has a + * default max number of archived table versions (can be increased). So for streaming use case + * with lots of commits, it is recommended to set this value to true. + */ + public static final String GLUE_CATALOG_SKIP_ARCHIVE = "glue.skip-archive"; + + /** + * Used to configure the connection timeout in milliseconds for {@link + * UrlConnectionHttpClient.Builder}. This flag only works when {@link #HTTP_CLIENT_TYPE} is set + * to HTTP_CLIENT_TYPE_URLCONNECTION. + * + *

For more details, see + * https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/http/urlconnection/UrlConnectionHttpClient.Builder.html + */ + public static final String HTTP_CLIENT_URLCONNECTION_CONNECTION_TIMEOUT_MS = + "http-client.urlconnection.connection-timeout-ms"; + + /** + * Used to configure the socket timeout in milliseconds for {@link + * UrlConnectionHttpClient.Builder}. This flag only works when {@link #HTTP_CLIENT_TYPE} is set + * to HTTP_CLIENT_TYPE_URLCONNECTION. + * + *

For more details, see + * https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/http/urlconnection/UrlConnectionHttpClient.Builder.html + */ + public static final String HTTP_CLIENT_URLCONNECTION_SOCKET_TIMEOUT_MS = + "http-client.urlconnection.socket-timeout-ms"; + + // --constants + /** + * If this is set under {HTTP_CLIENT_TYPE}, {@link + * software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient} will be used as the HTTP. + * Client in {@link AwsClientFactory} + */ + public static final String HTTP_CLIENT_TYPE_URLCONNECTION = "urlconnection"; + + /** + * If this is set under {@link GlueCatalogConstants}, {@link + * software.amazon.awssdk.http.apache.ApacheHttpClient} will be used as the HTTP Client in + * {@linkAwsClientFactory}. + */ + public static final String HTTP_CLIENT_TYPE_APACHE = "apache"; + + public static final String HTTP_CLIENT_TYPE_DEFAULT = HTTP_CLIENT_TYPE_URLCONNECTION; + + public static final boolean GLUE_CATALOG_SKIP_ARCHIVE_DEFAULT = false; + + public static final boolean GLUE_CATALOG_SKIP_NAME_VALIDATION_DEFAULT = false; +} diff --git a/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/GlueCatalogFactory.java b/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/GlueCatalogFactory.java new file mode 100644 index 000000000..d6169157c --- /dev/null +++ b/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/GlueCatalogFactory.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog.glue; + +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.table.catalog.Catalog; +import org.apache.flink.table.factories.CatalogFactory; +import org.apache.flink.table.factories.FactoryUtil; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; +import java.util.Set; +import java.util.stream.Collectors; + +/** Catalog factory for {@link GlueCatalog}. */ +public class GlueCatalogFactory implements CatalogFactory { + + private static final Logger LOG = LoggerFactory.getLogger(GlueCatalogFactory.class); + + @Override + public String factoryIdentifier() { + return GlueCatalogFactoryOptions.IDENTIFIER; + } + + @Override + public Set> requiredOptions() { + return Collections.emptySet(); + } + + @Override + public Set> optionalOptions() { + return GlueCatalogOptions.getAllConfigOptions(); + } + + @Override + public Catalog createCatalog(Context context) { + final FactoryUtil.CatalogFactoryHelper helper = + FactoryUtil.createCatalogFactoryHelper(this, context); + helper.validate(); + if (LOG.isDebugEnabled()) { + String msg = + context.getOptions().entrySet().stream() + .map(entry -> entry.getKey() + "-> " + entry.getValue()) + .collect(Collectors.joining("\n")); + LOG.debug(msg); + } + + return new GlueCatalog( + context.getName(), + helper.getOptions().get(GlueCatalogFactoryOptions.DEFAULT_DATABASE), + context.getConfiguration()); + } +} diff --git 
a/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/GlueCatalogFactoryOptions.java b/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/GlueCatalogFactoryOptions.java new file mode 100644 index 000000000..52a489112 --- /dev/null +++ b/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/GlueCatalogFactoryOptions.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog.glue; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; + +/** {@link ConfigOption}s for {@link GlueCatalog}. 
*/ +@Internal +public class GlueCatalogFactoryOptions { + + public static final String IDENTIFIER = "glue"; + + public static final ConfigOption DEFAULT_DATABASE = + ConfigOptions.key(GlueCatalogOptions.DEFAULT_DATABASE_KEY) + .stringType() + .defaultValue(GlueCatalog.DEFAULT_DB); + + private GlueCatalogFactoryOptions() {} +} diff --git a/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/GlueCatalogOptions.java b/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/GlueCatalogOptions.java new file mode 100644 index 000000000..36d1723bf --- /dev/null +++ b/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/GlueCatalogOptions.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog.glue; + +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.table.catalog.CommonCatalogOptions; + +import java.util.HashSet; +import java.util.Set; + +import static org.apache.flink.table.catalog.glue.GlueCatalogConstants.GLUE_CATALOG_SKIP_NAME_VALIDATION_DEFAULT; + +/** A collection of {@link ConfigOption} which is used in GlueCatalog. 
*/ +public class GlueCatalogOptions extends CommonCatalogOptions { + + public static final ConfigOption LOCATION_KEY = + ConfigOptions.key(GlueCatalogConstants.LOCATION_URI).stringType().noDefaultValue(); + + public static final ConfigOption INPUT_FORMAT = + ConfigOptions.key(GlueCatalogConstants.TABLE_INPUT_FORMAT) + .stringType() + .noDefaultValue(); + + public static final ConfigOption OUTPUT_FORMAT = + ConfigOptions.key(GlueCatalogConstants.TABLE_OUTPUT_FORMAT) + .stringType() + .noDefaultValue(); + + public static final ConfigOption HTTP_CLIENT_TYPE = + ConfigOptions.key(GlueCatalogConstants.HTTP_CLIENT_TYPE) + .stringType() + .defaultValue(GlueCatalogConstants.HTTP_CLIENT_TYPE_DEFAULT); + + public static final ConfigOption HTTP_CLIENT_APACHE_CONNECTION_ACQUISITION_TIMEOUT_MS = + ConfigOptions.key( + GlueCatalogConstants + .HTTP_CLIENT_APACHE_CONNECTION_ACQUISITION_TIMEOUT_MS) + .longType() + .defaultValue(0L); + + public static final ConfigOption GLUE_CATALOG_SKIP_NAME_VALIDATION = + ConfigOptions.key(GlueCatalogConstants.GLUE_CATALOG_SKIP_NAME_VALIDATION) + .booleanType() + .defaultValue(GLUE_CATALOG_SKIP_NAME_VALIDATION_DEFAULT); + + public static final ConfigOption HTTP_CLIENT_APACHE_CONNECTION_MAX_IDLE_TIME_MS = + ConfigOptions.key(GlueCatalogConstants.HTTP_CLIENT_APACHE_CONNECTION_MAX_IDLE_TIME_MS) + .longType() + .defaultValue(0L); + + public static final ConfigOption HTTP_CLIENT_APACHE_CONNECTION_TIME_TO_LIVE_MS = + ConfigOptions.key(GlueCatalogConstants.HTTP_CLIENT_APACHE_CONNECTION_TIME_TO_LIVE_MS) + .longType() + .defaultValue(0L); + + public static final ConfigOption HTTP_CLIENT_APACHE_CONNECTION_TIMEOUT_MS = + ConfigOptions.key(GlueCatalogConstants.HTTP_CLIENT_APACHE_CONNECTION_TIMEOUT_MS) + .longType() + .defaultValue(0L); + + public static final ConfigOption HTTP_CLIENT_APACHE_EXPECT_CONTINUE_ENABLED = + ConfigOptions.key(GlueCatalogConstants.HTTP_CLIENT_APACHE_EXPECT_CONTINUE_ENABLED) + .booleanType() + .defaultValue(false); + + public static 
final ConfigOption HTTP_CLIENT_APACHE_MAX_CONNECTIONS = + ConfigOptions.key(GlueCatalogConstants.HTTP_CLIENT_APACHE_MAX_CONNECTIONS) + .intType() + .defaultValue(1); + + public static final ConfigOption HTTP_CLIENT_APACHE_SOCKET_TIMEOUT_MS = + ConfigOptions.key(GlueCatalogConstants.HTTP_CLIENT_APACHE_SOCKET_TIMEOUT_MS) + .longType() + .defaultValue(0L); + + public static final ConfigOption HTTP_CLIENT_APACHE_TCP_KEEP_ALIVE_ENABLED = + ConfigOptions.key(GlueCatalogConstants.HTTP_CLIENT_APACHE_TCP_KEEP_ALIVE_ENABLED) + .booleanType() + .defaultValue(false); + + public static final ConfigOption + HTTP_CLIENT_APACHE_USE_IDLE_CONNECTION_REAPER_ENABLED = + ConfigOptions.key( + GlueCatalogConstants + .HTTP_CLIENT_APACHE_USE_IDLE_CONNECTION_REAPER_ENABLED) + .booleanType() + .defaultValue(false); + + public static final ConfigOption GLUE_CATALOG_ENDPOINT = + ConfigOptions.key(GlueCatalogConstants.GLUE_CATALOG_ENDPOINT) + .stringType() + .noDefaultValue(); + + public static final ConfigOption GLUE_CATALOG_ID = + ConfigOptions.key(GlueCatalogConstants.GLUE_CATALOG_ID).stringType().noDefaultValue(); + + public static final ConfigOption GLUE_ACCOUNT_ID = + ConfigOptions.key(GlueCatalogConstants.GLUE_ACCOUNT_ID).stringType().noDefaultValue(); + + public static final ConfigOption GLUE_CATALOG_SKIP_ARCHIVE = + ConfigOptions.key(GlueCatalogConstants.GLUE_CATALOG_SKIP_ARCHIVE) + .booleanType() + .defaultValue(false); + + public static final ConfigOption HTTP_CLIENT_URLCONNECTION_CONNECTION_TIMEOUT_MS = + ConfigOptions.key(GlueCatalogConstants.HTTP_CLIENT_URLCONNECTION_CONNECTION_TIMEOUT_MS) + .longType() + .defaultValue(0L); + + public static final ConfigOption HTTP_CLIENT_URLCONNECTION_SOCKET_TIMEOUT_MS = + ConfigOptions.key(GlueCatalogConstants.HTTP_CLIENT_URLCONNECTION_SOCKET_TIMEOUT_MS) + .longType() + .defaultValue(0L); + + public static Set> getAllConfigOptions() { + // list all config options declared above + final Set> options = new HashSet<>(); + options.add(LOCATION_KEY); 
+ options.add(INPUT_FORMAT); + options.add(OUTPUT_FORMAT); + options.add(HTTP_CLIENT_TYPE); + options.add(HTTP_CLIENT_APACHE_CONNECTION_ACQUISITION_TIMEOUT_MS); + options.add(GLUE_CATALOG_SKIP_NAME_VALIDATION); + options.add(HTTP_CLIENT_APACHE_CONNECTION_MAX_IDLE_TIME_MS); + options.add(HTTP_CLIENT_APACHE_CONNECTION_TIME_TO_LIVE_MS); + options.add(HTTP_CLIENT_APACHE_CONNECTION_TIMEOUT_MS); + options.add(HTTP_CLIENT_APACHE_EXPECT_CONTINUE_ENABLED); + options.add(HTTP_CLIENT_APACHE_MAX_CONNECTIONS); + options.add(HTTP_CLIENT_APACHE_SOCKET_TIMEOUT_MS); + options.add(HTTP_CLIENT_APACHE_TCP_KEEP_ALIVE_ENABLED); + options.add(HTTP_CLIENT_APACHE_USE_IDLE_CONNECTION_REAPER_ENABLED); + options.add(GLUE_CATALOG_ENDPOINT); + options.add(GLUE_CATALOG_ID); + options.add(GLUE_ACCOUNT_ID); + options.add(GLUE_CATALOG_SKIP_ARCHIVE); + options.add(HTTP_CLIENT_URLCONNECTION_CONNECTION_TIMEOUT_MS); + options.add(HTTP_CLIENT_URLCONNECTION_SOCKET_TIMEOUT_MS); + return options; + } +} diff --git a/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/util/AwsClientFactories.java b/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/util/AwsClientFactories.java new file mode 100644 index 000000000..063c97cd4 --- /dev/null +++ b/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/util/AwsClientFactories.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog.glue.util; + +import org.apache.flink.configuration.ReadableConfig; + +import software.amazon.awssdk.services.glue.GlueClient; + +/** Default factories. */ +public class AwsClientFactories { + + private AwsClientFactories() {} + + public static AwsClientFactory factory(AwsProperties properties) { + return new DefaultAwsClientFactory(properties); + } + + static class DefaultAwsClientFactory implements AwsClientFactory { + + /** instance that holds provides. */ + private AwsProperties awsProperties; + + DefaultAwsClientFactory(AwsProperties properties) { + awsProperties = properties; + } + + @Override + public GlueClient glue() { + return GlueClient.builder() + .applyMutation(awsProperties::applyHttpClientConfigurations) + .applyMutation(awsProperties::applyGlueEndpointConfigurations) + .build(); + } + + @Override + public void initialize(ReadableConfig properties) { + this.awsProperties = new AwsProperties(properties); + } + } +} diff --git a/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/util/AwsClientFactory.java b/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/util/AwsClientFactory.java new file mode 100644 index 000000000..ffcbc4532 --- /dev/null +++ b/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/util/AwsClientFactory.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.glue.util;
+
+import org.apache.flink.configuration.ReadableConfig;
+
+import software.amazon.awssdk.services.glue.GlueClient;
+
+/**
+ * Interface to customize AWS clients used by Flink. A custom factory must have a no-arg
+ * constructor, and use {@link #initialize(ReadableConfig)} to initialize the factory.
+ */
+public interface AwsClientFactory {
+
+    /**
+     * Create an AWS Glue client.
+     *
+     * @return glue client
+     */
+    GlueClient glue();
+
+    /**
+     * Initialize AWS client factory from catalog properties.
+     *
+     * @param properties catalog properties
+     */
+    void initialize(ReadableConfig properties);
+}
diff --git a/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/util/AwsProperties.java b/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/util/AwsProperties.java
new file mode 100644
index 000000000..f4d80bf7c
--- /dev/null
+++ b/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/util/AwsProperties.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog.glue.util; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.table.catalog.glue.GlueCatalogOptions; + +import org.apache.flink.shaded.guava30.com.google.common.base.Strings; + +import software.amazon.awssdk.awscore.client.builder.AwsSyncClientBuilder; +import software.amazon.awssdk.core.client.builder.SdkClientBuilder; +import software.amazon.awssdk.http.apache.ApacheHttpClient; +import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient; +import software.amazon.awssdk.services.glue.GlueClientBuilder; + +import java.net.URI; +import java.time.Duration; + +import static org.apache.flink.table.catalog.glue.GlueCatalogConstants.HTTP_CLIENT_TYPE_APACHE; +import static org.apache.flink.table.catalog.glue.GlueCatalogConstants.HTTP_CLIENT_TYPE_DEFAULT; +import static org.apache.flink.table.catalog.glue.GlueCatalogConstants.HTTP_CLIENT_TYPE_URLCONNECTION; + +/** Aws properties for glue and other clients. 
*/ +public class AwsProperties { + + private final Long httpClientUrlConnectionConnectionTimeoutMs; + + private final Long httpClientUrlConnectionSocketTimeoutMs; + + private final Long httpClientApacheConnectionAcquisitionTimeoutMs; + + private final Long httpClientApacheConnectionMaxIdleTimeMs; + + private final Long httpClientApacheConnectionTimeToLiveMs; + + private final Long httpClientApacheConnectionTimeoutMs; + + private final Boolean httpClientApacheExpectContinueEnabled; + + private final Integer httpClientApacheMaxConnections; + + private final Long httpClientApacheSocketTimeoutMs; + + private final Boolean httpClientApacheTcpKeepAliveEnabled; + + private final Boolean httpClientApacheUseIdleConnectionReaperEnabled; + + private final String glueEndpoint; + + private final String glueCatalogId; + + private final Boolean glueCatalogSkipArchive; + + private final Boolean glueCatalogSkipNameValidation; + + /** http client. */ + private String httpClientType; + + public AwsProperties(ReadableConfig properties) { + + if (properties.getOptional(GlueCatalogOptions.HTTP_CLIENT_TYPE).isPresent()) { + this.httpClientType = properties.getOptional(GlueCatalogOptions.HTTP_CLIENT_TYPE).get(); + } + + this.httpClientUrlConnectionConnectionTimeoutMs = + properties.get(GlueCatalogOptions.HTTP_CLIENT_URLCONNECTION_CONNECTION_TIMEOUT_MS); + + this.httpClientUrlConnectionSocketTimeoutMs = + properties.get(GlueCatalogOptions.HTTP_CLIENT_URLCONNECTION_SOCKET_TIMEOUT_MS); + + this.httpClientApacheConnectionAcquisitionTimeoutMs = + properties.get( + GlueCatalogOptions.HTTP_CLIENT_APACHE_CONNECTION_ACQUISITION_TIMEOUT_MS); + + this.httpClientApacheConnectionMaxIdleTimeMs = + properties.get(GlueCatalogOptions.HTTP_CLIENT_APACHE_CONNECTION_MAX_IDLE_TIME_MS); + + this.httpClientApacheConnectionTimeToLiveMs = + properties.get(GlueCatalogOptions.HTTP_CLIENT_APACHE_CONNECTION_TIME_TO_LIVE_MS); + + this.httpClientApacheConnectionTimeoutMs = + 
properties.get(GlueCatalogOptions.HTTP_CLIENT_APACHE_CONNECTION_TIMEOUT_MS); + + this.httpClientApacheExpectContinueEnabled = + properties + .getOptional(GlueCatalogOptions.HTTP_CLIENT_APACHE_EXPECT_CONTINUE_ENABLED) + .orElse(false); + this.httpClientApacheMaxConnections = + properties + .getOptional(GlueCatalogOptions.HTTP_CLIENT_APACHE_MAX_CONNECTIONS) + .orElse(1); + + this.httpClientApacheSocketTimeoutMs = + properties + .getOptional(GlueCatalogOptions.HTTP_CLIENT_APACHE_SOCKET_TIMEOUT_MS) + .orElse(0L); + + this.httpClientApacheTcpKeepAliveEnabled = + properties.get(GlueCatalogOptions.HTTP_CLIENT_APACHE_TCP_KEEP_ALIVE_ENABLED); + + this.httpClientApacheUseIdleConnectionReaperEnabled = + properties.get( + GlueCatalogOptions.HTTP_CLIENT_APACHE_USE_IDLE_CONNECTION_REAPER_ENABLED); + + this.glueEndpoint = properties.get(GlueCatalogOptions.GLUE_CATALOG_ENDPOINT); + this.glueCatalogId = properties.get(GlueCatalogOptions.GLUE_CATALOG_ID); + + this.glueCatalogSkipArchive = properties.get(GlueCatalogOptions.GLUE_CATALOG_SKIP_ARCHIVE); + + this.glueCatalogSkipNameValidation = + properties.get(GlueCatalogOptions.GLUE_CATALOG_SKIP_NAME_VALIDATION); + } + + /** + * Configure the httpClient for a client according to the HttpClientType. The two supported + * HttpClientTypes are urlconnection and apache + * + *

Sample usage: + * + *

+     *     S3Client.builder().applyMutation(awsProperties::applyHttpClientConfigurations)
+     * 
+ */ + public void applyHttpClientConfigurations(T builder) { + if (Strings.isNullOrEmpty(httpClientType)) { + httpClientType = HTTP_CLIENT_TYPE_DEFAULT; + } + switch (httpClientType) { + case HTTP_CLIENT_TYPE_URLCONNECTION: + builder.httpClientBuilder( + UrlConnectionHttpClient.builder() + .applyMutation(this::configureUrlConnectionHttpClientBuilder)); + break; + case HTTP_CLIENT_TYPE_APACHE: + builder.httpClientBuilder( + ApacheHttpClient.builder() + .applyMutation(this::configureApacheHttpClientBuilder)); + break; + default: + throw new IllegalArgumentException( + "Unrecognized HTTP client type " + httpClientType); + } + } + + @VisibleForTesting + void configureUrlConnectionHttpClientBuilder( + T builder) { + if (httpClientUrlConnectionConnectionTimeoutMs != null) { + builder.connectionTimeout( + Duration.ofMillis(httpClientUrlConnectionConnectionTimeoutMs)); + } + + if (httpClientUrlConnectionSocketTimeoutMs != null) { + builder.socketTimeout(Duration.ofMillis(httpClientUrlConnectionSocketTimeoutMs)); + } + } + + @VisibleForTesting + void configureApacheHttpClientBuilder(T builder) { + if (httpClientApacheConnectionTimeoutMs != null) { + builder.connectionTimeout(Duration.ofMillis(httpClientApacheConnectionTimeoutMs)); + } + + if (httpClientApacheSocketTimeoutMs != null) { + builder.socketTimeout(Duration.ofMillis(httpClientApacheSocketTimeoutMs)); + } + + if (httpClientApacheConnectionAcquisitionTimeoutMs != null) { + builder.connectionAcquisitionTimeout( + Duration.ofMillis(httpClientApacheConnectionAcquisitionTimeoutMs)); + } + + if (httpClientApacheConnectionMaxIdleTimeMs != null) { + builder.connectionMaxIdleTime( + Duration.ofMillis(httpClientApacheConnectionMaxIdleTimeMs)); + } + + if (httpClientApacheConnectionTimeToLiveMs != null) { + builder.connectionTimeToLive(Duration.ofMillis(httpClientApacheConnectionTimeToLiveMs)); + } + + if (httpClientApacheExpectContinueEnabled != null) { + builder.expectContinueEnabled(httpClientApacheExpectContinueEnabled); 
+ } + + if (httpClientApacheMaxConnections != null) { + builder.maxConnections(httpClientApacheMaxConnections); + } + + if (httpClientApacheTcpKeepAliveEnabled != null) { + builder.tcpKeepAlive(httpClientApacheTcpKeepAliveEnabled); + } + + if (httpClientApacheUseIdleConnectionReaperEnabled != null) { + builder.useIdleConnectionReaper(httpClientApacheUseIdleConnectionReaperEnabled); + } + } + + /** + * Override the endpoint for a glue client. + * + *

Sample usage: + * + *

+     *     GlueClient.builder().applyMutation(awsProperties::applyS3EndpointConfigurations)
+     * 
+ */ + public void applyGlueEndpointConfigurations(T builder) { + configureEndpoint(builder, glueEndpoint); + } + + private void configureEndpoint(T builder, String endpoint) { + if (endpoint != null) { + builder.endpointOverride(URI.create(endpoint)); + } + } + + /* + * Getter for glue catalogId. + */ + public String getGlueCatalogId() { + return glueCatalogId; + } +} diff --git a/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/util/GlueOperator.java b/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/util/GlueOperator.java new file mode 100644 index 000000000..725bc53e8 --- /dev/null +++ b/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/util/GlueOperator.java @@ -0,0 +1,1639 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.catalog.glue.util; + +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.catalog.CatalogBaseTable; +import org.apache.flink.table.catalog.CatalogDatabase; +import org.apache.flink.table.catalog.CatalogDatabaseImpl; +import org.apache.flink.table.catalog.CatalogFunction; +import org.apache.flink.table.catalog.CatalogFunctionImpl; +import org.apache.flink.table.catalog.CatalogPartition; +import org.apache.flink.table.catalog.CatalogPartitionSpec; +import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.CatalogView; +import org.apache.flink.table.catalog.FunctionLanguage; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; +import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.PartitionNotExistException; +import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException; +import org.apache.flink.table.catalog.exceptions.TableNotExistException; +import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException; +import org.apache.flink.table.catalog.glue.GlueCatalogConstants; +import org.apache.flink.table.catalog.glue.GlueCatalogOptions; +import org.apache.flink.table.expressions.Expression; +import org.apache.flink.table.factories.ManagedTableFactory; +import org.apache.flink.table.resource.ResourceType; +import org.apache.flink.table.types.DataType; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.services.glue.GlueClient; +import software.amazon.awssdk.services.glue.model.AlreadyExistsException; +import 
software.amazon.awssdk.services.glue.model.BatchDeleteTableRequest; +import software.amazon.awssdk.services.glue.model.BatchDeleteTableResponse; +import software.amazon.awssdk.services.glue.model.Column; +import software.amazon.awssdk.services.glue.model.CreateDatabaseRequest; +import software.amazon.awssdk.services.glue.model.CreateDatabaseResponse; +import software.amazon.awssdk.services.glue.model.CreatePartitionRequest; +import software.amazon.awssdk.services.glue.model.CreatePartitionResponse; +import software.amazon.awssdk.services.glue.model.CreateTableRequest; +import software.amazon.awssdk.services.glue.model.CreateTableResponse; +import software.amazon.awssdk.services.glue.model.CreateUserDefinedFunctionRequest; +import software.amazon.awssdk.services.glue.model.CreateUserDefinedFunctionResponse; +import software.amazon.awssdk.services.glue.model.Database; +import software.amazon.awssdk.services.glue.model.DatabaseInput; +import software.amazon.awssdk.services.glue.model.DeleteDatabaseRequest; +import software.amazon.awssdk.services.glue.model.DeleteDatabaseResponse; +import software.amazon.awssdk.services.glue.model.DeletePartitionRequest; +import software.amazon.awssdk.services.glue.model.DeletePartitionResponse; +import software.amazon.awssdk.services.glue.model.DeleteTableRequest; +import software.amazon.awssdk.services.glue.model.DeleteTableResponse; +import software.amazon.awssdk.services.glue.model.DeleteUserDefinedFunctionRequest; +import software.amazon.awssdk.services.glue.model.DeleteUserDefinedFunctionResponse; +import software.amazon.awssdk.services.glue.model.EntityNotFoundException; +import software.amazon.awssdk.services.glue.model.GetDatabaseRequest; +import software.amazon.awssdk.services.glue.model.GetDatabaseResponse; +import software.amazon.awssdk.services.glue.model.GetDatabasesRequest; +import software.amazon.awssdk.services.glue.model.GetDatabasesResponse; +import software.amazon.awssdk.services.glue.model.GetPartitionRequest; 
+import software.amazon.awssdk.services.glue.model.GetPartitionResponse; +import software.amazon.awssdk.services.glue.model.GetPartitionsRequest; +import software.amazon.awssdk.services.glue.model.GetPartitionsResponse; +import software.amazon.awssdk.services.glue.model.GetTableRequest; +import software.amazon.awssdk.services.glue.model.GetTableResponse; +import software.amazon.awssdk.services.glue.model.GetTablesRequest; +import software.amazon.awssdk.services.glue.model.GetTablesResponse; +import software.amazon.awssdk.services.glue.model.GetUserDefinedFunctionRequest; +import software.amazon.awssdk.services.glue.model.GetUserDefinedFunctionResponse; +import software.amazon.awssdk.services.glue.model.GetUserDefinedFunctionsRequest; +import software.amazon.awssdk.services.glue.model.GetUserDefinedFunctionsResponse; +import software.amazon.awssdk.services.glue.model.GlueException; +import software.amazon.awssdk.services.glue.model.GlueResponse; +import software.amazon.awssdk.services.glue.model.Partition; +import software.amazon.awssdk.services.glue.model.PartitionInput; +import software.amazon.awssdk.services.glue.model.PrincipalType; +import software.amazon.awssdk.services.glue.model.ResourceUri; +import software.amazon.awssdk.services.glue.model.StorageDescriptor; +import software.amazon.awssdk.services.glue.model.Table; +import software.amazon.awssdk.services.glue.model.TableInput; +import software.amazon.awssdk.services.glue.model.UpdateDatabaseRequest; +import software.amazon.awssdk.services.glue.model.UpdateDatabaseResponse; +import software.amazon.awssdk.services.glue.model.UpdateTableRequest; +import software.amazon.awssdk.services.glue.model.UpdateTableResponse; +import software.amazon.awssdk.services.glue.model.UpdateUserDefinedFunctionRequest; +import software.amazon.awssdk.services.glue.model.UpdateUserDefinedFunctionResponse; +import software.amazon.awssdk.services.glue.model.UserDefinedFunction; +import 
software.amazon.awssdk.services.glue.model.UserDefinedFunctionInput; + +import java.time.Instant; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR; +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.StringUtils.isNullOrWhitespaceOnly; + +/** + * Utilities for Glue catalog operations. Important Note : + * https://aws.amazon.com/premiumsupport/knowledge-center/glue-crawler-internal-service-exception/ + */ +public class GlueOperator { + + /** Defines the location URI for database. This locationUri is high level path for catalog */ + public final String locationUri; + + /** + * Instance of AwsProperties which holds the configs related to configure glue and aws setup. + */ + private final AwsProperties awsProperties; + + /** http client for glue client. Current implementation for client is sync type. */ + private final GlueClient glueClient; + + private final String catalogName; + + private static final Logger LOG = LoggerFactory.getLogger(GlueOperator.class); + + public GlueOperator( + String locationUri, + String catalogName, + AwsProperties awsProperties, + GlueClient glueClient) { + this.locationUri = locationUri; + this.awsProperties = awsProperties; + this.glueClient = glueClient; + this.catalogName = catalogName; + } + + public void closeClient() { + glueClient.close(); + } + + // -------------- Database related operations. + + /** + * List all databases present. 
+ * + * @return List of fully qualified database names + */ + public List listGlueDatabases() throws CatalogException { + try { + GetDatabasesRequest.Builder databasesRequestBuilder = + GetDatabasesRequest.builder().catalogId(getGlueCatalogId()); + + GetDatabasesResponse response = + glueClient.getDatabases(databasesRequestBuilder.build()); + List glueDatabases = + response.databaseList().stream() + .map(Database::name) + .collect(Collectors.toList()); + String dbResultNextToken = response.nextToken(); + if (Optional.ofNullable(dbResultNextToken).isPresent()) { + do { + databasesRequestBuilder.nextToken(dbResultNextToken); + response = glueClient.getDatabases(databasesRequestBuilder.build()); + glueDatabases.addAll( + response.databaseList().stream() + .map(Database::name) + .collect(Collectors.toList())); + dbResultNextToken = response.nextToken(); + } while (Optional.ofNullable(dbResultNextToken).isPresent()); + } + return glueDatabases; + } catch (GlueException e) { + throw new CatalogException( + GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER, e.getCause()); + } + } + + /** + * Create database in glue data catalog. + * + * @param databaseName fully qualified name of database. + * @param database Instance of {@link CatalogDatabase}. + * @throws CatalogException when unknown error from glue servers. + * @throws DatabaseAlreadyExistException when database exists already in glue data catalog. + */ + public void createGlueDatabase(String databaseName, CatalogDatabase database) + throws CatalogException, DatabaseAlreadyExistException { + + validateName(databaseName); + Map properties = database.getProperties(); + DatabaseInput.Builder databaseInputBuilder = + DatabaseInput.builder() + .name(databaseName) + .description(database.getComment()) + // update location and remove location from properties. 
+ .locationUri(extractDatabaseLocation(properties, databaseName)) + .parameters(properties); + CreateDatabaseRequest.Builder requestBuilder = + CreateDatabaseRequest.builder() + .databaseInput(databaseInputBuilder.build()) + .catalogId(getGlueCatalogId()) + .tags(getFlinkCatalogTag()); + LOG.info( + String.format( + "Database Properties Listing :- %s", + properties.entrySet().stream() + .map(e -> e.getKey() + e.getValue()) + .collect(Collectors.joining(",")))); + try { + CreateDatabaseResponse response = glueClient.createDatabase(requestBuilder.build()); + if (LOG.isDebugEnabled()) { + LOG.debug(getDebugLog(response)); + } + validateGlueResponse(response); + LOG.info(String.format("%s Database created.", databaseName)); + } catch (EntityNotFoundException e) { + throw new DatabaseAlreadyExistException(catalogName, databaseName, e); + } catch (GlueException e) { + throw new CatalogException(GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER, e); + } + } + + /** + * Extract location from database properties if present and remove location from properties. + * fallback to create default location if not present + * + * @param databaseProperties database properties. + * @param databaseName fully qualified name for database. + * @return location for database. + */ + private String extractDatabaseLocation( + Map databaseProperties, String databaseName) { + return databaseProperties.containsKey(GlueCatalogConstants.LOCATION_URI) + ? databaseProperties.remove(GlueCatalogConstants.LOCATION_URI) + : locationUri + GlueCatalogConstants.LOCATION_SEPARATOR + databaseName; + } + + /** + * Create tag for flink in glue catalog for identification. + * + * @return Key/Value pair for tags + */ + private Map getFlinkCatalogTag() { + Map tags = new HashMap<>(); + tags.put("source", "flink_catalog"); + return tags; + } + + /** + * Delete a database from Glue data catalog only when database is empty. 
+ * + * @param databaseName fully qualified name of database + * @throws CatalogException Any Exception thrown due to glue error + * @throws DatabaseNotExistException when database doesn't exists in glue catalog. + */ + public void dropGlueDatabase(String databaseName) + throws CatalogException, DatabaseNotExistException { + + validateName(databaseName); + + DeleteDatabaseRequest deleteDatabaseRequest = + DeleteDatabaseRequest.builder() + .name(databaseName) + .catalogId(getGlueCatalogId()) + .build(); + try { + DeleteDatabaseResponse response = glueClient.deleteDatabase(deleteDatabaseRequest); + if (LOG.isDebugEnabled()) { + LOG.debug(getDebugLog(response)); + } + validateGlueResponse(response); + LOG.info(String.format("Database Dropped %s", databaseName)); + } catch (EntityNotFoundException e) { + throw new DatabaseNotExistException(catalogName, databaseName); + } catch (GlueException e) { + throw new CatalogException(catalogName, e); + } + } + + /** + * Drops list of table in database from glue data catalog. + * + * @param databaseName fully qualified name of database + * @param tables List of tables to remove from database. 
+ * @throws CatalogException Any Exception thrown due to glue error + */ + public void deleteTablesFromDatabase(String databaseName, Collection tables) + throws CatalogException { + validateName(databaseName); + BatchDeleteTableRequest batchTableRequest = + BatchDeleteTableRequest.builder() + .databaseName(databaseName) + .catalogId(getGlueCatalogId()) + .tablesToDelete(tables) + .build(); + + try { + BatchDeleteTableResponse response = glueClient.batchDeleteTable(batchTableRequest); + if (response.hasErrors()) { + String errorMsg = + String.format( + "Glue Table errors:- %s", + response.errors().stream() + .map( + e -> + "Table: " + + e.tableName() + + "\nErrorDetail: " + + e.errorDetail().errorMessage()) + .collect(Collectors.joining("\n"))); + LOG.error(errorMsg); + } + validateGlueResponse(response); + LOG.info("Tables deleted."); + } catch (GlueException e) { + throw new CatalogException(GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER, e); + } + } + + /** + * Drops list of user defined function in database from glue data catalog. + * + * @param databaseName fully qualified name of database + * @param functions List of tables to remove from database. 
+ * @throws CatalogException Any Exception thrown due to glue error + */ + public void deleteFunctionsFromDatabase(String databaseName, Collection functions) + throws CatalogException { + validateName(databaseName); + try { + DeleteUserDefinedFunctionRequest.Builder requestBuilder = + DeleteUserDefinedFunctionRequest.builder() + .databaseName(databaseName) + .catalogId(getGlueCatalogId()); + for (String functionName : functions) { + requestBuilder.functionName(functionName); + DeleteUserDefinedFunctionResponse response = + glueClient.deleteUserDefinedFunction(requestBuilder.build()); + if (LOG.isDebugEnabled()) { + LOG.debug(getDebugLog(response)); + } + validateGlueResponse(response); + LOG.info(String.format("Dropped Function %s", functionName)); + } + + } catch (GlueException e) { + LOG.error(String.format("Error deleting functions in database: %s", databaseName)); + throw new CatalogException(GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER, e); + } + } + + /** + * Check if database is empty. i.e. it should not contain 1. table 2. functions + * + * @param databaseName name of database. + * @return boolean True/False based on the content of database. + * @throws CatalogException Any Exception thrown due to glue error + */ + public boolean isDatabaseEmpty(String databaseName) throws CatalogException { + checkArgument(!isNullOrWhitespaceOnly(databaseName)); + validateName(databaseName); + GetTablesRequest tablesRequest = + GetTablesRequest.builder() + .catalogId(getGlueCatalogId()) + .databaseName(databaseName) + .maxResults(1) + .build(); + try { + GetTablesResponse response = glueClient.getTables(tablesRequest); + return response.sdkHttpResponse().isSuccessful() + && response.tableList().size() == 0 + && listGlueFunctions(databaseName).size() == 0; + } catch (GlueException e) { + throw new CatalogException(GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER, e); + } + } + + /** + * Get a database from this glue data catalog. 
+ * + * @param databaseName fully qualified name of database. + * @return Instance of {@link CatalogDatabase } . + * @throws DatabaseNotExistException when database doesn't exists in Glue data catalog. + * @throws CatalogException when any unknown error occurs in glue. + */ + public CatalogDatabase getDatabase(String databaseName) + throws DatabaseNotExistException, CatalogException { + + validateName(databaseName); + GetDatabaseRequest getDatabaseRequest = + GetDatabaseRequest.builder() + .name(databaseName) + .catalogId(getGlueCatalogId()) + .build(); + try { + GetDatabaseResponse response = glueClient.getDatabase(getDatabaseRequest); + if (LOG.isDebugEnabled()) { + LOG.debug( + GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER + + ": existing database. Client call response :- " + + response.sdkHttpResponse().statusText()); + } + validateGlueResponse(response); + return getCatalogDatabase(response.database()); + } catch (EntityNotFoundException e) { + throw new DatabaseNotExistException(catalogName, databaseName); + } catch (GlueException e) { + throw new CatalogException(GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER, e); + } + } + + /** + * Build CatalogDatabase instance using information from glue Database. + * + * @param glueDatabase {@link Database } + * @return {@link CatalogDatabase } instance. 
+ */ + private CatalogDatabase getCatalogDatabase(Database glueDatabase) { + Map properties = new HashMap<>(glueDatabase.parameters()); + + // retrieve location uri into properties + properties.put(GlueCatalogConstants.LOCATION_URI, glueDatabase.locationUri()); + String comment = glueDatabase.description(); + return new CatalogDatabaseImpl(properties, comment); + } + + public void updateGlueDatabase(String databaseName, CatalogDatabase newDatabase) + throws CatalogException { + + validateName(databaseName); + Map properties = newDatabase.getProperties(); + DatabaseInput.Builder databaseInputBuilder = + DatabaseInput.builder() + .parameters(properties) + .description(newDatabase.getComment()) + .name(databaseName) + .locationUri(extractDatabaseLocation(properties, databaseName)); + + UpdateDatabaseRequest updateRequest = + UpdateDatabaseRequest.builder() + .databaseInput(databaseInputBuilder.build()) + .name(databaseName) + .catalogId(getGlueCatalogId()) + .build(); + UpdateDatabaseResponse response = glueClient.updateDatabase(updateRequest); + if (LOG.isDebugEnabled()) { + LOG.debug(getDebugLog(response)); + } + validateGlueResponse(response); + LOG.info(String.format("Database Updated. %s", databaseName)); + } + + // -------------- Table related operations. + + /** + * Create table in glue data catalog. + * + * @param tablePath Fully qualified name of table. {@link ObjectPath} + * @param table instance of {@link CatalogBaseTable} containing table related information. + * @param managedTable identifier if managed table. 
+ * @throws CatalogException Any Exception thrown due to glue error + */ + public void createGlueTable( + final ObjectPath tablePath, final CatalogBaseTable table, final boolean managedTable) + throws CatalogException { + + checkNotNull(table); + checkNotNull(tablePath); + Map properties = new HashMap<>(table.getOptions()); + String tableOwner = extractTableOwner(properties); + if (managedTable) { + properties.put(CONNECTOR.key(), ManagedTableFactory.DEFAULT_IDENTIFIER); + } + Set glueColumns = getGlueColumnsFromCatalogTable(table); + + // create StorageDescriptor for table + StorageDescriptor.Builder storageDescriptorBuilder = + StorageDescriptor.builder() + .inputFormat(extractInputFormat(properties)) + .outputFormat(extractOutputFormat(properties)) + .location(extractTableLocation(properties, tablePath)); + + // create TableInput Builder with available information. + TableInput.Builder tableInputBuilder = + TableInput.builder() + .name(tablePath.getObjectName()) + .description(table.getComment()) + .tableType(table.getTableKind().name()) + .lastAccessTime(Instant.now()) + .owner(tableOwner); + + if (table.getTableKind().name().equalsIgnoreCase(CatalogBaseTable.TableKind.VIEW.name())) { + tableInputBuilder.viewExpandedText(getExpandedQuery(table)); + tableInputBuilder.viewOriginalText(getOriginalQuery(table)); + } + + CreateTableRequest.Builder requestBuilder = + CreateTableRequest.builder() + .catalogId(getGlueCatalogId()) + .databaseName(tablePath.getDatabaseName()); + + if (table instanceof CatalogTable) { + CatalogTable catalogTable = (CatalogTable) table; + if (catalogTable.isPartitioned()) { + LOG.info("Catalog table is partitioned"); + Collection partitionKeys = getPartitionKeys(catalogTable, glueColumns); + LOG.info( + "Partition columns are -> " + + partitionKeys.stream() + .map(Column::name) + .collect(Collectors.joining(","))); + tableInputBuilder.partitionKeys(partitionKeys); + } + } + + try { + // apply storage descriptor and tableInput for request + 
storageDescriptorBuilder.columns(glueColumns); + tableInputBuilder.storageDescriptor(storageDescriptorBuilder.build()); + tableInputBuilder.parameters(properties); + requestBuilder.tableInput(tableInputBuilder.build()); + CreateTableResponse response = glueClient.createTable(requestBuilder.build()); + LOG.debug(getDebugLog(response)); + validateGlueResponse(response); + LOG.info(String.format("Table created. %s", tablePath.getFullName())); + + } catch (GlueException e) { + throw new CatalogException(GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER, e); + } + } + + private String getExpandedQuery(CatalogBaseTable table) { + // todo complete it. + return ""; + } + + private String getOriginalQuery(CatalogBaseTable table) { + // todo complete it. + return ""; + } + + /** + * Extract table owner name and remove from properties. + * + * @param properties Map of properties. + * @return fully qualified owner name. + */ + private String extractTableOwner(Map properties) { + return properties.containsKey(GlueCatalogConstants.TABLE_OWNER) + ? properties.remove(GlueCatalogConstants.TABLE_OWNER) + : null; + } + + /** + * Build set of {@link Column} associated with table. + * + * @param catalogBaseTable instance of {@link CatalogBaseTable}. + * @return Set of Column + */ + private Set getGlueColumnsFromCatalogTable(CatalogBaseTable catalogBaseTable) { + checkNotNull(catalogBaseTable); + TableSchema tableSchema = catalogBaseTable.getSchema(); + return Arrays.stream(tableSchema.getFieldNames()) + .map(fieldName -> getGlueColumn(catalogBaseTable, tableSchema, fieldName)) + .collect(Collectors.toSet()); + } + + /** + * Extract location from database properties if present and remove location from properties. + * fallback to create default location if not present + * + * @param tableProperties table properties. + * @param tablePath fully qualified object for table. + * @return location for table. 
+ */ + private String extractTableLocation(Map tableProperties, ObjectPath tablePath) { + return tableProperties.containsKey(GlueCatalogConstants.LOCATION_URI) + ? tableProperties.remove(GlueCatalogConstants.LOCATION_URI) + : locationUri + + GlueCatalogConstants.LOCATION_SEPARATOR + + tablePath.getDatabaseName() + + GlueCatalogConstants.LOCATION_SEPARATOR + + tablePath.getObjectName(); + } + + /** + * Extract OutputFormat from properties if present and remove outputFormat from properties. + * fallback to default format if not present + * + * @param tableProperties Key/Value properties + * @return output Format. + */ + private String extractOutputFormat(Map tableProperties) { + return tableProperties.containsKey(GlueCatalogConstants.TABLE_OUTPUT_FORMAT) + ? tableProperties.remove(GlueCatalogConstants.TABLE_OUTPUT_FORMAT) + : GlueCatalogOptions.OUTPUT_FORMAT.defaultValue(); + } + + /** + * Extract InputFormat from properties if present and remove inputFormat from properties. + * fallback to default format if not present + * + * @param tableProperties Key/Value properties + * @return input Format. + */ + private String extractInputFormat(Map tableProperties) { + return tableProperties.containsKey(GlueCatalogConstants.TABLE_INPUT_FORMAT) + ? tableProperties.remove(GlueCatalogConstants.TABLE_INPUT_FORMAT) + : GlueCatalogOptions.INPUT_FORMAT.defaultValue(); + } + + /** + * Get list of filtered columns which are partition columns. + * + * @param catalogTable {@link CatalogTable} instance. + * @param columns List of all column in table. + * @return List of column marked as partition key. + */ + private Collection getPartitionKeys( + CatalogTable catalogTable, Collection columns) { + Set partitionKeys = new HashSet<>(catalogTable.getPartitionKeys()); + return columns.stream() + .filter(column -> partitionKeys.contains(column.name())) + .collect(Collectors.toList()); + } + + /** + * @param tablePath fully Qualified table path. 
+ * @param newTable instance of {@link CatalogBaseTable} containing information for table. + * @param managedTable identifier for managed table. + * @throws CatalogException Glue related exception. + */ + public void alterGlueTable( + ObjectPath tablePath, CatalogBaseTable newTable, boolean managedTable) + throws CatalogException { + + Map properties = new HashMap<>(newTable.getOptions()); + String tableOwner = extractTableOwner(properties); + + if (managedTable) { + properties.put(CONNECTOR.key(), ManagedTableFactory.DEFAULT_IDENTIFIER); + } + + Set glueColumns = getGlueColumnsFromCatalogTable(newTable); + + // create StorageDescriptor for table + StorageDescriptor.Builder storageDescriptorBuilder = + StorageDescriptor.builder() + .inputFormat(extractInputFormat(properties)) + .outputFormat(extractOutputFormat(properties)) + .location(extractTableLocation(properties, tablePath)) + .parameters(properties) + .columns(glueColumns); + + // create TableInput Builder with available information. 
+ TableInput.Builder tableInputBuilder = + TableInput.builder() + .name(tablePath.getObjectName()) + .description(newTable.getComment()) + .tableType(newTable.getTableKind().name()) + .lastAccessTime(Instant.now()) + .owner(tableOwner); + + UpdateTableRequest.Builder requestBuilder = + UpdateTableRequest.builder() + .tableInput(tableInputBuilder.build()) + .catalogId(getGlueCatalogId()) + .databaseName(tablePath.getDatabaseName()); + + if (newTable instanceof CatalogTable) { + CatalogTable catalogTable = (CatalogTable) newTable; + if (catalogTable.isPartitioned()) { + tableInputBuilder.partitionKeys(getPartitionKeys(catalogTable, glueColumns)); + } + } + + // apply storage descriptor and tableInput for request + tableInputBuilder.storageDescriptor(storageDescriptorBuilder.build()); + requestBuilder.tableInput(tableInputBuilder.build()); + + try { + UpdateTableResponse response = glueClient.updateTable(requestBuilder.build()); + LOG.debug(getDebugLog(response)); + validateGlueResponse(response); + LOG.info(String.format("Table updated. %s", tablePath.getFullName())); + } catch (GlueException e) { + throw new CatalogException(GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER, e); + } + } + + private String getDebugLog(GlueResponse response) { + return String.format( + "Glue response : status = %s \n " + "Details = %s \nMetadataResponse = %s", + response.sdkHttpResponse().isSuccessful(), + response.sdkHttpResponse().toString(), + response.responseMetadata()); + } + + /** + * Get names of all tables or views under this database based on type identifier. An empty list + * is returned if none exists. + * + * @param databaseName fully qualified database name. + * @return a list of the names of all tables or views in this database based on type identifier. 
+ * @throws CatalogException in case of any runtime exception + */ + public List getGlueTableList(String databaseName, String type) throws CatalogException { + GetTablesRequest.Builder tablesRequestBuilder = + GetTablesRequest.builder().databaseName(databaseName).catalogId(getGlueCatalogId()); + GetTablesResponse response = glueClient.getTables(tablesRequestBuilder.build()); + validateGlueResponse(response); + List finalTableList = + response.tableList().stream() + .filter(table -> table.tableType().equalsIgnoreCase(type)) + .map(Table::name) + .collect(Collectors.toList()); + String tableResultNextToken = response.nextToken(); + + if (Optional.ofNullable(tableResultNextToken).isPresent()) { + do { + // update token in requestBuilder to fetch next batch + tablesRequestBuilder.nextToken(tableResultNextToken); + response = glueClient.getTables(tablesRequestBuilder.build()); + validateGlueResponse(response); + finalTableList.addAll( + response.tableList().stream() + .filter(table -> table.tableType().equalsIgnoreCase(type)) + .map(Table::name) + .collect(Collectors.toList())); + tableResultNextToken = response.nextToken(); + } while (Optional.ofNullable(tableResultNextToken).isPresent()); + } + return finalTableList; + } + + /** + * Returns a {@link Table} identified by the given table Path. {@link ObjectPath}. + * + * @param tablePath Path of the table or view + * @return The requested table. Glue encapsulates whether table or view in its attribute called + * type. 
+ * @throws TableNotExistException if the target does not exist + * @throws CatalogException in case of any runtime exception + */ + public Table getGlueTable(ObjectPath tablePath) + throws TableNotExistException, CatalogException { + GetTableRequest tablesRequest = + GetTableRequest.builder() + .databaseName(tablePath.getDatabaseName()) + .name(tablePath.getObjectName()) + .catalogId(getGlueCatalogId()) + .build(); + try { + GetTableResponse response = glueClient.getTable(tablesRequest); + LOG.info(String.format("Glue table Found %s", response.table().name())); + validateGlueResponse(response); + return response.table(); + } catch (EntityNotFoundException e) { + throw new TableNotExistException(catalogName, tablePath, e); + } catch (GlueException e) { + throw new CatalogException(GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER, e); + } + } + + /** + * Check if a table or view exists in glue data catalog. + * + * @param tablePath Path of the table or view + * @return true if the given table exists in the catalog false otherwise + * @throws CatalogException in case of any runtime exception + */ + public boolean glueTableExists(ObjectPath tablePath) throws CatalogException { + try { + Table glueTable = getGlueTable(tablePath); + return glueTable != null && glueTable.name().equals(tablePath.getObjectName()); + } catch (TableNotExistException e) { + LOG.warn( + String.format( + "%s\nDatabase: %s Table: %s", + GlueCatalogConstants.TABLE_NOT_EXISTS_IDENTIFIER, + tablePath.getDatabaseName(), + tablePath.getObjectName())); + return false; + } catch (CatalogException e) { + LOG.error(GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER, e.getCause()); + throw new CatalogException(GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER, e); + } + } + + /** + * Modify an existing function. Function name should be handled in a case-insensitive way. + * + * @param functionPath path of function. + * @param newFunction modified function. 
+ * @throws CatalogException on runtime errors. + */ + public void alterGlueFunction(ObjectPath functionPath, CatalogFunction newFunction) + throws CatalogException { + UserDefinedFunctionInput functionInput = createFunctionInput(functionPath, newFunction); + UpdateUserDefinedFunctionRequest updateUserDefinedFunctionRequest = + UpdateUserDefinedFunctionRequest.builder() + .functionName(functionPath.getObjectName()) + .databaseName(functionPath.getDatabaseName()) + .catalogId(getGlueCatalogId()) + .functionInput(functionInput) + .build(); + UpdateUserDefinedFunctionResponse response = + glueClient.updateUserDefinedFunction(updateUserDefinedFunctionRequest); + validateGlueResponse(response); + LOG.info(String.format("Function altered. %s", functionPath.getFullName())); + } + + /** + * Drop a table or view from glue data catalog. + * + * @param tablePath fully qualified table path + * @throws CatalogException on runtime errors. + */ + public void dropGlueTable(ObjectPath tablePath) throws CatalogException { + DeleteTableRequest.Builder tableRequestBuilder = + DeleteTableRequest.builder() + .databaseName(tablePath.getDatabaseName()) + .name(tablePath.getObjectName()) + .catalogId(getGlueCatalogId()); + try { + DeleteTableResponse response = glueClient.deleteTable(tableRequestBuilder.build()); + validateGlueResponse(response); + LOG.info(String.format("Dropped Table %s.", tablePath.getObjectName())); + } catch (GlueException e) { + throw new CatalogException(GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER, e); + } + } + + /** validate response from client call. 
*/ + private void validateGlueResponse(GlueResponse response) { + if (!response.sdkHttpResponse().isSuccessful()) { + throw new CatalogException(GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER); + } + } + + public Schema getSchemaFromGlueTable(Table glueTable) { + Schema.Builder schemaBuilder = Schema.newBuilder(); + for (Column col : glueTable.storageDescriptor().columns()) { + schemaBuilder.column(col.name(), col.type()); + } + return schemaBuilder.build(); + } + + // -------------- Function related operations. + + /** + * Create a function. Function name should be handled in a case-insensitive way. + * + * @param functionPath path of the function + * @param function Flink function to be created + * @throws CatalogException in case of any runtime exception + */ + public void createGlueFunction(ObjectPath functionPath, CatalogFunction function) + throws CatalogException, FunctionAlreadyExistException { + + UserDefinedFunctionInput functionInput = createFunctionInput(functionPath, function); + CreateUserDefinedFunctionRequest.Builder requestBuilder = + CreateUserDefinedFunctionRequest.builder() + .databaseName(functionPath.getDatabaseName()) + .catalogId(getGlueCatalogId()) + .functionInput(functionInput); + try { + CreateUserDefinedFunctionResponse response = + glueClient.createUserDefinedFunction(requestBuilder.build()); + validateGlueResponse(response); + LOG.info(String.format("Function created. %s", functionPath.getFullName())); + } catch (AlreadyExistsException e) { + LOG.error( + String.format( + "%s.%s already Exists. 
Function language of type: %s", + functionPath.getDatabaseName(), + functionPath.getObjectName(), + function.getFunctionLanguage())); + throw new FunctionAlreadyExistException(catalogName, functionPath); + } catch (GlueException e) { + LOG.error("Error creating glue function.", e.getCause()); + throw new CatalogException(GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER, e); + } + } + + private UserDefinedFunctionInput createFunctionInput( + ObjectPath functionPath, CatalogFunction function) + throws UnsupportedOperationException { + Collection resourceUris = + new LinkedList<>(); + for (org.apache.flink.table.resource.ResourceUri resourceUri : + function.getFunctionResources()) { + switch (resourceUri.getResourceType()) { + case JAR: + case FILE: + case ARCHIVE: + resourceUris.add( + ResourceUri.builder() + .resourceType(resourceUri.getResourceType().name()) + .uri(resourceUri.getUri()) + .build()); + break; + default: + throw new UnsupportedOperationException( + "GlueCatalog supports only creating resources JAR/FILE or ARCHIVE."); + } + } + return UserDefinedFunctionInput.builder() + .functionName(functionPath.getObjectName()) + .className(getGlueFunctionClassName(function)) + .ownerType(PrincipalType.USER) + .ownerName(GlueCatalogConstants.FLINK_CATALOG) + .resourceUris(resourceUris) + .build(); + } + + /** + * Get the user defined function from glue Catalog. Function name should be handled in a + * case-insensitive way. 
+ * + * @param functionPath path of the function + * @return the requested function + * @throws CatalogException in case of any runtime exception + */ + public CatalogFunction getGlueFunction(ObjectPath functionPath) { + GetUserDefinedFunctionRequest request = + GetUserDefinedFunctionRequest.builder() + .catalogId(getGlueCatalogId()) + .databaseName(functionPath.getDatabaseName()) + .functionName(functionPath.getObjectName()) + .build(); + GetUserDefinedFunctionResponse response = glueClient.getUserDefinedFunction(request); + validateGlueResponse(response); + UserDefinedFunction udf = response.userDefinedFunction(); + + List resourceUris = new LinkedList<>(); + for (ResourceUri resourceUri : udf.resourceUris()) { + resourceUris.add( + new org.apache.flink.table.resource.ResourceUri( + ResourceType.valueOf(resourceUri.resourceType().name()), + resourceUri.uri())); + } + + return new CatalogFunctionImpl( + getCatalogFunctionClassName(udf), getFunctionalLanguage(udf), resourceUris); + } + + public List listGlueFunctions(String databaseName) { + GetUserDefinedFunctionsRequest.Builder functionsRequest = + GetUserDefinedFunctionsRequest.builder() + .databaseName(databaseName) + .catalogId(getGlueCatalogId()); + + List glueFunctions; + try { + GetUserDefinedFunctionsResponse functionsResponse = + glueClient.getUserDefinedFunctions(functionsRequest.build()); + String token = functionsResponse.nextToken(); + glueFunctions = + functionsResponse.userDefinedFunctions().stream() + .map(UserDefinedFunction::functionName) + .collect(Collectors.toCollection(LinkedList::new)); + if (Optional.ofNullable(token).isPresent()) { + do { + functionsRequest.nextToken(token); + functionsResponse = + glueClient.getUserDefinedFunctions(functionsRequest.build()); + glueFunctions.addAll( + functionsResponse.userDefinedFunctions().stream() + .map(UserDefinedFunction::functionName) + .collect(Collectors.toCollection(LinkedList::new))); + token = functionsResponse.nextToken(); + } while 
(Optional.ofNullable(token).isPresent()); + } + + } catch (GlueException e) { + throw new CatalogException(GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER, e); + } + return glueFunctions; + } + + public boolean glueFunctionExists(ObjectPath functionPath) { + GetUserDefinedFunctionRequest request = + GetUserDefinedFunctionRequest.builder() + .functionName(functionPath.getObjectName()) + .databaseName(functionPath.getDatabaseName()) + .catalogId(getGlueCatalogId()) + .build(); + + try { + GetUserDefinedFunctionResponse response = glueClient.getUserDefinedFunction(request); + validateGlueResponse(response); + return response.userDefinedFunction() + .functionName() + .equalsIgnoreCase(functionPath.getObjectName()); + } catch (EntityNotFoundException e) { + LOG.warn( + String.format( + "Entry not found for function %s.%s", + functionPath.getObjectName(), functionPath.getDatabaseName())); + return false; + } catch (GlueException e) { + LOG.error(GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER, e); + throw new CatalogException(GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER, e); + } + } + + public void dropGlueFunction(ObjectPath functionPath) throws CatalogException { + DeleteUserDefinedFunctionRequest request = + DeleteUserDefinedFunctionRequest.builder() + .catalogId(getGlueCatalogId()) + .functionName(functionPath.getObjectName()) + .databaseName(functionPath.getDatabaseName()) + .build(); + DeleteUserDefinedFunctionResponse response = glueClient.deleteUserDefinedFunction(request); + validateGlueResponse(response); + LOG.info(String.format("Dropped Function. %s", functionPath.getFullName())); + } + + // -------------- Partition related operations. + + public void ensurePartitionedTable(ObjectPath tablePath, Table glueTable) + throws TableNotPartitionedException { + if (!glueTable.hasPartitionKeys()) { + throw new TableNotPartitionedException(catalogName, tablePath); + } + } + + /** + * create partition in glue data catalog. 
+ * + * @param glueTable glue table + * @param partitionSpec partition spec + * @param catalogPartition partition to add + */ + public void createGluePartition( + Table glueTable, CatalogPartitionSpec partitionSpec, CatalogPartition catalogPartition) + throws CatalogException, PartitionSpecInvalidException { + + List partCols = getColumnNames(glueTable.partitionKeys()); + LOG.info(String.format("Partition Columns are : %s", String.join(", ", partCols))); + List partitionValues = + getOrderedFullPartitionValues( + partitionSpec, + partCols, + new ObjectPath(glueTable.databaseName(), glueTable.name())); + + // validate partition values + for (int i = 0; i < partCols.size(); i++) { + if (isNullOrWhitespaceOnly(partitionValues.get(i))) { + throw new PartitionSpecInvalidException( + catalogName, + partCols, + new ObjectPath(glueTable.databaseName(), glueTable.name()), + partitionSpec); + } + } + StorageDescriptor.Builder sdBuilder = glueTable.storageDescriptor().toBuilder(); + Map partitionProperties = catalogPartition.getProperties(); + + sdBuilder.location(extractPartitionLocation(partitionProperties)); + sdBuilder.parameters(partitionSpec.getPartitionSpec()); + String comment = catalogPartition.getComment(); + if (comment != null) { + partitionProperties.put(GlueCatalogConstants.COMMENT, comment); + } + PartitionInput.Builder partitionInput = + PartitionInput.builder() + .parameters(partitionProperties) + .lastAccessTime(Instant.now()) + .storageDescriptor(sdBuilder.build()) + .values(partitionValues); + CreatePartitionRequest createPartitionRequest = + CreatePartitionRequest.builder() + .partitionInput(partitionInput.build()) + .catalogId(getGlueCatalogId()) + .databaseName(glueTable.databaseName()) + .tableName(glueTable.name()) + .build(); + + try { + CreatePartitionResponse response = glueClient.createPartition(createPartitionRequest); + validateGlueResponse(response); + LOG.info("Partition Created."); + } catch (GlueException e) { + throw new 
CatalogException(GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER, e); + } + } + + private String extractPartitionLocation(Map properties) { + return properties.containsKey(GlueCatalogConstants.LOCATION_URI) + ? properties.remove(GlueCatalogConstants.LOCATION_URI) + : null; + } + + /** Get column names from List of column. */ + private static List getColumnNames(final List columns) { + return columns.stream().map(Column::name).collect(Collectors.toList()); + } + + /** + * Get a list of ordered partition values by re-arranging them based on the given list of + * partition keys. If the partition value is null, it'll be converted into default partition + * name. + * + * @param partitionSpec a partition spec. + * @param partitionKeys a list of partition keys. + * @param tablePath path of the table to which the partition belongs. + * @return A list of partition values ordered according to partitionKeys. + * @throws PartitionSpecInvalidException thrown if partitionSpec and partitionKeys have + * different sizes, or any key in partitionKeys doesn't exist in partitionSpec. + */ + private List getOrderedFullPartitionValues( + CatalogPartitionSpec partitionSpec, List partitionKeys, ObjectPath tablePath) + throws PartitionSpecInvalidException { + Map spec = partitionSpec.getPartitionSpec(); + if (spec.size() != partitionKeys.size()) { + throw new PartitionSpecInvalidException( + catalogName, partitionKeys, tablePath, partitionSpec); + } + + List values = new ArrayList<>(spec.size()); + for (String key : partitionKeys) { + if (!spec.containsKey(key)) { + throw new PartitionSpecInvalidException( + catalogName, partitionKeys, tablePath, partitionSpec); + } else { + String value = spec.get(key); + if (value == null) { + value = GlueCatalogConstants.DEFAULT_PARTITION_NAME; + } + values.add(value); + } + } + + return values; + } + + /** + * Update glue table. + * + * @param tablePath contains database name and table name. + * @param partitionSpec Existing partition information. 
+ * @param newPartition Partition information with new changes. + * @throws CatalogException Exception in failure. + */ + public void alterGluePartition( + ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogPartition newPartition) + throws CatalogException { + // todo has to implement + } + + /** + * Get CatalogPartitionSpec of all partitions from glue data catalog. + * + * @param tablePath fully qualified table path. + * @return List of PartitionSpec + */ + public List listPartitions(ObjectPath tablePath) { + + GetPartitionsRequest.Builder request = + GetPartitionsRequest.builder() + .catalogId(getGlueCatalogId()) + .databaseName(tablePath.getDatabaseName()) + .tableName(tablePath.getObjectName()); + try { + GetPartitionsResponse response = glueClient.getPartitions(request.build()); + validateGlueResponse(response); + List finalPartitionsList = + response.partitions().stream() + .map(this::getCatalogPartitionSpec) + .collect(Collectors.toList()); + String partitionsResultNextToken = response.nextToken(); + if (Optional.ofNullable(partitionsResultNextToken).isPresent()) { + do { + // creating a new GetPartitionsResult using next token. 
+ request.nextToken(partitionsResultNextToken); + response = glueClient.getPartitions(request.build()); + finalPartitionsList.addAll( + response.partitions().stream() + .map(this::getCatalogPartitionSpec) + .collect(Collectors.toList())); + partitionsResultNextToken = response.nextToken(); + } while (Optional.ofNullable(partitionsResultNextToken).isPresent()); + } + + return finalPartitionsList; + + } catch (GlueException e) { + throw new CatalogException(GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER, e); + } + } + + /** + * @param tablePath + * @param partitionSpec + * @throws CatalogException + * @throws TableNotExistException + * @throws PartitionSpecInvalidException + */ + public void dropGluePartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) + throws CatalogException { + + try { + Table glueTable = getGlueTable(tablePath); + List partCols = getColumnNames(glueTable.partitionKeys()); + DeletePartitionRequest deletePartitionRequest = + DeletePartitionRequest.builder() + .catalogId(getGlueCatalogId()) + .databaseName(tablePath.getDatabaseName()) + .tableName(tablePath.getObjectName()) + .partitionValues( + getOrderedFullPartitionValues( + partitionSpec, partCols, tablePath)) + .build(); + DeletePartitionResponse response = glueClient.deletePartition(deletePartitionRequest); + validateGlueResponse(response); + LOG.info("Partition Dropped."); + } catch (TableNotExistException | PartitionSpecInvalidException e) { + e.printStackTrace(); + } catch (GlueException e) { + throw new CatalogException(GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER, e); + } + } + + public List listGluePartitionsByFilter( + ObjectPath tablePath, List filters) { + String expressionString = + filters.stream() + .map(x -> getExpressionString(x, new StringBuilder())) + .collect( + Collectors.joining( + GlueCatalogConstants.SPACE + GlueCatalogConstants.AND)); + try { + GetPartitionsRequest request = + GetPartitionsRequest.builder() + 
.databaseName(tablePath.getDatabaseName()) + .tableName(tablePath.getObjectName()) + .catalogId(getGlueCatalogId()) + .expression(expressionString) + .build(); + GetPartitionsResponse response = glueClient.getPartitions(request); + List catalogPartitionSpecList = + response.partitions().stream() + .map(this::getCatalogPartitionSpec) + .collect(Collectors.toList()); + // GlueOperator.validateGlueResponse(response); + String nextToken = response.nextToken(); + if (Optional.ofNullable(nextToken).isPresent()) { + do { + // creating a new GetPartitionsResult using next token. + request = + GetPartitionsRequest.builder() + .databaseName(tablePath.getDatabaseName()) + .tableName(tablePath.getObjectName()) + .catalogId(getGlueCatalogId()) + .expression(expressionString) + .nextToken(nextToken) + .build(); + response = glueClient.getPartitions(request); + catalogPartitionSpecList.addAll( + response.partitions().stream() + .map(this::getCatalogPartitionSpec) + .collect(Collectors.toList())); + nextToken = response.nextToken(); + } while (Optional.ofNullable(nextToken).isPresent()); + } + return catalogPartitionSpecList; + } catch (GlueException e) { + throw new CatalogException(GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER, e); + } + } + + private String getExpressionString(Expression expression, StringBuilder sb) { + + for (Expression childExpression : expression.getChildren()) { + if (childExpression.getChildren() != null && childExpression.getChildren().size() > 0) { + getExpressionString(childExpression, sb); + } + } + return sb.insert( + 0, + expression.asSummaryString() + + GlueCatalogConstants.SPACE + + GlueCatalogConstants.AND) + .toString(); + } + + public Partition getGluePartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) + throws PartitionNotExistException { + try { + GetPartitionRequest request = + GetPartitionRequest.builder() + .catalogId(getGlueCatalogId()) + .databaseName(tablePath.getDatabaseName()) + 
.tableName(tablePath.getObjectName()) + .build(); + GetPartitionResponse response = glueClient.getPartition(request); + validateGlueResponse(response); + Partition partition = response.partition(); + if (partition.hasValues() + && specSubset( + partitionSpec.getPartitionSpec(), + partition.storageDescriptor().parameters())) { + return partition; + } + } catch (EntityNotFoundException e) { + throw new PartitionNotExistException(catalogName, tablePath, partitionSpec); + } + return null; + } + + public boolean gluePartitionExists(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) + throws CatalogException { + GetPartitionRequest request = + GetPartitionRequest.builder() + .catalogId(getGlueCatalogId()) + .databaseName(tablePath.getDatabaseName()) + .tableName(tablePath.getObjectName()) + .build(); + try { + GetPartitionResponse response = glueClient.getPartition(request); + validateGlueResponse(response); + return response.partition() + .parameters() + .keySet() + .containsAll(partitionSpec.getPartitionSpec().keySet()); + } catch (EntityNotFoundException e) { + LOG.warn(String.format("%s is not found", partitionSpec.getPartitionSpec())); + } catch (GlueException e) { + throw new CatalogException(catalogName, e); + } + return false; + } + + private Column getGlueColumn( + CatalogBaseTable catalogBaseTable, TableSchema tableSchema, String fieldName) + throws CatalogException { + Optional dataType = tableSchema.getFieldDataType(fieldName); + if (dataType.isPresent()) { + String glueDataType = dataType.toString(); + return Column.builder() + .comment(catalogBaseTable.getComment()) + .type(glueDataType) + .name(fieldName) + .build(); + } else { + throw new CatalogException("DataType information missing from table schema"); + } + } + + private void validateUDFClassName(String name) { + checkArgument(!isNullOrWhitespaceOnly(name)); + if (name.split(GlueCatalogConstants.DEFAULT_SEPARATOR).length != 3) { + throw new ValidationException("Improper classname"); + } + } + + 
private String getGlueCatalogId() { + return awsProperties.getGlueCatalogId(); + } + + private StorageDescriptor getTableStorageDescriptor( + ObjectPath tablePath, Collection glueColumns, Map options) { + return StorageDescriptor.builder() + .columns(glueColumns) + .location( + locationUri + + GlueCatalogConstants.LOCATION_SEPARATOR + + tablePath.getFullName()) + .parameters(options) + .build(); + } + + /** + * @param udf Instance of UserDefinedFunction + * @return ClassName for function + */ + private String getCatalogFunctionClassName(UserDefinedFunction udf) { + validateUDFClassName(udf.className()); + return udf.functionName().split(GlueCatalogConstants.DEFAULT_SEPARATOR)[1]; + } + + private String getGlueFunctionClassName(CatalogFunction function) { + if (function.getFunctionLanguage().equals(FunctionLanguage.JAVA)) { + return GlueCatalogConstants.FLINK_JAVA_FUNCTION_PREFIX + + GlueCatalogConstants.DEFAULT_SEPARATOR + + function.getClassName(); + } else if (function.getFunctionLanguage().equals(FunctionLanguage.SCALA)) { + return GlueCatalogConstants.FLINK_SCALA_FUNCTION_PREFIX + + GlueCatalogConstants.DEFAULT_SEPARATOR + + function.getClassName(); + } else if (function.getFunctionLanguage().equals(FunctionLanguage.PYTHON)) { + return GlueCatalogConstants.FLINK_PYTHON_FUNCTION_PREFIX + + GlueCatalogConstants.DEFAULT_SEPARATOR + + function.getClassName(); + } else { + throw new UnsupportedOperationException( + String.format( + "GlueCatalog supports only creating: [%s]", + Arrays.stream(FunctionLanguage.values()) + .map(FunctionLanguage::name) + .collect(Collectors.joining(GlueCatalogConstants.NEXT_LINE)))); + } + } + + private FunctionLanguage getFunctionalLanguage(UserDefinedFunction glueFunction) { + if (glueFunction.className().startsWith(GlueCatalogConstants.FLINK_JAVA_FUNCTION_PREFIX)) { + return FunctionLanguage.JAVA; + } else if (glueFunction + .className() + .startsWith(GlueCatalogConstants.FLINK_PYTHON_FUNCTION_PREFIX)) { + return 
FunctionLanguage.PYTHON; + } else if (glueFunction + .className() + .startsWith(GlueCatalogConstants.FLINK_SCALA_FUNCTION_PREFIX)) { + return FunctionLanguage.SCALA; + } else { + throw new CatalogException("Invalid Functional Language"); + } + } + + /** + * Rename glue table. Glue catalog don't support renaming table. For renaming in Flink, it has + * to be done in 3 step. 1. fetch existing table info from glue 2. Create a table with new-name + * and use properties of existing table 3. Delete existing table Note: This above steps are not + * Atomic in nature + * + * @param oldTablePath old table name + * @param newTablePath new renamed table + */ + public void renameGlueTable(ObjectPath oldTablePath, ObjectPath newTablePath) + throws CatalogException, TableNotExistException { + // no need to handle crashes + // update user about this in proper information + // Probable steps to do + // 1. Get Current Glue Table + // 2. Derive input with renamed values + // 3. create new table + // 4. create partitions with renamed values + // 5. delete old partitions + // 6. delete old table + // todo: when statistics features are implemented in GlueCatalog + // 1. update table statistics + // 2. update table column statistics + // 3. partition statistics + // todo: has to do implementation + } + + /** + * Use information from {@link Partition} and derive {@link CatalogPartitionSpec}. + * + * @param partition Glue Partition instance + * @return instance of {@link CatalogPartitionSpec} + */ + private CatalogPartitionSpec getCatalogPartitionSpec(Partition partition) { + Map params = new HashMap<>(partition.storageDescriptor().parameters()); + return new CatalogPartitionSpec(params); + } + + /** List all the Partition associated with a table. 
*/ + private List listGluePartitions(ObjectPath tablePath) throws CatalogException { + GetPartitionsRequest.Builder request = + GetPartitionsRequest.builder() + .catalogId(getGlueCatalogId()) + .databaseName(tablePath.getDatabaseName()) + .tableName(tablePath.getObjectName()); + try { + GetPartitionsResponse response = glueClient.getPartitions(request.build()); + validateGlueResponse(response); + List finalPartitionsList = response.partitions(); + String partitionsResultNextToken = response.nextToken(); + if (Optional.ofNullable(partitionsResultNextToken).isPresent()) { + do { + // update nextToken in requestBuilder + request.nextToken(partitionsResultNextToken); + response = glueClient.getPartitions(request.build()); + finalPartitionsList.addAll(response.partitions()); + partitionsResultNextToken = response.nextToken(); + } while (Optional.ofNullable(partitionsResultNextToken).isPresent()); + } + + return finalPartitionsList; + + } catch (GlueException e) { + throw new CatalogException(GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER, e); + } + } + + /** + * Create a {@link CatalogTable} using all the information from {@link Table}. + * + * @param glueTable Instance of Table from glue Data catalog. + * @return {@link CatalogTable}. + */ + public CatalogBaseTable getCatalogBaseTableFromGlueTable(Table glueTable) { + + checkNotNull(glueTable, "Fetched Glue Table is null"); + Schema schemaInfo = getSchemaFromGlueTable(glueTable); + + // get partition column names + List partitionKeys = + glueTable.partitionKeys().stream().map(Column::name).collect(Collectors.toList()); + Map properties = new HashMap<>(glueTable.parameters()); + + // retrieve owner name. + if (glueTable.owner() != null) { + properties.put(GlueCatalogConstants.TABLE_OWNER, glueTable.owner()); + } + + // retrieve properties from storageDescriptor. 
+ if (glueTable.storageDescriptor().hasParameters()) { + properties.putAll(glueTable.storageDescriptor().parameters()); + } + + // retrieve storage location. + if (glueTable.storageDescriptor().location() != null) { + properties.put( + GlueCatalogConstants.LOCATION_URI, glueTable.storageDescriptor().location()); + } + + // retrieve input format. + if (glueTable.storageDescriptor().inputFormat() != null) { + properties.put( + GlueCatalogConstants.TABLE_INPUT_FORMAT, + glueTable.storageDescriptor().inputFormat()); + } + + // retrieve output format + if (glueTable.storageDescriptor().outputFormat() != null) { + properties.put( + GlueCatalogConstants.TABLE_OUTPUT_FORMAT, + glueTable.storageDescriptor().outputFormat()); + } + + if (glueTable.tableType().equals(CatalogBaseTable.TableKind.TABLE.name())) { + return CatalogTable.of(schemaInfo, glueTable.description(), partitionKeys, properties); + } else if (glueTable.tableType().equals(CatalogBaseTable.TableKind.VIEW.name())) { + return CatalogView.of( + schemaInfo, + glueTable.description(), + glueTable.viewOriginalText(), + glueTable.viewExpandedText(), + properties); + + } else { + throw new CatalogException("Unknown table type."); + } + } + + /** + * Get CatalogPartitionSpec of all partitions that is under the given CatalogPartitionSpec in + * the table. + * + * @param tablePath Fully qualified table Path. + * @param partitionSpec Partition spec . + * @return List of CatalogPartitionSpec. 
+ */ + public List listGluePartitions( + ObjectPath tablePath, CatalogPartitionSpec partitionSpec) + throws TableNotExistException, PartitionSpecInvalidException { + + Table glueTable = getGlueTable(tablePath); + List partCols = getColumnNames(glueTable.partitionKeys()); + LOG.info(String.format("Partition Columns are : %s", String.join(", ", partCols))); + List partitionValues = + getOrderedFullPartitionValues( + partitionSpec, + partCols, + new ObjectPath(glueTable.databaseName(), glueTable.name())); + + // validate partition values + for (int i = 0; i < partCols.size(); i++) { + if (isNullOrWhitespaceOnly(partitionValues.get(i))) { + throw new PartitionSpecInvalidException( + catalogName, + partCols, + new ObjectPath(glueTable.databaseName(), glueTable.name()), + partitionSpec); + } + } + List partitionSpecList = listPartitions(tablePath); + return partitionSpecList.stream() + .filter( + currPartSpec -> + specSubset( + partitionSpec.getPartitionSpec(), + currPartSpec.getPartitionSpec())) + .collect(Collectors.toList()); + } + + /** + * check spec1 is subset of spec2. + * + * @param spec1 Key/Value pair spec + * @param spec2 Key/Value pair spec + * @return is spec1 is subset of spec2 + */ + private boolean specSubset(Map spec1, Map spec2) { + return spec1.entrySet().stream().allMatch(e -> e.getValue().equals(spec2.get(e.getKey()))); + } + + /** + * A Glue database name cannot be longer than 252 characters. The only acceptable characters are + * lowercase letters, numbers, and the underscore character. 
More details: + * https://docs.aws.amazon.com/athena/latest/ug/glue-best-practices.html + * + * @param name database name to check; must be non-null and contain a match for {@code GlueCatalogConstants.GLUE_DB_PATTERN} + */ + static void validateName(String name) { + checkArgument( + name != null && GlueCatalogConstants.GLUE_DB_PATTERN.matcher(name).find(), + "Database name is not according to Glue Norms, " + + "check here https://docs.aws.amazon.com/athena/latest/ug/glue-best-practices.html"); + } +} diff --git a/flink-catalog-aws-glue/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/flink-catalog-aws-glue/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory new file mode 100644 index 000000000..c8308d477 --- /dev/null +++ b/flink-catalog-aws-glue/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +org.apache.flink.table.catalog.glue.GlueCatalogFactory \ No newline at end of file diff --git a/flink-catalog-aws-glue/src/test/java/org/apache/flink/table/catalog/GlueCatalogTest.java b/flink-catalog-aws-glue/src/test/java/org/apache/flink/table/catalog/GlueCatalogTest.java new file mode 100644 index 000000000..b904cb67b --- /dev/null +++ b/flink-catalog-aws-glue/src/test/java/org/apache/flink/table/catalog/GlueCatalogTest.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog; + +/** GlueCatalog Test. */ +public class GlueCatalogTest {} diff --git a/pom.xml b/pom.xml index aa7ea1825..c73b4a1ad 100644 --- a/pom.xml +++ b/pom.xml @@ -84,8 +84,8 @@ under the License. flink-sql-connector-aws-kinesis-firehose flink-sql-connector-aws-kinesis-streams flink-sql-connector-kinesis - flink-formats-aws + flink-catalog-aws-glue flink-connector-aws-e2e-tests