您的当前位置:首页正文

Spark2.3.2源码解析: 11. collectAsMap 源码解析& 性能测试

2024-11-27 来源:个人技术集锦

 

本文主要是为了认识collectAsMap 的源码, 以及在使用的时候性能。

对其性能方面做一个直观的理解

 

/**
 * Return the key-value pairs in this RDD to the master as a Map.
 *
 * Warning: this doesn't return a multimap (so if you have multiple values to the same key, only
 *          one value per key is preserved in the map returned)
 *
 * @note this method should only be used if the resulting data is expected to be small, as
 * all the data is loaded into the driver's memory.
 */
def collectAsMap(): Map[K, V] = self.withScope {
  // 获取所有的数据
  val data = self.collect()
  // 声明一个HashMap
  val map = new mutable.HashMap[K, V]
  // 设置map的长度
  map.sizeHint(data.length)
  //循环将数据放入
  data.foreach { pair => map.put(pair._1, pair._2) }
  map
}

 

注意:  val data = self.collect() 

 

直接查看collect 源码

 

/**
 * Return an array that contains all of the elements in this RDD.
 * 将RDD 转换为数组返回
 * @note This method should only be used if the resulting array is expected to be small, as
 * all the data is loaded into the driver's memory.
 */
def collect(): Array[T] = withScope {
  val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
  Array.concat(results: _*)
}

所以在使用collectAsMap的时候要慎重!!!!!!!

     Spark中的collect操作是将远程数据通过网络传输到本地,如果数据量特别大的话,会造成很大的网络压力,更为严重的问题是会造成内存溢出(如 Driver)。
 

 

接下来我们简单测试一下:

代码:

import java.util.UUID

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

import scala.collection.mutable.ArrayBuffer

object collectAsMapTest {

  def main(args: Array[String]): Unit = {



    val buff=ArrayBuffer[String]()


    val num  = args(0).toInt


    for( x <- 1 to num ) {
      buff+= UUID.randomUUID.toString

    }



    val startTime = System.currentTimeMillis() ;




    val conf = new SparkConf()

//    conf.setMaster("local")
//    System.setProperty("HADOOP_USER_NAME","hadoop")

    conf.set("spark.driver.maxResultSize","10G")
    val spark = SparkSession.builder().config(conf).appName("collectAsMapTest").getOrCreate()


    val sc = spark.sparkContext

    val data = sc.parallelize(buff).map(x => {
      val data = new Array[Byte](1024) ;
      (x,data)
    }).collectAsMap()

    println(" collectAsMap  total use :       "+ ( (System.currentTimeMillis() - startTime)/1000 ) + " s  ;  broadData : " + data.size)

    val  broadcastTime = System.currentTimeMillis()


    val broadData = sc.broadcast(data)

    println(" broadcast  total use :       "+ ( (broadcastTime - startTime )/1000 ) + " s  ;  broadData : " + broadData.value.size)

    println("total use :       "+ ( (System.currentTimeMillis() - startTime)/1000 ) + " s  ;  broadData : " + broadData.value.size)


  }


}

 

为了能看到如何运行的,我在集群上跑的, 用的yarn client模式

 

spark-submit \

--master yarn \

--deploy-mode client\

--class com.zl.collectAsMapTest \

--driver-memory 30g \

--num-executors 3 \

--executor-memory 7g \

--executor-cores 3 \

/data/core-test.jar  10000

 

记得调整代码里面的  spark.driver.maxResultSize , 否则会报错。  默认值为1G 。 

 

首先计算数据大小,心里好对数据量有个评估。

每条数据, 结构 :  key  , value   == >(ea5542c3-7b6c-48c1-ab8f-9ba7d775e111   , 1kb )

 

也就是说, 一条数据大约 1kb  (大约能存放1024个英文字符, 431个中文. 应该够用了)

 

输出结果如下:

 

1 千条

1 万条

10万条

100万条

300万条

500万

数据大小(大约)

1M

10M

100M

1G

3.5G

5G

用时

6s

6s

7s

18s

35s

OutOfMemoryError

 

 

 

 

 

 

其实最重要的是在申请的资源有限的情况下, 数据量不能过大,否者会造成OutOfMemoryError !!!!!!!!!

而且collectAsMap 这个是单线程操作,性能会降低

 

 

 

 

 

 

显示全文