Chapter 11: Testing reactive applications

11.3 Asynchronous testing

Listing 11-1 Testing a purely synchronous translation function

val input = "Hur mår du?"
val output = "How are you?"
translate(input) should be(output)

AsyncSpec.scala

val input = "Hur mår du?"
val output = "How are you?"
val future = translate(input)
// what now?

ActorSpec.scala

val input = "Hur mår du?"
val output = "How are you?"
val probe = TestProbe()
translationService ! Translate(input, probe.ref)
// when can we continue?

AsyncSpecWithWhile.scala

while (!future.isCompleted) Thread.sleep(50)

AsyncSpecWithWhileLoopIterationsBounded.scala

var i = 20
while (!future.isCompleted && i > 0) {
  i -= 1
  Thread.sleep(50)
}
if (!future.isCompleted) fail("translation was not received in time")

Listing 11-2 Awaiting the result blocks synchronously on the translation

val input = "Hur mår du?"
val output = "How are you?"
val result = Await.result(translate(input), 1.second)
result should be(output)

Listing 11-3 Expecting replies with a TestProbe

val input = "Hur mår du?"
val output = "How are you?"
val probe = TestProbe()
translationService ! Translate(input, probe.ref)
probe.expectMsg(1.second, output)

Listing 11-4 Using a TestProbe to receive the response from the scheduler

val probe = TestProbe()

val start = Timestamp.now
scheduler ! Schedule(probe.ref, "tick", 1.second)
probe.expectMsg(2.seconds, "tick")

val stop = Timestamp.now

val duration = stop - start
assert(duration > 950.millis, "tick came in early")
assert(duration < 1050.millis, "tick came in late")

Listing 11-5 Determining the 95th percentile latency

val probe = TestProbe()

val echo = echoService("keepSLA")
val N = 200
val timings = for (i <- 1 to N) yield {
  val string = s"test$i"
  val start = Timestamp.now
  echo ! Request(string, probe.ref)
  probe.expectMsg(100.millis, s"test run $i", Response(string))
  val stop = Timestamp.now
  stop - start
}
// discard top 5%
val sorted = timings.sorted
val ninetyfifthPercentile = sorted.dropRight(N * 5 / 100).last
info(s"SLA min=${sorted.head} max=${sorted.last} 95th=$ninetyfifthPercentile")
val SLA = if (Helpers.isCiTest) 25.milliseconds else 1.millisecond
ninetyfifthPercentile.toFiniteDuration should be <= SLA
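
The percentile step in the listing above can be tried in isolation. The following is a minimal sketch that uses only the Scala standard library. The names `PercentileSketch` and `ninetyFifthPercentile` are illustrative assumptions and do not appear in the original code, and the samples are synthetic rather than measured.

```scala
import scala.concurrent.duration._

object PercentileSketch {
  // Sort the samples, discard the slowest 5%, and return the largest remaining one.
  // With fewer than 20 samples nothing is discarded, so the maximum is returned.
  def ninetyFifthPercentile(timings: Seq[FiniteDuration]): FiniteDuration = {
    require(timings.nonEmpty, "need at least one sample")
    val sorted = timings.sorted
    sorted.dropRight(sorted.size * 5 / 100).last
  }

  def main(args: Array[String]): Unit = {
    // Synthetic samples: 1 ms, 2 ms, ..., 200 ms
    val samples = (1 to 200).map(_.millis)
    // The 10 slowest samples are dropped, so the result equals 190.millis
    println(ninetyFifthPercentile(samples))
  }
}
```

Sorting before `dropRight` is what makes the discarded elements the slowest ones; applied to unsorted timings, the same expression would drop whichever samples happened to arrive last.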

Listing 11-6 Generating the test samples in parallel with the Ask pattern

val echo = echoService("keepSLAWithFuture")

val N = 10000
val timingFutures = for (i <- 1 to N) yield {
  val string = s"test$i"
  val start = Timestamp.now
  (echo ? (Request(string, _))).collect {
    case Response(`string`) => Timestamp.now - start
  }
}

val futureOfTimings = Future.sequence(timingFutures)
val timings = Await.result(futureOfTimings, 5.seconds)
// discard top 5%
val sorted = timings.sorted
val ninetyfifthPercentile = sorted.dropRight(N * 5 / 100).last
info(s"SLA min=${sorted.head} max=${sorted.last} 95th=$ninetyfifthPercentile")

val SLA = if (Helpers.isCiTest) 500.milliseconds else 100.milliseconds

ninetyfifthPercentile.toFiniteDuration should be < SLA

Listing 11-7 Using a custom Actor to bound the number of parallel test samples

val echo = echoService("keepSLAInParallel")
val probe = TestProbe()

val N = 10000
val maxParallelism = 500
val controller = system.actorOf(Props[ParallelSLATester], "keepSLAInParallelController")
controller ! TestSLA(echo, N, maxParallelism, probe.ref)

val result = Try(probe.expectMsgType[SLAResponse]).recover {
  case ae: AssertionError =>
    controller ! AbortSLATest
    val result = probe.expectMsgType[SLAResponse]
    info(s"controller timed out, state so far is $result")
    throw ae
}.get

// discard top 5%
val sorted = result.timings.sorted
val ninetyfifthPercentile = sorted.dropRight(N * 5 / 100).last
info(s"SLA min=${sorted.head} max=${sorted.last} 95th=$ninetyfifthPercentile")
val SLA = if (Helpers.isCiTest) 25.milliseconds else 2.milliseconds

ninetyfifthPercentile should be < SLA

Listing 11-8 Verifying that no additional messages are received

import scala.concurrent.duration._
val scheduler = system.actorOf(Scheduler.props)

val probe = TestProbe()
scheduler ! ScheduleRepeatedly(probe.ref, 1.second, "tick")
val token = probe.expectMsgType[SchedulerToken]
probe.expectMsg(1500.millis, "tick")
scheduler ! CancelSchedule(token, probe.ref)
probe.expectMsg(100.millis, ScheduleCanceled)
probe.expectNoMessage(2.seconds)

Listing 11-9 Matching responses to requests with a correlation ID

val ingestService = system.actorOf(DataIngester.props)
val probe = TestProbe()
ingestService ! Retrieve(url, "myID", probe.ref)
val replies = probe.receiveWhile(1.second) {
  case r @ Record("myID", _) => r
}
probe.expectMsg(0.seconds, EOF)

Listing 11-10 Forcing synchronous execution: safe only for non-blocking processes

val tr = new TranslationService
val input = "Hur mår du?"
val output = "How are you?"
val ec = SynchronousEventLoop
val future = tr.translate(input, ec)
future.value.get should be(Success(output))

Listing 11-11 Using CallingThreadDispatcher to process messages on the calling thread

val translationService =
  system.actorOf(TranslationService.props.withDispatcher("akka.test.calling-thread-dispatcher"))
val input = "Hur mår du?"
val output = "How are you?"
val probe = TestProbe()
translationService ! Translate(input, probe.ref)
probe.expectMsg(0.seconds, output)

Listing 11-12 Moving the timeout parameter to external configuration

val tr = new TranslationService
val input = "Hur mår du?"
val output = "How are you?"
val future = tr.translate(input)
eventually {
  future.value.get should be(Success(output))
}

Listing 11-13 Handling responses asynchronously to create fully reactive tests

val echo = echoService()
val N = 10000
val maxParallelism = 500
val controller = system.actorOf(Props[ParallelSLATester], "keepSLAInParallelAndAsyncController")
val future = controller ? (TestSLA(echo, N, maxParallelism, _))
for (SLAResponse(timings, outstanding) <- future) yield {

  val sorted = timings.sorted
  val ninetyfifthPercentile = sorted.dropRight(N * 5 / 100).last
  info(s"SLA min=${sorted.head} max=${sorted.last} 95th=$ninetyfifthPercentile")
  val SLA = if (Helpers.isCiTest) 25.milliseconds else 2.milliseconds

  ninetyfifthPercentile should be < SLA
}

Listing 11-14 Using async and await to improve the readability of asynchronous tests

async {
  val input = "Hur mår du?"
  val output = "How are you?"
  await(translate(input).withTimeout(5.seconds)) should be(output)
}
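
The `withTimeout` call in the listing above is not a method of the standard `scala.concurrent.Future`, so it presumably comes from a helper defined elsewhere. As a rough sketch of how such a helper might look, assuming nothing beyond the Scala and Java standard libraries, the hypothetical `FutureTimeout` object below races the original future against a scheduled failure. The object name, the `TimeoutOps` class, and the timer thread name are all assumptions made for illustration.

```scala
import java.util.concurrent.{Executors, ThreadFactory, TimeUnit, TimeoutException}
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.concurrent.duration._

object FutureTimeout {
  // A single daemon thread that only fires timeouts; it never runs user code.
  private val timer = Executors.newSingleThreadScheduledExecutor(new ThreadFactory {
    def newThread(r: Runnable): Thread = {
      val t = new Thread(r, "future-timeout")
      t.setDaemon(true)
      t
    }
  })

  implicit class TimeoutOps[T](val future: Future[T]) {
    // Returns a future that completes with the original result, or fails with
    // a TimeoutException if the original has not completed within `d`.
    def withTimeout(d: FiniteDuration)(implicit ec: ExecutionContext): Future[T] = {
      val promise = Promise[T]()
      val task = timer.schedule(new Runnable {
        def run(): Unit = {
          promise.tryFailure(new TimeoutException(s"timed out after $d"))
          ()
        }
      }, d.toNanos, TimeUnit.NANOSECONDS)
      future.onComplete { result =>
        task.cancel(false)          // the timeout is no longer needed
        promise.tryComplete(result) // no effect if the timeout already fired
      }
      promise.future
    }
  }
}
```

With `import FutureTimeout._` and an implicit `ExecutionContext` in scope, `translate(input).withTimeout(5.seconds)` would then type-check as in the listing. Note that the timeout only fails the returned future; it does not cancel the underlying computation.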

Listing 11-15 Testing the translation service in JavaScript

describe('Translator', function () {
    describe('#translate()', function () {
        it('should yield the correct result', function () {
            return tr.translate('Hur mår du?')
                .should.eventually.equal('How are you?');
        })
    })
});

Listing 11-16 Using a request-response factory to generate test traffic

async {
  val echo = echoService("keepSLAwithSupport")
  val latencySupport = new LatencyTestSupport(system)
  val latenciesFuture = latencySupport.measure(count = 10000, maxParallelism = 500) { i =>
    val message = s"test$i"
    SingleResult(echo ? (Request(message, _)), Response(message))
  }
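  // note: akka.pattern.after completes with the given future's result only after
  // the delay has elapsed; it delays the result rather than acting as a timeout
  val latencies = await(akka.pattern.after(20.seconds, system.scheduler)(latenciesFuture))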

  info(s"latency info: $latencies")
  latencies.failureCount should be(0)
  val SLA = if (Helpers.isCiTest) 50.milliseconds else 10.milliseconds
  latencies.quantile(0.99) should be < SLA
}

Listing 11-17 Simple translation API

final case class TranslateV1(query: String, replyTo: ActorRef)

Listing 11-18 Adding stricter types to the translation API

final case class TranslateV2(phrase: String, inputLanguage: String, outputLanguage: String, replyTo: ActorRef)

sealed trait TranslationResponseV2

final case class TranslationV2(
    inputPhrase: String,
    outputPhrase: String,
    inputLanguage: String,
    outputLanguage: String) extends TranslationResponseV2

final case class TranslationErrorV2(
    inputPhrase: String,
    inputLanguage: String,
    outputLanguage: String,
    errorMessage: String) extends TranslationResponseV2

Listing 11-19 Testing the translation version adapter

val v1 = TestProbe()
val v2 = system.actorOf(propsV2(v1.ref))
val client = TestProbe()

// initiate a request to the adapter
v2 ! TranslateV2("Hur mår du?", "sv", "en", client.ref)

// verify that the adapter asks the V1 service back-end
val req1 = v1.expectMsgType[TranslateV1]
req1.query should be("sv:en:Hur mår du?")

// initiate a reply
req1.replyTo ! "How are you?"

// verify that the adapter transforms it correctly
client.expectMsg(TranslationV2("Hur mår du?", "How are you?", "sv", "en"))

// now verify translation errors
v2 ! TranslateV2("Hur är läget?", "sv", "en", client.ref)
val req2 = v1.expectMsgType[TranslateV1]
// this implicitly verifies that no other communication happened with V1
req2.query should be("sv:en:Hur är läget?")
req2.replyTo ! "error:cannot parse input 'sv:en:Hur är läget?'"
client.expectMsg(TranslationErrorV2("Hur är läget?", "sv", "en", "cannot parse input 'sv:en:Hur är läget?'"))

v1.expectNoMessage(3.seconds)

Listing 11-20 Mocking error behavior

case object ExpectNominal

case object ExpectError

final case class Unexpected(msg: Any)

class MockV1(reporter: ActorRef) extends Actor {
  def receive: Receive = initial

  override def unhandled(msg: Any): Unit = {
    reporter ! Unexpected(msg)
  }

  private val initial: Receive = {
    case ExpectNominal => context.become(expectingNominal)
    case ExpectError   => context.become(expectingError)
  }

  def expectingNominal: Receive = {
    case TranslateV1("sv:en:Hur mår du?", replyTo) =>
      replyTo ! "How are you?"
      context.become(initial)
  }

  def expectingError: Receive = {
    case TranslateV1(other, replyTo) =>
      replyTo ! s"error:cannot parse input '$other'"
      context.become(initial)
  }
}

Listing 11-21 Testing for correct error handling

val asyncErrors = TestProbe()
val v1 = system.actorOf(mockV1props(asyncErrors.ref))
val v2 = system.actorOf(propsV2(v1))
val client = TestProbe()

// initiate a request to the adapter
v1 ! ExpectNominal
v2 ! TranslateV2("Hur mår du?", "sv", "en", client.ref)

// verify that the adapter transforms it correctly
client.expectMsg(TranslationV2("Hur mår du?", "How are you?", "sv", "en"))

// non-blocking check for async errors
asyncErrors.expectNoMessage(0.seconds)

// now verify translation errors
v1 ! ExpectError
v2 ! TranslateV2("Hur är läget?", "sv", "en", client.ref)
client.expectMsg(TranslationErrorV2("Hur är läget?", "sv", "en", "cannot parse input 'sv:en:Hur är läget?'"))

// final check for async errors
asyncErrors.expectNoMessage(1.second)

11.6 Testing resilience

Listing 11-22 Basic actor to test

class MyActor extends Actor {
  def receive: Receive = {
    case _ => throw new NullPointerException
  }
}

Listing 11-23 Providing the test context for the actor under test

class StepParent extends Actor {
  override val supervisorStrategy: OneForOneStrategy = OneForOneStrategy() {
    case ex => Restart
  }

  def receive: Receive = {
    case p: Props =>
      sender() ! context.actorOf(p, "child")
  }
}

Listing 11-24 Testing the actor in the context of the StepParent

class StepParentSpec extends WordSpec with Matchers with BeforeAndAfterAll {
  implicit val system: ActorSystem = ActorSystem()

  "An actor that throws an exception" must {
    "Result in the supervisor returning a reference to that actor" in {
      val testProbe = TestProbe()
      val parent = system.actorOf(Props[StepParent], "stepParent")
      parent.tell(Props[MyActor], testProbe.ref)
      val child = testProbe.expectMsgType[ActorRef]
      // ...
      // Test whatever we want in the actor
    }
  }

  override def afterAll(): Unit = {
    val terminated = system.terminate()
    Await.ready(terminated, Duration.Inf)
  }
}

Listing 11-25 Reporting failures back to a designated actor

class FailureParent(failures: ActorRef) extends Actor {
  private val props: Props = Props[MyFailureParentActor]
  private val child: ActorRef = context.actorOf(props, "child")
  override val supervisorStrategy: OneForOneStrategy = OneForOneStrategy() {
    case f => failures ! f; Stop
  }

  def receive: Receive = {
    case msg => child.forward(msg)
  }
}

Listing 11-26 Removing supervision from the test

case object TestFailureParentMessage

class FailureParentSpec extends WordSpec with Matchers with BeforeAndAfterAll {
  implicit val system: ActorSystem = ActorSystem()

  "Using a FailureParent" must {
    "Result in failures being collected and returned" in {
      val failures = TestProbe()
      val failureParent = system.actorOf(Props(new FailureParent(failures.ref)))
      failureParent ! TestFailureParentMessage
      failures.expectMsgType[NullPointerException]
    }
  }

  override def afterAll(): Unit = {
    val terminated = system.terminate()
    Await.ready(terminated, Duration.Inf)
  }
}