0%

现在我们知道 Server 端会首先进入 onOpen 状态,随后会持续从 socket 中获取 WebSocket Frame 组装成 Message 后,回调业务层 onMessage 方法

而 Client 端在接收到 101 status code 之后,也会进入 onOpen 状态,典型的 Client 端实现如下

1
2
3
4
5
6
7
8
9
10
// Create WebSocket connection.
const socket = new WebSocket('wss://remoteAddress:443');
// Connection opened
socket.addEventListener('open', function (event) {
socket.send('Hello Server!');
});
// Listen for messages
socket.addEventListener('message', function (event) {
console.log('Message from server ', event.data);
});

https://developer.mozilla.org/en-US/docs/Web/API/WebSocket

而我遇到的问题是,web terminal 一般实现,会在 onOpen 中发送一个设置 terminal size 的 message,例如 k8s 中可以发送如下消息

1
2
3
4
// Connection opened
socket.addEventListener('open', function (event) {
socket.send('4{"Width":80,"Height":20}');
});

该消息不能被立即发送,若立即发送,则会导致 Tomcat side ServerEndpoint onMessage 一直未被调用

而后端 k8s side 的数据能推送至 Client side,具体来说 Client side 能接收到 k8s side 推送回来的初始化空数据,而 Client side 发送的数据帧 ServerEndpoint onMessage 一直未被调用

说明 WebSocket 链接是没问题的,于是乎 Client 发送的数据帧被发送到了哪里?

之前的几篇讨论中,其实我是判定如果 ServerEndpoint onOpen stuck 住,会出现该问题。然而实际情况可以说明,既然后端的数据可以推送至客户端,证明 onOpen 已经执行结束

ServerEndpoint onOpen 时执行的逻辑如下

1
2
3
4
5
6
7
static hashmap frontEndWsSessionMap;
static hashmap backEndWsSessionMap;
onOpen(session) {
wsBackEndSession = connectToBackEnd(...);
backEndWsSessionMap.put(wsBackEndSession.getId(), session);
frontEndWsSessionMap.put(session.getId(), wsBackEndSession);
}

而通过查看代码知道 connectToBackEnd,即 Server side 实现 WebSocket 请求时,用了两个新的线程,其中一个用于接收数据,另一个用于获取数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// Switch to WebSocket
WsRemoteEndpointImplClient wsRemoteEndpointClient = new WsRemoteEndpointImplClient(channel);
WsSession wsSession = new WsSession(endpoint, wsRemoteEndpointClient,
this, null, null, null, null, null, extensionsAgreed,
subProtocol, Collections.<String,String>emptyMap(), secure,
clientEndpointConfiguration);
WsFrameClient wsFrameClient = new WsFrameClient(response, channel,
wsSession, transformation);
// WsFrame adds the necessary final transformations. Copy the
// completed transformation chain to the remote end point.
wsRemoteEndpointClient.setTransformation(wsFrameClient.getTransformation());
endpoint.onOpen(wsSession, clientEndpointConfiguration);
registerSession(endpoint, wsSession);
/* It is possible that the server sent one or more messages as soon as
* the WebSocket connection was established. Depending on the exact
* timing of when those messages were sent they could be sat in the
* input buffer waiting to be read and will not trigger a "data
* available to read" event. Therefore, it is necessary to process the
* input buffer here. Note that this happens on the current thread which
* means that this thread will be used for any onMessage notifications.
* This is a special case. Subsequent "data available to read" events
* will be handled by threads from the AsyncChannelGroup's executor.
*/
wsFrameClient.startInputProcessing();

链接建立之后,会立即调用 wsFrameClient.startInputProcessing(); 处理当前 response 中的数据,即调用 ClientEndpoint onMessage 方法

后续的数据处理由线程池中的读线程完成

1
2
3
4
onMessage(String message, session) {
frontEndSession = backEndWsSessionMap.get(session.getId());
frontEndSession.getBasicRemote.sendText(message);
}

大致流程也是类似的,从 socketOutput 中持续获取数据,组装 okay 之后,回调 ClientEndpoint onMessage 方法

综上,上述逻辑 okay,并不是在 onOpen 中 stuck

至此与问题现象对比之后,可以认为如果问题出现 Tomcat 侧,则仅可能为 onDataAvailable 之后,一直未能从 socketInputStream 中获取到数据

如果可以抓包分析的话,抓包最为简单,之所以费这么大劲儿分析 Tomcat WebSocket 的实现,实际上是因为全是 TLS 流量,so 抓到的全是 tcp 包,根本没法区分

尝试过导入 Nginx 的私钥,也没法解,听 Nginx 的同事说是 Nginx 与 Server 之间还会动态协商一个私钥 … 醉了

如果能确定 Client 的 WebSocket 包都发送到达 Tomcat 了,这也可以确认是 Server side 的问题,然鹅

绝望 … so sad

回答上篇末尾的调用时序问题

Tomcat 7 默认使用 BIO Http11Protocol

注意到在 Http11Protocol 的 process 方法中(AbstractProtocol)有如下逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
do {
// ...
else if (processor.isUpgrade()) {
// Init WebSocket OPEN_READ processor httpUpgradeHandler is null
// And when the second round came, upgradeDispatch will dispatch OPEN_READ status
// ha, upgradeServletInputStream.onDataAvailable() will be called
state = processor.upgradeDispatch(status);
// but, pay attention to that fact
// WsFrameServer.onDataAvailable is a for loop so it will remain in it
// that's the reason why we can't see the state of Upgraded return
} else {
// Init WebSocket OPEN_READ will go through here
state = processor.process(wrapper);
// Finally adapter.service(request, response) will be called in processor.process
// then the request will go through filter
// Once it arrives the WsFilter
// and it will call upgrade method with a WsHttpUpgradeHandler in WsFilter
// That will instance a WsHttpUpgradeHandler and call action Upgrade hook
// That's the time httpUpgradeHandler be set to WsHttpUpgradeHandler and no more null value
// So isUpgrade() return true and state will become
// SocketState.UPGRADING
// And it will create a new processor named upgradeProcessor to replace previous processor
// in the state of SocketState.UPGRADING in following code
}
if (getLog().isDebugEnabled()) {
getLog().debug("Socket: [" + wrapper +
"], Status in: [" + status +
"], State out: [" + state + "]");
// it means that if it is a WebSocket Request
// it turns out to be once debug info
// Status in: OPEN_READ, State out: UPGRADING
}
} while (state == SocketState.UPGRADING);
// ...
if (state == SocketState.UPGRADING) {
// Get the HTTP upgrade handler
HttpUpgradeHandler httpUpgradeHandler =
processor.getHttpUpgradeHandler();
// Release the Http11 processor to be re-used
release(wrapper, processor, false, false);
// Create the upgrade processor
processor = createUpgradeProcessor(
wrapper, httpUpgradeHandler);
// Mark the connection as upgraded
wrapper.setUpgraded(true);
// Associate with the processor with the connection
connections.put(socket, processor);
// Initialise the upgrade handler (which may trigger
// some IO using the new protocol which is why the lines
// above are necessary)
// This cast should be safe. If it fails the error
// handling for the surrounding try/catch will deal with
// it.
httpUpgradeHandler.init((WebConnection) processor);
}

所以 Tomcat 7 WebSocket 实现上的时序是正确的,大致的请求处理流程如下

  • Client 与 OS 特定端口建立 Connection …
  • Http11Protocol.process 被调用,传入 OPEN_READ status
  • processor.process(wrapper) 被调用
  • WsFilter 被调用,发现为 HTTP Upgrade to websocket request,设置 httpUpgradeHandler 为 WsHttpUpgradeHandler
  • processor.process(wrapper) 返回 UPGRADING state
  • Http11Protocol.process 创建新的 upgradeProcessor 以代替之前的 processor
  • 调用 WsHttpUpgradeHandler.init 方法
  • init 方法执行
    • 在 sos 上注册 WsWriteListener 方法
    • 调用 onOpen 方法
    • 在 sis 上注册 WsReadListener 方法
  • status 仍然为 OPEN_READ
  • processor.upgradeDispatch(status) 被调用,for loop socketInputStream

上述过程均在同一线程中执行,Tomcat 7 Http11Protocol 实现的是简单的处理模型,Acceptor 获取 socket,当有新的 socket 连接时,使用一个新线程去处理。

现在我们可以给出 WebSocket ServerEndpoint 的精确时序了

  • SocketState OPEN_READ 后 ServerEndpoint onOpen 被调用
  • WsFrameServer onDataAvailable 被调用
  • onDataAvailable 组装好 Message 后 ServerEndpoint onMessage 被调用

因此即使 onOpen 执行时间过长,数据也只是被累积在 socket 输入缓冲区中,一旦执行结束后,依然能触发 onDataAvailable,从而回调 ServerEndpoint onMessage

另一方面也说明了,onOpen 首先被执行,onMessage 其次被执行的时序

值得注意的是 WebSocket 一旦成功 upgradeDispatch(OPEN_READ) state 后,逻辑将会停留在循环从 socketInputStream 获取数据上

而我们知道 WebSocket 为双工协议,那么 OPEN_WRITE 状态什么时候被 upgradeDispatch ? 这不被 upgradeDispatch 的话,Server side 就没法向 Client side 推送数据了?

WsRemoteEndpointImplServer onWritePossible

从 byteBuffers 中读取字节,并写入 socketOutputStream 中,byteBuffers 读取 complete 发送完成后,循环退出

这仅是 onWritePossible 自身的逻辑

而实际上如果是 upgradeDispatch(OPEN_WRITE) trigger onWritePossible(useDispatch: false)

WsRemoteEndpointImplServer doWrite trigger onWritePossible(useDispatch: true)

耐人寻味

注意到我们使用 wsSession.getBasicRemote().sendText() 发送消息,实际上最后调用的为 doWrite 方法,所以逻辑就清晰了,实际上并不一定需要 upgradeDispatch(OPEN_WRITE) 才能写入,只不过在实现上,通过 upgradeDispatch(OPEN_WRITE) 执行的 doWrite 与在 onOpen / onMessage 中使用 wsSession 直接写入的 doWrite 传入参数不同,均能完成写入

  • upgradeDispatch(OPEN_WRITE): onWritePossible(useDispatch: false)
  • wsSession.getBasicRemote().sendText(…): onWritePossible(useDispatch: true)

summary

这块逻辑看的还不是特别清晰,主要是为了没有 OPEN_WRITE state 也是可以写入的这个结论

所以完整的 WebSocket 请求流程

  • 客户端发起 WebSocket 请求时,需要新建一个链接
  • Tomcat 接收到 socket 链接后,按通用处理流程处理之
  • WebSocket 的第一个请求为 HTTP GET Upgrade 请求,这个请求在通用处理流程中经过 WsFilter
  • WsFilter 识别到该请求为 WebSocket 请求,随后设置 WsHttpUpgradeHandler 为其 httpUpgradeHandler,调用 upgrade 方法,并 preInit WsHttpUpgradeHandler
  • Tomcat 发现当前 socket state 变为 UPGRADING,因此创建 upgradeProcessor 以替换之前的 http processor,此时会执行 WsHttpUpgradeHandler init 方法
  • WsHttpUpgradeHandler init 方法中会回调 ServerEndpoint onOpen 方法
  • Tomcat 继续处理时发现 processor 为 upgradeProcessor,因此调用 upgradeDispatch(OPEN_READ)
  • 这时触发 WsHttpUpgradeHandler.onDataAvailable(),随即继续调用至 WsFrameServer.onDataAvailable()
  • WsFrameServer.onDataAvailable() 尝试获取对象锁之后,进入 for loop 获取 socketInputStream 输入逻辑,组装好数据后,回调 ServerEndpoint onMessage 方法,即业务层此时可以感知到 Client 发送的数据
  • 到这里当前线程就一直在干读取 socket 中的数据并调用 onMessage 这件事儿了

接着上一篇,来分析一下 Tomcat 7 中 WebSocket 的实现

WsFilter

WebSocket 的实现入口处为 WsFilter,该 Filter 检查 HTTP header 中是否包含下述字段

1
Upgrade: websocket

且为 HTTP GET 请求。若符合 WebSocket Handshake 要求,则从 WebSocketContainer 中查找请求路径是否有 filter 拦截,若没有则继续后续的 filter,若有则进入 UpgradeUtil.doUpgrade 方法

WsServerContainer

回头来看 WsServerContainer 的初始化,当然最为重要的是注册了 WsFilter

1
2
3
4
5
6
FilterRegistration.Dynamic fr = servletContext.addFilter(
"Tomcat WebSocket (JSR356) Filter", new WsFilter());
fr.setAsyncSupported(true);
EnumSet<DispatcherType> types = EnumSet.of(DispatcherType.REQUEST,
DispatcherType.FORWARD);
fr.addMappingForUrlPatterns(types, true, "/*");

可见 WsFilter 拦截所有请求,当遇到 HTTP Upgrade to websocket 协议的请求时执行 doUpgrade 逻辑

UpgradeUtil.doUpgrade

doUpgrade 在各种检查后,可以接受 Client Upgrade 请求时,需向 Client 端返回的 HTTP 报文头中添加如下字段(Sec-WebSocket-Protocol / Sec-WebSocket-Extensions 可选)

1
2
3
4
5
Upgrade: websocket
Connection: upgrade
Sec-WebSocket-Accept: [key]
Sec-WebSocket-Protocol: [subProtocol]
Sec-WebSocket-Extensions: [extensions]

当然还需要初始化 ServerEndpoint 实例(@ServerEndpoint 注解)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
Endpoint ep;
try {
Class<?> clazz = sec.getEndpointClass();
if (Endpoint.class.isAssignableFrom(clazz)) {
ep = (Endpoint) sec.getConfigurator().getEndpointInstance(
clazz);
} else {
ep = new PojoEndpointServer();
// Need to make path params available to POJO
perSessionServerEndpointConfig.getUserProperties().put(
PojoEndpointServer.POJO_PATH_PARAM_KEY, pathParams);
}
} catch (InstantiationException e) {
throw new ServletException(e);
}

最后向 Client 返回 HTTP Response

1
2
3
4
5
WsHttpUpgradeHandler wsHandler =
((RequestFacade) inner).upgrade(WsHttpUpgradeHandler.class);
wsHandler.preInit(ep, perSessionServerEndpointConfig, sc, wsRequest,
negotiatedExtensionsPhase2, subProtocol, transformation, pathParams,
req.isSecure());

而 WsHttpUpgradeHandler 则会用于处理

1
The handler for all further incoming data on the current connection.

WsHttpUpgradeHandler

WsHttpUpgradeHandler init 方法中干了很多事情

  • 首先从 WebConnection 中获取输入流/输出流
1
2
3
4
5
6
7
8
9
this.connection = connection;
AbstractServletInputStream sis;
AbstractServletOutputStream sos;
try {
sis = connection.getInputStream();
sos = connection.getOutputStream();
} catch (IOException e) {
throw new IllegalStateException(e);
}
  • 实例化 WsSession,注意 SessionId 使用一个 static AtomicLong 维护,每次增加 1
  • 实例化 WsFrameServer,用于读写 Message
  • 在 sos 上注册 WsWriteListener
  • 调用 ServerEndpoint onOpen 方法
  • 在 sis 上注册 WsReadListener

summary

所以综上所述,每一次 HTTP Upgrade 请求均会创建一个新的 ServerEndpoint 实例,因此定义于 ServerEndpoint 中的 static 变量需注意确保线程安全

另外 ServerEndpoint 中 onOpen 和 onMessage 的执行顺序为 onOpen 必然首先执行,若 onOpen 执行时间过长,则就算 sis 中有数据等待处理,也不会触发 onMessage,因为从 WsHttpUpgradeHandler init 方法中可以看出 onOpen 调用结束后,才会在 sis 上注册 WsReadListener。接下来继续分析,如何触发 WsReadListener

Connector

AbstractServiceInputStream.onDataAvailable() 方法中调用 listener.onDataAvailable(); 即 WsReadListener.onDataAvailable()

而 AbstractServiceInputStream.onDataAvailable() 又由 AbstractProcessor.upgradeDispatch(SocketStatus status) 调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
@Override
public final SocketState upgradeDispatch(SocketStatus status)
throws IOException {
if (status == SocketStatus.OPEN_READ) {
try {
upgradeServletInputStream.onDataAvailable();
} catch (IOException ioe) {
// The error handling within the ServletInputStream should have
// marked the stream for closure which will get picked up below,
// triggering the clean-up of this processor.
getLog().debug(sm.getString("abstractProcessor.onDataAvailableFail"), ioe);
}
} else if (status == SocketStatus.OPEN_WRITE) {
try {
upgradeServletOutputStream.onWritePossible();
} catch (IOException ioe) {
// The error handling within the ServletOutputStream should have
// marked the stream for closure which will get picked up below,
// triggering the clean-up of this processor.
getLog().debug(sm.getString("abstractProcessor.onWritePossibleFail"), ioe);
}
} else if (status == SocketStatus.STOP) {
try {
upgradeServletInputStream.close();
} catch (IOException ioe) {
getLog().debug(sm.getString(
"abstractProcessor.isCloseFail", ioe));
}
try {
upgradeServletOutputStream.close();
} catch (IOException ioe) {
getLog().debug(sm.getString(
"abstractProcessor.osCloseFail", ioe));
}
return SocketState.CLOSED;
} else {
// Unexpected state
return SocketState.CLOSED;
}
if (upgradeServletInputStream.isCloseRequired() ||
upgradeServletOutputStream.isCloseRequired()) {
return SocketState.CLOSED;
}
return SocketState.UPGRADED;
}

Tomcat 7 默认使用 BIO,Http11Protocol.createUpgradeProcessor,其中将 socket 超时时间设置为不超时,并返回一个 Processor

因此 Tomcat 处理 WebSocket 请求的大致流程为

  • JIOEndpoint accept socket and new a thread to handle it
  • Worker wait for the next socket to be assigned and call handler to process socket
  • Http11ConnectionHandler
  • Http11Processor

客户端首先打开一个 connection,connection 建立后,向服务器端发起 WebSocket Handshake 请求,服务器接受后,返回 101 status code,双方可在当前 connection 上双工通信

当 SocketStatus 为 OPEN_READ 时,回调 readListener 的 onDataAvailable 方法,此处逻辑有 trick 的地方,值得注意的是如果 SocketStatus.OPEN_READ 时,仍未完成注册 readListener,则不会触发 listener.onDataAvailable() … 显然,因为 listener 为 null

1
2
3
4
5
6
7
8
9
10
11
12
13
14
protected final void onDataAvailable() throws IOException {
if (listener == null) { // it doesn't have a listener
return;
}
ready = Boolean.TRUE;
Thread thread = Thread.currentThread();
ClassLoader originalClassLoader = thread.getContextClassLoader();
try {
thread.setContextClassLoader(applicationLoader);
listener.onDataAvailable();
} finally {
thread.setContextClassLoader(originalClassLoader);
}
}

在 WsFrameServer 的 onDataAvailable 方法中首先尝试获取对象锁,获取成功后,for loop 监听 Servlet 输入流,当有数据时读取数据供 WsFrameServer 处理,处理 okay 后,回调 ServerEndpoint 的 onMessage 方法,业务层即感知到从 ws 连接中获取到数据

另外 ServerEndpoint onOpen 是在 WsHttpUpgradeHandler init 方法中被回调,看看官方文档对 handler 的 init 方法的描述

This method is called once the request/response pair where the upgrade is initiated has completed processing and is the point where control of the connection passes from the container to the HttpUpgradeHandler.

https://tomcat.apache.org/tomcat-7.0-doc/api/org/apache/tomcat/websocket/server/WsHttpUpgradeHandler.html

so 理论上要想使得 Tomcat 7 WebSocket 能正常工作的前提为

  • WsHttpUpgradeHandler init 方法被调用 —— WsHttpUpgradeHandler
  • 在 sos 上注册 WsWriteListener 方法结束 —— WsHttpUpgradeHandler
  • SocketStatus.OPEN_WRITE —— AbstractProcessor
  • onOpen 方法回调结束 —— ServerEndpoint
  • 在 sis 上注册 WsReadListener 方法结束 —— WsHttpUpgradeHandler
  • SocketStatus.OPEN_READ —— AbstractProcessor

接下来需要搞清楚一个问题,上述这些逻辑是单线程在跑,还是多线程,单线程的话,时序问题不大,但是多线程的情况下,就很有讲究了

To be cont. 下一遍回答上述时序问题

又是一部折腾史

之前提到了最近在基于 WebSocket 协议实现 WebTerminal 特性,无奈生产环境遇到了一个 weired bug,百思不得其解,只好看看 WebSocket 在 Tomcat 7 中是如何实现的

环境/版本信息

1
2
3
OS: macOS Sierra 10.12.6
Tomcat: 7.0.88
Intellij: 2016.2

jenv 管理 java 版本

1
2
3
4
5
> jenv versions
system
* 1.6.0.65 (set by /Users/zrss/.jenv/version)
1.7.0.181
1.8.0.172

当前 java 版本

1
2
> jenv global
1.6.0.65

ant 版本

1
2
> ant -version
Apache Ant(TM) version 1.9.12 compiled on June 19 2018

Set up Tomcat src code in Intellij

参考如下官方构建指南即可

http://tomcat.apache.org/tomcat-7.0-doc/building.html

第一步当然是下载源码 http://tomcat.apache.org/download-70.cgi#7.0.88
,当前源码地址 http://mirrors.hust.edu.cn/apache/tomcat/tomcat-7/v7.0.88/src/apache-tomcat-7.0.88-src.tar.gz

解压后目录结论如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
apache-tomcat-7.0.88-src
├── BUILDING.txt
├── CONTRIBUTING.md
├── KEYS
├── LICENSE
├── NOTICE
├── README.md
├── RELEASE-NOTES
├── RUNNING.txt
├── STATUS.txt
├── bin
├── build.properties.default
├── build.xml
├── conf
├── java
├── modules
├── res
├── test
└── webapps

Tomcat 诞生的年代比较久远,还是用的比较古老的构建工具 ant,复制 build.properties.default 至 build.properties

1
cp build.properties.default build.properties

Tomcat7 WebSocket 依赖 java7,因此需要设置 build.properties 中的 java7 path。取消该文件中的下述定义注释(54行),并填写相应 jdk 路径

1
java.7.home=/Library/Java/JavaVirtualMachines/zulu-7.jdk/Contents/Home

如果使用 jenv 管理 java 版本,可使用如下命令查看当前 java 的 java_home path
/usr/libexec/java_home -v $(jenv version-name)

确认当前 java 版本为 1.6

1
2
3
4
> java -version
java version "1.6.0_65"
Java(TM) SE Runtime Environment (build 1.6.0_65-b14-468)
Java HotSpot(TM) 64-Bit Server VM (build 20.65-b04-468, mixed mode)

设置 JAVA_HOME(make sure it is a 1.6 java’s home)

1
2
3
> export JAVA_HOME=$(/usr/libexec/java_home -v $(jenv version-name))
> echo $JAVA_HOME
/Library/Java/JavaVirtualMachines/1.6.0.jdk/Contents/Home

当然国内的小伙伴还要多做一件事儿,那就是配置 proxy

1
2
3
proxy.use=on
proxy.host=127.0.0.1
proxy.port=1081

这个 proxy 方案也是非常常见的了,shadowsocks (socks5) + privoxy (http)

最后进入 Tomcat 源码目录执行 ant 命令构建

1
2
cd /Users/zrss/Documents/Code/Java/apache-tomcat-7.0.88-src
ant

jar 包下载顺利的话,无问题

1
2
BUILD SUCCESSFUL
Total time: 23 seconds

Import Tomcat src to Intellij

在 Tomcat 源码目录执行,如若遇到因为 SSL 无法下载的依赖,手动下载之,并放入 testexist 路径(讲真,是个体力活儿,得有六七个包吧)

1
ant ide-eclipse

最后终于成功了

1
2
BUILD SUCCESSFUL
Total time: 1 second

构建 websocket eclipse

1
ant ide-eclipse-websocket

顺利成功

1
2
BUILD SUCCESSFUL
Total time: 1 second

到此可以开始导入 IntelliJ 了

IntelliJ 欢迎页面选择 Import Project,在弹出框中选择 Tomcat 源码根路径,一路 Next 至 Select Eclipse projects to import,勾选上 tomcat-7.0.x 继续 Next,最后 Project SDK 选择 1.6 即可

WebSocket 代码在如下路径

1
/Users/zrss/Documents/Code/Java/apache-tomcat-7.0.88-src/java/org/apache/tomcat/websocket

最后查看 Tomcat version 信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
> cd output/build/bin
> ./version.sh
Using CATALINA_BASE: /Users/zrss/Documents/Code/Java/apache-tomcat-7.0.88-src/output/build
Using CATALINA_HOME: /Users/zrss/Documents/Code/Java/apache-tomcat-7.0.88-src/output/build
Using CATALINA_TMPDIR: /Users/zrss/Documents/Code/Java/apache-tomcat-7.0.88-src/output/build/temp
Using JRE_HOME: /Library/Java/JavaVirtualMachines/1.6.0.jdk/Contents/Home
Using CLASSPATH: /Users/zrss/Documents/Code/Java/apache-tomcat-7.0.88-src/output/build/bin/bootstrap.jar:/Users/zrss/Documents/Code/Java/apache-tomcat-7.0.88-src/output/build/bin/tomcat-juli.jar
Server version: Apache Tomcat/7.0.88
Server built: Jul 6 2018 14:30:23 UTC
Server number: 7.0.88.0
OS Name: Mac OS X
OS Version: 10.12.6
Architecture: x86_64
JVM Version: 1.6.0_65-b14-468
JVM Vendor: Apple Inc.

至此 Tomcat 7 导入 IntelliJ 中 okay,可以愉快的查看代码了。当然查看 WebSocket 相关的实现时,在 IntelliJ Project Structure 中切换 SDK 至 1.7 即可

又要开始分析一个疑难杂症了

现象是 100 完成数,40 并发度的 job 无法执行结束,两个 pod 一直处于 running 状态,查看发现 container 已退出

job-controller 的工作原理

job-controller 较为短小精悍,属于 kube-controller-manager 代码中的一部分

1
2
3
4
5
go job.NewJobController(
ctx.InformerFactory.Core().V1().Pods(),
ctx.InformerFactory.Batch().V1().Jobs(),
ctx.ClientBuilder.ClientOrDie("job-controller"),
).Run(int(ctx.Options.ConcurrentJobSyncs), ctx.Stop)

从启动代码中可以看出 job-controller 关注 pod 与 job 这两种资源的变化情况

  • ctx.InformerFactory.Core().V1().Pods()
  • ctx.InformerFactory.Batch().V1().Jobs()
1
ConcurrentJobSyncs=5

启动 5 个 worker goroutine

1
2
3
for i := 0; i < workers; i++ {
go wait.Until(jm.worker, time.Second, stopCh)
}

在 worker 协程中,1s 执行一次 worker 方法

worker 方法,实现从 queue 中获取待处理的对象(key),并调用 syncJob 方法处理之

  • syncJob 处理成功
1
2
3
4
5
6
7
8
9
jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: jm.enqueueController,
UpdateFunc: func(old, cur interface{}) {
if job := cur.(*batch.Job); !IsJobFinished(job) {
jm.enqueueController(job)
}
},
DeleteFunc: jm.enqueueController,
})
1
2
3
4
5
podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: jm.addPod,
UpdateFunc: jm.updatePod,
DeleteFunc: jm.deletePod,
})

kubelet

大致来看与 job-controller 的关系不大,可能还是 kubelet 的问题

我们知道 kubelet 通过 PLEG 来感知节点上 container 的状态,另外 job pod 的 restart policy 目前仅支持两种 Never / onFailure,一般来说默认选择 onFailure 比较合适

这样业务容器在正常退出后(exit 0),kubelet pleg 感知到后,再加上 onFailure 的策略,正常情况下会 killing pod,job-controller 感知到 pod 减少后,即完成数又增加了 1,即可成功结束

看下 kubelet 的代码确认逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// Get all the pods.
podList, err := g.runtime.GetPods(true)
ShouldContainerBeRestarted

// Check RestartPolicy for dead container
if pod.Spec.RestartPolicy == v1.RestartPolicyNever {
glog.V(4).Infof("Already ran container %q of pod %q, do nothing", container.Name, format.Pod(pod))
return false
}
if pod.Spec.RestartPolicy == v1.RestartPolicyOnFailure {
// Check the exit code.
if status.ExitCode == 0 {
glog.V(4).Infof("Already successfully ran container %q of pod %q, do nothing", container.Name, format.Pod(pod))
return false
}
}
return true

如此情况下需要 KillPod

1
2
3
if keepCount == 0 && len(changes.ContainersToStart) == 0 {
changes.KillPod = true
}

所以正常情况下 job 控制的 pod,重启策略为 RestartPolicyOnFailure,如是正常退出的情况,则该 container 无需重启,而再加上述的判断,则当前 pod 需要被删除

调用该 killPodWithSyncResult 方法

经过如此分析,可能出现的问题原因及疑点

  • 2 个 pod 处于 running 状态,而对应的 container 却没有了,尝试从 relist: g.runtime.GetPods(true) 寻找可能原因
  • 使用命令行如何获取节点上的 container 容器,如果 container 正常获取,且 pod 对应的 container 已正常退出,那么为何未看到 SyncLoop(PLEG) ContainerDied 事件
  • pod 状态也是持续 running,而我们知道在 syncPod 中会调用 statusManager 设置新的 pod status,如果能获取到正确的 container info,pod status 也会被正确的更新至 kube-apiserver
  • 正常情况下,至多为一个 pod 保留一个 container 记录,其余均会被 clean。为何当时 docker ps -a 无一相关的 container,全都被 clear 了?那意味着 node evicted,或者是 pod 被 delete 了。evicted 的可能性较小,而 pod 被 delete 的话,除了人为,job-controller activeDeadline 超了也会设置

最近在着手实现 Console k8s workload upgrade 相关的任务,然而 mock 数据不直观,使用完整的集成环境,又存在不稳定性,经常与其他任务冲突(比如测试要开始搞破坏了)。遂重新开始折腾 k8s cluster setup

当然 k8s 发展到今天 local up 已经做的相对简单(可能对于自由的网络环境来说,不自由的就求爷爷告奶奶了)

这里 local up k8s cluster 使用的是 minikube 方案 https://v1-9.docs.kubernetes.io/docs/getting-started-guides/minikube/

此篇适用下述两条背景

  • 非阿里 minikube 版本
  • 解决 minikube docker proxy not work 问题

Environment

  • OS: macOS Sierra 10.12.6
  • minikube: v0.25.0
  • VM Driver: virtualbox
  • ISO version: minikube-v0.25.1.iso

Install

macOS 安装 minikube 可以说很简单了

已安装 homebrew

homebrew install minikube

1
brew cask install minikube

Network

国内的困难主要是这个

我使用的方案是 Shadowsocks + privoxy,相关的资料非常多,不赘述了

privoxy 的配置如下 cat /usr/local/etc/privoxy/config

1
2
listen-address 0.0.0.0:1081
forward-socks5 / 127.0.0.1:1080 .

1080 端口为 socks5 的监听端口(即 Shadowsocks),privoxy 监听 1081 端口,将 1081 端口的 http 请求转发至 1080 socks5 监听端口上,这样就能让 http 请求也经过 socks5 转发了

测试一下 privoxy 是否正常工作

1
2
3
4
5
6
7
8
# check process
ps -ef | grep privoxy
# curl port
curl http://127.0.0.0:1081
# netstat (recommended)
netstat -an | grep 1081
# lsof
lsof -i:1081

Minikube start

使用上一步搭建好的 privoxy http 代理

我使用的终端为 iTerm,理论上系统自带 Terminal 也 ok

设置 http / https 代理

1
2
export http_proxy=http://127.0.0.1:1081
export https_proxy=https://127.0.0.1:1081

启动 minikube

1
minikube start

Docker proxy

minikube 下载完成 iso 后,再 bootstrap k8s cluster,cluster 起来之后,会启动一些 system 组件,比如 kube-addon-manager-minikube/ dashboard / dns 等,然而不幸的是这些 pod 会一直处于 ContainerCreating 的状态

查看 events

1
kubectl describe po kube-addon-manager-minikube -nkube-system

None

查看 kubelet 日志

1
minikube logs kubelet

发现端倪

gcr.io/google_containers/pause-amd64:3.0,pause 容器无法 pull 下来

继续搜索得知 minikube 可配置 docker proxy https://github.com/kubernetes/minikube/blob/v0.25.0/docs/http_proxy.md

遂停止 minikube 并以新参数启动之

1
2
minikube start --docker-env HTTP_PROXY=http://127.0.0.1:1081 --docker-env HTTPS_PROXY=https://127.0.0.1:1081
export no_proxy=$no_proxy,$(minikube ip)

呃,然而事实证明并不 work,遂登陆 vm

1
minikube ssh

尝试 curl 该 1081 端口

1
2
curl http://127.0.0.1:1081
>> curl: (7) Failed to connect to 127.0.0.1 port 1081: Connection refused

可见 vm 中该端口并不通

稍加思索,解决的思路应为在 virtualbox vm 中如何 connect back to host,因为 privoxy 实际上监听的是 host 的 1081 端口。几番搜索后,发现在 virtualbox vm 中可通过 10.0.2.2 IP connect back to host https://superuser.com/questions/310697/connect-to-the-host-machine-from-a-virtualbox-guest-os,登陆 vm

1
curl http://10.0.2.2:1081

果然能通了

于是再次尝试修改 minikube 启动命令

1
minikube start --docker-env HTTP_PROXY=http://10.0.2.2:1081 --docker-env HTTPS_PROXY=https://10.0.2.2:1081

糟糕的是,仍然不 work …,所以问题集中到了 http_proxy 未生效上

登陆 vm 查看 docker version / info,希望能获取到一些线索

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# version
docker version
Client:
Version: 17.09.0-ce
API version: 1.32
Go version: go1.8.3
Git commit: afdb6d4
Built: Tue Sep 26 22:39:28 2017
OS/Arch: linux/amd64
Server:
Version: 17.09.0-ce
API version: 1.32 (minimum version 1.12)
Go version: go1.8.3
Git commit: afdb6d4
Built: Tue Sep 26 22:45:38 2017
OS/Arch: linux/amd64
Experimental: false
# info
docker info
Containers: 0
Running: 0
Paused: 0
Stopped: 0
Images: 0
Server Version: 17.09.0-ce
Storage Driver: overlay2
Backing Filesystem: extfs
Supports d_type: true
Native Overlay Diff: true
Logging Driver: json-file
Cgroup Driver: cgroupfs
Plugins:
Volume: local
Network: bridge host macvlan null overlay
Log: awslogs fluentd gcplogs gelf journald json-file logentries splunk syslog
Swarm: inactive
Runtimes: runc
Default Runtime: runc
Init Binary: docker-init
containerd version: 06b9cb35161009dcb7123345749fef02f7cea8e0
runc version: 3f2f8b84a77f73d38244dd690525642a72156c64
init version: N/A (expected: )
Security Options:
seccomp
Profile: default
Kernel Version: 4.9.64
Operating System: Buildroot 2017.11
OSType: linux
Architecture: x86_64
CPUs: 2
Total Memory: 1.953GiB
Name: minikube
ID: 6RR3:WAF4:FIGA:TTEG:5UE6:V3RD:JNQV:WQQ4:ER3T:ETKJ:ZVP4:2Z7M
Docker Root Dir: /var/lib/docker
Debug Mode (client): false
Debug Mode (server): false
Registry: https://index.docker.io/v1/
Labels:
provider=virtualbox
Experimental: false
Insecure Registries:
10.96.0.0/12
127.0.0.0/8
Live Restore Enabled: false

然而似乎和问题并没有什么关联,http_proxy 和 docker daemon 有关,和 docker 没啥关系,所以在 version / info 中都未见 http_proxy 相关配置 https://github.com/docker/distribution/issues/2397#issuecomment-330079118

追溯到这里,只能看看 minikube 代码中是如何使用 docker-env 这个传入参数的了

how does minikube start

overall

  • startHost
  • startK8S

startHost

  • create virtualbox driver with boot2docker iso
  • waiting docker set up

step by step 的过程省略,最后找到如下代码片段,显示 docker-env 实际上并不会生效

https://github.com/kubernetes/minikube/blob/v0.25.0/cmd/minikube/cmd/start.go#L157

1
2
3
4
5
6
7
start := func() (err error) {
host, err = cluster.StartHost(api, config)
if err != nil {
glog.Errorf("Error starting host: %s.\n\n Retrying.\n", err)
}
return err
}

https://github.com/kubernetes/minikube/blob/v0.25.0/pkg/minikube/machine/client.go#L114-L135

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
return &host.Host{
ConfigVersion: version.ConfigVersion,
Name: driver.GetMachineName(),
Driver: driver,
DriverName: driver.DriverName(),
HostOptions: &host.Options{
AuthOptions: &auth.Options{
CertDir: api.certsDir,
CaCertPath: filepath.Join(api.certsDir, "ca.pem"),
CaPrivateKeyPath: filepath.Join(api.certsDir, "ca-key.pem"),
ClientCertPath: filepath.Join(api.certsDir, "cert.pem"),
ClientKeyPath: filepath.Join(api.certsDir, "key.pem"),
ServerCertPath: filepath.Join(api.GetMachinesDir(), "server.pem"),
ServerKeyPath: filepath.Join(api.GetMachinesDir(), "server-key.pem"),
},
EngineOptions: &engine.Options{
StorageDriver: "aufs",
TLSVerify: true,
},
SwarmOptions: &swarm.Options{},
},
}, nil

可见默认 SwarmOptions 是个空对象,其中值得注意的是 IsSwarm 的值为 false

https://github.com/kubernetes/minikube/blob/v0.25.0/pkg/minikube/cluster/cluster.go#L243-L250

1
2
3
4
5
6
7
h, err := api.NewHost(config.VMDriver, data)
if err != nil {
return nil, errors.Wrap(err, "Error creating new host")
}
h.HostOptions.AuthOptions.CertDir = constants.GetMinipath()
h.HostOptions.AuthOptions.StorePath = constants.GetMinipath()
h.HostOptions.EngineOptions = engineOptions(config)

将 docker-env 赋值与 h.HostOptions.EngineOptions.Env

https://github.com/kubernetes/minikube/blob/v0.25.0/vendor/github.com/docker/machine/libmachine/provision/boot2docker.go#L232

1
2
3
4
provisioner.SwarmOptions = swarmOptions
provisioner.AuthOptions = authOptions
provisioner.EngineOptions = engineOptions
swarmOptions.Env = engineOptions.Env

最后将 docker-env 传与 swarmOptions.Env,而我们又知道 IsSwarm 的值为 false,因此实际上该配置并不会生效 … 社区真是给处于不自由网络地区的童鞋埋了个大坑 …

Back to Docker proxy

回到如何在 minikube 中配置 Docker proxy 的问题,实际上可以参考 Docker 官方的文档配置,使用 systemd

https://docs.docker.com/config/daemon/systemd/#httphttps-proxy

1
2
3
minikube ssh
sudo mkdir -p /etc/systemd/system/docker.service.d
sudo vi /etc/systemd/system/docker.service.d/http-proxy.conf

输入如下内容并保存退出

1
2
[Service]
Environment="HTTP_PROXY=http://10.0.2.2:1081" "HTTPS_PROXY=https://10.0.2.2:1081"

Flush changes & Restart Docker

1
2
sudo systemctl daemon-reload
sudo systemctl restart docker

Verify that the configuration has been loaded

1
systemctl show --property=Environment docker

执行结束之后,此时在 vm 中

1
docker pull gcr.io/google_containers/pause-amd64:3.0

终于可以正常 pull image 了,system pod 也可以正常 running 起来了

1
2
3
4
5
NAME                                    READY     STATUS    RESTARTS   AGE
kube-addon-manager-minikube 1/1 Running 4 12h
kube-dns-54cccfbdf8-wfnxm 3/3 Running 3 12h
kubernetes-dashboard-77d8b98585-t59gj 1/1 Running 1 12h
storage-provisioner 1/1 Running 1 12h

不过因为该改动并未固化到 iso 中,因此 minikube stop 之后改动会丢失 … 另外一个折中的办法

Another method for Docker proxy (recommended)

workaround

之前我们知道,在 terminal 中 export http_proxy 之后,minikube 即可使用 proxy 访问网络资源,而在 minikube –help 中发现 minikube 可以 cache image,所以我们可以 cache 需要使用的 image 资源,如

1
2
3
export http_proxy=http://127.0.0.1:1081
export no_proxy=$no_proxy,$(minikube ip)
minikube cache add gcr.io/google_containers/pause-amd64:3.0

也可以解决问题,比如我目前 cache 的 image

1
2
3
4
5
6
7
8
minikube cache list
gcr.io/google-containers/kube-addon-manager:v6.5
gcr.io/google_containers/pause-amd64:3.0
gcr.io/k8s-minikube/storage-provisioner:v1.8.1
k8s.gcr.io/k8s-dns-dnsmasq-nanny-amd64:1.14.5
k8s.gcr.io/k8s-dns-kube-dns-amd64:1.14.5
k8s.gcr.io/k8s-dns-sidecar-amd64:1.14.5
k8s.gcr.io/kubernetes-dashboard-amd64:v1.8.1

cache 这些 image 之后,就可以使得 kube-system 下面的 pod 都 running 了

minikube logs healthcheck error

使用 minikube 过程中发现其 logs 中一直有如下错误日志

1
Jun 23 18:15:15 minikube localkube[3034]: E0623 18:15:15.392453    3034 healthcheck.go:317] Failed to start node healthz on 0: listen tcp: address 0: missing port in address

查看了相关代码,似乎是正常现象,不过这个实现也是太奇怪了 … 可参考如下 issue,https://github.com/kubernetes/minikube/issues/2609#issuecomment-399701288

k8s 1.7.6

kubernetes/pkg/controller/volume/persistentvolume/pv_controller.go

claim.Spec.VolumeName == nil 时

storage-controller 会从已有的 volume 中查找符合该 claim 的 volume,如果没找到该 volume,则从 claim 的 annotation 中获取 volume.beta.kubernetes.io/storage-class 字段 / 或从 Spec.StorageClassName 获取 storage-class 的值

  • volume.beta.kubernetes.io/storage-class
  • Spec.StorageClassName

在允许动态提供存储(enableDynamicProvisioning)的情况下,尝试去提供一个 volume

1
newClaim, err := ctrl.setClaimProvisioner(claim, storageClass)

动态的 pvc 会增加如下 annotation

1
2
3
volume.beta.kubernetes.io/storage-provisioner: class.Provisioner
pv.kubernetes.io/bound-by-controller: yes
pv.kubernetes.io/provisioned-by: plugin.GetPluginName()

从存储提供服务获取 volume 的过程是异步的,当获取完成时,设置如下 annotation

1
pv.kubernetes.io/bind-completed: yes

如果是其他可以直接 bind 的情况,在 bind 的方法中也会设置上述 annotation

所以可以通过 pvc annotation

  • pv.kubernetes.io/bound-by-controller: yes

确认该 pvc 为动态创建还是直接使用

不过该字段还有一种情况下可能被设置

1
2
3
4
5
6
7
8
9
if volume.Spec.ClaimRef == nil {
return false
}
if claim.Name != volume.Spec.ClaimRef.Name || claim.Namespace != volume.Spec.ClaimRef.Namespace {
return false
}
if volume.Spec.ClaimRef.UID != "" && claim.UID != volume.Spec.ClaimRef.UID {
return false
}

volume 和 claim binding 时,发现 volume 与 claim 的字段不匹配

1
2
3
4
5
// Check if the claim was already bound (either by controller or by user)
shouldBind := false
if volume.Name != claim.Spec.VolumeName {
shouldBind = true
}

clain 和 volume binding 时,也会出现这种情况,当 volume 与 claim 的字段不匹配时

目前实践经验不足,还不是特别明白这是什么情况下才会出现的,使用 pv.kubernetes.io/bound-by-controller: yes 判断动态创建是否准确?

我们知道在 k8s 集群中,可以通过 kubeclt exec -ti 命令远程登录容器,以执行命令。而要在 Console 上实现这个特性的集成,需要依赖 websocket 协议 (https://tools.ietf.org/html/rfc6455)[https://tools.ietf.org/html/rfc6455]

下面全面回顾一下集成过程中涉及到的方方面面知识

  • kube-apiserver
  • nginx
  • tomcat
  • webpack-dev-server

exec in kube-apiserver

to be cont.

404 nginx

Problem: websocket 404 through nginx

遇到的问题,本地开发完成之后,部署到环境中,websocket 请求在直接用 IP 访问时 okay,而经过了二级域名则不 okay。二级域名是由 Nginx 配置转发的。查看 Nginx 的配置

Nginx 配置为通用的配置,即 upstream / server 的配置

1
2
3
4
5
6
7
8
9
10
upstream console {
ip_hash;
server IP:PORT;
server IP:PORT;
}
server {
location /serviceName {
proxy_pass https://console;
}
}

http 模块有如下配置

1
2
3
4
5
6
http {
map $http_upgrade $connection_upgrade {
default upgrade;
'' close;
}
}

参考该文档 http://nginx.org/en/docs/http/ngx_http_map_module.html 解读 map 的语义

上述语句的语义为,动态设置 $connection_upgrade 变量的值,当请求头中 upgrade 值为 ‘’ 时,则 $connection_upgrade 值为 close,当 upgrade 值非空时,其值为 upgrade

对于 websocket 请求来说,其第一个请求中会携带请求头

1
2
Connection: upgrade
Upgrade: websocket

根据 http://nginx.org/en/docs/http/websocket.html 文档说明

As noted above, hop-by-hop headers including “Upgrade” and “Connection” are not passed from a client to proxied server

Upgrade / Connection header 并不会由 client 传递至被代理的 server,因此在 nginx 的 server 配置处需要手动增加这两 header

1
2
3
4
5
6
7
server {
location /serviceName/ws {
proxy_pass https://console;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection $connection_upgrade;
}
}

之前 nginx 未这么配置,导致 ws 请求被转发至后端时,缺少了这两请求头,导致 tomcat wsservlet 没识别到这是 ws 请求,又没有其他的 filter 或 servlet 处理,因此返回了 404

websocket in tomcat

tomcat 7

上述问题中 nginx 为公共组件,需要更改配置的话,要走流程,为了可以先行调试,所以考虑在 webserver 侧想想办法解决请求头的问题

分析

在 WsFilter 中判断请求为 ws 请求的要素

  • GET 请求
  • header 中包含 Upgrade: websocket 值
1
2
3
4
5
6
7
8
9
public static boolean isWebSocketUpgradeRequest(ServletRequest request,
ServletResponse response) {
return ((request instanceof HttpServletRequest) &&
(response instanceof HttpServletResponse) &&
headerContainsToken((HttpServletRequest) request,
Constants.UPGRADE_HEADER_NAME,
Constants.UPGRADE_HEADER_VALUE) &&
"GET".equals(((HttpServletRequest) request).getMethod()));
}

在 WsServerContainer 中将该 Filter 加入到 servletContext 中,并设置其拦截所有请求

1
fr.addMappingForUrlPatterns(types, true, "/*");

而这个地方的调用在 WsServerContainer 构造函数中发生,继续探寻而上

WsSci 实现了 ServletContainerInitializer 接口,在 onStartup 接口实现中构造了 WsServerContainer 类

https://tomcat.apache.org/tomcat-8.0-doc/servletapi/javax/servlet/ServletContainerInitializer.html

回忆在 Tomcat 的 release 版本中 tomcat7-websocket.jar 被放置于 {CATALINA_BASE}/lib 目录下

tomcat 启动后会加载

能否实现一个 filter,该 filter 拦截 ws 请求 url,对这个 url 的请求增加特定请求头?

这么实现的话,得确认 filter 的执行顺序。我们知道

First, the matching filter mappings in the same order that these elements appear in the deployment descriptor.

Next, the matching filter mappings in the same order that these elements appear in the deployment descriptor.

那么非定义在描述符中的 filter 呢?

to be cont.

解决方案

实验时发现可行,所以可以认为描述符中的 filter 先于 WsFilter 执行

参考

https://stackoverflow.com/questions/17086712/servlet-filters-order-of-execution

神奇的 http-proxy-middleware

Problem: websocket hangs on pending status

遇到的问题,本地开发调试时,websocket 一直处于 pending 状态。查看 proxy 的 debug 信息,一直没有 get 请求发送,网上漫天搜索 issue / stackoverflow 无果,遂强行不懂装懂看代码

当然有 issue 是直接关联的,因一开始完全没经验,并未注意到实际上就是该 issue 导致的 ^_^

分析

团队开发 Console 使用的 dev tool 为

  • webpakc-dev-server 2.2.0 (WDS)

本地开发调试依赖 WDS,该组件集成了 proxy 的功能,可以将本地的请求转发至实际的后端。例如如下的 proxy 配置

1
2
3
4
5
6
7
8
9
10
11
devServer: {
port: 8888,
proxy: {
"/api": {
target: "http://127.0.0.1:50545",
changeOrigin: true,
secure: false,
logLevel: "debug"
},
}
}

按上述配置,本地的 http://localhost:8888/api… 请求会被转发至 http://127.0.0.1:50545/api...。当然在实际开发调试过程中,被转发至的地址一般为后台 API 接口地址,或者是后台代理服务器地址,这样也就实现了本地 Console 开发与后端分离

websocket 协议的 proxy 需要打开 ws option,参考如下配置

1
2
3
4
5
6
7
8
9
10
11
12
devServer: {
port: 8888,
proxy: {
"/api": {
target: "http://127.0.0.1:50545",
changeOrigin: true,
secure: false,
logLevel: "debug",
ws: true // proxy websocket
}
}
}

WDS 在启动时,会使用 express https://expressjs.com/en/4x/api.html 启动一个 web-server

express 是一个 web framework,类似 java 里的 struts,粗略来看可以定义路由被哪个 middleware 处理,以及处理逻辑

继续回到 WDS 启动时,如果 WDS 定义了 proxy 配置,则监听所有路由,将路由的处理逻辑交给 proxyMiddleware 负责

[https://github.com/webpack/webpack-dev-server/blob/v2.2.0/lib/Server.js#L196-L228)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
options.proxy.forEach(function(proxyConfigOrCallback) {
let proxyConfig;
let proxyMiddleware;
if(typeof proxyConfigOrCallback === "function") {
proxyConfig = proxyConfigOrCallback();
} else {
proxyConfig = proxyConfigOrCallback;
}
proxyMiddleware = getProxyMiddleware(proxyConfig);
app.use(function(req, res, next) {
if(typeof proxyConfigOrCallback === "function") {
const newProxyConfig = proxyConfigOrCallback();
if(newProxyConfig !== proxyConfig) {
proxyConfig = newProxyConfig;
proxyMiddleware = getProxyMiddleware(proxyConfig);
}
}
const bypass = typeof proxyConfig.bypass === "function";
const bypassUrl = bypass && proxyConfig.bypass(req, res, proxyConfig) || false;
if(bypassUrl) {
req.url = bypassUrl;
next();
} else if(proxyMiddleware) {
// proxy request at here
return proxyMiddleware(req, res, next);
} else {
next();
}
});
});

实际开发时,proxy 规则往往不仅仅配置一条,可能类似如下存在多条配置,其中第三条是 websocket api 请求的 proxy 配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
devServer: {
port: 8888,
proxy: {
"/api": {
target: "http://api.company.com:50545",
changeOrigin: true,
secure: false,
logLevel: "debug",
},
"/account": {
target: "http://iam.company.com",
changeOrigin: true,
secure: false,
logLevel: "debug",
}
"/exec": {
target: "http://api.company.com:50545",
changeOrigin: true,
secure: false,
logLevel: "debug",
ws: true, // proxy websocket
}
}
}

回顾之前所说 WDS 启动时,proxy 一旦配置,会将所有路由,逐一代理给 proxyMiddleware

更具体来说对于上述例子,每一条 proxy 规则会创建一个 proxyMiddleware,而所有路由都将按 key 的字典序,逐一代理给 proxyMiddleware

对于上述 proxy 配置的处理顺序为

  1. /account proxyMiddleware
  2. /api proxyMiddleware
  3. /exec proxyMiddleware

注意 app.use 中的 next 方法,在当前 proxyMiddleware 不处理该路由后,调用 next 交由下由 middleware 继续处理,有点类似 java servlet 里的 filter

在 WDS 中这个 proxyMiddleware 是使用 http-proxy-middleware 实现的,而 http-proxy-middleware 最终依赖 http-proxy

再继续探究 ws: true option 到底干了啥事儿,使得 WDS 可以 proxy websocket 请求?

答案在 https://github.com/chimurai/http-proxy-middleware/blob/v0.17.3/lib/index.js#L38-L50

可以看到这段代码,如果 proxy 中 ws: true,那么创建该 proxyMiddleware 时会调用 catchUpgradeRequest 方法

1
2
3
4
5
6
7
8
function catchUpgradeRequest(server) {
// subscribe once; don't subscribe on every request...
// https://github.com/chimurai/http-proxy-middleware/issues/113
if (!wsInitialized) {
server.on('upgrade', wsUpgradeDebounced);
wsInitialized = true;
}
}

在 catchUpgradeRequest 方法中,使用 server 对象监听 upgrade 事件,而 wsUpgradeDebounced 调用也很简单

debounce 直译为节流:即持续操作时,不会触发,停止一段时间后才触发。多用于用户连续输入时,停止一段时间后的回调

即使用 underscore 的 debounce 方法调用 handleUpgrade

1
2
3
4
5
6
7
8
9
function handleUpgrade(req, socket, head) {
// set to initialized when used externally
wsInitialized = true;
if (shouldProxy(config.context, req)) {
var activeProxyOptions = prepareProxyRequest(req);
proxy.ws(req, socket, head, activeProxyOptions);
logger.info('[HPM] Upgrading to WebSocket');
}
}

ok,看到这,基本的逻辑都明白了,全流程走一遍

原因

按 websocket 协议来说,第一个请求为 connect upgrade 的请求,即为 http get 请求 (当然与一般的 http get 请求不同,实际上并不能等同认为是一个 http get 请求),应能在 proxy debug 信息中看到这个 get 请求,而该 debug 信息是在 https://github.com/chimurai/http-proxy-middleware/blob/v0.17.3/lib/index.js#L40https://github.com/chimurai/http-proxy-middleware/blob/v0.17.3/lib/index.js#L66 处被打印,L40 这处的打印非 websocket 请求,L66为 websocket 请求

L66 之所以未被打印,是因为未进入 catchUpgradeRequest 方法,而未进入该方法的原因,是因为在配置多条 proxy 规则时,如果按字典序来看,例子中的 /exec 排在最后,而普通 http 请求已被其他 proxyMiddleware 处理,那么就不会调用 next 方法交由下一个 proxyMiddleware 处理,因此 /exec 只有在发起 websocket 请求时才会经过

而如果 https://github.com/chimurai/http-proxy-middleware/blob/v0.17.3/lib/index.js#L56 未被执行,即 http-server 若未监听 upgrade 请求,则 websocket 的 upgrade 请求一直不会被处理,因此出现了 pending 中的状态

另外 app.use 无法拦截到 connect upgrade 请求

所以需要一个 http request warm up websocket proxy https://github.com/chimurai/http-proxy-middleware/issues/207

解决方案

对于例子中的 proxy 来说,浏览器中输入一个 404 未被任一 proxyMiddleware 处理的路由即可,比如 http://localhost:8888/warmup,这样这个请求会经过所有 proxyMiddleware,在经过 websocket proxy 时触发 server listen on upgrade。后续 websocket 发起 connect 请求时,proxy debug 日志中就能看到 websocket http get 的输出,并且有 [HPM] Upgrading to WebSocket 的输出,websocket 本地开发时就能正常连接了

参考

https://github.com/expressjs/express/issues/2594

https://github.com/chimurai/http-proxy-middleware/issues/143

https://github.com/chimurai/http-proxy-middleware/issues/112

https://github.com/chimurai/http-proxy-middleware/issues/207

https://github.com/kubernetes-ui/container-terminal

最近遇到个意外的情况:在未知情况下,Chrome 浏览器会对部分 GET 请求缓存,即该请求的 SIZE 指示为 from disk cache,且 cache 返回的状态码为 410

查看 MDN 上对 HTTP 410 的解释为,Gone 服务器返回该状态码,指示该请求资源在服务器上已不可用,而且这个不可用可能是永久的。如果服务器不清楚该资源的丢失是临时的或者是永久的,那么应返回 404,即 Not Found

另外 410 response 是可被缓存的

考虑到实际我们项目中的开发流程,有 Dev / Alpha / Production 环境,各个环境的访问需要切换 proxy 访问,可能存在 CORS (Cross-Origin-Resource-Sharing) 问题,具体如

Alpha 环境域名为 A,因此若访问 Alpha 环境,则将 A domain 配置至主机 hosts 文件中,静态解析 A domain,使其对应 IP 为 Alpha 环境 IP

Production 环境域名也为 A,访问 Production 环境,可以直接公网访问

对于浏览器来说,访问 Alpha / Production 环境的最大不同,为 Remote Address 不同 (域名实际相同)

那么是否有可能为成功的请求访问的 Alpha 环境,而不成功的请求 (被 cache 410 的请求) 访问的为 Production 环境?

顺着这个可疑点开始搜索相关资料,了解到

Prefight request

客户端在发起 COR 请求时,会首先发起 prefight 请求,检查是否对端 Server 接受 COR 请求

client option request

1
2
3
4
OPTIONS /resource/foo 
Access-Control-Request-Method: DELETE
Access-Control-Request-Headers: origin, x-requested-with
Origin: https://foo.bar.org

if server accept this request then it will response a reponse body like

1
2
3
4
5
6
HTTP/1.1 200 OK
Content-Length: 0
Connection: keep-alive
Access-Control-Allow-Origin: https://foo.bar.org
Access-Control-Allow-Methods: POST, GET, OPTIONS, DELETE
Access-Control-Max-Age: 86400

好了,看到这,发现调查的方向有点偏了,切换 proxy 并不会导致 CORS 问题。切换代理之后,会导致后续请求由代理 A 切换至代理 B 转发,仅此而已

Can it be cache by accidently ?

查看代码后,发现后端部分请求在某一时段,的确返回了 410 错误,而 410 错误时,会返回一个 html 页面结果,是否 nginx 对这个结果,设置了有效的 cache-control 导致,浏览器缓存了发生该错误时的请求?

查看 Nginx 的文档后发现,Nginx add_header 仅在特定的 http status code 生效

1
Adds the specified field to a response header provided that the response code equals 200, 201 (1.3.10), 204, 206, 301, 302, 303, 304, 307 (1.1.16, 1.0.13), or 308 (1.13.0). The value can contain variables.

所以如果特定 http 请求,本应返回正常的 json 结构体,然而后台报错,抛出异常,而该异常又未被捕获,因此 http 请求最后获取到的是 tomcat 的 exception 页面,比如 410 的错误页面

又因为未指定默认的 cache 方式,因此该返回没有 cache 相关的 http header,因此全凭浏览器的启发式 cache 策略,意外将该错误的 http 请求返回结果缓存下来

为解决这个问题,可以在 conf/web.xml 中配置

1
2
3
4
5
6
7
8
9
<security-constraint>
<web-resource-collection>
<web-resource-name>HTTPSOnly</web-resource-name>
<url-pattern>/*</url-pattern>
</web-resource-collection>
<user-data-constraint>
<transport-guarantee>CONFIDENTIAL</transport-guarantee>
</user-data-constraint>
</security-constraint>

即可,这样默认每个请求的返回头都会加上

1
2
Cache-Control: private
Expires: Thu, 01 Jan 1970 00:00:00 GMT

至于为何后端会概率性返回 410,那又是另外一个问题了,后续有机会再说

回顾

因此问题是这样的,对于返回“正常”的请求 Nginx 设置了如下

1
2
3
4
5
6
7
Cache-Control: no-cache, no-store, must-revalidate
Pragma: no-cache
X-Content-Type-Options: nosniff
X-Download-Options: noopen
X-Frame-Options: SAMEORIGIN
X-XSS-Protection: 1; mode=block;
Strict-Transport-Security: max-age=31536000; includeSubdomains;

请求头

对于静态资源文件,如 .html/.css/.js 等,Nginx 使用了 Expires 指令

1
2
3
4
5
6
7
Cache-Control: max-age=604800
Expires: Sun, 27 May 2018 10:28:04 GMT
X-Content-Type-Options: nosniff
X-Download-Options: noopen
X-Frame-Options: SAMEORIGIN
X-XSS-Protection: 1; mode=block;
Strict-Transport-Security: max-age=31536000; includeSubdomains;

因此增加或修改了

1
2
Cache-Control: max-age=604800
Expires: Sun, 27 May 2018 10:28:04 GMT

返回请求头

对于非“正常”的请求,使用 Tomcat CONFIDENTIAL 配置,使其返回请求头中默认携带

1
2
Cache-Control: private
Expires: Thu, 01 Jan 1970 00:00:00 GMT

因此浏览器不会缓存错误的返回结果。当然这么配置之后,实际上是所有返回头均有上述字段,一般来说 Tomcat 前端会有 LB,最常见的如 Nginx,Nginx 对资源文件默认设置了 Expires,该指令会修改 Cache-Control / Expires,因此从通用的角度来说,足以解决缓存带来的各种烦人问题,又不至于太影响性能

参考

https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/410

https://developer.mozilla.org/en-US/docs/Glossary/cacheable

https://developer.mozilla.org/en-US/docs/Glossary/Preflight_request

https://developer.mozilla.org/en-US/docs/Web/HTTP/CORS

https://lists.w3.org/Archives/Public/www-archive/2017Aug/0000.html

https://bugs.chromium.org/p/chromium/issues/detail?id=260239

https://stackoverflow.com/questions/21829553/tomcat-security-constraint-impact-cache

v1.7.16

the workflow of the scheduler in kubernetes,开门见山的说,所谓云主机管理也好,网络流量管理也好,总得有个调度员,控制虚拟机实际被运行于哪台物理机中,管理网络流量的走向等等,那么今天看的 scheduler 组件即为 k8s 中的 Pod 调度器组件

概括的说,scheduler 发现有需要调度的 Pod 时,使用注册好的各种策略,进行备选节点筛选及排序,最后选出pod 应被运行于的节点,即完成了其任务

overview of scheduler

从入口看起,so that is
plugin/pkg/scheduler/scheduler.go

1
go wait.Until(sched.scheduleOne, 0, sched.config.StopEverything)

即无限执行 sched.scheduleOne,那么 sched.scheduleOne 又干了啥,主要的逻辑如下

  • 获取待调度的 Pod (sched.config.NextPod())
  • 获取该 Pod 被调度到的节点 (sched.schedule(pod))
  • 并发 bind the pod to its host

schedule of scheduler

plugin/pkg/scheduler/core/generic_scheduler.go

默认 schedule 的实现在 generic_scheduler.go 中,实现了调度接口方法 Schedule
Schedule 的方法又完成了下述两个过程

  • predicates
  • prioritizing

predicates,即强制的过滤策略,使用 predicate 过滤出符合条件的节点

prioritizing,即基于优先级的优选策略,给节点打分,选择得分高的节点

得分排序函数实现

plugin/scheduler/api/types.go

1
2
3
4
5
6
func (h HostPriorityList) Less(i, j int) bool {
if h[i].Score == h[j].Score {
return h[i].Host < h[j].Host
}
return h[i].Score < h[j].Score
}

即 Score 升序排列,在 Score 相等时,host (节点名称) 的字典序在前的优先。得分相同的节点,有 lastNodeIndex round-robin 的方式选择节点

plugin/pkg/scheduler/core/generic_scheduler.go

1
2
3
4
5
firstAfterMaxScore := sort.Search(len(priorityList), func(i int) bool { return priorityList[i].Score < maxScore })
g.lastNodeIndexLock.Lock()
ix := int(g.lastNodeIndex % uint64(firstAfterMaxScore))
g.lastNodeIndex++
g.lastNodeIndexLock.Unlock()

F.A.Q of scheduler

  • scheduler 调度的单位是?

Pod

  • 什么状态的 Pod 会被调度?

待调度的 Pod 会从 podQueue 中被 pop 出,作为 NextPod() 的返回

  • podQueue 什么时候添加 pod?

scheduler 的 config 中使用了 podInformer,仅关注未被 assigned 和非 Succeeded 或者 Failed 的 Pod

plugin/pkg/scheduler/factory/factory.go

1
2
3
4
5
6
7
8
9
10
// unassignedNonTerminatedPod selects pods that are unassigned and non-terminal.
func unassignedNonTerminatedPod(pod *v1.Pod) bool {
if len(pod.Spec.NodeName) != 0 {
return false
}
if pod.Status.Phase == v1.PodSucceeded || pod.Status.Phase == v1.PodFailed {
return false
}
return true
}

scheduler 为 podInformer 提供了 Pod add/update/delete 操作 podQueue 的方法,因此是在此处更新的 podQueue (add 和 update 逻辑实际相同)。另外 podQueue 内部有去重,如果是相同的 pod,则不再入队

plugin/pkg/scheduler/factory/factory.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
podInformer.Informer().AddEventHandler(
cache.FilteringResourceEventHandler{
FilterFunc: func(obj interface{}) bool {
...
},
Handler: cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
c.podQueue.Add(obj);
},
UpdateFunc: func(oldObj, newObj interface{}) {
c.podQueue.Update(newObj);
},
DeleteFunc: func(obj interface{}) {
c.podQueue.Delete(obj);
},
},
},
)

注意 podInformer 还用于维护 scheduler 的 cache

向节点中增加 Pod

plugin/pkg/scheduler/schedulercache/cache.go

1
2
3
4
5
6
7
8
func (cache *schedulerCache) addPod(pod *v1.Pod) {
n, ok := cache.nodes[pod.Spec.NodeName]
if !ok {
n = NewNodeInfo()
cache.nodes[pod.Spec.NodeName] = n
}
n.addPod(pod)
}

计算节点资源

plugin/pkg/scheduler/schedulercache/node_info.go

1
2
3
4
5
6
7
8
9
10
11
// addPod adds pod information to this NodeInfo.
func (n *NodeInfo) addPod(pod *v1.Pod) {
res, non0_cpu, non0_mem := calculateResource(pod)
n.requestedResource.MilliCPU += res.MilliCPU
n.requestedResource.Memory += res.Memory

...
// Consume ports when pods added.
n.updateUsedPorts(pod, true)
n.generation++
}
  • scheduler 的 node 从哪里获取?

套路与 Pod 一致,scheduler 使用了 nodeInformer,add/update/delete 操作 node cache,于是 scheduler 可见所有可使用的 node 资源

plugin/pkg/scheduler/factory/factory.go

1
2
3
4
5
6
7
8
9
// Only nodes in the "Ready" condition with status == "True" are schedulable
nodeInformer.Informer().AddEventHandlerWithResyncPeriod(
cache.ResourceEventHandlerFuncs{
AddFunc: c.addNodeToCache,
UpdateFunc: c.updateNodeInCache,
DeleteFunc: c.deleteNodeFromCache,
},
0,
)
  • scheduler 如何调度 pod 的?
  1. 从 nodeList (nodeInformer 中来) 获取 nodes
  2. Computing predicates
  3. Prioritizing
  4. Selecting host (按得分排序,相同得分的 round-robin)
  • predicates 有哪些?

重要的如

PodFitsResources

计算当前 node 的资源是否能满足 Pod Request,注意 init-container 是串行运行的,因此其所需要的资源,取各个资源维度的最大值,而其他正常的 container 为并行运行的,因此其所需要的资源,取各个资源维度的总和,最后一个 pod 所需要的资源,为 init-container 的最大值与正常 container 的资源总和的较大值

plugin/pkg/scheduler/algorithm/predicates/predicates.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// Returns a *schedulercache.Resource that covers the largest width in each
// resource dimension. Because init-containers run sequentially, we collect the
// max in each dimension iteratively. In contrast, we sum the resource vectors
// for regular containers since they run simultaneously.
//
// Example:
//
// Pod:
// InitContainers
// IC1:
// CPU: 2
// Memory: 1G
// IC2:
// CPU: 2
// Memory: 3G
// Containers
// C1:
// CPU: 2
// Memory: 1G
// C2:
// CPU: 1
// Memory: 1G
//
// Result: CPU: 3, Memory: 3G

PodMatchNodeSelector

即 pod 只能被调度至 pod.Spec.NodeSelector 指定的节点上

PodFitsHost

即 pod 只能调度至 pod.Spec.NodeName 的节点上

InterPodAffinityMatches

  1. 检查当前 pod 如被调度到节点上,是否会破坏当前节点上的 pod 的反亲和性

  2. 检查当前 pod 如被调度到节点上,是否满足亲和性及反亲和性

CheckNodeMemoryPressurePredicate

当 pod 的 QoS 为 BestEffort 时 (即没一个 container 设置 resource request/limit 时),需检查当前 node 是否有内存压力

CheckNodeDiskPressurePredicate

检查 node 是否有磁盘压力

等其他 predicates

  • prioritizing 有哪些?

即优选策略,尽可能的将 pod 部署到不同的 zone 不同的 node,平衡 node 的资源使用等

CalculateSpreadPriority

plugin/pkg/scheduler/algorithm/priorities/selector_spreading.go

1
2
3
// CalculateSpreadPriority spreads pods across hosts and zones, considering pods belonging to the same service or replication controller.
// When a pod is scheduled, it looks for services, RCs or RSs that match the pod, then finds existing pods that match those selectors.
// It favors nodes that have fewer existing matching pods.

即 soft-anti-affinity

BalancedResourceAllocationMap

平衡节点资源分配

值得注意的是 prioritizing 返回的均为 HostPriority,当前集群的所有节点会组成 HostPriorityList,可以形象的理解为,经过所有的 prioritizing 后,可以绘制出 node 的条形图,条形即为每个节点的得分,scheduler 最终会推荐得分最高的 node 给 pod

等其他 prioritizing

  • binding ?
  1. 调用 apiserver 接口发送 post binding 请求 sched.config.Binder.Bind(b)

  2. binding 发送之后,调用 sched.config.SchedulerCache.FinishBinding(assumed)

FinishBinding 将 pod 信息附带 ttl 记入 cache,ttl 过期后,从 cache 中删除

为啥在 binding 结束后,还需要如此大费周折的维护 binding 的 ttl cache ?有什么意义呢?

当然有意义,回过开头去看,我们在探寻 podQueue 的 pod 在何处加入时,发现 scheduler 使用了 podInformer,当 podInformer 获得未被调度的 pod 时将这些 pod 加入 podQueue 等待调度

而另外一处 podInformer 则是设置已被调度的 pod 的 add/update/delete 的事件回调,用来同步 cache,若发现 assumePod 已被调度,则从 cache 中删除,又或者 assumePod 已过 ttl 被 cache 删除,则重新 cache.addPod

plugin/pkg/scheduler/factory/factory.go

1
2
3
4
5
6
7
8
9
10
11
12
13
// scheduled pod cache
podInformer.Informer().AddEventHandler(
cache.FilteringResourceEventHandler{
FilterFunc: func(obj interface{}) bool {
...
},
Handler: cache.ResourceEventHandlerFuncs{
AddFunc: c.addPodToCache,
UpdateFunc: c.updatePodInCache,
DeleteFunc: c.deletePodFromCache,
},
},
)

that’s it

可见 scheduler 基于 informer,实时关注 pod 和 node 状态,获取到待调度的 pod 后,根据 predicate 和 prioritizing 策略,为 pod 选出合适的 node,最后并发完成 pod 和 node 的 binding,完成一次调度过程

更详细来说的话 (可能有误,欢迎大家交流斧正)

part of kube-scheduler

当有新的 pod 创建时,the podInformer of scheduler 一方面过滤未被调度且非 Succeed/Failed 状态的 pod, 触发 add 事件,scheduler 将其加入 podQueue 中等待调度,schedulerOne 方法循环执行,每次从 podQueue 中取出一个 pod,根据 predicates / priorities 策略选出 suggested host,assume 该 pod 被调度到 suggested host 上,更新 pod.Spec.NodeName 字段 (仅为后续 addPod,并不影响实际 etcd 中的 pod 对象),随后开始并发 (goroutine) binding,即调用 kube-apiserver api post a binding RPC,随后 finishingBinding,在 cache 中记录该 pod

cache 会定时扫描其中的 assumedPods 信息,若 pod 被 assumed 且超过了 ttl,则删除该 pod (该 pod 所占用的节点资源也被释放)

the podInformer of scheduler 另一方面过滤已被调度(pod.Spec.NodeName 非空)且非 Succeed/Failed 状态的 pod,触发 add/update/delete 事件等,观察到 assumedPod 被调度后,即从 cache 中删除该 pod

part of kube-apiserver

kube-apiserver 在接收到创建 binding 对象请求后,执行 assignPod 方法,最后在 setPodHostAndAnnotations 方法中,将 pod.Spec.NodeName 写入 etcd 中

pkg/pod/registry/core/pod/storage/storage.go

1
2
3
4
5
6
7
8
9
10
11
// Create ensures a pod is bound to a specific host.
func (r *BindingREST) Create(ctx genericapirequest.Context, obj runtime.Object, includeUninitialized bool) (out runtime.Object, err error) {
binding := obj.(*api.Binding)
// TODO: move me to a binding strategy
if errs := validation.ValidatePodBinding(binding); len(errs) != 0 {
return nil, errs.ToAggregate()
}
err = r.assignPod(ctx, binding.Name, binding.Target.Name, binding.Annotations)
out = &metav1.Status{Status: metav1.StatusSuccess}
return
}

part of kubelet

kubelet 启动时,list-watch apiserver

pkg/kubelet/config/apiserver.go

1
2
3
4
5
// NewSourceApiserver creates a config source that watches and pulls from the apiserver.
func NewSourceApiserver(c clientset.Interface, nodeName types.NodeName, updates chan<- interface{}) {
lw := cache.NewListWatchFromClient(c.Core().RESTClient(), "pods", metav1.NamespaceAll, fields.OneTermEqualSelector(api.PodHostField, string(nodeName)))
newSourceApiserverFromLW(lw, updates)
}

kubelet 使用 selector list-watch apiserver,这个 selector 即为 api.PodHostField=nodeName pod.Spec.NodeName=nodeName。kubelet list-watch 被调度本节点上的 pod,当触发 add/update/delete 后做相应的操作