This repository has been archived by the owner on Jun 29, 2021. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 4
/
SNAP Performance.html
183 lines (183 loc) · 9.59 KB
/
SNAP Performance.html
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<script type="text/javascript" src="https://cdnjs.cloudflare.com/ajax/libs/mathjax/2.7.0/MathJax.js?config=TeX-AMS-MML_HTMLorMML"></script>
</head>
<body>
<h1>Factors that influence SNAP performance </h1>
<h2>Ingestion Performance </h2>
<p>SNAP ingestion consists of two parts. </p>
<ol type="1">
<li><p>- Flattening the star schema ( optimally when there is a star schema)</p></li>
<li><p>- Building an index</p></li>
</ol>
<h3>Design factors</h3>
<ul>
<li><p>Qube design </p>
<ul>
<li><p>column cardinality </p>
<ul>
<li><p>High number of unique values in a column increases the time to build the index </p></li>
<li><p>When creating the star schema, create with stats and analyze the various statistics of the columns to determine potential impact. </p></li>
</ul></li>
<li><p>Number of dimension columns </p>
<ul>
<li><p>Time to index 100 columns is lesser than 1000 columns for example .</p></li>
<li><p>The time taken to build the index is also determined by the number of dimensions. </p></li>
</ul></li>
<li><p>Star schema joins and flattening </p>
<ul>
<li><p>The number of tables in the star schema influences the time to flatten prior to building the index. SNAP flattens the star schema and then builds an index for the dimensions. When there are more tables there is more time spent to flatten </p></li>
<li><p>The skewness of data in the join graph can influence the time to flatten. </p></li>
<li><p>Shuffle tuning is critical since the flattening is executed as a Spark job. </p></li>
</ul></li>
<li><p>Partitioning strategy </p>
<ul>
<li><p>Building a SNAP index on 100Gb non-paritioned table can have different resource requirements than the same table partitioned by say YEAR or other columns. SNAP ingestion can be done per partition and hence the performance of ingestion can be significantly improved by partitioning.</p></li>
</ul></li>
<li><p>Settings on the Qube </p>
<ul>
<li><p>SNAP prefers its segments( files that make up the index) to be around 250MB for normal sized Qubes( 100-200 columns ) or < 5million rows per file.</p></li>
<li><p>The setting “preferredSegmentSize”, when defining the OLAP Index, determines the size of the segment. </p></li>
<li><p>The setting “avgSizePerPartition” is a rough estimate of the uncompressed size of the SNAP partition upon indexing and is a function of the original data being ingested in SNAP. </p>
<ul>
<li><p>For example if your original dataset is csv and you are ingesting 100GB of a partition, then avgSizePerPartition=20g based on our experience - not all datasets are the same and so this can vary but we can expect 70-80% compression. </p></li>
<li><p>If your dataset is Orc or Parquet, SNAP can be 1-1.5 times the partition size.</p></li>
<li><p>The ratio of avgSizePerPartition and preferredSegmentSize determines the number of physical SNAP files created per partition for the index. </p></li>
<li><p>Example indexing a 100GB csv partition </p>
<ul>
<li><p>avgSizePerPartition=20g</p></li>
<li><p>preferredSegmentSize=250mb</p></li>
<li><p>Number of tasks running indexing is 20g/250mb= 80 and 80 files will be produced.</p></li>
<li><p>If the avgSizePerPartition=2g ( by mistake for example) then 8 tasks will index the data and 8 files totalling 20Gb will be produced. This can cause the indexing to be much slower since each task is carrying a much heavier load due to reduction in parallelism </p></li>
<li><p>If the avgSizePerPartition=200g ( again by mistake) then 800 tasks will be spawned and 800 files each of 25mb approximately will be produced. This is again very inefficient for SNAP since small files impact query performance. </p></li>
<li><p>The Spark UI will have information on number of tasks, size being processed etc while the indexing is running </p></li>
<li><p>To come up with the most efficient avgSizePerPartition it is best to index the data on a TABLE SAMPLE and estimate the size based on the sample index before doing it for all the data.</p></li>
</ul></li>
</ul></li>
</ul></li>
</ul></li>
</ul>
<h2>Run time</h2>
<ul>
<li><p>Parallelism of the jobs.</p>
<ul>
<li><p>Flattening the data involves creating multiple tasks to create files that are used as input into the indexing stage. At this stage, depending on the dataset being ingested, care must be taken to set spark.sql.shuffle.partitions appropriately so that few tasks don’t end up processing all the data. Typically a task should be processing 250-500MB of data.</p></li>
</ul></li>
<li><p>Broadcast of small tables</p>
<p>At the time of ingestion, large tables can be joined with smaller tables. In these cases spark.sql.autoBroadcastJoinThreshold can be set to a higher value to make use of broadcast joins and avoid sort merge on large clusters</p></li>
<li><p>Off-heap memory</p>
<p>The indexing process uses off-heap. Depending on the index size the off-heap has to be adjusted. Some times this can be 4GB to 6GB per core depending on the size of the partition being indexed.</p></li>
</ul>
<h1>Query Performance </h1>
<ul>
<li><p>Number of cores available in the cluster </p>
<ul>
<li><p>For a given index the number of waves determines the run time query performance </p>
<ul>
<li><p>Example: An index has 1000 segments. The cluster has 100 cores. This means a query that executes on all segments( partition filters can reduce the number of segments scanned) will have 1000/100 = 10 Waves. If the median query segment time is 100ms then total query time will be approximately 100ms*10 = 1s. </p>
<ul>
<li><p>The same query, when it has 200 cores will execute in 5 waves. So total time will be 0.5s. </p></li>
</ul></li>
</ul></li>
</ul></li>
<li><p>Per segment time </p>
<ul>
<li><p>For a given query one can see the Task time( which is the time to query a given segment) on the Spark UI. The segment times should be ideally < 0.5s of the cube is properly designed and segments are memory mapped. Segment times can be high when there more than 5 million rows per segment.</p>
<ul>
<li><p>Segment times can be large when </p>
<ul>
<li><p>The segment has to be pulled from deep storage to local cache</p></li>
<li><p>The segment has to be read from disk </p></li>
</ul></li>
</ul></li>
</ul></li>
<li><p>Shuffle</p>
<ul>
<li><p>SNAP queries have multiple stages - a Stage that queries the segment and stages that are executed in Spark. </p>
<ul>
<li><p>The setting spark.sql.shuffle.partitions at query time should be set to 20 for most adHoc query usecases. ( note when indexing this may not be sufficient and so this should be a session level query time setting)</p></li>
</ul></li>
</ul></li>
<li><p>Executor memory </p>
<ul>
<li><p>The executor memory is the Heap memory available to the Spark jobs. Typically for queries we set this as 2Gb per core.</p></li>
</ul></li>
<li><p>Off-heap memory </p>
<ul>
<li><p>For some operations the off-heap is used. The off heap is a function of another setting + a constant. It is typically set atleast to number of cores per executor * the value below.</p>
<ul>
<li><p>spark.sparklinedata.spmd.gByEngine.offheapsize=128mb</p></li>
</ul></li>
<li><p>For example if the number of cores per executor is 4 then off-heap should be atleast 512mb( 4* 128) and should be set to 1g to accomodate other spark use of the off-heap.</p></li>
<li><p>Off-heap memory is set as follows in the properties file - the setting MaxDirectMemorySize is the off-heap.</p>
<p>spark.executor.extraJavaOptions= -XX:MaxDirectMemorySize=2g -XX:+UseG1GC -XX:MaxGCPauseMillis=100 -XX:OnOutOfMemoryError='kill -9 %p'</p></li>
</ul></li>
<li><p>Auto broadcast joins </p>
<ul>
<li><p>For query, turn off auto broadcast. The driver can become a bottleneck in doing broadcast joins and can impact performance</p></li>
</ul></li>
</ul>
<p></p>
<h1>Infrastucture</h1>
<p></p>
<h2>IO/Disk</h2>
<ul>
<li><p>SNAP depends on Fast IO and memory mapping. SNAP downloads its segments ( files) to a local cache. This local cache</p>
<ul>
<li><p>should be across multiple disks</p>
<ul>
<li><p>Should be SSDs</p>
<ul>
<li><p>Should <strong>not</strong> be on a disk where other apps have a lot of IO ( /tmp)</p></li>
</ul></li>
</ul></li>
</ul></li>
</ul>
<p></p>
<h2>Nodes</h2>
<p>Total RAM</p>
<table>
<colgroup>
<col style="width: 10%" />
<col style="width: 20%" />
<col style="width: 70%" />
</colgroup>
<tbody>
<tr class="odd">
<td><p>HEAP</p></td>
<td><p>OFF-HEAP</p></td>
<td><p>SPACE FOR SEGMENTS IN MEMORY</p></td>
</tr>
</tbody>
</table>
<p></p>
<p>The space for segments to be loaded in memory is typically SHARED across apps in a YARN cluster. This could be an issue because SNAP will have to access the segments from DISK when the segments are not in memory. IO is typically 10x slower than RAM access. In a dedicated NODE this is not an issue because no other process will be using the RAM other than SNAP processes.</p>
<p></p>
<h2>CPU</h2>
<p></p>
<p>The number of CPUs allocated for a SNAP cluster should be a function of the number of segments constituting the SNAP indexes that are used in the cluster.</p>
<p></p>
<p>Example</p>
<p>Index 1 = 100 GB, 400 segments</p>
<p>Index 2 = 50 GB, 200 segments</p>
<p>Index 3 = 200 GB , 800 segments</p>
<p></p>
<p>Total across all Indexes = 1400 segments</p>
<p></p>
<p>Assuming median 100ms per segment query time the number of cores needed to achieve</p>
<p></p>
<ul>
<li><p>1s per query performance = 1400 / ( 1s/100ms ) = 140 cores.</p>
<ul>
<li><p>2s per query performance = 1400/ ( 2s/100ms)=70 cores.</p>
<ul>
<li><p>3s per query performance = 1400 / ( 3s/100ms) = 47 cores.</p></li>
</ul></li>
</ul></li>
</ul>
<p></p>
<p>NOTE: 100ms per segment times are when the segment is in memory and is optimal and can be as low as 25-50ms depending on the use case.</p>
</body>
</html>