
Commit

Merge branch 'main' of github.com:apache/datafusion into dev/xinli/fix-join
xinlifoobar committed Jul 23, 2024
2 parents 02c3c3d + 77311a5 commit e8b097b
Showing 196 changed files with 5,287 additions and 3,631 deletions.
1 change: 1 addition & 0 deletions .gitattributes
@@ -1,3 +1,4 @@
.github/ export-ignore
datafusion/core/tests/data/newlines_in_values.csv text eol=lf
datafusion/proto/src/generated/prost.rs linguist-generated
datafusion/proto/src/generated/pbjson.rs linguist-generated
55 changes: 55 additions & 0 deletions .github/workflows/large_files.yml
@@ -0,0 +1,55 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

name: Large files PR check

concurrency:
  group: ${{ github.repository }}-${{ github.head_ref || github.sha }}-${{ github.workflow }}
  cancel-in-progress: true

on:
  pull_request:

jobs:
  check-files:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
        with:
          fetch-depth: 0
      - name: Check size of new Git objects
        env:
          # 1 MB ought to be enough for anybody.
          # TODO in case we may want to consciously commit a bigger file to the repo without using Git LFS we may disable the check e.g. with a label
          MAX_FILE_SIZE_BYTES: 1048576
        shell: bash
        run: |
          git rev-list --objects ${{ github.event.pull_request.base.sha }}..${{ github.event.pull_request.head.sha }} \
            > pull-request-objects.txt
          exit_code=0
          while read -r id path; do
            # Skip objects which are not files (commits, trees)
            if [ ! -z "${path}" ]; then
              size="$(git cat-file -s "${id}")"
              if [ "${size}" -gt "${MAX_FILE_SIZE_BYTES}" ]; then
                exit_code=1
                echo "Object ${id} [${path}] has size ${size}, exceeding ${MAX_FILE_SIZE_BYTES} limit." >&2
                echo "::error file=${path}::File ${path} has size ${size}, exceeding ${MAX_FILE_SIZE_BYTES} limit."
              fi
            fi
          done < pull-request-objects.txt
          exit "${exit_code}"
19 changes: 11 additions & 8 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default.

10 changes: 3 additions & 7 deletions datafusion-examples/examples/expr_api.rs
@@ -177,16 +177,12 @@ fn simplify_demo() -> Result<()> {
);

// here are some other examples of what DataFusion is capable of
let schema = Schema::new(vec![
make_field("i", DataType::Int64),
make_field("b", DataType::Boolean),
])
.to_dfschema_ref()?;
let schema = Schema::new(vec![make_field("i", DataType::Int64)]).to_dfschema_ref()?;
let context = SimplifyContext::new(&props).with_schema(schema.clone());
let simplifier = ExprSimplifier::new(context);

// basic arithmetic simplification
// i + 1 + 2 => a + 3
// i + 1 + 2 => i + 3
// (note this is not done if the expr is (col("i") + (lit(1) + lit(2))))
assert_eq!(
simplifier.simplify(col("i") + (lit(1) + lit(2)))?,
@@ -209,7 +205,7 @@
);

// String --> Date simplification
// `cast('2020-09-01' as date)` --> 18500
// `cast('2020-09-01' as date)` --> 18506 # number of days since epoch 1970-01-01
assert_eq!(
simplifier.simplify(lit("2020-09-01").cast_to(&DataType::Date32, &schema)?)?,
lit(ScalarValue::Date32(Some(18506)))
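For reference, here is a small standalone sketch (not part of this diff, and assuming the chrono crate) that reproduces the folded constant: Date32 stores the number of days since the Unix epoch, and 2020-09-01 is 18,506 days after 1970-01-01.

use chrono::NaiveDate;

fn main() {
    // Days between the Unix epoch and 2020-09-01; this is the constant the
    // simplifier folds `cast('2020-09-01' as date)` into.
    let epoch = NaiveDate::from_ymd_opt(1970, 1, 1).unwrap();
    let date = NaiveDate::from_ymd_opt(2020, 9, 1).unwrap();
    let days = (date - epoch).num_days();
    assert_eq!(days, 18506); // matches ScalarValue::Date32(Some(18506))
}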
93 changes: 64 additions & 29 deletions datafusion/common/src/config.rs
@@ -184,6 +184,16 @@ config_namespace! {
/// Default value for `format.has_header` for `CREATE EXTERNAL TABLE`
/// if not specified explicitly in the statement.
pub has_header: bool, default = false

/// Specifies whether newlines in (quoted) CSV values are supported.
///
/// This is the default value for `format.newlines_in_values` for `CREATE EXTERNAL TABLE`
/// if not specified explicitly in the statement.
///
/// Parsing newlines in quoted values may be affected by execution behaviour such as
/// parallel file scanning. Setting this to `true` ensures that newlines in values are
/// parsed successfully, which may reduce performance.
pub newlines_in_values: bool, default = false
}
}
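As a usage illustration (not part of this diff), the new catalog default can be flipped on a session before creating CSV tables. The builder calls below are assumptions based on the existing string-key SessionConfig API; the key name comes from the option above, and the table path is only illustrative.

use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    // Assumed usage: make `format.newlines_in_values` default to true for
    // CREATE EXTERNAL TABLE statements issued through this session.
    let config = SessionConfig::new().set_bool("datafusion.catalog.newlines_in_values", true);
    let ctx = SessionContext::new_with_config(config);
    // A statement that sets `format.newlines_in_values` explicitly in its
    // OPTIONS clause would override this session-level default.
    ctx.sql("CREATE EXTERNAL TABLE t STORED AS CSV LOCATION 'tests/data/newlines_in_values.csv'")
        .await?;
    Ok(())
}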

@@ -315,121 +325,124 @@
}

config_namespace! {
/// Options related to parquet files
/// Options for reading and writing parquet files
///
/// See also: [`SessionConfig`]
///
/// [`SessionConfig`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionConfig.html
pub struct ParquetOptions {
/// If true, reads the Parquet data page level metadata (the
// The following options affect reading parquet files

/// (reading) If true, reads the Parquet data page level metadata (the
/// Page Index), if present, to reduce the I/O and number of
/// rows decoded.
pub enable_page_index: bool, default = true

/// If true, the parquet reader attempts to skip entire row groups based
/// (reading) If true, the parquet reader attempts to skip entire row groups based
/// on the predicate in the query and the metadata (min/max values) stored in
/// the parquet file
pub pruning: bool, default = true

/// If true, the parquet reader skips the optional embedded metadata that may be in
/// (reading) If true, the parquet reader skips the optional embedded metadata that may be in
/// the file Schema. This setting can help avoid schema conflicts when querying
/// multiple parquet files with schemas containing compatible types but different metadata
pub skip_metadata: bool, default = true

/// If specified, the parquet reader will try and fetch the last `size_hint`
/// (reading) If specified, the parquet reader will try and fetch the last `size_hint`
/// bytes of the parquet file optimistically. If not specified, two reads are required:
/// One read to fetch the 8-byte parquet footer and
/// another to fetch the metadata length encoded in the footer
pub metadata_size_hint: Option<usize>, default = None

/// If true, filter expressions are applied during the parquet decoding operation to
/// (reading) If true, filter expressions are applied during the parquet decoding operation to
/// reduce the number of rows decoded. This optimization is sometimes called "late materialization".
pub pushdown_filters: bool, default = false

/// If true, filter expressions evaluated during the parquet decoding operation
/// (reading) If true, filter expressions evaluated during the parquet decoding operation
/// will be reordered heuristically to minimize the cost of evaluation. If false,
/// the filters are applied in the same order as written in the query
pub reorder_filters: bool, default = false

// The following map to parquet::file::properties::WriterProperties
// The following options affect writing to parquet files
// and map to parquet::file::properties::WriterProperties

/// Sets best effort maximum size of data page in bytes
/// (writing) Sets best effort maximum size of data page in bytes
pub data_pagesize_limit: usize, default = 1024 * 1024

/// Sets write_batch_size in bytes
/// (writing) Sets write_batch_size in bytes
pub write_batch_size: usize, default = 1024

/// Sets parquet writer version
/// (writing) Sets parquet writer version
/// valid values are "1.0" and "2.0"
pub writer_version: String, default = "1.0".into()

/// Sets default parquet compression codec
/// (writing) Sets default parquet compression codec.
/// Valid values are: uncompressed, snappy, gzip(level),
/// lzo, brotli(level), lz4, zstd(level), and lz4_raw.
/// These values are not case sensitive. If NULL, uses
/// default parquet writer setting
pub compression: Option<String>, default = Some("zstd(3)".into())

/// Sets if dictionary encoding is enabled. If NULL, uses
/// (writing) Sets if dictionary encoding is enabled. If NULL, uses
/// default parquet writer setting
pub dictionary_enabled: Option<bool>, default = None

/// Sets best effort maximum dictionary page size, in bytes
/// (writing) Sets best effort maximum dictionary page size, in bytes
pub dictionary_page_size_limit: usize, default = 1024 * 1024

/// Sets if statistics are enabled for any column
/// (writing) Sets if statistics are enabled for any column
/// Valid values are: "none", "chunk", and "page"
/// These values are not case sensitive. If NULL, uses
/// default parquet writer setting
pub statistics_enabled: Option<String>, default = None

/// Sets max statistics size for any column. If NULL, uses
/// (writing) Sets max statistics size for any column. If NULL, uses
/// default parquet writer setting
pub max_statistics_size: Option<usize>, default = None

/// Target maximum number of rows in each row group (defaults to 1M
/// (writing) Target maximum number of rows in each row group (defaults to 1M
/// rows). Writing larger row groups requires more memory to write, but
/// can get better compression and be faster to read.
pub max_row_group_size: usize, default = 1024 * 1024

/// Sets "created by" property
/// (writing) Sets "created by" property
pub created_by: String, default = concat!("datafusion version ", env!("CARGO_PKG_VERSION")).into()

/// Sets column index truncate length
/// (writing) Sets column index truncate length
pub column_index_truncate_length: Option<usize>, default = None

/// Sets best effort maximum number of rows in data page
/// (writing) Sets best effort maximum number of rows in data page
pub data_page_row_count_limit: usize, default = usize::MAX

/// Sets default encoding for any column
/// (writing) Sets default encoding for any column.
/// Valid values are: plain, plain_dictionary, rle,
/// bit_packed, delta_binary_packed, delta_length_byte_array,
/// delta_byte_array, rle_dictionary, and byte_stream_split.
/// These values are not case sensitive. If NULL, uses
/// default parquet writer setting
pub encoding: Option<String>, default = None

/// Use any available bloom filters when reading parquet files
/// (reading) Use any available bloom filters when reading parquet files
pub bloom_filter_on_read: bool, default = true

/// Write bloom filters for all columns when creating parquet files
/// (writing) Write bloom filters for all columns when creating parquet files
pub bloom_filter_on_write: bool, default = false

/// Sets bloom filter false positive probability. If NULL, uses
/// (writing) Sets bloom filter false positive probability. If NULL, uses
/// default parquet writer setting
pub bloom_filter_fpp: Option<f64>, default = None

/// Sets bloom filter number of distinct values. If NULL, uses
/// (writing) Sets bloom filter number of distinct values. If NULL, uses
/// default parquet writer setting
pub bloom_filter_ndv: Option<u64>, default = None

/// Controls whether DataFusion will attempt to speed up writing
/// (writing) Controls whether DataFusion will attempt to speed up writing
/// parquet files by serializing them in parallel. Each column
/// in each row group in each output file are serialized in parallel
/// leveraging a maximum possible core count of n_files*n_row_groups*n_columns.
pub allow_single_file_parallelism: bool, default = true

/// By default parallel parquet writer is tuned for minimum
/// (writing) By default parallel parquet writer is tuned for minimum
/// memory usage in a streaming execution plan. You may see
/// a performance benefit when writing large parquet files
/// by increasing maximum_parallel_row_group_writers and
@@ -440,7 +453,7 @@ config_namespace! {
/// data frame.
pub maximum_parallel_row_group_writers: usize, default = 1

/// By default parallel parquet writer is tuned for minimum
/// (writing) By default parallel parquet writer is tuned for minimum
/// memory usage in a streaming execution plan. You may see
/// a performance benefit when writing large parquet files
/// by increasing maximum_parallel_row_group_writers and
@@ -450,7 +463,6 @@ config_namespace! {
/// writing out already in-memory data, such as from a cached
/// data frame.
pub maximum_buffered_record_batches_per_stream: usize, default = 2

}
}
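To make the (reading)/(writing) split concrete, here is a brief sketch (not part of this diff) that sets a few of the fields above through the session's typed config accessors; the accessor path is an assumption, but the field names and types are the ones declared in ParquetOptions.

use datafusion::prelude::*;

fn main() {
    let mut config = SessionConfig::new();
    // Assumed accessor path to the ParquetOptions held by the session config.
    let parquet = &mut config.options_mut().execution.parquet;
    // (reading) evaluate predicates while decoding to skip rows early
    parquet.pushdown_filters = true;
    // (writing) larger row groups: better compression, more write-side memory
    parquet.max_row_group_size = 2 * 1024 * 1024;
    // (writing) override the default compression codec for newly written files
    parquet.compression = Some("zstd(5)".into());
    let _ctx = SessionContext::new_with_config(config);
}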

@@ -1534,6 +1546,9 @@ macro_rules! config_namespace_with_hashmap {
}

config_namespace_with_hashmap! {
/// Options controlling parquet format for individual columns.
///
/// See [`ParquetOptions`] for more details
pub struct ColumnOptions {
/// Sets if bloom filter is enabled for the column path.
pub bloom_filter_enabled: Option<bool>, default = None
@@ -1588,6 +1603,14 @@ config_namespace! {
pub quote: u8, default = b'"'
pub escape: Option<u8>, default = None
pub double_quote: Option<bool>, default = None
/// Specifies whether newlines in (quoted) values are supported.
///
/// Parsing newlines in quoted values may be affected by execution behaviour such as
/// parallel file scanning. Setting this to `true` ensures that newlines in values are
/// parsed successfully, which may reduce performance.
///
/// The default behaviour depends on the `datafusion.catalog.newlines_in_values` setting.
pub newlines_in_values: Option<bool>, default = None
pub compression: CompressionTypeVariant, default = CompressionTypeVariant::UNCOMPRESSED
pub schema_infer_max_rec: usize, default = 100
pub date_format: Option<String>, default = None
@@ -1660,6 +1683,18 @@ impl CsvOptions {
self
}

/// Specifies whether newlines in (quoted) values are supported.
///
/// Parsing newlines in quoted values may be affected by execution behaviour such as
/// parallel file scanning. Setting this to `true` ensures that newlines in values are
/// parsed successfully, which may reduce performance.
///
/// The default behaviour depends on the `datafusion.catalog.newlines_in_values` setting.
pub fn with_newlines_in_values(mut self, newlines_in_values: bool) -> Self {
self.newlines_in_values = Some(newlines_in_values);
self
}

/// Set a `CompressionTypeVariant` of CSV
/// - defaults to `CompressionTypeVariant::UNCOMPRESSED`
pub fn with_file_compression_type(
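A minimal sketch of the new builder in use (not part of this diff): `with_newlines_in_values` is the method added above, while `CsvOptions::default()` and the public field access are assumptions based on how the config_namespace! macro expands.

use datafusion_common::config::CsvOptions;

fn main() {
    // Enable quoted-newline parsing for one table's CSV format options.
    let csv = CsvOptions::default().with_newlines_in_values(true);
    assert_eq!(csv.newlines_in_values, Some(true));
}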
1 change: 1 addition & 0 deletions datafusion/common/src/file_options/parquet_writer.rs
@@ -35,6 +35,7 @@ use parquet::{
/// Options for writing parquet files
#[derive(Clone, Debug)]
pub struct ParquetWriterOptions {
/// parquet-rs writer properties
pub writer_options: WriterProperties,
}

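For context, the wrapped type comes from the parquet crate; a typical construction looks roughly like the sketch below (assumed standard parquet-rs usage, not part of this diff).

use parquet::basic::Compression;
use parquet::file::properties::WriterProperties;

fn main() {
    // Build the parquet-rs writer properties that ParquetWriterOptions wraps
    // in its `writer_options` field.
    let writer_options = WriterProperties::builder()
        .set_compression(Compression::SNAPPY)
        .set_max_row_group_size(1024 * 1024)
        .build();
    let _ = writer_options;
}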