Skip to content

Commit

Permalink
[fix] some fixes as asked for by Gregory and some streamlining
Browse files Browse the repository at this point in the history
- derivive pretty via show instead of using viaShow directly
- more documentation
- more where clauses where possible
- separation of the startWorker function
- factor out some part of the serverlog
  • Loading branch information
MangoIV committed Apr 3, 2023
1 parent 88d01f5 commit f46bd78
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 32 deletions.
49 changes: 38 additions & 11 deletions delegate-app/HydraAuction/Delegate/Server.hs
Original file line number Diff line number Diff line change
@@ -1,16 +1,25 @@
module HydraAuction.Delegate.Server (
-- * Delegate server types

-- ** Server config types
DelegateServerConfig (..),

-- ** Server log types
DelegateServerLog (..),
DelegateError (..),
ThreadSort (..),
ThreadEvent (..),
QueueAuctionPhaseEvent (..),

-- ** delegate tracing
DelegateTracerT,

-- * wai extras
ServerAppT,

-- * pretty extras
ViaShow (..),
extraInfo,
) where

-- Prelude imports
Expand Down Expand Up @@ -49,8 +58,7 @@ data DelegateServerLog
| FrontendInput FrontendRequest
| DelegateOutput DelegateResponse
| DelegateError DelegateError
| StartThread ThreadSort
| CancelThread ThreadSort
| ThreadEvent ThreadEvent ThreadSort
| QueueAuctionPhaseEvent QueueAuctionPhaseEvent
deriving stock (Eq, Show, Generic)

Expand All @@ -60,18 +68,30 @@ instance Pretty DelegateServerLog where
FrontendConnected -> "Frontend connected to Server"
DelegateOutput out -> "Delegate output" <> extraInfo (viaShow out)
FrontendInput inp -> "Frontend input" <> extraInfo (viaShow inp)
DelegateError err -> "Delegate error" <> extraInfo (pretty err)
StartThread info -> "Thread" <+> viaShow info <+> "was started"
CancelThread info -> "Thread" <+> viaShow info <+> "was cancelled"
DelegateError err -> "Delegate runner error occured" <> extraInfo (pretty err)
ThreadEvent ev info -> "Thread" <+> pretty info <> ":" <> extraInfo (pretty ev)
QueueAuctionPhaseEvent ev -> "Auction phase queueing" <> extraInfo (pretty ev)

-- | Which specific thrad was cancelled
-- | The event type to occur
data ThreadEvent
= ThreadStarted
| ThreadCancelled
deriving stock (Eq, Ord, Show)

instance Pretty ThreadEvent where
pretty = \case
ThreadStarted -> "Thread started"
ThreadCancelled -> "Thread cancelled"

-- | Which specific thrad the event originates from
data ThreadSort
= WebsocketThread
| DelegateRunnerThread
| QueueAuctionStageThread
deriving stock (Eq, Ord, Show)
deriving (Pretty) via ViaShow ThreadSort

-- | an event happening in the queueAuctionPhase thread
newtype QueueAuctionPhaseEvent
= ReceivedAuctionSet AuctionTerms
deriving stock (Eq, Ord, Show)
Expand All @@ -84,19 +104,26 @@ instance Pretty QueueAuctionPhaseEvent where
transformer
-}
newtype DelegateError = FrontendNoParse String
deriving stock (Eq, Show, Generic)
deriving stock (Eq, Ord, Show, Generic)
deriving anyclass (FromJSON, ToJSON)

instance Pretty DelegateError where
pretty = \case
FrontendNoParse err -> "Could not parse the input provided by the frontend" <> extraInfo (pretty err)

-- | additional information on a log event
extraInfo :: forall ann. Doc ann -> Doc ann
extraInfo = (line <>) . indent 2

-- | trace a 'DelegateServerLog'
type DelegateTracerT = TracerT DelegateServerLog

-- | like @ServerApp@ but can be used with a transformer
type ServerAppT m = PendingConnection -> m ()

-- | a newtype for deriving Pretty via Show
newtype ViaShow a = ViaShow {unViaShow :: a}
deriving stock (Eq, Ord, Generic)

instance Show a => Pretty (ViaShow a) where
pretty = viaShow . unViaShow

-- | additional information on a log event
extraInfo :: forall ann. Doc ann -> Doc ann
extraInfo = (line <>) . indent 2
56 changes: 35 additions & 21 deletions delegate-app/Main.hs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import Control.Concurrent.STM (
)
import Control.Monad (forever, (>=>))
import Control.Monad.Trans (MonadIO (liftIO), MonadTrans (lift))
import Control.Tracer (Tracer, contramap, stdoutTracer, traceWith)
import Control.Tracer (Tracer, contramap, stdoutTracer)
import Data.Aeson (ToJSON, eitherDecode, encode)
import Data.Maybe (fromMaybe)
import Network.WebSockets (
Expand All @@ -36,7 +36,8 @@ import Network.WebSockets (
import Prettyprinter (Pretty (pretty))

-- Hydra imports
import Hydra.Network (IP)
-- Hydra imports
import Hydra.Network (IP, PortNumber)

-- Plutus imports
import Plutus.V2.Ledger.Api (POSIXTime (..))
Expand All @@ -60,18 +61,18 @@ import HydraAuction.Delegate.Server (
tick
),
DelegateServerLog (
CancelThread,
DelegateError,
DelegateOutput,
FrontendConnected,
FrontendInput,
QueueAuctionPhaseEvent,
StartThread,
Started
Started,
ThreadEvent
),
DelegateTracerT,
QueueAuctionPhaseEvent (ReceivedAuctionSet),
ServerAppT,
ThreadEvent (ThreadCancelled, ThreadStarted),
ThreadSort (DelegateRunnerThread, QueueAuctionStageThread, WebsocketThread),
)
import HydraAuction.OnChain.Common (secondsLeftInInterval, stageToInterval)
Expand Down Expand Up @@ -103,6 +104,7 @@ delegateServerApp pingSecs reqq broadcast pending = do
let encodeSend :: forall a. ToJSON a => a -> IO ()
encodeSend = sendTextData connection . encode

receive :: forall void. IO void
receive = forever $
runWithTracer' tracer $ do
inp <- liftIO $ receiveData connection
Expand All @@ -115,6 +117,7 @@ delegateServerApp pingSecs reqq broadcast pending = do
trace $ FrontendInput req
liftIO . atomically . writeTQueue reqq $ FrontendRequest req

send :: forall void. IO void
send =
forever $
atomically (readTChan broadcast')
Expand All @@ -130,13 +133,14 @@ delegateServerApp pingSecs reqq broadcast pending = do
accordingly
-}
mkRunner ::
forall void.
-- | the time in milliseconds that the runner sleeps between acts
Int ->
-- | the queue of incoming messages
TQueue DelegateInput ->
-- | the broadcast queue of outgoing messages (write only)
TChan DelegateResponse ->
DelegateRunnerT (DelegateTracerT IO) ()
DelegateRunnerT (DelegateTracerT IO) void
-- FIXME: we need to abort at some point but this doesn't seem
-- to be implemented yet so we just go on
mkRunner tick reqq broadcast = forever $ do
Expand Down Expand Up @@ -165,32 +169,33 @@ runDelegateServer conf = do
pure q
delegateBroadCastChan <- liftIO . atomically $ newTChan

let startWorker :: ThreadSort -> IO () -> IO ()
startWorker thread act = do
traceWith tracer $ StartThread thread
act
traceWith tracer $ CancelThread thread

wsServer, delegateRunner, queueAuctionPhases :: IO ()
let wsServer, delegateRunner, queueAuctionPhases :: IO ()
wsServer =
runServer (show $ host conf) (fromIntegral $ port conf) $
runWithTracer' tracer . delegateServerApp (ping conf) delegateInputChan delegateBroadCastChan

delegateRunner =
runWithTracer' tracer $ execDelegateRunnerT $ mkRunner (tick conf) delegateInputChan delegateBroadCastChan
runWithTracer' tracer . execDelegateRunnerT $ mkRunner (tick conf) delegateInputChan delegateBroadCastChan

queueAuctionPhases = runWithTracer' tracer $ mbQueueAuctionPhases delegateInputChan delegateBroadCastChan
queueAuctionPhases = runWithTracer' tracer $ mbQueueAuctionPhases (tick conf) delegateInputChan delegateBroadCastChan

liftIO $
mapConcurrently_
(uncurry startWorker)
(runWithTracer' tracer . uncurry startWorker)
[ (WebsocketThread, wsServer)
, (QueueAuctionStageThread, queueAuctionPhases)
, (DelegateRunnerThread, delegateRunner)
]

mbQueueAuctionPhases :: TQueue DelegateInput -> TChan DelegateResponse -> DelegateTracerT IO ()
mbQueueAuctionPhases reqq broadcast = do
-- | start a worker and log it
startWorker :: ThreadSort -> IO () -> DelegateTracerT IO ()
startWorker thread act = do
trace $ ThreadEvent ThreadStarted thread
liftIO act
trace $ ThreadEvent ThreadCancelled thread

mbQueueAuctionPhases :: Int -> TQueue DelegateInput -> TChan DelegateResponse -> DelegateTracerT IO ()
mbQueueAuctionPhases tick reqq broadcast = do
terms <- liftIO . atomically $ getTerms =<< dupTChan broadcast
liftIO $ queueStages terms
traceQueueEvent terms
Expand All @@ -213,9 +218,12 @@ mbQueueAuctionPhases reqq broadcast = do
case mSecsLeft of
Nothing -> pure ()
Just s -> do
threadDelay (fromInteger s * 1000)
threadDelay (fromInteger s * tick)
queueStages terms

{- | start a delegate server at a @$PORT@, it accepts incoming websocket connections and
runs the delegate
-}
main :: IO ()
main = do
port <- lookupEnv "PORT"
Expand All @@ -224,9 +232,9 @@ main = do
conf =
DelegateServerConfig
{ host
, port = fromMaybe 8080 $ port >>= readMaybe
, port = fromMaybe portDefault $ port >>= readMaybe
, tick = tick
, ping = 30
, ping = ping
}
runWithTracer' tracer $ runDelegateServer conf
where
Expand All @@ -238,3 +246,9 @@ main = do

tick :: Int
tick = 1_000

ping :: Int
ping = 30

portDefault :: PortNumber
portDefault = 8080

0 comments on commit f46bd78

Please sign in to comment.