/*
RDD.scala
*/
abstract class RDD[T: ClassTag](
@transient private var _sc: SparkContext,
@transient private var deps: Seq[Dependency[_]]
) extends Serializable with Logging {
private def sc: SparkContext = {
  if (_sc == null) throw new SparkException("This RDD lacks a SparkContext")
  _sc
}
private[spark] def conf = sc.conf
val id: Int = sc.newRddId()
@transient var name: String = _
/* state synchronization */
private val stateLock = new Serializable {}
/* partition */
@transient val partitioner: Option[Partitioner] = None
@volatile @transient private var partitions_ : Array[Partition] = _
/* dependencies */
@volatile private var dependencies_ : Seq[Dependency[_]] = _
/* storagelevel */
private var storageLevel: StorageLevel = StorageLevel.NONE
/* creation related */
@transient private[spark] val creationSite = sc.getCallSite()
@transient private[spark] val scope: Option[RDDOperationScope] =
/* checkpoint related */
private[spark] var checkpointData: Option[RDDCheckpointData[T]] = None
private val checkpointAllMarkedAncestors =
@transient private var doCheckpointCalled = false
@transient protected lazy val isBarrier_ : Boolean =
private[spark] final lazy val outputDeterministicLevel: DeterministicLevel.Value =
}
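Most of this internal state is reachable through public accessors. Below is a minimal sketch (not from RDD.scala; RddStateDemo, the app name, local master, and input data are made up for illustration) showing which accessor surfaces which field:

/* RddStateDemo.scala (hypothetical) */
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

object RddStateDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("rdd-state").setMaster("local[*]"))
    val rdd = sc.parallelize(1 to 100, numSlices = 4).setName("numbers") // sets `name`
    rdd.persist(StorageLevel.MEMORY_ONLY)                                // sets `storageLevel`

    println(rdd.id)                // `id`, assigned via sc.newRddId()
    println(rdd.name)              // `name`
    println(rdd.partitions.length) // materializes `partitions_`
    println(rdd.partitioner)       // `partitioner` (None for a parallelized collection)
    println(rdd.dependencies)      // materializes `dependencies_`
    println(rdd.getStorageLevel)   // `storageLevel`

    sc.stop()
  }
}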
Design of Transformation functions
private[spark] def withScope[U](body: => U): U = RDDOperationScope.withScope[U](sc)(body)
/* 1. input */
def foo[U: ClassTag](f: T => U): RDD[U] = withScope { /* 2. withScope */
  /* 3. closure */
  val cleanedF = sc.clean(f)
  /* 4. return value */
  new SomeRDD(this, cleanedF) // if returning a new RDD
  // this.bar(cleanedF)       // if transforming via existing methods on this RDD
}
Not all, but most common transformation functions are designed like the above:
1. input; the input RDD (this) and the user-supplied function f.
2. withScope; see the operation scope concept for details.
3. cleanedF; see the SparkContext.clean() function for details.
4. returning an RDD; this is what enables method chaining.
Not every transformation looks exactly like this; it is simply the shape many of them take (see the sketch below).
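For concreteness, this is roughly how map and distinct(numPartitions) in Spark's RDD.scala instantiate the template; the bodies are lightly simplified and vary a bit across Spark versions:

/* returning a new RDD (the "new SomeRDD()" case) */
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[U, T](this, (_, _, iter) => iter.map(cleanF))
}

/* transforming via existing methods on this RDD (the "this.bar()" case) */
def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
  map(x => (x, null)).reduceByKey((x, _) => x, numPartitions).map(_._1)
}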
Extra functions
For RDDs of certain element types, extra functions are provided through implicit conversions to special wrapper classes (e.g. PairRDDFunctions for RDD[(K, V)]).
These conversions are implemented as implicit defs in object RDD, so the compiler picks them up from the companion object's implicit scope without an explicit import.
object RDD
rddToPairRDDFunctions
rddToAsyncRDDActions
rddToSequenceFileRDDFunctions
... 3 more
Each of these adds its own transformations / actions, designed along the same lines as described above.
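As a quick usage sketch (the demo object, app name, and local master are made up; the Spark API calls are real): calling a PairRDDFunctions method such as reduceByKey on a plain RDD[(String, Int)] compiles only because the compiler finds rddToPairRDDFunctions in object RDD and inserts the conversion:

/* ImplicitConversionDemo.scala (hypothetical) */
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object ImplicitConversionDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("demo").setMaster("local[*]"))
    val words: RDD[String] = sc.parallelize(Seq("a", "b", "a"))
    val pairs: RDD[(String, Int)] = words.map(w => (w, 1))

    // reduceByKey is not defined on RDD itself; the call is effectively rewritten to
    //   RDD.rddToPairRDDFunctions(pairs).reduceByKey(_ + _)
    val counts = pairs.reduceByKey(_ + _)

    counts.collect().foreach(println)
    sc.stop()
  }
}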
DeterministicLevel
Used at various points throughout RDD.scala. The scaladoc of getOutputDeterministicLevel explains how the level is computed:
/**
 * Returns the deterministic level of this RDD's output. Please refer to [[DeterministicLevel]]
 * for the definition.
 *
 * By default, a reliably checkpointed RDD, or an RDD without parents (a root RDD), is
 * DETERMINATE. For RDDs with parents, a deterministic level candidate is generated per parent
 * according to the dependency; the deterministic level of the current RDD is the least
 * deterministic of those candidates. Override [[getOutputDeterministicLevel]] to provide custom
 * logic for calculating the output deterministic level.
 */
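For reference, DeterministicLevel itself is a small enumeration in RDD.scala; the comments below are my paraphrase of its scaladoc:

object DeterministicLevel extends Enumeration {
  // DETERMINATE: a rerun always produces the same data set in the same order.
  // UNORDERED: a rerun always produces the same data set, but the order can differ
  //            (e.g. the output of a shuffle).
  // INDETERMINATE: a rerun can produce a different data set.
  val DETERMINATE, UNORDERED, INDETERMINATE = Value
}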