第15章——消息流模式
在这一章中,我们将探讨一些存在于反应式组件之间最基本的通信模式:具体而言,我们将讨论消息是如何在它们之间流动的。在第10章中,我们讨论了理论背景,提到了系统中的通信路径设计对于系统的成功来说至关重要——无论是对现实世界的组织,还是反应式应用程序。
15.1 请求——响应模式
代码清单15-1 服务器向发起请求的地址发送响应
public class Server {
static final int SERVER_PORT = 8888;
public static void main(String[] args) throws IOException {
// bind a socket for receiving packets
try (final DatagramSocket socket = new DatagramSocket(SERVER_PORT)) {
// receive one packet
final byte[] buffer = new byte[1500];
final DatagramPacket packet1 = new DatagramPacket(buffer, buffer.length);
socket.receive(packet1);
final SocketAddress sender = packet1.getSocketAddress();
System.out.println("server: received " + new String(packet1.getData()));
System.out.println("server: sender was " + sender);
// send response back
final byte[] response = "got it!".getBytes();
final DatagramPacket packet2 = new DatagramPacket(response, response.length, sender);
socket.send(packet2);
}
}
}
代码清单15-2 客户端发送一个请求,然后阻塞直到接收到服务器的响应
public class Client {
public static void main(String[] args) throws IOException {
// get local socket with random port
try (final DatagramSocket socket = new DatagramSocket()) {
// send message to server
final byte[] request = "hello".getBytes();
final SocketAddress serverAddress = new InetSocketAddress("localhost", SERVER_PORT);
final DatagramPacket packet1 = new DatagramPacket(request, request.length, serverAddress);
socket.send(packet1);
// receive one packet
final byte[] buffer = new byte[1500];
final DatagramPacket packet2 = new DatagramPacket(buffer, buffer.length);
socket.receive(packet2);
final SocketAddress sender = packet2.getSocketAddress();
System.out.println("client: received " + new String(packet2.getData()));
System.out.println("client: sender was " + sender);
}
}
}
Server 的输出结果
server: received hello
server: sender was /127.0.0.1:55589
Client 的可能输出结果
client: received got it!
client: sender was /127.0.0.1:8888
HTTP Request Header
GET /request?msg=hello HTTP/1.1
Host: client-interface.our.application.domain
Accept: application/json
HTTP Response Header
HTTP/1.1 200 OK
...
Content-Type: application/json
Content-Length: 22
{"response":"got it!"}
代码清单15-3 使用Akka UntypedActor 来建模请求——响应模式
object RequestResponseActors {
final case class Request(msg: String)
final case class Response(msg: String)
class Responder extends Actor {
def receive: Receive = {
case Request(msg) =>
println(s"got request: $msg")
sender() ! Response("got it!")
}
}
class Requester(responder: ActorRef) extends Actor {
responder ! Request("hello")
def receive: Receive = {
case Response(msg) =>
println(s"got response: $msg")
context.system.terminate()
}
}
def main(args: Array[String]): Unit = {
val sys = ActorSystem("ReqRes")
val responder = sys.actorOf(Props[Responder], "responder")
val requester = sys.actorOf(Props(new Requester(responder)), "requester")
}
}
运行结果
got request: hello
got response: got it!
代码清单15-4 在请求消息中显式地包含响应地址
object RequestResponseTypedActors {
final case class Request(msg: String, replyTo: ActorRef[Response])
final case class Response(msg: String)
val responder: Behavior[Request] =
Static {
case Request(msg, replyTo) =>
println(s"got request: $msg")
replyTo ! Response("got it!")
}
def requester(responder: ActorRef[Request]): Behavior[Response] =
SelfAware { self =>
responder ! Request("hello", self)
Total {
case Response(msg) =>
println(s"got response: $msg")
Stopped
}
}
def main(args: Array[String]): Unit = {
ActorSystem("ReqResTyped", ContextAware[Unit] { ctx =>
val res = ctx.spawn(responder, "responder")
val req = ctx.watch(ctx.spawn(requester(res), "requester"))
Full {
case Sig(_, Terminated(`req`)) => Stopped
}
})
}
}
代码清单15-5 基于单向消息传递的请求——响应模式
var amqp = require('amqplib/callback_api');
amqp.connect('amqp://localhost', function (err, conn) {
conn.createChannel(function (err, ch) {
var q = 'rpc_queue';
ch.assertQueue(q, {durable: false});
ch.prefetch(1);
ch.consume(q, function reply(msg) {
console.log("got request: %s", msg.content.toString());
ch.sendToQueue(msg.properties.replyTo,
new Buffer('got it!'),
{correlationId: msg.properties.correlationId});
ch.ack(msg);
});
});
});
代码清单15-6 监听与原始请求具有相同的关联ID的响应
var uuid = require('node-uuid');
amqp.connect('amqp://localhost', function (err, conn) {
conn.createChannel(function (err, ch) {
ch.assertQueue('responses', {}, function (err, q) {
var corr = uuid.v1();
ch.consume(q.queue, function (msg) {
if (msg.properties.correlationId === corr) {
console.log('got response: %s', msg.content.toString());
setTimeout(function () {
conn.close();
process.exit(0)
}, 500);
}
}, {noAck: true});
ch.sendToQueue('rpc_queue',
new Buffer('hello'),
{correlationId: corr, replyTo: q.queue});
});
});
});
15.2 消息自包含模式
SMTP
S: 220 mailhost.example.com ESMTP Postfix
C: HELO alice-workstation.example.com
S: 250 Hello alice-workstation.example.com
C: MAIL FROM:<alice@example.com>
S: 250 Ok
C: RCPT TO:<bob@example.com>
S: 250 Ok
C: DATA
S: 354 End data with <CR><LF>.<CR><LF>
C: From: "Alice" <alice@example.com>
C: To: "Bob" <bob@example.com>
C: Date: Fri, 23 October 2015 10:34:12 +0200
C: Subject: lunch
C:
C: Hi Bob,
C:
C: sorry, I cannot make it, something else came up.
C:
C: Regards, Alice
C: .
S: 250 Ok, queued as 4567876345
C: QUIT
S: 221 Bye
代码清单15-7 封装了多次 SMTP 交换所需要的信息
final case class SendEmail(
sender: String,
recipients: List[String],
body: String,
correlationID: UUID,
replyTo: ActorRef[SendEmailResult])
final case class SendEmailResult(correlationID: UUID, status: StatusCode, explanation: Option[String]) extends Result
代码清单15-8 分离电子邮件主体,使得其可以按需投递
final case class SendEmail(
sender: String,
recipients: List[String],
correlationID: UUID,
replyTo: ActorRef[SendEmailResult])(body: Source[String])
extends StreamedRequest {
override def payload: Source[String] = body
}
代码清单15-9 允许邮件主体可以被接收者拉取
final case class SendEmail(
sender: String,
recipients: List[String],
bodyLocation: URL,
correlationID: UUID,
replyTo: ActorRef[SendEmailResult])
15.3 询问模式
代码清单15-10 请求启动验证过程的简单协议
final case class StartVerificationProcess(userEmail: String, replyTo: ActorRef[VerificationProcessResponse])
extends MyCommands
sealed trait VerificationProcessResponse
final case class VerificationProcessStarted(userEmail: String) extends VerificationProcessResponse
final case class VerificationProcessFailed(userEmail: String) extends VerificationProcessResponse
代码清单15-11 一个转发结果的匿名子Actor
def withChildActor(emailGateway: ActorRef[SendEmail]): Behavior[StartVerificationProcess] =
ContextAware { ctx: ActorContext[StartVerificationProcess] =>
val log = new BusLogging(ctx.system.eventStream, "VerificationProcessManager", getClass, ctx.system.logFilter)
Static {
case StartVerificationProcess(userEmail, replyTo) =>
val corrID = UUID.randomUUID()
val childActor = ctx.spawnAnonymous(FullTotal[Result] {
case Sig(ctx, PreStart) =>
ctx.setReceiveTimeout(5.seconds, ReceiveTimeout)
Same
case Msg(_, ReceiveTimeout) =>
log.warning("verification process initiation timed out for {}", userEmail)
replyTo ! VerificationProcessFailed(userEmail)
Stopped
case Msg(_, SendEmailResult(`corrID`, StatusCode.OK, _)) =>
log.debug("successfully started the verification process for {}", userEmail)
replyTo ! VerificationProcessStarted(userEmail)
Stopped
case Msg(_, SendEmailResult(`corrID`, StatusCode.Failed, explanation)) =>
log.info("failed to start the verification process for {}: {}", userEmail, explanation)
replyTo ! VerificationProcessFailed(userEmail)
Stopped
case Msg(_, SendEmailResult(wrongID, _, _)) =>
log.error("received wrong SendEmailResult for corrID {}", corrID)
Same
})
val request =
SendEmail("verification@example.com", List(userEmail), constructBody(userEmail, corrID), corrID, childActor)
emailGateway ! request
}
}
代码清单15-12 由询问模式所产生的Future,并进行了转换
def withAskPattern(emailGateway: ActorRef[SendEmail]): Behavior[StartVerificationProcess] =
ContextAware { ctx =>
val log = new BusLogging(ctx.system.eventStream, "VerificationProcessManager", getClass, ctx.system.logFilter)
implicit val timeout: Timeout = Timeout(5.seconds)
import ctx.executionContext
implicit val scheduler: Scheduler = ctx.system.scheduler
Static {
case StartVerificationProcess(userEmail, replyTo) =>
val corrID = UUID.randomUUID()
val response: Future[SendEmailResult] =
emailGateway ? (SendEmail(
"verification@example.com",
List(userEmail),
constructBody(userEmail, corrID),
corrID,
_))
response
.map {
case SendEmailResult(`corrID`, StatusCode.OK, _) =>
log.debug("successfully started the verification process for {}", userEmail)
VerificationProcessStarted(userEmail)
case SendEmailResult(`corrID`, StatusCode.Failed, explanation) =>
log.info("failed to start the verification process for {}: {}", userEmail, explanation)
VerificationProcessFailed(userEmail)
case SendEmailResult(wrongID, _, _) =>
log.error("received wrong SendEmailResult for corrID {}", corrID)
VerificationProcessFailed(userEmail)
}
.recover {
case _: AskTimeoutException =>
log.warning("verification process initiation timed out for {}", userEmail)
VerificationProcessFailed(userEmail)
}
.foreach(result => replyTo ! result)
}
}
代码清单15-13 不使用内置支持实现询问模式
def withoutAskPattern(emailGateway: ActorRef[SendEmail]): Behavior[StartVerificationProcess] =
ContextAware[MyCommands] { ctx =>
val log = new BusLogging(ctx.system.eventStream, "VerificationProcessManager", getClass, ctx.system.logFilter)
var statusMap = Map.empty[UUID, (String, ActorRef[VerificationProcessResponse])]
val adapter = ctx.spawnAdapter((s: SendEmailResult) => MyEmailResult(s.correlationID, s.status, s.explanation))
Static {
case StartVerificationProcess(userEmail, replyTo) =>
val corrID = UUID.randomUUID()
val request =
SendEmail("verification@example.com", List(userEmail), constructBody(userEmail, corrID), corrID, adapter)
emailGateway ! request
statusMap += corrID -> (userEmail, replyTo)
ctx.schedule(5.seconds, ctx.self, MyEmailResult(corrID, StatusCode.Failed, Some("timeout")))
case MyEmailResult(correlationID, status, explanation) =>
statusMap.get(correlationID) match {
case None =>
log.error("received SendEmailResult for unknown correlation ID {}", correlationID)
case Some((userEmail, replyTo)) =>
status match {
case StatusCode.OK =>
log.debug("successfully started the verification process for {}", userEmail)
replyTo ! VerificationProcessStarted(userEmail)
case StatusCode.Failed =>
log.info("failed to start the verification process for {}: {}", userEmail, explanation)
replyTo ! VerificationProcessFailed(userEmail)
}
statusMap -= correlationID
}
}
}.narrow[StartVerificationProcess]
15.5 聚合器模式
代码清单15-14 使用for推导式来聚合三个Future表达式的结果。
def futureFrontPage(
themes: ActorRef[GetTheme],
personalNews: ActorRef[GetPersonalNews],
topNews: ActorRef[GetTopNews]): Behavior[GetFrontPage] =
ContextAware { ctx =>
import ctx.executionContext
implicit val timeout: Timeout = Timeout(1.second)
implicit val scheduler: Scheduler = ctx.system.scheduler
Static {
case GetFrontPage(user, replyTo) =>
val cssFuture =
(themes ? (GetTheme(user, _: ActorRef[ThemeResult]))).map(_.css).recover {
case _: AskTimeoutException => "default.css"
}
val personalNewsFuture =
(personalNews ? (GetPersonalNews(user, _: ActorRef[PersonalNewsResult]))).map(_.news).recover {
case _: AskTimeoutException => Nil
}
val topNewsFuture =
(topNews ? (GetTopNews(_: ActorRef[TopNewsResult]))).map(_.news).recover {
case _: AskTimeoutException => Nil
}
for {
css <- cssFuture
personalNews <- personalNewsFuture
topNews <- topNewsFuture
} {
val topSet = topNews.toSet
val allNews = topNews ::: personalNews.filterNot(topSet.contains)
replyTo ! FrontPageResult(user, css, allNews)
}
}
}
代码清单15-15 使用子Actor替代Future组合子的使用
private def pf(p: PartialFunction[AnyRef, Unit]): p.type = p
def frontPage(
themes: ActorRef[GetTheme],
personalNews: ActorRef[GetPersonalNews],
topNews: ActorRef[GetTopNews]): Behavior[GetFrontPage] =
ContextAware { ctx =>
Static {
case GetFrontPage(user, replyTo) =>
val childRef = ctx.spawnAnonymous(Deferred { () =>
val builder = new FrontPageResultBuilder(user)
Partial[AnyRef](pf {
case ThemeResult(css) => builder.addCSS(css)
case PersonalNewsResult(news) => builder.addPersonalNews(news)
case TopNewsResult(news) => builder.addTopNews(news)
case ReceiveTimeout => builder.timeout()
}.andThen { _ =>
if (builder.isComplete) {
replyTo ! builder.result
Stopped
} else Same
})
})
themes ! GetTheme(user, childRef)
personalNews ! GetPersonalNews(user, childRef)
topNews ! GetTopNews(childRef)
ctx.schedule(1.second, childRef, ReceiveTimeout)
}
}
代码清单15-16 使用一个构建器来更加直接地表达领域模型
class FrontPageResultBuilder(user: String) {
private var css: Option[String] = None
private var personalNews: Option[List[String]] = None
private var topNews: Option[List[String]] = None
def addCSS(css: String): Unit = this.css = Option(css)
def addPersonalNews(news: List[String]): Unit =
this.personalNews = Option(news)
def addTopNews(news: List[String]): Unit =
this.topNews = Option(news)
def timeout(): Unit = {
if (css.isEmpty) css = Some("default.css")
if (personalNews.isEmpty) personalNews = Some(Nil)
if (topNews.isEmpty) topNews = Some(Nil)
}
def isComplete: Boolean =
css.isDefined &&
personalNews.isDefined && topNews.isDefined
def result: FrontPageResult = {
val topSet = topNews.get.toSet
val allNews = topNews.get :::
personalNews.get.filterNot(topSet.contains)
FrontPageResult(user, css.get, allNews)
}
}
代码清单15-17 添加第4个服务,降低了代码的可读性
val overrideFuture =
(overrides ? (GetOverride(_: ActorRef[OverrideResult]))).recover {
case _: AskTimeoutException => NoOverride
}
for {
css <- cssFuture
personalNews <- personalNewsFuture
topNews <- topNewsFuture
ovr <- overrideFuture
} ovr match {
case NoOverride =>
val topSet = topNews.toSet
val allNews = topNews ::: personalNews.filterNot(topSet.contains)
replyTo ! FrontPageResult(user, css, allNews)
case _ => // nothing to do here
}
for {
ovr <- overrideFuture
} ovr match {
case NoOverride => // nothing to do here
case Override(css, news) =>
replyTo ! FrontPageResult(user, css, news)
}
15.7 业务握手协议(又名可靠投递模式)
代码清单15-18 使用Actor实现上面的(信息)交换
final case class ChangeBudget(amount: BigDecimal, replyTo: ActorRef)
case object ChangeBudgetDone
final case class CannotChangeBudget(reason: String)
class Sam(alice: ActorRef, bob: ActorRef, amount: BigDecimal) extends Actor {
def receive: Receive = talkToAlice()
def talkToAlice(): Receive = {
alice ! ChangeBudget(-amount, self)
context.setReceiveTimeout(1.second)
LoggingReceive {
case ChangeBudgetDone => context.become(talkToBob())
case CannotChangeBudget(reason) => context.stop(self)
case ReceiveTimeout => alice ! ChangeBudget(-amount, self)
}
}
def talkToBob(): Receive = {
context.system.terminate()
Actor.emptyBehavior
}
}
class Alice extends Actor {
private var budget: BigDecimal = 10
private var alreadyDone: Set[ActorRef] = Set.empty
def receive = LoggingReceive {
case ChangeBudget(amount, replyTo) if alreadyDone(replyTo) =>
replyTo ! ChangeBudgetDone
case ChangeBudget(amount, replyTo) if amount + budget > 0 =>
budget += amount
alreadyDone += replyTo
context.watch(replyTo)
replyTo ! ChangeBudgetDone
case ChangeBudget(_, replyTo) =>
replyTo ! CannotChangeBudget("insufficient budget")
case Terminated(saga) =>
alreadyDone -= saga
}
}
代码清单15-19 向预算消息添加持久性
final case class AliceConfirmedChange(deliveryId: Long)
final case class AliceDeniedChange(deliveryId: Long)
class PersistentSam(alice: ActorPath, bob: ActorPath, amount: BigDecimal, override val persistenceId: String)
extends PersistentActor
with AtLeastOnceDelivery
with ActorLogging {
def receiveCommand: Receive = Actor.emptyBehavior
override def preStart(): Unit = {
context.become(talkToAlice())
}
def talkToAlice(): Receive = {
log.debug("talking to Alice")
var deliveryId: Long = 0
deliver(alice)(id => {
deliveryId = id
ChangeBudget(-amount, self, persistenceId)
})
LoggingReceive({
case ChangeBudgetDone =>
persist(AliceConfirmedChange(deliveryId)) { ev =>
confirmDelivery(ev.deliveryId)
context.become(talkToBob())
}
case CannotChangeBudget(reason) =>
persist(AliceDeniedChange(deliveryId)) { ev =>
confirmDelivery(ev.deliveryId)
context.stop(self)
}
}: Receive)
}
def talkToBob(): Receive = {
context.system.terminate()
Actor.emptyBehavior
}
def receiveRecover = LoggingReceive {
case AliceConfirmedChange(deliveryId) =>
confirmDelivery(deliveryId)
context.become(talkToBob())
case AliceDeniedChange(deliveryId) =>
confirmDelivery(deliveryId)
context.stop(self)
}
}
代码清单15-20 Alice Actor的持久化版本
final case class BudgetChanged(amount: BigDecimal, persistenceId: String)
case object CleanupDoneList
final case class ChangeDone(persistenceId: String)
class PersistentAlice extends PersistentActor with ActorLogging {
def persistenceId: String = "Alice"
private implicit val mat: ActorMaterializer = ActorMaterializer()
import context.dispatcher
private var alreadyDone: Set[String] = Set.empty
private var budget: BigDecimal = 10
private val cleanupTimer: Cancellable = context.system.scheduler.schedule(1.hour, 1.hour, self, CleanupDoneList)
def receiveCommand = LoggingReceive {
case ChangeBudget(amount, replyTo, id) if alreadyDone(id) =>
replyTo ! ChangeBudgetDone
case ChangeBudget(amount, replyTo, id) if amount + budget > 0 =>
persist(BudgetChanged(amount, id)) { ev =>
budget += ev.amount
alreadyDone += ev.persistenceId
replyTo ! ChangeBudgetDone
}
case ChangeBudget(_, replyTo, _) =>
replyTo ! CannotChangeBudget("insufficient budget")
case CleanupDoneList =>
val journal = PersistenceQuery(context.system).readJournalFor[LeveldbReadJournal](LeveldbReadJournal.Identifier)
for (persistenceId <- alreadyDone) {
val stream = journal.currentEventsByPersistenceId(persistenceId).map(_.event).collect {
case AliceConfirmedChange(_) => ChangeDone(persistenceId)
}
stream.runWith(Sink.head).pipeTo(self)
}
case ChangeDone(id) =>
persist(ChangeDone(id)) { ev =>
alreadyDone -= ev.persistenceId
}
}
def receiveRecover = LoggingReceive {
case BudgetChanged(amount, id) =>
budget += amount
alreadyDone += id
case ChangeDone(id) =>
alreadyDone -= id
}
override def postStop(): Unit = {
cleanupTimer.cancel()
}
}