<!DOCTYPE html>
<!--[if lt IE 7]> <html class="no-js lt-ie9 lt-ie8 lt-ie7"> <![endif]-->
<!--[if IE 7]> <html class="no-js lt-ie9 lt-ie8"> <![endif]-->
<!--[if IE 8]> <html class="no-js lt-ie9"> <![endif]-->
<!--[if gt IE 8]><!--> <html class="no-js"> <!--<![endif]-->
<head>
<meta charset="utf-8">
<meta http-equiv="X-UA-Compatible" content="IE=edge,chrome=1">
<title>Spark Programming Guide - Spark 2.0.0 Documentation</title>
<meta name="description" content="Spark 2.0.0 programming guide in Java, Scala and Python">
<link rel="stylesheet" href="css/bootstrap.min.css">
<style>
body {
padding-top: 60px;
padding-bottom: 40px;
}
</style>
<meta name="viewport" content="width=device-width">
<link rel="stylesheet" href="css/bootstrap-responsive.min.css">
<link rel="stylesheet" href="css/main.css">
<script src="js/vendor/modernizr-2.6.1-respond-1.1.0.min.js"></script>
<link rel="stylesheet" href="css/pygments-default.css">
</head>
<body>
<!--[if lt IE 7]>
<p class="chromeframe">You are using an outdated browser. <a href="http://browsehappy.com/">Upgrade your browser today</a> or <a href="http://www.google.com/chromeframe/?redirect=true">install Google Chrome Frame</a> to better experience this site.</p>
<![endif]-->
<!-- This code is taken from http://twitter.github.com/bootstrap/examples/hero.html -->
<div class="navbar navbar-fixed-top" id="topbar">
<div class="navbar-inner">
<div class="container">
<div class="brand"><a href="index.html">
<img src="img/spark-logo-hd.png" style="height:50px;"/></a><span class="version">2.0.0</span>
</div>
<ul class="nav">
<!--TODO(andyk): Add class="active" attribute to li some how.-->
<li><a href="index.html">Overview</a></li>
<li class="dropdown">
<a href="#" class="dropdown-toggle" data-toggle="dropdown">Programming Guides<b class="caret"></b></a>
<ul class="dropdown-menu">
<li><a href="quick-start.html">Quick Start</a></li>
<li><a href="programming-guide.html">Spark Programming Guide</a></li>
<li class="divider"></li>
<li><a href="streaming-programming-guide.html">Spark Streaming</a></li>
<li><a href="sql-programming-guide.html">DataFrames, Datasets and SQL</a></li>
<li><a href="mllib-guide.html">MLlib (Machine Learning)</a></li>
<li><a href="graphx-programming-guide.html">GraphX (Graph Processing)</a></li>
<li><a href="sparkr.html">SparkR (R on Spark)</a></li>
</ul>
</li>
<li class="dropdown">
<a href="#" class="dropdown-toggle" data-toggle="dropdown">API Docs<b class="caret"></b></a>
<ul class="dropdown-menu">
<li><a href="api/scala/index.html#org.apache.spark.package">Scala</a></li>
<li><a href="api/java/index.html">Java</a></li>
<li><a href="api/python/index.html">Python</a></li>
<li><a href="api/R/index.html">R</a></li>
</ul>
</li>
<li class="dropdown">
<a href="#" class="dropdown-toggle" data-toggle="dropdown">Deploying<b class="caret"></b></a>
<ul class="dropdown-menu">
<li><a href="cluster-overview.html">Overview</a></li>
<li><a href="submitting-applications.html">Submitting Applications</a></li>
<li class="divider"></li>
<li><a href="spark-standalone.html">Spark Standalone</a></li>
<li><a href="running-on-mesos.html">Mesos</a></li>
<li><a href="running-on-yarn.html">YARN</a></li>
</ul>
</li>
<li class="dropdown">
<a href="api.html" class="dropdown-toggle" data-toggle="dropdown">More<b class="caret"></b></a>
<ul class="dropdown-menu">
<li><a href="configuration.html">Configuration</a></li>
<li><a href="monitoring.html">Monitoring</a></li>
<li><a href="tuning.html">Tuning Guide</a></li>
<li><a href="job-scheduling.html">Job Scheduling</a></li>
<li><a href="security.html">Security</a></li>
<li><a href="hardware-provisioning.html">Hardware Provisioning</a></li>
<li class="divider"></li>
<li><a href="building-spark.html">Building Spark</a></li>
<li><a href="https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark">Contributing to Spark</a></li>
<li><a href="https://cwiki.apache.org/confluence/display/SPARK/Supplemental+Spark+Projects">Supplemental Projects</a></li>
</ul>
</li>
</ul>
<!--<p class="navbar-text pull-right"><span class="version-text">v2.0.0</span></p>-->
</div>
</div>
</div>
<div class="container-wrapper">
<div class="content" id="content">
<h1 class="title">Spark Programming Guide</h1>
<ul id="markdown-toc">
<li><a href="#overview">Overview</a></li>
<li><a href="#linking-with-spark">Linking with Spark</a></li>
<li><a href="#initializing-spark">Initializing Spark</a> <ul>
<li><a href="#using-the-shell">Using the Shell</a></li>
</ul>
</li>
<li><a href="#resilient-distributed-datasets-rdds">Resilient Distributed Datasets (RDDs)</a> <ul>
<li><a href="#parallelized-collections">Parallelized Collections</a></li>
<li><a href="#external-datasets">External Datasets</a></li>
<li><a href="#rdd-operations">RDD Operations</a> <ul>
<li><a href="#basics">Basics</a></li>
<li><a href="#passing-functions-to-spark">Passing Functions to Spark</a></li>
<li><a href="#understanding-closures-a-nameclosureslinka">Understanding closures</a> <ul>
<li><a href="#example">Example</a></li>
<li><a href="#local-vs-cluster-modes">Local vs. cluster modes</a></li>
<li><a href="#printing-elements-of-an-rdd">Printing elements of an RDD</a></li>
</ul>
</li>
<li><a href="#working-with-key-value-pairs">Working with Key-Value Pairs</a></li>
<li><a href="#transformations">Transformations</a></li>
<li><a href="#actions">Actions</a></li>
<li><a href="#shuffle-operations">Shuffle operations</a> <ul>
<li><a href="#background">Background</a></li>
<li><a href="#performance-impact">Performance Impact</a></li>
</ul>
</li>
</ul>
</li>
<li><a href="#rdd-persistence">RDD Persistence</a> <ul>
<li><a href="#which-storage-level-to-choose">Which Storage Level to Choose?</a></li>
<li><a href="#removing-data">Removing Data</a></li>
</ul>
</li>
</ul>
</li>
<li><a href="#shared-variables">Shared Variables</a> <ul>
<li><a href="#broadcast-variables">Broadcast Variables</a></li>
<li><a href="#accumulators">Accumulators</a></li>
</ul>
</li>
<li><a href="#deploying-to-a-cluster">Deploying to a Cluster</a></li>
<li><a href="#launching-spark-jobs-from-java--scala">Launching Spark jobs from Java / Scala</a></li>
<li><a href="#unit-testing">Unit Testing</a></li>
<li><a href="#migrating-from-pre-10-versions-of-spark">Migrating from pre-1.0 Versions of Spark</a></li>
<li><a href="#where-to-go-from-here">Where to Go from Here</a></li>
</ul>
<h1 id="overview">Overview</h1>
<p>At a high level, every Spark application consists of a <em>driver program</em> that runs the user’s <code>main</code> function and executes various <em>parallel operations</em> on a cluster. The main abstraction Spark provides is a <em>resilient distributed dataset</em> (RDD), which is a collection of elements partitioned across the nodes of the cluster that can be operated on in parallel. RDDs are created by starting with a file in the Hadoop file system (or any other Hadoop-supported file system), or an existing Scala collection in the driver program, and transforming it. Users may also ask Spark to <em>persist</em> an RDD in memory, allowing it to be reused efficiently across parallel operations. Finally, RDDs automatically recover from node failures.</p>
<p>A second abstraction in Spark is <em>shared variables</em> that can be used in parallel operations. By default, when Spark runs a function in parallel as a set of tasks on different nodes, it ships a copy of each variable used in the function to each task. Sometimes, a variable needs to be shared across tasks, or between tasks and the driver program. Spark supports two types of shared variables: <em>broadcast variables</em>, which can be used to cache a value in memory on all nodes, and <em>accumulators</em>, which are variables that are only “added” to, such as counters and sums.</p>
<p>This guide shows each of these features in each of Spark’s supported languages. It is easiest to follow
along if you launch Spark’s interactive shell: either <code>bin/spark-shell</code> for the Scala shell or
<code>bin/pyspark</code> for the Python one.</p>
<h1 id="linking-with-spark">Linking with Spark</h1>
<div class="codetabs">
<div data-lang="scala">
<p>Spark 2.0.0-SNAPSHOT uses Scala 2.11. To write
applications in Scala, you will need to use a compatible Scala version (e.g. 2.11.X).</p>
<p>To write a Spark application, you need to add a Maven dependency on Spark. Spark is available through Maven Central at:</p>
<pre><code>groupId = org.apache.spark
artifactId = spark-core_2.11
version = 2.0.0-SNAPSHOT
</code></pre>
<p>In addition, if you wish to access an HDFS cluster, you need to add a dependency on
<code>hadoop-client</code> for your version of HDFS.</p>
<pre><code>groupId = org.apache.hadoop
artifactId = hadoop-client
version = &lt;your-hdfs-version&gt;
</code></pre>
<p>Finally, you need to import some Spark classes into your program. Add the following lines:</p>
<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">import</span> <span class="nn">org.apache.spark.SparkContext</span>
<span class="k">import</span> <span class="nn">org.apache.spark.SparkConf</span></code></pre></div>
<p>(Before Spark 1.3.0, you needed to explicitly <code>import org.apache.spark.SparkContext._</code> to enable essential implicit conversions.)</p>
</div>
<div data-lang="java">
<p>Spark 2.0.0-SNAPSHOT works with Java 7 and higher. If you are using Java 8, Spark supports
<a href="http://docs.oracle.com/javase/tutorial/java/javaOO/lambdaexpressions.html">lambda expressions</a>
for concisely writing functions; otherwise you can use the classes in the
<a href="api/java/index.html?org/apache/spark/api/java/function/package-summary.html">org.apache.spark.api.java.function</a> package.</p>
<p>To write a Spark application in Java, you need to add a dependency on Spark. Spark is available through Maven Central at:</p>
<pre><code>groupId = org.apache.spark
artifactId = spark-core_2.11
version = 2.0.0-SNAPSHOT
</code></pre>
<p>In addition, if you wish to access an HDFS cluster, you need to add a dependency on
<code>hadoop-client</code> for your version of HDFS.</p>
<pre><code>groupId = org.apache.hadoop
artifactId = hadoop-client
version = &lt;your-hdfs-version&gt;
</code></pre>
<p>Finally, you need to import some Spark classes into your program. Add the following lines:</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kn">import</span> <span class="nn">org.apache.spark.api.java.JavaSparkContext</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.spark.api.java.JavaRDD</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.spark.SparkConf</span><span class="o">;</span></code></pre></div>
</div>
<div data-lang="python">
<p>Spark 2.0.0-SNAPSHOT works with Python 2.6+ or Python 3.4+. It can use the standard CPython interpreter,
so C libraries like NumPy can be used. It also works with PyPy 2.3+.</p>
<p>To run Spark applications in Python, use the <code>bin/spark-submit</code> script located in the Spark directory.
This script will load Spark’s Java/Scala libraries and allow you to submit applications to a cluster.
You can also use <code>bin/pyspark</code> to launch an interactive Python shell.</p>
<p>If you wish to access HDFS data, you need to use a build of PySpark linking
to your version of HDFS.
<a href="http://spark.apache.org/downloads.html">Prebuilt packages</a> are also available on the Spark homepage
for common HDFS versions.</p>
<p>Finally, you need to import some Spark classes into your program. Add the following line:</p>
<div class="highlight"><pre><code class="language-python" data-lang="python"><span class="kn">from</span> <span class="nn">pyspark</span> <span class="kn">import</span> <span class="n">SparkContext</span><span class="p">,</span> <span class="n">SparkConf</span></code></pre></div>
<p>PySpark requires the same minor version of Python in both driver and workers. It uses the default Python version in <code>PATH</code>;
you can specify which version of Python to use by setting <code>PYSPARK_PYTHON</code>, for example:</p>
<div class="highlight"><pre><code class="language-bash" data-lang="bash"><span class="nv">$ PYSPARK_PYTHON</span><span class="o">=</span>python3.4 bin/pyspark
<span class="nv">$ PYSPARK_PYTHON</span><span class="o">=</span>/opt/pypy-2.5/bin/pypy bin/spark-submit examples/src/main/python/pi.py</code></pre></div>
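<p>Because the driver and the workers must agree on the Python minor version, a quick comparison of version tuples can catch a mismatch early. The following is a minimal sketch in plain Python, not part of the PySpark API: the helper names <code>minor_version</code> and <code>same_minor_version</code> are hypothetical, and how you obtain a worker's version tuple is left as an assumption.</p>
<div class="highlight"><pre><code class="language-python" data-lang="python">import sys


def minor_version(version_info=sys.version_info):
    """Return the (major, minor) pair, e.g. (3, 4) for Python 3.4.x."""
    return tuple(version_info[:2])


def same_minor_version(driver_version, worker_version):
    """True when two version tuples agree on major and minor numbers."""
    return minor_version(driver_version) == minor_version(worker_version)


# Patch releases may differ; only major and minor have to match.
print(same_minor_version((3, 4, 1), (3, 4, 3)))  # True
print(same_minor_version((2, 7, 9), (3, 4, 3)))  # False</code></pre></div>
<p>Called with no argument, <code>minor_version()</code> reports the interpreter that is running the script, which is the value you would compare against the interpreter named in <code>PYSPARK_PYTHON</code>.</p>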
</div>
</div>
<h1 id="initializing-spark">Initializing Spark</h1>
<div class="codetabs">
<div data-lang="scala">
<p>The first thing a Spark program must do is to create a <a href="api/scala/index.html#org.apache.spark.SparkContext">SparkContext</a> object, which tells Spark
how to access a cluster. To create a <code>SparkContext</code> you first need to build a <a href="api/scala/index.html#org.apache.spark.SparkConf">SparkConf</a> object
that contains information about your application.</p>
<p>Only one SparkContext may be active per JVM. You must <code>stop()</code> the active SparkContext before creating a new one.</p>
<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">val</span> <span class="n">conf</span> <span class="k">=</span> <span class="k">new</span> <span class="nc">SparkConf</span><span class="o">().</span><span class="n">setAppName</span><span class="o">(</span><span class="n">appName</span><span class="o">).</span><span class="n">setMaster</span><span class="o">(</span><span class="n">master</span><span class="o">)</span>
<span class="k">val</span> <span class="n">sc</span> <span class="k">=</span> <span class="k">new</span> <span class="nc">SparkContext</span><span class="o">(</span><span class="n">conf</span><span class="o">)</span></code></pre></div>
</div>
<div data-lang="java">
<p>The first thing a Spark program must do is to create a <a href="api/java/index.html?org/apache/spark/api/java/JavaSparkContext.html">JavaSparkContext</a> object, which tells Spark
how to access a cluster. To create a <code>JavaSparkContext</code> you first need to build a <a href="api/java/index.html?org/apache/spark/SparkConf.html">SparkConf</a> object
that contains information about your application.</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">SparkConf</span> <span class="n">conf</span> <span class="o">=</span> <span class="k">new</span> <span class="nf">SparkConf</span><span class="o">().</span><span class="na">setAppName</span><span class="o">(</span><span class="n">appName</span><span class="o">).</span><span class="na">setMaster</span><span class="o">(</span><span class="n">master</span><span class="o">);</span>
<span class="n">JavaSparkContext</span> <span class="n">sc</span> <span class="o">=</span> <span class="k">new</span> <span class="nf">JavaSparkContext</span><span class="o">(</span><span class="n">conf</span><span class="o">);</span></code></pre></div>
</div>
<div data-lang="python">
<p>The first thing a Spark program must do is to create a <a href="api/python/pyspark.html#pyspark.SparkContext">SparkContext</a> object, which tells Spark
how to access a cluster. To create a <code>SparkContext</code> you first need to build a <a href="api/python/pyspark.html#pyspark.SparkConf">SparkConf</a> object
that contains information about your application.</p>
<div class="highlight"><pre><code class="language-python" data-lang="python"><span class="n">conf</span> <span class="o">=</span> <span class="n">SparkConf</span><span class="p">()</span><span class="o">.</span><span class="n">setAppName</span><span class="p">(</span><span class="n">appName</span><span class="p">)</span><span class="o">.</span><span class="n">setMaster</span><span class="p">(</span><span class="n">master</span><span class="p">)</span>
<span class="n">sc</span> <span class="o">=</span> <span class="n">SparkContext</span><span class="p">(</span><span class="n">conf</span><span class="o">=</span><span class="n">conf</span><span class="p">)</span></code></pre></div>
</div>
</div>
<p>The <code>appName</code> parameter is a name for your application to show on the cluster UI.
<code>master</code> is a <a href="submitting-applications.html#master-urls">Spark, Mesos or YARN cluster URL</a>,
or a special “local” string to run in local mode.
In practice, when running on a cluster, you will not want to hardcode <code>master</code> in the program,
but rather <a href="submitting-applications.html">launch the application with <code>spark-submit</code></a> and
receive it there. However, for local testing and unit tests, you can pass “local” to run Spark
in-process.</p>
<h2 id="using-the-shell">Using the Shell</h2>
<div class="codetabs">
<div data-lang="scala">
<p>In the Spark shell, a special interpreter-aware SparkContext is already created for you, in the
variable called <code>sc</code>. Making your own SparkContext will not work. You can set which master the
context connects to using the <code>--master</code> argument, and you can add JARs to the classpath
by passing a comma-separated list to the <code>--jars</code> argument. You can also add dependencies
(e.g. Spark Packages) to your shell session by supplying a comma-separated list of Maven coordinates
to the <code>--packages</code> argument. Any additional repositories where dependencies might exist (e.g. Sonatype)
can be passed to the <code>--repositories</code> argument. For example, to run <code>bin/spark-shell</code> on exactly
four cores, use:</p>
<div class="highlight"><pre><code class="language-bash" data-lang="bash"><span class="nv">$ </span>./bin/spark-shell --master <span class="nb">local</span><span class="o">[</span>4<span class="o">]</span></code></pre></div>
<p>Or, to also add <code>code.jar</code> to its classpath, use:</p>
<div class="highlight"><pre><code class="language-bash" data-lang="bash"><span class="nv">$ </span>./bin/spark-shell --master <span class="nb">local</span><span class="o">[</span>4<span class="o">]</span> --jars code.jar</code></pre></div>
<p>To include a dependency using Maven coordinates:</p>
<div class="highlight"><pre><code class="language-bash" data-lang="bash"><span class="nv">$ </span>./bin/spark-shell --master <span class="nb">local</span><span class="o">[</span>4<span class="o">]</span> --packages <span class="s2">"org.example:example:0.1"</span></code></pre></div>
<p>For a complete list of options, run <code>spark-shell --help</code>. Behind the scenes,
<code>spark-shell</code> invokes the more general <a href="submitting-applications.html"><code>spark-submit</code> script</a>.</p>
</div>
<div data-lang="python">
<p>In the PySpark shell, a special interpreter-aware SparkContext is already created for you, in the
variable called <code>sc</code>. Making your own SparkContext will not work. You can set which master the
context connects to using the <code>--master</code> argument, and you can add Python .zip, .egg or .py files
to the runtime path by passing a comma-separated list to <code>--py-files</code>. You can also add dependencies
(e.g. Spark Packages) to your shell session by supplying a comma-separated list of Maven coordinates
to the <code>--packages</code> argument. Any additional repositories where dependencies might exist (e.g. Sonatype)
can be passed to the <code>--repositories</code> argument. Any Python dependencies a Spark Package has (listed in
the <code>requirements.txt</code> of that package) must be installed manually with <code>pip</code> when necessary.
For example, to run <code>bin/pyspark</code> on exactly four cores, use:</p>
<div class="highlight"><pre><code class="language-bash" data-lang="bash"><span class="nv">$ </span>./bin/pyspark --master <span class="nb">local</span><span class="o">[</span>4<span class="o">]</span></code></pre></div>
<p>Or, to also add <code>code.py</code> to the search path (in order to later be able to <code>import code</code>), use:</p>
<div class="highlight"><pre><code class="language-bash" data-lang="bash"><span class="nv">$ </span>./bin/pyspark --master <span class="nb">local</span><span class="o">[</span>4<span class="o">]</span> --py-files code.py</code></pre></div>
<p>For a complete list of options, run <code>pyspark --help</code>. Behind the scenes,
<code>pyspark</code> invokes the more general <a href="submitting-applications.html"><code>spark-submit</code> script</a>.</p>
<p>It is also possible to launch the PySpark shell in <a href="http://ipython.org">IPython</a>, the
enhanced Python interpreter. PySpark works with IPython 1.0.0 and later. To
use IPython, set the <code>PYSPARK_DRIVER_PYTHON</code> variable to <code>ipython</code> when running <code>bin/pyspark</code>:</p>
<div class="highlight"><pre><code class="language-bash" data-lang="bash"><span class="nv">$ PYSPARK_DRIVER_PYTHON</span><span class="o">=</span>ipython ./bin/pyspark</code></pre></div>
<p>You can customize the <code>ipython</code> command by setting <code>PYSPARK_DRIVER_PYTHON_OPTS</code>. For example, to launch
the <a href="http://ipython.org/notebook.html">IPython Notebook</a> with PyLab plot support:</p>
<div class="highlight"><pre><code class="language-bash" data-lang="bash"><span class="nv">$ PYSPARK_DRIVER_PYTHON</span><span class="o">=</span>ipython <span class="nv">PYSPARK_DRIVER_PYTHON_OPTS</span><span class="o">=</span><span class="s2">"notebook"</span> ./bin/pyspark</code></pre></div>
<p>After the IPython Notebook server is launched, you can create a new “Python 2” notebook from
the “Files” tab. Inside the notebook, run the command <code>%pylab inline</code> before you start
working with Spark to enable inline PyLab plots.</p>
</div>
</div>
<h1 id="resilient-distributed-datasets-rdds">Resilient Distributed Datasets (RDDs)</h1>
<p>Spark revolves around the concept of a <em>resilient distributed dataset</em> (RDD), which is a fault-tolerant collection of elements that can be operated on in parallel. There are two ways to create RDDs: <em>parallelizing</em>
an existing collection in your driver program, or referencing a dataset in an external storage system, such as a
shared filesystem, HDFS, HBase, or any data source offering a Hadoop InputFormat.</p>
<h2 id="parallelized-collections">Parallelized Collections</h2>
<div class="codetabs">
<div data-lang="scala">
<p>Parallelized collections are created by calling <code>SparkContext</code>’s <code>parallelize</code> method on an existing collection in your driver program (a Scala <code>Seq</code>). The elements of the collection are copied to form a distributed dataset that can be operated on in parallel. For example, here is how to create a parallelized collection holding the numbers 1 to 5:</p>
<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">val</span> <span class="n">data</span> <span class="k">=</span> <span class="nc">Array</span><span class="o">(</span><span class="mi">1</span><span class="o">,</span> <span class="mi">2</span><span class="o">,</span> <span class="mi">3</span><span class="o">,</span> <span class="mi">4</span><span class="o">,</span> <span class="mi">5</span><span class="o">)</span>
<span class="k">val</span> <span class="n">distData</span> <span class="k">=</span> <span class="n">sc</span><span class="o">.</span><span class="n">parallelize</span><span class="o">(</span><span class="n">data</span><span class="o">)</span></code></pre></div>
<p>Once created, the distributed dataset (<code>distData</code>) can be operated on in parallel. For example, we might call <code>distData.reduce((a, b) => a + b)</code> to add up the elements of the array. We describe operations on distributed datasets later on.</p>
</div>
<div data-lang="java">
<p>Parallelized collections are created by calling <code>JavaSparkContext</code>’s <code>parallelize</code> method on an existing <code>Collection</code> in your driver program. The elements of the collection are copied to form a distributed dataset that can be operated on in parallel. For example, here is how to create a parallelized collection holding the numbers 1 to 5:</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">List</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">&gt;</span> <span class="n">data</span> <span class="o">=</span> <span class="n">Arrays</span><span class="o">.</span><span class="na">asList</span><span class="o">(</span><span class="mi">1</span><span class="o">,</span> <span class="mi">2</span><span class="o">,</span> <span class="mi">3</span><span class="o">,</span> <span class="mi">4</span><span class="o">,</span> <span class="mi">5</span><span class="o">);</span>
<span class="n">JavaRDD</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">&gt;</span> <span class="n">distData</span> <span class="o">=</span> <span class="n">sc</span><span class="o">.</span><span class="na">parallelize</span><span class="o">(</span><span class="n">data</span><span class="o">);</span></code></pre></div>
<p>Once created, the distributed dataset (<code>distData</code>) can be operated on in parallel. For example, we might call <code>distData.reduce((a, b) -> a + b)</code> to add up the elements of the list.
We describe operations on distributed datasets later on.</p>
<p><strong>Note:</strong> <em>In this guide, we’ll often use the concise Java 8 lambda syntax to specify Java functions, but
in older versions of Java you can implement the interfaces in the
<a href="api/java/index.html?org/apache/spark/api/java/function/package-summary.html">org.apache.spark.api.java.function</a> package.
We describe <a href="#passing-functions-to-spark">passing functions to Spark</a> in more detail below.</em></p>
</div>
<div data-lang="python">
<p>Parallelized collections are created by calling <code>SparkContext</code>’s <code>parallelize</code> method on an existing iterable or collection in your driver program. The elements of the collection are copied to form a distributed dataset that can be operated on in parallel. For example, here is how to create a parallelized collection holding the numbers 1 to 5:</p>
<div class="highlight"><pre><code class="language-python" data-lang="python"><span class="n">data</span> <span class="o">=</span> <span class="p">[</span><span class="mi">1</span><span class="p">,</span> <span class="mi">2</span><span class="p">,</span> <span class="mi">3</span><span class="p">,</span> <span class="mi">4</span><span class="p">,</span> <span class="mi">5</span><span class="p">]</span>
<span class="n">distData</span> <span class="o">=</span> <span class="n">sc</span><span class="o">.</span><span class="n">parallelize</span><span class="p">(</span><span class="n">data</span><span class="p">)</span></code></pre></div>
<p>Once created, the distributed dataset (<code>distData</code>) can be operated on in parallel. For example, we can call <code>distData.reduce(lambda a, b: a + b)</code> to add up the elements of the list.
We describe operations on distributed datasets later on.</p>
</div>
</div>
<p>One important parameter for parallelized collections is the number of <em>partitions</em> to cut the dataset into. Spark runs one task for each partition of the dataset. Typically you want 2-4 partitions for each CPU in your cluster. Normally, Spark tries to set the number of partitions automatically based on your cluster. However, you can also set it manually by passing it as a second parameter to <code>parallelize</code> (e.g. <code>sc.parallelize(data, 10)</code>). Note: some places in the code use the term <em>slices</em> (a synonym for partitions) to maintain backward compatibility.</p>
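<p>To make the idea of partitions concrete, the following plain-Python sketch cuts a list into contiguous slices, reduces each slice separately (standing in for one task per partition), and then combines the partial results. It is an illustration only and does not claim to reproduce how Spark itself splits data; the function names <code>split_into_partitions</code> and <code>partitioned_sum</code> are hypothetical.</p>
<div class="highlight"><pre><code class="language-python" data-lang="python">from functools import reduce


def split_into_partitions(data, num_partitions):
    """Cut a list into num_partitions contiguous slices of near-equal size."""
    n = len(data)
    return [
        data[(i * n) // num_partitions:((i + 1) * n) // num_partitions]
        for i in range(num_partitions)
    ]


def partitioned_sum(data, num_partitions):
    """Sum each slice on its own (one 'task' per slice), then combine."""
    partitions = split_into_partitions(data, num_partitions)
    partials = [reduce(lambda a, b: a + b, part, 0) for part in partitions]
    return reduce(lambda a, b: a + b, partials, 0)


data = [1, 2, 3, 4, 5]
print(split_into_partitions(data, 2))  # [[1, 2], [3, 4, 5]]
print(partitioned_sum(data, 2))        # 15</code></pre></div>
<p>The result is the same for any number of slices because addition is associative; the number of slices only changes how much work each task receives.</p>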
<h2 id="external-datasets">External Datasets</h2>
<div class="codetabs">
<div data-lang="scala">
<p>Spark can create distributed datasets from any storage source supported by Hadoop, including your local file system, HDFS, Cassandra, HBase, <a href="http://wiki.apache.org/hadoop/AmazonS3">Amazon S3</a>, etc. Spark supports text files, <a href="http://hadoop.apache.org/common/docs/current/api/org/apache/hadoop/mapred/SequenceFileInputFormat.html">SequenceFiles</a>, and any other Hadoop <a href="http://hadoop.apache.org/docs/stable/api/org/apache/hadoop/mapred/InputFormat.html">InputFormat</a>.</p>
<p>Text file RDDs can be created using <code>SparkContext</code>’s <code>textFile</code> method. This method takes a URI for the file (either a local path on the machine, or a <code>hdfs://</code>, <code>s3n://</code>, etc. URI) and reads it as a collection of lines. Here is an example invocation:</p>
<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="n">scala</span><span class="o">></span> <span class="k">val</span> <span class="n">distFile</span> <span class="k">=</span> <span class="n">sc</span><span class="o">.</span><span class="n">textFile</span><span class="o">(</span><span class="s">"data.txt"</span><span class="o">)</span>
<span class="n">distFile</span><span class="k">:</span> <span class="kt">RDD</span><span class="o">[</span><span class="kt">String</span><span class="o">]</span> <span class="k">=</span> <span class="nc">MappedRDD</span><span class="k">@</span><span class="mi">1</span><span class="n">d4cee08</span></code></pre></div>
<p>Once created, <code>distFile</code> can be acted on by dataset operations. For example, we can add up the sizes of all the lines using the <code>map</code> and <code>reduce</code> operations as follows: <code>distFile.map(s => s.length).reduce((a, b) => a + b)</code>.</p>
<p>Some notes on reading files with Spark:</p>
<ul>
<li>
<p>If using a path on the local filesystem, the file must also be accessible at the same path on worker nodes. Either copy the file to all workers or use a network-mounted shared file system.</p>
</li>
<li>
<p>All of Spark’s file-based input methods, including <code>textFile</code>, support running on directories, compressed files, and wildcards as well. For example, you can use <code>textFile("/my/directory")</code>, <code>textFile("/my/directory/*.txt")</code>, and <code>textFile("/my/directory/*.gz")</code>.</p>
</li>
<li>
<p>The <code>textFile</code> method also takes an optional second argument for controlling the number of partitions of the file. By default, Spark creates one partition for each block of the file (blocks being 64MB by default in HDFS), but you can also ask for a higher number of partitions by passing a larger value. Note that you cannot have fewer partitions than blocks.</p>
</li>
</ul>
<p>Apart from text files, Spark’s Scala API also supports several other data formats:</p>
<ul>
<li>
<p><code>SparkContext.wholeTextFiles</code> lets you read a directory containing multiple small text files, and returns each of them as (filename, content) pairs. This is in contrast with <code>textFile</code>, which would return one record per line in each file.</p>
</li>
<li>
<p>For <a href="http://hadoop.apache.org/common/docs/current/api/org/apache/hadoop/mapred/SequenceFileInputFormat.html">SequenceFiles</a>, use SparkContext’s <code>sequenceFile[K, V]</code> method where <code>K</code> and <code>V</code> are the types of key and values in the file. These should be subclasses of Hadoop’s <a href="http://hadoop.apache.org/common/docs/current/api/org/apache/hadoop/io/Writable.html">Writable</a> interface, like <a href="http://hadoop.apache.org/common/docs/current/api/org/apache/hadoop/io/IntWritable.html">IntWritable</a> and <a href="http://hadoop.apache.org/common/docs/current/api/org/apache/hadoop/io/Text.html">Text</a>. In addition, Spark allows you to specify native types for a few common Writables; for example, <code>sequenceFile[Int, String]</code> will automatically read IntWritables and Texts.</p>
</li>
<li>
<p>For other Hadoop InputFormats, you can use the <code>SparkContext.hadoopRDD</code> method, which takes an arbitrary <code>JobConf</code> and input format class, key class and value class. Set these the same way you would for a Hadoop job with your input source. You can also use <code>SparkContext.newAPIHadoopRDD</code> for InputFormats based on the “new” MapReduce API (<code>org.apache.hadoop.mapreduce</code>).</p>
</li>
<li>
<p><code>RDD.saveAsObjectFile</code> and <code>SparkContext.objectFile</code> support saving an RDD in a simple format consisting of serialized Java objects. While this is not as efficient as specialized formats like Avro, it offers an easy way to save any RDD.</p>
</li>
</ul>
</div>
<div data-lang="java">
<p>Spark can create distributed datasets from any storage source supported by Hadoop, including your local file system, HDFS, Cassandra, HBase, <a href="http://wiki.apache.org/hadoop/AmazonS3">Amazon S3</a>, etc. Spark supports text files, <a href="http://hadoop.apache.org/common/docs/current/api/org/apache/hadoop/mapred/SequenceFileInputFormat.html">SequenceFiles</a>, and any other Hadoop <a href="http://hadoop.apache.org/docs/stable/api/org/apache/hadoop/mapred/InputFormat.html">InputFormat</a>.</p>
<p>Text file RDDs can be created using <code>SparkContext</code>’s <code>textFile</code> method. This method takes a URI for the file (either a local path on the machine, or a <code>hdfs://</code>, <code>s3n://</code>, etc. URI) and reads it as a collection of lines. Here is an example invocation:</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">JavaRDD</span><span class="o"><</span><span class="n">String</span><span class="o">></span> <span class="n">distFile</span> <span class="o">=</span> <span class="n">sc</span><span class="o">.</span><span class="na">textFile</span><span class="o">(</span><span class="s">"data.txt"</span><span class="o">);</span></code></pre></div>
<p>Once created, <code>distFile</code> can be acted on by dataset operations. For example, we can add up the sizes of all the lines using the <code>map</code> and <code>reduce</code> operations as follows: <code>distFile.map(s -> s.length()).reduce((a, b) -> a + b)</code>.</p>
<p>Some notes on reading files with Spark:</p>
<ul>
<li>
<p>If using a path on the local filesystem, the file must also be accessible at the same path on worker nodes. Either copy the file to all workers or use a network-mounted shared file system.</p>
</li>
<li>
<p>All of Spark’s file-based input methods, including <code>textFile</code>, support running on directories, compressed files, and wildcards as well. For example, you can use <code>textFile("/my/directory")</code>, <code>textFile("/my/directory/*.txt")</code>, and <code>textFile("/my/directory/*.gz")</code>.</p>
</li>
<li>
<p>The <code>textFile</code> method also takes an optional second argument for controlling the number of partitions of the file. By default, Spark creates one partition for each block of the file (blocks being 64MB by default in HDFS), but you can also ask for a higher number of partitions by passing a larger value. Note that you cannot have fewer partitions than blocks.</p>
</li>
</ul>
<p>Apart from text files, Spark’s Java API also supports several other data formats:</p>
<ul>
<li>
<p><code>JavaSparkContext.wholeTextFiles</code> lets you read a directory containing multiple small text files, and returns each of them as (filename, content) pairs. This is in contrast with <code>textFile</code>, which would return one record per line in each file.</p>
</li>
<li>
<p>For <a href="http://hadoop.apache.org/common/docs/current/api/org/apache/hadoop/mapred/SequenceFileInputFormat.html">SequenceFiles</a>, use SparkContext’s <code>sequenceFile[K, V]</code> method where <code>K</code> and <code>V</code> are the types of key and values in the file. These should be subclasses of Hadoop’s <a href="http://hadoop.apache.org/common/docs/current/api/org/apache/hadoop/io/Writable.html">Writable</a> interface, like <a href="http://hadoop.apache.org/common/docs/current/api/org/apache/hadoop/io/IntWritable.html">IntWritable</a> and <a href="http://hadoop.apache.org/common/docs/current/api/org/apache/hadoop/io/Text.html">Text</a>.</p>
</li>
<li>
<p>For other Hadoop InputFormats, you can use the <code>JavaSparkContext.hadoopRDD</code> method, which takes an arbitrary <code>JobConf</code> and input format class, key class and value class. Set these the same way you would for a Hadoop job with your input source. You can also use <code>JavaSparkContext.newAPIHadoopRDD</code> for InputFormats based on the “new” MapReduce API (<code>org.apache.hadoop.mapreduce</code>).</p>
</li>
<li>
<p><code>JavaRDD.saveAsObjectFile</code> and <code>JavaSparkContext.objectFile</code> support saving an RDD in a simple format consisting of serialized Java objects. While this is not as efficient as specialized formats like Avro, it offers an easy way to save any RDD.</p>
</li>
</ul>
</div>
<div data-lang="python">
<p>PySpark can create distributed datasets from any storage source supported by Hadoop, including your local file system, HDFS, Cassandra, HBase, <a href="http://wiki.apache.org/hadoop/AmazonS3">Amazon S3</a>, etc. Spark supports text files, <a href="http://hadoop.apache.org/common/docs/current/api/org/apache/hadoop/mapred/SequenceFileInputFormat.html">SequenceFiles</a>, and any other Hadoop <a href="http://hadoop.apache.org/docs/stable/api/org/apache/hadoop/mapred/InputFormat.html">InputFormat</a>.</p>
<p>Text file RDDs can be created using <code>SparkContext</code>’s <code>textFile</code> method. This method takes a URI for the file (either a local path on the machine, or a <code>hdfs://</code>, <code>s3n://</code>, etc. URI) and reads it as a collection of lines. Here is an example invocation:</p>
<div class="highlight"><pre><code class="language-python" data-lang="python"><span class="o">>>></span> <span class="n">distFile</span> <span class="o">=</span> <span class="n">sc</span><span class="o">.</span><span class="n">textFile</span><span class="p">(</span><span class="s">"data.txt"</span><span class="p">)</span></code></pre></div>
<p>Once created, <code>distFile</code> can be acted on by dataset operations. For example, we can add up the sizes of all the lines using the <code>map</code> and <code>reduce</code> operations as follows: <code>distFile.map(lambda s: len(s)).reduce(lambda a, b: a + b)</code>.</p>
<p>Some notes on reading files with Spark:</p>
<ul>
<li>
<p>If using a path on the local filesystem, the file must also be accessible at the same path on worker nodes. Either copy the file to all workers or use a network-mounted shared file system.</p>
</li>
<li>
<p>All of Spark’s file-based input methods, including <code>textFile</code>, support running on directories, compressed files, and wildcards as well. For example, you can use <code>textFile("/my/directory")</code>, <code>textFile("/my/directory/*.txt")</code>, and <code>textFile("/my/directory/*.gz")</code>.</p>
</li>
<li>
<p>The <code>textFile</code> method also takes an optional second argument for controlling the number of partitions of the file. By default, Spark creates one partition for each block of the file (blocks being 64MB by default in HDFS), but you can also ask for a higher number of partitions by passing a larger value. Note that you cannot have fewer partitions than blocks.</p>
</li>
</ul>
<p>Apart from text files, Spark’s Python API also supports several other data formats:</p>
<ul>
<li>
<p><code>SparkContext.wholeTextFiles</code> lets you read a directory containing multiple small text files, and returns each of them as (filename, content) pairs. This is in contrast with <code>textFile</code>, which would return one record per line in each file.</p>
</li>
<li>
<p><code>RDD.saveAsPickleFile</code> and <code>SparkContext.pickleFile</code> support saving an RDD in a simple format consisting of pickled Python objects. Batching is used on pickle serialization, with default batch size 10.</p>
</li>
<li>
<p>SequenceFile and Hadoop Input/Output Formats</p>
</li>
</ul>
<p><strong>Note:</strong> this feature is currently marked <code>Experimental</code> and is intended for advanced users. It may be replaced in the future with read/write support based on Spark SQL, in which case Spark SQL is the preferred approach.</p>
<p><strong>Writable Support</strong></p>
<p>PySpark SequenceFile support loads an RDD of key-value pairs within Java, converts Writables to base Java types, and pickles the
resulting Java objects using <a href="https://github.com/irmen/Pyrolite/">Pyrolite</a>. When saving an RDD of key-value pairs to SequenceFile,
PySpark does the reverse. It unpickles Python objects into Java objects and then converts them to Writables. The following
Writables are automatically converted:</p>
<table class="table">
<tr><th>Writable Type</th><th>Python Type</th></tr>
<tr><td>Text</td><td>unicode str</td></tr>
<tr><td>IntWritable</td><td>int</td></tr>
<tr><td>FloatWritable</td><td>float</td></tr>
<tr><td>DoubleWritable</td><td>float</td></tr>
<tr><td>BooleanWritable</td><td>bool</td></tr>
<tr><td>BytesWritable</td><td>bytearray</td></tr>
<tr><td>NullWritable</td><td>None</td></tr>
<tr><td>MapWritable</td><td>dict</td></tr>
</table>
<p>Arrays are not handled out-of-the-box. Users need to specify custom <code>ArrayWritable</code> subtypes when reading or writing. When writing,
users also need to specify custom converters that convert arrays to custom <code>ArrayWritable</code> subtypes. When reading, the default
converter will convert custom <code>ArrayWritable</code> subtypes to Java <code>Object[]</code>, which then get pickled to Python tuples. To get
Python <code>array.array</code> for arrays of primitive types, users need to specify custom converters.</p>
<p><strong>Saving and Loading SequenceFiles</strong></p>
<p>Similarly to text files, SequenceFiles can be saved and loaded by specifying the path. The key and value
classes can be specified, but for standard Writables this is not required.</p>
<div class="highlight"><pre><code class="language-python" data-lang="python"><span class="o">>>></span> <span class="n">rdd</span> <span class="o">=</span> <span class="n">sc</span><span class="o">.</span><span class="n">parallelize</span><span class="p">(</span><span class="nb">range</span><span class="p">(</span><span class="mi">1</span><span class="p">,</span> <span class="mi">4</span><span class="p">))</span><span class="o">.</span><span class="n">map</span><span class="p">(</span><span class="k">lambda</span> <span class="n">x</span><span class="p">:</span> <span class="p">(</span><span class="n">x</span><span class="p">,</span> <span class="s">"a"</span> <span class="o">*</span> <span class="n">x</span> <span class="p">))</span>
<span class="o">>>></span> <span class="n">rdd</span><span class="o">.</span><span class="n">saveAsSequenceFile</span><span class="p">(</span><span class="s">"path/to/file"</span><span class="p">)</span>
<span class="o">>>></span> <span class="nb">sorted</span><span class="p">(</span><span class="n">sc</span><span class="o">.</span><span class="n">sequenceFile</span><span class="p">(</span><span class="s">"path/to/file"</span><span class="p">)</span><span class="o">.</span><span class="n">collect</span><span class="p">())</span>
<span class="p">[(</span><span class="mi">1</span><span class="p">,</span> <span class="s">u'a'</span><span class="p">),</span> <span class="p">(</span><span class="mi">2</span><span class="p">,</span> <span class="s">u'aa'</span><span class="p">),</span> <span class="p">(</span><span class="mi">3</span><span class="p">,</span> <span class="s">u'aaa'</span><span class="p">)]</span></code></pre></div>
<p><strong>Saving and Loading Other Hadoop Input/Output Formats</strong></p>
<p>PySpark can also read any Hadoop InputFormat or write any Hadoop OutputFormat, for both ‘new’ and ‘old’ Hadoop MapReduce APIs.
If required, a Hadoop configuration can be passed in as a Python dict. Here is an example using the
Elasticsearch EsInputFormat:</p>
<div class="highlight"><pre><code class="language-python" data-lang="python"><span class="err">$</span> <span class="n">SPARK_CLASSPATH</span><span class="o">=/</span><span class="n">path</span><span class="o">/</span><span class="n">to</span><span class="o">/</span><span class="n">elasticsearch</span><span class="o">-</span><span class="n">hadoop</span><span class="o">.</span><span class="n">jar</span> <span class="o">./</span><span class="nb">bin</span><span class="o">/</span><span class="n">pyspark</span>
<span class="o">>>></span> <span class="n">conf</span> <span class="o">=</span> <span class="p">{</span><span class="s">"es.resource"</span> <span class="p">:</span> <span class="s">"index/type"</span><span class="p">}</span> <span class="c"># assume Elasticsearch is running on localhost defaults</span>
<span class="o">>>></span> <span class="n">rdd</span> <span class="o">=</span> <span class="n">sc</span><span class="o">.</span><span class="n">newAPIHadoopRDD</span><span class="p">(</span><span class="s">"org.elasticsearch.hadoop.mr.EsInputFormat"</span><span class="p">,</span>\
<span class="s">"org.apache.hadoop.io.NullWritable"</span><span class="p">,</span> <span class="s">"org.elasticsearch.hadoop.mr.LinkedMapWritable"</span><span class="p">,</span> <span class="n">conf</span><span class="o">=</span><span class="n">conf</span><span class="p">)</span>
<span class="o">>>></span> <span class="n">rdd</span><span class="o">.</span><span class="n">first</span><span class="p">()</span> <span class="c"># the result is a MapWritable that is converted to a Python dict</span>
<span class="p">(</span><span class="s">u'Elasticsearch ID'</span><span class="p">,</span>
<span class="p">{</span><span class="s">u'field1'</span><span class="p">:</span> <span class="bp">True</span><span class="p">,</span>
<span class="s">u'field2'</span><span class="p">:</span> <span class="s">u'Some Text'</span><span class="p">,</span>
<span class="s">u'field3'</span><span class="p">:</span> <span class="mi">12345</span><span class="p">})</span></code></pre></div>
<p>If the InputFormat depends only on a Hadoop configuration and/or input path, and
the key and value classes can easily be converted according to the table above,
then this approach should work well.</p>
<p>If you have custom serialized binary data (such as loading data from Cassandra / HBase), then you will first need to
transform that data on the Scala/Java side to something which can be handled by Pyrolite’s pickler.
A <a href="api/scala/index.html#org.apache.spark.api.python.Converter">Converter</a> trait is provided
for this. Simply extend this trait and implement your transformation code in the <code>convert</code>
method. Remember to ensure that this class, along with any dependencies required to access your <code>InputFormat</code>, is packaged into your Spark job jar and included on the PySpark
classpath.</p>
<p>See the <a href="https://github.com/apache/spark/tree/master/examples/src/main/python">Python examples</a> and
the <a href="https://github.com/apache/spark/tree/master/examples/src/main/scala/org/apache/spark/examples/pythonconverters">Converter examples</a>
for examples of using Cassandra / HBase <code>InputFormat</code> and <code>OutputFormat</code> with custom converters.</p>
</div>
</div>
<h2 id="rdd-operations">RDD Operations</h2>
<p>RDDs support two types of operations: <em>transformations</em>, which create a new dataset from an existing one, and <em>actions</em>, which return a value to the driver program after running a computation on the dataset. For example, <code>map</code> is a transformation that passes each dataset element through a function and returns a new RDD representing the results. On the other hand, <code>reduce</code> is an action that aggregates all the elements of the RDD using some function and returns the final result to the driver program (although there is also a parallel <code>reduceByKey</code> that returns a distributed dataset).</p>
<p>All transformations in Spark are <i>lazy</i>, in that they do not compute their results right away. Instead, they just remember the transformations applied to some base dataset (e.g. a file). The transformations are only computed when an action requires a result to be returned to the driver program. This design enables Spark to run more efficiently – for example, we can realize that a dataset created through <code>map</code> will be used in a <code>reduce</code> and return only the result of the <code>reduce</code> to the driver, rather than the larger mapped dataset.</p>
<p>By default, each transformed RDD may be recomputed each time you run an action on it. However, you may also <em>persist</em> an RDD in memory using the <code>persist</code> (or <code>cache</code>) method, in which case Spark will keep the elements around on the cluster for much faster access the next time you query it. There is also support for persisting RDDs on disk, or replicated across multiple nodes.</p>
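<p>The interplay of lazy transformations, actions, and persistence can be imitated in a few lines of plain Python. The sketch below is only a toy model under stated assumptions: <code>LazyDataset</code> and <code>load_lines</code> are hypothetical names, nothing here is distributed, and it is not how Spark is implemented.</p>

```python
from functools import reduce


class LazyDataset:
    """Hypothetical toy stand-in for an RDD; not part of Spark."""

    def __init__(self, source, funcs=()):
        self._source = source        # zero-argument callable producing the base data
        self._funcs = tuple(funcs)   # recorded map functions, not yet applied
        self._persist = False
        self._cached = None

    def map(self, f):
        # Transformation: remember f and return a new dataset; nothing runs yet.
        return LazyDataset(self._source, self._funcs + (f,))

    def persist(self):
        # Ask for the computed elements to be kept after the first action.
        self._persist = True
        return self

    def _compute(self):
        if self._cached is not None:
            return self._cached
        items = list(self._source())
        for f in self._funcs:
            items = [f(x) for x in items]
        if self._persist:
            self._cached = items
        return items

    def reduce(self, f):
        # Action: triggers the computation and returns a plain value.
        return reduce(f, self._compute())


load_count = [0]  # counts how often the base data is actually read


def load_lines():
    load_count[0] += 1
    return ["ab", "cde", "f"]


line_lengths = LazyDataset(load_lines).map(len).persist()
loads_before_action = load_count[0]              # map() only recorded the function
total = line_lengths.reduce(lambda a, b: a + b)  # first action: reads and computes
largest = line_lengths.reduce(max)               # second action: reuses persisted elements
```

<p>Without the call to <code>persist()</code>, the second action in this toy model would read the base data and reapply the recorded functions again, which mirrors the recomputation described above.</p>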
<h3 id="basics">Basics</h3>
<div class="codetabs">
<div data-lang="scala">
<p>To illustrate RDD basics, consider the simple program below:</p>
<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">val</span> <span class="n">lines</span> <span class="k">=</span> <span class="n">sc</span><span class="o">.</span><span class="n">textFile</span><span class="o">(</span><span class="s">"data.txt"</span><span class="o">)</span>
<span class="k">val</span> <span class="n">lineLengths</span> <span class="k">=</span> <span class="n">lines</span><span class="o">.</span><span class="n">map</span><span class="o">(</span><span class="n">s</span> <span class="k">=></span> <span class="n">s</span><span class="o">.</span><span class="n">length</span><span class="o">)</span>
<span class="k">val</span> <span class="n">totalLength</span> <span class="k">=</span> <span class="n">lineLengths</span><span class="o">.</span><span class="n">reduce</span><span class="o">((</span><span class="n">a</span><span class="o">,</span> <span class="n">b</span><span class="o">)</span> <span class="k">=></span> <span class="n">a</span> <span class="o">+</span> <span class="n">b</span><span class="o">)</span></code></pre></div>
<p>The first line defines a base RDD from an external file. This dataset is not loaded in memory or
otherwise acted on: <code>lines</code> is merely a pointer to the file.
The second line defines <code>lineLengths</code> as the result of a <code>map</code> transformation. Again, <code>lineLengths</code>
is <em>not</em> immediately computed, due to laziness.
Finally, we run <code>reduce</code>, which is an action. At this point Spark breaks the computation into tasks
to run on separate machines, and each machine runs both its part of the map and a local reduction,
returning only its answer to the driver program.</p>
<p>If we also wanted to use <code>lineLengths</code> again later, we could add:</p>
<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="n">lineLengths</span><span class="o">.</span><span class="n">persist</span><span class="o">()</span></code></pre></div>
<p>before the <code>reduce</code>, which would cause <code>lineLengths</code> to be saved in memory after the first time it is computed.</p>
</div>
<div data-lang="java">
<p>To illustrate RDD basics, consider the simple program below:</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">JavaRDD</span><span class="o"><</span><span class="n">String</span><span class="o">></span> <span class="n">lines</span> <span class="o">=</span> <span class="n">sc</span><span class="o">.</span><span class="na">textFile</span><span class="o">(</span><span class="s">"data.txt"</span><span class="o">);</span>
<span class="n">JavaRDD</span><span class="o"><</span><span class="n">Integer</span><span class="o">></span> <span class="n">lineLengths</span> <span class="o">=</span> <span class="n">lines</span><span class="o">.</span><span class="na">map</span><span class="o">(</span><span class="n">s</span> <span class="o">-></span> <span class="n">s</span><span class="o">.</span><span class="na">length</span><span class="o">());</span>
<span class="kt">int</span> <span class="n">totalLength</span> <span class="o">=</span> <span class="n">lineLengths</span><span class="o">.</span><span class="na">reduce</span><span class="o">((</span><span class="n">a</span><span class="o">,</span> <span class="n">b</span><span class="o">)</span> <span class="o">-></span> <span class="n">a</span> <span class="o">+</span> <span class="n">b</span><span class="o">);</span></code></pre></div>
<p>The first line defines a base RDD from an external file. This dataset is not loaded in memory or
otherwise acted on: <code>lines</code> is merely a pointer to the file.
The second line defines <code>lineLengths</code> as the result of a <code>map</code> transformation. Again, <code>lineLengths</code>
is <em>not</em> immediately computed, due to laziness.
Finally, we run <code>reduce</code>, which is an action. At this point Spark breaks the computation into tasks
to run on separate machines, and each machine runs both its part of the map and a local reduction,
returning only its answer to the driver program.</p>
<p>If we also wanted to use <code>lineLengths</code> again later, we could add:</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">lineLengths</span><span class="o">.</span><span class="na">persist</span><span class="o">(</span><span class="n">StorageLevel</span><span class="o">.</span><span class="na">MEMORY_ONLY</span><span class="o">());</span></code></pre></div>
<p>before the <code>reduce</code>, which would cause <code>lineLengths</code> to be saved in memory after the first time it is computed.</p>
</div>
<div data-lang="python">
<p>To illustrate RDD basics, consider the simple program below:</p>
<div class="highlight"><pre><code class="language-python" data-lang="python"><span class="n">lines</span> <span class="o">=</span> <span class="n">sc</span><span class="o">.</span><span class="n">textFile</span><span class="p">(</span><span class="s">"data.txt"</span><span class="p">)</span>
<span class="n">lineLengths</span> <span class="o">=</span> <span class="n">lines</span><span class="o">.</span><span class="n">map</span><span class="p">(</span><span class="k">lambda</span> <span class="n">s</span><span class="p">:</span> <span class="nb">len</span><span class="p">(</span><span class="n">s</span><span class="p">))</span>
<span class="n">totalLength</span> <span class="o">=</span> <span class="n">lineLengths</span><span class="o">.</span><span class="n">reduce</span><span class="p">(</span><span class="k">lambda</span> <span class="n">a</span><span class="p">,</span> <span class="n">b</span><span class="p">:</span> <span class="n">a</span> <span class="o">+</span> <span class="n">b</span><span class="p">)</span></code></pre></div>
<p>The first line defines a base RDD from an external file. This dataset is not loaded in memory or
otherwise acted on: <code>lines</code> is merely a pointer to the file.
The second line defines <code>lineLengths</code> as the result of a <code>map</code> transformation. Again, <code>lineLengths</code>
is <em>not</em> immediately computed, due to laziness.
Finally, we run <code>reduce</code>, which is an action. At this point Spark breaks the computation into tasks
to run on separate machines, and each machine runs both its part of the map and a local reduction,
returning only its answer to the driver program.</p>
<p>If we also wanted to use <code>lineLengths</code> again later, we could add:</p>
<div class="highlight"><pre><code class="language-python" data-lang="python"><span class="n">lineLengths</span><span class="o">.</span><span class="n">persist</span><span class="p">()</span></code></pre></div>
<p>before the <code>reduce</code>, which would cause <code>lineLengths</code> to be saved in memory after the first time it is computed.</p>
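<p>The division of work described above, where each task runs its share of the map and a local reduction before returning one value to the driver, can be sketched in plain Python. This is an illustration only: the <code>partitions</code> list is made-up data, <code>run_task</code> is a hypothetical name, and the tasks run sequentially here rather than on separate machines.</p>

```python
from functools import reduce

# Made-up data: the lines of a file, already split into three partitions.
partitions = [["spark", "rdd"], ["map"], ["reduce", "action"]]


def run_task(partition):
    """Hypothetical task: map this partition, then reduce it locally."""
    lengths = [len(s) for s in partition]        # this task's part of the map
    return reduce(lambda a, b: a + b, lengths)   # local reduction to one value


# Each task returns a single number, not its mapped elements.
partial_results = [run_task(p) for p in partitions]

# The driver combines the small per-task answers into the final result.
total_length = reduce(lambda a, b: a + b, partial_results)
```

<p>Only one value per partition reaches the driver in this sketch, which is the reason the larger mapped dataset never has to be collected in one place.</p>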
</div>
</div>
<h3 id="passing-functions-to-spark">Passing Functions to Spark</h3>
<div class="codetabs">
<div data-lang="scala">
<p>Spark’s API relies heavily on passing functions in the driver program to run on the cluster.
There are two recommended ways to do this:</p>
<ul>
<li><a href="http://docs.scala-lang.org/tutorials/tour/anonymous-function-syntax.html">Anonymous function syntax</a>,
which can be used for short pieces of code.</li>
<li>Static methods in a global singleton object. For example, you can define <code>object MyFunctions</code> and then
pass <code>MyFunctions.func1</code>, as follows:</li>
</ul>
<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">object</span> <span class="nc">MyFunctions</span> <span class="o">{</span>
<span class="k">def</span> <span class="n">func1</span><span class="o">(</span><span class="n">s</span><span class="k">:</span> <span class="kt">String</span><span class="o">)</span><span class="k">:</span> <span class="kt">String</span> <span class="o">=</span> <span class="o">{</span> <span class="o">...</span> <span class="o">}</span>
<span class="o">}</span>
<span class="n">myRdd</span><span class="o">.</span><span class="n">map</span><span class="o">(</span><span class="nc">MyFunctions</span><span class="o">.</span><span class="n">func1</span><span class="o">)</span></code></pre></div>
<p>Note that while it is also possible to pass a reference to a method in a class instance (as opposed to
a singleton object), this requires sending the object that contains that method along with it.
For example, consider:</p>
<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">class</span> <span class="nc">MyClass</span> <span class="o">{</span>
<span class="k">def</span> <span class="n">func1</span><span class="o">(</span><span class="n">s</span><span class="k">:</span> <span class="kt">String</span><span class="o">)</span><span class="k">:</span> <span class="kt">String</span> <span class="o">=</span> <span class="o">{</span> <span class="o">...</span> <span class="o">}</span>
<span class="k">def</span> <span class="n">doStuff</span><span class="o">(</span><span class="n">rdd</span><span class="k">:</span> <span class="kt">RDD</span><span class="o">[</span><span class="kt">String</span><span class="o">])</span><span class="k">:</span> <span class="kt">RDD</span><span class="o">[</span><span class="kt">String</span><span class="o">]</span> <span class="k">=</span> <span class="o">{</span> <span class="n">rdd</span><span class="o">.</span><span class="n">map</span><span class="o">(</span><span class="n">func1</span><span class="o">)</span> <span class="o">}</span>
<span class="o">}</span></code></pre></div>
<p>Here, if we create a <code>new MyClass</code> and call <code>doStuff</code> on it, the <code>map</code> inside there references the
<code>func1</code> method <em>of that <code>MyClass</code> instance</em>, so the whole object needs to be sent to the cluster. It is
similar to writing <code>rdd.map(x => this.func1(x))</code>.</p>
<p>In a similar way, accessing fields of the outer object will reference the whole object:</p>
<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">class</span> <span class="nc">MyClass</span> <span class="o">{</span>
<span class="k">val</span> <span class="n">field</span> <span class="k">=</span> <span class="s">"Hello"</span>
<span class="k">def</span> <span class="n">doStuff</span><span class="o">(</span><span class="n">rdd</span><span class="k">:</span> <span class="kt">RDD</span><span class="o">[</span><span class="kt">String</span><span class="o">])</span><span class="k">:</span> <span class="kt">RDD</span><span class="o">[</span><span class="kt">String</span><span class="o">]</span> <span class="k">=</span> <span class="o">{</span> <span class="n">rdd</span><span class="o">.</span><span class="n">map</span><span class="o">(</span><span class="n">x</span> <span class="k">=></span> <span class="n">field</span> <span class="o">+</span> <span class="n">x</span><span class="o">)</span> <span class="o">}</span>
<span class="o">}</span></code></pre></div>
<p>is equivalent to writing <code>rdd.map(x => this.field + x)</code>, which references all of <code>this</code>. To avoid this
issue, the simplest way is to copy <code>field</code> into a local variable instead of accessing it externally:</p>
<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">def</span> <span class="n">doStuff</span><span class="o">(</span><span class="n">rdd</span><span class="k">:</span> <span class="kt">RDD</span><span class="o">[</span><span class="kt">String</span><span class="o">])</span><span class="k">:</span> <span class="kt">RDD</span><span class="o">[</span><span class="kt">String</span><span class="o">]</span> <span class="k">=</span> <span class="o">{</span>
<span class="k">val</span> <span class="n">field_</span> <span class="k">=</span> <span class="k">this</span><span class="o">.</span><span class="n">field</span>
<span class="n">rdd</span><span class="o">.</span><span class="n">map</span><span class="o">(</span><span class="n">x</span> <span class="k">=></span> <span class="n">field_</span> <span class="o">+</span> <span class="n">x</span><span class="o">)</span>
<span class="o">}</span></code></pre></div>
</div>
<div data-lang="java">
<p>Spark’s API relies heavily on passing functions in the driver program to run on the cluster.
In Java, functions are represented by classes implementing the interfaces in the
<a href="api/java/index.html?org/apache/spark/api/java/function/package-summary.html">org.apache.spark.api.java.function</a> package.
There are two ways to create such functions:</p>
<ul>
<li>Implement the Function interfaces in your own class, either as an anonymous inner class or a named one,
and pass an instance of it to Spark.</li>
<li>In Java 8, use <a href="http://docs.oracle.com/javase/tutorial/java/javaOO/lambdaexpressions.html">lambda expressions</a>
to concisely define an implementation.</li>
</ul>
<p>While much of this guide uses lambda syntax for conciseness, it is easy to use all the same APIs
in long-form. For example, we could have written our code above as follows:</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">JavaRDD</span><span class="o"><</span><span class="n">String</span><span class="o">></span> <span class="n">lines</span> <span class="o">=</span> <span class="n">sc</span><span class="o">.</span><span class="na">textFile</span><span class="o">(</span><span class="s">"data.txt"</span><span class="o">);</span>
<span class="n">JavaRDD</span><span class="o"><</span><span class="n">Integer</span><span class="o">></span> <span class="n">lineLengths</span> <span class="o">=</span> <span class="n">lines</span><span class="o">.</span><span class="na">map</span><span class="o">(</span><span class="k">new</span> <span class="n">Function</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">>()</span> <span class="o">{</span>
<span class="kd">public</span> <span class="n">Integer</span> <span class="nf">call</span><span class="o">(</span><span class="n">String</span> <span class="n">s</span><span class="o">)</span> <span class="o">{</span> <span class="k">return</span> <span class="n">s</span><span class="o">.</span><span class="na">length</span><span class="o">();</span> <span class="o">}</span>
<span class="o">});</span>
<span class="kt">int</span> <span class="n">totalLength</span> <span class="o">=</span> <span class="n">lineLengths</span><span class="o">.</span><span class="na">reduce</span><span class="o">(</span><span class="k">new</span> <span class="n">Function2</span><span class="o"><</span><span class="n">Integer</span><span class="o">,</span> <span class="n">Integer</span><span class="o">,</span> <span class="n">Integer</span><span class="o">>()</span> <span class="o">{</span>
<span class="kd">public</span> <span class="n">Integer</span> <span class="nf">call</span><span class="o">(</span><span class="n">Integer</span> <span class="n">a</span><span class="o">,</span> <span class="n">Integer</span> <span class="n">b</span><span class="o">)</span> <span class="o">{</span> <span class="k">return</span> <span class="n">a</span> <span class="o">+</span> <span class="n">b</span><span class="o">;</span> <span class="o">}</span>
<span class="o">});</span></code></pre></div>
<p>Or, if writing the functions inline is unwieldy:</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">class</span> <span class="nc">GetLength</span> <span class="kd">implements</span> <span class="n">Function</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">></span> <span class="o">{</span>
<span class="kd">public</span> <span class="n">Integer</span> <span class="nf">call</span><span class="o">(</span><span class="n">String</span> <span class="n">s</span><span class="o">)</span> <span class="o">{</span> <span class="k">return</span> <span class="n">s</span><span class="o">.</span><span class="na">length</span><span class="o">();</span> <span class="o">}</span>
<span class="o">}</span>
<span class="kd">class</span> <span class="nc">Sum</span> <span class="kd">implements</span> <span class="n">Function2</span><span class="o"><</span><span class="n">Integer</span><span class="o">,</span> <span class="n">Integer</span><span class="o">,</span> <span class="n">Integer</span><span class="o">></span> <span class="o">{</span>
<span class="kd">public</span> <span class="n">Integer</span> <span class="nf">call</span><span class="o">(</span><span class="n">Integer</span> <span class="n">a</span><span class="o">,</span> <span class="n">Integer</span> <span class="n">b</span><span class="o">)</span> <span class="o">{</span> <span class="k">return</span> <span class="n">a</span> <span class="o">+</span> <span class="n">b</span><span class="o">;</span> <span class="o">}</span>
<span class="o">}</span>
<span class="n">JavaRDD</span><span class="o"><</span><span class="n">String</span><span class="o">></span> <span class="n">lines</span> <span class="o">=</span> <span class="n">sc</span><span class="o">.</span><span class="na">textFile</span><span class="o">(</span><span class="s">"data.txt"</span><span class="o">);</span>
<span class="n">JavaRDD</span><span class="o"><</span><span class="n">Integer</span><span class="o">></span> <span class="n">lineLengths</span> <span class="o">=</span> <span class="n">lines</span><span class="o">.</span><span class="na">map</span><span class="o">(</span><span class="k">new</span> <span class="nf">GetLength</span><span class="o">());</span>
<span class="kt">int</span> <span class="n">totalLength</span> <span class="o">=</span> <span class="n">lineLengths</span><span class="o">.</span><span class="na">reduce</span><span class="o">(</span><span class="k">new</span> <span class="nf">Sum</span><span class="o">());</span></code></pre></div>
<p>Note that anonymous inner classes in Java can also access variables in the enclosing scope as long
as they are marked <code>final</code>. Spark will ship copies of these variables to each worker node as it does
for other languages.</p>
</div>
<div data-lang="python">
<p>Spark’s API relies heavily on passing functions in the driver program to run on the cluster.
There are three recommended ways to do this:</p>
<ul>
<li><a href="https://docs.python.org/2/tutorial/controlflow.html#lambda-expressions">Lambda expressions</a>,
for simple functions that can be written as an expression. (Lambdas do not support multi-statement
functions or statements that do not return a value.)</li>
<li>Local <code>def</code>s inside the function calling into Spark, for longer code.</li>
<li>Top-level functions in a module.</li>
</ul>
<p>For example, to pass a longer function than can be supported using a <code>lambda</code>, consider
the code below:</p>
<div class="highlight"><pre><code class="language-python" data-lang="python"><span class="sd">"""MyScript.py"""</span>
<span class="k">if</span> <span class="n">__name__</span> <span class="o">==</span> <span class="s">"__main__"</span><span class="p">:</span>
<span class="k">def</span> <span class="nf">myFunc</span><span class="p">(</span><span class="n">s</span><span class="p">):</span>
<span class="n">words</span> <span class="o">=</span> <span class="n">s</span><span class="o">.</span><span class="n">split</span><span class="p">(</span><span class="s">" "</span><span class="p">)</span>
<span class="k">return</span> <span class="nb">len</span><span class="p">(</span><span class="n">words</span><span class="p">)</span>
<span class="n">sc</span> <span class="o">=</span> <span class="n">SparkContext</span><span class="p">(</span><span class="o">...</span><span class="p">)</span>
<span class="n">sc</span><span class="o">.</span><span class="n">textFile</span><span class="p">(</span><span class="s">"file.txt"</span><span class="p">)</span><span class="o">.</span><span class="n">map</span><span class="p">(</span><span class="n">myFunc</span><span class="p">)</span></code></pre></div>
<p>Note that while it is also possible to pass a reference to a method in a class instance (as opposed to
a singleton object), this requires sending the whole object that the method belongs to along with the method.
For example, consider:</p>
<div class="highlight"><pre><code class="language-python" data-lang="python"><span class="k">class</span> <span class="nc">MyClass</span><span class="p">(</span><span class="nb">object</span><span class="p">):</span>
<span class="k">def</span> <span class="nf">func</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">s</span><span class="p">):</span>
<span class="k">return</span> <span class="n">s</span>
<span class="k">def</span> <span class="nf">doStuff</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">rdd</span><span class="p">):</span>
<span class="k">return</span> <span class="n">rdd</span><span class="o">.</span><span class="n">map</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">func</span><span class="p">)</span></code></pre></div>
<p>Here, if we create a <code>MyClass</code> instance and call <code>doStuff</code> on it, the <code>map</code> inside there references the
<code>func</code> method <em>of that <code>MyClass</code> instance</em>, so the whole object needs to be sent to the cluster.</p>
<p>In a similar way, accessing fields of the outer object will reference the whole object:</p>
<div class="highlight"><pre><code class="language-python" data-lang="python"><span class="k">class</span> <span class="nc">MyClass</span><span class="p">(</span><span class="nb">object</span><span class="p">):</span>
<span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">field</span> <span class="o">=</span> <span class="s">"Hello"</span>
<span class="k">def</span> <span class="nf">doStuff</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">rdd</span><span class="p">):</span>
<span class="k">return</span> <span class="n">rdd</span><span class="o">.</span><span class="n">map</span><span class="p">(</span><span class="k">lambda</span> <span class="n">s</span><span class="p">:</span> <span class="bp">self</span><span class="o">.</span><span class="n">field</span> <span class="o">+</span> <span class="n">s</span><span class="p">)</span></code></pre></div>
<p>To avoid this issue, the simplest way is to copy <code>field</code> into a local variable instead
of accessing it externally:</p>
<div class="highlight"><pre><code class="language-python" data-lang="python"><span class="k">def</span> <span class="nf">doStuff</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">rdd</span><span class="p">):</span>
<span class="n">field</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">field</span>
<span class="k">return</span> <span class="n">rdd</span><span class="o">.</span><span class="n">map</span><span class="p">(</span><span class="k">lambda</span> <span class="n">s</span><span class="p">:</span> <span class="n">field</span> <span class="o">+</span> <span class="n">s</span><span class="p">)</span></code></pre></div>
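<p>The difference between the two versions of <code>doStuff</code> can be checked in plain Python, without Spark, by inspecting what each lambda closes over. This is only a sketch: the <code>captured</code> helper, the <code>capture_self</code> and <code>capture_local</code> methods, and the <code>big</code> field are hypothetical names introduced for illustration, and the sketch looks at closure cells rather than at what Spark actually serializes.</p>

```python
class MyClass(object):
    def __init__(self):
        self.field = "Hello"
        self.big = list(range(1000))  # hypothetical large state we would rather not ship

    def capture_self(self):
        # The lambda refers to self, so the whole instance is part of its closure.
        return lambda s: self.field + s

    def capture_local(self):
        # Copying the field into a local means only the string is captured.
        field = self.field
        return lambda s: field + s


def captured(fn):
    """Map each free variable name of fn to the object it closes over."""
    cells = fn.__closure__ or ()
    return dict(zip(fn.__code__.co_freevars, (c.cell_contents for c in cells)))


obj = MyClass()
by_self = captured(obj.capture_self())
by_local = captured(obj.capture_local())

print(sorted(by_self))  # ['self']
print(by_local)         # {'field': 'Hello'}
```

<p>Both lambdas compute the same result, but the first one drags <code>obj</code> (including <code>big</code>) along with it, while the second carries only the string it needs.</p>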
</div>
</div>
<h3 id="understanding-closures-a-nameclosureslinka">Understanding closures <a name="ClosuresLink"></a></h3>
<p>One of the harder things about Spark is understanding the scope and life cycle of variables and methods when executing code across a cluster. RDD operations that modify variables outside of their scope can be a frequent source of confusion. In the example below we’ll look at code that uses <code>foreach()</code> to increment a counter, but similar issues can occur for other operations as well.</p>
<h4 id="example">Example</h4>
<p>Consider the naive RDD element sum below, which may behave differently depending on whether execution is happening within the same JVM. A common example of this is when running Spark in <code>local</code> mode (<code>--master local[n]</code>) versus deploying a Spark application to a cluster (e.g. via spark-submit to YARN):</p>
<div class="codetabs">
<div data-lang="scala">
<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">var</span> <span class="n">counter</span> <span class="k">=</span> <span class="mi">0</span>
<span class="k">var</span> <span class="n">rdd</span> <span class="k">=</span> <span class="n">sc</span><span class="o">.</span><span class="n">parallelize</span><span class="o">(</span><span class="n">data</span><span class="o">)</span>
<span class="c1">// Wrong: Don't do this!!</span>
<span class="n">rdd</span><span class="o">.</span><span class="n">foreach</span><span class="o">(</span><span class="n">x</span> <span class="k">=></span> <span class="n">counter</span> <span class="o">+=</span> <span class="n">x</span><span class="o">)</span>
<span class="n">println</span><span class="o">(</span><span class="s">"Counter value: "</span> <span class="o">+</span> <span class="n">counter</span><span class="o">)</span></code></pre></div>
</div>
<div data-lang="java">
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kt">int</span> <span class="n">counter</span> <span class="o">=</span> <span class="mi">0</span><span class="o">;</span>
<span class="n">JavaRDD</span><span class="o"><</span><span class="n">Integer</span><span class="o">></span> <span class="n">rdd</span> <span class="o">=</span> <span class="n">sc</span><span class="o">.</span><span class="na">parallelize</span><span class="o">(</span><span class="n">data</span><span class="o">);</span>
<span class="c1">// Wrong: Don't do this!!</span>
<span class="n">rdd</span><span class="o">.</span><span class="na">foreach</span><span class="o">(</span><span class="n">x</span> <span class="o">-></span> <span class="n">counter</span> <span class="o">+=</span> <span class="n">x</span><span class="o">);</span>
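<span class="n">System</span><span class="o">.</span><span class="na">out</span><span class="o">.</span><span class="na">println</span><span class="o">(</span><span class="s">"Counter value: "</span> <span class="o">+</span> <span class="n">counter</span><span class="o">);</span></code></pre></div>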
</div>
<div data-lang="python">
<div class="highlight"><pre><code class="language-python" data-lang="python"><span class="n">counter</span> <span class="o">=</span> <span class="mi">0</span>
<span class="n">rdd</span> <span class="o">=</span> <span class="n">sc</span><span class="o">.</span><span class="n">parallelize</span><span class="p">(</span><span class="n">data</span><span class="p">)</span>
<span class="c"># Wrong: Don't do this!!</span>
<span class="k">def</span> <span class="nf">increment_counter</span><span class="p">(</span><span class="n">x</span><span class="p">):</span>
<span class="k">global</span> <span class="n">counter</span>
<span class="n">counter</span> <span class="o">+=</span> <span class="n">x</span>
<span class="n">rdd</span><span class="o">.</span><span class="n">foreach</span><span class="p">(</span><span class="n">increment_counter</span><span class="p">)</span>
<span class="k">print</span><span class="p">(</span><span class="s">"Counter value: "</span><span class="p">,</span> <span class="n">counter</span><span class="p">)</span></code></pre></div>
</div>
</div>
<h4 id="local-vs-cluster-modes">Local vs. cluster modes</h4>
<p>The behavior of the above code is undefined, and may not work as intended. To execute jobs, Spark breaks up the processing of RDD operations into tasks, each of which is executed by an executor. Prior to execution, Spark computes the task’s <strong>closure</strong>. The closure is those variables and methods which must be visible for the executor to perform its computations on the RDD (in this case <code>foreach()</code>). This closure is serialized and sent to each executor.</p>
<p>The variables within the closure sent to each executor are now copies and thus, when <strong>counter</strong> is referenced within the <code>foreach</code> function, it’s no longer the <strong>counter</strong> on the driver node. There is still a <strong>counter</strong> in the memory of the driver node but this is no longer visible to the executors! The executors only see the copy from the serialized closure. Thus, the final value of <strong>counter</strong> will still be zero since all operations on <strong>counter</strong> were referencing the value within the serialized closure.</p>
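<p>The copy semantics described above can be imitated in plain Python. The following is a rough simulation, not Spark code: <code>run_task</code> is a hypothetical stand-in for an executor, and <code>copy.deepcopy</code> stands in for serializing the closure and deserializing it on the other side.</p>

```python
import copy


def run_task(closure_vars, partition):
    """Hypothetical executor: works on its own copy of the captured variables."""
    local_vars = copy.deepcopy(closure_vars)  # mimics serialize + deserialize
    for x in partition:
        local_vars["counter"] += x            # updates the copy only
    return local_vars["counter"]


driver_vars = {"counter": 0}   # state that lives on the driver
partitions = [[1, 2], [3, 4]]  # data split across two simulated tasks

executor_totals = [run_task(driver_vars, p) for p in partitions]

print(executor_totals)         # [3, 7]
print(driver_vars["counter"])  # 0
```

<p>Each simulated task sees its own <code>counter</code>, and none of the updates ever reach the driver’s copy.</p>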
<p>In local mode, in some circumstances the <code>foreach</code> function will actually execute within the same JVM as the driver and will reference the same original <strong>counter</strong>, and may actually update it.</p>
<p>To ensure well-defined behavior in these sorts of scenarios one should use an <a href="#accumulators"><code>Accumulator</code></a>. Accumulators in Spark are used specifically to provide a mechanism for safely updating a variable when execution is split up across worker nodes in a cluster. The Accumulators section of this guide discusses these in more detail. </p>
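<p>The idea behind an accumulator can be sketched in plain Python: each task builds a partial result starting from a zero value, and only the driver merges those partial results into the visible total. The <code>SumAccumulator</code> class below is a hypothetical, simplified illustration of that pattern and is not Spark’s <code>Accumulator</code> API.</p>

```python
class SumAccumulator:
    """Hypothetical, simplified stand-in for an accumulator (not Spark's API)."""

    def __init__(self, zero=0):
        self.zero = zero
        self.value = zero  # only meaningful on the driver

    def run_task(self, partition):
        # Executor side: start from the zero value and return only the partial sum.
        partial = self.zero
        for x in partition:
            partial += x
        return partial

    def merge(self, partial):
        # Driver side: fold one task's partial result into the driver-visible value.
        self.value += partial


acc = SumAccumulator()
for partition in [[1, 2], [3, 4]]:
    acc.merge(acc.run_task(partition))

print(acc.value)  # 10
```

<p>Because tasks never write to the driver’s value directly, the result does not depend on whether they run in the same process as the driver.</p>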
<p>In general, closures (constructs like loops or locally defined methods) should not be used to mutate some global state. Spark does not define or guarantee the behavior of mutations to objects referenced from outside of closures. Some code that does this may work in local mode, but that’s just by accident and such code will not behave as expected in distributed mode. Use an Accumulator instead if some global aggregation is needed.</p>
<h4 id="printing-elements-of-an-rdd">Printing elements of an RDD</h4>
<p>Another common idiom is attempting to print out the elements of an RDD using <code>rdd.foreach(println)</code> or <code>rdd.map(println)</code>. On a single machine, this will generate the expected output and print all the RDD’s elements. However, in <code>cluster</code> mode, the executors write to their own <code>stdout</code>, not the driver’s, so <code>stdout</code> on the driver won’t show these elements. To print all elements on the driver, one can use the <code>collect()</code> method to first bring the RDD to the driver node: <code>rdd.collect().foreach(println)</code>. This can cause the driver to run out of memory, though, because <code>collect()</code> fetches the entire RDD to a single machine; if you only need to print a few elements of the RDD, a safer approach is to use <code>take()</code>: <code>rdd.take(100).foreach(println)</code>.</p>
<h3 id="working-with-key-value-pairs">Working with Key-Value Pairs</h3>
<div class="codetabs">
<div data-lang="scala">
<p>While most Spark operations work on RDDs containing any type of objects, a few special operations are
only available on RDDs of key-value pairs.
The most common ones are distributed “shuffle” operations, such as grouping or aggregating the elements
by a key.</p>
<p>In Scala, these operations are automatically available on RDDs containing
<a href="http://www.scala-lang.org/api/2.11.7/index.html#scala.Tuple2">Tuple2</a> objects
(the built-in tuples in the language, created by simply writing <code>(a, b)</code>). The key-value pair operations are available in the
<a href="api/scala/index.html#org.apache.spark.rdd.PairRDDFunctions">PairRDDFunctions</a> class,
which automatically wraps around an RDD of tuples.</p>
<p>For example, the following code uses the <code>reduceByKey</code> operation on key-value pairs to count how
many times each line of text occurs in a file:</p>
<div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">val</span> <span class="n">lines</span> <span class="k">=</span> <span class="n">sc</span><span class="o">.</span><span class="n">textFile</span><span class="o">(</span><span class="s">"data.txt"</span><span class="o">)</span>
<span class="k">val</span> <span class="n">pairs</span> <span class="k">=</span> <span class="n">lines</span><span class="o">.</span><span class="n">map</span><span class="o">(</span><span class="n">s</span> <span class="k">=></span> <span class="o">(</span><span class="n">s</span><span class="o">,</span> <span class="mi">1</span><span class="o">))</span>
<span class="k">val</span> <span class="n">counts</span> <span class="k">=</span> <span class="n">pairs</span><span class="o">.</span><span class="n">reduceByKey</span><span class="o">((</span><span class="n">a</span><span class="o">,</span> <span class="n">b</span><span class="o">)</span> <span class="k">=></span> <span class="n">a</span> <span class="o">+</span> <span class="n">b</span><span class="o">)</span></code></pre></div>
<p>We could also use <code>counts.sortByKey()</code>, for example, to sort the pairs alphabetically, and finally
<code>counts.collect()</code> to bring them back to the driver program as an array of objects.</p>
<p><strong>Note:</strong> when using custom objects as the key in key-value pair operations, you must be sure that a
custom <code>equals()</code> method is accompanied by a matching <code>hashCode()</code> method. For full details, see
the contract outlined in the <a href="http://docs.oracle.com/javase/7/docs/api/java/lang/Object.html#hashCode()">Object.hashCode()
documentation</a>.</p>
</div>
<div data-lang="java">
<p>While most Spark operations work on RDDs containing any type of objects, a few special operations are
only available on RDDs of key-value pairs.
The most common ones are distributed “shuffle” operations, such as grouping or aggregating the elements
by a key.</p>
<p>In Java, key-value pairs are represented using the
<a href="http://www.scala-lang.org/api/2.11.7/index.html#scala.Tuple2">scala.Tuple2</a> class
from the Scala standard library. You can simply call <code>new Tuple2(a, b)</code> to create a tuple, and access
its fields later with <code>tuple._1()</code> and <code>tuple._2()</code>.</p>
<p>RDDs of key-value pairs are represented by the
<a href="api/java/index.html?org/apache/spark/api/java/JavaPairRDD.html">JavaPairRDD</a> class. You can construct
JavaPairRDDs from JavaRDDs using special versions of the <code>map</code> operations, like
<code>mapToPair</code> and <code>flatMapToPair</code>. The JavaPairRDD will have both standard RDD functions and special
key-value ones.</p>
<p>For example, the following code uses the <code>reduceByKey</code> operation on key-value pairs to count how
many times each line of text occurs in a file:</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="nc">JavaRDD</span><span class="o"><</span><span class="nc">String</span><span class="o">></span> <span class="n">lines</span> <span class="k">=</span> <span class="n">sc</span><span class="o">.</span><span class="n">textFile</span><span class="o">(</span><span class="s">"data.txt"</span><span class="o">);</span>
<span class="nc">JavaPairRDD</span><span class="o"><</span><span class="nc">String</span><span class="o">,</span> <span class="nc">Integer</span><span class="o">></span> <span class="n">pairs</span> <span class="k">=</span> <span class="n">lines</span><span class="o">.</span><span class="n">mapToPair</span><span class="o">(</span><span class="n">s</span> <span class="o">-></span> <span class="k">new</span> <span class="nc">Tuple2</span><span class="o"><>(</span><span class="n">s</span><span class="o">,</span> <span class="mi">1</span><span class="o">));</span>
<span class="nc">JavaPairRDD</span><span class="o"><</span><span class="nc">String</span><span class="o">,</span> <span class="nc">Integer</span><span class="o">></span> <span class="n">counts</span> <span class="k">=</span> <span class="n">pairs</span><span class="o">.</span><span class="n">reduceByKey</span><span class="o">((</span><span class="n">a</span><span class="o">,</span> <span class="n">b</span><span class="o">)</span> <span class="o">-></span> <span class="n">a</span> <span class="o">+</span> <span class="n">b</span><span class="o">);</span></code></pre></div>