Skip to content

Commit

Permalink
More robust forking, exception safety. Closes hasura#3768
Browse files Browse the repository at this point in the history
This is the result of a general audit of how we fork threads, with a
detour into how we're using mutable state especially in websocket
codepaths, making more robust to async exceptions and exceptions
resulting from bugs.

Some highlights:
- use a wrapper around 'immortal' so threads that die due to bugs are
  restarted, and log the error
- use 'withAsync' some places
- use bracket a few places where we might break invariants
- log some codepaths that represent bugs
- export UnstructuredLog for ad hoc logging (the alternative is we
  continue not logging useful stuff)

I had to timebox this. There are a few TODOs I didn't want to address.
And we'll wait until this is merged to attempt hasura#3705 for
Control.Concurrent.Extended
  • Loading branch information
jberryman committed Feb 25, 2020
1 parent b84db36 commit 33a3759
Show file tree
Hide file tree
Showing 21 changed files with 432 additions and 239 deletions.
1 change: 1 addition & 0 deletions scripts/dev.sh
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ function wait_docker_postgres {
#################################
if [ "$MODE" = "graphql-engine" ]; then
cd "$PROJECT_ROOT/server"
rm -f graphql-engine.tix

export HASURA_GRAPHQL_SERVER_PORT=${HASURA_GRAPHQL_SERVER_PORT-8181}

Expand Down
1 change: 1 addition & 0 deletions server/graphql-engine.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ library
, list-t
, async
, lifted-async
, immortal < 0.3

-- logging related
, base64-bytestring >= 1.0
Expand Down
79 changes: 76 additions & 3 deletions server/src-lib/Control/Concurrent/Extended.hs
Original file line number Diff line number Diff line change
@@ -1,16 +1,33 @@
module Control.Concurrent.Extended
( module Control.Concurrent
, sleep
, ForkableMonadIO
-- * Robust forking
, forkImmortal
-- * Deprecated
, threadDelay
, forkIO
) where

import Prelude
import Control.Exception
import Control.Monad.IO.Class
import Control.Monad
import Data.Aeson
import Data.Void

import qualified Control.Concurrent as Base
import qualified Control.Concurrent as Base
import qualified Control.Concurrent.Async.Lifted.Safe as LA
import qualified Control.Immortal as Immortal
import qualified Control.Monad.Trans.Control as MC

import Control.Concurrent hiding (threadDelay)
import Data.Time.Clock.Units (Microseconds (..), DiffTime)
import Control.Concurrent hiding (threadDelay, forkIO)
import Data.Time.Clock.Units (seconds, Microseconds (..), DiffTime)

-- For forkImmortal. We could also have it take a cumbersome continuation if we
-- want to break this dependency. Probably best to move Hasura.Logging into a
-- separate lib with this if we do the override thing.
import Hasura.Logging

-- | Like 'Base.threadDelay', but takes a 'DiffTime' instead of an 'Int' microseconds.
--
Expand All @@ -22,3 +39,59 @@ sleep = Base.threadDelay . round . Microseconds
{-# DEPRECATED threadDelay "Please use `sleep` instead (and read the docs!)" #-}
threadDelay :: Int -> IO ()
threadDelay = Base.threadDelay

{-# DEPRECATED forkIO
"Please use 'Control.Control.Concurrent.Async.Lifted.Safe.withAsync'\
\ or our 'forkImmortal' instead formore robust threading."
#-}
forkIO :: IO () -> IO ThreadId
forkIO = Base.forkIO

forkImmortal
:: ForkableMonadIO m
=> String
-- ^ A label describing this thread's function (see 'labelThread').
-> Logger Hasura
-> m Void
-- ^ An IO action we expect never to return normally. This will have the type
-- signature ':: m a' (see e.g. the type of 'forever').
-> m Immortal.Thread
-- ^ A handle for the forked thread. See "Control.Immortal".
forkImmortal label logger m =
Immortal.createWithLabel label $ \this ->
Immortal.onUnexpectedFinish this logAndPause (void m)
where logAndPause = \case
Right _void -> pure () -- absurd _void (i.e. unreachable)
Left e -> liftIO $ do
liftIO $ unLogger logger $
ImmortalThreadLog label e
-- pause before restarting some arbitrary amount of time. The idea is not to flood
-- logs or cause other cascading failures.
sleep (seconds 1)

data ImmortalThreadLog = ImmortalThreadLog String SomeException

instance ToEngineLog ImmortalThreadLog Hasura where
toEngineLog (ImmortalThreadLog label e) =
(LevelError, ELTInternal ILTUnstructured, toJSON msg)
where msg = "Unexpected exception in immortal thread \""<>label<>"\" (it will be restarted):\n"
<> show e


-- TODO
-- - maybe use this everywhere, but also:
-- - consider unifying with: src-lib/Control/Monad/Stateless.hs ?
-- - nice TypeError: https://kodimensional.dev/type-errors
--
-- | Like 'MonadIO' but constrained to stacks in which forking a new thread is reasonable/safe.
-- In particular 'StateT' causes problems.
--
-- This is the constraint you can use for functions that call 'LA.async', or 'immortal'.
type ForkableMonadIO m = (MonadIO m, MC.MonadBaseControl IO m, LA.Forall (LA.Pure m))


-- TODO consider deprecating async.
-- export something with polymorphic return type, which makes "fork and forget" difficult
-- this could automatically link in one variant
-- another variant might return ThreadId that self destructs w/ finalizer (mkWeakThreadId)
-- and note: "Holding a normal ThreadId reference will prevent the delivery of BlockedIndefinitely exceptions because the reference could be used as the target of throwTo at any time, "
5 changes: 5 additions & 0 deletions server/src-lib/Data/TByteString.hs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@ import qualified Data.Text as T
import qualified Data.Text.Encoding as TE

import Data.Bool (bool)
import Data.String
import Prelude

-- | A JSON-serializable type for either text or raw binary data, encoded with base-64.
newtype TByteString
= TByteString (Bool, T.Text)
deriving (Show, Eq)
Expand All @@ -23,6 +25,9 @@ instance J.ToJSON TByteString where
toJSON (TByteString (isBase64, t)) =
bool (J.toJSON t) (J.toJSON ["Base64", t]) isBase64

instance IsString TByteString where
fromString = fromText . T.pack

fromText :: T.Text -> TByteString
fromText t = TByteString (False, t)

Expand Down
22 changes: 13 additions & 9 deletions server/src-lib/Hasura/App.hs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import Options.Applicative
import System.Environment (getEnvironment, lookupEnv)
import System.Exit (exitFailure)

import qualified Control.Concurrent as C
import qualified Control.Concurrent.Extended as C
import qualified Control.Concurrent.Async.Lifted.Safe as LA
import qualified Data.Aeson as A
import qualified Data.ByteString.Char8 as BC
Expand Down Expand Up @@ -232,9 +232,10 @@ runHGEServer ServeOptions{..} InitCtx{..} initTime = do
inconsObjs <- scInconsistentObjs <$> liftIO (getSCFromRef cacheRef)
liftIO $ logInconsObjs logger inconsObjs

-- start a background thread for schema sync
startSchemaSync sqlGenCtx _icPgPool logger _icHttpManager
cacheRef _icInstanceId cacheInitTime
-- start background threads for schema sync
(_schemaSyncListenerThread, _schemaSyncProcessorThread) <-
startSchemaSyncThreads sqlGenCtx _icPgPool logger _icHttpManager
cacheRef _icInstanceId cacheInitTime

let warpSettings = Warp.setPort soPort
. Warp.setHost soHost
Expand All @@ -251,20 +252,23 @@ runHGEServer ServeOptions{..} InitCtx{..} initTime = do
prepareEvents _icPgPool logger
eventEngineCtx <- liftIO $ atomically $ initEventEngineCtx maxEvThrds fetchI
unLogger logger $ mkGenericStrLog LevelInfo "event_triggers" "starting workers"
void $ liftIO $ C.forkIO $ processEventQueue logger logEnvHeaders
(_pushEventsThread, _consumeEventsThread) <- liftIO $
forkEventQueueProcessors logger logEnvHeaders
_icHttpManager _icPgPool (getSCFromRef cacheRef) eventEngineCtx

-- start a backgroud thread to handle async actions
void $ liftIO $ C.forkIO $ asyncActionsProcessor (_scrCache cacheRef) _icPgPool _icHttpManager
_asyncActionsThread <- C.forkImmortal "asyncActionsProcessor" logger $ liftIO $
asyncActionsProcessor (_scrCache cacheRef) _icPgPool _icHttpManager

-- start a background thread to check for updates
void $ liftIO $ C.forkIO $ checkForUpdates loggerCtx _icHttpManager
_updateThread <- C.forkImmortal "checkForUpdates" logger $ liftIO $
checkForUpdates loggerCtx _icHttpManager

-- TODO async/immortal:
-- start a background thread for telemetry
when soEnableTelemetry $ do
unLogger logger $ mkGenericStrLog LevelInfo "telemetry" telemetryNotice
void $ liftIO $ C.forkIO $ runTelemetry logger _icHttpManager (getSCFromRef cacheRef) _icDbUid _icInstanceId
void $ C.forkImmortal "runTelemetry" logger $ liftIO $
runTelemetry logger _icHttpManager (getSCFromRef cacheRef) _icDbUid _icInstanceId

finishTime <- liftIO Clock.getCurrentTime
let apiInitTime = realToFrac $ Clock.diffUTCTime finishTime initTime
Expand Down
2 changes: 2 additions & 0 deletions server/src-lib/Hasura/Events/HTTP.hs
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,8 @@ $(J.deriveJSON (J.aesonDrop 4 J.snakeCase){J.omitNothingFields=True} ''HTTPReq)
instance ToEngineLog HTTPReq Hasura where
toEngineLog req = (LevelInfo, eventTriggerLogType, J.toJSON req)

-- | Like 'HTTP.httpLbs' but we catch 'HTTP.HttpException' and return all known
-- error-like conditions as 'HTTPErr'.
runHTTP
:: ( MonadReader r m
, Has (Logger Hasura) r
Expand Down
Loading

0 comments on commit 33a3759

Please sign in to comment.