

17.1 领域对象模式


final case class ItemRef(id: URI)

final case class CustomerRef(id: URI)

final case class ShoppingCartRef(id: URI)

清单 17-1 一个最小化的购物车定义

final case class ShoppingCart(items: Map[ItemRef, Int], owner: Option[CustomerRef]) {
  def setOwner(customer: CustomerRef): ShoppingCart = {
    require(owner.isEmpty, "owner cannot be overwritten")
    copy(owner = Some(customer))

  def addItem(item: ItemRef, count: Int): ShoppingCart = {
    require(count > 0, s"count must be positive (trying to add $item with count $count)")
    val currentCount = items.getOrElse(item, 0)
    copy(items = items.updated(item, currentCount + count))

  def removeItem(item: ItemRef, count: Int): ShoppingCart = {
    require(count > 0, s"count must be positive (trying to remove $item with count $count)")
    val currentCount = items.getOrElse(item, 0)
    val newCount = currentCount - count
    if (newCount <= 0)
      copy(items = items - item)
      copy(items = items.updated(item, newCount))

  // 代码清单 17-6
  def applyEvent(event: Event): ShoppingCart = event match {
    case OwnerChanged(_, owner)      => setOwner(owner)
    case ItemAdded(_, item, count)   => addItem(item, count)
    case ItemRemoved(_, item, count) => removeItem(item, count)

  // 代码清单 17-6

object ShoppingCart {
  val empty = ShoppingCart(Map.empty, None)

清单 17-2 用于和购物车对象通信的消息

trait ShoppingCartMessage {
  def shoppingCart: ShoppingCartRef

sealed trait Command extends ShoppingCartMessage

final case class SetOwner(shoppingCart: ShoppingCartRef, owner: CustomerRef) extends Command

final case class AddItem(shoppingCart: ShoppingCartRef, item: ItemRef, count: Int) extends Command

final case class RemoveItem(shoppingCart: ShoppingCartRef, item: ItemRef, count: Int) extends Command

sealed trait Query extends ShoppingCartMessage

final case class GetItems(shoppingCart: ShoppingCartRef) extends Query

sealed trait Event extends ShoppingCartMessage

final case class OwnerChanged(shoppingCart: ShoppingCartRef, owner: CustomerRef) extends Event

final case class ItemAdded(shoppingCart: ShoppingCartRef, item: ItemRef, count: Int) extends Event

final case class ItemRemoved(shoppingCart: ShoppingCartRef, item: ItemRef, count: Int) extends Event

sealed trait Result extends ShoppingCartMessage

final case class GetItemsResult(shoppingCart: ShoppingCartRef, items: Map[ItemRef, Int]) extends Result

清单 17-3 一个购物车管理者 Actor

final case class ManagerCommand(cmd: Command, id: Long, replyTo: ActorRef)

final case class ManagerEvent(id: Long, event: Event)

final case class ManagerQuery(cmd: Query, id: Long, replyTo: ActorRef)

final case class ManagerResult(id: Long, result: Result)

final case class ManagerRejection(id: Long, reason: String)

class Manager(var shoppingCart: ShoppingCart) extends Actor {
   * this is the usual constructor, the above allows priming with
   * previously persisted state.
  def this() = this(ShoppingCart.empty)

  def receive: Receive = {
    case ManagerCommand(cmd, id, replyTo) =>
      try {
        val event = cmd match {
          case SetOwner(cart, owner) =>
            shoppingCart = shoppingCart.setOwner(owner)
            OwnerChanged(cart, owner)
          case AddItem(cart, item, count) =>
            shoppingCart = shoppingCart.addItem(item, count)
            ItemAdded(cart, item, count)
          case RemoveItem(cart, item, count) =>
            shoppingCart = shoppingCart.removeItem(item, count)
            ItemRemoved(cart, item, count)
        replyTo ! ManagerEvent(id, event)
      } catch {
        case ex: IllegalArgumentException =>
          replyTo ! ManagerRejection(id, ex.getMessage)
    case ManagerQuery(cmd, id, replyTo) =>
      try {
        val result = cmd match {
          case GetItems(cart) =>
            GetItemsResult(cart, shoppingCart.items)
        replyTo ! ManagerResult(id, result)
      } catch {
        case ex: IllegalArgumentException =>
          replyTo ! ManagerRejection(id, ex.getMessage)

17.2 分片模式

清单 17-4 为购物车定义分片算法

object ShardSupport {
   * use the shoppingCart reference as the sharding key; the partial function
   * must return both the key and the message to be forwarded, and if it does
   * not match then the message is dropped
  val extractEntityId: ShardRegion.ExtractEntityId = {
    case mc @ ManagerCommand(cmd, _, _) =>
      cmd.shoppingCart.id.toString -> mc
    case mc @ ManagerQuery(query, _, _) =>
      query.shoppingCart.id.toString -> mc

   * allocate shoppingCarts into 256 shards based on the low 8 bits of their
   * ID’s hash; this is a total function that must be defined for all messages
   * that are forwarded
  val extractShardId: ShardRegion.ExtractShardId = {
    case ManagerCommand(cmd, _, _) =>
      toHex(cmd.shoppingCart.id.hashCode & 255)
    case ManagerQuery(query, _, _) =>
      toHex(query.shoppingCart.id.hashCode & 255)

  private def toHex(b: Int) =
    new java.lang.StringBuilder(2).append(hexDigits(b >> 4)).append(hexDigits(b & 15)).toString

  private val hexDigits = "0123456789ABCDEF"

  val RegionName = "ShoppingCart"

清单 17-5 启动一个群集来托管分片

val sys1 = ActorSystem("ShardingExample", node1Config.withFallback(clusterConfig))
val seed = Cluster(sys1).selfAddress

def startNode(sys: ActorSystem): Unit = {
    typeName = ShardSupport.RegionName,
    entityProps = Props(new Manager),
    settings = ClusterShardingSettings(sys1),
    extractEntityId = ShardSupport.extractEntityId,
    extractShardId = ShardSupport.extractShardId)


val sys2 = ActorSystem("ShardingExample", clusterConfig)


val manager = ClusterSharding(sys1).shardRegion(ShardSupport.RegionName)

17.3 事件溯源模式

清单 17-6 将领域事件添加到业务逻辑

清单 17-7 持久化一个事件溯源领域对象

class PersistentObjectManager extends PersistentActor {
  // we expect the name to be the shopping card ID
  override def persistenceId: String = context.self.path.name

  private var shoppingCart: ShoppingCart = ShoppingCart.empty

  def receiveCommand: Receive = {
    case ManagerCommand(cmd, id, replyTo) =>
      try {
        val event = cmd match {
          case SetOwner(cart, owner)         => OwnerChanged(cart, owner)
          case AddItem(cart, item, count)    => ItemAdded(cart, item, count)
          case RemoveItem(cart, item, count) => ItemRemoved(cart, item, count)
        // perform the update here in order to treat validation errors immediately
        shoppingCart = shoppingCart.applyEvent(event)
        persist(event) { _ =>
          replyTo ! ManagerEvent(id, event)
      } catch {
        case ex: IllegalArgumentException =>
          replyTo ! ManagerRejection(id, ex.getMessage)
    case ManagerQuery(cmd, id, replyTo) =>
      try {
        val result = cmd match {
          case GetItems(cart) => GetItemsResult(cart, shoppingCart.items)
        replyTo ! ManagerResult(id, result)
      } catch {
        case ex: IllegalArgumentException =>
          replyTo ! ManagerRejection(id, ex.getMessage)

  def receiveRecover: Receive = {
    case e: Event => shoppingCart = shoppingCart.applyEvent(e)

17.4 事件流模式

清单 17-8 在写日志期间对事件打标签

class ShoppingCartTagging(system: ExtendedActorSystem) extends WriteEventAdapter {
  def manifest(event: Any): String = "" // no additional manifest needed

  def toJournal(event: Any): Any =
    event match {
      case s: ShoppingCartMessage => Tagged(event, Set("shoppingCart"))
      case other                  => other

清单 17-9 一个正在监听事件流的 Actor

object TopProductListener {

  private class IntHolder(var value: Int)


class TopProductListener extends Actor with ActorLogging {

  import TopProductListener._

  private implicit val materializer: ActorMaterializer = ActorMaterializer()

  private val readJournal: LeveldbReadJournal =

    .eventsByTag(tag = "shoppingCart", offset = Sequence(0L))
    .collect { case EventEnvelope(_, _, _, add: ItemAdded) => add }
    .groupedWithin(100000, 1.second)
    .runForeach { seq: Seq[ItemAdded] =>
      val histogram = seq.foldLeft(Map.empty[ItemRef, IntHolder]) { (map, event) =>
        map.get(event.item) match {
          case Some(holder) =>
            holder.value += event.count
          case None =>
            map.updated(event.item, new IntHolder(event.count))
      self ! TopProducts(0, histogram.map(p => (p._1, p._2.value)))

  private var topProducts = Map.empty[ItemRef, Int]

  def receive: Receive = {
    case GetTopProducts(id, replyTo) =>
      replyTo ! TopProducts(id, topProducts)
    case TopProducts(_, products) =>
      topProducts = products
      log.info("new {}", products)