RDD Design

Abstract

  • Fields that must not be serialized along with the RDD are marked @transient

  • Transformation (tf) and action (act) functions, plus internal functions
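
To make the @transient point concrete, here is a minimal sketch using plain Java serialization. `DriverOnlyContext`, `MiniRDD`, and `roundTrip` are hypothetical names invented for this illustration (they are not Spark classes); the only thing borrowed from RDD.scala is the idea of annotating a constructor field with @transient.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

object TransientSketch {
  // Hypothetical stand-in for a driver-only handle such as SparkContext.
  // It is deliberately NOT Serializable.
  class DriverOnlyContext(val appName: String)

  // Hypothetical stand-in for an RDD: `ctx` is skipped when the object is serialized.
  class MiniRDD(@transient private var ctx: DriverOnlyContext, val id: Int) extends Serializable {
    def hasContext: Boolean = ctx != null
  }

  // Serializes a value to bytes and reads it back, as shipping it to another JVM would.
  def roundTrip[A <: AnyRef](value: A): A = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    out.writeObject(value)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
    try in.readObject().asInstanceOf[A] finally in.close()
  }
}
```

Without @transient, `roundTrip` would fail with a NotSerializableException because `DriverOnlyContext` is not serializable; with it, the copy simply comes back with a null `ctx`.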

Member values and variables

/*
  RDD.scala 
*/

abstract class RDD[T: ClassTag](
    @transient private var _sc: SparkContext,
    @transient private var deps: Seq[Dependency[_]]
  ) extends Serializable with Logging {
  
    
    /* returns _sc; body elided */
    private def sc: SparkContext = { /* ... */ }
    private[spark] def conf = sc.conf
    
    val id: Int = sc.newRddId()
    @transient var name: String = _
    
    /* state synchronization */
    private val stateLock = new Serializable {}
    
    /* partitions */
    @transient val partitioner: Option[Partitioner] = None
    @volatile @transient private var partitions_ : Array[Partition] = _
    
    /* dependencies */
    @volatile private var dependencies_ : Seq[Dependency[_]] = _
    
    /* storage level */
    private var storageLevel: StorageLevel = StorageLevel.NONE
    
    /* creation related (initializers marked "..." are elided) */
    @transient private[spark] val creationSite = sc.getCallSite()
    @transient private[spark] val scope: Option[RDDOperationScope] = /* ... */
    
    /* checkpoint related */
    private[spark] var checkpointData: Option[RDDCheckpointData[T]] = None
    private val checkpointAllMarkedAncestors = /* ... */
    @transient private var doCheckpointCalled = false
    
    /* barrier and output determinism */
    @transient protected lazy val isBarrier_ : Boolean = /* ... */
    private[spark] final lazy val outputDeterministicLevel: DeterministicLevel.Value = /* ... */
    
  }

Design of Transformation functions

private[spark] def withScope[U](body: => U): U = RDDOperationScope.withScope[U](sc)(body)


/* 1. input */
def foo[U](f: T => U): RDD[U] = withScope {    /* 2. withScope */

    /* 3. closure */
    val cleanedF = sc.clean(f)
    
    /* 4. return value (one of the two forms below) */
    new SomeRDD()    // if returning a new RDD
    this.bar()       // if transforming from itself
}

Many, though not all, tf functions follow the design above:

  1. input RDD

  2. withScope; see the operation scope concept for details.

  3. cleanedF; see the SparkContext.clean() function for details.

  4. returning RDD

    • This enables method chaining.

This is not a strict rule; it is simply what many of them do.
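
The four steps can be tried out without Spark in a toy model. Everything in the sketch below (`MiniContext`, `MiniRDD`, `SeqMiniRDD`, `MappedMiniRDD`, `FilteredMiniRDD`, `scopeLog`) is a hypothetical stand-in, and both `withScope` and `clean` are reduced to placeholders: here `withScope` only records the operation name and `clean` returns the closure unchanged, which is an assumption of the sketch and not how Spark implements them.

```scala
object TransformationPatternSketch {
  // Hypothetical stand-in for SparkContext; not a Spark class.
  final class MiniContext {
    // Names of the operations whose bodies ran inside a scope, in call order.
    val scopeLog = scala.collection.mutable.ListBuffer.empty[String]

    // Placeholder for operation scoping: record the name, then run the body.
    def withScope[U](name: String)(body: => U): U = {
      scopeLog += name
      body
    }

    // Placeholder for closure cleaning: this sketch returns `f` unchanged.
    def clean[F <: AnyRef](f: F): F = f
  }

  abstract class MiniRDD[T](val sc: MiniContext) {
    def compute(): Iterator[T]

    // Steps 1-4, returning a new RDD.
    def map[U](f: T => U): MiniRDD[U] = sc.withScope("map") {
      val cleanedF = sc.clean(f)
      new MappedMiniRDD[U, T](this, cleanedF)
    }

    def filter(p: T => Boolean): MiniRDD[T] = sc.withScope("filter") {
      val cleanedP = sc.clean(p)
      new FilteredMiniRDD[T](this, cleanedP)
    }

    // Steps 1-4, delegating to another transformation on itself.
    def filterNot(p: T => Boolean): MiniRDD[T] = sc.withScope("filterNot") {
      val cleanedP = sc.clean(p)
      this.filter(x => !cleanedP(x))
    }

    def collect(): List[T] = compute().toList
  }

  final class SeqMiniRDD[T](ctx: MiniContext, data: Seq[T]) extends MiniRDD[T](ctx) {
    def compute(): Iterator[T] = data.iterator
  }

  final class MappedMiniRDD[U, T](prev: MiniRDD[T], f: T => U) extends MiniRDD[U](prev.sc) {
    def compute(): Iterator[U] = prev.compute().map(f)
  }

  final class FilteredMiniRDD[T](prev: MiniRDD[T], p: T => Boolean) extends MiniRDD[T](prev.sc) {
    def compute(): Iterator[T] = prev.compute().filter(p)
  }
}
```

Because every transformation returns a `MiniRDD`, calls chain naturally, e.g. `rdd.map(_ * 10).filter(_ > 10).collect()`.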

Extra functions

For some RDDs, implicit conversions to special classes add extra functions.

I am not sure how this is implemented.

object RDD

rddToPairRDDFunctions
rddToAsyncRDDActions
rddToSequenceFileRDDFunctions
... 3 more

Each of these adds its own tf / act functions; regarding the tf functions and their design …
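
One plausible mechanism, sketched here with made-up names, is an implicit conversion placed in the companion object of the generic class. The Scala compiler searches a type's companion object for implicit conversions, so the extra methods become available only when the element type matches, with no import at the call site. `Box`, `PairBoxFunctions`, and `boxToPairBoxFunctions` are hypothetical; the Spark conversions listed above take further parameters that this sketch leaves out.

```scala
import scala.language.implicitConversions

object ImplicitExtensionSketch {
  // Hypothetical stand-in for RDD[T].
  class Box[T](val items: Seq[T])

  // Extra functions that only make sense when the elements are key/value pairs.
  class PairBoxFunctions[K, V](self: Box[(K, V)]) {
    def keys: Box[K] = new Box(self.items.map(_._1))
    def mapValues[W](f: V => W): Box[(K, W)] =
      new Box(self.items.map { case (k, v) => (k, f(v)) })
  }

  // Companion object: implicits defined here are found automatically for any Box[...].
  object Box {
    implicit def boxToPairBoxFunctions[K, V](box: Box[(K, V)]): PairBoxFunctions[K, V] =
      new PairBoxFunctions(box)
  }
}
```

With this in place, `new Box(Seq("a" -> 1)).keys` compiles, while `new Box(Seq(1, 2)).keys` does not, because no conversion applies to `Box[Int]`.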

DeterministicLevel

Used in various places throughout.

* Returns the deterministic level of this RDD's output. Please refer to [[DeterministicLevel]]
* for the definition.
*
* By default, a reliably checkpointed RDD, or an RDD without parents (root RDD), is DETERMINATE. For
* RDDs with parents, we will generate a deterministic level candidate per parent according to
* the dependency. The deterministic level of the current RDD is the deterministic level
* candidate that is least deterministic. Please override [[getOutputDeterministicLevel]] to
* provide custom logic of calculating output deterministic level.
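
The "least deterministic candidate wins" rule from the comment above can be sketched as a maximum over an ordered enumeration. This is a simplification under stated assumptions: the level names DETERMINATE, UNORDERED, and INDETERMINATE are assumed to be ordered from most to least deterministic, and each parent's level is used directly as its candidate, whereas the comment says the candidate also depends on the kind of dependency. `Level` and `outputLevel` are hypothetical names.

```scala
object DeterministicLevelSketch {
  // Assumed ordering: a larger id means less deterministic.
  object Level extends Enumeration {
    val DETERMINATE, UNORDERED, INDETERMINATE = Value
  }

  // Hypothetical simplification: one candidate per parent, taken as-is.
  // A root RDD (no parents) is DETERMINATE; otherwise pick the least deterministic candidate.
  def outputLevel(parentCandidates: Seq[Level.Value]): Level.Value =
    if (parentCandidates.isEmpty) Level.DETERMINATE
    else parentCandidates.maxBy(_.id)
}
```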
