kube-apiserver RestFul API route创建流程分析




看完《kubernetes权威指南》和《Go程序设计语言》两本书之后,终于可以进入实际的代码分析阶段,根据之前熟悉其他开源项目源码(如libvirt、OpenStack等)的经验,首先从接口/API开始分析。

k8s开发环境搭建:使用kubeasz快速搭建k8s集群all-in-one开发测试环境

Golang调试环境搭建:kubernetes源码调试体验

k8s源码版本:v1.10

分析API:GET 127.0.0.1:8080/api/v1/nodes/10.0.90.22(10.0.90.22是我环境中的一个node,8080为kubectl proxy API代理端口)

分析目标:搞清楚用户发送请求到这个API之后kube-apiserver的处理流程

参考资料:http://www.wklken.me/posts/2017/09/23/source-apiserver-04.html(还有前面几篇,这里面讲的是老版本的,有很多代码已经改了,但是主要流程值得参考)

源码流程很绕(至少目前我是这么认为,可能是因为我刚开始看源码),需要多看,多动手调试,看很多遍,调很多遍,整天琢磨它,应该都能看明白。

route注册({path} to {handler})

要搞清楚route注册流程,就必须先把go-restful框架的用法搞明白,官方Readme文档有说明,也有示例代码。这里给出上面提到的参考资料:http://www.wklken.me/posts/2017/09/23/source-apiserver-01.html

我们只需要记住,go-restful框架中route注册需要经过如下几个步骤:

  1. 创建一个container(默认使用default)
  2. 创建一个web service
  3. 创建handler(也就是API的请求处理函数)
  4. 把API path和handler绑定到web service
  5. 把web service(可多个,但root Path不能相同)绑定到container
  6. 启动包含container(可多个)的http server

我这边试验的一个简单的示例代码:

// https://github.com/emicklei/go-restful/blob/master/examples/restful-multi-containers.go
// GET http://localhost:8080/hello
// GET http://localhost:8081/hello
package main

import (
    "github.com/emicklei/go-restful"
    "io"
    "log"
    "net/http"
)

func main() {
    // add two web services to default container
    ws0 := new(restful.WebService)
    ws0.Path("/hello0")  // 这里/hello0就是ws0的root Path
    ws0.Route(ws0.GET("/").To(hello0)) // curl 127.0.0.1:9080/hello0
    restful.Add(ws0)

    ws1 := new(restful.WebService)
    ws1.Path("/hello1")
    ws1.Route(ws1.GET("/").To(hello1)) // curl 127.0.0.1:9080/hello1
    restful.Add(ws1)
    go func() {
        log.Fatal(http.ListenAndServe(":9080", nil))
    }()

    // container 2
    container2 := restful.NewContainer()
    ws2 := new(restful.WebService)
    ws2.Route(ws2.GET("/hello2").To(hello2)) // curl 127.0.0.1:9081/hello2
    container2.Add(ws2)
    server := &http.Server{Addr: ":9081", Handler: container2}
    log.Fatal(server.ListenAndServe())
}

func hello0(req *restful.Request, resp *restful.Response) {
    io.WriteString(resp, "default world 0")
}

func hello1(req *restful.Request, resp *restful.Response) {
    io.WriteString(resp, "default world 1")
}

func hello2(req *restful.Request, resp *restful.Response) {
    io.WriteString(resp, "second world")
}

 

我们下面所有的流程都以这个为基础进行分析。

kube-apiserver的main入口:

func main() {
	rand.Seed(time.Now().UTC().UnixNano())

	command := app.NewAPIServerCommand()

	......
	if err := command.Execute(); err != nil {
		fmt.Fprintf(os.Stderr, "error: %v\n", err)
		os.Exit(1)
	}
}

这里用到了github.com/spf13/cobra这个包来解析启动参数并启动可执行程序。

具体注册流程就不一步一步的分析了,直接根据断点的bt输出跟着代码查看吧:

(dlv) b k8s.io/kubernetes/pkg/registry/core/rest/storage_core.go:96
(dlv) r
(dlv) c
(dlv) bt
 0  0x0000000003f1373b in k8s.io/kubernetes/pkg/registry/core/rest.LegacyRESTStorageProvider.NewLegacyRESTStorage
    at /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/pkg/registry/core/rest/storage_core.go:96
 1  0x000000000408ccc6 in k8s.io/kubernetes/pkg/master.(*Master).InstallLegacyAPI
    at /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/pkg/master/master.go:369
 2  0x000000000408c263 in k8s.io/kubernetes/pkg/master.completedConfig.New
    at /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/pkg/master/master.go:328
 3  0x00000000040a2eae in k8s.io/kubernetes/cmd/kube-apiserver/app.CreateKubeAPIServer
    at /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/cmd/kube-apiserver/app/server.go:218
 4  0x00000000040a2681 in k8s.io/kubernetes/cmd/kube-apiserver/app.CreateServerChain
    at /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/cmd/kube-apiserver/app/server.go:168
 5  0x00000000040a1e51 in k8s.io/kubernetes/cmd/kube-apiserver/app.Run
    at /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/cmd/kube-apiserver/app/server.go:137
 6  0x00000000040acd57 in k8s.io/kubernetes/cmd/kube-apiserver/app.NewAPIServerCommand.func1
    at /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/cmd/kube-apiserver/app/server.go:121
 7  0x0000000003c5c4e8 in k8s.io/kubernetes/vendor/github.com/spf13/cobra.(*Command).execute
    at /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/vendor/github.com/spf13/cobra/command.go:757
 8  0x0000000003c5cf76 in k8s.io/kubernetes/vendor/github.com/spf13/cobra.(*Command).ExecuteC
    at /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/vendor/github.com/spf13/cobra/command.go:843
 9  0x0000000003c5c7cf in k8s.io/kubernetes/vendor/github.com/spf13/cobra.(*Command).Execute
    at /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/vendor/github.com/spf13/cobra/command.go:791
10  0x00000000040b16e5 in main.main
    at /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/cmd/kube-apiserver/apiserver.go:51

NewLegacyRESTStorage这个方法(注意是LegacyRESTStorageProvider类型的方法),返回了3个参数,restStorage, apiGroupInfo, nil,最后一个是错误信息可忽略,第一个对我们流程分析没啥影响(应该是),中间这个apiGroupInfo是重点,InstallLegacyAPIGroup就是注册/api这个path的,apiPrefix这个参数是DefaultLegacyAPIPrefix = “/api” ,apiGroupInfo.PrioritizedVersions目前就”v1″一个版本。

//InstallLegacyAPI方法调用这个
func (s *GenericAPIServer) InstallLegacyAPIGroup(apiPrefix string, apiGroupInfo *APIGroupInfo) error {
	if !s.legacyAPIGroupPrefixes.Has(apiPrefix) {
		return fmt.Errorf("%q is not in the allowed legacy API prefixes: %v", apiPrefix, s.legacyAPIGroupPrefixes.List())
	}
	if err := s.installAPIResources(apiPrefix, apiGroupInfo); err != nil {
		return err
	}

	// setup discovery
	apiVersions := []string{}
	for _, groupVersion := range apiGroupInfo.PrioritizedVersions {
		apiVersions = append(apiVersions, groupVersion.Version)
	}
	// Install the version handler.
	// Add a handler at /<apiPrefix> to enumerate the supported api versions.
        // 这步比较简单,就不介绍了,是用来注册GET /api这个path的,用来返回所有api版本,
        // handler是k8s.io/apiserver/pkg/endpoints/discovery/legacy.go:handle方法
	s.Handler.GoRestfulContainer.Add(discovery.NewLegacyRootAPIHandler(s.discoveryAddresses, s.Serializer, apiPrefix, apiVersions).WebService())

	return nil
}
// installAPIResources is a private method for installing the REST storage backing each api groupversionresource
func (s *GenericAPIServer) installAPIResources(apiPrefix string, apiGroupInfo *APIGroupInfo) error {
	for _, groupVersion := range apiGroupInfo.PrioritizedVersions {//只有v1
		if len(apiGroupInfo.VersionedResourcesStorageMap[groupVersion.Version]) == 0 {
			glog.Warningf("Skipping API %v because it has no resources.", groupVersion)
			continue
		}
                //注意这里,数据封装过程
		apiGroupVersion := s.getAPIGroupVersion(apiGroupInfo, groupVersion, apiPrefix)
		if apiGroupInfo.OptionsExternalVersion != nil {
			apiGroupVersion.OptionsExternalVersion = apiGroupInfo.OptionsExternalVersion
		}

		if err := apiGroupVersion.InstallREST(s.Handler.GoRestfulContainer); err != nil {
			return fmt.Errorf("unable to setup API %v: %v", apiGroupInfo, err)
		}
	}

	return nil
}

注意这里的getAPIGroupVersion方法,它把apiGroupInfo封装到了apiGroupVersion结构体里面,具体是apiGroupVersion.Storage,下面会用到:

func (s *GenericAPIServer) getAPIGroupVersion(apiGroupInfo *APIGroupInfo, groupVersion schema.GroupVersion, apiPrefix string) *genericapi.APIGroupVersion {
	storage := make(map[string]rest.Storage)
        // 遍历restStorageMap加入到storage,从数据类型可以确认,类型都是map[string]rest.Storage
	for k, v := range apiGroupInfo.VersionedResourcesStorageMap[groupVersion.Version] {//groupVersion.Version就是v1
		storage[strings.ToLower(k)] = v //这里k是NewLegacyRESTStorage里restStorageMap的key,v是它的value
	}
	version := s.newAPIGroupVersion(apiGroupInfo, groupVersion)
	version.Root = apiPrefix
	version.Storage = storage
	return version
}
    ......
	restStorageMap := map[string]rest.Storage{
		"pods":             podStorage.Pod,
		"pods/attach":      podStorage.Attach,
		"pods/status":      podStorage.Status,
		"pods/log":         podStorage.Log,
		"pods/exec":        podStorage.Exec,
		"pods/portforward": podStorage.PortForward,
		"pods/proxy":       podStorage.Proxy,
		"pods/binding":     podStorage.Binding,
		"bindings":         podStorage.Binding,

		"podTemplates": podTemplateStorage,

		"replicationControllers":        controllerStorage.Controller,
		"replicationControllers/status": controllerStorage.Status,

		"services":        serviceRest,
		"services/proxy":  serviceRestProxy,
		"services/status": serviceStatusStorage,

		"endpoints": endpointsStorage,

		"nodes":        nodeStorage.Node,
		"nodes/status": nodeStorage.Status,
		"nodes/proxy":  nodeStorage.Proxy,

		"events": eventStorage,

		"limitRanges":                   limitRangeStorage,
		"resourceQuotas":                resourceQuotaStorage,
		"resourceQuotas/status":         resourceQuotaStatusStorage,
		"namespaces":                    namespaceStorage,
		"namespaces/status":             namespaceStatusStorage,
		"namespaces/finalize":           namespaceFinalizeStorage,
		"secrets":                       secretStorage,
		"serviceAccounts":               serviceAccountStorage,
		"persistentVolumes":             persistentVolumeStorage,
		"persistentVolumes/status":      persistentVolumeStatusStorage,
		"persistentVolumeClaims":        persistentVolumeClaimStorage,
		"persistentVolumeClaims/status": persistentVolumeClaimStatusStorage,
		"configMaps":                    configMapStorage,

		"componentStatuses": componentstatus.NewStorage(componentStatusStorage{c.StorageFactory}.serversToValidate),
	}
    ......
    apiGroupInfo.VersionedResourcesStorageMap["v1"] = restStorageMap
    ......

 

// InstallREST registers the REST handlers (storage, watch, proxy and redirect) into a restful Container.
// It is expected that the provided path root prefix will serve all operations. Root MUST NOT end
// in a slash.
func (g *APIGroupVersion) InstallREST(container *restful.Container) error {
	prefix := path.Join(g.Root, g.GroupVersion.Group, g.GroupVersion.Version)
	installer := &APIInstaller{
		group:                        g, //g就是上面的apiGroupVersion,里面有Storage,g.Storage就是上面的restStorageMap 
		prefix:                       prefix,
		minRequestTimeout:            g.MinRequestTimeout,
		enableAPIResponseCompression: g.EnableAPIResponseCompression,
	}

	apiResources, ws, registrationErrors := installer.Install()//生成web service
	versionDiscoveryHandler := discovery.NewAPIVersionHandler(g.Serializer, g.GroupVersion, staticLister{apiResources})
	versionDiscoveryHandler.AddToWebService(ws) //path和handler绑定到web service
	container.Add(ws) //把web service加入到container
	return utilerrors.NewAggregate(registrationErrors)
}
(dlv) p installer
*k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/endpoints.APIInstaller {
        group: *k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/endpoints.APIGroupVersion {
                Storage: map[string]k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/rest.Storage [...],
                Root: "/api",
                GroupVersion: (*k8s.io/kubernetes/vendor/k8s.io/apimachinery/pkg/runtime/schema.GroupVersion)(0xc42047a5b8),
                OptionsExternalVersion: *k8s.io/kubernetes/vendor/k8s.io/apimachinery/pkg/runtime/schema.GroupVersion nil,
                MetaGroupVersion: *k8s.io/kubernetes/vendor/k8s.io/apimachinery/pkg/runtime/schema.GroupVersion nil,
                Mapper: k8s.io/kubernetes/vendor/k8s.io/apimachinery/pkg/api/meta.RESTMapper(*k8s.io/kubernetes/vendor/k8s.io/apimachinery/pkg/api/meta.DefaultRESTMapper) ...,
                Serializer: k8s.io/kubernetes/vendor/k8s.io/apimachinery/pkg/runtime.NegotiatedSerializer(k8s.io/kubernetes/vendor/k8s.io/apimachinery/pkg/runtime/serializer.CodecFactory) *(*"k8s.io/kubernetes/vendor/k8s.io/apimachinery/pkg/runtime.NegotiatedSerializer")(0xc42047a5f8),
                ParameterCodec: k8s.io/kubernetes/vendor/k8s.io/apimachinery/pkg/runtime.ParameterCodec(*k8s.io/kubernetes/vendor/k8s.io/apimachinery/pkg/runtime.parameterCodec) ...,
                Typer: k8s.io/kubernetes/vendor/k8s.io/apimachinery/pkg/runtime.ObjectTyper(*k8s.io/kubernetes/vendor/k8s.io/apimachinery/pkg/runtime.Scheme) ...,
                Creater: k8s.io/kubernetes/vendor/k8s.io/apimachinery/pkg/runtime.ObjectCreater(*k8s.io/kubernetes/vendor/k8s.io/apimachinery/pkg/runtime.Scheme) ...,
                Convertor: k8s.io/kubernetes/vendor/k8s.io/apimachinery/pkg/runtime.ObjectConvertor(*k8s.io/kubernetes/vendor/k8s.io/apimachinery/pkg/runtime.Scheme) ...,
                Defaulter: k8s.io/kubernetes/vendor/k8s.io/apimachinery/pkg/runtime.ObjectDefaulter(*k8s.io/kubernetes/vendor/k8s.io/apimachinery/pkg/runtime.Scheme) ...,
                Linker: k8s.io/kubernetes/vendor/k8s.io/apimachinery/pkg/runtime.SelfLinker(k8s.io/kubernetes/vendor/k8s.io/apimachinery/pkg/api/meta.resourceAccessor) *(*"k8s.io/kubernetes/vendor/k8s.io/apimachinery/pkg/runtime.SelfLinker")(0xc42047a658),
                UnsafeConvertor: k8s.io/kubernetes/vendor/k8s.io/apimachinery/pkg/runtime.ObjectConvertor(k8s.io/kubernetes/vendor/k8s.io/apimachinery/pkg/runtime.unsafeObjectConvertor) *(*"k8s.io/kubernetes/vendor/k8s.io/apimachinery/pkg/runtime.ObjectConvertor")(0xc42047a668),
                Admit: k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/admission.Interface(*k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/admission/metrics.pluginHandlerWithMetrics) ...,
                Context: k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/endpoints/request.RequestContextMapper(*k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/endpoints/request.requestContextMap) ...,
                MinRequestTimeout: 1800000000000,
                EnableAPIResponseCompression: false,},
        prefix: "/api/v1",
        minRequestTimeout: 1800000000000,
        enableAPIResponseCompression: false,}
(dlv) p g.GroupVersion
k8s.io/kubernetes/vendor/k8s.io/apimachinery/pkg/runtime/schema.GroupVersion {Group: "", Version: "v1"}
// Install handlers for API resources.
func (a *APIInstaller) Install() ([]metav1.APIResource, *restful.WebService, []error) {
	var apiResources []metav1.APIResource
	var errors []error
	ws := a.newWebService()

	// Register the paths in a deterministic (sorted) order to get a deterministic swagger spec.
	paths := make([]string, len(a.group.Storage))
	var i int = 0
	for path := range a.group.Storage {
		paths[i] = path
		i++
	}
	sort.Strings(paths)
	for _, path := range paths {
		apiResource, err := a.registerResourceHandlers(path, a.group.Storage[path], ws)
		if err != nil {
			errors = append(errors, fmt.Errorf("error in registering resource: %s, %v", path, err))
		}
		if apiResource != nil {
			apiResources = append(apiResources, *apiResource)
		}
	}
	return apiResources, ws, errors
}
(dlv) p a.group.Storage //就是NewLegacyRESTStorage里的restStorageMap
map[string]k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/rest.Storage [
        "replicationcontrollers/status": *k8s.io/kubernetes/pkg/registry/core/replicationcontroller/storage.StatusREST {
                store: *(*k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry.Store)(0xc42116be00),}, 
        "services": *k8s.io/kubernetes/pkg/registry/core/service/storage.REST {
                services: k8s.io/kubernetes/pkg/registry/core/service/storage.ServiceStorage(*k8s.io/kubernetes/pkg/registry/core/service/storage.GenericREST) ...,
                endpoints: k8s.io/kubernetes/pkg/registry/core/service/storage.EndpointsStorage(*k8s.io/kubernetes/pkg/registry/core/endpoint/storage.REST) ...,
                serviceIPs: k8s.io/kubernetes/pkg/registry/core/service/ipallocator.Interface(*k8s.io/kubernetes/pkg/registry/core/service/ipallocator.Range) ...,
                serviceNodePorts: k8s.io/kubernetes/pkg/registry/core/service/portallocator.Interface(*k8s.io/kubernetes/pkg/registry/core/service/portallocator.PortAllocator) ...,
                proxyTransport: net/http.RoundTripper(*net/http.Transport) ...,
                pods: k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/rest.Getter(*k8s.io/kubernetes/pkg/registry/core/pod/storage.REST) ...,}, 
        "serviceaccounts": *k8s.io/kubernetes/pkg/registry/core/serviceaccount/storage.REST {
                Store: *(*k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry.Store)(0xc420ef7a00),
                Token: *k8s.io/kubernetes/pkg/registry/core/serviceaccount/storage.TokenREST nil,}, 
        "events": *k8s.io/kubernetes/pkg/registry/core/event/storage.REST {
                Store: *(*k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry.Store)(0xc4208cb700),}, 
        "persistentvolumeclaims/status": *k8s.io/kubernetes/pkg/registry/core/persistentvolumeclaim/storage.StatusREST {
                store: *(*k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry.Store)(0xc420dd2a00),}, 
        "resourcequotas": *k8s.io/kubernetes/pkg/registry/core/resourcequota/storage.REST {
                Store: *(*k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry.Store)(0xc4201c3d00),}, 
        "pods/exec": *k8s.io/kubernetes/pkg/registry/core/pod/rest.ExecREST {
                Store: *(*k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry.Store)(0xc420ef7300),
                KubeletConn: k8s.io/kubernetes/pkg/kubelet/client.ConnectionInfoGetter(*k8s.io/kubernetes/pkg/kubelet/client.NodeConnectionInfoGetter) ...,}, 
        "persistentvolumes/status": *k8s.io/kubernetes/pkg/registry/core/persistentvolume/storage.StatusREST {
                store: *(*k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry.Store)(0xc420dd2300),}, 
        "componentstatuses": *k8s.io/kubernetes/pkg/registry/core/componentstatus.REST {GetServersToValidate: k8s.io/kubernetes/pkg/registry/core/rest.(componentStatusStorage).(k8s.io/kubernetes/pkg/registry/core/rest.serversToValidate)-fm}, 
        "pods/status": *k8s.io/kubernetes/pkg/registry/core/pod/storage.StatusREST {
                store: *(*k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry.Store)(0xc420ef7900),}, 
        "pods/binding": *k8s.io/kubernetes/pkg/registry/core/pod/storage.BindingREST {
                store: *(*k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry.Store)(0xc420ef7300),}, 
        "pods/portforward": *k8s.io/kubernetes/pkg/registry/core/pod/rest.PortForwardREST {
                Store: *(*k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry.Store)(0xc420ef7300),
                KubeletConn: k8s.io/kubernetes/pkg/kubelet/client.ConnectionInfoGetter(*k8s.io/kubernetes/pkg/kubelet/client.NodeConnectionInfoGetter) ...,}, 
        "namespaces/finalize": *k8s.io/kubernetes/pkg/registry/core/namespace/storage.FinalizeREST {
                store: *(*k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry.Store)(0xc42116b900),}, 
        "resourcequotas/status": *k8s.io/kubernetes/pkg/registry/core/resourcequota/storage.StatusREST {
                store: *(*k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry.Store)(0xc42116af00),}, 
        "nodes": *k8s.io/kubernetes/pkg/registry/core/node/storage.REST {
                Store: *(*k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry.Store)(0xc420ef6c00),
                connection: k8s.io/kubernetes/pkg/kubelet/client.ConnectionInfoGetter(*k8s.io/kubernetes/pkg/kubelet/client.NodeConnectionInfoGetter) ...,
                proxyTransport: net/http.RoundTripper(*net/http.Transport) ...,}, 
        "namespaces": *k8s.io/kubernetes/pkg/registry/core/namespace/storage.REST {
                store: *(*k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry.Store)(0xc420dd3100),
                status: *(*k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry.Store)(0xc42116b800),}, 
        "podtemplates": *k8s.io/kubernetes/pkg/registry/core/podtemplate/storage.REST {
                Store: *(*k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry.Store)(0xc4208cb100),}, 
        "persistentvolumeclaims": *k8s.io/kubernetes/pkg/registry/core/persistentvolumeclaim/storage.REST {
                Store: *(*k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry.Store)(0xc420dd2400),}, 
        "replicationcontrollers": *k8s.io/kubernetes/pkg/registry/core/replicationcontroller/storage.REST {
                Store: *(*k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry.Store)(0xc4200a8300),}, 
        "nodes/status": *k8s.io/kubernetes/pkg/registry/core/node/storage.StatusREST {
                store: *(*k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry.Store)(0xc420ef7200),}, 
        "configmaps": *k8s.io/kubernetes/pkg/registry/core/configmap/storage.REST {
                Store: *(*k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry.Store)(0xc420dd2b00),}, 
        "nodes/proxy": *k8s.io/kubernetes/pkg/registry/core/node/rest.ProxyREST {
                Store: *(*k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry.Store)(0xc420ef6c00),
                Connection: k8s.io/kubernetes/pkg/kubelet/client.ConnectionInfoGetter(*k8s.io/kubernetes/pkg/kubelet/client.NodeConnectionInfoGetter) ...,
                ProxyTransport: net/http.RoundTripper(*net/http.Transport) ...,}, 
        "pods/eviction": *k8s.io/kubernetes/pkg/registry/core/pod/storage.EvictionREST {
                store: *(*k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry.Store)(0xc420ef7300),
                podDisruptionBudgetClient: k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/policy/internalversion.PodDisruptionBudgetsGetter(*k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/policy/internalversion.PolicyClient) ...,}, 
        "secrets": *k8s.io/kubernetes/pkg/registry/core/secret/storage.REST {
                Store: *(*k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry.Store)(0xc42116b000),}, 
        "pods/proxy": *k8s.io/kubernetes/pkg/registry/core/pod/rest.ProxyREST {
                Store: *(*k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry.Store)(0xc420ef7300),
                ProxyTransport: net/http.RoundTripper(*net/http.Transport) ...,}, 
        "limitranges": *k8s.io/kubernetes/pkg/registry/core/limitrange/storage.REST {
                Store: *(*k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry.Store)(0xc4201c3200),}, 
        "bindings": *k8s.io/kubernetes/pkg/registry/core/pod/storage.BindingREST {
                store: *(*k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry.Store)(0xc420ef7300),}, 
        "services/status": *k8s.io/kubernetes/pkg/registry/core/service/storage.StatusREST {
                store: *(*k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry.Store)(0xc4213acf00),}, 
        "endpoints": *k8s.io/kubernetes/pkg/registry/core/endpoint/storage.REST {
                Store: *(*k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry.Store)(0xc42116ba00),}, 
        "pods/log": *k8s.io/kubernetes/pkg/registry/core/pod/rest.LogREST {
                KubeletConn: k8s.io/kubernetes/pkg/kubelet/client.ConnectionInfoGetter(*k8s.io/kubernetes/pkg/kubelet/client.NodeConnectionInfoGetter) ...,
                Store: *(*k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry.Store)(0xc420ef7300),}, 
        "services/proxy": *k8s.io/kubernetes/pkg/registry/core/service.ProxyREST {
                Redirector: k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/rest.Redirector(*k8s.io/kubernetes/pkg/registry/core/service/storage.REST) ...,
                ProxyTransport: net/http.RoundTripper(*net/http.Transport) ...,}, 
        "namespaces/status": *k8s.io/kubernetes/pkg/registry/core/namespace/storage.StatusREST {
                store: *(*k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry.Store)(0xc42116b800),}, 
        "pods/attach": *k8s.io/kubernetes/pkg/registry/core/pod/rest.AttachREST {
                Store: *(*k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry.Store)(0xc420ef7300),
                KubeletConn: k8s.io/kubernetes/pkg/kubelet/client.ConnectionInfoGetter(*k8s.io/kubernetes/pkg/kubelet/client.NodeConnectionInfoGetter) ...,}, 
        "pods": *k8s.io/kubernetes/pkg/registry/core/pod/storage.REST {
                Store: *(*k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry.Store)(0xc420ef7300),
                proxyTransport: net/http.RoundTripper(*net/http.Transport) ...,}, 
        "replicationcontrollers/scale": *k8s.io/kubernetes/pkg/registry/core/replicationcontroller/storage.ScaleREST {
                registry: k8s.io/kubernetes/pkg/registry/core/replicationcontroller.Registry(*k8s.io/kubernetes/pkg/registry/core/replicationcontroller.storage) ...,}, 
        "persistentvolumes": *k8s.io/kubernetes/pkg/registry/core/persistentvolume/storage.REST {
                Store: *(*k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry.Store)(0xc4200a9d00),}, 
]

paths变量就是上面NewLegacyRESTStorage里restStorageMap map的key,也即”pods”、”nodes”等path。a.registerResourceHandlers()就是注册各个path的handler,也即restStorageMap的value。

// 参数storage是restStorageMap的value,path是key
func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storage, ws *restful.WebService) (*metav1.APIResource, error) {
	......
	// what verbs are supported by the storage, used to know what verbs we support per path
	creater, isCreater := storage.(rest.Creater)
	namedCreater, isNamedCreater := storage.(rest.NamedCreater)
	lister, isLister := storage.(rest.Lister)
	getter, isGetter := storage.(rest.Getter)
	getterWithOptions, isGetterWithOptions := storage.(rest.GetterWithOptions)
	gracefulDeleter, isGracefulDeleter := storage.(rest.GracefulDeleter)
	collectionDeleter, isCollectionDeleter := storage.(rest.CollectionDeleter)
	updater, isUpdater := storage.(rest.Updater)
	patcher, isPatcher := storage.(rest.Patcher)
	watcher, isWatcher := storage.(rest.Watcher)
	connecter, isConnecter := storage.(rest.Connecter)
	storageMeta, isMetadata := storage.(rest.StorageMetadata)
        ......
	default:
		namespaceParamName := "namespaces"
		// Handler for standard REST verbs (GET, PUT, POST and DELETE).
		namespaceParam := ws.PathParameter("namespace", "object name and auth scope, such as for teams and projects").DataType("string")
		namespacedPath := namespaceParamName + "/{" + "namespace" + "}/" + resource
		namespaceParams := []*restful.Parameter{namespaceParam}

		resourcePath := namespacedPath
		resourceParams := namespaceParams
		itemPath := namespacedPath + "/{name}"
		nameParams := append(namespaceParams, nameParam)
		proxyParams := append(nameParams, pathParam)
		itemPathSuffix := ""
		if isSubresource {
			itemPathSuffix = "/" + subresource
			itemPath = itemPath + itemPathSuffix
			resourcePath = itemPath
			resourceParams = nameParams
		}
		apiResource.Name = path
		apiResource.Namespaced = true
		apiResource.Kind = resourceKind
		namer := handlers.ContextBasedNaming{
			SelfLinker:         a.group.Linker,
			ClusterScoped:      false,
			SelfLinkPathPrefix: gpath.Join(a.prefix, namespaceParamName) + "/",
			SelfLinkPathSuffix: itemPathSuffix,
		}

		actions = appendIf(actions, action{"LIST", resourcePath, resourceParams, namer, false}, isLister)
		actions = appendIf(actions, action{"POST", resourcePath, resourceParams, namer, false}, isCreater)
		actions = appendIf(actions, action{"DELETECOLLECTION", resourcePath, resourceParams, namer, false}, isCollectionDeleter)
		// DEPRECATED
		actions = appendIf(actions, action{"WATCHLIST", "watch/" + resourcePath, resourceParams, namer, false}, allowWatchList)

		actions = appendIf(actions, action{"GET", itemPath, nameParams, namer, false}, isGetter)
		if getSubpath {
			actions = appendIf(actions, action{"GET", itemPath + "/{path:*}", proxyParams, namer, false}, isGetter)
		}
		actions = appendIf(actions, action{"PUT", itemPath, nameParams, namer, false}, isUpdater)
		actions = appendIf(actions, action{"PATCH", itemPath, nameParams, namer, false}, isPatcher)
		actions = appendIf(actions, action{"DELETE", itemPath, nameParams, namer, false}, isGracefulDeleter)
		actions = appendIf(actions, action{"WATCH", "watch/" + itemPath, nameParams, namer, false}, isWatcher)
		actions = appendIf(actions, action{"CONNECT", itemPath, nameParams, namer, false}, isConnecter)
		actions = appendIf(actions, action{"CONNECT", itemPath + "/{path:*}", proxyParams, namer, false}, isConnecter && connectSubpath)

		// list or post across namespace.
		// For ex: LIST all pods in all namespaces by sending a LIST request at /api/apiVersion/pods.
		// TODO: more strongly type whether a resource allows these actions on "all namespaces" (bulk delete)
		if !isSubresource {
			actions = appendIf(actions, action{"LIST", resource, params, namer, true}, isLister)
			actions = appendIf(actions, action{"WATCHLIST", "watch/" + resource, params, namer, true}, allowWatchList)
		}
		break
	}

        ......
	for _, action := range actions {
		producedObject := storageMeta.ProducesObject(action.Verb)
		if producedObject == nil {
			producedObject = defaultVersionedObject
		}
		reqScope.Namer = action.Namer

		requestScope := "cluster"
		var namespaced string
		var operationSuffix string
		if apiResource.Namespaced {
			requestScope = "namespace"
			namespaced = "Namespaced"
		}
		if strings.HasSuffix(action.Path, "/{path:*}") {
			requestScope = "resource"
			operationSuffix = operationSuffix + "WithPath"
		}
		if action.AllNamespaces {
			requestScope = "cluster"
			operationSuffix = operationSuffix + "ForAllNamespaces"
			namespaced = ""
		}

		if kubeVerb, found := toDiscoveryKubeVerb[action.Verb]; found {
			if len(kubeVerb) != 0 {
				kubeVerbs[kubeVerb] = struct{}{}
			}
		} else {
			return nil, fmt.Errorf("unknown action verb for discovery: %s", action.Verb)
		}

		routes := []*restful.RouteBuilder{}

		// If there is a subresource, kind should be the parent's kind.
		if isSubresource {
			parentStorage, ok := a.group.Storage[resource]
			if !ok {
				return nil, fmt.Errorf("missing parent storage: %q", resource)
			}

			fqParentKind, err := a.getResourceKind(resource, parentStorage)
			if err != nil {
				return nil, err
			}
			kind = fqParentKind.Kind
		}

		verbOverrider, needOverride := storage.(StorageMetricsOverride)

		switch action.Verb {
		case "GET": // Get a resource.
			var handler restful.RouteFunction //生成handler,用的是上面的getter(=storage.(rest.Getter))
			if isGetterWithOptions {
				handler = restfulGetResourceWithOptions(getterWithOptions, reqScope, isSubresource)
			} else {
				handler = restfulGetResource(getter, exporter, reqScope)
			}

			if needOverride {
				// need change the reported verb
				handler = metrics.InstrumentRouteFunc(verbOverrider.OverrideMetricsVerb(action.Verb), resource, subresource, requestScope, handler)
			} else {
				handler = metrics.InstrumentRouteFunc(action.Verb, resource, subresource, requestScope, handler)
			}

			if a.enableAPIResponseCompression {
				handler = genericfilters.RestfulWithCompression(handler)
			}
			doc := "read the specified " + kind
			if isSubresource {
				doc = "read " + subresource + " of the specified " + kind
			}
			route := ws.GET(action.Path).To(handler). //绑定path、handler到web service
				Doc(doc).
				Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")).
				Operation("read"+namespaced+kind+strings.Title(subresource)+operationSuffix).
				Produces(append(storageMeta.ProducesMIMETypes(action.Verb), mediaTypes...)...).
				Returns(http.StatusOK, "OK", producedObject).
				Writes(producedObject)
			if isGetterWithOptions {
				if err := addObjectParams(ws, route, versionedGetOptions); err != nil {
					return nil, err
				}
			}
			if isExporter {
				if err := addObjectParams(ws, route, versionedExportOptions); err != nil {
					return nil, err
				}
			}
			addParams(route, action.Params)
			routes = append(routes, route)
		......
	return &apiResource, nil
}

下面分析handler具体是什么方法?以restfulGetResource为例:

func restfulGetResource(r rest.Getter, e rest.Exporter, scope handlers.RequestScope) restful.RouteFunction {
	return func(req *restful.Request, res *restful.Response) {
		handlers.GetResource(r, e, scope)(res.ResponseWriter, req.Request)
	} //return的这个函数就是被注册的handler(根据两个入参也可确定),
          //里面又调用了其他函数handlers.GetResource,它返回一个函数,并被调用,参数是(res.ResponseWriter, req.Request)
          //改成这种写法可能更容易看明白:
          //    myGetHandler := handlers.GetResource(r, e, scope)
          //    myGetHandler(res.ResponseWriter, req.Request) //这一步是真正处理Get请求的函数
}
// GetResource returns a function that handles retrieving a single resource from a rest.Storage object.
// 看上面这行注释,GetResource返回的函数就是处理GET请求的具体执行函数,也即上面说的myGetHandler
func GetResource(r rest.Getter, e rest.Exporter, scope RequestScope) http.HandlerFunc {
	return getResourceHandler(scope, // 注意这里函数套函数,下面的匿名函数是作为参数传递给getResourceHandler的,
                                         // 最先执行的是getResourceHandler,里面会调用下面这个匿名函数
		func(ctx context.Context, name string, req *http.Request, trace *utiltrace.Trace) (runtime.Object, error) {
			// check for export
			options := metav1.GetOptions{}
			if values := req.URL.Query(); len(values) > 0 {
				exports := metav1.ExportOptions{}
				if err := metainternalversion.ParameterCodec.DecodeParameters(values, scope.MetaGroupVersion, &exports); err != nil {
					err = errors.NewBadRequest(err.Error())
					return nil, err
				}
				if exports.Export {
					if e == nil {
						return nil, errors.NewBadRequest(fmt.Sprintf("export of %q is not supported", scope.Resource.Resource))
					}
					return e.Export(ctx, name, exports)
				}
				if err := metainternalversion.ParameterCodec.DecodeParameters(values, scope.MetaGroupVersion, &options); err != nil {
					err = errors.NewBadRequest(err.Error())
					return nil, err
				}
			}
			if trace != nil {
				trace.Step("About to Get from storage")
			}
                        // 这一步是最终调用的关键方法,会真正转到资源对象的Get请求处理方法
			return r.Get(ctx, name, &options)
		})
}
// getResourceHandler is an HTTP handler function for get requests. It delegates to the
// passed-in getterFunc to perform the actual get.
func getResourceHandler(scope RequestScope, getter getterFunc) http.HandlerFunc {
	return func(w http.ResponseWriter, req *http.Request) {
		trace := utiltrace.New("Get " + req.URL.Path)
		defer trace.LogIfLong(500 * time.Millisecond)

		namespace, name, err := scope.Namer.Name(req)
		if err != nil {
			scope.err(err, w, req)
			return
		}
		ctx := req.Context()
		ctx = request.WithNamespace(ctx, namespace)
                // 这里调用上面提到的作为参数传入的匿名函数,获取请求处理结果
		result, err := getter(ctx, name, req, trace)
		if err != nil {
			scope.err(err, w, req)
			return
		}
		requestInfo, ok := request.RequestInfoFrom(ctx)
		if !ok {
			scope.err(fmt.Errorf("missing requestInfo"), w, req)
			return
		}
		if err := setSelfLink(result, requestInfo, scope.Namer); err != nil {
			scope.err(err, w, req)
			return
		}

		trace.Step("About to write a response")
		transformResponseObject(ctx, scope, req, w, http.StatusOK, result)
	}
}

也即r.Get是最终的实际请求处理函数,它是怎么来的?

r就是getter也就是storage.(rest.Getter),上面提到过参数storage是restStorageMap的value,path是key,针对我们分析的API对应”nodes”: nodeStorage.Node(storage == nodeStorage.Node)。rest.Getter是一个go-restful里面定义的接口,接口也是一种类型,根据Go语言的接口类型定义,接口即约定,可以在任何地方(包)为任何数据类型(int、string、struct等等)实现接口(也即实现接口约定的具有指定参数类型和返回类型的函数),我们看下这个接口的定义:

// Getter is an object that can retrieve a named RESTful resource.
type Getter interface {
	// Get finds a resource in the storage by name and returns it.
	// Although it can return an arbitrary error value, IsNotFound(err) is true for the
	// returned error value err when the specified resource is not found.
	Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error)
}

再看下storage也即nodeStorage.Node有没有实现它,很郁闷,没找到,但是不要灰心,想想看,struct是可以嵌套的(类似父子类的继承关系),并且支持匿名成员,使用匿名成员可以省略中间struct的名称(尤其是匿名成员根本没有嵌套的中间struct变量名可用)(如果没印象了,可以翻看一下《Go程序设计语言》),看看Node的嵌套struct里面有没有实现接口,

// NewStorage returns a NodeStorage object that will work against nodes.
func NewStorage(optsGetter generic.RESTOptionsGetter, kubeletClientConfig client.KubeletClientConfig, proxyTransport http.RoundTripper) (*NodeStorage, error) {
	store := &genericregistry.Store{ // 注意这个被嵌套的struct是关键
        ......
	// Set up REST handlers
	nodeREST := &REST{Store: store, proxyTransport: proxyTransport} // 在这个struct和它嵌套的struct里面找Get接口实现
        ......
	return &NodeStorage{
		Node:   nodeREST,  //注意这个变量,是一个嵌套struct
		Status: statusREST,
		Proxy:  proxyREST,
		KubeletConnectionInfo: connectionInfoGetter,
	}, nil
}

对,最终我们在genericregistry.Store这个struct定义的包文件里面找到了它的Get接口实现方法:

// Get retrieves the item from storage.
func (e *Store) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) {
	obj := e.NewFunc()
	key, err := e.KeyFunc(ctx, name)
	if err != nil {
		return nil, err
	}
        // 又是一个接口,总之你发送Get请求给apiserver就会在这里被处理
	if err := e.Storage.Get(ctx, key, options.ResourceVersion, obj, false); err != nil {
		return nil, storeerr.InterpretGetError(err, e.qualifiedResourceFromContext(ctx), name)
	}
	if e.Decorator != nil {
		if err := e.Decorator(obj); err != nil {
			return nil, err
		}
	}
	return obj, nil
}

下面继续找e.Storage.Get,其实根据接口的定义,我已经找到了真正的后端实现了,k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/storage/etcd3.(*store).Get,为啥是etcd3?需要分析etcd存储后端(k8s元数据存储后端)类型注册(默认是etcd3后端)过程:

=>  30: func Create(c storagebackend.Config) (storage.Interface, DestroyFunc, error) {
    31:         switch c.Type {
    32:         case storagebackend.StorageTypeETCD2:
    33:                 return newETCD2Storage(c)
    34:         case storagebackend.StorageTypeUnset, storagebackend.StorageTypeETCD3:
    35:                 // TODO: We have the following features to implement:
(dlv) bt
0 0x0000000001af76eb in k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/storage/storagebackend/factory.Create
  at /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/storage/storagebackend/factory/factory.go:30
1 0x0000000001af8614 in k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic.NewRawStorage
  at /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/storage_decorator.go:55
2 0x0000000001d26fc7 in k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry.StorageWithCacher.func1
  at /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry/storage_factory.go:43
3 0x0000000001d2591d in k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry.(*Store).CompleteWithOptions
  at /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry/store.go:1362
// 注意create会每个path都进来,需要区分path类型,如这里是node
4 0x0000000003eb072d in k8s.io/kubernetes/pkg/registry/core/node/storage.NewStorage
  at /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/pkg/registry/core/node/storage/storage.go:91
5 0x0000000003f13f77 in k8s.io/kubernetes/pkg/registry/core/rest.LegacyRESTStorageProvider.NewLegacyRESTStorage
  at /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/pkg/registry/core/rest/storage_core.go:130
6 0x000000000408ccc6 in k8s.io/kubernetes/pkg/master.(*Master).InstallLegacyAPI
  at /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/pkg/master/master.go:369
7 0x000000000408c263 in k8s.io/kubernetes/pkg/master.completedConfig.New
  at /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/pkg/master/master.go:328
8 0x00000000040a2eae in k8s.io/kubernetes/cmd/kube-apiserver/app.CreateKubeAPIServer
  at /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/cmd/kube-apiserver/app/server.go:218
9 0x00000000040a2681 in k8s.io/kubernetes/cmd/kube-apiserver/app.CreateServerChain
  at /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/cmd/kube-apiserver/app/server.go:168
10 0x00000000040a1e51 in k8s.io/kubernetes/cmd/kube-apiserver/app.Run
  at /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/cmd/kube-apiserver/app/server.go:137(dlv) p c.Type
""  // 对应storagebackend.StorageTypeUnset,选择默认的etcd3后端

fff但是怎么反推回来handler?也就是e.Storage.Get,最简单的办法当然是加断点调试,调试结果在下面的请求处理部分有贴出来,这里就不贴了。但是看了之后还是有疑问,怎么从e.Storage.Get走到etcd3的Get方法的?

(dlv) bt
 0  0x0000000001ae5358 in k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/storage/etcd3.newStore
    at /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/storage/etcd3/store.go:99
 1  0x0000000001ae521d in k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/storage/etcd3.New
    at /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/storage/etcd3/store.go:90
 2  0x0000000001af7541 in k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/storage/storagebackend/factory.newETCD3Storage
    at /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go:78
 3  0x0000000001af77d7 in k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/storage/storagebackend/factory.Create
    at /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/storage/storagebackend/factory/factory.go:39
 // 注意这里,下面分析从Get方法cacher.go到etcd3.go有用到
 4  0x0000000001af8614 in k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic.NewRawStorage
    at /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/storage_decorator.go:55
 5  0x0000000001d26fc7 in k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry.StorageWithCacher.func1
    at /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry/storage_factory.go:43
 6  0x0000000001d2591d in k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry.(*Store).CompleteWithOptions
    at /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry/store.go:1362
 7  0x0000000003f0f875 in k8s.io/kubernetes/pkg/registry/core/serviceaccount/storage.NewREST
    at /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/pkg/registry/core/serviceaccount/storage/storage.go:52
 8  0x0000000003f16aa3 in k8s.io/kubernetes/pkg/registry/core/rest.LegacyRESTStorageProvider.NewLegacyRESTStorage
    at /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/pkg/registry/core/rest/storage_core.go:146
 9  0x000000000408ccc6 in k8s.io/kubernetes/pkg/master.(*Master).InstallLegacyAPI
    at /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/pkg/master/master.go:369
10  0x000000000408c263 in k8s.io/kubernetes/pkg/master.completedConfig.New
    at /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/pkg/master/master.go:328
> k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry.(*Store).CompleteWithOptions() /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry/store.go:1292 (hits goroutine(1):2 total:2) (PC: 0x1d25303)
1287: GetAttrs: attrFunc,
1288: }
1289: }
1290: }
1291: // 关键在下面这行,它里面创建了opts,也就是后面用到的e.Storage的来源
=>1292: opts, err := options.RESTOptions.GetRESTOptions(e.DefaultQualifiedResource)
1293: if err != nil {
1294: return err
1295: }
1296:
1297: // ResourcePrefix must come from the underlying factory
(dlv) whatis options.RESTOptions // 这里没搞明白,怎么就变成了*storageFactoryRestOptionsFactory类型?
k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic.RESTOptionsGetter
Concrete type: *k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/server/options.storageFactoryRestOptionsFactory

(dlv) 
> k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry.(*Store).CompleteWithOptions() /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry/store.go:1361 (PC: 0x1d25808)
1356: }
1357: return accessor.GetName(), nil
1358: }
1359: }
1360:
=>1361: if e.Storage == nil {
1362: e.Storage, e.DestroyFunc = opts.Decorator( // 用opts.Decorator()生成e.Storage
1363: opts.StorageConfig,
1364: e.NewFunc(),
1365: prefix,
1366: keyFunc,
// 没搞明白为啥调用这个接口实现?GetRESTOptions接口有很多实现,为啥走这一个?关键还是上面的类型没搞明白
// 这部分没理解应该是因为Go语言没学透彻,比较现学现卖
func (f *storageFactoryRestOptionsFactory) GetRESTOptions(resource schema.GroupResource) (generic.RESTOptions, error) {
	storageConfig, err := f.StorageFactory.NewConfig(resource)
	if err != nil {
		return generic.RESTOptions{}, fmt.Errorf("unable to find storage destination for %v, due to %v", resource, err.Error())
	}

	ret := generic.RESTOptions{
		StorageConfig:           storageConfig,
		Decorator:               generic.UndecoratedStorage,
		DeleteCollectionWorkers: f.Options.DeleteCollectionWorkers,
		EnableGarbageCollection: f.Options.EnableGarbageCollection,
		ResourcePrefix:          f.StorageFactory.ResourcePrefix(resource),
		CountMetricPollPeriod:   f.Options.StorageConfig.CountMetricPollPeriod,
	}
	if f.Options.EnableWatchCache {
		sizes, err := ParseWatchCacheSizes(f.Options.WatchCacheSizes)
		if err != nil {
			return generic.RESTOptions{}, err
		}
		cacheSize, ok := sizes[resource]
		if !ok {
			cacheSize = f.Options.DefaultWatchCacheSize
		}
                // 找到了opts.Decorator的来源
		ret.Decorator = genericregistry.StorageWithCacher(cacheSize)
	}

	return ret, nil
}
// Creates a cacher based given storageConfig.
func StorageWithCacher(capacity int) generic.StorageDecorator {
	return func( // 这个匿名函数就是生成e.Storage的opts.Decorator
		storageConfig *storagebackend.Config,
		objectType runtime.Object,
		resourcePrefix string,
		keyFunc func(obj runtime.Object) (string, error),
		newListFunc func() runtime.Object,
		getAttrsFunc storage.AttrFunc,
		triggerFunc storage.TriggerPublisherFunc) (storage.Interface, factory.DestroyFunc) {
                // 注意这行,下面有用到s
		s, d := generic.NewRawStorage(storageConfig)
		if capacity == 0 {
			glog.V(5).Infof("Storage caching is disabled for %T", objectType)
			return s, d
		}
		glog.V(5).Infof("Storage caching is enabled for %T with capacity %v", objectType, capacity)

		// TODO: we would change this later to make storage always have cacher and hide low level KV layer inside.
		// Currently it has two layers of same storage interface -- cacher and low level kv.
		cacherConfig := storage.CacherConfig{
			CacheCapacity:        capacity,
			Storage:              s,  // 下面会用到它
			Versioner:            etcdstorage.APIObjectVersioner{},
			Type:                 objectType,
			ResourcePrefix:       resourcePrefix,
			KeyFunc:              keyFunc,
			NewListFunc:          newListFunc,
			GetAttrsFunc:         getAttrsFunc,
			TriggerPublisherFunc: triggerFunc,
			Codec:                storageConfig.Codec,
		}
                // 这个cacher就是e.Storage,类型是storage.Interface
                // 这部分也没太理解,这里生成的是一个结构体指针,为啥外面变成了storage.Interface?
                // 应该还是Go语言没学明白,大致理解是结构体实现了这几个接口,所以可以互相转换
		cacher := storage.NewCacherFromConfig(cacherConfig)
		destroyFunc := func() {
			cacher.Stop()
			d()
		}

		// TODO : Remove RegisterStorageCleanup below when PR
		// https://github.com/kubernetes/kubernetes/pull/50690
		// merges as that shuts down storage properly
		RegisterStorageCleanup(destroyFunc)

		return cacher, destroyFunc
	}
}

NewCacherFromConfig返回的Cache结构体指针,结构体实现了storage.Interface定义的各个接口,

// Create a new Cacher responsible for servicing WATCH and LIST requests from
// its internal cache and updating its cache in the background based on the
// given configuration.
func NewCacherFromConfig(config CacherConfig) *Cacher {
	watchCache := newWatchCache(config.CacheCapacity, config.KeyFunc, config.GetAttrsFunc)
	listerWatcher := newCacherListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc)
	reflectorName := "storage/cacher.go:" + config.ResourcePrefix

	// Give this error when it is constructed rather than when you get the
	// first watch item, because it's much easier to track down that way.
	if obj, ok := config.Type.(runtime.Object); ok {
		if err := runtime.CheckCodec(config.Codec, obj); err != nil {
			panic("storage codec doesn't seem to match given type: " + err.Error())
		}
	}

	stopCh := make(chan struct{})
	cacher := &Cacher{
		ready:       newReady(),
		storage:     config.Storage, // 注意这个成员,下面用到

所以e.Storage.Get就是k8s.io/apiserver/pkg/storage/cacher.go:Get方法,它里面又调用了etcd3后端的Get方法:

// Implements storage.Interface.
func (c *Cacher) Get(ctx context.Context, key string, resourceVersion string, objPtr runtime.Object, ignoreNotFound bool) error {
	if resourceVersion == "" {
		// If resourceVersion is not specified, serve it from underlying
		// storage (for backward compatibility).
                // c.storage是啥上面有提到
		return c.storage.Get(ctx, key, resourceVersion, objPtr, ignoreNotFound)
	}

c.storage==config.Storage由NewRawStorage生成,根据上面的调试结果可以确定它最终调用到k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/storage/etcd3.newStore,所以c.storage.Get调用的就是:

// Get implements storage.Interface.Get.
func (s *store) Get(ctx context.Context, key string, resourceVersion string, out runtime.Object, ignoreNotFound bool) error {
	key = path.Join(s.pathPrefix, key) // key是/registry/
        // 通过etcd client根据key从etcd后端获取数据value
	getResp, err := s.client.KV.Get(ctx, key, s.getOps...)
	if err != nil {
		return err
	}

	if len(getResp.Kvs) == 0 {
		if ignoreNotFound {
			return runtime.SetZeroValue(out)
		}
		return storage.NewKeyNotFoundError(key, 0)
	}
	kv := getResp.Kvs[0]

	data, _, err := s.transformer.TransformFromStorage(kv.Value, authenticatedDataString(key))
	if err != nil {
		return storage.NewInternalError(err.Error())
	}

	return decode(s.codec, s.versioner, data, out, kv.ModRevision)
}

这个Get就是最终处理用户发送的Get API请求的位置。

> k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/storage/etcd3.(*store).Get() /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/storage/etcd3/store.go:127 (hits goroutine(2632):1 total:12) (PC: 0x1ae584b)
   122: func (s *store) Versioner() storage.Versioner {
   123:         return s.versioner
   124: }
   125:
   126: // Get implements storage.Interface.Get.
=> 127: func (s *store) Get(ctx context.Context, key string, resourceVersion string, out runtime.Object, ignoreNotFound bool) error {
   128:         key = path.Join(s.pathPrefix, key)
   129:         getResp, err := s.client.KV.Get(ctx, key, s.getOps...)
   130:         if err != nil {
   131:                 return err
   132:         }
(dlv) p key
"/minions/10.0.90.22"
(dlv) p s.pathPrefix
"/registry"
......
(dlv) n
> k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/storage/etcd3.(*store).Get() /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/storage/etcd3/store.go:129 (PC: 0x1ae5977)
   124: }
   125:
   126: // Get implements storage.Interface.Get.
   127: func (s *store) Get(ctx context.Context, key string, resourceVersion string, out runtime.Object, ignoreNotFound bool) error {
   128:         key = path.Join(s.pathPrefix, key)
=> 129:         getResp, err := s.client.KV.Get(ctx, key, s.getOps...)
   130:         if err != nil {
   131:                 return err
   132:         }
   133:
   134:         if len(getResp.Kvs) == 0 {
(dlv) p key    // node信息在etcd数据库里面的key
"/registry/minions/10.0.90.22"

 

请求处理(HTTP RestFul request to {handler})

这部分就不多说了,直接看调用栈吧:

(dlv) bt
0 0x0000000001ae584b in k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/storage/etcd3.(*store).Get
at /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/storage/etcd3/store.go:127
1 0x00000000016cbcdc in k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/storage.(*Cacher).Get
at /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/storage/cacher.go:357
2 0x0000000001d1eecb in k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry.(*Store).Get
at /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/registry/generic/registry/store.go:674
3 0x0000000001c9b304 in k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/endpoints/handlers.GetResource.func1
at /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/endpoints/handlers/get.go:106
4 0x0000000001c9a6f9 in k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/endpoints/handlers.getResourceHandler.func1
at /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/endpoints/handlers/get.go:60
5 0x0000000001cc290b in k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/endpoints.restfulGetResource.func1
at /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/endpoints/installer.go:1058
6 0x00000000016dfc55 in k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/endpoints/metrics.InstrumentRouteFunc.func1
at /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/endpoints/metrics/metrics.go:199
7 0x0000000001511e53 in k8s.io/kubernetes/vendor/github.com/emicklei/go-restful.(*Container).dispatch
at /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/vendor/github.com/emicklei/go-restful/container.go:277
8 0x0000000001510e35 in k8s.io/kubernetes/vendor/github.com/emicklei/go-restful.(*Container).Dispatch
at /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/vendor/github.com/emicklei/go-restful/container.go:199
9 0x0000000001d022de in k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/server.director.ServeHTTP
at /root/k8s/kubernetes/_output/local/go/src/k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/server/handler.go:152
10 0x0000000001d09ea2 in k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/server.(*director).ServeHTTP
at /root/k8s/kubernetes/<autogenerated>:1

 

未解决的问题

  • 上面代码分析过程中没搞清楚的两个Go语言知识点,都是关于接口的
  • container到http server,这个应该不复杂,不在本次代码分析的目标之内,就没关注
  • 各种filter注册和使用,同上,没关注
  • etcd部署及使用,etcd在k8s中的使用,这部分属于扩展知识,有时间再看下

跟OpenStack API处理流程的比较

最大的差异当然还是语言上的,python和Go还是有点不太一样的,所以用到的RestFul HTTP Server的框架也不一样,route定义和注册流程也差别比较大,当然还是习惯问题,如果整天看这些代码,用这些框架,也就不会有刚开始看代码时很强烈的差异感了。犹记得当年刚开始看OpenStack nova-api route注册转发流程也是一脸懵逼好久好久。。。

其他方面就是交互流程不一样,当然Get方法差不多,都是接收用户API请求,然后分发到具体的处理方法(route到controller或handler),之后controller或handler从后端数据库(MySQL或etcd)查询用户请求的数据,最后封装返回给用户,再由controller或handler获取请求数据之前还会对用户请求进行多次filter过程,把不合法或不合适的请求过滤掉,只允许合法合适(如未被限流)的请求被controller或handler处理并正常返回用户。

而Create方法,则差异比较大,OpenStack一般是用消息队列进行消息传递,从API服务把具体执行的动作RPC到其他服务(如调度服务、计算节点管理服务等),动作执行过程中或者执行完毕也会通过RPC更新操作状态到数据库(当然最开始是直接访问数据库,后面为了安全改为经过RPC)。而k8s则是完全通过etcd来完成各个组件之间的异步交互,通过watch各自关系的key来实现消息传递和异步调用,操作状态更新也是通过更新etcd来实现。