第3章——行业工具
3.2 函数式编程
代码清单3-1 不安全的、可变的消息类,可能会隐含非预期的行为
import java.util.Date;
public class Unsafe {
private Date timestamp;
private final StringBuffer message;
public Unsafe(Date timestamp, StringBuffer message) {
this.timestamp = timestamp;
this.message = message;
}
public synchronized Date getTimestamp() {
return timestamp;
}
public synchronized void setTimestamp(Date timestamp) {
this.timestamp = timestamp;
}
public StringBuffer getMessage() {
return message;
}
}
代码清单 3-2 不可变的消息类,其行为是可预知的,并易于推断
import java.util.Date;
public class Immutable {
private final Date timestamp;
private final String message;
public Immutable(final Date timestamp, final String message) {
this.timestamp = new Date(timestamp.getTime());
this.message = message;
}
public Date getTimestamp() {
return new Date(timestamp.getTime());
}
public String getMessage() {
return message;
}
}
Message.scala
import java.util.Date
final case class Message(timestamp: Date, message: String)
UsingStringBuffer.java
final StringBuffer original = new StringBuffer("foo");
final StringBuffer reversed = original.reverse();
System.out.println(String.format("original '%s', new value '%s'", original, reversed));
代码清单 3-3 引用透明性:允许代换预先计算好的值
public class Rooter {
private final double value;
private Double root = null;
public Rooter(double value) {
this.value = value;
}
public double getValue() {
return value;
}
public double getRoot() {
if (root == null) {
root = Math.sqrt(value);
}
return root;
}
}
代码清单3-4 因副作用而受限的可用性
public class SideEffecting implements Serializable, Cloneable {
private int count;
public SideEffecting(int start) {
this.count = start;
}
public int next() {
this.count += Math.incrementExact(this.count);
return this.count;
}
}
IntSeeding.java
final int next = se.next();
if (logger.isDebugEnabled()) {
logger.debug("Next is " + se.next());
}
return next;
UsingMapFunction.java
final List<Integer> numbers = Arrays.asList(1, 2, 3);
final List<Integer> numbersPlusOne =
numbers.stream().map(number -> number + 1).collect(Collectors.toList());
Python REPL
>>> def addOne(x):
... return x+1
...
>>> myFunction = addOne
>>> myFunction(3)
4
3.4 对反应式设计的现有支持
sample.js
var http = require('http');
var counter = 0;
http.createServer(function (req, res) {
counter++;
res.writeHead(200, {'Content-Type': 'text/plain'});
res.end('Sending response: ' + counter + ' via callback!\n');
}).listen(8888, '127.0.0.1');
console.log('Server up on 127.0.0.1:8888, send requests!');
sample.go
package main
import (
"fmt"
"time"
)
func main() {
iterations := 10
myChannel := make(chan int)
go producer(myChannel, iterations)
go consumer(myChannel, iterations)
time.Sleep(500 * time.Millisecond)
}
func producer(myChannel chan int, iterations int) {
for i := 1; i <= iterations; i++ {
fmt.Println("Sending: ", i)
myChannel <- i
}
}
func consumer(myChannel chan int, iterations int) {
for i := 1; i <= iterations; i++ {
recVal := <-myChannel
fmt.Println("Received: ", recVal)
}
}
代码清单3-5 从更快的数据源获取结果
public class ParallelRetrievalExample {
private final CacheRetriever cacheRetriever;
private final DBRetriever dbRetriever;
ParallelRetrievalExample(CacheRetriever cacheRetriever, DBRetriever dbRetriever) {
this.cacheRetriever = cacheRetriever;
this.dbRetriever = dbRetriever;
}
public Object retrieveCustomer(final long id) {
final CompletableFuture<Object> cacheFuture =
CompletableFuture.supplyAsync(() -> cacheRetriever.getCustomer(id));
final CompletableFuture<Object> dbFuture =
CompletableFuture.supplyAsync(() -> dbRetriever.getCustomer(id));
return CompletableFuture.anyOf(cacheFuture, dbFuture);
}
}
代码清单3-6 使用Scala编程语言将两个Future的结果组合为单一结果
// Provide the thread pool to be applied
implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(ForkJoinPool.commonPool())
def getProductInventoryByPostalCode(productSku: Long, postalCode: String): Future[(Long, Map[String, Long])] = {
// Define the futures so they can start doing their work
val localInventoryFuture = Future {
inventoryService.currentInventoryInWarehouse(productSku, postalCode)
}
val overallInventoryFutureByWarehouse = Future {
inventoryService.currentInventoryOverallByWarehouse(productSku)
}
// Retrieve the values and return a future of the combined result
for {
local <- localInventoryFuture
overall <- overallInventoryFutureByWarehouse
} yield (local, overall)
}
StagedFuturesAsyncExample.scala
import scala.async.Async.{ async, await }
val resultFuture = async {
val localInventoryFuture = async {
inventoryService.currentInventoryInWarehouse(productSku, postalCode)
}
val overallInventoryFutureByWarehouse = async {
inventoryService.currentInventoryOverallByWarehouse(productSku)
}
(await(localInventoryFuture), await(overallInventoryFutureByWarehouse))
}
RxJavaExample.java
package chapter03.rxjava;
import io.reactivex.Observable;
public class RxJavaExample {
public void observe(String[] strings) {
Observable.fromArray(strings).subscribe((s) -> System.out.println("Received " + s));
}
}
RxJavaExampleDriver.java
package chapter03.rxjava;
public class RxJavaExampleDriver {
private static final RxJavaExample RX_JAVA_EXAMPLE = new RxJavaExample();
public static void main(String[] args) {
String[] strings = {"a", "b", "c"};
RX_JAVA_EXAMPLE.observe(strings);
}
}
代码清单3-7 一个使用 Akka 的Actor的例子
package chapter03.actor
import akka.actor.SupervisorStrategy.Restart
import akka.actor._
import akka.event.LoggingReceive
case object Start
final case class CounterMessage(counterValue: Int)
final case class CounterTooLargeException(message: String) extends Exception(message)
class SupervisorActor extends Actor with ActorLogging {
override val supervisorStrategy: OneForOneStrategy = OneForOneStrategy() {
case _: CounterTooLargeException => Restart
}
private val actor2 = context.actorOf(Props[SecondActor], "second-actor")
private val actor1 = context.actorOf(Props(new FirstActor(actor2)), "first-actor")
def receive: Receive = {
case Start => actor1 ! Start
}
}
class AbstractCounterActor extends Actor with ActorLogging {
protected var counterValue = 0
def receive: Receive = {
case _ =>
}
def counterReceive: Receive = LoggingReceive {
case CounterMessage(i) if i <= 1000 =>
counterValue = i
log.info(s"Counter value: $counterValue")
sender ! CounterMessage(counterValue + 1)
case CounterMessage(_) =>
throw CounterTooLargeException("Exceeded max value of counter!")
}
override def postRestart(reason: Throwable): Unit = {
context.parent ! Start
}
}
class FirstActor(secondActor: ActorRef) extends AbstractCounterActor {
override def receive = LoggingReceive {
case Start =>
context.become(counterReceive)
log.info("Starting counter passing.")
secondActor ! CounterMessage(counterValue + 1)
}
}
class SecondActor() extends AbstractCounterActor {
override def receive: Receive = counterReceive
}
object Example extends App {
val system = ActorSystem("counter-supervision-example")
val supervisor = system.actorOf(Props[SupervisorActor])
supervisor ! Start
}
1.0.0