-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathstreaming-kafka-integration.html
executable file
·398 lines (326 loc) · 25.1 KB
/
streaming-kafka-integration.html
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
<!DOCTYPE html>
<!--[if lt IE 7]> <html class="no-js lt-ie9 lt-ie8 lt-ie7"> <![endif]-->
<!--[if IE 7]> <html class="no-js lt-ie9 lt-ie8"> <![endif]-->
<!--[if IE 8]> <html class="no-js lt-ie9"> <![endif]-->
<!--[if gt IE 8]><!--> <html class="no-js"> <!--<![endif]-->
<head>
<meta charset="utf-8">
<meta http-equiv="X-UA-Compatible" content="IE=edge,chrome=1">
<title>Spark Streaming + Kafka Integration Guide - Spark 2.0.0 Documentation</title>
<link rel="stylesheet" href="css/bootstrap.min.css">
<style>
body {
padding-top: 60px;
padding-bottom: 40px;
}
</style>
<meta name="viewport" content="width=device-width">
<link rel="stylesheet" href="css/bootstrap-responsive.min.css">
<link rel="stylesheet" href="css/main.css">
<script src="js/vendor/modernizr-2.6.1-respond-1.1.0.min.js"></script>
<link rel="stylesheet" href="css/pygments-default.css">
</head>
<body>
<!--[if lt IE 7]>
<p class="chromeframe">You are using an outdated browser. <a href="http://browsehappy.com/">Upgrade your browser today</a> or <a href="http://www.google.com/chromeframe/?redirect=true">install Google Chrome Frame</a> to better experience this site.</p>
<![endif]-->
<!-- This code is taken from http://twitter.github.com/bootstrap/examples/hero.html -->
<div class="navbar navbar-fixed-top" id="topbar">
<div class="navbar-inner">
<div class="container">
<div class="brand"><a href="index.html">
<img src="img/spark-logo-hd.png" style="height:50px;"/></a><span class="version">2.0.0</span>
</div>
<ul class="nav">
<!--TODO(andyk): Add class="active" attribute to li some how.-->
<li><a href="index.html">Overview</a></li>
<li class="dropdown">
<a href="#" class="dropdown-toggle" data-toggle="dropdown">Programming Guides<b class="caret"></b></a>
<ul class="dropdown-menu">
<li><a href="quick-start.html">Quick Start</a></li>
<li><a href="programming-guide.html">Spark Programming Guide</a></li>
<li class="divider"></li>
<li><a href="streaming-programming-guide.html">Spark Streaming</a></li>
<li><a href="sql-programming-guide.html">DataFrames, Datasets and SQL</a></li>
<li><a href="mllib-guide.html">MLlib (Machine Learning)</a></li>
<li><a href="graphx-programming-guide.html">GraphX (Graph Processing)</a></li>
<li><a href="sparkr.html">SparkR (R on Spark)</a></li>
</ul>
</li>
<li class="dropdown">
<a href="#" class="dropdown-toggle" data-toggle="dropdown">API Docs<b class="caret"></b></a>
<ul class="dropdown-menu">
<li><a href="api/scala/index.html#org.apache.spark.package">Scala</a></li>
<li><a href="api/java/index.html">Java</a></li>
<li><a href="api/python/index.html">Python</a></li>
<li><a href="api/R/index.html">R</a></li>
</ul>
</li>
<li class="dropdown">
<a href="#" class="dropdown-toggle" data-toggle="dropdown">Deploying<b class="caret"></b></a>
<ul class="dropdown-menu">
<li><a href="cluster-overview.html">Overview</a></li>
<li><a href="submitting-applications.html">Submitting Applications</a></li>
<li class="divider"></li>
<li><a href="spark-standalone.html">Spark Standalone</a></li>
<li><a href="running-on-mesos.html">Mesos</a></li>
<li><a href="running-on-yarn.html">YARN</a></li>
</ul>
</li>
<li class="dropdown">
<a href="api.html" class="dropdown-toggle" data-toggle="dropdown">More<b class="caret"></b></a>
<ul class="dropdown-menu">
<li><a href="configuration.html">Configuration</a></li>
<li><a href="monitoring.html">Monitoring</a></li>
<li><a href="tuning.html">Tuning Guide</a></li>
<li><a href="job-scheduling.html">Job Scheduling</a></li>
<li><a href="security.html">Security</a></li>
<li><a href="hardware-provisioning.html">Hardware Provisioning</a></li>
<li class="divider"></li>
<li><a href="building-spark.html">Building Spark</a></li>
<li><a href="https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark">Contributing to Spark</a></li>
<li><a href="https://cwiki.apache.org/confluence/display/SPARK/Supplemental+Spark+Projects">Supplemental Projects</a></li>
</ul>
</li>
</ul>
<!--<p class="navbar-text pull-right"><span class="version-text">v2.0.0</span></p>-->
</div>
</div>
</div>
<div class="container-wrapper">
<div class="content" id="content">
<h1 class="title">Spark Streaming + Kafka Integration Guide</h1>
<p><a href="http://kafka.apache.org/">Apache Kafka</a> is publish-subscribe messaging rethought as a distributed, partitioned, replicated commit log service. Here we explain how to configure Spark Streaming to receive data from Kafka. There are two approaches to this - the old approach using Receivers and Kafka’s high-level API, and a new experimental approach (introduced in Spark 1.3) without using Receivers. They have different programming models, performance characteristics, and semantics guarantees, so read on for more details.</p>
<h2 id="approach-1-receiver-based-approach">Approach 1: Receiver-based Approach</h2>
<p>This approach uses a Receiver to receive the data. The Receiver is implemented using the Kafka high-level consumer API. As with all receivers, the data received from Kafka through a Receiver is stored in Spark executors, and then jobs launched by Spark Streaming processes the data. </p>
<p>However, under default configuration, this approach can lose data under failures (see <a href="streaming-programming-guide.html#receiver-reliability">receiver reliability</a>. To ensure zero-data loss, you have to additionally enable Write Ahead Logs in Spark Streaming (introduced in Spark 1.2). This synchronously saves all the received Kafka data into write ahead logs on a distributed file system (e.g HDFS), so that all the data can be recovered on failure. See <a href="streaming-programming-guide.html#deploying-applications">Deploying section</a> in the streaming programming guide for more details on Write Ahead Logs.</p>
<p>Next, we discuss how to use this approach in your streaming application.</p>
<ol>
<li>
<p><strong>Linking:</strong> For Scala/Java applications using SBT/Maven project definitions, link your streaming application with the following artifact (see <a href="streaming-programming-guide.html#linking">Linking section</a> in the main programming guide for further information).</p>
<pre><code> groupId = org.apache.spark
artifactId = spark-streaming-kafka_2.11
version = 2.0.0
</code></pre>
<p>For Python applications, you will have to add this above library and its dependencies when deploying your application. See the <em>Deploying</em> subsection below.</p>
</li>
<li>
<p><strong>Programming:</strong> In the streaming application code, import <code>KafkaUtils</code> and create an input DStream as follows.</p>
<div class="codetabs">
<div data-lang="scala">
<pre><code> import org.apache.spark.streaming.kafka._
val kafkaStream = KafkaUtils.createStream(streamingContext,
[ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])
</code></pre>
<p>You can also specify the key and value classes and their corresponding decoder classes using variations of <code>createStream</code>. See the <a href="api/scala/index.html#org.apache.spark.streaming.kafka.KafkaUtils$">API docs</a>
and the <a href="https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala">example</a>.</p>
</div>
<div data-lang="java">
<pre><code> import org.apache.spark.streaming.kafka.*;
JavaPairReceiverInputDStream<String, String> kafkaStream =
KafkaUtils.createStream(streamingContext,
[ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume]);
</code></pre>
<p>You can also specify the key and value classes and their corresponding decoder classes using variations of <code>createStream</code>. See the <a href="api/java/index.html?org/apache/spark/streaming/kafka/KafkaUtils.html">API docs</a>
and the <a href="https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java">example</a>.</p>
</div>
<div data-lang="python">
<pre><code> from pyspark.streaming.kafka import KafkaUtils
kafkaStream = KafkaUtils.createStream(streamingContext, \
[ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])
</code></pre>
<p>By default, the Python API will decode Kafka data as UTF8 encoded strings. You can specify your custom decoding function to decode the byte arrays in Kafka records to any arbitrary data type. See the <a href="api/python/pyspark.streaming.html#pyspark.streaming.kafka.KafkaUtils">API docs</a>
and the <a href="https://github.com/apache/spark/blob/master/examples/src/main/python/streaming/kafka_wordcount.py">example</a>. </p>
</div>
</div>
<p><strong>Points to remember:</strong></p>
<ul>
<li>
<p>Topic partitions in Kafka does not correlate to partitions of RDDs generated in Spark Streaming. So increasing the number of topic-specific partitions in the <code>KafkaUtils.createStream()</code> only increases the number of threads using which topics that are consumed within a single receiver. It does not increase the parallelism of Spark in processing the data. Refer to the main document for more information on that.</p>
</li>
<li>
<p>Multiple Kafka input DStreams can be created with different groups and topics for parallel receiving of data using multiple receivers.</p>
</li>
<li>
<p>If you have enabled Write Ahead Logs with a replicated file system like HDFS, the received data is already being replicated in the log. Hence, the storage level in storage level for the input stream to <code>StorageLevel.MEMORY_AND_DISK_SER</code> (that is, use
<code>KafkaUtils.createStream(..., StorageLevel.MEMORY_AND_DISK_SER)</code>).</p>
</li>
</ul>
</li>
<li>
<p><strong>Deploying:</strong> As with any Spark applications, <code>spark-submit</code> is used to launch your application. However, the details are slightly different for Scala/Java applications and Python applications.</p>
<p>For Scala and Java applications, if you are using SBT or Maven for project management, then package <code>spark-streaming-kafka_2.11</code> and its dependencies into the application JAR. Make sure <code>spark-core_2.11</code> and <code>spark-streaming_2.11</code> are marked as <code>provided</code> dependencies as those are already present in a Spark installation. Then use <code>spark-submit</code> to launch your application (see <a href="streaming-programming-guide.html#deploying-applications">Deploying section</a> in the main programming guide). </p>
<p>For Python applications which lack SBT/Maven project management, <code>spark-streaming-kafka_2.11</code> and its dependencies can be directly added to <code>spark-submit</code> using <code>--packages</code> (see <a href="submitting-applications.html">Application Submission Guide</a>). That is, </p>
<pre><code> ./bin/spark-submit --packages org.apache.spark:spark-streaming-kafka_2.11:2.0.0 ...
</code></pre>
<p>Alternatively, you can also download the JAR of the Maven artifact <code>spark-streaming-kafka-assembly</code> from the
<a href="http://search.maven.org/#search|ga|1|a%3A%22spark-streaming-kafka-assembly_2.11%22%20AND%20v%3A%222.0.0%22">Maven repository</a> and add it to <code>spark-submit</code> with <code>--jars</code>.</p>
</li>
</ol>
<h2 id="approach-2-direct-approach-no-receivers">Approach 2: Direct Approach (No Receivers)</h2>
<p>This new receiver-less “direct” approach has been introduced in Spark 1.3 to ensure stronger end-to-end guarantees. Instead of using receivers to receive data, this approach periodically queries Kafka for the latest offsets in each topic+partition, and accordingly defines the offset ranges to process in each batch. When the jobs to process the data are launched, Kafka’s simple consumer API is used to read the defined ranges of offsets from Kafka (similar to read files from a file system). Note that this is an experimental feature introduced in Spark 1.3 for the Scala and Java API, in Spark 1.4 for the Python API.</p>
<p>This approach has the following advantages over the receiver-based approach (i.e. Approach 1).</p>
<ul>
<li>
<p><em>Simplified Parallelism:</em> No need to create multiple input Kafka streams and union them. With <code>directStream</code>, Spark Streaming will create as many RDD partitions as there are Kafka partitions to consume, which will all read data from Kafka in parallel. So there is a one-to-one mapping between Kafka and RDD partitions, which is easier to understand and tune.</p>
</li>
<li>
<p><em>Efficiency:</em> Achieving zero-data loss in the first approach required the data to be stored in a Write Ahead Log, which further replicated the data. This is actually inefficient as the data effectively gets replicated twice - once by Kafka, and a second time by the Write Ahead Log. This second approach eliminates the problem as there is no receiver, and hence no need for Write Ahead Logs. As long as you have sufficient Kafka retention, messages can be recovered from Kafka.</p>
</li>
<li>
<p><em>Exactly-once semantics:</em> The first approach uses Kafka’s high level API to store consumed offsets in Zookeeper. This is traditionally the way to consume data from Kafka. While this approach (in combination with write ahead logs) can ensure zero data loss (i.e. at-least once semantics), there is a small chance some records may get consumed twice under some failures. This occurs because of inconsistencies between data reliably received by Spark Streaming and offsets tracked by Zookeeper. Hence, in this second approach, we use simple Kafka API that does not use Zookeeper. Offsets are tracked by Spark Streaming within its checkpoints. This eliminates inconsistencies between Spark Streaming and Zookeeper/Kafka, and so each record is received by Spark Streaming effectively exactly once despite failures. In order to achieve exactly-once semantics for output of your results, your output operation that saves the data to an external data store must be either idempotent, or an atomic transaction that saves results and offsets (see <a href="streaming-programming-guide.html#semantics-of-output-operations">Semantics of output operations</a> in the main programming guide for further information).</p>
</li>
</ul>
<p>Note that one disadvantage of this approach is that it does not update offsets in Zookeeper, hence Zookeeper-based Kafka monitoring tools will not show progress. However, you can access the offsets processed by this approach in each batch and update Zookeeper yourself (see below).</p>
<p>Next, we discuss how to use this approach in your streaming application.</p>
<ol>
<li>
<p><strong>Linking:</strong> This approach is supported only in Scala/Java application. Link your SBT/Maven project with the following artifact (see <a href="streaming-programming-guide.html#linking">Linking section</a> in the main programming guide for further information).</p>
<pre><code> groupId = org.apache.spark
artifactId = spark-streaming-kafka_2.11
version = 2.0.0
</code></pre>
</li>
<li>
<p><strong>Programming:</strong> In the streaming application code, import <code>KafkaUtils</code> and create an input DStream as follows.</p>
<div class="codetabs">
<div data-lang="scala">
<pre><code> import org.apache.spark.streaming.kafka._
val directKafkaStream = KafkaUtils.createDirectStream[
[key class], [value class], [key decoder class], [value decoder class] ](
streamingContext, [map of Kafka parameters], [set of topics to consume])
</code></pre>
<p>You can also pass a <code>messageHandler</code> to <code>createDirectStream</code> to access <code>MessageAndMetadata</code> that contains metadata about the current message and transform it to any desired type.
See the <a href="api/scala/index.html#org.apache.spark.streaming.kafka.KafkaUtils$">API docs</a>
and the <a href="https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala">example</a>.</p>
</div>
<div data-lang="java">
<pre><code> import org.apache.spark.streaming.kafka.*;
JavaPairReceiverInputDStream<String, String> directKafkaStream =
KafkaUtils.createDirectStream(streamingContext,
[key class], [value class], [key decoder class], [value decoder class],
[map of Kafka parameters], [set of topics to consume]);
</code></pre>
<p>You can also pass a <code>messageHandler</code> to <code>createDirectStream</code> to access <code>MessageAndMetadata</code> that contains metadata about the current message and transform it to any desired type.
See the <a href="api/java/index.html?org/apache/spark/streaming/kafka/KafkaUtils.html">API docs</a>
and the <a href="https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java">example</a>.</p>
</div>
<div data-lang="python">
<pre><code> from pyspark.streaming.kafka import KafkaUtils
directKafkaStream = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
</code></pre>
<p>You can also pass a <code>messageHandler</code> to <code>createDirectStream</code> to access <code>KafkaMessageAndMetadata</code> that contains metadata about the current message and transform it to any desired type.
By default, the Python API will decode Kafka data as UTF8 encoded strings. You can specify your custom decoding function to decode the byte arrays in Kafka records to any arbitrary data type. See the <a href="api/python/pyspark.streaming.html#pyspark.streaming.kafka.KafkaUtils">API docs</a>
and the <a href="https://github.com/apache/spark/blob/master/examples/src/main/python/streaming/direct_kafka_wordcount.py">example</a>.</p>
</div>
</div>
<p>In the Kafka parameters, you must specify either <code>metadata.broker.list</code> or <code>bootstrap.servers</code>.
By default, it will start consuming from the latest offset of each Kafka partition. If you set configuration <code>auto.offset.reset</code> in Kafka parameters to <code>smallest</code>, then it will start consuming from the smallest offset. </p>
<p>You can also start consuming from any arbitrary offset using other variations of <code>KafkaUtils.createDirectStream</code>. Furthermore, if you want to access the Kafka offsets consumed in each batch, you can do the following. </p>
<div class="codetabs">
<div data-lang="scala">
<pre><code> // Hold a reference to the current offset ranges, so it can be used downstream
var offsetRanges = Array[OffsetRange]()
directKafkaStream.transform { rdd =>
offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
rdd
}.map {
...
}.foreachRDD { rdd =>
for (o <- offsetRanges) {
println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
}
...
}
</code></pre>
</div>
<div data-lang="java">
<pre><code> // Hold a reference to the current offset ranges, so it can be used downstream
final AtomicReference<OffsetRange[]> offsetRanges = new AtomicReference<>();
directKafkaStream.transformToPair(
new Function<JavaPairRDD<String, String>, JavaPairRDD<String, String>>() {
@Override
public JavaPairRDD<String, String> call(JavaPairRDD<String, String> rdd) throws Exception {
OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
offsetRanges.set(offsets);
return rdd;
}
}
).map(
...
).foreachRDD(
new Function<JavaPairRDD<String, String>, Void>() {
@Override
public Void call(JavaPairRDD<String, String> rdd) throws IOException {
for (OffsetRange o : offsetRanges.get()) {
System.out.println(
o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + o.untilOffset()
);
}
...
return null;
}
}
);
</code></pre>
</div>
<div data-lang="python">
<pre><code> offsetRanges = []
def storeOffsetRanges(rdd):
global offsetRanges
offsetRanges = rdd.offsetRanges()
return rdd
def printOffsetRanges(rdd):
for o in offsetRanges:
print "%s %s %s %s" % (o.topic, o.partition, o.fromOffset, o.untilOffset)
directKafkaStream\
.transform(storeOffsetRanges)\
.foreachRDD(printOffsetRanges)
</code></pre>
</div>
</div>
<p>You can use this to update Zookeeper yourself if you want Zookeeper-based Kafka monitoring tools to show progress of the streaming application.</p>
<p>Note that the typecast to HasOffsetRanges will only succeed if it is done in the first method called on the directKafkaStream, not later down a chain of methods. You can use transform() instead of foreachRDD() as your first method call in order to access offsets, then call further Spark methods. However, be aware that the one-to-one mapping between RDD partition and Kafka partition does not remain after any methods that shuffle or repartition, e.g. reduceByKey() or window().</p>
<p>Another thing to note is that since this approach does not use Receivers, the standard receiver-related (that is, <a href="configuration.html">configurations</a> of the form <code>spark.streaming.receiver.*</code> ) will not apply to the input DStreams created by this approach (will apply to other input DStreams though). Instead, use the <a href="configuration.html">configurations</a> <code>spark.streaming.kafka.*</code>. An important one is <code>spark.streaming.kafka.maxRatePerPartition</code> which is the maximum rate (in messages per second) at which each Kafka partition will be read by this direct API.</p>
</li>
<li>
<p><strong>Deploying:</strong> This is same as the first approach.</p>
</li>
</ol>
</div>
<!-- /container -->
</div>
<script src="js/vendor/jquery-1.8.0.min.js"></script>
<script src="js/vendor/bootstrap.min.js"></script>
<script src="js/vendor/anchor.min.js"></script>
<script src="js/main.js"></script>
<!-- MathJax Section -->
<script type="text/x-mathjax-config">
MathJax.Hub.Config({
TeX: { equationNumbers: { autoNumber: "AMS" } }
});
</script>
<script>
// Note that we load MathJax this way to work with local file (file://), HTTP and HTTPS.
// We could use "//cdn.mathjax...", but that won't support "file://".
(function(d, script) {
script = d.createElement('script');
script.type = 'text/javascript';
script.async = true;
script.onload = function(){
MathJax.Hub.Config({
tex2jax: {
inlineMath: [ ["$", "$"], ["\\\\(","\\\\)"] ],
displayMath: [ ["$$","$$"], ["\\[", "\\]"] ],
processEscapes: true,
skipTags: ['script', 'noscript', 'style', 'textarea', 'pre']
}
});
};
script.src = ('https:' == document.location.protocol ? 'https://' : 'http://') +
'cdn.mathjax.org/mathjax/latest/MathJax.js?config=TeX-AMS-MML_HTMLorMML';
d.getElementsByTagName('head')[0].appendChild(script);
}(document));
</script>
</body>
</html>