-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Go][Parquet] Arrow DATE64 type is coerced into Parquet TIMESTAMP[ms] logical type instead of DATE (32-bit) #39456
Comments
I'm planning to open a PR with the proposed change. |
kou
changed the title
[GO][PARQUET] Arrow DATE64 type is coerced into Parquet TIMESTAMP[ms] logical type instead of DATE (32-bit)
[Go][Parquet] Arrow DATE64 type is coerced into Parquet TIMESTAMP[ms] logical type instead of DATE (32-bit)
Jan 4, 2024
zeroshade
pushed a commit
that referenced
this issue
Jan 9, 2024
…ical Type (#39460) ### Rationale for this change Closes: #39456 ### What changes are included in this PR? Update physical and logical type mapping from Arrow to Parquet for DATE64 type ### Are these changes tested? Yes, - Update expected schema mapping in existing test - Tests asserting new behavior - Arrow DATE64 will roundtrip -> Parquet -> Arrow as DATE32 - Arrow DATE64 _not aligned_ to exact date boundary will truncate to milliseconds at boundary of greatest full day on Parquet roundtrip ### Are there any user-facing changes? Yes, users of `pqarrow.FileWriter` will produce Parquet files containing `DATE` logical type instead of `TIMESTAMP[ms]` when writing Arrow data containing DATE64 field(s). The proposed implementation truncates `int64` values to be divisible by 86400000 rather than validating that this is already the case, as some implementations do. I am happy to add this validation if it would be preferred, but the truncating behavior will likely break fewer existing users. I'm not sure whether this is technically considered a breaking change to a public API and if/how it should be communicated. Any direction regarding this would be appreciated. * Closes: #39456 Authored-by: Joel Lubinitsky <[email protected]> Signed-off-by: Matt Topol <[email protected]>
clayburn
pushed a commit
to clayburn/arrow
that referenced
this issue
Jan 23, 2024
…TE Logical Type (apache#39460) ### Rationale for this change Closes: apache#39456 ### What changes are included in this PR? Update physical and logical type mapping from Arrow to Parquet for DATE64 type ### Are these changes tested? Yes, - Update expected schema mapping in existing test - Tests asserting new behavior - Arrow DATE64 will roundtrip -> Parquet -> Arrow as DATE32 - Arrow DATE64 _not aligned_ to exact date boundary will truncate to milliseconds at boundary of greatest full day on Parquet roundtrip ### Are there any user-facing changes? Yes, users of `pqarrow.FileWriter` will produce Parquet files containing `DATE` logical type instead of `TIMESTAMP[ms]` when writing Arrow data containing DATE64 field(s). The proposed implementation truncates `int64` values to be divisible by 86400000 rather than validating that this is already the case, as some implementations do. I am happy to add this validation if it would be preferred, but the truncating behavior will likely break fewer existing users. I'm not sure whether this is technically considered a breaking change to a public API and if/how it should be communicated. Any direction regarding this would be appreciated. * Closes: apache#39456 Authored-by: Joel Lubinitsky <[email protected]> Signed-off-by: Matt Topol <[email protected]>
zeroshade
pushed a commit
to apache/arrow-adbc
that referenced
this issue
Jan 26, 2024
# What - Replace Snowflake bulk ingestion with Parquet-based approach with higher throughput and better type support - Previously: INSERT bind parameters were uploaded to a CSV-based stage, once per record batch - Now: Parquet files written concurrently to stage independently of record batch size. Parquet logical types are used to infer schema on COPY. - Tests to validate type support and consistency through Arrow -> Parquet -> Snowflake -> Arrow roundtrip - Improved type mapping between Arrow <-> Snowflake timestamps. [TIMESTAMP_LTZ](https://docs.snowflake.com/en/sql-reference/data-types-datetime#timestamp-ltz-timestamp-ntz-timestamp-tz) is more consistent with Arrow timestamp semantics than TIMESTAMP_TZ, which can lead to lossy roundtrips. - Minor bugfix where Snowflake local timestamps with timezone set to UTC were being interpreted as non-local. # Why - Implements #1327, which comes from improvement request #1322 - BindStream ingestion is significantly faster - Arrow type support is improved # Methodology The general approach for ingestion is most clearly demonstrated by the path taken when `stmt.Bind()` for a single record is used: ### IngestRecord ```mermaid flowchart LR A(Record) --> B(Write Parquet) B --> C(Upload File) C --> D(Execute COPY) D --> E(Check Row Count) ``` The Arrow record is written to a Parquet file due to its logical type support, compressibility, and native Snowflake support. The file is then uploaded to a temporary Snowflake stage via PUT query, and then loaded into the target table via COPY query. Once the COPY has finished, one more query to check the resulting row count is dispatched to accurately return the number of rows affected. This is used instead of counting the Arrow rows written in case there are any undetected losses when importing the uploaded file into Snowflake. A similar approach is taken when ingesting an arbitrarily large stream of records via `stmt.BindStream()`, but makes use of several opportunities to parallelize the work involved at different stages: ### IngestStream ```mermaid flowchart LR A(Read Records) --> B(Write Parquet) A --> C(Write Parquet) A --> D(Write Parquet) A --> E(Write Parquet) B --> J(Buffer Pool) C --> J D --> J E --> J J --> K(Upload File) J --> L(Upload File) K --> M(Finalize COPY) L --> M M --> N(Check Row Count) O(File Ready) --> P(Execute COPY) P --> O ``` The same steps are used, but the stream of records is now distributed among a pool of Parquet writers. This step is inherently CPU-bound, so it is desirable for it to scale independently with the availability of logical cores for writing/compression. These Parquet files are written to a buffer pool in memory to help decouple the upload stage from writing, and so that a writer can start working on the next file _while_ the last file it wrote is being uploaded. Uploads from the buffer pool also benefit from parallelism, but more so to maximize network utilization by limiting idle time between uploads and amortizing potential slowdown in any one upload. Technically, only a single COPY command is required after the last file is uploaded in order to load the Parquet files into the Snowflake table. However, on many warehouses this operation takes as long or even longer than the upload itself but can be made faster by paying for a larger warehouse. Given the batched approach taken and that the COPY command is idempotent, we can execute COPY repeatedly as files are uploaded to load them into the table on an ongoing basis. These COPY queries are executed asynchronously and listen for an upload-completed callback to ensure at least one file will be loaded by the query (otherwise it will no-op so this just prevents spamming Snowflake with a bunch of no-op COPYs). Empirically, ingestion works reasonably well on an XS warehouse. COPY speed is no longer a bottleneck with an S warehouse with high-speed home internet, or on an M warehouse with same-region data center networking. # Performance Running on GCP e2-medium (shared-core 1 vCPU, 4GB RAM) Snowflake warehouse size M, same GCP region as Snowflake account Default ingestion settings Benchmarking TPC-H Lineitem @ SF1 (6M Rows): - Current: 11m50s - New: 14s Benchmarking TPC-H Lineitem @ SF10 (60M Rows): - Current: Didn't attempt - New: 1m16s _This configuration is CPU bound, so I did another attempt with more cores available..._ Now with GCP e2-standard-4 (4 vCPU, 16GB RAM) Benchmarking TPC-H Lineitem @ SF1 (6M Rows): - Current: 11m17s - New: 9.5s Benchmarking TPC-H Lineitem @ SF10 (60M Rows): - Current: 1h47m - New: 45s # Considerations - Snowflake [guides](https://community.snowflake.com/s/article/How-to-Load-Terabytes-Into-Snowflake-Speeds-Feeds-and-Techniques) indicate that ingestion via CSV is the fastest. Experimentally, it does appear to be true that a COPY query on staged CSV files executes much faster than for Parquet files. However by distributing the COPY workloads _in parallel to_ the batched file uploads, overall performance is better with Parquet since it can be compressed _much_ more efficiently allowing the upload to complete in less time and with fewer bytes transferred than with CSV. Type support is also much better. - Single-Record ingestion performance is slightly worse than the previous INSERT-bind approach. As a rough idea, a record that previously ingested in about 1.7s now ingests in about 2.5s. However, the new approach does come with expanded type support and better consistency with the streaming approach. - An ingestion run that fails part-way through may leave the table with partial results. Transaction semantics may be added in the future by overriding the CopyConcurrency parameter to be 0, in which case only the final COPY will execute. # Additional Work ### Blocking - ~Timestamps will roundtrip properly after Arrow [GH-39466](apache/arrow#39466) is closed. A test is included but skipped for now.~ - ~Date64 will roundtrip properly after Arrow [GH-39456](apache/arrow#39456) is closed. A test is included but skipped for now.~ ### Non-Blocking - Compression codec and level are included in `ingestOptions` but are not configurable using `stmt.SetOption()`. It is trivial to add this, but it would be nice to be able to use the currently internal [CompressionCodecFromString](https://github.com/apache/arrow/blob/e6323646558ee01234ce58af273c5a834745f298/go/parquet/internal/gen-go/parquet/parquet.go#L387-L399) method to automatically pick up support for any other codecs added in the future. Captured in #1473. - List and Map types have some issues on ingestion. Snowflake returns `SQL execution internal error` whenever repetition level is greater than 0. Still some more investigation to do here. This is non-blocking because neither type was previously supported for ingestion. - Context cancelation is supported for all goroutines and queries executed as part of ingestion, _except_ for the PUT query (i.e. file uploads). This issue is being tracked in gosnowflake [1028](snowflakedb/gosnowflake#1028). In practice, it likely takes just a few seconds for in-progress uploads to complete and properly conclude cancelation. Once this issue is fixed, the queries would be canceled in Snowflake, allowing the process to exit faster and reduce unnecessary work. - ~The code previously meant to map Snowflake types to Go types is no longer used. It may still be useful for binding an Arrow record to an arbitrary Update query, but `stmt.Prepare` should be implemented first to follow ADBC spec for binding parameters.~
soumyadsanyal
pushed a commit
to soumyadsanyal/arrow-adbc
that referenced
this issue
Jan 31, 2024
…1456) # What - Replace Snowflake bulk ingestion with Parquet-based approach with higher throughput and better type support - Previously: INSERT bind parameters were uploaded to a CSV-based stage, once per record batch - Now: Parquet files written concurrently to stage independently of record batch size. Parquet logical types are used to infer schema on COPY. - Tests to validate type support and consistency through Arrow -> Parquet -> Snowflake -> Arrow roundtrip - Improved type mapping between Arrow <-> Snowflake timestamps. [TIMESTAMP_LTZ](https://docs.snowflake.com/en/sql-reference/data-types-datetime#timestamp-ltz-timestamp-ntz-timestamp-tz) is more consistent with Arrow timestamp semantics than TIMESTAMP_TZ, which can lead to lossy roundtrips. - Minor bugfix where Snowflake local timestamps with timezone set to UTC were being interpreted as non-local. # Why - Implements apache#1327, which comes from improvement request apache#1322 - BindStream ingestion is significantly faster - Arrow type support is improved # Methodology The general approach for ingestion is most clearly demonstrated by the path taken when `stmt.Bind()` for a single record is used: ### IngestRecord ```mermaid flowchart LR A(Record) --> B(Write Parquet) B --> C(Upload File) C --> D(Execute COPY) D --> E(Check Row Count) ``` The Arrow record is written to a Parquet file due to its logical type support, compressibility, and native Snowflake support. The file is then uploaded to a temporary Snowflake stage via PUT query, and then loaded into the target table via COPY query. Once the COPY has finished, one more query to check the resulting row count is dispatched to accurately return the number of rows affected. This is used instead of counting the Arrow rows written in case there are any undetected losses when importing the uploaded file into Snowflake. A similar approach is taken when ingesting an arbitrarily large stream of records via `stmt.BindStream()`, but makes use of several opportunities to parallelize the work involved at different stages: ### IngestStream ```mermaid flowchart LR A(Read Records) --> B(Write Parquet) A --> C(Write Parquet) A --> D(Write Parquet) A --> E(Write Parquet) B --> J(Buffer Pool) C --> J D --> J E --> J J --> K(Upload File) J --> L(Upload File) K --> M(Finalize COPY) L --> M M --> N(Check Row Count) O(File Ready) --> P(Execute COPY) P --> O ``` The same steps are used, but the stream of records is now distributed among a pool of Parquet writers. This step is inherently CPU-bound, so it is desirable for it to scale independently with the availability of logical cores for writing/compression. These Parquet files are written to a buffer pool in memory to help decouple the upload stage from writing, and so that a writer can start working on the next file _while_ the last file it wrote is being uploaded. Uploads from the buffer pool also benefit from parallelism, but more so to maximize network utilization by limiting idle time between uploads and amortizing potential slowdown in any one upload. Technically, only a single COPY command is required after the last file is uploaded in order to load the Parquet files into the Snowflake table. However, on many warehouses this operation takes as long or even longer than the upload itself but can be made faster by paying for a larger warehouse. Given the batched approach taken and that the COPY command is idempotent, we can execute COPY repeatedly as files are uploaded to load them into the table on an ongoing basis. These COPY queries are executed asynchronously and listen for an upload-completed callback to ensure at least one file will be loaded by the query (otherwise it will no-op so this just prevents spamming Snowflake with a bunch of no-op COPYs). Empirically, ingestion works reasonably well on an XS warehouse. COPY speed is no longer a bottleneck with an S warehouse with high-speed home internet, or on an M warehouse with same-region data center networking. # Performance Running on GCP e2-medium (shared-core 1 vCPU, 4GB RAM) Snowflake warehouse size M, same GCP region as Snowflake account Default ingestion settings Benchmarking TPC-H Lineitem @ SF1 (6M Rows): - Current: 11m50s - New: 14s Benchmarking TPC-H Lineitem @ SF10 (60M Rows): - Current: Didn't attempt - New: 1m16s _This configuration is CPU bound, so I did another attempt with more cores available..._ Now with GCP e2-standard-4 (4 vCPU, 16GB RAM) Benchmarking TPC-H Lineitem @ SF1 (6M Rows): - Current: 11m17s - New: 9.5s Benchmarking TPC-H Lineitem @ SF10 (60M Rows): - Current: 1h47m - New: 45s # Considerations - Snowflake [guides](https://community.snowflake.com/s/article/How-to-Load-Terabytes-Into-Snowflake-Speeds-Feeds-and-Techniques) indicate that ingestion via CSV is the fastest. Experimentally, it does appear to be true that a COPY query on staged CSV files executes much faster than for Parquet files. However by distributing the COPY workloads _in parallel to_ the batched file uploads, overall performance is better with Parquet since it can be compressed _much_ more efficiently allowing the upload to complete in less time and with fewer bytes transferred than with CSV. Type support is also much better. - Single-Record ingestion performance is slightly worse than the previous INSERT-bind approach. As a rough idea, a record that previously ingested in about 1.7s now ingests in about 2.5s. However, the new approach does come with expanded type support and better consistency with the streaming approach. - An ingestion run that fails part-way through may leave the table with partial results. Transaction semantics may be added in the future by overriding the CopyConcurrency parameter to be 0, in which case only the final COPY will execute. # Additional Work ### Blocking - ~Timestamps will roundtrip properly after Arrow [GH-39466](apache/arrow#39466) is closed. A test is included but skipped for now.~ - ~Date64 will roundtrip properly after Arrow [GH-39456](apache/arrow#39456) is closed. A test is included but skipped for now.~ ### Non-Blocking - Compression codec and level are included in `ingestOptions` but are not configurable using `stmt.SetOption()`. It is trivial to add this, but it would be nice to be able to use the currently internal [CompressionCodecFromString](https://github.com/apache/arrow/blob/e6323646558ee01234ce58af273c5a834745f298/go/parquet/internal/gen-go/parquet/parquet.go#L387-L399) method to automatically pick up support for any other codecs added in the future. Captured in apache#1473. - List and Map types have some issues on ingestion. Snowflake returns `SQL execution internal error` whenever repetition level is greater than 0. Still some more investigation to do here. This is non-blocking because neither type was previously supported for ingestion. - Context cancelation is supported for all goroutines and queries executed as part of ingestion, _except_ for the PUT query (i.e. file uploads). This issue is being tracked in gosnowflake [1028](snowflakedb/gosnowflake#1028). In practice, it likely takes just a few seconds for in-progress uploads to complete and properly conclude cancelation. Once this issue is fixed, the queries would be canceled in Snowflake, allowing the process to exit faster and reduce unnecessary work. - ~The code previously meant to map Snowflake types to Go types is no longer used. It may still be useful for binding an Arrow record to an arbitrary Update query, but `stmt.Prepare` should be implemented first to follow ADBC spec for binding parameters.~
dgreiss
pushed a commit
to dgreiss/arrow
that referenced
this issue
Feb 19, 2024
…TE Logical Type (apache#39460) ### Rationale for this change Closes: apache#39456 ### What changes are included in this PR? Update physical and logical type mapping from Arrow to Parquet for DATE64 type ### Are these changes tested? Yes, - Update expected schema mapping in existing test - Tests asserting new behavior - Arrow DATE64 will roundtrip -> Parquet -> Arrow as DATE32 - Arrow DATE64 _not aligned_ to exact date boundary will truncate to milliseconds at boundary of greatest full day on Parquet roundtrip ### Are there any user-facing changes? Yes, users of `pqarrow.FileWriter` will produce Parquet files containing `DATE` logical type instead of `TIMESTAMP[ms]` when writing Arrow data containing DATE64 field(s). The proposed implementation truncates `int64` values to be divisible by 86400000 rather than validating that this is already the case, as some implementations do. I am happy to add this validation if it would be preferred, but the truncating behavior will likely break fewer existing users. I'm not sure whether this is technically considered a breaking change to a public API and if/how it should be communicated. Any direction regarding this would be appreciated. * Closes: apache#39456 Authored-by: Joel Lubinitsky <[email protected]> Signed-off-by: Matt Topol <[email protected]>
zanmato1984
pushed a commit
to zanmato1984/arrow
that referenced
this issue
Feb 28, 2024
…TE Logical Type (apache#39460) ### Rationale for this change Closes: apache#39456 ### What changes are included in this PR? Update physical and logical type mapping from Arrow to Parquet for DATE64 type ### Are these changes tested? Yes, - Update expected schema mapping in existing test - Tests asserting new behavior - Arrow DATE64 will roundtrip -> Parquet -> Arrow as DATE32 - Arrow DATE64 _not aligned_ to exact date boundary will truncate to milliseconds at boundary of greatest full day on Parquet roundtrip ### Are there any user-facing changes? Yes, users of `pqarrow.FileWriter` will produce Parquet files containing `DATE` logical type instead of `TIMESTAMP[ms]` when writing Arrow data containing DATE64 field(s). The proposed implementation truncates `int64` values to be divisible by 86400000 rather than validating that this is already the case, as some implementations do. I am happy to add this validation if it would be preferred, but the truncating behavior will likely break fewer existing users. I'm not sure whether this is technically considered a breaking change to a public API and if/how it should be communicated. Any direction regarding this would be appreciated. * Closes: apache#39456 Authored-by: Joel Lubinitsky <[email protected]> Signed-off-by: Matt Topol <[email protected]>
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Describe the bug, including details regarding any error messages, version, and platform.
The Parquet DATE logical type must annotate an
int32
representing days since the UNIX epoc per the spec. The Arrow DATE64 (ms since UNIX epoch) type does not have a direct analog in Parquet, so it must be coerced into a compatible representation when writing Arrow data to Parquet.The prevailing convention is to coerce DATE64 to
int32
seconds since the UNIX epoch (Parquet DATE logical type) [e.g. C++, Rust]. The behavior for handling anint64
value not on a date boundary (i.e. not divisible by 86400000) is not defined. Some implementations validate this condition while others truncate to the date the physical value falls within.The current Go implementation diverges from the approach followed by these languages, coercing instead to a UTC-normalized TIMESTAMP[ms]. This may lead to surprising behavior in cross-language use-cases and alters the original semantics of the type (at least for non-arrow consumers that don't handle store_schema). It seems that it would increase overall compatibility in the ecosystem to align Go to the convention currently followed in the other implementations.
See also: https://lists.apache.org/thread/q036r1q3cw5ysn3zkpvljx3s9ho18419
Component(s)
Go, Parquet
The text was updated successfully, but these errors were encountered: