Chapter 11: Testing reactive applications
11.3 Testing asynchronously
Listing 11-1 Testing a purely synchronous translation function
val input = "Hur mår du?"
val output = "How are you?"
translate(input) should be(output)
AsyncSpec.scala
val input = "Hur mår du?"
val output = "How are you?"
val future = translate(input)
// what now?
ActorSpec.scala
val input = "Hur mår du?"
val output = "How are you?"
val probe = TestProbe()
translationService ! Translate(input, probe.ref)
// when can we continue?
AsyncSpecWithWhile.scala
while (!future.isCompleted) Thread.sleep(50)
AsyncSpecWithWhileLoopIterationsBounded.scala
var i = 20
while (!future.isCompleted && i > 0) {
  i -= 1
  Thread.sleep(50)
}
// check the future itself: it may have completed during the final sleep
if (!future.isCompleted) fail("translation was not received in time")
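The bounded loop above can be packaged as a small helper so that the polling interval and the overall deadline are explicit. The following is only a sketch using the Scala standard library; `BoundedPolling` and `completesWithin` are hypothetical names that do not come from the original listings.

```scala
import scala.concurrent.{Future, Promise}
import scala.concurrent.duration._

// Hypothetical helper, not part of the original listings.
object BoundedPolling {
  /** Polls `future` until it completes or `max` has elapsed; returns whether it completed. */
  def completesWithin[A](future: Future[A],
                         max: FiniteDuration,
                         interval: FiniteDuration = 50.millis): Boolean = {
    val deadline = max.fromNow
    while (!future.isCompleted && deadline.hasTimeLeft()) {
      // never sleep past the deadline
      val sleepMillis = math.max(0L, math.min(interval.toMillis, deadline.timeLeft.toMillis))
      Thread.sleep(sleepMillis)
    }
    // decide on the state of the future, not on a loop counter
    future.isCompleted
  }
}
```

A test could then write `if (!BoundedPolling.completesWithin(future, 1.second)) fail(...)`. This still blocks the test thread, which is the drawback the later listings address.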
Listing 11-2 Awaiting the result blocks synchronously on the translation
val input = "Hur mår du?"
val output = "How are you?"
val result = Await.result(translate(input), 1.second)
result should be(output)
Listing 11-3 Expecting replies with a TestProbe
val input = "Hur mår du?"
val output = "How are you?"
val probe = TestProbe()
translationService ! Translate(input, probe.ref)
probe.expectMsg(1.second, output)
Listing 11-4 Using a TestProbe to receive the response from the scheduler
val probe = TestProbe()
val start = Timestamp.now
scheduler ! Schedule(probe.ref, "tick", 1.second)
probe.expectMsg(2.seconds, "tick")
val stop = Timestamp.now
val duration = stop - start
assert(duration > 950.millis, "tick came in early")
assert(duration < 1050.millis, "tick came in late")
Listing 11-5 Determining the 95th percentile latency
val probe = TestProbe()
val echo = echoService("keepSLA")
val N = 200
val timings = for (i <- 1 to N) yield {
  val string = s"test$i"
  val start = Timestamp.now
  echo ! Request(string, probe.ref)
  probe.expectMsg(100.millis, s"test run $i", Response(string))
  val stop = Timestamp.now
  stop - start
}
// discard top 5%
val sorted = timings.sorted
val ninetyfifthPercentile = sorted.dropRight(N * 5 / 100).last
info(s"SLA min=${sorted.head} max=${sorted.last} 95th=$ninetyfifthPercentile")
val SLA = if (Helpers.isCiTest) 25.milliseconds else 1.millisecond
ninetyfifthPercentile.toFiniteDuration should be <= SLA
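The percentile calculation in the listing above (sort, drop the top 5%, take the last element) can be written as a reusable function. This is a minimal sketch that assumes the timings are available as `FiniteDuration` values; `LatencyStats` and `percentile` are hypothetical names introduced only for illustration.

```scala
import scala.concurrent.duration._

// Hypothetical helper, not part of the original listings.
object LatencyStats {
  /** Nearest-rank percentile: the smallest sample with at least `p` percent of all samples at or below it. */
  def percentile(samples: Seq[FiniteDuration], p: Int): FiniteDuration = {
    require(samples.nonEmpty, "need at least one sample")
    require(p > 0 && p <= 100, "p must be in (0, 100]")
    val sorted = samples.sorted
    // 1-based rank, computed as ceil(p * size / 100) in integer arithmetic
    val rank = (p * sorted.size + 99) / 100
    sorted(rank - 1)
  }
}
```

For 200 samples and `p = 95` this selects the 190th sorted sample, the same element the listing obtains with `sorted.dropRight(N * 5 / 100).last`.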
Listing 11-6 Generating the test samples in parallel with the Ask pattern
val echo = echoService("keepSLAWithFuture")
val N = 10000
val timingFutures = for (i <- 1 to N) yield {
  val string = s"test$i"
  val start = Timestamp.now
  (echo ? (Request(string, _))).collect {
    case Response(`string`) => Timestamp.now - start
  }
}
val futureOfTimings = Future.sequence(timingFutures)
val timings = Await.result(futureOfTimings, 5.seconds)
// discard top 5%
val sorted = timings.sorted
val ninetyfifthPercentile = sorted.dropRight(N * 5 / 100).last
info(s"SLA min=${sorted.head} max=${sorted.last} 95th=$ninetyfifthPercentile")
val SLA = if (Helpers.isCiTest) 500.milliseconds else 100.milliseconds
ninetyfifthPercentile.toFiniteDuration should be < SLA
Listing 11-7 Using a custom Actor to bound the number of parallel test samples
val echo = echoService("keepSLAInParallel")
val probe = TestProbe()
val N = 10000
val maxParallelism = 500
val controller = system.actorOf(Props[ParallelSLATester], "keepSLAInParallelController")
controller ! TestSLA(echo, N, maxParallelism, probe.ref)
val result = Try(probe.expectMsgType[SLAResponse]).recover {
  case ae: AssertionError =>
    controller ! AbortSLATest
    val result = probe.expectMsgType[SLAResponse]
    info(s"controller timed out, state so far is $result")
    throw ae
}.get
// discard top 5%
val sorted = result.timings.sorted
val ninetyfifthPercentile = sorted.dropRight(N * 5 / 100).last
info(s"SLA min=${sorted.head} max=${sorted.last} 95th=$ninetyfifthPercentile")
val SLA = if (Helpers.isCiTest) 25.milliseconds else 2.milliseconds
ninetyfifthPercentile should be < SLA
Listing 11-8 Verifying that no additional messages are received
import scala.concurrent.duration._
val scheduler = system.actorOf(Scheduler.props)
val probe = TestProbe()
scheduler ! ScheduleRepeatedly(probe.ref, 1.second, "tick")
val token = probe.expectMsgType[SchedulerToken]
probe.expectMsg(1500.millis, "tick")
scheduler ! CancelSchedule(token, probe.ref)
probe.expectMsg(100.millis, ScheduleCanceled)
probe.expectNoMessage(2.seconds)
Listing 11-9 Matching responses to requests with a correlation ID
val ingestService = system.actorOf(DataIngester.props)
val probe = TestProbe()
ingestService ! Retrieve(url, "myID", probe.ref)
val replies = probe.receiveWhile(1.second) {
  case r @ Record("myID", _) => r
}
probe.expectMsg(0.seconds, EOF)
Listing 11-10 Forcing synchronous execution: safe only for non-blocking processing
val tr = new TranslationService
val input = "Hur mår du?"
val output = "How are you?"
val ec = SynchronousEventLoop
val future = tr.translate(input, ec)
future.value.get should be(Success(output))
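The listing above relies on an execution context that runs every task on the calling thread, so the future is already complete when `translate` returns. The source does not show how `SynchronousEventLoop` is implemented; the following is a sketch of what such a context could look like, using a per-thread queue so that nested submissions are run in order instead of recursing. `SynchronousDemo` is a hypothetical usage example.

```scala
import java.util.ArrayDeque
import scala.concurrent.{ExecutionContext, Future}
import scala.util.Success
import scala.util.control.NonFatal

// Sketch only: an assumed implementation, not the one used by the original listing.
object SynchronousEventLoop extends ExecutionContext {
  // One task queue per thread; the slot is empty (null) while no task is running on that thread.
  private val queue = new ThreadLocal[ArrayDeque[Runnable]]

  override def execute(task: Runnable): Unit = {
    val running = queue.get()
    if (running != null) {
      // Submitted from inside a running task: enqueue it; the outer loop will pick it up.
      running.addLast(task)
    } else {
      val q = new ArrayDeque[Runnable]
      q.addLast(task)
      queue.set(q)
      try {
        while (!q.isEmpty) {
          try q.pollFirst().run()
          catch { case NonFatal(e) => reportFailure(e) }
        }
      } finally queue.remove()
    }
  }

  override def reportFailure(cause: Throwable): Unit = cause.printStackTrace()
}

object SynchronousDemo {
  // Both the Future body and the map callback run before this method returns.
  def doubled(): Future[Int] = {
    implicit val ec: ExecutionContext = SynchronousEventLoop
    Future(21).map(_ * 2)
  }
}
```

Because everything runs on the caller's thread, any task that blocks while waiting for another task on the same context will deadlock, which is why the technique is safe only for non-blocking processing.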
Listing 11-11 Processing messages on the calling thread with CallingThreadDispatcher
val translationService =
  system.actorOf(TranslationService.props.withDispatcher("akka.test.calling-thread-dispatcher"))
val input = "Hur mår du?"
val output = "How are you?"
val probe = TestProbe()
translationService ! Translate(input, probe.ref)
probe.expectMsg(0.seconds, output)
Listing 11-12 Moving timeout parameters to external configuration
val tr = new TranslationService
val input = "Hur mår du?"
val output = "How are you?"
val future = tr.translate(input)
eventually {
  future.value.get should be(Success(output))
}
Listing 11-13 Handling responses asynchronously to create fully reactive tests
val echo = echoService()
val N = 10000
val maxParallelism = 500
val controller = system.actorOf(Props[ParallelSLATester], "keepSLAInParallelAndAsyncController")
val future = controller ? (TestSLA(echo, N, maxParallelism, _))
for (SLAResponse(timings, outstanding) <- future) yield {
  val sorted = timings.sorted
  val ninetyfifthPercentile = sorted.dropRight(N * 5 / 100).last
  info(s"SLA min=${sorted.head} max=${sorted.last} 95th=$ninetyfifthPercentile")
  val SLA = if (Helpers.isCiTest) 25.milliseconds else 2.milliseconds
  // the measured latency must stay below the SLA, as in the earlier listings
  ninetyfifthPercentile should be < SLA
}
Listing 11-14 Using async and await to improve the readability of asynchronous tests
async {
  val input = "Hur mår du?"
  val output = "How are you?"
  await(translate(input).withTimeout(5.seconds)) should be(output)
}
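`withTimeout` is not a method of the standard `Future`, so the listing above presumably depends on a helper defined elsewhere. As a sketch of how such a helper could work, the version below races the original future against a scheduled failure using only the JDK scheduler and the Scala standard library. `TimeoutSupport` and `FutureTimeoutOps` are hypothetical names, and the original helper may be built differently.

```scala
import java.util.concurrent.{Executors, ThreadFactory, TimeUnit, TimeoutException}
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.concurrent.duration._

// Sketch only: an assumed implementation of a withTimeout helper.
object TimeoutSupport {
  // A single daemon thread whose only job is to fire timeouts.
  private val timer = Executors.newSingleThreadScheduledExecutor(new ThreadFactory {
    def newThread(r: Runnable): Thread = {
      val t = new Thread(r, "timeout-support")
      t.setDaemon(true)
      t
    }
  })

  /** Returns a future that mirrors `future` but fails with a TimeoutException after `max`. */
  def withTimeout[A](future: Future[A], max: FiniteDuration): Future[A] = {
    implicit val ec: ExecutionContext = ExecutionContext.global
    val promise = Promise[A]()
    val timeoutTask = timer.schedule(new Runnable {
      def run(): Unit = {
        promise.tryFailure(new TimeoutException(s"no result within $max"))
        ()
      }
    }, max.toNanos, TimeUnit.NANOSECONDS)
    // Whichever side finishes first wins; the later tryComplete/tryFailure is a no-op.
    future.onComplete { result =>
      timeoutTask.cancel(false)
      promise.tryComplete(result)
    }
    promise.future
  }

  // Enables the `future.withTimeout(5.seconds)` syntax after `import TimeoutSupport._`.
  implicit class FutureTimeoutOps[A](val future: Future[A]) {
    def withTimeout(max: FiniteDuration): Future[A] = TimeoutSupport.withTimeout(future, max)
  }
}
```

Failing the future instead of blocking keeps the test fully asynchronous: the timeout surfaces as a failed `await` inside the `async` block.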
Listing 11-15 Testing the translation service in JavaScript
describe('Translator', function () {
  describe('#translate()', function () {
    it('should yield the correct result', function () {
      return tr.translate('Hur mår du?')
        .should.eventually.equal('How are you?');
    });
  });
});
Listing 11-16 Using a request-response factory to generate test traffic
async {
  val echo = echoService("keepSLAwithSupport")
  val latencySupport = new LatencyTestSupport(system)
  val latenciesFuture = latencySupport.measure(count = 10000, maxParallelism = 500) { i =>
    val message = s"test$i"
    SingleResult(echo ? (Request(message, _)), Response(message))
  }
  val latencies = await(akka.pattern.after(20.seconds, system.scheduler)(latenciesFuture))
  info(s"latency info: $latencies")
  latencies.failureCount should be(0)
  val SLA = if (Helpers.isCiTest) 50.milliseconds else 10.milliseconds
  latencies.quantile(0.99) should be < SLA
}
Listing 11-17 Simple translation API
final case class TranslateV1(query: String, replyTo: ActorRef)
Listing 11-18 Adding stricter types to the translation API
final case class TranslateV2(phrase: String, inputLanguage: String, outputLanguage: String, replyTo: ActorRef)
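sealed trait TranslationResponseV2
// both response types extend the sealed trait so that clients can match exhaustively
final case class TranslationV2(
  inputPhrase: String,
  outputPhrase: String,
  inputLanguage: String,
  outputLanguage: String) extends TranslationResponseV2
final case class TranslationErrorV2(
  inputPhrase: String,
  inputLanguage: String,
  outputLanguage: String,
  errorMessage: String) extends TranslationResponseV2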
Listing 11-19 Testing the translation version adapter
val v1 = TestProbe()
val v2 = system.actorOf(propsV2(v1.ref))
val client = TestProbe()
// initiate a request to the adapter
v2 ! TranslateV2("Hur mår du?", "sv", "en", client.ref)
// verify that the adapter asks the V1 service back-end
val req1 = v1.expectMsgType[TranslateV1]
req1.query should be("sv:en:Hur mår du?")
// initiate a reply
req1.replyTo ! "How are you?"
// verify that the adapter transforms it correctly
client.expectMsg(TranslationV2("Hur mår du?", "How are you?", "sv", "en"))
// now verify translation errors
v2 ! TranslateV2("Hur är läget?", "sv", "en", client.ref)
val req2 = v1.expectMsgType[TranslateV1]
// this implicitly verifies that no other communication happened with V1
req2.query should be("sv:en:Hur är läget?")
req2.replyTo ! "error:cannot parse input 'sv:en:Hur är läget?'"
client.expectMsg(TranslationErrorV2("Hur är läget?", "sv", "en", "cannot parse input 'sv:en:Hur är läget?'"))
v1.expectNoMessage(3.seconds)
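The test above pins down the wire format the adapter must use: a V1 query of the form `inputLanguage:outputLanguage:phrase`, and V1 error replies prefixed with `error:`. The adapter itself is not shown in the source; the pure functions below are a sketch of that mapping, inferred only from the strings in the test. `TranslationAdapterFormat`, `toV1Query` and `parseV1Reply` are hypothetical names.

```scala
// Sketch only: helper names are hypothetical; the format is inferred from the test above.
object TranslationAdapterFormat {
  /** Encodes a V2 request as a V1 query string, e.g. "sv:en:Hur mår du?". */
  def toV1Query(phrase: String, inputLanguage: String, outputLanguage: String): String =
    s"$inputLanguage:$outputLanguage:$phrase"

  /** Interprets a V1 reply: Left(errorMessage) if it starts with "error:", otherwise Right(translation). */
  def parseV1Reply(reply: String): Either[String, String] =
    if (reply.startsWith("error:")) Left(reply.stripPrefix("error:"))
    else Right(reply)
}
```

Keeping this mapping in pure functions lets it be unit-tested synchronously, leaving the actor test to verify only the message flow.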
Listing 11-20 Mocking error behavior
case object ExpectNominal
case object ExpectError
final case class Unexpected(msg: Any)
class MockV1(reporter: ActorRef) extends Actor {
  def receive: Receive = initial
  override def unhandled(msg: Any): Unit = {
    reporter ! Unexpected(msg)
  }
  private val initial: Receive = {
    case ExpectNominal => context.become(expectingNominal)
    case ExpectError   => context.become(expectingError)
  }
  def expectingNominal: Receive = {
    case TranslateV1("sv:en:Hur mår du?", replyTo) =>
      replyTo ! "How are you?"
      context.become(initial)
  }
  def expectingError: Receive = {
    case TranslateV1(other, replyTo) =>
      replyTo ! s"error:cannot parse input '$other'"
      context.become(initial)
  }
}
Listing 11-21 Testing for correct error handling
val asyncErrors = TestProbe()
val v1 = system.actorOf(mockV1props(asyncErrors.ref))
val v2 = system.actorOf(propsV2(v1))
val client = TestProbe()
// initiate a request to the adapter
v1 ! ExpectNominal
v2 ! TranslateV2("Hur mår du?", "sv", "en", client.ref)
// verify that the adapter transforms it correctly
client.expectMsg(TranslationV2("Hur mår du?", "How are you?", "sv", "en"))
// non-blocking check for async errors
asyncErrors.expectNoMessage(0.seconds)
// now verify translation errors
v1 ! ExpectError
v2 ! TranslateV2("Hur är läget?", "sv", "en", client.ref)
client.expectMsg(TranslationErrorV2("Hur är läget?", "sv", "en", "cannot parse input 'sv:en:Hur är läget?'"))
// final check for async errors
asyncErrors.expectNoMessage(1.second)
11.6 Testing resilience
Listing 11-22 The basic actor to test
class MyActor extends Actor {
  def receive: Receive = {
    case _ => throw new NullPointerException
  }
}
Listing 11-23 Providing a test context for the actor under test
class StepParent extends Actor {
  override val supervisorStrategy: OneForOneStrategy = OneForOneStrategy() {
    case ex => Restart
  }
  def receive: Receive = {
    case p: Props =>
      // create the actor under test as a child and hand its reference back to the test
      sender() ! context.actorOf(p, "child")
  }
}
Listing 11-24 Testing the actor in the context of the StepParent
class StepParentSpec extends WordSpec with Matchers with BeforeAndAfterAll {
  implicit val system: ActorSystem = ActorSystem()
  "An actor that throws an exception" must {
    "Result in the supervisor returning a reference to that actor" in {
      val testProbe = TestProbe()
      val parent = system.actorOf(Props[StepParent], "stepParent")
      parent.tell(Props[MyActor], testProbe.ref)
      val child = testProbe.expectMsgType[ActorRef]
      // ...
      // Test whatever we want in the actor
    }
  }
  override def afterAll(): Unit = {
    val terminated = system.terminate()
    Await.ready(terminated, Duration.Inf)
  }
}
Listing 11-25 Reporting failures back to a designated actor
class FailureParent(failures: ActorRef) extends Actor {
  private val props: Props = Props[MyFailureParentActor]
  private val child: ActorRef = context.actorOf(props, "child")
  override val supervisorStrategy: OneForOneStrategy = OneForOneStrategy() {
    case f => failures ! f; Stop
  }
  def receive: Receive = {
    case msg => child.forward(msg)
  }
}
Listing 11-26 Removing supervision from the test
case object TestFailureParentMessage
class FailureParentSpec extends WordSpec with Matchers with BeforeAndAfterAll {
  implicit val system: ActorSystem = ActorSystem()
  "Using a FailureParent" must {
    "Result in failures being collected and returned" in {
      val failures = TestProbe()
      val failureParent = system.actorOf(Props(new FailureParent(failures.ref)))
      failureParent ! TestFailureParentMessage
      failures.expectMsgType[NullPointerException]
    }
  }
  override def afterAll(): Unit = {
    val terminated = system.terminate()
    Await.ready(terminated, Duration.Inf)
  }
}