0%

https://github.com/linux-rdma/perftest/blob/master/src/write_bw.c

main

1
2
user_param.verb = WRITE;
user_param.tst = BW;

parser

1
2
-c, --connection=<RC/XRC/UC/DC> Connection type RC/XRC/UC/DC (default RC)
-s, --size=<size> Size of message to exchange (default 65536)

init_perftest_params

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
...

#define DEF_SIZE_BW (65536)
#define DEF_SIZE_LAT (2)
#define DEF_CACHE_LINE_SIZE (64)
#define DEF_PAGE_SIZE (4096)
#define DEF_FLOWS (1)

...

user_param->size = (user_param->tst == BW ) ? DEF_SIZE_BW : DEF_SIZE_LAT;

user_param->connection_type = (user_param->connection_type == RawEth) ? RawEth : RC;

...

user_param->cache_line_size = get_cache_line_size();
user_param->cycle_buffer = sysconf(_SC_PAGESIZE);

if (user_param->cycle_buffer <= 0) {
user_param->cycle_buffer = DEF_PAGE_SIZE;
}

...

user_param->flows = DEF_FLOWS;

get_cache_line_size()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
	int size = 0;
#if !defined(__FreeBSD__)
size = sysconf(_SC_LEVEL1_DCACHE_LINESIZE);
if (size == 0) {
#if defined(__sparc__) && defined(__arch64__)
char* file_name =
"/sys/devices/system/cpu/cpu0/l2_cache_line_size";
#else
char* file_name =
"/sys/devices/system/cpu/cpu0/cache/index0/coherency_line_size";
#endif

FILE *fp;
char line[10];
fp = fopen(file_name, "r");
if (fp == NULL) {
return DEF_CACHE_LINE_SIZE;
}
if(fgets(line,10,fp) != NULL) {
size = atoi(line);
fclose(fp);
}
}
#endif
if (size <= 0)
size = DEF_CACHE_LINE_SIZE;

getconf LEVEL1_DCACHE_LINESIZE

main -> alloc_ctx

1
2
3
4
5
6
7
8
9
10
ctx->size = user_param->size;

num_of_qps_factor = (user_param->mr_per_qp) ? 1 : user_param->num_of_qps;

/* holds the size of maximum between msg size and cycle buffer,
* aligned to cache line,
* it is multiply by 2 for send and receive
* with reference to number of flows and number of QPs */
ctx->buff_size = INC(BUFF_SIZE(ctx->size, ctx->cycle_buffer),
ctx->cache_line_size) * 2 * num_of_qps_factor * user_param->flows;

65536 = 64Kb

generally, 16 pages

root cause: ulimit -l is 16 (default) in container

本地复现一个 TcpStore 的测试用例问题,修改了部分代码,因此需要源码编译 PyTorch

环境信息

  • macOS 10.15.7
  • XCode 12.2 (12B45b)

源码信息

master latest commit (2020-11-14): f8248543a13b0144a6f5d0a549f72b1e470d88aa

1
2
3
commit f8248543a13b0144a6f5d0a549f72b1e470d88aa (github/master, github/gh/ljk53/194/base, github/HEAD, master)
Author: Rohan Varma <rvarm1@fb.com>
Date: Sat Nov 14 13:36:31 2020 -0800

构建

(1) https://github.com/pytorch/pytorch#from-source

(2) https://github.com/pytorch/pytorch/blob/master/CONTRIBUTING.md#c-development-tips

glog

1
brew install glog

conda

1
2
3
4
5
6
7
8
conda create -n pytorch-dev python=3.6

conda activate pytorch-dev

conda install numpy ninja pyyaml mkl mkl-include setuptools cmake cffi typing_extensions future six requests dataclasses

# Add these packages if torch.distributed is needed
conda install pkg-config libuv

build and install

uninstall

1
2
3
4
conda uninstall torch
pip uninstall torch

rm -rf build/

then reinstall

1
2
export CMAKE_PREFIX_PATH=${CONDA_PREFIX:-"$(dirname $(which conda))/../"}
MACOSX_DEPLOYMENT_TARGET=10.9 CC=clang CXX=clang++ MAX_JOBS=8 BUILD_CAFFE2=0 BUILD_CAFFE2_OPS=0 USE_GLOG=1 USE_DISTRIBUTED=1 USE_MKLDNN=0 USE_CUDA=0 USE_FBGEMM=0 USE_NNPACK=0 USE_QNNPACK=0 USE_XNNPACK=0 python setup.py develop

Quad-Core Intel Core i7 ~ 45min

测试

1
2
3
4
5
6
Python 3.6.12 |Anaconda, Inc.| (default, Sep  8 2020, 17:50:39)
[GCC Clang 10.0.0 ] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> import torch
>>> torch.__version__
'1.8.0a0+f1a8a82'

TcpStore

1
2
3
4
5
6
7
python test/distributed/test_c10d.py

Python 3.6.12 |Anaconda, Inc.| (default, Sep 8 2020, 17:50:39)
[GCC Clang 10.0.0 ] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> import torch.distributed as dist
>>> server_store = dist.TCPStore("127.0.0.1", 18668, 1, True)

or

1
./build/bin/TCPStoreTest

https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle/#pod-termination

简而言之,pod 被删除时

  1. kubelet 触发 container runtime 对 pod 中的每个 container 的 1 号进程,发送 TERM 信号
  2. 等待 the grace period expires (terminationGracePeriodSeconds 默认为 30s)
  3. 如果 the grace period expires 后 containers 仍未退出,则 kubelet 触发 container runtime,向 pod 中的每个 container 中仍然处于 running 状态的进程发送 KILL 信号

正确处理 TERM 信号,可以让业务优雅退出(or 更快退出);例如

假设 pod command 为

https://kubernetes.io/docs/tasks/inject-data-application/define-command-argument-container/#run-a-command-in-a-shell

1
2
command: ["/bin/bash"]
args: ["-c", "/home/rt/run.sh"]

or

1
2
3
4
command:
- "/bin/bash"
- "-c"
- "/home/rt/run.sh"

/bin/bash /home/rt/run.sh 是 1 号进程

在 /home/rt/run.sh 中可以如此处理,以达到优雅退出的目的

1
2
3
4
5
6
7
8
9
10
11
function prog_exit {
echo "receive SIGTERM signal"
pkill python
}

trap prog_exit SIGTERM

# main function
python /home/rt/train.py &

wait $!

ref: docker stop

https://docs.docker.com/engine/reference/commandline/stop/

The main process inside the container will receive SIGTERM, and after a grace period, SIGKILL.

k8s subPath and s3 fuse mount (s3fs)

意识流分析,致敬一个生产环境中的问题,真是看了很多代码

https://github.com/s3fs-fuse/s3fs-fuse

实现了 fuse 接口,底层对接 s3 协议

打开 s3fs info 日志,观察可知

fuse 周期性触发

  1. s3fs_getattr
    1. check_object_access
      1. get_object_attribute
        1. s3 head request
    2. check_parent_object_access
  2. s3fs_opendir
    1. check_object_access
      1. check_parent_object_access
  3. s3fs_readdir 方法,该方法底层会调用 list_bucket func,到s3 server查询对象列表
    1. 查询到有新的对象后,会 add 到本地 stat cache,并且下载到本地

所以使用 s3fs mount s3 bucket 到本地时,s3fs 会周期性同步远端的数据到本地,写入时,也会同步到远端

1
kill -SIGUSR2 ${s3fs_pid}

可以动态调整 s3fs 日志级别,s3fs 默认日志输出到 /var/log/messages

messages.2.gz 文件解压,可以使用 gzip -d messages.2.gz


生产环境出了一个诡异的问题

server / worker 两个实例,在不同节点,挂载了同一个桶

k8s subPath 方式挂载子对象,例如 test-bucket/output1

在开始阶段 server s3fs 会上传 test-bucket/output1 文件夹,两次

而 worker s3fs 不会

debug 了一会儿,直接原因与 s3 对象的权限相关,用户创建的对象,s3fs mount 时,没权限 head,但是又能 list 到

s3fs 如果 list 该对象为非空时,可正常读写该对象下的内容,也与验证结果符合 -.0


分割线,前边是一些分析

k8s subPath

subPath 是相对于 volume root path 的一个 sub path,用于挂载 volume 内的子目录

实现上首先会将 volume 挂载于节点的 pod volume 路径下

  1. 再去 open subPath,open 之后,成为 kubelet 进程下的一个 fd
  2. 接下来创建 volume-subpaths 下的一个路径(文件名与 volumeMount index 一致),并且判断 subPath 是文件还是文件夹
    1. 文件夹,mkdir 一把
    2. 文件,写一个空文件
  3. 最后将该 fd mount 到上述创建的路径中

不明点,在 1 步骤上,实际上 subPath 为 noObject;从文件系统的角度不确定它为文件夹,还是对象,而 kubelet 却执行成功了,并顺利挂载到容器中

从 s3fs 的日志,对 subPath 首先执行了

  1. s3fs_mkdir,mode 0750
  2. s3fs_chmod,mode 40777

即触发了两次 s3 PUT.OBJECT 的操作

这块就不深入分析了为何如此了

感叹一下,系统层层调用之后,问题定位需要极其清晰的思路,大胆假设,小心求证;解决问题的思路是第一重要的


另外 s3fs 1.80 版本 err ret 不打印错误信息,只打印了个 response code,这个也比较伤,建议升级到当前最新版本
s3fs 1.86,会把错误信息也打印出来

https://spark.apache.org/docs/latest/cluster-overview.html

https://spark.apache.org/docs/latest/cluster-overview.html#glossary

https://github.com/GoogleCloudPlatform/spark-on-k8s-operator/blob/master/examples/spark-pi.yaml

https://github.com/GoogleCloudPlatform/spark-on-k8s-operator/blob/master/examples/spark-py-pi.yaml

KUBERNETES_SERVICE_HOST

KUBERNETES_SERVICE_PORT

1
2
3
4
5
6
--class=...
--master k8s://https://%s:%s
--deploy-mode cluster/client
--conf spark.kubernetes.namespace=default
--conf spark.app.name=spark-pi
SparkPi.jar (MainApplicationFile: MainFile is the path to a bundled JAR, Python, or R file of the application.)

SPARK_HOME/bin/spark-submit args

Spark-on-k8s-operator controller run the spark-submit scripts

https://spark.apache.org/docs/2.3.1/running-on-kubernetes.html

https://spark.apache.org/docs/2.3.1/running-on-kubernetes.html#cluster-mode

1
2
3
4
5
6
7
8
$ bin/spark-submit \
--master k8s://https://<k8s-apiserver-host>:<k8s-apiserver-port> \
--deploy-mode cluster
--name spark-pi \
--class org.apache.spark.examples.SparkPi \
--conf spark.executor.instances=5 \
--conf spark.kubernetes.container.image=<spark-image> \
local:///path/to/examples.jar

https://spark.apache.org/docs/2.3.1/running-on-kubernetes.html#dependency-management

client mode is not supported in 2.3.1 (cluster manager)

but it seems work in 2.4

https://spark.apache.org/docs/latest/running-on-kubernetes.html

Dependencies Jars and Files

logs

1
kubectl -n=<namespace> logs -f <driver-pod-name>

https://github.com/GoogleCloudPlatform/spark-on-k8s-operator/blob/master/docs/design.md

CP

先写 log

再复制 log

一致性协议确保大多数都已成功复制 log 后,这个时候 log 也称为被 committed 了

执行状态机

三种角色

Leader / Follower / Candidate

Leader 负责周期性发起心跳

Follower 接收心跳

Follower 选举超时后,发起 Vote,状态转变为 Candidate

ETCD 的流程

写 WAL; raft 协议 ok; apply data to boltdb

使用 raft 来同步 WAL;

选举的限制

A candidate must contact a majority of the cluster in order to be elected, which means that every committed entry must be present in at least one of those servers

Raft determines which of two logs is more up-to-date by comparing the index and term of the last entries in the logs

简单来说,在发起选举投票时,需要携带最新的 log 信息,包括 index 及 term;term 越大越新,如果 term 相同,则 log 的长度越长越新;这可以保证新选举出来的 leader 包含了之前所有 commited 的信息

网络分区需要特殊考虑 (2PC)

第一阶段先征求其他节点是否同意选举,如果同意选举则发起真正的选举操作,否则降为 Follower 角色

例如当有 follow 被隔离时,其 term 会不断增大,当其重新加入集群时,会触发选举,影响集群稳定性;为避免如此情况,增加 preVote 阶段,即发起 vote 的前提为

  • 没有收到有效领导的心跳,至少有一次选举超时
  • Candidate的日志足够新(Term更大,或者Term相同raft index更大)

才开始选举,避免网络隔离后,恢复加入的节点 term 较高,触发集群选举

客户端也需要考虑,网络分区,无法完成写入,需要 server 端返回特定的错误时,直接切换后端

Summary

  • 2PC 两阶段
  • Leader 一致性算法

没实际玩儿过 … 家里一直登陆不上 … 以下全凭看文档与代码

以 EC2 为基石设计,以镜像作为运行环境,完全面向镜像的设计

支持的 EC2 实例类型

https://amazonaws-china.com/sagemaker/pricing/instance-types/

性能最好的实例类型当前为

1
2
3
4
5
6
ml.p3dn.24xlarge
vCPU: 96
GPUs: 8xV100
Mem(GiB): 768
GPU Mem(GiB): 256
Networking Performance: 100 Gigabit

EC2 对应的实例类型为

1
2
3
4
5
6
7
8
9
p3dn.24xlarge   
GPUs: 8
vCPU: 96
Mem(GiB): 768
GPU Mem(GiB): 256
GPU P2P: NVLink
Storage(GB): 2 x 900 NVMe SSD
Dedicated EBS Bandwidth: 14 Gbps
Networking Performance: 100 Gigabit

100 Gigabit 由 AWS efa 提供,提供 OS-bypass 通信能力,应该就是 Infiniband 网络

启动训练过程 (Script Mode)

  • 上传本地代码至 S3
  • 调用 Sagemaker 创建训练作业接口
    • 创建 EC2 实例
    • 在 EC2 实例上启动 Sagemaker 系统进程
    • 下载数据集
    • 启动容器 (镜像)
    • 下载训练代码
    • 启动训练代码

https://github.com/aws/sagemaker-containers

下载

https://github.com/aws/sagemaker-containers/blob/master/src/sagemaker_containers/_files.py#L112

上传

https://github.com/aws/sagemaker-python-sdk/blob/master/src/sagemaker/fw_utils.py#L322

因此完整流程

创建作业/版本

hikari 轻量级数据库连接池

https://github.com/brettwooldridge/HikariCP/wiki/About-Pool-Sizing#limited-resources

这句话挺有趣,言简意赅

More threads only perform better when blocking creates opportunities for executing.

hikari 的理念,connections 并非越多越好,相反,可能只需要少量;太多的 connections 反而会导致性能下降

一般而言,一个数据库操作由一个线程执行;而 CPU 一个核心,同时只能执行一个线程

基于该前提,在没有 IO 阻塞的情况,即线程只要启动,就能一直在处理工作,那多线程 (多 connections),并不能提高性能

如果有 IO 阻塞等情况,使得线程在自己的时间片中,不能充分使用 CPU core,这样通过线程切换技术,可以提高 CPU core 的使用率,最终来看提高了性能 (吞吐量)

因此 PostgresSQL 项目给出了一个连接数计算的参考公式

1
connections = ((core_count * 2) + effective_spindle_count)

Effective spindle count is zero if
the active data set is fully cached, and approaches the actual number of spindles
as the cache hit rate falls

effective_spindle_count 即 IO 等待时间的估计值

hikari 特殊的优化手段介绍

https://github.com/brettwooldridge/HikariCP/wiki/Down-the-Rabbit-Hole

  • 修改代码: 生成更优的字节码
  • 实现特定的数据结构: 更好的性能

连接池的一些问题

https://github.com/brettwooldridge/HikariCP/wiki/About-Pool-Sizing#pool-locking

一个线程获取多个数据库连接(饿死)

可能的优化方案

  • 设置保证不会出现饿死情况的 pool size = Tn x (Cm - 1) + 1, Tn 为线程数, Cm 为每个线程最大获取的连接数
  • 如果当前线程已经获取过数据库连接,再调用 getConnection 时,返回该线程当前的 connection,而不是返回新的 connection

hikari 的一些配置项

https://github.com/brettwooldridge/HikariCP#configuration-knobs-baby

leakDetectionThreshold: connection 泄漏告警,getConnection 后,在 leakDetectionThreshold 时间前,未调用 close 方法,会日志打印告警

慢 SQL (10s 以上执行时间) 会导致该告警

idleTimeout = idleTimeout = 600s

This property controls the maximum amount of time that a connection is allowed to sit idle in the pool

此属性控制允许连接在池中空闲的最长时间,即空闲超过该时间后,该 Connection 将被 softEvictConnection

maxLifetime = maxLifetime = 1800s

Hikari 新建 connection 时,使用 houseKeeperExecutor 线程池执行,maxLifeTime 后触发 softEvictConnection

maxLifetime 需要结合 DB 的 connection timeout 设置

maxLifetime 加了 2.5% 的抖动,防止同一时间有大量的 Connections 被关闭

maximumPoolSize = maximumPoolSize = 10

This property controls the maximum size that the pool is allowed to reach, including both idle and in-use connections. Basically this value will determine the maximum number of actual connections to the database backend.

决定了最大 Connections 数 (idle and in-use)

minimumIdle = maximumPoolSize

需要维持的最小 idle Connections 数

validationTimeout = 5s

This property controls the maximum amount of time that a connection will be tested for aliveness.

getConnection 时会判断 Connection aliveness,设置检测 aliveness 的超时时间

connectionTimeout = 30s

This property controls the maximum number of milliseconds that a client (that’s you) will wait for a connection from the pool.

getConnection 的超时时间

与 Scala Slick 结合

增加 slick-hikari 依赖

http://slick.lightbend.com/doc/3.2.1/gettingstarted.html#adding-slick-to-your-project

增加 typesafe config 配置项

http://slick.lightbend.com/doc/3.2.1/database.html#connection-pools

注意到 slick 将 hikari 的初始化方法封装了一层,另外该文档有一些错误,建议直接参考 hikari 的 readme.md 说明,也没几个核心参数

http://slick.lightbend.com/doc/3.2.1/api/index.html#slick.jdbc.JdbcBackend$DatabaseFactoryDef@forConfig(String,Config,Driver,ClassLoader):Database

Slick 这个 O (or function) RM 怎么说呢,一言难尽,表达式太弱 (当然可能是我不熟悉如此灵魂的 function 写法),生成的 SQL 语句非最优。举个例子

1
select count(1), count(distinct job_name), * from A, B, C

就没法实现,要么写成子查询,要么只能分开并发执行,or 直接写 sql 语句 …

线程池

注意到 slick 维护一个内部线程池,用于执行数据库相关的异步操作,通过 numThreads 参数指定最大线程数

Database.forConfig()

numThreads: 用于执行数据库相关的异步操作的线程数

maxConnections = numThreads * 5

minimumIdle = numThreads

因此以 Slick numThreads = 16 为例

  • maximumPoolSize = 16 * 5 = 80
  • minimumIdle = 16

hikari summary

关键配置项

  • minimumIdle 决定了连接池中的最小 idle 连接数,当 idle 连接少于该阈值时,hikari 会尽快补齐
  • idleTimeout (10min) 决定了 idle 连接的存活时间
  • maximumPoolSize 决定了连接池中的最大连接数
  • maxLifeTime (30min) 决定了连接最大存活时间