# Knative Service manifest for the helloworld-go sample application.
apiVersion: serving.knative.dev/v1alpha1 # Current version of Knative
kind: Service
metadata:
  name: helloworld-go # The name of the app
  namespace: default # The namespace the app will use
spec:
  runLatest:
    configuration:
      revisionTemplate:
        spec:
          container:
            image: gcr.io/knative-samples/helloworld-go # The URL to the image of the app
            env:
              - name: TARGET # The environment variable printed out by the sample app
                value: "Go Sample v1"
services 的 reconcile 首先会查询
service 对应的 configuration (its name is the same as the service name) 是否存在
不存在,创建之
存在,reconcile 之
service 对应的 routes (its name is the same as the service name) 是否存在
A list-watch against kube-apiserver runs in an independent goroutine; once there are events about pods, the pod data is put into configCh
syncLoopIteration -> handle the pod data from list-watch from kube-apiserver pod resource -> HandlePodAdditions/… -> dispatchWork -> podWorkers.UpdatePod
UpdatePod
如果之前未处理该 pod,则为该 pod 创建一个大小为 1 的 UpdatePodOptions channel,并启动一个协程调用 managePodLoop(podUpdates)
// This is a blocking call that would return only if the cache // has an entry for the pod that is newer than minRuntimeCache // Time. This ensures the worker doesn't start syncing until // after the cache is at least newer than the finished time of // the previous sync. status, err := p.podCache.GetNewerThan(podUID, lastSyncTime) err = p.syncPodFn(syncPodOptions{ mirrorPod: update.MirrorPod, pod: update.Pod, podStatus: status, killPodOptions: update.KillPodOptions, updateType: update.UpdateType, }) lastSyncTime = time.Now()
syncPodFn(kubelet.syncPod)
mkdir dir
1
/var/lib/kubelet/pods/[podUID]
volumeManager.WaitForAttachAndMount(pod)
获取 imagePullSecrets
1 2
// Fetch the pull secrets for the pod pullSecrets := kl.getPullSecretsForPod(pod)
containerRuntime.SyncPod
1 2
// Call the container runtime's SyncPod callback result := kl.containerRuntime.SyncPod(pod, apiPodStatus, podStatus, pullSecrets, kl.backOff)
createPodSandBox
mkdir logs dir
1
/var/log/pods/[podUID]
run init container
run container
pleg
relist 的时候,先从 docker 获取一把全量的 pod 数据
1 2
// Get all the pods. podList, err := g.runtime.GetPods(true)
当前状态与之前的状态一比,生成每个 container 的 PLE (pod life cycle event)
// if a request to kill a pod is pending, we do not let anything overwrite that request. update, found := p.lastUndeliveredWorkUpdate[pod.UID] if !found || update.UpdateType != kubetypes.SyncPodKill { p.lastUndeliveredWorkUpdate[pod.UID] = *options }
即更新 lastUndeliveredWorkUpdate
HandlePodSyncs 执行结束之后(同步)
如果容器挂掉,则执行清理动作
1 2 3 4 5
if e.Type == pleg.ContainerDied { if containerID, ok := e.Data.(string); ok { kl.cleanUpContainersInPod(e.ID, containerID) } }
默认配置,只保留最新一个。若 pod 被 evicted,或者是 DeletionTimestamp != nil && notRunning(apiPodStatus.ContainerStatuses),那么它的所有容器将会被删除
// Minimal browser WebSocket client: connect, send a greeting once the
// connection is open, and log every message received from the server.

// Create WebSocket connection.
const socket = new WebSocket('wss://remoteAddress:443');

// Connection opened
socket.addEventListener('open', function (event) {
    socket.send('Hello Server!');
});

// Listen for messages
socket.addEventListener('message', function (event) {
    console.log('Message from server ', event.data);
});
// Switch to WebSocket WsRemoteEndpointImplClient wsRemoteEndpointClient = new WsRemoteEndpointImplClient(channel); WsSession wsSession = new WsSession(endpoint, wsRemoteEndpointClient, this, null, null, null, null, null, extensionsAgreed, subProtocol, Collections.<String,String>emptyMap(), secure, clientEndpointConfiguration); WsFrameClient wsFrameClient = new WsFrameClient(response, channel, wsSession, transformation); // WsFrame adds the necessary final transformations. Copy the // completed transformation chain to the remote end point. wsRemoteEndpointClient.setTransformation(wsFrameClient.getTransformation()); endpoint.onOpen(wsSession, clientEndpointConfiguration); registerSession(endpoint, wsSession); /* It is possible that the server sent one or more messages as soon as * the WebSocket connection was established. Depending on the exact * timing of when those messages were sent they could be sat in the * input buffer waiting to be read and will not trigger a "data * available to read" event. Therefore, it is necessary to process the * input buffer here. Note that this happens on the current thread which * means that this thread will be used for any onMessage notifications. * This is a special case. Subsequent "data available to read" events * will be handled by threads from the AsyncChannelGroup's executor. */ wsFrameClient.startInputProcessing();
do { // ... elseif (processor.isUpgrade()) { // Init WebSocket OPEN_READ processor httpUpgradeHandler is null // And when the second round came, upgradeDispatch will dispatch OPEN_READ status // ha, upgradeServletInputStream.onDataAvailable() will be called state = processor.upgradeDispatch(status); // but, pay attention to that fact // WsFrameServer.onDataAvailable is a for loop so it will remain in it // that's the reason why we can't see the state of Upgraded return } else { // Init WebSocket OPEN_READ will go through here state = processor.process(wrapper); // Finally adapter.service(request, response) will be called in processor.process // then the request will go through filter // Once it arrives the WsFilter // and it will call upgrade method with a WsHttpUpgradeHandler in WsFilter // That will instance a WsHttpUpgradeHandler and call action Upgrade hook // That's the time httpUpgradeHandler be set to WsHttpUpgradeHandler and no more null value // So isUpgrade() return true and state will become // SocketState.UPGRADING // And it will create a new processor named upgradeProcessor to replace previous processor // in the state of SocketState.UPGRADING in following code } if (getLog().isDebugEnabled()) { getLog().debug("Socket: [" + wrapper + "], Status in: [" + status + "], State out: [" + state + "]"); // it means that if it is a WebSocket Request // it turns out to be once debug info // Status in: OPEN_READ, State out: UPGRADING } } while (state == SocketState.UPGRADING); // ... 
if (state == SocketState.UPGRADING) { // Get the HTTP upgrade handler HttpUpgradeHandler httpUpgradeHandler = processor.getHttpUpgradeHandler(); // Release the Http11 processor to be re-used release(wrapper, processor, false, false); // Create the upgrade processor processor = createUpgradeProcessor( wrapper, httpUpgradeHandler); // Mark the connection as upgraded wrapper.setUpgraded(true); // Associate with the processor with the connection connections.put(socket, processor); // Initialise the upgrade handler (which may trigger // some IO using the new protocol which is why the lines // above are necessary) // This cast should be safe. If it fails the error // handling for the surrounding try/catch will deal with // it. httpUpgradeHandler.init((WebConnection) processor); }
所以 Tomcat 7 WebSocket 实现上的时序是正确的,大致的请求处理流程如下
Client 与 OS 特定端口建立 Connection …
Http11Protocol.process 被调用,传入 OPEN_READ status
processor.process(wrapper) 被调用
WsFilter 被调用,发现为 HTTP Upgrade to websocket request,设置 httpUpgradeHandler 为 WsHttpUpgradeHandler
// Resolve the endpoint instance for this connection: use the configured
// class directly when it is an Endpoint, otherwise wrap it as a POJO endpoint.
Endpoint ep;
try {
    Class<?> clazz = sec.getEndpointClass();
    if (Endpoint.class.isAssignableFrom(clazz)) {
        // The configured class is itself an Endpoint: ask the configurator
        // for an instance of it.
        ep = (Endpoint) sec.getConfigurator().getEndpointInstance(
                clazz);
    } else {
        ep = new PojoEndpointServer();
        // Need to make path params available to POJO
        perSessionServerEndpointConfig.getUserProperties().put(
                PojoEndpointServer.POJO_PATH_PARAM_KEY, pathParams);
    }
} catch (InstantiationException e) {
    // Surface instantiation failures to the caller as a ServletException.
    throw new ServletException(e);
}
@Override publicfinal SocketState upgradeDispatch(SocketStatus status) throws IOException { if (status == SocketStatus.OPEN_READ) { try { upgradeServletInputStream.onDataAvailable(); } catch (IOException ioe) { // The error handling within the ServletInputStream should have // marked the stream for closure which will get picked up below, // triggering the clean-up of this processor. getLog().debug(sm.getString("abstractProcessor.onDataAvailableFail"), ioe); } } elseif (status == SocketStatus.OPEN_WRITE) { try { upgradeServletOutputStream.onWritePossible(); } catch (IOException ioe) { // The error handling within the ServletOutputStream should have // marked the stream for closure which will get picked up below, // triggering the clean-up of this processor. getLog().debug(sm.getString("abstractProcessor.onWritePossibleFail"), ioe); } } elseif (status == SocketStatus.STOP) { try { upgradeServletInputStream.close(); } catch (IOException ioe) { getLog().debug(sm.getString( "abstractProcessor.isCloseFail", ioe)); } try { upgradeServletOutputStream.close(); } catch (IOException ioe) { getLog().debug(sm.getString( "abstractProcessor.osCloseFail", ioe)); } return SocketState.CLOSED; } else { // Unexpected state return SocketState.CLOSED; } if (upgradeServletInputStream.isCloseRequired() || upgradeServletOutputStream.isCloseRequired()) { return SocketState.CLOSED; } return SocketState.UPGRADED; }
This method is called once the request/response pair where the upgrade is initiated has completed processing and is the point where control of the connection passes from the container to the HttpUpgradeHandler.
> cd output/build/bin > ./version.sh Using CATALINA_BASE: /Users/zrss/Documents/Code/Java/apache-tomcat-7.0.88-src/output/build Using CATALINA_HOME: /Users/zrss/Documents/Code/Java/apache-tomcat-7.0.88-src/output/build Using CATALINA_TMPDIR: /Users/zrss/Documents/Code/Java/apache-tomcat-7.0.88-src/output/build/temp Using JRE_HOME: /Library/Java/JavaVirtualMachines/1.6.0.jdk/Contents/Home Using CLASSPATH: /Users/zrss/Documents/Code/Java/apache-tomcat-7.0.88-src/output/build/bin/bootstrap.jar:/Users/zrss/Documents/Code/Java/apache-tomcat-7.0.88-src/output/build/bin/tomcat-juli.jar Server version: Apache Tomcat/7.0.88 Server built: Jul 6 2018 14:30:23 UTC Server number: 7.0.88.0 OS Name: Mac OS X OS Version: 10.12.6 Architecture: x86_64 JVM Version: 1.6.0_65-b14-468 JVM Vendor: Apple Inc.