
Commit

Fix logging for streaming and updateMany
jatcwang committed Jul 9, 2024
1 parent c870889 commit 48a8d3b
Showing 15 changed files with 882 additions and 293 deletions.
2 changes: 1 addition & 1 deletion modules/bench/src/main/scala/doobie/bench/text.scala
@@ -27,7 +27,7 @@ class text {

def naive(n: Int): ConnectionIO[Int] =
ddl *> HC.prepareStatement("insert into bench_person (name, age) values (?, ?)")(
people(n).foldRight(HPS.executeBatch)((p, k) => HPS.set(p) *> HPS.addBatch *> k)
people(n).foldRight(HPS.executeBatch)((p, k) => HPS.set(p) *> FPS.addBatch *> k)
).map(_.combineAll)

def optimized(n: Int): ConnectionIO[Int] =
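For context: the only change above swaps the now-deprecated high-level `HPS.addBatch` for the free-level `FPS.addBatch`. A minimal sketch of the same batching fold outside the bench harness, assuming the standard doobie aliases (`HC`, `HPS`, `FPS`) and the `bench_person` table:

import cats.syntax.all._
import doobie._
import doobie.implicits._

// Sketch only: set each row's parameters, enqueue the row with the
// free-level addBatch, and execute the whole batch once at the end of
// the fold; combineAll sums the per-statement update counts.
def insertPeople(people: List[(String, Int)]): ConnectionIO[Int] =
  HC.prepareStatement("insert into bench_person (name, age) values (?, ?)")(
    people.foldRight(HPS.executeBatch)((p, k) => HPS.set(p) *> FPS.addBatch *> k)
  ).map(_.combineAll)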
200 changes: 180 additions & 20 deletions modules/core/src/main/scala/doobie/hi/connection.scala
@@ -6,7 +6,7 @@ package doobie.hi

import cats.Foldable
import cats.data.Ior
import cats.effect.kernel.syntax.monadCancel._
import cats.effect.syntax.monadCancel._
import cats.syntax.all._
import doobie.enumerated.AutoGeneratedKeys
import doobie.enumerated.Holdability
@@ -19,20 +19,15 @@ import doobie.util.analysis.ColumnMeta
import doobie.util.analysis.ParameterMeta
import doobie.util.compat.propertiesToScala
import doobie.util.stream.repeatEvalChunks
import doobie.util.{ Get, Put, Read, Write }
import doobie.util.log.{LogEvent, LoggingInfo}
import doobie.util.{Get, Put, Read, Write}
import fs2.Stream
import fs2.Stream.{ eval, bracket }
import doobie.hi.{preparedstatement => IHPS}
import doobie.free.{
preparedstatement => IFPS,
connection => IFC,
resultset => IFRS,
databasemetadata => IFDMD,
statement => IFS,
callablestatement => IFCS
}
import doobie.free.{callablestatement => IFCS, connection => IFC, databasemetadata => IFDMD, preparedstatement => IFPS, resultset => IFRS, statement => IFS}

import java.sql.{ Savepoint, PreparedStatement, ResultSet }
import scala.Predef._
import scala.concurrent.duration.{Duration, FiniteDuration, NANOSECONDS}
import java.sql.{PreparedStatement, ResultSet, Savepoint}
import scala.collection.immutable.Map

/**
Expand All @@ -47,14 +42,155 @@ object connection {
def delay[A](a: => A): ConnectionIO[A] =
IFC.delay(a)

private def runPreExecWithLogging[A](connio: ConnectionIO[A], loggingInfo: LoggingInfo): ConnectionIO[A] = {
connio.onError { case e =>
// Duration is zero because we haven't actually gotten to "executing the SQL" yet
IFC.performLogging(
LogEvent.execFailure(loggingInfo, Duration.Zero, e)
)
}
}

private def calcDuration(a: Long, b: Long): FiniteDuration = FiniteDuration((a - b).abs, NANOSECONDS)

def attemptTimed[A](psio: PreparedStatementIO[A]): PreparedStatementIO[(Either[Throwable, A], FiniteDuration)] = {
for {
start <- IFPS.delay(System.nanoTime())
res <- psio.attempt
end <- IFPS.delay(System.nanoTime())
} yield (res, calcDuration(start, end))
}

def executeWithResultSet[A](
create: ConnectionIO[PreparedStatement],
prep: PreparedStatementIO[Unit],
exec: PreparedStatementIO[ResultSet],
process: ResultSetIO[A],
loggingInfo: LoggingInfo
): ConnectionIO[A] =
execImpl(
create,
prep,
Right((exec, process)),
loggingInfo
)

def executeWithoutResultSet[A](
create: ConnectionIO[PreparedStatement],
prep: PreparedStatementIO[Unit],
exec: PreparedStatementIO[A],
loggingInfo: LoggingInfo
): ConnectionIO[A] =
execImpl(
create,
prep,
Left(exec),
loggingInfo
)

private def execImpl[A](
create: ConnectionIO[PreparedStatement],
prep: PreparedStatementIO[Unit],
execAndProcess: Either[PreparedStatementIO[A], (PreparedStatementIO[ResultSet], ResultSetIO[A])],
loggingInfo: LoggingInfo
): ConnectionIO[A] = {
def execAndProcessLogged: PreparedStatementIO[A] = {
execAndProcess match {
case Left(execToResult) =>
attemptTimed(execToResult)
.flatMap {
case (Left(e), execDur) =>
IFPS.performLogging(LogEvent.execFailure(loggingInfo, execDur, e))
.flatMap(_ => IFPS.raiseError(e))
case (Right(a), execDur) =>
IFPS.performLogging(LogEvent.success(loggingInfo, execDur, Duration.Zero))
.as(a)
}
case Right((execIO, resIO)) =>
attemptTimed(execIO)
.flatMap {
case (Left(e), execDur) =>
IFPS.performLogging(LogEvent.execFailure(loggingInfo, execDur, e))
.flatMap(_ => IFPS.raiseError(e))
case (Right(resultSet), execDur) =>
IFPS.pure(resultSet).bracket(
IFPS.embed(_, processLogged(resIO, execDur))
)(
IFPS.embed(_, IFRS.close)
)
}
}
}

def processLogged(
process: ResultSetIO[A],
execDuration: FiniteDuration
): ResultSetIO[A] = {
for {
start <- IFRS.delay(System.nanoTime())
res <- process.attempt
end <- IFRS.delay(System.nanoTime())
processDuration = calcDuration(start, end)
a <- res match {
case Left(e) =>
IFRS.performLogging(LogEvent.processingFailure(loggingInfo, execDuration, processDuration, e))
.flatMap(_ => IFRS.raiseError[A](e))
case Right(a) =>
IFRS.performLogging(LogEvent.success(loggingInfo, execDuration, processDuration))
.as(a)
}
} yield a
}

val createLogged = create.onError { case e =>
IFC.performLogging(LogEvent.execFailure(loggingInfo, Duration.Zero, e))
}

val prepLogged = prep.onError { case e =>
IFPS.performLogging(LogEvent.execFailure(loggingInfo, Duration.Zero, e))
}

createLogged
.bracket(ps => IFC.embed(ps, prepLogged *> execAndProcessLogged))(ps => IFC.embed(ps, IFPS.close))
}

// FIXME: test PS and RS are correctly closed when there's exec error or processing error
def stream[A: Read](
chunkSize: Int,
create: ConnectionIO[PreparedStatement],
prep: PreparedStatementIO[Unit],
exec: PreparedStatementIO[ResultSet],
loggingInfo: LoggingInfo
): Stream[ConnectionIO, A] = {
val execLogged: PreparedStatementIO[(ResultSet, FiniteDuration)] =
attemptTimed(exec)
.flatMap {
case (Left(e), execDur) =>
IFPS.performLogging(LogEvent.execFailure(loggingInfo, execDur, e))
.flatMap(_ => IFPS.raiseError(e))
case (Right(resultSet), execDur) =>
IFPS.performLogging(LogEvent.success(loggingInfo, execDur, Duration.Zero))
.as((resultSet, execDur))
}

for {
ps <- Stream.bracket(runPreExecWithLogging(create, loggingInfo))(IFC.embed(_, IFPS.close))
_ <- Stream.eval(runPreExecWithLogging(IFC.embed(ps, IFPS.setFetchSize(chunkSize) *> prep), loggingInfo))
rsAndExecDuration <- Stream.bracket(
IFC.embed(ps, execLogged)
){ case (rs, _) => IFC.embed(rs, IFRS.close) }
(resultSet, execDuration) = rsAndExecDuration
ele <- repeatEvalChunks(IFC.embed(resultSet, resultset.getNextChunk[A](chunkSize)))
} yield ele
}

private def liftStream[A: Read](
chunkSize: Int,
create: ConnectionIO[PreparedStatement],
prep: PreparedStatementIO[Unit],
exec: PreparedStatementIO[ResultSet]): Stream[ConnectionIO, A] = {

def prepared(ps: PreparedStatement): Stream[ConnectionIO, PreparedStatement] =
eval[ConnectionIO, PreparedStatement] {
Stream.eval[ConnectionIO, PreparedStatement] {
val fs = IFPS.setFetchSize(chunkSize)
IFC.embed(ps, fs *> prep).map(_ => ps)
}
@@ -63,10 +199,11 @@
repeatEvalChunks(IFC.embed(rs, resultset.getNextChunk[A](chunkSize)))

val preparedStatement: Stream[ConnectionIO, PreparedStatement] =
bracket(create)(IFC.embed(_, IFPS.close)).flatMap(prepared)
Stream.bracket(create)(IFC.embed(_, IFPS.close)).flatMap(prepared)

def results(ps: PreparedStatement): Stream[ConnectionIO, A] =
bracket(IFC.embed(ps, exec))(IFC.embed(_, IFRS.close)).flatMap(unrolled)
def results(ps: PreparedStatement): Stream[ConnectionIO, A] = {
Stream.bracket(IFC.embed(ps, exec))(IFC.embed(_, IFRS.close)).flatMap(unrolled)
}

preparedStatement.flatMap(results)

@@ -77,6 +214,7 @@
* action, and return results via a `Stream`.
* @group Prepared Statements
*/
@deprecated("Use the other stream which supports logging", "1.0.0-RC6")
def stream[A: Read](sql: String, prep: PreparedStatementIO[Unit], chunkSize: Int): Stream[ConnectionIO, A] =
liftStream(chunkSize, IFC.prepareStatement(sql), prep, IFPS.executeQuery)

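To make the migration concrete, a hedged before/after sketch against a hypothetical `person` table. The `LoggingInfo` constructor arguments below (sql, params, label) are an assumption based on this PR's usage; adjust to the actual case class if it differs:

import doobie._
import doobie.implicits._
import doobie.util.log.{LoggingInfo, Parameters}

val sql = "select name from person where age > ?"

// Deprecated shape: no log events are emitted while streaming.
val unlogged: fs2.Stream[ConnectionIO, String] =
  HC.stream[String](sql, HPS.set(18), chunkSize = 512)

// Logging-aware overload added in this commit: execution success or
// failure is reported as a LogEvent before rows are streamed.
val logged: fs2.Stream[ConnectionIO, String] =
  HC.stream[String](
    chunkSize = 512,
    create = FC.prepareStatement(sql),
    prep = HPS.set(18),
    exec = FPS.executeQuery,
    loggingInfo = LoggingInfo(sql, Parameters.NonBatch(List(18)), "person.byAge") // assumed shape
  )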
@@ -87,12 +225,34 @@
* `Stream`.
* @group Prepared Statements
*/
def updateWithGeneratedKeys[A: Read](cols: List[String])(sql: String, prep: PreparedStatementIO[Unit], chunkSize: Int): Stream[ConnectionIO, A] =
liftStream(chunkSize, IFC.prepareStatement(sql, cols.toArray), prep, IFPS.executeUpdate *> IFPS.getGeneratedKeys)
@deprecated("Consider using Update#withGeneratedKeys or " +
"doobie.HC.stream if you need more customization", "1.0.0-RC6")
def updateWithGeneratedKeys[A: Read](cols: List[String])(sql: String, prep: PreparedStatementIO[Unit], chunkSize: Int): Stream[ConnectionIO, A] =
liftStream(
chunkSize = chunkSize,
create = IFC.prepareStatement(sql, cols.toArray),
prep = prep,
exec = IFPS.executeUpdate *> IFPS.getGeneratedKeys
)

/** @group Prepared Statements */
def updateManyWithGeneratedKeys[F[_]: Foldable, A: Write, B: Read](cols: List[String])(sql: String, prep: PreparedStatementIO[Unit], fa: F[A], chunkSize: Int): Stream[ConnectionIO, B] =
liftStream[B](chunkSize, IFC.prepareStatement(sql, cols.toArray), prep, IHPS.addBatchesAndExecute(fa) *> IFPS.getGeneratedKeys)
@deprecated("Consider using Update#updateManyWithGeneratedKeys or " +
"doobie.HC.stream if you need more customization", "1.0.0-RC6")
def updateManyWithGeneratedKeys[F[_]: Foldable, A: Write, B: Read](
cols: List[String]
)(
sql: String,
prep: PreparedStatementIO[Unit],
fa: F[A],
chunkSize: Int
): Stream[ConnectionIO, B] = {
liftStream[B](
chunkSize = chunkSize,
create = IFC.prepareStatement(sql, cols.toArray),
prep = prep,
exec = IHPS.addBatchesAndExecute(fa) *> IFPS.getGeneratedKeys
)
}

/** @group Transaction Control */
val commit: ConnectionIO[Unit] =
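A sketch of the two new execute entry points above, wiring statement creation, parameter setting, execution, and result-set processing into a single logged action. As before, the `LoggingInfo` shape and the `person` table are assumptions:

import doobie._
import doobie.implicits._
import doobie.util.log.{LoggingInfo, Parameters}

val q = "select age from person where name = ?"

// Failures during exec log LogEvent.execFailure; failures while reading
// the ResultSet log LogEvent.processingFailure with both durations.
val ageOfAlice: ConnectionIO[Option[Int]] =
  HC.executeWithResultSet(
    create = FC.prepareStatement(q),
    prep = HPS.set("alice"),
    exec = FPS.executeQuery,
    process = HRS.getOption[Int],
    loggingInfo = LoggingInfo(q, Parameters.NonBatch(List("alice")), "person.ageByName") // assumed shape
  )

// For statements that produce no ResultSet, only the execution phase is
// timed and logged.
val bumped: ConnectionIO[Int] =
  HC.executeWithoutResultSet(
    create = FC.prepareStatement("update person set age = age + 1"),
    prep = FPS.unit,
    exec = FPS.executeUpdate,
    loggingInfo = LoggingInfo("update person set age = age + 1", Parameters.NonBatch(Nil), "person.birthday") // assumed shape
  )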
66 changes: 46 additions & 20 deletions modules/core/src/main/scala/doobie/hi/preparedstatement.scala
@@ -5,7 +5,7 @@
package doobie.hi

import doobie.enumerated.JdbcType
import doobie.util.{ Get, Put }
import doobie.util.{Get, Put}
import doobie.enumerated.ColumnNullable
import doobie.enumerated.ParameterNullable
import doobie.enumerated.ParameterMode
@@ -14,27 +14,19 @@ import doobie.enumerated.Nullability.NullabilityKnown
import doobie.enumerated.FetchDirection
import doobie.enumerated.ResultSetConcurrency
import doobie.enumerated.ResultSetType

import doobie.util.{ Read, Write }
import doobie.util.{Read, Write}
import doobie.util.analysis._
import doobie.util.stream.repeatEvalChunks
import doobie.free.{
preparedstatement => IFPS,
resultset => IFRS
}

import doobie.free.{preparedstatement => IFPS, resultset => IFRS}
import doobie.syntax.align._

import java.sql.{ ParameterMetaData, ResultSetMetaData, SQLWarning }

import scala.Predef.{ intArrayOps, intWrapper }

import java.sql.{ParameterMetaData, ResultSetMetaData, SQLWarning}
import scala.Predef.{intArrayOps, intWrapper}
import cats.Foldable
import cats.syntax.all._
import cats.implicits._
import cats.effect.syntax.monadCancel._
import cats.data.Ior
import cats.effect.kernel.syntax.monadCancel._
import fs2.Stream
import fs2.Stream.bracket

/**
* Module of high-level constructors for `PreparedStatementIO` actions. Batching operations are not
@@ -51,7 +43,7 @@ object preparedstatement {

/** @group Execution */
def stream[A: Read](chunkSize: Int): Stream[PreparedStatementIO, A] =
bracket(IFPS.executeQuery)(IFPS.embed(_, IFRS.close)).flatMap(unrolled[A](_, chunkSize))
Stream.bracket(IFPS.executeQuery)(IFPS.embed(_, IFRS.close)).flatMap(unrolled[A](_, chunkSize))

/**
* Non-strict unit for capturing effects.
@@ -65,6 +57,7 @@
IFPS.executeBatch.map(_.toIndexedSeq.toList) // intArrayOps does not have `toList` in 2.13

/** @group Batching */
@deprecated("Use doobie.FPS.addBatch instead", "1.0.0-RC6")
val addBatch: PreparedStatementIO[Unit] =
IFPS.addBatch

@@ -76,33 +69,66 @@
* See [[https://docs.oracle.com/en/java/javase/11/docs/api/java.sql/java/sql/Statement.html#executeBatch()]] for more information
* @group Batching
*/
@deprecated("Consider using doobie.HC.execute{With/Without}ResultSet" +
"for logging support, or switch to addBatchesAndExecuteUnlogged instead",
"1.0.0-RC6")
def addBatchesAndExecute[F[_]: Foldable, A: Write](fa: F[A]): PreparedStatementIO[Int] =
addBatchesAndExecuteUnlogged(fa)

/**
* Add many sets of parameters and execute as a batch update, returning total rows updated. Note
* that when an error occurs while executing the batch, your JDBC driver may decide to continue executing the
* rest of the batch instead of raising a `BatchUpdateException`.
* Please refer to your JDBC driver's documentation for its exact behaviour.
* See [[https://docs.oracle.com/en/java/javase/11/docs/api/java.sql/java/sql/Statement.html#executeBatch()]] for more information
* @group Batching
*/
def addBatchesAndExecuteUnlogged[F[_]: Foldable, A: Write](fa: F[A]): PreparedStatementIO[Int] =
fa.toList
.foldRight(executeBatch)((a, b) => set(a) *> addBatch *> b)
.foldRight(IFPS.executeBatch)((a, b) => set(a) *> IFPS.addBatch *> b)
.map(_.foldLeft(0)((acc, n) => acc + (n max 0))) // treat negatives (failures) as no rows updated

/**
* Add many sets of parameters.
* @group Batching
*/
def addBatches[F[_]: Foldable, A: Write](fa: F[A]): PreparedStatementIO[Unit] =
fa.toList.foldRight(().pure[PreparedStatementIO])((a, b) => set(a) *> addBatch *> b)
fa.toList.foldRight(().pure[PreparedStatementIO])((a, b) => set(a) *> IFPS.addBatch *> b)

/** @group Execution */
@deprecated("Consider using doobie.HC.execute{With/Without}ResultSet" +
"for logging support, or switch to addBatchesAndExecuteUnlogged instead",
"1.0.0-RC6")
def executeQuery[A](k: ResultSetIO[A]): PreparedStatementIO[A] =
executeQueryUnlogged(k)

def executeQueryUnlogged[A](k: ResultSetIO[A]): PreparedStatementIO[A] =
IFPS.executeQuery.bracket(s => IFPS.embed(s, k))(s => IFPS.embed(s, IFRS.close))

/** @group Execution */
@deprecated("Use doobie.FPS.executeUpdate instead", "1.0.0-RC6")
val executeUpdate: PreparedStatementIO[Int] =
IFPS.executeUpdate

/** @group Execution */
@deprecated("Consider using doobie.HC.execute{With/Without}ResultSet" +
"for logging support, or switch to executeUpdateWithUniqueGeneratedKeysUnlogged instead",
"1.0.0-RC6")
def executeUpdateWithUniqueGeneratedKeys[A: Read]: PreparedStatementIO[A] =
executeUpdate.flatMap(_ => getUniqueGeneratedKeys[A])
executeUpdateWithUniqueGeneratedKeysUnlogged

def executeUpdateWithUniqueGeneratedKeysUnlogged[A: Read]: PreparedStatementIO[A] =
IFPS.executeUpdate.flatMap(_ => getUniqueGeneratedKeys[A])

/** @group Execution */
@deprecated("Consider using doobie.HC.execute{With/Without}ResultSet" +
"for logging support, or switch to executeUpdateWithUniqueGeneratedKeysUnlogged instead",
"1.0.0-RC6")
def executeUpdateWithGeneratedKeys[A: Read](chunkSize: Int): Stream[PreparedStatementIO, A] =
bracket(IFPS.executeUpdate *> IFPS.getGeneratedKeys)(IFPS.embed(_, IFRS.close)).flatMap(unrolled[A](_, chunkSize))
executeUpdateWithGeneratedKeysUnlogged(chunkSize)

def executeUpdateWithGeneratedKeysUnlogged[A: Read](chunkSize: Int): Stream[PreparedStatementIO, A] =
Stream.bracket(IFPS.executeUpdate *> IFPS.getGeneratedKeys)(IFPS.embed(_, IFRS.close)).flatMap(unrolled[A](_, chunkSize))

/**
* Compute the column `JdbcMeta` list for this `PreparedStatement`.
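Finally, a small usage sketch of the new unlogged batch API defined in this file, again assuming the standard aliases and a hypothetical `person` table. Negative per-row driver codes are counted as zero rows updated, per `addBatchesAndExecuteUnlogged` above:

import doobie._
import doobie.implicits._

val rows = List(("alice", 30), ("bob", 25))

// Total rows inserted across the whole batch; no log events are emitted,
// so pair this with doobie.HC.execute* if logging is needed.
val inserted: ConnectionIO[Int] =
  HC.prepareStatement("insert into person (name, age) values (?, ?)")(
    HPS.addBatchesAndExecuteUnlogged(rows)
  )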
(The remaining 12 changed files are not shown.)
