Spark学习(二)

|

[TOC]

一、控制算子

1、概念:

  • 控制算子有三种,cache、persist、checkpoint

  • 以上算子都可以将RDD 持久化,持久化的单位是 partition。

  • cache 和 persist 都是懒 执行的。

  • 必须有一个 action 类算子触发执行。

  • cache 和 persist 算子的返回值可赋值给一个变量,在其他 job 中直接使用这个变量就是使用持久化的数据了

  • checkpoint 算子不仅能将 RDD 持久化到磁盘,还能切断 RDD 之间的依赖关系(所有父RDD)。

  • 错误:rdd.cache().count() 返回的不是持久化的 RDD,而是一个数值了。

2、详解

:one:​ cache
默认将 RDD 的数据持久化到内存中。cache 是懒执行。

  • 注意

chche () =persist()=persist(StorageLevel.Memory_Only)

:two: persist

支持指定持久化级别

useOffHeap 使用堆外内存

disk、memory、offheap、deserialized(不序列化)、replication(副本数,默认为1)

序列化:压缩数据(节省空间,使用数据时要反序列化,会额外消耗CPU性能)

none 、disk_only、disk_only_2、memeory_only 、memeory_only _ser 、 memory_and_disk 、 memory_and_disk_2

:three: checkpoint

checkpoint 将 RDD 持久化到磁盘,还可以切断 RDD 之间的依赖关系。

  • checkpoint 的执行原理:
  1. 当 RDD 的 job 执行完毕后,会从 finalRDD 从后往前回溯。
  2. 当回溯到某一个 RDD 调用了 checkpoint 方法,会对当前的RDD 做一个标记。
  3. Spark 框架会自动启动一个新的 job,重新计算这个 RDD 的数据,将数据持久化到 HDFS 上。
  • 优化:

对 RDD 执行 checkpoint 之前,最好对这个 RDD 先执行cache,这样新启动的 job 只需要将内存中的数据拷贝到 HDFS上就可以,省去了重新计算这一步。

持久化级别:如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
val cocnf = new SparkConf()
conf.setMaster("local").setAppname("count")
val context = new SparkContext()

//设置CP在HDFS上的路径
context.setCheckPointDir("")

val lineADD = context.textFile("./countword.txt")
val time1 = System.currentTimeMillis()
val c = lineADD.count()
val time2 = System.currentTimeMillis()
val t1 = time2 - time1

//做缓存(persisit(m_o))
linelineADD = lineADD.cache()
//做持久化
lineADD.persisit(StorageLevel.memory_only)
//checkpoint 容错,最好还有cache
lineADD.checkpoint()

val time3 = System.currentTimeMillis()
val c = lineADD.count()
val time4 = System.currentTimeMillis()
val t2 = time4 - time3

//t1 远大于 t2

二、算子补充

transformation转换算子

1
2
> join,leftOuterJoin,rightOuterJoin,fullOuterJoin
>

作用在 K,V 格式的 RDD 上。根据 K 进行连接,对(K,V)join(K,W)返回(K,(V,W))

  • join 后的分区数与父 RDD 分区数多的那一个相同
1
2
> union
>

合并两个数据集。两个数据集的类型要一致。

  • 返回新的 RDD 的分区数是合并 RDD 分区数的总和。
1
2
> intersection
>

取两个数据集的交集

1
2
>  subtract
>

取两个数据集的差集

1
2
>  mapPartition
>

与 map 类似,遍历的单位是每个 partition 上的数据。

1
2
>  distinct(map+reduceByKey+map)
>
1
2
> cogroup
>

当调用类型(K,V)和(K,W)的数据上时,返回一个数据集(K,(Iterable,Iterable))

action触发算子

1
2
>  foreachPartition
>

遍历的数据是每个 partition 的数据。

三、集群搭建及测试

Standalone

1、下载安装包、解压

Spark历史版本下载

注意: 与Hadoop的版本保持对应。

此处使用: spark-1.6.0-bin-hadoop2.6.tgz

1
tar -zvxf spark-1.6.0-bin-hadoop2.6.tgz

2、改名

1
mv spark-1.6.0-bin-hadoop2.6 spark-1.6.0

3、修改slaves

进入安装包的conf目录下,修改slaves.template文件,添加从节点。并保存。

1
2
3
#备份
cp slaves.template slaves
vim slaves

常驻进程:master、worker

配置slaves(与worker对应)

1
2
node01
node02

4、修改 spark-env.sh

改名(备份)

1
cp spark-env.sh.template spark-env.sh

配置spark-env.sh(注意与虚拟机实际配置对应)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
#locally
#cluster
#YARN client
#standalone deploy

#配置 java_home 路径
JAVA_HOME=/usr/soft/jdk1.8.0_191
#master 的 ip
SPARK_MASTER_IP=192.168.198.128
#提交任务的端口,默认是 7077
SPARK_MASTER_PORT=7077
#每个 worker 从节点能够支配的 core 的个数
SPARK_WORKER_CORES=1
#每个 worker 从节点能够支配的内存数
SPARK_WORKER_MEMORY=1024m

#配置yarn
HADOOP_CONF_DIR=/usr/soft/hadoop-2.6.5/etc/hadoop

5、其他节点

将spark解压文件发送到其他两个节点

1
2
[root@node00 soft]# scp -r  spark-1.6.0-bin-hadoop2.6 node01:`pwd`
[root@node00 soft]# scp -r spark-1.6.0-bin-hadoop2.6 node02:`pwd`

6、配置环境变量(可不配,因为bin路径中包含start-all ,该命令与hdfs中的命令会冲突)

7、启动:(node00)

在spark的解压文件的/sbin 目录下

1
./start-all.sh

停止

1
./stop-all.sh

显示:

[root@node00 sbin]# ./start-all.sh
starting org.apache.spark.deploy.master.Master, logging to /usr/soft/spark-1.6.0-bin-hadoop2.6/logs/spark-root-org.apache.spark.deploymaster.Master-1-node00.out

node01: starting org.apache.spark.deploy.worker.Worker, logging to /usr/soft/spark-1.6.0-bin-hadoop2.6/logs/spark-root-org.apache.spar.deploy.worker.Worker-1-node01.out

node02: starting org.apache.spark.deploy.worker.Worker, logging to /usr/soft/spark-1.6.0-bin-hadoop2.6/logs/spark-root-org.apache.spar.deploy.worker.Worker-1-node02.out

查看三台节点的进程

node00(命令启动的节点)

1
2
> [root@node00 sbin]# jps
>

2343 Master
2408 Jps

nose01(配置的从节点)

1
2
> [root@node01 ~]# jps
>

2292 Jps
2229 Worker

node02(从节点)

1
2
> [root@node02 ~]# jps
>

6216 Worker
6266 Jps

注意:

Worker在这里不是真正干活的进程,而是相当于Yarn中的NM。

它是负责管理所在节点资源的、向Master汇报所在节点的信息(如核数、内存数)

Master: 监控任务、分发任务、回收计算结果

8、搭建客户端

  • 将 spark 安装包原封不动的拷贝到一个新的节点上,然后,在新的节点上提交任务即可。

注意:8080 是Spark WEBUI页面的端口 ; 7077 是Spark任务提交的端口

web页面访问:ip:8080

  • 修改master的WEBUI端口,

方法一(永久):通过修改start-master.sh 文件(在/sbin目录下)

1
vim  start-master.sh

找到文件内容如下的部分:

1
2
3
if [ "$SPARK_MASTER_WEBUI_PORT" = "" ]; then
SPARK_MASTER_WEBUI_PORT=8080
fi

方法二:在 Master 节点上导入临时环境变量,只作用于当前进程,重启就无效了。

1
[root@node00 sbin]# export SPARK_MASTER_WEBUI_PORT=8080

删除临时变量

1
[root@node00 sbin]# export -n SPARK_MASTER_WEBUI_PORT

standalonestandalone

Yarn

1、步骤

1。2。3。4。5。8。同standalone

不用Master和Worker,所以不用第7步,我们使用的是yarn中的RM和NM

2、配置

添加 HADOOP_CONF_DIR配置

(在使用Yarn时,就能找到关于hdfs的所有配置,其中就包括IP 和Port)

方式一:

编辑spark-env.sh文件

方式二:

1
export HADOOP_CONF_DIR=/usr/soft/hadoop-2.6.5/etc/hadoop

测试:求π值

Pi案例:

源码案例:

路径:在spark解压路径spark-1.6.0-bin-hadoop2.6中

spark-1.6.0-bin-hadoop2.6/examples/src/main/scala/org/apache/spark/examples/SparkPi.scala

原理:随机产生无穷多个点落入如上图形中,求落入圆中的概率:
$$
概率 p = πrr/(2r*2r)=π/4
$$

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

// scalastyle:off println
package org.apache.spark.examples

import scala.math.random
import org.apache.spark._

/** Computes an approximation to pi */
object SparkPi {
def main(args: Array[String]) {
val conf = new SparkConf("local").setAppName("Spark Pi")
val spark = new SparkContext(conf)

// args 运行时传入的参数 slices 分区数量 (决定task数量)
val slices = if (args.length > 0) args(0).toInt else 2

//MaxValue 一个无限大的数 n 随机产生的十万个的数
val n = math.min(100000L * slices, Int.MaxValue).toInt // avoid overflow

//parallelize可以获得RDD ,将1~n个数字放到RDD中
//val count :[Int] = spark.parallelize(1 until n, slices)
val count = spark.parallelize(1 until n, slices).map { i =>
val x = random * 2 - 1
val y = random * 2 - 1
if (x*x + y*y < 1) 1 else 0
}.reduce(_ + _)

println("Pi is roughly " + 4.0 * count / n)
spark.stop()
}
}
// scalastyle:on println

所需使用的jar包:spark-examples-1.6.0-hadoop2.6.0.jar

位置:解压目录的lib路径下

在任一节点的/bin路径下上执行如下命令:(node00)

Standalone 提交命令:

1
2
3
4
5
./spark-submit    #提交spark 
--master spark://node1:7077 #spark主节点的地址和端口
--class org.apache.spark.examples.SparkPi ../lib/spark-examples-1.6.0-hadoop2.6.0.jar 100 # 指明运行的jar包+路径 和 jar包中执行的包名+类名 100 为传入的参数

./spark-submit --master spark://node00:7077 --class org.apache.spark.examples.SparkPi ../lib/spark-examples-1.6.0-hadoop2.6.0.jar 1000

显示:

提交命令的节点(node00主节点)

会显示执行日志、运算结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
> 19/02/13 23:27:31 INFO scheduler.TaskSetManager: Starting task 999.0 in stage 0.0 (TID 999, node02, partition 999,PROCESS_LOCAL, 2158 bytes)
> 19/02/13 23:27:31 INFO scheduler.TaskSetManager: Finished task 995.0 in stage 0.0 (TID 995) in 68 ms on node02 (996/1000)
> 19/02/13 23:27:31 INFO scheduler.TaskSetManager: Finished task 997.0 in stage 0.0 (TID 997) in 131 ms on node01 (997/1000)
> 19/02/13 23:27:31 INFO scheduler.TaskSetManager: Finished task 996.0 in stage 0.0 (TID 996) in 147 ms on node01 (998/1000)
> 19/02/13 23:27:31 INFO scheduler.TaskSetManager: Finished task 999.0 in stage 0.0 (TID 999) in 112 ms on node02 (999/1000)
> 19/02/13 23:27:31 INFO scheduler.TaskSetManager: Finished task 998.0 in stage 0.0 (TID 998) in 115 ms on node02 (1000/1000)
> 19/02/13 23:27:31 INFO scheduler.DAGScheduler: ResultStage 0 (reduce at SparkPi.scala:36) finished in 79.202 s
> 19/02/13 23:27:31 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
> 19/02/13 23:27:31 INFO scheduler.DAGScheduler: Job 0 finished: reduce at SparkPi.scala:36, took 82.641779 s
>
> Pi is roughly 3.14148344 #运算结果
>
> 19/02/13 23:27:32 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/metrics/json,null}
> 19/02/13 23:27:32 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/stage/kill,null}
> 19/02/13 23:27:32 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/api,null}
> 。。。。。。。。。。。。。。。。。。。。。。。。。
> 19/02/13 23:27:32 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs/json,null}
> 19/02/13 23:27:32 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs,null}
> 19/02/13 23:27:32 INFO ui.SparkUI: Stopped Spark web UI at http://192.168.198.128:4040
> 19/02/13 23:27:32 INFO cluster.SparkDeploySchedulerBackend: Shutting down all executors
> 19/02/13 23:27:32 INFO cluster.SparkDeploySchedulerBackend: Asking each executor to shut down
> 19/02/13 23:27:32 INFO spark.MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
> 19/02/13 23:27:33 INFO storage.MemoryStore: MemoryStore cleared
> 19/02/13 23:27:33 INFO storage.BlockManager: BlockManager stopped
> 19/02/13 23:27:33 INFO storage.BlockManagerMaster: BlockManagerMaster stopped
> 19/02/13 23:27:33 INFO scheduler.OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
> 19/02/13 23:27:33 INFO remote.RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
> 19/02/13 23:27:33 INFO remote.RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
> 19/02/13 23:27:34 INFO spark.SparkContext: Successfully stopped SparkContext
> 19/02/13 23:27:34 INFO util.ShutdownHookManager: Shutdown hook called
> 19/02/13 23:27:34 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-f7c2019e-10f4-4b31-9308-5a94603de113
> 19/02/13 23:27:35 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-f7c2019e-10f4-4b31-9308-5a94603de113/httpd-39b8b4b3-9b80-4247-9c7e-ed6bd2dc389f
>
>

在命令执行期间:

在三个节点敲如下命令:jps,会显示:

node00:

1
2
3
4
5
> [root@node00 ~]# jps
> 4903 Jps
> 2343 Master
> 4764 SparkSubmit #代表是提交spark的节点 (与主从无关)
>

node01和node02:

1
2
3
4
5
> [root@node01 bin]# jps
> 2229 Worker
> 5096 CoarseGrainedExecutorBackend #代表是干活的节点 (仅为从节点进程)
> 5167 Jps
>

如果提交命令的节点是从节点(node01),则在该节点上会显示执行日志、运算结果

则在提交过程中,敲命令:jps 该节点会显示

1
2
3
4
5
6
7
> [root@node01 ~]# jps
> 5298 CoarseGrainedExecutorBackend #代表是干活的节点 (仅为从节点进程)
> 2229 Worker
> 5323 Jps
> 5213 SparkSubmit #代表是提交spark的节点 (与主从无关)
>
>

YARN 提交命令:

基于Hadoop :

NN DN JN ZKFC ZK RM RM
node00
node01
node02

启动zookeeper :(3台)

1
zkServer.sh start

启动hdfs :(1台)

1
start-all.sh

相当于:Instead use start-dfs.sh and start-yarn.sh

启动resourcemanager :(在RM的主节点上启动 :1台)

1
yarn-daemon.sh start resourcemanager

在任一节点的/bin路径下执行:(node01)

1
2
3
4
5
./spark-submit
--master yarn #HADOOP_CONF_DIR配置使得在使用Yarn时能找到hdfs的所有配置,其中就有IP 和Port
--class org.apache.spark.examples.SparkPi ../lib/spark-examples-1.6.0-hadoop2.6.0.jar 100

./spark-submit --master yarn --class org.apache.spark.examples.SparkPi ../lib/spark-examples-1.6.0-hadoop2.6.0.jar 1000

显示

  • 执行日志、计算结果会在执行提交命令的节点上显示

  • 在命令提交过程中在三台节点上敲命令:jps 会显示

node02:

[root@node02 ~]# jps
3406 DataNode
3491 JournalNode
1681 QuorumPeerMain
4133 CoarseGrainedExecutorBackend # 真正干活的进程
4092 ExecutorLauncher # 启动executor
3585 NodeManager
3942 SparkSubmit #提交spark的进程
4217 Jps

四、Standalone 模式两种提交任务方式

1、Standalone-client 提交任务方式

(1)命令提交

  • 在/sbin路径下:启动standalone
1
./start-all.sh
  • 提交spark

方式一:

1
2
3
4
5
./spark-submit
--master spark://node00:7077
--class org.apache.spark.examples.SparkPi
../lib/spark-examples-1.6.0-hadoop2.6.0.jar
1000

方式二:

1
2
3
4
5
6
./spark-submit
--master spark://node1:7077
--deploy-mode client
--class org.apache.spark.examples.SparkPi
../lib/spark-examples-1.6.0-hadoop2.6.0.jar
1000

(2)执行原理图

(3)执行流程

  1. client 模式提交任务后,会在客户端启动 Driver 进程。
  2. Driver 会向 Master 申请启动 Application 启动的资源。
  3. 资源申请成功,Driver 端将 task 发送到 worker 端执行。
  4. worker 将 task 执行结果返回到 Driver 端

(4)总结

  • client 模式适用于测试调试程序。

  • Driver 进程是在客户端启动的,这里的客户端就是指提交应用程序的当前节点。

  • 在 Driver 端可以看到 task 执行的情况。生产环境下不能使用 client 模式,

是因为

假设要提交 100 个 application 到集群运行,Driver 每次都会在client 端启动,那么就会导致客户端 100 次网卡流量暴增的问题。

2、Standalone-cluster 提交任务方式

(1)命令提交

  • 在/sbin路径下:
1
./start-all.sh
  • 提交spark
1
2
3
4
5
6
./spark-submit
--master spark://node00:7077
--deploy-mode cluster
--class org.apache.spark.examples.SparkPi
../lib/spark-examples-1.6.0-hadoop2.6.0.jar
1000

注意:

Standalone-cluster 提交方式,应用程序使用的所有 jar 包和文件,必须保证所有的 worker 节点都要有,因为此种方式,spark 不会自动上传jar包。

Standalone-client 和yarn 模式会在提交命令的时候自动uploading 实现jar包共享,

解决方式:

1、将所有的依赖包和文件各放一份在 worker 节点上。

2、将所有的依赖包和文件打到同一个包中,然后放在 hdfs 上。(路径需指定为hdfs上的路径)

1
2
3
4
5
6
7
> > ./spark-submit
> > --master spark://node00:7077
> > --deploy-mode cluster
> > --class org.apache.spark.examples.SparkPi
> > hdfs://Sunrise/lib/spark-examples-1.6.0-hadoop2.6.0.jar
> > 1000
> >

(2)执行原理图

(3)执行流程

  1. cluster 模式提交应用程序后,会向 Master 请求启动 Driver.
  2. Master 接受请求,随机在集群一台节点启动 Driver 进程。
  3. Driver 启动后为当前的应用程序申请资源。
  4. Driver 端发送 task 到 worker 节点上执行。
  5. worker 将执行情况和执行结果返回给 Driver 端。

(4)总结

Driver 进程是在集群某一台 Worker 上启动的,在客户端是无法查看 task 的执行情况的。假设要提交 100
个 application 到集群运行,每次 Driver 会随机在集群中某一台 Worker 上启动,那么这 100 次网卡流量暴
增的问题就散布在集群上

总结 Standalone

Standalone 两种方式提交任务,Driver 与集群的通信包括:

  1. Driver 负责应用程序资源的申请
  2. 任务的分发。
  3. 结果的回收。
  4. 监控 task 执行情况。

五、Yarn 模式两种提交任务方式

1、yarn-client 提交任务方式

(1)命令提交

  • 提交spark

方式一:

1
2
3
4
5
./spark-submit
--master yarn
--class
org.apache.spark.examples.SparkPi ../lib/spark-examples-1.6.0-hadoop2.6.0.jar
100

方式二:

1
2
3
4
5
./spark-submit
--master yarn–client
--class
org.apache.spark.examples.SparkPi ../lib/spark-examples-1.6.0-hadoop2.6.0.jar
100

方式三:

1
2
3
4
5
6
./spark-submit
--master yarn
--deploy-mode client
--class
org.apache.spark.examples.SparkPi ../lib/spark-examples-1.6.0-hadoop2.6.0.jar
100

(2)执行原理图

(3)执行流程

  1. 客户端提交一个 Application,在客户端启动一个 Driver 进程。
  2. 应用程序启动后会向 RS(ResourceManager)发送请求,启动AM(ApplicationMaster)的资源。
  3. RS 收到请求,随机选择一台 NM(NodeManager)启动 AM。这里的 NM 相当于 Standalone 中的Worker 节点。
  4. AM启动后,会向RS请求一批container资源,用于启动Executor.
  5. RS 会找到一批 NM 返回给 AM,用于启动 Executor。
  6. AM 会向 NM 发送命令启动 Executor。
  7. Executor 启动后,会反向注册给 Driver,Driver 发送 task 到Executor,执行情况和结果返回给 Driver 端。

(4)总结

Yarn-client 模式同样是适用于测试,因为 Driver 运行在本地,Driver会与 yarn 集群中的 Executor 进行大量的通信,会造成客户机网卡流量的大量增加.

  • ApplicationMaster 的作用:
  1. 为当前的 Application 申请资源
  2. 给 NodeManager 发送消息启动 Executor。
  • 注意:

ApplicationMaster 有 launchExecutor 和申请资源的功能,并没有作业调度的功能

2、yarn-cluster 提交任务方式

(1)命令提交

  • 提交spark

方式一:

1
2
3
4
5
6
./spark-submit
--master yarn
--deploy-mode cluster
--class
org.apache.spark.examples.SparkPi ../lib/spark-examples-1.6.0-hadoop2.6.0.jar
1000

方式二:

1
2
3
4
5
./spark-submit
--master yarn-cluster
--class
org.apache.spark.examples.SparkPi ../lib/spark-examples-1.6.0-hadoop2.6.0.jar
1000

(2)执行原理图

(3)执行流程

  1. 客户机提交 Application 应用程序,发送请求到RS(ResourceManager),请求启动AM(ApplicationMaster)。
  2. RS 收到请求后随机在一台 NM(NodeManager)上启动 AM(相当于 Driver 端,同一个进程)。
  3. AM 启动,AM 发送请求到 RS,请求一批 container 用于启动Excutor。
  4. RS 返回一批 NM 节点给 AM。
  5. AM 连接到 NM,发送请求到 NM 启动 Excutor。
  6. Excutor 反向注册到 AM 所在的节点的 Driver。Driver 发送 task到 Excutor。

(4)总结

Yarn-Cluster 主要用于生产环境中,

因为 Driver 运行在 Yarn 集群中某一台 nodeManager 中,每次提交任务的 Driver 所在的机器都是
随机的,不会产生某一台机器网卡流量激增的现象,

缺点是任务提交后不能看到日志。只能通过 yarn 查看日志。

  • ApplicationMaster 的作用:
  1. 为当前的 Application 申请资源
  2. 给 NodeManger 发送消息启动 Excutor。
  3. 任务调度。
  • 停止集群任务命令:yarn application -kill applicationID

总结yarn

——————————————–TODU—————————–

参数解释:spark-submit

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
spark-submit -h

--master (优先使用代码中的配置)
--name (指定APPname)
--deploy mode (默认为client,指定运行模式)
--jars (可以用来为代码添加所需要的jar包依赖)

IDEA代码打包:BUILD
(注意避免jar包过大,可不打包引用的相关sparkjar包,因搭建的spark集群上已经存在)

--files (添加代码所需的文件)
--conf (PROP=value)
--driver-memory
--executor-memory
--total-executor-core (若不指明,就把所有的核均用完)
--queue (资源分配:若运行在yarn上 :就是将资源 分配到队列中 )
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
[root@node00 bin]# ./spark-submit -h
Usage: spark-submit [options] <app jar | python file> [app arguments]
Usage: spark-submit --kill [submission ID] --master [spark://...]
Usage: spark-submit --status [submission ID] --master [spark://...]

Options:
--master MASTER_URL spark://host:port, mesos://host:port, yarn, or local.
--deploy-mode DEPLOY_MODE Whether to launch the driver program locally("client") or on one of the worker machines inside the cluster ("cluster") (Default: client).
--class CLASS_NAME Your application's main class (for Java / Scala apps).
--name NAME A name of your application.
--jars JARS Comma-separated (逗号分隔) list of local jars to include on the driver and executor classpaths.(Driver 和 executor 依赖的第三方 jar 包)
--packages Comma-separated list of maven coordinates of jars to include on the driver and executor classpaths. Will search the local maven repo, then maven central and any additional remote repositories given by -- repositories. The format for the coordinates should be groupId:artifactId:version.
--exclude-packages Comma-separated list of groupId:artifactId, to exclude while resolving the dependencies provided in -- packages to avoid dependency conflicts.
--repositories Comma-separated list of additional remote repositories to search for the maven coordinates given with -- packages.
--py-files PY_FILES Comma-separated list of .zip, .egg, or .py files to place on the PYTHONPATH for Python apps.
--files FILES Comma-separated list of files to be placed in the working directory of each executor.

--conf PROP=VALUE Arbitrary Spark configuration property.
--properties-file FILE Path to a file from which to load extra properties. If not specified, this will look for conf/spark- defaults.conf.

--driver-memory MEM Memory for driver (e.g. 1000M, 2G) (Default: 1024M).
--driver-java-options Extra Java options to pass to the driver.
--driver-library-path Extra library path entries to pass to the driver.
--driver-class-path Extra class path entries to pass to the driver. Note that jars added with --jars are automatically included in the classpath.

--executor-memory MEM Memory per executor (e.g. 1000M, 2G) (Default: 1G).

--proxy-user NAME User to impersonate when submitting the application.

--help, -h Show this help message and exit
--verbose, -v Print additional debug output
--version, Print the version of current Spark

Spark standalone with cluster deploy mode only:
--driver-cores NUM Cores for driver (Default: 1).

Spark standalone or Mesos with cluster deploy mode only:
--supervise If given, restarts the driver on failure.
--kill SUBMISSION_ID If given, kills the driver specified.
--status SUBMISSION_ID If given, requests the status of the driver specified.

Spark standalone and Mesos only:
--total-executor-cores NUM Total cores for all executors.

Spark standalone and YARN only:
--executor-cores NUM Number of cores per executor. (Default: 1 in YARN mode, or all available cores on the worker in standalone mode)

YARN-only:
--driver-cores NUM Number of cores used by the driver, only in cluster mode (Default: 1).
--queue QUEUE_NAME The YARN queue to submit to (Default: "default").
--num-executors NUM Number of executors to launch (Default: 2).
--archives ARCHIVES Comma separated list of archives to be extracted into the working directory of each executor.
--principal PRINCIPAL Principal to be used to login to KDC, while running on
secure HDFS.
--keytab KEYTAB The full path to the file that contains the keytab for the principal specified above. This keytab will be copied to the node running the Application Master via the Secure Distributed Cache, for renewing the login tickets and the delegation tokens periodically.


sc.textFile("hdfs://node00:8020/test.txt")
.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).foreach(println)

六、术语解释

七、宽窄依赖

1、窄依赖

父RDD的一个partition对应子RDD一个partition

父RDD的多个partition对应子RDD一个partition

不会产生shuffle

1
2
3
4
map
flatmap
filter
union

2、宽依赖

父RDD的一个partition对应子RDD多个partition

会产生shuffle

会划分stage

1
2
3
reduceByKey
join
groupBy

八、stage

0、概念

(1)Spark任务会根据RDD之间的依赖关系,形成一个DAG有向无环图,DAG会提交给DAGScheduler,DAGScheduler会把DAG划分相互依赖的多个stage,划分依据就是RDD之间的宽窄依赖关系:遇到宽依赖就划分stage

(2)stageN内有一组并行的task组成,这些task将以taskSet的格式提交给TaskScheduler运行

(2)task运行时,stage之间的关系可能并行,也可能串行

1、stage 切割规则

切割规则:从后往前,遇到宽依赖就切割 stage。

2、stage 计算模式:pipeline 管道计算模式

pipeline 管道计算模式,pipeline 只是一种计算思想、模式。

  • 数据在内存中流转
  • 数据一直在管道里面什么时候数据会落地?
  1. 对 RDD 进行持久化(cache、persisit)。
  2. shuffle write 的时候。
  • 什么决定task数

Stage 的 task 并行度是由 stage 的最后一个RDD的分区数来决定的 (partition分区数决定task数)

同一个stage中的task计算逻辑可能不同

  • 如何改变 RDD 的分区数?

宽依赖可改分区数;(因为此时数据已落地到磁盘)

textFile(“ ”,5)

reduceByKey(_ +_ , 5)

GroupByKey(4)

  • 测试验证 pipeline 计算模式

注意:textFile(“./wc.txt”)是通过文件获得RDD,parallelize(List)是通过转换参数内容获得RDD

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
> val conf = new SparkConf()
> conf.setMaster("local").setAppName("pipeline");
> val sc = new SparkContext(conf)
> val rdd = sc.parallelize(Array(1,2,3,4))
> val rdd1 = rdd.map { x => {
> println("map--------"+x)
> x
> }
> }
> val rdd2 = rdd1.filter { x => {
> println("fliter********"+x)
> true
> } }
> rdd2.collect().foreach(print+",") //1,2,3,4
> sc.stop()
>

显示:

map——–1

fliter**1

map——–2

fliter**2

map——–3

fliter**3

map——–4

fliter**4

九、Spark 资源调度和任务调度

1、概念解释

  • DAGScheduler是任务调度的高层调度器,是一个对象

DAGScheduler 的主要作用就是

将DAG 根据 RDD 之间的宽窄依赖关系划分为一个个的 Stage,然后将这些Stage 以 TaskSet 的形式提交给 TaskScheduler

  • TaskScheduler 是任务调度的低层调度器

  • TaskSet 其实就是一个集合,里面封装的就是一个个的 task 任务,也就是 stage 中的并行度 task 任务

Application→Job→Stage→Task

  • Spark推测执行机制

如果有运行缓慢的task,那么TaskScheduler就会启动一个新的task(在不同节点的excutor上)来执行相同的处理逻辑,两个task中哪个task先执行结束,就以那个task的执行结果为准。

在 Spark 中推测执行默认是关闭的。

推测执行可以通过 spark.speculation 属性来配置。
注意:

  • 对于 ETL 类型要入数据库的业务要关闭推测执行机制,这样就不会有重复的数据入库。

  • 如果遇到数据倾斜的情况,开启推测执行则有可能导致一直会有task重新启动处理相同的逻辑,任务可能一直处于处理不完的状态。(这时候task慢是因为数据量过多,而不是执行性能不行)

2、Spark 资源调度和任务调度的流程:

1、启动集群后,Worker 节点会向 Master 节点汇报资源情况,Master 掌握了集群资源情况。

2、当 Spark 提交一个 Application 后,根据 RDD 之间依赖关系将 Application 形成一个 DAG 有向无环图。

3、任务提交后,Spark 会在Driver 端创建两个对象:DAGScheduler 和 TaskScheduler,

DAGScheduler 将DAG 根据 RDD 之间的宽窄依赖关系划分为一个个的 Stage,然后将这些Stage 以 TaskSet 的形式提交给 TaskScheduler,

TaskSchedule 会遍历TaskSet 集合,拿到每个 task 后会将 task 发送到计算节点 Executor 中去执行(其实就是发送到 Executor 中的线程池 ThreadPool 去执行)。

task 在Executor 线程池中的运行情况会向 TaskScheduler 反馈,当 task 执行失败时,则由 TaskScheduler 负责重试,将 task 重新发送给 Executor 去执行,默认重试 3 次。如果重试 3 次依然失败,那么这个 task 所在的 stage 就失败了。

stage 失败了则由 DAGScheduler 来负责重试,重新发送 TaskSet 到TaskSchdeuler,Stage 默认重试 4 次。如果重试 4 次以后依然失败,那么这个 job 就失败了。job 失败了,Application 就失败了。

TaskScheduler 不仅能重试失败的 task,还会重试 straggling<落后,缓慢的>task(也就是执行速度比其他 task 慢太多的 task)。如果有运行缓慢的 task那么 TaskScheduler 会启动Spark 的推测执行机制先执行完,task 的执行结果为准。

3、资源调度和任务调度的流程图

4、粗粒度资源申请和细粒度资源申请

  • 粗粒度资源申请(Spark)
    在 Application 执行之前,将所有的资源申请完毕,当资源申请成功后,才会进行任务的调度,当所有的 task 执行完成后,才会释放这部分资源。

    • 优点:

      在 Application 执行之前,所有的资源都申请完毕,每一个task 直接使用资源就可以了,不需要 task 在执行前自己去申请资源,task 启动就快了,task 执行快了,stage 执行就快了,job 就快了,application 执行就快了。

    • 缺点:

      直到最后一个 task 执行完成才会释放资源,集群的资源无法充分利用。

  • 细粒度资源申请
    Application 执行之前不需要先去申请资源,而是直接执行,让 job中的每一个 task 在执行前自己去申请资源,task 执行完成就释放资源。

    优点

    集群的资源可以充分利用。

    缺点

    task 自己去申请资源,task 启动变慢,Application 的运行就响应的变慢了。

5、资源调度源码分析

 资源请求简单图

 资源调度 Master 路径:

1
路径:spark-1.6.0/core/src/main/scala/org.apache.spark/deploy/Master/Master.scala

 提交应用程序,submit 的路径:

1
路径:spark-1.6.0/core/src/main/scala/org.apache.spark/ deploy/SparkSubmit.scala

 总结:

  1. Executor 在集群中分散启动,有利于 task 计算的数据本地化。
  2. 默认情况下(提交任务的时候没有设置–executor-cores 选项),每一个 Worker 为当前的 Application 启动一个 Executor,这个Executor 会使用这个 Worker 的所有的 cores 和 1G 内存。
  3. 如果想在 Worker 上启动多个 Executor,提交 Application 的时候要加–executor-cores 这个选项。
  4. 默认情况下没有设置–total-executor-cores,一个 Application 会使用 Spark 集群中所有的 cores。

 结论演示

使用 Spark-submit 提交任务演示。也可以使用 spark-shell

  1. 默认情况每个 worker 为当前的 Application 启动一个 Executor,这个 Executor 使用集群中所有的 cores 和 1G 内存。

    1
    2
    3
    4
    5
    ./spark-submit
    --master spark://node1:7077
    --class org.apache.spark.examples.SparkPi
    ../lib/spark-examples-1.6.0-hadoop2.6.0.jar
    10000
  2. 在 workr 上启动多个 Executor,设置–executor-cores 参数指定每个executor 使用的 core 数量。

    1
    2
    3
    4
    5
    6
    ./spark-submit
    --master spark://node1:7077
    --executor-cores 1
    --class org.apache.spark.examples.SparkPi
    ../lib/spark-examples-1.6.0-hadoop2.6.0.jar
    10000
  3. 内存不足的情况下启动 core 的情况。Spark 启动是不仅看 core 配置参数,也要看配置的 core 的内存是否够用。

    1
    2
    3
    4
    5
    6
    7
    ./spark-submit
    --master spark://node1:7077
    --executor-cores 1
    --executor-memory 3g
    --class org.apache.spark.examples.SparkPi
    ../lib/spark-examples-1.6.0-hadoop2.6.0.jar
    10000
  4. –total-executor-cores 集群中共使用多少 cores 。注意:一个进程不能让集群多个节点共同启动。

    1
    2
    3
    4
    5
    6
    7
    8
    ./spark-submit
    --master spark://node1:7077
    --executor-cores 1
    --executor-memory 2g
    --total-executor-cores 3
    --class org.apache.spark.examples.SparkPi
    ../lib/spark-examples-1.6.0-hadoop2.6.0.jar
    10000

6、任务调度源码分析

 Action 算子开始分析
任务调度可以从一个 Action 类算子开始。因为 Action 类算子会触发一个 job 的执行。

 划分 stage,以 taskSet 形式提交任务
DAGScheduler 类中 getMessingParentStages()方法是切割 job 划分stage 。 可 以 结 合 以 下 这 张 图 来 分 析 :

文章目录
  1. 一、控制算子
    1. 1、概念:
    2. 2、详解
  2. 二、算子补充
    1. transformation转换算子
    2. action触发算子
  3. 三、集群搭建及测试
    1. Standalone
      1. 1、下载安装包、解压
      2. 2、改名
      3. 3、修改slaves
      4. 4、修改 spark-env.sh
      5. 5、其他节点
      6. 7、启动:(node00)
      7. 8、搭建客户端
    2. Yarn
      1. 1、步骤
      2. 2、配置
    3. 测试:求π值
      1. 源码案例:
      2. Standalone 提交命令:
      3. YARN 提交命令:
  4. 四、Standalone 模式两种提交任务方式
    1. 1、Standalone-client 提交任务方式
      1. (1)命令提交
      2. (2)执行原理图
      3. (3)执行流程
      4. (4)总结
    2. 2、Standalone-cluster 提交任务方式
      1. (1)命令提交
      2. (2)执行原理图
      3. (3)执行流程
      4. (4)总结
    3. 总结 Standalone
  5. 五、Yarn 模式两种提交任务方式
    1. 1、yarn-client 提交任务方式
      1. (1)命令提交
      2. (2)执行原理图
      3. (3)执行流程
      4. (4)总结
    2. 2、yarn-cluster 提交任务方式
      1. (1)命令提交
      2. (2)执行原理图
      3. (3)执行流程
      4. (4)总结
    3. 总结yarn
  6. 参数解释:spark-submit
  7. 六、术语解释
  8. 七、宽窄依赖
    1. 1、窄依赖
    2. 2、宽依赖
  9. 八、stage
    1. 0、概念
    2. 1、stage 切割规则
    3. 2、stage 计算模式:pipeline 管道计算模式
  10. 九、Spark 资源调度和任务调度
    1. 1、概念解释
    2. 2、Spark 资源调度和任务调度的流程:
    3. 3、资源调度和任务调度的流程图
    4. 4、粗粒度资源申请和细粒度资源申请
    5. 5、资源调度源码分析
    6. 6、任务调度源码分析
|
载入天数...载入时分秒...