Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
b4eab45
Pop the log hook after compilation to avoid memory leaks
tek Jun 2, 2025
6d20b25
Directly store DynFlags in the new unit env in the metadata step
tek May 28, 2025
e196336
Use newtype `TargetId` in `TestSetup`
tek Jun 3, 2025
8b4222e
Instrument: support multiple workers per session
sjoerdvisscher Jun 4, 2025
e11e39e
Remove some obsolete code and comments
tek May 28, 2025
50e383e
Handle Interp sharing in make mode by storing it in the cache
tek Jun 4, 2025
e9a5118
Move make mode related state into a separate type
tek Jun 4, 2025
aec0ec1
Refactor `Metadata` to use the same function as `CompileHpt` to resto…
tek Jun 4, 2025
6411817
Remove legacy code for initializing the unit env in tests
tek Jun 4, 2025
32dff98
Remove obsolete function
tek Jun 4, 2025
9405d81
Disable finder cache statistics
tek May 28, 2025
d3fbe53
Remove the command env from `Args`
tek May 28, 2025
51d1bee
Fix some names
tek May 28, 2025
4a73ba4
Enable temp file cleanup after compiling a module
tek Jun 4, 2025
98a0baf
Refactor make mode state into a separate module
tek Jun 4, 2025
2b90b00
Refactor statistics collection logic into a separate module
tek Jun 4, 2025
3148e77
Refactor oneshot mode state into a separate module
tek Jun 4, 2025
5732e44
rename `Cache` to `WorkerState`
tek Jun 5, 2025
7338711
Add CLI option to enable instrumentation
tek May 15, 2025
d944b50
Use BUCK_BUILD_ID for the primary socket path if it exists
tek Jun 6, 2025
201e6f1
Remove accidental debug code introduced in 6f97ca36
tek Jun 12, 2025
f2a689c
Grpc: We're not going to use grpc metadata, so declare NoMetadata for…
sjoerdvisscher Jun 12, 2025
63a55ea
Remove accidental inclusion of -DMWB in the Finder interface CPP
tek Jun 17, 2025
92d1cc9
Replace the `debug` flag in `Log` with a simple log level
tek Jun 12, 2025
2444b68
Use the Buck trace ID to prefix log directories
tek Jun 12, 2025
fb785c3
Dump the module graph and HUG to the log file on failure
tek Jun 12, 2025
3628e2c
Use the unit ID as the target for the metadata step
tek Jun 13, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions buck-proxy/lib/BuckProxy/Orchestration.hs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ import Control.Concurrent (threadDelay)
import Control.Concurrent.MVar (MVar, modifyMVar)
import Control.Exception (throwIO, try)
import Control.Monad (void, when)
import Data.Map.Strict (Map)
import Data.Map.Strict (Map, (!?))
import Data.Map.Strict qualified as Map
import Data.Maybe (isJust)
import Data.Maybe (fromMaybe, isJust)
import Data.Text qualified as Text
import Data.Text.Encoding (decodeUtf8Lenient)
import Network.GRPC.Client (Connection, Server (..), recvNextOutput, sendFinalInput, withConnection, withRPC)
Expand All @@ -37,7 +37,7 @@ import System.Process (ProcessHandle, getProcessExitCode, spawnProcess)
import Types.Args (TargetId)
import Types.BuckArgs (BuckArgs (workerTargetId), parseBuckArgs)
import Types.GhcHandler (WorkerMode (..))
import Types.Grpc (RequestArgs (..))
import Types.Grpc (CommandEnv (..), RequestArgs (..))
import Types.Orchestration (
PrimarySocketPath (..),
ServerSocketPath (..),
Expand Down Expand Up @@ -83,6 +83,9 @@ proxyHandler ::
proxyHandler workerMap exe wmode basePath req = do
let cmdEnv = commandEnv req.env
argv = Text.unpack . decodeUtf8Lenient <$> req.argv
-- Get the build ID for the primary socket path from the command environment, and fall back to the value extracted
-- from the gRPC socket path if the key is absent from the env.
socketId = fromMaybe basePath (cmdEnv.values !? "BUCK_BUILD_ID")
buckArgs <- either (throwIO . userError) pure (parseBuckArgs cmdEnv (RequestArgs argv))
case buckArgs.workerTargetId of
Nothing -> throwIO (userError "No --worker-target-id passed")
Expand All @@ -91,7 +94,7 @@ proxyHandler workerMap exe wmode basePath req = do
modifyMVar workerMap \wmap -> do
case Map.lookup targetId wmap of
Nothing -> do
let workerSocketDir = projectSocketDirectory basePath targetId
let workerSocketDir = projectSocketDirectory socketId targetId
void $ try @IOError (createDirectoryIfMissing True workerSocketDir.path)
resource <- spawnGhcWorker exe wmode workerSocketDir
dbg $ "No primary socket for " ++ show targetId ++ ", so created it on " ++ resource.primarySocket.path
Expand Down
2 changes: 1 addition & 1 deletion ghc-worker/lib/GhcWorker/CompileResult.hs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module GhcWorker.CompileResult where
import Data.Foldable (for_)
import Data.Int (Int32)
import Internal.AbiHash (AbiHash (..))
import Internal.Cache (ModuleArtifacts)
import Internal.State (ModuleArtifacts)
import Types.BuckArgs (BuckArgs (..))

-- | Right now the 'Maybe' just corresponds to the presence of the CLI argument @--abi-out@ – errors occuring while
Expand Down
63 changes: 40 additions & 23 deletions ghc-worker/lib/GhcWorker/GhcHandler.hs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@ module GhcWorker.GhcHandler where
import Common.Grpc (GrpcHandler (..))
import Control.Concurrent (MVar, forkIO, threadDelay)
import Control.Concurrent.STM (TVar, atomically, modifyTVar', readTVar, retry, writeTVar)
import Control.Exception (throwIO)
import Control.Monad.Catch (onException)
import Control.Exception (throwIO, try)
import Control.Monad (when)
import Control.Monad.IO.Class (liftIO)
import Data.Coerce (coerce)
import Data.Functor ((<&>))
import Data.Int (Int32)
import GHC (DynFlags (..), Ghc, getSession)
Expand All @@ -15,18 +16,20 @@ import GHC.Driver.Monad (modifySession)
import GhcWorker.CompileResult (CompileResult (..), writeCloseOutput, writeResult)
import GhcWorker.Instrumentation (Hooks (..), InstrumentedHandler (..))
import Internal.AbiHash (AbiHash (..), showAbiHash)
import Internal.Cache (Cache (..), ModuleArtifacts (..), Target (..))
import Internal.Compile (compileModuleWithDepsInEps)
import Internal.CompileHpt (compileModuleWithDepsInHpt)
import Internal.Log (LogName (..), dbg, logFlush, newLog)
import Internal.Log (TraceId, dbg, logDebug, logFlush, newLog, setLogTarget)
import Internal.Metadata (computeMetadata)
import Internal.Session (Env (..), withGhc, withGhcMhu)
import Internal.State (ModuleArtifacts (..), WorkerState (..), dumpState)
import Prelude hiding (log)
import System.Exit (ExitCode (ExitSuccess))
import System.Posix.Process (exitImmediately)
import Types.BuckArgs (BuckArgs, Mode (..), parseBuckArgs, toGhcArgs)
import qualified Types.BuckArgs
import Types.GhcHandler (WorkerMode (..))
import Types.Grpc (RequestArgs (..))
import Types.State (Target)

data LockState = LockStart | LockFreeze Int | LockThaw Int | LockEnd
deriving stock (Eq, Show)
Expand Down Expand Up @@ -90,10 +93,8 @@ dispatch lock workerMode hooks env args =
pure (code, result)
pure (code, snd <$> result)
Just ModeMetadata -> do
code <- computeMetadata env <&> \case
True -> 0
False -> 1
pure (code, Just (Target "metadata"))
(success, target) <- computeMetadata env
pure (if success then 0 else 1, target)
Just ModeClose -> do
dbg "in dispatch. Mode Close"
_ <- writeCloseOutput args
Expand All @@ -111,9 +112,31 @@ dispatch lock workerMode hooks env args =
withGhcMhu env \ _ ->
withTarget (compileAndReadAbiHash CompManager compileModuleWithDepsInHpt hooks args)

withTarget f target =
withTarget f target = do
liftIO $ setLogTarget env.log target
f target <&> fmap \ r -> (r, target)

processResult ::
Hooks ->
Env ->
Either IOError (Int32, Maybe Target) ->
IO ([String], Int32)
processResult hooks env result = do
when (exitCode /= 0) do
dumpState env.log env.state exception
output <- logFlush env.log
hooks.compileFinish (hookPayload output)
pure (output, exitCode)
where
hookPayload output =
if exitCode == 0
then Just (target, output, exitCode)
else Nothing

((exitCode, target), exception) = case result of
Right out -> (out, Nothing)
Left err -> ((1, Nothing), Just ("Exception: " ++ show err))

-- | Default implementation of an 'InstrumentedHandler' using our custom persistent worker GHC mode, either using HPT or
-- EPS for local dependency lookup.
--
Expand All @@ -125,22 +148,16 @@ dispatch lock workerMode hooks env args =
ghcHandler ::
-- | first req lock hack
TVar LockState ->
MVar Cache ->
MVar WorkerState ->
WorkerMode ->
Maybe TraceId ->
InstrumentedHandler
ghcHandler lock cache workerMode =
ghcHandler lock state workerMode traceId =
InstrumentedHandler \ hooks -> GrpcHandler \ commandEnv argv -> do
buckArgs <- either (throwIO . userError) pure (parseBuckArgs commandEnv argv)
args <- toGhcArgs buckArgs
log <- newLog True
let env = Env {log, cache, args}
onException
do
(result, target) <- dispatch lock workerMode hooks env buckArgs
output <- logFlush (logName <$> target) env.log
liftIO $ hooks.compileFinish (Just (target, output, result))
pure (output, result)
do
liftIO $ hooks.compileFinish Nothing
where
logName (Target target) = LogName target
log <- newLog traceId
logDebug log (unlines (coerce argv))
let env = Env {log, state, args}
result <- try $ dispatch lock workerMode hooks env buckArgs
processResult hooks env result
28 changes: 14 additions & 14 deletions ghc-worker/lib/GhcWorker/Grpc.hs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import Control.Monad (forever)
import Data.Map.Strict qualified as Map
import Data.Text qualified as Text
import GHC.Stats (GCDetails (..), RTSStats (..), getRTSStats)
import Internal.Cache (Cache (..), Options (..))
import Internal.State (WorkerState (..), Options (..))
import Network.GRPC.Common (NextElem (..))
import Network.GRPC.Common.Protobuf (Proto, defMessage, (&), (.~))
import Network.GRPC.Server.Protobuf (ProtobufMethodsOf)
Expand All @@ -22,7 +22,7 @@ import Proto.Instrument (Instrument)
import Proto.Instrument_Fields qualified as Instr

-- | Fetch statistics about the current state of the RTS for instrumentation.
mkStats :: Cache -> IO (Proto Instr.Stats)
mkStats :: WorkerState -> IO (Proto Instr.Stats)
mkStats _ = do
s <- getRTSStats
pure $
Expand All @@ -36,14 +36,14 @@ mkStats _ = do
-- | Implementation of a streaming grapesy handler that sends instrumentation statistics pulled from the provided
-- channel to the client.
notifyMe ::
MVar Cache ->
MVar WorkerState ->
Chan (Proto Instr.Event) ->
(NextElem (Proto Instr.Event) -> IO ()) ->
IO ()
notifyMe cacheVar chan callback = do
cache <- readMVar cacheVar
notifyMe stateVar chan callback = do
state <- readMVar stateVar
myChan <- dupChan chan
stats <- mkStats cache
stats <- mkStats state
callback $ NextElem $
defMessage
& Instr.stats .~ stats
Expand All @@ -53,12 +53,12 @@ notifyMe cacheVar chan callback = do

-- | Set the options for the server.
setOptions ::
MVar Cache ->
MVar WorkerState ->
Proto Instr.Options ->
IO (Proto Instr.Empty)
setOptions cacheVar opts = do
modifyMVar_ cacheVar $ \cache ->
pure cache {
setOptions stateVar opts = do
modifyMVar_ stateVar $ \state ->
pure state {
options = Options {
extraGhcOptions = Text.unpack opts.extraGhcOptions
}
Expand All @@ -68,9 +68,9 @@ setOptions cacheVar opts = do
-- | A grapesy server that streams instrumentation data from the provided channel.
instrumentMethods ::
Chan (Proto Instr.Event) ->
MVar Cache ->
MVar WorkerState ->
Methods IO (ProtobufMethodsOf Instrument)
instrumentMethods chan cacheVar =
instrumentMethods chan stateVar =
simpleMethods
(mkServerStreaming (const (notifyMe cacheVar chan)))
(mkNonStreaming (setOptions cacheVar))
(mkServerStreaming (const (notifyMe stateVar chan)))
(mkNonStreaming (setOptions stateVar))
21 changes: 11 additions & 10 deletions ghc-worker/lib/GhcWorker/Instrumentation.hs
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,13 @@ import Data.Foldable (traverse_)
import Data.Int (Int32)
import Data.Text qualified as Text
import GhcWorker.Grpc (mkStats)
import Internal.Cache (Cache, Target (..))
import Internal.State (WorkerState)
import Internal.Log (dbg)
import Network.GRPC.Common.Protobuf (Proto, defMessage, (&), (.~))
import Prelude hiding (log)
import qualified Proto.Instrument as Instr
import Proto.Instrument_Fields qualified as Instr
import Types.State (Target (..))

-- | Rudimentary dummy state for instrumentation, counting concurrently compiling sessions.
data WorkerStatus =
Expand Down Expand Up @@ -89,15 +90,15 @@ messageCompileEnd target exitCode err =
withInstrumentation ::
Chan (Proto Instr.Event) ->
MVar WorkerStatus ->
MVar Cache ->
MVar WorkerState ->
InstrumentedHandler ->
GrpcHandler
withInstrumentation instrChan status cacheVar handler =
withInstrumentation instrChan status stateVar handler =
GrpcHandler \ commandEnv argv -> do
cache <- readMVar cacheVar
state <- readMVar stateVar
bracket_ (startJob status) (finishJob status) do
result <- (handler.create hooks).run commandEnv argv
stats <- mkStats cache
stats <- mkStats state
writeChan instrChan (defMessage & Instr.stats .~ stats)
pure result
where
Expand All @@ -111,12 +112,12 @@ withInstrumentation instrChan status cacheVar handler =
writeChan instrChan $
defMessage &
Instr.compileStart .~
messageCompileStart target.get
messageCompileStart target.path

-- Note: This is WIP.
compileFinish =
traverse_ \ (target, output, exitCode) -> do
let tgt = maybe "" (.get) target
let tgt = maybe "" (.path) target
writeChan instrChan $
defMessage &
Instr.compileEnd .~
Expand All @@ -127,9 +128,9 @@ withInstrumentation instrChan status cacheVar handler =
toGrpcHandler ::
InstrumentedHandler ->
MVar WorkerStatus ->
MVar Cache ->
MVar WorkerState ->
Maybe (Chan (Proto Instr.Event)) ->
GrpcHandler
toGrpcHandler createHandler status cacheVar = \case
toGrpcHandler createHandler status stateVar = \case
Nothing -> createHandler.create hooksNoop
Just instrChan -> withInstrumentation instrChan status cacheVar createHandler
Just instrChan -> withInstrumentation instrChan status stateVar createHandler
35 changes: 21 additions & 14 deletions ghc-worker/lib/GhcWorker/Orchestration.hs
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,14 @@ import Types.Orchestration (
-- The 'Instrument' component is intended to be optional.
data CreateMethods where
CreateMethods :: {
createInstrumentation :: IO (instr, Methods IO (ProtobufMethodsOf Instrument)),
createGhc :: Maybe instr -> IO (Methods IO (ProtobufMethodsOf Worker))
createInstrumentation :: IO (instrumentSocket, Methods IO (ProtobufMethodsOf Instrument)),
createGhc :: Maybe instrumentSocket -> IO (Methods IO (ProtobufMethodsOf Worker))
} -> CreateMethods

newtype FeatureInstrument =
FeatureInstrument { flag :: Bool }
deriving stock (Eq, Show)

-- | Start a gRPC server that dispatches requests to GHC handlers.
runLocalGhc ::
CreateMethods ->
Expand All @@ -54,10 +58,10 @@ runLocalGhc ::
IO ()
runLocalGhc CreateMethods {..} socket minstr = do
dbg ("Starting ghc server on " ++ socket.path)
instrResource <- for minstr \instr -> do
dbg ("Instrumentation info available on " ++ instr.path)
instrResource <- for minstr \instrumentSocket -> do
dbg ("Instrumentation info available on " ++ instrumentSocket.path)
(resource, methods) <- createInstrumentation
_instrThread <- async $ runServerWithHandlers def (grpcServerConfig instr.path) (fromMethods methods)
_instrThread <- async $ runServerWithHandlers def (grpcServerConfig instrumentSocket.path) (fromMethods methods)
pure resource
methods <- createGhc instrResource
runServerWithHandlers def (grpcServerConfig socket.path) (fromMethods methods)
Expand All @@ -69,8 +73,8 @@ runCentralGhc ::
ServerSocketPath ->
Maybe InstrumentSocketPath ->
IO ()
runCentralGhc mode discovery socket instr =
finally (runLocalGhc mode socket instr) do
runCentralGhc mode discovery socket instrumentSocket =
finally (runLocalGhc mode socket instrumentSocket) do
dbg ("Shutting down ghc server on " ++ socket.path)
removeFile discovery.path

Expand Down Expand Up @@ -163,11 +167,14 @@ waitForCentralGhc proc socket = do
dbg "Spawned process for the GHC server exited after starting up."

-- | Run a GHC server synchronously.
runCentralGhcSpawned :: CreateMethods -> ServerSocketPath -> IO ()
runCentralGhcSpawned methods socket =
runCentralGhc methods primaryFile socket instr
runCentralGhcSpawned :: CreateMethods -> FeatureInstrument -> ServerSocketPath -> IO ()
runCentralGhcSpawned methods featureInstrument socket =
runCentralGhc methods primaryFile socket instrumentSocket
where
instr = Just (instrumentSocketIn dir)
instrumentSocket =
if featureInstrument.flag
then Just (instrumentSocketIn dir)
else Nothing

primaryFile = primarySocketDiscoveryIn dir

Expand Down Expand Up @@ -209,17 +216,17 @@ runOrProxyCentralGhc socketDir runServer = do

-- | Start a gRPC server that either runs GHC (primary server) or a proxy that forwards requests to the primary.
serveOrProxyCentralGhc :: CreateMethods -> ServerSocketPath -> IO ()
serveOrProxyCentralGhc mode socket = do
serveOrProxyCentralGhc methods socket = do
runOrProxyCentralGhc socketDir run >>= \case
Right (_, thread) -> onException (wait thread) (cancel thread)
Left primary -> proxyServer primary socket
where
run primaryFile = do
let primary = PrimarySocketPath socket.path
thread <- async (runCentralGhc mode primaryFile socket instr)
thread <- async (runCentralGhc methods primaryFile socket instrumentSocket)
waitPoll primary
pure (primary, thread)

instr = Just (instrumentSocketIn socketDir)
instrumentSocket = Just (instrumentSocketIn socketDir)

socketDir = SocketDirectory (init (dropWhileEnd ('-' /=) (takeDirectory socket.path)))
Loading
Loading