Skip to content

Commit

Permalink
Add GC worker thread for delisting Pools
Browse files Browse the repository at this point in the history
  • Loading branch information
Julian Ospald committed Oct 16, 2020
1 parent 6cab51a commit c0dc034
Show file tree
Hide file tree
Showing 8 changed files with 150 additions and 21 deletions.
3 changes: 1 addition & 2 deletions lib/core/src/Cardano/Pool/DB/Model.hs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
{-# LANGUAGE OverloadedLabels #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE UndecidableInstances #-}

-- `const` isn't more readable than lambdas. Our language is based on
Expand Down Expand Up @@ -84,13 +83,13 @@ import Cardano.Wallet.Primitive.Types
( BlockHeader (..)
, CertificatePublicationTime
, EpochNo (..)
, InternalState (..)
, PoolId
, PoolLifeCycleStatus (..)
, PoolOwner (..)
, PoolRegistrationCertificate (..)
, PoolRetirementCertificate (..)
, Settings
, InternalState (..)
, SlotNo (..)
, StakePoolMetadata (..)
, StakePoolMetadataHash
Expand Down
1 change: 1 addition & 0 deletions lib/core/src/Cardano/Pool/DB/Sqlite.hs
Original file line number Diff line number Diff line change
Expand Up @@ -495,6 +495,7 @@ newDBLayer trace fp timeInterpreter = do
deleteWhere [ BlockSlot >. point ]
-- TODO: remove dangling metadata no longer attached to a pool

delistPools [] = pure ()
delistPools pools =
-- sqlite has a max of 2k variables or so, so we don't want
-- 'IN #{poolList my_pools}' to blow up.
Expand Down
67 changes: 64 additions & 3 deletions lib/core/src/Cardano/Pool/Metadata.hs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ module Cardano.Pool.Metadata
-- * Fetch
fetchFromRemote
, StakePoolMetadataFetchLog (..)
, fetchDelistedPools

-- * Construct URLs
, identityUrlBuilder
Expand All @@ -32,6 +33,8 @@ import Cardano.BM.Data.Severity
( Severity (..) )
import Cardano.BM.Data.Tracer
( HasPrivacyAnnotation (..), HasSeverityAnnotation (..) )
import Cardano.Wallet.Api.Types
( ApiT (..) )
import Cardano.Wallet.Primitive.AddressDerivation
( hex )
import Cardano.Wallet.Primitive.Types
Expand Down Expand Up @@ -70,6 +73,7 @@ import Network.HTTP.Client
( HttpException (..)
, Manager
, ManagerSettings
, brConsume
, brReadSome
, managerResponseTimeout
, requestFromURI
Expand All @@ -83,6 +87,7 @@ import Network.HTTP.Types.Status
import Network.URI
( URI (..), parseURI, pathSegments )

import qualified Data.ByteString as BS
import qualified Data.ByteString.Char8 as B8
import qualified Data.ByteString.Lazy as BL
import qualified Data.Text as T
Expand Down Expand Up @@ -130,6 +135,45 @@ registryUrlBuilder baseUrl pid _ (StakePoolMetadataHash bytes) =
hashStr = T.unpack $ T.decodeUtf8 $ convertToBase Base16 bytes
pidStr = T.unpack $ toText pid

fetchDelistedPools
:: Tracer IO StakePoolMetadataFetchLog
-> URI
-> Manager
-> IO (Maybe [PoolId])
fetchDelistedPools tr uri manager = runExceptTLog $ do
pl <- getPoolsPayload
except . (fmap . fmap) getApiT . eitherDecodeStrict @[ApiT PoolId] $ pl
where
getPoolsPayload :: ExceptT String IO ByteString
getPoolsPayload = do
req <- withExceptT show $ except $ requestFromURI uri
liftIO $ traceWith tr $ MsgFetchDelistedPools uri
ExceptT
$ handle fromIOException
$ handle fromHttpException
$ withResponse req manager $ \res -> do
case responseStatus res of
s | s == status200 -> do
let body = responseBody res
Right . BS.concat <$> brConsume body

s -> do
pure $ Left $ "The server replied something unexpected: " <> show s

runExceptTLog
:: ExceptT String IO [PoolId]
-> IO (Maybe [PoolId])
runExceptTLog action = runExceptT action >>= \case
Left msg ->
Nothing <$ traceWith tr (MsgFetchDelistedPoolsFailure msg)

Right meta ->
Just meta <$ traceWith tr (MsgFetchDelistedPoolsSuccess meta)

fromHttpException :: Monad m => HttpException -> m (Either String a)
fromHttpException = return . Left . ("HTTp Exception exception: " <>) . show

-- TODO: refactor/simplify this
fetchFromRemote
:: Tracer IO StakePoolMetadataFetchLog
-> [ PoolId
Expand Down Expand Up @@ -207,17 +251,21 @@ fetchFromRemote tr builders manager pid url hash = runExceptTLog $ do
s -> do
pure $ Left $ "The server replied something unexpected: " <> show s

fromIOException :: Monad m => IOException -> m (Either String a)
fromIOException = return . Left . ("IO exception: " <>) . show

fromHttpException :: Monad m => HttpException -> m (Either String (Maybe a))
fromHttpException = const (return $ Right Nothing)

fromIOException :: Monad m => IOException -> m (Either String a)
fromIOException = return . Left . ("IO exception: " <>) . show


data StakePoolMetadataFetchLog
= MsgFetchPoolMetadata StakePoolMetadataHash URI
| MsgFetchPoolMetadataSuccess StakePoolMetadataHash StakePoolMetadata
| MsgFetchPoolMetadataFailure StakePoolMetadataHash String
| MsgFetchPoolMetadataFallback URI Bool
| MsgFetchDelistedPools URI
| MsgFetchDelistedPoolsFailure String
| MsgFetchDelistedPoolsSuccess [PoolId]
deriving (Show, Eq)

instance HasPrivacyAnnotation StakePoolMetadataFetchLog
Expand All @@ -227,6 +275,9 @@ instance HasSeverityAnnotation StakePoolMetadataFetchLog where
MsgFetchPoolMetadataSuccess{} -> Info
MsgFetchPoolMetadataFailure{} -> Warning
MsgFetchPoolMetadataFallback{} -> Warning
MsgFetchDelistedPools{} -> Info
MsgFetchDelistedPoolsFailure{} -> Warning
MsgFetchDelistedPoolsSuccess{} -> Info

instance ToText StakePoolMetadataFetchLog where
toText = \case
Expand All @@ -247,3 +298,13 @@ instance ToText StakePoolMetadataFetchLog where
then ""
else " Falling back using a different strategy."
]
MsgFetchDelistedPools uri -> mconcat
[ "Fetching delisted pools from ", T.pack (show uri)
]
MsgFetchDelistedPoolsSuccess poolIds -> mconcat
[ "Successfully fetched delisted pools: "
, T.pack (show poolIds)
]
MsgFetchDelistedPoolsFailure err -> mconcat
[ "Failed to fetch delisted pools: ", T.pack err
]
2 changes: 1 addition & 1 deletion lib/core/src/Cardano/Wallet/DB/Sqlite/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ import Data.Text.Class
import Data.Text.Encoding
( decodeUtf8, encodeUtf8 )
import Data.Time.Clock.POSIX
(utcTimeToPOSIXSeconds, POSIXTime, posixSecondsToUTCTime )
( POSIXTime, posixSecondsToUTCTime, utcTimeToPOSIXSeconds )
import Data.Word
( Word32, Word64 )
import Data.Word.Odd
Expand Down
1 change: 1 addition & 0 deletions lib/shelley/cardano-wallet.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ library
, memory
, network
, network-mux
, network-uri
, optparse-applicative
, ouroboros-consensus
, ouroboros-consensus-byron
Expand Down
3 changes: 2 additions & 1 deletion lib/shelley/src/Cardano/Wallet/Shelley.hs
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,8 @@ serveWallet

forM_ settings $ atomically . putSettings
void $ forkFinally (monitorStakePools tr gp nl db) onExit
spl <- newStakePoolLayer nl db $ forkFinally (monitorMetadata tr gp db) onExit
spl <- newStakePoolLayer nl db
$ forkFinally (monitorMetadata poolsEngineTracer tr gp db) onExit
action spl
where
tr = contramap (MsgFromWorker mempty) poolsEngineTracer
Expand Down
93 changes: 79 additions & 14 deletions lib/shelley/src/Cardano/Wallet/Shelley/Pools.hs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import Cardano.Pool.DB
import Cardano.Pool.Metadata
( StakePoolMetadataFetchLog
, defaultManagerSettings
, fetchDelistedPools
, fetchFromRemote
, identityUrlBuilder
, newManager
Expand Down Expand Up @@ -93,6 +94,8 @@ import Cardano.Wallet.Primitive.Types
, getPoolRetirementCertificate
, unSmashServer
)
import Cardano.Wallet.Registry
( WorkerLog, defaultWorkerAfter )
import Cardano.Wallet.Shelley.Compatibility
( Shelley
, StandardCrypto
Expand All @@ -106,9 +109,9 @@ import Cardano.Wallet.Shelley.Network
import Cardano.Wallet.Unsafe
( unsafeMkPercentage )
import Control.Concurrent
( threadDelay )
( forkFinally, threadDelay )
import Control.Exception
( SomeException (..), bracket, mask_, try )
( SomeException (..), bracket, finally, mask_, try )
import Control.Monad
( forM, forM_, forever, void, when, (<=<) )
import Control.Monad.IO.Class
Expand All @@ -134,13 +137,15 @@ import Data.Map
import Data.Map.Merge.Strict
( dropMissing, traverseMissing, zipWithAMatched, zipWithMatched )
import Data.Maybe
( catMaybes )
( catMaybes, fromMaybe )
import Data.Ord
( Down (..) )
import Data.Quantity
( Percentage (..), Quantity (..) )
import Data.Set
( Set )
import Data.Text
( Text )
import Data.Text.Class
( ToText (..) )
import Data.Tuple.Extra
Expand All @@ -153,6 +158,8 @@ import GHC.Conc
( TVar, ThreadId, killThread, newTVarIO, readTVarIO, writeTVar )
import GHC.Generics
( Generic )
import Network.URI
( URI (..) )
import Ouroboros.Consensus.Cardano.Block
( CardanoBlock, HardForkBlock (..) )
import System.Random
Expand All @@ -164,6 +171,8 @@ import qualified Data.List.NonEmpty as NE
import qualified Data.Map.Merge.Strict as Map
import qualified Data.Map.Strict as Map
import qualified Data.Set as Set
import Data.Time.Clock.POSIX
( getPOSIXTime, posixDayLength )
import qualified GHC.Conc as STM

--
Expand Down Expand Up @@ -682,24 +691,45 @@ monitorStakePools tr gp nl DBLayer{..} =

-- | Worker thread that monitors pool metadata and syncs it to the database.
monitorMetadata
:: Tracer IO StakePoolLog
:: Tracer IO (WorkerLog Text StakePoolLog)
-> Tracer IO StakePoolLog
-> GenesisParameters
-> DBLayer IO
-> IO ()
monitorMetadata tr gp DBLayer{..} = do
monitorMetadata tr' tr gp db@(DBLayer{..}) = do
settings <- atomically readSettings
manager <- newManager defaultManagerSettings

let fetcher fetchStrategies = fetchFromRemote trFetch fetchStrategies manager
forever $ do
(refs, successes) <- case poolMetadataSource settings of
FetchNone -> pure ([], [])
FetchDirect -> fetchThem $ fetcher [identityUrlBuilder]
FetchSMASH (unSmashServer -> uri) -> fetchThem $ fetcher [registryUrlBuilder uri]

when (null refs || null successes) $ do
traceWith tr $ MsgFetchTakeBreak blockFrequency
threadDelay blockFrequency
loop getPoolMetadata = forever $ do
(refs, successes) <- getPoolMetadata
when (null refs || null successes) $ do
traceWith tr $ MsgFetchTakeBreak blockFrequency
threadDelay blockFrequency

case poolMetadataSource settings of
FetchNone -> loop (pure ([], [])) -- TODO: exit loop?
FetchDirect -> loop (fetchThem $ fetcher [identityUrlBuilder])
FetchSMASH (unSmashServer -> uri) -> do
tid <-
forkFinally
(gcDelistedPools tr db
(fetchDelistedPools trFetch (toDelistedPoolsURI uri) manager)
)
onExit
flip finally (killThread tid) $
loop (fetchThem $ fetcher [registryUrlBuilder uri])

where
-- Currently the SMASH url points to the full API path, e.g.
-- https://smash.cardano-testnet.iohkdev.io/api/v1/monitorMetadata
-- so we need to recover/infer the delisted pools url.
-- TODO:
-- - require the smash url to only specify sheme and host
-- - use smash servant types to call the endpoints
toDelistedPoolsURI uri =
uri { uriPath = "/api/v1/delisted" , uriQuery = "", uriFragment = "" }

trFetch = contramap MsgFetchPoolMetadata tr
-- We mask this entire section just in case, although the database
-- operations runs masked anyway. Unfortunately we cannot run
Expand All @@ -723,6 +753,34 @@ monitorMetadata tr gp DBLayer{..} = do
toMicroSecond = (`div` 1000000) . fromEnum
slotLength = unSlotLength $ getSlotLength gp
f = unActiveSlotCoefficient (getActiveSlotCoefficient gp)
onExit = defaultWorkerAfter tr'

gcDelistedPools
:: Tracer IO StakePoolLog
-> DBLayer IO
-> IO (Maybe [PoolId]) -- ^ delisted pools fetcher
-> IO ()
gcDelistedPools tr DBLayer{..} fetchDelisted = forever $ do
lastGC <- atomically readLastMetadataGC
currentTime <- getPOSIXTime

let timeSinceLastGC = currentTime - lastGC
sixHours = posixDayLength / 4
if timeSinceLastGC > sixHours
then do
delistedPools <- fmap (fromMaybe []) fetchDelisted
atomically $ do
putLastMetadataGC currentTime
delistPools delistedPools
else do
-- Sleep for 60 seconds. This is useful in case
-- something else is modifying the last sync time
-- in the database.
let sec_to_milisec = (* 1000000)
sleep_time = sec_to_milisec 60
traceWith tr $ MsgGCTakeBreak sleep_time
threadDelay sleep_time
pure ()

data StakePoolLog
= MsgFollow FollowLog
Expand All @@ -736,6 +794,7 @@ data StakePoolLog
| MsgErrProduction ErrPointAlreadyExists
| MsgFetchPoolMetadata StakePoolMetadataFetchLog
| MsgFetchTakeBreak Int
| MsgGCTakeBreak Int
deriving (Show, Eq)

data PoolGarbageCollectionInfo = PoolGarbageCollectionInfo
Expand Down Expand Up @@ -763,6 +822,7 @@ instance HasSeverityAnnotation StakePoolLog where
MsgErrProduction{} -> Error
MsgFetchPoolMetadata e -> getSeverityAnnotation e
MsgFetchTakeBreak{} -> Debug
MsgGCTakeBreak{} -> Debug

instance ToText StakePoolLog where
toText = \case
Expand Down Expand Up @@ -805,3 +865,8 @@ instance ToText StakePoolLog where
, "back to it in about "
, pretty (fixedF 1 (toRational delay / 1000000)), "s"
]
MsgGCTakeBreak delay -> mconcat
[ "Taking a little break from GCing delisted metadata pools, "
, "back to it in about "
, pretty (fixedF 1 (toRational delay / 1000000)), "s"
]
1 change: 1 addition & 0 deletions nix/.stack.nix/cardano-wallet.nix

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit c0dc034

Please sign in to comment.