diff --git a/ghcide/session-loader/Development/IDE/Session.hs b/ghcide/session-loader/Development/IDE/Session.hs index 31b1f5965b..64c2bae003 100644 --- a/ghcide/session-loader/Development/IDE/Session.hs +++ b/ghcide/session-loader/Development/IDE/Session.hs @@ -91,7 +91,6 @@ import Data.Void import Control.Concurrent.STM.Stats (atomically, modifyTVar', readTVar, writeTVar) -import Control.Concurrent.STM.TQueue import Control.DeepSeq import Control.Exception (evaluate) import Control.Monad.IO.Unlift (MonadUnliftIO) @@ -103,7 +102,8 @@ import qualified Data.HashSet as Set import qualified Data.Set as OS import Database.SQLite.Simple import Development.IDE.Core.Tracing (withTrace) -import Development.IDE.Core.WorkerThread (awaitRunInThread, +import Development.IDE.Core.WorkerThread (WorkerQueue, + awaitRunInThread, withWorkerQueue) import qualified Development.IDE.GHC.Compat.Util as Compat import Development.IDE.Session.Diagnostics (renderCradleError) @@ -421,7 +421,7 @@ getHieDbLoc dir = do -- components mapping to the same hie.yaml file are mapped to the same -- HscEnv which is updated as new components are discovered. -loadSessionWithOptions :: Recorder (WithPriority Log) -> SessionLoadingOptions -> FilePath -> TQueue (IO ()) -> IO (Action IdeGhcSession) +loadSessionWithOptions :: Recorder (WithPriority Log) -> SessionLoadingOptions -> FilePath -> WorkerQueue (IO ()) -> IO (Action IdeGhcSession) loadSessionWithOptions recorder SessionLoadingOptions{..} rootDir que = do let toAbsolutePath = toAbsolute rootDir -- see Note [Root Directory] cradle_files <- newIORef [] diff --git a/ghcide/src/Development/IDE/Core/Compile.hs b/ghcide/src/Development/IDE/Core/Compile.hs index 600ea9777e..b81f7f38e9 100644 --- a/ghcide/src/Development/IDE/Core/Compile.hs +++ b/ghcide/src/Development/IDE/Core/Compile.hs @@ -75,6 +75,7 @@ import Development.IDE.Core.ProgressReporting (ProgressReporting (..)) import Development.IDE.Core.RuleTypes import Development.IDE.Core.Shake import Development.IDE.Core.Tracing (withTrace) +import Development.IDE.Core.WorkerThread (writeWorkerQueue) import Development.IDE.GHC.Compat hiding (assert, loadInterface, parseHeader, @@ -84,6 +85,7 @@ import Development.IDE.GHC.Compat hiding (assert, import qualified Development.IDE.GHC.Compat as Compat import qualified Development.IDE.GHC.Compat as GHC import qualified Development.IDE.GHC.Compat.Util as Util +import Development.IDE.Core.ProgressReporting (ProgressReporting (..), progressReportingOutsideState) import Development.IDE.GHC.CoreFile import Development.IDE.GHC.Error import Development.IDE.GHC.Orphans () @@ -795,7 +797,7 @@ indexHieFile se mod_summary srcPath !hash hf = do -- hiedb doesn't use the Haskell src, so we clear it to avoid unnecessarily keeping it around let !hf' = hf{hie_hs_src = mempty} modifyTVar' indexPending $ HashMap.insert srcPath hash - writeTQueue indexQueue $ \withHieDb -> do + writeWorkerQueue indexQueue $ \withHieDb -> do -- We are now in the worker thread -- Check if a newer index of this file has been scheduled, and if so skip this one newerScheduled <- atomically $ do diff --git a/ghcide/src/Development/IDE/Core/FileStore.hs b/ghcide/src/Development/IDE/Core/FileStore.hs index 6c0cb875b0..90dc69670b 100644 --- a/ghcide/src/Development/IDE/Core/FileStore.hs +++ b/ghcide/src/Development/IDE/Core/FileStore.hs @@ -22,7 +22,6 @@ module Development.IDE.Core.FileStore( ) where import Control.Concurrent.STM.Stats (STM, atomically) -import Control.Concurrent.STM.TQueue (writeTQueue) import Control.Exception import Control.Monad.Extra import Control.Monad.IO.Class @@ -40,6 +39,7 @@ import Development.IDE.Core.IdeConfiguration (isWorkspaceFile) import Development.IDE.Core.RuleTypes import Development.IDE.Core.Shake hiding (Log) import qualified Development.IDE.Core.Shake as Shake +import Development.IDE.Core.WorkerThread (writeWorkerQueue) import Development.IDE.GHC.Orphans () import Development.IDE.Graph import Development.IDE.Import.DependencyInformation @@ -247,7 +247,7 @@ typecheckParentsAction recorder nfp = do setSomethingModified :: VFSModified -> IdeState -> String -> IO [Key] -> IO () setSomethingModified vfs state reason actionBetweenSession = do -- Update database to remove any files that might have been renamed/deleted - atomically $ writeTQueue (indexQueue $ hiedbWriter $ shakeExtras state) (\withHieDb -> withHieDb deleteMissingRealFiles) + atomically $ writeWorkerQueue (indexQueue $ hiedbWriter $ shakeExtras state) (\withHieDb -> withHieDb deleteMissingRealFiles) void $ restartShakeSession (shakeExtras state) vfs reason [] actionBetweenSession registerFileWatches :: [String] -> LSP.LspT Config IO Bool diff --git a/ghcide/src/Development/IDE/Core/Shake.hs b/ghcide/src/Development/IDE/Core/Shake.hs index d8db7f67ca..09a69c814c 100644 --- a/ghcide/src/Development/IDE/Core/Shake.hs +++ b/ghcide/src/Development/IDE/Core/Shake.hs @@ -250,12 +250,12 @@ data HieDbWriter -- | Actions to queue up on the index worker thread -- The inner `(HieDb -> IO ()) -> IO ()` wraps `HieDb -> IO ()` -- with (currently) retry functionality -type IndexQueue = TQueue (((HieDb -> IO ()) -> IO ()) -> IO ()) +type IndexQueue = WorkerQueue (((HieDb -> IO ()) -> IO ()) -> IO ()) data ThreadQueue = ThreadQueue { tIndexQueue :: IndexQueue - , tRestartQueue :: TQueue (IO ()) - , tLoaderQueue :: TQueue (IO ()) + , tRestartQueue :: WorkerQueue (IO ()) + , tLoaderQueue :: WorkerQueue (IO ()) } -- Note [Semantic Tokens Cache Location] @@ -326,9 +326,9 @@ data ShakeExtras = ShakeExtras -- ^ Default HLS config, only relevant if the client does not provide any Config , dirtyKeys :: TVar KeySet -- ^ Set of dirty rule keys since the last Shake run - , restartQueue :: TQueue (IO ()) + , restartQueue :: WorkerQueue (IO ()) -- ^ Queue of restart actions to be run. - , loaderQueue :: TQueue (IO ()) + , loaderQueue :: WorkerQueue (IO ()) -- ^ Queue of loader actions to be run. } diff --git a/ghcide/src/Development/IDE/Core/WorkerThread.hs b/ghcide/src/Development/IDE/Core/WorkerThread.hs index a38da77f38..dd10c5f7e6 100644 --- a/ghcide/src/Development/IDE/Core/WorkerThread.hs +++ b/ghcide/src/Development/IDE/Core/WorkerThread.hs @@ -7,15 +7,22 @@ Description : This module provides an API for managing worker threads in the IDE see Note [Serializing runs in separate thread] -} module Development.IDE.Core.WorkerThread - (withWorkerQueue, awaitRunInThread) + (withWorkerQueue + , awaitRunInThread + , withWorkerQueueOfOne + , WorkerQueue + , writeWorkerQueue + , waitUntilWorkerQueueEmpty) where import Control.Concurrent.Async (withAsync) import Control.Concurrent.STM import Control.Concurrent.Strict (newBarrier, signalBarrier, waitBarrier) -import Control.Monad (forever) +import Control.Exception (finally) +import Control.Monad (forever, unless) import Control.Monad.Cont (ContT (ContT)) +import Control.Monad.IO.Class (liftIO) {- Note [Serializing runs in separate thread] @@ -28,27 +35,75 @@ Originally we used various ways to implement this, but it was hard to maintain a Moreover, we can not stop these threads uniformly when we are shutting down the server. -} --- | 'withWorkerQueue' creates a new 'TQueue', and launches a worker +data WorkerQueue a = WorkerQueueOfOne (TMVar a) | WorkerQueueOfMany (TQueue a) + +-- | peekWorkerQueue returns the next action in the queue without removing it. +peekWorkerQueue :: WorkerQueue a -> STM a +peekWorkerQueue (WorkerQueueOfOne tVar) = readTMVar tVar +peekWorkerQueue (WorkerQueueOfMany tQueue) = peekTQueue tQueue + +-- | readWorkerQueue returns the next action in the queue and removes it. +readWorkerQueue :: WorkerQueue a -> STM a +readWorkerQueue (WorkerQueueOfOne tVar) = takeTMVar tVar +readWorkerQueue (WorkerQueueOfMany tQueue) = readTQueue tQueue + +writeWorkerQueue :: WorkerQueue a -> a -> STM () +writeWorkerQueue (WorkerQueueOfOne tVar) action = putTMVar tVar action +writeWorkerQueue (WorkerQueueOfMany tQueue) action = writeTQueue tQueue action + +-- | waitUntilWorkerQueueEmpty blocks until the worker queue is empty. +waitUntilWorkerQueueEmpty :: WorkerQueue a -> STM () +waitUntilWorkerQueueEmpty (WorkerQueueOfOne tVar) = do + isEmpty <- isEmptyTMVar tVar + unless isEmpty retry +waitUntilWorkerQueueEmpty (WorkerQueueOfMany queue) = do + isEmpty <- isEmptyTQueue queue + unless isEmpty retry + +newWorkerQueue :: STM (WorkerQueue a) +newWorkerQueue = WorkerQueueOfMany <$> newTQueue + +newWorkerQueueOfOne :: STM (WorkerQueue a) +newWorkerQueueOfOne = WorkerQueueOfOne <$> newEmptyTMVar + +-- | 'withWorkerQueue' creates a new 'WorkerQueue', and launches a worker -- thread which polls the queue for requests and runs the given worker -- function on them. -withWorkerQueue :: (t -> IO a) -> ContT () IO (TQueue t) -withWorkerQueue workerAction = ContT $ \mainAction -> do - q <- newTQueueIO +withWorkerQueue :: (t -> IO a) -> ContT () IO (WorkerQueue t) +withWorkerQueue workerAction = do + q <- liftIO $ atomically newWorkerQueue + runWorkerQueue q workerAction + +-- | 'withWorkerQueueOfOne' creates a new 'WorkerQueue' that only allows one action to be queued at a time. +-- and one action can only be queued after the previous action has been done. +-- this is useful when we want to cancel the action waiting to be enqueue if it's thread is cancelled. +-- e.g. session loading in session loader. When a shake session is restarted +-- , we want to cancel the previous pending session loading. +-- since the hls-graph can handle the retrying of the session loading. +withWorkerQueueOfOne :: (t -> IO a) -> ContT () IO (WorkerQueue t) +withWorkerQueueOfOne workerAction = do + q <- liftIO $ atomically newWorkerQueueOfOne + runWorkerQueue q workerAction + +runWorkerQueue :: WorkerQueue t -> (t -> IO a) -> ContT () IO (WorkerQueue t) +runWorkerQueue q workerAction = ContT $ \mainAction -> do withAsync (writerThread q) $ \_ -> mainAction q where writerThread q = forever $ do - l <- atomically $ readTQueue q - workerAction l + -- peek the action from the queue, run it and then remove it from the queue + l <- atomically $ peekWorkerQueue q + workerAction l `finally` atomically (readWorkerQueue q) + -- | 'awaitRunInThread' queues up an 'IO' action to be run by a worker thread, -- and then blocks until the result is computed. -awaitRunInThread :: TQueue (IO ()) -> IO result -> IO result +awaitRunInThread :: WorkerQueue (IO ()) -> IO result -> IO result awaitRunInThread q act = do -- Take an action from TQueue, run it and -- use barrier to wait for the result barrier <- newBarrier - atomically $ writeTQueue q $ do + atomically $ writeWorkerQueue q $ do res <- act signalBarrier barrier res waitBarrier barrier diff --git a/ghcide/src/Development/IDE/LSP/LanguageServer.hs b/ghcide/src/Development/IDE/LSP/LanguageServer.hs index cf7845ce08..6743b9932f 100644 --- a/ghcide/src/Development/IDE/LSP/LanguageServer.hs +++ b/ghcide/src/Development/IDE/LSP/LanguageServer.hs @@ -39,7 +39,8 @@ import Control.Monad.Trans.Cont (evalContT) import Development.IDE.Core.IdeConfiguration import Development.IDE.Core.Shake hiding (Log) import Development.IDE.Core.Tracing -import Development.IDE.Core.WorkerThread (withWorkerQueue) +import Development.IDE.Core.WorkerThread (withWorkerQueue, + withWorkerQueueOfOne) import qualified Development.IDE.Session as Session import Development.IDE.Types.Shake (WithHieDb, WithHieDbShield (..)) @@ -261,7 +262,7 @@ handleInit recorder defaultRoot getHieDbLoc getIdeState lifetime exitClientMsg c runWithWorkerThreads :: Recorder (WithPriority Session.Log) -> FilePath -> (WithHieDb -> ThreadQueue -> IO ()) -> IO () runWithWorkerThreads recorder dbLoc f = evalContT $ do sessionRestartTQueue <- withWorkerQueue id - sessionLoaderTQueue <- withWorkerQueue id + sessionLoaderTQueue <- withWorkerQueueOfOne id (WithHieDbShield hiedb, threadQueue) <- runWithDb recorder dbLoc liftIO $ f hiedb (ThreadQueue threadQueue sessionRestartTQueue sessionLoaderTQueue)