1. 修改状态的类型参数:这个类型就是你要监控是否下滑的那个值的类型(需要注意继承的是 KeyedProcessFunction)
2.获取上一次的数据,并进行判断
3.更新一下分数数据
代码
/*
 * @author jiasongfan
 * @date 2022/5/31
 */
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.functions.{KeyedProcessFunction, ProcessFunction}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector

import scala.util.Try

/**
 * Streaming job that reads comma-separated score records from a socket and
 * prints a message whenever a student's score is lower than that student's
 * previous score.
 *
 * Expected record layout: `id,subject,score,timestamp`
 * (for example `001,math,1,1646038440`).
 */
object Test03 {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val text = env.socketTextStream("hdp1", 9999)

    // Parse defensively: a single blank or malformed line coming from the
    // socket must not throw inside the operator and bring the whole job down.
    val mapDS: DataStream[StuScore] = text.flatMap { line =>
      val parsed = parseLine(line)
      if (parsed.isEmpty) {
        System.err.println(s"Skipping malformed record: $line")
      }
      parsed.toList
    }

    // Key by student id so that the "previous score" state is kept per student.
    val keyS: KeyedStream[StuScore, String] = mapDS.keyBy(_.id)
    keyS.process(new MyPro1).print()

    env.execute("Window Stream WordCount")
  }

  /**
   * Parses one `id,subject,score,timestamp` line.
   *
   * Fields beyond the fourth are ignored. Score and timestamp are trimmed
   * before being converted to numbers.
   *
   * @param line raw text line read from the socket
   * @return the parsed record, or `None` when the line has fewer than four
   *         fields or a numeric field cannot be parsed
   */
  private def parseLine(line: String): Option[StuScore] =
    line.split(",") match {
      case Array(id, obj, score, ts, _*) =>
        Try(StuScore(id, obj, score.trim.toInt, ts.trim.toLong)).toOption
      case _ =>
        None
    }
}

/**
 * One score record.
 *
 * @param id    student id, used as the stream key
 * @param obj   subject name
 * @param score score value
 * @param ts    timestamp taken from the fourth input field
 */
case class StuScore(id: String, obj: String, score: Int, ts: Long)

/**
 * Emits a message whenever the score of the current record is lower than the
 * previously seen score for the same key.
 *
 * Type parameters of [[KeyedProcessFunction]] are `<K, I, O>`:
 * key = `String`, input = [[StuScore]], output = `String`.
 */
class MyPro1 extends KeyedProcessFunction[String, StuScore, String] {

  // Last score seen for the current key. The type parameter is the type of the
  // value being monitored for a drop. It is stored boxed (java.lang.Integer)
  // on purpose: ValueState.value() returns null when nothing has been stored
  // yet, and with a Scala Int that null would be silently unboxed to 0, making
  // "no previous score" indistinguishable from "previous score was 0".
  lazy val state: ValueState[java.lang.Integer] = getRuntimeContext
    .getState(new ValueStateDescriptor[java.lang.Integer]("myState", classOf[java.lang.Integer]))

  /**
   * Compares the incoming score with the stored one, emits a message when the
   * score went down, then stores the incoming score as the new reference.
   *
   * @param i         the incoming score record
   * @param context   Flink processing context (unused)
   * @param collector receives the "score dropped" message
   */
  override def processElement(
      i: StuScore,
      context: KeyedProcessFunction[String, StuScore, String]#Context,
      collector: Collector[String]): Unit = {
    // Fetch the previous score; None for the first record of this key.
    val previous: Option[Int] = Option(state.value()).map(_.intValue)

    // Only report a drop when there really is a previous score to compare with.
    previous.filter(i.score < _).foreach { last =>
      collector.collect(s"${i.id}分数下滑, 上一次${last}本次 ${i.score}")
    }

    // Remember the current score for the next comparison.
    state.update(Int.box(i.score))
  }
}
数据
001,math,1,1646038440
001,math,2,1646038441
001,math,1,1646038442