handle rocksdb exception in write path
YangKian committed Jul 5, 2024
1 parent f5599c4 commit 2f10a09
Showing 3 changed files with 56 additions and 78 deletions.
39 changes: 19 additions & 20 deletions hstream/src/HStream/Server/CacheStore.hs
@@ -20,7 +20,7 @@ import Control.Concurrent.STM (TVar, atomically, check, newTVarIO,
readTVar, readTVarIO, swapTVar,
writeTVar)
import Control.Exception (Exception (displayException),
throwIO, try)
catch, throwIO, try)
import Control.Monad (unless, void, when)
import Data.ByteString (ByteString)
import qualified Data.ByteString as BS
@@ -173,7 +173,7 @@ getStore CacheStore{store} = do
Log.fatal $ "Cache store not initialized, should not get here"
throwIO $ HE.UnexpectedError "Store is not initialized."

writeRecord :: CacheStore -> T.Text -> Word64 -> ByteString -> IO (Either RocksDbException S.AppendCompletion)
writeRecord :: CacheStore -> T.Text -> Word64 -> ByteString -> IO S.AppendCompletion
writeRecord st@CacheStore{..} streamName shardId payload = do
canWrite <- readMVar enableWrite
unless canWrite $ do
@@ -185,27 +185,26 @@ writeRecord st@CacheStore{..} streamName shardId payload = do
-- Obtain a monotonically increasing offset to ensure that each encoded-key is unique.
offset <- atomicModifyIORef' counter (\x -> (x + 1, x))
let k = encodeKey streamName shardId offset

!append_start <- getPOSIXTime
res <- try @RocksDbException $ putCF db writeOptions columnFamily k payload
case res of
Left e -> do
ST.cache_store_stat_add_cs_append_failed statsHolder cStreamName 1
return $ Left e
Right _ -> do
ST.serverHistogramAdd statsHolder ST.SHL_AppendCacheStoreLatency =<< msecSince append_start
ST.cache_store_stat_add_cs_append_in_bytes statsHolder cStreamName (fromIntegral $ BS.length payload)
ST.cache_store_stat_add_cs_append_in_records statsHolder cStreamName 1
ST.cache_store_stat_add_cs_append_total statsHolder cStreamName 1
void $ atomicModifyIORef' totalWrite (\x -> (x + 1, x))
lsn <- getCurrentTimestamp
return . Right $
S.AppendCompletion
{ appendCompLogID = shardId
, appendCompLSN = fromIntegral lsn
, appendCompTimestamp = 0
}
putCF db writeOptions columnFamily k payload `catch` record_failed
ST.serverHistogramAdd statsHolder ST.SHL_AppendCacheStoreLatency =<< msecSince append_start
ST.cache_store_stat_add_cs_append_in_bytes statsHolder cStreamName (fromIntegral $ BS.length payload)
ST.cache_store_stat_add_cs_append_in_records statsHolder cStreamName 1
ST.cache_store_stat_add_cs_append_total statsHolder cStreamName 1

void $ atomicModifyIORef' totalWrite (\x -> (x + 1, x))
lsn <- getCurrentTimestamp
return S.AppendCompletion
{ appendCompLogID = shardId
, appendCompLSN = fromIntegral lsn
, appendCompTimestamp = 0
}
where
cStreamName = "cache_store"
record_failed (e :: RocksDbException) = do
ST.cache_store_stat_add_cs_append_failed statsHolder cStreamName 1
throwIO e

-- dump all cached records to HStore
dumpToHStore :: CacheStore -> S.LDClient -> S.Compression -> IO ()
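
A note for readers skimming the diff: the new writeRecord no longer returns Either RocksDbException S.AppendCompletion. On a RocksDB failure it bumps the append-failed stat and rethrows, and the caller's exception handler decides how to surface the error. Below is a minimal, self-contained sketch of that catch/record/rethrow shape, assuming a hypothetical DbException type and a plain IORef counter in place of the real RocksDbException and StatsHolder.

{-# LANGUAGE ScopedTypeVariables #-}

import Control.Exception (Exception, catch, throwIO)
import Data.IORef (IORef, modifyIORef', newIORef, readIORef)

-- Hypothetical stand-in for RocksDbException.
newtype DbException = DbException String
  deriving (Show)

instance Exception DbException

-- On failure: bump the failure counter, then rethrow so the caller decides
-- how to report the error (in the real code, an exception handler maps it
-- to a gRPC status).
writeWithFailureStat :: IORef Int -> IO () -> IO ()
writeWithFailureStat failedCounter doWrite =
    doWrite `catch` recordFailed
  where
    recordFailed (e :: DbException) = do
      modifyIORef' failedCounter (+ 1)
      throwIO e

main :: IO ()
main = do
  failed <- newIORef (0 :: Int)
  r <- (Right <$> writeWithFailureStat failed (throwIO (DbException "write stalled")))
         `catch` \(e :: DbException) -> pure (Left (show e))
  print r                     -- Left "DbException \"write stalled\""
  readIORef failed >>= print  -- 1
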
53 changes: 18 additions & 35 deletions hstream/src/HStream/Server/Core/Stream.hs
@@ -12,7 +12,6 @@ module HStream.Server.Core.Stream
, getStream
, listStreams
, listStreamsWithPrefix
, append
, appendStream
, listShards
, getTailRecordId
@@ -351,24 +350,6 @@ getTailRecordIdV2 ServerContext{..} slotConfig API.GetTailRecordIdRequest{..} =
}
return $ API.GetTailRecordIdResponse { getTailRecordIdResponseTailRecordId = Just recordId}

append :: HasCallStack
=> ServerContext
-> T.Text -- streamName
-> Word64 -- shardId
-> API.BatchedRecord -- payload
-> IO API.AppendResponse
append sc@ServerContext{..} streamName shardId payload = do
!recv_time <- getPOSIXTime
Log.debug $ "Receive Append Request: StreamName {"
<> Log.build streamName
<> "(shardId: "
<> Log.build shardId
<> ")}"

resp <- appendStream sc streamName shardId payload
Stats.serverHistogramAdd scStatsHolder Stats.SHL_AppendRequestLatency =<< msecSince recv_time
return resp

appendStream :: HasCallStack
=> ServerContext
-> T.Text
@@ -379,37 +360,39 @@ appendStream ServerContext{..} streamName shardId record = do
let payload = encodBatchRecord record
recordSize = API.batchedRecordBatchSize record
payloadSize = BS.length payload
cStreamName = textToCBytes streamName
when (payloadSize > scMaxRecordSize) $ throwIO $ HE.InvalidRecordSize payloadSize

state <- readIORef serverState
rids <- case state of
S.AppendCompletion{..} <- case state of
ServerNormal -> do
Stats.handle_time_series_add_queries_in scStatsHolder "append" 1
Stats.stream_stat_add_append_total scStatsHolder cStreamName 1
Stats.stream_time_series_add_append_in_requests scStatsHolder cStreamName 1

!append_start <- getPOSIXTime
S.AppendCompletion {..} <- S.appendCompressedBS scLDClient shardId payload cmpStrategy Nothing
appendRes <- S.appendCompressedBS scLDClient shardId payload cmpStrategy Nothing `catch` record_failed

Stats.serverHistogramAdd scStatsHolder Stats.SHL_AppendLatency =<< msecSince append_start
Stats.stream_stat_add_append_in_bytes scStatsHolder cStreamName (fromIntegral payloadSize)
Stats.stream_stat_add_append_in_records scStatsHolder cStreamName (fromIntegral recordSize)
Stats.stream_time_series_add_append_in_bytes scStatsHolder cStreamName (fromIntegral payloadSize)
Stats.stream_time_series_add_append_in_records scStatsHolder cStreamName (fromIntegral recordSize)
return $ V.zipWith (API.RecordId shardId) (V.replicate (fromIntegral recordSize) appendCompLSN) (V.fromList [0..])
return appendRes
ServerBackup -> do
res <- DB.writeRecord cacheStore streamName shardId payload
case res of
Right S.AppendCompletion{..} ->
return $ V.zipWith (API.RecordId shardId) (V.replicate (fromIntegral recordSize) appendCompLSN) (V.fromList [0..])
Left e -> do
-- If write cache store failed, server will drop this record???
Log.fatal $ "write to cache store failed: " <> Log.build (displayException e)
return V.empty

return $ API.AppendResponse {
appendResponseStreamName = streamName
DB.writeRecord cacheStore streamName shardId payload

let rids = V.zipWith (API.RecordId shardId) (V.replicate (fromIntegral recordSize) appendCompLSN) (V.fromList [0..])
return $ API.AppendResponse
{ appendResponseStreamName = streamName
, appendResponseShardId = shardId
, appendResponseRecordIds = rids }
, appendResponseRecordIds = rids
}
where
cStreamName = textToCBytes streamName
record_failed (e :: S.SomeHStoreException) = do
Stats.stream_stat_add_append_failed scStatsHolder cStreamName 1
Stats.stream_time_series_add_append_failed_requests scStatsHolder cStreamName 1
throwIO e

listShards
:: HasCallStack
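
One detail of the appendStream body above that is easy to miss: the response record ids are built by pairing the single append LSN with a sequential index for each record in the batch. A small illustrative sketch, with RecordId stubbed as a positional data type (the real type is API.RecordId from HStream.Server.HStreamApi; the field order here is an assumption):

import           Data.Word   (Word32, Word64)
import qualified Data.Vector as V

-- Illustrative stand-in for API.RecordId: shard id, batch id (the LSN),
-- and the record's index within the batch.
data RecordId = RecordId Word64 Word64 Word32
  deriving (Show)

-- Every record in the batch shares the shard id and the append LSN and
-- gets a sequential batch index.
mkRecordIds :: Word64 -> Word64 -> Int -> V.Vector RecordId
mkRecordIds shard lsn recordCount =
  V.zipWith (RecordId shard)
            (V.replicate recordCount lsn)
            (V.enumFromN 0 recordCount)

main :: IO ()
main = print (mkRecordIds 1 42 3)
-- [RecordId 1 42 0,RecordId 1 42 1,RecordId 1 42 2]
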
42 changes: 19 additions & 23 deletions hstream/src/HStream/Server/Handler/Stream.hs
@@ -39,15 +39,16 @@ module HStream.Server.Handler.Stream
) where

import Control.Exception
import Control.Monad (when)
import qualified Data.Map as Map
import Data.Maybe (fromJust, isNothing)
import qualified Data.Vector as V
import Database.RocksDB.Exception (RocksDbException)
import qualified HsGrpc.Server as G
import qualified HsGrpc.Server.Types as G
import Network.GRPC.HighLevel.Generated
import qualified ZooKeeper.Exception as ZK

import Control.Monad (when)
import qualified HStream.Common.ZookeeperSlotAlloc as Slot
import qualified HStream.Exception as HE
import qualified HStream.Logger as Log
@@ -56,7 +57,6 @@ import HStream.Server.Exception
import HStream.Server.HStreamApi
import HStream.Server.Types (ServerContext (..))
import HStream.Server.Validation
import qualified HStream.Stats as Stats
import qualified HStream.Store as Store
import HStream.ThirdParty.Protobuf as PB
import HStream.Utils
@@ -221,25 +221,19 @@ appendHandler
:: ServerContext
-> ServerRequest 'Normal AppendRequest AppendResponse
-> IO (ServerResponse 'Normal AppendResponse)
appendHandler sc@ServerContext{..} (ServerNormalRequest _metadata request@AppendRequest{..}) =
appendStreamExceptionHandle inc_failed $ do
appendHandler sc (ServerNormalRequest _metadata request@AppendRequest{..}) =
appendStreamExceptionHandle $ do
Log.debug $ "Receive Append Request: StreamName {" <> Log.build appendRequestStreamName
<> "(shardId: " <> Log.build appendRequestShardId <> ")}"
validateAppendRequest request
returnResp =<< C.append sc appendRequestStreamName appendRequestShardId (fromJust appendRequestRecords)
where
inc_failed = do
Stats.stream_stat_add_append_failed scStatsHolder cStreamName 1
Stats.stream_time_series_add_append_failed_requests scStatsHolder cStreamName 1
cStreamName = textToCBytes appendRequestStreamName
returnResp =<< C.appendStream sc appendRequestStreamName appendRequestShardId (fromJust appendRequestRecords)

handleAppend :: ServerContext -> G.UnaryHandler AppendRequest AppendResponse
handleAppend sc@ServerContext{..} _ req@AppendRequest{..} = appendExHandle inc_failed $ do
handleAppend sc _ req@AppendRequest{..} = appendExHandle $ do
Log.debug $ "Receive Append Request: StreamName {" <> Log.build appendRequestStreamName
<> "(shardId: " <> Log.build appendRequestShardId <> ")}"
validateAppendRequest req
C.append sc appendRequestStreamName appendRequestShardId (fromJust appendRequestRecords)
where
inc_failed = do
Stats.stream_stat_add_append_failed scStatsHolder cStreamName 1
Stats.stream_time_series_add_append_failed_requests scStatsHolder cStreamName 1
cStreamName = textToCBytes appendRequestStreamName
C.appendStream sc appendRequestStreamName appendRequestShardId (fromJust appendRequestRecords)
{-# INLINE handleAppend #-}

--------------------------------------------------------------------------------
@@ -286,11 +280,9 @@ handleTrimShard sc _ request@TrimShardRequest{..} = catchDefaultEx $ do
--------------------------------------------------------------------------------
-- Exception Handlers

appendStreamExceptionHandle :: IO () -> HE.ExceptionHandle (ServerResponse 'Normal a)
appendStreamExceptionHandle f = HE.mkExceptionHandle' whileEx mkHandlers
appendStreamExceptionHandle :: HE.ExceptionHandle (ServerResponse 'Normal a)
appendStreamExceptionHandle = HE.mkExceptionHandle mkHandlers
where
whileEx :: forall e. Exception e => e -> IO ()
whileEx err = Log.warning (Log.buildString' err) >> f
handlers =
[ Handler (\(err :: Store.NOTFOUND) ->
return (StatusUnavailable, HE.mkStatusDetails err))
@@ -302,6 +294,8 @@ appendStreamExceptionHandle f = HE.mkExceptionHandle' whileEx mkHandlers
return (StatusUnavailable, HE.mkStatusDetails err))
, Handler (\(err :: Store.PEER_UNAVAILABLE) -> do
return (StatusUnavailable, HE.mkStatusDetails err))
, Handler (\(err :: RocksDbException) ->
return (StatusInternal, HE.mkStatusDetails err))
] ++ defaultHandlers
mkHandlers = HE.setRespType mkServerErrResp handlers

Expand All @@ -310,15 +304,17 @@ appendStreamExceptionHandle f = HE.mkExceptionHandle' whileEx mkHandlers
Log.warning (Log.buildString' err); \
G.throwGrpcError $ HE.mkGrpcStatus err G.StatusUnavailable

appendExHandle :: IO () -> (IO a -> IO a)
appendExHandle f = HE.mkExceptionHandle' (const f) handlers
appendExHandle :: IO a -> IO a
appendExHandle = HE.mkExceptionHandle handlers
where
handlers =
[ MkUnavailable(Store.NOTFOUND)
, MkUnavailable(Store.NOTINSERVERCONFIG)
, MkUnavailable(Store.NOSEQUENCER)
, MkUnavailable(Store.TIMEDOUT)
, MkUnavailable(Store.PEER_UNAVAILABLE)
, Handler (\(err :: RocksDbException) ->
G.throwGrpcError $ HE.mkGrpcStatus err G.StatusInternal)
] ++ defaultExHandlers

listShardsExceptionHandle :: HE.ExceptionHandle (ServerResponse 'Normal a)
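
The handler changes above remove the per-handler inc_failed callback (the failed-append stats are now recorded inside appendStream and writeRecord) and add a case that turns a RocksDbException into an internal-error status instead of letting it fall through. A minimal sketch of that handler-table pattern, using plain Control.Exception and stand-in status/exception types rather than the real gRPC response builders:

{-# LANGUAGE ScopedTypeVariables #-}

import Control.Exception (Exception, Handler (..), SomeException, catches, throwIO)

-- Stand-ins for the real status and exception types.
data Status = StatusUnavailable | StatusInternal | StatusUnknown
  deriving (Show)

newtype StoreTimedOut = StoreTimedOut String deriving (Show)
newtype DbException   = DbException   String deriving (Show)

instance Exception StoreTimedOut
instance Exception DbException

-- Run an append action; translate known exceptions into a status,
-- leaving anything else to a catch-all handler.
withAppendHandlers :: IO Status -> IO Status
withAppendHandlers action =
  action `catches`
    [ Handler (\(_ :: StoreTimedOut) -> pure StatusUnavailable)
    , Handler (\(_ :: DbException)   -> pure StatusInternal)    -- the new case
    , Handler (\(_ :: SomeException) -> pure StatusUnknown)
    ]

main :: IO ()
main = withAppendHandlers (throwIO (DbException "rocksdb: IO error")) >>= print
-- StatusInternal
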
