2.6.Async之役

    本章将重点介绍Vert.x中的数据结构——Future/Promise(Promise是3.8之后的类),主要是让读者对异步编程有更深入的理解,实现多种不同场景的异步流编连,Vert.x中的异步编程是它的一大亮点,也是编程中最核心的一部分,所以本章标题使用了:役(战役),它的并不是编程本身的复杂度,而是编程思维上的一种革新,其实Vert.x也可定义成全异步框架。

    Vert.x中常用的四种异步编程方式如下:

  • Callback:回调方式

  • Future/Promise:Promise方式

  • ReactiveX:响应式方式

  • Coroutine:协程方式

    四种方式都可以在Vert.x实现异步,它们各自的特征如下:

    Callback回调方式——这种方式是官方文档中常用的方式,通过设置某个对象的回调处理器,去实现异步回调。这种方式的缺点十分明显:如果出现了多层嵌套过后,很容易陷入回调地狱,影响代码的可读性,所以这也是我不推荐的一种方式。不推荐并不代表不能用,在生产环境中,您可以选择让Callback和Promise方式配合,并且遵循一个基本原则:如果回调函数代码逻辑足够简单,那么你可才选择回调方式(简单的定义:单一职责!)

    Promise方式——这种方式参考了前端Promise的玩法,让整个异步代码变得更加平滑,解决了回调地狱的问题,而且这种方式比较中规中矩,对一般开发人员而言,是最容易上手的一种方式。——我的心得是,如果你对后边两种方式不熟悉的时候,可以优先选择Promise方式去开发,浅显易懂。

    ReactiveX方式——这是响应式编程方式,可以说这种编程方式源起于RxJava,在Vert.x中如果使用ReactiveX的方式需要引入另外的子模块:vertx-rx-javavertx-rx-java2,分别对应RxJava 1.x和RxJava 2.x的内容,由于RxJava从1.x升级到2.x做了很大的改动,所以在Vert.x中使用的是两个项目来完成。如果您要使用这种方式编程,推荐使用RxJava 2.x。

    Coroutine方式——协程方式,众所周知Java是不支持协程的,如果在Java语言中使用则需要引入项目vertx-sync来支持这种方式,不仅仅如此,您还需要在您的运行参数中加上:-javaagent:/path/quasar-core.jar让Java语言编写的程序在这种方式下运行;如果您使用的语言是kotlin,则需要引入vertx-lang-kotlin-coroutines项目,但比起Java语言,这种方式在kotlin中更易于编写,所以推荐在kotlin语言平台使用这种方式去开发。

    本章主要覆盖前两种方式,ReactiveX和Coroutine方式的确是两种更优秀的方式,但对开发人员的要求也会更高,最早我提过,Vert.x工具集不是单纯意义上的工具概念,很多时候它是编程思维的一种革新,如果说Promise方式还没有改变您的编程思维(毕竟开发过前端Promise的人都懂),那么后边两者绝对会让你感受到跨越式的思考,系统设计、代码编写、逻辑编排这几点都和传统的编程有些细微的区别,所以我们从难度最低的入手。

1. Callback模式

1.1 回调地狱

    初学者对回调地狱不是很了解,假设有这样一个场景:

我要给一家人打招呼,对每个人说一句 Hi。

    参考下边的代码:

package io.vertx.up._02.async;

import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.up.runtime.Runner;

public class CallbackFirst {

    public static void main(final String[] args) {
        hiAsync("Lang", r1 -> {
            if (r1.succeeded()) {
                System.out.println(Thread.currentThread().getName() +
                        ", " + r1.result());
            }
        });
        hiAsync("Huan", r2 -> {
            if (r2.succeeded()) {
                System.out.println(Thread.currentThread().getName() +
                        ", " + r2.result());
            }
        });
        hiAsync("Han", r3 -> {
            if (r3.succeeded()) {
                System.out.println(Thread.currentThread().getName() +
                        ", " + r3.result());
            }
        });
        System.out.println("Successful !");
    }

    private static void hiAsync(final String name,
                                final Handler<AsyncResult<String>> handler) {
        // 每个人开一个线程执行
        Runner.run(() -> handler.handle(Future.succeededFuture("Hi, " + name)), name);
    }
}

**「注」**代码是多个线程的运行,这里的Runner.run是Zero框架中开新线程执行的工具方法——为什么不使用同步代码?实际上在Vert.x编程中,开发人员需要解决的大部分场景就是多个异步的编排问题(这也是本章的重难点),并不是同步代码的顺序执行问题!

    Runner.run的内部实现代码如:

    public static void run(final Runnable hooker,
                           final String name) {
        final Thread thread = new Thread(hooker);
        // Append Thread id
        thread.setName(name + "-" + thread.getId());
        thread.start();
    }

    多次运行上边的代码,您将会看到类似如下的输出:

Successful !
Lang-12, Hi, Lang
Huan-13, Hi, Huan
Han-14, Hi, Han

    您会始终如一地发现,Successful!这句话一直都是最先打印,另外三句话的顺序会变得随机。好,让我们将需求改改:

我要给一家人打招呼,依次对每个人说一句 Hi。

    需求改了,但是请读者牢记,依旧是一个人一个线程,不能把每个打招呼的动作放到主线程中!先看看改动代码的回调版本:

package io.vertx.up._02.async;

import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.up.runtime.Runner;

public class CallbackHole {

    public static void main(final String[] args) {
        // 第一个打招呼
        hiAsync("Huan", r1 -> {
            if (r1.succeeded()) {
                System.out.println(r1.result());
                // 第二个打招呼
                hiAsync("Han", r2 -> {
                    if (r2.succeeded()) {
                        System.out.println(r2.result());
                        // 第三个打招呼
                        hiAsync("Lang", r3 -> {
                            if (r3.succeeded()) {
                                System.out.println(r3.result());
                                // 打完招呼
                                System.out.println("Successful !");
                            }
                        });
                    }
                });
            }
        });
    }

    private static void hiAsync(final String name,
                                final Handler<AsyncResult<String>> handler) {
        // 每个人开一个线程执行
        Runner.run(() -> handler.handle(Future.succeededFuture("Hi, " + name)), name);
    }
}

    这样当您运行这个程序时,它就会按照我们预期的顺序打印信息,而且每次的输出信息不变:

Hi, Huan
Hi, Han
Hi, Lang
Successful !

    但代码变得不是那么容易阅读,由于每一层回调中都包含了下一层的回调,形成了一个回调的嵌套结构,最后代码的形状会形成一个金字塔形状,这样的代码我们通常就称为回调地狱。示例中只有三个动作,如果您的异步动作越来越多,嵌套也会线性增加,最终将是开发人员的灾难!这种代码可读性差、可维护性更差、如果嵌套太深,逻辑上理解起来也会产生很强的耦合性,导致在代码修改时及容易犯错,相信大部分使用过Vert.x的初学者在写代码过程中都遇到过这种问题(Vert.x中的官方教程几乎全程使用了Callback方式编写,但是它的示例代码只有一层,很少出现嵌套!)。

1.2. 折中解决办法

    在改动上边示例之前,我们先尝试使用传统方法来对代码进行修正(不使用Promise模式),传统模式中解决回调地狱最常见的方法是函数拆分——直接将匿名函数拆分成函数。这种拆分从某种意义上讲是没有意义的,主要原因是只是拆分了代码块,拆分的粒度有点为了拆分而拆分的意思,这种拆分出来的函数同样增加了后期维护的压力。如下边代码:

package io.vertx.up._02.async;

import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.up.runtime.Runner;

public class CallbackHole1 {

    public static void main(final String[] args) {
        // 第一个打招呼
        hiHuan();
    }

    private static void hiHuan() {
        hiAsync("Huan", rs -> {
            if (rs.succeeded()) {
                System.out.println(rs.result());
                hiHan();
            }
        });
    }

    private static void hiHan() {
        hiAsync("Han", rs -> {
            if (rs.succeeded()) {
                System.out.println(rs.result());
                hiLang();
            }
        });
    }

    private static void hiLang() {
        hiAsync("Lang", rs -> {
            if (rs.succeeded()) {
                System.out.println(rs.result());
                System.out.println("Successful !");
            }
        });
    }

    private static void hiAsync(final String name,
                                final Handler<AsyncResult<String>> handler) {
        // 每个人开一个线程执行
        Runner.run(() -> handler.handle(Future.succeededFuture("Hi, " + name)), name);
    }
}

    上述程序的输出符合我们的预期,但是——治标不治本,我们只是将回调地狱拆分开了,思路上依旧是回调地狱,并没有从根本上解决这个问题。而且主程序逻辑上已经无法看到调用链,那么在维护代码时,函数和函数之间的调用也将是个大问题,当你调试程序需要一步一步跟进的时候,这个过程也会让你重新回到地狱去,所以才有了Promise模式。

1.3. 回调的打开方式

    看完了前边两部分内容,可能读者会被回调模式吓到,觉得这种模式不能用,恰好相反——如果你合理使用回调模式,那么在代码中还是可以设计出十分漂亮的结构的。

    我们在编程过程中很多时候不讲究,才导致代码质量下降,任何情况下,如果你对代码质量不做任何控制,很多问题会像滚雪球一样,逐渐累积,等到你的项目病入膏肓的时候,维护和变更将会引起雪崩一样的局面。所以从最初写代码开始,就要引入思考和设计,不要觉得设计是架构师的事,作为一个合格的开发人员,合情合理地调整代码结构让它变得优雅和易于维护,也是一个开发人员应该具有的基本素质,写代码的目的不是为了炫技,更不是为了向别人证明什么,写代码应该是一种创造艺术品的过程,这一点希望读者牢记。

    那么回调模式在哪种场景下是可以用的呢?这里我总结了几点,都是从实际项目中总结出来的经验,它不一定完美,但希望可以拓展读者的视野。

1.3.1. 简单逻辑回调

    本章最早提过,如果使用回调模式尽可能让自己的回调函数简单,这里的简单会让大部分人觉得是不是代码行数少就可以了,实际上不是,而是代码内容所作的事情很单一,遵循软件设计六个原则中的单一职责原则。通俗讲:就是回调函数中的内容只集中做一件事,代码本身内聚性很强,至于代码内部的复杂度可以根据不同场景有所区别,但从调用代码上看,后续所有代码执行都是可封装的。

    Zero中的权限认证部分,使用了很多回调模式的代码(我也承认是在最早对Vert.x认知不够时延续下来的),参考下边代码:

    /*
     * Get token from sessionTokens
     */
    this.sessionTokens.get(token, res -> {
        if (res.succeeded()) {
            /* Token verified from cache */
            LOGGER.info(Info.MAP_HIT, token, res.result());
            /*
             * Also this situation, the prerequisite step could be skipped because it has
             * been calculated before, the token is valid and we could process this token
             * go to 403 workflow directly.
             * 401 Validation Skip
             * 403 Validation ( If So )
             */
            this.securer.authorize(authInfo)
                    /* Mount Handler */
                    .onComplete(this.authorized(token, handler));
        } else {
            LOGGER.debug(Info.MAP_MISSING, token);
            /*
             * Repeat do 401 validation & 403 validation
             * 1) 401 Validation
             * 2) 403 Validation ( If So )
             */
            this.prerequisite(token)
                    /* 401 */
                    .compose(nil -> this.securer.authenticate(authInfo))
                    /* Mount Handler */
                    .onComplete(this.authorized(token, handler));
        }
    });

    上述代码不是最优代码,但是整体结构逻辑很清晰,并且回调函数中只做了一件事:带缓存的401验证,在权限认证过程中,如果已经认证过,在同样请求发过来时,拿着Token可以直接进入403的授权流程,而不用做重复性401验证。其实在使用Vert.x的原生代码时,很多时候使得你不得不用Handler<AsyncResult<T>>来处理回调,这种模式下,您需要思考的是如何对代码本身进行编排,防止自己掉入回调地狱,而这个时候,推荐的一种方式就是维持后续代码的封装性

    很多开发人员都在写代码时用过ifif-else的判断,判断是容易在计算机语言中出现逻辑分歧的点,而Vert.x中最常用的一个代码结构是异步执行结果的基础判断,初学者会经常遇到下边这种代码:

    /**
     * 这里是示例代码,reference表示触发异步操作的对象,call表示异步方法,xxx是传入实参
     **/
    reference.call(xxx, res -> {    
        if(res.succeeded()){
            // 成功回调
        }else{
            // 失败回调
        }
    });

    请读者牢记,在Vert.x编程中,把上述代码的使用作为黄金法则——定义为回调的极限。也就是说两个分支之下尽可能遵循下边几个原则:

  1. 不放任意执行逻辑,只放调用逻辑。比如在分支中,不使用判断、不使用循环、同样不计算,而是直接调用另外的函数或者接口来简化回调(实际上这种思路就是传统方法解决回调地狱的函数拆分)。

  2. 唯一允许的逻辑,就是入参准备逻辑。Java是强类型语言,当您需要调用函数或接口时,这些函数和接口的入参需要在回调之前准备完成(被调用者的Pre-Condition),个人觉得这是在回调分支中唯一允许出现的执行逻辑代码,对于某些复杂场景中的参数准备流程,有可能您还需要借用某个工具类或另外的调用步骤来完成。

  3. 如果执行步骤特别多,使用流水线。使用函数式编程思维,将每个执行步骤尽可能抽象成Monad,然后将所有的Monad放到一起形成函数调用流水线,来完成单逻辑编排。

  4. 错误分支必须有代码。这一点是容易被忽略的地方,实际上在另外一个分支(res.succeeded()判断为false)中,一定要有对应的错误代码,简易的写法可以在伪代码和示例代码中出现,但生产环境的代码,这些不起眼的错误分支有时候往往会酿成灾难。

    有了上述几个原则约束,那么您在真正的项目开发中所写的回调,就变得清晰了,而且避过了回调地狱——为什么说避过是因为一旦使用回调,就意味着很有可能您在写代码的过程中直接造成回调地狱。

1.3.2. 结果通知

    这种方式的回调专程拎出来说,显得有些鸡肋,但是这的确是回调的最佳摆放位置。读者仔细思考Vert.x中很多官方的例子,其实它提供的回调的使用很多时候都是通知结果,这种代码天生拥有单一职责的优势,它的存在有时候只是为了一个信号,这个信号是为了告诉人——我这个动作已经完成。如:

        vertx.deployVerticle(name, option, (result) -> {
            // Success or Failed.
            if (result.succeeded()) {
                // Success Log
                logger.info(Info.VTC_END,name, option.getInstances(), result.result(),flag);

                // Stored in instance manager
                INSTANCES.put(clazz, result.result());
            } else {
                // Failure Log
                logger.warn(Info.VTC_FAIL,name, option.getInstances(), result.result(),
                        null == result.cause() ? null : result.cause().getMessage(), flag);
            }
        });

    上边代码是Verticle的发布代码,发布成功过后,系统只是产生了两条发布日志,当您发现你的Verticle发布失败时,您可以在日志系统中找到发布失败的痕迹。为什么不使用printStackTrace将堆栈日志直接打印出来?——你别忘了最终的系统要上生产环境,生产环境不仅仅对日志的数量会有所考虑,更不要提直接使用printStackTrace这么暴力的方式来打印日志信息,通常我们在生产环境只会选择整个系统中关键性的位置做printStackTrace,而且数量尽可能少,系统的反馈信息是用您设计好的日志系统和容错两个环节来完成,而不是逢错就Trace

1.3.3. 单行函数回调

    单行函数回调是我们在系统开发中常用的一种,通常这种回调可以直接将回调逻辑从:() -> {}转换成() -> xx这种模式,Java 8开始引入了函数式接口,所以很多写法在原始编码风格中会有所改变,而单行函数回调往往会让你的代码本身变得朴素。如果您使用了IDE开发,一旦出现可直接转换的函数处理,IDE也会直接用警告的方式提示你去掉return关键字。

    比如前边示例中出现过的回调:

    Runner.run(() -> handler.handle(Future.succeededFuture("Hi, " + name)), name);

    上边代码是典型的单行函数回调,回调函数直接使用() -> xx的模式完成,而不需要借助花括号,初学者也可以把这种回调理解成:去花括号的回调。由于您的函数部分只执行了单一的函数逻辑而导致整个回调变得非常简单,如果设计得好,几乎是没有任何副作用的。

**「注」**单行函数回调不仅仅局限于异步调用,同步调用中也可以使用,实际上这种属于广义的回调使用场景,不局限于在Vert.x框架的异步调用中,当你觉得某些单一场景需要使用回调时,这种设计有可能不可或缺。

1.3.4. 转换代码

    这种模式下的回调,只可能说是不得不为之,Vert.x中的很多原生代码直接使用Handler<AsyncResult<T>>参数来执行异步返回信息,而为了解决这种场景,我们有时候会在Callback模式和Promise模式之间进行切换,切换时,回调代码成为了必须代码,所以说这是不得不为之。参考下边的转换代码:

    /*
     * 封装(无类型转换)
     */
    private static Future<String> hiString(final String name) {
        final Promise<String> result = Promise.promise();
        hiAsync(name, result);
        return result.future();
    }

    /*
     * 封装(有类型转换),版本1
     */
    private static Future<JsonObject> hiJson(final String name) {
        final Promise<JsonObject> result = Promise.promise();
        hiAsync(name, res -> {
            if (res.succeeded()) {
                result.complete(new JsonObject().put("result", res.result()));
            } else {
                result.fail(new RuntimeException("Exception found"));
            }
        });
        return result.future();
    }

    /*
     * 封装(有类型转换),版本2
     */
    private static Future<JsonObject> hiJson2(final String name) {
        final Promise<String> result = Promise.promise();
        hiAsync(name, result);
        return result.future().compose(done -> {
            final JsonObject response = new JsonObject().put("result", done);
            return Future.succeededFuture(response);
        });
    }
    /*
     * 原始函数
     */
    private static void hiAsync(final String name,
                                final Handler<AsyncResult<String>> handler) {
        // 每个人开一个线程执行
        Runner.run(() -> handler.handle(Future.succeededFuture("Hi, " + name)), name);
    }

    上边代码中,通过封装hiAsync方法直接将Callback模式下的接口转换成了Promise模式(这部分后边还会涉及)提供给上层调用,如果没有任何类型转换还可以无缝对接,若存在类型转换,难免需要有位置去写转换部分的代码,这是无法避开的。如果抽象到极致(这个我做过实验),您确实可以实现没有任何回调的转换流程,直接将Handler<AsyncResult<T>>转换成Future<T>,但那种模式下需要更抽象的思维方式,对很多初学者而言,代码设计和阅读上难免会碰壁,所以真正转换时可以根据实际情况来选择。

1.4. 小结

    本小节主要针对回调模式进行了详细讲解,也提供了某些折中解决方案解决有可能面临的回调地狱问题,以及回调模式的使用场景,不推荐在编程过程中使用回调主要是因为回调地狱,但并不表示禁用,——存在即是合理的,只看你在真实的项目开发过程中如何去摆放这种合理性,提高代码本身的可维护性。

2. Promise模式

    接下来我们步入Promise模式的殿堂,不可否认,这是我使用最多的一种模式,也是我在本书中会重点介绍的一种模式,——它非常Monad。——我使用它的目的很简单,在整个Vert.x的框架中,Promise模式是不需要引入任何子项目的,可以称得上是原生支持,而且用习惯过后,你可以写出可维护性非常高的代码。

    那么什么是Promise呢?

  • 从概念上讲,它是异步编程的一种解决方案。

  • 从语法上讲,它就是一个对象,从它这里可以拿到异步调用的结果。

  • 从语义上讲,它的含义是“承诺”,它会承诺不论哪种情形,它都给你一个结果。

    一个标准的Promise有三种状态:等待(Pending),成功(Fulfilled),失败(Rejected),在编写过程中,它包含了两个动作:resolve/reject,调用过后会发生状态迁移,它的状态迁移表如下:

    Promise的状态是不可逆的,而且在一次调用过程中,resolve/reject两个动作只能二选一,那么Promise为什么能解决异步编程中的问题呢?它解决了什么问题呢?前边一个章节提到了异步调用中的回调地狱问题,这里总结一下,异步编程的回调模式在编程过程会遇到下边几个问题:

  • 第一个典型的问题就是回调地狱,这里不重复。

  • 使用回调模式后,代码的执行逻辑和书写逻辑不一致,对代码的阅读和维护都是一种挑战。

  • 如果开发过程中需要调整异步代码的执行顺序,可能面临大规模的代码重构。

  • 使用了异步回调函数过后,不太容易拿到函数返回值,通常无法直接使用return来提取返回值信息。

    而Promise模式却解决了上述问题,有了这种模式后,回调地狱的问题解决了,不仅仅如此,Promise模式还支持顺序调用并发调用,给多个异步操作的调度提供了天生的土壤,而这两种调用在复杂的实战项目中属于高频场景,本小节也会提到这两种调用的一些编程细节,并提供完善的代码让读者可以真正将Vert.x的异步编程掌握。

2.1. Promise

    Vert.x早期版本中只有一个io.vertx.core.Future类来实现Promise模式(所以在提到Vert.x的异步编程时,我们有时候也将Promise模式称为Future模式,只是这是Vert.x中的术语。),从3.8.x之后引入了一个新的类io.vertx.core.Promise——这个类更像标准的Promise,它不像Future可以一直调用compose方法形成完善的顺序调用结构,这个类只提供了标准的二选一操作:要么成功、要么失败,但它和Future对象是可以相互转换的。由于Future类和Promise类都继承了Handler<AsyncResult<T>>,所以这里两个类都可以直接使用在回调模式中。

    在讲这两个类的用法之前,先看一段代码:

        // 构造一个 Promise
        final Promise<String> promise = Promise.promise();
        
        /*
         * 构造一个 Future
         * @Deprecated
         */
        final Future<String> future1 = Future.future();
        
        /*
         * 构造一个 Future
         */
        final Future<String> future2 = Future.future(handler -> {
            handler.complete("Hello");
        });
        
        /*
         * 转换一个 Future
         */
        final Future<String> future3 = promise.future();

    上边代码是Vert.x中用于创建Promise和Future的常用代码,注意注释里那个@Deprecated标记,从3.8.x之后,Future.future()这种构造方式已经被废弃了,由于引入Promise类,对于单个Future的构造方式可直接使用Promise来转换,或者用带有参数的方式来构造,而不再使用Future.future()。对入参构造不太了解的读者,可以先使用Promise.promise()的方式构造,所有的步骤完成过后直接转换回Future——参考上边代码的第一步和最后一步。

    前边代码演示的是Promise和Future的构造,按照上边的状态迁移图,那么如何去触发resolve/reject两个动作呢?再看一段代码:

    // ------------ Promise 三态 --------------
    final Promise<String> success = Promise.promise();
    final Promise<String> failure = Promise.promise();
    System.out.println(success);
    System.out.println(failure);
    success.complete("Ok!");
    failure.fail("Ko!");
    System.out.println(success);
    System.out.println(failure);

    执行上边代码,您可以看到如下输出信息:

Future{unresolved}
Future{unresolved}
Future{result=Ok!}
Future{cause=Ko!}

    这里从输出反向分析,所有结果都很清楚了:当您使用Promise.promise()后就执行了初始化create动作,这个时候创建出来的Promise的状态是Pending(unresolved),若调用了complete方法则触发resolve动作,将这个Promise转换成了Fulfilled(看输出中的result=Ok!);若调用了fail方法则触发reject动作,将这个Promise转换成了Rejected(看输出中的cause=Ko!)。正如Promise的状态是不可逆的,如果在上边代码之后再继续调用complete/fail就会遇到下边两种异常。

## Fulfilled 状态调用 complete / fail 
Exception in thread "main" java.lang.IllegalStateException: Result is already complete: succeeded
## Rejected 状态调用 complete / fail
Exception in thread "main" java.lang.IllegalStateException: Result is already complete: failed

    如果您用的Vert.x的版本小于3.8.x,您可以用下边代码和上边例子输出一样(3.8之后不推荐):

    // ------------ Future 三态 --------------
    // 3.8.x 之后 future() 方法被废弃了
    final Future<String> f1 = Future.future();
    final Future<String> f2 = Future.future();
    System.out.println(f1);
    System.out.println(f2);
    // 3.8.x 之后 complete() 和 fail() 方法被废弃了
    f1.complete("Ok!");
    f2.fail("Ko!");
    System.out.println(f1);
    System.out.println(f2);

「思」其实Future是Vert.x中最早的Promise模式的类,但是在实际过程中Future需要完成的不是单纯的Promise模式的状态迁移,使用过Vert.x的开发人员逐渐发现Future还需要承担更多调用层的东西;所以我猜想设计Promise的初衷就是把这部分逻辑从Future中切出来——原子化,这样就很清楚Future和Promise各自的职责。在真实项目过程中,单个异步调用的场景特别多,而这种场景下,直接使用Future难免有些杀鸡用牛刀的感觉,因为Future的功能更强大,包括组建不同的顺序调用并发调用的函数链,可以说Future站的视觉更高,所以剥离开Future和Promise后对开发者而言也是福音,虽然从编程上改动的代码不多,但在代码设计的时候更容易下手,使得设计粒度更细。

    Future的语义是将来——它的含义是:这个执行结果会在未来返回,而不是现在,所以当您使用Future时,最需要关注的就是如何读取返回结果,这也是Promise模式开发中的难点。我们在开发过程中,有同事就直接遇到了代码中无法读取返回结果的问题,这也是Future中的常见问题(异步流程理解错误),下一个小结我们针对Future的方方面面来探讨。

2.2. Future

    前文示例代码已经了解了Future在3.8.x之前的创建代码(目前已经弃用),本章节我们从头来看看Future的一些知识点,让读者结合Promise真正熟悉这种模式的用法。不得不说,Future类才是Promise模式的主角,除了具备Promise本身的特征以外,它还可以构造复杂的异步场景。

2.2.1. 成功/失败

    先看下边代码:

package io.vertx.up._02.async;

import io.vertx.core.Future;

public class FutureFirst {
    public static void main(final String[] args) {
        final Future<String> success = Future.succeededFuture("Ok!");
        final Future<String> failure = Future.failedFuture("Ko!");
        System.out.println(success);
        System.out.println(failure);
    }
}

    从执行结果可以看到两个Future不同的状态:

Future{result=Ok!}
Future{cause=Ko!}

    这两个API(succeededFuturefailedFuture)是在编程模式中高频使用的方法,它们可以直接根据Java语言的执行结果创建成功失败的Future对象,实际两端代码执行了前文流程图中提到的resolvereject动作。这里我们打开源代码可以看到两个方法的实现:

    // 成功
    static <T> Future<T> succeededFuture() {
        return factory.succeededFuture();
    }

    static <T> Future<T> succeededFuture(T result) {
        if (result == null) {
            return factory.succeededFuture();
        } else {
            return factory.succeededFuture(result);
        }
    }
    // 失败
    static <T> Future<T> failedFuture(Throwable t) {
        return factory.failedFuture(t);
    }
    static <T> Future<T> failedFuture(String failureMessage) {
        return factory.failureFuture(failureMessage);
    }

    两个方法各自有一个重载方法,成功的时候可以选择内容为null或传入类型T作为Future的内容,而失败的时候,除了例子中用到的String类型作为错误信息以外,还可以直接使用Throwable类型(java.lang.Exception的父类)作为失败过后Future的内容。有内容的成功信息返回和Throwable的失败返回在实际项目中我个人觉得是最常用的,特别是Thowable的容错处理,这里读者有兴趣可以进入到内部代码去看FutureFactory接口下的实现,默认创建Future使用了io.vertx.core.impl.FutureFactoryImpl类。

    本书第一章节介绍了函数式语言中的Monad,我们可以将Monad理解成一个容器,而容器内部则是将要传递给下一个Monad的内容作为它的输入,为什么我会说Promise模式很Monad,因为Future本身的结构就是如此,它在实战开发过程中是可以直接做各种编排的,下边的代码骨架是一个返回为Future<T>的函数最常用的结构:

    public <T,I,O> Future<O> method(I input){
        final boolean success = ...;
        if(success){
            // 成功的时候返回
            final O output = ...;
            return Future.succeededFuture(output);
        }else{
            // 失败的时候返回
            final Throwable xxx = ...;
            return Future.failedFuture(xxx);
        }
    }

    也就是说Future本身可以作为两态返回,成功的时候你可以拿到想要的结果,而失败的时候则可以将某个异常信息封装在Future中返回,这样,一个典型的Monad结构就构造好了,它实现了这样一个事情——不论你的程序运行成功还是因为任何原因失败,都会返回一个合理的Future,即使您的函数中有副作用,这种副作用也会跟着Future的内容直接向上传递(调用者),直到传递给真正要处理这种副作用的调用者,而函数本身就可以完全转换成全函数

    **「思」**这里思考一个小问题:一个函数的返回值究竟应该是什么?您在编程过程中是不是对一个函数中的Pre-Condition的检查厌恶至极?对这个函数究竟应该如何返回也纠结不已?甚至于出现异常信息的时候,也不太清楚这个异常应该以什么形态返回?这里推荐一部分作者的心得:

  1. 在函数式编程中,函数的返回永远只有一种类型,就是一个Monad,而Monda包含了两态:(成功+内容)/(失败+异常),有了这种结构过后,您就不用担心异常如何处理了。

  2. 对于Pre-Condition部分,直接使用防御式 + 默认值的方式,在执行函数代码之前,对于输入不满足函数执行条件,则提供默认的返回值,此时succeededFuture()方法就有了用武之地。

  3. 任何业务代码中禁止直接抛出异常(避免使用throw),而是对异常执行Future<Throwable>封装,实际上是构造了失败状态的Monad,然后递交给函数的下一个流程去处理。

2.2.2. 异步顺序编排

    本章的主题是异步顺序编排,那么Future如何实现异步顺序调用流程呢?先看下边代码:

package io.vertx.up._02.async;

import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.up.fn.Fn;
import io.vertx.up.runtime.Runner;

public class AsyncFirst {
    public static void main(final String[] args) {
        // 基本调用
        hiAsync("Begin").compose(result -> {
            // 最终执行
            System.out.println(Thread.currentThread().getName() + ", End Actual");
            return Future.succeededFuture();
        });
        // 最终执行
        System.out.println(Thread.currentThread().getName() + ", End");
    }

    // 转换成 Future
    private static Future<String> hiAsync(final String name) {
        final Promise<String> promise = Promise.promise();
        hiAsync(name, promise);
        return promise.future();
    }

    private static void hiAsync(final String name,
                                final Handler<AsyncResult<String>> handler) {
        // 每个人开一个线程执行
        Runner.run(() -> {
            System.out.println(Thread.currentThread().getName() + ", " + name);
            // 让子线程休眠
            Fn.safeJvm(() -> Thread.sleep(1000));
            handler.handle(Future.succeededFuture("Hi, " + name));
        }, name);
    }
}

    上述代码是在原始的示例代码中做的调整,最终打印结果如下:

Begin-12, Begin
main, End
Begin-12, End Actual

    从调用代码发现,打印End的代码会在End Actual之前执行,不论你运行多少次,都是这个结果,原因就是Future<String> hiAsync封装了异步线程调用(这里使用了Thread.sleep模拟异步执行),这是很多开发者容易犯错的地方,其实上边代码等价于JavaScript中的:

const promise = ...; // 异步操作Ajax
promise.then(response => {
    console.log("真实结果");
});
console.log("期望结果");

    也就是说,编写代码的顺序和代码执行顺序有些差异,这也是很多开发人员在编写异步程序时最容易出错的地方,我们往往会习惯性地将回调代码编写在打印End的地方,实际上这段代码很大的可能性会在compose内的代码之前执行。——之所以说很大可能是因为这个和Java的线程调度相关,我们不能保证这部分代码一定会在Future<T>的构造之后执行,所以这样的写法是错误的。这里讲主代码改成更加直观的例子:

        final List<String> response = new ArrayList<>();
        hiAsync("Lang").compose(result -> {
            response.add(result);
            return Future.succeededFuture();
        });
        System.out.println(response.size());

    上述代码执行过后的结果可能是0,也可能是1,如果子线程hiAsync方法执行速度够快,快过了主线程的下一行代码的调用,那么结果就是1,而如果hiAsync方法执行速度不够,如示例中强制性Thread.sleep了1秒,那么结果必然是0,大家可以参考下图对比:

这里由于子线程什么都没做,只是Thread.sleep了1秒,所以最终代码流程会走入第二个图,如果你注释掉那行,在自己本机运行时很大可能性会走入第一个图的流程,但往往异步操作意味着有复杂的代码执行逻辑,这样的代码写出来过后增加的是代码结果的不确定性,这种不确定性就是异步编程过程中Bug的根源,所以它不正确。 注释掉Thread.sleep过后,也许读者会发现System.out.println(Thread.currentThread().getName() + ", End Actual");这行代码打印出来的线程名称会有所不同,这个在上图中也标注了线程名,解释了原因。

    所以Future比较正确的打开方式是顺序调用,如果你的代码设计得非常紧凑,那么您就可以写出如下的代码:

        Unity.fetchView(dao, request, config)
                /* View parameters filling */
                .compose(input -> IxActor.view().procAsync(input, config))
                /* Uri filling, replace inited information: uri , method */
                .compose(input -> IxActor.uri().bind(request).procAsync(input, config))
                /* User filling */
                .compose(input -> IxActor.user().bind(request).procAsync(input, config))
                /* Fetch My Columns */
                .compose(stub.on(dao)::fetchMy)
                /* Return Result */
                .compose(IxHttp::success200)));

    不能说上边代码是完美代码,但是它却实现了Future的顺序调用,每一个compose方法中都是一个Monad的执行生成流程,执行完成过后会生成新的Monad传递给下一个节点,而compose方法中一般使用的是单行函数返回或直接使用JDK 8.x中的函数引用,这样的顺序调用在Promise模式中很常见,而且它的执行符合预期:下一个执行步骤一定会在上一个执行完成过后再执行,它们之间使用Monad传参,和示例中的End Actual打印结果一样,它一定会在Begin之后打印,这样就使得我们的异步代码变成了思维上的同步流程——这也是大部分业务场景所需要的。

    在构造顺序调用之前,我们先讲讲compose。compose的语义是“编排”,它的方法签名如下:

// 异步Handler转,桥接模式,最早的直接通过 Handler 转换的模式
// @Deprecated(已经废弃)
<U> Future<U> compose(Handler<T> handler, Future<U> next)
// 输出为单态的专用模式
<U> Future<U> compose(Function<T, Future<U>> mapper)
// 双态输出模式
<U> Future<U> compose(Function<T, Future<U>> successMapper, 
                      Function<Throwable, Future<U>> failureMapper)

    所有的节点的格式都是:T -> Future<U>,T代表泛型输入,U代表泛型输出的内容,实际上返回值是Future<U>,也就是返回了一个Monad结构,而下一个执行流程的入参已经被compose方法执行了解析(从上一个Monad中拿到了内容),也就是说,最终生成的执行流程如下图:

    也就是说,编程思维上思考这个问题,只要你调用了compose方法过后,整个代码执行流程就是顺序执行,不论执行过程是异步还是同步,它都会实现完美的顺序调用,这里再看个简单的例子:

package io.vertx.up._02.async;

import io.vertx.core.Future;

public class AsyncCompose {
    public static void main(final String[] args) {
        // 生成第一个节点
        Future.succeededFuture("Lang")
                // 打招呼
                .compose(AsyncCompose::sayHi)
                // 再打招呼
                .compose(lang -> AsyncCompose.sayHi("Hi", lang))
                // 切换人打招呼
                .compose(nil -> AsyncCompose.sayHi("Han"))
                // 再打招呼
                .compose(han -> AsyncCompose.sayHi("Hello", han));
    }

    private static Future<String> sayHi(final String prefix, final String name) {
        return At.hiAsync(() -> prefix + "," + name);
    }

    private static Future<String> sayHi(final String name) {
        return At.hiAsync(() -> name);
    }
}

这里的主代码通常在Vert.x的Promise模式中用作return 的返回值,所有的接口定义都返回对应的Future<T>的结构,代码中的At.hiAsync就是实例中对异步线程所做的封装,打印了线程名和传入的参数信息。

    运行上述代码,最终会得到如下输出:

hiAsync-12Lang
hiAsync-14,Hi,Lang
hiAsync-15Han
hiAsync-16,Hello,Han
// 第一个hiAsync-XX是线程名,线程名在您的环境中和我的环境有所不同

    通过上边的转换就将多个异步线程编排成了顺序结构,它们会依次执行,而且下一个执行线程会拿到上一个执行线程的运行结果,这和JavaScript中的then的原理是一样的。

    compose编排方法的双参数结构读者可以自行研究,它只是多了一个Rejected状态的Monad的生成流程,采用了双态结构,而上述例子中,所有的子线程都是异步调用流程,而且它们会形成一条顺序链——这是业务代码最常用的结构,至于每个节点的代码中您想要如何编写,就可以根据您真实的业务场景来设计。

    编排是Vert.x中的Promise模式的难点,除了本小节提到的顺序编排,后边还会讲到更多复杂的编排,我尽可能把项目过程中遇到的编排都放在本书中,作为核心参考,也帮助大家彻底理解Vert.x中的Promise模式。

2.2.3. 异步并发编排

    另外一种最常见的场景就是集合类型的转换,在编程过程中,您拿到一个集合,您需要针对集合中的每一个元素执行异步操作,最终合并到一个新的集合结果中——这也是比较常见的一种复杂场景,此时就需要使用类io.vertx.core.CompositeFuture。先看下边的代码:

package io.vertx.up._02.async;

import io.vertx.core.Future;
import io.vertx.up.unity.Ux;

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

public class AsyncCombine {
    public static void main(final String[] args) {
        final List<String> nameSet = new ArrayList<>();
        nameSet.add("Lang");
        nameSet.add("Huan");
        nameSet.add("Han");

        Ux.thenCombineT(
                // 为每个人构造打招呼的行为
                nameSet.stream()
                        .map(At::hiAsync)
                        .collect(Collectors.toList())
        ).compose(response -> {
            // 最终返回的结果
            response.forEach(System.out::println);
            return Future.succeededFuture();
        });
    }
}

    上边的代码中,输入参数为一个List<String>,针对每一个元素,都执行了异步代码hiAsync,并且在最终结果中,按照输入参数的顺序将结果合并打印出来,它的输出如下:

Lang-12, Lang
Han-14, Han
Huan-13, Huan
Hi, Lang
Hi, Huan
Hi, Han

    读者会发现最终的输出结果的顺序和入参顺序一致:Lang, Huan, Han,这就是Ux.thenCombineT函数所作的事,它执行了合并编排,如下图:

    在看thenCombineT的代码细节之前,读者一定要先理解上图演示的转换,代码总共执行了两次核心转换,并且拿到我们期望的执行结果:

  • List<T>转换成List<Future<T>>结构,此时Future的数量为list.size()。

  • List<Future<T>>转换成Future<List<T>>结构,此时Future的数量为1。

    上述场景在很多业务代码开发的时经常出现,这种需求在稍稍复杂一点的场景中几乎是高频场景,如果您没有彻底理解数据结构的变化,那么仔细阅读前边的图,接下来我们一起看看Ux.thenCombineT究竟干了什么?参考下边代码:

    <T> Future<List<T>> thenCombineT(final List<Future<T>> futures) {
        // 将传入的 List<Future<T>> 构造一个不带类型的 List<Future>
        // 如此构造的目的是保证 CompositeFuture.join 可通过编译,它的签名只看 Future,不看 T
        final List<Future> futureList = new ArrayList<>(futures);

        // 直接返回合并过后的结果
        return CompositeFuture.join(futureList).compose(finished -> {

            // 将合并过后的结果抽取出来
            final List<T> result = new ArrayList<>();

            Ut.itList(finished.list(),
                    // 按顺序添加每一个 Future<T> 的结果到最终结果中
                    // Ut.itList 中做了元素 item 的非空检查,这里保证绝对不会是 null,视场景而定
                    (item, index) -> result.add((T) item));
            
            return Future.succeededFuture(result);
        });
    }

    这里用了一个新类io.vertx.core.CompositeFuture,它主要帮助我们实现合并编排,当多个异步操作同时执行时,我们如何处理响应结果,它有三个核心的方法:join,all,any,我们在业务中设计了自定义的容错架构,所以经常使用join,下边是三种方法的不同使用场景:

  1. all:该方法会等待多个Future全部成功,然后响应成功操作,构造最终的List<T>响应结果,如果某个Future失败了(被reject),那么直接结束等待,响应失败,返回最终的Future{cause=..}

  2. any:多个方法中只要有一个Future成功,就直接响应这个Future的成功操作,如果全部都失败的话,则响应失败操作。

  3. join:等待多个Future操作全部完成,然后响应成功操作,如果中途有future失败(被reject),依然会继续等待所有的future执行完成,若出现了future失败的时候,返回最终的Future{cause=...}

    all和join唯一的区别是是否等待剩余的Future执行完成,它们都会返回错误的结果,而all带有中断功能,一旦失败则不再执行,而join更倾向于全结果输出,即使某个Future失败依然会执行,直到所有的Future执行完成为止。Vert.x中调用这三个方法均使用了静态方法,而不是让我们去构造一个实现对象io.vertx.core.CompositeFutureImpl,主要是这三个方法已经可以满足所有的并发编排场景,对开发者而言三个API足够使用了,索性简单化。

2.2.4. 引用和元组

    在函数链操作中,经常遇到的一个问题是参数的传递问题,假设有下边这样一个场景:

    上边的每个节点async-x表示一个compose中编排的异步执行步骤,思考:如果async-3的执行步骤想要使用data1或者data2的数据,那么应该如何处理呢?这就是本章节带领大家去解决的问题。

    第一章函数式编程中我们已经讲过,一个双参函数fun(a,b)可以通过科里化将它转换成fun({a,b})的格式,这样的话,一个多参数函数就转换成一个单参数函数了,看过上一小结的代码后,您应该清楚为什么我们需要一个单参数函数,这也是compose中的每一个执行过程的限制,只能是T -> Future<U>的结构。但由于Java不是函数式语言,它也没有元组Tuple的数据结构,如果要将参数转换成单参数,唯一的办法是定义一个class,但对很多场景而言,这种定义似乎又显得为了函数而函数,所以是否可以找到一种折中的方式呢?

    先看下边代码

        final Refer dataArray = new Refer();
        final Refer roleRef = new Refer();
        return this.runPre(user)
                // 存储 users 到 Refer 中,并返回
                .compose(dataArray::future)
                .compose(nil -> IdcRole.create(this.sigma).fetchAsync())
                // 存储 roles 到 Refer 中,并返回
                .compose(roleRef::future)
                .compose(nil -> {
                     // 读取 users / roles 引用
                     final JsonArray users = dataArray.get();
                     final JsonArray roles = roleRef.get();
                     // 返回……
                })

    上边两段代码创建了两个引用容器Refer,这两个引用容器可以在当前环境中暂存引用信息,并且在后续过程中需要使用时直接提取数据,步骤1和步骤2中的返回结果存储在不同的引用中,在最后的compose方法内可直接消费这两份数据,这个场景就类似于最早的那个问题,async-3消费了data1data2两份数据。——但是这种做法只适合一份或者两份数据的情况,即Refer最好创建不超过三个,主要是代码的维护问题,而且这种代码是破坏就近原则(尽可能在使用变量的附近去定义该变量,而不应该过早声明变量)的。这里的Refer代码如下:

/*
 * 这里只演示了上边用到的两个方法的代码,Zero中实际的Refer
 * 代码更复杂。
 **/
@SuppressWarnings("all")
public final class Refer {

    private Object reference;
    /*
     * For vert.x compose method only.
     */
    public <T> Future<T> future(final T reference) {
        this.reference = reference;
        return Future.succeededFuture(reference);
    }
    @SuppressWarnings("unchecked")
    public <T> T get() {
        return Fn.getNull(() -> (T) this.reference, this.reference);
    }
}

    如果上述的场景在您的业务流程上需要特别多,那么最好的办法是封装您的参数,可选择:

  • 使用class自定义参数类型,该参数可实现不同的引用存储,横跨整个函数链——这种模式相当于使用了统一的数据结构来串联Monad。

  • 使用JsonObject类型,由于它本身具有对象化的特性,所以也可以直接使用JsonObject类型,——这种数据格式的唯一缺陷就是需要您在每个执行函数中手工读取数据,并校验Pre-Condition,在复杂场景中语义不强。

    以下是我们在生产环境中使用class自定义类的代码:

        /* 绑定请求数据 */
        final DataOrder order = DataOrder.start().bind(data);

        /* 构造订单项,直接创建,需要在 data 中填充 items 节点 */
        final JsonArray items = Et.itemCompress(travelers, order);
        /* 计算总价专用 */
        data.put(KeField.ITEMS, items);
        return this.orderStub.createAsync(data, "Registered")
                .compose(order::nextOrder)

                /* 创建订单项 */
                .compose(nil -> this.itemStub.createAsync(items, order))
                .compose(order::nextItems)

                /* 创建排房记录 */
                .compose(nil -> this.planStub.createAsync(rooms, order))
                .compose(order::nextSchedules)

                /* 办理入住 - 宾客记录/入住记录 */
                .compose(nil -> this.checkInStub.createAsync(travelers, order))
                .compose(order::nextTravelers)

                /* 初始化账本 */
                .compose(nil -> this.bookStub.createAsync(rooms, order))
                .compose(order::nextBooks)

                /* 更新房间状态 */
                .compose(nil -> Et.statusDict(order.order().getZSigma()))
                .compose(statusMap -> this.statusStub
                    .saveStatus(Et.etatTaken(statusMap), statusMap, order))
                .compose(nil -> order.out());

    上边代码中构造了一个DataOrder的class,它内部帮忙处理了不同节点的数据收集以及计算,并且根据不同节点的计算,横跨整个流程进行流程编排和数据传输,这种方式是比较重的一种方式,复杂的业务比较合适,如果业务简单,那么这种模式就显得稍稍重了一点。

2.2.5. 容错

    前边讨论的和Future相关的都是代码正常执行流程,本小节最后一起讨论下异常流程,主要针对Future中的两个核心方法otherwiserecover。先看看这两个方法的签名:

// 错误信息发生后的继续操作
Future<T> recover(Function<Throwable, Future<T>> mapper);
// 错误信息发生后的终止操作
Future<T> otherwise(Function<Throwable, T> mapper);
// 分支流程上的继续操作
Future<T> otherwise(T value);
// null 分支返回
Future<T> otherwiseEmpty();

    在异步编程的编排过程,每一个compose方法都有一个执行结果,它的类型是Future<U>,也就是我们通常提到的Monad结构,这种结构会有两种状态,如果被resolve那么就是成功,如果reject那么就失败,而上述方法主要是用于处理另一分支的情况(多半是异常),从注释部分大家可以窥探不同的使用场景,但这里还是要一起过一次。

  • otherwizeEmpty():通常用于分支结构中的null分支(虽然NullPointerException是Java中臭名昭著的问题,但如果你对null结果控制得好的话,它可以用于鉴别“无意义”和“无数据”的语义)。

  • otherwise(T value):这种情况和otherwizeEmpty()是相对的,用于处理默认值分支,如果你希望在条件不满足时返回默认值,可以使用这个方法生成一个带有默认值的分支。

  • otherwise(Function<Throwable, T> mapper)/recover(Function<Throwable, Future<T>> mapper):这两个函数的参数都是Function的函数类型,入参是Throwable,它表示内部执行过程中出现了Throwable异常,并且这个异常触发了fail(Throwable)动作;二者唯一的区别在于返回值:细心的读者会看到,如果使用的是recover,那么它在容错流程中依旧支持异步调用,您可以继续执行另外的异步代码生成一个新的Future<T>,而otherwize的语言更简洁:如果出错,什么都不做,返回一个默认值。

    上述四个方法都处理异常分支,而后两个方法带有诊断的语义,它的参数是一个函数,而入参就是当前compose流程执行过程中生成的Throwable对象,您可以使用日志系统或者在开发环境中直接printStackTrace()记录错误原因;recover的语义更多:它不仅包含诊断,还包含修复。——通常在compose过程中执行的是一段异步代码,如果要在错误出现过后再执行其他代码,使用异步方式是最合理的容错,仅仅返回某个值达不到修复的效果。而且按照Vert.x中编程的黄金法则,一般在异步编程过程中,尽可能减少同步IO的使用,如果您想使用otherwise(Function<Throwable, T> mapper)去实现修复流程——这个是允许的,但是它很有可能因为您对IO操作的不当触发Vert.x中经典的Blocked Issue,如此会引起更多的问题。所以既然Future提供了异步修复的接口,使用recover的语义去做修复工作是最恰当的。

    还有一点是读者必须要清楚的,就是这四个API影响的并不是当前代码的执行流程,而是compose的执行流程,它的控制点在输出,而不是输入。参考下边代码(仔细看结果和注释对比):

package io.vertx.up._02.async;

import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.up.runtime.Runner;

public class AsyncOther {

    public static void main(final String[] args) {
        hiAsyncError("Lang").compose(result -> {
            // 这里不再输出
            System.out.println(result);
            return Future.succeededFuture();
        }).otherwise("Huan").compose(finalResult -> {
            // 受影响的位置
            System.err.println(finalResult);
            return Future.succeededFuture();
        });
    }

    static Future<String> hiAsyncError(final String name) {
        final Promise<String> promise = Promise.promise();
        Runner.run(() -> {
            // 当前 执行 不受影响
            System.out.println(Thread.currentThread().getName() + "," + name);
            promise.fail(name);
        }, "hiAsync");
        return promise.future();
    }
}

    上边代码执行后会产生下边输出:

hiAsync-12,Lang
Huan # 如果使用的IDE则颜色会有区别

    也就是说上边代码的当前方法并不会受到fail的影响,依旧会执行,但是原始的compose方法中的代码没有执行,而是走入了otherwize之后的代码流程中,所以打印了Huan。这里读者需要注意,compose究竟属于谁?这是在阅读这种类型的代码的难点,由于它不像if-else那么直观,所以一旦看到上述四个方法的时候,脑子里需要有一个意识,就是这里有一个代码分支,而之后的compose都是分支后的代码,也就是例外情况。如果您使用的是带有Throwable的方法时,通常会遇到这样一个异常:

io.vertx.core.impl.NoStackTraceThrowable

    这个异常表示您在手动触发fail动作时,传入的参数并不是一个合法的Throwable,如果您想要它有内容,则需要在fail时传入一个合法的异常信息,才可以继续诊断——一个上述异常是在处理容错时常见的异常,很多时候并没有触发fail(Throwable)。将代码做适当调整:

        // 修改过后的 otherwize
        .otherwise(ex -> {
            ex.printStackTrace();
            return "Huan";
        })

        // 修改过后的 fail
        promise.fail(new RuntimeException("Exception, " + name));

    这样您又可以像以前一样调试和诊断了,同样出错的时候也可以看到完美的Java堆栈信息如:

java.lang.RuntimeException: Exception, Lang
	at io.vertx.up._02.async.AsyncOther.lambda$hiAsyncError$3(AsyncOther.java:29)
	at java.lang.Thread.run(Thread.java:748)

    到这里,Future的基本编排、复杂编排、容错就给读者介绍完了,接下来我们再看一个比较大的知识点:Callback模式和Promise模式的相互转换,虽然最早的示例中演示了它们之间的转换流程,但那些代码都不是生产代码;下一节我们结合Vert.x中固有的Callback相关API,再来讨论下两种模式的协同,加深读者对这块知识的理解,并且可将这两种模式应用在生产环境中。

3. 协同

3.1. Future回调

    前边两个小节我们学习了Vert.x中的Callback模式和Promise模式的基本用法,那么这个章节我们把二者串联起来,我们在学习Future时,其实有几个方法遗漏了,那几个方法是本章的重点。在3.8.x之前,回调模式下的Future只有一个方法:

  @Deprecated
  @Fluent
  default Future<T> setHandler(Handler<AsyncResult<T>> handler) {
    return onComplete(handler);
  }

    上述代码遇到了我们的老朋友Handler<AsyncResult<T>>,它表示异步结果处理器,而在3.8.x之后,Future新开了三个接口对回调结果进行了职责分离。

 // 包含了 success / failure 两种情况
  @Fluent
  Future<T> onComplete(Handler<AsyncResult<T>> handler);

  // 只包含了 success
  @Fluent
  Future<T> onSuccess(Handler<T> handler)

  // 只包含了 failure
  @Fluent
  Future<T> onFailure(Handler<Throwable> handler)

    不论使用哪个,核心的执行代码都是一致的:根据Monad的双态执行代码执行,如果熟悉之前的setHandler的伙伴可以直接用onComplete代替,先看一段简单的代码:

package io.vertx.up._02.async;

import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.up.runtime.Runner;

public class AsyncHandle {
    public static void main(final String[] args) {
        final Future<String> response = hiChoice("Huan");
        response.onFailure(error -> {
            System.out.println("Error");
            error.printStackTrace();
        });
        response.onComplete(item -> {
            if (item.succeeded()) {
                System.out.println("Complete");
                System.out.println(item.result());
            } else {
                item.cause().printStackTrace();
            }
        });
        response.onSuccess(handler -> {
            System.out.println("Success");
            System.out.println(handler);
        });
    }


    static Future<String> hiChoice(final String name) {
        final Promise<String> promise = Promise.promise();
        Runner.run(() -> {
            // 当前 执行 不受影响
            System.out.println(Thread.currentThread().getName() + "," + name);
            if ("Lang".equals(name)) {
                promise.complete(name);
            } else {
                promise.fail(new RuntimeException("Exception, " + name));
            }
        }, "hiAsync");
        return promise.future();
    }
}

    这段代码非常有意思,如果你传入的是Lang,那么onSuccess和onComplete都会执行,而且会按照您编写的代码顺序执行(先onComplete,再onSuccess);如果传入的是Huan,那么onFailure和onComplete同样会执行,执行顺序也是按照您编写的代码顺序执行(先onFailure,再onComplete)。所以我们在选择代码套装的时候可以考虑二选一,要么选择onComplete单方法,要么选择onSuccess/onFailure的成对方法来处理回调结果,真正在实战过程中,如果执行两次代码会引起很多问题,而且这里的方法在执行的时候并不会抛出我们期望的Result is already complete: succeeded异常,这是读者最需要小心的地方。

3.2. 同异步转换

    有了本文的Future基础和Callback基础后,那么同步转异步和异步转同步就变得相对容易多了。

3.2.1. Callback转Future

    这种转换在最早的示例就提到过,主要是借用Promise的complete来完成,这里不重复。

3.2.2. Future转callback

    如果要将一个Future转换成Callback模式,可以参考下边的代码(如果您仔细阅读过本章内容,这个不难):

package io.vertx.up._02.async;

import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;

public class AsyncCall {

    public static void main(final String[] args) {
        hiCallback(handler -> {
            if (handler.succeeded()) {
                System.out.println(handler.result());
            } else {
                handler.cause().printStackTrace();
            }
        });
    }

    private static void hiCallback(final Handler<AsyncResult<String>> handler) {
        final Future<String> response = At.hiAsync("Huan");
        response.onComplete(handler);
    }
}

3.2.3. 异步转同步之殇

这种做法绝对是Vert.x中的反人类的用法,你会明白丧钟就是为这种操作而鸣的!

  Vert.x是纯异步框架,在这种框架中,如果我们要做异步转同步的操作,其实应该考虑的是设计问题,而不是编程问题,因为这样的做法有可能会破坏它定下的黄金法则,而且我们需要使用Java的多线程编程。先看代码(很恶心,需要心灵强大):

package io.vertx.up._02.async;

import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.up.atom.Refer;
import io.vertx.up.fn.Fn;
import io.vertx.up.fn.JvmActuator;
import io.vertx.up.runtime.Runner;

import java.util.concurrent.CountDownLatch;

public class AsyncSync {

    public static void main(final String[] args) {
        final String callback = hiCallback();
        System.out.println(callback);
    }

    private static String hiCallback() {
        final Future<String> response = hiChoice("Lang");
        final CountDownLatch latch = new CountDownLatch(1);
        final Refer refer = new Refer();
        response.onComplete(res -> {
            if (res.succeeded()) {
                //
                // Java中的限制 line = res.result();
                refer.add(res.result());
            } else {
                //
                res.cause().printStackTrace();
            }
            latch.countDown();
        });
        // 手动阻塞,破坏黄金法则
        Fn.safeJvm((JvmActuator) latch::await);
        return refer.get();
    }

    static Future<String> hiChoice(final String name) {
        final Promise<String> promise = Promise.promise();
        Runner.run(() -> {
            // 当前 执行 不受影响
            System.out.println(Thread.currentThread().getName() + "," + name);
            Fn.safeJvm(() -> Thread.sleep(1000));
            if ("Lang".equals(name)) {
                promise.complete(name);
            } else {
                promise.fail(new RuntimeException("Exception, " + name));
            }
        }, "hiAsync");
        return promise.future();
    }
}

    上边代码执行后,您的确可以在返回值callback中拿到Lang这个值,但这段代码有很多缺陷,最大的缺陷就是通过多线程编程的方式破坏了Vert.x中定义的黄金法则,我们手工阻塞了主线程,并且让主线程等待结果直到有结果过后返回数据信息,这不是好做法。当然有可能是我没有拿捏到Vert.x中异步转同步的精髓,在我们的项目中,很多场景下,我们会采取全异步流设计,而不牵涉异步转同步的场景,反过来转换的场景倒是挺多,而且同步转异步仅仅需要一句Future.succeededFuture就可以完成,所以我并不推荐这样的做法。代码虽然实现了我们想要的结果,但劣质成为了它不可磨灭的污点

4. 总结

    本章主要学习了Vert.x中的其中两种异步编程模式:Callback和Future模式,并且对两种模式的不同应用场景进行了代码解析,希望读者对Vert.x的异步编程有一个初步了解,代码中部分内容是模拟操作,只是为了给读者一个结构,让它可以在生产环境中举一反三,用同样的结构来设计代码,劣质部分的演示代码希望读者心里有自己的一杆秤。

最后更新于