第3章——行业工具

3.2 函数式编程

代码清单3-1 不安全的、可变的消息类,可能会隐含非预期的行为


import java.util.Date; public class Unsafe { private Date timestamp; private final StringBuffer message; public Unsafe(Date timestamp, StringBuffer message) { this.timestamp = timestamp; this.message = message; } public synchronized Date getTimestamp() { return timestamp; } public synchronized void setTimestamp(Date timestamp) { this.timestamp = timestamp; } public StringBuffer getMessage() { return message; } }

代码清单 3-2 不可变的消息类,其行为是可预知的,并易于推断


import java.util.Date; public class Immutable { private final Date timestamp; private final String message; public Immutable(final Date timestamp, final String message) { this.timestamp = new Date(timestamp.getTime()); this.message = message; } public Date getTimestamp() { return new Date(timestamp.getTime()); } public String getMessage() { return message; } }

Message.scala

import java.util.Date

final case class Message(timestamp: Date, message: String)

UsingStringBuffer.java

final StringBuffer original = new StringBuffer("foo");
final StringBuffer reversed = original.reverse();
System.out.println(String.format("original '%s', new value '%s'", original, reversed));

代码清单 3-3 引用透明性:允许代换预先计算好的值

public class Rooter {
  private final double value;
  private Double root = null;

  public Rooter(double value) {
    this.value = value;
  }

  public double getValue() {
    return value;
  }

  public double getRoot() {
    if (root == null) {
      root = Math.sqrt(value);
    }
    return root;
  }
}

代码清单3-4 因副作用而受限的可用性

public class SideEffecting implements Serializable, Cloneable {
  private int count;

  public SideEffecting(int start) {
    this.count = start;
  }

  public int next() {
    this.count += Math.incrementExact(this.count);
    return this.count;
  }
}

IntSeeding.java

final int next = se.next();
if (logger.isDebugEnabled()) {
  logger.debug("Next is " + se.next());
}
return next;

UsingMapFunction.java

final List<Integer> numbers = Arrays.asList(1, 2, 3);
final List<Integer> numbersPlusOne =
    numbers.stream().map(number -> number + 1).collect(Collectors.toList());

Python REPL


>>> def addOne(x): ... return x+1 ... >>> myFunction = addOne >>> myFunction(3) 4

3.4 对反应式设计的现有支持

sample.js

var http = require('http');

var counter = 0;

http.createServer(function (req, res) {
    counter++;
    res.writeHead(200, {'Content-Type': 'text/plain'});
    res.end('Sending response: ' + counter + ' via callback!\n');
}).listen(8888, '127.0.0.1');

console.log('Server up on 127.0.0.1:8888, send requests!');

sample.go

package main

import (
	"fmt"
	"time"
)

func main() {
	iterations := 10
	myChannel := make(chan int)

	go producer(myChannel, iterations)
	go consumer(myChannel, iterations)

	time.Sleep(500 * time.Millisecond)
}

func producer(myChannel chan int, iterations int) {
	for i := 1; i <= iterations; i++ {
		fmt.Println("Sending: ", i)
		myChannel <- i
	}
}

func consumer(myChannel chan int, iterations int) {
	for i := 1; i <= iterations; i++ {
		recVal := <-myChannel
		fmt.Println("Received: ", recVal)
	}
}

代码清单3-5 从更快的数据源获取结果

public class ParallelRetrievalExample {
  private final CacheRetriever cacheRetriever;
  private final DBRetriever dbRetriever;

  ParallelRetrievalExample(CacheRetriever cacheRetriever, DBRetriever dbRetriever) {
    this.cacheRetriever = cacheRetriever;
    this.dbRetriever = dbRetriever;
  }

  public Object retrieveCustomer(final long id) {
    final CompletableFuture<Object> cacheFuture =
        CompletableFuture.supplyAsync(() -> cacheRetriever.getCustomer(id));
    final CompletableFuture<Object> dbFuture =
        CompletableFuture.supplyAsync(() -> dbRetriever.getCustomer(id));

    return CompletableFuture.anyOf(cacheFuture, dbFuture);
  }
}

代码清单3-6 使用Scala编程语言将两个Future的结果组合为单一结果

// Provide the thread pool to be applied
implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(ForkJoinPool.commonPool())

def getProductInventoryByPostalCode(productSku: Long, postalCode: String): Future[(Long, Map[String, Long])] = {

  // Define the futures so they can start doing their work
  val localInventoryFuture = Future {
    inventoryService.currentInventoryInWarehouse(productSku, postalCode)
  }
  val overallInventoryFutureByWarehouse = Future {
    inventoryService.currentInventoryOverallByWarehouse(productSku)
  }

  // Retrieve the values and return a future of the combined result
  for {
    local <- localInventoryFuture
    overall <- overallInventoryFutureByWarehouse
  } yield (local, overall)
}

StagedFuturesAsyncExample.scala

import scala.async.Async.{ async, await }
val resultFuture = async {
  val localInventoryFuture = async {
    inventoryService.currentInventoryInWarehouse(productSku, postalCode)
  }
  val overallInventoryFutureByWarehouse = async {
    inventoryService.currentInventoryOverallByWarehouse(productSku)
  }
  (await(localInventoryFuture), await(overallInventoryFutureByWarehouse))
}

RxJavaExample.java

package chapter03.rxjava;

import io.reactivex.Observable;

public class RxJavaExample {
  public void observe(String[] strings) {
    Observable.fromArray(strings).subscribe((s) -> System.out.println("Received " + s));
  }
}

RxJavaExampleDriver.java

package chapter03.rxjava;

public class RxJavaExampleDriver {
  private static final RxJavaExample RX_JAVA_EXAMPLE = new RxJavaExample();

  public static void main(String[] args) {
    String[] strings = {"a", "b", "c"};
    RX_JAVA_EXAMPLE.observe(strings);
  }
}

代码清单3-7 一个使用 Akka 的Actor的例子

package chapter03.actor

import akka.actor.SupervisorStrategy.Restart
import akka.actor._
import akka.event.LoggingReceive

case object Start

final case class CounterMessage(counterValue: Int)

final case class CounterTooLargeException(message: String) extends Exception(message)

class SupervisorActor extends Actor with ActorLogging {
  override val supervisorStrategy: OneForOneStrategy = OneForOneStrategy() {
    case _: CounterTooLargeException => Restart
  }

  private val actor2 = context.actorOf(Props[SecondActor], "second-actor")
  private val actor1 = context.actorOf(Props(new FirstActor(actor2)), "first-actor")

  def receive: Receive = {
    case Start => actor1 ! Start
  }
}

class AbstractCounterActor extends Actor with ActorLogging {
  protected var counterValue = 0

  def receive: Receive = {
    case _ =>
  }

  def counterReceive: Receive = LoggingReceive {
    case CounterMessage(i) if i <= 1000 =>
      counterValue = i
      log.info(s"Counter value: $counterValue")
      sender ! CounterMessage(counterValue + 1)
    case CounterMessage(_) =>
      throw CounterTooLargeException("Exceeded max value of counter!")
  }

  override def postRestart(reason: Throwable): Unit = {
    context.parent ! Start
  }
}

class FirstActor(secondActor: ActorRef) extends AbstractCounterActor {
  override def receive = LoggingReceive {
    case Start =>
      context.become(counterReceive)
      log.info("Starting counter passing.")
      secondActor ! CounterMessage(counterValue + 1)
  }
}

class SecondActor() extends AbstractCounterActor {
  override def receive: Receive = counterReceive
}

object Example extends App {
  val system = ActorSystem("counter-supervision-example")
  val supervisor = system.actorOf(Props[SupervisorActor])
  supervisor ! Start
}