0%

https://spark.apache.org/docs/latest/cluster-overview.html

https://spark.apache.org/docs/latest/cluster-overview.html#glossary

https://github.com/GoogleCloudPlatform/spark-on-k8s-operator/blob/master/examples/spark-pi.yaml

https://github.com/GoogleCloudPlatform/spark-on-k8s-operator/blob/master/examples/spark-py-pi.yaml

KUBERNETES_SERVICE_HOST

KUBERNETES_SERVICE_PORT

1
2
3
4
5
6
--class=...
--master k8s://https://%s:%s
--deploy-mode cluster/client
--conf spark.kubernetes.namespace=default
--conf spark.app.name=spark-pi
SparkPi.jar (MainApplicationFile: MainFile is the path to a bundled JAR, Python, or R file of the application.)

SPARK_HOME/bin/spark-submit args

Spark-on-k8s-operator controller run the spark-submit scripts

https://spark.apache.org/docs/2.3.1/running-on-kubernetes.html

https://spark.apache.org/docs/2.3.1/running-on-kubernetes.html#cluster-mode

1
2
3
4
5
6
7
8
$ bin/spark-submit \
--master k8s://https://<k8s-apiserver-host>:<k8s-apiserver-port> \
--deploy-mode cluster
--name spark-pi \
--class org.apache.spark.examples.SparkPi \
--conf spark.executor.instances=5 \
--conf spark.kubernetes.container.image=<spark-image> \
local:///path/to/examples.jar

https://spark.apache.org/docs/2.3.1/running-on-kubernetes.html#dependency-management

client mode is not supported in 2.3.1 (cluster manager)

but it seems work in 2.4

https://spark.apache.org/docs/latest/running-on-kubernetes.html

Dependencies Jars and Files

logs

1
kubectl -n=<namespace> logs -f <driver-pod-name>

https://github.com/GoogleCloudPlatform/spark-on-k8s-operator/blob/master/docs/design.md

CP

先写 log

再复制 log

一致性协议确保大多数都已成功复制 log 后,这个时候 log 也称为被 committed 了

执行状态机

三种角色

Leader / Follower / Candidate

Leader 负责周期性发起心跳

Follower 接收心跳

Follower 选举超时后,发起 Vote,状态转变为 Candidate

ETCD 的流程

写 WAL; raft 协议 ok; apply data to boltdb

使用 raft 来同步 WAL;

选举的限制

A candidate must contact a majority of the cluster in order to be elected, which means that every committed entry must be present in at least one of those servers

Raft determines which of two logs is more up-to-date by comparing the index and term of the last entries in the logs

简单来说,在发起选举投票时,需要携带最新的 log 信息,包括 index 及 term;term 越大越新,如果 term 相同,则 log 的长度越长越新;这可以保证新选举出来的 leader 包含了之前所有 commited 的信息

网络分区需要特殊考虑 (2PC)

第一阶段先征求其他节点是否同意选举,如果同意选举则发起真正的选举操作,否则降为 Follower 角色

例如当有 follow 被隔离时,其 term 会不断增大,当其重新加入集群时,会触发选举,影响集群稳定性;为避免如此情况,增加 preVote 阶段,即发起 vote 的前提为

  • 没有收到有效领导的心跳,至少有一次选举超时
  • Candidate的日志足够新(Term更大,或者Term相同raft index更大)

才开始选举,避免网络隔离后,恢复加入的节点 term 较高,触发集群选举

客户端也需要考虑,网络分区,无法完成写入,需要 server 端返回特定的错误时,直接切换后端

Summary

  • 2PC 两阶段
  • Leader 一致性算法

没实际玩儿过 … 家里一直登陆不上 … 以下全凭看文档与代码

以 EC2 为基石设计,以镜像作为运行环境,完全面向镜像的设计

支持的 EC2 实例类型

https://amazonaws-china.com/sagemaker/pricing/instance-types/

性能最好的实例类型当前为

1
2
3
4
5
6
ml.p3dn.24xlarge
vCPU: 96
GPUs: 8xV100
Mem(GiB): 768
GPU Mem(GiB): 256
Networking Performance: 100 Gigabit

EC2 对应的实例类型为

1
2
3
4
5
6
7
8
9
p3dn.24xlarge   
GPUs: 8
vCPU: 96
Mem(GiB): 768
GPU Mem(GiB): 256
GPU P2P: NVLink
Storage(GB): 2 x 900 NVMe SSD
Dedicated EBS Bandwidth: 14 Gbps
Networking Performance: 100 Gigabit

100 Gigabit 由 AWS efa 提供,提供 OS-bypass 通信能力,应该就是 Infiniband 网络

启动训练过程 (Script Mode)

  • 上传本地代码至 S3
  • 调用 Sagemaker 创建训练作业接口
    • 创建 EC2 实例
    • 在 EC2 实例上启动 Sagemaker 系统进程
    • 下载数据集
    • 启动容器 (镜像)
    • 下载训练代码
    • 启动训练代码

https://github.com/aws/sagemaker-containers

下载

https://github.com/aws/sagemaker-containers/blob/master/src/sagemaker_containers/_files.py#L112

上传

https://github.com/aws/sagemaker-python-sdk/blob/master/src/sagemaker/fw_utils.py#L322

因此完整流程

创建作业/版本

hikari 轻量级数据库连接池

https://github.com/brettwooldridge/HikariCP/wiki/About-Pool-Sizing#limited-resources

这句话挺有趣,言简意赅

More threads only perform better when blocking creates opportunities for executing.

hikari 的理念,connections 并非越多越好,相反,可能只需要少量;太多的 connections 反而会导致性能下降

一般而言,一个数据库操作由一个线程执行;而 CPU 一个核心,同时只能执行一个线程

基于该前提,在没有 IO 阻塞的情况,即线程只要启动,就能一直在处理工作,那多线程 (多 connections),并不能提高性能

如果有 IO 阻塞等情况,使得线程在自己的时间片中,不能充分使用 CPU core,这样通过线程切换技术,可以提高 CPU core 的使用率,最终来看提高了性能 (吞吐量)

因此 PostgresSQL 项目给出了一个连接数计算的参考公式

1
connections = ((core_count * 2) + effective_spindle_count)

Effective spindle count is zero if
the active data set is fully cached, and approaches the actual number of spindles
as the cache hit rate falls

effective_spindle_count 即 IO 等待时间的估计值

hikari 特殊的优化手段介绍

https://github.com/brettwooldridge/HikariCP/wiki/Down-the-Rabbit-Hole

  • 修改代码: 生成更优的字节码
  • 实现特定的数据结构: 更好的性能

连接池的一些问题

https://github.com/brettwooldridge/HikariCP/wiki/About-Pool-Sizing#pool-locking

一个线程获取多个数据库连接(饿死)

可能的优化方案

  • 设置保证不会出现饿死情况的 pool size = Tn x (Cm - 1) + 1, Tn 为线程数, Cm 为每个线程最大获取的连接数
  • 如果当前线程已经获取过数据库连接,再调用 getConnection 时,返回该线程当前的 connection,而不是返回新的 connection

hikari 的一些配置项

https://github.com/brettwooldridge/HikariCP#configuration-knobs-baby

leakDetectionThreshold: connection 泄漏告警,getConnection 后,在 leakDetectionThreshold 时间前,未调用 close 方法,会日志打印告警

慢 SQL (10s 以上执行时间) 会导致该告警

idleTimeout = idleTimeout = 600s

This property controls the maximum amount of time that a connection is allowed to sit idle in the pool

此属性控制允许连接在池中空闲的最长时间,即空闲超过该时间后,该 Connection 将被 softEvictConnection

maxLifetime = maxLifetime = 1800s

Hikari 新建 connection 时,使用 houseKeeperExecutor 线程池执行,maxLifeTime 后触发 softEvictConnection

maxLifetime 需要结合 DB 的 connection timeout 设置

maxLifetime 加了 2.5% 的抖动,防止同一时间有大量的 Connections 被关闭

maximumPoolSize = maximumPoolSize = 10

This property controls the maximum size that the pool is allowed to reach, including both idle and in-use connections. Basically this value will determine the maximum number of actual connections to the database backend.

决定了最大 Connections 数 (idle and in-use)

minimumIdle = maximumPoolSize

需要维持的最小 idle Connections 数

validationTimeout = 5s

This property controls the maximum amount of time that a connection will be tested for aliveness.

getConnection 时会判断 Connection aliveness,设置检测 aliveness 的超时时间

connectionTimeout = 30s

This property controls the maximum number of milliseconds that a client (that’s you) will wait for a connection from the pool.

getConnection 的超时时间

与 Scala Slick 结合

增加 slick-hikari 依赖

http://slick.lightbend.com/doc/3.2.1/gettingstarted.html#adding-slick-to-your-project

增加 typesafe config 配置项

http://slick.lightbend.com/doc/3.2.1/database.html#connection-pools

注意到 slick 将 hikari 的初始化方法封装了一层,另外该文档有一些错误,建议直接参考 hikari 的 readme.md 说明,也没几个核心参数

http://slick.lightbend.com/doc/3.2.1/api/index.html#slick.jdbc.JdbcBackend$DatabaseFactoryDef@forConfig(String,Config,Driver,ClassLoader):Database

Slick 这个 O (or function) RM 怎么说呢,一言难尽,表达式太弱 (当然可能是我不熟悉如此灵魂的 function 写法),生成的 SQL 语句非最优。举个例子

1
select count(1), count(distinct job_name), * from A, B, C

就没法实现,要么写成子查询,要么只能分开并发执行,or 直接写 sql 语句 …

线程池

注意到 slick 维护一个内部线程池,用于执行数据库相关的异步操作,通过 numThreads 参数指定最大线程数

Database.forConfig()

numThreads: 用于执行数据库相关的异步操作的线程数

maxConnections = numThreads * 5

minimumIdle = numThreads

因此以 Slick numThreads = 16 为例

  • maximumPoolSize = 16 * 5 = 80
  • minimumIdle = 16

hikari summary

关键配置项

  • minimumIdle 决定了连接池中的最小 idle 连接数,当 idle 连接少于该阈值时,hikari 会尽快补齐
  • idleTimeout (10min) 决定了 idle 连接的存活时间
  • maximumPoolSize 决定了连接池中的最大连接数
  • maxLifeTime (30min) 决定了连接最大存活时间

https://github.com/open-mpi/ompi/issues/6691#issuecomment-497245597

ess_hnp_module.c

1
2
3
4
orte_set_attribute(&transports, ORTE_RML_TRANSPORT_TYPE, ORTE_ATTR_LOCAL, orte_mgmt_transport, OPAL_STRING);
orte_mgmt_conduit = orte_rml.open_conduit(&transports);
orte_set_attribute(&transports, ORTE_RML_TRANSPORT_TYPE, ORTE_ATTR_LOCAL, orte_coll_transport, OPAL_STRING);
orte_coll_conduit = orte_rml.open_conduit(&transports);

rml: resource message layer

根据 attr 选择 rtmod, 返回的为 mod 在 array 中的 index

Open conduit - call each component and see if they can provide a
conduit that can satisfy all these attributes - return the conduit id
(a negative value indicates error)

rml_base_stubs.c

1
orte_rml_API_open_conduit

遍历 active 的 rml mod, 调用各个 rml mod 的 open_conduit, 例如 oob (rml_oob_component.c), 返回 mod 之后,存入 array,返回 array index

1
2
3
open_conduit()

orte_get_attribute(attributes, ORTE_RML_TRANSPORT_TYPE, (void**)&comp_attrib, OPAL_STRING)

从 attr 里获取 key 值,即 orte_mgmt_transport or orte_coll_transport, 确定是否指定了 oob

继续获取 routed mod

1
orte_get_attribute(attributes, ORTE_RML_ROUTED_ATTRIB, (void**)&comp_attrib, OPAL_STRING);

根据设置的 routed mod (NULL 则按优先级) 分配 routed mod

1
md->routed = orte_routed.assign_module(comp_attrib);

orte_mca_params.c

1
2
3
4
5
6
7
8
9
10
11
orte_mgmt_transport = "oob" // default

--mca orte_mgmt_transport

ORTE management messages

orte_coll_transport = "fabric,ethernet" // default

--mca orte_coll_transports

ORTE collectives

plm_base_launch_support.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
param = NULL;
if (ORTE_SUCCESS != (rc = orte_regx.nidmap_create(orte_node_pool, &param))) {
ORTE_ERROR_LOG(rc);
return rc;
}
if (NULL != orte_node_regex) {
free(orte_node_regex);
}
orte_node_regex = param;
/* if this is too long, then we'll have to do it with
* a phone home operation instead */
if (strlen(param) < orte_plm_globals.node_regex_threshold) {
opal_argv_append(argc, argv, "-"OPAL_MCA_CMD_LINE_ID);
opal_argv_append(argc, argv, "orte_node_regex");
opal_argv_append(argc, argv, orte_node_regex);
/* mark that the nidmap has been communicated */
orte_nidmap_communicated = true;
}

orte_node_pool

node_regex_threshold = 1024 default len

Resource Allocation Subsystem (RAS)

managing python version

1
conda create --name dl python=3.6

activate your env [dl]

1
2
conda info --envs
conda activate dl

PyCharm with anaconda

build the latest openmpi

1
2
3
4
5
6
# download the openmpi-v4.0.0.tar.gz from the official website
# untar and run configure
./configure --prefix=/usr/local/openmpi --enable-orterun-prefix-by-default
# make and install
make -j $(nproc) all
make install

verify openmpi

1
mpirun -np 4 --bind-to none --map-by slot hostname

install tensorflow

1
/anaconda3/envs/dl/bin/pip --proxy http://127.0.0.1:1081 install tensorflow==1.13.1

install horovod

1
HOROVOD_WITH_TENSORFLOW=1 /anaconda3/envs/dl/bin/pip install -v --no-cache-dir horovod==0.16.2

ref horovod Dockerfile

0.16.2

horovod 启动流程

hvd.init() 发生了什么

HorovodBasics -> init -> horovod_init -> InitializeHorovodOnce

启动线程

BackgroundThreadLoop

MPI 初始化

MPI_Comm_dup

MPI_Comm_rank 获取当前进程的 RANK

MPI_Comm_size 获取 local size

两次 AllGather, 把 rank 与 size 分发到所有进程

检查是否同构,即 size 是否相同

rank 0 初始化 MessageTable

initialization_done = true 初始化结束

主线程等待

initialization_done = true

背景线程持续 RunLoopOnce

1
while (RunLoopOnce(state, is_coordinator));

从 message_queue 中获取数据

DistributedOptimizer

MXNet 分布式通信

MXnet 使用了 dmlc/ps-lite 实现其分布式通信机制

ps-lite

van.h 父类,使用 Protocol buffer 定义了数据格式

zmq_van.h 继承 van.h,使用 zmq 做具体的数据传输

当前 MXnet 只能使用 IPoIB,不能直接使用 IB verbs 来通信;有个改动 ps-lite 也能直接使用 ib 来通信的 PR 挂了一年多了 …

RDMA in kubernetes

IB 这东西嘛,看起来是没服务发现的能力的;就是使用之前,需要使用 IP 网络通信一次,然后知道对端的 IB 的地址后,才能直接基于 IB 来通信

NCCL 就是这么干的

当然参考阿里云的说法

RDMA通讯建连过程普遍分为 TCP 和 RDMA_CM 两种实现,如果应用程序使用RDMA_CM 的建连的方式,vpc网络插件中分配的pod ip 无法作为RDMA_CM 地址, 容器需要设置HostNetwork。并将bond0 ip 设置为CM的通讯地址

这得看下 mlx device plugin 的实现和容器网络的实现,比较好奇,看一下吧

业界有两种实现 SR-IOV (single root io virtualization) 另一种就是直接共享设备 (HCA: host channel adapter)

HCA

HCA 就没啥好说的了,device plugin allocate 时,给 kubelet 返回设备路径,直接整个把 /dev/infiniband 挂载入容器中即可 … 这点会比较误导,相当于可以用所有的 HCA ?

所以对于 HCA 来说,直接用原先的容器网络来初始化 IB 通信即可,即在容器网络上做 ib 设备地址的发现,随后再用 ib verbs api 来通信

当然如果实现时,不是使用 tcp 的方式来初始化 IB,使用的是 rdma_cm,会有区别,容器的 ip 不能用于通信,只能用主机上的 bond0 ip 来通信 (主网卡)

有点儿奇怪可能与 rdma_cm 的 api 有关 …

另外最好加个这个,让单机多卡的训练可以 IPC 通信

1
2
3
securityContext:
capabilities:
add: [ "IPC_LOCK" ]

SR-IOV

PF -> VF

需要 SR-IOV CNI plugin 及 device plugin 配合

device plugin 挂载 /dev/infiniband 到容器中

SR-IOV CNI plugin 负责将 VF 绑定到容器的 network namespace 中

假设 VF 事先创建好

逻辑是通的,CNI 记录每个节点哪些 VF 已被分配,每个节点记录剩余几个 VF,当有新的 pod 被调度到当前节点时,CNI 即可分配未使用的 VF 与 pod

IP over Infiniband

IPoIB 由 ipoib driver 实现,能像一般 NIC (ifconfig) 一样使用 ib 网络,当然性能差一些,在到达 NIC 之前,与 socket 通信的开销一致

  • socket api
  • ib verbs api

socket api 走 os 调用栈,CPU 参与

ib verbs 直接走 ib,相当于 bypass 了 os 与 CPU,硬件卸载

不过有个地方比较疑惑,IPoIB,infiniband 底层的传输方式是 datagram 即不可靠传输,不过上层应用(zmp)都是 tcp,应有自己的重传机制;其次 ps-lite van.h 里也有开关控制重传功能,在未收到当前消息的 ACK 时,不会处理该请求,这里展开说下

van.h 初始时,如果看到打开了重传开关,则初始化 resend.h

启动了一个线程,周期性 (Timeout) 的重传 send_buf 中的消息

van.h 在

  • 发送数据时,将数据加入 send_buf 中;
  • 接收到数据时
    • 如果是 ACK,则从 send_buf 中移除该消息,并把数据交给上层处理
    • 若当前数据仍不是 ack 数据,则查看是否已发送过 ACK
      • 是的话,则认为是重复消息,不交上层处理,并发送 ACK
      • 否则 交上层处理,记录到 ACK set 中,并发送 ACK

不过这个重传的场景比较神奇,话说 TCP 已经有 ACK 和重传机制了 … 不懂 ps-lite 是遇到了什么具体的场景,再做了一次重传的增强设计

而且这个设计有个限制,收发数据越多,似乎内存消耗越大,因为记录是否收到过该消息的 ACK 的 set,并没有清理的机会,会一直增加

所以一般也未见开启这个功能,env.md 里都没说明这两环境变量,姑且认为是个废弃特性吧 …

1
2
PS_RESEND=1
PS_RESEND_TIMEOUT=1000 # ms

厂家

Azure

只支持 RDMA on IB 不支持 IP over IB

In Azure, IP over IB is not supported. Only RDMA over IB is supported.

https://docs.microsoft.com/zh-cn/azure/virtual-machines/linux/sizes-hpc#rdma-capable-instances

Aliyun

k8s 目前只看到 RDMA on IB,IP over IB 没看到,看起来是直接主机网络之后,用 ib0 ip 就行?

AWS

sagemaker 25Bbps/s 互联,看起来不像 IB or RoCE

参考

https://community.mellanox.com/s/article/kubernetes-ipoib-ethernet-rdma-sr-iov-networking-with-connectx4-connectx5

在 Kubernetes 上使用 RDMA