转载

使用Kafka和ZeroMQ实现分布式QuasarActor

Quasar 是一个类似Scala的Akka的Java Actor模型实现框架,它使用 fiber(纤程)绿色线程实现类似Erlang的Actor模型,本站介绍结合Apache Kafka和ZeroMQ实现Quasar的分布式Actor模型。

当然, Galaxy 也是一种非常好的选择,它是一个快速in-memory数据网格,专门为数据本地化复制,可选的持久和分布式actor注册,甚至能够在节点服务器之间迁移actor。

Kafka

Apache Kafka是目前流行的提供事件日志流的项目,它的API包括两种生产者:同步和异步,消费者只有一种:同步。一个Kafka生产者处理是线程安全的,易于使用。

Comsat项目 包括一个纤程友好fiber-friendly的  Kafka producer 集成, .我们使用它在actor内实现kafka的生产者,我们案例是演示从生产者发送几千个序列化的消息到消费者。

生产者代码如下:

final Properties producerConfig = new Properties();

producerConfig.put("bootstrap.servers", "localhost:9092");

producerConfig.put("client.id", "DemoProducer");

producerConfig.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");

producerConfig.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

try (final FiberKafkaProducer<Integer, byte[]> producer = new FiberKafkaProducer<>(new KafkaProducer<>(producerConfig))) {

final byte[] myBytes = getMyBytes(); // ...

final Future<RecordMetaData> res = producer.send(new ProducerRecord<>("MyTopic", i, myBytes));

res.get(); // 可选,堵塞绿色线程直到记录被持久化,也可以 `producer.flush()`

}

我们使用Comsat的FiberKafkaProducer 包装了KafkaProducer 对象,这是为了获得fiber-blocking的future。

Kafka的消费处理不是线程安全的,且只有线程堵塞的同步方式:

final Properties producerConfig = new Properties();

consumerConfig = new Properties();

consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP);

consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "DemoConsumer");

consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");

consumerConfig.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");

consumerConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");

consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer");

consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");

try (final Consumer<Integer, byte[]> consumer = new KafkaConsumer<>(consumerConfig)) {

consumer.subscribe(Collections.singletonList(TOPIC));

final ConsumerRecords<Integer, byte[]> records = consumer.poll(1000L);

for (final ConsumerRecord<Integer, byte[]> record : records) {

final byte[] v = record.value();

useMyBytes(v); // ...

}

}

因为我们并不想堵塞fiber的底层线程池,这里就不能使用FiberAsync.runBlocking来将数据喂给固定大小的线程池了,而是在我们的actor的doRun中,我们使用一个异步任务,它会堵塞fiber会等到poll(会在指定池中执行)返回:

final ExecutorService e = Executors.newFixedThreadPool(2);

try (final Consumer<Integer, byte[]> consumer = new KafkaConsumer<>(consumerConfig)) {

consumer.subscribe(Collections.singletonList(TOPIC));

final ConsumerRecords<Integer, byte[]> records = call(e, () -> consumer.poll(1000L));

for (final ConsumerRecord<Integer, byte[]> record : records) {

final byte[] v = record.value();

useMyBytes(v); // ...

}

}

这里的call是一个工具方法,如下定义,如果没有 这个Java编译bug 是不必要的。

@Suspendable

public static <V> V call(ExecutorService es, Callable<V> c) throws InterruptedException, SuspendExecution {

try {

return runBlocking(es, (CheckedCallable<V, Exception>) c::call);

} catch (final InterruptedException | SuspendExecution e) {

throw e;

} catch (final Exception e) {

throw new RuntimeException(e);

}

}

完整案例代码见: complete example ,这是一个从生产者发送几千个序列化的消息到消费者。

ØMQ

ØMQ (或 ZeroMQ)是一个非集中的broker解决方案,有各种socket适合不同通讯模式(请求/应答或pub/sub发布/订阅),在我们的案例中我们使用最简单的请求应答模式,也是演示从生产者发送消息到消费者,生产者代码如下:

try (final ZMQ.Context zmq = ZMQ.context(1 /* IO threads */);

final ZMQ.Socket trgt = zmq.socket(ZMQ.REQ)) {

trgt.connect("tcp://localhost:8000");

final byte[] myBytes = getMyBytes(); // ...

trgt.send(baos.toByteArray(), 0 /* flags */)

trgt.recv(); // Reply, e.g. ACK

}

这里context实际作为一个socket工厂,其中有I/O线程数量作为context参数,这是因为ZeroMQ的socket不是基于connection-bound OS处理,而是一个简单前端,能够进行连接重试处理,或多个连接处理,或有效率的并发I/O和提供队列。者就是为什么send方法调用几乎从来不会堵塞,而recv方法不是基于连接的I/O调用,而是一个基于你的线程和指定I/O任务之间的同步调用,所谓指定I/O任务是指从一个或多个连接中接受进来的字节数据。

在Actor我们就不使用堵塞fiber的方式了,因为这里send不会堵塞,因此使用FiberAsync.runBlocking堵塞住read调用。

因此改写上面生产者代码如下,这是Actor的代码:

final ExecutorService ep = Executors.newFixedThreadPool(2);

try (final ZMQ.Context zmq = ZMQ.context(1 /* IO threads */);

final ZMQ.Socket trgt = zmq.socket(ZMQ.REQ)) {

exec(e, () -> trgt.connect("tcp://localhost:8000"));

final byte[] myBytes = getMyBytes(); // ...

call(e, trgt.send(myBytes, 0 /* flags */));

call(e, trgt::recv); // Reply, e.g. ACK

}

下面是消费者代码:

try (final ZMQ.Context zmq = ZMQ.context(1 /* IO threads */);

final ZMQ.Socket src = zmq.socket(ZMQ.REP)) {

exec(e, () -> src.bind("tcp://*:8000"));

final byte[] v = call(e, src::recv);

exec(e, () -> src.send("ACK"));

useMyBytes(v); // ...

}

这里exec是一个工具函数,类似之前的call,代码如下:

@Suspendable

public static void exec(ExecutorService es, Runnable r) throws InterruptedException, SuspendExecution {

try {

runBlocking(es, (CheckedCallable<Void, Exception>) () -> { r.run(); return null; });

} catch (final InterruptedException | SuspendExecution e) {

throw e;

} catch (final Exception e) {

throw new RuntimeException(e);

}

}

完整代码见: 这里example

更深入应用见: Distributed Quasar Actors with Kafka and ZeroMQ

源码项目: on GitHub

Quasar与Akka比较

Quasar专题

Actor专题

Reactive专题

原文  http://www.jdon.com/artichect/distributed-quasar-actors-kafka-zeromq.html
正文到此结束
Loading...