
[Spark Core] DAGScheduler Source Code Analysis, Part 2

Introduction

The previous article, DAGScheduler Source Code Analysis, introduced the important functions and key points of the DAGScheduler source from the perspective of the job-submission flow. This second installment, drawing mainly on fxjwind's article Spark源码分析 – DAGScheduler, covers several important functions in the DAGScheduler file that were not discussed before.

Event Handling

Before Spark 1.0, the DAGScheduler class contained a private eventQueue member, with an eventLoop thread that continuously read events from the queue and processed them. In the Spark 1.0 source, event handling was instead done in Actor style, with the DAGEventProcessActor class performing the main event-processing work. Probably because Scala dropped its native actors in favor of Akka actors as the official standard, in the Spark 1.4 source I examined, DAGScheduler returns to the eventQueue approach; to make the logic clearer and less coupled, the 1.4 code introduces a dedicated DAGSchedulerEventProcessLoop class for event handling.

private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler)
  extends EventLoop[DAGSchedulerEvent]("dag-scheduler-event-loop") with Logging {

Here DAGSchedulerEventProcessLoop extends the EventLoop class, which looks like this:

private[spark] abstract class EventLoop[E](name: String) extends Logging {

  private val eventQueue: BlockingQueue[E] = new LinkedBlockingDeque[E]()

  private val stopped = new AtomicBoolean(false)

  private val eventThread = new Thread(name) {
    setDaemon(true)

    override def run(): Unit = {
      try {
        while (!stopped.get) {
          val event = eventQueue.take()
          try {
            onReceive(event)
          } catch {
            case NonFatal(e) => {
              try {
                onError(e)
              } catch {
                case NonFatal(e) => logError("Unexpected error in " + name, e)
              }
            }
          }
        }
      } catch {
        case ie: InterruptedException => // exit even if eventQueue is not empty
        case NonFatal(e) => logError("Unexpected error in " + name, e)
      }
    }
  }

  ......

As we can see, DAGScheduler delivers an event to the DAGSchedulerEventProcessLoop object by sending it to eventQueue; eventThread continuously takes events from eventQueue and calls the onReceive function to handle each one.

override def onReceive(event: DAGSchedulerEvent): Unit = event match {
  case JobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties) =>
    dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite,
      listener, properties)
  ......
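To make the post/take/onReceive flow concrete, here is a minimal standalone sketch of the same pattern. MiniEventLoop and the demo object below are made-up names for illustration, not Spark classes:

import java.util.concurrent.{BlockingQueue, LinkedBlockingDeque}
import java.util.concurrent.atomic.AtomicBoolean
import scala.util.control.NonFatal

// A stripped-down event loop in the same spirit as Spark's EventLoop:
// post() enqueues, and a single daemon thread drains the queue and
// dispatches each event to onReceive.
abstract class MiniEventLoop[E](name: String) {
  private val eventQueue: BlockingQueue[E] = new LinkedBlockingDeque[E]()
  private val stopped = new AtomicBoolean(false)

  private val eventThread = new Thread(name) {
    setDaemon(true)
    override def run(): Unit = {
      try {
        while (!stopped.get) {
          val event = eventQueue.take()
          try {
            onReceive(event)
          } catch {
            case NonFatal(e) => println(s"Unexpected error in $name: $e")
          }
        }
      } catch {
        case _: InterruptedException => // exit even if the queue is not empty
      }
    }
  }

  def start(): Unit = eventThread.start()

  def stop(): Unit = {
    stopped.set(true)
    eventThread.interrupt() // unblock take() if the queue is empty
  }

  // The only way callers interact with the loop: enqueue and return immediately.
  def post(event: E): Unit = eventQueue.put(event)

  protected def onReceive(event: E): Unit
}

object MiniEventLoopDemo extends App {
  val loop = new MiniEventLoop[String]("demo-event-loop") {
    override protected def onReceive(event: String): Unit =
      println(s"handled: $event")
  }
  loop.start()
  loop.post("JobSubmitted")
  loop.post("CompletionEvent")
  Thread.sleep(100) // give the daemon thread time to drain the queue
  loop.stop()
}

The key property, both in this sketch and in Spark's EventLoop, is that post never blocks on event handling: producers only pay the cost of an enqueue, and all processing happens serially on the single event thread.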

JobWaiter

JobWaiter implements JobListener's taskSucceeded and jobFailed functions; when DAGScheduler receives a task-success or task-failure event, it calls the corresponding function. In taskSucceeded, once all tasks have succeeded, the job is marked as finished (jobFinished); awaitResult simply waits until jobFinished is set.

As we saw in the submitJob function, a JobWaiter instance is created there and passed along inside the event instance; later, in handleJobSubmitted, if an error occurs, the JobWaiter's jobFailed function is called.

Below is the code of the JobWaiter class:

private[spark] class JobWaiter[T](
    dagScheduler: DAGScheduler,
    val jobId: Int,
    totalTasks: Int,
    resultHandler: (Int, T) => Unit)
  extends JobListener {

  private var finishedTasks = 0

  // Is the job as a whole finished (succeeded or failed)?
  @volatile
  private var _jobFinished = totalTasks == 0

  def jobFinished = _jobFinished

  // If the job is finished, this will be its result. In the case of 0 task jobs (e.g. zero
  // partition RDDs), we set the jobResult directly to JobSucceeded.
  private var jobResult: JobResult = if (jobFinished) JobSucceeded else null

  /**
   * Sends a signal to the DAGScheduler to cancel the job. The cancellation itself is handled
   * asynchronously. After the low level scheduler cancels all the tasks belonging to this job, it
   * will fail this job with a SparkException.
   */
  def cancel() {
    dagScheduler.cancelJob(jobId)
  }

  override def taskSucceeded(index: Int, result: Any): Unit = synchronized {
    if (_jobFinished) {
      throw new UnsupportedOperationException("taskSucceeded() called on a finished JobWaiter")
    }
    resultHandler(index, result.asInstanceOf[T])
    finishedTasks += 1
    if (finishedTasks == totalTasks) {
      _jobFinished = true
      jobResult = JobSucceeded
      this.notifyAll()
    }
  }

  override def jobFailed(exception: Exception): Unit = synchronized {
    _jobFinished = true
    jobResult = JobFailed(exception)
    this.notifyAll()
  }

  def awaitResult(): JobResult = synchronized {
    while (!_jobFinished) {
      this.wait()
    }
    return jobResult
  }
}
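To see the taskSucceeded/awaitResult handshake run end to end, here is a minimal standalone sketch. MiniJobWaiter and the demo object are made-up names, and the real JobWaiter additionally records a JobResult and can cancel the job through the DAGScheduler:

// Standalone sketch of the JobWaiter handshake: worker threads report task
// completions, while the driver thread blocks in awaitResult until the
// last completion flips _jobFinished and calls notifyAll().
class MiniJobWaiter[T](totalTasks: Int, resultHandler: (Int, T) => Unit) {
  private var finishedTasks = 0
  @volatile private var _jobFinished = totalTasks == 0

  def taskSucceeded(index: Int, result: Any): Unit = synchronized {
    resultHandler(index, result.asInstanceOf[T])
    finishedTasks += 1
    if (finishedTasks == totalTasks) {
      _jobFinished = true
      this.notifyAll() // wake up the thread blocked in awaitResult
    }
  }

  def awaitResult(): String = synchronized {
    while (!_jobFinished) {
      this.wait()
    }
    "JobSucceeded"
  }
}

object MiniJobWaiterDemo extends App {
  val waiter = new MiniJobWaiter[Int](3,
    (index, value) => println(s"task $index returned $value"))

  // Simulate three tasks finishing on separate threads, like completion
  // events arriving at the DAGScheduler.
  for (i <- 0 until 3) {
    new Thread(new Runnable {
      override def run(): Unit = waiter.taskSucceeded(i, i * 10)
    }).start()
  }

  // The driver-side thread blocks here until the last task finishes.
  println(s"job result: ${waiter.awaitResult()}")
}

Because finishedTasks is only touched inside synchronized blocks, and awaitResult re-checks _jobFinished in a while loop after each wait(), the handshake is safe against spurious wakeups, mirroring the original JobWaiter.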

Summary

This installment covered a few smaller details in the DAGScheduler.scala file; in the next article I will analyze how DAGScheduler.scala divides stages and handles dependencies.

Please credit the author Jason Ding and cite the source when reposting.

GitCafe blog homepage (http://jasonding1354.gitcafe.io/)

Github blog homepage (http://jasonding1354.github.io/)

CSDN blog (http://blog.csdn.net/jasonding1354)

Jianshu homepage (http://www.jianshu.com/users/2bd9b48f6ea8/latest_articles)

Search Google for jasonding1354 to reach my blog homepage
