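Chapter 14: Resource-management patterns
Most systems face a common problem: you need to manage or expose resources such as file storage space, computation power, access to databases or web APIs, and physical devices like printers and card readers. A component that you create may provide such a resource to the rest of the system on its own, or you may need to integrate it with external resources. This chapter discusses patterns for dealing with resources in reactive applications. We will focus on the patterns presented in the sections below.
14.1 The Resource Encapsulation pattern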
Listing 14-1 Amazon EC2 instance as a worker node
public Instance startInstance(final AWSCredentials credentials) {
  final AmazonEC2 amazonEC2Client =
      AmazonEC2ClientBuilder.standard()
          .withCredentials(new AWSStaticCredentialsProvider(credentials))
          .build();
  final RunInstancesRequest runInstancesRequest =
      new RunInstancesRequest()
          .withImageId("") // the image ID is blank here; a real AMI ID is required
          .withInstanceType("m1.small")
          .withMinCount(1)
          .withMaxCount(1);
  final RunInstancesResult runInstancesResult = amazonEC2Client.runInstances(runInstancesRequest);
  final Reservation reservation = runInstancesResult.getReservation();
  final List<Instance> instances = reservation.getInstances();
  // there will be exactly one instance in this list, otherwise
  // runInstances() would have thrown an exception
  return instances.get(0);
}
Listing 14-2 Lifting the EC2 node into a Future to simplify failure handling
private ExecutionContext executionContext; // value from somewhere
private CircuitBreaker circuitBreaker; // value from somewhere

public Future<Instance> startInstanceAsync(AWSCredentials credentials) {
  // run the blocking call on the given execution context, guarded by the circuit breaker
  final Future<Instance> f =
      circuitBreaker.callWithCircuitBreaker(
          () -> Futures.future(() -> startInstance(credentials), executionContext));
  // retry only when the AWS client reports the failure as retryable
  final PartialFunction<Throwable, Future<Instance>> recovery =
      new PFBuilder<Throwable, Future<Instance>>()
          .match(
              AmazonClientException.class,
              AmazonClientException::isRetryable,
              ex -> startInstanceAsync(credentials))
          .build();
  return f.recoverWith(recovery, executionContext);
}
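The recovery in Listing 14-2 retries for as long as the failure is reported as retryable. The following is a minimal sketch of the same idea using only `java.util.concurrent.CompletableFuture`, with one addition that is not in the listing: a hypothetical `maxAttempts` bound. The class name `RetryingCall` and its method names are illustrative assumptions, and the sketch assumes the supplied call does not throw synchronously.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Predicate;
import java.util.function.Supplier;

// Hypothetical helper, not part of the chapter's code: bounded retry on retryable failures.
final class RetryingCall {
    private RetryingCall() {}

    // Runs the call and retries it while the failure is retryable and attempts remain.
    static <T> CompletableFuture<T> withRetry(
            Supplier<CompletableFuture<T>> call,
            Predicate<Throwable> isRetryable,
            int maxAttempts) {
        final CompletableFuture<T> result = new CompletableFuture<>();
        attempt(call, isRetryable, maxAttempts, result);
        return result;
    }

    private static <T> void attempt(
            Supplier<CompletableFuture<T>> call,
            Predicate<Throwable> isRetryable,
            int attemptsLeft,
            CompletableFuture<T> result) {
        call.get().whenComplete((value, failure) -> {
            if (failure == null) {
                result.complete(value);
                return;
            }
            // Failures from dependent stages arrive wrapped in CompletionException.
            final Throwable cause =
                    (failure instanceof CompletionException && failure.getCause() != null)
                            ? failure.getCause()
                            : failure;
            if (attemptsLeft > 1 && isRetryable.test(cause)) {
                attempt(call, isRetryable, attemptsLeft - 1, result);
            } else {
                result.completeExceptionally(cause);
            }
        });
    }
}
```

Bounding the attempts is a design choice of this sketch: it trades the listing's unbounded retry for a guaranteed eventual answer to the caller.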
Listing 14-3 Bridging client code to execute Amazon's asynchronous client
public Future<RunInstancesResult> runInstancesAsync(
    final RunInstancesRequest request, final AmazonEC2Async client) {
  final Promise<RunInstancesResult> promise = Futures.promise();
  // complete the promise from whichever callback the AWS client invokes
  client.runInstancesAsync(
      request,
      new AsyncHandler<RunInstancesRequest, RunInstancesResult>() {
        @Override
        public void onSuccess(RunInstancesRequest request, RunInstancesResult result) {
          promise.success(result);
        }

        @Override
        public void onError(Exception exception) {
          promise.failure(exception);
        }
      });
  return promise.future();
}
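The bridge in Listing 14-3 does not depend on any particular future library. As a sketch under stated assumptions, the same shape can be written with the JDK's `CompletableFuture`; the `ResultHandler` and `CallbackClient` interfaces below are hypothetical stand-ins for `AsyncHandler` and `AmazonEC2Async`, invented for illustration so that the example runs without the AWS SDK.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical callback interface, standing in for the SDK's AsyncHandler.
interface ResultHandler<T> {
    void onSuccess(T result);

    void onError(Exception exception);
}

// Hypothetical callback-style client, standing in for AmazonEC2Async.
interface CallbackClient {
    void fetchAsync(String request, ResultHandler<String> handler);
}

final class CallbackBridge {
    private CallbackBridge() {}

    // Returns a future that is completed by whichever callback the client invokes.
    static CompletableFuture<String> fetch(CallbackClient client, String request) {
        final CompletableFuture<String> promise = new CompletableFuture<>();
        client.fetchAsync(
                request,
                new ResultHandler<String>() {
                    @Override
                    public void onSuccess(String result) {
                        promise.complete(result);
                    }

                    @Override
                    public void onError(Exception exception) {
                        promise.completeExceptionally(exception);
                    }
                });
        return promise;
    }
}
```

A caller can then compose the returned future like any other, for example `CallbackBridge.fetch(client, "r1").thenApply(String::length)`.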
Listing 14-4 Terminating the EC2 instances
public Future<TerminateInstancesResult> terminateInstancesAsync(
    final AmazonEC2Client client, final Instance... instances) {
  final List<String> ids =
      Arrays.stream(instances).map(Instance::getInstanceId).collect(Collectors.toList());
  final TerminateInstancesRequest request = new TerminateInstancesRequest(ids);
  final Future<TerminateInstancesResult> f =
      circuitBreaker.callWithCircuitBreaker(
          () -> Futures.future(() -> client.terminateInstances(request), executionContext));
  // retry only when the AWS client reports the failure as retryable
  final PartialFunction<Throwable, Future<TerminateInstancesResult>> recovery =
      new PFBuilder<Throwable, Future<TerminateInstancesResult>>()
          .match(
              AmazonClientException.class,
              AmazonClientException::isRetryable,
              ex -> terminateInstancesAsync(client, instances))
          .build();
  return f.recoverWith(recovery, executionContext);
}
Listing 14-5 Execution component communicating with an actor as if it were a worker node
class WorkerNode extends AbstractActor {
  private final Cancellable checkTimer;

  public WorkerNode(final InetAddress address, final Duration checkInterval) {
    // periodically send a health-check message to this actor itself
    checkTimer =
        getContext()
            .getSystem()
            .getScheduler()
            .schedule(
                checkInterval,
                checkInterval,
                self(),
                DoHealthCheck.INSTANCE,
                getContext().dispatcher(),
                self());
  }

  @Override
  public Receive createReceive() {
    // commands received before the worker is ready are buffered here
    final List<WorkerNodeMessage> msgs = new ArrayList<>();
    return receiveBuilder()
        .match(WorkerNodeMessage.class, msgs::add)
        .match(
            DoHealthCheck.class,
            dhc -> {
              /* perform check */
            })
        .match(
            Shutdown.class,
            s -> {
              msgs.forEach(
                  msg ->
                      msg.replyTo()
                          .tell(new WorkerCommandFailed("shutting down", msg.id()), self()));
              /* ask Resource Pool to shut down this instance */
            })
        .match(
            WorkerNodeReady.class,
            wnr -> {
              /* send msgs to the worker */
              getContext().become(initialized());
            })
        .build();
  }

  private Receive initialized() {
    /* forward commands and deal with responses from worker node */
    // ...
    return receiveBuilder().build(); // placeholder: empty behavior until the logic is filled in
  }

  @Override
  public void postStop() {
    checkTimer.cancel();
  }
}
14.2 The Resource Loan pattern
Listing 14-6 Separating the management of the resource from the management of tasks
class WorkerNodeForExecution extends AbstractActor {
  @Override
  public Receive createReceive() {
    // commands received before the worker is ready are buffered here
    List<WorkerNodeMessage> msgs = new ArrayList<>();
    return receiveBuilder()
        .match(WorkerNodeMessage.class, msgs::add)
        .match(
            Shutdown.class,
            s -> {
              msgs.forEach(
                  msg -> {
                    WorkerCommandFailed failMsg =
                        new WorkerCommandFailed("shutting down", msg.id());
                    msg.replyTo().tell(failMsg, self());
                  });
            })
        .match(WorkerNodeReady.class, wnr -> getContext().become(initialized()))
        .build();
  }

  private Receive initialized() {
    /* forward commands and deal with responses from worker node */
    // ...
    return receiveBuilder().build(); // placeholder: empty behavior until the logic is filled in
  }
}

class WorkNodeForResourcePool extends AbstractActor {
  private final Cancellable checkTimer;

  public WorkNodeForResourcePool(InetAddress address, FiniteDuration checkInterval) {
    // periodically send a health-check message to this actor itself
    checkTimer =
        getContext()
            .system()
            .scheduler()
            .schedule(
                checkInterval,
                checkInterval,
                self(),
                DoHealthCheck.INSTANCE,
                getContext().dispatcher(),
                self());
  }

  @Override
  public Receive createReceive() {
    return receiveBuilder()
        .match(
            DoHealthCheck.class,
            dhc -> {
              /* perform check */
            })
        .match(
            Shutdown.class,
            s -> {
              /* clean up this resource */
            })
        .build();
  }

  @Override
  public void postStop() {
    checkTimer.cancel();
  }
}
14.3 The Complex Command pattern
Listing 14-7 The essence of a batch job
public interface ProcessingLogic {
  PartialResult process(Stream<DataElement> input);
}

public interface MergeLogic {
  Result merge(Collection<PartialResult> partialResults);
}

public class BatchJob {
  public final String dataSelector;
  public final ProcessingLogic processingLogic;
  public final MergeLogic mergeLogic;

  public BatchJob(String dataSelector, ProcessingLogic processingLogic, MergeLogic mergeLogic) {
    this.dataSelector = dataSelector;
    this.processingLogic = processingLogic;
    this.mergeLogic = mergeLogic;
  }

  // returns a copy of this job that targets a different data selection
  public BatchJob withDataSelector(String selector) {
    return new BatchJob(selector, processingLogic, mergeLogic);
  }
}
Listing 14-8 Executing processing logic by invoking the Nashorn JavaScript engine
public class PartSuccess implements PartialResult {
  public final int value;

  public PartSuccess(int value) {
    this.value = value;
  }

  @Override
  public String toString() {
    return "ResultSuccess(" + value + ")";
  }
}

public class PartFailure implements PartialResult {
  public final Throwable failure;

  public PartFailure(Throwable failure) {
    this.failure = failure;
  }

  @Override
  public String toString() {
    return "ResultFailure(" + failure.getMessage() + ")";
  }
}

public class BatchJobJS {
  public final String dataSelector;
  public final String processingLogic;
  public final String mergeLogic;

  public BatchJobJS(String dataSelector, String processingLogic, String mergeLogic) {
    this.dataSelector = dataSelector;
    this.processingLogic = processingLogic;
    this.mergeLogic = mergeLogic;
  }

  public BatchJobJS withDataSelector(String selector) {
    return new BatchJobJS(selector, processingLogic, mergeLogic);
  }
}

public class WorkerJS {
  private static final ScriptEngine ENGINE = new ScriptEngineManager().getEngineByName("nashorn");

  public PartialResult runJob(final BatchJobJS job) {
    final Invocable invocable = (Invocable) ENGINE;
    try {
      // evaluating the script defines its process() function inside the engine
      ENGINE.eval(job.processingLogic);
      final Stream<DataElement> input = provideData(job.dataSelector);
      final PartialResult result = (PartialResult) invocable.invokeFunction("process", input);
      return result;
    } catch (Exception e) {
      return new PartFailure(e);
    }
  }

  private Stream<DataElement> provideData(String selector) {
    /* fetch data from persistent storage in streaming fashion */
    return Stream.of(1, 2, 3).map(DataElement::new);
  }
}
job.js
var PartSuccess = Java.type(
  'chapter14.ComplexCommand.PartSuccess');
var process = function (input) {
  // 'input' is a Java 8 Stream
  var value = input.count();
  return new PartSuccess(value);
}
Listing 14-9 External DSL using a syntax different from the host programming language
FOREACH Car (_, _, year, price)
  SELECT year ≥ 1950 && year < 1960
  MEDIAN OF price
  REMEMBER AS p
FOREACH Car (make, model, _, price)
  SELECT price > p
  DISTINCT VALUES OF (make, model)
  RETURN AS RESULT
Listing 14-10 Internal DSL
public static void akkaStreamDSL() {
  final RunnableGraph<CompletionStage<Long>> p =
      Source.<DataElement>empty()
          .filter(new InRange("year", 1950, 1960))
          .toMat(Sink.fold(0L, new Median<>("price")), Keep.right());
  Source.<DataElement>empty()
      .map(new Inject<>(p, "p"))
      .filter(new Filter("price > p"))
      .to(
          Sink.fold(
              Collections.emptySet(), new DistinctValues<Pair<String, String>>("make", "model")));
}
14.5 The Managed Blocking pattern
Listing 14-11 Maintaining a private ExecutorService
public enum AccessRights {
  READ_JOB_STATUS,
  SUBMIT_JOB;
  public static final AccessRights[] EMPTY = new AccessRights[] {};
}

public class CheckAccess {
  public final String username;
  public final String credentials;
  public final AccessRights[] rights;
  public final ActorRef replyTo;

  public CheckAccess(
      String username, String credentials, AccessRights[] rights, ActorRef replyTo) {
    this.username = username;
    this.credentials = credentials;
    this.rights = rights;
    this.replyTo = replyTo;
  }
}

public class CheckAccessResult {
  public final String username;
  public final String credentials;
  public final AccessRights[] rights;

  public CheckAccessResult(CheckAccess ca, AccessRights[] rights) {
    this.username = ca.username;
    this.credentials = ca.credentials;
    this.rights = rights;
  }
}

public class AccessService extends AbstractActor {
  private final ExecutorService pool;
  private final DataSource db;

  public AccessService(DataSource db, int poolSize, int queueSize) {
    this.db = db;
    // private pool with a bounded queue, so that blocking calls cannot pile up without limit
    pool =
        new ThreadPoolExecutor(
            0,
            poolSize,
            60,
            SECONDS,
            new LinkedBlockingDeque<>(queueSize),
            new NamedPoolThreadFactory("ManagedBlocking-" + getSelf().path().name(), true));
  }

  @Override
  public Receive createReceive() {
    final ActorRef self = self();
    return ReceiveBuilder.create()
        .match(
            CheckAccess.class,
            ca -> {
              try {
                pool.execute(() -> checkAccess(db, ca, self));
              } catch (RejectedExecutionException e) {
                // the pool is saturated: deny access instead of waiting
                ca.replyTo.tell(new CheckAccessResult(ca, AccessRights.EMPTY), self);
              }
            })
        .build();
  }

  @Override
  public void postStop() {
    pool.shutdownNow();
  }

  private static void checkAccess(DataSource db, CheckAccess ca, ActorRef self) {
    try (Connection conn = db.getConnection()) {
      final ResultSet result = conn.createStatement().executeQuery("<get access rights>");
      final List<AccessRights> rights = new LinkedList<>();
      while (result.next()) {
        // JDBC column indexes start at 1
        rights.add(AccessRights.valueOf(result.getString(1)));
      }
      ca.replyTo.tell(new CheckAccessResult(ca, rights.toArray(AccessRights.EMPTY)), self);
    } catch (Exception e) {
      ca.replyTo.tell(new CheckAccessResult(ca, AccessRights.EMPTY), self);
    }
  }
}
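The core of Listing 14-11 is a private thread pool with a bounded queue whose rejections are turned into an immediate negative reply. The following JDK-only sketch isolates that mechanism without actors or a database. The class `BoundedBlockingCalls`, its `submit` method, and the `fallback` parameter are hypothetical names chosen for illustration. Unlike the listing, the sketch uses a fixed-size pool (core size equal to maximum size) to keep its behavior easy to reason about.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical helper: runs blocking calls on a private, bounded pool.
final class BoundedBlockingCalls implements AutoCloseable {
    private final ExecutorService pool;

    BoundedBlockingCalls(int poolSize, int queueSize) {
        // Fixed thread count plus a bounded queue: excess work is rejected, not buffered.
        pool = new ThreadPoolExecutor(
                poolSize, poolSize, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(queueSize));
    }

    // Answers with the fallback when the pool is saturated or the call fails.
    <T> CompletableFuture<T> submit(Supplier<T> blockingCall, T fallback) {
        final CompletableFuture<T> reply = new CompletableFuture<>();
        try {
            pool.execute(() -> {
                try {
                    reply.complete(blockingCall.get());
                } catch (RuntimeException e) {
                    reply.complete(fallback);
                }
            });
        } catch (RejectedExecutionException e) {
            // Saturated: reply right away instead of making the caller wait.
            reply.complete(fallback);
        }
        return reply;
    }

    @Override
    public void close() {
        pool.shutdownNow();
    }
}
```

With one thread and a queue of one, a third concurrent call is rejected and answered with the fallback while the first two are still pending, which mirrors how the actor in the listing replies with `AccessRights.EMPTY` under overload.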