实际中可能有这样的应用场景,得到一个记录不需要立即去处理它,而是等累积到一定数量时再批量处理它们。我们可以用一个计数器,来一个加一个,量大时一块处理,然后又重零开始计数。如果记录的来源单一还好办,要是有多个数据源来提供记录就会有多线程环境下数据丢失的问题。
这里我编写了一个最简单的任务批处理的队列,构造了告诉它批处理数量,消费者,然后就只管往队列里添加记录,队列在满足条件时自动进行批处理。因为内部使用的是 BlockingQuque 来存储记录,所以多线程往里同时添加记录也没关系,最后的未达到 batchSize , 的那些记录需要主动调用 done() 函数来触发批处理,并且结束队列内的循环线程,从而终止当前应用。
注意: 多线程环境下往一个无线程保护的集合或结构中,如 ArrayList, LinkedList, HashMap, StringBuilder 中添加记录非常容易造成数据的丢失,而往有线程保护的目的地写东西就安全了,如 Vector, Hashtable, StringBuffer, BlockingQueue。当然性能上要付出一点代价,不过对于使用了可重入锁(ReentrantLock), 而非同步锁(synchronized) 的数据结构还是可以放心使用的。
下面是 BatchQueue 的简单实现
package cc.unmi;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
public class BatchQueue<T> {
private final int batchSize;
private final Consumer<List<T>> consumer;
private AtomicBoolean inUse = new AtomicBoolean(true);
private BlockingQueue<T> queue = new LinkedBlockingQueue<>();
public BatchQueue(int batchSize, Consumer<List<T>> consumer) {
this.batchSize = batchSize;
this.consumer = consumer;
startLoop();
}
public boolean add(T t) {
if(!inUse.get()) {
throw new RuntimeException("This queue is aready shutdown");
}
return queue.add(t);
}
public void shutdown() {
inUse.set(false);
}
private void startLoop() {
new Thread(() -> {
while(inUse.get()) {
if(queue.size() >= batchSize) {
drainToConsume();
}
}
drainToConsume();
}).start();
}
private void drainToConsume() {
List<T> drained = new ArrayList<>();
queue.drainTo(drained, batchSize);
consumer.accept(drained);
}
}
客户端 Client 的使用代码如下:
package cc.unmi;
import java.util.Scanner;
public class Client {
public static void main(String[] args) {
BatchQueue<String> batchQueue = new BatchQueue<>(3, System.out::println);
while (true) {
String line = new Scanner(System.in).nextLine();
if (line.equals("done")) {
batchQueue.shutdown();
break;
}
batchQueue.add(line);
}
}
}
运行效果
调用 shutdown() 方法时把队列中剩下的不足数额的记录也处理掉,并且结果内部循环,才能终止当前应用。队列 shutdown 之后将不可再使用了。
如果每次批处理任务要在新线程里执行,那么只要在提供的 Consumer 中开新线程或提交任务到线程池就行了。
更实际的应用中,可能不易找到时机去主动调用队列的 shutdown() 方法,可能就需要一个计时器来判断。比如进程一直在执行,没有新的记录进来,队列中有未到 batchSize 的记录,总不能让这个队列中的记录一直等几天吧。这时候就需要一个计时器,即使数量不够,但时间到了照样要处理。计时器需在添加第一条记录时启动,并在每次批处理进行后复位。