From 946c36c16af2e42071667a806e5e7b545b59a979 Mon Sep 17 00:00:00 2001 From: chaoqin-li1123 <55518381+chaoqin-li1123@users.noreply.github.com> Date: Fri, 26 Jan 2024 12:08:39 -0800 Subject: [PATCH] close reader in all case (#172) --- .../org/apache/spark/sql/pulsar/PulsarSourceRDD.scala | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/main/scala/org/apache/spark/sql/pulsar/PulsarSourceRDD.scala b/src/main/scala/org/apache/spark/sql/pulsar/PulsarSourceRDD.scala index 49461ec..9442ccc 100644 --- a/src/main/scala/org/apache/spark/sql/pulsar/PulsarSourceRDD.scala +++ b/src/main/scala/org/apache/spark/sql/pulsar/PulsarSourceRDD.scala @@ -70,7 +70,7 @@ private[pulsar] abstract class PulsarSourceRDDBase( .loadConf(readerConf) .create() - new NextIterator[InternalRow] { + val iter = new NextIterator[InternalRow] { private var inEnd: Boolean = false private var isLast: Boolean = false @@ -150,7 +150,6 @@ private[pulsar] abstract class PulsarSourceRDDBase( } catch { case e: PulsarClientException => logError(s"PulsarClient failed to read message from topic $topic", e) - close() throw e case e: Throwable => throw e @@ -161,6 +160,10 @@ private[pulsar] abstract class PulsarSourceRDDBase( reader.close() } } + context.addTaskCompletionListener[Unit] { _ => + iter.closeIfNeeded() + } + iter } }