初次写作尝试:本文试以问答形式对quartz做一些介绍。
Ⅰ Quartz是什么?为什么要有这样一篇文章?
Quartz 是一个完全由 Java 编写的开源作业调度框架,为在 Java 应用程序中进行作业调度提供了简单却强大的机制。Quartz最早的issue记录在jira.terracotta.org,时间可以追溯到大约2010年的2月。Quartz年代久远难以满足技术人的新奇感,也没有hadoop、spark那么重量、时髦,但深入了解一下这位老前辈的设计思想,仍能使我们获得一些开发方面的参考,这也正是本文的初衷。
Ⅱ 作业调度是什么?
想象一下,所有程序都是被动触发的,比如对于web程序,一次外部请求就是一次触发。有一类程序需要由时间触发,比如抢火车票的场景,每隔一段时间查询一次余票,最简单的实现,就是构造一个死循环,不断查询余票,每次启动主程序时,同时启动这个线程,这样就有了一个随时间周期性自动运行的程序。周期性触发称为一种调度规则,“由时间触发程序运行”就称为“作业调度”。
Ⅲ 我们为什么需要作业调度?
时间的流动不知疲倦永不停歇,由时间触发程序能自动化大量业务过程。
Ⅳ Quartz怎样实现作业调度?
仔细考虑一下我们需要的作业调度应该有些什么:
1. 一个“调度者”,
2. 一个“调度规则”,
3. 一个被调度的“作业”
这三者可以构成一个有基本功能的作业调度,在quartz中,Scheduler对应“调度者”, Trigger对应“调度规则”,Job对应“作业”。
调度规则和作业配置在quartz的保存方式有很多实现,基于内存的RAMJobStore、基于JDBC的JobStoreSupport等等。在Scheduler和JobStore之间还有一层DriveDelegate,DriveDelegate非常像JobStore,只是扩展了基于不同database实现时的一些实现。Listener则提供了Scheduler、Trigger、Job运行时,一些时间点的通知。
自绘业务架构图:
Ⅴ 描述一下Quartz的一些基本活动?
首先是Quartz启动时的活动图:
看一下对应的源码片段:
启动的入口在QuartzScheduler的start:
1. 在QuartzScheduler.start()前,主调度线程SchedulerThread和其他配置通过StdSchedulerFactory初始化,源代码在StdSchedulerFactory.instantiate(),接近800行,此处不列举。
2. 检查Scheduler是否正在关闭或者已关闭。
3. 通知SchedulerListener正在启动。
4. 启动Scheduler,启动插件。
5. 如果Scheduler从暂停恢复运行,通知JobSupport恢复Scheduler。
6. 通知主调度线程开始运行。
7. 通知SchedulerListener启动完成。
public class QuartzScheduler implements RemotableQuartzScheduler {
public void start() throws SchedulerException {
// 检查Scheduler是否正在关闭或者已关闭
if (shuttingDown|| closed) {
throw new SchedulerException(
"The Scheduler cannot be restarted after shutdown() has been called.");
}
//通知SchedulerListener正在启动
notifySchedulerListenersStarting();
if (initialStart == null) {
initialStart = new Date();
//启动Scheduler
this.resources.getJobStore().schedulerStarted();
//启动各种插件
startPlugins();
} else {
//如果Scheduler从暂停恢复运行,通知JobSupport恢复Scheduler
resources.getJobStore().schedulerResumed();
}
//通知主调度线程可以开始运行
schedThread.togglePause(false);
//通知SchedulerListener启动完成
notifySchedulerListenersStarted();
}
}
通知监听:创建SchedulerListener的列表并逐个通知。
public class QuartzScheduler implements RemotableQuartzScheduler {
public void notifySchedulerListenersStarting() {
// 创建SchedulerListener的列表.
List<SchedulerListener> schedListeners = buildSchedulerListenerList();
// 逐个通知Listener
for (SchedulerListener sl : schedListeners) {
sl.schedulerStarting();
}
}
}
获取监听列表:从ListenerManager获取Listener,这里需要开发者主动将自己的Listener注册到ListenerManager。
public class QuartzScheduler implements RemotableQuartzScheduler {
private List<SchedulerListener> buildSchedulerListenerList() {
List<SchedulerListener> allListeners = new LinkedList<SchedulerListener>();
// 从ListenerManager获取Listener 这里需要开发者主动将自己的Listener注册到ListenerManager
allListeners.addAll(getListenerManager().getSchedulerListeners());
allListeners.addAll(getInternalSchedulerListeners());
return allListeners;
}
}
正式启动Scheduler
1. 集群部署时初始化ClusterManager线程并启动。
2. 单机部署时恢复Job。
3. 初始化MisFire处理线程并启动。
public abstract class JobStoreSupportimplements JobStore,Constants {
public void schedulerStarted() throws SchedulerException {
// 集群部署时初始化ClusterManager线程并启动
if (isClustered()) {
clusterManagementThread = new ClusterManager();
clusterManagementThread.initialize();
} else {
//单机部署直接恢复Job
recoverJobs();
}
//初始化MisFire处理线程并启动
misfireHandler = new MisfireHandler();
misfireHandler.initialize();
}
}
首先初始化ClusterManager,把ClusterManager放进线程池执行。
class ClusterManager extends Thread {
public void initialize() {
// 初始化ClusterManager
this.manage();
ThreadExecutor executor = getThreadExecutor();
// 把ClusterManager放进线程池执行
executor.execute(ClusterManager.this);
}
}
ClusterManage 进一步doCheckin。
class ClusterManager extends Thread {
private boolean manage() {
boolean res = false;
// 节点登入集群
res = doCheckin();
return res;
}
}
Checkin 细节:
1. 每次checkin都检查是否有意外中断的作业。
2. 从db获取锁后,再恢复作业。
public abstract class JobStoreSupportimplements JobStore,Constants {
protected boolean doCheckin() throws JobPersistenceException {
boolean recovered = false;
Connection conn = getNonManagedTXConnection();
try {
// 每次都要检查是否有意外中断的作业
List<SchedulerStateRecord> failedRecords = null;
if (!firstCheckIn) {
failedRecords = clusterCheckIn(conn);
commitConnection(conn);
}
if (firstCheckIn || (failedRecords.size() > 0)) {
// 从db获取锁
getLockHandler().obtainLock(conn, LOCK_STATE_ACCESS);
transStateOwner = true;
failedRecords = (firstCheckIn) ? clusterCheckIn(conn) : findFailedInstances(conn);
if (failedRecords.size() > 0) {
getLockHandler().obtainLock(conn, LOCK_TRIGGER_ACCESS);
// 恢复中断的作业
clusterRecover(conn, failedRecords);
recovered = true;
}
}
commitConnection(conn);
} catch (JobPersistenceException e) {
rollbackConnection(conn);
} finally {
}
firstCheckIn = false;
return recovered;
}
}
恢复作业:首先找到意外中断的调度记录,保存更新节点checkin的时间。
public abstract class JobStoreSupportimplements JobStore,Constants {
protected List<SchedulerStateRecord> clusterCheckIn(Connection conn) throws JobPersistenceException {
//找到意外中断的调度记录
List<SchedulerStateRecord> failedInstances = findFailedInstances(conn);
try {
// 保存更新节点checkin的时间
lastCheckin = System.currentTimeMillis();
if (getDelegate().updateSchedulerState(conn, getInstanceId(), lastCheckin) == 0) {
getDelegate().insertSchedulerState(conn, getInstanceId(),
lastCheckin, getClusterCheckinInterval());
}
} catch (Exception e) {
}
return failedInstances;
}
}
恢复细节:查询获得一个集群中所有失败节点的列表,如果当前节点首次checkin,列表里会有当前节点:
1. 查询所有节点的调度记录。
2. 找到当前节点的调度记录。
3. 找到所有调度意外中断的节点的记录。
如果是当前节点第一次checkIn,还要把有触发记录但丢失调度记录的补全,构造虚拟的调度记录。
public abstract class JobStoreSupportimplements JobStore,Constants {
protected List<SchedulerStateRecord> findFailedInstances(Connection conn) throws JobPersistenceException {
try {
List<SchedulerStateRecord> failedInstances = new LinkedList<SchedulerStateRecord>();
long timeNow = System.currentTimeMillis();
// 查询所有节点的调度记录
List<SchedulerStateRecord> states = getDelegate().selectSchedulerStateRecords(conn, null);
for (SchedulerStateRecord rec : states) {
// 找到当前节点的记录
if (rec.getSchedulerInstanceId().equals(getInstanceId())) {
if (firstCheckIn) {
failedInstances.add(rec);
}
} else {
// 找到所有调度意外中断的节点的记录
if (calcFailedIfAfter(rec) < timeNow) {
failedInstances.add(rec);
}
}
}
// 如果是当前节点第一次checkIn,还要把有触发记录但丢失调度记录的补全,构造虚拟的调度记录.
if (firstCheckIn) {
failedInstances.addAll(findOrphanedFailedInstances(conn, states));
}
return failedInstances;
} catch (Exception e) {
}
}
}
细节:针对有Trigger调度记录但没有Scheduler调度记录的,创建虚拟的Scheduler调度记录 。
public abstract class JobStoreSupportimplements JobStore,Constants {
private List<SchedulerStateRecord> findOrphanedFailedInstances(
Connection conn,
List<SchedulerStateRecord> schedulerStateRecords)
throws SQLException, NoSuchDelegateException {
List<SchedulerStateRecord> orphanedInstances = new ArrayList<SchedulerStateRecord>();
Set<String> allFiredTriggerInstanceNames = getDelegate().selectFiredTriggerInstanceNames(conn);
if (!allFiredTriggerInstanceNames.isEmpty()) {
for (SchedulerStateRecord rec : schedulerStateRecords) {
allFiredTriggerInstanceNames.remove(rec.getSchedulerInstanceId());
}
for (String inst : allFiredTriggerInstanceNames) {
SchedulerStateRecord orphanedInstance = new SchedulerStateRecord();
orphanedInstance.setSchedulerInstanceId(inst);
orphanedInstances.add(orphanedInstance);
}
}
return orphanedInstances;
}
}
完成作业恢复:然后用这些调度记录创建SimpleTriggerImpl,恢复对应的Trigger,通知主调度线程调度。
public abstract class JobStoreSupportimplements JobStore,Constants {
protected void clusterRecover(Connection conn, List<SchedulerStateRecord> failedInstances) throws JobPersistenceException {
if (failedInstances.size() > 0) {
long recoverIds = System.currentTimeMillis();
try {
for (SchedulerStateRecord rec : failedInstances) {
List<FiredTriggerRecord> firedTriggerRecs = getDelegate().selectInstancesFiredTriggerRecords(conn, rec.getSchedulerInstanceId());
int acquiredCount = 0;
int recoveredCount = 0;
int otherCount = 0;
Set<TriggerKey> triggerKeys = new HashSet<TriggerKey>();
for (FiredTriggerRecord ftRec : firedTriggerRecs) {
TriggerKey tKey = ftRec.getTriggerKey();
JobKey jKey = ftRec.getJobKey();
triggerKeys.add(tKey);
if (ftRec.getFireInstanceState().equals(STATE_BLOCKED)) {
getDelegate().updateTriggerStatesForJobFromOtherState(conn, jKey, STATE_WAITING, STATE_BLOCKED);
} else if (ftRec.getFireInstanceState().equals(STATE_PAUSED_BLOCKED)) {
getDelegate().updateTriggerStatesForJobFromOtherState(conn, jKey, STATE_PAUSED, STATE_PAUSED_BLOCKED);
}
if (ftRec.getFireInstanceState().equals(STATE_ACQUIRED)) {
getDelegate().updateTriggerStateFromOtherState(conn, tKey, STATE_WAITING, STATE_ACQUIRED);
acquiredCount++;
} else if (ftRec.isJobRequestsRecovery()) {
if (jobExists(conn, jKey)) {
SimpleTriggerImpl rcvryTrig = new SimpleTriggerImpl("recover_" + rec.getSchedulerInstanceId() + "_" + String.valueOf(recoverIds++), Scheduler.DEFAULT_RECOVERY_GROUP, new Date(ftRec.getScheduleTimestamp()));
rcvryTrig.setJobName(jKey.getName());
rcvryTrig.setJobGroup(jKey.getGroup());
rcvryTrig.setMisfireInstruction(SimpleTrigger.MISFIRE_INSTRUCTION_IGNORE_MISFIRE_POLICY);
rcvryTrig.setPriority(ftRec.getPriority());
JobDataMap jd = getDelegate().selectTriggerJobDataMap(conn, tKey.getName(), tKey.getGroup());
jd.put(Scheduler.FAILED_JOB_ORIGINAL_TRIGGER_NAME, tKey.getName());
jd.put(Scheduler.FAILED_JOB_ORIGINAL_TRIGGER_GROUP, tKey.getGroup());
jd.put(Scheduler.FAILED_JOB_ORIGINAL_TRIGGER_FIRETIME_IN_MILLISECONDS, String.valueOf(ftRec.getFireTimestamp()));
jd.put(Scheduler.FAILED_JOB_ORIGINAL_TRIGGER_SCHEDULED_FIRETIME_IN_MILLISECONDS, String.valueOf(ftRec.getScheduleTimestamp()));
rcvryTrig.setJobDataMap(jd);
rcvryTrig.computeFirstFireTime(null);
storeTrigger(conn, rcvryTrig, null, false, STATE_WAITING, false, true);
recoveredCount++;
} else {
otherCount++;
}
} else {
otherCount++;
}
if (ftRec.isJobDisallowsConcurrentExecution()) {
getDelegate().updateTriggerStatesForJobFromOtherState(conn, jKey, STATE_WAITING, STATE_BLOCKED);
getDelegate().updateTriggerStatesForJobFromOtherState(conn, jKey, STATE_PAUSED, STATE_PAUSED_BLOCKED);
}
}
getDelegate().deleteFiredTriggers(conn, rec.getSchedulerInstanceId());
int completeCount = 0;
for (TriggerKey triggerKey : triggerKeys) {
if (getDelegate().selectTriggerState(conn, triggerKey).equals(STATE_COMPLETE)) {
List<FiredTriggerRecord> firedTriggers = getDelegate().selectFiredTriggerRecords(conn, triggerKey.getName(), triggerKey.getGroup());
if (firedTriggers.isEmpty()) {
if (removeTrigger(conn, triggerKey)) {
completeCount++;
}
}
}
}
if (!rec.getSchedulerInstanceId().equals(getInstanceId())) {
getDelegate().deleteSchedulerState(conn, rec.getSchedulerInstanceId());
}
}
} catch (Throwable e) {
}
}
}
}
此外,ClusterManager运行时也会周期性地恢复其他异常节点调度的Trigger,并且立即通知当前节点的调度线程插入这些立即触发的Trigger。
class ClusterManager extends Thread {
public void run() {
while (!shutdown) {
if (!shutdown) {
long timeToSleep = getClusterCheckinInterval();
long transpiredTime = (System.currentTimeMillis() - lastCheckin);
timeToSleep = timeToSleep - transpiredTime;
if (timeToSleep <= 0) {
timeToSleep = 100L;
}
if (numFails > 0) {
timeToSleep = Math.max(getDbRetryInterval(), timeToSleep);
}
try {
Thread.sleep(timeToSleep);
} catch (Exception ignore) {
}
}
if (!shutdown && this.manage()) {
//通知当前节点的主调度线程插入一批新的Trigger触发
signalSchedulingChangeImmediately(0L);
}
}
}
}
错过了Trigger的触发时间会怎样?
有一个专门处理错过触发时间超过一定阈值(60s)的线程,活动图如下:
Misfire 处理线程启动:
1. 查询错过触发时间阈值的作业。
2. 通知主调度线程插入这些重新调度的作业。
class MisfireHandler extends Thread {
@Override
public void run() {
while (!shutdown) {
long sTime = System.currentTimeMillis();
// 查询错过触发阈值的作业
RecoverMisfiredJobsResult recoverMisfiredJobsResult = manage();
// 通知主调度线程插入这些重新调度的作业
if (recoverMisfiredJobsResult.getProcessedMisfiredTriggerCount() > 0) {
signalSchedulingChangeImmediately(recoverMisfiredJobsResult.getEarliestNewTime());
}
if (!shutdown) {
long timeToSleep = 50l;
//sleep直到下个循环
if (!recoverMisfiredJobsResult.hasMoreMisfiredTriggers()) {
timeToSleep = getMisfireThreshold() - (System.currentTimeMillis() - sTime);
if (timeToSleep <= 0) {
timeToSleep = 50l;
}
if (numFails > 0) {
timeToSleep = Math.max(getDbRetryInterval(), timeToSleep);
}
}
try {
Thread.sleep(timeToSleep);
} catch (Exception ignore) {
}
}//while !shutdown
}
}
}
manage 继续调用doRecoverMisfires。
class MisfireHandler extends Thread {
private RecoverMisfiredJobsResult manage() {
try {
//查询错过触发的job
RecoverMisfiredJobsResult res = doRecoverMisfires();
return res;
} catch (Exception e) {
}
return RecoverMisfiredJobsResult.NO_OP;
}
}
Count 是否有错过触发需要重新调度的作业,再获取集群锁,然后再获取作业。
class MisfireHandler extends Thread {
protected RecoverMisfiredJobsResult doRecoverMisfires() throws JobPersistenceException {
boolean transOwner = false;
Connection conn = getNonManagedTXConnection();
try {
RecoverMisfiredJobsResult result = RecoverMisfiredJobsResult.NO_OP;
// Count是否有错过触发需要重新调度的作业,再获取集群锁,然后再获取作业
int misfireCount = (getDoubleCheckLockMisfireHandler()) ?
getDelegate().countMisfiredTriggersInState(conn, STATE_WAITING, getMisfireTime()) :
Integer.MAX_VALUE;
if (misfireCount == 0) {
} else {
transOwner = getLockHandler().obtainLock(conn, LOCK_TRIGGER_ACCESS);
result = recoverMisfiredJobs(conn, false);
}
commitConnection(conn);
return result;
} finally {
}
}
}
最后通知主调度线程正式启动。
public class QuartzSchedulerThread extends Thread {
void togglePause(boolean pause) {
synchronized (sigLock) {
paused = pause;
if (paused) {
signalSchedulingChange(0);
} else {
sigLock.notifyAll();
}
}
}
}
然后是调度作业的活动图:
再来看主调度线程的代码片段:
1. 检查是否关闭、暂停主调度线程,然后wait。
2. DB 有异常时稍微等待再继续。
3. 获取空闲线程
4. 获取一批即将调度的Trigger。
5. 距触发时间有一段时间时,检查是否有其他插入的Trigger,wait
6. 如果找到了其他插入的Trigger,释放当前的一批Trigger,重新循环。
7. 通知JobStore trigger已经被触发,获取触发结果。
8. Trigger 触发结果是失败时释放这个Trigger。
9. Trigger 触发成功时,创建JobRunShell对象,JobRunlShell初始化获取调度作业。
10. 线程池运行调度作业。
public class QuartzSchedulerThread extends Thread {
public void run() {
int acquiresFailed = 0;
// 检查是否关闭、暂停主调度线程,然后wait
while (!halted.get()) {
try {
synchronized (sigLock) {
while (paused && !halted.get()) {
sigLock.wait(1000L);
acquiresFailed = 0;
}
if (halted.get()) {
break;
}
}
// DB有异常时稍微等待再继续
if (acquiresFailed > 1) {
long delay = computeDelayForRepeatedErrors(qsRsrcs.getJobStore(), acquiresFailed);
Thread.sleep(delay);
}
// 获取空闲线程,但这里其实恒为true
int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();
if (availThreadCount > 0) {
List<OperableTrigger> triggers;
long now = System.currentTimeMillis();
clearSignaledSchedulingChange();
try {
// 获取一批即将调度的Trigger
triggers = qsRsrcs.getJobStore().acquireNextTriggers(now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());
acquiresFailed = 0;
} catch (JobPersistenceException jpe) {
if (acquiresFailed == 0) {
qs.notifySchedulerListenersError("An error occurred while scanning for the next triggers to fire.", jpe);
}
acquiresFailed++;
continue;
}
acquiresFailed++;
continue;
}
if (triggers != null && !triggers.isEmpty()) {
now = System.currentTimeMillis();
long triggerTime = triggers.get(0).getNextFireTime().getTime();
long timeUntilTrigger = triggerTime - now;
// 距触发时间有一段时间时,检查是否有其他插入的Trigger,wait
while (timeUntilTrigger > 2) {
synchronized (sigLock) {
if (halted.get()) {
break;
}
if (!isCandidateNewTimeEarlierWithinReason(triggerTime, false)) {
sigLock.wait(timeUntilTrigger);
}
}
//如果找到了其他插入的Trigger,释放当前的一批Trigger,重新循环
if (releaseIfScheduleChangedSignificantly(triggers, triggerTime)) {
break;
}
now = System.currentTimeMillis();
timeUntilTrigger = triggerTime - now;
}
if (triggers.isEmpty())
continue;
List<TriggerFiredResult> bndles = new ArrayList<TriggerFiredResult>();
boolean goAhead = true;
synchronized (sigLock) {
goAhead = !halted.get();
}
if (goAhead) {
try {
//通知JobStore trigger已经被触发,获取触发结果
List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers);
if (res != null)
bndles = res;
} catch (SchedulerException se) {
qs.notifySchedulerListenersError("An error occurred while firing triggers '" + triggers + "'", se);
for (int i = 0; i < triggers.size(); i++) {
qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
}
continue;
}
}
for (int i = 0; i < bndles.size(); i++) {
TriggerFiredResult result = bndles.get(i);
TriggerFiredBundle bndle = result.getTriggerFiredBundle();
Exception exception = result.getException();
//Trigger触发结果是失败时释放这个Trigger
if (exception instanceof RuntimeException) {
qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
continue;
}
if (bndle == null) {
qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
continue;
}
//Trigger触发成功时,创建JobRunShell对象,JobRunlShell初始化获取调度作业
JobRunShell shell = null;
try {
shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);
shell.initialize(qs);
} catch (SchedulerException se) {
qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
continue;
}
//线程池运行调度作业,
if (qsRsrcs.getThreadPool().runInThread(shell) == false) {
qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
}
}
continue;
} else {
continue; // while (!halted)
}
long now = System.currentTimeMillis();
long waitTime = now + getRandomizedIdleWaitTime();
long timeUntilContinue = waitTime - now;
synchronized (sigLock) {
if (!halted.get()) {
if (!isScheduleChanged()) {
sigLock.wait(timeUntilContinue);
}
}
}
} catch (RuntimeException re) {
}
}
}
}
默认线程池运行作业:
1. wait 空闲线程或继续执行。
2. 即使线程池被关闭,依然可以继续执行作业。
public class SimpleThreadPool implements ThreadPool {
public boolean runInThread(Runnable runnable) {
synchronized (nextRunnableLock) {
handoffPending = true;
// wait空闲线程或继续执行
// Wait until a worker thread is available
while ((availWorkers.size() < 1) && !isShutdown) {
nextRunnableLock.wait(500);
}
if (!isShutdown) {
WorkerThread wt = (WorkerThread) availWorkers.removeFirst();
busyWorkers.add(wt);
wt.run(runnable);
} else {
// 即使线程池被关闭,依然可以继续执行作业
WorkerThread wt = new WorkerThread(this, threadGroup, "WorkerThread-LastJob", prio, isMakeThreadsDaemons(), runnable);
busyWorkers.add(wt);
workers.add(wt);
wt.start();
}
nextRunnableLock.notifyAll();
handoffPending = false;
}
return true;
}
}
这里的run只是唤醒当前对象在另一个线程里的wait。
class WorkerThread extends Thread {
public void run(Runnable newRunnable) {
synchronized (lock) {
runnable = newRunnable;
lock.notifyAll();
}
}
}
JobRunShell 在这里执行。
class WorkerThread extends Thread {
public void run() {
boolean ran = false;
while (run.get()) {
try {
synchronized (lock) {
// 没有作业时wait空循环
while (runnable == null && run.get()) {
lock.wait(500);
}
// 有作业时执行
if (runnable != null) {
ran = true;
runnable.run();
}
}
} finally {
if (getPriority() != tp.getThreadPriority()) {
setPriority(tp.getThreadPriority());
}
if (runOnce) {
run.set(false);
clearFromBusyWorkersList(this);
} else if (ran) {
ran = false;
makeAvailable(this);
}
}
}
}
}
JobShell 内部实现:
1. 如果有事务,开启事务。
2. 通知TriggerListener和JobListener。
3. 传入Job的上下文运行Job。
4. 事务控制的Job在事务异常时才允许重复运行job。
5. 结束事务。
public class JobRunShell extends SchedulerListenerSupport implements Runnable {
public void run() {
qs.addInternalSchedulerListener(this);
try {
OperableTrigger trigger = (OperableTrigger) jec.getTrigger();
JobDetail jobDetail = jec.getJobDetail();
do {
JobExecutionException jobExEx = null;
Job job = jec.getJobInstance();
try {
// 如果有事务,开启事务
begin();
} catch (SchedulerException se) {
break;
}
// 通知TriggerListener和JobListener
try {
if (!notifyListenersBeginning(jec)) {
break;
}
} catch (VetoedException ve) {
try {
CompletedExecutionInstruction instCode = trigger.executionComplete(jec, null);
qs.notifyJobStoreJobVetoed(trigger, jobDetail, instCode);
if (jec.getTrigger().getNextFireTime() == null) {
qs.notifySchedulerListenersFinalized(jec.getTrigger());
}
complete(true);
} catch (SchedulerException se) {
qs.notifySchedulerListenersError("Error during veto of Job (" + jec.getJobDetail().getKey()
+ ": couldn't finalize execution.", se);
}
break;
}
long startTime = System.currentTimeMillis();
long endTime = startTime;
// 传入Job的上下文运行Job
// execute the job
try {
job.execute(jec);
endTime = System.currentTimeMillis();
} catch (JobExecutionException jee) {
endTime = System.currentTimeMillis();
jobExEx = jee;
} catch (Throwable e) {
endTime = System.currentTimeMillis();
qs.notifySchedulerListenersError("Job (" + jec.getJobDetail().getKey() + " threw an exception.", se);
jobExEx = new JobExecutionException(se, false);
}
jec.setJobRunTime(endTime - startTime);
if (!notifyJobListenersComplete(jec, jobExEx)) {
break;
}
CompletedExecutionInstruction instCode = CompletedExecutionInstruction.NOOP;
try {
instCode = trigger.executionComplete(jec, jobExEx);
} catch (Exception e) {
SchedulerException se = new SchedulerException("Trigger threw an unhandled exception.", e);
qs.notifySchedulerListenersError("Please report this error to the Quartz developers.", se);
}
if (!notifyTriggerListenersComplete(jec, instCode)) {
break;
}
// 事务控制的Job在事务异常时才允许重复运行job.
if (instCode == CompletedExecutionInstruction.RE_EXECUTE_JOB) {
jec.incrementRefireCount();
try {
complete(false);
} catch (SchedulerException se) {
qs.notifySchedulerListenersError("Error executing Job (" + jec.getJobDetail().getKey() + ": couldn't finalize execution.", se);
}
continue;
}
try {
// 结束事务.
complete(true);
} catch (SchedulerException se) {
qs.notifySchedulerListenersError("Error executing Job (" + jec.getJobDetail().getKey() + ": couldn't finalize execution.", se);
continue;
}
qs.notifyJobStoreJobComplete(trigger, jobDetail, instCode);
break;
} while (true);
} finally {
qs.removeInternalSchedulerListener(this);
}
}
}
Ⅵ 对比一下Quartz和其他流行的调度作业框架?
58 的作业调度框架基于XXL-JOB,XXL-JOB早期基于Quartz实现调度。由于对XXL-JOB并不熟悉,因此直接参考了官方文档,XXL-JOB官方文档将自己和Quartz做了对比:
Quartz 作为开源作业调度中的佼佼者,是作业调度的首选。但是集群环境中Quartz采用API的方式对任务进行管理,从而可以避免上述问题,但是同样存在以下问题:
问题一:调用API的的方式操作任务,不人性化;
问题二:需要持久化业务QuartzJobBean到底层数据表中,系统侵入性相当严重。
问题三:调度逻辑和QuartzJobBean耦合在同一个项目中,这将导致一个问题,在调度任务数量逐 渐增多,同时调度任务逻辑逐渐加重的情况加,此时调度系统的性能将大大受限于业务;
问题四:quartz底层以“抢占式”获取DB锁并由抢占成功节点负责运行任务,会导致节点负载悬殊非 常大;而XXL-JOB通过执行器实现“协同分配式”运行任务,充分发挥集群优势,负载各节点均衡。
XXL-JOB 弥补了quartz的上述不足之处。
通过XXL-JOB官方文档我们主要了解到,XXL-JOB提供了一套界面且操作API管理调度作业,优化了各节点负载,但并非XXL-JOB能优化负载而Quartz不能,比如作业以Job为单元执行,将作业分散部署在多个集群,将作业量接近的Job部署在同一个集群内,节点内控制合适的线程池数量,负载问题可以缓解一大块。对于问题三,调度任务增多影响性能的问题,根源实际上在于将作业代码写在调度集群内,通过进程隔离解决这个问题并不难。对于问题二,持久化业务需要保存数据到数据表,事实上任何作业调度都无法避免,通过服务拆分和进程隔离,仍然可以一定程度缓解这个问题。总的来看,XXL-JOB对Quartz做出了一些优化,也不失为一个作业调度的选择。
最后总结一下本文:
本文从两方面初步介绍了Quartz的基本实现:
1. Quartz 启动的主流程:
1) 通过配置初始化Scheduler和SchedulerThread主调度线程。
2) 以集群节点或单机的身份恢复作业调度。
3) 启动Misfire处理,检查恢复错过调度一定时间阈值的作业。
4) 在各个节点通知Listener。
2. Quartz 调度作业的流程:
1) 获取Trigger和对应的Job。
2) 检查并立即调度插入的Trigger和Job。
3) 将Trigger和Job交给作业线程池执行。
4) 在各个节点通知Listener。
本文没有涉及的:
1. Quartz 初始化配置的过程。
2. Quartz 的JobStore的多种实现以及细节。
3. Job 数据的持久化。
4. Job 的事务控制。
5. Listener 和Plugin。
6. Cluster 的细节。
感兴趣的可以自己下载源码阅读。