diff --git a/chia/full_node/full_node.py b/chia/full_node/full_node.py index e72689c129a2..f5531e8d7b90 100644 --- a/chia/full_node/full_node.py +++ b/chia/full_node/full_node.py @@ -142,7 +142,7 @@ class FullNode: subscriptions: PeerSubscriptions = dataclasses.field(default_factory=PeerSubscriptions) _transaction_queue_task: Optional[asyncio.Task[None]] = None simulator_transaction_callback: Optional[Callable[[bytes32], Awaitable[None]]] = None - _sync_task: Optional[asyncio.Task[None]] = None + _sync_task_list: list[asyncio.Task[None]] = dataclasses.field(default_factory=list) _transaction_queue: Optional[TransactionQueue] = None _compact_vdf_sem: Optional[LimitedSemaphore] = None _new_peak_sem: Optional[LimitedSemaphore] = None @@ -358,15 +358,21 @@ async def manage(self) -> AsyncIterator[None]: if self._transaction_queue_task is not None: self._transaction_queue_task.cancel() cancel_task_safe(task=self.wallet_sync_task, log=self.log) - cancel_task_safe(task=self._sync_task, log=self.log) + for one_sync_task in self._sync_task_list: + if not one_sync_task.done(): + cancel_task_safe(task=one_sync_task, log=self.log) for task_id, task in list(self.full_node_store.tx_fetch_tasks.items()): cancel_task_safe(task, self.log) if self._init_weight_proof is not None: await asyncio.wait([self._init_weight_proof]) - if self._sync_task is not None: - with contextlib.suppress(asyncio.CancelledError): - await self._sync_task + for one_sync_task in self._sync_task_list: + if one_sync_task.done(): + self.log.info(f"Long sync task {one_sync_task.get_name()} done") + else: + with contextlib.suppress(asyncio.CancelledError): + self.log.info(f"Awaiting long sync task {one_sync_task.get_name()}") + await one_sync_task @property def block_store(self) -> BlockStore: @@ -770,9 +776,16 @@ async def new_peak(self, request: full_node_protocol.NewPeak, peer: WSChiaConnec if await self.short_sync_batch(peer, uint32(max(curr_peak_height - 6, 0)), request.height): return None + # Clean up task reference list (used to prevent gc from killing running tasks) + for oldtask in self._sync_task_list[:]: + if oldtask.done(): + self._sync_task_list.remove(oldtask) + # This is the either the case where we were not able to sync successfully (for example, due to the fork # point being in the past), or we are very far behind. Performs a long sync. - self._sync_task = asyncio.create_task(self._sync()) + # Multiple tasks may be created here. If we don't save all handles, a task could enter a sync object + # and be cleaned up by the GC, corrupting the sync object and possibly not allowing anything else in. + self._sync_task_list.append(asyncio.create_task(self._sync())) async def send_peak_to_timelords( self, peak_block: Optional[FullBlock] = None, peer: Optional[WSChiaConnection] = None