第13章——使用Actor编程
在编写复杂、耗时的应用程序时，我们经常会使用多线程以及并发来降低响应时间或者提高性能。可惜，传统的并发解决方案导致了一些问题，如线程安全、竞态条件、死锁、活锁以及不容易理解的、容易出错的代码。共享的可变性是罪魁祸首。
13.1 一个顺序耗时问题
ProgrammingActors/CountFilesSequential.scala
import java.io.File
/**
 * Lists the immediate entries of the given path.
 *
 * `File.listFiles()` may hand back `null`; that case is mapped to an empty list
 * so callers never have to deal with `null`.
 *
 * @param file the path whose entries are requested
 * @return the entries as a list, or an empty list when `listFiles()` yields `null`
 */
def getChildren(file: File) =
  Option(file.listFiles()).fold(List.empty[File])(_.toList)
// Sequentially counts the non-directory entries below the directory named by the
// first command-line argument and reports the count and the elapsed wall-clock time.
val start = System.nanoTime
val exploreFrom = new File(args(0))
var count = 0L
// Work list of directories that still have to be listed.
var filesToVisit = List(exploreFrom)
while (filesToVisit.nonEmpty) {
  val head = filesToVisit.head
  filesToVisit = filesToVisit.tail
  // One pass over the entries: each entry is classified exactly once.
  val (directories, files) = getChildren(head).partition(_.isDirectory)
  count += files.size
  // Prepend the new sub-directories: `:::` copies only its left operand, so this
  // costs O(sub-directories) instead of re-copying the whole pending list on
  // every iteration. The visiting order changes, the total does not.
  filesToVisit = directories ::: filesToVisit
}
val end = System.nanoTime
println(s"Number of files found: $count")
println(s"Time taken: ${(end - start) / 1.0e9} seconds")
执行命令
scala CountFilesSequential.scala /Users/venkats/agility
运行结果
Number of files found: 479758
Time taken: 66.524453436 seconds
13.3 创建 Actor
ProgrammingActors/HollywoodActor.scala
import akka.actor._
/** Actor that prints, for every message it receives, the role named by that message. */
class HollywoodActor() extends Actor {
  /** Accepts any message and prints it as the role being played. */
  def receive: Receive = {
    case role =>
      val line = s"playing the role of $role"
      println(line)
  }
}
执行命令——编译
scalac -d classes HollywoodActor.scala CreateActors.scala
执行命令——执行
scala -classpath classes CreateActors
ProgrammingActors/CreateActors.scala
import akka.actor._
import scala.concurrent.Await
import scala.concurrent.duration.Duration
/**
 * Starts an actor system named "sample", creates one `HollywoodActor`,
 * sends it a single message and then shuts the system down.
 */
object CreateActors extends App {
  val system = ActorSystem("sample")
  val depp = system.actorOf(Props[HollywoodActor]())

  // `!` only enqueues the message; it does not wait for the actor to handle it.
  depp ! "Wonka"

  // NOTE(review): termination is requested right after the asynchronous send.
  // Whether the message is always handled before shutdown is not established by
  // this snippet; confirm against Akka's termination semantics.
  val terminateFuture = system.terminate()

  // Keep the main thread alive until the actor system has finished terminating.
  Await.ready(terminateFuture, Duration.Inf)
}
运行结果
playing the role of Wonka
ProgrammingActors/HollywoodActor2.scala
// Print the message together with the thread that is executing this handler.
case message => println(s"$message - ${Thread.currentThread}")
ProgrammingActors/CreateActors2.scala
// Create two separate actors from the same actor class.
val depp = system.actorOf(Props[HollywoodActor])
val hanks = system.actorOf(Props[HollywoodActor])
// Send two messages to each actor, alternating between them.
depp ! "Wonka"
hanks ! "Gump"
depp ! "Sparrow"
hanks ! "Phillips"
// Print the thread that is executing these sends.
println(s"Calling from ${Thread.currentThread}")
运行结果
Wonka - Thread[sample-akka.actor.default-dispatcher-2,5,main]
Gump - Thread[sample-akka.actor.default-dispatcher-3,5,main]
Calling from Thread[main,5,main]
Phillips - Thread[sample-akka.actor.default-dispatcher-3,5,main]
Sparrow - Thread[sample-akka.actor.default-dispatcher-2,5,main]
13.4 Actor 和线程
ProgrammingActors/CreateActors3.scala
// First round: one message to each actor.
depp ! "Wonka"
hanks ! "Gump"
// Pause the sending thread for 100 ms between the two rounds.
// NOTE(review): presumably this lets both actors finish the first round before the
// second arrives, so that the thread handling each actor can differ between rounds;
// compare the thread names in the recorded output.
Thread.sleep(100)
// Second round: one more message to each actor.
depp ! "Sparrow"
hanks ! "Phillips"
运行结果
Wonka - Thread[sample-akka.actor.default-dispatcher-3,5,main]
Gump - Thread[sample-akka.actor.default-dispatcher-4,5,main]
Sparrow - Thread[sample-akka.actor.default-dispatcher-4,5,main]
Phillips - Thread[sample-akka.actor.default-dispatcher-3,5,main]
Calling from Thread[main,5,main]
13.5 隔离可变性
ProgrammingActors/HollywoodActor4.scala
import akka.actor._
import scala.collection._
/** Message asking the receiving actor to play `role`. */
case class Play(role: String)
/** Message asking the receiving actor to reply with how many times `role` was played. */
case class ReportCount(role: String)
/**
 * Actor that keeps a tally of how often each role has been played.
 *
 * The tally lives in a mutable map that is only read and written from inside
 * `receive`.
 */
class HollywoodActor() extends Actor {
  /** Number of `Play` messages handled so far, keyed by role. */
  val messagesCount: mutable.Map[String, Int] = mutable.Map()

  /**
   * Handles the two protocol messages:
   *  - `Play(role)` increments the tally for `role` and prints a line;
   *  - `ReportCount(role)` replies to the sender with the tally for `role`
   *    (0 when the role was never played).
   */
  def receive: Receive = {
    case Play(role) =>
      messagesCount(role) = messagesCount.getOrElse(role, 0) + 1
      println(s"Playing $role")

    case ReportCount(role) =>
      val timesPlayed = messagesCount.getOrElse(role, 0)
      sender() ! timesPlayed
  }
}
ProgrammingActors/UseActor.scala
import akka.actor._
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._
/**
 * Sends `Play` messages to two `HollywoodActor`s, then asks each actor how often
 * a role was played and prints the answers.
 *
 * The message classes `Play` and `ReportCount` are deliberately NOT declared
 * inside this object: a nested declaration would shadow the top-level classes
 * that `HollywoodActor` pattern-matches on. The actor would then not match the
 * messages sent from here, and every `?` below would run into its timeout.
 */
object UseActor extends App {
  val system = ActorSystem("sample")
  val depp = system.actorOf(Props[HollywoodActor])
  val hanks = system.actorOf(Props[HollywoodActor])

  // Fire-and-forget sends; each actor handles its own messages in the order sent.
  depp ! Play("Wonka")
  hanks ! Play("Gump")
  depp ! Play("Wonka")
  depp ! Play("Sparrow")
  println("Sent roles to play")

  // Upper bound for each ask (`?`) below and for waiting on its reply.
  implicit val timeout: Timeout = Timeout(2.seconds)

  // Issue all three requests first so they are outstanding at the same time ...
  val wonkaFuture = depp ? ReportCount("Wonka")
  val sparrowFuture = depp ? ReportCount("Sparrow")
  val gumpFuture = hanks ? ReportCount("Gump")

  // ... then block for each reply.
  val wonkaCount = Await.result(wonkaFuture, timeout.duration)
  val sparrowCount = Await.result(sparrowFuture, timeout.duration)
  val gumpCount = Await.result(gumpFuture, timeout.duration)

  println(s"Depp played Wonka $wonkaCount time(s)")
  println(s"Depp played Sparrow $sparrowCount time(s)")
  println(s"Hanks played Gump $gumpCount time(s)")

  // Shut the actor system down and wait until termination has completed.
  val terminateFuture = system.terminate()
  Await.ready(terminateFuture, Duration.Inf)
}
// Message classes used by UseActor.
// NOTE(review): HollywoodActor4.scala declares these same two case classes at top
// level. If both files are compiled together in the same package, only one of the
// two declarations can stay; confirm which file is meant to own them.
/** Message asking the receiving actor to play `role`. */
case class Play(role: String)
/** Message asking the receiving actor to reply with how many times `role` was played. */
case class ReportCount(role: String)
运行结果
Sent roles to play
Playing Wonka
Playing Gump
Playing Wonka
Playing Sparrow
Depp played Wonka 2 time(s)
Depp played Sparrow 1 time(s)
Hanks played Gump 1 time(s)
13.6 使用 Actor 模型进行并发
ProgrammingActors/FileExplorer.scala
import akka.actor._
import java.io._
/**
 * Actor that lists a single directory on request.
 *
 * For a directory name received as a `String` it replies to the sender with
 *  - one `String` message per sub-directory (its absolute path), followed by
 *  - one `Int` message carrying the number of non-directory entries.
 *
 * When `listFiles()` yields `null`, only the count `0` is sent.
 */
class FileExplorer extends Actor {
  def receive: Receive = {
    case dirName: String =>
      val requester = sender()
      val entries = Option(new File(dirName).listFiles()).getOrElse(Array.empty[File])
      val (subDirs, plainFiles) = entries.partition(_.isDirectory)

      // Sub-directories go out first; the count is always the last reply for a request.
      subDirs.foreach { dir => requester ! dir.getAbsolutePath }
      requester ! plainFiles.length
  }
}
ProgrammingActors/FilesCounter.scala
import akka.actor._
import akka.routing._
/**
 * Actor that adds up the file counts reported for a directory tree.
 *
 * Every directory name it receives is passed on to a router built from
 * `RoundRobinPool(100)` over `FileExplorer`, and `pending` tracks how many of
 * those requests have not yet been answered with a count. When `pending` drops
 * back to zero the totals are printed and the actor system is terminated.
 */
class FilesCounter extends Actor {
  /** Timestamp taken when this actor instance is constructed. */
  val start: Long = System.nanoTime
  /** Running total of the `Int` counts received so far. */
  var filesCount = 0L
  /** Directory requests handed to the explorers that have not reported a count yet. */
  var pending = 0
  val fileExplorers: ActorRef =
    context.actorOf(RoundRobinPool(100).props(Props[FileExplorer]))

  def receive: Receive = {
    case dirName: String =>
      pending += 1
      fileExplorers ! dirName

    case count: Int =>
      filesCount += count
      pending -= 1
      if (pending == 0) reportAndShutDown()
  }

  /** Prints the total and the elapsed time, then asks the actor system to terminate. */
  private def reportAndShutDown(): Unit = {
    val elapsedSeconds = (System.nanoTime - start) / 1.0e9
    println(s"Files count: $filesCount")
    println(s"Time taken: $elapsedSeconds seconds")
    context.system.terminate()
  }
}
ProgrammingActors/CountFiles.scala
import akka.actor._
/**
 * Entry point for the actor-based file counter.
 *
 * Expects the directory to explore as the first command-line argument, starts the
 * actor system and hands that directory to a `FilesCounter`.
 */
object CountFiles extends App {
  // Validate the arguments BEFORE starting the actor system. Otherwise a missing
  // argument would throw on the main thread after the system is already up,
  // leaving it running with nothing that ever terminates it.
  if (args.isEmpty) {
    Console.err.println("Usage: CountFiles <directory>")
    sys.exit(1)
  }

  val system = ActorSystem("sample")
  val filesCounter = system.actorOf(Props[FilesCounter])
  filesCounter ! args.head
}
执行命令
scalac -d classes FilesCounter.scala FileExplorer.scala CountFiles.scala
scala -classpath classes CountFiles /Users/venkats/agility
运行结果
Files count: 479758
Time taken: 5.609851764 seconds
1.0.0