转载

了解Vert.x:事件总线

Vert.x基于轻量级actor,名为Verticles。

Verticle是一个独立的工作单元,可以独立扩展。

通常,actor模型要求actor具有称为“传入邮箱”的概念,该概念通常是队列(更具体地说是阻塞缓冲队列)。

因此,如果一个actor想要由另一个actor完成一些工作,它会将一条消息推送到其邮箱中。

这个Event Bus到底是什么?

我们不是让actor彼此意识到对方存在,而是介绍一个知道所有actor的中间人broker,所有actor都可以与broker沟通。此组件称为消息总线,或Vert.x中的事件总线.

这是Observer设计模式的实现,或者更确切地说,是PubSub风格的实现。

了解Vert.x:事件总线 如果您通过事件总线发送消息,并且没有订阅者,则该消息将永久丢失,因为事件总线不会以任何方式保留消息。

出于同样的原因,也无法向新订阅者重复旧消息。

编码器

关于Vert.x中事件总线的一个 常见误解 是,使用它时会产生开销,即使您的Verticle与通常一样驻留在同一个JVM中也是如此。

让我们把这个说法进行测试。

首先,我们将创建一个非常大的对象(大约36MB):

class BigSerializedObject{
    private String message;

    @Override
    public String toString() {
        return message;
    }

    public BigSerializedObject() {
        StringBuilder sb = new StringBuilder(UUID.randomUUID().toString());
        
        for (int i = 0; i < 20; i++) {
            sb.append(sb);
        }
        this.message = sb.toString();
    }
}

一个Verticle将实例化并通过Event Bus发送此对象:

class SenderVerticle extends AbstractVerticle {
    @Override
    public void start() {
        vertx.eventBus().send("address", new BigSerializedObject());
    }
}

而另一个会收到它:

class ReceiverVerticle extends AbstractVerticle {
    @Override
    public void start() {
        vertx.eventBus().consumer("address", (message) -> {
            System.out.println(message.body().toString().length());
        });
    }
}

我们试着运行:

 Vertx vertx = Vertx.vertx();

        vertx.deployVerticle(new ReceiverVerticle());
        vertx.deployVerticle(new SenderVerticle());

出错了:

java.lang.IllegalArgumentException: No message codec for type: class eventbus.BigSerializedObject

Vert.x中事件总线的工作方式是,它可以将消息传递到在不同JVM上运行并以不同语言编写的Verticle,只要它们都是同一Vert.x集群的一部分。

出于这个原因,它需要通过为它们指定编解码器来指定如何通过线路对对象进行编码和解码。

String 和JsonObject 有编解码器开箱即用。

这里我们创建自己的编解码器:

class BigSerializedObjectCodec implements MessageCodec<BigSerializedObject,
        BigSerializedObject> {
    @Override
    public void encodeToWire(Buffer buffer, BigSerializedObject o) {
        System.out.println("encodeToWire");
    }

    @Override
    public BigSerializedObject decodeFromWire(int pos, Buffer buffer) {
        System.out.println("decodeFromWire");
        return new BigSerializedObject();
    }

    @Override
    public BigSerializedObject transform(BigSerializedObject o) {
        System.out.println("transform");
        return o;
    }

    @Override
    public String name() {
        return "BrokenSerializedObjectCodec";
    }

    @Override
    public byte systemCodecID() {
        return -1;
    }
}

注册:

vertx.eventBus().registerDefaultCodec(BigSerializedObject.class,
                new BigSerializedObjectCodec());

现在我们可以看到结果:

To create message: 45ms
transform
To get message: 6ms

如您所见,在同一JVM内部进行通信时,对象将作为Verticle之间的内存引用传递。因此,对于这种情况,实际上没有开销。

结论

以下是几个要点:

  • Vert.x使用轻量级actor模型
  • Vert.x中的Actor称为Verticles
  • Verticle使用Event Bus进行通信
  • 事件总线中的消息不会保留
  • 使用事件总线传递具有相同JVM的消息没有任何开销
原文  https://www.jdon.com/50912
正文到此结束
Loading...