摘要: 原创出处 http://www.iocoder.cn/Eureka/batch-tasks/ 「芋道源码」欢迎转载,保留摘要,谢谢!
关注**微信公众号:【芋道源码】**有福利:
本文主要分享 任务批处理 。Eureka-Server 集群通过任务批处理同步应用实例注册实例,所以本文也是为 Eureka-Server 集群同步的分享做铺垫。
本文涉及类在 com.netflix.eureka.util.batcher 包下,涉及到主体类的类图如下(打开大图 ):
任务执行的整体流程如下(打开大图 ):
细箭头 —— 任务执行经历的操作
粗箭头 —— 任务队列流转的方向
不同于一般情况下,任务提交了 立即 同步或异步执行,任务的执行拆分了 三层队列 :
第一层,接收队列( acceptorQueue ),重新处理队列( reprocessQueue )。
第二层,待执行队列( processingOrder )
第三层,工作队列( workQueue )
maxBatchingSize )将任务合并成 批量任务 ,调度( 提交 )到工作队列。 com.netflix.eureka.util.batcher.TaskProcessor ,任务处理器 接口 。接口代码如下:
public interface TaskProcessor<T>{
/**
* A processed task/task list ends up in one of the following states:
* <ul>
* <li>{@code Success} processing finished successfully</li>
* <li>{@code TransientError} processing failed, but shall be retried later</li>
* <li>{@code PermanentError} processing failed, and is non recoverable</li>
* </ul>
*/
enum ProcessingResult {
/**
* 成功
*/
Success,
/**
* 拥挤错误
*/
Congestion,
/**
* 瞬时错误
*/
TransientError,
/**
* 永久错误
*/
PermanentError
}
/**
* 处理单任务
* In non-batched mode a single task is processed at a time.
*/
ProcessingResult process(T task);
/**
* 处理批量任务
*
* For batched mode a collection of tasks is run at a time. The result is provided for the aggregated result,
* and all tasks are handled in the same way according to what is returned (for example are rescheduled, if the
* error is transient).
*/
ProcessingResult process(List<T> tasks);
}
Success ,成功。 Congestion ,拥挤错误, 任务将会被重试 。例如,请求被限流。 TransientError ,瞬时错误, 任务将会被重试 。例如,网络请求超时。 PermanentError ,永久错误, 任务将会被丢弃 。例如,执行时发生程序异常。 #process(task) 方法,处理单任务。 #process(tasks) 方法,处理批量任务。 com.netflix.eureka.util.batcher.TaskDispatcher ,任务分发器 接口 。接口代码如下:
public interface TaskDispatcher<ID, T>{
void process(ID id, T task, long expiryTime);
void shutdown();
}
#process(...) 方法,提交任务编号,任务,任务过期时间给任务分发器处理。 com.netflix.eureka.util.batcher.TaskDispatchers ,任务分发器 工厂类 ,用于创建任务分发器。其内部提供两种任务分发器的实现:
com.netflix.eureka.cluster.ReplicationTaskProcessor ,实现 TaskDispatcher ,Eureka-Server 集群任务处理器。感兴趣的同学,可以点击 链接 自己研究,我们将在TODO[0021]:集群同步 详细解析。
调用 TaskDispatchers#createBatchingTaskDispatcher(...) 方法,创建 批量任务 执行的分发器,实现代码如下:
// TaskDispatchers.java
1: /**
2: * 创建批量任务执行的分发器
3: *
4: * @param id 任务执行器编号
5: * @param maxBufferSize 待执行队列最大数量
6: * @param workloadSize 单个批量任务包含任务最大数量
7: * @param workerCount 任务执行器工作线程数
8: * @param maxBatchingDelay 批量任务等待最大延迟时长,单位:毫秒
9: * @param congestionRetryDelayMs 请求限流延迟重试时间,单位:毫秒
10: * @param networkFailureRetryMs 网络失败延迟重试时长,单位:毫秒
11: * @param taskProcessor 任务处理器
12: * @param <ID> 任务编号泛型
13: * @param <T> 任务泛型
14: * @return 批量任务执行的分发器
15: */
16: public static <ID, T> TaskDispatcher<ID, T> createBatchingTaskDispatcher(String id,
17: int maxBufferSize,
18: int workloadSize,
19: int workerCount,
20: long maxBatchingDelay,
21: long congestionRetryDelayMs,
22: long networkFailureRetryMs,
23: TaskProcessor<T> taskProcessor){
24: // 创建 任务接收执行器
25: final AcceptorExecutor<ID, T> acceptorExecutor = new AcceptorExecutor<>(
26: id, maxBufferSize, workloadSize, maxBatchingDelay, congestionRetryDelayMs, networkFailureRetryMs
27: );
28: // 创建 批量任务执行器
29: final TaskExecutors<ID, T> taskExecutor = TaskExecutors.batchExecutors(id, workerCount, taskProcessor, acceptorExecutor);
30: // 创建 批量任务分发器
31: return new TaskDispatcher<ID, T>() {
32: @Override
33: public void process(ID id, T task, long expiryTime){
34: acceptorExecutor.process(id, task, expiryTime);
35: }
36:
37: @Override
38: public void shutdown(){
39: acceptorExecutor.shutdown();
40: taskExecutor.shutdown();
41: }
42: };
43: }
workloadSize 参数,单个批量任务包含任务最大数量。 taskProcessor 参数, 自定义任务执行器实现 。 #process() 方法的实现,调用 AcceptorExecutor#process(...) 方法,提交 [ 任务编号 , 任务 , 任务过期时间 ] 给任务分发器处理。 调用 TaskDispatchers#createNonBatchingTaskDispatcher(...) 方法,创建 单任务 执行的分发器,实现代码如下:
1: /**
2: * 创建单任务执行的分发器
3: *
4: * @param id 任务执行器编号
5: * @param maxBufferSize 待执行队列最大数量
6: * @param workerCount 任务执行器工作线程数
7: * @param maxBatchingDelay 批量任务等待最大延迟时长,单位:毫秒
8: * @param congestionRetryDelayMs 请求限流延迟重试时间,单位:毫秒
9: * @param networkFailureRetryMs 网络失败延迟重试时长,单位:毫秒
10: * @param taskProcessor 任务处理器
11: * @param <ID> 任务编号泛型
12: * @param <T> 任务泛型
13: * @return 单任务执行的分发器
14: */
15: public static <ID, T> TaskDispatcher<ID, T> createNonBatchingTaskDispatcher(String id,
16: int maxBufferSize,
17: int workerCount,
18: long maxBatchingDelay,
19: long congestionRetryDelayMs,
20: long networkFailureRetryMs,
21: TaskProcessor<T> taskProcessor){
22: // 创建 任务接收执行器
23: final AcceptorExecutor<ID, T> acceptorExecutor = new AcceptorExecutor<>(
24: id, maxBufferSize, /* workloadSize = 1 */1, maxBatchingDelay, congestionRetryDelayMs, networkFailureRetryMs
25: );
26: final TaskExecutors<ID, T> taskExecutor = TaskExecutors.singleItemExecutors(id, workerCount, taskProcessor, acceptorExecutor);
27: return new TaskDispatcher<ID, T>() {
28: @Override
29: public void process(ID id, T task, long expiryTime){
30: acceptorExecutor.process(id, task, expiryTime);
31: }
32:
33: @Override
34: public void shutdown(){
35: acceptorExecutor.shutdown();
36: taskExecutor.shutdown();
37: }
38: };
39: }
workloadSize 参数
#createBatchingTaskDispatcher(...) 少这个参数。 在第 24 行,你会发现该参数传递给 AcceptorExecutor 使用 1 噢 。 taskProcessor 参数, 自定义任务执行器实现 。 #createBatchingTaskDispatcher(...) 只差 workloadSize = 1 参数。在详细解析。 #createBatchingTaskDispatcher(...) 差别很大 。「6.2 创建单任务执行器」详细解析。 #createBatchingTaskDispatcher(...) 一样。 com.netflix.eureka.util.batcher.AcceptorExecutor ,任务接收执行器。创建构造方法代码如下:
1: class AcceptorExecutor<ID, T>{
2:
3: private static final Logger logger = LoggerFactory.getLogger(AcceptorExecutor.class);
4:
5: /**
6: * 待执行队列最大数量
7: * {@link #processingOrder}
8: */
9: private final int maxBufferSize;
10: /**
11: * 单个批量任务包含任务最大数量
12: */
13: private final int maxBatchingSize;
14: /**
15: * 批量任务等待最大延迟时长,单位:毫秒
16: */
17: private final long maxBatchingDelay;
18:
19: /**
20: * 是否关闭
21: */
22: private final AtomicBoolean isShutdown = new AtomicBoolean(false);
23: /**
24: * 接收任务队列
25: */
26: private final BlockingQueue<TaskHolder<ID, T>> acceptorQueue = new LinkedBlockingQueue<>();
27: /**
28: * 重新执行任务队列
29: */
30: private final BlockingDeque<TaskHolder<ID, T>> reprocessQueue = new LinkedBlockingDeque<>();
31: /**
32: * 接收任务线程
33: */
34: private final Thread acceptorThread;
35:
36: /**
37: * 待执行任务映射
38: */
39: private final Map<ID, TaskHolder<ID, T>> pendingTasks = new HashMap<>();
40: /**
41: * 待执行队列
42: */
43: private final Deque<ID> processingOrder = new LinkedList<>();
44:
45: /**
46: * 单任务工作请求信号量
47: */
48: private final Semaphore singleItemWorkRequests = new Semaphore(0);
49: /**
50: * 单任务工作队列
51: */
52: private final BlockingQueue<TaskHolder<ID, T>> singleItemWorkQueue = new LinkedBlockingQueue<>();
53:
54: /**
55: * 批量任务工作请求信号量
56: */
57: private final Semaphore batchWorkRequests = new Semaphore(0);
58: /**
59: * 批量任务工作队列
60: */
61: private final BlockingQueue<List<TaskHolder<ID, T>>> batchWorkQueue = new LinkedBlockingQueue<>();
62:
63: /**
64: * 网络通信整形器
65: */
66: private final TrafficShaper trafficShaper;
67:
68: AcceptorExecutor(String id,
69: int maxBufferSize,
70: int maxBatchingSize,
71: long maxBatchingDelay,
72: long congestionRetryDelayMs,
73: long networkFailureRetryMs) {
74: this.maxBufferSize = maxBufferSize;
75: this.maxBatchingSize = maxBatchingSize;
76: this.maxBatchingDelay = maxBatchingDelay;
77:
78: // 创建 网络通信整形器
79: this.trafficShaper = new TrafficShaper(congestionRetryDelayMs, networkFailureRetryMs);
80:
81: // 创建 接收任务线程
82: ThreadGroup threadGroup = new ThreadGroup("eurekaTaskExecutors");
83: this.acceptorThread = new Thread(threadGroup, new AcceptorRunner(), "TaskAcceptor-" + id);
84: this.acceptorThread.setDaemon(true);
85: this.acceptorThread.start();
86:
87: // TODO (省略代码)芋艿:监控相关,暂时无视
88: }
89: }
singleItemWorkQueue ),又存在批量任务工作队列( batchWorkQueue ) ,在「9. 任务接收线程【调度任务】」会解答这个疑惑。 com.netflix.eureka.util.batcher.TaskExecutors ,任务执行器。 其内部提供创建单任务和批量任务执行器的两种方法 。TaskExecutors 构造方法如下:
class TaskExecutors<ID, T>{
private static final Logger logger = LoggerFactory.getLogger(TaskExecutors.class);
/**
* 是否关闭
*/
private final AtomicBoolean isShutdown;
/**
* 工作线程池
*/
private final List<Thread> workerThreads;
TaskExecutors(WorkerRunnableFactory<ID, T> workerRunnableFactory, int workerCount, AtomicBoolean isShutdown) {
this.isShutdown = isShutdown;
this.workerThreads = new ArrayList<>();
// 创建 工作线程池
ThreadGroup threadGroup = new ThreadGroup("eurekaTaskExecutors");
for (int i = 0; i < workerCount; i++) {
WorkerRunnable<ID, T> runnable = workerRunnableFactory.create(i);
Thread workerThread = new Thread(threadGroup, runnable, runnable.getWorkerName());
workerThreads.add(workerThread);
workerThread.setDaemon(true);
workerThread.start();
}
}
/**
* 创建工作线程工厂
*
* @param <ID> 任务编号泛型
* @param <T> 批量任务执行器
*/
interface WorkerRunnableFactory<ID, T>{
WorkerRunnable<ID, T> create(int idx);
}
}
workerThreads 属性,工作线程 池 。 工作任务队列会被工作线程池并发拉取,并发执行 。 com.netflix.eureka.util.batcher.TaskExecutors.WorkerRunnableFactory ,创建工作线程工厂 接口 。单任务和批量任务执行器的工作线程实现不同,通过自定义工厂实现类创建。 调用 TaskExecutors#batchExecutors(...) 方法,创建批量任务执行器。实现代码如下:
/**
* 创建批量任务执行器
*
* @param name 任务执行器名
* @param workerCount 任务执行器工作线程数
* @param processor 任务处理器
* @param acceptorExecutor 接收任务执行器
* @param <ID> 任务编号泛型
* @param <T> 任务泛型
* @return 批量任务执行器
*/
static <ID, T> TaskExecutors<ID, T> batchExecutors(final String name,
int workerCount,
final TaskProcessor<T> processor,
final AcceptorExecutor<ID, T> acceptorExecutor){
final AtomicBoolean isShutdown = new AtomicBoolean();
final TaskExecutorMetrics metrics = new TaskExecutorMetrics(name);
// 创建批量任务执行器
return new TaskExecutors<>(new WorkerRunnableFactory<ID, T>() { // 批量任务工作线程工厂
@Override
public WorkerRunnable<ID, T> create(int idx){
return new BatchWorkerRunnable<>("TaskBatchingWorker-" + name + '-' + idx /* 线程名 */, isShutdown, metrics, processor, acceptorExecutor);
}
}, workerCount, isShutdown);
}
com.netflix.eureka.util.batcher.TaskExecutors.WorkerRunnable.BatchWorkerRunnable ,批量任务工作线程。 调用 TaskExecutors#singleItemExecutors(...) 方法,创建批量任务执行器。实现代码如下:
/**
* 创建单任务执行器
*
* @param name 任务执行器名
* @param workerCount 任务执行器工作线程数
* @param processor 任务处理器
* @param acceptorExecutor 接收任务执行器
* @param <ID> 任务编号泛型
* @param <T> 任务泛型
* @return 单任务执行器
*/
static <ID, T> TaskExecutors<ID, T> singleItemExecutors(final String name,
int workerCount,
final TaskProcessor<T> processor,
final AcceptorExecutor<ID, T> acceptorExecutor){
final AtomicBoolean isShutdown = new AtomicBoolean();
final TaskExecutorMetrics metrics = new TaskExecutorMetrics(name);
// 创建单任务执行器
return new TaskExecutors<>(new WorkerRunnableFactory<ID, T>() { // 单任务工作线程工厂
@Override
public WorkerRunnable<ID, T> create(int idx){
return new SingleTaskWorkerRunnable<>("TaskNonBatchingWorker-" + name + '-' + idx /* 线程名 */, isShutdown, metrics, processor, acceptorExecutor);
}
}, workerCount, isShutdown);
}
com.netflix.eureka.util.batcher.TaskExecutors.WorkerRunnable.SingleTaskWorkerRunnable ,单任务工作线程。 com.netflix.eureka.util.batcher.TaskExecutors.WorkerRunnable ,任务工作线程抽象类。BatchWorkerRunnable 和 SingleTaskWorkerRunnable 都实现该类,差异在 #run() 的自定义实现。WorkerRunnable 实现代码如下:
abstract static class WorkerRunnable<ID, T> implements Runnable{
/**
* 线程名
*/
final String workerName;
/**
* 是否关闭
*/
final AtomicBoolean isShutdown;
final TaskExecutorMetrics metrics;
/**
* 任务处理器
*/
final TaskProcessor<T> processor;
/**
* 任务接收执行器
*/
final AcceptorExecutor<ID, T> taskDispatcher;
// ... 省略构造方法和 getting 方法。
}
com.netflix.eureka.util.batcher.TrafficShaper ,网络通信整形器。当任务执行发生请求限流,或是请求网络失败的情况,则延时 AcceptorRunner 将任务提交到工作任务队列,从而避免任务很快去执行,再次发生上述情况。TrafficShaper 实现代码如下:
class TrafficShaper{
/**
* Upper bound on delay provided by configuration.
*/
private static final long MAX_DELAY = 30 * 1000;
/**
* 请求限流延迟重试时间,单位:毫秒
*/
private final long congestionRetryDelayMs;
/**
* 网络失败延迟重试时长,单位:毫秒
*/
private final long networkFailureRetryMs;
/**
* 最后请求限流时间戳,单位:毫秒
*/
private volatile long lastCongestionError;
/**
* 最后网络失败时间戳,单位:毫秒
*/
private volatile long lastNetworkFailure;
TrafficShaper(long congestionRetryDelayMs, long networkFailureRetryMs) {
this.congestionRetryDelayMs = Math.min(MAX_DELAY, congestionRetryDelayMs);
this.networkFailureRetryMs = Math.min(MAX_DELAY, networkFailureRetryMs);
}
void registerFailure(ProcessingResult processingResult){
if (processingResult == ProcessingResult.Congestion) {
lastCongestionError = System.currentTimeMillis();
} else if (processingResult == ProcessingResult.TransientError) {
lastNetworkFailure = System.currentTimeMillis();
}
}
/**
* 计算提交延迟,单位:毫秒
*
* @return 延迟
*/
long transmissionDelay(){
// 无延迟
if (lastCongestionError == -1 && lastNetworkFailure == -1) {
return 0;
}
long now = System.currentTimeMillis();
// 计算最后请求限流带来的延迟
if (lastCongestionError != -1) {
long congestionDelay = now - lastCongestionError;
if (congestionDelay >= 0 && congestionDelay < congestionRetryDelayMs) { // 范围内
return congestionRetryDelayMs - congestionDelay; // 补充延迟
}
lastCongestionError = -1; // 重置时间戳
}
// 计算最后网络失败带来的延迟
if (lastNetworkFailure != -1) {
long failureDelay = now - lastNetworkFailure;
if (failureDelay >= 0 && failureDelay < networkFailureRetryMs) { // 范围内
return networkFailureRetryMs - failureDelay; // 补充延迟
}
lastNetworkFailure = -1; // 重置时间戳
}
// 无延迟
return 0;
}
}
#registerFailure(...) ,在任务执行失败时,提交任务结果给 TrafficShaper ,记录发生时间。在「10. 任务执行器【执行任务】」会看到调用该方法。 #transmissionDelay(...) ,计算提交延迟,单位:毫秒。「9. 任务接收线程【调度任务】」会看到调用该方法。 调用 AcceptorExecutor#process(...) 方法,添加任务到接收任务队列。实现代码如下:
// AcceptorExecutor.java
void process(ID id, T task, long expiryTime){
acceptorQueue.add(new TaskHolder<ID, T>(id, task, expiryTime));
acceptedTasks++;
}
com.netflix.eureka.util.batcher.TaskHolder ,任务持有者,实现代码如下:
class TaskHolder<ID, T>{
/**
* 任务编号
*/
private final ID id;
/**
* 任务
*/
private final T task;
/**
* 任务过期时间戳
*/
private final long expiryTime;
/**
* 任务提交时间戳
*/
private final long submitTimestamp;
}
后台线程执行 AcceptorRunner#run(...) 方法,调度任务。实现代码如下:
1: @Override
2: public void run(){
3: long scheduleTime = 0;
4: while (!isShutdown.get()) {
5: try {
6: // 处理完输入队列( 接收队列 + 重新执行队列 )
7: drainInputQueues();
8:
9: // 待执行任务数量
10: int totalItems = processingOrder.size();
11:
12: // 计算调度时间
13: long now = System.currentTimeMillis();
14: if (scheduleTime < now) {
15: scheduleTime = now + trafficShaper.transmissionDelay();
16: }
17:
18: // 调度
19: if (scheduleTime <= now) {
20: // 调度批量任务
21: assignBatchWork();
22: // 调度单任务
23: assignSingleItemWork();
24: }
25:
26: // 1)任务执行器无任务请求,正在忙碌处理之前的任务;或者 2)任务延迟调度。睡眠 10 秒,避免资源浪费。
27: // If no worker is requesting data or there is a delay injected by the traffic shaper,
28: // sleep for some time to avoid tight loop.
29: if (totalItems == processingOrder.size()) {
30: Thread.sleep(10);
31: }
32: } catch (InterruptedException ex) {
33: // Ignore
34: } catch (Throwable e) {
35: // Safe-guard, so we never exit this loop in an uncontrolled way.
36: logger.warn("Discovery AcceptorThread error", e);
37: }
38: }
39: }
第 4 行 :无限循环执行调度,直到关闭。
第 6 至 7 行 :调用 #drainInputQueues() 方法, 循环 处理完输入队列( 接收队列 + 重新执行队列 ), 直到 有待执行的任务。实现代码如下:
1: private void drainInputQueues() throws InterruptedException{
2: do {
3: // 处理完重新执行队列
4: drainReprocessQueue();
5: // 处理完接收队列
6: drainAcceptorQueue();
7:
8: // 所有队列为空,等待 10 ms,看接收队列是否有新任务
9: if (!isShutdown.get()) {
10: // If all queues are empty, block for a while on the acceptor queue
11: if (reprocessQueue.isEmpty() && acceptorQueue.isEmpty() && pendingTasks.isEmpty()) {
12: TaskHolder<ID, T> taskHolder = acceptorQueue.poll(10, TimeUnit.MILLISECONDS);
13: if (taskHolder != null) {
14: appendTaskHolder(taskHolder);
15: }
16: }
17: }
18: } while (!reprocessQueue.isEmpty() || !acceptorQueue.isEmpty() || pendingTasks.isEmpty()); // 处理完输入队列( 接收队列 + 重新执行队列 )
19: }
第 2 行 && 第 18 行 : 循环 ,直到 同时 满足如下全部条件:
reprocessQueue ) 和接收队列( acceptorQueue )为空 pendingTasks ) 不为空 第 3 至 4 行 :处理完重新执行队列( reprocessQueue )。实现代码如下:
1: private void drainReprocessQueue(){
2: long now = System.currentTimeMillis();
3: while (!reprocessQueue.isEmpty() && !isFull()) {
4: TaskHolder<ID, T> taskHolder = reprocessQueue.pollLast(); // 优先拿较新的任务
5: ID id = taskHolder.getId();
6: if (taskHolder.getExpiryTime() <= now) { // 过期
7: expiredTasks++;
8: } else if (pendingTasks.containsKey(id)) { // 已存在
9: overriddenTasks++;
10: } else {
11: pendingTasks.put(id, taskHolder);
12: processingOrder.addFirst(id); // 提交到队头
13: }
14: }
15: // 如果待执行队列已满,清空重新执行队列,放弃较早的任务
16: if (isFull()) {
17: queueOverflows += reprocessQueue.size();
18: reprocessQueue.clear();
19: }
20: }
pendingTasks ) 里。 processingOrder ) 的头部。效果如下图:
pendingTasks )已满,清空重新执行队列( processingOrder ),放弃较早的任务。 第 5 至 6 行 :处理完接收队列( acceptorQueue ),实现代码如下:
private void drainAcceptorQueue(){
while (!acceptorQueue.isEmpty()) { // 循环,直到接收队列为空
appendTaskHolder(acceptorQueue.poll());
}
}
private void appendTaskHolder(TaskHolder<ID, T> taskHolder){
// 如果待执行队列已满,移除待处理队列,放弃较早的任务
if (isFull()) {
pendingTasks.remove(processingOrder.poll());
queueOverflows++;
}
// 添加到待执行队列
TaskHolder<ID, T> previousTask = pendingTasks.put(taskHolder.getId(), taskHolder);
if (previousTask == null) {
processingOrder.add(taskHolder.getId());
} else {
overriddenTasks++;
}
}
第 8 至 17 行 :当所有队列为空,阻塞从接收队列( acceptorQueue ) 拉取任务 10 ms。若拉取到,添加到待执行队列( processingOrder )。
第 12 至 16 行 :计算可调度任务的最小时间( scheduleTime )。
scheduleTime 小于当前时间,不重新计算,即此时需要延迟等待调度。 scheduleTime 大于等于当前时间,配合 TrafficShaper#transmissionDelay(...) 重新计算。 第 19 行 :当 scheduleTime 小于当前时间,执行任务的调度。
第 21 行 :调用 #assignBatchWork() 方法,调度批量任务。实现代码如下:
1: void assignBatchWork(){
2: if (hasEnoughTasksForNextBatch()) {
3: // 获取 批量任务工作请求信号量
4: if (batchWorkRequests.tryAcquire(1)) {
5: // 获取批量任务
6: long now = System.currentTimeMillis();
7: int len = Math.min(maxBatchingSize, processingOrder.size());
8: List<TaskHolder<ID, T>> holders = new ArrayList<>(len);
9: while (holders.size() < len && !processingOrder.isEmpty()) {
10: ID id = processingOrder.poll();
11: TaskHolder<ID, T> holder = pendingTasks.remove(id);
12: if (holder.getExpiryTime() > now) { // 过期
13: holders.add(holder);
14: } else {
15: expiredTasks++;
16: }
17: }
18: //
19: if (holders.isEmpty()) { // 未调度到批量任务,释放请求信号量
20: batchWorkRequests.release();
21: } else { // 添加批量任务到批量任务工作队列
22: batchSizeMetric.record(holders.size(), TimeUnit.MILLISECONDS);
23: batchWorkQueue.add(holders);
24: }
25: }
26: }
27: }
第 2 行 :调用 #hasEnoughTasksForNextBatch() 方法,判断是否有足够任务进行下一次批量任务调度:1)待执行任务( processingOrder )映射已满;或者 2)到达批量任务处理最大等待延迟。实现代码如下:
private boolean hasEnoughTasksForNextBatch(){
// 待执行队列为空
if (processingOrder.isEmpty()) {
return false;
}
// 待执行任务映射已满
if (pendingTasks.size() >= maxBufferSize) {
return true;
}
// 到达批量任务处理最大等待延迟( 通过待处理队列的头部任务判断 )
TaskHolder<ID, T> nextHolder = pendingTasks.get(processingOrder.peek());
long delay = System.currentTimeMillis() - nextHolder.getSubmitTimestamp();
return delay >= maxBatchingDelay;
}
第 5 至 17 行 :获取批量任务( holders )。 你会发现,本文说了半天的批量任务,实际是 List<TaskHolder<ID, T>> 哈 。
第 4 行 :获取批量任务工作请求信号量( batchWorkRequests ) 。在任务执行器的批量任务执行器,每次执行时,发出 batchWorkRequests 。 每一个信号量需要保证获取到一个批量任务 。
第 19 至 20 行 :未调度到批量任务,释放请求信号量, 代表请求实际未完成,每一个信号量需要保证获取到一个批量任务 。
第 21 至 24 行 :添加批量任务到批量任务工作队列。
第 23 行 :调用 #assignSingleItemWork() 方法,调度单任务。
第 23 行 :调用 #assignSingleItemWork() 方法,调度单任务,和 #assignBatchWork() 方法 类似 。实现代码如下:
void assignSingleItemWork(){
if (!processingOrder.isEmpty()) { // 待执行任队列不为空
// 获取 单任务工作请求信号量
if (singleItemWorkRequests.tryAcquire(1)) {
// 【循环】获取单任务
long now = System.currentTimeMillis();
while (!processingOrder.isEmpty()) {
ID id = processingOrder.poll(); // 一定不为空
TaskHolder<ID, T> holder = pendingTasks.remove(id);
if (holder.getExpiryTime() > now) {
singleItemWorkQueue.add(holder);
return;
}
expiredTasks++;
}
// 获取不到单任务,释放请求信号量
singleItemWorkRequests.release();
}
}
}
第 26 至 31 行 :当调度任务前的待执行任务数( totalItems )等于当前待执行队列( processingOrder )的任务数,意味着:1)任务执行器无任务请求,正在忙碌处理之前的任务;或者 2)任务延迟调度。睡眠 10 秒,避免资源浪费。
批量任务工作后台线程( BatchWorkerRunnable )执行 #run(...) 方法,调度任务。实现代码如下:
//
1: @Override
2: public void run(){
3: try {
4: while (!isShutdown.get()) {
5: // 获取批量任务
6: List<TaskHolder<ID, T>> holders = getWork();
7:
8: // TODO 芋艿:监控相关,暂时无视
9: metrics.registerExpiryTimes(holders);
10:
11: // 获得实际批量任务
12: List<T> tasks = getTasksOf(holders);
13: // 调用处理器执行任务
14: ProcessingResult result = processor.process(tasks);
15: switch (result) {
16: case Success:
17: break;
18: case Congestion:
19: case TransientError:
20: taskDispatcher.reprocess(holders, result); // 提交重新处理
21: break;
22: case PermanentError:
23: logger.warn("Discarding {} tasks of {} due to permanent error", holders.size(), workerName);
24: }
25:
26: // TODO 芋艿:监控相关,暂时无视
27: metrics.registerTaskResult(result, tasks.size());
28: }
29: } catch (InterruptedException e) {
30: // Ignore
31: } catch (Throwable e) {
32: // Safe-guard, so we never exit this loop in an uncontrolled way.
33: logger.warn("Discovery WorkerThread error", e);
34: }
35: }
第 4 行 :无限循环执行调度,直到关闭。
第 6 行 :调用 getWork() 方法,获取 一个 批量任务直到成功。实现代码如下:
1: private List<TaskHolder<ID, T>> getWork() throws InterruptedException {
2: // 发起请求信号量,并获得批量任务的工作队列
3: BlockingQueue<List<TaskHolder<ID, T>>> workQueue = taskDispatcher.requestWorkItems();
4: // 【循环】获取批量任务,直到成功
5: List<TaskHolder<ID, T>> result;
6: do {
7: result = workQueue.poll(1, TimeUnit.SECONDS);
8: } while (!isShutdown.get() && result == null);
9: return result;
10: }
第 3 行 :调用 TaskDispatcher#requestWorkItems() 方法,发起请求信号量,并获得批量任务的工作队列。实现代码如下:
// TaskDispatcher.java
/**
* 批量任务工作请求信号量
*/
private final Semaphore batchWorkRequests = new Semaphore(0);
/**
* 批量任务工作队列
*/
private final BlockingQueue<List<TaskHolder<ID, T>>> batchWorkQueue = new LinkedBlockingQueue<>();
BlockingQueue<List<TaskHolder<ID, T>>> requestWorkItems() {
batchWorkRequests.release();
return batchWorkQueue;
}
batchWorkQueue ) 和单任务工作队列( singleItemWorkQueue ) 是 不同的队列 。 第 5 至 8 行 : 循环 获取 一个 批量任务,直到成功。
第 12 行 :调用 #getTasksOf(...) 方法,获得 实际 批量任务。实现代码如下:
private List<T> getTasksOf(List<TaskHolder<ID, T>> holders){
List<T> tasks = new ArrayList<>(holders.size());
for (TaskHolder<ID, T> holder : holders) {
tasks.add(holder.getTask());
}
return tasks;
}
第 14 至 24 行 :调用处理器( TaskProcessor ) 执行任务。当任务执行结果为 Congestion 或 TransientError ,调用 AcceptorExecutor#reprocess(...) 提交 整个批量任务 重新处理,实现代码如下:
// AcceptorExecutor.java
void reprocess(List<TaskHolder<ID, T>> holders, ProcessingResult processingResult){
// 添加到 重新执行队列
reprocessQueue.addAll(holders);
// TODO 芋艿:监控相关,暂时无视
replayedTasks += holders.size();
// 提交任务结果给 TrafficShaper
trafficShaper.registerFailure(processingResult);
}
单任务工作后台线程( SingleTaskWorkerRunnable )执行 #run(...) 方法,调度任务,和 BatchWorkerRunnable#run(...) 基本类似,就不啰嗦了。实现代码如下:
@Override
// SingleTaskWorkerRunnable.java
public void run(){
try {
while (!isShutdown.get()) {
// 发起请求信号量,并获得单任务的工作队列
BlockingQueue<TaskHolder<ID, T>> workQueue = taskDispatcher.requestWorkItem();
TaskHolder<ID, T> taskHolder;
// 【循环】获取单任务,直到成功
while ((taskHolder = workQueue.poll(1, TimeUnit.SECONDS)) == null) {
if (isShutdown.get()) {
return;
}
}
// TODO 芋艿:监控相关,暂时无视
metrics.registerExpiryTime(taskHolder);
if (taskHolder != null) {
// 调用处理器执行任务
ProcessingResult result = processor.process(taskHolder.getTask());
switch (result) {
case Success:
break;
case Congestion:
case TransientError:
taskDispatcher.reprocess(taskHolder, result); // 提交重新处理
break;
case PermanentError:
logger.warn("Discarding a task of {} due to permanent error", workerName);
}
// TODO 芋艿:监控相关,暂时无视
metrics.registerTaskResult(result, 1);
}
}
} catch (InterruptedException e) {
// Ignore
} catch (Throwable e) {
// Safe-guard, so we never exit this loop in an uncontrolled way.
logger.warn("Discovery WorkerThread error", e);
}
}
:smiling_imp: 又是一篇长文。建议边看代码,边对照着整体流程图,理解实际不难。
当然,欢迎你有任何疑问,在我的公众号( 芋道源码 ) 留言。
胖友,分享我的公众号( 芋道源码 ) 给你的胖友可好?