After finishing "The Definitive Guide to Kubernetes" (《kubernetes权威指南》) and "The Go Programming Language", I can finally move on to analyzing actual code. Based on my experience reading the source of other open-source projects (libvirt, OpenStack, etc.), I'll start from the interfaces/APIs.
k8s development environment: use kubeasz to quickly stand up an all-in-one k8s cluster for development and testing
Golang debugging environment: see my earlier post on debugging the kubernetes source
k8s source version: v1.10
API under analysis: GET 127.0.0.1:8080/api/v1/nodes/10.0.90.22 (10.0.90.22 is a node in my environment; 8080 is the kubectl proxy API port)
Goal: figure out how kube-apiserver processes a request sent to this API
Reference: http://www.wklken.me/posts/2017/09/23/source-apiserver-04.html (plus the earlier posts in that series; they target an older version and much of the code has changed, but the overall flow is still worth consulting)
The source flow is convoluted (at least it seems so to me right now, perhaps because I have just started reading it). It takes repeated reading and hands-on debugging: go through it many times, step through it many times, keep turning it over in your head, and eventually it clicks.
Route registration ({path} to {handler})
To understand the route registration flow, you first have to understand how the go-restful framework is used; its official README has documentation and sample code. The reference series mentioned above also covers it: http://www.wklken.me/posts/2017/09/23/source-apiserver-01.html
We only need to remember that registering a route in go-restful takes the following steps:
- Create a container (the default container is used if none is specified)
- Create a web service
- Create a handler (the function that processes the API request)
- Bind the API path and handler to the web service
- Add the web service(s) to the container (multiple are allowed, but their root paths must differ)
- Start an http server holding the container(s) (multiple containers are allowed)
A simple example I experimented with:
```go
// https://github.com/emicklei/go-restful/blob/master/examples/restful-multi-containers.go
// GET http://localhost:8080/hello
// GET http://localhost:8081/hello
package main

import (
	"github.com/emicklei/go-restful"
	"io"
	"log"
	"net/http"
)

func main() {
	// add two web services to default container
	ws0 := new(restful.WebService)
	ws0.Path("/hello0")                // "/hello0" is ws0's root path
	ws0.Route(ws0.GET("/").To(hello0)) // curl 127.0.0.1:9080/hello0
	restful.Add(ws0)

	ws1 := new(restful.WebService)
	ws1.Path("/hello1")
	ws1.Route(ws1.GET("/").To(hello1)) // curl 127.0.0.1:9080/hello1
	restful.Add(ws1)

	go func() {
		log.Fatal(http.ListenAndServe(":9080", nil))
	}()

	// container 2
	container2 := restful.NewContainer()
	ws2 := new(restful.WebService)
	ws2.Route(ws2.GET("/hello2").To(hello2)) // curl 127.0.0.1:9081/hello2
	container2.Add(ws2)
	server := &http.Server{Addr: ":9081", Handler: container2}
	log.Fatal(server.ListenAndServe())
}

func hello0(req *restful.Request, resp *restful.Response) {
	io.WriteString(resp, "default world 0")
}

func hello1(req *restful.Request, resp *restful.Response) {
	io.WriteString(resp, "default world 1")
}

func hello2(req *restful.Request, resp *restful.Response) {
	io.WriteString(resp, "second world")
}
```
All of the analysis below builds on this example.

The main entry of kube-apiserver:
```go
func main() {
	rand.Seed(time.Now().UTC().UnixNano())

	command := app.NewAPIServerCommand()
	......
	if err := command.Execute(); err != nil {
		fmt.Fprintf(os.Stderr, "error: %v\n", err)
		os.Exit(1)
	}
}
```
It uses the github.com/spf13/cobra package to parse the startup arguments and launch the executable.
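The core of what cobra provides here can be sketched without the dependency. The `command` struct below is a made-up, stripped-down stand-in (not cobra's real API) showing the wiring NewAPIServerCommand relies on: Execute parses the arguments and then invokes the RunE-style callback that actually starts the server.

```go
package main

import (
	"fmt"
	"os"
)

// command is a hypothetical, minimal analogue of cobra.Command:
// a use string plus a RunE-style callback invoked by Execute.
type command struct {
	use  string
	runE func(args []string) error
}

func (c *command) Execute() error {
	// real cobra also matches subcommands and parses flags here
	return c.runE(os.Args[1:])
}

func main() {
	cmd := &command{
		use: "kube-apiserver",
		runE: func(args []string) error {
			fmt.Println("kube-apiserver would start here")
			return nil
		},
	}
	if err := cmd.Execute(); err != nil {
		fmt.Fprintf(os.Stderr, "error: %v\n", err)
		os.Exit(1)
	}
}
```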
I won't walk through the registration flow step by step; instead, set a breakpoint and follow the code along the bt (backtrace) output:
```
(dlv) b k8s.io/kubernetes/pkg/registry/core/rest/storage_core.go:96
(dlv) r
(dlv) c
(dlv) bt
0  0x0000000003f1373b in k8s.io/kubernetes/pkg/registry/core/rest.LegacyRESTStorageProvider.NewLegacyRESTStorage
   at /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/pkg/registry/core/rest/storage_core.go:96
1  0x000000000408ccc6 in k8s.io/kubernetes/pkg/master.(*Master).InstallLegacyAPI
   at /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/pkg/master/master.go:369
2  0x000000000408c263 in k8s.io/kubernetes/pkg/master.completedConfig.New
   at /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/pkg/master/master.go:328
3  0x00000000040a2eae in k8s.io/kubernetes/cmd/kube-apiserver/app.CreateKubeAPIServer
   at /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/cmd/kube-apiserver/app/server.go:218
4  0x00000000040a2681 in k8s.io/kubernetes/cmd/kube-apiserver/app.CreateServerChain
   at /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/cmd/kube-apiserver/app/server.go:168
5  0x00000000040a1e51 in k8s.io/kubernetes/cmd/kube-apiserver/app.Run
   at /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/cmd/kube-apiserver/app/server.go:137
6  0x00000000040acd57 in k8s.io/kubernetes/cmd/kube-apiserver/app.NewAPIServerCommand.func1
   at /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/cmd/kube-apiserver/app/server.go:121
7  0x0000000003c5c4e8 in k8s.io/kubernetes/vendor/github.com/spf13/cobra.(*Command).execute
   at /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/vendor/github.com/spf13/cobra/command.go:757
8  0x0000000003c5cf76 in k8s.io/kubernetes/vendor/github.com/spf13/cobra.(*Command).ExecuteC
   at /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/vendor/github.com/spf13/cobra/command.go:843
9  0x0000000003c5c7cf in k8s.io/kubernetes/vendor/github.com/spf13/cobra.(*Command).Execute
   at /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/vendor/github.com/spf13/cobra/command.go:791
10 0x00000000040b16e5 in main.main
   at /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/cmd/kube-apiserver/apiserver.go:51
```
The NewLegacyRESTStorage method (note: a method on the LegacyRESTStorageProvider type) returns three values: restStorage, apiGroupInfo, nil. The last one is the error and can be ignored; the first should not matter for our flow; the middle one, apiGroupInfo, is the key piece. InstallLegacyAPIGroup is what registers the /api path: its apiPrefix parameter is DefaultLegacyAPIPrefix = "/api", and apiGroupInfo.PrioritizedVersions currently contains only "v1".
```go
// called by the InstallLegacyAPI method
func (s *GenericAPIServer) InstallLegacyAPIGroup(apiPrefix string, apiGroupInfo *APIGroupInfo) error {
	if !s.legacyAPIGroupPrefixes.Has(apiPrefix) {
		return fmt.Errorf("%q is not in the allowed legacy API prefixes: %v", apiPrefix, s.legacyAPIGroupPrefixes.List())
	}
	if err := s.installAPIResources(apiPrefix, apiGroupInfo); err != nil {
		return err
	}

	// setup discovery
	apiVersions := []string{}
	for _, groupVersion := range apiGroupInfo.PrioritizedVersions {
		apiVersions = append(apiVersions, groupVersion.Version)
	}
	// Install the version handler.
	// Add a handler at /<apiPrefix> to enumerate the supported api versions.
	// This part is straightforward: it registers GET /api, which returns all API versions;
	// the handler is the handle method in k8s.io/apiserver/pkg/endpoints/discovery/legacy.go.
	s.Handler.GoRestfulContainer.Add(discovery.NewLegacyRootAPIHandler(s.discoveryAddresses, s.Serializer, apiPrefix, apiVersions).WebService())

	return nil
}
```
```go
// installAPIResources is a private method for installing the REST storage backing each api groupversionresource
func (s *GenericAPIServer) installAPIResources(apiPrefix string, apiGroupInfo *APIGroupInfo) error {
	for _, groupVersion := range apiGroupInfo.PrioritizedVersions { // only v1
		if len(apiGroupInfo.VersionedResourcesStorageMap[groupVersion.Version]) == 0 {
			glog.Warningf("Skipping API %v because it has no resources.", groupVersion)
			continue
		}

		// note: this is where the data gets wrapped
		apiGroupVersion := s.getAPIGroupVersion(apiGroupInfo, groupVersion, apiPrefix)
		if apiGroupInfo.OptionsExternalVersion != nil {
			apiGroupVersion.OptionsExternalVersion = apiGroupInfo.OptionsExternalVersion
		}

		if err := apiGroupVersion.InstallREST(s.Handler.GoRestfulContainer); err != nil {
			return fmt.Errorf("unable to setup API %v: %v", apiGroupInfo, err)
		}
	}

	return nil
}
```
Note the getAPIGroupVersion method here: it wraps apiGroupInfo into the apiGroupVersion struct (specifically into apiGroupVersion.Storage), which is used further down:
```go
func (s *GenericAPIServer) getAPIGroupVersion(apiGroupInfo *APIGroupInfo, groupVersion schema.GroupVersion, apiPrefix string) *genericapi.APIGroupVersion {
	storage := make(map[string]rest.Storage)
	// walk restStorageMap and copy it into storage; as the types show, both are map[string]rest.Storage
	for k, v := range apiGroupInfo.VersionedResourcesStorageMap[groupVersion.Version] { // groupVersion.Version is "v1"
		storage[strings.ToLower(k)] = v // k is a key of restStorageMap in NewLegacyRESTStorage, v is its value
	}
	version := s.newAPIGroupVersion(apiGroupInfo, groupVersion)
	version.Root = apiPrefix
	version.Storage = storage
	return version
}
```
```go
......
restStorageMap := map[string]rest.Storage{
	"pods":             podStorage.Pod,
	"pods/attach":      podStorage.Attach,
	"pods/status":      podStorage.Status,
	"pods/log":         podStorage.Log,
	"pods/exec":        podStorage.Exec,
	"pods/portforward": podStorage.PortForward,
	"pods/proxy":       podStorage.Proxy,
	"pods/binding":     podStorage.Binding,
	"bindings":         podStorage.Binding,

	"podTemplates": podTemplateStorage,

	"replicationControllers":        controllerStorage.Controller,
	"replicationControllers/status": controllerStorage.Status,

	"services":        serviceRest,
	"services/proxy":  serviceRestProxy,
	"services/status": serviceStatusStorage,

	"endpoints": endpointsStorage,

	"nodes":        nodeStorage.Node,
	"nodes/status": nodeStorage.Status,
	"nodes/proxy":  nodeStorage.Proxy,

	"events": eventStorage,

	"limitRanges":           limitRangeStorage,
	"resourceQuotas":        resourceQuotaStorage,
	"resourceQuotas/status": resourceQuotaStatusStorage,
	"namespaces":            namespaceStorage,
	"namespaces/status":     namespaceStatusStorage,
	"namespaces/finalize":   namespaceFinalizeStorage,
	"secrets":               secretStorage,
	"serviceAccounts":       serviceAccountStorage,

	"persistentVolumes":              persistentVolumeStorage,
	"persistentVolumes/status":       persistentVolumeStatusStorage,
	"persistentVolumeClaims":         persistentVolumeClaimStorage,
	"persistentVolumeClaims/status":  persistentVolumeClaimStatusStorage,
	"configMaps":                     configMapStorage,

	"componentStatuses": componentstatus.NewStorage(componentStatusStorage{c.StorageFactory}.serversToValidate),
}
......
apiGroupInfo.VersionedResourcesStorageMap["v1"] = restStorageMap
......
```
```go
// InstallREST registers the REST handlers (storage, watch, proxy and redirect) into a restful Container.
// It is expected that the provided path root prefix will serve all operations. Root MUST NOT end
// in a slash.
func (g *APIGroupVersion) InstallREST(container *restful.Container) error {
	prefix := path.Join(g.Root, g.GroupVersion.Group, g.GroupVersion.Version)
	installer := &APIInstaller{
		group:                        g, // g is the apiGroupVersion above; g.Storage is the restStorageMap
		prefix:                       prefix,
		minRequestTimeout:            g.MinRequestTimeout,
		enableAPIResponseCompression: g.EnableAPIResponseCompression,
	}

	apiResources, ws, registrationErrors := installer.Install() // builds the web service
	versionDiscoveryHandler := discovery.NewAPIVersionHandler(g.Serializer, g.GroupVersion, staticLister{apiResources})
	versionDiscoveryHandler.AddToWebService(ws) // bind paths and handlers to the web service
	container.Add(ws)                           // add the web service to the container
	return utilerrors.NewAggregate(registrationErrors)
}
```
```
(dlv) p installer
*k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/endpoints.APIInstaller {
	group: *k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/endpoints.APIGroupVersion {
		Storage: map[string]k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/rest.Storage [...],
		Root: "/api",
		GroupVersion: (*k8s.io/kubernetes/vendor/k8s.io/apimachinery/pkg/runtime/schema.GroupVersion)(0xc42047a5b8),
		OptionsExternalVersion: *k8s.io/kubernetes/vendor/k8s.io/apimachinery/pkg/runtime/schema.GroupVersion nil,
		MetaGroupVersion: *k8s.io/kubernetes/vendor/k8s.io/apimachinery/pkg/runtime/schema.GroupVersion nil,
		Mapper: k8s.io/kubernetes/vendor/k8s.io/apimachinery/pkg/api/meta.RESTMapper(*k8s.io/kubernetes/vendor/k8s.io/apimachinery/pkg/api/meta.DefaultRESTMapper) ...,
		Serializer: k8s.io/kubernetes/vendor/k8s.io/apimachinery/pkg/runtime.NegotiatedSerializer(k8s.io/kubernetes/vendor/k8s.io/apimachinery/pkg/runtime/serializer.CodecFactory) *(*"k8s.io/kubernetes/vendor/k8s.io/apimachinery/pkg/runtime.NegotiatedSerializer")(0xc42047a5f8),
		ParameterCodec: k8s.io/kubernetes/vendor/k8s.io/apimachinery/pkg/runtime.ParameterCodec(*k8s.io/kubernetes/vendor/k8s.io/apimachinery/pkg/runtime.parameterCodec) ...,
		Typer: k8s.io/kubernetes/vendor/k8s.io/apimachinery/pkg/runtime.ObjectTyper(*k8s.io/kubernetes/vendor/k8s.io/apimachinery/pkg/runtime.Scheme) ...,
		Creater: k8s.io/kubernetes/vendor/k8s.io/apimachinery/pkg/runtime.ObjectCreater(*k8s.io/kubernetes/vendor/k8s.io/apimachinery/pkg/runtime.Scheme) ...,
		Convertor: k8s.io/kubernetes/vendor/k8s.io/apimachinery/pkg/runtime.ObjectConvertor(*k8s.io/kubernetes/vendor/k8s.io/apimachinery/pkg/runtime.Scheme) ...,
		Defaulter: k8s.io/kubernetes/vendor/k8s.io/apimachinery/pkg/runtime.ObjectDefaulter(*k8s.io/kubernetes/vendor/k8s.io/apimachinery/pkg/runtime.Scheme) ...,
		Linker: k8s.io/kubernetes/vendor/k8s.io/apimachinery/pkg/runtime.SelfLinker(k8s.io/kubernetes/vendor/k8s.io/apimachinery/pkg/api/meta.resourceAccessor) *(*"k8s.io/kubernetes/vendor/k8s.io/apimachinery/pkg/runtime.SelfLinker")(0xc42047a658),
		UnsafeConvertor: k8s.io/kubernetes/vendor/k8s.io/apimachinery/pkg/runtime.ObjectConvertor(k8s.io/kubernetes/vendor/k8s.io/apimachinery/pkg/runtime.unsafeObjectConvertor) *(*"k8s.io/kubernetes/vendor/k8s.io/apimachinery/pkg/runtime.ObjectConvertor")(0xc42047a668),
		Admit: k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/admission.Interface(*k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/admission/metrics.pluginHandlerWithMetrics) ...,
		Context: k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/endpoints/request.RequestContextMapper(*k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/endpoints/request.requestContextMap) ...,
		MinRequestTimeout: 1800000000000,
		EnableAPIResponseCompression: false,},
	prefix: "/api/v1",
	minRequestTimeout: 1800000000000,
	enableAPIResponseCompression: false,}
(dlv) p g.GroupVersion
k8s.io/kubernetes/vendor/k8s.io/apimachinery/pkg/runtime/schema.GroupVersion {Group: "", Version: "v1"}
```
```go
// Install handlers for API resources.
func (a *APIInstaller) Install() ([]metav1.APIResource, *restful.WebService, []error) {
	var apiResources []metav1.APIResource
	var errors []error
	ws := a.newWebService()

	// Register the paths in a deterministic (sorted) order to get a deterministic swagger spec.
	paths := make([]string, len(a.group.Storage))
	var i int = 0
	for path := range a.group.Storage {
		paths[i] = path
		i++
	}
	sort.Strings(paths)
	for _, path := range paths {
		apiResource, err := a.registerResourceHandlers(path, a.group.Storage[path], ws)
		if err != nil {
			errors = append(errors, fmt.Errorf("error in registering resource: %s, %v", path, err))
		}
		if apiResource != nil {
			apiResources = append(apiResources, *apiResource)
		}
	}
	return apiResources, ws, errors
}
```
```
(dlv) p a.group.Storage  // this is the restStorageMap from NewLegacyRESTStorage
map[string]k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/rest.Storage [
"replicationcontrollers/status": *k8s.io/kubernetes/pkg/registry/core/replicationcontroller/storage.StatusREST { store: *(*k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry.Store)(0xc42116be00),},
"services": *k8s.io/kubernetes/pkg/registry/core/service/storage.REST { services: k8s.io/kubernetes/pkg/registry/core/service/storage.ServiceStorage(*k8s.io/kubernetes/pkg/registry/core/service/storage.GenericREST) ..., endpoints: k8s.io/kubernetes/pkg/registry/core/service/storage.EndpointsStorage(*k8s.io/kubernetes/pkg/registry/core/endpoint/storage.REST) ..., serviceIPs: k8s.io/kubernetes/pkg/registry/core/service/ipallocator.Interface(*k8s.io/kubernetes/pkg/registry/core/service/ipallocator.Range) ..., serviceNodePorts: k8s.io/kubernetes/pkg/registry/core/service/portallocator.Interface(*k8s.io/kubernetes/pkg/registry/core/service/portallocator.PortAllocator) ..., proxyTransport: net/http.RoundTripper(*net/http.Transport) ..., pods: k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/rest.Getter(*k8s.io/kubernetes/pkg/registry/core/pod/storage.REST) ...,},
"serviceaccounts": *k8s.io/kubernetes/pkg/registry/core/serviceaccount/storage.REST { Store: *(*k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry.Store)(0xc420ef7a00), Token: *k8s.io/kubernetes/pkg/registry/core/serviceaccount/storage.TokenREST nil,},
"events": *k8s.io/kubernetes/pkg/registry/core/event/storage.REST { Store: *(*k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry.Store)(0xc4208cb700),},
"persistentvolumeclaims/status": *k8s.io/kubernetes/pkg/registry/core/persistentvolumeclaim/storage.StatusREST { store: *(*k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry.Store)(0xc420dd2a00),},
"resourcequotas": *k8s.io/kubernetes/pkg/registry/core/resourcequota/storage.REST { Store: *(*k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry.Store)(0xc4201c3d00),},
"pods/exec": *k8s.io/kubernetes/pkg/registry/core/pod/rest.ExecREST { Store: *(*k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry.Store)(0xc420ef7300), KubeletConn: k8s.io/kubernetes/pkg/kubelet/client.ConnectionInfoGetter(*k8s.io/kubernetes/pkg/kubelet/client.NodeConnectionInfoGetter) ...,},
"persistentvolumes/status": *k8s.io/kubernetes/pkg/registry/core/persistentvolume/storage.StatusREST { store: *(*k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry.Store)(0xc420dd2300),},
"componentstatuses": *k8s.io/kubernetes/pkg/registry/core/componentstatus.REST {GetServersToValidate: k8s.io/kubernetes/pkg/registry/core/rest.(componentStatusStorage).(k8s.io/kubernetes/pkg/registry/core/rest.serversToValidate)-fm},
"pods/status": *k8s.io/kubernetes/pkg/registry/core/pod/storage.StatusREST { store: *(*k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry.Store)(0xc420ef7900),},
"pods/binding": *k8s.io/kubernetes/pkg/registry/core/pod/storage.BindingREST { store: *(*k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry.Store)(0xc420ef7300),},
"pods/portforward": *k8s.io/kubernetes/pkg/registry/core/pod/rest.PortForwardREST { Store: *(*k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry.Store)(0xc420ef7300), KubeletConn: k8s.io/kubernetes/pkg/kubelet/client.ConnectionInfoGetter(*k8s.io/kubernetes/pkg/kubelet/client.NodeConnectionInfoGetter) ...,},
"namespaces/finalize": *k8s.io/kubernetes/pkg/registry/core/namespace/storage.FinalizeREST { store: *(*k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry.Store)(0xc42116b900),},
"resourcequotas/status": *k8s.io/kubernetes/pkg/registry/core/resourcequota/storage.StatusREST { store: *(*k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry.Store)(0xc42116af00),},
"nodes": *k8s.io/kubernetes/pkg/registry/core/node/storage.REST { Store: *(*k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry.Store)(0xc420ef6c00), connection: k8s.io/kubernetes/pkg/kubelet/client.ConnectionInfoGetter(*k8s.io/kubernetes/pkg/kubelet/client.NodeConnectionInfoGetter) ..., proxyTransport: net/http.RoundTripper(*net/http.Transport) ...,},
"namespaces": *k8s.io/kubernetes/pkg/registry/core/namespace/storage.REST { store: *(*k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry.Store)(0xc420dd3100), status: *(*k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry.Store)(0xc42116b800),},
"podtemplates": *k8s.io/kubernetes/pkg/registry/core/podtemplate/storage.REST { Store: *(*k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry.Store)(0xc4208cb100),},
"persistentvolumeclaims": *k8s.io/kubernetes/pkg/registry/core/persistentvolumeclaim/storage.REST { Store: *(*k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry.Store)(0xc420dd2400),},
"replicationcontrollers": *k8s.io/kubernetes/pkg/registry/core/replicationcontroller/storage.REST { Store: *(*k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry.Store)(0xc4200a8300),},
"nodes/status": *k8s.io/kubernetes/pkg/registry/core/node/storage.StatusREST { store: *(*k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry.Store)(0xc420ef7200),},
"configmaps": *k8s.io/kubernetes/pkg/registry/core/configmap/storage.REST { Store: *(*k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry.Store)(0xc420dd2b00),},
"nodes/proxy": *k8s.io/kubernetes/pkg/registry/core/node/rest.ProxyREST { Store: *(*k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry.Store)(0xc420ef6c00), Connection: k8s.io/kubernetes/pkg/kubelet/client.ConnectionInfoGetter(*k8s.io/kubernetes/pkg/kubelet/client.NodeConnectionInfoGetter) ..., ProxyTransport: net/http.RoundTripper(*net/http.Transport) ...,},
"pods/eviction": *k8s.io/kubernetes/pkg/registry/core/pod/storage.EvictionREST { store: *(*k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry.Store)(0xc420ef7300), podDisruptionBudgetClient: k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/policy/internalversion.PodDisruptionBudgetsGetter(*k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/policy/internalversion.PolicyClient) ...,},
"secrets": *k8s.io/kubernetes/pkg/registry/core/secret/storage.REST { Store: *(*k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry.Store)(0xc42116b000),},
"pods/proxy": *k8s.io/kubernetes/pkg/registry/core/pod/rest.ProxyREST { Store: *(*k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry.Store)(0xc420ef7300), ProxyTransport: net/http.RoundTripper(*net/http.Transport) ...,},
"limitranges": *k8s.io/kubernetes/pkg/registry/core/limitrange/storage.REST { Store: *(*k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry.Store)(0xc4201c3200),},
"bindings": *k8s.io/kubernetes/pkg/registry/core/pod/storage.BindingREST { store: *(*k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry.Store)(0xc420ef7300),},
"services/status": *k8s.io/kubernetes/pkg/registry/core/service/storage.StatusREST { store: *(*k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry.Store)(0xc4213acf00),},
"endpoints": *k8s.io/kubernetes/pkg/registry/core/endpoint/storage.REST { Store: *(*k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry.Store)(0xc42116ba00),},
"pods/log": *k8s.io/kubernetes/pkg/registry/core/pod/rest.LogREST { KubeletConn: k8s.io/kubernetes/pkg/kubelet/client.ConnectionInfoGetter(*k8s.io/kubernetes/pkg/kubelet/client.NodeConnectionInfoGetter) ..., Store: *(*k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry.Store)(0xc420ef7300),},
"services/proxy": *k8s.io/kubernetes/pkg/registry/core/service.ProxyREST { Redirector: k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/rest.Redirector(*k8s.io/kubernetes/pkg/registry/core/service/storage.REST) ..., ProxyTransport: net/http.RoundTripper(*net/http.Transport) ...,},
"namespaces/status": *k8s.io/kubernetes/pkg/registry/core/namespace/storage.StatusREST { store: *(*k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry.Store)(0xc42116b800),},
"pods/attach": *k8s.io/kubernetes/pkg/registry/core/pod/rest.AttachREST { Store: *(*k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry.Store)(0xc420ef7300), KubeletConn: k8s.io/kubernetes/pkg/kubelet/client.ConnectionInfoGetter(*k8s.io/kubernetes/pkg/kubelet/client.NodeConnectionInfoGetter) ...,},
"pods": *k8s.io/kubernetes/pkg/registry/core/pod/storage.REST { Store: *(*k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry.Store)(0xc420ef7300), proxyTransport: net/http.RoundTripper(*net/http.Transport) ...,},
"replicationcontrollers/scale": *k8s.io/kubernetes/pkg/registry/core/replicationcontroller/storage.ScaleREST { registry: k8s.io/kubernetes/pkg/registry/core/replicationcontroller.Registry(*k8s.io/kubernetes/pkg/registry/core/replicationcontroller.storage) ...,},
"persistentvolumes": *k8s.io/kubernetes/pkg/registry/core/persistentvolume/storage.REST { Store: *(*k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry.Store)(0xc4200a9d00),},
]
```
The paths variable holds the keys of the restStorageMap from NewLegacyRESTStorage, i.e. paths like "pods" and "nodes". a.registerResourceHandlers() then registers the handler for each path, i.e. the corresponding restStorageMap value.
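The deterministic-ordering trick in Install is plain Go: collect the map keys, sort them, then iterate in that order. A self-contained sketch (with a toy stand-in for restStorageMap; the real values are rest.Storage objects):

```go
package main

import (
	"fmt"
	"sort"
)

// sortedPaths mimics what APIInstaller.Install does with a.group.Storage:
// Go randomizes map iteration order, so the keys are collected and sorted
// first to get a deterministic registration (and swagger) order.
func sortedPaths(storage map[string]struct{}) []string {
	paths := make([]string, 0, len(storage))
	for p := range storage {
		paths = append(paths, p)
	}
	sort.Strings(paths)
	return paths
}

func main() {
	// a toy stand-in for restStorageMap
	storage := map[string]struct{}{
		"pods": {}, "nodes": {}, "nodes/status": {}, "bindings": {},
	}
	fmt.Println(sortedPaths(storage)) // [bindings nodes nodes/status pods]
}
```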
```go
// the storage parameter is a restStorageMap value, path the corresponding key
func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storage, ws *restful.WebService) (*metav1.APIResource, error) {
	......
	// what verbs are supported by the storage, used to know what verbs we support per path
	creater, isCreater := storage.(rest.Creater)
	namedCreater, isNamedCreater := storage.(rest.NamedCreater)
	lister, isLister := storage.(rest.Lister)
	getter, isGetter := storage.(rest.Getter)
	getterWithOptions, isGetterWithOptions := storage.(rest.GetterWithOptions)
	gracefulDeleter, isGracefulDeleter := storage.(rest.GracefulDeleter)
	collectionDeleter, isCollectionDeleter := storage.(rest.CollectionDeleter)
	updater, isUpdater := storage.(rest.Updater)
	patcher, isPatcher := storage.(rest.Patcher)
	watcher, isWatcher := storage.(rest.Watcher)
	connecter, isConnecter := storage.(rest.Connecter)
	storageMeta, isMetadata := storage.(rest.StorageMetadata)
	......
	default:
		namespaceParamName := "namespaces"
		// Handler for standard REST verbs (GET, PUT, POST and DELETE).
		namespaceParam := ws.PathParameter("namespace", "object name and auth scope, such as for teams and projects").DataType("string")
		namespacedPath := namespaceParamName + "/{" + "namespace" + "}/" + resource
		namespaceParams := []*restful.Parameter{namespaceParam}

		resourcePath := namespacedPath
		resourceParams := namespaceParams
		itemPath := namespacedPath + "/{name}"
		nameParams := append(namespaceParams, nameParam)
		proxyParams := append(nameParams, pathParam)
		itemPathSuffix := ""
		if isSubresource {
			itemPathSuffix = "/" + subresource
			itemPath = itemPath + itemPathSuffix
			resourcePath = itemPath
			resourceParams = nameParams
		}
		apiResource.Name = path
		apiResource.Namespaced = true
		apiResource.Kind = resourceKind
		namer := handlers.ContextBasedNaming{
			SelfLinker:         a.group.Linker,
			ClusterScoped:      false,
			SelfLinkPathPrefix: gpath.Join(a.prefix, namespaceParamName) + "/",
			SelfLinkPathSuffix: itemPathSuffix,
		}

		actions = appendIf(actions, action{"LIST", resourcePath, resourceParams, namer, false}, isLister)
		actions = appendIf(actions, action{"POST", resourcePath, resourceParams, namer, false}, isCreater)
		actions = appendIf(actions, action{"DELETECOLLECTION", resourcePath, resourceParams, namer, false}, isCollectionDeleter)
		// DEPRECATED
		actions = appendIf(actions, action{"WATCHLIST", "watch/" + resourcePath, resourceParams, namer, false}, allowWatchList)

		actions = appendIf(actions, action{"GET", itemPath, nameParams, namer, false}, isGetter)
		if getSubpath {
			actions = appendIf(actions, action{"GET", itemPath + "/{path:*}", proxyParams, namer, false}, isGetter)
		}
		actions = appendIf(actions, action{"PUT", itemPath, nameParams, namer, false}, isUpdater)
		actions = appendIf(actions, action{"PATCH", itemPath, nameParams, namer, false}, isPatcher)
		actions = appendIf(actions, action{"DELETE", itemPath, nameParams, namer, false}, isGracefulDeleter)
		actions = appendIf(actions, action{"WATCH", "watch/" + itemPath, nameParams, namer, false}, isWatcher)
		actions = appendIf(actions, action{"CONNECT", itemPath, nameParams, namer, false}, isConnecter)
		actions = appendIf(actions, action{"CONNECT", itemPath + "/{path:*}", proxyParams, namer, false}, isConnecter && connectSubpath)

		// list or post across namespace.
		// For ex: LIST all pods in all namespaces by sending a LIST request at /api/apiVersion/pods.
		// TODO: more strongly type whether a resource allows these actions on "all namespaces" (bulk delete)
		if !isSubresource {
			actions = appendIf(actions, action{"LIST", resource, params, namer, true}, isLister)
			actions = appendIf(actions, action{"WATCHLIST", "watch/" + resource, params, namer, true}, allowWatchList)
		}
		break
	}
	......
	for _, action := range actions {
		producedObject := storageMeta.ProducesObject(action.Verb)
		if producedObject == nil {
			producedObject = defaultVersionedObject
		}
		reqScope.Namer = action.Namer

		requestScope := "cluster"
		var namespaced string
		var operationSuffix string
		if apiResource.Namespaced {
			requestScope = "namespace"
			namespaced = "Namespaced"
		}
		if strings.HasSuffix(action.Path, "/{path:*}") {
			requestScope = "resource"
			operationSuffix = operationSuffix + "WithPath"
		}
		if action.AllNamespaces {
			requestScope = "cluster"
			operationSuffix = operationSuffix + "ForAllNamespaces"
			namespaced = ""
		}
		if kubeVerb, found := toDiscoveryKubeVerb[action.Verb]; found {
			if len(kubeVerb) != 0 {
				kubeVerbs[kubeVerb] = struct{}{}
			}
		} else {
			return nil, fmt.Errorf("unknown action verb for discovery: %s", action.Verb)
		}

		routes := []*restful.RouteBuilder{}

		// If there is a subresource, kind should be the parent's kind.
		if isSubresource {
			parentStorage, ok := a.group.Storage[resource]
			if !ok {
				return nil, fmt.Errorf("missing parent storage: %q", resource)
			}
			fqParentKind, err := a.getResourceKind(resource, parentStorage)
			if err != nil {
				return nil, err
			}
			kind = fqParentKind.Kind
		}

		verbOverrider, needOverride := storage.(StorageMetricsOverride)

		switch action.Verb {
		case "GET": // Get a resource.
			var handler restful.RouteFunction
			// build the handler, using the getter from above (= storage.(rest.Getter))
			if isGetterWithOptions {
				handler = restfulGetResourceWithOptions(getterWithOptions, reqScope, isSubresource)
			} else {
				handler = restfulGetResource(getter, exporter, reqScope)
			}

			if needOverride {
				// need change the reported verb
				handler = metrics.InstrumentRouteFunc(verbOverrider.OverrideMetricsVerb(action.Verb), resource, subresource, requestScope, handler)
			} else {
				handler = metrics.InstrumentRouteFunc(action.Verb, resource, subresource, requestScope, handler)
			}

			if a.enableAPIResponseCompression {
				handler = genericfilters.RestfulWithCompression(handler)
			}
			doc := "read the specified " + kind
			if isSubresource {
				doc = "read " + subresource + " of the specified " + kind
			}
			route := ws.GET(action.Path).To(handler). // bind path and handler to the web service
				Doc(doc).
				Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")).
				Operation("read"+namespaced+kind+strings.Title(subresource)+operationSuffix).
				Produces(append(storageMeta.ProducesMIMETypes(action.Verb), mediaTypes...)...).
				Returns(http.StatusOK, "OK", producedObject).
				Writes(producedObject)
			if isGetterWithOptions {
				if err := addObjectParams(ws, route, versionedGetOptions); err != nil {
					return nil, err
				}
			}
			if isExporter {
				if err := addObjectParams(ws, route, versionedExportOptions); err != nil {
					return nil, err
				}
			}
			addParams(route, action.Params)
			routes = append(routes, route)
	......
	return &apiResource, nil
}
```
So what exactly is the handler? Take restfulGetResource as an example:
```go
func restfulGetResource(r rest.Getter, e rest.Exporter, scope handlers.RequestScope) restful.RouteFunction {
	return func(req *restful.Request, res *restful.Response) {
		handlers.GetResource(r, e, scope)(res.ResponseWriter, req.Request)
	}
	// the returned function is the registered handler (its two parameters confirm that);
	// inside it, handlers.GetResource is called, which returns a function that is then
	// immediately invoked with (res.ResponseWriter, req.Request).
	// It may be easier to read rewritten like this:
	//   myGetHandler := handlers.GetResource(r, e, scope)
	//   myGetHandler(res.ResponseWriter, req.Request)
	// the second line is what actually handles the Get request
}
```
```go
// GetResource returns a function that handles retrieving a single resource from a rest.Storage object.
// Per the comment above, the function GetResource returns is the concrete handler for
// GET requests, i.e. the myGetHandler mentioned earlier.
func GetResource(r rest.Getter, e rest.Exporter, scope RequestScope) http.HandlerFunc {
	return getResourceHandler(scope,
		// note the nesting here: the anonymous function below is passed as an argument to
		// getResourceHandler; getResourceHandler runs first and calls the anonymous
		// function from inside.
		func(ctx context.Context, name string, req *http.Request, trace *utiltrace.Trace) (runtime.Object, error) {
			// check for export
			options := metav1.GetOptions{}
			if values := req.URL.Query(); len(values) > 0 {
				exports := metav1.ExportOptions{}
				if err := metainternalversion.ParameterCodec.DecodeParameters(values, scope.MetaGroupVersion, &exports); err != nil {
					err = errors.NewBadRequest(err.Error())
					return nil, err
				}
				if exports.Export {
					if e == nil {
						return nil, errors.NewBadRequest(fmt.Sprintf("export of %q is not supported", scope.Resource.Resource))
					}
					return e.Export(ctx, name, exports)
				}
				if err := metainternalversion.ParameterCodec.DecodeParameters(values, scope.MetaGroupVersion, &options); err != nil {
					err = errors.NewBadRequest(err.Error())
					return nil, err
				}
			}
			if trace != nil {
				trace.Step("About to Get from storage")
			}
			// this is the key call: it dispatches to the resource object's own Get handler
			return r.Get(ctx, name, &options)
		})
}
```
```go
// getResourceHandler is an HTTP handler function for get requests. It delegates to the
// passed-in getterFunc to perform the actual get.
func getResourceHandler(scope RequestScope, getter getterFunc) http.HandlerFunc {
	return func(w http.ResponseWriter, req *http.Request) {
		trace := utiltrace.New("Get " + req.URL.Path)
		defer trace.LogIfLong(500 * time.Millisecond)

		namespace, name, err := scope.Namer.Name(req)
		if err != nil {
			scope.err(err, w, req)
			return
		}
		ctx := req.Context()
		ctx = request.WithNamespace(ctx, namespace)

		// Here the anonymous function passed in as a parameter (see above) is
		// called to obtain the result of the request.
		result, err := getter(ctx, name, req, trace)
		if err != nil {
			scope.err(err, w, req)
			return
		}
		requestInfo, ok := request.RequestInfoFrom(ctx)
		if !ok {
			scope.err(fmt.Errorf("missing requestInfo"), w, req)
			return
		}
		if err := setSelfLink(result, requestInfo, scope.Namer); err != nil {
			scope.err(err, w, req)
			return
		}
		trace.Step("About to write a response")
		transformResponseObject(ctx, scope, req, w, http.StatusOK, result)
	}
}
```
So r.Get is the function that ultimately handles the request. Where does it come from?
r is the getter, i.e. storage.(rest.Getter). As mentioned earlier, the storage parameter is a value in restStorageMap and the path is the key; for the API we are analyzing the entry is "nodes": nodeStorage.Node (so storage == nodeStorage.Node). rest.Getter is an interface defined in the apiserver's registry/rest package (not in go-restful). An interface is itself a type, and in Go an interface is a contract: any type (int, string, struct, ...) in any package can implement it by providing methods with the specified parameter and return types. Here is the interface definition:
```go
// Getter is an object that can retrieve a named RESTful resource.
type Getter interface {
	// Get finds a resource in the storage by name and returns it.
	// Although it can return an arbitrary error value, IsNotFound(err) is true for the
	// returned error value err when the specified resource is not found.
	Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error)
}
```
Now check whether storage, i.e. nodeStorage.Node, implements it. Frustratingly, there is no Get method on it directly. But don't give up: structs can be embedded (similar to parent/child inheritance) and Go supports anonymous members, whose methods can be called without naming the intermediate struct (flip back through The Go Programming Language if this is fuzzy). So look for the interface implementation in the structs that Node embeds:
```go
// NewStorage returns a NodeStorage object that will work against nodes.
func NewStorage(optsGetter generic.RESTOptionsGetter, kubeletClientConfig client.KubeletClientConfig, proxyTransport http.RoundTripper) (*NodeStorage, error) {
	store := &genericregistry.Store{ // this embedded struct is the key
	......
	// Set up REST handlers
	nodeREST := &REST{Store: store, proxyTransport: proxyTransport} // look for the Get implementation in this struct and the structs it embeds
	......
	return &NodeStorage{
		Node:                  nodeREST, // note: this value is a struct that embeds another struct
		Status:                statusREST,
		Proxy:                 proxyREST,
		KubeletConnectionInfo: connectionInfoGetter,
	}, nil
}
```
And indeed, in the package that defines the genericregistry.Store struct we find its implementation of the Get interface:
```go
// Get retrieves the item from storage.
func (e *Store) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) {
	obj := e.NewFunc()
	key, err := e.KeyFunc(ctx, name)
	if err != nil {
		return nil, err
	}
	// Yet another interface. In short, a Get request sent to the apiserver is handled here.
	if err := e.Storage.Get(ctx, key, options.ResourceVersion, obj, false); err != nil {
		return nil, storeerr.InterpretGetError(err, e.qualifiedResourceFromContext(ctx), name)
	}
	if e.Decorator != nil {
		if err := e.Decorator(obj); err != nil {
			return nil, err
		}
	}
	return obj, nil
}
```
Next, track down e.Storage.Get. Following the interface definitions, the real backend implementation turns out to be k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/storage/etcd3.(*store).Get. Why etcd3? We need to look at how the etcd storage backend type (etcd holds the k8s metadata) is registered; etcd3 is the default:
```
=> 30: func Create(c storagebackend.Config) (storage.Interface, DestroyFunc, error) {
   31:     switch c.Type {
   32:     case storagebackend.StorageTypeETCD2:
   33:         return newETCD2Storage(c)
   34:     case storagebackend.StorageTypeUnset, storagebackend.StorageTypeETCD3:
   35:         // TODO: We have the following features to implement:
(dlv) bt
 0  0x0000000001af76eb in k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/storage/storagebackend/factory.Create
    at /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/storage/storagebackend/factory/factory.go:30
 1  0x0000000001af8614 in k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic.NewRawStorage
    at /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/storage_decorator.go:55
 2  0x0000000001d26fc7 in k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry.StorageWithCacher.func1
    at /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry/storage_factory.go:43
 3  0x0000000001d2591d in k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry.(*Store).CompleteWithOptions
    at /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry/store.go:1362
    // Note: Create is entered once per path; distinguish them by path type -- here it is node
 4  0x0000000003eb072d in k8s.io/kubernetes/pkg/registry/core/node/storage.NewStorage
    at /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/pkg/registry/core/node/storage/storage.go:91
 5  0x0000000003f13f77 in k8s.io/kubernetes/pkg/registry/core/rest.LegacyRESTStorageProvider.NewLegacyRESTStorage
    at /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/pkg/registry/core/rest/storage_core.go:130
 6  0x000000000408ccc6 in k8s.io/kubernetes/pkg/master.(*Master).InstallLegacyAPI
    at /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/pkg/master/master.go:369
 7  0x000000000408c263 in k8s.io/kubernetes/pkg/master.completedConfig.New
    at /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/pkg/master/master.go:328
 8  0x00000000040a2eae in k8s.io/kubernetes/cmd/kube-apiserver/app.CreateKubeAPIServer
    at /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/cmd/kube-apiserver/app/server.go:218
 9  0x00000000040a2681 in k8s.io/kubernetes/cmd/kube-apiserver/app.CreateServerChain
    at /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/cmd/kube-apiserver/app/server.go:168
10  0x00000000040a1e51 in k8s.io/kubernetes/cmd/kube-apiserver/app.Run
    at /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/cmd/kube-apiserver/app/server.go:137
(dlv) p c.Type
""   // corresponds to storagebackend.StorageTypeUnset, so the default etcd3 backend is chosen
```
But how do we trace back to the handler, i.e. e.Storage.Get? The simplest way is of course to set a breakpoint; the resulting stack is shown in the request-handling section below, so it is not repeated here. One question remains, though: how does e.Storage.Get end up in etcd3's Get method?
```
(dlv) bt
 0  0x0000000001ae5358 in k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/storage/etcd3.newStore
    at /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/storage/etcd3/store.go:99
 1  0x0000000001ae521d in k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/storage/etcd3.New
    at /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/storage/etcd3/store.go:90
 2  0x0000000001af7541 in k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/storage/storagebackend/factory.newETCD3Storage
    at /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go:78
 3  0x0000000001af77d7 in k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/storage/storagebackend/factory.Create
    at /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/storage/storagebackend/factory/factory.go:39
    // Note this frame: it is used below when tracing Get from cacher.go to etcd3.go
 4  0x0000000001af8614 in k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic.NewRawStorage
    at /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/storage_decorator.go:55
 5  0x0000000001d26fc7 in k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry.StorageWithCacher.func1
    at /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry/storage_factory.go:43
 6  0x0000000001d2591d in k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry.(*Store).CompleteWithOptions
    at /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry/store.go:1362
 7  0x0000000003f0f875 in k8s.io/kubernetes/pkg/registry/core/serviceaccount/storage.NewREST
    at /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/pkg/registry/core/serviceaccount/storage/storage.go:52
 8  0x0000000003f16aa3 in k8s.io/kubernetes/pkg/registry/core/rest.LegacyRESTStorageProvider.NewLegacyRESTStorage
    at /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/pkg/registry/core/rest/storage_core.go:146
 9  0x000000000408ccc6 in k8s.io/kubernetes/pkg/master.(*Master).InstallLegacyAPI
    at /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/pkg/master/master.go:369
10  0x000000000408c263 in k8s.io/kubernetes/pkg/master.completedConfig.New
    at /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/pkg/master/master.go:328
```
```
> k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry.(*Store).CompleteWithOptions()
/root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry/store.go:1292 (hits goroutine(1):2 total:2) (PC: 0x1d25303)
  1287:             GetAttrs: attrFunc,
  1288:         }
  1289:     }
  1290: }
  1291: // The key is the next line: it creates opts, the source of the e.Storage used later
=>1292: opts, err := options.RESTOptions.GetRESTOptions(e.DefaultQualifiedResource)
  1293: if err != nil {
  1294:     return err
  1295: }
  1296:
  1297: // ResourcePrefix must come from the underlying factory
(dlv) whatis options.RESTOptions
// This puzzled me at first: how did it become the *storageFactoryRestOptionsFactory type?
k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic.RESTOptionsGetter
Concrete type: *k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/server/options.storageFactoryRestOptionsFactory
(dlv)
> k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry.(*Store).CompleteWithOptions()
/root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry/store.go:1361 (PC: 0x1d25808)
  1356:         }
  1357:         return accessor.GetName(), nil
  1358:     }
  1359: }
  1360:
=>1361: if e.Storage == nil {
  1362:     e.Storage, e.DestroyFunc = opts.Decorator( // opts.Decorator() produces e.Storage
  1363:         opts.StorageConfig,
  1364:         e.NewFunc(),
  1365:         prefix,
  1366:         keyFunc,
```
```go
// Why is this particular implementation of GetRESTOptions called, when the interface
// has several? Because a method call on an interface value dispatches to the concrete
// type stored in it -- and the whatis output above shows that concrete type is
// *storageFactoryRestOptionsFactory.
func (f *storageFactoryRestOptionsFactory) GetRESTOptions(resource schema.GroupResource) (generic.RESTOptions, error) {
	storageConfig, err := f.StorageFactory.NewConfig(resource)
	if err != nil {
		return generic.RESTOptions{}, fmt.Errorf("unable to find storage destination for %v, due to %v", resource, err.Error())
	}

	ret := generic.RESTOptions{
		StorageConfig:           storageConfig,
		Decorator:               generic.UndecoratedStorage,
		DeleteCollectionWorkers: f.Options.DeleteCollectionWorkers,
		EnableGarbageCollection: f.Options.EnableGarbageCollection,
		ResourcePrefix:          f.StorageFactory.ResourcePrefix(resource),
		CountMetricPollPeriod:   f.Options.StorageConfig.CountMetricPollPeriod,
	}
	if f.Options.EnableWatchCache {
		sizes, err := ParseWatchCacheSizes(f.Options.WatchCacheSizes)
		if err != nil {
			return generic.RESTOptions{}, err
		}
		cacheSize, ok := sizes[resource]
		if !ok {
			cacheSize = f.Options.DefaultWatchCacheSize
		}
		// Found it: this is the source of opts.Decorator
		ret.Decorator = genericregistry.StorageWithCacher(cacheSize)
	}

	return ret, nil
}
```
```go
// Creates a cacher based given storageConfig.
func StorageWithCacher(capacity int) generic.StorageDecorator {
	return func( // this anonymous function is the opts.Decorator that produces e.Storage
		storageConfig *storagebackend.Config,
		objectType runtime.Object,
		resourcePrefix string,
		keyFunc func(obj runtime.Object) (string, error),
		newListFunc func() runtime.Object,
		getAttrsFunc storage.AttrFunc,
		triggerFunc storage.TriggerPublisherFunc) (storage.Interface, factory.DestroyFunc) {

		// note this line: s is used below
		s, d := generic.NewRawStorage(storageConfig)
		if capacity == 0 {
			glog.V(5).Infof("Storage caching is disabled for %T", objectType)
			return s, d
		}
		glog.V(5).Infof("Storage caching is enabled for %T with capacity %v", objectType, capacity)

		// TODO: we would change this later to make storage always have cacher and hide low level KV layer inside.
		// Currently it has two layers of same storage interface -- cacher and low level kv.
		cacherConfig := storage.CacherConfig{
			CacheCapacity:        capacity,
			Storage:              s, // used below
			Versioner:            etcdstorage.APIObjectVersioner{},
			Type:                 objectType,
			ResourcePrefix:       resourcePrefix,
			KeyFunc:              keyFunc,
			NewListFunc:          newListFunc,
			GetAttrsFunc:         getAttrsFunc,
			TriggerPublisherFunc: triggerFunc,
			Codec:                storageConfig.Codec,
		}
		// This cacher is e.Storage, and its static type is storage.Interface.
		// NewCacherFromConfig returns a struct pointer (*Cacher); it can be returned
		// as a storage.Interface because *Cacher implements every method of that
		// interface -- interface satisfaction in Go is implicit, no declaration needed.
		cacher := storage.NewCacherFromConfig(cacherConfig)
		destroyFunc := func() {
			cacher.Stop()
			d()
		}

		// TODO : Remove RegisterStorageCleanup below when PR
		// https://github.com/kubernetes/kubernetes/pull/50690
		// merges as that shuts down storage properly
		RegisterStorageCleanup(destroyFunc)

		return cacher, destroyFunc
	}
}
```
NewCacherFromConfig returns a *Cacher; the Cacher struct implements every method defined by storage.Interface, which is why the pointer can be used as one:
```go
// Create a new Cacher responsible for servicing WATCH and LIST requests from
// its internal cache and updating its cache in the background based on the
// given configuration.
func NewCacherFromConfig(config CacherConfig) *Cacher {
	watchCache := newWatchCache(config.CacheCapacity, config.KeyFunc, config.GetAttrsFunc)
	listerWatcher := newCacherListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc)
	reflectorName := "storage/cacher.go:" + config.ResourcePrefix

	// Give this error when it is constructed rather than when you get the
	// first watch item, because it's much easier to track down that way.
	if obj, ok := config.Type.(runtime.Object); ok {
		if err := runtime.CheckCodec(config.Codec, obj); err != nil {
			panic("storage codec doesn't seem to match given type: " + err.Error())
		}
	}

	stopCh := make(chan struct{})
	cacher := &Cacher{
		ready:   newReady(),
		storage: config.Storage, // note this field, it is used below
```
So e.Storage.Get is the Get method in k8s.io/apiserver/pkg/storage/cacher.go, which in turn calls the etcd3 backend's Get method:
```go
// Implements storage.Interface.
func (c *Cacher) Get(ctx context.Context, key string, resourceVersion string, objPtr runtime.Object, ignoreNotFound bool) error {
	if resourceVersion == "" {
		// If resourceVersion is not specified, serve it from underlying
		// storage (for backward compatibility).
		// c.storage is the field noted above
		return c.storage.Get(ctx, key, resourceVersion, objPtr, ignoreNotFound)
	}
```
c.storage == config.Storage is produced by NewRawStorage; the debug stack above shows it ultimately comes from k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/storage/etcd3.newStore, so c.storage.Get calls:
```go
// Get implements storage.Interface.Get.
func (s *store) Get(ctx context.Context, key string, resourceVersion string, out runtime.Object, ignoreNotFound bool) error {
	key = path.Join(s.pathPrefix, key) // s.pathPrefix is "/registry"
	// fetch the value for this key from the etcd backend via the etcd client
	getResp, err := s.client.KV.Get(ctx, key, s.getOps...)
	if err != nil {
		return err
	}

	if len(getResp.Kvs) == 0 {
		if ignoreNotFound {
			return runtime.SetZeroValue(out)
		}
		return storage.NewKeyNotFoundError(key, 0)
	}
	kv := getResp.Kvs[0]

	data, _, err := s.transformer.TransformFromStorage(kv.Value, authenticatedDataString(key))
	if err != nil {
		return storage.NewInternalError(err.Error())
	}

	return decode(s.codec, s.versioner, data, out, kv.ModRevision)
}
```
This Get is where the user's GET API request is finally served.
```
> k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/storage/etcd3.(*store).Get()
/root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/storage/etcd3/store.go:127 (hits goroutine(2632):1 total:12) (PC: 0x1ae584b)
   122: func (s *store) Versioner() storage.Versioner {
   123:     return s.versioner
   124: }
   125:
   126: // Get implements storage.Interface.Get.
=> 127: func (s *store) Get(ctx context.Context, key string, resourceVersion string, out runtime.Object, ignoreNotFound bool) error {
   128:     key = path.Join(s.pathPrefix, key)
   129:     getResp, err := s.client.KV.Get(ctx, key, s.getOps...)
   130:     if err != nil {
   131:         return err
   132:     }
(dlv) p key
"/minions/10.0.90.22"
(dlv) p s.pathPrefix
"/registry"
......
(dlv) n
> k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/storage/etcd3.(*store).Get()
/root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/storage/etcd3/store.go:129 (PC: 0x1ae5977)
   124: }
   125:
   126: // Get implements storage.Interface.Get.
   127: func (s *store) Get(ctx context.Context, key string, resourceVersion string, out runtime.Object, ignoreNotFound bool) error {
   128:     key = path.Join(s.pathPrefix, key)
=> 129:     getResp, err := s.client.KV.Get(ctx, key, s.getOps...)
   130:     if err != nil {
   131:         return err
   132:     }
   133:
   134:     if len(getResp.Kvs) == 0 {
(dlv) p key   // the node's key in the etcd database
"/registry/minions/10.0.90.22"
```
Request handling (HTTP RESTful request to {handler})
Not much more needs to be said here; just look at the call stack:
```
(dlv) bt
 0  0x0000000001ae584b in k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/storage/etcd3.(*store).Get
    at /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/storage/etcd3/store.go:127
 1  0x00000000016cbcdc in k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/storage.(*Cacher).Get
    at /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/storage/cacher.go:357
 2  0x0000000001d1eecb in k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry.(*Store).Get
    at /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry/store.go:674
 3  0x0000000001c9b304 in k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/endpoints/handlers.GetResource.func1
    at /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/endpoints/handlers/get.go:106
 4  0x0000000001c9a6f9 in k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/endpoints/handlers.getResourceHandler.func1
    at /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/endpoints/handlers/get.go:60
 5  0x0000000001cc290b in k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/endpoints.restfulGetResource.func1
    at /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/endpoints/installer.go:1058
 6  0x00000000016dfc55 in k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/endpoints/metrics.InstrumentRouteFunc.func1
    at /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/endpoints/metrics/metrics.go:199
 7  0x0000000001511e53 in k8s.io/kubernetes/vendor/github.com/emicklei/go-restful.(*Container).dispatch
    at /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/vendor/github.com/emicklei/go-restful/container.go:277
 8  0x0000000001510e35 in k8s.io/kubernetes/vendor/github.com/emicklei/go-restful.(*Container).Dispatch
    at /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/vendor/github.com/emicklei/go-restful/container.go:199
 9  0x0000000001d022de in k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/server.director.ServeHTTP
    at /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/server/handler.go:152
10  0x0000000001d09ea2 in k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/server.(*director).ServeHTTP
    at /root/k8s/kubernetes/<autogenerated>:1
```
Open questions
- The two Go language points flagged during the code analysis above, both about interfaces (implicit interface satisfaction via embedded structs, and dynamic dispatch on the concrete type held by an interface value)
- From container to http server: this should not be complicated and is outside the goal of this analysis, so it was skipped
- Registration and use of the various filters: likewise skipped
- etcd deployment and usage, and how k8s uses etcd: extension topics, to be looked at when time allows
Comparison with the OpenStack API handling flow
The biggest difference is of course the language: Python and Go are quite different, so the RESTful HTTP server frameworks differ too, and so do the route definition and registration flows. Much of this is habit, though. Read and use these frameworks every day and the strong sense of strangeness you feel on first contact fades. I still remember staring blankly at the nova-api route registration and dispatch flow for a long, long time when I first read OpenStack code.
Beyond that, the interaction flows differ, though Get is roughly the same in both: receive the user's API request, dispatch it to the concrete handler (route to a controller or handler), have that controller or handler query the requested data from the backend store (MySQL or etcd), then wrap it up and return it to the user. Before the controller or handler fetches the data, the request passes through multiple filter stages that reject illegal or unsuitable requests, so that only legitimate ones (e.g. not rate-limited) are processed and returned normally.
Create differs much more. OpenStack generally passes messages over a message queue: the API service sends the concrete action via RPC to other services (scheduler, compute-node manager, etc.), and during or after execution the operation status is also written back to the database via RPC (originally the database was accessed directly; this was later changed to go through RPC for safety). k8s instead does all asynchronous interaction between components through etcd: each component watches the keys it cares about, which implements message passing and asynchronous invocation, and operation status updates are likewise written to etcd.