From 209895f0409a8bf87f6a7fa0de30d1868d6d6727 Mon Sep 17 00:00:00 2001 From: Alex Reid Date: Sun, 14 Jul 2024 22:05:57 -0700 Subject: [PATCH] add support for tables located in S3 in Iceberg REST catalog (#123) **PR Checklist** - [x] A description of the changes is added to the description of this PR. - [x] If there is a related issue, make sure it is linked to this PR. - [x] If you've fixed a bug or added code that should be tested, add tests! - [ ] If you've added or modified a feature, documentation in `docs` is updated **Description of changes** This PR introduces the ability to load Iceberg tables that are located in S3. (Previously, only local paths were supported.) This adds new dependencies on the iceberg-aws module as well as AWS SDK v2. This also introduces new testing frameworks - Mockito and S3Mock - in order to better test and simulate reading Iceberg metadata out of S3 with S3FileIO (as well as starting to use JUnit5 instead of JUnit4). **Related Issues** This PR covers basic, initial S3 support, addressing issue https://github.com/unitycatalog/unitycatalog/issues/105 --- build.sbt | 19 +++++- .../server/UnityCatalogServer.java | 14 +++- .../service/IcebergRestCatalogService.java | 41 +++++++---- .../server/service/iceberg/FileIOFactory.java | 64 +++++++++++++++++ .../service/iceberg/MetadataService.java | 25 +++++++ .../service/iceberg/SimpleLocalFileIO.java | 23 +++++++ .../IcebergRestCatalogTest.java | 34 ++++++---- .../service/iceberg/MetadataServiceTest.java | 68 +++++++++++++++++++ .../{metadata.json => iceberg.metadata.json} | 0 .../resources/simple-v1-iceberg.metadata.json | 42 ++++++++++++ 10 files changed, 300 insertions(+), 30 deletions(-) create mode 100644 server/src/main/java/io/unitycatalog/server/service/iceberg/FileIOFactory.java create mode 100644 server/src/main/java/io/unitycatalog/server/service/iceberg/MetadataService.java create mode 100644 server/src/main/java/io/unitycatalog/server/service/iceberg/SimpleLocalFileIO.java rename 
server/src/test/java/io/unitycatalog/server/{iceberg => service}/IcebergRestCatalogTest.java (94%) create mode 100644 server/src/test/java/io/unitycatalog/server/service/iceberg/MetadataServiceTest.java rename server/src/test/resources/{metadata.json => iceberg.metadata.json} (100%) create mode 100644 server/src/test/resources/simple-v1-iceberg.metadata.json diff --git a/build.sbt b/build.sbt index b0c3e667b..21472d197 100644 --- a/build.sbt +++ b/build.sbt @@ -187,13 +187,30 @@ lazy val server = (project in file("server")) // Iceberg REST Catalog dependencies "org.apache.iceberg" % "iceberg-core" % "1.5.2", + "org.apache.iceberg" % "iceberg-aws" % "1.5.2", + "software.amazon.awssdk" % "s3" % "2.24.0", "io.vertx" % "vertx-core" % "4.3.5", "io.vertx" % "vertx-web" % "4.3.5", "io.vertx" % "vertx-web-client" % "4.3.5", // Test dependencies - "junit" % "junit" % "4.13.2" % Test, + "junit" % "junit" % "4.13.2" % Test, // TODO: update tests to junit5 and remove this + "org.junit.jupiter" % "junit-jupiter" % "5.10.1" % Test, + "org.mockito" % "mockito-core" % "4.11.0" % Test, + "org.mockito" % "mockito-inline" % "4.11.0" % Test, + "org.mockito" % "mockito-junit-jupiter" % "4.11.0" % Test, "com.github.sbt" % "junit-interface" % "0.13.3" % Test, + "com.adobe.testing" % "s3mock-junit5" % "2.11.0" % Test + exclude("ch.qos.logback", "logback-classic") + exclude("org.apache.logging.log4j", "log4j-to-slf4j") + // the following are runtime test dependencies we exclude here + // in order to not to set off the licences check, but then + // add back below as provided + exclude("jakarta.annotation", "jakarta.annotation-api") + exclude("jakarta.servlet", "jakarta.servlet-api") + exclude("jakarta.websocket", "jakarta.websocket-api"), + "jakarta.servlet" % "jakarta.servlet-api" % "4.0.4" % Provided, + "javax.xml.bind" % "jaxb-api" % "2.3.1" % Provided ), Compile / compile / javacOptions ++= Seq( diff --git a/server/src/main/java/io/unitycatalog/server/UnityCatalogServer.java 
b/server/src/main/java/io/unitycatalog/server/UnityCatalogServer.java index ab67cc221..f0630bfe1 100644 --- a/server/src/main/java/io/unitycatalog/server/UnityCatalogServer.java +++ b/server/src/main/java/io/unitycatalog/server/UnityCatalogServer.java @@ -9,7 +9,16 @@ import com.linecorp.armeria.server.annotation.JacksonRequestConverterFunction; import com.linecorp.armeria.server.annotation.JacksonResponseConverterFunction; import com.linecorp.armeria.server.docs.DocService; -import io.unitycatalog.server.service.*; +import io.unitycatalog.server.service.CatalogService; +import io.unitycatalog.server.service.FunctionService; +import io.unitycatalog.server.service.IcebergRestCatalogService; +import io.unitycatalog.server.service.SchemaService; +import io.unitycatalog.server.service.TableService; +import io.unitycatalog.server.service.TemporaryTableCredentialsService; +import io.unitycatalog.server.service.TemporaryVolumeCredentialsService; +import io.unitycatalog.server.service.VolumeService; +import io.unitycatalog.server.service.iceberg.FileIOFactory; +import io.unitycatalog.server.service.iceberg.MetadataService; import io.unitycatalog.server.utils.RESTObjectMapper; import io.unitycatalog.server.utils.VersionUtils; import io.vertx.core.Verticle; @@ -73,9 +82,10 @@ private void addServices(ServerBuilder sb) { new JacksonRequestConverterFunction(icebergMapper); JacksonResponseConverterFunction icebergResponseConverter = new JacksonResponseConverterFunction(icebergMapper); + MetadataService metadataService = new MetadataService(new FileIOFactory()); sb.annotatedService( basePath + "iceberg", - new IcebergRestCatalogService(catalogService, schemaService, tableService), + new IcebergRestCatalogService(catalogService, schemaService, tableService, metadataService), icebergRequestConverter, icebergResponseConverter); } diff --git a/server/src/main/java/io/unitycatalog/server/service/IcebergRestCatalogService.java 
b/server/src/main/java/io/unitycatalog/server/service/IcebergRestCatalogService.java index 27c271265..60eeddf36 100644 --- a/server/src/main/java/io/unitycatalog/server/service/IcebergRestCatalogService.java +++ b/server/src/main/java/io/unitycatalog/server/service/IcebergRestCatalogService.java @@ -4,46 +4,58 @@ import com.linecorp.armeria.common.AggregatedHttpResponse; import com.linecorp.armeria.common.HttpResponse; import com.linecorp.armeria.common.HttpStatus; -import com.linecorp.armeria.server.annotation.*; +import com.linecorp.armeria.server.annotation.ExceptionHandler; +import com.linecorp.armeria.server.annotation.Get; +import com.linecorp.armeria.server.annotation.Head; +import com.linecorp.armeria.server.annotation.Param; +import com.linecorp.armeria.server.annotation.Post; +import com.linecorp.armeria.server.annotation.ProducesJson; import io.unitycatalog.server.exception.IcebergRestExceptionHandler; -import io.unitycatalog.server.model.*; +import io.unitycatalog.server.model.CatalogInfo; +import io.unitycatalog.server.model.ListCatalogsResponse; +import io.unitycatalog.server.model.ListSchemasResponse; import io.unitycatalog.server.model.ListTablesResponse; +import io.unitycatalog.server.model.SchemaInfo; import io.unitycatalog.server.persist.TableRepository; import io.unitycatalog.server.persist.utils.HibernateUtils; +import io.unitycatalog.server.service.iceberg.MetadataService; import io.unitycatalog.server.utils.JsonUtils; -import java.io.IOException; -import java.net.URI; -import java.nio.file.Files; -import java.nio.file.Paths; -import java.util.List; -import java.util.Objects; -import java.util.Optional; -import java.util.stream.Collectors; import org.apache.iceberg.TableMetadata; -import org.apache.iceberg.TableMetadataParser; import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.exceptions.NoSuchTableException; import org.apache.iceberg.exceptions.NoSuchViewException; import 
org.apache.iceberg.relocated.com.google.common.base.Splitter; -import org.apache.iceberg.rest.responses.*; +import org.apache.iceberg.rest.responses.ConfigResponse; +import org.apache.iceberg.rest.responses.GetNamespaceResponse; +import org.apache.iceberg.rest.responses.ListNamespacesResponse; +import org.apache.iceberg.rest.responses.LoadTableResponse; +import org.apache.iceberg.rest.responses.LoadViewResponse; import org.hibernate.Session; import org.hibernate.SessionFactory; +import java.io.IOException; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.stream.Collectors; + @ExceptionHandler(IcebergRestExceptionHandler.class) public class IcebergRestCatalogService { private final CatalogService catalogService; private final SchemaService schemaService; private final TableService tableService; + private final MetadataService metadataService; private final TableRepository tableRepository = TableRepository.getInstance(); private static final SessionFactory sessionFactory = HibernateUtils.getSessionFactory(); public IcebergRestCatalogService( - CatalogService catalogService, SchemaService schemaService, TableService tableService) { + CatalogService catalogService, SchemaService schemaService, TableService tableService, MetadataService metadataService) { this.catalogService = catalogService; this.schemaService = schemaService; this.tableService = tableService; + this.metadataService = metadataService; } // Config APIs @@ -175,8 +187,7 @@ public LoadTableResponse loadTable( throw new NoSuchTableException("Table does not exist: %s", namespace + "." 
+ table); } - String metadataJson = new String(Files.readAllBytes(Paths.get(URI.create(metadataLocation)))); - TableMetadata tableMetadata = TableMetadataParser.fromJson(metadataLocation, metadataJson); + TableMetadata tableMetadata = metadataService.readTableMetadata(metadataLocation); return LoadTableResponse.builder().withTableMetadata(tableMetadata).build(); } diff --git a/server/src/main/java/io/unitycatalog/server/service/iceberg/FileIOFactory.java b/server/src/main/java/io/unitycatalog/server/service/iceberg/FileIOFactory.java new file mode 100644 index 000000000..38a239fe5 --- /dev/null +++ b/server/src/main/java/io/unitycatalog/server/service/iceberg/FileIOFactory.java @@ -0,0 +1,64 @@ +package io.unitycatalog.server.service.iceberg; + +import io.unitycatalog.server.exception.BaseException; +import io.unitycatalog.server.model.AwsCredentials; +import io.unitycatalog.server.persist.utils.ServerPropertiesUtils; +import io.unitycatalog.server.utils.TemporaryCredentialUtils; +import org.apache.iceberg.aws.s3.S3FileIO; +import org.apache.iceberg.io.FileIO; +import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; +import software.amazon.awssdk.auth.credentials.AwsSessionCredentials; +import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.s3.S3Client; + +import java.net.URI; +import java.util.Map; + +public class FileIOFactory { + + private static final String S3 = "s3"; + + public FileIOFactory() { + } + + // TODO: Cache fileIOs + public FileIO getFileIO(URI tableLocationUri) { + switch (tableLocationUri.getScheme()) { + case S3: return getS3FileIO(tableLocationUri); + // TODO: should we default/fallback to HadoopFileIO ? 
+ default: return new SimpleLocalFileIO(); + } + } + + protected S3FileIO getS3FileIO(URI tableLocationUri) { + String region = ServerPropertiesUtils.getInstance().getProperty("aws.region", System.getenv("AWS_REGION")); + + // FIXME!! - proper credential vending and region settings + S3FileIO s3FileIO = new S3FileIO(() -> getS3Client(getAwsCredentialsProvider(tableLocationUri), region)); + + s3FileIO.initialize(Map.of()); + + return s3FileIO; + } + + protected S3Client getS3Client(AwsCredentialsProvider awsCredentialsProvider, String region) { + return S3Client.builder() + .region(Region.of(region)) + .credentialsProvider(awsCredentialsProvider) + .forcePathStyle(false) + .build(); + } + + private AwsCredentialsProvider getAwsCredentialsProvider(URI tableLocationUri) { + try { + AwsCredentials credentials = TemporaryCredentialUtils.findS3BucketConfig(tableLocationUri.toString()); + return StaticCredentialsProvider.create( + AwsSessionCredentials.create( + credentials.getAccessKeyId(),credentials.getSecretAccessKey(),credentials.getSessionToken())); + } catch (BaseException e) { + return DefaultCredentialsProvider.create(); + } + } +} diff --git a/server/src/main/java/io/unitycatalog/server/service/iceberg/MetadataService.java b/server/src/main/java/io/unitycatalog/server/service/iceberg/MetadataService.java new file mode 100644 index 000000000..c65b87eda --- /dev/null +++ b/server/src/main/java/io/unitycatalog/server/service/iceberg/MetadataService.java @@ -0,0 +1,25 @@ +package io.unitycatalog.server.service.iceberg; + +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.TableMetadataParser; +import org.apache.iceberg.io.FileIO; + +import java.io.IOException; +import java.net.URI; + +public class MetadataService { + + private final FileIOFactory fileIOFactory; + + public MetadataService(FileIOFactory fileIOFactory) { + this.fileIOFactory = fileIOFactory; + } + + public TableMetadata readTableMetadata(String metadataLocation) { + URI 
metadataLocationUri = URI.create(metadataLocation); + // TODO: cache fileIO + FileIO fileIO = fileIOFactory.getFileIO(metadataLocationUri); + + return TableMetadataParser.read(fileIO, metadataLocation); + } +} diff --git a/server/src/main/java/io/unitycatalog/server/service/iceberg/SimpleLocalFileIO.java b/server/src/main/java/io/unitycatalog/server/service/iceberg/SimpleLocalFileIO.java new file mode 100644 index 000000000..815c9a75d --- /dev/null +++ b/server/src/main/java/io/unitycatalog/server/service/iceberg/SimpleLocalFileIO.java @@ -0,0 +1,23 @@ +package io.unitycatalog.server.service.iceberg; + +import org.apache.iceberg.Files; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.io.OutputFile; + +public class SimpleLocalFileIO implements FileIO { + @Override + public InputFile newInputFile(String path) { + return Files.localInput(path); + } + + @Override + public OutputFile newOutputFile(String path) { + throw new UnsupportedOperationException(); + } + + @Override + public void deleteFile(String path) { + throw new UnsupportedOperationException(); + } +} diff --git a/server/src/test/java/io/unitycatalog/server/iceberg/IcebergRestCatalogTest.java b/server/src/test/java/io/unitycatalog/server/service/IcebergRestCatalogTest.java similarity index 94% rename from server/src/test/java/io/unitycatalog/server/iceberg/IcebergRestCatalogTest.java rename to server/src/test/java/io/unitycatalog/server/service/IcebergRestCatalogTest.java index 9aae1e2cc..7ad2345b2 100644 --- a/server/src/test/java/io/unitycatalog/server/iceberg/IcebergRestCatalogTest.java +++ b/server/src/test/java/io/unitycatalog/server/service/IcebergRestCatalogTest.java @@ -1,12 +1,19 @@ -package io.unitycatalog.server.iceberg; - -import static org.assertj.core.api.Assertions.assertThat; +package io.unitycatalog.server.service; import com.linecorp.armeria.client.WebClient; import com.linecorp.armeria.common.AggregatedHttpResponse; import 
com.linecorp.armeria.common.auth.AuthToken; import io.unitycatalog.client.ApiException; -import io.unitycatalog.client.model.*; +import io.unitycatalog.client.model.CatalogInfo; +import io.unitycatalog.client.model.ColumnInfo; +import io.unitycatalog.client.model.ColumnTypeName; +import io.unitycatalog.client.model.CreateCatalog; +import io.unitycatalog.client.model.CreateSchema; +import io.unitycatalog.client.model.CreateTable; +import io.unitycatalog.client.model.DataSourceFormat; +import io.unitycatalog.client.model.SchemaInfo; +import io.unitycatalog.client.model.TableInfo; +import io.unitycatalog.client.model.TableType; import io.unitycatalog.server.base.BaseServerTest; import io.unitycatalog.server.base.catalog.CatalogOperations; import io.unitycatalog.server.base.schema.SchemaOperations; @@ -18,12 +25,6 @@ import io.unitycatalog.server.sdk.tables.SdkTableOperations; import io.unitycatalog.server.utils.RESTObjectMapper; import io.unitycatalog.server.utils.TestUtils; -import java.io.IOException; -import java.net.URISyntaxException; -import java.util.List; -import java.util.Objects; -import java.util.Optional; -import java.util.UUID; import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.exceptions.NoSuchTableException; @@ -38,6 +39,15 @@ import org.junit.Before; import org.junit.Test; +import java.io.IOException; +import java.net.URISyntaxException; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.UUID; + +import static org.assertj.core.api.Assertions.assertThat; + public class IcebergRestCatalogTest extends BaseServerTest { protected CatalogOperations catalogOperations; @@ -249,7 +259,7 @@ public void testTable() throws ApiException, IOException, URISyntaxException { assertThat(tableInfo.getTableId()).isNotNull(); session.load(tableInfoDAO, UUID.fromString(tableInfo.getTableId())); String metadataLocation = - 
Objects.requireNonNull(this.getClass().getResource("/metadata.json")).toURI().toString(); + Objects.requireNonNull(this.getClass().getResource("/iceberg.metadata.json")).toURI().toString(); tableInfoDAO.setUniformIcebergMetadataLocation(metadataLocation); session.merge(tableInfoDAO); tx.commit(); @@ -287,7 +297,7 @@ public void testTable() throws ApiException, IOException, URISyntaxException { LoadTableResponse loadTableResponse = RESTObjectMapper.mapper().readValue(resp.contentUtf8(), LoadTableResponse.class); assertThat(loadTableResponse.tableMetadata().metadataFileLocation()) - .isEqualTo(this.getClass().getResource("/metadata.json").toURI().toString()); + .isEqualTo(Objects.requireNonNull(this.getClass().getResource("/iceberg.metadata.json")).getPath()); } // List uniform tables diff --git a/server/src/test/java/io/unitycatalog/server/service/iceberg/MetadataServiceTest.java b/server/src/test/java/io/unitycatalog/server/service/iceberg/MetadataServiceTest.java new file mode 100644 index 000000000..35cf7d93c --- /dev/null +++ b/server/src/test/java/io/unitycatalog/server/service/iceberg/MetadataServiceTest.java @@ -0,0 +1,68 @@ +package io.unitycatalog.server.service.iceberg; + +import com.adobe.testing.s3mock.junit5.S3MockExtension; +import com.amazonaws.util.IOUtils; +import lombok.SneakyThrows; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.aws.s3.S3FileIO; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.extension.RegisterExtension; +import software.amazon.awssdk.core.sync.RequestBody; +import software.amazon.awssdk.services.s3.S3Client; + +import java.util.Objects; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +@ExtendWith(S3MockExtension.class) +public class MetadataServiceTest { 
+ @RegisterExtension + public static final S3MockExtension S3_MOCK = S3MockExtension.builder().silent().build(); + + public static final String TEST_BUCKET = "test-bucket"; + public static final String TEST_LOCATION = "test-bucket"; + public static final String TEST_SIMPLE_ICEBERG_V1_METADATA_FILE_NAME = "simple-v1-iceberg.metadata.json"; + + private final FileIOFactory mockFileIOFactory = mock(); + private final S3Client mockS3Client = S3_MOCK.createS3ClientV2(); + + private MetadataService metadataService; + + @SneakyThrows + @BeforeEach + public void setUp() { + metadataService = new MetadataService(mockFileIOFactory); + } + + @SneakyThrows + @Test + public void testGetTableMetadataFromS3() { + when(mockFileIOFactory.getFileIO(any())).thenReturn(new S3FileIO(() -> mockS3Client)); + mockS3Client.createBucket(builder -> builder.bucket(TEST_BUCKET).build()); + String simpleMetadataJson = IOUtils.toString( + Objects.requireNonNull(this.getClass().getResourceAsStream("/" + TEST_SIMPLE_ICEBERG_V1_METADATA_FILE_NAME))); + mockS3Client.putObject( + builder -> builder.bucket(TEST_BUCKET).key(TEST_LOCATION + "/" + TEST_SIMPLE_ICEBERG_V1_METADATA_FILE_NAME).build(), + RequestBody.fromString(simpleMetadataJson)); + + String metadataLocation = "s3://" + TEST_BUCKET + "/" + TEST_LOCATION + "/" + TEST_SIMPLE_ICEBERG_V1_METADATA_FILE_NAME; + TableMetadata tableMetadata = metadataService.readTableMetadata(metadataLocation); + assertThat(tableMetadata.uuid()).isEqualTo("11111111-2222-3333-4444-555555555555"); + } + + @SneakyThrows + @Test + public void testGetTableMetadataFromLocalFS() { + when(mockFileIOFactory.getFileIO(any())).thenReturn(new SimpleLocalFileIO()); + String metadataLocation = Objects.requireNonNull( + this.getClass().getResource("/iceberg.metadata.json")).toURI().toString(); + TableMetadata tableMetadata = metadataService.readTableMetadata(metadataLocation); + assertThat(tableMetadata.uuid()).isEqualTo("55d4dc69-5b14-4483-bfc8-f33b80f99f99"); + } + +} diff --git 
a/server/src/test/resources/metadata.json b/server/src/test/resources/iceberg.metadata.json similarity index 100% rename from server/src/test/resources/metadata.json rename to server/src/test/resources/iceberg.metadata.json diff --git a/server/src/test/resources/simple-v1-iceberg.metadata.json b/server/src/test/resources/simple-v1-iceberg.metadata.json new file mode 100644 index 000000000..9a4afc100 --- /dev/null +++ b/server/src/test/resources/simple-v1-iceberg.metadata.json @@ -0,0 +1,42 @@ +{ + "format-version": 1, + "table-uuid": "11111111-2222-3333-4444-555555555555", + "location": "s3://test-bucket/testLocation", + "last-updated-ms": 1720620264000, + "last-column-id": 3, + "schema": { + "type": "struct", + "fields": [ + { + "id": 1, + "name": "id", + "required": true, + "type": "string" + }, + { + "id": 2, + "name": "cat", + "required": true, + "type": "string", + "doc": "comment" + }, + { + "id": 3, + "name": "z", + "required": true, + "type": "long" + } + ] + }, + "partition-spec": [ + { + "name": "id", + "transform": "identity", + "source-id": 1, + "field-id": 1000 + } + ], + "properties": {}, + "current-snapshot-id": -1, + "snapshots": [] +} \ No newline at end of file