Skip to content

Commit

Permalink
Generic pagination based on seek (#99)
Browse files Browse the repository at this point in the history
Builds on `seek`-support added in #100 

## Pagination with `ClientCursor`
On top of `seek` I managed to write a completely generic pagination mechanism. It works for any query, including when you join many tables together. It also works with most SQL expressions, as long as they can be compared in PG. There is a new DSL for this, built on top of the normal one. It's not built directly in, because JSON is used, and the SQL DSL shouldn't have a dependency on a JSON library.

It looks like this:
```scala
    // import a facade for the pagination DSL for your chosen JSON library
    import PaginationQueryZioJson.*

    businessentityRepo.select
      .where(_.businessentityid < rows.last.businessentityid)
      // first ordering
      .seekPaginationOn(_.modifieddate.desc)
      // add a second ordering. supports any sql expression that can be sorted
      .andOn(x => (x.businessentityid.underlying - 2).asc)
      // `continueFrom` is where you plug in a cursor you got from the client to continue
      .toChunk(limit = limit, continueFrom = None)
      .tap { case (_, maybeClientCursor) =>
        ZIO.succeed(println(maybeClientCursor.map(clientCursor => JsonEncoder[ClientCursor[Json]].encodeJson(clientCursor))))
      }

```
prints:
```
Some({"businessentity1.modifieddate":"2020-12-29T00:00:00","(businessentity1.businessentityid - 2::INTEGER)":1})
```

## Uses `seek`
In the implementation you can see that it's built on top of `seek`:
```scala
    continueFrom match {
      case None =>
        val newQuery = initialCursor.part2s
          .foldLeft(query) { case (q, part2: ServerCursor.Part2[Fields, Row, t, n, E]) =>
            q.orderBy(part2.part1.v)
          }
          .limit(limit)
        Right((newQuery, initialCursor))

      case Some(clientCursor) =>
        initialCursor.withTupleFrom(clientCursor).map { cursor =>
          val newQuery = cursor.part3s
            .foldLeft(query) { case (q, part3: ServerCursor.Part3[Fields, Row, _, _, E]) =>
              q.seek(part3.part2.part1.v)(part3.value)
            }
            .limit(limit)
          (newQuery, cursor)
        }
    }
```


### Properties
I'll copy in the documentation for `ClientCursor` to describe the properties of the cursor:
```scala
/** This will typically be JSON encoded and passed to clients.
  *
  * It represents a cursor that can be used to fetch the next page of results.
  *
  * The position is a given row for a set of [[order by]] expressions.
  *
  * The [[ClientCursor]] itself is a glorified [[Map]], with pairs of stringified `order by` expressions and a JSON representation of the corresponding value from the current row
  *
  * The design has a few interesting tradeoffs:
  *   - it leaks database column names to the client, so you may want to obfuscate/encrypt it
  *   - it can be re-used for "similar" queries, not just the exact same query. Fewer `order by` expressions or different ordering of `order by` expressions is fine.
  *   - [[SortOrderRepr]] does not encode ascending/descending or nulls first/last, but you're still anchored to a specific row for a set of orderings. If you want, you can change your query to go
  *     both ways from a given cursor.
  */
case class ClientCursor[E](parts: Map[SortOrderRepr, E]) 
```
  • Loading branch information
oyvindberg authored May 20, 2024
1 parent 742a7b1 commit fcb1a96
Show file tree
Hide file tree
Showing 18 changed files with 817 additions and 2 deletions.
11 changes: 11 additions & 0 deletions typo-dsl-anorm/src/scala/typo/dsl/SqlExpr.scala
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,17 @@ object SqlExpr {
}
}

object Const {
trait As[T, N[_], R] {
def apply(value: N[T]): SqlExpr.Const[T, N, R]
}

object As {
implicit def as[T, N[_], R](implicit T: ToParameterValue[N[T]], P: ParameterMetaData[T]): As[T, N, R] =
(value: N[T]) => SqlExpr.Const(value, T, P)
}
}

case class ArrayIndex[T, N1[_], N2[_], R](arr: SqlExpr[Array[T], N1, R], idx: SqlExpr[Int, N2, R], N: Nullability2[N1, N2, Option]) extends SqlExpr[T, Option, R] {
override def eval(row: R): Option[T] = {
N.mapN(arr.eval(row), idx.eval(row)) { (arr, idx) =>
Expand Down
24 changes: 24 additions & 0 deletions typo-dsl-anorm/src/scala/typo/dsl/pagination/SortOrderRepr.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package typo.dsl.pagination

import typo.dsl.SortOrderNoHkt

import java.util.concurrent.atomic.AtomicInteger

/** A client cursor is inherently tied to a set of sort orderings.
*
* As such we encode them in the cursor itself, and verify them on the way in.
*/
case class SortOrderRepr(expr: String) extends AnyVal

object SortOrderRepr {
def from[NT, R](x: SortOrderNoHkt[NT, R]): SortOrderRepr = {
// note `x.expr`! the value is independent of ascending/descending and nulls first/last
val fragment = x.expr.render(new AtomicInteger(0))
// todo: deconstructing the sql string and replacing `?` with the value would yield a more readable result
val sql = fragment.params match {
case Nil => fragment.sql
case nonEmpty => fragment.sql + ":" + nonEmpty.map(_.value.show).mkString(",")
}
SortOrderRepr(sql.trim)
}
}
10 changes: 10 additions & 0 deletions typo-dsl-doobie/src/scala/typo/dsl/SqlExpr.scala
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,16 @@ object SqlExpr {
fr"${W.toFragment(value)}$cast"
}
}
object Const {
trait As[T, N[_], R] {
def apply(value: N[T]): SqlExpr.Const[T, N, R]
}

object As {
implicit def as[T, N[_], R](implicit P: Put[T], W: Write[N[T]]): As[T, N, R] =
(value: N[T]) => SqlExpr.Const(value, P, W)
}
}

case class ArrayIndex[T, N1[_], N2[_], R](arr: SqlExpr[Array[T], N1, R], idx: SqlExpr[Int, N2, R], N: Nullability2[N1, N2, Option]) extends SqlExpr[T, Option, R] {
override def eval(row: R): Option[T] = {
Expand Down
23 changes: 23 additions & 0 deletions typo-dsl-doobie/src/scala/typo/dsl/pagination/SortOrderRepr.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package typo.dsl.pagination

import typo.dsl.SortOrderNoHkt

import java.util.concurrent.atomic.AtomicInteger

/** A client cursor is inherently tied to a set of sort orderings.
*
* As such we encode them in the cursor itself, and verify them on the way in.
*/
case class SortOrderRepr(expr: String) extends AnyVal

object SortOrderRepr {
def from[NT, R](x: SortOrderNoHkt[NT, R]): SortOrderRepr = {
val internals = x.expr.render(new AtomicInteger(0)).internals
// todo: deconstructing the sql string and replacing `?` with the value would yield a more readable result
val sql = internals.elements match {
case Nil => internals.sql
case nonEmpty => internals.sql + ":" + nonEmpty.mkString(",")
}
SortOrderRepr(sql.trim)
}
}
7 changes: 7 additions & 0 deletions typo-dsl-shared/typo/dsl/pagination/AbstractJsonCodec.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package typo.dsl.pagination

/** abstract over json libraries */
class AbstractJsonCodec[T, E](
val encode: T => E,
val decode: E => Either[String, T]
)
17 changes: 17 additions & 0 deletions typo-dsl-shared/typo/dsl/pagination/ClientCursor.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package typo.dsl.pagination

/** This will typically be JSON encoded and passed to clients.
*
* It represents a cursor that can be used to fetch the next page of results.
*
* The position is a given row for a set of [[order by]] expressions.
*
* The [[ClientCursor]] itself is a glorified [[Map]], with pairs of stringified `order by` expressions and a JSON representation of the corresponding value from the current row
*
* The design has a few interesting tradeoffs:
* - it leaks database column names to the client, so you may want to obfuscate/encrypt it
* - it can be re-used for "similar" queries, not just the exact same query. Fewer `order by` expressions or different ordering of `order by` expressions is fine.
* - [[SortOrderRepr]] does not encode ascending/descending or nulls first/last, but you're still anchored to a specific row for a set of orderings. If you want, you can change your query to go
* both ways from a given cursor.
*/
case class ClientCursor[E](parts: Map[SortOrderRepr, E])
49 changes: 49 additions & 0 deletions typo-dsl-shared/typo/dsl/pagination/PaginationQuery.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package typo.dsl.pagination

import typo.dsl.*

import java.util.concurrent.atomic.AtomicInteger

/** You'll typically use this behind a facade which accounts for your json library of choice.
*
* If you don't, you'll just have to fill in [[AbstractJsonCodec]] yourself.
*/
class PaginationQuery[Fields[_], Row, E](
query: SelectBuilder[Fields, Row],
part1s: List[ServerCursor.Part1NoHkt[Fields, Row, ?, E]]
) {
def andOn[T, N[_]](v: Fields[Row] => SortOrder[T, N, Row])(codec: AbstractJsonCodec[N[T], E])(implicit asConst: SqlExpr.Const.As[T, N, Row]): PaginationQuery[Fields, Row, E] =
new PaginationQuery(query, part1s :+ new ServerCursor.Part1(v, codec, asConst))

def done(limit: Int, continueFrom: Option[ClientCursor[E]]): Either[String, (SelectBuilder[Fields, Row], ServerCursor[Fields, Row, E])] = {
val initialCursor = {
// hack: improve DSL to avoid instantiating structure twice
val structure: Structure[Fields, Row] = query match {
case mock: SelectBuilderMock[Fields, Row] => mock.structure
case sql: SelectBuilderSql[Fields, Row] => sql.instantiate(new AtomicInteger(0)).structure
case _ => sys.error("unsupported query type")
}
new ServerCursor.Initial(part1s.map(_.asPart2(structure.fields)), limit)
}

continueFrom match {
case None =>
val newQuery = initialCursor.part2s
.foldLeft(query) { case (q, part2: ServerCursor.Part2[Fields, Row, t, n, E] @unchecked /* for 2.13 */ ) =>
q.orderBy(part2.part1.v.asInstanceOf[Fields[Hidden] => SortOrder[t, n, Hidden]])
}
.limit(limit)
Right((newQuery, initialCursor))

case Some(clientCursor) =>
initialCursor.withTupleFrom(clientCursor).map { cursor =>
val newQuery = cursor.part3s
.foldLeft(query) { case (q, part3: ServerCursor.Part3[Fields, Row, _, _, E] @unchecked /* for 2.13 */ ) =>
q.seek(part3.part2.part1.v)(part3.value)
}
.limit(limit)
(newQuery, cursor)
}
}
}
}
88 changes: 88 additions & 0 deletions typo-dsl-shared/typo/dsl/pagination/ServerCursor.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package typo.dsl.pagination

import typo.dsl.{SortOrder, SqlExpr}
import typo.dsl.pagination.internal.ListOps

sealed trait ServerCursor[Fields[_], Row, E] {
def part2s: List[ServerCursor.Part2NoHkt[Fields, Row, ?, E]]
val limit: Int

final def withNewResults(rows: Seq[Row]): Option[ServerCursor.AtTuple[Fields, Row, E]] =
if (rows.length < limit) None // no new cursor if we know we reached end
else rows.lastOption.map(lastRow => new ServerCursor.AtTuple(part2s.map(_.asPart3(lastRow)), limit))

final def withTupleFrom(clientCursor: ClientCursor[E]): Either[String, ServerCursor.AtTuple[Fields, Row, E]] =
part2s.traverse(_.decodeDataFrom(clientCursor)).map(parts => new ServerCursor.AtTuple(parts, limit))
}

object ServerCursor {
class Initial[Fields[_], Row, E](val part2s: List[Part2NoHkt[Fields, Row, ?, E]], val limit: Int) extends ServerCursor[Fields, Row, E]

class AtTuple[Fields[_], Row, E](val part3s: List[Part3NoHkt[Fields, Row, ?, E]], val limit: Int) extends ServerCursor[Fields, Row, E] {
def part2s: List[Part2NoHkt[Fields, Row, ?, E]] = part3s.map(_.part2)
def clientCursor: ClientCursor[E] = ClientCursor(part3s.map(_.encoded).toMap)
}

/** A constituent in a server-side cursor.
*
* At this point we have a function from `Fields` to a [[SortOrder]], making it dependant on the *shape* of the query, but not an *instance* of a query.
*
* You very likely don't have to worry about this, as it is used internally.
*/
class Part1[Fields[_], Row, T, N[_], E](
val v: Fields[Row] => SortOrder[T, N, Row],
val codec: AbstractJsonCodec[N[T], E],
val asConst: SqlExpr.Const.As[T, N, Row]
) extends Part1NoHkt[Fields, Row, N[T], E] {
def asPart2(fields: Fields[Row]): ServerCursor.Part2[Fields, Row, T, N, E] =
new ServerCursor.Part2(this, v(fields))
}
sealed trait Part1NoHkt[Fields[_], Row, NT, E] { // for scala 2.13
def asPart2(fields: Fields[Row]): ServerCursor.Part2NoHkt[Fields, Row, NT, E]
}

/** A constituent in a server-side cursor.
*
* At this point we have extracted the [[SortOrder]] from a query, making the value somewhat depending on it. That dependency affects the rendering of columns only
*
* The benefit is that we can now render it to a [[SortOrderRepr]], so we can encode it in a [[ClientCursor]]
*/
class Part2[Fields[_], Row, T, N[_], E](val part1: Part1[Fields, Row, T, N, E], val sortOrder: SortOrder[T, N, Row]) extends Part2NoHkt[Fields, Row, N[T], E] {
val sortOrderRepr: SortOrderRepr = SortOrderRepr.from(sortOrder)

override def asPart3(row: Row): Part3[Fields, Row, T, N, E] =
new ServerCursor.Part3[Fields, Row, T, N, E](this, part1.asConst(sortOrder.expr.eval(row)))

override def decodeDataFrom(clientCursor: ClientCursor[E]): Either[String, Part3[Fields, Row, T, N, E]] =
clientCursor.parts.get(sortOrderRepr) match {
case None => Left(s"Cursor is missing value for column '${sortOrderRepr.expr}'")
case Some(value) =>
part1.codec.decode(value) match {
case Left(msg) =>
Left(s"Cursor had invalid value '$value' for column '${sortOrderRepr.expr}': $msg")
case Right(nt) =>
Right(new ServerCursor.Part3[Fields, Row, T, N, E](this, part1.asConst(nt)))
}
}
}
sealed trait Part2NoHkt[Fields[_], Row, NT, E] { // for scala 2.13
def asPart3(row: Row): Part3NoHkt[Fields, Row, NT, E]
def decodeDataFrom(clientCursor: ClientCursor[E]): Either[String, Part3NoHkt[Fields, Row, NT, E]]
}

/** A constituent in a server-side cursor.
*
* At this point there is also data, essentially a column reference plus a value. This makes it an actual cursor.
*
* The data is wrapped in [[SqlExpr.Const]] so it's ready to embed into a query without threading through the type class instances
*/
class Part3[Fields[_], Row, T, N[_], E](val part2: Part2[Fields, Row, T, N, E], val value: SqlExpr.Const[T, N, Row]) extends Part3NoHkt[Fields, Row, N[T], E] {
override def encoded: (SortOrderRepr, E) =
(part2.sortOrderRepr, part2.part1.codec.encode(value.value))
}

sealed trait Part3NoHkt[Fields[_], Row, NT, E] { // for scala 2.13
val part2: Part2NoHkt[Fields, Row, ?, E]
def encoded: (SortOrderRepr, E)
}
}
18 changes: 18 additions & 0 deletions typo-dsl-shared/typo/dsl/pagination/internal.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package typo.dsl.pagination

object internal {
// avoid cats dependency
implicit class ListOps[T](private val list: List[T]) extends AnyVal {
def traverse[U](f: T => Either[String, U]): Either[String, List[U]] = {
val it = list.iterator
val result = List.newBuilder[U]
while (it.hasNext) {
f(it.next()) match {
case Left(e) => return Left(e)
case Right(u) => result += u
}
}
Right(result.result())
}
}
}
11 changes: 11 additions & 0 deletions typo-dsl-zio-jdbc/src/scala/typo/dsl/SqlExpr.scala
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,17 @@ object SqlExpr {
override def render(counter: AtomicInteger): SqlFragment = sql"${E.encode(value)}" ++ s"::${P.sqlType}"
}

object Const {
trait As[T, N[_], R] {
def apply(value: N[T]): SqlExpr.Const[T, N, R]
}

object As {
implicit def as[T, N[_], R](implicit E: JdbcEncoder[N[T]], P: ParameterMetaData[T]): As[T, N, R] =
(value: N[T]) => SqlExpr.Const(value, E, P)
}
}

final case class ArrayIndex[T, N1[_], N2[_], R](arr: SqlExpr[Array[T], N1, R], idx: SqlExpr[Int, N2, R], N: Nullability2[N1, N2, Option]) extends SqlExpr[T, Option, R] {
override def eval(row: R): Option[T] = {
N.mapN(arr.eval(row), idx.eval(row)) { (arr, idx) =>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package typo.dsl.pagination

import typo.dsl.SortOrderNoHkt
import zio.jdbc.SqlFragment

import java.util.concurrent.atomic.AtomicInteger

/** A client cursor is inherently tied to a set of sort orderings.
*
* As such we encode them in the cursor itself, and verify them on the way in.
*/
case class SortOrderRepr(expr: String) extends AnyVal

object SortOrderRepr {
def from[NT, R](x: SortOrderNoHkt[NT, R]): SortOrderRepr = {
val sql = new StringBuilder()

// no usable way to just get the sql without parameters in zio-jdbc :(
def recAdd(x: SqlFragment): Unit =
x.segments.foreach {
case SqlFragment.Segment.Empty => ()
case SqlFragment.Segment.Syntax(value) => sql.append(value)
case SqlFragment.Segment.Param(value, _) => sql.append(value)
case SqlFragment.Segment.Nested(sql) => recAdd(sql)
}

// note `x.expr`! the value is independent of ascending/descending and nulls first/last
recAdd(x.expr.render(new AtomicInteger(0)))

SortOrderRepr(sql.result().trim)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package adventureworks

import play.api.libs.json.*
import typo.dsl.pagination.*
import typo.dsl.{SelectBuilder, SortOrder, SqlExpr}

import java.sql.Connection

class PaginationQueryCirce[Fields[_], Row](underlying: PaginationQuery[Fields, Row, JsValue]) {
def andOn[T, N[_]](v: Fields[Row] => SortOrder[T, N, Row])(implicit
e: Writes[N[T]],
d: Reads[N[T]],
asConst: SqlExpr.Const.As[T, N, Row]
): PaginationQueryCirce[Fields, Row] =
new PaginationQueryCirce(underlying.andOn(v)(PaginationQueryCirce.abstractCodec)(asConst))

def done(limit: Int, continueFrom: Option[ClientCursor[JsValue]]): Either[String, (SelectBuilder[Fields, Row], ServerCursor[Fields, Row, JsValue])] =
underlying.done(limit, continueFrom)

def toList(limit: Int, continueFrom: Option[ClientCursor[JsValue]])(implicit c: Connection): (List[Row], Option[ClientCursor[JsValue]]) =
underlying.done(limit, continueFrom) match {
case Left(msg) =>
throw new IllegalArgumentException(msg)
case Right((newQuery, cursor)) =>
val rows = newQuery.toList
(rows, cursor.withNewResults(rows).map(_.clientCursor))
}
}

object PaginationQueryCirce {
implicit val clientCursorEncoder: Writes[ClientCursor[JsValue]] =
implicitly[Writes[Map[String, JsValue]]].contramap(_.parts.map { case (k, v) => (k.expr, v) }.toMap)
implicit val clientCursorDecoder: Reads[ClientCursor[JsValue]] =
implicitly[Reads[Map[String, JsValue]]].map(parts => ClientCursor(parts.map { case (k, v) => (SortOrderRepr(k), v) }))

implicit class PaginationQuerySyntax[Fields[_], Row](private val query: SelectBuilder[Fields, Row]) extends AnyVal {
def seekPaginationOn[T, N[_]](v: Fields[Row] => SortOrder[T, N, Row])(implicit
e: Writes[N[T]],
d: Reads[N[T]],
asConst: SqlExpr.Const.As[T, N, Row]
): PaginationQueryCirce[Fields, Row] =
new PaginationQueryCirce(new PaginationQuery(query, Nil).andOn(v)(PaginationQueryCirce.abstractCodec))
}

def abstractCodec[N[_], T](implicit e: Writes[N[T]], d: Reads[N[T]]): AbstractJsonCodec[N[T], JsValue] =
new AbstractJsonCodec[N[T], JsValue](e.writes, json => d.reads(json).asEither.left.map(_.toString())) // todo: improve error message
}
Loading

0 comments on commit fcb1a96

Please sign in to comment.