Skip to content

Commit

Permalink
add support for tables located in S3 in Iceberg REST catalog (unityca…
Browse files Browse the repository at this point in the history
…talog#123)

**PR Checklist**

- [x] A description of the changes is added to the description of this
PR.
- [x] If there is a related issue, make sure it is linked to this PR.
- [x] If you've fixed a bug or added code that should be tested, add
tests!
- [ ] If you've added or modified a feature, documentation in `docs` is
updated

**Description of changes**

This PR introduces the ability to load Iceberg tables that are located
in S3 (previously only local paths were supported). This adds new
dependencies on the iceberg-aws module as well as AWS SDK v2. It also
introduces new testing frameworks - Mockito and S3Mock - in order to
better test and simulate reading Iceberg metadata out of S3 with
S3FileIO (as well as starting to use JUnit 5 instead of JUnit 4).

**Related Issues**

This PR covers the basic and initial S3 support addressing issue
unitycatalog#105
  • Loading branch information
ajreid21 authored Jul 15, 2024
1 parent f11ae8a commit 209895f
Show file tree
Hide file tree
Showing 10 changed files with 300 additions and 30 deletions.
19 changes: 18 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -187,13 +187,30 @@ lazy val server = (project in file("server"))

// Iceberg REST Catalog dependencies
"org.apache.iceberg" % "iceberg-core" % "1.5.2",
"org.apache.iceberg" % "iceberg-aws" % "1.5.2",
"software.amazon.awssdk" % "s3" % "2.24.0",
"io.vertx" % "vertx-core" % "4.3.5",
"io.vertx" % "vertx-web" % "4.3.5",
"io.vertx" % "vertx-web-client" % "4.3.5",

// Test dependencies
"junit" % "junit" % "4.13.2" % Test,
"junit" % "junit" % "4.13.2" % Test, // TODO: update tests to junit5 and remove this
"org.junit.jupiter" % "junit-jupiter" % "5.10.1" % Test,
"org.mockito" % "mockito-core" % "4.11.0" % Test,
"org.mockito" % "mockito-inline" % "4.11.0" % Test,
"org.mockito" % "mockito-junit-jupiter" % "4.11.0" % Test,
"com.github.sbt" % "junit-interface" % "0.13.3" % Test,
"com.adobe.testing" % "s3mock-junit5" % "2.11.0" % Test
exclude("ch.qos.logback", "logback-classic")
exclude("org.apache.logging.log4j", "log4j-to-slf4j")
// the following are runtime test dependencies that we exclude here
// so that they do not set off the license check, and then
// add back below as provided
exclude("jakarta.annotation", "jakarta.annotation-api")
exclude("jakarta.servlet", "jakarta.servlet-api")
exclude("jakarta.websocket", "jakarta.websocket-api"),
"jakarta.servlet" % "jakarta.servlet-api" % "4.0.4" % Provided,
"javax.xml.bind" % "jaxb-api" % "2.3.1" % Provided
),

Compile / compile / javacOptions ++= Seq(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,16 @@
import com.linecorp.armeria.server.annotation.JacksonRequestConverterFunction;
import com.linecorp.armeria.server.annotation.JacksonResponseConverterFunction;
import com.linecorp.armeria.server.docs.DocService;
import io.unitycatalog.server.service.*;
import io.unitycatalog.server.service.CatalogService;
import io.unitycatalog.server.service.FunctionService;
import io.unitycatalog.server.service.IcebergRestCatalogService;
import io.unitycatalog.server.service.SchemaService;
import io.unitycatalog.server.service.TableService;
import io.unitycatalog.server.service.TemporaryTableCredentialsService;
import io.unitycatalog.server.service.TemporaryVolumeCredentialsService;
import io.unitycatalog.server.service.VolumeService;
import io.unitycatalog.server.service.iceberg.FileIOFactory;
import io.unitycatalog.server.service.iceberg.MetadataService;
import io.unitycatalog.server.utils.RESTObjectMapper;
import io.unitycatalog.server.utils.VersionUtils;
import io.vertx.core.Verticle;
Expand Down Expand Up @@ -73,9 +82,10 @@ private void addServices(ServerBuilder sb) {
new JacksonRequestConverterFunction(icebergMapper);
JacksonResponseConverterFunction icebergResponseConverter =
new JacksonResponseConverterFunction(icebergMapper);
MetadataService metadataService = new MetadataService(new FileIOFactory());
sb.annotatedService(
basePath + "iceberg",
new IcebergRestCatalogService(catalogService, schemaService, tableService),
new IcebergRestCatalogService(catalogService, schemaService, tableService, metadataService),
icebergRequestConverter,
icebergResponseConverter);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,46 +4,58 @@
import com.linecorp.armeria.common.AggregatedHttpResponse;
import com.linecorp.armeria.common.HttpResponse;
import com.linecorp.armeria.common.HttpStatus;
import com.linecorp.armeria.server.annotation.*;
import com.linecorp.armeria.server.annotation.ExceptionHandler;
import com.linecorp.armeria.server.annotation.Get;
import com.linecorp.armeria.server.annotation.Head;
import com.linecorp.armeria.server.annotation.Param;
import com.linecorp.armeria.server.annotation.Post;
import com.linecorp.armeria.server.annotation.ProducesJson;
import io.unitycatalog.server.exception.IcebergRestExceptionHandler;
import io.unitycatalog.server.model.*;
import io.unitycatalog.server.model.CatalogInfo;
import io.unitycatalog.server.model.ListCatalogsResponse;
import io.unitycatalog.server.model.ListSchemasResponse;
import io.unitycatalog.server.model.ListTablesResponse;
import io.unitycatalog.server.model.SchemaInfo;
import io.unitycatalog.server.persist.TableRepository;
import io.unitycatalog.server.persist.utils.HibernateUtils;
import io.unitycatalog.server.service.iceberg.MetadataService;
import io.unitycatalog.server.utils.JsonUtils;
import java.io.IOException;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableMetadataParser;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.exceptions.NoSuchViewException;
import org.apache.iceberg.relocated.com.google.common.base.Splitter;
import org.apache.iceberg.rest.responses.*;
import org.apache.iceberg.rest.responses.ConfigResponse;
import org.apache.iceberg.rest.responses.GetNamespaceResponse;
import org.apache.iceberg.rest.responses.ListNamespacesResponse;
import org.apache.iceberg.rest.responses.LoadTableResponse;
import org.apache.iceberg.rest.responses.LoadViewResponse;
import org.hibernate.Session;
import org.hibernate.SessionFactory;

import java.io.IOException;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;

@ExceptionHandler(IcebergRestExceptionHandler.class)
public class IcebergRestCatalogService {

private final CatalogService catalogService;
private final SchemaService schemaService;
private final TableService tableService;
private final MetadataService metadataService;
private final TableRepository tableRepository = TableRepository.getInstance();
private static final SessionFactory sessionFactory = HibernateUtils.getSessionFactory();

public IcebergRestCatalogService(
CatalogService catalogService, SchemaService schemaService, TableService tableService) {
CatalogService catalogService, SchemaService schemaService, TableService tableService, MetadataService metadataService) {
this.catalogService = catalogService;
this.schemaService = schemaService;
this.tableService = tableService;
this.metadataService = metadataService;
}

// Config APIs
Expand Down Expand Up @@ -175,8 +187,7 @@ public LoadTableResponse loadTable(
throw new NoSuchTableException("Table does not exist: %s", namespace + "." + table);
}

String metadataJson = new String(Files.readAllBytes(Paths.get(URI.create(metadataLocation))));
TableMetadata tableMetadata = TableMetadataParser.fromJson(metadataLocation, metadataJson);
TableMetadata tableMetadata = metadataService.readTableMetadata(metadataLocation);

return LoadTableResponse.builder().withTableMetadata(tableMetadata).build();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package io.unitycatalog.server.service.iceberg;

import io.unitycatalog.server.exception.BaseException;
import io.unitycatalog.server.model.AwsCredentials;
import io.unitycatalog.server.persist.utils.ServerPropertiesUtils;
import io.unitycatalog.server.utils.TemporaryCredentialUtils;
import org.apache.iceberg.aws.s3.S3FileIO;
import org.apache.iceberg.io.FileIO;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;

import java.net.URI;
import java.util.Map;

public class FileIOFactory {

private static final String S3 = "s3";

public FileIOFactory() {
}

// TODO: Cache fileIOs
public FileIO getFileIO(URI tableLocationUri) {
switch (tableLocationUri.getScheme()) {
case S3: return getS3FileIO(tableLocationUri);
// TODO: should we default/fallback to HadoopFileIO ?
default: return new SimpleLocalFileIO();
}
}

protected S3FileIO getS3FileIO(URI tableLocationUri) {
String region = ServerPropertiesUtils.getInstance().getProperty("aws.region", System.getenv("AWS_REGION"));

// FIXME!! - proper credential vending and region settings
S3FileIO s3FileIO = new S3FileIO(() -> getS3Client(getAwsCredentialsProvider(tableLocationUri), region));

s3FileIO.initialize(Map.of());

return s3FileIO;
}

protected S3Client getS3Client(AwsCredentialsProvider awsCredentialsProvider, String region) {
return S3Client.builder()
.region(Region.of(region))
.credentialsProvider(awsCredentialsProvider)
.forcePathStyle(false)
.build();
}

private AwsCredentialsProvider getAwsCredentialsProvider(URI tableLocationUri) {
try {
AwsCredentials credentials = TemporaryCredentialUtils.findS3BucketConfig(tableLocationUri.toString());
return StaticCredentialsProvider.create(
AwsSessionCredentials.create(
credentials.getAccessKeyId(),credentials.getSecretAccessKey(),credentials.getSessionToken()));
} catch (BaseException e) {
return DefaultCredentialsProvider.create();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package io.unitycatalog.server.service.iceberg;

import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableMetadataParser;
import org.apache.iceberg.io.FileIO;

import java.io.IOException;
import java.net.URI;

/**
 * Reads Iceberg table metadata files through a {@link FileIO} supplied by a
 * {@link FileIOFactory}.
 */
public class MetadataService {

  private final FileIOFactory fileIOFactory;

  /**
   * @param fileIOFactory factory used to obtain a {@link FileIO} for each metadata location
   */
  public MetadataService(FileIOFactory fileIOFactory) {
    this.fileIOFactory = fileIOFactory;
  }

  /**
   * Reads and parses the Iceberg metadata file at the given location.
   *
   * @param metadataLocation location of the metadata JSON file, as a URI string
   * @return the parsed {@link TableMetadata}
   * @throws IllegalArgumentException if {@code metadataLocation} is not a valid URI
   *     (raised by {@link URI#create(String)})
   */
  public TableMetadata readTableMetadata(String metadataLocation) {
    URI metadataLocationUri = URI.create(metadataLocation);

    // A fresh FileIO is obtained on every call, so it must be closed here once
    // the metadata has been read; otherwise whatever it holds is never released.
    // TODO: cache fileIO (ownership/closing must then move to the cache)
    try (FileIO fileIO = fileIOFactory.getFileIO(metadataLocationUri)) {
      return TableMetadataParser.read(fileIO, metadataLocation);
    }
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package io.unitycatalog.server.service.iceberg;

import org.apache.iceberg.Files;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;

/**
 * Read-only {@link FileIO} backed by the local file system.
 *
 * <p>Only {@link #newInputFile(String)} is supported; creating output files and
 * deleting files throw {@link UnsupportedOperationException}.
 */
public class SimpleLocalFileIO implements FileIO {

  /**
   * Opens a local file for reading via {@link Files#localInput(String)}.
   *
   * @param path location of the local file
   * @return an {@link InputFile} for that location
   */
  @Override
  public InputFile newInputFile(String path) {
    return Files.localInput(path);
  }

  /**
   * Not supported.
   *
   * @throws UnsupportedOperationException always
   */
  @Override
  public OutputFile newOutputFile(String path) {
    throw new UnsupportedOperationException(
        "SimpleLocalFileIO is read-only; cannot create output file: " + path);
  }

  /**
   * Not supported.
   *
   * @throws UnsupportedOperationException always
   */
  @Override
  public void deleteFile(String path) {
    throw new UnsupportedOperationException(
        "SimpleLocalFileIO is read-only; cannot delete file: " + path);
  }
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,19 @@
package io.unitycatalog.server.iceberg;

import static org.assertj.core.api.Assertions.assertThat;
package io.unitycatalog.server.service;

import com.linecorp.armeria.client.WebClient;
import com.linecorp.armeria.common.AggregatedHttpResponse;
import com.linecorp.armeria.common.auth.AuthToken;
import io.unitycatalog.client.ApiException;
import io.unitycatalog.client.model.*;
import io.unitycatalog.client.model.CatalogInfo;
import io.unitycatalog.client.model.ColumnInfo;
import io.unitycatalog.client.model.ColumnTypeName;
import io.unitycatalog.client.model.CreateCatalog;
import io.unitycatalog.client.model.CreateSchema;
import io.unitycatalog.client.model.CreateTable;
import io.unitycatalog.client.model.DataSourceFormat;
import io.unitycatalog.client.model.SchemaInfo;
import io.unitycatalog.client.model.TableInfo;
import io.unitycatalog.client.model.TableType;
import io.unitycatalog.server.base.BaseServerTest;
import io.unitycatalog.server.base.catalog.CatalogOperations;
import io.unitycatalog.server.base.schema.SchemaOperations;
Expand All @@ -18,12 +25,6 @@
import io.unitycatalog.server.sdk.tables.SdkTableOperations;
import io.unitycatalog.server.utils.RESTObjectMapper;
import io.unitycatalog.server.utils.TestUtils;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.NoSuchTableException;
Expand All @@ -38,6 +39,15 @@
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.net.URISyntaxException;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;

import static org.assertj.core.api.Assertions.assertThat;

public class IcebergRestCatalogTest extends BaseServerTest {

protected CatalogOperations catalogOperations;
Expand Down Expand Up @@ -249,7 +259,7 @@ public void testTable() throws ApiException, IOException, URISyntaxException {
assertThat(tableInfo.getTableId()).isNotNull();
session.load(tableInfoDAO, UUID.fromString(tableInfo.getTableId()));
String metadataLocation =
Objects.requireNonNull(this.getClass().getResource("/metadata.json")).toURI().toString();
Objects.requireNonNull(this.getClass().getResource("/iceberg.metadata.json")).toURI().toString();
tableInfoDAO.setUniformIcebergMetadataLocation(metadataLocation);
session.merge(tableInfoDAO);
tx.commit();
Expand Down Expand Up @@ -287,7 +297,7 @@ public void testTable() throws ApiException, IOException, URISyntaxException {
LoadTableResponse loadTableResponse =
RESTObjectMapper.mapper().readValue(resp.contentUtf8(), LoadTableResponse.class);
assertThat(loadTableResponse.tableMetadata().metadataFileLocation())
.isEqualTo(this.getClass().getResource("/metadata.json").toURI().toString());
.isEqualTo(Objects.requireNonNull(this.getClass().getResource("/iceberg.metadata.json")).getPath());
}

// List uniform tables
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package io.unitycatalog.server.service.iceberg;

import com.adobe.testing.s3mock.junit5.S3MockExtension;
import com.amazonaws.util.IOUtils;
import lombok.SneakyThrows;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.aws.s3.S3FileIO;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.RegisterExtension;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3Client;

import java.util.Objects;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

/**
 * Tests {@link MetadataService#readTableMetadata(String)} against an S3Mock-backed
 * {@link S3FileIO} and against {@link SimpleLocalFileIO}.
 *
 * <p>The S3Mock extension is registered once, programmatically, through the
 * {@code @RegisterExtension} field below. NOTE(review): S3Mock documents the
 * declarative ({@code @ExtendWith}) and programmatic styles as alternatives, so the
 * additional class-level {@code @ExtendWith} was dropped to avoid registering a
 * second, default-configured extension instance - confirm against the S3Mock README.
 */
public class MetadataServiceTest {
  @RegisterExtension
  public static final S3MockExtension S3_MOCK = S3MockExtension.builder().silent().build();

  public static final String TEST_BUCKET = "test-bucket";
  // Key prefix inside the bucket; currently the same text as the bucket name.
  public static final String TEST_LOCATION = "test-bucket";
  public static final String TEST_SIMPLE_ICEBERG_V1_METADATA_FILE_NAME = "simple-v1-iceberg.metadata.json";

  private final FileIOFactory mockFileIOFactory = mock();
  private final S3Client mockS3Client = S3_MOCK.createS3ClientV2();

  private MetadataService metadataService;

  @SneakyThrows
  @BeforeEach
  public void setUp() {
    metadataService = new MetadataService(mockFileIOFactory);
  }

  /** Uploads a metadata file to the mock S3 bucket and reads it back through the service. */
  @SneakyThrows
  @Test
  public void testGetTableMetadataFromS3() {
    when(mockFileIOFactory.getFileIO(any())).thenReturn(new S3FileIO(() -> mockS3Client));
    mockS3Client.createBucket(builder -> builder.bucket(TEST_BUCKET).build());

    // Close the classpath stream once its content has been read.
    String simpleMetadataJson;
    try (java.io.InputStream metadataStream = Objects.requireNonNull(
        this.getClass().getResourceAsStream("/" + TEST_SIMPLE_ICEBERG_V1_METADATA_FILE_NAME))) {
      simpleMetadataJson = IOUtils.toString(metadataStream);
    }

    mockS3Client.putObject(
        builder -> builder.bucket(TEST_BUCKET).key(TEST_LOCATION + "/" + TEST_SIMPLE_ICEBERG_V1_METADATA_FILE_NAME).build(),
        RequestBody.fromString(simpleMetadataJson));

    String metadataLocation = "s3://" + TEST_BUCKET + "/" + TEST_LOCATION + "/" + TEST_SIMPLE_ICEBERG_V1_METADATA_FILE_NAME;
    TableMetadata tableMetadata = metadataService.readTableMetadata(metadataLocation);
    assertThat(tableMetadata.uuid()).isEqualTo("11111111-2222-3333-4444-555555555555");
  }

  /** Reads a metadata file from the test classpath through the local FileIO. */
  @SneakyThrows
  @Test
  public void testGetTableMetadataFromLocalFS() {
    when(mockFileIOFactory.getFileIO(any())).thenReturn(new SimpleLocalFileIO());
    String metadataLocation = Objects.requireNonNull(
        this.getClass().getResource("/iceberg.metadata.json")).toURI().toString();
    TableMetadata tableMetadata = metadataService.readTableMetadata(metadataLocation);
    assertThat(tableMetadata.uuid()).isEqualTo("55d4dc69-5b14-4483-bfc8-f33b80f99f99");
  }

}
File renamed without changes.
Loading

0 comments on commit 209895f

Please sign in to comment.