新闻资讯

Spark系列014——Spark调度管理之作业调度

2022年1月28日

Spark作业调度器概述

在指定的Spark应用内部(对应同一SparkContext实例),多个线程可能并发地提交Spark作业(Job)。一个并行计算 Job 由一组 Task 组成,并由 Spark Action(如:save、collect)触发启动。Spark的作业调度器是完全线程安全的,并且能够支持Spark应用同时处理多个请求(比如,来自不同用户的查询)。

默认,Spark应用内部使用FIFO调度策略。每个作业被划分为多个阶段(Stage)(例如map阶段和reduce阶段),第一个作业在其启动后会优先获取所有的可用资源,然后是第二个作业再申请,再第三个……。如果前面的作业没有把集群资源占满,则后续的作业可以立即启动运行,否则,后提交的作业会有明显的延迟等待。

不过从Spark 0.8开始,为了能支持各个作业间的公平调度,Spark提供了FAIR调度策略。FAIR调度时,Spark以轮询的方式给每个作业分配资源,因此所有的作业获得的资源大体上是平均分配。这意味着,即使有大作业在运行,小的作业再提交也能立即获得计算资源而不是等待前面的作业结束,大大减少了延迟时间。这种模式特别适合于多用户配置。

要启用公平调度器,只需设置一下 SparkContext中spark.scheduler.mode 属性为 FAIR即可:

val conf = new SparkConf().setMaster(...).setAppName(...)

conf.set("spark.scheduler.mode", "FAIR")

val sc = new SparkContext(conf)

FAIR调度资源池

FAIR调度器还可以支持将作业分组放入资源池(pool),然后给每个资源池配置不同的选项(如:权重)。这样你就可以给一些比较重要的作业创建一个“高优先级”资源池,或者你也可以把每个用户的作业分到一组,这样一来就是各个用户平均分享集群资源,而不是各个作业平分集群资源。Spark FAIR调度器的实现方式基本都是模仿 Hadoop Fair Scheduler(http://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/FairScheduler.html) 来实现的。

默认情况下,新提交的作业都会进入到默认资源池中,不过作业对应于哪个资源池,可以在提交作业的线程中用SparkContext.setLocalProperty 设定 spark.scheduler.pool 属性。示例代码如下:

// Assuming sc is your SparkContext variable

sc.setLocalProperty("spark.scheduler.pool", "pool1")

一旦设好了局部属性,所有该线程所提交的作业(即:在该线程中调用action算子,如:RDD的save、count、collect 等)都会使用这个资源池。这个设置是以线程为单位保存的,你很容易实现用同一线程来提交同一用户的所有作业到同一个资源池中。同样,如果需要清除资源池设置,只需在对应线程中调用如下代码:

sc.setLocalProperty("spark.scheduler.pool", null)

资源池默认行为

默认地,各个资源池之间平分整个集群的资源(包括default资源池),但在资源池内部,默认情况下,作业是FIFO顺序执行的。举例来说,如果你为每个用户创建了一个资源池,那么意味着各个用户之间共享整个集群的资源,但每个用户自己提交的作业是按顺序执行的,而不会出现后提交的作业抢占前面作业的资源。

配置资源池属性

资源池的属性需要通过配置文件来指定。每个资源池都支持以下3个属性:

  • schedulingMode:可以是FIFO或FAIR,控制资源池内部的作业是如何调度的。
  • weight:控制资源池相对其他资源池,可以分配到资源的比例。默认所有资源池的weight都是1。如果你将某个资源池的weight设为2,那么该资源池中的资源将是其他池子的2倍。如果将weight设得很高,如1000,可以实现资源池之间的调度优先级 – 也就是说,weight=1000的资源池总能立即启动其对应的作业。
  • minShare:除了整体weight之外,每个资源池还能指定一个最小资源分配值(CPU个数),管理员可能会需要这个设置。FAIR调度器总是会尝试优先满足所有活跃(active)资源池的最小资源分配值,然后再根据各个池子的weight来分配剩下的资源。因此,minShare属性能够确保每个资源池都能至少获得一定量的集群资源。minShare的默认值是0。

资源池属性是一个XML文件,可以基于 conf/fairscheduler.xml.template 修改,然后在 SparkConf 的 spark.scheduler.allocation.file 属性指定文件路径:

conf.set("spark.scheduler.allocation.file", "/path/to/file")

资源池XML配置文件格式如下,其中每个池子对应一个<pool>元素,每个资源池可以有其独立的配置:

<?xml version="1.0"?>

<allocations>

<pool name="production">

<schedulingMode>FAIR</schedulingMode>

<weight>1</weight>

<minShare>2</minShare>

</pool>

<pool name="test">

<schedulingMode>FIFO</schedulingMode>

<weight>2</weight>

<minShare>3</minShare>

</pool>

</allocations>

注意,没有在配置文件中配置的资源池都会使用默认配置(schedulingMode:FIFO,weight:1,minShare:0)。

使用JDBC连接进行调度

要为JDBC客户端会话设置FAIR调度器池,用户可以设置spark.sql.thriftserver.scheduler.pool变量:

SET spark.sql.thriftserver.scheduler.pool=accounting;