Introduction to client-go

Go

2024-07-09

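Kubernetes is a familiar cluster management platform, and we usually operate on the pods and services inside a cluster through a web UI. To manipulate pods from program code, or to read the status of particular containers, you can use the official Kubernetes API. Because the parameters of that API are complex, we usually go through the client-go library rather than calling the endpoints directly.

This article walks through how client-go is used and how it is implemented. It quotes a lot of source code inline instead of linking to it, so that the excerpts stay stable when the upstream source changes.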

0. Kubernetes API

Documentation

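1. Introduction to client-go

1.1 What client-go is

client-go is a client for the Kubernetes resource API. It lets a program create, delete, update and query the resource objects in a cluster, including Deployment, Service, Ingress, StatefulSet, Pod, Namespace and Node. Most secondary development that wraps the Kubernetes API is built on this package.

client-go official documentation

1.2 Example code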

func GetPods(clientSet *kubernetes.Clientset) {
	for {
		pods, err := clientSet.CoreV1().Pods("").List(context.Background(), metav1.ListOptions{})
		if err != nil {
			panic(err.Error())
		}
		log.Info().Msgf("There are %d pods in the cluster\n", len(pods.Items))
		time.Sleep(10 * time.Second)
	}
}

1.3 Output

=== RUN   TestGetPods
{"level":"info","time":"2024-07-05T15:57:22+08:00","message":"There are 4963 pods in the cluster\n"}
{"level":"info","time":"2024-07-05T15:57:33+08:00","message":"There are 4963 pods in the cluster\n"}
{"level":"info","time":"2024-07-05T15:57:45+08:00","message":"There are 4958 pods in the cluster\n"}
{"level":"info","time":"2024-07-05T15:57:57+08:00","message":"There are 4950 pods in the cluster\n"}
{"level":"info","time":"2024-07-05T15:58:08+08:00","message":"There are 4950 pods in the cluster\n"}
{"level":"info","time":"2024-07-05T15:58:20+08:00","message":"There are 4950 pods in the cluster\n"}
{"level":"info","time":"2024-07-05T15:58:32+08:00","message":"There are 4946 pods in the cluster\n"}

2. client-go source code analysis

client-go source code

client-go source directory layout

  • The kubernetes package contains the clientset to access Kubernetes API.
  • The discovery package is used to discover APIs supported by a Kubernetes API server.
  • The dynamic package contains a dynamic client that can perform generic operations on arbitrary Kubernetes API objects.
  • The plugin/pkg/client/auth packages contain optional authentication plugins for obtaining credentials from external sources.
  • The transport package is used to set up auth and start a connection.
  • The tools/cache package is useful for writing controllers.

2.1 kubeconfig

kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file")

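This flag holds the absolute path of the Kubernetes configuration file, kubeconfig. The usual location is $HOME/.kube/config. The file describes the Kubernetes cluster that the local machine connects to.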
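The flag definition above uses a variable named home that the snippet does not define. A minimal sketch of one way to resolve it with the standard library is shown here; the helper name defaultKubeconfigPath is hypothetical, and using os.UserHomeDir is an assumption about how home is obtained rather than what the original program does.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// defaultKubeconfigPath returns <home>/.kube/config for the given home directory.
// The function name is illustrative and not part of client-go.
func defaultKubeconfigPath(home string) string {
	return filepath.Join(home, ".kube", "config")
}

func main() {
	// os.UserHomeDir reads $HOME on Unix-like systems.
	home, err := os.UserHomeDir()
	if err != nil {
		fmt.Println("cannot determine home directory:", err)
		return
	}
	fmt.Println(defaultKubeconfigPath(home))
}
```

The resulting string can be passed as the default value of the kubeconfig flag.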

Example config contents:

apiVersion: v1
clusters:
- cluster:
    server: http://<kube-master-ip>:8080
  name: k8s
contexts:
- context:
    cluster: k8s
    namespace: default
    user: ""
  name: default
current-context: default
kind: Config
preferences: {}
users: []

2.2 rest.Config

The rest.Config object is obtained by calling BuildConfigFromFlags with either the master URL or the kubeconfig path. In most cases the kubeconfig path is the argument that is supplied:

config,err := clientcmd.BuildConfigFromFlags("",*kubeconfig)

Source of BuildConfigFromFlags

// BuildConfigFromFlags is a helper function that builds configs from a master
// url or a kubeconfig filepath. These are passed in as command line flags for cluster
// components. Warnings should reflect this usage. If neither masterUrl or kubeconfigPath
// are passed in we fallback to inClusterConfig. If inClusterConfig fails, we fallback
// to the default config.
func BuildConfigFromFlags(masterUrl, kubeconfigPath string) (*restclient.Config, error) {
	if kubeconfigPath == "" && masterUrl == "" {
		klog.Warning("Neither --kubeconfig nor --master was specified.  Using the inClusterConfig.  This might not work.")
		kubeconfig, err := restclient.InClusterConfig()
		if err == nil {
			return kubeconfig, nil
		}
		klog.Warning("error creating inClusterConfig, falling back to default config: ", err)
	}
	return NewNonInteractiveDeferredLoadingClientConfig(
		&ClientConfigLoadingRules{ExplicitPath: kubeconfigPath},
		&ConfigOverrides{ClusterInfo: clientcmdapi.Cluster{Server: masterUrl}}).ClientConfig()
}
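The branching in BuildConfigFromFlags can be summarized as a small decision function. This is a simplified model written for illustration only: configSource and the returned labels are hypothetical, and the inClusterOK parameter stands in for whether restclient.InClusterConfig() succeeds.

```go
package main

import "fmt"

// configSource reports which path BuildConfigFromFlags would take.
// It models the control flow only and does not load any configuration.
func configSource(masterURL, kubeconfigPath string, inClusterOK bool) string {
	if kubeconfigPath == "" && masterURL == "" {
		if inClusterOK {
			// Neither flag was given and the in-cluster config loaded.
			return "in-cluster"
		}
		// In-cluster loading failed: fall through to the deferred loader.
	}
	// Explicit kubeconfig path and/or master URL override, or the defaults.
	return "deferred-loader"
}

func main() {
	fmt.Println(configSource("", "", true))
	fmt.Println(configSource("", "", false))
	fmt.Println(configSource("", "/home/alice/.kube/config", true))
}
```

Note that the in-cluster config is only tried when both arguments are empty; passing a kubeconfig path skips it entirely.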

2.3 clientset

The clientset object is obtained by passing the *rest.Config to NewForConfig. A clientset is a collection of clients, one for each API group and version.

clientset, err := kubernetes.NewForConfig(config)

2.3.1 NewForConfig

NewForConfig initializes every client in the clientset.

Source of NewForConfig

// NewForConfig creates a new Clientset for the given config.
// If config's RateLimiter is not set and QPS and Burst are acceptable,
// NewForConfig will generate a rate-limiter in configShallowCopy.
// NewForConfig is equivalent to NewForConfigAndClient(c, httpClient),
// where httpClient was generated with rest.HTTPClientFor(c).
func NewForConfig(c *rest.Config) (*Clientset, error) {
	configShallowCopy := *c

	if configShallowCopy.UserAgent == "" {
		configShallowCopy.UserAgent = rest.DefaultKubernetesUserAgent()
	}

	// share the transport between all clients
	httpClient, err := rest.HTTPClientFor(&configShallowCopy)
	if err != nil {
		return nil, err
	}

	return NewForConfigAndClient(&configShallowCopy, httpClient)
}
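NewForConfig starts with configShallowCopy := *c so that the defaults it fills in do not leak back into the caller's config. The sketch below shows that effect with a hypothetical one-field Config type and a made-up default user agent; it is not the real rest.Config.

```go
package main

import "fmt"

// Config is a hypothetical stand-in for rest.Config with a single field.
type Config struct {
	UserAgent string
}

// withDefaults copies the caller's config and applies defaults to the copy only.
func withDefaults(c *Config) Config {
	shallow := *c // value copy: writes to shallow do not touch *c
	if shallow.UserAgent == "" {
		shallow.UserAgent = "example-agent/0.0" // illustrative default
	}
	return shallow
}

func main() {
	orig := &Config{}
	cp := withDefaults(orig)
	fmt.Printf("original=%q copy=%q\n", orig.UserAgent, cp.UserAgent)
}
```

Because the copy is shallow, any pointer, slice or map fields would still be shared between the original and the copy; only top-level scalar fields such as the user agent are isolated.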

2.3.2 The Clientset struct

Source

// Clientset contains the clients for groups.
type Clientset struct {
	*discovery.DiscoveryClient
	admissionregistrationV1       *admissionregistrationv1.AdmissionregistrationV1Client
	admissionregistrationV1alpha1 *admissionregistrationv1alpha1.AdmissionregistrationV1alpha1Client
	admissionregistrationV1beta1  *admissionregistrationv1beta1.AdmissionregistrationV1beta1Client
	internalV1alpha1              *internalv1alpha1.InternalV1alpha1Client
	appsV1                        *appsv1.AppsV1Client
	appsV1beta1                   *appsv1beta1.AppsV1beta1Client
	appsV1beta2                   *appsv1beta2.AppsV1beta2Client
	authenticationV1              *authenticationv1.AuthenticationV1Client
	authenticationV1alpha1        *authenticationv1alpha1.AuthenticationV1alpha1Client
	authenticationV1beta1         *authenticationv1beta1.AuthenticationV1beta1Client
	authorizationV1               *authorizationv1.AuthorizationV1Client
	authorizationV1beta1          *authorizationv1beta1.AuthorizationV1beta1Client
	autoscalingV1                 *autoscalingv1.AutoscalingV1Client
	autoscalingV2                 *autoscalingv2.AutoscalingV2Client
	autoscalingV2beta1            *autoscalingv2beta1.AutoscalingV2beta1Client
	autoscalingV2beta2            *autoscalingv2beta2.AutoscalingV2beta2Client
	batchV1                       *batchv1.BatchV1Client
	batchV1beta1                  *batchv1beta1.BatchV1beta1Client
	certificatesV1                *certificatesv1.CertificatesV1Client
	certificatesV1beta1           *certificatesv1beta1.CertificatesV1beta1Client
	certificatesV1alpha1          *certificatesv1alpha1.CertificatesV1alpha1Client
	coordinationV1beta1           *coordinationv1beta1.CoordinationV1beta1Client
	coordinationV1                *coordinationv1.CoordinationV1Client
	coreV1                        *corev1.CoreV1Client
	discoveryV1                   *discoveryv1.DiscoveryV1Client
	discoveryV1beta1              *discoveryv1beta1.DiscoveryV1beta1Client
	eventsV1                      *eventsv1.EventsV1Client
	eventsV1beta1                 *eventsv1beta1.EventsV1beta1Client
	extensionsV1beta1             *extensionsv1beta1.ExtensionsV1beta1Client
	flowcontrolV1alpha1           *flowcontrolv1alpha1.FlowcontrolV1alpha1Client
	flowcontrolV1beta1            *flowcontrolv1beta1.FlowcontrolV1beta1Client
	flowcontrolV1beta2            *flowcontrolv1beta2.FlowcontrolV1beta2Client
	flowcontrolV1beta3            *flowcontrolv1beta3.FlowcontrolV1beta3Client
	networkingV1                  *networkingv1.NetworkingV1Client
	networkingV1alpha1            *networkingv1alpha1.NetworkingV1alpha1Client
	networkingV1beta1             *networkingv1beta1.NetworkingV1beta1Client
	nodeV1                        *nodev1.NodeV1Client
	nodeV1alpha1                  *nodev1alpha1.NodeV1alpha1Client
	nodeV1beta1                   *nodev1beta1.NodeV1beta1Client
	policyV1                      *policyv1.PolicyV1Client
	policyV1beta1                 *policyv1beta1.PolicyV1beta1Client
	rbacV1                        *rbacv1.RbacV1Client
	rbacV1beta1                   *rbacv1beta1.RbacV1beta1Client
	rbacV1alpha1                  *rbacv1alpha1.RbacV1alpha1Client
	resourceV1alpha2              *resourcev1alpha2.ResourceV1alpha2Client
	schedulingV1alpha1            *schedulingv1alpha1.SchedulingV1alpha1Client
	schedulingV1beta1             *schedulingv1beta1.SchedulingV1beta1Client
	schedulingV1                  *schedulingv1.SchedulingV1Client
	storageV1beta1                *storagev1beta1.StorageV1beta1Client
	storageV1                     *storagev1.StorageV1Client
	storageV1alpha1               *storagev1alpha1.StorageV1alpha1Client
}

2.3.3 clientset.Interface

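Clientset implements the Interface below, so the client for a specific API group and version can be obtained by calling the matching method. For example:

pods, err := clientSet.CoreV1().Pods("").List(context.Background(), metav1.ListOptions{})

The method set of the clientset:

clientset.Interface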

type Interface interface {
	Discovery() discovery.DiscoveryInterface
	AdmissionregistrationV1() admissionregistrationv1.AdmissionregistrationV1Interface
	AdmissionregistrationV1alpha1() admissionregistrationv1alpha1.AdmissionregistrationV1alpha1Interface
	AdmissionregistrationV1beta1() admissionregistrationv1beta1.AdmissionregistrationV1beta1Interface
	InternalV1alpha1() internalv1alpha1.InternalV1alpha1Interface
	AppsV1() appsv1.AppsV1Interface
	AppsV1beta1() appsv1beta1.AppsV1beta1Interface
	AppsV1beta2() appsv1beta2.AppsV1beta2Interface
	AuthenticationV1() authenticationv1.AuthenticationV1Interface
	AuthenticationV1alpha1() authenticationv1alpha1.AuthenticationV1alpha1Interface
	AuthenticationV1beta1() authenticationv1beta1.AuthenticationV1beta1Interface
	AuthorizationV1() authorizationv1.AuthorizationV1Interface
	AuthorizationV1beta1() authorizationv1beta1.AuthorizationV1beta1Interface
	AutoscalingV1() autoscalingv1.AutoscalingV1Interface
	AutoscalingV2() autoscalingv2.AutoscalingV2Interface
	AutoscalingV2beta1() autoscalingv2beta1.AutoscalingV2beta1Interface
	AutoscalingV2beta2() autoscalingv2beta2.AutoscalingV2beta2Interface
	BatchV1() batchv1.BatchV1Interface
	BatchV1beta1() batchv1beta1.BatchV1beta1Interface
	CertificatesV1() certificatesv1.CertificatesV1Interface
	CertificatesV1beta1() certificatesv1beta1.CertificatesV1beta1Interface
	CertificatesV1alpha1() certificatesv1alpha1.CertificatesV1alpha1Interface
	CoordinationV1beta1() coordinationv1beta1.CoordinationV1beta1Interface
	CoordinationV1() coordinationv1.CoordinationV1Interface
	CoreV1() corev1.CoreV1Interface
	DiscoveryV1() discoveryv1.DiscoveryV1Interface
	DiscoveryV1beta1() discoveryv1beta1.DiscoveryV1beta1Interface
	EventsV1() eventsv1.EventsV1Interface
	EventsV1beta1() eventsv1beta1.EventsV1beta1Interface
	ExtensionsV1beta1() extensionsv1beta1.ExtensionsV1beta1Interface
	FlowcontrolV1alpha1() flowcontrolv1alpha1.FlowcontrolV1alpha1Interface
	FlowcontrolV1beta1() flowcontrolv1beta1.FlowcontrolV1beta1Interface
	FlowcontrolV1beta2() flowcontrolv1beta2.FlowcontrolV1beta2Interface
	FlowcontrolV1beta3() flowcontrolv1beta3.FlowcontrolV1beta3Interface
	NetworkingV1() networkingv1.NetworkingV1Interface
	NetworkingV1alpha1() networkingv1alpha1.NetworkingV1alpha1Interface
	NetworkingV1beta1() networkingv1beta1.NetworkingV1beta1Interface
	NodeV1() nodev1.NodeV1Interface
	NodeV1alpha1() nodev1alpha1.NodeV1alpha1Interface
	NodeV1beta1() nodev1beta1.NodeV1beta1Interface
	PolicyV1() policyv1.PolicyV1Interface
	PolicyV1beta1() policyv1beta1.PolicyV1beta1Interface
	RbacV1() rbacv1.RbacV1Interface
	RbacV1beta1() rbacv1beta1.RbacV1beta1Interface
	RbacV1alpha1() rbacv1alpha1.RbacV1alpha1Interface
	ResourceV1alpha2() resourcev1alpha2.ResourceV1alpha2Interface
	SchedulingV1alpha1() schedulingv1alpha1.SchedulingV1alpha1Interface
	SchedulingV1beta1() schedulingv1beta1.SchedulingV1beta1Interface
	SchedulingV1() schedulingv1.SchedulingV1Interface
	StorageV1beta1() storagev1beta1.StorageV1beta1Interface
	StorageV1() storagev1.StorageV1Interface
	StorageV1alpha1() storagev1alpha1.StorageV1alpha1Interface
}

2.4 CoreV1Client

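We take the CoreV1Client in the clientset as the example.

The CoreV1Client is initialized from the rest.Config that was passed in, together with the shared http.Client.

Source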

cs.coreV1, err = corev1.NewForConfigAndClient(&configShallowCopy, httpClient)
if err != nil {
	return nil, err
}

2.4.1 corev1.NewForConfig

Source

// NewForConfig creates a new CoreV1Client for the given config.
// NewForConfig is equivalent to NewForConfigAndClient(c, httpClient),
// where httpClient was generated with rest.HTTPClientFor(c).
func NewForConfig(c *rest.Config) (*CoreV1Client, error) {
	config := *c
	if err := setConfigDefaults(&config); err != nil {
		return nil, err
	}
	httpClient, err := rest.HTTPClientFor(&config)
	if err != nil {
		return nil, err
	}
	return NewForConfigAndClient(&config, httpClient)
}

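corev1.NewForConfig calls rest.HTTPClientFor(&config) to create an *http.Client and hands it to NewForConfigAndClient. CoreV1Client is therefore a thin wrapper around a REST client that sends its requests through that http.Client.

2.4.2 The CoreV1Client struct

Source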

// CoreV1Client is used to interact with features provided by the  group.
type CoreV1Client struct {
	restClient rest.Interface
}

CoreV1Client implements CoreV1Interface, which is how it operates on Kubernetes resource objects:

Source

func (c *CoreV1Client) ComponentStatuses() ComponentStatusInterface {
	return newComponentStatuses(c)
}

func (c *CoreV1Client) ConfigMaps(namespace string) ConfigMapInterface {
	return newConfigMaps(c, namespace)
}

func (c *CoreV1Client) Endpoints(namespace string) EndpointsInterface {
	return newEndpoints(c, namespace)
}

func (c *CoreV1Client) Events(namespace string) EventInterface {
	return newEvents(c, namespace)
}

func (c *CoreV1Client) LimitRanges(namespace string) LimitRangeInterface {
	return newLimitRanges(c, namespace)
}

func (c *CoreV1Client) Namespaces() NamespaceInterface {
	return newNamespaces(c)
}

func (c *CoreV1Client) Nodes() NodeInterface {
	return newNodes(c)
}

func (c *CoreV1Client) PersistentVolumes() PersistentVolumeInterface {
	return newPersistentVolumes(c)
}

func (c *CoreV1Client) PersistentVolumeClaims(namespace string) PersistentVolumeClaimInterface {
	return newPersistentVolumeClaims(c, namespace)
}

func (c *CoreV1Client) Pods(namespace string) PodInterface {
	return newPods(c, namespace)
}

func (c *CoreV1Client) PodTemplates(namespace string) PodTemplateInterface {
	return newPodTemplates(c, namespace)
}

func (c *CoreV1Client) ReplicationControllers(namespace string) ReplicationControllerInterface {
	return newReplicationControllers(c, namespace)
}

func (c *CoreV1Client) ResourceQuotas(namespace string) ResourceQuotaInterface {
	return newResourceQuotas(c, namespace)
}

func (c *CoreV1Client) Secrets(namespace string) SecretInterface {
	return newSecrets(c, namespace)
}

func (c *CoreV1Client) Services(namespace string) ServiceInterface {
	return newServices(c, namespace)
}

func (c *CoreV1Client) ServiceAccounts(namespace string) ServiceAccountInterface {
	return newServiceAccounts(c, namespace)
}

2.4.3 CoreV1Interface

Source

type CoreV1Interface interface {
	RESTClient() rest.Interface
	ComponentStatusesGetter
	ConfigMapsGetter
	EndpointsGetter
	EventsGetter
	LimitRangesGetter
	NamespacesGetter
	NodesGetter
	PersistentVolumesGetter
	PersistentVolumeClaimsGetter
	PodsGetter
	PodTemplatesGetter
	ReplicationControllersGetter
	ResourceQuotasGetter
	SecretsGetter
	ServicesGetter
	ServiceAccountsGetter
}

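CoreV1Interface bundles the getter interfaces for the core/v1 resources. For example, PodsGetter gives access to the create, delete, update and query operations on pods, and ServicesGetter does the same for Service objects.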

2.4.4 PodsGetter

The call from the example is:

pods, err := clientSet.CoreV1().Pods("").List(context.Background(), metav1.ListOptions{})

CoreV1().Pods()

Source

func (c *CoreV1Client) Pods(namespace string) PodInterface {
	return newPods(c, namespace)
}

newPods()

Source

// newPods returns a Pods
func newPods(c *CoreV1Client, namespace string) *pods {
	return &pods{
		client: c.RESTClient(),
		ns:     namespace,
	}
}

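CoreV1().Pods() simply calls newPods(), which creates a pods object. The pods object holds a rest.Interface (the client returned by c.RESTClient()), so every operation ultimately becomes an HTTP call made through the RESTClient.

Source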

// pods implements PodInterface
type pods struct {
	client rest.Interface
	ns     string
}

The pods object implements PodInterface, which defines the create, update, delete, get, list, watch and patch methods for Pod resources.

Source

// PodInterface has methods to work with Pod resources.
type PodInterface interface {
	Create(ctx context.Context, pod *v1.Pod, opts metav1.CreateOptions) (*v1.Pod, error)
	Update(ctx context.Context, pod *v1.Pod, opts metav1.UpdateOptions) (*v1.Pod, error)
	UpdateStatus(ctx context.Context, pod *v1.Pod, opts metav1.UpdateOptions) (*v1.Pod, error)
	Delete(ctx context.Context, name string, opts metav1.DeleteOptions) error
	DeleteCollection(ctx context.Context, opts metav1.DeleteOptions, listOpts metav1.ListOptions) error
	Get(ctx context.Context, name string, opts metav1.GetOptions) (*v1.Pod, error)
	List(ctx context.Context, opts metav1.ListOptions) (*v1.PodList, error)
	Watch(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error)
	Patch(ctx context.Context, name string, pt types.PatchType, data []byte, opts metav1.PatchOptions, subresources ...string) (result *v1.Pod, err error)
	Apply(ctx context.Context, pod *corev1.PodApplyConfiguration, opts metav1.ApplyOptions) (result *v1.Pod, err error)
	ApplyStatus(ctx context.Context, pod *corev1.PodApplyConfiguration, opts metav1.ApplyOptions) (result *v1.Pod, err error)
	UpdateEphemeralContainers(ctx context.Context, podName string, pod *v1.Pod, opts metav1.UpdateOptions) (*v1.Pod, error)

	PodExpansion
}

PodsGetter

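PodsGetter declares a single method, Pods(namespace), which returns a PodInterface. CoreV1Client satisfies PodsGetter through the Pods method shown earlier.

Source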

// PodsGetter has a method to return a PodInterface.
// A group's client should implement this interface.
type PodsGetter interface {
	Pods(namespace string) PodInterface
}
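The getter pattern can be reduced to a few lines. In this sketch the interfaces are trimmed to one method each, and the Describe method, the coreClient type and its behavior are hypothetical; only the shape of PodsGetter (one method returning a namespace-scoped interface) mirrors the source above.

```go
package main

import "fmt"

// PodInterface is a trimmed, hypothetical version with a single method.
type PodInterface interface {
	Describe() string
}

// PodsGetter has the same shape as the real interface: one method that
// returns a PodInterface scoped to a namespace.
type PodsGetter interface {
	Pods(namespace string) PodInterface
}

// pods remembers the namespace it was created for.
type pods struct{ ns string }

func (p *pods) Describe() string {
	if p.ns == "" {
		return "pods in all namespaces"
	}
	return "pods in namespace " + p.ns
}

// coreClient plays the role of CoreV1Client.
type coreClient struct{}

func (coreClient) Pods(namespace string) PodInterface { return &pods{ns: namespace} }

func main() {
	var g PodsGetter = coreClient{}
	fmt.Println(g.Pods("").Describe())
	fmt.Println(g.Pods("kube-system").Describe())
}
```

Code that only needs pods can accept a PodsGetter instead of the whole clientset, which keeps it easy to substitute a fake in tests.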

Pods().List()

pods.List fetches the pod resources from Kubernetes through an HTTP call made by the RESTClient.

Source

// List takes label and field selectors, and returns the list of Pods that match those selectors.
func (c *pods) List(ctx context.Context, opts metav1.ListOptions) (result *v1.PodList, err error) {
	var timeout time.Duration
	if opts.TimeoutSeconds != nil {
		timeout = time.Duration(*opts.TimeoutSeconds) * time.Second
	}
	result = &v1.PodList{}
	err = c.client.Get().
		Namespace(c.ns).
		Resource("pods").
		VersionedParams(&opts, scheme.ParameterCodec).
		Timeout(timeout).
		Do(ctx).
		Into(result)
	return
}

This completes the walk-through of how clientset.CoreV1().Pods("").List(context.Background(), metav1.ListOptions{}) fetches pods: in the end the work is done by RESTClient methods.
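To make the final HTTP step concrete, here is a sketch that uses only the Go standard library against a stand-in server from net/http/httptest. The helper names (listPath, countPods, newFakeAPIServer) and the canned three-item response are assumptions made for the example; the two URL paths are the core/v1 pod list endpoints of the Kubernetes API. A real client would also need authentication and TLS settings.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"
)

// podList keeps only the field needed to count items.
type podList struct {
	Items []json.RawMessage `json:"items"`
}

// listPath returns the core/v1 pods path; an empty namespace means all namespaces.
func listPath(namespace string) string {
	if namespace == "" {
		return "/api/v1/pods"
	}
	return "/api/v1/namespaces/" + namespace + "/pods"
}

// countPods issues the GET request and decodes the returned list.
func countPods(client *http.Client, baseURL, namespace string) (int, error) {
	resp, err := client.Get(baseURL + listPath(namespace))
	if err != nil {
		return 0, err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return 0, fmt.Errorf("unexpected status %s", resp.Status)
	}
	var list podList
	if err := json.NewDecoder(resp.Body).Decode(&list); err != nil {
		return 0, err
	}
	return len(list.Items), nil
}

// newFakeAPIServer is a stand-in that answers the cluster-wide pod list only.
func newFakeAPIServer() *httptest.Server {
	return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if r.Method != http.MethodGet || r.URL.Path != "/api/v1/pods" {
			http.NotFound(w, r)
			return
		}
		w.Header().Set("Content-Type", "application/json")
		fmt.Fprint(w, `{"kind":"PodList","apiVersion":"v1","items":[{},{},{}]}`)
	}))
}

func main() {
	srv := newFakeAPIServer()
	defer srv.Close()

	n, err := countPods(srv.Client(), srv.URL, "")
	if err != nil {
		fmt.Println("list failed:", err)
		return
	}
	fmt.Printf("There are %d pods in the cluster\n", n)
}
```

Passing an empty namespace, as the article's example does, selects the cluster-wide path, which is why the example counts pods across all namespaces.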

2.5 RESTClient

The following looks at how the RESTClient is created and used.

Like the other clients, the RESTClient is created from the config that was passed in.

// NewForConfig creates a new CoreV1Client for the given config.
// NewForConfig is equivalent to NewForConfigAndClient(c, httpClient),
// where httpClient was generated with rest.HTTPClientFor(c).
func NewForConfig(c *rest.Config) (*CoreV1Client, error) {
	config := *c
	if err := setConfigDefaults(&config); err != nil {
		return nil, err
	}
	httpClient, err := rest.HTTPClientFor(&config)
	if err != nil {
		return nil, err
	}
	return NewForConfigAndClient(&config, httpClient)
}

2.5.1 rest.HTTPClientFor

// HTTPClientFor returns an http.Client that will provide the authentication
// or transport level security defined by the provided Config. Will return the
// default http.DefaultClient if no special case behavior is needed.
func HTTPClientFor(config *Config) (*http.Client, error) {
	transport, err := TransportFor(config)
	if err != nil {
		return nil, err
	}
	var httpClient *http.Client
	if transport != http.DefaultTransport || config.Timeout > 0 {
		httpClient = &http.Client{
			Transport: transport,
			Timeout:   config.Timeout,
		}
	} else {
		httpClient = http.DefaultClient
	}

	return httpClient, nil
}

2.5.2 rest.RESTClientForConfigAndClient

// RESTClientForConfigAndClient returns a RESTClient that satisfies the requested attributes on a
// client Config object.
// Unlike RESTClientFor, RESTClientForConfigAndClient allows to pass an http.Client that is shared
// between all the API Groups and Versions.
// Note that the http client takes precedence over the transport values configured.
// The http client defaults to the `http.DefaultClient` if nil.
func RESTClientForConfigAndClient(config *Config, httpClient *http.Client) (*RESTClient, error) {
	if config.GroupVersion == nil {
		return nil, fmt.Errorf("GroupVersion is required when initializing a RESTClient")
	}
	if config.NegotiatedSerializer == nil {
		return nil, fmt.Errorf("NegotiatedSerializer is required when initializing a RESTClient")
	}

	baseURL, versionedAPIPath, err := DefaultServerUrlFor(config)
	if err != nil {
		return nil, err
	}

	rateLimiter := config.RateLimiter
	if rateLimiter == nil {
		qps := config.QPS
		if config.QPS == 0.0 {
			qps = DefaultQPS
		}
		burst := config.Burst
		if config.Burst == 0 {
			burst = DefaultBurst
		}
		if qps > 0 {
			rateLimiter = flowcontrol.NewTokenBucketRateLimiter(qps, burst)
		}
	}

	var gv schema.GroupVersion
	if config.GroupVersion != nil {
		gv = *config.GroupVersion
	}
	clientContent := ClientContentConfig{
		AcceptContentTypes: config.AcceptContentTypes,
		ContentType:        config.ContentType,
		GroupVersion:       gv,
		Negotiator:         runtime.NewClientNegotiator(config.NegotiatedSerializer, gv),
	}

	restClient, err := NewRESTClient(baseURL, versionedAPIPath, clientContent, rateLimiter, httpClient)
	if err == nil && config.WarningHandler != nil {
		restClient.warningHandler = config.WarningHandler
	}
	return restClient, err
}
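The rate limiter defaulting in RESTClientForConfigAndClient is easy to misread, so here it is as a standalone function. The name effectiveLimits is hypothetical, and the constants are copied from what I understand rest.DefaultQPS and rest.DefaultBurst to be (5 and 10); treat them as assumptions if your client-go version differs. The logic only applies when config.RateLimiter is nil.

```go
package main

import "fmt"

const (
	defaultQPS   float32 = 5.0 // assumed to match rest.DefaultQPS
	defaultBurst int     = 10  // assumed to match rest.DefaultBurst
)

// effectiveLimits mirrors the defaulting branch: a zero value means
// "use the default", and a non-positive QPS after defaulting means no
// client-side rate limiter is created (limited == false).
func effectiveLimits(qps float32, burst int) (outQPS float32, outBurst int, limited bool) {
	if qps == 0.0 {
		qps = defaultQPS
	}
	if burst == 0 {
		burst = defaultBurst
	}
	return qps, burst, qps > 0
}

func main() {
	for _, in := range []struct {
		qps   float32
		burst int
	}{{0, 0}, {50, 100}, {-1, 0}} {
		q, b, limited := effectiveLimits(in.qps, in.burst)
		fmt.Printf("QPS=%v Burst=%d -> qps=%v burst=%d limited=%v\n", in.qps, in.burst, q, b, limited)
	}
}
```

A practical consequence is that leaving QPS and Burst unset does not mean unlimited; a negative QPS is what turns the token bucket off.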

2.5.3 rest.NewRESTClient

// NewRESTClient creates a new RESTClient. This client performs generic REST functions
// such as Get, Put, Post, and Delete on specified paths.
func NewRESTClient(baseURL *url.URL, versionedAPIPath string, config ClientContentConfig, rateLimiter flowcontrol.RateLimiter, client *http.Client) (*RESTClient, error) {
	if len(config.ContentType) == 0 {
		config.ContentType = "application/json"
	}

	base := *baseURL
	if !strings.HasSuffix(base.Path, "/") {
		base.Path += "/"
	}
	base.RawQuery = ""
	base.Fragment = ""

	return &RESTClient{
		base:             &base,
		versionedAPIPath: versionedAPIPath,
		content:          config,
		createBackoffMgr: readExpBackoffConfig,
		rateLimiter:      rateLimiter,

		Client: client,
	}, nil
}
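The base URL handling in NewRESTClient can be tried in isolation with net/url. The function normalizeBase is a hypothetical wrapper that applies the same three steps (trailing slash, no query, no fragment) to a URL string; the sample address is made up.

```go
package main

import (
	"fmt"
	"net/url"
	"strings"
)

// normalizeBase applies the same clean-up NewRESTClient performs on its base URL.
func normalizeBase(raw string) (string, error) {
	u, err := url.Parse(raw)
	if err != nil {
		return "", err
	}
	base := *u // work on a copy, as NewRESTClient does
	if !strings.HasSuffix(base.Path, "/") {
		base.Path += "/"
	}
	base.RawQuery = ""
	base.Fragment = ""
	return base.String(), nil
}

func main() {
	for _, raw := range []string{
		"https://10.0.0.1:6443",
		"https://10.0.0.1:6443/k8s?x=1#frag",
	} {
		out, err := normalizeBase(raw)
		if err != nil {
			fmt.Println("parse error:", err)
			continue
		}
		fmt.Println(out)
	}
}
```

Ending the base path with a slash lets later path segments such as the versioned API path be appended without worrying about a missing separator.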

2.5.4 RESTClient

// RESTClient imposes common Kubernetes API conventions on a set of resource paths.
// The baseURL is expected to point to an HTTP or HTTPS path that is the parent
// of one or more resources.  The server should return a decodable API resource
// object, or an api.Status object which contains information about the reason for
// any failure.
//
// Most consumers should use client.New() to get a Kubernetes API client.
type RESTClient struct {
	// base is the root URL for all invocations of the client
	base *url.URL
	// versionedAPIPath is a path segment connecting the base URL to the resource root
	versionedAPIPath string

	// content describes how a RESTClient encodes and decodes responses.
	content ClientContentConfig

	// creates BackoffManager that is passed to requests.
	createBackoffMgr func() BackoffManager

	// rateLimiter is shared among all requests created by this client unless specifically
	// overridden.
	rateLimiter flowcontrol.RateLimiter

	// warningHandler is shared among all requests created by this client.
	// If not set, defaultWarningHandler is used.
	warningHandler WarningHandler

	// Set specific behavior of the client.  If not set http.DefaultClient will be used.
	Client *http.Client
}

2.5.5 rest.Interface

// Interface captures the set of operations for generically interacting with Kubernetes REST apis.
type Interface interface {
	GetRateLimiter() flowcontrol.RateLimiter
	Verb(verb string) *Request
	Post() *Request
	Put() *Request
	Patch(pt types.PatchType) *Request
	Get() *Request
	Delete() *Request
	APIVersion() schema.GroupVersion
}

Each HTTP helper (Post(), Put(), Get(), Delete()) is a thin wrapper that calls Verb(verb string) with the matching verb.

// Verb begins a request with a verb (GET, POST, PUT, DELETE).
//
// Example usage of RESTClient's request building interface:
// c, err := NewRESTClient(...)
// if err != nil { ... }
// resp, err := c.Verb("GET").
//
//	Path("pods").
//	SelectorParam("labels", "area=staging").
//	Timeout(10*time.Second).
//	Do()
//
// if err != nil { ... }
// list, ok := resp.(*api.PodList)
func (c *RESTClient) Verb(verb string) *Request {
	return NewRequest(c).Verb(verb)
}

Verb calls NewRequest to start building a request. Calling Do(ctx) on the finished request sends the HTTP request and returns a Result.
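The chained style used by Verb, Namespace and Resource is a builder pattern. The sketch below is a much-reduced model of it: the client and request types, their fields and the String method are all hypothetical, and no HTTP request is sent. It only shows how each call records one piece of the request and returns the same object for further chaining.

```go
package main

import (
	"fmt"
	"strings"
)

// request is a hypothetical, reduced model of rest.Request.
type request struct {
	apiPath   string
	verb      string
	namespace string
	resource  string
}

// client is a hypothetical, reduced model of rest.RESTClient.
type client struct{ apiPath string }

// Verb starts a request; the verb helpers below delegate to it.
func (c *client) Verb(verb string) *request { return &request{apiPath: c.apiPath, verb: verb} }
func (c *client) Get() *request             { return c.Verb("GET") }
func (c *client) Delete() *request          { return c.Verb("DELETE") }

// Namespace and Resource record their argument and return the request for chaining.
func (r *request) Namespace(ns string) *request { r.namespace = ns; return r }
func (r *request) Resource(res string) *request { r.resource = res; return r }

// String renders the verb and path the request would use.
func (r *request) String() string {
	parts := []string{strings.TrimSuffix(r.apiPath, "/")}
	if r.namespace != "" {
		parts = append(parts, "namespaces", r.namespace)
	}
	parts = append(parts, r.resource)
	return r.verb + " " + strings.Join(parts, "/")
}

func main() {
	c := &client{apiPath: "/api/v1"}
	fmt.Println(c.Get().Namespace("").Resource("pods"))
	fmt.Println(c.Get().Namespace("default").Resource("pods"))
	fmt.Println(c.Delete().Namespace("default").Resource("services"))
}
```

Keeping the verb helpers as one-line wrappers means that any behavior added to Verb applies to every HTTP method.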

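2.6 Summary

Before client-go can call Kubernetes resource objects it needs the cluster's connection settings, typically from $HOME/.kube/config (or from a bearer token).

The whole call chain is:

kubeconfig → rest.Config → clientset → specific client (CoreV1Client) → specific resource object (pod) → RESTClient → http.Client → HTTP request and response

Creating, deleting, updating and querying Kubernetes resource objects is done through the different clients in the clientset and the per-resource methods each client exposes. Commonly used clients include CoreV1Client, AppsV1Client and NetworkingV1Client.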

3. Calling Kubernetes resources with client-go

Creating the clientset

// Resolve the kubeconfig path
kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file")
// Build the rest.Config
config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
// Create the clientset
clientset, err := kubernetes.NewForConfig(config)

// NewKubernetesClient builds a clientset from a host, port and bearer token
// instead of a kubeconfig file.
func NewKubernetesClient(c *Config) (*kubernetes.Clientset, error) {
	kubeConf := &rest.Config{
		Host:        fmt.Sprintf("%s:%d", c.Host, c.Port),
		BearerToken: c.Token,
		TLSClientConfig: rest.TLSClientConfig{
			// Insecure skips verification of the server certificate.
			Insecure: true,
		},
	}
	return kubernetes.NewForConfig(kubeConf)
}

3.1 deployment

// ctx is a context.Context such as context.Background(); appsv1 is k8s.io/api/apps/v1.
// Declare the deployment object
var deployment *appsv1.Deployment
// Populate the deployment object
// Create the deployment
deployment, err := clientset.AppsV1().Deployments(<namespace>).Create(ctx, <deployment>, metav1.CreateOptions{})
// Update the deployment
deployment, err := clientset.AppsV1().Deployments(<namespace>).Update(ctx, <deployment>, metav1.UpdateOptions{})
// Delete the deployment
err := clientset.AppsV1().Deployments(<namespace>).Delete(ctx, <deployment.Name>, metav1.DeleteOptions{})
// Get the deployment
deployment, err := clientset.AppsV1().Deployments(<namespace>).Get(ctx, <deployment.Name>, metav1.GetOptions{})
// List deployments
deploymentList, err := clientset.AppsV1().Deployments(<namespace>).List(ctx, metav1.ListOptions{})
// Watch deployments
watchInterface, err := clientset.AppsV1().Deployments(<namespace>).Watch(ctx, metav1.ListOptions{})

3.2 service

// ctx is a context.Context such as context.Background(); v1 is k8s.io/api/core/v1.
// Declare the service object
var service *v1.Service
// Populate the service object
// Create the service
service, err := clientset.CoreV1().Services(<namespace>).Create(ctx, <service>, metav1.CreateOptions{})
// Update the service
service, err := clientset.CoreV1().Services(<namespace>).Update(ctx, <service>, metav1.UpdateOptions{})
// Delete the service
err := clientset.CoreV1().Services(<namespace>).Delete(ctx, <service.Name>, metav1.DeleteOptions{})
// Get the service
service, err := clientset.CoreV1().Services(<namespace>).Get(ctx, <service.Name>, metav1.GetOptions{})
// List services
serviceList, err := clientset.CoreV1().Services(<namespace>).List(ctx, metav1.ListOptions{})
// Watch services
watchInterface, err := clientset.CoreV1().Services(<namespace>).Watch(ctx, metav1.ListOptions{})

3.3 ingress

// ctx is a context.Context such as context.Background(); networkingv1 is k8s.io/api/networking/v1.
// Declare the ingress object
var ingress *networkingv1.Ingress
// Populate the ingress object
// Create the ingress
ingress, err := clientset.NetworkingV1().Ingresses(<namespace>).Create(ctx, <ingress>, metav1.CreateOptions{})
// Update the ingress
ingress, err := clientset.NetworkingV1().Ingresses(<namespace>).Update(ctx, <ingress>, metav1.UpdateOptions{})
// Delete the ingress
err := clientset.NetworkingV1().Ingresses(<namespace>).Delete(ctx, <ingress.Name>, metav1.DeleteOptions{})
// Get the ingress
ingress, err := clientset.NetworkingV1().Ingresses(<namespace>).Get(ctx, <ingress.Name>, metav1.GetOptions{})
// List ingresses
ingressList, err := clientset.NetworkingV1().Ingresses(<namespace>).List(ctx, metav1.ListOptions{})
// Watch ingresses
watchInterface, err := clientset.NetworkingV1().Ingresses(<namespace>).Watch(ctx, metav1.ListOptions{})

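ReplicaSet, Pod, StatefulSet and the other resources follow the same pattern. (In Kubernetes you normally create a ReplicaSet through a Deployment, and the ReplicaSet in turn controls the pods.)

As the operations above show, every resource object has the same set of create, delete, update and query methods, and the call logic is essentially the same. For most secondary development it is enough to create three kinds of objects: Deployment, Service and Ingress. Pods are created and deleted by the ReplicaSet that the Deployment owns. In the client-go version analyzed above, each call takes a context, the object or its name, and an Options value, with the namespace supplied when the resource client is obtained. Before creating an object you have to populate its fields, which is comparable to editing the object's YAML file and then running kubectl create -f xxx.yaml.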
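The comparison with editing a YAML file can be illustrated by building a manifest in Go and serializing it. The struct types in this sketch are local, hypothetical stand-ins that cover only a handful of Service fields; real code would populate v1.Service from k8s.io/api/core/v1 and pass it to Create. The name web, the namespace and the port are made-up values.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// The types below are reduced, hypothetical stand-ins for the real API types.
type objectMeta struct {
	Name      string `json:"name"`
	Namespace string `json:"namespace,omitempty"`
}

type servicePort struct {
	Port int32 `json:"port"`
}

type serviceSpec struct {
	Selector map[string]string `json:"selector"`
	Ports    []servicePort     `json:"ports"`
}

type service struct {
	APIVersion string      `json:"apiVersion"`
	Kind       string      `json:"kind"`
	Metadata   objectMeta  `json:"metadata"`
	Spec       serviceSpec `json:"spec"`
}

// serviceManifest fills in the object and returns it as compact JSON.
func serviceManifest(name, namespace, app string, port int32) ([]byte, error) {
	svc := service{
		APIVersion: "v1",
		Kind:       "Service",
		Metadata:   objectMeta{Name: name, Namespace: namespace},
		Spec: serviceSpec{
			Selector: map[string]string{"app": app},
			Ports:    []servicePort{{Port: port}},
		},
	}
	return json.Marshal(svc)
}

func main() {
	out, err := serviceManifest("web", "default", "web", 80)
	if err != nil {
		fmt.Println("marshal failed:", err)
		return
	}
	fmt.Println(string(out))
}
```

The fields set here correspond one to one to the keys you would write in the YAML file, which is why constructing the Go object and editing the manifest feel like the same task.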