Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
caican committed Oct 10, 2024
1 parent d8da103 commit e2900f7
Show file tree
Hide file tree
Showing 7 changed files with 180 additions and 41 deletions.
1 change: 1 addition & 0 deletions catalogs/catalog-lakehouse-paimon/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ dependencies {
testImplementation(libs.bundles.log4j)
testImplementation(libs.junit.jupiter.params)
testImplementation(libs.testcontainers)
testImplementation(libs.testcontainers.mysql)

testRuntimeOnly(libs.junit.jupiter.engine)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,17 @@ public class PaimonCatalogPropertiesMetadata extends BaseCatalogPropertiesMetada
public static final String JDBC_PASSWORD = "jdbc.password";

public static final Map<String, String> GRAVITINO_CONFIG_TO_PAIMON =
ImmutableMap.of(GRAVITINO_CATALOG_BACKEND, PAIMON_METASTORE, WAREHOUSE, WAREHOUSE, URI, URI, JDBC_USER, JDBC_USER, JDBC_PASSWORD, JDBC_PASSWORD);
ImmutableMap.of(
GRAVITINO_CATALOG_BACKEND,
PAIMON_METASTORE,
WAREHOUSE,
WAREHOUSE,
URI,
URI,
JDBC_USER,
JDBC_USER,
JDBC_PASSWORD,
JDBC_PASSWORD);
private static final Map<String, PropertyEntry<?>> PROPERTIES_METADATA;
private static final Map<String, String> KERBEROS_CONFIGURATION =
ImmutableMap.of(
Expand Down Expand Up @@ -87,17 +97,17 @@ public class PaimonCatalogPropertiesMetadata extends BaseCatalogPropertiesMetada
null /* defaultValue */,
false /* hidden */),
stringOptionalPropertyEntry(
JDBC_USER,
"Paimon catalog jdbc user",
false /* immutable */,
null /* defaultValue */,
false /* hidden */),
JDBC_USER,
"Paimon catalog jdbc user",
false /* immutable */,
null /* defaultValue */,
false /* hidden */),
stringOptionalPropertyEntry(
JDBC_PASSWORD,
"Paimon catalog jdbc password",
false /* immutable */,
null /* defaultValue */,
false /* hidden */));
JDBC_PASSWORD,
"Paimon catalog jdbc password",
false /* immutable */,
null /* defaultValue */,
false /* hidden */));
HashMap<String, PropertyEntry<?>> result = Maps.newHashMap();
result.putAll(Maps.uniqueIndex(propertyEntries, PropertyEntry::getName));
result.putAll(KerberosConfig.KERBEROS_PROPERTY_ENTRIES);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,18 +51,18 @@ public class PaimonConfig extends Config {
.create();

public static final ConfigEntry<String> CATALOG_JDBC_USER =
new ConfigBuilder(PaimonCatalogPropertiesMetadata.JDBC_USER)
.doc("Paimon catalog jdbc user")
.version(ConfigConstants.VERSION_0_7_0)
.stringConf()
.create();
new ConfigBuilder(PaimonCatalogPropertiesMetadata.JDBC_USER)
.doc("Paimon catalog jdbc user")
.version(ConfigConstants.VERSION_0_7_0)
.stringConf()
.create();

public static final ConfigEntry<String> CATALOG_JDBC_PASSWORD =
new ConfigBuilder(PaimonCatalogPropertiesMetadata.JDBC_PASSWORD)
.doc("Paimon catalog jdbc password")
.version(ConfigConstants.VERSION_0_7_0)
.stringConf()
.create();
new ConfigBuilder(PaimonCatalogPropertiesMetadata.JDBC_PASSWORD)
.doc("Paimon catalog jdbc password")
.version(ConfigConstants.VERSION_0_7_0)
.stringConf()
.create();

public PaimonConfig() {
super(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@
package org.apache.gravitino.catalog.lakehouse.paimon.utils;

import static org.apache.gravitino.catalog.lakehouse.paimon.PaimonConfig.CATALOG_BACKEND;
import static org.apache.gravitino.catalog.lakehouse.paimon.PaimonConfig.CATALOG_JDBC_PASSWORD;
import static org.apache.gravitino.catalog.lakehouse.paimon.PaimonConfig.CATALOG_JDBC_USER;
import static org.apache.gravitino.catalog.lakehouse.paimon.PaimonConfig.CATALOG_URI;
import static org.apache.gravitino.catalog.lakehouse.paimon.PaimonConfig.CATALOG_WAREHOUSE;
import static org.apache.gravitino.catalog.lakehouse.paimon.PaimonConfig.CATALOG_JDBC_USER;
import static org.apache.gravitino.catalog.lakehouse.paimon.PaimonConfig.CATALOG_JDBC_PASSWORD;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION;

Expand Down Expand Up @@ -125,11 +125,12 @@ private static void checkPaimonConfig(PaimonConfig paimonConfig) {
if (PaimonCatalogBackend.JDBC.name().equalsIgnoreCase(metastore)) {
String jdbcUser = paimonConfig.get(CATALOG_JDBC_USER);
Preconditions.checkArgument(
StringUtils.isNotBlank(jdbcUser), "Paimon Catalog jdbc user can not be null or empty.");
StringUtils.isNotBlank(jdbcUser), "Paimon Catalog jdbc user can not be null or empty.");

String jdbcPassword = paimonConfig.get(CATALOG_JDBC_PASSWORD);
Preconditions.checkArgument(
StringUtils.isNotBlank(jdbcPassword), "Paimon Catalog jdbc password can not be null or empty.");
StringUtils.isNotBlank(jdbcPassword),
"Paimon Catalog jdbc password can not be null or empty.");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,10 @@
import org.apache.gravitino.exceptions.SchemaAlreadyExistsException;
import org.apache.gravitino.exceptions.TableAlreadyExistsException;
import org.apache.gravitino.integration.test.container.ContainerSuite;
import org.apache.gravitino.integration.test.container.MySQLContainer;
import org.apache.gravitino.integration.test.util.AbstractIT;
import org.apache.gravitino.integration.test.util.GravitinoITUtils;
import org.apache.gravitino.integration.test.util.TestDatabaseName;
import org.apache.gravitino.rel.Column;
import org.apache.gravitino.rel.Table;
import org.apache.gravitino.rel.TableCatalog;
Expand Down Expand Up @@ -87,28 +89,33 @@
public abstract class CatalogPaimonBaseIT extends AbstractIT {

protected static final ContainerSuite containerSuite = ContainerSuite.getInstance();
protected static final TestDatabaseName TEST_DB_NAME =
TestDatabaseName.PG_TEST_ICEBERG_CATALOG_MULTIPLE_JDBC_LOAD;
protected static MySQLContainer mySQLContainer;
protected String WAREHOUSE;
protected String TYPE;
protected String URI;
protected String jdbcUser;
protected String jdbcPassword;
protected Catalog catalog;
protected org.apache.paimon.catalog.Catalog paimonCatalog;
protected String metalakeName = GravitinoITUtils.genRandomName("paimon_it_metalake");
protected String catalogName = GravitinoITUtils.genRandomName("paimon_it_catalog");
protected String schemaName = GravitinoITUtils.genRandomName("paimon_it_schema");
protected static final String schema_comment = "schema_comment";

private static final String provider = "lakehouse-paimon";
private static final String catalog_comment = "catalog_comment";
private static final String schema_comment = "schema_comment";
private static final String table_comment = "table_comment";
private static final String PAIMON_COL_NAME1 = "paimon_col_name1";
private static final String PAIMON_COL_NAME2 = "paimon_col_name2";
private static final String PAIMON_COL_NAME3 = "paimon_col_name3";
private static final String PAIMON_COL_NAME4 = "paimon_col_name4";
private static final String PAIMON_COL_NAME5 = "paimon_col_name5";
private static final String alertTableName = "alert_table_name";
private String metalakeName = GravitinoITUtils.genRandomName("paimon_it_metalake");
private String catalogName = GravitinoITUtils.genRandomName("paimon_it_catalog");
private String schemaName = GravitinoITUtils.genRandomName("paimon_it_schema");
private String tableName = GravitinoITUtils.genRandomName("paimon_it_table");
private static String INSERT_BATCH_WITHOUT_PARTITION_TEMPLATE = "INSERT INTO paimon.%s VALUES %s";
private static final String SELECT_ALL_TEMPLATE = "SELECT * FROM paimon.%s";
private GravitinoMetalake metalake;
private Catalog catalog;
private org.apache.paimon.catalog.Catalog paimonCatalog;
private SparkSession spark;
private Map<String, String> catalogProperties;

Expand Down Expand Up @@ -159,9 +166,7 @@ void testPaimonSchemaOperations() throws DatabaseNotExistException {

// load schema check.
Schema schema = schemas.loadSchema(schemaIdent.name());
// database properties is empty for Paimon FilesystemCatalog.
Assertions.assertTrue(schema.properties().isEmpty());
Assertions.assertTrue(paimonCatalog.loadDatabaseProperties(schemaIdent.name()).isEmpty());
Assertions.assertEquals(testSchemaName, schema.name());

Map<String, String> emptyMap = Collections.emptyMap();
Assertions.assertThrows(
Expand Down Expand Up @@ -198,6 +203,7 @@ void testPaimonSchemaOperations() throws DatabaseNotExistException {

@Test
void testCreateTableWithNullComment() {
String tableName = GravitinoITUtils.genRandomName("paimon_table_with_null_comment");
Column[] columns = createColumns();
NameIdentifier tableIdentifier = NameIdentifier.of(schemaName, tableName);

Expand All @@ -213,6 +219,8 @@ void testCreateTableWithNullComment() {
@Test
void testCreateAndLoadPaimonTable()
throws org.apache.paimon.catalog.Catalog.TableNotExistException {
String tableName = GravitinoITUtils.genRandomName("create_and_load_paimon_table");

// Create table from Gravitino API
Column[] columns = createColumns();

Expand Down Expand Up @@ -297,6 +305,8 @@ void testCreateAndLoadPaimonTable()
@Test
void testCreateAndLoadPaimonPartitionedTable()
throws org.apache.paimon.catalog.Catalog.TableNotExistException {
String tableName = GravitinoITUtils.genRandomName("create_and_load_paimon_partitioned_table");

// Create table from Gravitino API
Column[] columns = createColumns();

Expand Down Expand Up @@ -386,6 +396,8 @@ void testCreateAndLoadPaimonPartitionedTable()
@Test
void testCreateAndLoadPaimonPrimaryKeyTable()
throws org.apache.paimon.catalog.Catalog.TableNotExistException {
String tableName = GravitinoITUtils.genRandomName("create_and_load_paimon_primary_key_table");

// Create table from Gravitino API
Column[] columns = createColumns();
ArrayList<Column> newColumns = new ArrayList<>(Arrays.asList(columns));
Expand Down Expand Up @@ -611,6 +623,8 @@ void testListAndDropPaimonTable() throws DatabaseNotExistException {

@Test
public void testAlterPaimonTable() {
String tableName = GravitinoITUtils.genRandomName("alter_paimon_table");

Column[] columns = createColumns();
catalog
.asTableCatalog()
Expand Down Expand Up @@ -712,7 +726,7 @@ public void testAlterPaimonTable() {

Column[] newColumns = new Column[] {col1, col2, col3};
NameIdentifier tableIdentifier =
NameIdentifier.of(schemaName, GravitinoITUtils.genRandomName("PaimonAlterTableIT"));
NameIdentifier.of(schemaName, GravitinoITUtils.genRandomName("new_alter_paimon_table"));
catalog
.asTableCatalog()
.createTable(
Expand Down Expand Up @@ -853,9 +867,9 @@ void testOperationDataOfPaimonTable() {
}

private void clearTableAndSchema() {
if (catalog.asSchemas().schemaExists(schemaName)) {
catalog.asSchemas().dropSchema(schemaName, true);
}
SupportsSchemas supportsSchema = catalog.asSchemas();
Arrays.stream(supportsSchema.listSchemas())
.forEach(schema -> supportsSchema.dropSchema(schema, true));
}

private void createMetalake() {
Expand Down Expand Up @@ -894,10 +908,8 @@ private void createSchema() {
prop.put("key2", "val2");

Schema createdSchema = catalog.asSchemas().createSchema(ident.name(), schema_comment, prop);
// database properties is empty for Paimon FilesystemCatalog.
Schema loadSchema = catalog.asSchemas().loadSchema(ident.name());
Assertions.assertEquals(createdSchema.name(), loadSchema.name());
Assertions.assertTrue(loadSchema.properties().isEmpty());
}

private Column[] createColumns() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,16 @@

import com.google.common.collect.Maps;
import java.util.Map;
import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.Schema;
import org.apache.gravitino.SupportsSchemas;
import org.apache.gravitino.catalog.lakehouse.paimon.PaimonCatalogPropertiesMetadata;
import org.apache.gravitino.integration.test.container.HiveContainer;
import org.apache.gravitino.integration.test.util.GravitinoITUtils;
import org.apache.paimon.catalog.Catalog;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;

@Tag("gravitino-docker-test")
Expand All @@ -48,4 +55,22 @@ protected Map<String, String> initPaimonCatalogProperties() {

return catalogProperties;
}

@Test
void testPaimonSchemaProperties() throws Catalog.DatabaseNotExistException {
SupportsSchemas schemas = catalog.asSchemas();

// create schema.
String testSchemaName = GravitinoITUtils.genRandomName("test_schema_1");
NameIdentifier schemaIdent = NameIdentifier.of(metalakeName, catalogName, testSchemaName);
Map<String, String> schemaProperties = Maps.newHashMap();
schemaProperties.put("key1", "val1");
schemaProperties.put("key2", "val2");
schemas.createSchema(schemaIdent.name(), schema_comment, schemaProperties);

// load schema check, database properties is empty for Paimon FilesystemCatalog.
Schema schema = schemas.loadSchema(schemaIdent.name());
Assertions.assertTrue(schema.properties().isEmpty());
Assertions.assertTrue(paimonCatalog.loadDatabaseProperties(schemaIdent.name()).isEmpty());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.gravitino.catalog.lakehouse.paimon.integration.test;

import com.google.common.collect.Maps;
import java.util.Map;
import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.Schema;
import org.apache.gravitino.SupportsSchemas;
import org.apache.gravitino.catalog.lakehouse.paimon.PaimonCatalogPropertiesMetadata;
import org.apache.gravitino.integration.test.container.HiveContainer;
import org.apache.gravitino.integration.test.util.GravitinoITUtils;
import org.apache.paimon.catalog.Catalog;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;

@Tag("gravitino-docker-test")
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class CatalogPaimonJdbcIT extends CatalogPaimonBaseIT {

@Override
protected Map<String, String> initPaimonCatalogProperties() {
containerSuite.startMySQLContainer(TEST_DB_NAME);
mySQLContainer = containerSuite.getMySQLContainer();

Map<String, String> catalogProperties = Maps.newHashMap();
catalogProperties.put("key1", "val1");
catalogProperties.put("key2", "val2");

TYPE = "jdbc";
WAREHOUSE =
String.format(
"hdfs://%s:%d/user/hive/warehouse-catalog-paimon/",
containerSuite.getHiveContainer().getContainerIpAddress(),
HiveContainer.HDFS_DEFAULTFS_PORT);
URI = mySQLContainer.getJdbcUrl(TEST_DB_NAME);
jdbcUser = mySQLContainer.getUsername();
jdbcPassword = mySQLContainer.getPassword();

catalogProperties.put(PaimonCatalogPropertiesMetadata.GRAVITINO_CATALOG_BACKEND, TYPE);
catalogProperties.put(PaimonCatalogPropertiesMetadata.WAREHOUSE, WAREHOUSE);
catalogProperties.put(PaimonCatalogPropertiesMetadata.URI, URI);
catalogProperties.put(PaimonCatalogPropertiesMetadata.JDBC_USER, jdbcUser);
catalogProperties.put(PaimonCatalogPropertiesMetadata.JDBC_PASSWORD, jdbcPassword);

return catalogProperties;
}

@Test
void testPaimonSchemaProperties() throws Catalog.DatabaseNotExistException {
SupportsSchemas schemas = catalog.asSchemas();

// create schema check.
String testSchemaName = GravitinoITUtils.genRandomName("test_schema_1");
NameIdentifier schemaIdent = NameIdentifier.of(metalakeName, catalogName, testSchemaName);
Map<String, String> schemaProperties = Maps.newHashMap();
schemaProperties.put("key1", "val1");
schemaProperties.put("key2", "val2");
Schema createdSchema =
schemas.createSchema(schemaIdent.name(), schema_comment, schemaProperties);
Assertions.assertEquals(createdSchema.properties().get("key1"), "val1");
Assertions.assertEquals(createdSchema.properties().get("key2"), "val2");

// load schema check.
Schema schema = schemas.loadSchema(schemaIdent.name());
Assertions.assertEquals(schema.properties().get("key1"), "val1");
Assertions.assertEquals(schema.properties().get("key2"), "val2");
Map<String, String> loadedProps = paimonCatalog.loadDatabaseProperties(schemaIdent.name());
Assertions.assertEquals(loadedProps.get("key1"), "val1");
Assertions.assertEquals(loadedProps.get("key2"), "val2");
}
}

0 comments on commit e2900f7

Please sign in to comment.