diff --git a/clients/hadoopfs/pom.xml b/clients/hadoopfs/pom.xml index ff31651e65a..066c38356dc 100644 --- a/clients/hadoopfs/pom.xml +++ b/clients/hadoopfs/pom.xml @@ -287,7 +287,7 @@ To export to S3: io.lakefs sdk - 1.18.0 + 1.43.0 org.apache.commons diff --git a/clients/hadoopfs/src/main/java/io/lakefs/LakeFSLinker.java b/clients/hadoopfs/src/main/java/io/lakefs/LakeFSLinker.java index 9f7f9a88b31..76674d1845e 100644 --- a/clients/hadoopfs/src/main/java/io/lakefs/LakeFSLinker.java +++ b/clients/hadoopfs/src/main/java/io/lakefs/LakeFSLinker.java @@ -1,6 +1,7 @@ package io.lakefs; import java.io.IOException; +import java.util.Date; import org.apache.hadoop.fs.Path; import io.lakefs.clients.sdk.ApiException; import io.lakefs.clients.sdk.StagingApi; @@ -24,10 +25,16 @@ public LakeFSLinker(LakeFSFileSystem lfs, LakeFSClient lfsClient, this.overwrite = overwrite; } - public void link(String eTag, long byteSize) throws IOException { + public void link(String eTag, long byteSize, Date time) throws IOException { StagingApi staging = lakeFSClient.getStagingApi(); - StagingMetadata stagingMetadata = - new StagingMetadata().checksum(eTag).sizeBytes(byteSize).staging(stagingLocation); + StagingMetadata stagingMetadata = new StagingMetadata() + .checksum(eTag) + .sizeBytes(byteSize) + .staging(stagingLocation); + if (time != null) { + long secs = (time.getTime() + 500) / 1000; + stagingMetadata.setMtime(secs); + } try { StagingApi.APIlinkPhysicalAddressRequest request = staging.linkPhysicalAddress(objectLoc.getRepository(), objectLoc.getRef(), objectLoc.getPath(), stagingMetadata); diff --git a/clients/hadoopfs/src/main/java/io/lakefs/storage/LakeFSFileSystemOutputStream.java b/clients/hadoopfs/src/main/java/io/lakefs/storage/LakeFSFileSystemOutputStream.java index c33bcec278c..d823477c93e 100644 --- a/clients/hadoopfs/src/main/java/io/lakefs/storage/LakeFSFileSystemOutputStream.java +++ b/clients/hadoopfs/src/main/java/io/lakefs/storage/LakeFSFileSystemOutputStream.java @@ -50,7 +50,7 @@ public void close() throws IOException { if (eTag == null) { throw new IOException("Failed to finish writing to presigned link. No ETag found."); } - linker.link(eTag, buffer.size()); + linker.link(eTag, buffer.size(), null); if (connection.getResponseCode() > 299) { throw new IOException("Failed to finish writing to presigned link. Response code: " + connection.getResponseCode()); diff --git a/clients/hadoopfs/src/main/java/io/lakefs/storage/LinkOnCloseOutputStream.java b/clients/hadoopfs/src/main/java/io/lakefs/storage/LinkOnCloseOutputStream.java index ceb8eb3d42f..4a2a869cf41 100644 --- a/clients/hadoopfs/src/main/java/io/lakefs/storage/LinkOnCloseOutputStream.java +++ b/clients/hadoopfs/src/main/java/io/lakefs/storage/LinkOnCloseOutputStream.java @@ -58,7 +58,7 @@ public void close() throws IOException { // the underlying Hadoop FileSystem) so we can link it on lakeFS. if (!this.isLinked.getAndSet(true)) { ObjectMetadata objectMetadata = metadataClient.getObjectMetadata(physicalUri); - linker.link(objectMetadata.getETag(), objectMetadata.getContentLength()); + linker.link(objectMetadata.getETag(), objectMetadata.getContentLength(), objectMetadata.getLastModified()); } } } diff --git a/clients/hadoopfs/src/test/java/io/lakefs/FSTestBase.java b/clients/hadoopfs/src/test/java/io/lakefs/FSTestBase.java index 6b41dd1da4b..de4a9fd3c66 100644 --- a/clients/hadoopfs/src/test/java/io/lakefs/FSTestBase.java +++ b/clients/hadoopfs/src/test/java/io/lakefs/FSTestBase.java @@ -121,20 +121,12 @@ protected String getS3Key(StagingLocation stagingLocation) { return removeStart(stagingLocation.getPhysicalAddress(), s3Base); } - /** - * Override to add to Hadoop configuration. - */ - protected void addHadoopConfiguration(Configuration conf) { - } - @Before public void hadoopSetup() throws IOException, URISyntaxException { s3Base = "s3a://UNUSED/"; // Overridden if S3 will be used! conf = new Configuration(false); - addHadoopConfiguration(conf); - conf.set("fs.lakefs.impl", "io.lakefs.LakeFSFileSystem"); conf.set("fs.lakefs.access.key", "unused-but-checked"); diff --git a/clients/hadoopfs/src/test/java/io/lakefs/LakeFSFileSystemServerS3Test.java b/clients/hadoopfs/src/test/java/io/lakefs/LakeFSFileSystemServerS3Test.java index ef70cc74ac4..55efc2bb2a4 100644 --- a/clients/hadoopfs/src/test/java/io/lakefs/LakeFSFileSystemServerS3Test.java +++ b/clients/hadoopfs/src/test/java/io/lakefs/LakeFSFileSystemServerS3Test.java @@ -9,14 +9,13 @@ import io.lakefs.utils.ObjectLocation; import org.apache.commons.io.IOUtils; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileAlreadyExistsException; import org.apache.hadoop.fs.Path; -import com.amazonaws.HttpMethod; import com.amazonaws.services.s3.model.*; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -29,21 +28,13 @@ import static org.mockserver.model.JsonBody.json; import java.io.*; -import java.net.URL; import java.util.Date; import java.util.Arrays; -import java.util.concurrent.TimeUnit; @RunWith(Parameterized.class) public class LakeFSFileSystemServerS3Test extends S3FSTestBase { static private final Logger LOG = LoggerFactory.getLogger(LakeFSFileSystemServerS3Test.class); - public static interface PhysicalAddressCreator { - default void initConfiguration(Configuration conf) {} - String createGetPhysicalAddress(S3FSTestBase o, String key); - StagingLocation createPutStagingLocation(S3FSTestBase o, String namespace, String repo, String branch, String path); - } - @Parameters(name="{1}") public static Iterable data() { return Arrays.asList(new Object[][]{ @@ -51,73 +42,15 @@ public static Iterable data() { {new PresignedPhysicalAddressCreator(), "presigned"}}); } - @Parameter(1) - public String unusedAddressCreatorType; - @Parameter(0) public PhysicalAddressCreator pac; - static private class SimplePhysicalAddressCreator implements PhysicalAddressCreator { - public String createGetPhysicalAddress(S3FSTestBase o, String key) { - return o.s3Url(key); - } - - public StagingLocation createPutStagingLocation(S3FSTestBase o, String namespace, String repo, String branch, String path) { - String fullPath = String.format("%s/%s/%s/%s/%s-object", - o.sessionId(), namespace, repo, branch, path); - return new StagingLocation().physicalAddress(o.s3Url(fullPath)); - } - } - - static private class PresignedPhysicalAddressCreator implements PhysicalAddressCreator { - public void initConfiguration(Configuration conf) { - conf.set("fs.lakefs.access.mode", "presigned"); - } - - protected Date getExpiration() { - return new Date(System.currentTimeMillis() + TimeUnit.HOURS.toMillis(1)); - } - - public String createGetPhysicalAddress(S3FSTestBase o, String key) { - Date expiration = getExpiration(); - URL presignedUrl = - o.s3Client.generatePresignedUrl(new GeneratePresignedUrlRequest(o.s3Bucket, key) - .withMethod(HttpMethod.GET) - .withExpiration(expiration)); - return presignedUrl.toString(); - } - - public StagingLocation createPutStagingLocation(S3FSTestBase o, String namespace, String repo, String branch, String path) { - String fullPath = String.format("%s/%s/%s/%s/%s-object", - o.sessionId(), namespace, repo, branch, path); - Date expiration = getExpiration(); - URL presignedUrl = - o.s3Client.generatePresignedUrl(new GeneratePresignedUrlRequest(o.s3Bucket, fullPath) - .withMethod(HttpMethod.PUT) - .withExpiration(expiration)); - return new StagingLocation() - .physicalAddress(o.s3Url(fullPath)) - .presignedUrl(presignedUrl.toString()); - } - } + @Parameter(1) + public String unusedAddressCreatorType; @Override - protected void moreHadoopSetup() { - super.moreHadoopSetup(); - pac.initConfiguration(conf); - } - - // Return a location under namespace for this getPhysicalAddress call. - protected StagingLocation mockGetPhysicalAddress(String repo, String branch, String path, String namespace) { - StagingLocation stagingLocation = - pac.createPutStagingLocation(this, namespace, repo, branch, path); - mockServerClient.when(request() - .withMethod("GET") - .withPath(String.format("/repositories/%s/branches/%s/staging/backing", repo, branch)) - .withQueryStringParameter("path", path)) - .respond(response().withStatusCode(200) - .withBody(gson.toJson(stagingLocation))); - return stagingLocation; + protected PhysicalAddressCreator getPac() { + return pac; } @Test @@ -180,7 +113,7 @@ public void testMkdirs() throws IOException { mockGetPhysicalAddress("repo", "main", "dir1/dir2/dir3/", "repo-base/emptyDir"); ObjectStats newStats = makeObjectStats("dir1/dir2/dir3/") - .physicalAddress(pac.createGetPhysicalAddress(this, "repo-base/dir12")); + .physicalAddress(getPac().createGetPhysicalAddress(this, "repo-base/dir12")); mockStatObject("repo", "main", "dir1/dir2/dir3/", newStats); mockServerClient.when(request() @@ -209,7 +142,7 @@ public void testCreateExistingDirectory() throws IOException { mockStatObjectNotFound("repo", "main", "sub1/sub2/create.me"); ObjectStats stats = makeObjectStats("sub1/sub2/create.me/") - .physicalAddress(pac.createGetPhysicalAddress(this, "repo-base/sub1/sub2/create.me")); + .physicalAddress(getPac().createGetPhysicalAddress(this, "repo-base/sub1/sub2/create.me")); mockStatObject("repo", "main", "sub1/sub2/create.me/", stats); Exception e = @@ -234,7 +167,7 @@ public void testOpen() throws IOException, ApiException { String contents = "The quick brown fox jumps over the lazy dog."; byte[] contentsBytes = contents.getBytes(); String physicalPath = sessionId() + "/repo-base/open"; - String physicalKey = pac.createGetPhysicalAddress(this, physicalPath); + String physicalKey = getPac().createGetPhysicalAddress(this, physicalPath); int readBufferSize = 5; Path path = new Path("lakefs://repo/main/read.me"); @@ -285,7 +218,7 @@ public void testOpenWithInvalidUriChars() throws IOException, ApiException { String path = String.format("lakefs://repo/main/%s-x", suffix); ObjectStats stats = makeObjectStats(suffix + "-x") - .physicalAddress(pac.createGetPhysicalAddress(this, key)) + .physicalAddress(getPac().createGetPhysicalAddress(this, key)) .sizeBytes((long) contentsBytes.length); mockStatObject("repo", "main", suffix + "-x", stats); diff --git a/clients/hadoopfs/src/test/java/io/lakefs/S3FSTestBase.java b/clients/hadoopfs/src/test/java/io/lakefs/S3FSTestBase.java index d926c98410d..79dae41a81d 100644 --- a/clients/hadoopfs/src/test/java/io/lakefs/S3FSTestBase.java +++ b/clients/hadoopfs/src/test/java/io/lakefs/S3FSTestBase.java @@ -3,6 +3,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.amazonaws.ClientConfiguration; +import com.amazonaws.HttpMethod; import com.amazonaws.auth.AWSCredentials; import com.amazonaws.auth.BasicAWSCredentials; import com.amazonaws.services.s3.AmazonS3; @@ -14,9 +15,12 @@ import com.google.common.collect.Lists; import org.apache.commons.io.IOUtils; +import org.apache.hadoop.conf.Configuration; import io.lakefs.clients.sdk.model.*; +import static org.mockserver.model.HttpResponse.response; + import org.junit.Assert; import org.junit.Before; import org.junit.Rule; @@ -27,6 +31,9 @@ import org.testcontainers.containers.output.Slf4jLogConsumer; import org.testcontainers.utility.DockerImageName; +import java.net.URL; +import java.util.concurrent.TimeUnit; +import java.util.Date; import java.util.List; /** @@ -40,10 +47,10 @@ public abstract class S3FSTestBase extends FSTestBase { protected String s3Endpoint; protected AmazonS3 s3Client; - private static final DockerImageName MINIO = DockerImageName.parse("minio/minio:RELEASE.2021-06-07T21-40-51Z"); + private static final DockerImageName MINIO = DockerImageName.parse("minio/minio:RELEASE.2024-11-07T00-52-20Z"); @Rule - public final GenericContainer s3 = new GenericContainer(MINIO.toString()). + public final GenericContainer s3 = new GenericContainer(MINIO). withCommand("minio", "server", "/data"). withEnv("MINIO_ROOT_USER", S3_ACCESS_KEY_ID). withEnv("MINIO_ROOT_PASSWORD", S3_SECRET_ACCESS_KEY). @@ -65,7 +72,7 @@ public void s3ClientSetup() { ClientConfiguration clientConfiguration = new ClientConfiguration() .withSignerOverride("AWSS3V4SignerType"); - s3Endpoint = String.format("http://s3.local.lakefs.io:%d", s3.getMappedPort(9000)); + s3Endpoint = String.format("http://s3.local.lakefs.io:%d/", s3.getMappedPort(9000)); s3Client = new AmazonS3Client(creds, clientConfiguration); @@ -76,7 +83,8 @@ public void s3ClientSetup() { s3Bucket = makeS3BucketName(); s3Base = String.format("s3://%s/", s3Bucket); - LOG.info("S3: bucket {} => base URL {}", s3Bucket, s3Base); + LOG.info("S3 [endpoint {}]: bucket {} => base URL {}", + s3Endpoint, s3Bucket, s3Base); CreateBucketRequest cbr = new CreateBucketRequest(s3Bucket); s3Client.createBucket(cbr); @@ -117,6 +125,8 @@ protected void assertS3Object(StagingLocation stagingLocation, String contents) } } + protected abstract PhysicalAddressCreator getPac(); + protected void moreHadoopSetup() { s3ClientSetup(); @@ -125,5 +135,71 @@ protected void moreHadoopSetup() { conf.set(org.apache.hadoop.fs.s3a.Constants.SECRET_KEY, S3_SECRET_ACCESS_KEY); conf.set(org.apache.hadoop.fs.s3a.Constants.ENDPOINT, s3Endpoint); conf.set(org.apache.hadoop.fs.s3a.Constants.BUFFER_DIR, "/tmp/s3a"); + getPac().initConfiguration(conf); + + LOG.info("Setup done!"); + } + + public static interface PhysicalAddressCreator { + default void initConfiguration(Configuration conf) {} + String createGetPhysicalAddress(S3FSTestBase o, String key); + StagingLocation createPutStagingLocation(S3FSTestBase o, String namespace, String repo, String branch, String path); + } + + protected static class SimplePhysicalAddressCreator implements PhysicalAddressCreator { + public String createGetPhysicalAddress(S3FSTestBase o, String key) { + return o.s3Url(key); + } + + public StagingLocation createPutStagingLocation(S3FSTestBase o, String namespace, String repo, String branch, String path) { + String fullPath = String.format("%s/%s/%s/%s/%s-object", + o.sessionId(), namespace, repo, branch, path); + return new StagingLocation().physicalAddress(o.s3Url(fullPath)); + } + } + + protected static class PresignedPhysicalAddressCreator implements PhysicalAddressCreator { + public void initConfiguration(Configuration conf) { + conf.set("fs.lakefs.access.mode", "presigned"); + } + + protected Date getExpiration() { + return new Date(System.currentTimeMillis() + TimeUnit.HOURS.toMillis(1)); + } + + public String createGetPhysicalAddress(S3FSTestBase o, String key) { + Date expiration = getExpiration(); + URL presignedUrl = + o.s3Client.generatePresignedUrl(new GeneratePresignedUrlRequest(o.s3Bucket, key) + .withMethod(HttpMethod.GET) + .withExpiration(expiration)); + return presignedUrl.toString(); + } + + public StagingLocation createPutStagingLocation(S3FSTestBase o, String namespace, String repo, String branch, String path) { + String fullPath = String.format("%s/%s/%s/%s/%s-object", + o.sessionId(), namespace, repo, branch, path); + Date expiration = getExpiration(); + URL presignedUrl = + o.s3Client.generatePresignedUrl(new GeneratePresignedUrlRequest(o.s3Bucket, fullPath) + .withMethod(HttpMethod.PUT) + .withExpiration(expiration)); + return new StagingLocation() + .physicalAddress(o.s3Url(fullPath)) + .presignedUrl(presignedUrl.toString()); + } + } + + // Return a location under namespace for this getPhysicalAddress call. + protected StagingLocation mockGetPhysicalAddress(String repo, String branch, String path, String namespace) { + StagingLocation stagingLocation = + getPac().createPutStagingLocation(this, namespace, repo, branch, path); + mockServerClient.when(request() + .withMethod("GET") + .withPath(String.format("/repositories/%s/branches/%s/staging/backing", repo, branch)) + .withQueryStringParameter("path", path)) + .respond(response().withStatusCode(200) + .withBody(gson.toJson(stagingLocation))); + return stagingLocation; } }