第12章——容错及恢复模式

在这一章中,你将学习到在设计应用程序时如何应对失败出现的可能性。我们将通过具体地构建一个具有回弹性的计算引擎来演示几个相应的模式,这个系统允许提交批处理作业,并在有着弹性调度的硬件资源上执行。我们将基于你在第6章和第7章所学习到的知识进行展开,因此,你可能想要回顾一下之前的内容。

12.4. 断路器模式

代码清单 12-1 利用断路器使得失败组件有时间恢复

private object StorageFailed extends RuntimeException

import akka.rdpextras.ExecutionContexts.sameThreadExecutionContext

private def sendToStorage(job: Job): Future[StorageStatus] = {
  val f: Future[StorageStatus] = ??? //...
  f.map {
    case StorageStatus.Failed => throw StorageFailed
    case other                => other
  }
}

import scala.concurrent.duration._

private val breaker = CircuitBreaker(system.scheduler, 5, 300.millis, 30.seconds)

def persist(job: Job): Future[StorageStatus] = {
  breaker.withCircuitBreaker(sendToStorage(job)).recover {
    case StorageFailed                  => StorageStatus.Failed
    case _: TimeoutException            => StorageStatus.Unknown
    case _: CircuitBreakerOpenException => StorageStatus.Failed
  }
}

代码清单12-2 使用速率限制器保护组件

import scala.concurrent.Future
import scala.concurrent.duration.{ Deadline, FiniteDuration }

case object RateLimitExceeded extends RuntimeException

class RateLimiter(requests: Int, period: FiniteDuration) {
  private val startTimes = {
    val onePeriodAgo = Deadline.now - period
    Array.fill(requests)(onePeriodAgo)
  }
  private var position = 0

  private def lastTime = startTimes(position)

  private def enqueue(time: Deadline): Unit = {
    startTimes(position) = time
    position += 1
    if (position == requests) position = 0
  }

  def call[T](block: => Future[T]): Future[T] = {
    val now = Deadline.now
    if ((now - lastTime) < period) {
      Future.failed(RateLimitExceeded)
    } else {
      enqueue(now)
      block
    }
  }
}

代码清单12-3 断路器:限制来自同一个客户端的请求

private val limiter = new RateLimiter(100, 2.seconds)

def persistForThisClient(job: Job): Future[StorageStatus] = {
  import akka.rdpextras.ExecutionContexts.sameThreadExecutionContext
  limiter.call(persist(job)).recover {
    case RateLimitExceeded => StorageStatus.Failed
  }
}

代码清单12-4 门控一个客户端

private val limiter = new RateLimiter(100, 2.seconds)
private val breaker = CircuitBreaker(system.scheduler, 10, Duration.Zero, 10.seconds)

def persistForThisClient(job: Job): Future[StorageStatus] = {
  import akka.rdpextras.ExecutionContexts.sameThreadExecutionContext
  breaker.withCircuitBreaker(limiter.call(persist(job))).recover {
    case RateLimitExceeded              => StorageStatus.Failed
    case _: CircuitBreakerOpenException => StorageStatus.Gated
  }
}