Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ADBDEV-6844: Exclude extra requests to the S3 server #148

Open
wants to merge 1 commit into
base: pxf-6.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,12 @@ public boolean openForWrite() throws IOException {

file = new Path(fileName);
fs = FileSystem.get(URI.create(fileName), configuration);
HdfsUtilities.validateFile(file, fs);

// For the S3A protocol we don't need to check the file and folder, nor create the folder
// We will check the file during the creation of the output stream
if (!HdfsUtilities.isS3Request(context)) {
HdfsUtilities.validateFile(file, fs);
}

// create output stream - do not allow overwriting existing file
createOutputStream(file, codec);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,12 @@ public boolean openForWrite() throws IOException, InterruptedException {
context.getSegmentId(), fileName);
file = new Path(fileName);
fs = FileSystem.get(URI.create(fileName), configuration);
HdfsUtilities.validateFile(file, fs);

// For the S3A protocol we don't need to check the file and folder, nor create the folder
// We will check the file during the creation of the Parquet Writer
if (!HdfsUtilities.isS3Request(context)) {
HdfsUtilities.validateFile(file, fs);
}

// Read schema file, if given
String schemaFile = context.getOption("SCHEMA");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.hadoop.mapred.SequenceFileRecordReader;
import org.greenplum.pxf.api.OneRow;
import org.greenplum.pxf.api.model.RequestContext;
import org.greenplum.pxf.plugins.hdfs.utilities.HdfsUtilities;

import java.io.IOException;
import java.util.EnumSet;
Expand Down Expand Up @@ -83,19 +84,10 @@ public boolean openForWrite() throws Exception {
fc = FileContext.getFileContext(configuration);
defaultKey = new LongWritable(context.getSegmentId());

if (fs.exists(file)) {
throw new IOException("file " + file.toString()
+ " already exists, can't write data");
}

Path parent = file.getParent();
if (!fs.exists(parent)) {
if (!fs.mkdirs(parent)) {
throw new IOException("Creation of dir '" + parent.toString() + "' failed");
}
LOG.debug("Created new dir {}", parent);
} else {
LOG.debug("Directory {} already exists. Skip creating", parent);
// For the S3A protocol we don't need to check the file and folder, nor create the folder
// We will check the file during the creation of the SequenceFile.Writer
if (!HdfsUtilities.isS3Request(context)) {
HdfsUtilities.validateFile(file, fs);
}

writer = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,16 @@
import org.apache.hadoop.mapred.FileSplit;
import org.greenplum.pxf.api.OneField;
import org.greenplum.pxf.api.io.DataType;
import org.greenplum.pxf.api.model.RequestContext;
import org.greenplum.pxf.api.utilities.Utilities;
import org.greenplum.pxf.plugins.hdfs.HcfsFragmentMetadata;
import org.greenplum.pxf.plugins.hdfs.HcfsType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.List;
import java.util.Optional;

/**
* HdfsUtilities class exposes helper methods for PXF classes.
Expand Down Expand Up @@ -78,6 +81,12 @@ public static void validateFile(Path file, FileSystem fs)
}
}

/**
 * Determines whether the given request targets the S3A protocol.
 *
 * @param context the request context whose protocol is inspected; its protocol may be null
 * @return {@code true} if the context's protocol equals "S3A" ignoring case, {@code false}
 *         otherwise (including when no protocol is set)
 */
public static boolean isS3Request(RequestContext context) {
    // Calling equalsIgnoreCase on the constant side is null-safe:
    // a missing protocol simply yields false, matching the previous
    // Optional-based implementation.
    return HcfsType.S3A.name().equalsIgnoreCase(context.getProtocol());
}

/**
* Returns string serialization of list of fields. Fields of binary type
* (BYTEA) are converted to octal representation to make sure they will be
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import java.util.Collections;
import java.util.List;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.*;

public class HdfsUtilitiesTest {

Expand All @@ -52,4 +52,19 @@ public void testParseFileSplit() {
assertEquals(fileSplit.getLength(), 100);
assertEquals(fileSplit.getPath().toString(), "/abc/path/to/data/source");
}

@Test
public void testIsS3Request() {
    // A context whose protocol is "s3a" (case-insensitive match against
    // HcfsType.S3A) must be recognized as an S3 request.
    RequestContext context = new RequestContext();
    context.setProtocol("s3a");
    // Fixed: the original invoked isS3Request once with the result
    // discarded before asserting on a second call — redundant work.
    assertTrue(HdfsUtilities.isS3Request(context));
}

@Test
public void testIsS3RequestWithoutProtocolSet() {
    // With no protocol set, getProtocol() returns null and the request
    // must not be treated as an S3 request.
    RequestContext context = new RequestContext();
    // Fixed: the original invoked isS3Request once with the result
    // discarded before asserting on a second call — redundant work.
    assertFalse(HdfsUtilities.isS3Request(context));
}
}