MegaThinking

better tokens, better intelligence, contributing superior tokens to models

https://spark.apache.org/docs/latest/cluster-overview.html

https://spark.apache.org/docs/latest/cluster-overview.html#glossary

https://github.com/GoogleCloudPlatform/spark-on-k8s-operator/blob/master/examples/spark-pi.yaml

https://github.com/GoogleCloudPlatform/spark-on-k8s-operator/blob/master/examples/spark-py-pi.yaml

KUBERNETES_SERVICE_HOST

KUBERNETES_SERVICE_PORT

1
2
3
4
5
6
--class=...
--master k8s://https://%s:%s
--deploy-mode cluster/client
--conf spark.kubernetes.namespace=default
--conf spark.app.name=spark-pi
SparkPi.jar (MainApplicationFile: MainFile is the path to a bundled JAR, Python, or R file of the application.)

SPARK_HOME/bin/spark-submit args

Spark-on-k8s-operator controller run the spark-submit scripts

https://spark.apache.org/docs/2.3.1/running-on-kubernetes.html

https://spark.apache.org/docs/2.3.1/running-on-kubernetes.html#cluster-mode

1
2
3
4
5
6
7
8
$ bin/spark-submit \
--master k8s://https://<k8s-apiserver-host>:<k8s-apiserver-port> \
--deploy-mode cluster
--name spark-pi \
--class org.apache.spark.examples.SparkPi \
--conf spark.executor.instances=5 \
--conf spark.kubernetes.container.image=<spark-image> \
local:///path/to/examples.jar

https://spark.apache.org/docs/2.3.1/running-on-kubernetes.html#dependency-management

client mode is not supported in 2.3.1 (cluster manager)

but it seems work in 2.4

https://spark.apache.org/docs/latest/running-on-kubernetes.html

Dependencies Jars and Files

logs

1
kubectl -n=<namespace> logs -f <driver-pod-name>

https://github.com/GoogleCloudPlatform/spark-on-k8s-operator/blob/master/docs/design.md

CP

先写 log

再复制 log

一致性协议确保大多数都已成功复制 log 后,这个时候 log 也称为被 committed 了

执行状态机

三种角色

Leader / Follower / Candidate

Leader 负责周期性发起心跳

Follower 接收心跳

Follower 选举超时后,发起 Vote,状态转变为 Candidate

ETCD 的流程

写 WAL; raft 协议 ok; apply data to boltdb

使用 raft 来同步 WAL;

选举的限制

A candidate must contact a majority of the cluster in order to be elected, which means that every committed entry must be present in at least one of those servers

Raft determines which of two logs is more up-to-date by comparing the index and term of the last entries in the logs

简单来说,在发起选举投票时,需要携带最新的 log 信息,包括 index 及 term;term 越大越新,如果 term 相同,则 log 的长度越长越新;这可以保证新选举出来的 leader 包含了之前所有 commited 的信息

网络分区需要特殊考虑 (2PC)

第一阶段先征求其他节点是否同意选举,如果同意选举则发起真正的选举操作,否则降为 Follower 角色

例如当有 follow 被隔离时,其 term 会不断增大,当其重新加入集群时,会触发选举,影响集群稳定性;为避免如此情况,增加 preVote 阶段,即发起 vote 的前提为

  • 没有收到有效领导的心跳,至少有一次选举超时
  • Candidate的日志足够新(Term更大,或者Term相同raft index更大)

才开始选举,避免网络隔离后,恢复加入的节点 term 较高,触发集群选举

客户端也需要考虑,网络分区,无法完成写入,需要 server 端返回特定的错误时,直接切换后端

Summary

  • 2PC 两阶段
  • Leader 一致性算法

没实际玩儿过 … 家里一直登陆不上 … 以下全凭看文档与代码

以 EC2 为基石设计,以镜像作为运行环境,完全面向镜像的设计

支持的 EC2 实例类型

https://amazonaws-china.com/sagemaker/pricing/instance-types/

性能最好的实例类型当前为

1
2
3
4
5
6
ml.p3dn.24xlarge
vCPU: 96
GPUs: 8xV100
Mem(GiB): 768
GPU Mem(GiB): 256
Networking Performance: 100 Gigabit

EC2 对应的实例类型为

1
2
3
4
5
6
7
8
9
p3dn.24xlarge   
GPUs: 8
vCPU: 96
Mem(GiB): 768
GPU Mem(GiB): 256
GPU P2P: NVLink
Storage(GB): 2 x 900 NVMe SSD
Dedicated EBS Bandwidth: 14 Gbps
Networking Performance: 100 Gigabit

100 Gigabit 由 AWS efa 提供,提供 OS-bypass 通信能力,应该就是 Infiniband 网络

启动训练过程 (Script Mode)

  • 上传本地代码至 S3
  • 调用 Sagemaker 创建训练作业接口
    • 创建 EC2 实例
    • 在 EC2 实例上启动 Sagemaker 系统进程
    • 下载数据集
    • 启动容器 (镜像)
    • 下载训练代码
    • 启动训练代码

https://github.com/aws/sagemaker-containers

下载

https://github.com/aws/sagemaker-containers/blob/master/src/sagemaker_containers/_files.py#L112

上传

https://github.com/aws/sagemaker-python-sdk/blob/master/src/sagemaker/fw_utils.py#L322

因此完整流程

创建作业/版本

hikari 轻量级数据库连接池

https://github.com/brettwooldridge/HikariCP/wiki/About-Pool-Sizing#limited-resources

这句话挺有趣,言简意赅

More threads only perform better when blocking creates opportunities for executing.

hikari 的理念,connections 并非越多越好,相反,可能只需要少量;太多的 connections 反而会导致性能下降

一般而言,一个数据库操作由一个线程执行;而 CPU 一个核心,同时只能执行一个线程

基于该前提,在没有 IO 阻塞的情况,即线程只要启动,就能一直在处理工作,那多线程 (多 connections),并不能提高性能

如果有 IO 阻塞等情况,使得线程在自己的时间片中,不能充分使用 CPU core,这样通过线程切换技术,可以提高 CPU core 的使用率,最终来看提高了性能 (吞吐量)

因此 PostgresSQL 项目给出了一个连接数计算的参考公式

1
connections = ((core_count * 2) + effective_spindle_count)

Effective spindle count is zero if
the active data set is fully cached, and approaches the actual number of spindles
as the cache hit rate falls

effective_spindle_count 即 IO 等待时间的估计值

hikari 特殊的优化手段介绍

https://github.com/brettwooldridge/HikariCP/wiki/Down-the-Rabbit-Hole

  • 修改代码: 生成更优的字节码
  • 实现特定的数据结构: 更好的性能

连接池的一些问题

https://github.com/brettwooldridge/HikariCP/wiki/About-Pool-Sizing#pool-locking

一个线程获取多个数据库连接(饿死)

可能的优化方案

  • 设置保证不会出现饿死情况的 pool size = Tn x (Cm - 1) + 1, Tn 为线程数, Cm 为每个线程最大获取的连接数
  • 如果当前线程已经获取过数据库连接,再调用 getConnection 时,返回该线程当前的 connection,而不是返回新的 connection

hikari 的一些配置项

https://github.com/brettwooldridge/HikariCP#configuration-knobs-baby

leakDetectionThreshold: connection 泄漏告警,getConnection 后,在 leakDetectionThreshold 时间前,未调用 close 方法,会日志打印告警

慢 SQL (10s 以上执行时间) 会导致该告警

idleTimeout = idleTimeout = 600s

This property controls the maximum amount of time that a connection is allowed to sit idle in the pool

此属性控制允许连接在池中空闲的最长时间,即空闲超过该时间后,该 Connection 将被 softEvictConnection

maxLifetime = maxLifetime = 1800s

Hikari 新建 connection 时,使用 houseKeeperExecutor 线程池执行,maxLifeTime 后触发 softEvictConnection

maxLifetime 需要结合 DB 的 connection timeout 设置

maxLifetime 加了 2.5% 的抖动,防止同一时间有大量的 Connections 被关闭

maximumPoolSize = maximumPoolSize = 10

This property controls the maximum size that the pool is allowed to reach, including both idle and in-use connections. Basically this value will determine the maximum number of actual connections to the database backend.

决定了最大 Connections 数 (idle and in-use)

minimumIdle = maximumPoolSize

需要维持的最小 idle Connections 数

validationTimeout = 5s

This property controls the maximum amount of time that a connection will be tested for aliveness.

getConnection 时会判断 Connection aliveness,设置检测 aliveness 的超时时间

connectionTimeout = 30s

This property controls the maximum number of milliseconds that a client (that’s you) will wait for a connection from the pool.

getConnection 的超时时间

与 Scala Slick 结合

增加 slick-hikari 依赖

http://slick.lightbend.com/doc/3.2.1/gettingstarted.html#adding-slick-to-your-project

增加 typesafe config 配置项

http://slick.lightbend.com/doc/3.2.1/database.html#connection-pools

注意到 slick 将 hikari 的初始化方法封装了一层,另外该文档有一些错误,建议直接参考 hikari 的 readme.md 说明,也没几个核心参数

http://slick.lightbend.com/doc/3.2.1/api/index.html#slick.jdbc.JdbcBackend$DatabaseFactoryDef@forConfig(String,Config,Driver,ClassLoader):Database

Slick 这个 O (or function) RM 怎么说呢,一言难尽,表达式太弱 (当然可能是我不熟悉如此灵魂的 function 写法),生成的 SQL 语句非最优。举个例子

1
select count(1), count(distinct job_name), * from A, B, C

就没法实现,要么写成子查询,要么只能分开并发执行,or 直接写 sql 语句 …

线程池

注意到 slick 维护一个内部线程池,用于执行数据库相关的异步操作,通过 numThreads 参数指定最大线程数

Database.forConfig()

numThreads: 用于执行数据库相关的异步操作的线程数

maxConnections = numThreads * 5

minimumIdle = numThreads

因此以 Slick numThreads = 16 为例

  • maximumPoolSize = 16 * 5 = 80
  • minimumIdle = 16

hikari summary

关键配置项

  • minimumIdle 决定了连接池中的最小 idle 连接数,当 idle 连接少于该阈值时,hikari 会尽快补齐
  • idleTimeout (10min) 决定了 idle 连接的存活时间
  • maximumPoolSize 决定了连接池中的最大连接数
  • maxLifeTime (30min) 决定了连接最大存活时间

https://github.com/open-mpi/ompi/issues/6691#issuecomment-497245597

ess_hnp_module.c

1
2
3
4
orte_set_attribute(&transports, ORTE_RML_TRANSPORT_TYPE, ORTE_ATTR_LOCAL, orte_mgmt_transport, OPAL_STRING);
orte_mgmt_conduit = orte_rml.open_conduit(&transports);
orte_set_attribute(&transports, ORTE_RML_TRANSPORT_TYPE, ORTE_ATTR_LOCAL, orte_coll_transport, OPAL_STRING);
orte_coll_conduit = orte_rml.open_conduit(&transports);

rml: resource message layer

根据 attr 选择 rtmod, 返回的为 mod 在 array 中的 index

Open conduit - call each component and see if they can provide a
conduit that can satisfy all these attributes - return the conduit id
(a negative value indicates error)

rml_base_stubs.c

1
orte_rml_API_open_conduit

遍历 active 的 rml mod, 调用各个 rml mod 的 open_conduit, 例如 oob (rml_oob_component.c), 返回 mod 之后,存入 array,返回 array index

1
2
3
open_conduit()

orte_get_attribute(attributes, ORTE_RML_TRANSPORT_TYPE, (void**)&comp_attrib, OPAL_STRING)

从 attr 里获取 key 值,即 orte_mgmt_transport or orte_coll_transport, 确定是否指定了 oob

继续获取 routed mod

1
orte_get_attribute(attributes, ORTE_RML_ROUTED_ATTRIB, (void**)&comp_attrib, OPAL_STRING);

根据设置的 routed mod (NULL 则按优先级) 分配 routed mod

1
md->routed = orte_routed.assign_module(comp_attrib);

orte_mca_params.c

1
2
3
4
5
6
7
8
9
10
11
orte_mgmt_transport = "oob" // default

--mca orte_mgmt_transport

ORTE management messages

orte_coll_transport = "fabric,ethernet" // default

--mca orte_coll_transports

ORTE collectives

plm_base_launch_support.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
param = NULL;
if (ORTE_SUCCESS != (rc = orte_regx.nidmap_create(orte_node_pool, &param))) {
ORTE_ERROR_LOG(rc);
return rc;
}
if (NULL != orte_node_regex) {
free(orte_node_regex);
}
orte_node_regex = param;
/* if this is too long, then we'll have to do it with
* a phone home operation instead */
if (strlen(param) < orte_plm_globals.node_regex_threshold) {
opal_argv_append(argc, argv, "-"OPAL_MCA_CMD_LINE_ID);
opal_argv_append(argc, argv, "orte_node_regex");
opal_argv_append(argc, argv, orte_node_regex);
/* mark that the nidmap has been communicated */
orte_nidmap_communicated = true;
}

orte_node_pool

node_regex_threshold = 1024 default len

Resource Allocation Subsystem (RAS)

managing python version

1
conda create --name dl python=3.6

activate your env [dl]

1
2
conda info --envs
conda activate dl

PyCharm with anaconda

build the latest openmpi

1
2
3
4
5
6
# download the openmpi-v4.0.0.tar.gz from the official website
# untar and run configure
./configure --prefix=/usr/local/openmpi --enable-orterun-prefix-by-default
# make and install
make -j $(nproc) all
make install

verify openmpi

1
mpirun -np 4 --bind-to none --map-by slot hostname

install tensorflow

1
/anaconda3/envs/dl/bin/pip --proxy http://127.0.0.1:1081 install tensorflow==1.13.1

install horovod

1
HOROVOD_WITH_TENSORFLOW=1 /anaconda3/envs/dl/bin/pip install -v --no-cache-dir horovod==0.16.2

ref horovod Dockerfile

0.16.2

horovod 启动流程

hvd.init() 发生了什么

HorovodBasics -> init -> horovod_init -> InitializeHorovodOnce

启动线程

BackgroundThreadLoop

MPI 初始化

MPI_Comm_dup

MPI_Comm_rank 获取当前进程的 RANK

MPI_Comm_size 获取 local size

两次 AllGather, 把 rank 与 size 分发到所有进程

检查是否同构,即 size 是否相同

rank 0 初始化 MessageTable

initialization_done = true 初始化结束

主线程等待

initialization_done = true

背景线程持续 RunLoopOnce

1
while (RunLoopOnce(state, is_coordinator));

从 message_queue 中获取数据

DistributedOptimizer

0%