目标 : 使用 Akka 实现一个简易版的 spark 通信框架
项目概述
1. 需求
目前大多数的分布式架构底层通信都是通过 RPC 实现的,RPC 框架非常多,比如前我们学过的 Hadoop 项目的 RPC 通信框架,但是 Hadoop 在设计之初就是为了运行长达数小时的批量而设计的,在某些极端的情况下,任务提交的延迟很高,所以 Hadoop 的 RPC 显得有些笨重。
Spark 的 RPC 是通过 Akka 类库实现的,Akka 用 Scala 语言开发,基于 Actor并发模型实现,Akka 具有高可靠、高性能、可扩展
等特点,使用 Akka 可以轻松实现分布式 RPC 功能。
2 . Akka简介
Akka 基于 Actor 模型,提供了一个用于构建可扩展的(Scalable)、弹性的(Resilient)、快速响应的(Responsive)应用程序的平台。
Actor 模型:在计算机科学领域,Actor 模型是一个并行计算(Concurrent Computation)模型,它把 actor 作为并行计算的基本元素来对待:为响应一个接收到的消息,一个 actor 能够自己做出一些决策,如创建更多的 actor,或发送更多的消息,或者确定如何去响应接收到的下一个消息。
Actor 是 Akka 中最核心的概念,它是一个封装了状态和行为的对象,Actor之间可以通过交换消息的方式进行通信,每个 Actor 都有自己的收件箱(Mailbox)。通过 Actor 能够简化锁及线程管理,可以非常容易地开发出正确地并发程序和并行系统,Actor 具有如下特性:
- (1)、提供了一种高级抽象,能够简化在并发(Concurrency)/并行(Parallelism)应用场景下的编程开发
- (2)、提供了异步非阻塞的、高性能的事件驱动编程模型
- (3)、超级轻量级事件处理(每 GB 堆内存几百万 Actor)
Demo1 : 利用 Akka 的 的 actor 编程模型,实现 2 个进程间的通信
架构图
重要类介绍
ActorSystem
:在 Akka 中,ActorSystem 是一个重量级的结构,他需要分配多个线程,所以在实际应用中,ActorSystem 通常是一个单例对象,我们可以使用这个ActorSystem 创建很多 Actor。
注意 :
(1)ActorSystem 是一个进程中的老大,它负责创建和监督 actor
(2)ActorSystem 是一个单例对象
(3)actor 负责通信
Actor
在 Akka 中,Actor 负责通信,在 Actor 中有一些重要的生命周期方法。
(1)preStart()
方法:该方法在 Actor 对象构造方法执行后执行,整个 Actor 生命周期中仅执行一次。
(2)receive()
方法:该方法在 Actor 的 preStart 方法执行完成后执行,用于接收消息,会被反复执行。
具体代码
- Master类
- 首先创建ActorSystem(即老大) , 参数为老大的名字和config参数
- config参数通过ConfigFactory.parseString()方法获取 , 参数为configStr
- 定义configStr : shift + 3次
"
键 , 填写配置信息即可
- 通过masterActorSystem老大构建master actor
- perStart()方法会在构造代码执行后被调用 , 并且只会执行一次 , 因此常用来作为初始化
- receive() 方法在preStart()方法调用之后被执行 , 意味着可以
不断接收数据
- 当接收到worker的注册信息之后返回成功注册信息给worker
- 首先创建ActorSystem(即老大) , 参数为老大的名字和config参数
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import com.typesafe.config.{Config, ConfigFactory}
//todo:利用akka中的actor来实现2个进程间的通信------------Master端
class Master extends Actor{
//构造代码先运行
println("Master constructor invoked")
//preStart方法会在构造代码执行后,被调用,并且只会执行一次
override def preStart(): Unit = {
println("preStart method invoked")
}
//receive方法会在preStart方法调用后被执行,意味着可以不断的接受数据
override def receive: Receive = {
//接受worker的注册信息
case "connect" =>{
println("a client connected")
//master反馈一个注册成功信息给worker
//通过sender方法拿到worker的引用
sender ! "success"
}
}
}
object Master{
def main(args: Array[String]): Unit = {
//定义master的IP地址
val host=args(0)
//定义amster的端口
val port=args(1)
//准备字符串配置信息
val configStr=
s"""
|akka.actor.provider = "akka.remote.RemoteActorRefProvider"
|akka.remote.netty.tcp.hostname = "$host"
|akka.remote.netty.tcp.port = "$port"
""".stripMargin
//通过ConfigFactory调用parseString解析字符串获取config对象
val config:Config=ConfigFactory.parseString(configStr)
//1、创建ActorSystem老大,它负责创建和监督actor
val masterActorSystem = ActorSystem("masterActorSystem",config)
//2、通过masterActorSystem老大构建master actor
val masterActor: ActorRef = masterActorSystem.actorOf(Props(new Master),"masterActor")
//3、测试
//masterActor ! "connect"
}
}
- Worker类
- 大致步骤与Master一致
- 由于worker需要将注册信息发送给master , 故此时需要传递一个字符串给master
- 通过ActorContext上下文对象调用actorSelection可以从已知的actor中寻找目标
import akka.actor.{Actor, ActorRef, ActorSelection, ActorSystem, Props}
import com.typesafe.config.{Config, ConfigFactory}
//todo:利用akka中的actor来实现2个进程间的通信------------Worker端
class Worker extends Actor{
println("Worker constructor invoked")
//preStart方法会在构造代码执行后被调用,并且只被运行一次,可以在这里做一些初始化的工作
override def preStart(): Unit = {
println("preStart method invoked")
//获取master引用
//通过ActorContext上下文对象调用actorSelection可以从已知的actor中寻找目标
//方法中需要一个字符串,它包括了构建master actor的条件
//1、通信协议 2、master的ip 3、master端口 4、创建master actor老大 5、master actor 名称 6、actor层级关系
val master: ActorSelection = context.actorSelection("akka.tcp://masterActorSystem@192.168.140.52:8888/user/masterActor")
//通过master引用向master发送注册信息
master ! "connect"
}
//不断接受消息
override def receive: Receive = {
//worker接受master反馈的注册成功信息
case "success" =>{
println("当前worker注册成功")
}
}
}
object Worker{
def main(args: Array[String]): Unit = {
//定义worker的IP地址
val host=args(0)
//定义worker的端口
val port=args(1)
//准备字符串配置信息
val configStr=
s"""
|akka.actor.provider = "akka.remote.RemoteActorRefProvider"
|akka.remote.netty.tcp.hostname = "$host"
|akka.remote.netty.tcp.port = "$port"
""".stripMargin
//通过ConfigFactory调用parseString解析字符串获取config对象
val config:Config=ConfigFactory.parseString(configStr)
//1、创建ActorSystem 负责创建和监督actor
val workerActorSystem = ActorSystem("workerActorSystem",config)
//2、创建worker actor
val workerActor: ActorRef = workerActorSystem.actorOf(Props(new Worker),"workerActor")
//3、测试
//workerActor ! "connect"
}
}
Demo2 : 使用 Akka 实现一个简易版的 spark 通信框架
架构图
大致步骤 :
- 在worker端中的preStart方法中获取master引用 , 然后通过这个引用向master发送注册信息 , 我们将注册信息通过样例类去封装
- master接收到worker的注册消息 , 然后将这些worker的注册信息进行保存 , 最后返回一个注册成功的信息给worker
- worker接收master返回的注册成功信息 , 然后worker向master定时发送心跳 , 发送心跳的目的 : 报活
- master接收worker的心跳 , 先判断哪些worker是超时的 , 发现超时的worker之后 , 定时清除掉超时的worker信息
- 先启动master , 然后在启动worker
tips : 当我们看到如下报错时 , 需要手动导入隐式转换
Error:(24, 38) Cannot find an implicit ExecutionContext. You might pass an (implicit ec: ExecutionContext) parameter to your method or import scala.concurrent.ExecutionContext.Implicits.global. context.system.scheduler.schedule(0 millis,checkOutTimeInterval millis,self,CheckOutTime)
时间包 :
import scala.concurrent.duration._
具体代码
- Master类
package cn.itcast.spark
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import com.typesafe.config.{Config, ConfigFactory}
import scala.collection.mutable
import scala.collection.mutable.ListBuffer
import scala.concurrent.duration._
//todo: 利用akka中的actor模型来实现一个简易版的spark通信框架------------Master端
class Master extends Actor{
//构造代码先运行
println("Master constructor invoked")
//定义一个map集合,用于保存worker的注册信息,key: workerId value: WorkerInfo
private val workerInfoMap = new mutable.HashMap[String,WorkerInfo]()
//定义一个list集合,用于保存workerinfo
private val workerInfoList = new ListBuffer[WorkerInfo]
//定义master定时检查时间间隔
val checkOutTimeInterval=15000 //15秒
//preStart方法会在构造代码执行后,被调用,并且只会执行一次
override def preStart(): Unit = {
println("preStart method invoked")
//master定时检查超时的worker
//手动导入隐式转换
import context.dispatcher
context.system.scheduler.schedule(0 millis,checkOutTimeInterval millis,self,CheckOutTime)
}
//receive方法会在preStart方法调用后被执行,意味着可以不断的接受数据
override def receive: Receive = {
//接受worker的注册信息
case RegisterMessage(workerId,memory,cores) =>{
//判断worker是否注册,master只接受没有注册过的worker信息
if(!workerInfoMap.contains(workerId)){
//构建WorkerInfo对象
val workerInfo = new WorkerInfo(workerId,memory,cores)
//把信息保存在map集合中
workerInfoMap +=(workerId -> workerInfo)
//把workerinfo信息保存在list集合中
workerInfoList +=workerInfo
//master反馈一个注册成功信息给worker
//通过sender方法拿到worker的引用
sender ! RegisteredMessage(s"workerId:$workerId 注册成功")
}
}
//master接受worker的心跳
case HeartBeat(workerId) =>{
//master要判断worker是否注册,master只接受注册过的worker的心跳信息
if(workerInfoMap.contains(workerId)){
//获取当前系统时间
val now: Long = System.currentTimeMillis()
//通过workerId获取对应的WorkerInfo信息
val workerInfo: WorkerInfo = workerInfoMap(workerId)
//给workerInfo中上一次心跳时间赋值
workerInfo.lastHeartBeatTime=now
}
}
//master接受自己发送过来的信息
case CheckOutTime =>{
//需要判断哪些worker超时
//超时的worker判断逻辑: now - worker上一次心跳时间 > master定时检查的时间间隔 15s
val now: Long = System.currentTimeMillis()
val outTimeWorkerInfo: ListBuffer[WorkerInfo] = workerInfoList.filter( x=> now -x.lastHeartBeatTime >checkOutTimeInterval )
//遍历
for(out <- outTimeWorkerInfo){
//从map集合中移除超时的worker信息
val workerId: String = out.workerId
workerInfoMap -=(workerId)
//从list集合中移除超时的worker信息
workerInfoList -=out
println(s"workerId:$workerId 超时")
}
println("活着的worker总数:"+workerInfoList.size)
//按照worker自身的内存大小降序排列
println(workerInfoList.sortBy(x=>x.memory).reverse)
}
}
}
object Master{
def main(args: Array[String]): Unit = {
//定义master的IP地址
val host=args(0)
//定义master的端口
val port=args(1)
//准备字符串配置信息
val configStr=
s"""
|akka.actor.provider = "akka.remote.RemoteActorRefProvider"
|akka.remote.netty.tcp.hostname = "$host"
|akka.remote.netty.tcp.port = "$port"
""".stripMargin
//通过ConfigFactory调用parseString解析字符串获取config对象
val config:Config=ConfigFactory.parseString(configStr)
//1、创建ActorSystem老大,它负责创建和监督actor
val masterActorSystem = ActorSystem("masterActorSystem",config)
//2、通过masterActorSystem老大构建master actor
val masterActor: ActorRef = masterActorSystem.actorOf(Props(new Master),"masterActor")
//3、测试
//masterActor ! "connect"
}
}
- Worker类
package cn.itcast.spark
import java.util.UUID
import akka.actor.{Actor, ActorRef, ActorSelection, ActorSystem, Props}
import com.typesafe.config.{Config, ConfigFactory}
import scala.concurrent.duration._
//todo: 利用akka中的actor模型来实现一个简易版的spark通信框架------------Worker端
class Worker(val memory:Int,val cores:Int,val masterHost:String,val masterPort:String) extends Actor{
println("Worker constructor invoked")
//定义一个workerId
private val workerId: String = UUID.randomUUID().toString
//定义常量 表示每隔多久向master发送一次心跳
val sendHeartBeatInterval=10000 //10秒
//定义master全局变量
var master: ActorSelection=_
//preStart方法会在构造代码执行后被调用,并且只被运行一次,可以在这里做一些初始化的工作
override def preStart(): Unit = {
println("preStart method invoked")
//获取master引用
//通过ActorContext上下文对象调用actorSelection可以从已知的actor中寻找目标
//方法中需要一个字符串,它包括了构建master actor的条件
//1、通信协议 2、master的ip 3、master端口 4、创建master actor老大 5、master actor 名称 6、actor层级关系
master = context.actorSelection(s"akka.tcp://masterActorSystem@$masterHost:$masterPort/user/masterActor")
//通过master引用向master发送注册信息,通过样例类封装注册信息
master ! RegisterMessage(workerId,memory,cores)
}
//不断接受消息
override def receive: Receive = {
//worker接受master反馈的注册成功信息
case RegisteredMessage(message) =>{
println(message)
//定时向master发送心跳
//正常是需要向master引用发送心跳,由于这里类型不符合,就需要使用self---->表示worker actor本身
//手动导入隐式转换
import context.dispatcher
context.system.scheduler.schedule(0 millis, sendHeartBeatInterval millis,self,SendHeartBeat)
}
//worker自己向自己发送信息
case SendHeartBeat =>{
//worker真正向master发送心跳
master ! HeartBeat(workerId)
}
}
}
object Worker{
def main(args: Array[String]): Unit = {
//定义worker的IP地址
val host=args(0)
//定义worker的端口
val port=args(1)
//定义worker的内存大小
val memory=args(2).toInt
//定义worker的cpu核数
val cores=args(3).toInt
//定义master的IP地址
val masterHost=args(4)
//定义master的端口
val masterPort=args(5)
//准备字符串配置信息
val configStr=
s"""
|akka.actor.provider = "akka.remote.RemoteActorRefProvider"
|akka.remote.netty.tcp.hostname = "$host"
|akka.remote.netty.tcp.port = "$port"
""".stripMargin
//通过ConfigFactory调用parseString解析字符串获取config对象
val config:Config=ConfigFactory.parseString(configStr)
//1、创建ActorSystem 负责创建和监督actor
val workerActorSystem = ActorSystem("workerActorSystem",config)
//2、创建worker actor
val workerActor: ActorRef = workerActorSystem.actorOf(Props(new Worker(memory,cores,masterHost,masterPort)),"workerActor")
//3、测试
//workerActor ! "connect"
}
}
- RemoteMessage
package cn.itcast.spark
trait RemoteMessage extends Serializable{
}
//worker向master发送注册信息,由于不在同一进程中,需要实现序列化
case class RegisterMessage(val workerId:String,val memory:Int,val cores:Int) extends RemoteMessage
//master向worker反馈注册成功信息,由于不在同一进程中,需要实现序列化
case class RegisteredMessage(val message:String) extends RemoteMessage
//worker自己给自己发送消息,由于在同一进程中,不需要实现序列化
case object SendHeartBeat
//worker向master发送心跳,由于不在同一进程中,需要实现序列化
case class HeartBeat(val workerId:String) extends RemoteMessage
//master自己给自己发送消息,由于在同一进程中,不需要实现序列化
case object CheckOutTime
- WorkerInfo
package cn.itcast.spark
//通过一个对象保存worker的信息
class WorkerInfo(val workerId:String,val memory:Int,val cores:Int) {
//定义一个变量,用于保存worker的上一次心跳时间
var lastHeartBeatTime:Long=_
//重写toString
override def toString: String = {
s"workerId:$workerId memory:$memory cores:$cores"
}
}