spark_入门04学习笔记
1、目标
- 1、掌握sparkStreaming原理和架构
- 2、掌握DStream常用的操作
- 3、掌握sparkStreaming整合flume
- 4、掌握sparkStreaming整合kafka (★★★★★)
2、sparkStreaming概述
2.1 sparkStreaming是什么
- Spark Streaming makes it easy to build scalable fault-tolerant streaming applications.
- sparkStreaming是一个可以非常容易构建可扩展、具有容错机制的流式应用程序。
2.2 sparkStreaming特性
- 1、易用性
- 可以开发一个实时处理程序像之前开发批处理一样。
- 可以使用多种不同语言进行代码开发
- java
- scala
- python
- 2、容错性
- sparkStreaming可以保证恰好一次语义
- 数据被处理且只被处理一次
- 避免数据的丢失和数据的重复处理
- 数据被处理且只被处理一次
- sparkStreaming可以在没有额外的代码情况下来恢复一些丢失的工作。
- sparkStreaming可以保证恰好一次语义
- 3、融合到spark生态系统
- sparkStreaming可以结合批处理和交互式查询
2.3 sparkStreaming与storm的区别
SparkStreaming | Storm |
---|---|
![]() | ![]() |
开发语言:Scala | 开发语言:Clojure |
编程模型:DStream | 编程模型:Spout/Bolt |
tips : sparkStreaming没有storm实时性强
3、sparkStreaming内部原理
3.1 sparkStreaming原理
Spark Streaming 是基于spark的流式批处理引擎,其基本原理是把输入数据以某一时间间隔批量的处理,当批处理间隔缩短到秒级时,便可以用于处理实时数据流。
3.2 sparkStreaming计算流程
sparkStreaming按照某一个时间维度把源源不断的数据,划分成了很多个短小的批处理作业,每一个批次处理作业中都是Dstream,Dstream内部封装了RDD,RDD中有分区,分区里面才是真正的数据。
后期对Dstream做相应的转换操作,其本质这些转换操作是作用在它的内部的rdd上。
3.3 sparkStreaming容错性
可以通过rdd中血统记录下rdd上面的转换操作,后期可以通过血统重新计算恢复得到某些rdd丢失的分区数据。
SparkStreaming中底层就是去操作Dstream,Dstream内部封装了rdd,后期对rdd进行大量的转换操作,也是通过血统计算下这些转换信息。sparkStreaming的容错性就是根据血统而来。
sparkStreaming它会把接受到的数据保留2份,------>目的:保证数据源端安全性
血统在进行数据恢复的时候是有条件:
某个rdd的分区的数据丢失:血统+原始的数据源
sc.textFile("/words.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect
rdd1—>rdd2---->rdd3---->rdd4
3.4 sparkStreaming实时性
storm是来一条数据就处理一条,实时性比较高,也就是延迟是比较低。
sparkStreaming是以某一时间间隔的批处理,实时性比较低,也就是延迟是比较高。
后期再实际企业中,具体使用哪一种实时处理技术,一定要结合当前业务
比如说:公司的高层领导允许数据有一定的延迟,从数据源产生到看到最后的结果数据在一定的时间之内,那么这个时候就可以优先考虑sparkStreaming,如果要求实时性比较高,就可以考虑另一个框架storm。
4、Dstream
4.1 什么是Dstream
Discretized Stream是Spark Streaming的基础抽象,代表持续性的数据流和经过各种Spark算子操作后的结果数据流。在内部实现上,DStream是一系列连续的RDD来表示。每个RDD含有一段时间间隔内的数据 .
4.2 Dstream操作
- Dstream的操作分为2大类
- transformation(转换)
- 在一个Dstream基础之上调用对应的转换方法生成新的Dstream,它是延迟加载,不会触发任务的真正运行(类似于rdd中transformation)
- outputOperation(输出)
- 它会触发任务的真正运行(类似于action)
- transformation(转换)
- 常用的Transformations on DStreams
Transformation | Meaning |
---|---|
map(func) | 对DStream中的各个元素进行func函数操作,然后返回一个新的DStream |
flatMap(func) | 与map方法类似,只不过各个输入项可以被输出为零个或多个输出项 |
filter(func) | 过滤出所有函数func返回值为true的DStream元素并返回一个新的DStream |
repartition(numPartitions) | 增加或减少DStream中的分区数,从而改变DStream的并行度 |
union(otherStream) | 将源DStream和输入参数为otherDStream的元素合并,并返回一个新的DStream. |
count() | 通过对DStream中的各个RDD中的元素进行计数,然后返回只有一个元素的RDD构成的DStream |
reduce(func) | 对源DStream中的各个RDD中的元素利用func进行聚合操作,然后返回只有一个元素的RDD构成的新的DStream. |
countByValue() | 对于元素类型为K的DStream,返回一个元素为(K,Long)键值对形式的新的DStream,Long对应的值为源DStream中各个RDD的key出现的次数 |
reduceByKey(func, [numTasks]) | 利用func函数对源DStream中的key进行聚合操作,然后返回新的(K,V)对构成的DStream |
join(otherStream, [numTasks]) | 输入为(K,V)、(K,W)类型的DStream,返回一个新的(K,(V,W))类型的DStream |
cogroup(otherStream, [numTasks]) | 输入为(K,V)、(K,W)类型的DStream,返回一个新的 (K, Seq[V], Seq[W]) 元组类型的DStream |
transform(func) | 通过RDD-to-RDD函数作用于DStream中的各个RDD,可以是任意的RDD操作,从而返回一个新的RDD |
updateStateByKey(func) | 根据key的之前状态值和key的新值,对key进行更新,返回一个新状态的DStream |
-
特殊的Transformation
- UpdateStateByKey : 用于记录历史记录,保存上次的状态
- Window : 滑动窗口转换操作
(1)红色的矩形就是一个窗口,窗口框住的是一段时间内的数据流。
(2)这里面每一个time都是时间单元,在官方的例子中,每隔window size是3 time unit, 而且每隔2个单位时间,窗口会slide一次。
所以基于窗口的操作,需要指定2个参数:
-
window length - The duration of the window (3 in the figure)
-
slide interval - The interval at which the window-based operation is performed (2 in the figure).
a.窗口大小,一段时间内数据的容器。
b.滑动间隔,每隔多久计算一次。
- 常用的Output Operations on DStreams
Output Operation | Meaning |
---|---|
print() | 打印到控制台 |
saveAsTextFiles(prefix, [suffix]) | 保存流的内容为文本文件,文件名为 “prefix-TIME_IN_MS[.suffix]”. |
saveAsObjectFiles(prefix, [suffix]) | 保存流的内容为SequenceFile,文件名为 “prefix-TIME_IN_MS[.suffix]”. |
saveAsHadoopFiles(prefix, [suffix]) | 保存流的内容为hadoop文件,文件名为 “prefix-TIME_IN_MS[.suffix]”. |
foreachRDD(func) | 对Dstream里面的每个RDD执行func |
5、Dstream操作实战
(1)安装并启动生产者
首先在linux服务器上用YUM安装nc工具,nc命令是netcat命令的简称,它是用来设置路由器。我们可以利用它向某个端口发送数据。
yum install -y nc
(2)通过netcat工具向指定的端口发送数据
nc -lk 9999
架构图
分析图
5.1 通过sparkStreaming接受socket数据,实现单词统计
-
1、引入依赖
<!--引入sparkStreaming依赖--> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_2.11</artifactId> <version>2.1.3</version> </dependency>
-
2、代码开发
package cn.itcast.socket import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream} import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.streaming.{Seconds, StreamingContext} //todo:通过sparkStreaming接受socket数据实现单词统计 object SparkStreamingSocket { def main(args: Array[String]): Unit = { //1、创建SparkConf val sparkConf: SparkConf = new SparkConf().setAppName("SparkStreamingSocket").setMaster("local[2]") //2、创建SparkContext val sc = new SparkContext(sparkConf) sc.setLogLevel("warn") //3、创建StreamingContext 需要一个sparkContext对象,后面设置sparkStreaming批处理的时间间隔 val ssc = new StreamingContext(sc,Seconds(5)) //4、接受socket数据 val socketTextStream: ReceiverInputDStream[String] = ssc.socketTextStream("node-1",9999) //5、切分每一行,获取所有的单词 val words: DStream[String] = socketTextStream.flatMap(_.split(" ")) //6、每个单词计为1 val wordAndOne: DStream[(String, Int)] = words.map((_,1)) //7、相同单词出现的1累加 val result: DStream[(String, Int)] = wordAndOne.reduceByKey(_+_) //8、打印 result.print() //9、开启流式计算 ssc.start() ssc.awaitTermination() } }
5.2 通过sparkStreaming接受socket数据,把所有批次单词统计的累加
-
1、
updateStateByKey
-
2、代码开发
package cn.itcast.socket import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream} //todo:通过sparkStreaming接受socket数据实现所有批次单词统计结果累加 object SparkStreamingSocketTotal { //currentValues:把当前批次相同的单词出现的所有的1聚在一起---->(hadoop,1)(hadoop,1)(hadoop,1)----->List(1,1,1) //historyValues:把之前所有批次每一个单词出现总次数获取得到了 Option类型:表示可能有(Some)或者没有(None) def updateFunc(currentValues:Seq[Int], historyValues:Option[Int]):Option[Int] = { //所有批次单词出现结果累加 val newValue: Int = currentValues.sum + historyValues.getOrElse(0) Some(newValue) } def main(args: Array[String]): Unit = { //1、创建SparkConf val sparkConf: SparkConf = new SparkConf().setAppName("SparkStreamingSocketTotal").setMaster("local[2]") //2、创建SparkContext val sc = new SparkContext(sparkConf) sc.setLogLevel("warn") //3、创建StreamingContext 需要一个sparkContext对象,后面设置sparkStreaming批处理的时间间隔 val ssc = new StreamingContext(sc,Seconds(5)) //设置一个checkpoint目录,作用:用来存储之前所有批次单词统计的结果 ssc.checkpoint("./socket") //4、接受socket数据 val socketTextStream: ReceiverInputDStream[String] = ssc.socketTextStream("node1",9999) //5、切分每一行,获取所有的单词 val words: DStream[String] = socketTextStream.flatMap(_.split(" ")) //6、每个单词计为1 val wordAndOne: DStream[(String, Int)] = words.map((_,1)) //7、相同单词出现的1累加 val result: DStream[(String, Int)] = wordAndOne.updateStateByKey(updateFunc) //8、打印 result.print() //9、开启流式计算 ssc.start() ssc.awaitTermination() } }
5.3 通过sparkStreaming接受socket数据实现单词统计—开窗函数
-
1、
reduceByKeyAndWindow
-
2、代码开发
package cn.itcast.socket import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream} //todo:利用spark中开窗函数来接受socket数据实现单词统计 object SparkStreamingSocketWindow { def main(args: Array[String]): Unit = { //1、创建SparkConf val sparkConf: SparkConf = new SparkConf().setAppName("SparkStreamingSocketWindow").setMaster("local[2]") //2、创建SparkContext val sc = new SparkContext(sparkConf) sc.setLogLevel("warn") //3、创建StreamingContext 需要一个sparkContext对象,后面设置sparkStreaming批处理的时间间隔 val ssc = new StreamingContext(sc,Seconds(5)) //4、接受socket数据 val socketTextStream: ReceiverInputDStream[String] = ssc.socketTextStream("node1",9999) //5、切分每一行,获取所有的单词 val words: DStream[String] = socketTextStream.flatMap(_.split(" ")) //6、每个单词计为1 val wordAndOne: DStream[(String, Int)] = words.map((_,1)) //7、相同单词出现的1累加 // reduceFunc: (V, V) => V, 就是一个函数 //windowDuration: Duration, 表示窗口的长度 //slideDuration: Duration 表示滑动窗口的时间间隔,在这里它意味着每隔多久计算一次 val result: DStream[(String, Int)] = wordAndOne.reduceByKeyAndWindow((x:Int,y:Int)=>x+y,Seconds(10),Seconds(10)) //8、打印 result.print() //9、开启流式计算 ssc.start() ssc.awaitTermination() } }
tips : 针对于滑动窗口时间间隔和窗口长度以及批处理时间间隔之间的关系
结论 :
(1) 需要保证窗口长度和滑动窗口的时间间隔相同
(2) 需要保证窗口长度和滑动窗口的时间间隔必须是最小批次时间的整数倍
5.4 通过sparkStreaming接受socket数据,实现一定时间内的热门词汇
-
1、
transform
-
2、代码开发
package cn.itcast.socket import org.apache.spark.rdd.RDD import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream} import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.{SparkConf, SparkContext} //todo:通过sparkStreaming接受socket数据实现一定时间内的热门词汇 object SparkStreamingSocketWindowHotWords { def main(args: Array[String]): Unit = { //1、创建SparkConf val sparkConf: SparkConf = new SparkConf().setAppName("SparkStreamingSocketWindowHotWords").setMaster("local[2]") //2、创建SparkContext val sc = new SparkContext(sparkConf) sc.setLogLevel("warn") //3、创建StreamingContext val ssc = new StreamingContext(sc,Seconds(5)) //4、读取socket数据 val stream: ReceiverInputDStream[String] = ssc.socketTextStream("node1",9999) //5、切分每一行 val words: DStream[String] = stream.flatMap(_.split(" ")) //6、每个单词计为1 val wordAndOne: DStream[(String, Int)] = words.map((_,1)) //7、相同单词出现的1累加 val result: DStream[(String, Int)] = wordAndOne.reduceByKeyAndWindow((x:Int,y:Int)=>x+y,Seconds(10),Seconds(10)) //8、按照单词出现的次数降序 val sortDstream: DStream[(String, Int)] = result.transform(rdd => { //按照单词的次数降序 val sortRDD: RDD[(String, Int)] = rdd.sortBy(_._2, false) //取出出现次数最多的前3位 val top3: Array[(String, Int)] = sortRDD.take(3) println("------------------top3 start--------------------") top3.foreach(println) println("------------------top3 end-----------------------") sortRDD }) //9、打印 sortDstream.print() //10、开启流式计算 ssc.start() ssc.awaitTermination() } }
6、sparkStreaming整合flume
6.1 poll拉模式(优先考虑
)
-
1、安装flume1.6以上
-
2、需要把spark-streaming-flume-sink_2.11-2.1.3.jar、scala-library-2.11.8.jar拷贝到flume的lib目录下,需要把之前的scala依赖2.10删除掉
-
3、修改flume配置
-
vim flume-poll-spark.conf
a1.sources = r1 a1.sinks = k1 a1.channels = c1 #source a1.sources.r1.channels = c1 a1.sources.r1.type = spooldir a1.sources.r1.spoolDir = /root/flumedata a1.sources.r1.fileHeader = true #channel a1.channels.c1.type =memory a1.channels.c1.capacity = 20000 a1.channels.c1.transactionCapacity=5000 #sinks a1.sinks.k1.channel = c1 a1.sinks.k1.type = org.apache.spark.streaming.flume.sink.SparkSink a1.sinks.k1.hostname=node-1 a1.sinks.k1.port = 8888 a1.sinks.k1.batchSize= 2000
-
4、开发程序
- 引入依赖
<!--引入sparkStreaming与flume整合依赖--> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-flume_2.11</artifactId> <version>2.1.3</version> </dependency>
-
代码
package cn.itcast.flume import java.net.InetSocketAddress import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream} import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.flume.{FlumeUtils, SparkFlumeEvent} //todo:sparkStreaming整合Flume--------------Poll拉模式 object SparkStreamingFlumePoll { def main(args: Array[String]): Unit = { //1、创建SparkConf val sparkConf: SparkConf = new SparkConf().setAppName("SparkStreamingFlumePoll").setMaster("local[2]") //2、创建SparkContext val sc = new SparkContext(sparkConf) sc.setLogLevel("warn") //3、创建StreamingContext 需要一个sparkContext对象,后面设置sparkStreaming批处理的时间间隔 val ssc = new StreamingContext(sc,Seconds(5)) //4、通过FlumeUtils工具类获取flume中数据 --poll拉模式 val pollingStream: ReceiverInputDStream[SparkFlumeEvent] = FlumeUtils.createPollingStream(ssc,"node-1",8888) //接受多个flume收集到的数据 // val addresses=List(new InetSocketAddress("node-1",8888),new InetSocketAddress("node-2",8888),new InetSocketAddress("node-3",8888)) // val pollingStream1: ReceiverInputDStream[SparkFlumeEvent] = FlumeUtils.createPollingStream(ssc,addresses,StorageLevel.MEMORY_AND_DISK_SER_2) // //5、获取到flume中的真实数据----> 数据的最小传输单元event:{"headers":xxxxxx,"body":xxxxxxx} //获取每个event中的body数据 val data: DStream[String] = pollingStream.map(x=>new String(x.event.getBody.array())) //5、切分每一行,获取所有的单词 val words: DStream[String] = data.flatMap(_.split(" ")) //6、每个单词计为1 val wordAndOne: DStream[(String, Int)] = words.map((_,1)) //7、相同单词出现的1累加 val result: DStream[(String, Int)] = wordAndOne.reduceByKey(_+_) //8、打印 result.print() //9、开启流式计算 ssc.start() ssc.awaitTermination() } }
-
-
5、flume启动脚本(在flume目录下执行)
bin/flume-ng agent -n a1 -c conf -f conf/flume-poll-spark.conf -Dflume.root.logger=info,console
tips : 先启动flume , 在启动idea程序
local[N]此时需要N>1 , 当有多个M个task时 , N最小为M+1
6.2 push推模式
-
1、修改flume配置
-
vim flume-push-spark.conf
a1.sources = r1 a1.sinks = k1 a1.channels = c1 #source a1.sources.r1.channels = c1 a1.sources.r1.type = spooldir a1.sources.r1.spoolDir = /root/flumedata a1.sources.r1.fileHeader = true #channel a1.channels.c1.type =memory a1.channels.c1.capacity = 20000 a1.channels.c1.transactionCapacity=5000 #sinks a1.sinks.k1.channel = c1 a1.sinks.k1.type = avro #自己本机的ip a1.sinks.k1.hostname=192.168.140.46 a1.sinks.k1.port = 8888 a1.sinks.k1.batchSize= 2000
-
-
2、代码开发
package cn.itcast.flume import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream} import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.streaming.flume.{FlumeUtils, SparkFlumeEvent} //todo:sparkStreaming整合flume----------Push推模式 object SparkStreamingFlumePush { def main(args: Array[String]): Unit = { //1、创建SparkConf val sparkConf: SparkConf = new SparkConf().setAppName("SparkStreamingFlumePush").setMaster("local[2]") //2、创建SparkContext val sc = new SparkContext(sparkConf) sc.setLogLevel("warn") //3、创建StreamingContext 需要一个sparkContext对象,后面设置sparkStreaming批处理的时间间隔 val ssc = new StreamingContext(sc,Seconds(5)) //4、push推模式进行整合 val stream: ReceiverInputDStream[SparkFlumeEvent] = FlumeUtils.createStream(ssc,"192.168.140.46",8888) //5、获取到flume中的真实数据----> 数据的最小传输单元event:{"headers":xxxxxx,"body":xxxxxxx} //获取每个event中的body数据 val data: DStream[String] = stream.map(x=>new String(x.event.getBody.array())) //5、切分每一行,获取所有的单词 val words: DStream[String] = data.flatMap(_.split(" ")) //6、每个单词计为1 val wordAndOne: DStream[(String, Int)] = words.map((_,1)) //7、相同单词出现的1累加 val result: DStream[(String, Int)] = wordAndOne.reduceByKey(_+_) //8、打印 result.print() //9、开启流式计算 ssc.start() ssc.awaitTermination() } }
-
3、flume启动脚本
bin/flume-ng agent -n a1 -c conf -f conf/flume-push-spark.conf -Dflume.root.logger=info,console
tips : 先启动idea程序 , 再启动flume
7、sparkStreaming整合kafka
7.1 KafkaUtils.createStream
它是利用了kafka高层次的api(消息的偏移量保存在zk中)去消费数据,利用receiver接受器去kafka把数据拉取过来,默认这种情况会出现数据丢失,开启WAL日志,把接受到的数据同步的写入到hdfs中,保证数据源端的安全,后期方便于进行数据恢复.
第一套api中:默认数据会丢失,开启WAL日志之后set(“spark.streaming.receiver.writeAheadLog.enable”,“true”),可以保证数据不丢失。
可以保证数据不丢失,但是保证不了数据被处理且只被处理一次。也就是说这一块会出现的数据的重复处理。
数据接收处理之后,去zk更新处理数据的偏移量,这个时候zk由于异常,没有把偏移量更新成功。最后导致数据的重复消费。
首先开启zk , kafka , 创建一个itcast主题 , 在kafka的bin目录下执行
- 查看当前topics列表
./kafka-topics.sh --list --zookeeper node-1:2181,node-2:2181,node-3:2181
- 创建topic
./kafka-topics.sh --create --topic itcast --partitions 3 --replication-factor 2 --zookeeper node-1:2181,node-2:2181,node-3:2181
-
1、代码开发
package cn.itcast.kafka import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream} import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.kafka.KafkaUtils import scala.collection.immutable //todo:sparkStreaming整合kafka------------基于recevier接受器 使用了消费者高级api object SparkStreamingKafkaReceiver { def main(args: Array[String]): Unit = { //1、创建SparkConf val sparkConf: SparkConf = new SparkConf() .setAppName("SparkStreamingKafkaReceiver") .setMaster("local[4]") //开启WAL日志,保证数据源端的安全性,它是把接受到的数据同步写入到hdfs中。 .set("spark.streaming.receiver.writeAheadLog.enable","true") //2、创建SparkContext val sc = new SparkContext(sparkConf) sc.setLogLevel("warn") //3、创建StreamingContext 需要一个sparkContext对象,后面设置sparkStreaming批处理的时间间隔 val ssc = new StreamingContext(sc,Seconds(5)) //设置checkpoint目录保存接受的到原始数据 ssc.checkpoint("./spark-receiver") //4、指定zk服务地址 val zkQuorum="node-1:2181,node-2:2181,node-3:2181" //消费者组id val groupId="spark-receiver" //指定topic相关信息 key:表示topic的名称,vlaule:表示每一个receiver接收器使用多少个线程去拉取数据 val topics=Map("itcast" -> 1) //(String, String)----> 第一个String就是消息的key,第二个String就是消息的内容本身 //val stream: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(ssc,zkQuorum,groupId,topics) //可以构建多个receiver接收器去快速拉取kafka中的topic的数据,下面可以构建3个receiver接收器去拉取数据 val numReceiverDstream: immutable.IndexedSeq[ReceiverInputDStream[(String, String)]] = (1 to 3).map(x => { val stream: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(ssc, zkQuorum, groupId, topics) stream }) //通过streamingContext对象调用union方法,把每一个recevier接收器产生的Dstream进行合并生成一个新的Dstream val totalReceiverDstream: DStream[(String, String)] = ssc.union(numReceiverDstream) //5、获取消息的内容 val data: DStream[String] = totalReceiverDstream.map(x=>x._2) //6、切分每一行,获取所有的单词 val words: DStream[String] = data.flatMap(_.split(" ")) //7、每个单词计为1 val wordAndOne: DStream[(String, Int)] = words.map((_,1)) //8、相同单词出现的1累加 val result: DStream[(String, Int)] = wordAndOne.reduceByKey(_+_) //9、打印 result.print() //10、开启流式计算 ssc.start() ssc.awaitTermination() } }
当往zk上存储的时候zk发生异常了 , 此时offset未更新 , 等再次重启的时候就可能重复处理 , 因此我们采用下面这种方式
7.2 KafkaUtils.createDirectStream
(常用)
-
1、代码开发
package cn.itcast.kafka import kafka.serializer.StringDecoder import org.apache.spark.streaming.dstream.{DStream, InputDStream} import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.kafka.KafkaUtils //todo:sparkStreaming整合kafka-----基于direct 采用了消费低级api 消费的偏移量不在由zk去保存 object SparkStreamingKafkaDirect { def main(args: Array[String]): Unit = { //1、创建SparkConf val sparkConf: SparkConf = new SparkConf() .setAppName("SparkStreamingKafkaDirect") .setMaster("local[4]") //2、创建SparkContext val sc = new SparkContext(sparkConf) sc.setLogLevel("warn") //3、创建StreamingContext 需要一个sparkContext对象,后面设置sparkStreaming批处理的时间间隔 val ssc = new StreamingContext(sc,Seconds(5)) // 设置checkpoint目录,主要是用来保存消息的偏移量 ssc.checkpoint("./spark-direct") //4、指定kafka集群 val kafkaParams=Map("bootstrap.servers"->"node1:9092,node2:9092,node3:9092","group.id" ->"spark-direct") val topics=Set("itcast") //采用了直连的方式连接上每一个topic的分区,把数据读取进来,这个时候产生的Dstream内部的rdd分区数据跟kafka中topic的分区相等 val dstream: InputDStream[(String, String)] = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc,kafkaParams,topics) //5、获取topic中数据 val data: DStream[String] = dstream.map(_._2) //6、切分每一行,获取所有的单词 val words: DStream[String] = data.flatMap(_.split(" ")) //7、每个单词计为1 val wordAndOne: DStream[(String, Int)] = words.map((_,1)) //8、相同单词出现的1累加 val result: DStream[(String, Int)] = wordAndOne.reduceByKey(_+_) //9、打印 result.print() //10、开启流式计算 ssc.start() ssc.awaitTermination() } }
第二套api,这个时候是并没有receiver接收器去拉取数据,它是直接连接上每一个topic分区,把数据获取得到,也并没有把接受到的数据写入到磁盘中。
如何保证数据不丢失和不被重复处理:
(1)就是利用自己去维护消息的偏移量
把数据处理成功和数据偏移保存都是由自己去控制,并且数据的处理成功和数据偏移量的保存必须在同一事务。
- linux上运行代码
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils
//todo:sparkStreaming整合kafka-----基于direct 采用了消费低级api 消费的偏移量不在由zk去保存
object SparkStreamingKafkaDirect {
def main(args: Array[String]): Unit = {
//1、创建SparkConf
val sparkConf: SparkConf = new SparkConf()
.setAppName("SparkStreamingKafkaDirect")
//2、创建SparkContext
val sc = new SparkContext(sparkConf)
sc.setLogLevel("warn")
//3、创建StreamingContext 需要一个sparkContext对象,后面设置sparkStreaming批处理的时间间隔
val ssc = new StreamingContext(sc,Seconds(5))
// 设置checkpoint目录,主要是用来保存消息的偏移量
ssc.checkpoint("/spark-direct")
//4、指定kafka集群
val kafkaParams=Map("bootstrap.servers"->"node1:9092,node2:9092,node3:9092","group.id" ->"spark-direct")
val topics=Set("itcast")
//采用了直连的方式连接上每一个topic的分区,把数据读取进来,这个时候产生的Dstream内部的rdd分区数据跟kafka中topic的分区相等
val dstream: InputDStream[(String, String)] = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc,kafkaParams,topics)
//5、获取topic中数据
val data: DStream[String] = dstream.map(_._2)
//6、切分每一行,获取所有的单词
val words: DStream[String] = data.flatMap(_.split(" "))
//7、每个单词计为1
val wordAndOne: DStream[(String, Int)] = words.map((_,1))
//8、相同单词出现的1累加
val result: DStream[(String, Int)] = wordAndOne.reduceByKey(_+_)
//9、打印
result.print()
//10、开启流式计算
ssc.start()
ssc.awaitTermination()
}
}
-
4、打成jar包提交到集群中运行
spark-submit --master spark://node-1:7077 --class com.itck.spark_kafka.SparkStreamingKafkaDirect --executor-memory 512M --total-executor-cores 1 demo_spark-1.0-SNAPSHOT.jar
对于spark程序的开发,先在本地运行下,看一下程序处理逻辑对不对,如果没有问题,就可以把程序打成jar包提交到集群去运行。
提交到集群中运行的时候,需要指定任务运行时需要的资源大小,这个时候资源到底给多少是合理的????
比较理想的状态:就是在当前批次时间内就把该批次的数据处理完成。如何去看有没达到这个理想的状态:这个时候就可以访问: master主机名:8080
对应sparkStreaming流式处理程序来说,可以找到一个比较理想状态
第一个5s批次数据: ---------------------------------> 需要1分钟。
第二个5s批次数据: ---------------------------------> 需要1分钟。
第三个5s批次数据: ---------------------------------> 需要1分钟。
…
要处理的数据需要的计算时间大于批次的时间间隔,这里后面的批次数据会一直等待资源,同时这里也会出现数据的积压。出现了比较高的延迟。进行大量的测试:
(1) --executor-memory 5g --total-executor-cores 10
(2) --executor-memory 5g --total-executor-cores 20
(3) --executor-memory 10g --total-executor-cores 20通过大量的参数进行测试,找到一个比较好的一组参数,最后就可以通过这个比较合理的这组参数,提交到实际的生产环境中运行。