Actor的信箱与使用

2019/10/24 AkkaActor 共 8240 字,约 24 分钟
梦境迷离

依赖

若要使用邮箱,必须在项目中添加下列依赖项:

libraryDependencies += "com.typesafe.akka" %% "akka-actor" % "2.5.26"

Akka的邮箱保存着发送给Actor的消息。通常,每个Actor都有自己的邮箱,但是使用BalancingPool,所有的路由器都将共享一个邮箱实例。 此处的邮箱与信箱意思等同,传递意思等同发送。

PS:利用邮箱缓存消息并非Actor模型自带的功能,而是Akka库的功能。

邮箱选择

Actor需要的消息队列类型

通过让某个Actor扩展参数化特征RequiresMessageQueue,可以为特定类型的Actor要求某种类型的消息队列。以下是一个例子:

import akka.dispatch.RequiresMessageQueue
import akka.dispatch.BoundedMessageQueueSemantics

class MyBoundedActor extends MyActor with RequiresMessageQueue[BoundedMessageQueueSemantics]

需要将RequiresMessageQueue特质的类型参数映射到配置中的邮箱,如下所示:

bounded-mailbox {
  mailbox-type = "akka.dispatch.NonBlockingBoundedMailbox"
  mailbox-capacity = 1000 
}

akka.actor.mailbox.requirements {
  "akka.dispatch.BoundedMessageQueueSemantics" = bounded-mailbox
}

现在,每次创建MyBoundedActor类型的Actor时,它都会尝试获得一个有界的邮箱。如果Actor在部署中配置了不同的邮箱(直接或通过具有指定邮箱类型的dispatcher),则将覆盖此映射配置。

为Actor创建的邮箱中的队列类型将根据该特质中的所需类型进行检查,如果队列没有实现所需的类型,则Actor的创建将失败。

调度器需要的消息队列类型

调度器还可能需要运行在其上的Actor使用的邮箱类型。一个例子是BalancingDispatcher,它需要一个对多个并发消费者来说是线程安全的消息队列。这样的要求是在dispatcher配置部分中配置的,如下所示:

my-dispatcher {
  mailbox-requirement = org.example.MyInterface //暂称requirement为“要求的或需要的邮箱类型”
}

给定的是一个类或接口的全限定名,然后确保它是消息队列的实现的超类型。如有冲突,例如,如果Actor需要一个不能满足此要求的邮箱类型,那么Actor的创建就会失败。

如何选择邮箱类型

当创建一个Actor时,ActorRefProvider首先确定将执行它的调度器,然后按以下方式确定邮箱:

  1. 如果Actor的部署配置部分包含mailbox key,那么指定的此配置部分将描述要使用的邮箱类型。
  2. 如果actor的Props包含邮箱选择(即在其上调用了WithMailbox),那么指定的此配置部分将描述要使用的邮箱类型(请注意,这需要是一个绝对配置路径,例如myapp.Special-mailbox,而不是嵌套在Akka命名空间中的相对路径)。
  3. 如果dispatcher的配置部分包含mailbox-type key,则将使用相同部分来配置邮箱类型。
  4. 如果Actor需要如上所述的邮箱类型,则将使用该要求的映射来确定要使用的邮箱类型;如果失败则将尝试dispatcher的(如果有的话)。
  5. 如果dispatcher需要如上所述的邮箱类型,则将使用该要求的映射来确定要使用的邮箱类型。
  6. 默认邮箱akka.actor.default-mailbox将被使用。

此处有点难以理解可能有偏差,可以参考英文理解,官网文档。我理解这里是一个优先级关系,但可能描述不太好理解,这里“该要求的映射应该是指配置中mailbox-requirement的值”。

默认邮箱

如果未如上所述指定需要的邮箱,则使用默认邮箱。默认情况下,它是一个无边界邮箱,由java.util.concurrent.concurrentlinkedqueue支持。

SingleConsumerOnlyUndeddedMailbox是一个更高效的邮箱,它可以用作默认邮箱,但不能与BalancingDispatcher一起使用。

将SingleConsumerOnlyUnderdedMailbox配置为默认邮箱:

akka.actor.default-mailbox {
  mailbox-type = "akka.dispatch.SingleConsumerOnlyUnboundedMailbox"
}

将哪些配置传递给邮箱类型

每个邮箱类型都由一个类实现,该类扩展了MailboxType,并接受两个构造函数参数:ActorSystem.Settings对象和Config部分。后者是通过从Actor系统的配置中获取命名的配置部分,用邮箱类型的配置路径覆盖其id key并向默认邮箱配置部分添加fall-back来实现的

内建邮箱的实现

Akka附带了许多邮箱实现,如下所示:

这些邮箱几乎都是无界且非阻塞的。

  • UnboundedMailbox(默认)
    • 默认邮箱
    • 实现:基于java.util.concurrent.ConcurrentLinkedQueue
    • 阻塞:否
    • 有界性:否
    • 配置名称:unbounded 或 akka.dispatch.UnboundedMailbox
  • SingleConsumerOnUnedMailbox
    • 这个队列可能比默认队列快,也可能不快,这取决于您的用例(一定要正确地进行基准测试)
    • 实现:基于多生产者单消费者的队列,不能与BalancingDispatcher同时使用
    • 阻塞:否
    • 有界性:否
    • 配置名称:akka.dispatch.SingleConsumerOnlyUnboundedMailbox
  • NonBlockingBoundedMailbox
    • 实现:基于一种高效的多生产者单消费者队列
    • 阻塞:否,将溢出的消息丢弃到死信中
    • 有界性:是
    • 配置名称:akka.dispatch.NonBlockingBoundedMailbox
  • UnboundedControlAwareMailbox
    • 传递具有更高优先级的继承自akka.dispatch.ControlMessage的消息
    • 实现:基于两个java.util.concurrent.ConcurrentLinkedQueue
    • 阻塞:否
    • 有界性:否
    • 配置名称:akka.dispatch.UnboundedControlAwareMailbox
  • UnboundedPriorityMailbox
    • 相同优先级消息的传递顺序是未定义的(与StablePriorityMailbox形成对比)
    • 实现:基于java.util.concurrent.PriorityBlockingQueue
    • 阻塞:否
    • 有界性:否
    • 配置名称:akka.dispatch.UnboundedPriorityMailbox
  • UnboundedStablePriorityMailbox
    • 对于优先级相同的消息保留FIFO顺序(与UnboundedPriorityMailbox形成对比)
    • 实现:基于akka.util.PriorityQueueStabilizer(封装了java.util.concurrent.PriorityBlockingQueue)
    • 阻塞:否
    • 有界性:否
    • 配置名称:akka.dispatch.UnboundedStablePriorityMailbox

当达到容量并配置的mailbox-push-timeout-time为非0,其他有界限的邮箱实现将阻止发送者。

以下邮箱只应与mailbox-push-timeout-time为0的一起使用,因为当mailbox-push-timeout-time非0时,下面所有的信箱都是阻塞的,反应式不推荐使用阻塞,万不得已也应当隔离出阻塞操作到独立的调度线程。

  • BoundedMailbox
    • 实现:基于java.util.concurrent.LinkedBlockingQueue
    • 阻塞:若与mailbox-push-timeout-time不为0时一起使用则是,否则不是
    • 有界性:是
    • 配置名称:bounded 或 akka.dispatch.BoundedMailbox
  • BoundedPriorityMailbox
    • 实现:基于akka.util.BoundedBlockingQueue(封装了java.util.PriorityQueue)
    • 相同优先级消息的传递顺序是未定义的(与BoundedStablePriorityMailbox形成对比)
    • 阻塞:若与mailbox-push-timeout-time不为0时一起使用则是,否则不是
    • 有界性:是
    • 配置名称:akka.dispatch.BoundedPriorityMailbox
  • BoundedStablePriorityMailbox
    • 实现:基于akka.util.PriorityQueueStabilizer(封装了java.util.PriorityQueue)
    • 阻塞:若与mailbox-push-timeout-time不为0时一起使用则是,否则不是
    • 有界性:是
    • 配置名称:akka.dispatch.BoundedStablePriorityMailbox
  • BoundedControlAwareMailbox
    • 传递具有更高优先级的继承自akka.dispatch.ControlMessage的消息
    • 实现:基于两个java.util.concurrent.ConcurrentLinkedQueue,如果已达到容量,则阻塞队列。
    • 阻塞:若与mailbox-push-timeout-time不为0时一起使用则是,否则不是
    • 有界性:是
    • 配置名称:akka.dispatch.BoundedControlAwareMailbox

邮箱配置示例

优先级邮箱

如何创建PriorityMailbox:

import akka.dispatch.PriorityGenerator
import akka.dispatch.UnboundedStablePriorityMailbox
import com.typesafe.config.Config

// We inherit, in this case, from UnboundedStablePriorityMailbox
// and seed it with the priority generator
class MyPrioMailbox(settings: ActorSystem.Settings, config: Config)
    extends UnboundedStablePriorityMailbox(
      // Create a new PriorityGenerator, lower prio means more important
      PriorityGenerator {
        //如果可能的话,应该首先处理高优先级的消息
        case 'highpriority => 0

        //如果可能的话,应该最后处理低优先级的消息
        case 'lowpriority => 2

        // PoisonPill when no other left
        case PoisonPill => 3

        // We default to 1, which is in between high and low
        case otherwise => 1
      }

然后将其添加到配置中:

prio-dispatcher {
  mailbox-type = "docs.dispatcher.DispatcherDocSpec$MyPrioMailbox"
  //Other dispatcher configuration goes here
}

然后是一个如何使用它的例子:

// We create a new Actor that just prints out what it processes
class Logger extends Actor {
  val log: LoggingAdapter = Logging(context.system, this)

  self ! 'lowpriority
  self ! 'lowpriority
  self ! 'highpriority
  self ! 'pigdog
  self ! 'pigdog2
  self ! 'pigdog3
  self ! 'highpriority
  self ! PoisonPill

  def receive = {
    case x => log.info(x.toString)
  }
}
val a = system.actorOf(Props(classOf[Logger], this).withDispatcher("prio-dispatcher"))

/*
 * Logs:
 * 'highpriority
 * 'highpriority
 * 'pigdog
 * 'pigdog2
 * 'pigdog3
 * 'lowpriority
 * 'lowpriority
 */

还可以像这样直接配置邮箱类型(这是顶级配置条目):

prio-mailbox {
  mailbox-type = "docs.dispatcher.DispatcherDocSpec$MyPrioMailbox"
  //Other mailbox configuration goes here
}

akka.actor.deployment {
  /priomailboxactor {
    mailbox = prio-mailbox
  }
}

然后在部署中使用它,如下所示:

import akka.actor.Props
val myActor = context.actorOf(Props[MyActor], "priomailboxactor")

或者这样的代码:

import akka.actor.Props
val myActor = context.actorOf(Props[MyActor].withMailbox("prio-mailbox"))

控制消息

如果一个Actor需要能够立即接收控制消息,那么ControlAwareMailbox就会非常有用,而不管它的邮箱中已经有多少其他消息。如前介绍,这个消息类型优先级更高。

它可以配置如下:

control-aware-dispatcher {
  mailbox-type = "akka.dispatch.UnboundedControlAwareMailbox"
  //Other dispatcher configuration goes here
}

控制消息需要扩展ControlMessage特质:

import akka.dispatch.ControlMessage

case object MyControlMessage extends ControlMessage

然后是一个如何使用它的例子:

// We create a new Actor that just prints out what it processes
class Logger extends Actor {
  val log: LoggingAdapter = Logging(context.system, this)

  self ! 'foo
  self ! 'bar
  self ! MyControlMessage
  self ! PoisonPill

  def receive = {
    case x => log.info(x.toString)
  }
}
val a = system.actorOf(Props(classOf[Logger], this).withDispatcher("control-aware-dispatcher"))

/*
 * Logs:
 * MyControlMessage
 * 'foo
 * 'bar
 */

创建自己的邮箱类型

// Marker trait used for mailbox requirements mapping
trait MyUnboundedMessageQueueSemantics
boundedMailbox extends MailboxType with ProducesMessageQueue[MyUnboundedMailbox.MyMessageQueue] {

  import MyUnboundedMailbox._

  // 此构造函数签名必须存在,它将由Akka调用。
  def this(settings: ActorSystem.Settings, config: Config) = {
    // 将你的初始化代码放在这里
    this()
  }

  // 调用create方法来创建MessageQueue
  final override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue =
    new MyMessageQueue()
}

然后将MailboxType的FQCN指定为dispatcher配置或邮箱配置中“mailbox-type”的值。

确保包含一个构造函数,该构造函数需要akka.actor.ActorSystem.Settings和com.ypesafe.config.Config参数,因为此构造函数是以反射方式调用来构造你的邮箱类型的。作为第二个参数传入的配置是配置中使用此邮箱类型描述dispatcher或邮箱设置的部分;将对使用该配置类型的每个dispatcher或邮箱设置实例化邮箱类型一次。

您还可以使用邮箱作为调度器的要求(requirement ),如下所示:

custom-dispatcher {
  mailbox-requirement =
  "jdocs.dispatcher.MyUnboundedMessageQueueSemantics"
}

akka.actor.mailbox.requirements {
  "jdocs.dispatcher.MyUnboundedMessageQueueSemantics" =
  custom-dispatcher-mailbox
}

custom-dispatcher-mailbox {
  mailbox-type = "jdocs.dispatcher.MyUnboundedMailbox"
}

或者像这样定义Actor类的要求:

class MySpecialActor extends Actor with RequiresMessageQueue[MyUnboundedMessageQueueSemantics] {
  // ...
}

system.actorOf的特殊语义

为了使system.actorOf同步和非阻塞,同时保持返回类型ActorRef(以及返回的ref全部功能的语义),对这种情况进行特殊处理。 在幕后,构造了一种无意义的Actor引用,它被发送给系统的监护人Actor,实际上是监护人Actor创建了该Actor及其上下文,并将这些内容放入到该Actor的引用中。在此之前,发送给ActorRef的消息将在本地排队,只有在交换了真正的填充后,它们才会被传输到真正的邮箱中。因此,

val props: Props = ...
//此actor使用MyCustomMailbox,假定它是一个单例
system.actorOf(props.withDispatcher("myCustomMailbox")) ! "bang"
assert(MyCustomMailbox.instance.getLastEnqueuedMessage == "bang")

可能会失败;您必须留出一些时间来通过并重试检查

  • 使用搜狗翻译、百度翻译、谷歌翻译,仅供参考
  • 来自官方文档、参考《响应式架构 消息模式Actor实现与Scala、Akka应用集成》
  • 后续随着理解深入会继续修改错误和描述,以便更好理解,本博客开源,欢迎指出错误

文档信息

Search

    Table of Contents