Skip to content

Commit

Permalink
Release the reference to the calling Spark session upon successful co… (
Browse files Browse the repository at this point in the history
#4010)

…mpletion of NonFateSharingFuture

<!--
Thanks for sending a pull request!  Here are some tips for you:
1. If this is your first time, please read our contributor guidelines:
https://github.com/delta-io/delta/blob/master/CONTRIBUTING.md
2. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP]
Your PR title ...'.
  3. Be sure to keep the PR description updated to reflect all changes.
  4. Please write your PR title to summarize what this PR proposes.
5. If possible, provide a concise example to reproduce the issue for a
faster review.
6. If applicable, include the corresponding issue number in the PR title
and link it in the body.
-->

#### Which Delta project/connector is this regarding?
<!--
Please add the component selected below to the beginning of the pull
request title
For example: [Spark] Title of my pull request
-->

- [X] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

SparkSession object which initiated the Delta snapshot is kept alive for
the lifetime of the snapshot. This change releases the reference to
SparkSession once the future completes.

## How was this patch tested?

## Does this PR introduce _any_ user-facing changes?

No
  • Loading branch information
slava-min authored Jan 6, 2025
1 parent b6745bb commit d6d6b04
Showing 1 changed file with 4 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,10 @@ class NonFateSharingFuture[T](pool: DeltaThreadPool)(f: SparkSession => T)
// Prefer to get a prefetched result from the future, but never fail because of it.
val futureResult = futureOpt.flatMap { case (ownerSession, future) =>
try {
Some(ThreadUtils.awaitResult(future, timeout))
val result = Some(ThreadUtils.awaitResult(future, timeout))
// no reason to keep the reference to the calling session anymore
futureOpt = Some(null, future)
result
} catch {
// NOTE: ThreadUtils.awaitResult wraps all non-fatal exceptions other than TimeoutException
// with SparkException. Meanwhile, Java Future.get only throws four exceptions:
Expand Down

0 comments on commit d6d6b04

Please sign in to comment.