@@ -507,10 +507,10 @@ public void notifyCheckpointStart(SubtaskKey subtaskKey, long checkpointId) {
507507 managedSharedStateDirHandles .computeIfPresent (
508508 subtaskKey ,
509509 (k , v ) -> {
510- v .increaseRefCountWhenCheckpointStart (checkpointId );
510+ v .addReferenceWhenCheckpointStart (checkpointId );
511511 return v ;
512512 });
513- managedExclusiveStateDirHandle .increaseRefCountWhenCheckpointStart (checkpointId );
513+ managedExclusiveStateDirHandle .addReferenceWhenCheckpointStart (checkpointId );
514514 }
515515 }
516516
@@ -534,10 +534,10 @@ public void notifyCheckpointAborted(SubtaskKey subtaskKey, long checkpointId) th
534534 managedSharedStateDirHandles .computeIfPresent (
535535 subtaskKey ,
536536 (k , v ) -> {
537- v .decreaseRefCountWhenCheckpointAbort (checkpointId );
537+ v .removeReferenceWhenCheckpointAbort (checkpointId );
538538 return v ;
539539 });
540- managedExclusiveStateDirHandle .decreaseRefCountWhenCheckpointAbort (checkpointId );
540+ managedExclusiveStateDirHandle .removeReferenceWhenCheckpointAbort (checkpointId );
541541 }
542542
543543 synchronized (lock ) {
@@ -948,20 +948,22 @@ boolean isCheckpointDiscard(long checkpointId) {
948948 }
949949
950950 /**
951- * This class wrap DirectoryStreamStateHandle with reference count by ongoing checkpoint. If an
952- * ongoing checkpoint which reference the directory handle complete, we will stop tracking the
953- * handle, because the ownership of the handle is handover to JobManager.
951+ * This class wrap DirectoryStreamStateHandle with reference by ongoing checkpoint. If an
952+ * ongoing checkpoint which reference the directory handle complete or be subsumed, we will stop
953+ * tracking the handle, because the ownership of the handle is handover to JobManager.
954+ * JobManager acknowledges the handle and will clean up the directory when it is no longer
955+ * needed.
954956 */
955957 protected static class DirectoryHandleWithReferenceTrack {
956958
957959 private final DirectoryStreamStateHandle directoryHandle ;
958- // reference count by ongoing checkpoint
959- private final AtomicLong ongoingRefCount ;
960+ // reference by ongoing checkpoint
961+ private final Set < Long > refCheckpointIds ;
960962 private boolean tracking ;
961963
962964 DirectoryHandleWithReferenceTrack (DirectoryStreamStateHandle directoryHandle , boolean own ) {
963965 this .directoryHandle = directoryHandle ;
964- this .ongoingRefCount = new AtomicLong ( 0 );
966+ this .refCheckpointIds = new HashSet <>( );
965967 this .tracking = own ;
966968 }
967969
@@ -974,23 +976,23 @@ DirectoryStreamStateHandle getHandle() {
974976 return directoryHandle ;
975977 }
976978
977- void increaseRefCountWhenCheckpointStart (long checkpointId ) {
979+ void addReferenceWhenCheckpointStart (long checkpointId ) {
978980 if (tracking ) {
979981 LOG .debug (
980- "checkpoint:{} start, increase ref-count to file-merging managed shared dir : {}" ,
982+ "checkpoint:{} start, add reference to file-merging managed shared dir : {}" ,
981983 checkpointId ,
982984 directoryHandle .getDirectory ());
983- ongoingRefCount . incrementAndGet ( );
985+ refCheckpointIds . add ( checkpointId );
984986 }
985987 }
986988
987- void decreaseRefCountWhenCheckpointAbort (long checkpointId ) {
989+ void removeReferenceWhenCheckpointAbort (long checkpointId ) {
988990 if (tracking ) {
989991 LOG .debug (
990- "checkpoint:{} aborted, decrease ref-count to file-merging managed shared dir : {}" ,
992+ "checkpoint:{} aborted, remove reference to file-merging managed shared dir : {}" ,
991993 checkpointId ,
992994 directoryHandle .getDirectory ());
993- ongoingRefCount . decrementAndGet ( );
995+ refCheckpointIds . remove ( checkpointId );
994996 }
995997 }
996998
@@ -1001,6 +1003,7 @@ void handoverOwnershipWhenCheckpointComplete(long checkpointId) {
10011003 checkpointId ,
10021004 directoryHandle .getDirectory ());
10031005 tracking = false ;
1006+ refCheckpointIds .clear ();
10041007 }
10051008 }
10061009
@@ -1011,11 +1014,12 @@ void handoverOwnershipWhenCheckpointSubsumed(long checkpointId) {
10111014 checkpointId ,
10121015 directoryHandle .getDirectory ());
10131016 tracking = false ;
1017+ refCheckpointIds .clear ();
10141018 }
10151019 }
10161020
10171021 void tryCleanupQuietly () {
1018- if (tracking && ongoingRefCount . get () == 0 && directoryHandle != null ) {
1022+ if (tracking && refCheckpointIds . isEmpty () && directoryHandle != null ) {
10191023 try {
10201024 directoryHandle .discardState ();
10211025 } catch (Exception e ) {
0 commit comments