After the user has created a SparkContext and read in a file, writing the result out starts from saveAsTextFile, and we can see that the Java API calls straight through to the underlying RDD's saveAsTextFile:

spark-master\spark-master\core\src\main\scala\org\apache\spark\api\java\JavaRDDLike.scala

```scala
def saveAsTextFile(path: String): Unit = {
  // triggers the RDD action
  rdd.saveAsTextFile(path)
}
```

spark-master\spark-master\core\src\main\scala\org\apache\spark\rdd\RDD.scala

```scala
def saveAsTextFile(path: String): Unit = withScope {
  val nullWritableClassTag = implicitly[ClassTag[NullWritable]]
  val textClassTag = implicitly[ClassTag[Text]]
  val r = this.mapPartitions { iter =>
    val text = new Text()
    iter.map { x =>
      text.set(x.toString)
      (NullWritable.get(), text)
    }
  }
  // save the RDD to a Hadoop-supported file system
  RDD.rddToPairRDDFunctions(r)(nullWritableClassTag, textClassTag, null)
    .saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path)
}
```

/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala

```scala
def saveAsHadoopFile(
    path: String,
    keyClass: Class[_],
    valueClass: Class[_],
    outputFormatClass: Class[_ <: OutputFormat[_, _]],
    conf: JobConf = new JobConf(self.context.hadoopConfiguration),
    codec: Option[Class[_ <: CompressionCodec]] = None): Unit = self.withScope {
  // Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038).
  val hadoopConf = conf
  hadoopConf.setOutputKeyClass(keyClass)
  hadoopConf.setOutputValueClass(valueClass)
  conf.setOutputFormat(outputFormatClass)
  for (c <- codec) {
    hadoopConf.setCompressMapOutput(true)
    hadoopConf.set("mapreduce.output.fileoutputformat.compress", "true")
    hadoopConf.setMapOutputCompressorClass(c)
    hadoopConf.set("mapreduce.output.fileoutputformat.compress.codec", c.getCanonicalName)
    hadoopConf.set("mapreduce.output.fileoutputformat.compress.type",
      CompressionType.BLOCK.toString)
  }
  // set the output path, then hand off to saveAsHadoopDataset
  FileOutputFormat.setOutputPath(hadoopConf,
    SparkHadoopWriterUtils.createPathFromString(path, hadoopConf))
  saveAsHadoopDataset(hadoopConf)
}
```

Inside saveAsHadoopDataset the write is delegated to SparkHadoopWriter.write:

spark-master\spark-master\core\src\main\scala\org\apache\spark\rdd\PairRDDFunctions.scala

```scala
def saveAsHadoopDataset(conf: JobConf): Unit = self.withScope {
  val config = new HadoopMapRedWriteConfigUtil[K, V](new SerializableJobConf(conf))
  SparkHadoopWriter.write(
    rdd = self,
    config = config)
}
```

spark-master\spark-master\core\src\main\scala\org\apache\spark\internal\io\SparkHadoopWriter.scala

```scala
def write[K, V: ClassTag](
    rdd: RDD[(K, V)],
    config: HadoopWriteConfigUtil[K, V]): Unit = {
  // Extract context and configuration from RDD.
  val sparkContext = rdd.context
  val commitJobId = rdd.id

  // Set up a job.
  val jobTrackerId = createJobTrackerID(new Date())
  val jobContext = config.createJobContext(jobTrackerId, commitJobId)
  config.initOutputFormat(jobContext)

  // Assert the output format/key/value class is set in JobConf.
  config.assertConf(jobContext, rdd.conf)

  val committer = config.createCommitter(commitJobId)
  committer.setupJob(jobContext)

  // Try to write all RDD partitions as a Hadoop OutputFormat.
  try {
    // this eventually calls back into SparkContext's runJob method
    val ret = sparkContext.runJob(rdd, (context: TaskContext, iter: Iterator[(K, V)]) => {
      // SPARK-24552: Generate a unique "attempt ID" based on the stage and task attempt numbers.
      // Assumes that there won't be more than Short.MaxValue attempts, at least not concurrently.
      val attemptId = (context.stageAttemptNumber << 16) | context.attemptNumber

      executeTask(
        context = context,
        config = config,
        jobTrackerId = jobTrackerId,
        commitJobId = commitJobId,
        sparkPartitionId = context.partitionId,
        sparkAttemptNumber = attemptId,
        committer = committer,
        iterator = iter)
    })
```
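For orientation, this is roughly the kind of user program that sets the chain above in motion; a minimal sketch, where the app name, master URL, and input/output paths are hypothetical placeholders rather than anything from the article:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object SaveAsTextFileDemo {
  def main(args: Array[String]): Unit = {
    // placeholder app name, master URL and paths; any Hadoop-supported path works here
    val sc = new SparkContext(
      new SparkConf().setAppName("saveAsTextFile-demo").setMaster("local[*]"))
    val lines = sc.textFile("/tmp/input.txt")   // read a file into an RDD
    lines.saveAsTextFile("/tmp/output")         // the action whose call chain is traced below
    sc.stop()
  }
}
```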
The runJob call made inside write lands in SparkContext.runJob:

spark-master\spark-master\core\src\main\scala\org\apache\spark\SparkContext.scala

```scala
def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    resultHandler: (Int, U) => Unit): Unit = {
  if (stopped.get()) {
    throw new IllegalStateException("SparkContext has been shutdown")
  }
  val callSite = getCallSite
  val cleanedFunc = clean(func)
  logInfo("Starting job: " + callSite.shortForm)
  if (conf.getBoolean("spark.logLineage", false)) {
    logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
  }
  // hand the job off to dagScheduler.runJob
  dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
  progressBar.foreach(_.finishAll())
  rdd.doCheckpoint()
}
```

spark-master\core\src\main\scala\org\apache\spark\scheduler\DAGScheduler.scala

```scala
def runJob[T, U](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: CallSite,
    resultHandler: (Int, U) => Unit,
    properties: Properties): Unit = {
  val start = System.nanoTime
  // submit the job through the DAG scheduler's submitJob
  val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
  // block until the job completes
  ThreadUtils.awaitReady(waiter.completionFuture, Duration.Inf)
  // report whether the job succeeded or failed
  waiter.completionFuture.value.get match {
    case scala.util.Success(_) =>
      logInfo("Job %d finished: %s, took %f s".format
        (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
    case scala.util.Failure(exception) =>
      logInfo("Job %d failed: %s, took %f s".format
        (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
      // SPARK-8644: Include user stack trace in exceptions coming from DAGScheduler.
      val callerStackTrace = Thread.currentThread().getStackTrace.tail
      exception.setStackTrace(exception.getStackTrace ++ callerStackTrace)
      throw exception
  }
}

def submitJob[T, U](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: CallSite,
    resultHandler: (Int, U) => Unit,
    properties: Properties): JobWaiter[U] = {
  // Check to make sure we are not launching a task on a partition that does not exist.
  val maxPartitions = rdd.partitions.length
  partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
    throw new IllegalArgumentException(
      "Attempting to access a non-existent partition: " + p + ". " +
        "Total number of partitions: " + maxPartitions)
  }

  val jobId = nextJobId.getAndIncrement()
  if (partitions.size == 0) {
    // Return immediately if the job is running 0 tasks
    return new JobWaiter[U](this, jobId, 0, resultHandler)
  }

  assert(partitions.size > 0)
  val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
  // create the JobWaiter that runJob will block on
  val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
  // put a JobSubmitted event onto the event loop's queue
  eventProcessLoop.post(JobSubmitted(
    jobId, rdd, func2, partitions.toArray, callSite, waiter,
    SerializationUtils.clone(properties)))
  waiter
}
```

EventLoop.scala runs a thread that keeps taking events off the eventProcessLoop queue and processing them; post itself only enqueues the event:

```scala
def post(event: E): Unit = {
  eventQueue.put(event)
}
```
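The split between submitJob and runJob is the usual asynchronous submit-then-await pattern: submitJob returns a JobWaiter right away, and runJob blocks on its completionFuture until the event-loop side finishes the job. A simplified, self-contained sketch of that pattern (DemoJobWaiter and the other names below are illustrative stand-ins, not Spark's real classes):

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

object JobWaiterDemo {
  // plays the role of JobWaiter: completionFuture is completed by the "scheduler" side
  final class DemoJobWaiter {
    private val promise = Promise[Unit]()
    def completionFuture: Future[Unit] = promise.future
    def jobFinished(): Unit = promise.success(())
  }

  // stand-in for DAGScheduler.submitJob: returns a waiter immediately,
  // while the actual work happens on another thread
  def submitJob(): DemoJobWaiter = {
    val waiter = new DemoJobWaiter
    Future {
      Thread.sleep(100)      // pretend the job takes some time
      waiter.jobFinished()   // signal completion back to the waiter
    }
    waiter
  }

  def main(args: Array[String]): Unit = {
    val waiter = submitJob()                              // returns immediately
    Await.ready(waiter.completionFuture, Duration.Inf)    // like ThreadUtils.awaitReady in runJob
    println("job finished")
  }
}
```

The consumer side of the queue lives in EventLoop.scala: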
spark-master\core\src\main\scala\org\apache\spark\util\EventLoop.scala

```scala
private[spark] abstract class EventLoop[E](name: String) extends Logging {

  private val eventQueue: BlockingQueue[E] = new LinkedBlockingDeque[E]()

  private val stopped = new AtomicBoolean(false)

  // Exposed for testing.
  private[spark] val eventThread = new Thread(name) {
    setDaemon(true)

    override def run(): Unit = {
      try {
        while (!stopped.get) {
          val event = eventQueue.take()
          try {
            // the core step: each event is handed to onReceive for processing
            onReceive(event)
          } catch {
            case NonFatal(e) =>
              try {
                onError(e)
              } catch {
                case NonFatal(e) => logError("Unexpected error in " + name, e)
              }
          }
        }
      } catch {
```

onReceive is ultimately implemented in DAGSchedulerEventProcessLoop:

spark-master\spark-master\core\src\main\scala\org\apache\spark\scheduler\DAGScheduler.scala

```scala
private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler)
  extends EventLoop[DAGSchedulerEvent]("dag-scheduler-event-loop") with Logging {

  private[this] val timer = dagScheduler.metricsSource.messageProcessingTimer

  /**
   * The main event loop of the DAG scheduler.
   */
  override def onReceive(event: DAGSchedulerEvent): Unit = {
    val timerContext = timer.time()
    try {
      doOnReceive(event)
    } finally {
      timerContext.stop()
    }
  }

  private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
    // JobSubmitted is dispatched to dagScheduler.handleJobSubmitted, which carries out the actual job submission
    case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
      dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)

    case MapStageSubmitted(jobId, dependency, callSite, listener, properties) =>
      dagScheduler.handleMapStageSubmitted(jobId, dependency, callSite, listener, properties)

    case StageCancelled(stageId, reason) =>
      dagScheduler.handleStageCancellation(stageId, reason)

    case JobCancelled(jobId, reason) =>
      dagScheduler.handleJobCancellation(jobId, reason)
  }
```
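Putting the queue, the daemon thread, and the pattern-match dispatch together, here is a self-contained, simplified imitation of the EventLoop / DAGSchedulerEventProcessLoop mechanism shown above (all names are illustrative stand-ins, not Spark's real definitions):

```scala
import java.util.concurrent.LinkedBlockingDeque

object MiniEventLoopDemo {
  sealed trait DemoEvent
  final case class DemoJobSubmitted(jobId: Int) extends DemoEvent
  final case class DemoJobCancelled(jobId: Int, reason: String) extends DemoEvent

  private val eventQueue = new LinkedBlockingDeque[DemoEvent]()

  // mirrors doOnReceive: dispatch on the concrete event type with a pattern match
  private def doOnReceive(event: DemoEvent): Unit = event match {
    case DemoJobSubmitted(jobId)         => println(s"handleJobSubmitted($jobId)")
    case DemoJobCancelled(jobId, reason) => println(s"handleJobCancellation($jobId, $reason)")
  }

  // mirrors EventLoop's eventThread: a daemon thread that drains the blocking queue
  private val eventThread = new Thread("demo-event-loop") {
    setDaemon(true)
    override def run(): Unit = while (true) doOnReceive(eventQueue.take())
  }

  // mirrors EventLoop.post: producers only enqueue events
  def post(event: DemoEvent): Unit = eventQueue.put(event)

  def main(args: Array[String]): Unit = {
    eventThread.start()
    post(DemoJobSubmitted(0))
    post(DemoJobCancelled(0, "user requested"))
    Thread.sleep(200)   // give the daemon thread time to drain the queue before the JVM exits
  }
}
```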
Reposted from: http://zsnmi.baihongyu.com/