
Scala 2.13 Reactive/Akka Streams #926

Open
TRReeve opened this issue Mar 24, 2021 · 1 comment
TRReeve commented Mar 24, 2021

Hi there.

What is the status of the reactive streams connector for Scala 2.13? From reading the docs it sounds like there was a dependency issue. I'm working on a system where we have to load a large amount of data into a cluster and then dump it back out. For now I will create a Source with Akka Streams and pagination (or some flavour of this) to get things moving, but if there is existing work, maybe I could see if what I build could be generalised and added in a PR that returns a table model as a reactive Publisher (and maybe a Source too?).

I'd prefer to roll some sort of reactive interface into phantom rather than maintaining our own, or having both phantom and alpakka as dependencies, each with their own dependency on the Datastax connector. I'm a little intimidated by the macros and reflection aspects of phantom, but I'm not sure how much I'd need to touch those to create a Source from pagination.

Thanks so much for all the work on Phantom.

All the Best

TRReeve commented Mar 26, 2021

I have a naive implementation here that we're starting with. The main issue is that I'm not sure how we can avoid the blocking calls. I played with a scheme that makes non-blocking calls when the buffer drops below a certain level, but:

a) With more than one request in flight, we risk adding to the buffer out of order and/or corrupting the paging state for the next call.
b) Triggering a non-blocking request whenever the buffer is below a certain level means a request would be made on every pull until a response arrives and the buffer is refilled, resulting in excessive requests.
c) Making it fetch only at certain intervals would get convoluted, and it would still be down to chance whether the data is available in the buffer by the time the request completes and the downstream requests the next element.
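Setting Akka aside for a moment, the single-request-in-flight loop described in (a)–(c) reduces to something like this plain-Scala sketch, with Cassandra pages simulated as a pre-built list of lists (`PagedBuffer` is a hypothetical name, not part of phantom or Akka):

```scala
import scala.collection.mutable

// Sketch of the buffer/paging loop: refill from the next "page" only when
// the local buffer is empty, then emit one element per pull, or signal
// completion once both the buffer and the pages are exhausted.
final class PagedBuffer[R](pages: List[List[R]]) {
  private val buffer = mutable.Queue.empty[R]
  private var remaining = pages
  private var exhausted = pages.isEmpty

  // Mirrors onPull: None means the stream is complete.
  def pull(): Option[R] = {
    if (buffer.isEmpty && !exhausted) {
      remaining match {
        case page :: rest =>
          page.foreach(buffer.enqueue(_))
          remaining = rest
          exhausted = rest.isEmpty
        case Nil =>
          exhausted = true
      }
    }
    if (buffer.isEmpty) None else Some(buffer.dequeue())
  }
}
```

Because only one "fetch" ever happens per empty-buffer pull, the out-of-order and excessive-request problems above don't arise; the cost is that the fetch is synchronous.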

For now this is with Akka Streams, but if the approach isn't completely dumb it could probably be done with reactive-streams as a Publisher, and then Akka can build a Source from that itself.

// Imports are approximate; adjust to your phantom and driver versions.
import akka.NotUsed
import akka.stream.{Attributes, Outlet, SourceShape}
import akka.stream.scaladsl.Source
import akka.stream.stage.{GraphStage, GraphStageLogic, OutHandler}
import com.datastax.driver.core.{PagingState, Session}
import com.outworkers.phantom.dsl._
import shapeless.HList

import scala.collection.mutable
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

class CassandraAkkaSource[
  T <: CassandraTable[T, R],
  R,
  Limit <: LimitBound,
  Order <: OrderBound,
  Status <: ConsistencyBound,
  Chain <: WhereBound,
  PS <: HList](
                selectQuery: SelectQuery[T, R, Limit, Order, Status, Chain, PS],
                fetchSize: Option[Int] = None)
              (implicit
               ec: ExecutionContext,
               session: Session,
               keySpace: KeySpace)
  extends GraphStage[SourceShape[R]] {

  val batchFetchSize: Int = fetchSize match {
    case Some(value) => value
    case None => session.getCluster.getConfiguration.getQueryOptions.getFetchSize
  }

  val out: Outlet[R] = Outlet("records")

  override def shape: SourceShape[R] = SourceShape(out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {

      private val recordsBuffer: mutable.Queue[R] = mutable.Queue()
      private var mutablePagingState: Option[PagingState] = None
      private var mutableExhausted = false

      override def preStart(): Unit = {
        val initRecords = Await.result(selectQuery.paginateRecord(_.setFetchSize(batchFetchSize)), 5.seconds)
        mutablePagingState = initRecords.state
        initRecords.records.foreach(recordsBuffer.enqueue)
      }

      setHandler(out, new OutHandler {

        override def onPull(): Unit = {

          if (recordsBuffer.isEmpty && !mutableExhausted) {
            val fetch = Await.result(selectQuery.paginateRecord(mutablePagingState), 5.seconds)
            // Enqueue synchronously: GraphStage internals are not thread-safe,
            // so the buffer must not be mutated from a Future on another
            // thread while onPull is reading it.
            fetch.records.foreach(recordsBuffer.enqueue(_))
            mutablePagingState = fetch.state
            mutableExhausted = fetch.result.isExhausted()
          }

          // complete when local buffer is empty and resultset is empty
          if (recordsBuffer.isEmpty && mutableExhausted) {
            complete(out)
          } else {
            push(out, recordsBuffer.dequeue())
          }
        }
      })
    }
}

object CassandraAkkaSource {

  def apply[
    T <: CassandraTable[T, R],
    R,
    Limit <: LimitBound,
    Order <: OrderBound,
    Status <: ConsistencyBound,
    Chain <: WhereBound,
    PS <: HList]
  (selectQuery: SelectQuery[T, R, Limit, Order, Status, Chain, PS],
   fetchSize: Option[Int] = None)
  (implicit ec: ExecutionContext,
   session: Session,
   keySpace: KeySpace): Source[R, NotUsed] =
    Source.fromGraph(new CassandraAkkaSource(selectQuery, fetchSize)(ec, session, keySpace))
}

This can then be called with

  def streamTable: Source[MyType, NotUsed] =
    CassandraAkkaSource(this.select)
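On the Publisher point: rather than implementing `org.reactivestreams.Publisher` by hand, the Source above can be turned into one via `Sink.asPublisher`. A minimal sketch, assuming Akka 2.6 (where an implicit `Materializer` is derived from the implicit `ActorSystem`); the system name and helper are illustrative only:

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import org.reactivestreams.Publisher

// An ActorSystem must be in scope to materialize the stream.
implicit val system: ActorSystem = ActorSystem("phantom-streams")

// fanout = false allows a single subscriber, matching one consumer per query.
def asPublisher[R](source: Source[R, _]): Publisher[R] =
  source.runWith(Sink.asPublisher(fanout = false))
```

That way phantom could expose only the Publisher, and Akka users could rebuild a Source from it with `Source.fromPublisher`.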

@TRReeve TRReeve changed the title Scala 2.13 Reactive Streams Scala 2.13 Akka/Reactive Streams Mar 26, 2021
@TRReeve TRReeve changed the title Scala 2.13 Akka/Reactive Streams Scala 2.13 Reactive/Akka Streams Mar 26, 2021