第17章——状态管理和持久化模式
前一章介绍了消息速率、负载和时间的各种概念;我们之前只考虑了不同组件之间和时间无关的关联。这一章增加了另一个正交维度来完成这整个版图:维持状态几乎是所有组件的共同目的,而我们还没有讨论应该怎么做到这一点。这里所呈现的模式彼此密切相关,并形成了一个有机整体。
17.1 领域对象模式
DomainObject.scala
final case class ItemRef(id: URI)
final case class CustomerRef(id: URI)
final case class ShoppingCartRef(id: URI)
清单 17-1 一个最小化的购物车定义
final case class ShoppingCart(items: Map[ItemRef, Int], owner: Option[CustomerRef]) {
def setOwner(customer: CustomerRef): ShoppingCart = {
require(owner.isEmpty, "owner cannot be overwritten")
copy(owner = Some(customer))
}
def addItem(item: ItemRef, count: Int): ShoppingCart = {
require(count > 0, s"count must be positive (trying to add $item with count $count)")
val currentCount = items.getOrElse(item, 0)
copy(items = items.updated(item, currentCount + count))
}
def removeItem(item: ItemRef, count: Int): ShoppingCart = {
require(count > 0, s"count must be positive (trying to remove $item with count $count)")
val currentCount = items.getOrElse(item, 0)
val newCount = currentCount - count
if (newCount <= 0)
copy(items = items - item)
else
copy(items = items.updated(item, newCount))
}
// 代码清单 17-6
def applyEvent(event: Event): ShoppingCart = event match {
case OwnerChanged(_, owner) => setOwner(owner)
case ItemAdded(_, item, count) => addItem(item, count)
case ItemRemoved(_, item, count) => removeItem(item, count)
}
// 代码清单 17-6
}
object ShoppingCart {
val empty = ShoppingCart(Map.empty, None)
}
清单 17-2 用于和购物车对象通信的消息
trait ShoppingCartMessage {
def shoppingCart: ShoppingCartRef
}
sealed trait Command extends ShoppingCartMessage
final case class SetOwner(shoppingCart: ShoppingCartRef, owner: CustomerRef) extends Command
final case class AddItem(shoppingCart: ShoppingCartRef, item: ItemRef, count: Int) extends Command
final case class RemoveItem(shoppingCart: ShoppingCartRef, item: ItemRef, count: Int) extends Command
sealed trait Query extends ShoppingCartMessage
final case class GetItems(shoppingCart: ShoppingCartRef) extends Query
sealed trait Event extends ShoppingCartMessage
final case class OwnerChanged(shoppingCart: ShoppingCartRef, owner: CustomerRef) extends Event
final case class ItemAdded(shoppingCart: ShoppingCartRef, item: ItemRef, count: Int) extends Event
final case class ItemRemoved(shoppingCart: ShoppingCartRef, item: ItemRef, count: Int) extends Event
sealed trait Result extends ShoppingCartMessage
final case class GetItemsResult(shoppingCart: ShoppingCartRef, items: Map[ItemRef, Int]) extends Result
清单 17-3 一个购物车管理者 Actor
final case class ManagerCommand(cmd: Command, id: Long, replyTo: ActorRef)
final case class ManagerEvent(id: Long, event: Event)
final case class ManagerQuery(cmd: Query, id: Long, replyTo: ActorRef)
final case class ManagerResult(id: Long, result: Result)
final case class ManagerRejection(id: Long, reason: String)
class Manager(var shoppingCart: ShoppingCart) extends Actor {
/*
* this is the usual constructor, the above allows priming with
* previously persisted state.
*/
def this() = this(ShoppingCart.empty)
def receive: Receive = {
case ManagerCommand(cmd, id, replyTo) =>
try {
val event = cmd match {
case SetOwner(cart, owner) =>
shoppingCart = shoppingCart.setOwner(owner)
OwnerChanged(cart, owner)
case AddItem(cart, item, count) =>
shoppingCart = shoppingCart.addItem(item, count)
ItemAdded(cart, item, count)
case RemoveItem(cart, item, count) =>
shoppingCart = shoppingCart.removeItem(item, count)
ItemRemoved(cart, item, count)
}
replyTo ! ManagerEvent(id, event)
} catch {
case ex: IllegalArgumentException =>
replyTo ! ManagerRejection(id, ex.getMessage)
}
case ManagerQuery(cmd, id, replyTo) =>
try {
val result = cmd match {
case GetItems(cart) =>
GetItemsResult(cart, shoppingCart.items)
}
replyTo ! ManagerResult(id, result)
} catch {
case ex: IllegalArgumentException =>
replyTo ! ManagerRejection(id, ex.getMessage)
}
}
}
17.2 分片模式
清单 17-4 为购物车定义分片算法
object ShardSupport {
/*
* use the shoppingCart reference as the sharding key; the partial function
* must return both the key and the message to be forwarded, and if it does
* not match then the message is dropped
*/
val extractEntityId: ShardRegion.ExtractEntityId = {
case mc @ ManagerCommand(cmd, _, _) =>
cmd.shoppingCart.id.toString -> mc
case mc @ ManagerQuery(query, _, _) =>
query.shoppingCart.id.toString -> mc
}
/*
* allocate shoppingCarts into 256 shards based on the low 8 bits of their
* ID’s hash; this is a total function that must be defined for all messages
* that are forwarded
*/
val extractShardId: ShardRegion.ExtractShardId = {
case ManagerCommand(cmd, _, _) =>
toHex(cmd.shoppingCart.id.hashCode & 255)
case ManagerQuery(query, _, _) =>
toHex(query.shoppingCart.id.hashCode & 255)
}
private def toHex(b: Int) =
new java.lang.StringBuilder(2).append(hexDigits(b >> 4)).append(hexDigits(b & 15)).toString
private val hexDigits = "0123456789ABCDEF"
val RegionName = "ShoppingCart"
}
清单 17-5 启动一个群集来托管分片
val sys1 = ActorSystem("ShardingExample", node1Config.withFallback(clusterConfig))
val seed = Cluster(sys1).selfAddress
def startNode(sys: ActorSystem): Unit = {
Cluster(sys).join(seed)
ClusterSharding(sys).start(
typeName = ShardSupport.RegionName,
entityProps = Props(new Manager),
settings = ClusterShardingSettings(sys1),
extractEntityId = ShardSupport.extractEntityId,
extractShardId = ShardSupport.extractShardId)
}
startNode(sys1)
val sys2 = ActorSystem("ShardingExample", clusterConfig)
startNode(sys2)
Sharding.scala
val manager = ClusterSharding(sys1).shardRegion(ShardSupport.RegionName)
17.3 事件溯源模式
清单 17-6 将领域事件添加到业务逻辑
final case class ShoppingCart(items: Map[ItemRef, Int], owner: Option[CustomerRef]) {
def setOwner(customer: CustomerRef): ShoppingCart = {
require(owner.isEmpty, "owner cannot be overwritten")
copy(owner = Some(customer))
}
def addItem(item: ItemRef, count: Int): ShoppingCart = {
require(count > 0, s"count must be positive (trying to add $item with count $count)")
val currentCount = items.getOrElse(item, 0)
copy(items = items.updated(item, currentCount + count))
}
def removeItem(item: ItemRef, count: Int): ShoppingCart = {
require(count > 0, s"count must be positive (trying to remove $item with count $count)")
val currentCount = items.getOrElse(item, 0)
val newCount = currentCount - count
if (newCount <= 0)
copy(items = items - item)
else
copy(items = items.updated(item, newCount))
}
// 代码清单 17-6
def applyEvent(event: Event): ShoppingCart = event match {
case OwnerChanged(_, owner) => setOwner(owner)
case ItemAdded(_, item, count) => addItem(item, count)
case ItemRemoved(_, item, count) => removeItem(item, count)
}
// 代码清单 17-6
}
object ShoppingCart {
val empty = ShoppingCart(Map.empty, None)
}
清单 17-7 持久化一个事件溯源领域对象
class PersistentObjectManager extends PersistentActor {
// we expect the name to be the shopping card ID
override def persistenceId: String = context.self.path.name
private var shoppingCart: ShoppingCart = ShoppingCart.empty
def receiveCommand: Receive = {
case ManagerCommand(cmd, id, replyTo) =>
try {
val event = cmd match {
case SetOwner(cart, owner) => OwnerChanged(cart, owner)
case AddItem(cart, item, count) => ItemAdded(cart, item, count)
case RemoveItem(cart, item, count) => ItemRemoved(cart, item, count)
}
// perform the update here in order to treat validation errors immediately
shoppingCart = shoppingCart.applyEvent(event)
persist(event) { _ =>
replyTo ! ManagerEvent(id, event)
}
} catch {
case ex: IllegalArgumentException =>
replyTo ! ManagerRejection(id, ex.getMessage)
}
case ManagerQuery(cmd, id, replyTo) =>
try {
val result = cmd match {
case GetItems(cart) => GetItemsResult(cart, shoppingCart.items)
}
replyTo ! ManagerResult(id, result)
} catch {
case ex: IllegalArgumentException =>
replyTo ! ManagerRejection(id, ex.getMessage)
}
}
def receiveRecover: Receive = {
case e: Event => shoppingCart = shoppingCart.applyEvent(e)
}
}
17.4 事件流模式
清单 17-8 在写日志期间对事件打标签
class ShoppingCartTagging(system: ExtendedActorSystem) extends WriteEventAdapter {
def manifest(event: Any): String = "" // no additional manifest needed
def toJournal(event: Any): Any =
event match {
case s: ShoppingCartMessage => Tagged(event, Set("shoppingCart"))
case other => other
}
}
清单 17-9 一个正在监听事件流的 Actor
object TopProductListener {
private class IntHolder(var value: Int)
}
class TopProductListener extends Actor with ActorLogging {
import TopProductListener._
private implicit val materializer: ActorMaterializer = ActorMaterializer()
private val readJournal: LeveldbReadJournal =
PersistenceQuery(context.system).readJournalFor[LeveldbReadJournal](LeveldbReadJournal.Identifier)
readJournal
.eventsByTag(tag = "shoppingCart", offset = Sequence(0L))
.collect { case EventEnvelope(_, _, _, add: ItemAdded) => add }
.groupedWithin(100000, 1.second)
.addAttributes(Attributes.asyncBoundary)
.runForeach { seq: Seq[ItemAdded] =>
val histogram = seq.foldLeft(Map.empty[ItemRef, IntHolder]) { (map, event) =>
map.get(event.item) match {
case Some(holder) =>
holder.value += event.count
map
case None =>
map.updated(event.item, new IntHolder(event.count))
}
}
self ! TopProducts(0, histogram.map(p => (p._1, p._2.value)))
}
private var topProducts = Map.empty[ItemRef, Int]
def receive: Receive = {
case GetTopProducts(id, replyTo) =>
replyTo ! TopProducts(id, topProducts)
case TopProducts(_, products) =>
topProducts = products
log.info("new {}", products)
}
}
1.0.0