第16章——流控制模式

在前面的章节里,你学会了如何将系统拆分成较小的部分,以及这些部分之间如何通信以解决较大的任务。有一个我们到目前为止尚未提及的角度:除了确定谁与谁交互之外,你必须同样考虑通信的时效性。为了让你的系统在不同的负载下都更具有回弹性,你需要一种机制,防止组件因请求速率过高而不受控制地失败。为此,这一章介绍了下面四种基本模式。

16.1 拉取模式

代码清单16-1 在拉取输入的工作者内部处理昂贵的计算

class Worker(manager: ActorRef) extends Actor {
  // 100 significant digits for the per-term division; HALF_EVEN rounding.
  private val mc = new MathContext(100, RoundingMode.HALF_EVEN)
  private val plus = BigDecimal(1, mc)
  private val minus = BigDecimal(-1, mc)

  // Demand signalled upstream but not yet satisfied by incoming jobs.
  private var requested = 0

  /** Re-arms the pull: once below the low watermark of 5, ask for 10 more. */
  def request(): Unit =
    if (requested < 5) {
      manager ! WorkRequest(self, 10)
      requested += 10
    }

  // Prime the pump once at construction time.
  request()

  def receive: Receive = {
    case Job(id, data, replyTo) =>
      requested -= 1
      request()
      // Odd inputs contribute +1/data, even inputs -1/data.
      val term = (if ((data & 1) == 1) plus else minus) / data
      replyTo ! JobResult(id, term)
  }
}

代码清单16-2 按工作者所请求的数目给它们提供任务

class Manager extends Actor {

  // One million jobs numbered from 1; each job's data is its own index.
  private val works: Iterator[Job] =
    Iterator.from(1).map(x => Job(x, x, self)).take(1000000)

  private val aggregator: (BigDecimal, BigDecimal) => BigDecimal = (x: BigDecimal, y: BigDecimal) => x + y
  // 10000 digits of precision for the running sum.
  private val mc = new MathContext(10000, RoundingMode.HALF_EVEN)
  private var approximation = BigDecimal(0, mc)

  // Jobs handed to workers whose results have not come back yet.
  private var outstandingWork = 0

  // Fixed pool of eight pulling workers.
  (1 to 8).foreach(_ => context.actorOf(Props(new Worker(self))))

  def receive: Receive = {
    case WorkRequest(worker, items) =>
      // Dispense at most `items` jobs straight from the iterator.
      // FIX: the original `works.toStream.take(items).foreach` eagerly
      // pulled one job off the iterator even when items == 0 (losing it
      // silently, since Stream.cons evaluates its head strictly), and
      // toStream/Stream is deprecated in current Scala. A counted loop
      // has neither problem and allocates nothing.
      var sent = 0
      while (sent < items && works.hasNext) {
        worker ! works.next()
        outstandingWork += 1
        sent += 1
      }
    case JobResult(id, report) =>
      approximation = aggregator(approximation, report)
      outstandingWork -= 1
      // Done only when every dispensed job is back AND the source is dry.
      if (outstandingWork == 0 && works.isEmpty) {
        println("final result: " + approximation)
        context.system.terminate()
      }
  }
}

16.2 托管队列模式

代码清单16-3 管理一个工作队列以对过载作出反应

class Manager extends Actor {

  // Bounded buffer of pending jobs and queue of unanswered worker demand.
  private var workQueue: Queue[Job] = Queue.empty[Job]
  private var requestQueue: Queue[WorkRequest] = Queue.empty[WorkRequest]

  (1 to 8).foreach(_ => context.actorOf(Props(new Worker(self))))

  def receive: Receive = {
    case job @ Job(id, _, replyTo) =>
      if (requestQueue.nonEmpty) {
        // A worker is already waiting: hand the job straight over and
        // cancel the rest of its demand with dummy work items.
        val (WorkRequest(worker, items), remaining) = requestQueue.dequeue
        worker ! job
        if (items > 1) worker ! DummyWork(items - 1)
        requestQueue = remaining
      } else if (workQueue.size < 1000) {
        workQueue :+= job // buffer within the bounded queue
      } else {
        replyTo ! JobRejected(id) // overloaded: reject explicitly
      }
    case wr @ WorkRequest(worker, items) =>
      if (workQueue.isEmpty) {
        requestQueue :+= wr
      } else {
        // Serve as much of the demand as the buffer can cover, then
        // cancel the shortfall with dummy work.
        val batch = workQueue.take(items)
        batch.foreach(worker ! _)
        if (batch.size < items) worker ! DummyWork(items - batch.size)
        workQueue = workQueue.drop(items)
      }
  }
}

QueuePattern.scala

class Worker(manager: ActorRef) extends Actor {
  // FIX: `mc` was referenced below but never defined in this listing;
  // restored with the same settings as the pull-pattern Worker.
  private val mc = new MathContext(100, RoundingMode.HALF_EVEN)
  private val plus = BigDecimal(1, mc)
  private val minus = BigDecimal(-1, mc)

  // Demand signalled upstream but not yet satisfied.
  private var requested = 0

  /** Re-arms the pull: once below the low watermark of 5, ask for 10 more. */
  def request(): Unit =
    if (requested < 5) {
      manager ! WorkRequest(self, 10)
      requested += 10
    }

  request()

  def receive: Receive = {
    case Job(id, data, replyTo) =>
      requested -= 1
      request()
      val sign = if ((data & 1) == 1) plus else minus
      val result = sign / data
      replyTo ! JobResult(id, result)
    case DummyWork(count) =>
      // Dummy items only cancel outstanding demand; no computation happens.
      requested -= count
      request()
  }
}

16.3 丢弃模式

DropPattern.scala

// Soft limit: above this queue size, new jobs are only admitted probabilistically.
private val queueThreshold = 1000
// Hard limit: at or above this size, not even a JobRejected reply is sent
// (see the drop-pattern enqueue logic below).
private val dropThreshold = 1384

// Per-thread RNG; avoids contention on a shared Random instance.
def random: ThreadLocalRandom = ThreadLocalRandom.current

/** Admission control: always enqueue below the threshold, otherwise admit
  * with probability 1/(dropFactor + 2), where dropFactor grows by one for
  * every 64 items of excess queue length. */
def shallEnqueue(atSize: Int): Boolean =
  if (atSize < queueThreshold) true
  else {
    val dropFactor = (atSize - queueThreshold) >> 6
    random.nextInt(dropFactor + 2) == 0
  }

DropPattern.scala

case job @ Job(id, _, replyTo) =>
  if (requestQueue.isEmpty) {
    val atSize = workQueue.size
    if (shallEnqueue(atSize)) {
      workQueue :+= job
    } else if (atSize < dropThreshold) {
      replyTo ! JobRejected(id)
    }
  } else {

DropPatternWithProtection.scala

// Envelope handed from IncomingQueue to the manager. `consumed` is flipped
// once the job has been taken (presumably by the manager — the writer is not
// shown in this listing), letting IncomingQueue discard stale envelopes.
// @volatile: the flag crosses actor (thread) boundaries.
private final case class WorkEnvelope(job: Job) {
  @volatile var consumed = false
}

/** Bounded front-door queue that shields the manager from overload:
  * envelopes already consumed downstream are pruned before admission. */
private class IncomingQueue(manager: ActorRef) extends Actor {
  private var workQueue: Queue[WorkEnvelope] = Queue.empty[WorkEnvelope]

  def receive: Receive = {
    case job: Job =>
      // Forget everything that has already been picked up downstream.
      workQueue = workQueue.dropWhile(_.consumed)
      if (workQueue.size < 1000) {
        val env = WorkEnvelope(job)
        workQueue = workQueue.enqueue(env)
        manager ! env
      }
      // else: over capacity — the job is dropped silently.
  }
}

16.4 限流模式

代码清单16-4 根据特定速率使用令牌桶来拉取工作

/**
 * Rate-limits job flow by combining a token bucket with the pull pattern:
 * demand is only signalled to `workSource` when tokens are (or soon will be)
 * available, and each forwarded job consumes one token.
 *
 * @param workSource    upstream actor answering WorkRequest with Jobs
 * @param calculator    downstream actor receiving the forwarded Jobs
 * @param ratePerSecond steady-state token refill rate
 * @param bucketSize    burst capacity of the token bucket
 * @param batchSize     preferred number of jobs to request at once
 */
class CalculatorClient(
    workSource: ActorRef,
    calculator: ActorRef,
    ratePerSecond: Long,
    bucketSize: Int,
    batchSize: Int)
    extends Actor {
  def now(): Long = System.nanoTime()

  /**
   * first part: the token-bucket state
   */
  private val nanoSecondsBetweenTokens: Long =
    1000000000L / ratePerSecond

  // Bucket starts full, allowing an initial burst of `bucketSize` jobs.
  private var tokenBucket: Int = bucketSize
  // Logical timestamp of the last whole token credited, not the last refill call.
  private var lastTokenTime: Long = now()

  /**
   * Credits tokens accrued since `lastTokenTime`. When the bucket does not
   * saturate, `lastTokenTime` advances by whole token intervals only, so
   * fractional progress toward the next token is never lost to rounding.
   */
  def refillBucket(time: Long): Unit = {
    val accrued = (time -
      lastTokenTime) * ratePerSecond / 1000000000L
    if (tokenBucket + accrued >= bucketSize) {
      tokenBucket = bucketSize
      lastTokenTime = time
    } else {
      tokenBucket += accrued.toInt
      lastTokenTime += accrued * nanoSecondsBetweenTokens
    }
  }

  def consumeToken(time: Long): Unit = {
    // always refill first since we do it upon activity and not scheduled
    refillBucket(time)
    tokenBucket -= 1
  }

  /**
   * second part: managing the pull pattern's demand
   */
  private var requested = 0

  // Invariant: at most one scheduled WorkRequest is in flight at a time
  // (the `requested == 0` guard below).
  def request(time: Long): Unit =
    if (tokenBucket - requested >= batchSize) {
      // Enough spare tokens for a full batch beyond current demand.
      sendRequest(time, batchSize)
    } else if (requested == 0) {
      if (tokenBucket > 0) {
        // Partial batch: request only what the bucket can cover right now.
        sendRequest(time, tokenBucket)
      } else {
        // Bucket empty: schedule a single-item request for the instant the
        // next token accrues, keeping the flow alive without busy-waiting.
        val timeForNextToken =
          lastTokenTime + nanoSecondsBetweenTokens - time
        context.system.scheduler.scheduleOnce(timeForNextToken.nanos, workSource, WorkRequest(self, 1))(
          context.dispatcher)
        requested = 1
        // NOTE(review): `Debug` is not defined in this listing — assumed to
        // be a flag declared elsewhere in the file.
        if (Debug) {
          println(s"$time: request(1) scheduled for ${time + timeForNextToken}")
        }
      }
    } else if (Debug) {
      println(s"$time: not requesting (requested=$requested tokenBucket=$tokenBucket)")
    }

  def sendRequest(time: Long, items: Int): Unit = {
    if (Debug) {
      println(s"$time: requesting $items items (requested=$requested tokenBucket=$tokenBucket)")
    }
    workSource ! WorkRequest(self, items)
    requested += items
  }

  // Kick off the first request at construction time.
  request(lastTokenTime)

  /**
   * third part: using the above for rate-regulated message forwarding
   */
  def receive: Receive = {
    case job: Job =>
      val time = now()
      if (Debug && requested == 1) {
        println(s"$time: received job")
      }
      consumeToken(time)
      requested -= 1
      request(time)
      calculator ! job
  }
}