第15章——消息流模式

在这一章中,我们将探讨一些存在于反应式组件之间最基本的通信模式:具体而言,我们将讨论消息是如何在它们之间流动的。在第10章中,我们讨论了理论背景,提到了系统中的通信路径设计对于系统的成功来说至关重要——无论是对现实世界的组织,还是反应式应用程序。

15.1 请求——响应模式

代码清单15-1 服务器向发起请求的地址发送响应

public class Server {
  static final int SERVER_PORT = 8888;

  public static void main(String[] args) throws IOException {
    // bind a socket for receiving packets
    try (final DatagramSocket socket = new DatagramSocket(SERVER_PORT)) {

      // receive one packet
      final byte[] buffer = new byte[1500];
      final DatagramPacket packet1 = new DatagramPacket(buffer, buffer.length);
      socket.receive(packet1);

      final SocketAddress sender = packet1.getSocketAddress();
      System.out.println("server: received " + new String(packet1.getData()));
      System.out.println("server: sender was " + sender);

      // send response back
      final byte[] response = "got it!".getBytes();
      final DatagramPacket packet2 = new DatagramPacket(response, response.length, sender);
      socket.send(packet2);
    }
  }
}

代码清单15-2 客户端发送一个请求,然后阻塞直到接收到服务器的响应

public class Client {
  public static void main(String[] args) throws IOException {
    // get local socket with random port
    try (final DatagramSocket socket = new DatagramSocket()) {

      // send message to server
      final byte[] request = "hello".getBytes();
      final SocketAddress serverAddress = new InetSocketAddress("localhost", SERVER_PORT);
      final DatagramPacket packet1 = new DatagramPacket(request, request.length, serverAddress);
      socket.send(packet1);

      // receive one packet
      final byte[] buffer = new byte[1500];
      final DatagramPacket packet2 = new DatagramPacket(buffer, buffer.length);
      socket.receive(packet2);

      final SocketAddress sender = packet2.getSocketAddress();
      System.out.println("client: received " + new String(packet2.getData()));
      System.out.println("client: sender was " + sender);
    }
  }
}

Server 的输出结果

server: received hello
server: sender was /127.0.0.1:55589

Client 的可能输出结果

client: received got it!
client: sender was /127.0.0.1:8888

HTTP Request Header

GET /request?msg=hello HTTP/1.1
Host: client-interface.our.application.domain
Accept: application/json

HTTP Response Header

HTTP/1.1 200 OK
...
Content-Type: application/json
Content-Length: 22
{"response":"got it!"}

代码清单15-3 使用Akka UntypedActor 来建模请求——响应模式

object RequestResponseActors {

  final case class Request(msg: String)

  final case class Response(msg: String)

  class Responder extends Actor {
    def receive: Receive = {
      case Request(msg) =>
        println(s"got request: $msg")
        sender() ! Response("got it!")
    }
  }

  class Requester(responder: ActorRef) extends Actor {
    responder ! Request("hello")

    def receive: Receive = {
      case Response(msg) =>
        println(s"got response: $msg")
        context.system.terminate()
    }
  }

  def main(args: Array[String]): Unit = {
    val sys = ActorSystem("ReqRes")
    val responder = sys.actorOf(Props[Responder], "responder")
    val requester = sys.actorOf(Props(new Requester(responder)), "requester")
  }

}

运行结果

got request: hello
got response: got it!

代码清单15-4 在请求消息中显式地包含响应地址

object RequestResponseTypedActors {

  final case class Request(msg: String, replyTo: ActorRef[Response])

  final case class Response(msg: String)

  val responder: Behavior[Request] =
    Static {
      case Request(msg, replyTo) =>
        println(s"got request: $msg")
        replyTo ! Response("got it!")
    }

  def requester(responder: ActorRef[Request]): Behavior[Response] =
    SelfAware { self =>
      responder ! Request("hello", self)
      Total {
        case Response(msg) =>
          println(s"got response: $msg")
          Stopped
      }
    }

  def main(args: Array[String]): Unit = {
    ActorSystem("ReqResTyped", ContextAware[Unit] { ctx =>
      val res = ctx.spawn(responder, "responder")
      val req = ctx.watch(ctx.spawn(requester(res), "requester"))
      Full {
        case Sig(_, Terminated(`req`)) => Stopped
      }
    })
  }
}

代码清单15-5 基于单向消息传递的请求——响应模式

var amqp = require('amqplib/callback_api');

amqp.connect('amqp://localhost', function (err, conn) {
    conn.createChannel(function (err, ch) {
        var q = 'rpc_queue';
        ch.assertQueue(q, {durable: false});
        ch.prefetch(1);
        ch.consume(q, function reply(msg) {
            console.log("got request: %s", msg.content.toString());
            ch.sendToQueue(msg.properties.replyTo,
                new Buffer('got it!'),
                {correlationId: msg.properties.correlationId});
            ch.ack(msg);
        });
    });
});

代码清单15-6 监听与原始请求具有相同的关联ID的响应

var uuid = require('node-uuid');

amqp.connect('amqp://localhost', function (err, conn) {
    conn.createChannel(function (err, ch) {
        ch.assertQueue('responses', {}, function (err, q) {
            var corr = uuid.v1();
            ch.consume(q.queue, function (msg) {
                if (msg.properties.correlationId === corr) {
                    console.log('got response: %s', msg.content.toString());
                    setTimeout(function () {
                        conn.close();
                        process.exit(0)
                    }, 500);
                }
            }, {noAck: true});
            ch.sendToQueue('rpc_queue',
                new Buffer('hello'),
                {correlationId: corr, replyTo: q.queue});
        });
    });
});

15.2 消息自包含模式

SMTP


S: 220 mailhost.example.com ESMTP Postfix C: HELO alice-workstation.example.com S: 250 Hello alice-workstation.example.com C: MAIL FROM:<alice@example.com> S: 250 Ok C: RCPT TO:<bob@example.com> S: 250 Ok C: DATA S: 354 End data with <CR><LF>.<CR><LF> C: From: "Alice" <alice@example.com> C: To: "Bob" <bob@example.com> C: Date: Fri, 23 October 2015 10:34:12 +0200 C: Subject: lunch C: C: Hi Bob, C: C: sorry, I cannot make it, something else came up. C: C: Regards, Alice C: . S: 250 Ok, queued as 4567876345 C: QUIT S: 221 Bye

代码清单15-7 封装了多次 SMTP 交换所需要的信息

final case class SendEmail(
    sender: String,
    recipients: List[String],
    body: String,
    correlationID: UUID,
    replyTo: ActorRef[SendEmailResult])
final case class SendEmailResult(correlationID: UUID, status: StatusCode, explanation: Option[String]) extends Result

代码清单15-8 分离电子邮件主体,使得其可以按需投递

final case class SendEmail(
    sender: String,
    recipients: List[String],
    correlationID: UUID,
    replyTo: ActorRef[SendEmailResult])(body: Source[String])
    extends StreamedRequest {
  override def payload: Source[String] = body
}

代码清单15-9 允许邮件主体可以被接收者拉取

final case class SendEmail(
    sender: String,
    recipients: List[String],
    bodyLocation: URL,
    correlationID: UUID,
    replyTo: ActorRef[SendEmailResult])

15.3 询问模式

代码清单15-10 请求启动验证过程的简单协议

final case class StartVerificationProcess(userEmail: String, replyTo: ActorRef[VerificationProcessResponse])
    extends MyCommands

sealed trait VerificationProcessResponse

final case class VerificationProcessStarted(userEmail: String) extends VerificationProcessResponse

final case class VerificationProcessFailed(userEmail: String) extends VerificationProcessResponse

代码清单15-11 一个转发结果的匿名子Actor

def withChildActor(emailGateway: ActorRef[SendEmail]): Behavior[StartVerificationProcess] =
  ContextAware { ctx: ActorContext[StartVerificationProcess] =>
    val log = new BusLogging(ctx.system.eventStream, "VerificationProcessManager", getClass, ctx.system.logFilter)

    Static {
      case StartVerificationProcess(userEmail, replyTo) =>
        val corrID = UUID.randomUUID()
        val childActor = ctx.spawnAnonymous(FullTotal[Result] {
          case Sig(ctx, PreStart) =>
            ctx.setReceiveTimeout(5.seconds, ReceiveTimeout)
            Same
          case Msg(_, ReceiveTimeout) =>
            log.warning("verification process initiation timed out for {}", userEmail)
            replyTo ! VerificationProcessFailed(userEmail)
            Stopped
          case Msg(_, SendEmailResult(`corrID`, StatusCode.OK, _)) =>
            log.debug("successfully started the verification process for {}", userEmail)
            replyTo ! VerificationProcessStarted(userEmail)
            Stopped
          case Msg(_, SendEmailResult(`corrID`, StatusCode.Failed, explanation)) =>
            log.info("failed to start the verification process for {}: {}", userEmail, explanation)
            replyTo ! VerificationProcessFailed(userEmail)
            Stopped
          case Msg(_, SendEmailResult(wrongID, _, _)) =>
            log.error("received wrong SendEmailResult for corrID {}", corrID)
            Same
        })
        val request =
          SendEmail("verification@example.com", List(userEmail), constructBody(userEmail, corrID), corrID, childActor)
        emailGateway ! request
    }
  }

代码清单15-12 由询问模式所产生的Future,并进行了转换

def withAskPattern(emailGateway: ActorRef[SendEmail]): Behavior[StartVerificationProcess] =
  ContextAware { ctx =>
    val log = new BusLogging(ctx.system.eventStream, "VerificationProcessManager", getClass, ctx.system.logFilter)
    implicit val timeout: Timeout = Timeout(5.seconds)
    import ctx.executionContext
    implicit val scheduler: Scheduler = ctx.system.scheduler

    Static {
      case StartVerificationProcess(userEmail, replyTo) =>
        val corrID = UUID.randomUUID()
        val response: Future[SendEmailResult] =
          emailGateway ? (SendEmail(
            "verification@example.com",
            List(userEmail),
            constructBody(userEmail, corrID),
            corrID,
            _))
        response
          .map {
            case SendEmailResult(`corrID`, StatusCode.OK, _) =>
              log.debug("successfully started the verification process for {}", userEmail)
              VerificationProcessStarted(userEmail)
            case SendEmailResult(`corrID`, StatusCode.Failed, explanation) =>
              log.info("failed to start the verification process for {}: {}", userEmail, explanation)
              VerificationProcessFailed(userEmail)
            case SendEmailResult(wrongID, _, _) =>
              log.error("received wrong SendEmailResult for corrID {}", corrID)
              VerificationProcessFailed(userEmail)
          }
          .recover {
            case _: AskTimeoutException =>
              log.warning("verification process initiation timed out for {}", userEmail)
              VerificationProcessFailed(userEmail)
          }
          .foreach(result => replyTo ! result)
    }
  }

代码清单15-13 不使用内置支持实现询问模式

def withoutAskPattern(emailGateway: ActorRef[SendEmail]): Behavior[StartVerificationProcess] =
  ContextAware[MyCommands] { ctx =>
    val log = new BusLogging(ctx.system.eventStream, "VerificationProcessManager", getClass, ctx.system.logFilter)
    var statusMap = Map.empty[UUID, (String, ActorRef[VerificationProcessResponse])]
    val adapter = ctx.spawnAdapter((s: SendEmailResult) => MyEmailResult(s.correlationID, s.status, s.explanation))

    Static {
      case StartVerificationProcess(userEmail, replyTo) =>
        val corrID = UUID.randomUUID()
        val request =
          SendEmail("verification@example.com", List(userEmail), constructBody(userEmail, corrID), corrID, adapter)
        emailGateway ! request
        statusMap += corrID -> (userEmail, replyTo)
        ctx.schedule(5.seconds, ctx.self, MyEmailResult(corrID, StatusCode.Failed, Some("timeout")))
      case MyEmailResult(correlationID, status, explanation) =>
        statusMap.get(correlationID) match {
          case None =>
            log.error("received SendEmailResult for unknown correlation ID {}", correlationID)
          case Some((userEmail, replyTo)) =>
            status match {
              case StatusCode.OK =>
                log.debug("successfully started the verification process for {}", userEmail)
                replyTo ! VerificationProcessStarted(userEmail)
              case StatusCode.Failed =>
                log.info("failed to start the verification process for {}: {}", userEmail, explanation)
                replyTo ! VerificationProcessFailed(userEmail)
            }
            statusMap -= correlationID
        }
    }
  }.narrow[StartVerificationProcess]

15.5 聚合器模式

代码清单15-14 使用for推导式来聚合三个Future表达式的结果。

def futureFrontPage(
    themes: ActorRef[GetTheme],
    personalNews: ActorRef[GetPersonalNews],
    topNews: ActorRef[GetTopNews]): Behavior[GetFrontPage] =
  ContextAware { ctx =>
    import ctx.executionContext
    implicit val timeout: Timeout = Timeout(1.second)
    implicit val scheduler: Scheduler = ctx.system.scheduler

    Static {
      case GetFrontPage(user, replyTo) =>
        val cssFuture =
          (themes ? (GetTheme(user, _: ActorRef[ThemeResult]))).map(_.css).recover {
            case _: AskTimeoutException => "default.css"
          }
        val personalNewsFuture =
          (personalNews ? (GetPersonalNews(user, _: ActorRef[PersonalNewsResult]))).map(_.news).recover {
            case _: AskTimeoutException => Nil
          }
        val topNewsFuture =
          (topNews ? (GetTopNews(_: ActorRef[TopNewsResult]))).map(_.news).recover {
            case _: AskTimeoutException => Nil
          }
        for {
          css <- cssFuture
          personalNews <- personalNewsFuture
          topNews <- topNewsFuture
        } {
          val topSet = topNews.toSet
          val allNews = topNews ::: personalNews.filterNot(topSet.contains)
          replyTo ! FrontPageResult(user, css, allNews)
        }
    }
  }

代码清单15-15 使用子Actor替代Future组合子的使用

private def pf(p: PartialFunction[AnyRef, Unit]): p.type = p

def frontPage(
    themes: ActorRef[GetTheme],
    personalNews: ActorRef[GetPersonalNews],
    topNews: ActorRef[GetTopNews]): Behavior[GetFrontPage] =
  ContextAware { ctx =>
    Static {
      case GetFrontPage(user, replyTo) =>
        val childRef = ctx.spawnAnonymous(Deferred { () =>
          val builder = new FrontPageResultBuilder(user)
          Partial[AnyRef](pf {
            case ThemeResult(css)         => builder.addCSS(css)
            case PersonalNewsResult(news) => builder.addPersonalNews(news)
            case TopNewsResult(news)      => builder.addTopNews(news)
            case ReceiveTimeout           => builder.timeout()
          }.andThen { _ =>
            if (builder.isComplete) {
              replyTo ! builder.result
              Stopped
            } else Same
          })
        })
        themes ! GetTheme(user, childRef)
        personalNews ! GetPersonalNews(user, childRef)
        topNews ! GetTopNews(childRef)
        ctx.schedule(1.second, childRef, ReceiveTimeout)
    }
  }

代码清单15-16 使用一个构建器来更加直接地表达领域模型

class FrontPageResultBuilder(user: String) {
  private var css: Option[String] = None
  private var personalNews: Option[List[String]] = None
  private var topNews: Option[List[String]] = None

  def addCSS(css: String): Unit = this.css = Option(css)

  def addPersonalNews(news: List[String]): Unit =
    this.personalNews = Option(news)

  def addTopNews(news: List[String]): Unit =
    this.topNews = Option(news)

  def timeout(): Unit = {
    if (css.isEmpty) css = Some("default.css")
    if (personalNews.isEmpty) personalNews = Some(Nil)
    if (topNews.isEmpty) topNews = Some(Nil)
  }

  def isComplete: Boolean =
    css.isDefined &&
    personalNews.isDefined && topNews.isDefined

  def result: FrontPageResult = {
    val topSet = topNews.get.toSet
    val allNews = topNews.get :::
      personalNews.get.filterNot(topSet.contains)
    FrontPageResult(user, css.get, allNews)
  }
}

代码清单15-17 添加第4个服务,降低了代码的可读性

val overrideFuture =
  (overrides ? (GetOverride(_: ActorRef[OverrideResult]))).recover {
    case _: AskTimeoutException => NoOverride
  }
for {
  css <- cssFuture
  personalNews <- personalNewsFuture
  topNews <- topNewsFuture
  ovr <- overrideFuture
} ovr match {
  case NoOverride =>
    val topSet = topNews.toSet
    val allNews = topNews ::: personalNews.filterNot(topSet.contains)
    replyTo ! FrontPageResult(user, css, allNews)
  case _ => // nothing to do here
}
for {
  ovr <- overrideFuture
} ovr match {
  case NoOverride => // nothing to do here
  case Override(css, news) =>
    replyTo ! FrontPageResult(user, css, news)
}

15.7 业务握手协议(又名可靠投递模式)

代码清单15-18 使用Actor实现上面的(信息)交换

final case class ChangeBudget(amount: BigDecimal, replyTo: ActorRef)

case object ChangeBudgetDone

final case class CannotChangeBudget(reason: String)

class Sam(alice: ActorRef, bob: ActorRef, amount: BigDecimal) extends Actor {
  def receive: Receive = talkToAlice()

  def talkToAlice(): Receive = {
    alice ! ChangeBudget(-amount, self)
    context.setReceiveTimeout(1.second)

    LoggingReceive {
      case ChangeBudgetDone           => context.become(talkToBob())
      case CannotChangeBudget(reason) => context.stop(self)
      case ReceiveTimeout             => alice ! ChangeBudget(-amount, self)
    }
  }

  def talkToBob(): Receive = {
    context.system.terminate()
    Actor.emptyBehavior
  }
}

class Alice extends Actor {
  private var budget: BigDecimal = 10
  private var alreadyDone: Set[ActorRef] = Set.empty

  def receive = LoggingReceive {
    case ChangeBudget(amount, replyTo) if alreadyDone(replyTo) =>
      replyTo ! ChangeBudgetDone
    case ChangeBudget(amount, replyTo) if amount + budget > 0 =>
      budget += amount
      alreadyDone += replyTo
      context.watch(replyTo)
      replyTo ! ChangeBudgetDone
    case ChangeBudget(_, replyTo) =>
      replyTo ! CannotChangeBudget("insufficient budget")
    case Terminated(saga) =>
      alreadyDone -= saga
  }
}

代码清单15-19 向预算消息添加持久性

final case class AliceConfirmedChange(deliveryId: Long)

final case class AliceDeniedChange(deliveryId: Long)

class PersistentSam(alice: ActorPath, bob: ActorPath, amount: BigDecimal, override val persistenceId: String)
    extends PersistentActor
    with AtLeastOnceDelivery
    with ActorLogging {

  def receiveCommand: Receive = Actor.emptyBehavior

  override def preStart(): Unit = {
    context.become(talkToAlice())
  }

  def talkToAlice(): Receive = {
    log.debug("talking to Alice")
    var deliveryId: Long = 0
    deliver(alice)(id => {
      deliveryId = id
      ChangeBudget(-amount, self, persistenceId)
    })

    LoggingReceive({
      case ChangeBudgetDone =>
        persist(AliceConfirmedChange(deliveryId)) { ev =>
          confirmDelivery(ev.deliveryId)
          context.become(talkToBob())
        }
      case CannotChangeBudget(reason) =>
        persist(AliceDeniedChange(deliveryId)) { ev =>
          confirmDelivery(ev.deliveryId)
          context.stop(self)
        }
    }: Receive)
  }

  def talkToBob(): Receive = {
    context.system.terminate()
    Actor.emptyBehavior
  }

  def receiveRecover = LoggingReceive {
    case AliceConfirmedChange(deliveryId) =>
      confirmDelivery(deliveryId)
      context.become(talkToBob())
    case AliceDeniedChange(deliveryId) =>
      confirmDelivery(deliveryId)
      context.stop(self)
  }
}

代码清单15-20 Alice Actor的持久化版本

final case class BudgetChanged(amount: BigDecimal, persistenceId: String)

case object CleanupDoneList

final case class ChangeDone(persistenceId: String)

class PersistentAlice extends PersistentActor with ActorLogging {
  def persistenceId: String = "Alice"

  private implicit val mat: ActorMaterializer = ActorMaterializer()

  import context.dispatcher

  private var alreadyDone: Set[String] = Set.empty
  private var budget: BigDecimal = 10

  private val cleanupTimer: Cancellable = context.system.scheduler.schedule(1.hour, 1.hour, self, CleanupDoneList)

  def receiveCommand = LoggingReceive {
    case ChangeBudget(amount, replyTo, id) if alreadyDone(id) =>
      replyTo ! ChangeBudgetDone
    case ChangeBudget(amount, replyTo, id) if amount + budget > 0 =>
      persist(BudgetChanged(amount, id)) { ev =>
        budget += ev.amount
        alreadyDone += ev.persistenceId
        replyTo ! ChangeBudgetDone
      }
    case ChangeBudget(_, replyTo, _) =>
      replyTo ! CannotChangeBudget("insufficient budget")
    case CleanupDoneList =>
      val journal = PersistenceQuery(context.system).readJournalFor[LeveldbReadJournal](LeveldbReadJournal.Identifier)
      for (persistenceId <- alreadyDone) {
        val stream = journal.currentEventsByPersistenceId(persistenceId).map(_.event).collect {
          case AliceConfirmedChange(_) => ChangeDone(persistenceId)
        }
        stream.runWith(Sink.head).pipeTo(self)
      }
    case ChangeDone(id) =>
      persist(ChangeDone(id)) { ev =>
        alreadyDone -= ev.persistenceId
      }
  }

  def receiveRecover = LoggingReceive {
    case BudgetChanged(amount, id) =>
      budget += amount
      alreadyDone += id
    case ChangeDone(id) =>
      alreadyDone -= id
  }

  override def postStop(): Unit = {
    cleanupTimer.cancel()
  }
}