diff --git a/ci/build_wheel_cudf.sh b/ci/build_wheel_cudf.sh
index fef4416a366..ae4eb0d5c66 100755
--- a/ci/build_wheel_cudf.sh
+++ b/ci/build_wheel_cudf.sh
@@ -23,6 +23,7 @@ export PIP_CONSTRAINT="/tmp/constraints.txt"
 python -m auditwheel repair \
     --exclude libcudf.so \
     --exclude libnvcomp.so \
+    --exclude libkvikio.so \
     -w ${package_dir}/final_dist \
     ${package_dir}/dist/*
diff --git a/ci/build_wheel_libcudf.sh b/ci/build_wheel_libcudf.sh
index b3d6778ea04..aabd3814a24 100755
--- a/ci/build_wheel_libcudf.sh
+++ b/ci/build_wheel_libcudf.sh
@@ -33,6 +33,7 @@ RAPIDS_PY_CUDA_SUFFIX="$(rapids-wheel-ctk-name-gen ${RAPIDS_CUDA_VERSION})"
 mkdir -p ${package_dir}/final_dist
 python -m auditwheel repair \
     --exclude libnvcomp.so.4 \
+    --exclude libkvikio.so \
     -w ${package_dir}/final_dist \
     ${package_dir}/dist/*
diff --git a/ci/build_wheel_pylibcudf.sh b/ci/build_wheel_pylibcudf.sh
index 839d98846fe..c4a89f20f5f 100755
--- a/ci/build_wheel_pylibcudf.sh
+++ b/ci/build_wheel_pylibcudf.sh
@@ -21,6 +21,7 @@ export PIP_CONSTRAINT="/tmp/constraints.txt"
 python -m auditwheel repair \
     --exclude libcudf.so \
     --exclude libnvcomp.so \
+    --exclude libkvikio.so \
     -w ${package_dir}/final_dist \
     ${package_dir}/dist/*
diff --git a/conda/environments/all_cuda-118_arch-x86_64.yaml b/conda/environments/all_cuda-118_arch-x86_64.yaml
index 9d9fec97731..ace55a15c09 100644
--- a/conda/environments/all_cuda-118_arch-x86_64.yaml
+++ b/conda/environments/all_cuda-118_arch-x86_64.yaml
@@ -19,7 +19,7 @@ dependencies:
 - cramjam
 - cubinlinker
 - cuda-nvtx=11.8
-- cuda-python>=11.7.1,<12.0a0
+- cuda-python>=11.7.1,<12.0a0,!=11.8.4
 - cuda-sanitizer-api=11.8.86
 - cuda-version=11.8
 - cudatoolkit
diff --git a/conda/environments/all_cuda-125_arch-x86_64.yaml b/conda/environments/all_cuda-125_arch-x86_64.yaml
index 19e3eafd641..d20db44497e 100644
--- a/conda/environments/all_cuda-125_arch-x86_64.yaml
+++ b/conda/environments/all_cuda-125_arch-x86_64.yaml
@@ -21,7 +21,7 @@ dependencies:
 - cuda-nvcc
 - cuda-nvrtc-dev
 - cuda-nvtx-dev
-- cuda-python>=12.0,<13.0a0
+- cuda-python>=12.0,<13.0a0,!=12.6.1
 - cuda-sanitizer-api
 - cuda-version=12.5
 - cupy>=12.0.0
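As a quick sanity check of the new `--exclude libkvikio.so` flags, one could list the repaired wheel's contents and confirm auditwheel no longer grafts a private copy of the library into the wheel. This is only a sketch; the glob and `${package_dir}` layout follow the scripts above:

```bash
# Wheels are zip archives: list the repaired wheel and look for kvikio.
# No grep matches means libkvikio.so was excluded as intended (it is
# instead supplied at runtime by the libkvikio dependency declared below).
unzip -l "${package_dir}"/final_dist/*.whl | grep kvikio \
  || echo "libkvikio.so not bundled (expected)"
```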
diff --git a/conda/recipes/cudf/meta.yaml b/conda/recipes/cudf/meta.yaml
index 2c254415318..6debcb281b1 100644
--- a/conda/recipes/cudf/meta.yaml
+++ b/conda/recipes/cudf/meta.yaml
@@ -91,7 +91,7 @@ requirements:
     - cudatoolkit
     - ptxcompiler >=0.7.0
     - cubinlinker  # CUDA enhanced compatibility.
-    - cuda-python >=11.7.1,<12.0a0
+    - cuda-python >=11.7.1,<12.0a0,!=11.8.4
     {% else %}
     - cuda-cudart
     - libcufile  # [linux64]
@@ -100,7 +100,7 @@ requirements:
     # TODO: Add nvjitlink here
     # xref: https://github.com/rapidsai/cudf/issues/12822
     - cuda-nvrtc
-    - cuda-python >=12.0,<13.0a0
+    - cuda-python >=12.0,<13.0a0,!=12.6.1
     - pynvjitlink
     {% endif %}
     - {{ pin_compatible('cuda-version', max_pin='x', min_pin='x') }}
diff --git a/conda/recipes/pylibcudf/meta.yaml b/conda/recipes/pylibcudf/meta.yaml
index 3d965f30986..92ca495f972 100644
--- a/conda/recipes/pylibcudf/meta.yaml
+++ b/conda/recipes/pylibcudf/meta.yaml
@@ -83,9 +83,9 @@ requirements:
     - {{ pin_compatible('rmm', max_pin='x.x') }}
    - fsspec >=0.6.0
     {% if cuda_major == "11" %}
-    - cuda-python >=11.7.1,<12.0a0
+    - cuda-python >=11.7.1,<12.0a0,!=11.8.4
     {% else %}
-    - cuda-python >=12.0,<13.0a0
+    - cuda-python >=12.0,<13.0a0,!=12.6.1
     {% endif %}
     - nvtx >=0.2.1
     - packaging
diff --git a/cpp/cmake/thirdparty/get_kvikio.cmake b/cpp/cmake/thirdparty/get_kvikio.cmake
index 20712beec41..c949f48505e 100644
--- a/cpp/cmake/thirdparty/get_kvikio.cmake
+++ b/cpp/cmake/thirdparty/get_kvikio.cmake
@@ -1,5 +1,5 @@
 # =============================================================================
-# Copyright (c) 2022-2023, NVIDIA CORPORATION.
+# Copyright (c) 2022-2024, NVIDIA CORPORATION.
 #
 # Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
 # in compliance with the License. You may obtain a copy of the License at
@@ -16,7 +16,7 @@ function(find_and_configure_kvikio VERSION)
 
   rapids_cpm_find(
-    KvikIO ${VERSION}
+    kvikio ${VERSION}
     GLOBAL_TARGETS kvikio::kvikio
     CPM_ARGS
     GIT_REPOSITORY https://github.com/rapidsai/kvikio.git
diff --git a/cpp/include/cudf/datetime.hpp b/cpp/include/cudf/datetime.hpp
index 1eaea5b6374..1f6e86d0389 100644
--- a/cpp/include/cudf/datetime.hpp
+++ b/cpp/include/cudf/datetime.hpp
@@ -58,6 +58,8 @@ enum class datetime_component : uint8_t {
  * @brief Extracts year from any datetime type and returns an int16_t
  * cudf::column.
  *
+ * @deprecated Deprecated in 24.12, to be removed in 25.02
+ *
  * @param column cudf::column_view of the input datetime values
  * @param stream CUDA stream used for device memory operations and kernel launches
  * @param mr Device memory resource used to allocate device memory of the returned column
@@ -65,7 +67,7 @@ enum class datetime_component : uint8_t {
  * @returns cudf::column of the extracted int16_t years
  * @throw cudf::logic_error if input column datatype is not TIMESTAMP
  */
-std::unique_ptr<cudf::column> extract_year(
+[[deprecated]] std::unique_ptr<cudf::column> extract_year(
   cudf::column_view const& column,
   rmm::cuda_stream_view stream = cudf::get_default_stream(),
   rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref());
@@ -74,6 +76,8 @@ std::unique_ptr<cudf::column> extract_year(
  * @brief Extracts month from any datetime type and returns an int16_t
  * cudf::column.
  *
+ * @deprecated Deprecated in 24.12, to be removed in 25.02
+ *
  * @param column cudf::column_view of the input datetime values
  * @param stream CUDA stream used for device memory operations and kernel launches
  * @param mr Device memory resource used to allocate device memory of the returned column
@@ -81,7 +85,7 @@ std::unique_ptr<cudf::column> extract_year(
  * @returns cudf::column of the extracted int16_t months
  * @throw cudf::logic_error if input column datatype is not TIMESTAMP
 */
-std::unique_ptr<cudf::column> extract_month(
+[[deprecated]] std::unique_ptr<cudf::column> extract_month(
   cudf::column_view const& column,
   rmm::cuda_stream_view stream = cudf::get_default_stream(),
   rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref());
@@ -90,6 +94,8 @@ std::unique_ptr<cudf::column> extract_month(
  * @brief Extracts day from any datetime type and returns an int16_t
  * cudf::column.
  *
+ * @deprecated Deprecated in 24.12, to be removed in 25.02
+ *
  * @param column cudf::column_view of the input datetime values
  * @param stream CUDA stream used for device memory operations and kernel launches
  * @param mr Device memory resource used to allocate device memory of the returned column
@@ -97,7 +103,7 @@ std::unique_ptr<cudf::column> extract_month(
  * @returns cudf::column of the extracted int16_t days
  * @throw cudf::logic_error if input column datatype is not TIMESTAMP
 */
-std::unique_ptr<cudf::column> extract_day(
+[[deprecated]] std::unique_ptr<cudf::column> extract_day(
   cudf::column_view const& column,
   rmm::cuda_stream_view stream = cudf::get_default_stream(),
   rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref());
@@ -106,6 +112,8 @@ std::unique_ptr<cudf::column> extract_day(
  * @brief Extracts a weekday from any datetime type and returns an int16_t
  * cudf::column.
  *
+ * @deprecated Deprecated in 24.12, to be removed in 25.02
+ *
  * @param column cudf::column_view of the input datetime values
  * @param stream CUDA stream used for device memory operations and kernel launches
  * @param mr Device memory resource used to allocate device memory of the returned column
@@ -113,7 +121,7 @@ std::unique_ptr<cudf::column> extract_day(
  * @returns cudf::column of the extracted int16_t days
  * @throw cudf::logic_error if input column datatype is not TIMESTAMP
 */
-std::unique_ptr<cudf::column> extract_weekday(
+[[deprecated]] std::unique_ptr<cudf::column> extract_weekday(
   cudf::column_view const& column,
   rmm::cuda_stream_view stream = cudf::get_default_stream(),
   rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref());
@@ -122,6 +130,8 @@ std::unique_ptr<cudf::column> extract_weekday(
  * @brief Extracts hour from any datetime type and returns an int16_t
  * cudf::column.
  *
+ * @deprecated Deprecated in 24.12, to be removed in 25.02
+ *
  * @param column cudf::column_view of the input datetime values
  * @param stream CUDA stream used for device memory operations and kernel launches
  * @param mr Device memory resource used to allocate device memory of the returned column
@@ -129,7 +139,7 @@ std::unique_ptr<cudf::column> extract_weekday(
  * @returns cudf::column of the extracted int16_t hours
  * @throw cudf::logic_error if input column datatype is not TIMESTAMP
 */
-std::unique_ptr<cudf::column> extract_hour(
+[[deprecated]] std::unique_ptr<cudf::column> extract_hour(
   cudf::column_view const& column,
   rmm::cuda_stream_view stream = cudf::get_default_stream(),
   rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref());
@@ -138,6 +148,8 @@ std::unique_ptr<cudf::column> extract_hour(
  * @brief Extracts minute from any datetime type and returns an int16_t
  * cudf::column.
  *
+ * @deprecated Deprecated in 24.12, to be removed in 25.02
+ *
  * @param column cudf::column_view of the input datetime values
  * @param stream CUDA stream used for device memory operations and kernel launches
  * @param mr Device memory resource used to allocate device memory of the returned column
@@ -145,7 +157,7 @@ std::unique_ptr<cudf::column> extract_hour(
  * @returns cudf::column of the extracted int16_t minutes
  * @throw cudf::logic_error if input column datatype is not TIMESTAMP
 */
-std::unique_ptr<cudf::column> extract_minute(
+[[deprecated]] std::unique_ptr<cudf::column> extract_minute(
   cudf::column_view const& column,
   rmm::cuda_stream_view stream = cudf::get_default_stream(),
   rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref());
@@ -154,6 +166,8 @@ std::unique_ptr<cudf::column> extract_minute(
  * @brief Extracts second from any datetime type and returns an int16_t
  * cudf::column.
  *
+ * @deprecated Deprecated in 24.12, to be removed in 25.02
+ *
  * @param column cudf::column_view of the input datetime values
  * @param stream CUDA stream used for device memory operations and kernel launches
  * @param mr Device memory resource used to allocate device memory of the returned column
@@ -161,7 +175,7 @@ std::unique_ptr<cudf::column> extract_minute(
  * @returns cudf::column of the extracted int16_t seconds
  * @throw cudf::logic_error if input column datatype is not TIMESTAMP
 */
-std::unique_ptr<cudf::column> extract_second(
+[[deprecated]] std::unique_ptr<cudf::column> extract_second(
   cudf::column_view const& column,
   rmm::cuda_stream_view stream = cudf::get_default_stream(),
   rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref());
@@ -173,6 +187,8 @@ std::unique_ptr<cudf::column> extract_second(
  * A millisecond fraction is only the 3 digits that make up the millisecond portion of a duration.
  * For example, the millisecond fraction of 1.234567890 seconds is 234.
 *
+ * @deprecated Deprecated in 24.12, to be removed in 25.02
+ *
  * @param column cudf::column_view of the input datetime values
  * @param stream CUDA stream used for device memory operations and kernel launches
  * @param mr Device memory resource used to allocate device memory of the returned column
@@ -180,7 +196,7 @@ std::unique_ptr<cudf::column> extract_second(
  * @returns cudf::column of the extracted int16_t milliseconds
  * @throw cudf::logic_error if input column datatype is not TIMESTAMP
 */
-std::unique_ptr<cudf::column> extract_millisecond_fraction(
+[[deprecated]] std::unique_ptr<cudf::column> extract_millisecond_fraction(
   cudf::column_view const& column,
   rmm::cuda_stream_view stream = cudf::get_default_stream(),
   rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref());
@@ -192,6 +208,8 @@ std::unique_ptr<cudf::column> extract_millisecond_fraction(
  * A microsecond fraction is only the 3 digits that make up the microsecond portion of a duration.
  * For example, the microsecond fraction of 1.234567890 seconds is 567.
 *
+ * @deprecated Deprecated in 24.12, to be removed in 25.02
+ *
  * @param column cudf::column_view of the input datetime values
  * @param stream CUDA stream used for device memory operations and kernel launches
  * @param mr Device memory resource used to allocate device memory of the returned column
@@ -199,7 +217,7 @@ std::unique_ptr<cudf::column> extract_millisecond_fraction(
  * @returns cudf::column of the extracted int16_t microseconds
  * @throw cudf::logic_error if input column datatype is not TIMESTAMP
 */
-std::unique_ptr<cudf::column> extract_microsecond_fraction(
+[[deprecated]] std::unique_ptr<cudf::column> extract_microsecond_fraction(
   cudf::column_view const& column,
   rmm::cuda_stream_view stream = cudf::get_default_stream(),
   rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref());
@@ -211,6 +229,8 @@ std::unique_ptr<cudf::column> extract_microsecond_fraction(
  * A nanosecond fraction is only the 3 digits that make up the nanosecond portion of a duration.
  * For example, the nanosecond fraction of 1.234567890 seconds is 890.
 *
+ * @deprecated Deprecated in 24.12, to be removed in 25.02
+ *
  * @param column cudf::column_view of the input datetime values
  * @param stream CUDA stream used for device memory operations and kernel launches
  * @param mr Device memory resource used to allocate device memory of the returned column
@@ -218,7 +238,7 @@ std::unique_ptr<cudf::column> extract_microsecond_fraction(
  * @returns cudf::column of the extracted int16_t nanoseconds
  * @throw cudf::logic_error if input column datatype is not TIMESTAMP
 */
-std::unique_ptr<cudf::column> extract_nanosecond_fraction(
+[[deprecated]] std::unique_ptr<cudf::column> extract_nanosecond_fraction(
   cudf::column_view const& column,
   rmm::cuda_stream_view stream = cudf::get_default_stream(),
   rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref());
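For downstream callers of the deprecated accessors, the replacement is the `extract_datetime_component` API together with the `datetime_component` enum shown above. A minimal migration sketch (the wrapper function is illustrative, and it assumes the usual defaulted stream/mr parameters on the declaration):

```cpp
#include <cudf/column/column.hpp>
#include <cudf/column/column_view.hpp>
#include <cudf/datetime.hpp>

#include <memory>

// Illustrative migration: each deprecated per-component accessor maps
// onto extract_datetime_component plus the matching enumerator.
std::unique_ptr<cudf::column> years_of(cudf::column_view const& timestamps)
{
  // Before (deprecated in 24.12): cudf::datetime::extract_year(timestamps);
  return cudf::datetime::extract_datetime_component(
    timestamps, cudf::datetime::datetime_component::YEAR);
}
```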
diff --git a/cpp/tests/datetime/datetime_ops_test.cpp b/cpp/tests/datetime/datetime_ops_test.cpp
index 44f99adc0e9..1d1deb42a51 100644
--- a/cpp/tests/datetime/datetime_ops_test.cpp
+++ b/cpp/tests/datetime/datetime_ops_test.cpp
@@ -52,16 +52,26 @@ TYPED_TEST(NonTimestampTest, TestThrowsOnNonTimestamp)
   cudf::data_type dtype{cudf::type_to_id<TypeParam>()};
   cudf::column col{dtype, 0, rmm::device_buffer{}, rmm::device_buffer{}, 0};
 
-  EXPECT_THROW(extract_year(col), cudf::logic_error);
-  EXPECT_THROW(extract_month(col), cudf::logic_error);
-  EXPECT_THROW(extract_day(col), cudf::logic_error);
-  EXPECT_THROW(extract_weekday(col), cudf::logic_error);
-  EXPECT_THROW(extract_hour(col), cudf::logic_error);
-  EXPECT_THROW(extract_minute(col), cudf::logic_error);
-  EXPECT_THROW(extract_second(col), cudf::logic_error);
-  EXPECT_THROW(extract_millisecond_fraction(col), cudf::logic_error);
-  EXPECT_THROW(extract_microsecond_fraction(col), cudf::logic_error);
-  EXPECT_THROW(extract_nanosecond_fraction(col), cudf::logic_error);
+  EXPECT_THROW(extract_datetime_component(col, cudf::datetime::datetime_component::YEAR),
+               cudf::logic_error);
+  EXPECT_THROW(extract_datetime_component(col, cudf::datetime::datetime_component::MONTH),
+               cudf::logic_error);
+  EXPECT_THROW(extract_datetime_component(col, cudf::datetime::datetime_component::DAY),
+               cudf::logic_error);
+  EXPECT_THROW(extract_datetime_component(col, cudf::datetime::datetime_component::WEEKDAY),
+               cudf::logic_error);
+  EXPECT_THROW(extract_datetime_component(col, cudf::datetime::datetime_component::HOUR),
+               cudf::logic_error);
+  EXPECT_THROW(extract_datetime_component(col, cudf::datetime::datetime_component::MINUTE),
+               cudf::logic_error);
+  EXPECT_THROW(extract_datetime_component(col, cudf::datetime::datetime_component::SECOND),
+               cudf::logic_error);
+  EXPECT_THROW(extract_datetime_component(col, cudf::datetime::datetime_component::MILLISECOND),
+               cudf::logic_error);
+  EXPECT_THROW(extract_datetime_component(col, cudf::datetime::datetime_component::MICROSECOND),
+               cudf::logic_error);
+  EXPECT_THROW(extract_datetime_component(col, cudf::datetime::datetime_component::NANOSECOND),
+               cudf::logic_error);
   EXPECT_THROW(last_day_of_month(col), cudf::logic_error);
   EXPECT_THROW(day_of_year(col), cudf::logic_error);
   EXPECT_THROW(add_calendrical_months(col, *cudf::make_empty_column(cudf::type_id::INT16)),
@@ -104,96 +114,6 @@ TEST_F(BasicDatetimeOpsTest, TestExtractingDatetimeComponents)
     987234623   // 1970-01-01 00:00:00.987234623 GMT
   };
 
-  CUDF_TEST_EXPECT_COLUMNS_EQUAL(*extract_year(timestamps_D),
-                                 fixed_width_column_wrapper<int16_t>{1965, 2018, 2023});
-  CUDF_TEST_EXPECT_COLUMNS_EQUAL(*extract_year(timestamps_s),
-                                 fixed_width_column_wrapper<int16_t>{1965, 2018, 2023});
-  CUDF_TEST_EXPECT_COLUMNS_EQUAL(*extract_year(timestamps_ms),
-                                 fixed_width_column_wrapper<int16_t>{1965, 2018, 2023});
-  CUDF_TEST_EXPECT_COLUMNS_EQUAL(*extract_year(timestamps_ns),
-                                 fixed_width_column_wrapper<int16_t>{1969, 1970, 1970});
-
-  CUDF_TEST_EXPECT_COLUMNS_EQUAL(*extract_month(timestamps_D),
-                                 fixed_width_column_wrapper<int16_t>{10, 7, 1});
-  CUDF_TEST_EXPECT_COLUMNS_EQUAL(*extract_month(timestamps_s),
-                                 fixed_width_column_wrapper<int16_t>{10, 7, 1});
-  CUDF_TEST_EXPECT_COLUMNS_EQUAL(*extract_month(timestamps_ms),
-                                 fixed_width_column_wrapper<int16_t>{10, 7, 1});
-  CUDF_TEST_EXPECT_COLUMNS_EQUAL(*extract_month(timestamps_ns),
-                                 fixed_width_column_wrapper<int16_t>{12, 1, 1});
-
-  CUDF_TEST_EXPECT_COLUMNS_EQUAL(*extract_day(timestamps_D),
-                                 fixed_width_column_wrapper<int16_t>{26, 4, 25});
-  CUDF_TEST_EXPECT_COLUMNS_EQUAL(*extract_day(timestamps_s),
-                                 fixed_width_column_wrapper<int16_t>{26, 4, 25});
-  CUDF_TEST_EXPECT_COLUMNS_EQUAL(*extract_day(timestamps_ms),
-                                 fixed_width_column_wrapper<int16_t>{26, 4, 25});
-  CUDF_TEST_EXPECT_COLUMNS_EQUAL(*extract_day(timestamps_ns),
-                                 fixed_width_column_wrapper<int16_t>{31, 1, 1});
-
-  CUDF_TEST_EXPECT_COLUMNS_EQUAL(*extract_weekday(timestamps_D),
-                                 fixed_width_column_wrapper<int16_t>{2, 3, 3});
-  CUDF_TEST_EXPECT_COLUMNS_EQUAL(*extract_weekday(timestamps_s),
-                                 fixed_width_column_wrapper<int16_t>{2, 3, 3});
-  CUDF_TEST_EXPECT_COLUMNS_EQUAL(*extract_weekday(timestamps_ms),
-                                 fixed_width_column_wrapper<int16_t>{2, 3, 3});
-  CUDF_TEST_EXPECT_COLUMNS_EQUAL(*extract_weekday(timestamps_ms),
-                                 fixed_width_column_wrapper<int16_t>{2, 3, 3});
-
-  CUDF_TEST_EXPECT_COLUMNS_EQUAL(*extract_hour(timestamps_D),
-                                 fixed_width_column_wrapper<int16_t>{0, 0, 0});
-  CUDF_TEST_EXPECT_COLUMNS_EQUAL(*extract_hour(timestamps_s),
-                                 fixed_width_column_wrapper<int16_t>{14, 12, 7});
-  CUDF_TEST_EXPECT_COLUMNS_EQUAL(*extract_hour(timestamps_ms),
-                                 fixed_width_column_wrapper<int16_t>{14, 12, 7});
-  CUDF_TEST_EXPECT_COLUMNS_EQUAL(*extract_hour(timestamps_ns),
-                                 fixed_width_column_wrapper<int16_t>{23, 0, 0});
-
-  CUDF_TEST_EXPECT_COLUMNS_EQUAL(*extract_minute(timestamps_D),
-                                 fixed_width_column_wrapper<int16_t>{0, 0, 0});
-  CUDF_TEST_EXPECT_COLUMNS_EQUAL(*extract_minute(timestamps_s),
-                                 fixed_width_column_wrapper<int16_t>{1, 0, 32});
-  CUDF_TEST_EXPECT_COLUMNS_EQUAL(*extract_minute(timestamps_ms),
-                                 fixed_width_column_wrapper<int16_t>{1, 0, 32});
-  CUDF_TEST_EXPECT_COLUMNS_EQUAL(*extract_minute(timestamps_ns),
-                                 fixed_width_column_wrapper<int16_t>{59, 0, 0});
-
-  CUDF_TEST_EXPECT_COLUMNS_EQUAL(*extract_second(timestamps_D),
-                                 fixed_width_column_wrapper<int16_t>{0, 0, 0});
-  CUDF_TEST_EXPECT_COLUMNS_EQUAL(*extract_second(timestamps_s),
-                                 fixed_width_column_wrapper<int16_t>{12, 0, 12});
-  CUDF_TEST_EXPECT_COLUMNS_EQUAL(*extract_second(timestamps_ms),
-                                 fixed_width_column_wrapper<int16_t>{12, 0, 12});
-  CUDF_TEST_EXPECT_COLUMNS_EQUAL(*extract_minute(timestamps_ns),
-                                 fixed_width_column_wrapper<int16_t>{59, 0, 0});
-
-  CUDF_TEST_EXPECT_COLUMNS_EQUAL(*extract_millisecond_fraction(timestamps_D),
-                                 fixed_width_column_wrapper<int16_t>{0, 0, 0});
-  CUDF_TEST_EXPECT_COLUMNS_EQUAL(*extract_millisecond_fraction(timestamps_s),
-                                 fixed_width_column_wrapper<int16_t>{0, 0, 0});
-  CUDF_TEST_EXPECT_COLUMNS_EQUAL(*extract_millisecond_fraction(timestamps_ms),
-                                 fixed_width_column_wrapper<int16_t>{762, 0, 929});
-  CUDF_TEST_EXPECT_COLUMNS_EQUAL(*extract_millisecond_fraction(timestamps_ns),
-                                 fixed_width_column_wrapper<int16_t>{976, 23, 987});
-
-  CUDF_TEST_EXPECT_COLUMNS_EQUAL(*extract_microsecond_fraction(timestamps_D),
-                                 fixed_width_column_wrapper<int16_t>{0, 0, 0});
-  CUDF_TEST_EXPECT_COLUMNS_EQUAL(*extract_microsecond_fraction(timestamps_s),
-                                 fixed_width_column_wrapper<int16_t>{0, 0, 0});
-  CUDF_TEST_EXPECT_COLUMNS_EQUAL(*extract_microsecond_fraction(timestamps_ms),
-                                 fixed_width_column_wrapper<int16_t>{0, 0, 0});
-  CUDF_TEST_EXPECT_COLUMNS_EQUAL(*extract_microsecond_fraction(timestamps_ns),
-                                 fixed_width_column_wrapper<int16_t>{675, 432, 234});
-
-  CUDF_TEST_EXPECT_COLUMNS_EQUAL(*extract_nanosecond_fraction(timestamps_D),
-                                 fixed_width_column_wrapper<int16_t>{0, 0, 0});
-  CUDF_TEST_EXPECT_COLUMNS_EQUAL(*extract_nanosecond_fraction(timestamps_s),
-                                 fixed_width_column_wrapper<int16_t>{0, 0, 0});
-  CUDF_TEST_EXPECT_COLUMNS_EQUAL(*extract_nanosecond_fraction(timestamps_ms),
-                                 fixed_width_column_wrapper<int16_t>{0, 0, 0});
-  CUDF_TEST_EXPECT_COLUMNS_EQUAL(*extract_nanosecond_fraction(timestamps_ns),
-                                 fixed_width_column_wrapper<int16_t>{766, 424, 623});
-
   CUDF_TEST_EXPECT_COLUMNS_EQUAL(
     *extract_datetime_component(timestamps_D, cudf::datetime::datetime_component::YEAR),
     fixed_width_column_wrapper<int16_t>{1965, 2018, 2023});
@@ -346,16 +266,29 @@ TYPED_TEST(TypedDatetimeOpsTest, TestEmptyColumns)
   cudf::column int16s{int16s_dtype, 0, rmm::device_buffer{}, rmm::device_buffer{}, 0};
   cudf::column timestamps{timestamps_dtype, 0, rmm::device_buffer{}, rmm::device_buffer{}, 0};
 
-  CUDF_TEST_EXPECT_COLUMNS_EQUAL(*extract_year(timestamps), int16s);
-  CUDF_TEST_EXPECT_COLUMNS_EQUAL(*extract_month(timestamps), int16s);
-  CUDF_TEST_EXPECT_COLUMNS_EQUAL(*extract_day(timestamps), int16s);
-  CUDF_TEST_EXPECT_COLUMNS_EQUAL(*extract_weekday(timestamps), int16s);
-  CUDF_TEST_EXPECT_COLUMNS_EQUAL(*extract_hour(timestamps), int16s);
-  CUDF_TEST_EXPECT_COLUMNS_EQUAL(*extract_minute(timestamps), int16s);
-  CUDF_TEST_EXPECT_COLUMNS_EQUAL(*extract_second(timestamps), int16s);
-  CUDF_TEST_EXPECT_COLUMNS_EQUAL(*extract_millisecond_fraction(timestamps), int16s);
-  CUDF_TEST_EXPECT_COLUMNS_EQUAL(*extract_microsecond_fraction(timestamps), int16s);
-  CUDF_TEST_EXPECT_COLUMNS_EQUAL(*extract_nanosecond_fraction(timestamps), int16s);
+  CUDF_TEST_EXPECT_COLUMNS_EQUAL(
+    *extract_datetime_component(timestamps, cudf::datetime::datetime_component::YEAR), int16s);
+  CUDF_TEST_EXPECT_COLUMNS_EQUAL(
+    *extract_datetime_component(timestamps, cudf::datetime::datetime_component::MONTH), int16s);
+  CUDF_TEST_EXPECT_COLUMNS_EQUAL(
+    *extract_datetime_component(timestamps, cudf::datetime::datetime_component::DAY), int16s);
+  CUDF_TEST_EXPECT_COLUMNS_EQUAL(
+    *extract_datetime_component(timestamps, cudf::datetime::datetime_component::WEEKDAY), int16s);
+  CUDF_TEST_EXPECT_COLUMNS_EQUAL(
+    *extract_datetime_component(timestamps, cudf::datetime::datetime_component::HOUR), int16s);
+  CUDF_TEST_EXPECT_COLUMNS_EQUAL(
+    *extract_datetime_component(timestamps, cudf::datetime::datetime_component::MINUTE), int16s);
+  CUDF_TEST_EXPECT_COLUMNS_EQUAL(
+    *extract_datetime_component(timestamps, cudf::datetime::datetime_component::SECOND), int16s);
+  CUDF_TEST_EXPECT_COLUMNS_EQUAL(
+    *extract_datetime_component(timestamps, cudf::datetime::datetime_component::MILLISECOND),
+    int16s);
+  CUDF_TEST_EXPECT_COLUMNS_EQUAL(
+    *extract_datetime_component(timestamps, cudf::datetime::datetime_component::MICROSECOND),
+    int16s);
+  CUDF_TEST_EXPECT_COLUMNS_EQUAL(
+    *extract_datetime_component(timestamps, cudf::datetime::datetime_component::NANOSECOND),
+    int16s);
 }
 
 TYPED_TEST(TypedDatetimeOpsTest, TestExtractingGeneratedDatetimeComponents)
@@ -385,13 +318,27 @@ TYPED_TEST(TypedDatetimeOpsTest, TestExtractingGeneratedDatetimeComponents)
     expected_seconds = fixed_width_column_wrapper<int16_t>{0, 0, 0, 0, 0, 0, 0, 0, 0, 0};
   }
 
-  CUDF_TEST_EXPECT_COLUMNS_EQUAL(*extract_year(timestamps), expected_years);
-  CUDF_TEST_EXPECT_COLUMNS_EQUAL(*extract_month(timestamps), expected_months);
-  CUDF_TEST_EXPECT_COLUMNS_EQUAL(*extract_day(timestamps), expected_days);
-  CUDF_TEST_EXPECT_COLUMNS_EQUAL(*extract_weekday(timestamps), expected_weekdays);
-  CUDF_TEST_EXPECT_COLUMNS_EQUAL(*extract_hour(timestamps), expected_hours);
-  CUDF_TEST_EXPECT_COLUMNS_EQUAL(*extract_minute(timestamps), expected_minutes);
-  CUDF_TEST_EXPECT_COLUMNS_EQUAL(*extract_second(timestamps), expected_seconds);
+  CUDF_TEST_EXPECT_COLUMNS_EQUAL(
+    *extract_datetime_component(timestamps, cudf::datetime::datetime_component::YEAR),
+    expected_years);
+  CUDF_TEST_EXPECT_COLUMNS_EQUAL(
+    *extract_datetime_component(timestamps, cudf::datetime::datetime_component::MONTH),
+    expected_months);
+  CUDF_TEST_EXPECT_COLUMNS_EQUAL(
+    *extract_datetime_component(timestamps, cudf::datetime::datetime_component::DAY),
+    expected_days);
+  CUDF_TEST_EXPECT_COLUMNS_EQUAL(
+    *extract_datetime_component(timestamps, cudf::datetime::datetime_component::WEEKDAY),
+    expected_weekdays);
+  CUDF_TEST_EXPECT_COLUMNS_EQUAL(
+    *extract_datetime_component(timestamps, cudf::datetime::datetime_component::HOUR),
+    expected_hours);
+  CUDF_TEST_EXPECT_COLUMNS_EQUAL(
+    *extract_datetime_component(timestamps, cudf::datetime::datetime_component::MINUTE),
+    expected_minutes);
+  CUDF_TEST_EXPECT_COLUMNS_EQUAL(
+    *extract_datetime_component(timestamps, cudf::datetime::datetime_component::SECOND),
+    expected_seconds);
 }
 
 TYPED_TEST(TypedDatetimeOpsTest, TestExtractingGeneratedNullableDatetimeComponents)
@@ -441,13 +388,27 @@ TYPED_TEST(TypedDatetimeOpsTest, TestExtractingGeneratedNullableDatetimeComponen
       {true, false, true, false, true, false, true, false, true, false}};
   }
 
-  CUDF_TEST_EXPECT_COLUMNS_EQUAL(*extract_year(timestamps), expected_years);
-  CUDF_TEST_EXPECT_COLUMNS_EQUAL(*extract_month(timestamps), expected_months);
-  CUDF_TEST_EXPECT_COLUMNS_EQUAL(*extract_day(timestamps), expected_days);
-  CUDF_TEST_EXPECT_COLUMNS_EQUAL(*extract_weekday(timestamps), expected_weekdays);
-  CUDF_TEST_EXPECT_COLUMNS_EQUAL(*extract_hour(timestamps), expected_hours);
-  CUDF_TEST_EXPECT_COLUMNS_EQUAL(*extract_minute(timestamps), expected_minutes);
-  CUDF_TEST_EXPECT_COLUMNS_EQUAL(*extract_second(timestamps), expected_seconds);
+  CUDF_TEST_EXPECT_COLUMNS_EQUAL(
+    *extract_datetime_component(timestamps, cudf::datetime::datetime_component::YEAR),
+    expected_years);
+  CUDF_TEST_EXPECT_COLUMNS_EQUAL(
+    *extract_datetime_component(timestamps, cudf::datetime::datetime_component::MONTH),
+    expected_months);
+  CUDF_TEST_EXPECT_COLUMNS_EQUAL(
+    *extract_datetime_component(timestamps, cudf::datetime::datetime_component::DAY),
+    expected_days);
+  CUDF_TEST_EXPECT_COLUMNS_EQUAL(
+    *extract_datetime_component(timestamps, cudf::datetime::datetime_component::WEEKDAY),
+    expected_weekdays);
+  CUDF_TEST_EXPECT_COLUMNS_EQUAL(
+    *extract_datetime_component(timestamps, cudf::datetime::datetime_component::HOUR),
+    expected_hours);
+  CUDF_TEST_EXPECT_COLUMNS_EQUAL(
+    *extract_datetime_component(timestamps, cudf::datetime::datetime_component::MINUTE),
+    expected_minutes);
+  CUDF_TEST_EXPECT_COLUMNS_EQUAL(
+    *extract_datetime_component(timestamps, cudf::datetime::datetime_component::SECOND),
+    expected_seconds);
 }
 
 TEST_F(BasicDatetimeOpsTest, TestLastDayOfMonthWithSeconds)
diff --git a/cpp/tests/streams/datetime_test.cpp b/cpp/tests/streams/datetime_test.cpp
index 82629156fa6..29b302c3637 100644
--- a/cpp/tests/streams/datetime_test.cpp
+++ b/cpp/tests/streams/datetime_test.cpp
@@ -35,52 +35,62 @@ class DatetimeTest : public cudf::test::BaseFixture {
 
 TEST_F(DatetimeTest, ExtractYear)
 {
-  cudf::datetime::extract_year(timestamps, cudf::test::get_default_stream());
+  cudf::datetime::extract_datetime_component(
+    timestamps, cudf::datetime::datetime_component::YEAR, cudf::test::get_default_stream());
 }
 
 TEST_F(DatetimeTest, ExtractMonth)
 {
-  cudf::datetime::extract_month(timestamps, cudf::test::get_default_stream());
+  cudf::datetime::extract_datetime_component(
+    timestamps, cudf::datetime::datetime_component::MONTH, cudf::test::get_default_stream());
 }
 
 TEST_F(DatetimeTest, ExtractDay)
 {
-  cudf::datetime::extract_day(timestamps, cudf::test::get_default_stream());
+  cudf::datetime::extract_datetime_component(
+    timestamps, cudf::datetime::datetime_component::DAY, cudf::test::get_default_stream());
 }
 
 TEST_F(DatetimeTest, ExtractWeekday)
 {
-  cudf::datetime::extract_weekday(timestamps, cudf::test::get_default_stream());
+  cudf::datetime::extract_datetime_component(
+    timestamps, cudf::datetime::datetime_component::WEEKDAY, cudf::test::get_default_stream());
 }
 
 TEST_F(DatetimeTest, ExtractHour)
 {
-  cudf::datetime::extract_hour(timestamps, cudf::test::get_default_stream());
+  cudf::datetime::extract_datetime_component(
+    timestamps, cudf::datetime::datetime_component::HOUR, cudf::test::get_default_stream());
 }
 
 TEST_F(DatetimeTest, ExtractMinute)
 {
-  cudf::datetime::extract_minute(timestamps, cudf::test::get_default_stream());
+  cudf::datetime::extract_datetime_component(
+    timestamps, cudf::datetime::datetime_component::MINUTE, cudf::test::get_default_stream());
 }
 
 TEST_F(DatetimeTest, ExtractSecond)
 {
-  cudf::datetime::extract_second(timestamps, cudf::test::get_default_stream());
+  cudf::datetime::extract_datetime_component(
+    timestamps, cudf::datetime::datetime_component::SECOND, cudf::test::get_default_stream());
 }
 
 TEST_F(DatetimeTest, ExtractMillisecondFraction)
 {
-  cudf::datetime::extract_millisecond_fraction(timestamps, cudf::test::get_default_stream());
+  cudf::datetime::extract_datetime_component(
+    timestamps, cudf::datetime::datetime_component::MILLISECOND, cudf::test::get_default_stream());
 }
 
 TEST_F(DatetimeTest, ExtractMicrosecondFraction)
 {
-  cudf::datetime::extract_microsecond_fraction(timestamps, cudf::test::get_default_stream());
+  cudf::datetime::extract_datetime_component(
+    timestamps, cudf::datetime::datetime_component::MICROSECOND, cudf::test::get_default_stream());
 }
 
 TEST_F(DatetimeTest, ExtractNanosecondFraction)
 {
-  cudf::datetime::extract_nanosecond_fraction(timestamps, cudf::test::get_default_stream());
+  cudf::datetime::extract_datetime_component(
+    timestamps, cudf::datetime::datetime_component::NANOSECOND, cudf::test::get_default_stream());
 }
 
 TEST_F(DatetimeTest, LastDayOfMonth)
diff --git a/dependencies.yaml b/dependencies.yaml
index 90255ca674c..41ac6ce1808 100644
--- a/dependencies.yaml
+++ b/dependencies.yaml
@@ -177,6 +177,7 @@ files:
     extras:
       table: project
     includes:
+      - depends_on_libkvikio
      - depends_on_nvcomp
   py_build_pylibcudf:
     output: pyproject
@@ -658,10 +659,10 @@ dependencies:
       matrices:
         - matrix: {cuda: "12.*"}
           packages:
-            - cuda-python>=12.0,<13.0a0
+            - cuda-python>=12.0,<13.0a0,!=12.6.1
         - matrix: {cuda: "11.*"}
          packages: &run_pylibcudf_packages_all_cu11
-            - cuda-python>=11.7.1,<12.0a0
+            - cuda-python>=11.7.1,<12.0a0,!=11.8.4
         - {matrix: null, packages: *run_pylibcudf_packages_all_cu11}
   run_cudf:
     common:
@@ -684,10 +685,10 @@ dependencies:
       matrices:
        - matrix: {cuda: "12.*"}
           packages:
-            - cuda-python>=12.0,<13.0a0
+            - cuda-python>=12.0,<13.0a0,!=12.6.1
         - matrix: {cuda: "11.*"}
           packages: &run_cudf_packages_all_cu11
-            - cuda-python>=11.7.1,<12.0a0
+            - cuda-python>=11.7.1,<12.0a0,!=11.8.4
         - {matrix: null, packages: *run_cudf_packages_all_cu11}
     - output_types: conda
       matrices:
diff --git a/python/cudf/pyproject.toml b/python/cudf/pyproject.toml
index b6105c17b3e..53f22a11e6b 100644
--- a/python/cudf/pyproject.toml
+++ b/python/cudf/pyproject.toml
@@ -20,7 +20,7 @@ requires-python = ">=3.10"
 dependencies = [
     "cachetools",
     "cubinlinker",
-    "cuda-python>=11.7.1,<12.0a0",
+    "cuda-python>=11.7.1,<12.0a0,!=11.8.4",
     "cupy-cuda11x>=12.0.0",
     "fsspec>=0.6.0",
     "libcudf==24.12.*,>=0.0.0a0",
@@ -90,6 +90,8 @@ filterwarnings = [
     "error",
     "ignore:::.*xdist.*",
     "ignore:::.*pytest.*",
+    # https://github.com/rapidsai/build-planning/issues/116
+    "ignore:.*cuda..* module is deprecated.*:DeprecationWarning",
     # some third-party dependencies (e.g. 'boto3') still using datetime.datetime.utcnow()
     "ignore:.*datetime.*utcnow.*scheduled for removal.*:DeprecationWarning:botocore",
     # Deprecation warning from Pyarrow Table.to_pandas() with pandas-2.2+
diff --git a/python/cudf_kafka/pyproject.toml b/python/cudf_kafka/pyproject.toml
index 667cd7b1db8..ec0bc0eb22b 100644
--- a/python/cudf_kafka/pyproject.toml
+++ b/python/cudf_kafka/pyproject.toml
@@ -51,7 +51,9 @@ rapids = ["rmm", "cudf", "dask_cudf"]
 addopts = "--tb=native --strict-config --strict-markers"
 empty_parameter_set_mark = "fail_at_collect"
 filterwarnings = [
-    "error"
+    "error",
+    # https://github.com/rapidsai/build-planning/issues/116
+    "ignore:.*cuda..* module is deprecated.*:DeprecationWarning",
 ]
 xfail_strict = true
diff --git a/python/cudf_polars/cudf_polars/dsl/ir.py b/python/cudf_polars/cudf_polars/dsl/ir.py
index 04aa74024cd..a242ff9300f 100644
--- a/python/cudf_polars/cudf_polars/dsl/ir.py
+++ b/python/cudf_polars/cudf_polars/dsl/ir.py
@@ -127,9 +127,12 @@ def broadcast(*columns: Column, target_length: int | None = None) -> list[Column
 class IR(Node["IR"]):
     """Abstract plan node, representing an unevaluated dataframe."""
 
-    __slots__ = ("schema",)
+    __slots__ = ("schema", "_non_child_args")
     # This annotation is needed because of https://github.com/python/mypy/issues/17981
     _non_child: ClassVar[tuple[str, ...]] = ("schema",)
+    # Concrete classes should set this up with the arguments that will
+    # be passed to do_evaluate.
+    _non_child_args: tuple[Any, ...]
 
     schema: Schema
     """Mapping from column names to their data types."""
@@ -146,9 +149,37 @@ def get_hashable(self) -> Hashable:
         schema_hash = tuple(self.schema.items())
         return (type(self), schema_hash, args)
 
+    # Hacky to avoid type-checking issues, just advertise the
+    # signature. Both mypy and pyright complain if we have an abstract
+    # method that takes arbitrary *args, but the subclasses have
+    # tighter signatures. This complaint is correct because the
+    # subclass is not Liskov-substitutable for the superclass.
+    # However, we know do_evaluate will only be called with the
+    # correct arguments by "construction".
+    do_evaluate: Callable[..., DataFrame]
+    """
+    Evaluate the node (given its evaluated children), and return a dataframe.
+
+    Parameters
+    ----------
+    args
+        Non child arguments followed by any evaluated dataframe inputs.
+
+    Returns
+    -------
+    DataFrame (on device) representing the evaluation of this plan
+    node.
+
+    Raises
+    ------
+    NotImplementedError
+        If evaluation fails. Ideally this should not occur, since the
+        translation phase should fail earlier.
+    """
+
     def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame:
         """
-        Evaluate the node and return a dataframe.
+        Evaluate the node (recursively) and return a dataframe.
 
         Parameters
         ----------
@@ -156,21 +187,27 @@ def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame:
             Mapping from cached node ids to constructed DataFrames.
             Used to implement evaluation of the `Cache` node.
 
+        Notes
+        -----
+        Prefer not to override this method. Instead implement
+        :meth:`do_evaluate` which doesn't encode a recursion scheme
+        and just assumes already evaluated inputs.
+
         Returns
         -------
         DataFrame (on device) representing the evaluation of this plan
-        node.
+        node (and its children).
 
         Raises
         ------
         NotImplementedError
-            If we couldn't evaluate things. Ideally this should not occur,
-            since the translation phase should pick up things that we
-            cannot handle.
+            If evaluation fails. Ideally this should not occur, since the
+            translation phase should fail earlier.
         """
-        raise NotImplementedError(
-            f"Evaluation of plan {type(self).__name__}"
-        )  # pragma: no cover
+        return self.do_evaluate(
+            *self._non_child_args,
+            *(child.evaluate(cache=cache) for child in self.children),
+        )
 
 
 class PythonScan(IR):
@@ -187,6 +224,7 @@ def __init__(self, schema: Schema, options: Any, predicate: expr.NamedExpr | Non
         self.schema = schema
         self.options = options
         self.predicate = predicate
+        self._non_child_args = (schema, options, predicate)
         self.children = ()
         raise NotImplementedError("PythonScan not implemented")
@@ -259,6 +297,17 @@ def __init__(
         self.n_rows = n_rows
         self.row_index = row_index
         self.predicate = predicate
+        self._non_child_args = (
+            schema,
+            typ,
+            reader_options,
+            paths,
+            with_columns,
+            skip_rows,
+            n_rows,
+            row_index,
+            predicate,
+        )
         self.children = ()
         if self.typ not in ("csv", "parquet", "ndjson"):  # pragma: no cover
             # This line is unhittable ATM since IPC/Anonymous scan raise
@@ -341,19 +390,28 @@ def get_hashable(self) -> Hashable:
             self.predicate,
         )
 
-    def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame:
+    @classmethod
+    def do_evaluate(
+        cls,
+        schema: Schema,
+        typ: str,
+        reader_options: dict[str, Any],
+        paths: list[str],
+        with_columns: list[str] | None,
+        skip_rows: int,
+        n_rows: int,
+        row_index: tuple[str, int] | None,
+        predicate: expr.NamedExpr | None,
+    ):
         """Evaluate and return a dataframe."""
-        with_columns = self.with_columns
-        row_index = self.row_index
-        n_rows = self.n_rows
-        if self.typ == "csv":
-            parse_options = self.reader_options["parse_options"]
+        if typ == "csv":
+            parse_options = reader_options["parse_options"]
             sep = chr(parse_options["separator"])
             quote = chr(parse_options["quote_char"])
             eol = chr(parse_options["eol_char"])
-            if self.reader_options["schema"] is not None:
+            if reader_options["schema"] is not None:
                 # Reader schema provides names
-                column_names = list(self.reader_options["schema"]["fields"].keys())
+                column_names = list(reader_options["schema"]["fields"].keys())
             else:
                 # file provides column names
                 column_names = None
@@ -380,8 +438,8 @@ def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame:
             # polars skips blank lines at the beginning of the file
             pieces = []
             read_partial = n_rows != -1
-            for p in self.paths:
-                skiprows = self.reader_options["skip_rows"]
+            for p in paths:
+                skiprows = reader_options["skip_rows"]
                 path = Path(p)
                 with path.open() as f:
                     while f.readline() == "\n":
@@ -400,7 +458,7 @@ def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame:
                         skiprows=skiprows,
                         comment=comment,
                         decimal=decimal,
-                        dtypes=self.schema,
+                        dtypes=schema,
                         nrows=n_rows,
                     )
                     pieces.append(tbl_w_meta)
@@ -419,17 +477,17 @@ def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame:
                 plc.concatenate.concatenate(list(tables)),
                 colnames[0],
             )
-        elif self.typ == "parquet":
+        elif typ == "parquet":
             filters = None
-            if self.predicate is not None and self.row_index is None:
+            if predicate is not None and row_index is None:
                 # Can't apply filters during read if we have a row index.
-                filters = to_parquet_filter(self.predicate.value)
+                filters = to_parquet_filter(predicate.value)
             tbl_w_meta = plc.io.parquet.read_parquet(
-                plc.io.SourceInfo(self.paths),
+                plc.io.SourceInfo(paths),
                 columns=with_columns,
                 filters=filters,
                 nrows=n_rows,
-                skip_rows=self.skip_rows,
+                skip_rows=skip_rows,
             )
             df = DataFrame.from_table(
                 tbl_w_meta.tbl,
@@ -439,12 +497,12 @@ def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame:
             if filters is not None:
                 # Mask must have been applied.
                 return df
-        elif self.typ == "ndjson":
+        elif typ == "ndjson":
             json_schema: list[tuple[str, str, list]] = [
-                (name, typ, []) for name, typ in self.schema.items()
+                (name, typ, []) for name, typ in schema.items()
             ]
             plc_tbl_w_meta = plc.io.json.read_json(
-                plc.io.SourceInfo(self.paths),
+                plc.io.SourceInfo(paths),
                 lines=True,
                 dtypes=json_schema,
                 prune_columns=True,
            )
@@ -454,20 +512,17 @@ def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame:
             df = DataFrame.from_table(
                 plc_tbl_w_meta.tbl, plc_tbl_w_meta.column_names(include_children=False)
             )
-            col_order = list(self.schema.keys())
-            # TODO: remove condition when dropping support for polars 1.0
-            # https://github.com/pola-rs/polars/pull/17363
-            if row_index is not None and row_index[0] in self.schema:
+            col_order = list(schema.keys())
+            if row_index is not None:
                 col_order.remove(row_index[0])
-            if col_order is not None:
-                df = df.select(col_order)
+            df = df.select(col_order)
         else:
             raise NotImplementedError(
-                f"Unhandled scan type: {self.typ}"
+                f"Unhandled scan type: {typ}"
             )  # pragma: no cover; post init trips first
         if row_index is not None:
             name, offset = row_index
-            dtype = self.schema[name]
+            dtype = schema[name]
             step = plc.interop.from_arrow(
                 pa.scalar(1, type=plc.interop.to_arrow(dtype))
             )
@@ -482,13 +537,11 @@ def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame:
                 name=name,
             )
             df = DataFrame([index, *df.columns])
-        assert all(
-            c.obj.type() == self.schema[name] for name, c in df.column_map.items()
-        )
-        if self.predicate is None:
+        assert all(c.obj.type() == schema[name] for name, c in df.column_map.items())
+        if predicate is None:
             return df
         else:
-            (mask,) = broadcast(self.predicate.evaluate(df), target_length=df.num_rows)
+            (mask,) = broadcast(predicate.evaluate(df), target_length=df.num_rows)
             return df.filter(mask)
@@ -508,9 +561,21 @@ def __init__(self, schema: Schema, key: int, value: IR):
         self.schema = schema
         self.key = key
         self.children = (value,)
+        self._non_child_args = (key,)
+
+    @classmethod
+    def do_evaluate(
+        cls, key: int, df: DataFrame
+    ) -> DataFrame:  # pragma: no cover; basic evaluation never calls this
+        """Evaluate and return a dataframe."""
+        # Our value has already been computed for us, so let's just
+        # return it.
+        return df
 
     def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame:
         """Evaluate and return a dataframe."""
+        # We must override the recursion scheme because we don't want
+        # to recurse if we're in the cache.
         try:
             return cache[self.key]
         except KeyError:
@@ -545,6 +610,7 @@ def __init__(
         self.df = df
         self.projection = tuple(projection) if projection is not None else None
         self.predicate = predicate
+        self._non_child_args = (schema, df, self.projection, predicate)
         self.children = ()
 
     def get_hashable(self) -> Hashable:
@@ -557,18 +623,25 @@ def get_hashable(self) -> Hashable:
         schema_hash = tuple(self.schema.items())
         return (type(self), schema_hash, id(self.df), self.projection, self.predicate)
 
-    def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame:
+    @classmethod
+    def do_evaluate(
+        cls,
+        schema: Schema,
+        df: Any,
+        projection: tuple[str, ...] | None,
+        predicate: expr.NamedExpr | None,
+    ) -> DataFrame:
         """Evaluate and return a dataframe."""
-        pdf = pl.DataFrame._from_pydf(self.df)
-        if self.projection is not None:
-            pdf = pdf.select(self.projection)
+        pdf = pl.DataFrame._from_pydf(df)
+        if projection is not None:
+            pdf = pdf.select(projection)
         df = DataFrame.from_polars(pdf)
         assert all(
             c.obj.type() == dtype
-            for c, dtype in zip(df.columns, self.schema.values(), strict=True)
+            for c, dtype in zip(df.columns, schema.values(), strict=True)
         )
-        if self.predicate is not None:
-            (mask,) = broadcast(self.predicate.evaluate(df), target_length=df.num_rows)
+        if predicate is not None:
+            (mask,) = broadcast(predicate.evaluate(df), target_length=df.num_rows)
             return df.filter(mask)
         else:
             return df
@@ -595,14 +668,19 @@ def __init__(
         self.exprs = tuple(exprs)
         self.should_broadcast = should_broadcast
         self.children = (df,)
+        self._non_child_args = (self.exprs, should_broadcast)
 
-    def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame:
+    @classmethod
+    def do_evaluate(
+        cls,
+        exprs: tuple[expr.NamedExpr, ...],
+        should_broadcast: bool,  # noqa: FBT001
+        df: DataFrame,
+    ) -> DataFrame:
         """Evaluate and return a dataframe."""
-        (child,) = self.children
-        df = child.evaluate(cache=cache)
         # Handle any broadcasting
-        columns = [e.evaluate(df) for e in self.exprs]
-        if self.should_broadcast:
+        columns = [e.evaluate(df) for e in exprs]
+        if should_broadcast:
             columns = broadcast(*columns)
         return DataFrame(columns)
@@ -625,14 +703,14 @@ def __init__(
         self.schema = schema
         self.exprs = tuple(exprs)
         self.children = (df,)
+        self._non_child_args = (self.exprs,)
 
-    def evaluate(
-        self, *, cache: MutableMapping[int, DataFrame]
-    ) -> DataFrame:  # pragma: no cover; polars doesn't emit this node yet
+    @classmethod
+    def do_evaluate(
+        cls, exprs: tuple[expr.NamedExpr, ...], df: DataFrame
+    ) -> DataFrame:  # pragma: no cover; not exposed by polars yet
         """Evaluate and return a dataframe."""
-        (child,) = self.children
-        df = child.evaluate(cache=cache)
-        columns = broadcast(*(e.evaluate(df) for e in self.exprs))
+        columns = broadcast(*(e.evaluate(df) for e in exprs))
         assert all(column.obj.size() == 1 for column in columns)
         return DataFrame(columns)
@@ -681,6 +759,13 @@ def __init__(
         if any(GroupBy.check_agg(a.value) > 1 for a in self.agg_requests):
             raise NotImplementedError("Nested aggregations in groupby")
         self.agg_infos = [req.collect_agg(depth=0) for req in self.agg_requests]
+        self._non_child_args = (
+            self.keys,
+            self.agg_requests,
+            maintain_order,
+            options,
+            self.agg_infos,
+        )
 
     @staticmethod
     def check_agg(agg: expr.Expr) -> int:
@@ -710,13 +795,18 @@ def check_agg(agg: expr.Expr) -> int:
         else:
             raise NotImplementedError(f"No handler for {agg=}")
 
-    def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame:
+    @classmethod
+    def do_evaluate(
+        cls,
+        keys_in: Sequence[expr.NamedExpr],
+        agg_requests: Sequence[expr.NamedExpr],
+        maintain_order: bool,  # noqa: FBT001
+        options: Any,
+        agg_infos: Sequence[expr.AggInfo],
+        df: DataFrame,
+    ):
         """Evaluate and return a dataframe."""
-        (child,) = self.children
-        df = child.evaluate(cache=cache)
-        keys = broadcast(
-            *(k.evaluate(df) for k in self.keys), target_length=df.num_rows
-        )
+        keys = broadcast(*(k.evaluate(df) for k in keys_in), target_length=df.num_rows)
         sorted = (
             plc.types.Sorted.YES
             if all(k.is_sorted for k in keys)
@@ -732,7 +822,7 @@ def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame:
         # TODO: uniquify
         requests = []
         replacements: list[expr.Expr] = []
-        for info in self.agg_infos:
+        for info in agg_infos:
             for pre_eval, req, rep in info.requests:
                 if pre_eval is None:
                     # A count aggregation, doesn't touch the column,
@@ -754,12 +844,10 @@ def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame:
             for key, grouped_key in zip(keys, group_keys.columns(), strict=True)
         ]
         result_subs = DataFrame(raw_columns)
-        results = [
-            req.evaluate(result_subs, mapping=mapping) for req in self.agg_requests
-        ]
+        results = [req.evaluate(result_subs, mapping=mapping) for req in agg_requests]
         broadcasted = broadcast(*result_keys, *results)
         # Handle order preservation of groups
-        if self.maintain_order and not sorted:
+        if maintain_order and not sorted:
             # The order we want
             want = plc.stream_compaction.stable_distinct(
                 plc.Table([k.obj for k in keys]),
@@ -799,7 +887,7 @@ def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame:
                 ordered_table.columns(), broadcasted, strict=True
             )
         ]
-        return DataFrame(broadcasted).slice(self.options.slice)
+        return DataFrame(broadcasted).slice(options.slice)
 
 
 class Join(IR):
@@ -841,6 +929,7 @@ def __init__(
         self.right_on = tuple(right_on)
         self.options = options
         self.children = (left, right)
+        self._non_child_args = (self.left_on, self.right_on, self.options)
         if any(
             isinstance(e.value, expr.Literal)
             for e in itertools.chain(self.left_on, self.right_on)
@@ -886,8 +975,8 @@ def _joiners(
             )
         assert_never(how)
 
+    @staticmethod
     def _reorder_maps(
-        self,
         left_rows: int,
         lg: plc.Column,
         left_policy: plc.copying.OutOfBoundsPolicy,
@@ -939,10 +1028,23 @@ def _reorder_maps(
             [plc.types.NullOrder.AFTER, plc.types.NullOrder.AFTER],
         ).columns()
 
-    def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame:
+    @classmethod
+    def do_evaluate(
+        cls,
+        left_on_exprs: Sequence[expr.NamedExpr],
+        right_on_exprs: Sequence[expr.NamedExpr],
+        options: tuple[
+            Literal["inner", "left", "right", "full", "semi", "anti", "cross"],
+            bool,
+            tuple[int, int] | None,
+            str,
+            bool,
+        ],
+        left: DataFrame,
+        right: DataFrame,
+    ) -> DataFrame:
         """Evaluate and return a dataframe."""
-        left, right = (c.evaluate(cache=cache) for c in self.children)
-        how, join_nulls, zlice, suffix, coalesce = self.options
+        how, join_nulls, zlice, suffix, coalesce = options
         if how == "cross":
             # Separate implementation, since cross_join returns the
             # result, not the gather maps
@@ -966,14 +1068,14 @@ def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame:
             ]
             return DataFrame([*left_cols, *right_cols]).slice(zlice)
         # TODO: Waiting on clarity based on https://github.com/pola-rs/polars/issues/17184
-        left_on = DataFrame(broadcast(*(e.evaluate(left) for e in self.left_on)))
-        right_on = DataFrame(broadcast(*(e.evaluate(right) for e in self.right_on)))
+        left_on = DataFrame(broadcast(*(e.evaluate(left) for e in left_on_exprs)))
+        right_on = DataFrame(broadcast(*(e.evaluate(right) for e in right_on_exprs)))
         null_equality = (
             plc.types.NullEquality.EQUAL
             if join_nulls
             else plc.types.NullEquality.UNEQUAL
         )
-        join_fn, left_policy, right_policy = Join._joiners(how)
+        join_fn, left_policy, right_policy = cls._joiners(how)
         if right_policy is None:
             # Semi join
             lg = join_fn(left_on.table, right_on.table, null_equality)
@@ -987,7 +1089,7 @@ def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame:
             lg, rg = join_fn(left_on.table, right_on.table, null_equality)
             if how == "left" or how == "right":
                 # Order of left table is preserved
-                lg, rg = self._reorder_maps(
+                lg, rg = cls._reorder_maps(
                     left.num_rows, lg, left_policy, right.num_rows, rg, right_policy
                 )
             if coalesce and how == "inner":
@@ -1046,14 +1148,19 @@ def __init__(
         self.schema = schema
         self.columns = tuple(columns)
         self.should_broadcast = should_broadcast
+        self._non_child_args = (self.columns, self.should_broadcast)
         self.children = (df,)
 
-    def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame:
+    @classmethod
+    def do_evaluate(
+        cls,
+        exprs: Sequence[expr.NamedExpr],
+        should_broadcast: bool,  # noqa: FBT001
+        df: DataFrame,
+    ) -> DataFrame:
         """Evaluate and return a dataframe."""
-        (child,) = self.children
-        df = child.evaluate(cache=cache)
-        columns = [c.evaluate(df) for c in self.columns]
-        if self.should_broadcast:
+        columns = [c.evaluate(df) for c in exprs]
+        if should_broadcast:
             columns = broadcast(*columns, target_length=df.num_rows)
         else:
             # Polars ensures this is true, but let's make sure nothing
@@ -1063,7 +1170,7 @@ def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame:
             # table that might have mismatching column lengths will
             # never be turned into a pylibcudf Table with all columns
             # by the Select, which is why this is safe.
-            assert all(e.name.startswith("__POLARS_CSER_0x") for e in self.columns)
+            assert all(e.name.startswith("__POLARS_CSER_0x") for e in exprs)
         return df.with_columns(columns)
@@ -1096,6 +1203,7 @@ def __init__(
         self.subset = subset
         self.zlice = zlice
         self.stable = stable
+        self._non_child_args = (keep, subset, zlice, stable)
         self.children = (df,)
 
     _KEEP_MAP: ClassVar[dict[str, plc.stream_compaction.DuplicateKeepOption]] = {
@@ -1105,33 +1213,39 @@ def __init__(
         "any": plc.stream_compaction.DuplicateKeepOption.KEEP_ANY,
     }
 
-    def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame:
+    @classmethod
+    def do_evaluate(
+        cls,
+        keep: plc.stream_compaction.DuplicateKeepOption,
+        subset: frozenset[str] | None,
+        zlice: tuple[int, int] | None,
+        stable: bool,  # noqa: FBT001
+        df: DataFrame,
+    ):
         """Evaluate and return a dataframe."""
-        (child,) = self.children
-        df = child.evaluate(cache=cache)
-        if self.subset is None:
+        if subset is None:
            indices = list(range(df.num_columns))
            keys_sorted = all(c.is_sorted for c in df.column_map.values())
         else:
-            indices = [i for i, k in enumerate(df.column_names) if k in self.subset]
-            keys_sorted = all(df.column_map[name].is_sorted for name in self.subset)
+            indices = [i for i, k in enumerate(df.column_names) if k in subset]
+            keys_sorted = all(df.column_map[name].is_sorted for name in subset)
         if keys_sorted:
             table = plc.stream_compaction.unique(
                 df.table,
                 indices,
-                self.keep,
+                keep,
                 plc.types.NullEquality.EQUAL,
             )
         else:
             distinct = (
                 plc.stream_compaction.stable_distinct
-                if self.stable
+                if stable
                 else plc.stream_compaction.distinct
             )
             table = distinct(
                 df.table,
                 indices,
-                self.keep,
+                keep,
                 plc.types.NullEquality.EQUAL,
                 plc.types.NanEquality.ALL_EQUAL,
             )
@@ -1142,9 +1256,9 @@ def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame:
                 for new, old in zip(table.columns(), df.columns, strict=True)
             ]
         )
-        if keys_sorted or self.stable:
+        if keys_sorted or stable:
             result = result.sorted_like(df)
-        return result.slice(self.zlice)
+        return result.slice(zlice)
 
 
 class Sort(IR):
@@ -1179,29 +1293,39 @@ def __init__(
         self.null_order = tuple(null_order)
         self.stable = stable
         self.zlice = zlice
+        self._non_child_args = (
+            self.by,
+            self.order,
+            self.null_order,
+            self.stable,
+            self.zlice,
+        )
         self.children = (df,)
 
-    def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame:
+    @classmethod
+    def do_evaluate(
+        cls,
+        by: Sequence[expr.NamedExpr],
+        order: Sequence[plc.types.Order],
+        null_order: Sequence[plc.types.NullOrder],
+        stable: bool,  # noqa: FBT001
+        zlice: tuple[int, int] | None,
+        df: DataFrame,
+    ) -> DataFrame:
         """Evaluate and return a dataframe."""
-        (child,) = self.children
-        df = child.evaluate(cache=cache)
-        sort_keys = broadcast(
-            *(k.evaluate(df) for k in self.by), target_length=df.num_rows
-        )
+        sort_keys = broadcast(*(k.evaluate(df) for k in by), target_length=df.num_rows)
         # TODO: More robust identification here.
         keys_in_result = {
             k.name: i
             for i, k in enumerate(sort_keys)
             if k.name in df.column_map and k.obj is df.column_map[k.name].obj
         }
-        do_sort = (
-            plc.sorting.stable_sort_by_key if self.stable else plc.sorting.sort_by_key
-        )
+        do_sort = plc.sorting.stable_sort_by_key if stable else plc.sorting.sort_by_key
         table = do_sort(
             df.table,
             plc.Table([k.obj for k in sort_keys]),
-            list(self.order),
-            list(self.null_order),
+            list(order),
+            list(null_order),
         )
         columns: list[Column] = []
         for name, c in zip(df.column_map, table.columns(), strict=True):
@@ -1211,11 +1335,11 @@ def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame:
                 i = keys_in_result[name]
                 column = column.set_sorted(
                     is_sorted=plc.types.Sorted.YES,
-                    order=self.order[i],
-                    null_order=self.null_order[i],
+                    order=order[i],
+                    null_order=null_order[i],
                 )
             columns.append(column)
-        return DataFrame(columns).slice(self.zlice)
+        return DataFrame(columns).slice(zlice)
 
 
 class Slice(IR):
@@ -1232,13 +1356,13 @@ def __init__(self, schema: Schema, offset: int, length: int, df: IR):
         self.schema = schema
         self.offset = offset
         self.length = length
+        self._non_child_args = (offset, length)
         self.children = (df,)
 
-    def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame:
+    @classmethod
+    def do_evaluate(cls, offset: int, length: int, df: DataFrame) -> DataFrame:
         """Evaluate and return a dataframe."""
-        (child,) = self.children
-        df = child.evaluate(cache=cache)
-        return df.slice((self.offset, self.length))
+        return df.slice((offset, length))
 
 
 class Filter(IR):
@@ -1252,13 +1376,13 @@ def __init__(self, schema: Schema, mask: expr.NamedExpr, df: IR):
         self.schema = schema
         self.mask = mask
+        self._non_child_args = (mask,)
         self.children = (df,)
 
-    def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame:
+    @classmethod
+    def do_evaluate(cls, mask_expr: expr.NamedExpr, df: DataFrame) -> DataFrame:
         """Evaluate and return a dataframe."""
-        (child,) = self.children
-        df = child.evaluate(cache=cache)
-        (mask,) = broadcast(self.mask.evaluate(df), target_length=df.num_rows)
+        (mask,) = broadcast(mask_expr.evaluate(df), target_length=df.num_rows)
         return df.filter(mask)
 
 
class Projection(IR):
@@ -1270,15 +1394,15 @@ def __init__(self, schema: Schema, df: IR):
         self.schema = schema
+        self._non_child_args = (schema,)
         self.children = (df,)
 
-    def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame:
+    @classmethod
+    def do_evaluate(cls, schema: Schema, df: DataFrame) -> DataFrame:
         """Evaluate and return a dataframe."""
-        (child,) = self.children
-        df = child.evaluate(cache=cache)
         # This can reorder things.
         columns = broadcast(
-            *(df.column_map[name] for name in self.schema), target_length=df.num_rows
+            *(df.column_map[name] for name in schema), target_length=df.num_rows
         )
         return DataFrame(columns)
@@ -1341,33 +1465,41 @@ def __init__(self, schema: Schema, name: str, options: Any, df: IR):
                     "Unpivot cannot cast all input columns to "
                     f"{self.schema[value_name].id()}"
                 )
-            self.options = (tuple(indices), tuple(pivotees), variable_name, value_name)
+            self.options = (
+                tuple(indices),
+                tuple(pivotees),
+                (variable_name, schema[variable_name]),
+                (value_name, schema[value_name]),
+            )
+        self._non_child_args = (name, self.options)
 
-    def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame:
+    @classmethod
+    def do_evaluate(cls, name: str, options: Any, df: DataFrame) -> DataFrame:
         """Evaluate and return a dataframe."""
-        (child,) = self.children
-        if self.name == "rechunk":
+        if name == "rechunk":
             # No-op in our data model
             # Don't think this appears in a plan tree from python
-            return child.evaluate(cache=cache)  # pragma: no cover
-        elif self.name == "rename":
-            df = child.evaluate(cache=cache)
+            return df  # pragma: no cover
+        elif name == "rename":
             # final tag is "swapping" which is useful for the
             # optimiser (it blocks some pushdown operations)
-            old, new, _ = self.options
+            old, new, _ = options
             return df.rename_columns(dict(zip(old, new, strict=True)))
-        elif self.name == "explode":
-            df = child.evaluate(cache=cache)
-            ((to_explode,),) = self.options
+        elif name == "explode":
+            ((to_explode,),) = options
             index = df.column_names.index(to_explode)
             subset = df.column_names_set - {to_explode}
             return DataFrame.from_table(
                 plc.lists.explode_outer(df.table, index), df.column_names
             ).sorted_like(df, subset=subset)
-        elif self.name == "unpivot":
-            indices, pivotees, variable_name, value_name = self.options
+        elif name == "unpivot":
+            (
+                indices,
+                pivotees,
+                (variable_name, variable_dtype),
+                (value_name, value_dtype),
+            ) = options
             npiv = len(pivotees)
-            df = child.evaluate(cache=cache)
             index_columns = [
                 Column(col, name=name)
                 for col, name in zip(
@@ -1382,7 +1514,7 @@ def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame:
                 plc.interop.from_arrow(
                     pa.array(
                         pivotees,
-                        type=plc.interop.to_arrow(self.schema[variable_name]),
+                        type=plc.interop.to_arrow(variable_dtype),
                     ),
                 )
             ]
@@ -1390,10 +1522,7 @@ def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame:
                 df.num_rows,
             ).columns()
             value_column = plc.concatenate.concatenate(
-                [
-                    df.column_map[pivotee].astype(self.schema[value_name]).obj
-                    for pivotee in pivotees
-                ]
+                [df.column_map[pivotee].astype(value_dtype).obj for pivotee in pivotees]
             )
             return DataFrame(
                 [
@@ -1417,18 +1546,20 @@ def __init__(self, schema: Schema, zlice: tuple[int, int] | None, *children: IR):
         self.schema = schema
         self.zlice = zlice
+        self._non_child_args = (zlice,)
         self.children = children
         schema = self.children[0].schema
         if not all(s.schema == schema for s in self.children[1:]):
             raise NotImplementedError("Schema mismatch")
 
-    def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame:
+    @classmethod
+    def do_evaluate(cls, zlice: tuple[int, int] | None, *dfs: DataFrame) -> DataFrame:
         """Evaluate and return a dataframe."""
-        # TODO: only evaluate what we need if we have a slice
-        dfs = [df.evaluate(cache=cache) for df in self.children]
+        # TODO: only evaluate what we need if we have a slice?
         return DataFrame.from_table(
-            plc.concatenate.concatenate([df.table for df in dfs]), dfs[0].column_names
-        ).slice(self.zlice)
+            plc.concatenate.concatenate([df.table for df in dfs]),
+            dfs[0].column_names,
+        ).slice(zlice)
 
 
 class HConcat(IR):
@@ -1439,6 +1570,7 @@ class HConcat(IR):
 
     def __init__(self, schema: Schema, *children: IR):
         self.schema = schema
+        self._non_child_args = ()
         self.children = children
 
     @staticmethod
@@ -1469,18 +1601,22 @@ def _extend_with_nulls(table: plc.Table, *, nrows: int) -> plc.Table:
             ]
         )
 
-    def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame:
+    @classmethod
+    def do_evaluate(cls, *dfs: DataFrame) -> DataFrame:
         """Evaluate and return a dataframe."""
-        dfs = [df.evaluate(cache=cache) for df in self.children]
         max_rows = max(df.num_rows for df in dfs)
         # Horizontal concatenation extends shorter tables with nulls
-        dfs = [
-            df
-            if df.num_rows == max_rows
-            else DataFrame.from_table(
-                self._extend_with_nulls(df.table, nrows=max_rows - df.num_rows),
-                df.column_names,
+        return DataFrame(
+            itertools.chain.from_iterable(
+                df.columns
+                for df in (
+                    df
+                    if df.num_rows == max_rows
+                    else DataFrame.from_table(
+                        cls._extend_with_nulls(df.table, nrows=max_rows - df.num_rows),
+                        df.column_names,
+                    )
+                    for df in dfs
+                )
             )
-            for df in dfs
-        ]
-        return DataFrame(itertools.chain.from_iterable(df.columns for df in dfs))
+        )
diff --git a/python/cudf_polars/docs/overview.md b/python/cudf_polars/docs/overview.md
index 74b2cd4e5de..17a94c633f8 100644
--- a/python/cudf_polars/docs/overview.md
+++ b/python/cudf_polars/docs/overview.md
@@ -212,7 +212,11 @@ methods.
 
 Plan node definitions live in `cudf_polars/dsl/ir.py`, these all
 inherit from the base `IR` node. The evaluation of a plan node is done
-by implementing the `evaluate` method.
+by implementing the `do_evaluate` method. This method takes in
+the non-child arguments specified in `_non_child_args`, followed by
+pre-evaluated child nodes (`DataFrame` objects). To perform the
+evaluation, one should use the base class (generic) `evaluate` method
+which handles the recursive evaluation of child nodes.
 
 To translate the plan node, add a case handler in `translate_ir` that
 lives in `cudf_polars/dsl/translate.py`.
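To make the contract described in `overview.md` concrete, here is a minimal sketch of a plan node written against the new protocol. The `Head` node and its semantics are hypothetical, not part of this patch; `IR`, `Schema`, and `DataFrame` are the names used in `ir.py`:

```python
# Hypothetical node illustrating the new IR evaluation protocol:
# __init__ records the positional arguments for do_evaluate in
# _non_child_args; the inherited IR.evaluate recurses over
# self.children and then calls do_evaluate with the evaluated inputs.
class Head(IR):
    """Keep the first `n` rows of the child dataframe."""

    __slots__ = ("n",)
    _non_child = ("schema", "n")

    def __init__(self, schema: Schema, n: int, df: IR):
        self.schema = schema
        self.n = n
        # Passed (in order) to do_evaluate, ahead of the evaluated children.
        self._non_child_args = (n,)
        self.children = (df,)

    @classmethod
    def do_evaluate(cls, n: int, df: DataFrame) -> DataFrame:
        """Evaluate and return a dataframe."""
        # `df` arrives already evaluated; no cache or recursion handling here.
        return df.slice((0, n))
```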
diff --git a/python/cudf_polars/pyproject.toml b/python/cudf_polars/pyproject.toml
index a2c62ef9460..2e75dff5c9e 100644
--- a/python/cudf_polars/pyproject.toml
+++ b/python/cudf_polars/pyproject.toml
@@ -53,7 +53,9 @@ version = {file = "cudf_polars/VERSION"}
 addopts = "--tb=native --strict-config --strict-markers"
 empty_parameter_set_mark = "fail_at_collect"
 filterwarnings = [
-    "error"
+    "error",
+    # https://github.com/rapidsai/build-planning/issues/116
+    "ignore:.*cuda..* module is deprecated.*:DeprecationWarning",
 ]
 xfail_strict = true
diff --git a/python/custreamz/pyproject.toml b/python/custreamz/pyproject.toml
index a8ab05a3922..d3baf3bf4d2 100644
--- a/python/custreamz/pyproject.toml
+++ b/python/custreamz/pyproject.toml
@@ -85,6 +85,8 @@ addopts = "--tb=native --strict-config --strict-markers"
 empty_parameter_set_mark = "fail_at_collect"
 filterwarnings = [
     "error",
+    # https://github.com/rapidsai/build-planning/issues/116
+    "ignore:.*cuda..* module is deprecated.*:DeprecationWarning",
     "ignore:unclosed <socket

+    "libkvikio==24.12.*,>=0.0.0a0",
     "nvidia-nvcomp==4.1.0.6",
 ]  # This list was generated by `rapids-dependency-file-generator`. To make changes, edit ../../dependencies.yaml and run `rapids-dependency-file-generator`.
diff --git a/python/pylibcudf/pyproject.toml b/python/pylibcudf/pyproject.toml
index a80c85a1fa8..e8052dfba4c 100644
--- a/python/pylibcudf/pyproject.toml
+++ b/python/pylibcudf/pyproject.toml
@@ -18,7 +18,7 @@ authors = [
 license = { text = "Apache 2.0" }
 requires-python = ">=3.10"
 dependencies = [
-    "cuda-python>=11.7.1,<12.0a0",
+    "cuda-python>=11.7.1,<12.0a0,!=11.8.4",
     "libcudf==24.12.*,>=0.0.0a0",
     "nvtx>=0.2.1",
     "packaging",
@@ -74,6 +74,8 @@ addopts = "--tb=native --strict-config --strict-markers --import-mode=importlib"
 empty_parameter_set_mark = "fail_at_collect"
 filterwarnings = [
     "error",
+    # https://github.com/rapidsai/build-planning/issues/116
+    "ignore:.*cuda..* module is deprecated.*:DeprecationWarning",
     "ignore:::.*xdist.*",
     "ignore:::.*pytest.*"
 ]