Flink SQL: writing a stream to Kafka and ClickHouse

  val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val lineDS: DataStream[String] = env.readTextFile("data/order_detail.txt")
    //lineDS.print()
    val mapDS: DataStream[Order] = lineDS.map(t => {
      val str: String = t.replace("'", "")
      val li: Array[String] = str.split(",")

      //columns: id, order id, sku_id, sku name, image name, unit price, quantity, create time, source type, source id
      Order(li(0), li(1), li(2).trim.toInt, li(3), li(4), li(5).trim.toDouble, li(6).toInt, li(7), li(8), li(9))
    })
    //mapDS.print()

    //2) Create a Flink Table streaming environment and register the data above as the order-detail table. (5 points)
    val tEnv = StreamTableEnvironment.create(env)
    tEnv.createTemporaryView("t_order", mapDS)
    tEnv.executeSql(
      """
        |select * from t_order
        |""".stripMargin)
     // .print()
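The map above builds an Order for every line. The case class itself is not shown in the post; a minimal sketch that matches the constructor call and the column comment (only price, num and ts are fixed by the query further below, the remaining field names are my assumptions):

    // Sketch only: field names other than price, num and ts are assumptions
    case class Order(
        id: String,          // record id
        orderId: String,     // order number
        skuId: Int,
        skuName: String,
        img: String,         // image name
        price: Double,       // unit price
        num: Int,            // quantity bought
        ts: String,          // create time, e.g. "2020-04-01 10:00:00"
        sourceType: String,
        sourceId: String)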

Writing to Kafka (an aggregated result written to Kafka), using sqlQuery.

Loop over the collected result with hasNext;

emit each record with next().

 //8) Use Flink Table to write the result of step (6) above to Kafka; the result must be correct. (5 points)
    val table: Table = tEnv.sqlQuery(
      """
        |select sum(num*price) cnt,ts from t_order
        |where ts like '2020-04-01 %'
        |group by ts
        |""".stripMargin)

    //collect the query result as an iterator of Rows
    val value = table.execute().collect()

    val props = new Properties()
    props.put("bootstrap.servers", "hdp1:9092,hdp2:9092,hdp3:9092")
    props.put("acks", "all")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)
    //iterate with hasNext / next and send each row to the "week3" topic
    while (value.hasNext) {
      producer.send(new ProducerRecord[String, String]("week3", null, value.next().toString))
    }
    producer.close()
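To check what landed in the topic, the standard console consumer can be used (assuming the Kafka scripts are on the path of one of the brokers):

kafka-console-consumer.sh --bootstrap-server hdp1:9092 --topic week3 --from-beginning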

Note in particular: for window queries over a Kafka source, declare the event-time column as a computed column, e.g. timess AS TO_TIMESTAMP(ts).

If you do not convert it (with TO_TIMESTAMP or FROM_UNIXTIME), ts stays a plain STRING and cannot be used as a time attribute.

When declaring a filesystem table, field names sometimes need to be wrapped in backticks (for example when they clash with reserved keywords).
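For example, the f_game table used in the windowed query further below could be declared like this. Only the timess computed column and the title/price/ts fields come from the code and notes here; the topic name, group id, format, and watermark delay are assumptions:

    //hedged sketch: a Kafka source with an event-time attribute for window queries
    tEnv.executeSql(
      """
        |CREATE TABLE f_game (
        |  title  STRING,
        |  price  DOUBLE,
        |  ts     STRING,                                        -- raw time as a string
        |  timess AS TO_TIMESTAMP(ts),                           -- becomes TIMESTAMP(3), usable as a time attribute
        |  WATERMARK FOR timess AS timess - INTERVAL '5' SECOND  -- assumption: 5s out-of-orderness
        |) WITH (
        |  'connector' = 'kafka',
        |  'topic' = 'f_game',                                   -- placeholder topic name
        |  'properties.bootstrap.servers' = 'hdp1:9092,hdp2:9092,hdp3:9092',
        |  'properties.group.id' = 'flink-sql',                  -- placeholder group id
        |  'scan.startup.mode' = 'earliest-offset',
        |  'format' = 'csv'                                      -- assumption
        |)
        |""".stripMargin)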

Writing to ClickHouse

ClickHouse needs to be running first.

Start the server:

systemctl start clickhouse-server

Log in with the client:

clickhouse-client -m

Web UI:

hdp1:8123/play

In ClickHouse, a 64-bit integer column (Flink BIGINT / Scala Long) maps to Int64, or UInt64 for unsigned values.
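For example, the t_count2 table that the sink in the program below inserts into could be created in clickhouse-client roughly like this (engine and ordering key are assumptions; only the three-column shape is fixed by the sink):

-- hedged sketch, run in clickhouse-client
CREATE TABLE t_count2 (
  word String,
  yy   Int32,
  ts   Int64          -- 64-bit integer; UInt64 if the value is never negative
) ENGINE = MergeTree()
ORDER BY word;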

/**
 * @author jiasongfan
 * @date 2022/7/6
 * @apiNote
 */
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.table.api.{EnvironmentSettings, Table, TableEnvironment, TableResult}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.types.Row

import java.sql.{Connection, DriverManager, PreparedStatement}
object Test02 {
  def main(args: Array[String]): Unit = {
    //Table sink to ClickHouse


    val settings = EnvironmentSettings
      .newInstance()
      .inStreamingMode()
      //.inBatchMode()
      .build()

  //must be a StreamExecutionEnvironment here
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    //create() can also take the EnvironmentSettings as a second argument
    val tEnv = StreamTableEnvironment.create(env,settings)
   // val tEnv = TableEnvironment.create(settings)
    //read the file as a table
    tEnv.executeSql(
      """
        |CREATE TABLE t_count (
        |  word String,
        |  yy int,
        |  ts bigint
        | ) WITH (
        |  'connector' = 'filesystem',           -- required: specify the connector
        |  'path' = 'file:///D:\E\month9class\day7-5\data\a.txt',  -- required: path to a file or directory
        |  'format' = 'csv'                       -- required: the filesystem connector needs a format
        |)
        |""".stripMargin)

    val table: Table = tEnv.sqlQuery(
      """
        |select * from t_count
        |""".stripMargin)

    //convert the Table to a DataStream
    val tableDS: DataStream[Row] = tEnv.toDataStream(table)
    val mapDS: DataStream[(String, Int, Long)] = tableDS.map(t => {
      (t.getField(0).toString, t.getField(1).toString.toInt, t.getField(2).toString.toLong)
    })
    mapDS.print()

    //write to ClickHouse
    mapDS.addSink(new MySinkClickhouse1)

      //.print()
    env.execute()
  }
}
class MySinkClickhouse1 extends RichSinkFunction[(String,Int,Long)]{
  var conn: Connection =null
  var ps: PreparedStatement =null
  override def open(parameters: Configuration): Unit = {
    //load the ClickHouse JDBC driver and prepare the insert once per subtask
    Class.forName("ru.yandex.clickhouse.ClickHouseDriver")
    conn = DriverManager.getConnection("jdbc:clickhouse://hdp1:8123", "default", "")
    ps = conn.prepareStatement("insert into t_count2 values (?,?,?)")
  }

  override def close(): Unit = {
    //close the statement before the connection
    ps.close()
    conn.close()
  }

  override def invoke(value: (String, Int,Long), context: SinkFunction.Context): Unit = {
    ps.setString(1,value._1)
    ps.setInt(2,value._2)
    ps.setLong(3,value._3)
    ps.executeUpdate()
  }
}

Create the target table in ClickHouse first (a sketch follows).
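A hedged sketch for the c_game table that ClickhouseSinkSQL below writes to. The insert statement does not name columns, so the column names, engine, and ordering key are assumptions; only the five-column shape and the String/Double/Long bindings are fixed by the sink:

-- hedged sketch, run in clickhouse-client
CREATE TABLE c_game (
  window_start String,
  window_end   String,
  title        String,
  prices       Float64,
  rownum       UInt64
) ENGINE = MergeTree()
ORDER BY (window_start, title);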

Writing windowed results to ClickHouse

 val table2: Table = tEnv.sqlQuery(
      """
        |
        |SELECT * FROM (
        |SELECT *,ROW_NUMBER() OVER (PARTITION BY window_start,window_end ORDER BY prices DESC) rownum
        |FROM (SELECT window_start, window_end,title, sum(price) prices FROM
        |TABLE(
        |   TUMBLE(TABLE f_game, DESCRIPTOR(timess), INTERVAL '5' MINUTES))
        |   GROUP BY window_start, window_end,title
        |)
        |) WHERE rownum<=5
        |""".stripMargin)

    //convert the Table into a DataStream of Rows
    val rowDS: DataStream[Row] = tEnv.toDataStream(table2)

    rowDS.addSink(new ClickhouseSinkSQL)




import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

class ClickhouseSinkSQL extends RichSinkFunction[Row]{
  var conn:Connection = null
  var ps: PreparedStatement = null
  override def open(parameters: Configuration): Unit = {
    Class.forName("ru.yandex.clickhouse.ClickHouseDriver")
    conn = DriverManager.getConnection("jdbc:clickhouse://hdp1:8123","default","")
    ps = conn.prepareStatement("insert into c_game values (?,?,?,?,?)")
  }
  //called once for every element of the stream; writes one row to ClickHouse
  override def invoke(row: Row, context: SinkFunction.Context): Unit = {
    val s: LocalDateTime = row.getFieldAs[LocalDateTime]("window_start")
    val e: LocalDateTime = row.getFieldAs[LocalDateTime]("window_end")
    val start  = s.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))
    val end  = e.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))
    val id: String = row.getFieldAs[String]("title")
    val price: Double = row.getFieldAs[Double]("prices")   //the query aliases sum(price) as prices
    val rn: Long = row.getFieldAs[Long]("rownum")
    println(start+","+end+","+id+","+price+","+rn)

    ps.setString(1,start)
    ps.setString(2,end)
    ps.setString(3,id)
    ps.setDouble(4,price)
    ps.setLong(5,rn)
    //the driver and connection were set up in open(); just execute the prepared insert
    ps.executeUpdate()
  }

  override def close(): Unit = {
    ps.close()
    conn.close()
  }
}
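Both sinks load ru.yandex.clickhouse.ClickHouseDriver, so the legacy Yandex ClickHouse JDBC driver has to be on the classpath; in sbt that is roughly (version is an assumption, pick a current one):

// assumed coordinates/version of the legacy ClickHouse JDBC driver
libraryDependencies += "ru.yandex.clickhouse" % "clickhouse-jdbc" % "0.3.2"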

A complete Java example: reading from Kafka, transforming the stream, then writing it to both Kafka and ClickHouse.
```
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;
import ru.yandex.clickhouse.ClickHouseConnection;
import ru.yandex.clickhouse.ClickHouseDataSource;

import java.sql.PreparedStatement;
import java.util.Properties;

public class FlinkClickhouseSink {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Kafka consumer properties
        Properties consumerProperties = new Properties();
        consumerProperties.setProperty("bootstrap.servers", "localhost:9092");
        consumerProperties.setProperty("group.id", "test-group");

        // Kafka producer properties
        Properties producerProperties = new Properties();
        producerProperties.setProperty("bootstrap.servers", "localhost:9092");

        // ClickHouse properties
        Properties clickHouseProperties = new Properties();
        clickHouseProperties.setProperty("user", "default");
        clickHouseProperties.setProperty("password", "");
        clickHouseProperties.setProperty("database", "test");
        ClickHouseDataSource clickHouseDataSource =
                new ClickHouseDataSource("jdbc:clickhouse://localhost:8123/test", clickHouseProperties);

        // Kafka consumer
        DataStream<String> stream = env
                .addSource(new FlinkKafkaConsumer<>("test-topic", new SimpleStringSchema(), consumerProperties));

        // Flink transformation, e.g. filter or map
        DataStream<Tuple2<Integer, String>> mappedStream = stream
                .map(new MapFunction<String, Tuple2<Integer, String>>() {
                    @Override
                    public Tuple2<Integer, String> map(String value) throws Exception {
                        Integer key = Integer.parseInt(value.split(",")[0]);
                        String message = value.split(",")[1];
                        return new Tuple2<>(key, message);
                    }
                });

        // Kafka producer serialization schema
        KafkaSerializationSchema<Tuple2<Integer, String>> kafkaSerializationSchema =
                new KafkaSerializationSchema<Tuple2<Integer, String>>() {
                    @Override
                    public void open(SerializationSchema.InitializationContext context) throws Exception {
                    }

                    @Override
                    public ProducerRecord<byte[], byte[]> serialize(Tuple2<Integer, String> element, Long timestamp) {
                        return new ProducerRecord<>("test-topic", (element.f0 + "," + element.f1).getBytes());
                    }
                };

        // Kafka producer
        FlinkKafkaProducer<Tuple2<Integer, String>> kafkaProducer =
                new FlinkKafkaProducer<>("test-topic", kafkaSerializationSchema, producerProperties,
                        FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);

        // ClickHouse sink function: open a connection per record and run the insert
        SinkFunction<Tuple2<Integer, String>> clickHouseSinkFunction =
                new SinkFunction<Tuple2<Integer, String>>() {
                    @Override
                    public void invoke(Tuple2<Integer, String> value, Context context) throws Exception {
                        String sql = "INSERT INTO test (id, message) values (?, ?)";
                        try (ClickHouseConnection connection = clickHouseDataSource.getConnection();
                             PreparedStatement stmt = connection.prepareStatement(sql)) {
                            stmt.setInt(1, value.f0);
                            stmt.setString(2, value.f1);
                            stmt.execute();
                        }
                    }
                };

        // ClickHouse sink
        mappedStream.addSink(clickHouseSinkFunction);
        // Kafka producer sink
        mappedStream.addSink(kafkaProducer);

        env.execute();
    }
}
```
This is a simple Flink streaming job: it reads data from Kafka and, after a Flink transformation, writes it to both Kafka and ClickHouse. It is only sample code; adapt it to your own environment before using it.