// updateStateByKey example: running (cumulative) word count over a socket text stream
package com.ws.sparkstreaming
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
 * Streaming word count that keeps a running total per word across all batches.
 *
 * Lines of text are read from a TCP socket, split into words, counted within each
 * 5-second batch, and then folded into a cumulative per-word total with `updateStateByKey`.
 */
object WordCountUpdateState {

  /** Socket host used when no host argument is supplied. */
  private val DefaultHost = "dream1"

  /** Socket port used when no port argument is supplied. */
  private val DefaultPort = 8888

  /** Checkpoint directory used when none is supplied. */
  private val DefaultCheckpointDir = "data/ck"

  /**
   * Merges the counts that arrived for one key in the current batch into that key's total.
   *
   * @param newValues counts for the key in the current batch; Spark calls the update function
   *                  for every key that already has state, passing an empty Seq when the key
   *                  did not appear in this batch
   * @param state     the key's accumulated total from previous batches, or `None` if the key
   *                  has not been seen before
   * @return the new accumulated total; always `Some`, so a key is never dropped from the state
   */
  private def updateCount(newValues: Seq[Int], state: Option[Int]): Option[Int] =
    Some(newValues.sum + state.getOrElse(0))

  /**
   * Starts the streaming job and blocks until it terminates.
   *
   * @param args optional positional arguments: `host` (default "dream1"), `port`
   *             (default 8888) and `checkpointDir` (default "data/ck"). With no arguments
   *             the job uses exactly those defaults.
   * @throws NumberFormatException if a port argument is given that is not an integer
   */
  def main(args: Array[String]): Unit = {
    val host = args.lift(0).getOrElse(DefaultHost)
    val port = args.lift(1).map(_.toInt).getOrElse(DefaultPort)
    val checkpointDir = args.lift(2).getOrElse(DefaultCheckpointDir)

    // The socket receiver permanently occupies one core, so a local master needs at least two
    // cores for batches to be processed; local[*] uses every available core.
    val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(5))
    ssc.sparkContext.setLogLevel("WARN")
    val stream: ReceiverInputDStream[String] = ssc.socketTextStream(host, port)

    // updateStateByKey requires a checkpoint directory: it persists the accumulated state
    // through checkpointing. Without one the job fails with
    // java.lang.IllegalArgumentException: requirement failed: The checkpoint directory has
    // not been set. Please set it by StreamingContext.checkpoint().
    ssc.checkpoint(checkpointDir)

    // Split on runs of whitespace and drop empty tokens. split(" ") yields "" for a blank
    // line or for leading/repeated spaces, which would otherwise be counted as a word.
    val batchCounts: DStream[(String, Int)] = stream
      .flatMap(line => line.split("\\s+").filter(_.nonEmpty))
      .map((_, 1))
      .reduceByKey(_ + _)

    // Fold each batch's counts into the running total kept per word.
    val runningCounts: DStream[(String, Int)] = batchCounts.updateStateByKey[Int](updateCount _)
    runningCounts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}