Spark-Streaming实时数据分析

 


1.Spark Streaming功能介绍

1)定义

Spark Streaming is an extension of the core Spark API that enables scalable, high-throughput, fault-tolerant stream processing of live data streams

https://i-blog.csdnimg.cn/blog_migrate/d376f236c6fcd4d3d1df85883ea24370.png

2.NC服务安装并运行Spark Streaming

1)在线安装nc命令

  • rpm –ivh nc-1.84-22.el6.x86_64.rpm(优先选择)

#安装

上传nc-1.84-22.el6.x86_64.rpm包到software目录,再安装

[kfk@bigdata-pro02 softwares]$ sudo rpm -ivh nc-1.84-22.el6.x86_64.rpm
Preparing...                ########################################### [100%]
   1:nc                     ########################################### [100%]

[kfk@bigdata-pro02 softwares]$ which nc
/usr/bin/n

#启动

nc -lk 9999(类似于一个接收器)

启动之后在下边可以进行数据输入,然后就能够从spark端进行词频统计(如2)所示)

  • yum install -y nc

2)运行Spark Streaming 的WordCount

bin/run-example --master local[2] streaming.NetworkWordCount localhost 9999

#数据输入

#结果统计

注:把日志级别调整为WARN才能出现以上效果,否则会被日志覆盖,影响观察

3)把文件通过管道作为nc的输入,然后观察spark Streaming计算结果

cat test.txt | nc -lk 9999

文件具体内容

hadoop  storm   spark
hbase   spark   flume
spark   dajiangtai     spark
hdfs    mapreduce      spark
hive    hdfs    solr
spark   flink   storm
hbase   storm   es

3.Spark Streaming工作原理

1)Spark Streaming数据流处理

https://i-blog.csdnimg.cn/blog_migrate/cebbd22ee827a75f07ad661d230ace57.png

2)接收器工作原理

https://i-blog.csdnimg.cn/blog_migrate/480c91f5fd3c1892907e7d5df2a20b9b.png

https://i-blog.csdnimg.cn/blog_migrate/5d52db156bc8726622363b160cccaf40.png

https://i-blog.csdnimg.cn/blog_migrate/aec0fd1a530110466ded6887b9954c97.png

https://i-blog.csdnimg.cn/blog_migrate/b589ef627f8611bbe6eae083cdc4f9ea.png

3)综合工作原理

https://i-blog.csdnimg.cn/blog_migrate/1faa552b727fdf2f80611b6caa899595.png

https://i-blog.csdnimg.cn/blog_migrate/a3b1e85451efe0fb19d2d43d14904d96.png

4.Spark Streaming编程模型

1)StreamingContext初始化的两种方式

#第一种

val ssc = new StreamingContext(sc, Seconds(5))

#第二种

val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))

2)集群测试

#启动spark

bin/spark-shell --master local[2]

scala> :paste

// Entering paste mode (ctrl-D to finish)

import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._

val ssc = new StreamingContext(sc, Seconds(5))
val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
wordCounts.print()
ssc.start()
ssc.awaitTermination()

// Exiting paste mode, now interpreting.

 

#在nc服务器端输入数据

spark
hive hbase
hadoop hive
hbase hbase
spark hadoop
hive hbase
spark Hadoop

#结果统计</

评论 1
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值