前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >使用python进行操作k8s api

使用python进行操作k8s api

原创
作者头像
dbdocker
发布2023-03-14 16:09:27
1.3K0
发布2023-03-14 16:09:27
举报
文章被收录于专栏:JupyterHub
代码语言:python
复制
# !/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time    : 2022/5/13 11:42
# @Author  : xxx
# @File    : s.py
# @Description : 这个类封装kube-sdk

import os
import re
import time
import kubernetes
import requests
from  kubernetes.stream import stream
from kubernetes import client, config


from kubernetes.client.api.networking_v1beta1_api import NetworkingV1beta1Api

class Kube(object):
    def __init__(self, source=None, namespace=None):
        """Load the kubeconfig selected by *source* and build the API clients.

        Args:
            source: Cluster identifier. ``"tencent-prod"`` and ``"aliyun-prod"``
                use their own kubeconfig; ``"intranet-dev"`` uses the local
                kubeconfig. For any other value the local kubeconfig is used
                only when running on Windows (``os.name == "nt"``); otherwise
                no kubeconfig is loaded by this constructor.
            namespace: Stored on the instance as ``self.namespace``.
        """
        self.source = source

        # The three named sources are mutually exclusive, so the order of the
        # comparisons does not matter; the Windows fallback is only consulted
        # when none of them matched.
        kubeconfig_selected = True
        if source == "tencent-prod":
            self.config_file = tencent_kubeconfig
        elif source == "aliyun-prod":
            self.config_file = aliyun_kubeconfig
        elif source == "intranet-dev" or os.name == "nt":
            self.config_file = local_kubeconfig
        else:
            kubeconfig_selected = False

        if kubeconfig_selected:
            config.kube_config.load_kube_config(config_file=self.config_file)

        # API client handles used by the other methods of this class.
        self.namespace = namespace
        self.Api_Instance = client.CoreV1Api()
        self.Api_Beatch = client.BatchApi()
        self.Api_Apps = client.AppsV1Api()
        self.k8s_client = client.ApiClient()
        self.Api_Network = NetworkingV1beta1Api()

    # 获取ingress
    def get_ingress(self, flag=False):
        ingress_list = []
        # flag = True 进行单空间查询 反之 为多空间
        if flag:
            time_list_namespace = self.list_namespaces(flag)
            # print(time_list_namespace)

            ingress_obj = self.Api_Network.list_ingress_for_all_namespaces()
            for ingress in ingress_obj.items:
                # if ingress.metadata.namespace == "wuhan":
                for rule in ingress.spec.rules:
                    for path in rule.http.paths:
                        if ingress.metadata.namespace in time_list_namespace:
                            # if ingress.metadata.namespace == "muleizhwl":
                            data = {
                                "namespace": ingress.metadata.namespace,
                                "ingress": ingress.metadata.name,
                                "dns": ingress.spec.rules[0].host,
                                "service": path.backend.service_name,
                                "port": path.backend.service_port,
                                "path": path.path,
                            }
                            ingress_list.append(data)
        else:
            ingress_obj = self.Api_Network.list_ingress_for_all_namespaces()
            for ingress in ingress_obj.items:
                # if ingress.metadata.namespace == "wuhan":
                for rule in ingress.spec.rules:
                    for path in rule.http.paths:
                        data = {
                            "namespace": ingress.metadata.namespace,
                            "ingress": ingress.metadata.name,
                            "dns": ingress.spec.rules[0].host,
                            "service": path.backend.service_name,
                            "port": path.backend.service_port,
                            "path": path.path,
                        }
                        ingress_list.append(data)
        return ingress_list

    # 写入文件
    def write_execl(self, flag=False):
        """Export the rows from ``get_ingress(flag)`` to ``ingress情况表.xlsx``.

        The rows are formatted and written through an ``Execl`` helper object;
        a confirmation line is printed when the helper reports a truthy result.

        Args:
            flag: Forwarded unchanged to ``get_ingress``.
        """
        rows = self.get_ingress(flag)
        header = ("空间", "ingress", "域名", "服务", "端口", "路由")
        workbook = Execl()
        formatted = workbook.format_data(data_info=rows)
        if workbook.write_excel(file="ingress情况表.xlsx", data=formatted, title=header):
            print("写入完成")

    # 获取空间
    def list_namespaces(self, flag=False):
        # print("获取空间")
        namespaces = self.Api_Instance.list_namespace()
        namespace_list_line = []
        time_list_namespace = []

        for namespace in namespaces.items:
            # 启用时间筛选
            if flag:
                if namespace.metadata.name == "sjj-wangmangling-dev":
                    print(namespace.metadata.annotations.get("kubesphere.io/alias-name").split("-"))
                    break
            else:

                if namespace.metadata.labels.get("kubesphere.io/devopsproject") == None:
                    desc = namespace.metadata.annotations
                    data = self.get_patch_namespace(namespace,desc)
                    namespace_list_line.append(data)
                    if len(namespace_list_line) == 0:
                        return time_list_namespace
        return namespace_list_line


    def get_patch_namespace(self, namespace, desc):
        """Build the description dict for one namespace object.

        Args:
            namespace: Namespace object as returned by the namespace listing;
                its ``metadata`` (name, labels, annotations,
                creation_timestamp) is read.
            desc: Stored unchanged under the ``desc`` key.

        Returns:
            dict: Keys ``namespace``, ``desc``, ``work``, ``createtime``,
            ``url``, ``cloud_type``, ``create_time``, ``head``, ``ex_time`` and
            ``environment``. ``url`` is an empty string when ``self.source`` is
            not one of ``aliyun-prod``, ``tencent-prod`` or ``intranet-dev``.
        """
        cloud_type = models.DictItems.objects.filter(dict_code=self.source).first()

        head = self.re_head(namespace)
        try:
            # Environment is the third-from-last "-"-separated alias segment.
            environment = namespace.metadata.annotations.get("kubesphere.io/alias-name").split("-")[-3]
        except (AttributeError, IndexError):
            # No annotations / no alias / alias with fewer than three segments.
            environment = ''

        ex_time = self.re_time(namespace)

        name = namespace.metadata.name
        # Labels may be absent altogether; treat that as "no workspace".
        labels = namespace.metadata.labels or {}
        workspace = labels.get("kubesphere.io/workspace")

        if self.source == 'aliyun-prod':
            url = "https://xxx/{0}/clusters/default/projects/{1}/deployments".format(workspace, name)
        elif self.source in ("tencent-prod", "intranet-dev"):
            url = f"https://xxxx/{workspace}/clusters/{self.source}/projects/{name}/deployments"
        else:
            # Previously left unbound, which raised UnboundLocalError below.
            url = ""

        return {
            "namespace": name,
            "desc": desc,
            "work": workspace,
            "createtime": namespace.metadata.creation_timestamp.strftime("%Y-%m-%d"),
            "url": url,
            "cloud_type": cloud_type,
            "create_time": namespace.metadata.creation_timestamp,
            "head": head,
            "ex_time": ex_time,
            "environment": environment,
        }




    # 处理配额
    def get_namespace_resource_quota(self, flag=False):
        namespace_quota = self.Api_Instance.list_resource_quota_for_all_namespaces()
        quota_new_list = []
        for quota in namespace_quota.items:
            if flag:
                break
            if quota.status.used == None:
                quotas = "无配置"
            else:
                quotas = quota.status.used.get("limits.memory")
            data = {
                "namespace": quota.metadata.namespace,
                "limit_memory": quotas,
            }
            quota_new_list.append(data)
        return quota_new_list

    """
    处理有状态服务
    """
    def get_statefulset_pod(self,namespace=None):
        new_statefulset_list = []
        statefulset_list = self.Api_Apps.list_namespaced_stateful_set(namespace=namespace)
        for statefulset_pod in statefulset_list.items:
            new_statefulset_list.append(statefulset_pod.metadata.name)
        return new_statefulset_list

    # 处理deployment控制器
    def get_list_deployments_probe(self, namespace=None, flag=False, namespaces=False):
        print("本次要获取的空间",namespace)
        new_namespaces_dict = {}

        # 获取所有空间
        if namespaces:
            namespace = self.list_namespaces()
            for name in namespace:
                deployment_list_line, deployment_list_limit, deployment_list_desc,middleware_list = self.get_list_deployment_dict_data(
                    name.get("namespace"), flag)
                new_namespaces_dict["deployment_list_line"] = deployment_list_line
                new_namespaces_dict["deployment_list_limit"] = deployment_list_limit
                new_namespaces_dict["deployment_list_desc"] = deployment_list_desc
            return new_namespaces_dict
        else:
            print("手动获取空间")
            deployment_list_line, deployment_list_limit, deployment_list_desc,middleware_list,clse_list = self.get_list_deployment_dict_data(namespace,flag)
            return deployment_list_line, deployment_list_limit, deployment_list_desc,middleware_list,clse_list



    def re_head(self,namespace):
        try:
            if "-" in re.search("负责人(.*)",
                                 namespace.metadata.annotations.get(
                                     "kubesphere.io/alias-name")).group(1):
                head = re.search("负责人(.*)",
                                 namespace.metadata.annotations.get(
                                     "kubesphere.io/alias-name")).group(1).split("-")[0]
            else:

                    head = re.search("负责人(.*)",
                                     namespace.metadata.annotations.get(
                                         "kubesphere.io/alias-name")).group(1)
        except Exception:
            return ""
        return head

    def re_time(self,namespace):
        try:
            if re.search("-([0-9]{4}-[0-9]{2}-[0-9]{2})", namespace.metadata.annotations.get(
                                     "kubesphere.io/alias-name")):
                ex_time = re.search("-([0-9]{4}-[0-9]{2}-[0-9]{2})", namespace.metadata.annotations.get(
                                     "kubesphere.io/alias-name")).group(1)
            else:
                ex_time = namespace.metadata.annotations.get("kubesphere.io/alias-name").split("-")[-1]
        except Exception:
            return ""
        # print(ex_time)
        return ex_time


    # 处理deployment的资源组合成dict数据
    def get_list_deployment_dict_data(self, namespace, flag=False):
        deployment_list_line = []
        deployment_list_limit = []
        deployment_list_desc = []
        middleware_list = []
        clse_list = []
        statefulset_list = self.get_statefulset_pod(namespace)
        list_deployment_response = self.Api_Apps.list_namespaced_deployment(namespace)

        data_middleware_list = ["redis","mysql","elasticsearch","es","mongo-bi-v1","redis-v1","mongo-bi","rabbitmq-server","nacos","rabbitmq","mongo"
                ,"elasticsearch-v1",
                "mongdo","rabbitmq-v1","rmq","mysql-v1","sentinel","minio","redis5","mongodb","mysql-server","mongodb-bi"]


        for deployment in list_deployment_response.items:
            if  deployment.metadata.name in data_middleware_list:
                middleware_list.append(deployment.metadata.name)
                continue
            else:
                if flag:
                    print("控制器", deployment)
                else:
                    if deployment.spec.replicas == 0:
                        print("以关闭空间",deployment.metadata.self_link.split("/")[5])
                        clse_list.append(deployment.metadata.self_link.split("/")[5])
                        time.sleep(2)

                    else:
                        if deployment.spec.template.spec.containers[0].liveness_probe == None and \
                                deployment.spec.template.spec.containers[0].readiness_probe == None:
                            try:
                                data = {
                                    "namespace": deployment.metadata.self_link.split("/")[5],
                                    "deployment": deployment.metadata.name,
                                    "line": "×",
                                    "cloud_type":models.DictItems.objects.filter(dict_code=self.source).first(),
                                }
                                deployment_list_line.append(data)
                            except Exception:
                                data = {
                                    "namespace": "无命名空间",
                                    "deployment": deployment.metadata.name,
                                    "line": "×",
                                    "cloud_type": models.DictItems.objects.filter(dict_code=self.source).first(),
                                }
                                deployment_list_line.append(data)
                        else:
                            try:
                                data = {
                                    "namespace": deployment.metadata.self_link.split("/")[5],
                                    "deployment": deployment.metadata.name,
                                    "line": "√",
                                    "cloud_type": models.DictItems.objects.filter(dict_code=self.source).first(),

                                }
                                deployment_list_line.append(data)
                            except Exception:
                                data = {
                                    "namespace": "无命名空间",
                                    "deployment": deployment.metadata.name,
                                    "line": "√",
                                    "cloud_type": models.DictItems.objects.filter(dict_code=self.source).first(),

                                }
                                deployment_list_line.append(data)

                        if deployment.metadata.annotations.get(
                                "kubesphere.io/description") == None and deployment.spec.replicas == 1:
                            try:
                                data = {
                                    "namespace": deployment.metadata.self_link.split("/")[5],
                                    "deployment": deployment.metadata.name,
                                    "desc": "×",
                                    "cloud_type": models.DictItems.objects.filter(dict_code=self.source).first(),
                                }
                                deployment_list_desc.append(data)
                            except Exception:
                                data = {
                                    "namespace": "无命名空间",
                                    "deployment": deployment.metadata.name,
                                    "desc": "×",
                                    "cloud_type": models.DictItems.objects.filter(dict_code=self.source).first(),


                                }
                                deployment_list_desc.append(data)
                        else:
                            try:
                                data = {
                                    "namespace": deployment.metadata.self_link.split("/")[5],
                                    "deployment": deployment.metadata.name,
                                    "desc": "√",
                                    "cloud_type": models.DictItems.objects.filter(dict_code=self.source).first(),
                                }
                                deployment_list_desc.append(data)
                            except Exception:
                                data = {
                                    "namespace": "无命名空间",
                                    "deployment": deployment.metadata.name,
                                    "desc": "√",
                                    "cloud_type": models.DictItems.objects.filter(dict_code=self.source).first(),
                                }
                                deployment_list_desc.append(data)

                        if deployment.spec.template.spec.containers[0] \
                                .resources.limits == None and deployment.spec.replicas == 1:
                            try:
                                data = {
                                    "namespace": deployment.metadata.self_link.split("/")[5],
                                    "deployment": deployment.metadata.name,
                                    "limit": "×",
                                    "cloud_type": models.DictItems.objects.filter(dict_code=self.source).first(),


                                }
                                deployment_list_limit.append(data)
                            except Exception:
                                data = {
                                    "namespace": "无命名空间",
                                    "deployment": deployment.metadata.name,
                                    "limit": "×",
                                    "cloud_type": models.DictItems.objects.filter(dict_code=self.source).first(),


                                }
                                deployment_list_limit.append(data)
                        else:
                            try:
                                data = {
                                    "namespace": deployment.metadata.self_link.split("/")[5],
                                    "deployment": deployment.metadata.name,
                                    "limit": "√",
                                    "cloud_type": models.DictItems.objects.filter(dict_code=self.source).first(),


                                }
                                deployment_list_limit.append(data)
                            except Exception:
                                data = {
                                    "namespace": "无命名空间",
                                    "deployment": deployment.metadata.name,
                                    "limit": "√",
                                    "cloud_type": models.DictItems.objects.filter(dict_code=self.source).first(),

                                }
                                deployment_list_limit.append(data)
        new_middleware_list = middleware_list + statefulset_list
        return deployment_list_line, deployment_list_limit, deployment_list_desc,new_middleware_list,clse_list



    # 获取非running状态
    def get_not_running(self, running=True):
        list_pod_data = []
        project_data = []
        # 获取所有空间的pod
        pod_list = self.Api_Instance.list_pod_for_all_namespaces(watch=False)
        for pod in pod_list.items:
            # print(pod)
            if pod.status.phase != "Running":
                if pod.status.phase != "Succeeded":
                    # print("结果",pod)
                    try:
                        if pod.status.container_statuses[0].ready == True:
                            ready = "准备好"
                        else:
                            ready = "未准备好"

                        if pod.status.container_statuses[0].started == True:
                            status = "已启动"
                        else:
                            status = "未启动"
                        restart = pod.status.container_statuses[0].restart_count
                    except Exception:
                        status, ready, restart = None, None, None

                    msg_dict = {"project": pod.metadata.namespace, "service": pod.metadata.name,
                                "status": pod.status.phase, "reay": ready,
                                "runing": status,
                                "restart": restart}
                    list_pod_data.append("项目:{0}\t 服务名称:{1}\t 当前状态:{2}\t 是否准备好:{3}\t 是否启动:{4}\t  自动重启次数:{5}".format(
                        pod.metadata.namespace, pod.metadata.name,
                        pod.status.phase, ready,
                        status, restart))
                    project_data.append(msg_dict)
                    # self.send_msg(msg=msg_dict)
        if running != True:
            return project_data

        return list_pod_data


    # 创建deployment使用yaml格式
    def create_namespace_deployment_yaml(self, namespace, images, name, port,cluster_name):
        #print("端口号",port)
        if int(port) != 80:
            print("是java")
            init_time = 60
            time_out = 3
            path = "/health"
        else:
            print("非java")
            init_time = 5
            time_out = 1
            path = "/"
        deployment_pvc_json = {'apiVersion': 'v1', 'kind': 'PersistentVolumeClaim', 'metadata': {'name': name},
                               'spec': {'accessModes': ['ReadWriteOnce'],
                                        'the__resources': {'requests': {'storage': '10Gi'}},
                                        'storageClassName': 'nfs-client'}}

        deployment_json = {'kind': 'Deployment', 'apiVersion': 'apps/v1',
                           'metadata': {'name': name, 'namespace': namespace, "labels": {'app': name},
                                        'annotations': {'deployment.kubernetes.io/revision': '1.txt'}},
                           'spec': {'replicas': 1, 'selector': {'matchLabels': {'app': name}},
                                    'template': {'metadata': {'labels': {'app': name}}, 'spec': {'volumes': [
                                        {'name': 'host-time', 'hostPath': {'path': '/etc/localtime', 'type': ''}},
                                        {'name': name}], 'containers': [{'name': name, 'image': images, 'ports': [
                                        {'name': 'tcp-{0}'.format(port), 'containerPort': int(port),
                                         'protocol': 'TCP'}],
                                                                         'the__resources': {
                                                                             'limits': {'cpu': '1.txt', 'memory': '1500Mi'},
                                                                             'requests': {'cpu': '500m',
                                                                                          'memory': '500Mi'}},
                                                                         'volumeMounts': [
                                                                             {'name': 'host-time', 'readOnly': True,
                                                                              'mountPath': '/etc/localtime'},
                                                                             {'name': name, 'mountPath': '/data/db'}],
                                                                         'livenessProbe': {
                                                                             'httpGet': {
                                                                                 'path': path,
                                                                                 'port': int(port),
                                                                                 'scheme': 'HTTP'
                                                                                 },
                                                                             'initialDelaySeconds': init_time,
                                                                             'timeoutSeconds': time_out,
                                                                             'periodSeconds': 10,
                                                                             'successThreshold': 1,
                                                                             'failureThreshold': 3
                                                                             },
                                                                         'readinessProbe': {
                                                                             'httpGet': {
                                                                                 'path': path,
                                                                                 'port': int(port),
                                                                                 'scheme': 'HTTP'
                                                                             },
                                                                             'initialDelaySeconds': init_time,
                                                                             'timeoutSeconds': time_out,
                                                                             'periodSeconds': 10,
                                                                             'successThreshold': 1,
                                                                             'failureThreshold': 3
                                                                         },
                                                                         'imagePullPolicy': 'Always'}],
                                        'imagePullSecrets': [{"name": "harbor"}],
                                        'restartPolicy': 'Always'},
                                                 }}}

        deployment_svc_json = {'kind': 'Service', 'apiVersion': 'v1',
                               'metadata': {'name': name, 'namespace': namespace, 'labels': {'app': name}}, 'spec': {
                'ports': [
                    {'name': 'tcp-{0}'.format(port), 'protocol': 'TCP', 'port': int(port), 'targetPort': int(port)}],
                'selector': {'app': name}, 'type': 'ClusterIP'}}

        """{'kind': 'Secret', 'apiVersion': 'v1', 'metadata': {'name': 'xx', 'namespace': 'devops', 'annotations': {'kubesphere.io/alias-name': 'xx', 'kubesphere.io/creator': 'liuchengguo', 'kubesphere.io/description': 'xx'}}, 'data': {'tls.crt': '1.txt', 'tls.key': '2'}, 'type': 'kubernetes.io/tls'}"""
        # print(json.dumps(deployment_svc_json))

        deploy_list = self.Api_Apps.list_namespaced_deployment(namespace=namespace)
        new_deploy_list = []

        if len(deploy_list.items) == 0:
            print("检验部署")
            try:
                self.Api_Apps.create_namespaced_deployment(body=deployment_json, namespace=namespace)
                self.Api_Instance.create_namespaced_service(body=deployment_svc_json, namespace=namespace)
                print("部署成功")
                return True
            except Exception as e:
                return "创建deployment失败,部署失败原因:{0}".format(json.loads(e.body).get("message"))
        else:
            print("检验部署1")
            for deploy in deploy_list.items:
                new_deploy_list.append(deploy.metadata.name)
            print(new_deploy_list)

            if name in new_deploy_list:
                print("存在")
                try:
                    depl = self.Api_Apps.replace_namespaced_deployment(body=deployment_json, namespace=namespace,
                                                                             name=name)
                    return True
                except Exception as e:
                    return "创建deployment失败,部署失败原因:{0} 部署环境:{1}".format(json.loads(e.body).get("message"),cluster_name)
            else:
                try:
                    self.Api_Apps.create_namespaced_deployment(body=deployment_json, namespace=namespace)
                    self.Api_Instance.create_namespaced_service(body=deployment_svc_json, namespace=namespace)
                    print("部署成功")
                    return True
                except Exception as e:
                    return "创建deployment失败,部署失败原因:{0} 部署环境:{1}".format(json.loads(e.body).get("message"),cluster_name)



        # try:
        #     pv = self.Api_Instance.create_namespaced_persistent_volume_claim(body=deployment_pvc_json,
        #                                                                      namespace=namespace)
        # except Exception:
        #     print("创建pv失败")


    """监听ingress对象"""
    def watch_all_namespaces_ingress(self):
        try:
            api_response = self.Api_Network.list_ingress_for_all_namespaces(allow_watch_bookmarks=True, watch=True)

        except Exception as e:
            print("watch失败:{0}", e)


    """监听deployment对象"""
    def watch_all_namespaces_deployment(self):
        """Stream deployment events of all namespaces and send notifications.

        The deployments are listed once to count them, then a watch stream is
        consumed. ``ADDED`` events are skipped while the running counter is
        still within that initial count; every other event is turned into a
        markdown message and passed to ``self.send_msg`` when a
        ``models.Kube`` row with ``status=1`` exists for the event's namespace.

        The method contains no return statement; it runs for as long as the
        stream yields events.
        """
        # (An earlier variant that streamed list_event_for_all_namespaces and
        # only printed each event is left disabled.)
        deployment_list = self.Api_Apps.list_deployment_for_all_namespaces()
        len_deployment = len(deployment_list.items)
        # Event types seen since the last notification was sent.
        new_len_add_deploy = []

        new_len_deployment = 1
        watch = kubernetes.watch.Watch()
        for e in watch.stream(self.Api_Apps.list_deployment_for_all_namespaces):
            # Presumably skips the initial replay of already-existing
            # deployments (one ADDED event each) - TODO confirm.
            if new_len_deployment <= len_deployment and e.get("type") == "ADDED":
                new_len_deployment += 1
            else:
                # NOTE(review): metadata.annotations is dereferenced without a
                # None check; an object without annotations would raise here.
                alias_name = e.get("object").metadata.annotations.get("kubesphere.io/alias-name") if e.get(
                    "object").metadata.annotations.get("kubesphere.io/alias-name") else "无别名"
                description = e.get("object").metadata.annotations.get("kubesphere.io/description") if e.get(
                    "object").metadata.annotations.get("kubesphere.io/description") else "无描述"

                new_len_add_deploy.append(e.get("type"))
                namespace = e.get("object").metadata.namespace
                name = e.get("object").metadata.name

                # replicas / ready_replicas are computed but not read again
                # anywhere in this method.
                replicas = 0 if e.get("object").status.replicas == None else e.get("object").status.replicas
                ready_replicas = 0 if e.get("object").status.ready_replicas == None else e.get(
                    "object").status.ready_replicas

                # Image of the first container, taken from the raw event dict.
                try:
                    image = e.get("raw_object").get("spec").get("template").get("spec").get("containers")[0].get(
                        "image")
                except Exception:
                    print("非更新操作")
                    image = "无"

                # Deletion: notify on every DELETED event.
                "删除"
                if e.get("type") == "DELETED":

                    kube_namespace = models.Kube.objects.filter(kubenamespaces=namespace, status=1).first()
                    print("删除匹配", kube_namespace)
                    if kube_namespace:
                        msg_json = " ### K8S-POD操作告警提示:\n\n > **操作:** 删除\n\n > **空间:** {0}\n\n > **服务:** {1}\n\n > **服务别名:** {2}\n\n > **服务描述:** {3}\n\n   > \n\n  > **当前镜像:** {4}".format(
                            namespace, name, alias_name, description, image)

                        self.send_msg(msg_json=msg_json, token=kube_namespace.token)
                        new_len_add_deploy = []

                # Modification: notify only when exactly six event types have
                # accumulated since the last reset.
                # NOTE(review): the meaning of 6 is not visible here - confirm.
                "修改"
                if e.get("type") == "MODIFIED" and len(new_len_add_deploy) == 6:
                    print(new_len_add_deploy)

                    kube_namespace = models.Kube.objects.filter(kubenamespaces=namespace, status=1).first()
                    print("更新匹配", kube_namespace)
                    if kube_namespace:
                        msg_json = " ### K8S-POD操作告警提示:\n\n > **操作:** 修改\n\n > **空间:** {0}\n\n > **服务:** {1}\n\n > **服务别名:** {2}\n\n > **服务描述:** {3}\n\n   > \n\n  > **当前镜像:** {4}".format(
                            namespace, name, alias_name, description, image)
                        self.send_msg(msg_json=msg_json, token=kube_namespace.token)
                        new_len_add_deploy = []

                # Addition: the event type was appended above, so the
                # membership test holds for every ADDED event reaching here.
                "添加"
                if e.get("type") == "ADDED" and "ADDED" in new_len_add_deploy:
                    print("ADDED", new_len_add_deploy)

                    kube_namespace = models.Kube.objects.filter(kubenamespaces=namespace, status=1).first()
                    print("添加匹配", kube_namespace)
                    if kube_namespace:
                        msg_json = " ### K8S-POD操作告警提示:\n\n > **操作:** 添加\n\n > **空间:** {0}\n\n > **服务:** {1}\n\n > **服务别名:** {2}\n\n > **服务描述:** {3}\n\n   > \n\n  > **当前镜像:** {4}".format(
                            namespace, name, alias_name, description, image)
                        self.send_msg(msg_json=msg_json, token=kube_namespace.token)
                        new_len_add_deploy = []

    #监听pod状态
    def watch_all_namespaces_pod(self):
        """Watch pod events in every namespace and print change notices.

        The number of pods returned by an initial list call is recorded and
        that many leading ``ADDED`` events are skipped (presumably the watch
        replays already-existing pods first -- confirm against the client
        documentation). Every later event is recorded in a short history
        list that is cleared whenever a notice is handled:

        * ``DELETED``  -- a notice is printed immediately.
        * ``MODIFIED`` -- a notice is printed once the history holds six
          events.
        * ``ADDED``    -- the history is cleared; no notice is printed.

        The loop runs until the watch stream ends and returns ``None``.
        """
        # Message layout shared by all notices: action, namespace, pod name
        # and the pod annotations. NOTE(review): the last field is labelled
        # as the current image but is filled with metadata.annotations.
        template = (" ### K8S-POD操作告警提示:\n\n > **操作:** {0}\n\n"
                    " > **空间:** {1}\n\n > **服务:** {2}\n\n"
                    "  > **当前镜像:** {3}")

        existing_pods = len(self.Api_Instance.list_pod_for_all_namespaces().items)
        skipped = 0
        recent_types = []

        watcher = kubernetes.watch.Watch()
        for event in watcher.stream(self.Api_Instance.list_pod_for_all_namespaces):
            event_type = event.get("type")

            # Swallow the first `existing_pods` ADDED events.
            if skipped < existing_pods and event_type == "ADDED":
                skipped += 1
                continue

            print("开始监听pod", event_type)
            metadata = event.get("object").metadata
            annotations = metadata.annotations
            namespace = metadata.namespace
            name = metadata.name
            recent_types.append(event_type)

            # The history must be reset on the list that is actually
            # inspected; the previous code reset an unrelated local, so the
            # history grew forever and the MODIFIED threshold fired once.
            if event_type == "DELETED":
                print(template.format("删除", namespace, name, annotations))
                recent_types = []
            elif event_type == "MODIFIED" and len(recent_types) == 6:
                print(template.format("更新", namespace, name, annotations))
                recent_types = []
            elif event_type == "ADDED":
                # Printing for ADDED events is disabled; only the history
                # is cleared.
                recent_types = []

    def pod_exec(self, pod, container="", namespace=None):
        """Open an interactive shell stream inside a pod.

        Args:
            pod: Name of the pod to exec into.
            container: Container name forwarded to the exec call
                (default ``""``).
            namespace: Namespace of the pod. When omitted, the instance's
                ``namespace`` attribute is used, and ``"cmdb"`` (the value
                this method previously hard-coded) is the last fallback.

        Returns:
            The object returned by ``stream(...)`` with
            ``_preload_content=False``; the caller reads from and writes to
            it (presumably a WebSocket client -- confirm against the
            kubernetes client documentation).
        """
        # Prefer bash (wrapped in `script` when available so a pseudo-tty
        # is allocated), otherwise fall back to plain sh.
        exec_command = [
            "/bin/sh",
            "-c",
            'TERM=xterm-256color; export TERM; [ -x /bin/bash ] '
            '&& ([ -x /usr/bin/script ] '
            '&& /usr/bin/script -q -c "/bin/bash" /dev/null || exec /bin/bash) '
            '|| exec /bin/sh']

        # The pod name used to be hard-coded to one specific replica, which
        # made the `pod` argument meaningless; honour the argument instead.
        target_namespace = namespace or getattr(self, "namespace", None) or "cmdb"

        cont_stream = stream(self.Api_Instance.connect_get_namespaced_pod_exec,
                             name=pod,
                             namespace=target_namespace,
                             container=container,
                             command=exec_command,
                             stderr=True, stdin=True,
                             stdout=True, tty=True,
                             _preload_content=False
                             )
        return cont_stream

    def get_deployment_pod(self, RAND):

        try:
            r = self.Api_Instance.list_namespaced_pod(
                namespace=self.namespace,
                label_selector="app=%s" % RAND
            )

            return True, r
        except Exception as e:
            return False, 'Get Deployment: ' + str(e)


    # 发送报警
    def send_msg(self, msg=None, token="331c2210aca410f0b3f333113579db3caacadca491a2e6060e2e3bbf2aafcf82",
                 msg_json=None, timeout=10):
        """Post a markdown alert to a DingTalk robot webhook.

        Args:
            msg: Optional dict describing a pod; the keys read are
                ``project``, ``service``, ``status``, ``reay``, ``runing``
                and ``restart`` (spelled exactly as the callers supply
                them). When truthy it is rendered into the alert text.
            token: Access token appended to the webhook URL.
                NOTE(review): the default is a credential committed to
                source; consider loading it from configuration.
            msg_json: Pre-rendered markdown text, used when ``msg`` is
                falsy.
            timeout: Seconds to wait for the webhook before giving up.
                Without a timeout a stalled connection would block the
                caller indefinitely.

        Returns:
            The response body text.

        Raises:
            requests.exceptions.RequestException: on connection errors or
                when the timeout expires.
        """
        webhook_url = 'https://oapi.dingtalk.com/robot/send?access_token={0}'.format(token)
        headers = {'Content-Type': 'application/json;charset=utf-8'}

        if msg:
            msg_str = "## K8S-POD状态异常告警提示:\n\n  项目:{0}\t 服务名称:{1}\t 当前状态:{2}\t 是否准备好:{3}\t 是否启动:{4}\t 重启次数:{5}\t".format(
                msg.get("project"), msg.get("service"), msg.get("status"), msg.get("reay"), msg.get("runing"),
                msg.get("restart"))
        else:
            msg_str = msg_json

        data = {
            "msgtype": "markdown",
            "markdown": {
                "title": "### K8S-POD状态异常告警提示:",
                "text": msg_str,
            },
            "at": {
                # NOTE(review): this is the literal string 'reminders', not
                # a list of phone numbers -- confirm what the webhook
                # expects before changing it.
                "atMobiles": 'reminders',
                "isAtAll": False,
            },
        }

        r = requests.post(url=webhook_url, json=data, headers=headers, timeout=timeout)

        return r.text

    #销毁对象
    def __del__(self):
        pass
        # print("销毁对象:{0}".format(self.source))


# Sample alert payload; nothing in this part of the file reads it.
msg = {"msg": "服务器炸了"}


if __name__ == '__main__':
    # Do not call this variable `client`: that name is the module imported
    # from `kubernetes` at the top of the file and Kube.__init__ uses it
    # (client.CoreV1Api(), ...). Rebinding it here would break any Kube
    # created afterwards.
    kube = Kube(source="intranet-dev")
    print(kube.test(namespace="zoudao"))

以上代码基本覆盖了大多数调用 K8S API 的示例。

附带:导出 Excel

代码语言:python
复制
"""
公共写入 Excel 方法
"""
from xlsxwriter import workbook


class Execl(object):
    """Shape lists of row dicts into tuples and write them to an .xlsx file."""

    # Keys exported, in column order, for the Kubernetes health-check
    # report (``kube=True``). The spelling 'porbe' matches the key the
    # callers provide.
    KUBE_FIELDS = (
        'namespaces',
        'name',
        'server',
        'porbe',
        'limit',
        'desc',
        'url',
        'kubetotalscore',
        'cloud_type',
    )

    # Keys exported, in column order, for the database report (the default).
    DB_FIELDS = (
        'CreateTime',
        'desc',
        'DBName',
        'DB',
        'MySQLTotalExecutionCounts',
        'MySQLTotalExecutionTimes',
        'MaxExecutionTime',
        'MaxLockTime',
        'ParseTotalRowCounts',
        'ParseMaxRowCount',
        'ReturnTotalRowCounts',
        'ReturnMaxRowCount',
        'SQLText',
    )

    # (first_col, last_col, width) triples applied to the worksheet.
    _COLUMN_WIDTHS = (
        (0, 0, 30),
        (1, 1, 20),
        (2, 3, 28),
        (4, 5, 25),
        (6, 6, 12),
        (7, 9, 16),
        (10, 11, 25),
    )

    def format_data(self, data_info, kube=False, fields=None):
        """Pick the wanted values out of each row dict.

        Args:
            data_info: Iterable of dict-like rows (each must offer ``get``).
            kube: When true and ``fields`` is not given, export
                ``KUBE_FIELDS``; otherwise export ``DB_FIELDS``.
            fields: Optional sequence of keys to export instead of the two
                built-in layouts. This replaces the earlier practice of
                commenting field lists in and out for other reports
                (namespaces, domains, memory, code statistics, SMS
                templates).

        Returns:
            A list with one tuple per row; keys missing from a row yield
            ``None``.
        """
        if fields is None:
            fields = self.KUBE_FIELDS if kube else self.DB_FIELDS
        return [tuple(line.get(key) for key in fields) for line in data_info]

    def write_excel(self, file=None, data=None, title=None):
        """Write a header row and data rows to an Excel workbook.

        Steps: 1. set up the cell styles; 2. write the data.

        Args:
            file: Target passed straight to ``workbook.Workbook``.
            data: Iterable of rows, each an iterable of cell values. ``None``
                is treated as no rows.
            title: Iterable of header cell values written to row 0. ``None``
                is treated as no header; data rows still start at row 1.

        Returns:
            ``True`` once the workbook has been closed.
        """
        # The context manager closes the workbook even if a write raises;
        # previously a failing write skipped close() altogether.
        with workbook.Workbook(file) as work:
            # One worksheet with the default name.
            worksheet = work.add_worksheet()

            # Header style: bold, size 10, centred both ways.
            format_title = work.add_format({'bold': True, 'font_size': 10})
            format_title.set_align('center')
            format_title.set_align('vcenter')

            # Body style: size 10.
            format_body = work.add_format({'font_size': 10})

            # Header row height and column widths.
            worksheet.set_row(0, 25)
            for first_col, last_col, width in self._COLUMN_WIDTHS:
                worksheet.set_column(first_col, last_col, width)

            # Header cells.
            for col, item in enumerate(title or ()):
                worksheet.write(0, col, item, format_title)

            # Data cells, starting on the row below the header.
            for row, line in enumerate(data or (), start=1):
                for col, value in enumerate(line):
                    worksheet.write(row, col, value, format_body)

        return True

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
Elasticsearch Service
腾讯云 Elasticsearch Service(ES)是云端全托管海量数据检索分析服务,拥有高性能自研内核,集成X-Pack。ES 支持通过自治索引、存算分离、集群巡检等特性轻松管理集群,也支持免运维、自动弹性、按需使用的 Serverless 模式。使用 ES 您可以高效构建信息检索、日志分析、运维监控等服务,它独特的向量检索还可助您构建基于语义、图像的AI深度应用。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档