Fix logging for streaming and updateMany
jatcwang committed Jul 16, 2024
1 parent d8a2afd commit a78f0b4
Showing 24 changed files with 1,176 additions and 359 deletions.
6 changes: 4 additions & 2 deletions build.sbt
@@ -32,7 +32,7 @@ lazy val weaverVersion = "0.8.4"
ThisBuild / tlBaseVersion := "1.0"
ThisBuild / tlCiReleaseBranches := Seq("main") // publish snapshots on `main`
ThisBuild / tlCiScalafmtCheck := true
- ThisBuild / scalaVersion := scala3Version
+ ThisBuild / scalaVersion := scala213Version
ThisBuild / crossScalaVersions := Seq(scala212Version, scala213Version, scala3Version)
ThisBuild / developers += tlGitHubDev("tpolecat", "Rob Norris")
ThisBuild / tlSonatypeUseLegacyHost := false
@@ -239,7 +239,9 @@ lazy val core = project
"com.chuusai" %% "shapeless" % shapelessVersion
).filterNot(_ => tlIsScala3.value) ++ Seq(
"org.tpolecat" %% "typename" % "1.1.0",
"com.h2database" % "h2" % h2Version % "test"
"com.h2database" % "h2" % h2Version % "test",
"org.postgresql" % "postgresql" % postgresVersion % "test",
"org.mockito" % "mockito-core" % "5.12.0" % Test
),
scalacOptions += "-Yno-predef",
Compile / unmanagedSourceDirectories += {
2 changes: 1 addition & 1 deletion modules/bench/src/main/scala/doobie/bench/text.scala
@@ -26,7 +26,7 @@ class text {

def naive(n: Int): ConnectionIO[Int] =
ddl *> HC.prepareStatement("insert into bench_person (name, age) values (?, ?)")(
- people(n).foldRight(HPS.executeBatch)((p, k) => HPS.set(p) *> HPS.addBatch *> k)
+ people(n).foldRight(HPS.executeBatch)((p, k) => HPS.set(p) *> FPS.addBatch *> k)
).map(_.combineAll)

def optimized(n: Int): ConnectionIO[Int] =
252 changes: 234 additions & 18 deletions modules/core/src/main/scala/doobie/hi/connection.scala
@@ -6,7 +6,8 @@ package doobie.hi

import cats.Foldable
import cats.data.Ior
- import cats.effect.kernel.syntax.monadCancel._
+ import cats.effect.Sync
+ import cats.effect.syntax.monadCancel._
import cats.syntax.all._
import doobie.enumerated.AutoGeneratedKeys
import doobie.enumerated.Holdability
@@ -19,33 +19,208 @@ import doobie.util.analysis.ColumnMeta
import doobie.util.analysis.ParameterMeta
import doobie.util.compat.propertiesToScala
import doobie.util.stream.repeatEvalChunks
+ import doobie.util.log.{LogEvent, LoggingInfo}
import doobie.util.{Get, Put, Read, Write}
import fs2.Stream
- import fs2.Stream.{eval, bracket}
+ import doobie.hi.{preparedstatement => IHPS}
import doobie.free.{
- preparedstatement => IFPS,
+ callablestatement => IFCS,
connection => IFC,
- resultset => IFRS,
databasemetadata => IFDMD,
- statement => IFS,
- callablestatement => IFCS
+ preparedstatement => IFPS,
+ resultset => IFRS,
+ statement => IFS
}

- import java.sql.{Savepoint, PreparedStatement, ResultSet}
import scala.Predef._
+ import scala.concurrent.duration.{Duration, FiniteDuration, NANOSECONDS}
+ import java.sql.{PreparedStatement, ResultSet, Savepoint}
import scala.collection.immutable.Map

/** Module of high-level constructors for `ConnectionIO` actions.
*
* @group Modules
*/

object connection {

import implicits._

/** @group Lifting */
def delay[A](a: => A): ConnectionIO[A] =
IFC.delay(a)

/** Create and execute a PreparedStatement and then process the ResultSet. This generalizes two query patterns:
*
* - Executing a normal SELECT query
* - Executing an update and using withGeneratedKeys/RETURNING to return some rows
*
* In both cases, a ResultSet is returned and needs to be processed.
*
* Errors at each step are handled and logged, with cleanup (closing the PreparedStatement/ResultSet).
*
* For usage examples, see [[doobie.util.query.Query.to]] or [[doobie.util.update.Update.withUniqueGeneratedKeys]],
* which use this function.
*
* @param create
* Create the PreparedStatement, using e.g. `doobie.FC.prepareStatement`
* @param prep
* Prepare steps before execution, such as setting parameters
* @param exec
How to execute the PreparedStatement, e.g. `doobie.FPS.executeQuery`
* @param process
* Process steps for the ResultSet
* @param loggingInfo
* Information for logging
*/
def executeWithResultSet[A](
create: ConnectionIO[PreparedStatement],
prep: PreparedStatementIO[Unit],
exec: PreparedStatementIO[ResultSet],
process: ResultSetIO[A],
loggingInfo: LoggingInfo
): ConnectionIO[A] =
execImpl(
create,
prep,
Right((exec, process)),
loggingInfo
)
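
// Hypothetical usage sketch (not part of this commit): running a SELECT through
// executeWithResultSet. The SQL, parameter, and label values are illustrative,
// and the LoggingInfo/Parameters field names are assumed from doobie.util.log.
def selectNamesSketch: ConnectionIO[List[String]] =
  executeWithResultSet(
    create = IFC.prepareStatement("SELECT name FROM person WHERE age > ?"),
    prep = IHPS.set(18),
    exec = IFPS.executeQuery,
    process = resultset.build[List, String],
    loggingInfo = LoggingInfo(
      sql = "SELECT name FROM person WHERE age > ?",
      params = doobie.util.log.Parameters.NonBatch(List(18)),
      label = "select_person_names"
    )
  )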

/** Create and execute a PreparedStatement which immediately returns the result without reading from a ResultSet. The
* most common case is executing an INSERT/UPDATE and returning the number of rows inserted/updated. If the query
* you're executing returns a ResultSet, use `executeWithResultSet` instead for better logging and resource cleanup.
*
* Errors at each step are handled and logged, with the PreparedStatement being closed at the end.
*
* For usage examples, see [[doobie.util.update.Update.updateMany]], which is built on this function.
* @param create
* Create the PreparedStatement, using e.g. `doobie.FC.prepareStatement`
* @param prep
* Prepare steps before execution, such as setting parameters
* @param exec
How the PreparedStatement will be executed, e.g. `doobie.FPS.executeUpdate`
* @param loggingInfo
* Information for logging
*/
def executeWithoutResultSet[A](
create: ConnectionIO[PreparedStatement],
prep: PreparedStatementIO[Unit],
exec: PreparedStatementIO[A],
loggingInfo: LoggingInfo
): ConnectionIO[A] =
execImpl(
create,
prep,
Left(exec),
loggingInfo
)
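
// Hypothetical usage sketch (not part of this commit): a batch INSERT run via
// executeBatch, returning the update counts directly with no ResultSet to read.
// The Parameters.Batch thunk shape is assumed from doobie.util.log.
def insertPeopleSketch(names: List[String]): ConnectionIO[Array[Int]] =
  executeWithoutResultSet(
    create = IFC.prepareStatement("INSERT INTO person (name) VALUES (?)"),
    prep = IHPS.addBatches(names),
    exec = IFPS.executeBatch,
    loggingInfo = LoggingInfo(
      sql = "INSERT INTO person (name) VALUES (?)",
      params = doobie.util.log.Parameters.Batch(() => names.map(List(_))),
      label = "insert_person_batch"
    )
  )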

private def execImpl[A](
create: ConnectionIO[PreparedStatement],
prep: PreparedStatementIO[Unit],
execAndProcess: Either[PreparedStatementIO[A], (PreparedStatementIO[ResultSet], ResultSetIO[A])],
loggingInfo: LoggingInfo
): ConnectionIO[A] = {
def execAndProcessLogged: PreparedStatementIO[A] = {
execAndProcess match {
case Left(execDirectToResult) =>
attemptTimed(execDirectToResult)
.flatMap {
case (Left(e), execDur) =>
IFPS.performLogging(LogEvent.execFailure(loggingInfo, execDur, e))
.flatMap(_ => IFPS.raiseError(e))
case (Right(a), execDur) =>
IFPS.performLogging(LogEvent.success(loggingInfo, execDur, Duration.Zero))
.as(a)
}
case Right((execIO, rsIO)) =>
attemptTimed(execIO)
.flatMap {
case (Left(e), execDur) =>
IFPS.performLogging(LogEvent.execFailure(loggingInfo, execDur, e))
.flatMap(_ => IFPS.raiseError(e))
case (Right(resultSet), execDur) =>
IFPS.pure(resultSet).bracket(
IFPS.embed(_, processLogged(rsIO, execDur))
)(
IFPS.embed(_, IFRS.close)
)
}
}
}

def processLogged(
process: ResultSetIO[A],
execDuration: FiniteDuration
): ResultSetIO[A] = {
attemptTimed(process)
.flatMap {
case (Left(e), processDuration) =>
IFRS.performLogging(LogEvent.processingFailure(loggingInfo, execDuration, processDuration, e))
.flatMap(_ => IFRS.raiseError[A](e))
case (Right(a), processDuration) =>
IFRS.performLogging(LogEvent.success(loggingInfo, execDuration, processDuration))
.as(a)
}
}

val createLogged = create.onError { case e =>
IFC.performLogging(LogEvent.execFailure(loggingInfo, Duration.Zero, e))
}

val prepLogged = prep.onError { case e =>
IFPS.performLogging(LogEvent.execFailure(loggingInfo, Duration.Zero, e))
}

createLogged
.bracket(ps => IFC.embed(ps, prepLogged *> execAndProcessLogged))(ps => IFC.embed(ps, IFPS.close))
}
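
// Note (summary, not part of this commit): the outer bracket above guarantees the
// PreparedStatement is closed even if prep, exec, or processing fails, and the
// inner bracket in the ResultSet branch closes the ResultSet before the statement
// is released. Each failure path logs an exec/processing failure event before
// re-raising the error, so a failed run is logged exactly once.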

/** Execute a PreparedStatement query and provide rows from the ResultSet in chunks.
*
* For usage examples, see [[doobie.util.query.Query.streamWithChunkSize]], which is implemented on top of this
* function.
* @param create
* Create the PreparedStatement, using e.g. `doobie.FC.prepareStatement`
* @param prep
* Prepare steps before execution, such as setting parameters
* @param exec
* How the PreparedStatement will be executed, e.g. `doobie.FPS.executeQuery`
* @param chunkSize
* Fetch size to hint to the JDBC driver
* @param loggingInfo
* Logging information
*/
def stream[A: Read](
create: ConnectionIO[PreparedStatement],
prep: PreparedStatementIO[Unit],
exec: PreparedStatementIO[ResultSet],
chunkSize: Int,
loggingInfo: LoggingInfo
): Stream[ConnectionIO, A] = {
val execLogged: PreparedStatementIO[ResultSet] =
attemptTimed(exec)
.flatMap {
case (Left(e), execDur) =>
IFPS.performLogging(LogEvent.execFailure(loggingInfo, execDur, e))
.flatMap(_ => IFPS.raiseError(e))
case (Right(resultSet), execDur) =>
IFPS.performLogging(LogEvent.success(loggingInfo, execDur, Duration.Zero))
.as(resultSet)
}

for {
ps <- Stream.bracket(runPreExecWithLogging(create, loggingInfo))(IFC.embed(_, IFPS.close))
_ <- Stream.eval(runPreExecWithLogging(IFC.embed(ps, IFPS.setFetchSize(chunkSize) *> prep), loggingInfo))
resultSet <- Stream.bracket(
IFC.embed(ps, execLogged)
)(rs => IFC.embed(rs, IFRS.close))
ele <- repeatEvalChunks(IFC.embed(resultSet, resultset.getNextChunk[A](chunkSize)))
} yield ele
}
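
// Hypothetical usage sketch (not part of this commit): streaming rows in chunks
// of 64 through the logging-aware stream above. LoggingInfo field names are
// assumed from doobie.util.log.
def streamNamesSketch: Stream[ConnectionIO, String] =
  stream[String](
    create = IFC.prepareStatement("SELECT name FROM person"),
    prep = IFPS.unit,
    exec = IFPS.executeQuery,
    chunkSize = 64,
    loggingInfo = LoggingInfo(
      sql = "SELECT name FROM person",
      params = doobie.util.log.Parameters.NonBatch(Nil),
      label = "stream_person_names"
    )
  )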

// Old implementation, used by deprecated methods
private def liftStream[A: Read](
chunkSize: Int,
create: ConnectionIO[PreparedStatement],
@@ -54,7 +230,7 @@
): Stream[ConnectionIO, A] = {

def prepared(ps: PreparedStatement): Stream[ConnectionIO, PreparedStatement] =
- eval[ConnectionIO, PreparedStatement] {
+ Stream.eval[ConnectionIO, PreparedStatement] {
val fs = IFPS.setFetchSize(chunkSize)
IFC.embed(ps, fs *> prep).map(_ => ps)
}
@@ -63,46 +239,86 @@
repeatEvalChunks(IFC.embed(rs, resultset.getNextChunk[A](chunkSize)))

val preparedStatement: Stream[ConnectionIO, PreparedStatement] =
- bracket(create)(IFC.embed(_, IFPS.close)).flatMap(prepared)
+ Stream.bracket(create)(IFC.embed(_, IFPS.close)).flatMap(prepared)

- def results(ps: PreparedStatement): Stream[ConnectionIO, A] =
- bracket(IFC.embed(ps, exec))(IFC.embed(_, IFRS.close)).flatMap(unrolled)
+ def results(ps: PreparedStatement): Stream[ConnectionIO, A] = {
+ Stream.bracket(IFC.embed(ps, exec))(IFC.embed(_, IFRS.close)).flatMap(unrolled)
+ }

preparedStatement.flatMap(results)

}

private def runPreExecWithLogging[A](connio: ConnectionIO[A], loggingInfo: LoggingInfo): ConnectionIO[A] = {
connio.onError { case e =>
// Duration is zero because we haven't actually gotten to "executing the SQL" yet
IFC.performLogging(
LogEvent.execFailure(loggingInfo, Duration.Zero, e)
)
}
}

private def calcDuration(a: Long, b: Long): FiniteDuration = FiniteDuration((a - b).abs, NANOSECONDS)

private def attemptTimed[F[_]: Sync, A](fa: F[A]): F[(Either[Throwable, A], FiniteDuration)] = {
for {
start <- Sync[F].delay(System.nanoTime())
res <- fa.attempt
end <- Sync[F].delay(System.nanoTime())
} yield (res, calcDuration(start, end))
}
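
// For example, attemptTimed(IFPS.executeQuery) above yields both the outcome and
// the wall-clock execution time that feeds the LogEvents. calcDuration takes the
// absolute difference of the two nanoTime readings, so it never goes negative.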

/** Construct a prepared statement from the given `sql`, configure it with the given `PreparedStatementIO` action, and
* return results via a `Stream`.
*
* @group Prepared Statements
*/
@deprecated("Use the other stream which supports logging", "1.0.0-RC6")
def stream[A: Read](sql: String, prep: PreparedStatementIO[Unit], chunkSize: Int): Stream[ConnectionIO, A] =
liftStream(chunkSize, IFC.prepareStatement(sql), prep, IFPS.executeQuery)

/** Construct a prepared update statement with the given return columns (and readable destination type `A`) and sql
* source, configure it with the given `PreparedStatementIO` action, and return the generated key results via a
* `Stream`.
*
* @group Prepared Statements
*/
+ @deprecated(
+ "Consider using Update#withGeneratedKeys or " +
+ "doobie.HC.stream if you need more customization",
+ "1.0.0-RC6")
def updateWithGeneratedKeys[A: Read](cols: List[String])(
sql: String,
prep: PreparedStatementIO[Unit],
chunkSize: Int
): Stream[ConnectionIO, A] =
- liftStream(chunkSize, IFC.prepareStatement(sql, cols.toArray), prep, IFPS.executeUpdate *> IFPS.getGeneratedKeys)
+ liftStream(
+ chunkSize = chunkSize,
+ create = IFC.prepareStatement(sql, cols.toArray),
+ prep = prep,
+ exec = IFPS.executeUpdate *> IFPS.getGeneratedKeys
+ )
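
// Migration sketch (assumption, not part of this commit): the non-deprecated
// route goes through the Update API, e.g.
//   doobie.util.update.Update[String]("INSERT INTO person (name) VALUES (?)")
//     .withGeneratedKeys[Int]("id")("alice")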

/** @group Prepared Statements */
- def updateManyWithGeneratedKeys[F[_]: Foldable, A: Write, B: Read](cols: List[String])(
+ @deprecated(
+ "Consider using Update#updateManyWithGeneratedKeys or " +
+ "doobie.HC.stream if you need more customization",
+ "1.0.0-RC6")
+ def updateManyWithGeneratedKeys[F[_]: Foldable, A: Write, B: Read](
+ cols: List[String]
+ )(
sql: String,
prep: PreparedStatementIO[Unit],
fa: F[A],
chunkSize: Int
- ): Stream[ConnectionIO, B] =
+ ): Stream[ConnectionIO, B] = {
liftStream[B](
- chunkSize,
- IFC.prepareStatement(sql, cols.toArray),
- prep,
- IHPS.addBatchesAndExecute(fa) *> IFPS.getGeneratedKeys)
+ chunkSize = chunkSize,
+ create = IFC.prepareStatement(sql, cols.toArray),
+ prep = prep,
+ exec = IHPS.addBatchesAndExecute(fa) *> IFPS.getGeneratedKeys
+ )
+ }
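
// Migration sketch (assumption, not part of this commit): the Update equivalent
// of the method above, e.g.
//   doobie.util.update.Update[String]("INSERT INTO person (name) VALUES (?)")
//     .updateManyWithGeneratedKeys[Int]("id")(List("alice", "bob"))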

/** @group Transaction Control */
val commit: ConnectionIO[Unit] =
