Support DictionaryArrays in Arrow Flight #3389

Open
Tracked by #3301
alamb opened this issue Dec 23, 2022 · 10 comments
Labels: arrow-flight, enhancement

Comments

@alamb
Contributor

alamb commented Dec 23, 2022

Is your feature request related to a problem or challenge? Please describe what you are trying to do.
We are building our application on top of Arrow Flight as the transport protocol, with the eventual goal of FlightSQL: #3301

We make heavy use of DictionaryArrays to encode low cardinality string data. At the moment, we need to decode DictionaryArrays prior to sending them via Arrow Flight -- see https://github.com/influxdata/influxdb_iox/issues/1318

Describe the solution you'd like
I would like the Flight Client / stuff in arrow-rs to handle Dictionary encoded data efficiently and correctly.

Describe alternatives you've considered

Additional context
Possibly related / duplicate: #1206

cc @tustvold and @avantgardnerio

@alamb alamb added enhancement Any new improvement worthy of a entry in the changelog arrow-flight Changes to the arrow-flight crate labels Dec 23, 2022
@alamb alamb mentioned this issue Dec 23, 2022
11 tasks
@alamb alamb changed the title Support DictionaryArrays in ArrowFlight Support DictionaryArrays in Arrow Flight Dec 23, 2022
@alamb
Contributor Author

alamb commented Jan 2, 2023

Here is my high level plan.

Reference: https://arrow.apache.org/docs/format/Columnar.html#serialization-and-interprocess-communication-ipc

  1. Add an option to `FlightIpcEncoder` to "assign dictionary ids based on pointers", defaulting to true
  2. If this option is active, the encoder will keep a mapping from actual pointer of the dictionary to ids
  3. The encoder will check any new dictionary arrays encountered for an existing entry in the map
  4. If the entry already exists, the dictionary batch that is transmitted will use the pre-existing entry in the map
  5. If the entry does not exist, the dictionary will be transmitted and the entry sent with the batch
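
A minimal sketch of the pointer-based id assignment in the steps above, using only the standard library. `DictValues` and `PointerDictTracker` are hypothetical names for illustration, not arrow-rs types, and an `Arc<Vec<String>>` stands in for a dictionary's values. The tracker keeps an owned reference to every dictionary it has seen, so an address cannot be reused by a different dictionary while it is in the map.

```rust
use std::collections::HashMap;
use std::sync::Arc;

/// Stand-in for a dictionary's values (hypothetical; not an arrow-rs type).
type DictValues = Arc<Vec<String>>;

/// Hypothetical tracker that assigns dictionary ids by pointer identity.
#[derive(Default)]
struct PointerDictTracker {
    // Key: address of the values allocation. The stored Arc keeps that
    // allocation alive, so the address cannot be reused by another dictionary.
    seen: HashMap<usize, (i64, DictValues)>,
    next_id: i64,
}

impl PointerDictTracker {
    /// Returns (dictionary id, whether the dictionary must be transmitted).
    fn assign(&mut self, values: &DictValues) -> (i64, bool) {
        let key = Arc::as_ptr(values) as usize;
        if let Some((id, _)) = self.seen.get(&key) {
            // Already transmitted: reuse the existing id.
            return (*id, false);
        }
        let id = self.next_id;
        self.next_id += 1;
        self.seen.insert(key, (id, Arc::clone(values)));
        (id, true)
    }
}
```

Two dictionaries with equal contents but different allocations get different ids here, which is the case the next paragraph discusses.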

There is also the use case where arrays have the same logical dictionary but the contents live in different actual arrays. While I thought about adding a feature to directly "normalize" these dictionaries by comparing their values, I would like to avoid this for the first version if possible because:

  1. It will be non trivially expensive
  2. It can be done as a pre-pass over the record batches for systems that want to trade off additional CPU time for less network bandwidth
  3. I can imagine users might want to do other normalizing operations (like combine / coalesce dictionaries) prior to transmission, but that would be more system specific
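
As a rough illustration of such a pre-pass, the sketch below makes dictionaries with equal contents share a single allocation, so that a pointer-based check would then treat them as the same dictionary. `normalize_by_value` and `DictValues` are hypothetical names, and an `Arc<Vec<String>>` stands in for real dictionary values. Hashing and comparing full contents is where the extra CPU cost comes from.

```rust
use std::collections::HashMap;
use std::sync::Arc;

/// Stand-in for a dictionary's values (hypothetical; not an arrow-rs type).
type DictValues = Arc<Vec<String>>;

/// Hypothetical pre-pass: dictionaries with equal contents end up sharing
/// one allocation (the first one encountered with those contents).
fn normalize_by_value(dicts: Vec<DictValues>) -> Vec<DictValues> {
    let mut canonical: HashMap<Vec<String>, DictValues> = HashMap::new();
    dicts
        .into_iter()
        .map(|d| {
            // Cloning, hashing and comparing the contents is the expensive part.
            let entry = canonical
                .entry((*d).clone())
                .or_insert_with(|| Arc::clone(&d));
            Arc::clone(&*entry)
        })
        .collect()
}
```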

@tustvold
Contributor

tustvold commented Jan 2, 2023

> Assign dictionary ids based on pointers

I think you will need to keep an owned ArrayData to avoid issues due to pointer reuse, but otherwise this sounds like a good idea. ArrayData::ptr_eq already has the necessary logic.

You might want to think about only keeping the last dictionary for each column, to constrain memory growth and keep things simple (you wouldn't need a map). Whilst this may unnecessarily retransmit a dictionary, I'm not sure that reusing and interleaving dictionaries for the same column is a common enough use case to be concerned with?
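
A small sketch of that simpler variant, assuming one slot per column that holds the last dictionary sent. `LastDictPerColumn` and `DictValues` are hypothetical names for illustration (an `Arc<Vec<String>>` stands in for the owned `ArrayData`); no map is needed, and memory is bounded by one dictionary per column.

```rust
use std::sync::Arc;

/// Stand-in for a dictionary's values (hypothetical; not an arrow-rs type).
type DictValues = Arc<Vec<String>>;

/// Hypothetical encoder state: one slot per column with the last dictionary sent.
struct LastDictPerColumn {
    last: Vec<Option<DictValues>>,
}

impl LastDictPerColumn {
    fn new(num_columns: usize) -> Self {
        Self { last: vec![None; num_columns] }
    }

    /// Returns true if the dictionary for `column` must be (re)transmitted.
    fn needs_send(&mut self, column: usize, values: &DictValues) -> bool {
        let unchanged =
            matches!(&self.last[column], Some(prev) if Arc::ptr_eq(prev, values));
        if !unchanged {
            // Owning the Arc keeps the allocation alive, so the pointer
            // comparison cannot be fooled by address reuse.
            self.last[column] = Some(Arc::clone(values));
        }
        !unchanged
    }
}
```

With this scheme, switching a column from dictionary A to B and back to A retransmits A, which is the trade-off described above.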

@alamb
Contributor Author

alamb commented Jan 2, 2023

> You might want to think about only keeping the last dictionary for each column, to constrain memory growth and keep things simple (you wouldn't need a map).

Good idea. In fact this may be what the IPC format requires (the dictionary_id is per field rather than per batch as I was thinking). I believe the spec calls a new dictionary for the same field a "delta dictionary batch"

However, it appears that the arrow ipc reader doesn't actually support this yet:

    if batch.isDelta() {
        return Err(ArrowError::IoError(
            "delta dictionary batches not supported".to_string(),
        ));
    }

I will probably start with some arrow-ipc test cleanup (which will convince me we have coverage in the integration tests), then move on to adding support for delta dictionary batches in the ipc reader/writer, and then add support to flight.

@tustvold
Contributor

tustvold commented Jan 2, 2023

> In fact this may be what the IPC format requires

Quite possibly. I think the field is per column, and if you want to change the dictionary part way through, you need to reserialize it, even if you are actually reverting to a dictionary sent before.

> I believe the spec calls a new dictionary for the same field a "delta dictionary batch"

The spec states:

> Alternatively, if isDelta is set to false, then the dictionary replaces the existing dictionary for the same ID.

So I don't think we need to support delta dictionaries in order to unblock this ticket, we can just send the entire dictionary again
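
To make the replacement semantics concrete, here is a simplified reader-side sketch. `DictBatch` and `apply` are hypothetical names and this is not the arrow-ipc reader; it only models the rule quoted above (a non-delta batch replaces the dictionary for its id) alongside the delta case, where values are appended to the existing dictionary.

```rust
use std::collections::HashMap;

/// Hypothetical simplified dictionary batch: an id, a delta flag, and values.
struct DictBatch {
    id: i64,
    is_delta: bool,
    values: Vec<String>,
}

/// Applies a dictionary batch to the reader's per-id dictionary state.
fn apply(state: &mut HashMap<i64, Vec<String>>, batch: DictBatch) {
    if batch.is_delta {
        // Delta: append to the existing dictionary for this id.
        state.entry(batch.id).or_default().extend(batch.values);
    } else {
        // Non-delta: replace whatever was stored for this id.
        state.insert(batch.id, batch.values);
    }
}
```

A writer that only ever emits non-delta batches therefore works with a reader that rejects delta batches, at the cost of resending whole dictionaries.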

@e-dard
Contributor

e-dard commented Feb 2, 2023

I ran into this today when I was surprised to find all my dictionaries were Utf8 on the server side of the Flight connection. +💯 on getting dictionary transmission support in Flight.

@alamb
Contributor Author

alamb commented Feb 2, 2023

We would, of course, welcome contributions in this area. I hope to eventually implement this, as it will improve the iox query performance / latency with data from the ingester, but I haven't had a chance yet.

@alexwilcoxson-rel
Contributor

Hey folks, we're looking at needing to leverage dictionaries in flight to reduce memory from low cardinality wide columns.

I have been tinkering with a local branch and have an option on the flight encoder to skip dictionary hydration and also allow dictionaries to be resent/replaced with each batch. Still doing some testing with our use case to see if it's viable, but would this be acceptable for an interim option before some of this other proposed work can be picked up?
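
Roughly, the two behaviours could be modelled like this. This is an illustrative standard-library sketch, not the code on the branch; `DictMode`, `Wire`, and `encode_column` are hypothetical names. Hydration expands the keys into plain values, while the resend mode ships the dictionary values and keys together with every batch.

```rust
/// Hypothetical option controlling how a dictionary column is sent.
#[derive(Clone, Copy)]
enum DictMode {
    /// Decode ("hydrate") the dictionary into plain values before sending.
    Hydrate,
    /// Keep the encoding and resend the dictionary with every batch.
    ResendEachBatch,
}

/// What goes on the wire for one dictionary column of one batch (simplified).
#[derive(Debug, PartialEq)]
enum Wire {
    Plain(Vec<String>),
    Dictionary { values: Vec<String>, keys: Vec<usize> },
}

fn encode_column(mode: DictMode, values: &[String], keys: &[usize]) -> Wire {
    match mode {
        // One full string per row: simple, but large for low cardinality data.
        DictMode::Hydrate => {
            Wire::Plain(keys.iter().map(|&k| values[k].clone()).collect())
        }
        // Each distinct value once per batch, plus one small key per row.
        DictMode::ResendEachBatch => Wire::Dictionary {
            values: values.to_vec(),
            keys: keys.to_vec(),
        },
    }
}
```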

@tustvold
Contributor

tustvold commented Sep 26, 2023

I think an option to send the dictionaries with every batch sounds reasonable to me. Whilst perhaps not "optimal", there are many cases in which it will be better than the current behaviour, and we can always revisit this later 👍

@alamb
Contributor Author

alamb commented Sep 26, 2023

> I think an option to send the dictionaries every batch sounds reasonable to me, whilst perhaps not "optimal", there are many cases in which case it will be better than the current behaviour, and we can always revisit this later 👍

I agree. Excited to see how that looks @alexwilcoxson-rel . Thank you 🙏

@alexwilcoxson-rel
Contributor

Opened issue #4895 and PR #4896
