第13章——复制模式

13.1 主动-被动复制模式

Protocol

/** Requests that clients send to the replicated key-value store. */
sealed trait Command

/** Replies that are sent back in response to a [[Command]]. */
sealed trait Result

/** Request to store `value` under `key`; the outcome is sent to `replyTo`. */
final case class Put(key: String, value: JsValue, replyTo: ActorRef) extends Command

/** Positive reply to a [[Put]], echoing the key and value. */
final case class PutConfirmed(key: String, value: JsValue) extends Result

/** Negative reply to a [[Put]], echoing the key and value that were not accepted. */
final case class PutRejected(key: String, value: JsValue) extends Result

/** Request to look up `key`; the outcome is sent to `replyTo`. */
final case class Get(key: String, replyTo: ActorRef) extends Command

/** Reply to a [[Get]]; `value` is an `Option`, so an absent key is expressed as `None`. */
final case class GetResult(key: String, value: Option[JsValue]) extends Result

代码清单 13-1 单例作为主动副本来接管

/**
 * Active replica during start-up: it asks the local replica for the current
 * state and stashes every other message until that state has arrived.
 *
 * @param localReplica      replica on this node that is asked to hand over its state
 * @param replicationFactor not used in this part of the listing
 * @param maxQueueSize      not used in this part of the listing
 */
class Active(localReplica: ActorRef, replicationFactor: Int, maxQueueSize: Int)
    extends Actor
    with Stash
    with ActorLogging {

  // Both fields stay unset until the InitialState message has been processed.
  private var theStore: Map[String, JsValue] = _
  private var seqNr: Iterator[Int] = _

  // Constructor side effect: announce the takeover to the local replica.
  log.info("taking over from local replica")
  localReplica ! TakeOver(self)

  def receive: Receive = {
    case InitialState(initialStore, firstSeq) =>
      log.info("took over at sequence {}", firstSeq)
      theStore = initialStore
      seqNr = Iterator.from(firstSeq)
      // switch to normal operation, then replay whatever was stashed meanwhile
      context.become(running)
      unstashAll()
    case _ =>
      stash()
  }

  // Behavior for normal operation; elided in this listing.
  val running: Receive = ??? //...
}

代码清单 13-2 主动副本传播复制请求

/**
 * Active replica: after taking over the state from the local replica it
 * accepts `Put`/`Get` commands, numbers each write and sends it to the replica
 * on every cluster member, confirming the write to the client once
 * `replicationFactor` `Replicated` messages have been counted for it.
 *
 * NOTE(review): confirmations are counted per `Replicated` message, not per
 * distinct sender; since in-flight writes are re-sent on every `Tick`, confirm
 * whether repeated acknowledgements from one replica should count separately.
 *
 * @param localReplica      replica on this node; also provides the actor path used on other nodes
 * @param replicationFactor number of `Replicated` messages required per write
 * @param maxQueueSize      half of this value bounds both the backlog and the in-flight set
 */
class Active(localReplica: ActorRef, replicationFactor: Int, maxQueueSize: Int)
    extends Actor
    with Stash
    with ActorLogging {
  // ...
  private val MaxOutstanding = maxQueueSize / 2

  private var theStore: Map[String, JsValue] = _
  private var seqNr: Iterator[Int] = _
  // accepted writes that have not been sent out yet
  private val toReplicate = mutable.Queue.empty[Replicate]
  // in-flight writes by sequence number, paired with the confirmations still needed
  private var replicating = TreeMap.empty[Int, (Replicate, Int)]

  // number of PUTs turned away since the last Tick (for logging only)
  private var rejected = 0

  val timer: Cancellable =
    context.system.scheduler.schedule(1.second, 1.second, self, Tick)(context.dispatcher)

  override def postStop(): Unit = timer.cancel()

  log.info("taking over from local replica")
  localReplica ! TakeOver(self)

  def receive: Receive = {
    case InitialState(initialStore, firstSeq) =>
      log.info("took over at sequence {}", firstSeq)
      theStore = initialStore
      seqNr = Iterator.from(firstSeq)
      context.become(running)
      unstashAll()
    case _ =>
      stash()
  }

  val running: Receive = {
    case Put(key, value, client) =>
      if (toReplicate.size >= MaxOutstanding) {
        // backlog is full: turn the request away instead of queueing it
        rejected += 1
        client ! PutRejected(key, value)
      } else {
        toReplicate.enqueue(Replicate(seqNr.next, key, value, client))
        replicate()
      }
    case Get(key, client) =>
      client ! GetResult(key, theStore.get(key))
    case Tick =>
      // re-send everything that is still waiting for confirmations
      replicating.valuesIterator.foreach(entry => disseminate(entry._1))
      if (rejected > 0) {
        log.info("rejected {} PUT requests", rejected)
        rejected = 0
      }
    case Replicated(confirm) =>
      // an unknown sequence number means the write was already completed and removed
      replicating.get(confirm).foreach {
        case (rep, 1) =>
          // last outstanding confirmation: apply locally and answer the client
          replicating -= confirm
          theStore += rep.key -> rep.value
          rep.replyTo ! PutConfirmed(rep.key, rep.value)
        case (rep, n) =>
          replicating += confirm -> (rep, n - 1)
      }
      replicate()
  }

  /** Moves at most one write from the backlog into the in-flight set and sends it out. */
  private def replicate(): Unit = {
    val hasCapacity = replicating.size < MaxOutstanding
    if (hasCapacity && toReplicate.nonEmpty) {
      val next = toReplicate.dequeue()
      replicating += next.seq -> (next, replicationFactor)
      disseminate(next)
    }
  }

  /** Sends `r` to the replica on every current cluster member, with replies directed to this actor. */
  private def disseminate(r: Replicate): Unit = {
    val req = r.copy(replyTo = self)
    for (member <- Cluster(context.system).state.members)
      replicaOn(member.address) ! req
  }

  /** Selection of the actor that has the local replica's path on the node at `addr`. */
  private def replicaOn(addr: Address): ActorSelection =
    context.actorSelection(localReplica.path.toStringWithAddress(addr))
}

代码清单 13-3 通过将JSON文件写入到本地磁盘来实现持久化

import java.io.File

import akka.actor.ActorRef
import play.api.libs.json.{ JsValue, Json, OFormat }
import sbt.io.IO

/** Stores the key-value map together with its sequence number as a JSON file in the working directory. */
object Persistence {

  /** On-disk representation: a sequence number plus the key-value map. */
  final case class Database(seq: Int, kv: Map[String, JsValue])

  object Database {
    implicit val format: OFormat[Database] = Json.format[Database]
  }

  /**
   * Writes `kv` and `seq` for the replica called `name`.
   *
   * The data is first written to a `.new` file which is then moved over the
   * current file, so the current file is not written to directly.
   */
  def persist(name: String, seq: Int, kv: Map[String, JsValue]): Unit = {
    val json = Json.stringify(Json.toJson(Database(seq, kv)))
    val target = new File(s"./theDataBase-$name.json")
    val scratch = new File(s"./theDataBase-$name.json.new")
    IO.write(scratch, json)
    IO.move(scratch, target) // atomically update the database
  }

  /** Reads the file for `name`; when it does not exist, yields sequence 0 with an empty map. */
  def readPersisted(name: String): Database = {
    val file = new File(s"theDataBase-$name.json")
    if (!file.exists()) Database(0, Map.empty)
    else Json.parse(IO.read(file)).as[Database]
  }
}

代码清单 13-4 被动副本追踪它们是否是最新的版本

/**
 * Passive replica: applies `Replicate` updates strictly in sequence order,
 * persists every applied update and tracks whether it is up to date.
 *
 * @param askAroundCount    not used in this part of the listing
 * @param askAroundInterval not used in this part of the listing
 * @param maxLag            largest gap between an incoming and the expected sequence number
 *                          that is still handled by `missingSomeUpdates` instead of `fallBehind`
 */
class Passive(askAroundCount: Int, askAroundInterval: FiniteDuration, maxLag: Int) extends Actor with ActorLogging {

  // updates applied by this replica, in ascending sequence order; used to answer GetSingle
  private val applied = mutable.Queue.empty[Replicate]

  private val name: String =
    Cluster(context.system).selfAddress.toString.replaceAll("[:/]", "_")

  // The initial behavior is derived from whatever was persisted for this node.
  def receive: Receive = readPersisted(name) match {
    case Database(s, kv) =>
      log.info("started at sequence {}", s)
      upToDate(kv, s + 1)
  }

  /**
   * Behavior while no update is known to be missing.
   *
   * @param theStore    current key-value map
   * @param expectedSeq sequence number of the next update to apply
   */
  def upToDate(theStore: Map[String, JsValue], expectedSeq: Int): Receive = {
    case TakeOver(active) =>
      log.info("active replica starting at sequence {}", expectedSeq)
      active ! InitialState(theStore, expectedSeq)
    case Replicate(s, _, _, replyTo) if s - expectedSeq < 0 =>
      // already applied: just confirm again
      replyTo ! Replicated(s)
    case r: Replicate if r.seq == expectedSeq =>
      val nextStore = theStore + (r.key -> r.value)
      persist(name, expectedSeq, nextStore)
      r.replyTo ! Replicated(r.seq)
      applied.enqueue(r)
      context.become(upToDate(nextStore, expectedSeq + 1))
    case r: Replicate =>
      // a gap was detected; its size decides how to recover
      if (r.seq - expectedSeq > maxLag)
        fallBehind(expectedSeq, TreeMap(r.seq -> r))
      else
        missingSomeUpdates(theStore, expectedSeq, Set.empty, TreeMap(r.seq -> r))
    case GetSingle(s, replyTo) =>
      log.info("GetSingle from {}", replyTo)
      // Look the update up directly: the buffer may have gaps, so being inside
      // the head..last range does not guarantee that the entry exists.
      applied.find(_.seq == s) match {
        case Some(update)            => replyTo ! update
        case None if s < expectedSeq => replyTo ! InitialState(theStore, expectedSeq)
        case None                    => // not known here: send no reply
      }
    case GetFull(replyTo) =>
      log.info("sending full info to {}", replyTo)
      replyTo ! InitialState(theStore, expectedSeq)
  }

  // Recovery when lagging by more than maxLag; elided in this listing.
  def fallBehind(expectedSeq: Int, _waiting: TreeMap[Int, Replicate]): Unit = ???

  // Recovery when only some updates are missing; elided in this listing.
  def missingSomeUpdates(
      theStore: Map[String, JsValue],
      expectedSeq: Int,
      prevOutstanding: Set[Int],
      waiting: TreeMap[Int, Replicate]): Unit = ???
}

代码清单 13-5 被动副本在滞后过多时请求一份全量更新

/**
 * Passive replica that requests a full state transfer from another replica
 * when it has fallen behind by more than `maxLag` updates.
 *
 * @param askAroundCount    not used in this part of the listing
 * @param askAroundInterval delay after which an unanswered request is repeated
 * @param maxLag            lag beyond which a full state transfer is requested
 */
class Passive(askAroundCount: Int, askAroundInterval: FiniteDuration, maxLag: Int) extends Actor with ActorLogging {

  private val applied = mutable.Queue.empty[Replicate]
  // active replica whose TakeOver arrived while this replica was not up to date
  private var awaitingInitialState = Option.empty[ActorRef]

  private val name: String =
    Cluster(context.system).selfAddress.toString.replaceAll("[:/]", "_")
  private val cluster = Cluster(context.system)
  private val random = new Random

  private var tickTask = Option.empty[Cancellable]

  /** (Re)schedules a single `DoConsolidate` message, cancelling a previously scheduled one. */
  def scheduleTick(): Unit = {
    tickTask.foreach(_.cancel())
    tickTask = Some(
      context.system.scheduler.scheduleOnce(askAroundInterval, self, DoConsolidate)(context.dispatcher))
  }

  def receive: Receive = readPersisted(name) match {
    case Database(s, kv) =>
      log.info("started at sequence {}", s)
      upToDate(kv, s + 1)
  }

  /** Answers a delayed TakeOver, if any, and returns to the `upToDate` behavior. */
  def caughtUp(theStore: Map[String, JsValue], expectedSeq: Int): Unit = {
    awaitingInitialState.foreach(_ ! InitialState(theStore, expectedSeq))
    awaitingInitialState = None
    context.become(upToDate(theStore, expectedSeq))
  }

  def upToDate(theStore: Map[String, JsValue], expectedSeq: Int): Receive = {
    // Cases shown previously elided
    case TakeOver(active)                                   => ??? //...
    case Replicate(s, _, _, replyTo) if s - expectedSeq < 0 => ??? //...
    case r: Replicate if r.seq == expectedSeq               => ??? //...
    case r: Replicate                                       => ??? //...
    case GetSingle(s, replyTo)                              => ??? //...
    case GetFull(replyTo) =>
      log.info("sending full info to {}", replyTo)
      replyTo ! InitialState(theStore, expectedSeq)
  }

  /**
   * Behavior while waiting for a full state transfer.
   *
   * Incoming updates are buffered; a newer `InitialState` replaces the local
   * state, after which the buffered updates are consolidated.
   *
   * @param expectedSeq sequence number this replica was waiting for when it fell behind
   * @param _waiting    updates received so far that could not be applied yet
   */
  def fallBehind(expectedSeq: Int, _waiting: TreeMap[Int, Replicate]): Unit = {
    askAroundFullState()
    scheduleTick()
    var waiting = _waiting
    context.become {
      case Replicate(s, _, _, replyTo) if s < expectedSeq =>
        replyTo ! Replicated(s)
      case r: Replicate =>
        waiting += (r.seq -> r)
      case TakeOver(active) =>
        log.info("delaying active replica takeOver, at seq {} while highest is {}", expectedSeq, waiting.lastKey)
        awaitingInitialState = Some(active)
      case InitialState(m, s) if s > expectedSeq =>
        log.info("received newer state at sequence {} (was at {})", s, expectedSeq)
        // NOTE(review): GetFull is answered with InitialState(theStore, expectedSeq), i.e. the
        // sender's *next* expected sequence number, while this handler treats `s` as already
        // applied (persisted as `s`, continued at `s + 1`) — confirm update `s` cannot be skipped.
        persist(name, s, m)
        waiting.to(s).valuesIterator.foreach(r => r.replyTo ! Replicated(r.seq))
        // Keep only what is newer than the received state. Retaining the entries that were
        // just acknowledged would leave keys below `s + 1` at the front of the buffer, so the
        // applicable prefix would stay empty and this replica could never catch up.
        val nextWaiting = waiting.from(s + 1)
        consolidate(m, s + 1, Set.empty, nextWaiting)
      case DoConsolidate =>
        // no usable answer yet: ask again
        askAroundFullState()
        scheduleTick()
    }
  }

  // Applies buffered updates and chooses the next behavior; elided in this listing.
  private def consolidate(
      theStore: Map[String, JsValue],
      expectedSeq: Int,
      askedFor: Set[Int],
      waiting: TreeMap[Int, Replicate]): Unit = ??? //...

  /** Picks up to `n` addresses of current cluster members in random order. */
  private def getMembers(n: Int): Seq[Address] = {
    // using .iterator to avoid one intermediate collection to be created
    random.shuffle(cluster.state.members.iterator.map(_.address).toSeq).take(n)
  }

  /** Asks one randomly chosen member's replica for its full state. */
  private def askAroundFullState(): Unit = {
    log.info("asking for full data")
    getMembers(1).foreach(addr => replicaOn(addr) ! GetFull(self))
  }

  /** Selection of the actor that has this actor's path on the node at `addr`. */
  private def replicaOn(addr: Address): ActorSelection =
    context.actorSelection(self.path.toStringWithAddress(addr))
}

代码清单 13-6 计算直接可应用的队列前段的长度

// Predicate on a pair of Ints: true when both components are equal.
private val matches: ((Int, Int)) => Boolean = { case (left, right) => left == right }

/**
 * Applies every buffered update that directly continues the local sequence
 * and then selects the next behavior.
 *
 * The applicable prefix consists of the entries of `waiting` whose keys are
 * exactly `expectedSeq`, `expectedSeq + 1`, ... without a gap. Each of them is
 * persisted, confirmed to its `replyTo` and recorded in `applied`.
 *
 * @param theStore    key-value map before the buffered updates are applied
 * @param expectedSeq sequence number of the next update to apply
 * @param askedFor    sequence numbers already requested; passed on to `missingSomeUpdates`
 * @param waiting     buffered updates, ordered by sequence number
 */
private def consolidate(
    theStore: Map[String, JsValue],
    expectedSeq: Int,
    askedFor: Set[Int],
    waiting: TreeMap[Int, Replicate]): Unit = {

  // calculate applicable prefix length
  val prefix = waiting.keysIterator.zip(Iterator.from(expectedSeq)).takeWhile(matches).size

  val nextStore = waiting.valuesIterator.take(prefix).foldLeft(theStore) { (store, replicate) =>
    val updated = store + (replicate.key -> replicate.value)
    // Persist the map *including* this update; persisting the incoming `theStore`
    // would record a sequence number whose data is not in the file.
    persist(name, replicate.seq, updated)
    replicate.replyTo ! Replicated(replicate.seq)
    applied.enqueue(replicate)
    updated
  }
  val nextWaiting = waiting.drop(prefix)
  val nextExpectedSeq = expectedSeq + prefix

  // cap the size of the applied buffer; `drop` would only return a trimmed copy,
  // so the oldest entries are dequeued in place instead
  while (applied.nonEmpty && applied.size > maxLag) applied.dequeue()

  if (nextWaiting.nonEmpty) {
    // check if we fell behind by too much
    if (nextWaiting.lastKey - nextExpectedSeq > maxLag)
      fallBehind(nextExpectedSeq, nextWaiting)
    else missingSomeUpdates(nextStore, nextExpectedSeq, askedFor, nextWaiting)
  } else caughtUp(nextStore, nextExpectedSeq)
}

代码清单 13-7 确定更新队列里的数据缺口是否可以被一一填补

/**
 * Passive replica that fills individual gaps in its update sequence by asking
 * other replicas for the missing sequence numbers.
 *
 * @param askAroundCount    number of replicas asked for each missing update
 * @param askAroundInterval interval of the consolidation tick (used by the elided `scheduleTick`)
 * @param maxLag            not used in this part of the listing
 */
class Passive(askAroundCount: Int, askAroundInterval: FiniteDuration, maxLag: Int) extends Actor with ActorLogging {

  // updates applied by this replica; used to answer GetSingle
  private val applied = mutable.Queue.empty[Replicate]
  private var awaitingInitialState = Option.empty[ActorRef]

  // ... Initialization elided
  def receive: Receive = ??? //...

  def upToDate(theStore: Map[String, JsValue], expectedSeq: Int): Receive = {
    case TakeOver(active)                                   => ??? //...
    case Replicate(s, _, _, replyTo) if s - expectedSeq < 0 => ??? //...
    case r: Replicate if r.seq == expectedSeq               => ??? //...
    case r: Replicate                                       => ??? //...
    case GetFull(replyTo)                                   => ??? //...
    case GetSingle(s, replyTo) =>
      answerGetSingle(s, replyTo, theStore, expectedSeq)
  }

  /**
   * Behavior while some updates between `expectedSeq` and the highest buffered
   * sequence number are missing.
   *
   * Every missing sequence number that was not requested before is requested
   * now; all outstanding requests are repeated on each `DoConsolidate`.
   *
   * @param theStore        current key-value map
   * @param expectedSeq     sequence number of the next update to apply
   * @param prevOutstanding sequence numbers that were already requested
   * @param waiting         buffered updates that cannot be applied yet
   */
  def missingSomeUpdates(
      theStore: Map[String, JsValue],
      expectedSeq: Int,
      prevOutstanding: Set[Int],
      waiting: TreeMap[Int, Replicate]): Unit = {

    val askFor = (expectedSeq to waiting.lastKey).iterator
      .filterNot(seq =>
        waiting.contains(seq) ||
        prevOutstanding.contains(seq))
      .toList

    askFor.foreach(askAround)

    // a tick is only started when none was needed before
    if (prevOutstanding.isEmpty) {
      scheduleTick()
    }
    val outstanding = prevOutstanding ++ askFor

    context.become {
      case Replicate(s, _, _, replyTo) if s < expectedSeq =>
        replyTo ! Replicated(s)
      case r: Replicate =>
        consolidate(theStore, expectedSeq, outstanding - r.seq, waiting + (r.seq -> r))
      case TakeOver(active) =>
        log.info("delaying active replica takeOver, at seq {} while highest is {}", expectedSeq, waiting.lastKey)
        awaitingInitialState = Some(active)
      case GetSingle(s, replyTo) =>
        answerGetSingle(s, replyTo, theStore, expectedSeq)
      case GetFull(replyTo) =>
        log.info("sending full info to {}", replyTo)
        replyTo ! InitialState(theStore, expectedSeq)
      case DoConsolidate =>
        outstanding.foreach(askAround)
        scheduleTick()
    }
  }

  /**
   * Answers a request for the single update `s`.
   *
   * Replies with the buffered update when it is present, otherwise with the
   * full state when `s` lies before `expectedSeq`, otherwise not at all. The
   * update is looked up directly because the buffer may contain gaps: being
   * inside the head..last range does not guarantee that the entry exists.
   */
  private def answerGetSingle(
      s: Int,
      replyTo: ActorRef,
      theStore: Map[String, JsValue],
      expectedSeq: Int): Unit = {
    log.info("GetSingle from {}", replyTo)
    applied.find(_.seq == s) match {
      case Some(update)            => replyTo ! update
      case None if s < expectedSeq => replyTo ! InitialState(theStore, expectedSeq)
      case None                    => // not known here: send no reply
    }
  }

  /** Asks `askAroundCount` randomly chosen replicas for the update with sequence number `seq`. */
  private def askAround(seq: Int): Unit = {
    log.info("asking around for sequence number {}", seq)
    getMembers(askAroundCount).foreach(addr => replicaOn(addr) ! GetSingle(seq, self))
  }

  // ... Other helpers elided
  private def consolidate(
      theStore: Map[String, JsValue],
      expectedSeq: Int,
      askedFor: Set[Int],
      waiting: TreeMap[Int, Replicate]): Unit = ???

  private def getMembers(n: Int): Seq[Address] = ???

  private def replicaOn(addr: Address): ActorSelection = ???

  def scheduleTick(): Unit = ???

}

13.2 多主复制模式

代码清单 13-8 使用 CKite 来实现键值存储

/** Key-value store implemented as a state machine: writes update an in-memory map, reads query it. */
class KVStore extends StateMachine {
  private var map = mutable.Map[String, String]()
  // index passed along with the most recent write that was applied
  private var lastIndex: Long = 0

  /** Applies a `Put` and yields the written value. */
  def applyWrite: PartialFunction[(Long, WriteCommand[_]), String] = {
    case (index, Put(key: String, value: String)) =>
      map.update(key, value)
      lastIndex = index
      value
  }

  /** Answers a `Get` from the in-memory map. */
  def applyRead: PartialFunction[ReadCommand[_], Option[String]] = {
    case Get(key) => map.get(key)
  }

  def getLastAppliedIndex: Long = lastIndex

  /** Replaces the map with the one deserialized from the buffer's backing array. */
  def restoreSnapshot(byteBuffer: ByteBuffer): Unit = {
    val bytes = byteBuffer.array()
    map = Serializer.deserialize[mutable.Map[String, String]](bytes)
  }

  /** Serializes the current map into a new buffer. */
  def takeSnapshot(): ByteBuffer = {
    val bytes = Serializer.serialize(map)
    ByteBuffer.wrap(bytes)
  }

}

代码清单 13-9 按照复制状态机来实例化KVStore

/** Entry point: builds a CKite instance around a [[KVStore]], starts it and then starts the HTTP server. */
object KVStoreBootstrap extends App {
  val ckite =
    CKiteBuilder()
      .stateMachine(new KVStore())
      .rpc(FinagleThriftRpc)
      .storage(MapDBStorage())
      .build
  ckite.start()

  HttpServer(ckite).start()
}

CKite API

// Read issued through `ckite.read`; named the consistent variant here, in contrast to the local read below.
val consistentRead: Future[Option[String]] = ckite.read(Get(key))
// Read issued through `ckite.readLocal`; as the name says, the result may be stale.
val possibleStaleRead: Future[Option[String]] = ckite.readLocal(Get(key))
// Write issued through `ckite.write`; the future carries a String result.
val write: Future[String] = ckite.write(Put(key, value))

代码清单 13-10 图13-1中图形的代码表示

/**
 * A job status that is also a node in the status graph.
 *
 * The neighbours are passed by name in the second parameter list and are only
 * evaluated when `predecessors` / `successors` are first accessed, which lets
 * status values refer to each other. Being in the second parameter list, they
 * take no part in the generated case-class equality; only `name` does.
 *
 * @param name         identifier of this status
 * @param _predecessor statuses that directly precede this one
 * @param _successor   statuses that directly follow this one
 */
final case class Status(name: String)(_predecessor: => Set[Status], _successor: => Set[Status])
    extends ReplicatedData {

  type T = Status

  // Merging is delegated to mergeStatus.
  def merge(that: Status): Status = mergeStatus(this, that)

  // Evaluated on first access from the by-name constructor arguments.
  @volatile lazy val predecessors: Set[Status] = _predecessor
  @volatile lazy val successors: Set[Status] = _successor

}

// The status graph: each value lists its direct predecessors first and its
// direct successors second. The references to values defined further down
// are passed by name to Status.
val New: Status =
  Status("new")(Set.empty, Set(Scheduled, Cancelled))
val Scheduled: Status =
  Status("scheduled")(Set(New), Set(Executing, Cancelled))
val Executing: Status =
  Status("executing")(Set(Scheduled), Set(Aborted, Finished))
// Finished has no successors.
val Finished: Status =
  Status("finished")(Set(Executing, Aborted), Set.empty)
val Cancelled: Status =
  Status("cancelled")(Set(New, Scheduled), Set(Aborted))
val Aborted: Status =
  Status("aborted")(Set(Cancelled, Executing), Set(Finished))

代码清单 13-11 合并两个状态来产生第三个合并后的状态

/**
 * Merges two statuses into the status that follows from both of them.
 *
 * Starting at `right`, the status graph is walked along successor links until
 * a status is found from which `left` can be reached via predecessor links.
 * When several branches yield a result, the one that precedes the other is
 * kept.
 */
def mergeStatus(left: Status, right: Status): Status = {

  // True when `fixed` is `candidate` itself or can be reached from it by
  // following predecessor links that are not in `exclude`.
  def isSuccessor(candidate: Status, fixed: Status, exclude: Set[Status]): Boolean =
    candidate == fixed ||
      (candidate.predecessors -- exclude).exists(isSuccessor(_, fixed, exclude))

  /*
   * Keep the left Status in hand and check whether it is a predecessor of the
   * candidate; if not, continue with the candidate's successors. Candidates
   * that were already rejected are collected in the exclusion set so that the
   * same unsuccessful comparison is not repeated.
   */
  def innerLoop(candidate: Status, exclude: Set[Status]): Status =
    if (isSuccessor(candidate, left, exclude)) candidate
    else {
      val visited = exclude + candidate
      candidate.successors
        .map(innerLoop(_, visited))
        .reduce((l, r) => if (isSuccessor(l, r, visited)) r else l)
    }

  innerLoop(right, Set.empty)
}

代码清单 13-12 使用 Akka Distributed Data 来传播状态变更

/**
 * Client-facing actor that records job submissions and cancellations in the
 * replicated status map and logs the replicator's answers.
 */
class ClientInterface extends Actor with ActorLogging {
  private val replicator: ActorRef = DistributedData(context.system).replicator
  private implicit val cluster: Cluster = Cluster(context.system)

  def receive: Receive = {
    case Submit(job) =>
      log.info("submitting job {}", job)
      setStatus(job, New, s"submit $job")
    case Cancel(job) =>
      log.info("cancelling job {}", job)
      setStatus(job, Cancelled, s"cancel $job")
    case r: Replicator.UpdateResponse[_] =>
      log.info("received update result: {}", r)
    case PrintStatus =>
      replicator ! Replicator.Get(StorageComponent, Replicator.ReadMajority(5.seconds))
    case g: Replicator.GetSuccess[_] =>
      log.info("overall status: {}", g.get(StorageComponent))
  }

  /**
   * Asks the replicator to set `job` to `status` in the shared map, using a
   * majority write with a five second timeout; `request` is attached to the update.
   */
  private def setStatus(job: String, status: Status, request: String): Unit =
    replicator ! Replicator.Update(
      StorageComponent,
      ORMap.empty[String, Status],
      Replicator.WriteMajority(5.seconds),
      Some(request))(_ + (job -> status))
}

代码清单 13-13 引入对于任务的请求标志

/**
 * Executes jobs: moves a job to `Executing` and `Finished` in the replicated
 * status map and logs when a job turns `Aborted`.
 */
class Executor extends Actor with ActorLogging {
  private val replicator: ActorRef = DistributedData(context.system).replicator
  private implicit val cluster: Cluster = Cluster(context.system)

  // status map as of the previous change notification
  private var lastState = Map.empty[String, Status]

  replicator ! Replicator.Subscribe(StorageComponent, self)

  def receive: Receive = {
    case Execute(job) =>
      log.info("executing job {}", job)
      replicator ! Replicator.Update(
        StorageComponent,
        ORMap.empty[String, Status],
        Replicator.WriteMajority(5.seconds),
        Some(job)) { map =>
        // the update is only valid for a job that is still New
        require(map.get(job).contains(New))
        map + (job -> Executing)
      }
    case Finish(job) =>
      log.info("job {} finished", job)
      replicator ! Replicator.Update(
        StorageComponent,
        ORMap.empty[String, Status],
        Replicator.WriteMajority(5.seconds))(_ + (job -> Finished))
    case Replicator.UpdateSuccess(StorageComponent, Some(job)) =>
      // only the Execute update carries the job as its request
      log.info("starting job {}", job)
    case r: Replicator.UpdateResponse[_] =>
      log.info("received update result: {}", r)
    case ch: Replicator.Changed[_] =>
      val current = ch.get(StorageComponent).entries
      for {
        (job, status) <- current.iterator
        if status == Aborted
        if !lastState.get(job).contains(Aborted)
      } log.info("aborting job {}", job)
      // Remember the snapshot only after the whole change was inspected. The guards above
      // are evaluated lazily per element, so updating it inside the loop would hide every
      // further job that became Aborted in the same change.
      lastState = current
  }
}

13.3 主动-主动复制模式

代码清单 13-14 用无协调工作的实现来开始主动-主动复制模式

/** A client command tagged with a sequence number; the replica answers to `replyTo`. */
private final case class SeqCommand(seq: Int, cmd: Command, replyTo: ActorRef)

/** A replica's result for sequence number `seq`; `replica` is the answering replica, `replyTo` the client taken from the command. */
private final case class SeqResult(seq: Int, res: Result, replica: ActorRef, replyTo: ActorRef)

/** Asks an initialized replica to send its current map to `toReplica`. */
private final case class SendInitialData(toReplica: ActorRef)

/** Carries the map with which a replica starts operating. */
private final case class InitialData(map: Map[String, JsValue])

/**
 * One replica of the key-value store. It stashes everything until it has
 * received its initial data and afterwards executes sequenced commands.
 */
class Replica extends Actor with Stash {
  private var map = Map.empty[String, JsValue]

  // Start-up behavior: wait for the initial data, buffering all other messages.
  def receive: Receive = {
    case InitialData(initial) =>
      map = initial
      context.become(initialized)
      unstashAll()
    case _ =>
      stash()
  }

  // Normal operation: execute commands and hand the data to new replicas on request.
  def initialized: Receive = {
    case SeqCommand(seq, cmd, coordinator) =>
      // tracking of sequence numbers and resents is elided here
      cmd match {
        case Put(key, value, client) =>
          map = map.updated(key, value)
          coordinator ! SeqResult(seq, PutConfirmed(key, value), self, client)
        case Get(key, client) =>
          val found = map.get(key)
          coordinator ! SeqResult(seq, GetResult(key, found), self, client)
      }
    case SendInitialData(toReplica) =>
      toReplica ! InitialData(map)
  }
}

代码清单 13-15 封装对于单个客户端请求的知悉情况

/** What the coordinator knows about the replies to one client request. */
private sealed trait ReplyState {
  /** Point in time after which the coordinator stops waiting for further replies. */
  def deadline: Deadline

  /** Replicas that have not replied yet. */
  def missing: Set[ActorRef]

  /** Returns the state that results from taking the reply `res` into account. */
  def add(res: SeqResult): ReplyState

  /** True once no reply is missing any more. */
  def isFinished: Boolean = missing.isEmpty
}

/**
 * State of a request for which no answer has been agreed upon yet.
 *
 * @param deadline point in time after which an answer is forced
 * @param replies  replies received so far
 * @param missing  replicas that have not replied yet
 * @param quorum   number of identical results needed to accept an answer
 */
private final case class Unknown(deadline: Deadline, replies: Set[SeqResult], missing: Set[ActorRef], quorum: Int)
    extends ReplyState {

  /**
   * Adds one reply.
   *
   * Becomes [[Known]] as soon as `quorum` replies carry the same result, or
   * when all replicas have replied without agreement (simple majority);
   * otherwise stays [[Unknown]] with the reply recorded.
   */
  override def add(res: SeqResult): ReplyState = {
    val nextReplies = replies + res
    val nextMissing = missing - res.replica
    if (nextReplies.size >= quorum) {
      // The search has to include the reply that was just added, otherwise
      // the reply that completes the quorum is never counted.
      val answer =
        nextReplies.toSeq.groupBy(_.res).collectFirst {
          case (_, agreeing) if agreeing.size >= quorum => agreeing.head
        }

      answer match {
        case Some(right) =>
          // A replica is wrong when *its own* result differs from the agreed one.
          val wrong = nextReplies.collect {
            case SeqResult(_, result, replica, _) if result != right.res => replica
          }
          Known(deadline, right, wrong, nextMissing)
        case None if nextMissing.isEmpty =>
          Known.fromUnknown(deadline, nextReplies)
        case None =>
          Unknown(deadline, nextReplies, nextMissing, quorum)
      }
    } else Unknown(deadline, nextReplies, nextMissing, quorum)
  }
}

/**
 * State of a request whose answer has been decided.
 *
 * @param deadline point in time after which missing replies are no longer awaited
 * @param reply    the reply that is sent to the client
 * @param wrong    replicas whose result differed from `reply`
 * @param missing  replicas that have not replied yet
 */
private final case class Known(deadline: Deadline, reply: SeqResult, wrong: Set[ActorRef], missing: Set[ActorRef])
    extends ReplyState {

  /** Records a late reply: the replica is no longer missing and is marked wrong if its result differs. */
  override def add(res: SeqResult): ReplyState = {
    val agrees = res.res == reply.res
    copy(
      wrong = if (agrees) wrong else wrong + res.replica,
      missing = missing - res.replica)
  }
}

private object Known {

  /**
   * Forces a decision from the replies at hand: the result with the most
   * replies wins and all replicas that answered differently are marked wrong.
   * `replies` must not be empty.
   */
  def fromUnknown(deadline: Deadline, replies: Set[SeqResult]): Known = {
    // did not reach consensus on this one, pick simple majority
    val byResult = replies.groupBy(_.res)
    val topCount = byResult.valuesIterator.map(_.size).max
    // the first group with the highest count wins
    val winners = byResult.valuesIterator.find(_.size == topCount).get
    val losers = (replies -- winners).map(_.replica)
    Known(deadline, winners.head, losers, Set.empty)
  }
}

代码清单 13-16 将副本当作子Actor管理

/**
 * Coordinator of `N` replicas that are managed as child actors.
 *
 * Each client command is numbered and sent to all replicas; the replies are
 * collected per sequence number, answers are sent to clients in sequence
 * order, and replicas that gave a deviating answer or terminated are replaced.
 *
 * NOTE(review): new replicas wait for `InitialData`, and this listing only
 * sends it when a replica is replaced — confirm how the first replicas are
 * initialized.
 *
 * @param N number of replicas to run
 */
class Coordinator(N: Int) extends Actor {
  private var replicas = (1 to N).map(_ => newReplica()).toSet
  private val seqNr = Iterator.from(0)
  // reply bookkeeping per sequence number, ordered so that the oldest request comes first
  private var replies = TreeMap.empty[Int, ReplyState]
  // sequence number of the next answer to send to a client
  private var nextReply = 0

  override def supervisorStrategy: SupervisorStrategy =
    SupervisorStrategy.stoppingStrategy

  private def newReplica(): ActorRef =
    context.watch(context.actorOf(Replica.props))

  // schedule timeout messages for quiescent periods
  context.setReceiveTimeout(1.second)

  // After every handled message the housekeeping steps run in a fixed order.
  def receive: Receive =
    ({
      case cmd: Command =>
        val c = SeqCommand(seqNr.next, cmd, self)
        replicas.foreach(_ ! c)
        // NOTE(review): (size + 1) / 2 equals size / 2 for even sizes, which is
        // not a strict majority — confirm this is intended.
        replies += c.seq -> Unknown(5.seconds(fromNow), Set.empty, replicas, (replicas.size + 1) / 2)
      case res: SeqResult
          if replies.contains(res.seq) &&
          replicas.contains(res.replica) =>
        val prevState = replies(res.seq)
        val nextState = prevState.add(res)
        replies += res.seq -> nextState
      case Terminated(ref) =>
        replaceReplica(ref, terminate = false)
      case ReceiveTimeout =>
    }: Receive).andThen { _ =>
      doTimeouts()
      sendReplies()
      evictFinished()
    }

  //...
  /**
   * Stops waiting for requests whose deadline has passed: undecided requests
   * are decided from the replies at hand, decided ones stop waiting for
   * missing replies.
   */
  private def doTimeouts(): Unit = {
    val now = Deadline.now
    val expired = replies.iterator.takeWhile(_._2.deadline <= now)
    for ((seq, state) <- expired) {
      state match {
        case Unknown(deadline, received, _, _) if received.nonEmpty =>
          val forced = Known.fromUnknown(deadline, received)
          replies += seq -> forced
        case _: Unknown =>
          // No replica has answered yet, so there is nothing to choose from;
          // forcing a decision here would fail on the empty reply set.
          // Keep waiting for the first reply.
        case Known(deadline, reply, wrong, _) =>
          replies += seq -> Known(deadline, reply, wrong, Set.empty)
      }
    }
  }

  /** Sends the decided answers to the clients, strictly in sequence order. */
  @tailrec private def sendReplies(): Unit =
    replies.get(nextReply) match {
      case Some(Known(_, reply, _, _)) =>
        reply.replyTo ! reply.res
        nextReply += 1
        sendReplies()
      case _ =>
    }

  /** Drops finished requests from the front of the bookkeeping and replaces replicas that answered wrongly. */
  @tailrec private def evictFinished(): Unit =
    replies.headOption match {
      case Some((seq, k @ Known(_, _, wrong, _))) if k.isFinished =>
        wrong.foreach(replaceReplica(_, terminate = true))
        replies -= seq
        evictFinished()
      case _ =>
    }

  /**
   * Replaces replica `r` by a fresh one that receives its data from one of
   * the remaining replicas.
   *
   * @param terminate whether `r` still has to be stopped
   */
  private def replaceReplica(r: ActorRef, terminate: Boolean): Unit =
    if (replicas contains r) {
      replicas -= r
      if (terminate) r ! PoisonPill
      val replica = newReplica()
      replicas.headOption match {
        case Some(donor) => donor ! SendInitialData(replica)
        // no replica is left to copy from (N == 1): start the new one empty
        // instead of failing on an empty set
        case None => replica ! InitialData(Map.empty)
      }
      replicas += replica
    }

}

代码清单 13-17 按序发送回复

/**
 * Sends decided answers to the clients in sequence order and stops at the
 * first sequence number that has no decided answer yet.
 */
private def sendReplies(): Unit = {
  var continue = true
  while (continue) {
    replies.get(nextReply) match {
      case Some(Known(_, answer, _, _)) =>
        answer.replyTo ! answer.res
        nextReply += 1
      case _ =>
        // missing or still undecided: later answers have to wait
        continue = false
    }
  }
}

代码清单 13-18 一旦超时，就强制将“missing”回复转为“known”回复

/**
 * Stops waiting for requests whose deadline has passed: undecided requests
 * are decided from the replies at hand, decided ones stop waiting for
 * missing replies.
 */
private def doTimeouts(): Unit = {
  val now = Deadline.now
  val expired = replies.iterator.takeWhile(_._2.deadline <= now)
  for ((seq, state) <- expired) {
    state match {
      case Unknown(deadline, received, _, _) if received.nonEmpty =>
        val forced = Known.fromUnknown(deadline, received)
        replies += seq -> forced
      case _: Unknown =>
        // No replica has answered yet, so there is nothing to choose from;
        // forcing a decision here would fail on the empty reply set.
        // Keep waiting for the first reply.
      case Known(deadline, reply, wrong, _) =>
        replies += seq -> Known(deadline, reply, wrong, Set.empty)
    }
  }
}

代码清单 13-19 终止并替换未完成的副本

/**
 * Removes finished requests from the front of the bookkeeping, replacing the
 * replicas that answered them wrongly, until the oldest request is unfinished.
 */
@tailrec private def evictFinished(): Unit = {
  val finishedHead = replies.headOption.collect {
    case (seq, known: Known) if known.isFinished => (seq, known.wrong)
  }
  finishedHead match {
    case Some((seq, wrong)) =>
      wrong.foreach(replaceReplica(_, terminate = true))
      replies -= seq
      evictFinished()
    case None =>
      ()
  }
}

/**
 * Replaces replica `r` by a fresh one that receives its data from one of the
 * remaining replicas. Does nothing when `r` is not a current replica.
 *
 * @param r         replica to replace
 * @param terminate whether `r` still has to be stopped
 */
private def replaceReplica(r: ActorRef, terminate: Boolean): Unit =
  if (replicas contains r) {
    replicas -= r
    if (terminate) r ! PoisonPill
    val replica = newReplica()
    replicas.headOption match {
      case Some(donor) => donor ! SendInitialData(replica)
      // no replica is left to copy from: start the new one empty instead of
      // failing on an empty set
      case None => replica ! InitialData(Map.empty)
    }
    replicas += replica
  }