ActorSystem

2020/02/28 Akka源码 共 29956 字,约 86 分钟
梦境迷离
/*
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.actor

import java.io.Closeable
import java.util.concurrent._
import java.util.concurrent.atomic.AtomicReference
import java.util.Optional

import akka.actor.dungeon.ChildrenContainer
import akka.actor.setup.{ ActorSystemSetup, Setup }
import akka.annotation.InternalApi
import akka.dispatch._
import akka.event._
import akka.japi.Util.immutableSeq
import akka.util._
import akka.util.Helpers.toRootLowerCase
import com.typesafe.config.{ Config, ConfigFactory }

import scala.annotation.tailrec
import scala.collection.immutable
import scala.compat.java8.FutureConverters
import scala.compat.java8.OptionConverters._
import scala.concurrent.{ ExecutionContext, ExecutionContextExecutor, Future, Promise }
import scala.util.{ Failure, Success, Try }
import scala.util.control.{ ControlThrowable, NonFatal }

//引导程序配置
object BootstrapSetup {

  /**
   * Scala API: Scala API:使用默认值构造引导程序设置
   * 请注意,将其传递给actor系统与根本不传递任何[[BootstrapSetup]]相同
   * 您可以使用BootstrapSetup的各种“with”方法,返回的实例派生一个具有不同于默认值的值的实例
   */
  def apply(): BootstrapSetup = {
    new BootstrapSetup()
  }

  /**
   * Scala API: 创建启动actor系统所需的引导程序设置
   *
   * @see [[BootstrapSetup]] 类加载器、配置、默认的线程池
   */
  def apply(classLoader: Option[ClassLoader], config: Option[Config], defaultExecutionContext: Option[ExecutionContext]): BootstrapSetup =
    new BootstrapSetup(classLoader, config, defaultExecutionContext)

  /**
   * Scala API: 保留默认的类加载器和默认的执行上下文,但使用自定义的配置
   */
  def apply(config: Config): BootstrapSetup = apply(None, Some(config), None)

  /**
   * Java API: 同上,给Java用,Scala使用apply构造,Java使用静态方法
   *
   * @see [[BootstrapSetup]] for description of the properties
   */
  def create(classLoader: Optional[ClassLoader], config: Optional[Config], defaultExecutionContext: Optional[ExecutionContext]): BootstrapSetup =
    apply(classLoader.asScala, config.asScala, defaultExecutionContext.asScala)

  /**
   * Java  API: 同上,给Java用,Scala使用apply构造,Java使用静态方法
   */
  def create(config: Config): BootstrapSetup = apply(config)

  /**
   * Java API: 同上,给Java用,Scala使用apply构造,Java使用静态方法
   */
  def create(): BootstrapSetup = {
    new BootstrapSetup()
  }

}

//选择器,唯一描述符 目前只有下面三种,构造函数私有
abstract class ProviderSelection private(private[akka] val identifier: String)

object ProviderSelection {

  //Akka支持本地、远程和集群使用选择器(一种类似文件路径又比较特殊的URL,具有层级关系)
  case object Local extends ProviderSelection("local")

  case object Remote extends ProviderSelection("remote")

  case object Cluster extends ProviderSelection("cluster")

  /**
   * JAVA API
   */
  def local(): ProviderSelection = Local

  /**
   * JAVA API
   */
  def remote(): ProviderSelection = Remote

  /**
   * JAVA API
   */
  def cluster(): ProviderSelection = Cluster

}

/**
 * 使用[[Bootstrap Setup]]构造函数中的工厂之一创建的actor系统的核心引导程序设置是内部 API
 *
 * @param classLoader             如果未提供ClassLoader,它将通过首先检查当前线程的getContextClassLoader来获取当前的ClassLoader,
 *                                然后尝试遍历堆栈以查找调用方的类加载器,然后退回到与ActorSystem类关联的ClassLoader
 * @param config                  用于actor系统的配置 如果未提供配置,则将从ClassLoader获取默认参考配置
 * @param defaultExecutionContext 如果定义,ExecutionContext将用作此ActorSystem中的默认执行线程池
 *                                如果未提供ExecutionContext,则系统将回退到在“akka.actor.default-dispatcher.default-executor.fallback”下配置的执行线程池
 * @param actorRefProvider        覆盖config中的akka.actor.provider设置,可以是local(默认),remote或cluster 它也可以是提供者的完全限定的类名
 */
final class BootstrapSetup private(
  val classLoader: Option[ClassLoader] = None,
  val config: Option[Config] = None,
  val defaultExecutionContext: Option[ExecutionContext] = None,
  val actorRefProvider: Option[ProviderSelection] = None) extends Setup {
  //修复默认配置会返回一个新的实例
  def withClassloader(classLoader: ClassLoader): BootstrapSetup =
    new BootstrapSetup(Some(classLoader), config, defaultExecutionContext, actorRefProvider)

  def withConfig(config: Config): BootstrapSetup =
    new BootstrapSetup(classLoader, Some(config), defaultExecutionContext, actorRefProvider)

  def withDefaultExecutionContext(executionContext: ExecutionContext): BootstrapSetup =
    new BootstrapSetup(classLoader, config, Some(executionContext), actorRefProvider)

  //使用远程或集群时指定选择器提供者本地不推荐使用,因为选择器遍历actor树会对性能有影响
  def withActorRefProvider(name: ProviderSelection): BootstrapSetup =
    new BootstrapSetup(classLoader, config, defaultExecutionContext, Some(name))

}

//actor系统的单例对象,用来对外暴露构建ActorSystem实例的接口
object ActorSystem {

  //获取当前akka的版本
  val Version: String = akka.Version.current // generated file

  //获取操作系统的环境  windows linux unix
  val EnvHome: Option[String] = System.getenv("AKKA_HOME") match {
    case null | "" | "."  None
    case value  Some(value)
  }
  //获取系统中名字为akka.home的属性值
  val SystemHome: Option[String] = System.getProperty("akka.home") match {
    case null | ""  None
    case value  Some(value)
  }

  //若没有配置SystemHome则使用EnvHome
  val GlobalHome: Option[String] = SystemHome orElse EnvHome

  /**
   * 创建一个名称为“default”的新ActorSystem,通过首先检查当前线程的getContextClassLoader来获取当前的ClassLoader,
   * 然后尝试遍历堆栈以查找调用方的类加载器,然后退回到与ActorSystem类关联的ClassLoader 然后,它使用ClassLoader加载默认的参考配置
   */
  def create(): ActorSystem = apply()

  /**
   * 同上,但名字由使用者指定
   */
  def create(name: String): ActorSystem = apply(name)

  /**
   * Java API: 使用指定的名称和设置创建一个新的actor系统,核心actor系统设置在[[BootstrapSetup]]中定义的
   */
  def create(name: String, setups: ActorSystemSetup): ActorSystem = apply(name, setups)

  /**
   * Java API: 同上,内部调用Scala接口
   * def apply(settings: Setup*): ActorSystemSetup = new ActorSystemSetup(settings.map(s ⇒ s.getClass → s).toMap)
   */
  def create(name: String, bootstrapSetup: BootstrapSetup): ActorSystem =
    create(name, ActorSystemSetup.create(bootstrapSetup))

  /**
   * 使用配置config来创建一个名为name的actor系统
   */
  def create(name: String, config: Config): ActorSystem = apply(name, config)

  /**
   * 同上,需要传入类加载器
   */
  def create(name: String, config: Config, classLoader: ClassLoader): ActorSystem = apply(name, config, classLoader)

  /**
   * 请注意,给定的ExecutionContext将由已配置executor =“ default-executor”的所有调度程序使用,
   * 包括那些尚未定义executor设置的调度程序,因此回退至默认值“default-dispatcher.executor”的
   *
   */
  def create(name: String, config: Config, classLoader: ClassLoader, defaultExecutionContext: ExecutionContext): ActorSystem = apply(name, Option(config), Option(classLoader), Option(defaultExecutionContext))

  /**
   * Scala API: 同create,Scala使用 val system = ActorSystem() 创建单例对象的实例
   */
  def apply(): ActorSystem = apply("default")

  /**
   * Scala API: 同create
   */
  def apply(name: String): ActorSystem = apply(name, None, None, None)

  /**
   * Scala API: 同create
   */
  def apply(name: String, setup: ActorSystemSetup): ActorSystem = {
    //获取引导程序配置
    val bootstrapSettings = setup.get[BootstrapSetup]
    //从引导程序配置中获取类加载器,否则查找,底层查找顺序如下(如上classLoader参数的解释)
    //Option(Thread.currentThread.getContextClassLoader) orElse
    //(Reflect.getCallerClass map findCaller) getOrElse getClass.getClassLoader
    val cl = bootstrapSettings.flatMap(_.classLoader).getOrElse(findClassLoader())
    //从引导程序配置中获取配置,否则使用ConfigFactory加载指定的类加载下的默认配置("reference.conf")
    val appConfig = bootstrapSettings.flatMap(_.config).getOrElse(ConfigFactory.load(cl))
    //从引导程序配置中获取默认的执行线程池
    val defaultEC = bootstrapSettings.flatMap(_.defaultExecutionContext)
    //actorSystem的具体实现
    new ActorSystemImpl(name, appConfig, cl, defaultEC, None, setup).start()
  }

  /**
   * Scala API: 同上apply,参数不同
   */
  def apply(name: String, bootstrapSetup: BootstrapSetup): ActorSystem =
    create(name, ActorSystemSetup.create(bootstrapSetup))

  /**
   * Scala API: 同上apply,参数不同
   */
  def apply(name: String, config: Config): ActorSystem = apply(name, Option(config), None, None)

  /**
   * Scala API: 同上apply,参数不同
   */
  def apply(name: String, config: Config, classLoader: ClassLoader): ActorSystem = apply(name, Option(config), Option(classLoader), None)

  /**
   * Scala API: 同上apply,参数不同
   */
  def apply(
    name: String,
    config: Option[Config] = None,
    classLoader: Option[ClassLoader] = None,
    defaultExecutionContext: Option[ExecutionContext] = None): ActorSystem =
    apply(name, ActorSystemSetup(BootstrapSetup(classLoader, config, defaultExecutionContext)))

  /**
   * 设置是ActorSystem的总体设置,还可以方便地访问Config对象(The Typesafe Config Library API)
   */
  class Settings(classLoader: ClassLoader, cfg: Config, final val name: String, val setup: ActorSystemSetup) {

    def this(classLoader: ClassLoader, cfg: Config, name: String) = this(classLoader, cfg, name, ActorSystemSetup())

    /**
     * actor系统配置的后备配置
     */
    final val config: Config = {
      //检查配置
      val config = cfg.withFallback(ConfigFactory.defaultReference(classLoader))
      config.checkValid(ConfigFactory.defaultReference(classLoader), "akka")
      config
    }

    //获取当前akka的版本,和当前使用的是哪中actor(本地、远程、集群)
    final val ConfigVersion: String = getString("akka.version")
    final val ProviderClass: String =
      setup.get[BootstrapSetup]
        .flatMap(_.actorRefProvider).map(_.identifier)
        .getOrElse(getString("akka.actor.provider")) match {
        case "local"  classOf[LocalActorRefProvider].getName
        //这两个类不能被类引用,因为它们可能不在类路径中(需要额外的依赖,非基础actor系统所需)
        case "remote"  "akka.remote.RemoteActorRefProvider"
        case "cluster"  "akka.cluster.ClusterActorRefProvider"
        case fqcn  fqcn
      }
    //以下配置在akka-actor reference.conf,默认值可能已经更改
    //获取当前配置监护人策略,指/user路径下的监护
    final val SupervisorStrategyClass: String = getString("akka.actor.guardian-supervisor-strategy")
    //创建超时,指ActorSystem.actorOf的超时
    final val CreationTimeout: Timeout = Timeout(config.getMillisDuration("akka.actor.creation-timeout"))
    //向正在启动的顶级actor发送操作的超时 仅当将绑定信箱或CallingThreadDispatcher用于顶级actor时,这才有意义
    final val UnstartedPushTimeout: Timeout = Timeout(config.getMillisDuration("akka.actor.unstarted-push-timeout"))
    //是否允许Java序列化,默认on,以后可能为off不推荐用
    final val AllowJavaSerialization: Boolean = getBoolean("akka.actor.allow-java-serialization")
    //是否允许启用其他序列化绑定,用于兼容
    final val EnableAdditionalSerializationBindings: Boolean =
      !AllowJavaSerialization || getBoolean("akka.actor.enable-additional-serialization-bindings")
    //是否序列化所有消息  序列化和反序列化(非原始)消息以确保不变性,这仅用于测试默认off
    final val SerializeAllMessages: Boolean = getBoolean("akka.actor.serialize-messages")
    //对创建者进行序列化和反序列化(在Props中)以确保可以通过网络发送它们,这仅用于测试 标记为deploy.scope == LocalScope的纯本地部署免于验证默认off
    final val SerializeAllCreators: Boolean = getBoolean("akka.actor.serialize-creators")
    //日志等级 OFF, ERROR, WARNING, INFO, DEBUG 可选
    final val LogLevel: String = getString("akka.loglevel")
    //控制台的日志等级
    final val StdoutLogLevel: String = getString("akka.stdout-loglevel")
    //日志实现,可多个,在引导时注册,默认akka.event.Logging$DefaultLogger
    final val Loggers: immutable.Seq[String] = immutableSeq(getStringList("akka.loggers"))
    //默认的日志调度器
    final val LoggersDispatcher: String = getString("akka.loggers-dispatcher")
    //日志的过滤器 过滤LoggingAdapter在将日志事件发布到eventStream之前使用的日志事件,默认akka.event.DefaultLoggingFilter
    final val LoggingFilter: String = getString("akka.logging-filter")
    //日志启动超时大小 日志是在ActorSystem启动期间同步创建和注册的,并且由于它们是actor,因此此超时用于限制等待时间,默认5s
    final val LoggerStartTimeout: Timeout = Timeout(config.getMillisDuration("akka.logger-startup-timeout"))
    //启动actor系统时,将完整的配置记录在INFO级别这在不确定使用哪种配置时很有用默认off
    final val LogConfigOnStart: Boolean = config.getBoolean("akka.log-config-on-start")
    //将被记录的死信个数,以info级别记录,默认10
    final val LogDeadLetters: Int = toRootLowerCase(config.getString("akka.log-dead-letters")) match {
      case "off" | "false"  0
      case "on" | "true"  Int.MaxValue
      case _  config.getInt("akka.log-dead-letters")
    }
    //在actor系统关闭时,关闭对死信的记录
    final val LogDeadLettersDuringShutdown: Boolean = config.getBoolean("akka.log-dead-letters-during-shutdown")

    //该功能以DEBUG级别记录任何收到的消息(这里的记录是在开启debug后会打印出来)
    final val AddLoggingReceive: Boolean = getBoolean("akka.actor.debug.receive")
    //记录所有消息,包括Kill, PoisonPill
    final val DebugAutoReceive: Boolean = getBoolean("akka.actor.debug.autoreceive")
    //记录actor生命周期的更改
    final val DebugLifecycle: Boolean = getBoolean("akka.actor.debug.lifecycle")
    //启用所有LoggingFSM的调试日志,包括events, transitions and timers
    final val FsmDebugEvent: Boolean = getBoolean("akka.actor.debug.fsm")
    //记录eventStream上的订阅的更改
    final val DebugEventStream: Boolean = getBoolean("akka.actor.debug.event-stream")
    //记录为处理的消息(与死信不同)
    final val DebugUnhandledMessage: Boolean = getBoolean("akka.actor.debug.unhandled")
    //启用错误配置的路由器的WARN日志记录
    final val DebugRouterMisconfiguration: Boolean = getBoolean("akka.actor.debug.router-misconfiguration")

    final val Home: Option[String] = config.getString("akka.home") match {
      case ""  None
      case x  Some(x)
    }

    //定时器的具体实现类
    final val SchedulerClass: String = getString("akka.scheduler.implementation")
    //此ActorSystem创建的线程是否应为守护程序
    final val Daemonicity: Boolean = getBoolean("akka.daemonic")
    final val JvmExitOnFatalError: Boolean = getBoolean("akka.jvm-exit-on-fatal-error")
    //JVM shutdown, System.exit(-1), in case of a fatal error,such as OutOfMemoryError
    final val JvmShutdownHooks: Boolean = getBoolean("akka.jvm-shutdown-hooks")
    //一致性hash路由器的每个节点的虚拟节点的数量,这里感觉是每个节点之间的虚拟节点数
    final val DefaultVirtualNodesFactor: Int = getInt("akka.actor.deployment.default.virtual-nodes-factor")

    if (ConfigVersion != Version)
      throw new akka.ConfigurationException("Akka JAR version [" + Version + "] does not match the provided config version [" + ConfigVersion + "]")

    /**
     * Returns the String representation of the Config that this Settings is backed by
     */
    override def toString: String = config.root.render

  }

  private[akka] def findClassLoader(): ClassLoader = Reflect.findClassLoader()
}

/**
 * 此类不打算由用户代码扩展 如果你想实际使用自己的Akka,最好考虑扩展[[akka.actor.ExtendedActorSystem]]
 *
 */
abstract class ActorSystem extends ActorRefFactory {

  import ActorSystem._

  /**
   * actor系统的名称,用以区分同一jvm或类加载器下的不同的actor系统
   */
  def name: String

  /**
   * 从提供的配置中提取核心设置
   */
  def settings: Settings

  /**
   * 日志配置
   */
  def logConfiguration(): Unit

  /**
   * 在应用程序守护程序下面构造一个路径,以与[[ActorSystem#actorSelection]]一起使用
   */
  def /(name: String): ActorPath

  /**
   * Java API: 同/
   */
  def child(child: String): ActorPath = /(child)

  /**
   * 同上
   */
  def /(name: Iterable[String]): ActorPath

  /**
   * Java API: 递归创建,会附加所有孩子actor的名称
   */
  def descendant(names: java.lang.Iterable[String]): ActorPath = /(immutableSeq(names))

  /**
   * 自该时间起的启动时间(以毫秒为单位)
   */
  val startTime: Long = System.currentTimeMillis

  /**
   * 该actor系统的正常运行时间(以秒为单位)
   */
  def uptime: Long = (System.currentTimeMillis - startTime) / 1000

  /**
   * 这个actor系统的主事件总线,例如用于日志
   */
  def eventStream: EventStream

  /**
   * Java API: 同上
   */
  def getEventStream: EventStream = eventStream

  /**
   * 方便的日志记录适配器,用于[[ActorSystem#eventStream]]
   */
  def log: LoggingAdapter

  /**
   * Actor引用,将消息发送给已停止或不存在的actor时会将消息重新路由到该引用 尽力而为地交付给此actor,因此不能严格保证
   */
  def deadLetters: ActorRef

  //#scheduler
  /**
   * 轻型调度程序,用于在将来某个截止日期之后运行异步任务 不是很精确,但是很廉价
   */
  def scheduler: Scheduler

  //#scheduler

  /**
   * Java API: 同上
   */
  def getScheduler: Scheduler = scheduler

  /**
   * 用于查找已配置的调度程序的Helper对象
   */
  def dispatchers: Dispatchers

  /**
   * 默认的调度器
   **/
  implicit def dispatcher: ExecutionContextExecutor

  /**
   * Java API: 同上
   */
  def getDispatcher: ExecutionContextExecutor = dispatcher

  /**
   * 查找配置的信箱类型的帮助对象
   */
  def mailboxes: Mailboxes

  /**
   * 注册一个代码块(回调),以便在[actor system.terminate()]]发出并且此actor系统中的所有actor都已停止后运行
   * 通过多次调用此方法,可以注册多个代码块回调将按与注册相反的顺序依次运行,即先运行最后注册的回调
   * 请注意,在完成所有已注册的回调之前,ActorSystem不会终止
   *
   * 如果系统已终止或终止已启动,则抛出RejectedExecutionException
   *
   * Scala API
   */
  def registerOnTermination[T](code:  T): Unit

  /**
   * Java API: 注册的是Runnable任务
   */
  def registerOnTermination(code: Runnable): Unit

  /**
   * 终止此actor系统这将停止守护者actor,而守护者(监护者)actor又将递归地停止其所有子actor,系统守护者(日志记录参与者所在的位置),然后执行所有注册的终止处理程序
   * 注意不要使用此actor系统的“dispatcher”在返回的future完成时安排任何操作,因为它在future完成之前已经关闭
   */
  def terminate(): Future[Terminated]

  /**
   * 返回在ActorSystem终止并执行终止挂钩后将完成的未来如果您使用[[registerOnTermination]]注册了任何回调,
   * 则在完成所有注册的回调之前,此方法返回的Future将不会完成
   * 注意不要在这个actor系统的“dispatcher”上安排任何操作,因为它在将来完成之前已经被关闭
   */
  def whenTerminated: Future[Terminated]

  /**
   * 返回一个CompletionStage,它将在ActorSystem终止并执行终止挂钩后完成如果您使用[[registerOnTermination]]注册了任何回调,
   * 则在完成所有注册的回调之前,此方法返回的CompletionStage将不会完成
   * 注意不要在这个actor系统的“dispatcher”上安排任何操作,因为它在将来完成之前已经被关闭
   */
  def getWhenTerminated: CompletionStage[Terminated]

  /**
   * 注册提供的扩展并创建其负载(payload),如果此扩展尚未注册,则此方法具有putIfAbsent语义,如果正在从另一个执行线程注册,则此方法可能会阻塞,等待负载的初始化
   */
  def registerExtension[T <: Extension](ext: ExtensionId[T]): T

  /**
   * 返回与提供的扩展关联的有效负载,如果未注册,则引发IllegalStateException如果正在从另一个执行线程注册,则此方法可能会阻塞,等待负载的初始化
   */
  def extension[T <: Extension](ext: ExtensionId[T]): T

  /**
   * 返回指定的扩展是否已注册,如果正在从另一个执行线程注册,则此方法可能会阻塞,等待负载的初始化
   */
  def hasExtension(ext: ExtensionId[_ <: Extension]): Boolean
}

/**
 * 拓展ActorSystem
 */
abstract class ExtendedActorSystem extends ActorSystem {

  /**
   * The ActorRefProvider is the only entity which creates all actor references within this actor system.
   */
  def provider: ActorRefProvider

  /**
   * The top-level supervisor of all actors created using system.actorOf(...).
   */
  def guardian: InternalActorRef

  /**
   * The top-level supervisor of all system-internal services like logging.
   */
  def systemGuardian: InternalActorRef

  /**
   * Create an actor in the "/system" namespace. This actor will be shut down
   * during system.terminate only after all user actors have terminated.
   */
  def systemActorOf(props: Props, name: String): ActorRef

  /**
   * A ThreadFactory that can be used if the transport needs to create any Threads
   */
  def threadFactory: ThreadFactory

  /**
   * ClassLoader wrapper which is used for reflective accesses internally. This is set
   * to use the context class loader, if one is set, or the class loader which
   * loaded the ActorSystem implementation. The context class loader is also
   * set on all threads created by the ActorSystem, if one was set during
   * creation.
   */
  def dynamicAccess: DynamicAccess

  /**
   * Filter of log events that is used by the LoggingAdapter before
   * publishing log events to the eventStream
   */
  def logFilter: LoggingFilter

  /**
   * For debugging: traverse actor hierarchy and make string representation.
   * Careful, this may OOM on large actor systems, and it is only meant for
   * helping debugging in case something already went terminally wrong.
   */
  private[akka] def printTree: String

}

/**
 * ActorSystem内部具体实现
 */
@InternalApi
private[akka] class ActorSystemImpl(
  val name: String,
  applicationConfig: Config,
  classLoader: ClassLoader,
  defaultExecutionContext: Option[ExecutionContext],
  val guardianProps: Option[Props], //构造actor实例的配方
  setup: ActorSystemSetup) extends ExtendedActorSystem {

  //验证ActorSystem名字的有效性
  if (!name.matches("""^[a-zA-Z0-9][a-zA-Z0-9-_]*$"""))
    throw new IllegalArgumentException(
      "invalid ActorSystem name [" + name +
        "], must contain only word characters (i.e. [a-zA-Z0-9] plus non-leading '-' or '_')")

  import ActorSystem._

  //死信日志监听器 actor
  @volatile private var logDeadLetterListener: Option[ActorRef] = None
  //配置
  final val settings: Settings = new Settings(classLoader, applicationConfig, name, setup)

  //处理未捕获的异常
  protected def uncaughtExceptionHandler: Thread.UncaughtExceptionHandler =
    new Thread.UncaughtExceptionHandler() {
      def uncaughtException(thread: Thread, cause: Throwable): Unit = {
        cause match {
          //非致命异常
          case NonFatal(_) | _: InterruptedException | _: NotImplementedError | _: ControlThrowable  log.error(cause, "Uncaught error from thread [{}]", thread.getName)
          case _ ⇒
            //致命异常 结束
            if (cause.isInstanceOf[IncompatibleClassChangeError] && cause.getMessage.startsWith("akka"))
              System.err.println(
                s"""Detected ${cause.getClass.getName} error, which MAY be caused by incompatible Akka versions on the classpath.
                   | Please note that a given Akka version MUST be the same across all modules of Akka that you are using,
                   | e.g. if you use akka-actor [${akka.Version.current} (resolved from current classpath)] all other core
                   | Akka modules MUST be of the same version. External projects like Alpakka, Persistence plugins or Akka
                   | HTTP etc. have their own version numbers - please make sure you're using a compatible set of libraries.
                  """.stripMargin.replaceAll("[\r\n]", ""))

            if (settings.JvmExitOnFatalError)
              try logFatalError("shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled for", cause, thread)
              finally System.exit(-1)
            else
              try logFatalError("shutting down", cause, thread)
              finally terminate()
        }
      }

      //内联函数,记录致命异常
      @inline
      private def logFatalError(message: String, cause: Throwable, thread: Thread): Unit = {
        // First log to stderr as this has the best chance to get through in an 'emergency panic' situation:
        import System.err
        err.print("Uncaught error from thread [")
        err.print(thread.getName)
        err.print("]: ")
        err.print(cause.getMessage)
        err.print(", ")
        err.print(message)
        err.print(" ActorSystem[")
        err.print(name)
        err.println("]")
        System.err.flush()
        cause.printStackTrace(System.err)
        System.err.flush()

        // Also log using the normal infrastructure - hope for the best:
        markerLogging.error(LogMarker.Security, cause, "Uncaught error from thread [{}]: " + cause.getMessage + ", " + message + " ActorSystem[{}]", thread.getName, name)
      }
    }

  //可监控的线程池
  final val threadFactory: MonitorableThreadFactory =
    MonitorableThreadFactory(name, settings.Daemonicity, Option(classLoader), uncaughtExceptionHandler)

  /**
   * 这是一个扩展点:通过重写此方法,子类可以控制参与者系统的所有反射活动
   */
  protected def createDynamicAccess(): DynamicAccess = new ReflectiveDynamicAccess(classLoader)

  private val _pm: DynamicAccess = createDynamicAccess()

  def dynamicAccess: DynamicAccess = _pm

  //在INFO级别记录消息
  def logConfiguration(): Unit = log.info(settings.toString)

  //本实例作为系统实现
  protected def systemImpl: ActorSystemImpl = this

  //在系统监护人(/system)下构建actor(系统监护人和ActorSystem是不同的东西,这里的ActorSystem实例实际上是在/user下的,而不是/system,具体参考actor系统的监护)
  //https://dreamylost.cn/akka/Akka-Actor%E7%9A%84%E7%9B%91%E7%9D%A3%E4%B8%8E%E7%9B%91%E6%8E%A7.html
  def systemActorOf(props: Props, name: String): ActorRef = systemGuardian.underlying.attachChild(props, name, systemService = true)

  //在用户监护人(/user)下创建actor
  def actorOf(props: Props, name: String): ActorRef =
    if (guardianProps.isEmpty) guardian.underlying.attachChild(props, name, systemService = false)
    else throw new UnsupportedOperationException(
      //无法在带有自定义用户监护人的ActorSystem上从外部创建顶级参与者
      s"cannot create top-level actor [$name] from the outside on ActorSystem with custom user guardian")

  //同时,但使用默认的随机名,具体实现在Children中如下:
  //val num = Unsafe.instance.getAndAddLong(this, AbstractActorCell.nextNameOffset, 1)
  //Helpers.base64(num)
  def actorOf(props: Props): ActorRef =
    if (guardianProps.isEmpty) guardian.underlying.attachChild(props, systemService = false)
    else throw new UnsupportedOperationException("cannot create top-level actor from the outside on ActorSystem with custom user guardian")

  //关闭,递归关闭孩子节点,关闭是异步的
  def stop(actor: ActorRef): Unit = {
    val path = actor.path
    val guard = guardian.path
    val sys = systemGuardian.path
    path.parent match {
      case `guard`  guardian ! StopChild(actor)
      case `sys`  systemGuardian ! StopChild(actor)
      case _  actor.asInstanceOf[InternalActorRef].stop()
    }
  }

  import settings._

  // 这提供了基本的日志记录(到stdout),直到在下面调用.start()为止
  val eventStream = new EventStream(this, DebugEventStream)
  eventStream.startStdoutLogger(settings)

  val logFilter: LoggingFilter = {
    val arguments = Vector(classOf[Settings]  settings, classOf[EventStream]  eventStream)
    dynamicAccess.createInstanceFor[LoggingFilter](LoggingFilter, arguments).get
  }

  private[this] val markerLogging = new MarkerLoggingAdapter(eventStream, getClass.getName + "(" + name + ")", this.getClass, logFilter)
  val log: LoggingAdapter = markerLogging

  val scheduler: Scheduler = createScheduler()

  val provider: ActorRefProvider = try {
    val arguments = Vector(
      classOf[String]  name,
      classOf[Settings]  settings,
      classOf[EventStream]  eventStream,
      classOf[DynamicAccess]  dynamicAccess)

    dynamicAccess.createInstanceFor[ActorRefProvider](ProviderClass, arguments).get
  } catch {
    case NonFatal(e) 
      Try(stopScheduler())
      throw e
  }

  //死信引用
  def deadLetters: ActorRef = provider.deadLetters

  //信箱
  val mailboxes: Mailboxes = new Mailboxes(settings, eventStream, dynamicAccess, deadLetters)

  //调度程序
  val dispatchers: Dispatchers = new Dispatchers(settings, DefaultDispatcherPrerequisites(
    threadFactory, eventStream, scheduler, dynamicAccess, settings, mailboxes, defaultExecutionContext))

  val dispatcher: ExecutionContextExecutor = dispatchers.defaultGlobalDispatcher

  val internalCallingThreadExecutionContext: ExecutionContext =
    dynamicAccess.getObjectFor[ExecutionContext]("scala.concurrent.Future$InternalCallbackExecutor$").getOrElse(
      new ExecutionContext with BatchingExecutor {
        override protected def unbatchedExecute(r: Runnable): Unit = r.run()

        override protected def resubmitOnBlock: Boolean = false // Since we execute inline, no gain in resubmitting
        override def reportFailure(t: Throwable): Unit = dispatcher reportFailure t
      })

  //关闭actor系统的回调
  private[this] final val terminationCallbacks = new TerminationCallbacks(provider.terminationFuture)(dispatcher)

  override def whenTerminated: Future[Terminated] = terminationCallbacks.terminationFuture

  override def getWhenTerminated: CompletionStage[Terminated] = FutureConverters.toJava(whenTerminated)

  //查找actor系统的顶级守护者(/root监护者)
  def lookupRoot: InternalActorRef = provider.rootGuardian

  //用户actor顶级的守护者(/user)
  def guardian: LocalActorRef = provider.guardian

  //系统actor的顶级守护者(/system)
  def systemGuardian: LocalActorRef = provider.systemGuardian

  //在/user下查询指定actor名称的actor的路径
  def /(actorName: String): ActorPath = guardian.path / actorName

  def /(path: Iterable[String]): ActorPath = guardian.path / path

  //检查版本用
  private def allModules: List[String] = List(
    "akka-actor",
    "akka-actor-testkit-typed",
    "akka-actor-typed",
    "akka-agent",
    "akka-camel",
    "akka-cluster",
    "akka-cluster-metrics",
    "akka-cluster-sharding",
    "akka-cluster-sharding-typed",
    "akka-cluster-tools",
    "akka-cluster-typed",
    "akka-discovery",
    "akka-distributed-data",
    "akka-multi-node-testkit",
    "akka-osgi",
    "akka-persistence",
    "akka-persistence-query",
    "akka-persistence-shared",
    "akka-persistence-typed",
    "akka-protobuf",
    "akka-remote",
    "akka-slf4j",
    "akka-stream",
    "akka-stream-testkit",
    "akka-stream-typed")

  @volatile private var _initialized = false

  /**
   * 断言ActorSystem已完全初始化
   */
  def assertInitialized(): Unit =
    if (!_initialized)
      throw new IllegalStateException(
        "The calling code expected that the ActorSystem was initialized but it wasn't yet. " +
          "This is probably a bug in the ActorSystem initialization sequence often related to initialization of extensions. " +
          "Please report at https://github.com/akka/akka/issues."
      )

  //启动,注册关闭回调任务,验证模块的版本,对事件系统,拓展和日志系统进行初始化
  private lazy val _start: this.type = try {

    registerOnTermination(stopScheduler())
    // the provider is expected to start default loggers, LocalActorRefProvider does this
    provider.init(this)
    // at this point it should be initialized "enough" for most extensions that we might want to guard against otherwise
    _initialized = true

    //配置死信个数时开启死信监听器,使用systemActorOf在system/节点上添加子actor
    if (settings.LogDeadLetters > 0)
      logDeadLetterListener = Some(systemActorOf(Props[DeadLetterListener], "deadLetterListener"))
    //必须在actor系统“就绪”后调用,Starts system actor that takes care of unsubscribing subscribers that have terminated.
    eventStream.startUnsubscriber()
    ManifestInfo(this).checkSameVersion("Akka", allModules, logWarning = true)
    //加载所有拓展
    loadExtensions()
    if (LogConfigOnStart) logConfiguration()
    this
  } catch {
    case NonFatal(e) 
      try terminate() catch {
        case NonFatal(_)  Try(stopScheduler())
      }
      throw e
  }

  def start(): this.type = _start

  //注册runnable任务,在关闭时执行回退 与下面不同的是,这个是传入处理任务的函数而不是Runnable对象
  def registerOnTermination[T](code:  T): Unit = {
    registerOnTermination(new Runnable {
      def run = code
    })
  }

  //注册runnable任务,在关闭时执行回退
  def registerOnTermination(code: Runnable): Unit = {
    terminationCallbacks.add(code)
  }

  override def terminate(): Future[Terminated] = {
    //在actor系统关闭时,不需要对死信记录则将死信监听器全部关闭
    if (!settings.LogDeadLettersDuringShutdown) logDeadLetterListener foreach stop
    //关闭监护人actor,ActorSystem
    guardian.stop()
    whenTerminated //获取terminationCallbacks的Future
  }

  @volatile var aborting = false

  /**
   * 这种关闭试图关闭系统并释放资源比普通关机更强大例如,它不会等待远程部署的子角色终止,然后终止其父角色
   */
  def abort(): Unit = {
    aborting = true
    terminate()
  }

  //#create-scheduler
  /**
   * Create the scheduler service. This one needs one special behavior: if
   * Closeable, it MUST execute all outstanding tasks upon .close() in order
   * to properly shutdown all dispatchers.
   *
   * Furthermore, this timer service MUST throw IllegalStateException if it
   * cannot schedule a task. Once scheduled, the task MUST be executed. If
   * executed upon close(), the task may execute before its timeout.
   */
  protected def createScheduler(): Scheduler =
    dynamicAccess.createInstanceFor[Scheduler](settings.SchedulerClass, immutable.Seq(
      classOf[Config]  settings.config,
      classOf[LoggingAdapter]  log,
      classOf[ThreadFactory]  threadFactory.withName(threadFactory.name + "-scheduler"))).get

  //#create-scheduler

  /*
   * This is called after the last actor has signaled its termination, i.e.
   * after the last dispatcher has had its chance to schedule its shutdown
   * action.
   */
  protected def stopScheduler(): Unit = scheduler match {
    case x: Closeable  x.close()
    case _ 
  }

  private val extensions = new ConcurrentHashMap[ExtensionId[_], AnyRef]

  /**
   * 返回注册到指定扩展的任何扩展;如果未注册,则返回null
   */
  @tailrec
  private def findExtension[T <: Extension](ext: ExtensionId[T]): T = extensions.get(ext) match {
    case c: CountDownLatch 
      c.await(); findExtension(ext) //正在注册,等待完成并重试
    case t: Throwable  throw t //初始化失败,再次抛出相同的
    case other 
      other.asInstanceOf[T] //可以是T或null,在这种情况下,我们将null返回为T
  }

  /**
   * 注册拓展
   */
  @tailrec
  final def registerExtension[T <: Extension](ext: ExtensionId[T]): T = {
    findExtension(ext) match {
      case null  //不存在,开始注册
        val inProcessOfRegistration = new CountDownLatch(1)
        extensions.putIfAbsent(ext, inProcessOfRegistration) match { //表示正在进行注册的信号
          case null  try { // 信号已成功发送
            ext.createExtension(this) match { // 创建并初始化扩展
              case null  throw new IllegalStateException(s"Extension instance created as 'null' for extension [$ext]")
              case instance 
                extensions.replace(ext, inProcessOfRegistration, instance) //用初始化的扩展名替换inProcess信号
                instance //Profit!
            }
          } catch {
            case t: Throwable 
              extensions.replace(ext, inProcessOfRegistration, t) //移除inProcess信号
              throw t //升级 向上抛出异常
          } finally {
            inProcessOfRegistration.countDown //始终将inProcess信号通知监听器
          }
          case _  registerExtension(ext) //其他人正在注册此扩展,请重试
        }
      case existing  existing.asInstanceOf[T]
    }
  }

  //获取拓展
  def extension[T <: Extension](ext: ExtensionId[T]): T = findExtension(ext) match {
    case null  throw new IllegalArgumentException(s"Trying to get non-registered extension [$ext]")
    case some  some.asInstanceOf[T]
  }

  def hasExtension(ext: ExtensionId[_ <: Extension]): Boolean = findExtension(ext) != null

  //加载拓展
  private def loadExtensions(): Unit = {
    /**
     * 加载扩展失败时引发异常(向后兼容需要)
     */
    def loadExtensions(key: String, throwOnLoadFail: Boolean): Unit = {
      immutableSeq(settings.config.getStringList(key)) foreach { fqcn 
        dynamicAccess.getObjectFor[AnyRef](fqcn) recoverWith { case _  dynamicAccess.createInstanceFor[AnyRef](fqcn, Nil) } match {
          case Success(p: ExtensionIdProvider)  registerExtension(p.lookup())
          case Success(p: ExtensionId[_])  registerExtension(p)
          case Success(_) 
            if (!throwOnLoadFail) log.error("[{}] is not an 'ExtensionIdProvider' or 'ExtensionId', skipping...", fqcn)
            else throw new RuntimeException(s"[$fqcn] is not an 'ExtensionIdProvider' or 'ExtensionId'")
          case Failure(problem) 
            if (!throwOnLoadFail) log.error(problem, "While trying to load extension [{}], skipping...", fqcn)
            else throw new RuntimeException(s"While trying to load extension [$fqcn]", problem)
        }
      }
    }

    //列出应在actor系统启动时加载的扩展的(Full Qualified Class Name)
    //库扩展是在启动时加载的常规扩展,并且
    //可供第三方库作者在类路径上显示扩展时启用自动加载
    loadExtensions("akka.library-extensions", throwOnLoadFail = true)
    //列出应在actor系统启动时加载的扩展的FQCN
    loadExtensions("akka.extensions", throwOnLoadFail = false)
  }

  override def toString: String = lookupRoot.path.root.address.toString

  //打印actor树
  override def printTree: String = {
    def printNode(node: ActorRef, indent: String): String = {
      node match {
        case wc: ActorRefWithCell 
          val cell = wc.underlying
          (if (indent.isEmpty) "-> " else indent.dropRight(1) + "⌊-> ") +
            node.path.name + " " + Logging.simpleName(node) + " " +
            (cell match {
              case real: ActorCell  if (real.actor ne null) real.actor.getClass else "null"
              case _  Logging.simpleName(cell)
            }) +
            (cell match {
              case real: ActorCell  " status=" + real.mailbox.currentStatus
              case _ ⇒ ""
            }) +
            " " + (cell.childrenRefs match {
            case ChildrenContainer.TerminatingChildrenContainer(_, toDie, reason) ⇒
              "Terminating(" + reason + ")" +
                (toDie.toSeq.sorted mkString("\n" + indent + "   |    toDie: ", "\n" + indent + "   |           ", ""))
            case x@(ChildrenContainer.TerminatedChildrenContainer | ChildrenContainer.EmptyChildrenContainer)  x.toString
            case n: ChildrenContainer.NormalChildrenContainer  n.c.size + " children"
            case x  Logging.simpleName(x)
          }) +
            (if (cell.childrenRefs.children.isEmpty) "" else "\n") +
            ({
              val children = cell.childrenRefs.children.toSeq.sorted
              val bulk = children.dropRight(1) map (printNode(_, indent + "   |"))
              bulk ++ (children.lastOption map (printNode(_, indent + "    ")))
            } mkString ("\n"))
        case _ 
          indent + node.path.name + " " + Logging.simpleName(node)
      }
    }

    printNode(lookupRoot, "")
  }

  //关闭时的回调实现
  final class TerminationCallbacks[T](upStreamTerminated: Future[T])(implicit ec: ExecutionContext) {
    private[this] final val done = Promise[T]()
    private[this] final val ref = new AtomicReference(done)

    //onComplete从不触发两次这样安全以避免空检查
    upStreamTerminated onComplete {
      //设置为null并返回旧值
      t  ref.getAndSet(null).complete(t)
    }

    /**
     * 添加将在ActorSystem终止时执行的任务注意,回调的执行顺序与插入顺序相反
     *
     * @param r 如果在ActorSystem终止后调用抛出RejectedExecutionException
     */
    final def add(r: Runnable): Unit = {
      @tailrec def addRec(r: Runnable, p: Promise[T]): Unit = ref.get match {
        case null  throw new RejectedExecutionException("ActorSystem already terminated.")
        case some if ref.compareAndSet(some, p)  some.completeWith(p.future.andThen { case _  r.run() })
        case _  addRec(r, p)
      }

      addRec(r, Promise[T]())
    }

    /**
     * 返回一个future,该future将在执行所有已注册的回调之后完成
     */
    def terminationFuture: Future[T] = done.future
  }

}

文档信息

Search

    Table of Contents