第16章——流控制模式
在前面的章节里,你学会了如何将系统拆分成较小的部分,以及这些部分之间如何通信以解决较大的任务。有一个我们到目前为止尚未提及的角度:除了确定谁与谁交互之外,你还必须考虑通信的时效性。为了让你的系统在不同的负载下都更具有回弹性,你需要一种机制,能够阻止组件因请求速率过高而不可控地失败。为此,这一章介绍了下面四种基本模式:拉取模式、托管队列模式、丢弃模式和限流模式。
16.1 拉取模式
代码清单16-1 在拉取输入的工作者内部处理昂贵的计算
/**
 * Worker that pulls its input from `manager` instead of having work pushed at it.
 *
 * Whenever fewer than 5 previously requested jobs are still outstanding, the worker
 * asks the manager for 10 more. For every [[Job]] it answers `replyTo` with
 * `+1 / data` when `data` is odd and `-1 / data` when `data` is even.
 *
 * @param manager the actor that is asked for work via [[WorkRequest]]
 */
class Worker(manager: ActorRef) extends Actor {

  // 100 significant digits for the per-job division.
  private val mc = new MathContext(100, RoundingMode.HALF_EVEN)
  private val plus = BigDecimal(1, mc)
  private val minus = BigDecimal(-1, mc)

  // Number of jobs that were requested from the manager but not yet received.
  private var requested = 0

  /** Tops up the demand signalled to the manager once it drops below 5. */
  def request(): Unit = {
    val runningLow = requested < 5
    if (runningLow) {
      manager ! WorkRequest(self, 10)
      requested += 10
    }
  }

  // Signal the initial demand while the actor is being constructed.
  request()

  def receive: Receive = {
    case Job(id, data, replyTo) =>
      // Account for the received job and renew demand before doing the expensive part.
      requested -= 1
      request()
      val sign = if ((data & 1) != 1) minus else plus
      replyTo ! JobResult(id, sign / data)
  }
}
代码清单16-2 按工作者所请求的数目给它们提供任务
/**
 * Hands out a fixed sequence of jobs to its workers strictly on demand and sums up
 * the results they send back.
 *
 * Eight [[Worker]] children are started on construction. Each [[WorkRequest]] is
 * answered with at most `items` jobs; once the job source is exhausted and every
 * dispatched job has been answered, the final sum is printed and the actor system
 * is terminated.
 */
class Manager extends Actor {

  // Lazily generated work: one million jobs, each built from the running index
  // and carrying this actor as the reply address.
  private val works: Iterator[Job] =
    Iterator.from(1).map(x => Job(x, x, self)).take(1000000)

  private val aggregator: (BigDecimal, BigDecimal) => BigDecimal = (x: BigDecimal, y: BigDecimal) => x + y
  private val mc = new MathContext(10000, RoundingMode.HALF_EVEN)
  private var approximation = BigDecimal(0, mc)

  // Jobs that were sent to a worker and have not been answered yet.
  private var outstandingWork = 0

  (1 to 8).foreach(_ => context.actorOf(Props(new Worker(self))))

  def receive: Receive = {
    case WorkRequest(worker, items) =>
      // Pull jobs straight from the iterator. Going through `toStream` would
      // evaluate the stream's head eagerly, which silently discards one job
      // whenever `items` is zero or negative.
      var sent = 0
      while (sent < items && works.hasNext) {
        worker ! works.next()
        sent += 1
      }
      outstandingWork += sent
    case JobResult(id, report) =>
      approximation = aggregator(approximation, report)
      outstandingWork -= 1
      // Done only when nothing is in flight and nothing is left to hand out.
      if (outstandingWork == 0 && works.isEmpty) {
        println("final result: " + approximation)
        context.system.terminate()
      }
  }
}
16.2 托管队列模式
代码清单16-3 管理一个工作队列以对过载作出反应
/**
 * Manager that sits between job producers and pulling workers and owns the queue
 * between them, so that overload becomes visible and can be answered explicitly.
 *
 * Incoming jobs are buffered in `workQueue` (fewer than 1000 entries are accepted;
 * beyond that the sender's `replyTo` receives [[JobRejected]]). Worker demand that
 * cannot be satisfied immediately is parked in `requestQueue`. Whenever a request
 * is only partially satisfied, the unserved remainder is reported to the worker
 * as [[DummyWork]].
 */
class Manager extends Actor {

  private var workQueue: Queue[Job] = Queue.empty[Job]
  private var requestQueue: Queue[WorkRequest] = Queue.empty[WorkRequest]

  // Start the eight workers that will pull from this manager.
  for (_ <- 1 to 8) context.actorOf(Props(new Worker(self)))

  def receive: Receive = {
    case job @ Job(id, _, replyTo) =>
      requestQueue.headOption match {
        case Some(WorkRequest(worker, items)) =>
          // A worker is already waiting: hand the job over directly and report
          // the rest of its request as unserved.
          worker ! job
          if (items > 1) {
            worker ! DummyWork(items - 1)
          }
          requestQueue = requestQueue.tail
        case None =>
          // Nobody is waiting: buffer the job, or reject it when the queue is full.
          if (workQueue.size >= 1000) replyTo ! JobRejected(id)
          else workQueue :+= job
      }

    case wr @ WorkRequest(worker, items) =>
      if (workQueue.nonEmpty) {
        // Serve as much of the request as the queue can provide.
        val (batch, remaining) = workQueue.splitAt(items)
        batch.foreach(queued => worker ! queued)
        val shortfall = items - batch.size
        if (shortfall > 0) {
          worker ! DummyWork(shortfall)
        }
        workQueue = remaining
      } else {
        // Nothing to hand out yet: remember the demand for the next incoming job.
        requestQueue :+= wr
      }
  }
}
QueuePattern.scala
/**
 * Pulling worker for the managed-queue variant.
 *
 * It requests 10 jobs from `manager` whenever fewer than 5 requested jobs are
 * outstanding, answers every [[Job]] with `+1 / data` (odd `data`) or `-1 / data`
 * (even `data`), and treats [[DummyWork]] as a notice that `count` of the requested
 * jobs will not arrive.
 *
 * @param manager the actor that is asked for work via [[WorkRequest]]
 */
class Worker(manager: ActorRef) extends Actor {

  // Declared locally so the class does not depend on an `mc` from an outer scope;
  // same precision as the pull-pattern worker (100 digits, HALF_EVEN).
  private val mc = new MathContext(100, RoundingMode.HALF_EVEN)
  private val plus = BigDecimal(1, mc)
  private val minus = BigDecimal(-1, mc)

  // Number of jobs that were requested from the manager but not yet accounted for.
  private var requested = 0

  /** Tops up the demand signalled to the manager once it drops below 5. */
  def request(): Unit =
    if (requested < 5) {
      manager ! WorkRequest(self, 10)
      requested += 10
    }

  // Signal the initial demand while the actor is being constructed.
  request()

  def receive: Receive = {
    case Job(id, data, replyTo) =>
      requested -= 1
      request()
      val sign = if ((data & 1) == 1) plus else minus
      val result = sign / data
      replyTo ! JobResult(id, result)
    case DummyWork(count) =>
      // These requested items will never be delivered: release them and re-request.
      requested -= count
      request()
  }
}
16.3 丢弃模式
DropPattern.scala
// Queue sizes below this value always accept a new job.
private val queueThreshold = 1000
// Upper bound paired with queueThreshold; not read by the members defined here.
private val dropThreshold = 1384

/** Random number generator of the calling thread. */
def random: ThreadLocalRandom = ThreadLocalRandom.current()

/**
 * Decides whether a job arriving at a queue of the given size should be enqueued.
 *
 * Below `queueThreshold` the answer is always `true`. From there on the job is
 * accepted with probability `1 / (dropFactor + 2)`, where `dropFactor` grows by
 * one for every 64 entries beyond the threshold.
 *
 * @param atSize current size of the work queue
 * @return `true` if the job should be enqueued
 */
def shallEnqueue(atSize: Int): Boolean =
  if (atSize < queueThreshold) {
    true
  } else {
    val excess = atSize - queueThreshold
    val dropFactor = excess >> 6
    random.nextInt(dropFactor + 2) == 0
  }
DropPattern.scala
case job @ Job(id, _, replyTo) =>
if (requestQueue.isEmpty) {
val atSize = workQueue.size
if (shallEnqueue(atSize)) {
workQueue :+= job
} else if (atSize < dropThreshold) {
replyTo ! JobRejected(id)
}
} else {
DropPatternWithProtection.scala
/**
 * Wrapper around a [[Job]] carrying a mutable `consumed` flag.
 *
 * The flag is volatile so that a write from one thread is visible to readers on
 * another. NOTE(review): presumably the receiver of the envelope sets it once the
 * job has been taken over; the writer is not visible here.
 *
 * @param job the wrapped job
 */
private final case class WorkEnvelope(job: Job) {
  @volatile
  var consumed: Boolean = false
}
/**
 * Front-end actor that bounds the number of unconsumed jobs forwarded to `manager`.
 *
 * Every accepted job is wrapped in a [[WorkEnvelope]], remembered locally and sent
 * on to the manager. Before each admission decision, envelopes at the head of the
 * local queue that are already marked consumed are discarded. When 1000 or more
 * envelopes remain, the incoming job is dropped without any reply.
 *
 * @param manager the actor that receives the envelopes
 */
private class IncomingQueue(manager: ActorRef) extends Actor {

  private var workQueue: Queue[WorkEnvelope] = Queue.empty[WorkEnvelope]

  def receive: Receive = {
    case job: Job =>
      // Forget the leading envelopes that have been consumed in the meantime.
      val pending = workQueue.dropWhile(_.consumed)
      if (pending.size >= 1000) {
        // Too many unconsumed envelopes: keep the pruned queue, drop the job.
        workQueue = pending
      } else {
        val envelope = WorkEnvelope(job)
        workQueue = pending :+ envelope
        manager ! envelope
      }
  }
}
16.4 限流模式
代码清单16-4 根据特定速率使用令牌桶来拉取工作
/**
 * Pulls jobs from `workSource` and forwards them to `calculator` at a bounded rate,
 * using a token bucket that is refilled lazily whenever a job passes through.
 *
 * @param workSource    actor that answers [[WorkRequest]] messages with jobs
 * @param calculator    actor that receives the rate-limited jobs
 * @param ratePerSecond number of tokens accrued per second
 * @param bucketSize    maximum number of tokens held; also the initial fill
 * @param batchSize     number of jobs requested at once when enough tokens are free
 */
class CalculatorClient(
    workSource: ActorRef,
    calculator: ActorRef,
    ratePerSecond: Long,
    bucketSize: Int,
    batchSize: Int)
    extends Actor {

  /** Current reading of `System.nanoTime()`. */
  def now(): Long = System.nanoTime()

  /**
   * first part: the token bucket
   */
  private val nanoSecondsBetweenTokens: Long = 1000000000L / ratePerSecond

  private var tokenBucket: Int = bucketSize
  private var lastTokenTime: Long = now()

  /**
   * Adds the tokens accrued between `lastTokenTime` and `time`, capped at `bucketSize`.
   *
   * @param time current time in nanoseconds, as returned by [[now]]
   */
  def refillBucket(time: Long): Unit = {
    val elapsed = time - lastTokenTime
    val accrued = elapsed * ratePerSecond / 1000000000L
    val bucketFull = tokenBucket + accrued >= bucketSize
    if (bucketFull) {
      tokenBucket = bucketSize
      lastTokenTime = time
    } else {
      tokenBucket += accrued.toInt
      // Advance only by whole token intervals so that the partial progress
      // towards the next token is kept.
      lastTokenTime += accrued * nanoSecondsBetweenTokens
    }
  }

  /**
   * Takes one token out of the bucket.
   *
   * @param time current time in nanoseconds, as returned by [[now]]
   */
  def consumeToken(time: Long): Unit = {
    // always refill first since we do it upon activity and not scheduled
    refillBucket(time)
    tokenBucket -= 1
  }

  /**
   * second part: managing the pull pattern’s demand
   */
  private var requested = 0

  /**
   * Signals new demand to the work source if the bucket allows it.
   *
   * A full batch is requested when enough tokens are not yet spoken for. When
   * nothing is outstanding, the remaining tokens are requested, or a request
   * for a single job is scheduled for the time the next token becomes available.
   *
   * @param time current time in nanoseconds, as returned by [[now]]
   */
  def request(time: Long): Unit = {
    val unclaimedTokens = tokenBucket - requested
    if (unclaimedTokens >= batchSize) {
      sendRequest(time, batchSize)
    } else if (requested != 0) {
      // Demand is still outstanding; wait for it to be served first.
      if (Debug) {
        println(s"$time: not requesting (requested=$requested tokenBucket=$tokenBucket)")
      }
    } else if (tokenBucket > 0) {
      sendRequest(time, tokenBucket)
    } else {
      // Bucket is empty: let the scheduler deliver the request to the work
      // source once the next token is due.
      val timeForNextToken =
        lastTokenTime + nanoSecondsBetweenTokens - time
      context.system.scheduler.scheduleOnce(timeForNextToken.nanos, workSource, WorkRequest(self, 1))(
        context.dispatcher)
      requested = 1
      if (Debug) {
        println(s"$time: request(1) scheduled for ${time + timeForNextToken}")
      }
    }
  }

  /**
   * Asks the work source for `items` jobs right away and records the demand.
   *
   * @param time  current time in nanoseconds; only used for debug output
   * @param items number of jobs to request
   */
  def sendRequest(time: Long, items: Int): Unit = {
    if (Debug) {
      println(s"$time: requesting $items items (requested=$requested tokenBucket=$tokenBucket)")
    }
    workSource ! WorkRequest(self, items)
    requested += items
  }

  // Signal the initial demand while the actor is being constructed.
  request(lastTokenTime)

  /**
   * third part: using the above for rate-regulated message forwarding
   */
  def receive: Receive = {
    case job: Job =>
      val time = now()
      if (Debug && requested == 1) {
        println(s"$time: received job")
      }
      // Pay for this job, settle the demand bookkeeping, then pass it on.
      consumeToken(time)
      requested -= 1
      request(time)
      calculator ! job
  }
}
1.0.0