diff --git a/.env b/.env index 1358aafe824a6..c8c236d5ac44b 100644 --- a/.env +++ b/.env @@ -58,8 +58,8 @@ CUDA=11.2.2 DASK=latest DOTNET=8.0 GCC_VERSION="" -GO=1.21.8 -STATICCHECK=v0.4.7 +GO=1.22.6 +STATICCHECK=v0.5.1 HDFS=3.2.1 JDK=11 KARTOTHEK=latest @@ -71,6 +71,7 @@ NUMBA=latest NUMPY=latest PANDAS=latest PYTHON=3.8 +PYTHON_IMAGE_TAG=3.8 R=4.4 SPARK=master TURBODBC=latest @@ -95,7 +96,7 @@ VCPKG="943c5ef1c8f6b5e6ced092b242c8299caae2ff01" # 2024.04.26 Release # ci/docker/python-wheel-windows-vs2019.dockerfile. # This is a workaround for our CI problem that "archery docker build" doesn't # use pulled built images in dev/tasks/python-wheels/github.windows.yml. -PYTHON_WHEEL_WINDOWS_IMAGE_REVISION=2024-06-18 +PYTHON_WHEEL_WINDOWS_IMAGE_REVISION=2024-08-06 # Use conanio/${CONAN_BASE}:${CONAN_VERSION} for "docker-compose run --rm conan". # See https://github.com/conan-io/conan-docker-tools#readme and diff --git a/.github/workflows/archery.yml b/.github/workflows/archery.yml index b016f7d11b9fa..2c46071010962 100644 --- a/.github/workflows/archery.yml +++ b/.github/workflows/archery.yml @@ -58,7 +58,7 @@ jobs: shell: bash run: git branch $ARCHERY_DEFAULT_BRANCH origin/$ARCHERY_DEFAULT_BRANCH || true - name: Setup Python - uses: actions/setup-python@v5.1.1 + uses: actions/setup-python@v5.2.0 with: python-version: '3.9' - name: Install pygit2 binary wheel diff --git a/.github/workflows/comment_bot.yml b/.github/workflows/comment_bot.yml index 1138c0a02f812..b7af4c5800835 100644 --- a/.github/workflows/comment_bot.yml +++ b/.github/workflows/comment_bot.yml @@ -41,7 +41,7 @@ jobs: # fetch the tags for version number generation fetch-depth: 0 - name: Set up Python - uses: actions/setup-python@39cd14951b08e74b54015e9e001cdefcf80e669f # v5.1.1 + uses: actions/setup-python@f677139bbe7f9c59b41e40162b753c062f5d49a3 # v5.2.0 with: python-version: 3.12 - name: Install Archery and Crossbow dependencies diff --git a/.github/workflows/cpp.yml b/.github/workflows/cpp.yml index a82e1eb76660b..20bcfcb38da69 100644 --- a/.github/workflows/cpp.yml +++ b/.github/workflows/cpp.yml @@ -99,7 +99,6 @@ jobs: cat <<JSON >> "$GITHUB_OUTPUT" { "arch": "arm64v8", - "archery-use-legacy-docker-compose": "1", "clang-tools": "10", "image": "ubuntu-cpp", "llvm": "10", @@ -124,9 +123,6 @@ jobs: include: ${{ fromJson(needs.docker-targets.outputs.targets) }} env: ARCH: ${{ matrix.arch }} - # By default, use `docker compose` because docker-compose v1 is obsolete, - # except where the Docker client version is too old.
- ARCHERY_USE_LEGACY_DOCKER_COMPOSE: ${{ matrix.archery-use-legacy-docker-compose || '0' }} ARROW_SIMD_LEVEL: ${{ matrix.simd-level }} CLANG_TOOLS: ${{ matrix.clang-tools }} LLVM: ${{ matrix.llvm }} @@ -147,6 +143,7 @@ jobs: run: | sudo apt update sudo apt install -y --no-install-recommends python3 python3-dev python3-pip + python3 -m pip install -U pip - name: Setup Archery run: python3 -m pip install -e dev/archery[docker] - name: Execute Docker Build @@ -246,7 +243,7 @@ jobs: $(brew --prefix bash)/bin/bash \ ci/scripts/install_minio.sh latest ${ARROW_HOME} - name: Set up Python - uses: actions/setup-python@v5.1.1 + uses: actions/setup-python@v5.2.0 with: python-version: 3.12 - name: Install Google Cloud Storage Testbench @@ -412,12 +409,10 @@ jobs: ARROW_WITH_SNAPPY: ON ARROW_WITH_ZLIB: ON ARROW_WITH_ZSTD: ON - # Don't use preinstalled Boost by empty BOOST_ROOT and - # -DBoost_NO_BOOST_CMAKE=ON + # Don't use preinstalled Boost by empty BOOST_ROOT BOOST_ROOT: "" ARROW_CMAKE_ARGS: >- -DARROW_PACKAGE_PREFIX=/${{ matrix.msystem_lower}} - -DBoost_NO_BOOST_CMAKE=ON -DCMAKE_FIND_PACKAGE_PREFER_CONFIG=ON # We can't use unity build because we don't have enough memory on # GitHub Actions. @@ -467,16 +462,18 @@ jobs: https://dl.min.io/server/minio/release/windows-amd64/archive/minio.RELEASE.2022-05-26T05-48-41Z chmod +x /usr/local/bin/minio.exe - name: Set up Python - uses: actions/setup-python@v5.1.1 + uses: actions/setup-python@v5.2.0 + id: python-install with: python-version: 3.9 - name: Install Google Cloud Storage Testbench - shell: bash + shell: msys2 {0} + env: + PIPX_BIN_DIR: /usr/local/bin + PIPX_BASE_PYTHON: ${{ steps.python-install.outputs.python-path }} run: | ci/scripts/install_gcs_testbench.sh default - echo "PYTHON_BIN_DIR=$(cygpath --windows $(dirname $(which python3.exe)))" >> $GITHUB_ENV - name: Test shell: msys2 {0} run: | - PATH="$(cygpath --unix ${PYTHON_BIN_DIR}):${PATH}" ci/scripts/cpp_test.sh "$(pwd)" "$(pwd)/build" diff --git a/.github/workflows/csharp.yml b/.github/workflows/csharp.yml index 6e8548dc960f4..c618350affbeb 100644 --- a/.github/workflows/csharp.yml +++ b/.github/workflows/csharp.yml @@ -108,7 +108,7 @@ jobs: with: dotnet-version: ${{ matrix.dotnet }} - name: Setup Python - uses: actions/setup-python@39cd14951b08e74b54015e9e001cdefcf80e669f # v5.1.1 + uses: actions/setup-python@f677139bbe7f9c59b41e40162b753c062f5d49a3 # v5.2.0 with: python-version: 3.12 - name: Checkout Arrow diff --git a/.github/workflows/dev.yml b/.github/workflows/dev.yml index cc3ff6330746d..1cc8d993498b6 100644 --- a/.github/workflows/dev.yml +++ b/.github/workflows/dev.yml @@ -45,7 +45,7 @@ jobs: with: fetch-depth: 0 - name: Setup Python - uses: actions/setup-python@39cd14951b08e74b54015e9e001cdefcf80e669f # v5.1.1 + uses: actions/setup-python@f677139bbe7f9c59b41e40162b753c062f5d49a3 # v5.2.0 with: python-version: 3.12 - name: Install pre-commit @@ -104,7 +104,7 @@ jobs: with: fetch-depth: 0 - name: Install Python - uses: actions/setup-python@39cd14951b08e74b54015e9e001cdefcf80e669f # v5.1.1 + uses: actions/setup-python@f677139bbe7f9c59b41e40162b753c062f5d49a3 # v5.2.0 with: python-version: '3.12' - name: Install Ruby diff --git a/.github/workflows/docs.yml b/.github/workflows/docs.yml index 25db1c39ad89e..1219f7526f9f2 100644 --- a/.github/workflows/docs.yml +++ b/.github/workflows/docs.yml @@ -52,7 +52,7 @@ jobs: key: debian-docs-${{ hashFiles('cpp/**') }} restore-keys: debian-docs- - name: Setup Python - uses: actions/setup-python@39cd14951b08e74b54015e9e001cdefcf80e669f # v5.1.1 + 
uses: actions/setup-python@f677139bbe7f9c59b41e40162b753c062f5d49a3 # v5.2.0 with: python-version: 3.12 - name: Setup Archery diff --git a/.github/workflows/docs_light.yml b/.github/workflows/docs_light.yml index ea7fe5d02d7b8..454affd7fa7f9 100644 --- a/.github/workflows/docs_light.yml +++ b/.github/workflows/docs_light.yml @@ -58,7 +58,7 @@ jobs: key: conda-docs-${{ hashFiles('cpp/**') }} restore-keys: conda-docs- - name: Setup Python - uses: actions/setup-python@39cd14951b08e74b54015e9e001cdefcf80e669f # v5.1.1 + uses: actions/setup-python@f677139bbe7f9c59b41e40162b753c062f5d49a3 # v5.2.0 with: python-version: 3.12 - name: Setup Archery diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml index 20c78d86cb2a3..9b18b010a0cb9 100644 --- a/.github/workflows/go.yml +++ b/.github/workflows/go.yml @@ -62,13 +62,13 @@ jobs: { "arch-label": "AMD64", "arch": "amd64", - "go": "1.21", + "go": "1.22", "runs-on": "ubuntu-latest" }, { "arch-label": "AMD64", "arch": "amd64", - "go": "1.22", + "go": "1.23", "runs-on": "ubuntu-latest" } JSON @@ -78,15 +78,13 @@ jobs: { "arch-label": "ARM64", "arch": "arm64v8", - "archery-use-legacy-docker-compose": "1", - "go": "1.21", + "go": "1.22", "runs-on": ["self-hosted", "arm", "linux"] }, { "arch-label": "ARM64", "arch": "arm64v8", - "archery-use-legacy-docker-compose": "1", - "go": "1.22", + "go": "1.23", "runs-on": ["self-hosted", "arm", "linux"] } JSON @@ -106,9 +104,6 @@ jobs: include: ${{ fromJson(needs.docker-targets.outputs.targets) }} env: ARCH: ${{ matrix.arch }} - # By default, use Docker CLI because docker-compose v1 is obsolete, - # except where the Docker client version is too old. - ARCHERY_USE_LEGACY_DOCKER_COMPOSE: ${{ matrix.archery-use-legacy-docker-compose || '0' }} GO: ${{ matrix.go }} steps: - name: Checkout Arrow @@ -202,7 +197,7 @@ jobs: strategy: fail-fast: false matrix: - go: ['1.21', '1.22'] + go: ['1.22', '1.23'] env: GO: ${{ matrix.go }} steps: @@ -212,7 +207,7 @@ jobs: fetch-depth: 0 submodules: recursive - name: Setup Python - uses: actions/setup-python@39cd14951b08e74b54015e9e001cdefcf80e669f # v5.1.1 + uses: actions/setup-python@f677139bbe7f9c59b41e40162b753c062f5d49a3 # v5.2.0 with: python-version: 3.8 - name: Setup Archery @@ -243,7 +238,7 @@ jobs: strategy: fail-fast: false matrix: - go: ['1.21', '1.22'] + go: ['1.22', '1.23'] env: GO: ${{ matrix.go }} steps: @@ -252,7 +247,7 @@ jobs: with: fetch-depth: 0 - name: Setup Python - uses: actions/setup-python@39cd14951b08e74b54015e9e001cdefcf80e669f # v5.1.1 + uses: actions/setup-python@f677139bbe7f9c59b41e40162b753c062f5d49a3 # v5.2.0 with: python-version: 3.8 - name: Setup Archery @@ -282,7 +277,7 @@ jobs: strategy: fail-fast: false matrix: - go: ['1.21', '1.22'] + go: ['1.22', '1.23'] steps: - name: Checkout Arrow uses: actions/checkout@3df4ab11eba7bda6032a0b82a6bb43b11571feac # v4.0.0 @@ -315,7 +310,7 @@ jobs: strategy: fail-fast: false matrix: - go: ['1.21', '1.22'] + go: ['1.22', '1.23'] steps: - name: Checkout Arrow uses: actions/checkout@3df4ab11eba7bda6032a0b82a6bb43b11571feac # v4.0.0 @@ -328,7 +323,7 @@ jobs: go-version: ${{ matrix.go }} cache: true cache-dependency-path: go/go.sum - - name: Install staticcheck + - name: Install staticcheck run: | . 
.env go install honnef.co/go/tools/cmd/staticcheck@${STATICCHECK} @@ -344,7 +339,7 @@ jobs: github.event_name == 'push' && github.repository == 'apache/arrow' && github.ref_name == 'main' - uses: actions/setup-python@39cd14951b08e74b54015e9e001cdefcf80e669f # v5.1.1 + uses: actions/setup-python@f677139bbe7f9c59b41e40162b753c062f5d49a3 # v5.2.0 with: python-version: '3.10' - name: Run Benchmarks @@ -373,7 +368,7 @@ jobs: strategy: fail-fast: false matrix: - go: ['1.21', '1.22'] + go: ['1.22', '1.23'] env: ARROW_GO_TESTCGO: "1" steps: @@ -444,7 +439,7 @@ jobs: ci/scripts/msys2_setup.sh cgo - name: Get required Go version run: | - (. .env && echo "GO_VERSION=${GO}") >> $GITHUB_ENV + (. .env && echo "GO_VERSION=${GO}") >> $GITHUB_ENV - name: Update CGO Env vars shell: msys2 {0} run: | diff --git a/.github/workflows/integration.yml b/.github/workflows/integration.yml index 43f8af0a600d8..3a6b568c5207f 100644 --- a/.github/workflows/integration.yml +++ b/.github/workflows/integration.yml @@ -89,7 +89,7 @@ jobs: key: conda-${{ hashFiles('cpp/**') }} restore-keys: conda- - name: Setup Python - uses: actions/setup-python@39cd14951b08e74b54015e9e001cdefcf80e669f # v5.1.1 + uses: actions/setup-python@f677139bbe7f9c59b41e40162b753c062f5d49a3 # v5.2.0 with: python-version: 3.8 - name: Setup Archery diff --git a/.github/workflows/java.yml b/.github/workflows/java.yml index 0317879b580ba..8560f0dd1cbe9 100644 --- a/.github/workflows/java.yml +++ b/.github/workflows/java.yml @@ -76,7 +76,7 @@ jobs: key: maven-${{ hashFiles('java/**') }} restore-keys: maven- - name: Setup Python - uses: actions/setup-python@39cd14951b08e74b54015e9e001cdefcf80e669f # v5.1.1 + uses: actions/setup-python@f677139bbe7f9c59b41e40162b753c062f5d49a3 # v5.2.0 with: python-version: 3.8 - name: Setup Archery diff --git a/.github/workflows/java_jni.yml b/.github/workflows/java_jni.yml index c2bc679e681a2..f204d6459ae01 100644 --- a/.github/workflows/java_jni.yml +++ b/.github/workflows/java_jni.yml @@ -70,7 +70,7 @@ jobs: key: java-jni-manylinux-2014-${{ hashFiles('cpp/**', 'java/**') }} restore-keys: java-jni-manylinux-2014- - name: Setup Python - uses: actions/setup-python@39cd14951b08e74b54015e9e001cdefcf80e669f # v5.1.1 + uses: actions/setup-python@f677139bbe7f9c59b41e40162b753c062f5d49a3 # v5.2.0 with: python-version: 3.8 - name: Setup Archery @@ -110,7 +110,7 @@ jobs: key: maven-${{ hashFiles('java/**') }} restore-keys: maven- - name: Setup Python - uses: actions/setup-python@39cd14951b08e74b54015e9e001cdefcf80e669f # v5.1.1 + uses: actions/setup-python@f677139bbe7f9c59b41e40162b753c062f5d49a3 # v5.2.0 with: python-version: 3.8 - name: Setup Archery diff --git a/.github/workflows/java_nightly.yml b/.github/workflows/java_nightly.yml index 72afb6dbf1c1d..0bf0c27288faf 100644 --- a/.github/workflows/java_nightly.yml +++ b/.github/workflows/java_nightly.yml @@ -58,7 +58,7 @@ jobs: repository: ursacomputing/crossbow ref: main - name: Set up Python - uses: actions/setup-python@39cd14951b08e74b54015e9e001cdefcf80e669f # v5.1.1 + uses: actions/setup-python@f677139bbe7f9c59b41e40162b753c062f5d49a3 # v5.2.0 with: cache: 'pip' python-version: 3.12 diff --git a/.github/workflows/js.yml b/.github/workflows/js.yml index 630bef61105f6..4ab9831924fb1 100644 --- a/.github/workflows/js.yml +++ b/.github/workflows/js.yml @@ -54,7 +54,7 @@ jobs: with: fetch-depth: 0 - name: Setup Python - uses: actions/setup-python@39cd14951b08e74b54015e9e001cdefcf80e669f # v5.1.1 + uses: actions/setup-python@f677139bbe7f9c59b41e40162b753c062f5d49a3 # v5.2.0 with: 
python-version: 3.8 - name: Setup Archery diff --git a/.github/workflows/pr_bot.yml b/.github/workflows/pr_bot.yml index 7dd06b6aeec09..bbb1a2d7228d0 100644 --- a/.github/workflows/pr_bot.yml +++ b/.github/workflows/pr_bot.yml @@ -82,7 +82,7 @@ jobs: # fetch the tags for version number generation fetch-depth: 0 - name: Set up Python - uses: actions/setup-python@39cd14951b08e74b54015e9e001cdefcf80e669f # v5.1.1 + uses: actions/setup-python@f677139bbe7f9c59b41e40162b753c062f5d49a3 # v5.2.0 with: python-version: 3.12 - name: Install Archery and Crossbow dependencies diff --git a/.github/workflows/python.yml b/.github/workflows/python.yml index 916db2580e371..b88ea7ce4f1ee 100644 --- a/.github/workflows/python.yml +++ b/.github/workflows/python.yml @@ -59,6 +59,7 @@ jobs: - conda-python-3.9-nopandas - conda-python-3.8-pandas-1.0 - conda-python-3.10-pandas-latest + - conda-python-3.10-no-numpy include: - name: conda-python-docs cache: conda-python-3.9 @@ -83,6 +84,11 @@ jobs: title: AMD64 Conda Python 3.10 Pandas latest python: "3.10" pandas: latest + - name: conda-python-3.10-no-numpy + cache: conda-python-3.10 + image: conda-python-no-numpy + title: AMD64 Conda Python 3.10 without NumPy + python: "3.10" env: PYTHON: ${{ matrix.python || 3.8 }} UBUNTU: ${{ matrix.ubuntu || 20.04 }} @@ -101,7 +107,7 @@ jobs: key: ${{ matrix.cache }}-${{ hashFiles('cpp/**') }} restore-keys: ${{ matrix.cache }}- - name: Setup Python - uses: actions/setup-python@39cd14951b08e74b54015e9e001cdefcf80e669f # v5.1.1 + uses: actions/setup-python@f677139bbe7f9c59b41e40162b753c062f5d49a3 # v5.2.0 with: python-version: 3.8 - name: Setup Archery @@ -163,7 +169,7 @@ jobs: ARROW_BUILD_TESTS: OFF PYARROW_TEST_LARGE_MEMORY: ON # Current oldest supported version according to https://endoflife.date/macos - MACOSX_DEPLOYMENT_TARGET: 10.15 + MACOSX_DEPLOYMENT_TARGET: 12.0 steps: - name: Checkout Arrow uses: actions/checkout@v4 @@ -171,7 +177,7 @@ jobs: fetch-depth: 0 submodules: recursive - name: Setup Python - uses: actions/setup-python@v5.1.1 + uses: actions/setup-python@v5.2.0 with: python-version: '3.11' - name: Install Dependencies diff --git a/.github/workflows/r.yml b/.github/workflows/r.yml index bf7eb99e7e990..21afa4586b5a4 100644 --- a/.github/workflows/r.yml +++ b/.github/workflows/r.yml @@ -86,19 +86,18 @@ jobs: run: | sudo apt-get install devscripts - # replace the SHA with v2 once INFRA-26031 is resolved - - uses: r-lib/actions/setup-r@732fb28088814627972f1ccbacc02561178cf391 + - uses: r-lib/actions/setup-r@v2 with: use-public-rspm: true install-r: false - - uses: r-lib/actions/setup-r-dependencies@732fb28088814627972f1ccbacc02561178cf391 + - uses: r-lib/actions/setup-r-dependencies@v2 with: extra-packages: any::rcmdcheck needs: check working-directory: src/r - - uses: r-lib/actions/check-r-package@732fb28088814627972f1ccbacc02561178cf391 + - uses: r-lib/actions/check-r-package@v2 with: working-directory: src/r env: @@ -147,7 +146,7 @@ jobs: ubuntu-${{ matrix.ubuntu }}-r-${{ matrix.r }}-${{ hashFiles('cpp/src/**/*.cc','cpp/src/**/*.h)') }}- ubuntu-${{ matrix.ubuntu }}-r-${{ matrix.r }}- - name: Setup Python - uses: actions/setup-python@39cd14951b08e74b54015e9e001cdefcf80e669f # v5.1.1 + uses: actions/setup-python@f677139bbe7f9c59b41e40162b753c062f5d49a3 # v5.2.0 with: python-version: 3.8 - name: Setup Archery @@ -207,7 +206,7 @@ jobs: fetch-depth: 0 submodules: recursive - name: Setup Python - uses: actions/setup-python@39cd14951b08e74b54015e9e001cdefcf80e669f # v5.1.1 + uses: 
actions/setup-python@f677139bbe7f9c59b41e40162b753c062f5d49a3 # v5.2.0 with: python-version: 3.8 - name: Setup Archery @@ -341,11 +340,11 @@ jobs: cd r/windows ls *.zip | xargs -n 1 unzip -uo rm -rf *.zip - - uses: r-lib/actions/setup-r@732fb28088814627972f1ccbacc02561178cf391 + - uses: r-lib/actions/setup-r@v2 with: r-version: ${{ matrix.config.rversion }} Ncpus: 2 - - uses: r-lib/actions/setup-r-dependencies@732fb28088814627972f1ccbacc02561178cf391 + - uses: r-lib/actions/setup-r-dependencies@v2 env: GITHUB_PAT: "${{ github.token }}" with: diff --git a/.github/workflows/r_nightly.yml b/.github/workflows/r_nightly.yml index 1ec071b6bbb5e..9817e41d3b61d 100644 --- a/.github/workflows/r_nightly.yml +++ b/.github/workflows/r_nightly.yml @@ -60,7 +60,7 @@ jobs: repository: ursacomputing/crossbow ref: main - name: Set up Python - uses: actions/setup-python@39cd14951b08e74b54015e9e001cdefcf80e669f # v5.1.1 + uses: actions/setup-python@f677139bbe7f9c59b41e40162b753c062f5d49a3 # v5.2.0 with: cache: 'pip' python-version: 3.12 diff --git a/.github/workflows/ruby.yml b/.github/workflows/ruby.yml index e4d650e74a8ad..228bacb77e58a 100644 --- a/.github/workflows/ruby.yml +++ b/.github/workflows/ruby.yml @@ -83,7 +83,7 @@ jobs: key: ubuntu-${{ matrix.ubuntu }}-ruby-${{ hashFiles('cpp/**') }} restore-keys: ubuntu-${{ matrix.ubuntu }}-ruby- - name: Setup Python - uses: actions/setup-python@39cd14951b08e74b54015e9e001cdefcf80e669f # v5.1.1 + uses: actions/setup-python@f677139bbe7f9c59b41e40162b753c062f5d49a3 # v5.2.0 with: python-version: 3.8 - name: Setup Archery @@ -406,7 +406,10 @@ jobs: -source "https://nuget.pkg.github.com/$GITHUB_REPOSITORY_OWNER/index.json" - name: Build C++ vcpkg dependencies run: | - vcpkg\vcpkg.exe install --triplet $env:VCPKG_TRIPLET --x-manifest-root cpp --x-install-root build\cpp\vcpkg_installed + vcpkg\vcpkg.exe install ` + --triplet $env:VCPKG_TRIPLET ` + --x-manifest-root cpp ` + --x-install-root build\cpp\vcpkg_installed - name: Build C++ shell: cmd run: | diff --git a/appveyor.yml b/appveyor.yml index 5954251d34733..9e4582f1d8d7f 100644 --- a/appveyor.yml +++ b/appveyor.yml @@ -24,6 +24,7 @@ only_commits: - appveyor.yml - ci/appveyor* - ci/conda* + - ci/scripts/*.bat - cpp/ - format/ - python/ diff --git a/c_glib/arrow-flight-glib/client.cpp b/c_glib/arrow-flight-glib/client.cpp index 80c47e336f872..23f59c9da69ad 100644 --- a/c_glib/arrow-flight-glib/client.cpp +++ b/c_glib/arrow-flight-glib/client.cpp @@ -33,10 +33,19 @@ G_BEGIN_DECLS * #GAFlightStreamReader is a class for reading record batches from a * server. * + * #GAFlightStreamWriter is a class for writing record batches to a + * server. + * + * #GAFlightMetadataReader is a class for reading metadata from a + * server. + * * #GAFlightCallOptions is a class for options of each call. * * #GAFlightClientOptions is a class for options of each client. * + * #GAFlightDoPutResult is a class that has gaflight_client_do_put() + * result. + * * #GAFlightClient is a class for Apache Arrow Flight client. * * Since: 5.0.0 @@ -56,6 +65,128 @@ gaflight_stream_reader_class_init(GAFlightStreamReaderClass *klass) { } +G_DEFINE_TYPE(GAFlightStreamWriter, + gaflight_stream_writer, + GAFLIGHT_TYPE_RECORD_BATCH_WRITER) + +static void +gaflight_stream_writer_init(GAFlightStreamWriter *object) +{ +} + +static void +gaflight_stream_writer_class_init(GAFlightStreamWriterClass *klass) +{ +} + +/** + * gaflight_stream_writer_done_writing: + * @writer: A #GAFlightStreamWriter. + * @error: (nullable): Return location for a #GError or %NULL. 
+ * + * Returns: %TRUE on success, %FALSE on error. + * + * Since: 18.0.0 + */ +gboolean +gaflight_stream_writer_done_writing(GAFlightStreamWriter *writer, GError **error) +{ + auto flight_writer = std::static_pointer_cast<arrow::flight::FlightStreamWriter>( + garrow_record_batch_writer_get_raw(GARROW_RECORD_BATCH_WRITER(writer))); + return garrow::check(error, + flight_writer->DoneWriting(), + "[flight-stream-writer][done-writing]"); +} + +struct GAFlightMetadataReaderPrivate +{ + arrow::flight::FlightMetadataReader *reader; +}; + +enum { + PROP_METADATA_READER_READER = 1, +}; + +G_DEFINE_TYPE_WITH_PRIVATE(GAFlightMetadataReader, + gaflight_metadata_reader, + G_TYPE_OBJECT) + +#define GAFLIGHT_METADATA_READER_GET_PRIVATE(object) \ + static_cast<GAFlightMetadataReaderPrivate *>( \ + gaflight_metadata_reader_get_instance_private(GAFLIGHT_METADATA_READER(object))) + +static void +gaflight_metadata_reader_finalize(GObject *object) +{ + auto priv = GAFLIGHT_METADATA_READER_GET_PRIVATE(object); + delete priv->reader; + G_OBJECT_CLASS(gaflight_metadata_reader_parent_class)->finalize(object); +} + +static void +gaflight_metadata_reader_set_property(GObject *object, + guint prop_id, + const GValue *value, + GParamSpec *pspec) +{ + auto priv = GAFLIGHT_METADATA_READER_GET_PRIVATE(object); + + switch (prop_id) { + case PROP_METADATA_READER_READER: + priv->reader = + static_cast<arrow::flight::FlightMetadataReader *>(g_value_get_pointer(value)); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec); + break; + } +} + +static void +gaflight_metadata_reader_init(GAFlightMetadataReader *object) +{ +} + +static void +gaflight_metadata_reader_class_init(GAFlightMetadataReaderClass *klass) +{ + auto gobject_class = G_OBJECT_CLASS(klass); + + gobject_class->finalize = gaflight_metadata_reader_finalize; + gobject_class->set_property = gaflight_metadata_reader_set_property; + + GParamSpec *spec; + spec = g_param_spec_pointer( + "reader", + nullptr, + nullptr, + static_cast<GParamFlags>(G_PARAM_WRITABLE | G_PARAM_CONSTRUCT_ONLY)); + g_object_class_install_property(gobject_class, PROP_METADATA_READER_READER, spec); +} + +/** + * gaflight_metadata_reader_read: + * @reader: A #GAFlightMetadataReader. + * @error: (nullable): Return location for a #GError or %NULL. + * + * Returns: (transfer full): The metadata on success, %NULL on error.
+ * + * Since: 18.0.0 + */ +GArrowBuffer * +gaflight_metadata_reader_read(GAFlightMetadataReader *reader, GError **error) +{ + auto flight_reader = gaflight_metadata_reader_get_raw(reader); + std::shared_ptr<arrow::Buffer> metadata; + if (garrow::check(error, + flight_reader->ReadMetadata(&metadata), + "[flight-metadata-reader][read]")) { + return garrow_buffer_new_raw(&metadata); + } else { + return nullptr; + } +} + typedef struct GAFlightCallOptionsPrivate_ { arrow::flight::FlightCallOptions options; @@ -385,6 +516,137 @@ gaflight_client_options_new(void) g_object_new(GAFLIGHT_TYPE_CLIENT_OPTIONS, NULL)); } +struct GAFlightDoPutResultPrivate +{ + GAFlightStreamWriter *writer; + GAFlightMetadataReader *reader; +}; + +enum { + PROP_DO_PUT_RESULT_RESULT = 1, + PROP_DO_PUT_RESULT_WRITER, + PROP_DO_PUT_RESULT_READER, +}; + +G_DEFINE_TYPE_WITH_PRIVATE(GAFlightDoPutResult, gaflight_do_put_result, G_TYPE_OBJECT) + +#define GAFLIGHT_DO_PUT_RESULT_GET_PRIVATE(object) \ + static_cast<GAFlightDoPutResultPrivate *>( \ + gaflight_do_put_result_get_instance_private(GAFLIGHT_DO_PUT_RESULT(object))) + +static void +gaflight_do_put_result_dispose(GObject *object) +{ + auto priv = GAFLIGHT_DO_PUT_RESULT_GET_PRIVATE(object); + + if (priv->writer) { + g_object_unref(priv->writer); + priv->writer = nullptr; + } + + if (priv->reader) { + g_object_unref(priv->reader); + priv->reader = nullptr; + } + + G_OBJECT_CLASS(gaflight_do_put_result_parent_class)->dispose(object); +} + +static void +gaflight_do_put_result_init(GAFlightDoPutResult *object) +{ +} + +static void +gaflight_do_put_result_set_property(GObject *object, + guint prop_id, + const GValue *value, + GParamSpec *pspec) +{ + auto priv = GAFLIGHT_DO_PUT_RESULT_GET_PRIVATE(object); + + switch (prop_id) { + case PROP_DO_PUT_RESULT_RESULT: + { + auto result = static_cast<arrow::flight::FlightClient::DoPutResult *>( + g_value_get_pointer(value)); + priv->writer = gaflight_stream_writer_new_raw(result->writer.release()); + priv->reader = gaflight_metadata_reader_new_raw(result->reader.release()); + break; + } + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec); + break; + } +} + +static void +gaflight_do_put_result_get_property(GObject *object, + guint prop_id, + GValue *value, + GParamSpec *pspec) +{ + auto priv = GAFLIGHT_DO_PUT_RESULT_GET_PRIVATE(object); + + switch (prop_id) { + case PROP_DO_PUT_RESULT_WRITER: + g_value_set_object(value, priv->writer); + break; + case PROP_DO_PUT_RESULT_READER: + g_value_set_object(value, priv->reader); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec); + break; + } +} + +static void +gaflight_do_put_result_class_init(GAFlightDoPutResultClass *klass) +{ + auto gobject_class = G_OBJECT_CLASS(klass); + + gobject_class->dispose = gaflight_do_put_result_dispose; + gobject_class->set_property = gaflight_do_put_result_set_property; + gobject_class->get_property = gaflight_do_put_result_get_property; + + GParamSpec *spec; + spec = g_param_spec_pointer( + "result", + nullptr, + nullptr, + static_cast<GParamFlags>(G_PARAM_WRITABLE | G_PARAM_CONSTRUCT_ONLY)); + g_object_class_install_property(gobject_class, PROP_DO_PUT_RESULT_RESULT, spec); + + /** + * GAFlightDoPutResult:writer: + * + * A writer to write record batches to. + * + * Since: 18.0.0 + */ + spec = g_param_spec_object("writer", + nullptr, + nullptr, + GAFLIGHT_TYPE_STREAM_WRITER, + static_cast<GParamFlags>(G_PARAM_READABLE)); + g_object_class_install_property(gobject_class, PROP_DO_PUT_RESULT_WRITER, spec); + + /** + * GAFlightDoPutResult:reader: + * + * A reader for application metadata from the server.
+ * + * Since: 18.0.0 + */ + spec = g_param_spec_object("reader", + nullptr, + nullptr, + GAFLIGHT_TYPE_METADATA_READER, + static_cast<GParamFlags>(G_PARAM_READABLE)); + g_object_class_install_property(gobject_class, PROP_DO_PUT_RESULT_READER, spec); +} + struct GAFlightClientPrivate { std::shared_ptr<arrow::flight::FlightClient> client; @@ -661,6 +923,51 @@ gaflight_client_do_get(GAFlightClient *client, return gaflight_stream_reader_new_raw(flight_reader.release(), TRUE); } +/** + * gaflight_client_do_put: + * @client: A #GAFlightClient. + * @descriptor: A #GAFlightDescriptor. + * @schema: A #GArrowSchema. + * @options: (nullable): A #GAFlightCallOptions. + * @error: (nullable): Return location for a #GError or %NULL. + * + * Upload data to a Flight described by the given descriptor. The + * caller must call garrow_record_batch_writer_close() on the + * returned stream once they are done writing. + * + * The reader and writer are linked; closing the writer will also + * close the reader. Use gaflight_stream_writer_done_writing() to + * only close the write side of the channel. + * + * Returns: (nullable) (transfer full): + * The #GAFlightDoPutResult holding a reader and a writer on success, + * %NULL on error. + * + * Since: 18.0.0 + */ +GAFlightDoPutResult * +gaflight_client_do_put(GAFlightClient *client, + GAFlightDescriptor *descriptor, + GArrowSchema *schema, + GAFlightCallOptions *options, + GError **error) +{ + auto flight_client = gaflight_client_get_raw(client); + auto flight_descriptor = gaflight_descriptor_get_raw(descriptor); + auto arrow_schema = garrow_schema_get_raw(schema); + arrow::flight::FlightCallOptions flight_default_options; + auto flight_options = &flight_default_options; + if (options) { + flight_options = gaflight_call_options_get_raw(options); + } + auto result = flight_client->DoPut(*flight_options, *flight_descriptor, arrow_schema); + if (!garrow::check(error, result, "[flight-client][do-put]")) { + return nullptr; + } + auto flight_result = std::move(*result); + return gaflight_do_put_result_new_raw(&flight_result); +} + G_END_DECLS GAFlightStreamReader * @@ -672,7 +979,28 @@ gaflight_stream_reader_new_raw(arrow::flight::FlightStreamReader *flight_reader, flight_reader, "is-owner", is_owner, - NULL)); + nullptr)); +} + +GAFlightStreamWriter * +gaflight_stream_writer_new_raw(arrow::flight::FlightStreamWriter *flight_writer) +{ + return GAFLIGHT_STREAM_WRITER( + g_object_new(GAFLIGHT_TYPE_STREAM_WRITER, "writer", flight_writer, nullptr)); +} + +GAFlightMetadataReader * +gaflight_metadata_reader_new_raw(arrow::flight::FlightMetadataReader *flight_reader) +{ + return GAFLIGHT_METADATA_READER( + g_object_new(GAFLIGHT_TYPE_METADATA_READER, "reader", flight_reader, nullptr)); +} + +arrow::flight::FlightMetadataReader * +gaflight_metadata_reader_get_raw(GAFlightMetadataReader *reader) +{ + auto priv = GAFLIGHT_METADATA_READER_GET_PRIVATE(reader); + return priv->reader; } arrow::flight::FlightCallOptions * @@ -689,6 +1017,13 @@ gaflight_client_options_get_raw(GAFlightClientOptions *options) return &(priv->options); } +GAFlightDoPutResult * +gaflight_do_put_result_new_raw(arrow::flight::FlightClient::DoPutResult *flight_result) +{ + return GAFLIGHT_DO_PUT_RESULT( + g_object_new(GAFLIGHT_TYPE_DO_PUT_RESULT, "result", flight_result, nullptr)); +} + std::shared_ptr<arrow::flight::FlightClient> gaflight_client_get_raw(GAFlightClient *client) { diff --git a/c_glib/arrow-flight-glib/client.h b/c_glib/arrow-flight-glib/client.h index a91bbe55e3c04..12c5a06b810e1 100644 --- a/c_glib/arrow-flight-glib/client.h +++ 
b/c_glib/arrow-flight-glib/client.h @@ -35,6 +35,35 @@ struct _GAFlightStreamReaderClass { GAFlightRecordBatchReaderClass parent_class; }; +#define GAFLIGHT_TYPE_STREAM_WRITER (gaflight_stream_writer_get_type()) +GAFLIGHT_AVAILABLE_IN_18_0 +G_DECLARE_DERIVABLE_TYPE(GAFlightStreamWriter, + gaflight_stream_writer, + GAFLIGHT, + STREAM_WRITER, + GAFlightRecordBatchWriter) +struct _GAFlightStreamWriterClass +{ + GAFlightRecordBatchWriterClass parent_class; +}; + +GAFLIGHT_AVAILABLE_IN_18_0 +gboolean +gaflight_stream_writer_done_writing(GAFlightStreamWriter *writer, GError **error); + +#define GAFLIGHT_TYPE_METADATA_READER (gaflight_metadata_reader_get_type()) +GAFLIGHT_AVAILABLE_IN_18_0 +G_DECLARE_DERIVABLE_TYPE( + GAFlightMetadataReader, gaflight_metadata_reader, GAFLIGHT, METADATA_READER, GObject) +struct _GAFlightMetadataReaderClass +{ + GObjectClass parent_class; +}; + +GAFLIGHT_AVAILABLE_IN_18_0 +GArrowBuffer * +gaflight_metadata_reader_read(GAFlightMetadataReader *reader, GError **error); + #define GAFLIGHT_TYPE_CALL_OPTIONS (gaflight_call_options_get_type()) GAFLIGHT_AVAILABLE_IN_5_0 G_DECLARE_DERIVABLE_TYPE( @@ -75,6 +104,15 @@ GAFLIGHT_AVAILABLE_IN_5_0 GAFlightClientOptions * gaflight_client_options_new(void); +#define GAFLIGHT_TYPE_DO_PUT_RESULT (gaflight_do_put_result_get_type()) +GAFLIGHT_AVAILABLE_IN_18_0 +G_DECLARE_DERIVABLE_TYPE( + GAFlightDoPutResult, gaflight_do_put_result, GAFLIGHT, DO_PUT_RESULT, GObject) +struct _GAFlightDoPutResultClass +{ + GObjectClass parent_class; +}; + #define GAFLIGHT_TYPE_CLIENT (gaflight_client_get_type()) GAFLIGHT_AVAILABLE_IN_5_0 G_DECLARE_DERIVABLE_TYPE(GAFlightClient, gaflight_client, GAFLIGHT, CLIENT, GObject) @@ -124,4 +162,12 @@ gaflight_client_do_get(GAFlightClient *client, GAFlightCallOptions *options, GError **error); +GAFLIGHT_AVAILABLE_IN_18_0 +GAFlightDoPutResult * +gaflight_client_do_put(GAFlightClient *client, + GAFlightDescriptor *descriptor, + GArrowSchema *schema, + GAFlightCallOptions *options, + GError **error); + G_END_DECLS diff --git a/c_glib/arrow-flight-glib/client.hpp b/c_glib/arrow-flight-glib/client.hpp index 185a28e6dc4bd..888f87ecb5732 100644 --- a/c_glib/arrow-flight-glib/client.hpp +++ b/c_glib/arrow-flight-glib/client.hpp @@ -28,6 +28,18 @@ GAFlightStreamReader * gaflight_stream_reader_new_raw(arrow::flight::FlightStreamReader *flight_reader, gboolean is_owner); +GAFLIGHT_EXTERN +GAFlightStreamWriter * +gaflight_stream_writer_new_raw(arrow::flight::FlightStreamWriter *flight_writer); + +GAFLIGHT_EXTERN +GAFlightMetadataReader * +gaflight_metadata_reader_new_raw(arrow::flight::FlightMetadataReader *flight_reader); + +GAFLIGHT_EXTERN +arrow::flight::FlightMetadataReader * +gaflight_metadata_reader_get_raw(GAFlightMetadataReader *reader); + GAFLIGHT_EXTERN arrow::flight::FlightCallOptions * gaflight_call_options_get_raw(GAFlightCallOptions *options); @@ -36,6 +48,10 @@ GAFLIGHT_EXTERN arrow::flight::FlightClientOptions * gaflight_client_options_get_raw(GAFlightClientOptions *options); +GAFLIGHT_EXTERN +GAFlightDoPutResult * +gaflight_do_put_result_new_raw(arrow::flight::FlightClient::DoPutResult *flight_result); + GAFLIGHT_EXTERN std::shared_ptr<arrow::flight::FlightClient> gaflight_client_get_raw(GAFlightClient *client); diff --git a/c_glib/arrow-flight-glib/common.cpp b/c_glib/arrow-flight-glib/common.cpp index efc544f10cf66..3deaf67cc14e8 100644 --- a/c_glib/arrow-flight-glib/common.cpp +++ b/c_glib/arrow-flight-glib/common.cpp @@ -48,7 +48,11 @@ G_BEGIN_DECLS * * #GAFlightStreamChunk is a class for a chunk in stream.
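A minimal caller-side sketch of the DoPut API added above (not part of the patch itself): it assumes an already-connected client, a descriptor, a schema, and a record batch to send; the writer and reader are fetched through the documented "writer"/"reader" properties, gaflight_record_batch_writer_write_record_batch() is the declaration that appears in common.h later in this diff, and error handling is abbreviated.

#include <arrow-flight-glib/arrow-flight-glib.h>

/* Hypothetical helper, for illustration only. */
static GArrowBuffer *
upload_one_batch(GAFlightClient *client,
                 GAFlightDescriptor *descriptor,
                 GArrowSchema *schema,
                 GArrowRecordBatch *batch)
{
  GError *error = NULL;
  GAFlightDoPutResult *result =
    gaflight_client_do_put(client, descriptor, schema, NULL, &error);
  if (!result) {
    g_print("do_put failed: %s\n", error->message);
    g_error_free(error);
    return NULL;
  }
  /* The writer and reader are exposed as readable GObject properties. */
  GAFlightStreamWriter *writer;
  GAFlightMetadataReader *reader;
  g_object_get(result, "writer", &writer, "reader", &reader, NULL);
  /* Stream one batch, close only the write side, then collect the
   * server's application metadata. Errors are ignored here for brevity. */
  gaflight_record_batch_writer_write_record_batch(
    GAFLIGHT_RECORD_BATCH_WRITER(writer), batch, NULL, NULL);
  gaflight_stream_writer_done_writing(writer, NULL);
  GArrowBuffer *metadata = gaflight_metadata_reader_read(reader, NULL);
  /* Per the gaflight_client_do_put() docs, the caller must close the
   * returned stream; closing the writer also closes the linked reader. */
  garrow_record_batch_writer_close(GARROW_RECORD_BATCH_WRITER(writer), NULL);
  g_object_unref(writer);
  g_object_unref(reader);
  g_object_unref(result);
  return metadata;
}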
* - * #GAFlightRecordBatchReader is a class for reading record batches. + * #GAFlightRecordBatchReader is an abstract class for reading record + * batches with metadata. + * + * #GAFlightRecordBatchWriter is an abstract class for + * writing record batches with metadata. * * Since: 5.0.0 */ @@ -1172,13 +1176,13 @@ typedef struct GAFlightRecordBatchReaderPrivate_ { } GAFlightRecordBatchReaderPrivate; enum { - PROP_READER = 1, - PROP_IS_OWNER, + PROP_RECORD_BATCH_READER_READER = 1, + PROP_RECORD_BATCH_READER_IS_OWNER, }; -G_DEFINE_TYPE_WITH_PRIVATE(GAFlightRecordBatchReader, - gaflight_record_batch_reader, - G_TYPE_OBJECT) +G_DEFINE_ABSTRACT_TYPE_WITH_PRIVATE(GAFlightRecordBatchReader, + gaflight_record_batch_reader, + G_TYPE_OBJECT) #define GAFLIGHT_RECORD_BATCH_READER_GET_PRIVATE(obj) \ static_cast<GAFlightRecordBatchReaderPrivate *>( \ @@ -1192,7 +1196,7 @@ gaflight_record_batch_reader_finalize(GObject *object) if (priv->is_owner) { delete priv->reader; } - G_OBJECT_CLASS(gaflight_info_parent_class)->finalize(object); + G_OBJECT_CLASS(gaflight_record_batch_reader_parent_class)->finalize(object); } static void @@ -1204,11 +1208,11 @@ gaflight_record_batch_reader_set_property(GObject *object, auto priv = GAFLIGHT_RECORD_BATCH_READER_GET_PRIVATE(object); switch (prop_id) { - case PROP_READER: + case PROP_RECORD_BATCH_READER_READER: priv->reader = static_cast<arrow::flight::MetadataRecordBatchReader *>(g_value_get_pointer(value)); break; - case PROP_IS_OWNER: + case PROP_RECORD_BATCH_READER_IS_OWNER: priv->is_owner = g_value_get_boolean(value); break; default: @@ -1236,7 +1240,7 @@ gaflight_record_batch_reader_class_init(GAFlightRecordBatchReaderClass *klass) nullptr, nullptr, static_cast<GParamFlags>(G_PARAM_WRITABLE | G_PARAM_CONSTRUCT_ONLY)); - g_object_class_install_property(gobject_class, PROP_READER, spec); + g_object_class_install_property(gobject_class, PROP_RECORD_BATCH_READER_READER, spec); spec = g_param_spec_boolean( "is-owner", nullptr, nullptr, TRUE, static_cast<GParamFlags>(G_PARAM_WRITABLE | G_PARAM_CONSTRUCT_ONLY)); - g_object_class_install_property(gobject_class, PROP_IS_OWNER, spec); + g_object_class_install_property(gobject_class, PROP_RECORD_BATCH_READER_IS_OWNER, spec); } /** @@ -1296,6 +1300,108 @@ gaflight_record_batch_reader_read_all(GAFlightRecordBatchReader *reader, GError } } +G_DEFINE_ABSTRACT_TYPE(GAFlightRecordBatchWriter, + gaflight_record_batch_writer, + GARROW_TYPE_RECORD_BATCH_WRITER) + +static void +gaflight_record_batch_writer_init(GAFlightRecordBatchWriter *object) +{ +} + +static void +gaflight_record_batch_writer_class_init(GAFlightRecordBatchWriterClass *klass) +{ +} + +/** + * gaflight_record_batch_writer_begin: + * @writer: A #GAFlightRecordBatchWriter. + * @schema: A #GArrowSchema. + * @options: (nullable): A #GArrowWriteOptions. + * @error: (nullable): Return location for a #GError or %NULL. + * + * Begins writing data with the given schema. Only used with + * `DoExchange`. + * + * Returns: %TRUE on success, %FALSE on error.
+ * + * Since: 18.0.0 + */ +gboolean +gaflight_record_batch_writer_begin(GAFlightRecordBatchWriter *writer, + GArrowSchema *schema, + GArrowWriteOptions *options, + GError **error) +{ + auto flight_writer = std::static_pointer_cast<arrow::flight::MetadataRecordBatchWriter>( + garrow_record_batch_writer_get_raw(GARROW_RECORD_BATCH_WRITER(writer))); + auto arrow_schema = garrow_schema_get_raw(schema); + arrow::ipc::IpcWriteOptions arrow_write_options; + if (options) { + arrow_write_options = *garrow_write_options_get_raw(options); + } else { + arrow_write_options = arrow::ipc::IpcWriteOptions::Defaults(); + } + return garrow::check(error, + flight_writer->Begin(arrow_schema, arrow_write_options), + "[flight-record-batch-writer][begin]"); +} + +/** + * gaflight_record_batch_writer_write_metadata: + * @writer: A #GAFlightRecordBatchWriter. + * @metadata: A #GArrowBuffer. + * @error: (nullable): Return location for a #GError or %NULL. + * + * Write metadata. + * + * Returns: %TRUE on success, %FALSE on error. + * + * Since: 18.0.0 + */ +gboolean +gaflight_record_batch_writer_write_metadata(GAFlightRecordBatchWriter *writer, + GArrowBuffer *metadata, + GError **error) +{ + auto flight_writer = std::static_pointer_cast<arrow::flight::MetadataRecordBatchWriter>( + garrow_record_batch_writer_get_raw(GARROW_RECORD_BATCH_WRITER(writer))); + auto arrow_metadata = garrow_buffer_get_raw(metadata); + return garrow::check(error, + flight_writer->WriteMetadata(arrow_metadata), + "[flight-record-batch-writer][write-metadata]"); +} + +/** + * gaflight_record_batch_writer_write_record_batch: + * @writer: A #GAFlightRecordBatchWriter. + * @record_batch: A #GArrowRecordBatch. + * @metadata: (nullable): A #GArrowBuffer. + * @error: (nullable): Return location for a #GError or %NULL. + * + * Write a record batch with metadata. + * + * Returns: %TRUE on success, %FALSE on error.
+ * + * Since: 18.0.0 + */ +gboolean +gaflight_record_batch_writer_write_record_batch(GAFlightRecordBatchWriter *writer, + GArrowRecordBatch *record_batch, + GArrowBuffer *metadata, + GError **error) +{ + auto flight_writer = std::static_pointer_cast<arrow::flight::MetadataRecordBatchWriter>( + garrow_record_batch_writer_get_raw(GARROW_RECORD_BATCH_WRITER(writer))); + auto arrow_record_batch = garrow_record_batch_get_raw(record_batch); + auto arrow_metadata = garrow_buffer_get_raw(metadata); + return garrow::check( + error, + flight_writer->WriteWithMetadata(*arrow_record_batch, arrow_metadata), + "[flight-record-batch-writer][write]"); +} + G_END_DECLS GAFlightCriteria * diff --git a/c_glib/arrow-flight-glib/common.h b/c_glib/arrow-flight-glib/common.h index b1d89f79c357e..726132fe4921b 100644 --- a/c_glib/arrow-flight-glib/common.h +++ b/c_glib/arrow-flight-glib/common.h @@ -232,4 +232,36 @@ GAFLIGHT_AVAILABLE_IN_6_0 GArrowTable * gaflight_record_batch_reader_read_all(GAFlightRecordBatchReader *reader, GError **error); +#define GAFLIGHT_TYPE_RECORD_BATCH_WRITER (gaflight_record_batch_writer_get_type()) +GAFLIGHT_AVAILABLE_IN_18_0 +G_DECLARE_DERIVABLE_TYPE(GAFlightRecordBatchWriter, + gaflight_record_batch_writer, + GAFLIGHT, + RECORD_BATCH_WRITER, + GArrowRecordBatchWriter) +struct _GAFlightRecordBatchWriterClass +{ + GArrowRecordBatchWriterClass parent_class; +}; + +GAFLIGHT_AVAILABLE_IN_18_0 +gboolean +gaflight_record_batch_writer_begin(GAFlightRecordBatchWriter *writer, + GArrowSchema *schema, + GArrowWriteOptions *options, + GError **error); + +GAFLIGHT_AVAILABLE_IN_18_0 +gboolean +gaflight_record_batch_writer_write_metadata(GAFlightRecordBatchWriter *writer, + GArrowBuffer *metadata, + GError **error); + +GAFLIGHT_AVAILABLE_IN_18_0 +gboolean +gaflight_record_batch_writer_write_record_batch(GAFlightRecordBatchWriter *writer, + GArrowRecordBatch *record_batch, + GArrowBuffer *metadata, + GError **error); + G_END_DECLS diff --git a/c_glib/arrow-flight-glib/common.hpp b/c_glib/arrow-flight-glib/common.hpp index db56fff579baf..ae5a7703397dd 100644 --- a/c_glib/arrow-flight-glib/common.hpp +++ b/c_glib/arrow-flight-glib/common.hpp @@ -79,3 +79,7 @@ gaflight_stream_chunk_get_raw(GAFlightStreamChunk *chunk); GAFLIGHT_EXTERN arrow::flight::MetadataRecordBatchReader * gaflight_record_batch_reader_get_raw(GAFlightRecordBatchReader *reader); + +GAFLIGHT_EXTERN +arrow::flight::MetadataRecordBatchWriter * +gaflight_record_batch_writer_get_raw(GAFlightRecordBatchWriter *writer); diff --git a/c_glib/arrow-glib/writer.hpp b/c_glib/arrow-glib/writer.hpp index aa87ffe77d79b..1d85ac52f88d1 100644 --- a/c_glib/arrow-glib/writer.hpp +++ b/c_glib/arrow-glib/writer.hpp @@ -25,16 +25,20 @@ #include <arrow-glib/writer.h> +GARROW_AVAILABLE_IN_ALL GArrowRecordBatchWriter * garrow_record_batch_writer_new_raw( std::shared_ptr<arrow::ipc::RecordBatchWriter> *arrow_writer); +GARROW_AVAILABLE_IN_ALL std::shared_ptr<arrow::ipc::RecordBatchWriter> garrow_record_batch_writer_get_raw(GArrowRecordBatchWriter *writer); +GARROW_AVAILABLE_IN_ALL GArrowRecordBatchStreamWriter * garrow_record_batch_stream_writer_new_raw( std::shared_ptr<arrow::ipc::RecordBatchStreamWriter> *arrow_writer); +GARROW_AVAILABLE_IN_ALL GArrowRecordBatchFileWriter * garrow_record_batch_file_writer_new_raw( std::shared_ptr<arrow::ipc::RecordBatchFileWriter> *arrow_writer); diff --git a/ci/appveyor-cpp-build.bat b/ci/appveyor-cpp-build.bat index f688fbb63a9ad..08a052e82f24d 100644 --- a/ci/appveyor-cpp-build.bat +++ b/ci/appveyor-cpp-build.bat @@ -46,7 +46,9 @@ set ARROW_CMAKE_ARGS=-DARROW_DEPENDENCY_SOURCE=CONDA -DARROW_WITH_BZ2=ON set ARROW_CXXFLAGS=/WX /MP @rem Install GCS testbench +set PIPX_BIN_DIR=C:\Windows\ call
%CD%\ci\scripts\install_gcs_testbench.bat +storage-testbench -h || exit /B @rem @rem Build and test Arrow C++ libraries (including Parquet) diff --git a/ci/docker/conda-cpp.dockerfile b/ci/docker/conda-cpp.dockerfile index dff1f2224809a..f0084894e19dc 100644 --- a/ci/docker/conda-cpp.dockerfile +++ b/ci/docker/conda-cpp.dockerfile @@ -42,17 +42,19 @@ RUN mamba install -q -y \ valgrind && \ mamba clean --all +# We want to install the GCS testbench using the Conda base environment's Python, +# because the test environment's Python may later change. +ENV PIPX_BASE_PYTHON=/opt/conda/bin/python3 +COPY ci/scripts/install_gcs_testbench.sh /arrow/ci/scripts +RUN /arrow/ci/scripts/install_gcs_testbench.sh default + # Ensure npm, node and azurite are on path. npm and node are required to install azurite, which will then need to -# be on the path for the tests to run. +# be on the path for the tests to run. ENV PATH=/opt/conda/envs/arrow/bin:$PATH COPY ci/scripts/install_azurite.sh /arrow/ci/scripts/ RUN /arrow/ci/scripts/install_azurite.sh -# We want to install the GCS testbench using the same Python binary that the Conda code will use. -COPY ci/scripts/install_gcs_testbench.sh /arrow/ci/scripts -RUN /arrow/ci/scripts/install_gcs_testbench.sh default - COPY ci/scripts/install_sccache.sh /arrow/ci/scripts/ RUN /arrow/ci/scripts/install_sccache.sh unknown-linux-musl /usr/local/bin diff --git a/ci/docker/conda-integration.dockerfile b/ci/docker/conda-integration.dockerfile index c602490d6b729..7ad2e5c0e8008 100644 --- a/ci/docker/conda-integration.dockerfile +++ b/ci/docker/conda-integration.dockerfile @@ -24,7 +24,7 @@ ARG maven=3.8.7 ARG node=16 ARG yarn=1.22 ARG jdk=11 -ARG go=1.21.8 +ARG go=1.22.6 # Install Archery and integration dependencies COPY ci/conda_env_archery.txt /arrow/ci/ diff --git a/ci/docker/conda-python.dockerfile b/ci/docker/conda-python.dockerfile index 027fd589cecca..7e8dbe76f6248 100644 --- a/ci/docker/conda-python.dockerfile +++ b/ci/docker/conda-python.dockerfile @@ -32,11 +32,6 @@ RUN mamba install -q -y \ nomkl && \ mamba clean --all -# XXX The GCS testbench was already installed in conda-cpp.dockerfile, -# but we changed the installed Python version above, so we need to reinstall it. -COPY ci/scripts/install_gcs_testbench.sh /arrow/ci/scripts -RUN /arrow/ci/scripts/install_gcs_testbench.sh default - ENV ARROW_ACERO=ON \ ARROW_BUILD_STATIC=OFF \ ARROW_BUILD_TESTS=OFF \ diff --git a/ci/docker/debian-12-go.dockerfile b/ci/docker/debian-12-go.dockerfile index c958e6bdee211..4bc683c109eb8 100644 --- a/ci/docker/debian-12-go.dockerfile +++ b/ci/docker/debian-12-go.dockerfile @@ -16,8 +16,8 @@ # under the License. ARG arch=amd64 -ARG go=1.21 -ARG staticcheck=v0.4.7 +ARG go=1.22 +ARG staticcheck=v0.5.1 FROM ${arch}/golang:${go}-bookworm # FROM collects all the args, get back the staticcheck version arg diff --git a/ci/docker/python-wheel-manylinux-test.dockerfile b/ci/docker/python-wheel-manylinux-test.dockerfile index cdd0ae3ced756..09883f9780a36 100644 --- a/ci/docker/python-wheel-manylinux-test.dockerfile +++ b/ci/docker/python-wheel-manylinux-test.dockerfile @@ -16,15 +16,22 @@ # under the License. 
ARG arch -ARG python -FROM ${arch}/python:${python} - -# RUN pip install --upgrade pip +ARG python_image_tag +FROM ${arch}/python:${python_image_tag} # pandas doesn't provide wheel for aarch64 yet, so cache the compiled # test dependencies in a docker image COPY python/requirements-wheel-test.txt /arrow/python/ RUN pip install -r /arrow/python/requirements-wheel-test.txt +# Install the GCS testbench with the system Python +RUN apt-get update -y -q && \ + apt-get install -y -q \ + build-essential \ + python3-dev && \ + apt-get clean && \ + rm -rf /var/lib/apt/lists* + COPY ci/scripts/install_gcs_testbench.sh /arrow/ci/scripts/ -RUN PYTHON=python /arrow/ci/scripts/install_gcs_testbench.sh default +ENV PIPX_PYTHON=/usr/bin/python3 PIPX_PIP_ARGS=--prefer-binary +RUN /arrow/ci/scripts/install_gcs_testbench.sh default diff --git a/ci/docker/python-wheel-manylinux.dockerfile b/ci/docker/python-wheel-manylinux.dockerfile index cb39667af1e10..42f088fd8a22a 100644 --- a/ci/docker/python-wheel-manylinux.dockerfile +++ b/ci/docker/python-wheel-manylinux.dockerfile @@ -103,7 +103,7 @@ RUN vcpkg install \ # Configure Python for applications running in the bash shell of this Dockerfile ARG python=3.8 ENV PYTHON_VERSION=${python} -RUN PYTHON_ROOT=$(find /opt/python -name cp${PYTHON_VERSION/./}-*) && \ +RUN PYTHON_ROOT=$(find /opt/python -name cp${PYTHON_VERSION/./}-cp${PYTHON_VERSION/./}) && \ echo "export PATH=$PYTHON_ROOT/bin:\$PATH" >> /etc/profile.d/python.sh SHELL ["/bin/bash", "-i", "-c"] diff --git a/ci/docker/python-wheel-windows-test-vs2019.dockerfile b/ci/docker/python-wheel-windows-test-vs2019.dockerfile index 32bbb55e82689..625ab25f848f2 100644 --- a/ci/docker/python-wheel-windows-test-vs2019.dockerfile +++ b/ci/docker/python-wheel-windows-test-vs2019.dockerfile @@ -35,15 +35,27 @@ RUN setx path "%path%;C:\Program Files\Git\usr\bin" RUN wmic product where "name like 'python%%'" call uninstall /nointeractive && \ rm -rf Python* +# Install the GCS testbench using a well-known Python version. +# NOTE: cannot use pipx's `--fetch-missing-python` because of +# https://github.com/pypa/pipx/issues/1521, therefore download Python ourselves. 
+RUN choco install -r -y --pre --no-progress python --version=3.11.9 +ENV PIPX_BIN_DIR=C:\\Windows\\ +ENV PIPX_PYTHON="C:\Python311\python.exe" +COPY ci/scripts/install_gcs_testbench.bat C:/arrow/ci/scripts/ +RUN call "C:\arrow\ci\scripts\install_gcs_testbench.bat" && \ + storage-testbench -h + # Define the full version number otherwise choco falls back to patch number 0 (3.8 => 3.8.0) ARG python=3.8 -RUN (if "%python%"=="3.8" setx PYTHON_VERSION "3.8.10" && setx PATH "%PATH%;C:\Python38;C:\Python38\Scripts") & \ - (if "%python%"=="3.9" setx PYTHON_VERSION "3.9.13" && setx PATH "%PATH%;C:\Python39;C:\Python39\Scripts") & \ - (if "%python%"=="3.10" setx PYTHON_VERSION "3.10.11" && setx PATH "%PATH%;C:\Python310;C:\Python310\Scripts") & \ - (if "%python%"=="3.11" setx PYTHON_VERSION "3.11.5" && setx PATH "%PATH%;C:\Python311;C:\Python311\Scripts") & \ - (if "%python%"=="3.12" setx PYTHON_VERSION "3.12.0" && setx PATH "%PATH%;C:\Python312;C:\Python312\Scripts") +RUN (if "%python%"=="3.8" setx PYTHON_VERSION "3.8.10") & \ + (if "%python%"=="3.9" setx PYTHON_VERSION "3.9.13") & \ + (if "%python%"=="3.10" setx PYTHON_VERSION "3.10.11") & \ + (if "%python%"=="3.11" setx PYTHON_VERSION "3.11.9") & \ + (if "%python%"=="3.12" setx PYTHON_VERSION "3.12.4") & \ + (if "%python%"=="3.13" setx PYTHON_VERSION "3.13.0-rc1") # Install archiver to extract xz archives -RUN choco install -r -y --no-progress python --version=%PYTHON_VERSION% & \ - python -m pip install --no-cache-dir -U pip setuptools & \ +RUN choco install -r -y --pre --no-progress --force python --version=%PYTHON_VERSION% && \ choco install --no-progress -r -y archiver + +ENV PYTHON=$python diff --git a/ci/docker/python-wheel-windows-vs2019.dockerfile b/ci/docker/python-wheel-windows-vs2019.dockerfile index ff42de939d91f..5a17e3e4c52c2 100644 --- a/ci/docker/python-wheel-windows-vs2019.dockerfile +++ b/ci/docker/python-wheel-windows-vs2019.dockerfile @@ -83,9 +83,10 @@ ARG python=3.8 RUN (if "%python%"=="3.8" setx PYTHON_VERSION "3.8.10" && setx PATH "%PATH%;C:\Python38;C:\Python38\Scripts") & \ (if "%python%"=="3.9" setx PYTHON_VERSION "3.9.13" && setx PATH "%PATH%;C:\Python39;C:\Python39\Scripts") & \ (if "%python%"=="3.10" setx PYTHON_VERSION "3.10.11" && setx PATH "%PATH%;C:\Python310;C:\Python310\Scripts") & \ - (if "%python%"=="3.11" setx PYTHON_VERSION "3.11.5" && setx PATH "%PATH%;C:\Python311;C:\Python311\Scripts") & \ - (if "%python%"=="3.12" setx PYTHON_VERSION "3.12.0" && setx PATH "%PATH%;C:\Python312;C:\Python312\Scripts") -RUN choco install -r -y --no-progress python --version=%PYTHON_VERSION% + (if "%python%"=="3.11" setx PYTHON_VERSION "3.11.9" && setx PATH "%PATH%;C:\Python311;C:\Python311\Scripts") & \ + (if "%python%"=="3.12" setx PYTHON_VERSION "3.12.4" && setx PATH "%PATH%;C:\Python312;C:\Python312\Scripts") & \ + (if "%python%"=="3.13" setx PYTHON_VERSION "3.13.0-rc1" && setx PATH "%PATH%;C:\Python313;C:\Python313\Scripts") +RUN choco install -r -y --pre --no-progress python --version=%PYTHON_VERSION% RUN python -m pip install -U pip setuptools COPY python/requirements-wheel-build.txt arrow/python/ diff --git a/ci/docker/ubuntu-20.04-cpp-minimal.dockerfile b/ci/docker/ubuntu-20.04-cpp-minimal.dockerfile index e17c0306f115d..4d867a448c994 100644 --- a/ci/docker/ubuntu-20.04-cpp-minimal.dockerfile +++ b/ci/docker/ubuntu-20.04-cpp-minimal.dockerfile @@ -33,6 +33,7 @@ RUN apt-get update -y -q && \ libssl-dev \ libcurl4-openssl-dev \ python3-pip \ + python3-venv \ tzdata \ wget && \ apt-get clean && \ diff --git 
a/ci/docker/ubuntu-22.04-cpp-minimal.dockerfile b/ci/docker/ubuntu-22.04-cpp-minimal.dockerfile index 341d8a87e8661..f26cad51f0983 100644 --- a/ci/docker/ubuntu-22.04-cpp-minimal.dockerfile +++ b/ci/docker/ubuntu-22.04-cpp-minimal.dockerfile @@ -33,6 +33,7 @@ RUN apt-get update -y -q && \ libssl-dev \ libcurl4-openssl-dev \ python3-pip \ + python3-venv \ tzdata \ wget && \ apt-get clean && \ diff --git a/ci/docker/ubuntu-24.04-cpp-minimal.dockerfile b/ci/docker/ubuntu-24.04-cpp-minimal.dockerfile index a995ab2a8bc2d..125bc7ba46a81 100644 --- a/ci/docker/ubuntu-24.04-cpp-minimal.dockerfile +++ b/ci/docker/ubuntu-24.04-cpp-minimal.dockerfile @@ -33,6 +33,7 @@ RUN apt-get update -y -q && \ libssl-dev \ libcurl4-openssl-dev \ python3-pip \ + python3-venv \ tzdata \ tzdata-legacy \ wget && \ diff --git a/ci/scripts/install_gcs_testbench.bat b/ci/scripts/install_gcs_testbench.bat index b03d0c2ad6608..f54f98db7cac8 100644 --- a/ci/scripts/install_gcs_testbench.bat +++ b/ci/scripts/install_gcs_testbench.bat @@ -17,9 +17,18 @@ @echo on -set GCS_TESTBENCH_VERSION="v0.36.0" +set GCS_TESTBENCH_VERSION="v0.40.0" + +set PIPX_FLAGS=--verbose +if NOT "%PIPX_PYTHON%"=="" ( + set PIPX_FLAGS=--python %PIPX_PYTHON% %PIPX_FLAGS% +) + +python -m pip install -U pipx || exit /B 1 @REM Install GCS testbench %GCS_TESTBENCH_VERSION% -python -m pip install ^ +pipx install %PIPX_FLAGS% ^ "https://github.com/googleapis/storage-testbench/archive/%GCS_TESTBENCH_VERSION%.tar.gz" ^ || exit /B 1 + +pipx list --verbose diff --git a/ci/scripts/install_gcs_testbench.sh b/ci/scripts/install_gcs_testbench.sh index 2090290c99322..48a5858a358c9 100755 --- a/ci/scripts/install_gcs_testbench.sh +++ b/ci/scripts/install_gcs_testbench.sh @@ -17,7 +17,7 @@ # specific language governing permissions and limitations # under the License. -set -e +set -ex if [ "$#" -ne 1 ]; then echo "Usage: $0 " @@ -34,15 +34,26 @@ case "$(uname -m)" in ;; esac -# On newer pythons install into the system will fail, so override that -export PIP_BREAK_SYSTEM_PACKAGES=1 - version=$1 if [[ "${version}" -eq "default" ]]; then version="v0.39.0" - # Latests versions of Testbench require newer setuptools - ${PYTHON:-python3} -m pip install --upgrade setuptools fi -${PYTHON:-python3} -m pip install \ +# The Python to install pipx with +: ${PIPX_BASE_PYTHON:=$(which python3)} +# The Python to install the GCS testbench with +: ${PIPX_PYTHON:=${PIPX_BASE_PYTHON:-$(which python3)}} + +export PIP_BREAK_SYSTEM_PACKAGES=1 +${PIPX_BASE_PYTHON} -m pip install -U pipx + +pipx_flags=(--verbose --python ${PIPX_PYTHON}) +if [[ $(id -un) == "root" ]]; then + # Install globally as /root/.local/bin is typically not in $PATH + pipx_flags+=(--global) +fi +if [[ -n "${PIPX_PIP_ARGS}" ]]; then + pipx_flags+=(--pip-args "'${PIPX_PIP_ARGS}'") +fi +${PIPX_BASE_PYTHON} -m pipx install ${pipx_flags[@]} \ "https://github.com/googleapis/storage-testbench/archive/${version}.tar.gz" diff --git a/ci/scripts/install_python.sh b/ci/scripts/install_python.sh index 5f962f02b911b..42d0e9ca179fb 100755 --- a/ci/scripts/install_python.sh +++ b/ci/scripts/install_python.sh @@ -28,8 +28,9 @@ declare -A versions versions=([3.8]=3.8.10 [3.9]=3.9.13 [3.10]=3.10.11 - [3.11]=3.11.5 - [3.12]=3.12.0) + [3.11]=3.11.9 + [3.12]=3.12.4 + [3.13]=3.13.0) if [ "$#" -ne 2 ]; then echo "Usage: $0 " @@ -46,7 +47,14 @@ full_version=${versions[$2]} if [ $platform = "macOS" ]; then echo "Downloading Python installer..." 
- if [ "$(uname -m)" = "arm64" ] || [ "$version" = "3.10" ] || [ "$version" = "3.11" ] || [ "$version" = "3.12" ]; then + if [ "$version" = "3.13" ]; + then + fname="python-${full_version}rc1-macos11.pkg" + elif [ "$(uname -m)" = "arm64" ] || \ + [ "$version" = "3.10" ] || \ + [ "$version" = "3.11" ] || \ + [ "$version" = "3.12" ]; + then fname="python-${full_version}-macos11.pkg" else fname="python-${full_version}-macosx10.9.pkg" diff --git a/ci/scripts/python_wheel_macos_build.sh b/ci/scripts/python_wheel_macos_build.sh index 3ed9d5d8dd12f..92b962f1740bd 100755 --- a/ci/scripts/python_wheel_macos_build.sh +++ b/ci/scripts/python_wheel_macos_build.sh @@ -34,7 +34,7 @@ rm -rf ${source_dir}/python/pyarrow/*.so.* echo "=== (${PYTHON_VERSION}) Set SDK, C++ and Wheel flags ===" export _PYTHON_HOST_PLATFORM="macosx-${MACOSX_DEPLOYMENT_TARGET}-${arch}" -export MACOSX_DEPLOYMENT_TARGET=${MACOSX_DEPLOYMENT_TARGET:-10.15} +export MACOSX_DEPLOYMENT_TARGET=${MACOSX_DEPLOYMENT_TARGET:-12.0} export SDKROOT=${SDKROOT:-$(xcrun --sdk macosx --show-sdk-path)} if [ $arch = "arm64" ]; then @@ -48,13 +48,11 @@ fi echo "=== (${PYTHON_VERSION}) Install Python build dependencies ===" export PIP_SITE_PACKAGES=$(python -c 'import site; print(site.getsitepackages()[0])') -export PIP_TARGET_PLATFORM="macosx_${MACOSX_DEPLOYMENT_TARGET//./_}_${arch}" pip install \ --upgrade \ --only-binary=:all: \ --target $PIP_SITE_PACKAGES \ - --platform $PIP_TARGET_PLATFORM \ -r ${source_dir}/python/requirements-wheel-build.txt pip install "delocate>=0.10.3" diff --git a/ci/scripts/python_wheel_windows_test.bat b/ci/scripts/python_wheel_windows_test.bat index 87c0bb1252024..cac3f18434b6c 100755 --- a/ci/scripts/python_wheel_windows_test.bat +++ b/ci/scripts/python_wheel_windows_test.bat @@ -37,28 +37,32 @@ set PYARROW_TEST_TENSORFLOW=ON set ARROW_TEST_DATA=C:\arrow\testing\data set PARQUET_TEST_DATA=C:\arrow\cpp\submodules\parquet-testing\data -@REM Install testing dependencies -pip install -r C:\arrow\python\requirements-wheel-test.txt || exit /B 1 +@REM List installed Pythons +py -0p + +set PYTHON_CMD=py -%PYTHON% -@REM Install GCS testbench -call "C:\arrow\ci\scripts\install_gcs_testbench.bat" +%PYTHON_CMD% -m pip install -U pip setuptools || exit /B 1 + +@REM Install testing dependencies +%PYTHON_CMD% -m pip install -r C:\arrow\python\requirements-wheel-test.txt || exit /B 1 @REM Install the built wheels -python -m pip install --no-index --find-links=C:\arrow\python\dist\ pyarrow || exit /B 1 +%PYTHON_CMD% -m pip install --no-index --find-links=C:\arrow\python\dist\ pyarrow || exit /B 1 @REM Test that the modules are importable -python -c "import pyarrow" || exit /B 1 -python -c "import pyarrow._gcsfs" || exit /B 1 -python -c "import pyarrow._hdfs" || exit /B 1 -python -c "import pyarrow._s3fs" || exit /B 1 -python -c "import pyarrow.csv" || exit /B 1 -python -c "import pyarrow.dataset" || exit /B 1 -python -c "import pyarrow.flight" || exit /B 1 -python -c "import pyarrow.fs" || exit /B 1 -python -c "import pyarrow.json" || exit /B 1 -python -c "import pyarrow.orc" || exit /B 1 -python -c "import pyarrow.parquet" || exit /B 1 -python -c "import pyarrow.substrait" || exit /B 1 +%PYTHON_CMD% -c "import pyarrow" || exit /B 1 +%PYTHON_CMD% -c "import pyarrow._gcsfs" || exit /B 1 +%PYTHON_CMD% -c "import pyarrow._hdfs" || exit /B 1 +%PYTHON_CMD% -c "import pyarrow._s3fs" || exit /B 1 +%PYTHON_CMD% -c "import pyarrow.csv" || exit /B 1 +%PYTHON_CMD% -c "import pyarrow.dataset" || exit /B 1 +%PYTHON_CMD% -c "import pyarrow.flight" || 
exit /B 1 +%PYTHON_CMD% -c "import pyarrow.fs" || exit /B 1 +%PYTHON_CMD% -c "import pyarrow.json" || exit /B 1 +%PYTHON_CMD% -c "import pyarrow.orc" || exit /B 1 +%PYTHON_CMD% -c "import pyarrow.parquet" || exit /B 1 +%PYTHON_CMD% -c "import pyarrow.substrait" || exit /B 1 @rem Download IANA Timezone Database for ORC C++ curl https://cygwin.osuosl.org/noarch/release/tzdata/tzdata-2024a-1.tar.xz --output tzdata.tar.xz || exit /B @@ -67,4 +71,4 @@ arc unarchive tzdata.tar.xz %USERPROFILE%\Downloads\test\tzdata set TZDIR=%USERPROFILE%\Downloads\test\tzdata\usr\share\zoneinfo @REM Execute unittest -pytest -r s --pyargs pyarrow || exit /B 1 +%PYTHON_CMD% -m pytest -r s --pyargs pyarrow || exit /B 1 diff --git a/ci/vcpkg/arm64-osx-static-debug.cmake b/ci/vcpkg/arm64-osx-static-debug.cmake index f511819a2edd9..32ae7bc433489 100644 --- a/ci/vcpkg/arm64-osx-static-debug.cmake +++ b/ci/vcpkg/arm64-osx-static-debug.cmake @@ -21,6 +21,6 @@ set(VCPKG_LIBRARY_LINKAGE static) set(VCPKG_CMAKE_SYSTEM_NAME Darwin) set(VCPKG_OSX_ARCHITECTURES arm64) -set(VCPKG_OSX_DEPLOYMENT_TARGET "11.0") +set(VCPKG_OSX_DEPLOYMENT_TARGET "12.0") set(VCPKG_BUILD_TYPE debug) diff --git a/ci/vcpkg/arm64-osx-static-release.cmake b/ci/vcpkg/arm64-osx-static-release.cmake index 43d65efb2651b..dde46cd763afe 100644 --- a/ci/vcpkg/arm64-osx-static-release.cmake +++ b/ci/vcpkg/arm64-osx-static-release.cmake @@ -21,6 +21,6 @@ set(VCPKG_LIBRARY_LINKAGE static) set(VCPKG_CMAKE_SYSTEM_NAME Darwin) set(VCPKG_OSX_ARCHITECTURES arm64) -set(VCPKG_OSX_DEPLOYMENT_TARGET "11.0") +set(VCPKG_OSX_DEPLOYMENT_TARGET "12.0") set(VCPKG_BUILD_TYPE release) diff --git a/ci/vcpkg/universal2-osx-static-debug.cmake b/ci/vcpkg/universal2-osx-static-debug.cmake index 8abc1ebf838f1..d3ef0d67eb719 100644 --- a/ci/vcpkg/universal2-osx-static-debug.cmake +++ b/ci/vcpkg/universal2-osx-static-debug.cmake @@ -21,6 +21,6 @@ set(VCPKG_LIBRARY_LINKAGE static) set(VCPKG_CMAKE_SYSTEM_NAME Darwin) set(VCPKG_OSX_ARCHITECTURES "x86_64;arm64") -set(VCPKG_OSX_DEPLOYMENT_TARGET "10.15") +set(VCPKG_OSX_DEPLOYMENT_TARGET "12.0") set(VCPKG_BUILD_TYPE debug) diff --git a/ci/vcpkg/universal2-osx-static-release.cmake b/ci/vcpkg/universal2-osx-static-release.cmake index 2eb36c15175b2..3018aa93e5fbb 100644 --- a/ci/vcpkg/universal2-osx-static-release.cmake +++ b/ci/vcpkg/universal2-osx-static-release.cmake @@ -21,6 +21,6 @@ set(VCPKG_LIBRARY_LINKAGE static) set(VCPKG_CMAKE_SYSTEM_NAME Darwin) set(VCPKG_OSX_ARCHITECTURES "x86_64;arm64") -set(VCPKG_OSX_DEPLOYMENT_TARGET "10.15") +set(VCPKG_OSX_DEPLOYMENT_TARGET "12.0") set(VCPKG_BUILD_TYPE release) diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index a1e3138da9e0b..5ead9e4b063cd 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -84,7 +84,7 @@ set(ARROW_VERSION "18.0.0-SNAPSHOT") string(REGEX MATCH "^[0-9]+\\.[0-9]+\\.[0-9]+" ARROW_BASE_VERSION "${ARROW_VERSION}") # if no build type is specified, default to release builds -if(NOT DEFINED CMAKE_BUILD_TYPE) +if(NOT CMAKE_BUILD_TYPE) set(CMAKE_BUILD_TYPE Release CACHE STRING "Choose the type of build.") diff --git a/cpp/cmake_modules/DefineOptions.cmake b/cpp/cmake_modules/DefineOptions.cmake index 41466a1c22404..755887314d110 100644 --- a/cpp/cmake_modules/DefineOptions.cmake +++ b/cpp/cmake_modules/DefineOptions.cmake @@ -303,7 +303,10 @@ takes precedence over ccache if a storage backend is configured" ON) ARROW_IPC) define_option(ARROW_AZURE - "Build Arrow with Azure support (requires the Azure SDK for C++)" OFF) + "Build Arrow with Azure support (requires the Azure SDK 
for C++)" + OFF + DEPENDS + ARROW_FILESYSTEM) define_option(ARROW_BUILD_UTILITIES "Build Arrow commandline utilities" OFF) @@ -346,9 +349,16 @@ takes precedence over ccache if a storage backend is configured" ON) ARROW_WITH_UTF8PROC) define_option(ARROW_GCS - "Build Arrow with GCS support (requires the GCloud SDK for C++)" OFF) + "Build Arrow with GCS support (requires the GCloud SDK for C++)" + OFF + DEPENDS + ARROW_FILESYSTEM) - define_option(ARROW_HDFS "Build the Arrow HDFS bridge" OFF) + define_option(ARROW_HDFS + "Build the Arrow HDFS bridge" + OFF + DEPENDS + ARROW_FILESYSTEM) define_option(ARROW_IPC "Build the Arrow IPC extensions" ON) @@ -398,7 +408,11 @@ takes precedence over ccache if a storage backend is configured" ON) ARROW_HDFS ARROW_JSON) - define_option(ARROW_S3 "Build Arrow with S3 support (requires the AWS SDK for C++)" OFF) + define_option(ARROW_S3 + "Build Arrow with S3 support (requires the AWS SDK for C++)" + OFF + DEPENDS + ARROW_FILESYSTEM) define_option(ARROW_SKYHOOK "Build the Skyhook libraries" diff --git a/cpp/cmake_modules/ThirdpartyToolchain.cmake b/cpp/cmake_modules/ThirdpartyToolchain.cmake index 63e2c036c9a6f..b31037a973279 100644 --- a/cpp/cmake_modules/ThirdpartyToolchain.cmake +++ b/cpp/cmake_modules/ThirdpartyToolchain.cmake @@ -259,7 +259,7 @@ macro(resolve_dependency DEPENDENCY_NAME) IS_RUNTIME_DEPENDENCY REQUIRED_VERSION USE_CONFIG) - set(multi_value_args COMPONENTS PC_PACKAGE_NAMES) + set(multi_value_args COMPONENTS OPTIONAL_COMPONENTS PC_PACKAGE_NAMES) cmake_parse_arguments(ARG "${options}" "${one_value_args}" @@ -287,6 +287,9 @@ macro(resolve_dependency DEPENDENCY_NAME) if(ARG_COMPONENTS) list(APPEND FIND_PACKAGE_ARGUMENTS COMPONENTS ${ARG_COMPONENTS}) endif() + if(ARG_OPTIONAL_COMPONENTS) + list(APPEND FIND_PACKAGE_ARGUMENTS OPTIONAL_COMPONENTS ${ARG_OPTIONAL_COMPONENTS}) + endif() if(${DEPENDENCY_NAME}_SOURCE STREQUAL "AUTO") find_package(${FIND_PACKAGE_ARGUMENTS}) set(COMPATIBLE ${${PACKAGE_NAME}_FOUND}) @@ -1289,15 +1292,19 @@ if(ARROW_USE_BOOST) set(Boost_USE_STATIC_LIBS ON) endif() if(ARROW_BOOST_REQUIRE_LIBRARY) - set(ARROW_BOOST_COMPONENTS system filesystem) + set(ARROW_BOOST_COMPONENTS filesystem system) + set(ARROW_BOOST_OPTIONAL_COMPONENTS process) else() set(ARROW_BOOST_COMPONENTS) + set(ARROW_BOOST_OPTIONAL_COMPONENTS) endif() resolve_dependency(Boost REQUIRED_VERSION ${ARROW_BOOST_REQUIRED_VERSION} COMPONENTS ${ARROW_BOOST_COMPONENTS} + OPTIONAL_COMPONENTS + ${ARROW_BOOST_OPTIONAL_COMPONENTS} IS_RUNTIME_DEPENDENCY # libarrow.so doesn't depend on libboost*. FALSE) @@ -1316,14 +1323,35 @@ if(ARROW_USE_BOOST) endif() endforeach() - if(WIN32 AND CMAKE_CXX_COMPILER_ID STREQUAL "GNU") - # boost/process/detail/windows/handle_workaround.hpp doesn't work - # without BOOST_USE_WINDOWS_H with MinGW because MinGW doesn't - # provide __kernel_entry without winternl.h. 
- # - # See also: - # https://github.com/boostorg/process/blob/develop/include/boost/process/detail/windows/handle_workaround.hpp - target_compile_definitions(Boost::headers INTERFACE "BOOST_USE_WINDOWS_H=1") + if(TARGET Boost::process) + # Boost >= 1.86 + target_compile_definitions(Boost::process INTERFACE "BOOST_PROCESS_HAVE_V1") + target_compile_definitions(Boost::process INTERFACE "BOOST_PROCESS_HAVE_V2") + else() + # Boost < 1.86 + add_library(Boost::process INTERFACE IMPORTED) + if(TARGET Boost::filesystem) + target_link_libraries(Boost::process INTERFACE Boost::filesystem) + endif() + if(TARGET Boost::system) + target_link_libraries(Boost::process INTERFACE Boost::system) + endif() + if(TARGET Boost::headers) + target_link_libraries(Boost::process INTERFACE Boost::headers) + endif() + if(Boost_VERSION VERSION_GREATER_EQUAL 1.80) + target_compile_definitions(Boost::process INTERFACE "BOOST_PROCESS_HAVE_V2") + # Boost < 1.86 has a bug that + # boost::process::v2::process_environment::on_setup() isn't + # defined. We need to build Boost Process source to define it. + # + # See also: + # https://github.com/boostorg/process/issues/312 + target_compile_definitions(Boost::process INTERFACE "BOOST_PROCESS_NEED_SOURCE") + if(WIN32) + target_link_libraries(Boost::process INTERFACE bcrypt ntdll) + endif() + endif() endif() message(STATUS "Boost include dir: ${Boost_INCLUDE_DIRS}") diff --git a/cpp/examples/minimal_build/CMakeLists.txt b/cpp/examples/minimal_build/CMakeLists.txt index b4a7cde938c87..95dad34221add 100644 --- a/cpp/examples/minimal_build/CMakeLists.txt +++ b/cpp/examples/minimal_build/CMakeLists.txt @@ -30,7 +30,7 @@ endif() # We require a C++17 compliant compiler set(CMAKE_CXX_STANDARD_REQUIRED ON) -if(NOT DEFINED CMAKE_BUILD_TYPE) +if(NOT CMAKE_BUILD_TYPE) set(CMAKE_BUILD_TYPE Release) endif() diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt index fb7253b6fd69d..01ac813f4713b 100644 --- a/cpp/src/arrow/CMakeLists.txt +++ b/cpp/src/arrow/CMakeLists.txt @@ -373,7 +373,10 @@ set(ARROW_SRCS config.cc datum.cc device.cc + device_allocation_type_set.cc extension_type.cc + extension/bool8.cc + extension/uuid.cc pretty_print.cc record_batch.cc result.cc @@ -641,9 +644,13 @@ else() endif() set(ARROW_TESTING_SHARED_LINK_LIBS arrow_shared ${ARROW_GTEST_GTEST}) -set(ARROW_TESTING_SHARED_PRIVATE_LINK_LIBS arrow::flatbuffers RapidJSON) -set(ARROW_TESTING_STATIC_LINK_LIBS arrow::flatbuffers RapidJSON arrow_static - ${ARROW_GTEST_GTEST}) +set(ARROW_TESTING_SHARED_PRIVATE_LINK_LIBS arrow::flatbuffers RapidJSON Boost::process) +set(ARROW_TESTING_STATIC_LINK_LIBS + arrow::flatbuffers + RapidJSON + Boost::process + arrow_static + ${ARROW_GTEST_GTEST}) set(ARROW_TESTING_SHARED_INSTALL_INTERFACE_LIBS Arrow::arrow_shared) set(ARROW_TESTING_STATIC_INSTALL_INTERFACE_LIBS Arrow::arrow_static) # that depend on gtest @@ -664,9 +671,10 @@ set(ARROW_TESTING_SRCS io/test_common.cc ipc/test_common.cc testing/fixed_width_test_util.cc + testing/generator.cc testing/gtest_util.cc + testing/process.cc testing/random.cc - testing/generator.cc testing/util.cc) # @@ -906,7 +914,6 @@ endif() if(ARROW_JSON) arrow_add_object_library(ARROW_JSON - extension/bool8.cc extension/fixed_shape_tensor.cc extension/opaque.cc json/options.cc @@ -1225,6 +1232,7 @@ add_subdirectory(testing) add_subdirectory(array) add_subdirectory(c) add_subdirectory(compute) +add_subdirectory(extension) add_subdirectory(io) add_subdirectory(tensor) add_subdirectory(util) @@ -1267,7 +1275,6 @@ endif() if(ARROW_JSON) 
 add_subdirectory(json)
-  add_subdirectory(extension)
 endif()
 
 if(ARROW_ORC)
diff --git a/cpp/src/arrow/acero/aggregate_node_test.cc b/cpp/src/arrow/acero/aggregate_node_test.cc
index d398fb24b73d5..c623271db9fb4 100644
--- a/cpp/src/arrow/acero/aggregate_node_test.cc
+++ b/cpp/src/arrow/acero/aggregate_node_test.cc
@@ -210,5 +210,57 @@ TEST(GroupByNode, NoSkipNulls) {
   AssertExecBatchesEqualIgnoringOrder(out_schema, {expected_batch},
                                       out_batches.batches);
 }
 
+TEST(ScalarAggregateNode, AnyAll) {
+  // GH-43768: boolean_any and boolean_all with constant input should work well
+  // when min_count != 0.
+  std::shared_ptr<Schema> in_schema = schema({field("not_used", int32())});
+  std::shared_ptr<Schema> out_schema = schema({field("agg_out", boolean())});
+  struct AnyAllCase {
+    std::string batches_json;
+    Expression literal;
+    std::string expected_json;
+    bool skip_nulls = false;
+    uint32_t min_count = 2;
+  };
+  std::vector<AnyAllCase> cases{
+      {"[[42], [42], [42], [42]]", literal(true), "[[true]]"},
+      {"[[42], [42], [42], [42]]", literal(false), "[[false]]"},
+      {"[[42], [42], [42], [42]]", literal(BooleanScalar{}), "[[null]]"},
+      {"[[42]]", literal(true), "[[null]]"},
+      {"[[42], [42], [42]]", literal(true), "[[true]]"},
+      {"[[42], [42], [42]]", literal(true), "[[null]]", /*skip_nulls=*/false,
+       /*min_count=*/4},
+      {"[[42], [42], [42], [42]]", literal(BooleanScalar{}), "[[null]]",
+       /*skip_nulls=*/true},
+  };
+  for (const AnyAllCase& any_all_case : cases) {
+    for (auto func_name : {"any", "all"}) {
+      std::vector<ExecBatch> batches{
+          ExecBatchFromJSON({int32()}, any_all_case.batches_json)};
+      std::vector<Aggregate> aggregates = {
+          Aggregate(func_name,
+                    std::make_shared<ScalarAggregateOptions>(
+                        /*skip_nulls=*/any_all_case.skip_nulls,
+                        /*min_count=*/any_all_case.min_count),
+                    FieldRef("literal"))};
+
+      // And a projection to make the input including a Scalar Boolean
+      Declaration plan = Declaration::Sequence(
+          {{"exec_batch_source", ExecBatchSourceNodeOptions(in_schema, batches)},
+           {"project", ProjectNodeOptions({any_all_case.literal}, {"literal"})},
+           {"aggregate", AggregateNodeOptions(aggregates)}});
+
+      ASSERT_OK_AND_ASSIGN(BatchesWithCommonSchema out_batches,
+                           DeclarationToExecBatches(plan));
+
+      ExecBatch expected_batch =
+          ExecBatchFromJSON({boolean()}, any_all_case.expected_json);
+
+      AssertExecBatchesEqualIgnoringOrder(out_schema, {expected_batch},
+                                          out_batches.batches);
+    }
+  }
+}
+
 }  // namespace acero
 }  // namespace arrow
diff --git a/cpp/src/arrow/acero/hash_join_benchmark.cc b/cpp/src/arrow/acero/hash_join_benchmark.cc
index 470960b1c5062..e3e37e249e6a3 100644
--- a/cpp/src/arrow/acero/hash_join_benchmark.cc
+++ b/cpp/src/arrow/acero/hash_join_benchmark.cc
@@ -104,7 +104,7 @@ class JoinBenchmark {
       key_cmp.push_back(JoinKeyCmp::EQ);
     }
 
-    for (size_t i = 0; i < settings.build_payload_types.size(); i++) {
+    for (size_t i = 0; i < settings.probe_payload_types.size(); i++) {
       std::string name = "lp" + std::to_string(i);
       DCHECK_OK(l_schema_builder.AddField(field(name, settings.probe_payload_types[i])));
     }
@@ -279,7 +279,7 @@ static void BM_HashJoinBasic_MatchesPerRow(benchmark::State& st) {
   settings.cardinality = 1.0 / static_cast<double>(st.range(0));
   settings.num_build_batches = static_cast<int>(st.range(1));
-  settings.num_probe_batches = settings.num_probe_batches;
+  settings.num_probe_batches = settings.num_build_batches;
 
   HashJoinBasicBenchmarkImpl(st, settings);
 }
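The ScalarAggregateNode.AnyAll test earlier in this diff pins down the min_count semantics fixed by GH-43768: a constant (scalar) input consumed over a batch of length N now counts as N observed values rather than one. A minimal standalone sketch of the same semantics exercised on a plain array; the RunAnyExample helper is illustrative, not from the patch:

#include <arrow/api.h>
#include <arrow/compute/api.h>
#include <iostream>

arrow::Status RunAnyExample() {
  arrow::BooleanBuilder builder;
  ARROW_RETURN_NOT_OK(builder.AppendValues({true, true, true}));  // 3 non-null values
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Array> values, builder.Finish());

  // min_count=2 is satisfied by 3 non-null values, so "any" yields true.
  arrow::compute::ScalarAggregateOptions satisfied(/*skip_nulls=*/false,
                                                   /*min_count=*/2);
  ARROW_ASSIGN_OR_RAISE(arrow::Datum result,
                        arrow::compute::CallFunction("any", {values}, &satisfied));
  std::cout << result.scalar()->ToString() << std::endl;  // true

  // min_count=4 is not satisfied by only 3 values, so the result degrades to null.
  arrow::compute::ScalarAggregateOptions unsatisfied(/*skip_nulls=*/false,
                                                     /*min_count=*/4);
  ARROW_ASSIGN_OR_RAISE(result,
                        arrow::compute::CallFunction("any", {values}, &unsatisfied));
  std::cout << result.scalar()->ToString() << std::endl;  // null
  return arrow::Status::OK();
}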
@@ -291,7 +291,7 @@ static void BM_HashJoinBasic_PayloadSize(benchmark::State& st) {
   settings.cardinality = 1.0 / static_cast<double>(st.range(1));
   settings.num_build_batches = static_cast<int>(st.range(2));
-  settings.num_probe_batches = settings.num_probe_batches;
+  settings.num_probe_batches = settings.num_build_batches;
 
   HashJoinBasicBenchmarkImpl(st, settings);
 }
diff --git a/cpp/src/arrow/acero/hash_join_dict.cc b/cpp/src/arrow/acero/hash_join_dict.cc
index 3aef08e6e9ccf..8db9dddb2c3a0 100644
--- a/cpp/src/arrow/acero/hash_join_dict.cc
+++ b/cpp/src/arrow/acero/hash_join_dict.cc
@@ -225,21 +225,20 @@ Status HashJoinDictBuild::Init(ExecContext* ctx, std::shared_ptr<Array> dictiona
     return Status::OK();
   }
 
-  dictionary_ = dictionary;
+  dictionary_ = std::move(dictionary);
 
   // Initialize encoder
   RowEncoder encoder;
-  std::vector<TypeHolder> encoder_types;
-  encoder_types.emplace_back(value_type_);
+  std::vector<TypeHolder> encoder_types{value_type_};
   encoder.Init(encoder_types, ctx);
 
   // Encode all dictionary values
-  int64_t length = dictionary->data()->length;
+  int64_t length = dictionary_->data()->length;
   if (length >= std::numeric_limits<int32_t>::max()) {
     return Status::Invalid(
         "Dictionary length in hash join must fit into signed 32-bit integer.");
   }
-  RETURN_NOT_OK(encoder.EncodeAndAppend(ExecSpan({*dictionary->data()}, length)));
+  RETURN_NOT_OK(encoder.EncodeAndAppend(ExecSpan({*dictionary_->data()}, length)));
 
   std::vector<int32_t> entries_to_take;
diff --git a/cpp/src/arrow/acero/hash_join_node.cc b/cpp/src/arrow/acero/hash_join_node.cc
index 67f902e64be93..80dd163ced740 100644
--- a/cpp/src/arrow/acero/hash_join_node.cc
+++ b/cpp/src/arrow/acero/hash_join_node.cc
@@ -61,30 +61,30 @@ Result<std::vector<FieldRef>> HashJoinSchema::ComputePayload(
     const std::vector<FieldRef>& filter, const std::vector<FieldRef>& keys) {
   // payload = (output + filter) - keys, with no duplicates
   std::unordered_set<int> payload_fields;
-  for (auto ref : output) {
+  for (const auto& ref : output) {
     ARROW_ASSIGN_OR_RAISE(auto match, ref.FindOne(schema));
     payload_fields.insert(match[0]);
   }
 
-  for (auto ref : filter) {
+  for (const auto& ref : filter) {
     ARROW_ASSIGN_OR_RAISE(auto match, ref.FindOne(schema));
     payload_fields.insert(match[0]);
   }
 
-  for (auto ref : keys) {
+  for (const auto& ref : keys) {
     ARROW_ASSIGN_OR_RAISE(auto match, ref.FindOne(schema));
     payload_fields.erase(match[0]);
   }
 
   std::vector<FieldRef> payload_refs;
-  for (auto ref : output) {
+  for (const auto& ref : output) {
     ARROW_ASSIGN_OR_RAISE(auto match, ref.FindOne(schema));
     if (payload_fields.find(match[0]) != payload_fields.end()) {
       payload_refs.push_back(ref);
       payload_fields.erase(match[0]);
     }
   }
-  for (auto ref : filter) {
+  for (const auto& ref : filter) {
     ARROW_ASSIGN_OR_RAISE(auto match, ref.FindOne(schema));
     if (payload_fields.find(match[0]) != payload_fields.end()) {
       payload_refs.push_back(ref);
@@ -198,7 +198,7 @@ Status HashJoinSchema::ValidateSchemas(JoinType join_type, const Schema& left_sc
     return Status::Invalid("Different number of key fields on left (", left_keys.size(),
                            ") and right (", right_keys.size(), ") side of the join");
   }
-  if (left_keys.size() < 1) {
+  if (left_keys.empty()) {
     return Status::Invalid("Join key cannot be empty");
   }
   for (size_t i = 0; i < left_keys.size() + right_keys.size(); ++i) {
@@ -432,7 +432,7 @@ Status HashJoinSchema::CollectFilterColumns(std::vector<FieldRef>& left_filter,
       indices[0] -= left_schema.num_fields();
       FieldPath corrected_path(std::move(indices));
       if (right_seen_paths.find(*path) == right_seen_paths.end()) {
-        right_filter.push_back(corrected_path);
+        right_filter.emplace_back(corrected_path);
         right_seen_paths.emplace(std::move(corrected_path));
       }
     } else if (left_seen_paths.find(*path) == left_seen_paths.end()) {
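The HashJoinDictBuild::Init hunk earlier in this diff is a use-after-move fix: once `dictionary` has been moved into the `dictionary_` member, the parameter is left in a moved-from (null) state, so the length computation and encoding must read `dictionary_` instead. The distilled pattern, with a hypothetical Holder type for illustration:

#include <memory>
#include <utility>
#include <vector>

class Holder {
 public:
  void Init(std::shared_ptr<std::vector<int>> data) {
    data_ = std::move(data);
    // After the move, `data` is guaranteed to be null for std::shared_ptr;
    // reading through it (as the old code effectively did) would crash:
    //   auto n = data->size();   // BUG: dereferences a null pointer
    auto n = data_->size();  // correct: read through the member
    static_cast<void>(n);
  }

 private:
  std::shared_ptr<std::vector<int>> data_;
};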
@@ -698,7 +698,7 @@ class HashJoinNode : public ExecNode, public TracedNode {
                std::shared_ptr<Schema> output_schema,
                std::unique_ptr<HashJoinSchema> schema_mgr, Expression filter,
                std::unique_ptr<HashJoinImpl> impl)
-      : ExecNode(plan, inputs, {"left", "right"},
+      : ExecNode(plan, std::move(inputs), {"left", "right"},
                  /*output_schema=*/std::move(output_schema)),
         TracedNode(this),
         join_type_(join_options.join_type),
diff --git a/cpp/src/arrow/acero/hash_join_node.h b/cpp/src/arrow/acero/hash_join_node.h
index ad60019ceabc4..19745b8675cf0 100644
--- a/cpp/src/arrow/acero/hash_join_node.h
+++ b/cpp/src/arrow/acero/hash_join_node.h
@@ -65,9 +65,9 @@ class ARROW_ACERO_EXPORT HashJoinSchema {
   std::shared_ptr<Schema> MakeOutputSchema(const std::string& left_field_name_suffix,
                                            const std::string& right_field_name_suffix);
 
-  bool LeftPayloadIsEmpty() { return PayloadIsEmpty(0); }
+  bool LeftPayloadIsEmpty() const { return PayloadIsEmpty(0); }
 
-  bool RightPayloadIsEmpty() { return PayloadIsEmpty(1); }
+  bool RightPayloadIsEmpty() const { return PayloadIsEmpty(1); }
 
   static int kMissingField() {
     return SchemaProjectionMaps<HashJoinProjection>::kMissingField;
   }
@@ -88,7 +88,7 @@ class ARROW_ACERO_EXPORT HashJoinSchema {
                                 const SchemaProjectionMap& right_to_filter,
                                 const Expression& filter);
 
-  bool PayloadIsEmpty(int side) {
+  bool PayloadIsEmpty(int side) const {
     assert(side == 0 || side == 1);
     return proj_maps[side].num_cols(HashJoinProjection::PAYLOAD) == 0;
   }
diff --git a/cpp/src/arrow/acero/hash_join_node_test.cc b/cpp/src/arrow/acero/hash_join_node_test.cc
index 9065e286a2228..76ad9c7d650eb 100644
--- a/cpp/src/arrow/acero/hash_join_node_test.cc
+++ b/cpp/src/arrow/acero/hash_join_node_test.cc
@@ -29,6 +29,7 @@
 #include "arrow/compute/kernels/test_util.h"
 #include "arrow/compute/light_array_internal.h"
 #include "arrow/compute/row/row_encoder_internal.h"
+#include "arrow/extension/uuid.h"
 #include "arrow/testing/extension_type.h"
 #include "arrow/testing/generator.h"
 #include "arrow/testing/gtest_util.h"
diff --git a/cpp/src/arrow/acero/swiss_join.cc b/cpp/src/arrow/acero/swiss_join.cc
index 4d0c8187ac6e2..6c783110af571 100644
--- a/cpp/src/arrow/acero/swiss_join.cc
+++ b/cpp/src/arrow/acero/swiss_join.cc
@@ -1667,7 +1667,7 @@ Result<std::shared_ptr<ArrayData>> JoinResultMaterialize::FlushBuildColumn(
     const std::shared_ptr<DataType>& data_type, const RowArray* row_array,
     int column_id, uint32_t* row_ids) {
   ResizableArrayData output;
-  output.Init(data_type, pool_, bit_util::Log2(num_rows_));
+  RETURN_NOT_OK(output.Init(data_type, pool_, bit_util::Log2(num_rows_)));
 
   for (size_t i = 0; i <= null_ranges_.size(); ++i) {
     int row_id_begin =
@@ -2247,8 +2247,9 @@ Result<ExecBatch> JoinResidualFilter::MaterializeFilterInput(
       build_schemas_->map(HashJoinProjection::FILTER, HashJoinProjection::PAYLOAD);
   for (int i = 0; i < num_build_cols; ++i) {
     ResizableArrayData column_data;
-    column_data.Init(build_schemas_->data_type(HashJoinProjection::FILTER, i), pool_,
-                     bit_util::Log2(num_batch_rows));
+    RETURN_NOT_OK(
+        column_data.Init(build_schemas_->data_type(HashJoinProjection::FILTER, i),
+                         pool_, bit_util::Log2(num_batch_rows)));
     if (auto idx = to_key.get(i); idx != SchemaProjectionMap::kMissingField) {
       RETURN_NOT_OK(build_keys_->DecodeSelected(&column_data, idx, num_batch_rows,
                                                 key_ids_maybe_null, pool_));
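Both swiss_join.cc hunks above switch `ResizableArrayData::Init` call sites from ignoring its `Status` to propagating it. `RETURN_NOT_OK` is the early-return macro Arrow uses internally for this (the public spelling is `ARROW_RETURN_NOT_OK`). A minimal sketch of the shape, where FillBuffer is a made-up fallible step standing in for Init:

#include <arrow/status.h>

arrow::Status FillBuffer();  // stand-in for a fallible step such as Init()

arrow::Status BuildColumn() {
  // If FillBuffer() returns a non-OK Status, return it to the caller at once.
  ARROW_RETURN_NOT_OK(FillBuffer());
  return arrow::Status::OK();
}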
diff --git a/cpp/src/arrow/array/array_base.h b/cpp/src/arrow/array/array_base.h
index 716ae0722069e..e4af67d7e5f0b 100644
--- a/cpp/src/arrow/array/array_base.h
+++ b/cpp/src/arrow/array/array_base.h
@@ -232,6 +232,14 @@ class ARROW_EXPORT Array {
   /// \return DeviceAllocationType
   DeviceAllocationType device_type() const { return data_->device_type(); }
 
+  /// \brief Return the statistics of this Array
+  ///
+  /// This just delegates to calling statistics on the underlying ArrayData
+  /// object which backs this Array.
+  ///
+  /// \return std::shared_ptr<ArrayStatistics>
+  std::shared_ptr<ArrayStatistics> statistics() const { return data_->statistics; }
+
  protected:
   Array() = default;
   ARROW_DEFAULT_MOVE_AND_ASSIGN(Array);
diff --git a/cpp/src/arrow/array/array_test.cc b/cpp/src/arrow/array/array_test.cc
index 32806d9d2edb3..73e0c692432b6 100644
--- a/cpp/src/arrow/array/array_test.cc
+++ b/cpp/src/arrow/array/array_test.cc
@@ -3709,6 +3709,132 @@ TEST(TestSwapEndianArrayData, InvalidLength) {
   }
 }
 
+class TestArrayDataStatistics : public ::testing::Test {
+ public:
+  void SetUp() {
+    valids_ = {1, 0, 1, 1};
+    null_count_ = std::count(valids_.begin(), valids_.end(), 0);
+    null_buffer_ = *internal::BytesToBits(valids_);
+    values_ = {1, 0, 3, -4};
+    min_ = *std::min_element(values_.begin(), values_.end());
+    max_ = *std::max_element(values_.begin(), values_.end());
+    values_buffer_ = Buffer::FromVector(values_);
+    data_ = ArrayData::Make(int32(), values_.size(), {null_buffer_, values_buffer_},
+                            null_count_);
+    data_->statistics = std::make_shared<ArrayStatistics>();
+    data_->statistics->null_count = null_count_;
+    data_->statistics->min = min_;
+    data_->statistics->is_min_exact = true;
+    data_->statistics->max = max_;
+    data_->statistics->is_max_exact = true;
+  }
+
+ protected:
+  std::vector<uint8_t> valids_;
+  size_t null_count_;
+  std::shared_ptr<Buffer> null_buffer_;
+  std::vector<int32_t> values_;
+  int64_t min_;
+  int64_t max_;
+  std::shared_ptr<Buffer> values_buffer_;
+  std::shared_ptr<ArrayData> data_;
+};
+
+TEST_F(TestArrayDataStatistics, MoveConstructor) {
+  ArrayData copied_data(*data_);
+  ArrayData moved_data(std::move(copied_data));
+
+  ASSERT_TRUE(moved_data.statistics->null_count.has_value());
+  ASSERT_EQ(null_count_, moved_data.statistics->null_count.value());
+
+  ASSERT_TRUE(moved_data.statistics->min.has_value());
+  ASSERT_TRUE(std::holds_alternative<int64_t>(moved_data.statistics->min.value()));
+  ASSERT_EQ(min_, std::get<int64_t>(moved_data.statistics->min.value()));
+  ASSERT_TRUE(moved_data.statistics->is_min_exact);
+
+  ASSERT_TRUE(moved_data.statistics->max.has_value());
+  ASSERT_TRUE(std::holds_alternative<int64_t>(moved_data.statistics->max.value()));
+  ASSERT_EQ(max_, std::get<int64_t>(moved_data.statistics->max.value()));
+  ASSERT_TRUE(moved_data.statistics->is_max_exact);
+}
+
+TEST_F(TestArrayDataStatistics, CopyConstructor) {
+  ArrayData copied_data(*data_);
+
+  ASSERT_TRUE(copied_data.statistics->null_count.has_value());
+  ASSERT_EQ(null_count_, copied_data.statistics->null_count.value());
+
+  ASSERT_TRUE(copied_data.statistics->min.has_value());
+  ASSERT_TRUE(std::holds_alternative<int64_t>(copied_data.statistics->min.value()));
+  ASSERT_EQ(min_, std::get<int64_t>(copied_data.statistics->min.value()));
+  ASSERT_TRUE(copied_data.statistics->is_min_exact);
+
+  ASSERT_TRUE(copied_data.statistics->max.has_value());
+  ASSERT_TRUE(std::holds_alternative<int64_t>(copied_data.statistics->max.value()));
+  ASSERT_EQ(max_, std::get<int64_t>(copied_data.statistics->max.value()));
+  ASSERT_TRUE(copied_data.statistics->is_max_exact);
+}
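The fixture and tests above (together with the assignment, CopyTo, and Slice variants that follow) pin down the contract: statistics travel with ArrayData through copies and moves. A minimal sketch of attaching and reading statistics through the Array::statistics() accessor added above; AttachAndRead is an illustrative helper, and the field set assumed is the one shown in this patch:

#include <arrow/api.h>
#include <iostream>
#include <memory>
#include <variant>

void AttachAndRead(const std::shared_ptr<arrow::Array>& array) {
  auto stats = std::make_shared<arrow::ArrayStatistics>();
  stats->null_count = array->null_count();
  stats->min = int64_t{0};     // e.g. computed during a scan; stored as the
  stats->is_min_exact = true;  // int64_t alternative of the value variant
  array->data()->statistics = std::move(stats);

  // Consumers read the statistics back through Array::statistics().
  if (auto s = array->statistics(); s && s->min.has_value()) {
    std::cout << "min: " << std::get<int64_t>(*s->min) << std::endl;
  }
}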
+TEST_F(TestArrayDataStatistics, MoveAssignment) {
+  ArrayData copied_data(*data_);
+  ArrayData moved_data;
+  moved_data = std::move(copied_data);
+
+  ASSERT_TRUE(moved_data.statistics->null_count.has_value());
+  ASSERT_EQ(null_count_, moved_data.statistics->null_count.value());
+
+  ASSERT_TRUE(moved_data.statistics->min.has_value());
+  ASSERT_TRUE(std::holds_alternative<int64_t>(moved_data.statistics->min.value()));
+  ASSERT_EQ(min_, std::get<int64_t>(moved_data.statistics->min.value()));
+  ASSERT_TRUE(moved_data.statistics->is_min_exact);
+
+  ASSERT_TRUE(moved_data.statistics->max.has_value());
+  ASSERT_TRUE(std::holds_alternative<int64_t>(moved_data.statistics->max.value()));
+  ASSERT_EQ(max_, std::get<int64_t>(moved_data.statistics->max.value()));
+  ASSERT_TRUE(moved_data.statistics->is_max_exact);
+}
+
+TEST_F(TestArrayDataStatistics, CopyAssignment) {
+  ArrayData copied_data;
+  copied_data = *data_;
+
+  ASSERT_TRUE(copied_data.statistics->null_count.has_value());
+  ASSERT_EQ(null_count_, copied_data.statistics->null_count.value());
+
+  ASSERT_TRUE(copied_data.statistics->min.has_value());
+  ASSERT_TRUE(std::holds_alternative<int64_t>(copied_data.statistics->min.value()));
+  ASSERT_EQ(min_, std::get<int64_t>(copied_data.statistics->min.value()));
+  ASSERT_TRUE(copied_data.statistics->is_min_exact);
+
+  ASSERT_TRUE(copied_data.statistics->max.has_value());
+  ASSERT_TRUE(std::holds_alternative<int64_t>(copied_data.statistics->max.value()));
+  ASSERT_EQ(max_, std::get<int64_t>(copied_data.statistics->max.value()));
+  ASSERT_TRUE(copied_data.statistics->is_max_exact);
+}
+
+TEST_F(TestArrayDataStatistics, CopyTo) {
+  ASSERT_OK_AND_ASSIGN(auto copied_data,
+                       data_->CopyTo(arrow::default_cpu_memory_manager()));
+
+  ASSERT_TRUE(copied_data->statistics->null_count.has_value());
+  ASSERT_EQ(null_count_, copied_data->statistics->null_count.value());
+
+  ASSERT_TRUE(copied_data->statistics->min.has_value());
+  ASSERT_TRUE(std::holds_alternative<int64_t>(copied_data->statistics->min.value()));
+  ASSERT_EQ(min_, std::get<int64_t>(copied_data->statistics->min.value()));
+  ASSERT_TRUE(copied_data->statistics->is_min_exact);
+
+  ASSERT_TRUE(copied_data->statistics->max.has_value());
+  ASSERT_TRUE(std::holds_alternative<int64_t>(copied_data->statistics->max.value()));
+  ASSERT_EQ(max_, std::get<int64_t>(copied_data->statistics->max.value()));
+  ASSERT_TRUE(copied_data->statistics->is_max_exact);
+}
+
+TEST_F(TestArrayDataStatistics, Slice) {
+  auto sliced_data = data_->Slice(0, 1);
+  ASSERT_FALSE(sliced_data->statistics);
+}
+
 template <typename TYPE>
 class TestPrimitiveArray : public ::testing::Test {
  public:
diff --git a/cpp/src/arrow/array/data.cc b/cpp/src/arrow/array/data.cc
index 83eeb56c496cf..8e29297a8c175 100644
--- a/cpp/src/arrow/array/data.cc
+++ b/cpp/src/arrow/array/data.cc
@@ -165,6 +165,8 @@ Result<std::shared_ptr<ArrayData>> CopyToImpl(const ArrayData& data,
     ARROW_ASSIGN_OR_RAISE(output->dictionary, CopyToImpl(*data.dictionary, to, copy_fn));
   }
 
+  output->statistics = data.statistics;
+
   return output;
 }
 }  // namespace
@@ -195,6 +197,7 @@ std::shared_ptr<ArrayData> ArrayData::Slice(int64_t off, int64_t len) const {
   } else {
     copy->null_count = null_count != 0 ?
kUnknownNullCount : 0;
   }
+  copy->statistics = nullptr;
   return copy;
 }
 
diff --git a/cpp/src/arrow/array/data.h b/cpp/src/arrow/array/data.h
index e0508fe6980a7..1e6ee9a1d32ff 100644
--- a/cpp/src/arrow/array/data.h
+++ b/cpp/src/arrow/array/data.h
@@ -24,6 +24,7 @@
 #include <utility>
 #include <vector>
 
+#include "arrow/array/statistics.h"
 #include "arrow/buffer.h"
 #include "arrow/result.h"
 #include "arrow/type.h"
@@ -152,7 +153,8 @@ struct ARROW_EXPORT ArrayData {
         offset(other.offset),
         buffers(std::move(other.buffers)),
         child_data(std::move(other.child_data)),
-        dictionary(std::move(other.dictionary)) {
+        dictionary(std::move(other.dictionary)),
+        statistics(std::move(other.statistics)) {
     SetNullCount(other.null_count);
   }
 
@@ -163,7 +165,8 @@ struct ARROW_EXPORT ArrayData {
         offset(other.offset),
         buffers(other.buffers),
         child_data(other.child_data),
-        dictionary(other.dictionary) {
+        dictionary(other.dictionary),
+        statistics(other.statistics) {
     SetNullCount(other.null_count);
   }
 
@@ -176,6 +179,7 @@ struct ARROW_EXPORT ArrayData {
     buffers = std::move(other.buffers);
     child_data = std::move(other.child_data);
     dictionary = std::move(other.dictionary);
+    statistics = std::move(other.statistics);
     return *this;
   }
 
@@ -188,6 +192,7 @@ struct ARROW_EXPORT ArrayData {
     buffers = other.buffers;
    child_data = other.child_data;
     dictionary = other.dictionary;
+    statistics = other.statistics;
     return *this;
   }
 
@@ -274,6 +279,18 @@ struct ARROW_EXPORT ArrayData {
   }
 
   /// \brief Construct a zero-copy slice of the data with the given offset and length
+  ///
+  /// The associated `ArrayStatistics` is always discarded in a sliced
+  /// `ArrayData`, because statistics computed for the original
+  /// `ArrayData` may be invalid for the slice. If you want to reuse
+  /// the original statistics, you need to do so yourself.
+  ///
+  /// Even if the specified slice covers the same range as the original
+  /// `ArrayData` (so the statistics would still apply), the associated
+  /// `ArrayStatistics` is discarded as well. Use `Copy()` instead for
+  /// that case.
   std::shared_ptr<ArrayData> Slice(int64_t offset, int64_t length) const;
 
   /// \brief Input-checking variant of Slice
@@ -390,6 +407,9 @@ struct ARROW_EXPORT ArrayData {
   // The dictionary for this Array, if any. Only used for dictionary type
   std::shared_ptr<ArrayData> dictionary;
+
+  // The statistics for this Array.
+  std::shared_ptr<ArrayStatistics> statistics;
 };
 
 /// \brief A non-owning Buffer reference
diff --git a/cpp/src/arrow/chunk_resolver.cc b/cpp/src/arrow/chunk_resolver.cc
index 55eec53ced1c7..854127480744e 100644
--- a/cpp/src/arrow/chunk_resolver.cc
+++ b/cpp/src/arrow/chunk_resolver.cc
@@ -60,42 +60,38 @@ inline std::vector<int64_t> MakeChunksOffsets(const std::vector<T>& chunks) {
 template <typename IndexType>
 void ResolveManyInline(size_t num_offsets, const int64_t* signed_offsets,
                        int64_t n_indices, const IndexType* logical_index_vec,
-                       IndexType* out_chunk_index_vec, IndexType chunk_hint,
-                       IndexType* out_index_in_chunk_vec) {
+                       TypedChunkLocation<IndexType>* out_chunk_location_vec,
+                       IndexType chunk_hint) {
   auto* offsets = reinterpret_cast<const uint64_t*>(signed_offsets);
   const auto num_chunks = static_cast<IndexType>(num_offsets - 1);
   // chunk_hint in [0, num_offsets) per the precondition.
  for (int64_t i = 0; i < n_indices; i++) {
-    const auto index = static_cast<uint64_t>(logical_index_vec[i]);
+    auto typed_logical_index = logical_index_vec[i];
+    const auto index = static_cast<uint64_t>(typed_logical_index);
+    // use or update chunk_hint
     if (index >= offsets[chunk_hint] &&
         (chunk_hint == num_chunks || index < offsets[chunk_hint + 1])) {
-      out_chunk_index_vec[i] = chunk_hint;  // hint is correct!
-      continue;
+      // hint is correct!
+    } else {
+      // lo < hi is guaranteed by `num_offsets = chunks.size() + 1`
+      auto chunk_index =
+          ChunkResolver::Bisect(index, offsets, /*lo=*/0, /*hi=*/num_offsets);
+      chunk_hint = static_cast<IndexType>(chunk_index);
     }
-    // lo < hi is guaranteed by `num_offsets = chunks.size() + 1`
-    auto chunk_index =
-        ChunkResolver::Bisect(index, offsets, /*lo=*/0, /*hi=*/num_offsets);
-    chunk_hint = static_cast<IndexType>(chunk_index);
-    out_chunk_index_vec[i] = chunk_hint;
-  }
-  if (out_index_in_chunk_vec != NULLPTR) {
-    for (int64_t i = 0; i < n_indices; i++) {
-      auto logical_index = logical_index_vec[i];
-      auto chunk_index = out_chunk_index_vec[i];
-      // chunk_index is in [0, chunks.size()] no matter what the
-      // value of logical_index is, so it's always safe to dereference
-      // offset_ as it contains chunks.size()+1 values.
-      out_index_in_chunk_vec[i] =
-          logical_index - static_cast<IndexType>(offsets[chunk_index]);
+    out_chunk_location_vec[i].chunk_index = chunk_hint;
+    // chunk_index is in [0, chunks.size()] no matter what the
+    // value of logical_index is, so it's always safe to dereference
+    // offset_ as it contains chunks.size()+1 values.
+    out_chunk_location_vec[i].index_in_chunk =
+        typed_logical_index - static_cast<IndexType>(offsets[chunk_hint]);
 #if defined(ARROW_VALGRIND) || defined(ADDRESS_SANITIZER)
-    // Make it more likely that Valgrind/ASAN can catch an invalid memory
-    // access by poisoning out_index_in_chunk_vec[i] when the logical
-    // index is out-of-bounds.
-    if (chunk_index == num_chunks) {
-      out_index_in_chunk_vec[i] = std::numeric_limits<IndexType>::max();
-    }
-#endif
+    // Make it more likely that Valgrind/ASAN can catch an invalid memory
+    // access by poisoning the index-in-chunk value when the logical
+    // index is out-of-bounds.
+    if (chunk_hint == num_chunks) {
+      out_chunk_location_vec[i].index_in_chunk = std::numeric_limits<IndexType>::max();
+    }
+#endif
   }
 }
 
@@ -130,31 +126,31 @@ ChunkResolver& ChunkResolver::operator=(const ChunkResolver& other) noexcept {
 }
 
 void ChunkResolver::ResolveManyImpl(int64_t n_indices, const uint8_t* logical_index_vec,
-                                    uint8_t* out_chunk_index_vec, uint8_t chunk_hint,
-                                    uint8_t* out_index_in_chunk_vec) const {
+                                    TypedChunkLocation<uint8_t>* out_chunk_location_vec,
+                                    uint8_t chunk_hint) const {
   ResolveManyInline(offsets_.size(), offsets_.data(), n_indices, logical_index_vec,
-                    out_chunk_index_vec, chunk_hint, out_index_in_chunk_vec);
+                    out_chunk_location_vec, chunk_hint);
 }
 
 void ChunkResolver::ResolveManyImpl(int64_t n_indices, const uint32_t* logical_index_vec,
-                                    uint32_t* out_chunk_index_vec, uint32_t chunk_hint,
-                                    uint32_t* out_index_in_chunk_vec) const {
+                                    TypedChunkLocation<uint32_t>* out_chunk_location_vec,
+                                    uint32_t chunk_hint) const {
   ResolveManyInline(offsets_.size(), offsets_.data(), n_indices, logical_index_vec,
-                    out_chunk_index_vec, chunk_hint, out_index_in_chunk_vec);
+                    out_chunk_location_vec, chunk_hint);
 }
 
 void ChunkResolver::ResolveManyImpl(int64_t n_indices, const uint16_t* logical_index_vec,
-                                    uint16_t* out_chunk_index_vec, uint16_t chunk_hint,
-                                    uint16_t* out_index_in_chunk_vec) const {
+                                    TypedChunkLocation<uint16_t>* out_chunk_location_vec,
+                                    uint16_t chunk_hint) const {
   ResolveManyInline(offsets_.size(), offsets_.data(), n_indices, logical_index_vec,
-                    out_chunk_index_vec, chunk_hint, out_index_in_chunk_vec);
+                    out_chunk_location_vec, chunk_hint);
 }
 
 void ChunkResolver::ResolveManyImpl(int64_t n_indices, const uint64_t* logical_index_vec,
-                                    uint64_t* out_chunk_index_vec, uint64_t chunk_hint,
-                                    uint64_t* out_index_in_chunk_vec) const {
+                                    TypedChunkLocation<uint64_t>* out_chunk_location_vec,
+                                    uint64_t chunk_hint) const {
  ResolveManyInline(offsets_.size(), offsets_.data(), n_indices, logical_index_vec,
-                    out_chunk_index_vec, chunk_hint, out_index_in_chunk_vec);
+                    out_chunk_location_vec, chunk_hint);
 }
 
 }  // namespace arrow::internal
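ResolveManyInline above amortizes repeated lookups: each index is first tested against the caller-carried hint chunk, and only on a miss does it fall back to binary search over the offsets table and update the hint. A self-contained sketch of that hint-then-bisect strategy (Locate is a hypothetical helper, not Arrow's Bisect):

#include <cstdint>
#include <vector>

// offsets has num_chunks + 1 entries, with offsets.front() == 0. Returns the
// chunk index containing `index`, or num_chunks when `index` is out of bounds.
size_t Locate(const std::vector<int64_t>& offsets, int64_t index, size_t* hint) {
  const size_t num_chunks = offsets.size() - 1;
  if (index >= offsets[*hint] &&
      (*hint == num_chunks || index < offsets[*hint + 1])) {
    return *hint;  // fast path: same chunk as the previous lookup
  }
  size_t lo = 0;
  size_t hi = offsets.size();
  while (hi - lo > 1) {  // upper-bound bisection over the offsets table
    const size_t mid = lo + (hi - lo) / 2;
    if (index >= offsets[mid]) {
      lo = mid;
    } else {
      hi = mid;
    }
  }
  *hint = lo;  // remember the hit for the next, likely nearby, index
  return lo;
}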
diff --git a/cpp/src/arrow/chunk_resolver.h b/cpp/src/arrow/chunk_resolver.h
index a2a3d5a864243..83fda62387fe1 100644
--- a/cpp/src/arrow/chunk_resolver.h
+++ b/cpp/src/arrow/chunk_resolver.h
@@ -31,28 +31,34 @@ namespace arrow::internal {
 
 struct ChunkResolver;
 
-struct ChunkLocation {
+template <typename IndexType>
+struct TypedChunkLocation {
   /// \brief Index of the chunk in the array of chunks
   ///
   /// The value is always in the range `[0, chunks.size()]`. `chunks.size()` is used
   /// to represent out-of-bounds locations.
-  int64_t chunk_index = 0;
+  IndexType chunk_index = 0;
 
   /// \brief Index of the value in the chunk
   ///
   /// The value is UNDEFINED if chunk_index >= chunks.size()
-  int64_t index_in_chunk = 0;
+  IndexType index_in_chunk = 0;
 
-  ChunkLocation() = default;
+  TypedChunkLocation() = default;
 
-  ChunkLocation(int64_t chunk_index, int64_t index_in_chunk)
-      : chunk_index(chunk_index), index_in_chunk(index_in_chunk) {}
+  TypedChunkLocation(IndexType chunk_index, IndexType index_in_chunk)
+      : chunk_index(chunk_index), index_in_chunk(index_in_chunk) {
+    static_assert(sizeof(TypedChunkLocation<IndexType>) == 2 * sizeof(IndexType));
+    static_assert(alignof(TypedChunkLocation<IndexType>) == alignof(IndexType));
+  }
 
-  bool operator==(ChunkLocation other) const {
+  bool operator==(TypedChunkLocation other) const {
     return chunk_index == other.chunk_index && index_in_chunk == other.index_in_chunk;
   }
 };
 
+using ChunkLocation = TypedChunkLocation<int64_t>;
+
 /// \brief An utility that incrementally resolves logical indices into
 /// physical indices in a chunked array.
 struct ARROW_EXPORT ChunkResolver {
@@ -144,26 +150,25 @@ struct ARROW_EXPORT ChunkResolver {
   ///
   /// \pre 0 <= logical_index_vec[i] < logical_array_length()
   ///      (for well-defined and valid chunk index results)
-  /// \pre out_chunk_index_vec has space for `n_indices`
+  /// \pre out_chunk_location_vec has space for `n_indices` locations
   /// \pre chunk_hint in [0, chunks.size()]
-  /// \post out_chunk_index_vec[i] in [0, chunks.size()] for i in [0, n)
+  /// \post out_chunk_location_vec[i].chunk_index in [0, chunks.size()] for i in [0, n)
   /// \post if logical_index_vec[i] >= chunked_array.length(), then
-  ///       out_chunk_index_vec[i] == chunks.size()
-  ///       and out_index_in_chunk_vec[i] is UNDEFINED (can be out-of-bounds)
-  /// \post if logical_index_vec[i] < 0, then both out_chunk_index_vec[i] and
-  ///       out_index_in_chunk_vec[i] are UNDEFINED
+  ///       out_chunk_location_vec[i].chunk_index == chunks.size()
+  ///       and out_chunk_location_vec[i].index_in_chunk is UNDEFINED (can be
+  ///       out-of-bounds)
+  /// \post if logical_index_vec[i] < 0, then both values in out_chunk_location_vec[i]
+  ///       are UNDEFINED
   ///
   /// \param n_indices The number of logical indices to resolve
   /// \param logical_index_vec The logical indices to resolve
-  /// \param out_chunk_index_vec The output array where the chunk indices will be written
+  /// \param out_chunk_location_vec The output array where the locations will be written
   /// \param chunk_hint 0 or the last chunk_index produced by ResolveMany
-  /// \param out_index_in_chunk_vec If not NULLPTR, the output array where the
-  ///        within-chunk indices will be written
   /// \return false iff chunks.size() > std::numeric_limits<IndexType>::max()
   template <typename IndexType>
   [[nodiscard]] bool ResolveMany(int64_t n_indices, const IndexType* logical_index_vec,
-                                 IndexType* out_chunk_index_vec, IndexType chunk_hint = 0,
-                                 IndexType* out_index_in_chunk_vec = NULLPTR) const {
+                                 TypedChunkLocation<IndexType>* out_chunk_location_vec,
+                                 IndexType chunk_hint = 0) const {
     if constexpr (sizeof(IndexType) < sizeof(uint64_t)) {
       // The max value returned by Bisect is `offsets.size() - 1` (= chunks.size()).
      constexpr uint64_t kMaxIndexTypeValue = std::numeric_limits<IndexType>::max();
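With locations now packed into TypedChunkLocation, one ResolveMany call writes a {chunk_index, index_in_chunk} pair per input index instead of filling two separate arrays. A usage sketch against the API above (illustrative: chunk_resolver.h is an internal header, and the resolver is assumed to have been built from some chunked array's chunks):

#include <arrow/chunk_resolver.h>
#include <cstdint>
#include <vector>

bool ResolveAll(const arrow::internal::ChunkResolver& resolver,
                const std::vector<uint32_t>& logical_indices) {
  // One location per input index, filled in a single pass.
  std::vector<arrow::internal::TypedChunkLocation<uint32_t>> locations(
      logical_indices.size());
  // Returns false only if the chunk count does not fit in uint32_t.
  return resolver.ResolveMany(static_cast<int64_t>(logical_indices.size()),
                              logical_indices.data(), locations.data(),
                              /*chunk_hint=*/0);
}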
@@ -188,13 +193,11 @@ struct ARROW_EXPORT ChunkResolver {
       // logical index in the chunked array.
       using U = std::make_unsigned_t<IndexType>;
       ResolveManyImpl(n_indices, reinterpret_cast<const U*>(logical_index_vec),
-                      reinterpret_cast<U*>(out_chunk_index_vec),
-                      static_cast<U>(chunk_hint),
-                      reinterpret_cast<U*>(out_index_in_chunk_vec));
+                      reinterpret_cast<TypedChunkLocation<U>*>(out_chunk_location_vec),
+                      static_cast<U>(chunk_hint));
     } else {
       static_assert(std::is_unsigned_v<IndexType>);
-      ResolveManyImpl(n_indices, logical_index_vec, out_chunk_index_vec, chunk_hint,
-                      out_index_in_chunk_vec);
+      ResolveManyImpl(n_indices, logical_index_vec, out_chunk_location_vec, chunk_hint);
     }
     return true;
   }
@@ -226,10 +229,14 @@ struct ARROW_EXPORT ChunkResolver {
   /// \pre all the pre-conditions of ChunkResolver::ResolveMany()
   /// \pre num_offsets - 1 <= std::numeric_limits<IndexType>::max()
-  void ResolveManyImpl(int64_t, const uint8_t*, uint8_t*, uint8_t, uint8_t*) const;
-  void ResolveManyImpl(int64_t, const uint16_t*, uint16_t*, uint16_t, uint16_t*) const;
-  void ResolveManyImpl(int64_t, const uint32_t*, uint32_t*, uint32_t, uint32_t*) const;
-  void ResolveManyImpl(int64_t, const uint64_t*, uint64_t*, uint64_t, uint64_t*) const;
+  void ResolveManyImpl(int64_t, const uint8_t*, TypedChunkLocation<uint8_t>*,
+                       uint8_t) const;
+  void ResolveManyImpl(int64_t, const uint16_t*, TypedChunkLocation<uint16_t>*,
+                       uint16_t) const;
+  void ResolveManyImpl(int64_t, const uint32_t*, TypedChunkLocation<uint32_t>*,
+                       uint32_t) const;
+  void ResolveManyImpl(int64_t, const uint64_t*, TypedChunkLocation<uint64_t>*,
+                       uint64_t) const;
 
  public:
   /// \brief Find the index of the chunk that contains the logical index.
diff --git a/cpp/src/arrow/chunked_array.cc b/cpp/src/arrow/chunked_array.cc
index c36b736d5d5df..dd6aa51534fcb 100644
--- a/cpp/src/arrow/chunked_array.cc
+++ b/cpp/src/arrow/chunked_array.cc
@@ -27,6 +27,7 @@
 #include "arrow/array/array_nested.h"
 #include "arrow/array/util.h"
 #include "arrow/array/validate.h"
+#include "arrow/device_allocation_type_set.h"
 #include "arrow/pretty_print.h"
 #include "arrow/status.h"
 #include "arrow/type.h"
@@ -86,6 +87,18 @@ Result<std::shared_ptr<ChunkedArray>> ChunkedArray::MakeEmpty(
   return std::make_shared<ChunkedArray>(std::move(new_chunks));
 }
 
+DeviceAllocationTypeSet ChunkedArray::device_types() const {
+  if (chunks_.empty()) {
+    // An empty ChunkedArray is considered to be CPU-only.
+    return DeviceAllocationTypeSet::CpuOnly();
+  }
+  DeviceAllocationTypeSet set;
+  for (const auto& chunk : chunks_) {
+    set.add(chunk->device_type());
+  }
+  return set;
+}
+
 bool ChunkedArray::Equals(const ChunkedArray& other, const EqualOptions& opts) const {
   if (length_ != other.length()) {
     return false;
diff --git a/cpp/src/arrow/chunked_array.h b/cpp/src/arrow/chunked_array.h
index 5d300861d85c2..c65b6cb6e227f 100644
--- a/cpp/src/arrow/chunked_array.h
+++ b/cpp/src/arrow/chunked_array.h
@@ -25,6 +25,7 @@
 
 #include "arrow/chunk_resolver.h"
 #include "arrow/compare.h"
+#include "arrow/device_allocation_type_set.h"
 #include "arrow/result.h"
 #include "arrow/status.h"
 #include "arrow/type_fwd.h"
@@ -116,6 +117,13 @@ class ARROW_EXPORT ChunkedArray {
   /// \return an ArrayVector of chunks
   const ArrayVector& chunks() const { return chunks_; }
 
+  /// \return The set of device allocation types used by the chunks in this
+  ///         chunked array.
+  DeviceAllocationTypeSet device_types() const;
+
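The device_types() accessor declared above, together with the is_cpu() convenience declared just below it, lets callers verify device residency before touching raw chunk buffers. A usage sketch (the CheckCpuOnly helper is illustrative, not part of the patch):

#include <arrow/api.h>

// Guard CPU-only code paths against chunks living in non-CPU memory.
arrow::Status CheckCpuOnly(const arrow::ChunkedArray& chunked) {
  if (!chunked.is_cpu()) {
    return arrow::Status::Invalid("expected all chunks in CPU-accessible memory");
  }
  return arrow::Status::OK();
}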
+  /// \return true if all chunks are allocated on CPU-accessible memory.
+  bool is_cpu() const { return device_types().is_cpu_only(); }
+
   /// \brief Construct a zero-copy slice of the chunked array with the
   /// indicated offset and length
   ///
diff --git a/cpp/src/arrow/chunked_array_test.cc b/cpp/src/arrow/chunked_array_test.cc
index e9cc283b53cd5..bf9d4af7c7bb0 100644
--- a/cpp/src/arrow/chunked_array_test.cc
+++ b/cpp/src/arrow/chunked_array_test.cc
@@ -37,6 +37,7 @@ namespace arrow {
 
 using internal::ChunkLocation;
 using internal::ChunkResolver;
+using internal::TypedChunkLocation;
 
 class TestChunkedArray : public ::testing::Test {
  protected:
@@ -61,12 +62,17 @@ TEST_F(TestChunkedArray, Make) {
                        ChunkedArray::Make({}, int64()));
   AssertTypeEqual(*int64(), *result->type());
   ASSERT_EQ(result->num_chunks(), 0);
+  // Empty chunked arrays are treated as CPU-allocated.
+  ASSERT_TRUE(result->is_cpu());
 
   auto chunk0 = ArrayFromJSON(int8(), "[0, 1, 2]");
   auto chunk1 = ArrayFromJSON(int16(), "[3, 4, 5]");
 
   ASSERT_OK_AND_ASSIGN(result, ChunkedArray::Make({chunk0, chunk0}));
   ASSERT_OK_AND_ASSIGN(auto result2, ChunkedArray::Make({chunk0, chunk0}, int8()));
+  // All chunks are CPU-accessible.
+  ASSERT_TRUE(result->is_cpu());
+  ASSERT_TRUE(result2->is_cpu());
   AssertChunkedEqual(*result, *result2);
 
   ASSERT_RAISES(TypeError, ChunkedArray::Make({chunk0, chunk1}));
@@ -375,24 +381,26 @@ class TestChunkResolverMany : public ::testing::Test {
   Result<std::vector<ChunkLocation>> ResolveMany(
       const ChunkResolver& resolver, const std::vector<IndexType>& logical_index_vec) {
     const size_t n = logical_index_vec.size();
-    std::vector<IndexType> chunk_index_vec;
-    chunk_index_vec.resize(n);
-    std::vector<IndexType> index_in_chunk_vec;
-    index_in_chunk_vec.resize(n);
+    std::vector<TypedChunkLocation<IndexType>> chunk_location_vec;
+    chunk_location_vec.resize(n);
     bool valid = resolver.ResolveMany(
-        static_cast<int64_t>(n), logical_index_vec.data(), chunk_index_vec.data(), 0,
-        index_in_chunk_vec.data());
+        static_cast<int64_t>(n), logical_index_vec.data(), chunk_location_vec.data(), 0);
     if (ARROW_PREDICT_FALSE(!valid)) {
       return Status::Invalid("index type doesn't fit possible chunk indexes");
     }
-    std::vector<ChunkLocation> locations;
-    locations.reserve(n);
-    for (size_t i = 0; i < n; i++) {
-      auto chunk_index = static_cast<int64_t>(chunk_index_vec[i]);
-      auto index_in_chunk = static_cast<int64_t>(index_in_chunk_vec[i]);
-      locations.emplace_back(chunk_index, index_in_chunk);
+    if constexpr (std::is_same<IndexType, int64_t>::value) {
+      return chunk_location_vec;
+    } else {
+      std::vector<ChunkLocation> locations;
+      locations.reserve(n);
+      for (size_t i = 0; i < n; i++) {
+        auto loc = chunk_location_vec[i];
+        auto chunk_index = static_cast<int64_t>(loc.chunk_index);
+        auto index_in_chunk = static_cast<int64_t>(loc.index_in_chunk);
+        locations.emplace_back(chunk_index, index_in_chunk);
+      }
+      return locations;
     }
-    return locations;
   }
 
   void CheckResolveMany(const ChunkResolver& resolver,
diff --git a/cpp/src/arrow/compute/function.cc b/cpp/src/arrow/compute/function.cc
index e1a2e8c5d8879..0478a3d1e801a 100644
--- a/cpp/src/arrow/compute/function.cc
+++ b/cpp/src/arrow/compute/function.cc
@@ -30,6 +30,7 @@
 #include "arrow/compute/kernels/common_internal.h"
 #include "arrow/compute/registry.h"
 #include "arrow/datum.h"
+#include "arrow/device_allocation_type_set.h"
 #include "arrow/util/cpu_info.h"
 #include "arrow/util/logging.h"
 #include "arrow/util/tracing_internal.h"
diff --git a/cpp/src/arrow/compute/kernel.cc b/cpp/src/arrow/compute/kernel.cc
index 5c87ef2cd0561..5e7461cc52d0e 100644
--- a/cpp/src/arrow/compute/kernel.cc
+++ b/cpp/src/arrow/compute/kernel.cc
@@ -24,6 +24,7 @@
 
 #include "arrow/buffer.h"
 #include "arrow/compute/exec.h"
+#include "arrow/device_allocation_type_set.h"
 #include
"arrow/result.h" #include "arrow/type_traits.h" #include "arrow/util/bit_util.h" diff --git a/cpp/src/arrow/compute/kernel.h b/cpp/src/arrow/compute/kernel.h index 1adb3e96c97c8..cfa1cd8193f36 100644 --- a/cpp/src/arrow/compute/kernel.h +++ b/cpp/src/arrow/compute/kernel.h @@ -31,6 +31,7 @@ #include "arrow/buffer.h" #include "arrow/compute/exec.h" #include "arrow/datum.h" +#include "arrow/device_allocation_type_set.h" #include "arrow/memory_pool.h" #include "arrow/result.h" #include "arrow/status.h" diff --git a/cpp/src/arrow/compute/kernels/aggregate_basic.cc b/cpp/src/arrow/compute/kernels/aggregate_basic.cc index 1fbcd6a249093..b545d8bcc1003 100644 --- a/cpp/src/arrow/compute/kernels/aggregate_basic.cc +++ b/cpp/src/arrow/compute/kernels/aggregate_basic.cc @@ -23,7 +23,9 @@ #include "arrow/util/cpu_info.h" #include "arrow/util/hashing.h" -#include +// Include templated definitions for aggregate kernels that must compiled here +// with the SIMD level configured for this compilation unit in the build. +#include "arrow/compute/kernels/aggregate_basic.inc.cc" // NOLINT(build/include) namespace arrow { namespace compute { @@ -276,11 +278,6 @@ struct SumImplDefault : public SumImpl { using SumImpl::SumImpl; }; -template -struct MeanImplDefault : public MeanImpl { - using MeanImpl::MeanImpl; -}; - Result> SumInit(KernelContext* ctx, const KernelInitArgs& args) { SumLikeInit visitor( @@ -289,6 +286,14 @@ Result> SumInit(KernelContext* ctx, return visitor.Create(); } +// ---------------------------------------------------------------------- +// Mean implementation + +template +struct MeanImplDefault : public MeanImpl { + using MeanImpl::MeanImpl; +}; + Result> MeanInit(KernelContext* ctx, const KernelInitArgs& args) { MeanKernelInit visitor( @@ -482,8 +487,8 @@ void AddFirstOrLastAggKernel(ScalarAggregateFunction* func, // ---------------------------------------------------------------------- // MinMax implementation -Result> MinMaxInit(KernelContext* ctx, - const KernelInitArgs& args) { +Result> MinMaxInitDefault(KernelContext* ctx, + const KernelInitArgs& args) { ARROW_ASSIGN_OR_RAISE(TypeHolder out_type, args.kernel->signature->out_type().Resolve(ctx, args.inputs)); MinMaxInitState visitor( @@ -532,13 +537,13 @@ struct BooleanAnyImpl : public ScalarAggregator { } if (batch[0].is_scalar()) { const Scalar& scalar = *batch[0].scalar; - this->has_nulls = !scalar.is_valid; - this->any = scalar.is_valid && checked_cast(scalar).value; - this->count += scalar.is_valid; + this->has_nulls |= !scalar.is_valid; + this->any |= scalar.is_valid && checked_cast(scalar).value; + this->count += scalar.is_valid * batch.length; return Status::OK(); } const ArraySpan& data = batch[0].array; - this->has_nulls = data.GetNullCount() > 0; + this->has_nulls |= data.GetNullCount() > 0; this->count += data.length - data.GetNullCount(); arrow::internal::OptionalBinaryBitBlockCounter counter( data.buffers[0].data, data.offset, data.buffers[1].data, data.offset, @@ -603,13 +608,13 @@ struct BooleanAllImpl : public ScalarAggregator { } if (batch[0].is_scalar()) { const Scalar& scalar = *batch[0].scalar; - this->has_nulls = !scalar.is_valid; - this->count += scalar.is_valid; - this->all = !scalar.is_valid || checked_cast(scalar).value; + this->has_nulls |= !scalar.is_valid; + this->count += scalar.is_valid * batch.length; + this->all &= !scalar.is_valid || checked_cast(scalar).value; return Status::OK(); } const ArraySpan& data = batch[0].array; - this->has_nulls = data.GetNullCount() > 0; + this->has_nulls |= 
data.GetNullCount() > 0; this->count += data.length - data.GetNullCount(); arrow::internal::OptionalBinaryBitBlockCounter counter( data.buffers[1].data, data.offset, data.buffers[0].data, data.offset, @@ -1114,14 +1119,14 @@ void RegisterScalarAggregateBasic(FunctionRegistry* registry) { // Add min max function func = std::make_shared("min_max", Arity::Unary(), min_max_doc, &default_scalar_aggregate_options); - AddMinMaxKernels(MinMaxInit, {null(), boolean()}, func.get()); - AddMinMaxKernels(MinMaxInit, NumericTypes(), func.get()); - AddMinMaxKernels(MinMaxInit, TemporalTypes(), func.get()); - AddMinMaxKernels(MinMaxInit, BaseBinaryTypes(), func.get()); - AddMinMaxKernel(MinMaxInit, Type::FIXED_SIZE_BINARY, func.get()); - AddMinMaxKernel(MinMaxInit, Type::INTERVAL_MONTHS, func.get()); - AddMinMaxKernel(MinMaxInit, Type::DECIMAL128, func.get()); - AddMinMaxKernel(MinMaxInit, Type::DECIMAL256, func.get()); + AddMinMaxKernels(MinMaxInitDefault, {null(), boolean()}, func.get()); + AddMinMaxKernels(MinMaxInitDefault, NumericTypes(), func.get()); + AddMinMaxKernels(MinMaxInitDefault, TemporalTypes(), func.get()); + AddMinMaxKernels(MinMaxInitDefault, BaseBinaryTypes(), func.get()); + AddMinMaxKernel(MinMaxInitDefault, Type::FIXED_SIZE_BINARY, func.get()); + AddMinMaxKernel(MinMaxInitDefault, Type::INTERVAL_MONTHS, func.get()); + AddMinMaxKernel(MinMaxInitDefault, Type::DECIMAL128, func.get()); + AddMinMaxKernel(MinMaxInitDefault, Type::DECIMAL256, func.get()); // Add the SIMD variants for min max #if defined(ARROW_HAVE_RUNTIME_AVX2) if (cpu_info->IsSupported(arrow::internal::CpuInfo::AVX2)) { diff --git a/cpp/src/arrow/compute/kernels/aggregate_basic.inc.cc b/cpp/src/arrow/compute/kernels/aggregate_basic.inc.cc new file mode 100644 index 0000000000000..f2151e0a9e029 --- /dev/null +++ b/cpp/src/arrow/compute/kernels/aggregate_basic.inc.cc @@ -0,0 +1,1025 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// .inc.cc file to be included in compilation unit where kernels are meant to be +// compiled auto-vectorized by the compiler with different SIMD levels passed +// as compiler flags. +// +// It contains no includes to avoid double inclusion in the compilation unit +// that includes this .inc.cc file. 
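This include-free layout is what lets the same kernel templates be compiled several times, once per SIMD level: each translation unit includes this source after its own headers and is built with its own vectorization flags. A schematic of the pattern (file and namespace names below are illustrative, not Arrow's actual build wiring):

// kernels.inc.cc -- shared, deliberately include-free template bodies:
//   template <typename T>
//   T SumSpan(const T* values, int64_t n) {
//     T sum{};
//     for (int64_t i = 0; i < n; ++i) sum += values[i];  // auto-vectorizable loop
//     return sum;
//   }
//
// kernels_baseline.cc -- compiled with default flags:
//   #include <cstdint>
//   namespace baseline {
//   #include "kernels.inc.cc"
//   }
//
// kernels_avx2.cc -- identical source, compiled with -mavx2:
//   #include <cstdint>
//   namespace avx2 {
//   #include "kernels.inc.cc"
//   }
//
// A runtime dispatcher can then pick baseline::SumSpan or avx2::SumSpan after
// CPU feature detection, which is the role the ARROW_HAVE_RUNTIME_AVX2 block
// in aggregate_basic.cc plays for the min_max kernels above.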
+ +#include +#include +#include +#include +#include + +#include "arrow/compute/api_aggregate.h" +#include "arrow/compute/kernels/aggregate_internal.h" +#include "arrow/compute/kernels/codegen_internal.h" +#include "arrow/status.h" +#include "arrow/type.h" +#include "arrow/type_traits.h" +#include "arrow/util/align_util.h" +#include "arrow/util/bit_block_counter.h" +#include "arrow/util/decimal.h" + +namespace arrow::compute::internal { +namespace { + +// ---------------------------------------------------------------------- +// Sum implementation + +template ::Type> +struct SumImpl : public ScalarAggregator { + using ThisType = SumImpl; + using CType = typename TypeTraits::CType; + using SumType = ResultType; + using SumCType = typename TypeTraits::CType; + using OutputType = typename TypeTraits::ScalarType; + + SumImpl(std::shared_ptr out_type, ScalarAggregateOptions options_) + : out_type(std::move(out_type)), options(std::move(options_)) {} + + Status Consume(KernelContext*, const ExecSpan& batch) override { + if (batch[0].is_array()) { + const ArraySpan& data = batch[0].array; + this->count += data.length - data.GetNullCount(); + this->nulls_observed = this->nulls_observed || data.GetNullCount(); + + if (!options.skip_nulls && this->nulls_observed) { + // Short-circuit + return Status::OK(); + } + + if (is_boolean_type::value) { + this->sum += GetTrueCount(data); + } else { + this->sum += SumArray(data); + } + } else { + const Scalar& data = *batch[0].scalar; + this->count += data.is_valid * batch.length; + this->nulls_observed = this->nulls_observed || !data.is_valid; + if (data.is_valid) { + this->sum += internal::UnboxScalar::Unbox(data) * batch.length; + } + } + return Status::OK(); + } + + Status MergeFrom(KernelContext*, KernelState&& src) override { + const auto& other = checked_cast(src); + this->count += other.count; + this->sum += other.sum; + this->nulls_observed = this->nulls_observed || other.nulls_observed; + return Status::OK(); + } + + Status Finalize(KernelContext*, Datum* out) override { + if ((!options.skip_nulls && this->nulls_observed) || + (this->count < options.min_count)) { + out->value = std::make_shared(out_type); + } else { + out->value = std::make_shared(this->sum, out_type); + } + return Status::OK(); + } + + size_t count = 0; + bool nulls_observed = false; + SumCType sum = 0; + std::shared_ptr out_type; + ScalarAggregateOptions options; +}; + +template +struct NullImpl : public ScalarAggregator { + using ScalarType = typename TypeTraits::ScalarType; + + explicit NullImpl(const ScalarAggregateOptions& options_) : options(options_) {} + + Status Consume(KernelContext*, const ExecSpan& batch) override { + if (batch[0].is_scalar() || batch[0].array.GetNullCount() > 0) { + // If the batch is a scalar or an array with elements, set is_empty to false + is_empty = false; + } + return Status::OK(); + } + + Status MergeFrom(KernelContext*, KernelState&& src) override { + const auto& other = checked_cast(src); + this->is_empty &= other.is_empty; + return Status::OK(); + } + + Status Finalize(KernelContext*, Datum* out) override { + if ((options.skip_nulls || this->is_empty) && options.min_count == 0) { + // Return 0 if the remaining data is empty + out->value = output_empty(); + } else { + out->value = MakeNullScalar(TypeTraits::type_singleton()); + } + return Status::OK(); + } + + virtual std::shared_ptr output_empty() = 0; + + bool is_empty = true; + ScalarAggregateOptions options; +}; + +template +struct NullSumImpl : public NullImpl { + using ScalarType = 
typename TypeTraits<SumType>::ScalarType;
+
+  explicit NullSumImpl(const ScalarAggregateOptions& options_)
+      : NullImpl<SumType>(options_) {}
+
+  std::shared_ptr<Scalar> output_empty() override {
+    return std::make_shared<ScalarType>(0);
+  }
+};
+
+template