flink使用processfunction求分数下滑

本文介绍如何使用Apache Flink实现一个KeyedProcessFunction,监测学生分数变化,当分数下滑时记录警告。通过实例展示了如何修改参数类型、获取历史数据和更新状态来实现这一功能。

摘要生成于 C知道 ,由 DeepSeek-R1 满血版支持, 前往体验 >

 

1. 修改参数类型 找个类型是你要看的下滑值的类型(需要注意继承的是keyprofunction)

2.获取上一次的数据,并进行判断

3.更新一下分数数据

代码

/**
 * @author jiasongfan
 * @date 2022/5/31
 * @apiNote
 */
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.functions.{KeyedProcessFunction, ProcessFunction}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector
object Test03 {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val text = env.socketTextStream("hdp1", 9999)
    val mapDS: DataStream[StuScore] = text.map(line => {
      val li: Array[String] = line.split(",")
      StuScore(li(0), li(1), li(2).trim.toInt,li(3).trim.toLong)
    })

    val keyS: KeyedStream[StuScore, String] = mapDS.keyBy(_.id)
    keyS.process(new MyPro1).print()


    env.execute("Window Stream WordCount")
  }
}
case class StuScore(id:String,obj:String,score:Int,ts:Long)
//<I, O>
class MyPro1 extends KeyedProcessFunction[String,StuScore,String]{
  //修改参数类型
  lazy val state: ValueState[Int] = getRuntimeContext
    .getState(new ValueStateDescriptor[Int]("myState", classOf[Int]))

  override def processElement(i: StuScore, context: KeyedProcessFunction[String, StuScore, String]#Context, collector: Collector[String]): Unit = {
    //获取上一次的数据
    val v: Int = state.value()
    if(i.score<v){
      collector.collect(i.id+"分数下滑, 上一次"+v+"本次 "+i.score )
    }
    //更新数据
    state.update(i.score)
  }
}

数据

001,math,1,1646038440
001,math,2,1646038441
001,math,1,1646038442

评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值