Skip to content

Commit

Permalink
Address to comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Samrat002 committed Aug 10, 2024
1 parent 12e9d66 commit 2cfa1bf
Show file tree
Hide file tree
Showing 9 changed files with 102 additions and 109 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public class GlueCatalogConstants {

public static final String FLINK_CATALOG = "FLINK_CATALOG";

public static final Pattern GLUE_DB_PATTERN = Pattern.compile("^[a-z0-9_]{1,252}$");
public static final Pattern GLUE_DB_PATTERN = Pattern.compile("^[a-z0-9_]{1,255}$");
public static final String GLUE_EXCEPTION_MSG_IDENTIFIER = "GLUE EXCEPTION";
public static final String TABLE_NOT_EXISTS_IDENTIFIER = "TABLE DOESN'T EXIST";
public static final String DEFAULT_PARTITION_NAME = "__GLUE_DEFAULT_PARTITION__";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,9 @@ public String factoryIdentifier() {

@Override
public Set<ConfigOption<?>> optionalOptions() {
return GlueCatalogOptions.getAllConfigOptions();
Set<ConfigOption<?>> allConfigs = GlueCatalogOptions.getAllConfigOptions();
allConfigs.removeAll(GlueCatalogOptions.getRequiredConfigOptions());
return allConfigs;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.glue.GlueClient;
import software.amazon.awssdk.services.glue.model.AlreadyExistsException;
import software.amazon.awssdk.services.glue.model.BatchDeleteTableRequest;
import software.amazon.awssdk.services.glue.model.BatchDeleteTableResponse;
import software.amazon.awssdk.services.glue.model.CreateDatabaseRequest;
Expand Down Expand Up @@ -110,14 +111,15 @@ public void createGlueDatabase(String databaseName, CatalogDatabase database)
throws CatalogException, DatabaseAlreadyExistException {
GlueUtils.validate(databaseName);
Map<String, String> properties = new HashMap<>(database.getProperties());
DatabaseInput.Builder databaseInputBuilder =
DatabaseInput databaseInput =
DatabaseInput.builder()
.name(databaseName)
.description(database.getComment())
.parameters(properties);
.parameters(properties)
.build();
CreateDatabaseRequest.Builder requestBuilder =
CreateDatabaseRequest.builder()
.databaseInput(databaseInputBuilder.build())
.databaseInput(databaseInput)
.catalogId(getGlueCatalogId());
try {
CreateDatabaseResponse response = glueClient.createDatabase(requestBuilder.build());
Expand All @@ -126,6 +128,8 @@ public void createGlueDatabase(String databaseName, CatalogDatabase database)
}
GlueUtils.validateGlueResponse(response);
} catch (EntityNotFoundException e) {
throw new CatalogException(catalogName, e);
} catch (AlreadyExistsException e) {
throw new DatabaseAlreadyExistException(catalogName, databaseName, e);
} catch (GlueException e) {
throw new CatalogException(GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER, e);
Expand Down Expand Up @@ -209,24 +213,27 @@ public void deleteTablesFromDatabase(String databaseName, Collection<String> tab
public void deleteFunctionsFromDatabase(String databaseName, Collection<String> functions)
throws CatalogException {
GlueUtils.validate(databaseName);
try {
DeleteUserDefinedFunctionRequest.Builder requestBuilder =
DeleteUserDefinedFunctionRequest.builder()
.databaseName(databaseName)
.catalogId(getGlueCatalogId());
DeleteUserDefinedFunctionResponse response;
for (String functionName : functions) {
requestBuilder.functionName(functionName);
DeleteUserDefinedFunctionRequest.Builder requestBuilder =
DeleteUserDefinedFunctionRequest.builder()
.databaseName(databaseName)
.catalogId(getGlueCatalogId());
DeleteUserDefinedFunctionResponse response;
for (String functionName : functions) {
requestBuilder.functionName(functionName);
try {
response = glueClient.deleteUserDefinedFunction(requestBuilder.build());
if (LOG.isDebugEnabled()) {
LOG.debug(GlueUtils.getDebugLog(response));
}
GlueUtils.validateGlueResponse(response);
} catch (GlueException e) {
LOG.error(
"Error deleting function {} in database: {}\n{}",
functionName,
databaseName,
e);
throw new CatalogException(GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER, e);
}

} catch (GlueException e) {
LOG.error("Error deleting functions in database: {}", databaseName);
throw new CatalogException(GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER, e);
if (LOG.isDebugEnabled()) {
LOG.debug(GlueUtils.getDebugLog(response));
}
GlueUtils.validateGlueResponse(response);
}
}

Expand All @@ -252,10 +259,7 @@ public CatalogDatabase getDatabase(String databaseName)
LOG.debug(GlueUtils.getDebugLog(response));
}
GlueUtils.validateGlueResponse(response);
return response.database().name().equals(databaseName)
? GlueUtils.getCatalogDatabase(response.database())
: null;

return GlueUtils.getCatalogDatabase(response.database());
} catch (EntityNotFoundException e) {
throw new DatabaseNotExistException(catalogName, databaseName);
} catch (GlueException e) {
Expand All @@ -274,15 +278,16 @@ public void updateGlueDatabase(String databaseName, CatalogDatabase newDatabase)
throws CatalogException {
GlueUtils.validate(databaseName);
Map<String, String> newProperties = new HashMap<>(newDatabase.getProperties());
DatabaseInput.Builder databaseInputBuilder =
DatabaseInput databaseInput =
DatabaseInput.builder()
.parameters(newProperties)
.description(newDatabase.getComment())
.name(databaseName);
.name(databaseName)
.build();

UpdateDatabaseRequest updateRequest =
UpdateDatabaseRequest.builder()
.databaseInput(databaseInputBuilder.build())
.databaseInput(databaseInput)
.name(databaseName)
.catalogId(getGlueCatalogId())
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,13 +87,11 @@ public void createGlueFunction(ObjectPath functionPath, CatalogFunction function
} catch (AlreadyExistsException e) {
LOG.error(
String.format(
"%s.%s already Exists. Function language of type: %s",
functionPath.getDatabaseName(),
functionPath.getObjectName(),
function.getFunctionLanguage()));
"%s already Exists. Function language of type: %s. \n%s",
functionPath.getFullName(), function.getFunctionLanguage(), e));
throw new FunctionAlreadyExistException(catalogName, functionPath, e);
} catch (GlueException e) {
LOG.error("Error creating glue function.", e);
LOG.error("Error creating glue function: {}\n{}", functionPath.getFullName(), e);
throw new CatalogException(GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER, e);
}
}
Expand All @@ -109,25 +107,22 @@ public void createGlueFunction(ObjectPath functionPath, CatalogFunction function
public CatalogFunction getGlueFunction(ObjectPath functionPath) {
GetUserDefinedFunctionRequest request =
GetUserDefinedFunctionRequest.builder()
.catalogId(getGlueCatalogId())
.databaseName(functionPath.getDatabaseName())
.functionName(functionPath.getObjectName())
.catalogId(getGlueCatalogId())
.functionName(functionPath.getObjectName())
.build();
GetUserDefinedFunctionResponse response = glueClient.getUserDefinedFunction(request);
GlueUtils.validateGlueResponse(response);
UserDefinedFunction udf = response.userDefinedFunction();

List<ResourceUri> resourceUris = new LinkedList<>();
for (software.amazon.awssdk.services.glue.model.ResourceUri resourceUri :
udf.resourceUris()) {
resourceUris.add(
new org.apache.flink.table.resource.ResourceUri(
org.apache.flink.table.resource.ResourceType.valueOf(
resourceUri.resourceType().name()),
resourceUri.uri()));
}

List<ResourceUri> resourceUris =
udf.resourceUris().stream()
.map(
resourceUri ->
new org.apache.flink.table.resource.ResourceUri(
org.apache.flink.table.resource.ResourceType
.valueOf(resourceUri.resourceType().name()),
resourceUri.uri()))
.collect(Collectors.toList());
return new CatalogFunctionImpl(
GlueUtils.getCatalogFunctionClassName(udf),
GlueUtils.getFunctionalLanguage(udf),
Expand Down Expand Up @@ -157,7 +152,6 @@ public List<String> listGlueFunctions(String databaseName) {
.collect(Collectors.toCollection(LinkedList::new)));
token = functionsResponse.nextToken();
}

} catch (GlueException e) {
throw new CatalogException(GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER, e);
}
Expand All @@ -177,10 +171,6 @@ public boolean glueFunctionExists(ObjectPath functionPath) {
GlueUtils.validateGlueResponse(response);
return response.userDefinedFunction() != null;
} catch (EntityNotFoundException e) {
LOG.warn(
String.format(
"Entry not found for function %s.%s",
functionPath.getObjectName(), functionPath.getDatabaseName()));
return false;
} catch (GlueException e) {
LOG.error(GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER, e);
Expand Down Expand Up @@ -210,7 +200,7 @@ public void alterGlueFunction(ObjectPath functionPath, CatalogFunction newFuncti
UpdateUserDefinedFunctionResponse response =
glueClient.updateUserDefinedFunction(updateUserDefinedFunctionRequest);
GlueUtils.validateGlueResponse(response);
LOG.info("Altered UDF Function: {}", functionPath.getFullName());
LOG.info("Altered Function: {}", functionPath.getFullName());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@
package org.apache.flink.table.catalog.glue.operator;

import org.apache.flink.annotation.Internal;
import org.apache.flink.util.StringUtils;

import software.amazon.awssdk.services.glue.GlueClient;

import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;

/**
Expand All @@ -38,9 +40,13 @@ public abstract class GlueOperator {
public final String catalogName;

public GlueOperator(String catalogName, GlueClient glueClient, String glueCatalogId) {
checkNotNull(catalogName, "CatalogName cannot be Null.");
checkArgument(
!StringUtils.isNullOrWhitespaceOnly(catalogName),
"catalogName name cannot be null or empty.");
checkNotNull(glueClient, "GlueClient Instance cannot be Null.");
checkNotNull(glueCatalogId, "Glue Catalog Id cannot be null.");
checkArgument(
!StringUtils.isNullOrWhitespaceOnly(glueCatalogId),
"glue Catalog Id name cannot be null or empty.");
this.catalogName = catalogName;
this.glueClient = glueClient;
this.glueCatalogId = glueCatalogId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import java.util.Optional;
import java.util.stream.Collectors;

import static org.apache.flink.table.catalog.glue.util.GlueUtils.getExpressionString;
import static org.apache.flink.util.StringUtils.isNullOrWhitespaceOnly;

/** Utilities for Glue catalog Partition related operations. */
Expand All @@ -76,7 +77,9 @@ public GluePartitionOperator(String catalogName, GlueClient glueClient, String g
*
* @param glueTable glue table
* @param partitionSpec partition spec
* @param catalogPartition partition to add
* @param catalogPartition partition to add.
* @throws CatalogException
* @throws PartitionSpecInvalidException
*/
public void createGluePartition(
final Table glueTable,
Expand Down Expand Up @@ -479,26 +482,4 @@ private CatalogPartitionSpec getCatalogPartitionSpec(Partition partition) {
Map<String, String> params = new HashMap<>(partition.storageDescriptor().parameters());
return new CatalogPartitionSpec(params);
}

/**
* Recursively derive the expression string from given {@link Expression}.
*
* @param expression Instance of {@link Expression}.
* @param sb StringBuilder.
* @return Derived String from {@link Expression}.
*/
private String getExpressionString(Expression expression, StringBuilder sb) {

for (Expression childExpression : expression.getChildren()) {
if (childExpression.getChildren() != null && !childExpression.getChildren().isEmpty()) {
getExpressionString(childExpression, sb);
}
}
return sb.insert(
0,
expression.asSummaryString()
+ GlueCatalogConstants.SPACE
+ GlueCatalogConstants.AND)
.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -269,12 +269,6 @@ public boolean glueTableExists(ObjectPath tablePath) throws CatalogException {
Table glueTable = getGlueTable(tablePath);
return glueTable != null && glueTable.name().equals(tablePath.getObjectName());
} catch (TableNotExistException e) {
LOG.warn(
String.format(
"%s\tDatabase: %s Table: %s",
GlueCatalogConstants.TABLE_NOT_EXISTS_IDENTIFIER,
tablePath.getDatabaseName(),
tablePath.getObjectName()));
return false;
} catch (CatalogException e) {
throw new CatalogException(GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER, e);
Expand Down Expand Up @@ -347,7 +341,9 @@ public CatalogBaseTable getCatalogBaseTableFromGlueTable(Table glueTable) {
properties);

} else {
throw new CatalogException("Unknown TableType from Glue Catalog.");
throw new CatalogException(
String.format(
"Unknown TableType: %s from Glue Catalog.", glueTable.tableType()));
}
}

Expand Down
Loading

0 comments on commit 2cfa1bf

Please sign in to comment.