
Spark parameter tuning (Dataset source code analysis: Dataset.head) (1)



1. Dataset

Start with the comment on the Dataset class.

     * Datasets are "lazy", i.e. computations are only triggered when an action is invoked. Internally,
     * a Dataset represents a logical plan that describes the computation required to produce the data.
     * When an action is invoked, Spark's query optimizer optimizes the logical plan and generates a
     * physical plan for efficient execution in a parallel and distributed manner. To explore the
     * logical plan as well as optimized physical plan, use the `explain` function.

1.1. Source code analysis of Dataset.head()

The code under analysis is the following two lines:

    val ds = spark.read.text("data/test.txt")
    ds.head(2).foreach(println)

  • 1.1.1. Analysis of spark.read.text("data/test.txt"):

DataFrameReader.loadV1Source():

Calls sparkSession.baseRelationToDataFrame().

    private def loadV1Source(paths: String*) = {
      // Code path for data source v1.
      sparkSession.baseRelationToDataFrame(
        DataSource.apply(
          sparkSession,
          paths = paths,
          userSpecifiedSchema = userSpecifiedSchema,
          className = source,
          options = extraOptions.toMap).resolveRelation())
    }

sparkSession.baseRelationToDataFrame():

Converts an external data source into a DataFrame. The LogicalRelation class is a subclass of LogicalPlan.

Calls Dataset.ofRows().

    /**
     * Convert a `BaseRelation` created for external data sources into a `DataFrame`.
     *
     * @since 2.0.0
     */
    def baseRelationToDataFrame(baseRelation: BaseRelation): DataFrame = {
      Dataset.ofRows(self, LogicalRelation(baseRelation))
    }

Dataset.ofRows():

Returns a Dataset.

    def ofRows(sparkSession: SparkSession, logicalPlan: LogicalPlan): DataFrame = {
      // executePlan returns a QueryExecution
      val qe = sparkSession.sessionState.executePlan(logicalPlan)
      qe.assertAnalyzed()
      new Dataset[Row](sparkSession, qe, RowEncoder(qe.analyzed.schema))
    }

  • 1.1.2. Analysis of ds.head(2)

head():

Calls withAction(), passing the QueryExecution of limit(n) and collectFromPlan as the action.

    def head(n: Int): Array[T] = withAction("head", limit(n).queryExecution)(collectFromPlan)

withAction():

Runs the action passed to it, which here is Dataset.collectFromPlan, and reports the outcome and elapsed time to the registered listeners.

    /**
     * Wrap a Dataset action to track the QueryExecution and time cost, then report to the
     * user-registered callback functions.
     */
    private def withAction[U](name: String, qe: QueryExecution)(action: SparkPlan => U) = {
      try {
        qe.executedPlan.foreach { plan =>
          plan.resetMetrics()
        }
        val start = System.nanoTime()
        val result = SQLExecution.withNewExecutionId(sparkSession, qe) {
          action(qe.executedPlan)
        }
        val end = System.nanoTime()
        sparkSession.listenerManager.onSuccess(name, qe, end - start)
        result
      } catch {
        case e: Exception =>
          sparkSession.listenerManager.onFailure(name, qe, e)
          throw e
      }
    }
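The wrapping pattern used by withAction can be sketched in plain Scala, without any Spark dependency. This is only an illustration under stated assumptions: the `Listener` trait and the `WithActionSketch` object are hypothetical stand-ins for Spark's listener manager, and the QueryExecution argument and metric reset are left out.

```scala
object WithActionSketch {
  // Hypothetical stand-in for the callbacks Spark's listener manager invokes.
  trait Listener {
    def onSuccess(name: String, durationNs: Long): Unit
    def onFailure(name: String, error: Exception): Unit
  }

  // Runs `action`, measures its wall-clock time, and reports the outcome.
  // On failure the listeners are notified and the exception is rethrown,
  // so the caller still sees the original error.
  def withAction[U](name: String, listeners: Seq[Listener])(action: => U): U = {
    try {
      val start = System.nanoTime()
      val result = action
      val end = System.nanoTime()
      listeners.foreach(_.onSuccess(name, end - start))
      result
    } catch {
      case e: Exception =>
        listeners.foreach(_.onFailure(name, e))
        throw e
    }
  }
}
```

As in the Spark code, the action is a parameter, so the same wrapper can serve head, collect, count and other actions that differ only in what they do with the executed plan.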

collectFromPlan():

Collects all elements from the SparkPlan.

Calls CollectLimitExec.executeCollect(), because the plan built from limit(n) has CollectLimitExec as its root node.

    /**
     * Collect all elements from a spark plan.
     */
    private def collectFromPlan(plan: SparkPlan): Array[T] = {
      // This projection writes output to a `InternalRow`, which means applying this projection is not
      // thread-safe. Here we create the projection inside this method to make `Dataset` thread-safe.
      val objProj = GenerateSafeProjection.generate(deserializer :: Nil)
      // Calls executeCollect() of CollectLimitExec
      plan.executeCollect().map { row =>
        // The row returned by SafeProjection is `SpecificInternalRow`, which ignore the data type
        // parameter of its `get` method, so it's safe to use null here.
        objProj(row).get(0, null).asInstanceOf[T]
      }
    }

CollectLimitExec.executeCollect():

Calls SparkPlan.executeTake() on its child.

    override def executeCollect(): Array[InternalRow] = child.executeTake(limit)

SparkPlan.executeTake():

Calls SparkPlan.getByteArrayRdd() to obtain the RDD, then submits Spark jobs.

    /**
     * Runs this query returning the first `n` rows as an array.
     *
     * This is modeled after `RDD.take` but never runs any job locally on the driver.
     */
    def executeTake(n: Int): Array[InternalRow] = {
      if (n == 0) {
        return new Array[InternalRow](0)
      }

      val childRDD = getByteArrayRdd(n).map(_._2)

      val buf = new ArrayBuffer[InternalRow]
      val totalParts = childRDD.partitions.length
      var partsScanned = 0
      while (buf.size < n && partsScanned < totalParts) {
        // The number of partitions to try in this iteration. It is ok for this number to be
        // greater than totalParts because we actually cap it at totalParts in runJob.
        var numPartsToTry = 1L
        if (partsScanned > 0) {
          // If we didn't find any rows after the previous iteration, quadruple and retry.
          // Otherwise, interpolate the number of partitions we need to try, but overestimate
          // it by 50%. We also cap the estimation in the end.
          val limitScaleUpFactor = Math.max(sqlContext.conf.limitScaleUpFactor, 2)
          if (buf.isEmpty) {
            numPartsToTry = partsScanned * limitScaleUpFactor
          } else {
            val left = n - buf.size
            // As left > 0, numPartsToTry is always >= 1
            numPartsToTry = Math.ceil(1.5 * left * partsScanned / buf.size).toInt
            numPartsToTry = Math.min(numPartsToTry, partsScanned * limitScaleUpFactor)
          }
        }

        val p = partsScanned.until(math.min(partsScanned + numPartsToTry, totalParts).toInt)
        val sc = sqlContext.sparkContext
        val res = sc.runJob(childRDD,
          (it: Iterator[Array[Byte]]) => if (it.hasNext) it.next() else Array.empty[Byte], p)

        buf ++= res.flatMap(decodeUnsafeRows)

        partsScanned += p.size
      }

      if (buf.size > n) {
        buf.take(n).toArray
      } else {
        buf.toArray
      }
    }

SparkPlan.getByteArrayRdd():

Calls SparkPlan.execute().

    /**
     * Packing the UnsafeRows into byte array for faster serialization.
     * The byte arrays are in the following format:
     * [size] [bytes of UnsafeRow] [size] [bytes of UnsafeRow] ... [-1]
     *
     * UnsafeRow is highly compressible (at least 8 bytes for any column), the byte array is also
     * compressed.
     */
    private def getByteArrayRdd(n: Int = -1): RDD[(Long, Array[Byte])] = {
      execute().mapPartitionsInternal { iter =>
        var count = 0
        val buffer = new Array[Byte](4 << 10)  // 4K
        val codec = CompressionCodec.createCodec(SparkEnv.get.conf)
        val bos = new ByteArrayOutputStream()
        val out = new DataOutputStream(codec.compressedOutputStream(bos))
        // `iter.hasNext` may produce one row and buffer it, we should only call it when the limit is
        // not hit.
        while ((n < 0 || count < n) && iter.hasNext) {
          val row = iter.next().asInstanceOf[UnsafeRow]
          out.writeInt(row.getSizeInBytes)
          row.writeToStream(out, buffer)
          count += 1
        }
        out.writeInt(-1)
        out.flush()
        out.close()
        Iterator((count, bos.toByteArray))
      }
    }

SparkPlan.execute():

Calls doExecute() of the concrete plan, which here is WholeStageCodegenExec.doExecute().

    /**
     * Returns the result of this query as an RDD[InternalRow] by delegating to `doExecute` after
     * preparations.
     *
     * Concrete implementations of SparkPlan should override `doExecute`.
     */
    final def execute(): RDD[InternalRow] = executeQuery {
      if (isCanonicalizedPlan) {
        throw new IllegalStateException("A canonicalized plan is not supposed to be executed.")
      }
      // Runs WholeStageCodegenExec.doExecute()
      doExecute()
    }
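Two parts of the code above can be tried out in isolation: the way executeTake grows the number of partitions scanned per job, and the `[size] [bytes] ... [-1]` layout written by getByteArrayRdd. The following is a minimal plain-Scala sketch, not Spark's implementation. The object name `SparkPlanSketch` and its methods are hypothetical, rows are plain byte arrays instead of UnsafeRow, compression is omitted, and the scale-up factor defaults to 4 as an assumed value standing in for sqlContext.conf.limitScaleUpFactor.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

object SparkPlanSketch {
  // Returns the partition ranges that would be scanned, one range per job,
  // given how many rows each partition holds and the requested row count n.
  def scanSchedule(rowsPerPartition: IndexedSeq[Int], n: Int,
                   limitScaleUpFactor: Int = 4): List[Range] = {
    val factor = math.max(limitScaleUpFactor, 2)
    val totalParts = rowsPerPartition.length
    var collected = 0      // plays the role of buf.size
    var partsScanned = 0
    val jobs = List.newBuilder[Range]
    while (collected < n && partsScanned < totalParts) {
      var numPartsToTry = 1L
      if (partsScanned > 0) {
        if (collected == 0) {
          // Nothing found so far: multiply the scan width by the factor.
          numPartsToTry = partsScanned.toLong * factor
        } else {
          // Interpolate from the hit rate so far, overestimating by 50%.
          val left = n - collected
          numPartsToTry = math.ceil(1.5 * left * partsScanned / collected).toLong
          numPartsToTry = math.min(numPartsToTry, partsScanned.toLong * factor)
        }
      }
      val p = partsScanned.until(
        math.min(partsScanned + numPartsToTry, totalParts.toLong).toInt)
      jobs += p
      // Each task returns at most n rows, mirroring getByteArrayRdd(n).
      collected += p.map(i => math.min(rowsPerPartition(i), n)).sum
      partsScanned += p.size
    }
    jobs.result()
  }

  // Writes up to n rows as [size][bytes]...[-1]; n < 0 means no limit.
  def encode(rows: Iterator[Array[Byte]], n: Int = -1): (Long, Array[Byte]) = {
    var count = 0L
    val bos = new ByteArrayOutputStream()
    val out = new DataOutputStream(bos)
    while ((n < 0 || count < n) && rows.hasNext) {
      val row = rows.next()
      out.writeInt(row.length)
      out.write(row)
      count += 1
    }
    out.writeInt(-1) // end-of-stream marker
    out.flush()
    out.close()
    (count, bos.toByteArray)
  }

  // Reads rows back until the -1 marker is reached.
  def decode(bytes: Array[Byte]): List[Array[Byte]] = {
    val in = new DataInputStream(new ByteArrayInputStream(bytes))
    val rows = List.newBuilder[Array[Byte]]
    var size = in.readInt()
    while (size != -1) {
      val row = new Array[Byte](size)
      in.readFully(row)
      rows += row
      size = in.readInt()
    }
    rows.result()
  }
}
```

With 20 empty partitions and n = 2, `scanSchedule` scans 1, then 4, then 15 partitions, which shows why a head() on a sparse or heavily filtered dataset can launch several jobs rather than one.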

WholeStageCodegenExec.doExecute():

Calls inputRDDs() on its child, which here is FileSourceScanExec.

    override def doExecute(): RDD[InternalRow] = {
      val (ctx, cleanedSource) = doCodeGen()
      // try to compile and fallback if it failed
      val (_, maxCodeSize) = try {
        CodeGenerator.compile(cleanedSource)
      } catch {
        case NonFatal(_) if !Utils.isTesting && sqlContext.conf.codegenFallback =>
          // We should already saw the error message
          logWarning(s"Whole-stage codegen disabled for plan (id=$codegenStageId):\n $treeString")
          return child.execute()
      }

      // Check if compiled code has a too large function
      if (maxCodeSize > sqlContext.conf.hugeMethodLimit) {
        logInfo(s"Found too long generated codes and JIT optimization might not work: " +
          s"the bytecode size ($maxCodeSize) is above the limit " +
          s"${sqlContext.conf.hugeMethodLimit}, and the whole-stage codegen was disabled " +
          s"for this plan (id=$codegenStageId). To avoid this, you can raise the limit " +
          s"`${SQLConf.WHOLESTAGE_HUGE_METHOD_LIMIT.key}`:\n$treeString")
        child match {
          // The fallback solution of batch file source scan still uses WholeStageCodegenExec
          case f: FileSourceScanExec if f.supportsBatch => // do nothing
          case _ => return child.execute()
        }
      }

      val references = ctx.references.toArray

      val durationMs = longMetric("pipelineTime")

      val rdds = child.asInstanceOf[CodegenSupport].inputRDDs()
      assert(rdds.size <= 2, "Up to two input RDDs can be supported")
      if (rdds.length == 1) {
        rdds.head.mapPartitionsWithIndex { (index, iter) =>
          val (clazz, _) = CodeGenerator.compile(cleanedSource)
          val buffer = clazz.generate(references).asInstanceOf[BufferedRowIterator]
          buffer.init(index, Array(iter))
          new Iterator[InternalRow] {
            override def hasNext: Boolean = {
              val v = buffer.hasNext
              if (!v) durationMs += buffer.durationMs()
              v
            }
            override def next: InternalRow = buffer.next()
          }
        }
      } else {
        // Right now, we support up to two input RDDs.
        rdds.head.zipPartitions(rdds(1)) { (leftIter, rightIter) =>
          Iterator((leftIter, rightIter))
          // a small hack to obtain the correct partition index
        }.mapPartitionsWithIndex { (index, zippedIter) =>
          val (leftIter, rightIter) = zippedIter.next()
          val (clazz, _) = CodeGenerator.compile(cleanedSource)
          val buffer = clazz.generate(references).asInstanceOf[BufferedRowIterator]
          buffer.init(index, Array(leftIter, rightIter))
          new Iterator[InternalRow] {
            override def hasNext: Boolean = {
              val v = buffer.hasNext
              if (!v) durationMs += buffer.durationMs()
              v
            }
            override def next: InternalRow = buffer.next()
          }
        }
      }
    }
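The "try to compile and fallback if it failed" step at the top of doExecute follows a common control-flow pattern, which can be sketched in plain Scala. The names here (`CodegenFallbackSketch`, `compiled`, `interpreted`, `fallbackEnabled`) are hypothetical: `compiled` stands for the generated-code path, `interpreted` for `child.execute()`, and `fallbackEnabled` for the codegenFallback setting. The testing guard and the bytecode-size check are left out.

```scala
import scala.util.control.NonFatal

object CodegenFallbackSketch {
  // Tries the fast path first. If it fails with a non-fatal error and the
  // fallback is enabled, runs the slower path instead; otherwise the error
  // propagates to the caller.
  def execute[A](compiled: () => Iterator[A],
                 interpreted: () => Iterator[A],
                 fallbackEnabled: Boolean): Iterator[A] =
    try compiled()
    catch {
      case NonFatal(_) if fallbackEnabled => interpreted()
    }
}
```

Matching on `NonFatal` keeps errors such as OutOfMemoryError from being swallowed by the fallback, which is the same choice the Spark code makes.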

FileSourceScanExec.inputRDD:

Creates the RDD with createBucketedReadRDD() when the relation has a bucket spec and bucketing is enabled, and with createNonBucketedReadRDD() otherwise.

    private lazy val inputRDD: RDD[InternalRow] = {
      val readFile: (PartitionedFile) => Iterator[InternalRow] =
        relation.fileFormat.buildReaderWithPartitionValues(
          sparkSession = relation.sparkSession,
          dataSchema = relation.dataSchema,
          partitionSchema = relation.partitionSchema,
          requiredSchema = requiredSchema,
          filters = pushedDownFilters,
          options = relation.options,
          hadoopConf = relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options))

      relation.bucketSpec match {
        case Some(bucketing) if relation.sparkSession.sessionState.conf.bucketingEnabled =>
          createBucketedReadRDD(bucketing, readFile, selectedPartitions, relation)
        case _ =>
          createNonBucketedReadRDD(readFile, selectedPartitions, relation)
      }
    }

    override def inputRDDs(): Seq[RDD[InternalRow]] = {
      inputRDD :: Nil
    }

createBucketedReadRDD():

Returns a FileScanRDD object. After that, Spark can submit the job.

    /**
     * Create an RDD for bucketed reads.
     * The non-bucketed variant of this function is [[createNonBucketedReadRDD]].
     *
     * The algorithm is pretty simple: each RDD partition being returned should include all the files
     * with the same bucket id from all the given Hive partitions.
     *
     * @param bucketSpec the bucketing spec.
     * @param readFile a function to read each (part of a) file.
     * @param selectedPartitions Hive-style partition that are part of the read.
     * @param fsRelation [[HadoopFsRelation]] associated with the read.
     */
    private def createBucketedReadRDD(
        bucketSpec: BucketSpec,
        readFile: (PartitionedFile) => Iterator[InternalRow],
        selectedPartitions: Seq[PartitionDirectory],
        fsRelation: HadoopFsRelation): RDD[InternalRow] = {
      logInfo(s"Planning with ${bucketSpec.numBuckets} buckets")
      val filesGroupedToBuckets =
        selectedPartitions.flatMap { p =>
          p.files.map { f =>
            val hosts = getBlockHosts(getBlockLocations(f), 0, f.getLen)
            PartitionedFile(p.values, f.getPath.toUri.toString, 0, f.getLen, hosts)
          }
        }.groupBy { f =>
          BucketingUtils
            .getBucketId(new Path(f.filePath).getName)
            .getOrElse(sys.error(s"Invalid bucket file ${f.filePath}"))
        }

      val prunedFilesGroupedToBuckets = if (optionalBucketSet.isDefined) {
        val bucketSet = optionalBucketSet.get
        filesGroupedToBuckets.filter {
          f => bucketSet.get(f._1)
        }
      } else {
        filesGroupedToBuckets
      }

      val filePartitions = Seq.tabulate(bucketSpec.numBuckets) { bucketId =>
        FilePartition(bucketId, prunedFilesGroupedToBuckets.getOrElse(bucketId, Nil))
      }

      new FileScanRDD(fsRelation.sparkSession, readFile, filePartitions)
    }
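The grouping step in createBucketedReadRDD (one partition per bucket id, containing every file of that bucket) can be illustrated with plain Scala collections. This is a sketch under assumptions: `BucketGroupingSketch` and its methods are hypothetical, files are represented by their names only, bucket pruning is omitted, and the file-name pattern is an assumed convention of the form `..._<bucketId>[.ext]` rather than a statement of what BucketingUtils.getBucketId accepts.

```scala
object BucketGroupingSketch {
  // Assumed naming convention for this sketch: "..._<bucketId>[.ext]".
  private val BucketedFileName = """.*_(\d+)(?:\..*)?$""".r

  // Extracts the bucket id from a file name, if the name matches the pattern.
  def bucketId(fileName: String): Option[Int] = fileName match {
    case BucketedFileName(id) => Some(id.toInt)
    case _ => None
  }

  // Returns one entry per bucket id in ascending order. Buckets that have no
  // files still get an (empty) entry, so the partition count equals numBuckets.
  def partitions(files: Seq[String], numBuckets: Int): Seq[(Int, Seq[String])] = {
    val grouped = files.groupBy { f =>
      bucketId(f).getOrElse(sys.error(s"Invalid bucket file $f"))
    }
    Seq.tabulate(numBuckets)(id => id -> grouped.getOrElse(id, Nil))
  }
}
```

Using `Seq.tabulate` over the bucket count, rather than iterating over the groups that happen to exist, is what keeps the partition index equal to the bucket id even when some buckets are empty.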

That is the overall flow. The details of QueryExecution are not covered here and will be analyzed separately in a later post.
