使用 Kubernetes API 写点东西

Posted by ShenHengheng on 2019-03-13

本篇文章翻译自博客:https://medium.com/programming-kubernetes/building-stuff-with-the-kubernetes-api-part-4-using-go-b1d0e3c1c899

由于一直向通过一个client-go案例进入kubernetes api的开发,因此我想借这篇文章开启我之后的kubernetes api开发。当然 kubernetes的客户端不仅仅只有client-go一个,虽然client-go是最原生的一种客户端,另外还有python、java等实现的。下面将访问kubernetes集群的几种方式列举下来,仅供参考。(注:表格来自 https://jimmysong.io/kubernetes-handbook/develop/client-go-sample.html)

方式 特点 支持者
Kubernetes dashboard 直接通过Web UI进行操作,简单直接,可定制化程度低 官方支持
kubectl 命令行操作,功能最全,但是比较复杂,适合对其进行进一步的分装,定制功能,版本适配最好 官方支持
client-go 从kubernetes的代码中抽离出来的客户端包,简单易用,但需要小心区分kubernetes的API版本 官方支持
client-python python客户端,kubernetes-incubator 官方支持
Java client fabric8中的一部分,kubernetes的java客户端 redhat

下面将使用 client-go 实现一个简单的 PVC 对象监控工具,用于监控我们持久化卷的容量并告警。

The Kubernetes Go Client Project (client-go)

在进入代码之前,了解Kubernetes Go客户端(或client-go)项目是非常有必要的。它是Kubernetes客户端框架中最古老的,最原生的,因此具有更多的机关和功能。Client-go不使用SwaggerAPI生成器,就像我们在之前的帖子中介绍过的OpenAPI客户端一样。相反,它使用源自Kubernetes项目的源代码生成器来创建Kubernetes样式的API对象和序列化对象。

client-go是由很多包构成,可以满足从REST访问的编程范式到更复杂客户端的不同编程需求。

img

RESTClient 是一个基础包,它使用api-machinery存储库中的类型来提供对作为一组REST原语的API的访问。作为RESTClient之上的抽象而构建,Clientset 将是创建简单的Kubernetes客户端工具的起点。它公开了版本化的API资源及其序列化程序。>

There are several other packages in client-go including discovery, dynamic, and scale. While we are not going to cover these packages, it is important to be aware of their capabilities.

A simple client tool for Kubernetes

img

You can find the complete example on GitHub.

这个例子涵盖了 kubernetes Go客户端的几个高阶的概念

  • 连接
  • 在集群中遍历资源列表
  • 监视资源对象

Setup

client-go项目支持Godepdep管理依赖。为了方便起见,这里使用dep(例如,以下是代码中所需的最小Gopkg.toml配置,它依赖于client-go版本6.0和版本1.9的Kubernetes API:

[[constraint]]
  name = "k8s.io/api"
  version = "kubernetes-1.9.0"
[[constraint]]
  name = "k8s.io/apimachinery"
  version = "kubernetes-1.9.0"
[[constraint]]
  name = "k8s.io/client-go"
  version = "6.0.0"

接下来,执行dep ensure完成后面的事情。

Connecting to the API server

我们Go client程序第一步就是与API Server建立连接,为了实现连接,我们此时需要用到 clientcmd 包。

import (
...
    "k8s.io/client-go/tools/clientcmd"
)
func main() {
    kubeconfig := filepath.Join(
         os.Getenv("HOME"), ".kube", "config",
    )
    config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
    if err != nil {
        log.Fatal(err)
    }
...
}

Client-go通过提供实用程序功能来从不同的上下文导入配置,从而使这成为一项微不足道的任务,主要有以下几种方式。

From a config file

如上例所示,您可以从kubeconfig文件获取配置用于连接API服务器。当你的代码在集群外运行时,这是理想的选择。

clientcmd.BuildConfigFromFlags("", configFile)

From a cluster

如果您的代码将部署在Kubernetes集群中,则可以使用与上面相同但参数为空的函数,在客户端代码注定要在pod中运行时,使用集群信息配置连接。

clientcmd.BuildConfigFromFlags("", "")

或者,使用 rest 包直接从集群信息创建配置,如下所示:

import "k8s.io/client-go/rest"
...
rest.InClusterConfig()

Create a clientset

我们需要创建一个客户端序列化对象来让我们访问API对象。Clientset类型来自 kubernetes 包,提供对生成的客户端序列化对象的访问,以访问版本化(比如:v1)API对象,如下所示。

type Clientset struct {
    *authenticationv1beta1.AuthenticationV1beta1Client
    *authorizationv1.AuthorizationV1Client
...
    *corev1.CoreV1Client
}

一旦我们完成了配置连接,我们就可以使用配置初始化一个clientset对象。如下所示

func main() {
    config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
    ...
    clientset, err := kubernetes.NewForConfig(config)
    if err != nil {
        log.Fatal(err)
    }
}

对于我们的例子,我们使用`v1版本的API对象,因此接下来,我们将会使用clientset调用CoreV1方法来访问核心的API资源。

func main() {
    ...
    clientset, err := kubernetes.NewForConfig(config)
    if err != nil {
        log.Fatal(err)
    }
    api := clientset.CoreV1()
}

You can see the available clientsets here.

Listing cluster PVCs

我们可以对clientset执行的最基本操作之一是检索存储的API对象的资源列表。在这里我们将检索一个特定的namespace的PVC列表,如下所示。

import (
...
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
func main() {
    var ns, label, field string
    flag.StringVar(&ns, "namespace", "", "namespace")
    flag.StringVar(&label, "l", "", "Label selector")
    flag.StringVar(&field, "f", "", "Field selector")
...
    api := clientset.CoreV1()
    // setup list options
    listOptions := metav1.ListOptions{
        LabelSelector: label, 
        FieldSelector: field,
    }
    pvcs, err := api.PersistentVolumeClaims(ns).List(listOptions)
    if err != nil {
        log.Fatal(err)
    }
    printPVCs(pvcs)
...
}

在上面的代码片段中,我们使用ListOptions来指定标签和字段选择器(以及命名空间),以缩小返回的类型为v1.PeristentVolumeClaimList的PVC资源。下一个代码段显示了我们如何遍历和打印从集群中检索到的PVC列表。

func printPVCs(pvcs *v1.PersistentVolumeClaimList) {
    template := "%-32s%-8s%-8s\n"
    fmt.Printf(template, "NAME", "STATUS", "CAPACITY")
    for _, pvc := range pvcs.Items {
        quant := pvc.Spec.Resources.Requests[v1.ResourceStorage]
        fmt.Printf(
            template, 
            pvc.Name, 
            string(pvc.Status.Phase), 
            quant.String())
    }
}

Watching the cluster PVCs

Kubernetes Go客户端框架支持在指定的API对象生命周期事件中监视集群的能力,包括分别创建,更新和删除对象时生成的ADDEDMODIFIEDDELETED。对于我们的简单CLI工具,我们将使用此监视功能来监视已声明的持久存储对正在运行的群集的总容量。

当给定命名空间的总的声明容量达到某个阈值(比如200Gi)时,我们将采取任意行动。为简单起见,我们将在屏幕上打印通知。但是,在更复杂的实现中,可以使用相同的方式来触发某些自动操作。

Setup a watch

现在,让我们使用Watch方法为PersistentVolumeClaim资源创建一个监视器。然后,通过方法ResultChan从Go通道接受事件通知。

func main() {
...
    api := clientset.CoreV1()
    listOptions := metav1.ListOptions{
        LabelSelector: label, 
        FieldSelector: field,
    }
    watcher, err :=api.PersistentVolumeClaims(ns).
       Watch(listOptions)
    if err != nil {
      log.Fatal(err)
    }
    ch := watcher.ResultChan()
...
}

Loop through events

接下来,我们准备开始处理资源事件。然而,在我们处理事件之前,我们声明变量maxClaimsQuanttotalClaimQuant其中totalClaimQuant类型为resource.Quantity(表示k8s中的SI数量)来设置我们的数量阈值和运行总数。

import(
    "k8s.io/apimachinery/pkg/api/resource"
    ...
)
func main() {
    var maxClaims string
    flag.StringVar(&maxClaims, "max-claims", "200Gi", 
        "Maximum total claims to watch")
    var totalClaimedQuant resource.Quantity
    maxClaimedQuant := resource.MustParse(maxClaims)
...
    ch := watcher.ResultChan()
    for event := range ch {
        pvc, ok := event.Object.(*v1.PersistentVolumeClaim)
        if !ok {
            log.Fatal("unexpected type")
        }
        ...
    }
}

上面的for-range循环中的watcher的通道用于处理来自服务器的事件传入通知。每个事件都分配给变量event,其中event.Object值被声明为PersistentVolumeClaim类型,因此我们可以提取所需的信息。

Processing ADDED events

添加新PVC时,event.Type设置为值watch.Added。然后,我们使用以下代码提取添加的声明(quant)的容量,将其添加到运行总容量(totalClaimedQuant)。最后,我们检查总容量是否大于定义的最大容量(maxClaimedQuant)。如果是大于,程序可以触发一个动作。

import(
    "k8s.io/apimachinery/pkg/watch"
    ...
)
func main() {
...
    for event := range ch {
        pvc, ok := event.Object.(*v1.PersistentVolumeClaim)
        if !ok {
            log.Fatal("unexpected type")
        }
        quant := pvc.Spec.Resources.Requests[v1.ResourceStorage]
        switch event.Type {
            case watch.Added:
                totalClaimedQuant.Add(quant)
                log.Printf("PVC %s added, claim size %s\n", 
                    pvc.Name, quant.String())
                if totalClaimedQuant.Cmp(maxClaimedQuant) == 1 {
                    log.Printf(
                        "\nClaim overage reached: max %s at %s",
                        maxClaimedQuant.String(),
                        totalClaimedQuant.String())
                    // trigger action
                    log.Println("*** Taking action ***")
                }
            }
        ...
        }
    }
}

Process DELETED events

代码也会在删除PVC时做出反应。它从运行总计数中减少已删除的PVC大小。

func main() {
...
    for event := range ch {
        ...
        switch event.Type {
        case watch.Deleted:
            quant := pvc.Spec.Resources.Requests[v1.ResourceStorage]
            totalClaimedQuant.Sub(quant)
            log.Printf("PVC %s removed, size %s\n", 
               pvc.Name, quant.String())
            if totalClaimedQuant.Cmp(maxClaimedQuant) <= 0 {
                log.Printf("Claim usage normal: max %s at %s",
                    maxClaimedQuant.String(),
                    totalClaimedQuant.String(),
                )
                // trigger action
                log.Println("*** Taking action ***")
            }
        }
        ...
    }
}

Run the program

# 创建两个pvc,此时观察程序的日志输出
$ helm install --name my-redis2-redis    \
--set persistence.enabled=true,persistence.size=100Gi stable/redis
$ helm install --name my-redis-redis    \
--set persistence.enabled=true,persistence.size=50Gi stable/redis

当针对正在运行的集群执行程序时,它首先显示现有PVC的列表。然后它开始在集群中查看新的PersistentVolumeClaim事件。

$ ./pvcwatch
Using kubeconfig:  /Users/vladimir/.kube/config
--- PVCs ----
NAME                            STATUS  CAPACITY
my-redis-redis                  Bound   50Gi
my-redis2-redis                 Bound   100Gi
-----------------------------
Total capacity claimed: 150Gi
-----------------------------
--- PVC Watch (max claims 200Gi) ----
2018/02/13 21:55:03 PVC my-redis2-redis added, claim size 100Gi
2018/02/13 21:55:03
At 50.0% claim capcity (100Gi/200Gi)
2018/02/13 21:55:03 PVC my-redis-redis added, claim size 50Gi
2018/02/13 21:55:03
At 75.0% claim capcity (150Gi/200Gi)

接下来,让我们将另一个应用程序部署到请求额外的75Gi存储声明的集群中(对于我们的示例,让我们使用Helm来部署一个实例,例如,InfluxDB 实例)。

$ helm install --name my-influx \
--set persistence.enabled=true,persistence.size=75Gi stable/influxdb

如下所示,我们的工具会立即对新的声明卷作出反应,并显示我们的警报,因为总声明容量超过了阈值。

--- PVC Watch (max claims 200Gi) ----
...
2018/02/13 21:55:03
At 75.0% claim capcity (150Gi/200Gi)
2018/02/13 22:01:29 PVC my-influx-influxdb added, claim size 75Gi
2018/02/13 22:01:29
Claim overage reached: max 200Gi at 225Gi
2018/02/13 22:01:29 *** Taking action ***
2018/02/13 22:01:29
At 112.5% claim capcity (225Gi/200Gi)

相反,当从集群中删除PVC时,该工具会相应地响应警报消息。

...
At 112.5% claim capcity (225Gi/200Gi)
2018/02/14 11:30:36 PVC my-redis2-redis removed, size 100Gi
2018/02/14 11:30:36 Claim usage normal: max 200Gi at 125Gi
2018/02/14 11:30:36 *** Taking action ***

References

Series table of content

Code — https://github.com/vladimirvivien/k8s-client-examples

Kubernetes clients — https://kubernetes.io/docs/reference/client-libraries/

Client-go — https://github.com/kubernetes/client-go

Kubernetes API reference — https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.10/