spark-cluster-mode-on-k8s
https://spark.apache.org/docs/latest/cluster-overview.html
https://spark.apache.org/docs/latest/cluster-overview.html#glossary
https://github.com/GoogleCloudPlatform/spark-on-k8s-operator/blob/master/examples/spark-pi.yaml
https://github.com/GoogleCloudPlatform/spark-on-k8s-operator/blob/master/examples/spark-py-pi.yaml
KUBERNETES_SERVICE_HOST
KUBERNETES_SERVICE_PORT
1 | --class=... |
SPARK_HOME/bin/spark-submit args
Spark-on-k8s-operator controller run the spark-submit scripts
https://spark.apache.org/docs/2.3.1/running-on-kubernetes.html
https://spark.apache.org/docs/2.3.1/running-on-kubernetes.html#cluster-mode
1 | $ bin/spark-submit \ |
https://spark.apache.org/docs/2.3.1/running-on-kubernetes.html#dependency-management
client mode is not supported in 2.3.1 (cluster manager)
but it seems work in 2.4
https://spark.apache.org/docs/latest/running-on-kubernetes.html
Dependencies Jars and Files
logs
1 | kubectl -n=<namespace> logs -f <driver-pod-name> |
https://github.com/GoogleCloudPlatform/spark-on-k8s-operator/blob/master/docs/design.md
raft
CP
先写 log
再复制 log
一致性协议确保大多数都已成功复制 log 后,这个时候 log 也称为被 committed 了
执行状态机
三种角色
Leader / Follower / Candidate
Leader 负责周期性发起心跳
Follower 接收心跳
Follower 选举超时后,发起 Vote,状态转变为 Candidate
ETCD 的流程
写 WAL; raft 协议 ok; apply data to boltdb
使用 raft 来同步 WAL;
选举的限制
A candidate must contact a majority of the cluster in order to be elected, which means that every committed entry must be present in at least one of those servers
Raft determines which of two logs is more up-to-date by comparing the index and term of the last entries in the logs
简单来说,在发起选举投票时,需要携带最新的 log 信息,包括 index 及 term;term 越大越新,如果 term 相同,则 log 的长度越长越新;这可以保证新选举出来的 leader 包含了之前所有 commited 的信息
网络分区需要特殊考虑 (2PC)
第一阶段先征求其他节点是否同意选举,如果同意选举则发起真正的选举操作,否则降为 Follower 角色
例如当有 follow 被隔离时,其 term 会不断增大,当其重新加入集群时,会触发选举,影响集群稳定性;为避免如此情况,增加 preVote 阶段,即发起 vote 的前提为
- 没有收到有效领导的心跳,至少有一次选举超时
- Candidate的日志足够新(Term更大,或者Term相同raft index更大)
才开始选举,避免网络隔离后,恢复加入的节点 term 较高,触发集群选举
客户端也需要考虑,网络分区,无法完成写入,需要 server 端返回特定的错误时,直接切换后端
Summary
- 2PC 两阶段
- Leader 一致性算法
aws sagemaker
没实际玩儿过 … 家里一直登陆不上 … 以下全凭看文档与代码
以 EC2 为基石设计,以镜像作为运行环境,完全面向镜像的设计
支持的 EC2 实例类型
https://amazonaws-china.com/sagemaker/pricing/instance-types/
性能最好的实例类型当前为
1 | ml.p3dn.24xlarge |
EC2 对应的实例类型为
1 | p3dn.24xlarge |
100 Gigabit 由 AWS efa 提供,提供 OS-bypass 通信能力,应该就是 Infiniband 网络
启动训练过程 (Script Mode)
- 上传本地代码至 S3
- 调用 Sagemaker 创建训练作业接口
- 创建 EC2 实例
- 在 EC2 实例上启动 Sagemaker 系统进程
- 下载数据集
- 启动容器 (镜像)
- 下载训练代码
- 启动训练代码
https://github.com/aws/sagemaker-containers
下载
https://github.com/aws/sagemaker-containers/blob/master/src/sagemaker_containers/_files.py#L112
上传
https://github.com/aws/sagemaker-python-sdk/blob/master/src/sagemaker/fw_utils.py#L322
因此完整流程
创建作业/版本
play akka guideline
hikari connection pool
hikari 轻量级数据库连接池
https://github.com/brettwooldridge/HikariCP/wiki/About-Pool-Sizing#limited-resources
这句话挺有趣,言简意赅
More threads only perform better when blocking creates opportunities for executing.
hikari 的理念,connections 并非越多越好,相反,可能只需要少量;太多的 connections 反而会导致性能下降
一般而言,一个数据库操作由一个线程执行;而 CPU 一个核心,同时只能执行一个线程
基于该前提,在没有 IO 阻塞的情况,即线程只要启动,就能一直在处理工作,那多线程 (多 connections),并不能提高性能
如果有 IO 阻塞等情况,使得线程在自己的时间片中,不能充分使用 CPU core,这样通过线程切换技术,可以提高 CPU core 的使用率,最终来看提高了性能 (吞吐量)
因此 PostgresSQL 项目给出了一个连接数计算的参考公式
1 | connections = ((core_count * 2) + effective_spindle_count) |
Effective spindle count is zero if
the active data set is fully cached, and approaches the actual number of spindles
as the cache hit rate falls
effective_spindle_count 即 IO 等待时间的估计值
hikari 特殊的优化手段介绍
https://github.com/brettwooldridge/HikariCP/wiki/Down-the-Rabbit-Hole
- 修改代码: 生成更优的字节码
- 实现特定的数据结构: 更好的性能
连接池的一些问题
https://github.com/brettwooldridge/HikariCP/wiki/About-Pool-Sizing#pool-locking
一个线程获取多个数据库连接(饿死)
可能的优化方案
- 设置保证不会出现饿死情况的 pool size = Tn x (Cm - 1) + 1, Tn 为线程数, Cm 为每个线程最大获取的连接数
- 如果当前线程已经获取过数据库连接,再调用 getConnection 时,返回该线程当前的 connection,而不是返回新的 connection
hikari 的一些配置项
https://github.com/brettwooldridge/HikariCP#configuration-knobs-baby
leakDetectionThreshold: connection 泄漏告警,getConnection 后,在 leakDetectionThreshold 时间前,未调用 close 方法,会日志打印告警
慢 SQL (10s 以上执行时间) 会导致该告警
idleTimeout = idleTimeout = 600s
This property controls the maximum amount of time that a connection is allowed to sit idle in the pool
此属性控制允许连接在池中空闲的最长时间,即空闲超过该时间后,该 Connection 将被 softEvictConnection
maxLifetime = maxLifetime = 1800s
Hikari 新建 connection 时,使用 houseKeeperExecutor 线程池执行,maxLifeTime 后触发 softEvictConnection
maxLifetime 需要结合 DB 的 connection timeout 设置
maxLifetime 加了 2.5% 的抖动,防止同一时间有大量的 Connections 被关闭
maximumPoolSize = maximumPoolSize = 10
This property controls the maximum size that the pool is allowed to reach, including both idle and in-use connections. Basically this value will determine the maximum number of actual connections to the database backend.
决定了最大 Connections 数 (idle and in-use)
minimumIdle = maximumPoolSize
需要维持的最小 idle Connections 数
validationTimeout = 5s
This property controls the maximum amount of time that a connection will be tested for aliveness.
getConnection 时会判断 Connection aliveness,设置检测 aliveness 的超时时间
connectionTimeout = 30s
This property controls the maximum number of milliseconds that a client (that’s you) will wait for a connection from the pool.
getConnection 的超时时间
与 Scala Slick 结合
增加 slick-hikari 依赖
http://slick.lightbend.com/doc/3.2.1/gettingstarted.html#adding-slick-to-your-project
增加 typesafe config 配置项
http://slick.lightbend.com/doc/3.2.1/database.html#connection-pools
注意到 slick 将 hikari 的初始化方法封装了一层,另外该文档有一些错误,建议直接参考 hikari 的 readme.md 说明,也没几个核心参数
Slick 这个 O (or function) RM 怎么说呢,一言难尽,表达式太弱 (当然可能是我不熟悉如此灵魂的 function 写法),生成的 SQL 语句非最优。举个例子
1 | select count(1), count(distinct job_name), * from A, B, C |
就没法实现,要么写成子查询,要么只能分开并发执行,or 直接写 sql 语句 …
线程池
注意到 slick 维护一个内部线程池,用于执行数据库相关的异步操作,通过 numThreads 参数指定最大线程数
Database.forConfig()
numThreads: 用于执行数据库相关的异步操作的线程数
maxConnections = numThreads * 5
minimumIdle = numThreads
因此以 Slick numThreads = 16 为例
- maximumPoolSize = 16 * 5 = 80
- minimumIdle = 16
hikari summary
关键配置项
- minimumIdle 决定了连接池中的最小 idle 连接数,当 idle 连接少于该阈值时,hikari 会尽快补齐
- idleTimeout (10min) 决定了 idle 连接的存活时间
- maximumPoolSize 决定了连接池中的最大连接数
- maxLifeTime (30min) 决定了连接最大存活时间
openmpi
https://github.com/open-mpi/ompi/issues/6691#issuecomment-497245597
ess_hnp_module.c
1 | orte_set_attribute(&transports, ORTE_RML_TRANSPORT_TYPE, ORTE_ATTR_LOCAL, orte_mgmt_transport, OPAL_STRING); |
rml: resource message layer
根据 attr 选择 rtmod, 返回的为 mod 在 array 中的 index
Open conduit - call each component and see if they can provide a
conduit that can satisfy all these attributes - return the conduit id
(a negative value indicates error)
rml_base_stubs.c
1 | orte_rml_API_open_conduit |
遍历 active 的 rml mod, 调用各个 rml mod 的 open_conduit, 例如 oob (rml_oob_component.c), 返回 mod 之后,存入 array,返回 array index
1 | open_conduit() |
从 attr 里获取 key 值,即 orte_mgmt_transport or orte_coll_transport, 确定是否指定了 oob
继续获取 routed mod
1 | orte_get_attribute(attributes, ORTE_RML_ROUTED_ATTRIB, (void**)&comp_attrib, OPAL_STRING); |
根据设置的 routed mod (NULL 则按优先级) 分配 routed mod
1 | md->routed = orte_routed.assign_module(comp_attrib); |
orte_mca_params.c
1 | orte_mgmt_transport = "oob" // default |
plm_base_launch_support.c
1 | param = NULL; |
orte_node_pool
node_regex_threshold = 1024 default len
Resource Allocation Subsystem (RAS)
conda-dl
1 | conda create --name dl python=3.6 |
activate your env [dl]
1 | conda info --envs |
PyCharm with anaconda
1 | # download the openmpi-v4.0.0.tar.gz from the official website |
verify openmpi
1 | mpirun -np 4 --bind-to none --map-by slot hostname |
install tensorflow
1 | /anaconda3/envs/dl/bin/pip --proxy http://127.0.0.1:1081 install tensorflow==1.13.1 |
install horovod
1 | HOROVOD_WITH_TENSORFLOW=1 /anaconda3/envs/dl/bin/pip install -v --no-cache-dir horovod==0.16.2 |
mxnet debug
recomplie mxnet
1 | DEBUG=1 in comfig.mk, then recompile the whole framework. |
https://github.com/apache/incubator-mxnet/issues/6796#issuecomment-311310985
1 | make -j $(nproc) YOU_OPTIONS will compile parallelly |
horovod-tf
0.16.2
horovod 启动流程
hvd.init() 发生了什么
HorovodBasics -> init -> horovod_init -> InitializeHorovodOnce
启动线程
BackgroundThreadLoop
MPI 初始化
MPI_Comm_dup
MPI_Comm_rank 获取当前进程的 RANK
MPI_Comm_size 获取 local size
两次 AllGather, 把 rank 与 size 分发到所有进程
检查是否同构,即 size 是否相同
rank 0 初始化 MessageTable
initialization_done = true 初始化结束
主线程等待
initialization_done = true
背景线程持续 RunLoopOnce
1 | while (RunLoopOnce(state, is_coordinator)); |
从 message_queue 中获取数据
DistributedOptimizer