Spark Streaming word count with exactly-once semantics, approach 1: MySQL

This post shows how to use Spark Streaming to read data from Kafka, process it, write the results to MySQL, and manage the consumed offsets. The main steps are creating a direct stream, iterating over the DStream, reading and updating offsets, and processing the data and committing the offsets inside a single MySQL transaction.

Overall approach

1. Read the previously saved offsets from MySQL (the spark_stream_offset table they live in is sketched after this list).

2. Load data starting from those offsets with KafkaUtils.createDirectStream to get a DStream.

3. Iterate over the DStream with foreachRDD to get each KafkaRDD.

4. Read the current batch's offsets from the KafkaRDD: kafkaRdd.asInstanceOf[HasOffsetRanges].offsetRanges.

5. Open a transaction.

6. Read the incoming Kafka data from the records, i.e. kafkaRdd.map(_.value()).

7. Transform the RDD and write the results to MySQL.

8. Update the stored offsets to the new end offsets.

9. Commit the transaction.
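
The offsets live in a MySQL table named spark_stream_offset, which the post itself never shows. Below is a one-time setup sketch; the column types and the composite primary key are my assumptions, inferred from the queries in OffsetUtils further down. The primary key over (app, groupid, topic, `partition`) is what makes the ON DUPLICATE KEY UPDATE upsert overwrite an existing row instead of inserting a duplicate.

import java.sql.DriverManager

// One-off table creation against the same database MysqlPool points at (schema assumed, not from the post)
object CreateOffsetTable {
  def main(args: Array[String]): Unit = {
    val conn = DriverManager.getConnection(
      "jdbc:mysql://dream3:3306/bigdata?characterEncoding=UTF-8", "root", "root")
    val ddl =
      """create table if not exists spark_stream_offset (
        |  app         varchar(128) not null,
        |  groupid     varchar(128) not null,
        |  topic       varchar(128) not null,
        |  `partition` int          not null,
        |  `offset`    bigint       not null,
        |  primary key (app, groupid, topic, `partition`)
        |)""".stripMargin
    val stmt = conn.createStatement()
    stmt.execute(ddl)
    stmt.close()
    conn.close()
  }
}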
 

Main method

package com.ws.sparkstreaming.kafkamysql

import java.sql.Connection

import com.ws.sparkstreaming.utils.{MysqlPool, OffsetUtils}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WordCountJoinKafkaMysqlManagerOffset {
  def main(args: Array[String]): Unit = {
    val name = this.getClass.getSimpleName
    val conf = new SparkConf().setAppName(name).setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(5))
    ssc.sparkContext.setLogLevel("WARN")
    val group = "group2"
    val kafkaParams: Map[String, Object] = Map[String, Object](
      "bootstrap.servers" -> "dream1:9092,dream2:9092,dream3:9092", // kafka地址
      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer", // 设置反序列化组件
      "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "group.id" -> group, // 消费者组
      "auto.offset.reset" -> "earliest", // 指定消费者从哪开始消费[latest,earliest]
      "enable.auto.commit" -> "false" // 是否自动提交偏移量,默认是true
    )
    val topic: Iterable[String] = Array("wordcount").toIterable
    // Read the stored offsets, keyed by appName and groupId. If offsets exist, consumption resumes from them;
    // otherwise the map is empty and Kafka starts according to auto.offset.reset.
    val offset: Map[TopicPartition, Long] = OffsetUtils.selectOffsetFromMysql(name, group)
    val dstream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topic, kafkaParams, offset)
    )
    dstream.foreachRDD((kafkaRdd: RDD[ConsumerRecord[String, String]]) => {
      // Get this batch's offsets. It is an array because the KafkaRDD holds one OffsetRange per topic partition.
      val ranges: Array[OffsetRange] = kafkaRdd.asInstanceOf[HasOffsetRanges].offsetRanges
      // The batch fires every 5 seconds whether or not data arrived, so skip empty RDDs.
      if (!kafkaRdd.isEmpty()) {
        // Compute the word count and write it to MySQL
        val line: RDD[String] = kafkaRdd.map(_.value().trim)
        val words: RDD[String] = line.flatMap(_.split(" "))
        val wordAnd1: RDD[(String, Int)] = words.map((_, 1))
        val wordAndNums: RDD[(String, Int)] = wordAnd1.reduceByKey(_ + _)
        // For exactly-once with MySQL, the results must be collected to the Driver and written there, never on the
        // executors: only on the Driver can the results and the offsets be committed in a single transaction.
        val res: Array[(String, Int)] = wordAndNums.collect()
        var conn: Connection = null
        try {
          // Get a connection from the pool
          conn = MysqlPool.getConnection
          // Open a transaction
          conn.setAutoCommit(false)
          // Write the collected results to the database
          WordCountMethods.wordcountToMysql(res, conn)
          // Update the stored offsets
          OffsetUtils.updateOffsetToMysql(ranges, conn, name, group)
          // Commit the transaction
          conn.commit()
        } catch {
          case exception: Exception =>
            exception.printStackTrace()
            // Roll back on failure (guard in case the connection was never obtained)
            if (conn != null) {
              conn.rollback()
            }
            // Stop the application
            ssc.stop()
        } finally {
          // Return the connection to the pool
          if (conn != null) {
            conn.close()
          }
        }
      }
    })

    ssc.start()
    ssc.awaitTermination()
  }

}
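
The main method calls WordCountMethods.wordcountToMysql(res, conn) but the post never lists that helper. Here is a minimal sketch of it; the wordcount table name and its word/counts columns (with word as the primary key) are assumptions of mine, not from the post. Because the upsert runs in the same transaction as the offset update, a failed batch rolls back both writes and the accumulated counts stay consistent with the stored offsets.

package com.ws.sparkstreaming.kafkamysql

import java.sql.Connection

object WordCountMethods {
  // Upsert each (word, count) pair; counts from later batches are added onto the stored total.
  def wordcountToMysql(res: Array[(String, Int)], conn: Connection): Unit = {
    val sql = "insert into wordcount(word, counts) values(?, ?) ON DUPLICATE KEY UPDATE counts = counts + ?"
    val ppsm = conn.prepareStatement(sql)
    res.foreach { case (word, count) =>
      ppsm.setString(1, word)
      ppsm.setInt(2, count)
      ppsm.setInt(3, count)
      ppsm.addBatch()
    }
    ppsm.executeBatch()
    ppsm.close()
  }
}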

MySQL connection pool

package com.ws.sparkstreaming.utils

import java.sql.Connection
import java.util.Properties

import com.alibaba.druid.pool.DruidDataSourceFactory
import javax.sql.DataSource

object MysqlPool {
  private val prop = new Properties()
  prop.put("driverClassName","com.mysql.jdbc.Driver")
  prop.put("url","jdbc:mysql://dream3:3306/bigdata?characterEncoding=UTF-8")
  prop.put("username","root")
  prop.put("password","root")
  private val source: DataSource = DruidDataSourceFactory.createDataSource(prop)

  def getConnection: Connection ={
    source.getConnection
  }
}
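
To check the pool is wired up before launching the streaming job, a quick standalone test like the following can be used (a sketch; the object name and query are mine):

package com.ws.sparkstreaming.utils

object MysqlPoolTest {
  def main(args: Array[String]): Unit = {
    val conn = MysqlPool.getConnection
    try {
      val rs = conn.prepareStatement("select 1").executeQuery()
      while (rs.next()) {
        println(rs.getInt(1)) // prints 1 if the pool and the database are reachable
      }
    } finally {
      conn.close() // returns the connection to the Druid pool rather than physically closing it
    }
  }
}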

OffsetUtils: offset management utility

package com.ws.sparkstreaming.utils

import java.sql.Connection

import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.OffsetRange

import scala.collection.mutable

object OffsetUtils {

  // Upsert this batch's end offsets, one row per (app, groupId, topic, partition).
  def updateOffsetToMysql(ranges: Array[OffsetRange], conn: Connection, appname: String, groupId: String): Unit = {
    val offsetsql = "insert into spark_stream_offset(app,groupid,topic,`partition`,`offset`) values(?,?,?,?,?) ON DUPLICATE KEY UPDATE `offset` = ?"
    val ppsm = conn.prepareStatement(offsetsql)
    ranges.foreach(range => {
      ppsm.setString(1, appname)
      ppsm.setString(2, groupId)
      ppsm.setString(3, range.topic)
      ppsm.setInt(4, range.partition)
      ppsm.setLong(5, range.untilOffset)
      ppsm.setLong(6, range.untilOffset)
      ppsm.addBatch()
    })
    ppsm.executeBatch()
    ppsm.close()
  }

  // Load the stored offsets for this app and group; an empty map means nothing has been saved yet,
  // so the consumer falls back to auto.offset.reset.
  def selectOffsetFromMysql(appname: String, groupId: String): Map[TopicPartition, Long] = {
    val conn = MysqlPool.getConnection
    val offsets = new mutable.HashMap[TopicPartition, Long]()
    val offsetsql = "select `topic`,`partition`,`offset` from spark_stream_offset where app=? and groupid=?"
    val ppsm = conn.prepareStatement(offsetsql)
    ppsm.setString(1, appname)
    ppsm.setString(2, groupId)
    val set = ppsm.executeQuery()
    while (set.next()) {
      val topic: String = set.getString("topic")
      val partition: Int = set.getInt("partition")
      val offset: Long = set.getLong("offset")
      val part = new TopicPartition(topic, partition)
      offsets.put(part, offset)
    }
    // Close the statement before returning the connection to the pool
    ppsm.close()
    conn.close()
    offsets.toMap
  }

}
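
To inspect what has actually been saved without starting the job, something like the following works (a sketch; PrintStoredOffsets is not part of the original post). The streaming job uses this.getClass.getSimpleName as its app name, so the same expression is reused here to match it.

package com.ws.sparkstreaming.kafkamysql

import com.ws.sparkstreaming.utils.OffsetUtils

object PrintStoredOffsets {
  def main(args: Array[String]): Unit = {
    // Must match the appName/group pair the streaming job saves offsets under
    val appName = WordCountJoinKafkaMysqlManagerOffset.getClass.getSimpleName
    val offsets = OffsetUtils.selectOffsetFromMysql(appName, "group2")
    if (offsets.isEmpty) {
      println("no stored offsets yet; the stream will start according to auto.offset.reset")
    } else {
      offsets.foreach { case (tp, off) => println(s"${tp.topic()}-${tp.partition()} -> $off") }
    }
  }
}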

 
