Commit
Keep track of all long sync task references (#18769)
* keep track of all long sync task references

* List

* flake8

* flake8

* shorten comments... ooof indents

* comment
wjblanke authored Oct 25, 2024
1 parent e2c63db commit 0354e6e
Showing 1 changed file with 19 additions and 6 deletions.
25 changes: 19 additions & 6 deletions chia/full_node/full_node.py
@@ -142,7 +142,7 @@ class FullNode:
     subscriptions: PeerSubscriptions = dataclasses.field(default_factory=PeerSubscriptions)
     _transaction_queue_task: Optional[asyncio.Task[None]] = None
     simulator_transaction_callback: Optional[Callable[[bytes32], Awaitable[None]]] = None
-    _sync_task: Optional[asyncio.Task[None]] = None
+    _sync_task_list: list[asyncio.Task[None]] = dataclasses.field(default_factory=list)
     _transaction_queue: Optional[TransactionQueue] = None
     _compact_vdf_sem: Optional[LimitedSemaphore] = None
     _new_peak_sem: Optional[LimitedSemaphore] = None
@@ -358,15 +358,21 @@ async def manage(self) -> AsyncIterator[None]:
             if self._transaction_queue_task is not None:
                 self._transaction_queue_task.cancel()
             cancel_task_safe(task=self.wallet_sync_task, log=self.log)
-            cancel_task_safe(task=self._sync_task, log=self.log)
+            for one_sync_task in self._sync_task_list:
+                if not one_sync_task.done():
+                    cancel_task_safe(task=one_sync_task, log=self.log)
 
             for task_id, task in list(self.full_node_store.tx_fetch_tasks.items()):
                 cancel_task_safe(task, self.log)
             if self._init_weight_proof is not None:
                 await asyncio.wait([self._init_weight_proof])
-            if self._sync_task is not None:
-                with contextlib.suppress(asyncio.CancelledError):
-                    await self._sync_task
+            for one_sync_task in self._sync_task_list:
+                if one_sync_task.done():
+                    self.log.info(f"Long sync task {one_sync_task.get_name()} done")
+                else:
+                    with contextlib.suppress(asyncio.CancelledError):
+                        self.log.info(f"Awaiting long sync task {one_sync_task.get_name()}")
+                        await one_sync_task
 
     @property
     def block_store(self) -> BlockStore:
@@ -770,9 +776,16 @@ async def new_peak(self, request: full_node_protocol.NewPeak, peer: WSChiaConnec
        if await self.short_sync_batch(peer, uint32(max(curr_peak_height - 6, 0)), request.height):
            return None
 
+       # Clean up task reference list (used to prevent gc from killing running tasks)
+       for oldtask in self._sync_task_list[:]:
+           if oldtask.done():
+               self._sync_task_list.remove(oldtask)
+
        # This is either the case where we were not able to sync successfully (for example, due to the fork
        # point being in the past), or we are very far behind. Performs a long sync.
-       self._sync_task = asyncio.create_task(self._sync())
+       # Multiple tasks may be created here. If we don't save all handles, a task could enter a sync object
+       # and be cleaned up by the GC, corrupting the sync object and possibly not allowing anything else in.
+       self._sync_task_list.append(asyncio.create_task(self._sync()))
 
    async def send_peak_to_timelords(
        self, peak_block: Optional[FullBlock] = None, peer: Optional[WSChiaConnection] = None
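Background on the pitfall this commit addresses: the asyncio event loop only keeps weak references to running tasks, so a task created with asyncio.create_task can be garbage collected mid-run if the caller drops its handle. Keeping every handle in _sync_task_list, pruning finished tasks, and cancelling/awaiting the rest on shutdown is the pattern applied in the diff above. The following is a minimal, self-contained sketch of that same pattern; the names SyncManager, start_long_sync, and shutdown are hypothetical and are not part of the FullNode code.

import asyncio
import contextlib


class SyncManager:
    """Sketch: hold strong references to long-running tasks so the GC cannot destroy them."""

    def __init__(self) -> None:
        self._sync_task_list: list[asyncio.Task[None]] = []

    async def _sync(self) -> None:
        # Stand-in for a long sync; the real work lives in FullNode._sync.
        await asyncio.sleep(10)

    def start_long_sync(self) -> None:
        # Prune references to tasks that have already finished so the list stays bounded.
        for old_task in self._sync_task_list[:]:
            if old_task.done():
                self._sync_task_list.remove(old_task)
        # Keep a reference to the new task; without it the task may be garbage collected.
        self._sync_task_list.append(asyncio.create_task(self._sync()))

    async def shutdown(self) -> None:
        # Cancel anything still running, then await every task so cancellation completes.
        for task in self._sync_task_list:
            if not task.done():
                task.cancel()
        for task in self._sync_task_list:
            with contextlib.suppress(asyncio.CancelledError):
                await task


async def main() -> None:
    manager = SyncManager()
    manager.start_long_sync()
    await asyncio.sleep(0.1)
    await manager.shutdown()


if __name__ == "__main__":
    asyncio.run(main())

The pruning loop in start_long_sync mirrors the diff's cleanup of finished tasks before appending a new one, and shutdown mirrors the cancel-then-await sequence added to manage().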
