-
Notifications
You must be signed in to change notification settings - Fork 53
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-30481][FLIP-277] GlueCatalog Implementation #47
base: main
Are you sure you want to change the base?
Conversation
665271b
to
4ad9f96
Compare
43308aa
to
ccf35dc
Compare
ccf35dc
to
ef5e967
Compare
...catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/mapper/GlueDatatypeMapper.java
Outdated
Show resolved
Hide resolved
...k-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/GlueCatalogFactory.java
Outdated
Show resolved
Hide resolved
flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/util/GlueOperator.java
Outdated
Show resolved
Hide resolved
2a82242
to
9117402
Compare
@dannycranmer please review changes in free time |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@Samrat002 Thanks for bring this feature. I'm not familiar with GlueCatalog, but I left some comments from Flink and Flink Catalog sides.
flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/GlueCatalog.java
Outdated
Show resolved
Hide resolved
flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/GlueCatalog.java
Outdated
Show resolved
Hide resolved
flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/GlueCatalog.java
Outdated
Show resolved
Hide resolved
flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/GlueCatalog.java
Outdated
Show resolved
Hide resolved
...k-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/util/AwsProperties.java
Outdated
Show resolved
Hide resolved
...k-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/util/AwsProperties.java
Outdated
Show resolved
Hide resolved
...catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/mapper/GlueDatatypeMapper.java
Outdated
Show resolved
Hide resolved
...atalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/util/AwsClientFactory.java
Outdated
Show resolved
Hide resolved
...catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/mapper/GlueDatatypeMapper.java
Outdated
Show resolved
Hide resolved
0e2dc19
to
10dfc99
Compare
flink-catalog-aws-glue/src/test/java/org/apache/flink/table/catalog/GlueCatalogTest.java
Outdated
Show resolved
Hide resolved
flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/GlueCatalog.java
Outdated
Show resolved
Hide resolved
...catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/GlueCatalogConstants.java
Outdated
Show resolved
Hide resolved
...atalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/util/AwsClientFactory.java
Outdated
Show resolved
Hide resolved
...alog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/util/AwsClientFactories.java
Outdated
Show resolved
Hide resolved
flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/util/GlueOperator.java
Outdated
Show resolved
Hide resolved
flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/util/GlueOperator.java
Outdated
Show resolved
Hide resolved
@dannycranmer @vahmed-hamdy please review wheenver time |
@dannycranmer @vahmed-hamdy please review whenever time 🙏🏻 |
[gentle ping] |
@PrabhuJoseph please review the PR whenever time |
Any news here? :) |
any updates here? This is a feature that is already available on Flink EMR would be great to have it here too. |
vikramsinghchandel@ wckdman@ . thank you for your interest |
@dannycranmer @fapaul Please review whenever time |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Many thanks for your efforts with this one @Samrat002 - I've made a few suggestions.
...ws/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/GlueCatalog.java
Outdated
Show resolved
Hide resolved
...ws/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/GlueCatalog.java
Outdated
Show resolved
Hide resolved
...ws/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/GlueCatalog.java
Outdated
Show resolved
Hide resolved
...ws/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/GlueCatalog.java
Outdated
Show resolved
Hide resolved
...ws/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/GlueCatalog.java
Show resolved
Hide resolved
...flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/util/GlueUtils.java
Outdated
Show resolved
Hide resolved
...flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/util/GlueUtils.java
Outdated
Show resolved
Hide resolved
...flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/util/GlueUtils.java
Outdated
Show resolved
Hide resolved
...flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/util/GlueUtils.java
Outdated
Show resolved
Hide resolved
...flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/util/GlueUtils.java
Outdated
Show resolved
Hide resolved
Thank you @foxus for the review . |
|
Hey, let me know if I can help progress this? I'm happy to assist if you're willing and able to give me write access to the branch. |
@foxus |
yes . send invite for colaboration |
@foxus Please review whenever time |
Thanks for the effort responding to the comments, there's only one which I think has been marked as resolved but still appears not to have been addressed. |
Fixed the missing comment. @foxus please review |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Apologies @Samrat002, I've spotted two more blocking bugs that I should have spotted before.
return DataTypes.TIMESTAMP(5); | ||
case "array": | ||
// Example: array<string> -> DataTypes.ARRAY(DataTypes.STRING()) | ||
String elementType = glueType.substring(6, glueType.length() - 1); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Forgive me for not spotting this earlier - this will always cause an out of bounds exception. The only way this block of code runs is if glueType
is equal to the string literal array
but in this block you're attempting to parse the part of the string which cannot exist (array<string>
). It looks like you're expecting case
here to behave as a startsWith
. Please address this and consider adding unit tests for this method.
// Example: map<string, string> -> DataTypes.MAP(DataTypes.STRING(), | ||
// DataTypes.STRING()) | ||
int commaIndex = glueType.indexOf(","); | ||
String keyType = glueType.substring(4, commaIndex); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Forgive me for not spotting this earlier - this will always cause an out of bounds exception. The only way this block of code runs is if glueType
is equal to the string literal map
but in this block you're attempting to parse the part of the string which cannot exist (map<string, string>
). It looks like you're expecting case
here to behave as a startsWith
. Please address this and consider adding unit tests for this method. I would consider using regex here to parse the key and value types - it will allow you to be more permissive about whitespace (e.g. map<string, string>
and map<string,string>
) and also allow you to play with named groups (a personal favourite).
@Samrat002, thanks for agreeing to collaborate, I've resolved some of the comments but left it as a separate commit to allow you to review and squash if you're happy whilst you're addressing the |
Co-Authored-By: Anthony Pounds-Cornish <[email protected]>
@foxus thank you for adding the change. 👍🏻 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have partially reviewed the PR, I will try to resume during this week, the PR has been hanging for a very long time and is of huge size, Do you believe it might be easier to break it down to a couple of PRs, so it is easier to review and more importantly to provide thorough testing for each part.
I suggest starting with laying the code structure and minimum possible set of features and exposing minimum configuration while defaulting the rest.
We can add features like functions and partitions and http-client configurations in a following PR.
</parent> | ||
|
||
<artifactId>flink-catalog-aws-glue</artifactId> | ||
<name>Flink : Catalog : AWS : Glue</name> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
<name>Flink : Catalog : AWS : Glue</name> | |
<name>Flink : Catalog : AWS : Glue Data Catalog</name> |
* <p>For more details, see <a | ||
* href="https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-catalog-databases.html">...</a> | ||
*/ | ||
public static final String GLUE_CATALOG_ID = "aws.glue.id"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
could we use aws.glue.catalog-id
since we also have account-id
} | ||
|
||
public static Set<ConfigOption<?>> getRequiredConfigOptions() { | ||
return new HashSet<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not have GLUE_CATALOG_ID
and REGION
as required options?
public static final ConfigOption<String> REGION = | ||
ConfigOptions.key(AWSConfigConstants.AWS_REGION) | ||
.stringType() | ||
.defaultValue(Region.US_WEST_1.toString()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why us-west-1
? I prefer to have no default and make it required like the case in the connectors
import static org.apache.flink.table.catalog.glue.GlueCatalog.DEFAULT_DB; | ||
|
||
/** Collection of {@link ConfigOption} used in GlueCatalog. */ | ||
@Internal |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is not Internal
suggest to go with PublicEvolving
class GlueCatalogOptionsUtilsTest { | ||
|
||
@Test | ||
void testGetValidatedConfigurations() {} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please remove If not needed
* href="https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/http/apache/ApacheHttpClient.Builder.html">...</a> | ||
*/ | ||
public static final String HTTP_CLIENT_CONNECTION_TIMEOUT_MS = | ||
"http-client.connection-timeout-ms"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
bump
* <p>For more details, see <a | ||
* href="https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/http/apache/ApacheHttpClient.Builder.html">...</a> | ||
*/ | ||
public static final String HTTP_CLIENT_SOCKET_TIMEOUT_MS = "http-client.socket-timeout-ms"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same as above
private static final String CLIENT_HTTP_PROTOCOL_VERSION_OPTION = "protocol.version"; | ||
private static final String CLIENT_HTTP_MAX_CONNECTION_TIMEOUT_MS = "connection-timeout-ms"; | ||
private static final String CLIENT_HTTP_MAX_SOCKET_TIMEOUT_MS = "socket-timeout-ms"; | ||
private static final String APACHE_MAX_CONNECTIONS = "apache.max-connections"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
bump
@@ -0,0 +1,7 @@ | |||
flink-catalog-aws-glue | |||
Copyright 2014-2023 The Apache Software Foundation |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
2024
* @see org.apache.flink.table.catalog.CatalogTable | ||
* @see org.apache.flink.table.catalog.ResolvedCatalogTable | ||
*/ | ||
public class TypeMapper { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
annotation missing
public class TypeMapperTest { | ||
|
||
@Test | ||
public void testMapFlinkTypeToGlueType_Primitives() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Junit5 conventions don't suggest a prefix test
in test names
https://junit.org/junit5/docs/current/user-guide/
} | ||
|
||
@Test | ||
public void testMapFlinkTypeToGlueType_Array() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: We can use parameterized tests for all of these tests
@Test | ||
public void testGlueTypeToFlinkType_Array() { | ||
LogicalType arrayType = new ArrayType(new VarCharType(255)); | ||
assertEquals("array<string>", TypeMapper.mapFlinkTypeToGlueType(arrayType)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is not glue to Flink as test suggests
@Test | ||
public void testGlueTypeToFlinkType_Map() { | ||
LogicalType mapType = new MapType(new VarCharType(255), new IntType()); | ||
assertEquals("map<string,int>", TypeMapper.mapFlinkTypeToGlueType(mapType)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is not glue to Flink as test suggests
</dependency> | ||
|
||
</dependencies> | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IIUC, this should be distributed as an uberjar, right?
Do we need to relocate the sdk dependencies?
This product includes software developed at | ||
The Apache Software Foundation (http://www.apache.org/). | ||
|
||
This project bundles the following dependencies under the Apache Software License 2.0. (http://www.apache.org/licenses/LICENSE-2.0.txt) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this file needed? Unless we bundle and relocate any dependencies I don't think we need it.
* @param properties Map of properties. | ||
* @return fully qualified owner name. | ||
*/ | ||
public static String extractTableOwner(Map<String, String> properties) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Ideally we return Optional and add it to builder if present
* @param properties Map of properties. | ||
* @return fully qualified owner name. | ||
*/ | ||
public static String extractTableOwner(Map<String, String> properties) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- Why do we remove?
- removing is not tested
.tableType(table.getTableKind().name()) | ||
.lastAccessTime(Instant.now()) | ||
.owner(tableOwner) | ||
.viewExpandedText(GlueUtils.getExpandedQuery(table)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
since this is not a mandatory parameter in builder and we don't really support those yet, why do we set those?
Purpose of the change
Add Glue catalog feature
Verifying this change
Tested in EMR cluster
Significant changes
(Please check any boxes [x] if the answer is "yes" )
@Public(Evolving)
)