3、Spark Streaming的状态操作
在Spark Streaming中存在两种状态操作
- UpdateStateByKey
- Windows操作
使用有状态的transformation,需要开启Checkpoint
- spark streaming 的容错机制
- 它将足够多的信息checkpoint到某些具备容错性的存储系统如hdfs上,以便出错时能够迅速恢复
3.1 updateStateByKey
Spark Streaming实现的是一个实时批处理操作,每隔一段时间将数据进行打包,封装成RDD,是无状态的。
无状态:指的是每个时间片段的数据之间是没有关联的。
需求:想要将一个大时间段(1天),即多个小时间段的数据内的数据持续进行累积操作
一般超过一天都是用RDD或Spark SQL来进行离线批处理
如果没有UpdateStateByKey,我们需要将每一秒的数据计算好放入mysql中取,再用mysql来进行统计计算
Spark Streaming中提供这种状态保护机制,即updateStateByKey
步骤:
- 首先,要定义一个state,可以是任意的数据类型
- 其次,要定义state更新函数–指定一个函数如何使用之前的state和新值来更新state
- 对于每个batch,Spark都会为每个之前已经存在的key去应用一次state更新函数,无论这个key在batch中是否有新的数据。如果state更新函数返回none,那么key对应的state就会被删除
- 对于每个新出现的key,也会执行state更新函数
举例:词统计。
案例:updateStateByKey
需求:监听网络端口的数据,获取到每个批次的出现的单词数量,并且需要把每个批次的信息保留下来
代码
import os
# 配置spark driver和pyspark运行时,所使用的python解释器路径
PYSPARK_PYTHON = "/miniconda2/envs/py365/bin/python"
JAVA_HOME='/root/bigdata/jdk'
SPARK_HOME = "/root/bigdata/spark"
# 当存在多个版本时,不指定很可能会导致出错
os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON
os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON
os.environ['JAVA_HOME']=JAVA_HOME
os.environ["SPARK_HOME"] = SPARK_HOME
from pyspark.streaming import StreamingContext
from pyspark.sql.session import SparkSession
# 创建