也许 kubernetes 对于开发者来说仍然过于复杂,另外现实系统中,又对服务发布后的流量管理有诸多需求

Knative 的出现,似乎是要真正实现 PAAS on the kubernetes

开发者仅需要关心如何编写代码 (write code) ,其他的所有,交给 Knative 吧(Maybe)

One Platform for Your Functions, Applications, and Containers (Cloud Next ‘18)

Knative serving

controller

  • revision
  • configuration
  • services
  • routes

抽象了一套统一的框架去实现四个 controller,不过当前命名的不是很理想,虽然都在 controller package 下面,然而文件名仅为 revision.go … 不如 kubernetes 的 job_controller.go 来的直接,也便于搜索

controller 的主要方法均为 Reconsile,即从 kube-apiserver list-watch CRD 的增量更新后,调用 Reconsile 执行相应操作,使得最终状态与用户期望的一致

提到 list-watch 而又不能不提到 kubernetes 中的杰出 api sdk 实现——informer

基于已日渐稳定的 kubernetes,knative 目前实现的简洁直接

1
ctrlr.Run(threadsPerController, stopCh)

每个 controller 启动 2 (threadsPerController) 个 goroutine 处理 list-watch 获得的 CRD 更新信息

1
2
3
4
5
6
for i := 0; i < threadiness; i++ {
go wait.Until(func() {
for c.processNextWorkItem(syncHandler) {
}
}, time.Second, stopCh)
}

syncHandler 由各个不同的 controller 传入

下面简单分析 knative serving 模块的几个 controller

services

根据 knative 的 simple demo app,开始的时候,我们需要使用 yaml 创建一个 service,这是所有 knative 奇妙之旅的开端 getting-started-knative-app

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
apiVersion: serving.knative.dev/v1alpha1 # Current version of Knative
kind: Service
metadata:
name: helloworld-go # The name of the app
namespace: default # The namespace the app will use
spec:
runLatest:
configuration:
revisionTemplate:
spec:
container:
image: gcr.io/knative-samples/helloworld-go # The URL to the image of the app
env:
- name: TARGET # The environment variable printed out by the sample app
value: "Go Sample v1"

services 的 reconcile 首先会查询

service 对应的 configuration (its name is the same with service-name) 是否存在

  • 不存在,创建之
  • 存在,reconcile 之

service 对应的 routes (its name is the same with service-name) 是否存在

  • 不存在,创建之
  • 存在,reconcile 之

configuration

获取对应的 rev

[config-name]-[config.spec.Generation]: helloworld-go-00001

  • 不存在,创建之

随后更新 configuration 的 status

所以可以看到在 configuration 中其实实现了 app 的多版本管理,每次 configuration 的修改(Generation + 1)均会生成一个新的 revision

revision

revision 关注下述几种资源,在下述资源有变化时,将变化加入 queue 中,等待 revision 2 个 goroutine 处理之

  • revisionInformer
  • deploymentInformer

暂时仅关注 revisionInformer,endpointsInformer 及 deploymentInformer

revision controller 获取到 revision 之后

若未找到其对应的 deployment

[rev-name]-deployment

将其 revision 的 status 更新为

  • ResourcesAvailable [status: Unknown, reason: Deploying]
  • ContainerHealthy [status: Unknown, reason: Deploying]
  • Ready [status: Unknown, reason: Deploying]

并调用 kube-apiserver api 创建 deployment

注意到在创建 deployment 时,revision controller 需要连接至该 deployment 的镜像仓库,获取其 digest,因此如果 revision controller 所在节点的网络受限的话,revision 的 status 可能会提示如下信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
status:
conditions:
- lastTransitionTime: 2018-08-29T19:03:43Z
reason: Deploying
status: Unknown
type: ResourcesAvailable
- lastTransitionTime: 2018-08-29T19:04:13Z
message: 'Get https://gcr.io/v2/: dial tcp: i/o timeout'
reason: ContainerMissing
status: "False"
type: ContainerHealthy
- lastTransitionTime: 2018-08-29T19:04:13Z
message: 'Get https://gcr.io/v2/: dial tcp: i/o timeout'
reason: ContainerMissing
status: "False"
type: Ready

即连接镜像仓库(如示例中的连接 gcr.io 超时),导致 revision notReady,正常工作的 revision 状态如下

1
2
3
4
5
6
7
8
9
10
11
status:
conditions:
- lastTransitionTime: 2018-08-30T09:36:46Z
status: "True"
type: ResourcesAvailable
- lastTransitionTime: 2018-08-30T09:36:46Z
status: "True"
type: ContainerHealthy
- lastTransitionTime: 2018-08-30T09:36:46Z
status: "True"
type: Ready

非 active 的 revision 状态如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
status:
conditions:
- lastTransitionTime: 2018-08-30T09:09:18Z
reason: Updating
status: Unknown
type: ResourcesAvailable
- lastTransitionTime: 2018-08-30T09:09:18Z
reason: Updating
status: Unknown
type: ContainerHealthy
- lastTransitionTime: 2018-08-30T09:09:18Z
reason: Inactive
status: "False"
type: Ready

若找到其对应的 deployment

则根据 rev 的状态,决定 deployment replica 的数量

  • rev.spec.servingState 状态为 Active,且 deployment replica = 0 时,需将其调整为 1
  • rev.spec.servingState 状态为 Reserve,且 deployment replica != 0 时,需将其调整为 0

如果期望的 deployment replica 与实际的 replica 相同,那么将 rev 的 status 更新为

  • ResourcesAvailable [status: Unknown, reason: Updating]
  • ContainerHealthy [status: Unknown, reason: Updating]
  • Ready [status: Unknown, reason: Updating]

如果不相同,调用 kube-apiserver api 更新 deployment

routes

routes 获取其对应的 kubernetes svc

[route-name] 例如 helloworld-go

  • 不存在,创建之
  • 否则,更新之

随后通过 istio CRD Istio VirtualService 配置流量

  • 不存在,创建之
  • 否则,更新之

Summary

controller of serving

  • services controller
  • configuration controller
  • routes controller
  • revision controller

数据源均来自 list-watch 相应的 CRD,实现相应的 reconcile 方法

services controller 负责创建 configuration 和 routes 资源

configuration controller 负责创建 revision 资源

routes controller 负责创建 Istio VirtualService 资源

Issues

Knative 还处于较为年轻的阶段,花了两天时间最后成功在内网环境上成功运行了其 simple demo app。目前在需要使用 proxy 访问公网网络的情况下,如何配置 knative,其文档中还没有相关的说明

目前为止尝试 knative 的一些 debug 经历,可参看下述 issues

Knative http proxy sample

istio-statsd-prom-bridge pod crash due to unknown short flag

Configuration is waiting for a Revision to become ready

to be cont …

kubelet 为运行在 node 上的主要组件

其一方面 list-watch kube-apiserver pod 资源的变化

另一方面调用 docker 接口获取当前实际存在的 container 来 SyncLoop (PLEG)

所以下面分两条路线来分析 kubelet 的一些细节 (仅关注 pod/container,略去其他无关资源)

overview

  • Run
    • start a goroutine, one second trigger once podKiller method
    • call kl.statusManager.Start()
    • call kl.probeManager.Start()
    • call kl.pleg.Start()
      • start a goroutine, one second trigger once relist method
    • call kubelet main loop kl.syncLoop(updates, kl)
      • syncLoopIteration

注意到 kubelet main loop 传递的 updates channel 为从 kube-apiserver list-watch 到的 pod 变化数据,当 kubelet 重启时,会收到当前 node 上的所有 pod 数据

syncLoopIteration 是 kubelet 的 main loop,其主要处理

  • configCh channel: pod info with ADD / Update / Delete …, this channel’s data comes from kube-apiserver
  • plegCh channel: pod life cycle generator event, such as ContainerStart / ContainerDied …, this channel’s data comes from docker
  • syncCh channel: a one-second period time ticker
  • livenessManager.Updates() channel
  • housekeepingCh channel: a two-second period time ticker

当 kubelet 启动时指定 -v=2 的情况下

kubelet 处理 configCh 数据时,会显示如下日志

1
2
3
4
5
6
7
SyncLoop (ADD, api): podName_podNamespace(podUID),...

or

SyncLoop (UPDATE, api): podName_podNamespace(podUID),...

and REMOVE / RECONCILE / DELETE / RESTORE type

具体如下

1
SyncLoop (ADD, "api"): "nginx-deployment-6c54bd5869-wppm8_default(1336f9f0-a898-11e8-b01b-000d3a362518)"

kubelet 处理 plegCh 数据时,会显示如下日志

1
SyncLoop (PLEG): podName_podNamespace(podUID),..., event:

具体如下

1
SyncLoop (PLEG): "nginx-deployment-6c54bd5869-9jsp5_default(1336d6cf-a898-11e8-b01b-000d3a362518)", event: &pleg.PodLifecycleEvent{ID:"1336d6cf-a898-11e8-b01b-000d3a362518", Type:"ContainerStarted", Data:"7e06e4ce8ab3a4a0b8bbb84f35ac8ac078bb5ec9db4ce765e35a235664cb3dd7"}

Data 为 ContainerID 与 docker ps 看到的相同

1
2
hzs@kubernetes:~/work/src/k8s.io/kubernetes$ docker ps | grep 7e06e4ce8ab3
7e06e4ce8ab3 nginx "nginx -g 'daemon of…" 3 minutes ago Up 3 minutes k8s_nginx_nginx-deployment-6c54bd5869-9jsp5_default_1336d6cf-a898-11e8-b01b-000d3a362518_0

经过上述分析,大概对 kubelet 的工作原理及数据来源有了个基本认识,下面详细看一下 kubelet 对 configCh 及 plegCh 的数据处理

configCh

list-watch from kube-apiserver in a independant goroutine, once there is events about pods, then these pod data will be put into configCh

syncLoopIteration -> handle the pod data from list-watch from kube-apiserver pod resource -> HandlePodAdditions/… -> dispatchWork -> podWorkers.UpdatePod

UpdatePod

  • 如果之前未处理该 pod,则为该 pod 创建一个大小为 1 的 UpdatePodOptions channel,并启动一个协程调用 managePodLoop(podUpdates)
  • 如果处理过了,判断 isWorking
    • 若 false,则置为 true,并将 *options 置入 UpdatePodOptions channel,以供 managePodLoop 处理
    • 若 true,则进一步判断 lastUndeliveredWorkUpdate 未被记录或者 UpdateType 不等于 kubetypes.SyncPodKill,则更新 lastUndeliveredWorkUpdate 为本次 UpdatePod *options

managePodLoop -> syncPodFn -> kubelet.syncPod

plegCh

one second trigger once time relist -> generate container event (Started/…) -> put the event into eventChannel channel

syncLoopIteration -> handle the event from eventChannel -> HandlePodSyncs -> dispatchWork -> podWorkers.UpdatePod

看到这,简单总结一下,两条更新的路,最终得到统一,即来自于 kube-apiserver pod 更新,又亦或是来自于节点上 container status 的变化 (pleg),最终均会调用 syncPod

1
2
SyncLoop (ADD, "api"): "nginx-deployment-6c54bd5869-wppm8_default(1336f9f0-a898-11e8-b01b-000d3a362518)"
SyncLoop (PLEG): "nginx-deployment-6c54bd5869-9jsp5_default(1336d6cf-a898-11e8-b01b-000d3a362518)", event: &pleg.PodLifecycleEvent{ID:"1336d6cf-a898-11e8-b01b-000d3a362518", Type:"ContainerStarted", Data:"7e06e4ce8ab3a4a0b8bbb84f35ac8ac078bb5ec9db4ce765e35a235664cb3dd7"}

syncPod

举几个典型的例子吧

create a deployment with replica 1

kubelet 的响应流程

configCh

  • SyncLoop (ADD, “api”): “podName_namespace(podUID)”
  • HandlePodAdditions

从 podManager 中获取已存在的 pod,并将新的 pod 添加至其中。从已存在的 pod 中过滤出 activePods,并判断新的 pod canAdmitPod。例如亲和性、反亲和性的判断

  • dispatchWork

调用 podWorkers.UpdatePod

1
2
3
4
5
6
7
8
9
10
kl.podWorkers.UpdatePod(&UpdatePodOptions{
Pod: pod,
MirrorPod: mirrorPod,
UpdateType: syncType, // SyncPodCreate
OnCompleteFunc: func(err error) {
if err != nil {
metrics.PodWorkerLatency.WithLabelValues(syncType.String()).Observe(metrics.SinceInMicroseconds(start))
}
},
})
  • UpdatePod
1
初始化 podUID -> UpdatePodOptions channel (1),并启动协程执行 p.managePodLoop(podUpdates)。p.isWorking[pod.UID] 为 false,随后设置其为 true,并将 *options 置入 UpdatePodOptions channel
  • managePodLoop

循环处理 UpdatePodOptions channel

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// This is a blocking call that would return only if the cache
// has an entry for the pod that is newer than minRuntimeCache
// Time. This ensures the worker doesn't start syncing until
// after the cache is at least newer than the finished time of
// the previous sync.
status, err := p.podCache.GetNewerThan(podUID, lastSyncTime)
err = p.syncPodFn(syncPodOptions{
mirrorPod: update.MirrorPod,
pod: update.Pod,
podStatus: status,
killPodOptions: update.KillPodOptions,
updateType: update.UpdateType,
})
lastSyncTime = time.Now()
  • syncPodFn(kubelet.syncPod)

mkdir dir

1
/var/lib/kubelet/pods/[podUID]
  • volumeManager.WaitForAttachAndMount(pod)

获取 imagePullSecrets

1
2
// Fetch the pull secrets for the pod
pullSecrets := kl.getPullSecretsForPod(pod)
  • containerRuntime.SyncPod
1
2
// Call the container runtime's SyncPod callback
result := kl.containerRuntime.SyncPod(pod, apiPodStatus, podStatus, pullSecrets, kl.backOff)
  • createPodSandBox

mkdir logs dir

1
/var/log/pods/[podUID]
  • run init container
  • run container

pleg

relist 的时候,先从 docker 获取一把全量的 pod 数据

1
2
// Get all the pods.
podList, err := g.runtime.GetPods(true)

当前状态与之前的状态一比,生成每个 container 的 PLE (pod life cycle event)

1
SyncLoop (PLEG): "podName_Namespace(podUID)", event: &pleg.PodLifecycleEvent{ID:"podUID", Type:"ContainerStarted", Data:"ContainerID"}

值得注意的是 ContainerDied 就是容器退出的意思

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
func generateEvents(podID types.UID, cid string, oldState, newState plegContainerState) []*PodLifecycleEvent {
if newState == oldState {
return nil
}
glog.V(4).Infof("GenericPLEG: %v/%v: %v -> %v", podID, cid, oldState, newState)
switch newState {
case plegContainerRunning:
return []*PodLifecycleEvent{{ID: podID, Type: ContainerStarted, Data: cid}}
case plegContainerExited:
return []*PodLifecycleEvent{{ID: podID, Type: ContainerDied, Data: cid}}
case plegContainerUnknown:
return []*PodLifecycleEvent{{ID: podID, Type: ContainerChanged, Data: cid}}
case plegContainerNonExistent:
switch oldState {
case plegContainerExited:
// We already reported that the container died before.
return []*PodLifecycleEvent{{ID: podID, Type: ContainerRemoved, Data: cid}}
default:
return []*PodLifecycleEvent{{ID: podID, Type: ContainerDied, Data: cid}, {ID: podID, Type: ContainerRemoved, Data: cid}}
}
default:
panic(fmt.Sprintf("unrecognized container state: %v", newState))
}
}
  • plegCh

plegCh 有数据后,调用 HandlePodSyncs 处理之

  • UpdatePod
1
2
3
4
5
// if a request to kill a pod is pending, we do not let anything overwrite that request.
update, found := p.lastUndeliveredWorkUpdate[pod.UID]
if !found || update.UpdateType != kubetypes.SyncPodKill {
p.lastUndeliveredWorkUpdate[pod.UID] = *options
}

即更新 lastUndeliveredWorkUpdate

HandlePodSyncs 执行结束之后(同步)

如果容器挂掉,则执行清理动作

1
2
3
4
5
if e.Type == pleg.ContainerDied {
if containerID, ok := e.Data.(string); ok {
kl.cleanUpContainersInPod(e.ID, containerID)
}
}

默认配置,只保留最新一个。若 pod 被 evicted,或者是 DeletionTimestamp != nil && notRunning(apiPodStatus.ContainerStatuses),那么它的所有容器将会被删除

1
MaxPerPodContainerCount: 1

golang profiling (即剖析),golang 原生提供了 pprof 性能分析工具

前段时间分析了一个 apiserver 处理请求性能较低的问题,正是使用了 pprof 确定了问题点,从而解决了该问题

这次使用 etcd https://github.com/coreos/etcd 来举个例子,关于 pprof 的使用及可视化,Ref 中提到了 golang 性能分析大名鼎鼎的几篇 blog,建议先行参考,看了之后会对 golang 性能分析有个 overall 的思路

此篇并无太多 creative 之处

Show time

之前提到 golang 中自带了 pprof 采集代码,而且启用它们也非常简单

如果是一个 web server 的话,仅需要 import _ “net/http/pprof”,则会注册 pprof 相关的 handler

1
2
3
4
5
6
import _ "net/http/pprof"
func main() {
go func() {
log.Println(http.ListenAndServe("localhost:6060", nil))
}()
}

web server 启动后,即可调用 pprof 接口获取数据

1
wget http://localhost:6060/debug/pprof/profile -O profile-data

当然 curl 也行

1
curl http://localhost:6060/debug/pprof/profile -o profile-data

对于 etcd 来说,pprof 也已经集成,可通过启动参数指定开启

1
./etcd --enable-pprof

当然对于非 web server 类的 app,也可以使用 runtime/pprof 包中的方法输出 pprof 数据

go tool pprof

以 etcd v3.1.9 为例子, go1.10

采样 10s cpu profile 数据

1
curl http://localhost:2379/debug/pprof/profile?seconds=10 -o etcd-profile-10

使用 go tool pprof 分析

1
2
3
4
5
6
bash-3.2$ go tool pprof $GOPATH/src/github.com/coreos/etcd/bin/etcd etcd-profile-10
File: etcd
Type: cpu
Time: Aug 12, 2018 at 11:45am (CST)
Duration: 10s, Total samples = 40ms ( 0.4%)
Entering interactive mode (type "help" for commands, "o" for options)

注意传入对应的二进制文件,否则可能无法找到相应的方法

go tool pprof 常用命令 top / list,其中 list 方法是正则匹配的,能显示匹配上的方法的 profile 信息

Secure of pprof

注意到之前均使用的是 http 的 Protocol 访问 pprof 接口,如果 server 是 https 该怎么办?

搜索得知 golang 很快便会支持 go tool pprof with client certificates 了

cmd/pprof: add HTTPS support with client certificates: https://github.com/golang/go/issues/20939

Support HTTPS certs, keys and CAs: https://github.com/google/pprof/pull/261

当然如果 server 不要求 client certificates 的话,可以如此使用 go tool pprof 获取数据(注意 https+insecure)

1
go tool pprof -seconds 5 https+insecure://192.168.99.100:32473/debug/pprof/profile

如果要求 client certificates 的话,亦或是日常使用时,其实也没必要直接用 go tool pprof 获取数据,使用 wget / curl 同样可以下载,下载之后再使用 go tool pprof 或者是 go-torch 分析好了

而 curl 显然是支持传入 ca.crt / tls.crt / tls.key 的

1
curl --cacert ca.crt --cert ./tls.crt --key tls.key https://192.168.99.100:32473/debug/pprof/profile -O profile-data

Visual pprof

go tool pprof 命令行模式,并不是特别直观,如果可以图形化的展示各个方法的消耗情况,那么将能更快的确定问题所在

  • graphviz
1
brew install graphviz

安装 ok graphviz 之后

1
2
3
4
5
6
7
bash-3.2$ go tool pprof $GOPATH/src/github.com/coreos/etcd/bin/etcd etcd-profile-10
File: etcd
Type: cpu
Time: Aug 12, 2018 at 11:45am (CST)
Duration: 10s, Total samples = 40ms ( 0.4%)
Entering interactive mode (type "help" for commands, "o" for options)
(pprof) web

即可在浏览器中显示 .svg 文件,浏览器中 Ctrl+s 保存到本地,即可传阅

web-pprof

  • go-torch

大名鼎鼎的火焰图 (flame-graph)

1
go get github.com/uber/go-torch

clone brandangregg 的火焰图生成脚本

1
git clone git@github.com:brendangregg/FlameGraph.git

生成火焰图

1
2
3
4
5
bash-3.2$ export PATH=$PATH:$GOPATH/src/github.com/brendangregg/FlameGraph
bash-3.2$
bash-3.2$ $GOPATH/bin/go-torch --file "torch.svg" etcd-profile-10
INFO[13:12:17] Run pprof command: go tool pprof -raw -seconds 30 etcd-profile-10
INFO[13:12:17] Writing svg to torch.svg

浏览器打开 torch.svg 即可

torch

个人觉得 flame-graph 更为直观,横向为各个方法消耗占比,纵向为调用栈上的各个方法消耗占比,一目了然,对应分析消耗较大的方法即可

Ref

golang pprof https://blog.golang.org/profiling-go-programs

pprof tools http://colobu.com/2017/03/02/a-short-survey-of-golang-pprof/

top

top 按 CPU 排序

1
2
top
shift + P

top 按 MEM 排序

1
2
top
shift + M

java utils

为 jdk 设置 JAVA_HOME
设置 jdk bin 至 PATH 中

1
2
3
4
// 查看 java 进程堆及 GC 情况
jstat
// 查看 java 进程中的线程情况
jstack

最后一顿排查,jstat 查看了堆内存情况,发现是 tomcat 启动参数 -Xms -Xmx 设置过大了,同一节点上还有其他进程,其他进程占用内存比较猛

Final, cheers !~

通过 part-1/2/3/4 的分析,可以确认 Server 这边的逻辑 ok,那么现在的确认问题的手段只能沿着网络路径逐级抓包了。此篇重点讲述如何在 Wireshark https://www.wireshark.org/download.html 中分析 WebSocket 流量

当然网上有挺多介绍,这里还是再说一遍,是为啥?因为其他文章大多数都是讲解的 ws:// 的,而现在我们面临的是 wss:// 的,显然有很大的不同

所以呢,简单在 display filter 中输入 websocket 是没法过滤出 WebSocket 流量的,毕竟 TLS encrypted 之后看到的全是 TCP 流量

ENV

1
2
3
Wireshark: 2.6.1
OS: macOS Sierra 10.12.6
WebSocketSite: https://www.websocket.org/echo.html

SSL decryted in WireShark

official doc:https://wiki.wireshark.org/SSL#Usingthe.28Pre.29-Master-Secret

step by step guide: https://jimshaver.net/2015/02/11/decrypting-tls-browser-traffic-with-wireshark-the-easy-way/

照着链接二配置一下即可

Capture network trafic through WireShark

1
2
3
export SSLKEYLOGFILE=/Users/zrss/Documents/BrowserSSLKeyLog/sslkeylog.log
open -a "Google Chrome"
wireshark

访问 https://www.websocket.org/echo.html,loading ok 之后

开启 Wireshark 捕获当前网卡流量

单击 Connect 连接 Server WebSocket,连接建立后,发送 Rock it with HTML5 WebSocket 消息,如下图所示

echo WebSocket

停止 Wireshark 捕获,display filter 中输入 http,寻找到 info 中有 101 Web Socket Protocol Handshake 字样的报文,右键 Follow 选中 SSL Stream 即可查看 WebSocket 的流量,如下图所示

websocket traffic

可见 WebSocket 为 TCP 之上与 HTTP 同层的应用层协议

  • TCP 三次握手建立 TCP 连接
  • SSL 握手协商加密机制
  • WebSocket 握手 (HTTP Upgrade) 建立 WebSocket 连接
  • 客户端 send MASKED WebSocket 上行报文
  • 服务端 echo WebSocket 下行报文

另外需要注意的是在 SSL 握手协商加密机制时,服务器端选择的加密套件为 TLS_RSA_WITH_AES_128_CBC_SHA (在 Server Hello 报文中可见)

为啥提到这个算法,因为在测试的时候,一开始是使用 https://socket.io/demos/chat/ 测试的,从 Chrome F12 控制台中可以看到有两个 WebSocket 请求,然而 Wireshark 似乎只能 decrypt 其中一个请求,而该请求服务器端选择的加密套件为 TLS_AES_128_GCM_SHA256

另外一个请求 (实际上的 chat 请求) 未能 decrypt,呃,不过不知道为啥,反复尝试了几次后,啥都 decrypt 不了了

Best Practice

所以这个 decrypt 实际上不一定靠谱,主要还是需要在生产环境上使用 tcpdump 工具抓取来自特定源 IP 的流量,然后通过与正常情况下的流量相比,识别出为 WebSocket 的流量,逐级排查,找到在哪一级组件上 WebSocket 报文丢掉即可

注意到 WebSocket RFC https://tools.ietf.org/html/rfc6455 中提到每个 WebSocket 请求均要求建立一个连接,另外从 Tomcat 7 WebSocket 实现上,可知每个 WebSocket 连接均会建立一个新的 socket 连接

因此在 Wireshark 中首先过滤出 SSL 的 Client Hello 报文

再通过 Client Hello 报文中的 srcport 字段过滤报文 (或者右键 Follow -> SSL Stream),正常的 WebSocket 报文模式,如下

  • SSL Handshake
  • HTTP Upgrade
  • HTTP Continuation

当然需要客户端构造容易识别的 WebSocket 流量模式,我在测试时,一般会持续输入某个字符,因此会有持续的 HTTP Continuation 报文

Summary

WebSocket 在生产环境中使用,最好不复用 HTTPS 443 端口,即 WebSocket 使用独立的网络架构,不复用之前 HTTP 的网络架构。毕竟 HTTP 的网络路径,一路上有各种防火墙,可得小心了

另外还发现了一个有趣的项目 noVNC https://github.com/novnc/noVNC,提供了在界面上远程登录主机的功能,而我们知道大多数 VNC Server 也支持 WebSocket 协议,因此 noVNC 也使用了 WebSocket 协议传输数据,要不支持的 Server,noVNC 有一个子项目:websockify https://github.com/novnc/websockify,将 WebSocket 流量转为 Socket 流量,以兼容不支持 WebSocket 协议的 VNC Server,有时间再研究一下了

The end of WebSocket in Tomcat 7 series

Other Ref

http://jsslkeylog.sourceforge.net/

现在我们知道 Server 端会首先进入 onOpen 状态,随后会持续从 socket 中获取 WebSocket Frame 组装成 Message 后,回调业务层 onMessage 方法

而 Client 端在接收到 101 status code 之后,也会进入 onOpen 状态,典型的 Client 端实现如下

1
2
3
4
5
6
7
8
9
10
// Create WebSocket connection.
const socket = new WebSocket('wss://remoteAddress:443');
// Connection opened
socket.addEventListener('open', function (event) {
socket.send('Hello Server!');
});
// Listen for messages
socket.addEventListener('message', function (event) {
console.log('Message from server ', event.data);
});

https://developer.mozilla.org/en-US/docs/Web/API/WebSocket

而我遇到的问题是,web terminal 一般实现,会在 onOpen 中发送一个设置 terminal size 的 message,例如 k8s 中可以发送如下消息

1
2
3
4
// Connection opened
socket.addEventListener('open', function (event) {
socket.send('4{"Width":80,"Height":20}');
});

该消息不能被立即发送,若立即发送,则会导致 Tomcat side ServerEndpoint onMessage 一直未被调用

而后端 k8s side 的数据能推送至 Client side,具体来说 Client side 能接收到 k8s side 推送回来的初始化空数据,而 Client side 发送的数据帧 ServerEndpoint onMessage 一直未被调用

说明 WebSocket 链接是没问题的,于是乎 Client 发送的数据帧被发送到了哪里?

之前的几篇讨论中,其实我是判定如果 ServerEndpoint onOpen stuck 住,会出现该问题。然而实际情况可以说明,既然后端的数据可以推送至客户端,证明 onOpen 已经执行结束

ServerEndpoint onOpen 时执行的逻辑如下

1
2
3
4
5
6
7
static hashmap frontEndWsSessionMap;
static hashmap backEndWsSessionMap;
onOpen(session) {
wsBackEndSession = connectToBackEnd(...);
backEndWsSessionMap.put(wsBackEndSession.getId(), session);
frontEndWsSessionMap.put(session.getId(), wsBackEndSession);
}

而通过查看代码知道 connectToBackEnd,即 Server side 实现 WebSocket 请求时,用了两个新的线程,其中一个用于接收数据,另一个用于获取数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// Switch to WebSocket
WsRemoteEndpointImplClient wsRemoteEndpointClient = new WsRemoteEndpointImplClient(channel);
WsSession wsSession = new WsSession(endpoint, wsRemoteEndpointClient,
this, null, null, null, null, null, extensionsAgreed,
subProtocol, Collections.<String,String>emptyMap(), secure,
clientEndpointConfiguration);
WsFrameClient wsFrameClient = new WsFrameClient(response, channel,
wsSession, transformation);
// WsFrame adds the necessary final transformations. Copy the
// completed transformation chain to the remote end point.
wsRemoteEndpointClient.setTransformation(wsFrameClient.getTransformation());
endpoint.onOpen(wsSession, clientEndpointConfiguration);
registerSession(endpoint, wsSession);
/* It is possible that the server sent one or more messages as soon as
* the WebSocket connection was established. Depending on the exact
* timing of when those messages were sent they could be sat in the
* input buffer waiting to be read and will not trigger a "data
* available to read" event. Therefore, it is necessary to process the
* input buffer here. Note that this happens on the current thread which
* means that this thread will be used for any onMessage notifications.
* This is a special case. Subsequent "data available to read" events
* will be handled by threads from the AsyncChannelGroup's executor.
*/
wsFrameClient.startInputProcessing();

链接建立之后,会立即调用 wsFrameClient.startInputProcessing(); 处理当前 response 中的数据,即调用 ClientEndpoint onMessage 方法

后续的数据处理由线程池中的读线程完成

1
2
3
4
onMessage(String message, session) {
frontEndSession = backEndWsSessionMap.get(session.getId());
frontEndSession.getBasicRemote.sendText(message);
}

大致流程也是类似的,从 socketOutput 中持续获取数据,组装 okay 之后,回调 ClientEndpoint onMessage 方法

综上,上述逻辑 okay,并不是在 onOpen 中 stuck

至此与问题现象对比之后,可以认为如果问题出现 Tomcat 侧,则仅可能为 onDataAvailable 之后,一直未能从 socketInputStream 中获取到数据

如果可以抓包分析的话,抓包最为简单,之所以费这么大劲儿分析 Tomcat WebSocket 的实现,实际上是因为全是 TLS 流量,so 抓到的全是 tcp 包,根本没法区分

尝试过导入 Nginx 的私钥,也没法解,听 Nginx 的同事说是 Nginx 与 Server 之间还会动态协商一个私钥 … 醉了

如果能确定 Client 的 WebSocket 包都发送到达 Tomcat 了,这也可以确认是 Server side 的问题,然鹅

绝望 … so sad

回答上篇末尾的调用时序问题

Tomcat 7 默认使用 BIO Http11Protocol

注意到在 Http11Protocol 的 process 方法中(AbstractProtocol)有如下逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
do {
// ...
else if (processor.isUpgrade()) {
// Init WebSocket OPEN_READ processor httpUpgradeHandler is null
// And when the second round came, upgradeDispatch will dispatch OPEN_READ status
// ha, upgradeServletInputStream.onDataAvailable() will be called
state = processor.upgradeDispatch(status);
// but, pay attention to that fact
// WsFrameServer.onDataAvailable is a for loop so it will remain in it
// that's the reason why we can't see the state of Upgraded return
} else {
// Init WebSocket OPEN_READ will go through here
state = processor.process(wrapper);
// Finally adapter.service(request, response) will be called in processor.process
// then the request will go through filter
// Once it arrives the WsFilter
// and it will call upgrade method with a WsHttpUpgradeHandler in WsFilter
// That will instance a WsHttpUpgradeHandler and call action Upgrade hook
// That's the time httpUpgradeHandler be set to WsHttpUpgradeHandler and no more null value
// So isUpgrade() return true and state will become
// SocketState.UPGRADING
// And it will create a new processor named upgradeProcessor to replace previous processor
// in the state of SocketState.UPGRADING in following code
}
if (getLog().isDebugEnabled()) {
getLog().debug("Socket: [" + wrapper +
"], Status in: [" + status +
"], State out: [" + state + "]");
// it means that if it is a WebSocket Request
// it turns out to be once debug info
// Status in: OPEN_READ, State out: UPGRADING
}
} while (state == SocketState.UPGRADING);
// ...
if (state == SocketState.UPGRADING) {
// Get the HTTP upgrade handler
HttpUpgradeHandler httpUpgradeHandler =
processor.getHttpUpgradeHandler();
// Release the Http11 processor to be re-used
release(wrapper, processor, false, false);
// Create the upgrade processor
processor = createUpgradeProcessor(
wrapper, httpUpgradeHandler);
// Mark the connection as upgraded
wrapper.setUpgraded(true);
// Associate with the processor with the connection
connections.put(socket, processor);
// Initialise the upgrade handler (which may trigger
// some IO using the new protocol which is why the lines
// above are necessary)
// This cast should be safe. If it fails the error
// handling for the surrounding try/catch will deal with
// it.
httpUpgradeHandler.init((WebConnection) processor);
}

所以 Tomcat 7 WebSocket 实现上的时序是正确的,大致的请求处理流程如下

  • Client 与 OS 特定端口建立 Connection …
  • Http11Protocol.process 被调用,传入 OPEN_READ status
  • processor.process(wrapper) 被调用
  • WsFilter 被调用,发现为 HTTP Upgrade to websocket request,设置 httpUpgradeHandler 为 WsHttpUpgradeHandler
  • processor.process(wrapper) 返回 UPGRADING state
  • Http11Protocol.process 创建新的 upgradeProcessor 以代替之前的 processor
  • 调用 WsHttpUpgradeHandler.init 方法
  • init 方法执行
    • 在 sos 上注册 WsWriteListener 方法
    • 调用 onOpen 方法
    • 在 sis 上注册 WsReadListener 方法
  • status 仍然为 OPEN_READ
  • processor.upgradeDispatch(status) 被调用,for loop socketInputStream

上述过程均在同一线程中执行,Tomcat 7 Http11Protocol 实现的是简单的处理模型,Acceptor 获取 socket,当有新的 socket 连接时,使用一个新线程去处理。

现在我们可以给出 WebSocket ServerEndpoint 的精确时序了

  • SocketState OPEN_READ 后 ServerEndpoint onOpen 被调用
  • WsFrameServer onDataAvailable 被调用
  • onDataAvailable 组装好 Message 后 ServerEndpoint onMessage 被调用

因此即使 onOpen 执行时间过长,数据也只是被累积在 socket 输入缓冲区中,一旦执行结束后,依然能触发 onDataAvailable,从而回调 ServerEndpoint onMessage

另一方面也说明了,onOpen 首先被执行,onMessage 其次被执行的时序

值得注意的是 WebSocket 一旦成功 upgradeDispatch(OPEN_READ) state 后,逻辑将会停留在循环从 socketInputStream 获取数据上

而我们知道 WebSocket 为双工协议,那么 OPEN_WRITE 状态什么时候被 upgradeDispatch ? 这不被 upgradeDispatch 的话,Server side 就没法向 Client side 推送数据了?

WsRemoteEndpointImplServer onWritePossible

从 byteBuffers 中读取字节,并写入 socketOutputStream 中,byteBuffers 读取 complete 发送完成后,循环退出

这仅是 onWritePossible 自身的逻辑

而实际上如果是 upgradeDispatch(OPEN_WRITE) trigger onWritePossible(useDispatch: false)

WsRemoteEndpointImplServer doWrite trigger onWritePossible(useDispatch: true)

耐人寻味

注意到我们使用 wsSession.getBasicRemote().sendText() 发送消息,实际上最后调用的为 doWrite 方法,所以逻辑就清晰了,实际上并不一定需要 upgradeDispatch(OPEN_WRITE) 才能写入,只不过在实现上,通过 upgradeDispatch(OPEN_WRITE) 执行的 doWrite 与在 onOpen / onMessage 中使用 wsSession 直接写入的 doWrite 传入参数不同,均能完成写入

  • upgradeDispatch(OPEN_WRITE): onWritePossible(useDispatch: false)
  • wsSession.getBasicRemote().sendText(…): onWritePossible(useDispatch: true)

summary

这块逻辑看的还不是特别清晰,主要是为了没有 OPEN_WRITE state 也是可以写入的这个结论

所以完整的 WebSocket 请求流程

  • 客户端发起 WebSocket 请求时,需要新建一个链接
  • Tomcat 接收到 socket 链接后,按通用处理流程处理之
  • WebSocket 的第一个请求为 HTTP GET Upgrade 请求,这个请求在通用处理流程中经过 WsFilter
  • WsFilter 识别到该请求为 WebSocket 请求,随后设置 WsHttpUpgradeHandler 为其 httpUpgradeHandler,调用 upgrade 方法,并 preInit WsHttpUpgradeHandler
  • Tomcat 发现当前 socket state 变为 UPGRADING,因此创建 upgradeProcessor 以替换之前的 http processor,此时会执行 WsHttpUpgradeHandler init 方法
  • WsHttpUpgradeHandler init 方法中会回调 ServerEndpoint onOpen 方法
  • Tomcat 继续处理时发现 processor 为 upgradeProcessor,因此调用 upgradeDispatch(OPEN_READ)
  • 这时触发 WsHttpUpgradeHandler.onDataAvailable(),随即继续调用至 WsFrameServer.onDataAvailable()
  • WsFrameServer.onDataAvailable() 尝试获取对象锁之后,进入 for loop 获取 socketInputStream 输入逻辑,组装好数据后,回调 ServerEndpoint onMessage 方法,即业务层此时可以感知到 Client 发送的数据
  • 到这里当前线程就一直在干读取 socket 中的数据并调用 onMessage 这件事儿了

接着上一篇,来分析一下 Tomcat 7 中 WebSocket 的实现

WsFilter

WebSocket 的实现入口处为 WsFilter,该 Filter 检查 HTTP header 中是否包含下述字段

1
Upgrade: websocket

且为 HTTP GET 请求。若符合 WebSocket Handshake 要求,则从 WebSocketContainer 中查找请求路径是否有 filter 拦截,若没有则继续后续的 filter,若有则进入 UpgradeUtil.doUpgrade 方法

WsServerContainer

回头来看 WsServerContainer 的初始化,当然最为重要的是注册了 WsFilter

1
2
3
4
5
6
FilterRegistration.Dynamic fr = servletContext.addFilter(
"Tomcat WebSocket (JSR356) Filter", new WsFilter());
fr.setAsyncSupported(true);
EnumSet<DispatcherType> types = EnumSet.of(DispatcherType.REQUEST,
DispatcherType.FORWARD);
fr.addMappingForUrlPatterns(types, true, "/*");

可见 WsFilter 拦截所有请求,当遇到 HTTP Upgrade to websocket 协议的请求时执行 doUpgrade 逻辑

UpgradeUtil.doUpgrade

doUpgrade 在各种检查后,可以接受 Client Upgrade 请求时,需向 Client 端返回的 HTTP 报文头中添加如下字段(Sec-WebSocket-Protocol / Sec-WebSocket-Extensions 可选)

1
2
3
4
5
Upgrade: websocket
Connection: upgrade
Sec-WebSocket-Accept: [key]
Sec-WebSocket-Protocol: [subProtocol]
Sec-WebSocket-Extensions: [extensions]

当然还需要初始化 ServerEndpoint 实例(@ServerEndpoint 注解)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
Endpoint ep;
try {
Class<?> clazz = sec.getEndpointClass();
if (Endpoint.class.isAssignableFrom(clazz)) {
ep = (Endpoint) sec.getConfigurator().getEndpointInstance(
clazz);
} else {
ep = new PojoEndpointServer();
// Need to make path params available to POJO
perSessionServerEndpointConfig.getUserProperties().put(
PojoEndpointServer.POJO_PATH_PARAM_KEY, pathParams);
}
} catch (InstantiationException e) {
throw new ServletException(e);
}

最后向 Client 返回 HTTP Response

1
2
3
4
5
WsHttpUpgradeHandler wsHandler =
((RequestFacade) inner).upgrade(WsHttpUpgradeHandler.class);
wsHandler.preInit(ep, perSessionServerEndpointConfig, sc, wsRequest,
negotiatedExtensionsPhase2, subProtocol, transformation, pathParams,
req.isSecure());

而 WsHttpUpgradeHandler 则会用于处理

1
The handler for all further incoming data on the current connection.

WsHttpUpgradeHandler

WsHttpUpgradeHandler init 方法中干了很多事情

  • 首先从 WebConnection 中获取输入流/输出流
1
2
3
4
5
6
7
8
9
this.connection = connection;
AbstractServletInputStream sis;
AbstractServletOutputStream sos;
try {
sis = connection.getInputStream();
sos = connection.getOutputStream();
} catch (IOException e) {
throw new IllegalStateException(e);
}
  • 实例化 WsSession,注意 SessionId 使用一个 static AtomicLong 维护,每次增加 1
  • 实例化 WsFrameServer,用于读写 Message
  • 在 sos 上注册 WsWriteListener
  • 调用 ServerEndpoint onOpen 方法
  • 在 sis 上注册 WsReadListener

summary

所以综上所述,每一次 HTTP Upgrade 请求均会创建一个新的 ServerEndpoint 实例,因此定义于 ServerEndpoint 中的 static 变量需注意确保线程安全

另外 ServerEndpoint 中 onOpen 和 onMessage 的执行顺序为 onOpen 必然首先执行,若 onOpen 执行时间过长,则就算 sis 中有数据等待处理,也不会触发 onMessage,因为从 WsHttpUpgradeHandler init 方法中可以看出 onOpen 调用结束后,才会在 sis 上注册 WsReadListener。接下来继续分析,如何触发 WsReadListener

Connector

AbstractServiceInputStream.onDataAvailable() 方法中调用 listener.onDataAvailable(); 即 WsReadListener.onDataAvailable()

而 AbstractServiceInputStream.onDataAvailable() 又由 AbstractProcessor.upgradeDispatch(SocketStatus status) 调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
@Override
public final SocketState upgradeDispatch(SocketStatus status)
throws IOException {
if (status == SocketStatus.OPEN_READ) {
try {
upgradeServletInputStream.onDataAvailable();
} catch (IOException ioe) {
// The error handling within the ServletInputStream should have
// marked the stream for closure which will get picked up below,
// triggering the clean-up of this processor.
getLog().debug(sm.getString("abstractProcessor.onDataAvailableFail"), ioe);
}
} else if (status == SocketStatus.OPEN_WRITE) {
try {
upgradeServletOutputStream.onWritePossible();
} catch (IOException ioe) {
// The error handling within the ServletOutputStream should have
// marked the stream for closure which will get picked up below,
// triggering the clean-up of this processor.
getLog().debug(sm.getString("abstractProcessor.onWritePossibleFail"), ioe);
}
} else if (status == SocketStatus.STOP) {
try {
upgradeServletInputStream.close();
} catch (IOException ioe) {
getLog().debug(sm.getString(
"abstractProcessor.isCloseFail", ioe));
}
try {
upgradeServletOutputStream.close();
} catch (IOException ioe) {
getLog().debug(sm.getString(
"abstractProcessor.osCloseFail", ioe));
}
return SocketState.CLOSED;
} else {
// Unexpected state
return SocketState.CLOSED;
}
if (upgradeServletInputStream.isCloseRequired() ||
upgradeServletOutputStream.isCloseRequired()) {
return SocketState.CLOSED;
}
return SocketState.UPGRADED;
}

Tomcat 7 默认使用 BIO,Http11Protocol.createUpgradeProcessor,其中将 socket 超时时间设置为不超时,并返回一个 Processor

因此 Tomcat 处理 WebSocket 请求的大致流程为

  • JIOEndpoint accept socket and new a thread to handle it
  • Worker wait for the next socket to be assigned and call handler to process socket
  • Http11ConnectionHandler
  • Http11Processor

客户端首先打开一个 connection,connection 建立后,向服务器端发起 WebSocket Handshake 请求,服务器接受后,返回 101 status code,双方可在当前 connection 上双工通信

当 SocketStatus 为 OPEN_READ 时,回调 readListener 的 onDataAvailable 方法,此处逻辑有 trick 的地方,值得注意的是如果 SocketStatus.OPEN_READ 时,仍未完成注册 readListener,则不会触发 listener.onDataAvailable() … 显然,因为 listener 为 null

1
2
3
4
5
6
7
8
9
10
11
12
13
14
protected final void onDataAvailable() throws IOException {
if (listener == null) { // it doesn't have a listener
return;
}
ready = Boolean.TRUE;
Thread thread = Thread.currentThread();
ClassLoader originalClassLoader = thread.getContextClassLoader();
try {
thread.setContextClassLoader(applicationLoader);
listener.onDataAvailable();
} finally {
thread.setContextClassLoader(originalClassLoader);
}
}

在 WsFrameServer 的 onDataAvailable 方法中首先尝试获取对象锁,获取成功后,for loop 监听 Servlet 输入流,当有数据时读取数据供 WsFrameServer 处理,处理 okay 后,回调 ServerEndpoint 的 onMessage 方法,业务层即感知到从 ws 连接中获取到数据

另外 ServerEndpoint onOpen 是在 WsHttpUpgradeHandler init 方法中被回调,看看官方文档对 handler 的 init 方法的描述

This method is called once the request/response pair where the upgrade is initiated has completed processing and is the point where control of the connection passes from the container to the HttpUpgradeHandler.

https://tomcat.apache.org/tomcat-7.0-doc/api/org/apache/tomcat/websocket/server/WsHttpUpgradeHandler.html

so 理论上要想使得 Tomcat 7 WebSocket 能正常工作的前提为

  • WsHttpUpgradeHandler init 方法被调用 —— WsHttpUpgradeHandler
  • 在 sos 上注册 WsWriteListener 方法结束 —— WsHttpUpgradeHandler
  • SocketStatus.OPEN_WRITE —— AbstractProcessor
  • onOpen 方法回调结束 —— ServerEndpoint
  • 在 sis 上注册 WsReadListener 方法结束 —— WsHttpUpgradeHandler
  • SocketStatus.OPEN_READ —— AbstractProcessor

接下来需要搞清楚一个问题,上述这些逻辑是单线程在跑,还是多线程,单线程的话,时序问题不大,但是多线程的情况下,就很有讲究了

To be cont. 下一遍回答上述时序问题

又是一部折腾史

之前提到了最近在基于 WebSocket 协议实现 WebTerminal 特性,无奈生产环境遇到了一个 weired bug,百思不得其解,只好看看 WebSocket 在 Tomcat 7 中是如何实现的

环境/版本信息

1
2
3
OS: macOS Sierra 10.12.6
Tomcat: 7.0.88
Intellij: 2016.2

jenv 管理 java 版本

1
2
3
4
5
> jenv versions
system
* 1.6.0.65 (set by /Users/zrss/.jenv/version)
1.7.0.181
1.8.0.172

当前 java 版本

1
2
> jenv global
1.6.0.65

ant 版本

1
2
> ant -version
Apache Ant(TM) version 1.9.12 compiled on June 19 2018

Set up Tomcat src code in Intellij

参考如下官方构建指南即可

http://tomcat.apache.org/tomcat-7.0-doc/building.html

第一步当然是下载源码 http://tomcat.apache.org/download-70.cgi#7.0.88
,当前源码地址 http://mirrors.hust.edu.cn/apache/tomcat/tomcat-7/v7.0.88/src/apache-tomcat-7.0.88-src.tar.gz

解压后目录结论如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
apache-tomcat-7.0.88-src
├── BUILDING.txt
├── CONTRIBUTING.md
├── KEYS
├── LICENSE
├── NOTICE
├── README.md
├── RELEASE-NOTES
├── RUNNING.txt
├── STATUS.txt
├── bin
├── build.properties.default
├── build.xml
├── conf
├── java
├── modules
├── res
├── test
└── webapps

Tomcat 诞生的年代比较久远,还是用的比较古老的构建工具 ant,复制 build.properties.default 至 build.properties

1
cp build.properties.default build.properties

Tomcat7 WebSocket 依赖 java7,因此需要设置 build.properties 中的 java7 path。取消该文件中的下述定义注释(54行),并填写相应 jdk 路径

1
java.7.home=/Library/Java/JavaVirtualMachines/zulu-7.jdk/Contents/Home

如果使用 jenv 管理 java 版本,可使用如下命令查看当前 java 的 java_home path
/usr/libexec/java_home -v $(jenv version-name)

确认当前 java 版本为 1.6

1
2
3
4
> java -version
java version "1.6.0_65"
Java(TM) SE Runtime Environment (build 1.6.0_65-b14-468)
Java HotSpot(TM) 64-Bit Server VM (build 20.65-b04-468, mixed mode)

设置 JAVA_HOME(make sure it is a 1.6 java’s home)

1
2
3
> export JAVA_HOME=$(/usr/libexec/java_home -v $(jenv version-name))
> echo $JAVA_HOME
/Library/Java/JavaVirtualMachines/1.6.0.jdk/Contents/Home

当然国内的小伙伴还要多做一件事儿,那就是配置 proxy

1
2
3
proxy.use=on
proxy.host=127.0.0.1
proxy.port=1081

这个 proxy 方案也是非常常见的了,shadowsocks (socks5) + privoxy (http)

最后进入 Tomcat 源码目录执行 ant 命令构建

1
2
cd /Users/zrss/Documents/Code/Java/apache-tomcat-7.0.88-src
ant

jar 包下载顺利的话,无问题

1
2
BUILD SUCCESSFUL
Total time: 23 seconds

Import Tomcat src to Intellij

在 Tomcat 源码目录执行,如若遇到因为 SSL 无法下载的依赖,手动下载之,并放入 testexist 路径(讲真,是个体力活儿,得有六七个包吧)

1
ant ide-eclipse

最后终于成功了

1
2
BUILD SUCCESSFUL
Total time: 1 second

构建 websocket eclipse

1
ant ide-eclipse-websocket

顺利成功

1
2
BUILD SUCCESSFUL
Total time: 1 second

到此可以开始导入 IntelliJ 了

IntelliJ 欢迎页面选择 Import Project,在弹出框中选择 Tomcat 源码根路径,一路 Next 至 Select Eclipse projects to import,勾选上 tomcat-7.0.x 继续 Next,最后 Project SDK 选择 1.6 即可

WebSocket 代码在如下路径

1
/Users/zrss/Documents/Code/Java/apache-tomcat-7.0.88-src/java/org/apache/tomcat/websocket

最后查看 Tomcat version 信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
> cd output/build/bin
> ./version.sh
Using CATALINA_BASE: /Users/zrss/Documents/Code/Java/apache-tomcat-7.0.88-src/output/build
Using CATALINA_HOME: /Users/zrss/Documents/Code/Java/apache-tomcat-7.0.88-src/output/build
Using CATALINA_TMPDIR: /Users/zrss/Documents/Code/Java/apache-tomcat-7.0.88-src/output/build/temp
Using JRE_HOME: /Library/Java/JavaVirtualMachines/1.6.0.jdk/Contents/Home
Using CLASSPATH: /Users/zrss/Documents/Code/Java/apache-tomcat-7.0.88-src/output/build/bin/bootstrap.jar:/Users/zrss/Documents/Code/Java/apache-tomcat-7.0.88-src/output/build/bin/tomcat-juli.jar
Server version: Apache Tomcat/7.0.88
Server built: Jul 6 2018 14:30:23 UTC
Server number: 7.0.88.0
OS Name: Mac OS X
OS Version: 10.12.6
Architecture: x86_64
JVM Version: 1.6.0_65-b14-468
JVM Vendor: Apple Inc.

至此 Tomcat 7 导入 IntelliJ 中 okay,可以愉快的查看代码了。当然查看 WebSocket 相关的实现时,在 IntelliJ Project Structure 中切换 SDK 至 1.7 即可

又要开始分析一个疑难杂症了

现象是 100 完成数,40 并发度的 job 无法执行结束,两个 pod 一直处于 running 状态,查看发现 container 已退出

job-controller 的工作原理

job-controller 较为短小精悍,属于 kube-controller-manager 代码中的一部分

1
2
3
4
5
go job.NewJobController(
ctx.InformerFactory.Core().V1().Pods(),
ctx.InformerFactory.Batch().V1().Jobs(),
ctx.ClientBuilder.ClientOrDie("job-controller"),
).Run(int(ctx.Options.ConcurrentJobSyncs), ctx.Stop)

从启动代码中可以看出 job-controller 关注 pod 与 job 这两种资源的变化情况

  • ctx.InformerFactory.Core().V1().Pods()
  • ctx.InformerFactory.Batch().V1().Jobs()
1
ConcurrentJobSyncs=5

启动 5 个 worker goroutine

1
2
3
for i := 0; i < workers; i++ {
go wait.Until(jm.worker, time.Second, stopCh)
}

在 worker 协程中,1s 执行一次 worker 方法

worker 方法,实现从 queue 中获取待处理的对象(key),并调用 syncJob 方法处理之

  • syncJob 处理成功
1
2
3
4
5
6
7
8
9
jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: jm.enqueueController,
UpdateFunc: func(old, cur interface{}) {
if job := cur.(*batch.Job); !IsJobFinished(job) {
jm.enqueueController(job)
}
},
DeleteFunc: jm.enqueueController,
})
1
2
3
4
5
podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: jm.addPod,
UpdateFunc: jm.updatePod,
DeleteFunc: jm.deletePod,
})

kubelet

大致来看与 job-controller 的关系不大,可能还是 kubelet 的问题

我们知道 kubelet 通过 PLEG 来感知节点上 container 的状态,另外 job pod 的 restart policy 目前仅支持两种 Never / onFailure,一般来说默认选择 onFailure 比较合适

这样业务容器在正常退出后(exit 0),kubelet pleg 感知到后,再加上 onFailure 的策略,正常情况下会 killing pod,job-controller 感知到 pod 减少后,即完成数又增加了 1,即可成功结束

看下 kubelet 的代码确认逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// Get all the pods.
podList, err := g.runtime.GetPods(true)
ShouldContainerBeRestarted

// Check RestartPolicy for dead container
if pod.Spec.RestartPolicy == v1.RestartPolicyNever {
glog.V(4).Infof("Already ran container %q of pod %q, do nothing", container.Name, format.Pod(pod))
return false
}
if pod.Spec.RestartPolicy == v1.RestartPolicyOnFailure {
// Check the exit code.
if status.ExitCode == 0 {
glog.V(4).Infof("Already successfully ran container %q of pod %q, do nothing", container.Name, format.Pod(pod))
return false
}
}
return true

如此情况下需要 KillPod

1
2
3
if keepCount == 0 && len(changes.ContainersToStart) == 0 {
changes.KillPod = true
}

所以正常情况下 job 控制的 pod,重启策略为 RestartPolicyOnFailure,如是正常退出的情况,则该 container 无需重启,而再加上述的判断,则当前 pod 需要被删除

调用该 killPodWithSyncResult 方法

经过如此分析,可能出现的问题原因及疑点

  • 2 个 pod 处于 running 状态,而对应的 container 却没有了,尝试从 relist: g.runtime.GetPods(true) 寻找可能原因
  • 使用命令行如何获取节点上的 container 容器,如果 container 正常获取,且 pod 对应的 container 已正常退出,那么为何未看到 SyncLoop(PLEG) ContainerDied 事件
  • pod 状态也是持续 running,而我们知道在 syncPod 中会调用 statusManager 设置新的 pod status,如果能获取到正确的 container info,pod status 也会被正确的更新至 kube-apiserver
  • 正常情况下,至多为一个 pod 保留一个 container 记录,其余均会被 clean。为何当时 docker ps -a 无一相关的 container,全都被 clear 了?那意味着 node evicted,或者是 pod 被 delete 了。evicted 的可能性较小,而 pod 被 delete 的话,除了人为,job-controller activeDeadline 超了也会设置
0%