Flink CEP 规则

1. 简单案例:检测连续登录失败

 

import org.apache.flink.cep.PatternSelectFunction
import org.apache.flink.cep.scala.CEP
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

import java.util


/**
 * Flink CEP example: reports two back-to-back failed logins by the same user.
 *
 * Reads comma-separated lines of the form `uid,pid,cz,ts` from the socket
 * `hdp1:9999`, turns each line into a [[MyLogin]], and prints a message for
 * every pair of directly consecutive events whose `cz` equals `"fail"` and
 * that occur within 5 seconds of each other.
 *
 * The stream is keyed by `uid` before the pattern is applied so that a failure
 * of one user followed by a failure of another user is not reported as a
 * consecutive failure.
 */
object Test05 {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    // env.enableCheckpointing(1000)
    val text = env.socketTextStream("hdp1", 9999)

    // Each input line is expected to carry four comma-separated fields.
    // The last field is multiplied by 1000 — presumably seconds -> milliseconds; confirm with the data producer.
    val mapDS: DataStream[MyLogin] = text.map { line =>
      val li: Array[String] = line.split(",")
      MyLogin(li(0), li(1), li(2), li(3).trim.toLong * 1000)
    }

    // Event time is taken from the record itself; timestamps must arrive in ascending order.
    val timeDS: DataStream[MyLogin] = mapDS.assignAscendingTimestamps(_.ts)

    // Evaluate the pattern per user, not across the whole stream.
    val keyDS: KeyedStream[MyLogin, String] = timeDS.keyBy(_.uid)

    // "fail" immediately followed (strict contiguity) by another "fail", both within 5 seconds.
    val pattern = Pattern.begin[MyLogin]("start")
      .where(_.cz.equals("fail"))
      .next("middle").where(_.cz.equals("fail"))
      .within(Time.seconds(5))

    val patternStream = CEP.pattern(keyDS, pattern)
    val ps: DataStream[String] = patternStream.select(new MyLoginSelect)
    ps.print()
    env.execute()
  }
}
/**
 * One login event as parsed from an input line in `Test05`.
 *
 * @param uid first field of the input line; presumably the user id (confirm with the data producer)
 * @param pid second field of the input line; meaning not visible in this file
 * @param cz  third field of the input line; `Test05` matches on the value `"fail"`
 * @param ts  event timestamp used by `Test05` for event-time assignment
 */
case class MyLogin(
  uid: String,
  pid: String,
  cz: String,
  ts: Long
)
/**
 * Renders a completed login-failure match as a single line of text.
 *
 * The result is a fixed prefix followed by the string form of the match map
 * handed over by Flink CEP.
 */
class MyLoginSelect extends PatternSelectFunction[MyLogin, String] {

  /**
   * @param map the matched events as supplied by the CEP operator
   * @return the prefix text followed by `map`'s string representation
   */
  override def select(map: util.Map[String, util.List[MyLogin]]): String =
    s"连续登录失败$map"
}

2. 案例:创建后支付的匹配与超时检测

 

import org.apache.flink.cep.{PatternSelectFunction, PatternTimeoutFunction}
import org.apache.flink.cep.scala.CEP
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

import java.util

/**
 * Flink CEP example: matches a `"create"` event followed by a `"pay"` event of
 * the same user within 10 seconds, and reports partial matches that timed out.
 *
 * Reads comma-separated lines of the form `uid,stats,ts` from the socket
 * `hdp1:9999`. Completed matches are printed from the main output; timed-out
 * partial matches are printed from the side output tagged `late-data`.
 *
 * The pattern is evaluated by a single `select` call that yields both outputs,
 * so only one CEP operator is added to the job.
 *
 * @author jiasongfan
 * @date 2022/6/29
 */
object Test06 {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    // env.enableCheckpointing(1000)
    val text = env.socketTextStream("hdp1", 9999)

    // Each input line is expected to carry three comma-separated fields.
    // The last field is multiplied by 1000 — presumably seconds -> milliseconds; confirm with the data producer.
    val mapDS: DataStream[MyPay] = text.map { line =>
      val li: Array[String] = line.split(",")
      MyPay(li(0), li(1), li(2).trim.toLong * 1000)
    }

    // Event time is taken from the record itself; timestamps must arrive in ascending order.
    val timeDS: DataStream[MyPay] = mapDS.assignAscendingTimestamps(_.ts)
    val keyDS: KeyedStream[MyPay, String] = timeDS.keyBy(_.uid)

    // "create" followed (relaxed contiguity) by "pay", both within 10 seconds.
    val pattern = Pattern.begin[MyPay]("start").where(_.stats.equals("create"))
      // .next("middle").subtype(classOf[SubEvent]).where(_.getVolume >= 10.0)
      .followedBy("end").where(_.stats.equals("pay"))
      .within(Time.seconds(10))

    val patternStream = CEP.pattern(keyDS, pattern)

    // Side-output tag for partial matches that timed out.
    val tag1 = new OutputTag[String]("late-data")

    // One select produces both the completed matches (main output) and the
    // timed-out partial matches (side output).
    val ps: DataStream[String] = patternStream.select(tag1, new MyTimepay, new MyPaySelect)
    ps.print()
    ps.getSideOutput(tag1).print()

    env.execute()
  }
}
/**
 * One event as parsed from an input line in `Test06`.
 *
 * @param uid   first field of the input line; `Test06` keys the stream by it
 * @param stats second field of the input line; `Test06` matches on `"create"` and `"pay"`
 * @param ts    event timestamp used by `Test06` for event-time assignment
 */
case class MyPay(
  uid: String,
  stats: String,
  ts: Long
)
/**
 * Renders a completed match as a single line of text.
 *
 * The result is a fixed prefix followed by the string form of the match map
 * handed over by Flink CEP.
 */
class MyPaySelect extends PatternSelectFunction[MyPay, String] {

  /**
   * @param map the matched events as supplied by the CEP operator
   * @return the prefix text followed by `map`'s string representation
   */
  override def select(map: util.Map[String, util.List[MyPay]]): String =
    s"正常数据$map"
}
/**
 * Renders a timed-out partial match as a single line of text.
 *
 * The result is the numeric argument, a fixed marker text, and the string form
 * of the partial-match map, concatenated in that order.
 */
class MyTimepay extends PatternTimeoutFunction[MyPay, String] {

  /**
   * @param map the events matched so far, as supplied by the CEP operator
   * @param l   the value passed by the CEP operator alongside the partial match
   *            (presumably the timeout timestamp — confirm against the Flink API docs)
   * @return `l`, the marker text and `map`'s string representation, concatenated
   */
  override def timeout(map: util.Map[String, util.List[MyPay]], l: Long): String =
    s"${l}超时数据$map"
}

评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值