// Create WebSocket connection.
const socket = new WebSocket('wss://remoteAddress:443');

// Connection opened
socket.addEventListener('open', function (event) {
    socket.send('Hello Server!');
});

// Listen for messages
socket.addEventListener('message', function (event) {
    console.log('Message from server ', event.data);
});
// Switch to WebSocket
WsRemoteEndpointImplClient wsRemoteEndpointClient = new WsRemoteEndpointImplClient(channel);

WsSession wsSession = new WsSession(endpoint, wsRemoteEndpointClient, this, null, null, null,
        null, null, extensionsAgreed, subProtocol, Collections.<String,String>emptyMap(),
        secure, clientEndpointConfiguration);

WsFrameClient wsFrameClient = new WsFrameClient(response, channel, wsSession, transformation);
// WsFrame adds the necessary final transformations. Copy the
// completed transformation chain to the remote end point.
wsRemoteEndpointClient.setTransformation(wsFrameClient.getTransformation());

endpoint.onOpen(wsSession, clientEndpointConfiguration);
registerSession(endpoint, wsSession);

/* It is possible that the server sent one or more messages as soon as
 * the WebSocket connection was established. Depending on the exact
 * timing of when those messages were sent they could be sat in the
 * input buffer waiting to be read and will not trigger a "data
 * available to read" event. Therefore, it is necessary to process the
 * input buffer here. Note that this happens on the current thread which
 * means that this thread will be used for any onMessage notifications.
 * This is a special case. Subsequent "data available to read" events
 * will be handled by threads from the AsyncChannelGroup's executor.
 */
wsFrameClient.startInputProcessing();
do {
    // ...
    } else if (processor.isUpgrade()) {
        // On the first OPEN_READ pass httpUpgradeHandler is still null,
        // so this branch is skipped. On the second pass, upgradeDispatch
        // handles the OPEN_READ status and
        // upgradeServletInputStream.onDataAvailable() is invoked.
        state = processor.upgradeDispatch(status);
        // Note that WsFrameServer.onDataAvailable() loops while data is
        // available, so the thread stays inside it; that is why the
        // UPGRADED state is never seen in the debug log below.
    } else {
        // The initial OPEN_READ for the WebSocket handshake goes through here.
        state = processor.process(wrapper);
        // processor.process eventually calls adapter.service(request, response),
        // which sends the request through the filter chain. When the request
        // reaches WsFilter, the filter calls upgrade() with a
        // WsHttpUpgradeHandler, which fires the UPGRADE action hook. At that
        // point httpUpgradeHandler is set to the WsHttpUpgradeHandler (no
        // longer null), so isUpgrade() returns true and state becomes
        // SocketState.UPGRADING. The code below then creates a new upgrade
        // processor to replace the previous HTTP/1.1 processor while the
        // state is SocketState.UPGRADING.
    }

    if (getLog().isDebugEnabled()) {
        getLog().debug("Socket: [" + wrapper +
                "], Status in: [" + status +
                "], State out: [" + state + "]");
        // For a WebSocket upgrade request this debug line is printed
        // exactly once:
        // Status in: OPEN_READ, State out: UPGRADING
    }
} while (state == SocketState.UPGRADING);

// ...

if (state == SocketState.UPGRADING) {
    // Get the HTTP upgrade handler
    HttpUpgradeHandler httpUpgradeHandler = processor.getHttpUpgradeHandler();
    // Release the Http11 processor to be re-used
    release(wrapper, processor, false, false);
    // Create the upgrade processor
    processor = createUpgradeProcessor(wrapper, httpUpgradeHandler);
    // Mark the connection as upgraded
    wrapper.setUpgraded(true);
    // Associate the processor with the connection
    connections.put(socket, processor);
    // Initialise the upgrade handler (which may trigger
    // some IO using the new protocol which is why the lines
    // above are necessary)
    // This cast should be safe. If it fails the error
    // handling for the surrounding try/catch will deal with
    // it.
    httpUpgradeHandler.init((WebConnection) processor);
}
So the timing in the Tomcat 7 WebSocket implementation is correct. The request-handling flow is roughly:
The client establishes a connection to the specific OS port …
Http11Protocol.process is invoked with the OPEN_READ status
processor.process(wrapper) is invoked
WsFilter runs, detects an HTTP Upgrade-to-WebSocket request, and sets httpUpgradeHandler to a WsHttpUpgradeHandler
Endpoint ep;
try {
    Class<?> clazz = sec.getEndpointClass();
    if (Endpoint.class.isAssignableFrom(clazz)) {
        ep = (Endpoint) sec.getConfigurator().getEndpointInstance(clazz);
    } else {
        ep = new PojoEndpointServer();
        // Need to make path params available to POJO
        perSessionServerEndpointConfig.getUserProperties().put(
                PojoEndpointServer.POJO_PATH_PARAM_KEY, pathParams);
    }
} catch (InstantiationException e) {
    throw new ServletException(e);
}
@Override
public final SocketState upgradeDispatch(SocketStatus status) throws IOException {

    if (status == SocketStatus.OPEN_READ) {
        try {
            upgradeServletInputStream.onDataAvailable();
        } catch (IOException ioe) {
            // The error handling within the ServletInputStream should have
            // marked the stream for closure which will get picked up below,
            // triggering the clean-up of this processor.
            getLog().debug(sm.getString("abstractProcessor.onDataAvailableFail"), ioe);
        }
    } else if (status == SocketStatus.OPEN_WRITE) {
        try {
            upgradeServletOutputStream.onWritePossible();
        } catch (IOException ioe) {
            // The error handling within the ServletOutputStream should have
            // marked the stream for closure which will get picked up below,
            // triggering the clean-up of this processor.
            getLog().debug(sm.getString("abstractProcessor.onWritePossibleFail"), ioe);
        }
    } else if (status == SocketStatus.STOP) {
        try {
            upgradeServletInputStream.close();
        } catch (IOException ioe) {
            getLog().debug(sm.getString("abstractProcessor.isCloseFail", ioe));
        }
        try {
            upgradeServletOutputStream.close();
        } catch (IOException ioe) {
            getLog().debug(sm.getString("abstractProcessor.osCloseFail", ioe));
        }
        return SocketState.CLOSED;
    } else {
        // Unexpected state
        return SocketState.CLOSED;
    }
    if (upgradeServletInputStream.isCloseRequired() ||
            upgradeServletOutputStream.isCloseRequired()) {
        return SocketState.CLOSED;
    }
    return SocketState.UPGRADED;
}
This method is called once the request/response pair where the upgrade is initiated has completed processing and is the point where control of the connection passes from the container to the HttpUpgradeHandler.
> cd output/build/bin
> ./version.sh
Using CATALINA_BASE:   /Users/zrss/Documents/Code/Java/apache-tomcat-7.0.88-src/output/build
Using CATALINA_HOME:   /Users/zrss/Documents/Code/Java/apache-tomcat-7.0.88-src/output/build
Using CATALINA_TMPDIR: /Users/zrss/Documents/Code/Java/apache-tomcat-7.0.88-src/output/build/temp
Using JRE_HOME:        /Library/Java/JavaVirtualMachines/1.6.0.jdk/Contents/Home
Using CLASSPATH:       /Users/zrss/Documents/Code/Java/apache-tomcat-7.0.88-src/output/build/bin/bootstrap.jar:/Users/zrss/Documents/Code/Java/apache-tomcat-7.0.88-src/output/build/bin/tomcat-juli.jar
Server version: Apache Tomcat/7.0.88
Server built:   Jul 6 2018 14:30:23 UTC
Server number:  7.0.88.0
OS Name:        Mac OS X
OS Version:     10.12.6
Architecture:   x86_64
JVM Version:    1.6.0_65-b14-468
JVM Vendor:     Apple Inc.
// Get all the pods.
podList, err := g.runtime.GetPods(true)

ShouldContainerBeRestarted
// Check RestartPolicy for dead container
if pod.Spec.RestartPolicy == v1.RestartPolicyNever {
    glog.V(4).Infof("Already ran container %q of pod %q, do nothing", container.Name, format.Pod(pod))
    return false
}
if pod.Spec.RestartPolicy == v1.RestartPolicyOnFailure {
    // Check the exit code.
    if status.ExitCode == 0 {
        glog.V(4).Infof("Already successfully ran container %q of pod %q, do nothing", container.Name, format.Pod(pod))
        return false
    }
}
return true
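For reference, restartPolicy is a pod-level field that drives the branches above; a minimal manifest sketch (names are illustrative):

apiVersion: v1
kind: Pod
metadata:
  name: one-shot-job        # illustrative name
spec:
  restartPolicy: OnFailure  # Never | OnFailure | Always (the default)
  containers:
  - name: worker
    image: busybox
    command: ["sh", "-c", "exit 0"]  # exit code 0: not restarted under OnFailure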
After minikube finishes downloading the ISO, it bootstraps the k8s cluster. Once the cluster is up it starts some system components, such as kube-addon-manager-minikube, dashboard, and dns. Unfortunately, these pods stay stuck in the ContainerCreating state.
Check the events:
kubectl describe po kube-addon-manager-minikube -n kube-system
minikube cache list
gcr.io/google-containers/kube-addon-manager:v6.5
gcr.io/google_containers/pause-amd64:3.0
gcr.io/k8s-minikube/storage-provisioner:v1.8.1
k8s.gcr.io/k8s-dns-dnsmasq-nanny-amd64:1.14.5
k8s.gcr.io/k8s-dns-kube-dns-amd64:1.14.5
k8s.gcr.io/k8s-dns-sidecar-amd64:1.14.5
k8s.gcr.io/kubernetes-dashboard-amd64:v1.8.1
After caching these images, all the pods under kube-system reach the Running state. The cache was populated as sketched below.
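The images in the list above were presumably added with minikube cache add, which pulls an image locally and loads it into the cluster; for example:

minikube cache add gcr.io/google-containers/kube-addon-manager:v6.5
minikube cache add gcr.io/google_containers/pause-amd64:3.0
minikube cache add k8s.gcr.io/kubernetes-dashboard-amd64:v1.8.1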
minikube logs healthcheck error
While using minikube, the following error keeps appearing in its logs:
Jun 23 18:15:15 minikube localkube[3034]: E0623 18:15:15.392453 3034 healthcheck.go:317] Failed to start node healthz on 0: listen tcp: address 0: missing port in address
if volume.Spec.ClaimRef == nil {
    return false
}
if claim.Name != volume.Spec.ClaimRef.Name || claim.Namespace != volume.Spec.ClaimRef.Namespace {
    return false
}
if volume.Spec.ClaimRef.UID != "" && claim.UID != volume.Spec.ClaimRef.UID {
    return false
}
When binding a volume to a claim, the controller found that the volume's fields did not match the claim's.
// Check if the claim was already bound (either by controller or by user)
shouldBind := false
if volume.Name != claim.Spec.VolumeName {
    shouldBind = true
}
function handleUpgrade(req, socket, head) {
    // set to initialized when used externally
    wsInitialized = true;

    if (shouldProxy(config.context, req)) {
        var activeProxyOptions = prepareProxyRequest(req);
        proxy.ws(req, socket, head, activeProxyOptions);
        logger.info('[HPM] Upgrading to WebSocket');
    }
}
Checking the Nginx documentation reveals that add_header only takes effect for specific HTTP status codes:
Adds the specified field to a response header provided that the response code equals 200, 201 (1.3.10), 204, 206, 301, 302, 303, 304, 307 (1.1.16, 1.0.13), or 308 (1.13.0). The value can contain variables.
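That explains why the header disappears on other status codes. Nginx 1.7.5 and later accept an always parameter that emits the header regardless of the response code; a minimal config sketch, with an illustrative header:

location / {
    # "always" emits the header for every response code (Nginx >= 1.7.5)
    add_header X-Frame-Options SAMEORIGIN always;
}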
func (cache *schedulerCache) addPod(pod *v1.Pod) {
    n, ok := cache.nodes[pod.Spec.NodeName]
    if !ok {
        n = NewNodeInfo()
        cache.nodes[pod.Spec.NodeName] = n
    }
    n.addPod(pod)
}
Computing node resources
plugin/pkg/scheduler/schedulercache/node_info.go
// addPod adds pod information to this NodeInfo.
func (n *NodeInfo) addPod(pod *v1.Pod) {
    res, non0_cpu, non0_mem := calculateResource(pod)
    n.requestedResource.MilliCPU += res.MilliCPU
    n.requestedResource.Memory += res.Memory
    ...
    // Consume ports when pods added.
    n.updateUsedPorts(pod, true)
    n.generation++
}
// Only nodes in the "Ready" condition with status == "True" are schedulable
nodeInformer.Informer().AddEventHandlerWithResyncPeriod(
    cache.ResourceEventHandlerFuncs{
        AddFunc:    c.addNodeToCache,
        UpdateFunc: c.updateNodeInCache,
        DeleteFunc: c.deleteNodeFromCache,
    },
    0,
)
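As a sketch of what that readiness condition amounts to (an illustrative helper, not the exact upstream function):

// assumes: import v1 "k8s.io/api/core/v1"

// isNodeReady reports whether the node's Ready condition is True.
func isNodeReady(node *v1.Node) bool {
    for _, cond := range node.Status.Conditions {
        if cond.Type == v1.NodeReady {
            return cond.Status == v1.ConditionTrue
        }
    }
    return false
}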
How does the scheduler schedule a pod?
Get nodes from the nodeList (populated by the nodeInformer)
Computing predicates
Prioritizing
Selecting host (sort by score; round-robin among nodes with equal scores) — see the sketch after this list
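A condensed Go sketch of that pipeline; the names and types are illustrative, not the upstream generic scheduler code:

package main

import "errors"

type Node struct{ Name string }

type Predicate func(node Node) bool
type Priority func(node Node) int

// schedule filters nodes with predicates, scores the survivors with
// priority functions, and picks the best-scoring host.
func schedule(nodes []Node, predicates []Predicate, priorities []Priority) (string, error) {
    // 1. Computing predicates: keep only nodes that pass every predicate.
    var feasible []Node
    for _, n := range nodes {
        ok := true
        for _, p := range predicates {
            if !p(n) {
                ok = false
                break
            }
        }
        if ok {
            feasible = append(feasible, n)
        }
    }
    if len(feasible) == 0 {
        return "", errors.New("no node fits the pod")
    }
    // 2. Prioritizing: sum the scores from each priority function.
    // 3. Selecting host: highest total wins (upstream round-robins ties).
    best, bestScore := feasible[0].Name, -1
    for _, n := range feasible {
        score := 0
        for _, pr := range priorities {
            score += pr(n)
        }
        if score > bestScore {
            best, bestScore = n.Name, score
        }
    }
    return best, nil
}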
Which predicates are there?
The important ones include:
PodFitsResources
Checks whether the current node's resources can satisfy the pod's requests. Note that init containers run sequentially, so their requirement is the maximum over the init containers in each resource dimension, whereas regular containers run in parallel, so their requirement is the sum in each dimension. The pod's overall requirement is then the larger of the init-container maximum and the regular-container sum, per dimension — the upstream comment below illustrates this, and a sketch follows it.
// Returns a *schedulercache.Resource that covers the largest width in each
// resource dimension. Because init-containers run sequentially, we collect the
// max in each dimension iteratively. In contrast, we sum the resource vectors
// for regular containers since they run simultaneously.
//
// Example:
//
// Pod:
//   InitContainers
//     IC1:
//       CPU: 2
//       Memory: 1G
//     IC2:
//       CPU: 2
//       Memory: 3G
//   Containers
//     C1:
//       CPU: 2
//       Memory: 1G
//     C2:
//       CPU: 1
//       Memory: 1G
//
// Result: CPU: 3, Memory: 3G
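A simplified sketch of that calculation; the resource struct and helper names are illustrative, not the exact scheduler code:

// assumes: import v1 "k8s.io/api/core/v1"

type resource struct {
    MilliCPU int64
    Memory   int64
}

func max64(a, b int64) int64 {
    if a > b {
        return a
    }
    return b
}

// podRequest returns the effective CPU/memory request of a pod: the sum over
// regular containers versus the max over init containers, taking the larger
// of the two per dimension.
func podRequest(pod *v1.Pod) resource {
    var sum, initMax resource
    for _, c := range pod.Spec.Containers {
        sum.MilliCPU += c.Resources.Requests.Cpu().MilliValue()
        sum.Memory += c.Resources.Requests.Memory().Value()
    }
    for _, c := range pod.Spec.InitContainers {
        initMax.MilliCPU = max64(initMax.MilliCPU, c.Resources.Requests.Cpu().MilliValue())
        initMax.Memory = max64(initMax.Memory, c.Resources.Requests.Memory().Value())
    }
    return resource{
        MilliCPU: max64(sum.MilliCPU, initMax.MilliCPU),
        Memory:   max64(sum.Memory, initMax.Memory),
    }
}

On the example above this yields max(2+1, max(2, 2)) = 3 CPU and max(1G+1G, max(1G, 3G)) = 3G memory, matching the upstream comment.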
// CalculateSpreadPriority spreads pods across hosts and zones, considering pods
// belonging to the same service or replication controller.
// When a pod is scheduled, it looks for services, RCs or RSs that match the pod,
// then finds existing pods that match those selectors.
// It favors nodes that have fewer existing matching pods.
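A sketch of the scoring idea — fewer matching pods on a node means a higher score; the 0–10 scale matches the scheduler's priority range, and the helper name is illustrative:

// spreadScore maps "fewer matching pods on this node" to a higher score.
// maxCount is the largest matching-pod count across all candidate nodes.
func spreadScore(matchingPodsOnNode, maxCount int) int {
    const maxPriority = 10
    if maxCount == 0 {
        return maxPriority // no matching pods anywhere: every node scores equally
    }
    return maxPriority * (maxCount - matchingPodsOnNode) / maxCount
}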