Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions buck-worker/Main.hs
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,14 @@ import Internal.Log (dbg)
import Orchestration (envServerSocket)
import Run (parseOptions, runWorker)
import System.Environment (getArgs)
import System.IO (BufferMode (..), hPutStrLn, hSetBuffering, stderr, stdout)
import System.IO (BufferMode (..), hSetBuffering, stderr, stdout)

main :: IO ()
main = do
hSetBuffering stdout LineBuffering
hSetBuffering stderr LineBuffering
options <- parseOptions =<< getArgs
socket <- envServerSocket
hPutStrLn stderr $ "using worker socket: " <> show socket
try (runWorker socket options) >>= \case
Right () ->
dbg "Worker terminated without cancellation."
Expand Down
6 changes: 4 additions & 2 deletions buck-worker/buck-worker.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,9 @@ library
ghc,
ghc-persistent-worker-plugin,
grapesy ^>=1.0.1,
hashable,
lens-family,
process,
proto-lens,
text,
vector
Expand All @@ -61,7 +63,7 @@ executable worker
import: all
main-is: Main.hs
hs-source-dirs: .
ghc-options: -O2 -threaded -with-rtsopts=-N -with-rtsopts=-T
ghc-options: -O2 -threaded "-with-rtsopts=-N -T"

build-depends:
async,
Expand All @@ -81,7 +83,7 @@ test-suite worker-test
other-modules:
CompileHptTest,
TestSetup
ghc-options: -threaded -rtsopts -with-rtsopts=-N -with-rtsopts=-T
ghc-options: -threaded -rtsopts "-with-rtsopts=-N -T"
build-depends:
directory,
filepath,
Expand Down
13 changes: 13 additions & 0 deletions buck-worker/lib/BUCK
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
[export_file(f, visibility = ["PUBLIC"]) for f in [
"BuckArgs.hs",
"BuckWorker.hs",
"GhcHandler.hs",
"Grpc.hs",
"Instrumentation.hs",
"Orchestration.hs",
"Proto/Instrument.hs",
"Proto/Instrument_Fields.hs",
"Proto/Worker.hs",
"Proto/Worker_Fields.hs",
"Run.hs",
]]
196 changes: 164 additions & 32 deletions buck-worker/lib/Orchestration.hs
Original file line number Diff line number Diff line change
Expand Up @@ -2,29 +2,49 @@

module Orchestration where

import qualified BuckWorker as Worker
import BuckWorker (ExecuteCommand, ExecuteResponse)
import Control.Concurrent (threadDelay)
import Control.Concurrent.Async (async, cancel, wait)
import Control.Exception (finally, onException, try)
import Control.Monad (void)
import Control.DeepSeq (force)
import Control.Exception (bracket_, finally, onException, throwIO, try)
import Control.Monad (void, when)
import Data.Hashable (hash)
import Data.List (dropWhileEnd)
import Data.Maybe (isJust)
import Data.Traversable (for)
import GHC.IO.Handle.FD (withFileBlocking)
import GHC.IO.Handle.Lock (LockMode (..), hLock, hUnlock)
import Grpc (streamingNotImplemented)
import Internal.Log (dbg)
import Network.GRPC.Client (Connection, Server (..), recvNextOutput, sendFinalInput, withConnection, withRPC)
import Network.GRPC.Common (Proxy (..), def)
import Network.GRPC.Common.Protobuf (Proto, Protobuf, (%~), (&))
import Network.GRPC.Common.Protobuf (Proto, Protobuf, defMessage, (%~), (&))
import Network.GRPC.Server.Protobuf (ProtobufMethodsOf)
import Network.GRPC.Server.Run (InsecureConfig (..), ServerConfig (..), runServerWithHandlers)
import Network.GRPC.Server.StreamType (Methods (..), fromMethods, mkClientStreaming, mkNonStreaming)
import Proto.Instrument (Instrument (..))
import Proto.Worker (Worker (..))
import Proto.Worker_Fields qualified as Fields
import System.Directory (createDirectory, removeFile)
import System.Directory (createDirectoryIfMissing, getCurrentDirectory, removeFile)
import System.Environment (getEnv)
import System.Exit (exitFailure)
import System.FilePath (takeDirectory, (</>))
import System.IO (IOMode (..), hGetLine, hPutStr)
import System.IO (IOMode (..), hGetLine, hPutStr, withFile)
import System.Process (ProcessHandle, getProcessExitCode, spawnProcess)

-- | Determine how GHC servers should be started in relation to Buck worker processes.
data Orchestration =
-- | Each worker process starts its own GHC server.
Multi
|
-- | One worker process starts a GHC server, the others start a proxy server that forwards requests to the central
-- GHC.
Single
|
-- | One worker process spawns a new child process that runs the GHC server, and all workers then proxy this GHC.
-- GHC.
Spawn
deriving stock (Eq, Show)

-- | The file system path of the socket on which the worker running in this process is supposed to listen.
newtype ServerSocketPath =
Expand All @@ -35,27 +55,55 @@ newtype ServerSocketPath =
envServerSocket :: IO ServerSocketPath
envServerSocket = ServerSocketPath <$> getEnv "WORKER_SOCKET"

-- | The base dir for sockets, usually a dir in @/tmp@ created by Buck or ourselves.
newtype SocketDirectory =
SocketDirectory { path :: FilePath }
deriving stock (Eq, Show)

-- | Derive the socket base dir from the socket path provided by Buck.
spawnedSocketDirectory :: ServerSocketPath -> SocketDirectory
spawnedSocketDirectory (ServerSocketPath path) =
SocketDirectory (takeDirectory path)

-- | Use the current directory's hash to create a socket directory independent of Buck.
-- This is a hack that should eventually be replaced by a proper system.
projectSocketDirectory :: IO SocketDirectory
projectSocketDirectory = do
cwd <- getCurrentDirectory
pure (SocketDirectory ("/tmp/ghc-persistent-worker/" ++ show (hash cwd)))

-- | The file system path of the socket on which the primary worker running the GHC server is listening.
newtype PrimarySocketPath =
PrimarySocketPath { path :: FilePath }
deriving stock (Eq, Show)

-- | For the case where the primary server is spawned, rather than reusing the socket on which communication with Buck
-- is happening.
primarySocketIn :: SocketDirectory -> PrimarySocketPath
primarySocketIn dir = PrimarySocketPath (dir.path </> "server")

-- | The file system path of the socket on which the primary worker outputs instrumentation information.
newtype InstrumentSocketPath =
InstrumentSocketPath { path :: FilePath }
deriving stock (Eq, Show)

instrumentSocketIn :: FilePath -> InstrumentSocketPath
instrumentSocketIn dir = InstrumentSocketPath (dir </> "instrument")
instrumentSocketIn :: SocketDirectory -> InstrumentSocketPath
instrumentSocketIn dir = InstrumentSocketPath (dir.path </> "instrument")

-- | The file system path in which the primary worker running the GHC server stores its socket path for clients to
-- discover.
newtype PrimarySocketDiscoveryPath =
PrimarySocketDiscoveryPath { path :: FilePath }
deriving stock (Eq, Show)

primarySocketDiscoveryIn :: FilePath -> PrimarySocketDiscoveryPath
primarySocketDiscoveryIn dir = PrimarySocketDiscoveryPath (dir </> "primary")
primarySocketDiscoveryIn :: SocketDirectory -> PrimarySocketDiscoveryPath
primarySocketDiscoveryIn dir = PrimarySocketDiscoveryPath (dir.path </> "primary")

-- | Path to the worker executable, i.e. this program.
-- Used to spawn the GHC server process.
newtype WorkerExe =
WorkerExe { path :: FilePath }
deriving stock (Eq, Show)

-- | The implementation of an app consisting of two gRPC servers, implementing the protocols 'Worker' and 'Instrument'.
-- The 'Instrument' component is intended to be optional.
Expand Down Expand Up @@ -145,38 +193,122 @@ proxyServer primary socket = do
dbg ("Starting proxy for " ++ primary.path ++ " on " ++ socket.path)
runServerWithHandlers def (grpcServerConfig socket.path) $ fromMethods methods

-- | Start a gRPC server that either runs GHC (primary server) or a proxy that forwards requests to the primary.
messageExecute :: Proto Worker.ExecuteCommand
messageExecute = defMessage

-- | How often the process should wait for 100ms and retry connecting to the GHC server after spawning a process.
maxRetries :: Int
maxRetries = 30

-- | Attempt to connect and send a gRPC message to the server starting up at the given socket.
waitPoll :: PrimarySocketPath -> IO ()
waitPoll socket =
check maxRetries
where
check 0 = throwIO (userError "GHC server didn't respond within 3 seconds")
check n =
try connect >>= \case
Right () -> pure ()
Left (_ :: IOError) -> do
threadDelay 100_000
check (n - 1)

-- The part that throws is in @withConnection@, so this has to be executed every time.
connect =
withConnection def (ServerUnix socket.path) \ connection ->
withRPC connection def (Proxy @(Protobuf Worker "execute")) \ call ->
sendFinalInput call messageExecute <* recvNextOutput call

-- | Wait for a GHC server process to respond and check its exit code.
waitForCentralGhc :: ProcessHandle -> PrimarySocketPath -> IO ()
waitForCentralGhc proc socket = do
dbg "Waiting for server"
waitPoll socket
dbg "Server is up"
exitCode <- getProcessExitCode proc
when (isJust exitCode) do
dbg "Spawned process for the GHC server exited after starting up."

-- | Spawn a child process executing the worker executable (which usually is the same as this process), for the purpose
-- of running a GHC server to which all worker processes then forward their requests.
-- Afterwards, wait for the server to be responsive.
forkCentralGhc :: WorkerExe -> SocketDirectory -> IO PrimarySocketPath
forkCentralGhc exe socketDir = do
dbg ("Forking GHC server at " ++ primary.path)
proc <- spawnProcess exe.path ["--make", "--serve", primary.path]
waitForCentralGhc proc primary
pure primary
where
primary = primarySocketIn socketDir

-- | Run a GHC server synchronously.
runCentralGhcSpawned :: CreateMethods -> ServerSocketPath -> IO ()
runCentralGhcSpawned methods socket =
runCentralGhc methods primaryFile socket instr
where
instr = Just (instrumentSocketIn dir)

primaryFile = primarySocketDiscoveryIn dir

dir = spawnedSocketDirectory socket

-- | Run a server if this process is the primary worker, otherwise return the primary's socket path.
-- Since multiple workers are started in separate processes, we negotiate using file system locks.
-- There are two major scenarios:
--
-- - When the build is started, multiple workers are spawned concurrently, and no primary exists.
-- We use the common prefix of the worker sockets (something like `/tmp/buck2_worker/<hash>`) to create a lock file.
-- We create a lock file in the provided socket directory.
-- The first worker that wins the lock in `withFileBlocking` gets to be primary, and writes its socket path to
-- `/tmp/buck2_worker/<hash>/primary`.
-- `$socket_dir/primary`.
-- All other workers then own the lock in sequence and proceed with the second scenario.
--
-- - When the build is running and a primary exists, either because the worker lost the inital lock race or was started
-- later in the build due to dependencies and/or parallelism limits, the contents of the `primary` file are read to
-- obtain the primary's socket path.
-- A gRPC server is started that resends all requests to that socket.
runOrProxyCentralGhc :: CreateMethods -> ServerSocketPath -> IO ()
runOrProxyCentralGhc mode socket = do
void $ try @IOError (createDirectory dir)
result <- withFileBlocking primaryFile.path ReadWriteMode \ handle -> do
try @IOError (hGetLine handle) >>= \case
-- If the file didn't exist, `hGetLine` will still return the empty string.
-- File IO is buffered/lazy, so we have to force the pattern to avoid read after close (though this is already
-- achieved by calling `null`).
Right !primary | not (null primary) -> do
pure (Left (PrimarySocketPath primary))
_ -> do
thread <- async (runCentralGhc mode primaryFile socket instr)
hPutStr handle socket.path
pure (Right thread)
case result of
Right thread -> onException (wait thread) (cancel thread)
runOrProxyCentralGhc ::
SocketDirectory ->
(PrimarySocketDiscoveryPath -> IO (PrimarySocketPath, a)) ->
IO (Either PrimarySocketPath (PrimarySocketPath, a))
runOrProxyCentralGhc socketDir runServer = do
void $ try @IOError (createDirectoryIfMissing True socketDir.path)
withFile primaryFile.path ReadWriteMode \ handle -> do
bracket_ (hLock handle ExclusiveLock) (hUnlock handle) do
try @IOError (hGetLine handle) >>= \case
-- If the file didn't exist, `hGetLine` will still return the empty string in some GHC versions.
-- File IO is buffered/lazy, so we have to force the string to avoid read after close.
Right !primary | not (null (force primary)) -> do
pure (Left (PrimarySocketPath primary))
_ -> do
(primary, resource) <- runServer primaryFile
hPutStr handle primary.path
pure (Right (primary, resource))
where
primaryFile = primarySocketDiscoveryIn socketDir

-- | Start a gRPC server that either runs GHC (primary server) or a proxy that forwards requests to the primary.
serveOrProxyCentralGhc :: CreateMethods -> ServerSocketPath -> IO ()
serveOrProxyCentralGhc mode socket = do
runOrProxyCentralGhc socketDir run >>= \case
Right (_, thread) -> onException (wait thread) (cancel thread)
Left primary -> proxyServer primary socket
where
primaryFile = primarySocketDiscoveryIn dir
instr = Just (instrumentSocketIn dir)
dir = init (dropWhileEnd ('-' /=) (takeDirectory socket.path))
run primaryFile = do
let primary = PrimarySocketPath socket.path
thread <- async (runCentralGhc mode primaryFile socket instr)
waitPoll primary
pure (primary, thread)

instr = Just (instrumentSocketIn socketDir)

socketDir = SocketDirectory (init (dropWhileEnd ('-' /=) (takeDirectory socket.path)))

-- | Start a proxy gRPC server that forwards requests to the central GHC server.
-- If that server isn't running, spawn a process and wait for it to boot up.
spawnOrProxyCentralGhc :: WorkerExe -> ServerSocketPath -> IO ()
spawnOrProxyCentralGhc exe socket = do
socketDir <- projectSocketDirectory
primary <- runOrProxyCentralGhc socketDir \ _ -> do
primary <- forkCentralGhc exe socketDir
pure (primary, ())
proxyServer (either id fst primary) socket
Loading
Loading