1. Introduction to Spark Streaming
1) Definition
Spark Streaming is an extension of the core Spark API that enables scalable, high-throughput, fault-tolerant stream processing of live data streams.
2. Installing the nc Service and Running Spark Streaming
1) Install the nc command
- rpm -ivh nc-1.84-22.el6.x86_64.rpm (preferred)
# Install
Upload the nc-1.84-22.el6.x86_64.rpm package to the softwares directory, then install it:
[kfk@bigdata-pro02 softwares]$ sudo rpm -ivh nc-1.84-22.el6.x86_64.rpm
Preparing... ########################################### [100%]
1:nc ########################################### [100%]
[kfk@bigdata-pro02 softwares]$ which nc
/usr/bin/nc
# Start
nc -lk 9999 (acts as a simple receiver: -l listens on port 9999, -k keeps listening after a client disconnects)
Once nc is running, type data into its terminal and the word counts will appear on the Spark side, as shown in step 2) below.
- yum install -y nc (online installation alternative)
2) Run the Spark Streaming WordCount example
bin/run-example --master local[2] streaming.NetworkWordCount localhost 9999
# Data input
# Result counts
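For example, typing hadoop spark spark into the nc terminal makes the Spark side print something like the following (hypothetical input; the batch timestamp is a placeholder and yours will differ):
-------------------------------------------
Time: <batch time> ms
-------------------------------------------
(spark,2)
(hadoop,1)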
Note: set the log level to WARN to see the output above; otherwise it is drowned out by INFO log messages.
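In spark-shell the quickest way to do this is with the standard SparkContext method:
// suppress INFO logs so the streaming output stays visible
sc.setLogLevel("WARN")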
3) Pipe a file into nc and observe the Spark Streaming output
cat test.txt | nc -lk 9999
nc sends the file as soon as the Spark socket receiver connects. The file contents:
hadoop storm spark
hbase spark flume
spark dajiangtai spark
hdfs mapreduce spark
hive hdfs solr
spark flink storm
hbase storm es
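If the whole file is consumed within a single batch, the printed counts would be (order may vary):
(spark,6)
(storm,3)
(hbase,2)
(hdfs,2)
(hadoop,1)
(flume,1)
(dajiangtai,1)
(mapreduce,1)
(hive,1)
(solr,1)
(flink,1)
(es,1)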
3. How Spark Streaming Works
1) Data stream processing
Spark Streaming receives live input streams and divides the data into small batches (micro-batches); the Spark engine then processes each batch and produces the final results as a stream of batches.
2) How the receiver works
A receiver runs as a long-running task on an executor. It collects records from the source (here, the nc socket) and buffers them into blocks; the blocks received during one batch interval become the partitions of that batch's RDD.
3) Putting it together
At every batch interval the driver turns the received blocks into an RDD and schedules an ordinary Spark job on it, so a streaming computation is executed as a sequence of small batch jobs.
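To make the micro-batch model concrete, here is a minimal spark-shell sketch (sc is provided by the shell, as in the test in section 4): foreachRDD exposes each batch of a DStream as a plain RDD.
import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(sc, Seconds(5))
val lines = ssc.socketTextStream("localhost", 9999)
// every 5-second batch arrives as one ordinary RDD
lines.foreachRDD(rdd => println(s"records in this batch: ${rdd.count()}"))
ssc.start()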
4. Spark Streaming Programming Model
1) Two ways to initialize a StreamingContext
# Option 1: from an existing SparkContext (sc, e.g. inside spark-shell)
val ssc = new StreamingContext(sc, Seconds(5))
# Option 2: from a SparkConf
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))
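For reference, a minimal standalone driver built on the second form might look like this (the object name NetworkWordCountApp is ours for illustration, not part of the official examples):
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object NetworkWordCountApp {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
    val ssc = new StreamingContext(conf, Seconds(1))
    val counts = ssc.socketTextStream("localhost", 9999)
      .flatMap(_.split(" "))
      .map((_, 1))          // pair each word with a count of 1
      .reduceByKey(_ + _)   // sum counts per word within the batch
    counts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}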
2) Test run in spark-shell
# Start spark-shell (local[2]: at least two threads are needed, one for the receiver and one for processing)
bin/spark-shell --master local[2]
scala> :paste
// Entering paste mode (ctrl-D to finish)
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._

// reuse the shell's SparkContext; one batch every 5 seconds
val ssc = new StreamingContext(sc, Seconds(5))
// DStream of lines read from the nc socket
val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
// print the first few results of each batch
wordCounts.print()
ssc.start()
ssc.awaitTermination()
// Exiting paste mode, now interpreting.
# Enter data on the nc side
spark
hive hbase
hadoop hive
hbase hbase
spark hadoop
hive hbase
spark Hadoop
# Result counts
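If everything above is typed within one 5-second batch, the output would be the following (note that the count is case-sensitive, so Hadoop and hadoop are distinct words):
-------------------------------------------
Time: <batch time> ms
-------------------------------------------
(hbase,4)
(hive,3)
(spark,3)
(hadoop,2)
(Hadoop,1)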