From 0de24a0cfac5b40316a97092dbfb769218602720 Mon Sep 17 00:00:00 2001 From: Rui Wang Date: Sun, 28 Jul 2024 22:03:57 -0700 Subject: [PATCH] [FOLLOW-UP] Format Spark Integration code by javafmtAll (#291) **Description of changes** [FOLLOW-UP] Format Spark Integration code by javafmtAll. --- .../connectors/spark/UCSingleCatalog.scala | 1 - .../spark/CredentialTestFileSystem.java | 135 ++-- .../spark/SparkIntegrationTest.java | 575 +++++++++--------- 3 files changed, 355 insertions(+), 356 deletions(-) diff --git a/connectors/spark/src/main/scala/io/unitycatalog/connectors/spark/UCSingleCatalog.scala b/connectors/spark/src/main/scala/io/unitycatalog/connectors/spark/UCSingleCatalog.scala index d82e9592c..9b7461c79 100644 --- a/connectors/spark/src/main/scala/io/unitycatalog/connectors/spark/UCSingleCatalog.scala +++ b/connectors/spark/src/main/scala/io/unitycatalog/connectors/spark/UCSingleCatalog.scala @@ -6,7 +6,6 @@ import io.unitycatalog.client.model.{AwsCredentials, GenerateTemporaryTableCrede import java.net.URI import java.util - import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.NoSuchTableException import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType, CatalogUtils} diff --git a/connectors/spark/src/test/java/io/unitycatalog/connectors/spark/CredentialTestFileSystem.java b/connectors/spark/src/test/java/io/unitycatalog/connectors/spark/CredentialTestFileSystem.java index 81721597e..5466e1095 100644 --- a/connectors/spark/src/test/java/io/unitycatalog/connectors/spark/CredentialTestFileSystem.java +++ b/connectors/spark/src/test/java/io/unitycatalog/connectors/spark/CredentialTestFileSystem.java @@ -1,88 +1,87 @@ package io.unitycatalog.connectors.spark; +import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.*; import org.apache.hadoop.util.Progressable; -import java.io.IOException; - // A wrapper over the local file system to test UC table credentials. public class CredentialTestFileSystem extends RawLocalFileSystem { - @Override - protected void checkPath(Path path) { - // Do nothing. - } + @Override + protected void checkPath(Path path) { + // Do nothing. 
+ } - @Override - public FSDataOutputStream create( - Path f, - boolean overwrite, - int bufferSize, - short replication, - long blockSize, - Progressable progress) throws IOException { - return super.create( - toLocalPath(f), overwrite, bufferSize, replication, blockSize, progress); - } + @Override + public FSDataOutputStream create( + Path f, + boolean overwrite, + int bufferSize, + short replication, + long blockSize, + Progressable progress) + throws IOException { + return super.create(toLocalPath(f), overwrite, bufferSize, replication, blockSize, progress); + } - @Override - public FileStatus getFileStatus(Path f) throws IOException { - if (f.toString().startsWith("s3:")) { - String s3Prefix = "s3://" + f.toUri().getHost(); - return restorePathInFileStatus(s3Prefix, super.getFileStatus(toLocalPath(f))); - } else { - assert f.toString().startsWith("file:"); - return super.getFileStatus(f); - } + @Override + public FileStatus getFileStatus(Path f) throws IOException { + if (f.toString().startsWith("s3:")) { + String s3Prefix = "s3://" + f.toUri().getHost(); + return restorePathInFileStatus(s3Prefix, super.getFileStatus(toLocalPath(f))); + } else { + assert f.toString().startsWith("file:"); + return super.getFileStatus(f); } + } - @Override - public FSDataInputStream open(Path f) throws IOException { - return super.open(toLocalPath(f)); - } + @Override + public FSDataInputStream open(Path f) throws IOException { + return super.open(toLocalPath(f)); + } - @Override - public RemoteIterator listFiles(Path f, boolean recursive) throws IOException { - throw new RuntimeException("implement it when testing s3a"); - } + @Override + public RemoteIterator listFiles(Path f, boolean recursive) throws IOException { + throw new RuntimeException("implement it when testing s3a"); + } - @Override - public FileStatus[] listStatus(Path f) throws IOException { - String s3Prefix = "s3://" + f.toUri().getHost(); - FileStatus[] files = super.listStatus(toLocalPath(f)); - FileStatus[] res = new FileStatus[files.length]; - for (int i = 0; i < files.length; i++) { - res[i] = restorePathInFileStatus(s3Prefix, files[i]); - } - return res; + @Override + public FileStatus[] listStatus(Path f) throws IOException { + String s3Prefix = "s3://" + f.toUri().getHost(); + FileStatus[] files = super.listStatus(toLocalPath(f)); + FileStatus[] res = new FileStatus[files.length]; + for (int i = 0; i < files.length; i++) { + res[i] = restorePathInFileStatus(s3Prefix, files[i]); } + return res; + } - private FileStatus restorePathInFileStatus(String s3Prefix, FileStatus f) { - String path = f.getPath().toString().replace("file:", s3Prefix); - return new FileStatus( - f.getLen(), - f.isDirectory(), - f.getReplication(), - f.getBlockSize(), - f.getModificationTime(), - new Path(path)); - } + private FileStatus restorePathInFileStatus(String s3Prefix, FileStatus f) { + String path = f.getPath().toString().replace("file:", s3Prefix); + return new FileStatus( + f.getLen(), + f.isDirectory(), + f.getReplication(), + f.getBlockSize(), + f.getModificationTime(), + new Path(path)); + } - private Path toLocalPath(Path f) { - Configuration conf = getConf(); - String host = f.toUri().getHost(); - if ("test-bucket0".equals(host)) { - assert "accessKey0".equals(conf.get("fs.s3a.access.key")); - assert "secretKey0".equals(conf.get("fs.s3a.secret.key")); - assert "sessionToken0".equals(conf.get("fs.s3a.session.token")); - } else if ("test-bucket1".equals(host)) { - assert "accessKey1".equals(conf.get("fs.s3a.access.key")); - assert 
"secretKey1".equals(conf.get("fs.s3a.secret.key")); - assert "sessionToken1".equals(conf.get("fs.s3a.session.token")); - } else { - throw new RuntimeException("invalid path: " + f); - } - return new Path(f.toString().replaceAll("s3://.*?/", "file:///")); + private Path toLocalPath(Path f) { + Configuration conf = getConf(); + String host = f.toUri().getHost(); + if ("test-bucket0".equals(host)) { + assert "accessKey0".equals(conf.get("fs.s3a.access.key")); + assert "secretKey0".equals(conf.get("fs.s3a.secret.key")); + assert "sessionToken0".equals(conf.get("fs.s3a.session.token")); + } else if ("test-bucket1".equals(host)) { + assert "accessKey1".equals(conf.get("fs.s3a.access.key")); + assert "secretKey1".equals(conf.get("fs.s3a.secret.key")); + assert "sessionToken1".equals(conf.get("fs.s3a.session.token")); + } else { + throw new RuntimeException("invalid path: " + f); } + return new Path(f.toString().replaceAll("s3://.*?/", "file:///")); + } } diff --git a/connectors/spark/src/test/java/io/unitycatalog/connectors/spark/SparkIntegrationTest.java b/connectors/spark/src/test/java/io/unitycatalog/connectors/spark/SparkIntegrationTest.java index 9f41d10d7..cf10198d0 100644 --- a/connectors/spark/src/test/java/io/unitycatalog/connectors/spark/SparkIntegrationTest.java +++ b/connectors/spark/src/test/java/io/unitycatalog/connectors/spark/SparkIntegrationTest.java @@ -1,5 +1,9 @@ package io.unitycatalog.connectors.spark; +import static io.unitycatalog.server.utils.TestUtils.*; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + import io.unitycatalog.client.ApiException; import io.unitycatalog.client.model.*; import io.unitycatalog.server.base.BaseCRUDTest; @@ -11,305 +15,302 @@ import io.unitycatalog.server.sdk.schema.SdkSchemaOperations; import io.unitycatalog.server.sdk.tables.SdkTableOperations; import io.unitycatalog.server.utils.TestUtils; +import java.io.File; +import java.io.IOException; +import java.util.*; import org.apache.spark.network.util.JavaUtils; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import java.io.File; -import java.io.IOException; -import java.util.*; - -import static io.unitycatalog.server.utils.TestUtils.*; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertTrue; - public class SparkIntegrationTest extends BaseCRUDTest { - private static final String SPARK_CATALOG = "spark_catalog"; - private static final String PARQUET_TABLE = "test_parquet"; - private static final String ANOTHER_PARQUET_TABLE = "test_parquet_another"; - private static final String PARQUET_TABLE_PARTITIONED = "test_parquet_partitioned"; - private static final String DELTA_TABLE = "test_delta"; - private static final String DELTA_TABLE_PARTITIONED = "test_delta_partitioned"; - - - final private File dataDir = new File(System.getProperty("java.io.tmpdir"), "spark_test"); - - @Test - public void testParquetReadWrite() throws IOException, ApiException { - createCommonResources(); - SparkSession session = createSparkSessionWithCatalogs(SPARK_CATALOG); - // Spark only allow `spark_catalog` to return built-in file source tables. - setupExternalParquetTable(PARQUET_TABLE, new ArrayList<>(0)); - testTableReadWrite(SPARK_CATALOG + "." + SCHEMA_NAME + "." 
+ PARQUET_TABLE, session); - - setupExternalParquetTable(PARQUET_TABLE_PARTITIONED, Arrays.asList("s")); - testTableReadWrite(SPARK_CATALOG + "." + SCHEMA_NAME + "." + PARQUET_TABLE_PARTITIONED, session); - - session.stop(); - } - - @Test - public void testDeltaReadWrite() throws IOException, ApiException { - createCommonResources(); - // Test both `spark_catalog` and other catalog names. - SparkSession session = createSparkSessionWithCatalogs(SPARK_CATALOG, CATALOG_NAME); - - setupExternalDeltaTable(SPARK_CATALOG, DELTA_TABLE, new ArrayList<>(0), session); - testTableReadWrite(SPARK_CATALOG + "." + SCHEMA_NAME + "." + DELTA_TABLE, session); - - setupExternalDeltaTable(SPARK_CATALOG, DELTA_TABLE_PARTITIONED, Arrays.asList("s"), session); - testTableReadWrite(SPARK_CATALOG + "." + SCHEMA_NAME + "." + DELTA_TABLE_PARTITIONED, session); - - setupExternalDeltaTable(CATALOG_NAME, DELTA_TABLE, new ArrayList<>(0), session); - testTableReadWrite(CATALOG_NAME + "." + SCHEMA_NAME + "." + DELTA_TABLE, session); - - setupExternalDeltaTable(CATALOG_NAME, DELTA_TABLE_PARTITIONED, Arrays.asList("s"), session); - testTableReadWrite(CATALOG_NAME + "." + SCHEMA_NAME + "." + DELTA_TABLE_PARTITIONED, session); - - session.stop(); - } - - @Test - public void testDeltaPathTable() throws IOException, ApiException { - createCommonResources(); - // We must replace the `spark_catalog` in order to support Delta path tables. - SparkSession session = createSparkSessionWithCatalogs(SPARK_CATALOG); - - String path1 = new File(dataDir, "test_delta_path1").getCanonicalPath(); - String tableName1 = String.format("delta.`%s`", path1); - session.sql(String.format("CREATE TABLE %s(i INT) USING delta", tableName1)); - assertTrue( - session.sql("SELECT * FROM " + tableName1).collectAsList().isEmpty()); - session.sql("INSERT INTO " + tableName1 + " SELECT 1"); - assertEquals(1, - session.sql("SELECT * FROM " + tableName1).collectAsList().get(0).getInt(0)); - - // Test CTAS - String path2 = new File(dataDir, "test_delta_path2").getCanonicalPath(); - String tableName2 = String.format("delta.`%s`", path2); - session.sql(String.format("CREATE TABLE %s USING delta AS SELECT 1 AS i", tableName2)); - assertEquals(1, - session.sql("SELECT * FROM " + tableName2).collectAsList().get(0).getInt(0)); - - session.stop(); - } - - @Test - public void testCredentialParquet() throws ApiException, IOException { - createCommonResources(); - SparkSession session = createSparkSessionWithCatalogs(SPARK_CATALOG); - - String loc0 = "s3://test-bucket0" + generateTableLocation(SPARK_CATALOG, PARQUET_TABLE); - setupExternalParquetTable(PARQUET_TABLE, loc0, new ArrayList<>(0)); - String t1 = SPARK_CATALOG + "." + SCHEMA_NAME + "." + PARQUET_TABLE; - testTableReadWrite(t1, session); - - String loc1 = "s3://test-bucket1" + generateTableLocation(SPARK_CATALOG, ANOTHER_PARQUET_TABLE); - setupExternalParquetTable(ANOTHER_PARQUET_TABLE, loc1, new ArrayList<>(0)); - String t2 = SPARK_CATALOG + "." + SCHEMA_NAME + "." 
+ ANOTHER_PARQUET_TABLE; - testTableReadWrite(t2, session); - - Row row = session.sql(String.format("SELECT l.i FROM %s l JOIN %s r ON l.i = r.i", t1, t2)) - .collectAsList().get(0); - assertEquals(1, row.getInt(0)); - - session.stop(); - } - - @Test - public void testCredentialDelta() throws ApiException, IOException { - createCommonResources(); - SparkSession session = createSparkSessionWithCatalogs(SPARK_CATALOG, CATALOG_NAME); - - String loc0 = "s3://test-bucket0" + generateTableLocation(SPARK_CATALOG, DELTA_TABLE); - setupExternalDeltaTable(SPARK_CATALOG, DELTA_TABLE, loc0, new ArrayList<>(0), session); - String t1 = SPARK_CATALOG + "." + SCHEMA_NAME + "." + DELTA_TABLE; - testTableReadWrite(t1, session); - - String loc1 = "s3://test-bucket1" + generateTableLocation(CATALOG_NAME, DELTA_TABLE); - setupExternalDeltaTable(CATALOG_NAME, DELTA_TABLE, loc1, new ArrayList<>(0), session); - String t2 = CATALOG_NAME + "." + SCHEMA_NAME + "." + DELTA_TABLE; - testTableReadWrite(t2, session); - - Row row = session.sql(String.format("SELECT l.i FROM %s l JOIN %s r ON l.i = r.i", t1, t2)) - .collectAsList().get(0); - assertEquals(1, row.getInt(0)); - - session.stop(); - } - - private String generateTableLocation(String catalogName, String tableName) throws IOException { - return new File(new File(dataDir, catalogName), tableName).getCanonicalPath(); - } - - private void testTableReadWrite(String tableFullName, SparkSession session) { - assertTrue( - session.sql("SELECT * FROM " + tableFullName).collectAsList().isEmpty()); - session.sql("INSERT INTO " + tableFullName + " SELECT 1, 'a'"); - Row row = session.sql("SELECT * FROM " + tableFullName).collectAsList().get(0); - assertEquals(1, row.getInt(0)); - assertEquals("a", row.getString(1)); + private static final String SPARK_CATALOG = "spark_catalog"; + private static final String PARQUET_TABLE = "test_parquet"; + private static final String ANOTHER_PARQUET_TABLE = "test_parquet_another"; + private static final String PARQUET_TABLE_PARTITIONED = "test_parquet_partitioned"; + private static final String DELTA_TABLE = "test_delta"; + private static final String DELTA_TABLE_PARTITIONED = "test_delta_partitioned"; + + private final File dataDir = new File(System.getProperty("java.io.tmpdir"), "spark_test"); + + @Test + public void testParquetReadWrite() throws IOException, ApiException { + createCommonResources(); + SparkSession session = createSparkSessionWithCatalogs(SPARK_CATALOG); + // Spark only allow `spark_catalog` to return built-in file source tables. + setupExternalParquetTable(PARQUET_TABLE, new ArrayList<>(0)); + testTableReadWrite(SPARK_CATALOG + "." + SCHEMA_NAME + "." + PARQUET_TABLE, session); + + setupExternalParquetTable(PARQUET_TABLE_PARTITIONED, Arrays.asList("s")); + testTableReadWrite( + SPARK_CATALOG + "." + SCHEMA_NAME + "." + PARQUET_TABLE_PARTITIONED, session); + + session.stop(); + } + + @Test + public void testDeltaReadWrite() throws IOException, ApiException { + createCommonResources(); + // Test both `spark_catalog` and other catalog names. + SparkSession session = createSparkSessionWithCatalogs(SPARK_CATALOG, CATALOG_NAME); + + setupExternalDeltaTable(SPARK_CATALOG, DELTA_TABLE, new ArrayList<>(0), session); + testTableReadWrite(SPARK_CATALOG + "." + SCHEMA_NAME + "." + DELTA_TABLE, session); + + setupExternalDeltaTable(SPARK_CATALOG, DELTA_TABLE_PARTITIONED, Arrays.asList("s"), session); + testTableReadWrite(SPARK_CATALOG + "." + SCHEMA_NAME + "." 
+ DELTA_TABLE_PARTITIONED, session); + + setupExternalDeltaTable(CATALOG_NAME, DELTA_TABLE, new ArrayList<>(0), session); + testTableReadWrite(CATALOG_NAME + "." + SCHEMA_NAME + "." + DELTA_TABLE, session); + + setupExternalDeltaTable(CATALOG_NAME, DELTA_TABLE_PARTITIONED, Arrays.asList("s"), session); + testTableReadWrite(CATALOG_NAME + "." + SCHEMA_NAME + "." + DELTA_TABLE_PARTITIONED, session); + + session.stop(); + } + + @Test + public void testDeltaPathTable() throws IOException, ApiException { + createCommonResources(); + // We must replace the `spark_catalog` in order to support Delta path tables. + SparkSession session = createSparkSessionWithCatalogs(SPARK_CATALOG); + + String path1 = new File(dataDir, "test_delta_path1").getCanonicalPath(); + String tableName1 = String.format("delta.`%s`", path1); + session.sql(String.format("CREATE TABLE %s(i INT) USING delta", tableName1)); + assertTrue(session.sql("SELECT * FROM " + tableName1).collectAsList().isEmpty()); + session.sql("INSERT INTO " + tableName1 + " SELECT 1"); + assertEquals(1, session.sql("SELECT * FROM " + tableName1).collectAsList().get(0).getInt(0)); + + // Test CTAS + String path2 = new File(dataDir, "test_delta_path2").getCanonicalPath(); + String tableName2 = String.format("delta.`%s`", path2); + session.sql(String.format("CREATE TABLE %s USING delta AS SELECT 1 AS i", tableName2)); + assertEquals(1, session.sql("SELECT * FROM " + tableName2).collectAsList().get(0).getInt(0)); + + session.stop(); + } + + @Test + public void testCredentialParquet() throws ApiException, IOException { + createCommonResources(); + SparkSession session = createSparkSessionWithCatalogs(SPARK_CATALOG); + + String loc0 = "s3://test-bucket0" + generateTableLocation(SPARK_CATALOG, PARQUET_TABLE); + setupExternalParquetTable(PARQUET_TABLE, loc0, new ArrayList<>(0)); + String t1 = SPARK_CATALOG + "." + SCHEMA_NAME + "." + PARQUET_TABLE; + testTableReadWrite(t1, session); + + String loc1 = "s3://test-bucket1" + generateTableLocation(SPARK_CATALOG, ANOTHER_PARQUET_TABLE); + setupExternalParquetTable(ANOTHER_PARQUET_TABLE, loc1, new ArrayList<>(0)); + String t2 = SPARK_CATALOG + "." + SCHEMA_NAME + "." + ANOTHER_PARQUET_TABLE; + testTableReadWrite(t2, session); + + Row row = + session + .sql(String.format("SELECT l.i FROM %s l JOIN %s r ON l.i = r.i", t1, t2)) + .collectAsList() + .get(0); + assertEquals(1, row.getInt(0)); + + session.stop(); + } + + @Test + public void testCredentialDelta() throws ApiException, IOException { + createCommonResources(); + SparkSession session = createSparkSessionWithCatalogs(SPARK_CATALOG, CATALOG_NAME); + + String loc0 = "s3://test-bucket0" + generateTableLocation(SPARK_CATALOG, DELTA_TABLE); + setupExternalDeltaTable(SPARK_CATALOG, DELTA_TABLE, loc0, new ArrayList<>(0), session); + String t1 = SPARK_CATALOG + "." + SCHEMA_NAME + "." + DELTA_TABLE; + testTableReadWrite(t1, session); + + String loc1 = "s3://test-bucket1" + generateTableLocation(CATALOG_NAME, DELTA_TABLE); + setupExternalDeltaTable(CATALOG_NAME, DELTA_TABLE, loc1, new ArrayList<>(0), session); + String t2 = CATALOG_NAME + "." + SCHEMA_NAME + "." 
+ DELTA_TABLE; + testTableReadWrite(t2, session); + + Row row = + session + .sql(String.format("SELECT l.i FROM %s l JOIN %s r ON l.i = r.i", t1, t2)) + .collectAsList() + .get(0); + assertEquals(1, row.getInt(0)); + + session.stop(); + } + + private String generateTableLocation(String catalogName, String tableName) throws IOException { + return new File(new File(dataDir, catalogName), tableName).getCanonicalPath(); + } + + private void testTableReadWrite(String tableFullName, SparkSession session) { + assertTrue(session.sql("SELECT * FROM " + tableFullName).collectAsList().isEmpty()); + session.sql("INSERT INTO " + tableFullName + " SELECT 1, 'a'"); + Row row = session.sql("SELECT * FROM " + tableFullName).collectAsList().get(0); + assertEquals(1, row.getInt(0)); + assertEquals("a", row.getString(1)); + } + + private SchemaOperations schemaOperations; + private TableOperations tableOperations; + + private void createCommonResources() throws ApiException { + // Common setup operations such as creating a catalog and schema + catalogOperations.createCatalog( + new CreateCatalog().name(TestUtils.CATALOG_NAME).comment(TestUtils.COMMENT)); + schemaOperations.createSchema(new CreateSchema().name(SCHEMA_NAME).catalogName(CATALOG_NAME)); + catalogOperations.createCatalog( + new CreateCatalog().name(SPARK_CATALOG).comment("Spark catalog")); + schemaOperations.createSchema(new CreateSchema().name(SCHEMA_NAME).catalogName(SPARK_CATALOG)); + } + + private SparkSession createSparkSessionWithCatalogs(String... catalogs) { + SparkSession.Builder builder = + SparkSession.builder() + .appName("test") + .master("local[*]") + .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension"); + for (String catalog : catalogs) { + String catalogConf = "spark.sql.catalog." + catalog; + builder = + builder + .config(catalogConf, UCSingleCatalog.class.getName()) + .config(catalogConf + ".uri", serverConfig.getServerUrl()) + .config(catalogConf + ".token", serverConfig.getAuthToken()); } - - private SchemaOperations schemaOperations; - private TableOperations tableOperations; - private void createCommonResources() throws ApiException { - // Common setup operations such as creating a catalog and schema - catalogOperations.createCatalog(new CreateCatalog() - .name(TestUtils.CATALOG_NAME) - .comment(TestUtils.COMMENT)); - schemaOperations.createSchema( - new CreateSchema().name(SCHEMA_NAME).catalogName(CATALOG_NAME)); - catalogOperations.createCatalog(new CreateCatalog() - .name(SPARK_CATALOG) - .comment("Spark catalog")); - schemaOperations.createSchema( - new CreateSchema().name(SCHEMA_NAME).catalogName(SPARK_CATALOG)); - } - - private SparkSession createSparkSessionWithCatalogs(String... catalogs) { - SparkSession.Builder builder = SparkSession.builder().appName("test").master("local[*]") - .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension"); - for (String catalog : catalogs) { - String catalogConf = "spark.sql.catalog." + catalog; - builder = builder.config(catalogConf, UCSingleCatalog.class.getName()) - .config(catalogConf + ".uri", serverConfig.getServerUrl()) - .config(catalogConf + ".token", serverConfig.getAuthToken()); - } - // Use fake file system for s3:// so that we can test credentials. 
- builder.config("fs.s3.impl", CredentialTestFileSystem.class.getName()); - return builder.getOrCreate(); - } - - private void setupExternalParquetTable( - String tableName, - List partitionColumns) throws IOException, ApiException { - String location = generateTableLocation(SPARK_CATALOG, tableName); - setupExternalParquetTable(tableName, location, partitionColumns); + // Use fake file system for s3:// so that we can test credentials. + builder.config("fs.s3.impl", CredentialTestFileSystem.class.getName()); + return builder.getOrCreate(); + } + + private void setupExternalParquetTable(String tableName, List partitionColumns) + throws IOException, ApiException { + String location = generateTableLocation(SPARK_CATALOG, tableName); + setupExternalParquetTable(tableName, location, partitionColumns); + } + + private void setupExternalParquetTable( + String tableName, String location, List partitionColumns) + throws IOException, ApiException { + setupTables( + SPARK_CATALOG, tableName, DataSourceFormat.PARQUET, location, partitionColumns, false); + } + + private void setupExternalDeltaTable( + String catalogName, String tableName, List partitionColumns, SparkSession session) + throws IOException, ApiException { + String location = generateTableLocation(catalogName, tableName); + setupExternalDeltaTable(catalogName, tableName, location, partitionColumns, session); + } + + private void setupExternalDeltaTable( + String catalogName, + String tableName, + String location, + List partitionColumns, + SparkSession session) + throws IOException, ApiException { + // The Delta path can't be empty, need to initialize before read. + String partitionClause; + if (partitionColumns.isEmpty()) { + partitionClause = ""; + } else { + partitionClause = String.format(" PARTITIONED BY (%s)", String.join(", ", partitionColumns)); } - - private void setupExternalParquetTable( - String tableName, - String location, - List partitionColumns) throws IOException, ApiException { - setupTables( - SPARK_CATALOG, - tableName, - DataSourceFormat.PARQUET, - location, - partitionColumns, - false); + session.sql( + String.format("CREATE TABLE delta.`%s`(i INT, s STRING) USING delta", location) + + partitionClause); + + setupTables(catalogName, tableName, DataSourceFormat.DELTA, location, partitionColumns, false); + } + + private void setupTables( + String catalogName, + String tableName, + DataSourceFormat format, + String location, + List partitionColumns, + boolean isManaged) + throws IOException, ApiException { + Integer partitionIndex1 = partitionColumns.indexOf("i"); + if (partitionIndex1 == -1) partitionIndex1 = null; + Integer partitionIndex2 = partitionColumns.indexOf("s"); + if (partitionIndex2 == -1) partitionIndex2 = null; + + ColumnInfo c1 = + new ColumnInfo() + .name("i") + .typeText("INTEGER") + .typeJson("{\"type\": \"integer\"}") + .typeName(ColumnTypeName.INT) + .typePrecision(10) + .typeScale(0) + .position(0) + .partitionIndex(partitionIndex1) + .comment("Integer column") + .nullable(true); + + ColumnInfo c2 = + new ColumnInfo() + .name("s") + .typeText("STRING") + .typeJson("{\"type\": \"string\"}") + .typeName(ColumnTypeName.STRING) + .position(1) + .partitionIndex(partitionIndex2) + .comment("String column") + .nullable(true); + TableType tableType; + if (isManaged) { + tableType = TableType.MANAGED; + } else { + tableType = TableType.EXTERNAL; } - - private void setupExternalDeltaTable( - String catalogName, - String tableName, - List partitionColumns, - SparkSession session) throws IOException, 
ApiException { - String location = generateTableLocation(catalogName, tableName); - setupExternalDeltaTable(catalogName, tableName, location, partitionColumns, session); + CreateTable createTableRequest = + new CreateTable() + .name(tableName) + .catalogName(catalogName) + .schemaName(SCHEMA_NAME) + .columns(Arrays.asList(c1, c2)) + .comment(COMMENT) + .tableType(tableType) + .dataSourceFormat(format); + if (!isManaged) { + createTableRequest = createTableRequest.storageLocation(location); } - - private void setupExternalDeltaTable( - String catalogName, - String tableName, - String location, - List partitionColumns, - SparkSession session) throws IOException, ApiException { - // The Delta path can't be empty, need to initialize before read. - String partitionClause; - if (partitionColumns.isEmpty()) { - partitionClause = ""; - } else { - partitionClause = String.format( - " PARTITIONED BY (%s)", - String.join(", ", partitionColumns)); - } - session.sql(String.format("CREATE TABLE delta.`%s`(i INT, s STRING) USING delta", location) + partitionClause); - - setupTables( - catalogName, - tableName, - DataSourceFormat.DELTA, - location, - partitionColumns, - false); + tableOperations.createTable(createTableRequest); + } + + @BeforeEach + @Override + public void setUp() { + super.setUp(); + schemaOperations = new SdkSchemaOperations(createApiClient(serverConfig)); + tableOperations = new SdkTableOperations(createApiClient(serverConfig)); + cleanUp(); + } + + @Override + protected CatalogOperations createCatalogOperations(ServerConfig serverConfig) { + return new SdkCatalogOperations(createApiClient(serverConfig)); + } + + @Override + public void cleanUp() { + try { + catalogOperations.deleteCatalog(SPARK_CATALOG, Optional.of(true)); + } catch (Exception e) { + // Ignore } - - private void setupTables( - String catalogName, - String tableName, - DataSourceFormat format, - String location, - List partitionColumns, - boolean isManaged) throws IOException, ApiException { - Integer partitionIndex1 = partitionColumns.indexOf("i"); - if (partitionIndex1 == -1) partitionIndex1 = null; - Integer partitionIndex2 = partitionColumns.indexOf("s"); - if (partitionIndex2 == -1) partitionIndex2 = null; - - ColumnInfo c1 = new ColumnInfo().name("i").typeText("INTEGER") - .typeJson("{\"type\": \"integer\"}") - .typeName(ColumnTypeName.INT).typePrecision(10).typeScale(0).position(0) - .partitionIndex(partitionIndex1) - .comment("Integer column") - .nullable(true); - - ColumnInfo c2 = new ColumnInfo().name("s").typeText("STRING") - .typeJson("{\"type\": \"string\"}") - .typeName(ColumnTypeName.STRING).position(1) - .partitionIndex(partitionIndex2) - .comment("String column") - .nullable(true); - TableType tableType; - if (isManaged) { - tableType = TableType.MANAGED; - } else { - tableType = TableType.EXTERNAL; - } - CreateTable createTableRequest = new CreateTable() - .name(tableName) - .catalogName(catalogName) - .schemaName(SCHEMA_NAME) - .columns(Arrays.asList(c1, c2)) - .comment(COMMENT) - .tableType(tableType) - .dataSourceFormat(format); - if (!isManaged) { - createTableRequest = createTableRequest.storageLocation(location); - } - tableOperations.createTable(createTableRequest); - } - - @BeforeEach - @Override - public void setUp() { - super.setUp(); - schemaOperations = new SdkSchemaOperations(createApiClient(serverConfig)); - tableOperations = new SdkTableOperations(createApiClient(serverConfig)); - cleanUp(); - } - - @Override - protected CatalogOperations createCatalogOperations(ServerConfig 
serverConfig) { - return new SdkCatalogOperations(createApiClient(serverConfig)); - } - - @Override - public void cleanUp() { - try { - catalogOperations.deleteCatalog(SPARK_CATALOG, Optional.of(true)); - } catch (Exception e) { - // Ignore - } - super.cleanUp(); - try { - JavaUtils.deleteRecursively(dataDir); - } catch (IOException e) { - throw new RuntimeException(e); - } + super.cleanUp(); + try { + JavaUtils.deleteRecursively(dataDir); + } catch (IOException e) { + throw new RuntimeException(e); } + } }
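
To reproduce or verify this formatting pass locally, the sbt task named in the patch title can be run from the repository root. A minimal sketch, assuming the repository wires up sbt-java-formatter and exposes a standard sbt launcher (the `./build/sbt` wrapper path and the `javafmtCheckAll` verification task are assumptions, not confirmed by this patch):

    # rewrite all Java sources with the project's configured formatter settings
    ./build/sbt javafmtAll

    # verify formatting without rewriting files (assumed companion check task)
    ./build/sbt javafmtCheckAll

Running the check variant in CI helps avoid formatting-only follow-up patches like this one.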