package filodb.coordinator

import akka.actor.{Actor, ActorRef, Cancellable, PoisonPill}
import akka.event.LoggingReceive
import com.typesafe.scalalogging.slf4j.StrictLogging
import org.velvia.filo.RowReader
import scala.collection.mutable.HashMap
import scala.concurrent.duration._

import filodb.core._

object RowSource {
  case object Start
  case object GetMoreRows
  case object AllDone
  case object CheckCanIngest
  case class SetupError(err: ErrorResponse)
  case class IngestionErr(msg: String, cause: Option[Throwable] = None)
}

/**
 * RowSource is a trait that makes it easy to write sources (Actors) for specific
 * input methods - e.g. from HTTP JSON, CSV, Kafka, etc.
 * It has logic to handle flow control/backpressure.
 * It talks to the NodeCoordinatorActor of a FiloDB node; the RowSource may be remote.
 *
 * To start initialization and reading from the source, send the Start message.
 *
 * Backpressure and at-least-once mechanism: RowSource sends batches of rows to the NodeCoordinator,
 * but sends no more than maxUnackedBatches before getting an ack back.  As long as it receives
 * acks it keeps sending, but stops once maxUnackedBatches is reached.  It then goes into a waiting
 * state, waiting for Acks to come back.  Once waitingPeriod has elapsed, if it is allowed to ingest
 * again, it replays the unacked messages, hopefully gets acks back, and once all messages are acked
 * it goes back into regular ingestion mode.  This ensures that no incoming messages are skipped.
 */
trait RowSource extends Actor with StrictLogging {
  import RowSource._

  // Maximum number of unacked batches to push at a time.  Used for flow control.
  def maxUnackedBatches: Int

  def waitingPeriod: FiniteDuration = 5.seconds

  def coordinatorActor: ActorRef

  // Returns the SetupIngestion message needed for initialization
  def getStartMessage(): IngestionCommands.SetupIngestion

  def dataset: DatasetRef
  def version: Int

  // Returns newer batches of rows.
  // The seqIDs should be increasing and unique per batch.
  def batchIterator: Iterator[(Long, Seq[RowReader])]
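  // For example, a concrete source could build such an iterator by grouping an underlying
  // row iterator into fixed-size batches and using the batch index as the sequence number
  // (rawRows and batchSize below are hypothetical members of the implementing class):
  //   rawRows.grouped(batchSize).zipWithIndex.map { case (rows, i) => (i.toLong, rows) }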
  // Anything additional to do when we hit end of data and it's all acked, before killing oneself
  def allDoneAndGood(): Unit = {}

  private var whoStartedMe: Option[ActorRef] = None
  private val outstanding = new HashMap[Long, Seq[RowReader]]

  import context.dispatcher
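  // State transitions:
  //   start       -> reading      once the coordinator replies IngestionReady
  //   reading     -> waiting      when maxUnackedBatches batches are outstanding
  //   reading     -> doneReading  when batchIterator is exhausted but acks are still outstanding
  //   waiting     -> reading      once all outstanding batches are acked
  //   doneReading -> finish()     once all outstanding batches are acked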
  def start: Receive = LoggingReceive {
    case Start =>
      whoStartedMe = Some(sender)
      coordinatorActor ! getStartMessage()

    case IngestionCommands.IngestionReady =>
      self ! GetMoreRows
      logger.info(s" ==> Setup is all done, starting ingestion...")
      context.become(reading)

    case e: ErrorResponse =>
      whoStartedMe.foreach(_ ! SetupError(e))
  }
  def reading: Receive = LoggingReceive {
    case GetMoreRows if batchIterator.hasNext => sendRows()

    case GetMoreRows =>
      if (outstanding.isEmpty) { finish() }
      else {
        logger.info(s" ==> (${self.path.name}) doneReading: outstanding seqIds = ${outstanding.keys}")
        schedule(waitingPeriod, CheckCanIngest)
        context.become(doneReading)
      }

    case IngestionCommands.Ack(lastSequenceNo) =>
      if (outstanding contains lastSequenceNo) outstanding.remove(lastSequenceNo)

    case IngestionCommands.UnknownDataset =>
      whoStartedMe.foreach(_ ! IngestionErr("Ingestion actors shut down, check error logs"))

    // Ignore stale CheckCanIngest messages scheduled before we became reading again
    case CheckCanIngest =>
  }
  def waitingAck: Receive = LoggingReceive {
    case IngestionCommands.Ack(lastSequenceNo) =>
      if (outstanding contains lastSequenceNo) outstanding.remove(lastSequenceNo)
      if (outstanding.isEmpty) {
        logger.debug(s" ==> reading, all unacked messages acked")
        self ! GetMoreRows
        context.become(reading)
      }
  }

  def waiting: Receive = waitingAck orElse replay
  val replay: Receive = LoggingReceive {
    case CheckCanIngest =>
      logger.debug(s"Checking if dataset $dataset can ingest...")
      coordinatorActor ! IngestionCommands.CheckCanIngest(dataset, version)

    case IngestionCommands.CanIngest(can) =>
      if (can) {
        logger.debug(s"Yay, we're allowed to ingest again!  Replaying unacked messages")
        outstanding.foreach { case (seqId, batch) =>
          coordinatorActor ! IngestionCommands.IngestRows(dataset, version, batch, seqId)
        }
      }
      logger.debug(s"Scheduling another CheckCanIngest in case any unacked messages left")
      schedule(waitingPeriod, CheckCanIngest)

    case IngestionCommands.UnknownDataset =>
      whoStartedMe.foreach(_ ! IngestionErr("Ingestion actors shut down, check worker error logs"))

    case t: Throwable =>
      whoStartedMe.foreach(_ ! IngestionErr(t.getMessage, Some(t)))

    case e: ErrorResponse =>
      whoStartedMe.foreach(_ ! IngestionErr(e.toString))
  }
  def doneReadingAck: Receive = LoggingReceive {
    case IngestionCommands.Ack(lastSequenceNo) =>
      if (outstanding contains lastSequenceNo) outstanding.remove(lastSequenceNo)
      if (outstanding.isEmpty) finish()
  }

  // If we don't get acks back, need to keep trying
  def doneReading: Receive = doneReadingAck orElse replay
  // Gets the next batch of data from the batchIterator, then determines if we can get more rows
  // or need to wait.
  def sendRows(): Unit = {
    val (nextSeqId, nextBatch) = batchIterator.next
    outstanding(nextSeqId) = nextBatch
    coordinatorActor ! IngestionCommands.IngestRows(dataset, version, nextBatch, nextSeqId)
    // Go get more rows
    if (outstanding.size < maxUnackedBatches) {
      self ! GetMoreRows
    } else {
      logger.debug(s" ==> waiting: outstanding seqIds = ${outstanding.keys}")
      schedule(waitingPeriod, CheckCanIngest)
      context.become(waiting)
    }
  }
  private var scheduledTask: Option[Cancellable] = None

  def schedule(delay: FiniteDuration, msg: Any): Unit = {
    val task = context.system.scheduler.scheduleOnce(delay, self, msg)
    scheduledTask = Some(task)
  }
  def finish(): Unit = {
    logger.info(s"(${self.path.name}) Ingestion is all done")
    allDoneAndGood()
    whoStartedMe.foreach(_ ! AllDone)
    scheduledTask.foreach(_.cancel)
    self ! PoisonPill
  }

  val receive = start
}
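
// A minimal, hypothetical sketch of how a supervising actor might drive a RowSource:
// it sends Start and reacts to the AllDone, SetupError, and IngestionErr messages that
// RowSource sends back to whoever started it.  `rowSourceProps`, the actor name, and the
// supervisor itself are illustrative assumptions, not part of FiloDB's coordination logic.
object RowSourceUsageExample {
  import akka.actor.{Actor, ActorRef, Props}
  import RowSource._

  class IngestionSupervisor(rowSourceProps: Props) extends Actor with StrictLogging {
    // Create the concrete RowSource (CSV, Kafka, ...) and kick off setup + ingestion.
    // Because this actor is the sender of Start, it receives the completion/error messages.
    private val source: ActorRef = context.actorOf(rowSourceProps, "row-source")
    source ! Start

    def receive: Receive = {
      case AllDone =>
        logger.info("Ingestion finished and all batches were acked")
        context.stop(self)

      case SetupError(err) =>
        logger.error(s"Could not set up ingestion: $err")
        context.stop(self)

      case IngestionErr(msg, cause) =>
        cause match {
          case Some(t) => logger.error(s"Ingestion failed: $msg", t)
          case None    => logger.error(s"Ingestion failed: $msg")
        }
        context.stop(self)
    }
  }
}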