
PARQUET-2171: Support Hadoop vectored IO #1139

Merged
merged 18 commits into apache:master on Apr 26, 2024

Conversation

@steveloughran (Contributor)

Make sure you have checked all steps below.

Jira

Tests

  • My PR adds the following unit tests
    TestVectorIOBridge
    TestFileRangeBridge

It also parameterizes existing tests to run with and without vector IO enabled:
TestParquetFileWriter
TestInputFormatColumnProjection
TestInputOutputFormat

Commits

  • My commits all reference Jira issues in their subject lines. In addition, my commits follow the guidelines from "How to write a good git commit message":
    1. Subject is separated from body by a blank line
    2. Subject is limited to 50 characters (not including Jira issue reference)
    3. Subject does not end with a period
    4. Subject uses the imperative mood ("add", not "adding")
    5. Body wraps at 72 characters
    6. Body explains "what" and "why", not "how"

Documentation

  • In case of new functionality, my PR adds documentation that describes how to use it.
    • All the public functions and the classes in the PR contain Javadoc that explain what it does

@danielcweeks commented Sep 15, 2023

@steveloughran This looks really great! I think my only comment would be about whether we can expose the implementation in a way that might be more pluggable. In Iceberg we have a similar parallel to the InputFile and SeekableStream, but it's not apparent to me that we would be able to adapt our IO implementation to leverage vectored reads.

Open to thoughts on how we might do that as well.

Maybe this would be possible by implementing the vectoredIO methods in the Iceberg adaptor class here?

@shangxinli (Contributor)

@steveloughran Thanks a lot for creating this PR! This is an important feature that improves the read performance of Parquet. I just took a brief look and it looks great! I will spend some time later to review it.

@steveloughran (Contributor, Author)

@shangxinli looking forward to your comments; anything you can do to test will be wonderful too!

@steveloughran (Contributor, Author)

@danielcweeks that's a good point about pluggability.

  1. an interface/implementation split in parquet would line you up later to choose the back end, maybe?
  2. I've done an initial pass at a shim library to use vectored IO operations if a stream/hadoop version has it, but fall back to the usual blocking reads if not (along with the same for everything else). But just getting the base vector IO stuff into parquet is a lot simpler. I don't know if that would be useful for iceberg https://github.com/apache/hadoop-api-shim
  3. video on the whole topic

getting iceberg to pass down which stripes it wants to read is critical for this to work best with s3, abfs and gcs. how are you reading the files at present?

@steveloughran (Contributor, Author)

@gszadovszky thanks for your comments, will update the PR

@parthchandra (Contributor)

> @danielcweeks that's a good point about pluggability.
> I don't know if that would be useful for iceberg https://github.com/apache/hadoop-api-shim

Iceberg can use the base Parquet File reader out of the box so should be able to use vector IO as it is.

> getting iceberg to pass down which stripes it wants to read is critical for this to work best with s3, abfs and gcs. how are you reading the files at present?

However if the S3FileIO feature is enabled, Iceberg provides its own InputStream and InputFile implementation that use AWS SDK V2. Maybe an option to provide your own input stream to vector io might work.

@parthchandra (Contributor)

@mukund-thakur @steveloughran this is a great PR! Some numbers from an independent benchmark: I used Spark to parallelize the reading of all row groups (just the reading of the raw data) from TPC-DS/SF10000/store_sales using various APIs, and here are the numbers.

32 executors, 16 cores
fs.s3a.threads.max = 20

| Reader | Avg Time (minutes) | Median | vs Baseline |
| --- | --- | --- | --- |
| Parquet | 10.32 | 10 | 1 |
| Parquet Vector IO | 2.02 | 2 | 5.1 |
| AWS SDK V2 | 9.86 | 10 | 1 |
| AWS SDK V2 Async | 9.66 | 9.6 | 1.1 |
| AWS SDK V2 AsyncCrt | 9.76 | 10 | 1.1 |
| AWS SDK V2 S3TransferManager | 9.58 | 9.5 | 1.1 |
| AWS SDK V2 Async CRT Http Client | 10.8 | 11 | 1 |

Summary: the various V2 SDK clients provide lower latency and better upload speeds, but for raw data scans they are all pretty much the same. Increasing the parallelism, as vector IO does, has the maximum benefit.

@mukund-thakur (Contributor)

@parthchandra Thanks for running the benchmarks. the numbers are impressive.

@ahmarsuhail commented Oct 17, 2023

@parthchandra just wanted to check, are these numbers with Iceberg and S3FileIO? With S3A now using SDK V2, I'm looking at running a similar benchmark too with async CRT clients, but have been seeing some issues around connection pool exhaustion.

@parthchandra (Contributor)

@ahmarsuhail No, these numbers are not with Iceberg and S3FileIO.
I used a modified (lots of stuff removed) version of the ParquetFileReader and a custom benchmark program that reads all the row groups in parallel and records the time spent in each read from S3. The modified version of ParquetFileReader can switch between the various methods of reading from S3.
The entry AWS SDK V2 is a near copy of the Iceberg S3FileIO code though.
I saw issues with the CRT client when running at scale causing JVM crashes. And the V2 transfer manager did not do range reads properly. Do share your experience.

@steveloughran (Contributor, Author)

Thanks for the numbers; I am deep in the aws v2 sdk migration right now and haven't had a chance to work on this.

@steveloughran (Contributor, Author)

...back on this. @parthchandra you know that hadoop trunk is built on the v2 sdk now?

@steveloughran force-pushed the PARQUET-2171-vector-io-integrated branch from 7e02d53 to da3db9b on November 17, 2023 at 19:48
@steveloughran (Contributor, Author)

OK, I've tried to address the changes as well as merge with master

The one thing I've yet to do is the one raised by @danielcweeks: have an interface for which the hadoop vector IO would be just one implementation.

We effectively have that in SeekableInputStream via two new default methods: one probes for the availability of the API, the other invokes it.
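Roughly, a sketch of that shape (the exact signatures and default bodies are in the PR diff; treat this as illustrative):

import java.io.IOException;
import java.io.InputStream;
import java.util.List;

import org.apache.parquet.bytes.ByteBufferAllocator;
import org.apache.parquet.io.ParquetFileRange;

public abstract class SeekableInputStream extends InputStream {
  // (the existing abstract read/seek methods are omitted here)

  // Probe: does this stream support vectored reads? The default says "no",
  // so existing implementations keep their blocking-read behaviour.
  public boolean readVectoredAvailable(ByteBufferAllocator allocator) {
    return false;
  }

  // Invocation: read all of the given ranges, asynchronously where the
  // underlying stream supports it; the hadoop bridge overrides this.
  public void readVectored(List<ParquetFileRange> ranges, ByteBufferAllocator allocator)
      throws IOException {
    throw new UnsupportedOperationException("Vectored IO is not supported by " + this);
  }
}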


Would you be able to wire up the iceberg reader to that? And if not, what changes are needed?

One thing we would need to make sure is right is the awaitFuture stuff; that's a copy of what's in hadoop to handle async IO operations. There's also a hard-coded timeout of 300s to wait for the results; I don't recall where that number came from, but it's potentially dubious as it won't recover from network problems.
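For reference, that helper is shaped roughly like this; a hand-written sketch of the await-with-timeout/unwrap logic, not the exact hadoop or parquet code:

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class AwaitSketch {
  // Block until the async read finishes, unwrapping ExecutionException so the
  // caller sees the underlying IOException, and applying the (hard-coded) timeout.
  static <T> T awaitFuture(CompletableFuture<T> future, long timeout, TimeUnit unit)
      throws IOException {
    try {
      return future.get(timeout, unit);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw (IOException) new InterruptedIOException("interrupted waiting for IO").initCause(e);
    } catch (TimeoutException e) {
      throw (IOException) new InterruptedIOException("IO did not complete in time").initCause(e);
    } catch (ExecutionException e) {
      Throwable cause = e.getCause();
      throw cause instanceof IOException ? (IOException) cause : new IOException(cause);
    }
  }
}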

@wgtmac (Member) left a comment

Thanks for the change! I've just scanned it for the 1st pass. Do you have any outstanding work on this PR?

@steveloughran (Contributor, Author)

Code-wise, no, other than reviews from others about the best place for things, such as that awaitFuture stuff, or any other suggestions from people who know the parquet codebase. The code works and we have been testing this against Amazon S3 Express storage for extra speed-up. To be ruthless: there's no point paying the premium for that until you've embraced the extra speed-up you get from this first.

@Fokko (Contributor) commented Nov 29, 2023

@steveloughran Can you fix the compatibility issue?

Error:  Failed to execute goal com.github.siom79.japicmp:japicmp-maven-plugin:0.18.2:cmp (default) on project parquet-hadoop: There is at least one incompatibility: 
org.apache.parquet.hadoop.util.vectorio.BindingUtils.raiseInnerCause(java.util.concurrent.ExecutionException):CLASS_GENERIC_TEMPLATE_CHANGED -> [Help 1]

@steveloughran (Contributor, Author)

@Fokko that's japicmp getting its logic wrong because it's a new file; I thought I'd edited the build rules so it would ignore that.

Anyway, I need to fix the merge as something (#1209?) has just broken it.

@steveloughran (Contributor, Author)

Thanks. I'm away from my laptop until 2024 but really do want to get this in.

@mukund-thakur (Contributor) left a comment

LGTM +1. I think this is good to go in now.

@parthchandra (Contributor)

+1 from me as well.

@steveloughran (Contributor, Author) commented Apr 2, 2024

Tested across the different profiles, all good. I am seeing hadoop 3.4 MR test failures, but that happens on master too.

One thing I do want to change now: I propose renaming the package from vectorio to something more like "wrapped io".

the reason for this is that in apache/hadoop#6686
I'm adding an "easy use with reflection" wrapper to openFile(), making it easy to

  • pass down filestatus already collected (saves a HEAD on the stores)
  • pass in read policy as "vectored, random"

Having a unified package for this stuff makes sense, and vectorio is too narrow

…pedio

This is to prepare the package as a future home for other ongoing
reflection support for parquet to get better cloud performance,
initially HADOOP-19131. Assist reflection IO with WrappedOperations class,
which adds a static method invoking FileSystem.openFile() with
various options to make it easy for parquet to adopt.

Move org.apache.hadoop.util.functional.FutureIO from BindingUtils
to org.apache.parquet.hadoop.util.wrappedio.FutureIO

Change-Id: I414086c95d87710bdc14ac304fdf22aabf86002a
Goal: make any future move to hadoop release versions easier.
@steveloughran (Contributor, Author)

Update

  • moved package to org.apache.parquet.hadoop.util.wrappedio
  • pulled the copied hadoop.util.FutureIO methods into a class of the same name

We could consider making BindingUtils package private.

@parthchandra (Contributor) commented Apr 3, 2024

> We could consider making BindingUtils package private.

It's pretty generic and might be useful elsewhere so I feel it is ok to keep it public.

@wgtmac (Member) left a comment

Just reviewed for another pass. I've left some minor comments. Overall LGTM, thanks @steveloughran!


@Override
public String toString() {
return "range[" + this.offset + " - " + (this.offset + (long) this.length) + "]";
@wgtmac (Member):

Suggested change:
- return "range[" + this.offset + " - " + (this.offset + (long) this.length) + "]";
+ return "range[" + this.offset + " - " + (this.offset + (long) this.length) + ")";

@steveloughran (Contributor, Author):

why this change, given the range starts with the "[" symbol? I'm happy to merge it, just not sure why, unless you want to delineate that the second value in the range is inclusive, which it isn't, now I look at it. Maybe it should be

return String.format("range[%,d to %,d]",
    offset,
    offset + (length > 0 ? (length - 1) : 0));

@wgtmac (Member):

Yes, my intention is to let readers know whether each end is exclusive or inclusive.

@steveloughran (Contributor, Author):

ok, will tweak


/**
* Is the {@link #readVectored(List, ByteBufferAllocator)} method available?
* @param allocator the allocator to use for allocating ByteBuffers
@wgtmac (Member):

I searched for a while to understand why the allocator param is required and found that internally it checks !allocator.isDirect().

It seems that we can remove the extra param from readVectoredAvailable and modify ParquetFileReader.shouldUseVectoredIO() instead like below:

  private boolean shouldUseVectoredIO(final List<ConsecutivePartList> allParts) {
    return options.useHadoopVectoredIO()
        && f.readVectoredAvailable()
        && !options.getAllocator().isDirect()
        && arePartsValidForVectoredIO(allParts);
  }

WDYT?

@wgtmac (Member):

What's your opinion on this? @steveloughran

@steveloughran (Contributor, Author):

Aah, you've noticed that! It's because I've realised that for s3/abfs/gcs network failures which don't recover after retries, we need a way to return the buffer to the pool. See https://issues.apache.org/jira/browse/HADOOP-19105

I actually need to do some more work in general on s3a read() recovery: we retry on the GET calls and then keep the connection active only as long as it takes to read the data, so the risk of stale connections is low, but I do need to add the full resilience logic there:

  • abort http connections considered unrecoverable, rather than return to pool
  • retry outstanding entries in each range
  • maybe actually handle failures by decombining adjacent ranges.

Having parquet pass the allocator as far down as it does means that supporting releases with the "return to pool" callback doesn't change the rest of the code; instead, the bridge would first look for the new method, falling back to the original one if not found.
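To illustrate that last point, a hypothetical sketch of the probe-then-fall-back binding; the three-argument overload with a release callback doesn't exist yet, so its signature here is only a guess at what HADOOP-19105 might add:

import java.lang.reflect.Method;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.IntFunction;

final class ReadVectoredBindingSketch {
  // Look up the richest readVectored() overload the stream class offers.
  static Method bind(Class<?> streamClass) {
    try {
      // Hypothetical future overload: ranges, buffer allocator, and a release
      // callback so buffers go back to the pool on unrecoverable failures.
      return streamClass.getMethod("readVectored", List.class, IntFunction.class, Consumer.class);
    } catch (NoSuchMethodException e) {
      try {
        // The original Hadoop 3.3.5+ signature: ranges + allocator only.
        return streamClass.getMethod("readVectored", List.class, IntFunction.class);
      } catch (NoSuchMethodException e2) {
        return null; // no vectored IO on this stream; use the blocking read path
      }
    }
  }
}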

@wgtmac (Member):

That makes sense. Thanks for the explanation!

steveloughran and others added 2 commits April 4, 2024 17:47
* use Io rather than IO in fields, classnames
* package is now org.apache.parquet.hadoop.util.wrapped.io
* comment in pom clarifies that the FutureIO method is excluded from japicmp
  because it fails otherwise.

Change-Id: Id82d1239b8b0ee5d38f0487e20b9e128e8cad8eb
@wgtmac (Member) commented Apr 5, 2024

> @steveloughran This looks really great! I think my only comment would be about whether we can expose the implementation in a way that might be more pluggable. In Iceberg we have a similar parallel to the InputFile and SeekableStream, but it's not apparent to me that we would be able to adapt our IO implementation to leverage vectored reads.
>
> Open to thoughts on how we might do that as well.
>
> Maybe this would be possible by implementing the vectoredIO methods in the Iceberg adaptor class here?

Do you want to double check this again? @danielcweeks @Fokko

VectorIOBridge.readVectoredRanges() is non static

Change-Id: Ic23068f8123d0b86b971b5627a5b724e86d9ddcd
@steveloughran (Contributor, Author)

Right, I think I have addressed all the issues except for the design of a public API for other input sources.

I do think that would be good, not just for iceberg integration, but because it will let local filesystem clients do this through LocalInputFile. This is already in hadoop, where it only takes 100 LoC to implement (https://github.com/apache/hadoop/blob/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java#L319). (The hadoop API was designed around java.nio, so this isn't surprising.)

Can we get this in, and then I can collaborate with you and anyone else interested on a broader API? I would obviously like what we've done to be ~the reference design, but the binding, testing, and inclusion in the benchmarks are areas where I need help from the experts in the project to understand how best to fit it in.

Incidentally, you should be aware that the next bit of parquet work I want to do is to pass in the seek policy and file status when opening files. This is trivial to do on Hadoop 3.3+; it saves a HEAD request through the ABFS and S3A connectors and lets them tune their prefetching/caching/GET range policy. However, getting at that builder API through reflection is very complicated.

What I am doing is creating some reflection-friendly static methods (apache/hadoop#6686), shipping them in hadoop 3.3.9 and 3.4.1, and using the BindingUtils and DynMethods code to invoke them. Parquet always calls getFileStatus() before opening a file, so the HEAD is completely superfluous. Note: that hadoop PR lifts and repackages the DynMethods classes, so that for testing there we can be confident it works.
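For context, a sketch of what the non-reflective call looks like on Hadoop 3.3+, which those static methods will wrap (option key and builder calls as I understand the openFile() API; treat the details as illustrative):

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public final class OpenFileSketch {
  // Passing the FileStatus parquet already holds avoids a second HEAD request;
  // the read policy string lets ABFS/S3A tune prefetching and GET ranges.
  public static FSDataInputStream open(FileSystem fs, Path path, FileStatus status)
      throws Exception {
    return fs.openFile(path)
        .withFileStatus(status)
        .opt("fs.option.openfile.read.policy", "vectored, random")
        .build()   // CompletableFuture<FSDataInputStream>
        .get();
  }
}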

@wgtmac (Member) left a comment

+1

@wgtmac (Member) commented Apr 11, 2024

@gszadovszky @shangxinli @ggershinsky Could you help review this for another pass? Thanks!

Now:
String.format("range[%,d - %,d)", offset, offset + length)

Change-Id: I5acde3bd2fa9a9d5f19b603082a894c44c9f51ad
@steveloughran (Contributor, Author)

@wgtmac tweaked that FileRange.toString(); now

String.format("range[%,d - %,d)", offset, offset + length);

...that shows that the start of the range is inclusive and the end exclusive, and gives "," separators on large ranges.

This is just for debugging, but that doesn't mean it shouldn't be meaningful.

* Make clear ranges must be disjoint
* Fill out exception list

Change-Id: I55db835f3b7d376d0a5cc3d627b85cf1e475a3ec
Negative offsets on read are raised as IllegalArgumentException

This isn't what the hadoop inner code does, but that is irrelevant
because the parquet-side validations happen first.

Change-Id: I94249ba8437372789dc8582b1b09a076ac7ab4e3
Change-Id: I5074de86870f5f773695e7c05d1f43bc2fa90253
@wgtmac (Member) commented Apr 24, 2024

I will merge this by the end of this week if no objection. We need to release parquet 1.14.0 and this is a must-have feature.

@steveloughran (Contributor, Author)

thanks! FWIW, ORC shipped with this feature recently too.
They also made their release Java 17+ only, which is quite aggressive and something I'm jealous of... Java 8 is way past its EOL.

@steveloughran (Contributor, Author)

thanks for the approval. I have more plans, but first I want to make it really easy for parquet to pick up hadoop-3.2/3.3 APIs through reflection: apache/hadoop#6686

get that into 3.3.9/3.4.1 and then parquet will be able to

  • open files with explicit read policy and skipping the head call
  • maybe use ByteBufferPositionedReadable.readFully() to read buffers (see the sketch below). That's currently only in hdfs though, so the existing code is still used in most places.
  • more

if we can do vectored IO through reflection, then other stuff is there for the taking too.
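A minimal sketch of that ByteBufferPositionedReadable path mentioned in the list above (the capability string here is an assumption):

import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.hadoop.fs.FSDataInputStream;

final class ByteBufferReadSketch {
  // Read buf.remaining() bytes at `offset` straight into the buffer when the
  // wrapped stream implements ByteBufferPositionedReadable (today: HDFS only);
  // return false so the caller can fall back to the existing read path.
  static boolean readFullyIfSupported(FSDataInputStream in, long offset, ByteBuffer buf)
      throws IOException {
    if (!in.hasCapability("in:preadbytebuffer")) {
      return false;
    }
    in.readFully(offset, buf); // delegates to ByteBufferPositionedReadable.readFully
    return true;
  }
}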

@parthchandra (Contributor)

@wgtmac it would be great to get this in. I'm waiting with a followup to add simple metrics for the vector io read path and it would be nice if that might make it to the release as well.

@wgtmac merged commit c8aee7f into apache:master on Apr 26, 2024 (9 checks passed).
@wgtmac (Member) commented Apr 26, 2024

I just merged it. Thanks all!

@steveloughran (Contributor, Author)

thanks! looking forward to other people's results.

I also have plans for more...

@@ -152,6 +161,35 @@ public class TestParquetFileWriter {
@Rule
public final TemporaryFolder temp = new TemporaryFolder();

@Parameterized.Parameters(name = "vectored : {0}")
A contributor commented:

Somehow we need to add a validation that, when the tests run with vectored IO enabled, vectored IO actually gets triggered.
