
Spark Scala UDF function always fails with error: External error: Java exception thrown at native-engine/datafusion-ext-exprs/src/spark_udf_wrapper.rs:94: java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.sql.catalyst.expressions.ScalaUDF.f of type scala.Function1 in instance of org.apache.spark.sql.catalyst.expressions.ScalaUDF #648

Open
frencopei opened this issue Nov 14, 2024 · 3 comments

@frencopei

Describe the bug
Spark version: 3.2.2
Scala version: 2.12.15
Blaze version: 4.0

YARN executor log:
2024-11-14 18:54:08,243 [ERROR] [task-result-getter-0] Task 0 in stage 0.0 failed 4 times; aborting job (org.apache.spark.scheduler.TaskSetManager(org.apache.spark.internal.Logging.logError:73))
2024-11-14 18:54:08,261 [ERROR] [Driver] User class threw exception: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 76) (10.16.7.17 executor 1): java.lang.RuntimeException: [partition=0] panics: Execution error: Execution error: output_with_sender[Shuffle] error: Execution error: output_with_sender[Project] error: Execution error: output_with_sender[Project]: output() returns error: External error: Java exception thrown at native-engine/datafusion-ext-exprs/src/spark_udf_wrapper.rs:94: java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.sql.catalyst.expressions.ScalaUDF.f of type scala.Function1 in instance of org.apache.spark.sql.catalyst.expressions.ScalaUDF

Driver stacktrace: (org.apache.spark.deploy.yarn.ApplicationMaster(org.apache.spark.internal.Logging.logError:94))
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 76) (10.16.7.17 executor 1): java.lang.RuntimeException: [partition=0] panics: Execution error: Execution error: output_with_sender[Shuffle] error: Execution error: output_with_sender[Project] error: Execution error: output_with_sender[Project]: output() returns error: External error: Java exception thrown at native-engine/datafusion-ext-exprs/src/spark_udf_wrapper.rs:94: java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.sql.catalyst.expressions.ScalaUDF.f of type scala.Function1 in instance of org.apache.spark.sql.catalyst.expressions.ScalaUDF

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2454)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2403)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2402)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2402)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1160)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1160)
at scala.Option.foreach(Option.scala:407)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1160)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2642)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2584)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2573)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
Caused by: java.lang.RuntimeException: [partition=0] panics: Execution error: Execution error: output_with_sender[Shuffle] error: Execution error: output_with_sender[Project] error: Execution error: output_with_sender[Project]: output() returns error: External error: Java exception thrown at native-engine/datafusion-ext-exprs/src/spark_udf_wrapper.rs:94: java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.sql.catalyst.expressions.ScalaUDF.f of type scala.Function1 in instance of org.apache.spark.sql.catalyst.expressions.ScalaUDF
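
For background on what this ClassCastException usually means: a Scala 2.12 function literal is serialized as a java.lang.invoke.SerializedLambda, and on deserialization the JVM resolves it back into a Function1 through a synthetic $deserializeLambda$ method on the capturing class. If the reading side cannot resolve that method (classpath or Scala version mismatch), the raw SerializedLambda ends up assigned to the Function1-typed field ScalaUDF.f, which is exactly the error above. A minimal round-trip sketch of the mechanism (names here are illustrative, not from the issue):

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

object LambdaSerDemo {
  def main(args: Array[String]): Unit = {
    // A Scala 2.12 function literal is serializable; this is how Spark
    // ships ScalaUDF.f from the driver to the executors.
    val f: String => Int = _.length

    val bos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bos)
    oos.writeObject(f) // on the wire it is a java.lang.invoke.SerializedLambda
    oos.close()

    // Reading it back succeeds only when this side can resolve the lambda
    // through the capturing class's $deserializeLambda$ method; when it
    // cannot (classpath / Scala version mismatch), the raw SerializedLambda
    // is assigned instead, producing the ClassCastException shown above.
    val ois = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray))
    val g = ois.readObject().asInstanceOf[String => Int]
    println(g("hello")) // 5
  }
}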

To Reproduce
1. Code:
import com.tcl.task.CommonMain
import com.tcl.task.bi.commom.CommonFilterCondition.BOOTSTRAP_FILTER_CONDITION
import com.tcl.task.bi.commom.CommonHiveUtils.{getHiveDataByMonth, getHiveDataByWeek}
import com.tcl.task.panoramicview.ClickHouseUtils
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions._
import org.apache.spark.sql.functions._

object MyExample extends CommonMain {

  val RESULT_TABLE = "ads_nps_stability_audio_exception_mi_cluster"
  val AUDIO_TABLE = "terminal_tv.ods_nps_stability_audio_dfx_exception_di"

  override def execute(spark: SparkSession, calcDate: String): Unit = {
    import spark.implicits._

    val firstDay = calcDate.substring(0, 7) + "-01"

    // read the Hive table
    val result = getHiveDataByMonth(spark, AUDIO_TABLE, calcDate)
      .filter(BOOTSTRAP_FILTER_CONDITION)
      .filter(" deviceType is not null and deviceType not in ('','000') ")
      .select("dnum", "deviceType", "tclOsVersion", "exceptionName", "halAlgResult", "underrunCnt", "repairAction",
        "repairSuccess", "trackPkgName")
      .withColumn("underloadLevel", valueRangeUDF(col("underrunCnt")))
      .withColumn("trackPkgName", expr(" case when trackPkgName is not null and trackPkgName not in ('','null') " +
        " then trackPkgName else 'unknown' end "))
      .withColumn("chip", split($"deviceType", "-")(2))
      .select("dnum", "chip", "deviceType", "tclOsVersion", "exceptionName", "halAlgResult", "underloadLevel", "repairAction",
        "repairSuccess", "trackPkgName")
      .groupBy("chip", "deviceType", "tclOsVersion", "exceptionName", "halAlgResult", "underloadLevel", "repairAction",
        "repairSuccess", "trackPkgName")
      .agg(countDistinct("dnum").alias("machineCount"),
        count("dnum").alias("timeCount"))
      .withColumn("recordDate", lit(firstDay))
      .select("chip", "deviceType", "tclOsVersion", "exceptionName", "halAlgResult", "underloadLevel", "repairAction",
        "repairSuccess", "trackPkgName", "machineCount", "timeCount", "recordDate")

    result.show()
    ClickHouseUtils.saveToDb(result, RESULT_TABLE, firstDay)
  }

  // return a range description for the given numeric string
  def valueRangeUDF: UserDefinedFunction = udf { (valueStr: String) =>
    val value = valueStr.toDouble
    value match {
      case v if v == 0 => "0"
      case v if v > 0 && v <= 100 => "(0-100]"
      case v if v > 100 && v <= 200 => "(100-200]"
      case v if v > 200 && v <= 300 => "(200-300]"
      case v if v > 300 && v <= 400 => "(300-400]"
      case v if v > 400 && v <= 500 => "(400-500]"
      case v if v > 500 && v <= 600 => "(500-600]"
      case v if v > 600 && v <= 700 => "(600-700]"
      case v if v > 700 && v <= 800 => "(700-800]"
      case v if v > 800 && v <= 900 => "(800-900]"
      case v if v > 900 && v <= 1000 => "(900-1000]"
      case v if v > 10000 && v <= 15000 => "(10000-15000]"
      case v if v > 15000 && v <= 20000 => "(15000-20000]"
      case v if v > 20000 && v <= 25000 => "(20000-25000]"
      case v if v > 25000 && v <= 30000 => "(25000-30000]"
      case v if v > 30000 => "greater than 30000"
      case _ => "Invalid Range" // note: values in (1000, 10000] fall through to here
    }
  }
}

2. After building the jar with Maven and running it with spark-submit, it always fails.
It seems Blaze cannot deserialize the UDF class.
/usr/local/bin/spark-submit --class com.tcl.task.MyExample --master yarn --jars .... --executor-memory 12G --executor-cores 6 --driver-memory 6g --conf spark.sql.broadcastTimeout=600 --deploy-mode cluster --conf spark.dynamicAllocation.enabled=true --conf spark.dynamicAllocation.minExecutors=4 --conf spark.dynamicAllocation.maxExecutors=8 --conf spark.yarn.executor.memoryOverhead=1024 cosn://dc-bucket/myapp/MyScalaExample.jar 2024-10-12 yarn

Exception:
(screenshot attached in the original issue)
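
A possible workaround, assuming the failure really is in lambda deserialization (this sketch is unverified against Blaze, and ValueRangeFn is a hypothetical name introduced here): back the UDF with a named Serializable class so that ScalaUDF.f is an ordinary object instead of a lambda, and no SerializedLambda is involved when the executor deserializes it.

import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.udf

// Hypothetical sketch: a concrete Function1 class is serialized with
// ordinary Java serialization, bypassing $deserializeLambda$ entirely.
class ValueRangeFn extends (String => String) with Serializable {
  override def apply(valueStr: String): String = valueStr.toDouble match {
    case v if v == 0 => "0"
    case v if v > 0 && v <= 100 => "(0-100]"
    // ... remaining buckets exactly as in valueRangeUDF above ...
    case v if v > 30000 => "greater than 30000"
    case _ => "Invalid Range"
  }
}

def valueRangeUDF: UserDefinedFunction = udf(new ValueRangeFn)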

@wfxxh

wfxxh commented Nov 19, 2024

I got the same error.
(screenshot of the error attached in the original issue)

My UDF is:
(screenshot of the UDF attached in the original issue)

@richox
Collaborator

richox commented Nov 27, 2024

Seems related to https://issues.apache.org/jira/browse/SPARK-29497.
Could you try recompiling the code containing the ScalaUDF with the same Scala minor version as Spark, and see if this issue still happens?
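
One quick way to compare the two versions (a sketch, not taken from the thread; run in spark-shell on the cluster, or log it from the job itself):

println(org.apache.spark.SPARK_VERSION)      // e.g. 3.2.2
println(scala.util.Properties.versionString) // e.g. "version 2.12.15"
// Both should match the versions the jar was compiled against; a Scala
// mismatch (e.g. a 2.11-built jar on a 2.12 cluster) breaks lambda
// deserialization as described in SPARK-29497.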

@frencopei
Author

Seems related to https://issues.apache.org/jira/browse/SPARK-29497. Could you try recompiling the code containing the ScalaUDF with the same Scala minor version as Spark, and see if this issue still happens?

I double-checked; it is the same minor version. Any other possibility?
