第13章——复制模式
13.1 主动-被动复制模式
Protocol
/** Requests understood by the replicated key-value store. */
sealed trait Command
/** Replies delivered to the `replyTo` reference carried by a [[Command]]. */
sealed trait Result
/** Asks the store to associate `value` with `key`; the outcome is reported to `replyTo`. */
final case class Put(key: String, value: JsValue, replyTo: ActorRef) extends Command
/** Positive outcome of a [[Put]], echoing the written key and value. */
final case class PutConfirmed(key: String, value: JsValue) extends Result
/** Negative outcome of a [[Put]]: the write was not accepted. */
final case class PutRejected(key: String, value: JsValue) extends Result
/** Asks for the value stored under `key`; the answer is sent to `replyTo`. */
final case class Get(key: String, replyTo: ActorRef) extends Command
/** Answer to a [[Get]]; `value` is `None` when nothing is stored under `key`. */
final case class GetResult(key: String, value: Option[JsValue]) extends Result
代码清单 13-1 单例作为主动副本来接管
/**
 * Active replica, first stage: takes over from the co-located passive replica.
 *
 * On construction it sends `TakeOver(self)` to `localReplica` and stashes every
 * other message until the matching `InitialState` arrives; it then switches to
 * the `running` behavior and replays the stash.
 *
 * @param localReplica      passive replica that answers `TakeOver` with `InitialState`
 * @param replicationFactor not used in this stage of the listing
 * @param maxQueueSize      not used in this stage of the listing
 */
class Active(localReplica: ActorRef, replicationFactor: Int, maxQueueSize: Int)
    extends Actor
    with Stash
    with ActorLogging {

  // Both fields are assigned when InitialState arrives; `receive` stashes
  // everything else until then, so they are never read while still null.
  private var theStore: Map[String, JsValue] = _
  private var seqNr: Iterator[Int] = _

  log.info("taking over from local replica")
  localReplica ! TakeOver(self)

  /** Initial behavior: wait for the state hand-over, stash everything else. */
  def receive: Receive = {
    case InitialState(m, s) =>
      log.info("took over at sequence {}", s)
      theStore = m
      seqNr = Iterator.from(s)
      context.become(running)
      unstashAll()
    case _ => stash()
  }

  // Declared as a `def`: a `val` initialised with `???` is evaluated in the
  // constructor and would make every attempt to create this actor fail with
  // NotImplementedError before the hand-over can even start.
  def running: Receive = ??? //...
}
代码清单 13-2 主动副本传播复制请求
/**
 * Active replica: the single writer that sequences `Put` requests and replicates
 * them to the replicas on all cluster members.
 *
 * Accepted writes wait in `toReplicate`; at most `MaxOutstanding` of them are in
 * flight (`replicating`) at any time. A write is applied to `theStore` and
 * confirmed to the client once `replicationFactor` `Replicated` confirmations
 * have been counted for its sequence number. In-flight requests are re-sent on
 * every `Tick`.
 *
 * @param localReplica      passive replica on this node; provides the initial state and
 *                          the actor path used to address the replicas on other nodes
 * @param replicationFactor number of confirmations required per write
 * @param maxQueueSize      sizing parameter; half of it bounds both the waiting queue
 *                          and the number of in-flight replications
 */
class Active(localReplica: ActorRef, replicationFactor: Int, maxQueueSize: Int)
    extends Actor
    with Stash
    with ActorLogging {
  // ...
  private val MaxOutstanding = maxQueueSize / 2

  // Assigned when InitialState arrives; `receive` stashes everything until then.
  private var theStore: Map[String, JsValue] = _
  private var seqNr: Iterator[Int] = _

  // Accepted writes that have not been sent out yet.
  private val toReplicate = mutable.Queue.empty[Replicate]
  // In-flight writes by sequence number, with the confirmations still missing.
  private var replicating = TreeMap.empty[Int, (Replicate, Int)]
  // Rejections since the last Tick; only used for the periodic log line.
  private var rejected = 0

  val timer: Cancellable =
    context.system.scheduler.schedule(1.second, 1.second, self, Tick)(context.dispatcher)

  override def postStop(): Unit = timer.cancel()

  log.info("taking over from local replica")
  localReplica ! TakeOver(self)

  /** Initial behavior: wait for the state hand-over, stash everything else. */
  def receive: Receive = {
    case InitialState(m, s) =>
      log.info("took over at sequence {}", s)
      theStore = m
      seqNr = Iterator.from(s)
      context.become(running)
      unstashAll()
    case _ => stash()
  }

  /** Normal operation after the hand-over. */
  val running: Receive = {
    case Put(key, value, replyTo) =>
      if (toReplicate.size < MaxOutstanding) {
        // `next()` advances the iterator, hence the explicit parentheses
        toReplicate.enqueue(Replicate(seqNr.next(), key, value, replyTo))
        replicate()
      } else {
        rejected += 1
        replyTo ! PutRejected(key, value)
      }
    case Get(key, replyTo) =>
      replyTo ! GetResult(key, theStore.get(key))
    case Tick =>
      // re-send everything that is still waiting for confirmations
      replicating.valuesIterator.foreach {
        case (replicate, _) => disseminate(replicate)
      }
      if (rejected > 0) {
        log.info("rejected {} PUT requests", rejected)
        rejected = 0
      }
    case Replicated(confirm) =>
      // NOTE(review): confirmations are counted without looking at the sender, and
      // Tick re-sends in-flight requests, so one replica answering twice appears to
      // be counted twice — confirm this is acceptable for the durability guarantee.
      replicating.get(confirm) match {
        case None => // already removed
        case Some((rep, 1)) =>
          // last missing confirmation: apply locally and answer the client
          replicating -= confirm
          theStore += rep.key -> rep.value
          rep.replyTo ! PutConfirmed(rep.key, rep.value)
        case Some((rep, n)) =>
          replicating += confirm -> (rep, n - 1)
      }
      replicate()
  }

  /** Moves one queued write into the in-flight set if there is room and sends it out. */
  private def replicate(): Unit =
    if (replicating.size < MaxOutstanding && toReplicate.nonEmpty) {
      val r = toReplicate.dequeue()
      replicating += r.seq -> (r, replicationFactor)
      disseminate(r)
    }

  /** Sends `r` to the replica on every current cluster member, with replies routed back to this actor. */
  private def disseminate(r: Replicate): Unit = {
    val req = r.copy(replyTo = self)
    val members = Cluster(context.system).state.members
    members.foreach(m => replicaOn(m.address) ! req)
  }

  /** Selection of the replica on `addr`, derived from the local replica's path. */
  private def replicaOn(addr: Address): ActorSelection =
    context.actorSelection(localReplica.path.toStringWithAddress(addr))
}
代码清单 13-3 通过将JSON文件写入到本地磁盘来实现持久化
import java.io.File
import akka.actor.ActorRef
import play.api.libs.json.{ JsValue, Json, OFormat }
import sbt.io.IO
/** File-based persistence of a replica's key-value store, one JSON file per replica name. */
object Persistence {

  /** Persisted snapshot: the stored sequence number together with the full key-value map. */
  final case class Database(seq: Int, kv: Map[String, JsValue])

  object Database {
    implicit val format: OFormat[Database] = Json.format[Database]
  }

  /**
   * Writes the snapshot for replica `name`.
   *
   * The JSON is first written to a `.new` sibling file which is then moved over
   * the current file, so readers never see a half-written database file
   * (how atomic the replacement is depends on `IO.move` — confirm for the target platform).
   */
  def persist(name: String, seq: Int, kv: Map[String, JsValue]): Unit = {
    val target = new File(s"./theDataBase-$name.json")
    val staging = new File(s"./theDataBase-$name.json.new")
    val snapshot = Database(seq, kv)
    IO.write(staging, Json.stringify(Json.toJson(snapshot)))
    IO.move(staging, target)
  }

  /** Reads the snapshot for replica `name`, or an empty database at sequence 0 if no file exists. */
  def readPersisted(name: String): Database = {
    val source = new File(s"theDataBase-$name.json")
    if (!source.exists()) Database(0, Map.empty)
    else Json.parse(IO.read(source)).as[Database]
  }
}
代码清单 13-4 被动副本追踪它们是否是最新的版本
/**
 * Passive replica: applies `Replicate` messages in strict sequence order and
 * persists the store after each applied update.
 *
 * The behavior `upToDate(theStore, expectedSeq)` means that `theStore` contains
 * every update below `expectedSeq`. Recently applied updates are kept in
 * `applied` so that peers can fetch single updates with `GetSingle`.
 *
 * @param askAroundCount    not used in this stage of the listing
 * @param askAroundInterval not used in this stage of the listing
 * @param maxLag            gap (in sequence numbers) beyond which the replica gives up
 *                          filling holes one by one; also bounds the `applied` buffer
 */
class Passive(askAroundCount: Int, askAroundInterval: FiniteDuration, maxLag: Int) extends Actor with ActorLogging {
  // most recently applied updates, oldest first
  private val applied = mutable.Queue.empty[Replicate]

  private val name: String =
    Cluster(context.system).selfAddress.toString.replaceAll("[:/]", "_")

  /** Starts from the persisted database; the persisted sequence number is the last applied one. */
  def receive: Receive = readPersisted(name) match {
    case Database(s, kv) =>
      log.info("started at sequence {}", s)
      upToDate(kv, s + 1)
  }

  /** Behavior while no update is known to be missing. */
  def upToDate(theStore: Map[String, JsValue], expectedSeq: Int): Receive = {
    case TakeOver(active) =>
      log.info("active replica starting at sequence {}", expectedSeq)
      active ! InitialState(theStore, expectedSeq)
    // difference-based comparison, kept as in the original
    case Replicate(s, _, _, replyTo) if s - expectedSeq < 0 =>
      // already applied: just confirm again
      replyTo ! Replicated(s)
    case r: Replicate if r.seq == expectedSeq =>
      val nextStore = theStore + (r.key -> r.value)
      persist(name, expectedSeq, nextStore)
      r.replyTo ! Replicated(r.seq)
      applied.enqueue(r)
      // keep the buffer bounded; without this it grows for as long as the
      // replica stays up to date
      while (applied.size > maxLag) applied.dequeue()
      context.become(upToDate(nextStore, expectedSeq + 1))
    case r: Replicate =>
      // an update from the future: there is a gap in front of it
      if (r.seq - expectedSeq > maxLag)
        fallBehind(expectedSeq, TreeMap(r.seq -> r))
      else
        missingSomeUpdates(theStore, expectedSeq, Set.empty, TreeMap(r.seq -> r))
    case GetSingle(s, replyTo) =>
      log.info("GetSingle from {}", replyTo)
      // Look the update up directly instead of trusting the head/last range:
      // the buffer may have holes, and an unconditional `.get` would then
      // crash the actor.
      applied.find(_.seq == s) match {
        case Some(update) => replyTo ! update
        case None if s < expectedSeq => replyTo ! InitialState(theStore, expectedSeq)
        case None => // not seen here either, nothing to offer
      }
    case GetFull(replyTo) =>
      log.info("sending full info to {}", replyTo)
      replyTo ! InitialState(theStore, expectedSeq)
  }

  def fallBehind(expectedSeq: Int, _waiting: TreeMap[Int, Replicate]): Unit = ???

  def missingSomeUpdates(
      theStore: Map[String, JsValue],
      expectedSeq: Int,
      prevOutstanding: Set[Int],
      waiting: TreeMap[Int, Replicate]): Unit = ???
}
代码清单 13-5 被动副本在滞后过多时请求一份全量更新
/**
 * Passive replica, extended with recovery from a large lag: when too many
 * updates are missing it asks a randomly chosen peer for its full state.
 *
 * Invariant used throughout: in `upToDate(theStore, expectedSeq)` the store
 * contains every update below `expectedSeq`, and `InitialState(store, seq)`
 * messages carry the sender's store together with its `expectedSeq`. The
 * persisted sequence number is the last applied one (see `receive`).
 *
 * @param askAroundCount    not used in this stage of the listing
 * @param askAroundInterval delay between repeated requests for the full state
 * @param maxLag            not used directly in the parts shown here
 */
class Passive(askAroundCount: Int, askAroundInterval: FiniteDuration, maxLag: Int) extends Actor with ActorLogging {
  private val applied = mutable.Queue.empty[Replicate]
  // active replica whose TakeOver has to wait until this replica has caught up
  private var awaitingInitialState = Option.empty[ActorRef]

  private val name: String =
    Cluster(context.system).selfAddress.toString.replaceAll("[:/]", "_")
  private val cluster = Cluster(context.system)
  private val random = new Random

  private var tickTask = Option.empty[Cancellable]

  /** (Re)schedules a single `DoConsolidate` message, cancelling a pending one first. */
  def scheduleTick(): Unit = {
    tickTask.foreach(_.cancel())
    tickTask = Some(
      context.system.scheduler.scheduleOnce(askAroundInterval, self, DoConsolidate)(context.dispatcher))
  }

  // do not leave a scheduled DoConsolidate behind when the actor stops
  override def postStop(): Unit = tickTask.foreach(_.cancel())

  def receive: Receive = readPersisted(name) match {
    case Database(s, kv) =>
      log.info("started at sequence {}", s)
      upToDate(kv, s + 1)
  }

  /** Returns to `upToDate`, first answering an active replica whose take-over was delayed. */
  def caughtUp(theStore: Map[String, JsValue], expectedSeq: Int): Unit = {
    awaitingInitialState.foreach(_ ! InitialState(theStore, expectedSeq))
    awaitingInitialState = None
    context.become(upToDate(theStore, expectedSeq))
  }

  def upToDate(theStore: Map[String, JsValue], expectedSeq: Int): Receive = {
    // Cases shown previously elided
    case TakeOver(active) => ??? //...
    case Replicate(s, _, _, replyTo) if s - expectedSeq < 0 => ??? //...
    case r: Replicate if r.seq == expectedSeq => ??? //...
    case r: Replicate => ??? //...
    case GetSingle(s, replyTo) => ??? //...
    case GetFull(replyTo) =>
      log.info("sending full info to {}", replyTo)
      replyTo ! InitialState(theStore, expectedSeq)
  }

  /**
   * Behavior while hopelessly behind: incoming updates are buffered in
   * `waiting` and a peer is asked for its full state until one arrives that is
   * newer than `expectedSeq`.
   */
  def fallBehind(expectedSeq: Int, _waiting: TreeMap[Int, Replicate]): Unit = {
    askAroundFullState()
    scheduleTick()
    var waiting = _waiting
    context.become {
      case Replicate(s, _, _, replyTo) if s < expectedSeq =>
        replyTo ! Replicated(s)
      case r: Replicate =>
        waiting += (r.seq -> r)
      case TakeOver(active) =>
        log.info("delaying active replica takeOver, at seq {} while highest is {}", expectedSeq, waiting.lastKey)
        awaitingInitialState = Some(active)
      case InitialState(m, s) if s > expectedSeq =>
        log.info("received newer state at sequence {} (was at {})", s, expectedSeq)
        // `s` is the sender's expectedSeq, i.e. `m` holds the updates below `s`
        // but not `s` itself. The last applied sequence is therefore s - 1, and
        // processing must continue AT `s`; treating `s` as applied would
        // acknowledge that update without ever storing it.
        persist(name, s - 1, m)
        waiting.until(s).valuesIterator.foreach(r => r.replyTo ! Replicated(r.seq))
        // drop the buffered updates that the received state already covers,
        // otherwise consolidate can never find an applicable prefix
        val nextWaiting = waiting.from(s)
        consolidate(m, s, Set.empty, nextWaiting)
      case DoConsolidate =>
        askAroundFullState()
        scheduleTick()
    }
  }

  private def consolidate(
      theStore: Map[String, JsValue],
      expectedSeq: Int,
      askedFor: Set[Int],
      waiting: TreeMap[Int, Replicate]): Unit = ??? //...

  /** Picks up to `n` random cluster members other than this node. */
  private def getMembers(n: Int): Seq[Address] = {
    // this node is excluded: asking ourselves for data we are missing is pointless
    // using .iterator to avoid one intermediate collection to be created
    val peers = cluster.state.members.iterator.map(_.address).filterNot(_ == cluster.selfAddress).toSeq
    random.shuffle(peers).take(n)
  }

  private def askAroundFullState(): Unit = {
    log.info("asking for full data")
    getMembers(1).foreach(addr => replicaOn(addr) ! GetFull(self))
  }

  /** Selection of the replica with this actor's path on node `addr`. */
  private def replicaOn(addr: Address): ActorSelection =
    context.actorSelection(self.path.toStringWithAddress(addr))
}
代码清单 13-6 计算直接可应用的队列前段的长度
// true when a waiting key equals the sequence number expected at its position
private val matches = (p: (Int, Int)) => p._1 == p._2

/**
 * Applies the gap-free front of `waiting` and decides how to continue.
 *
 * Starting at `expectedSeq`, every buffered update whose sequence number is
 * directly applicable is added to the store, persisted, confirmed to its
 * `replyTo` and remembered in `applied`. Depending on what is left, the
 * replica then is caught up, keeps asking for individual missing updates, or
 * falls back to requesting the full state.
 *
 * @param theStore    store containing every update below `expectedSeq`
 * @param expectedSeq next sequence number to apply
 * @param askedFor    sequence numbers already requested from peers
 * @param waiting     buffered updates, ordered by sequence number
 */
private def consolidate(
    theStore: Map[String, JsValue],
    expectedSeq: Int,
    askedFor: Set[Int],
    waiting: TreeMap[Int, Replicate]): Unit = {
  // calculate applicable prefix length
  val prefix = waiting.keysIterator.zip(Iterator.from(expectedSeq)).takeWhile(matches).size
  val nextStore = waiting.valuesIterator.take(prefix).foldLeft(theStore) { (store, replicate) =>
    val updated = store + (replicate.key -> replicate.value)
    // persist the store INCLUDING this update; persisting the initial store
    // would record the new sequence number with stale contents
    persist(name, replicate.seq, updated)
    replicate.replyTo ! Replicated(replicate.seq)
    applied.enqueue(replicate)
    updated
  }
  val nextWaiting = waiting.drop(prefix)
  val nextExpectedSeq = expectedSeq + prefix
  // cap the size of the applied buffer; `drop` would only build a trimmed copy
  // and leave the queue itself untouched, so remove the oldest entries in place
  while (applied.size > maxLag) applied.dequeue()
  if (nextWaiting.nonEmpty) {
    // check if we fell behind by too much
    if (nextWaiting.lastKey - nextExpectedSeq > maxLag)
      fallBehind(nextExpectedSeq, nextWaiting)
    else missingSomeUpdates(nextStore, nextExpectedSeq, askedFor, nextWaiting)
  } else caughtUp(nextStore, nextExpectedSeq)
}
代码清单 13-7 确定更新队列里的数据缺口是否可以被一一填补
/**
 * Passive replica, extended with gap filling: while only a few updates are
 * missing, each missing sequence number is requested individually from
 * `askAroundCount` random peers.
 *
 * @param askAroundCount    number of peers asked for each missing update
 * @param askAroundInterval not used directly in the parts shown here
 * @param maxLag            not used directly in the parts shown here
 */
class Passive(askAroundCount: Int, askAroundInterval: FiniteDuration, maxLag: Int) extends Actor with ActorLogging {
  private val applied = mutable.Queue.empty[Replicate]
  private var awaitingInitialState = Option.empty[ActorRef]
  // ... Initialization elided
  def receive: Receive = ??? //...

  def upToDate(theStore: Map[String, JsValue], expectedSeq: Int): Receive = {
    case TakeOver(active) => ??? //...
    case Replicate(s, _, _, replyTo) if s - expectedSeq < 0 => ??? //...
    case r: Replicate if r.seq == expectedSeq => ??? //...
    case r: Replicate => ??? //...
    case GetFull(replyTo) => ??? //...
    case GetSingle(s, replyTo) =>
      answerGetSingle(theStore, expectedSeq, s, replyTo)
  }

  /**
   * Answers a peer's request for update `s`: with the buffered update itself if
   * it is still in `applied`, otherwise with the full state when `s` lies below
   * `expectedSeq`; nothing is sent when this replica has not seen `s` either.
   *
   * The buffer is searched directly rather than checked via its head/last
   * range: it can contain holes, and an unconditional `.get` on the search
   * result would then crash the actor.
   */
  private def answerGetSingle(
      theStore: Map[String, JsValue],
      expectedSeq: Int,
      s: Int,
      replyTo: ActorRef): Unit = {
    log.info("GetSingle from {}", replyTo)
    applied.find(_.seq == s) match {
      case Some(update) => replyTo ! update
      case None if s < expectedSeq => replyTo ! InitialState(theStore, expectedSeq)
      case None => // unknown here as well
    }
  }

  /**
   * Behavior while some updates between `expectedSeq` and the highest buffered
   * one are missing. Newly discovered holes are requested immediately; all
   * outstanding ones are requested again on every `DoConsolidate`.
   *
   * @param prevOutstanding sequence numbers that were already requested earlier
   * @param waiting         buffered updates that cannot be applied yet (non-empty)
   */
  def missingSomeUpdates(
      theStore: Map[String, JsValue],
      expectedSeq: Int,
      prevOutstanding: Set[Int],
      waiting: TreeMap[Int, Replicate]): Unit = {
    val askFor = (expectedSeq to waiting.lastKey).iterator
      .filterNot(seq => waiting.contains(seq) || prevOutstanding.contains(seq))
      .toList
    askFor.foreach(askAround)
    // a tick is only started when none was running for earlier requests
    if (prevOutstanding.isEmpty) {
      scheduleTick()
    }
    val outstanding = prevOutstanding ++ askFor
    // NOTE(review): peers may answer GetSingle with InitialState (see
    // answerGetSingle), but this behavior has no case for it — confirm whether
    // such replies are meant to be ignored here.
    context.become {
      case Replicate(s, _, _, replyTo) if s < expectedSeq =>
        replyTo ! Replicated(s)
      case r: Replicate =>
        consolidate(theStore, expectedSeq, outstanding - r.seq, waiting + (r.seq -> r))
      case TakeOver(active) =>
        log.info("delaying active replica takeOver, at seq {} while highest is {}", expectedSeq, waiting.lastKey)
        awaitingInitialState = Some(active)
      case GetSingle(s, replyTo) =>
        answerGetSingle(theStore, expectedSeq, s, replyTo)
      case GetFull(replyTo) =>
        log.info("sending full info to {}", replyTo)
        replyTo ! InitialState(theStore, expectedSeq)
      case DoConsolidate =>
        outstanding.foreach(askAround)
        scheduleTick()
    }
  }

  /** Requests update `seq` from `askAroundCount` randomly chosen members. */
  private def askAround(seq: Int): Unit = {
    log.info("asking around for sequence number {}", seq)
    getMembers(askAroundCount).foreach(addr => replicaOn(addr) ! GetSingle(seq, self))
  }

  // ... Other helpers elided
  private def consolidate(
      theStore: Map[String, JsValue],
      expectedSeq: Int,
      askedFor: Set[Int],
      waiting: TreeMap[Int, Replicate]): Unit = ???
  private def getMembers(n: Int): Seq[Address] = ???
  private def replicaOn(addr: Address): ActorSelection = ???
  def scheduleTick(): Unit = ???
}
13.2 多主复制模式
代码清单 13-8 使用 CKite 来实现键值存储
/**
 * In-memory String-to-String key-value store implemented as a CKite state machine.
 *
 * Writes are applied through `applyWrite`, which also records the index of the
 * last applied write; reads go through `applyRead`. Snapshots contain the
 * serialized map only.
 */
class KVStore extends StateMachine {
  private var map = mutable.Map[String, String]()
  private var lastIndex: Long = 0

  /** Applies a `Put`, remembers its index and returns the written value. */
  def applyWrite: PartialFunction[(Long, WriteCommand[_]), String] = {
    case (index, Put(key: String, value: String)) =>
      map.put(key, value)
      lastIndex = index
      value
  }

  /** Answers a `Get` with the value stored under the key, if any. */
  def applyRead: PartialFunction[ReadCommand[_], Option[String]] = {
    case Get(key) => map.get(key)
  }

  def getLastAppliedIndex: Long = lastIndex

  /**
   * Replaces the map with the one deserialized from `byteBuffer`.
   *
   * NOTE(review): `lastIndex` is not part of the snapshot and is left unchanged
   * here — confirm that this matches what the framework expects after a restore.
   */
  def restoreSnapshot(byteBuffer: ByteBuffer): Unit = {
    val bytes =
      if (byteBuffer.hasArray) byteBuffer.array()
      else {
        // direct and read-only buffers expose no backing array (array() throws);
        // copy their remaining content without moving the caller's position
        val copy = new Array[Byte](byteBuffer.remaining())
        byteBuffer.duplicate().get(copy)
        copy
      }
    map = Serializer.deserialize[mutable.Map[String, String]](bytes)
  }

  /** Serializes the current map into a freshly wrapped buffer. */
  def takeSnapshot(): ByteBuffer =
    ByteBuffer.wrap(Serializer.serialize(map))
}
代码清单 13-9 按照复制状态机来实例化KVStore
/** Entry point: builds a CKite instance around a new [[KVStore]], starts it and then starts the HTTP server. */
object KVStoreBootstrap extends App {
  val ckite = {
    // state machine, RPC layer and storage are configured step by step before building
    val configured = CKiteBuilder()
      .stateMachine(new KVStore())
      .rpc(FinagleThriftRpc)
      .storage(MapDBStorage())
    configured.build
  }

  ckite.start()
  HttpServer(ckite).start()
}
CKite API
// Read issued through `ckite.read`; going by the val name this is the consistent
// variant (confirm against the CKite documentation).
val consistentRead: Future[Option[String]] = ckite.read(Get(key))
// Read issued through `ckite.readLocal`; going by the val name the result may be
// stale (confirm against the CKite documentation).
val possibleStaleRead: Future[Option[String]] = ckite.readLocal(Get(key))
// Write issued through `ckite.write`; the future carries the String result of the write.
val write: Future[String] = ckite.write(Put(key, value))
代码清单 13-10 图13-1中图形的代码表示
/**
 * A job status, modelled as a node in a graph of allowed transitions and usable
 * as replicated data: two statuses are merged with `mergeStatus`.
 *
 * Only `name` is in the first parameter list, so it alone takes part in the
 * generated equality, hash code and string representation. The neighbour sets
 * are passed by name and cached in lazy vals, which means they are evaluated on
 * first access rather than at construction; this is what lets status values
 * refer to each other in their definitions.
 *
 * @param name         identifier of the status
 * @param _predecessor statuses with an edge leading to this one (evaluated lazily)
 * @param _successor   statuses reachable from this one via a single edge (evaluated lazily)
 */
final case class Status(name: String)(_predecessor: => Set[Status], _successor: => Set[Status])
extends ReplicatedData {
type T = Status
/** Merges this status with `that` by delegating to `mergeStatus(this, that)`. */
def merge(that: Status): Status = mergeStatus(this, that)
// evaluated once, on first access
@volatile lazy val predecessors: Set[Status] = _predecessor
@volatile lazy val successors: Set[Status] = _successor
}
// The status graph. Each value lists its predecessors first and its successors
// second. Definitions mention values declared further down; this relies on the
// by-name parameters of Status, which are not evaluated until the neighbour
// sets are first read.

// no predecessors; can move on to Scheduled or Cancelled
val New: Status =
Status("new")(Set.empty, Set(Scheduled, Cancelled))
// reached from New; can move on to Executing or Cancelled
val Scheduled: Status =
Status("scheduled")(Set(New), Set(Executing, Cancelled))
// reached from Scheduled; can move on to Aborted or Finished
val Executing: Status =
Status("executing")(Set(Scheduled), Set(Aborted, Finished))
// reached from Executing or Aborted; no successors
val Finished: Status =
Status("finished")(Set(Executing, Aborted), Set.empty)
// reached from New or Scheduled; can only move on to Aborted
val Cancelled: Status =
Status("cancelled")(Set(New, Scheduled), Set(Aborted))
// reached from Cancelled or Executing; can only move on to Finished
val Aborted: Status =
Status("aborted")(Set(Cancelled, Executing), Set(Finished))
代码清单 13-11 合并两个状态来产生第三个合并后的状态
/**
 * Merges two statuses into one that is `right` itself or lies downstream of it
 * and has `left` among its (transitive) predecessors.
 *
 * The search starts at `right`. If `left` can be reached from the current
 * candidate by walking predecessor links, the candidate is the result.
 * Otherwise every successor of the candidate is tried in turn and the partial
 * results are combined pairwise, preferring the one that succeeds the other.
 * Candidates that already failed are excluded from later predecessor walks so
 * the same unsuccessful comparison is not repeated.
 */
def mergeStatus(left: Status, right: Status): Status = {

  // Is `fixed` equal to `node`, or reachable from it through predecessor links
  // that avoid the statuses in `skip`?
  def isSuccessor(node: Status, fixed: Status, skip: Set[Status]): Boolean =
    node == fixed || (node.predecessors -- skip).exists(isSuccessor(_, fixed, skip))

  def innerLoop(candidate: Status, exclude: Set[Status]): Status =
    if (!isSuccessor(candidate, left, exclude)) {
      val visited = exclude + candidate
      candidate.successors
        .map(next => innerLoop(next, visited))
        .reduce((l, r) => if (isSuccessor(l, r, visited)) r else l)
    } else candidate

  innerLoop(right, Set.empty)
}
代码清单 13-12 使用 Akka Distributed Data 来传播状态变更
/**
 * Client-facing actor: records job submissions and cancellations in the
 * replicated status map and logs the replicator's answers.
 */
class ClientInterface extends Actor with ActorLogging {
  private val replicator: ActorRef = DistributedData(context.system).replicator
  private implicit val cluster: Cluster = Cluster(context.system)

  /**
   * Sets the status of `job` in the replicated map with a majority write
   * (5 second timeout); `tag` travels along as the request context.
   */
  private def setStatus(job: String, status: Status, tag: String): Unit =
    replicator ! Replicator.Update(
      StorageComponent,
      ORMap.empty[String, Status],
      Replicator.WriteMajority(5.seconds),
      Some(tag))(_ + (job -> status))

  def receive: Receive = {
    case Submit(job) =>
      log.info("submitting job {}", job)
      setStatus(job, New, s"submit $job")
    case Cancel(job) =>
      log.info("cancelling job {}", job)
      setStatus(job, Cancelled, s"cancel $job")
    case r: Replicator.UpdateResponse[_] =>
      log.info("received update result: {}", r)
    case PrintStatus =>
      // majority read; the answer is logged by the GetSuccess case below
      replicator ! Replicator.Get(StorageComponent, Replicator.ReadMajority(5.seconds))
    case g: Replicator.GetSuccess[_] =>
      log.info("overall status: {}", g.get(StorageComponent))
  }
}
代码清单 13-13 引入对于任务的请求标志
/**
 * Job executor: moves jobs to `Executing` and `Finished` in the replicated
 * status map and watches the map for jobs that became `Aborted`.
 */
class Executor extends Actor with ActorLogging {
  private val replicator: ActorRef = DistributedData(context.system).replicator
  private implicit val cluster: Cluster = Cluster(context.system)

  // statuses as of the previous change notification, used to report each abort once
  private var lastState = Map.empty[String, Status]

  replicator ! Replicator.Subscribe(StorageComponent, self)

  def receive: Receive = {
    case Execute(job) =>
      log.info("executing job {}", job)
      replicator ! Replicator.Update(
        StorageComponent,
        ORMap.empty[String, Status],
        Replicator.WriteMajority(5.seconds),
        Some(job)) { map =>
        // the update is only valid for a job that is still New
        require(map.get(job).contains(New))
        map + (job -> Executing)
      }
    case Finish(job) =>
      log.info("job {} finished", job)
      replicator ! Replicator.Update(
        StorageComponent,
        ORMap.empty[String, Status],
        Replicator.WriteMajority(5.seconds))(_ + (job -> Finished))
    case Replicator.UpdateSuccess(StorageComponent, Some(job)) =>
      // only the Execute update carries the job as request context
      log.info("starting job {}", job)
    case r: Replicator.UpdateResponse[_] =>
      log.info("received update result: {}", r)
    case ch: Replicator.Changed[_] =>
      val current = ch.get(StorageComponent).entries
      for {
        (job, status) <- current.iterator
        if status == Aborted
        if !lastState.get(job).contains(Aborted)
      } log.info("aborting job {}", job)
      // Remember the snapshot only after ALL entries were compared. Updating it
      // inside the loop hid every further newly aborted job of the same
      // notification and never refreshed it when nothing was aborted.
      lastState = current
  }
}
13.3 主动-主动复制模式
代码清单 13-14 用无协调工作的实现来开始主动-主动复制模式
/** A client command tagged with a sequence number; the replica answers to `replyTo`. */
private final case class SeqCommand(seq: Int, cmd: Command, replyTo: ActorRef)
/**
 * A replica's answer to the [[SeqCommand]] with the same `seq`: `replica` is the
 * answering replica and `replyTo` the reference taken from the client's command.
 */
private final case class SeqResult(seq: Int, res: Result, replica: ActorRef, replyTo: ActorRef)
/** Asks a replica to send its current map to `toReplica` as [[InitialData]]. */
private final case class SendInitialData(toReplica: ActorRef)
/** The full key-value map used to initialise a replica. */
private final case class InitialData(map: Map[String, JsValue])
/**
 * One copy of the key-value store. It stashes everything until it has been
 * given its initial contents and afterwards executes sequenced commands,
 * reporting each outcome back to the sender named in the command.
 */
class Replica extends Actor with Stash {
  private var map = Map.empty[String, JsValue]

  /** Uninitialised: wait for the initial contents, keep everything else for later. */
  def receive: Receive = {
    case InitialData(contents) =>
      map = contents
      context.become(initialized)
      unstashAll()
    case _ => stash()
  }

  /** Initialised: serve commands and hand the current contents to new replicas. */
  def initialized: Receive = {
    case SendInitialData(toReplica) => toReplica ! InitialData(map)
    case SeqCommand(seq, cmd, replyTo) =>
      // tracking of sequence numbers and resents is elided here
      val (outcome, client): (Result, ActorRef) = cmd match {
        case Put(key, value, r) =>
          map += key -> value
          (PutConfirmed(key, value), r)
        case Get(key, r) =>
          (GetResult(key, map.get(key)), r)
      }
      replyTo ! SeqResult(seq, outcome, self, client)
  }
}
代码清单 13-15 封装对于单个客户端请求的知悉情况
/** What is known so far about the replicas' answers to one sequenced client request. */
private sealed trait ReplyState {
/** Point in time associated with this request's replies. */
def deadline: Deadline
/** Replicas that have not answered yet. */
def missing: Set[ActorRef]
/** Returns the state that results from taking the reply `res` into account. */
def add(res: SeqResult): ReplyState
/** True once no replica is missing any more. */
def isFinished: Boolean = missing.isEmpty
}
/**
 * State of a request for which no answer has been agreed on yet.
 *
 * @param deadline point in time associated with this request's replies
 * @param replies  replies received so far
 * @param missing  replicas that have not answered yet
 * @param quorum   number of identical results needed to settle the answer
 */
private final case class Unknown(deadline: Deadline, replies: Set[SeqResult], missing: Set[ActorRef], quorum: Int)
    extends ReplyState {

  /**
   * Adds `res` and checks whether the answer can now be decided:
   *  - `quorum` replies with the same result: that result is the answer and every
   *    replica that answered differently is marked as wrong;
   *  - no such group but nobody left to answer: fall back to the simple majority;
   *  - otherwise the state stays unknown.
   */
  override def add(res: SeqResult): ReplyState = {
    val nextReplies = replies + res
    val nextMissing = missing - res.replica
    def stillUnknown = Unknown(deadline, nextReplies, nextMissing, quorum)

    if (nextReplies.size < quorum) stillUnknown
    else {
      // The decision must include the reply that was just added; looking only at
      // the previous replies delays the quorum by one message and can miss it
      // entirely when this was the last reply.
      val agreed = nextReplies.toSeq.groupBy(_.res).collectFirst {
        case (_, same) if same.size >= quorum => same.head
      }
      agreed match {
        case Some(right) =>
          // compare each reply's own result with the agreed result
          val wrong = nextReplies.collect {
            case SeqResult(_, result, replica, _) if result != right.res => replica
          }
          Known(deadline, right, wrong, nextMissing)
        case None if nextMissing.isEmpty =>
          Known.fromUnknown(deadline, nextReplies)
        case None =>
          stillUnknown
      }
    }
  }
}
/**
 * State of a request whose answer has been decided.
 *
 * @param deadline point in time associated with this request's replies
 * @param reply    the reply carrying the agreed result
 * @param wrong    replicas that answered with a different result
 * @param missing  replicas that have not answered yet
 */
private final case class Known(deadline: Deadline, reply: SeqResult, wrong: Set[ActorRef], missing: Set[ActorRef])
    extends ReplyState {

  /** Records a late reply: its replica is no longer missing and is marked wrong if its result deviates. */
  override def add(res: SeqResult): ReplyState = {
    val nextWrong =
      if (res.res == reply.res) wrong
      else wrong + res.replica
    Known(deadline, reply, nextWrong, missing - res.replica)
  }
}

private object Known {

  /**
   * Decides a request that did not reach a quorum by picking the result given by
   * the largest group of replies; all other repliers are marked as wrong and
   * nobody is considered missing any more.
   *
   * @param replies the replies received so far; must not be empty, since the
   *                resulting state needs one of them as its answer
   */
  def fromUnknown(deadline: Deadline, replies: Set[SeqResult]): Known = {
    require(replies.nonEmpty, "cannot pick a majority answer without any replies")
    // did not reach consensus on this one, pick simple majority
    val winners = replies.groupBy(_.res).values.maxBy(_.size)
    val losers = (replies -- winners).map(_.replica)
    Known(deadline, winners.head, losers, Set.empty)
  }
}
代码清单 13-16 将副本当作子Actor管理
/**
 * Coordinator of an active-active replica group.
 *
 * Every client command is tagged with a sequence number and sent to all
 * replicas, which run as children of this actor. Replies are collected per
 * sequence number; once the answer for a request is known it is sent to the
 * client, strictly in sequence order. Replicas that terminate, or that gave a
 * deviating answer, are replaced.
 *
 * @param N number of replicas to maintain
 */
class Coordinator(N: Int) extends Actor {
  private var replicas = (1 to N).map(_ => newReplica()).toSet
  private val seqNr = Iterator.from(0)
  // reply bookkeeping per sequence number, ordered so the oldest entry is first
  private var replies = TreeMap.empty[Int, ReplyState]
  // sequence number of the next answer owed to a client
  private var nextReply = 0
  // requests that timed out without a single reply; they are skipped when sending
  private var abandoned = Set.empty[Int]

  override def supervisorStrategy: SupervisorStrategy =
    SupervisorStrategy.stoppingStrategy

  private def newReplica(): ActorRef =
    context.watch(context.actorOf(Replica.props))

  // schedule timeout messages for quiescent periods
  context.setReceiveTimeout(1.second)

  def receive: Receive =
    ({
      case cmd: Command =>
        val c = SeqCommand(seqNr.next(), cmd, self)
        replicas.foreach(_ ! c)
        // strict majority; (n + 1) / 2 is only half of the replicas for even n,
        // which would allow two conflicting results to both reach the quorum
        val quorum = replicas.size / 2 + 1
        replies += c.seq -> Unknown(5.seconds(fromNow), Set.empty, replicas, quorum)
      case res: SeqResult
          if replies.contains(res.seq) &&
            replicas.contains(res.replica) =>
        val prevState = replies(res.seq)
        val nextState = prevState.add(res)
        replies += res.seq -> nextState
      case Terminated(ref) =>
        replaceReplica(ref, terminate = false)
      case ReceiveTimeout =>
    }: Receive).andThen { _ =>
      // housekeeping after every handled message
      doTimeouts()
      sendReplies()
      evictFinished()
    }
  //...

  /** Settles every request whose deadline has passed. */
  private def doTimeouts(): Unit = {
    val now = Deadline.now
    // entries are ordered by sequence number, i.e. by creation time
    val expired = replies.iterator.takeWhile(_._2.deadline <= now)
    for ((seq, state) <- expired) {
      state match {
        case Unknown(_, received, _, _) if received.isEmpty =>
          // Nobody answered in time, so there is no result that could be forced.
          // Give the request up instead of failing inside Known.fromUnknown, and
          // remember it so that sendReplies does not wait for it forever.
          replies -= seq
          abandoned += seq
        case Unknown(deadline, received, _, _) =>
          val forced = Known.fromUnknown(deadline, received)
          replies += seq -> forced
        case Known(deadline, reply, wrong, _) =>
          // stop waiting for replicas that are still missing
          replies += seq -> Known(deadline, reply, wrong, Set.empty)
      }
    }
  }

  /** Sends all answers that are known, in sequence order, stopping at the first undecided request. */
  @tailrec private def sendReplies(): Unit =
    replies.get(nextReply) match {
      case Some(Known(_, reply, _, _)) =>
        reply.replyTo ! reply.res
        nextReply += 1
        sendReplies()
      case None if abandoned(nextReply) =>
        // no answer exists for this request; move on to the next one
        abandoned -= nextReply
        nextReply += 1
        sendReplies()
      case _ =>
    }

  /** Drops finished entries from the front and replaces the replicas that answered them wrongly. */
  @tailrec private def evictFinished(): Unit =
    replies.headOption match {
      case Some((seq, k @ Known(_, _, wrong, _))) if k.isFinished =>
        wrong.foreach(replaceReplica(_, terminate = true))
        replies -= seq
        evictFinished()
      case _ =>
    }

  /**
   * Replaces replica `r` by a new one, optionally stopping `r` first. The new
   * replica receives its contents from one of the remaining replicas.
   */
  private def replaceReplica(r: ActorRef, terminate: Boolean): Unit =
    if (replicas contains r) {
      replicas -= r
      if (terminate) r ! PoisonPill
      val replica = newReplica()
      replicas.headOption match {
        case Some(donor) => donor ! SendInitialData(replica)
        // the last replica is being replaced: nobody is left to copy from, so
        // the newcomer starts empty instead of this actor failing on `head`
        case None => replica ! InitialData(Map.empty)
      }
      replicas += replica
    }
}
代码清单 13-17 按序发送回复
/**
 * Delivers decided answers to the clients in sequence order: as long as the
 * entry for `nextReply` is a `Known` state its result is sent and the counter
 * advances; the first undecided (or absent) entry stops the run.
 */
@tailrec private def sendReplies(): Unit = {
  val decided = replies.get(nextReply).collect { case known: Known => known.reply }
  decided match {
    case Some(reply) =>
      reply.replyTo ! reply.res
      nextReply += 1
      sendReplies()
    case None =>
  }
}
代码清单 13-18 一旦timeout,就强迫将“missing”回复转为“known”回复
/**
 * Settles all requests whose deadline has passed. Undecided requests are forced
 * into a decision based on the replies received so far; decided ones simply
 * stop waiting for their missing replicas.
 *
 * Only the leading run of expired entries is visited, relying on the map's
 * ordering by sequence number.
 */
private def doTimeouts(): Unit = {
  val now = Deadline.now
  replies.iterator
    .takeWhile { case (_, state) => state.deadline <= now }
    .foreach {
      case (seq, Unknown(deadline, received, _, _)) =>
        // NOTE(review): `received` is passed on as is; if no replica answered at
        // all it is empty — confirm that Known.fromUnknown copes with that.
        replies += seq -> Known.fromUnknown(deadline, received)
      case (seq, Known(deadline, reply, wrong, _)) =>
        replies += seq -> Known(deadline, reply, wrong, Set.empty)
    }
}
代码清单 13-19 终止并替换未完成的副本
/**
 * Removes finished entries from the front of the reply map. For each removed
 * entry the replicas recorded as wrong are terminated and replaced. Stops at
 * the first entry that is not a finished `Known` state.
 */
@tailrec private def evictFinished(): Unit =
  replies.headOption match {
    case Some((seq, done: Known)) if done.isFinished =>
      done.wrong.foreach(replica => replaceReplica(replica, terminate = true))
      replies -= seq
      evictFinished()
    case _ =>
  }
/**
 * Replaces replica `r` by a newly created one; does nothing if `r` is not a
 * current replica.
 *
 * @param r         the replica to remove
 * @param terminate whether `r` still has to be stopped (it is sent a PoisonPill)
 */
private def replaceReplica(r: ActorRef, terminate: Boolean): Unit =
  if (replicas contains r) {
    replicas -= r
    if (terminate) r ! PoisonPill
    val replica = newReplica()
    replicas.headOption match {
      // one of the remaining replicas hands its contents to the newcomer
      case Some(donor) => donor ! SendInitialData(replica)
      // `r` was the last replica: nobody is left to copy from, so the newcomer
      // starts empty instead of this call failing on `head` of an empty set
      case None => replica ! InitialData(Map.empty)
    }
    replicas += replica
  }
1.0.0