Skip to content

Commit

Permalink
fixed
Browse files Browse the repository at this point in the history
  • Loading branch information
sunxiaojian committed Jan 14, 2025
1 parent 9ec6317 commit 92b8cd2
Show file tree
Hide file tree
Showing 7 changed files with 49 additions and 44 deletions.
19 changes: 10 additions & 9 deletions docs/flink-connector/flink-catalog-iceberg.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ To enable the Flink connector, you must download the Iceberg Flink runtime JAR a
- View operations
- Metadata tables, like:
- `{iceberg_catalog}.{iceberg_database}.{iceberg_table}&snapshots`
- Querying UDF
- Query UDF
- `UPDATE` clause
- `DELETE` clause
- `CREATE TABLE LIKE` clause
Expand Down Expand Up @@ -59,19 +59,20 @@ SELECT * FROM sample WHERE data = 'B';
The Gravitino Flink connector transforms the following properties in a catalog to Flink connector configuration.


| Gravitino catalog property name | Flink Iceberg connector configuration | Description | Since Version |
|---------------------------------|---------------------------------------|----------------------------------------------------------------------------------------------------------------|-------------------|
| `catalog-backend` | `catalog-type` | Catalog backend type, currently, only `Hive` Catalog is supported, `JDBC` and `Rest` in Continuous Validation | 0.8.0-incubating |
| `uri` | `uri` | Catalog backend URI | 0.8.0-incubating |
| `warehouse` | `warehouse` | Catalog backend warehouse | 0.8.0-incubating |
| `io-impl` | `io-impl` | The IO implementation for `FileIO` in Iceberg. | 0.8.0-incubating |
| `oss-endpoint` | `oss.endpoint` | The endpoint of Aliyun OSS service. | 0.8.0-incubating |
| Gravitino catalog property name | Flink Iceberg connector configuration | Description | Since Version |
|---------------------------------|---------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|-------------------|
| `catalog-backend`               | `catalog-type`                        | Catalog backend type. Currently only the `Hive` catalog is supported; `JDBC` and `Rest` backends are under continuous validation.                                                                                  | 0.8.0-incubating  |
| `uri` | `uri` | Catalog backend URI | 0.8.0-incubating |
| `warehouse` | `warehouse` | Catalog backend warehouse | 0.8.0-incubating |
| `io-impl` | `io-impl` | The IO implementation for `FileIO` in Iceberg. | 0.8.0-incubating |
| `oss-endpoint` | `oss.endpoint` | The endpoint of Aliyun OSS service. | 0.8.0-incubating |
| `oss-access-key-id` | `client.access-key-id` | The static access key ID used to access OSS data. | 0.8.0-incubating |
| `oss-secret-access-key` | `client.access-key-secret` | The static secret access key used to access OSS data. | 0.8.0-incubating |

Gravitino catalog property names with the prefix `flink.bypass.` are passed through to the Flink Iceberg connector. For example, use `flink.bypass.clients` to pass the `clients` property to the Flink Iceberg connector.

## Storage

### OSS

You need to add an OSS secret key to the Flink configuration using `client.access-key-id` and `client.access-key-secret`.
Additionally, you need to download the [Aliyun OSS SDK](https://gosspublic.alicdn.com/sdks/java/aliyun_java_sdk_3.10.2.zip), and copy `aliyun-sdk-oss-3.10.2.jar`, `hamcrest-core-1.1.jar`, `jdom2-2.0.6.jar` to the Flink classpath.
Original file line number Diff line number Diff line change
Expand Up @@ -25,22 +25,25 @@
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.flink.FlinkCatalogFactory;

public interface IcebergPropertiesConstants {
@VisibleForTesting String GRAVITINO_ICEBERG_CATALOG_BACKEND = IcebergConstants.CATALOG_BACKEND;
public class IcebergPropertiesConstants {
@VisibleForTesting
static String GRAVITINO_ICEBERG_CATALOG_BACKEND = IcebergConstants.CATALOG_BACKEND;

String ICEBERG_CATALOG_TYPE = FlinkCatalogFactory.ICEBERG_CATALOG_TYPE;
static final String ICEBERG_CATALOG_TYPE = FlinkCatalogFactory.ICEBERG_CATALOG_TYPE;

String GRAVITINO_ICEBERG_CATALOG_WAREHOUSE = IcebergConstants.WAREHOUSE;
static final String GRAVITINO_ICEBERG_CATALOG_WAREHOUSE = IcebergConstants.WAREHOUSE;

String ICEBERG_CATALOG_WAREHOUSE = CatalogProperties.WAREHOUSE_LOCATION;
static final String ICEBERG_CATALOG_WAREHOUSE = CatalogProperties.WAREHOUSE_LOCATION;

String GRAVITINO_ICEBERG_CATALOG_URI = IcebergConstants.URI;
static final String GRAVITINO_ICEBERG_CATALOG_URI = IcebergConstants.URI;

String ICEBERG_CATALOG_URI = CatalogProperties.URI;
static final String ICEBERG_CATALOG_URI = CatalogProperties.URI;

@VisibleForTesting String ICEBERG_CATALOG_BACKEND_HIVE = CatalogUtil.ICEBERG_CATALOG_TYPE_HIVE;
@VisibleForTesting
static String ICEBERG_CATALOG_BACKEND_HIVE = CatalogUtil.ICEBERG_CATALOG_TYPE_HIVE;

String GRAVITINO_ICEBERG_CATALOG_BACKEND_HIVE = "hive";
static final String GRAVITINO_ICEBERG_CATALOG_BACKEND_HIVE = "hive";

@VisibleForTesting String ICEBERG_CATALOG_BACKEND_REST = CatalogUtil.ICEBERG_CATALOG_TYPE_REST;
@VisibleForTesting
static final String ICEBERG_CATALOG_BACKEND_REST = CatalogUtil.ICEBERG_CATALOG_TYPE_REST;
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ public Map<String, String> toFlinkCatalogProperties(Map<String, String> gravitin
all.put(
CommonCatalogOptions.CATALOG_TYPE.key(), GravitinoIcebergCatalogFactoryOptions.IDENTIFIER);
// Map "catalog-backend" to "catalog-type".
// TODO If catalog backend is CUSTOM, we need special compatibility logic.
GRAVITINO_CONFIG_TO_FLINK_ICEBERG.forEach(
(key, value) -> {
if (all.containsKey(key)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ protected boolean supportGetSchemaWithoutCommentAndOption() {

protected abstract String getProvider();

protected abstract boolean dropCascade();
protected abstract boolean supportDropCascade();

@Test
public void testCreateSchema() {
Expand All @@ -91,14 +91,14 @@ public void testCreateSchema() {
TestUtils.assertTableResult(tableResult, ResultKind.SUCCESS);
catalog.asSchemas().schemaExists(schema);
} finally {
catalog.asSchemas().dropSchema(schema, dropCascade());
catalog.asSchemas().dropSchema(schema, supportDropCascade());
Assertions.assertFalse(catalog.asSchemas().schemaExists(schema));
}
});
}

@Test
@EnabledIf("testGetSchemaWithoutCommentAndOption")
@EnabledIf("supportGetSchemaWithoutCommentAndOption")
public void testGetSchemaWithoutCommentAndOption() {
doWithCatalog(
currentCatalog(),
Expand Down Expand Up @@ -147,7 +147,7 @@ public void testGetSchemaWithCommentAndOptions() {
Assertions.assertEquals(
location, loadedSchema.properties().get(HiveConstants.LOCATION));
} finally {
catalog.asSchemas().dropSchema(schema, dropCascade());
catalog.asSchemas().dropSchema(schema, supportDropCascade());
Assertions.assertFalse(catalog.asSchemas().schemaExists(schema));
}
});
Expand Down Expand Up @@ -185,9 +185,9 @@ public void testListSchema() {
Assertions.assertEquals(schema2, schemas[2]);
Assertions.assertEquals(schema3, schemas[3]);
} finally {
catalog.asSchemas().dropSchema(schema, dropCascade());
catalog.asSchemas().dropSchema(schema2, dropCascade());
catalog.asSchemas().dropSchema(schema3, dropCascade());
catalog.asSchemas().dropSchema(schema, supportDropCascade());
catalog.asSchemas().dropSchema(schema2, supportDropCascade());
catalog.asSchemas().dropSchema(schema3, supportDropCascade());
Assertions.assertEquals(1, catalog.asSchemas().listSchemas().length);
}
});
Expand Down Expand Up @@ -225,7 +225,7 @@ public void testAlterSchemaWithCommentAndOptions() {
Assertions.assertEquals("new-value", reloadedSchema.properties().get("key1"));
Assertions.assertEquals("value3", reloadedSchema.properties().get("key3"));
} finally {
catalog.asSchemas().dropSchema(schema, dropCascade());
catalog.asSchemas().dropSchema(schema, supportDropCascade());
}
});
}
Expand Down Expand Up @@ -277,7 +277,7 @@ public void testCreateSimpleTable() {
Row.of("B", 2.0));
},
true,
dropCascade());
supportDropCascade());
}

@Test
Expand Down Expand Up @@ -311,7 +311,7 @@ public void testListTables() {
Row.of("test_table2"));
},
true,
dropCascade());
supportDropCascade());
}

@Test
Expand All @@ -332,7 +332,7 @@ public void testDropTable() {
Assertions.assertFalse(catalog.asTableCatalog().tableExists(identifier));
},
true,
dropCascade());
supportDropCascade());
}

@Test
Expand Down Expand Up @@ -387,7 +387,7 @@ public void testGetSimpleTable() {
}
},
true,
dropCascade());
supportDropCascade());
}

@Test
Expand Down Expand Up @@ -424,7 +424,7 @@ public void testRenameColumn() {
assertColumns(expected, actual);
},
true,
dropCascade());
supportDropCascade());
}

@Test
Expand Down Expand Up @@ -486,7 +486,7 @@ public void testAlterTableComment() {
}
},
true,
dropCascade());
supportDropCascade());
}

@Test
Expand Down Expand Up @@ -523,7 +523,7 @@ public void testAlterTableAddColumn() {
assertColumns(expected, actual);
},
true,
dropCascade());
supportDropCascade());
}

@Test
Expand Down Expand Up @@ -555,7 +555,7 @@ public void testAlterTableDropColumn() {
assertColumns(expected, actual);
},
true,
dropCascade());
supportDropCascade());
}

@Test
Expand Down Expand Up @@ -598,7 +598,7 @@ public void testAlterColumnTypeAndChangeOrder() {
assertColumns(expected, actual);
},
true,
dropCascade());
supportDropCascade());
}

@Test
Expand Down Expand Up @@ -627,7 +627,7 @@ public void testRenameTable() {
catalog.asTableCatalog().tableExists(NameIdentifier.of(databaseName, newTableName)));
},
true,
dropCascade());
supportDropCascade());
}

@Test
Expand Down Expand Up @@ -671,6 +671,6 @@ public void testAlterTableProperties() {
Assertions.assertNull(properties.get("key2"));
},
true,
dropCascade());
supportDropCascade());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -593,7 +593,7 @@ protected String getProvider() {
}

@Override
protected boolean dropCascade() {
protected boolean supportDropCascade() {
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ public void testIcebergTableWithPartition() {
Row.of(2, "B"));
},
true,
dropCascade());
supportDropCascade());
}

@Test
Expand Down Expand Up @@ -367,7 +367,7 @@ public void testCreateIcebergTable() {
Assertions.assertArrayEquals(EMPTY_TRANSFORM, table.partitioning());
},
true,
dropCascade());
supportDropCascade());
}

@Test
Expand Down Expand Up @@ -474,7 +474,7 @@ public void testGetIcebergTable() {
}
},
true,
dropCascade());
supportDropCascade());
}

@Override
Expand All @@ -493,7 +493,7 @@ protected boolean supportGetSchemaWithoutCommentAndOption() {
}

@Override
protected boolean dropCascade() {
protected boolean supportDropCascade() {
return false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ protected String getProvider() {
}

@Override
protected boolean dropCascade() {
protected boolean supportDropCascade() {
return true;
}

Expand Down

0 comments on commit 92b8cd2

Please sign in to comment.