From b6a7b51257583d5a2db925c095ad48021b44fe1f Mon Sep 17 00:00:00 2001
From: Roman Zolotov
Date: Tue, 10 Dec 2024 19:22:53 +0200
Subject: [PATCH] ADBDEV-6844: Exclude extra requests to the S3 server

- The writer corresponding to each type already checks for the existence
  of the file or directory, so there is no need to perform this check
  separately. Similarly, we do not need to create marker directories:
  they are deleted anyway once the files are written at the end of the
  operation.
---
 .../pxf/plugins/hdfs/LineBreakAccessor.java     |  7 ++++++-
 .../pxf/plugins/hdfs/ParquetFileAccessor.java   |  7 ++++++-
 .../pxf/plugins/hdfs/SequenceFileAccessor.java  | 18 +++++-------------
 .../plugins/hdfs/utilities/HdfsUtilities.java   |  9 +++++++++
 .../hdfs/utilities/HdfsUtilitiesTest.java       | 15 ++++++++++++++-
 5 files changed, 40 insertions(+), 16 deletions(-)

diff --git a/server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/LineBreakAccessor.java b/server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/LineBreakAccessor.java
index 131d2888ee..bcafdbd75b 100644
--- a/server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/LineBreakAccessor.java
+++ b/server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/LineBreakAccessor.java
@@ -126,7 +126,12 @@ public boolean openForWrite() throws IOException {
         file = new Path(fileName);
         fs = FileSystem.get(URI.create(fileName), configuration);
-        HdfsUtilities.validateFile(file, fs);
+
+        // For the S3A protocol, we need neither to check for the file and directory nor to create the directory;
+        // the file will be checked during the creation of the output stream
+        if (!HdfsUtilities.isS3Request(context)) {
+            HdfsUtilities.validateFile(file, fs);
+        }
 
         // create output stream - do not allow overwriting existing file
         createOutputStream(file, codec);
 
diff --git a/server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/ParquetFileAccessor.java b/server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/ParquetFileAccessor.java
index 232f89210e..cdaf0b8ce7 100644
--- a/server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/ParquetFileAccessor.java
+++ b/server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/ParquetFileAccessor.java
@@ -265,7 +265,12 @@ public boolean openForWrite() throws IOException, InterruptedException {
                 context.getSegmentId(), fileName);
         file = new Path(fileName);
         fs = FileSystem.get(URI.create(fileName), configuration);
-        HdfsUtilities.validateFile(file, fs);
+
+        // For the S3A protocol, we need neither to check for the file and directory nor to create the directory;
+        // the file will be checked during the creation of the ParquetWriter
+        if (!HdfsUtilities.isS3Request(context)) {
+            HdfsUtilities.validateFile(file, fs);
+        }
 
         // Read schema file, if given
         String schemaFile = context.getOption("SCHEMA");
diff --git a/server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/SequenceFileAccessor.java b/server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/SequenceFileAccessor.java
index c1626d565b..691f5d3c92 100644
--- a/server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/SequenceFileAccessor.java
+++ b/server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/SequenceFileAccessor.java
@@ -38,6 +38,7 @@
 import org.apache.hadoop.mapred.SequenceFileRecordReader;
 import org.greenplum.pxf.api.OneRow;
 import org.greenplum.pxf.api.model.RequestContext;
+import org.greenplum.pxf.plugins.hdfs.utilities.HdfsUtilities;
 
 import java.io.IOException;
 import java.util.EnumSet;
@@ -83,19 +84,10 @@ public boolean openForWrite() throws Exception {
         fc = FileContext.getFileContext(configuration);
         defaultKey = new LongWritable(context.getSegmentId());
 
-        if (fs.exists(file)) {
-            throw new IOException("file " + file.toString()
-                    + " already exists, can't write data");
-        }
-
-        Path parent = file.getParent();
-        if (!fs.exists(parent)) {
-            if (!fs.mkdirs(parent)) {
-                throw new IOException("Creation of dir '" + parent.toString() + "' failed");
-            }
-            LOG.debug("Created new dir {}", parent);
-        } else {
-            LOG.debug("Directory {} already exists. Skip creating", parent);
+        // For the S3A protocol, we need neither to check for the file and directory nor to create the directory;
+        // the file will be checked during the creation of the SequenceFile.Writer
+        if (!HdfsUtilities.isS3Request(context)) {
+            HdfsUtilities.validateFile(file, fs);
         }
 
         writer = null;
diff --git a/server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/utilities/HdfsUtilities.java b/server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/utilities/HdfsUtilities.java
index e5cd8f5fc9..e59e48b4b9 100644
--- a/server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/utilities/HdfsUtilities.java
+++ b/server/pxf-hdfs/src/main/java/org/greenplum/pxf/plugins/hdfs/utilities/HdfsUtilities.java
@@ -24,13 +24,16 @@
 import org.apache.hadoop.mapred.FileSplit;
 import org.greenplum.pxf.api.OneField;
 import org.greenplum.pxf.api.io.DataType;
+import org.greenplum.pxf.api.model.RequestContext;
 import org.greenplum.pxf.api.utilities.Utilities;
 import org.greenplum.pxf.plugins.hdfs.HcfsFragmentMetadata;
+import org.greenplum.pxf.plugins.hdfs.HcfsType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Optional;
 
 /**
  * HdfsUtilities class exposes helper methods for PXF classes.
@@ -78,6 +81,12 @@ public static void validateFile(Path file, FileSystem fs)
         }
     }
 
+    public static boolean isS3Request(RequestContext context) {
+        return Optional.ofNullable(context.getProtocol())
+                .map(p -> p.equalsIgnoreCase(HcfsType.S3A.name()))
+                .orElse(false);
+    }
+
     /**
      * Returns string serialization of list of fields. Fields of binary type
      * (BYTEA) are converted to octal representation to make sure they will be
diff --git a/server/pxf-hdfs/src/test/java/org/greenplum/pxf/plugins/hdfs/utilities/HdfsUtilitiesTest.java b/server/pxf-hdfs/src/test/java/org/greenplum/pxf/plugins/hdfs/utilities/HdfsUtilitiesTest.java
index 676f1494e1..25d69837cd 100644
--- a/server/pxf-hdfs/src/test/java/org/greenplum/pxf/plugins/hdfs/utilities/HdfsUtilitiesTest.java
+++ b/server/pxf-hdfs/src/test/java/org/greenplum/pxf/plugins/hdfs/utilities/HdfsUtilitiesTest.java
@@ -29,7 +29,7 @@
 import java.util.Collections;
 import java.util.List;
 
-import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.*;
 
 public class HdfsUtilitiesTest {
 
@@ -52,4 +52,17 @@ public void testParseFileSplit() {
         assertEquals(fileSplit.getLength(), 100);
         assertEquals(fileSplit.getPath().toString(), "/abc/path/to/data/source");
     }
+
+    @Test
+    public void testIsS3Request() {
+        RequestContext context = new RequestContext();
+        context.setProtocol("s3a");
+        assertTrue(HdfsUtilities.isS3Request(context));
+    }
+
+    @Test
+    public void testIsS3RequestWithoutProtocolSet() {
+        RequestContext context = new RequestContext();
+        assertFalse(HdfsUtilities.isS3Request(context));
+    }
 }
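
Background for the change above: in the Hadoop FileSystem API, creating a file with overwrite disabled already fails with FileAlreadyExistsException when the target exists, so an explicit existence check ahead of the writer duplicates that work, and on S3A each such check costs at least one extra HEAD request. Below is a minimal, self-contained sketch of that behavior; it is not part of the patch, and the class name and the s3a URI are illustrative assumptions.

    import java.net.URI;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class CreateNoOverwriteSketch {
        public static void main(String[] args) throws Exception {
            // Illustrative target; any s3a:// URI with configured credentials would do.
            String fileName = "s3a://bucket/output/part-0001";
            Configuration configuration = new Configuration();
            FileSystem fs = FileSystem.get(URI.create(fileName), configuration);

            // overwrite = false: create() itself rejects an existing target with
            // FileAlreadyExistsException, so no separate fs.exists() pre-check
            // (and no extra request to the S3 server) is needed before writing.
            try (FSDataOutputStream out = fs.create(new Path(fileName), false)) {
                out.writeBytes("row data\n");
            }
        }
    }

With the check gated by HdfsUtilities.isS3Request(), HDFS and other file systems keep the early validateFile() call and its explicit error message, while S3A defers to the writer itself and avoids the extra round trips.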