diff --git a/hasql.cabal b/hasql.cabal index c7c2719..0ea4f23 100644 --- a/hasql.cabal +++ b/hasql.cabal @@ -31,6 +31,7 @@ common base NoMonomorphismRestriction Arrows BangPatterns + BlockArguments ConstraintKinds DataKinds DefaultSignatures @@ -82,6 +83,7 @@ library Hasql.Connection Hasql.Decoders Hasql.Encoders + Hasql.Pipeline Hasql.Session Hasql.Statement @@ -101,6 +103,7 @@ library Hasql.Encoders.Value Hasql.Errors Hasql.IO + Hasql.Pipeline.Core Hasql.PostgresTypeInfo Hasql.Prelude Hasql.PreparedStatementRegistry diff --git a/library/Hasql/Decoders/Result.hs b/library/Hasql/Decoders/Result.hs index 8bd3d55..434bbe9 100644 --- a/library/Hasql/Decoders/Result.hs +++ b/library/Hasql/Decoders/Result.hs @@ -25,6 +25,7 @@ noResult = checkExecStatus $ \case LibPQ.CommandOk -> True LibPQ.TuplesOk -> True + LibPQ.PipelineSync -> True _ -> False {-# INLINE rowsAffected #-} @@ -66,7 +67,11 @@ checkExecStatus predicate = LibPQ.NonfatalError -> serverError LibPQ.FatalError -> serverError LibPQ.EmptyQuery -> return () - _ -> Result $ lift $ ExceptT $ pure $ Left $ UnexpectedResult $ "Unexpected result status: " <> (fromString $ show status) + _ -> unexpectedResult $ "Unexpected result status: " <> (fromString $ show status) + +unexpectedResult :: Text -> Result a +unexpectedResult = + Result . lift . ExceptT . pure . Left . UnexpectedResult {-# INLINE serverError #-} serverError :: Result () diff --git a/library/Hasql/Encoders/Params.hs b/library/Hasql/Encoders/Params.hs index de4cd81..43cd779 100644 --- a/library/Hasql/Encoders/Params.hs +++ b/library/Hasql/Encoders/Params.hs @@ -7,6 +7,13 @@ import Hasql.Prelude import PostgreSQL.Binary.Encoding qualified as B import Text.Builder qualified as E +renderReadable :: Params a -> a -> [Text] +renderReadable (Params (Op encoderOp)) params = + foldr step [] (encoderOp params) + where + step (_, _, _, rendering) acc = + rendering : acc + -- | -- Encoder of some representation of a parameters product. newtype Params a diff --git a/library/Hasql/Pipeline.hs b/library/Hasql/Pipeline.hs new file mode 100644 index 0000000..dce834f --- /dev/null +++ b/library/Hasql/Pipeline.hs @@ -0,0 +1,7 @@ +module Hasql.Pipeline + ( Pipeline, + statement, + ) +where + +import Hasql.Pipeline.Core diff --git a/library/Hasql/Pipeline/Core.hs b/library/Hasql/Pipeline/Core.hs new file mode 100644 index 0000000..440fd20 --- /dev/null +++ b/library/Hasql/Pipeline/Core.hs @@ -0,0 +1,66 @@ +module Hasql.Pipeline.Core where + +import Database.PostgreSQL.LibPQ qualified as Pq +import Hasql.Connection.Core qualified as Connection +import Hasql.Decoders.All qualified as Decoders +import Hasql.Encoders.All qualified as Encoders +import Hasql.Encoders.Params qualified as Encoders.Params +import Hasql.Errors +import Hasql.IO qualified as IO +import Hasql.Prelude +import Hasql.PreparedStatementRegistry qualified as PreparedStatementRegistry +import Hasql.Statement qualified as Statement + +run :: Pipeline a -> Connection.Connection -> IO (Either QueryError a) +run (Pipeline send recv) (Connection.Connection pqConnectionRef integerDatetimes registry) = + withMVar pqConnectionRef \pqConnection -> do + Pq.enterPipelineMode pqConnection + sendResult <- send pqConnection integerDatetimes registry + Pq.pipelineSync pqConnection + recvResult <- recv pqConnection integerDatetimes + Pq.exitPipelineMode pqConnection + pure (sendResult *> recvResult) + +data Pipeline a + = Pipeline + -- | Send commands. + (Pq.Connection -> Bool -> PreparedStatementRegistry.PreparedStatementRegistry -> IO (Either QueryError ())) + -- | Receive results. + (Pq.Connection -> Bool -> IO (Either QueryError a)) + deriving (Functor) + +instance Applicative Pipeline where + pure a = + Pipeline send recv + where + send _ _ _ = + pure (Right ()) + recv _ _ = + pure (Right a) + + Pipeline lSend lRecv <*> Pipeline rSend rRecv = + Pipeline send recv + where + send pqConn idt pReg = do + lSendRes <- lSend pqConn idt pReg + rSendRes <- rSend pqConn idt pReg + pure (lSendRes *> rSendRes) + recv pqConn idt = do + lRecvRes <- lRecv pqConn idt + rRecvRes <- rRecv pqConn idt + pure (lRecvRes <*> rRecvRes) + +statement :: params -> Statement.Statement params result -> Pipeline result +statement params (Statement.Statement template (Encoders.Params paramsEncoder) (Decoders.Result decoder) preparable) = + Pipeline send recv + where + send pqConnection integerDatetimes registry = + mapLeft commandToQueryError + <$> IO.sendParametricStatement pqConnection integerDatetimes registry template paramsEncoder preparable params + + recv pqConnection integerDatetimes = + mapLeft commandToQueryError + <$> IO.getResults pqConnection integerDatetimes decoder + + commandToQueryError = + QueryError template (Encoders.Params.renderReadable paramsEncoder params) diff --git a/library/Hasql/Session/Core.hs b/library/Hasql/Session/Core.hs index f57c9d0..1b7176e 100644 --- a/library/Hasql/Session/Core.hs +++ b/library/Hasql/Session/Core.hs @@ -1,12 +1,14 @@ module Hasql.Session.Core where import Hasql.Connection.Core qualified as Connection +import Hasql.Decoders.All qualified as Decoders import Hasql.Decoders.Result qualified as Decoders.Result import Hasql.Decoders.Results qualified as Decoders.Results import Hasql.Encoders.All qualified as Encoders import Hasql.Encoders.Params qualified as Encoders.Params import Hasql.Errors import Hasql.IO qualified as IO +import Hasql.Pipeline.Core qualified as Pipeline import Hasql.Prelude import Hasql.Statement qualified as Statement @@ -46,20 +48,18 @@ sql sql = -- | -- Parameters and a specification of a parametric single-statement query to apply them to. statement :: params -> Statement.Statement params result -> Session result -statement input (Statement.Statement template (Encoders.Params paramsEncoder) decoder preparable) = +statement input (Statement.Statement template (Encoders.Params paramsEncoder) (Decoders.Result decoder) preparable) = Session $ ReaderT $ \(Connection.Connection pqConnectionRef integerDatetimes registry) -> ExceptT - $ fmap (mapLeft (QueryError template inputReps)) + $ fmap (mapLeft (QueryError template (Encoders.Params.renderReadable paramsEncoder input))) $ withMVar pqConnectionRef $ \pqConnection -> do r1 <- IO.sendParametricStatement pqConnection integerDatetimes registry template paramsEncoder preparable input - r2 <- IO.getResults pqConnection integerDatetimes (unsafeCoerce decoder) + r2 <- IO.getResults pqConnection integerDatetimes decoder return $ r1 *> r2 - where - inputReps = - let Encoders.Params.Params (Op encoderOp) = paramsEncoder - step (_, _, _, rendering) acc = - rendering : acc - in foldr step [] (encoderOp input) + +pipeline :: Pipeline.Pipeline result -> Session result +pipeline pipeline = + Session $ ReaderT $ ExceptT . Pipeline.run pipeline