// 任务队列
private BlockingQueue<Task> queue = new LinkedBlockingQueue<>(2000);
// 启动5个消费者线程,执行批量任务
public void start() {
ExecutorService pool = Executors.newFixedThreadPool(5);
for (int i = 0; i < 5; i++) {
pool.execute(() -> {
try {
while (true) {
// 获取批量任务
List<Task> tasks = pollTasks();
// 执行批量任务
execTasks(tasks);
}
} catch (InterruptedException ignored) {
}
});
}
}
// 从任务队列中获取批量任务
private List<Task> pollTasks() throws InterruptedException {
List<Task> tasks = new LinkedList<>();
// 阻塞式获取一个任务
// 首先采用阻塞式的方式,如果任务队列中没有任务,能够避免无谓的循环
Task task = queue.take();
while (task != null) {
tasks.add(task);
// 非阻塞式获取一个任务
task = queue.poll();
}
return tasks;
}
// 批量执行任务
private void execTasks(List<Task> tasks) {
}
public class Logger {
// 批量异步刷新的数量
private static final int FLUSH_BATCH_SIZE = 500;
// 任务队列
private final BlockingQueue<LogMsg> queue = new LinkedBlockingQueue<>();
// 只需要一个线程写日志
private ExecutorService pool = Executors.newFixedThreadPool(1);
// 启动写日志线程
public void start() throws IOException {
File file = File.createTempFile("test", ".log");
FileWriter writer = new FileWriter(file);
pool.execute(() -> {
// 未刷盘日志数量
int curIdx = 0;
long preFlushTime = System.currentTimeMillis();
while (true) {
try {
LogMsg logMsg = queue.poll(5, TimeUnit.SECONDS);
// 写日志
if (logMsg != null) {
writer.write(logMsg.toString());
++curIdx;
}
// 如果不存在未刷盘数据,则无需刷盘
if (curIdx <= 0) {
continue;
}
// 异步刷盘规则
if (logMsg != null && logMsg.getLevel() == LEVEL.ERROR ||
curIdx == FLUSH_BATCH_SIZE ||
System.currentTimeMillis() - preFlushTime > 5_000) {
writer.flush();
curIdx = 0;
preFlushTime = System.currentTimeMillis();
}
} catch (InterruptedException | IOException ignored) {
} finally {
try {
writer.flush();
writer.close();
} catch (IOException ignored) {
}
}
}
});
}
private void info(@NonNull String msg) throws InterruptedException {
queue.put(new LogMsg(LEVEL.INFO, msg));
}
private void error(@NonNull String msg) throws InterruptedException {
queue.put(new LogMsg(LEVEL.ERROR, msg));
}
}
@Data
@AllArgsConstructor
class LogMsg {
private LEVEL level;
private String msg;
}
enum LEVEL {
INFO, ERROR
}
转载请注明出处:http://zhongmingmao.me/2019/05/26/java-concurrent-producer-consumer/
访问原文「 Java并发 -- 生产者-消费者模式 」获取最佳阅读体验并参与讨论