Main idea
1. Read the previously saved offsets.
2. Load data starting from those offsets with KafkaUtils.createDirectStream to get a DStream.
3. Iterate over the DStream with foreachRDD to get each KafkaRDD.
4. Get the current offsets from the KafkaRDD via kafkaRdd.asInstanceOf[HasOffsetRanges].offsetRanges.
5. Open a transaction.
6. Read the record values from the KafkaRDD (record.value()).
7. Transform and process the RDD, then write the results to MySQL.
8. Update the stored offsets to the new values.
9. Commit the transaction.
Main method
package com.ws.sparkstreaming.kafkamysql

import java.sql.Connection

import com.ws.sparkstreaming.utils.{MysqlPool, OffsetUtils}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WordCountJoinKafkaMysqlManagerOffset {

  def main(args: Array[String]): Unit = {
    val name = this.getClass.getSimpleName
    val conf = new SparkConf().setAppName(name).setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(5))
    ssc.sparkContext.setLogLevel("WARN")

    val group = "group2"
    val kafkaParams: Map[String, Object] = Map[String, Object](
      "bootstrap.servers" -> "dream1:9092,dream2:9092,dream3:9092", // Kafka brokers
      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer", // deserializers
      "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "group.id" -> group, // consumer group
      "auto.offset.reset" -> "earliest", // where to start when no offset is available [latest, earliest]
      "enable.auto.commit" -> "false" // whether to auto-commit offsets; the default is true
    )
    val topic: Iterable[String] = Array("wordcount").toIterable

    // Read the offsets keyed by app name and group id. If offsets were saved, Kafka is read
    // from those positions; otherwise consumption starts from the beginning (auto.offset.reset = earliest).
    val offset: Map[TopicPartition, Long] = OffsetUtils.selectOffsetFromMysql(name, group)

    val dstream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topic, kafkaParams, offset)
    )

    dstream.foreachRDD((kafkaRdd: RDD[ConsumerRecord[String, String]]) => {
      // Get the offset ranges. Why is this an array? Because the KafkaRDD contains data from
      // several Kafka partitions, and each partition contributes one OffsetRange.
      val ranges: Array[OffsetRange] = kafkaRdd.asInstanceOf[HasOffsetRanges].offsetRanges
      // The batch fires every 5 seconds whether or not data arrived, so first check
      // that the RDD actually contains records.
      if (!kafkaRdd.isEmpty()) {
        // Compute the word count and write it to MySQL
        val line: RDD[String] = kafkaRdd.map(_.value().trim)
        val words: RDD[String] = line.flatMap(_.split(" "))
        val wordAnd1: RDD[(String, Int)] = words.map((_, 1))
        val wordAndNums: RDD[(String, Int)] = wordAnd1.reduceByKey(_ + _)
        // To get exactly-once semantics through MySQL, the results must be collected to the
        // driver and written there, never on the executors: writes issued from multiple
        // executors cannot be wrapped in a single transaction.
        val res: Array[(String, Int)] = wordAndNums.collect()

        var conn: Connection = null
        try {
          // Get a connection
          conn = MysqlPool.getConnection
          // Start the transaction
          conn.setAutoCommit(false)
          // Process the collected data and write it to the database
          WordCountMethods.wordcountToMysql(res, conn)
          // Update the offsets
          OffsetUtils.updateOffsetToMysql(ranges, conn, name, group)
          // Commit the transaction
          conn.commit()
        } catch {
          case exception: Exception =>
            exception.printStackTrace()
            // Roll back on failure (the connection may be null if getConnection itself failed)
            if (conn != null) {
              conn.rollback()
            }
            // Stop the application
            ssc.stop()
        } finally {
          // Release the connection
          if (conn != null) {
            conn.close()
          }
        }
      }
    })

    ssc.start()
    ssc.awaitTermination()
  }
}
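The post does not include WordCountMethods. Below is a minimal sketch of what wordcountToMysql might look like, assuming a wordcount(word, counts) table with word as the primary key; the table name and columns are assumptions, not from the original. The method must use the connection passed in, so that its writes commit or roll back together with the offset update.

package com.ws.sparkstreaming.kafkamysql

import java.sql.Connection

object WordCountMethods {

  // Upsert the collected (word, count) pairs using the caller's connection, so the writes
  // are part of the same transaction as the offset update in OffsetUtils.
  // Assumes a table: wordcount(word varchar primary key, counts int).
  def wordcountToMysql(res: Array[(String, Int)], conn: Connection): Unit = {
    val sql = "insert into wordcount(word, counts) values(?, ?) ON DUPLICATE KEY UPDATE counts = counts + ?"
    val ppsm = conn.prepareStatement(sql)
    try {
      res.foreach { case (word, count) =>
        ppsm.setString(1, word)
        ppsm.setInt(2, count)
        ppsm.setInt(3, count)
        ppsm.addBatch()
      }
      ppsm.executeBatch()
    } finally {
      ppsm.close()
    }
  }
}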
MySQL connection pool
package com.ws.sparkstreaming.utils

import java.sql.Connection
import java.util.Properties

import com.alibaba.druid.pool.DruidDataSourceFactory
import javax.sql.DataSource

object MysqlPool {

  private val prop = new Properties()
  prop.put("driverClassName", "com.mysql.jdbc.Driver")
  prop.put("url", "jdbc:mysql://dream3:3306/bigdata?characterEncoding=UTF-8")
  prop.put("username", "root")
  prop.put("password", "root")

  private val source: DataSource = DruidDataSourceFactory.createDataSource(prop)

  def getConnection: Connection = {
    source.getConnection
  }
}
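For reference, the pool needs Druid and the MySQL JDBC driver on the classpath. In sbt this would be roughly the following; the versions are assumptions, pick whatever matches your environment:

libraryDependencies += "com.alibaba" % "druid" % "1.1.22"            // version is an assumption
libraryDependencies += "mysql" % "mysql-connector-java" % "5.1.49"   // version is an assumption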
OffsetUtils, the offset management utility
package com.ws.sparkstreaming.utils

import java.sql.Connection

import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.OffsetRange

import scala.collection.mutable

object OffsetUtils {

  // Upsert the end offset of every partition in the batch, using the caller's connection
  // so the update joins the same transaction as the result write.
  def updateOffsetToMysql(ranges: Array[OffsetRange], conn: Connection, appname: String, groupId: String): Unit = {
    val offsetsql = "insert into spark_stream_offset(app,groupid,topic,`partition`,`offset`) values(?,?,?,?,?) ON DUPLICATE KEY UPDATE `offset` = ?"
    val ppsm = conn.prepareStatement(offsetsql)
    ranges.foreach(range => {
      ppsm.setString(1, appname)
      ppsm.setString(2, groupId)
      ppsm.setString(3, range.topic)
      ppsm.setInt(4, range.partition)
      ppsm.setLong(5, range.untilOffset)
      ppsm.setLong(6, range.untilOffset)
      ppsm.addBatch()
    })
    ppsm.executeBatch()
    ppsm.close()
  }

  // Load the last committed offsets for this app/group. An empty map means there is no
  // history yet, and consumption falls back to auto.offset.reset.
  def selectOffsetFromMysql(appname: String, groupId: String): Map[TopicPartition, Long] = {
    val conn = MysqlPool.getConnection
    val offsets = new mutable.HashMap[TopicPartition, Long]()
    val offsetsql = "select `topic`,`partition`,`offset` from spark_stream_offset where app=? and groupid=?"
    val ppsm = conn.prepareStatement(offsetsql)
    ppsm.setString(1, appname)
    ppsm.setString(2, groupId)
    val set = ppsm.executeQuery()
    while (set.next()) {
      val topic: String = set.getString("topic")
      val partition: Int = set.getInt("partition")
      val offset: Long = set.getLong("offset")
      val part = new TopicPartition(topic, partition)
      offsets.put(part, offset)
    }
    // Close the result set and statement before the connection
    set.close()
    ppsm.close()
    conn.close()
    offsets.toMap
  }
}
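The spark_stream_offset table itself is not shown in the post. Since updateOffsetToMysql relies on ON DUPLICATE KEY UPDATE, the table needs a unique key over (app, groupid, topic, partition). Below is a sketch of a matching schema, created through the same pool; the column types and the composite primary key are assumptions.

package com.ws.sparkstreaming.utils

// One-off helper that creates the offset table used by OffsetUtils.
// The exact column types are assumptions; adjust them to your environment.
object CreateOffsetTable {
  def main(args: Array[String]): Unit = {
    val ddl =
      """create table if not exists spark_stream_offset(
        |  app         varchar(128) not null,
        |  groupid     varchar(128) not null,
        |  topic       varchar(128) not null,
        |  `partition` int          not null,
        |  `offset`    bigint       not null,
        |  primary key (app, groupid, topic, `partition`)
        |)""".stripMargin
    val conn = MysqlPool.getConnection
    try {
      conn.createStatement().execute(ddl)
    } finally {
      conn.close()
    }
  }
}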