Let's build an intuitive feel for its performance, starting from the source of collectAsMap:
/**
 * Return the key-value pairs in this RDD to the master as a Map.
 *
 * Warning: this doesn't return a multimap (so if you have multiple values to the same key, only
 * one value per key is preserved in the map returned)
 *
 * @note this method should only be used if the resulting data is expected to be small, as
 * all the data is loaded into the driver's memory.
 */
def collectAsMap(): Map[K, V] = self.withScope {
  // Pull every element of the RDD back to the driver
  val data = self.collect()
  // Build a mutable HashMap on the driver
  val map = new mutable.HashMap[K, V]
  // Hint the expected size so the map does not keep resizing while it fills up
  map.sizeHint(data.length)
  // Copy every pair into the map; later duplicates of a key overwrite earlier ones
  data.foreach { pair => map.put(pair._1, pair._2) }
  map
}
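To see the "not a multimap" warning in action, here is a small self-contained demo of my own (run locally; the data and object name are made up for illustration):

import org.apache.spark.sql.SparkSession

object CollectAsMapDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("CollectAsMapDemo").getOrCreate()
    val sc = spark.sparkContext

    // Two values share the key "a"; collectAsMap keeps only one of them.
    val pairs = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 3)))
    println(pairs.collectAsMap())               // e.g. Map(a -> 2, b -> 3): one value per key
    println(pairs.groupByKey().collectAsMap())  // keeps all values per key, e.g. a -> CompactBuffer(1, 2)

    spark.stop()
  }
}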
Note the call to val data = self.collect() inside collectAsMap above. Let's look at the collect source directly:
/**
 * Return an array that contains all of the elements in this RDD.
 * In other words, pull the whole RDD back to the driver as a single array.
 * @note This method should only be used if the resulting array is expected to be small, as
 * all the data is loaded into the driver's memory.
 */
def collect(): Array[T] = withScope {
  // Run a job that turns each partition into an array, then concatenate them on the driver
  val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
  Array.concat(results: _*)
}
So use collectAsMap with care. A collect in Spark ships remote data over the network to the driver: with a large dataset that alone puts heavy pressure on the network, and the more serious risk is an out-of-memory error on the driver.
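If only a bounded slice of the result is actually needed on the driver, there are cheaper options than collect/collectAsMap. A minimal sketch, assuming a SparkContext sc and a pair RDD named pairs with Array[Byte] values like the test below (the output path is made up):

// Pull only a bounded amount of data to the driver instead of everything.
val firstHundred = pairs.take(100)                                   // at most 100 elements
val sample       = pairs.takeSample(withReplacement = false, num = 100)
val totalCount   = pairs.count()                                     // a single Long, not the data

// Or keep the data distributed and write it out from the executors.
pairs.map { case (k, v) => s"$k\t${v.length}" }.saveAsTextFile("/tmp/pairs-out")  // hypothetical path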
Next, let's run a simple test.
Code:
import java.util.UUID

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

import scala.collection.mutable.ArrayBuffer

object collectAsMapTest {

  def main(args: Array[String]): Unit = {
    // Build `num` random UUID strings to use as keys
    val buff = ArrayBuffer[String]()
    val num = args(0).toInt
    for (x <- 1 to num) {
      buff += UUID.randomUUID.toString
    }

    val startTime = System.currentTimeMillis()

    val conf = new SparkConf()
    // conf.setMaster("local")
    // System.setProperty("HADOOP_USER_NAME", "hadoop")
    // Raise the cap on the total size of results sent back to the driver (default is 1g)
    conf.set("spark.driver.maxResultSize", "10G")
    val spark = SparkSession.builder().config(conf).appName("collectAsMapTest").getOrCreate()
    val sc = spark.sparkContext

    // Attach a 1 KB byte array to every key, then pull everything to the driver as a Map
    val data = sc.parallelize(buff).map(x => {
      val payload = new Array[Byte](1024)
      (x, payload)
    }).collectAsMap()
    println(" collectAsMap total use : " + ((System.currentTimeMillis() - startTime) / 1000) + " s ; broadData : " + data.size)

    // Broadcast the collected map back out to the executors and time just the broadcast
    val broadcastTime = System.currentTimeMillis()
    val broadData = sc.broadcast(data)
    println(" broadcast total use : " + ((System.currentTimeMillis() - broadcastTime) / 1000) + " s ; broadData : " + broadData.value.size)
    println("total use : " + ((System.currentTimeMillis() - startTime) / 1000) + " s ; broadData : " + broadData.value.size)

    spark.stop()
  }
}
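For context, the usual reason to pair collectAsMap with broadcast is to do map-side lookups against the broadcast value inside a transformation. A minimal sketch that could slot in before spark.stop() in the test above (the lookup keys are made up):

// `sc` and `broadData` are the SparkContext and the Broadcast[Map[String, Array[Byte]]] from the test above.
val keysToLookUp = sc.parallelize(Seq("some-uuid-1", "some-uuid-2"))   // hypothetical keys

val resolved = keysToLookUp.map { k =>
  // Each task reads the broadcast map locally; the big map is not shuffled per record
  val value = broadData.value.get(k)
  (k, value.map(_.length).getOrElse(-1))                               // report only the value size
}
resolved.collect().foreach(println)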
To see how it behaves under real resources, I ran it on a cluster in yarn client mode:
spark-submit \
  --master yarn \
  --deploy-mode client \
  --class com.zl.collectAsMapTest \
  --driver-memory 30g \
  --num-executors 3 \
  --executor-memory 7g \
  --executor-cores 3 \
  /data/core-test.jar 10000
Remember to adjust spark.driver.maxResultSize in the code, otherwise the job fails with an error; the default is 1g.
First, estimate the data size so we have a feel for how much we are pulling back.
Each record is a (key, value) pair shaped like (ea5542c3-7b6c-48c1-ab8f-9ba7d775e111, 1 KB byte array).
In other words, one record is roughly 1 KB (enough for about 1024 English characters, or several hundred Chinese characters, which is plenty for this test).
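As a rough back-of-the-envelope check (plain Scala; JVM object headers and the driver-side HashMap overhead are ignored, so real memory use is noticeably higher):

// Approximate size per record: a 36-character UUID string plus a 1 KB byte array
val perRecordBytes = 36 + 1024
Seq(1000L, 10000L, 100000L, 1000000L, 3000000L, 5000000L).foreach { n =>
  val approxMB = n * perRecordBytes / 1024.0 / 1024.0
  println(f"$n%10d rows ~ $approxMB%10.1f MB")
}
// 1,000,000 rows come to roughly 1 GB and 5,000,000 rows to roughly 5 GB,
// which lines up with the measurements below.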
The results were as follows:

| Row count | 1,000 | 10,000 | 100,000 | 1,000,000 | 3,000,000 | 5,000,000 |
| --- | --- | --- | --- | --- | --- | --- |
| Data size (approx.) | 1 MB | 10 MB | 100 MB | 1 GB | 3.5 GB | 5 GB |
| Time | 6 s | 6 s | 7 s | 18 s | 35 s | OutOfMemoryError |
The key takeaway is that with a limited resource allocation the collected data cannot be too large, otherwise the driver hits an OutOfMemoryError.
On top of that, collectAsMap assembles the result map on the driver in a single thread, which limits performance further.
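If the lookup data is too big to collectAsMap and broadcast safely, one common alternative (my own sketch, not from the original post; the names are made up) is to keep both sides as RDDs and let a shuffle join do the work:

// Assuming a SparkContext `sc`; neither side is ever materialized on the driver
val bigLookup = sc.parallelize(Seq("k1", "k2")).map(k => (k, new Array[Byte](1024)))
val queries   = sc.parallelize(Seq(("k1", "query-a"), ("k3", "query-b")))

// leftOuterJoin shuffles by key instead of shipping the whole lookup map around
val joined = queries.leftOuterJoin(bigLookup)     // RDD[(String, (String, Option[Array[Byte]]))]

// collect() here is only safe because the demo result is tiny
joined.collect().foreach { case (k, (q, v)) =>
  println(s"$k -> query=$q, valueBytes=${v.map(_.length).getOrElse(0)}")
}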