Spark job submission
Published: 2019-05-26


Once the user has created a SparkContext and read in a file, calling saveAsTextFile from the Java API simply delegates to the underlying RDD:

spark-master\spark-master\core\src\main\scala\org\apache\spark\api\java\JavaRDDLike.scala

def saveAsTextFile(path: String): Unit = {
  // triggers the RDD action
  rdd.saveAsTextFile(path)
}

spark-master\spark-master\core\src\main\scala\org\apache\spark\rdd\RDD.scala

def saveAsTextFile(path: String): Unit = withScope {
  val nullWritableClassTag = implicitly[ClassTag[NullWritable]]
  val textClassTag = implicitly[ClassTag[Text]]
  val r = this.mapPartitions { iter =>
    val text = new Text()
    iter.map { x =>
      text.set(x.toString)
      (NullWritable.get(), text)
    }
  }
  // save the RDD to a Hadoop-supported file system
  RDD.rddToPairRDDFunctions(r)(nullWritableClassTag, textClassTag, null)
    .saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path)
}

/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala

def saveAsHadoopFile(
    path: String,
    keyClass: Class[_],
    valueClass: Class[_],
    outputFormatClass: Class[_ <: OutputFormat[_, _]],
    conf: JobConf = new JobConf(self.context.hadoopConfiguration),
    codec: Option[Class[_ <: CompressionCodec]] = None): Unit = self.withScope {
  // Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038).
  val hadoopConf = conf
  hadoopConf.setOutputKeyClass(keyClass)
  hadoopConf.setOutputValueClass(valueClass)
  conf.setOutputFormat(outputFormatClass)
  for (c <- codec) {
    hadoopConf.setCompressMapOutput(true)
    hadoopConf.set("mapreduce.output.fileoutputformat.compress", "true")
    hadoopConf.setMapOutputCompressorClass(c)
    hadoopConf.set("mapreduce.output.fileoutputformat.compress.codec", c.getCanonicalName)
    hadoopConf.set("mapreduce.output.fileoutputformat.compress.type",
      CompressionType.BLOCK.toString)
  }
  // set the output path, then call saveAsHadoopDataset
  FileOutputFormat.setOutputPath(hadoopConf,
    SparkHadoopWriterUtils.createPathFromString(path, hadoopConf))
  saveAsHadoopDataset(hadoopConf)
}

saveAsHadoopDataset in turn hands the job off to SparkHadoopWriter.write:

spark-master\spark-master\core\src\main\scala\org\apache\spark\rdd\PairRDDFunctions.scala

def saveAsHadoopDataset(conf: JobConf): Unit = self.withScope {
  val config = new HadoopMapRedWriteConfigUtil[K, V](new SerializableJobConf(conf))
  SparkHadoopWriter.write(
    rdd = self,
    config = config)
}
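For context, everything above is triggered by a single action call in the driver. Here is a minimal, hypothetical driver sketch (the app name, input path, and output paths are made up for illustration; it assumes a local master):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.hadoop.io.compress.GzipCodec

object SaveExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("save-example").setMaster("local[*]"))

    // saveAsTextFile is the action: it walks the path traced in this post
    // (RDD.saveAsTextFile -> saveAsHadoopFile -> saveAsHadoopDataset -> SparkHadoopWriter.write)
    // and ends in a SparkContext.runJob call. The output directories must not already exist.
    val lines = sc.textFile("/tmp/input.txt")            // hypothetical input path
    lines.map(_.toUpperCase).saveAsTextFile("/tmp/out")  // plain text output

    // The optional codec overload is what feeds the compression branch in saveAsHadoopFile.
    lines.saveAsTextFile("/tmp/out-gz", classOf[GzipCodec])

    sc.stop()
  }
}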
SparkHadoopWriter.write then sets up the Hadoop output job and runs one write task per partition:

spark-master\spark-master\core\src\main\scala\org\apache\spark\internal\io\SparkHadoopWriter.scala

def write[K, V: ClassTag](
    rdd: RDD[(K, V)],
    config: HadoopWriteConfigUtil[K, V]): Unit = {
  // Extract context and configuration from RDD.
  val sparkContext = rdd.context
  val commitJobId = rdd.id

  // Set up a job.
  val jobTrackerId = createJobTrackerID(new Date())
  val jobContext = config.createJobContext(jobTrackerId, commitJobId)
  config.initOutputFormat(jobContext)

  // Assert the output format/key/value class is set in JobConf.
  config.assertConf(jobContext, rdd.conf)

  val committer = config.createCommitter(commitJobId)
  committer.setupJob(jobContext)

  // Try to write all RDD partitions as a Hadoop OutputFormat.
  try {
    // this eventually calls back into SparkContext's runJob method
    val ret = sparkContext.runJob(rdd, (context: TaskContext, iter: Iterator[(K, V)]) => {
      // SPARK-24552: Generate a unique "attempt ID" based on the stage and task attempt numbers.
      // Assumes that there won't be more than Short.MaxValue attempts, at least not concurrently.
      val attemptId = (context.stageAttemptNumber << 16) | context.attemptNumber

      executeTask(
        context = context,
        config = config,
        jobTrackerId = jobTrackerId,
        commitJobId = commitJobId,
        sparkPartitionId = context.partitionId,
        sparkAttemptNumber = attemptId,
        committer = committer,
        iterator = iter)
    })
    // ... (job commit and error handling omitted)

The runJob call lands in SparkContext, which delegates to the DAGScheduler:

spark-master\spark-master\core\src\main\scala\org\apache\spark\SparkContext.scala

def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    resultHandler: (Int, U) => Unit): Unit = {
  if (stopped.get()) {
    throw new IllegalStateException("SparkContext has been shutdown")
  }
  val callSite = getCallSite
  val cleanedFunc = clean(func)
  logInfo("Starting job: " + callSite.shortForm)
  if (conf.getBoolean("spark.logLineage", false)) {
    logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
  }
  // delegate to dagScheduler.runJob
  dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
  progressBar.foreach(_.finishAll())
  rdd.doCheckpoint()
}

spark-master\core\src\main\scala\org\apache\spark\scheduler\DAGScheduler.scala

def runJob[T, U](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: CallSite,
    resultHandler: (Int, U) => Unit,
    properties: Properties): Unit = {
  val start = System.nanoTime
  // submit the job through the DAGScheduler's submitJob
  val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
  // wait for the job to finish
  ThreadUtils.awaitReady(waiter.completionFuture, Duration.Inf)
  // report whether the job succeeded or failed
  waiter.completionFuture.value.get match {
    case scala.util.Success(_) =>
      logInfo("Job %d finished: %s, took %f s".format
        (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
    case scala.util.Failure(exception) =>
      logInfo("Job %d failed: %s, took %f s".format
        (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
      // SPARK-8644: Include user stack trace in exceptions coming from DAGScheduler.
      val callerStackTrace = Thread.currentThread().getStackTrace.tail
      exception.setStackTrace(exception.getStackTrace ++ callerStackTrace)
      throw exception
  }
}

def submitJob[T, U](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: CallSite,
    resultHandler: (Int, U) => Unit,
    properties: Properties): JobWaiter[U] = {
  // Check to make sure we are not launching a task on a partition that does not exist.
  val maxPartitions = rdd.partitions.length
  partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
    throw new IllegalArgumentException(
      "Attempting to access a non-existent partition: " + p + ". " +
        "Total number of partitions: " + maxPartitions)
  }

  val jobId = nextJobId.getAndIncrement()
  if (partitions.size == 0) {
    // Return immediately if the job is running 0 tasks
    return new JobWaiter[U](this, jobId, 0, resultHandler)
  }

  assert(partitions.size > 0)
  val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
  // create a JobWaiter for this job
  val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
  // post a JobSubmitted event onto the event queue
  eventProcessLoop.post(JobSubmitted(
    jobId, rdd, func2, partitions.toArray, callSite, waiter,
    SerializationUtils.clone(properties)))
  waiter
}

EventLoop.scala runs a daemon thread that keeps taking events off the eventProcessLoop queue and processing them:

def post(event: E): Unit = {
  eventQueue.put(event)
}

spark-master\core\src\main\scala\org\apache\spark\util\EventLoop.scala

private[spark] abstract class EventLoop[E](name: String) extends Logging {

  private val eventQueue: BlockingQueue[E] = new LinkedBlockingDeque[E]()

  private val stopped = new AtomicBoolean(false)

  // Exposed for testing.
  private[spark] val eventThread = new Thread(name) {
    setDaemon(true)

    override def run(): Unit = {
      try {
        while (!stopped.get) {
          val event = eventQueue.take()
          try {
            // the core of the loop: call onReceive to handle the event
            onReceive(event)
          } catch {
            case NonFatal(e) =>
              try {
                onError(e)
              } catch {
                case NonFatal(e) => logError("Unexpected error in " + name, e)
              }
          }
        }
      } catch {
        // ... (remainder of run() omitted)
      }
    }
  }
  // ...
}

onReceive is ultimately implemented in DAGSchedulerEventProcessLoop:

spark-master\spark-master\core\src\main\scala\org\apache\spark\scheduler\DAGScheduler.scala

private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler)
  extends EventLoop[DAGSchedulerEvent]("dag-scheduler-event-loop") with Logging {

  private[this] val timer = dagScheduler.metricsSource.messageProcessingTimer

  /**
   * The main event loop of the DAG scheduler.
   */
  override def onReceive(event: DAGSchedulerEvent): Unit = {
    val timerContext = timer.time()
    try {
      doOnReceive(event)
    } finally {
      timerContext.stop()
    }
  }

  private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
    // a JobSubmitted event is handled by dagScheduler.handleJobSubmitted,
    // which carries out the actual submission of the job
    case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
      dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)

    case MapStageSubmitted(jobId, dependency, callSite, listener, properties) =>
      dagScheduler.handleMapStageSubmitted(jobId, dependency, callSite, listener, properties)

    case StageCancelled(stageId, reason) =>
      dagScheduler.handleStageCancellation(stageId, reason)

    case JobCancelled(jobId, reason) =>
      dagScheduler.handleJobCancellation(jobId, reason)

    // ... (other event types omitted)
  }

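The submission path traced above can also be exercised directly: SparkContext.runJob is a public API, and SparkHadoopWriter goes through one of its overloads. A minimal sketch (app name, paths, and numbers are arbitrary; assumes a local master and a reasonably recent Spark version):

import org.apache.spark.{SparkConf, SparkContext, TaskContext}

object RunJobExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("runjob-example").setMaster("local[*]"))
    val rdd = sc.parallelize(1 to 100, numSlices = 4)

    // Like SparkHadoopWriter.write, pass a function that runs once per partition.
    // The call goes through DAGScheduler.runJob / submitJob, posts a JobSubmitted
    // event, and blocks until the JobWaiter reports completion.
    val partitionSums: Array[Int] = sc.runJob(rdd, (ctx: TaskContext, iter: Iterator[Int]) => {
      println(s"partition ${ctx.partitionId()} on stage attempt ${ctx.stageAttemptNumber()}")
      iter.sum
    })

    println(partitionSums.mkString(", "))  // one sum per partition
    sc.stop()
  }
}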
 

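As a side note, the attempt ID that SparkHadoopWriter.write builds is just two counters packed into one Int: the stage attempt number in the upper bits and the task attempt number in the lower 16 bits. A standalone sketch of that encoding (the object and method names are mine, not Spark's):

object AttemptIdPacking {
  // Pack the stage attempt number into the upper bits and the task attempt
  // number into the lower 16 bits, mirroring the expression in SparkHadoopWriter.write.
  def pack(stageAttemptNumber: Int, taskAttemptNumber: Int): Int =
    (stageAttemptNumber << 16) | taskAttemptNumber

  // Recover the two components (valid while both stay below Short.MaxValue).
  def unpack(attemptId: Int): (Int, Int) =
    (attemptId >>> 16, attemptId & 0xFFFF)

  def main(args: Array[String]): Unit = {
    val id = pack(stageAttemptNumber = 2, taskAttemptNumber = 5)
    println(id)         // 131077 (2 * 65536 + 5)
    println(unpack(id)) // (2,5)
  }
}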
Reposted from: http://zsnmi.baihongyu.com/
