Spark学习(四)

|

[TOC]

一、广播变量

1、广播变量理解图

使用广播变量将list广播出去,spark会将广播变量list放到Excutor中的BlockManager中管理;

tasks(若干个task)会到BlockManager(管理数据)中获取广播变量 ,

2、广播变量的使用

Java:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
package com.bd.java.core;

import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.broadcast.Broadcast;

public class BroadCast {
public static void main(String[] args) {
SparkConf conf = new SparkConf();
conf.setMaster("local").setAppName("broadcast");
JavaSparkContext sc = new JavaSparkContext(conf);
//List中已经实现了序列化,可以用于跨网络传输
List<String> list = Arrays.asList("hello bjsxt");
//广播变量将list广播出去
final Broadcast<List<String>> broadCastList = sc.broadcast(list);

JavaRDD<String> lines = sc.textFile("data/word.txt");
JavaRDD<String> result = lines.filter(new Function<String, Boolean>() {

private static final long serialVersionUID = 1L;
@Override
public Boolean call(String s) throws Exception {
//匿名内部类中使用的变量在声明时必须使用final修饰
return broadCastList.value().contains(s);
}
});
result.foreach(new VoidFunction<String>() {

private static final long serialVersionUID = 1L;
@Override
public void call(String t) throws Exception {
System.out.println(t);
}
});
sc.close();
}
}

Scala:

1
2
3
4
5
6
7
8
9
10
11
val conf = new SparkConf()
conf.setMaster("local").setAppName("brocast")
val sc = new SparkContext(conf)
val list = List("hello xasxt")
val broadCast = sc.broadcast(list)
val lineRDD = sc.textFile("./words.txt")
lineRDD.filter { x => {
println(broadCast.value)
broadCast.value.contains(x)
}.foreach { println}
sc.stop()

3、注意事项

  • 为什么使用广播变量?

    提高网络传输速率,避免worker端内存浪费。 sparkcontext.broadcast(list);将一个集合作为参数。

  • 能不能将一个 RDD 使用广播变量广播出去?
    不能,因为RDD是不存储数据的。可以将RDD的collect结果广播出去。

  • 广播变量只能在 Driver 端定义,在Executor端使用,不能在 Executor 端定义。

  • 在 Driver 端可以修改广播变量的值,在 Executor 端无法修改广播变量的值

  • 代码中,算子内部执行是在Executor端,其余的在Driver端

  • 序列化:用于机器之间跨网络传输时,要将文件序列化到磁盘才可完成传输

  • 内存大会频繁的gc(垃圾回收)就会卡顿,如果内存还不够,就会报oom(内存溢出)

二、累加器

1、累加器理解图

2、累加器的使用

Java:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package com.bd.java.core;

import org.apache.spark.Accumulator;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.rdd.RDD;

/**
* 累加器在Driver端定义赋初始值和读取,在Executor端累加。
*/
public class AccumulatorOperator {
public static void main(String[] args) {
SparkConf conf = new SparkConf();
conf.setMaster("local").setAppName("accumulator");
JavaSparkContext sc = new JavaSparkContext(conf);
//获取累加器:初始值为0
final Accumulator<Integer> accumulator = sc.accumulator(0);
sc.textFile("data/word.txt",2).foreach(new VoidFunction<String>() {
private static final long serialVersionUID = 1L;
@Override
public void call(String t) throws Exception {
accumulator.add(1);
//不能再Executor端获取accumulator.value()来触发累加
// System.out.println(accumulator.value());
System.out.println(accumulator);
}
});
// accumulator.value 写法只能在driver端,用来汇总累加器的值
//excutor端的task只能用accumulator的写法来查看数据
System.out.println(accumulator.value());
sc.stop();
}
}

Scala:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
val conf = new SparkConf()
conf.setMaster("local").setAppName("accumulator")
val sc = new SparkContext(conf)
val accumulator = sc.accumulator(0)
/*
val count = 0
sc.textFile("./words.txt").foreach { x =>{
count+=1
println("count:"+count)
}}
//結果count打印为0 , 因为count未能序列化,无法实现跨网络传输
*/
sc.textFile("./words.txt").foreach { x =>{accumulator.add(1)}}
println(accumulator.value)
sc.stop()

注意:

  • 累加器在Driver端定义赋初始值,累加器只能在Driver端读取,在 Excutor 端更新。
  • 不能再Executor端获取accumulator.value()来触发累加,它是用来汇总累加器的值
  • accumulator.value 写法只能在driver端,excutor端的task只能用accumulator的写法来查看数据
  • 序列化:用于机器之间跨网络传输时,要将文件序列化到磁盘才可完成传输

三、SparkShuffle

1、SparkShuffle 概念

reduceByKey 会将上一个 RDD 中的每一个 key 对应的所有 value 聚合成一个 value,然后生成一个新的 RDD,元素类型是<key,value>对的形式,这样每一个 key 对应一个聚合起来的 value。

问题聚合之前,每一个 key 对应的 value 不一定都是在一个 partition中,也不太可能在同一个节点上,因为 RDD 是分布式的弹性的数据集,RDD 的 partition 极有可能分布在各个节点上。

如何聚合?

  • – – Shuffle Write :上一个 stage 的每个 map task 就必须保证将自己处理的当前分区的数据相同的 key 写入一个分区文件中,可能会写入多个不同的分区文件中。
  • – – Shuffle Read :reduce task 就会从上一个 stage 的所有 task 所在的机器上寻找属于自己的那些分区文件,这样就可以保证每一个 key 所对应的 value 都会汇聚到同一个节点上去处理和聚合。
  • Spark 中有两种 Shuffle 类型,HashShuffle 和 SortShuffle,Spark1.2之HashShuffle 默认的分区器是 HashPartitioner,Spark1.2 引入SortShuffle 默认的分区器是 RangePartitioner。

2、HashShuffle

1> 普通机制

  • 普通机制示意图

  • 执行流程
    a) 每一个 map task 将不同结果写到不同的 buffer 中,每个buffer 的大小为 32K。buffer 起到数据缓存的作用。

    b) 每个 buffer 文件最后对应一个磁盘小文件。

    c) reduce task 来拉取对应的磁盘小文件。

  • 总结
    ① .map task 的计算结果会根据分区器(默认是hashPartitioner)来决定写入到哪一个磁盘小文件中去。
    ReduceTask 会去 Map 端拉取相应的磁盘小文件。
    ② .产生的磁盘小文件的个数:M(map task 的个数)*R(reduce task 的个数)

  • 存在的问题
    产生的磁盘小文件过多,会导致以下问题:
    a) 在 Shuffle Write 过程中会产生很多写磁盘小文件的对象。

    b) 在 Shuffle Read 过程中会产生很多读取磁盘小文件的对象。

    c) 在JVM堆内存中对象过多会造成频繁的gc,gc还无法解决运行所需要的内存 的话,就会 OOM。

    d) 在数据传输过程中会有频繁的网络通信,频繁的网络通信出现通信故障的可能性大大增加,一旦网络通信出现了故障会导致 shuffle file cannot find ,由于这个错误导致的 task 失败,TaskScheduler 不负责重试,由 DAGScheduler 负责重试 Stage。

2> 合并机制

 合并机制示意图

  • 执行流程

1)在只有一个核的Excutor中运行的tasks共用buffer。

2)在提交Application时 , 会给它指定核数core

3)在同一个核core中处理的task属于同一个句柄(操作系统的概念),处理的便是同一个文件。

  • 总结
    产生磁盘小文件的个数:C(core 的个数)*R(reduce 的个数)

3、SortShuffle

1> 普通机制

 普通机制示意图

  • 执行流程

    a) map task 的计算结果会写入到一个内存数据结构里面,内存数据结构默认是 5M

    b) 在 shuffle 的时候会有一个定时器,不定期的去估算这个内存结构的大小,当内存结构中的数据超过 5M 时,比如现在内存结构中的数据为 5.01M,那么他会申请 5.01*2-5=5.02M 内存给内存数据结构。

    c) 如果申请成功不会进行溢写,如果申请不成功,这时候会发生溢写磁盘。

    d) 在溢写之前内存结构中的数据会进行排序分区

    e) 然后开始溢写磁盘,写磁盘是以batch的形式去写,一个batch是 1 万条数据,

    f) map task 执行完成后,会将这些磁盘小文件合并成一个大的磁盘文件,同时生成一个索引文件。

    g) reduce task 去 map 端拉取数据的时候,首先解析索引文件,根据索引文件再去拉取对应的数据。

  • 总结
    产生磁盘小文件的个数: 2*M(map task 的个数)

2>bypass 机制

 bypass 机制示意图

 总结

① .bypass 运行机制的触发条件如下:相较于普通机制少了排序
shuffle reduce task 的 数 量 小 于spark.shuffle.sort.bypassMergeThreshold 的参数值。这个值默认是 200。
② .产生的磁盘小文件为:2*M(map task 的个数)

4、Shuffle 文件寻址

1)MapOutputTracker

MapOutputTracker是Spark架构中的一个模块,是一个主从架构。管理磁盘小文件的地址。

  • MapOutputTrackerMaster 是主对象,存在于 Driver 中。

  • MapOutputTrackerWorker 是从对象,存在于 Excutor 中。

2) BlockManager

BlockManager 块管理者(数据管理),是 Spark 架构中的一个模块,也是一个主从架构。

  • BlockManagerMaster,主对象,存在于 Driver 中。BlockManagerMaster 会在集群中有用到广播变量和缓存数据或者删除缓存数据的时候,通知 BlockManagerSlave 传输或者删除数据。
  • BlockManagerWorker,从对象,存在于 Excutor 中。BlockManagerWorker 会与 BlockManagerWorker 之间通信。
  • 无论在 Driver 端的 BlockManager 还是在 Excutor 端的BlockManager 都含有四个对象:
    ① DiskStore:负责磁盘的管理。
    ② MemoryStore:负责内存的管理。
    ③ ConnectionManager:负责连接其他的BlockManagerWorker。
    ④ BlockTransferService:负责数据的传输。

3) Shuffle 文件寻址图

4) Shuffle 文件寻址流程

a) 当 map task 执行完成后,会将 task 的执行情况和磁盘小文件的地址封装到 MapStatus 对象中,通过MapOutputTrackerWorker对象向 Driver 中的MapOutputTrackerMaster 汇报。

b) 在所有的 map task 执行完毕后,Driver 中就掌握了所有的磁盘小文件的地址。

c) 在 reduce task 执行之前,会通过 Excutor 中MapOutPutTrackerWorker 向 Driver 端的MapOutputTrackerMaster 获取磁盘小文件的地址。

d) 获取到磁盘小文件的地址后,会通过 BlockManager 中的ConnectionManager 连接数据所在节点上的
ConnectionManager,然后通过 BlockTransferService 进行数据的传输。

e) BlockTransferService 默认启动 5 个 task 去节点拉取数据。默认情况下,5 个 task 拉取数据量不能超过 48M。

四、Spark 内存管理

Spark 执行应用程序时,Spark 集群会启动 Driver 和 Executor 两种 JVM 进程,

  • Driver 负责创建 SparkContext 上下文,提交任务,task 的分发等。
  • Executor 负责 task 的计算任务,并将结果返回给 Driver。同时需要为需要持久化的 RDD 提供储存。
  • Driver 端的内存管理比较简单,这里所说的 Spark内存管理针对 Executor 端的内存管理。

Spark 内存管理分为静态内存管理统一内存管理,Spark1.6 之前使用的是静态内存管理,Spark1.6 之后引入了统一内存管理。

  • 静态内存管理中存储内存、执行内存和其他内存的大小在 Spark 应用程序运行期间均为固定的,但用户可以在应用程序启动前进行配置。
  • 统一内存管理与静态内存管理的区别在于存储内存和执行内存共享同一块空间,可以互相借用对方的空间。
    Spark1.6 以 上 版 本 默 认 使 用 的 是 统 一 内 存 管 理 , 可 以 通 过 参 数spark.memory.useLegacyMode 设置为 true (默认为 false)使用静态内存管理。

官网:内存管理配置

1、静态内存管理分布图

(https://ws1.sinaimg.cn/large/005zftzDgy1g2087fcm4xj30t80hm0vy.jpg)

1
spark.memory.useLegacyMode	false

是否开启静态内存管理, 默认是false ,若设置为true , 以下三个配置才会生效

1
2
3
4
> spark.storage.memoryFraction	0.6
> spark.shuffle.memoryFraction 0.2
> spark.storage.unrollFraction 0.2
>

2、统一内存管理分布图

1
2
3
4
spark.memory.fraction	0.75
spark.memory.storageFraction 0.5
spark.memory.offHeap.enabled false
spark.memory.offHeap.size 0

3、reduce 中 OOM 如何处理?

1) 减少每次拉取的数据量

2) 提高 shuffle 聚合的内存比例

3) 提高 Excutor 的总内存

五、Shuffle 调优

1、SparkShuffle 调优配置项如何使用?

1) 在代码中,不推荐使用,硬编码。

new SparkConf().set(“spark.shuffle.file.buffer”,”64”)

2) 在提交 spark 任务的时候,推荐使用。
spark-submit –conf spark.shuffle.file.buffer=64 –conf ….

3) 在 conf 下的 spark-default.conf 配置文件中,不推荐,因为是写死后所有应用程序都要用。

详情请看下一篇:SparkShuffle调优

文章目录
  1. 一、广播变量
    1. 1、广播变量理解图
    2. 2、广播变量的使用
    3. 3、注意事项
  2. 二、累加器
    1. 1、累加器理解图
    2. 2、累加器的使用
  3. 三、SparkShuffle
    1. 1、SparkShuffle 概念
    2. 2、HashShuffle
      1. 1> 普通机制
      2. 2> 合并机制
    3. 3、SortShuffle
      1. 1> 普通机制
      2. 2>bypass 机制
    4. 4、Shuffle 文件寻址
      1. 1)MapOutputTracker
      2. 2) BlockManager
      3. 3) Shuffle 文件寻址图
      4. 4) Shuffle 文件寻址流程
  4. 四、Spark 内存管理
    1. 1、静态内存管理分布图
    2. 2、统一内存管理分布图
    3. 3、reduce 中 OOM 如何处理?
  5. 五、Shuffle 调优
    1. 1、SparkShuffle 调优配置项如何使用?
|
载入天数...载入时分秒...