diff --git a/Dockerfile-native b/Dockerfile-native new file mode 100644 index 000000000000..2677193cf910 --- /dev/null +++ b/Dockerfile-native @@ -0,0 +1,55 @@ +ARG PRESTO_VERSION + +FROM ghcr.io/facebookincubator/velox-dev:circleci-avx as Builder + +WORKDIR /app +COPY . . +RUN cd presto-native-execution && \ + make velox-submodule && \ + source /opt/rh/gcc-toolset-9/enable && \ + ./scripts/setup-centos.sh && \ + EXTRA_CMAKE_FLAGS=" -DVELOX_ENABLE_INT64_BUILD_PARTITION_BOUND=ON -DPRESTO_ENABLE_TESTING=OFF" PRESTO_ENABLE_PARQUET=ON PRESTO_ENABLE_S3=ON MAX_HIGH_MEM_JOBS=8 MAX_LINK_JOBS=8 NUM_THREADS=8 TREAT_WARNINGS_AS_ERRORS=0 make release \ + || tail -n 500 _build/release/CMakeFiles/CMakeError.log + +RUN cd presto-native-execution && \ + mkdir -p prestissimo && \ + cp _build/release/presto_cpp/main/presto_server prestissimo && \ + mkdir -p prestissimo/runtime-libraries && \ + ldd _build/release/presto_cpp/main/presto_server | awk 'NF == 4 { system("cp " $3 " prestissimo/runtime-libraries/") }' && \ + tar cvf prestissimo.tar entrypoint.sh velox.properties prestissimo + +## + +FROM quay.io/centos/centos:stream8 + +ENV PRESTO_HOME="/opt/presto-server" + +RUN dnf update -y && dnf install -y epel-release +RUN dnf update -y && dnf --enablerepo=powertools install -y \ + awscli \ + gperf \ + iproute \ + lsof \ + procps \ + python3 \ + sysstat \ + tar \ + vim \ + wget \ + which \ + && \ + mkdir -p $PRESTO_HOME/etc/catalog && \ + mkdir -p /var/lib/presto/data + +WORKDIR /app +COPY --from=Builder /app/presto-native-execution/prestissimo.tar . +RUN tar xvf prestissimo.tar && \ + mkdir -p /opt/presto-server/etc && \ + mv prestissimo/presto_server /usr/local/bin/ && \ + mv velox.properties /opt/presto-server/etc/ && \ + mv entrypoint.sh /opt/ && \ + mv prestissimo/runtime-libraries/* /usr/local/lib/ && \ + touch /opt/presto-native-execution-${PRESTO_VERSION} + +ENTRYPOINT ["/opt/entrypoint.sh"] + diff --git a/Dockerfile-native-debug b/Dockerfile-native-debug new file mode 100644 index 000000000000..21ba1e6fefac --- /dev/null +++ b/Dockerfile-native-debug @@ -0,0 +1,53 @@ +ARG PRESTO_VERSION + +FROM ghcr.io/facebookincubator/velox-dev:circleci-avx as Builder + +WORKDIR /app +COPY . . +RUN cd presto-native-execution && \ + make velox-submodule && \ + source /opt/rh/gcc-toolset-9/enable && \ + ./scripts/setup-centos.sh && \ + EXTRA_CMAKE_FLAGS=" -DVELOX_ENABLE_INT64_BUILD_PARTITION_BOUND=ON -DPRESTO_ENABLE_TESTING=OFF" PRESTO_ENABLE_PARQUET=ON PRESTO_ENABLE_S3=ON MAX_HIGH_MEM_JOBS=8 MAX_LINK_JOBS=8 NUM_THREADS=8 TREAT_WARNINGS_AS_ERRORS=0 make debug \ + || tail -n 500 _build/debug/CMakeFiles/CMakeError.log + +RUN cd presto-native-execution && \ + mkdir -p prestissimo && \ + cp _build/debug/presto_cpp/main/presto_server prestissimo && \ + mkdir -p prestissimo/runtime-libraries && \ + ldd _build/debug/presto_cpp/main/presto_server | awk 'NF == 4 { system("cp " $3 " prestissimo/runtime-libraries/") }' && \ + tar cvf prestissimo.tar entrypoint_debug.sh velox.properties prestissimo + +ENV PRESTO_HOME="/opt/presto-server" + +RUN dnf update -y && dnf install -y epel-release + +RUN dnf update -y && dnf --enablerepo=powertools install -y \ + awscli \ + gperf \ + iproute \ + lsof \ + procps \ + python3 \ + sysstat \ + tar \ + vim \ + wget \ + which \ + lldb \ + openssh-server \ + openssh-clients \ + && \ + mkdir -p $PRESTO_HOME/etc/catalog && \ + mkdir -p /var/lib/presto/data + +RUN cd presto-native-execution && \ + tar xvf prestissimo.tar && \ + mkdir -p /opt/presto-server/etc && \ + mv prestissimo/presto_server /usr/local/bin/ && \ + mv velox.properties /opt/presto-server/etc/ && \ + mv entrypoint_debug.sh /opt/ && \ + mv prestissimo/runtime-libraries/* /usr/local/lib/ && \ + touch /opt/presto-native-execution-${PRESTO_VERSION} + +ENTRYPOINT ["/opt/entrypoint_debug.sh"] diff --git a/Jenkinsfile b/Jenkinsfile index 3f978a33b415..045093f3df97 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -94,6 +94,7 @@ pipeline { sh(script: "git show -s --format=%cd --date=format:'%Y%m%d%H%M%S'", returnStdout: true).trim() + "-" + env.PRESTO_COMMIT_SHA.substring(0, 7) env.DOCKER_IMAGE = env.AWS_ECR + "/${IMG_NAME}:${PRESTO_BUILD_VERSION}" + env.DOCKER_NATIVE_IMAGE = env.AWS_ECR + "/${IMG_NAME}-native:${PRESTO_BUILD_VERSION}" } sh 'printenv | sort' @@ -166,6 +167,22 @@ pipeline { } } + stage('Docker Native Build') { + steps { + echo "Building ${DOCKER_NATIVE_IMAGE}" + withCredentials([[ + $class: 'AmazonWebServicesCredentialsBinding', + credentialsId: "${AWS_CREDENTIAL_ID}", + accessKeyVariable: 'AWS_ACCESS_KEY_ID', + secretKeyVariable: 'AWS_SECRET_ACCESS_KEY']]) { + sh '''#!/bin/bash -ex + docker buildx build -f Dockerfile-native --load --platform "linux/amd64" -t "${DOCKER_NATIVE_IMAGE}" \ + --build-arg "PRESTO_VERSION=${PRESTO_VERSION}" . + ''' + } + } + } + stage('Publish Docker') { when { anyOf { @@ -187,6 +204,7 @@ pipeline { aws s3 ls ${AWS_S3_PREFIX}/${PRESTO_BUILD_VERSION}/ aws ecr-public get-login-password | docker login --username AWS --password-stdin ${AWS_ECR} PUBLISH=true REG_ORG=${AWS_ECR} IMAGE_NAME=${IMG_NAME} TAG=${PRESTO_BUILD_VERSION} ./build.sh ${PRESTO_VERSION} + docker push "${DOCKER_NATIVE_IMAGE}" ''' } } diff --git a/jenkins/agent-dind.yaml b/jenkins/agent-dind.yaml index 6dc3a13b2936..04514d5750a3 100644 --- a/jenkins/agent-dind.yaml +++ b/jenkins/agent-dind.yaml @@ -1,23 +1,23 @@ apiVersion: v1 kind: Pod metadata: - namespace: oss-agent - labels: - containers: dind + namespace: oss-agent + labels: + containers: dind spec: - nodeSelector: - eks.amazonaws.com/nodegroup: eks-oss-presto-dynamic-managed-ng - serviceAccountName: oss-agent - containers: - - name: dind - image: docker:20.10.16-dind-alpine3.15 - securityContext: - privileged: true - tty: true - resources: - requests: - memory: "4Gi" - cpu: "2000m" - limits: - memory: "4Gi" - cpu: "2000m" + nodeSelector: + eks.amazonaws.com/nodegroup: eks-oss-presto-dynamic-managed-ng + serviceAccountName: oss-agent + containers: + - name: dind + image: docker:20.10.16-dind-alpine3.15 + securityContext: + privileged: true + tty: true + resources: + requests: + memory: "24Gi" + cpu: "7000m" + limits: + memory: "24Gi" + cpu: "7000m" diff --git a/jenkins/agent-maven.yaml b/jenkins/agent-maven.yaml index 92651edaf2dd..720a8c74e575 100644 --- a/jenkins/agent-maven.yaml +++ b/jenkins/agent-maven.yaml @@ -1,26 +1,27 @@ apiVersion: v1 kind: Pod metadata: - namespace: oss-agent - labels: - containers: maven + namespace: oss-agent + labels: + containers: maven spec: - nodeSelector: - eks.amazonaws.com/nodegroup: eks-oss-presto-dynamic-managed-ng - serviceAccountName: oss-agent - containers: - - name: maven - image: maven:3.8.6-openjdk-8-slim - env: - - name: MAVEN_OPTS - value: "-Xmx8000m -Xms8000m" - resources: - requests: - memory: "10Gi" - cpu: "4000m" - limits: - memory: "10Gi" - cpu: "4000m" - tty: true - command: - - cat + nodeSelector: + eks.amazonaws.com/nodegroup: eks-oss-presto-dynamic-managed-ng + serviceAccountName: oss-agent + containers: + - name: maven + image: maven:3.8.6-openjdk-8-slim + env: + - name: MAVEN_OPTS + value: "-Xmx8000m -Xms8000m" + resources: + requests: + memory: "10Gi" + cpu: "4000m" + limits: + memory: "10Gi" + cpu: "4000m" + tty: true + command: + - cat + diff --git a/presto-docs/src/main/sphinx/release.rst b/presto-docs/src/main/sphinx/release.rst index 0c96a8fa4901..e3a058414757 100644 --- a/presto-docs/src/main/sphinx/release.rst +++ b/presto-docs/src/main/sphinx/release.rst @@ -5,6 +5,7 @@ Release Notes .. toctree:: :maxdepth: 1 + Release-0.286 [2024-02-12] Release-0.285.1 [2023-12-30] Release-0.285 [2023-12-08] Release-0.284 [2023-10-11] diff --git a/presto-docs/src/main/sphinx/release/release-0.286.rst b/presto-docs/src/main/sphinx/release/release-0.286.rst new file mode 100644 index 000000000000..55a352f9fa4b --- /dev/null +++ b/presto-docs/src/main/sphinx/release/release-0.286.rst @@ -0,0 +1,84 @@ +============= +Release 0.286 +============= + +**Highlights** +============== + +**Details** +=========== + +General Changes +_______________ +* Fix a bug for ``min_by`` and ``max_by`` for window functions, where results are incorrect when the function specifies number of elements to keep and the window does not have “unbounded following” in the frame. :pr:`21793` +* Fix a potential bug in ``EXCEPT`` and ``INTERSECT`` queries by not pruning unreferenced output for intersect and except nodes in the PruneUnreferencedOutputs rule. :pr:`21343` +* Fix a bug in PullUpExpressionInLambdaRules optimizer where expressions being extracted were of JoniRegexType or LikePatternType. :pr:`21407` +* Extend PullUpExpressionInLambdaRulesFix optimizers to now extract independent expressions from within conditional expressions. :pr:`21344` +* Fix an issue with the serialization of the retriable property in QueryError. :pr:`19741` +* Fix the compilation error of merge function for KHyperLogLog state and add a config property ``limit-khyperloglog-agg-group-number-enabled`` to control whether to limit the number of groups for aggregation functions using KHyperLogLog state. (enabled by default) :pr:`21824` +* Fix a bug in CTE materialization which was causing incorrect plan generation in some cases. :pr:`21580` +* Fix :doc:`/plugin/redis-hbo-provider` documentation to include the correct configuration properties and added documentation for coordinator HBO configurations. :pr:`21477` +* Fix writtenIntermediateBytes metric by ensuring its correct population through temporary table writers. :pr:`21626` +* Add support for adaptive partial aggregation which disables partial aggregation in cases where it could be inefficient. This feature is configurable by the session property ``adaptive_partial_aggregation`` (disabled by default) :pr:`20979` +* Improve predicate pushdown through Joins. :pr:`21353` +* Improve the readability of query plans by formatting numbers with commas for easier interpretation. :pr:`21486` +* Add additional linear regression functions like REGR_AVGX, REGR_AVGY, REGR_COUNT, REGR_R2, REGR_SXX, REGR_SXY, and REGR_SYY. :pr:`21630` +* Add UPDATE sql support in Presto. :pr:`21435` +* Add a session property ``skip_hash_generation_for_join_with_table_scan_input`` to skip hash precomputation for join when the input is table scan, and the hash is on a single big int and is not reused later. The property defaults to not enabled. :pr:`20948` +* Add a config property ``khyperloglog-agg-group-limit`` to limit the maximum number of groups that ``khyperloglog_agg`` function can have. The query will fail when the limit is exceeded. (The default is 0 which means no limit). :pr:`21510` +* Add a feature config property ``limit-khyperloglog-agg-group-number-enabled`` to control whether to limit the number of groups for aggregation functions using KHyperLogLog state. :pr:`21824` +* Add session property ``rewrite_expression_with_constant_expression`` which defaults to enabled. This optimizes queries which have an equivalence check filter or constant assignments. :pr:`19836` +* Add session property ``rewrite_left_join_array_contains_to_equi_join`` that transforms left joins with an ARRAY CONTAINS condition in the join criteria into an equi join. :pr:`21420` +* Add an option in the Presto client to disable redirects that fixes advisory `GHSA-xm7x-f3w2-4hjm `_. :pr:`21024` +* Improve prestodb/presto docker image by including ``config.properties`` and ``jvm.config`` files. :pr:`21384` +* Upgrade ``hadoop-apache2`` to ``2.7.4-12``. This fixes errors like ``library not found: /nativelib/Linux-aarch64/libhadoop.so`` when running presto on ARM64. :pr:`21483` +* Add validation in Presto client to ensure that the host and port of the next URI do not change during query execution in Presto, enhancing security by preventing redirection to untrusted sources. :pr:`21101` +* Add :doc:`Ecosystem ` documentation. :pr:`21698` +* Add ``cte_hash_partition_count`` session property to specify the number of buckets or writers to be used when using CTE Materialization. :pr:`21625` +* Add :doc:`/installation/deploy-helm` to Installation documentation. :pr:`21812` +* Remove redundant sort columns from query plans if a unique constraint can be identified for a prefix of the ordering list. :pr:`21371` +* Add changelog table ``$changelog`` that allows users to track when records were added or deleted in snapshots. :pr:`20937` +* Add `reservoir_sample <../functions/aggregate.html#reservoir_sample>`_ aggregation function which is useful for generating fixed-size samples. :pr:`21296` +* Remove ``exchange.async-page-transport-enabled`` configuration property as deprecated. :pr:`21772` +* Pass extra credentials such as CAT tokens for definer mode in views. :pr:`21685` +* Add function ``map_top_n_keys_by_value`` which returns top ``n`` keys of a map by value. :pr:`21259` +* Add support for materialization of Common Table Expressions (CTEs) in queries. The underlying connectors must support creating temporary tables, a functionality presently exclusive to the Hive connector. :pr:`20887` + +SPI Changes +___________ +* Add support for connectors to return joins in ``ConnectorPlanOptimizer.optimize``. :pr:`21605` + +Hive Changes +____________ +* Fix parquet dereference pushdown which was not working unless the ``parquet_use_column_names`` session property was set. :pr:`21647` +* Fix CTE materialization for unsupported Hive bucket types. :pr:`21549` +* Remove hive config ``hive.s3.use-instance-credentials`` as deprecated. :pr:`21648` + +Hudi Changes +____________ +* Upgrade Hudi version to 0.14.0. :pr:`21012` + +Iceberg Changes +_______________ +* Upgrade Apache Iceberg to 1.4.3. :pr:`21714` +* Add Iceberg Filter Pushdown Optimizer Rule for execution with Velox. :pr:`20501` +* Add ``iceberg.pushdown-filter-enabled`` config property to Iceberg Connector. This config property controls the behaviour of Filter Pushdown in the Iceberg connector. :pr:`20501` +* Add `register <../connector/iceberg.html#register-table>`_ and `unregister <../connector/iceberg.html#unregister-table>`_ procedures for Iceberg tables. :pr:`21335` +* Add session property ``iceberg.delete_as_join_rewrite_enabled`` (enabled by default) to apply equality deletes as a join. :pr:`21605` +* Add support for querying ``"$data_sequence_number"`` which returns the Iceberg data sequence number of the file containing the row. :pr:`21605` +* Add support for querying ``"$path"`` which returns the file path containing the row. :pr:`21605` +* Add support for reading v2 row level deletes in Iceberg connector. :pr:`21189` +* Add support for Day, Month, and Year transform function with partition column for date type in Presto Iceberg connector. :pr:`21303` +* Add support for Day, Month, and Year transform function with partition column for timestamp type in Presto Iceberg connector. :pr:`21303` +* Add support for Day, Month, Year, and Hour partition column transform functions when altering a table to add partition columns in Presto Iceberg connector. :pr:`21575` +* Optimize Table Metadata calls for Iceberg tables. :pr:`21629` +* Add support for `time travel <../connector/iceberg.html#time-travel-using-version-system-version-and-timestamp-system-time>`_, enabling the retrieval of historical data with the `AS OF` syntax. :pr:`21425` +* Add support for `time travel <../connector/iceberg.html#time-travel-using-version-system-version-and-timestamp-system-time>`_ ``TIMESTAMP (SYSTEM_TIME)`` syntax includes timestamp-with-time-zone data type. It will return data based on snapshot with matching timestamp or before. :pr:`21425` +* Add support for `time travel <../connector/iceberg.html#time-travel-using-version-system-version-and-timestamp-system-time>`_ ``VERSION (SYSTEM_VERSION)`` syntax includes snapshot id using bigint data type. :pr:`21425` +* Add manifest file caching support for Iceberg native catalogs. :pr:`21399` +* Fix Iceberg memory leak with ``DeleteFile``. :pr:`21612` + +**Credits** +=========== + +8dukongjian, AbhijitKulkarni1, Aditi Pandit, Ajay George, Ajay Gupte, Amit Dutta, Anant Aneja, Andrii Rosa, Anil Gupta Somisetty, Antoine Pultier, Arjun Gupta, Avinash Jain, Beinan, Bikramjeet Vig, Changli Liu, Christian Zentgraf, Chunxu Tang, Deepak Majeti, Diana Meehan, Facebook Community Bot, Ge Gao, Jalpreet Singh Nanda (:imjalpreet), Jason Fine, Jialiang Tan, Jimmy Lu, Jonathan Hehir, Junhao Liu, Ke, Kevin Wilfong, Krishna Pai, Linsong Wang, Luis Paolini, Lyublena Antova, Mahadevuni Naveen Kumar, Masha Basmanova, Matthew Peveler, Michael Shang, Nikhil Collooru, Nilay Pochhi, Patrick Stuedi, Paul Meng, Pedro Pedreira, Pramod, Pranjal Shankhdhar, Pratik Joseph Dabre, Reetika Agrawal, Richard Barnes, Rohit Jain, Sagar Sumit, Sergey Pershin, Sergii Druzkin, Shrinidhi Joshi, Steve Burnett, Sudheesh, Tai Le, Tim Meehan, TommyLemon, Vigneshwar Selvaraj, VishnuSanal, Vivek, Yihong Wang, Ying, Zac, Zac Blanco, Zhenxiao Luo, abhiseksaikia, feilong-liu, hainenber, jaystarshot, karteekmurthys, kedia,Akanksha, kiersten-stokes, mohsaka, pratyakshsharma, prithvip, renurajagop, rui-mo, shenhong, wangd, wypb, xiaoxmeng, xumingming diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveLogicalPlanner.java b/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveLogicalPlanner.java index e5eec903b3c8..5cb49263ec43 100644 --- a/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveLogicalPlanner.java +++ b/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveLogicalPlanner.java @@ -164,6 +164,50 @@ protected QueryRunner createQueryRunner() Optional.empty()); } + @Test + public void testMetadataQueryOptimizationWithLimit() + { + QueryRunner queryRunner = getQueryRunner(); + Session sessionWithOptimizeMetadataQueries = getSessionWithOptimizeMetadataQueries(); + Session defaultSession = queryRunner.getDefaultSession(); + try { + queryRunner.execute("CREATE TABLE test_metadata_query_optimization_with_limit(a varchar, b int, c int) WITH (partitioned_by = ARRAY['b', 'c'])"); + queryRunner.execute("INSERT INTO test_metadata_query_optimization_with_limit VALUES" + + " ('1001', 1, 1), ('1002', 1, 1), ('1003', 1, 1)," + + " ('1004', 1, 2), ('1005', 1, 2), ('1006', 1, 2)," + + " ('1007', 2, 1), ('1008', 2, 1), ('1009', 2, 1)"); + + // Could do metadata optimization when `limit` existing above `aggregation` + assertQuery(sessionWithOptimizeMetadataQueries, "select distinct b, c from test_metadata_query_optimization_with_limit order by c desc limit 3", + "values(1, 2), (1, 1), (2, 1)"); + assertPlan(sessionWithOptimizeMetadataQueries, "select distinct b, c from test_metadata_query_optimization_with_limit order by c desc limit 3", + anyTree(values(ImmutableList.of("b", "c"), + ImmutableList.of( + ImmutableList.of(new LongLiteral("1"), new LongLiteral("2")), + ImmutableList.of(new LongLiteral("1"), new LongLiteral("1")), + ImmutableList.of(new LongLiteral("2"), new LongLiteral("1")))))); + // Compare with default session which do not enable metadata optimization + assertQuery(defaultSession, "select distinct b, c from test_metadata_query_optimization_with_limit order by c desc limit 3", + "values(1, 2), (1, 1), (2, 1)"); + assertPlan(defaultSession, "select distinct b, c from test_metadata_query_optimization_with_limit order by c desc limit 3", + anyTree(strictTableScan("test_metadata_query_optimization_with_limit", identityMap("b", "c")))); + + // Should not do metadata optimization when `limit` existing below `aggregation` + assertQuery(sessionWithOptimizeMetadataQueries, "with tt as (select b, c from test_metadata_query_optimization_with_limit order by c desc limit 3) select b, min(c), max(c) from tt group by b", + "values(1, 2, 2)"); + assertPlan(sessionWithOptimizeMetadataQueries, "with tt as (select b, c from test_metadata_query_optimization_with_limit order by c desc limit 3) select b, min(c), max(c) from tt group by b", + anyTree(strictTableScan("test_metadata_query_optimization_with_limit", identityMap("b", "c")))); + // Compare with default session which do not enable metadata optimization + assertQuery(defaultSession, "with tt as (select b, c from test_metadata_query_optimization_with_limit order by c desc limit 3) select b, min(c), max(c) from tt group by b", + "values(1, 2, 2)"); + assertPlan(defaultSession, "with tt as (select b, c from test_metadata_query_optimization_with_limit order by c desc limit 3) select b, min(c), max(c) from tt group by b", + anyTree(strictTableScan("test_metadata_query_optimization_with_limit", identityMap("b", "c")))); + } + finally { + queryRunner.execute("DROP TABLE test_metadata_query_optimization_with_limit"); + } + } + @Test public void testRepeatedFilterPushdown() { @@ -1987,6 +2031,13 @@ private void assertParquetDereferencePushDown(Session session, String query, Str assertPlan(session, query, anyTree(tableScanParquetDeferencePushDowns(tableName, expectedDeferencePushDowns))); } + protected Session getSessionWithOptimizeMetadataQueries() + { + return Session.builder(getQueryRunner().getDefaultSession()) + .setSystemProperty(OPTIMIZE_METADATA_QUERIES, "true") + .build(); + } + private Session pushdownFilterEnabled() { return Session.builder(getQueryRunner().getDefaultSession()) diff --git a/presto-main/src/main/java/com/facebook/presto/operator/aggregation/DoubleRegressionAggregation.java b/presto-main/src/main/java/com/facebook/presto/operator/aggregation/DoubleRegressionAggregation.java index e42699f2a86c..24d1c6e61fcf 100644 --- a/presto-main/src/main/java/com/facebook/presto/operator/aggregation/DoubleRegressionAggregation.java +++ b/presto-main/src/main/java/com/facebook/presto/operator/aggregation/DoubleRegressionAggregation.java @@ -84,7 +84,8 @@ public static void regrIntercept(@AggregationState RegressionState state, BlockB public static void regrSxy(@AggregationState RegressionState state, BlockBuilder out) { double result = getRegressionSxy(state); - if (Double.isFinite(result)) { + double count = getRegressionCount(state); + if (Double.isFinite(result) && Double.isFinite(count) && count > 0) { DOUBLE.writeDouble(out, result); } else { @@ -97,7 +98,8 @@ public static void regrSxy(@AggregationState RegressionState state, BlockBuilder public static void regrSxx(@AggregationState RegressionState state, BlockBuilder out) { double result = getRegressionSxx(state); - if (Double.isFinite(result)) { + double count = getRegressionCount(state); + if (Double.isFinite(result) && Double.isFinite(count) && count > 0) { DOUBLE.writeDouble(out, result); } else { @@ -110,7 +112,8 @@ public static void regrSxx(@AggregationState RegressionState state, BlockBuilder public static void regrSyy(@AggregationState RegressionState state, BlockBuilder out) { double result = getRegressionSyy(state); - if (Double.isFinite(result)) { + double count = getRegressionCount(state); + if (Double.isFinite(result) && Double.isFinite(count) && count > 0) { DOUBLE.writeDouble(out, result); } else { @@ -136,7 +139,7 @@ public static void regrR2(@AggregationState RegressionState state, BlockBuilder public static void regrCount(@AggregationState RegressionState state, BlockBuilder out) { double result = getRegressionCount(state); - if (Double.isFinite(result)) { + if (Double.isFinite(result) && result > 0) { DOUBLE.writeDouble(out, result); } else { @@ -149,7 +152,8 @@ public static void regrCount(@AggregationState RegressionState state, BlockBuild public static void regrAvgy(@AggregationState RegressionState state, BlockBuilder out) { double result = getRegressionAvgy(state); - if (Double.isFinite(result)) { + double count = getRegressionCount(state); + if (Double.isFinite(result) && Double.isFinite(count) && count > 0) { DOUBLE.writeDouble(out, result); } else { @@ -162,7 +166,8 @@ public static void regrAvgy(@AggregationState RegressionState state, BlockBuilde public static void regrAvgx(@AggregationState RegressionState state, BlockBuilder out) { double result = getRegressionAvgx(state); - if (Double.isFinite(result)) { + double count = getRegressionCount(state); + if (Double.isFinite(result) && Double.isFinite(count) && count > 0) { DOUBLE.writeDouble(out, result); } else { diff --git a/presto-main/src/main/java/com/facebook/presto/operator/aggregation/RealRegressionAggregation.java b/presto-main/src/main/java/com/facebook/presto/operator/aggregation/RealRegressionAggregation.java index 7b2943f8fac2..1fe5d006da1a 100644 --- a/presto-main/src/main/java/com/facebook/presto/operator/aggregation/RealRegressionAggregation.java +++ b/presto-main/src/main/java/com/facebook/presto/operator/aggregation/RealRegressionAggregation.java @@ -84,7 +84,8 @@ public static void regrIntercept(@AggregationState RegressionState state, BlockB public static void regrSxy(@AggregationState RegressionState state, BlockBuilder out) { double result = getRegressionSxy(state); - if (Double.isFinite(result)) { + double count = getRegressionCount(state); + if (Double.isFinite(result) && Double.isFinite(count) && count > 0) { REAL.writeLong(out, floatToRawIntBits((float) result)); } else { @@ -97,7 +98,8 @@ public static void regrSxy(@AggregationState RegressionState state, BlockBuilder public static void regrSxx(@AggregationState RegressionState state, BlockBuilder out) { double result = getRegressionSxx(state); - if (Double.isFinite(result)) { + double count = getRegressionCount(state); + if (Double.isFinite(result) && Double.isFinite(count) && count > 0) { REAL.writeLong(out, floatToRawIntBits((float) result)); } else { @@ -110,7 +112,8 @@ public static void regrSxx(@AggregationState RegressionState state, BlockBuilder public static void regrSyy(@AggregationState RegressionState state, BlockBuilder out) { double result = getRegressionSyy(state); - if (Double.isFinite(result)) { + double count = getRegressionCount(state); + if (Double.isFinite(result) && Double.isFinite(count) && count > 0) { REAL.writeLong(out, floatToRawIntBits((float) result)); } else { @@ -136,7 +139,7 @@ public static void regrR2(@AggregationState RegressionState state, BlockBuilder public static void regrCount(@AggregationState RegressionState state, BlockBuilder out) { double result = getRegressionCount(state); - if (Double.isFinite(result)) { + if (Double.isFinite(result) && result > 0) { REAL.writeLong(out, floatToRawIntBits((float) result)); } else { @@ -149,7 +152,8 @@ public static void regrCount(@AggregationState RegressionState state, BlockBuild public static void regrAvgy(@AggregationState RegressionState state, BlockBuilder out) { double result = getRegressionAvgy(state); - if (Double.isFinite(result)) { + double count = getRegressionCount(state); + if (Double.isFinite(result) && Double.isFinite(count) && count > 0) { REAL.writeLong(out, floatToRawIntBits((float) result)); } else { @@ -162,7 +166,8 @@ public static void regrAvgy(@AggregationState RegressionState state, BlockBuilde public static void regrAvgx(@AggregationState RegressionState state, BlockBuilder out) { double result = getRegressionAvgx(state); - if (Double.isFinite(result)) { + double count = getRegressionCount(state); + if (Double.isFinite(result) && Double.isFinite(count) && count > 0) { REAL.writeLong(out, floatToRawIntBits((float) result)); } else { diff --git a/presto-main/src/main/java/com/facebook/presto/operator/scalar/sql/MapNormalizeFunction.java b/presto-main/src/main/java/com/facebook/presto/operator/scalar/sql/MapNormalizeFunction.java index 19159879ec0c..1618dac2c435 100644 --- a/presto-main/src/main/java/com/facebook/presto/operator/scalar/sql/MapNormalizeFunction.java +++ b/presto-main/src/main/java/com/facebook/presto/operator/scalar/sql/MapNormalizeFunction.java @@ -28,6 +28,8 @@ private MapNormalizeFunction() {} @SqlType("map") public static String arraySumDouble() { - return "RETURN transform_values(input, (k, v) -> (v / reduce(map_values(input), double '0.0', (s, x) -> (s + coalesce(x, double '0.0')), (s) -> s)))"; + return "RETURN " + + "transform(array[ROW(input, cast(array_sum(map_values(input)) as double))], " + + "x->transform_values(x[1], (k, v) -> (v / x[2])))[1]"; } } diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/MetadataQueryOptimizer.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/MetadataQueryOptimizer.java index d80f7a836921..d295658d59b8 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/MetadataQueryOptimizer.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/MetadataQueryOptimizer.java @@ -34,13 +34,11 @@ import com.facebook.presto.spi.plan.AggregationNode.Aggregation; import com.facebook.presto.spi.plan.Assignments; import com.facebook.presto.spi.plan.FilterNode; -import com.facebook.presto.spi.plan.LimitNode; import com.facebook.presto.spi.plan.MarkDistinctNode; import com.facebook.presto.spi.plan.PlanNode; import com.facebook.presto.spi.plan.PlanNodeIdAllocator; import com.facebook.presto.spi.plan.ProjectNode; import com.facebook.presto.spi.plan.TableScanNode; -import com.facebook.presto.spi.plan.TopNNode; import com.facebook.presto.spi.plan.ValuesNode; import com.facebook.presto.spi.relation.ConstantExpression; import com.facebook.presto.spi.relation.RowExpression; @@ -384,8 +382,6 @@ private static Optional findTableScan(PlanNode source, RowExpress // allow any chain of linear transformations if (source instanceof MarkDistinctNode || source instanceof FilterNode || - source instanceof LimitNode || - source instanceof TopNNode || source instanceof SortNode) { source = source.getSources().get(0); } diff --git a/presto-main/src/test/java/com/facebook/presto/operator/aggregation/TestDoubleRegrAvgxAggregation.java b/presto-main/src/test/java/com/facebook/presto/operator/aggregation/TestDoubleRegrAvgxAggregation.java index 22c39514c64c..a16feba9528c 100644 --- a/presto-main/src/test/java/com/facebook/presto/operator/aggregation/TestDoubleRegrAvgxAggregation.java +++ b/presto-main/src/test/java/com/facebook/presto/operator/aggregation/TestDoubleRegrAvgxAggregation.java @@ -29,7 +29,7 @@ protected String getFunctionName() public Object getExpectedValue(int start, int length) { if (length == 0) { - return 0.0; + return null; } double expected = 0.0; diff --git a/presto-main/src/test/java/com/facebook/presto/operator/aggregation/TestDoubleRegrAvgyAggregation.java b/presto-main/src/test/java/com/facebook/presto/operator/aggregation/TestDoubleRegrAvgyAggregation.java index 02446f7e716e..028385fc6050 100644 --- a/presto-main/src/test/java/com/facebook/presto/operator/aggregation/TestDoubleRegrAvgyAggregation.java +++ b/presto-main/src/test/java/com/facebook/presto/operator/aggregation/TestDoubleRegrAvgyAggregation.java @@ -29,7 +29,7 @@ protected String getFunctionName() public Object getExpectedValue(int start, int length) { if (length == 0) { - return 0.0; + return null; } double expected = 0.0; diff --git a/presto-main/src/test/java/com/facebook/presto/operator/aggregation/TestDoubleRegrCountAggregation.java b/presto-main/src/test/java/com/facebook/presto/operator/aggregation/TestDoubleRegrCountAggregation.java index 8a471cfd8c82..e7e253d92024 100644 --- a/presto-main/src/test/java/com/facebook/presto/operator/aggregation/TestDoubleRegrCountAggregation.java +++ b/presto-main/src/test/java/com/facebook/presto/operator/aggregation/TestDoubleRegrCountAggregation.java @@ -30,7 +30,10 @@ protected String getFunctionName() @Override public Object getExpectedValue(int start, int length) { - if (length <= 1) { + if (length == 0) { + return null; + } + else if (length == 1) { return (double) length; } else { diff --git a/presto-main/src/test/java/com/facebook/presto/operator/aggregation/TestDoubleRegrSxxAggregation.java b/presto-main/src/test/java/com/facebook/presto/operator/aggregation/TestDoubleRegrSxxAggregation.java index 7d07463e0603..ac185912c435 100644 --- a/presto-main/src/test/java/com/facebook/presto/operator/aggregation/TestDoubleRegrSxxAggregation.java +++ b/presto-main/src/test/java/com/facebook/presto/operator/aggregation/TestDoubleRegrSxxAggregation.java @@ -30,7 +30,10 @@ protected String getFunctionName() @Override public Object getExpectedValue(int start, int length) { - if (length <= 1) { + if (length == 0) { + return null; + } + else if (length == 1) { return (double) 0; } else { diff --git a/presto-main/src/test/java/com/facebook/presto/operator/aggregation/TestDoubleRegrSxyAggregation.java b/presto-main/src/test/java/com/facebook/presto/operator/aggregation/TestDoubleRegrSxyAggregation.java index d7b37ad9ac1a..2d83d8d65e4b 100644 --- a/presto-main/src/test/java/com/facebook/presto/operator/aggregation/TestDoubleRegrSxyAggregation.java +++ b/presto-main/src/test/java/com/facebook/presto/operator/aggregation/TestDoubleRegrSxyAggregation.java @@ -30,7 +30,10 @@ protected String getFunctionName() @Override public Object getExpectedValue(int start, int length) { - if (length <= 1) { + if (length == 0) { + return null; + } + else if (length == 1) { return (double) 0; } else { diff --git a/presto-main/src/test/java/com/facebook/presto/operator/aggregation/TestDoubleRegrSyyAggregation.java b/presto-main/src/test/java/com/facebook/presto/operator/aggregation/TestDoubleRegrSyyAggregation.java index 49de8dbe1d63..4fcfdf8f31b1 100644 --- a/presto-main/src/test/java/com/facebook/presto/operator/aggregation/TestDoubleRegrSyyAggregation.java +++ b/presto-main/src/test/java/com/facebook/presto/operator/aggregation/TestDoubleRegrSyyAggregation.java @@ -30,7 +30,10 @@ protected String getFunctionName() @Override public Object getExpectedValue(int start, int length) { - if (length <= 1) { + if (length == 0) { + return null; + } + else if (length <= 1) { return (double) 0; } else { diff --git a/presto-main/src/test/java/com/facebook/presto/operator/aggregation/TestRealRegrAvgxAggregation.java b/presto-main/src/test/java/com/facebook/presto/operator/aggregation/TestRealRegrAvgxAggregation.java index 8b2500874f1a..71eaf4228721 100644 --- a/presto-main/src/test/java/com/facebook/presto/operator/aggregation/TestRealRegrAvgxAggregation.java +++ b/presto-main/src/test/java/com/facebook/presto/operator/aggregation/TestRealRegrAvgxAggregation.java @@ -29,7 +29,7 @@ protected String getFunctionName() public Object getExpectedValue(int start, int length) { if (length == 0) { - return 0.0f; + return null; } float expected = 0.0f; diff --git a/presto-main/src/test/java/com/facebook/presto/operator/aggregation/TestRealRegrAvgyAggregation.java b/presto-main/src/test/java/com/facebook/presto/operator/aggregation/TestRealRegrAvgyAggregation.java index 6bb7e1ca203b..6582f6d95b25 100644 --- a/presto-main/src/test/java/com/facebook/presto/operator/aggregation/TestRealRegrAvgyAggregation.java +++ b/presto-main/src/test/java/com/facebook/presto/operator/aggregation/TestRealRegrAvgyAggregation.java @@ -29,7 +29,7 @@ protected String getFunctionName() public Object getExpectedValue(int start, int length) { if (length == 0) { - return 0.0f; + return null; } float expected = 0.0f; diff --git a/presto-main/src/test/java/com/facebook/presto/operator/aggregation/TestRealRegrCountAggregation.java b/presto-main/src/test/java/com/facebook/presto/operator/aggregation/TestRealRegrCountAggregation.java index cb4f9148d33e..eceb7bbb941b 100644 --- a/presto-main/src/test/java/com/facebook/presto/operator/aggregation/TestRealRegrCountAggregation.java +++ b/presto-main/src/test/java/com/facebook/presto/operator/aggregation/TestRealRegrCountAggregation.java @@ -30,7 +30,10 @@ protected String getFunctionName() @Override public Object getExpectedValue(int start, int length) { - if (length <= 1) { + if (length == 0) { + return null; + } + else if (length == 1) { return (float) length; } else { diff --git a/presto-main/src/test/java/com/facebook/presto/operator/aggregation/TestRealRegrSxxAggregation.java b/presto-main/src/test/java/com/facebook/presto/operator/aggregation/TestRealRegrSxxAggregation.java index 6bf77d3b114d..d72fbfba72e3 100644 --- a/presto-main/src/test/java/com/facebook/presto/operator/aggregation/TestRealRegrSxxAggregation.java +++ b/presto-main/src/test/java/com/facebook/presto/operator/aggregation/TestRealRegrSxxAggregation.java @@ -30,7 +30,10 @@ protected String getFunctionName() @Override public Object getExpectedValue(int start, int length) { - if (length <= 1) { + if (length == 0) { + return null; + } + else if (length == 1) { return (float) 0; } else { diff --git a/presto-main/src/test/java/com/facebook/presto/operator/aggregation/TestRealRegrSxyAggregation.java b/presto-main/src/test/java/com/facebook/presto/operator/aggregation/TestRealRegrSxyAggregation.java index ff4a8b94cab7..f83ec729c19f 100644 --- a/presto-main/src/test/java/com/facebook/presto/operator/aggregation/TestRealRegrSxyAggregation.java +++ b/presto-main/src/test/java/com/facebook/presto/operator/aggregation/TestRealRegrSxyAggregation.java @@ -30,7 +30,10 @@ protected String getFunctionName() @Override public Object getExpectedValue(int start, int length) { - if (length <= 1) { + if (length == 0) { + return null; + } + else if (length == 1) { return (float) 0; } else { diff --git a/presto-main/src/test/java/com/facebook/presto/operator/aggregation/TestRealRegrSyyAggregation.java b/presto-main/src/test/java/com/facebook/presto/operator/aggregation/TestRealRegrSyyAggregation.java index cefe9eb54741..115d0ced316a 100644 --- a/presto-main/src/test/java/com/facebook/presto/operator/aggregation/TestRealRegrSyyAggregation.java +++ b/presto-main/src/test/java/com/facebook/presto/operator/aggregation/TestRealRegrSyyAggregation.java @@ -30,7 +30,10 @@ protected String getFunctionName() @Override public Object getExpectedValue(int start, int length) { - if (length <= 1) { + if (length == 0) { + return null; + } + else if (length <= 1) { return (float) 0; } else { diff --git a/presto-main/src/test/java/com/facebook/presto/operator/scalar/sql/TestMapNormalize.java b/presto-main/src/test/java/com/facebook/presto/operator/scalar/sql/TestMapNormalize.java index 2057818b13b9..5e2a36cfdecf 100644 --- a/presto-main/src/test/java/com/facebook/presto/operator/scalar/sql/TestMapNormalize.java +++ b/presto-main/src/test/java/com/facebook/presto/operator/scalar/sql/TestMapNormalize.java @@ -42,6 +42,14 @@ public void testMapNormalize() "map_normalize(map(array['w', 'x', 'y', 'z'], array[1, 2, -3, -4]))", mapType(VARCHAR, DOUBLE), ImmutableMap.of("w", -0.25, "x", -0.5, "y", 0.75, "z", 1.0)); + assertFunction( + "map_normalize(map(array['1', '2'], array[1, -1]))", + mapType(VARCHAR, DOUBLE), + ImmutableMap.of("1", Double.POSITIVE_INFINITY, "2", Double.NEGATIVE_INFINITY)); + assertFunction( + "map_normalize(map(array['1', '2'], array[0, 0]))", + mapType(VARCHAR, DOUBLE), + ImmutableMap.of("1", Double.NaN, "2", Double.NaN)); Map expectedWithNull = new HashMap<>(); expectedWithNull.put("w", 1.0 / 7); @@ -56,5 +64,10 @@ public void testMapNormalize() assertFunction("map_normalize(map(array[], array[]))", mapType(VARCHAR, DOUBLE), ImmutableMap.of()); assertFunction("map_normalize(null)", mapType(VARCHAR, DOUBLE), null); + + Map expectedNullAndNan = new HashMap<>(); + expectedNullAndNan.put("1", null); + expectedNullAndNan.put("2", Double.NaN); + assertFunction("map_normalize(map(array['1', '2'], array[null, 0]))", mapType(VARCHAR, DOUBLE), expectedNullAndNan); } } diff --git a/presto-native-execution/entrypoint.sh b/presto-native-execution/entrypoint.sh new file mode 100755 index 000000000000..33749068064f --- /dev/null +++ b/presto-native-execution/entrypoint.sh @@ -0,0 +1,18 @@ +#!/bin/sh +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +echo "node.id=$HOSTNAME" >> /opt/presto-server/etc/node.properties + +GLOG_logtostderr=1 presto_server \ + --etc-dir=/opt/presto-server/etc \ + 2>&1 | tee /var/log/presto-server/console.log diff --git a/presto-native-execution/entrypoint_debug.sh b/presto-native-execution/entrypoint_debug.sh new file mode 100755 index 000000000000..872def24f80e --- /dev/null +++ b/presto-native-execution/entrypoint_debug.sh @@ -0,0 +1,21 @@ +#!/bin/sh +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +echo "node.id=$HOSTNAME" >> /opt/presto-server/etc/node.properties + +cp ~/.ssh/authorized_keys2 ~/.ssh/authorized_keys +/usr/sbin/sshd + +GLOG_logtostderr=1 presto_server \ + --etc-dir=/opt/presto-server/etc \ + 2>&1 | tee /var/log/presto-server/console.log diff --git a/presto-native-execution/presto_cpp/main/PrestoMain.cpp b/presto-native-execution/presto_cpp/main/PrestoMain.cpp index 9e74b5b1dacf..1d70c4936b71 100644 --- a/presto-native-execution/presto_cpp/main/PrestoMain.cpp +++ b/presto-native-execution/presto_cpp/main/PrestoMain.cpp @@ -30,8 +30,3 @@ int main(int argc, char* argv[]) { presto.run(); PRESTO_SHUTDOWN_LOG(INFO) << "Exiting main()"; } - -// Initialize singleton for the reporter. -folly::Singleton reporter([]() { - return new facebook::velox::DummyStatsReporter(); -}); diff --git a/presto-native-execution/presto_cpp/main/PrestoServer.cpp b/presto-native-execution/presto_cpp/main/PrestoServer.cpp index 7e38829c905f..ba6b7efd71a3 100644 --- a/presto-native-execution/presto_cpp/main/PrestoServer.cpp +++ b/presto-native-execution/presto_cpp/main/PrestoServer.cpp @@ -24,6 +24,7 @@ #include "presto_cpp/main/TaskResource.h" #include "presto_cpp/main/common/ConfigReader.h" #include "presto_cpp/main/common/Counters.h" +#include "presto_cpp/main/common/StatsReporterImpl.h" #include "presto_cpp/main/common/Utils.h" #include "presto_cpp/main/http/filters/AccessLogFilter.h" #include "presto_cpp/main/http/filters/HttpEndpointLatencyFilter.h" @@ -227,6 +228,10 @@ void PrestoServer::run() { exit(EXIT_FAILURE); } + if (systemConfig->enableRuntimeMetricsCollection()) { + // This flag must be set to register the counters. + facebook::velox::BaseStatsReporter::registered = true; + } registerStatsCounters(); registerFileSinks(); registerFileSystems(); @@ -328,6 +333,14 @@ void PrestoServer::run() { proxygen::ResponseHandler* downstream) { server->reportServerInfo(downstream); }); + httpServer_->registerGet( + "/v1/info/health/metrics", + [server = this]( + proxygen::HTTPMessage* /*message*/, + const std::vector>& /*body*/, + proxygen::ResponseHandler* downstream) { + server->reportHealthMetrics(downstream); + }); httpServer_->registerGet( "/v1/info/state", [server = this]( @@ -1149,6 +1162,17 @@ void PrestoServer::reportServerInfo(proxygen::ResponseHandler* downstream) { http::sendOkResponse(downstream, json(serverInfo)); } +void PrestoServer::reportHealthMetrics(proxygen::ResponseHandler* downstream) { + auto reporter = std::dynamic_pointer_cast( + folly::Singleton::try_get()); + auto nodeConfig = facebook::presto::NodeConfig::instance(); + std::string cluster = nodeConfig->nodeEnvironment(); + char* hostName = std::getenv("HOSTNAME"); + std::string worker = !hostName ? "" : hostName; + prometheus::PrometheusSerializer serializer( + prometheus::Labels{{"cluster", cluster}, {"worker", worker}}); + http::sendOkResponse(downstream, reporter->getMetrics(serializer)); +} void PrestoServer::reportNodeStatus(proxygen::ResponseHandler* downstream) { http::sendOkResponse(downstream, json(fetchNodeStatus())); } diff --git a/presto-native-execution/presto_cpp/main/PrestoServer.h b/presto-native-execution/presto_cpp/main/PrestoServer.h index 55e654d7e64f..38da80ea0fb8 100644 --- a/presto-native-execution/presto_cpp/main/PrestoServer.h +++ b/presto-native-execution/presto_cpp/main/PrestoServer.h @@ -193,6 +193,8 @@ class PrestoServer { void reportNodeStatus(proxygen::ResponseHandler* downstream); + void reportHealthMetrics(proxygen::ResponseHandler* downstream); + protocol::NodeStatus fetchNodeStatus(); void populateMemAndCPUInfo(); diff --git a/presto-native-execution/presto_cpp/main/common/CMakeLists.txt b/presto-native-execution/presto_cpp/main/common/CMakeLists.txt index 153883e23d53..af616052ab84 100644 --- a/presto-native-execution/presto_cpp/main/common/CMakeLists.txt +++ b/presto-native-execution/presto_cpp/main/common/CMakeLists.txt @@ -11,7 +11,8 @@ # limitations under the License. add_library(presto_exception Exception.cpp) -add_library(presto_common Counters.cpp Utils.cpp ConfigReader.cpp Configs.cpp) +add_library(presto_common Counters.cpp Utils.cpp ConfigReader.cpp Configs.cpp + StatsReporterImpl.cpp) target_link_libraries(presto_exception velox_exception) set_property(TARGET presto_exception PROPERTY JOB_POOL_LINK diff --git a/presto-native-execution/presto_cpp/main/common/Configs.h b/presto-native-execution/presto_cpp/main/common/Configs.h index 20ec529c9943..f858058f6f33 100644 --- a/presto-native-execution/presto_cpp/main/common/Configs.h +++ b/presto-native-execution/presto_cpp/main/common/Configs.h @@ -319,6 +319,10 @@ class SystemConfig : public ConfigBase { /// cache entries. static constexpr std::string_view kCacheVeloxTtlCheckInterval{ "cache.velox.ttl-check-interval"}; + + static constexpr std::string_view kUseMmapArena{"use-mmap-arena"}; + static constexpr std::string_view kMmapArenaCapacityRatio{ + "mmap-arena-capacity-ratio"}; static constexpr std::string_view kUseMmapAllocator{"use-mmap-allocator"}; static constexpr std::string_view kEnableRuntimeMetricsCollection{ diff --git a/presto-native-execution/presto_cpp/main/common/StatsReporterImpl.cpp b/presto-native-execution/presto_cpp/main/common/StatsReporterImpl.cpp new file mode 100644 index 000000000000..f87b4f233559 --- /dev/null +++ b/presto-native-execution/presto_cpp/main/common/StatsReporterImpl.cpp @@ -0,0 +1,69 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "StatsReporterImpl.h" + +namespace facebook::presto { + +void StatsReporterImpl::registerMetricExportType( + folly::StringPiece key, + facebook::velox::StatType statType) const { + registerMetricExportType(key.start(), statType); +} + +void StatsReporterImpl::registerMetricExportType( + const char* key, + facebook::velox::StatType statType) const { + std::lock_guard lock(mutex_); + registeredStats_.emplace(key, statType); + metricsMap_.emplace(key, 0); +} + +void StatsReporterImpl::addMetricValue(const char* key, size_t value) const { + std::lock_guard lock(mutex_); + auto it = registeredStats_.find(key); + if (it == registeredStats_.end()) { + VLOG(1) << "addMetricValue() for unregistered stat " << key; + return; + } + if (it->second == facebook::velox::StatType::COUNT) { + // increment the counter. + metricsMap_[key] += value; + return; + } + // Gauge type metric value must be reset. + metricsMap_[key] = value; +} + +void StatsReporterImpl::addMetricValue(const std::string& key, size_t value) + const { + addMetricValue(key.c_str(), value); +} + +void StatsReporterImpl::addMetricValue(folly::StringPiece key, size_t value) + const { + addMetricValue(key.start(), value); +} + +const std::string StatsReporterImpl::getMetrics( + const MetricsSerializer& serializer) { + std::lock_guard lock(mutex_); + return serializer.serialize(registeredStats_, metricsMap_); +} + +// Initialize singleton for the reporter +folly::Singleton reporter([]() { + return new StatsReporterImpl(); +}); +} // namespace facebook::presto diff --git a/presto-native-execution/presto_cpp/main/common/StatsReporterImpl.h b/presto-native-execution/presto_cpp/main/common/StatsReporterImpl.h new file mode 100644 index 000000000000..c864fd38816e --- /dev/null +++ b/presto-native-execution/presto_cpp/main/common/StatsReporterImpl.h @@ -0,0 +1,174 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include "presto_cpp/main/common/Configs.h" +#include "presto_cpp/main/common/Counters.h" +#include "velox/common/base/StatsReporter.h" +#include "velox/velox/common/base/Exceptions.h" + +namespace facebook::presto { + +class MetricsSerializer { + public: + virtual std::string serialize( + const std::unordered_map& + metricStatTypes, + const std::unordered_map& metricValues) const = 0; +}; + +namespace prometheus { +using Labels = std::unordered_map; +class PrometheusSerializer : public MetricsSerializer { + public: + explicit PrometheusSerializer(const Labels& labels) : labels_(labels) {} + + std::string serialize( + const std::unordered_map& + metricStatTypes, + const std::unordered_map& metricValues) const { + std::stringstream ss; + for (const auto metric : metricValues) { + auto metricName = metric.first; + std::replace(metricName.begin(), metricName.end(), '.', '_'); + auto statType = metricStatTypes.find(metric.first)->second; + ss << "# HELP " << metricName << std::endl; + std::string statTypeStr = "gauge"; + if (statType == facebook::velox::StatType::COUNT) { + statTypeStr = "counter"; + } + ss << "# TYPE " << metricName << " " << statTypeStr << std::endl; + int i = 0; + ss << metricName << "{"; + for (auto& label : labels_) { + ss << label.first << "=\"" << label.second << "\""; + if (i < labels_.size() - 1) { + // Comma separated labels. + ss << ","; + } + ++i; + } + ss << "} " << metric.second << std::endl; + } + return ss.str(); + } + + private: + // A map of labels assigned to each metric which helps in filtering at client + // end. + const Labels labels_; +}; +} // namespace prometheus. + +/// An implementation of BaseStatsReporter which gathers runtime metrics and +/// and maintains them in-memory. Users can call +/// StatsReporterImpl::getMetrics(MetricSerializer) to get metrics in custom +/// formatted string. +class StatsReporterImpl : public facebook::velox::BaseStatsReporter { + public: + StatsReporterImpl( + const std::string cluster = "", + const std::string worker = "") { + if (cluster.empty()) { + auto nodeConfig = facebook::presto::NodeConfig::instance(); + cluster_ = nodeConfig->nodeEnvironment(); + } else { + cluster_ = cluster; + } + char* hostName = std::getenv("HOSTNAME"); + workerPod_ = !hostName ? worker : hostName; + } + + /// Register a stat of the given stat type. + /// @param key The key to identify the stat. + /// @param statType How the stat is aggregated. + void registerMetricExportType( + const char* key, + facebook::velox::StatType statType) const override; + + void registerMetricExportType( + folly::StringPiece key, + facebook::velox::StatType statType) const override; + + void registerHistogramMetricExportType( + const char* /*key*/, + int64_t /* bucketWidth */, + int64_t /* min */, + int64_t /* max */, + const std::vector& /* pcts */) const override {} + + void registerHistogramMetricExportType( + folly::StringPiece /* key */, + int64_t /* bucketWidth */, + int64_t /* min */, + int64_t /* max */, + const std::vector& /* pcts */) const override {} + + void addMetricValue(const std::string& key, size_t value = 1) const override; + + void addMetricValue(const char* key, size_t value = 1) const override; + + void addMetricValue(folly::StringPiece key, size_t value = 1) const override; + + virtual void addHistogramMetricValue(const std::string& key, size_t value) + const override {} + + virtual void addHistogramMetricValue(const char* key, size_t value) + const override {} + + virtual void addHistogramMetricValue(folly::StringPiece key, size_t value) + const override {} + + const facebook::velox::StatType getRegisteredStatType( + const std::string& metricName) { + std::lock_guard lock(mutex_); + return registeredStats_[metricName]; + } + + /* + * Serializes the metrics collected so far in the format suitable for + * back filling Prometheus server. + * + * Given a metric name and a set of labels, time series are frequently + * identified using this notation: + * + * {