Spark MLlib 中的 KMeans 聚类算法:通过 Spark Streaming 实时拉取 Kafka 中的数据,调用已经训练好的聚类模型,并根据读取到的数据实时进行分类。
package com.demo.cn.streaming
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.mllib.clustering.KMeansModel
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.rdd.RDD
object SparkStreamKafka {
/** Directory used for Spark Streaming checkpoint data.
  *
  * Written as a triple-quoted literal so the Windows path separators need no escaping;
  * the resulting value is exactly `E:\file\SparkCheckpoint`.
  */
val checkDir: String = """E:\file\SparkCheckpoint"""
def functionToCreateContext(): StreamingContext = {
val conf=new SparkConf()
.setAppName("SparkStreamKafka")
.setMaster("local[*]")