第14章——资源管理模式

大多数系统都面临着一个共同的问题,那就是你需要管理或展现资源:文件存储空间、计算能力、对数据库或 Web API 的访问、如打印机和读卡器等物理设备,诸如此类。你所创建的某个组件,可能会独自为系统的其余部分提供某种资源,又或者你可能需要将其与外部资源整合。在这一章中,我们将讨论处理反应式应用程序中资源的模式。我们将着重讨论以下几种模式:

14.1 资源封装模式

代码清单 14-1 亚马逊EC2实例作为工作者节点

public Instance startInstance(final AWSCredentials credentials) {
  final AmazonEC2 amazonEC2Client =
      AmazonEC2ClientBuilder.standard()
          .withCredentials(new AWSStaticCredentialsProvider(credentials))
          .build();

  final RunInstancesRequest runInstancesRequest =
      new RunInstancesRequest()
          .withImageId("")
          .withInstanceType("m1.small")
          .withMinCount(1)
          .withMaxCount(1);

  final RunInstancesResult runInstancesResult = amazonEC2Client.runInstances(runInstancesRequest);

  final Reservation reservation = runInstancesResult.getReservation();
  final List<Instance> instances = reservation.getInstances();

  // there will be exactly one INSTANCE in this list, otherwise
  // runInstances() would have thrown an exception
  return instances.get(0);
}

代码清单 14-2 将EC2节点提升到一个Future中,从而简化失败处理过程

private ExecutionContext executionContext; // value from somewhere
private CircuitBreaker circuitBreaker; // value from somewhere

public Future<Instance> startInstanceAsync(AWSCredentials credentials) {
  final Future<Instance> f =
      circuitBreaker.callWithCircuitBreaker(
          () -> Futures.future(() -> startInstance(credentials), executionContext));

  final PartialFunction<Throwable, Future<Instance>> recovery =
      new PFBuilder<Throwable, Future<Instance>>()
          .match(
              AmazonClientException.class,
              AmazonClientException::isRetryable,
              ex -> startInstanceAsync(credentials))
          .build();

  return f.recoverWith(recovery, executionContext);
}

代码清单 14-3 通过桥接客户端代码执行亚马逊的异步客户端

public Future<RunInstancesResult> runInstancesAsync(
    final RunInstancesRequest request, final AmazonEC2Async client) {

  final Promise<RunInstancesResult> promise = Futures.promise();
  client.runInstancesAsync(
      request,
      new AsyncHandler<RunInstancesRequest, RunInstancesResult>() {

        @Override
        public void onSuccess(RunInstancesRequest request, RunInstancesResult result) {
          promise.success(result);
        }

        @Override
        public void onError(Exception exception) {
          promise.failure(exception);
        }
      });
  return promise.future();
}

代码清单 14-4 关闭 EC2 实例

public Future<TerminateInstancesResult> terminateInstancesAsync(
    final AmazonEC2Client client, final Instance... instances) {

  final List<String> ids =
      Arrays.stream(instances).map(Instance::getInstanceId).collect(Collectors.toList());
  final TerminateInstancesRequest request = new TerminateInstancesRequest(ids);

  final Future<TerminateInstancesResult> f =
      circuitBreaker.callWithCircuitBreaker(
          () -> Futures.future(() -> client.terminateInstances(request), executionContext));

  final PartialFunction<Throwable, Future<TerminateInstancesResult>> recovery =
      new PFBuilder<Throwable, Future<TerminateInstancesResult>>()
          .match(
              AmazonClientException.class,
              AmazonClientException::isRetryable,
              ex -> terminateInstancesAsync(client, instances))
          .build();

  return f.recoverWith(recovery, executionContext);
}

代码清单 14-5 执行组件和看作工作者节点的 Actor 通信

class WorkerNode extends AbstractActor {
  private final Cancellable checkTimer;

  public WorkerNode(final InetAddress address, final Duration checkInterval) {
    checkTimer =
        getContext()
            .getSystem()
            .getScheduler()
            .schedule(
                checkInterval,
                checkInterval,
                self(),
                DoHealthCheck.INSTANCE,
                getContext().dispatcher(),
                self());
  }

  @Override
  public Receive createReceive() {
    final List<WorkerNodeMessage> msgs = new ArrayList<>();
    return receiveBuilder()
        .match(WorkerNodeMessage.class, msgs::add)
        .match(
            DoHealthCheck.class,
            dhc -> {
              /* perform check */
            })
        .match(
            Shutdown.class,
            s -> {
              msgs.forEach(
                  msg ->
                      msg.replyTo()
                          .tell(new WorkerCommandFailed("shutting down", msg.id()), self()));
              /* ask Resource Pool to shut down this INSTANCE */
            })
        .match(
            WorkerNodeReady.class,
            wnr -> {
              /* send msgs to the worker */
              getContext().become(initialized());
            })
        .build();
  }

  private Receive initialized() {
    /* forward commands and deal with responses from worker node */
    // ...
    return null;
  }

  @Override
  public void postStop() {
    checkTimer.cancel();
  }
}

14.2 资源借贷模式

代码清单 14-6 分离资源和任务的管理

class WorkerNodeForExecution extends AbstractActor {

  @Override
  public Receive createReceive() {
    List<WorkerNodeMessage> msgs = new ArrayList<>();
    return receiveBuilder()
        .match(WorkerNodeMessage.class, msgs::add)
        .match(
            Shutdown.class,
            s -> {
              msgs.forEach(
                  msg -> {
                    WorkerCommandFailed failMsg =
                        new WorkerCommandFailed("shutting down", msg.id());
                    msg.replyTo().tell(failMsg, self());
                  });
            })
        .match(WorkerNodeReady.class, wnr -> getContext().become(initialized()))
        .build();
  }

  private Receive initialized() {
    /* forward commands and deal with responses from worker node */
    // ...
    return null;
  }
}

class WorkNodeForResourcePool extends AbstractActor {
  private final Cancellable checkTimer;

  public WorkNodeForResourcePool(InetAddress address, FiniteDuration checkInterval) {
    checkTimer =
        getContext()
            .system()
            .scheduler()
            .schedule(
                checkInterval,
                checkInterval,
                self(),
                DoHealthCheck.INSTANCE,
                getContext().dispatcher(),
                self());
  }

  @Override
  public Receive createReceive() {
    return receiveBuilder()
        .match(
            DoHealthCheck.class,
            dhc -> {
              /* perform check */
            })
        .match(
            Shutdown.class,
            s -> {
              /* Cleans up this resource */
            })
        .build();
  }

  @Override
  public void postStop() {
    checkTimer.cancel();
  }
}

14.3 复杂命令模式

代码清单 14-7 批处理作业的基本构成

public interface ProcessingLogic {
  PartialResult process(Stream<DataElement> input);
}

//
public interface MergeLogic {
  Result merge(Collection<PartialResult> partialResults);
}

public class BatchJob {
  public final String dataSelector;
  public final ProcessingLogic processingLogic;
  public final MergeLogic mergeLogic;

  public BatchJob(String dataSelector, ProcessingLogic processingLogic, MergeLogic mergeLogic) {
    this.dataSelector = dataSelector;
    this.processingLogic = processingLogic;
    this.mergeLogic = mergeLogic;
  }

  public BatchJob withDataSelector(String selector) {
    return new BatchJob(selector, processingLogic, mergeLogic);
  }
}

代码清单 14-8 通过调用Nashorn JavaScript脚本引擎执行处理逻辑

public class PartSuccess implements PartialResult {
  public final int value;

  public PartSuccess(int value) {
    this.value = value;
  }

  @Override
  public String toString() {
    return "ResultSuccess(" + value + ")";
  }
}

public class PartFailure implements PartialResult {
  public final Throwable failure;

  public PartFailure(Throwable failure) {
    this.failure = failure;
  }

  @Override
  public String toString() {
    return "ResultFailure(" + failure.getMessage() + ")";
  }
}

public class BatchJobJS {
  public final String dataSelector;
  public final String processingLogic;
  public final String mergeLogic;

  public BatchJobJS(String dataSelector, String processingLogic, String mergeLogic) {
    this.dataSelector = dataSelector;
    this.processingLogic = processingLogic;
    this.mergeLogic = mergeLogic;
  }

  public BatchJobJS withDataSelector(String selector) {
    return new BatchJobJS(selector, processingLogic, mergeLogic);
  }
}

public class WorkerJS {
  private static final ScriptEngine ENGINE = new ScriptEngineManager().getEngineByName("nashorn");

  public PartialResult runJob(final BatchJobJS job) {
    final Invocable invocable = (Invocable) ENGINE;

    try {
      ENGINE.eval(job.processingLogic);
      final Stream<DataElement> input = provideData(job.dataSelector);
      final PartialResult result = (PartialResult) invocable.invokeFunction("process", input);
      return result;
    } catch (Exception e) {
      return new PartFailure(e);
    }
  }

  private Stream<DataElement> provideData(String selector) {
    /* fetch data from persistent storage in streaming fashion */
    return Stream.of(1, 2, 3).map(DataElement::new);
  }
}

job.js

var PartSuccess = Java.type(
    'chapter14.ComplexCommand.PartSuccess');

var process = function (input) {
    // 'input' is a Java 8 Stream
    var value = input.count();
    return new PartSuccess(value);
}

代码清单 14-9 外部 DSL 使用了不同于宿主编程语言的语法

FOREACH Car (_, _, year, price)
SELECT year ? 1950 && year < 1960
MEDIAN OF price
REMEMBER AS p

FOREACH Car (make, model, _, price)
SELECT price > p
DISTINCT VALUES OF (make, model)
RETURN AS RESULT

代码清单 14-10 内部 DSL

public static void akkaStreamDSL() {
  final RunnableGraph<CompletionStage<Long>> p =
      Source.<DataElement>empty()
          .filter(new InRange("year", 1950, 1960))
          .toMat(Sink.fold(0L, new Median<>("price")), Keep.right());

  Source.<DataElement>empty()
      .map(new Inject<>(p, "p"))
      .filter(new Filter("price > p"))
      .to(
          Sink.fold(
              Collections.emptySet(), new DistinctValues<Pair<String, String>>("make", "model")));
}

14.5 托管阻塞模式

代码清单 14-11 维护一个私有的ExecutorService

public enum AccessRights {
  READ_JOB_STATUS,
  SUBMIT_JOB;

  public static final AccessRights[] EMPTY = new AccessRights[] {};
}

public class CheckAccess {
  public final String username;
  public final String credentials;
  public final AccessRights[] rights;
  public final ActorRef replyTo;

  public CheckAccess(
      String username, String credentials, AccessRights[] rights, ActorRef replyTo) {
    this.username = username;
    this.credentials = credentials;
    this.rights = rights;
    this.replyTo = replyTo;
  }
}

public class CheckAccessResult {
  public final String username;
  public final String credentials;
  public final AccessRights[] rights;

  public CheckAccessResult(CheckAccess ca, AccessRights[] rights) {
    this.username = ca.username;
    this.credentials = ca.credentials;
    this.rights = rights;
  }
}

public class AccessService extends AbstractActor {
  private final ExecutorService pool;
  private final DataSource db;

  public AccessService(DataSource db, int poolSize, int queueSize) {
    this.db = db;
    pool =
        new ThreadPoolExecutor(
            0,
            poolSize,
            60,
            SECONDS,
            new LinkedBlockingDeque<>(queueSize),
            new NamedPoolThreadFactory("ManagedBlocking-" + getSelf().path().name(), true));
  }

  @Override
  public Receive createReceive() {
    final ActorRef self = self();
    return ReceiveBuilder.create()
        .match(
            CheckAccess.class,
            ca -> {
              try {
                pool.execute(() -> checkAccess(db, ca, self));
              } catch (RejectedExecutionException e) {
                ca.replyTo.tell(new CheckAccessResult(ca, AccessRights.EMPTY), self);
              }
            })
        .build();
  }

  @Override
  public void postStop() {
    pool.shutdownNow();
  }

  private static void checkAccess(DataSource db, CheckAccess ca, ActorRef self) {
    try (Connection conn = db.getConnection()) {
      final ResultSet result = conn.createStatement().executeQuery("<get access rights>");
      final List<AccessRights> rights = new LinkedList<>();
      while (result.next()) {
        rights.add(AccessRights.valueOf(result.getString(0)));
      }
      ca.replyTo.tell(new CheckAccessResult(ca, rights.toArray(AccessRights.EMPTY)), self);
    } catch (Exception e) {
      ca.replyTo.tell(new CheckAccessResult(ca, AccessRights.EMPTY), self);
    }
  }
}