Source code of the transform operator:
/**
 * Return a new DStream in which each RDD is generated by applying a function
 * on each RDD of 'this' DStream.
 */
def transform[U: ClassTag](transformFunc: RDD[T] => RDD[U]): DStream[U] = ssc.withScope {
  // because the DStream is reachable from the outer object here, and because
  // DStreams can't be serialized with closures, we can't proactively check
  // it for serializability and so we pass the optional false to SparkContext.clean
  val cleanedF = context.sparkContext.clean(transformFunc, false)
  transform((r: RDD[T], _: Time) => cleanedF(r))
}
/**
 * Return a new DStream in which each RDD is generated by applying a function
 * on each RDD of 'this' DStream.
 */
def transform[U: ClassTag](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U] = ssc.withScope {
  // because the DStream is reachable from the outer object here, and because
  // DStreams can't be serialized with closures, we can't proactively check
  // it for serializability and so we pass the optional false to SparkContext.clean
  val cleanedF = context.sparkContext.clean(transformFunc, false)
  val realTransformFunc = (rdds: Seq[RDD[_]], time: Time) => {
    assert(rdds.length == 1)
    cleanedF(rdds.head.asInstanceOf[RDD[T]], time)
  }
  new TransformedDStream[U](Seq(this), realTransformFunc)
}
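transform returns a new DStream in which each RDD is generated by applying a function to each RDD of the source DStream. Because that function must itself return an RDD, transform is meant for RDD-to-RDD transformations rather than for actions: it only converts each RDD, and the result is still an RDD.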
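The foreachRDD operator, by contrast, hands each RDD of the DStream to a function that operates on it directly (the advantage is that a richer set of RDD operators, including actions, becomes available).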
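The contrast between the two operators can be sketched with a small local job. This is only an illustration under assumptions: the object name TransformVsForeachRDD, the helper doubleEvens, the queueStream test input, the local[2] master, the 1-second batch interval and the 5-second timeout are all made up for the example and do not come from the Spark source above.

```scala
import scala.collection.mutable

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}

// Hypothetical demo object; the names here are illustrative only.
object TransformVsForeachRDD {

  // RDD-to-RDD logic passed to transform: it uses only lazy
  // transformations and returns an RDD, as transform requires.
  def doubleEvens(rdd: RDD[Int]): RDD[Int] =
    rdd.filter(_ % 2 == 0).map(_ * 2)

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("TransformVsForeachRDD")
    val ssc = new StreamingContext(conf, Seconds(1))

    // Assumed test input: each queued RDD is served as one batch.
    val queue = mutable.Queue[RDD[Int]](
      ssc.sparkContext.makeRDD(1 to 5),
      ssc.sparkContext.makeRDD(6 to 10)
    )
    val input = ssc.queueStream(queue)

    // transform: the function maps RDD[Int] to RDD[Int] and the result is
    // another DStream. Nothing is computed yet at this point.
    val doubled = input.transform(rdd => doubleEvens(rdd))

    // foreachRDD: an output operation whose function returns Unit, so
    // actions such as collect() can be called on each batch's RDD.
    doubled.foreachRDD { (rdd: RDD[Int], time: Time) =>
      println(s"batch $time: ${rdd.collect().mkString(", ")}")
    }

    ssc.start()
    ssc.awaitTerminationOrTimeout(5000)
    ssc.stop(stopSparkContext = true, stopGracefully = false)
  }
}
```

In this sketch the transform call alone would not start any job; the computation for each batch runs only because foreachRDD registers an output operation on the resulting DStream.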