diff --git a/modules/bench/src/main/scala/doobie/bench/text.scala b/modules/bench/src/main/scala/doobie/bench/text.scala
index fae452f6d..f604bccaf 100644
--- a/modules/bench/src/main/scala/doobie/bench/text.scala
+++ b/modules/bench/src/main/scala/doobie/bench/text.scala
@@ -27,7 +27,7 @@ class text {
   def naive(n: Int): ConnectionIO[Int] =
     ddl *> HC.prepareStatement("insert into bench_person (name, age) values (?, ?)")(
-      people(n).foldRight(HPS.executeBatch)((p, k) => HPS.set(p) *> HPS.addBatch *> k)
+      people(n).foldRight(HPS.executeBatch)((p, k) => HPS.set(p) *> FPS.addBatch *> k)
     ).map(_.combineAll)

   def optimized(n: Int): ConnectionIO[Int] =
diff --git a/modules/core/src/main/scala/doobie/hi/connection.scala b/modules/core/src/main/scala/doobie/hi/connection.scala
index 03dd8a926..c69120be2 100644
--- a/modules/core/src/main/scala/doobie/hi/connection.scala
+++ b/modules/core/src/main/scala/doobie/hi/connection.scala
@@ -6,7 +6,7 @@ package doobie.hi

 import cats.Foldable
 import cats.data.Ior
-import cats.effect.kernel.syntax.monadCancel._
+import cats.effect.syntax.monadCancel._
 import cats.syntax.all._
 import doobie.enumerated.AutoGeneratedKeys
 import doobie.enumerated.Holdability
@@ -19,42 +19,215 @@ import doobie.util.analysis.ColumnMeta
 import doobie.util.analysis.ParameterMeta
 import doobie.util.compat.propertiesToScala
 import doobie.util.stream.repeatEvalChunks
-import doobie.util.{ Get, Put, Read, Write }
+import doobie.util.log.{LogEvent, LoggingInfo}
+import doobie.util.{Get, Put, Read, Write}
 import fs2.Stream
-import fs2.Stream.{ eval, bracket }
 import doobie.hi.{preparedstatement => IHPS}
-import doobie.free.{
-  preparedstatement => IFPS,
-  connection => IFC,
-  resultset => IFRS,
-  databasemetadata => IFDMD,
-  statement => IFS,
-  callablestatement => IFCS
-}
+import doobie.free.{callablestatement => IFCS, connection => IFC, databasemetadata => IFDMD, preparedstatement => IFPS, resultset => IFRS, statement => IFS}

-import java.sql.{ Savepoint, PreparedStatement, ResultSet }
+import scala.Predef._
+import scala.concurrent.duration.{Duration, FiniteDuration, NANOSECONDS}
+import java.sql.{PreparedStatement, ResultSet, Savepoint}

 import scala.collection.immutable.Map

 /**
  * Module of high-level constructors for `ConnectionIO` actions.
+ *
  * @group Modules
  */
 object connection {
+  import implicits._

   /** @group Lifting */
   def delay[A](a: => A): ConnectionIO[A] =
     IFC.delay(a)

+  private def runPreExecWithLogging[A](connio: ConnectionIO[A], loggingInfo: LoggingInfo): ConnectionIO[A] = {
+    connio.onError { case e =>
+      // Duration is zero because we haven't actually gotten to "executing the SQL" yet
+      IFC.performLogging(
+        LogEvent.execFailure(loggingInfo, Duration.Zero, e)
+      )
+    }
+  }
+
+  private def calcDuration(a: Long, b: Long): FiniteDuration = FiniteDuration((a - b).abs, NANOSECONDS)
+
+  private def attemptTimed[A](psio: PreparedStatementIO[A]): PreparedStatementIO[(Either[Throwable, A], FiniteDuration)] = {
+    for {
+      start <- IFPS.delay(System.nanoTime())
+      res <- psio.attempt
+      end <- IFPS.delay(System.nanoTime())
+    } yield (res, calcDuration(start, end))
+  }
+
+  /**
+   * Create and execute a PreparedStatement and then process the ResultSet.
+   * This generalizes two common query patterns:
+   *
+   * - Executing a normal SELECT query
+   * - Executing an update and using withGeneratedKeys/RETURNING to return some rows
+   *
+   * In both cases, a ResultSet is returned which needs to be processed.
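+   *
+   * A hypothetical usage sketch (the SQL text, `Read` instance and label below
+   * are illustrative only, not part of this change):
+   * {{{
+   *   executeWithResultSet(
+   *     create = IFC.prepareStatement("SELECT name FROM person"),
+   *     prep = IFPS.unit,
+   *     exec = IFPS.executeQuery,
+   *     process = resultset.build[List, String],
+   *     loggingInfo = LoggingInfo(
+   *       sql = "SELECT name FROM person",
+   *       args = doobie.util.log.Arguments.NonBatch(Nil),
+   *       label = "example"
+   *     )
+   *   )
+   * }}}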
+   *
+   * Errors at each step are handled and logged, with cleanup (closing the
+   * PreparedStatement/ResultSet).
+   *
+   * @param create Create the PreparedStatement, using e.g. `doobie.FC.prepareStatement`
+   * @param prep Prepare steps before execution, such as setting parameters
+   * @param exec How to execute the PreparedStatement, e.g. `doobie.FPS.executeQuery`
+   * @param process Process steps for the ResultSet
+   * @param loggingInfo Information for logging
+   */
+  def executeWithResultSet[A](
+      create: ConnectionIO[PreparedStatement],
+      prep: PreparedStatementIO[Unit],
+      exec: PreparedStatementIO[ResultSet],
+      process: ResultSetIO[A],
+      loggingInfo: LoggingInfo
+  ): ConnectionIO[A] =
+    execImpl(
+      create,
+      prep,
+      Right((exec, process)),
+      loggingInfo
+    )
+
+  /**
+   * Create and execute a PreparedStatement which immediately returns
+   * the result without reading from a ResultSet. The most common case
+   * is executing an INSERT/UPDATE and returning the number of rows inserted/updated.
+   * If the query you're executing returns a ResultSet, use `executeWithResultSet`
+   * instead for better logging and resource cleanup.
+   *
+   * Errors at each step are handled and logged, with the PreparedStatement
+   * being closed at the end.
+   *
+   * @param create Create the PreparedStatement, using e.g. `doobie.FC.prepareStatement`
+   * @param prep Prepare steps before execution, such as setting parameters
+   * @param exec How to execute the PreparedStatement and produce the result, e.g. `doobie.FPS.executeUpdate`
+   * @param loggingInfo Information for logging
+   */
+  def executeWithoutResultSet[A](
+      create: ConnectionIO[PreparedStatement],
+      prep: PreparedStatementIO[Unit],
+      exec: PreparedStatementIO[A],
+      loggingInfo: LoggingInfo
+  ): ConnectionIO[A] =
+    execImpl(
+      create,
+      prep,
+      Left(exec),
+      loggingInfo
+    )
+
+  private def execImpl[A](
+      create: ConnectionIO[PreparedStatement],
+      prep: PreparedStatementIO[Unit],
+      execAndProcess: Either[PreparedStatementIO[A], (PreparedStatementIO[ResultSet], ResultSetIO[A])],
+      loggingInfo: LoggingInfo
+  ): ConnectionIO[A] = {
+    def execAndProcessLogged: PreparedStatementIO[A] = {
+      execAndProcess match {
+        case Left(execToResult) =>
+          attemptTimed(execToResult)
+            .flatMap {
+              case (Left(e), execDur) =>
+                IFPS.performLogging(LogEvent.execFailure(loggingInfo, execDur, e))
+                  .flatMap(_ => IFPS.raiseError(e))
+              case (Right(a), execDur) =>
+                IFPS.performLogging(LogEvent.success(loggingInfo, execDur, Duration.Zero))
+                  .as(a)
+            }
+        case Right((execIO, resIO)) =>
+          attemptTimed(execIO)
+            .flatMap {
+              case (Left(e), execDur) =>
+                IFPS.performLogging(LogEvent.execFailure(loggingInfo, execDur, e))
+                  .flatMap(_ => IFPS.raiseError(e))
+              case (Right(resultSet), execDur) =>
+                IFPS.pure(resultSet).bracket(
+                  IFPS.embed(_, processLogged(resIO, execDur))
+                )(
+                  IFPS.embed(_, IFRS.close)
+                )
+            }
+      }
+    }
+
+    def processLogged(
+        process: ResultSetIO[A],
+        execDuration: FiniteDuration
+    ): ResultSetIO[A] = {
+      for {
+        start <- IFRS.delay(System.nanoTime())
+        res <- process.attempt
+        end <- IFRS.delay(System.nanoTime())
+        processDuration = calcDuration(start, end)
+        a <- res match {
+          case Left(e) =>
+            IFRS.performLogging(LogEvent.processingFailure(loggingInfo, execDuration, processDuration, e))
+              .flatMap(_ => IFRS.raiseError[A](e))
+          case Right(a) =>
+            IFRS.performLogging(LogEvent.success(loggingInfo, execDuration, processDuration))
+              .as(a)
+        }
+      } yield a
+    }
+
+    val createLogged = create.onError { case e =>
+      IFC.performLogging(LogEvent.execFailure(loggingInfo,
Duration.Zero, e)) + } + + val prepLogged = prep.onError { case e => + IFPS.performLogging(LogEvent.execFailure(loggingInfo, Duration.Zero, e)) + } + + createLogged + .bracket(ps => IFC.embed(ps, prepLogged *> execAndProcessLogged))(ps => IFC.embed(ps, IFPS.close)) + } + + // FIXME: test PS and RS are correctly closed when there's exec error or processing error + def stream[A: Read]( + chunkSize: Int, + create: ConnectionIO[PreparedStatement], + prep: PreparedStatementIO[Unit], + exec: PreparedStatementIO[ResultSet], + loggingInfo: LoggingInfo + ): Stream[ConnectionIO, A] = { + val execLogged: PreparedStatementIO[(ResultSet, FiniteDuration)] = + attemptTimed(exec) + .flatMap { + case (Left(e), execDur) => + IFPS.performLogging(LogEvent.execFailure(loggingInfo, execDur, e)) + .flatMap(_ => IFPS.raiseError(e)) + case (Right(resultSet), execDur) => + IFPS.performLogging(LogEvent.success(loggingInfo, execDur, Duration.Zero)) + .as((resultSet, execDur)) + } + + for { + ps <- Stream.bracket(runPreExecWithLogging(create, loggingInfo))(IFC.embed(_, IFPS.close)) + _ <- Stream.eval(runPreExecWithLogging(IFC.embed(ps, IFPS.setFetchSize(chunkSize) *> prep), loggingInfo)) + rsAndExecDuration <- Stream.bracket( + IFC.embed(ps, execLogged) + ) { case (rs, _) => IFC.embed(rs, IFRS.close) } + (resultSet, execDuration) = rsAndExecDuration + ele <- repeatEvalChunks(IFC.embed(resultSet, resultset.getNextChunk[A](chunkSize))) + } yield ele + } + private def liftStream[A: Read]( chunkSize: Int, create: ConnectionIO[PreparedStatement], - prep: PreparedStatementIO[Unit], - exec: PreparedStatementIO[ResultSet]): Stream[ConnectionIO, A] = { + prep: PreparedStatementIO[Unit], + exec: PreparedStatementIO[ResultSet]): Stream[ConnectionIO, A] = { def prepared(ps: PreparedStatement): Stream[ConnectionIO, PreparedStatement] = - eval[ConnectionIO, PreparedStatement] { + Stream.eval[ConnectionIO, PreparedStatement] { val fs = IFPS.setFetchSize(chunkSize) IFC.embed(ps, fs *> prep).map(_ => ps) } @@ -63,10 +236,11 @@ object connection { repeatEvalChunks(IFC.embed(rs, resultset.getNextChunk[A](chunkSize))) val preparedStatement: Stream[ConnectionIO, PreparedStatement] = - bracket(create)(IFC.embed(_, IFPS.close)).flatMap(prepared) + Stream.bracket(create)(IFC.embed(_, IFPS.close)).flatMap(prepared) - def results(ps: PreparedStatement): Stream[ConnectionIO, A] = - bracket(IFC.embed(ps, exec))(IFC.embed(_, IFRS.close)).flatMap(unrolled) + def results(ps: PreparedStatement): Stream[ConnectionIO, A] = { + Stream.bracket(IFC.embed(ps, exec))(IFC.embed(_, IFRS.close)).flatMap(unrolled) + } preparedStatement.flatMap(results) @@ -75,8 +249,10 @@ object connection { /** * Construct a prepared statement from the given `sql`, configure it with the given `PreparedStatementIO` * action, and return results via a `Stream`. + * * @group Prepared Statements */ + @deprecated("Use the other stream which supports logging", "1.0.0-RC6") def stream[A: Read](sql: String, prep: PreparedStatementIO[Unit], chunkSize: Int): Stream[ConnectionIO, A] = liftStream(chunkSize, IFC.prepareStatement(sql), prep, IFPS.executeQuery) @@ -85,14 +261,37 @@ object connection { * type `A`) and sql source, configure it with the given `PreparedStatementIO` action, and return * the generated key results via a * `Stream`. 
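+   *
+   * For reference, a roughly equivalent call through the new logging-aware
+   * `stream` above (a sketch; the `LoggingInfo` rendering of the arguments
+   * is illustrative only):
+   * {{{
+   *   stream[A](
+   *     chunkSize = chunkSize,
+   *     create = IFC.prepareStatement(sql, cols.toArray),
+   *     prep = prep,
+   *     exec = IFPS.executeUpdate *> IFPS.getGeneratedKeys,
+   *     loggingInfo = LoggingInfo(sql, doobie.util.log.Arguments.NonBatch(Nil), "unlabeled")
+   *   )
+   * }}}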
+ * * @group Prepared Statements */ + @deprecated("Consider using Update#withGeneratedKeys or " + + "doobie.HC.stream if you need more customization", "1.0.0-RC6") def updateWithGeneratedKeys[A: Read](cols: List[String])(sql: String, prep: PreparedStatementIO[Unit], chunkSize: Int): Stream[ConnectionIO, A] = - liftStream(chunkSize, IFC.prepareStatement(sql, cols.toArray), prep, IFPS.executeUpdate *> IFPS.getGeneratedKeys) + liftStream( + chunkSize = chunkSize, + create = IFC.prepareStatement(sql, cols.toArray), + prep = prep, + exec = IFPS.executeUpdate *> IFPS.getGeneratedKeys + ) /** @group Prepared Statements */ - def updateManyWithGeneratedKeys[F[_]: Foldable, A: Write, B: Read](cols: List[String])(sql: String, prep: PreparedStatementIO[Unit], fa: F[A], chunkSize: Int): Stream[ConnectionIO, B] = - liftStream[B](chunkSize, IFC.prepareStatement(sql, cols.toArray), prep, IHPS.addBatchesAndExecute(fa) *> IFPS.getGeneratedKeys) + @deprecated("Consider using Update#updateManyWithGeneratedKeys or " + + "doobie.HC.stream if you need more customization", "1.0.0-RC6") + def updateManyWithGeneratedKeys[F[_] : Foldable, A: Write, B: Read]( + cols: List[String] + )( + sql: String, + prep: PreparedStatementIO[Unit], + fa: F[A], + chunkSize: Int + ): Stream[ConnectionIO, B] = { + liftStream[B]( + chunkSize = chunkSize, + create = IFC.prepareStatement(sql, cols.toArray), + prep = prep, + exec = IHPS.addBatchesAndExecute(fa) *> IFPS.getGeneratedKeys + ) + } /** @group Transaction Control */ val commit: ConnectionIO[Unit] = diff --git a/modules/core/src/main/scala/doobie/hi/preparedstatement.scala b/modules/core/src/main/scala/doobie/hi/preparedstatement.scala index 602a1d41e..c458179d8 100644 --- a/modules/core/src/main/scala/doobie/hi/preparedstatement.scala +++ b/modules/core/src/main/scala/doobie/hi/preparedstatement.scala @@ -5,7 +5,7 @@ package doobie.hi import doobie.enumerated.JdbcType -import doobie.util.{ Get, Put } +import doobie.util.{Get, Put} import doobie.enumerated.ColumnNullable import doobie.enumerated.ParameterNullable import doobie.enumerated.ParameterMode @@ -14,27 +14,19 @@ import doobie.enumerated.Nullability.NullabilityKnown import doobie.enumerated.FetchDirection import doobie.enumerated.ResultSetConcurrency import doobie.enumerated.ResultSetType - -import doobie.util.{ Read, Write } +import doobie.util.{Read, Write} import doobie.util.analysis._ import doobie.util.stream.repeatEvalChunks -import doobie.free.{ - preparedstatement => IFPS, - resultset => IFRS -} - +import doobie.free.{preparedstatement => IFPS, resultset => IFRS} import doobie.syntax.align._ -import java.sql.{ ParameterMetaData, ResultSetMetaData, SQLWarning } - -import scala.Predef.{ intArrayOps, intWrapper } - +import java.sql.{ParameterMetaData, ResultSetMetaData, SQLWarning} +import scala.Predef.{intArrayOps, intWrapper} import cats.Foldable -import cats.syntax.all._ +import cats.implicits._ +import cats.effect.syntax.monadCancel._ import cats.data.Ior -import cats.effect.kernel.syntax.monadCancel._ import fs2.Stream -import fs2.Stream.bracket /** * Module of high-level constructors for `PreparedStatementIO` actions. 
Batching operations are not
@@ -51,7 +43,7 @@ object preparedstatement {

   /** @group Execution */
   def stream[A: Read](chunkSize: Int): Stream[PreparedStatementIO, A] =
-    bracket(IFPS.executeQuery)(IFPS.embed(_, IFRS.close)).flatMap(unrolled[A](_, chunkSize))
+    Stream.bracket(IFPS.executeQuery)(IFPS.embed(_, IFRS.close)).flatMap(unrolled[A](_, chunkSize))

   /**
    * Non-strict unit for capturing effects.
@@ -65,6 +57,7 @@ object preparedstatement {
     IFPS.executeBatch.map(_.toIndexedSeq.toList) // intArrayOps does not have `toList` in 2.13

   /** @group Batching */
+  @deprecated("Use doobie.FPS.addBatch instead", "1.0.0-RC6")
   val addBatch: PreparedStatementIO[Unit] =
     IFPS.addBatch

@@ -76,9 +69,23 @@
    * See [[https://docs.oracle.com/en/java/javase/11/docs/api/java.sql/java/sql/Statement.html#executeBatch()]] for more information
    * @group Batching
    */
+  @deprecated("Consider using doobie.HC.execute{With/Without}ResultSet " +
+    "for logging support, or switch to addBatchesAndExecuteUnlogged instead",
+    "1.0.0-RC6")
   def addBatchesAndExecute[F[_]: Foldable, A: Write](fa: F[A]): PreparedStatementIO[Int] =
+    addBatchesAndExecuteUnlogged(fa)
+
+  /**
+   * Add many sets of parameters and execute as a batch update, returning total rows updated. Note
+   * that if an error occurs while executing the batch, your JDBC driver may decide to continue executing the
+   * rest of the batch instead of raising a `BatchUpdateException`.
+   * Please refer to your JDBC driver's documentation for its exact behaviour.
+   * See [[https://docs.oracle.com/en/java/javase/11/docs/api/java.sql/java/sql/Statement.html#executeBatch()]] for more information
+   * @group Batching
+   */
+  def addBatchesAndExecuteUnlogged[F[_]: Foldable, A: Write](fa: F[A]): PreparedStatementIO[Int] =
     fa.toList
-      .foldRight(executeBatch)((a, b) => set(a) *> addBatch *> b)
+      .foldRight(IFPS.executeBatch)((a, b) => set(a) *> IFPS.addBatch *> b)
       .map(_.foldLeft(0)((acc, n) => acc + (n max 0))) // treat negatives (failures) as no rows updated

   /**
@@ -86,23 +93,42 @@
    * @group Batching
    */
   def addBatches[F[_]: Foldable, A: Write](fa: F[A]): PreparedStatementIO[Unit] =
-    fa.toList.foldRight(().pure[PreparedStatementIO])((a, b) => set(a) *> addBatch *> b)
+    fa.toList.foldRight(().pure[PreparedStatementIO])((a, b) => set(a) *> IFPS.addBatch *> b)

   /** @group Execution */
+  @deprecated("Consider using doobie.HC.execute{With/Without}ResultSet " +
+    "for logging support, or switch to executeQueryUnlogged instead",
+    "1.0.0-RC6")
   def executeQuery[A](k: ResultSetIO[A]): PreparedStatementIO[A] =
+    executeQueryUnlogged(k)
+
+  def executeQueryUnlogged[A](k: ResultSetIO[A]): PreparedStatementIO[A] =
     IFPS.executeQuery.bracket(s => IFPS.embed(s, k))(s => IFPS.embed(s, IFRS.close))

   /** @group Execution */
+  @deprecated("Use doobie.FPS.executeUpdate instead", "1.0.0-RC6")
   val executeUpdate: PreparedStatementIO[Int] =
     IFPS.executeUpdate

   /** @group Execution */
+  @deprecated("Consider using doobie.HC.execute{With/Without}ResultSet " +
+    "for logging support, or switch to executeUpdateWithUniqueGeneratedKeysUnlogged instead",
+    "1.0.0-RC6")
   def executeUpdateWithUniqueGeneratedKeys[A: Read]: PreparedStatementIO[A] =
-    executeUpdate.flatMap(_ => getUniqueGeneratedKeys[A])
+    executeUpdateWithUniqueGeneratedKeysUnlogged
+
+  def executeUpdateWithUniqueGeneratedKeysUnlogged[A: Read]: PreparedStatementIO[A] =
+    IFPS.executeUpdate.flatMap(_ => getUniqueGeneratedKeys[A])

   /** @group Execution */
+  @deprecated("Consider using doobie.HC.execute{With/Without}ResultSet " +
+    "for logging support, or switch to executeUpdateWithGeneratedKeysUnlogged instead",
+    "1.0.0-RC6")
   def executeUpdateWithGeneratedKeys[A: Read](chunkSize: Int): Stream[PreparedStatementIO, A] =
-    bracket(IFPS.executeUpdate *> IFPS.getGeneratedKeys)(IFPS.embed(_, IFRS.close)).flatMap(unrolled[A](_, chunkSize))
+    executeUpdateWithGeneratedKeysUnlogged(chunkSize)
+
+  def executeUpdateWithGeneratedKeysUnlogged[A: Read](chunkSize: Int): Stream[PreparedStatementIO, A] =
+    Stream.bracket(IFPS.executeUpdate *> IFPS.getGeneratedKeys)(IFPS.embed(_, IFRS.close)).flatMap(unrolled[A](_, chunkSize))

   /**
    * Compute the column `JdbcMeta` list for this `PreparedStatement`.
diff --git a/modules/core/src/main/scala/doobie/util/package.scala b/modules/core/src/main/scala/doobie/util/package.scala
index 164cde0b2..4ba41f78d 100644
--- a/modules/core/src/main/scala/doobie/util/package.scala
+++ b/modules/core/src/main/scala/doobie/util/package.scala
@@ -12,4 +12,5 @@ package object util {
   private[util] def void(a: Any*): Unit = {
     val _ = a
   }
+
 }
diff --git a/modules/core/src/main/scala/doobie/util/query.scala b/modules/core/src/main/scala/doobie/util/query.scala
index e1d65a82e..5f885bdc4 100644
--- a/modules/core/src/main/scala/doobie/util/query.scala
+++ b/modules/core/src/main/scala/doobie/util/query.scala
@@ -7,18 +7,12 @@ package doobie.util
 import cats._
 import cats.arrow.Profunctor
 import cats.data.NonEmptyList
-import cats.syntax.all._
-import cats.effect.kernel.syntax.monadCancel._
 import doobie._
-import doobie.implicits._
 import doobie.util.analysis.Analysis
 import doobie.util.compat.FactoryCompat
-import doobie.util.log.{ LogEvent, ExecFailure, ProcessingFailure, Success }
+import doobie.util.log.{Arguments, LoggingInfo}
 import doobie.util.pos.Pos
-import doobie.free.{
-  preparedstatement => IFPS,
-  resultset => IFRS,
-}
+import doobie.free.{preparedstatement => IFPS, connection => IFC}
 import doobie.hi.{
   connection => IHC,
   preparedstatement => IHPS,
@@ -26,8 +20,7 @@ import doobie.hi.{
 }
 import fs2.Stream

-import scala.Predef.longWrapper
-import scala.concurrent.duration.{ FiniteDuration, NANOSECONDS }
+
 import scala.collection.immutable.Map
 import doobie.util.MultiVersionTypeSupport.=:=

@@ -46,37 +39,6 @@ object query {
     protected implicit val write: Write[A]
     protected implicit val read: Read[B]

-    private val now: PreparedStatementIO[Long] =
-      IFPS.delay(System.nanoTime)
-
-    // Equivalent to IHPS.executeQuery(k) but with logging
-    private def executeQuery[T](a: A, k: ResultSetIO[T]): PreparedStatementIO[T] = {
-      val args = write.toList(a)
-      def diff(a: Long, b: Long) = FiniteDuration((a - b).abs, NANOSECONDS)
-      def log(e: LogEvent): PreparedStatementIO[Unit] =
-        for {
-          _ <- IFPS.performLogging(e)
-        } yield ()
-
-      for {
-        t0 <- now
-        eet <- IFPS.executeQuery.flatMap(rs => (for {
-          t1 <- now
-          et <- IFPS.embed(rs, k).attempt
-          t2 <- now
-        } yield (t1, et, t2)).guarantee(IFPS.embed(rs, IFRS.close))).attempt
-        tuple <- eet.liftTo[PreparedStatementIO].onError { case e =>
-          for {
-            t1 <- now
-            _ <- log(ExecFailure(sql, args, label, diff(t1, t0), e))
-          } yield ()
-        }
-        (t1, et, t2) = tuple
-        t <- et.liftTo[PreparedStatementIO].onError { case e => log(ProcessingFailure(sql, args, label, diff(t1, t0), diff(t2, t1), e)) }
-        _ <- log(Success(sql, args, label, diff(t1, t0), diff(t2, t1)))
-      } yield t
-    }
-
     /**
      * The SQL string.
* @group Diagnostics @@ -129,7 +91,17 @@ object query { * @group Results */ def streamWithChunkSize(a: A, chunkSize: Int): Stream[ConnectionIO, B] = - IHC.stream[B](sql, IHPS.set(a), chunkSize) + IHC.stream( + chunkSize = chunkSize, + create = IFC.prepareStatement(sql), + prep = IHPS.set(a), + exec = IFPS.executeQuery, + loggingInfo = LoggingInfo( + sql = sql, + args = Arguments.NonBatch(Write[A].toList(a)), + label = label + ) + ) /** * Apply the argument `a` to construct a `Stream` with `DefaultChunkSize`, with @@ -139,15 +111,16 @@ object query { */ def stream(a: A): Stream[ConnectionIO, B] = streamWithChunkSize(a, DefaultChunkSize) - + /** * Apply the argument `a` to construct a program in *`[[doobie.free.connection.ConnectionIO ConnectionIO]]` yielding an `F[B]` accumulated * via the provided `CanBuildFrom`. This is the fastest way to accumulate a collection. * @group Results */ - def to[F[_]](a: A)(implicit f: FactoryCompat[B, F[B]]): ConnectionIO[F[B]] = - IHC.prepareStatement(sql)(IHPS.set(a) *> executeQuery(a, IHRS.build[F,B])) + def to[F[_]](a: A)(implicit f: FactoryCompat[B, F[B]]): ConnectionIO[F[B]] = { + toConnectionIO(a, IHRS.build[F, B]) + } /** * Apply the argument `a` to construct a program in @@ -157,7 +130,10 @@ object query { * @group Results */ def toMap[K, V](a: A)(implicit ev: B =:= (K, V), f: FactoryCompat[(K, V), Map[K, V]]): ConnectionIO[Map[K, V]] = - IHC.prepareStatement(sql)(IHPS.set(a) *> executeQuery(a, IHRS.buildPair[Map, K, V](f, read.map(ev)))) + toConnectionIO( + a, + IHRS.buildPair[Map, K, V](f, read.map(ev)) + ) /** * Apply the argument `a` to construct a program in @@ -166,7 +142,7 @@ object query { * @group Results */ def accumulate[F[_]: Alternative](a: A): ConnectionIO[F[B]] = - IHC.prepareStatement(sql)(IHPS.set(a) *> executeQuery(a, IHRS.accumulate[F, B])) + toConnectionIO(a, IHRS.accumulate[F, B]) /** * Apply the argument `a` to construct a program in @@ -175,8 +151,8 @@ object query { * @group Results */ def unique(a: A): ConnectionIO[B] = - IHC.prepareStatement(sql)(IHPS.set(a) *> executeQuery(a, IHRS.getUnique[B])) - + toConnectionIO(a, IHRS.getUnique[B]) + /** * Apply the argument `a` to construct a program in * `[[doobie.free.connection.ConnectionIO ConnectionIO]]` yielding an optional `B` and @@ -184,7 +160,7 @@ object query { * @group Results */ def option(a: A): ConnectionIO[Option[B]] = - IHC.prepareStatement(sql)(IHPS.set(a) *> executeQuery(a, IHRS.getOption[B])) + toConnectionIO(a, IHRS.getOption[B]) /** * Apply the argument `a` to construct a program in @@ -193,8 +169,25 @@ object query { * @group Results */ def nel(a: A): ConnectionIO[NonEmptyList[B]] = - IHC.prepareStatement(sql)(IHPS.set(a) *> executeQuery(a, IHRS.nel[B])) + toConnectionIO(a, IHRS.nel[B]) + + private def toConnectionIO[C](a: A, rsio: ResultSetIO[C]): ConnectionIO[C] = { + IHC.executeWithResultSet( + create = IFC.prepareStatement(sql), + prep = IHPS.set(a), + exec = IFPS.executeQuery, + process = rsio, + loggingInfo = mkLoggingInfo(a) + ) + } + private def mkLoggingInfo(a: A): LoggingInfo = + LoggingInfo( + sql = sql, + args = Arguments.NonBatch(write.toList(a)), + label = label + ) + /** @group Transformations */ def map[C](f: B => C): Query[A, C] = new Query[A, C] { diff --git a/modules/core/src/main/scala/doobie/util/update.scala b/modules/core/src/main/scala/doobie/util/update.scala index 73dea4774..868badd45 100644 --- a/modules/core/src/main/scala/doobie/util/update.scala +++ b/modules/core/src/main/scala/doobie/util/update.scala @@ -9,16 +9,13 @@ import 
cats.syntax.all._ import doobie._ import doobie.implicits._ import doobie.util.analysis.Analysis -import doobie.util.log.{ Success, ExecFailure, LogEvent } import doobie.util.pos.Pos -import doobie.free.{preparedstatement => IFPS} -import doobie.hi.{ - connection => IHC, - preparedstatement => IHPS, -} +import doobie.free.{connection => IFC, preparedstatement => IFPS} +import doobie.hi.{connection => IHC, preparedstatement => IHPS, resultset => IHRS} +import doobie.util.log.{Arguments, LoggingInfo} import fs2.Stream -import scala.Predef.longWrapper -import scala.concurrent.duration.{ FiniteDuration, NANOSECONDS } + +import scala.Predef.genericArrayOps /** Module defining updates parameterized by input type. */ object update { @@ -44,27 +41,6 @@ object update { // Contravariant coyoneda trick for A protected implicit val write: Write[A] - private val now: PreparedStatementIO[Long] = - IFPS.delay(System.nanoTime) - - // Equivalent to HPS.executeUpdate(k) but with logging if logHandler is defined - private def executeUpdate[T](a: A): PreparedStatementIO[Int] = { - val args = write.toList(a) - def diff(a: Long, b: Long) = FiniteDuration((a - b).abs, NANOSECONDS) - def log(e: LogEvent): PreparedStatementIO[Unit] = - for { - _ <- IFPS.performLogging(e) - } yield () - - for { - t0 <- now - en <- IFPS.executeUpdate.attempt - t1 <- now - n <- en.liftTo[PreparedStatementIO].onError { case e => log(ExecFailure(sql, args, label, diff(t1, t0), e)) } - _ <- log(Success(sql, args, label, diff(t1, t0), FiniteDuration(0L, NANOSECONDS))) - } yield n - } - /** * The SQL string. * @group Diagnostics @@ -114,9 +90,19 @@ object update { * writable argument `a`. * @group Execution */ - def run(a: A): ConnectionIO[Int] = - IHC.prepareStatement(sql)(IHPS.set(a) *> executeUpdate(a)) - + def run(a: A): ConnectionIO[Int] = { + IHC.executeWithoutResultSet( + IFC.prepareStatement(sql), + IHPS.set(a), + IFPS.executeUpdate, + LoggingInfo( + sql, + Arguments.NonBatch(write.toList(a)), + label + ) + ) + } + /** * Add many sets of parameters and execute as a batch update, returning total rows updated. 
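+   *
+   * A hypothetical call (table and row values are illustrative only):
+   * {{{
+   *   Update[(Int, String)]("insert into person (age, name) values (?, ?)")
+   *     .updateMany(List((1, "one"), (2, "two")))
+   * }}}
+   *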
Note * that when an error occurred while executing the batch, your JDBC driver may decide to continue executing the @@ -126,7 +112,16 @@ object update { * @group Execution */ def updateMany[F[_]: Foldable](fa: F[A]): ConnectionIO[Int] = - IHC.prepareStatement(sql)(IHPS.addBatchesAndExecute(fa)) + IHC.executeWithoutResultSet( + create = IFC.prepareStatement(sql), + prep = fa.foldMap(a => IHPS.set(a) *> IFPS.addBatch), + exec = IFPS.executeBatch.map(updateCounts => updateCounts.foldLeft(0)((acc, n) => acc + (n.max(0)))), + loggingInfo = LoggingInfo( + sql, + Arguments.Batch(() => fa.toList.map(write.toList)), + label + ) + ) /** * Construct a stream that performs a batch update as with `updateMany`, yielding generated @@ -137,7 +132,17 @@ object update { def updateManyWithGeneratedKeys[K](columns: String*): UpdateManyWithGeneratedKeysPartiallyApplied[A, K] = new UpdateManyWithGeneratedKeysPartiallyApplied[A, K] { def withChunkSize[F[_]](as: F[A], chunkSize: Int)(implicit F: Foldable[F], K: Read[K]): Stream[ConnectionIO, K] = - IHC.updateManyWithGeneratedKeys[List,A,K](columns.toList)(sql, IFPS.unit, as.toList, chunkSize) + IHC.stream( + chunkSize = chunkSize, + create = IFC.prepareStatement(sql, columns.toArray), + prep = IHPS.addBatches(as), + exec = IFPS.executeBatch *> IFPS.getGeneratedKeys, + loggingInfo = LoggingInfo( + sql, + Arguments.Batch(() => as.toList.map(Write[A].toList)), + label + ) + ) } /** @@ -156,8 +161,18 @@ object update { * @group Execution */ def withGeneratedKeysWithChunkSize[K: Read](columns: String*)(a: A, chunkSize: Int): Stream[ConnectionIO, K] = - IHC.updateWithGeneratedKeys[K](columns.toList)(sql, IHPS.set(a), chunkSize) - + IHC.stream( + chunkSize = chunkSize, + create = IFC.prepareStatement(sql, columns.toArray), + prep = IHPS.set(a), + exec = IFPS.executeUpdate *> IFPS.getGeneratedKeys, + loggingInfo = LoggingInfo( + sql = sql, + args = Arguments.NonBatch(Write[A].toList(a)), + label = label + ) + ) + /** * Construct a program that performs the update, yielding a single set of generated keys of * readable type `K`, identified by the specified columns, given a writable argument `a`. @@ -165,7 +180,17 @@ object update { * @group Execution */ def withUniqueGeneratedKeys[K: Read](columns: String*)(a: A): ConnectionIO[K] = - IHC.prepareStatementS(sql, columns.toList)(IHPS.set(a) *> IHPS.executeUpdateWithUniqueGeneratedKeys) + IHC.executeWithResultSet( + create = IFC.prepareStatement(sql, columns.toArray), + prep = IHPS.set(a), + exec = IFPS.executeUpdate *> IFPS.getGeneratedKeys, + process = IHRS.getUnique, + loggingInfo = LoggingInfo( + sql, + Arguments.NonBatch(write.toList(a)), + label + ) + ) /** * Update is a contravariant functor. 
diff --git a/modules/core/src/test/scala-2/doobie/util/LogSuitePlatform.scala b/modules/core/src/test/scala-2/doobie/util/QueryLogSuitePlatform.scala similarity index 59% rename from modules/core/src/test/scala-2/doobie/util/LogSuitePlatform.scala rename to modules/core/src/test/scala-2/doobie/util/QueryLogSuitePlatform.scala index 6e6e77f17..f4c1d9910 100644 --- a/modules/core/src/test/scala-2/doobie/util/LogSuitePlatform.scala +++ b/modules/core/src/test/scala-2/doobie/util/QueryLogSuitePlatform.scala @@ -4,17 +4,17 @@ package doobie.util -import doobie.util.log.{ Success, ProcessingFailure } +import doobie.util.log.{Arguments, ProcessingFailure, Success} import shapeless._ -trait LogSuitePlatform { self: LogSuite => +trait QueryLogSuitePlatform { self: QueryLogSuite => import doobie.generic.auto._ test("[Query] n-arg success") { val Sql = "select 1 where ? = ?" val Arg = 1 :: 1 :: HNil eventForUniqueQuery(Sql, Arg) match { - case Success(Sql, List(1, 1), _, _, _) => () + case Success(Sql, Arguments.NonBatch(List(1, 1)), _, _, _) => () case a => fail(s"no match: $a") } } @@ -23,16 +23,7 @@ trait LogSuitePlatform { self: LogSuite => val Sql = "select 1 where ? = ?" val Arg = 1 :: 2 :: HNil eventForUniqueQuery(Sql, Arg) match { - case ProcessingFailure(Sql, List(1, 2), _, _, _, _) => () - case a => fail(s"no match: $a") - } - } - - test("[Update] n-arg success") { - val Sql = "update foo set bar = ?" - val Arg = 42 :: HNil - eventForUniqueUpdate(Sql, Arg) match { - case Success(Sql, List(42), _, _, _) => () + case ProcessingFailure(Sql, Arguments.NonBatch(List(1, 2)), _, _, _, _) => () case a => fail(s"no match: $a") } } diff --git a/modules/core/src/test/scala-3/doobie/util/LogSuitePlatform.scala b/modules/core/src/test/scala-3/doobie/util/QueryLogSuitePlatform.scala similarity index 58% rename from modules/core/src/test/scala-3/doobie/util/LogSuitePlatform.scala rename to modules/core/src/test/scala-3/doobie/util/QueryLogSuitePlatform.scala index cfe1d4725..f3c29c27e 100644 --- a/modules/core/src/test/scala-3/doobie/util/LogSuitePlatform.scala +++ b/modules/core/src/test/scala-3/doobie/util/QueryLogSuitePlatform.scala @@ -4,16 +4,16 @@ package doobie.util -import doobie.util.log.{ Success, ProcessingFailure } +import doobie.util.log.{Arguments, Success, ProcessingFailure } -trait LogSuitePlatform { self: LogSuite => +trait QueryLogSuitePlatform { self: QueryLogSuite => import doobie.generic.auto._ test("[Query] n-arg success") { val Sql = "select 1 where ? = ?" val Arg = 1 *: 1 *: EmptyTuple eventForUniqueQuery(Sql, Arg) match { - case Success(Sql, List(1, 1), _, _, _) => () + case Success(Sql, Arguments.NonBatch(List(1, 1)), _, _, _) => () case a => fail(s"no match: $a") } } @@ -22,16 +22,7 @@ trait LogSuitePlatform { self: LogSuite => val Sql = "select 1 where ? = ?" val Arg = 1 *: 2 *: EmptyTuple eventForUniqueQuery(Sql, Arg) match { - case ProcessingFailure(Sql, List(1, 2), _, _, _, _) => () - case a => fail(s"no match: $a") - } - } - - test("[Update] n-arg success") { - val Sql = "update foo set bar = ?" 
- val Arg = 42 *: EmptyTuple - eventForUniqueUpdate(Sql, Arg) match { - case Success(Sql, List(42), _, _, _) => () + case ProcessingFailure(Sql, Arguments.NonBatch(List(1, 2)), _, _, _, _) => () case a => fail(s"no match: $a") } } diff --git a/modules/core/src/test/scala/doobie/util/LogSuite.scala b/modules/core/src/test/scala/doobie/util/LogSuite.scala deleted file mode 100644 index 1bb361245..000000000 --- a/modules/core/src/test/scala/doobie/util/LogSuite.scala +++ /dev/null @@ -1,118 +0,0 @@ -// Copyright (c) 2013-2020 Rob Norris and Contributors -// This software is licensed under the MIT License (MIT). -// For more information see LICENSE or https://opensource.org/licenses/MIT - -package doobie.util - -import cats.syntax.all._ -import cats.effect.{IO, IOLocal} -import doobie._ -import doobie.implicits._ -import doobie.util.log.{LogEvent, ProcessingFailure, Success} - -class LogSuite extends munit.FunSuite { - - import cats.effect.unsafe.implicits.global - - val ioLocal: IOLocal[LogEvent] = - IOLocal[LogEvent](null).unsafeRunSync() - - val xa = Transactor.fromDriverManager[IO]( - "org.h2.Driver", - "jdbc:h2:mem:queryspec;DB_CLOSE_DELAY=-1", - "sa", "", - logHandler = Some(ev => ioLocal.set(ev)) - ) - - def eventForCIO[A](cio: ConnectionIO[A]): LogEvent = - cio.transact(xa).attempt.flatMap(_ => ioLocal.get).unsafeRunSync() - - def eventForUniqueQuery[A: Write](sql: String, arg: A): LogEvent = { - eventForCIO(Query[A, Unit](sql, None).unique(arg)) - } - - def eventForUniqueUpdate[A: Write](sql: String, arg: A): LogEvent = { - val cio = sql"create table if not exists foo (bar integer)".update.run *> - Update[A](sql, None).run(arg) - eventForCIO(cio) - } - - test("[Query] default handler") { - sql"select 1".query[Int] - } - - test("[Query] implicit handler") { - eventForCIO(sql"select 1".query[Int].unique) match { - case Success(_, _, _, _, _) => () - case a => fail(s"no match: $a") - } - } - - test("[Query] explicit handler") { - eventForCIO(sql"select 1".query[Int].unique) match { - case Success(_, _, _, _, _) => () - case a => fail(s"no match: $a") - } - } - - test("[Query] zero-arg success") { - val Sql = "select 1" - eventForUniqueQuery(Sql, ()) match { - case Success(Sql, Nil, _, _, _) => () - case a => fail(s"no match: $a") - } - } - - test("[Query] zero-arg execution failure".ignore) { - () - } - - test("[Query] n-arg execution failure".ignore) { - () - } - - test("[Query] zero-arg processing failure") { - val Sql = "select 1 where 1 = 2" - eventForUniqueQuery(Sql, ()) match { - case ProcessingFailure(Sql, Nil, _, _, _, _) => () - case a => fail(s"no match: $a") - } - } - - test("[Update] default handler") { - sql"drop table if exists barf".update - } - - test("[Update] implicit handler") { - val cio = sql"drop table if exists barf".update.run - eventForCIO(cio) match { - case Success(_, _, _, _, _) => () - case a => fail(s"no match: $a") - } - } - - test("[Update] explicit handler") { - val cio = sql"drop table if exists barf".update.run - eventForCIO(cio) match { - case Success(_, _, _, _, _) => () - case a => fail(s"no match: $a") - } - } - - test("[Update] zero-arg success") { - val Sql = "update foo set bar = 42" - eventForUniqueUpdate(Sql, ()) match { - case Success(Sql, Nil, _, _, _) => () - case a => fail(s"no match: $a") - } - } - - test("[Update] zero-arg execution failure".ignore) { - () - } - - test("[Update] n-arg execution failure".ignore) { - () - } - -} diff --git a/modules/core/src/test/scala/doobie/util/QueryLogSuite.scala 
b/modules/core/src/test/scala/doobie/util/QueryLogSuite.scala new file mode 100644 index 000000000..9086885dc --- /dev/null +++ b/modules/core/src/test/scala/doobie/util/QueryLogSuite.scala @@ -0,0 +1,196 @@ +// Copyright (c) 2013-2020 Rob Norris and Contributors +// This software is licensed under the MIT License (MIT). +// For more information see LICENSE or https://opensource.org/licenses/MIT + +package doobie.util + +import cats.syntax.all._ +import cats.effect.{IO, IOLocal} +import doobie._ +import doobie.implicits._ +import doobie.util.log.Arguments.NonBatch +import doobie.util.log.{Arguments, ExecFailure, LogEvent, ProcessingFailure, Success} + +class QueryLogSuite extends munit.FunSuite with QueryLogSuitePlatform { + + import cats.effect.unsafe.implicits.global + + val ioLocal: IOLocal[LogEvent] = + IOLocal[LogEvent](null).unsafeRunSync() + + val xa = Transactor.fromDriverManager[IO]( + "org.h2.Driver", + "jdbc:h2:mem:queryspec;DB_CLOSE_DELAY=-1", + "sa", "", + logHandler = Some(ev => ioLocal.set(ev)) + ) + + def eventForCIO[A](cio: ConnectionIO[A]): LogEvent = + cio.transact(xa).attempt.flatMap(_ => ioLocal.get).unsafeRunSync() + + def successEventForCIO[A](cio: ConnectionIO[A]): Success = + eventForCIO(cio) match { + case s: Success => s + case other => fail(s"Expected Success log event but got $other") + } + + def execFailureEventForCIO[A](cio: ConnectionIO[A]): ExecFailure = + eventForCIO(cio) match { + case ev: ExecFailure => ev + case other => fail(s"Expected ExecFailure log event but got $other") + } + + def processFailureEventForCIO[A](cio: ConnectionIO[A]): ProcessingFailure = + eventForCIO(cio) match { + case ev: ProcessingFailure => ev + case other => fail(s"Expected ProcessingFailure log event but got $other") + } + + def eventForUniqueQuery[A: Write](sql: String, arg: A): LogEvent = { + eventForCIO(Query[A, Unit](sql, None).unique(arg)) + } + + def eventForUpdate[A: Write](sql: String, arg: A): LogEvent = { + val cio = sql"create table if not exists foo (bar integer)".update.run *> + Update[A](sql, None).run(arg) + eventForCIO(cio) + } + + test("simple") { + val q = sql"select 1, 2".query[(Int, Int)] + val succEvents = List( + successEventForCIO(q.to[List]), + successEventForCIO(q.toMap[Int, Int]), + successEventForCIO(q.accumulate[List]), + successEventForCIO(q.unique), + successEventForCIO(q.option), + successEventForCIO(q.nel), + ) + succEvents.foreach { succ => + assertEquals(succ.sql, "select 1, 2") + assertEquals(succ.args, NonBatch(Nil)) + assertEquals(succ.label, "unlabeled") + } + } + + test("With params and label") { + val q = sql"select ${1}, ${"2"}".queryWithLabel[(Int, String)]("mylabel") + val succEvents = List( + successEventForCIO(q.to[List]), + successEventForCIO(q.toMap[Int, String]), + successEventForCIO(q.accumulate[List]), + successEventForCIO(q.unique), + successEventForCIO(q.option), + successEventForCIO(q.nel), + ) + succEvents.foreach { succ => + assertEquals(succ.sql, "select ?, ?") + assertEquals(succ.args, NonBatch(List(1, "2"))) + assertEquals(succ.label, "mylabel") + assert(succ.exec.toNanos > 0L) + assert(succ.processing.toNanos > 0L) + } + } + + test("execution failure (Error during PreparedStatement construction)") { + val q = sql"select bad_column".query[(Int, String)] + List( + execFailureEventForCIO(q.to[List]), + execFailureEventForCIO(q.toMap[Int, String]), + execFailureEventForCIO(q.accumulate[List]), + execFailureEventForCIO(q.unique), + execFailureEventForCIO(q.option), + execFailureEventForCIO(q.nel), + ).foreach { ev => + 
assertEquals(ev.sql, "select bad_column") + assertEquals(ev.args, Arguments.nonBatchEmpty) + assertEquals(ev.label, "unlabeled") + assertEquals(ev.exec.toNanos, 0L) + } + } + + test("execution failure") { + val q = Query[String, (Int, String)]("select ? :: Int") + List( + execFailureEventForCIO(q.to[List]("not_int")), + execFailureEventForCIO(q.toMap[Int, String]("not_int")), + execFailureEventForCIO(q.accumulate[List]("not_int")), + execFailureEventForCIO(q.unique("not_int")), + execFailureEventForCIO(q.option("not_int")), + execFailureEventForCIO(q.nel("not_int")), + ).foreach { ev => + assertEquals(ev.sql, "select ? :: Int") + assertEquals(ev.args, NonBatch(List("not_int"))) + assertEquals(ev.label, "unlabeled") + assert(ev.exec.toNanos > 0L) + } + } + + test("processing failure") { + val q = sql"select 'not_int'".query[(Int, String)] + List( + processFailureEventForCIO(q.to[List]), + processFailureEventForCIO(q.toMap[Int, String]), + processFailureEventForCIO(q.accumulate[List]), + processFailureEventForCIO(q.unique), + processFailureEventForCIO(q.option), + processFailureEventForCIO(q.nel), + ).foreach { ev => + assertEquals(ev.sql, "select 'not_int'") + assertEquals(ev.args, Arguments.nonBatchEmpty) + assertEquals(ev.label, "unlabeled") + assert(ev.exec.toNanos > 0L) + } + } + + test("stream") { + val sql = "select * from values (1),(2),(3),(4),(5),(6),(7),(8),(9),(10)" + val succ = successEventForCIO( + Query[Unit, Int](sql).stream(()).compile.toList + ) + assertEquals(succ.sql, sql) + assertEquals(succ.args, NonBatch(Nil)) + assertEquals(succ.label, "unlabeled") + assert(succ.exec.toNanos > 0L) + assertEquals(succ.processing.toNanos, 0L) + } + + test("streamWithChunkSize") { + val sql = "select * from values (1),(2),(3),(4),(5),(6),(7),(8),(9),(10)" + val succ = successEventForCIO( + Query[Unit, Int](sql).streamWithChunkSize((), 5).compile.toList + ) + assertEquals(succ.sql, sql) + assertEquals(succ.args, NonBatch(Nil)) + assertEquals(succ.label, "unlabeled") + assert(succ.exec.toNanos > 0L) + assertEquals(succ.processing.toNanos, 0L) + } + + test("stream: Log ExecFailure on failed PreparedStatement construction") { + val q0 = sql"select bad_column".query[Int] + List( + execFailureEventForCIO(q0.stream.compile.toList), + execFailureEventForCIO(q0.streamWithChunkSize(1).compile.toList), + ).foreach { ev => + assertEquals(ev.sql, "select bad_column") + assertEquals(ev.args, Arguments.nonBatchEmpty) + assertEquals(ev.label, "unlabeled") + assertEquals(ev.exec.toNanos, 0L) + } + } + + test("stream: Log ExecFailure on failed PreparedStatement execution") { + val q0 = Query[String, (Int, String)]("select ? :: Int") + List( + execFailureEventForCIO(q0.stream("not_int").compile.toList), + execFailureEventForCIO(q0.streamWithChunkSize("not_int", 1).compile.toList), + ).foreach { ev => + assertEquals(ev.sql, "select ? :: Int") + assertEquals(ev.args, NonBatch(List("not_int"))) + assertEquals(ev.label, "unlabeled") + assert(ev.exec.toNanos > 0L) + } + } + +} diff --git a/modules/core/src/test/scala/doobie/util/UpdateLogSuite.scala b/modules/core/src/test/scala/doobie/util/UpdateLogSuite.scala new file mode 100644 index 000000000..ad1831535 --- /dev/null +++ b/modules/core/src/test/scala/doobie/util/UpdateLogSuite.scala @@ -0,0 +1,244 @@ +// Copyright (c) 2013-2020 Rob Norris and Contributors +// This software is licensed under the MIT License (MIT). 
+// For more information see LICENSE or https://opensource.org/licenses/MIT + +package doobie.util + +import cats.effect.{IO, IOLocal} +import cats.syntax.all._ +import doobie._ +import doobie.implicits._ +import doobie.util.log.Arguments.NonBatch +import doobie.util.log._ + +import scala.annotation.nowarn + +@nowarn("msg=.*inferred to be `Any`.*") +class UpdateLogSuite extends munit.FunSuite { + + import cats.effect.unsafe.implicits.global + + val ioLocal: IOLocal[LogEvent] = + IOLocal[LogEvent](null).unsafeRunSync() + + val xa = Transactor.fromDriverManager[IO]( + "org.h2.Driver", + "jdbc:h2:mem:queryspec;DB_CLOSE_DELAY=-1", + "sa", "", + logHandler = Some(ev => ioLocal.set(ev)) + ) + + def eventForCIO[A](cio: ConnectionIO[A]): LogEvent = + ( + sql"create table if not exists foo (c1 integer, c2 varchar)".update.run *> cio + ) + .transact(xa) + .attempt + .flatMap{res => + val _ = res + ioLocal.get + } + .unsafeRunSync() + + def successEventForCIO[A](cio: ConnectionIO[A]): Success = + eventForCIO(cio) match { + case s: Success => s + case other => fail(s"Expected Success log event but got $other") + } + + def execFailureEventForCIO[A](cio: ConnectionIO[A]): ExecFailure = + eventForCIO(cio) match { + case ev: ExecFailure => ev + case other => fail(s"Expected ExecFailure log event but got $other") + } + + def processFailureEventForCIO[A](cio: ConnectionIO[A]): ProcessingFailure = + eventForCIO(cio) match { + case ev: ProcessingFailure => ev + case other => fail(s"Expected ProcessingFailure log event but got $other") + } + + // FIXME: assert exception messages + test("update") { + val cio = sql"INSERT INTO foo VALUES (${1}, ${"str"})".update.run + val ev = successEventForCIO(cio) + assertEquals(ev.sql, "INSERT INTO foo VALUES (?, ?)") + assertEquals(ev.args, NonBatch(List(1, "str"))) + assertEquals(ev.label, "unlabeled") + } + + test("update: Log ExecFailure on failed PreparedStatement construction") { + val ev = execFailureEventForCIO(sql"insert into hm".update.run) + + assertEquals(ev.sql, "insert into hm") + assertEquals(ev.args, Arguments.nonBatchEmpty) + assertEquals(ev.label, "unlabeled") + assertEquals(ev.exec.toNanos, 0L) + } + + test("update: Log ExecFailure on failed PreparedStatement execution") { + val cio = Update[(String, String)]("insert into foo values (?, ?)") + .run(("s", "k")) + val ev = execFailureEventForCIO(cio) + + assertEquals(ev.sql, "insert into foo values (?, ?)") + assertEquals(ev.args, NonBatch(List("s", "k"))) + assertEquals(ev.label, "unlabeled") + assert(ev.exec.toNanos > 0L) + } + + test("updateMany") { + val cio = Update[(Int, Int)]("INSERT INTO foo VALUES (?, ?)", label = "some_label") + .updateMany(List((1,2), (3, 4), (5, 6))) + val ev = successEventForCIO(cio) + assertEquals(ev.sql, "INSERT INTO foo VALUES (?, ?)") + ev.args match { + case NonBatch(_) => fail("Expect batched args") + case Arguments.Batch(argsAsLists) => + assertEquals(argsAsLists(), List(List(1, 2), List(3, 4), List(5, 6))) + } + assertEquals(ev.label, "some_label") + assert(ev.exec.toNanos > 0L) + } + + test("updateMany: Log ExecFailure on failed PreparedStatement construction") { + val ev = execFailureEventForCIO(Update[Int]("insert into hm").updateMany(List(1,2))) + assertEquals(ev.sql, "insert into hm") + assertEquals(ev.args.allArgs, List(List(1), List(2))) + assertEquals(ev.label, "unlabeled") + assertEquals(ev.exec.toNanos, 0L) + } + + test("updateMany Log ExecFailure on failed PreparedStatement execution") { + val cio = Update[(String, String)]("insert into foo values (?, ?)") + 
.updateMany(List(("s", "k"))) + val ev = execFailureEventForCIO(cio) + + assertEquals(ev.sql, "insert into foo values (?, ?)") + assertEquals(ev.args.allArgs, List(List("s", "k"))) + assertEquals(ev.label, "unlabeled") + assert(ev.exec.toNanos > 0L) + } + + test("updateManyWithGeneratedKeys") { + val cio = Update[(Int, String)]("insert into foo values (?, ?)") + .updateManyWithGeneratedKeys[String]("c1")(List((1, "a"), (2, "b"))) + .compile + .toList + + val ev = successEventForCIO(cio) + + assertEquals(ev.sql, "insert into foo values (?, ?)") + assertEquals(ev.args.allArgs, List(List(1, "a"), List(2, "b"))) + assertEquals(ev.label, "unlabeled") + assert(ev.exec.toNanos > 0L) + assertEquals(ev.processing.toNanos, 0L) + } + + test("updateManyWithGeneratedKeys: Log ExecFailure on failed PreparedStatement construction") { + val cio = Update[(Int, String)]("insert into foo") + .updateManyWithGeneratedKeys[String]("c1")(List((1, "a"), (2, "b"))) + .compile + .toList + + val ev = execFailureEventForCIO(cio) + + assertEquals(ev.sql, "insert into foo") + assertEquals(ev.args.allArgs, List(List(1, "a"), List(2, "b"))) + assertEquals(ev.label, "unlabeled") + assertEquals(ev.exec.toNanos, 0L) + } + + test("updateManyWithGeneratedKeys: Log ExecFailure on failed PreparedStatement execution") { + val cio = Update[(String, String)]("insert into foo values (?, ?)") + .updateManyWithGeneratedKeys[String]("c1")(List(("s", "k"))) + .compile + .toList + val ev = execFailureEventForCIO(cio) + + assertEquals(ev.sql, "insert into foo values (?, ?)") + assertEquals(ev.args.allArgs, List(List("s", "k"))) + assertEquals(ev.label, "unlabeled") + assert(ev.exec.toNanos > 0L) + } + + + test("withGeneratedKeys") { + val cio = Update[(Int, String)]("insert into foo values (?, ?)") + .withGeneratedKeys[String]("c1")((1, "a")) + .compile + .toList + + val ev = successEventForCIO(cio) + + assertEquals(ev.sql, "insert into foo values (?, ?)") + assertEquals(ev.args, NonBatch(List(1, "a"))) + assertEquals(ev.label, "unlabeled") + assert(ev.exec.toNanos > 0L) + assertEquals(ev.processing.toNanos, 0L) + } + + test("withGeneratedKeys: Log ExecFailure on failed PreparedStatement construction") { + val cio = Update[(Int, String)]("insert into foo") + .withGeneratedKeys[String]("c1")((1, "a")) + .compile + .toList + + val ev = execFailureEventForCIO(cio) + + assertEquals(ev.sql, "insert into foo") + assertEquals(ev.args, NonBatch(List(1, "a"))) + assertEquals(ev.label, "unlabeled") + assertEquals(ev.exec.toNanos, 0L) + } + + test("withGeneratedKeys: Log ExecFailure on failed PreparedStatement execution") { + val cio = Update[(String, String)]("insert into foo values (?, ?)") + .withGeneratedKeys[String]("c1")(("s", "k")) + .compile + .toList + val ev = execFailureEventForCIO(cio) + + assertEquals(ev.sql, "insert into foo values (?, ?)") + assertEquals(ev.args, NonBatch(List("s", "k"))) + assertEquals(ev.label, "unlabeled") + assert(ev.exec.toNanos > 0L) + } + + test("withUniqueGeneratedKeys") { + val cio = Update[(Int, String)]("insert into foo values (?, ?)") + .withUniqueGeneratedKeys[String]("c1")((1, "a")) + + val ev = successEventForCIO(cio) + + assertEquals(ev.sql, "insert into foo values (?, ?)") + assertEquals(ev.args, NonBatch(List(1, "a"))) + assertEquals(ev.label, "unlabeled") + assert(ev.exec.toNanos > 0L) + assert(ev.processing.toNanos > 0L) + } + + test("withUniqueGeneratedKeys: Log ExecFailure on failed PreparedStatement construction") { + val cio = Update[(Int, String)]("insert into foo") + 
.withUniqueGeneratedKeys[String]("c1")((1, "a"))
+
+    val ev = execFailureEventForCIO(cio)
+
+    assertEquals(ev.sql, "insert into foo")
+    assertEquals(ev.args, NonBatch(List(1, "a")))
+    assertEquals(ev.label, "unlabeled")
+    assertEquals(ev.exec.toNanos, 0L)
+  }
+
+  test("withUniqueGeneratedKeys: Log ExecFailure on failed PreparedStatement execution") {
+    val cio = Update[(String, String)]("insert into foo values (?, ?)")
+      .withUniqueGeneratedKeys[String]("c1")(("s", "k"))
+    val ev = execFailureEventForCIO(cio)
+
+    assertEquals(ev.sql, "insert into foo values (?, ?)")
+    assertEquals(ev.args, NonBatch(List("s", "k")))
+    assertEquals(ev.label, "unlabeled")
+    assert(ev.exec.toNanos > 0L)
+  }
+
+}
diff --git a/modules/example/src/main/scala/example/Dynamic.scala b/modules/example/src/main/scala/example/Dynamic.scala
index bca52411b..1ff87980e 100644
--- a/modules/example/src/main/scala/example/Dynamic.scala
+++ b/modules/example/src/main/scala/example/Dynamic.scala
@@ -40,7 +40,7 @@ object Dynamic extends IOApp.Simple {
     for {
       md   <- HPS.getMetaData // lots of useful info here
       cols  = (1 to md.getColumnCount).toList
-      data <- HPS.executeQuery(readAll(cols))
+      data <- HPS.executeQueryUnlogged(readAll(cols))
     } yield (cols.map(md.getColumnName), data)

   // Read the specified columns from the resultset.
diff --git a/modules/free/src/main/scala/doobie/util/log.scala b/modules/free/src/main/scala/doobie/util/log.scala
index 921283ec8..1ea6628f9 100644
--- a/modules/free/src/main/scala/doobie/util/log.scala
+++ b/modules/free/src/main/scala/doobie/util/log.scala
@@ -8,11 +8,30 @@ import cats.Applicative
 import cats.effect.Sync

 import java.util.logging.Logger
-import scala.concurrent.duration.{ FiniteDuration => FD }
 import scala.Predef.augmentString
+import scala.concurrent.duration.FiniteDuration

 /** A module of types and instances for logged statements. */
 object log {
+
+  // Wrapper for information about a query, used for logging purposes
+  final case class LoggingInfo(sql: String, args: Arguments, label: String)
+
+  sealed trait Arguments {
+    def allArgs: List[List[Any]] = {
+      this match {
+        case arg: Arguments.NonBatch => List(arg.argsAsList)
+        case arg: Arguments.Batch => arg.argsAsLists()
+      }
+    }
+  }
+
+  object Arguments {
+    final case class NonBatch(argsAsList: List[Any]) extends Arguments
+    final case class Batch(argsAsLists: () => List[List[Any]]) extends Arguments
+
+    val nonBatchEmpty: NonBatch = NonBatch(Nil)
+  }

   /**
    * Algebraic type of events that can be passed to a `LogHandler`, both parameterized by the
@@ -25,15 +44,57 @@ object log {
     def sql: String

     /** The query arguments.
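+     * For example `Arguments.NonBatch(List(1, "str"))` for a plain statement,
+     * or `Arguments.Batch(() => List(List(1, "a"), List(2, "b")))` for a batch
+     * (values shown are illustrative).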
*/ - def args: List[Any] + def args: Arguments def label: String } - /** @group Events */ final case class Success (sql: String, args: List[Any], label: String, exec: FD, processing: FD ) extends LogEvent - /** @group Events */ final case class ProcessingFailure(sql: String, args: List[Any], label: String, exec: FD, processing: FD, failure: Throwable) extends LogEvent - /** @group Events */ final case class ExecFailure (sql: String, args: List[Any], label: String, exec: FD, failure: Throwable) extends LogEvent + /** @group Events */ final case class Success (sql: String, args: Arguments, label: String, exec: FiniteDuration, processing: FiniteDuration ) extends LogEvent + /** @group Events */ final case class ProcessingFailure(sql: String, args: Arguments, label: String, exec: FiniteDuration, processing: FiniteDuration, failure: Throwable) extends LogEvent + /** @group Events */ final case class ExecFailure (sql: String, args: Arguments, label: String, exec: FiniteDuration, failure: Throwable) extends LogEvent + + object LogEvent { + def success( + info: LoggingInfo, + execDuration: FiniteDuration, + processDuration: FiniteDuration + ): Success = Success( + sql = info.sql, + args = info.args, + label = info.label, + exec = execDuration, + processing = processDuration + ) + + def processingFailure( + info: LoggingInfo, + execDuration: FiniteDuration, + processDuration: FiniteDuration, + error: Throwable + ): ProcessingFailure = + ProcessingFailure( + sql = info.sql, + args = info.args, + label = info.label, + exec = execDuration, + processing = processDuration, + failure = error + ) + + def execFailure( + info: LoggingInfo, + execDuration: FiniteDuration, + error: Throwable + ): ExecFailure = + ExecFailure( + sql = info.sql, + args = info.args, + label = info.label, + exec = execDuration, + failure = error + ) + } /** * Provides additional processing for Doobie `LogEvent`s. 
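+ * A handler is a single-abstract-method trait, so a lambda works; a hypothetical
+ * sketch using the new `Arguments#allArgs` (names illustrative):
+ * {{{
+ *   val handler: LogHandler[IO] =
+ *     ev => IO.println(s"${ev.label}: ${ev.sql} args=${ev.args.allArgs}")
+ * }}}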
@@ -67,32 +128,40 @@ object log { override def run(logEvent: LogEvent): M[Unit] = Sync[M].delay( logEvent match { case Success(s, a, l, e1, e2) => + val argsStr = a match { + case nonBatch: Arguments.NonBatch => s"[${nonBatch.argsAsList.mkString(", ")}]" + case _: Arguments.Batch => "" + } jdkLogger.info(s"""Successful Statement Execution: | | ${s.linesIterator.dropWhile(_.trim.isEmpty).mkString("\n ")} | - | arguments = [${a.mkString(", ")}] + | arguments = $argsStr | label = $l | elapsed = ${e1.toMillis.toString} ms exec + ${e2.toMillis.toString} ms processing (${(e1 + e2).toMillis.toString} ms total) """.stripMargin) case ProcessingFailure(s, a, l, e1, e2, t) => + val argsStr = a.allArgs.map(thisArgs => thisArgs.mkString("(", ", ", ")")) + .mkString("[", ", ", "]") jdkLogger.severe(s"""Failed Resultset Processing: | | ${s.linesIterator.dropWhile(_.trim.isEmpty).mkString("\n ")} | - | arguments = [${a.mkString(", ")}] + | arguments = $argsStr | label = $l | elapsed = ${e1.toMillis.toString} ms exec + ${e2.toMillis.toString} ms processing (failed) (${(e1 + e2).toMillis.toString} ms total) | failure = ${t.getMessage} """.stripMargin) case ExecFailure(s, a, l, e1, t) => + val argsStr = a.allArgs.map(thisArgs => thisArgs.mkString("(", ", ", ")")) + .mkString("[", ", ", "]") jdkLogger.severe(s"""Failed Statement Execution: | | ${s.linesIterator.dropWhile(_.trim.isEmpty).mkString("\n ")} | - | arguments = [${a.mkString(", ")}] + | arguments = $argsStr | label = $l | elapsed = ${e1.toMillis.toString} ms exec (failed) | failure = ${t.getMessage} diff --git a/modules/log4cats/src/main/scala/doobie/log4cats/Log4CatsDebuggingLogHandler.scala b/modules/log4cats/src/main/scala/doobie/log4cats/Log4CatsDebuggingLogHandler.scala index df050c754..c31bb2390 100644 --- a/modules/log4cats/src/main/scala/doobie/log4cats/Log4CatsDebuggingLogHandler.scala +++ b/modules/log4cats/src/main/scala/doobie/log4cats/Log4CatsDebuggingLogHandler.scala @@ -16,39 +16,50 @@ import org.typelevel.log4cats._ */ class Log4CatsDebuggingLogHandler[F[_]](logger: MessageLogger[F]) extends LogHandler[F]{ override def run(logEvent: LogEvent): F[Unit] = logEvent match { - case Success(s, a, l, e1, e2) => + case Success(s, a, l, e1, e2) => { + val argsStr = a match { + case nonBatch: Arguments.NonBatch => s"[${nonBatch.argsAsList.mkString(", ")}]" + case _: Arguments.Batch => "" + } logger.info( s"""Successful Statement Execution: | | ${s.linesIterator.dropWhile(_.trim.isEmpty).mkString("\n ")} | - | arguments = [${a.mkString(", ")}] + | arguments = $argsStr | label = $l | elapsed = ${e1.toMillis.toString} ms exec + ${e2.toMillis.toString} ms processing (${(e1 + e2).toMillis.toString} ms total) """.stripMargin) + } - case ProcessingFailure(s, a, l, e1, e2, t) => + case ProcessingFailure(s, a, l, e1, e2, t) => { + val argsStr = a.allArgs.map(thisArgs => thisArgs.mkString("(", ", ", ")")) + .mkString("[", ", ", "]") logger.warn( s"""Failed Resultset Processing: | | ${s.linesIterator.dropWhile(_.trim.isEmpty).mkString("\n ")} | - | arguments = [${a.mkString(", ")}] + | arguments = $argsStr | label = $l | elapsed = ${e1.toMillis.toString} ms exec + ${e2.toMillis.toString} ms processing (failed) (${(e1 + e2).toMillis.toString} ms total) | failure = ${t.getMessage} """.stripMargin) + } - case ExecFailure(s, a, l, e1, t) => + case ExecFailure(s, a, l, e1, t) => { + val argsStr = a.allArgs.map(thisArgs => thisArgs.mkString("(", ", ", ")")) + .mkString("[", ", ", "]") logger.error( s"""Failed Statement Execution: | | 
${s.linesIterator.dropWhile(_.trim.isEmpty).mkString("\n ")} | - | arguments = [${a.mkString(", ")}] + | arguments = $argsStr | label = $l | elapsed = ${e1.toMillis.toString} ms exec (failed) | failure = ${t.getMessage} """.stripMargin) + } } } diff --git a/modules/postgres/src/main/scala/doobie/postgres/hi/connection.scala b/modules/postgres/src/main/scala/doobie/postgres/hi/connection.scala index 89df35ca0..a39522a72 100644 --- a/modules/postgres/src/main/scala/doobie/postgres/hi/connection.scala +++ b/modules/postgres/src/main/scala/doobie/postgres/hi/connection.scala @@ -81,6 +81,6 @@ object connection { // a helper private def execVoid(sql: String): ConnectionIO[Unit] = - HC.prepareStatement(sql)(HPS.executeUpdate).map(_ => ()) + HC.prepareStatement(sql)(FPS.executeUpdate).map(_ => ()) } diff --git a/modules/postgres/src/main/scala/doobie/postgres/syntax/syntax.scala b/modules/postgres/src/main/scala/doobie/postgres/syntax/syntax.scala index 8bfef0388..c966422b1 100644 --- a/modules/postgres/src/main/scala/doobie/postgres/syntax/syntax.scala +++ b/modules/postgres/src/main/scala/doobie/postgres/syntax/syntax.scala @@ -646,7 +646,7 @@ class PostgresExplainQuery0Ops(self: Query0[_]) { */ def explain: ConnectionIO[List[String]] = self.inspect { (sql, prepare) => - HC.prepareStatement(s"EXPLAIN $sql")(prepare *> HPS.executeQuery(HRS.build[List, String])) + HC.prepareStatement(s"EXPLAIN $sql")(prepare *> HPS.executeQueryUnlogged(HRS.build[List, String])) } /** @@ -657,7 +657,7 @@ class PostgresExplainQuery0Ops(self: Query0[_]) { */ def explainAnalyze: ConnectionIO[List[String]] = self.inspect { (sql, prepare) => - HC.prepareStatement(s"EXPLAIN ANALYZE $sql")(prepare *> HPS.executeQuery(HRS.build[List, String])) + HC.prepareStatement(s"EXPLAIN ANALYZE $sql")(prepare *> HPS.executeQueryUnlogged(HRS.build[List, String])) } } @@ -670,7 +670,7 @@ class PostgresExplainQueryOps[A](self: Query[A, _]) { */ def explain(a: A): ConnectionIO[List[String]] = { self.inspect(a){ (sql, prepare) => - HC.prepareStatement(s"EXPLAIN $sql")(prepare *> HPS.executeQuery(HRS.build[List, String])) + HC.prepareStatement(s"EXPLAIN $sql")(prepare *> HPS.executeQueryUnlogged(HRS.build[List, String])) } } @@ -682,7 +682,7 @@ class PostgresExplainQueryOps[A](self: Query[A, _]) { */ def explainAnalyze(a: A): ConnectionIO[List[String]] = self.inspect(a) { (sql, prepare) => - HC.prepareStatement(s"EXPLAIN ANALYZE $sql")(prepare *> HPS.executeQuery(HRS.build[List, String])) + HC.prepareStatement(s"EXPLAIN ANALYZE $sql")(prepare *> HPS.executeQueryUnlogged(HRS.build[List, String])) } } @@ -694,7 +694,7 @@ class PostgresExplainUpdate0Ops(self: Update0) { */ def explain: ConnectionIO[List[String]] = self.inspect { (sql, prepare) => - HC.prepareStatement(s"EXPLAIN $sql")(prepare *> HPS.executeQuery(HRS.build[List, String])) + HC.prepareStatement(s"EXPLAIN $sql")(prepare *> HPS.executeQueryUnlogged(HRS.build[List, String])) } /** @@ -705,7 +705,7 @@ class PostgresExplainUpdate0Ops(self: Update0) { */ def explainAnalyze: ConnectionIO[List[String]] = self.inspect { (sql, prepare) => - HC.prepareStatement(s"EXPLAIN ANALYZE $sql")(prepare *> HPS.executeQuery(HRS.build[List, String])) + HC.prepareStatement(s"EXPLAIN ANALYZE $sql")(prepare *> HPS.executeQueryUnlogged(HRS.build[List, String])) } } @@ -718,7 +718,7 @@ class PostgresExplainUpdateOps[A](self: Update[A]) { */ def explain(a: A): ConnectionIO[List[String]] = { self.inspect(a){ (sql, prepare) => - HC.prepareStatement(s"EXPLAIN $sql")(prepare *> HPS.executeQuery(HRS.build[List, 
String])) + HC.prepareStatement(s"EXPLAIN $sql")(prepare *> HPS.executeQueryUnlogged(HRS.build[List, String])) } } @@ -730,7 +730,7 @@ class PostgresExplainUpdateOps[A](self: Update[A]) { */ def explainAnalyze(a: A): ConnectionIO[List[String]] = self.inspect(a) { (sql, prepare) => - HC.prepareStatement(s"EXPLAIN ANALYZE $sql")(prepare *> HPS.executeQuery(HRS.build[List, String])) + HC.prepareStatement(s"EXPLAIN ANALYZE $sql")(prepare *> HPS.executeQueryUnlogged(HRS.build[List, String])) } }