Introduce kudo serialization format. #2532

Draft · wants to merge 3 commits into base: branch-24.12
Conversation

@liurenjie1024 (Collaborator) commented Oct 24, 2024

Design

This PR introduces the kudo serialization format, which significantly improves serialization/deserialization time compared to the jcudf serialization format. Kudo is optimized for the columnar batch serialization used during Spark shuffle, and the improvements are based on two observations:

  1. During Spark shuffle, the runtime already provides a lot of context, such as the table schema, which means we can simplify headers. Kudo's header contains only the necessary fields: the offset, the number of rows, the data lengths, and one byte per column indicating whether that column has a validity buffer (a sketch of this header follows the list).
  2. A GPU columnar batch is typically much larger than the batches of a CPU vectorized execution engine, which means we almost always need to concatenate batches at shuffle read time. When serializing part of a larger columnar batch, unlike jcudf, which calculates exact validity and offset buffers, we only record the offset and number of rows in the header and copy the necessary bytes, since the exact buffers can be restored at read time during concatenation. This saves a lot of computation during serialization.
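
To make the header concrete, here is a minimal sketch in Java of the per-partition fields described above. The field names and widths are illustrative (taken from the snippets quoted later in this review), not necessarily the exact ones the PR ends up with:

// Sketch only: the kudo per-partition header described in observation 1.
class KudoHeaderSketch {
  long offset;              // row offset of this slice within the original batch
  long numRows;             // number of rows in this slice
  long validityBufferLen;   // bytes of serialized validity data
  long offsetBufferLen;     // bytes of serialized offset data
  long totalDataLen;        // total bytes of serialized data
  byte[] hasValidityBuffer; // one byte per column: 1 = column has validity data
}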

Performance

We have observed serialization time improvements of 30% to 4000%, deserialization time improvements of up to 200%, and similar concat batching performance.

Notice

This PR is a draft, mainly so that the whole picture of the code can be skimmed; from there we can work out a plan for splitting it into smaller PRs that are easier to review.

@liurenjie1024 marked this pull request as draft October 24, 2024 03:15
Signed-off-by: liurenjie1024 <[email protected]>
@jlowe (Member) left a comment:

I added some initial comments from skimming through the code. One way to break this up into smaller PRs:

  • Have a "generic utilities" PR that includes Pair, SlicedTable, TableUtils, etc.
  • Have a "schema utilities" PR that includes the visitor framework and related utilities (i.e.: the stuff under the schema package)
  • Write-side PR that includes the parts of the Kudo serializer related to writing partitions of a table to a stream (and related tests)
  • Read-side PR that includes the remaining parts of the Kudo serializer related to merging partitions (and related tests)

Member:

Missing copyright and license. Same comment for many other files in this PR.

Collaborator Author:

Will fix it.

  return numRows;
}

public Table getTable() {
Member:

This might be better named getUnslicedTable or getBaseTable, since the current name implies it could manifest the table after slicing takes effect.

There needs to be documentation for this method re: ownership expectations of the result, i.e.: is the caller responsible for closing the table? If so, this should arguably invalidate the local table here. If not, who ultimately should close it?
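
For illustration, a hedged sketch of the kind of documentation being asked for, assuming the contract is that the caller does not take ownership (the PR may of course choose the opposite contract):

/**
 * Returns the base (unsliced) table backing this slice.
 * <p>
 * Ownership is NOT transferred: the returned Table is still owned by this
 * object and must not be closed by the caller. It remains valid only until
 * this object is closed.
 */
public Table getBaseTable() {
  return table;
}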

Collaborator Author:

Will fix it.

Comment on lines 13 to 22
public static Schema schemaOf(Table t) {
  Schema.Builder builder = Schema.builder();

  for (int i = 0; i < t.getNumberOfColumns(); i++) {
    ColumnVector cv = t.getColumn(i);
    addToSchema(cv, "col_" + i + "_", builder);
  }

  return builder.build();
}
Member:

What's the use case for this? I would expect the RAPIDS Accelerator to want to build a Schema from the Spark schema rather than a schema from a Table that is forced to use fake column names.
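
As a sketch of the suggested alternative, assuming cudf's Schema.Builder exposes a column(DType, String) method, and showing only a few Spark type mappings:

import ai.rapids.cudf.DType;
import ai.rapids.cudf.Schema;
import org.apache.spark.sql.types.*;

// Hedged sketch: derive the cudf Schema from the Spark schema instead of
// from a Table with fabricated column names. Type coverage is incomplete.
static Schema fromSparkSchema(StructType sparkSchema) {
  Schema.Builder builder = Schema.builder();
  for (StructField field : sparkSchema.fields()) {
    builder.column(toCudfType(field.dataType()), field.name());
  }
  return builder.build();
}

static DType toCudfType(DataType t) {
  if (t instanceof IntegerType) return DType.INT32;
  if (t instanceof LongType) return DType.INT64;
  if (t instanceof DoubleType) return DType.FLOAT64;
  if (t instanceof StringType) return DType.STRING;
  throw new IllegalArgumentException("type not covered by this sketch: " + t);
}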

Collaborator Author:

It was used at write time, but it's now unused since I removed the write-time dependency on the schema. I'll remove it.

Comment on lines +59 to +69
public static void ensure(boolean condition, String message) {
  if (!condition) {
    throw new IllegalArgumentException(message);
  }
}

public static void ensure(boolean condition, Supplier<String> messageSupplier) {
  if (!condition) {
    throw new IllegalArgumentException(messageSupplier.get());
  }
}
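
For context, a typical call site might look like the following (the variable names are hypothetical); the Supplier overload defers building the message until a check actually fails:

// Hypothetical usage of the precondition helpers quoted above.
ensure(numRows >= 0, "numRows must be non-negative");
ensure(offset + numRows <= table.getRowCount(),
    () -> "slice [" + offset + ", " + (offset + numRows) + ") exceeds table bounds");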
Member:

Nit: these have nothing to do with Table; they should be in an assertion, precondition, or more generic utility class. Similar comment for closeException, closeQuietly, and withTime.

Collaborator Author:

Will fix it.

Member:

This and DataWriter are copies from cudf; we should discuss whether to just make the cudf versions public instead of replicating them, especially if we don't anticipate needing any changes.

Collaborator Author:

Will fix it.

Comment on lines +17 to +36
// Inside a static initializer: reflectively expose package-private cudf internals.
MAKE_CUDF_COLUMN_VIEW = ColumnView.class.getDeclaredMethod("makeCudfColumnView",
    int.class, int.class, long.class, long.class, long.class, long.class, int.class,
    int.class, long[].class);
MAKE_CUDF_COLUMN_VIEW.setAccessible(true);

FROM_VIEW_WITH_CONTIGUOUS_ALLOCATION = ColumnVector.class.getDeclaredMethod(
    "fromViewWithContiguousAllocation",
    long.class, DeviceMemoryBuffer.class);
FROM_VIEW_WITH_CONTIGUOUS_ALLOCATION.setAccessible(true);

CONTIGUOUS_TABLE_CONSTRUCTOR = ContiguousTable.class.getDeclaredConstructor(Table.class,
    DeviceMemoryBuffer.class);
CONTIGUOUS_TABLE_CONSTRUCTOR.setAccessible(true);

COPY_FROM_STREAM = HostMemoryBuffer.class.getDeclaredMethod("copyFromStream",
    long.class, InputStream.class, long.class);
COPY_FROM_STREAM.setAccessible(true);
} catch (NoSuchMethodException e) {
  throw new RuntimeException(e);
}
Member:

We should not be hacking our own code for visibility. HostMemoryBuffer.copyFromStream we can trivially expose. ColumnVector.fromViewWithContiguousAllocation is now public.

Regarding the other two methods, I'm not sure we need them if we just create a Table rather than a ContiguousTable. I'm guessing the caller doesn't actually care about ContiguousTable but rather wants just the Table. Also we should consider having this implementation build a HostTable in host memory and then leverage HostTable to build the resulting device Table.

Collaborator Author:

> We should not be hacking our own code for visibility. HostMemoryBuffer.copyFromStream we can trivially expose. ColumnVector.fromViewWithContiguousAllocation is now public.

Sure, this is a hack so that we could move fast and deliver to a customer. I'll fix this in the formal PR.

> Regarding the other two methods, I'm not sure we need them if we just create a Table rather than a ContiguousTable. I'm guessing the caller doesn't actually care about ContiguousTable but rather wants just the Table. Also we should consider having this implementation build a HostTable in host memory and then leverage HostTable to build the resulting device Table.

The reason we return ContiguousTable is that I want to reuse this code when we integrate with spark-rapids.

* Magic number "KUDO" in ASCII.
*/
private static final int SER_FORMAT_MAGIC_NUMBER = 0x4B55444F;
private static final short VERSION_NUMBER = 0x0001;
Member:

JCudfSerialization was built as a generic serializer, where the software writing and reading could change. This is specific to shuffle. I don't think we need a version number, and arguably we can embed that concept in the magic number if it's ever needed (and I cannot see why it would be).

Collaborator Author:

+1, I'll pack them into one integer. The reason I want to keep it is for easier debugging.
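
One illustrative way to fold a version into the magic word, assuming the low byte carries the version (not necessarily the scheme this PR will adopt):

// Sketch only: "KUD" in the high three bytes, version 1 in the low byte.
private static final int MAGIC_AND_VERSION = ('K' << 24) | ('U' << 16) | ('D' << 8) | 0x01;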


// Useful for reducing calculations in writing.
private long offset;
private long numRows;
Member:

It's impossible to have more than a 31-bit row count; this should just be an int.

Collaborator Author:

Will fix.

Comment on lines +25 to +27
private long validityBufferLen;
private long offsetBufferLen;
private long totalDataLen;
Member:

These should all be ints. It's not possible to serialize a single object that is > 2GB in Spark.

Collaborator Author:

Will fix.

Comment on lines +28 to +30
// This is used to indicate the validity buffer for the columns.
// 1 means that this column has validity data, 0 means it does not.
private byte[] hasValidityBuffer;
Member:

This should be a bit mask, not a byte per column. We never have to bit-shift the entire hasValidityBuffer mask or do anything complex like that, just test single bits, which is trivial to compute on both the write and read sides, and a mask saves a lot of space when we're dealing with tons of columns and tons of partitions across tons of tasks.
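
A minimal sketch of that single-bit test, assuming 8 columns are packed per byte with column 0 in the least significant bit of byte 0:

// Sketch: test whether the given column has validity data in a packed bit mask.
static boolean hasValidity(byte[] mask, int columnIndex) {
  return (mask[columnIndex / 8] & (1 << (columnIndex % 8))) != 0;
}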

Collaborator Author:

Will fix it.

@liurenjie1024 (Collaborator Author) commented:

Hi @jlowe, the first PR is here: rapidsai/cudf#17179

rapids-bot pushed a commit to rapidsai/cudf that referenced this pull request, Oct 30, 2024:
This is the first PR of [a larger one](NVIDIA/spark-rapids-jni#2532) to introduce a new serialization format. It makes `ai.rapids.cudf.HostMemoryBuffer#copyFromStream` public. For more background, see NVIDIA/spark-rapids-jni#2496

Authors:
  - Renjie Liu (https://github.com/liurenjie1024)
  - Jason Lowe (https://github.com/jlowe)

Approvers:
  - Jason Lowe (https://github.com/jlowe)
  - Alessandro Bellina (https://github.com/abellina)

URL: #17179