# 1.7.EventBus初探

    Event Bus的直译称为“事件总线”，提到它就会涉及到事件驱动模型，实际上Event Bus是Vert.x的核心[^1](https://vertx.io/docs/vertx-core/java/#event_bus)：

> The event bus is the nervous system of Vert.x.

    通常我们提到Vert.x时都会说它是事件驱动、无阻塞、纯异步化的，那么这里的事件驱动的中枢神经就是本文中的Event Bus，关于Event Bus的内容，读者可以先看看Reactor模式——不了解Reactor模式在未来很长一段时间设计Vert.x的系统都会遇到思路上的障碍，所以在理解Event Bus之前需要先理解Reactor模式。

* 在传统服务器中（如Tomcat/Jetty），服务器接收到一个客户端请求，这个请求将直接进入某个Servlet组件中，该线程不仅会接收此请求，同样会去处理这个请求，一旦此处发生阻塞操作（访问数据库、网络、文件系统），整个线程就必须等待执行结果，最终生成响应发送回客户端。
* 在Vert.x中（Netty服务器），服务器接收到一个客户端请求后，这个请求会直接由Standard类型的Verticle组件来接收，当然对于简单的请求，这个Verticle组件直接执行完成就可以生成响应了，这种模式等价于上述传统服务器执行请求的模式。但若遇到了复杂的请求如阻塞操作，该Verticle组件可以将请求封装成事件发送到Event Bus，由它另一头的Worker类型的Verticle组件消费该事件、执行该事件、生成响应发送回客户端。

***

\#「Reactor模式」

    Reactor模式[^2](https://github.com/silentbalanceyh/vertx-book/blob/master/chapter01/《Reactor（反应器）模式初探》，%3Chttps:/blog.csdn.net/pistolove/article/details/53152708%3E，作者：皮斯特劳沃/README.md)又称为Reactor设计模式，这个模式是从NIO中出来的，有时候直接翻译过来称为“反应器模式”，这种设计模式是为高并发处理量身打造的设计模式，而且是一种基于事件驱动模型的设计模式。本章我们就来说说它的前世今生。

### 传统模式

> \*\*「注」\*\*本章大部分内容源自于引用的文章，读者也可以直接点击进入去看一些代码层面的东西，本书只介绍这种模式的基础细节。

    读者先看看下边的图示：

![](/files/WnvF4HZvKPxOueVmEgvM)

    上图中的这种模式是传统的服务设计，每一个请求到来时，服务器会分配一个线程去处理，如果请求暴涨起来，那么意味着需要更多的线程来处理该请求。按照引用文章中的说明，一个请求处理的关注点主要包含以下：

* 请求读取——Read Request；
* 请求解码——Decode Request；
* 服务执行——Process Service；
* 编码响应——Encode / Reply；
* 发送答复——Send Reply；

    在真实的环境中，上边五个步骤每个步骤的执行过程效率不一样（开销不对等），而在传统服务设计中，这五个步骤是由同一个线程来执行（上图中的Handler），那么当并发数量比较高的时候，系统创建的线程数会线性递增。关键的问题是：资源浪费，这一点从何说起？比如一个线程在“服务执行”这个步骤，由于这个线程和“请求读取”是同一个线程，服务执行如果需要耗费很长时间，那么此时的线程是只能“等待”，什么事都不能做，这无疑是对系统资源的一种占用。

    在这种传统请求模型中，五个步骤的线程处理是一比一的，它的核心性能瓶颈在于线程池，线程池本身创建和销毁线程时候就存在一定的开销，再者线程池这种结构本来对于高并发的增长就不是一个良好的解决方案，若请求出现海量增长，线程池的工作线程数量一旦满载，那么剩余的请求全部会出现“等待”或者索性被“抛弃”。而根据上边的请求处理步骤，实际上这个请求处理完整过程是可以被拆分的，可以切割成一些小任务——若每个小任务都可以使用非阻塞的模式，然后基于异步回调模式，那么这种情况下，`1:1`的局面就被打破了，系统吞吐量会大大提高。

### Reactor模式

    有没有一种方式，可以让系统线程专注于某一块的任务，而实现软件设计中的"单一职责“的原则？最初的Reactor模式又称为单线程的Reactor，在这种模式下，服务端引入了一个新角色：Acceptor，它会不断去监听是否有客户端请求进来，一旦收到新的请求，它就会将这个请求发送给分发器，让它将请求分发修去执行。

![](/files/M9TiXcA97QOa0DkxfvqO)

    这种模式下是最早的Reactor，中间还没有引入事件总线的概念，只是加入了一些机制上的改动：

* 将传统的请求处理步骤细化，不同的步骤由不同的线程来处理；
* 添加了两个新角色：Acceptor和Reactor；
* Acceptor负责接收请求以及等待新的请求，一旦有请求过来，则直接将请求发送给Reactor；
* Reactor内部有一个新操作Dispatch，它专程负责请求的分发处理；

    从上边的图上可以知道，Dispatch和Vert.x中的Event Loop线程做的事情挺像的，实际上这个时候已经完成了事件循环的监听动作，并且完成了请求分发，分发过程会将请求丢给不同的线程去处理。在这种模式下，有一个小问题在于每个任务线程并没有区分阻塞/非阻塞的任务，虽然进行了基于职责的线程拆分，当客户端请求挂起在某个线程中时，还是需要客户端等待最终的执行结果，这种模式的异步化还没有实现事件驱动模型的全貌。

### 工作线程池

    随着Reactor模式的演进，任务线程池又进行了细分设计，为了复用工作线程（在上图中处理decode、compute、encode的专用线程），引入了工作线程池的概念，类似Vert.x中的Worker Pool。

![](/files/CtUto5MfGAgsAhqYeFq5)

    这种模型就是现在成熟的Reactor模式，实际上对于上图中，按照Vert.x的内容来解读的话，Reactor部分接收请求，就是Standard类型的Verticle组件干的事，而其他任务就是Event Bus事件总线之后Worker类型的Verticle组件干的事。工作线程池的引入彻底分离了阻塞/非阻塞的任务，由于工作线程一直是异步执行，所以真正在运行过程中，这种系统的吞吐量是完全可胜任高并发的，而这种模型就是我们通常讲的事件驱动模型。

    虽然图中看不到Event Bus的影子，但是读者可以开出自己的脑洞，用Verticle组件化的思维有下边的这种结构：

![](/files/S4lAHW24vYT6ykNQZA14)

    假设`read`工作和`send`工作由Standard类型的Verticle来完成，其他三种`encode, compute, decode`的工作由Worker类型的Verticle组件来完成，而这些线程各自独立，它们通信的基础就是Event Bus，而不是调用；也就是说当一个`read`操作完成后，它会将数据以消息【也就是事件驱动模型中的Event事件】的方式直接发送到Event Bus中，然后Worker类型的Verticle组件会直接消费该事件或者消息，然后执行，这种通信不限于Worker和Standard的通信，包括Worker和Worker，Standard和Standard都可以统一使用这种模式通信。

    最后，谈谈Reactor模式带来的优缺点，让读者对这种设计模式有更加清晰的认识：

### 优点

* 响应速度快，由于分离了`read`和后续操作，所以客户端不需要为单个同步操作被阻塞——但Reactor本身依旧是同步的；
* 编程更简单，开发人员不用去考虑复杂的多线程、数据同步问题，并且也避免掉线程切换的开销；
* 扩展性强，上图中只是提到了`read, decode, encode, compute, reply`五种最基本的操作，在真实场景中，这些事件类型可让开发人员自己扩展，比如验证、过滤、转换等，而在扩展过程只需要遵循Event Bus中的消息规范，切换掉消息地址，就可以直接将新组建编连到系统中了；
* 由于该模式本身与事件处理的逻辑无关，所以之中的每一个节点都是可复用的，只要你的粒度设计得适当。

### 缺点

* 由于粒度更加细致，将传统一个线程的事情交给了多个线程来完成，复杂度有了提高；
* 由于启用了纯异步的编程模型，增加了调试的复杂度，单纯依靠断点的方式调试可能不一定凑效（对于一些异步调试，作者推荐使用数据流监控分析的调试方法来实现）。
* 由于Reactor的设计模型还依赖于底层的同步事件多路分离器（Synchronous Event Demultiplexer）来实现，这一部分是操作系统、容器、或框架应该自带的，Vert.x就是Netty本身作为纯异步Java服务器带了该底层组件，若您的条件不满足自己去实现，将会遇到很大的难题。

    归根到底，这种模式在解决高并发请求服务的过程中具有很明显的性能优势，这也是为什么我推荐使用Vert.x，而不是Spring的主要原因，这是一种理论和编程思维的革新，而不单单是旧瓶装新酒的操作。换句话来说，实际上Spring中也可以自己引入第三方的事件驱动模型来实现这种非阻塞、异步化的请求流程操作，有兴趣的读者可以自己去钻研。

***

## 1. 关于阻塞

    上述流程中，第一种模式下的阻塞会成为一个常态，而第二种请求中也会有一部分任务是阻塞式的，好了，问题就来了：读者在前边的章节已经看到，既然Vert.x的官方提到的黄金法则，是不要阻塞Event Loop，那么遇到阻塞任务时，怎么解决这种需求？您若想要写一个程序不访问网络、数据库、文件系统几乎是不可能的事，一旦出现这种IO装置的操作，就意味着阻塞有可能开始暴走了——比如您读取了一个4G的大文件。

    Vert.x中提供了两种解决阻塞任务的方案，我称为”快速方案“和”标准方案“。

### 1.1. 快速方案

    快速方案的核心就是直接调用`executeBlocking`方法[^3](https://vertx.io/docs/vertx-core/java/#blocking_code)，前文中我们已经分析过这个方法的源代码了，再次提到这个方法是因为它提供了处理阻塞任务的绿色通道，这种方式很快速，只需要写上下边几行代码就足够了。

```java
vertx.executeBlocking(future -> {
    // Call some blocking API that takes a significant amount of time to return
    String result = someAPI.blockingMethod("hello");
    future.complete(result);
}, res -> {
    System.out.println("The result is: " + res.result());
});
```

    关于这个方法的阐述，读者可以直接参考官方文档，这里主要谈谈这部分内容的经验分享。zero的前身是我的硕士论文的主体部分，最初的系统名称叫做`vie`，它是一个纯数据驱动的RESTful API引擎，它的基本代码如下（有`vertx-web`部分剧透）：

```java
    @Override
    public void start() {
        Fn.safeVie(LOGGER, () -> {
            /** 0.Http Server **/
            final ConcurrentMap<Integer, HttpServerOptions> configMap = INTAKER.ingest();
            final HttpServerOptions options = configMap.get(Integer.parseInt(VertxGrid.getPortEndpoint()));
            final HttpServer server = this.vertx.createHttpServer(options);
            /** 1.读取Router引用，处理Block Thread的问题 **/
            this.vertx.<Router>executeBlocking(future -> {
                /** 2.初始化Router，使用Kinetic初始化维持唯一Router **/
                final Router router = Router.router(this.vertx);
                /** 3.初始化标准Router，注入方式操作 **/
                final RouterHuber routerHuber =
                        Fn.pool(ROUTERS, RouterHuber.class,
                                () -> Instance.instance(RouterHubor.class));
                routerHuber.immitRouter(router);
                /** 4.添加引用池 **/
                // this.kinetic.addReference(Thread.currentThread().getName(), router);
                /** 5.完成初始化 **/
                future.complete(router);
            }, result -> {
                if (result.succeeded()) {
                    final Router router = result.result();
                    /** 6.Api Default 配置 **/
                    RouterKit.injectRouter(router);
                    /** 7.Server监听 **/
                    server.requestHandler(router::accept).listen();
                    // 必须使用该变量控制日志仅写一次，而且RMI的内容也只写一次
                    if (Constants.ONE == FLAG.getAndIncrement()) {
                        /** 9.成功写入过后输出最终日志 **/
                        final String pubApi = INCEPTOR.getString(Point.Web.Api.PUBLIC);
                        LOGGER.info(Info.VX_API, getClass().getSimpleName(), options.getHost(),
                                String.valueOf(options.getPort()), pubApi);
                        LOGGER.info(Info.VX_SERVER, getClass().getSimpleName());
                    }
                }
            });
        });
    }
```

    上述代码是路由管理器RouterAgent中的核心代码，其中`executeBlocking`方法体的内容如下：

```java
/** 2.初始化Router，使用Kinetic初始化维持唯一Router **/
final Router router = Router.router(this.vertx);
/** 3.初始化标准Router，注入方式操作 **/
final RouterHuber routerHuber =
    Fn.pool(ROUTERS, RouterHuber.class, () -> Instance.instance(RouterHubor.class));
routerHuber.immitRouter(router);
```

    这里的`immitRouter`方法会访问ZooKeeper配置中心去初始化该系统中的`Router`实例，并且完成RESTful API的注入工作，这个过程中ZooKeeper配置中心的访问工作是”阻塞“方式。最开始这段代码没有写在`executeBlocking`的代码块中，所以在很长一段时间，启动该Verticle组件时遇到了`Block`的异常，后来修改成上述代码后，整个过程就正常了。——这也是我第一次赤裸裸地挑战了Vert.x框架中的黄金法则：还是那句话，不要阻塞Event Loop。

### 1.2. 标准方案

    有了上述的快速方案，为什么还需要标准方案呢？原因在下边这句话：

> Worker verticles are designed for calling blocking code, as they won’t block any event loops.

    是的，标准方案就是启用Event Bus，然后开启Vert.x中的Worker类型的Verticle组件，这样的方式就实现了完整的事件驱动模型的系统设计原型——后来几乎所有RESTful类型的API都是使用这种方式来设计的，如果读者习惯后会觉得这种方式的可扩展性非常好，那么这里用另外一个例子来看看这种方案的具体代码应该如何写。

#### Acceptor——AcceptorVerticle

```java
package io.vertx.up._01.verticles;

import io.vertx.core.AbstractVerticle;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.http.HttpServer;
import io.vertx.core.json.JsonObject;

public class AcceptorVerticle extends AbstractVerticle {

    @Override
    public void start() {

        final HttpServer server = this.vertx.createHttpServer();
        System.out.println(Thread.currentThread().getName() + ", Start Acceptor...");
        server.requestHandler(request -> {
            // 调用Event Bus
            final EventBus event = this.vertx.eventBus();
            System.out.println(Thread.currentThread().getName() + ", Accept Request...");
            // 发送消息
            event.<JsonObject>send("MSG://EVENT/BUS",
                    new JsonObject().put("message", "Event Communication"),
                    reply -> {
                        if (reply.succeeded()) {
                            // 发送回客户端
                            System.out.println(Thread.currentThread().getName() + ", Reply Message...");
                            System.out.println(" Message: " + reply.result().body());
                            request.response().end(reply.result().body().encode());
                        }
                    });
        });

        server.listen(8099);
    }
}
```

#### Worker——WorkerVerticle

```java
package io.vertx.up._01.verticles;

import io.vertx.core.AbstractVerticle;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.json.JsonObject;

public class WorkerVerticle extends AbstractVerticle {

    @Override
    public void start() {
        System.out.println(Thread.currentThread().getName() + ", Start Worker...");
        final EventBus event = this.vertx.eventBus();
        // 接收消息
        event.<JsonObject>consumer("MSG://EVENT/BUS", reply -> {
            System.out.println(Thread.currentThread().getName() + ", Consume Message...");
            // 提取接收消息
            final JsonObject message = reply.body();
            System.out.println(" Message: " + message.encode());
            // 回复消息
            reply.reply(new JsonObject().put("worker", "Worker Message"));
        });
    }
}
```

#### Launcher

```java
package io.vertx.up._01.event;

import io.vertx.core.DeploymentOptions;
import io.vertx.up._01.lanucher.ClusterLauncher;
import io.vertx.up._01.lanucher.Launcher;
import io.vertx.up._01.lanucher.SingleLauncher;

public class EventLauncher {
    public static void main(final String[] args) {
        // 哪种模式？
        final boolean isClustered = false;
        final Launcher launcher = isClustered ? new ClusterLauncher() :
                new SingleLauncher();
        System.out.println(Thread.currentThread().getName() + ","
                + Thread.currentThread().getId());
        launcher.start(vertx -> {
            // 发布Standard
            vertx.deployVerticle("io.vertx.up._01.verticles.AcceptorVerticle",
                    new DeploymentOptions().setInstances(4));
            // 发布Worker
            vertx.deployVerticle("io.vertx.up._01.verticles.WorkerVerticle",
                    new DeploymentOptions().setWorker(true).setInstances(16));
        });
    }
}
```

    上边的三段代码，基本上把标准方案的数据流程解释清楚了。

## 2. 再谈生命周期

    运行第一个章节中的代码，在Launcher启动过程中您将会看到如下的输出：

```shell
main,1
vert.x-worker-thread-0, Start Worker...
vert.x-worker-thread-1, Start Worker...
vert.x-worker-thread-2, Start Worker...
vert.x-worker-thread-3, Start Worker...
vert.x-worker-thread-4, Start Worker...
vert.x-worker-thread-5, Start Worker...
vert.x-worker-thread-6, Start Worker...
vert.x-worker-thread-7, Start Worker...
vert.x-worker-thread-8, Start Worker...
vert.x-worker-thread-9, Start Worker...
vert.x-worker-thread-10, Start Worker...
vert.x-worker-thread-11, Start Worker...
vert.x-worker-thread-12, Start Worker...
vert.x-worker-thread-13, Start Worker...
vert.x-worker-thread-14, Start Worker...
vert.x-worker-thread-15, Start Worker...
vert.x-eventloop-thread-3, Start Acceptor...
vert.x-eventloop-thread-2, Start Acceptor...
vert.x-eventloop-thread-0, Start Acceptor...
vert.x-eventloop-thread-1, Start Acceptor...
```

    在解析这个过程之前，我们先等等，您可以在浏览器中打开链接地址：<http://localhost:8099/>，您将会看到下边的输出：

```shell
vert.x-eventloop-thread-2, Accept Request...
vert.x-worker-thread-16, Consume Message...
 Message: {"message":"Event Communication"}
vert.x-eventloop-thread-2, Reply Message...
 Message: {"worker":"Worker Message"}

vert.x-eventloop-thread-2, Accept Request...
vert.x-worker-thread-17, Consume Message...
 Message: {"message":"Event Communication"}
vert.x-eventloop-thread-2, Reply Message...
 Message: {"worker":"Worker Message"}
```

> 不要奇怪，您的确看到了两次请求的输出，这不是Vert.x的问题，而是Chrome浏览器导致的结果，由于我们在这个地方只监听了端口，没有设置路径，所以Chrome在请求过程中它会发送两次请求，分别是： <http://localhost:8099/> <http://localhost:8099/favicon.ico> 所以您看到上边的输出一点都不奇怪，如果在后续的`vertx-web`中加入了路由匹配，那么就只有一次请求的输出了。

    我不止一次在以前的文章中提到过Verticle在`start`方法中有两个细粒度的生命周期：**启动周期**和**请求周期**——很多初学者由于对函数式编程不是很熟悉，不太理解lambda部分的代码究竟在什么时候执行，我想这个例子很容易说明一切。第一段输出就属于启动周期的代码部分，而第二段输出就属于请求周期部分，当然后边我们还会谈到这一点，只是刚好有这样一个例子来说明，那么也希望读者尽可能去理解这两个生命周期。

## 3. 开发关注点

    同样的，在上边的例子中，您可以看到Event Bus如何在Acceptor和Worker之间通信。这里简单谈几点：

* 发送方调用的方法为`send`，消费方调用的方法为`consumer`，由于请求是从Acceptor到Worker，所以这时候Acceptor是发送方，Worker是消费方。
* Worker在执行完成过后，一定要调用`reply`方法，否则这个过程会导致`Timeout`的异常（因为发送者不会接收到对应的响应信息）。
* 二者通信的地址需要保持一致：上边例子中都是`MSG://EVENT/BUS`，这个概念对应到官方文档理论部分中的`Addressing`的概念，而在Vert.x中这个地址就是一个单纯的字符串——这也造成了在使用Event Bus过程中的架构设计的难度。
* 其次有一点就是关于数据类型，也就是官方提到的`Message Types`的内容，Vert.x中原生支持`JsonObject`，`Buffer`，以及`String`这种基础类型，它并不支持复杂的数据结构，如果您有自己的数据类型要在`Event Bus`中传输，怎么办？这种情况就需要您开发`Codec`消息编解码器来实现。

    这里看看上边代码的执行流程图：

![](/files/sALJ0oSCW5rpdlKae9e9)

    结合上图，是不是就更容易理解前文中的代码交互信息了？在上图中，从输出可以知道：Acceptor使用的线程名为：`vert.x-eventloop-thread-2`，而Worker在两次请求中使用了不同的线程：`vert.x-worker-thread-11`和`vert.x-worker-thread-15`，也就是说，在Vert.x中，它的Event Loop判断同一请求是基于会话的，这时候由于会话没有改变，所以这里的Standard的Verticle组件接收请求的是同一个线程，Worker则不然，两次使用的Worker线程不是同一个，这一点也印证了`Reactor`模式中涉及到的非`1:1`的线程对等问题（这个例子中发布了4个Acceptor，却发布了16个工作线程Worker）。

    在zero中可能有一个读者容易混淆的地方，就是关于命名，通常一个Standard的Verticle组件用来接收请求时，我在定义时会给一个更符合它意义的名字，比如Router Agent，而后端的Worker类型的Verticle组件通常称为Worker。但是在zero中出现了另外的两个名字：Sender和Consumer，实际上这两个组件就是Agent和Worker中真正执行代码的Handler处理器线程的名称，如果读者熟悉了基于Event Bus的请求架构过后，理解这几个概念就不难了。它们之间可能存在如下关系：

* 一类（请原谅我用一类）Agent组件中可能不止一种Sender，因为上边发送消息给Event Bus的代码完全有可能是几次而不是一次，也可能发送的时候将不同的消息发送给不同的地址来处理；
* 一类Worker组件中也可能不止一种Consumer，道理同上。

## 4.总结

    好吧，天下无不散之筵席，这个时候，EventBus的初探过程算是告一段落，至少读者应该从上边的例子知道在一个纯的Vert.x中（不包含`vertx-web`）应该如何调用Event Bus，后边的章节我们会谈到应用级别，那个时候我们再回过头来讨论Event Bus的使用场景以及真正起到的异步事件的作用。同样这里只是针对Web应用中最常见的**Request-Response**的模式提供了相关代码，还没涉及到**Point-To-Point**和**Publish-Subscribe**这两种模式，毕竟不同的模式它的场景会有所区别。


---

# Agent Instructions: Querying This Documentation

If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter:

```
GET https://lang-yu.gitbook.io/vert.x/01-index/01-7-eventbus-primary.md?ask=<question>
```

The question should be specific, self-contained, and written in natural language.
The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
