package filodb.core.store

import com.typesafe.scalalogging.slf4j.StrictLogging
import java.nio.ByteBuffer
import org.velvia.filo.RowReader
import org.velvia.filo.RowReader.TypedFieldExtractor
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

import filodb.core._
import filodb.core.metadata.{Column, Projection, RichProjection}
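
/**
 * An index entry for a single segment: the binary partition and segment keys alongside
 * their deserialized forms, plus the segment's chunk row map.
 */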
case class SegmentIndex[P, S](binPartition: Types.BinaryPartition,
                              segmentId: Types.SegmentId,
                              partition: P,
                              segment: S,
                              chunkMap: BinaryChunkRowMap)

/**
 * Encapsulates the reading and scanning logic of the ColumnStore.
 * We are careful to separate out the ExecutionContext for reading only.
 */
trait ColumnStoreScanner extends StrictLogging {
  import filodb.core.Types._
  import filodb.core.Iterators._

  // Use a separate ExecutionContext for reading.  This is important to prevent deadlocks.
  // Also, we do not make this implicit so that this trait can be mixed in elsewhere.
  def readEc: ExecutionContext

  /**
   * Reads back all the chunks from multiple columns of a keyRange at once.  Note that no paging
   * is performed, so don't ask for too large of a range.  The recommendation is to read the
   * ChunkRowMaps first and use the info there to determine how much to read.
   * @param dataset the DatasetRef of the dataset to read chunks from
   * @param columns the columns to read back chunks from
   * @param keyRange the binary range of segments to read from.  Note the endExclusive flag.
   * @param version the version to read back
   * @return a sequence of ChunkedData, one per column.  Each triple within is
   *         (segmentId, chunkId, bytes); chunks must be sorted in order of increasing segmentId.
   */
  def readChunks(dataset: DatasetRef,
                 columns: Set[ColumnId],
                 keyRange: BinaryKeyRange,
                 version: Int)(implicit ec: ExecutionContext): Future[Seq[ChunkedData]]
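
  // A minimal usage sketch, assuming hypothetical names: `store` is some concrete
  // ColumnStoreScanner, `ref` a DatasetRef, and `range` a BinaryKeyRange already in scope.
  //
  //   implicit val ec = store.readEc
  //   val data: Future[Seq[ChunkedData]] =
  //     store.readChunks(ref, Set("timestamp", "value"), range, version = 0)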

  type ChunkMapInfo = (BinaryPartition, SegmentId, BinaryChunkRowMap)

  /**
   * Scans over ChunkRowMaps according to the scan method.
   * @param projection the RichProjection of the dataset to scan
   * @param version the version to scan
   * @param method the ScanMethod selecting which partitions and segments to scan
   * @return an iterator over SegmentIndex.
   */
  def scanChunkRowMaps(projection: RichProjection,
                       version: Int,
                       method: ScanMethod)
                      (implicit ec: ExecutionContext):
    Future[Iterator[SegmentIndex[projection.PK, projection.SK]]]
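
  // Implementations typically read raw ChunkMapInfo tuples from storage and convert each one
  // through toSegIndex below, e.g. (with a hypothetical `rawTuples: Iterator[ChunkMapInfo]`):
  //
  //   rawTuples.map(toSegIndex(projection, _))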

  /** Deserializes the binary partition and segment keys of a ChunkMapInfo into a SegmentIndex. */
  def toSegIndex(projection: RichProjection, chunkMapInfo: ChunkMapInfo):
      SegmentIndex[projection.PK, projection.SK] = {
    SegmentIndex(chunkMapInfo._1,
                 chunkMapInfo._2,
                 projection.partitionType.fromBytes(chunkMapInfo._1),
                 projection.segmentType.fromBytes(chunkMapInfo._2),
                 chunkMapInfo._3)
  }

  // NOTE: this is more or less a single-threaded implementation.  Reads of chunks for multiple
  // columns happen in parallel, but we still block to wait for all of them to come back.
  def scanSegments(projection: RichProjection,
                   columns: Seq[Column],
                   version: Int,
                   method: ScanMethod): Future[Iterator[Segment]] = {
    implicit val ec = readEc
    val segmentGroupSize = 3

    for { segmentIndexIt <- scanChunkRowMaps(projection, version, method) }
    yield {
      (for { // Group by partition key first
             (part, partChunkMaps) <- segmentIndexIt
                                        .sortedGroupBy { case SegmentIndex(part, _, _, _, _) => part }
             // Subdivide chunk row maps in each partition so we don't read more than we can chew
             // TODO: make a custom grouping function based on # of rows accumulated
             groupChunkMaps <- partChunkMaps.grouped(segmentGroupSize) }
       yield {
         val chunkMaps = groupChunkMaps.toSeq
         val binKeyRange = BinaryKeyRange(part,
                                          chunkMaps.head.segmentId,
                                          chunkMaps.last.segmentId,
                                          endExclusive = false)
         val chunks = Await.result(readChunks(projection.datasetRef, columns.map(_.name).toSet,
                                              binKeyRange, version), 5.minutes)
         buildSegments(projection, chunkMaps, chunks, columns).toIterator
       }).flatten
    }
  }
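
  // A usage sketch for a full scan, with hypothetical names (`store` is a concrete
  // ColumnStoreScanner, `projection` a RichProjection, `method` a ScanMethod); blocking
  // with Await here is purely for illustration:
  //
  //   val segmentsF: Future[Iterator[Segment]] =
  //     store.scanSegments(projection, columns, version = 0, method)
  //   val segmentIt: Iterator[Segment] = Await.result(segmentsF, 5.minutes)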

  import scala.util.control.Breaks._

  /**
   * Builds one RowReaderSegment per row map and attaches each chunk read back to the segment
   * it belongs to.  Relies on chunks arriving sorted by segmentId, in the same order as rowMaps.
   */
  private def buildSegments[P, S](projection: RichProjection,
                                  rowMaps: Seq[SegmentIndex[P, S]],
                                  chunks: Seq[ChunkedData],
                                  schema: Seq[Column]): Seq[Segment] = {
    val segments = rowMaps.map { case SegmentIndex(_, _, partition, segStart, rowMap) =>
      val segInfo = SegmentInfo(partition, segStart)
      new RowReaderSegment(projection, segInfo, rowMap, schema)
    }
    chunks.foreach { case ChunkedData(columnName, chunkTriples) =>
      var segIndex = 0
      breakable {
        chunkTriples.foreach { case (segmentId, chunkId, chunkBytes) =>
          // Rely on the fact that chunks are sorted by segmentId, in the same order as the rowMaps
          while (segmentId != rowMaps(segIndex).segmentId) {
            segIndex += 1
            if (segIndex >= segments.length) {
              logger.warn(s"Chunks with segmentId=$segmentId (part ${rowMaps.head.partition})" +
                          " have no rowmap; corruption?")
              break
            }
          }
          segments(segIndex).addChunk(chunkId, columnName, chunkBytes)
        }
      }
    }
    segments
  }
}
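
// A concrete column store mixes in ColumnStoreScanner by supplying the three abstract
// members.  A hypothetical sketch (names here are illustrative, not part of this repo):
//
//   class MyColumnStore(val readEc: ExecutionContext) extends ColumnStoreScanner {
//     import filodb.core.Types._
//     def readChunks(dataset: DatasetRef, columns: Set[ColumnId], keyRange: BinaryKeyRange,
//                    version: Int)(implicit ec: ExecutionContext): Future[Seq[ChunkedData]] = ???
//     def scanChunkRowMaps(projection: RichProjection, version: Int, method: ScanMethod)
//                         (implicit ec: ExecutionContext):
//       Future[Iterator[SegmentIndex[projection.PK, projection.SK]]] = ???
//   }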