Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
caican committed Oct 10, 2024
1 parent 169802b commit d08ff89
Show file tree
Hide file tree
Showing 5 changed files with 126 additions and 5 deletions.
1 change: 1 addition & 0 deletions catalogs/catalog-lakehouse-paimon/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ dependencies {
testImplementation(libs.junit.jupiter.api)
testImplementation(libs.mysql.driver)
testImplementation(libs.postgresql.driver)
testImplementation(libs.h2db)
testImplementation(libs.bundles.log4j)
testImplementation(libs.junit.jupiter.params)
testImplementation(libs.paimon.s3)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,6 @@
/** The type of Apache Paimon catalog backend. */
public enum PaimonCatalogBackend {
FILESYSTEM,
JDBC
JDBC,
HIVE
}
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ public abstract class CatalogPaimonBaseIT extends AbstractIT {
protected String jdbcPassword;
protected Catalog catalog;
protected org.apache.paimon.catalog.Catalog paimonCatalog;
protected SparkSession spark;
protected String metalakeName = GravitinoITUtils.genRandomName("paimon_it_metalake");
protected String catalogName = GravitinoITUtils.genRandomName("paimon_it_catalog");
protected String schemaName = GravitinoITUtils.genRandomName("paimon_it_schema");
Expand All @@ -115,8 +116,8 @@ public abstract class CatalogPaimonBaseIT extends AbstractIT {
private static final String alertTableName = "alert_table_name";
private static String INSERT_BATCH_WITHOUT_PARTITION_TEMPLATE = "INSERT INTO paimon.%s VALUES %s";
private static final String SELECT_ALL_TEMPLATE = "SELECT * FROM paimon.%s";
private static final String DEFAULT_DB = "default";
private GravitinoMetalake metalake;
protected SparkSession spark;
private Map<String, String> catalogProperties;

@BeforeAll
Expand Down Expand Up @@ -726,7 +727,7 @@ public void testAlterPaimonTable() {
// update column position
Column col1 = Column.of("name", Types.StringType.get(), "comment");
Column col2 = Column.of("address", Types.StringType.get(), "comment");
Column col3 = Column.of("date_of_birth", Types.DateType.get(), "comment");
Column col3 = Column.of("date_of_birth", Types.StringType.get(), "comment");

Column[] newColumns = new Column[] {col1, col2, col3};
NameIdentifier tableIdentifier =
Expand Down Expand Up @@ -873,7 +874,13 @@ void testOperationDataOfPaimonTable() {
private void clearTableAndSchema() {
SupportsSchemas supportsSchema = catalog.asSchemas();
Arrays.stream(supportsSchema.listSchemas())
.forEach(schema -> supportsSchema.dropSchema(schema, true));
.forEach(
schema -> {
// can not drop default database for hive backend.
if (!DEFAULT_DB.equalsIgnoreCase(schema)) {
supportsSchema.dropSchema(schema, true);
}
});
}

private void createMetalake() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.gravitino.catalog.lakehouse.paimon.integration.test;

import com.google.common.collect.Maps;
import java.util.Map;
import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.Schema;
import org.apache.gravitino.SupportsSchemas;
import org.apache.gravitino.catalog.lakehouse.paimon.PaimonCatalogPropertiesMetadata;
import org.apache.gravitino.integration.test.container.HiveContainer;
import org.apache.gravitino.integration.test.util.GravitinoITUtils;
import org.apache.paimon.catalog.Catalog;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;

@Tag("gravitino-docker-test")
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class CatalogPaimonHiveIT extends CatalogPaimonBaseIT {

@Override
protected Map<String, String> initPaimonCatalogProperties() {
Map<String, String> catalogProperties = Maps.newHashMap();
catalogProperties.put("key1", "val1");
catalogProperties.put("key2", "val2");

TYPE = "hive";
WAREHOUSE =
String.format(
"hdfs://%s:%d/user/hive/warehouse-catalog-paimon/",
containerSuite.getHiveContainer().getContainerIpAddress(),
HiveContainer.HDFS_DEFAULTFS_PORT);
URI =
String.format(
"thrift://%s:%d",
containerSuite.getHiveContainer().getContainerIpAddress(),
HiveContainer.HIVE_METASTORE_PORT);

catalogProperties.put(PaimonCatalogPropertiesMetadata.GRAVITINO_CATALOG_BACKEND, TYPE);
catalogProperties.put(PaimonCatalogPropertiesMetadata.WAREHOUSE, WAREHOUSE);
catalogProperties.put(PaimonCatalogPropertiesMetadata.URI, URI);

return catalogProperties;
}

@Test
void testPaimonSchemaProperties() throws Catalog.DatabaseNotExistException {
SupportsSchemas schemas = catalog.asSchemas();

// create schema check.
String testSchemaName = GravitinoITUtils.genRandomName("test_schema_1");
NameIdentifier schemaIdent = NameIdentifier.of(metalakeName, catalogName, testSchemaName);
Map<String, String> schemaProperties = Maps.newHashMap();
schemaProperties.put("key", "hive");
Schema createdSchema =
schemas.createSchema(schemaIdent.name(), schema_comment, schemaProperties);
Assertions.assertEquals(createdSchema.properties().get("key"), "hive");

// load schema check.
Schema schema = schemas.loadSchema(schemaIdent.name());
Assertions.assertEquals(schema.properties().get("key"), "hive");
Map<String, String> loadedProps = paimonCatalog.loadDatabaseProperties(schemaIdent.name());
Assertions.assertEquals(loadedProps.get("key"), "hive");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,13 @@
import java.util.function.Consumer;
import org.apache.gravitino.catalog.lakehouse.paimon.PaimonCatalogBackend;
import org.apache.gravitino.catalog.lakehouse.paimon.PaimonConfig;
import org.apache.gravitino.integration.test.container.ContainerSuite;
import org.apache.gravitino.integration.test.container.HiveContainer;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.FileSystemCatalog;
import org.apache.paimon.factories.FactoryException;
import org.apache.paimon.hive.HiveCatalog;
import org.apache.paimon.jdbc.JdbcCatalog;
import org.junit.jupiter.api.Test;

/** Tests for {@link org.apache.gravitino.catalog.lakehouse.paimon.utils.CatalogUtils}. */
Expand All @@ -41,6 +45,10 @@ public class TestCatalogUtils {
void testLoadCatalogBackend() throws Exception {
// Test load FileSystemCatalog for filesystem metastore.
assertCatalog(PaimonCatalogBackend.FILESYSTEM.name(), FileSystemCatalog.class);
// Test load JdbcCatalog for jdbc metastore.
assertCatalog(PaimonCatalogBackend.JDBC.name(), JdbcCatalog.class);
// Test load HiveCatalog for hive metastore.
assertCatalog(PaimonCatalogBackend.HIVE.name(), HiveCatalog.class);
// Test load catalog exception for other metastore.
assertThrowsExactly(FactoryException.class, () -> assertCatalog("other", catalog -> {}));
}
Expand All @@ -51,6 +59,7 @@ private void assertCatalog(String metastore, Class<?> expected) throws Exception
}

private void assertCatalog(String metastore, Consumer<Catalog> consumer) throws Exception {

try (Catalog catalog =
loadCatalogBackend(
new PaimonConfig(
Expand All @@ -63,9 +72,29 @@ private void assertCatalog(String metastore, Consumer<Catalog> consumer) throws
System.getProperty("java.io.tmpdir"),
"paimon_catalog_warehouse"),
PaimonConfig.CATALOG_URI.getKey(),
"uri")))
generateUri(metastore),
PaimonConfig.CATALOG_JDBC_USER.getKey(),
"user",
PaimonConfig.CATALOG_JDBC_PASSWORD.getKey(),
"password")))
.getCatalog()) {
consumer.accept(catalog);
}
}

private static String generateUri(String metastore) {
String uri = "uri";
if (PaimonCatalogBackend.JDBC.name().equalsIgnoreCase(metastore)) {
uri = "jdbc:h2:mem:testdb";
} else if (PaimonCatalogBackend.HIVE.name().equalsIgnoreCase(metastore)) {
ContainerSuite containerSuite = ContainerSuite.getInstance();
containerSuite.startHiveContainer();
uri =
String.format(
"thrift://%s:%d",
containerSuite.getHiveContainer().getContainerIpAddress(),
HiveContainer.HIVE_METASTORE_PORT);
}
return uri;
}
}

0 comments on commit d08ff89

Please sign in to comment.