第17章——状态管理和持久化模式

前一章介绍了消息速率、负载和时间等概念;在那之前,我们只考虑过不同组件之间与时间无关的关系。这一章将增加另一个正交的维度来补全整幅图景:维护状态几乎是所有组件的共同目的,而我们尚未讨论应当如何做到这一点。这里介绍的模式彼此密切相关,共同构成了一个有机整体。

17.1 领域对象模式

DomainObject.scala

/** Reference to an item that can be placed in a shopping cart, identified by a URI. */
final case class ItemRef(id: URI)

/** Reference to a customer (a potential shopping cart owner), identified by a URI. */
final case class CustomerRef(id: URI)

/** Reference to a shopping cart, identified by a URI. */
final case class ShoppingCartRef(id: URI)

清单 17-1 一个最小化的购物车定义

/**
 * Immutable shopping cart domain object.
 *
 * Every operation validates its arguments with `require` (throwing
 * `IllegalArgumentException` on violation) and returns a new cart; the
 * receiver itself is never modified.
 *
 * @param items the quantity of each item currently in the cart
 * @param owner the customer owning the cart, if one has been set
 */
final case class ShoppingCart(items: Map[ItemRef, Int], owner: Option[CustomerRef]) {

  /** Assigns the owner; fails if an owner has already been set. */
  def setOwner(customer: CustomerRef): ShoppingCart = {
    require(owner.isEmpty, "owner cannot be overwritten")
    copy(owner = Some(customer))
  }

  /** Adds `count` units of `item` on top of whatever quantity is already present. */
  def addItem(item: ItemRef, count: Int): ShoppingCart = {
    require(count > 0, s"count must be positive (trying to add $item with count $count)")
    val total = items.getOrElse(item, 0) + count
    copy(items = items + (item -> total))
  }

  /**
   * Removes `count` units of `item`; the entry is dropped entirely once its
   * quantity would reach zero or below.
   */
  def removeItem(item: ItemRef, count: Int): ShoppingCart = {
    require(count > 0, s"count must be positive (trying to remove $item with count $count)")
    val remaining = items.getOrElse(item, 0) - count
    val nextItems =
      if (remaining > 0) items.updated(item, remaining)
      else items - item
    copy(items = nextItems)
  }

  // Listing 17-6
  /** Applies a domain event by delegating to the corresponding cart operation. */
  def applyEvent(event: Event): ShoppingCart =
    event match {
      case changed: OwnerChanged => setOwner(changed.owner)
      case added: ItemAdded      => addItem(added.item, added.count)
      case removed: ItemRemoved  => removeItem(removed.item, removed.count)
    }

  // Listing 17-6
}

object ShoppingCart {
  /** A cart that holds no items and has no owner. */
  val empty: ShoppingCart = ShoppingCart(items = Map.empty, owner = None)
}

清单 17-2 用于和购物车对象通信的消息

/** Common base of all shopping cart messages: each one names the cart it concerns. */
trait ShoppingCartMessage {
  /** The shopping cart this message refers to. */
  def shoppingCart: ShoppingCartRef
}

/** Messages that request a change to a shopping cart. */
sealed trait Command extends ShoppingCartMessage

/** Requests that `owner` be set as the owner of the cart. */
final case class SetOwner(shoppingCart: ShoppingCartRef, owner: CustomerRef) extends Command

/** Requests that `count` units of `item` be added to the cart. */
final case class AddItem(shoppingCart: ShoppingCartRef, item: ItemRef, count: Int) extends Command

/** Requests that `count` units of `item` be removed from the cart. */
final case class RemoveItem(shoppingCart: ShoppingCartRef, item: ItemRef, count: Int) extends Command

/** Messages that read from a shopping cart. */
sealed trait Query extends ShoppingCartMessage

/** Asks for the items of the cart. */
final case class GetItems(shoppingCart: ShoppingCartRef) extends Query

/** Messages describing a change to a shopping cart; one event type exists per command type. */
sealed trait Event extends ShoppingCartMessage

/** Event counterpart of [[SetOwner]]. */
final case class OwnerChanged(shoppingCart: ShoppingCartRef, owner: CustomerRef) extends Event

/** Event counterpart of [[AddItem]]. */
final case class ItemAdded(shoppingCart: ShoppingCartRef, item: ItemRef, count: Int) extends Event

/** Event counterpart of [[RemoveItem]]. */
final case class ItemRemoved(shoppingCart: ShoppingCartRef, item: ItemRef, count: Int) extends Event

/** Replies to queries. */
sealed trait Result extends ShoppingCartMessage

/** Reply to [[GetItems]], carrying the quantity per item. */
final case class GetItemsResult(shoppingCart: ShoppingCartRef, items: Map[ItemRef, Int]) extends Result

清单 17-3 一个购物车管理者 Actor

/** Envelope for a [[Command]]; `id` is echoed in the reply that is sent to `replyTo`. */
final case class ManagerCommand(cmd: Command, id: Long, replyTo: ActorRef)

/** Reply to a [[ManagerCommand]] carrying the resulting event and the request's `id`. */
final case class ManagerEvent(id: Long, event: Event)

/** Envelope for a [[Query]]; `id` is echoed in the reply that is sent to `replyTo`. */
final case class ManagerQuery(cmd: Query, id: Long, replyTo: ActorRef)

/** Reply to a [[ManagerQuery]] carrying the query result and the request's `id`. */
final case class ManagerResult(id: Long, result: Result)

/** Reply sent when a request fails validation; `reason` is the validation error message. */
final case class ManagerRejection(id: Long, reason: String)

/**
 * Actor that owns a single [[ShoppingCart]] and exposes it through messages.
 *
 * A `ManagerCommand` updates the cart and is answered with a `ManagerEvent`;
 * a `ManagerQuery` is answered with a `ManagerResult`. When the domain object
 * rejects the request with an `IllegalArgumentException`, the cart is left
 * unchanged and a `ManagerRejection` carrying the message is sent instead.
 *
 * @param shoppingCart initial state; passing a non-empty cart allows priming
 *                     the actor with previously persisted state
 */
class Manager(var shoppingCart: ShoppingCart) extends Actor {

  /* The usual constructor: start out with an empty cart. */
  def this() = this(ShoppingCart.empty)

  def receive: Receive = {
    case ManagerCommand(cmd, id, replyTo) =>
      try {
        // compute the new state together with the event describing the change;
        // validation failures surface here, before any state is assigned
        val (nextCart, event) = cmd match {
          case SetOwner(cart, owner) =>
            (shoppingCart.setOwner(owner), OwnerChanged(cart, owner))
          case AddItem(cart, item, count) =>
            (shoppingCart.addItem(item, count), ItemAdded(cart, item, count))
          case RemoveItem(cart, item, count) =>
            (shoppingCart.removeItem(item, count), ItemRemoved(cart, item, count))
        }
        shoppingCart = nextCart
        replyTo ! ManagerEvent(id, event)
      } catch {
        case ex: IllegalArgumentException =>
          replyTo ! ManagerRejection(id, ex.getMessage)
      }

    case ManagerQuery(query, id, replyTo) =>
      try {
        val answer = query match {
          case GetItems(cart) => GetItemsResult(cart, shoppingCart.items)
        }
        replyTo ! ManagerResult(id, answer)
      } catch {
        case ex: IllegalArgumentException =>
          replyTo ! ManagerRejection(id, ex.getMessage)
      }
  }
}

17.2 分片模式

清单 17-4 为购物车定义分片算法

/** Functions and constants needed to shard shopping cart managers across a cluster. */
object ShardSupport {

  val RegionName = "ShoppingCart"

  private val hexDigits = "0123456789ABCDEF"

  /*
   * The shopping cart reference is the sharding key. The partial function
   * yields both the key and the message to forward; a message it does not
   * match is dropped.
   */
  val extractEntityId: ShardRegion.ExtractEntityId = {
    case command @ ManagerCommand(cmd, _, _) =>
      (cmd.shoppingCart.id.toString, command)
    case request @ ManagerQuery(query, _, _) =>
      (query.shoppingCart.id.toString, request)
  }

  /*
   * Shopping carts are spread over 256 shards, selected by the low 8 bits of
   * the hash of their ID. This must be a total function over all messages
   * that are forwarded.
   */
  val extractShardId: ShardRegion.ExtractShardId = {
    case ManagerCommand(cmd, _, _) => shardOf(cmd.shoppingCart.id.hashCode)
    case ManagerQuery(query, _, _) => shardOf(query.shoppingCart.id.hashCode)
  }

  /* Names the shard by the low 8 bits of the given hash. */
  private def shardOf(hash: Int) = toHex(hash & 255)

  /* Renders the high and low nibble of `b` as two upper-case hex digits. */
  private def toHex(b: Int) = {
    val high = hexDigits.charAt(b >> 4)
    val low = hexDigits.charAt(b & 15)
    s"$high$low"
  }
}

清单 17-5 启动一个集群来托管分片

// First actor system: node1Config takes precedence, clusterConfig fills in the rest.
val sys1 = ActorSystem("ShardingExample", node1Config.withFallback(clusterConfig))
// The address of the first system is the seed that every node joins (see startNode).
val seed = Cluster(sys1).selfAddress

/**
 * Joins the given actor system to the cluster through the seed address and
 * starts the shopping cart shard region on it, with [[Manager]] actors as
 * the sharded entities.
 *
 * @param sys the actor system (cluster node) that should host shopping cart shards
 */
def startNode(sys: ActorSystem): Unit = {
  Cluster(sys).join(seed)
  ClusterSharding(sys).start(
    typeName = ShardSupport.RegionName,
    entityProps = Props(new Manager),
    // take the sharding settings from the node being started rather than
    // always from the first system
    settings = ClusterShardingSettings(sys),
    extractEntityId = ShardSupport.extractEntityId,
    extractShardId = ShardSupport.extractShardId)
}

// Start sharding on the first node; it joins itself since it provides the seed address.
startNode(sys1)

// Second node: same actor system name, configured with clusterConfig only.
val sys2 = ActorSystem("ShardingExample", clusterConfig)
startNode(sys2)

Sharding.scala

// Look up the shopping cart shard region on the first node.
val manager = ClusterSharding(sys1).shardRegion(ShardSupport.RegionName)

17.3 事件溯源模式

清单 17-6 将领域事件添加到业务逻辑

/**
 * Immutable shopping cart domain object.
 *
 * Every operation validates its arguments with `require` (throwing
 * `IllegalArgumentException` on violation) and returns a new cart; the
 * receiver itself is never modified.
 *
 * @param items the quantity of each item currently in the cart
 * @param owner the customer owning the cart, if one has been set
 */
final case class ShoppingCart(items: Map[ItemRef, Int], owner: Option[CustomerRef]) {

  /** Assigns the owner; fails if an owner has already been set. */
  def setOwner(customer: CustomerRef): ShoppingCart = {
    require(owner.isEmpty, "owner cannot be overwritten")
    copy(owner = Some(customer))
  }

  /** Adds `count` units of `item` on top of whatever quantity is already present. */
  def addItem(item: ItemRef, count: Int): ShoppingCart = {
    require(count > 0, s"count must be positive (trying to add $item with count $count)")
    val total = items.getOrElse(item, 0) + count
    copy(items = items + (item -> total))
  }

  /**
   * Removes `count` units of `item`; the entry is dropped entirely once its
   * quantity would reach zero or below.
   */
  def removeItem(item: ItemRef, count: Int): ShoppingCart = {
    require(count > 0, s"count must be positive (trying to remove $item with count $count)")
    val remaining = items.getOrElse(item, 0) - count
    val nextItems =
      if (remaining > 0) items.updated(item, remaining)
      else items - item
    copy(items = nextItems)
  }

  // Listing 17-6
  /** Applies a domain event by delegating to the corresponding cart operation. */
  def applyEvent(event: Event): ShoppingCart =
    event match {
      case changed: OwnerChanged => setOwner(changed.owner)
      case added: ItemAdded      => addItem(added.item, added.count)
      case removed: ItemRemoved  => removeItem(removed.item, removed.count)
    }

  // Listing 17-6
}

object ShoppingCart {
  /** A cart that holds no items and has no owner. */
  val empty: ShoppingCart = ShoppingCart(items = Map.empty, owner = None)
}

清单 17-7 持久化一个事件溯源领域对象

/**
 * Event-sourced variant of the shopping cart manager.
 *
 * Each command is translated into its domain event, the event is applied to
 * the in-memory cart (which validates it), and only then is it persisted;
 * the `ManagerEvent` reply is sent from the persist callback. Validation
 * failures are answered with a `ManagerRejection` and nothing is persisted.
 * During recovery the journaled events are applied to rebuild the cart.
 */
class PersistentObjectManager extends PersistentActor {
  // the actor's name is expected to be the shopping cart ID
  override def persistenceId: String = context.self.path.name

  private var shoppingCart: ShoppingCart = ShoppingCart.empty

  def receiveCommand: Receive = {
    case ManagerCommand(cmd, id, replyTo) =>
      try {
        val domainEvent = cmd match {
          case c: SetOwner   => OwnerChanged(c.shoppingCart, c.owner)
          case c: AddItem    => ItemAdded(c.shoppingCart, c.item, c.count)
          case c: RemoveItem => ItemRemoved(c.shoppingCart, c.item, c.count)
        }
        // update the state before persisting so that validation errors are
        // reported immediately
        shoppingCart = shoppingCart.applyEvent(domainEvent)
        persist(domainEvent) { _ =>
          replyTo ! ManagerEvent(id, domainEvent)
        }
      } catch {
        case ex: IllegalArgumentException =>
          replyTo ! ManagerRejection(id, ex.getMessage)
      }

    case ManagerQuery(query, id, replyTo) =>
      try {
        val answer = query match {
          case GetItems(cart) => GetItemsResult(cart, shoppingCart.items)
        }
        replyTo ! ManagerResult(id, answer)
      } catch {
        case ex: IllegalArgumentException =>
          replyTo ! ManagerRejection(id, ex.getMessage)
      }
  }

  def receiveRecover: Receive = {
    case replayed: Event =>
      shoppingCart = shoppingCart.applyEvent(replayed)
  }
}

17.4 事件流模式

清单 17-8 在写日志期间对事件打标签

/**
 * Write-side event adapter that wraps every [[ShoppingCartMessage]] in a
 * `Tagged` envelope carrying the tag "shoppingCart"; anything else is passed
 * to the journal unchanged.
 */
class ShoppingCartTagging(system: ExtendedActorSystem) extends WriteEventAdapter {

  // no additional manifest needed
  def manifest(event: Any): String = ""

  def toJournal(event: Any): Any = event match {
    case _: ShoppingCartMessage => Tagged(event, Set("shoppingCart"))
    case _                      => event
  }
}

清单 17-9 一个正在监听事件流的 Actor

object TopProductListener {

  // Mutable counter cell, used by the listener to sum item counts in place.
  private class IntHolder(var value: Int)

}

/**
 * Actor that follows the stream of events tagged "shoppingCart" and keeps a
 * per-item count of the `ItemAdded` events seen in the most recent batch.
 *
 * The stream is started when the actor is constructed. Each batch is reduced
 * to a histogram and sent to the actor itself as a `TopProducts` message, so
 * that `topProducts` is only ever assigned inside `receive`.
 */
class TopProductListener extends Actor with ActorLogging {

  import TopProductListener._

  // Materializer used to run the stream below; created from within the actor.
  private implicit val materializer: ActorMaterializer = ActorMaterializer()

  private val readJournal: LeveldbReadJournal =
    PersistenceQuery(context.system).readJournalFor[LeveldbReadJournal](LeveldbReadJournal.Identifier)

  /*
   * Pipeline: read all events tagged "shoppingCart" starting at sequence
   * offset 0, keep only the ItemAdded payloads, group them into batches of
   * at most 100000 events or one second (whichever comes first), and build a
   * histogram per batch. The value returned by runForeach is discarded.
   */
  readJournal
    .eventsByTag(tag = "shoppingCart", offset = Sequence(0L))
    .collect { case EventEnvelope(_, _, _, add: ItemAdded) => add }
    .groupedWithin(100000, 1.second)
    .addAttributes(Attributes.asyncBoundary)
    .runForeach { seq: Seq[ItemAdded] =>
      // Sum the counts per item; an item already in the map has its holder
      // incremented in place, so the map is only extended for new items
      // (presumably to limit allocations on large batches).
      val histogram = seq.foldLeft(Map.empty[ItemRef, IntHolder]) { (map, event) =>
        map.get(event.item) match {
          case Some(holder) =>
            holder.value += event.count
            map
          case None =>
            map.updated(event.item, new IntHolder(event.count))
        }
      }
      // Hand the result to the actor as a message; the id 0 is a placeholder
      // because the receive handler ignores the id of TopProducts.
      self ! TopProducts(0, histogram.map(p => (p._1, p._2.value)))
    }

  // Histogram of the most recently received batch; replaced wholesale on each update.
  private var topProducts = Map.empty[ItemRef, Int]

  def receive: Receive = {
    // Query: reply with the current histogram, echoing the request id.
    case GetTopProducts(id, replyTo) =>
      replyTo ! TopProducts(id, topProducts)
    // Update from the stream: replace the histogram and log it.
    case TopProducts(_, products) =>
      topProducts = products
      log.info("new {}", products)
  }
}