From 92b8cd2d675958d1439f09eed8e5c515cf655396 Mon Sep 17 00:00:00 2001 From: sunxiaojian Date: Tue, 14 Jan 2025 18:52:01 +0800 Subject: [PATCH] fixed --- docs/flink-connector/flink-catalog-iceberg.md | 19 +++++----- .../iceberg/IcebergPropertiesConstants.java | 23 ++++++----- .../iceberg/IcebergPropertiesConverter.java | 1 + .../integration/test/FlinkCommonIT.java | 38 +++++++++---------- .../test/hive/FlinkHiveCatalogIT.java | 2 +- .../test/iceberg/FlinkIcebergCatalogIT.java | 8 ++-- .../test/paimon/FlinkPaimonCatalogIT.java | 2 +- 7 files changed, 49 insertions(+), 44 deletions(-) diff --git a/docs/flink-connector/flink-catalog-iceberg.md b/docs/flink-connector/flink-catalog-iceberg.md index 002e044b6b2..77cab57e778 100644 --- a/docs/flink-connector/flink-catalog-iceberg.md +++ b/docs/flink-connector/flink-catalog-iceberg.md @@ -26,7 +26,7 @@ To enable the Flink connector, you must download the Iceberg Flink runtime JAR a - View operations - Metadata tables, like: - `{iceberg_catalog}.{iceberg_database}.{iceberg_table}&snapshots` -- Querying UDF +- Query UDF - `UPDATE` clause - `DELETE` clause - `CREATE TABLE LIKE` clause @@ -59,13 +59,15 @@ SELECT * FROM sample WHERE data = 'B'; The Gravitino Flink connector transforms the following properties in a catalog to Flink connector configuration. -| Gravitino catalog property name | Flink Iceberg connector configuration | Description | Since Version | -|---------------------------------|---------------------------------------|----------------------------------------------------------------------------------------------------------------|-------------------| -| `catalog-backend` | `catalog-type` | Catalog backend type, currently, only `Hive` Catalog is supported, `JDBC` and `Rest` in Continuous Validation | 0.8.0-incubating | -| `uri` | `uri` | Catalog backend URI | 0.8.0-incubating | -| `warehouse` | `warehouse` | Catalog backend warehouse | 0.8.0-incubating | -| `io-impl` | `io-impl` | The IO implementation for `FileIO` in Iceberg. | 0.8.0-incubating | -| `oss-endpoint` | `oss.endpoint` | The endpoint of Aliyun OSS service. | 0.8.0-incubating | +| Gravitino catalog property name | Flink Iceberg connector configuration | Description | Since Version | +|---------------------------------|---------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|-------------------| +| `catalog-backend` | `catalog-type` | Catalog backend type, currently, only `Hive` Catalog is supported, `JDBC` and `Rest` in Continuous Validation | 0.8.0-incubating | +| `uri` | `uri` | Catalog backend URI | 0.8.0-incubating | +| `warehouse` | `warehouse` | Catalog backend warehouse | 0.8.0-incubating | +| `io-impl` | `io-impl` | The IO implementation for `FileIO` in Iceberg. | 0.8.0-incubating | +| `oss-endpoint` | `oss.endpoint` | The endpoint of Aliyun OSS service. | 0.8.0-incubating | +| `oss-access-key-id` | `client.access-key-id` | The static access key ID used to access OSS data. | 0.8.0-incubating | +| `oss-secret-access-key` | `client.access-key-secret` | The static secret access key used to access OSS data. | 0.8.0-incubating | Gravitino catalog property names with the prefix `flink.bypass.` are passed to Flink iceberg connector. For example, using `flink.bypass.clients` to pass the `clients` to the Flink iceberg connector. @@ -73,5 +75,4 @@ Gravitino catalog property names with the prefix `flink.bypass.` are passed to F ### OSS -You need to add an OSS secret key to the Flink configuration using `client.access-key-id` and `client.access-key-secret`. Additionally, you need download the [Aliyun OSS SDK](https://gosspublic.alicdn.com/sdks/java/aliyun_java_sdk_3.10.2.zip), and copy `aliyun-sdk-oss-3.10.2.jar`, `hamcrest-core-1.1.jar`, `jdom2-2.0.6.jar` to the Flink classpath. diff --git a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/IcebergPropertiesConstants.java b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/IcebergPropertiesConstants.java index 8f86ae5c50b..9aff51e2aab 100644 --- a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/IcebergPropertiesConstants.java +++ b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/IcebergPropertiesConstants.java @@ -25,22 +25,25 @@ import org.apache.iceberg.CatalogUtil; import org.apache.iceberg.flink.FlinkCatalogFactory; -public interface IcebergPropertiesConstants { - @VisibleForTesting String GRAVITINO_ICEBERG_CATALOG_BACKEND = IcebergConstants.CATALOG_BACKEND; +public class IcebergPropertiesConstants { + @VisibleForTesting + static String GRAVITINO_ICEBERG_CATALOG_BACKEND = IcebergConstants.CATALOG_BACKEND; - String ICEBERG_CATALOG_TYPE = FlinkCatalogFactory.ICEBERG_CATALOG_TYPE; + static final String ICEBERG_CATALOG_TYPE = FlinkCatalogFactory.ICEBERG_CATALOG_TYPE; - String GRAVITINO_ICEBERG_CATALOG_WAREHOUSE = IcebergConstants.WAREHOUSE; + static final String GRAVITINO_ICEBERG_CATALOG_WAREHOUSE = IcebergConstants.WAREHOUSE; - String ICEBERG_CATALOG_WAREHOUSE = CatalogProperties.WAREHOUSE_LOCATION; + static final String ICEBERG_CATALOG_WAREHOUSE = CatalogProperties.WAREHOUSE_LOCATION; - String GRAVITINO_ICEBERG_CATALOG_URI = IcebergConstants.URI; + static final String GRAVITINO_ICEBERG_CATALOG_URI = IcebergConstants.URI; - String ICEBERG_CATALOG_URI = CatalogProperties.URI; + static final String ICEBERG_CATALOG_URI = CatalogProperties.URI; - @VisibleForTesting String ICEBERG_CATALOG_BACKEND_HIVE = CatalogUtil.ICEBERG_CATALOG_TYPE_HIVE; + @VisibleForTesting + static String ICEBERG_CATALOG_BACKEND_HIVE = CatalogUtil.ICEBERG_CATALOG_TYPE_HIVE; - String GRAVITINO_ICEBERG_CATALOG_BACKEND_HIVE = "hive"; + static final String GRAVITINO_ICEBERG_CATALOG_BACKEND_HIVE = "hive"; - @VisibleForTesting String ICEBERG_CATALOG_BACKEND_REST = CatalogUtil.ICEBERG_CATALOG_TYPE_REST; + @VisibleForTesting + static final String ICEBERG_CATALOG_BACKEND_REST = CatalogUtil.ICEBERG_CATALOG_TYPE_REST; } diff --git a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/IcebergPropertiesConverter.java b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/IcebergPropertiesConverter.java index 6510ef0167d..7684d3eadbb 100644 --- a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/IcebergPropertiesConverter.java +++ b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/IcebergPropertiesConverter.java @@ -61,6 +61,7 @@ public Map toFlinkCatalogProperties(Map gravitin all.put( CommonCatalogOptions.CATALOG_TYPE.key(), GravitinoIcebergCatalogFactoryOptions.IDENTIFIER); // Map "catalog-backend" to "catalog-type". + // TODO If catalog backend is CUSTOM, we need special compatibility logic. GRAVITINO_CONFIG_TO_FLINK_ICEBERG.forEach( (key, value) -> { if (all.containsKey(key)) { diff --git a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkCommonIT.java b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkCommonIT.java index 84f90194227..b45e5f46ec2 100644 --- a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkCommonIT.java +++ b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkCommonIT.java @@ -78,7 +78,7 @@ protected boolean supportGetSchemaWithoutCommentAndOption() { protected abstract String getProvider(); - protected abstract boolean dropCascade(); + protected abstract boolean supportDropCascade(); @Test public void testCreateSchema() { @@ -91,14 +91,14 @@ public void testCreateSchema() { TestUtils.assertTableResult(tableResult, ResultKind.SUCCESS); catalog.asSchemas().schemaExists(schema); } finally { - catalog.asSchemas().dropSchema(schema, dropCascade()); + catalog.asSchemas().dropSchema(schema, supportDropCascade()); Assertions.assertFalse(catalog.asSchemas().schemaExists(schema)); } }); } @Test - @EnabledIf("testGetSchemaWithoutCommentAndOption") + @EnabledIf("supportGetSchemaWithoutCommentAndOption") public void testGetSchemaWithoutCommentAndOption() { doWithCatalog( currentCatalog(), @@ -147,7 +147,7 @@ public void testGetSchemaWithCommentAndOptions() { Assertions.assertEquals( location, loadedSchema.properties().get(HiveConstants.LOCATION)); } finally { - catalog.asSchemas().dropSchema(schema, dropCascade()); + catalog.asSchemas().dropSchema(schema, supportDropCascade()); Assertions.assertFalse(catalog.asSchemas().schemaExists(schema)); } }); @@ -185,9 +185,9 @@ public void testListSchema() { Assertions.assertEquals(schema2, schemas[2]); Assertions.assertEquals(schema3, schemas[3]); } finally { - catalog.asSchemas().dropSchema(schema, dropCascade()); - catalog.asSchemas().dropSchema(schema2, dropCascade()); - catalog.asSchemas().dropSchema(schema3, dropCascade()); + catalog.asSchemas().dropSchema(schema, supportDropCascade()); + catalog.asSchemas().dropSchema(schema2, supportDropCascade()); + catalog.asSchemas().dropSchema(schema3, supportDropCascade()); Assertions.assertEquals(1, catalog.asSchemas().listSchemas().length); } }); @@ -225,7 +225,7 @@ public void testAlterSchemaWithCommentAndOptions() { Assertions.assertEquals("new-value", reloadedSchema.properties().get("key1")); Assertions.assertEquals("value3", reloadedSchema.properties().get("key3")); } finally { - catalog.asSchemas().dropSchema(schema, dropCascade()); + catalog.asSchemas().dropSchema(schema, supportDropCascade()); } }); } @@ -277,7 +277,7 @@ public void testCreateSimpleTable() { Row.of("B", 2.0)); }, true, - dropCascade()); + supportDropCascade()); } @Test @@ -311,7 +311,7 @@ public void testListTables() { Row.of("test_table2")); }, true, - dropCascade()); + supportDropCascade()); } @Test @@ -332,7 +332,7 @@ public void testDropTable() { Assertions.assertFalse(catalog.asTableCatalog().tableExists(identifier)); }, true, - dropCascade()); + supportDropCascade()); } @Test @@ -387,7 +387,7 @@ public void testGetSimpleTable() { } }, true, - dropCascade()); + supportDropCascade()); } @Test @@ -424,7 +424,7 @@ public void testRenameColumn() { assertColumns(expected, actual); }, true, - dropCascade()); + supportDropCascade()); } @Test @@ -486,7 +486,7 @@ public void testAlterTableComment() { } }, true, - dropCascade()); + supportDropCascade()); } @Test @@ -523,7 +523,7 @@ public void testAlterTableAddColumn() { assertColumns(expected, actual); }, true, - dropCascade()); + supportDropCascade()); } @Test @@ -555,7 +555,7 @@ public void testAlterTableDropColumn() { assertColumns(expected, actual); }, true, - dropCascade()); + supportDropCascade()); } @Test @@ -598,7 +598,7 @@ public void testAlterColumnTypeAndChangeOrder() { assertColumns(expected, actual); }, true, - dropCascade()); + supportDropCascade()); } @Test @@ -627,7 +627,7 @@ public void testRenameTable() { catalog.asTableCatalog().tableExists(NameIdentifier.of(databaseName, newTableName))); }, true, - dropCascade()); + supportDropCascade()); } @Test @@ -671,6 +671,6 @@ public void testAlterTableProperties() { Assertions.assertNull(properties.get("key2")); }, true, - dropCascade()); + supportDropCascade()); } } diff --git a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/hive/FlinkHiveCatalogIT.java b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/hive/FlinkHiveCatalogIT.java index 42cf6bb5326..3c161fb35be 100644 --- a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/hive/FlinkHiveCatalogIT.java +++ b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/hive/FlinkHiveCatalogIT.java @@ -593,7 +593,7 @@ protected String getProvider() { } @Override - protected boolean dropCascade() { + protected boolean supportDropCascade() { return true; } } diff --git a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/iceberg/FlinkIcebergCatalogIT.java b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/iceberg/FlinkIcebergCatalogIT.java index 8f2f709b57a..0834def90b7 100644 --- a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/iceberg/FlinkIcebergCatalogIT.java +++ b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/iceberg/FlinkIcebergCatalogIT.java @@ -294,7 +294,7 @@ public void testIcebergTableWithPartition() { Row.of(2, "B")); }, true, - dropCascade()); + supportDropCascade()); } @Test @@ -367,7 +367,7 @@ public void testCreateIcebergTable() { Assertions.assertArrayEquals(EMPTY_TRANSFORM, table.partitioning()); }, true, - dropCascade()); + supportDropCascade()); } @Test @@ -474,7 +474,7 @@ public void testGetIcebergTable() { } }, true, - dropCascade()); + supportDropCascade()); } @Override @@ -493,7 +493,7 @@ protected boolean supportGetSchemaWithoutCommentAndOption() { } @Override - protected boolean dropCascade() { + protected boolean supportDropCascade() { return false; } diff --git a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonCatalogIT.java b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonCatalogIT.java index 4494ec9b91b..49f48518bb3 100644 --- a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonCatalogIT.java +++ b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonCatalogIT.java @@ -63,7 +63,7 @@ protected String getProvider() { } @Override - protected boolean dropCascade() { + protected boolean supportDropCascade() { return true; }