From e1fe32136201edd8d8db5bd3936bfdad37b058c6 Mon Sep 17 00:00:00 2001
From: guillaume-dequenne-sonarsource
Date: Tue, 28 Jan 2025 14:12:21 +0000
Subject: [PATCH] SONARPY-2489 Create rule S7189 PySpark DataFrames used
 multiple times should be cached or persisted

---
 rules/S7189/metadata.json        |  2 +
 rules/S7189/python/metadata.json | 23 ++++++++
 rules/S7189/python/rule.adoc     | 94 ++++++++++++++++++++++++++++++++
 3 files changed, 119 insertions(+)
 create mode 100644 rules/S7189/metadata.json
 create mode 100644 rules/S7189/python/metadata.json
 create mode 100644 rules/S7189/python/rule.adoc

diff --git a/rules/S7189/metadata.json b/rules/S7189/metadata.json
new file mode 100644
index 00000000000..2c63c085104
--- /dev/null
+++ b/rules/S7189/metadata.json
@@ -0,0 +1,2 @@
+{
+}
diff --git a/rules/S7189/python/metadata.json b/rules/S7189/python/metadata.json
new file mode 100644
index 00000000000..f7b8f245219
--- /dev/null
+++ b/rules/S7189/python/metadata.json
@@ -0,0 +1,23 @@
+{
+  "title": "PySpark DataFrames used multiple times should be cached or persisted",
+  "type": "CODE_SMELL",
+  "status": "ready",
+  "remediation": {
+    "func": "Constant\/Issue",
+    "constantCost": "5min"
+  },
+  "tags": [
+  ],
+  "defaultSeverity": "Major",
+  "ruleSpecification": "RSPEC-7189",
+  "sqKey": "S7189",
+  "scope": "All",
+  "defaultQualityProfiles": ["Sonar way"],
+  "quickfix": "unknown",
+  "code": {
+    "impacts": {
+      "MAINTAINABILITY": "MEDIUM"
+    },
+    "attribute": "EFFICIENT"
+  }
+}
diff --git a/rules/S7189/python/rule.adoc b/rules/S7189/python/rule.adoc
new file mode 100644
index 00000000000..d08c3146d3a
--- /dev/null
+++ b/rules/S7189/python/rule.adoc
@@ -0,0 +1,94 @@
+This rule raises an issue when a PySpark DataFrame is used multiple times without being cached or persisted.
+
+== Why is this an issue?
+
+In Spark, transformations on DataFrames are lazy, meaning they are not executed until an action (such as `count` or `collect`) is called. If you perform multiple actions on the same `DataFrame` without caching or persisting it, Spark will recompute the entire lineage of transformations for each action. By caching or persisting the DataFrame, you store the result of the transformations, avoiding the need to recompute them each time.
+
+For this reason, DataFrames that are reused across multiple functions or operations should be cached with `.cache()` or persisted with `.persist()`. This practice helps prevent unnecessary recomputation, which can be resource-intensive and time-consuming. Caching also lets you leverage Spark's in-memory computation capabilities to enhance performance and reduces the need to read data from the original source repeatedly.
+
+If the DataFrame is too large to fit into memory, consider using `.persist()` with an appropriate storage level instead of `.cache()`.
+
+== How to fix it
+
+To fix this issue, make sure to either cache or persist DataFrames that are reused multiple times.
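+
+As a minimal sketch, a DataFrame that is too large for memory could be persisted with an explicit storage level instead of cached; the file name and the choice of `StorageLevel.MEMORY_AND_DISK` below are illustrative, one possible option among several:
+
+[source,python]
+----
+from pyspark import StorageLevel
+from pyspark.sql import SparkSession
+
+spark = SparkSession.builder.appName("Example").getOrCreate()
+
+# Persist with MEMORY_AND_DISK so partitions that do not fit in memory
+# are spilled to disk instead of being recomputed for every action.
+df = spark.read.csv("data.csv").persist(StorageLevel.MEMORY_AND_DISK)
+
+result1 = df.filter(df['value'] > 10).count()
+result2 = df.groupBy('category').count().collect()
+
+df.unpersist()  # release the stored data once the DataFrame is no longer needed
+----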
+
+=== Code examples
+
+==== Noncompliant code example
+
+[source,python,diff-id=1,diff-type=noncompliant]
+----
+from pyspark.sql import SparkSession
+
+spark = SparkSession.builder.appName("Example").getOrCreate()
+
+df = spark.read.csv("data.csv") # Noncompliant
+
+def transform_data_1(df):
+    # Some transformations
+    return df.filter(df['value'] > 10)
+
+def transform_data_2(df):
+    # Some other transformations
+    return df.groupBy('category').count()
+
+def transform_data_3(df):
+    # Some other transformations
+    return df.groupBy('customerID').count()
+
+result1 = transform_data_1(df)
+result2 = transform_data_2(df)
+result3 = transform_data_3(df)
+----
+
+==== Compliant solution
+
+[source,python,diff-id=1,diff-type=compliant]
+----
+from pyspark.sql import SparkSession
+
+spark = SparkSession.builder.appName("Example").getOrCreate()
+
+df = spark.read.csv("data.csv").cache() # Compliant
+
+def transform_data_1(df):
+    # Some transformations
+    return df.filter(df['value'] > 10)
+
+def transform_data_2(df):
+    # Some other transformations
+    return df.groupBy('category').count()
+
+def transform_data_3(df):
+    # Some other transformations
+    return df.groupBy('customerID').count()
+
+result1 = transform_data_1(df)
+result2 = transform_data_2(df)
+result3 = transform_data_3(df)
+----
+
+== Resources
+
+=== Documentation
+
+* Spark documentation - https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.cache.html[pyspark.sql.DataFrame.cache]
+
+=== Articles & blog posts
+
+* Spark By Examples - https://sparkbyexamples.com/pyspark/pyspark-cache-explained/[PySpark cache explained]
+
+ifdef::env-github,rspecator-view[]
+
+== Implementation Specification
+
+=== Message
+
+Consider caching or persisting this DataFrame.
+
+=== Highlighting
+
+The API call that reads the data and creates the initial DataFrame.
+
+=== Quickfix
+
+We can append a `.cache()` call to the DataFrame creation.
+Quick fix message: `Cache the DataFrame`.
+
+endif::env-github,rspecator-view[]