client-go 简介
Kubernetes 是大家所非常熟悉的集群管理平台, 我们通常在 Web 界面上对集群内部的 pod 和 service 进行操作. 如果想要在程序代码中对集群中的 pod 进行操作或者获取其中某些容器的状态信息则可以使用 Kubernetes 官方给出的 API 接口. 由于这些接口的参数非常复杂, 所以通常我们会使用client-go
这个外部库来进行 API 的操作.
本文将详细介绍 client-go
的操作和源码实现. 文中涉及大量的源码, 未使用链接的原因是为了避免源码版本的变更.
0. Kubernetes API
1. client-go 简介
1.1 client-go 说明
client-go 是一个调用 Kubernetes 集群资源对象 API 的客户端, 即通过 client-go 实现对 Kubernetes 集群中资源对象(包括 Deployment、Service、Ingress、Stateful Set、Pod、Namespace、Node 等)的增删改查等动作. 大部分对 Kubernetes 进行前置 API 封装的二次开发都通过 client-go 这个第三方包来实现.
1.2 示例代码
func GetPods(clientSet *kubernetes.Clientset) {
for {
pods, err := clientSet.CoreV1().Pods("").List(context.Background(), metav1.ListOptions{})
if err != nil {
panic(err.Error())
}
log.Info().Msgf("There are %d pods in the cluster\n", len(pods.Items))
time.Sleep(10 * time.Second)
}
}
1.3 运行结果
=== RUN TestGetPods
{"level":"info","time":"2024-07-05T15:57:22+08:00","message":"There are 4963 pods in the cluster\n"}
{"level":"info","time":"2024-07-05T15:57:33+08:00","message":"There are 4963 pods in the cluster\n"}
{"level":"info","time":"2024-07-05T15:57:45+08:00","message":"There are 4958 pods in the cluster\n"}
{"level":"info","time":"2024-07-05T15:57:57+08:00","message":"There are 4950 pods in the cluster\n"}
{"level":"info","time":"2024-07-05T15:58:08+08:00","message":"There are 4950 pods in the cluster\n"}
{"level":"info","time":"2024-07-05T15:58:20+08:00","message":"There are 4950 pods in the cluster\n"}
{"level":"info","time":"2024-07-05T15:58:32+08:00","message":"There are 4946 pods in the cluster\n"}
2. client-go 源码分析
client-go 源码目录结构
- The
kubernetes
package contains the clientset to access Kubernetes API. - The
discovery
package is used to discover APIs supported by a Kubernetes API server. - The
dynamic
package contains a dynamic client that can perform generic operations on arbitrary Kubernetes API objects. - The
plugin/pkg/client/auth
packages contain optional authentication plugins for obtaining credentials from external sources. - The
transport
package is used to set up auth and start a connection. - The
tools/cache
package is useful for writing controllers.
2.1 kubeconfig
kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file")
获取 Kubernetes 配置文件 kubeconfig 的绝对路径. 一般路径为 $HOME/.kube/config
. 该文件主要用来配置本地连接的 Kubernetes 集群.
config 内容示例:
apiVersion: v1
clusters:
- cluster:
server: http://<kube-master-ip>:8080
name: k8s
contexts:
- context:
cluster: k8s
namespace: default
user: ""
name: default
current-context: default
kind: Config
preferences: {}
users: []
2.2 rest.config
通过参数(master 的 url 或者 kubeconfig 路径)和 BuildConfigFromFlags
方法来获取 rest.Config
对象, 一般是通过参数 kubeconfig 的路径.
config,err := clientcmd.BuildConfigFromFlags("",*kubeconfig)
BuildConfigFromFlags 函数源码
// BuildConfigFromFlags is a helper function that builds configs from a master
// url or a kubeconfig filepath. These are passed in as command line flags for cluster
// components. Warnings should reflect this usage. If neither masterUrl or kubeconfigPath
// are passed in we fallback to inClusterConfig. If inClusterConfig fails, we fallback
// to the default config.
func BuildConfigFromFlags(masterUrl, kubeconfigPath string) (*restclient.Config, error) {
if kubeconfigPath == "" && masterUrl == "" {
klog.Warning("Neither --kubeconfig nor --master was specified. Using the inClusterConfig. This might not work.")
kubeconfig, err := restclient.InClusterConfig()
if err == nil {
return kubeconfig, nil
}
klog.Warning("error creating inClusterConfig, falling back to default config: ", err)
}
return NewNonInteractiveDeferredLoadingClientConfig(
&ClientConfigLoadingRules{ExplicitPath: kubeconfigPath},
&ConfigOverrides{ClusterInfo: clientcmdapi.Cluster{Server: masterUrl}}).ClientConfig()
}
2.3 clientset
通过 *rest.Config
参数和 NewForConfig
方法来获取 clientset
对象, clientset
是多个 client
的集合, 每个 client
可能包含不同版本的方法调用.
clientset, err := kubernetes.NewForConfig(config)
2.3.1 NewForConfig
NewForConfig
函数就是初始化 clientset 中的每个 client.
// NewForConfig creates a new Clientset for the given config.
// If config's RateLimiter is not set and QPS and Burst are acceptable,
// NewForConfig will generate a rate-limiter in configShallowCopy.
// NewForConfig is equivalent to NewForConfigAndClient(c, httpClient),
// where httpClient was generated with rest.HTTPClientFor(c).
func NewForConfig(c *rest.Config) (*Clientset, error) {
configShallowCopy := *c
if configShallowCopy.UserAgent == "" {
configShallowCopy.UserAgent = rest.DefaultKubernetesUserAgent()
}
// share the transport between all clients
httpClient, err := rest.HTTPClientFor(&configShallowCopy)
if err != nil {
return nil, err
}
return NewForConfigAndClient(&configShallowCopy, httpClient)
}
2.3.2 Clientset 的结构体
// Clientset contains the clients for groups.
type Clientset struct {
*discovery.DiscoveryClient
admissionregistrationV1 *admissionregistrationv1.AdmissionregistrationV1Client
admissionregistrationV1alpha1 *admissionregistrationv1alpha1.AdmissionregistrationV1alpha1Client
admissionregistrationV1beta1 *admissionregistrationv1beta1.AdmissionregistrationV1beta1Client
internalV1alpha1 *internalv1alpha1.InternalV1alpha1Client
appsV1 *appsv1.AppsV1Client
appsV1beta1 *appsv1beta1.AppsV1beta1Client
appsV1beta2 *appsv1beta2.AppsV1beta2Client
authenticationV1 *authenticationv1.AuthenticationV1Client
authenticationV1alpha1 *authenticationv1alpha1.AuthenticationV1alpha1Client
authenticationV1beta1 *authenticationv1beta1.AuthenticationV1beta1Client
authorizationV1 *authorizationv1.AuthorizationV1Client
authorizationV1beta1 *authorizationv1beta1.AuthorizationV1beta1Client
autoscalingV1 *autoscalingv1.AutoscalingV1Client
autoscalingV2 *autoscalingv2.AutoscalingV2Client
autoscalingV2beta1 *autoscalingv2beta1.AutoscalingV2beta1Client
autoscalingV2beta2 *autoscalingv2beta2.AutoscalingV2beta2Client
batchV1 *batchv1.BatchV1Client
batchV1beta1 *batchv1beta1.BatchV1beta1Client
certificatesV1 *certificatesv1.CertificatesV1Client
certificatesV1beta1 *certificatesv1beta1.CertificatesV1beta1Client
certificatesV1alpha1 *certificatesv1alpha1.CertificatesV1alpha1Client
coordinationV1beta1 *coordinationv1beta1.CoordinationV1beta1Client
coordinationV1 *coordinationv1.CoordinationV1Client
coreV1 *corev1.CoreV1Client
discoveryV1 *discoveryv1.DiscoveryV1Client
discoveryV1beta1 *discoveryv1beta1.DiscoveryV1beta1Client
eventsV1 *eventsv1.EventsV1Client
eventsV1beta1 *eventsv1beta1.EventsV1beta1Client
extensionsV1beta1 *extensionsv1beta1.ExtensionsV1beta1Client
flowcontrolV1alpha1 *flowcontrolv1alpha1.FlowcontrolV1alpha1Client
flowcontrolV1beta1 *flowcontrolv1beta1.FlowcontrolV1beta1Client
flowcontrolV1beta2 *flowcontrolv1beta2.FlowcontrolV1beta2Client
flowcontrolV1beta3 *flowcontrolv1beta3.FlowcontrolV1beta3Client
networkingV1 *networkingv1.NetworkingV1Client
networkingV1alpha1 *networkingv1alpha1.NetworkingV1alpha1Client
networkingV1beta1 *networkingv1beta1.NetworkingV1beta1Client
nodeV1 *nodev1.NodeV1Client
nodeV1alpha1 *nodev1alpha1.NodeV1alpha1Client
nodeV1beta1 *nodev1beta1.NodeV1beta1Client
policyV1 *policyv1.PolicyV1Client
policyV1beta1 *policyv1beta1.PolicyV1beta1Client
rbacV1 *rbacv1.RbacV1Client
rbacV1beta1 *rbacv1beta1.RbacV1beta1Client
rbacV1alpha1 *rbacv1alpha1.RbacV1alpha1Client
resourceV1alpha2 *resourcev1alpha2.ResourceV1alpha2Client
schedulingV1alpha1 *schedulingv1alpha1.SchedulingV1alpha1Client
schedulingV1beta1 *schedulingv1beta1.SchedulingV1beta1Client
schedulingV1 *schedulingv1.SchedulingV1Client
storageV1beta1 *storagev1beta1.StorageV1beta1Client
storageV1 *storagev1.StorageV1Client
storageV1alpha1 *storagev1alpha1.StorageV1alpha1Client
}
2.3.3 clientset.Interface
clientset 实现了以下的 Interface, 因此可以通过调用以下方法获得具体的 client. 例如:
pods, err := clientSet.CoreV1().Pods("").List(context.Background(), metav1.ListOptions{})
clientset 的方法集接口
type Interface interface {
Discovery() discovery.DiscoveryInterface
AdmissionregistrationV1() admissionregistrationv1.AdmissionregistrationV1Interface
AdmissionregistrationV1alpha1() admissionregistrationv1alpha1.AdmissionregistrationV1alpha1Interface
AdmissionregistrationV1beta1() admissionregistrationv1beta1.AdmissionregistrationV1beta1Interface
InternalV1alpha1() internalv1alpha1.InternalV1alpha1Interface
AppsV1() appsv1.AppsV1Interface
AppsV1beta1() appsv1beta1.AppsV1beta1Interface
AppsV1beta2() appsv1beta2.AppsV1beta2Interface
AuthenticationV1() authenticationv1.AuthenticationV1Interface
AuthenticationV1alpha1() authenticationv1alpha1.AuthenticationV1alpha1Interface
AuthenticationV1beta1() authenticationv1beta1.AuthenticationV1beta1Interface
AuthorizationV1() authorizationv1.AuthorizationV1Interface
AuthorizationV1beta1() authorizationv1beta1.AuthorizationV1beta1Interface
AutoscalingV1() autoscalingv1.AutoscalingV1Interface
AutoscalingV2() autoscalingv2.AutoscalingV2Interface
AutoscalingV2beta1() autoscalingv2beta1.AutoscalingV2beta1Interface
AutoscalingV2beta2() autoscalingv2beta2.AutoscalingV2beta2Interface
BatchV1() batchv1.BatchV1Interface
BatchV1beta1() batchv1beta1.BatchV1beta1Interface
CertificatesV1() certificatesv1.CertificatesV1Interface
CertificatesV1beta1() certificatesv1beta1.CertificatesV1beta1Interface
CertificatesV1alpha1() certificatesv1alpha1.CertificatesV1alpha1Interface
CoordinationV1beta1() coordinationv1beta1.CoordinationV1beta1Interface
CoordinationV1() coordinationv1.CoordinationV1Interface
CoreV1() corev1.CoreV1Interface
DiscoveryV1() discoveryv1.DiscoveryV1Interface
DiscoveryV1beta1() discoveryv1beta1.DiscoveryV1beta1Interface
EventsV1() eventsv1.EventsV1Interface
EventsV1beta1() eventsv1beta1.EventsV1beta1Interface
ExtensionsV1beta1() extensionsv1beta1.ExtensionsV1beta1Interface
FlowcontrolV1alpha1() flowcontrolv1alpha1.FlowcontrolV1alpha1Interface
FlowcontrolV1beta1() flowcontrolv1beta1.FlowcontrolV1beta1Interface
FlowcontrolV1beta2() flowcontrolv1beta2.FlowcontrolV1beta2Interface
FlowcontrolV1beta3() flowcontrolv1beta3.FlowcontrolV1beta3Interface
NetworkingV1() networkingv1.NetworkingV1Interface
NetworkingV1alpha1() networkingv1alpha1.NetworkingV1alpha1Interface
NetworkingV1beta1() networkingv1beta1.NetworkingV1beta1Interface
NodeV1() nodev1.NodeV1Interface
NodeV1alpha1() nodev1alpha1.NodeV1alpha1Interface
NodeV1beta1() nodev1beta1.NodeV1beta1Interface
PolicyV1() policyv1.PolicyV1Interface
PolicyV1beta1() policyv1beta1.PolicyV1beta1Interface
RbacV1() rbacv1.RbacV1Interface
RbacV1beta1() rbacv1beta1.RbacV1beta1Interface
RbacV1alpha1() rbacv1alpha1.RbacV1alpha1Interface
ResourceV1alpha2() resourcev1alpha2.ResourceV1alpha2Interface
SchedulingV1alpha1() schedulingv1alpha1.SchedulingV1alpha1Interface
SchedulingV1beta1() schedulingv1beta1.SchedulingV1beta1Interface
SchedulingV1() schedulingv1.SchedulingV1Interface
StorageV1beta1() storagev1beta1.StorageV1beta1Interface
StorageV1() storagev1.StorageV1Interface
StorageV1alpha1() storagev1alpha1.StorageV1alpha1Interface
}
2.4 CoreV1Client
我们以 clientset 中的 CoreV1Client
为例做分析
通过传入的配置信息 rest.Config
初始化 CoreV1Client
对象.
cs.coreV1, err = corev1.NewForConfigAndClient(&configShallowCopy, httpClient)
if err != nil {
return nil, err
}
2.4.1 coreV1.NewForConfig
// NewForConfig creates a new CoreV1Client for the given config.
// NewForConfig is equivalent to NewForConfigAndClient(c, httpClient),
// where httpClient was generated with rest.HTTPClientFor(c).
func NewForConfig(c *rest.Config) (*CoreV1Client, error) {
config := *c
if err := setConfigDefaults(&config); err != nil {
return nil, err
}
httpClient, err := rest.HTTPClientFor(&config)
if err != nil {
return nil, err
}
return NewForConfigAndClient(&config, httpClient)
}
coreV1.NewForConfig
方法本质是调用了 rest.HTTPClientFor(&config)
方法创建 *http.Client
对象, 即 CoreV1Client
的本质就是一个 http.Client
对象.
2.4.2 CoreV1Client 结构体
// CoreV1Client is used to interact with features provided by the group.
type CoreV1Client struct {
restClient rest.Interface
}
CoreV1Client
实现了 CoreV1Interface
的接口, 从而对 Kubernetes 的资源对象进行操作:
func (c *CoreV1Client) ComponentStatuses() ComponentStatusInterface {
return newComponentStatuses(c)
}
func (c *CoreV1Client) ConfigMaps(namespace string) ConfigMapInterface {
return newConfigMaps(c, namespace)
}
func (c *CoreV1Client) Endpoints(namespace string) EndpointsInterface {
return newEndpoints(c, namespace)
}
func (c *CoreV1Client) Events(namespace string) EventInterface {
return newEvents(c, namespace)
}
func (c *CoreV1Client) LimitRanges(namespace string) LimitRangeInterface {
return newLimitRanges(c, namespace)
}
func (c *CoreV1Client) Namespaces() NamespaceInterface {
return newNamespaces(c)
}
func (c *CoreV1Client) Nodes() NodeInterface {
return newNodes(c)
}
func (c *CoreV1Client) PersistentVolumes() PersistentVolumeInterface {
return newPersistentVolumes(c)
}
func (c *CoreV1Client) PersistentVolumeClaims(namespace string) PersistentVolumeClaimInterface {
return newPersistentVolumeClaims(c, namespace)
}
func (c *CoreV1Client) Pods(namespace string) PodInterface {
return newPods(c, namespace)
}
func (c *CoreV1Client) PodTemplates(namespace string) PodTemplateInterface {
return newPodTemplates(c, namespace)
}
func (c *CoreV1Client) ReplicationControllers(namespace string) ReplicationControllerInterface {
return newReplicationControllers(c, namespace)
}
func (c *CoreV1Client) ResourceQuotas(namespace string) ResourceQuotaInterface {
return newResourceQuotas(c, namespace)
}
func (c *CoreV1Client) Secrets(namespace string) SecretInterface {
return newSecrets(c, namespace)
}
func (c *CoreV1Client) Services(namespace string) ServiceInterface {
return newServices(c, namespace)
}
func (c *CoreV1Client) ServiceAccounts(namespace string) ServiceAccountInterface {
return newServiceAccounts(c, namespace)
}
2.4.3 CoreV1Interface
type CoreV1Interface interface {
RESTClient() rest.Interface
ComponentStatusesGetter
ConfigMapsGetter
EndpointsGetter
EventsGetter
LimitRangesGetter
NamespacesGetter
NodesGetter
PersistentVolumesGetter
PersistentVolumeClaimsGetter
PodsGetter
PodTemplatesGetter
ReplicationControllersGetter
ResourceQuotasGetter
SecretsGetter
ServicesGetter
ServiceAccountsGetter
}
CoreV1Interface
中包含了各种 Kubernetes 对象的调用接口, 例如 PodsGetter
是对 Kubernetes 中 pod
对象增删改查操作的接口. ServiceGetter
是对 Service
对象操作的接口.
2.4.4 PodsGetter
示例中的代码如下:
pods, err := clientSet.CoreV1().Pods("").List(context.Background(), metav1.ListOptions{})
CoreV1().Pods()
func (c *CoreV1Client) Pods(namespace string) PodInterface {
return newPods(c, namespace)
}
newPods()
// newPods returns a Pods
func newPods(c *CoreV1Client, namespace string) *pods {
return &pods{
client: c.RESTClient(),
ns: namespace,
}
}
CoreV1().Pods()
的方法实际上是调用了 newPods()
的方法, 创建了一个 pods
对象, pods
对象实现了 rest.Interface
接口, 即最终的实现本质是 RESTClient
的HTTP调用.
// pods implements PodInterface
type pods struct {
client rest.Interface
ns string
}
pods
对象实现了 PodInterface
接口. PodInterface
定义了 pods
对象的增删改查等方法.
// PodInterface has methods to work with Pod resources.
type PodInterface interface {
Create(ctx context.Context, pod *v1.Pod, opts metav1.CreateOptions) (*v1.Pod, error)
Update(ctx context.Context, pod *v1.Pod, opts metav1.UpdateOptions) (*v1.Pod, error)
UpdateStatus(ctx context.Context, pod *v1.Pod, opts metav1.UpdateOptions) (*v1.Pod, error)
Delete(ctx context.Context, name string, opts metav1.DeleteOptions) error
DeleteCollection(ctx context.Context, opts metav1.DeleteOptions, listOpts metav1.ListOptions) error
Get(ctx context.Context, name string, opts metav1.GetOptions) (*v1.Pod, error)
List(ctx context.Context, opts metav1.ListOptions) (*v1.PodList, error)
Watch(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error)
Patch(ctx context.Context, name string, pt types.PatchType, data []byte, opts metav1.PatchOptions, subresources ...string) (result *v1.Pod, err error)
Apply(ctx context.Context, pod *corev1.PodApplyConfiguration, opts metav1.ApplyOptions) (result *v1.Pod, err error)
ApplyStatus(ctx context.Context, pod *corev1.PodApplyConfiguration, opts metav1.ApplyOptions) (result *v1.Pod, err error)
UpdateEphemeralContainers(ctx context.Context, podName string, pod *v1.Pod, opts metav1.UpdateOptions) (*v1.Pod, error)
PodExpansion
}
PodsGetter
PodsGetter 继承了 PodInterface 的接口.
// PodsGetter has a method to return a PodInterface.
// A group's client should implement this interface.
type PodsGetter interface {
Pods(namespace string) PodInterface
}
Pod().List()
pods.List 方法通过 RESTClient
的 HTTP 调用来实现对 Kubernetes 的 pod 资源的获取. 源码
// List takes label and field selectors, and returns the list of Pods that match those selectors.
func (c *pods) List(ctx context.Context, opts metav1.ListOptions) (result *v1.PodList, err error) {
var timeout time.Duration
if opts.TimeoutSeconds != nil {
timeout = time.Duration(*opts.TimeoutSeconds) * time.Second
}
result = &v1.PodList{}
err = c.client.Get().
Namespace(c.ns).
Resource("pods").
VersionedParams(&opts, scheme.ParameterCodec).
Timeout(timeout).
Do(ctx).
Into(result)
return
}
至此分析了 clientset.CoreV1().Pods("").List(metaV1.ListOptions{})
对 pod 资源获取的过程, 最终是调用 RESTClient
的方法实现.
2.5 RESTClient
下面分析 RESTClient
的创建过程及应用.
RESTClient
对象的创建同样是以来传入的 config 信息.
// NewForConfig creates a new CoreV1Client for the given config.
// NewForConfig is equivalent to NewForConfigAndClient(c, httpClient),
// where httpClient was generated with rest.HTTPClientFor(c).
func NewForConfig(c *rest.Config) (*CoreV1Client, error) {
config := *c
if err := setConfigDefaults(&config); err != nil {
return nil, err
}
httpClient, err := rest.HTTPClientFor(&config)
if err != nil {
return nil, err
}
return NewForConfigAndClient(&config, httpClient)
}
2.5.1 rest.HTTPClientFor
// HTTPClientFor returns an http.Client that will provide the authentication
// or transport level security defined by the provided Config. Will return the
// default http.DefaultClient if no special case behavior is needed.
func HTTPClientFor(config *Config) (*http.Client, error) {
transport, err := TransportFor(config)
if err != nil {
return nil, err
}
var httpClient *http.Client
if transport != http.DefaultTransport || config.Timeout > 0 {
httpClient = &http.Client{
Transport: transport,
Timeout: config.Timeout,
}
} else {
httpClient = http.DefaultClient
}
return httpClient, nil
}
2.5.2 rest.RESTClientForConfigAndClient
// RESTClientForConfigAndClient returns a RESTClient that satisfies the requested attributes on a
// client Config object.
// Unlike RESTClientFor, RESTClientForConfigAndClient allows to pass an http.Client that is shared
// between all the API Groups and Versions.
// Note that the http client takes precedence over the transport values configured.
// The http client defaults to the `http.DefaultClient` if nil.
func RESTClientForConfigAndClient(config *Config, httpClient *http.Client) (*RESTClient, error) {
if config.GroupVersion == nil {
return nil, fmt.Errorf("GroupVersion is required when initializing a RESTClient")
}
if config.NegotiatedSerializer == nil {
return nil, fmt.Errorf("NegotiatedSerializer is required when initializing a RESTClient")
}
baseURL, versionedAPIPath, err := DefaultServerUrlFor(config)
if err != nil {
return nil, err
}
rateLimiter := config.RateLimiter
if rateLimiter == nil {
qps := config.QPS
if config.QPS == 0.0 {
qps = DefaultQPS
}
burst := config.Burst
if config.Burst == 0 {
burst = DefaultBurst
}
if qps > 0 {
rateLimiter = flowcontrol.NewTokenBucketRateLimiter(qps, burst)
}
}
var gv schema.GroupVersion
if config.GroupVersion != nil {
gv = *config.GroupVersion
}
clientContent := ClientContentConfig{
AcceptContentTypes: config.AcceptContentTypes,
ContentType: config.ContentType,
GroupVersion: gv,
Negotiator: runtime.NewClientNegotiator(config.NegotiatedSerializer, gv),
}
restClient, err := NewRESTClient(baseURL, versionedAPIPath, clientContent, rateLimiter, httpClient)
if err == nil && config.WarningHandler != nil {
restClient.warningHandler = config.WarningHandler
}
return restClient, err
}
2.5.3 rest.NewRESTClient
// NewRESTClient creates a new RESTClient. This client performs generic REST functions
// such as Get, Put, Post, and Delete on specified paths.
func NewRESTClient(baseURL *url.URL, versionedAPIPath string, config ClientContentConfig, rateLimiter flowcontrol.RateLimiter, client *http.Client) (*RESTClient, error) {
if len(config.ContentType) == 0 {
config.ContentType = "application/json"
}
base := *baseURL
if !strings.HasSuffix(base.Path, "/") {
base.Path += "/"
}
base.RawQuery = ""
base.Fragment = ""
return &RESTClient{
base: &base,
versionedAPIPath: versionedAPIPath,
content: config,
createBackoffMgr: readExpBackoffConfig,
rateLimiter: rateLimiter,
Client: client,
}, nil
}
2.5.4 RESTClient
// RESTClient imposes common Kubernetes API conventions on a set of resource paths.
// The baseURL is expected to point to an HTTP or HTTPS path that is the parent
// of one or more resources. The server should return a decodable API resource
// object, or an api.Status object which contains information about the reason for
// any failure.
//
// Most consumers should use client.New() to get a Kubernetes API client.
type RESTClient struct {
// base is the root URL for all invocations of the client
base *url.URL
// versionedAPIPath is a path segment connecting the base URL to the resource root
versionedAPIPath string
// content describes how a RESTClient encodes and decodes responses.
content ClientContentConfig
// creates BackoffManager that is passed to requests.
createBackoffMgr func() BackoffManager
// rateLimiter is shared among all requests created by this client unless specifically
// overridden.
rateLimiter flowcontrol.RateLimiter
// warningHandler is shared among all requests created by this client.
// If not set, defaultWarningHandler is used.
warningHandler WarningHandler
// Set specific behavior of the client. If not set http.DefaultClient will be used.
Client *http.Client
}
2.5.5 rest.Interface
// Interface captures the set of operations for generically interacting with Kubernetes REST apis.
type Interface interface {
GetRateLimiter() flowcontrol.RateLimiter
Verb(verb string) *Request
Post() *Request
Put() *Request
Patch(pt types.PatchType) *Request
Get() *Request
Delete() *Request
APIVersion() schema.GroupVersion
}
在调用 HTTP 方法(Post(), Put(), Get(), Delete() )时, 实际上调用了 Verb(verb string)函数.
// Verb begins a request with a verb (GET, POST, PUT, DELETE).
//
// Example usage of RESTClient's request building interface:
// c, err := NewRESTClient(...)
// if err != nil { ... }
// resp, err := c.Verb("GET").
//
// Path("pods").
// SelectorParam("labels", "area=staging").
// Timeout(10*time.Second).
// Do()
//
// if err != nil { ... }
// list, ok := resp.(*api.PodList)
func (c *RESTClient) Verb(verb string) *Request {
return NewRequest(c).Verb(verb)
}
Verb 函数调用了 NewRequest 方法, 最后调用 Do() 方法实现了一个 HTTP 请求获取 Result.
2.6 总结
client-go 对 kubernetes 资源对象的调用, 需要先获取 kubernetes 的配置信息, 即 $HOME/.kube/config
. (或使用 Token)
整个调用的过程如下:
kubeconfig -> rest.config -> clientset -> 具体的 client(CoreV1Client) -> 具体的资源对象(pod)-> RESTClient -> http.Client -> HTTP 请求的发送及响应.
通过 clientset 中不同的 client 和 client 中不同资源对象的方法实现对 kubernetes 中资源对象的增删改查等操作, 常用的 client 有 CoreV1Client、AppsV1beta1Client、ExtensionsV1beta1Client 等.
3. client-go 对 Kubernetes 资源的调用
创建clientSet
//获取kubeconfig
kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file")
//创建config
config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
//创建clientset
clientset, err := kubernetes.NewForConfig(config)
func NewKubernetesClient(c *Config) (*kubernetes.Clientset, error) {
kubeConf := &rest.Config{
Host: fmt.Sprintf("%s:%d", c.Host, c.Port),
BearerToken: c.Token,
TLSClientConfig: rest.TLSClientConfig{
Insecure: true,
},
}
return kubernetes.NewForConfig(kubeConf)
}
3.1 deployment
//声明deployment对象
var deployment *v1beta1.Deployment
//构造deployment对象
//创建deployment
deployment, err := clientset.AppsV1beta1().Deployments(<namespace>).Create(<deployment>)
//更新deployment
deployment, err := clientset.AppsV1beta1().Deployments(<namespace>).Update(<deployment>)
//删除deployment
err := clientset.AppsV1beta1().Deployments(<namespace>).Delete(<deployment.Name>, &meta_v1.DeleteOptions{})
//查询deployment
deployment, err := clientset.AppsV1beta1().Deployments(<namespace>).Get(<deployment.Name>, meta_v1.GetOptions{})
//列出deployment
deploymentList, err := clientset.AppsV1beta1().Deployments(<namespace>).List(&meta_v1.ListOptions{})
//watch deployment
watchInterface, err := clientset.AppsV1beta1().Deployments(<namespace>).Watch(&meta_v1.ListOptions{})
3.2 service
//声明service对象
var service *v1.Service
//构造service对象
//创建service
service, err := clientset.CoreV1().Services(<namespace>).Create(<service>)
//更新service
service, err := clientset.CoreV1().Services(<namespace>).Update(<service>)
//删除service
err := clientset.CoreV1().Services(<namespace>).Delete(<service.Name>, &meta_v1.DeleteOptions{})
//查询service
service, err := clientset.CoreV1().Services(<namespace>).Get(<service.Name>, meta_v1.GetOptions{})
//列出service
serviceList, err := clientset.CoreV1().Services(<namespace>).List(&meta_v1.ListOptions{})
//watch service
watchInterface, err := clientset.CoreV1().Services(<namespace>).Watch(&meta_v1.ListOptions{})
3.3 ingress
//声明service对象
var service *v1.Service
//构造service对象
//创建service
service, err := clientset.CoreV1().Services(<namespace>).Create(<service>)
//更新service
service, err := clientset.CoreV1().Services(<namespace>).Update(<service>)
//删除service
err := clientset.CoreV1().Services(<namespace>).Delete(<service.Name>, &meta_v1.DeleteOptions{})
//查询service
service, err := clientset.CoreV1().Services(<namespace>).Get(<service.Name>, meta_v1.GetOptions{})
//列出service
serviceList, err := clientset.CoreV1().Services(<namespace>).List(&meta_v1.ListOptions{})
//watch service
watchInterface, err := clientset.CoreV1().Services(<namespace>).Watch(&meta_v1.ListOptions{})
replicaSet、pod、statefulset....... (kubernetes 中一般通过 deployment 来创建 replicaSet, 再通过 replicaSet 来控制 pod.)
通过以上对 Kubernetes 的资源对象的操作函数可以看出, 每个资源对象都有增删改查等方法, 基本调用逻辑类似. 一般二次开发只需要创建 deployment、service、ingress 三个资源对象即可, pod 对象由 deployment 包含的 replicaSet 来控制创建和删除. 函数调用的入参一般只有 NAMESPACE 和 kubernetesObject 两个参数, 部分操作有 Options 的参数. 在创建前, 需要对资源对象构造数据, 可以理解为编辑一个资源对象的 yaml 文件, 然后通过 kubectl create -f xxx.yaml 来创建对象.
变更历史
最后更新于: 查看全部变更历史
first commit
于 2024/9/20整理文章
于 2024/9/23更改主题色+更改首页
于 2024/9/23调整博客分类+修改about-me.md
于 2024/9/24给予文件夹顺序
于 2024/9/24新增文字+CRLF全部替换为LF
于 2024/10/17update
于 2024/10/17升级主题+整理文章格式
于 2024/11/1整理文章格式
于 2024/11/1更改navbar
于 2024/11/4docs: update docs
于 2024/11/19docs: update docs
于 2024/11/21modify(docs): remanage folders and rename files
于 2024/12/17