-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathspark-programming-handbook.html
597 lines (405 loc) · 39.6 KB
/
spark-programming-handbook.html
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
<!doctype html>
<!--[if lt IE 7]><html class="no-js lt-ie9 lt-ie8 lt-ie7" lang="en"> <![endif]-->
<!--[if (IE 7)&!(IEMobile)]><html class="no-js lt-ie9 lt-ie8" lang="en"><![endif]-->
<!--[if (IE 8)&!(IEMobile)]><html class="no-js lt-ie9" lang="en"><![endif]-->
<!--[if gt IE 8]><!--> <html class="no-js" lang="en"><!--<![endif]-->
<head>
<meta charset="utf-8">
<title>Spark编程手抄 - Python – KINGX</title>
<meta name="description" content="Spark Programming">
<meta name="keywords" content="bigdata, spark">
<!-- Twitter Cards -->
<meta name="twitter:card" content="summary">
<meta name="twitter:image" content="https://kingx.me/images/">
<meta name="twitter:title" content="Spark编程手抄 - Python">
<meta name="twitter:description" content="Spark Programming">
<meta name="twitter:creator" content="@https://twitter.com/KINGX_CN">
<!-- Open Graph -->
<meta property="og:locale" content="en_US">
<meta property="og:type" content="article">
<meta property="og:title" content="Spark编程手抄 - Python">
<meta property="og:description" content="Spark Programming">
<meta property="og:url" content="https://kingx.me/spark-programming-handbook.html">
<meta property="og:site_name" content="KINGX">
<link rel="canonical" href="https://kingx.me/spark-programming-handbook.html">
<link href="https://kingx.me/feed.xml" type="application/atom+xml" rel="alternate" title="KINGX Feed">
<!-- http://t.co/dKP3o1e -->
<meta name="HandheldFriendly" content="True">
<meta name="MobileOptimized" content="320">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<!-- For all browsers -->
<link rel="stylesheet" href="https://kingx.me/assets/css/main.css">
<!-- Webfonts -->
<link href="//fonts.googleapis.com/css?family=Lato:300,400,700,300italic,400italic" rel="stylesheet" type="text/css">
<meta http-equiv="cleartype" content="on">
<!-- Load Modernizr -->
<script src="https://kingx.me/assets/js/vendor/modernizr-2.6.2.custom.min.js"></script>
<!-- Icons -->
<!-- 16x16 -->
<link rel="shortcut icon" href="https://kingx.me/favicon.ico">
<!-- 32x32 -->
<link rel="shortcut icon" href="https://kingx.me/favicon.png">
<!-- 57x57 (precomposed) for iPhone 3GS, pre-2011 iPod Touch and older Android devices -->
<link rel="apple-touch-icon-precomposed" href="https://kingx.me/images/apple-touch-icon-precomposed.png">
<!-- 72x72 (precomposed) for 1st generation iPad, iPad 2 and iPad mini -->
<link rel="apple-touch-icon-precomposed" sizes="72x72" href="https://kingx.me/images/apple-touch-icon-72x72-precomposed.png">
<!-- 114x114 (precomposed) for iPhone 4, 4S, 5 and post-2011 iPod Touch -->
<link rel="apple-touch-icon-precomposed" sizes="114x114" href="https://kingx.me/images/apple-touch-icon-114x114-precomposed.png">
<!-- 144x144 (precomposed) for iPad 3rd and 4th generation -->
<link rel="apple-touch-icon-precomposed" sizes="144x144" href="https://kingx.me/images/apple-touch-icon-144x144-precomposed.png">
<style type="text/css">body {background-image:url(https://kingx.me/images/triangular.png);}</style>
</head>
<body id="post" >
<!--[if lt IE 9]><div class="upgrade"><strong><a href="http://whatbrowser.org/">Your browser is quite old!</strong> Why not upgrade to a different browser to better enjoy this site?</a></div><![endif]-->
<nav id="dl-menu" class="dl-menuwrapper" role="navigation">
<button class="dl-trigger">Open Menu</button>
<ul class="dl-menu">
<li><a href="https://kingx.me/">Home</a></li>
<li>
<a href="#">About</a>
<ul class="dl-submenu">
<li>
<img src="https://kingx.me/images/avatar.jpg" alt="KINGX photo" class="author-photo">
<h4>KINGX</h4>
<p>What is Security</p>
</li>
<li><a href="https://kingx.me/about/"><span class="btn btn-inverse">Learn More</span></a></li>
<li>
<a href="mailto:root#kingx.me"><i class="fa fa-fw fa-envelope"></i> Email</a>
</li>
<li>
<a href="https://twitter.com/KINGX_CN"><i class="fa fa-fw fa-twitter"></i> Twitter</a>
</li>
<li>
<a href="https://weibo.com/u/1624430122"><i class="fa fa-fw fa-weibo"></i> Weibo</a>
</li>
<li>
<a href="https://github.com/KINGX-Code"><i class="fa fa-fw fa-github"></i> GitHub</a>
</li>
</ul><!-- /.dl-submenu -->
</li>
<!-- <li>
<a href="#">Posts</a>
<ul class="dl-submenu">
<li><a href="https://kingx.me/posts/">All Posts</a></li>
<li><a href="https://kingx.me/tags/">All Tags</a></li>
</ul>
</li> -->
<li><a href="https://kingx.me/latest-events/" >Security Incidents</a></li>
<li><a href="https://kingx.me/latest-vulns/" >Vulnerabilities</a></li>
<li><a href="https://kingx.me/pentest-tools/" >Red Team</a></li>
<li><a href="https://kingx.me/cheatsheet/" >CheatSheet</a></li>
<li><a href="https://kingx.me/stop-learning/" >Stop Learning</a></li>
<li><a href="https://kingx.me/posts/" >Archives</a></li>
<li><a href="https://kingx.me/tags/" >Tags</a></li>
<li><a href="https://kingx.me/links/" >Links</a></li>
<li><a href="https://kingx.me/feed.xml" >RSS</a></li>
</ul><!-- /.dl-menu -->
</nav><!-- /.dl-menuwrapper -->
<div id="main" role="main">
<article class="hentry">
<header class="header-title">
<div class="header-title-wrap">
<h1 class="entry-title"><a href="https://kingx.me/spark-programming-handbook.html" rel="bookmark" title="Spark编程手抄 - Python">Spark编程手抄 - Python</a></h1>
<h2><span class="entry-date date published"><time datetime="2017-05-31T00:00:00-04:00">May 31, 2017, KINGX</time></span></h2>
<p class="entry-reading-time">
<i class="fa fa-clock-o"></i>
Reading time ~2 minutes
<span id="busuanzi_container_page_pv">
/ Page View <span id="busuanzi_value_page_pv">0</span> / Site Visitor <span id="busuanzi_value_site_uv">0</span>
</span>
</p><!-- /.entry-reading-time -->
</div><!-- /.header-title-wrap -->
</header>
<div class="entry-content">
<span class="entry-tags" style="color:red;font-size:13px;margin-bottom: 0px;">「声明:本博客中涉及到的相关漏洞均为官方已经公开并修复的漏洞,涉及到的安全技术也仅用于企业安全建设和安全对抗研究。本文仅限业内技术研究与讨论,严禁用于非法用途,否则产生的一切后果自行承担。」</span>
<h1 id="0x00-关于spark">0x00 关于Spark</h1>
<p>Hadoop通过解决了大数据的可靠存储和处理<sup id="fnref:1"><a href="#fn:1" class="footnote">1</a></sup>。</p>
<ol>
<li>HDFS,在普通PC组成的集群上提供高可靠的文件存储。</li>
<li>MapReduce,通过简单的Mapper和Reducer的抽象提供一个编程模型,并发处理数据。</li>
</ol>
<p>但是MapReduce抽象程度低,复杂度高,缺乏整体性,缺乏表达力。</p>
<p>而Apache Spark是一个新兴的大数据处理引擎,主要特点是提供了一个集群的分布式内存抽象,以支持需要工作集的应用。这个抽象就是RDD。</p>
<p>Spark的优势不仅体现在性能提升上的,Spark框架为批处理(Spark Core),交互式(Spark SQL),流式(Spark Streaming),机器学习(MLlib),图计算(GraphX)提供一个统一的数据处理平台,这相对于使用Hadoop有很大优势。</p>
<p><img src="https://gigaom.com/wp-content/uploads/sites/1/2014/05/spark-stack-new.png" alt="Spark-Stack" /></p>
<p>Spark提供了Python编程API,可以利用其接口编写Python程序并提交给Spark集群运行。本文分析和部分翻译了官方编程指南<sup id="fnref:2"><a href="#fn:2" class="footnote">2</a></sup>中Python相关的内容。</p>
<h1 id="0x01-编程第一步初始化">0x01 编程第一步:初始化</h1>
<p>Spark提供的Python编程接口为<code class="highlighter-rouge">pyspark</code>。程序中,首先要创建一个SparkContext对象,用来访问Spark集群。创建SparkContext之前,可以先创建一个SparkConf对象,用来设置相关配置。如:</p>
<div class="language-python highlighter-rouge"><div class="highlight"><pre class="highlight"><code><span class="kn">from</span> <span class="nn">pyspark</span> <span class="kn">import</span> <span class="n">SparkContext</span><span class="p">,</span> <span class="n">SparkConf</span>
<span class="n">conf</span> <span class="o">=</span> <span class="n">SparkConf</span><span class="p">()</span><span class="o">.</span><span class="n">setAppName</span><span class="p">(</span><span class="n">appName</span><span class="p">)</span><span class="o">.</span><span class="n">setMaster</span><span class="p">(</span><span class="n">master</span><span class="p">)</span>
<span class="n">sc</span> <span class="o">=</span> <span class="n">SparkContext</span><span class="p">(</span><span class="n">conf</span><span class="o">=</span><span class="n">conf</span><span class="p">)</span>
</code></pre></div></div>
<p>或直接创建SparkContext:</p>
<div class="language-python highlighter-rouge"><div class="highlight"><pre class="highlight"><code><span class="n">sc</span> <span class="o">=</span> <span class="n">SparkContext</span><span class="p">(</span><span class="n">appName</span><span class="o">=</span><span class="s">'test'</span><span class="p">)</span>
</code></pre></div></div>
<h1 id="0x02-resilient-distributed-datasets-rdds">0x02 Resilient Distributed Datasets (RDDs)</h1>
<p>Spark解决方案都是围绕着resilient distributed dataset (RDD)进行的。RDD是一个可以并行操作的具有容错的数据集合。有两种方式创建RDDs, 一是将主程序中的已有数据集合parallelizing化,二是引入一个外部存储上的数据集,比如HDFS、文件系统、HBase等等</p>
<h2 id="21-parallelized-collections">2.1 Parallelized Collections</h2>
<p>使用SparkContext的parallelize()方法,可以将程序中的可迭代对象、数据集合等转换为Spark的并行集合(Parallelized Collections)。
集合中的各个元素被拷贝形成一个分布式数据集,从而可以并行操作。如:</p>
<div class="language-python highlighter-rouge"><div class="highlight"><pre class="highlight"><code><span class="n">data</span> <span class="o">=</span> <span class="p">[</span><span class="mi">1</span><span class="p">,</span> <span class="mi">2</span><span class="p">,</span> <span class="mi">3</span><span class="p">,</span> <span class="mi">4</span><span class="p">,</span> <span class="mi">5</span><span class="p">]</span>
<span class="n">distData</span> <span class="o">=</span> <span class="n">sc</span><span class="o">.</span><span class="n">parallelize</span><span class="p">(</span><span class="n">data</span><span class="p">)</span>
</code></pre></div></div>
<h2 id="22-external-datasets">2.2 External Datasets</h2>
<p>PySpark可以从任何Hadoop支持的数据源(如:本地文件系统, HDFS, Cassandra, HBase, Amazon S3等等)创建一个分布式数据集。Spark支持文本文件、<a href="http://hadoop.apache.org/common/docs/current/api/org/apache/hadoop/mapred/SequenceFileInputFormat.html">SequenceFiles</a>、以及其他<a href="http://hadoop.apache.org/docs/stable/api/org/apache/hadoop/mapred/InputFormat.html">Hadoop输入格式</a>。</p>
<p>文本文件的RDDs,可以使用SparkContext的textFile()方法创建,该方法接收一个URI输入(本地文件路径, hdfs://, s3n://等等),并逐行读取内容。</p>
<div class="language-python highlighter-rouge"><div class="highlight"><pre class="highlight"><code><span class="o">>>></span> <span class="n">distFile</span> <span class="o">=</span> <span class="n">sc</span><span class="o">.</span><span class="n">textFile</span><span class="p">(</span><span class="s">"data.txt"</span><span class="p">)</span>
</code></pre></div></div>
<p>创建之后,可以对<code class="highlighter-rouge">distFile</code>进行数据集操作</p>
<div class="language-python highlighter-rouge"><div class="highlight"><pre class="highlight"><code><span class="n">distFile</span><span class="o">.</span><span class="nb">map</span><span class="p">(</span><span class="k">lambda</span> <span class="n">s</span><span class="p">:</span> <span class="nb">len</span><span class="p">(</span><span class="n">s</span><span class="p">))</span><span class="o">.</span><span class="nb">reduce</span><span class="p">(</span><span class="k">lambda</span> <span class="n">a</span><span class="p">,</span> <span class="n">b</span><span class="p">:</span> <span class="n">a</span> <span class="o">+</span> <span class="n">b</span><span class="p">)</span>
</code></pre></div></div>
<p><code class="highlighter-rouge">map(line)</code> 对传入的元素逐个进行操作。<code class="highlighter-rouge">reduce(function,list)</code> 接受一个函数和一个list,对list的每一列反复调用该函数,并返回最终结果。</p>
<ol>
<li>数据源如果为本地文件系统路径,则必须保证worker节点可以访问到这个路径,如:将文件拷贝到各个节点、或者使用网络共享的文件系统路径。</li>
<li>Spark中所有基于文件的输入方法(包括textFile),均支持文件目录、压缩包、或者通配符等等形式。如:<code class="highlighter-rouge">textFile("/my/directory")</code>, <code class="highlighter-rouge">textFile("/my/directory/*.txt")</code>和<code class="highlighter-rouge">textFile("/my/directory/*.gz")</code></li>
<li>textFile()函数可选的第二个参数,可用来指定文件的分区数。默认情况下,Spark为每块文件(128M)创建一个分区,也可以手工指定更多的分区。</li>
</ol>
<p>除此之外,Spark Python API 还支持其他数据格式:</p>
<ol>
<li>SparkContext.wholeTextFiles 可以用来读取一个包含很多小文本文件的目录,返回每个小文件键值对(文件名、内容)。与textFile相反,textFile会将每个文件的每一行作为一个记录返回。</li>
<li>RDD.saveAsPickleFile 和 SparkContext.pickleFile 支持将RDD 保存为由pickled Python 对象组成的简单格式。</li>
<li>SequenceFile 和 Hadoop Input/Output 格式</li>
</ol>
<h2 id="23-rdd-operations">2.3 RDD Operations</h2>
<p>RDDs 支持两种类型的操作:</p>
<ol>
<li><strong><code class="highlighter-rouge">transformations</code>,对一个数据集进行一定的计算后创建一个新数据集。</strong></li>
<li><strong><code class="highlighter-rouge">actions</code>,在数据集上运行一系列计算后返回一个值给主节点。</strong></li>
</ol>
<p>如, <code class="highlighter-rouge">map()</code>就是一个 <code class="highlighter-rouge">transformation</code>。它接受一个自定义函数,并将原数据集中的每个元素经过该函数计算后返回为新的RDDs。另一方面,
<code class="highlighter-rouge">reduce()</code>会将每一个元素通过指定函数重复聚集计算后,返回最终结果给主程序(另外还有一个并行的<code class="highlighter-rouge">reduceByKey</code>会返回一个分布式数据集)。
所有transformations都是lazy的,并不会马上计算。如果数据集map计算后会马上在reduce中使用,只会返回reduce的结果,而不是map运算后庞大的数据集。</p>
<p>默认情况下,每次操作一个转换后的RDD都需要重新计算,可以使用<code class="highlighter-rouge">persist</code>或者<code class="highlighter-rouge">cache</code>方法来将各个元素保存在内存中,从而更快的访问。</p>
<div class="language-python highlighter-rouge"><div class="highlight"><pre class="highlight"><code><span class="n">lines</span> <span class="o">=</span> <span class="n">sc</span><span class="o">.</span><span class="n">textFile</span><span class="p">(</span><span class="s">"data.txt"</span><span class="p">)</span> <span class="c1"># lines仅仅是该文件的指针,在操作之前并不会将内容加载到内存中
</span><span class="n">lineLengths</span> <span class="o">=</span> <span class="n">lines</span><span class="o">.</span><span class="nb">map</span><span class="p">(</span><span class="k">lambda</span> <span class="n">s</span><span class="p">:</span> <span class="nb">len</span><span class="p">(</span><span class="n">s</span><span class="p">))</span> <span class="c1"># 进行map转换,由于lazy机制,lineLengths也不会马上计算
</span><span class="n">totalLength</span> <span class="o">=</span> <span class="n">lineLengths</span><span class="o">.</span><span class="nb">reduce</span><span class="p">(</span><span class="k">lambda</span> <span class="n">a</span><span class="p">,</span> <span class="n">b</span><span class="p">:</span> <span class="n">a</span> <span class="o">+</span> <span class="n">b</span><span class="p">)</span> <span class="c1"># reduce是一个action,在这一步spark才会将计算任务分发到每一个工作节点
</span>
<span class="n">lineLengths</span><span class="o">.</span><span class="n">persist</span><span class="p">()</span> <span class="c1"># 如果我们后面还会用到lineLengths,可以在reduce前调用persist,在第一次计算后将它持久化。
</span></code></pre></div></div>
<h3 id="231-passing-functions-to-spark">2.3.1 Passing Functions to Spark</h3>
<p>Spark’s API非常依赖于函数传递,建议通过三种方式</p>
<ol>
<li>Lambda expressions</li>
<li>局部defs</li>
<li>modules的顶层函数</li>
</ol>
<p>当时传递一个类内的函数时,函数中引用到对象的成员变量的话,计算时会将整个对象传递给集群。可以将成员变量赋值给函数内的局部变量来避免这种问题。</p>
<p>https://stackoverflow.com/questions/28569374/spark-returning-pickle-error-cannot-lookup-attribute</p>
<p>Currently, PySpark can not support pickle a class object in current script ( ‘main’), the workaround could be put the implementation of the class into a separate module, then use “bin/spark-submit –py-files xxx.py” in deploy it</p>
<h3 id="232-understanding-closures">2.3.2 Understanding closures</h3>
<p>在集群环境下,Spark程序的变量和函数的生命周期和作用域较为难理解。对作用域之外变量的RDD操作经常会产生问题。</p>
<div class="language-python highlighter-rouge"><div class="highlight"><pre class="highlight"><code><span class="n">counter</span> <span class="o">=</span> <span class="mi">0</span>
<span class="n">rdd</span> <span class="o">=</span> <span class="n">sc</span><span class="o">.</span><span class="n">parallelize</span><span class="p">(</span><span class="n">data</span><span class="p">)</span>
<span class="c1"># Wrong: Don't do this!!
</span><span class="k">def</span> <span class="nf">increment_counter</span><span class="p">(</span><span class="n">x</span><span class="p">):</span>
<span class="k">global</span> <span class="n">counter</span>
<span class="n">counter</span> <span class="o">+=</span> <span class="n">x</span>
<span class="n">rdd</span><span class="o">.</span><span class="n">foreach</span><span class="p">(</span><span class="n">increment_counter</span><span class="p">)</span>
<span class="k">print</span><span class="p">(</span><span class="s">"Counter value: "</span><span class="p">,</span> <span class="n">counter</span><span class="p">)</span>
</code></pre></div></div>
<p>Spark将RDD操作分成一个个子任务,分发给各个执行节点。执行之前,Spark会计算任务的闭包。闭包就是那些为了RDD操作计算而必须要对执行节点可见的变量和函数。比如上述代码中的<code class="highlighter-rouge">foreach</code>。闭包会序列化之后发送给执行节点。发送给执行节点的闭包内的变量都是副本,都不再是执行节点上原来的变量了。为了适用这些场景,需要适用<code class="highlighter-rouge"> Accumulator</code>,Accumulators是在任务被分割到不同的执行节点的场景下的,安全更新变量的机制。</p>
<p>闭包,有点像是循环或者本地定义的函数,不应该去操作全局状态。如果需要聚合全局数据时,请使用Accumulator。</p>
<h3 id="233-printing-elements-of-an-rdd">2.3.3 Printing elements of an RDD</h3>
<p>有时候需要打印出RDD的每一个元素,用<code class="highlighter-rouge">rdd.foreach(println)</code> 或者 <code class="highlighter-rouge">rdd.map(println)</code>的话,打印的标准输出会在各个执行节点上,而不会显示在主节点上。如果要在主节点上打印的话,需要使用<code class="highlighter-rouge">collect() </code>先将RDD拉取到主节点上,这可能会耗尽主节点的内存,所以如果是要查看一部分数据,可以使用<code class="highlighter-rouge">take()</code>,比如:<code class="highlighter-rouge">rdd.take(100).foreach(println)</code>。</p>
<h3 id="234-working-with-key-value-pairs">2.3.4 Working with Key-Value Pairs</h3>
<p>大多数Spark上的RDD操作包含了各种类型对象,但是有一部分操作只能应用于Key-Value的RDDs。比如 <code class="highlighter-rouge">reduceByKey()</code>。计算一个文本中,每一行出现了多少次:</p>
<div class="language-python highlighter-rouge"><div class="highlight"><pre class="highlight"><code><span class="n">lines</span> <span class="o">=</span> <span class="n">sc</span><span class="o">.</span><span class="n">textFile</span><span class="p">(</span><span class="s">"data.txt"</span><span class="p">)</span>
<span class="n">pairs</span> <span class="o">=</span> <span class="n">lines</span><span class="o">.</span><span class="nb">map</span><span class="p">(</span><span class="k">lambda</span> <span class="n">s</span><span class="p">:</span> <span class="p">(</span><span class="n">s</span><span class="p">,</span> <span class="mi">1</span><span class="p">))</span>
<span class="n">counts</span> <span class="o">=</span> <span class="n">pairs</span><span class="o">.</span><span class="n">reduceByKey</span><span class="p">(</span><span class="k">lambda</span> <span class="n">a</span><span class="p">,</span> <span class="n">b</span><span class="p">:</span> <span class="n">a</span> <span class="o">+</span> <span class="n">b</span><span class="p">)</span>
</code></pre></div></div>
<h3 id="235-transformations">2.3.5 Transformations</h3>
<p>Transformations<a href="http://spark.apache.org/docs/latest/programming-guide.html#transformations">详细列表</a>。Spark支持的一些常用Transformation函数如:</p>
<blockquote>
<p>map(func)</p>
</blockquote>
<p>将传入每一个元素经过函数func处理后,返回一个新的分布式数据集</p>
<p><code class="highlighter-rouge">reduce(func)</code>
将每个元素聚合计算后最终返回一个值</p>
<blockquote>
<p>reduceByKey(func, [numTasks])</p>
</blockquote>
<p>reduceByKey就是对元素为KV键值对的RDD中Key相同的元素的Value经过函数func聚合操作。Key相同的多个元素的值被reduce为一个值,然后与原RDD中的Key组成一个新的KV对。</p>
<div class="language-scala highlighter-rouge"><div class="highlight"><pre class="highlight"><code><span class="k">val</span> <span class="n">a</span> <span class="k">=</span> <span class="n">sc</span><span class="o">.</span><span class="n">parallelize</span><span class="o">(</span><span class="nc">List</span><span class="o">((</span><span class="mi">1</span><span class="o">,</span><span class="mi">2</span><span class="o">),(</span><span class="mi">1</span><span class="o">,</span><span class="mi">3</span><span class="o">),(</span><span class="mi">3</span><span class="o">,</span><span class="mi">4</span><span class="o">),(</span><span class="mi">3</span><span class="o">,</span><span class="mi">6</span><span class="o">)))</span>
<span class="n">a</span><span class="o">.</span><span class="n">reduceByKey</span><span class="o">((</span><span class="n">x</span><span class="o">,</span><span class="n">y</span><span class="o">)</span> <span class="k">=></span> <span class="n">x</span> <span class="o">+</span> <span class="n">y</span><span class="o">).</span><span class="n">collect</span>
<span class="c1">//结果 Array((1,5), (3,10))
//相同Key的Value进行累加
</span></code></pre></div></div>
<blockquote>
<p>sortByKey([ascending], [numTasks])</p>
</blockquote>
<p>When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the boolean ascending argument.</p>
<blockquote>
<p>sortBy(func,[ascending],[numPartitions])</p>
</blockquote>
<p>func函数返回排序key,如 <code class="highlighter-rouge">sortBy(lambda line: (line[0],line[1]))</code></p>
<p>更多排序:
http://blog.csdn.net/jiangpeng59/article/details/52938465</p>
<h3 id="236-actions">2.3.6 Actions</h3>
<p>下面是一下常用的Action函数:</p>
<blockquote>
<p>reduce(func)</p>
</blockquote>
<p>使用传入的函数:func 将数据集中的元素进行聚合操作,该函数接受两个值(上一个计算结果和后一个元素),返回一个值(聚合结果)。而且函数必须为可交换的(commutative)并且联想的(associative),从而可以正确的进行并行计算。</p>
<blockquote>
<p>collect()</p>
</blockquote>
<p>将数据集的所有元素返回给主节点。这个函数通常用在filter或者其他操作之后,用于返回一个较小的数据子集。</p>
<blockquote>
<p>count()</p>
</blockquote>
<p>返回数据集的元素个数。</p>
<blockquote>
<p>take(n)</p>
</blockquote>
<p>返回数据集中前n个元素组成的数组。first()返回第一个元素,与take(1)类似。</p>
<blockquote>
<p>takeSample(withReplacement, num, [seed])</p>
</blockquote>
<p>从数据集中随机挑选num个元素,组成一个样本数组并返回。使用或者不适用replacement,可选传入一个随机种子seed</p>
<blockquote>
<p>countByKey()</p>
</blockquote>
<p>对于(K, V)形式的RDD,返回(K, Int)形式的键值对,包含了每个键的元素个数。</p>
<blockquote>
<p>foreach(func)</p>
</blockquote>
<p>对数据集的每一个元素运行函数func。</p>
<h3 id="237-shuffle-operations">2.3.7 Shuffle operations</h3>
<p>Spark中的一些操作会触发shuffle事件,shuffle是Spark中一种重新分配数据的机制,因此它在不同分区之间不同分组。通常包括在不同执行节点和机器之间拷贝数据,从而使shuffle事件变得耗费较大并且很复杂。</p>
<h4 id="background">background</h4>
<p>我们可以通过<code class="highlighter-rouge">reduceByKey</code>操作来理解shuffle过程中发生了什么。reduceByKey操作将每个Key对应的所有值进行聚合操作,并最终返回一个结果,由每个Key和其对应的聚合结果组成一个个新的元组,最终生成一个新的RDD。而挑战在于一个Key的所有值并不一定都存在于同一个分区上,甚至不在一台机器上,但是他们必须协同来完成计算。</p>
<p>Spark中以便操作,数据通常不会跨分区分布。计算过程中,单个任务一般只操作单个分区。从而为了<code class="highlighter-rouge">reduceByKey</code>这单个reduce任务筹备数据,Spark需要执行一个all-to-all操作,它需要读取所有分区寻找所有Key对应的Value,然后将跨分区的Value集中起来计算每个Key聚合结果。这就是所谓的shuffle。</p>
<p>新shuffle的分区中的数据不是有序的,如果想要在shuffle之后得到一些可预知顺序的结果,可以通过以下方法:</p>
<ul>
<li><code class="highlighter-rouge">mapPartitions</code>来给每个用到的分区进行排序,比如:<code class="highlighter-rouge">.sorted</code></li>
<li>使用<code class="highlighter-rouge">repartitionAndSortWithinPartitions</code>在重新分区的同时有效地进行分区排序</li>
<li>使用<code class="highlighter-rouge">sortBy</code>进行全局的RDD排序</li>
</ul>
<p>会引发shuffle的操作包括:<strong>重新分区操作</strong>,如<code class="highlighter-rouge">repartition</code>和<code class="highlighter-rouge">coalesce</code>;<strong>ByKey操作</strong>(除了计数类操作),比如 <code class="highlighter-rouge">groupByKey</code> 和 <code class="highlighter-rouge">reduceByKey</code>;<strong>join</strong>操作,比如<code class="highlighter-rouge"> cogroup</code>和<code class="highlighter-rouge">join</code>。</p>
<h4 id="performance-impact">Performance Impact</h4>
<p>Shuffle是一个耗资源的操作。在传输前需要内存数据来组织记录,所以某些shuffle操作会占用大量的堆内存。当数据量超过内存大小后,Spark会将这些表写入磁盘,同时会带来额外的磁盘I/O和垃圾回收。</p>
<p>Shuffle也会产生大量的中间文件。受Spark的垃圾回收机制影响,长时间运行的Spark任务会消耗大量的磁盘空间。临时存储路径由<code class="highlighter-rouge">spark.local.dir </code>在Spark上下文进行配置。</p>
<p>有很多配置可以用来调整Shuffle的行为,可以参见Spark配置指南中的‘‘Shuffle Behavior’章节。<a href="http://spark.apache.org/docs/latest/configuration.html">Spark Configuration Guide</a></p>
<h2 id="24-rdd-persistence">2.4 RDD Persistence</h2>
<p>Spark一个重要的功能就是在内存中持久化(或缓存)persisting (or caching) 一个数据集。当你持久化RDD后,每个节点会存储它在内存中计算的任何分区,并在对该数据集的其他操作上重用这部分数据。这会使后续操作快十倍以上。</p>
<p>使用<code class="highlighter-rouge">persist()</code> 或者 <code class="highlighter-rouge">cache()</code>方法来缓存RDD。它会现在节点上计算,然后缓存在内存中。存储时容错的,如果哪一个分区的数据丢失了,它会自动重新计算该部分数据。</p>
<p>可传参控制存储级别(序列化后存入内存、磁盘等等):
<a href="http://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-persistence">http://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-persistence</a></p>
<h3 id="remove-data">Remove Data</h3>
<p>Spark会自动将老旧的数据移除掉,根据最不常使用的规则。使用RDD.unpersist()方法可以手动移除数据。</p>
<h1 id="0x03-shared-variables">0x03 Shared Variables</h1>
<p>正常来说,当一个函数传给Spark操作(比如<code class="highlighter-rouge">map</code>和<code class="highlighter-rouge">reduce</code>)时,它会在远程集群节点上被执行,而函数中的所有变量会复制成好几份。这些变量被复制到每个机器上,当远程节点向主节点反馈结果时,也不会更新这些变量。任务之间的读写共享变量变得很低效。然而Spark为两种通用场景提供了受限的共享变量:<code class="highlighter-rouge">broadcast variables</code>和<code class="highlighter-rouge">accumulators</code>。</p>
<h2 id="31-broadcast-variables">3.1 Broadcast Variables</h2>
<p>Broadcast variables允许程序员在每台机器上维持一个只读的缓存变量,而不是在每个任务中传输该变量的副本。比如:它们可以用来有效地给每个节点提供一个大的数据集副本。Spark也尝试使用更有效的广播算法分发broadcast变量,来减少通信消耗。</p>
<p>Spark的操作会通过阶段集合(Stages)来执行,被分布式的“shuffle”操作分离。在每个阶段(Stage),Spark会自动广播所有任务都需要的共同数据。这种方式广播的数据以序列化的形式缓存,并在任务运行之前反序列化。这也就表明,显示的创建broadcast variables仅在任务跨多阶段并且需要相同的数据时有用。或者当数据缓存为反序列化形式非常重要时才需要显示创建broadcat Variables。</p>
<p>通过调用<code class="highlighter-rouge">SparkContext.broadcast(v)</code>可以从变量v创建一个Broadcast variables。broadcast variables包装了v,可以通过调用<code class="highlighter-rouge">value</code>方法来访问它的值。</p>
<div class="highlighter-rouge"><div class="highlight"><pre class="highlight"><code>>>> broadcastVar = sc.broadcast([1, 2, 3])
<pyspark.broadcast.Broadcast object at 0x102789f10>
>>> broadcastVar.value
[1, 2, 3]
</code></pre></div></div>
<p>Bradcast variables创建之后,每个在集群上运行的函数都应该使用它而不是变量v,这样v就不会多次重复的传输到各个节点上。另外,在广播之后,为了保证每个节点获得了相同的broadcast variables的值,v变量不应该再修改。(e.g. if the variable is shipped to a new node later).</p>
<h2 id="32-accumulators">3.2 Accumulators</h2>
<p>Accumulators是仅通过关联和交换操作来“added”的变量,因此可以有效的支持并行。可以用来实现计数器(在MapReduce中)或者求和。Spark原生支持数字类型的accumulators,程序员可以添加更多类型的支持。</p>
<p>作为一个用户,你可以创建命名的或者未命名的accumulators。如下图所示,对于修改了这个accumulator的stage,命名的accumulator会在web界面上显示。Spark在Tasks表格中显示了每个任务修改accumulator的值。</p>
<p><img src="http://spark.apache.org/docs/latest/img/spark-webui-accumulators.png" alt="spark-accumulator" /></p>
<p>调用<code class="highlighter-rouge">SparkContext.accumulator(v)</code>可以从v创建一个accumulator。集群上运行的任务可以通过<code class="highlighter-rouge">add</code>方法或者<code class="highlighter-rouge">+=</code>操作符来进行加运算。但是并不能读取accumulator的值。只有主节点可以读取它的值,使用<code class="highlighter-rouge">value</code>方法。</p>
<div class="highlighter-rouge"><div class="highlight"><pre class="highlight"><code>>>> accum = sc.accumulator(0)
>>> accum
Accumulator<id=0, value=0>
>>> sc.parallelize([1, 2, 3, 4]).foreach(lambda x: accum.add(x))
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s
>>> accum.value
10
</code></pre></div></div>
<p>上面的代码使用了accumulator内置支持的Int类型,程序员也可以通过定义AccumulatorParam子类来创建accumulator的其他类型。AccumulatorParam接口有两个方法: <code class="highlighter-rouge">zero</code> 为你的数据类型提供一个零值,<code class="highlighter-rouge">addInPlace</code>用来将两个值相加。</p>
<h1 id="0x09-references">0x09 References</h1>
<div class="footnotes">
<ol>
<li id="fn:1">
<p>https://www.zhihu.com/question/26568496 <a href="#fnref:1" class="reversefootnote">↩</a></p>
</li>
<li id="fn:2">
<p>http://spark.apache.org/docs/latest/programming-guide.html <a href="#fnref:2" class="reversefootnote">↩</a></p>
</li>
</ol>
</div>
<footer class="entry-meta">
<span class="entry-tags" style="color:black;font-size:13px;margin-bottom: 0px;">欢迎订阅我的微信公众号</span>
<img src="/images/secengine.jpg" alt="welcome subscribe"/>
<span class="entry-tags"><a href="https://kingx.me/tags/#bigdata" title="Pages tagged bigdata" class="tag"><span class="term">bigdata</span></a><a href="https://kingx.me/tags/#spark" title="Pages tagged spark" class="tag"><span class="term">spark</span></a></span>
<div class="social-share">
<ul class="socialcount socialcount-small inline-list">
<li class="weibo"><a href="http://service.weibo.com/share/share.php?title=分享KINGX的文章《Spark编程手抄 - Python》&url=https://kingx.me/spark-programming-handbook.html&source=bookmark" title="Share on Weibo" target="_blank"><span class="count"><i class="fa fa-weibo"></i> WEIBO</span></a></li>
<li class="facebook"><a href="https://www.facebook.com/sharer/sharer.php?u=https://kingx.me/spark-programming-handbook.html" title="Share on Facebook"><span class="count"><i class="fa fa-facebook-square"></i> Like</span></a></li>
<li class="twitter"><a href="https://twitter.com/intent/tweet?text=https://kingx.me/spark-programming-handbook.html" title="Share on Twitter"><span class="count"><i class="fa fa-twitter-square"></i> Tweet</span></a></li>
<li class="googleplus"><a href="https://plus.google.com/share?url=https://kingx.me/spark-programming-handbook.html" title="Share on Google Plus"><span class="count"><i class="fa fa-google-plus-square"></i> +1</span></a></li>
</ul>
</div><!-- /.social-share -->
<!--
<div class="ds-share" data-thread-key="/spark-programming-handbook" data-title="Spark编程手抄 - Python" data-images="" data-content="Spark编程手抄 - Python" data-url="https://kingx.me/spark-programming-handbook.html">
<div class="ds-share-inline">
<ul class="ds-share-icons-16">
<li data-toggle="ds-share-icons-more"><a class="ds-more" href="javascript:void(0);">分享到:</a></li>
<li><a class="ds-weibo" href="javascript:void(0);" data-service="weibo">微博</a></li>
<li><a class="ds-qzone" href="javascript:void(0);" data-service="qzone">QQ空间</a></li>
<li><a class="ds-qqt" href="javascript:void(0);" data-service="qqt">腾讯微博</a></li>
<li><a class="ds-wechat" href="javascript:void(0);" data-service="wechat">微信</a></li>
</ul>
<div class="ds-share-icons-more">
</div>
</div>
</div>
-->
</footer>
</div><!-- /.entry-content -->
<div class="read-more">
<div class="read-more-header">
<a href="https://kingx.me/samba-rce-cve-2017-7494.html" class="read-more-btn">Read More</a>
</div><!-- /.read-more-header -->
<div class="read-more-content">
<h3><a href="https://kingx.me/ai-driven-static-code-audit-vulnhuntr.html" title="探索 AI 驱动的代码安全工具 VulnHuntr">探索 AI 驱动的代码安全工具 VulnHuntr</a></h3>
<p>Explore VulnHuntr <a href="https://kingx.me/ai-driven-static-code-audit-vulnhuntr.html">Continue reading</a></p>
</div><!-- /.read-more-content -->
<div class="read-more-list">
<div class="list-item">
<h4><a href="https://kingx.me/Patch-log4j.html" title="Log4j 严重漏洞修复方案参考 CVE-2021-44228">Log4j 严重漏洞修复方案参考 CVE-2021-44228</a></h4>
<span>Published on December 12, 2021</span>
</div><!-- /.list-item -->
<div class="list-item">
<h4><a href="https://kingx.me/Thinking-about-the-RedTeam-Engagement.html" title="浅谈大规模红蓝对抗攻与防">浅谈大规模红蓝对抗攻与防</a></h4>
<span>Published on October 12, 2020</span>
</div><!-- /.list-item -->
</div><!-- /.read-more-list -->
</div><!-- /.read-more -->
</article>
</div><!-- /#main -->
<div class="footer-wrapper">
<footer role="contentinfo">
<span>© 2024 KINGX. Powered by Jekyll using the HPSTR Theme.</span>
</footer>
</div><!-- /.footer-wrapper -->
<!--<script src="//ajax.googleapis.com/ajax/libs/jquery/1.9.1/jquery.min.js"></script>-->
<!-- <script src="http://libs.baidu.com/jquery/1.9.1/jquery.min.js"></script> -->
<script src="https://cdnjs.cloudflare.com/ajax/libs/jquery/1.9.1/jquery.min.js"></script>
<script>window.jQuery || document.write('<script src="https://kingx.me/assets/js/vendor/jquery-1.9.1.min.js"><\/script>')</script>
<script src="https://kingx.me/assets/js/scripts.min.js"></script>
<script>
var _hmt = _hmt || [];
(function() {
var hm = document.createElement("script");
if(location.host=="kingx.me"){
hm.src = "https://hm.baidu.com/hm.js?d11d8512e0bc6992b9c9bbf2d266ce31";
}else if(location.host=="kingx.sinaapp.com"){
hm.src = "https://hm.baidu.com/hm.js?d1b3dbd97b73868454f102755fdf51ba";
}
var s = document.getElementsByTagName("script")[0];
s.parentNode.insertBefore(hm, s);
})();
</script>
<!-- Busuanzi Analytics -->
<script async src="//busuanzi.ibruce.info/busuanzi/2.3/busuanzi.pure.mini.js"></script>
</body>
</html>