val system = ActorSystem("PingPongSystem")
ActorSystem object
def apply(name: String): ActorSystem = apply(name, None, None, None)
def apply(name: String, config: Option[Config] = None, classLoader: Option[ClassLoader] = None, defaultExecutionContext: Option[ExecutionContext] = None): ActorSystem = {
//得到类加载器ClassLoader
val cl = classLoader.getOrElse(findClassLoader())
//加载配置信息
val appConfig = config.getOrElse(ConfigFactory.load(cl))
//创建ActorSystemImpl实例,并调用它的start方法
new ActorSystemImpl(name, appConfig, cl, defaultExecutionContext, None).start()
}
ActorSystemImpl类
private lazy val _start: this.type = try {
registerOnTermination(stopScheduler())
//调用LocalActorRefProvider类的init方法,参数为当前ActorSystemImpl类的实例
provider.init(this)
if (settings.LogDeadLetters > 0)
logDeadLetterListener = Some(systemActorOf(Props[DeadLetterListener], "deadLetterListener"))
eventStream.startUnsubscriber()
loadExtensions()
if (LogConfigOnStart) logConfiguration()
this
} catch {
case NonFatal(e) ⇒
try terminate() catch { case NonFatal(_) ⇒ Try(stopScheduler()) }
throw e
}
下面看看LocalActorRefProvider类的init方法
def init(_system: ActorSystemImpl) {
system = _system
rootGuardian.start()
//systemGuardian监控guardian
systemGuardian.sendSystemMessage(Watch(guardian, systemGuardian))
//rootGuardian监控systemGuardian
rootGuardian.sendSystemMessage(Watch(systemGuardian, rootGuardian))
eventStream.startDefaultLoggers(_system)
}
上面调用的rootGuardian方法:
创建一个表示根的LocalActorRef
根ActorRef的路径为:akka://[ActorSystemName]/
它的监管策略为:对所有的Exception情况进行Stop终止
protected def rootGuardianStrategy: SupervisorStrategy = OneForOneStrategy() {
case ex ⇒
log.error(ex, "guardian failed, shutting down system")
SupervisorStrategy.Stop
}
override lazy val rootGuardian: LocalActorRef =
new LocalActorRef(
system,
Props(classOf[LocalActorRefProvider.Guardian], rootGuardianStrategy),
defaultDispatcher,
defaultMailbox,
theOneWhoWalksTheBubblesOfSpaceTime,
rootPath) {
override def getParent: InternalActorRef = this
override def getSingleChild(name: String): InternalActorRef = name match {
case "temp" ⇒ tempContainer
case "deadLetters" ⇒ deadLetters
case other ⇒ extraNames.get(other).getOrElse(super.getSingleChild(other))
}
}
接下来,是调用LocalActorRef类的start方法
override def start(): Unit = actorCell.start()
接下来是ActorCell类的start方法
def start(): this.type = {
dispatcher.attach(this)
this
}
然后是消息派发器MessageDispatcher类
final def attach(actor: ActorCell): Unit = {
//将Actor实例与消息派发器绑定在一起
register(actor)
//将Actor的Mailbox交给线程池中的线程执行
registerForExecution(actor.mailbox, false, true)
}
protected[akka] def register(actor: ActorCell) {
if (debug) actors.put(this, actor.self)
addInhabitants(+1)
}
创建system ActorRef
它的路径是:akka://[ActorSystem]/system
它的父监管者是:之前创建的rootGuardian(其路径为akka://[ActorSystemName]/)
它的监管策略是:
final val defaultDecider: Decider = {
case _: ActorInitializationException ⇒ Stop
case _: ActorKilledException ⇒ Stop
case _: DeathPactException ⇒ Stop
case _: Exception ⇒ Restart
}
override lazy val systemGuardian: LocalActorRef = {
val cell = rootGuardian.underlying
cell.reserveChild("system")
val ref = new LocalActorRef(
system,
Props(classOf[LocalActorRefProvider.SystemGuardian], systemGuardianStrategy, guardian),
defaultDispatcher,
defaultMailbox,
rootGuardian,
rootPath / "system")
cell.initChild(ref)
ref.start()
ref
}
创建user ActorRef
它的路径为:akka://[ActorSystem]/user
它的父监管者是: system ActorRef
它的默认监管策略和system ActorRef的一样
override lazy val guardian: LocalActorRef = {
val cell = rootGuardian.underlying
cell.reserveChild("user")
val ref = new LocalActorRef(
system,
system.guardianProps.getOrElse(Props(classOf[LocalActorRefProvider.Guardian], guardianStrategy)),
defaultDispatcher,
defaultMailbox,
rootGuardian,
rootPath / "user"
)
cell.initChild(ref)
ref.start()
ref
}
接下来,看看LocalActorRef类的sendSystemMessage方法
override def sendSystemMessage(message: SystemMessage): Unit = actorCell.sendSystemMessage(message)
然后是ActorCell类
override def sendSystemMessage(message: SystemMessage): Unit = try dispatcher.systemDispatch(this, message) catch handleException
然后是Dispatcher类
protected[akka] def systemDispatch(receiver: ActorCell, invocation: SystemMessage): Unit = {
//得到接受者的邮箱
val mbox = receiver.mailbox
//向邮箱加入系统消息
mbox.systemEnqueue(receiver.self, invocation)
//将邮箱交给线程池中线程执行
registerForExecution(mbox, false, true)
}
以systemGuardian.sendSystemMessage(Watch(guardian, systemGuardian))
为例说明上面的方法中的逻辑处理。
val mbox = receiver.mailbox
得到的是systemGuardian的邮箱,
然后执行registerForExecution(mbox, false, true)
,运行Mailbox的run
方法,在Mailbox的processAllSystemMessages
方法中,会调用systemGuardian的systemInvoke
方法处理Watch
SystemMessage。
最终会调用addWatch
方法:
protected def addWatcher(watchee: ActorRef, watcher: ActorRef): Unit = {
val watcheeSelf = watchee == self
val watcherSelf = watcher == self
if (watcheeSelf && !watcherSelf) {
if (!watchedBy.contains(watcher)) maintainAddressTerminatedSubscription(watcher) {
watchedBy += watcher
if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(actor), s"now watched by $watcher"))
} //会进入这个分支,调用watch方法监控watchee
} else if (!watcheeSelf && watcherSelf) {
watch(watchee)
} else {
publish(Warning(self.path.toString, clazz(actor), "BUG: illegal Watch(%s,%s) for %s".format(watchee, watcher, self)))
}
}