
Introduction to client-go


Go, Deployment

2024-07-09

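Kubernetes is a cluster management platform that most readers will already know well. We usually operate on the pods and services in a cluster through a web interface. To operate on pods from program code, or to read the status of particular containers, you can use the official Kubernetes API. Because the parameters of these endpoints are quite complex, we usually use the client-go library to work with the API.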

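This article describes how to use client-go and how it is implemented. It quotes a large amount of source code; the code is reproduced inline rather than linked so that it stays stable when the upstream source changes.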


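Introduction

About client-go

client-go is a client for calling the Kubernetes resource object APIs: it is how you create, delete, update, and query resource objects in a cluster (Deployment, Service, Ingress, StatefulSet, Pod, Namespace, Node, and so on). Most secondary development that wraps the Kubernetes API behind a front end is built on the client-go package.

Example code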

func GetPods(clientSet *kubernetes.Clientset) {
	for {
		pods, err := clientSet.CoreV1().Pods("").List(context.Background(), metav1.ListOptions{})
		if err != nil {
			panic(err.Error())
		}
		log.Info().Msgf("There are %d pods in the cluster\n", len(pods.Items))
		time.Sleep(10 * time.Second)
	}
}

Output

=== RUN   TestGetPods
{"level":"info","time":"2024-07-05T15:57:22+08:00","message":"There are 4963 pods in the cluster\n"}
{"level":"info","time":"2024-07-05T15:57:33+08:00","message":"There are 4963 pods in the cluster\n"}
{"level":"info","time":"2024-07-05T15:57:45+08:00","message":"There are 4958 pods in the cluster\n"}
{"level":"info","time":"2024-07-05T15:57:57+08:00","message":"There are 4950 pods in the cluster\n"}
{"level":"info","time":"2024-07-05T15:58:08+08:00","message":"There are 4950 pods in the cluster\n"}
{"level":"info","time":"2024-07-05T15:58:20+08:00","message":"There are 4950 pods in the cluster\n"}
{"level":"info","time":"2024-07-05T15:58:32+08:00","message":"There are 4946 pods in the cluster\n"}

client-go source code analysis

client-go source directory structure

  • The kubernetes package contains the clientset to access Kubernetes API.
  • The discovery package is used to discover APIs supported by a Kubernetes API server.
  • The dynamic package contains a dynamic client that can perform generic operations on arbitrary Kubernetes API objects.
  • The plugin/pkg/client/auth packages contain optional authentication plugins for obtaining credentials from external sources.
  • The transport package is used to set up auth and start a connection.
  • The tools/cache package is useful for writing controllers.

kubeconfig

kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file")

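This obtains the absolute path of the Kubernetes configuration file, kubeconfig, which is normally $HOME/.kube/config. The file describes the Kubernetes cluster that the local machine connects to.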

Example config contents:

apiVersion: v1
clusters:
- cluster:
    server: http://<kube-master-ip>:8080
  name: k8s
contexts:
- context:
    cluster: k8s
    namespace: default
    user: ""
  name: default
current-context: default
kind: Config
preferences: {}
users: []

rest.Config

A rest.Config object is obtained by calling BuildConfigFromFlags with either the master URL or the kubeconfig path; in most cases the kubeconfig path is used.

config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)

Source of the BuildConfigFromFlags function

// BuildConfigFromFlags is a helper function that builds configs from a master
// url or a kubeconfig filepath. These are passed in as command line flags for cluster
// components. Warnings should reflect this usage. If neither masterUrl or kubeconfigPath
// are passed in we fallback to inClusterConfig. If inClusterConfig fails, we fallback
// to the default config.
func BuildConfigFromFlags(masterUrl, kubeconfigPath string) (*restclient.Config, error) {
	if kubeconfigPath == "" && masterUrl == "" {
		klog.Warning("Neither --kubeconfig nor --master was specified.  Using the inClusterConfig.  This might not work.")
		kubeconfig, err := restclient.InClusterConfig()
		if err == nil {
			return kubeconfig, nil
		}
		klog.Warning("error creating inClusterConfig, falling back to default config: ", err)
	}
	return NewNonInteractiveDeferredLoadingClientConfig(
		&ClientConfigLoadingRules{ExplicitPath: kubeconfigPath},
		&ConfigOverrides{ClusterInfo: clientcmdapi.Cluster{Server: masterUrl}}).ClientConfig()
}

clientset

A clientset object is obtained by passing the *rest.Config to NewForConfig. A clientset is a collection of clients, one per API group and version, each exposing the calls for that version.

clientset, err := kubernetes.NewForConfig(config)

NewForConfig

NewForConfig initializes every client in the clientset.

The Clientset struct

clientset.Interface

Clientset implements the following Interface, so a specific client can be obtained by calling the corresponding method. For example:

pods, err := clientSet.CoreV1().Pods("").List(context.Background(), metav1.ListOptions{})

The clientset method-set interface

CoreV1Client

We take the CoreV1Client in the clientset as the example. The CoreV1Client object is initialized from the supplied rest.Config:

cs.coreV1, err = corev1.NewForConfigAndClient(&configShallowCopy, httpClient)
if err != nil {
	return nil, err
}

coreV1.NewForConfig

// NewForConfig creates a new CoreV1Client for the given config.
// NewForConfig is equivalent to NewForConfigAndClient(c, httpClient),
// where httpClient was generated with rest.HTTPClientFor(c).
func NewForConfig(c *rest.Config) (*CoreV1Client, error) {
	config := *c
	if err := setConfigDefaults(&config); err != nil {
		return nil, err
	}
	httpClient, err := rest.HTTPClientFor(&config)
	if err != nil {
		return nil, err
	}
	return NewForConfigAndClient(&config, httpClient)
}

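coreV1.NewForConfig copies the config, applies defaults, calls rest.HTTPClientFor(&config) to create an *http.Client, and passes both to NewForConfigAndClient. CoreV1Client is therefore, at bottom, a thin wrapper around an HTTP client: it holds a rest.Interface that sends its requests through that *http.Client.

The CoreV1Client struct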

// CoreV1Client is used to interact with features provided by the  group.
type CoreV1Client struct {
	restClient rest.Interface
}

CoreV1Client implements the CoreV1Interface interface, through which Kubernetes resource objects are operated on:

CoreV1Interface

type CoreV1Interface interface {
	RESTClient() rest.Interface
	ComponentStatusesGetter
	ConfigMapsGetter
	EndpointsGetter
	EventsGetter
	LimitRangesGetter
	NamespacesGetter
	NodesGetter
	PersistentVolumesGetter
	PersistentVolumeClaimsGetter
	PodsGetter
	PodTemplatesGetter
	ReplicationControllersGetter
	ResourceQuotasGetter
	SecretsGetter
	ServicesGetter
	ServiceAccountsGetter
}

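CoreV1Interface bundles the accessor interfaces for the various Kubernetes objects. For example, PodsGetter is the entry point for create, delete, update, and query operations on pods, and ServicesGetter is the entry point for Service objects.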

PodsGetter

The call from the example is:

pods, err := clientSet.CoreV1().Pods("").List(context.Background(), metav1.ListOptions{})

CoreV1().Pods()

func (c *CoreV1Client) Pods(namespace string) PodInterface {
	return newPods(c, namespace)
}

newPods()

// newPods returns a Pods
func newPods(c *CoreV1Client, namespace string) *pods {
	return &pods{
		client: c.RESTClient(),
		ns:     namespace,
	}
}

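CoreV1().Pods() simply calls newPods(), which creates a pods object holding the namespace and the client's rest.Interface (obtained from c.RESTClient()). Every operation on pods is therefore ultimately an HTTP call made through the RESTClient.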

// pods implements PodInterface
type pods struct {
	client rest.Interface
	ns     string
}

The pods object implements the PodInterface interface, which defines the create, delete, update, query, and related methods for pods.

// PodInterface has methods to work with Pod resources.
type PodInterface interface {
	Create(ctx context.Context, pod *v1.Pod, opts metav1.CreateOptions) (*v1.Pod, error)
	Update(ctx context.Context, pod *v1.Pod, opts metav1.UpdateOptions) (*v1.Pod, error)
	UpdateStatus(ctx context.Context, pod *v1.Pod, opts metav1.UpdateOptions) (*v1.Pod, error)
	Delete(ctx context.Context, name string, opts metav1.DeleteOptions) error
	DeleteCollection(ctx context.Context, opts metav1.DeleteOptions, listOpts metav1.ListOptions) error
	Get(ctx context.Context, name string, opts metav1.GetOptions) (*v1.Pod, error)
	List(ctx context.Context, opts metav1.ListOptions) (*v1.PodList, error)
	Watch(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error)
	Patch(ctx context.Context, name string, pt types.PatchType, data []byte, opts metav1.PatchOptions, subresources ...string) (result *v1.Pod, err error)
	Apply(ctx context.Context, pod *corev1.PodApplyConfiguration, opts metav1.ApplyOptions) (result *v1.Pod, err error)
	ApplyStatus(ctx context.Context, pod *corev1.PodApplyConfiguration, opts metav1.ApplyOptions) (result *v1.Pod, err error)
	UpdateEphemeralContainers(ctx context.Context, podName string, pod *v1.Pod, opts metav1.UpdateOptions) (*v1.Pod, error)

	PodExpansion
}

PodsGetter

PodsGetter exposes a single method, Pods, which returns a PodInterface scoped to the given namespace.

// PodsGetter has a method to return a PodInterface.
// A group's client should implement this interface.
type PodsGetter interface {
	Pods(namespace string) PodInterface
}
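The relationship between the getter interface, the group client, and the per-resource object can be modeled in a few lines. The sketch below is a simplified stand-in that reuses the names PodInterface, PodsGetter, and CoreV1Client for readability; `restClient` is a hypothetical placeholder for rest.Interface, and List only returns a description instead of making a request.

```go
package main

import "fmt"

// PodInterface is cut down to a single method for the sketch.
type PodInterface interface {
	List() string
}

// PodsGetter returns a namespaced PodInterface; it does not embed it.
type PodsGetter interface {
	Pods(namespace string) PodInterface
}

// restClient is a hypothetical stand-in for rest.Interface.
type restClient struct{ name string }

// pods carries the shared client plus the namespace it is scoped to.
type pods struct {
	client *restClient
	ns     string
}

// List only describes what would be listed; no request is sent.
func (p *pods) List() string {
	if p.ns == "" {
		return "pods in all namespaces via " + p.client.name
	}
	return "pods in namespace " + p.ns + " via " + p.client.name
}

// CoreV1Client satisfies PodsGetter by building a fresh pods value per call.
type CoreV1Client struct{ rc *restClient }

func (c *CoreV1Client) Pods(namespace string) PodInterface {
	return &pods{client: c.rc, ns: namespace}
}

var _ PodsGetter = (*CoreV1Client)(nil) // compile-time interface check

func main() {
	c := &CoreV1Client{rc: &restClient{name: "core/v1"}}
	fmt.Println(c.Pods("").List())
	fmt.Println(c.Pods("kube-system").List())
}
```

Because each call to Pods builds a new value, the namespace is fixed per returned object while the underlying client is shared.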

Pods().List()

pods.List fetches pod resources from Kubernetes through an HTTP call made by the RESTClient.

// List takes label and field selectors, and returns the list of Pods that match those selectors.
func (c *pods) List(ctx context.Context, opts metav1.ListOptions) (result *v1.PodList, err error) {
	var timeout time.Duration
	if opts.TimeoutSeconds != nil {
		timeout = time.Duration(*opts.TimeoutSeconds) * time.Second
	}
	result = &v1.PodList{}
	err = c.client.Get().
		Namespace(c.ns).
		Resource("pods").
		VersionedParams(&opts, scheme.ParameterCodec).
		Timeout(timeout).
		Do(ctx).
		Into(result)
	return
}
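To make the builder chain in List more concrete, the following toy builder shows how a namespace, a resource name, and list options could turn into a request path. It is a sketch under stated assumptions, not the real Request type: `request`, `newGet`, `TimeoutSeconds`, `LabelSelector`, and `URL` are hypothetical, and the two option methods stand in for VersionedParams. The `/api/v1` prefix and the `timeoutSeconds` and `labelSelector` query names follow the Kubernetes REST API.

```go
package main

import (
	"fmt"
	"net/url"
	"strconv"
)

// request is a toy stand-in for the fluent builder used by pods.List.
type request struct {
	prefix    string // for example /api/v1 for the core group
	namespace string
	resource  string
	params    url.Values
}

func newGet(prefix string) *request {
	return &request{prefix: prefix, params: url.Values{}}
}

func (r *request) Namespace(ns string) *request { r.namespace = ns; return r }
func (r *request) Resource(res string) *request { r.resource = res; return r }

// TimeoutSeconds and LabelSelector play the role of VersionedParams.
func (r *request) TimeoutSeconds(s int64) *request {
	r.params.Set("timeoutSeconds", strconv.FormatInt(s, 10))
	return r
}

func (r *request) LabelSelector(sel string) *request {
	r.params.Set("labelSelector", sel)
	return r
}

// URL assembles the path; an empty namespace omits the namespaces segment,
// which is why Pods("") lists pods across the whole cluster.
func (r *request) URL() string {
	p := r.prefix
	if r.namespace != "" {
		p += "/namespaces/" + url.PathEscape(r.namespace)
	}
	p += "/" + r.resource
	if len(r.params) > 0 {
		p += "?" + r.params.Encode()
	}
	return p
}

func main() {
	fmt.Println(newGet("/api/v1").Namespace("").Resource("pods").URL())
	fmt.Println(newGet("/api/v1").Namespace("default").Resource("pods").
		LabelSelector("app=web").TimeoutSeconds(30).URL())
}
```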

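This completes the trace of how clientset.CoreV1().Pods("").List(context.Background(), metav1.ListOptions{}) fetches pod resources: in the end it is carried out by RESTClient methods.

RESTClient

The following looks at how the RESTClient is created and used.

Creating the RESTClient object likewise depends on the config that is passed in.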

// NewForConfig creates a new CoreV1Client for the given config.
// NewForConfig is equivalent to NewForConfigAndClient(c, httpClient),
// where httpClient was generated with rest.HTTPClientFor(c).
func NewForConfig(c *rest.Config) (*CoreV1Client, error) {
	config := *c
	if err := setConfigDefaults(&config); err != nil {
		return nil, err
	}
	httpClient, err := rest.HTTPClientFor(&config)
	if err != nil {
		return nil, err
	}
	return NewForConfigAndClient(&config, httpClient)
}

rest.HTTPClientFor

rest.RESTClientForConfigAndClient

rest.NewRESTClient

RESTClient

rest.Interface

// Interface captures the set of operations for generically interacting with Kubernetes REST apis.
type Interface interface {
	GetRateLimiter() flowcontrol.RateLimiter
	Verb(verb string) *Request
	Post() *Request
	Put() *Request
	Patch(pt types.PatchType) *Request
	Get() *Request
	Delete() *Request
	APIVersion() schema.GroupVersion
}

Each HTTP helper (Post(), Put(), Get(), Delete()) actually calls Verb(verb string).

// Verb begins a request with a verb (GET, POST, PUT, DELETE).
//
// Example usage of RESTClient's request building interface:
// c, err := NewRESTClient(...)
// if err != nil { ... }
// resp, err := c.Verb("GET").
//
//	Path("pods").
//	SelectorParam("labels", "area=staging").
//	Timeout(10*time.Second).
//	Do()
//
// if err != nil { ... }
// list, ok := resp.(*api.PodList)
func (c *RESTClient) Verb(verb string) *Request {
	return NewRequest(c).Verb(verb)
}

Verb calls NewRequest; the request chain ends with Do, which performs the HTTP request and returns a Result.

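Summary

To call Kubernetes resource objects, client-go first needs the cluster's connection configuration, typically $HOME/.kube/config (a token can be used instead).

The overall call flow is:

kubeconfig -> rest.Config -> clientset -> specific client (CoreV1Client) -> specific resource object (pod) -> RESTClient -> http.Client -> HTTP request and response.

Create, delete, update, and query operations on Kubernetes resource objects are carried out through the different clients in the clientset and the per-resource methods each client offers. Commonly used clients include CoreV1Client and AppsV1Client; AppsV1beta1Client and ExtensionsV1beta1Client target beta API versions that have been removed from recent Kubernetes releases.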

Calling Kubernetes resources with client-go

Creating the clientset

// Get the kubeconfig path
kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file")
// Build the rest.Config
config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
// Create the clientset
clientset, err := kubernetes.NewForConfig(config)

// Alternatively, build the rest.Config directly from a host, port, and bearer token.
func NewKubernetesClient(c *Config) (*kubernetes.Clientset, error) {
	kubeConf := &rest.Config{
		Host:        fmt.Sprintf("%s:%d", c.Host, c.Port),
		BearerToken: c.Token,
		TLSClientConfig: rest.TLSClientConfig{
			Insecure: true,
		},
	}
	return kubernetes.NewForConfig(kubeConf)
}

deployment

// ctx is a context.Context, for example context.Background().
// Declare the Deployment object (appsv1 is k8s.io/api/apps/v1)
var deployment *appsv1.Deployment
// Construct the Deployment object
// Create the Deployment
deployment, err := clientset.AppsV1().Deployments(<namespace>).Create(ctx, <deployment>, metav1.CreateOptions{})
// Update the Deployment
deployment, err := clientset.AppsV1().Deployments(<namespace>).Update(ctx, <deployment>, metav1.UpdateOptions{})
// Delete the Deployment
err := clientset.AppsV1().Deployments(<namespace>).Delete(ctx, <deployment.Name>, metav1.DeleteOptions{})
// Get the Deployment
deployment, err := clientset.AppsV1().Deployments(<namespace>).Get(ctx, <deployment.Name>, metav1.GetOptions{})
// List Deployments
deploymentList, err := clientset.AppsV1().Deployments(<namespace>).List(ctx, metav1.ListOptions{})
// Watch Deployments
watchInterface, err := clientset.AppsV1().Deployments(<namespace>).Watch(ctx, metav1.ListOptions{})
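A Watch call returns a stream of events rather than a single result, and the caller typically ranges over a channel until it is closed. The following standard-library model sketches that consumption loop; `event`, `eventType`, and `tally` are hypothetical names, with the event reduced to a type tag and an object name. The tag strings ADDED, MODIFIED, and DELETED match the event types Kubernetes uses.

```go
package main

import "fmt"

// eventType and event loosely mirror the shape of watch events:
// a type tag plus the affected object (reduced to a name here).
type eventType string

const (
	added    eventType = "ADDED"
	modified eventType = "MODIFIED"
	deleted  eventType = "DELETED"
)

type event struct {
	Type eventType
	Name string
}

// tally drains the channel until it is closed and counts events per type.
func tally(events <-chan event) map[eventType]int {
	counts := make(map[eventType]int)
	for ev := range events {
		counts[ev.Type]++
	}
	return counts
}

func main() {
	// A buffered channel stands in for the stream a watch would deliver.
	ch := make(chan event, 4)
	ch <- event{added, "web"}
	ch <- event{modified, "web"}
	ch <- event{modified, "web"}
	ch <- event{deleted, "web"}
	close(ch)

	counts := tally(ch)
	fmt.Println(counts[added], counts[modified], counts[deleted]) // prints: 1 2 1
}
```

With a real watch, the loop would usually run in its own goroutine and the watch would be stopped once the caller is done, so that the server-side stream is released.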

service

// ctx is a context.Context, for example context.Background().
// Declare the Service object (v1 is k8s.io/api/core/v1)
var service *v1.Service
// Construct the Service object
// Create the Service
service, err := clientset.CoreV1().Services(<namespace>).Create(ctx, <service>, metav1.CreateOptions{})
// Update the Service
service, err := clientset.CoreV1().Services(<namespace>).Update(ctx, <service>, metav1.UpdateOptions{})
// Delete the Service
err := clientset.CoreV1().Services(<namespace>).Delete(ctx, <service.Name>, metav1.DeleteOptions{})
// Get the Service
service, err := clientset.CoreV1().Services(<namespace>).Get(ctx, <service.Name>, metav1.GetOptions{})
// List Services
serviceList, err := clientset.CoreV1().Services(<namespace>).List(ctx, metav1.ListOptions{})
// Watch Services
watchInterface, err := clientset.CoreV1().Services(<namespace>).Watch(ctx, metav1.ListOptions{})

ingress

// ctx is a context.Context, for example context.Background().
// Declare the Ingress object (networkingv1 is k8s.io/api/networking/v1)
var ingress *networkingv1.Ingress
// Construct the Ingress object
// Create the Ingress
ingress, err := clientset.NetworkingV1().Ingresses(<namespace>).Create(ctx, <ingress>, metav1.CreateOptions{})
// Update the Ingress
ingress, err := clientset.NetworkingV1().Ingresses(<namespace>).Update(ctx, <ingress>, metav1.UpdateOptions{})
// Delete the Ingress
err := clientset.NetworkingV1().Ingresses(<namespace>).Delete(ctx, <ingress.Name>, metav1.DeleteOptions{})
// Get the Ingress
ingress, err := clientset.NetworkingV1().Ingresses(<namespace>).Get(ctx, <ingress.Name>, metav1.GetOptions{})
// List Ingresses
ingressList, err := clientset.NetworkingV1().Ingresses(<namespace>).List(ctx, metav1.ListOptions{})
// Watch Ingresses
watchInterface, err := clientset.NetworkingV1().Ingresses(<namespace>).Watch(ctx, metav1.ListOptions{})

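In Kubernetes, a Deployment normally creates a ReplicaSet, and the ReplicaSet in turn controls the pods.

As the calls above show, every resource object has create, delete, update, and query methods, and the calling pattern is much the same for each. Secondary development usually only needs to create three kinds of object (Deployment, Service, and Ingress); pods are created and deleted by the ReplicaSet that belongs to the Deployment. Apart from the context.Context that current client-go versions require, a call typically takes NAMESPACE and kubernetesObject, together with an Options argument. Before creating an object you have to construct its data, which can be thought of as writing the object's YAML file and then creating it with kubectl create -f xxx.yaml.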

Last updated: 2025/3/6 02:33